From 7cf3fe2e9e75b4fcbcfef34cc0f56c8709c8dd63 Mon Sep 17 00:00:00 2001 From: Marcel Peterkau Date: Thu, 11 Sep 2025 11:31:21 +0200 Subject: [PATCH] dumped that AI-Shitt and rebuild from scratch . works now --- src/dhcp_server.py | 396 +++++++++++++++++++++++++++++++++++++++++---- src/gui.py | 245 ++++++++++++++++++++++------ src/main.py | 77 ++------- src/utils.py | 55 +++---- start.sh | 0 5 files changed, 598 insertions(+), 175 deletions(-) mode change 100644 => 100755 start.sh diff --git a/src/dhcp_server.py b/src/dhcp_server.py index 03ff2d4..94cf038 100644 --- a/src/dhcp_server.py +++ b/src/dhcp_server.py @@ -1,35 +1,377 @@ +import os +import socket +import struct +import threading +import time +import select +from collections import deque +from typing import Dict, List, Optional, Tuple +import ipaddress + + +MAGIC_COOKIE = b"\x63\x82\x53\x63" + +# DHCP message types +DHCPDISCOVER = 1 +DHCPOFFER = 2 +DHCPREQUEST = 3 +DHCPDECLINE = 4 +DHCPACK = 5 +DHCPNAK = 6 +DHCPRELEASE = 7 +DHCPINFORM = 8 + +LEASE_SECONDS = 3600 # 1 hour leases + + +def mac_bytes_to_str(b: bytes) -> str: + return ":".join(f"{x:02X}" for x in b) + + +def parse_dhcp_options(opts: bytes) -> Dict[int, bytes]: + res: Dict[int, bytes] = {} + i = 0 + while i < len(opts): + code = opts[i] + i += 1 + if code == 0: # pad + continue + if code == 255: # end + break + if i >= len(opts): + break + ln = opts[i] + i += 1 + val = opts[i:i+ln] + i += ln + res[code] = val + return res + + +def build_options(pairs: List[Tuple[int, bytes]]) -> bytes: + out = bytearray() + out += MAGIC_COOKIE + for code, val in pairs: + out += bytes([code, len(val)]) + val + out += b"\xff" # END + return bytes(out) + + +def ip2bytes(ip: str) -> bytes: + return socket.inet_aton(ip) + + +def bytes2ip(b: bytes) -> str: + return socket.inet_ntoa(b) + + class DHCPServer: - def __init__(self): - self.leases = {} - self.active_clients = [] + """ + Minimal functional IPv4 DHCP server for lab/testing. + - Binds to the specified interface (SO_BINDTODEVICE on Linux). + - Listens on UDP/67, answers DISCOVER with OFFER and REQUEST with ACK. + - Very simple lease pool carved from start_ip within the interface's subnet. + - No persistence; in-memory leases. + """ - def start(self, interface, ip_range, subnet): - # Start the DHCP server on the specified interface - pass + def __init__(self) -> None: + self._thread: Optional[threading.Thread] = None + self._stop_evt = threading.Event() + self._lock = threading.Lock() - def stop(self): - # Stop the DHCP server - pass + self.interface: Optional[str] = None + self.ip_start: Optional[str] = None + self.subnet_mask: Optional[str] = None + self.server_ip: Optional[str] = None # inferred from interface IP - def handle_request(self, request): - # Handle incoming DHCP requests - pass + self._leases: Dict[str, Tuple[str, float]] = {} # MAC -> (IP, expiry_ts) + self._active_clients: List[str] = [] # list of MACs + self._sock: Optional[socket.socket] = None - def add_lease(self, client_ip, client_mac): - # Add a new lease for a client - self.leases[client_mac] = client_ip - self.active_clients.append(client_mac) + self._pool: List[str] = [] # IP pool + self._log = deque(maxlen=500) + self._status = "idle" - def remove_lease(self, client_mac): - # Remove a lease for a client - if client_mac in self.leases: - del self.leases[client_mac] - self.active_clients.remove(client_mac) + # -------------------- Public API -------------------- + def start(self, interface: str, start_ip: str, mask: str) -> None: + if self.is_running(): + raise RuntimeError("Server is already running.") + self.interface = interface + self.ip_start = start_ip + self.subnet_mask = mask - def get_active_clients(self): - # Return a list of active clients - return self.active_clients + # build pool based on interface network + self.server_ip = start_ip # We use the provided IP as "server identifier"; typically you'd use iface IP + self._pool = self._build_pool(start_ip, mask) - def get_leases(self): - # Return the current leases - return self.leases \ No newline at end of file + self._stop_evt.clear() + self._thread = threading.Thread(target=self._run_loop, name="DHCPWorker", daemon=True) + self._thread.start() + + def stop(self) -> None: + if not self.is_running(): + return + self._stop_evt.set() + if self._sock: + try: + self._sock.close() + except Exception: + pass + if self._thread and self._thread.is_alive(): + self._thread.join(timeout=3.0) + self._thread = None + self._sock = None + self._set_status("stopped") + self._log_line("Server stopped.") + + def is_running(self) -> bool: + return self._thread is not None and self._thread.is_alive() and not self._stop_evt.is_set() + + def get_active_clients(self) -> List[str]: + with self._lock: + return list(self._active_clients) + + def get_leases(self) -> Dict[str, str]: + with self._lock: + return {mac: ip for mac, (ip, _) in self._leases.items()} + + def get_status(self) -> str: + return self._status + + def pop_logs(self, max_items: int = 100) -> List[str]: + items: List[str] = [] + with self._lock: + while self._log and len(items) < max_items: + items.append(self._log.popleft()) + return items + + # -------------------- Internals -------------------- + def _run_loop(self) -> None: + try: + self._set_status("starting") + self._log_line(f"Starting on '{self.interface}' with start {self.ip_start}, mask {self.subnet_mask}") + self._sock = self._open_socket(self.interface) + self._set_status("running") + self._log_line("Listening on UDP/67. WARNING: Requires root privileges.") + + while not self._stop_evt.is_set(): + r, _, _ = select.select([self._sock], [], [], 0.5) + if not r: + self._expire_leases() + continue + try: + data, addr = self._sock.recvfrom(4096) + except OSError: + break + self._handle_packet(data, addr) + + except PermissionError as e: + self._set_status("error") + self._log_line(f"Permission error (need root?): {e}") + except Exception as e: + self._set_status("error") + self._log_line(f"ERROR: {e!r}") + finally: + try: + if self._sock: + self._sock.close() + except Exception: + pass + + def _open_socket(self, iface: str) -> socket.socket: + # Basic root check; binding will fail anyway if not root + if hasattr(os, "geteuid") and os.geteuid() != 0: + raise PermissionError("Root privileges required to bind DHCP server port 67.") + + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + # Make sure we can reuse + try: + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + except Exception: + pass + try: + SO_REUSEPORT = getattr(socket, "SO_REUSEPORT", None) + if SO_REUSEPORT is not None: + s.setsockopt(socket.SOL_SOCKET, SO_REUSEPORT, 1) + except Exception: + pass + try: + s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + except Exception: + pass + + # Bind to the specific device (Linux) + try: + SO_BINDTODEVICE = 25 + s.setsockopt(socket.SOL_SOCKET, SO_BINDTODEVICE, iface.encode()) + self._log_line(f"SO_BINDTODEVICE set to {iface}") + except Exception as e: + # If not available, better abort to avoid affecting other interfaces + s.close() + raise PermissionError("SO_BINDTODEVICE not available; cannot restrict to selected interface.") from e + + # Bind to server port + s.bind(("0.0.0.0", 67)) + return s + + def _expire_leases(self) -> None: + now = time.time() + with self._lock: + expired = [mac for mac, (_, t) in self._leases.items() if t <= now] + for mac in expired: + self._leases.pop(mac, None) + if mac in self._active_clients: + self._active_clients.remove(mac) + + def _build_pool(self, start_ip: str, mask: str) -> List[str]: + # Build pool from the network containing start_ip with provided mask + net = ipaddress.IPv4Network((start_ip, mask), strict=False) + all_hosts = list(net.hosts()) + # Begin at start_ip position (or nearest host) and go upwards, excluding start_ip itself for server identity + try: + start_idx = all_hosts.index(ipaddress.IPv4Address(start_ip)) + except ValueError: + # pick the first host >= start_ip, else 0 + start_idx = 0 + for i, h in enumerate(all_hosts): + if int(h) >= int(ipaddress.IPv4Address(start_ip)): + start_idx = i + break + pool = [str(h) for h in all_hosts[start_idx:]] + # Ensure server_ip isn't handed out + if start_ip in pool: + pool.remove(start_ip) + return pool + + def _next_free_ip(self) -> Optional[str]: + with self._lock: + used = {ip for ip, _ in self._leases.values()} + for ip in self._pool: + if ip not in used: + return ip + return None + + def _handle_packet(self, data: bytes, addr) -> None: + # Basic BOOTP header length check (fixed 236 + cookie 4 -> options after) + if len(data) < 240: + self._log_line("Drop: packet too short") + return + + op, htype, hlen, hops, xid, secs, flags = struct.unpack("!BBBBIHH", data[:12]) + ciaddr = data[12:16] + yiaddr = data[16:20] + siaddr = data[20:24] + giaddr = data[24:28] + chaddr = data[28:44] # 16 bytes + # sname: data[44:108]; file: data[108:236] + + # Options + cookie = data[236:240] + if cookie != MAGIC_COOKIE: + self._log_line("Drop: bad magic cookie") + return + opts = parse_dhcp_options(data[240:]) + + mac = mac_bytes_to_str(chaddr[:hlen]) + msgtype = opts.get(53, b"\x00") + m = msgtype[0] if msgtype else 0 + + self._log_line(f"RX {len(data)}B from {addr[0]}:{addr[1]} op={op} mac={mac} type={m}") + + if m == DHCPDISCOVER: + self._dhcp_offer(mac, xid, giaddr) + elif m == DHCPREQUEST: + # honor requested IP (opt 50) if possible, else next free + req_ip = opts.get(50, None) + ipstr = bytes2ip(req_ip) if req_ip and len(req_ip) == 4 else None + self._dhcp_ack(mac, xid, giaddr, requested_ip=ipstr) + elif m == DHCPRELEASE: + with self._lock: + self._leases.pop(mac, None) + if mac in self._active_clients: + self._active_clients.remove(mac) + self._log_line(f"RELEASE {mac}") + else: + # ignore other types in this minimal server + pass + + def _dhcp_offer(self, mac: str, xid: int, giaddr: bytes) -> None: + ip = self._next_free_ip() + if not ip: + self._log_line("No free IP to offer.") + return + self._log_line(f"OFFER {ip} to {mac}") + pkt = self._build_reply_packet( + msg_type=DHCPOFFER, mac=mac, xid=xid, yiaddr=ip, giaddr=giaddr + ) + self._send_broadcast(pkt) + + def _dhcp_ack(self, mac: str, xid: int, giaddr: bytes, requested_ip: Optional[str]) -> None: + # If requested IP is available or already leased to this MAC, use it; else choose next free + with self._lock: + current = self._leases.get(mac, (None, 0))[0] + ip = requested_ip or current or self._next_free_ip() + if not ip: + self._log_line("No IP available for ACK.") + return + # Register/renew lease + expiry = time.time() + LEASE_SECONDS + with self._lock: + self._leases[mac] = (ip, expiry) + if mac not in self._active_clients: + self._active_clients.append(mac) + self._log_line(f"ACK {ip} to {mac}") + pkt = self._build_reply_packet( + msg_type=DHCPACK, mac=mac, xid=xid, yiaddr=ip, giaddr=giaddr + ) + self._send_broadcast(pkt) + + def _build_reply_packet(self, msg_type: int, mac: str, xid: int, yiaddr: str, giaddr: bytes) -> bytes: + # BOOTP header + op = 2 # reply + htype = 1 + hlen = 6 + hops = 0 + secs = 0 + flags = 0x0000 # could set broadcast flag if desired + ciaddr = b"\x00\x00\x00\x00" + yiaddr_b = ip2bytes(yiaddr) + siaddr = ip2bytes(self.server_ip or "0.0.0.0") + giaddr_b = giaddr if len(giaddr) == 4 else b"\x00\x00\x00\x00" + chaddr = bytes.fromhex(mac.replace(":", "")) + b"\x00" * 10 # pad to 16 + sname = b"\x00" * 64 + filef = b"\x00" * 128 + + hdr = struct.pack("!BBBBIHH4s4s4s4s16s64s128s", + op, htype, hlen, hops, xid, secs, flags, + ciaddr, yiaddr_b, siaddr, giaddr_b, chaddr, sname, filef) + + # Options: message type, server id, subnet mask, router (server), lease time + options = [ + (53, bytes([msg_type])), + (54, siaddr), # server identifier + ] + if self.subnet_mask: + options.append((1, ip2bytes(self.subnet_mask))) # subnet mask + if self.server_ip: + options.append((3, ip2bytes(self.server_ip))) # router + options.append((6, ip2bytes(self.server_ip))) # DNS (for lab convenience) + options.append((51, struct.pack("!I", LEASE_SECONDS))) # lease time + + return hdr + build_options(options) + + def _send_broadcast(self, pkt: bytes) -> None: + if not self._sock: + return + try: + self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + except Exception: + pass + self._sock.sendto(pkt, ("255.255.255.255", 68)) + + def _log_line(self, msg: str) -> None: + with self._lock: + self._log.append(time.strftime("[%H:%M:%S] ") + msg) + + def _set_status(self, st: str) -> None: + self._status = st diff --git a/src/gui.py b/src/gui.py index 2e3b82f..397a53e 100644 --- a/src/gui.py +++ b/src/gui.py @@ -1,68 +1,209 @@ -from tkinter import Tk, Label, Button, Entry, StringVar, OptionMenu, Listbox, END -import psutil +import os +import socket +import tkinter as tk +from tkinter import messagebox, ttk +from typing import Optional + from dhcp_server import DHCPServer +import utils -class DHCPApp: - def __init__(self, master): - self.master = master - master.title("DHCP Server") - self.interface_label = Label(master, text="Select Network Interface:") - self.interface_label.pack() +WARNING_TEXT = ( + "ACHTUNG!\n\n" + "Ein DHCP-Server im falschen Netz kann massive Störungen verursachen " + "(IP-Konflikte, Netzwerkausfall, heilloses Chaos). " + "Stelle sicher, dass du auf einem isolierten Testnetz oder dem " + "korrekten Interface arbeitest.\n\n" + "Willst du den Server wirklich starten?" +) - self.interface_var = StringVar(master) - self.interfaces = self.get_network_interfaces() - self.interface_menu = OptionMenu(master, self.interface_var, *self.interfaces) - self.interface_menu.pack() - self.ip_label = Label(master, text="IP Address:") - self.ip_label.pack() - self.ip_entry = Entry(master) - self.ip_entry.pack() +def _preflight_require_root_and_bind(iface: str) -> None: + # Root-Check (nur auf Unix relevant; Windows hat kein geteuid) + if hasattr(os, "geteuid") and os.geteuid() != 0: + raise PermissionError("Root-Rechte erforderlich (Port 67).") - self.subnet_label = Label(master, text="Subnet Mask:") - self.subnet_label.pack() - self.subnet_entry = Entry(master) - self.subnet_entry.pack() + # Bind-Test + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + try: + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + SO_REUSEPORT = getattr(socket, "SO_REUSEPORT", None) + if SO_REUSEPORT is not None: + s.setsockopt(socket.SOL_SOCKET, SO_REUSEPORT, 1) + s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + except Exception: + pass - self.start_button = Button(master, text="Start DHCP Server", command=self.start_dhcp_server) - self.start_button.pack() + # strikt an Interface binden + try: + SO_BINDTODEVICE = 25 + s.setsockopt(socket.SOL_SOCKET, SO_BINDTODEVICE, iface.encode()) + except Exception as e: + raise PermissionError("SO_BINDTODEVICE nicht verfügbar – kann nicht exklusiv an Interface binden.") from e - self.stop_button = Button(master, text="Stop DHCP Server", command=self.stop_dhcp_server) - self.stop_button.pack() + # Port 67 testen + s.bind(("0.0.0.0", 67)) + finally: + try: + s.close() + except Exception: + pass - self.clients_label = Label(master, text="Active Clients:") - self.clients_label.pack() - self.clients_listbox = Listbox(master) - self.clients_listbox.pack() +class DHCPApp(tk.Frame): + def __init__(self, master: tk.Tk) -> None: + super().__init__(master) + master.title("Simple DHCP Server (Lab)") + master.minsize(680, 460) + self.pack(fill="both", expand=True, padx=12, pady=12) - self.dhcp_server = None + # Single server instance + self.server: Optional[DHCPServer] = None - def get_network_interfaces(self): - return psutil.net_if_addrs().keys() + # --- Top form --- + form = ttk.Frame(self) + form.pack(fill="x", pady=(0, 8)) - def start_dhcp_server(self): - ip = self.ip_entry.get() - subnet = self.subnet_entry.get() - interface = self.interface_var.get() - self.dhcp_server = DHCPServer(interface, ip, subnet) - self.dhcp_server.start() - self.update_clients_list() + ttk.Label(form, text="Interface:").grid(row=0, column=0, sticky="w") + self.if_var = tk.StringVar() + self.if_menu = ttk.OptionMenu(form, self.if_var, None, *utils.get_network_interfaces(), command=self.on_iface_change) + self.if_menu.grid(row=0, column=1, sticky="ew", padx=(6, 16)) - def stop_dhcp_server(self): - if self.dhcp_server: - self.dhcp_server.stop() - self.dhcp_server = None - self.clients_listbox.delete(0, END) + ttk.Label(form, text="Server/Start-IP:").grid(row=0, column=2, sticky="w") + self.ip_var = tk.StringVar(value="") + self.ip_entry = ttk.Entry(form, textvariable=self.ip_var, width=16) + self.ip_entry.grid(row=0, column=3, sticky="w", padx=(6, 16)) - def update_clients_list(self): - if self.dhcp_server: - self.clients_listbox.delete(0, END) - for client in self.dhcp_server.get_active_clients(): - self.clients_listbox.insert(END, client) + ttk.Label(form, text="Subnetzmaske:").grid(row=0, column=4, sticky="w") + self.mask_var = tk.StringVar(value="") + self.mask_entry = ttk.Entry(form, textvariable=self.mask_var, width=16) + self.mask_entry.grid(row=0, column=5, sticky="w", padx=(6, 16)) -if __name__ == "__main__": - root = Tk() - app = DHCPApp(root) - root.mainloop() \ No newline at end of file + form.columnconfigure(1, weight=1) + + # --- Buttons --- + btns = ttk.Frame(self) + btns.pack(fill="x", pady=(0, 8)) + self.btn_start = ttk.Button(btns, text="Start", command=self.start_server) + self.btn_stop = ttk.Button(btns, text="Stop", command=self.stop_server, state="disabled") + self.btn_start.pack(side="left") + self.btn_stop.pack(side="left", padx=(8, 0)) + + # --- Clients & Logs Paned Layout --- + paned = ttk.Panedwindow(self, orient="horizontal") + paned.pack(fill="both", expand=True) + + # Clients list + left = ttk.Labelframe(paned, text="Aktive Clients / Leases") + self.clients = tk.Listbox(left, height=12) + self.clients.pack(fill="both", expand=True, padx=6, pady=6) + paned.add(left, weight=1) + + # Logs + right = ttk.Labelframe(paned, text="Log") + self.log = tk.Text(right, height=12, state="disabled") + self.log.pack(fill="both", expand=True, padx=6, pady=6) + paned.add(right, weight=2) + + # --- Status bar --- + self.status_var = tk.StringVar(value="Bereit.") + status = ttk.Label(self, textvariable=self.status_var, anchor="w", relief="sunken") + status.pack(fill="x", side="bottom") + + # initial iface autofill (if any interface exists) + ifaces = utils.get_network_interfaces() + if ifaces: + self.if_var.set(ifaces[0]) + self.on_iface_change(ifaces[0]) + + # periodic refresh + self.after(400, self._refresh) + + # -------------------- UI logic -------------------- + def on_iface_change(self, _sel=None): + iface = self.if_var.get().strip() + ip, mask = utils.get_iface_ipv4_config(iface) + if ip: + self.ip_var.set(ip) + if mask: + self.mask_var.set(mask) + + def _set_controls_enabled(self, enabled: bool): + state = "normal" if enabled else "disabled" + self.if_menu.configure(state=state) + self.ip_entry.configure(state=state) + self.mask_entry.configure(state=state) + self.btn_start.configure(state="normal" if enabled else "disabled") + self.btn_stop.configure(state="disabled" if enabled else "normal") + + def start_server(self) -> None: + if self.server and self.server.is_running(): + messagebox.showinfo("Info", "Server läuft bereits.") + return + # Big fat warning + if not messagebox.askyesno("Warnung", WARNING_TEXT, icon="warning"): + return + + iface = self.if_var.get().strip() + ip = self.ip_var.get().strip() + mask = self.mask_var.get().strip() + if not iface: + messagebox.showerror("Fehler", "Bitte ein Netzwerk-Interface wählen.") + return + try: + ip = utils.format_ip_address(ip) + utils.validate_subnet_mask(mask) + except Exception as e: + messagebox.showerror("Ungültige Eingabe", str(e)) + return + + # --- Preflight: Root + Bind-Test (SO_BINDTODEVICE) --- + try: + _preflight_require_root_and_bind(iface) + except PermissionError as e: + self.status_var.set("Start fehlgeschlagen: Rechte.") + messagebox.showerror("Start fehlgeschlagen (Rechte)", f"{e}\n\nBitte mit sudo starten.") + return + except OSError as e: + self.status_var.set("Start fehlgeschlagen: Bind.") + messagebox.showerror("Start fehlgeschlagen (Bind)", f"Konnte Port 67 auf {iface} nicht binden:\n{e}") + return + + # --- Start --- + try: + self.server = DHCPServer() + self.server.start(iface, ip, mask) + self.status_var.set("Server gestartet.") + self._set_controls_enabled(False) + except Exception as e: + self.server = None + self.status_var.set("Start fehlgeschlagen.") + messagebox.showerror("Start fehlgeschlagen", str(e)) + + def stop_server(self) -> None: + if self.server: + self.server.stop() + self._set_controls_enabled(True) + self.status_var.set("Server gestoppt.") + + def _append_log(self, lines): + if not lines: + return + self.log.configure(state="normal") + for ln in lines: + self.log.insert("end", ln + "\n") + self.log.see("end") + self.log.configure(state="disabled") + + def _refresh(self) -> None: + # update clients + self.clients.delete(0, "end") + if self.server and self.server.is_running(): + leases = self.server.get_leases() + for mac, ip in leases.items(): + self.clients.insert("end", f"{mac} → {ip}") + self.status_var.set(f"Status: {self.server.get_status()}") + # pull logs + self._append_log(self.server.pop_logs(100)) + self.after(400, self._refresh) diff --git a/src/main.py b/src/main.py index 93f3edc..af4bc5f 100644 --- a/src/main.py +++ b/src/main.py @@ -1,69 +1,16 @@ import tkinter as tk -from tkinter import ttk -from dhcp_server import DHCPServer -import utils +from gui import DHCPApp -class DHCPApp: - def __init__(self, root): - self.root = root - self.root.title("Simple DHCP Server") - - self.server = None - - self.interface_label = tk.Label(root, text="Select Network Interface:") - self.interface_label.grid(row=0, column=0, padx=10, pady=10) - - self.interface_var = tk.StringVar() - self.interface_dropdown = ttk.Combobox(root, textvariable=self.interface_var) - self.interface_dropdown['values'] = utils.get_network_interfaces() - self.interface_dropdown.grid(row=0, column=1, padx=10, pady=10) - - self.ip_label = tk.Label(root, text="DHCP IP Range Start:") - self.ip_label.grid(row=1, column=0, padx=10, pady=10) - - self.ip_entry = tk.Entry(root) - self.ip_entry.grid(row=1, column=1, padx=10, pady=10) - - self.subnet_label = tk.Label(root, text="Subnet Mask:") - self.subnet_label.grid(row=2, column=0, padx=10, pady=10) - - self.subnet_entry = tk.Entry(root) - self.subnet_entry.grid(row=2, column=1, padx=10, pady=10) - - self.start_button = tk.Button(root, text="Start DHCP Server", command=self.start_server) - self.start_button.grid(row=3, column=0, padx=10, pady=10) - - self.stop_button = tk.Button(root, text="Stop DHCP Server", command=self.stop_server) - self.stop_button.grid(row=3, column=1, padx=10, pady=10) - - self.clients_label = tk.Label(root, text="Active Clients:") - self.clients_label.grid(row=4, column=0, padx=10, pady=10) - - self.clients_listbox = tk.Listbox(root, width=50) - self.clients_listbox.grid(row=5, column=0, columnspan=2, padx=10, pady=10) - - def start_server(self): - if not self.server: - ip_range = self.ip_entry.get() - subnet = self.subnet_entry.get() - interface = self.interface_var.get() - self.server = DHCPServer(interface, ip_range, subnet) - self.server.start() - self.update_clients() - - def stop_server(self): - if self.server: - self.server.stop() - self.server = None - self.clients_listbox.delete(0, tk.END) - - def update_clients(self): - if self.server: - self.clients_listbox.delete(0, tk.END) - for client in self.server.get_active_clients(): - self.clients_listbox.insert(tk.END, client) +def main(): + root = tk.Tk() + try: + import sv_ttk # optional pretty theme + sv_ttk.use_dark_theme() + except Exception: + pass + app = DHCPApp(root) + root.geometry("860x540") + root.mainloop() if __name__ == "__main__": - root = tk.Tk() - app = DHCPApp(root) - root.mainloop() \ No newline at end of file + main() diff --git a/src/utils.py b/src/utils.py index 91e7192..355fa0d 100644 --- a/src/utils.py +++ b/src/utils.py @@ -1,40 +1,33 @@ -def get_available_interfaces(): - import netifaces - return netifaces.interfaces() - -def format_ip_address(ip): - try: - parts = ip.split('.') - if len(parts) != 4: - raise ValueError("Invalid IP address format") - return '.'.join(str(int(part)) for part in parts) - except ValueError as e: - raise ValueError(f"Error formatting IP address: {e}") - -def validate_subnet_mask(mask): - valid_masks = [ - '255.255.255.255', '255.255.255.254', '255.255.255.252', - '255.255.255.248', '255.255.255.240', '255.255.255.224', - '255.255.255.192', '255.255.255.128', '255.255.255.0', - '255.255.254.0', '255.255.252.0', '255.255.248.0', - '255.255.240.0', '255.255.224.0', '255.255.192.0', - '255.255.128.0', '255.255.0.0', '255.254.0.0', - '255.252.0.0', '255.248.0.0', '255.240.0.0', - '255.224.0.0', '255.192.0.0', '255.128.0.0', - '255.0.0.0', '254.0.0.0', '252.0.0.0', - '248.0.0.0', '240.0.0.0', '224.0.0.0', - '192.0.0.0', '128.0.0.0', '0.0.0.0' - ] - if mask not in valid_masks: - raise ValueError("Invalid subnet mask") - import socket +import ipaddress import psutil +from typing import Optional, Tuple def get_network_interfaces(): + """Return only interfaces that have an IPv4 address to keep choices sane.""" interfaces = [] for iface, addrs in psutil.net_if_addrs().items(): for addr in addrs: if addr.family == socket.AF_INET: interfaces.append(iface) - return interfaces \ No newline at end of file + break + return sorted(interfaces) + +def get_iface_ipv4_config(iface: str) -> Tuple[Optional[str], Optional[str]]: + """Return (ipv4_addr, netmask) for interface, or (None, None) if not present.""" + addrs = psutil.net_if_addrs().get(iface, []) + for a in addrs: + if a.family == socket.AF_INET: + return a.address, a.netmask + return None, None + +def format_ip_address(ip: str) -> str: + """Normalize dotted quad; raises on invalid input.""" + ip_obj = ipaddress.ip_address(ip) + if ip_obj.version != 4: + raise ValueError("Only IPv4 is supported.") + return str(ip_obj) + +def validate_subnet_mask(mask: str) -> None: + """Accept masks like 255.255.255.0; raise ValueError if invalid.""" + ipaddress.IPv4Network(f"0.0.0.0/{mask}") diff --git a/start.sh b/start.sh old mode 100644 new mode 100755