Compare commits

...

3 Commits

5 changed files with 860 additions and 101 deletions

View File

@@ -18,10 +18,19 @@ Benutze dieses Tool nur in isolierten Testumgebungen oder auf einem dedizierten
- liest IP & Subnetz des gewählten Interface automatisch aus - liest IP & Subnetz des gewählten Interface automatisch aus
- vergibt Adressen aus einem dynamischen Pool (ab Start-IP) - vergibt Adressen aus einem dynamischen Pool (ab Start-IP)
- **Neu:** Eingabefelder für Primary/Secondary DNS (DHCP Option 6) - **Neu:** Eingabefelder für Primary/Secondary DNS (DHCP Option 6)
- Lease-Time optional per Extended Option 51 oder pro Reservierung
- **Config-Profiles**: Interface/IP/DNS + Reservierungen/Extended Options als JSON speichern & laden
- **Statische Reservierungen**: MAC → IP Tabelle in der GUI, wird vom Server beachtet
- **Leases-Tab**: Restlaufzeit sichtbar, Direkt-Button „Add“ um Leases in Reservierungen zu übernehmen
- **Optionale PXE-Infos**: DHCP Option 66 (TFTP-Server) und 67 (Bootfile), wenn gesetzt
- **DHCP-Optionen:** - **DHCP-Optionen:**
- Option 1: Subnetzmaske - Option 1: Subnetzmaske
- Option 3: Router/Gateway (setzt automatisch auf Server-IP) - Option 3: Router/Gateway (setzt automatisch auf Server-IP)
- Option 6: DNS-Server (aus GUI) - Option 6: DNS-Server (aus GUI)
- Option 42: NTP-Server (kommagetrennte IPs)
- Option 15: DNS-Suffix
- Option 51: Lease Time (Pflicht; Default 3600s, kann in Extended Options gesetzt werden)
- Option 66/67: TFTP-Server & Bootfile
- Option 51: Lease-Time - Option 51: Lease-Time
- **Log-Panel & Statusleiste**: zeigt DHCP-Events in Echtzeit - **Log-Panel & Statusleiste**: zeigt DHCP-Events in Echtzeit
- **Echte DHCP-Pakete**: DISCOVER → OFFER, REQUEST → ACK, RELEASE → Lease-Freigabe - **Echte DHCP-Pakete**: DISCOVER → OFFER, REQUEST → ACK, RELEASE → Lease-Freigabe
@@ -143,4 +152,4 @@ Leases werden nur im Speicher gehalten (kein persistentes Lease-File).
## ⚠️ Haftungsausschluss ## ⚠️ Haftungsausschluss
Dies ist ein **Entwicklungs- und Lern-Tool**. Es ist *nicht* als Ersatz für produktive DHCP-Server gedacht. Dies ist ein **Entwicklungs- und Lern-Tool**. Es ist *nicht* als Ersatz für produktive DHCP-Server gedacht.
Benutzung auf eigene Gefahr prüfe deine Netzumgebung sorgfältig, bevor du den Server startest! Benutzung auf eigene Gefahr prüfe deine Netzumgebung sorgfältig, bevor du den Server startest!

View File

