#!/usr/bin/env python3
"""
trace_signal_fitter.py
----------------------
Two modes of operation for a single .trace file:

1) Range fit (supervised): --rmin/--rmax given
   For all 8-bit fields (D0..D7) and adjacent 16-bit fields (LE/BE), searches
   for a linear mapping phys = raw*scale + offset that brings as many samples
   as possible into [rmin, rmax]. Ranked primarily by hit_ratio.

2) Unsupervised (no range): --rmin/--rmax omitted
   Finds "plausible" physical candidates by smoothness/variance/span/rate,
   without estimating scale/offset (raw values as-is). Ranked primarily by
   smoothness.

Log format (Kettenöler):
    <timestamp_ms> <TX/RX> 0x<ID_HEX> <dlc> <byte0> <byte1> ...

Outputs:
    - Range fit:    <trace_stem>_encoding_candidates.csv     + optional plots
    - Unsupervised: <trace_stem>_unsupervised_candidates.csv + optional plots
"""
import re
import sys
import argparse
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
LOG_PATTERN = re.compile(r"(\d+)\s+(TX|RX)\s+0x([0-9A-Fa-f]+)\s+\d+\s+((?:[0-9A-Fa-f]{2}\s+)+)")
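# Illustrative line in the expected format (values made up). Note the regex
# requires whitespace after every data byte; the trailing newline satisfies
# this for the last byte:
#   123456 RX 0x5C0 8 00 1A FF 03 00 00 00 00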
def parse_trace(path: Path, rx_only=False) -> pd.DataFrame:
    rows = []
    with open(path, "r", errors="ignore") as f:
        for line in f:
            m = LOG_PATTERN.match(line)
            if not m:
                continue
            ts = int(m.group(1))
            dr = m.group(2)
            if rx_only and dr != "RX":
                continue
            cid = int(m.group(3), 16)
            data = [int(x, 16) for x in m.group(4).split() if x.strip()]
            rows.append((ts, dr, cid, data))
    df = pd.DataFrame(rows, columns=["ts", "dir", "id", "data"])
    if df.empty:
        return df
    df["time_s"] = (df["ts"] - df["ts"].min()) / 1000.0
    return df
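# parse_trace() yields one row per frame: ts (ms), dir ("TX"/"RX"), id (int),
# data (list of byte values), plus time_s in seconds since the first frame.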
def be16(a,b): return (a<<8)|b
def le16(a,b): return a | (b<<8)
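# Sanity check: be16(0x12, 0x34) == 0x1234 and le16(0x12, 0x34) == 0x3412.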
def p95_abs_diff(arr: np.ndarray) -> float:
    if arr.size < 2:
        return 0.0
    d = np.abs(np.diff(arr))
    return float(np.percentile(d, 95))
def basic_rate(times: np.ndarray) -> float:
    if times.size < 2:
        return 0.0
    dur = times.max() - times.min()
    if dur <= 0:
        return 0.0
    return float(times.size / dur)
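# interval_best_offset() below is a sweep line over offset intervals: for a
# fixed scale s, each raw sample r tolerates any offset in
# [rmin - s*r, rmax - s*r], since that places phys = s*r + offset inside
# [rmin, rmax]. The offset covered by the most intervals maximizes the hits.
# Worked example (made-up numbers): raw = [10, 20], s = 1, range [15, 25]
# yields intervals [5, 15] and [-5, 5]; they overlap only at offset 5, so the
# function returns (5.0, 1.0): offset 5 puts 100 % of samples in range.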
def interval_best_offset(raw: np.ndarray, scale: float, rmin: float, rmax: float):
    a = rmin - scale * raw
    b = rmax - scale * raw
    lo = np.minimum(a, b)
    hi = np.maximum(a, b)
    events = []
    for L, H in zip(lo, hi):
        events.append((L, +1))
        events.append((H, -1))
    # Sort opens before closes at equal x so the interval bounds count as hits.
    events.sort(key=lambda t: (t[0], -t[1]))
    best = -1
    cur = 0
    best_x = None
    for x, v in events:
        cur += v
        if cur > best:
            best = cur
            best_x = x
    return best_x, float(best) / float(len(raw))
def gen_candidates(df: pd.DataFrame):
    times = df["time_s"].to_numpy(dtype=float)
    data = df["data"].tolist()
    # Note: frames shorter than the probed byte index are skipped, so the
    # times[:len(vals)] slice only aligns exactly when all frames of this ID
    # share the same DLC (the usual case for a single CAN ID).
    # 8-bit
    for i in range(8):
        vals = [d[i] for d in data if len(d) > i]
        if not vals:
            continue
        yield (f"byte[{i}]", np.array(vals, dtype=float)), times[:len(vals)]
    # 16-bit (adjacent)
    pairs = [(i, i + 1) for i in range(7)]
    for i, j in pairs:
        vals = [le16(d[i], d[j]) for d in data if len(d) > j]
        if vals:
            yield (f"le16[{i}-{j}]", np.array(vals, dtype=float)), times[:len(vals)]
        vals = [be16(d[i], d[j]) for d in data if len(d) > j]
        if vals:
            yield (f"be16[{i}-{j}]", np.array(vals, dtype=float)), times[:len(vals)]
def prefilter(vals: np.ndarray):
    if vals.size < 12:
        return False, {"reason": "too_few_samples"}
    uniq = np.unique(vals)
    if uniq.size <= 2:
        return False, {"reason": "too_constant"}
    p95 = p95_abs_diff(vals)
    if p95 == 0:
        return False, {"reason": "no_changes"}
    r = float(np.percentile(vals, 97) - np.percentile(vals, 3) + 1e-9)
    if p95 > 0.5 * r:
        return False, {"reason": "too_jumpy"}
    return True, {"p95_abs_diff": p95, "span_est": r}
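# Candidate scale factors: the decade steps (1, 2, 5 times powers of ten)
# cover generic linear encodings; 0.0625 (1/16) and 0.125 (1/8) are binary
# fractions commonly seen in CAN signal definitions; 0.25/0.75/0.8/1.25 catch
# a few other ratios that show up in practice.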
def try_scaleset():
    base = [1e-3, 2e-3, 5e-3,
            1e-2, 2e-2, 5e-2,
            0.1, 0.2, 0.25, 0.5,
            1.0, 2.0, 5.0, 10.0,
            0.0625, 0.125, 0.75, 0.8, 1.25]
    return sorted(set(base))
def evaluate_supervised(label, vals: np.ndarray, times: np.ndarray, rmin: float, rmax: float, allow_neg_scale=False):
    ok, meta = prefilter(vals)
    if not ok:
        return None
    scales = try_scaleset()
    if allow_neg_scale:
        scales = scales + [-s for s in scales if s > 0]
    best = {"hit_ratio": -1.0}
    for s in scales:
        o, hr = interval_best_offset(vals, s, rmin, rmax)
        if hr > best["hit_ratio"]:
            best = {"scale": s, "offset": float(o), "hit_ratio": hr}
    phys = vals * best["scale"] + best["offset"]
    within = (phys >= rmin) & (phys <= rmax)
    in_count = int(np.count_nonzero(within))
    p95_raw = p95_abs_diff(vals)
    p95_phys = p95_abs_diff(phys)
    rate = basic_rate(times[:len(vals)])
    return {
        "label": label,
        "mode": "range_fit",
        "n": int(vals.size),
        "rate_hz_est": float(rate),
        "raw_min": float(np.min(vals)),
        "raw_max": float(np.max(vals)),
        "raw_var": float(np.var(vals)),
        "p95_absdiff_raw": float(p95_raw),
        "scale": float(best["scale"]),
        "offset": float(best["offset"]),
        "hit_ratio": float(best["hit_ratio"]),
        "in_count": in_count,
        "phys_min": float(np.min(phys)),
        "phys_max": float(np.max(phys)),
        "p95_absdiff_phys": float(p95_phys),
        "span_phys": float(np.percentile(phys, 97) - np.percentile(phys, 3)),
        "prefilter_span_est": float(meta.get("span_est", 0.0)),
        "prefilter_p95_absdiff": float(meta.get("p95_abs_diff", 0.0)),
    }
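# Worked example for the smoothness score below (made-up numbers): a series
# whose 3rd..97th percentile span is 100 and whose 95th-percentile step is 2
# scores 1 - 2/100 = 0.98 (very smooth); a series that jumps by its whole
# span on most updates scores near 0 and is dropped by min_smooth.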
def evaluate_unsupervised(label, vals: np.ndarray, times: np.ndarray, min_smooth=0.2):
    """
    Returns plausibility metrics only (no scale/offset).
    smoothness = 1 - clamp(p95(|Δ|) / span, 0..1)
    uniq_ratio = |unique| / n
    Ranking: smoothness desc, span desc, var desc, rate desc, n desc
    """
    if vals.size < 12:
        return None
    p95 = p95_abs_diff(vals)
    span = float(np.percentile(vals, 97) - np.percentile(vals, 3) + 1e-9)
    smooth = 1.0 - min(max(p95 / span, 0.0), 1.0)
    uniq = len(np.unique(vals))
    uniq_ratio = float(uniq) / float(vals.size)
    var = float(np.var(vals))
    rate = basic_rate(times[:len(vals)])
    # Filter out signals that are too constant or too jumpy
    if uniq_ratio <= 0.02:
        return None
    if smooth < min_smooth:
        return None
    return {
        "label": label,
        "mode": "unsupervised",
        "n": int(vals.size),
        "rate_hz_est": float(rate),
        "raw_min": float(np.min(vals)),
        "raw_max": float(np.max(vals)),
        "raw_var": var,
        "span_raw": span,
        "p95_absdiff_raw": float(p95),
        "smoothness": float(smooth),
        "uniq_ratio": float(uniq_ratio),
    }
def plot_timeseries(times, series, out_png: Path, title: str, ylabel: str):
    plt.figure(figsize=(10, 4))
    plt.plot(times[:len(series)], series, marker=".", linestyle="-")
    plt.xlabel("Time (s)")
    plt.ylabel(ylabel)
    plt.title(title)
    plt.grid(True)
    plt.tight_layout()
    out_png.parent.mkdir(parents=True, exist_ok=True)
    plt.savefig(out_png, dpi=150)
    plt.close()
def series_for_label(label: str, data):
    """Re-decode the raw series for a candidate label such as byte[3] or le16[4-5]."""
    if label.startswith("byte["):
        i = int(label.split("[")[1].split("]")[0])
        return np.array([d[i] for d in data if len(d) > i], dtype=float)
    if label.startswith("le16["):
        i, j = map(int, label.split("[")[1].split("]")[0].split("-"))
        return np.array([le16(d[i], d[j]) for d in data if len(d) > j], dtype=float)
    if label.startswith("be16["):
        i, j = map(int, label.split("[")[1].split("]")[0].split("-"))
        return np.array([be16(d[i], d[j]) for d in data if len(d) > j], dtype=float)
    return None

def main():
    ap = argparse.ArgumentParser(description="Find encoding candidates (with range) or plausible raw signals (without range) in a .trace file")
    ap.add_argument("trace", help="Path to the .trace file (from can_split_by_id.py)")
    ap.add_argument("--rmin", type=float, default=None, help="lower bound of the target range (phys)")
    ap.add_argument("--rmax", type=float, default=None, help="upper bound of the target range (phys)")
    ap.add_argument("--rx-only", action="store_true", help="Use RX frames only")
    ap.add_argument("--allow-neg-scale", action="store_true", help="Also test negative scales (range fit only)")
    ap.add_argument("--outdir", default=".", help="Output directory (CSV/plots)")
    ap.add_argument("--plots-top", type=int, default=8, help="Generate plots for the top N candidates")
    ap.add_argument("--min-hit", type=float, default=0.5, help="Minimum hit ratio for the range fit (0..1)")
    ap.add_argument("--min-smooth", type=float, default=0.2, help="Minimum smoothness for unsupervised mode (0..1)")
    args = ap.parse_args()

    trace = Path(args.trace)
    df = parse_trace(trace, rx_only=args.rx_only)
    if df.empty:
        print("No data in trace.", file=sys.stderr)
        sys.exit(2)
    supervised = (args.rmin is not None) and (args.rmax is not None)

    results = []
    for (label, series), times in gen_candidates(df):
        if supervised:
            r = evaluate_supervised(label, series, times, args.rmin, args.rmax, allow_neg_scale=args.allow_neg_scale)
            if r is None:
                continue
            if r["hit_ratio"] >= args.min_hit:
                r["trace"] = trace.stem
                results.append(r)
        else:
            r = evaluate_unsupervised(label, series, times, min_smooth=args.min_smooth)
            if r is None:
                continue
            r["trace"] = trace.stem
            results.append(r)

    if not results:
        if supervised:
            print("No candidates above threshold. Tip: lower --min-hit or try --allow-neg-scale.", file=sys.stderr)
        else:
            print("No plausible raw signals found. Tip: lower --min-smooth.", file=sys.stderr)
        sys.exit(3)

    outdir = Path(args.outdir)
    outdir.mkdir(parents=True, exist_ok=True)
    times = df["time_s"].to_numpy(dtype=float)
    data = df["data"].tolist()

    if supervised:
        df_res = pd.DataFrame(results).sort_values(["hit_ratio", "p95_absdiff_phys", "rate_hz_est", "n"], ascending=[False, True, False, False])
        csv_path = outdir / f"{trace.stem}_encoding_candidates.csv"
        df_res.to_csv(csv_path, index=False)
        print(f"Candidates CSV: {csv_path}")
        # Plots
        for _, row in df_res.head(args.plots_top).iterrows():
            label = row["label"]
            vals = series_for_label(label, data)
            if vals is None:
                continue
            phys = vals * row["scale"] + row["offset"]
            out_png = outdir / f"{trace.stem}_{label.replace('[', '_').replace(']', '')}.png"
            plot_timeseries(times[:len(phys)], phys, out_png,
                            f"{trace.name} {label} (scale={row['scale']:.6g}, offset={row['offset']:.6g})",
                            "phys (estimated)")
        # Console summary
        cols = ["label", "hit_ratio", "scale", "offset", "p95_absdiff_phys", "rate_hz_est", "n", "phys_min", "phys_max"]
        print("\nTop candidates:")
        print(df_res.head(10)[cols].to_string(index=False))
    else:
        # Unsupervised
        df_res = pd.DataFrame(results).sort_values(["smoothness", "span_raw", "raw_var", "rate_hz_est", "n"], ascending=[False, False, False, False, False])
        csv_path = outdir / f"{trace.stem}_unsupervised_candidates.csv"
        df_res.to_csv(csv_path, index=False)
        print(f"Unsupervised CSV: {csv_path}")
        # Plots
        for _, row in df_res.head(max(1, args.plots_top)).iterrows():
            label = row["label"]
            vals = series_for_label(label, data)
            if vals is None:
                continue
            out_png = outdir / f"{trace.stem}_{label.replace('[', '_').replace(']', '')}_raw.png"
            plot_timeseries(times[:len(vals)], vals, out_png, f"{trace.name} {label} (raw)", "raw")
        # Console summary
        cols = ["label", "smoothness", "span_raw", "raw_var", "rate_hz_est", "n", "uniq_ratio", "p95_absdiff_raw"]
        print("\nTop plausible raw signals:")
        print(df_res.head(10)[cols].to_string(index=False))

if __name__ == "__main__":
    main()