Files
OBD2-Simulator/app/gui.py

405 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Project layout (drop-in)
#
# app/
# ├─ gui.py ← new main GUI with Simulator tabs + Save/Load
# ├─ config.py (unchanged)
# ├─ can.py (unchanged)
# ├─ obd2.py (unchanged; GUI registers PIDs)
# ├─ tabs/
# │ ├─ __init__.py
# │ ├─ basic.py ← base/vehicle tab (ignition, mass, type, ABS/TCS)
# │ ├─ engine.py ← engine tab
# │ ├─ gearbox.py ← gearbox tab
# │ └─ dtc.py ← DTC toggles tab
# └─ simulation/
# ├─ __init__.py
# ├─ simulator_main.py ← VehicleSimulator wrapper used by GUI
# ├─ vehicle.py ← core state + module orchestration
# └─ modules/
# ├─ __init__.py
# ├─ engine.py
# ├─ gearbox.py
# └─ abs_.py
# =============================
# app/gui.py
# =============================
from __future__ import annotations
import json
import threading
import time
import tkinter as tk
from tkinter import ttk, messagebox, filedialog
from collections import deque
import subprocess
import can # for trace
from .config import load_settings, setup_logging
from .obd2 import ObdResponder, make_speed_response, make_rpm_response
from .can import (
list_can_ifaces, link_up, link_down, link_state, link_kind,
have_cap_netadmin
)
# Simulator pieces
from .simulation.simulator_main import VehicleSimulator
from .tabs.basic import BasicTab
from .tabs.engine import EngineTab
from .tabs.gearbox import GearboxTab
from .tabs.dtc import DtcTab
from .tabs.dashboard import DashboardTab
# ---------- CAN Trace Collector ----------
class TraceCollector:
def __init__(self, channel: str):
self.channel = channel
self.bus = None
self._run = threading.Event(); self._run.set()
self._thread = threading.Thread(target=self._rx_loop, name="CAN-TRACE", daemon=True)
self.stream_buffer = deque(maxlen=2000)
self.lock = threading.Lock()
def _open(self):
self._close()
self.bus = can.interface.Bus(channel=self.channel, interface="socketcan")
def _close(self):
try:
if self.bus: self.bus.shutdown()
except Exception:
pass
self.bus = None
def start(self):
self._thread.start()
def stop(self):
self._run.clear()
try:
self._thread.join(timeout=1.0)
except RuntimeError:
pass
self._close()
def _rx_loop(self):
backoff = 0.5
while self._run.is_set():
if self.bus is None:
if link_state(self.channel) == "UP":
try:
self._open(); backoff = 0.5
except Exception:
time.sleep(backoff); backoff = min(5.0, backoff*1.7)
continue
else:
time.sleep(0.5); continue
try:
msg = self.bus.recv(0.05)
if msg and not msg.is_error_frame and not msg.is_remote_frame:
ts = time.time()
with self.lock:
self.stream_buffer.append((ts, msg.arbitration_id, msg.dlc, bytes(msg.data)))
except (can.CanOperationError, OSError):
self._close(); time.sleep(0.5)
except Exception:
time.sleep(0.05)
def snapshot_stream(self):
with self.lock:
return list(self.stream_buffer)
# =============================
# GUI Launcher (reworked layout)
# =============================
def launch_gui():
cfg = load_settings()
logger = setup_logging(cfg)
# Config
can_iface = (cfg.get("can", {}).get("interface")) or "can0"
resp_id_raw = (cfg.get("can", {}).get("resp_id")) or "0x7E8"
try:
resp_id = int(resp_id_raw, 16) if isinstance(resp_id_raw, str) else int(resp_id_raw)
except Exception:
resp_id = 0x7E8
timeout_ms = cfg.get("can", {}).get("timeout_ms", 200)
bitrate = cfg.get("can", {}).get("baudrate", 500000)
# Simulator
sim = VehicleSimulator()
# OBD2 responder
responder = ObdResponder(interface=can_iface, resp_id=resp_id, timeout_ms=timeout_ms, logger=logger)
responder.register_pid(0x0D, lambda: make_speed_response(int(round(sim.snapshot()["speed_kmh"]))))
responder.register_pid(0x0C, lambda: make_rpm_response(int(sim.snapshot()["rpm"])))
# Physics thread
running = True
def physics_loop():
last = time.monotonic()
while running:
now = time.monotonic()
dt = min(0.05, max(0.0, now - last))
last = now
sim.update(dt)
time.sleep(0.02)
threading.Thread(target=physics_loop, daemon=True).start()
tracer = TraceCollector(can_iface); tracer.start()
# Tk window
root = tk.Tk(); root.title("OBD-II ECU Simulator SocketCAN")
root.geometry(f"{cfg.get('ui',{}).get('window',{}).get('width',1100)}x{cfg.get('ui',{}).get('window',{}).get('height',720)}")
family = cfg.get("ui", {}).get("font_family", "TkDefaultFont")
size = int(cfg.get("ui", {}).get("font_size", 10))
style = ttk.Style()
style.configure("TLabel", font=(family, size))
style.configure("Header.TLabel", font=(family, size+2, "bold"))
style.configure("Small.TLabel", font=(family, max(8, size-1)))
# Menu (Load/Save config)
menubar = tk.Menu(root)
filemenu = tk.Menu(menubar, tearoff=0)
def action_load():
path = filedialog.askopenfilename(filetypes=[("JSON", "*.json"), ("All", "*.*")])
if not path: return
try:
with open(path, "r", encoding="utf-8") as f:
data = json.load(f)
for tab in sim_tabs: tab.load_from_config(data)
sim.load_config(data)
messagebox.showinfo("Simulator", "Konfiguration geladen.")
except Exception as e:
messagebox.showerror("Laden fehlgeschlagen", str(e))
def action_save():
cfg_dict = sim.export_config()
for tab in sim_tabs: tab.save_into_config(cfg_dict)
path = filedialog.asksaveasfilename(defaultextension=".json", filetypes=[("JSON", "*.json"), ("All", "*.*")])
if not path: return
try:
with open(path, "w", encoding="utf-8") as f:
json.dump(cfg_dict, f, indent=2)
messagebox.showinfo("Simulator", "Konfiguration gespeichert.")
except Exception as e:
messagebox.showerror("Speichern fehlgeschlagen", str(e))
filemenu.add_command(label="Konfiguration laden…", command=action_load)
filemenu.add_command(label="Konfiguration speichern…", command=action_save)
filemenu.add_separator(); filemenu.add_command(label="Beenden", command=root.destroy)
menubar.add_cascade(label="Datei", menu=filemenu)
root.config(menu=menubar)
# ===== New Layout ======================================================
# Grid with two rows:
# Row 0: Left = CAN settings, Right = Simulator tabs
# Row 1: Trace spanning both columns
# ======================================================================
root.columnconfigure(0, weight=1)
root.columnconfigure(1, weight=2)
root.rowconfigure(1, weight=1) # trace grows
# --- LEFT: CAN Settings ------------------------------------------------
can_frame = ttk.LabelFrame(root, text="CAN & Settings", padding=8)
can_frame.grid(row=0, column=0, sticky="nsew", padx=(8,4), pady=(8,4))
for i in range(2): can_frame.columnconfigure(i, weight=1)
ttk.Label(can_frame, text="Interface").grid(row=0, column=0, sticky="w")
iface_var = tk.StringVar(value=can_iface)
iface_list = list_can_ifaces() or [can_iface]
iface_dd = ttk.Combobox(can_frame, textvariable=iface_var, values=iface_list, state="readonly", width=12)
iface_dd.grid(row=0, column=1, sticky="ew", padx=(6,0))
def refresh_ifaces():
lst = list_can_ifaces()
if not lst:
messagebox.showwarning("Interfaces", "Keine can*/vcan* Interfaces gefunden.")
return
iface_dd.config(values=lst)
ttk.Button(can_frame, text="Refresh", command=refresh_ifaces).grid(row=0, column=2, padx=(6,0))
ttk.Label(can_frame, text="RESP-ID (hex)").grid(row=1, column=0, sticky="w")
resp_var = tk.StringVar(value=f"0x{resp_id:03X}")
ttk.Entry(can_frame, textvariable=resp_var, width=10).grid(row=1, column=1, sticky="w", padx=(6,0))
ttk.Label(can_frame, text="Timeout (ms)").grid(row=2, column=0, sticky="w")
to_var = tk.IntVar(value=int(timeout_ms))
ttk.Spinbox(can_frame, from_=10, to=5000, increment=10, textvariable=to_var, width=8).grid(row=2, column=1, sticky="w", padx=(6,0))
ttk.Label(can_frame, text="Bitrate").grid(row=3, column=0, sticky="w")
br_var = tk.IntVar(value=int(bitrate))
ttk.Spinbox(can_frame, from_=20000, to=1000000, increment=10000, textvariable=br_var, width=10).grid(row=3, column=1, sticky="w", padx=(6,0))
set_params = tk.BooleanVar(value=True)
ttk.Checkbutton(can_frame, text="Bitrate beim UP setzen", variable=set_params).grid(row=3, column=2, sticky="w")
kind_label = ttk.Label(can_frame, text=f"Kind: {link_kind(can_iface)}", style="Small.TLabel")
kind_label.grid(row=4, column=0, columnspan=3, sticky="w", pady=(4,0))
# Buttons row
btns = ttk.Frame(can_frame)
btns.grid(row=5, column=0, columnspan=3, sticky="ew", pady=(8,0))
btns.columnconfigure(0, weight=0)
btns.columnconfigure(1, weight=0)
btns.columnconfigure(2, weight=1)
def do_link_up():
try:
kind_label.config(text=f"Kind: {link_kind(iface_var.get())}")
if link_state(iface_var.get()) == "UP":
messagebox.showinfo("CAN", f"{iface_var.get()} ist bereits UP"); return
link_up(iface_var.get(), bitrate=br_var.get(), fd=False, set_params=set_params.get())
try:
out = subprocess.check_output(["ip", "-details", "-json", "link", "show", iface_var.get()], text=True)
info = json.loads(out)[0]
bt = (info.get("linkinfo", {}) or {}).get("info_data", {}).get("bittiming") or {}
br = bt.get("bitrate"); sp = bt.get("sample-point")
if br:
messagebox.showinfo("CAN", f"{iface_var.get()} ist UP @ {br} bit/s (sample-point {sp})")
except Exception:
pass
except Exception as e:
messagebox.showerror("CAN", f"Link UP fehlgeschlagen:{e}")
def do_link_down():
try:
if link_state(iface_var.get()) == "DOWN":
messagebox.showinfo("CAN", f"{iface_var.get()} ist bereits DOWN"); return
link_down(iface_var.get()); messagebox.showinfo("CAN", f"{iface_var.get()} ist DOWN")
except Exception as e:
messagebox.showerror("CAN", f"Link DOWN fehlgeschlagen:{e}")
ttk.Button(btns, text="Link UP", command=do_link_up).grid(row=0, column=0, sticky="w")
ttk.Button(btns, text="Link DOWN", command=do_link_down).grid(row=0, column=1, sticky="w", padx=(6,0))
def do_rebind():
nonlocal can_iface, resp_id, timeout_ms, bitrate, tracer
can_iface = iface_var.get()
try:
new_resp = int(resp_var.get(), 16)
except Exception:
messagebox.showerror("RESP-ID", "Bitte gültige Hex-Zahl, z.B. 0x7E8"); return
resp_id = new_resp; timeout_ms = to_var.get(); bitrate = br_var.get()
try:
responder.rebind(interface=can_iface, resp_id=resp_id)
tracer.stop(); tracer = TraceCollector(can_iface); tracer.start()
messagebox.showinfo("CAN", f"Responder neu gebunden: {can_iface}, RESP 0x{resp_id:03X}")
except Exception as e:
messagebox.showerror("CAN", f"Rebind fehlgeschlagen:{e}")
ttk.Button(btns, text="Responder Rebind", command=do_rebind).grid(row=0, column=2, sticky="w", padx=(12,0))
ttk.Label(can_frame, text=f"CAP_NET_ADMIN: {'yes' if have_cap_netadmin() else 'no'}", style="Small.TLabel")\
.grid(row=6, column=0, columnspan=3, sticky="w", pady=(6,0))
# --- RIGHT: Simulator Tabs --------------------------------------------
right = ttk.Frame(root)
right.grid(row=0, column=1, sticky="nsew", padx=(4,8), pady=(8,4))
right.columnconfigure(0, weight=1)
right.rowconfigure(0, weight=1)
nb_sim = ttk.Notebook(right)
nb_sim.grid(row=0, column=0, sticky="nsew")
basics_tab = BasicTab(nb_sim, sim)
engine_tab = EngineTab(nb_sim, sim)
gearbox_tab = GearboxTab(nb_sim, sim)
dtc_tab = DtcTab(nb_sim, sim)
dashboard_tab = DashboardTab(nb_sim, sim)
sim_tabs = [basics_tab, engine_tab, gearbox_tab, dtc_tab, dashboard_tab]
nb_sim.add(basics_tab.frame, text="Basisdaten")
nb_sim.add(engine_tab.frame, text="Motor")
nb_sim.add(gearbox_tab.frame, text="Getriebe")
nb_sim.add(dtc_tab.frame, text="DTCs")
nb_sim.add(dashboard_tab.frame, text="Dashboard")
# --- BOTTOM: Trace (spans both columns) -------------------------------
trace_frame = ttk.LabelFrame(root, text="CAN Trace", padding=6)
trace_frame.grid(row=1, column=0, columnspan=2, sticky="nsew", padx=8, pady=(0,8))
trace_frame.columnconfigure(0, weight=1)
trace_frame.rowconfigure(1, weight=1)
ctrl = ttk.Frame(trace_frame)
ctrl.grid(row=0, column=0, sticky="ew", pady=(0,4))
ctrl.columnconfigure(5, weight=1)
mode_var = tk.StringVar(value="stream")
ttk.Label(ctrl, text="Modus:").grid(row=0, column=0, sticky="w")
ttk.Combobox(ctrl, textvariable=mode_var, state="readonly", width=10, values=["stream","aggregate"])\
.grid(row=0, column=1, sticky="w", padx=(4,12))
paused = tk.BooleanVar(value=False)
ttk.Checkbutton(ctrl, text="Pause", variable=paused).grid(row=0, column=2, sticky="w")
autoscroll = tk.BooleanVar(value=True)
ttk.Checkbutton(ctrl, text="Auto-Scroll", variable=autoscroll).grid(row=0, column=3, sticky="w")
tree = ttk.Treeview(trace_frame, columns=("time","dir","id","dlc","data"), show="headings", height=10)
tree.grid(row=1, column=0, sticky="nsew")
sb_y = ttk.Scrollbar(trace_frame, orient="vertical", command=tree.yview)
tree.configure(yscrollcommand=sb_y.set); sb_y.grid(row=1, column=1, sticky="ns")
def fmt_time(ts: float) -> str:
lt = time.localtime(ts)
return time.strftime("%H:%M:%S", lt) + f".{int((ts%1)*1000):03d}"
def fmt_id(i: int) -> str: return f"0x{i:03X}"
def fmt_data(b: bytes) -> str: return " ".join(f"{x:02X}" for x in b)
last_index = 0
def tick():
nonlocal last_index
snap = sim.snapshot()
# Optional: könnte in eine Statusbar ausgelagert werden
root.title(f"OBD-II ECU Simulator RPM {int(snap['rpm'])} | {int(round(snap['speed_kmh']))} km/h")
if not paused.get():
mode = mode_var.get()
buf = tracer.snapshot_stream()
if mode == "stream":
for ts, cid, dlc, data in buf[last_index:]:
d = "RX" if cid == 0x7DF else ("TX" if cid == responder.resp_id else "?")
tree.insert("", "end", values=(fmt_time(ts), d, fmt_id(cid), dlc, fmt_data(data)))
if autoscroll.get() and buf[last_index:]:
tree.see(tree.get_children()[-1])
else:
tree.delete(*tree.get_children())
agg = {}
for ts, cid, dlc, data in buf:
d = "RX" if cid == 0x7DF else ("TX" if cid == responder.resp_id else "?")
key = (cid, d)
e = agg.get(key)
if not e:
agg[key] = {"count":1, "last_ts":ts, "last_dlc":dlc, "last_data":data}
else:
e["count"] += 1
if ts >= e["last_ts"]:
e["last_ts"], e["last_dlc"], e["last_data"] = ts, dlc, data
for (cid, d) in sorted(agg.keys(), key=lambda k:(k[0], 0 if k[1]=="RX" else 1)):
e = agg[(cid, d)]
tree.insert("", "end", values=(fmt_id(cid), d, e["count"], fmt_time(e["last_ts"]), e["last_dlc"], fmt_data(e["last_data"])) )
last_index = len(buf)
root.after(50, tick)
tick()
def on_close():
nonlocal running
running = False
try: tracer.stop()
except Exception: pass
try: responder.stop()
finally:
root.destroy()
root.protocol("WM_DELETE_WINDOW", on_close)
root.mainloop()