Compare commits

...

7 Commits

6 changed files with 935 additions and 83 deletions

21
LICENSE Normal file
View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2024 Simple-DHCP-Server contributors
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -18,11 +18,20 @@ Benutze dieses Tool nur in isolierten Testumgebungen oder auf einem dedizierten
- liest IP & Subnetz des gewählten Interface automatisch aus
- vergibt Adressen aus einem dynamischen Pool (ab Start-IP)
- **Neu:** Eingabefelder für Primary/Secondary DNS (DHCP Option 6)
- Lease-Time optional per Extended Option 51 oder pro Reservierung
- **Config-Profiles**: Interface/IP/DNS + Reservierungen/Extended Options als JSON speichern & laden
- **Statische Reservierungen**: MAC → IP Tabelle in der GUI, wird vom Server beachtet
- **Leases-Tab**: Restlaufzeit sichtbar, Direkt-Button „Add“ um Leases in Reservierungen zu übernehmen
- **Force Renew**: DHCPFORCERENEW an alle Hosts im Subnetz auslösen
- **Optionale PXE-Infos**: DHCP Option 66 (TFTP-Server) und 67 (Bootfile), wenn gesetzt
- **DHCP-Optionen:**
- Option 1: Subnetzmaske
- Option 3: Router/Gateway (setzt automatisch auf Server-IP)
- Option 6: DNS-Server (aus GUI)
- Option 51: Lease-Time
- Option 42: NTP-Server (kommagetrennte IPs)
- Option 15: DNS-Suffix
- Option 51: Lease Time (Pflicht; Default 3600s, override per Extended Options/Reservierung)
- Option 66/67: TFTP-Server & Bootfile
- **Log-Panel & Statusleiste**: zeigt DHCP-Events in Echtzeit
- **Echte DHCP-Pakete**: DISCOVER → OFFER, REQUEST → ACK, RELEASE → Lease-Freigabe
@@ -134,9 +143,10 @@ Leases werden nur im Speicher gehalten (kein persistentes Lease-File).
## 🧭 Roadmap
* [ ] Lease-Persistenz (JSON oder SQLite)
* [ ] Statische MAC-Reservierungen
* [x] Statische MAC-Reservierungen
* [x] Mehr Optionen (NTP, PXE, Extended Options)
* [ ] Konfigurierbarer Pool (Start/End-Adresse)
* [ ] Mehr Optionen (NTP, PXE, benutzerdefiniert)
* [ ] Persistente Config-/Lease-Imports für PXE-Labs
---

View File

