# app/can.py — SocketCAN link control + diagnostics (pyroute2) + helpers
from __future__ import annotations

import json
import logging
import math
import os
import shutil
import subprocess
import sys
from typing import List, Tuple

from pyroute2 import IPRoute, NetlinkError

# Logger (configured by the application's config.setup_logging)
log = logging.getLogger("configapp")


# ------------------------- Capability helpers -------------------------------

CAP_NET_ADMIN_BIT = 12  # Linux CAP_NET_ADMIN


def have_cap_netadmin() -> bool:
    """Return True if the process is root or has CAP_NET_ADMIN effective.

    The effective capability mask is taken from the ``CapEff:`` line of
    ``/proc/self/status``. If that file cannot be read or parsed, the
    function returns False.
    """
    if os.geteuid() == 0:
        return True
    try:
        with open("/proc/self/status", "r", encoding="utf-8") as f:
            for line in f:
                if line.startswith("CapEff:"):
                    mask = int(line.split()[1], 16)
                    return bool(mask & (1 << CAP_NET_ADMIN_BIT))
    except (OSError, ValueError, IndexError):
        # Unreadable or malformed status file: treat as "no capability".
        pass
    return False


def need_caps_message() -> str:
    """Build the user-facing hint explaining how to obtain the needed rights.

    The message names the resolved path of the running interpreter
    (``sys.executable``) in the suggested ``setcap`` / ``getcap`` commands.
    """
    exe = os.path.realpath(sys.executable)
    return (
        "Keine Berechtigung für Netlink-Änderungen.\n\n"
        "Option A) Als root starten (sudo)\n"
        "Option B) Capabilities auf das laufende Python setzen:\n"
        f" sudo setcap cap_net_admin,cap_net_raw=eip \"{exe}\"\n"
        f" getcap \"{exe}\"\n"
    )


def _ip_json(iface: str) -> dict:
    """Return the first object of ``ip -details -json link show <iface>``.

    Raises whatever ``subprocess.check_output`` / ``json.loads`` raise when
    the command fails or its output is not valid JSON.
    """
    out = subprocess.check_output(
        ["ip", "-details", "-json", "link", "show", iface], text=True
    )
    return json.loads(out)[0]


def _read_bittiming(iface: str) -> dict:
    """Return ``linkinfo.info_data.bittiming`` for *iface*, or ``{}``.

    Best effort: any error while querying or parsing yields an empty dict.
    """
    try:
        j = _ip_json(iface)
        return ((j.get("linkinfo") or {}).get("info_data") or {}).get("bittiming") or {}
    except Exception:
        return {}


def _cli(cmd: list[str]) -> tuple[int, str, str]:
    """Run *cmd* and return ``(returncode, stdout, stderr)`` with both streams stripped."""
    r = subprocess.run(cmd, capture_output=True, text=True, check=False)
    return r.returncode, (r.stdout or "").strip(), (r.stderr or "").strip()


def _cli_logged(cmd: list[str]) -> int:
    """Run *cmd* via ``_cli``, log the command line, rc and any output; return rc."""
    rc, out, err = _cli(cmd)
    log.info("[CLI] %s (rc=%s)", " ".join(cmd), rc)
    if out:
        log.info("[CLI STDOUT] %s", out)
    if err:
        log.info("[CLI STDERR] %s", err)
    return rc


def ensure_capabilities() -> None:
    """Verify that Python and the ``ip`` tool have the rights for link changes.

    Checks performed:
      * the running process is root or has CAP_NET_ADMIN (``have_cap_netadmin``);
      * an ``ip`` binary is on PATH;
      * when NOT running as root, ``getcap`` reports ``cap_net_admin`` on that
        binary.

    Raises:
        RuntimeError: if any check fails; the message lists every problem.
    """
    problems = []
    if not have_cap_netadmin():
        problems.append("• Dem laufenden Python fehlen CAP_NET_ADMIN / CAP_NET_RAW.")

    ip_bin = shutil.which("ip")
    if not ip_bin:
        problems.append("• 'ip' (iproute2) nicht gefunden.")
    elif os.geteuid() != 0:
        # File capabilities on 'ip' only matter for unprivileged users: a child
        # started by root is already fully privileged, so demanding them there
        # would reject a perfectly working setup (e.g. when run via sudo).
        try:
            out = subprocess.check_output(["getcap", ip_bin], text=True)
        except (OSError, subprocess.SubprocessError, ValueError):
            problems.append("• Konnte Capabilities von 'ip' nicht prüfen.")
        else:
            if "cap_net_admin" not in out:
                problems.append(f"• '{ip_bin}' hat keine cap_net_admin/cap_net_raw.")

    if problems:
        raise RuntimeError(
            "CAN Link-Operation nicht möglich. Bitte führe einmalig:\n"
            " sudo prepare_can.sh\n\n"
            "Probleme:\n" + "\n".join(problems)
        )


