From 4c41be706da182268f13c59ff922b2f06124ea65 Mon Sep 17 00:00:00 2001 From: Marcel Peterkau Date: Wed, 27 Aug 2025 19:05:45 +0200 Subject: [PATCH] Fixed premission-issues with additional script --- app/can.py | 427 ++++++++++++++++++++++++------------------------- app/gui.py | 18 ++- app/obd2.py | 154 ++++++++++++++++++ prepare_can.sh | 56 +++++++ 4 files changed, 436 insertions(+), 219 deletions(-) create mode 100644 app/obd2.py create mode 100755 prepare_can.sh diff --git a/app/can.py b/app/can.py index d6e4ffc..d2824b9 100644 --- a/app/can.py +++ b/app/can.py @@ -1,23 +1,26 @@ -# can.py — SocketCAN OBD-II Responder + Link-Control +# app/can.py — SocketCAN Link-Control + Diagnostics (pyroute2) + Helpers from __future__ import annotations -import logging -import threading -import time -import os -import sys -import subprocess -from typing import Callable, Dict, Optional, List -from pyroute2 import IPRoute, NetlinkError -import json -import can -OBD_REQ_ID = 0x7DF -PID_SPEED = 0x0D -PID_RPM = 0x0C +import json +import logging +import math +import os +import subprocess +import sys +from typing import List, Tuple +import shutil + +from pyroute2 import IPRoute, NetlinkError + +# Logger (wird von deinem config.setup_logging konfiguriert) +log = logging.getLogger("configapp") + +# ------------------------- Capability-Helpers ------------------------------- CAP_NET_ADMIN_BIT = 12 # Linux CAP_NET_ADMIN def have_cap_netadmin() -> bool: + """Prüft, ob der aktuelle Prozess CAP_NET_ADMIN effektiv hat (oder root ist).""" if os.geteuid() == 0: return True try: @@ -33,31 +36,77 @@ def have_cap_netadmin() -> bool: def need_caps_message() -> str: exe = os.path.realpath(sys.executable) return ( - "Keine Berechtigung für 'ip link'.\n\n" + "Keine Berechtigung für Netlink-Änderungen.\n\n" "Option A) Als root starten (sudo)\n" "Option B) Capabilities auf das laufende Python setzen:\n" f" sudo setcap cap_net_admin,cap_net_raw=eip \"{exe}\"\n" f" getcap \"{exe}\"\n" ) +def _ip_json(iface: str) -> dict: + out = subprocess.check_output(["ip", "-details", "-json", "link", "show", iface], text=True) + return json.loads(out)[0] + +def _read_bittiming(iface: str) -> dict: + try: + j = _ip_json(iface) + return ((j.get("linkinfo") or {}).get("info_data") or {}).get("bittiming") or {} + except Exception: + return {} + +def _cli(cmd: list[str]) -> tuple[int, str, str]: + r = subprocess.run(cmd, capture_output=True, text=True) + return r.returncode, (r.stdout or "").strip(), (r.stderr or "").strip() + +def ensure_capabilities() -> None: + """prüft, ob Python und ip die nötigen Rechte haben; sonst RuntimeError.""" + problems = [] + if not have_cap_netadmin(): + problems.append("• Dem laufenden Python fehlen CAP_NET_ADMIN / CAP_NET_RAW.") + ip_bin = shutil.which("ip") + if not ip_bin: + problems.append("• 'ip' (iproute2) nicht gefunden.") + else: + try: + out = subprocess.check_output(["getcap", ip_bin], text=True) + if "cap_net_admin" not in out: + problems.append(f"• '{ip_bin}' hat keine cap_net_admin/cap_net_raw.") + except Exception: + problems.append("• Konnte Capabilities von 'ip' nicht prüfen.") + + if problems: + raise RuntimeError( + "CAN Link-Operation nicht möglich. Bitte führe einmalig:\n" + " sudo prepare_can.sh\n\n" + "Probleme:\n" + "\n".join(problems) + ) + +# --------------------------- Interface-Helpers ------------------------------ + def list_can_ifaces() -> List[str]: - """Listet verfügbare CAN/vCAN-Interfaces via `ip -json link`.""" + """Listet can*/vcan* Interfaces auf (erst ip -json, dann /sys Fallback).""" try: out = subprocess.check_output(["ip", "-json", "link"], text=True) - import json items = json.loads(out) - names = [i["ifname"] for i in items if i.get("link_type") in ("can", None) and (i["ifname"].startswith("can") or i["ifname"].startswith("vcan"))] + names = [ + it["ifname"] + for it in items + if it.get("link_type") in ("can", None) + and (it["ifname"].startswith("can") or it["ifname"].startswith("vcan")) + ] return sorted(set(names)) except Exception: - # Fallback: /sys/class/net try: - import os names = [n for n in os.listdir("/sys/class/net") if n.startswith(("can", "vcan"))] return sorted(set(names)) except Exception: return [] - -def _link_info(ipr: IPRoute, iface: str): + +def _ip_show_json(iface: str) -> dict: + out = subprocess.check_output(["ip", "-details", "-json", "link", "show", iface], text=True) + return json.loads(out)[0] + +def _link_info(ipr: IPRoute, iface: str) -> Tuple[int, str, dict]: idxs = ipr.link_lookup(ifname=iface) if not idxs: raise RuntimeError(f"Interface '{iface}' nicht gefunden") @@ -72,6 +121,7 @@ def _link_info(ipr: IPRoute, iface: str): return idxs[0], kind, info def link_state(iface: str) -> str: + """Gibt 'UP'/'DOWN'/… zurück.""" try: with IPRoute() as ipr: idx, _, info = _link_info(ipr, iface) @@ -85,51 +135,8 @@ def link_state(iface: str) -> str: pass return "UNKNOWN" -def link_up(iface: str, bitrate: int = 500000, fd: bool = False, set_params: bool = True) -> None: - with IPRoute() as ipr: - idx, kind, _ = _link_info(ipr, iface) - - # Wenn bereits UP: nichts tun. - try: - cur = ipr.get_links(idx)[0] - if (cur.get("state") or "").upper() == "UP": - return - except Exception: - pass - - # 1) down (ignoriere "invalid argument" / already down) - try: - ipr.link("set", index=idx, state="down") - except NetlinkError as e: - if e.code not in (0, 22): - raise RuntimeError(f"Netlink error bei '{iface}' (down): {e.code} {e}") from e - - # 2) optional: Parameter nur bei 'can' – EINVAL ignorieren - if set_params and kind == "can": - try: - ipr.link("set", index=idx, kind="can", data={"bitrate": int(bitrate)}) - except NetlinkError as e: - if e.code != 22: # alles außer EINVAL weiterreichen - raise RuntimeError(f"Netlink error bei '{iface}' (bitrate): {e.code} {e}") from e - # EINVAL -> Treiber mag Param-Change jetzt nicht -> einfach weitermachen - - # 3) up (hier darf es notfalls knallen) - try: - ipr.link("set", index=idx, state="up") - except NetlinkError as e: - raise RuntimeError(f"Netlink error bei '{iface}' (up): {e.code} {e}") from e - -def link_down(iface: str) -> None: - with IPRoute() as ipr: - idx, _, _ = _link_info(ipr, iface) - try: - ipr.link("set", index=idx, state="down") - except NetlinkError as e: - if e.code not in (0, 22): # ignore EINVAL/OK - raise RuntimeError(f"Netlink error bei '{iface}' (down): {e.code} {e}") from e - def link_kind(iface: str) -> str: - """liefert nur das INFO_KIND (z.B. 'can', 'slcan', 'vcan', 'none')""" + """liefert das INFO_KIND (z. B. 'can', 'slcan', 'vcan', 'none').""" try: with IPRoute() as ipr: _, kind, _ = _link_info(ipr, iface) @@ -137,167 +144,155 @@ def link_kind(iface: str) -> str: except Exception: return "unknown" +# ------------------------- Bit-Timing Berechnung ---------------------------- -class ObdResponder: +def _extract_can_caps(j: dict): + """holt clock & bittiming_const aus ip -details -json.""" + li = (j.get("linkinfo") or {}) + info = (li.get("info_data") or {}) + btc = info.get("bittiming_const") or {} + clock = info.get("clock") + limits = { + "tseg1_min": (btc.get("tseg1") or {}).get("min", 2), + "tseg1_max": (btc.get("tseg1") or {}).get("max", 256), + "tseg2_min": (btc.get("tseg2") or {}).get("min", 2), + "tseg2_max": (btc.get("tseg2") or {}).get("max", 128), + "sjw_min" : (btc.get("sjw") or {}).get("min", 1), + "sjw_max" : (btc.get("sjw") or {}).get("max", 128), + "brp_min" : (btc.get("brp") or {}).get("min", 1), + "brp_max" : (btc.get("brp") or {}).get("max", 512), + "brp_inc" : (btc.get("brp_inc") or 1), + } + return clock, limits + +def _compute_bittiming(clock_hz: int, bitrate: int, limits: dict, target_sp: float = 0.875): """ - OBD-II Mode-01 PID-Responder über SocketCAN (11-bit). - Non-blocking, threadsicher, mit Rebind-Funktion und robustem Reopen, - falls das Interface DOWN ist. + Sucht brp, tseg1(=prop_seg+phase_seg1), tseg2, sjw s.d. + bitrate ~= clock / (brp * (1 + tseg1 + tseg2)) + sample_point ~= (1 + tseg1) / (1 + tseg1 + tseg2) + Liefert dict für info_data['bittiming'] (oder None). """ - def __init__( - self, - interface: str, - resp_id: int, - timeout_ms: int = 200, - logger: Optional[logging.Logger] = None, - ): - self.interface = interface - self.resp_id = resp_id - self.timeout_ms = timeout_ms - self.log = logger or logging.getLogger("obdcan") + brp_min, brp_max, brp_inc = limits["brp_min"], limits["brp_max"], max(1, limits["brp_inc"]) + tseg1_min, tseg1_max = limits["tseg1_min"], limits["tseg1_max"] + tseg2_min, tseg2_max = limits["tseg2_min"], limits["tseg2_max"] + sjw_max = limits["sjw_max"] - # PID-Provider: pid -> callable() -> 8-Byte-Payload - self.providers: Dict[int, Callable[[], bytes]] = {} + best = None + best_err = float("inf"); best_sp_err = float("inf") - # Laufzustand / CAN-Ressourcen - self._run = threading.Event() - self._run.set() - self.bus: Optional[can.BusABC] = None - self.reader: Optional[can.BufferedReader] = None - self.notifier: Optional[can.Notifier] = None - - # Service-Thread, der bei IF=UP den Bus öffnet und RX abwickelt - self._thread = threading.Thread( - target=self._service_loop, name="OBD-SVC", daemon=True - ) - self._thread.start() - - # ---------- Lifecycle ---------- - def _open_bus(self) -> None: - # ggf. alte Ressourcen schließen, dann neu öffnen - self._close_bus() - self._close_bus() - self.bus = can.interface.Bus(channel=self.interface, interface="socketcan") - self.log.info("OBD responder started on %s (resp_id=0x%03X)", self.interface, self.resp_id) - self.log.info( - "OBD responder started on %s (resp_id=0x%03X)", - self.interface, self.resp_id - ) - - def _close_bus(self) -> None: - try: - if self.notifier: - self.notifier.stop() - except Exception: - pass - try: - if self.bus: - self.bus.shutdown() - except Exception: - pass - self.bus = None - - def stop(self) -> None: - self._run.clear() - try: - self._thread.join(timeout=1.0) - except RuntimeError: - pass - self._close_bus() - - def rebind(self, interface: Optional[str] = None, resp_id: Optional[int] = None) -> None: - if interface is not None: - self.interface = interface - if resp_id is not None: - self.resp_id = resp_id - # Bus schließen; Service-Loop öffnet ihn wieder, sobald IF=UP ist - self._close_bus() - self.log.info("Rebind requested: %s, resp=0x%03X", self.interface, self.resp_id) - - # ---------- Öffentliche API ---------- - def register_pid(self, pid: int, provider: Callable[[], bytes]) -> None: - self.providers[pid] = provider - - # ---------- Service-Loop (robust gegen 'Network is down') ---------- - def _service_loop(self) -> None: - backoff = 0.5 - while self._run.is_set(): - # Bus öffnen, wenn IF up ist - if self.bus is None: - if link_state(self.interface) == "UP": - try: - self._open_bus() - backoff = 0.5 - except Exception as e: - self.log.warning("Bus open failed: %s", e) - time.sleep(backoff) - backoff = min(5.0, backoff * 1.7) - continue - else: - time.sleep(0.5) + for brp in range(brp_min, brp_max + 1, brp_inc): + tq_per_bit_f = clock_hz / (brp * float(bitrate)) + tq_min = max(1 + tseg1_min + tseg2_min, int(tq_per_bit_f) - 2) + tq_max = min(1 + tseg1_max + tseg2_max, int(tq_per_bit_f) + 2) + for tq in range(tq_min, tq_max + 1): + tseg1_f = target_sp * tq - 1.0 + for tseg1 in (math.floor(tseg1_f), math.ceil(tseg1_f)): + tseg1 = int(tseg1) + if not (tseg1_min <= tseg1 <= tseg1_max): continue + tseg2 = (tq - 1) - tseg1 + if not (tseg2_min <= tseg2 <= tseg2_max): + continue + real_bitrate = clock_hz / (brp * tq) + err = abs(real_bitrate - bitrate) + sp = (1.0 + tseg1) / tq + sp_err = abs(sp - target_sp) + if (err < best_err - 1e-6) or (abs(err - best_err) < 1e-6 and sp_err < best_sp_err): + best_err, best_sp_err = err, sp_err + phase_seg1 = max(1, min(tseg1 - 1, tseg1 // 2)) + prop_seg = tseg1 - phase_seg1 + sjw = min(sjw_max, max(1, min(tseg2, 16))) + best = { + "brp": int(brp), + "prop_seg": int(prop_seg), + "phase_seg1": int(phase_seg1), + "phase_seg2": int(tseg2), + "sjw": int(sjw), + } + return best - # RX: eigene Poll‑Schleife statt Notifier - try: - msg = self.bus.recv(0.05) # blocking short timeout - if msg is not None: - self._handle(msg) - except (can.CanOperationError, OSError): - # IF ging down -> Bus schließen und später neu öffnen - self.log.info("CAN went DOWN — closing bus, will retry…") - self._close_bus() - time.sleep(0.5) - except Exception as e: - self.log.warning("CAN recv error: %s", e) - time.sleep(0.1) +# --------------------------- Link Up / Down -------------------------------- - # ---------- Message-Handler ---------- - def _handle(self, msg: can.Message) -> None: - if msg.is_extended_id or msg.arbitration_id != OBD_REQ_ID: - return - data = bytes(msg.data) - if len(data) < 3: - return - - # tolerant: 02 01 ... oder 01 ... - if data[0] == 0x02 and len(data) >= 3: - mode, pid = data[1], data[2] - else: - mode, pid = data[0], (data[1] if len(data) >= 2 else None) - - if mode != 0x01 or pid is None: - return - - provider = self.providers.get(pid) - if not provider: +def link_up(iface: str, bitrate: int = 500000, fd: bool = False, set_params: bool = True) -> None: + ensure_capabilities() + # Snapshot vorher + try: + j0 = _ip_json(iface) + except Exception as e: + raise RuntimeError(f"Interface '{iface}' nicht lesbar: {e}") + kind = (j0.get("linkinfo") or {}).get("info_kind", "unknown") + state = j0.get("operstate", "UNKNOWN").upper() + log.info(f"[link_up] iface={iface} kind={kind} state={state} target_bitrate={bitrate}") + + with IPRoute() as ipr: + idx, _, _ = _link_info(ipr, iface) + + if state == "UP": + log.info("[link_up] already UP -> nothing to do") return + # 1) DOWN try: - payload = provider() - if not isinstance(payload, (bytes, bytearray)) or len(payload) != 8: - return - out = can.Message( - arbitration_id=self.resp_id, - is_extended_id=False, - data=payload, - dlc=8 - ) - if self.bus: - self.bus.send(out) - except can.CanError: - self.log.warning("CAN send failed (bus off?)") - except Exception as e: - self.log.exception("Provider error: %s", e) + ipr.link("set", index=idx, state="down") + log.info("[link_up] state down -> OK") + except NetlinkError as e: + if e.code not in (0, 22): + raise RuntimeError(f"Netlink error bei '{iface}' (down): {e.code} {e}") from e -# --- Ende Patch ObdResponder ----------------------------------------------- + # 2) bittiming setzen (nur 'can') + if set_params and kind == "can": + # Clock & Limits holen + caps = _ip_json(iface) + clock, limits = _extract_can_caps(caps) + if not clock: + raise RuntimeError("CAN-Clock nicht gefunden – kann Bit-Timing nicht berechnen.") + cfg = _compute_bittiming(clock, int(bitrate), limits, target_sp=0.875) + if not cfg: + raise RuntimeError("Kein gültiges Bit-Timing innerhalb der Treibergrenzen gefunden.") + log.info(f"[link_up] computed bittiming: {cfg} (clock={clock})") -# Helfer fürs Formatieren -def make_speed_response(speed_kmh: int) -> bytes: - A = max(0, min(255, int(speed_kmh))) - return bytes([0x03, 0x41, PID_SPEED, A, 0x00, 0x00, 0x00, 0x00]) + # 2a) pyroute2: bittiming setzen + try: + ipr.link("set", index=idx, kind="can", info_data={"bittiming": cfg}) + log.info("[link_up] pyroute2: set bittiming -> OK") + except NetlinkError as e: + log.error(f"[link_up] pyroute2: set bittiming FAILED: code={e.code} msg={e}") + # wir probieren weiter mit CLI-Fallback -def make_rpm_response(rpm: int) -> bytes: - raw = max(0, int(rpm)) * 4 - A = (raw >> 8) & 0xFF - B = raw & 0xFF - return bytes([0x04, 0x41, PID_RPM, A, B, 0x00, 0x00, 0x00]) + # 2b) prüfen, ob Kernel bittiming akzeptiert hat + bt = _read_bittiming(iface) + log.info(f"[link_up] kernel bittiming after pyroute2: {bt}") + if not bt or int(bt.get("bitrate", 0)) <= 0: + # 2c) CLI-Fallback – GENAU wie dein funktionierender Handtest + rc, out, err = _cli(["ip", "link", "set", iface, "down"]) + log.info(f"[CLI] ip link set {iface} down (rc={rc})") + if out: log.info("[CLI STDOUT] " + out) + if err: log.info("[CLI STDERR] " + err) + + rc, out, err = _cli(["ip", "link", "set", iface, "type", "can", "bitrate", str(int(bitrate))]) + log.info(f"[CLI] ip link set {iface} type can bitrate {bitrate} (rc={rc})") + if out: log.info("[CLI STDOUT] " + out) + if err: log.info("[CLI STDERR] " + err) + + bt = _read_bittiming(iface) + log.info(f"[link_up] kernel bittiming after CLI: {bt}") + if not bt or int(bt.get("bitrate", 0)) <= 0: + raise RuntimeError("Bit-Timing wurde nicht übernommen (Kernel meldet kein bittiming).") + + # 3) UP + try: + ipr.link("set", index=idx, state="up") + log.info("[link_up] state up -> OK") + except NetlinkError as e: + raise RuntimeError(f"Netlink error bei '{iface}' (up): {e.code} {e}") from e + +def link_down(iface: str) -> None: + """Bringt das Interface sicher auf DOWN (idempotent).""" + with IPRoute() as ipr: + idx, _, _ = _link_info(ipr, iface) + try: + ipr.link("set", index=idx, state="down") + log.info("[link_down] state down -> OK") + except NetlinkError as e: + if e.code not in (0, 22): + raise RuntimeError(f"Netlink error bei '{iface}' (down): {e.code} {e}") from e diff --git a/app/gui.py b/app/gui.py index e2379d3..2a4a4ab 100644 --- a/app/gui.py +++ b/app/gui.py @@ -6,15 +6,16 @@ import time import tkinter as tk from tkinter import ttk, messagebox from collections import deque, defaultdict +import subprocess import can # nur für Trace-Reader from .config import load_settings, setup_logging, SETTINGS_PATH, APP_ROOT from .simulator import EcuState, DrivelineModel +from .obd2 import ObdResponder, make_speed_response, make_rpm_response from .can import ( - ObdResponder, make_speed_response, make_rpm_response, - list_can_ifaces, link_up, link_down, - have_cap_netadmin, link_state, link_kind + list_can_ifaces, link_up, link_down, link_state, link_kind, + have_cap_netadmin, need_caps_message ) @@ -215,6 +216,17 @@ def launch_gui(): # NEU: set_params aus Checkbox link_up(iface_var.get(), bitrate=br_var.get(), fd=False, set_params=set_params.get()) msg = f"{iface_var.get()} ist UP" + # nach erfolgreichem link_up(...) – in gui.py + try: + out = subprocess.check_output(["ip", "-details", "-json", "link", "show", iface_var.get()], text=True) + info = json.loads(out)[0] + bt = (info.get("linkinfo", {}) or {}).get("info_data", {}).get("bittiming") or {} + br = bt.get("bitrate"); sp = bt.get("sample-point") + if br: + messagebox.showinfo("CAN", f"{iface_var.get()} ist UP @ {br} bit/s (sample-point {sp})") + except Exception: + pass + if set_params.get(): msg += f" @ {br_var.get()} bit/s (falls vom Treiber unterstützt)" else: diff --git a/app/obd2.py b/app/obd2.py new file mode 100644 index 0000000..fd7c879 --- /dev/null +++ b/app/obd2.py @@ -0,0 +1,154 @@ +# app/obd2.py — OBD-II Responder (SocketCAN) + Payload-Helpers +from __future__ import annotations + +import logging +import threading +import time +from typing import Callable, Dict, Optional + +import can + +from .can import link_state # nur für "IF ist UP?"-Check + +# 11-bit OBD-II IDs (ISO 15765-4) +OBD_REQ_ID = 0x7DF +PID_SPEED = 0x0D +PID_RPM = 0x0C + +class ObdResponder: + """ + OBD-II Mode 01 PID-Responder (11-bit) über SocketCAN. + - eigener Polling-RX-Loop (kein Notifier -> keine Stacktraces beim Link DOWN) + - rebind(interface/resp_id) schließt Bus; Loop öffnet neu, sobald IF=UP + """ + def __init__( + self, + interface: str, + resp_id: int, + timeout_ms: int = 200, + logger: Optional[logging.Logger] = None, + ): + self.interface = interface + self.resp_id = resp_id + self.timeout_ms = timeout_ms + self.log = logger or logging.getLogger("obdcan") + + self.providers: Dict[int, Callable[[], bytes]] = {} + self._run = threading.Event(); self._run.set() + + self.bus: Optional[can.BusABC] = None + self._thread = threading.Thread(target=self._service_loop, name="OBD-SVC", daemon=True) + self._thread.start() + + # ---------- Lifecycle ---------- + def _open_bus(self) -> None: + self._close_bus() + self.bus = can.interface.Bus(channel=self.interface, interface="socketcan") + self.log.info("OBD responder started on %s (resp_id=0x%03X)", self.interface, self.resp_id) + + def _close_bus(self) -> None: + try: + if self.bus: + self.bus.shutdown() + except Exception: + pass + self.bus = None + + def stop(self) -> None: + self._run.clear() + try: + self._thread.join(timeout=1.0) + except RuntimeError: + pass + self._close_bus() + + def rebind(self, interface: Optional[str] = None, resp_id: Optional[int] = None) -> None: + if interface is not None: + self.interface = interface + if resp_id is not None: + self.resp_id = resp_id + self._close_bus() + self.log.info("Rebind requested: %s, resp=0x%03X", self.interface, self.resp_id) + + # ---------- API ---------- + def register_pid(self, pid: int, provider: Callable[[], bytes]) -> None: + self.providers[pid] = provider + + # ---------- RX/Service ---------- + def _service_loop(self) -> None: + backoff = 0.5 + while self._run.is_set(): + if self.bus is None: + # Öffnen nur, wenn Interface UP ist + if link_state(self.interface) == "UP": + try: + self._open_bus() + backoff = 0.5 + except Exception as e: + self.log.warning("Bus open failed: %s", e) + time.sleep(backoff); backoff = min(5.0, backoff * 1.7) + continue + else: + time.sleep(0.5); continue + + try: + msg = self.bus.recv(0.05) + if msg is not None: + self._handle(msg) + except (can.CanOperationError, OSError): + # IF ging down -> Bus schließen und später neu öffnen + self.log.info("CAN went DOWN — closing bus, will retry…") + self._close_bus() + time.sleep(0.5) + except Exception as e: + self.log.warning("CAN recv error: %s", e) + time.sleep(0.1) + + def _handle(self, msg: can.Message) -> None: + if msg.is_extended_id or msg.arbitration_id != OBD_REQ_ID: + return + data = bytes(msg.data) + if len(data) < 2: + return + + # tolerant: 02 01 ... oder 01 ... + if data[0] == 0x02 and len(data) >= 3: + mode, pid = data[1], data[2] + else: + mode, pid = data[0], (data[1] if len(data) >= 2 else None) + + if mode != 0x01 or pid is None: + return + + provider = self.providers.get(pid) + if not provider: + return + + try: + payload = provider() + if not isinstance(payload, (bytes, bytearray)) or len(payload) != 8: + return + out = can.Message( + arbitration_id=self.resp_id, + is_extended_id=False, + data=payload, + dlc=8 + ) + if self.bus: + self.bus.send(out) + except can.CanError: + self.log.warning("CAN send failed (bus off?)") + except Exception as e: + self.log.exception("Provider error: %s", e) + +# ------------------------- Payload-Helpers ---------------------------------- + +def make_speed_response(speed_kmh: int) -> bytes: + A = max(0, min(255, int(speed_kmh))) + return bytes([0x03, 0x41, PID_SPEED, A, 0x00, 0x00, 0x00, 0x00]) + +def make_rpm_response(rpm: int) -> bytes: + raw = max(0, int(rpm)) * 4 + A = (raw >> 8) & 0xFF + B = raw & 0xFF + return bytes([0x04, 0x41, PID_RPM, A, B, 0x00, 0x00, 0x00]) diff --git a/prepare_can.sh b/prepare_can.sh new file mode 100755 index 0000000..51b9889 --- /dev/null +++ b/prepare_can.sh @@ -0,0 +1,56 @@ +#!/usr/bin/env bash +set -euo pipefail + +# === OBD-II Simulator – Systemvorbereitung === +# Verwendung: +# sudo ./prepare_can.sh # einrichten +# sudo ./prepare_can.sh --undo # rückgängig machen + +IP_BIN="${IP_BIN:-$(command -v ip || true)}" +PY_BIN="${PY_BIN:-$(python3 -c 'import sys,os; print(os.path.realpath(sys.executable))' 2>/dev/null || true)}" + +if [[ "$#" -gt 0 && "$1" == "--undo" ]]; then + echo ">>> Entferne Capabilities von Python und ip …" + if [[ -n "$PY_BIN" && -x "$PY_BIN" ]]; then + sudo setcap -r "$PY_BIN" || true + getcap "$PY_BIN" || true + fi + if [[ -n "$IP_BIN" && -x "$IP_BIN" ]]; then + sudo setcap -r "$IP_BIN" || true + getcap "$IP_BIN" || true + fi + echo ">>> Undo abgeschlossen." + exit 0 +fi + +if [[ -z "$PY_BIN" || ! -x "$PY_BIN" ]]; then + echo "FEHLER: Konnte Python-Interpreter nicht ermitteln. PY_BIN=… setzen." + exit 1 +fi +if [[ -z "$IP_BIN" || ! -x "$IP_BIN" ]]; then + echo "FEHLER: 'ip' (iproute2) nicht gefunden – bitte installieren." + exit 1 +fi + +echo ">>> Python: $PY_BIN" +echo ">>> ip: $IP_BIN" + +echo ">>> Setze Capabilities auf Python und ip …" +sudo setcap cap_net_admin,cap_net_raw=eip "$PY_BIN" || true +sudo setcap cap_net_admin,cap_net_raw=ep "$IP_BIN" || true + +echo ">>> Prüfe Capabilities:" +getcap "$PY_BIN" || true +getcap "$IP_BIN" || true + +# Optional: vcan0 einrichten +if ! ip link show vcan0 &>/dev/null; then + echo ">>> Richte vcan0 ein (virtuelles CAN, nur zu Testzwecken)…" + sudo modprobe vcan || true + sudo ip link add dev vcan0 type vcan || true +fi +sudo ip link set vcan0 up || true +ip -details link show vcan0 | sed 's/^/ /' + +echo ">>> Vorbereitung abgeschlossen." +echo "Starte die App jetzt einfach normal (ohne sudo)."