Files
Kettenoeler/Reverse-Engineering CAN-Bus/trace_signal_fitter.py
Marcel Peterkau a9053997a1
All checks were successful
CI-Build/Kettenoeler/pipeline/head This commit looks good
update of trace_signal_fitter.py and some doc
2025-08-27 23:59:57 +02:00

661 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
trace_signal_fitter.py Advanced Range-/Unsupervised-Fit mit Physik-Constraints & Bericht
Modi:
1) Range-Fit (supervised): --rmin/--rmax gesetzt → finde scale & offset, maximiere Hit-Ratio in [rmin, rmax].
2) Unsupervised: ohne Range → plausible Rohsignale nach Smoothness/Var/Rate/Span.
Neu:
- Periodizität: Rate (Hz), Jitter (std der Inter-Arrival-Times), CV.
- Slew-Rate: p95/p99 von |Δ|/s (supervised in phys-Einheit, unsupervised normiert auf Roh-Span).
- Grenzwerte als Argumente (--rate-min/max, --jitter-max-ms, --max-slope-abs, --max-slope-frac, ...).
- Zusätzlich signed 16-bit Varianten (le16s/be16s).
- JSON + Markdown-Bericht pro Trace mit PASS/FAIL und Begründungen.
Logformat (Kettenöler):
<timestamp_ms> <TX|RX> 0x<ID_HEX> <DLC> <byte0> <byte1> ... <byte7>
Outputs:
- supervised: <trace>_encoding_candidates.csv, Plots, <trace>_report.md, <trace>_report.json
- unsupervised: <trace>_unsupervised_candidates.csv, Plots, <trace>_report.md, <trace>_report.json
"""
from __future__ import annotations
import sys
import json
import argparse
from pathlib import Path
from typing import List, Tuple, Dict, Iterable
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
# ---------- Parsing ----------
def parse_trace(path: Path, rx_only: bool = False) -> pd.DataFrame:
"""
Robustes Parsen des Kettenöler-Formats:
<ts_ms> <TX|RX> 0x<ID> <DLC> <b0> <b1> ... (hex)
"""
rows = []
with open(path, "r", errors="ignore") as f:
for line in f:
parts = line.strip().split()
if len(parts) < 4:
continue
try:
ts = int(parts[0])
dr = parts[1]
if rx_only and dr != "RX":
continue
cid = int(parts[2], 16) if parts[2].lower().startswith("0x") else int(parts[2], 16)
dlc = int(parts[3])
bytes_hex = parts[4:4+dlc] if dlc > 0 else []
data = []
for b in bytes_hex:
try:
data.append(int(b, 16))
except Exception:
data.append(0)
rows.append((ts, dr, cid, data))
except Exception:
continue
df = pd.DataFrame(rows, columns=["ts", "dir", "id", "data"])
if df.empty:
return df
df["time_s"] = (df["ts"] - df["ts"].min()) / 1000.0
return df
# ---------- Helpers ----------
def be16(a: int, b: int) -> int: return (a << 8) | b
def le16(a: int, b: int) -> int: return a | (b << 8)
def s16(u: int) -> int: return u if u < 0x8000 else u - 0x10000
def p_quant_abs_diff(arr: np.ndarray, q: float) -> float:
if arr.size < 2:
return 0.0
d = np.abs(np.diff(arr))
return float(np.percentile(d, q * 100))
def p_quant(arr: np.ndarray, q: float) -> float:
if arr.size == 0:
return 0.0
return float(np.percentile(arr, q * 100))
def interarrival_metrics(times: np.ndarray) -> Dict[str, float]:
if times.size < 2:
return {"rate_hz": 0.0, "period_mean": 0.0, "period_std": 0.0, "jitter_cv": 0.0, "n": int(times.size)}
dt = np.diff(times)
period_mean = float(np.mean(dt))
period_std = float(np.std(dt))
rate_hz = 1.0 / period_mean if period_mean > 0 else 0.0
jitter_cv = (period_std / period_mean) if period_mean > 0 else 0.0
return {"rate_hz": rate_hz, "period_mean": period_mean, "period_std": period_std, "jitter_cv": jitter_cv, "n": int(times.size)}
def slope_metrics(values: np.ndarray, times: np.ndarray) -> Dict[str, float]:
if values.size < 2:
return {"slope_p95": 0.0, "slope_p99": 0.0, "jerk_p95": 0.0}
dv = np.abs(np.diff(values))
dt = np.diff(times)
# vermeide Division durch 0
dt = np.where(dt <= 0, np.nan, dt)
slope = dv / dt
slope = slope[~np.isnan(slope)]
if slope.size == 0:
return {"slope_p95": 0.0, "slope_p99": 0.0, "jerk_p95": 0.0}
jerk = np.abs(np.diff(slope))
return {
"slope_p95": float(np.percentile(slope, 95)),
"slope_p99": float(np.percentile(slope, 99)),
"jerk_p95": float(np.percentile(jerk, 95)) if jerk.size > 0 else 0.0,
}
def prefilter(vals: np.ndarray) -> Tuple[bool, Dict[str, float]]:
if vals.size < 12:
return False, {"reason": "too_few_samples"}
uniq = np.unique(vals)
if uniq.size <= 2:
return False, {"reason": "too_constant"}
p95 = p_quant_abs_diff(vals, 0.95)
if p95 == 0:
return False, {"reason": "no_changes"}
r = float(np.percentile(vals, 97) - np.percentile(vals, 3) + 1e-9)
if p95 > 0.5 * r:
return False, {"reason": "too_jumpi"}
return True, {"p95_abs_diff": p95, "span_est": r}
def try_scaleset() -> List[float]:
base = [
1e-3, 2e-3, 5e-3,
1e-2, 2e-2, 5e-2,
0.05, 0.0625, 0.1, 0.125, 0.2, 0.25, 0.5,
0.75, 0.8, 1.0, 1.25, 2.0, 5.0, 10.0
]
return sorted(set(base))
def interval_best_offset(raw: np.ndarray, scale: float, rmin: float, rmax: float) -> Tuple[float, float]:
"""
Finde das Offset, das die meisten Werte (scale*raw + offset) in [rmin, rmax] bringt.
Sweep über Intervallgrenzen (klassische "interval stabbing" Lösung).
"""
a = rmin - scale * raw
b = rmax - scale * raw
lo = np.minimum(a, b)
hi = np.maximum(a, b)
events = []
for L, H in zip(lo, hi):
events.append((L, +1))
events.append((H, -1))
events.sort(key=lambda t: (t[0], -t[1]))
best = -1
cur = 0
best_x = None
for x, v in events:
cur += v
if cur > best:
best = cur
best_x = x
hit_ratio = float(best) / float(len(raw)) if len(raw) else 0.0
return float(best_x if best_x is not None else 0.0), hit_ratio
# ---------- Candidate Generation ----------
def gen_candidates(df: pd.DataFrame) -> Iterable[Tuple[str, np.ndarray, np.ndarray]]:
"""
Liefert (label, values, times) für:
- 8-bit Bytes D0..D7
- 16-bit adjazente Paare (LE/BE) + signed Varianten
Times wird auf die gefilterten Indizes gemappt (DLC-abhängig).
"""
times_all = df["time_s"].to_numpy(dtype=float)
data = df["data"].tolist()
# 8-bit
for i in range(8):
idx = [k for k, d in enumerate(data) if len(d) > i]
if len(idx) < 3:
continue
vals = np.array([data[k][i] for k in idx], dtype=float)
t = times_all[idx]
yield f"byte[{i}]", vals, t
# 16-bit adjazent
for i in range(7):
j = i + 1
idx = [k for k, d in enumerate(data) if len(d) > j]
if len(idx) < 3:
continue
a = [data[k][i] for k in idx]
b = [data[k][j] for k in idx]
u_le = np.array([le16(x, y) for x, y in zip(a, b)], dtype=float)
u_be = np.array([be16(x, y) for x, y in zip(a, b)], dtype=float)
s_le = np.array([s16(le16(x, y)) for x, y in zip(a, b)], dtype=float)
s_be = np.array([s16(be16(x, y)) for x, y in zip(a, b)], dtype=float)
t = times_all[idx]
yield f"le16[{i}-{j}]", u_le, t
yield f"be16[{i}-{j}]", u_be, t
yield f"le16s[{i}-{j}]", s_le, t
yield f"be16s[{i}-{j}]", s_be, t
# ---------- Evaluation ----------
def evaluate_supervised(label: str,
vals: np.ndarray,
times: np.ndarray,
rmin: float,
rmax: float,
allow_neg_scale: bool,
constraints: Dict[str, float]) -> Dict[str, float] | None:
ok, meta = prefilter(vals)
if not ok:
return None
scales = try_scaleset()
if allow_neg_scale:
scales += [-s for s in scales if s > 0]
best = {"hit_ratio": -1.0, "scale": None, "offset": 0.0}
for s in scales:
o, hr = interval_best_offset(vals, s, rmin, rmax)
if hr > best["hit_ratio"]:
best = {"scale": s, "offset": float(o), "hit_ratio": hr}
phys = vals * best["scale"] + best["offset"]
within = (phys >= rmin) & (phys <= rmax)
in_count = int(np.count_nonzero(within))
p95_raw = p_quant_abs_diff(vals, 0.95)
p95_phys = p_quant_abs_diff(phys, 0.95)
ia = interarrival_metrics(times[:len(vals)])
sm = slope_metrics(phys, times[:len(phys)])
prange = (rmax - rmin) if (rmax > rmin) else 1.0
slope_p95_frac = sm["slope_p95"] / prange
slope_p99_frac = sm["slope_p99"] / prange
failures = []
if constraints.get("rate_min") is not None and ia["rate_hz"] < constraints["rate_min"] - 1e-9:
failures.append(f"rate {ia['rate_hz']:.2f}Hz < min {constraints['rate_min']:.2f}Hz")
if constraints.get("rate_max") is not None and ia["rate_hz"] > constraints["rate_max"] + 1e-9:
failures.append(f"rate {ia['rate_hz']:.2f}Hz > max {constraints['rate_max']:.2f}Hz")
if constraints.get("jitter_max_ms") is not None:
jitter_ms = ia["period_std"] * 1000.0
if jitter_ms > constraints["jitter_max_ms"] + 1e-9:
failures.append(f"jitter {jitter_ms:.1f}ms > max {constraints['jitter_max_ms']:.1f}ms")
def _resolve_abs_slope_limit():
if constraints.get("max_slope_abs") is not None:
return constraints["max_slope_abs"]
if constraints.get("max_slope_frac") is not None:
return constraints["max_slope_frac"] * prange
return None
max_s_abs = _resolve_abs_slope_limit()
if max_s_abs is not None:
q = constraints.get("slope_quantile", 0.95)
qv = sm["slope_p95"] if q <= 0.95 else sm["slope_p99"]
if qv > max_s_abs + 1e-9:
failures.append(f"slope(q={q:.2f}) {qv:.3g} > max {max_s_abs:.3g}")
uniq_ratio = len(np.unique(vals)) / float(len(vals))
if constraints.get("min_uniq_ratio") is not None and uniq_ratio < constraints["min_uniq_ratio"] - 1e-9:
failures.append(f"uniq_ratio {uniq_ratio:.3f} < min {constraints['min_uniq_ratio']:.3f}")
passed = (len(failures) == 0)
# Quality Score
score = best["hit_ratio"]
if max_s_abs is not None and max_s_abs > 0:
slope_pen = min(sm["slope_p95"] / max_s_abs, 1.0)
score *= (1.0 - 0.3 * slope_pen)
if constraints.get("jitter_max_ms") is not None:
jitter_ms = ia["period_std"] * 1000.0
jitter_pen = min(jitter_ms / constraints["jitter_max_ms"], 1.0)
score *= (1.0 - 0.2 * jitter_pen)
return {
"label": label,
"mode": "range_fit",
"n": int(vals.size),
"raw_min": float(np.min(vals)),
"raw_max": float(np.max(vals)),
"raw_var": float(np.var(vals)),
"p95_absdiff_raw": float(p95_raw),
"scale": float(best["scale"]),
"offset": float(best["offset"]),
"hit_ratio": float(best["hit_ratio"]),
"in_count": in_count,
"phys_min": float(np.min(phys)),
"phys_max": float(np.max(phys)),
"p95_absdiff_phys": float(p95_phys),
"span_phys": float(np.percentile(phys, 97) - np.percentile(phys, 3)),
"rate_hz_est": float(ia["rate_hz"]),
"period_std_ms": float(ia["period_std"] * 1000.0),
"jitter_cv": float(ia["jitter_cv"]),
"slope_p95_per_s": float(sm["slope_p95"]),
"slope_p99_per_s": float(sm["slope_p99"]),
"slope_p95_frac": float(slope_p95_frac),
"slope_p99_frac": float(slope_p99_frac),
"uniq_ratio": float(uniq_ratio),
"passed": bool(passed),
"fail_reasons": "; ".join(failures),
"quality_score": float(score),
}
def evaluate_unsupervised(label: str,
vals: np.ndarray,
times: np.ndarray,
min_smooth: float = 0.2,
max_slope_frac_raw: float | None = None,
slope_quantile: float = 0.95) -> Dict[str, float] | None:
if vals.size < 12:
return None
p95 = p_quant_abs_diff(vals, 0.95)
span = float(np.percentile(vals, 97) - np.percentile(vals, 3) + 1e-9)
smooth = 1.0 - min(max(p95 / span, 0.0), 1.0)
uniq_ratio = float(len(np.unique(vals))) / float(vals.size)
var = float(np.var(vals))
ia = interarrival_metrics(times[:len(vals)])
sm = slope_metrics(vals, times[:len(vals)])
slope_q = sm["slope_p95"] if slope_quantile <= 0.95 else sm["slope_p99"]
slope_frac_raw = (slope_q / span) if span > 0 else 0.0
if uniq_ratio <= 0.02:
return None
if smooth < min_smooth:
return None
if (max_slope_frac_raw is not None) and (slope_frac_raw > max_slope_frac_raw):
return None
return {
"label": label,
"mode": "unsupervised",
"n": int(vals.size),
"raw_min": float(np.min(vals)),
"raw_max": float(np.max(vals)),
"raw_var": var,
"span_raw": span,
"p95_absdiff_raw": float(p95),
"smoothness": float(smooth),
"uniq_ratio": float(uniq_ratio),
"rate_hz_est": float(ia["rate_hz"]),
"period_std_ms": float(ia["period_std"] * 1000.0),
"jitter_cv": float(ia["jitter_cv"]),
"slope_q_raw": float(slope_q),
"slope_frac_raw": float(slope_frac_raw),
}
# ---------- Plot & Report ----------
def plot_timeseries(times: np.ndarray, series: np.ndarray, out_png: Path, title: str, ylabel: str) -> None:
plt.figure(figsize=(10, 4))
plt.plot(times[:len(series)], series, marker=".", linestyle="-")
plt.xlabel("Zeit (s)")
plt.ylabel(ylabel)
plt.title(title)
plt.grid(True)
plt.tight_layout()
out_png.parent.mkdir(parents=True, exist_ok=True)
plt.savefig(out_png, dpi=150)
plt.close()
def df_to_md_table(df: pd.DataFrame) -> str:
"""Robustes Markdown-Table: nutzt to_markdown falls vorhanden, sonst CSV in Codeblock."""
try:
return df.to_markdown(index=False) # benötigt evtl. 'tabulate'
except Exception:
return "```\n" + df.to_csv(index=False) + "```"
def write_report_md(path: Path, header: dict, top_rows: pd.DataFrame, failures: pd.DataFrame, mode: str, links: dict) -> None:
md = []
md.append(f"# Trace Report {header.get('trace_name','')}")
md.append("")
md.append(f"- **Mode:** {mode}")
for k, v in header.items():
if k in ("trace_name",):
continue
md.append(f"- **{k}**: {v}")
md.append("")
if mode == "range_fit":
md.append("## Top-Kandidaten (Range-Fit)")
md.append("Hit-Ratio, Slope/Jitter & Score beste zuerst.\n")
if top_rows is not None and not top_rows.empty:
md.append(df_to_md_table(top_rows))
else:
md.append("_Keine Kandidaten über Schwelle._")
md.append("")
if failures is not None and not failures.empty:
md.append("## Ausgeschlossene Kandidaten (Gründe)\n")
md.append(df_to_md_table(failures[["label", "fail_reasons"]]))
else:
md.append("## Top-Kandidaten (Unsupervised)\n")
if top_rows is not None and not top_rows.empty:
md.append(df_to_md_table(top_rows))
else:
md.append("_Keine plausiblen Rohsignale._")
md.append("\n## Artefakte")
for k, v in links.items():
md.append(f"- **{k}**: `{v}`")
path.write_text("\n".join(md), encoding="utf-8")
# ---------- Main ----------
def main():
ap = argparse.ArgumentParser(description="Range-/Unsupervised-Fit mit physikbasierten Constraints + Bericht")
ap.add_argument("trace", help="Pfad zur .trace Datei")
# supervision
ap.add_argument("--rmin", type=float, default=None)
ap.add_argument("--rmax", type=float, default=None)
ap.add_argument("--allow-neg-scale", action="store_true")
# shared
ap.add_argument("--rx-only", action="store_true")
ap.add_argument("--outdir", default=".")
ap.add_argument("--plots-top", type=int, default=8)
# supervised thresholds
ap.add_argument("--min-hit", type=float, default=0.5)
ap.add_argument("--rate-min", type=float, default=None)
ap.add_argument("--rate-max", type=float, default=None)
ap.add_argument("--jitter-max-ms", type=float, default=None)
ap.add_argument("--max-slope-abs", type=float, default=None, help="Max |Δphys|/s (z. B. °C/s, km/h/s)")
ap.add_argument("--max-slope-frac", type=float, default=None, help="Max |Δphys|/s relativ zu (rmax-rmin)")
ap.add_argument("--slope-quantile", type=float, default=0.95, help="0.95 oder 0.99")
ap.add_argument("--min-uniq-ratio", type=float, default=None)
# unsupervised thresholds
ap.add_argument("--min-smooth", type=float, default=0.2)
ap.add_argument("--max-slope-frac-raw", type=float, default=None, help="roh: (|Δraw|/s)/Span")
args = ap.parse_args()
trace = Path(args.trace)
df = parse_trace(trace, rx_only=args.rx_only)
if df.empty:
print("Keine Daten in Trace.", file=sys.stderr)
sys.exit(2)
supervised = (args.rmin is not None) and (args.rmax is not None)
outdir = Path(args.outdir)
outdir.mkdir(parents=True, exist_ok=True)
if supervised:
constraints = {
"rate_min": args.rate_min,
"rate_max": args.rate_max,
"jitter_max_ms": args.jitter_max_ms,
"max_slope_abs": args.max_slope_abs,
"max_slope_frac": args.max_slope_frac,
"slope_quantile": args.slope_quantile,
"min_uniq_ratio": args.min_uniq_ratio,
}
results = []
rejected = []
for label, series, times in gen_candidates(df):
r = evaluate_supervised(label, series, times, args.rmin, args.rmax, args.allow_neg_scale, constraints)
if r is None:
continue
if r["hit_ratio"] >= args.min_hit:
(results if r["passed"] else rejected).append({**r, "trace": trace.stem})
if not results and not rejected:
print("Keine Kandidaten über Schwelle gefunden.", file=sys.stderr)
sys.exit(3)
df_ok = pd.DataFrame(results).sort_values(
["quality_score", "hit_ratio", "p95_absdiff_phys", "rate_hz_est", "n"],
ascending=[False, False, True, False, False]
)
df_rej = pd.DataFrame(rejected)
csv_path = outdir / f"{trace.stem}_encoding_candidates.csv"
if not df_ok.empty:
df_ok.to_csv(csv_path, index=False)
print(f"Kandidaten-CSV: {csv_path}")
# Plots für Top-Kandidaten (oder Rejected, falls keine OK)
top_for_plots = df_ok if not df_ok.empty else df_rej
data = df["data"].tolist()
times_all = df["time_s"].to_numpy(dtype=float)
def reconstruct_vals(label: str) -> np.ndarray | None:
if label.startswith("byte["):
i = int(label.split("[")[1].split("]")[0])
idx = [k for k, d in enumerate(data) if len(d) > i]
if not idx: return None
return np.array([data[k][i] for k in idx], dtype=float), times_all[idx]
elif label.startswith(("le16", "be16", "le16s", "be16s")):
signed = label.startswith(("le16s", "be16s"))
i, j = map(int, label.split("[")[1].split("]")[0].split("-"))
idx = [k for k, d in enumerate(data) if len(d) > j]
if not idx: return None
a = [data[k][i] for k in idx]
b = [data[k][j] for k in idx]
if label.startswith("le16"):
v = [le16(x, y) for x, y in zip(a, b)]
else:
v = [be16(x, y) for x, y in zip(a, b)]
if signed:
v = [s16(int(x)) for x in v]
return np.array(v, dtype=float), times_all[idx]
return None
for _, row in top_for_plots.head(max(1, args.plots_top)).iterrows():
rec = reconstruct_vals(row["label"])
if rec is None:
continue
vals, tt = rec
phys = vals * row["scale"] + row["offset"]
out_png = outdir / f"{trace.stem}_{row['label'].replace('[','_').replace(']','')}.png"
plot_timeseries(tt[:len(phys)], phys, out_png,
f"{trace.name} {row['label']} (scale={row['scale']:.6g}, offset={row['offset']:.6g})",
"phys (geschätzt)")
# Bericht
hdr = {
"trace_name": trace.name,
"mode": "range_fit",
"rmin": args.rmin,
"rmax": args.rmax,
"min_hit": args.min_hit,
"rate_min": args.rate_min,
"rate_max": args.rate_max,
"jitter_max_ms": args.jitter_max_ms,
"max_slope_abs": args.max_slope_abs,
"max_slope_frac": args.max_slope_frac,
"slope_quantile": args.slope_quantile,
}
top_view = df_ok.head(12)[
["label", "quality_score", "hit_ratio", "scale", "offset",
"rate_hz_est", "period_std_ms", "slope_p95_per_s", "slope_p99_per_s",
"p95_absdiff_phys", "uniq_ratio"]
] if not df_ok.empty else pd.DataFrame()
fail_view = df_rej[["label", "fail_reasons"]] if not df_rej.empty else pd.DataFrame()
md_path = outdir / f"{trace.stem}_report.md"
json_path = outdir / f"{trace.stem}_report.json"
write_report_md(md_path, hdr, top_view, fail_view, "range_fit",
{"candidates_csv": str(csv_path) if not df_ok.empty else "(leer)"})
with open(json_path, "w", encoding="utf-8") as f:
json.dump({
"header": hdr,
"accepted": df_ok.to_dict(orient="records"),
"rejected": df_rej.to_dict(orient="records"),
}, f, ensure_ascii=False, indent=2)
print(f"Report: {md_path}")
print(f"Report JSON: {json_path}")
if not df_ok.empty:
print("\nTop-Kandidaten:")
cols = ["label", "quality_score", "hit_ratio", "scale", "offset",
"rate_hz_est", "period_std_ms", "slope_p95_per_s", "slope_p99_per_s"]
print(df_ok.head(10)[cols].to_string(index=False))
else:
print("\nKeine Kandidaten PASS; siehe Gründe in report.")
else:
# Unsupervised
results = []
for label, series, times in gen_candidates(df):
r = evaluate_unsupervised(label, series, times,
min_smooth=args.min_smooth,
max_slope_frac_raw=args.max_slope_frac_raw,
slope_quantile=args.slope_quantile)
if r is None:
continue
r["trace"] = trace.stem
results.append(r)
if not results:
print("Keine plausiblen Rohsignale gefunden. Tipp: --min-smooth senken.", file=sys.stderr)
sys.exit(3)
df_res = pd.DataFrame(results).sort_values(
["smoothness", "span_raw", "raw_var", "rate_hz_est", "n"],
ascending=[False, False, False, False, False]
)
csv_path = outdir / f"{trace.stem}_unsupervised_candidates.csv"
df_res.to_csv(csv_path, index=False)
print(f"Unsupervised-CSV: {csv_path}")
# Plots der Top-N (Rohwerte)
data = df["data"].tolist()
times_all = df["time_s"].to_numpy(dtype=float)
def reconstruct_raw(label: str) -> Tuple[np.ndarray, np.ndarray] | None:
if label.startswith("byte["):
i = int(label.split("[")[1].split("]")[0])
idx = [k for k, d in enumerate(data) if len(d) > i]
if not idx: return None
return np.array([data[k][i] for k in idx], dtype=float), times_all[idx]
elif label.startswith(("le16", "be16", "le16s", "be16s")):
signed = label.startswith(("le16s", "be16s"))
i, j = map(int, label.split("[")[1].split("]")[0].split("-"))
idx = [k for k, d in enumerate(data) if len(d) > j]
if not idx: return None
a = [data[k][i] for k in idx]
b = [data[k][j] for k in idx]
if label.startswith("le16"):
v = [le16(x, y) for x, y in zip(a, b)]
else:
v = [be16(x, y) for x, y in zip(a, b)]
if signed:
v = [s16(int(x)) for x in v]
return np.array(v, dtype=float), times_all[idx]
return None
for _, row in df_res.head(max(1, args.plots_top)).iterrows():
rec = reconstruct_raw(row["label"])
if rec is None:
continue
vals, tt = rec
out_png = outdir / f"{trace.stem}_{row['label'].replace('[','_').replace(']','')}_raw.png"
plot_timeseries(tt[:len(vals)], vals, out_png,
f"{trace.name} {row['label']} (raw)", "raw")
# Bericht
hdr = {
"trace_name": trace.name,
"mode": "unsupervised",
"min_smooth": args.min_smooth,
"max_slope_frac_raw": args.max_slope_frac_raw,
}
top_view = df_res.head(12)[
["label", "smoothness", "span_raw", "raw_var",
"rate_hz_est", "period_std_ms", "slope_frac_raw", "uniq_ratio"]
]
md_path = outdir / f"{trace.stem}_report.md"
json_path = outdir / f"{trace.stem}_report.json"
write_report_md(md_path, hdr, top_view, pd.DataFrame(), "unsupervised",
{"candidates_csv": str(csv_path)})
with open(json_path, "w", encoding="utf-8") as f:
json.dump({
"header": hdr,
"accepted": df_res.to_dict(orient="records"),
}, f, ensure_ascii=False, indent=2)
print(f"Report: {md_path}")
print(f"Report JSON: {json_path}")
if __name__ == "__main__":
main()