DHCP-Server um Reservierungen, Extended Options und Force Renew erweitern

This commit is contained in:
2025-12-27 09:52:16 +01:00
parent 2276c83443
commit eebc5296e0

View File

@@ -1,13 +1,16 @@
import os
import select
import socket
import struct
import threading
import time
import select
from collections import deque
from typing import Dict, List, Optional, Tuple
import ipaddress
from utils import normalize_mac
MAGIC_COOKIE = b"\x63\x82\x53\x63"
@@ -20,8 +23,9 @@ DHCPACK = 5
DHCPNAK = 6
DHCPRELEASE = 7
DHCPINFORM = 8
DHCPFORCERENEW = 9
LEASE_SECONDS = 3600 # 1 hour leases
LEASE_SECONDS = 3600 # default 1 hour leases
def mac_bytes_to_str(b: bytes) -> str:
@@ -94,18 +98,32 @@ class DHCPServer:
# NEW: configurable DNS servers for DHCP Option 6
self.dns_servers: List[str] = []
self.reservations: Dict[str, Dict[str, Optional[int]]] = {} # normalized MAC -> {"ip": str, "lease": Optional[int]}
self.extended_options: Dict[str, str] = {}
self._network: Optional[ipaddress.IPv4Network] = None
self.default_lease_seconds: int = LEASE_SECONDS
# -------------------- Public API --------------------
def start(self, interface: str, start_ip: str, mask: str, dns_servers: Optional[List[str]] = None) -> None:
def start(
self,
interface: str,
start_ip: str,
mask: str,
dns_servers: Optional[List[str]] = None,
reservations: Optional[List[Dict[str, str]]] = None,
extended_options: Optional[Dict[str, Dict[str, str]]] = None,
) -> None:
if self.is_running():
raise RuntimeError("Server is already running.")
self.interface = interface
self.ip_start = start_ip
self.subnet_mask = mask
self._network = ipaddress.IPv4Network((start_ip, mask), strict=False)
self.default_lease_seconds = LEASE_SECONDS
# build pool based on interface network
self.server_ip = start_ip # in this lab version we use the provided IP as "server identifier"
self._pool = self._build_pool(start_ip, mask)
self._pool = self._build_pool(self._network, start_ip)
# configure DNS (filter invalid entries); default to [server_ip, 8.8.8.8]
self.dns_servers = []
@@ -119,6 +137,25 @@ class DHCPServer:
if not self.dns_servers and self.server_ip:
self.dns_servers = [self.server_ip, "8.8.8.8"]
# static MAC reservations
self.reservations = self._prepare_reservations(reservations or [])
# extended DHCP options (numeric keyed)
self.extended_options = {}
if extended_options:
for code, data in extended_options.items():
val = data.get("value") if isinstance(data, dict) else data
if val:
self.extended_options[str(code)] = str(val)
# If option 51 is provided, use it as default lease
opt51 = self.extended_options.get("51")
if opt51:
try:
lease_int = int(opt51)
if lease_int > 0:
self.default_lease_seconds = lease_int
except Exception:
self._log_line("Extended option 51 ignored (invalid integer).")
self._stop_evt.clear()
self._thread = threading.Thread(target=self._run_loop, name="DHCPWorker", daemon=True)
self._thread.start()
@@ -150,6 +187,10 @@ class DHCPServer:
with self._lock:
return {mac: ip for mac, (ip, _) in self._leases.items()}
def get_leases_with_expiry(self) -> Dict[str, Tuple[str, float]]:
with self._lock:
return {mac: (ip, expiry) for mac, (ip, expiry) in self._leases.items()}
def get_status(self) -> str:
return self._status
@@ -160,6 +201,38 @@ class DHCPServer:
items.append(self._log.popleft())
return items
def force_renew_all(self) -> int:
"""
Send DHCPFORCERENEW across the subnet (all hosts).
Uses known MAC from leases when available; otherwise sends with empty CHADDR.
Returns count of attempted sends.
"""
if not self.is_running() or not self._sock:
raise RuntimeError("Server not running.")
if not self._network:
raise RuntimeError("Subnet unbekannt.")
with self._lock:
lease_items = list(self._leases.items()) # (mac, (ip, expiry))
lease_by_ip = {ip: mac for mac, (ip, _) in lease_items}
sent = 0
for host in self._network.hosts():
ip = str(host)
if ip == self.server_ip:
continue
mac = lease_by_ip.get(ip)
pkt = self._build_forcerenew_packet(mac, ip)
if not pkt:
continue
try:
self._sock.sendto(pkt, (ip, 68))
sent += 1
except Exception as e:
self._log_line(f"FORCERENEW to {mac}/{ip} failed: {e}")
self._log_line(f"FORCERENEW sent to {sent} client(s).")
return sent
# -------------------- Internals --------------------
def _run_loop(self) -> None:
try:
@@ -238,10 +311,9 @@ class DHCPServer:
if mac in self._active_clients:
self._active_clients.remove(mac)
def _build_pool(self, start_ip: str, mask: str) -> List[str]:
def _build_pool(self, network: ipaddress.IPv4Network, start_ip: str) -> List[str]:
# Build pool from the network containing start_ip with provided mask
net = ipaddress.IPv4Network((start_ip, mask), strict=False)
all_hosts = list(net.hosts())
all_hosts = list(network.hosts())
# Begin at/after start_ip; exclude start_ip (we use it as server/gateway)
try:
start_idx = all_hosts.index(ipaddress.IPv4Address(start_ip))
@@ -256,6 +328,61 @@ class DHCPServer:
pool.remove(start_ip)
return pool
def _prepare_reservations(self, reservations: List[Dict[str, str]]) -> Dict[str, Dict[str, Optional[int]]]:
prepared: Dict[str, Dict[str, Optional[int]]] = {}
for res in reservations:
mac_raw = res.get("mac", "")
ip = res.get("ip", "")
lease = res.get("lease_time")
try:
lease_int = int(lease) if lease not in (None, "") else None
if lease_int is not None and lease_int <= 0:
lease_int = None
except Exception:
lease_int = None
try:
mac_norm = normalize_mac(mac_raw)
except Exception:
self._log_line(f"Ignoring reservation '{mac_raw}': invalid MAC")
continue
try:
ip_norm = str(ipaddress.IPv4Address(ip))
except Exception:
self._log_line(f"Ignoring reservation {mac_norm}: invalid IP '{ip}'")
continue
if ip_norm == self.server_ip:
self._log_line(f"Ignoring reservation {mac_norm}: IP {ip_norm} is server IP")
continue
if self._network and ipaddress.IPv4Address(ip_norm) not in self._network:
self._log_line(f"Ignoring reservation {mac_norm}: IP {ip_norm} not in subnet")
continue
prepared[mac_norm] = {"ip": ip_norm, "lease": lease_int}
if ip_norm in self._pool:
self._pool.remove(ip_norm)
return prepared
def _reserved_ip_for(self, mac: str) -> Optional[str]:
entry = self.reservations.get(mac.upper())
return entry["ip"] if entry else None
def _reserved_lease_for(self, mac: str) -> Optional[int]:
entry = self.reservations.get(mac.upper())
return entry.get("lease") if entry else None
def _is_ip_available(self, ip: Optional[str], mac: str) -> bool:
if not ip:
return False
# Reserved for different MAC?
for res_mac, res in self.reservations.items():
res_ip = res.get("ip")
if res_ip == ip and res_mac != mac:
return False
with self._lock:
for lease_mac, (lease_ip, _) in self._leases.items():
if lease_ip == ip and lease_mac != mac:
return False
return True
def _next_free_ip(self) -> Optional[str]:
with self._lock:
used = {ip for ip, _ in self._leases.values()}
@@ -309,37 +436,62 @@ class DHCPServer:
pass
def _dhcp_offer(self, mac: str, xid: int, giaddr: bytes) -> None:
ip = self._next_free_ip()
reserved_ip = self._reserved_ip_for(mac)
lease_seconds = self._reserved_lease_for(mac) or self.default_lease_seconds
if reserved_ip and self._is_ip_available(reserved_ip, mac):
ip = reserved_ip
else:
ip = self._next_free_ip()
if not ip:
self._log_line("No free IP to offer.")
return
self._log_line(f"OFFER {ip} to {mac}")
if reserved_ip:
self._log_line(f"OFFER (reserved) {ip} to {mac}")
else:
self._log_line(f"OFFER {ip} to {mac}")
pkt = self._build_reply_packet(
msg_type=DHCPOFFER, mac=mac, xid=xid, yiaddr=ip, giaddr=giaddr
msg_type=DHCPOFFER, mac=mac, xid=xid, yiaddr=ip, giaddr=giaddr, lease_seconds=lease_seconds
)
self._send_broadcast(pkt)
def _dhcp_ack(self, mac: str, xid: int, giaddr: bytes, requested_ip: Optional[str]) -> None:
# If requested IP is available or already leased to this MAC, use it; else choose next free
reserved_ip = self._reserved_ip_for(mac)
lease_seconds = self._reserved_lease_for(mac) or self.default_lease_seconds
if reserved_ip and not self._is_ip_available(reserved_ip, mac):
reserved_ip = None
with self._lock:
current = self._leases.get(mac, (None, 0))[0]
ip = requested_ip or current or self._next_free_ip()
ip: Optional[str] = None
if reserved_ip:
ip = reserved_ip
elif requested_ip and self._is_ip_available(requested_ip, mac):
ip = requested_ip
elif current and self._is_ip_available(current, mac):
ip = current
else:
ip = self._next_free_ip()
if not ip:
self._log_line("No IP available for ACK.")
return
# Register/renew lease
expiry = time.time() + LEASE_SECONDS
expiry = time.time() + lease_seconds
with self._lock:
self._leases[mac] = (ip, expiry)
if mac not in self._active_clients:
self._active_clients.append(mac)
self._log_line(f"ACK {ip} to {mac}")
if reserved_ip:
self._log_line(f"ACK (reserved) {ip} to {mac}")
else:
self._log_line(f"ACK {ip} to {mac}")
pkt = self._build_reply_packet(
msg_type=DHCPACK, mac=mac, xid=xid, yiaddr=ip, giaddr=giaddr
msg_type=DHCPACK, mac=mac, xid=xid, yiaddr=ip, giaddr=giaddr, lease_seconds=lease_seconds
)
self._send_broadcast(pkt)
def _build_reply_packet(self, msg_type: int, mac: str, xid: int, yiaddr: str, giaddr: bytes) -> bytes:
def _build_reply_packet(self, msg_type: int, mac: str, xid: int, yiaddr: str, giaddr: bytes, lease_seconds: int) -> bytes:
# BOOTP header
op = 2 # reply
htype = 1
@@ -374,8 +526,52 @@ class DHCPServer:
dns_bytes = b"".join(ip2bytes(ip) for ip in self.dns_servers)
options.append((6, dns_bytes))
options.append((51, struct.pack("!I", LEASE_SECONDS))) # lease time
# Numeric extended options
for code_str, val in self.extended_options.items():
try:
code = int(code_str)
except Exception:
self._log_line(f"Ignoring extended option with non-numeric code '{code_str}'")
continue
opt_bytes = self._encode_extended_option(code, val)
if opt_bytes is None:
continue
options.append((code, opt_bytes))
options.append((51, struct.pack("!I", lease_seconds))) # lease time
return hdr + build_options(options)
def _build_forcerenew_packet(self, mac: Optional[str], client_ip: str) -> Optional[bytes]:
"""Minimal FORCERENEW per RFC 3203 to trigger clients to renew."""
try:
ciaddr = ip2bytes(client_ip)
except Exception:
return None
op = 2
htype = 1
hlen = 6 if mac else 0
hops = 0
xid = 0
secs = 0
flags = 0
yiaddr_b = b"\x00\x00\x00\x00"
siaddr = ip2bytes(self.server_ip or "0.0.0.0")
giaddr_b = b"\x00\x00\x00\x00"
chaddr = (
bytes.fromhex(mac.replace(":", "")) + b"\x00" * 10
if mac
else b"\x00" * 16
)
sname = b"\x00" * 64
filef = b"\x00" * 128
hdr = struct.pack("!BBBBIHH4s4s4s4s16s64s128s",
op, htype, hlen, hops, xid, secs, flags,
ciaddr, yiaddr_b, siaddr, giaddr_b, chaddr, sname, filef)
options = [
(53, bytes([DHCPFORCERENEW])),
(54, siaddr),
]
return hdr + build_options(options)
def _send_broadcast(self, pkt: bytes) -> None:
@@ -393,3 +589,34 @@ class DHCPServer:
def _set_status(self, st: str) -> None:
self._status = st
def _encode_extended_option(self, code: int, value: str) -> Optional[bytes]:
if not value:
return None
if code == 42:
# NTP servers as comma-separated IP list
parts = [p.strip() for p in value.split(",") if p.strip()]
ips = []
for p in parts:
try:
ips.append(ip2bytes(p))
except Exception:
continue
if not ips:
self._log_line("Extended option 42 skipped (no valid IPs).")
return None
return b"".join(ips)
if code == 51:
try:
lease_int = int(value)
if lease_int <= 0:
return None
return struct.pack("!I", lease_int)
except Exception:
self._log_line("Extended option 51 ignored (invalid integer).")
return None
try:
return value.encode("ascii")
except Exception:
self._log_line(f"Extended option {code} not sent (invalid encoding).")
return None