#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ trace_signal_fitter.py – Advanced Range-/Unsupervised-Fit mit Physik-Constraints & Bericht Modi: 1) Range-Fit (supervised): --rmin/--rmax gesetzt → finde scale & offset, maximiere Hit-Ratio in [rmin, rmax]. 2) Unsupervised: ohne Range → plausible Rohsignale nach Smoothness/Var/Rate/Span. Neu: - Periodizität: Rate (Hz), Jitter (std der Inter-Arrival-Times), CV. - Slew-Rate: p95/p99 von |Δ|/s (supervised in phys-Einheit, unsupervised normiert auf Roh-Span). - Grenzwerte als Argumente (--rate-min/max, --jitter-max-ms, --max-slope-abs, --max-slope-frac, ...). - Zusätzlich signed 16-bit Varianten (le16s/be16s). - JSON + Markdown-Bericht pro Trace mit PASS/FAIL und Begründungen. Logformat (Kettenöler): 0x ... Outputs: - supervised: _encoding_candidates.csv, Plots, _report.md, _report.json - unsupervised: _unsupervised_candidates.csv, Plots, _report.md, _report.json """ from __future__ import annotations import sys import json import argparse from pathlib import Path from typing import List, Tuple, Dict, Iterable import numpy as np import pandas as pd import matplotlib.pyplot as plt # ---------- Parsing ---------- def parse_trace(path: Path, rx_only: bool = False) -> pd.DataFrame: """ Robustes Parsen des Kettenöler-Formats: 0x ... (hex) """ rows = [] with open(path, "r", errors="ignore") as f: for line in f: parts = line.strip().split() if len(parts) < 4: continue try: ts = int(parts[0]) dr = parts[1] if rx_only and dr != "RX": continue cid = int(parts[2], 16) if parts[2].lower().startswith("0x") else int(parts[2], 16) dlc = int(parts[3]) bytes_hex = parts[4:4+dlc] if dlc > 0 else [] data = [] for b in bytes_hex: try: data.append(int(b, 16)) except Exception: data.append(0) rows.append((ts, dr, cid, data)) except Exception: continue df = pd.DataFrame(rows, columns=["ts", "dir", "id", "data"]) if df.empty: return df df["time_s"] = (df["ts"] - df["ts"].min()) / 1000.0 return df # ---------- Helpers ---------- def be16(a: int, b: int) -> int: return (a << 8) | b def le16(a: int, b: int) -> int: return a | (b << 8) def s16(u: int) -> int: return u if u < 0x8000 else u - 0x10000 def p_quant_abs_diff(arr: np.ndarray, q: float) -> float: if arr.size < 2: return 0.0 d = np.abs(np.diff(arr)) return float(np.percentile(d, q * 100)) def p_quant(arr: np.ndarray, q: float) -> float: if arr.size == 0: return 0.0 return float(np.percentile(arr, q * 100)) def interarrival_metrics(times: np.ndarray) -> Dict[str, float]: if times.size < 2: return {"rate_hz": 0.0, "period_mean": 0.0, "period_std": 0.0, "jitter_cv": 0.0, "n": int(times.size)} dt = np.diff(times) period_mean = float(np.mean(dt)) period_std = float(np.std(dt)) rate_hz = 1.0 / period_mean if period_mean > 0 else 0.0 jitter_cv = (period_std / period_mean) if period_mean > 0 else 0.0 return {"rate_hz": rate_hz, "period_mean": period_mean, "period_std": period_std, "jitter_cv": jitter_cv, "n": int(times.size)} def slope_metrics(values: np.ndarray, times: np.ndarray) -> Dict[str, float]: if values.size < 2: return {"slope_p95": 0.0, "slope_p99": 0.0, "jerk_p95": 0.0} dv = np.abs(np.diff(values)) dt = np.diff(times) # vermeide Division durch 0 dt = np.where(dt <= 0, np.nan, dt) slope = dv / dt slope = slope[~np.isnan(slope)] if slope.size == 0: return {"slope_p95": 0.0, "slope_p99": 0.0, "jerk_p95": 0.0} jerk = np.abs(np.diff(slope)) return { "slope_p95": float(np.percentile(slope, 95)), "slope_p99": float(np.percentile(slope, 99)), "jerk_p95": float(np.percentile(jerk, 95)) if jerk.size > 0 else 0.0, } def prefilter(vals: np.ndarray) -> Tuple[bool, Dict[str, float]]: if vals.size < 12: return False, {"reason": "too_few_samples"} uniq = np.unique(vals) if uniq.size <= 2: return False, {"reason": "too_constant"} p95 = p_quant_abs_diff(vals, 0.95) if p95 == 0: return False, {"reason": "no_changes"} r = float(np.percentile(vals, 97) - np.percentile(vals, 3) + 1e-9) if p95 > 0.5 * r: return False, {"reason": "too_jumpi"} return True, {"p95_abs_diff": p95, "span_est": r} def try_scaleset() -> List[float]: base = [ 1e-3, 2e-3, 5e-3, 1e-2, 2e-2, 5e-2, 0.05, 0.0625, 0.1, 0.125, 0.2, 0.25, 0.5, 0.75, 0.8, 1.0, 1.25, 2.0, 5.0, 10.0 ] return sorted(set(base)) def interval_best_offset(raw: np.ndarray, scale: float, rmin: float, rmax: float) -> Tuple[float, float]: """ Finde das Offset, das die meisten Werte (scale*raw + offset) in [rmin, rmax] bringt. Sweep über Intervallgrenzen (klassische "interval stabbing" Lösung). """ a = rmin - scale * raw b = rmax - scale * raw lo = np.minimum(a, b) hi = np.maximum(a, b) events = [] for L, H in zip(lo, hi): events.append((L, +1)) events.append((H, -1)) events.sort(key=lambda t: (t[0], -t[1])) best = -1 cur = 0 best_x = None for x, v in events: cur += v if cur > best: best = cur best_x = x hit_ratio = float(best) / float(len(raw)) if len(raw) else 0.0 return float(best_x if best_x is not None else 0.0), hit_ratio # ---------- Candidate Generation ---------- def gen_candidates(df: pd.DataFrame) -> Iterable[Tuple[str, np.ndarray, np.ndarray]]: """ Liefert (label, values, times) für: - 8-bit Bytes D0..D7 - 16-bit adjazente Paare (LE/BE) + signed Varianten Times wird auf die gefilterten Indizes gemappt (DLC-abhängig). """ times_all = df["time_s"].to_numpy(dtype=float) data = df["data"].tolist() # 8-bit for i in range(8): idx = [k for k, d in enumerate(data) if len(d) > i] if len(idx) < 3: continue vals = np.array([data[k][i] for k in idx], dtype=float) t = times_all[idx] yield f"byte[{i}]", vals, t # 16-bit adjazent for i in range(7): j = i + 1 idx = [k for k, d in enumerate(data) if len(d) > j] if len(idx) < 3: continue a = [data[k][i] for k in idx] b = [data[k][j] for k in idx] u_le = np.array([le16(x, y) for x, y in zip(a, b)], dtype=float) u_be = np.array([be16(x, y) for x, y in zip(a, b)], dtype=float) s_le = np.array([s16(le16(x, y)) for x, y in zip(a, b)], dtype=float) s_be = np.array([s16(be16(x, y)) for x, y in zip(a, b)], dtype=float) t = times_all[idx] yield f"le16[{i}-{j}]", u_le, t yield f"be16[{i}-{j}]", u_be, t yield f"le16s[{i}-{j}]", s_le, t yield f"be16s[{i}-{j}]", s_be, t # ---------- Evaluation ---------- def evaluate_supervised(label: str, vals: np.ndarray, times: np.ndarray, rmin: float, rmax: float, allow_neg_scale: bool, constraints: Dict[str, float]) -> Dict[str, float] | None: ok, meta = prefilter(vals) if not ok: return None scales = try_scaleset() if allow_neg_scale: scales += [-s for s in scales if s > 0] best = {"hit_ratio": -1.0, "scale": None, "offset": 0.0} for s in scales: o, hr = interval_best_offset(vals, s, rmin, rmax) if hr > best["hit_ratio"]: best = {"scale": s, "offset": float(o), "hit_ratio": hr} phys = vals * best["scale"] + best["offset"] within = (phys >= rmin) & (phys <= rmax) in_count = int(np.count_nonzero(within)) p95_raw = p_quant_abs_diff(vals, 0.95) p95_phys = p_quant_abs_diff(phys, 0.95) ia = interarrival_metrics(times[:len(vals)]) sm = slope_metrics(phys, times[:len(phys)]) prange = (rmax - rmin) if (rmax > rmin) else 1.0 slope_p95_frac = sm["slope_p95"] / prange slope_p99_frac = sm["slope_p99"] / prange failures = [] if constraints.get("rate_min") is not None and ia["rate_hz"] < constraints["rate_min"] - 1e-9: failures.append(f"rate {ia['rate_hz']:.2f}Hz < min {constraints['rate_min']:.2f}Hz") if constraints.get("rate_max") is not None and ia["rate_hz"] > constraints["rate_max"] + 1e-9: failures.append(f"rate {ia['rate_hz']:.2f}Hz > max {constraints['rate_max']:.2f}Hz") if constraints.get("jitter_max_ms") is not None: jitter_ms = ia["period_std"] * 1000.0 if jitter_ms > constraints["jitter_max_ms"] + 1e-9: failures.append(f"jitter {jitter_ms:.1f}ms > max {constraints['jitter_max_ms']:.1f}ms") def _resolve_abs_slope_limit(): if constraints.get("max_slope_abs") is not None: return constraints["max_slope_abs"] if constraints.get("max_slope_frac") is not None: return constraints["max_slope_frac"] * prange return None max_s_abs = _resolve_abs_slope_limit() if max_s_abs is not None: q = constraints.get("slope_quantile", 0.95) qv = sm["slope_p95"] if q <= 0.95 else sm["slope_p99"] if qv > max_s_abs + 1e-9: failures.append(f"slope(q={q:.2f}) {qv:.3g} > max {max_s_abs:.3g}") uniq_ratio = len(np.unique(vals)) / float(len(vals)) if constraints.get("min_uniq_ratio") is not None and uniq_ratio < constraints["min_uniq_ratio"] - 1e-9: failures.append(f"uniq_ratio {uniq_ratio:.3f} < min {constraints['min_uniq_ratio']:.3f}") passed = (len(failures) == 0) # Quality Score score = best["hit_ratio"] if max_s_abs is not None and max_s_abs > 0: slope_pen = min(sm["slope_p95"] / max_s_abs, 1.0) score *= (1.0 - 0.3 * slope_pen) if constraints.get("jitter_max_ms") is not None: jitter_ms = ia["period_std"] * 1000.0 jitter_pen = min(jitter_ms / constraints["jitter_max_ms"], 1.0) score *= (1.0 - 0.2 * jitter_pen) return { "label": label, "mode": "range_fit", "n": int(vals.size), "raw_min": float(np.min(vals)), "raw_max": float(np.max(vals)), "raw_var": float(np.var(vals)), "p95_absdiff_raw": float(p95_raw), "scale": float(best["scale"]), "offset": float(best["offset"]), "hit_ratio": float(best["hit_ratio"]), "in_count": in_count, "phys_min": float(np.min(phys)), "phys_max": float(np.max(phys)), "p95_absdiff_phys": float(p95_phys), "span_phys": float(np.percentile(phys, 97) - np.percentile(phys, 3)), "rate_hz_est": float(ia["rate_hz"]), "period_std_ms": float(ia["period_std"] * 1000.0), "jitter_cv": float(ia["jitter_cv"]), "slope_p95_per_s": float(sm["slope_p95"]), "slope_p99_per_s": float(sm["slope_p99"]), "slope_p95_frac": float(slope_p95_frac), "slope_p99_frac": float(slope_p99_frac), "uniq_ratio": float(uniq_ratio), "passed": bool(passed), "fail_reasons": "; ".join(failures), "quality_score": float(score), } def evaluate_unsupervised(label: str, vals: np.ndarray, times: np.ndarray, min_smooth: float = 0.2, max_slope_frac_raw: float | None = None, slope_quantile: float = 0.95) -> Dict[str, float] | None: if vals.size < 12: return None p95 = p_quant_abs_diff(vals, 0.95) span = float(np.percentile(vals, 97) - np.percentile(vals, 3) + 1e-9) smooth = 1.0 - min(max(p95 / span, 0.0), 1.0) uniq_ratio = float(len(np.unique(vals))) / float(vals.size) var = float(np.var(vals)) ia = interarrival_metrics(times[:len(vals)]) sm = slope_metrics(vals, times[:len(vals)]) slope_q = sm["slope_p95"] if slope_quantile <= 0.95 else sm["slope_p99"] slope_frac_raw = (slope_q / span) if span > 0 else 0.0 if uniq_ratio <= 0.02: return None if smooth < min_smooth: return None if (max_slope_frac_raw is not None) and (slope_frac_raw > max_slope_frac_raw): return None return { "label": label, "mode": "unsupervised", "n": int(vals.size), "raw_min": float(np.min(vals)), "raw_max": float(np.max(vals)), "raw_var": var, "span_raw": span, "p95_absdiff_raw": float(p95), "smoothness": float(smooth), "uniq_ratio": float(uniq_ratio), "rate_hz_est": float(ia["rate_hz"]), "period_std_ms": float(ia["period_std"] * 1000.0), "jitter_cv": float(ia["jitter_cv"]), "slope_q_raw": float(slope_q), "slope_frac_raw": float(slope_frac_raw), } # ---------- Plot & Report ---------- def plot_timeseries(times: np.ndarray, series: np.ndarray, out_png: Path, title: str, ylabel: str) -> None: plt.figure(figsize=(10, 4)) plt.plot(times[:len(series)], series, marker=".", linestyle="-") plt.xlabel("Zeit (s)") plt.ylabel(ylabel) plt.title(title) plt.grid(True) plt.tight_layout() out_png.parent.mkdir(parents=True, exist_ok=True) plt.savefig(out_png, dpi=150) plt.close() def df_to_md_table(df: pd.DataFrame) -> str: """Robustes Markdown-Table: nutzt to_markdown falls vorhanden, sonst CSV in Codeblock.""" try: return df.to_markdown(index=False) # benötigt evtl. 'tabulate' except Exception: return "```\n" + df.to_csv(index=False) + "```" def write_report_md(path: Path, header: dict, top_rows: pd.DataFrame, failures: pd.DataFrame, mode: str, links: dict) -> None: md = [] md.append(f"# Trace Report – {header.get('trace_name','')}") md.append("") md.append(f"- **Mode:** {mode}") for k, v in header.items(): if k in ("trace_name",): continue md.append(f"- **{k}**: {v}") md.append("") if mode == "range_fit": md.append("## Top-Kandidaten (Range-Fit)") md.append("Hit-Ratio, Slope/Jitter & Score – beste zuerst.\n") if top_rows is not None and not top_rows.empty: md.append(df_to_md_table(top_rows)) else: md.append("_Keine Kandidaten über Schwelle._") md.append("") if failures is not None and not failures.empty: md.append("## Ausgeschlossene Kandidaten (Gründe)\n") md.append(df_to_md_table(failures[["label", "fail_reasons"]])) else: md.append("## Top-Kandidaten (Unsupervised)\n") if top_rows is not None and not top_rows.empty: md.append(df_to_md_table(top_rows)) else: md.append("_Keine plausiblen Rohsignale._") md.append("\n## Artefakte") for k, v in links.items(): md.append(f"- **{k}**: `{v}`") path.write_text("\n".join(md), encoding="utf-8") # ---------- Main ---------- def main(): ap = argparse.ArgumentParser(description="Range-/Unsupervised-Fit mit physikbasierten Constraints + Bericht") ap.add_argument("trace", help="Pfad zur .trace Datei") # supervision ap.add_argument("--rmin", type=float, default=None) ap.add_argument("--rmax", type=float, default=None) ap.add_argument("--allow-neg-scale", action="store_true") # shared ap.add_argument("--rx-only", action="store_true") ap.add_argument("--outdir", default=".") ap.add_argument("--plots-top", type=int, default=8) # supervised thresholds ap.add_argument("--min-hit", type=float, default=0.5) ap.add_argument("--rate-min", type=float, default=None) ap.add_argument("--rate-max", type=float, default=None) ap.add_argument("--jitter-max-ms", type=float, default=None) ap.add_argument("--max-slope-abs", type=float, default=None, help="Max |Δphys|/s (z. B. °C/s, km/h/s)") ap.add_argument("--max-slope-frac", type=float, default=None, help="Max |Δphys|/s relativ zu (rmax-rmin)") ap.add_argument("--slope-quantile", type=float, default=0.95, help="0.95 oder 0.99") ap.add_argument("--min-uniq-ratio", type=float, default=None) # unsupervised thresholds ap.add_argument("--min-smooth", type=float, default=0.2) ap.add_argument("--max-slope-frac-raw", type=float, default=None, help="roh: (|Δraw|/s)/Span") args = ap.parse_args() trace = Path(args.trace) df = parse_trace(trace, rx_only=args.rx_only) if df.empty: print("Keine Daten in Trace.", file=sys.stderr) sys.exit(2) supervised = (args.rmin is not None) and (args.rmax is not None) outdir = Path(args.outdir) outdir.mkdir(parents=True, exist_ok=True) if supervised: constraints = { "rate_min": args.rate_min, "rate_max": args.rate_max, "jitter_max_ms": args.jitter_max_ms, "max_slope_abs": args.max_slope_abs, "max_slope_frac": args.max_slope_frac, "slope_quantile": args.slope_quantile, "min_uniq_ratio": args.min_uniq_ratio, } results = [] rejected = [] for label, series, times in gen_candidates(df): r = evaluate_supervised(label, series, times, args.rmin, args.rmax, args.allow_neg_scale, constraints) if r is None: continue if r["hit_ratio"] >= args.min_hit: (results if r["passed"] else rejected).append({**r, "trace": trace.stem}) if not results and not rejected: print("Keine Kandidaten über Schwelle gefunden.", file=sys.stderr) sys.exit(3) df_ok = pd.DataFrame(results).sort_values( ["quality_score", "hit_ratio", "p95_absdiff_phys", "rate_hz_est", "n"], ascending=[False, False, True, False, False] ) df_rej = pd.DataFrame(rejected) csv_path = outdir / f"{trace.stem}_encoding_candidates.csv" if not df_ok.empty: df_ok.to_csv(csv_path, index=False) print(f"Kandidaten-CSV: {csv_path}") # Plots für Top-Kandidaten (oder Rejected, falls keine OK) top_for_plots = df_ok if not df_ok.empty else df_rej data = df["data"].tolist() times_all = df["time_s"].to_numpy(dtype=float) def reconstruct_vals(label: str) -> np.ndarray | None: if label.startswith("byte["): i = int(label.split("[")[1].split("]")[0]) idx = [k for k, d in enumerate(data) if len(d) > i] if not idx: return None return np.array([data[k][i] for k in idx], dtype=float), times_all[idx] elif label.startswith(("le16", "be16", "le16s", "be16s")): signed = label.startswith(("le16s", "be16s")) i, j = map(int, label.split("[")[1].split("]")[0].split("-")) idx = [k for k, d in enumerate(data) if len(d) > j] if not idx: return None a = [data[k][i] for k in idx] b = [data[k][j] for k in idx] if label.startswith("le16"): v = [le16(x, y) for x, y in zip(a, b)] else: v = [be16(x, y) for x, y in zip(a, b)] if signed: v = [s16(int(x)) for x in v] return np.array(v, dtype=float), times_all[idx] return None for _, row in top_for_plots.head(max(1, args.plots_top)).iterrows(): rec = reconstruct_vals(row["label"]) if rec is None: continue vals, tt = rec phys = vals * row["scale"] + row["offset"] out_png = outdir / f"{trace.stem}_{row['label'].replace('[','_').replace(']','')}.png" plot_timeseries(tt[:len(phys)], phys, out_png, f"{trace.name} – {row['label']} (scale={row['scale']:.6g}, offset={row['offset']:.6g})", "phys (geschätzt)") # Bericht hdr = { "trace_name": trace.name, "mode": "range_fit", "rmin": args.rmin, "rmax": args.rmax, "min_hit": args.min_hit, "rate_min": args.rate_min, "rate_max": args.rate_max, "jitter_max_ms": args.jitter_max_ms, "max_slope_abs": args.max_slope_abs, "max_slope_frac": args.max_slope_frac, "slope_quantile": args.slope_quantile, } top_view = df_ok.head(12)[ ["label", "quality_score", "hit_ratio", "scale", "offset", "rate_hz_est", "period_std_ms", "slope_p95_per_s", "slope_p99_per_s", "p95_absdiff_phys", "uniq_ratio"] ] if not df_ok.empty else pd.DataFrame() fail_view = df_rej[["label", "fail_reasons"]] if not df_rej.empty else pd.DataFrame() md_path = outdir / f"{trace.stem}_report.md" json_path = outdir / f"{trace.stem}_report.json" write_report_md(md_path, hdr, top_view, fail_view, "range_fit", {"candidates_csv": str(csv_path) if not df_ok.empty else "(leer)"}) with open(json_path, "w", encoding="utf-8") as f: json.dump({ "header": hdr, "accepted": df_ok.to_dict(orient="records"), "rejected": df_rej.to_dict(orient="records"), }, f, ensure_ascii=False, indent=2) print(f"Report: {md_path}") print(f"Report JSON: {json_path}") if not df_ok.empty: print("\nTop-Kandidaten:") cols = ["label", "quality_score", "hit_ratio", "scale", "offset", "rate_hz_est", "period_std_ms", "slope_p95_per_s", "slope_p99_per_s"] print(df_ok.head(10)[cols].to_string(index=False)) else: print("\nKeine Kandidaten PASS; siehe Gründe in report.") else: # Unsupervised results = [] for label, series, times in gen_candidates(df): r = evaluate_unsupervised(label, series, times, min_smooth=args.min_smooth, max_slope_frac_raw=args.max_slope_frac_raw, slope_quantile=args.slope_quantile) if r is None: continue r["trace"] = trace.stem results.append(r) if not results: print("Keine plausiblen Rohsignale gefunden. Tipp: --min-smooth senken.", file=sys.stderr) sys.exit(3) df_res = pd.DataFrame(results).sort_values( ["smoothness", "span_raw", "raw_var", "rate_hz_est", "n"], ascending=[False, False, False, False, False] ) csv_path = outdir / f"{trace.stem}_unsupervised_candidates.csv" df_res.to_csv(csv_path, index=False) print(f"Unsupervised-CSV: {csv_path}") # Plots der Top-N (Rohwerte) data = df["data"].tolist() times_all = df["time_s"].to_numpy(dtype=float) def reconstruct_raw(label: str) -> Tuple[np.ndarray, np.ndarray] | None: if label.startswith("byte["): i = int(label.split("[")[1].split("]")[0]) idx = [k for k, d in enumerate(data) if len(d) > i] if not idx: return None return np.array([data[k][i] for k in idx], dtype=float), times_all[idx] elif label.startswith(("le16", "be16", "le16s", "be16s")): signed = label.startswith(("le16s", "be16s")) i, j = map(int, label.split("[")[1].split("]")[0].split("-")) idx = [k for k, d in enumerate(data) if len(d) > j] if not idx: return None a = [data[k][i] for k in idx] b = [data[k][j] for k in idx] if label.startswith("le16"): v = [le16(x, y) for x, y in zip(a, b)] else: v = [be16(x, y) for x, y in zip(a, b)] if signed: v = [s16(int(x)) for x in v] return np.array(v, dtype=float), times_all[idx] return None for _, row in df_res.head(max(1, args.plots_top)).iterrows(): rec = reconstruct_raw(row["label"]) if rec is None: continue vals, tt = rec out_png = outdir / f"{trace.stem}_{row['label'].replace('[','_').replace(']','')}_raw.png" plot_timeseries(tt[:len(vals)], vals, out_png, f"{trace.name} – {row['label']} (raw)", "raw") # Bericht hdr = { "trace_name": trace.name, "mode": "unsupervised", "min_smooth": args.min_smooth, "max_slope_frac_raw": args.max_slope_frac_raw, } top_view = df_res.head(12)[ ["label", "smoothness", "span_raw", "raw_var", "rate_hz_est", "period_std_ms", "slope_frac_raw", "uniq_ratio"] ] md_path = outdir / f"{trace.stem}_report.md" json_path = outdir / f"{trace.stem}_report.json" write_report_md(md_path, hdr, top_view, pd.DataFrame(), "unsupervised", {"candidates_csv": str(csv_path)}) with open(json_path, "w", encoding="utf-8") as f: json.dump({ "header": hdr, "accepted": df_res.to_dict(orient="records"), }, f, ensure_ascii=False, indent=2) print(f"Report: {md_path}") print(f"Report JSON: {json_path}") if __name__ == "__main__": main()