dumped that AI-Shitt and rebuild from scratch . works now

This commit is contained in:
2025-09-11 11:31:21 +02:00
parent 6556fbf649
commit 7cf3fe2e9e
5 changed files with 598 additions and 175 deletions

View File

@@ -1,35 +1,377 @@
import os
import socket
import struct
import threading
import time
import select
from collections import deque
from typing import Dict, List, Optional, Tuple
import ipaddress
MAGIC_COOKIE = b"\x63\x82\x53\x63"
# DHCP message types
DHCPDISCOVER = 1
DHCPOFFER = 2
DHCPREQUEST = 3
DHCPDECLINE = 4
DHCPACK = 5
DHCPNAK = 6
DHCPRELEASE = 7
DHCPINFORM = 8
LEASE_SECONDS = 3600 # 1 hour leases
def mac_bytes_to_str(b: bytes) -> str:
return ":".join(f"{x:02X}" for x in b)
def parse_dhcp_options(opts: bytes) -> Dict[int, bytes]:
res: Dict[int, bytes] = {}
i = 0
while i < len(opts):
code = opts[i]
i += 1
if code == 0: # pad
continue
if code == 255: # end
break
if i >= len(opts):
break
ln = opts[i]
i += 1
val = opts[i:i+ln]
i += ln
res[code] = val
return res
def build_options(pairs: List[Tuple[int, bytes]]) -> bytes:
out = bytearray()
out += MAGIC_COOKIE
for code, val in pairs:
out += bytes([code, len(val)]) + val
out += b"\xff" # END
return bytes(out)
def ip2bytes(ip: str) -> bytes:
return socket.inet_aton(ip)
def bytes2ip(b: bytes) -> str:
return socket.inet_ntoa(b)
class DHCPServer: class DHCPServer:
def __init__(self): """
self.leases = {} Minimal functional IPv4 DHCP server for lab/testing.
self.active_clients = [] - Binds to the specified interface (SO_BINDTODEVICE on Linux).
- Listens on UDP/67, answers DISCOVER with OFFER and REQUEST with ACK.
- Very simple lease pool carved from start_ip within the interface's subnet.
- No persistence; in-memory leases.
"""
def start(self, interface, ip_range, subnet): def __init__(self) -> None:
# Start the DHCP server on the specified interface self._thread: Optional[threading.Thread] = None
pass self._stop_evt = threading.Event()
self._lock = threading.Lock()
def stop(self): self.interface: Optional[str] = None
# Stop the DHCP server self.ip_start: Optional[str] = None
pass self.subnet_mask: Optional[str] = None
self.server_ip: Optional[str] = None # inferred from interface IP
def handle_request(self, request): self._leases: Dict[str, Tuple[str, float]] = {} # MAC -> (IP, expiry_ts)
# Handle incoming DHCP requests self._active_clients: List[str] = [] # list of MACs
pass self._sock: Optional[socket.socket] = None
def add_lease(self, client_ip, client_mac): self._pool: List[str] = [] # IP pool
# Add a new lease for a client self._log = deque(maxlen=500)
self.leases[client_mac] = client_ip self._status = "idle"
self.active_clients.append(client_mac)
def remove_lease(self, client_mac): # -------------------- Public API --------------------
# Remove a lease for a client def start(self, interface: str, start_ip: str, mask: str) -> None:
if client_mac in self.leases: if self.is_running():
del self.leases[client_mac] raise RuntimeError("Server is already running.")
self.active_clients.remove(client_mac) self.interface = interface
self.ip_start = start_ip
self.subnet_mask = mask
def get_active_clients(self): # build pool based on interface network
# Return a list of active clients self.server_ip = start_ip # We use the provided IP as "server identifier"; typically you'd use iface IP
return self.active_clients self._pool = self._build_pool(start_ip, mask)
def get_leases(self): self._stop_evt.clear()
# Return the current leases self._thread = threading.Thread(target=self._run_loop, name="DHCPWorker", daemon=True)
return self.leases self._thread.start()
def stop(self) -> None:
if not self.is_running():
return
self._stop_evt.set()
if self._sock:
try:
self._sock.close()
except Exception:
pass
if self._thread and self._thread.is_alive():
self._thread.join(timeout=3.0)
self._thread = None
self._sock = None
self._set_status("stopped")
self._log_line("Server stopped.")
def is_running(self) -> bool:
return self._thread is not None and self._thread.is_alive() and not self._stop_evt.is_set()
def get_active_clients(self) -> List[str]:
with self._lock:
return list(self._active_clients)
def get_leases(self) -> Dict[str, str]:
with self._lock:
return {mac: ip for mac, (ip, _) in self._leases.items()}
def get_status(self) -> str:
return self._status
def pop_logs(self, max_items: int = 100) -> List[str]:
items: List[str] = []
with self._lock:
while self._log and len(items) < max_items:
items.append(self._log.popleft())
return items
# -------------------- Internals --------------------
def _run_loop(self) -> None:
try:
self._set_status("starting")
self._log_line(f"Starting on '{self.interface}' with start {self.ip_start}, mask {self.subnet_mask}")
self._sock = self._open_socket(self.interface)
self._set_status("running")
self._log_line("Listening on UDP/67. WARNING: Requires root privileges.")
while not self._stop_evt.is_set():
r, _, _ = select.select([self._sock], [], [], 0.5)
if not r:
self._expire_leases()
continue
try:
data, addr = self._sock.recvfrom(4096)
except OSError:
break
self._handle_packet(data, addr)
except PermissionError as e:
self._set_status("error")
self._log_line(f"Permission error (need root?): {e}")
except Exception as e:
self._set_status("error")
self._log_line(f"ERROR: {e!r}")
finally:
try:
if self._sock:
self._sock.close()
except Exception:
pass
def _open_socket(self, iface: str) -> socket.socket:
# Basic root check; binding will fail anyway if not root
if hasattr(os, "geteuid") and os.geteuid() != 0:
raise PermissionError("Root privileges required to bind DHCP server port 67.")
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Make sure we can reuse
try:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
except Exception:
pass
try:
SO_REUSEPORT = getattr(socket, "SO_REUSEPORT", None)
if SO_REUSEPORT is not None:
s.setsockopt(socket.SOL_SOCKET, SO_REUSEPORT, 1)
except Exception:
pass
try:
s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
except Exception:
pass
# Bind to the specific device (Linux)
try:
SO_BINDTODEVICE = 25
s.setsockopt(socket.SOL_SOCKET, SO_BINDTODEVICE, iface.encode())
self._log_line(f"SO_BINDTODEVICE set to {iface}")
except Exception as e:
# If not available, better abort to avoid affecting other interfaces
s.close()
raise PermissionError("SO_BINDTODEVICE not available; cannot restrict to selected interface.") from e
# Bind to server port
s.bind(("0.0.0.0", 67))
return s
def _expire_leases(self) -> None:
now = time.time()
with self._lock:
expired = [mac for mac, (_, t) in self._leases.items() if t <= now]
for mac in expired:
self._leases.pop(mac, None)
if mac in self._active_clients:
self._active_clients.remove(mac)
def _build_pool(self, start_ip: str, mask: str) -> List[str]:
# Build pool from the network containing start_ip with provided mask
net = ipaddress.IPv4Network((start_ip, mask), strict=False)
all_hosts = list(net.hosts())
# Begin at start_ip position (or nearest host) and go upwards, excluding start_ip itself for server identity
try:
start_idx = all_hosts.index(ipaddress.IPv4Address(start_ip))
except ValueError:
# pick the first host >= start_ip, else 0
start_idx = 0
for i, h in enumerate(all_hosts):
if int(h) >= int(ipaddress.IPv4Address(start_ip)):
start_idx = i
break
pool = [str(h) for h in all_hosts[start_idx:]]
# Ensure server_ip isn't handed out
if start_ip in pool:
pool.remove(start_ip)
return pool
def _next_free_ip(self) -> Optional[str]:
with self._lock:
used = {ip for ip, _ in self._leases.values()}
for ip in self._pool:
if ip not in used:
return ip
return None
def _handle_packet(self, data: bytes, addr) -> None:
# Basic BOOTP header length check (fixed 236 + cookie 4 -> options after)
if len(data) < 240:
self._log_line("Drop: packet too short")
return
op, htype, hlen, hops, xid, secs, flags = struct.unpack("!BBBBIHH", data[:12])
ciaddr = data[12:16]
yiaddr = data[16:20]
siaddr = data[20:24]
giaddr = data[24:28]
chaddr = data[28:44] # 16 bytes
# sname: data[44:108]; file: data[108:236]
# Options
cookie = data[236:240]
if cookie != MAGIC_COOKIE:
self._log_line("Drop: bad magic cookie")
return
opts = parse_dhcp_options(data[240:])
mac = mac_bytes_to_str(chaddr[:hlen])
msgtype = opts.get(53, b"\x00")
m = msgtype[0] if msgtype else 0
self._log_line(f"RX {len(data)}B from {addr[0]}:{addr[1]} op={op} mac={mac} type={m}")
if m == DHCPDISCOVER:
self._dhcp_offer(mac, xid, giaddr)
elif m == DHCPREQUEST:
# honor requested IP (opt 50) if possible, else next free
req_ip = opts.get(50, None)
ipstr = bytes2ip(req_ip) if req_ip and len(req_ip) == 4 else None
self._dhcp_ack(mac, xid, giaddr, requested_ip=ipstr)
elif m == DHCPRELEASE:
with self._lock:
self._leases.pop(mac, None)
if mac in self._active_clients:
self._active_clients.remove(mac)
self._log_line(f"RELEASE {mac}")
else:
# ignore other types in this minimal server
pass
def _dhcp_offer(self, mac: str, xid: int, giaddr: bytes) -> None:
ip = self._next_free_ip()
if not ip:
self._log_line("No free IP to offer.")
return
self._log_line(f"OFFER {ip} to {mac}")
pkt = self._build_reply_packet(
msg_type=DHCPOFFER, mac=mac, xid=xid, yiaddr=ip, giaddr=giaddr
)
self._send_broadcast(pkt)
def _dhcp_ack(self, mac: str, xid: int, giaddr: bytes, requested_ip: Optional[str]) -> None:
# If requested IP is available or already leased to this MAC, use it; else choose next free
with self._lock:
current = self._leases.get(mac, (None, 0))[0]
ip = requested_ip or current or self._next_free_ip()
if not ip:
self._log_line("No IP available for ACK.")
return
# Register/renew lease
expiry = time.time() + LEASE_SECONDS
with self._lock:
self._leases[mac] = (ip, expiry)
if mac not in self._active_clients:
self._active_clients.append(mac)
self._log_line(f"ACK {ip} to {mac}")
pkt = self._build_reply_packet(
msg_type=DHCPACK, mac=mac, xid=xid, yiaddr=ip, giaddr=giaddr
)
self._send_broadcast(pkt)
def _build_reply_packet(self, msg_type: int, mac: str, xid: int, yiaddr: str, giaddr: bytes) -> bytes:
# BOOTP header
op = 2 # reply
htype = 1
hlen = 6
hops = 0
secs = 0
flags = 0x0000 # could set broadcast flag if desired
ciaddr = b"\x00\x00\x00\x00"
yiaddr_b = ip2bytes(yiaddr)
siaddr = ip2bytes(self.server_ip or "0.0.0.0")
giaddr_b = giaddr if len(giaddr) == 4 else b"\x00\x00\x00\x00"
chaddr = bytes.fromhex(mac.replace(":", "")) + b"\x00" * 10 # pad to 16
sname = b"\x00" * 64
filef = b"\x00" * 128
hdr = struct.pack("!BBBBIHH4s4s4s4s16s64s128s",
op, htype, hlen, hops, xid, secs, flags,
ciaddr, yiaddr_b, siaddr, giaddr_b, chaddr, sname, filef)
# Options: message type, server id, subnet mask, router (server), lease time
options = [
(53, bytes([msg_type])),
(54, siaddr), # server identifier
]
if self.subnet_mask:
options.append((1, ip2bytes(self.subnet_mask))) # subnet mask
if self.server_ip:
options.append((3, ip2bytes(self.server_ip))) # router
options.append((6, ip2bytes(self.server_ip))) # DNS (for lab convenience)
options.append((51, struct.pack("!I", LEASE_SECONDS))) # lease time
return hdr + build_options(options)
def _send_broadcast(self, pkt: bytes) -> None:
if not self._sock:
return
try:
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
except Exception:
pass
self._sock.sendto(pkt, ("255.255.255.255", 68))
def _log_line(self, msg: str) -> None:
with self._lock:
self._log.append(time.strftime("[%H:%M:%S] ") + msg)
def _set_status(self, st: str) -> None:
self._status = st

