From 8195e570b394fdfcc3775f65af2cd58957cd0be9 Mon Sep 17 00:00:00 2001 From: Marcel Peterkau Date: Sun, 24 Aug 2025 23:46:59 +0200 Subject: [PATCH] first commit --- .gitignore | 45 +++++ README.md | 137 +++++++++++++++ app/__init__.py | 0 app/can.py | 303 ++++++++++++++++++++++++++++++++ app/config.py | 62 +++++++ app/gui.py | 440 +++++++++++++++++++++++++++++++++++++++++++++++ app/simulator.py | 66 +++++++ logs/.gitkeep | 0 main.py | 4 + requirements.txt | 2 + start.sh | 18 ++ 11 files changed, 1077 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 app/__init__.py create mode 100644 app/can.py create mode 100644 app/config.py create mode 100644 app/gui.py create mode 100644 app/simulator.py create mode 100644 logs/.gitkeep create mode 100644 main.py create mode 100644 requirements.txt create mode 100755 start.sh diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..629823c --- /dev/null +++ b/.gitignore @@ -0,0 +1,45 @@ +# Python +__pycache__/ +*.py[cod] +*.pyo +*.pyd +*.pkl +*.pklz +*.egg-info/ +*.egg +*.manifest +*.spec + +# Build +build/ +dist/ +.eggs/ + +# Logs (Ordner behalten, Dateien ignorieren) +logs/* +!logs/.gitkeep +*.log + +# Virtual Environments +venv/ +.env/ +.venv/ + +# System-Dateien +.DS_Store +Thumbs.db + +# Backup-Dateien +*.bak +*.tmp +*.swp +*.swo + +# Editor/IDE +.vscode/ +.idea/ + +# Projekt-spezifische +settings.json +settings.json.bak +tmp/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..9d6a12f --- /dev/null +++ b/README.md @@ -0,0 +1,137 @@ +# OBD-II ECU Simulator für SocketCAN + +Dieses Projekt simuliert ein OBD-II Steuergerät (ECU) über SocketCAN auf Linux. +Es eignet sich, um Embedded-Hardware (z. B. den Kettenöler) zu testen, ohne ein echtes Fahrzeug anschließen zu müssen. + +## Features + +- **OBD-II Responder (11-bit IDs, ISO 15765-4)** + - Unterstützt Mode 01 / PID `0x0D` (Geschwindigkeit) + - Unterstützt Mode 01 / PID `0x0C` (Motordrehzahl) + +- **Tkinter-GUI** + - Gangwahl (N/0 – 6) + - Gasregler (0–100 %) + - Anzeige von Geschwindigkeit & Drehzahl (berechnet über einfaches Antriebsmodell) + - CAN-Interface Auswahl, Bitrate, Timeout, RESP-ID + - Link-Up/Down Steuerung direkt in der GUI + - Settings in `settings.json` speicherbar + - **Trace-Fenster**: ähnlich CANalyzer Light + - Stream-Modus (alle Frames) + - Aggregat-Modus (eine Zeile pro ID+Richtung) + +- **Robust gegen Interface-Down/Up** + - Erkennt automatisch, wenn `can0` (oder anderes IF) Down geht + - Öffnet Bus neu, sobald wieder Up + +## Voraussetzungen + +- Linux mit SocketCAN Support +- Python ≥ 3.10 +- Pakete: + ```bash + pip install -r requirements.txt +```` + +(enthält `python-can`, `pyroute2`, `typing_extensions`, `wrapt`, `packaging`) + +* Ein CAN-Interface (z. B. [CANable](https://canable.io/), `gs_usb`, Peak, vcan) + +## Installation & Start + +```bash +git clone +cd Kettenöler-Testsoftware +python -m venv .venv +source .venv/bin/activate +pip install -r requirements.txt +./start.sh +``` + +Das GUI startet und öffnet automatisch den Responder. +Über die Buttons „Link UP/DOWN“ kann das CAN-Interface hoch-/runtergefahren werden. + +## Rechte / Berechtigungen + +Um CAN-Links ohne `sudo` hoch/runter zu setzen, benötigt der Python-Interpreter die Capability `CAP_NET_ADMIN`: + +```bash +sudo setcap cap_net_admin,cap_net_raw=eip "$(readlink -f .venv/bin/python)" +getcap "$(readlink -f .venv/bin/python)" +``` + +Alternativ: App mit `sudo ./start.sh` starten. + +## Nutzung + +1. **Interface vorbereiten** + Klassisches CAN-Interface: + + ```bash + sudo ip link set can0 down + sudo ip link set can0 type can bitrate 500000 + sudo ip link set can0 up + ``` + + Virtuelles Interface (nur Softwaretests): + + ```bash + sudo modprobe vcan + sudo ip link add dev vcan0 type vcan + sudo ip link set vcan0 up + ``` + +2. **Simulator starten** + GUI öffnen (`./start.sh`). + +3. **Test mit can-utils** + Terminal 1: + + ```bash + candump can0 + ``` + + Terminal 2: Anfrage nach Geschwindigkeit: + + ```bash + cansend can0 7DF#02010D0000000000 + ``` + + Erwartung: + + * Request `7DF` + * Response `7E8` mit `03 41 0D ` + + Anfrage nach Drehzahl: + + ```bash + cansend can0 7DF#02010C0000000000 + ``` + +4. **Trace im GUI** + + * „Stream“: zeigt alle Frames einzeln + * „Aggregate“: fasst pro CAN-ID zusammen (Count, Last Data) + +## Projektstruktur + +``` +main.py – Startpunkt +app/ + ├─ gui.py – Tkinter GUI + ├─ can.py – CAN-Responder + Link-Control (pyroute2) + ├─ simulator.py – Physikmodell (Gang + Gas → Geschwindigkeit/RPM) + └─ config.py – Settings + Logging +settings.json – Konfigurationsdatei (wird beim Speichern erzeugt) +``` + +## Bekannte Einschränkungen + +* Nur wenige PIDs implementiert (0x0C, 0x0D). +* Antwort immer mit fixer DLC=8. +* Einfaches Driveline-Modell (keine realistische Fahrzeugphysik). +* Trace-Fenster ist eine Light-Variante, kein vollwertiger CANalyzer. + +## Lizenz + +\[MIT] oder \[GPLv3] – bitte je nach Projektziel eintragen. \ No newline at end of file diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/can.py b/app/can.py new file mode 100644 index 0000000..d6e4ffc --- /dev/null +++ b/app/can.py @@ -0,0 +1,303 @@ +# can.py — SocketCAN OBD-II Responder + Link-Control +from __future__ import annotations +import logging +import threading +import time +import os +import sys +import subprocess +from typing import Callable, Dict, Optional, List +from pyroute2 import IPRoute, NetlinkError +import json +import can + +OBD_REQ_ID = 0x7DF +PID_SPEED = 0x0D +PID_RPM = 0x0C + +CAP_NET_ADMIN_BIT = 12 # Linux CAP_NET_ADMIN + +def have_cap_netadmin() -> bool: + if os.geteuid() == 0: + return True + try: + with open("/proc/self/status", "r", encoding="utf-8") as f: + for line in f: + if line.startswith("CapEff:"): + mask = int(line.split()[1], 16) + return bool(mask & (1 << CAP_NET_ADMIN_BIT)) + except Exception: + pass + return False + +def need_caps_message() -> str: + exe = os.path.realpath(sys.executable) + return ( + "Keine Berechtigung für 'ip link'.\n\n" + "Option A) Als root starten (sudo)\n" + "Option B) Capabilities auf das laufende Python setzen:\n" + f" sudo setcap cap_net_admin,cap_net_raw=eip \"{exe}\"\n" + f" getcap \"{exe}\"\n" + ) + +def list_can_ifaces() -> List[str]: + """Listet verfügbare CAN/vCAN-Interfaces via `ip -json link`.""" + try: + out = subprocess.check_output(["ip", "-json", "link"], text=True) + import json + items = json.loads(out) + names = [i["ifname"] for i in items if i.get("link_type") in ("can", None) and (i["ifname"].startswith("can") or i["ifname"].startswith("vcan"))] + return sorted(set(names)) + except Exception: + # Fallback: /sys/class/net + try: + import os + names = [n for n in os.listdir("/sys/class/net") if n.startswith(("can", "vcan"))] + return sorted(set(names)) + except Exception: + return [] + +def _link_info(ipr: IPRoute, iface: str): + idxs = ipr.link_lookup(ifname=iface) + if not idxs: + raise RuntimeError(f"Interface '{iface}' nicht gefunden") + info = ipr.get_links(idxs[0])[0] + kind = "none" + for k, v in info.get("attrs", []): + if k == "IFLA_LINKINFO": + for kk, vv in v.get("attrs", []): + if kk == "IFLA_INFO_KIND": + kind = str(vv) + break + return idxs[0], kind, info + +def link_state(iface: str) -> str: + try: + with IPRoute() as ipr: + idx, _, info = _link_info(ipr, iface) + st = info.get("state") + if isinstance(st, str): + return st.upper() + for k, v in info.get("attrs", []): + if k == "IFLA_OPERSTATE": + return str(v).upper() + except Exception: + pass + return "UNKNOWN" + +def link_up(iface: str, bitrate: int = 500000, fd: bool = False, set_params: bool = True) -> None: + with IPRoute() as ipr: + idx, kind, _ = _link_info(ipr, iface) + + # Wenn bereits UP: nichts tun. + try: + cur = ipr.get_links(idx)[0] + if (cur.get("state") or "").upper() == "UP": + return + except Exception: + pass + + # 1) down (ignoriere "invalid argument" / already down) + try: + ipr.link("set", index=idx, state="down") + except NetlinkError as e: + if e.code not in (0, 22): + raise RuntimeError(f"Netlink error bei '{iface}' (down): {e.code} {e}") from e + + # 2) optional: Parameter nur bei 'can' – EINVAL ignorieren + if set_params and kind == "can": + try: + ipr.link("set", index=idx, kind="can", data={"bitrate": int(bitrate)}) + except NetlinkError as e: + if e.code != 22: # alles außer EINVAL weiterreichen + raise RuntimeError(f"Netlink error bei '{iface}' (bitrate): {e.code} {e}") from e + # EINVAL -> Treiber mag Param-Change jetzt nicht -> einfach weitermachen + + # 3) up (hier darf es notfalls knallen) + try: + ipr.link("set", index=idx, state="up") + except NetlinkError as e: + raise RuntimeError(f"Netlink error bei '{iface}' (up): {e.code} {e}") from e + +def link_down(iface: str) -> None: + with IPRoute() as ipr: + idx, _, _ = _link_info(ipr, iface) + try: + ipr.link("set", index=idx, state="down") + except NetlinkError as e: + if e.code not in (0, 22): # ignore EINVAL/OK + raise RuntimeError(f"Netlink error bei '{iface}' (down): {e.code} {e}") from e + +def link_kind(iface: str) -> str: + """liefert nur das INFO_KIND (z.B. 'can', 'slcan', 'vcan', 'none')""" + try: + with IPRoute() as ipr: + _, kind, _ = _link_info(ipr, iface) + return kind + except Exception: + return "unknown" + + +class ObdResponder: + """ + OBD-II Mode-01 PID-Responder über SocketCAN (11-bit). + Non-blocking, threadsicher, mit Rebind-Funktion und robustem Reopen, + falls das Interface DOWN ist. + """ + def __init__( + self, + interface: str, + resp_id: int, + timeout_ms: int = 200, + logger: Optional[logging.Logger] = None, + ): + self.interface = interface + self.resp_id = resp_id + self.timeout_ms = timeout_ms + self.log = logger or logging.getLogger("obdcan") + + # PID-Provider: pid -> callable() -> 8-Byte-Payload + self.providers: Dict[int, Callable[[], bytes]] = {} + + # Laufzustand / CAN-Ressourcen + self._run = threading.Event() + self._run.set() + self.bus: Optional[can.BusABC] = None + self.reader: Optional[can.BufferedReader] = None + self.notifier: Optional[can.Notifier] = None + + # Service-Thread, der bei IF=UP den Bus öffnet und RX abwickelt + self._thread = threading.Thread( + target=self._service_loop, name="OBD-SVC", daemon=True + ) + self._thread.start() + + # ---------- Lifecycle ---------- + def _open_bus(self) -> None: + # ggf. alte Ressourcen schließen, dann neu öffnen + self._close_bus() + self._close_bus() + self.bus = can.interface.Bus(channel=self.interface, interface="socketcan") + self.log.info("OBD responder started on %s (resp_id=0x%03X)", self.interface, self.resp_id) + self.log.info( + "OBD responder started on %s (resp_id=0x%03X)", + self.interface, self.resp_id + ) + + def _close_bus(self) -> None: + try: + if self.notifier: + self.notifier.stop() + except Exception: + pass + try: + if self.bus: + self.bus.shutdown() + except Exception: + pass + self.bus = None + + def stop(self) -> None: + self._run.clear() + try: + self._thread.join(timeout=1.0) + except RuntimeError: + pass + self._close_bus() + + def rebind(self, interface: Optional[str] = None, resp_id: Optional[int] = None) -> None: + if interface is not None: + self.interface = interface + if resp_id is not None: + self.resp_id = resp_id + # Bus schließen; Service-Loop öffnet ihn wieder, sobald IF=UP ist + self._close_bus() + self.log.info("Rebind requested: %s, resp=0x%03X", self.interface, self.resp_id) + + # ---------- Öffentliche API ---------- + def register_pid(self, pid: int, provider: Callable[[], bytes]) -> None: + self.providers[pid] = provider + + # ---------- Service-Loop (robust gegen 'Network is down') ---------- + def _service_loop(self) -> None: + backoff = 0.5 + while self._run.is_set(): + # Bus öffnen, wenn IF up ist + if self.bus is None: + if link_state(self.interface) == "UP": + try: + self._open_bus() + backoff = 0.5 + except Exception as e: + self.log.warning("Bus open failed: %s", e) + time.sleep(backoff) + backoff = min(5.0, backoff * 1.7) + continue + else: + time.sleep(0.5) + continue + + # RX: eigene Poll‑Schleife statt Notifier + try: + msg = self.bus.recv(0.05) # blocking short timeout + if msg is not None: + self._handle(msg) + except (can.CanOperationError, OSError): + # IF ging down -> Bus schließen und später neu öffnen + self.log.info("CAN went DOWN — closing bus, will retry…") + self._close_bus() + time.sleep(0.5) + except Exception as e: + self.log.warning("CAN recv error: %s", e) + time.sleep(0.1) + + # ---------- Message-Handler ---------- + def _handle(self, msg: can.Message) -> None: + if msg.is_extended_id or msg.arbitration_id != OBD_REQ_ID: + return + data = bytes(msg.data) + if len(data) < 3: + return + + # tolerant: 02 01 ... oder 01 ... + if data[0] == 0x02 and len(data) >= 3: + mode, pid = data[1], data[2] + else: + mode, pid = data[0], (data[1] if len(data) >= 2 else None) + + if mode != 0x01 or pid is None: + return + + provider = self.providers.get(pid) + if not provider: + return + + try: + payload = provider() + if not isinstance(payload, (bytes, bytearray)) or len(payload) != 8: + return + out = can.Message( + arbitration_id=self.resp_id, + is_extended_id=False, + data=payload, + dlc=8 + ) + if self.bus: + self.bus.send(out) + except can.CanError: + self.log.warning("CAN send failed (bus off?)") + except Exception as e: + self.log.exception("Provider error: %s", e) + +# --- Ende Patch ObdResponder ----------------------------------------------- + +# Helfer fürs Formatieren +def make_speed_response(speed_kmh: int) -> bytes: + A = max(0, min(255, int(speed_kmh))) + return bytes([0x03, 0x41, PID_SPEED, A, 0x00, 0x00, 0x00, 0x00]) + +def make_rpm_response(rpm: int) -> bytes: + raw = max(0, int(rpm)) * 4 + A = (raw >> 8) & 0xFF + B = raw & 0xFF + return bytes([0x04, 0x41, PID_RPM, A, B, 0x00, 0x00, 0x00]) diff --git a/app/config.py b/app/config.py new file mode 100644 index 0000000..b270976 --- /dev/null +++ b/app/config.py @@ -0,0 +1,62 @@ +import json +import logging +import os +from logging.handlers import RotatingFileHandler +from pathlib import Path + +APP_ROOT = Path(__file__).resolve().parents[1] +SETTINGS_PATH = APP_ROOT / "settings.json" + +DEFAULTS = { + "can": { + "interface": "vcan0", + "resp_id": "0x7E8", + "timeout_ms": 200 + }, + "ui": { + "font_family": "DejaVu Sans", + "font_size": 10, + "window": { + "width": 1100, + "height": 720 + } + }, + "logging": { + "level": "INFO", + "file": "logs/app.log" + } +} + +def load_settings(): + cfg = DEFAULTS.copy() + if SETTINGS_PATH.exists(): + try: + with open(SETTINGS_PATH, "r", encoding="utf-8") as f: + data = json.load(f) + # shallow merge + for k, v in data.items(): + if isinstance(v, dict) and k in cfg: + cfg[k].update(v) + else: + cfg[k] = v + except Exception as e: + print("WARN: konnte settings.json nicht laden:", e) + return cfg + +def setup_logging(cfg): + level = getattr(logging, cfg["logging"].get("level", "INFO").upper(), logging.INFO) + log_file = cfg["logging"].get("file", "logs/app.log") + log_path = (APP_ROOT / log_file).resolve() + log_path.parent.mkdir(parents=True, exist_ok=True) + logger = logging.getLogger("configapp") + logger.setLevel(level) + + handler = RotatingFileHandler(log_path, maxBytes=1_000_000, backupCount=2, encoding="utf-8") + fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s") + handler.setFormatter(fmt) + logger.addHandler(handler) + + sh = logging.StreamHandler() + sh.setFormatter(fmt) + logger.addHandler(sh) + return logger diff --git a/app/gui.py b/app/gui.py new file mode 100644 index 0000000..e2379d3 --- /dev/null +++ b/app/gui.py @@ -0,0 +1,440 @@ +# gui.py — Tk-App mit Interface-Dropdown, Link Up/Down, Settings-View/Save + CAN-Trace +from __future__ import annotations +import json +import threading +import time +import tkinter as tk +from tkinter import ttk, messagebox +from collections import deque, defaultdict + +import can # nur für Trace-Reader + +from .config import load_settings, setup_logging, SETTINGS_PATH, APP_ROOT +from .simulator import EcuState, DrivelineModel +from .can import ( + ObdResponder, make_speed_response, make_rpm_response, + list_can_ifaces, link_up, link_down, + have_cap_netadmin, link_state, link_kind +) + + +# ---------- kleine Trace-Helfer ---------- +class TraceCollector: + """ + Liest mit eigenem BufferedReader vom SocketCAN und sammelt Frames. + - stream_buffer: deque mit (ts, id, dlc, data_bytes) + - aggregate: dict[(id, dir)] -> {count, last_ts, last_data} + """ + def __init__(self, channel: str): + self.channel = channel + self.bus = None + self._run = threading.Event(); self._run.set() + self._thread = threading.Thread(target=self._rx_loop, name="CAN-TRACE", daemon=True) + self.stream_buffer = deque(maxlen=2000) + self.lock = threading.Lock() + + def _open(self): + self._close() + self.bus = can.interface.Bus(channel=self.channel, interface="socketcan") + + def _close(self): + try: + if self.bus: self.bus.shutdown() + except Exception: pass + self.bus = None + + def start(self): + self._thread.start() + + def stop(self): + self._run.clear() + try: self._thread.join(timeout=1.0) + except RuntimeError: pass + self._close() + + def _rx_loop(self): + backoff = 0.5 + while self._run.is_set(): + if self.bus is None: + if link_state(self.channel) == "UP": + try: + self._open() + backoff = 0.5 + except Exception: + time.sleep(backoff); backoff = min(5.0, backoff*1.7) + continue + else: + time.sleep(0.5); continue + try: + msg = self.bus.recv(0.05) + if msg and not msg.is_error_frame and not msg.is_remote_frame: + ts = time.time() + with self.lock: + self.stream_buffer.append((ts, msg.arbitration_id, msg.dlc, bytes(msg.data))) + except (can.CanOperationError, OSError): + # IF down → ruhig schließen, kein Traceback + self._close() + time.sleep(0.5) + except Exception: + time.sleep(0.05) + + def snapshot_stream(self): + with self.lock: + return list(self.stream_buffer) + + +def launch_gui(): + cfg = load_settings() + logger = setup_logging(cfg) + + # read config values + can_iface = (cfg.get("can", {}).get("interface")) or "can0" + resp_id_raw = (cfg.get("can", {}).get("resp_id")) or "0x7E8" + try: + resp_id = int(resp_id_raw, 16) if isinstance(resp_id_raw, str) else int(resp_id_raw) + except Exception: + resp_id = 0x7E8 + timeout_ms = cfg.get("can", {}).get("timeout_ms", 200) + bitrate = cfg.get("can", {}).get("baudrate", 500000) + + ecu = EcuState(DrivelineModel()) + responder = ObdResponder(interface=can_iface, resp_id=resp_id, timeout_ms=timeout_ms, logger=logger) + + # register providers + responder.register_pid(0x0D, lambda: make_speed_response(int(round(ecu.snapshot()[3])))) + responder.register_pid(0x0C, lambda: make_rpm_response(int(ecu.snapshot()[2]))) + + # physics thread + running = True + def physics_loop(): + while running: + ecu.update() + time.sleep(0.02) + t = threading.Thread(target=physics_loop, daemon=True) + t.start() + + # Trace-Collector (eigener Bus, hört alles auf can_iface) + tracer = TraceCollector(can_iface) + tracer.start() + + # --- Tk UI --- + root = tk.Tk() + root.title("OBD-II ECU Simulator – SocketCAN") + + # window size from cfg + try: + w = int(cfg["ui"]["window"]["width"]); h = int(cfg["ui"]["window"]["height"]) + root.geometry(f"{w}x{h}") + except Exception: + pass + + # fonts/styles + family = cfg.get("ui", {}).get("font_family", "TkDefaultFont") + size = int(cfg.get("ui", {}).get("font_size", 10)) + style = ttk.Style() + style.configure("TLabel", font=(family, size)) + style.configure("Header.TLabel", font=(family, size+2, "bold")) + style.configure("TButton", font=(family, size)) + + # layout + root.columnconfigure(0, weight=1); root.rowconfigure(0, weight=1) + main = ttk.Frame(root, padding=10); main.grid(row=0, column=0, sticky="nsew") + main.columnconfigure(1, weight=1) + + # === Controls: Gear + Throttle === + ttk.Label(main, text="Gang").grid(row=0, column=0, sticky="w") + gear_var = tk.IntVar(value=0) + gear_box = ttk.Combobox(main, textvariable=gear_var, state="readonly", values=[0,1,2,3,4,5,6], width=5) + gear_box.grid(row=0, column=1, sticky="w", padx=(6,12)) + gear_box.bind("<>", lambda _e: ecu.set_gear(gear_var.get())) + + ttk.Label(main, text="Gas (%)").grid(row=1, column=0, sticky="w") + thr = ttk.Scale(main, from_=0, to=100, orient="horizontal", + command=lambda v: ecu.set_throttle(int(float(v)))) + thr.set(0) + thr.grid(row=1, column=1, sticky="ew", padx=(6,12)) + + lbl_speed = ttk.Label(main, text="Speed: 0 km/h", style="Header.TLabel") + lbl_rpm = ttk.Label(main, text="RPM: 0") + lbl_speed.grid(row=2, column=0, columnspan=2, sticky="w", pady=(10,0)) + lbl_rpm.grid(row=3, column=0, columnspan=2, sticky="w") + + # === CAN Panel === + sep = ttk.Separator(main); sep.grid(row=4, column=0, columnspan=2, sticky="ew", pady=(10,10)) + + can_frame = ttk.LabelFrame(main, text="CAN & Settings", padding=10) + can_frame.grid(row=5, column=0, columnspan=2, sticky="nsew") + can_frame.columnconfigure(1, weight=1) + + ttk.Label(can_frame, text="Interface").grid(row=0, column=0, sticky="w") + iface_var = tk.StringVar(value=can_iface) + iface_list = list_can_ifaces() or [can_iface] + iface_dd = ttk.Combobox(can_frame, textvariable=iface_var, values=iface_list, state="readonly", width=12) + iface_dd.grid(row=0, column=1, sticky="w", padx=(6,12)) + + def refresh_ifaces(): + lst = list_can_ifaces() + if not lst: + messagebox.showwarning("Interfaces", "Keine can*/vcan* Interfaces gefunden.") + return + iface_dd.config(values=lst) + ttk.Button(can_frame, text="Refresh", command=refresh_ifaces).grid(row=0, column=2, padx=4) + + ttk.Label(can_frame, text="RESP-ID (hex)").grid(row=1, column=0, sticky="w") + resp_var = tk.StringVar(value=f"0x{resp_id:03X}") + resp_entry = ttk.Entry(can_frame, textvariable=resp_var, width=10) + resp_entry.grid(row=1, column=1, sticky="w", padx=(6,12)) + + ttk.Label(can_frame, text="Timeout (ms)").grid(row=2, column=0, sticky="w") + to_var = tk.IntVar(value=int(timeout_ms)) + to_spin = ttk.Spinbox(can_frame, from_=10, to=5000, increment=10, textvariable=to_var, width=8) + to_spin.grid(row=2, column=1, sticky="w", padx=(6,12)) + + ttk.Label(can_frame, text="Bitrate").grid(row=3, column=0, sticky="w") + br_var = tk.IntVar(value=int(bitrate)) + br_spin = ttk.Spinbox(can_frame, from_=20000, to=1000000, increment=10000, textvariable=br_var, width=10) + br_spin.grid(row=3, column=1, sticky="w", padx=(6,12)) + + # unter Bitrate-Spinbox + set_params = tk.BooleanVar(value=True) + ttk.Checkbutton(can_frame, text="Bitrate beim UP setzen", variable=set_params).grid(row=3, column=2, sticky="w") + + # add Kind-Anzeige + kind_label = ttk.Label(can_frame, text=f"Kind: {link_kind(can_iface)}") + kind_label.grid(row=0, column=3, sticky="w", padx=(12,0)) + + # Link control + def do_link_up(): + try: + # Kind-Anzeige aktualisieren (falls Interface gewechselt) + kind_label.config(text=f"Kind: {link_kind(iface_var.get())}") + + if link_state(iface_var.get()) == "UP": + messagebox.showinfo("CAN", f"{iface_var.get()} ist bereits UP") + return + # NEU: set_params aus Checkbox + link_up(iface_var.get(), bitrate=br_var.get(), fd=False, set_params=set_params.get()) + msg = f"{iface_var.get()} ist UP" + if set_params.get(): + msg += f" @ {br_var.get()} bit/s (falls vom Treiber unterstützt)" + else: + msg += " (Bitrate unverändert)" + messagebox.showinfo("CAN", msg) + except PermissionError as e: + messagebox.showerror("Berechtigung", str(e)) + except Exception as e: + messagebox.showerror("CAN", f"Link UP fehlgeschlagen:\n{e}") + + def do_link_down(): + try: + if link_state(iface_var.get()) == "DOWN": + messagebox.showinfo("CAN", f"{iface_var.get()} ist bereits DOWN") + return + link_down(iface_var.get()) + messagebox.showinfo("CAN", f"{iface_var.get()} ist DOWN") + except PermissionError as e: + messagebox.showerror("Berechtigung", str(e)) + except Exception as e: + messagebox.showerror("CAN", f"Link DOWN fehlgeschlagen:\n{e}") + + btn_up = ttk.Button(can_frame, text="Link UP", command=do_link_up) + btn_down = ttk.Button(can_frame, text="Link DOWN", command=do_link_down) + btn_up.grid(row=4, column=0, pady=(8,0), sticky="w") + btn_down.grid(row=4, column=1, pady=(8,0), sticky="w") + + # Rebind responder + def do_rebind(): + nonlocal can_iface, resp_id, timeout_ms, bitrate, tracer + can_iface = iface_var.get() + try: + new_resp = int(resp_var.get(), 16) + except Exception: + messagebox.showerror("RESP-ID", "Bitte gültige Hex-Zahl, z.B. 0x7E8") + return + resp_id = new_resp + timeout_ms = to_var.get() + bitrate = br_var.get() + try: + responder.rebind(interface=can_iface, resp_id=resp_id) + # Trace-Collector auf neues IF neu binden + try: + tracer.stop() + except Exception: + pass + tracer = TraceCollector(can_iface) + tracer.start() + messagebox.showinfo("CAN", f"Responder neu gebunden: {can_iface}, RESP 0x{resp_id:03X}") + except Exception as e: + messagebox.showerror("CAN", f"Rebind fehlgeschlagen:\n{e}") + + ttk.Button(can_frame, text="Responder Rebind", command=do_rebind).grid(row=4, column=2, pady=(8,0), sticky="w") + + # CAP-Status + caps_ok = have_cap_netadmin() + cap_label = ttk.Label(can_frame, text=f"CAP_NET_ADMIN: {'yes' if caps_ok else 'no'}") + cap_label.grid(row=6, column=0, columnspan=2, sticky="w", pady=(6,0)) + if not caps_ok: + btn_up.state(["disabled"]); btn_down.state(["disabled"]) + + # Statusbar + status = ttk.Label(main, text=f"CAN: {can_iface} | RESP-ID: 0x{resp_id:03X}", relief="sunken", anchor="w") + status.grid(row=6, column=0, columnspan=2, sticky="ew", pady=(10,0)) + + # === TRACE-FENSTER (unten) === + trace_frame = ttk.LabelFrame(root, text="CAN Trace", padding=6) + trace_frame.grid(row=1, column=0, sticky="nsew") + root.rowconfigure(1, weight=1) + trace_frame.columnconfigure(0, weight=1) + trace_frame.rowconfigure(1, weight=1) + + # Controls: Mode, Pause, Clear, Autoscroll + ctrl = ttk.Frame(trace_frame) + ctrl.grid(row=0, column=0, sticky="ew", pady=(0,4)) + ctrl.columnconfigure(5, weight=1) + + mode_var = tk.StringVar(value="stream") # "stream" | "aggregate" + ttk.Label(ctrl, text="Modus:").grid(row=0, column=0, sticky="w") + mode_dd = ttk.Combobox(ctrl, textvariable=mode_var, state="readonly", width=10, + values=["stream", "aggregate"]) + mode_dd.grid(row=0, column=1, sticky="w", padx=(4,12)) + + paused = tk.BooleanVar(value=False) + ttk.Checkbutton(ctrl, text="Pause", variable=paused).grid(row=0, column=2, sticky="w") + + autoscroll = tk.BooleanVar(value=True) + ttk.Checkbutton(ctrl, text="Auto-Scroll", variable=autoscroll).grid(row=0, column=3, sticky="w") + + def do_clear(): + nonlocal aggregate_cache + tree.delete(*tree.get_children()) + aggregate_cache.clear() + ttk.Button(ctrl, text="Clear", command=do_clear).grid(row=0, column=4, padx=(8,0), sticky="w") + + # Treeview + cols_stream = ("time", "dir", "id", "dlc", "data") + cols_agg = ("id", "dir", "count", "last_time", "last_dlc", "last_data") + + tree = ttk.Treeview(trace_frame, columns=cols_stream, show="headings", height=10) + tree.grid(row=1, column=0, sticky="nsew") + sb_y = ttk.Scrollbar(trace_frame, orient="vertical", command=tree.yview) + tree.configure(yscrollcommand=sb_y.set) + sb_y.grid(row=1, column=1, sticky="ns") + + def setup_columns(mode: str): + tree.delete(*tree.get_children()) + if mode == "stream": + tree.config(columns=cols_stream) + headings = [("time","Time"),("dir","Dir"),("id","ID"),("dlc","DLC"),("data","Data")] + widths = [140, 60, 90, 60, 520] + else: + tree.config(columns=cols_agg) + headings = [("id","ID"),("dir","Dir"),("count","Count"),("last_time","Last Time"),("last_dlc","DLC"),("last_data","Last Data")] + widths = [90, 60, 80, 140, 60, 520] + for (col, text), w in zip(headings, widths): + tree.heading(col, text=text) + tree.column(col, width=w, anchor="w") + setup_columns("stream") + + aggregate_cache: dict[tuple[int,str], dict] = {} + + def fmt_time(ts: float) -> str: + # hh:mm:ss.mmm + lt = time.localtime(ts) + return time.strftime("%H:%M:%S", lt) + f".{int((ts%1)*1000):03d}" + + def fmt_id(i: int) -> str: + return f"0x{i:03X}" + + def fmt_data(b: bytes) -> str: + return " ".join(f"{x:02X}" for x in b) + + # periodic UI update + last_index = 0 + def tick(): + nonlocal can_iface, resp_id, last_index + # Top-Status + g, tval, rpm, spd = ecu.snapshot() + caps = "CAP:yes" if have_cap_netadmin() else "CAP:no" + st = link_state(can_iface) + lbl_speed.config(text=f"Speed: {int(round(spd))} km/h") + lbl_rpm.config(text=f"RPM: {rpm}") + st = link_state(can_iface) + kd = link_kind(can_iface) + status.config(text=f"CAN: {can_iface}({st},{kd}) | RESP-ID: 0x{resp_id:03X} | Gear {g} | Throttle {tval}% | {caps}") + + + # Trace + if not paused.get(): + mode = mode_var.get() + if mode == "stream": + setup_columns("stream") if tree["columns"] != cols_stream else None + # append new items + buf = tracer.snapshot_stream() + # nur neue ab letztem Index + for ts, cid, dlc, data in buf[last_index:]: + # Richtung heuristisch + if cid == 0x7DF: + d = "RX" + elif cid == resp_id: + d = "TX" + else: + d = "?" + tree.insert("", "end", + values=(fmt_time(ts), d, fmt_id(cid), dlc, fmt_data(data))) + # autoscroll + if autoscroll.get() and buf[last_index:]: + tree.see(tree.get_children()[-1]) + last_index = len(buf) + else: + setup_columns("aggregate") if tree["columns"] != cols_agg else None + # baue Aggregat neu (leicht, schnell) + buf = tracer.snapshot_stream() + agg: dict[tuple[int,str], dict] = {} + for ts, cid, dlc, data in buf: + if cid == 0x7DF: + d = "RX" + elif cid == resp_id: + d = "TX" + else: + d = "?" + key = (cid, d) + entry = agg.get(key) + if entry is None: + agg[key] = {"count":1, "last_ts":ts, "last_dlc":dlc, "last_data":data} + else: + entry["count"] += 1 + if ts >= entry["last_ts"]: + entry["last_ts"] = ts + entry["last_dlc"] = dlc + entry["last_data"] = data + # nur neu zeichnen, wenn sich was ändert + if agg != aggregate_cache: + tree.delete(*tree.get_children()) + # sortiert nach ID, RX vor TX + for (cid, d) in sorted(agg.keys(), key=lambda k:(k[0], 0 if k[1]=="RX" else 1)): + e = agg[(cid,d)] + tree.insert("", "end", + values=(fmt_id(cid), d, e["count"], + fmt_time(e["last_ts"]), + e["last_dlc"], fmt_data(e["last_data"]))) + aggregate_cache.clear() + aggregate_cache.update(agg) + + root.after(50, tick) + + tick() + + def on_close(): + nonlocal running + running = False + try: + tracer.stop() + except Exception: + pass + try: + responder.stop() + finally: + root.destroy() + + root.protocol("WM_DELETE_WINDOW", on_close) + root.mainloop() diff --git a/app/simulator.py b/app/simulator.py new file mode 100644 index 0000000..3d7968c --- /dev/null +++ b/app/simulator.py @@ -0,0 +1,66 @@ +# simulator.py — Driveline & ECU-State +from __future__ import annotations +import threading +import time +from dataclasses import dataclass + +@dataclass +class DrivelineModel: + idle_rpm: int = 1400 + max_rpm: int = 9500 + kmh_per_krpm: tuple = (0.0, 12.0, 19.0, 25.0, 32.0, 38.0, 45.0) + rpm_rise_per_s: int = 5000 + rpm_fall_per_s: int = 3500 + + def target_rpm_from_throttle(self, throttle_pct: int) -> int: + t = max(0, min(100, throttle_pct)) / 100.0 + return int(self.idle_rpm + t * (self.max_rpm - self.idle_rpm)) + + def speed_from_rpm_gear(self, rpm: int, gear: int) -> float: + if gear <= 0: + return 0.0 + k = self.kmh_per_krpm[min(gear, len(self.kmh_per_krpm) - 1)] + return (rpm / 1000.0) * k + +class EcuState: + """Thread-sichere Zustandsmaschine (Gang, Gas, RPM, Speed).""" + def __init__(self, model: DrivelineModel | None = None) -> None: + self.model = model or DrivelineModel() + self._lock = threading.Lock() + self._gear = 0 + self._throttle = 0 + self._rpm = self.model.idle_rpm + self._speed = 0.0 + self._last = time.monotonic() + + def set_gear(self, gear: int) -> None: + with self._lock: + self._gear = max(0, min(6, int(gear))) + + def set_throttle(self, thr: int) -> None: + with self._lock: + self._throttle = max(0, min(100, int(thr))) + + def snapshot(self) -> tuple[int, int, int, float]: + with self._lock: + return self._gear, self._throttle, self._rpm, self._speed + + def update(self) -> None: + now = time.monotonic() + dt = max(0.0, min(0.1, now - self._last)) + self._last = now + with self._lock: + target = self.model.target_rpm_from_throttle(self._throttle) + if self._rpm < target: + self._rpm = min(self._rpm + int(self.model.rpm_rise_per_s * dt), target) + else: + self._rpm = max(self._rpm - int(self.model.rpm_fall_per_s * dt), target) + min_idle = 800 if self._gear == 0 and self._throttle == 0 else self.model.idle_rpm + self._rpm = max(min_idle, min(self._rpm, self.model.max_rpm)) + + target_speed = self.model.speed_from_rpm_gear(self._rpm, self._gear) + alpha = min(1.0, 4.0 * dt) + if self._gear == 0: + target_speed = 0.0 + self._speed = (1 - alpha) * self._speed + alpha * target_speed + self._speed = max(0.0, min(self._speed, 299.0)) diff --git a/logs/.gitkeep b/logs/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/main.py b/main.py new file mode 100644 index 0000000..e6a11e7 --- /dev/null +++ b/main.py @@ -0,0 +1,4 @@ +from app.gui import launch_gui + +if __name__ == "__main__": + launch_gui() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..b301fd2 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +python-can +pyroute2 diff --git a/start.sh b/start.sh new file mode 100755 index 0000000..9f03fd1 --- /dev/null +++ b/start.sh @@ -0,0 +1,18 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Choose python (allow override with $PYTHON) +PYTHON_BIN="${PYTHON:-python3}" +VENV_DIR=".venv" + +if [ ! -d "$VENV_DIR" ]; then + "$PYTHON_BIN" -m venv "$VENV_DIR" +fi + +# shellcheck disable=SC1091 +source "$VENV_DIR/bin/activate" + +python -m pip install --upgrade pip +pip install -r requirements.txt + +exec python main.py "$@"