# --------------------------- Interface helpers ------------------------------

def list_can_ifaces() -> List[str]:
    """List interfaces whose name starts with ``can`` or ``vcan``, sorted.

    First tries ``ip -json link`` (keeping entries whose ``link_type`` is
    ``"can"`` or absent); if that fails for any reason, falls back to the
    names in ``/sys/class/net``. Returns ``[]`` if both approaches fail.
    """
    try:
        out = subprocess.check_output(["ip", "-json", "link"], text=True)
        names = {
            it["ifname"]
            for it in json.loads(out)
            if it.get("link_type") in ("can", None)
            and it["ifname"].startswith(("can", "vcan"))
        }
        return sorted(names)
    except Exception:
        # Deliberately broad: any failure of the primary path triggers the fallback.
        try:
            return sorted(
                {n for n in os.listdir("/sys/class/net") if n.startswith(("can", "vcan"))}
            )
        except OSError:
            return []


def _ip_show_json(iface: str) -> dict:
    """Alias of ``_ip_json`` (same command, same result)."""
    return _ip_json(iface)


def _link_info(ipr: IPRoute, iface: str) -> Tuple[int, str, dict]:
    """Look up *iface* via netlink.

    Returns:
        ``(index, kind, info)`` where *kind* is the ``IFLA_INFO_KIND`` found
        inside ``IFLA_LINKINFO`` (``"none"`` if absent) and *info* is the raw
        link message returned by ``ipr.get_links``.

    Raises:
        RuntimeError: if no interface with that name exists.
    """
    idxs = ipr.link_lookup(ifname=iface)
    if not idxs:
        raise RuntimeError(f"Interface '{iface}' nicht gefunden")
    info = ipr.get_links(idxs[0])[0]
    kind = "none"
    for k, v in info.get("attrs", []):
        if k == "IFLA_LINKINFO":
            for kk, vv in v.get("attrs", []):
                if kk == "IFLA_INFO_KIND":
                    kind = str(vv)
                    break
    return idxs[0], kind, info


def link_state(iface: str) -> str:
    """Return the upper-cased link state of *iface*, or ``"UNKNOWN"``.

    Uses the message's ``state`` field when it is a string, otherwise the
    ``IFLA_OPERSTATE`` attribute. Any error yields ``"UNKNOWN"``.
    """
    try:
        with IPRoute() as ipr:
            _, _, info = _link_info(ipr, iface)
            st = info.get("state")
            if isinstance(st, str):
                return st.upper()
            for k, v in info.get("attrs", []):
                if k == "IFLA_OPERSTATE":
                    return str(v).upper()
    except Exception:
        # Best-effort diagnostic helper: never raise to the caller.
        pass
    return "UNKNOWN"


def link_kind(iface: str) -> str:
    """Return the INFO_KIND of *iface* (e.g. 'can', 'slcan', 'vcan', 'none').

    Returns ``"unknown"`` if the lookup fails for any reason.
    """
    try:
        with IPRoute() as ipr:
            _, kind, _ = _link_info(ipr, iface)
            return kind
    except Exception:
        return "unknown"


# ------------------------- Bit-timing computation ---------------------------

def _extract_can_caps(j: dict) -> Tuple[int | None, dict]:
    """Extract clock and bit-timing limits from ``ip -details -json`` output.

    Returns:
        ``(clock, limits)``. *clock* is ``linkinfo.info_data.clock`` (may be
        None). *limits* holds min/max for tseg1, tseg2, sjw and brp plus
        ``brp_inc``, read from ``bittiming_const``; values that are missing
        are replaced by the fallback defaults spelled out below.
    """
    info = (j.get("linkinfo") or {}).get("info_data") or {}
    btc = info.get("bittiming_const") or {}
    clock = info.get("clock")

    def _limit(name: str, bound: str, default: int) -> int:
        return (btc.get(name) or {}).get(bound, default)

    limits = {
        "tseg1_min": _limit("tseg1", "min", 2),
        "tseg1_max": _limit("tseg1", "max", 256),
        "tseg2_min": _limit("tseg2", "min", 2),
        "tseg2_max": _limit("tseg2", "max", 128),
        "sjw_min": _limit("sjw", "min", 1),
        "sjw_max": _limit("sjw", "max", 128),
        "brp_min": _limit("brp", "min", 1),
        "brp_max": _limit("brp", "max", 512),
        "brp_inc": (btc.get("brp_inc") or 1),
    }
    return clock, limits