View File

@@ -1,68 +1,209 @@
from tkinter import Tk, Label, Button, Entry, StringVar, OptionMenu, Listbox, END import os
import psutil import socket
import tkinter as tk
from tkinter import messagebox, ttk
from typing import Optional
from dhcp_server import DHCPServer from dhcp_server import DHCPServer
import utils
class DHCPApp:
def __init__(self, master):
self.master = master
master.title("DHCP Server")
self.interface_label = Label(master, text="Select Network Interface:") WARNING_TEXT = (
self.interface_label.pack() "ACHTUNG!\n\n"
"Ein DHCP-Server im falschen Netz kann massive Störungen verursachen "
"(IP-Konflikte, Netzwerkausfall, heilloses Chaos). "
"Stelle sicher, dass du auf einem isolierten Testnetz oder dem "
"korrekten Interface arbeitest.\n\n"
"Willst du den Server wirklich starten?"
)
self.interface_var = StringVar(master)
self.interfaces = self.get_network_interfaces()
self.interface_menu = OptionMenu(master, self.interface_var, *self.interfaces)
self.interface_menu.pack()
self.ip_label = Label(master, text="IP Address:") def _preflight_require_root_and_bind(iface: str) -> None:
self.ip_label.pack() # Root-Check (nur auf Unix relevant; Windows hat kein geteuid)
self.ip_entry = Entry(master) if hasattr(os, "geteuid") and os.geteuid() != 0:
self.ip_entry.pack() raise PermissionError("Root-Rechte erforderlich (Port 67).")
self.subnet_label = Label(master, text="Subnet Mask:") # Bind-Test
self.subnet_label.pack() s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.subnet_entry = Entry(master) try:
self.subnet_entry.pack() try:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
SO_REUSEPORT = getattr(socket, "SO_REUSEPORT", None)
if SO_REUSEPORT is not None:
s.setsockopt(socket.SOL_SOCKET, SO_REUSEPORT, 1)
s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
except Exception:
pass
self.start_button = Button(master, text="Start DHCP Server", command=self.start_dhcp_server) # strikt an Interface binden
self.start_button.pack() try:
SO_BINDTODEVICE = 25
s.setsockopt(socket.SOL_SOCKET, SO_BINDTODEVICE, iface.encode())
except Exception as e:
raise PermissionError("SO_BINDTODEVICE nicht verfügbar kann nicht exklusiv an Interface binden.") from e
self.stop_button = Button(master, text="Stop DHCP Server", command=self.stop_dhcp_server) # Port 67 testen
self.stop_button.pack() s.bind(("0.0.0.0", 67))
finally:
try:
s.close()
except Exception:
pass
self.clients_label = Label(master, text="Active Clients:")
self.clients_label.pack()
self.clients_listbox = Listbox(master) class DHCPApp(tk.Frame):
self.clients_listbox.pack() def __init__(self, master: tk.Tk) -> None:
super().__init__(master)
master.title("Simple DHCP Server (Lab)")
master.minsize(680, 460)
self.pack(fill="both", expand=True, padx=12, pady=12)
self.dhcp_server = None # Single server instance
self.server: Optional[DHCPServer] = None
def get_network_interfaces(self): # --- Top form ---
return psutil.net_if_addrs().keys() form = ttk.Frame(self)
form.pack(fill="x", pady=(0, 8))
def start_dhcp_server(self): ttk.Label(form, text="Interface:").grid(row=0, column=0, sticky="w")
ip = self.ip_entry.get() self.if_var = tk.StringVar()
subnet = self.subnet_entry.get() self.if_menu = ttk.OptionMenu(form, self.if_var, None, *utils.get_network_interfaces(), command=self.on_iface_change)
interface = self.interface_var.get() self.if_menu.grid(row=0, column=1, sticky="ew", padx=(6, 16))
self.dhcp_server = DHCPServer(interface, ip, subnet)
self.dhcp_server.start()
self.update_clients_list()
def stop_dhcp_server(self): ttk.Label(form, text="Server/Start-IP:").grid(row=0, column=2, sticky="w")
if self.dhcp_server: self.ip_var = tk.StringVar(value="")
self.dhcp_server.stop() self.ip_entry = ttk.Entry(form, textvariable=self.ip_var, width=16)
self.dhcp_server = None self.ip_entry.grid(row=0, column=3, sticky="w", padx=(6, 16))
self.clients_listbox.delete(0, END)
def update_clients_list(self): ttk.Label(form, text="Subnetzmaske:").grid(row=0, column=4, sticky="w")
if self.dhcp_server: self.mask_var = tk.StringVar(value="")
self.clients_listbox.delete(0, END) self.mask_entry = ttk.Entry(form, textvariable=self.mask_var, width=16)
for client in self.dhcp_server.get_active_clients(): self.mask_entry.grid(row=0, column=5, sticky="w", padx=(6, 16))
self.clients_listbox.insert(END, client)
if __name__ == "__main__": form.columnconfigure(1, weight=1)
root = Tk()
app = DHCPApp(root) # --- Buttons ---
root.mainloop() btns = ttk.Frame(self)
btns.pack(fill="x", pady=(0, 8))
self.btn_start = ttk.Button(btns, text="Start", command=self.start_server)
self.btn_stop = ttk.Button(btns, text="Stop", command=self.stop_server, state="disabled")
self.btn_start.pack(side="left")
self.btn_stop.pack(side="left", padx=(8, 0))
# --- Clients & Logs Paned Layout ---
paned = ttk.Panedwindow(self, orient="horizontal")
paned.pack(fill="both", expand=True)
# Clients list
left = ttk.Labelframe(paned, text="Aktive Clients / Leases")
self.clients = tk.Listbox(left, height=12)
self.clients.pack(fill="both", expand=True, padx=6, pady=6)
paned.add(left, weight=1)
# Logs
right = ttk.Labelframe(paned, text="Log")
self.log = tk.Text(right, height=12, state="disabled")
self.log.pack(fill="both", expand=True, padx=6, pady=6)
paned.add(right, weight=2)
# --- Status bar ---
self.status_var = tk.StringVar(value="Bereit.")
status = ttk.Label(self, textvariable=self.status_var, anchor="w", relief="sunken")
status.pack(fill="x", side="bottom")
# initial iface autofill (if any interface exists)
ifaces = utils.get_network_interfaces()
if ifaces:
self.if_var.set(ifaces[0])
self.on_iface_change(ifaces[0])
# periodic refresh
self.after(400, self._refresh)
# -------------------- UI logic --------------------
def on_iface_change(self, _sel=None):
iface = self.if_var.get().strip()
ip, mask = utils.get_iface_ipv4_config(iface)
if ip:
self.ip_var.set(ip)
if mask:
self.mask_var.set(mask)
def _set_controls_enabled(self, enabled: bool):
state = "normal" if enabled else "disabled"
self.if_menu.configure(state=state)
self.ip_entry.configure(state=state)
self.mask_entry.configure(state=state)
self.btn_start.configure(state="normal" if enabled else "disabled")
self.btn_stop.configure(state="disabled" if enabled else "normal")
def start_server(self) -> None:
if self.server and self.server.is_running():
messagebox.showinfo("Info", "Server läuft bereits.")
return
# Big fat warning
if not messagebox.askyesno("Warnung", WARNING_TEXT, icon="warning"):
return
iface = self.if_var.get().strip()
ip = self.ip_var.get().strip()
mask = self.mask_var.get().strip()
if not iface:
messagebox.showerror("Fehler", "Bitte ein Netzwerk-Interface wählen.")
return
try:
ip = utils.format_ip_address(ip)
utils.validate_subnet_mask(mask)
except Exception as e:
messagebox.showerror("Ungültige Eingabe", str(e))
return
# --- Preflight: Root + Bind-Test (SO_BINDTODEVICE) ---
try:
_preflight_require_root_and_bind(iface)
except PermissionError as e:
self.status_var.set("Start fehlgeschlagen: Rechte.")
messagebox.showerror("Start fehlgeschlagen (Rechte)", f"{e}\n\nBitte mit sudo starten.")
return
except OSError as e:
self.status_var.set("Start fehlgeschlagen: Bind.")
messagebox.showerror("Start fehlgeschlagen (Bind)", f"Konnte Port 67 auf {iface} nicht binden:\n{e}")
return
# --- Start ---
try:
self.server = DHCPServer()
self.server.start(iface, ip, mask)
self.status_var.set("Server gestartet.")
self._set_controls_enabled(False)
except Exception as e:
self.server = None
self.status_var.set("Start fehlgeschlagen.")
messagebox.showerror("Start fehlgeschlagen", str(e))
def stop_server(self) -> None:
if self.server:
self.server.stop()
self._set_controls_enabled(True)
self.status_var.set("Server gestoppt.")
def _append_log(self, lines):
if not lines:
return
self.log.configure(state="normal")
for ln in lines:
self.log.insert("end", ln + "\n")
self.log.see("end")
self.log.configure(state="disabled")
def _refresh(self) -> None:
# update clients
self.clients.delete(0, "end")
if self.server and self.server.is_running():
leases = self.server.get_leases()
for mac, ip in leases.items():
self.clients.insert("end", f"{mac}{ip}")
self.status_var.set(f"Status: {self.server.get_status()}")
# pull logs
self._append_log(self.server.pop_logs(100))
self.after(400, self._refresh)

