From 27993d72ee0ed36f7f3f48dc82e7806d55a686ec Mon Sep 17 00:00:00 2001
From: Marcel Peterkau
Date: Wed, 27 Aug 2025 23:28:43 +0200
Subject: [PATCH] Added Reverse-Engineering Toolkit

---
 Reverse-Engineering CAN-Bus/.gitignore        |  45 +
 Reverse-Engineering CAN-Bus/README.md         | 272 ++++++
 .../can_split_by_id.py                        |  95 ++
 .../can_universal_signal_finder.py            | 272 ++++++
 .../id_signal_explorer.py                     | 142 +++
 Reverse-Engineering CAN-Bus/main.py           | 858 ++++++++++++++++++
 Reverse-Engineering CAN-Bus/models/.gitignore |  18 +
 .../Notes.txt                                 |  12 +
 .../Triumph Speed Twin 1200 RS (2025).json    |  13 +
 Reverse-Engineering CAN-Bus/requirements.txt  |   3 +
 Reverse-Engineering CAN-Bus/start.sh          |  18 +
 .../trace_batch_analyzer.py                   | 186 ++++
 .../trace_signal_fitter.py                    | 315 +++++++
 13 files changed, 2249 insertions(+)
 create mode 100644 Reverse-Engineering CAN-Bus/.gitignore
 create mode 100644 Reverse-Engineering CAN-Bus/README.md
 create mode 100644 Reverse-Engineering CAN-Bus/can_split_by_id.py
 create mode 100644 Reverse-Engineering CAN-Bus/can_universal_signal_finder.py
 create mode 100644 Reverse-Engineering CAN-Bus/id_signal_explorer.py
 create mode 100644 Reverse-Engineering CAN-Bus/main.py
 create mode 100644 Reverse-Engineering CAN-Bus/models/.gitignore
 create mode 100644 Reverse-Engineering CAN-Bus/models/Triumph Speed Twin 1200 RS (2025)/Notes.txt
 create mode 100644 Reverse-Engineering CAN-Bus/models/Triumph Speed Twin 1200 RS (2025)/Triumph Speed Twin 1200 RS (2025).json
 create mode 100644 Reverse-Engineering CAN-Bus/requirements.txt
 create mode 100755 Reverse-Engineering CAN-Bus/start.sh
 create mode 100644 Reverse-Engineering CAN-Bus/trace_batch_analyzer.py
 create mode 100644 Reverse-Engineering CAN-Bus/trace_signal_fitter.py

diff --git a/Reverse-Engineering CAN-Bus/.gitignore b/Reverse-Engineering CAN-Bus/.gitignore
new file mode 100644
index 0000000..629823c
--- /dev/null
+++ b/Reverse-Engineering CAN-Bus/.gitignore
@@ -0,0 +1,45 @@
+# Python
+__pycache__/
+*.py[cod]
+*.pyo
+*.pyd
+*.pkl
+*.pklz
+*.egg-info/
+*.egg
+*.manifest
+*.spec
+
+# Build
+build/
+dist/
+.eggs/
+
+# Logs (keep the folder, ignore its files)
+logs/*
+!logs/.gitkeep
+*.log
+
+# Virtual environments
+venv/
+.env/
+.venv/
+
+# System files
+.DS_Store
+Thumbs.db
+
+# Backup files
+*.bak
+*.tmp
+*.swp
+*.swo
+
+# Editor/IDE
+.vscode/
+.idea/
+
+# Project-specific
+settings.json
+settings.json.bak
+tmp/
diff --git a/Reverse-Engineering CAN-Bus/README.md b/Reverse-Engineering CAN-Bus/README.md
new file mode 100644
index 0000000..65def26
--- /dev/null
+++ b/Reverse-Engineering CAN-Bus/README.md
@@ -0,0 +1,272 @@
+# Kettenöler – CAN Reverse-Engineering Toolkit
+
+Tool suite (GUI + CLI) for analyzing CAN logs in the **Kettenöler format**.
+Capabilities: **split** logs (one file per CAN ID), **exploratory visualization** (8-/16-bit, LE/BE), **batch analysis** across many `.trace` files, **ranking** of plausible signals, and **range fit** (linear mapping `phys = raw*scale + offset`), optionally **unsupervised** without a given range.
+
+---
+
+## Features (overview)
+
+* **Unified GUI** (`main.py`) with a global header (workdir, folder structure, log selection).
+* **Shared trace selection** in all trace tabs (same panel, synchronized across tabs):
+
+  * **ID Explorer** (multi-select)
+  * **Traces Batch-Analyse** (multi-select or a whole folder)
+  * **Range-Fit** (single-select, supervised *or* unsupervised)
+* **Splitter**: logs → one `.trace` per CAN ID (`traces/…`, incl. `overview_ids.csv`).
+* **Single-ID explorer**: plots of all byte channels (8-bit) and adjacent word combinations (16-bit LE/BE) plus short statistics.
+* **Batch analyzer**: metrics/plots for every `.trace` in a folder, global ranking.
+* **Range/unsupervised fit**:
+
+  * *Supervised*: finds `scale` & `offset` for a target range `[rmin, rmax]` (offset via interval coverage, scale from a set of plausible values).
+  * *Unsupervised*: identifies "calm" physical candidates without a range (smoothness/variance/rate/span).
+* **Output hygiene**: results always land under `analyze_out/<timestamp>_<label>/…`; optional timestamp subfolders prevent overwriting.
+* **Project file** (`Projekt.json`): stores workdir, subfolders, log selection, active traces folder, etc.
+* **"Neuester Split" button**: jumps to the most recent subfolder of `traces/`.
+
+---
+
+## Repository components
+
+* **GUI**
+
+  * `main.py` – central frontend with tabs (Multi-Log Analyse, ID Explorer, Traces Batch-Analyse, Range-Fit).
+* **CLI tools**
+
+  * `can_split_by_id.py` – splits logs by CAN ID → `.trace`.
+  * `id_signal_explorer.py` – visualizes/analyzes one `.trace` (8-bit, 16-bit LE/BE) + `summary_stats.csv`.
+  * `trace_batch_analyzer.py` – batch analysis over many `.trace` files + global ranking.
+  * `trace_signal_fitter.py` – **range fit** (scale/offset) **or** **unsupervised fit** (no range).
+
+> Optional/legacy: `can_universal_signal_finder.py` – the original multi-log analyzer (ranking at the raw-data level).
+
+---
+
+## Installation
+
+* **Python** ≥ 3.10
+* Dependencies: `pandas`, `numpy`, `matplotlib`
+* Setup:
+
+  ```bash
+  python3 -m venv .venv
+  source .venv/bin/activate  # Windows: .venv\Scripts\activate
+  pip install -r requirements.txt
+  ```
+
+---
+
+## Log format (Kettenöler)
+
+One line per frame:
+
+```
+<timestamp_ms> <RX|TX> 0x<ID> <DLC> <D0> <D1> … <Dn>
+```
+
+Example:
+
+```
+123456 RX 0x208 8 11 22 33 44 55 66 77 88
+```
+
+---
+
+## Project/folder structure
+
+A **workdir** bundles everything belonging to one vehicle/project:
+
+```
+<workdir>/
+  Projekt.json   # GUI settings
+  logs/          # input logs
+  traces/        # per-ID .trace files (from the split)
+  analyze_out/   # results; each run gets its own timestamp subfolder
+```
+
+**Naming conventions**
+
+* Split results: `traces/<timestamp>/0x<ID>_<logname>.trace`
+* Outputs: `analyze_out/<timestamp>_<label>/…`
+
+---
+
+## Models folder & Git
+
+If you work per model, e.g.:
+
+```
+models/
+  Triumph 2023/
+    logs/
+    traces/
+    analyze_out/
+    Projekt.json
+```
+
+place the following **`.gitignore`** in `models/` so that `traces/` and `analyze_out/` are ignored **in every model subfolder** – `logs/` and the `.json` files stay under version control:
+
+```gitignore
+*/traces/
+*/traces/**
+*/analyze_out/
+*/analyze_out/**
+
+traces/
+traces/**
+analyze_out/
+analyze_out/**
+
+# optional: typos
+*/analyze.out/
+*/analyze.out/**
+analyze.out/
+analyze.out/**
+```
+
+Seed empty folders such as `logs/` with a `.gitkeep` if needed.
+
+---
+
+## Using the GUI
+
+```bash
+python3 main.py
+```
+
+### Global header (always on top)
+
+* Pick the **workdir**, then **Logs scannen** → list of all discovered log files (multi-select).
+* Configure the subfolders: `logs`, `traces`, `analyze_out` (all **side by side** in the workdir).
+* **Save/load the project** (`Projekt.json`).
+* On workdir change/project load, the GUI automatically sets the **active traces folder** to `traces/` or its **most recent** subfolder.
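+
+"Most recent" is decided by modification time; a minimal sketch of the selection logic, mirroring `latest_subdir()` from `main.py`:
+
+```python
+from pathlib import Path
+
+def latest_subdir(base: Path) -> Path:
+    """Most recent subfolder of base, else base itself."""
+    if not base.exists():
+        return base
+    subs = [p for p in base.iterdir() if p.is_dir()]
+    return max(subs, key=lambda p: p.stat().st_mtime) if subs else base
+```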
+
+### Unified trace panel (in all trace tabs)
+
+* Left: list of `.trace` files
+* Right: **Traces-Ordner wählen**, **Workdir/traces**, **Neuester Split**, **Refresh**, (optionally **Alle**, **Keine**)
+* Changes to the folder/list take effect **immediately in all tabs**.
+
+### Tab: Multi-Log Analyse
+
+* Ranking straight from the logs (include/exclude IDs, optional range with `scale/offset`).
+* Output: `analyze_out/<timestamp>_multilog/…`
+* Optional: "run each log file separately" → one subfolder per log.
+
+### Tab: ID Explorer
+
+* **Split** (from the header's log selection): logs → `.trace` into `traces[/<timestamp>]`, plus `overview_ids.csv`.
+  Afterwards the new traces path is **activated automatically**.
+* **Single-ID analysis** (multi-select):
+
+  * Plots: byte[0..7] (8-bit) + LE/BE for the pairs (0-1 … 6-7)
+  * `summary_stats.csv` per trace
+  * Output: `analyze_out/<timestamp>_id_explore/…`
+
+### Tab: Traces Batch-Analyse
+
+* Uses the shared trace list.
+* **No selection** → the whole folder; **with a selection** → a subset folder is built (hardlinks/copies) and only that is analyzed.
+* Parameters: `--rx-only`, `scale`, `offset`, `range-min/max`, `top`, `--plots`.
+* Output:
+
+  * per trace: `*_combostats.csv` (+ plots),
+  * global: `summary_top_combinations.csv`
+  * under `analyze_out/<timestamp>_trace_batch/…`
+
+### Tab: Range-Fit (single-select)
+
+* **Two modes**:
+
+  1. **Supervised** (range min/max set): finds `scale` & `offset`, maximizing the **hit ratio** inside the target range.
+     Output: `<trace>_encoding_candidates.csv` + phys plots (top N).
+  2. **Unsupervised** (range empty): scores candidates by **smoothness**, **span**, **variance**, **rate**, **uniqueness**.
+     Output: `<trace>_unsupervised_candidates.csv` + raw plots (top N).
+* Options: `nur RX`, `negative Scale erlauben` (supervised only), `Min. Hit-Ratio`, `Min. Smoothness`, `Plots Top-N`, `Output-Label`.
+* Output: `analyze_out/<timestamp>_rangefit/…`
+
+---
+
+## CLI quickstart
+
+### 1) Split
+
+```bash
+python3 can_split_by_id.py logs/run1.log logs/run2.log \
+  --outdir <workdir>/traces/20250827_1200 \
+  --rx-only
+```
+
+### 2) Single-ID explorer
+
+```bash
+python3 id_signal_explorer.py <workdir>/traces/20250827_1200/0x208_run1.trace \
+  --outdir <workdir>/analyze_out/20250827_1210_id_explore
+```
+
+### 3) Batch analysis
+
+```bash
+python3 trace_batch_analyzer.py \
+  --traces-dir <workdir>/traces/20250827_1200 \
+  --outdir <workdir>/analyze_out/20250827_1220_trace_batch \
+  --rx-only --plots --top 8 \
+  --range-min 31 --range-max 80
+```
+
+### 4) Range/unsupervised fit (one `.trace`)
+
+```bash
+# supervised (e.g. coolant 31..80 °C)
+python3 trace_signal_fitter.py <trace> \
+  --rmin 31 --rmax 80 \
+  --outdir <workdir>/analyze_out/20250827_1230_rangefit \
+  --plots-top 8 --min-hit 0.5 --allow-neg-scale
+
+# unsupervised (no range)
+python3 trace_signal_fitter.py <trace> \
+  --outdir <workdir>/analyze_out/20250827_1240_unsupervised \
+  --plots-top 8 --min-smooth 0.2
+```
+
+---
+
+## Algorithms & heuristics
+
+* **Combinations**:
+
+  * 8-bit: `D0..D7`
+  * 16-bit (adjacent): LE & BE for the pairs `(0,1)…(6,7)`
+    *(32-bit & bit-packed fields: on the roadmap)*
+
+* **Prefilter** (for "calm" physical quantities):
+  minimum sample count, not (nearly) constant, no excessive jumps (p95 of |Δ| relative to the span).
+
+* **Range fit**:
+  For each candidate `raw`, a set of plausible **scales** is searched; for each `scale`, the **offset** is determined via **interval coverage** (`rmin ≤ scale*raw_i + offset ≤ rmax`). Ranking: hit ratio ↓, then smoothness (p95 phys) ↑, rate ↓, n ↓. A minimal sketch of the coverage step follows.
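+
+  Illustrative only: `best_offset` is a hypothetical name, and the actual implementation in `trace_signal_fitter.py` may differ. Each sample constrains the offset to an interval; a sweep over the interval endpoints finds the offset covered by the most samples:
+
+  ```python
+  import numpy as np
+
+  def best_offset(raw: np.ndarray, scale: float, rmin: float, rmax: float):
+      # Sample i admits offsets in [rmin - scale*raw_i, rmax - scale*raw_i].
+      events = [(rmin - scale * r, +1) for r in raw] + \
+               [(rmax - scale * r, -1) for r in raw]
+      events.sort(key=lambda e: (e[0], -e[1]))  # starts before ends on ties
+      best_cover, cover, best_x = 0, 0, 0.0
+      for x, delta in events:
+          cover += delta
+          if cover > best_cover:
+              best_cover, best_x = cover, x
+      return best_x, best_cover / len(raw)  # offset and its hit ratio
+  ```
+
+  Repeating this over the candidate scales yields the (scale, offset) pairs that enter the ranking above.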
+
+* **Unsupervised**:
+  **Smoothness** = `1 − clamp(p95(|Δ|)/span, 0..1)`; additionally **span**, **var**, **rate**, **uniq_ratio**. Ranking is over these metrics.
+
+---
+
+## Tips & troubleshooting
+
+* **No candidates (range fit)**: lower `--min-hit`, try `--allow-neg-scale`, double-check the range, use a longer/more varied log.
+* **Everything gets filtered (unsupervised)**: lower `--min-smooth`; enable `--rx-only` if needed.
+* **Empty/odd plots**: DLC < 8 → some 16-bit combinations are missing; very rare frames → low rate.
+* **Keep folders tidy**: leave timestamp subfolders enabled; every run gets its own artifacts.
+
+---
+
+## Roadmap
+
+* 32-bit combinations, bit-packed fields.
+* Histograms, autocorrelation, outlier detectors.
+* Predefined signal profiles (e.g. *WheelSpeed*, *CoolantTemp*).
+
+---
+
+## License / liability
+
+For analysis/reverse-engineering purposes only. Use at your own risk.
diff --git a/Reverse-Engineering CAN-Bus/can_split_by_id.py b/Reverse-Engineering CAN-Bus/can_split_by_id.py
new file mode 100644
index 0000000..ec8969a
--- /dev/null
+++ b/Reverse-Engineering CAN-Bus/can_split_by_id.py
@@ -0,0 +1,95 @@
+#!/usr/bin/env python3
+import re
+import sys
+import argparse
+from pathlib import Path
+from collections import defaultdict
+
+LOG_PATTERN = re.compile(r"(\d+)\s+(TX|RX)\s+0x([0-9A-Fa-f]+)\s+(\d+)\s+((?:[0-9A-Fa-f]{2}\s+)+)")
+
+def main():
+    ap = argparse.ArgumentParser(description="Split Kettenöler CAN log(s) into per-ID .trace files and build an overview")
+    ap.add_argument("logs", nargs="+", help="Input log file(s)")
+    ap.add_argument("--outdir", default="traces", help="Output directory for per-ID trace files")
+    ap.add_argument("--rx-only", action="store_true", help="Keep only RX frames in traces and stats")
+    args = ap.parse_args()
+
+    outdir = Path(args.outdir)
+    outdir.mkdir(parents=True, exist_ok=True)
+
+    writers = {}
+    stats = defaultdict(lambda: {
+        "id_hex": None, "rx": 0, "tx": 0, "count": 0, "first_ts": None, "last_ts": None,
+        "first_file": None, "dlc_set": set()
+    })
+
+    def get_writer(can_id_hex: str, src_name: str):
+        # filename pattern: 0x<ID>_<logname>.trace
+        safe_src = Path(src_name).name
+        fn = outdir / f"{can_id_hex}_{safe_src}.trace"
+        if fn not in writers:
+            writers[fn] = fn.open("a", encoding="utf-8")
+        return writers[fn]
+
+    total = 0
+    written = 0
+    for p in args.logs:
+        with open(p, "r", errors="ignore") as f:
+            for line in f:
+                m = LOG_PATTERN.match(line)
+                if not m:
+                    continue
+                ts = int(m.group(1))
+                dr = m.group(2)
+                cid_hex = m.group(3).upper()
+                dlc = int(m.group(4))
+                data = m.group(5)
+
+                total += 1
+                if args.rx_only and dr != "RX":
+                    continue
+
+                key = int(cid_hex, 16)
+                s = stats[key]
+                s["id_hex"] = f"0x{cid_hex}"
+                s["count"] += 1
+                s["rx"] += 1 if dr == "RX" else 0
+                s["tx"] += 1 if dr == "TX" else 0
+                s["first_ts"] = ts if s["first_ts"] is None else min(s["first_ts"], ts)
+                s["last_ts"] = ts if s["last_ts"] is None else max(s["last_ts"], ts)
+                s["first_file"] = s["first_file"] or Path(p).name
+                s["dlc_set"].add(dlc)
+
+                w = get_writer(f"0x{cid_hex}", Path(p).name)
+                w.write(line)
+                written += 1
+
+    for fh in writers.values():
+        fh.close()
+
+    # build overview CSV
+    import pandas as pd
+    rows = []
+    for cid, s in stats.items():
+        dur_ms = 0 if s["first_ts"] is None else (s["last_ts"] - s["first_ts"])
+        rate_hz = (s["rx"] if args.rx_only else s["count"]) / (dur_ms/1000.0) if dur_ms > 0 else 0.0
+        rows.append({
+            "id_dec": cid,
+            "id_hex": s["id_hex"],
+            "count": s["count"],
+            "rx": s["rx"],
+            "tx": s["tx"],
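+            # duration_s / rate_hz_est below come from the first/last timestamp (ms)
+            # of this ID; with --rx-only the rate estimate counts RX frames only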
"duration_s": round(dur_ms/1000.0, 6), + "rate_hz_est": round(rate_hz, 6), + "first_file": s["first_file"], + "dlc_variants": ",".join(sorted(str(x) for x in s["dlc_set"])), + }) + df = pd.DataFrame(rows).sort_values(["rate_hz_est","count"], ascending=[False, False]) + csv_path = outdir / "overview_ids.csv" + df.to_csv(csv_path, index=False) + + print(f"Done. Parsed {total} lines, wrote {written} lines into per-ID traces at {outdir}.") + print(f"Overview: {csv_path}") + +if __name__ == "__main__": + main() diff --git a/Reverse-Engineering CAN-Bus/can_universal_signal_finder.py b/Reverse-Engineering CAN-Bus/can_universal_signal_finder.py new file mode 100644 index 0000000..f3100bd --- /dev/null +++ b/Reverse-Engineering CAN-Bus/can_universal_signal_finder.py @@ -0,0 +1,272 @@ + +#!/usr/bin/env python3 +import re +import sys +import argparse +from pathlib import Path +from typing import List, Tuple, Optional, Dict +import pandas as pd +import numpy as np +import matplotlib.pyplot as plt + +LOG_PATTERN = re.compile(r"(\d+)\s+(TX|RX)\s+0x([0-9A-Fa-f]+)\s+\d+\s+((?:[0-9A-Fa-f]{2}\s+)+)") + +def parse_log(path: Path) -> pd.DataFrame: + rows = [] + with open(path, "r", errors="ignore") as f: + for line in f: + m = LOG_PATTERN.match(line) + if not m: + continue + ts = int(m.group(1)) + direction = m.group(2) + can_id = int(m.group(3), 16) + data = [int(x, 16) for x in m.group(4).split() if x.strip()] + rows.append((path.name, ts, direction, can_id, data)) + df = pd.DataFrame(rows, columns=["file","ts","dir","id","data"]) + if df.empty: + return df + # time base per file → seconds from file start + df["time_s"] = df.groupby("file")["ts"].transform(lambda s: (s - s.min())/1000.0) + return df + +def le16(data: List[int], offset: int) -> Optional[int]: + if len(data) < offset+2: + return None + return data[offset] | (data[offset+1] << 8) + +def be16(data: List[int], offset: int) -> Optional[int]: + if len(data) < offset+2: + return None + return (data[offset] << 8) | data[offset+1] + +def phys(val: float, scale: float, offs: float) -> float: + return val*scale + offs + +def decode_series(arr_data: List[List[int]], endian: str, offset: int) -> List[Optional[int]]: + out = [] + for d in arr_data: + v = le16(d, offset) if endian == "le" else be16(d, offset) + out.append(v) + return out + +def score_values(vals: np.ndarray) -> Dict[str, float]: + if len(vals) < 3: + return {"variance":0.0, "changes":0, "unique_ratio":0.0} + var = float(np.var(vals)) + changes = int(np.count_nonzero(np.diff(vals))) + unique_ratio = len(set(vals.tolist()))/len(vals) + return {"variance":var, "changes":changes, "unique_ratio":unique_ratio} + +def analyze(df: pd.DataFrame, include_ids: Optional[List[int]], exclude_ids: Optional[List[int]]): + # Group by ID and try each 16-bit word + combos = [] + ids = sorted(df["id"].unique().tolist()) + if include_ids: + ids = [i for i in ids if i in include_ids] + if exclude_ids: + ids = [i for i in ids if i not in exclude_ids] + + for cid in ids: + grp = df[df["id"]==cid] + for endian in ("le","be"): + for off in (0,2,4,6): + dec = decode_series(grp["data"].tolist(), endian, off) + # filter Nones + pairs = [(t, v) for t, v in zip(grp["time_s"].tolist(), dec) if v is not None] + if len(pairs) < 4: + continue + times = np.array([p[0] for p in pairs], dtype=float) + vals = np.array([p[1] for p in pairs], dtype=float) + sc = score_values(vals) + combos.append({ + "id": cid, + "endian": endian, + "offset": off, + "n": len(vals), + "variance": sc["variance"], + "changes": sc["changes"], + 
"unique_ratio": sc["unique_ratio"], + "rate_hz": float(len(vals)) / (times.max()-times.min()+1e-9) + }) + cand_df = pd.DataFrame(combos) + return cand_df + +def range_filter_stats(vals: np.ndarray, scale: float, offs: float, rmin: Optional[float], rmax: Optional[float]) -> Dict[str, float]: + if vals.size == 0: + return {"hit_ratio":0.0, "min_phys":np.nan, "max_phys":np.nan} + phys_vals = vals*scale + offs + if rmin is None and rmax is None: + return {"hit_ratio":1.0, "min_phys":float(np.min(phys_vals)), "max_phys":float(np.max(phys_vals))} + mask = np.ones_like(phys_vals, dtype=bool) + if rmin is not None: + mask &= (phys_vals >= rmin) + if rmax is not None: + mask &= (phys_vals <= rmax) + hit_ratio = float(np.count_nonzero(mask))/len(phys_vals) + return {"hit_ratio":hit_ratio, "min_phys":float(np.min(phys_vals)), "max_phys":float(np.max(phys_vals))} + +def export_candidate_timeseries(df: pd.DataFrame, cid: int, endian: str, off: int, scale: float, offs: float, outdir: Path, basename_hint: str): + sub = df[df["id"]==cid].copy() + if sub.empty: + return False, None + dec = decode_series(sub["data"].tolist(), endian, off) + sub["raw16"] = dec + sub = sub.dropna(subset=["raw16"]).copy() + if sub.empty: + return False, None + + sub["phys"] = sub["raw16"].astype(float)*scale + offs + # Save CSV + csv_path = outdir / f"{basename_hint}_0x{cid:X}_{endian}_off{off}.csv" + sub[["file","time_s","id","raw16","phys"]].to_csv(csv_path, index=False) + + # Plot (single-plot image) + plt.figure(figsize=(10,5)) + plt.plot(sub["time_s"].to_numpy(), sub["phys"].to_numpy(), marker="o") + plt.xlabel("Zeit (s)") + plt.ylabel("Wert (phys)") + plt.title(f"{basename_hint} 0x{cid:X} ({endian} @ +{off})") + plt.grid(True) + plt.tight_layout() + img_path = outdir / f"{basename_hint}_0x{cid:X}_{endian}_off{off}.png" + plt.savefig(img_path, dpi=150) + plt.close() + return True, (csv_path, img_path) + +def main(): + ap = argparse.ArgumentParser(description="Universal CAN signal finder (WheelSpeed etc.) for Kettenöler logs") + ap.add_argument("logs", nargs="+", help="Log-Dateien (gleiche Struktur wie Kettenöler)") + ap.add_argument("--outdir", default="analyze_out", help="Ausgabeverzeichnis") + ap.add_argument("--top", type=int, default=20, help="Top-N Kandidaten global (nach Variance) exportieren, falls Range-Filter nichts findet") + ap.add_argument("--include-ids", default="", help="Nur diese IDs (kommagetrennt, z.B. 0x208,0x209)") + ap.add_argument("--exclude-ids", default="", help="Diese IDs ausschließen (kommagetrennt)") + ap.add_argument("--scale", type=float, default=1.0, help="Skalierung: phys = raw*scale + offset") + ap.add_argument("--offset", type=float, default=0.0, help="Offset: phys = raw*scale + offset") + ap.add_argument("--range-min", type=float, default=None, help="Min physischer Zielbereich (nach Scale/Offset)") + ap.add_argument("--range-max", type=float, default=None, help="Max physischer Zielbereich (nach Scale/Offset)") + ap.add_argument("--range-hit-ratio", type=float, default=0.6, help="Mindestanteil der Werte im Zielbereich [0..1]") + ap.add_argument("--per-id-limit", type=int, default=2, help="Max Anzahl Dekodierungen pro ID (z.B. 
beste zwei Offsets/Endianness)") + + args = ap.parse_args() + + # Parse include/exclude lists + def parse_ids(s: str): + if not s.strip(): + return None + out = [] + for tok in s.split(","): + tok = tok.strip() + if not tok: + continue + if tok.lower().startswith("0x"): + out.append(int(tok,16)) + else: + out.append(int(tok)) + return out + + include_ids = parse_ids(args.include_ids) + exclude_ids = parse_ids(args.exclude_ids) + + # Load logs + frames = [] + for p in args.logs: + df = parse_log(Path(p)) + if df.empty: + print(f"Warn: {p} ergab keine Daten oder passte nicht zum Muster.", file=sys.stderr) + else: + frames.append(df) + if not frames: + print("Keine Daten.", file=sys.stderr) + sys.exit(2) + + df_all = pd.concat(frames, ignore_index=True) + outdir = Path(args.outdir) + outdir.mkdir(parents=True, exist_ok=True) + + # Analyze all combos + cand = analyze(df_all, include_ids, exclude_ids) + if cand.empty: + print("Keine dekodierbaren 16-bit Felder gefunden.", file=sys.stderr) + sys.exit(3) + + # Range filter pass + cand = cand.sort_values(["variance","changes","unique_ratio"], ascending=[False, False, False]).reset_index(drop=True) + + # For each candidate row, compute range-hit stats + hits = [] + for _, row in cand.iterrows(): + cid = int(row["id"]) + endian = row["endian"] + off = int(row["offset"]) + + sub = df_all[df_all["id"]==cid] + dec = decode_series(sub["data"].tolist(), endian, off) + vals = np.array([v for v in dec if v is not None], dtype=float) + if vals.size == 0: + continue + rng = range_filter_stats(vals, args.scale, args.offset, args.range_min, args.range_max) + hits.append((rng["hit_ratio"], rng["min_phys"], rng["max_phys"])) + if hits: + cand[["hit_ratio","min_phys","max_phys"]] = pd.DataFrame(hits, index=cand.index) + else: + cand["hit_ratio"] = 0.0 + cand["min_phys"] = np.nan + cand["max_phys"] = np.nan + + # Export global candidate table + cand_out = outdir / "candidates_global.csv" + cand.to_csv(cand_out, index=False) + print(f"Globales Kandidaten-CSV: {cand_out}") + + # Decide which candidates to export as timeseries + selected = [] + if args.range_min is not None or args.range_max is not None: + # choose those meeting ratio threshold; group by ID and take best few per ID + ok = cand[cand["hit_ratio"] >= args.range_hit_ratio].copy() + if ok.empty: + print("Range-Filter hat keine Kandidaten gefunden; falle zurück auf Top-N nach Varianz.", file=sys.stderr) + else: + # per ID, take best by hit_ratio then variance + for cid, grp in ok.groupby("id"): + grp = grp.sort_values(["hit_ratio","variance","changes","unique_ratio"], ascending=[False, False, False, False]) + selected.extend(grp.head(args.per_id_limit).to_dict("records")) + if not selected: + # fallback → global top-N by variance (limit per ID) + per_id_count = {} + for _, row in cand.iterrows(): + cid = int(row["id"]); per_id_count.setdefault(cid,0) + if len(selected) >= args.top: + break + if per_id_count[cid] >= args.per_id_limit: + continue + selected.append(row.to_dict()) + per_id_count[cid] += 1 + + # Export per-candidate CSVs and plots + exp_index = [] + base_hint = "decoded" + for row in selected: + cid = int(row["id"]) + endian = row["endian"] + off = int(row["offset"]) + ok, pair = export_candidate_timeseries(df_all, cid, endian, off, args.scale, args.offset, outdir, base_hint) + if ok and pair: + exp_index.append({ + "id": cid, + "endian": endian, + "offset": off, + "csv": str(pair[0]), + "plot": str(pair[1]) + }) + + idx_df = pd.DataFrame(exp_index) + idx_path = outdir / "exports_index.csv" + 
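+    # persist an index of all exported CSV/plot pairs next to the artifacts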
idx_df.to_csv(idx_path, index=False) + print(f"Export-Index: {idx_path}") + + print("Fertig. Tipp: Mit --range-min/--range-max und --scale/--offset kannst du auf plausible physikalische Bereiche filtern.") + print("Beispiel: --scale 0.01 --range-min 0 --range-max 250 (wenn raw≈cm/s → km/h)") + +if __name__ == "__main__": + main() diff --git a/Reverse-Engineering CAN-Bus/id_signal_explorer.py b/Reverse-Engineering CAN-Bus/id_signal_explorer.py new file mode 100644 index 0000000..54eb7c3 --- /dev/null +++ b/Reverse-Engineering CAN-Bus/id_signal_explorer.py @@ -0,0 +1,142 @@ +#!/usr/bin/env python3 +import re +import sys +import argparse +from pathlib import Path +import pandas as pd +import numpy as np +import matplotlib.pyplot as plt + +LOG_PATTERN = re.compile(r"(\d+)\s+(TX|RX)\s+0x([0-9A-Fa-f]+)\s+\d+\s+((?:[0-9A-Fa-f]{2}\s+)+)") + +def parse_trace(path: Path) -> pd.DataFrame: + rows = [] + with open(path, "r", errors="ignore") as f: + for line in f: + m = LOG_PATTERN.match(line) + if not m: + continue + ts = int(m.group(1)) + direction = m.group(2) + can_id = int(m.group(3), 16) + data = [int(x, 16) for x in m.group(4).split() if x.strip()] + rows.append((ts, direction, can_id, data)) + df = pd.DataFrame(rows, columns=["ts","dir","id","data"]) + if df.empty: + return df + df["time_s"] = (df["ts"] - df["ts"].min())/1000.0 + return df + +def be16(b): + return (b[0]<<8) | b[1] + +def le16(b): + return b[0] | (b[1]<<8) + +def main(): + ap = argparse.ArgumentParser(description="Per-ID explorer: generate plots for 8-bit and 16-bit combinations") + ap.add_argument("trace", help="Single-ID .trace file (from can_split_by_id.py)") + ap.add_argument("--outdir", default=None, help="Output directory; default: _explore") + ap.add_argument("--prefix", default="viz", help="File prefix for exports") + ap.add_argument("--rx-only", action="store_true", help="Use only RX frames") + args = ap.parse_args() + + trace = Path(args.trace) + df = parse_trace(trace) + if df.empty: + print("No data in trace.", file=sys.stderr) + sys.exit(1) + + if args.rx_only: + df = df[df["dir"]=="RX"].copy() + if df.empty: + print("No RX frames.", file=sys.stderr) + sys.exit(2) + + outdir = Path(args.outdir) if args.outdir else trace.with_suffix("").parent / (trace.stem + "_explore") + outdir.mkdir(parents=True, exist_ok=True) + + # --- 8-bit channels --- + for idx in range(8): + vals = [d[idx] if len(d)>idx else None for d in df["data"].tolist()] + times = [t for t, v in zip(df["time_s"].tolist(), vals) if v is not None] + series = [v for v in vals if v is not None] + if not series: + continue + plt.figure(figsize=(10,4)) + plt.plot(times, series, marker=".", linestyle="-") + plt.xlabel("Zeit (s)") + plt.ylabel(f"Byte[{idx}] (8-bit)") + plt.title(f"{trace.name} – 8-bit Byte {idx}") + plt.grid(True) + fn = outdir / f"{args.prefix}_byte{idx}.png" + plt.tight_layout() + plt.savefig(fn, dpi=150) + plt.close() + + # --- 16-bit combos --- + pairs = [(i,i+1) for i in range(7)] + # LE + for i,j in pairs: + times, series = [], [] + for t, d in zip(df["time_s"].tolist(), df["data"].tolist()): + if len(d) > j: + series.append(le16([d[i], d[j]])); times.append(t) + if not series: + continue + plt.figure(figsize=(10,4)) + plt.plot(times, series, marker=".", linestyle="-") + plt.xlabel("Zeit (s)") + plt.ylabel(f"LE16 @{i}-{j}") + plt.title(f"{trace.name} – LE16 Bytes {i}-{j}") + plt.grid(True) + fn = outdir / f"{args.prefix}_le16_{i}-{j}.png" + plt.tight_layout() + plt.savefig(fn, dpi=150) + plt.close() + + # BE + for i,j in pairs: + times, 
series = [], [] + for t, d in zip(df["time_s"].tolist(), df["data"].tolist()): + if len(d) > j: + series.append(be16([d[i], d[j]])); times.append(t) + if not series: + continue + plt.figure(figsize=(10,4)) + plt.plot(times, series, marker=".", linestyle="-") + plt.xlabel("Zeit (s)") + plt.ylabel(f"BE16 @{i}-{j}") + plt.title(f"{trace.name} – BE16 Bytes {i}-{j}") + plt.grid(True) + fn = outdir / f"{args.prefix}_be16_{i}-{j}.png" + plt.tight_layout() + plt.savefig(fn, dpi=150) + plt.close() + + # Summary stats + stats = [] + # 8-bit stats + for idx in range(8): + vals = [d[idx] if len(d)>idx else None for d in df["data"].tolist()] + vals = [v for v in vals if v is not None] + if not vals: + continue + arr = np.array(vals, dtype=float) + stats.append({"type":"byte8", "slot":idx, "min":float(arr.min()), "max":float(arr.max()), "var":float(arr.var())}) + # 16-bit stats + for i,j in pairs: + vals = [le16([d[i],d[j]]) for d in df["data"].tolist() if len(d)>j] + if vals: + arr = np.array(vals, dtype=float) + stats.append({"type":"le16", "slot":f"{i}-{j}", "min":float(arr.min()), "max":float(arr.max()), "var":float(arr.var())}) + vals = [be16([d[i],d[j]]) for d in df["data"].tolist() if len(d)>j] + if vals: + arr = np.array(vals, dtype=float) + stats.append({"type":"be16", "slot":f"{i}-{j}", "min":float(arr.min()), "max":float(arr.max()), "var":float(arr.var())}) + + pd.DataFrame(stats).to_csv(outdir / "summary_stats.csv", index=False) + print(f"Exported 8-bit & 16-bit plots and summary_stats.csv to {outdir}") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/Reverse-Engineering CAN-Bus/main.py b/Reverse-Engineering CAN-Bus/main.py new file mode 100644 index 0000000..1a07227 --- /dev/null +++ b/Reverse-Engineering CAN-Bus/main.py @@ -0,0 +1,858 @@ +#!/usr/bin/env python3 +import json +import os +import sys +import threading +import subprocess +import shutil +from pathlib import Path +from datetime import datetime +import tkinter as tk +from tkinter import ttk, filedialog, messagebox +import tempfile + +SCRIPT_NAME = "can_universal_signal_finder.py" +SPLIT_SCRIPT = "can_split_by_id.py" +EXPLORE_SCRIPT = "id_signal_explorer.py" +TRACE_BATCH = "trace_batch_analyzer.py" +RANGE_FITTER = "trace_signal_fitter.py" + +LOG_PATTERNS = ("*.log", "*.txt") +TRACE_PATTERNS = ("*.trace",) + + +# ---------------- helpers ---------------- +def now_stamp(): + return datetime.now().strftime("%Y%m%d_%H%M%S") + + +def find_logs(root: Path, rel_logs_dir: str): + base = (root / rel_logs_dir) if rel_logs_dir else root + found = [] + if not base.exists(): + return found + for pat in LOG_PATTERNS: + found += [str(p) for p in base.glob(pat)] + found += [str(p) for p in base.rglob(pat)] # include subdirs + return sorted(set(found)) + + +def find_traces(base: Path): + """Liste .trace im Basisordner und eine Ebene tiefer.""" + files = [] + if not base.exists(): + return files + for pat in TRACE_PATTERNS: + files += [str(p) for p in base.glob(pat)] + files += [str(p) for p in base.glob(f"*/*{pat[1:]}")] # eine Ebene tiefer + return sorted(set(files)) + + +def ensure_dir(p: Path): + p.mkdir(parents=True, exist_ok=True) + return p + + +def latest_subdir(base: Path) -> Path: + """Neuester Unterordner in base, sonst base selbst.""" + if not base.exists(): + return base + subs = [p for p in base.iterdir() if p.is_dir()] + if not subs: + return base + return max(subs, key=lambda p: p.stat().st_mtime) + + +# ---------------- shared app state ---------------- +class AppState: + def __init__(self): + # core 
paths + self.workdir = tk.StringVar(value="") + self.logs_dir = tk.StringVar(value="logs") + self.traces_dir = tk.StringVar(value="traces") + self.analyze_out_base = tk.StringVar(value="analyze_out") + + # discovered logs + self.available_logs = [] # absolute paths + self.selected_log_indices = [] # indices in header listbox + + # project defaults + self.timestamp_runs = tk.BooleanVar(value=True) + + # shared traces directory + file list + self.traces_current_dir = tk.StringVar(value="") # absoluter Pfad zum aktuell angezeigten Traces-Ordner + self.traces_files = [] # Liste der .trace in current_dir (inkl. eine Ebene tiefer) + self._trace_observers = [] # callbacks, die Liste aktualisieren + + # hook: wenn der Pfad geändert wird, scannen + self.traces_current_dir.trace_add("write", self._on_traces_dir_changed) + + # --- path helpers --- + def workdir_path(self) -> Path: + wd = self.workdir.get().strip() or "." + return Path(wd) + + def logs_base_path(self) -> Path: + return self.workdir_path() / (self.logs_dir.get().strip() or "logs") + + def traces_base_path(self) -> Path: + return self.workdir_path() / (self.traces_dir.get().strip() or "traces") + + def analyze_out_root(self) -> Path: + return self.workdir_path() / (self.analyze_out_base.get().strip() or "analyze_out") + + # --- traces state --- + def add_trace_observer(self, cb): + if cb not in self._trace_observers: + self._trace_observers.append(cb) + + def _notify_trace_observers(self): + for cb in list(self._trace_observers): + try: + cb(self.traces_files) + except Exception: + pass + + def _on_traces_dir_changed(self, *_): + base = Path(self.traces_current_dir.get().strip() or str(self.traces_base_path())) + self.traces_files = find_traces(base) + self._notify_trace_observers() + + def set_traces_dir(self, path: str): + self.traces_current_dir.set(path) # löst automatisch scan + notify aus + + def refresh_traces(self): + # retrigger write to force refresh + self._on_traces_dir_changed() + + def set_traces_to_default_or_latest(self): + base = self.traces_base_path() + target = latest_subdir(base) + self.set_traces_dir(str(target)) + + +# ---------------- header (workdir + logs selection) ---------------- +class Header(ttk.Frame): + def __init__(self, master, state: AppState): + super().__init__(master, padding=8) + self.state = state + self._build_ui() + + def _build_ui(self): + self.columnconfigure(1, weight=1) + self.columnconfigure(3, weight=1) + # row 0: workdir + scan + ttk.Label(self, text="Workdir:").grid(row=0, column=0, sticky="w") + self.ent_workdir = ttk.Entry(self, textvariable=self.state.workdir) + self.ent_workdir.grid(row=0, column=1, sticky="ew", padx=6) + ttk.Button(self, text="Wählen…", command=self.pick_workdir).grid(row=0, column=2, padx=5) + ttk.Button(self, text="Logs scannen", command=self.scan_logs).grid(row=0, column=3, padx=5) + + # row 1: subfolders + timestamp checkbox + ttk.Label(self, text="Logs-Unterordner:").grid(row=1, column=0, sticky="w") + ttk.Entry(self, textvariable=self.state.logs_dir, width=24).grid(row=1, column=1, sticky="w", padx=6) + + ttk.Label(self, text="Traces-Unterordner:").grid(row=1, column=2, sticky="w") + ttk.Entry(self, textvariable=self.state.traces_dir, width=24).grid(row=1, column=3, sticky="w", padx=6) + + ttk.Label(self, text="Analyze-Output:").grid(row=2, column=0, sticky="w") + ttk.Entry(self, textvariable=self.state.analyze_out_base, width=24).grid(row=2, column=1, sticky="w", padx=6) + ttk.Checkbutton(self, text="Zeitstempel-Unterordner pro Run", 
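+                        # persisted in Projekt.json as "timestamp_runs" (see collect_project)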
variable=self.state.timestamp_runs).grid(row=2, column=2, columnspan=2, sticky="w") + + # row 3: logs list + frm = ttk.LabelFrame(self, text="Gefundene Logdateien (Mehrfachauswahl möglich)") + frm.grid(row=3, column=0, columnspan=4, sticky="nsew", pady=(8,0)) + self.rowconfigure(3, weight=1) + frm.columnconfigure(0, weight=1) + frm.rowconfigure(0, weight=1) + + self.lst_logs = tk.Listbox(frm, height=6, selectmode=tk.EXTENDED) + self.lst_logs.grid(row=0, column=0, sticky="nsew", padx=(8,4), pady=8) + + btns = ttk.Frame(frm) + btns.grid(row=0, column=1, sticky="ns", padx=(4,8), pady=8) + ttk.Button(btns, text="Alle wählen", command=self.select_all).pack(fill="x", pady=2) + ttk.Button(btns, text="Keine", command=self.select_none).pack(fill="x", pady=2) + ttk.Separator(btns, orient="horizontal").pack(fill="x", pady=6) + ttk.Button(btns, text="Manuell hinzufügen…", command=self.add_logs_manual).pack(fill="x", pady=2) + ttk.Button(btns, text="Entfernen", command=self.remove_selected_logs).pack(fill="x", pady=2) + ttk.Button(btns, text="Liste leeren", command=self.clear_logs).pack(fill="x", pady=2) + ttk.Separator(btns, orient="horizontal").pack(fill="x", pady=6) + ttk.Button(btns, text="Projekt speichern…", command=self.save_project).pack(fill="x", pady=2) + ttk.Button(btns, text="Projekt laden…", command=self.load_project).pack(fill="x", pady=2) + + # ---- actions ---- + def pick_workdir(self): + d = filedialog.askdirectory(title="Workdir auswählen") + if d: + self.state.workdir.set(d) + self.scan_logs() + # automatisch auch traces default/latest setzen + self.state.set_traces_to_default_or_latest() + + def scan_logs(self): + wd = self.state.workdir_path() + logs_dir = self.state.logs_dir.get().strip() + found = find_logs(wd, logs_dir) + self.state.available_logs = found + self.lst_logs.delete(0, tk.END) + for p in found: + self.lst_logs.insert(tk.END, p) + # default-select all + self.lst_logs.select_set(0, tk.END) + self.state.selected_log_indices = list(range(len(found))) + + def select_all(self): + self.lst_logs.select_set(0, tk.END) + self.state.selected_log_indices = list(range(self.lst_logs.size())) + + def select_none(self): + self.lst_logs.select_clear(0, tk.END) + self.state.selected_log_indices = [] + + def add_logs_manual(self): + paths = filedialog.askopenfilenames(title="Logdateien auswählen", filetypes=[("Logfiles","*.log *.txt"),("Alle Dateien","*.*")]) + if not paths: return + for p in paths: + if p not in self.state.available_logs: + self.state.available_logs.append(p) + self.lst_logs.insert(tk.END, p) + # if workdir empty, infer from first added + if not self.state.workdir.get().strip(): + self.state.workdir.set(str(Path(paths[0]).resolve().parent)) + # auch traces default/latest + self.state.set_traces_to_default_or_latest() + + def remove_selected_logs(self): + sel = list(self.lst_logs.curselection()) + sel.reverse() + for i in sel: + p = self.lst_logs.get(i) + if p in self.state.available_logs: + self.state.available_logs.remove(p) + self.lst_logs.delete(i) + self.state.selected_log_indices = [i for i in range(self.lst_logs.size()) if self.lst_logs.select_includes(i)] + + def clear_logs(self): + self.state.available_logs = [] + self.lst_logs.delete(0, tk.END) + self.state.selected_log_indices = [] + + def selected_logs(self): + idx = self.lst_logs.curselection() + if not idx: + return [] + return [self.lst_logs.get(i) for i in idx] + + # ---- project save/load ---- + def collect_project(self): + return { + "workdir": self.state.workdir.get(), + "logs_dir": 
self.state.logs_dir.get(), + "traces_dir": self.state.traces_dir.get(), + "analyze_out_base": self.state.analyze_out_base.get(), + "timestamp_runs": bool(self.state.timestamp_runs.get()), + "available_logs": self.state.available_logs, + "selected_indices": list(self.lst_logs.curselection()), + "traces_current_dir": self.state.traces_current_dir.get(), + } + + def apply_project(self, cfg): + self.state.workdir.set(cfg.get("workdir","")) + self.state.logs_dir.set(cfg.get("logs_dir","logs")) + self.state.traces_dir.set(cfg.get("traces_dir","traces")) + self.state.analyze_out_base.set(cfg.get("analyze_out_base","analyze_out")) + self.state.timestamp_runs.set(cfg.get("timestamp_runs", True)) + # restore logs + self.scan_logs() + # If project contained explicit available_logs, merge + for p in cfg.get("available_logs", []): + if p not in self.state.available_logs: + self.state.available_logs.append(p) + self.lst_logs.insert(tk.END, p) + # re-select indices if valid + self.lst_logs.select_clear(0, tk.END) + for i in cfg.get("selected_indices", []): + if 0 <= i < self.lst_logs.size(): + self.lst_logs.select_set(i) + # traces current dir: sofern vorhanden nutzen, sonst default/latest + tdir = cfg.get("traces_current_dir", "") + if tdir and Path(tdir).exists(): + self.state.set_traces_dir(tdir) + else: + self.state.set_traces_to_default_or_latest() + + def save_project(self): + cfg = self.collect_project() + path = filedialog.asksaveasfilename(title="Projekt speichern", defaultextension=".json", filetypes=[("Projektdatei","*.json")]) + if not path: return + with open(path, "w", encoding="utf-8") as f: + json.dump(cfg, f, indent=2) + messagebox.showinfo("Gespeichert", f"Projekt gespeichert:\n{path}") + + def load_project(self): + path = filedialog.askopenfilename(title="Projekt laden", filetypes=[("Projektdatei","*.json"),("Alle Dateien","*.*")]) + if not path: return + with open(path, "r", encoding="utf-8") as f: + cfg = json.load(f) + self.apply_project(cfg) + messagebox.showinfo("Geladen", f"Projekt geladen:\n{path}") + + +# ---------------- shared Trace Panel ---------------- +class TracePanel(ttk.LabelFrame): + """ + Einheitliche Trace-Auswahl: Liste links, Buttons rechts. + Nutzt AppState.traces_current_dir + AppState.traces_files. + single_select=True => Listbox SINGLE, versteckt 'Alle/Keine'. 
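+    Folder changes go through AppState.set_traces_dir(); all panels are
+    registered as observers (add_trace_observer) and refresh in sync.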
+ """ + def __init__(self, master, state: AppState, title="Traces", single_select=False, height=10): + super().__init__(master, text=title) + self.state = state + self.single_select = single_select + self.height = height + self._build_ui() + # subscribe to state updates + self.state.add_trace_observer(self._on_traces_updated) + # initial fill from state + self._on_traces_updated(self.state.traces_files) + + def _build_ui(self): + self.columnconfigure(0, weight=1) + self.rowconfigure(0, weight=1) + + selectmode = tk.SINGLE if self.single_select else tk.EXTENDED + self.lst = tk.Listbox(self, height=self.height, selectmode=selectmode) + self.lst.grid(row=0, column=0, sticky="nsew", padx=(8,4), pady=8) + + btns = ttk.Frame(self) + btns.grid(row=0, column=1, sticky="ns", padx=(4,8), pady=8) + + ttk.Button(btns, text="Traces-Ordner wählen…", command=self._pick_traces_dir).pack(fill="x", pady=2) + ttk.Button(btns, text="Workdir/traces", command=self._use_default_traces).pack(fill="x", pady=2) + ttk.Button(btns, text="Neuester Split", command=self._use_latest_split).pack(fill="x", pady=2) + ttk.Button(btns, text="Refresh", command=self._refresh_traces).pack(fill="x", pady=6) + if not self.single_select: + ttk.Button(btns, text="Alle wählen", command=lambda: self.lst.select_set(0, tk.END)).pack(fill="x", pady=2) + ttk.Button(btns, text="Keine", command=lambda: self.lst.select_clear(0, tk.END)).pack(fill="x", pady=2) + + # --- state sync --- + def _on_traces_updated(self, files): + # refresh list content + cur_sel_paths = self.get_selected() + self.lst.delete(0, tk.END) + for p in files: + self.lst.insert(tk.END, p) + # try to restore selection + if cur_sel_paths: + path_to_index = {self.lst.get(i): i for i in range(self.lst.size())} + for p in cur_sel_paths: + if p in path_to_index: + self.lst.select_set(path_to_index[p]) + + def _pick_traces_dir(self): + d = filedialog.askdirectory(title="Traces-Ordner wählen", initialdir=str(self.state.traces_base_path())) + if d: + self.state.set_traces_dir(d) + + def _use_default_traces(self): + # default or latest under Workdir/traces + self.state.set_traces_to_default_or_latest() + + def _use_latest_split(self): + base = self.state.traces_base_path() + target = latest_subdir(base) + self.state.set_traces_dir(str(target)) + + def _refresh_traces(self): + self.state.refresh_traces() + + def get_selected(self): + idx = self.lst.curselection() + return [self.lst.get(i) for i in idx] + + +# ---------------- Tab 1: Multi-Log Analyse (ranking optional) ---------------- +class TabAnalyze(ttk.Frame): + def __init__(self, master, state: AppState, header: Header): + super().__init__(master, padding=10) + self.state = state + self.header = header + self._build_ui() + + def _build_ui(self): + self.columnconfigure(0, weight=1) + self.rowconfigure(2, weight=1) + + # params + params = ttk.LabelFrame(self, text="Analyse-Parameter") + params.grid(row=0, column=0, sticky="nsew", padx=5, pady=5) + for c in (1,3): + params.columnconfigure(c, weight=1) + + ttk.Label(params, text="Include-IDs (z.B. 
0x208,0x209):").grid(row=0, column=0, sticky="w") + self.include_var = tk.StringVar(value="") + ttk.Entry(params, textvariable=self.include_var).grid(row=0, column=1, sticky="ew", padx=5) + + ttk.Label(params, text="Exclude-IDs:").grid(row=0, column=2, sticky="w") + self.exclude_var = tk.StringVar(value="") + ttk.Entry(params, textvariable=self.exclude_var).grid(row=0, column=3, sticky="ew", padx=5) + + ttk.Label(params, text="Scale:").grid(row=1, column=0, sticky="w") + self.scale_var = tk.DoubleVar(value=1.0) + ttk.Entry(params, textvariable=self.scale_var, width=12).grid(row=1, column=1, sticky="w", padx=5) + + ttk.Label(params, text="Offset:").grid(row=1, column=2, sticky="w") + self.offset_var = tk.DoubleVar(value=0.0) + ttk.Entry(params, textvariable=self.offset_var, width=12).grid(row=1, column=3, sticky="w", padx=5) + + ttk.Label(params, text="Range-Min:").grid(row=2, column=0, sticky="w") + self.rmin_var = tk.StringVar(value="") + ttk.Entry(params, textvariable=self.rmin_var, width=12).grid(row=2, column=1, sticky="w", padx=5) + + ttk.Label(params, text="Range-Max:").grid(row=2, column=2, sticky="w") + self.rmax_var = tk.StringVar(value="") + ttk.Entry(params, textvariable=self.rmax_var, width=12).grid(row=2, column=3, sticky="w", padx=5) + + ttk.Label(params, text="Range-Hit-Ratio (0..1):").grid(row=3, column=0, sticky="w") + self.hit_ratio_var = tk.DoubleVar(value=0.6) + ttk.Entry(params, textvariable=self.hit_ratio_var, width=12).grid(row=3, column=1, sticky="w", padx=5) + + ttk.Label(params, text="Top-N (Fallback):").grid(row=3, column=2, sticky="w") + self.top_var = tk.IntVar(value=20) + ttk.Entry(params, textvariable=self.top_var, width=12).grid(row=3, column=3, sticky="w", padx=5) + + ttk.Label(params, text="Per-ID-Limit:").grid(row=4, column=0, sticky="w") + self.per_id_limit_var = tk.IntVar(value=2) + ttk.Entry(params, textvariable=self.per_id_limit_var, width=12).grid(row=4, column=1, sticky="w", padx=5) + + self.run_separately_var = tk.BooleanVar(value=False) + ttk.Checkbutton(params, text="Jede Logdatei separat laufen lassen", variable=self.run_separately_var).grid(row=4, column=2, columnspan=2, sticky="w", padx=5) + + # run + console + run = ttk.Frame(self) + run.grid(row=1, column=0, sticky="ew", padx=5, pady=5) + ttk.Button(run, text="Analyse starten (Ranking)", command=self.on_run).pack(side="left", padx=5) + + out = ttk.LabelFrame(self, text="Ausgabe") + out.grid(row=2, column=0, sticky="nsew", padx=5, pady=5) + out.columnconfigure(0, weight=1); out.rowconfigure(0, weight=1) + self.txt = tk.Text(out, height=12); self.txt.grid(row=0, column=0, sticky="nsew") + sb = ttk.Scrollbar(out, orient="vertical", command=self.txt.yview); sb.grid(row=0, column=1, sticky="ns") + self.txt.configure(yscrollcommand=sb.set) + + def on_run(self): + logs = self.header.selected_logs() + if not logs: + messagebox.showwarning("Hinweis", "Bitte oben im Header Logdateien auswählen.") + return + t = threading.Thread(target=self._run_worker, args=(logs,), daemon=True) + self.txt.delete("1.0", tk.END) + self._append("Starte Analyse…\n") + t.start() + + def _run_worker(self, logs): + script_path = Path(__file__).parent / SCRIPT_NAME + if not script_path.exists(): + self._append(f"[Fehler] Script nicht gefunden: {script_path}\n"); return + + # output root: workdir/analyze_out/_multilog + out_root = self.state.analyze_out_root() + stamp = now_stamp() + "_multilog" + outdir = ensure_dir(out_root / stamp) + + def build_args(): + args = [sys.executable, str(script_path)] + if 
self.include_var.get().strip(): + args += ["--include-ids", self.include_var.get().strip()] + if self.exclude_var.get().strip(): + args += ["--exclude-ids", self.exclude_var.get().strip()] + args += ["--scale", str(self.scale_var.get()), "--offset", str(self.offset_var.get())] + if self.rmin_var.get().strip(): args += ["--range-min", self.rmin_var.get().strip()] + if self.rmax_var.get().strip(): args += ["--range-max", self.rmax_var.get().strip()] + args += ["--range-hit-ratio", str(self.hit_ratio_var.get())] + args += ["--top", str(self.top_var.get()), "--per-id-limit", str(self.per_id_limit_var.get())] + return args + + if self.run_separately_var.get(): + for p in logs: + sub = ensure_dir(outdir / Path(p).stem) + cmd = build_args() + ["--outdir", str(sub), p] + self._run_cmd(cmd) + else: + cmd = build_args() + ["--outdir", str(outdir)] + logs + self._run_cmd(cmd) + + self._append(f"\nDone. Output: {outdir}\n") + + def _run_cmd(self, cmd): + self._append(f"\n>>> RUN: {' '.join(cmd)}\n") + try: + with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True) as proc: + for line in proc.stdout: self._append(line) + rc = proc.wait() + if rc != 0: self._append(f"[Exit-Code {rc}]\n") + except Exception as e: + self._append(f"[Fehler] {e}\n") + + def _append(self, s): self.txt.insert(tk.END, s); self.txt.see(tk.END) + + +# ---------------- Tab 2: ID Explorer (split + single-ID analyze) ---------------- +class TabExplorer(ttk.Frame): + def __init__(self, master, state: AppState, header: Header): + super().__init__(master, padding=10) + self.state = state + self.header = header + self._build_ui() + + def _build_ui(self): + self.columnconfigure(0, weight=1) + self.rowconfigure(3, weight=1) + + # split controls + frm_split = ttk.LabelFrame(self, text="Split: Logs → per-ID Traces") + frm_split.grid(row=0, column=0, sticky="ew", padx=5, pady=5) + frm_split.columnconfigure(1, weight=1) + + self.rx_only_var = tk.BooleanVar(value=False) + self.ts_split_var = tk.BooleanVar(value=True) + + ttk.Label(frm_split, text="Ziel (Workdir/traces[/timestamp])").grid(row=0, column=0, sticky="w", padx=5) + ttk.Label(frm_split, textvariable=self.state.traces_dir).grid(row=0, column=1, sticky="w") + ttk.Checkbutton(frm_split, text="nur RX", variable=self.rx_only_var).grid(row=1, column=0, sticky="w", padx=5) + ttk.Checkbutton(frm_split, text="Zeitstempel-Unterordner", variable=self.ts_split_var).grid(row=1, column=1, sticky="w", padx=5) + ttk.Button(frm_split, text="Split starten", command=self.on_split).grid(row=1, column=2, sticky="e", padx=5) + + # unified trace panel (multi-select) + self.trace_panel = TracePanel(self, self.state, title="Traces im ausgewählten Ordner", single_select=False, height=10) + self.trace_panel.grid(row=1, column=0, sticky="nsew", padx=5, pady=(8,10)) + + # single-ID analyze + frm_one = ttk.LabelFrame(self, text="Einzel-ID Analyse (Plots + summary_stats)") + frm_one.grid(row=2, column=0, sticky="nsew", padx=5, pady=5) + frm_one.columnconfigure(1, weight=1) + ttk.Label(frm_one, text="Output-Basis (unter Workdir/analyze_out):").grid(row=0, column=0, sticky="w") + self.one_out_base = tk.StringVar(value="id_explore") + ttk.Entry(frm_one, textvariable=self.one_out_base).grid(row=0, column=1, sticky="ew", padx=5) + self.ts_one = tk.BooleanVar(value=True) + ttk.Checkbutton(frm_one, text="Zeitstempel-Unterordner", variable=self.ts_one).grid(row=0, column=2, sticky="w", padx=5) + ttk.Button(frm_one, text="Analyse starten", command=self.on_one_analyze).grid(row=0, column=3, 
sticky="e", padx=5) + + # console + out = ttk.LabelFrame(self, text="Ausgabe") + out.grid(row=3, column=0, sticky="nsew", padx=5, pady=5) + out.columnconfigure(0, weight=1); out.rowconfigure(0, weight=1) + self.txt = tk.Text(out, height=12); self.txt.grid(row=0, column=0, sticky="nsew") + sb = ttk.Scrollbar(out, orient="vertical", command=self.txt.yview); sb.grid(row=0, column=1, sticky="ns") + self.txt.configure(yscrollcommand=sb.set) + + def on_split(self): + logs = self.header.selected_logs() + if not logs: + messagebox.showwarning("Hinweis", "Bitte oben im Header Logdateien auswählen."); return + outdir = self.state.traces_base_path() + if self.ts_split_var.get(): outdir = outdir / now_stamp() + ensure_dir(outdir) + cmd = [sys.executable, str(Path(__file__).parent / SPLIT_SCRIPT), "--outdir", str(outdir)] + if self.rx_only_var.get(): cmd.append("--rx-only") + cmd += logs + self._run_cmd(cmd) + # nach dem Split: globalen Traces-Ordner setzen (neuester Ordner) + self.state.set_traces_dir(str(outdir)) + + def on_one_analyze(self): + sel = self.trace_panel.get_selected() + if not sel: + messagebox.showwarning("Hinweis", "Bitte mindestens eine .trace auswählen."); return + out_root = self.state.analyze_out_root() + stamp = now_stamp() + "_id_explore" if self.ts_one.get() else "id_explore" + outdir = ensure_dir(out_root / stamp) + for trace in sel: + cmd = [sys.executable, str(Path(__file__).parent / EXPLORE_SCRIPT), "--outdir", str(outdir), trace] + self._run_cmd(cmd) + self._append(f"\nDone. Output: {outdir}\n") + + def _run_cmd(self, cmd): + self._append(f"\n>>> RUN: {' '.join(cmd)}\n") + try: + with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True) as proc: + for line in proc.stdout: self._append(line) + rc = proc.wait() + if rc != 0: self._append(f"[Exit-Code {rc}]\n") + except Exception as e: + self._append(f"[Fehler] {e}\n") + + def _append(self, s): self.txt.insert(tk.END, s); self.txt.see(tk.END) + + +# ---------------- Tab 3: Traces Batch-Analyse ---------------- +class TabTraceBatch(ttk.Frame): + def __init__(self, master, state: AppState, header: Header): + super().__init__(master, padding=10) + self.state = state + self.header = header + self._build_ui() + + def _build_ui(self): + self.columnconfigure(0, weight=1) + self.rowconfigure(2, weight=1) + + # unified trace panel (multi-select) + self.trace_panel = TracePanel(self, self.state, title="Traces (Ordner/Subset wählen)", single_select=False, height=10) + self.trace_panel.grid(row=0, column=0, sticky="nsew", padx=5, pady=(5,10)) + + # Params + pr = ttk.LabelFrame(self, text="Analyse-Parameter") + pr.grid(row=1, column=0, sticky="nsew", padx=5, pady=5) + for c in (1,3): + pr.columnconfigure(c, weight=1) + + self.rx_only = tk.BooleanVar(value=False) + ttk.Checkbutton(pr, text="nur RX", variable=self.rx_only).grid(row=0, column=0, sticky="w", padx=5) + + ttk.Label(pr, text="Scale").grid(row=0, column=1, sticky="e") + self.scale = tk.DoubleVar(value=1.0) + ttk.Entry(pr, textvariable=self.scale, width=12).grid(row=0, column=2, sticky="w", padx=5) + + ttk.Label(pr, text="Offset").grid(row=0, column=3, sticky="e") + self.offset = tk.DoubleVar(value=0.0) + ttk.Entry(pr, textvariable=self.offset, width=12).grid(row=0, column=4, sticky="w", padx=5) + + ttk.Label(pr, text="Range-Min").grid(row=1, column=1, sticky="e") + self.rmin = tk.StringVar(value="") + ttk.Entry(pr, textvariable=self.rmin, width=12).grid(row=1, column=2, sticky="w", padx=5) + + ttk.Label(pr, text="Range-Max").grid(row=1, column=3, 
sticky="e") + self.rmax = tk.StringVar(value="") + ttk.Entry(pr, textvariable=self.rmax, width=12).grid(row=1, column=4, sticky="w", padx=5) + + ttk.Label(pr, text="Top pro Trace").grid(row=2, column=1, sticky="e") + self.top = tk.IntVar(value=8) + ttk.Entry(pr, textvariable=self.top, width=12).grid(row=2, column=2, sticky="w", padx=5) + + self.use_ts = tk.BooleanVar(value=True) + ttk.Checkbutton(pr, text="Zeitstempel-Unterordner", variable=self.use_ts).grid(row=2, column=3, sticky="w", padx=5) + + # Run & console + run = ttk.Frame(self) + run.grid(row=3, column=0, sticky="ew", padx=5, pady=5) + ttk.Button(run, text="Batch starten", command=self.on_run).pack(side="left", padx=5) + + out = ttk.LabelFrame(self, text="Ausgabe") + out.grid(row=4, column=0, sticky="nsew", padx=5, pady=5) + out.columnconfigure(0, weight=1); out.rowconfigure(0, weight=1) + self.txt = tk.Text(out, height=12); self.txt.grid(row=0, column=0, sticky="nsew") + sb = ttk.Scrollbar(out, orient="vertical", command=self.txt.yview); sb.grid(row=0, column=1, sticky="ns") + self.txt.configure(yscrollcommand=sb.set) + + def on_run(self): + # nutze Auswahl oder – falls leer – kompletten Ordner + selected = self.trace_panel.get_selected() + traces_dir = Path(self.state.traces_current_dir.get().strip() or str(self.state.traces_base_path())) + if not traces_dir.exists(): + messagebox.showwarning("Hinweis", "Bitte gültigen Traces-Ordner wählen."); return + + out_root = self.state.analyze_out_root() + label = "trace_batch" + stamp = now_stamp() + "_" + label if self.use_ts.get() else label + outdir = ensure_dir(out_root / stamp) + + # falls Auswahl getroffen wurde, temporären Subset-Ordner bauen + subset_dir = None + if selected: + subset_dir = ensure_dir(outdir / "_subset") + for p in selected: + src = Path(p) + dst = subset_dir / src.name + try: + # versuchen Hardlink (schnell, platzsparend) + if dst.exists(): + dst.unlink() + os.link(src, dst) + except Exception: + # Fallback: Kopieren + shutil.copy2(src, dst) + + run_dir = subset_dir if subset_dir else traces_dir + + cmd = [sys.executable, str(Path(__file__).parent/TRACE_BATCH), + "--traces-dir", str(run_dir), "--outdir", str(outdir), + "--scale", str(self.scale.get()), "--offset", str(self.offset.get()), + "--top", str(self.top.get()), "--plots"] + if self.rmin.get().strip(): cmd += ["--range-min", self.rmin.get().strip()] + if self.rmax.get().strip(): cmd += ["--range-max", self.rmax.get().strip()] + if self.rx_only.get(): cmd.append("--rx-only") + + self._run_cmd(cmd) + self._append(f"\nDone. 
Output: {outdir}\n") + + def _run_cmd(self, cmd): + self._append(f"\n>>> RUN: {' '.join(cmd)}\n") + try: + with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True) as proc: + for line in proc.stdout: self._append(line) + rc = proc.wait() + if rc != 0: self._append(f"[Exit-Code {rc}]\n") + except Exception as e: + self._append(f"[Fehler] {e}\n") + + def _append(self, s): self.txt.insert(tk.END, s); self.txt.see(tk.END) + + +# ---------------- Tab 4: Range-Fit (supervised + unsupervised) ---------------- +class TabRangeFit(ttk.Frame): + def __init__(self, master, state: AppState, header: Header): + super().__init__(master, padding=10) + self.state = state + self.header = header + self._build_ui() + + def _build_ui(self): + self.columnconfigure(0, weight=1) + self.rowconfigure(2, weight=1) + + # unified trace panel (single-select) + self.trace_panel = TracePanel(self, self.state, title="Trace wählen (Single)", single_select=True, height=8) + self.trace_panel.grid(row=0, column=0, sticky="nsew", padx=5, pady=(5,10)) + + # Parameter + frm_params = ttk.LabelFrame(self, text="Range-/Unsupervised-Fit Parameter") + frm_params.grid(row=1, column=0, sticky="nsew", padx=5, pady=5) + for c in (1,3,5): + frm_params.columnconfigure(c, weight=1) + + # Range (leer lassen => unsupervised) + ttk.Label(frm_params, text="Range-Min (leer = unsupervised)").grid(row=0, column=0, sticky="e") + self.rmin = tk.StringVar(value="") + ttk.Entry(frm_params, textvariable=self.rmin, width=12).grid(row=0, column=1, sticky="w", padx=5) + + ttk.Label(frm_params, text="Range-Max (leer = unsupervised)").grid(row=0, column=2, sticky="e") + self.rmax = tk.StringVar(value="") + ttk.Entry(frm_params, textvariable=self.rmax, width=12).grid(row=0, column=3, sticky="w", padx=5) + + ttk.Label(frm_params, text="Min. Hit-Ratio (Range-Fit)").grid(row=0, column=4, sticky="e") + self.min_hit = tk.DoubleVar(value=0.5) + ttk.Entry(frm_params, textvariable=self.min_hit, width=10).grid(row=0, column=5, sticky="w", padx=5) + + ttk.Label(frm_params, text="Min. 
Smoothness (Unsupervised)").grid(row=1, column=0, sticky="e") + self.min_smooth = tk.DoubleVar(value=0.2) + ttk.Entry(frm_params, textvariable=self.min_smooth, width=10).grid(row=1, column=1, sticky="w", padx=5) + + self.rx_only = tk.BooleanVar(value=False) + self.neg_scale = tk.BooleanVar(value=False) # nur bei Range-Fit + ttk.Checkbutton(frm_params, text="nur RX", variable=self.rx_only).grid(row=1, column=2, sticky="w") + ttk.Checkbutton(frm_params, text="negative Scale erlauben (Range-Fit)", variable=self.neg_scale).grid(row=1, column=3, sticky="w") + + ttk.Label(frm_params, text="Plots Top-N").grid(row=1, column=4, sticky="e") + self.plots_top = tk.IntVar(value=8) + ttk.Entry(frm_params, textvariable=self.plots_top, width=10).grid(row=1, column=5, sticky="w", padx=5) + + ttk.Label(frm_params, text="Output-Label").grid(row=2, column=0, sticky="e") + self.out_label = tk.StringVar(value="rangefit") + ttk.Entry(frm_params, textvariable=self.out_label, width=16).grid(row=2, column=1, sticky="w", padx=5) + + self.use_ts = tk.BooleanVar(value=True) + ttk.Checkbutton(frm_params, text="Zeitstempel-Unterordner", variable=self.use_ts).grid(row=2, column=2, sticky="w", padx=2) + + # Start + Konsole + frm_run = ttk.Frame(self) + frm_run.grid(row=3, column=0, sticky="ew", padx=5, pady=5) + ttk.Button(frm_run, text="Start Range-/Unsupervised-Fit", command=self._on_run).pack(side="left", padx=5) + + frm_out = ttk.LabelFrame(self, text="Ausgabe") + frm_out.grid(row=4, column=0, sticky="nsew", padx=5, pady=5) + frm_out.columnconfigure(0, weight=1); frm_out.rowconfigure(0, weight=1) + self.txt = tk.Text(frm_out, height=12); self.txt.grid(row=0, column=0, sticky="nsew") + sbo = ttk.Scrollbar(frm_out, orient="vertical", command=self.txt.yview); sbo.grid(row=0, column=1, sticky="ns") + self.txt.configure(yscrollcommand=sbo.set) + + def _append(self, s): + self.txt.insert(tk.END, s); self.txt.see(tk.END) + + def _on_run(self): + sel = self.trace_panel.get_selected() + if not sel: + messagebox.showwarning("Hinweis", "Bitte genau eine .trace-Datei auswählen.") + return + if len(sel) != 1: + messagebox.showwarning("Hinweis", "Range-Fit benötigt genau eine .trace-Datei (Single-Select).") + return + + trace = sel[0] + rmin = self.rmin.get().strip() + rmax = self.rmax.get().strip() + supervised = bool(rmin) and bool(rmax) + + out_root = self.state.analyze_out_root() + label = (self.out_label.get().strip() or ("rangefit" if supervised else "unsupervised")) + stamp = f"{now_stamp()}_{label}" if self.use_ts.get() else label + outdir = ensure_dir(out_root / stamp) + + cmd = [ + sys.executable, + str(Path(__file__).parent / RANGE_FITTER), + trace, + "--outdir", str(outdir), + "--plots-top", str(self.plots_top.get()), + ] + if self.rx_only.get(): + cmd.append("--rx-only") + + if supervised: + cmd += ["--rmin", rmin, "--rmax", rmax, "--min-hit", str(self.min_hit.get())] + if self.neg_scale.get(): + cmd.append("--allow-neg-scale") + else: + cmd += ["--min-smooth", str(self.min_smooth.get())] + + self._run_cmd(cmd) + self._append(f"\nDone. 
Output: {outdir}\n") + + def _run_cmd(self, cmd): + self._append(f"\n>>> RUN: {' '.join(cmd)}\n") + try: + with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True) as proc: + for line in proc.stdout: self._append(line) + rc = proc.wait() + if rc != 0: self._append(f"[Exit-Code {rc}]\n") + except Exception as e: + self._append(f"[Fehler] {e}\n") + + +# ---------------- App Shell ---------------- +class App(tk.Tk): + def __init__(self): + super().__init__() + self.title("CAN Universal Signal Finder – GUI") + self.geometry("1180x860") + self.configure(padx=8, pady=8) + + # shared state + self.state = AppState() + + # header (always visible) + self.header = Header(self, self.state) + self.header.pack(fill="x", side="top") + + # Tabs + nb = ttk.Notebook(self) + nb.pack(fill="both", expand=True) + + self.tab_analyze = TabAnalyze(nb, self.state, self.header) + self.tab_explorer = TabExplorer(nb, self.state, self.header) + self.tab_batch = TabTraceBatch(nb, self.state, self.header) + self.tab_rangefit = TabRangeFit(nb, self.state, self.header) + + nb.add(self.tab_analyze, text="Multi-Log Analyse") + nb.add(self.tab_explorer, text="ID Explorer") + nb.add(self.tab_batch, text="Traces Batch-Analyse") + nb.add(self.tab_rangefit, text="Range-Fit") + + # init: traces auf default/latest stellen + self.state.set_traces_to_default_or_latest() + + +if __name__ == "__main__": + app = App() + app.mainloop() diff --git a/Reverse-Engineering CAN-Bus/models/.gitignore b/Reverse-Engineering CAN-Bus/models/.gitignore new file mode 100644 index 0000000..1dffce1 --- /dev/null +++ b/Reverse-Engineering CAN-Bus/models/.gitignore @@ -0,0 +1,18 @@ +# Ignoriere in JEDEM unmittelbaren Unterordner von models/ +# die Verzeichnisse "traces" und "analyze_out" +*/traces/ +*/traces/** +*/analyze_out/ +*/analyze_out/** + +# Falls jemand versehentlich direkt unter models/ solche Ordner anlegt, auch ignorieren: +traces/ +traces/** +analyze_out/ +analyze_out/** + +# (Optional, falls du dich mal vertippst) +*/analyze.out/ +*/analyze.out/** +analyze.out/ +analyze.out/** diff --git a/Reverse-Engineering CAN-Bus/models/Triumph Speed Twin 1200 RS (2025)/Notes.txt b/Reverse-Engineering CAN-Bus/models/Triumph Speed Twin 1200 RS (2025)/Notes.txt new file mode 100644 index 0000000..fe0c738 --- /dev/null +++ b/Reverse-Engineering CAN-Bus/models/Triumph Speed Twin 1200 RS (2025)/Notes.txt @@ -0,0 +1,12 @@ + +possible CAN Ids !? (from Forum somwhere) +Message ID: + +0x540 - byte 0 - bits 6...4 - Gear Position - 0 = N, 1-6 = gears 1-6 +bit 1 - Neutral Light - 1 = on, 0 = off +bit 2 - Check engine light???? +0x550 - byte 0 - bits 2...0 - Coolant bars on dashboard +- bit 3 - Warning light - 1 = on, 0 = off +0x570 - bytes 2-3 - Coolant temp - (256 * byte 3 + byte 2) / 10 = Temp in Degrees C +0x518 - Possible revs - divide by 4 +0x519 - Similar to 0x518 Possibly TPS unsure. Doesn't actuate when only tps is rotated. 
diff --git a/Reverse-Engineering CAN-Bus/models/Triumph Speed Twin 1200 RS (2025)/Triumph Speed Twin 1200 RS (2025).json b/Reverse-Engineering CAN-Bus/models/Triumph Speed Twin 1200 RS (2025)/Triumph Speed Twin 1200 RS (2025).json
new file mode 100644
index 0000000..1b16341
--- /dev/null
+++ b/Reverse-Engineering CAN-Bus/models/Triumph Speed Twin 1200 RS (2025)/Triumph Speed Twin 1200 RS (2025).json
@@ -0,0 +1,13 @@
+{
+  "workdir": "models/Triumph Speed Twin 1200 RS (2025)",
+  "logs_dir": "logs",
+  "traces_dir": "traces",
+  "analyze_out_base": "analyze_out",
+  "timestamp_runs": true,
+  "available_logs": [
+    "models/Triumph Speed Twin 1200 RS (2025)/logs/cantrace-raw-2025-08-27T17-45-27-980Z-1.log"
+  ],
+  "selected_indices": [
+    0
+  ]
+}
\ No newline at end of file
diff --git a/Reverse-Engineering CAN-Bus/requirements.txt b/Reverse-Engineering CAN-Bus/requirements.txt
new file mode 100644
index 0000000..a54c885
--- /dev/null
+++ b/Reverse-Engineering CAN-Bus/requirements.txt
@@ -0,0 +1,3 @@
+pandas>=2.0.0
+numpy>=1.24.0
+matplotlib>=3.7.0
\ No newline at end of file
diff --git a/Reverse-Engineering CAN-Bus/start.sh b/Reverse-Engineering CAN-Bus/start.sh
new file mode 100755
index 0000000..9f03fd1
--- /dev/null
+++ b/Reverse-Engineering CAN-Bus/start.sh
@@ -0,0 +1,18 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+# Choose python (allow override with $PYTHON)
+PYTHON_BIN="${PYTHON:-python3}"
+VENV_DIR=".venv"
+
+if [ ! -d "$VENV_DIR" ]; then
+  "$PYTHON_BIN" -m venv "$VENV_DIR"
+fi
+
+# shellcheck disable=SC1091
+source "$VENV_DIR/bin/activate"
+
+python -m pip install --upgrade pip
+pip install -r requirements.txt
+
+exec python main.py "$@"
diff --git a/Reverse-Engineering CAN-Bus/trace_batch_analyzer.py b/Reverse-Engineering CAN-Bus/trace_batch_analyzer.py
new file mode 100644
index 0000000..3f1bd14
--- /dev/null
+++ b/Reverse-Engineering CAN-Bus/trace_batch_analyzer.py
@@ -0,0 +1,186 @@
+#!/usr/bin/env python3
+import re
+import sys
+import argparse
+from pathlib import Path
+import pandas as pd
+import numpy as np
+import matplotlib.pyplot as plt
+
+LOG_PATTERN = re.compile(r"(\d+)\s+(TX|RX)\s+0x([0-9A-Fa-f]+)\s+\d+\s+((?:[0-9A-Fa-f]{2}\s+)+)")
+
+def parse_trace(path: Path, rx_only=False) -> pd.DataFrame:
+    rows = []
+    with open(path, "r", errors="ignore") as f:
+        for line in f:
+            m = LOG_PATTERN.match(line)
+            if not m:
+                continue
+            ts = int(m.group(1))
+            dr = m.group(2)
+            if rx_only and dr != "RX":
+                continue
+            cid = int(m.group(3), 16)
+            data = [int(x, 16) for x in m.group(4).split() if x.strip()]
+            rows.append((ts, dr, cid, data))
+    df = pd.DataFrame(rows, columns=["ts","dir","id","data"])
+    if df.empty:
+        return df
+    df["time_s"] = (df["ts"] - df["ts"].min())/1000.0
+    return df
+
+def be16(a,b): return (a<<8)|b
+def le16(a,b): return a | (b<<8)
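+# example: payload bytes d[i]=0x12, d[j]=0x34 ->
+#   le16 = 0x3412 = 13330 (low byte first), be16 = 0x1234 = 4660 (high byte first)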
"max_phys": float(phys.max()) + }) + # 16-bit + pairs = [(i,i+1) for i in range(7)] + for i,j in pairs: + # LE + vals = [le16(d[i],d[j]) for d in df["data"] if len(d)>j] + if vals: + arr = np.array(vals, dtype=float); phys = arr*scale + offs + hit = np.ones_like(phys, dtype=bool) + if rmin is not None: hit &= (phys>=rmin) + if rmax is not None: hit &= (phys<=rmax) + stats.append({ + "type":"le16","slot":f"{i}-{j}", + "n":len(arr), + "min":float(arr.min()),"max":float(arr.max()),"var":float(arr.var()), + "hit_ratio": float(np.count_nonzero(hit))/len(hit) if len(hit)>0 else 0.0, + "min_phys": float(phys.min()), "max_phys": float(phys.max()) + }) + # BE + vals = [be16(d[i],d[j]) for d in df["data"] if len(d)>j] + if vals: + arr = np.array(vals, dtype=float); phys = arr*scale + offs + hit = np.ones_like(phys, dtype=bool) + if rmin is not None: hit &= (phys>=rmin) + if rmax is not None: hit &= (phys<=rmax) + stats.append({ + "type":"be16","slot":f"{i}-{j}", + "n":len(arr), + "min":float(arr.min()),"max":float(arr.max()),"var":float(arr.var()), + "hit_ratio": float(np.count_nonzero(hit))/len(hit) if len(hit)>0 else 0.0, + "min_phys": float(phys.min()), "max_phys": float(phys.max()) + }) + return pd.DataFrame(stats) + +def plot_one_trace(df: pd.DataFrame, outdir: Path, prefix: str): + outdir.mkdir(parents=True, exist_ok=True) + # 8-bit plots + for i in range(8): + times, series = [], [] + for t,d in zip(df["time_s"], df["data"]): + if len(d)>i: + times.append(t); series.append(d[i]) + if not series: continue + import matplotlib.pyplot as plt + plt.figure(figsize=(10,4)) + plt.plot(times, series, marker=".", linestyle="-") + plt.xlabel("Zeit (s)"); plt.ylabel(f"Byte[{i}] (8-bit)") + plt.title(f"{prefix} – 8-bit Byte {i}") + plt.grid(True); plt.tight_layout() + plt.savefig(outdir / f"{prefix}_byte{i}.png", dpi=150); plt.close() + # 16-bit plots (LE/BE) + pairs = [(i,i+1) for i in range(7)] + for i,j in pairs: + times, series = [], [] + for t,d in zip(df["time_s"], df["data"]): + if len(d)>j: times.append(t); series.append(le16(d[i],d[j])) + if series: + import matplotlib.pyplot as plt + plt.figure(figsize=(10,4)) + plt.plot(times, series, marker=".", linestyle="-") + plt.xlabel("Zeit (s)"); plt.ylabel(f"LE16 @{i}-{j}") + plt.title(f"{prefix} – LE16 {i}-{j}") + plt.grid(True); plt.tight_layout() + plt.savefig(outdir / f"{prefix}_le16_{i}-{j}.png", dpi=150); plt.close() + times, series = [], [] + for t,d in zip(df["time_s"], df["data"]): + if len(d)>j: times.append(t); series.append(be16(d[i],d[j])) + if series: + import matplotlib.pyplot as plt + plt.figure(figsize=(10,4)) + plt.plot(times, series, marker=".", linestyle="-") + plt.xlabel("Zeit (s)"); plt.ylabel(f"BE16 @{i}-{j}") + plt.title(f"{prefix} – BE16 {i}-{j}") + plt.grid(True); plt.tight_layout() + plt.savefig(outdir / f"{prefix}_be16_{i}-{j}.png", dpi=150); plt.close() + +def main(): + ap = argparse.ArgumentParser(description="Batch analyze per-ID traces and rank 8/16-bit combinations") + ap.add_argument("--traces-dir", required=True, help="Directory containing *.trace files") + ap.add_argument("--outdir", required=True, help="Output directory for analysis results") + ap.add_argument("--rx-only", action="store_true", help="Use RX frames only") + ap.add_argument("--plots", action="store_true", help="Also generate plots for each trace") + ap.add_argument("--scale", type=float, default=1.0, help="phys = raw*scale + offset") + ap.add_argument("--offset", type=float, default=0.0, help="phys = raw*scale + offset") + ap.add_argument("--range-min", 
diff --git a/Reverse-Engineering CAN-Bus/trace_signal_fitter.py b/Reverse-Engineering CAN-Bus/trace_signal_fitter.py
new file mode 100644
index 0000000..2e611b0
--- /dev/null
+++ b/Reverse-Engineering CAN-Bus/trace_signal_fitter.py
@@ -0,0 +1,315 @@
+#!/usr/bin/env python3
+"""
+trace_signal_fitter.py
+----------------------
+Two modes of operation for a single .trace file:
+
+1) Range fit (supervised): --rmin/--rmax given
+   For all 8-bit slots (D0..D7) and adjacent 16-bit words (LE/BE), searches for a
+   linear mapping phys = raw*scale + offset that puts as many samples as possible
+   into [rmin, rmax]. Ranked primarily by hit_ratio.
+
+2) Unsupervised (no range): --rmin/--rmax omitted
+   Finds "plausible" physical candidates by smoothness/variance/span/rate, without
+   estimating scale/offset (raw values are used directly). Ranked primarily by smoothness.
+
+
+Log format (Kettenöler), one frame per line:
+    <timestamp_ms> <RX|TX> 0x<ID> <DLC> <byte0> <byte1> ...   (bytes as two-digit hex)
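+    e.g.  120034 RX 0x540 8 20 00 00 00 00 00 00 00
+    (the DLC column is matched but not otherwise used by the parser)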
+
+Outputs:
+- Range fit:     <trace-stem>_encoding_candidates.csv (+ optional plots)
+- Unsupervised:  <trace-stem>_unsupervised_candidates.csv (+ optional plots)
+
+"""
+import re
+import sys
+import argparse
+from pathlib import Path
+import numpy as np
+import pandas as pd
+import matplotlib.pyplot as plt
+
+LOG_PATTERN = re.compile(r"(\d+)\s+(TX|RX)\s+0x([0-9A-Fa-f]+)\s+\d+\s+((?:[0-9A-Fa-f]{2}\s+)+)")
+
+def parse_trace(path: Path, rx_only=False) -> pd.DataFrame:
+    rows = []
+    with open(path, "r", errors="ignore") as f:
+        for line in f:
+            m = LOG_PATTERN.match(line)
+            if not m:
+                continue
+            ts = int(m.group(1)); dr = m.group(2)
+            if rx_only and dr != "RX":
+                continue
+            cid = int(m.group(3), 16)
+            data = [int(x, 16) for x in m.group(4).split() if x.strip()]
+            rows.append((ts, dr, cid, data))
+    df = pd.DataFrame(rows, columns=["ts","dir","id","data"])
+    if df.empty:
+        return df
+    df["time_s"] = (df["ts"] - df["ts"].min())/1000.0
+    return df
+
+def be16(a,b): return (a<<8)|b
+def le16(a,b): return a | (b<<8)
+
+def p95_abs_diff(arr: np.ndarray) -> float:
+    if arr.size < 2:
+        return 0.0
+    d = np.abs(np.diff(arr))
+    return float(np.percentile(d, 95))
+
+def basic_rate(times: np.ndarray) -> float:
+    if times.size < 2: return 0.0
+    dur = times.max() - times.min()
+    if dur <= 0: return 0.0
+    return float(times.size / dur)
+
+def interval_best_offset(raw: np.ndarray, scale: float, rmin: float, rmax: float):
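+    # sweep-line over candidate offsets: sample raw[k] lands in [rmin, rmax] for any
+    # offset in [min(a,b), max(a,b)] with a = rmin - scale*raw[k], b = rmax - scale*raw[k];
+    # the offset covered by the most of these intervals maximizes the number of samples
+    # mapped into range, and that coverage divided by n is returned as the hit ratio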
+    a = rmin - scale*raw
+    b = rmax - scale*raw
+    lo = np.minimum(a,b)
+    hi = np.maximum(a,b)
+    events = []
+    for L,H in zip(lo,hi):
+        events.append((L, +1))
+        events.append((H, -1))
+    events.sort(key=lambda t: (t[0], -t[1]))
+    best = -1; cur = 0; best_x = None
+    for x, v in events:
+        cur += v
+        if cur > best:
+            best = cur; best_x = x
+    return best_x, float(best)/float(len(raw))
+
+def gen_candidates(df: pd.DataFrame):
+    times = df["time_s"].to_numpy(dtype=float)
+    data = df["data"].tolist()
+    # 8-bit
+    for i in range(8):
+        vals = [d[i] for d in data if len(d)>i]
+        if not vals: continue
+        yield (f"byte[{i}]", np.array(vals, dtype=float)), times[:len(vals)]
+    # 16-bit (adjacent)
+    pairs = [(i,i+1) for i in range(7)]
+    for i,j in pairs:
+        vals = [le16(d[i],d[j]) for d in data if len(d)>j]
+        if vals:
+            yield (f"le16[{i}-{j}]", np.array(vals, dtype=float)), times[:len(vals)]
+        vals = [be16(d[i],d[j]) for d in data if len(d)>j]
+        if vals:
+            yield (f"be16[{i}-{j}]", np.array(vals, dtype=float)), times[:len(vals)]
+
+def prefilter(vals: np.ndarray):
+    if vals.size < 12:
+        return False, {"reason":"too_few_samples"}
+    uniq = np.unique(vals)
+    if uniq.size <= 2:
+        return False, {"reason":"too_constant"}
+    p95 = p95_abs_diff(vals)
+    if p95 == 0:
+        return False, {"reason":"no_changes"}
+    r = float(np.percentile(vals, 97) - np.percentile(vals, 3) + 1e-9)
+    if p95 > 0.5*r:
+        return False, {"reason":"too_jumpy"}
+    return True, {"p95_abs_diff":p95, "span_est":r}
+
+def try_scaleset():
+    base = [1e-3, 2e-3, 5e-3,
+            1e-2, 2e-2, 5e-2,
+            0.1, 0.2, 0.25, 0.5,
+            1.0, 2.0, 5.0, 10.0,
+            0.0625, 0.125, 0.75, 0.8, 1.25]
+    return sorted(set(base))
+
+def evaluate_supervised(label, vals: np.ndarray, times: np.ndarray, rmin: float, rmax: float, allow_neg_scale=False):
+    ok, meta = prefilter(vals)
+    if not ok:
+        return None
+    scales = try_scaleset()
+    if allow_neg_scale:
+        scales = scales + [-s for s in scales if s>0]
+    best = {"hit_ratio": -1.0}
+    for s in scales:
+        o, hr = interval_best_offset(vals, s, rmin, rmax)
+        if hr > best["hit_ratio"]:
+            best = {"scale":s, "offset":float(o), "hit_ratio":hr}
+    phys = vals*best["scale"] + best["offset"]
+    within = (phys>=rmin) & (phys<=rmax)
+    in_count = int(np.count_nonzero(within))
+    p95_raw = p95_abs_diff(vals)
+    p95_phys = p95_abs_diff(phys)
+    rate = basic_rate(times[:len(vals)])
+    return {
+        "label": label,
+        "mode": "range_fit",
+        "n": int(vals.size),
+        "rate_hz_est": float(rate),
+        "raw_min": float(np.min(vals)),
+        "raw_max": float(np.max(vals)),
+        "raw_var": float(np.var(vals)),
+        "p95_absdiff_raw": float(p95_raw),
+        "scale": float(best["scale"]),
+        "offset": float(best["offset"]),
+        "hit_ratio": float(best["hit_ratio"]),
+        "in_count": in_count,
+        "phys_min": float(np.min(phys)),
+        "phys_max": float(np.max(phys)),
+        "p95_absdiff_phys": float(p95_phys),
+        "span_phys": float(np.percentile(phys, 97) - np.percentile(phys, 3)),
+        "prefilter_span_est": float(meta.get("span_est", 0.0)),
+        "prefilter_p95_absdiff": float(meta.get("p95_abs_diff", 0.0)),
+    }
+
+def evaluate_unsupervised(label, vals: np.ndarray, times: np.ndarray, min_smooth=0.2):
+    """
+    Returns plausibility metrics only (no scale/offset).
+      smoothness = 1 - clamp(p95(|Δ|) / span, 0..1)
+      uniq_ratio = |unique| / n
+    Ranking: smoothness desc, span desc, var desc, rate desc, n desc.
+    """
+    if vals.size < 12:
+        return None
+    p95 = p95_abs_diff(vals)
+    span = float(np.percentile(vals, 97) - np.percentile(vals, 3) + 1e-9)
+    smooth = 1.0 - min(max(p95/span, 0.0), 1.0)
+    uniq = len(np.unique(vals))
+    uniq_ratio = float(uniq) / float(vals.size)
+    var = float(np.var(vals))
+    rate = basic_rate(times[:len(vals)])
+
+    # filter: too constant or too jumpy
+    if uniq_ratio <= 0.02:
+        return None
+    if smooth < min_smooth:
+        return None
+
+    return {
+        "label": label,
+        "mode": "unsupervised",
+        "n": int(vals.size),
+        "rate_hz_est": float(rate),
+        "raw_min": float(np.min(vals)),
+        "raw_max": float(np.max(vals)),
+        "raw_var": var,
+        "span_raw": span,
+        "p95_absdiff_raw": float(p95),
+        "smoothness": float(smooth),
+        "uniq_ratio": float(uniq_ratio),
+    }
+
+def plot_timeseries(times, series, out_png: Path, title: str, ylabel: str):
+    plt.figure(figsize=(10,4))
+    plt.plot(times[:len(series)], series, marker=".", linestyle="-")
+    plt.xlabel("Zeit (s)"); plt.ylabel(ylabel)
+    plt.title(title); plt.grid(True); plt.tight_layout()
+    out_png.parent.mkdir(parents=True, exist_ok=True)
+    plt.savefig(out_png, dpi=150); plt.close()
+
+def main():
+    ap = argparse.ArgumentParser(description="Finde Encoding-Kandidaten (mit Range) oder plausible Rohsignale (ohne Range) in einer .trace-Datei")
+    ap.add_argument("trace", help="Pfad zur .trace Datei (aus can_split_by_id.py)")
+    ap.add_argument("--rmin", type=float, default=None, help="untere Grenze des Zielbereichs (phys)")
+    ap.add_argument("--rmax", type=float, default=None, help="obere Grenze des Zielbereichs (phys)")
+    ap.add_argument("--rx-only", action="store_true", help="Nur RX Frames nutzen")
+    ap.add_argument("--allow-neg-scale", action="store_true", help="Auch negative scale testen (nur Range-Fit)")
+    ap.add_argument("--outdir", default=".", help="Output-Verzeichnis (CSV/Plots)")
+    ap.add_argument("--plots-top", type=int, default=8, help="Erzeuge Plots für die Top-N Kandidaten")
+    ap.add_argument("--min-hit", type=float, default=0.5, help="Mindest-Hit-Ratio für Range-Fit (0..1)")
+    ap.add_argument("--min-smooth", type=float, default=0.2, help="Mindest-Smoothness für Unsupervised (0..1)")
+    args = ap.parse_args()
+
+    trace = Path(args.trace)
+    df = parse_trace(trace, rx_only=args.rx_only)
+    if df.empty:
+        print("Keine Daten in Trace.", file=sys.stderr); sys.exit(2)
+
+    supervised = (args.rmin is not None) and 
(args.rmax is not None) + results = [] + + for (label, series), times in gen_candidates(df): + if supervised: + r = evaluate_supervised(label, series, times, args.rmin, args.rmax, allow_neg_scale=args.allow_neg_scale) + if r is None: + continue + if r["hit_ratio"] >= args.min_hit: + r["trace"] = trace.stem + results.append(r) + else: + r = evaluate_unsupervised(label, series, times, min_smooth=args.min_smooth) + if r is None: + continue + r["trace"] = trace.stem + results.append(r) + + if not results: + if supervised: + print("Keine Kandidaten über Schwelle gefunden. Tipp: --min-hit senken oder --allow-neg-scale testen.", file=sys.stderr) + else: + print("Keine plausiblen Rohsignale gefunden. Tipp: --min-smooth senken.", file=sys.stderr) + sys.exit(3) + + outdir = Path(args.outdir); outdir.mkdir(parents=True, exist_ok=True) + + if supervised: + df_res = pd.DataFrame(results).sort_values(["hit_ratio", "p95_absdiff_phys", "rate_hz_est", "n"], ascending=[False, True, False, False]) + csv_path = outdir / f"{trace.stem}_encoding_candidates.csv" + df_res.to_csv(csv_path, index=False) + print(f"Kandidaten-CSV: {csv_path}") + # Plots + for _, row in df_res.head(args.plots_top).iterrows(): + # decode again + times = df["time_s"].to_numpy(dtype=float) + data = df["data"].tolist() + label = row["label"] + if label.startswith("byte["): + i = int(label.split("[")[1].split("]")[0]) + vals = np.array([d[i] for d in data if len(d)>i], dtype=float) + elif label.startswith("le16["): + i,j = map(int, label.split("[")[1].split("]")[0].split("-")) + vals = np.array([le16(d[i],d[j]) for d in data if len(d)>j], dtype=float) + elif label.startswith("be16["): + i,j = map(int, label.split("[")[1].split("]")[0].split("-")) + vals = np.array([be16(d[i],d[j]) for d in data if len(d)>j], dtype=float) + else: + continue + phys = vals*row["scale"] + row["offset"] + out_png = outdir / f"{trace.stem}_{label.replace('[','_').replace(']','')}.png" + plot_timeseries(times[:len(phys)], phys, out_png, f"{trace.name} – {label} (scale={row['scale']:.6g}, offset={row['offset']:.6g})", "phys (geschätzt)") + # console + cols = ["label","hit_ratio","scale","offset","p95_absdiff_phys","rate_hz_est","n","phys_min","phys_max"] + print("\nTop-Kandidaten:") + print(df_res.head(10)[cols].to_string(index=False)) + else: + # Unsupervised + df_res = pd.DataFrame(results).sort_values(["smoothness","span_raw","raw_var","rate_hz_est","n"], ascending=[False, False, False, False, False]) + csv_path = outdir / f"{trace.stem}_unsupervised_candidates.csv" + df_res.to_csv(csv_path, index=False) + print(f"Unsupervised-CSV: {csv_path}") + # Plots + for _, row in df_res.head(max(1, args.plots_top)).iterrows(): + # regenerate series for plot + times = df["time_s"].to_numpy(dtype=float) + data = df["data"].tolist() + label = row["label"] + if label.startswith("byte["): + i = int(label.split("[")[1].split("]")[0]) + vals = np.array([d[i] for d in data if len(d)>i], dtype=float) + elif label.startswith("le16["): + i,j = map(int, label.split("[")[1].split("]")[0].split("-")) + vals = np.array([le16(d[i],d[j]) for d in data if len(d)>j], dtype=float) + elif label.startswith("be16["): + i,j = map(int, label.split("[")[1].split("]")[0].split("-")) + vals = np.array([be16(d[i],d[j]) for d in data if len(d)>j], dtype=float) + else: + continue + out_png = outdir / f"{trace.stem}_{label.replace('[','_').replace(']','')}_raw.png" + plot_timeseries(times[:len(vals)], vals, out_png, f"{trace.name} – {label} (raw)", "raw") + # console + cols = 
["label","smoothness","span_raw","raw_var","rate_hz_est","n","uniq_ratio","p95_absdiff_raw"] + print("\nTop plausible Rohsignale:") + print(df_res.head(10)[cols].to_string(index=False)) + +if __name__ == "__main__": + main() \ No newline at end of file