Fixed premission-issues with additional script

This commit is contained in:
2025-08-27 19:05:45 +02:00
parent 8195e570b3
commit 4c41be706d
4 changed files with 436 additions and 219 deletions

View File

@@ -1,23 +1,26 @@
# can.py — SocketCAN OBD-II Responder + Link-Control
# app/can.py — SocketCAN Link-Control + Diagnostics (pyroute2) + Helpers
from __future__ import annotations
import logging
import threading
import time
import os
import sys
import subprocess
from typing import Callable, Dict, Optional, List
from pyroute2 import IPRoute, NetlinkError
import json
import can
OBD_REQ_ID = 0x7DF
PID_SPEED = 0x0D
PID_RPM = 0x0C
import json
import logging
import math
import os
import subprocess
import sys
from typing import List, Tuple
import shutil
from pyroute2 import IPRoute, NetlinkError
# Logger (wird von deinem config.setup_logging konfiguriert)
log = logging.getLogger("configapp")
# ------------------------- Capability-Helpers -------------------------------
CAP_NET_ADMIN_BIT = 12 # Linux CAP_NET_ADMIN
def have_cap_netadmin() -> bool:
"""Prüft, ob der aktuelle Prozess CAP_NET_ADMIN effektiv hat (oder root ist)."""
if os.geteuid() == 0:
return True
try:
@@ -33,31 +36,77 @@ def have_cap_netadmin() -> bool:
def need_caps_message() -> str:
exe = os.path.realpath(sys.executable)
return (
"Keine Berechtigung für 'ip link'.\n\n"
"Keine Berechtigung für Netlink-Änderungen.\n\n"
"Option A) Als root starten (sudo)\n"
"Option B) Capabilities auf das laufende Python setzen:\n"
f" sudo setcap cap_net_admin,cap_net_raw=eip \"{exe}\"\n"
f" getcap \"{exe}\"\n"
)
def _ip_json(iface: str) -> dict:
out = subprocess.check_output(["ip", "-details", "-json", "link", "show", iface], text=True)
return json.loads(out)[0]
def _read_bittiming(iface: str) -> dict:
try:
j = _ip_json(iface)
return ((j.get("linkinfo") or {}).get("info_data") or {}).get("bittiming") or {}
except Exception:
return {}
def _cli(cmd: list[str]) -> tuple[int, str, str]:
r = subprocess.run(cmd, capture_output=True, text=True)
return r.returncode, (r.stdout or "").strip(), (r.stderr or "").strip()
def ensure_capabilities() -> None:
"""prüft, ob Python und ip die nötigen Rechte haben; sonst RuntimeError."""
problems = []
if not have_cap_netadmin():
problems.append("• Dem laufenden Python fehlen CAP_NET_ADMIN / CAP_NET_RAW.")
ip_bin = shutil.which("ip")
if not ip_bin:
problems.append("'ip' (iproute2) nicht gefunden.")
else:
try:
out = subprocess.check_output(["getcap", ip_bin], text=True)
if "cap_net_admin" not in out:
problems.append(f"'{ip_bin}' hat keine cap_net_admin/cap_net_raw.")
except Exception:
problems.append("• Konnte Capabilities von 'ip' nicht prüfen.")
if problems:
raise RuntimeError(
"CAN Link-Operation nicht möglich. Bitte führe einmalig:\n"
" sudo prepare_can.sh\n\n"
"Probleme:\n" + "\n".join(problems)
)
# --------------------------- Interface-Helpers ------------------------------
def list_can_ifaces() -> List[str]:
"""Listet verfügbare CAN/vCAN-Interfaces via `ip -json link`."""
"""Listet can*/vcan* Interfaces auf (erst ip -json, dann /sys Fallback)."""
try:
out = subprocess.check_output(["ip", "-json", "link"], text=True)
import json
items = json.loads(out)
names = [i["ifname"] for i in items if i.get("link_type") in ("can", None) and (i["ifname"].startswith("can") or i["ifname"].startswith("vcan"))]
names = [
it["ifname"]
for it in items
if it.get("link_type") in ("can", None)
and (it["ifname"].startswith("can") or it["ifname"].startswith("vcan"))
]
return sorted(set(names))
except Exception:
# Fallback: /sys/class/net
try:
import os
names = [n for n in os.listdir("/sys/class/net") if n.startswith(("can", "vcan"))]
return sorted(set(names))
except Exception:
return []
def _link_info(ipr: IPRoute, iface: str):
def _ip_show_json(iface: str) -> dict:
out = subprocess.check_output(["ip", "-details", "-json", "link", "show", iface], text=True)
return json.loads(out)[0]
def _link_info(ipr: IPRoute, iface: str) -> Tuple[int, str, dict]:
idxs = ipr.link_lookup(ifname=iface)
if not idxs:
raise RuntimeError(f"Interface '{iface}' nicht gefunden")
@@ -72,6 +121,7 @@ def _link_info(ipr: IPRoute, iface: str):
return idxs[0], kind, info
def link_state(iface: str) -> str:
"""Gibt 'UP'/'DOWN'/… zurück."""
try:
with IPRoute() as ipr:
idx, _, info = _link_info(ipr, iface)
@@ -85,51 +135,8 @@ def link_state(iface: str) -> str:
pass
return "UNKNOWN"
def link_up(iface: str, bitrate: int = 500000, fd: bool = False, set_params: bool = True) -> None:
with IPRoute() as ipr:
idx, kind, _ = _link_info(ipr, iface)
# Wenn bereits UP: nichts tun.
try:
cur = ipr.get_links(idx)[0]
if (cur.get("state") or "").upper() == "UP":
return
except Exception:
pass
# 1) down (ignoriere "invalid argument" / already down)
try:
ipr.link("set", index=idx, state="down")
except NetlinkError as e:
if e.code not in (0, 22):
raise RuntimeError(f"Netlink error bei '{iface}' (down): {e.code} {e}") from e
# 2) optional: Parameter nur bei 'can' EINVAL ignorieren
if set_params and kind == "can":
try:
ipr.link("set", index=idx, kind="can", data={"bitrate": int(bitrate)})
except NetlinkError as e:
if e.code != 22: # alles außer EINVAL weiterreichen
raise RuntimeError(f"Netlink error bei '{iface}' (bitrate): {e.code} {e}") from e
# EINVAL -> Treiber mag Param-Change jetzt nicht -> einfach weitermachen
# 3) up (hier darf es notfalls knallen)
try:
ipr.link("set", index=idx, state="up")
except NetlinkError as e:
raise RuntimeError(f"Netlink error bei '{iface}' (up): {e.code} {e}") from e
def link_down(iface: str) -> None:
with IPRoute() as ipr:
idx, _, _ = _link_info(ipr, iface)
try:
ipr.link("set", index=idx, state="down")
except NetlinkError as e:
if e.code not in (0, 22): # ignore EINVAL/OK
raise RuntimeError(f"Netlink error bei '{iface}' (down): {e.code} {e}") from e
def link_kind(iface: str) -> str:
"""liefert nur das INFO_KIND (z.B. 'can', 'slcan', 'vcan', 'none')"""
"""liefert das INFO_KIND (z. B. 'can', 'slcan', 'vcan', 'none')."""
try:
with IPRoute() as ipr:
_, kind, _ = _link_info(ipr, iface)
@@ -137,167 +144,155 @@ def link_kind(iface: str) -> str:
except Exception:
return "unknown"
# ------------------------- Bit-Timing Berechnung ----------------------------
class ObdResponder:
def _extract_can_caps(j: dict):
"""holt clock & bittiming_const aus ip -details -json."""
li = (j.get("linkinfo") or {})
info = (li.get("info_data") or {})
btc = info.get("bittiming_const") or {}
clock = info.get("clock")
limits = {
"tseg1_min": (btc.get("tseg1") or {}).get("min", 2),
"tseg1_max": (btc.get("tseg1") or {}).get("max", 256),
"tseg2_min": (btc.get("tseg2") or {}).get("min", 2),
"tseg2_max": (btc.get("tseg2") or {}).get("max", 128),
"sjw_min" : (btc.get("sjw") or {}).get("min", 1),
"sjw_max" : (btc.get("sjw") or {}).get("max", 128),
"brp_min" : (btc.get("brp") or {}).get("min", 1),
"brp_max" : (btc.get("brp") or {}).get("max", 512),
"brp_inc" : (btc.get("brp_inc") or 1),
}
return clock, limits
def _compute_bittiming(clock_hz: int, bitrate: int, limits: dict, target_sp: float = 0.875):
"""
OBD-II Mode-01 PID-Responder über SocketCAN (11-bit).
Non-blocking, threadsicher, mit Rebind-Funktion und robustem Reopen,
falls das Interface DOWN ist.
Sucht brp, tseg1(=prop_seg+phase_seg1), tseg2, sjw s.d.
bitrate ~= clock / (brp * (1 + tseg1 + tseg2))
sample_point ~= (1 + tseg1) / (1 + tseg1 + tseg2)
Liefert dict für info_data['bittiming'] (oder None).
"""
def __init__(
self,
interface: str,
resp_id: int,
timeout_ms: int = 200,
logger: Optional[logging.Logger] = None,
):
self.interface = interface
self.resp_id = resp_id
self.timeout_ms = timeout_ms
self.log = logger or logging.getLogger("obdcan")
brp_min, brp_max, brp_inc = limits["brp_min"], limits["brp_max"], max(1, limits["brp_inc"])
tseg1_min, tseg1_max = limits["tseg1_min"], limits["tseg1_max"]
tseg2_min, tseg2_max = limits["tseg2_min"], limits["tseg2_max"]
sjw_max = limits["sjw_max"]
# PID-Provider: pid -> callable() -> 8-Byte-Payload
self.providers: Dict[int, Callable[[], bytes]] = {}
best = None
best_err = float("inf"); best_sp_err = float("inf")
# Laufzustand / CAN-Ressourcen
self._run = threading.Event()
self._run.set()
self.bus: Optional[can.BusABC] = None
self.reader: Optional[can.BufferedReader] = None
self.notifier: Optional[can.Notifier] = None
# Service-Thread, der bei IF=UP den Bus öffnet und RX abwickelt
self._thread = threading.Thread(
target=self._service_loop, name="OBD-SVC", daemon=True
)
self._thread.start()
# ---------- Lifecycle ----------
def _open_bus(self) -> None:
# ggf. alte Ressourcen schließen, dann neu öffnen
self._close_bus()
self._close_bus()
self.bus = can.interface.Bus(channel=self.interface, interface="socketcan")
self.log.info("OBD responder started on %s (resp_id=0x%03X)", self.interface, self.resp_id)
self.log.info(
"OBD responder started on %s (resp_id=0x%03X)",
self.interface, self.resp_id
)
def _close_bus(self) -> None:
try:
if self.notifier:
self.notifier.stop()
except Exception:
pass
try:
if self.bus:
self.bus.shutdown()
except Exception:
pass
self.bus = None
def stop(self) -> None:
self._run.clear()
try:
self._thread.join(timeout=1.0)
except RuntimeError:
pass
self._close_bus()
def rebind(self, interface: Optional[str] = None, resp_id: Optional[int] = None) -> None:
if interface is not None:
self.interface = interface
if resp_id is not None:
self.resp_id = resp_id
# Bus schließen; Service-Loop öffnet ihn wieder, sobald IF=UP ist
self._close_bus()
self.log.info("Rebind requested: %s, resp=0x%03X", self.interface, self.resp_id)
# ---------- Öffentliche API ----------
def register_pid(self, pid: int, provider: Callable[[], bytes]) -> None:
self.providers[pid] = provider
# ---------- Service-Loop (robust gegen 'Network is down') ----------
def _service_loop(self) -> None:
backoff = 0.5
while self._run.is_set():
# Bus öffnen, wenn IF up ist
if self.bus is None:
if link_state(self.interface) == "UP":
try:
self._open_bus()
backoff = 0.5
except Exception as e:
self.log.warning("Bus open failed: %s", e)
time.sleep(backoff)
backoff = min(5.0, backoff * 1.7)
continue
else:
time.sleep(0.5)
for brp in range(brp_min, brp_max + 1, brp_inc):
tq_per_bit_f = clock_hz / (brp * float(bitrate))
tq_min = max(1 + tseg1_min + tseg2_min, int(tq_per_bit_f) - 2)
tq_max = min(1 + tseg1_max + tseg2_max, int(tq_per_bit_f) + 2)
for tq in range(tq_min, tq_max + 1):
tseg1_f = target_sp * tq - 1.0
for tseg1 in (math.floor(tseg1_f), math.ceil(tseg1_f)):
tseg1 = int(tseg1)
if not (tseg1_min <= tseg1 <= tseg1_max):
continue
tseg2 = (tq - 1) - tseg1
if not (tseg2_min <= tseg2 <= tseg2_max):
continue
real_bitrate = clock_hz / (brp * tq)
err = abs(real_bitrate - bitrate)
sp = (1.0 + tseg1) / tq
sp_err = abs(sp - target_sp)
if (err < best_err - 1e-6) or (abs(err - best_err) < 1e-6 and sp_err < best_sp_err):
best_err, best_sp_err = err, sp_err
phase_seg1 = max(1, min(tseg1 - 1, tseg1 // 2))
prop_seg = tseg1 - phase_seg1
sjw = min(sjw_max, max(1, min(tseg2, 16)))
best = {
"brp": int(brp),
"prop_seg": int(prop_seg),
"phase_seg1": int(phase_seg1),
"phase_seg2": int(tseg2),
"sjw": int(sjw),
}
return best
# RX: eigene PollSchleife statt Notifier
try:
msg = self.bus.recv(0.05) # blocking short timeout
if msg is not None:
self._handle(msg)
except (can.CanOperationError, OSError):
# IF ging down -> Bus schließen und später neu öffnen
self.log.info("CAN went DOWN — closing bus, will retry…")
self._close_bus()
time.sleep(0.5)
except Exception as e:
self.log.warning("CAN recv error: %s", e)
time.sleep(0.1)
# --------------------------- Link Up / Down --------------------------------
# ---------- Message-Handler ----------
def _handle(self, msg: can.Message) -> None:
if msg.is_extended_id or msg.arbitration_id != OBD_REQ_ID:
return
data = bytes(msg.data)
if len(data) < 3:
return
# tolerant: 02 01 <pid> ... oder 01 <pid> ...
if data[0] == 0x02 and len(data) >= 3:
mode, pid = data[1], data[2]
else:
mode, pid = data[0], (data[1] if len(data) >= 2 else None)
if mode != 0x01 or pid is None:
return
provider = self.providers.get(pid)
if not provider:
def link_up(iface: str, bitrate: int = 500000, fd: bool = False, set_params: bool = True) -> None:
ensure_capabilities()
# Snapshot vorher
try:
j0 = _ip_json(iface)
except Exception as e:
raise RuntimeError(f"Interface '{iface}' nicht lesbar: {e}")
kind = (j0.get("linkinfo") or {}).get("info_kind", "unknown")
state = j0.get("operstate", "UNKNOWN").upper()
log.info(f"[link_up] iface={iface} kind={kind} state={state} target_bitrate={bitrate}")
with IPRoute() as ipr:
idx, _, _ = _link_info(ipr, iface)
if state == "UP":
log.info("[link_up] already UP -> nothing to do")
return
# 1) DOWN
try:
payload = provider()
if not isinstance(payload, (bytes, bytearray)) or len(payload) != 8:
return
out = can.Message(
arbitration_id=self.resp_id,
is_extended_id=False,
data=payload,
dlc=8
)
if self.bus:
self.bus.send(out)
except can.CanError:
self.log.warning("CAN send failed (bus off?)")
except Exception as e:
self.log.exception("Provider error: %s", e)
ipr.link("set", index=idx, state="down")
log.info("[link_up] state down -> OK")
except NetlinkError as e:
if e.code not in (0, 22):
raise RuntimeError(f"Netlink error bei '{iface}' (down): {e.code} {e}") from e
# --- Ende Patch ObdResponder -----------------------------------------------
# 2) bittiming setzen (nur 'can')
if set_params and kind == "can":
# Clock & Limits holen
caps = _ip_json(iface)
clock, limits = _extract_can_caps(caps)
if not clock:
raise RuntimeError("CAN-Clock nicht gefunden kann Bit-Timing nicht berechnen.")
cfg = _compute_bittiming(clock, int(bitrate), limits, target_sp=0.875)
if not cfg:
raise RuntimeError("Kein gültiges Bit-Timing innerhalb der Treibergrenzen gefunden.")
log.info(f"[link_up] computed bittiming: {cfg} (clock={clock})")
# Helfer fürs Formatieren
def make_speed_response(speed_kmh: int) -> bytes:
A = max(0, min(255, int(speed_kmh)))
return bytes([0x03, 0x41, PID_SPEED, A, 0x00, 0x00, 0x00, 0x00])
# 2a) pyroute2: bittiming setzen
try:
ipr.link("set", index=idx, kind="can", info_data={"bittiming": cfg})
log.info("[link_up] pyroute2: set bittiming -> OK")
except NetlinkError as e:
log.error(f"[link_up] pyroute2: set bittiming FAILED: code={e.code} msg={e}")
# wir probieren weiter mit CLI-Fallback
def make_rpm_response(rpm: int) -> bytes:
raw = max(0, int(rpm)) * 4
A = (raw >> 8) & 0xFF
B = raw & 0xFF
return bytes([0x04, 0x41, PID_RPM, A, B, 0x00, 0x00, 0x00])
# 2b) prüfen, ob Kernel bittiming akzeptiert hat
bt = _read_bittiming(iface)
log.info(f"[link_up] kernel bittiming after pyroute2: {bt}")
if not bt or int(bt.get("bitrate", 0)) <= 0:
# 2c) CLI-Fallback GENAU wie dein funktionierender Handtest
rc, out, err = _cli(["ip", "link", "set", iface, "down"])
log.info(f"[CLI] ip link set {iface} down (rc={rc})")
if out: log.info("[CLI STDOUT] " + out)
if err: log.info("[CLI STDERR] " + err)
rc, out, err = _cli(["ip", "link", "set", iface, "type", "can", "bitrate", str(int(bitrate))])
log.info(f"[CLI] ip link set {iface} type can bitrate {bitrate} (rc={rc})")
if out: log.info("[CLI STDOUT] " + out)
if err: log.info("[CLI STDERR] " + err)
bt = _read_bittiming(iface)
log.info(f"[link_up] kernel bittiming after CLI: {bt}")
if not bt or int(bt.get("bitrate", 0)) <= 0:
raise RuntimeError("Bit-Timing wurde nicht übernommen (Kernel meldet kein bittiming).")
# 3) UP
try:
ipr.link("set", index=idx, state="up")
log.info("[link_up] state up -> OK")
except NetlinkError as e:
raise RuntimeError(f"Netlink error bei '{iface}' (up): {e.code} {e}") from e
def link_down(iface: str) -> None:
"""Bringt das Interface sicher auf DOWN (idempotent)."""
with IPRoute() as ipr:
idx, _, _ = _link_info(ipr, iface)
try:
ipr.link("set", index=idx, state="down")
log.info("[link_down] state down -> OK")
except NetlinkError as e:
if e.code not in (0, 22):
raise RuntimeError(f"Netlink error bei '{iface}' (down): {e.code} {e}") from e

View File

@@ -6,15 +6,16 @@ import time
import tkinter as tk
from tkinter import ttk, messagebox
from collections import deque, defaultdict
import subprocess
import can # nur für Trace-Reader
from .config import load_settings, setup_logging, SETTINGS_PATH, APP_ROOT
from .simulator import EcuState, DrivelineModel
from .obd2 import ObdResponder, make_speed_response, make_rpm_response
from .can import (
ObdResponder, make_speed_response, make_rpm_response,
list_can_ifaces, link_up, link_down,
have_cap_netadmin, link_state, link_kind
list_can_ifaces, link_up, link_down, link_state, link_kind,
have_cap_netadmin, need_caps_message
)
@@ -215,6 +216,17 @@ def launch_gui():
# NEU: set_params aus Checkbox
link_up(iface_var.get(), bitrate=br_var.get(), fd=False, set_params=set_params.get())
msg = f"{iface_var.get()} ist UP"
# nach erfolgreichem link_up(...) in gui.py
try:
out = subprocess.check_output(["ip", "-details", "-json", "link", "show", iface_var.get()], text=True)
info = json.loads(out)[0]
bt = (info.get("linkinfo", {}) or {}).get("info_data", {}).get("bittiming") or {}
br = bt.get("bitrate"); sp = bt.get("sample-point")
if br:
messagebox.showinfo("CAN", f"{iface_var.get()} ist UP @ {br} bit/s (sample-point {sp})")
except Exception:
pass
if set_params.get():
msg += f" @ {br_var.get()} bit/s (falls vom Treiber unterstützt)"
else:

154
app/obd2.py Normal file
View File

@@ -0,0 +1,154 @@
# app/obd2.py — OBD-II Responder (SocketCAN) + Payload-Helpers
from __future__ import annotations
import logging
import threading
import time
from typing import Callable, Dict, Optional
import can
from .can import link_state # nur für "IF ist UP?"-Check
# 11-bit OBD-II IDs (ISO 15765-4)
OBD_REQ_ID = 0x7DF
PID_SPEED = 0x0D
PID_RPM = 0x0C
class ObdResponder:
"""
OBD-II Mode 01 PID-Responder (11-bit) über SocketCAN.
- eigener Polling-RX-Loop (kein Notifier -> keine Stacktraces beim Link DOWN)
- rebind(interface/resp_id) schließt Bus; Loop öffnet neu, sobald IF=UP
"""
def __init__(
self,
interface: str,
resp_id: int,
timeout_ms: int = 200,
logger: Optional[logging.Logger] = None,
):
self.interface = interface
self.resp_id = resp_id
self.timeout_ms = timeout_ms
self.log = logger or logging.getLogger("obdcan")
self.providers: Dict[int, Callable[[], bytes]] = {}
self._run = threading.Event(); self._run.set()
self.bus: Optional[can.BusABC] = None
self._thread = threading.Thread(target=self._service_loop, name="OBD-SVC", daemon=True)
self._thread.start()
# ---------- Lifecycle ----------
def _open_bus(self) -> None:
self._close_bus()
self.bus = can.interface.Bus(channel=self.interface, interface="socketcan")
self.log.info("OBD responder started on %s (resp_id=0x%03X)", self.interface, self.resp_id)
def _close_bus(self) -> None:
try:
if self.bus:
self.bus.shutdown()
except Exception:
pass
self.bus = None
def stop(self) -> None:
self._run.clear()
try:
self._thread.join(timeout=1.0)
except RuntimeError:
pass
self._close_bus()
def rebind(self, interface: Optional[str] = None, resp_id: Optional[int] = None) -> None:
if interface is not None:
self.interface = interface
if resp_id is not None:
self.resp_id = resp_id
self._close_bus()
self.log.info("Rebind requested: %s, resp=0x%03X", self.interface, self.resp_id)
# ---------- API ----------
def register_pid(self, pid: int, provider: Callable[[], bytes]) -> None:
self.providers[pid] = provider
# ---------- RX/Service ----------
def _service_loop(self) -> None:
backoff = 0.5
while self._run.is_set():
if self.bus is None:
# Öffnen nur, wenn Interface UP ist
if link_state(self.interface) == "UP":
try:
self._open_bus()
backoff = 0.5
except Exception as e:
self.log.warning("Bus open failed: %s", e)
time.sleep(backoff); backoff = min(5.0, backoff * 1.7)
continue
else:
time.sleep(0.5); continue
try:
msg = self.bus.recv(0.05)
if msg is not None:
self._handle(msg)
except (can.CanOperationError, OSError):
# IF ging down -> Bus schließen und später neu öffnen
self.log.info("CAN went DOWN — closing bus, will retry…")
self._close_bus()
time.sleep(0.5)
except Exception as e:
self.log.warning("CAN recv error: %s", e)
time.sleep(0.1)
def _handle(self, msg: can.Message) -> None:
if msg.is_extended_id or msg.arbitration_id != OBD_REQ_ID:
return
data = bytes(msg.data)
if len(data) < 2:
return
# tolerant: 02 01 <pid> ... oder 01 <pid> ...
if data[0] == 0x02 and len(data) >= 3:
mode, pid = data[1], data[2]
else:
mode, pid = data[0], (data[1] if len(data) >= 2 else None)
if mode != 0x01 or pid is None:
return
provider = self.providers.get(pid)
if not provider:
return
try:
payload = provider()
if not isinstance(payload, (bytes, bytearray)) or len(payload) != 8:
return
out = can.Message(
arbitration_id=self.resp_id,
is_extended_id=False,
data=payload,
dlc=8
)
if self.bus:
self.bus.send(out)
except can.CanError:
self.log.warning("CAN send failed (bus off?)")
except Exception as e:
self.log.exception("Provider error: %s", e)
# ------------------------- Payload-Helpers ----------------------------------
def make_speed_response(speed_kmh: int) -> bytes:
A = max(0, min(255, int(speed_kmh)))
return bytes([0x03, 0x41, PID_SPEED, A, 0x00, 0x00, 0x00, 0x00])
def make_rpm_response(rpm: int) -> bytes:
raw = max(0, int(rpm)) * 4
A = (raw >> 8) & 0xFF
B = raw & 0xFF
return bytes([0x04, 0x41, PID_RPM, A, B, 0x00, 0x00, 0x00])