@@ -1,13 +1,16 @@
import os
import select
import socket
import struct
import threading
import time
import select
from collections import deque
from typing import Dict, List, Optional, Tuple
import ipaddress
from utils import normalize_mac
MAGIC_COOKIE = b"\x63\x82\x53\x63"
@@ -20,8 +23,9 @@ DHCPACK = 5
DHCPNAK = 6
DHCPRELEASE = 7
DHCPINFORM = 8
DHCPFORCERENEW = 9
LEASE_SECONDS = 3600 # 1 hour leases
LEASE_SECONDS = 3600 # default 1 hour leases
def mac_bytes_to_str(b: bytes) -> str:
@@ -94,18 +98,32 @@ class DHCPServer:
# NEW: configurable DNS servers for DHCP Option 6
self.dns_servers: List[str] = []
self.reservations: Dict[str, Dict[str, Optional[int]]] = {} # normalized MAC -> {"ip": str, "lease": Optional[int]}
self.extended_options: Dict[str, str] = {}
self._network: Optional[ipaddress.IPv4Network] = None
self.default_lease_seconds: int = LEASE_SECONDS
# -------------------- Public API --------------------
def start(self, interface: str, start_ip: str, mask: str, dns_servers: Optional[List[str]] = None) -> None:
def start(
self,
interface: str,
start_ip: str,
mask: str,
dns_servers: Optional[List[str]] = None,
reservations: Optional[List[Dict[str, str]]] = None,
extended_options: Optional[Dict[str, Dict[str, str]]] = None,
) -> None:
if self.is_running():
raise RuntimeError("Server is already running.")
self.interface = interface
self.ip_start = start_ip
self.subnet_mask = mask
self._network = ipaddress.IPv4Network((start_ip, mask), strict=False)
self.default_lease_seconds = LEASE_SECONDS
# build pool based on interface network
self.server_ip = start_ip # in this lab version we use the provided IP as "server identifier"
self._pool = self._build_pool(start_ip, mask)
self._pool = self._build_pool(self._network, start_ip)
# configure DNS (filter invalid entries); default to [server_ip, 8.8.8.8]
self.dns_servers = []
@@ -119,6 +137,25 @@ class DHCPServer:
if not self.dns_servers and self.server_ip:
self.dns_servers = [self.server_ip, "8.8.8.8"]
# static MAC reservations
self.reservations = self._prepare_reservations(reservations or [])
# extended DHCP options (numeric keyed)
self.extended_options = {}
if extended_options:
for code, data in extended_options.items():
val = data.get("value") if isinstance(data, dict) else data
if val:
self.extended_options[str(code)] = str(val)
# If option 51 is provided, use it as default lease
opt51 = self.extended_options.get("51")
if opt51:
try:
lease_int = int(opt51)
if lease_int > 0:
self.default_lease_seconds = lease_int
except Exception:
self._log_line("Extended option 51 ignored (invalid integer).")
self._stop_evt.clear()
self._thread = threading.Thread(target=self._run_loop, name="DHCPWorker", daemon=True)
self._thread.start()
@@ -150,6 +187,10 @@ class DHCPServer:
with self._lock:
return {mac: ip for mac, (ip, _) in self._leases.items()}
def get_leases_with_expiry(self) -> Dict[str, Tuple[str, float]]:
with self._lock:
return {mac: (ip, expiry) for mac, (ip, expiry) in self._leases.items()}
def get_status(self) -> str:
return self._status
@@ -160,6 +201,38 @@ class DHCPServer:
items.append(self._log.popleft())
return items
def force_renew_all(self) -> int:
"""
Send DHCPFORCERENEW across the subnet (all hosts).
Uses known MAC from leases when available; otherwise sends with empty CHADDR.
Returns count of attempted sends.
"""
if not self.is_running() or not self._sock:
raise RuntimeError("Server not running.")
if not self._network:
raise RuntimeError("Subnet unbekannt.")
with self._lock:
lease_items = list(self._leases.items()) # (mac, (ip, expiry))
lease_by_ip = {ip: mac for mac, (ip, _) in lease_items}
sent = 0
for host in self._network.hosts():
ip = str(host)
if ip == self.server_ip:
continue
mac = lease_by_ip.get(ip)
pkt = self._build_forcerenew_packet(mac, ip)
if not pkt:
continue
try:
self._sock.sendto(pkt, (ip, 68))
sent += 1
except Exception as e:
self._log_line(f"FORCERENEW to {mac}/{ip} failed: {e}")
self._log_line(f"FORCERENEW sent to {sent} client(s).")
return sent
# -------------------- Internals --------------------
def _run_loop(self) -> None:
try:
@@ -238,10 +311,9 @@ class DHCPServer:
if mac in self._active_clients:
self._active_clients.remove(mac)
def _build_pool(self, start_ip: str, mask: str) -> List[str]:
def _build_pool(self, network: ipaddress.IPv4Network, start_ip: str) -> List[str]:
# Build pool from the network containing start_ip with provided mask
net = ipaddress.IPv4Network((start_ip, mask), strict=False)
all_hosts = list(net.hosts())
all_hosts = list(network.hosts())
# Begin at/after start_ip; exclude start_ip (we use it as server/gateway)
try:
start_idx = all_hosts.index(ipaddress.IPv4Address(start_ip))
@@ -256,6 +328,61 @@ class DHCPServer:
pool.remove(start_ip)
return pool
def _prepare_reservations(self, reservations: List[Dict[str, str]]) -> Dict[str, Dict[str, Optional[int]]]:
prepared: Dict[str, Dict[str, Optional[int]]] = {}
for res in reservations:
mac_raw = res.get("mac", "")
ip = res.get("ip", "")
lease = res.get("lease_time")
try:
lease_int = int(lease) if lease not in (None, "") else None
if lease_int is not None and lease_int <= 0:
lease_int = None
except Exception:
lease_int = None
try:
mac_norm = normalize_mac(mac_raw)
except Exception:
self._log_line(f"Ignoring reservation '{mac_raw}': invalid MAC")
continue
try:
ip_norm = str(ipaddress.IPv4Address(ip))
except Exception:
self._log_line(f"Ignoring reservation {mac_norm}: invalid IP '{ip}'")
continue
if ip_norm == self.server_ip:
self._log_line(f"Ignoring reservation {mac_norm}: IP {ip_norm} is server IP")
continue
if self._network and ipaddress.IPv4Address(ip_norm) not in self._network:
self._log_line(f"Ignoring reservation {mac_norm}: IP {ip_norm} not in subnet")
continue
prepared[mac_norm] = {"ip": ip_norm, "lease": lease_int}
if ip_norm in self._pool:
self._pool.remove(ip_norm)
return prepared
def _reserved_ip_for(self, mac: str) -> Optional[str]:
entry = self.reservations.get(mac.upper())
return entry["ip"] if entry else None
def _reserved_lease_for(self, mac: str) -> Optional[int]:
entry = self.reservations.get(mac.upper())
return entry.get("lease") if entry else None
def _is_ip_available(self, ip: Optional[str], mac: str) -> bool:
if not ip:
return False
# Reserved for different MAC?
for res_mac, res in self.reservations.items():
res_ip = res.get("ip")
if res_ip == ip and res_mac != mac:
return False
with self._lock:
for lease_mac, (lease_ip, _) in self._leases.items():
if lease_ip == ip and lease_mac != mac:
return False
return True
def _next_free_ip(self) -> Optional[str]:
with self._lock:
used = {ip for ip, _ in self._leases.values()}
@@ -309,37 +436,62 @@ class DHCPServer:
pass
def _dhcp_offer(self, mac: str, xid: int, giaddr: bytes) -> None:
reserved_ip = self._reserved_ip_for(mac)
lease_seconds = self._reserved_lease_for(mac) or self.default_lease_seconds
if reserved_ip and self._is_ip_available(reserved_ip, mac):
ip = reserved_ip
else:
ip = self._next_free_ip()
if not ip:
self._log_line("No free IP to offer.")
return
if reserved_ip:
self._log_line(f"OFFER (reserved) {ip} to {mac}")
else:
self._log_line(f"OFFER {ip} to {mac}")
pkt = self._build_reply_packet(
msg_type=DHCPOFFER, mac=mac, xid=xid, yiaddr=ip, giaddr=giaddr
msg_type=DHCPOFFER, mac=mac, xid=xid, yiaddr=ip, giaddr=giaddr, lease_seconds=lease_seconds
)
self._send_broadcast(pkt)
def _dhcp_ack(self, mac: str, xid: int, giaddr: bytes, requested_ip: Optional[str]) -> None:
# If requested IP is available or already leased to this MAC, use it; else choose next free
reserved_ip = self._reserved_ip_for(mac)
lease_seconds = self._reserved_lease_for(mac) or self.default_lease_seconds
if reserved_ip and not self._is_ip_available(reserved_ip, mac):
reserved_ip = None
with self._lock:
current = self._leases.get(mac, (None, 0))[0]
ip = requested_ip or current or self._next_free_ip()
ip: Optional[str] = None
if reserved_ip:
ip = reserved_ip
elif requested_ip and self._is_ip_available(requested_ip, mac):
ip = requested_ip
elif current and self._is_ip_available(current, mac):
ip = current
else:
ip = self._next_free_ip()
if not ip:
self._log_line("No IP available for ACK.")
return
# Register/renew lease
expiry = time.time() + LEASE_SECONDS
expiry = time.time() + lease_seconds
with self._lock:
self._leases[mac] = (ip, expiry)
if mac not in self._active_clients:
self._active_clients.append(mac)
if reserved_ip:
self._log_line(f"ACK (reserved) {ip} to {mac}")
else:
self._log_line(f"ACK {ip} to {mac}")
pkt = self._build_reply_packet(
msg_type=DHCPACK, mac=mac, xid=xid, yiaddr=ip, giaddr=giaddr
msg_type=DHCPACK, mac=mac, xid=xid, yiaddr=ip, giaddr=giaddr, lease_seconds=lease_seconds
)
self._send_broadcast(pkt)
def _build_reply_packet(self, msg_type: int, mac: str, xid: int, yiaddr: str, giaddr: bytes) -> bytes:
def _build_reply_packet(self, msg_type: int, mac: str, xid: int, yiaddr: str, giaddr: bytes, lease_seconds: int) -> bytes:
# BOOTP header
op = 2 # reply
htype = 1
@@ -374,8 +526,52 @@ class DHCPServer:
dns_bytes = b"".join(ip2bytes(ip) for ip in self.dns_servers)
options.append((6, dns_bytes))
options.append((51, struct.pack("!I", LEASE_SECONDS))) # lease time
# Numeric extended options
for code_str, val in self.extended_options.items():
try:
code = int(code_str)
except Exception:
self._log_line(f"Ignoring extended option with non-numeric code '{code_str}'")
continue
opt_bytes = self._encode_extended_option(code, val)
if opt_bytes is None:
continue
options.append((code, opt_bytes))
options.append((51, struct.pack("!I", lease_seconds))) # lease time
return hdr + build_options(options)
def _build_forcerenew_packet(self, mac: Optional[str], client_ip: str) -> Optional[bytes]:
"""Minimal FORCERENEW per RFC 3203 to trigger clients to renew."""
try:
ciaddr = ip2bytes(client_ip)
except Exception:
return None
op = 2
htype = 1
hlen = 6 if mac else 0
hops = 0
xid = 0
secs = 0
flags = 0
yiaddr_b = b"\x00\x00\x00\x00"
siaddr = ip2bytes(self.server_ip or "0.0.0.0")
giaddr_b = b"\x00\x00\x00\x00"
chaddr = (
bytes.fromhex(mac.replace(":", "")) + b"\x00" * 10
if mac
else b"\x00" * 16
)
sname = b"\x00" * 64
filef = b"\x00" * 128
hdr = struct.pack("!BBBBIHH4s4s4s4s16s64s128s",
op, htype, hlen, hops, xid, secs, flags,
ciaddr, yiaddr_b, siaddr, giaddr_b, chaddr, sname, filef)
options = [
(53, bytes([DHCPFORCERENEW])),
(54, siaddr),
]
return hdr + build_options(options)
def _send_broadcast(self, pkt: bytes) -> None:
@@ -393,3 +589,34 @@ class DHCPServer:
def _set_status(self, st: str) -> None:
self._status = st
def _encode_extended_option(self, code: int, value: str) -> Optional[bytes]:
if not value:
return None
if code == 42:
# NTP servers as comma-separated IP list
parts = [p.strip() for p in value.split(",") if p.strip()]
ips = []
for p in parts:
try:
ips.append(ip2bytes(p))
except Exception:
continue
if not ips:
self._log_line("Extended option 42 skipped (no valid IPs).")
return None
return b"".join(ips)
if code == 51:
try:
lease_int = int(value)
if lease_int <= 0:
return None
return struct.pack("!I", lease_int)
except Exception:
self._log_line("Extended option 51 ignored (invalid integer).")
return None
try:
return value.encode("ascii")
except Exception:
self._log_line(f"Extended option {code} not sent (invalid encoding).")
return None