@@ -1,13 +1,16 @@
import os import os
import select
import socket import socket
import struct import struct
import threading import threading
import time import time
import select
from collections import deque from collections import deque
from typing import Dict, List, Optional, Tuple from typing import Dict, List, Optional, Tuple
import ipaddress import ipaddress
from utils import normalize_mac
MAGIC_COOKIE = b"\x63\x82\x53\x63" MAGIC_COOKIE = b"\x63\x82\x53\x63"
@@ -20,8 +23,9 @@ DHCPACK = 5
DHCPNAK = 6 DHCPNAK = 6
DHCPRELEASE = 7 DHCPRELEASE = 7
DHCPINFORM = 8 DHCPINFORM = 8
DHCPFORCERENEW = 9
LEASE_SECONDS = 3600 # 1 hour leases LEASE_SECONDS = 3600 # default 1 hour leases
def mac_bytes_to_str(b: bytes) -> str: def mac_bytes_to_str(b: bytes) -> str:
@@ -94,18 +98,32 @@ class DHCPServer:
# NEW: configurable DNS servers for DHCP Option 6 # NEW: configurable DNS servers for DHCP Option 6
self.dns_servers: List[str] = [] self.dns_servers: List[str] = []
self.reservations: Dict[str, Dict[str, Optional[int]]] = {} # normalized MAC -> {"ip": str, "lease": Optional[int]}
self.extended_options: Dict[str, str] = {}
self._network: Optional[ipaddress.IPv4Network] = None
self.default_lease_seconds: int = LEASE_SECONDS
# -------------------- Public API -------------------- # -------------------- Public API --------------------
def start(self, interface: str, start_ip: str, mask: str, dns_servers: Optional[List[str]] = None) -> None: def start(
self,
interface: str,
start_ip: str,
mask: str,
dns_servers: Optional[List[str]] = None,
reservations: Optional[List[Dict[str, str]]] = None,
extended_options: Optional[Dict[str, Dict[str, str]]] = None,
) -> None:
if self.is_running(): if self.is_running():
raise RuntimeError("Server is already running.") raise RuntimeError("Server is already running.")
self.interface = interface self.interface = interface
self.ip_start = start_ip self.ip_start = start_ip
self.subnet_mask = mask self.subnet_mask = mask
self._network = ipaddress.IPv4Network((start_ip, mask), strict=False)
self.default_lease_seconds = LEASE_SECONDS
# build pool based on interface network # build pool based on interface network
self.server_ip = start_ip # in this lab version we use the provided IP as "server identifier" self.server_ip = start_ip # in this lab version we use the provided IP as "server identifier"
self._pool = self._build_pool(start_ip, mask) self._pool = self._build_pool(self._network, start_ip)
# configure DNS (filter invalid entries); default to [server_ip, 8.8.8.8] # configure DNS (filter invalid entries); default to [server_ip, 8.8.8.8]
self.dns_servers = [] self.dns_servers = []
@@ -119,6 +137,25 @@ class DHCPServer:
if not self.dns_servers and self.server_ip: if not self.dns_servers and self.server_ip:
self.dns_servers = [self.server_ip, "8.8.8.8"] self.dns_servers = [self.server_ip, "8.8.8.8"]
# static MAC reservations
self.reservations = self._prepare_reservations(reservations or [])
# extended DHCP options (numeric keyed)
self.extended_options = {}
if extended_options:
for code, data in extended_options.items():
val = data.get("value") if isinstance(data, dict) else data
if val:
self.extended_options[str(code)] = str(val)
# If option 51 is provided, use it as default lease
opt51 = self.extended_options.get("51")
if opt51:
try:
lease_int = int(opt51)
if lease_int > 0:
self.default_lease_seconds = lease_int
except Exception:
self._log_line("Extended option 51 ignored (invalid integer).")
self._stop_evt.clear() self._stop_evt.clear()
self._thread = threading.Thread(target=self._run_loop, name="DHCPWorker", daemon=True) self._thread = threading.Thread(target=self._run_loop, name="DHCPWorker", daemon=True)
self._thread.start() self._thread.start()
@@ -150,6 +187,10 @@ class DHCPServer:
with self._lock: with self._lock:
return {mac: ip for mac, (ip, _) in self._leases.items()} return {mac: ip for mac, (ip, _) in self._leases.items()}
def get_leases_with_expiry(self) -> Dict[str, Tuple[str, float]]:
with self._lock:
return {mac: (ip, expiry) for mac, (ip, expiry) in self._leases.items()}
def get_status(self) -> str: def get_status(self) -> str:
return self._status return self._status
@@ -160,6 +201,38 @@ class DHCPServer:
items.append(self._log.popleft()) items.append(self._log.popleft())
return items return items
def force_renew_all(self) -> int:
"""
Send DHCPFORCERENEW across the subnet (all hosts).
Uses known MAC from leases when available; otherwise sends with empty CHADDR.
Returns count of attempted sends.
"""
if not self.is_running() or not self._sock:
raise RuntimeError("Server not running.")
if not self._network:
raise RuntimeError("Subnet unbekannt.")
with self._lock:
lease_items = list(self._leases.items()) # (mac, (ip, expiry))
lease_by_ip = {ip: mac for mac, (ip, _) in lease_items}
sent = 0
for host in self._network.hosts():
ip = str(host)
if ip == self.server_ip:
continue
mac = lease_by_ip.get(ip)
pkt = self._build_forcerenew_packet(mac, ip)
if not pkt:
continue
try:
self._sock.sendto(pkt, (ip, 68))
sent += 1
except Exception as e:
self._log_line(f"FORCERENEW to {mac}/{ip} failed: {e}")
self._log_line(f"FORCERENEW sent to {sent} client(s).")
return sent
# -------------------- Internals -------------------- # -------------------- Internals --------------------
def _run_loop(self) -> None: def _run_loop(self) -> None:
try: try:
@@ -238,10 +311,9 @@ class DHCPServer:
if mac in self._active_clients: if mac in self._active_clients:
self._active_clients.remove(mac) self._active_clients.remove(mac)
def _build_pool(self, start_ip: str, mask: str) -> List[str]: def _build_pool(self, network: ipaddress.IPv4Network, start_ip: str) -> List[str]:
# Build pool from the network containing start_ip with provided mask # Build pool from the network containing start_ip with provided mask
net = ipaddress.IPv4Network((start_ip, mask), strict=False) all_hosts = list(network.hosts())
all_hosts = list(net.hosts())
# Begin at/after start_ip; exclude start_ip (we use it as server/gateway) # Begin at/after start_ip; exclude start_ip (we use it as server/gateway)
try: try:
start_idx = all_hosts.index(ipaddress.IPv4Address(start_ip)) start_idx = all_hosts.index(ipaddress.IPv4Address(start_ip))
@@ -256,6 +328,61 @@ class DHCPServer:
pool.remove(start_ip) pool.remove(start_ip)
return pool return pool
def _prepare_reservations(self, reservations: List[Dict[str, str]]) -> Dict[str, Dict[str, Optional[int]]]:
prepared: Dict[str, Dict[str, Optional[int]]] = {}
for res in reservations:
mac_raw = res.get("mac", "")
ip = res.get("ip", "")
lease = res.get("lease_time")
try:
lease_int = int(lease) if lease not in (None, "") else None
if lease_int is not None and lease_int <= 0:
lease_int = None
except Exception:
lease_int = None
try:
mac_norm = normalize_mac(mac_raw)
except Exception:
self._log_line(f"Ignoring reservation '{mac_raw}': invalid MAC")
continue
try:
ip_norm = str(ipaddress.IPv4Address(ip))
except Exception:
self._log_line(f"Ignoring reservation {mac_norm}: invalid IP '{ip}'")
continue
if ip_norm == self.server_ip:
self._log_line(f"Ignoring reservation {mac_norm}: IP {ip_norm} is server IP")
continue
if self._network and ipaddress.IPv4Address(ip_norm) not in self._network:
self._log_line(f"Ignoring reservation {mac_norm}: IP {ip_norm} not in subnet")
continue
prepared[mac_norm] = {"ip": ip_norm, "lease": lease_int}
if ip_norm in self._pool:
self._pool.remove(ip_norm)
return prepared
def _reserved_ip_for(self, mac: str) -> Optional[str]:
entry = self.reservations.get(mac.upper())
return entry["ip"] if entry else None
def _reserved_lease_for(self, mac: str) -> Optional[int]:
entry = self.reservations.get(mac.upper())
return entry.get("lease") if entry else None
def _is_ip_available(self, ip: Optional[str], mac: str) -> bool:
if not ip:
return False
# Reserved for different MAC?
for res_mac, res in self.reservations.items():
res_ip = res.get("ip")
if res_ip == ip and res_mac != mac:
return False
with self._lock:
for lease_mac, (lease_ip, _) in self._leases.items():
if lease_ip == ip and lease_mac != mac:
return False
return True
def _next_free_ip(self) -> Optional[str]: def _next_free_ip(self) -> Optional[str]:
with self._lock: with self._lock:
used = {ip for ip, _ in self._leases.values()} used = {ip for ip, _ in self._leases.values()}
@@ -309,37 +436,62 @@ class DHCPServer:
pass pass
def _dhcp_offer(self, mac: str, xid: int, giaddr: bytes) -> None: def _dhcp_offer(self, mac: str, xid: int, giaddr: bytes) -> None:
ip = self._next_free_ip() reserved_ip = self._reserved_ip_for(mac)
lease_seconds = self._reserved_lease_for(mac) or self.default_lease_seconds
if reserved_ip and self._is_ip_available(reserved_ip, mac):
ip = reserved_ip
else:
ip = self._next_free_ip()
if not ip: if not ip:
self._log_line("No free IP to offer.") self._log_line("No free IP to offer.")
return return
self._log_line(f"OFFER {ip} to {mac}") if reserved_ip:
self._log_line(f"OFFER (reserved) {ip} to {mac}")
else:
self._log_line(f"OFFER {ip} to {mac}")
pkt = self._build_reply_packet( pkt = self._build_reply_packet(
msg_type=DHCPOFFER, mac=mac, xid=xid, yiaddr=ip, giaddr=giaddr msg_type=DHCPOFFER, mac=mac, xid=xid, yiaddr=ip, giaddr=giaddr, lease_seconds=lease_seconds
) )
self._send_broadcast(pkt) self._send_broadcast(pkt)
def _dhcp_ack(self, mac: str, xid: int, giaddr: bytes, requested_ip: Optional[str]) -> None: def _dhcp_ack(self, mac: str, xid: int, giaddr: bytes, requested_ip: Optional[str]) -> None:
# If requested IP is available or already leased to this MAC, use it; else choose next free # If requested IP is available or already leased to this MAC, use it; else choose next free
reserved_ip = self._reserved_ip_for(mac)
lease_seconds = self._reserved_lease_for(mac) or self.default_lease_seconds
if reserved_ip and not self._is_ip_available(reserved_ip, mac):
reserved_ip = None
with self._lock: with self._lock:
current = self._leases.get(mac, (None, 0))[0] current = self._leases.get(mac, (None, 0))[0]
ip = requested_ip or current or self._next_free_ip() ip: Optional[str] = None
if reserved_ip:
ip = reserved_ip
elif requested_ip and self._is_ip_available(requested_ip, mac):
ip = requested_ip
elif current and self._is_ip_available(current, mac):
ip = current
else:
ip = self._next_free_ip()
if not ip: if not ip:
self._log_line("No IP available for ACK.") self._log_line("No IP available for ACK.")
return return
# Register/renew lease # Register/renew lease
expiry = time.time() + LEASE_SECONDS expiry = time.time() + lease_seconds
with self._lock: with self._lock:
self._leases[mac] = (ip, expiry) self._leases[mac] = (ip, expiry)
if mac not in self._active_clients: if mac not in self._active_clients:
self._active_clients.append(mac) self._active_clients.append(mac)
self._log_line(f"ACK {ip} to {mac}") if reserved_ip:
self._log_line(f"ACK (reserved) {ip} to {mac}")
else:
self._log_line(f"ACK {ip} to {mac}")
pkt = self._build_reply_packet( pkt = self._build_reply_packet(
msg_type=DHCPACK, mac=mac, xid=xid, yiaddr=ip, giaddr=giaddr msg_type=DHCPACK, mac=mac, xid=xid, yiaddr=ip, giaddr=giaddr, lease_seconds=lease_seconds
) )
self._send_broadcast(pkt) self._send_broadcast(pkt)
def _build_reply_packet(self, msg_type: int, mac: str, xid: int, yiaddr: str, giaddr: bytes) -> bytes: def _build_reply_packet(self, msg_type: int, mac: str, xid: int, yiaddr: str, giaddr: bytes, lease_seconds: int) -> bytes:
# BOOTP header # BOOTP header
op = 2 # reply op = 2 # reply
htype = 1 htype = 1
@@ -374,8 +526,52 @@ class DHCPServer:
dns_bytes = b"".join(ip2bytes(ip) for ip in self.dns_servers) dns_bytes = b"".join(ip2bytes(ip) for ip in self.dns_servers)
options.append((6, dns_bytes)) options.append((6, dns_bytes))
options.append((51, struct.pack("!I", LEASE_SECONDS))) # lease time # Numeric extended options
for code_str, val in self.extended_options.items():
try:
code = int(code_str)
except Exception:
self._log_line(f"Ignoring extended option with non-numeric code '{code_str}'")
continue
opt_bytes = self._encode_extended_option(code, val)
if opt_bytes is None:
continue
options.append((code, opt_bytes))
options.append((51, struct.pack("!I", lease_seconds))) # lease time
return hdr + build_options(options)
def _build_forcerenew_packet(self, mac: Optional[str], client_ip: str) -> Optional[bytes]:
"""Minimal FORCERENEW per RFC 3203 to trigger clients to renew."""
try:
ciaddr = ip2bytes(client_ip)
except Exception:
return None
op = 2
htype = 1
hlen = 6 if mac else 0
hops = 0
xid = 0
secs = 0
flags = 0
yiaddr_b = b"\x00\x00\x00\x00"
siaddr = ip2bytes(self.server_ip or "0.0.0.0")
giaddr_b = b"\x00\x00\x00\x00"
chaddr = (
bytes.fromhex(mac.replace(":", "")) + b"\x00" * 10
if mac
else b"\x00" * 16
)
sname = b"\x00" * 64
filef = b"\x00" * 128
hdr = struct.pack("!BBBBIHH4s4s4s4s16s64s128s",
op, htype, hlen, hops, xid, secs, flags,
ciaddr, yiaddr_b, siaddr, giaddr_b, chaddr, sname, filef)
options = [
(53, bytes([DHCPFORCERENEW])),
(54, siaddr),
]
return hdr + build_options(options) return hdr + build_options(options)
def _send_broadcast(self, pkt: bytes) -> None: def _send_broadcast(self, pkt: bytes) -> None:
@@ -393,3 +589,34 @@ class DHCPServer:
def _set_status(self, st: str) -> None: def _set_status(self, st: str) -> None:
self._status = st self._status = st
def _encode_extended_option(self, code: int, value: str) -> Optional[bytes]:
if not value:
return None
if code == 42:
# NTP servers as comma-separated IP list
parts = [p.strip() for p in value.split(",") if p.strip()]
ips = []
for p in parts:
try:
ips.append(ip2bytes(p))
except Exception:
continue
if not ips:
self._log_line("Extended option 42 skipped (no valid IPs).")
return None
return b"".join(ips)
if code == 51:
try:
lease_int = int(value)
if lease_int <= 0:
return None
return struct.pack("!I", lease_int)
except Exception:
self._log_line("Extended option 51 ignored (invalid integer).")
return None
try:
return value.encode("ascii")
except Exception:
self._log_line(f"Extended option {code} not sent (invalid encoding).")
return None

