# Project layout (drop-in) # # app/ # ├─ gui.py ← new main GUI with Simulator tabs + Save/Load # ├─ config.py (unchanged) # ├─ can.py (unchanged) # ├─ obd2.py (unchanged; GUI registers PIDs) # ├─ tabs/ # │ ├─ __init__.py # │ ├─ basic.py ← base/vehicle tab (ignition, mass, type, ABS/TCS) # │ ├─ engine.py ← engine tab # │ ├─ gearbox.py ← gearbox tab # │ └─ dtc.py ← DTC toggles tab # └─ simulation/ # ├─ __init__.py # ├─ simulator_main.py ← VehicleSimulator wrapper used by GUI # ├─ vehicle.py ← core state + module orchestration # └─ modules/ # ├─ __init__.py # ├─ engine.py # ├─ gearbox.py # └─ abs_.py # ============================= # app/gui.py # ============================= from __future__ import annotations import json import threading import time import tkinter as tk from tkinter import ttk, messagebox, filedialog from collections import deque import subprocess import can # for trace from .config import load_settings, setup_logging from .obd2 import ObdResponder, make_speed_response, make_rpm_response from .can import ( list_can_ifaces, link_up, link_down, link_state, link_kind, have_cap_netadmin ) # Simulator pieces from .simulation.simulator_main import VehicleSimulator from .tabs.basic import BasicTab from .tabs.engine import EngineTab from .tabs.gearbox import GearboxTab from .tabs.dtc import DtcTab from .tabs.dashboard import DashboardTab # ---------- CAN Trace Collector ---------- class TraceCollector: def __init__(self, channel: str): self.channel = channel self.bus = None self._run = threading.Event(); self._run.set() self._thread = threading.Thread(target=self._rx_loop, name="CAN-TRACE", daemon=True) self.stream_buffer = deque(maxlen=2000) self.lock = threading.Lock() def _open(self): self._close() self.bus = can.interface.Bus(channel=self.channel, interface="socketcan") def _close(self): try: if self.bus: self.bus.shutdown() except Exception: pass self.bus = None def start(self): self._thread.start() def stop(self): self._run.clear() try: self._thread.join(timeout=1.0) except RuntimeError: pass self._close() def _rx_loop(self): backoff = 0.5 while self._run.is_set(): if self.bus is None: if link_state(self.channel) == "UP": try: self._open(); backoff = 0.5 except Exception: time.sleep(backoff); backoff = min(5.0, backoff*1.7) continue else: time.sleep(0.5); continue try: msg = self.bus.recv(0.05) if msg and not msg.is_error_frame and not msg.is_remote_frame: ts = time.time() with self.lock: self.stream_buffer.append((ts, msg.arbitration_id, msg.dlc, bytes(msg.data))) except (can.CanOperationError, OSError): self._close(); time.sleep(0.5) except Exception: time.sleep(0.05) def snapshot_stream(self): with self.lock: return list(self.stream_buffer) # ============================= # GUI Launcher (reworked layout) # ============================= def launch_gui(): cfg = load_settings() logger = setup_logging(cfg) # Config can_iface = (cfg.get("can", {}).get("interface")) or "can0" resp_id_raw = (cfg.get("can", {}).get("resp_id")) or "0x7E8" try: resp_id = int(resp_id_raw, 16) if isinstance(resp_id_raw, str) else int(resp_id_raw) except Exception: resp_id = 0x7E8 timeout_ms = cfg.get("can", {}).get("timeout_ms", 200) bitrate = cfg.get("can", {}).get("baudrate", 500000) # Simulator sim = VehicleSimulator() # OBD2 responder responder = ObdResponder(interface=can_iface, resp_id=resp_id, timeout_ms=timeout_ms, logger=logger) responder.register_pid(0x0D, lambda: make_speed_response(int(round(sim.snapshot()["speed_kmh"])))) responder.register_pid(0x0C, lambda: make_rpm_response(int(sim.snapshot()["rpm"]))) # Physics thread running = True def physics_loop(): last = time.monotonic() while running: now = time.monotonic() dt = min(0.05, max(0.0, now - last)) last = now sim.update(dt) time.sleep(0.02) threading.Thread(target=physics_loop, daemon=True).start() tracer = TraceCollector(can_iface); tracer.start() # Tk window root = tk.Tk(); root.title("OBD-II ECU Simulator – SocketCAN") root.geometry(f"{cfg.get('ui',{}).get('window',{}).get('width',1100)}x{cfg.get('ui',{}).get('window',{}).get('height',720)}") family = cfg.get("ui", {}).get("font_family", "TkDefaultFont") size = int(cfg.get("ui", {}).get("font_size", 10)) style = ttk.Style() style.configure("TLabel", font=(family, size)) style.configure("Header.TLabel", font=(family, size+2, "bold")) style.configure("Small.TLabel", font=(family, max(8, size-1))) # Menu (Load/Save config) menubar = tk.Menu(root) filemenu = tk.Menu(menubar, tearoff=0) def action_load(): path = filedialog.askopenfilename(filetypes=[("JSON", "*.json"), ("All", "*.*")]) if not path: return try: with open(path, "r", encoding="utf-8") as f: data = json.load(f) for tab in sim_tabs: tab.load_from_config(data) sim.load_config(data) messagebox.showinfo("Simulator", "Konfiguration geladen.") except Exception as e: messagebox.showerror("Laden fehlgeschlagen", str(e)) def action_save(): cfg_dict = sim.export_config() for tab in sim_tabs: tab.save_into_config(cfg_dict) path = filedialog.asksaveasfilename(defaultextension=".json", filetypes=[("JSON", "*.json"), ("All", "*.*")]) if not path: return try: with open(path, "w", encoding="utf-8") as f: json.dump(cfg_dict, f, indent=2) messagebox.showinfo("Simulator", "Konfiguration gespeichert.") except Exception as e: messagebox.showerror("Speichern fehlgeschlagen", str(e)) filemenu.add_command(label="Konfiguration laden…", command=action_load) filemenu.add_command(label="Konfiguration speichern…", command=action_save) filemenu.add_separator(); filemenu.add_command(label="Beenden", command=root.destroy) menubar.add_cascade(label="Datei", menu=filemenu) root.config(menu=menubar) # ===== New Layout ====================================================== # Grid with two rows: # Row 0: Left = CAN settings, Right = Simulator tabs # Row 1: Trace spanning both columns # ====================================================================== root.columnconfigure(0, weight=1) root.columnconfigure(1, weight=2) root.rowconfigure(1, weight=1) # trace grows # --- LEFT: CAN Settings ------------------------------------------------ can_frame = ttk.LabelFrame(root, text="CAN & Settings", padding=8) can_frame.grid(row=0, column=0, sticky="nsew", padx=(8,4), pady=(8,4)) for i in range(2): can_frame.columnconfigure(i, weight=1) ttk.Label(can_frame, text="Interface").grid(row=0, column=0, sticky="w") iface_var = tk.StringVar(value=can_iface) iface_list = list_can_ifaces() or [can_iface] iface_dd = ttk.Combobox(can_frame, textvariable=iface_var, values=iface_list, state="readonly", width=12) iface_dd.grid(row=0, column=1, sticky="ew", padx=(6,0)) def refresh_ifaces(): lst = list_can_ifaces() if not lst: messagebox.showwarning("Interfaces", "Keine can*/vcan* Interfaces gefunden.") return iface_dd.config(values=lst) ttk.Button(can_frame, text="Refresh", command=refresh_ifaces).grid(row=0, column=2, padx=(6,0)) ttk.Label(can_frame, text="RESP-ID (hex)").grid(row=1, column=0, sticky="w") resp_var = tk.StringVar(value=f"0x{resp_id:03X}") ttk.Entry(can_frame, textvariable=resp_var, width=10).grid(row=1, column=1, sticky="w", padx=(6,0)) ttk.Label(can_frame, text="Timeout (ms)").grid(row=2, column=0, sticky="w") to_var = tk.IntVar(value=int(timeout_ms)) ttk.Spinbox(can_frame, from_=10, to=5000, increment=10, textvariable=to_var, width=8).grid(row=2, column=1, sticky="w", padx=(6,0)) ttk.Label(can_frame, text="Bitrate").grid(row=3, column=0, sticky="w") br_var = tk.IntVar(value=int(bitrate)) ttk.Spinbox(can_frame, from_=20000, to=1000000, increment=10000, textvariable=br_var, width=10).grid(row=3, column=1, sticky="w", padx=(6,0)) set_params = tk.BooleanVar(value=True) ttk.Checkbutton(can_frame, text="Bitrate beim UP setzen", variable=set_params).grid(row=3, column=2, sticky="w") kind_label = ttk.Label(can_frame, text=f"Kind: {link_kind(can_iface)}", style="Small.TLabel") kind_label.grid(row=4, column=0, columnspan=3, sticky="w", pady=(4,0)) # Buttons row btns = ttk.Frame(can_frame) btns.grid(row=5, column=0, columnspan=3, sticky="ew", pady=(8,0)) btns.columnconfigure(0, weight=0) btns.columnconfigure(1, weight=0) btns.columnconfigure(2, weight=1) def do_link_up(): try: kind_label.config(text=f"Kind: {link_kind(iface_var.get())}") if link_state(iface_var.get()) == "UP": messagebox.showinfo("CAN", f"{iface_var.get()} ist bereits UP"); return link_up(iface_var.get(), bitrate=br_var.get(), fd=False, set_params=set_params.get()) try: out = subprocess.check_output(["ip", "-details", "-json", "link", "show", iface_var.get()], text=True) info = json.loads(out)[0] bt = (info.get("linkinfo", {}) or {}).get("info_data", {}).get("bittiming") or {} br = bt.get("bitrate"); sp = bt.get("sample-point") if br: messagebox.showinfo("CAN", f"{iface_var.get()} ist UP @ {br} bit/s (sample-point {sp})") except Exception: pass except Exception as e: messagebox.showerror("CAN", f"Link UP fehlgeschlagen:{e}") def do_link_down(): try: if link_state(iface_var.get()) == "DOWN": messagebox.showinfo("CAN", f"{iface_var.get()} ist bereits DOWN"); return link_down(iface_var.get()); messagebox.showinfo("CAN", f"{iface_var.get()} ist DOWN") except Exception as e: messagebox.showerror("CAN", f"Link DOWN fehlgeschlagen:{e}") ttk.Button(btns, text="Link UP", command=do_link_up).grid(row=0, column=0, sticky="w") ttk.Button(btns, text="Link DOWN", command=do_link_down).grid(row=0, column=1, sticky="w", padx=(6,0)) def do_rebind(): nonlocal can_iface, resp_id, timeout_ms, bitrate, tracer can_iface = iface_var.get() try: new_resp = int(resp_var.get(), 16) except Exception: messagebox.showerror("RESP-ID", "Bitte gültige Hex-Zahl, z.B. 0x7E8"); return resp_id = new_resp; timeout_ms = to_var.get(); bitrate = br_var.get() try: responder.rebind(interface=can_iface, resp_id=resp_id) tracer.stop(); tracer = TraceCollector(can_iface); tracer.start() messagebox.showinfo("CAN", f"Responder neu gebunden: {can_iface}, RESP 0x{resp_id:03X}") except Exception as e: messagebox.showerror("CAN", f"Rebind fehlgeschlagen:{e}") ttk.Button(btns, text="Responder Rebind", command=do_rebind).grid(row=0, column=2, sticky="w", padx=(12,0)) ttk.Label(can_frame, text=f"CAP_NET_ADMIN: {'yes' if have_cap_netadmin() else 'no'}", style="Small.TLabel")\ .grid(row=6, column=0, columnspan=3, sticky="w", pady=(6,0)) # --- RIGHT: Simulator Tabs -------------------------------------------- right = ttk.Frame(root) right.grid(row=0, column=1, sticky="nsew", padx=(4,8), pady=(8,4)) right.columnconfigure(0, weight=1) right.rowconfigure(0, weight=1) nb_sim = ttk.Notebook(right) nb_sim.grid(row=0, column=0, sticky="nsew") basics_tab = BasicTab(nb_sim, sim) engine_tab = EngineTab(nb_sim, sim) gearbox_tab = GearboxTab(nb_sim, sim) dtc_tab = DtcTab(nb_sim, sim) dashboard_tab = DashboardTab(nb_sim, sim) sim_tabs = [basics_tab, engine_tab, gearbox_tab, dtc_tab, dashboard_tab] nb_sim.add(basics_tab.frame, text="Basisdaten") nb_sim.add(engine_tab.frame, text="Motor") nb_sim.add(gearbox_tab.frame, text="Getriebe") nb_sim.add(dtc_tab.frame, text="DTCs") nb_sim.add(dashboard_tab.frame, text="Dashboard") # --- BOTTOM: Trace (spans both columns) ------------------------------- trace_frame = ttk.LabelFrame(root, text="CAN Trace", padding=6) trace_frame.grid(row=1, column=0, columnspan=2, sticky="nsew", padx=8, pady=(0,8)) trace_frame.columnconfigure(0, weight=1) trace_frame.rowconfigure(1, weight=1) ctrl = ttk.Frame(trace_frame) ctrl.grid(row=0, column=0, sticky="ew", pady=(0,4)) ctrl.columnconfigure(5, weight=1) mode_var = tk.StringVar(value="stream") ttk.Label(ctrl, text="Modus:").grid(row=0, column=0, sticky="w") ttk.Combobox(ctrl, textvariable=mode_var, state="readonly", width=10, values=["stream","aggregate"])\ .grid(row=0, column=1, sticky="w", padx=(4,12)) paused = tk.BooleanVar(value=False) ttk.Checkbutton(ctrl, text="Pause", variable=paused).grid(row=0, column=2, sticky="w") autoscroll = tk.BooleanVar(value=True) ttk.Checkbutton(ctrl, text="Auto-Scroll", variable=autoscroll).grid(row=0, column=3, sticky="w") tree = ttk.Treeview(trace_frame, columns=("time","dir","id","dlc","data"), show="headings", height=10) tree.grid(row=1, column=0, sticky="nsew") sb_y = ttk.Scrollbar(trace_frame, orient="vertical", command=tree.yview) tree.configure(yscrollcommand=sb_y.set); sb_y.grid(row=1, column=1, sticky="ns") def fmt_time(ts: float) -> str: lt = time.localtime(ts) return time.strftime("%H:%M:%S", lt) + f".{int((ts%1)*1000):03d}" def fmt_id(i: int) -> str: return f"0x{i:03X}" def fmt_data(b: bytes) -> str: return " ".join(f"{x:02X}" for x in b) last_index = 0 def tick(): nonlocal last_index snap = sim.snapshot() # Optional: könnte in eine Statusbar ausgelagert werden root.title(f"OBD-II ECU Simulator – RPM {int(snap['rpm'])} | {int(round(snap['speed_kmh']))} km/h") if not paused.get(): mode = mode_var.get() buf = tracer.snapshot_stream() if mode == "stream": for ts, cid, dlc, data in buf[last_index:]: d = "RX" if cid == 0x7DF else ("TX" if cid == responder.resp_id else "?") tree.insert("", "end", values=(fmt_time(ts), d, fmt_id(cid), dlc, fmt_data(data))) if autoscroll.get() and buf[last_index:]: tree.see(tree.get_children()[-1]) else: tree.delete(*tree.get_children()) agg = {} for ts, cid, dlc, data in buf: d = "RX" if cid == 0x7DF else ("TX" if cid == responder.resp_id else "?") key = (cid, d) e = agg.get(key) if not e: agg[key] = {"count":1, "last_ts":ts, "last_dlc":dlc, "last_data":data} else: e["count"] += 1 if ts >= e["last_ts"]: e["last_ts"], e["last_dlc"], e["last_data"] = ts, dlc, data for (cid, d) in sorted(agg.keys(), key=lambda k:(k[0], 0 if k[1]=="RX" else 1)): e = agg[(cid, d)] tree.insert("", "end", values=(fmt_id(cid), d, e["count"], fmt_time(e["last_ts"]), e["last_dlc"], fmt_data(e["last_data"])) ) last_index = len(buf) root.after(50, tick) tick() def on_close(): nonlocal running running = False try: tracer.stop() except Exception: pass try: responder.stop() finally: root.destroy() root.protocol("WM_DELETE_WINDOW", on_close) root.mainloop()