first commit

This commit is contained in:
2025-08-24 23:46:59 +02:00
commit 8195e570b3
11 changed files with 1077 additions and 0 deletions

45
.gitignore vendored Normal file
View File

@@ -0,0 +1,45 @@
# Python
__pycache__/
*.py[cod]
*.pyo
*.pyd
*.pkl
*.pklz
*.egg-info/
*.egg
*.manifest
*.spec
# Build
build/
dist/
.eggs/
# Logs (Ordner behalten, Dateien ignorieren)
logs/*
!logs/.gitkeep
*.log
# Virtual Environments
venv/
.env/
.venv/
# System-Dateien
.DS_Store
Thumbs.db
# Backup-Dateien
*.bak
*.tmp
*.swp
*.swo
# Editor/IDE
.vscode/
.idea/
# Projekt-spezifische
settings.json
settings.json.bak
tmp/

137
README.md Normal file
View File

@@ -0,0 +1,137 @@
# OBD-II ECU Simulator für SocketCAN
Dieses Projekt simuliert ein OBD-II Steuergerät (ECU) über SocketCAN auf Linux.
Es eignet sich, um Embedded-Hardware (z. B. den Kettenöler) zu testen, ohne ein echtes Fahrzeug anschließen zu müssen.
## Features
- **OBD-II Responder (11-bit IDs, ISO 15765-4)**
- Unterstützt Mode 01 / PID `0x0D` (Geschwindigkeit)
- Unterstützt Mode 01 / PID `0x0C` (Motordrehzahl)
- **Tkinter-GUI**
- Gangwahl (N/0 6)
- Gasregler (0100 %)
- Anzeige von Geschwindigkeit & Drehzahl (berechnet über einfaches Antriebsmodell)
- CAN-Interface Auswahl, Bitrate, Timeout, RESP-ID
- Link-Up/Down Steuerung direkt in der GUI
- Settings in `settings.json` speicherbar
- **Trace-Fenster**: ähnlich CANalyzer Light
- Stream-Modus (alle Frames)
- Aggregat-Modus (eine Zeile pro ID+Richtung)
- **Robust gegen Interface-Down/Up**
- Erkennt automatisch, wenn `can0` (oder anderes IF) Down geht
- Öffnet Bus neu, sobald wieder Up
## Voraussetzungen
- Linux mit SocketCAN Support
- Python ≥ 3.10
- Pakete:
```bash
pip install -r requirements.txt
````
(enthält `python-can`, `pyroute2`, `typing_extensions`, `wrapt`, `packaging`)
* Ein CAN-Interface (z. B. [CANable](https://canable.io/), `gs_usb`, Peak, vcan)
## Installation & Start
```bash
git clone <repo>
cd Kettenöler-Testsoftware
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
./start.sh
```
Das GUI startet und öffnet automatisch den Responder.
Über die Buttons „Link UP/DOWN“ kann das CAN-Interface hoch-/runtergefahren werden.
## Rechte / Berechtigungen
Um CAN-Links ohne `sudo` hoch/runter zu setzen, benötigt der Python-Interpreter die Capability `CAP_NET_ADMIN`:
```bash
sudo setcap cap_net_admin,cap_net_raw=eip "$(readlink -f .venv/bin/python)"
getcap "$(readlink -f .venv/bin/python)"
```
Alternativ: App mit `sudo ./start.sh` starten.
## Nutzung
1. **Interface vorbereiten**
Klassisches CAN-Interface:
```bash
sudo ip link set can0 down
sudo ip link set can0 type can bitrate 500000
sudo ip link set can0 up
```
Virtuelles Interface (nur Softwaretests):
```bash
sudo modprobe vcan
sudo ip link add dev vcan0 type vcan
sudo ip link set vcan0 up
```
2. **Simulator starten**
GUI öffnen (`./start.sh`).
3. **Test mit can-utils**
Terminal 1:
```bash
candump can0
```
Terminal 2: Anfrage nach Geschwindigkeit:
```bash
cansend can0 7DF#02010D0000000000
```
Erwartung:
* Request `7DF`
* Response `7E8` mit `03 41 0D <speed>`
Anfrage nach Drehzahl:
```bash
cansend can0 7DF#02010C0000000000
```
4. **Trace im GUI**
* „Stream“: zeigt alle Frames einzeln
* „Aggregate“: fasst pro CAN-ID zusammen (Count, Last Data)
## Projektstruktur
```
main.py Startpunkt
app/
├─ gui.py Tkinter GUI
├─ can.py CAN-Responder + Link-Control (pyroute2)
├─ simulator.py Physikmodell (Gang + Gas → Geschwindigkeit/RPM)
└─ config.py Settings + Logging
settings.json Konfigurationsdatei (wird beim Speichern erzeugt)
```
## Bekannte Einschränkungen
* Nur wenige PIDs implementiert (0x0C, 0x0D).
* Antwort immer mit fixer DLC=8.
* Einfaches Driveline-Modell (keine realistische Fahrzeugphysik).
* Trace-Fenster ist eine Light-Variante, kein vollwertiger CANalyzer.
## Lizenz
\[MIT] oder \[GPLv3] bitte je nach Projektziel eintragen.

0
app/__init__.py Normal file
View File

303
app/can.py Normal file
View File

@@ -0,0 +1,303 @@
# can.py — SocketCAN OBD-II Responder + Link-Control
from __future__ import annotations
import logging
import threading
import time
import os
import sys
import subprocess
from typing import Callable, Dict, Optional, List
from pyroute2 import IPRoute, NetlinkError
import json
import can
OBD_REQ_ID = 0x7DF
PID_SPEED = 0x0D
PID_RPM = 0x0C
CAP_NET_ADMIN_BIT = 12 # Linux CAP_NET_ADMIN
def have_cap_netadmin() -> bool:
if os.geteuid() == 0:
return True
try:
with open("/proc/self/status", "r", encoding="utf-8") as f:
for line in f:
if line.startswith("CapEff:"):
mask = int(line.split()[1], 16)
return bool(mask & (1 << CAP_NET_ADMIN_BIT))
except Exception:
pass
return False
def need_caps_message() -> str:
exe = os.path.realpath(sys.executable)
return (
"Keine Berechtigung für 'ip link'.\n\n"
"Option A) Als root starten (sudo)\n"
"Option B) Capabilities auf das laufende Python setzen:\n"
f" sudo setcap cap_net_admin,cap_net_raw=eip \"{exe}\"\n"
f" getcap \"{exe}\"\n"
)
def list_can_ifaces() -> List[str]:
"""Listet verfügbare CAN/vCAN-Interfaces via `ip -json link`."""
try:
out = subprocess.check_output(["ip", "-json", "link"], text=True)
import json
items = json.loads(out)
names = [i["ifname"] for i in items if i.get("link_type") in ("can", None) and (i["ifname"].startswith("can") or i["ifname"].startswith("vcan"))]
return sorted(set(names))
except Exception:
# Fallback: /sys/class/net
try:
import os
names = [n for n in os.listdir("/sys/class/net") if n.startswith(("can", "vcan"))]
return sorted(set(names))
except Exception:
return []
def _link_info(ipr: IPRoute, iface: str):
idxs = ipr.link_lookup(ifname=iface)
if not idxs:
raise RuntimeError(f"Interface '{iface}' nicht gefunden")
info = ipr.get_links(idxs[0])[0]
kind = "none"
for k, v in info.get("attrs", []):
if k == "IFLA_LINKINFO":
for kk, vv in v.get("attrs", []):
if kk == "IFLA_INFO_KIND":
kind = str(vv)
break
return idxs[0], kind, info
def link_state(iface: str) -> str:
try:
with IPRoute() as ipr:
idx, _, info = _link_info(ipr, iface)
st = info.get("state")
if isinstance(st, str):
return st.upper()
for k, v in info.get("attrs", []):
if k == "IFLA_OPERSTATE":
return str(v).upper()
except Exception:
pass
return "UNKNOWN"
def link_up(iface: str, bitrate: int = 500000, fd: bool = False, set_params: bool = True) -> None:
with IPRoute() as ipr:
idx, kind, _ = _link_info(ipr, iface)
# Wenn bereits UP: nichts tun.
try:
cur = ipr.get_links(idx)[0]
if (cur.get("state") or "").upper() == "UP":
return
except Exception:
pass
# 1) down (ignoriere "invalid argument" / already down)
try:
ipr.link("set", index=idx, state="down")
except NetlinkError as e:
if e.code not in (0, 22):
raise RuntimeError(f"Netlink error bei '{iface}' (down): {e.code} {e}") from e
# 2) optional: Parameter nur bei 'can' EINVAL ignorieren
if set_params and kind == "can":
try:
ipr.link("set", index=idx, kind="can", data={"bitrate": int(bitrate)})
except NetlinkError as e:
if e.code != 22: # alles außer EINVAL weiterreichen
raise RuntimeError(f"Netlink error bei '{iface}' (bitrate): {e.code} {e}") from e
# EINVAL -> Treiber mag Param-Change jetzt nicht -> einfach weitermachen
# 3) up (hier darf es notfalls knallen)
try:
ipr.link("set", index=idx, state="up")
except NetlinkError as e:
raise RuntimeError(f"Netlink error bei '{iface}' (up): {e.code} {e}") from e
def link_down(iface: str) -> None:
with IPRoute() as ipr:
idx, _, _ = _link_info(ipr, iface)
try:
ipr.link("set", index=idx, state="down")
except NetlinkError as e:
if e.code not in (0, 22): # ignore EINVAL/OK
raise RuntimeError(f"Netlink error bei '{iface}' (down): {e.code} {e}") from e
def link_kind(iface: str) -> str:
"""liefert nur das INFO_KIND (z.B. 'can', 'slcan', 'vcan', 'none')"""
try:
with IPRoute() as ipr:
_, kind, _ = _link_info(ipr, iface)
return kind
except Exception:
return "unknown"
class ObdResponder:
"""
OBD-II Mode-01 PID-Responder über SocketCAN (11-bit).
Non-blocking, threadsicher, mit Rebind-Funktion und robustem Reopen,
falls das Interface DOWN ist.
"""
def __init__(
self,
interface: str,
resp_id: int,
timeout_ms: int = 200,
logger: Optional[logging.Logger] = None,
):
self.interface = interface
self.resp_id = resp_id
self.timeout_ms = timeout_ms
self.log = logger or logging.getLogger("obdcan")
# PID-Provider: pid -> callable() -> 8-Byte-Payload
self.providers: Dict[int, Callable[[], bytes]] = {}
# Laufzustand / CAN-Ressourcen
self._run = threading.Event()
self._run.set()
self.bus: Optional[can.BusABC] = None
self.reader: Optional[can.BufferedReader] = None
self.notifier: Optional[can.Notifier] = None
# Service-Thread, der bei IF=UP den Bus öffnet und RX abwickelt
self._thread = threading.Thread(
target=self._service_loop, name="OBD-SVC", daemon=True
)
self._thread.start()
# ---------- Lifecycle ----------
def _open_bus(self) -> None:
# ggf. alte Ressourcen schließen, dann neu öffnen
self._close_bus()
self._close_bus()
self.bus = can.interface.Bus(channel=self.interface, interface="socketcan")
self.log.info("OBD responder started on %s (resp_id=0x%03X)", self.interface, self.resp_id)
self.log.info(
"OBD responder started on %s (resp_id=0x%03X)",
self.interface, self.resp_id
)
def _close_bus(self) -> None:
try:
if self.notifier:
self.notifier.stop()
except Exception:
pass
try:
if self.bus:
self.bus.shutdown()
except Exception:
pass
self.bus = None
def stop(self) -> None:
self._run.clear()
try:
self._thread.join(timeout=1.0)
except RuntimeError:
pass
self._close_bus()
def rebind(self, interface: Optional[str] = None, resp_id: Optional[int] = None) -> None:
if interface is not None:
self.interface = interface
if resp_id is not None:
self.resp_id = resp_id
# Bus schließen; Service-Loop öffnet ihn wieder, sobald IF=UP ist
self._close_bus()
self.log.info("Rebind requested: %s, resp=0x%03X", self.interface, self.resp_id)
# ---------- Öffentliche API ----------
def register_pid(self, pid: int, provider: Callable[[], bytes]) -> None:
self.providers[pid] = provider
# ---------- Service-Loop (robust gegen 'Network is down') ----------
def _service_loop(self) -> None:
backoff = 0.5
while self._run.is_set():
# Bus öffnen, wenn IF up ist
if self.bus is None:
if link_state(self.interface) == "UP":
try:
self._open_bus()
backoff = 0.5
except Exception as e:
self.log.warning("Bus open failed: %s", e)
time.sleep(backoff)
backoff = min(5.0, backoff * 1.7)
continue
else:
time.sleep(0.5)
continue
# RX: eigene PollSchleife statt Notifier
try:
msg = self.bus.recv(0.05) # blocking short timeout
if msg is not None:
self._handle(msg)
except (can.CanOperationError, OSError):
# IF ging down -> Bus schließen und später neu öffnen
self.log.info("CAN went DOWN — closing bus, will retry…")
self._close_bus()
time.sleep(0.5)
except Exception as e:
self.log.warning("CAN recv error: %s", e)
time.sleep(0.1)
# ---------- Message-Handler ----------
def _handle(self, msg: can.Message) -> None:
if msg.is_extended_id or msg.arbitration_id != OBD_REQ_ID:
return
data = bytes(msg.data)
if len(data) < 3:
return
# tolerant: 02 01 <pid> ... oder 01 <pid> ...
if data[0] == 0x02 and len(data) >= 3:
mode, pid = data[1], data[2]
else:
mode, pid = data[0], (data[1] if len(data) >= 2 else None)
if mode != 0x01 or pid is None:
return
provider = self.providers.get(pid)
if not provider:
return
try:
payload = provider()
if not isinstance(payload, (bytes, bytearray)) or len(payload) != 8:
return
out = can.Message(
arbitration_id=self.resp_id,
is_extended_id=False,
data=payload,
dlc=8
)
if self.bus:
self.bus.send(out)
except can.CanError:
self.log.warning("CAN send failed (bus off?)")
except Exception as e:
self.log.exception("Provider error: %s", e)
# --- Ende Patch ObdResponder -----------------------------------------------
# Helfer fürs Formatieren
def make_speed_response(speed_kmh: int) -> bytes:
A = max(0, min(255, int(speed_kmh)))
return bytes([0x03, 0x41, PID_SPEED, A, 0x00, 0x00, 0x00, 0x00])
def make_rpm_response(rpm: int) -> bytes:
raw = max(0, int(rpm)) * 4
A = (raw >> 8) & 0xFF
B = raw & 0xFF
return bytes([0x04, 0x41, PID_RPM, A, B, 0x00, 0x00, 0x00])

62
app/config.py Normal file
View File

@@ -0,0 +1,62 @@
import json
import logging
import os
from logging.handlers import RotatingFileHandler
from pathlib import Path
APP_ROOT = Path(__file__).resolve().parents[1]
SETTINGS_PATH = APP_ROOT / "settings.json"
DEFAULTS = {
"can": {
"interface": "vcan0",
"resp_id": "0x7E8",
"timeout_ms": 200
},
"ui": {
"font_family": "DejaVu Sans",
"font_size": 10,
"window": {
"width": 1100,
"height": 720
}
},
"logging": {
"level": "INFO",
"file": "logs/app.log"
}
}
def load_settings():
cfg = DEFAULTS.copy()
if SETTINGS_PATH.exists():
try:
with open(SETTINGS_PATH, "r", encoding="utf-8") as f:
data = json.load(f)
# shallow merge
for k, v in data.items():
if isinstance(v, dict) and k in cfg:
cfg[k].update(v)
else:
cfg[k] = v
except Exception as e:
print("WARN: konnte settings.json nicht laden:", e)
return cfg
def setup_logging(cfg):
level = getattr(logging, cfg["logging"].get("level", "INFO").upper(), logging.INFO)
log_file = cfg["logging"].get("file", "logs/app.log")
log_path = (APP_ROOT / log_file).resolve()
log_path.parent.mkdir(parents=True, exist_ok=True)
logger = logging.getLogger("configapp")
logger.setLevel(level)
handler = RotatingFileHandler(log_path, maxBytes=1_000_000, backupCount=2, encoding="utf-8")
fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
handler.setFormatter(fmt)
logger.addHandler(handler)
sh = logging.StreamHandler()
sh.setFormatter(fmt)
logger.addHandler(sh)
return logger

440
app/gui.py Normal file
View File

@@ -0,0 +1,440 @@
# gui.py — Tk-App mit Interface-Dropdown, Link Up/Down, Settings-View/Save + CAN-Trace
from __future__ import annotations
import json
import threading
import time
import tkinter as tk
from tkinter import ttk, messagebox
from collections import deque, defaultdict
import can # nur für Trace-Reader
from .config import load_settings, setup_logging, SETTINGS_PATH, APP_ROOT
from .simulator import EcuState, DrivelineModel
from .can import (
ObdResponder, make_speed_response, make_rpm_response,
list_can_ifaces, link_up, link_down,
have_cap_netadmin, link_state, link_kind
)
# ---------- kleine Trace-Helfer ----------
class TraceCollector:
"""
Liest mit eigenem BufferedReader vom SocketCAN und sammelt Frames.
- stream_buffer: deque mit (ts, id, dlc, data_bytes)
- aggregate: dict[(id, dir)] -> {count, last_ts, last_data}
"""
def __init__(self, channel: str):
self.channel = channel
self.bus = None
self._run = threading.Event(); self._run.set()
self._thread = threading.Thread(target=self._rx_loop, name="CAN-TRACE", daemon=True)
self.stream_buffer = deque(maxlen=2000)
self.lock = threading.Lock()
def _open(self):
self._close()
self.bus = can.interface.Bus(channel=self.channel, interface="socketcan")
def _close(self):
try:
if self.bus: self.bus.shutdown()
except Exception: pass
self.bus = None
def start(self):
self._thread.start()
def stop(self):
self._run.clear()
try: self._thread.join(timeout=1.0)
except RuntimeError: pass
self._close()
def _rx_loop(self):
backoff = 0.5
while self._run.is_set():
if self.bus is None:
if link_state(self.channel) == "UP":
try:
self._open()
backoff = 0.5
except Exception:
time.sleep(backoff); backoff = min(5.0, backoff*1.7)
continue
else:
time.sleep(0.5); continue
try:
msg = self.bus.recv(0.05)
if msg and not msg.is_error_frame and not msg.is_remote_frame:
ts = time.time()
with self.lock:
self.stream_buffer.append((ts, msg.arbitration_id, msg.dlc, bytes(msg.data)))
except (can.CanOperationError, OSError):
# IF down → ruhig schließen, kein Traceback
self._close()
time.sleep(0.5)
except Exception:
time.sleep(0.05)
def snapshot_stream(self):
with self.lock:
return list(self.stream_buffer)
def launch_gui():
cfg = load_settings()
logger = setup_logging(cfg)
# read config values
can_iface = (cfg.get("can", {}).get("interface")) or "can0"
resp_id_raw = (cfg.get("can", {}).get("resp_id")) or "0x7E8"
try:
resp_id = int(resp_id_raw, 16) if isinstance(resp_id_raw, str) else int(resp_id_raw)
except Exception:
resp_id = 0x7E8
timeout_ms = cfg.get("can", {}).get("timeout_ms", 200)
bitrate = cfg.get("can", {}).get("baudrate", 500000)
ecu = EcuState(DrivelineModel())
responder = ObdResponder(interface=can_iface, resp_id=resp_id, timeout_ms=timeout_ms, logger=logger)
# register providers
responder.register_pid(0x0D, lambda: make_speed_response(int(round(ecu.snapshot()[3]))))
responder.register_pid(0x0C, lambda: make_rpm_response(int(ecu.snapshot()[2])))
# physics thread
running = True
def physics_loop():
while running:
ecu.update()
time.sleep(0.02)
t = threading.Thread(target=physics_loop, daemon=True)
t.start()
# Trace-Collector (eigener Bus, hört alles auf can_iface)
tracer = TraceCollector(can_iface)
tracer.start()
# --- Tk UI ---
root = tk.Tk()
root.title("OBD-II ECU Simulator SocketCAN")
# window size from cfg
try:
w = int(cfg["ui"]["window"]["width"]); h = int(cfg["ui"]["window"]["height"])
root.geometry(f"{w}x{h}")
except Exception:
pass
# fonts/styles
family = cfg.get("ui", {}).get("font_family", "TkDefaultFont")
size = int(cfg.get("ui", {}).get("font_size", 10))
style = ttk.Style()
style.configure("TLabel", font=(family, size))
style.configure("Header.TLabel", font=(family, size+2, "bold"))
style.configure("TButton", font=(family, size))
# layout
root.columnconfigure(0, weight=1); root.rowconfigure(0, weight=1)
main = ttk.Frame(root, padding=10); main.grid(row=0, column=0, sticky="nsew")
main.columnconfigure(1, weight=1)
# === Controls: Gear + Throttle ===
ttk.Label(main, text="Gang").grid(row=0, column=0, sticky="w")
gear_var = tk.IntVar(value=0)
gear_box = ttk.Combobox(main, textvariable=gear_var, state="readonly", values=[0,1,2,3,4,5,6], width=5)
gear_box.grid(row=0, column=1, sticky="w", padx=(6,12))
gear_box.bind("<<ComboboxSelected>>", lambda _e: ecu.set_gear(gear_var.get()))
ttk.Label(main, text="Gas (%)").grid(row=1, column=0, sticky="w")
thr = ttk.Scale(main, from_=0, to=100, orient="horizontal",
command=lambda v: ecu.set_throttle(int(float(v))))
thr.set(0)
thr.grid(row=1, column=1, sticky="ew", padx=(6,12))
lbl_speed = ttk.Label(main, text="Speed: 0 km/h", style="Header.TLabel")
lbl_rpm = ttk.Label(main, text="RPM: 0")
lbl_speed.grid(row=2, column=0, columnspan=2, sticky="w", pady=(10,0))
lbl_rpm.grid(row=3, column=0, columnspan=2, sticky="w")
# === CAN Panel ===
sep = ttk.Separator(main); sep.grid(row=4, column=0, columnspan=2, sticky="ew", pady=(10,10))
can_frame = ttk.LabelFrame(main, text="CAN & Settings", padding=10)
can_frame.grid(row=5, column=0, columnspan=2, sticky="nsew")
can_frame.columnconfigure(1, weight=1)
ttk.Label(can_frame, text="Interface").grid(row=0, column=0, sticky="w")
iface_var = tk.StringVar(value=can_iface)
iface_list = list_can_ifaces() or [can_iface]
iface_dd = ttk.Combobox(can_frame, textvariable=iface_var, values=iface_list, state="readonly", width=12)
iface_dd.grid(row=0, column=1, sticky="w", padx=(6,12))
def refresh_ifaces():
lst = list_can_ifaces()
if not lst:
messagebox.showwarning("Interfaces", "Keine can*/vcan* Interfaces gefunden.")
return
iface_dd.config(values=lst)
ttk.Button(can_frame, text="Refresh", command=refresh_ifaces).grid(row=0, column=2, padx=4)
ttk.Label(can_frame, text="RESP-ID (hex)").grid(row=1, column=0, sticky="w")
resp_var = tk.StringVar(value=f"0x{resp_id:03X}")
resp_entry = ttk.Entry(can_frame, textvariable=resp_var, width=10)
resp_entry.grid(row=1, column=1, sticky="w", padx=(6,12))
ttk.Label(can_frame, text="Timeout (ms)").grid(row=2, column=0, sticky="w")
to_var = tk.IntVar(value=int(timeout_ms))
to_spin = ttk.Spinbox(can_frame, from_=10, to=5000, increment=10, textvariable=to_var, width=8)
to_spin.grid(row=2, column=1, sticky="w", padx=(6,12))
ttk.Label(can_frame, text="Bitrate").grid(row=3, column=0, sticky="w")
br_var = tk.IntVar(value=int(bitrate))
br_spin = ttk.Spinbox(can_frame, from_=20000, to=1000000, increment=10000, textvariable=br_var, width=10)
br_spin.grid(row=3, column=1, sticky="w", padx=(6,12))
# unter Bitrate-Spinbox
set_params = tk.BooleanVar(value=True)
ttk.Checkbutton(can_frame, text="Bitrate beim UP setzen", variable=set_params).grid(row=3, column=2, sticky="w")
# add Kind-Anzeige
kind_label = ttk.Label(can_frame, text=f"Kind: {link_kind(can_iface)}")
kind_label.grid(row=0, column=3, sticky="w", padx=(12,0))
# Link control
def do_link_up():
try:
# Kind-Anzeige aktualisieren (falls Interface gewechselt)
kind_label.config(text=f"Kind: {link_kind(iface_var.get())}")
if link_state(iface_var.get()) == "UP":
messagebox.showinfo("CAN", f"{iface_var.get()} ist bereits UP")
return
# NEU: set_params aus Checkbox
link_up(iface_var.get(), bitrate=br_var.get(), fd=False, set_params=set_params.get())
msg = f"{iface_var.get()} ist UP"
if set_params.get():
msg += f" @ {br_var.get()} bit/s (falls vom Treiber unterstützt)"
else:
msg += " (Bitrate unverändert)"
messagebox.showinfo("CAN", msg)
except PermissionError as e:
messagebox.showerror("Berechtigung", str(e))
except Exception as e:
messagebox.showerror("CAN", f"Link UP fehlgeschlagen:\n{e}")
def do_link_down():
try:
if link_state(iface_var.get()) == "DOWN":
messagebox.showinfo("CAN", f"{iface_var.get()} ist bereits DOWN")
return
link_down(iface_var.get())
messagebox.showinfo("CAN", f"{iface_var.get()} ist DOWN")
except PermissionError as e:
messagebox.showerror("Berechtigung", str(e))
except Exception as e:
messagebox.showerror("CAN", f"Link DOWN fehlgeschlagen:\n{e}")
btn_up = ttk.Button(can_frame, text="Link UP", command=do_link_up)
btn_down = ttk.Button(can_frame, text="Link DOWN", command=do_link_down)
btn_up.grid(row=4, column=0, pady=(8,0), sticky="w")
btn_down.grid(row=4, column=1, pady=(8,0), sticky="w")
# Rebind responder
def do_rebind():
nonlocal can_iface, resp_id, timeout_ms, bitrate, tracer
can_iface = iface_var.get()
try:
new_resp = int(resp_var.get(), 16)
except Exception:
messagebox.showerror("RESP-ID", "Bitte gültige Hex-Zahl, z.B. 0x7E8")
return
resp_id = new_resp
timeout_ms = to_var.get()
bitrate = br_var.get()
try:
responder.rebind(interface=can_iface, resp_id=resp_id)
# Trace-Collector auf neues IF neu binden
try:
tracer.stop()
except Exception:
pass
tracer = TraceCollector(can_iface)
tracer.start()
messagebox.showinfo("CAN", f"Responder neu gebunden: {can_iface}, RESP 0x{resp_id:03X}")
except Exception as e:
messagebox.showerror("CAN", f"Rebind fehlgeschlagen:\n{e}")
ttk.Button(can_frame, text="Responder Rebind", command=do_rebind).grid(row=4, column=2, pady=(8,0), sticky="w")
# CAP-Status
caps_ok = have_cap_netadmin()
cap_label = ttk.Label(can_frame, text=f"CAP_NET_ADMIN: {'yes' if caps_ok else 'no'}")
cap_label.grid(row=6, column=0, columnspan=2, sticky="w", pady=(6,0))
if not caps_ok:
btn_up.state(["disabled"]); btn_down.state(["disabled"])
# Statusbar
status = ttk.Label(main, text=f"CAN: {can_iface} | RESP-ID: 0x{resp_id:03X}", relief="sunken", anchor="w")
status.grid(row=6, column=0, columnspan=2, sticky="ew", pady=(10,0))
# === TRACE-FENSTER (unten) ===
trace_frame = ttk.LabelFrame(root, text="CAN Trace", padding=6)
trace_frame.grid(row=1, column=0, sticky="nsew")
root.rowconfigure(1, weight=1)
trace_frame.columnconfigure(0, weight=1)
trace_frame.rowconfigure(1, weight=1)
# Controls: Mode, Pause, Clear, Autoscroll
ctrl = ttk.Frame(trace_frame)
ctrl.grid(row=0, column=0, sticky="ew", pady=(0,4))
ctrl.columnconfigure(5, weight=1)
mode_var = tk.StringVar(value="stream") # "stream" | "aggregate"
ttk.Label(ctrl, text="Modus:").grid(row=0, column=0, sticky="w")
mode_dd = ttk.Combobox(ctrl, textvariable=mode_var, state="readonly", width=10,
values=["stream", "aggregate"])
mode_dd.grid(row=0, column=1, sticky="w", padx=(4,12))
paused = tk.BooleanVar(value=False)
ttk.Checkbutton(ctrl, text="Pause", variable=paused).grid(row=0, column=2, sticky="w")
autoscroll = tk.BooleanVar(value=True)
ttk.Checkbutton(ctrl, text="Auto-Scroll", variable=autoscroll).grid(row=0, column=3, sticky="w")
def do_clear():
nonlocal aggregate_cache
tree.delete(*tree.get_children())
aggregate_cache.clear()
ttk.Button(ctrl, text="Clear", command=do_clear).grid(row=0, column=4, padx=(8,0), sticky="w")
# Treeview
cols_stream = ("time", "dir", "id", "dlc", "data")
cols_agg = ("id", "dir", "count", "last_time", "last_dlc", "last_data")
tree = ttk.Treeview(trace_frame, columns=cols_stream, show="headings", height=10)
tree.grid(row=1, column=0, sticky="nsew")
sb_y = ttk.Scrollbar(trace_frame, orient="vertical", command=tree.yview)
tree.configure(yscrollcommand=sb_y.set)
sb_y.grid(row=1, column=1, sticky="ns")
def setup_columns(mode: str):
tree.delete(*tree.get_children())
if mode == "stream":
tree.config(columns=cols_stream)
headings = [("time","Time"),("dir","Dir"),("id","ID"),("dlc","DLC"),("data","Data")]
widths = [140, 60, 90, 60, 520]
else:
tree.config(columns=cols_agg)
headings = [("id","ID"),("dir","Dir"),("count","Count"),("last_time","Last Time"),("last_dlc","DLC"),("last_data","Last Data")]
widths = [90, 60, 80, 140, 60, 520]
for (col, text), w in zip(headings, widths):
tree.heading(col, text=text)
tree.column(col, width=w, anchor="w")
setup_columns("stream")
aggregate_cache: dict[tuple[int,str], dict] = {}
def fmt_time(ts: float) -> str:
# hh:mm:ss.mmm
lt = time.localtime(ts)
return time.strftime("%H:%M:%S", lt) + f".{int((ts%1)*1000):03d}"
def fmt_id(i: int) -> str:
return f"0x{i:03X}"
def fmt_data(b: bytes) -> str:
return " ".join(f"{x:02X}" for x in b)
# periodic UI update
last_index = 0
def tick():
nonlocal can_iface, resp_id, last_index
# Top-Status
g, tval, rpm, spd = ecu.snapshot()
caps = "CAP:yes" if have_cap_netadmin() else "CAP:no"
st = link_state(can_iface)
lbl_speed.config(text=f"Speed: {int(round(spd))} km/h")
lbl_rpm.config(text=f"RPM: {rpm}")
st = link_state(can_iface)
kd = link_kind(can_iface)
status.config(text=f"CAN: {can_iface}({st},{kd}) | RESP-ID: 0x{resp_id:03X} | Gear {g} | Throttle {tval}% | {caps}")
# Trace
if not paused.get():
mode = mode_var.get()
if mode == "stream":
setup_columns("stream") if tree["columns"] != cols_stream else None
# append new items
buf = tracer.snapshot_stream()
# nur neue ab letztem Index
for ts, cid, dlc, data in buf[last_index:]:
# Richtung heuristisch
if cid == 0x7DF:
d = "RX"
elif cid == resp_id:
d = "TX"
else:
d = "?"
tree.insert("", "end",
values=(fmt_time(ts), d, fmt_id(cid), dlc, fmt_data(data)))
# autoscroll
if autoscroll.get() and buf[last_index:]:
tree.see(tree.get_children()[-1])
last_index = len(buf)
else:
setup_columns("aggregate") if tree["columns"] != cols_agg else None
# baue Aggregat neu (leicht, schnell)
buf = tracer.snapshot_stream()
agg: dict[tuple[int,str], dict] = {}
for ts, cid, dlc, data in buf:
if cid == 0x7DF:
d = "RX"
elif cid == resp_id:
d = "TX"
else:
d = "?"
key = (cid, d)
entry = agg.get(key)
if entry is None:
agg[key] = {"count":1, "last_ts":ts, "last_dlc":dlc, "last_data":data}
else:
entry["count"] += 1
if ts >= entry["last_ts"]:
entry["last_ts"] = ts
entry["last_dlc"] = dlc
entry["last_data"] = data
# nur neu zeichnen, wenn sich was ändert
if agg != aggregate_cache:
tree.delete(*tree.get_children())
# sortiert nach ID, RX vor TX
for (cid, d) in sorted(agg.keys(), key=lambda k:(k[0], 0 if k[1]=="RX" else 1)):
e = agg[(cid,d)]
tree.insert("", "end",
values=(fmt_id(cid), d, e["count"],
fmt_time(e["last_ts"]),
e["last_dlc"], fmt_data(e["last_data"])))
aggregate_cache.clear()
aggregate_cache.update(agg)
root.after(50, tick)
tick()
def on_close():
nonlocal running
running = False
try:
tracer.stop()
except Exception:
pass
try:
responder.stop()
finally:
root.destroy()
root.protocol("WM_DELETE_WINDOW", on_close)
root.mainloop()