View File

@@ -1,10 +1,14 @@
import ipaddress
import json
import os import os
import socket import socket
import time
import tkinter as tk import tkinter as tk
from tkinter import messagebox, ttk from tkinter import filedialog, messagebox, ttk
from typing import Optional, List from typing import Dict, List, Optional, Tuple
from dhcp_server import DHCPServer from dhcp_server import DHCPServer
from option_specs import EXTENDED_OPTION_SPECS
import utils import utils
@@ -59,64 +63,84 @@ class DHCPApp(tk.Frame):
# Single server instance # Single server instance
self.server: Optional[DHCPServer] = None self.server: Optional[DHCPServer] = None
self.reservations: List[Dict[str, str]] = []
self.last_config_path: Optional[str] = None
self.ext_option_specs = list(EXTENDED_OPTION_SPECS)
self.ext_opt_values: Dict[str, tk.StringVar] = {}
self.ext_opt_enabled: Dict[str, tk.BooleanVar] = {}
# --- Top form --- # --- Toolbar ---
form = ttk.Frame(self) toolbar = ttk.Frame(self)
form.pack(fill="x", pady=(0, 8)) toolbar.pack(fill="x", pady=(0, 6))
self.btn_start = ttk.Button(toolbar, text="Start", command=self.start_server)
self.btn_stop = ttk.Button(toolbar, text="Stop", command=self.stop_server, state="disabled")
self.btn_rescan = ttk.Button(toolbar, text="Interfaces neu scannen", command=self.rescan_interfaces)
self.btn_load = ttk.Button(toolbar, text="Config laden", command=self.load_config)
self.btn_save = ttk.Button(toolbar, text="Config speichern", command=self.save_config)
self.btn_forcerenew = ttk.Button(toolbar, text="Force Renew", command=self.force_renew_all)
self.btn_start.pack(side="left", padx=(0, 6))
self.btn_stop.pack(side="left", padx=(0, 12))
self.btn_rescan.pack(side="left", padx=(0, 12))
self.btn_load.pack(side="left", padx=(0, 6))
self.btn_save.pack(side="left")
self.btn_forcerenew.pack(side="left", padx=(12, 0))
# Zeile 0 # --- Interface left, IP settings right ---
ttk.Label(form, text="Interface:").grid(row=0, column=0, sticky="w") top = ttk.Frame(self)
top.pack(fill="x", pady=(0, 10), anchor="w")
# Left: Interface box with dropdown + info
iface_box = ttk.Labelframe(top, text="Interface")
iface_box.pack(side="left", padx=(0, 8), fill="y")
iface_box.update_idletasks()
iface_box.configure(width=400)
iface_box.pack_propagate(False)
iface_box.columnconfigure(1, weight=1)
ttk.Label(iface_box, text="Interface:").grid(row=0, column=0, sticky="w", padx=(8, 4), pady=(8, 6))
self.if_var = tk.StringVar() self.if_var = tk.StringVar()
self.if_menu = ttk.OptionMenu(form, self.if_var, None, command=self.on_iface_change) self.if_combo = ttk.Combobox(iface_box, textvariable=self.if_var, state="readonly", width=18)
self.if_menu.grid(row=0, column=1, sticky="ew", padx=(6, 16)) self.if_combo.grid(row=0, column=1, sticky="ew", padx=(4, 8), pady=(8, 6))
form.columnconfigure(1, weight=1) self.if_combo.bind("<<ComboboxSelected>>", self.on_iface_change)
ttk.Label(form, text="Server/Start-IP:").grid(row=0, column=2, sticky="w") self.mac_var = tk.StringVar(value="-")
self.speed_var = tk.StringVar(value="-")
ttk.Label(iface_box, text="MAC:").grid(row=1, column=0, sticky="w", padx=(8, 4), pady=(2, 8))
ttk.Label(iface_box, textvariable=self.mac_var).grid(row=1, column=1, sticky="w", padx=(4, 8), pady=(2, 8))
ttk.Label(iface_box, text="Link:").grid(row=2, column=0, sticky="w", padx=(8, 4), pady=(0, 10))
ttk.Label(iface_box, textvariable=self.speed_var).grid(row=2, column=1, sticky="w", padx=(4, 8), pady=(0, 10))
# Right: IP settings box
ip_box = ttk.Labelframe(top, text="IP-Settings")
ip_box.pack(side="left", fill="y")
ip_box.update_idletasks()
ip_box.configure(width=420)
ip_box.pack_propagate(False)
ip_box.columnconfigure(1, weight=1)
ip_box.columnconfigure(3, weight=1)
ttk.Label(ip_box, text="Server/Start-IP:").grid(row=0, column=0, sticky="w", padx=(8, 4), pady=(8, 4))
self.ip_var = tk.StringVar(value="") self.ip_var = tk.StringVar(value="")
self.ip_entry = ttk.Entry(form, textvariable=self.ip_var, width=16) self.ip_entry = ttk.Entry(ip_box, textvariable=self.ip_var, width=16)
self.ip_entry.grid(row=0, column=3, sticky="w", padx=(6, 16)) self.ip_entry.grid(row=0, column=1, sticky="w", padx=(4, 12), pady=(8, 4))
ttk.Label(form, text="Subnetzmaske:").grid(row=0, column=4, sticky="w") ttk.Label(ip_box, text="Subnetzmaske:").grid(row=0, column=2, sticky="w", padx=(8, 4), pady=(8, 4))
self.mask_var = tk.StringVar(value="") self.mask_var = tk.StringVar(value="")
self.mask_entry = ttk.Entry(form, textvariable=self.mask_var, width=16) self.mask_entry = ttk.Entry(ip_box, textvariable=self.mask_var, width=16)
self.mask_entry.grid(row=0, column=5, sticky="w", padx=(6, 16)) self.mask_entry.grid(row=0, column=3, sticky="w", padx=(4, 12), pady=(8, 4))
# Zeile 1 DNS Felder ttk.Label(ip_box, text="Primary DNS:").grid(row=1, column=0, sticky="w", padx=(8, 4), pady=(4, 10))
ttk.Label(form, text="Primary DNS:").grid(row=1, column=0, sticky="w", pady=(6, 0))
self.dns1_var = tk.StringVar(value="") self.dns1_var = tk.StringVar(value="")
self.dns1_entry = ttk.Entry(form, textvariable=self.dns1_var, width=16) self.dns1_entry = ttk.Entry(ip_box, textvariable=self.dns1_var, width=16)
self.dns1_entry.grid(row=1, column=1, sticky="w", padx=(6, 16), pady=(6, 0)) self.dns1_entry.grid(row=1, column=1, sticky="w", padx=(4, 12), pady=(4, 10))
ttk.Label(form, text="Secondary DNS:").grid(row=1, column=2, sticky="w", pady=(6, 0)) ttk.Label(ip_box, text="Secondary DNS:").grid(row=1, column=2, sticky="w", padx=(8, 4), pady=(4, 10))
self.dns2_var = tk.StringVar(value="") self.dns2_var = tk.StringVar(value="")
self.dns2_entry = ttk.Entry(form, textvariable=self.dns2_var, width=16) self.dns2_entry = ttk.Entry(ip_box, textvariable=self.dns2_var, width=16)
self.dns2_entry.grid(row=1, column=3, sticky="w", padx=(6, 16), pady=(6, 0)) self.dns2_entry.grid(row=1, column=3, sticky="w", padx=(4, 12), pady=(4, 10))
# --- Buttons --- # --- Notebook with tabs ---
btns = ttk.Frame(self) self._build_tabs()
btns.pack(fill="x", pady=(8, 8))
self.btn_start = ttk.Button(btns, text="Start", command=self.start_server)
self.btn_stop = ttk.Button(btns, text="Stop", command=self.stop_server, state="disabled")
self.btn_rescan = ttk.Button(btns, text="Interfaces neu scannen", command=self.rescan_interfaces)
self.btn_start.pack(side="left")
self.btn_stop.pack(side="left", padx=(8, 0))
self.btn_rescan.pack(side="left", padx=(8, 0))
# --- Clients & Logs Paned Layout ---
self.paned = ttk.Panedwindow(self, orient="horizontal")
self.paned.pack(fill="both", expand=True)
# Clients list
left = ttk.Labelframe(self.paned, text="Aktive Clients / Leases")
self.clients = tk.Listbox(left, height=12)
self.clients.pack(fill="both", expand=True, padx=6, pady=6)
self.paned.add(left, weight=1)
# Logs
right = ttk.Labelframe(self.paned, text="Log")
self.log = tk.Text(right, height=12, state="disabled")
self.log.pack(fill="both", expand=True, padx=6, pady=6)
self.paned.add(right, weight=2)
# --- Status bar --- # --- Status bar ---
self.status_var = tk.StringVar(value="Bereit.") self.status_var = tk.StringVar(value="Bereit.")
@@ -125,12 +149,137 @@ class DHCPApp(tk.Frame):
self._refresh_interface_list(initial=True) self._refresh_interface_list(initial=True)
self._apply_min_sizes() self._apply_min_sizes()
self.paned.bind("<Configure>", lambda _e: self._enforce_pane_sizes())
self.after_idle(self._enforce_pane_sizes) # keep disabled option display in sync with GUI IP/DNS
for v in (self.ip_var, self.mask_var, self.dns1_var, self.dns2_var):
v.trace_add("write", lambda *_: self._refresh_disabled_ext_options())
self._refresh_disabled_ext_options()
# periodic refresh # periodic refresh
self.after(400, self._refresh) self.after(400, self._refresh)
# -------------------- UI builders --------------------
def _build_tabs(self) -> None:
self.notebook = ttk.Notebook(self)
self.notebook.pack(fill="both", expand=True)
self._build_lease_tab()
self._build_log_tab()
self._build_reservations_tab()
self._build_extended_options_tab()
def _build_lease_tab(self) -> None:
tab = ttk.Frame(self.notebook)
columns = ("mac", "ip", "remaining", "action")
self.leases_tree = ttk.Treeview(tab, columns=columns, show="headings", height=10)
self.leases_tree.heading("mac", text="MAC-Adresse")
self.leases_tree.heading("ip", text="IP-Adresse")
self.leases_tree.heading("remaining", text="Restlaufzeit")
self.leases_tree.heading("action", text="Reservierung")
self.leases_tree.column("mac", width=160, anchor="w")
self.leases_tree.column("ip", width=140, anchor="w")
self.leases_tree.column("remaining", width=120, anchor="w")
self.leases_tree.column("action", width=120, anchor="w")
scroll = ttk.Scrollbar(tab, orient="vertical", command=self.leases_tree.yview)
self.leases_tree.configure(yscrollcommand=scroll.set)
self.leases_tree.pack(side="left", fill="both", expand=True, padx=6, pady=6)
scroll.pack(side="right", fill="y")
self.leases_tree.bind("<ButtonRelease-1>", self._on_lease_click)
self.notebook.add(tab, text="Leases")
def _build_log_tab(self) -> None:
tab = ttk.Frame(self.notebook)
self.log = tk.Text(tab, height=12, state="disabled")
scroll = ttk.Scrollbar(tab, orient="vertical", command=self.log.yview)
self.log.configure(yscrollcommand=scroll.set)
self.log.pack(side="left", fill="both", expand=True, padx=6, pady=6)
scroll.pack(side="right", fill="y")
self.notebook.add(tab, text="Log")
def _build_reservations_tab(self) -> None:
tab = ttk.Frame(self.notebook)
tree_frame = ttk.Frame(tab)
tree_frame.pack(fill="both", expand=True, padx=6, pady=6)
columns = ("mac", "ip", "lease", "name")
self.res_tree = ttk.Treeview(tree_frame, columns=columns, show="headings", selectmode="extended", height=8)
self.res_tree.heading("mac", text="MAC-Adresse")
self.res_tree.heading("ip", text="IP-Adresse")
self.res_tree.heading("lease", text="Lease (s)")
self.res_tree.heading("name", text="Name/Notiz")
self.res_tree.column("mac", width=150, anchor="w")
self.res_tree.column("ip", width=140, anchor="w")
self.res_tree.column("lease", width=90, anchor="w")
self.res_tree.column("name", width=200, anchor="w")
scroll = ttk.Scrollbar(tree_frame, orient="vertical", command=self.res_tree.yview)
self.res_tree.configure(yscrollcommand=scroll.set)
self.res_tree.pack(side="left", fill="both", expand=True)
scroll.pack(side="right", fill="y")
self.res_tree.bind("<<TreeviewSelect>>", self._on_reservation_select)
form = ttk.Labelframe(tab, text="Eintrag bearbeiten")
form.pack(fill="x", padx=6, pady=(0, 8))
ttk.Label(form, text="MAC:").grid(row=0, column=0, sticky="w", padx=(6, 2), pady=4)
self.res_mac_var = tk.StringVar()
ttk.Entry(form, textvariable=self.res_mac_var, width=22).grid(row=0, column=1, sticky="w", pady=4)
ttk.Label(form, text="IP:").grid(row=0, column=2, sticky="w", padx=(12, 2), pady=4)
self.res_ip_var = tk.StringVar()
ttk.Entry(form, textvariable=self.res_ip_var, width=18).grid(row=0, column=3, sticky="w", pady=4)
ttk.Label(form, text="Lease (s):").grid(row=0, column=4, sticky="w", padx=(12, 2), pady=4)
self.res_lease_var = tk.StringVar()
ttk.Entry(form, textvariable=self.res_lease_var, width=10).grid(row=0, column=5, sticky="w", pady=4)
ttk.Label(form, text="Name/Notiz:").grid(row=0, column=6, sticky="w", padx=(12, 2), pady=4)
self.res_name_var = tk.StringVar()
ttk.Entry(form, textvariable=self.res_name_var, width=22).grid(row=0, column=7, sticky="w", pady=4)
actions = ttk.Frame(form)
actions.grid(row=0, column=8, sticky="e", padx=(12, 6))
ttk.Button(actions, text="Hinzufügen/Aktualisieren", command=self.add_or_update_reservation).pack(side="left", padx=(0, 6))
ttk.Button(actions, text="Löschen", command=self.remove_reservation).pack(side="left")
self.notebook.add(tab, text="Reservierungen")
def _build_extended_options_tab(self) -> None:
tab = ttk.Frame(self.notebook)
form = ttk.Frame(tab)
form.pack(fill="x", padx=8, pady=(8, 8))
ttk.Label(form, text="Aktiv").grid(row=0, column=0, sticky="w", padx=(0, 12))
ttk.Label(form, text="Option").grid(row=0, column=1, sticky="w", padx=(0, 12))
ttk.Label(form, text="Wert").grid(row=0, column=2, sticky="w", padx=(0, 12))
ttk.Label(form, text="Beschreibung").grid(row=0, column=3, sticky="w", padx=(0, 12))
form.columnconfigure(3, weight=1)
row = 1
for spec in self.ext_option_specs:
code = spec["code"]
default_val = spec.get("default", "")
mandatory = bool(spec.get("mandatory"))
self.ext_opt_values[code] = tk.StringVar(value=default_val)
self.ext_opt_enabled[code] = tk.BooleanVar(value=mandatory)
disabled = bool(spec.get("disabled"))
desc_txt = spec.get("desc", "")
if disabled:
desc_txt = f"{desc_txt} (aus GUI/Server, gesperrt)"
chk = ttk.Checkbutton(form, variable=self.ext_opt_enabled[code])
if disabled:
chk.state(["disabled"])
chk.grid(row=row, column=0, sticky="w", pady=4)
ttk.Label(form, text=code).grid(row=row, column=1, sticky="w", pady=4)
entry = ttk.Entry(form, textvariable=self.ext_opt_values[code], width=32, state="disabled" if disabled else "normal")
entry.grid(row=row, column=2, sticky="w", padx=(0, 12), pady=4)
ttk.Label(form, text=desc_txt).grid(row=row, column=3, sticky="w", padx=(0, 12), pady=4)
row += 1
ttk.Label(tab, text="Weitere Optionen kannst du in option_specs.py ergänzen.").pack(anchor="w", padx=8, pady=(4, 8))
self.notebook.add(tab, text="Extended Options")
# -------------------- UI logic -------------------- # -------------------- UI logic --------------------
def on_iface_change(self, _sel=None): def on_iface_change(self, _sel=None):
iface = self.if_var.get().strip() iface = self.if_var.get().strip()
@@ -147,10 +296,11 @@ class DHCPApp(tk.Frame):
if mask: if mask:
self.mask_var.set(mask) self.mask_var.set(mask)
# Secondary DNS bleibt leer, damit du frei wählen kannst # Secondary DNS bleibt leer, damit du frei wählen kannst
self._update_iface_info(iface)
def _set_controls_enabled(self, enabled: bool): def _set_controls_enabled(self, enabled: bool):
state = "normal" if enabled else "disabled" state = "readonly" if enabled else "disabled"
self.if_menu.configure(state=state) self.if_combo.configure(state=state)
self.ip_entry.configure(state=state) self.ip_entry.configure(state=state)
self.mask_entry.configure(state=state) self.mask_entry.configure(state=state)
self.dns1_entry.configure(state=state) self.dns1_entry.configure(state=state)
@@ -158,24 +308,25 @@ class DHCPApp(tk.Frame):
self.btn_rescan.configure(state=state) self.btn_rescan.configure(state=state)
self.btn_start.configure(state="normal" if enabled else "disabled") self.btn_start.configure(state="normal" if enabled else "disabled")
self.btn_stop.configure(state="disabled" if enabled else "normal") self.btn_stop.configure(state="disabled" if enabled else "normal")
self.btn_forcerenew.configure(state="disabled" if enabled else "normal")
def _clear_iface_fields(self) -> None: def _clear_iface_fields(self) -> None:
self.ip_var.set("") self.ip_var.set("")
self.mask_var.set("") self.mask_var.set("")
self.dns1_var.set("") self.dns1_var.set("")
self.dns2_var.set("") self.dns2_var.set("")
self.mac_var.set("-")
self.speed_var.set("-")
self._refresh_disabled_ext_options()
def _select_iface(self, iface: str) -> None: def _select_iface(self, iface: str) -> None:
self.if_var.set(iface) self.if_var.set(iface)
self.on_iface_change(iface) self.on_iface_change()
def _refresh_interface_list(self, initial: bool = False) -> None: def _refresh_interface_list(self, initial: bool = False) -> None:
interfaces = utils.get_network_interfaces() interfaces = utils.get_network_interfaces()
current = self.if_var.get().strip() current = self.if_var.get().strip()
menu = self.if_menu["menu"] self.if_combo["values"] = interfaces
menu.delete(0, "end")
for iface in interfaces:
menu.add_command(label=iface, command=lambda val=iface: self._select_iface(val))
if current in interfaces: if current in interfaces:
selection = current selection = current
@@ -193,30 +344,24 @@ class DHCPApp(tk.Frame):
self._clear_iface_fields() self._clear_iface_fields()
if not initial: if not initial:
self.status_var.set("Keine Netzwerk-Interfaces gefunden.") self.status_var.set("Keine Netzwerk-Interfaces gefunden.")
if selection:
self._update_iface_info(selection)
self._refresh_disabled_ext_options()
def _apply_min_sizes(self) -> None: def _apply_min_sizes(self) -> None:
# Ensure window cannot shrink below a comfortable layout # Ensure window cannot shrink below a comfortable layout
self.update_idletasks() self.update_idletasks()
min_w = max(self.winfo_reqwidth() + 16, 720) min_w = max(self.winfo_reqwidth() + 16, 840)
min_h = max(self.winfo_reqheight() + 16, 540) min_h = max(self.winfo_reqheight() + 16, 580)
self.master.minsize(min_w, min_h) self.master.minsize(min_w, min_h)
def _enforce_pane_sizes(self) -> None: def _update_iface_info(self, iface: str) -> None:
# prevent panes from collapsing too far mac, speed = utils.get_iface_hwinfo(iface)
try: self.mac_var.set(mac or "-")
if not self.paned or not self.paned.panes(): if speed is None or speed <= 0:
return self.speed_var.set("-")
total = self.paned.winfo_width() else:
left_min, right_min = 240, 360 self.speed_var.set(f"{speed} Mbit/s")
if total <= left_min + right_min:
return
pos = self.paned.sashpos(0)
if pos < left_min:
self.paned.sashpos(0, left_min)
elif pos > total - right_min:
self.paned.sashpos(0, total - right_min)
except Exception:
pass
def rescan_interfaces(self) -> None: def rescan_interfaces(self) -> None:
if self.server and self.server.is_running(): if self.server and self.server.is_running():
@@ -267,13 +412,28 @@ class DHCPApp(tk.Frame):
messagebox.showerror("Start fehlgeschlagen (Bind)", f"Konnte Port 67 auf {iface} nicht binden:\n{e}") messagebox.showerror("Start fehlgeschlagen (Bind)", f"Konnte Port 67 auf {iface} nicht binden:\n{e}")
return return
reservations = self._get_reservation_dict()
extended_opts = self._collect_extended_options()
try:
self._ensure_mandatory_ext_options(fail_on_missing=True)
except Exception as e:
messagebox.showerror("Pflicht-Option fehlt", str(e))
return
# --- Start --- # --- Start ---
try: try:
self.server = DHCPServer() self.server = DHCPServer()
# NEU: DNS-Liste an den Server übergeben self.server.start(
self.server.start(iface, ip, mask, dns_servers=dns_list) iface,
ip,
mask,
dns_servers=dns_list,
reservations=reservations,
extended_options=extended_opts,
)
self.status_var.set("Server gestartet.") self.status_var.set("Server gestartet.")
self._set_controls_enabled(False) self._set_controls_enabled(False)
self.btn_forcerenew.configure(state="normal")
except Exception as e: except Exception as e:
self.server = None self.server = None
self.status_var.set("Start fehlgeschlagen.") self.status_var.set("Start fehlgeschlagen.")
@@ -283,8 +443,208 @@ class DHCPApp(tk.Frame):
if self.server: if self.server:
self.server.stop() self.server.stop()
self._set_controls_enabled(True) self._set_controls_enabled(True)
self.btn_forcerenew.configure(state="disabled")
self.status_var.set("Server gestoppt.") self.status_var.set("Server gestoppt.")
# -------------------- Reservations --------------------
def _refresh_reservations_view(self) -> None:
for item in self.res_tree.get_children():
self.res_tree.delete(item)
for entry in self.reservations:
lease_raw = entry.get("lease_time", "")
lease = lease_raw if lease_raw is not None else ""
self.res_tree.insert("", "end", values=(entry.get("mac", ""), entry.get("ip", ""), lease, entry.get("name", "")))
def add_or_update_reservation(self) -> None:
mac = self.res_mac_var.get().strip()
ip = self.res_ip_var.get().strip()
lease = self.res_lease_var.get().strip()
name = self.res_name_var.get().strip()
if not mac or not ip:
messagebox.showerror("Fehler", "MAC und IP sind erforderlich.")
return
try:
mac_norm = utils.normalize_mac(mac)
ip_norm = utils.format_ip_address(ip)
lease_val = None
if lease:
lease_int = int(lease)
if lease_int <= 0:
raise ValueError("Lease muss > 0 sein.")
lease_val = lease_int
except Exception as e:
messagebox.showerror("Ungültige Eingabe", str(e))
return
existing = next((r for r in self.reservations if r.get("mac") == mac_norm), None)
if existing:
existing.update({"ip": ip_norm, "name": name, "lease_time": lease_val})
else:
self.reservations.append({"mac": mac_norm, "ip": ip_norm, "name": name, "lease_time": lease_val})
self._refresh_reservations_view()
def remove_reservation(self) -> None:
selected = self.res_tree.selection()
if not selected:
return
macs = {self.res_tree.item(item, "values")[0] for item in selected}
self.reservations = [r for r in self.reservations if r.get("mac") not in macs]
self._refresh_reservations_view()
def _on_reservation_select(self, _event=None) -> None:
selected = self.res_tree.selection()
if not selected:
return
vals = self.res_tree.item(selected[0], "values")
if vals:
self.res_mac_var.set(vals[0])
self.res_ip_var.set(vals[1])
self.res_lease_var.set(vals[2])
self.res_name_var.set(vals[3] if len(vals) > 3 else "")
def _get_reservation_dict(self) -> List[Dict[str, str]]:
return [
{
"mac": entry["mac"],
"ip": entry["ip"],
"name": entry.get("name", ""),
"lease_time": entry.get("lease_time"),
}
for entry in self.reservations
if entry.get("mac") and entry.get("ip")
]
def _has_reservation(self, mac: str) -> bool:
return any(r.get("mac") == mac for r in self.reservations)
# -------------------- Extended Options --------------------
def _collect_extended_options(self) -> Dict[str, str]:
opts: Dict[str, Dict[str, str]] = {}
for spec in self.ext_option_specs:
code = spec["code"]
if spec.get("disabled"):
continue
val = (self.ext_opt_values.get(code) or tk.StringVar()).get().strip()
enabled = bool((self.ext_opt_enabled.get(code) or tk.BooleanVar()).get())
if spec.get("mandatory") and not val:
continue
if (enabled or spec.get("mandatory")) and val:
opts[code] = {"value": val, "enabled": True}
return opts
# -------------------- Config load/save --------------------
def _collect_config(self) -> Dict:
dns_entries = [d for d in (self.dns1_var.get().strip(), self.dns2_var.get().strip()) if d]
return {
"interface": self.if_var.get().strip(),
"server_ip": self.ip_var.get().strip(),
"subnet_mask": self.mask_var.get().strip(),
"dns": dns_entries,
"reservations": self.reservations,
"extended_options": {
spec["code"]: {
"value": (self.ext_opt_values[spec["code"]].get().strip()),
"enabled": bool(self.ext_opt_enabled[spec["code"]].get()),
}
for spec in self.ext_option_specs
if not spec.get("disabled")
},
}
def _apply_config(self, cfg: Dict) -> None:
self.if_var.set(cfg.get("interface", ""))
self.ip_var.set(cfg.get("server_ip", ""))
self.mask_var.set(cfg.get("subnet_mask", ""))
dns = cfg.get("dns") or []
self.dns1_var.set(dns[0] if len(dns) > 0 else "")
self.dns2_var.set(dns[1] if len(dns) > 1 else "")
self._refresh_disabled_ext_options()
# reservations
self.reservations = []
for item in cfg.get("reservations") or []:
mac = item.get("mac") or ""
ip = item.get("ip") or ""
name = item.get("name") or ""
lease_time = item.get("lease_time")
try:
lease_time = int(lease_time) if lease_time not in (None, "", "None") else None
if lease_time is not None and lease_time <= 0:
lease_time = None
except Exception:
lease_time = None
try:
mac_norm = utils.normalize_mac(mac)
ip_norm = utils.format_ip_address(ip)
except Exception:
continue
self.reservations.append({"mac": mac_norm, "ip": ip_norm, "name": name, "lease_time": lease_time})
self._refresh_reservations_view()
ext = cfg.get("extended_options") or {}
# First clear
for code in self.ext_option_specs:
default_val = code.get("default", "")
self.ext_opt_values[code["code"]].set(default_val)
self.ext_opt_enabled[code["code"]].set(bool(code.get("mandatory")))
self._refresh_disabled_ext_options()
# Apply values if present (numeric keys preferred)
for code in self.ext_option_specs:
c = code["code"]
if code.get("disabled"):
continue
entry = ext.get(c)
if entry is None:
continue
if isinstance(entry, dict):
self.ext_opt_values[c].set(entry.get("value", "") or code.get("default", ""))
self.ext_opt_enabled[c].set(bool(entry.get("enabled")) or bool(code.get("mandatory")))
else:
self.ext_opt_values[c].set(str(entry))
self.ext_opt_enabled[c].set(True)
# Enforce mandatory defaults if still empty
self._ensure_mandatory_ext_options(fail_on_missing=False)
def load_config(self) -> None:
path = filedialog.askopenfilename(
title="Config laden",
filetypes=[("JSON", "*.json"), ("Alle Dateien", "*.*")],
initialfile=os.path.basename(self.last_config_path or ""),
)
if not path:
return
try:
with open(path, "r", encoding="utf-8") as f:
cfg = json.load(f)
except Exception as e:
messagebox.showerror("Fehler beim Laden", str(e))
return
self._apply_config(cfg)
self.last_config_path = path
self.status_var.set(f"Config geladen: {os.path.basename(path)}")
def save_config(self) -> None:
path = filedialog.asksaveasfilename(
title="Config speichern",
defaultextension=".json",
filetypes=[("JSON", "*.json"), ("Alle Dateien", "*.*")],
initialfile=os.path.basename(self.last_config_path or "dhcp_config.json"),
)
if not path:
return
cfg = self._collect_config()
try:
with open(path, "w", encoding="utf-8") as f:
json.dump(cfg, f, indent=2)
except Exception as e:
messagebox.showerror("Fehler beim Speichern", str(e))
return
self.last_config_path = path
self.status_var.set(f"Config gespeichert: {os.path.basename(path)}")
# -------------------- Refresh / Log --------------------
def _append_log(self, lines): def _append_log(self, lines):
if not lines: if not lines:
return return
@@ -294,14 +654,135 @@ class DHCPApp(tk.Frame):
self.log.see("end") self.log.see("end")
self.log.configure(state="disabled") self.log.configure(state="disabled")
def _update_leases_view(self, leases: Dict[str, Tuple[str, float]]) -> None:
current_items = {self.leases_tree.item(i, "values")[0]: i for i in self.leases_tree.get_children()}
# remove stale
for mac in set(current_items.keys()) - set(leases.keys()):
self.leases_tree.delete(current_items[mac])
# add/update
now = time.time()
for mac, (ip, expiry) in sorted(leases.items()):
remaining = max(int(expiry - now), 0)
remaining_txt = self._format_duration(remaining)
action = "Reserviert" if self._has_reservation(mac) else "Add"
if mac in current_items:
self.leases_tree.item(current_items[mac], values=(mac, ip, remaining_txt, action))
else:
self.leases_tree.insert("", "end", values=(mac, ip, remaining_txt, action))
def _refresh(self) -> None: def _refresh(self) -> None:
# update clients leases: Dict[str, Tuple[str, float]] = {}
self.clients.delete(0, "end")
if self.server and self.server.is_running(): if self.server and self.server.is_running():
leases = self.server.get_leases() leases = self.server.get_leases_with_expiry()
for mac, ip in leases.items():
self.clients.insert("end", f"{mac}{ip}")
self.status_var.set(f"Status: {self.server.get_status()}") self.status_var.set(f"Status: {self.server.get_status()}")
# pull logs # pull logs
self._append_log(self.server.pop_logs(100)) self._append_log(self.server.pop_logs(100))
else:
self.status_var.set("Bereit.")
self._update_leases_view(leases)
self.after(400, self._refresh) self.after(400, self._refresh)
def _refresh_disabled_ext_options(self) -> None:
# Update display values for disabled, auto-managed DHCP options
dns_list = [d for d in (self.dns1_var.get().strip(), self.dns2_var.get().strip()) if d]
dns_text = ", ".join(dns_list)
auto_values = {
"1": self.mask_var.get().strip(),
"3": self.ip_var.get().strip(),
"6": dns_text,
}
for spec in self.ext_option_specs:
if not spec.get("disabled"):
continue
code = spec["code"]
if code in auto_values:
self.ext_opt_values[code].set(auto_values[code])
# -------------------- Force Renew --------------------
def force_renew_all(self) -> None:
if not self.server or not self.server.is_running():
messagebox.showinfo("Info", "Server läuft nicht.")
return
ip = self.ip_var.get().strip()
mask = self.mask_var.get().strip()
try:
net = ipaddress.IPv4Network((ip, mask), strict=False)
except Exception:
messagebox.showerror("Fehler", "Ungültiges Netz (IP/Subnetzmaske) für Force Renew.")
return
host_count = len(list(net.hosts()))
targets = max(host_count - 1, 0) # exclude server IP roughly
extra_warn = ""
if targets > 512:
extra_warn = f"\nACHTUNG: {targets} Ziele das kann dauern."
if not messagebox.askyesno(
"Warnung",
f"DHCPFORCERENEW wird an alle {targets} IPs im Subnetz {net} gesendet.\n"
"Viele Clients ignorieren das, aber es erzeugt Traffic." + extra_warn,
icon="warning",
):
return
try:
sent = self.server.force_renew_all()
self.status_var.set(f"FORCERENEW gesendet an {sent} Client(s).")
except Exception as e:
messagebox.showerror("Fehler", str(e))
# -------------------- Leases actions --------------------
def _on_lease_click(self, event) -> None:
item_id = self.leases_tree.identify_row(event.y)
col = self.leases_tree.identify_column(event.x)
if not item_id or col != "#4": # action column
return
vals = self.leases_tree.item(item_id, "values")
if len(vals) < 4:
return
mac, ip, action = vals[0], vals[1], vals[3]
if action != "Add":
return
self._add_reservation_from_lease(mac, ip)
def _add_reservation_from_lease(self, mac: str, ip: str) -> None:
if self._has_reservation(mac):
messagebox.showinfo("Info", "Reservierung existiert bereits.")
return
self.res_mac_var.set(mac)
self.res_ip_var.set(ip)
self.res_name_var.set("")
self.add_or_update_reservation()
# -------------------- Helpers --------------------
def _format_duration(self, seconds: int) -> str:
if seconds <= 0:
return "0s"
m, s = divmod(seconds, 60)
h, m = divmod(m, 60)
if h > 0:
return f"{h}h {m}m"
if m > 0:
return f"{m}m {s}s"
return f"{s}s"
def _ensure_mandatory_ext_options(self, fail_on_missing: bool) -> None:
missing = []
for spec in self.ext_option_specs:
if spec.get("disabled"):
continue
if not spec.get("mandatory"):
continue
code = spec["code"]
val = self.ext_opt_values[code].get().strip()
if not val:
default_val = spec.get("default", "")
if default_val:
self.ext_opt_values[code].set(default_val)
val = default_val
if not val:
missing.append(f"Option {code} ({spec.get('label','')})")
else:
self.ext_opt_enabled[code].set(True)
if missing and fail_on_missing:
raise ValueError("Folgende Pflicht-Optionen fehlen: " + ", ".join(missing))