View File

@@ -1,69 +1,16 @@
import tkinter as tk import tkinter as tk
from tkinter import ttk from gui import DHCPApp
from dhcp_server import DHCPServer
import utils
class DHCPApp: def main():
def __init__(self, root): root = tk.Tk()
self.root = root try:
self.root.title("Simple DHCP Server") import sv_ttk # optional pretty theme
sv_ttk.use_dark_theme()
self.server = None except Exception:
pass
self.interface_label = tk.Label(root, text="Select Network Interface:") app = DHCPApp(root)
self.interface_label.grid(row=0, column=0, padx=10, pady=10) root.geometry("860x540")
root.mainloop()
self.interface_var = tk.StringVar()
self.interface_dropdown = ttk.Combobox(root, textvariable=self.interface_var)
self.interface_dropdown['values'] = utils.get_network_interfaces()
self.interface_dropdown.grid(row=0, column=1, padx=10, pady=10)
self.ip_label = tk.Label(root, text="DHCP IP Range Start:")
self.ip_label.grid(row=1, column=0, padx=10, pady=10)
self.ip_entry = tk.Entry(root)
self.ip_entry.grid(row=1, column=1, padx=10, pady=10)
self.subnet_label = tk.Label(root, text="Subnet Mask:")
self.subnet_label.grid(row=2, column=0, padx=10, pady=10)
self.subnet_entry = tk.Entry(root)
self.subnet_entry.grid(row=2, column=1, padx=10, pady=10)
self.start_button = tk.Button(root, text="Start DHCP Server", command=self.start_server)
self.start_button.grid(row=3, column=0, padx=10, pady=10)
self.stop_button = tk.Button(root, text="Stop DHCP Server", command=self.stop_server)
self.stop_button.grid(row=3, column=1, padx=10, pady=10)
self.clients_label = tk.Label(root, text="Active Clients:")
self.clients_label.grid(row=4, column=0, padx=10, pady=10)
self.clients_listbox = tk.Listbox(root, width=50)
self.clients_listbox.grid(row=5, column=0, columnspan=2, padx=10, pady=10)
def start_server(self):
if not self.server:
ip_range = self.ip_entry.get()
subnet = self.subnet_entry.get()
interface = self.interface_var.get()
self.server = DHCPServer(interface, ip_range, subnet)
self.server.start()
self.update_clients()
def stop_server(self):
if self.server:
self.server.stop()
self.server = None
self.clients_listbox.delete(0, tk.END)
def update_clients(self):
if self.server:
self.clients_listbox.delete(0, tk.END)
for client in self.server.get_active_clients():
self.clients_listbox.insert(tk.END, client)
if __name__ == "__main__": if __name__ == "__main__":
root = tk.Tk() main()
app = DHCPApp(root)
root.mainloop()