66
app/simulator.py Normal file
View File

@@ -0,0 +1,66 @@
# simulator.py — Driveline & ECU-State
from __future__ import annotations
import threading
import time
from dataclasses import dataclass
@dataclass
class DrivelineModel:
idle_rpm: int = 1400
max_rpm: int = 9500
kmh_per_krpm: tuple = (0.0, 12.0, 19.0, 25.0, 32.0, 38.0, 45.0)
rpm_rise_per_s: int = 5000
rpm_fall_per_s: int = 3500
def target_rpm_from_throttle(self, throttle_pct: int) -> int:
t = max(0, min(100, throttle_pct)) / 100.0
return int(self.idle_rpm + t * (self.max_rpm - self.idle_rpm))
def speed_from_rpm_gear(self, rpm: int, gear: int) -> float:
if gear <= 0:
return 0.0
k = self.kmh_per_krpm[min(gear, len(self.kmh_per_krpm) - 1)]
return (rpm / 1000.0) * k
class EcuState:
"""Thread-sichere Zustandsmaschine (Gang, Gas, RPM, Speed)."""
def __init__(self, model: DrivelineModel | None = None) -> None:
self.model = model or DrivelineModel()
self._lock = threading.Lock()
self._gear = 0
self._throttle = 0
self._rpm = self.model.idle_rpm
self._speed = 0.0
self._last = time.monotonic()
def set_gear(self, gear: int) -> None:
with self._lock:
self._gear = max(0, min(6, int(gear)))
def set_throttle(self, thr: int) -> None:
with self._lock:
self._throttle = max(0, min(100, int(thr)))
def snapshot(self) -> tuple[int, int, int, float]:
with self._lock:
return self._gear, self._throttle, self._rpm, self._speed
def update(self) -> None:
now = time.monotonic()
dt = max(0.0, min(0.1, now - self._last))
self._last = now
with self._lock:
target = self.model.target_rpm_from_throttle(self._throttle)
if self._rpm < target:
self._rpm = min(self._rpm + int(self.model.rpm_rise_per_s * dt), target)
else:
self._rpm = max(self._rpm - int(self.model.rpm_fall_per_s * dt), target)
min_idle = 800 if self._gear == 0 and self._throttle == 0 else self.model.idle_rpm
self._rpm = max(min_idle, min(self._rpm, self.model.max_rpm))
target_speed = self.model.speed_from_rpm_gear(self._rpm, self._gear)
alpha = min(1.0, 4.0 * dt)
if self._gear == 0:
target_speed = 0.0
self._speed = (1 - alpha) * self._speed + alpha * target_speed
self._speed = max(0.0, min(self._speed, 299.0))

0
logs/.gitkeep Normal file
View File

4
main.py Normal file
View File

@@ -0,0 +1,4 @@
from app.gui import launch_gui
if __name__ == "__main__":
launch_gui()

2
requirements.txt Normal file
View File

@@ -0,0 +1,2 @@
python-can
pyroute2

18
start.sh Executable file
View File

@@ -0,0 +1,18 @@
#!/usr/bin/env bash
set -euo pipefail
# Choose python (allow override with $PYTHON)
PYTHON_BIN="${PYTHON:-python3}"
VENV_DIR=".venv"
if [ ! -d "$VENV_DIR" ]; then
"$PYTHON_BIN" -m venv "$VENV_DIR"
fi
# shellcheck disable=SC1091
source "$VENV_DIR/bin/activate"
python -m pip install --upgrade pip
pip install -r requirements.txt
exec python main.py "$@"