18
src/option_specs.py Normal file
View File

@@ -0,0 +1,18 @@
"""
Definitions for supported extended DHCP options in the GUI.
Extend this list to add more options; codes must be numeric strings.
"""
EXTENDED_OPTION_SPECS = [
# Managed internally (anzeige, aber nicht editierbar)
{"code": "1", "label": "Subnet Mask", "desc": "Automatisch aus IP-Settings", "type": "text", "disabled": True},
{"code": "3", "label": "Router/Gateway", "desc": "Automatisch auf Server-IP", "type": "text", "disabled": True},
{"code": "6", "label": "DNS-Server", "desc": "Aus IP-Settings (GUI)", "type": "text", "disabled": True},
# Freie Optionen (aufsteigend)
{"code": "15", "label": "DNS-Suffix", "desc": "DNS-Domäne", "type": "text"},
{"code": "42", "label": "NTP-Server", "desc": "NTP-Server IPs (kommagetrennt)", "type": "text"},
{"code": "51", "label": "Lease Time", "desc": "Lease in Sekunden (mandatory)", "type": "text", "mandatory": True, "default": "3600"},
{"code": "66", "label": "TFTP-Server", "desc": "TFTP-Servername oder IP", "type": "text"},
{"code": "67", "label": "Bootfile", "desc": "Bootfile-Name (PXE)", "type": "text"},
]

