#!/usr/bin/env python3 import re import sys import argparse from pathlib import Path import pandas as pd import numpy as np import matplotlib.pyplot as plt LOG_PATTERN = re.compile(r"(\d+)\s+(TX|RX)\s+0x([0-9A-Fa-f]+)\s+\d+\s+((?:[0-9A-Fa-f]{2}\s+)+)") def parse_trace(path: Path, rx_only=False) -> pd.DataFrame: rows = [] with open(path, "r", errors="ignore") as f: for line in f: m = LOG_PATTERN.match(line) if not m: continue ts = int(m.group(1)) dr = m.group(2) if rx_only and dr != "RX": continue cid = int(m.group(3), 16) data = [int(x, 16) for x in m.group(4).split() if x.strip()] rows.append((ts, dr, cid, data)) df = pd.DataFrame(rows, columns=["ts","dir","id","data"]) if df.empty: return df df["time_s"] = (df["ts"] - df["ts"].min())/1000.0 return df def be16(a,b): return (a<<8)|b def le16(a,b): return a | (b<<8) def analyze_one_trace(df: pd.DataFrame, scale=1.0, offs=0.0, rmin=None, rmax=None): """Return stats for all 8-bit bytes and all adjacent 16-bit pairs (LE/BE).""" stats = [] # 8-bit for i in range(8): vals = [d[i] for d in df["data"] if len(d)>i] if not vals: continue arr = np.array(vals, dtype=float) phys = arr*scale + offs hit = np.ones_like(phys, dtype=bool) if rmin is not None: hit &= (phys>=rmin) if rmax is not None: hit &= (phys<=rmax) stats.append({ "type":"byte8","slot":str(i), "n":len(arr), "min":float(arr.min()),"max":float(arr.max()),"var":float(arr.var()), "hit_ratio": float(np.count_nonzero(hit))/len(hit) if len(hit)>0 else 0.0, "min_phys": float(phys.min()), "max_phys": float(phys.max()) }) # 16-bit pairs = [(i,i+1) for i in range(7)] for i,j in pairs: # LE vals = [le16(d[i],d[j]) for d in df["data"] if len(d)>j] if vals: arr = np.array(vals, dtype=float); phys = arr*scale + offs hit = np.ones_like(phys, dtype=bool) if rmin is not None: hit &= (phys>=rmin) if rmax is not None: hit &= (phys<=rmax) stats.append({ "type":"le16","slot":f"{i}-{j}", "n":len(arr), "min":float(arr.min()),"max":float(arr.max()),"var":float(arr.var()), "hit_ratio": float(np.count_nonzero(hit))/len(hit) if len(hit)>0 else 0.0, "min_phys": float(phys.min()), "max_phys": float(phys.max()) }) # BE vals = [be16(d[i],d[j]) for d in df["data"] if len(d)>j] if vals: arr = np.array(vals, dtype=float); phys = arr*scale + offs hit = np.ones_like(phys, dtype=bool) if rmin is not None: hit &= (phys>=rmin) if rmax is not None: hit &= (phys<=rmax) stats.append({ "type":"be16","slot":f"{i}-{j}", "n":len(arr), "min":float(arr.min()),"max":float(arr.max()),"var":float(arr.var()), "hit_ratio": float(np.count_nonzero(hit))/len(hit) if len(hit)>0 else 0.0, "min_phys": float(phys.min()), "max_phys": float(phys.max()) }) return pd.DataFrame(stats) def plot_one_trace(df: pd.DataFrame, outdir: Path, prefix: str): outdir.mkdir(parents=True, exist_ok=True) # 8-bit plots for i in range(8): times, series = [], [] for t,d in zip(df["time_s"], df["data"]): if len(d)>i: times.append(t); series.append(d[i]) if not series: continue import matplotlib.pyplot as plt plt.figure(figsize=(10,4)) plt.plot(times, series, marker=".", linestyle="-") plt.xlabel("Zeit (s)"); plt.ylabel(f"Byte[{i}] (8-bit)") plt.title(f"{prefix} – 8-bit Byte {i}") plt.grid(True); plt.tight_layout() plt.savefig(outdir / f"{prefix}_byte{i}.png", dpi=150); plt.close() # 16-bit plots (LE/BE) pairs = [(i,i+1) for i in range(7)] for i,j in pairs: times, series = [], [] for t,d in zip(df["time_s"], df["data"]): if len(d)>j: times.append(t); series.append(le16(d[i],d[j])) if series: import matplotlib.pyplot as plt plt.figure(figsize=(10,4)) plt.plot(times, series, marker=".", linestyle="-") plt.xlabel("Zeit (s)"); plt.ylabel(f"LE16 @{i}-{j}") plt.title(f"{prefix} – LE16 {i}-{j}") plt.grid(True); plt.tight_layout() plt.savefig(outdir / f"{prefix}_le16_{i}-{j}.png", dpi=150); plt.close() times, series = [], [] for t,d in zip(df["time_s"], df["data"]): if len(d)>j: times.append(t); series.append(be16(d[i],d[j])) if series: import matplotlib.pyplot as plt plt.figure(figsize=(10,4)) plt.plot(times, series, marker=".", linestyle="-") plt.xlabel("Zeit (s)"); plt.ylabel(f"BE16 @{i}-{j}") plt.title(f"{prefix} – BE16 {i}-{j}") plt.grid(True); plt.tight_layout() plt.savefig(outdir / f"{prefix}_be16_{i}-{j}.png", dpi=150); plt.close() def main(): ap = argparse.ArgumentParser(description="Batch analyze per-ID traces and rank 8/16-bit combinations") ap.add_argument("--traces-dir", required=True, help="Directory containing *.trace files") ap.add_argument("--outdir", required=True, help="Output directory for analysis results") ap.add_argument("--rx-only", action="store_true", help="Use RX frames only") ap.add_argument("--plots", action="store_true", help="Also generate plots for each trace") ap.add_argument("--scale", type=float, default=1.0, help="phys = raw*scale + offset") ap.add_argument("--offset", type=float, default=0.0, help="phys = raw*scale + offset") ap.add_argument("--range-min", type=float, default=None, help="physical min (after scale/offset)") ap.add_argument("--range-max", type=float, default=None, help="physical max (after scale/offset)") ap.add_argument("--top", type=int, default=8, help="Export top combos per trace to summary") args = ap.parse_args() tdir = Path(args.traces_dir) outdir = Path(args.outdir); outdir.mkdir(parents=True, exist_ok=True) traces = sorted([p for p in tdir.glob("*.trace")]) if not traces: print("No .trace files found.", file=sys.stderr) sys.exit(2) global_rows = [] for tr in traces: df = parse_trace(tr, rx_only=args.rx_only) if df.empty: continue stats = analyze_one_trace(df, args.scale, args.offset, args.range_min, args.range_max) # Ranking: primarily by hit_ratio (if range given), else by variance; break ties by var then n if args.range_min is not None or args.range_max is not None: stats = stats.sort_values(["hit_ratio","var","n"], ascending=[False, False, False]) else: stats = stats.sort_values(["var","n"], ascending=[False, False]) # write per-trace csv per_csv = outdir / f"{tr.stem}_combostats.csv" stats.to_csv(per_csv, index=False) # append top rows with trace id hint stem = tr.stem # e.g., 0x208_log1 for _, row in stats.head(args.top).iterrows(): r = row.to_dict() r["trace"] = stem global_rows.append(r) # plots (optional) into a subdir per trace if args.plots: plot_dir = outdir / f"{tr.stem}_plots" plot_one_trace(df, plot_dir, prefix=tr.stem) # global summary if global_rows: gdf = pd.DataFrame(global_rows) gdf.to_csv(outdir / "summary_top_combinations.csv", index=False) print(f"Global summary written: {outdir/'summary_top_combinations.csv'}") print(f"Processed {len(traces)} trace files. Results at: {outdir}") if __name__ == "__main__": main()