#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
trace_signal_fitter.py – advanced range/unsupervised fit with physics constraints & report

Modes:
1) Range fit (supervised): --rmin/--rmax given → find scale & offset that maximize the hit ratio in [rmin, rmax].
2) Unsupervised: no range given → rank plausible raw signals by smoothness/variance/rate/span.

New:
- Periodicity: rate (Hz), jitter (std of the inter-arrival times), CV.
- Slew rate: p95/p99 of |Δ|/s (supervised in physical units, unsupervised normalized to the raw span).
- Limits as arguments (--rate-min/max, --jitter-max-ms, --max-slope-abs, --max-slope-frac, ...).
- Additionally, signed 16-bit variants (le16s/be16s).
- JSON + Markdown report per trace with PASS/FAIL and reasons.

Log format (Kettenöler):
<timestamp_ms> <TX|RX> 0x<ID_HEX> <DLC> <byte0> <byte1> ... <byte7>

Outputs:
- supervised: <trace>_encoding_candidates.csv, plots, <trace>_report.md, <trace>_report.json
- unsupervised: <trace>_unsupervised_candidates.csv, plots, <trace>_report.md, <trace>_report.json
"""
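
# Illustrative invocations (file names are hypothetical; the flags are the ones
# defined in main() below):
#
#   Range fit, e.g. a coolant temperature expected in [-40, 120] °C:
#     python3 trace_signal_fitter.py ride.trace --rmin -40 --rmax 120 \
#         --rate-min 1 --rate-max 100 --jitter-max-ms 50 --max-slope-frac 0.05
#
#   Unsupervised scan of all raw byte/word candidates:
#     python3 trace_signal_fitter.py ride.trace --min-smooth 0.3 --outdir out/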

from __future__ import annotations

import sys
import json
import argparse
from pathlib import Path
from typing import List, Tuple, Dict, Iterable

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt


# ---------- Parsing ----------

def parse_trace(path: Path, rx_only: bool = False) -> pd.DataFrame:
    """
    Robust parsing of the Kettenöler format:
    <ts_ms> <TX|RX> 0x<ID> <DLC> <b0> <b1> ... (hex)
    """
    rows = []
    with open(path, "r", errors="ignore") as f:
        for line in f:
            parts = line.strip().split()
            if len(parts) < 4:
                continue
            try:
                ts = int(parts[0])
                dr = parts[1]
                if rx_only and dr != "RX":
                    continue
                cid = int(parts[2], 16)  # int(x, 16) accepts both "0x12F" and "12F"
                dlc = int(parts[3])
                bytes_hex = parts[4:4+dlc] if dlc > 0 else []
                data = []
                for b in bytes_hex:
                    try:
                        data.append(int(b, 16))
                    except Exception:
                        data.append(0)
                rows.append((ts, dr, cid, data))
            except Exception:
                continue

    df = pd.DataFrame(rows, columns=["ts", "dir", "id", "data"])
    if df.empty:
        return df
    df["time_s"] = (df["ts"] - df["ts"].min()) / 1000.0
    return df
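
# What parse_trace() yields for one well-formed line (illustrative values):
#
#   "1234 RX 0x301 8 12 34 56 78 9A BC DE F0"
#   → ts=1234, dir="RX", id=0x301 (769 decimal),
#     data=[0x12, 0x34, 0x56, 0x78, 0x9A, 0xBC, 0xDE, 0xF0]
#
# time_s rebases the millisecond timestamps to the first frame and converts to seconds.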


# ---------- Helpers ----------

def be16(a: int, b: int) -> int: return (a << 8) | b
def le16(a: int, b: int) -> int: return a | (b << 8)
def s16(u: int) -> int: return u if u < 0x8000 else u - 0x10000
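
# Quick sanity checks for the byte-order helpers (illustrative):
#   be16(0x12, 0x34) == 0x1234   # first byte is the high byte
#   le16(0x12, 0x34) == 0x3412   # first byte is the low byte
#   s16(0xFFFF)      == -1       # two's-complement reinterpretation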

def p_quant_abs_diff(arr: np.ndarray, q: float) -> float:
    if arr.size < 2:
        return 0.0
    d = np.abs(np.diff(arr))
    return float(np.percentile(d, q * 100))

def p_quant(arr: np.ndarray, q: float) -> float:
    if arr.size == 0:
        return 0.0
    return float(np.percentile(arr, q * 100))

def interarrival_metrics(times: np.ndarray) -> Dict[str, float]:
    if times.size < 2:
        return {"rate_hz": 0.0, "period_mean": 0.0, "period_std": 0.0, "jitter_cv": 0.0, "n": int(times.size)}
    dt = np.diff(times)
    period_mean = float(np.mean(dt))
    period_std = float(np.std(dt))
    rate_hz = 1.0 / period_mean if period_mean > 0 else 0.0
    jitter_cv = (period_std / period_mean) if period_mean > 0 else 0.0
    return {"rate_hz": rate_hz, "period_mean": period_mean, "period_std": period_std, "jitter_cv": jitter_cv, "n": int(times.size)}
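
# Worked example (illustrative): times = [0.0, 0.1, 0.2, 0.3] s
#   dt = [0.1, 0.1, 0.1] → period_mean = 0.1 s, period_std = 0.0
#   → rate_hz = 10.0, jitter_cv = 0.0 (a perfectly periodic 10 Hz signal)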

def slope_metrics(values: np.ndarray, times: np.ndarray) -> Dict[str, float]:
    if values.size < 2:
        return {"slope_p95": 0.0, "slope_p99": 0.0, "jerk_p95": 0.0}
    dv = np.abs(np.diff(values))
    dt = np.diff(times)
    # avoid division by zero
    dt = np.where(dt <= 0, np.nan, dt)
    slope = dv / dt
    slope = slope[~np.isnan(slope)]
    if slope.size == 0:
        return {"slope_p95": 0.0, "slope_p99": 0.0, "jerk_p95": 0.0}
    jerk = np.abs(np.diff(slope))
    return {
        "slope_p95": float(np.percentile(slope, 95)),
        "slope_p99": float(np.percentile(slope, 99)),
        "jerk_p95": float(np.percentile(jerk, 95)) if jerk.size > 0 else 0.0,
    }
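
# Worked example (illustrative): values = [0, 1, 3], times = [0, 1, 2] s
#   |Δv|/Δt = [1.0, 2.0] per second → slope_p95 = 1.95, slope_p99 = 1.99
#   (np.percentile interpolates linearly); jerk = |diff(slope)| = [1.0].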

def prefilter(vals: np.ndarray) -> Tuple[bool, Dict[str, float]]:
    if vals.size < 12:
        return False, {"reason": "too_few_samples"}
    uniq = np.unique(vals)
    if uniq.size <= 2:
        return False, {"reason": "too_constant"}
    p95 = p_quant_abs_diff(vals, 0.95)
    if p95 == 0:
        return False, {"reason": "no_changes"}
    r = float(np.percentile(vals, 97) - np.percentile(vals, 3) + 1e-9)
    if p95 > 0.5 * r:
        return False, {"reason": "too_jumpy"}
    return True, {"p95_abs_diff": p95, "span_est": r}

def try_scaleset() -> List[float]:
    base = [
        1e-3, 2e-3, 5e-3,
        1e-2, 2e-2, 5e-2,
        0.05, 0.0625, 0.1, 0.125, 0.2, 0.25, 0.5,
        0.75, 0.8, 1.0, 1.25, 2.0, 5.0, 10.0
    ]
    return sorted(set(base))

def interval_best_offset(raw: np.ndarray, scale: float, rmin: float, rmax: float) -> Tuple[float, float]:
    """
    Find the offset that brings the most values (scale*raw + offset) into [rmin, rmax].
    Sweep over the interval bounds (the classic "interval stabbing" solution).
    """
    a = rmin - scale * raw
    b = rmax - scale * raw
    lo = np.minimum(a, b)
    hi = np.maximum(a, b)
    events = []
    for L, H in zip(lo, hi):
        events.append((L, +1))
        events.append((H, -1))
    events.sort(key=lambda t: (t[0], -t[1]))
    best = -1
    cur = 0
    best_x = None
    for x, v in events:
        cur += v
        if cur > best:
            best = cur
            best_x = x
    hit_ratio = float(best) / float(len(raw)) if len(raw) else 0.0
    return float(best_x if best_x is not None else 0.0), hit_ratio
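
# Worked example of the interval sweep (illustrative numbers):
#   raw = [10, 20, 30], scale = 0.5, rmin = 0, rmax = 10
#   Each sample k admits offsets in [rmin - scale*raw[k], rmax - scale*raw[k]]:
#     [-5, 5], [-10, 0], [-15, -5]
#   The sweep finds offset = -5, which lies in all three intervals → hit_ratio = 1.0.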


# ---------- Candidate Generation ----------

def gen_candidates(df: pd.DataFrame) -> Iterable[Tuple[str, np.ndarray, np.ndarray]]:
    """
    Yields (label, values, times) for:
    - 8-bit bytes D0..D7
    - 16-bit adjacent pairs (LE/BE) plus signed variants
    Times are mapped onto the filtered indices (DLC-dependent).
    """
    times_all = df["time_s"].to_numpy(dtype=float)
    data = df["data"].tolist()

    # 8-bit
    for i in range(8):
        idx = [k for k, d in enumerate(data) if len(d) > i]
        if len(idx) < 3:
            continue
        vals = np.array([data[k][i] for k in idx], dtype=float)
        t = times_all[idx]
        yield f"byte[{i}]", vals, t

    # 16-bit adjacent
    for i in range(7):
        j = i + 1
        idx = [k for k, d in enumerate(data) if len(d) > j]
        if len(idx) < 3:
            continue
        a = [data[k][i] for k in idx]
        b = [data[k][j] for k in idx]
        u_le = np.array([le16(x, y) for x, y in zip(a, b)], dtype=float)
        u_be = np.array([be16(x, y) for x, y in zip(a, b)], dtype=float)
        s_le = np.array([s16(le16(x, y)) for x, y in zip(a, b)], dtype=float)
        s_be = np.array([s16(be16(x, y)) for x, y in zip(a, b)], dtype=float)
        t = times_all[idx]
        yield f"le16[{i}-{j}]", u_le, t
        yield f"be16[{i}-{j}]", u_be, t
        yield f"le16s[{i}-{j}]", s_le, t
        yield f"be16s[{i}-{j}]", s_be, t
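
# Labels produced by gen_candidates(), for reference (illustrative):
#   byte[0] .. byte[7]           – single data bytes
#   le16[0-1] .. le16[6-7]       – unsigned little-endian 16-bit words
#   be16[0-1] .. be16[6-7]       – unsigned big-endian 16-bit words
#   le16s[...] / be16s[...]      – the same words reinterpreted as signed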


# ---------- Evaluation ----------

def evaluate_supervised(label: str,
                        vals: np.ndarray,
                        times: np.ndarray,
                        rmin: float,
                        rmax: float,
                        allow_neg_scale: bool,
                        constraints: Dict[str, float]) -> Dict[str, float] | None:
    ok, meta = prefilter(vals)
    if not ok:
        return None

    scales = try_scaleset()
    if allow_neg_scale:
        scales += [-s for s in scales if s > 0]

    best = {"hit_ratio": -1.0, "scale": None, "offset": 0.0}
    for s in scales:
        o, hr = interval_best_offset(vals, s, rmin, rmax)
        if hr > best["hit_ratio"]:
            best = {"scale": s, "offset": float(o), "hit_ratio": hr}

    phys = vals * best["scale"] + best["offset"]
    within = (phys >= rmin) & (phys <= rmax)
    in_count = int(np.count_nonzero(within))

    p95_raw = p_quant_abs_diff(vals, 0.95)
    p95_phys = p_quant_abs_diff(phys, 0.95)

    ia = interarrival_metrics(times[:len(vals)])
    sm = slope_metrics(phys, times[:len(phys)])

    prange = (rmax - rmin) if (rmax > rmin) else 1.0
    slope_p95_frac = sm["slope_p95"] / prange
    slope_p99_frac = sm["slope_p99"] / prange

    failures = []

    if constraints.get("rate_min") is not None and ia["rate_hz"] < constraints["rate_min"] - 1e-9:
        failures.append(f"rate {ia['rate_hz']:.2f}Hz < min {constraints['rate_min']:.2f}Hz")
    if constraints.get("rate_max") is not None and ia["rate_hz"] > constraints["rate_max"] + 1e-9:
        failures.append(f"rate {ia['rate_hz']:.2f}Hz > max {constraints['rate_max']:.2f}Hz")

    if constraints.get("jitter_max_ms") is not None:
        jitter_ms = ia["period_std"] * 1000.0
        if jitter_ms > constraints["jitter_max_ms"] + 1e-9:
            failures.append(f"jitter {jitter_ms:.1f}ms > max {constraints['jitter_max_ms']:.1f}ms")

    def _resolve_abs_slope_limit():
        if constraints.get("max_slope_abs") is not None:
            return constraints["max_slope_abs"]
        if constraints.get("max_slope_frac") is not None:
            return constraints["max_slope_frac"] * prange
        return None

    max_s_abs = _resolve_abs_slope_limit()
    if max_s_abs is not None:
        q = constraints.get("slope_quantile", 0.95)
        qv = sm["slope_p95"] if q <= 0.95 else sm["slope_p99"]
        if qv > max_s_abs + 1e-9:
            failures.append(f"slope(q={q:.2f}) {qv:.3g} > max {max_s_abs:.3g}")

    uniq_ratio = len(np.unique(vals)) / float(len(vals))
    if constraints.get("min_uniq_ratio") is not None and uniq_ratio < constraints["min_uniq_ratio"] - 1e-9:
        failures.append(f"uniq_ratio {uniq_ratio:.3f} < min {constraints['min_uniq_ratio']:.3f}")

    passed = (len(failures) == 0)

    # Quality Score
    score = best["hit_ratio"]
    if max_s_abs is not None and max_s_abs > 0:
        slope_pen = min(sm["slope_p95"] / max_s_abs, 1.0)
        score *= (1.0 - 0.3 * slope_pen)
    if constraints.get("jitter_max_ms") is not None:
        jitter_ms = ia["period_std"] * 1000.0
        jitter_pen = min(jitter_ms / constraints["jitter_max_ms"], 1.0)
        score *= (1.0 - 0.2 * jitter_pen)

    return {
        "label": label,
        "mode": "range_fit",
        "n": int(vals.size),
        "raw_min": float(np.min(vals)),
        "raw_max": float(np.max(vals)),
        "raw_var": float(np.var(vals)),
        "p95_absdiff_raw": float(p95_raw),
        "scale": float(best["scale"]),
        "offset": float(best["offset"]),
        "hit_ratio": float(best["hit_ratio"]),
        "in_count": in_count,
        "phys_min": float(np.min(phys)),
        "phys_max": float(np.max(phys)),
        "p95_absdiff_phys": float(p95_phys),
        "span_phys": float(np.percentile(phys, 97) - np.percentile(phys, 3)),
        "rate_hz_est": float(ia["rate_hz"]),
        "period_std_ms": float(ia["period_std"] * 1000.0),
        "jitter_cv": float(ia["jitter_cv"]),
        "slope_p95_per_s": float(sm["slope_p95"]),
        "slope_p99_per_s": float(sm["slope_p99"]),
        "slope_p95_frac": float(slope_p95_frac),
        "slope_p99_frac": float(slope_p99_frac),
        "uniq_ratio": float(uniq_ratio),
        "passed": bool(passed),
        "fail_reasons": "; ".join(failures),
        "quality_score": float(score),
    }
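
# A minimal sketch of a constraints dict as consumed above (threshold values are
# hypothetical and depend on the signal being hunted):
#   constraints = {
#       "rate_min": 1.0,          # expect at least 1 frame/s
#       "rate_max": 100.0,        # and at most 100 frames/s
#       "jitter_max_ms": 50.0,    # inter-arrival std below 50 ms
#       "max_slope_abs": None,    # either an absolute |Δphys|/s limit ...
#       "max_slope_frac": 0.05,   # ... or a fraction of (rmax - rmin) per second
#       "slope_quantile": 0.95,
#       "min_uniq_ratio": 0.05,
#   }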

def evaluate_unsupervised(label: str,
                          vals: np.ndarray,
                          times: np.ndarray,
                          min_smooth: float = 0.2,
                          max_slope_frac_raw: float | None = None,
                          slope_quantile: float = 0.95) -> Dict[str, float] | None:
    if vals.size < 12:
        return None
    p95 = p_quant_abs_diff(vals, 0.95)
    span = float(np.percentile(vals, 97) - np.percentile(vals, 3) + 1e-9)
    smooth = 1.0 - min(max(p95 / span, 0.0), 1.0)
    uniq_ratio = float(len(np.unique(vals))) / float(vals.size)
    var = float(np.var(vals))

    ia = interarrival_metrics(times[:len(vals)])
    sm = slope_metrics(vals, times[:len(vals)])
    slope_q = sm["slope_p95"] if slope_quantile <= 0.95 else sm["slope_p99"]
    slope_frac_raw = (slope_q / span) if span > 0 else 0.0

    if uniq_ratio <= 0.02:
        return None
    if smooth < min_smooth:
        return None
    if (max_slope_frac_raw is not None) and (slope_frac_raw > max_slope_frac_raw):
        return None

    return {
        "label": label,
        "mode": "unsupervised",
        "n": int(vals.size),
        "raw_min": float(np.min(vals)),
        "raw_max": float(np.max(vals)),
        "raw_var": var,
        "span_raw": span,
        "p95_absdiff_raw": float(p95),
        "smoothness": float(smooth),
        "uniq_ratio": float(uniq_ratio),
        "rate_hz_est": float(ia["rate_hz"]),
        "period_std_ms": float(ia["period_std"] * 1000.0),
        "jitter_cv": float(ia["jitter_cv"]),
        "slope_q_raw": float(slope_q),
        "slope_frac_raw": float(slope_frac_raw),
    }
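
# Smoothness intuition (illustrative numbers): with a 97%–3% span of 100 raw counts
# and a p95 step of 2 counts, smoothness = 1 - 2/100 = 0.98. The default
# min_smooth = 0.2 therefore only rejects signals whose typical step exceeds
# 80% of their span.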


# ---------- Plot & Report ----------

def plot_timeseries(times: np.ndarray, series: np.ndarray, out_png: Path, title: str, ylabel: str) -> None:
    plt.figure(figsize=(10, 4))
    plt.plot(times[:len(series)], series, marker=".", linestyle="-")
    plt.xlabel("Time (s)")
    plt.ylabel(ylabel)
    plt.title(title)
    plt.grid(True)
    plt.tight_layout()
    out_png.parent.mkdir(parents=True, exist_ok=True)
    plt.savefig(out_png, dpi=150)
    plt.close()

def df_to_md_table(df: pd.DataFrame) -> str:
    """Robust Markdown table: uses to_markdown if available, otherwise CSV in a code block."""
    try:
        return df.to_markdown(index=False)  # may require 'tabulate'
    except Exception:
        return "```\n" + df.to_csv(index=False) + "```"

def write_report_md(path: Path, header: dict, top_rows: pd.DataFrame, failures: pd.DataFrame, mode: str, links: dict) -> None:
    md = []
    md.append(f"# Trace Report – {header.get('trace_name','')}")
    md.append("")
    md.append(f"- **Mode:** {mode}")
    for k, v in header.items():
        if k in ("trace_name",):
            continue
        md.append(f"- **{k}**: {v}")
    md.append("")

    if mode == "range_fit":
        md.append("## Top candidates (range fit)")
        md.append("Hit ratio, slope/jitter & score – best first.\n")
        if top_rows is not None and not top_rows.empty:
            md.append(df_to_md_table(top_rows))
        else:
            md.append("_No candidates above threshold._")
        md.append("")
        if failures is not None and not failures.empty:
            md.append("## Excluded candidates (reasons)\n")
            md.append(df_to_md_table(failures[["label", "fail_reasons"]]))
    else:
        md.append("## Top candidates (unsupervised)\n")
        if top_rows is not None and not top_rows.empty:
            md.append(df_to_md_table(top_rows))
        else:
            md.append("_No plausible raw signals._")

    md.append("\n## Artifacts")
    for k, v in links.items():
        md.append(f"- **{k}**: `{v}`")
    path.write_text("\n".join(md), encoding="utf-8")


# ---------- Main ----------

def main():
    ap = argparse.ArgumentParser(description="Range/unsupervised fit with physics-based constraints + report")
    ap.add_argument("trace", help="path to the .trace file")

    # supervision
    ap.add_argument("--rmin", type=float, default=None)
    ap.add_argument("--rmax", type=float, default=None)
    ap.add_argument("--allow-neg-scale", action="store_true")

    # shared
    ap.add_argument("--rx-only", action="store_true")
    ap.add_argument("--outdir", default=".")
    ap.add_argument("--plots-top", type=int, default=8)

    # supervised thresholds
    ap.add_argument("--min-hit", type=float, default=0.5)
    ap.add_argument("--rate-min", type=float, default=None)
    ap.add_argument("--rate-max", type=float, default=None)
    ap.add_argument("--jitter-max-ms", type=float, default=None)
    ap.add_argument("--max-slope-abs", type=float, default=None, help="max |Δphys|/s (e.g. °C/s, km/h/s)")
    ap.add_argument("--max-slope-frac", type=float, default=None, help="max |Δphys|/s relative to (rmax-rmin)")
    ap.add_argument("--slope-quantile", type=float, default=0.95, help="0.95 or 0.99")
    ap.add_argument("--min-uniq-ratio", type=float, default=None)

    # unsupervised thresholds
    ap.add_argument("--min-smooth", type=float, default=0.2)
    ap.add_argument("--max-slope-frac-raw", type=float, default=None, help="raw: (|Δraw|/s)/span")

    args = ap.parse_args()

    trace = Path(args.trace)
    df = parse_trace(trace, rx_only=args.rx_only)
    if df.empty:
        print("No data in trace.", file=sys.stderr)
        sys.exit(2)

    supervised = (args.rmin is not None) and (args.rmax is not None)
    outdir = Path(args.outdir)
    outdir.mkdir(parents=True, exist_ok=True)

    if supervised:
        constraints = {
            "rate_min": args.rate_min,
            "rate_max": args.rate_max,
            "jitter_max_ms": args.jitter_max_ms,
            "max_slope_abs": args.max_slope_abs,
            "max_slope_frac": args.max_slope_frac,
            "slope_quantile": args.slope_quantile,
            "min_uniq_ratio": args.min_uniq_ratio,
        }
        results = []
        rejected = []
        for label, series, times in gen_candidates(df):
            r = evaluate_supervised(label, series, times, args.rmin, args.rmax, args.allow_neg_scale, constraints)
            if r is None:
                continue
            if r["hit_ratio"] >= args.min_hit:
                (results if r["passed"] else rejected).append({**r, "trace": trace.stem})

        if not results and not rejected:
            print("No candidates above threshold found.", file=sys.stderr)
            sys.exit(3)

        df_ok = pd.DataFrame(results).sort_values(
            ["quality_score", "hit_ratio", "p95_absdiff_phys", "rate_hz_est", "n"],
            ascending=[False, False, True, False, False]
        )
        df_rej = pd.DataFrame(rejected)

        csv_path = outdir / f"{trace.stem}_encoding_candidates.csv"
        if not df_ok.empty:
            df_ok.to_csv(csv_path, index=False)
            print(f"Candidate CSV: {csv_path}")

        # plots for the top candidates (or the rejected ones if none passed)
        top_for_plots = df_ok if not df_ok.empty else df_rej
        data = df["data"].tolist()
        times_all = df["time_s"].to_numpy(dtype=float)

        def reconstruct_vals(label: str) -> Tuple[np.ndarray, np.ndarray] | None:
            if label.startswith("byte["):
                i = int(label.split("[")[1].split("]")[0])
                idx = [k for k, d in enumerate(data) if len(d) > i]
                if not idx: return None
                return np.array([data[k][i] for k in idx], dtype=float), times_all[idx]
            elif label.startswith(("le16", "be16")):
                signed = label.startswith(("le16s", "be16s"))
                i, j = map(int, label.split("[")[1].split("]")[0].split("-"))
                idx = [k for k, d in enumerate(data) if len(d) > j]
                if not idx: return None
                a = [data[k][i] for k in idx]
                b = [data[k][j] for k in idx]
                if label.startswith("le16"):
                    v = [le16(x, y) for x, y in zip(a, b)]
                else:
                    v = [be16(x, y) for x, y in zip(a, b)]
                if signed:
                    v = [s16(int(x)) for x in v]
                return np.array(v, dtype=float), times_all[idx]
            return None

        for _, row in top_for_plots.head(max(1, args.plots_top)).iterrows():
            rec = reconstruct_vals(row["label"])
            if rec is None:
                continue
            vals, tt = rec
            phys = vals * row["scale"] + row["offset"]
            out_png = outdir / f"{trace.stem}_{row['label'].replace('[','_').replace(']','')}.png"
            plot_timeseries(tt[:len(phys)], phys, out_png,
                            f"{trace.name} – {row['label']} (scale={row['scale']:.6g}, offset={row['offset']:.6g})",
                            "phys (estimated)")

        # report
        hdr = {
            "trace_name": trace.name,
            "mode": "range_fit",
            "rmin": args.rmin,
            "rmax": args.rmax,
            "min_hit": args.min_hit,
            "rate_min": args.rate_min,
            "rate_max": args.rate_max,
            "jitter_max_ms": args.jitter_max_ms,
            "max_slope_abs": args.max_slope_abs,
            "max_slope_frac": args.max_slope_frac,
            "slope_quantile": args.slope_quantile,
        }
        top_view = df_ok.head(12)[
            ["label", "quality_score", "hit_ratio", "scale", "offset",
             "rate_hz_est", "period_std_ms", "slope_p95_per_s", "slope_p99_per_s",
             "p95_absdiff_phys", "uniq_ratio"]
        ] if not df_ok.empty else pd.DataFrame()
        fail_view = df_rej[["label", "fail_reasons"]] if not df_rej.empty else pd.DataFrame()

        md_path = outdir / f"{trace.stem}_report.md"
        json_path = outdir / f"{trace.stem}_report.json"
        write_report_md(md_path, hdr, top_view, fail_view, "range_fit",
                        {"candidates_csv": str(csv_path) if not df_ok.empty else "(empty)"})
        with open(json_path, "w", encoding="utf-8") as f:
            json.dump({
                "header": hdr,
                "accepted": df_ok.to_dict(orient="records"),
                "rejected": df_rej.to_dict(orient="records"),
            }, f, ensure_ascii=False, indent=2)
        print(f"Report: {md_path}")
        print(f"Report JSON: {json_path}")

        if not df_ok.empty:
            print("\nTop candidates:")
            cols = ["label", "quality_score", "hit_ratio", "scale", "offset",
                    "rate_hz_est", "period_std_ms", "slope_p95_per_s", "slope_p99_per_s"]
            print(df_ok.head(10)[cols].to_string(index=False))
        else:
            print("\nNo candidates PASSED; see reasons in the report.")

    else:
        # unsupervised
        results = []
        for label, series, times in gen_candidates(df):
            r = evaluate_unsupervised(label, series, times,
                                      min_smooth=args.min_smooth,
                                      max_slope_frac_raw=args.max_slope_frac_raw,
                                      slope_quantile=args.slope_quantile)
            if r is None:
                continue
            r["trace"] = trace.stem
            results.append(r)

        if not results:
            print("No plausible raw signals found. Tip: lower --min-smooth.", file=sys.stderr)
            sys.exit(3)

        df_res = pd.DataFrame(results).sort_values(
            ["smoothness", "span_raw", "raw_var", "rate_hz_est", "n"],
            ascending=[False, False, False, False, False]
        )

        csv_path = outdir / f"{trace.stem}_unsupervised_candidates.csv"
        df_res.to_csv(csv_path, index=False)
        print(f"Unsupervised CSV: {csv_path}")

        # plots of the top N (raw values)
        data = df["data"].tolist()
        times_all = df["time_s"].to_numpy(dtype=float)

        def reconstruct_raw(label: str) -> Tuple[np.ndarray, np.ndarray] | None:
            if label.startswith("byte["):
                i = int(label.split("[")[1].split("]")[0])
                idx = [k for k, d in enumerate(data) if len(d) > i]
                if not idx: return None
                return np.array([data[k][i] for k in idx], dtype=float), times_all[idx]
            elif label.startswith(("le16", "be16")):
                signed = label.startswith(("le16s", "be16s"))
                i, j = map(int, label.split("[")[1].split("]")[0].split("-"))
                idx = [k for k, d in enumerate(data) if len(d) > j]
                if not idx: return None
                a = [data[k][i] for k in idx]
                b = [data[k][j] for k in idx]
                if label.startswith("le16"):
                    v = [le16(x, y) for x, y in zip(a, b)]
                else:
                    v = [be16(x, y) for x, y in zip(a, b)]
                if signed:
                    v = [s16(int(x)) for x in v]
                return np.array(v, dtype=float), times_all[idx]
            return None

        for _, row in df_res.head(max(1, args.plots_top)).iterrows():
            rec = reconstruct_raw(row["label"])
            if rec is None:
                continue
            vals, tt = rec
            out_png = outdir / f"{trace.stem}_{row['label'].replace('[','_').replace(']','')}_raw.png"
            plot_timeseries(tt[:len(vals)], vals, out_png,
                            f"{trace.name} – {row['label']} (raw)", "raw")

        # report
        hdr = {
            "trace_name": trace.name,
            "mode": "unsupervised",
            "min_smooth": args.min_smooth,
            "max_slope_frac_raw": args.max_slope_frac_raw,
        }
        top_view = df_res.head(12)[
            ["label", "smoothness", "span_raw", "raw_var",
             "rate_hz_est", "period_std_ms", "slope_frac_raw", "uniq_ratio"]
        ]
        md_path = outdir / f"{trace.stem}_report.md"
        json_path = outdir / f"{trace.stem}_report.json"
        write_report_md(md_path, hdr, top_view, pd.DataFrame(), "unsupervised",
                        {"candidates_csv": str(csv_path)})
        with open(json_path, "w", encoding="utf-8") as f:
            json.dump({
                "header": hdr,
                "accepted": df_res.to_dict(orient="records"),
            }, f, ensure_ascii=False, indent=2)
        print(f"Report: {md_path}")
        print(f"Report JSON: {json_path}")


if __name__ == "__main__":
    main()