From eebc5296e0c3297e38b86f04343bf034e0739f56 Mon Sep 17 00:00:00 2001 From: Marcel Peterkau Date: Sat, 27 Dec 2025 09:52:16 +0100 Subject: [PATCH] DHCP-Server um Reservierungen, Extended Options und Force Renew erweitern --- src/dhcp_server.py | 259 ++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 243 insertions(+), 16 deletions(-) diff --git a/src/dhcp_server.py b/src/dhcp_server.py index 4e035e0..5571f88 100644 --- a/src/dhcp_server.py +++ b/src/dhcp_server.py @@ -1,13 +1,16 @@ import os +import select import socket import struct import threading import time -import select from collections import deque from typing import Dict, List, Optional, Tuple + import ipaddress +from utils import normalize_mac + MAGIC_COOKIE = b"\x63\x82\x53\x63" @@ -20,8 +23,9 @@ DHCPACK = 5 DHCPNAK = 6 DHCPRELEASE = 7 DHCPINFORM = 8 +DHCPFORCERENEW = 9 -LEASE_SECONDS = 3600 # 1 hour leases +LEASE_SECONDS = 3600 # default 1 hour leases def mac_bytes_to_str(b: bytes) -> str: @@ -94,18 +98,32 @@ class DHCPServer: # NEW: configurable DNS servers for DHCP Option 6 self.dns_servers: List[str] = [] + self.reservations: Dict[str, Dict[str, Optional[int]]] = {} # normalized MAC -> {"ip": str, "lease": Optional[int]} + self.extended_options: Dict[str, str] = {} + self._network: Optional[ipaddress.IPv4Network] = None + self.default_lease_seconds: int = LEASE_SECONDS # -------------------- Public API -------------------- - def start(self, interface: str, start_ip: str, mask: str, dns_servers: Optional[List[str]] = None) -> None: + def start( + self, + interface: str, + start_ip: str, + mask: str, + dns_servers: Optional[List[str]] = None, + reservations: Optional[List[Dict[str, str]]] = None, + extended_options: Optional[Dict[str, Dict[str, str]]] = None, + ) -> None: if self.is_running(): raise RuntimeError("Server is already running.") self.interface = interface self.ip_start = start_ip self.subnet_mask = mask + self._network = ipaddress.IPv4Network((start_ip, mask), strict=False) + self.default_lease_seconds = LEASE_SECONDS # build pool based on interface network self.server_ip = start_ip # in this lab version we use the provided IP as "server identifier" - self._pool = self._build_pool(start_ip, mask) + self._pool = self._build_pool(self._network, start_ip) # configure DNS (filter invalid entries); default to [server_ip, 8.8.8.8] self.dns_servers = [] @@ -119,6 +137,25 @@ class DHCPServer: if not self.dns_servers and self.server_ip: self.dns_servers = [self.server_ip, "8.8.8.8"] + # static MAC reservations + self.reservations = self._prepare_reservations(reservations or []) + # extended DHCP options (numeric keyed) + self.extended_options = {} + if extended_options: + for code, data in extended_options.items(): + val = data.get("value") if isinstance(data, dict) else data + if val: + self.extended_options[str(code)] = str(val) + # If option 51 is provided, use it as default lease + opt51 = self.extended_options.get("51") + if opt51: + try: + lease_int = int(opt51) + if lease_int > 0: + self.default_lease_seconds = lease_int + except Exception: + self._log_line("Extended option 51 ignored (invalid integer).") + self._stop_evt.clear() self._thread = threading.Thread(target=self._run_loop, name="DHCPWorker", daemon=True) self._thread.start() @@ -150,6 +187,10 @@ class DHCPServer: with self._lock: return {mac: ip for mac, (ip, _) in self._leases.items()} + def get_leases_with_expiry(self) -> Dict[str, Tuple[str, float]]: + with self._lock: + return {mac: (ip, expiry) for mac, (ip, expiry) in self._leases.items()} + def get_status(self) -> str: return self._status @@ -160,6 +201,38 @@ class DHCPServer: items.append(self._log.popleft()) return items + def force_renew_all(self) -> int: + """ + Send DHCPFORCERENEW across the subnet (all hosts). + Uses known MAC from leases when available; otherwise sends with empty CHADDR. + Returns count of attempted sends. + """ + if not self.is_running() or not self._sock: + raise RuntimeError("Server not running.") + if not self._network: + raise RuntimeError("Subnet unbekannt.") + + with self._lock: + lease_items = list(self._leases.items()) # (mac, (ip, expiry)) + lease_by_ip = {ip: mac for mac, (ip, _) in lease_items} + + sent = 0 + for host in self._network.hosts(): + ip = str(host) + if ip == self.server_ip: + continue + mac = lease_by_ip.get(ip) + pkt = self._build_forcerenew_packet(mac, ip) + if not pkt: + continue + try: + self._sock.sendto(pkt, (ip, 68)) + sent += 1 + except Exception as e: + self._log_line(f"FORCERENEW to {mac}/{ip} failed: {e}") + self._log_line(f"FORCERENEW sent to {sent} client(s).") + return sent + # -------------------- Internals -------------------- def _run_loop(self) -> None: try: @@ -238,10 +311,9 @@ class DHCPServer: if mac in self._active_clients: self._active_clients.remove(mac) - def _build_pool(self, start_ip: str, mask: str) -> List[str]: + def _build_pool(self, network: ipaddress.IPv4Network, start_ip: str) -> List[str]: # Build pool from the network containing start_ip with provided mask - net = ipaddress.IPv4Network((start_ip, mask), strict=False) - all_hosts = list(net.hosts()) + all_hosts = list(network.hosts()) # Begin at/after start_ip; exclude start_ip (we use it as server/gateway) try: start_idx = all_hosts.index(ipaddress.IPv4Address(start_ip)) @@ -256,6 +328,61 @@ class DHCPServer: pool.remove(start_ip) return pool + def _prepare_reservations(self, reservations: List[Dict[str, str]]) -> Dict[str, Dict[str, Optional[int]]]: + prepared: Dict[str, Dict[str, Optional[int]]] = {} + for res in reservations: + mac_raw = res.get("mac", "") + ip = res.get("ip", "") + lease = res.get("lease_time") + try: + lease_int = int(lease) if lease not in (None, "") else None + if lease_int is not None and lease_int <= 0: + lease_int = None + except Exception: + lease_int = None + try: + mac_norm = normalize_mac(mac_raw) + except Exception: + self._log_line(f"Ignoring reservation '{mac_raw}': invalid MAC") + continue + try: + ip_norm = str(ipaddress.IPv4Address(ip)) + except Exception: + self._log_line(f"Ignoring reservation {mac_norm}: invalid IP '{ip}'") + continue + if ip_norm == self.server_ip: + self._log_line(f"Ignoring reservation {mac_norm}: IP {ip_norm} is server IP") + continue + if self._network and ipaddress.IPv4Address(ip_norm) not in self._network: + self._log_line(f"Ignoring reservation {mac_norm}: IP {ip_norm} not in subnet") + continue + prepared[mac_norm] = {"ip": ip_norm, "lease": lease_int} + if ip_norm in self._pool: + self._pool.remove(ip_norm) + return prepared + + def _reserved_ip_for(self, mac: str) -> Optional[str]: + entry = self.reservations.get(mac.upper()) + return entry["ip"] if entry else None + + def _reserved_lease_for(self, mac: str) -> Optional[int]: + entry = self.reservations.get(mac.upper()) + return entry.get("lease") if entry else None + + def _is_ip_available(self, ip: Optional[str], mac: str) -> bool: + if not ip: + return False + # Reserved for different MAC? + for res_mac, res in self.reservations.items(): + res_ip = res.get("ip") + if res_ip == ip and res_mac != mac: + return False + with self._lock: + for lease_mac, (lease_ip, _) in self._leases.items(): + if lease_ip == ip and lease_mac != mac: + return False + return True + def _next_free_ip(self) -> Optional[str]: with self._lock: used = {ip for ip, _ in self._leases.values()} @@ -309,37 +436,62 @@ class DHCPServer: pass def _dhcp_offer(self, mac: str, xid: int, giaddr: bytes) -> None: - ip = self._next_free_ip() + reserved_ip = self._reserved_ip_for(mac) + lease_seconds = self._reserved_lease_for(mac) or self.default_lease_seconds + if reserved_ip and self._is_ip_available(reserved_ip, mac): + ip = reserved_ip + else: + ip = self._next_free_ip() if not ip: self._log_line("No free IP to offer.") return - self._log_line(f"OFFER {ip} to {mac}") + if reserved_ip: + self._log_line(f"OFFER (reserved) {ip} to {mac}") + else: + self._log_line(f"OFFER {ip} to {mac}") pkt = self._build_reply_packet( - msg_type=DHCPOFFER, mac=mac, xid=xid, yiaddr=ip, giaddr=giaddr + msg_type=DHCPOFFER, mac=mac, xid=xid, yiaddr=ip, giaddr=giaddr, lease_seconds=lease_seconds ) self._send_broadcast(pkt) def _dhcp_ack(self, mac: str, xid: int, giaddr: bytes, requested_ip: Optional[str]) -> None: # If requested IP is available or already leased to this MAC, use it; else choose next free + reserved_ip = self._reserved_ip_for(mac) + lease_seconds = self._reserved_lease_for(mac) or self.default_lease_seconds + if reserved_ip and not self._is_ip_available(reserved_ip, mac): + reserved_ip = None + with self._lock: current = self._leases.get(mac, (None, 0))[0] - ip = requested_ip or current or self._next_free_ip() + ip: Optional[str] = None + if reserved_ip: + ip = reserved_ip + elif requested_ip and self._is_ip_available(requested_ip, mac): + ip = requested_ip + elif current and self._is_ip_available(current, mac): + ip = current + else: + ip = self._next_free_ip() + if not ip: self._log_line("No IP available for ACK.") return # Register/renew lease - expiry = time.time() + LEASE_SECONDS + expiry = time.time() + lease_seconds with self._lock: self._leases[mac] = (ip, expiry) if mac not in self._active_clients: self._active_clients.append(mac) - self._log_line(f"ACK {ip} to {mac}") + if reserved_ip: + self._log_line(f"ACK (reserved) {ip} to {mac}") + else: + self._log_line(f"ACK {ip} to {mac}") pkt = self._build_reply_packet( - msg_type=DHCPACK, mac=mac, xid=xid, yiaddr=ip, giaddr=giaddr + msg_type=DHCPACK, mac=mac, xid=xid, yiaddr=ip, giaddr=giaddr, lease_seconds=lease_seconds ) self._send_broadcast(pkt) - def _build_reply_packet(self, msg_type: int, mac: str, xid: int, yiaddr: str, giaddr: bytes) -> bytes: + def _build_reply_packet(self, msg_type: int, mac: str, xid: int, yiaddr: str, giaddr: bytes, lease_seconds: int) -> bytes: # BOOTP header op = 2 # reply htype = 1 @@ -374,8 +526,52 @@ class DHCPServer: dns_bytes = b"".join(ip2bytes(ip) for ip in self.dns_servers) options.append((6, dns_bytes)) - options.append((51, struct.pack("!I", LEASE_SECONDS))) # lease time + # Numeric extended options + for code_str, val in self.extended_options.items(): + try: + code = int(code_str) + except Exception: + self._log_line(f"Ignoring extended option with non-numeric code '{code_str}'") + continue + opt_bytes = self._encode_extended_option(code, val) + if opt_bytes is None: + continue + options.append((code, opt_bytes)) + options.append((51, struct.pack("!I", lease_seconds))) # lease time + + return hdr + build_options(options) + + def _build_forcerenew_packet(self, mac: Optional[str], client_ip: str) -> Optional[bytes]: + """Minimal FORCERENEW per RFC 3203 to trigger clients to renew.""" + try: + ciaddr = ip2bytes(client_ip) + except Exception: + return None + op = 2 + htype = 1 + hlen = 6 if mac else 0 + hops = 0 + xid = 0 + secs = 0 + flags = 0 + yiaddr_b = b"\x00\x00\x00\x00" + siaddr = ip2bytes(self.server_ip or "0.0.0.0") + giaddr_b = b"\x00\x00\x00\x00" + chaddr = ( + bytes.fromhex(mac.replace(":", "")) + b"\x00" * 10 + if mac + else b"\x00" * 16 + ) + sname = b"\x00" * 64 + filef = b"\x00" * 128 + hdr = struct.pack("!BBBBIHH4s4s4s4s16s64s128s", + op, htype, hlen, hops, xid, secs, flags, + ciaddr, yiaddr_b, siaddr, giaddr_b, chaddr, sname, filef) + options = [ + (53, bytes([DHCPFORCERENEW])), + (54, siaddr), + ] return hdr + build_options(options) def _send_broadcast(self, pkt: bytes) -> None: @@ -393,3 +589,34 @@ class DHCPServer: def _set_status(self, st: str) -> None: self._status = st + + def _encode_extended_option(self, code: int, value: str) -> Optional[bytes]: + if not value: + return None + if code == 42: + # NTP servers as comma-separated IP list + parts = [p.strip() for p in value.split(",") if p.strip()] + ips = [] + for p in parts: + try: + ips.append(ip2bytes(p)) + except Exception: + continue + if not ips: + self._log_line("Extended option 42 skipped (no valid IPs).") + return None + return b"".join(ips) + if code == 51: + try: + lease_int = int(value) + if lease_int <= 0: + return None + return struct.pack("!I", lease_int) + except Exception: + self._log_line("Extended option 51 ignored (invalid integer).") + return None + try: + return value.encode("ascii") + except Exception: + self._log_line(f"Extended option {code} not sent (invalid encoding).") + return None