View File

@@ -1,40 +1,33 @@
def get_available_interfaces():
import netifaces
return netifaces.interfaces()
def format_ip_address(ip):
try:
parts = ip.split('.')
if len(parts) != 4:
raise ValueError("Invalid IP address format")
return '.'.join(str(int(part)) for part in parts)
except ValueError as e:
raise ValueError(f"Error formatting IP address: {e}")
def validate_subnet_mask(mask):
valid_masks = [
'255.255.255.255', '255.255.255.254', '255.255.255.252',
'255.255.255.248', '255.255.255.240', '255.255.255.224',
'255.255.255.192', '255.255.255.128', '255.255.255.0',
'255.255.254.0', '255.255.252.0', '255.255.248.0',
'255.255.240.0', '255.255.224.0', '255.255.192.0',
'255.255.128.0', '255.255.0.0', '255.254.0.0',
'255.252.0.0', '255.248.0.0', '255.240.0.0',
'255.224.0.0', '255.192.0.0', '255.128.0.0',
'255.0.0.0', '254.0.0.0', '252.0.0.0',
'248.0.0.0', '240.0.0.0', '224.0.0.0',
'192.0.0.0', '128.0.0.0', '0.0.0.0'
]
if mask not in valid_masks:
raise ValueError("Invalid subnet mask")
import socket import socket
import ipaddress
import psutil import psutil
from typing import Optional, Tuple
def get_network_interfaces(): def get_network_interfaces():
"""Return only interfaces that have an IPv4 address to keep choices sane."""
interfaces = [] interfaces = []
for iface, addrs in psutil.net_if_addrs().items(): for iface, addrs in psutil.net_if_addrs().items():
for addr in addrs: for addr in addrs:
if addr.family == socket.AF_INET: if addr.family == socket.AF_INET:
interfaces.append(iface) interfaces.append(iface)
return interfaces break
return sorted(interfaces)
def get_iface_ipv4_config(iface: str) -> Tuple[Optional[str], Optional[str]]:
"""Return (ipv4_addr, netmask) for interface, or (None, None) if not present."""
addrs = psutil.net_if_addrs().get(iface, [])
for a in addrs:
if a.family == socket.AF_INET:
return a.address, a.netmask
return None, None
def format_ip_address(ip: str) -> str:
"""Normalize dotted quad; raises on invalid input."""
ip_obj = ipaddress.ip_address(ip)
if ip_obj.version != 4:
raise ValueError("Only IPv4 is supported.")
return str(ip_obj)
def validate_subnet_mask(mask: str) -> None:
"""Accept masks like 255.255.255.0; raise ValueError if invalid."""
ipaddress.IPv4Network(f"0.0.0.0/{mask}")

0
start.sh Normal file → Executable file
View File