diff --git a/Reverse-Engineering CAN-Bus/HOW-TO-REVERSE.md b/Reverse-Engineering CAN-Bus/HOW-TO-REVERSE.md
new file mode 100644
index 0000000..a1e52b5
--- /dev/null
+++ b/Reverse-Engineering CAN-Bus/HOW-TO-REVERSE.md
@@ -0,0 +1,90 @@
+# HOW-TO-REVERSE – A practical guide to CAN reverse engineering
+
+This how-to is your toolbox for distilling real signals out of raw frames. It is not an oracle but an **experiment protocol**: measure, verify, falsify. Use it together with the GUI/CLI (Splitter, Explorer, Batch-Analyzer, Range-/Unsupervised-Fit).
+
+---
+
+## 0) Preparation (collect data like an engineer)
+- **Separate states**: *ignition on*, *engine off*, *engine idling*, *pushing the bike*, *rear wheel spinning on a stand*, *short ride*.
+- **Toggle actuators**: indicators, brake, lights, fan → this generates ground truth for bits/flags.
+- Analyze **RX only** if your hardware transmits in parallel (avoids interference patterns).
+
+---
+
+## 1) Frame and ID level first
+- **Repetition rate (Hz)**: cyclic sensor IDs transmit at a stable rate; the sketch below computes both rate and jitter.
+  - *Rules of thumb*: wheel speed 20–100 Hz, RPM 10–50 Hz, TPS/APS 20–100 Hz, steering angle 50–100 Hz, temperatures 1–10 Hz.
+- **Jitter** of the periodicity: spread of the inter-arrival times. Low = clean cycle.
+- **DLC stability**: varying payload length → possibly ISO-TP / multiplexing.
+- **Change density**: share of frames whose payload changes. Too high → counter/checksum; too low → status/sluggish signal.
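+
+A minimal sketch of both metrics (assuming you already hold the timestamps of one ID as a NumPy array in milliseconds, as the Kettenöler logs provide; names are illustrative):
+
+```python
+import numpy as np
+
+def interarrival_stats(ts_ms: np.ndarray) -> dict:
+    """Rate and jitter of one CAN ID from its raw timestamps (ms)."""
+    dt = np.diff(np.sort(ts_ms)) / 1000.0          # inter-arrival times in seconds
+    if dt.size == 0 or dt.mean() <= 0:
+        return {"rate_hz": 0.0, "jitter_ms": 0.0, "jitter_cv": 0.0}
+    mean = float(dt.mean())
+    return {
+        "rate_hz": 1.0 / mean,                     # cyclic IDs land near their nominal rate
+        "jitter_ms": float(dt.std()) * 1000.0,     # low spread = clean cycle
+        "jitter_cv": float(dt.std()) / mean,
+    }
+```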
+
+---
+
+## 2) Byte/bit signatures
+- **Bit-flip rate** per bit: ~50% → flag/event; very regular → pattern/timer (see the sketch after this list).
+- **Rolling counter**: 4/8-bit sequences (0..15/255), usually monotonically increasing.
+- **Checksum**: one byte depends deterministically on the other bytes; often in the last position.
+- **Endianness**: test 16-bit LE/BE. Monotone trends/small deltas point to the correct byte order.
+- **Quantization**: typical step sizes (e.g. 0.5 °C, 0.25 km/h).
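+
+A sketch of the flip-rate test (assumes the payloads of one ID stacked into an `(n_frames, n_bytes)` uint8 array; `np.unpackbits` requires uint8):
+
+```python
+import numpy as np
+
+def bit_flip_rates(payloads: np.ndarray) -> np.ndarray:
+    """Fraction of consecutive frames in which each bit toggles."""
+    bits = np.unpackbits(payloads, axis=1)             # (n_frames, n_bytes*8)
+    flips = np.diff(bits.astype(np.int8), axis=0) != 0
+    return flips.mean(axis=0)   # ~0.5 → flag/event; ~1.0 → alternating pattern/timer
+```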
+
+---
+
+## 3) Physics as a filter (slew rate & limits)
+Measure **Δvalue/Δt** robustly (95th percentile, not the maximum):
+- **Temperatures**: very sluggish → ΔT/s ≪ 1 °C/s.
+- **Vehicle speed**: 0→100 km/h in under 1 s is unrealistic; roughly ≤ 30–50 km/h per second (on the road).
+- **RPM**: fast jumps are possible, but no teleporting. 1k→8k in 1–3 s is plausible.
+- **Steering angle**: fast but bounded; the **jerk** (ΔΔ/Δt) must not be absurd.
+
+Anything that breaks these checks is rarely the physical signal you are looking for (or your scaling is wrong).
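+
+The robust slew-rate measurement in one helper (a sketch; `values` is the decoded candidate, `t_s` the matching timestamps in seconds):
+
+```python
+import numpy as np
+
+def slope_p95(values: np.ndarray, t_s: np.ndarray) -> float:
+    """95th percentile of |Δvalue|/Δt in units per second."""
+    dv = np.abs(np.diff(values))
+    dt = np.diff(t_s)
+    ok = dt > 0                                   # guard against duplicate timestamps
+    return float(np.percentile(dv[ok] / dt[ok], 95)) if ok.any() else 0.0
+
+# e.g. reject a coolant-temperature candidate if slope_p95(phys, t) > 1.0 (°C/s)
+```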
+
+---
+
+## 4) Correlation & causality
+- **Cross-correlation**: RPM ↔ wheel speed (in gear), brake bit ↔ pressure, indicator bit ↔ blink frequency (~1–2 Hz).
+- Derive the **gear** from the ratio (RPM/speed) and validate candidates; a crude sketch follows below.
+- Set **event markers** and look for bytes that flip at the same instant.
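+
+A crude sketch of the ratio idea (assumes RPM and speed are already resampled onto a common time base; the threshold is illustrative):
+
+```python
+import numpy as np
+
+def gear_ratio_histogram(rpm: np.ndarray, speed_kmh: np.ndarray, v_min: float = 10.0):
+    """Histogram of RPM/speed – each engaged gear shows up as a separate peak."""
+    mask = speed_kmh > v_min                  # ignore standstill and clutch slip
+    ratio = rpm[mask] / speed_kmh[mask]
+    return np.histogram(ratio, bins=200)      # plot this and count the peaks
+```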
+
+---
+
+## 5) Protocol & multiplexing
+- **ISO-TP**: patterns `0x10 len …` (First Frame), `0x21…` (Consecutive Frame). Rarely carries individual sensor channels; a classifier sketch follows below.
+- **Multiplexer**: one byte switches the "page" of the payload. Recognizable by its jumping behavior.
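+
+ISO-TP frames are recognizable by the upper nibble of the first byte (the PCI type per ISO 15765-2) – a sketch; plain data frames can collide with these patterns, so treat it as a heuristic:
+
+```python
+def isotp_frame_type(payload: bytes) -> str:
+    """Classify a CAN payload by its ISO-TP PCI nibble."""
+    if not payload:
+        return "empty"
+    pci = payload[0] >> 4
+    return {0x0: "single", 0x1: "first",
+            0x2: "consecutive", 0x3: "flow-control"}.get(pci, "not ISO-TP")
+```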
+
+---
+
+## 6) Statistical fingerprints
+- **Unique ratio** = |unique|/n. Very small → flag/constant; moderate → analog (see the sketch after this list).
+- **Entropy** per byte → data/checksum vs. status.
+- **Plateaus/hysteresis**: updates only when Δ ≥ threshold.
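+
+Both fingerprints in a few lines (a sketch; `vals` is one byte position across all frames of an ID):
+
+```python
+import numpy as np
+
+def byte_entropy(vals: np.ndarray) -> float:
+    """Shannon entropy in bits: ≈8 → counter/checksum/random, ≈0 → constant."""
+    counts = np.bincount(vals.astype(np.uint8), minlength=256)
+    p = counts[counts > 0] / counts.sum()
+    return float(-(p * np.log2(p)).sum())
+
+def unique_ratio(vals: np.ndarray) -> float:
+    return len(np.unique(vals)) / len(vals)
+```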
+
+---
+
+## 7) Estimating scale/offset systematically
+- **Scale grid**: decades plus values common in practice (0.0625, 0.1, 0.25, 0.5, 0.75, 1, 2, 5, 10 …).
+- **Offset** via **interval coverage**: pick the offset that puts the most samples into [rmin, rmax] (see the sketch below).
+- **Check the sign**: signed/unsigned; allow negative scales if needed.
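+
+The interval-coverage step is a classic sweep over interval endpoints; this sketch mirrors what `trace_signal_fitter.py` does internally:
+
+```python
+import numpy as np
+
+def best_offset(raw: np.ndarray, scale: float, rmin: float, rmax: float):
+    """For a fixed scale, the offset that puts the most samples into [rmin, rmax]."""
+    lo = np.minimum(rmin - scale * raw, rmax - scale * raw)
+    hi = np.maximum(rmin - scale * raw, rmax - scale * raw)
+    events = sorted([(x, +1) for x in lo] + [(x, -1) for x in hi],
+                    key=lambda e: (e[0], -e[1]))    # open before close at ties
+    best, cur, best_x = -1, 0, 0.0
+    for x, v in events:
+        cur += v
+        if cur > best:
+            best, best_x = cur, x
+    return best_x, best / len(raw)                  # offset, hit ratio
+```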
+
+---
+
+## 8) Workflow cookbook
+1. **Split** (logs → traces).
+2. **ID explorer/batch**: periodicity, change density, 8/16-bit plots.
+3. **Range fit** with physical ranges *and* **slew limits** (Δ/Δt) plus **rate/jitter constraints**.
+4. **Cross-checks**: test candidates against other channels (RPM↔speed, brake↔pressure).
+5. **Iterate**: refine ranges/constraints, inspect plots, adjust hypotheses.
+
+---
+
+## 9) Common pitfalls
+- Wrong endianness → teleporting values.
+- Counter/checksum in the same 16-bit word → separate them first.
+- DLC < 8 → 16-bit combinations are missing; don't pad with dummies.
+- ×10/×100 scaling: Δ/Δt looks absurdly large.
+- BCD/ASCII in diagnostics/odometer – not physical.
+
+---
+
+## 10) Goal: reports instead of gut feeling
+Automate tests and thresholds. Let scripts write an **analysis report**: "PASS (smooth, low jitter, rate ok)" vs. "FAIL (jitter, slope99 too high, hit ratio too small)".
+This minimizes confirmation bias – and makes results reproducible.
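+
+The verdict line can be as small as this (a sketch; the metric and limit names are illustrative):
+
+```python
+def verdict(metrics: dict, limits: dict) -> str:
+    """One PASS/FAIL line per candidate, with reasons – ready for the report."""
+    fails = [f"{k} {metrics[k]:.3g} > {v:.3g}"
+             for k, v in limits.items() if metrics.get(k, 0.0) > v]
+    return "PASS" if not fails else "FAIL (" + ", ".join(fails) + ")"
+
+# verdict({"jitter_ms": 3.1, "slope_p95": 0.4}, {"jitter_ms": 10, "slope_p95": 1.0}) → 'PASS'
+```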
diff --git a/Reverse-Engineering CAN-Bus/main.py b/Reverse-Engineering CAN-Bus/main.py
index 1a07227..151bcaf 100644
--- a/Reverse-Engineering CAN-Bus/main.py
+++ b/Reverse-Engineering CAN-Bus/main.py
@@ -699,95 +699,197 @@ class TabTraceBatch(ttk.Frame):
     def _append(self, s):
         self.txt.insert(tk.END, s); self.txt.see(tk.END)

-
-# ---------------- Tab 4: Range-Fit (supervised + unsupervised) ----------------
+# ---------------- Tab 4: Range-Fit (supervised + unsupervised, with physics constraints) ----------------
 class TabRangeFit(ttk.Frame):
     def __init__(self, master, state: AppState, header: Header):
         super().__init__(master, padding=10)
         self.state = state
         self.header = header
+        self._last_outdir = None
         self._build_ui()

     def _build_ui(self):
         self.columnconfigure(0, weight=1)
-        self.rowconfigure(2, weight=1)
+        self.rowconfigure(3, weight=1)

         # unified trace panel (single-select)
-        self.trace_panel = TracePanel(self, self.state, title="Trace wählen (Single)", single_select=True, height=8)
+        self.trace_panel = TracePanel(self, self.state, title="Select trace (single)", single_select=True, height=10)
         self.trace_panel.grid(row=0, column=0, sticky="nsew", padx=5, pady=(5,10))

-        # Parameter
-        frm_params = ttk.LabelFrame(self, text="Range-/Unsupervised-Fit Parameter")
+        # parameter frames
+        frm_params = ttk.Frame(self)
         frm_params.grid(row=1, column=0, sticky="nsew", padx=5, pady=5)
-        for c in (1,3,5):
+        for c in range(6):
             frm_params.columnconfigure(c, weight=1)

-        # Range (leer lassen => unsupervised)
-        ttk.Label(frm_params, text="Range-Min (leer = unsupervised)").grid(row=0, column=0, sticky="e")
+        # --- Supervised (range & physics) ---
+        box_sup = ttk.LabelFrame(frm_params, text="Supervised (range fit) – leave empty for unsupervised")
+        box_sup.grid(row=0, column=0, columnspan=6, sticky="nsew", padx=5, pady=5)
+        for c in range(6):
+            box_sup.columnconfigure(c, weight=1)
+
+        ttk.Label(box_sup, text="Range-Min").grid(row=0, column=0, sticky="e")
         self.rmin = tk.StringVar(value="")
-        ttk.Entry(frm_params, textvariable=self.rmin, width=12).grid(row=0, column=1, sticky="w", padx=5)
+        ttk.Entry(box_sup, textvariable=self.rmin, width=12).grid(row=0, column=1, sticky="w", padx=5)

-        ttk.Label(frm_params, text="Range-Max (leer = unsupervised)").grid(row=0, column=2, sticky="e")
+        ttk.Label(box_sup, text="Range-Max").grid(row=0, column=2, sticky="e")
         self.rmax = tk.StringVar(value="")
-        ttk.Entry(frm_params, textvariable=self.rmax, width=12).grid(row=0, column=3, sticky="w", padx=5)
+        ttk.Entry(box_sup, textvariable=self.rmax, width=12).grid(row=0, column=3, sticky="w", padx=5)

-        ttk.Label(frm_params, text="Min. Hit-Ratio (Range-Fit)").grid(row=0, column=4, sticky="e")
+        ttk.Label(box_sup, text="Min. hit ratio (0..1)").grid(row=0, column=4, sticky="e")
         self.min_hit = tk.DoubleVar(value=0.5)
-        ttk.Entry(frm_params, textvariable=self.min_hit, width=10).grid(row=0, column=5, sticky="w", padx=5)
+        ttk.Entry(box_sup, textvariable=self.min_hit, width=10).grid(row=0, column=5, sticky="w", padx=5)

-        ttk.Label(frm_params, text="Min. Smoothness (Unsupervised)").grid(row=1, column=0, sticky="e")
+        self.allow_neg = tk.BooleanVar(value=False)
+        ttk.Checkbutton(box_sup, text="allow negative scale", variable=self.allow_neg).grid(row=1, column=0, columnspan=2, sticky="w")
+
+        ttk.Label(box_sup, text="Rate-Min (Hz)").grid(row=1, column=2, sticky="e")
+        self.rate_min = tk.StringVar(value="")
+        ttk.Entry(box_sup, textvariable=self.rate_min, width=10).grid(row=1, column=3, sticky="w", padx=5)
+
+        ttk.Label(box_sup, text="Rate-Max (Hz)").grid(row=1, column=4, sticky="e")
+        self.rate_max = tk.StringVar(value="")
+        ttk.Entry(box_sup, textvariable=self.rate_max, width=10).grid(row=1, column=5, sticky="w", padx=5)
+
+        ttk.Label(box_sup, text="Jitter-Max (ms)").grid(row=2, column=0, sticky="e")
+        self.jitter_max = tk.StringVar(value="")
+        ttk.Entry(box_sup, textvariable=self.jitter_max, width=10).grid(row=2, column=1, sticky="w", padx=5)
+
+        ttk.Label(box_sup, text="Max-Slope-Abs (phys/s)").grid(row=2, column=2, sticky="e")
+        self.slope_abs = tk.StringVar(value="")
+        ttk.Entry(box_sup, textvariable=self.slope_abs, width=12).grid(row=2, column=3, sticky="w", padx=5)
+
+        ttk.Label(box_sup, text="Max-Slope-Frac (/s)").grid(row=2, column=4, sticky="e")
+        self.slope_frac = tk.StringVar(value="")
+        ttk.Entry(box_sup, textvariable=self.slope_frac, width=12).grid(row=2, column=5, sticky="w", padx=5)
+
+        ttk.Label(box_sup, text="Slope-Quantile").grid(row=3, column=0, sticky="e")
+        self.slope_q = tk.DoubleVar(value=0.95)  # 0.95 or 0.99
+        ttk.Entry(box_sup, textvariable=self.slope_q, width=10).grid(row=3, column=1, sticky="w", padx=5)
+
+        ttk.Label(box_sup, text="Min-Unique-Ratio").grid(row=3, column=2, sticky="e")
+        self.min_uniq = tk.StringVar(value="")
+        ttk.Entry(box_sup, textvariable=self.min_uniq, width=10).grid(row=3, column=3, sticky="w", padx=5)
+
+        # --- Unsupervised ---
+        box_uns = ttk.LabelFrame(frm_params, text="Unsupervised (no range)")
+        box_uns.grid(row=1, column=0, columnspan=6, sticky="nsew", padx=5, pady=5)
+        for c in range(6):
+            box_uns.columnconfigure(c, weight=1)
+
+        ttk.Label(box_uns, text="Min. smoothness (0..1)").grid(row=0, column=0, sticky="e")
         self.min_smooth = tk.DoubleVar(value=0.2)
-        ttk.Entry(frm_params, textvariable=self.min_smooth, width=10).grid(row=1, column=1, sticky="w", padx=5)
+        ttk.Entry(box_uns, textvariable=self.min_smooth, width=12).grid(row=0, column=1, sticky="w", padx=5)
+
+        ttk.Label(box_uns, text="Max-Slope-Frac-RAW (/s)").grid(row=0, column=2, sticky="e")
+        self.max_slope_frac_raw = tk.StringVar(value="")
+        ttk.Entry(box_uns, textvariable=self.max_slope_frac_raw, width=12).grid(row=0, column=3, sticky="w", padx=5)
+
+        # --- General & output ---
+        box_out = ttk.LabelFrame(frm_params, text="General & output")
+        box_out.grid(row=2, column=0, columnspan=6, sticky="nsew", padx=5, pady=5)
+        for c in range(6):
+            box_out.columnconfigure(c, weight=1)

         self.rx_only = tk.BooleanVar(value=False)
-        self.neg_scale = tk.BooleanVar(value=False)  # nur bei Range-Fit
-        ttk.Checkbutton(frm_params, text="nur RX", variable=self.rx_only).grid(row=1, column=2, sticky="w")
-        ttk.Checkbutton(frm_params, text="negative Scale erlauben (Range-Fit)", variable=self.neg_scale).grid(row=1, column=3, sticky="w")
+        ttk.Checkbutton(box_out, text="RX only", variable=self.rx_only).grid(row=0, column=0, sticky="w")

-        ttk.Label(frm_params, text="Plots Top-N").grid(row=1, column=4, sticky="e")
+        ttk.Label(box_out, text="Plots Top-N").grid(row=0, column=1, sticky="e")
         self.plots_top = tk.IntVar(value=8)
-        ttk.Entry(frm_params, textvariable=self.plots_top, width=10).grid(row=1, column=5, sticky="w", padx=5)
+        ttk.Entry(box_out, textvariable=self.plots_top, width=10).grid(row=0, column=2, sticky="w", padx=5)

-        ttk.Label(frm_params, text="Output-Label").grid(row=2, column=0, sticky="e")
+        ttk.Label(box_out, text="Output label").grid(row=0, column=3, sticky="e")
         self.out_label = tk.StringVar(value="rangefit")
-        ttk.Entry(frm_params, textvariable=self.out_label, width=16).grid(row=2, column=1, sticky="w", padx=5)
+        ttk.Entry(box_out, textvariable=self.out_label, width=18).grid(row=0, column=4, sticky="w", padx=5)

         self.use_ts = tk.BooleanVar(value=True)
-        ttk.Checkbutton(frm_params, text="Zeitstempel-Unterordner", variable=self.use_ts).grid(row=2, column=2, sticky="w", padx=2)
+        ttk.Checkbutton(box_out, text="timestamp subfolder", variable=self.use_ts).grid(row=0, column=5, sticky="w")

-        # Start + Konsole
+        # start + console + actions
         frm_run = ttk.Frame(self)
-        frm_run.grid(row=3, column=0, sticky="ew", padx=5, pady=5)
+        frm_run.grid(row=2, column=0, sticky="ew", padx=5, pady=5)
         ttk.Button(frm_run, text="Start Range-/Unsupervised-Fit", command=self._on_run).pack(side="left", padx=5)
+        ttk.Button(frm_run, text="Open report", command=self._open_last_report).pack(side="left", padx=5)
+        ttk.Button(frm_run, text="Open output folder", command=self._open_last_outdir).pack(side="left", padx=5)

         frm_out = ttk.LabelFrame(self, text="Output")
-        frm_out.grid(row=4, column=0, sticky="nsew", padx=5, pady=5)
+        frm_out.grid(row=3, column=0, sticky="nsew", padx=5, pady=5)
         frm_out.columnconfigure(0, weight=1); frm_out.rowconfigure(0, weight=1)
-        self.txt = tk.Text(frm_out, height=12); self.txt.grid(row=0, column=0, sticky="nsew")
+        self.txt = tk.Text(frm_out, height=14); self.txt.grid(row=0, column=0, sticky="nsew")
         sbo = ttk.Scrollbar(frm_out, orient="vertical", command=self.txt.yview); sbo.grid(row=0, column=1, sticky="ns")
         self.txt.configure(yscrollcommand=sbo.set)

+    # --- helpers ---
     def _append(self, s):
         self.txt.insert(tk.END, s); self.txt.see(tk.END)

-    def _on_run(self):
+    def _stamp(self):
+        import datetime as _dt
+        return _dt.datetime.now().strftime("%Y%m%d_%H%M%S")
+
+    def _build_outdir(self, supervised: bool) -> Path:
+        out_root = self.state.analyze_out_root()
+        label = (self.out_label.get().strip() or ("rangefit" if supervised else "unsupervised"))
+        stamp = f"{self._stamp()}_{label}" if self.use_ts.get() else label
+        outdir = out_root / stamp
+        outdir.mkdir(parents=True, exist_ok=True)
+        self._last_outdir = outdir
+        return outdir
+
+    def _selected_trace(self):
         sel = self.trace_panel.get_selected()
         if not sel:
             messagebox.showwarning("Note", "Please select exactly one .trace file.")
-            return
+            return None
         if len(sel) != 1:
             messagebox.showwarning("Note", "Range fit requires exactly one .trace file (single select).")
+            return None
+        return sel[0]
+
+    def _maybe(self, val: str, flag: str, args: list):
+        # append "--flag value" only if the field is non-empty
+        v = (val or "").strip()
+        if v != "":
+            args += [flag, v]
+
+    def _open_path(self, p: Path):
+        try:
+            if sys.platform.startswith("darwin"):
+                subprocess.Popen(["open", str(p)])
+            elif os.name == "nt":
+                os.startfile(str(p))  # type: ignore
+            else:
+                subprocess.Popen(["xdg-open", str(p)])
+        except Exception as e:
+            messagebox.showwarning("Error", f"Could not open:\n{p}\n{e}")
+
+    def _open_last_outdir(self):
+        if self._last_outdir and self._last_outdir.exists():
+            self._open_path(self._last_outdir)
+        else:
+            messagebox.showinfo("Note", "No output folder yet.")
+
+    def _open_last_report(self):
+        if not (self._last_outdir and self._last_outdir.exists()):
+            messagebox.showinfo("Note", "No report generated yet.")
+            return
+        # try to find a *_report.md in the last outdir
+        md = list(Path(self._last_outdir).glob("*_report.md"))
+        if not md:
+            messagebox.showinfo("Note", "No report found.")
+            return
+        self._open_path(md[0])
+
+    def _on_run(self):
+        trace = self._selected_trace()
+        if not trace:
             return
-        trace = sel[0]
+
+        # supervised?
         rmin = self.rmin.get().strip()
         rmax = self.rmax.get().strip()
         supervised = bool(rmin) and bool(rmax)

-        out_root = self.state.analyze_out_root()
-        label = (self.out_label.get().strip() or ("rangefit" if supervised else "unsupervised"))
-        stamp = f"{now_stamp()}_{label}" if self.use_ts.get() else label
-        outdir = ensure_dir(out_root / stamp)
+        outdir = self._build_outdir(supervised)

         cmd = [
             sys.executable,
@@ -801,25 +903,38 @@ class TabRangeFit(ttk.Frame):

         if supervised:
             cmd += ["--rmin", rmin, "--rmax", rmax, "--min-hit", str(self.min_hit.get())]
-            if self.neg_scale.get():
+            if self.allow_neg.get():
                 cmd.append("--allow-neg-scale")
+            self._maybe(self.rate_min.get(), "--rate-min", cmd)
+            self._maybe(self.rate_max.get(), "--rate-max", cmd)
+            self._maybe(self.jitter_max.get(), "--jitter-max-ms", cmd)
+            self._maybe(self.slope_abs.get(), "--max-slope-abs", cmd)
+            self._maybe(self.slope_frac.get(), "--max-slope-frac", cmd)
+            cmd += ["--slope-quantile", str(self.slope_q.get())]
+            self._maybe(self.min_uniq.get(), "--min-uniq-ratio", cmd)
         else:
+            # unsupervised
             cmd += ["--min-smooth", str(self.min_smooth.get())]
+            self._maybe(self.max_slope_frac_raw.get(), "--max-slope-frac-raw", cmd)
+            cmd += ["--slope-quantile", str(self.slope_q.get())]  # selects p95 vs. p99 internally

-        self._run_cmd(cmd)
-        self._append(f"\nDone. Output: {outdir}\n")
+        self._append(f"\n>>> RUN: {' '.join(cmd)}\n")
+        t = threading.Thread(target=self._run_cmd, args=(cmd,), daemon=True)
+        t.start()

     def _run_cmd(self, cmd):
-        self._append(f"\n>>> RUN: {' '.join(cmd)}\n")
         try:
             with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True) as proc:
-                for line in proc.stdout: self._append(line)
+                for line in proc.stdout:
+                    # NOTE: Tk widgets are not formally thread-safe; self.after(...) would be stricter
+                    self._append(line)
                 rc = proc.wait()
-                if rc != 0: self._append(f"[Exit-Code {rc}]\n")
+                if rc != 0:
+                    self._append(f"[Exit-Code {rc}]\n")
+                else:
+                    self._append(f"\nDone. Output: {self._last_outdir}\n")
         except Exception as e:
             self._append(f"[Error] {e}\n")

-
 # ---------------- App Shell ----------------
 class App(tk.Tk):
     def __init__(self):
diff --git a/Reverse-Engineering CAN-Bus/trace_signal_fitter.py b/Reverse-Engineering CAN-Bus/trace_signal_fitter.py
index 2e611b0..34422e2 100644
--- a/Reverse-Engineering CAN-Bus/trace_signal_fitter.py
+++ b/Reverse-Engineering CAN-Bus/trace_signal_fitter.py
@@ -1,151 +1,294 @@
 #!/usr/bin/env python3
+# -*- coding: utf-8 -*-
 """
-trace_signal_fitter.py
-----------------------
-Zwei Betriebsarten für eine einzelne .trace-Datei:
+trace_signal_fitter.py – advanced range/unsupervised fit with physics constraints & report

-1) Range-Fit (überwacht): --rmin/--rmax gesetzt
-   Sucht für alle 8-bit (D0..D7) und adjazenten 16-bit (LE/BE) eine lineare Abbildung
-   phys = raw*scale + offset, die möglichst viele Samples in [rmin, rmax] bringt.
-   Ranking primär nach hit_ratio.
-
-2) Unsupervised (ohne Range): --rmin/--rmax weggelassen
-   Findet „plausible“ physikalische Kandidaten nach Glattheit/Varianz/Spannweite/Rate,
-   ohne Scale/Offset zu schätzen (raw-Werte direkt). Ranking primär nach „smoothness“.
+Modes:
+1) Range fit (supervised): --rmin/--rmax given → find scale & offset, maximize the hit ratio in [rmin, rmax].
+2) Unsupervised: no range → plausible raw signals ranked by smoothness/variance/rate/span.

+New:
+- Periodicity: rate (Hz), jitter (std of the inter-arrival times), CV.
+- Slew rate: p95/p99 of |Δ|/s (supervised in physical units, unsupervised normalized to the raw span).
+- Limits as arguments (--rate-min/max, --jitter-max-ms, --max-slope-abs, --max-slope-frac, ...).
+- Additional signed 16-bit variants (le16s/be16s).
+- JSON + Markdown report per trace with PASS/FAIL and reasons.

 Log format (Kettenöler):
   <ts_ms> <TX|RX> 0x<ID> <DLC> <byte0> <byte1> ...

 Outputs:
-- Range-Fit: <stem>_encoding_candidates.csv + optional plots
-- Unsupervised: <stem>_unsupervised_candidates.csv + optional plots
-
+- supervised: <stem>_encoding_candidates.csv, plots, <stem>_report.md, <stem>_report.json
+- unsupervised: <stem>_unsupervised_candidates.csv, plots, <stem>_report.md, <stem>_report.json
 """
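+# A line in this format looks like (illustrative values, not taken from a real log):
+#   1723051 RX 0x120 8 00 1F 4A 00 C8 00 00 3B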

-import re
+from __future__ import annotations
+
 import sys
+import json
 import argparse
 from pathlib import Path
+from typing import List, Tuple, Dict, Iterable
+
 import numpy as np
 import pandas as pd
 import matplotlib.pyplot as plt

-LOG_PATTERN = re.compile(r"(\d+)\s+(TX|RX)\s+0x([0-9A-Fa-f]+)\s+\d+\s+((?:[0-9A-Fa-f]{2}\s+)+)")

-def parse_trace(path: Path, rx_only=False) -> pd.DataFrame:
-    rows = []
-    with open(path, "r", errors="ignore") as f:
-        for line in f:
-            m = LOG_PATTERN.match(line)
-            if not m:
-                continue
-            ts = int(m.group(1)); dr = m.group(2)
-            if rx_only and dr != "RX":
-                continue
-            cid = int(m.group(3), 16)
-            data = [int(x, 16) for x in m.group(4).split() if x.strip()]
-            rows.append((ts, dr, cid, data))
-    df = pd.DataFrame(rows, columns=["ts","dir","id","data"])
-    if df.empty:
-        return df
-    df["time_s"] = (df["ts"] - df["ts"].min())/1000.0
-    return df
+# ---------- Parsing ----------
+
+def parse_trace(path: Path, rx_only: bool = False) -> pd.DataFrame:
+    """
+    Robust parsing of the Kettenöler format:
+      <ts_ms> <TX|RX> 0x<ID> <DLC> <bytes hex>
+    """
+    rows = []
+    with open(path, "r", errors="ignore") as f:
+        for line in f:
+            parts = line.strip().split()
+            if len(parts) < 4:
+                continue
+            try:
+                ts = int(parts[0])
+                dr = parts[1]
+                if rx_only and dr != "RX":
+                    continue
+                cid = int(parts[2], 16)  # int(..., 16) accepts an optional 0x prefix
+                dlc = int(parts[3])
+                bytes_hex = parts[4:4+dlc] if dlc > 0 else []
+                data = []
+                for b in bytes_hex:
+                    try:
+                        data.append(int(b, 16))
+                    except Exception:
+                        data.append(0)
+                rows.append((ts, dr, cid, data))
+            except Exception:
+                continue
+
+    df = pd.DataFrame(rows, columns=["ts", "dir", "id", "data"])
+    if df.empty:
+        return df
+    df["time_s"] = (df["ts"] - df["ts"].min()) / 1000.0
+    return df

-def be16(a,b): return (a<<8)|b
-def le16(a,b): return a | (b<<8)

-def p95_abs_diff(arr: np.ndarray) -> float:
+# ---------- Helpers ----------
+
+def be16(a: int, b: int) -> int: return (a << 8) | b
+def le16(a: int, b: int) -> int: return a | (b << 8)
+def s16(u: int) -> int: return u if u < 0x8000 else u - 0x10000
+
+def p_quant_abs_diff(arr: np.ndarray, q: float) -> float:
     if arr.size < 2:
         return 0.0
     d = np.abs(np.diff(arr))
-    return float(np.percentile(d, 95))
+    return float(np.percentile(d, q * 100))

-def basic_rate(times: np.ndarray) -> float:
-    if times.size < 2: return 0.0
-    dur = times.max() - times.min()
-    if dur <= 0: return 0.0
-    return float(times.size / dur)
+def p_quant(arr: np.ndarray, q: float) -> float:
+    if arr.size == 0:
+        return 0.0
+    return float(np.percentile(arr, q * 100))

-def interval_best_offset(raw: np.ndarray, scale: float, rmin: float, rmax: float):
-    a = rmin - scale*raw
-    b = rmax - scale*raw
-    lo = np.minimum(a,b)
-    hi = np.maximum(a,b)
-    events = []
-    for L,H in zip(lo,hi):
-        events.append((L, +1))
-        events.append((H, -1))
-    events.sort(key=lambda t: (t[0], -t[1]))
-    best = -1; cur = 0; best_x = None
-    for x, v in events:
-        cur += v
-        if cur > best:
-            best = cur; best_x = x
-    return best_x, float(best)/float(len(raw))
+def interarrival_metrics(times: np.ndarray) -> Dict[str, float]:
+    if times.size < 2:
+        return {"rate_hz": 0.0, "period_mean": 0.0, "period_std": 0.0, "jitter_cv": 0.0, "n": int(times.size)}
+    dt = np.diff(times)
+    period_mean = float(np.mean(dt))
+    period_std = float(np.std(dt))
+    rate_hz = 1.0 / period_mean if period_mean > 0 else 0.0
+    jitter_cv = (period_std / period_mean) if period_mean > 0 else 0.0
+    return {"rate_hz": rate_hz, "period_mean": period_mean, "period_std": period_std, "jitter_cv": jitter_cv, "n": int(times.size)}
+
+def slope_metrics(values: np.ndarray, times: np.ndarray) -> Dict[str, float]:
+    if values.size < 2:
+        return {"slope_p95": 0.0, "slope_p99": 0.0, "jerk_p95": 0.0}
+    dv = np.abs(np.diff(values))
+    dt = np.diff(times)
+    # avoid division by zero
+    dt = np.where(dt <= 0, np.nan, dt)
+    slope = dv / dt
+    slope = slope[~np.isnan(slope)]
+    if slope.size == 0:
+        return {"slope_p95": 0.0, "slope_p99": 0.0, "jerk_p95": 0.0}
+    jerk = np.abs(np.diff(slope))  # Δslope per sample – a crude jerk proxy
+    return {
+        "slope_p95": float(np.percentile(slope, 95)),
+        "slope_p99": float(np.percentile(slope, 99)),
+        "jerk_p95": float(np.percentile(jerk, 95)) if jerk.size > 0 else 0.0,
+    }
+
+def prefilter(vals: np.ndarray) -> Tuple[bool, Dict[str, float]]:
+    if vals.size < 12:
+        return False, {"reason": "too_few_samples"}
+    uniq = np.unique(vals)
+    if uniq.size <= 2:
+        return False, {"reason": "too_constant"}
+    p95 = p_quant_abs_diff(vals, 0.95)
+    if p95 == 0:
+        return False, {"reason": "no_changes"}
+    r = float(np.percentile(vals, 97) - np.percentile(vals, 3) + 1e-9)
+    if p95 > 0.5 * r:
+        return False, {"reason": "too_jumpy"}
+    return True, {"p95_abs_diff": p95, "span_est": r}
+
+def try_scaleset() -> List[float]:
+    base = [
+        1e-3, 2e-3, 5e-3,
+        1e-2, 2e-2, 5e-2,
+        0.0625, 0.1, 0.125, 0.2, 0.25, 0.5,
+        0.75, 0.8, 1.0, 1.25, 2.0, 5.0, 10.0
+    ]
+    return sorted(set(base))
+
+def interval_best_offset(raw: np.ndarray, scale: float, rmin: float, rmax: float) -> Tuple[float, float]:
+    """
+    Find the offset that brings the most values (scale*raw + offset) into [rmin, rmax].
+    Sweep over the interval endpoints (the classic "interval stabbing" solution).
+    """
+    a = rmin - scale * raw
+    b = rmax - scale * raw
+    lo = np.minimum(a, b)
+    hi = np.maximum(a, b)
+    events = []
+    for L, H in zip(lo, hi):
+        events.append((L, +1))
+        events.append((H, -1))
+    events.sort(key=lambda t: (t[0], -t[1]))
+    best = -1
+    cur = 0
+    best_x = None
+    for x, v in events:
+        cur += v
+        if cur > best:
+            best = cur
+            best_x = x
+    hit_ratio = float(best) / float(len(raw)) if len(raw) else 0.0
+    return float(best_x if best_x is not None else 0.0), hit_ratio

+
+# ---------- Candidate Generation ----------
+
-def gen_candidates(df: pd.DataFrame):
-    times = df["time_s"].to_numpy(dtype=float)
-    data = df["data"].tolist()
-    for i in range(8):
-        vals = [d[i] for d in data if len(d)>i]
-        if not vals: continue
-        yield (f"byte[{i}]", np.array(vals, dtype=float)), times[:len(vals)]
-    # 16-bit (adjacent)
-    pairs = [(i,i+1) for i in range(7)]
-    for i,j in pairs:
-        vals = [le16(d[i],d[j]) for d in data if len(d)>j]
-        if vals:
-            yield (f"le16[{i}-{j}]", np.array(vals, dtype=float)), times[:len(vals)]
-        vals = [be16(d[i],d[j]) for d in data if len(d)>j]
-        if vals:
-            yield (f"be16[{i}-{j}]", np.array(vals, dtype=float)), times[:len(vals)]
+def gen_candidates(df: pd.DataFrame) -> Iterable[Tuple[str, np.ndarray, np.ndarray]]:
+    """
+    Yields (label, values, times) for:
+      - 8-bit bytes D0..D7
+      - adjacent 16-bit pairs (LE/BE) plus signed variants
+    The times are mapped onto the filtered indices (DLC-dependent).
+    """
+    times_all = df["time_s"].to_numpy(dtype=float)
+    data = df["data"].tolist()
+    # 8-bit
+    for i in range(8):
+        idx = [k for k, d in enumerate(data) if len(d) > i]
+        if len(idx) < 3:
+            continue
+        vals = np.array([data[k][i] for k in idx], dtype=float)
+        t = times_all[idx]
+        yield f"byte[{i}]", vals, t
+
+    # adjacent 16-bit
+    for i in range(7):
+        j = i + 1
+        idx = [k for k, d in enumerate(data) if len(d) > j]
+        if len(idx) < 3:
+            continue
+        a = [data[k][i] for k in idx]
+        b = [data[k][j] for k in idx]
+        u_le = np.array([le16(x, y) for x, y in zip(a, b)], dtype=float)
+        u_be = np.array([be16(x, y) for x, y in zip(a, b)], dtype=float)
+        s_le = np.array([s16(le16(x, y)) for x, y in zip(a, b)], dtype=float)
+        s_be = np.array([s16(be16(x, y)) for x, y in zip(a, b)], dtype=float)
+        t = times_all[idx]
+        yield f"le16[{i}-{j}]", u_le, t
+        yield f"be16[{i}-{j}]", u_be, t
+        yield f"le16s[{i}-{j}]", s_le, t
+        yield f"be16s[{i}-{j}]", s_be, t

-def prefilter(vals: np.ndarray):
-    if vals.size < 12:
-        return False, {"reason":"too_few_samples"}
-    uniq = np.unique(vals)
-    if uniq.size <= 2:
-        return False, {"reason":"too_constant"}
-    p95 = p95_abs_diff(vals)
-    if p95 == 0:
-        return False, {"reason":"no_changes"}
-    r = float(np.percentile(vals, 97) - np.percentile(vals, 3) + 1e-9)
-    if p95 > 0.5*r:
-        return False, {"reason":"too_jumpi"}
-    return True, {"p95_abs_diff":p95, "span_est":r}

-def try_scaleset():
-    base = [1e-3, 2e-3, 5e-3,
-            1e-2, 2e-2, 5e-2,
-            0.1, 0.2, 0.25, 0.5,
-            1.0, 2.0, 5.0, 10.0,
-            0.0625, 0.125, 0.75, 0.8, 1.25]
-    return sorted(set(base))

-def evaluate_supervised(label, vals: np.ndarray, times: np.ndarray, rmin: float, rmax: float, allow_neg_scale=False):
+
+# ---------- Evaluation ----------
+
+def evaluate_supervised(label: str,
+                        vals: np.ndarray,
+                        times: np.ndarray,
+                        rmin: float,
+                        rmax: float,
+                        allow_neg_scale: bool,
+                        constraints: Dict[str, float]) -> Dict[str, float] | None:
     ok, meta = prefilter(vals)
     if not ok:
         return None
+
     scales = try_scaleset()
     if allow_neg_scale:
-        scales = scales + [-s for s in scales if s>0]
-    best = {"hit_ratio": -1.0}
+        scales += [-s for s in scales if s > 0]
+
+    best = {"hit_ratio": -1.0, "scale": None, "offset": 0.0}
     for s in scales:
         o, hr = interval_best_offset(vals, s, rmin, rmax)
         if hr > best["hit_ratio"]:
-            best = {"scale":s, "offset":float(o), "hit_ratio":hr}
-    phys = vals*best["scale"] + best["offset"]
-    within = (phys>=rmin) & (phys<=rmax)
+            best = {"scale": s, "offset": float(o), "hit_ratio": hr}
+
+    phys = vals * best["scale"] + best["offset"]
+    within = (phys >= rmin) & (phys <= rmax)
     in_count = int(np.count_nonzero(within))
-    p95_raw = p95_abs_diff(vals)
-    p95_phys = p95_abs_diff(phys)
-    rate = basic_rate(times[:len(vals)])
+
+    p95_raw = p_quant_abs_diff(vals, 0.95)
+    p95_phys = p_quant_abs_diff(phys, 0.95)
+
+    ia = interarrival_metrics(times[:len(vals)])
+    sm = slope_metrics(phys, times[:len(phys)])
+
+    prange = (rmax - rmin) if (rmax > rmin) else 1.0
+    slope_p95_frac = sm["slope_p95"] / prange
+    slope_p99_frac = sm["slope_p99"] / prange
+
+    failures = []
+
+    if constraints.get("rate_min") is not None and ia["rate_hz"] < constraints["rate_min"] - 1e-9:
+        failures.append(f"rate {ia['rate_hz']:.2f}Hz < min {constraints['rate_min']:.2f}Hz")
+    if constraints.get("rate_max") is not None and ia["rate_hz"] > constraints["rate_max"] + 1e-9:
+        failures.append(f"rate {ia['rate_hz']:.2f}Hz > max {constraints['rate_max']:.2f}Hz")
+
+    if constraints.get("jitter_max_ms") is not None:
+        jitter_ms = ia["period_std"] * 1000.0
+        if jitter_ms > constraints["jitter_max_ms"] + 1e-9:
+            failures.append(f"jitter {jitter_ms:.1f}ms > max {constraints['jitter_max_ms']:.1f}ms")
+
+    def _resolve_abs_slope_limit():
+        if constraints.get("max_slope_abs") is not None:
+            return constraints["max_slope_abs"]
+        if constraints.get("max_slope_frac") is not None:
+            return constraints["max_slope_frac"] * prange
+        return None
+
+    max_s_abs = _resolve_abs_slope_limit()
+    if max_s_abs is not None:
+        q = constraints.get("slope_quantile", 0.95)
+        qv = sm["slope_p95"] if q <= 0.95 else sm["slope_p99"]
+        if qv > max_s_abs + 1e-9:
+            failures.append(f"slope(q={q:.2f}) {qv:.3g} > max {max_s_abs:.3g}")
+
+    uniq_ratio = len(np.unique(vals)) / float(len(vals))
+    if constraints.get("min_uniq_ratio") is not None and uniq_ratio < constraints["min_uniq_ratio"] - 1e-9:
+        failures.append(f"uniq_ratio {uniq_ratio:.3f} < min {constraints['min_uniq_ratio']:.3f}")
+
+    passed = (len(failures) == 0)
+
+    # quality score: penalize near-limit slope and jitter
+    score = best["hit_ratio"]
+    if max_s_abs is not None and max_s_abs > 0:
+        slope_pen = min(sm["slope_p95"] / max_s_abs, 1.0)
+        score *= (1.0 - 0.3 * slope_pen)
+    if constraints.get("jitter_max_ms") is not None:
+        jitter_ms = ia["period_std"] * 1000.0
+        jitter_pen = min(jitter_ms / constraints["jitter_max_ms"], 1.0)
+        score *= (1.0 - 0.2 * jitter_pen)
+
     return {
         "label": label,
         "mode": "range_fit",
         "n": int(vals.size),
-        "rate_hz_est": float(rate),
         "raw_min": float(np.min(vals)),
         "raw_max": float(np.max(vals)),
         "raw_var": float(np.var(vals)),
@@ -158,38 +301,49 @@ def evaluate_supervised(label, vals: np.ndarray, times: np.ndarray, rmin: float,
         "phys_max": float(np.max(phys)),
         "p95_absdiff_phys": float(p95_phys),
         "span_phys": float(np.percentile(phys, 97) - np.percentile(phys, 3)),
-        "prefilter_span_est": float(meta.get("span_est", 0.0)),
-        "prefilter_p95_absdiff": float(meta.get("p95_abs_diff", 0.0)),
+        "rate_hz_est": float(ia["rate_hz"]),
+        "period_std_ms": float(ia["period_std"] * 1000.0),
+        "jitter_cv": float(ia["jitter_cv"]),
+        "slope_p95_per_s": float(sm["slope_p95"]),
+        "slope_p99_per_s": float(sm["slope_p99"]),
+        "slope_p95_frac": float(slope_p95_frac),
+        "slope_p99_frac": float(slope_p99_frac),
+        "uniq_ratio": float(uniq_ratio),
+        "passed": bool(passed),
+        "fail_reasons": "; ".join(failures),
+        "quality_score": float(score),
     }

-def evaluate_unsupervised(label, vals: np.ndarray, times: np.ndarray, min_smooth=0.2):
-    """
-    Liefert nur Plausibilitätsmetriken (keine scale/offset).
-    smoothness = 1 - clamp(p95(|Δ|) / span, 0..1)
-    uniq_ratio = |unique| / n
-    Ranking: smoothness desc, span desc, var desc, rate desc, n desc
-    """
+def evaluate_unsupervised(label: str,
+                          vals: np.ndarray,
+                          times: np.ndarray,
+                          min_smooth: float = 0.2,
+                          max_slope_frac_raw: float | None = None,
+                          slope_quantile: float = 0.95) -> Dict[str, float] | None:
     if vals.size < 12:
         return None
-    p95 = p95_abs_diff(vals)
+    p95 = p_quant_abs_diff(vals, 0.95)
     span = float(np.percentile(vals, 97) - np.percentile(vals, 3) + 1e-9)
-    smooth = 1.0 - min(max(p95/span, 0.0), 1.0)
-    uniq = len(np.unique(vals))
-    uniq_ratio = float(uniq) / float(vals.size)
+    smooth = 1.0 - min(max(p95 / span, 0.0), 1.0)
+    uniq_ratio = float(len(np.unique(vals))) / float(vals.size)
     var = float(np.var(vals))
-    rate = basic_rate(times[:len(vals)])
-    # Filter: zu konstant, zu sprunghaft
+    ia = interarrival_metrics(times[:len(vals)])
+    sm = slope_metrics(vals, times[:len(vals)])
+    slope_q = sm["slope_p95"] if slope_quantile <= 0.95 else sm["slope_p99"]
+    slope_frac_raw = (slope_q / span) if span > 0 else 0.0
+
     if uniq_ratio <= 0.02:
         return None
     if smooth < min_smooth:
         return None
+    if (max_slope_frac_raw is not None) and (slope_frac_raw > max_slope_frac_raw):
+        return None
     return {
         "label": label,
         "mode": "unsupervised",
         "n": int(vals.size),
-        "rate_hz_est": float(rate),
         "raw_min": float(np.min(vals)),
         "raw_max": float(np.max(vals)),
         "raw_var": var,
@@ -197,119 +351,310 @@ def evaluate_unsupervised(label, vals: np.ndarray, times: np.ndarray, min_smooth
         "p95_absdiff_raw": float(p95),
         "smoothness": float(smooth),
         "uniq_ratio": float(uniq_ratio),
+        "rate_hz_est": float(ia["rate_hz"]),
+        "period_std_ms": float(ia["period_std"] * 1000.0),
+        "jitter_cv": float(ia["jitter_cv"]),
+        "slope_q_raw": float(slope_q),
+        "slope_frac_raw": float(slope_frac_raw),
     }

-def plot_timeseries(times, series, out_png: Path, title: str, ylabel: str):
-    plt.figure(figsize=(10,4))
+
+# ---------- Plot & Report ----------
+
+def plot_timeseries(times: np.ndarray, series: np.ndarray, out_png: Path, title: str, ylabel: str) -> None:
+    plt.figure(figsize=(10, 4))
     plt.plot(times[:len(series)], series, marker=".", linestyle="-")
-    plt.xlabel("Zeit (s)"); plt.ylabel(ylabel)
-    plt.title(title); plt.grid(True); plt.tight_layout()
+    plt.xlabel("Time (s)")
+    plt.ylabel(ylabel)
+    plt.title(title)
+    plt.grid(True)
+    plt.tight_layout()
     out_png.parent.mkdir(parents=True, exist_ok=True)
-    plt.savefig(out_png, dpi=150); plt.close()
+    plt.savefig(out_png, dpi=150)
+    plt.close()
+
+def df_to_md_table(df: pd.DataFrame) -> str:
+    """Robust Markdown table: uses to_markdown if available, otherwise CSV in a code block."""
+    try:
+        return df.to_markdown(index=False)  # may require 'tabulate'
+    except Exception:
+        return "```\n" + df.to_csv(index=False) + "```"
+
+def write_report_md(path: Path, header: dict, top_rows: pd.DataFrame, failures: pd.DataFrame, mode: str, links: dict) -> None:
+    md = []
+    md.append(f"# Trace report – {header.get('trace_name','')}")
+    md.append("")
+    md.append(f"- **Mode:** {mode}")
+    for k, v in header.items():
+        if k in ("trace_name",):
+            continue
+        md.append(f"- **{k}**: {v}")
+    md.append("")
+
+    if mode == "range_fit":
+        md.append("## Top candidates (range fit)")
+        md.append("Hit ratio, slope/jitter & score – best first.\n")
+        if top_rows is not None and not top_rows.empty:
+            md.append(df_to_md_table(top_rows))
+        else:
+            md.append("_No candidates above threshold._")
+        md.append("")
+        if failures is not None and not failures.empty:
+            md.append("## Excluded candidates (reasons)\n")
+            md.append(df_to_md_table(failures[["label", "fail_reasons"]]))
+    else:
+        md.append("## Top candidates (unsupervised)\n")
+        if top_rows is not None and not top_rows.empty:
+            md.append(df_to_md_table(top_rows))
+        else:
+            md.append("_No plausible raw signals._")
+
+    md.append("\n## Artifacts")
+    for k, v in links.items():
+        md.append(f"- **{k}**: `{v}`")
+    path.write_text("\n".join(md), encoding="utf-8")
+
+
+# ---------- Main ----------

 def main():
-    ap = argparse.ArgumentParser(description="Finde Encoding-Kandidaten (mit Range) oder plausible Rohsignale (ohne Range) in einer .trace-Datei")
-    ap.add_argument("trace", help="Pfad zur .trace Datei (aus can_split_by_id.py)")
-    ap.add_argument("--rmin", type=float, default=None, help="untere Grenze des Zielbereichs (phys)")
-    ap.add_argument("--rmax", type=float, default=None, help="obere Grenze des Zielbereichs (phys)")
-    ap.add_argument("--rx-only", action="store_true", help="Nur RX Frames nutzen")
-    ap.add_argument("--allow-neg-scale", action="store_true", help="Auch negative scale testen (nur Range-Fit)")
-    ap.add_argument("--outdir", default=".", help="Output-Verzeichnis (CSV/Plots)")
-    ap.add_argument("--plots-top", type=int, default=8, help="Erzeuge Plots für die Top-N Kandidaten")
-    ap.add_argument("--min-hit", type=float, default=0.5, help="Mindest-Hit-Ratio für Range-Fit (0..1)")
-    ap.add_argument("--min-smooth", type=float, default=0.2, help="Mindest-Smoothness für Unsupervised (0..1)")
+    ap = argparse.ArgumentParser(description="Range/unsupervised fit with physics-based constraints + report")
+    ap.add_argument("trace", help="path to the .trace file")
+
+    # supervision
+    ap.add_argument("--rmin", type=float, default=None)
+    ap.add_argument("--rmax", type=float, default=None)
+    ap.add_argument("--allow-neg-scale", action="store_true")
+
+    # shared
+    ap.add_argument("--rx-only", action="store_true")
+    ap.add_argument("--outdir", default=".")
+    ap.add_argument("--plots-top", type=int, default=8)
+
+    # supervised thresholds
+    ap.add_argument("--min-hit", type=float, default=0.5)
+    ap.add_argument("--rate-min", type=float, default=None)
+    ap.add_argument("--rate-max", type=float, default=None)
+    ap.add_argument("--jitter-max-ms", type=float, default=None)
+    ap.add_argument("--max-slope-abs", type=float, default=None, help="max |Δphys|/s (e.g. °C/s, km/h/s)")
+    ap.add_argument("--max-slope-frac", type=float, default=None, help="max |Δphys|/s relative to (rmax-rmin)")
+    ap.add_argument("--slope-quantile", type=float, default=0.95, help="0.95 or 0.99")
+    ap.add_argument("--min-uniq-ratio", type=float, default=None)
+
+    # unsupervised thresholds
+    ap.add_argument("--min-smooth", type=float, default=0.2)
+    ap.add_argument("--max-slope-frac-raw", type=float, default=None, help="raw: (|Δraw|/s)/span")
+
     args = ap.parse_args()

     trace = Path(args.trace)
     df = parse_trace(trace, rx_only=args.rx_only)
     if df.empty:
-        print("Keine Daten in Trace.", file=sys.stderr); sys.exit(2)
+        print("No data in trace.", file=sys.stderr)
+        sys.exit(2)

     supervised = (args.rmin is not None) and (args.rmax is not None)
-    results = []
+    outdir = Path(args.outdir)
+    outdir.mkdir(parents=True, exist_ok=True)

-    for (label, series), times in gen_candidates(df):
-        if supervised:
-            r = evaluate_supervised(label, series, times, args.rmin, args.rmax, allow_neg_scale=args.allow_neg_scale)
-            if r is None:
-                continue
-            if r["hit_ratio"] >= args.min_hit:
-                r["trace"] = trace.stem
-                results.append(r)
-        else:
-            r = evaluate_unsupervised(label, series, times, min_smooth=args.min_smooth)
-            if r is None:
-                continue
-            r["trace"] = trace.stem
-            results.append(r)
+    if supervised:
+        constraints = {
+            "rate_min": args.rate_min,
+            "rate_max": args.rate_max,
+            "jitter_max_ms": args.jitter_max_ms,
+            "max_slope_abs": args.max_slope_abs,
+            "max_slope_frac": args.max_slope_frac,
+            "slope_quantile": args.slope_quantile,
+            "min_uniq_ratio": args.min_uniq_ratio,
+        }
+        results = []
+        rejected = []
+        for label, series, times in gen_candidates(df):
+            r = evaluate_supervised(label, series, times, args.rmin, args.rmax, args.allow_neg_scale, constraints)
+            if r is None:
+                continue
+            if r["hit_ratio"] >= args.min_hit:
+                (results if r["passed"] else rejected).append({**r, "trace": trace.stem})
+
+        if not results and not rejected:
+            print("No candidates above threshold found. Tip: lower --min-hit or try --allow-neg-scale.", file=sys.stderr)
+            sys.exit(3)
+
+        df_ok = pd.DataFrame(results).sort_values(
+            ["quality_score", "hit_ratio", "p95_absdiff_phys", "rate_hz_est", "n"],
+            ascending=[False, False, True, False, False]
+        )
+        df_rej = pd.DataFrame(rejected)
+
+        csv_path = outdir / f"{trace.stem}_encoding_candidates.csv"
+        if not df_ok.empty:
+            df_ok.to_csv(csv_path, index=False)
+            print(f"Candidates CSV: {csv_path}")
+
+        # plots for the top candidates (or the rejected ones if none passed)
+        top_for_plots = df_ok if not df_ok.empty else df_rej
+        data = df["data"].tolist()
+        times_all = df["time_s"].to_numpy(dtype=float)
+
+        def reconstruct_vals(label: str) -> Tuple[np.ndarray, np.ndarray] | None:
+            if label.startswith("byte["):
+                i = int(label.split("[")[1].split("]")[0])
+                idx = [k for k, d in enumerate(data) if len(d) > i]
+                if not idx: return None
+                return np.array([data[k][i] for k in idx], dtype=float), times_all[idx]
+            elif label.startswith(("le16", "be16")):  # also covers le16s/be16s
+                signed = label.startswith(("le16s", "be16s"))
+                i, j = map(int, label.split("[")[1].split("]")[0].split("-"))
+                idx = [k for k, d in enumerate(data) if len(d) > j]
+                if not idx: return None
+                a = [data[k][i] for k in idx]
+                b = [data[k][j] for k in idx]
+                if label.startswith("le16"):
+                    v = [le16(x, y) for x, y in zip(a, b)]
+                else:
+                    v = [be16(x, y) for x, y in zip(a, b)]
+                if signed:
+                    v = [s16(int(x)) for x in v]
+                return np.array(v, dtype=float), times_all[idx]
+            return None
+
+        for _, row in top_for_plots.head(max(1, args.plots_top)).iterrows():
+            rec = reconstruct_vals(row["label"])
+            if rec is None:
+                continue
+            vals, tt = rec
+            phys = vals * row["scale"] + row["offset"]
+            out_png = outdir / f"{trace.stem}_{row['label'].replace('[','_').replace(']','')}.png"
+            plot_timeseries(tt[:len(phys)], phys, out_png,
+                            f"{trace.name} – {row['label']} (scale={row['scale']:.6g}, offset={row['offset']:.6g})",
+                            "phys (estimated)")
+
+        # report
+        hdr = {
+            "trace_name": trace.name,
+            "mode": "range_fit",
+            "rmin": args.rmin,
+            "rmax": args.rmax,
+            "min_hit": args.min_hit,
+            "rate_min": args.rate_min,
+            "rate_max": args.rate_max,
+            "jitter_max_ms": args.jitter_max_ms,
+            "max_slope_abs": args.max_slope_abs,
+            "max_slope_frac": args.max_slope_frac,
+            "slope_quantile": args.slope_quantile,
+        }
+        top_view = df_ok.head(12)[
+            ["label", "quality_score", "hit_ratio", "scale", "offset",
+             "rate_hz_est", "period_std_ms", "slope_p95_per_s", "slope_p99_per_s",
+             "p95_absdiff_phys", "uniq_ratio"]
+        ] if not df_ok.empty else pd.DataFrame()
+        fail_view = df_rej[["label", "fail_reasons"]] if not df_rej.empty else pd.DataFrame()
+
+        md_path = outdir / f"{trace.stem}_report.md"
+        json_path = outdir / f"{trace.stem}_report.json"
+        write_report_md(md_path, hdr, top_view, fail_view, "range_fit",
+                        {"candidates_csv": str(csv_path) if not df_ok.empty else "(empty)"})
+        with open(json_path, "w", encoding="utf-8") as f:
+            json.dump({
+                "header": hdr,
+                "accepted": df_ok.to_dict(orient="records"),
+                "rejected": df_rej.to_dict(orient="records"),
+            }, f, ensure_ascii=False, indent=2)
+        print(f"Report: {md_path}")
+        print(f"Report JSON: {json_path}")
+
+        if not df_ok.empty:
+            print("\nTop candidates:")
+            cols = ["label", "quality_score", "hit_ratio", "scale", "offset",
+                    "rate_hz_est", "period_std_ms", "slope_p95_per_s", "slope_p99_per_s"]
+            print(df_ok.head(10)[cols].to_string(index=False))
+        else:
+            print("\nNo candidates passed; see the reasons in the report.")

-    if not results:
-        if supervised:
-            print("Keine Kandidaten über Schwelle gefunden. Tipp: --min-hit senken oder --allow-neg-scale testen.", file=sys.stderr)
-        else:
-            print("Keine plausiblen Rohsignale gefunden. Tipp: --min-smooth senken.", file=sys.stderr)
-        sys.exit(3)
+    else:
+        # unsupervised
+        results = []
+        for label, series, times in gen_candidates(df):
+            r = evaluate_unsupervised(label, series, times,
+                                      min_smooth=args.min_smooth,
+                                      max_slope_frac_raw=args.max_slope_frac_raw,
+                                      slope_quantile=args.slope_quantile)
+            if r is None:
+                continue
+            r["trace"] = trace.stem
+            results.append(r)

-    outdir = Path(args.outdir); outdir.mkdir(parents=True, exist_ok=True)
+        if not results:
+            print("No plausible raw signals found. Tip: lower --min-smooth.", file=sys.stderr)
+            sys.exit(3)

-    if supervised:
-        df_res = pd.DataFrame(results).sort_values(["hit_ratio", "p95_absdiff_phys", "rate_hz_est", "n"], ascending=[False, True, False, False])
-        csv_path = outdir / f"{trace.stem}_encoding_candidates.csv"
-        df_res.to_csv(csv_path, index=False)
-        print(f"Kandidaten-CSV: {csv_path}")
-        # Plots
-        for _, row in df_res.head(args.plots_top).iterrows():
-            # decode again
-            times = df["time_s"].to_numpy(dtype=float)
-            data = df["data"].tolist()
-            label = row["label"]
-            if label.startswith("byte["):
-                i = int(label.split("[")[1].split("]")[0])
-                vals = np.array([d[i] for d in data if len(d)>i], dtype=float)
-            elif label.startswith("le16["):
-                i,j = map(int, label.split("[")[1].split("]")[0].split("-"))
-                vals = np.array([le16(d[i],d[j]) for d in data if len(d)>j], dtype=float)
-            elif label.startswith("be16["):
-                i,j = map(int, label.split("[")[1].split("]")[0].split("-"))
-                vals = np.array([be16(d[i],d[j]) for d in data if len(d)>j], dtype=float)
-            else:
-                continue
-            phys = vals*row["scale"] + row["offset"]
-            out_png = outdir / f"{trace.stem}_{label.replace('[','_').replace(']','')}.png"
-            plot_timeseries(times[:len(phys)], phys, out_png, f"{trace.name} – {label} (scale={row['scale']:.6g}, offset={row['offset']:.6g})", "phys (geschätzt)")
-        # console
-        cols = ["label","hit_ratio","scale","offset","p95_absdiff_phys","rate_hz_est","n","phys_min","phys_max"]
-        print("\nTop-Kandidaten:")
-        print(df_res.head(10)[cols].to_string(index=False))
-    else:
-        # Unsupervised
-        df_res = pd.DataFrame(results).sort_values(["smoothness","span_raw","raw_var","rate_hz_est","n"], ascending=[False, False, False, False, False])
+        df_res = pd.DataFrame(results).sort_values(
+            ["smoothness", "span_raw", "raw_var", "rate_hz_est", "n"],
+            ascending=[False, False, False, False, False]
+        )
+
         csv_path = outdir / f"{trace.stem}_unsupervised_candidates.csv"
         df_res.to_csv(csv_path, index=False)
         print(f"Unsupervised CSV: {csv_path}")
-        # Plots
-        for _, row in df_res.head(max(1, args.plots_top)).iterrows():
-            # regenerate series for plot
-            times = df["time_s"].to_numpy(dtype=float)
-            data = df["data"].tolist()
-            label = row["label"]
-            if label.startswith("byte["):
-                i = int(label.split("[")[1].split("]")[0])
-                vals = np.array([d[i] for d in data if len(d)>i], dtype=float)
-            elif label.startswith("le16["):
-                i,j = map(int, label.split("[")[1].split("]")[0].split("-"))
-                vals = np.array([le16(d[i],d[j]) for d in data if len(d)>j], dtype=float)
-            elif label.startswith("be16["):
-                i,j = map(int, label.split("[")[1].split("]")[0].split("-"))
-                vals = np.array([be16(d[i],d[j]) for d in data if len(d)>j], dtype=float)
-            else:
-                continue
-            out_png = outdir / f"{trace.stem}_{label.replace('[','_').replace(']','')}_raw.png"
-            plot_timeseries(times[:len(vals)], vals, out_png, f"{trace.name} – {label} (raw)", "raw")
-        # console
-        cols = ["label","smoothness","span_raw","raw_var","rate_hz_est","n","uniq_ratio","p95_absdiff_raw"]
-        print("\nTop plausible Rohsignale:")
-        print(df_res.head(10)[cols].to_string(index=False))
+
+        # plots of the top N (raw values)
+        data = df["data"].tolist()
+        times_all = df["time_s"].to_numpy(dtype=float)
+
+        def reconstruct_raw(label: str) -> Tuple[np.ndarray, np.ndarray] | None:
+            if label.startswith("byte["):
+                i = int(label.split("[")[1].split("]")[0])
+                idx = [k for k, d in enumerate(data) if len(d) > i]
+                if not idx: return None
+                return np.array([data[k][i] for k in idx], dtype=float), times_all[idx]
+            elif label.startswith(("le16", "be16")):  # also covers le16s/be16s
+                signed = label.startswith(("le16s", "be16s"))
+                i, j = map(int, label.split("[")[1].split("]")[0].split("-"))
+                idx = [k for k, d in enumerate(data) if len(d) > j]
+                if not idx: return None
+                a = [data[k][i] for k in idx]
+                b = [data[k][j] for k in idx]
+                if label.startswith("le16"):
+                    v = [le16(x, y) for x, y in zip(a, b)]
+                else:
+                    v = [be16(x, y) for x, y in zip(a, b)]
+                if signed:
+                    v = [s16(int(x)) for x in v]
+                return np.array(v, dtype=float), times_all[idx]
+            return None
+
+        for _, row in df_res.head(max(1, args.plots_top)).iterrows():
+            rec = reconstruct_raw(row["label"])
+            if rec is None:
+                continue
+            vals, tt = rec
+            out_png = outdir / f"{trace.stem}_{row['label'].replace('[','_').replace(']','')}_raw.png"
+            plot_timeseries(tt[:len(vals)], vals, out_png,
+                            f"{trace.name} – {row['label']} (raw)", "raw")
+
+        # report
+        hdr = {
+            "trace_name": trace.name,
+            "mode": "unsupervised",
+            "min_smooth": args.min_smooth,
+            "max_slope_frac_raw": args.max_slope_frac_raw,
+        }
+        top_view = df_res.head(12)[
+            ["label", "smoothness", "span_raw", "raw_var",
+             "rate_hz_est", "period_std_ms", "slope_frac_raw", "uniq_ratio"]
+        ]
+        md_path = outdir / f"{trace.stem}_report.md"
+        json_path = outdir / f"{trace.stem}_report.json"
+        write_report_md(md_path, hdr, top_view, pd.DataFrame(), "unsupervised",
+                        {"candidates_csv": str(csv_path)})
+        with open(json_path, "w", encoding="utf-8") as f:
+            json.dump({
+                "header": hdr,
+                "accepted": df_res.to_dict(orient="records"),
+            }, f, ensure_ascii=False, indent=2)
+        print(f"Report: {md_path}")
+        print(f"Report JSON: {json_path}")


 if __name__ == "__main__":
-    main()
\ No newline at end of file
+    main()