View File

@@ -1,10 +1,14 @@
import ipaddress
import json
import os
import socket
import time
import tkinter as tk
from tkinter import messagebox, ttk
from typing import Optional, List
from tkinter import filedialog, messagebox, ttk
from typing import Dict, List, Optional, Tuple
from dhcp_server import DHCPServer
from option_specs import EXTENDED_OPTION_SPECS
import utils
@@ -55,86 +59,236 @@ class DHCPApp(tk.Frame):
def __init__(self, master: tk.Tk) -> None:
super().__init__(master)
master.title("Simple DHCP Server (Lab)")
master.minsize(700, 500)
self.pack(fill="both", expand=True, padx=12, pady=12)
# Single server instance
self.server: Optional[DHCPServer] = None
self.reservations: List[Dict[str, str]] = []
self.last_config_path: Optional[str] = None
self.ext_option_specs = list(EXTENDED_OPTION_SPECS)
self.ext_opt_values: Dict[str, tk.StringVar] = {}
self.ext_opt_enabled: Dict[str, tk.BooleanVar] = {}
# --- Top form ---
form = ttk.Frame(self)
form.pack(fill="x", pady=(0, 8))
# --- Toolbar ---
toolbar = ttk.Frame(self)
toolbar.pack(fill="x", pady=(0, 6))
self.btn_start = ttk.Button(toolbar, text="Start", command=self.start_server)
self.btn_stop = ttk.Button(toolbar, text="Stop", command=self.stop_server, state="disabled")
self.btn_rescan = ttk.Button(toolbar, text="Interfaces neu scannen", command=self.rescan_interfaces)
self.btn_load = ttk.Button(toolbar, text="Config laden", command=self.load_config)
self.btn_save = ttk.Button(toolbar, text="Config speichern", command=self.save_config)
self.btn_forcerenew = ttk.Button(toolbar, text="Force Renew", command=self.force_renew_all)
self.btn_start.pack(side="left", padx=(0, 6))
self.btn_stop.pack(side="left", padx=(0, 12))
self.btn_rescan.pack(side="left", padx=(0, 12))
self.btn_load.pack(side="left", padx=(0, 6))
self.btn_save.pack(side="left")
self.btn_forcerenew.pack(side="left", padx=(12, 0))
# Zeile 0
ttk.Label(form, text="Interface:").grid(row=0, column=0, sticky="w")
# --- Interface left, IP settings right ---
top = ttk.Frame(self)
top.pack(fill="x", pady=(0, 10), anchor="w")
# Left: Interface box with dropdown + info
iface_box = ttk.Labelframe(top, text="Interface")
iface_box.pack(side="left", padx=(0, 8), fill="y")
iface_box.update_idletasks()
iface_box.configure(width=400)
iface_box.pack_propagate(False)
iface_box.columnconfigure(1, weight=1)
ttk.Label(iface_box, text="Interface:").grid(row=0, column=0, sticky="w", padx=(8, 4), pady=(8, 6))
self.if_var = tk.StringVar()
self.if_menu = ttk.OptionMenu(form, self.if_var, None, *utils.get_network_interfaces(), command=self.on_iface_change)
self.if_menu.grid(row=0, column=1, sticky="ew", padx=(6, 16))
form.columnconfigure(1, weight=1)
self.if_combo = ttk.Combobox(iface_box, textvariable=self.if_var, state="readonly", width=18)
self.if_combo.grid(row=0, column=1, sticky="ew", padx=(4, 8), pady=(8, 6))
self.if_combo.bind("<<ComboboxSelected>>", self.on_iface_change)
ttk.Label(form, text="Server/Start-IP:").grid(row=0, column=2, sticky="w")
self.mac_var = tk.StringVar(value="-")
self.speed_var = tk.StringVar(value="-")
ttk.Label(iface_box, text="MAC:").grid(row=1, column=0, sticky="w", padx=(8, 4), pady=(2, 8))
ttk.Label(iface_box, textvariable=self.mac_var).grid(row=1, column=1, sticky="w", padx=(4, 8), pady=(2, 8))
ttk.Label(iface_box, text="Link:").grid(row=2, column=0, sticky="w", padx=(8, 4), pady=(0, 10))
ttk.Label(iface_box, textvariable=self.speed_var).grid(row=2, column=1, sticky="w", padx=(4, 8), pady=(0, 10))
# Right: IP settings box
ip_box = ttk.Labelframe(top, text="IP-Settings")
ip_box.pack(side="left", fill="y")
ip_box.update_idletasks()
ip_box.configure(width=420)
ip_box.pack_propagate(False)
ip_box.columnconfigure(1, weight=1)
ip_box.columnconfigure(3, weight=1)
ttk.Label(ip_box, text="Server/Start-IP:").grid(row=0, column=0, sticky="w", padx=(8, 4), pady=(8, 4))
self.ip_var = tk.StringVar(value="")
self.ip_entry = ttk.Entry(form, textvariable=self.ip_var, width=16)
self.ip_entry.grid(row=0, column=3, sticky="w", padx=(6, 16))
self.ip_entry = ttk.Entry(ip_box, textvariable=self.ip_var, width=16)
self.ip_entry.grid(row=0, column=1, sticky="w", padx=(4, 12), pady=(8, 4))
ttk.Label(form, text="Subnetzmaske:").grid(row=0, column=4, sticky="w")
ttk.Label(ip_box, text="Subnetzmaske:").grid(row=0, column=2, sticky="w", padx=(8, 4), pady=(8, 4))
self.mask_var = tk.StringVar(value="")
self.mask_entry = ttk.Entry(form, textvariable=self.mask_var, width=16)
self.mask_entry.grid(row=0, column=5, sticky="w", padx=(6, 16))
self.mask_entry = ttk.Entry(ip_box, textvariable=self.mask_var, width=16)
self.mask_entry.grid(row=0, column=3, sticky="w", padx=(4, 12), pady=(8, 4))
# Zeile 1 DNS Felder
ttk.Label(form, text="Primary DNS:").grid(row=1, column=0, sticky="w", pady=(6, 0))
ttk.Label(ip_box, text="Primary DNS:").grid(row=1, column=0, sticky="w", padx=(8, 4), pady=(4, 10))
self.dns1_var = tk.StringVar(value="")
self.dns1_entry = ttk.Entry(form, textvariable=self.dns1_var, width=16)
self.dns1_entry.grid(row=1, column=1, sticky="w", padx=(6, 16), pady=(6, 0))
self.dns1_entry = ttk.Entry(ip_box, textvariable=self.dns1_var, width=16)
self.dns1_entry.grid(row=1, column=1, sticky="w", padx=(4, 12), pady=(4, 10))
ttk.Label(form, text="Secondary DNS:").grid(row=1, column=2, sticky="w", pady=(6, 0))
ttk.Label(ip_box, text="Secondary DNS:").grid(row=1, column=2, sticky="w", padx=(8, 4), pady=(4, 10))
self.dns2_var = tk.StringVar(value="")
self.dns2_entry = ttk.Entry(form, textvariable=self.dns2_var, width=16)
self.dns2_entry.grid(row=1, column=3, sticky="w", padx=(6, 16), pady=(6, 0))
self.dns2_entry = ttk.Entry(ip_box, textvariable=self.dns2_var, width=16)
self.dns2_entry.grid(row=1, column=3, sticky="w", padx=(4, 12), pady=(4, 10))
# --- Buttons ---
btns = ttk.Frame(self)
btns.pack(fill="x", pady=(8, 8))
self.btn_start = ttk.Button(btns, text="Start", command=self.start_server)
self.btn_stop = ttk.Button(btns, text="Stop", command=self.stop_server, state="disabled")
self.btn_start.pack(side="left")
self.btn_stop.pack(side="left", padx=(8, 0))
# --- Clients & Logs Paned Layout ---
paned = ttk.Panedwindow(self, orient="horizontal")
paned.pack(fill="both", expand=True)
# Clients list
left = ttk.Labelframe(paned, text="Aktive Clients / Leases")
self.clients = tk.Listbox(left, height=12)
self.clients.pack(fill="both", expand=True, padx=6, pady=6)
paned.add(left, weight=1)
# Logs
right = ttk.Labelframe(paned, text="Log")
self.log = tk.Text(right, height=12, state="disabled")
self.log.pack(fill="both", expand=True, padx=6, pady=6)
paned.add(right, weight=2)
# --- Notebook with tabs ---
self._build_tabs()
# --- Status bar ---
self.status_var = tk.StringVar(value="Bereit.")
status = ttk.Label(self, textvariable=self.status_var, anchor="w", relief="sunken")
status.pack(fill="x", side="bottom")
# initial iface autofill (if any interface exists)
ifaces = utils.get_network_interfaces()
if ifaces:
self.if_var.set(ifaces[0])
self.on_iface_change(ifaces[0])
self._refresh_interface_list(initial=True)
self._apply_min_sizes()
# keep disabled option display in sync with GUI IP/DNS
for v in (self.ip_var, self.mask_var, self.dns1_var, self.dns2_var):
v.trace_add("write", lambda *_: self._refresh_disabled_ext_options())
self._refresh_disabled_ext_options()
# periodic refresh
self.after(400, self._refresh)
# -------------------- UI builders --------------------
def _build_tabs(self) -> None:
self.notebook = ttk.Notebook(self)
self.notebook.pack(fill="both", expand=True)
self._build_lease_tab()
self._build_log_tab()
self._build_reservations_tab()
self._build_extended_options_tab()
def _build_lease_tab(self) -> None:
tab = ttk.Frame(self.notebook)
columns = ("mac", "ip", "remaining", "action")
self.leases_tree = ttk.Treeview(tab, columns=columns, show="headings", height=10)
self.leases_tree.heading("mac", text="MAC-Adresse")
self.leases_tree.heading("ip", text="IP-Adresse")
self.leases_tree.heading("remaining", text="Restlaufzeit")
self.leases_tree.heading("action", text="Reservierung")
self.leases_tree.column("mac", width=160, anchor="w")
self.leases_tree.column("ip", width=140, anchor="w")
self.leases_tree.column("remaining", width=120, anchor="w")
self.leases_tree.column("action", width=120, anchor="w")
scroll = ttk.Scrollbar(tab, orient="vertical", command=self.leases_tree.yview)
self.leases_tree.configure(yscrollcommand=scroll.set)
self.leases_tree.pack(side="left", fill="both", expand=True, padx=6, pady=6)
scroll.pack(side="right", fill="y")
self.leases_tree.bind("<ButtonRelease-1>", self._on_lease_click)
self.notebook.add(tab, text="Leases")
def _build_log_tab(self) -> None:
tab = ttk.Frame(self.notebook)
self.log = tk.Text(tab, height=12, state="disabled")
scroll = ttk.Scrollbar(tab, orient="vertical", command=self.log.yview)
self.log.configure(yscrollcommand=scroll.set)
self.log.pack(side="left", fill="both", expand=True, padx=6, pady=6)
scroll.pack(side="right", fill="y")
self.notebook.add(tab, text="Log")
def _build_reservations_tab(self) -> None:
tab = ttk.Frame(self.notebook)
tree_frame = ttk.Frame(tab)
tree_frame.pack(fill="both", expand=True, padx=6, pady=6)
columns = ("mac", "ip", "lease", "name")
self.res_tree = ttk.Treeview(tree_frame, columns=columns, show="headings", selectmode="extended", height=8)
self.res_tree.heading("mac", text="MAC-Adresse")
self.res_tree.heading("ip", text="IP-Adresse")
self.res_tree.heading("lease", text="Lease (s)")
self.res_tree.heading("name", text="Name/Notiz")
self.res_tree.column("mac", width=150, anchor="w")
self.res_tree.column("ip", width=140, anchor="w")
self.res_tree.column("lease", width=90, anchor="w")
self.res_tree.column("name", width=200, anchor="w")
scroll = ttk.Scrollbar(tree_frame, orient="vertical", command=self.res_tree.yview)
self.res_tree.configure(yscrollcommand=scroll.set)
self.res_tree.pack(side="left", fill="both", expand=True)
scroll.pack(side="right", fill="y")
self.res_tree.bind("<<TreeviewSelect>>", self._on_reservation_select)
form = ttk.Labelframe(tab, text="Eintrag bearbeiten")
form.pack(fill="x", padx=6, pady=(0, 8))
ttk.Label(form, text="MAC:").grid(row=0, column=0, sticky="w", padx=(6, 2), pady=4)
self.res_mac_var = tk.StringVar()
ttk.Entry(form, textvariable=self.res_mac_var, width=22).grid(row=0, column=1, sticky="w", pady=4)
ttk.Label(form, text="IP:").grid(row=0, column=2, sticky="w", padx=(12, 2), pady=4)
self.res_ip_var = tk.StringVar()
ttk.Entry(form, textvariable=self.res_ip_var, width=18).grid(row=0, column=3, sticky="w", pady=4)
ttk.Label(form, text="Lease (s):").grid(row=0, column=4, sticky="w", padx=(12, 2), pady=4)
self.res_lease_var = tk.StringVar()
ttk.Entry(form, textvariable=self.res_lease_var, width=10).grid(row=0, column=5, sticky="w", pady=4)
ttk.Label(form, text="Name/Notiz:").grid(row=0, column=6, sticky="w", padx=(12, 2), pady=4)
self.res_name_var = tk.StringVar()
ttk.Entry(form, textvariable=self.res_name_var, width=22).grid(row=0, column=7, sticky="w", pady=4)
actions = ttk.Frame(form)
actions.grid(row=0, column=8, sticky="e", padx=(12, 6))
ttk.Button(actions, text="Hinzufügen/Aktualisieren", command=self.add_or_update_reservation).pack(side="left", padx=(0, 6))
ttk.Button(actions, text="Löschen", command=self.remove_reservation).pack(side="left")
self.notebook.add(tab, text="Reservierungen")
def _build_extended_options_tab(self) -> None:
tab = ttk.Frame(self.notebook)
form = ttk.Frame(tab)
form.pack(fill="x", padx=8, pady=(8, 8))
ttk.Label(form, text="Aktiv").grid(row=0, column=0, sticky="w", padx=(0, 12))
ttk.Label(form, text="Option").grid(row=0, column=1, sticky="w", padx=(0, 12))
ttk.Label(form, text="Wert").grid(row=0, column=2, sticky="w", padx=(0, 12))
ttk.Label(form, text="Beschreibung").grid(row=0, column=3, sticky="w", padx=(0, 12))
form.columnconfigure(3, weight=1)
row = 1
for spec in self.ext_option_specs:
code = spec["code"]
default_val = spec.get("default", "")
mandatory = bool(spec.get("mandatory"))
self.ext_opt_values[code] = tk.StringVar(value=default_val)
self.ext_opt_enabled[code] = tk.BooleanVar(value=mandatory)
disabled = bool(spec.get("disabled"))
desc_txt = spec.get("desc", "")
if disabled:
desc_txt = f"{desc_txt} (aus GUI/Server, gesperrt)"
chk = ttk.Checkbutton(form, variable=self.ext_opt_enabled[code])
if disabled:
chk.state(["disabled"])
chk.grid(row=row, column=0, sticky="w", pady=4)
ttk.Label(form, text=code).grid(row=row, column=1, sticky="w", pady=4)
entry = ttk.Entry(form, textvariable=self.ext_opt_values[code], width=32, state="disabled" if disabled else "normal")
entry.grid(row=row, column=2, sticky="w", padx=(0, 12), pady=4)
ttk.Label(form, text=desc_txt).grid(row=row, column=3, sticky="w", padx=(0, 12), pady=4)
row += 1
ttk.Label(tab, text="Weitere Optionen kannst du in option_specs.py ergänzen.").pack(anchor="w", padx=8, pady=(4, 8))
self.notebook.add(tab, text="Extended Options")
# -------------------- UI logic --------------------
def on_iface_change(self, _sel=None):
iface = self.if_var.get().strip()
if not iface:
self._clear_iface_fields()
return
ip, mask = utils.get_iface_ipv4_config(iface)
self._clear_iface_fields()
if ip:
self.ip_var.set(ip)
# sinnvolles Default: Primary DNS = Interface-IP
@@ -142,16 +296,78 @@ class DHCPApp(tk.Frame):
if mask:
self.mask_var.set(mask)
# Secondary DNS bleibt leer, damit du frei wählen kannst
self._update_iface_info(iface)
def _set_controls_enabled(self, enabled: bool):
state = "normal" if enabled else "disabled"
self.if_menu.configure(state=state)
state = "readonly" if enabled else "disabled"
self.if_combo.configure(state=state)
self.ip_entry.configure(state=state)
self.mask_entry.configure(state=state)
self.dns1_entry.configure(state=state)
self.dns2_entry.configure(state=state)
self.btn_rescan.configure(state=state)
self.btn_start.configure(state="normal" if enabled else "disabled")
self.btn_stop.configure(state="disabled" if enabled else "normal")
self.btn_forcerenew.configure(state="disabled" if enabled else "normal")
def _clear_iface_fields(self) -> None:
self.ip_var.set("")
self.mask_var.set("")
self.dns1_var.set("")
self.dns2_var.set("")
self.mac_var.set("-")
self.speed_var.set("-")
self._refresh_disabled_ext_options()
def _select_iface(self, iface: str) -> None:
self.if_var.set(iface)
self.on_iface_change()
def _refresh_interface_list(self, initial: bool = False) -> None:
interfaces = utils.get_network_interfaces()
current = self.if_var.get().strip()
self.if_combo["values"] = interfaces
if current in interfaces:
selection = current
elif interfaces:
selection = interfaces[0]
else:
selection = ""
if selection:
self._select_iface(selection)
if not initial:
self.status_var.set(f"{len(interfaces)} Interface(s) geladen.")
else:
self.if_var.set("")
self._clear_iface_fields()
if not initial:
self.status_var.set("Keine Netzwerk-Interfaces gefunden.")
if selection:
self._update_iface_info(selection)
self._refresh_disabled_ext_options()
def _apply_min_sizes(self) -> None:
# Ensure window cannot shrink below a comfortable layout
self.update_idletasks()
min_w = max(self.winfo_reqwidth() + 16, 840)
min_h = max(self.winfo_reqheight() + 16, 580)
self.master.minsize(min_w, min_h)
def _update_iface_info(self, iface: str) -> None:
mac, speed = utils.get_iface_hwinfo(iface)
self.mac_var.set(mac or "-")
if speed is None or speed <= 0:
self.speed_var.set("-")
else:
self.speed_var.set(f"{speed} Mbit/s")
def rescan_interfaces(self) -> None:
if self.server and self.server.is_running():
messagebox.showinfo("Server läuft", "Stoppe den Server, bevor du Interfaces neu scannst.")
return
self._refresh_interface_list()
def start_server(self) -> None:
if self.server and self.server.is_running():
@@ -196,13 +412,28 @@ class DHCPApp(tk.Frame):
messagebox.showerror("Start fehlgeschlagen (Bind)", f"Konnte Port 67 auf {iface} nicht binden:\n{e}")
return
reservations = self._get_reservation_dict()
extended_opts = self._collect_extended_options()
try:
self._ensure_mandatory_ext_options(fail_on_missing=True)
except Exception as e:
messagebox.showerror("Pflicht-Option fehlt", str(e))
return
# --- Start ---
try:
self.server = DHCPServer()
# NEU: DNS-Liste an den Server übergeben
self.server.start(iface, ip, mask, dns_servers=dns_list)
self.server.start(
iface,
ip,
mask,
dns_servers=dns_list,
reservations=reservations,
extended_options=extended_opts,
)
self.status_var.set("Server gestartet.")
self._set_controls_enabled(False)
self.btn_forcerenew.configure(state="normal")
except Exception as e:
self.server = None
self.status_var.set("Start fehlgeschlagen.")
@@ -212,8 +443,208 @@ class DHCPApp(tk.Frame):
if self.server:
self.server.stop()
self._set_controls_enabled(True)
self.btn_forcerenew.configure(state="disabled")
self.status_var.set("Server gestoppt.")
# -------------------- Reservations --------------------
def _refresh_reservations_view(self) -> None:
for item in self.res_tree.get_children():
self.res_tree.delete(item)
for entry in self.reservations:
lease_raw = entry.get("lease_time", "")
lease = lease_raw if lease_raw is not None else ""
self.res_tree.insert("", "end", values=(entry.get("mac", ""), entry.get("ip", ""), lease, entry.get("name", "")))
def add_or_update_reservation(self) -> None:
mac = self.res_mac_var.get().strip()
ip = self.res_ip_var.get().strip()
lease = self.res_lease_var.get().strip()
name = self.res_name_var.get().strip()
if not mac or not ip:
messagebox.showerror("Fehler", "MAC und IP sind erforderlich.")
return
try:
mac_norm = utils.normalize_mac(mac)
ip_norm = utils.format_ip_address(ip)
lease_val = None
if lease:
lease_int = int(lease)
if lease_int <= 0:
raise ValueError("Lease muss > 0 sein.")
lease_val = lease_int
except Exception as e:
messagebox.showerror("Ungültige Eingabe", str(e))
return
existing = next((r for r in self.reservations if r.get("mac") == mac_norm), None)
if existing:
existing.update({"ip": ip_norm, "name": name, "lease_time": lease_val})
else:
self.reservations.append({"mac": mac_norm, "ip": ip_norm, "name": name, "lease_time": lease_val})
self._refresh_reservations_view()
def remove_reservation(self) -> None:
selected = self.res_tree.selection()
if not selected:
return
macs = {self.res_tree.item(item, "values")[0] for item in selected}
self.reservations = [r for r in self.reservations if r.get("mac") not in macs]
self._refresh_reservations_view()
def _on_reservation_select(self, _event=None) -> None:
selected = self.res_tree.selection()
if not selected:
return
vals = self.res_tree.item(selected[0], "values")
if vals:
self.res_mac_var.set(vals[0])
self.res_ip_var.set(vals[1])
self.res_lease_var.set(vals[2])
self.res_name_var.set(vals[3] if len(vals) > 3 else "")
def _get_reservation_dict(self) -> List[Dict[str, str]]:
return [
{
"mac": entry["mac"],
"ip": entry["ip"],
"name": entry.get("name", ""),
"lease_time": entry.get("lease_time"),
}
for entry in self.reservations
if entry.get("mac") and entry.get("ip")
]
def _has_reservation(self, mac: str) -> bool:
return any(r.get("mac") == mac for r in self.reservations)
# -------------------- Extended Options --------------------
def _collect_extended_options(self) -> Dict[str, str]:
opts: Dict[str, Dict[str, str]] = {}
for spec in self.ext_option_specs:
code = spec["code"]
if spec.get("disabled"):
continue
val = (self.ext_opt_values.get(code) or tk.StringVar()).get().strip()
enabled = bool((self.ext_opt_enabled.get(code) or tk.BooleanVar()).get())
if spec.get("mandatory") and not val:
continue
if (enabled or spec.get("mandatory")) and val:
opts[code] = {"value": val, "enabled": True}
return opts
# -------------------- Config load/save --------------------
def _collect_config(self) -> Dict:
dns_entries = [d for d in (self.dns1_var.get().strip(), self.dns2_var.get().strip()) if d]
return {
"interface": self.if_var.get().strip(),
"server_ip": self.ip_var.get().strip(),
"subnet_mask": self.mask_var.get().strip(),
"dns": dns_entries,
"reservations": self.reservations,
"extended_options": {
spec["code"]: {
"value": (self.ext_opt_values[spec["code"]].get().strip()),
"enabled": bool(self.ext_opt_enabled[spec["code"]].get()),
}
for spec in self.ext_option_specs
if not spec.get("disabled")
},
}
def _apply_config(self, cfg: Dict) -> None:
self.if_var.set(cfg.get("interface", ""))
self.ip_var.set(cfg.get("server_ip", ""))
self.mask_var.set(cfg.get("subnet_mask", ""))
dns = cfg.get("dns") or []
self.dns1_var.set(dns[0] if len(dns) > 0 else "")
self.dns2_var.set(dns[1] if len(dns) > 1 else "")
self._refresh_disabled_ext_options()
# reservations
self.reservations = []
for item in cfg.get("reservations") or []:
mac = item.get("mac") or ""
ip = item.get("ip") or ""
name = item.get("name") or ""
lease_time = item.get("lease_time")
try:
lease_time = int(lease_time) if lease_time not in (None, "", "None") else None
if lease_time is not None and lease_time <= 0:
lease_time = None
except Exception:
lease_time = None
try:
mac_norm = utils.normalize_mac(mac)
ip_norm = utils.format_ip_address(ip)
except Exception:
continue
self.reservations.append({"mac": mac_norm, "ip": ip_norm, "name": name, "lease_time": lease_time})
self._refresh_reservations_view()
ext = cfg.get("extended_options") or {}
# First clear
for code in self.ext_option_specs:
default_val = code.get("default", "")
self.ext_opt_values[code["code"]].set(default_val)
self.ext_opt_enabled[code["code"]].set(bool(code.get("mandatory")))
self._refresh_disabled_ext_options()
# Apply values if present (numeric keys preferred)
for code in self.ext_option_specs:
c = code["code"]
if code.get("disabled"):
continue
entry = ext.get(c)
if entry is None:
continue
if isinstance(entry, dict):
self.ext_opt_values[c].set(entry.get("value", "") or code.get("default", ""))
self.ext_opt_enabled[c].set(bool(entry.get("enabled")) or bool(code.get("mandatory")))
else:
self.ext_opt_values[c].set(str(entry))
self.ext_opt_enabled[c].set(True)
# Enforce mandatory defaults if still empty
self._ensure_mandatory_ext_options(fail_on_missing=False)
def load_config(self) -> None:
path = filedialog.askopenfilename(
title="Config laden",
filetypes=[("JSON", "*.json"), ("Alle Dateien", "*.*")],
initialfile=os.path.basename(self.last_config_path or ""),
)
if not path:
return
try:
with open(path, "r", encoding="utf-8") as f:
cfg = json.load(f)
except Exception as e:
messagebox.showerror("Fehler beim Laden", str(e))
return
self._apply_config(cfg)
self.last_config_path = path
self.status_var.set(f"Config geladen: {os.path.basename(path)}")
def save_config(self) -> None:
path = filedialog.asksaveasfilename(
title="Config speichern",
defaultextension=".json",
filetypes=[("JSON", "*.json"), ("Alle Dateien", "*.*")],
initialfile=os.path.basename(self.last_config_path or "dhcp_config.json"),
)
if not path:
return
cfg = self._collect_config()
try:
with open(path, "w", encoding="utf-8") as f:
json.dump(cfg, f, indent=2)
except Exception as e:
messagebox.showerror("Fehler beim Speichern", str(e))
return
self.last_config_path = path
self.status_var.set(f"Config gespeichert: {os.path.basename(path)}")
# -------------------- Refresh / Log --------------------
def _append_log(self, lines):
if not lines:
return
@@ -223,14 +654,135 @@ class DHCPApp(tk.Frame):
self.log.see("end")
self.log.configure(state="disabled")
def _update_leases_view(self, leases: Dict[str, Tuple[str, float]]) -> None:
current_items = {self.leases_tree.item(i, "values")[0]: i for i in self.leases_tree.get_children()}
# remove stale
for mac in set(current_items.keys()) - set(leases.keys()):
self.leases_tree.delete(current_items[mac])
# add/update
now = time.time()
for mac, (ip, expiry) in sorted(leases.items()):
remaining = max(int(expiry - now), 0)
remaining_txt = self._format_duration(remaining)
action = "Reserviert" if self._has_reservation(mac) else "Add"
if mac in current_items:
self.leases_tree.item(current_items[mac], values=(mac, ip, remaining_txt, action))
else:
self.leases_tree.insert("", "end", values=(mac, ip, remaining_txt, action))
def _refresh(self) -> None:
# update clients
self.clients.delete(0, "end")
leases: Dict[str, Tuple[str, float]] = {}
if self.server and self.server.is_running():
leases = self.server.get_leases()
for mac, ip in leases.items():
self.clients.insert("end", f"{mac}{ip}")
leases = self.server.get_leases_with_expiry()
self.status_var.set(f"Status: {self.server.get_status()}")
# pull logs
self._append_log(self.server.pop_logs(100))
else:
self.status_var.set("Bereit.")
self._update_leases_view(leases)
self.after(400, self._refresh)
def _refresh_disabled_ext_options(self) -> None:
# Update display values for disabled, auto-managed DHCP options
dns_list = [d for d in (self.dns1_var.get().strip(), self.dns2_var.get().strip()) if d]
dns_text = ", ".join(dns_list)
auto_values = {
"1": self.mask_var.get().strip(),
"3": self.ip_var.get().strip(),
"6": dns_text,
}
for spec in self.ext_option_specs:
if not spec.get("disabled"):
continue
code = spec["code"]
if code in auto_values:
self.ext_opt_values[code].set(auto_values[code])
# -------------------- Force Renew --------------------
def force_renew_all(self) -> None:
if not self.server or not self.server.is_running():
messagebox.showinfo("Info", "Server läuft nicht.")
return
ip = self.ip_var.get().strip()
mask = self.mask_var.get().strip()
try:
net = ipaddress.IPv4Network((ip, mask), strict=False)
except Exception:
messagebox.showerror("Fehler", "Ungültiges Netz (IP/Subnetzmaske) für Force Renew.")
return
host_count = len(list(net.hosts()))
targets = max(host_count - 1, 0) # exclude server IP roughly
extra_warn = ""
if targets > 512:
extra_warn = f"\nACHTUNG: {targets} Ziele das kann dauern."
if not messagebox.askyesno(
"Warnung",
f"DHCPFORCERENEW wird an alle {targets} IPs im Subnetz {net} gesendet.\n"
"Viele Clients ignorieren das, aber es erzeugt Traffic." + extra_warn,
icon="warning",
):
return
try:
sent = self.server.force_renew_all()
self.status_var.set(f"FORCERENEW gesendet an {sent} Client(s).")
except Exception as e:
messagebox.showerror("Fehler", str(e))
# -------------------- Leases actions --------------------
def _on_lease_click(self, event) -> None:
item_id = self.leases_tree.identify_row(event.y)
col = self.leases_tree.identify_column(event.x)
if not item_id or col != "#4": # action column
return
vals = self.leases_tree.item(item_id, "values")
if len(vals) < 4:
return
mac, ip, action = vals[0], vals[1], vals[3]
if action != "Add":
return
self._add_reservation_from_lease(mac, ip)
def _add_reservation_from_lease(self, mac: str, ip: str) -> None:
if self._has_reservation(mac):
messagebox.showinfo("Info", "Reservierung existiert bereits.")
return
self.res_mac_var.set(mac)
self.res_ip_var.set(ip)
self.res_name_var.set("")
self.add_or_update_reservation()
# -------------------- Helpers --------------------
def _format_duration(self, seconds: int) -> str:
if seconds <= 0:
return "0s"
m, s = divmod(seconds, 60)
h, m = divmod(m, 60)
if h > 0:
return f"{h}h {m}m"
if m > 0:
return f"{m}m {s}s"
return f"{s}s"
def _ensure_mandatory_ext_options(self, fail_on_missing: bool) -> None:
missing = []
for spec in self.ext_option_specs:
if spec.get("disabled"):
continue
if not spec.get("mandatory"):
continue
code = spec["code"]
val = self.ext_opt_values[code].get().strip()
if not val:
default_val = spec.get("default", "")
if default_val:
self.ext_opt_values[code].set(default_val)
val = default_val
if not val:
missing.append(f"Option {code} ({spec.get('label','')})")
else:
self.ext_opt_enabled[code].set(True)
if missing and fail_on_missing:
raise ValueError("Folgende Pflicht-Optionen fehlen: " + ", ".join(missing))