def _compute_bittiming(clock_hz: int, bitrate: int, limits: dict, target_sp: float = 0.875):
    """Search brp, tseg1 (= prop_seg + phase_seg1), tseg2 and sjw such that

        bitrate      ~= clock / (brp * (1 + tseg1 + tseg2))
        sample_point ~= (1 + tseg1) / (1 + tseg1 + tseg2)

    Candidates are ranked by absolute bitrate error first and by distance to
    *target_sp* second.

    Args:
        clock_hz: CAN controller clock in Hz.
        bitrate: requested bitrate in bit/s.
        limits: dict as produced by ``_extract_can_caps``.
        target_sp: desired sample point as a fraction of the bit time.

    Returns:
        A dict for ``info_data['bittiming']`` with the keys ``tq``, ``brp``,
        ``prop_seg``, ``phase_seg1``, ``phase_seg2`` and ``sjw``, or None if
        no combination fits the limits or clock/bitrate are not positive.
    """
    # Non-positive inputs would divide by zero below; report "no solution".
    if not clock_hz or not bitrate or clock_hz < 0 or bitrate < 0:
        return None

    # A brp of 0 is meaningless and would divide by zero, hence the clamp.
    brp_min = max(1, limits["brp_min"])
    brp_max = limits["brp_max"]
    brp_inc = max(1, limits["brp_inc"])
    tseg1_min, tseg1_max = limits["tseg1_min"], limits["tseg1_max"]
    tseg2_min, tseg2_max = limits["tseg2_min"], limits["tseg2_max"]
    sjw_max = limits["sjw_max"]

    best = None
    best_err = float("inf")
    best_sp_err = float("inf")

    for brp in range(brp_min, brp_max + 1, brp_inc):
        # Ideal (fractional) number of time quanta per bit for this prescaler;
        # only integers within +/-2 of it are examined.
        tq_per_bit_f = clock_hz / (brp * float(bitrate))
        tq_min = max(1 + tseg1_min + tseg2_min, int(tq_per_bit_f) - 2)
        tq_max = min(1 + tseg1_max + tseg2_max, int(tq_per_bit_f) + 2)

        for tq in range(tq_min, tq_max + 1):
            tseg1_f = target_sp * tq - 1.0
            for tseg1 in (math.floor(tseg1_f), math.ceil(tseg1_f)):
                tseg1 = int(tseg1)
                if not (tseg1_min <= tseg1 <= tseg1_max):
                    continue
                tseg2 = (tq - 1) - tseg1
                if not (tseg2_min <= tseg2 <= tseg2_max):
                    continue

                real_bitrate = clock_hz / (brp * tq)
                err = abs(real_bitrate - bitrate)
                sp = (1.0 + tseg1) / tq
                sp_err = abs(sp - target_sp)

                better = err < best_err - 1e-6
                tie_but_closer_sp = abs(err - best_err) < 1e-6 and sp_err < best_sp_err
                if not (better or tie_but_closer_sp):
                    continue

                best_err, best_sp_err = err, sp_err
                phase_seg1 = max(1, min(tseg1 - 1, tseg1 // 2))
                prop_seg = tseg1 - phase_seg1
                sjw = min(sjw_max, max(1, min(tseg2, 16)))
                best = {
                    # Time quantum in ns. The kernel's explicit-timing path
                    # requires tq (with bitrate == 0) and derives brp from it;
                    # a request carrying only brp is rejected as invalid.
                    "tq": int(round(brp * 1_000_000_000 / clock_hz)),
                    "brp": int(brp),
                    "prop_seg": int(prop_seg),
                    "phase_seg1": int(phase_seg1),
                    "phase_seg2": int(tseg2),
                    "sjw": int(sjw),
                }
    return best


# --------------------------- Link up / down ---------------------------------

def _set_link_down(ipr: IPRoute, idx: int, iface: str, tag: str) -> None:
    """Set link *idx* DOWN via netlink, logging success under ``[tag]``.

    Netlink error codes 0 and 22 are ignored; any other code is re-raised as
    RuntimeError.
    NOTE(review): 22 is presumably EINVAL from drivers that reject the
    request in the current state — confirm why it is tolerated.
    """
    try:
        ipr.link("set", index=idx, state="down")
        log.info("[%s] state down -> OK", tag)
    except NetlinkError as e:
        if e.code not in (0, 22):
            raise RuntimeError(f"Netlink error bei '{iface}' (down): {e.code} {e}") from e


def link_up(iface: str, bitrate: int = 500000, fd: bool = False, set_params: bool = True) -> None:
    """Bring *iface* UP, configuring the CAN bit timing first if requested.

    Steps: check capabilities, snapshot the interface via ``ip``, return
    early if it already reports operstate UP, set it DOWN, (for kind 'can'
    and ``set_params``) compute and apply the bit timing via pyroute2 with a
    fallback to ``ip link set <iface> type can bitrate <n>``, then set it UP.

    Args:
        iface: interface name.
        bitrate: target bitrate in bit/s.
        fd: currently unused by this function.
        set_params: if False, the bit-timing step is skipped.

    Raises:
        RuntimeError: missing capabilities, unreadable/unknown interface,
            missing clock, no valid bit timing, bit timing not accepted by
            the kernel, or a netlink error on down/up.
    """
    ensure_capabilities()

    # Snapshot before touching anything.
    try:
        j0 = _ip_json(iface)
    except Exception as e:
        raise RuntimeError(f"Interface '{iface}' nicht lesbar: {e}") from e

    kind = (j0.get("linkinfo") or {}).get("info_kind", "unknown")
    # operstate may be missing or null in the JSON; never call .upper() on None.
    state = str(j0.get("operstate") or "UNKNOWN").upper()
    log.info(
        "[link_up] iface=%s kind=%s state=%s target_bitrate=%s",
        iface, kind, state, bitrate,
    )

    with IPRoute() as ipr:
        idx, _, _ = _link_info(ipr, iface)

        if state == "UP":
            log.info("[link_up] already UP -> nothing to do")
            return

        # 1) DOWN
        _set_link_down(ipr, idx, iface, "link_up")

        # 2) Set bit timing (only for kind 'can').
        if set_params and kind == "can":
            # Fetch clock and driver limits.
            caps = _ip_json(iface)
            clock, limits = _extract_can_caps(caps)
            if not clock:
                raise RuntimeError("CAN-Clock nicht gefunden – kann Bit-Timing nicht berechnen.")

            cfg = _compute_bittiming(clock, int(bitrate), limits, target_sp=0.875)
            if not cfg:
                raise RuntimeError("Kein gültiges Bit-Timing innerhalb der Treibergrenzen gefunden.")
            log.info("[link_up] computed bittiming: %s (clock=%s)", cfg, clock)

            # 2a) pyroute2: apply bit timing.
            try:
                ipr.link("set", index=idx, kind="can", info_data={"bittiming": cfg})
                log.info("[link_up] pyroute2: set bittiming -> OK")
            except NetlinkError as e:
                # Not fatal: the CLI fallback below gets a chance.
                log.error("[link_up] pyroute2: set bittiming FAILED: code=%s msg=%s", e.code, e)

            # 2b) Check whether the kernel actually accepted a bit timing.
            bt = _read_bittiming(iface)
            log.info("[link_up] kernel bittiming after pyroute2: %s", bt)

            if not bt or int(bt.get("bitrate", 0)) <= 0:
                # 2c) CLI fallback: let the kernel compute the timing from the bitrate.
                _cli_logged(["ip", "link", "set", iface, "down"])
                _cli_logged(
                    ["ip", "link", "set", iface, "type", "can", "bitrate", str(int(bitrate))]
                )

                bt = _read_bittiming(iface)
                log.info("[link_up] kernel bittiming after CLI: %s", bt)
                if not bt or int(bt.get("bitrate", 0)) <= 0:
                    raise RuntimeError(
                        "Bit-Timing wurde nicht übernommen (Kernel meldet kein bittiming)."
                    )

        # 3) UP
        try:
            ipr.link("set", index=idx, state="up")
            log.info("[link_up] state up -> OK")
        except NetlinkError as e:
            raise RuntimeError(f"Netlink error bei '{iface}' (up): {e.code} {e}") from e


def link_down(iface: str) -> None:
    """Set *iface* DOWN via netlink.

    Netlink error codes 0 and 22 are tolerated.

    Raises:
        RuntimeError: if the interface does not exist or netlink reports any
            other error code.
    """
    with IPRoute() as ipr:
        idx, _, _ = _link_info(ipr, iface)
        _set_link_down(ipr, idx, iface, "link_down")