View File

@@ -1,8 +1,11 @@
import socket
import ipaddress import ipaddress
import psutil import socket
import string
from typing import Optional, Tuple from typing import Optional, Tuple
import psutil
def get_network_interfaces(): def get_network_interfaces():
"""Return only interfaces that have an IPv4 address to keep choices sane.""" """Return only interfaces that have an IPv4 address to keep choices sane."""
interfaces = [] interfaces = []
@@ -21,6 +24,19 @@ def get_iface_ipv4_config(iface: str) -> Tuple[Optional[str], Optional[str]]:
return a.address, a.netmask return a.address, a.netmask
return None, None return None, None
def get_iface_hwinfo(iface: str) -> Tuple[Optional[str], Optional[int]]:
"""Return (MAC, speed_mbps) where available."""
mac = None
addrs = psutil.net_if_addrs().get(iface, [])
for a in addrs:
if getattr(a, "family", None) == getattr(psutil, "AF_LINK", None):
mac = a.address
break
stats = psutil.net_if_stats().get(iface)
speed = stats.speed if stats else None
return mac, speed
def format_ip_address(ip: str) -> str: def format_ip_address(ip: str) -> str:
"""Normalize dotted quad; raises on invalid input.""" """Normalize dotted quad; raises on invalid input."""
ip_obj = ipaddress.ip_address(ip) ip_obj = ipaddress.ip_address(ip)
@@ -31,3 +47,11 @@ def format_ip_address(ip: str) -> str:
def validate_subnet_mask(mask: str) -> None: def validate_subnet_mask(mask: str) -> None:
"""Accept masks like 255.255.255.0; raise ValueError if invalid.""" """Accept masks like 255.255.255.0; raise ValueError if invalid."""
ipaddress.IPv4Network(f"0.0.0.0/{mask}") ipaddress.IPv4Network(f"0.0.0.0/{mask}")
def normalize_mac(mac: str) -> str:
"""Return MAC as AA:BB:CC:DD:EE:FF; raises on invalid input."""
cleaned = mac.strip().replace("-", "").replace(":", "").replace(".", "").lower()
if len(cleaned) != 12 or any(c not in string.hexdigits for c in cleaned):
raise ValueError("Ungültige MAC-Adresse.")
return ":".join(cleaned[i:i+2] for i in range(0, 12, 2)).upper()