18
src/option_specs.py Normal file
View File

@@ -0,0 +1,18 @@
"""
Definitions for supported extended DHCP options in the GUI.
Extend this list to add more options; codes must be numeric strings.
"""
EXTENDED_OPTION_SPECS = [
# Managed internally (anzeige, aber nicht editierbar)
{"code": "1", "label": "Subnet Mask", "desc": "Automatisch aus IP-Settings", "type": "text", "disabled": True},
{"code": "3", "label": "Router/Gateway", "desc": "Automatisch auf Server-IP", "type": "text", "disabled": True},
{"code": "6", "label": "DNS-Server", "desc": "Aus IP-Settings (GUI)", "type": "text", "disabled": True},
# Freie Optionen (aufsteigend)
{"code": "15", "label": "DNS-Suffix", "desc": "DNS-Domäne", "type": "text"},
{"code": "42", "label": "NTP-Server", "desc": "NTP-Server IPs (kommagetrennt)", "type": "text"},
{"code": "51", "label": "Lease Time", "desc": "Lease in Sekunden (mandatory)", "type": "text", "mandatory": True, "default": "3600"},
{"code": "66", "label": "TFTP-Server", "desc": "TFTP-Servername oder IP", "type": "text"},
{"code": "67", "label": "Bootfile", "desc": "Bootfile-Name (PXE)", "type": "text"},
]

View File

@@ -1,8 +1,11 @@
import socket
import ipaddress
import psutil
import socket
import string
from typing import Optional, Tuple
import psutil
def get_network_interfaces():
"""Return only interfaces that have an IPv4 address to keep choices sane."""
interfaces = []
@@ -21,6 +24,19 @@ def get_iface_ipv4_config(iface: str) -> Tuple[Optional[str], Optional[str]]:
return a.address, a.netmask
return None, None
def get_iface_hwinfo(iface: str) -> Tuple[Optional[str], Optional[int]]:
"""Return (MAC, speed_mbps) where available."""
mac = None
addrs = psutil.net_if_addrs().get(iface, [])
for a in addrs:
if getattr(a, "family", None) == getattr(psutil, "AF_LINK", None):
mac = a.address
break
stats = psutil.net_if_stats().get(iface)
speed = stats.speed if stats else None
return mac, speed
def format_ip_address(ip: str) -> str:
"""Normalize dotted quad; raises on invalid input."""
ip_obj = ipaddress.ip_address(ip)
@@ -31,3 +47,11 @@ def format_ip_address(ip: str) -> str:
def validate_subnet_mask(mask: str) -> None:
"""Accept masks like 255.255.255.0; raise ValueError if invalid."""
ipaddress.IPv4Network(f"0.0.0.0/{mask}")
def normalize_mac(mac: str) -> str:
"""Return MAC as AA:BB:CC:DD:EE:FF; raises on invalid input."""
cleaned = mac.strip().replace("-", "").replace(":", "").replace(".", "").lower()
if len(cleaned) != 12 or any(c not in string.hexdigits for c in cleaned):
raise ValueError("Ungültige MAC-Adresse.")
return ":".join(cleaned[i:i+2] for i in range(0, 12, 2)).upper()