Files
Kettenoeler/Reverse-Engineering CAN-Bus/trace_batch_analyzer.py

187 lines
7.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import re
import sys
import argparse
from pathlib import Path
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
# One trace line: "<timestamp> <TX|RX> 0x<hex id> <dlc> <hex byte> <hex byte> ...".
# Every data byte must be followed by whitespace OR the end of the line, so the
# last byte of a final line that lacks a trailing newline is still captured.
LOG_PATTERN = re.compile(
    r"(\d+)\s+(TX|RX)\s+0x([0-9A-Fa-f]+)\s+\d+\s+((?:[0-9A-Fa-f]{2}(?:\s+|$))+)"
)


def parse_trace(path: Path, rx_only=False) -> pd.DataFrame:
    """Parse a text trace file into a DataFrame with one row per matching frame.

    Lines that do not match ``LOG_PATTERN`` at their very start are skipped
    silently. The logged length field is matched but not used; the payload is
    whatever hex byte tokens follow it.

    Args:
        path: Trace file to read. Undecodable bytes are ignored.
        rx_only: When true, keep only frames whose direction field is "RX".

    Returns:
        A DataFrame with the columns ``ts`` (timestamp as logged, int),
        ``dir`` ("TX" or "RX"), ``id`` (int parsed from the hex id) and
        ``data`` (list of int byte values). A non-empty result additionally
        has ``time_s``: ``ts`` relative to the smallest timestamp, divided by
        1000 (i.e. seconds, assuming the log timestamps are milliseconds).
        An empty result is returned without the ``time_s`` column.
    """
    rows = []
    with open(path, "r", errors="ignore") as f:
        for line in f:
            m = LOG_PATTERN.match(line)
            if m is None:
                continue
            ts_raw, direction, can_id, payload = m.groups()
            if rx_only and direction != "RX":
                continue
            data = [int(token, 16) for token in payload.split()]
            rows.append((int(ts_raw), direction, int(can_id, 16), data))

    df = pd.DataFrame(rows, columns=["ts", "dir", "id", "data"])
    if df.empty:
        return df
    df["time_s"] = (df["ts"] - df["ts"].min()) / 1000.0
    return df
def be16(a, b):
    """Combine two bytes big-endian: ``a`` is the high byte, ``b`` the low byte."""
    high = a << 8
    return high | b
def le16(a, b):
    """Combine two bytes little-endian: ``a`` is the low byte, ``b`` the high byte."""
    high = b << 8
    return high | a
# Column order of the stats table; also used so an empty result keeps its schema.
_STATS_COLUMNS = [
    "type", "slot", "n", "min", "max", "var",
    "hit_ratio", "min_phys", "max_phys",
]


def _slot_stats(kind, slot, raw_values, scale, offs, rmin, rmax):
    """Build one stats row for the (non-empty) raw values seen in one slot."""
    arr = np.array(raw_values, dtype=float)
    phys = arr * scale + offs
    # Every sample counts as a hit until a configured bound excludes it, so
    # hit_ratio is 1.0 when neither rmin nor rmax is given.
    hit = np.ones_like(phys, dtype=bool)
    if rmin is not None:
        hit &= phys >= rmin
    if rmax is not None:
        hit &= phys <= rmax
    return {
        "type": kind,
        "slot": slot,
        "n": len(arr),
        "min": float(arr.min()),
        "max": float(arr.max()),
        "var": float(arr.var()),
        "hit_ratio": float(np.count_nonzero(hit)) / len(hit),
        "min_phys": float(phys.min()),
        "max_phys": float(phys.max()),
    }


def analyze_one_trace(df: pd.DataFrame, scale=1.0, offs=0.0, rmin=None, rmax=None):
    """Return stats for all 8-bit bytes and all adjacent 16-bit pairs (LE/BE).

    Only the first 8 payload bytes of each frame are inspected. A frame
    contributes to a slot only if its payload is long enough to contain it.

    Args:
        df: Frames to analyze; its ``data`` column holds lists of byte values.
        scale: Factor of the physical conversion ``phys = raw*scale + offs``.
        offs: Offset of the physical conversion.
        rmin: Optional inclusive lower bound on the physical value.
        rmax: Optional inclusive upper bound on the physical value.

    Returns:
        A DataFrame with one row per slot that has at least one sample, in the
        order: bytes 0..7, then for each adjacent pair the LE row followed by
        the BE row. Columns: ``type`` ("byte8", "le16", "be16"), ``slot``
        ("i" or "i-j"), ``n`` (sample count), ``min``/``max``/``var`` of the
        raw values (population variance), ``hit_ratio`` (fraction of samples
        whose physical value lies within the given bounds) and
        ``min_phys``/``max_phys``. With no samples at all, the result is empty
        but still has these columns.
    """
    frames = list(df["data"])
    stats = []

    # 8-bit: one row per byte position.
    for i in range(8):
        vals = [d[i] for d in frames if len(d) > i]
        if vals:
            stats.append(_slot_stats("byte8", str(i), vals, scale, offs, rmin, rmax))

    # 16-bit: adjacent byte pairs, interpreted both little- and big-endian.
    for i in range(7):
        j = i + 1
        pairs = [(d[i], d[j]) for d in frames if len(d) > j]
        if not pairs:
            continue
        for kind, combine in (("le16", le16), ("be16", be16)):
            vals = [combine(first, second) for first, second in pairs]
            stats.append(_slot_stats(kind, f"{i}-{j}", vals, scale, offs, rmin, rmax))

    # Explicit columns keep the schema stable even when no row was produced,
    # so callers can sort/select by column name unconditionally.
    return pd.DataFrame(stats, columns=_STATS_COLUMNS)
def _save_series_plot(times, series, ylabel, title, target):
    """Render one time series as a line plot and save it as an image at ``target``."""
    fig, ax = plt.subplots(figsize=(10, 4))
    try:
        ax.plot(times, series, marker=".", linestyle="-")
        ax.set_xlabel("Zeit (s)")
        ax.set_ylabel(ylabel)
        ax.set_title(title)
        ax.grid(True)
        fig.tight_layout()
        fig.savefig(target, dpi=150)
    finally:
        # Always release the figure, even if rendering or saving fails, so a
        # long batch run does not accumulate open figures.
        plt.close(fig)


def plot_one_trace(df: pd.DataFrame, outdir: Path, prefix: str):
    """Write one PNG per byte slot and per adjacent 16-bit pair (LE and BE).

    Only the first 8 payload bytes of each frame are considered. A slot with
    no samples produces no file.

    Args:
        df: Frames to plot; must provide the ``time_s`` and ``data`` columns.
        outdir: Directory for the images; created if missing.
        prefix: Prepended to every file name and plot title.

    Files written: ``{prefix}_byte{i}.png``, ``{prefix}_le16_{i}-{j}.png`` and
    ``{prefix}_be16_{i}-{j}.png``.
    """
    outdir.mkdir(parents=True, exist_ok=True)
    samples = list(zip(df["time_s"], df["data"]))

    # 8-bit plots
    for i in range(8):
        points = [(t, d[i]) for t, d in samples if len(d) > i]
        if not points:
            continue
        times = [t for t, _ in points]
        series = [value for _, value in points]
        _save_series_plot(
            times,
            series,
            f"Byte[{i}] (8-bit)",
            f"{prefix} 8-bit Byte {i}",
            outdir / f"{prefix}_byte{i}.png",
        )

    # 16-bit plots (LE/BE)
    for i in range(7):
        j = i + 1
        pairs = [(t, d[i], d[j]) for t, d in samples if len(d) > j]
        if not pairs:
            continue
        times = [t for t, _, _ in pairs]
        for label, combine in (("LE16", le16), ("BE16", be16)):
            series = [combine(first, second) for _, first, second in pairs]
            _save_series_plot(
                times,
                series,
                f"{label} @{i}-{j}",
                f"{prefix} {label} {i}-{j}",
                outdir / f"{prefix}_{label.lower()}_{i}-{j}.png",
            )
def main():
    """Command-line entry point: analyze every ``*.trace`` file in a directory.

    For each trace that yields at least one frame, this writes
    ``<stem>_combostats.csv`` (all slot statistics, ranked) into the output
    directory and, with ``--plots``, images into ``<stem>_plots/``. The top
    ``--top`` rows of every trace are collected into
    ``summary_top_combinations.csv``.

    Exits with status 2 when the traces directory contains no ``*.trace`` file.
    """
    ap = argparse.ArgumentParser(description="Batch analyze per-ID traces and rank 8/16-bit combinations")
    ap.add_argument("--traces-dir", required=True, help="Directory containing *.trace files")
    ap.add_argument("--outdir", required=True, help="Output directory for analysis results")
    ap.add_argument("--rx-only", action="store_true", help="Use RX frames only")
    ap.add_argument("--plots", action="store_true", help="Also generate plots for each trace")
    ap.add_argument("--scale", type=float, default=1.0, help="phys = raw*scale + offset")
    ap.add_argument("--offset", type=float, default=0.0, help="phys = raw*scale + offset")
    ap.add_argument("--range-min", type=float, default=None, help="physical min (after scale/offset)")
    ap.add_argument("--range-max", type=float, default=None, help="physical max (after scale/offset)")
    ap.add_argument("--top", type=int, default=8, help="Export top combos per trace to summary")
    args = ap.parse_args()

    tdir = Path(args.traces_dir)
    outdir = Path(args.outdir)
    outdir.mkdir(parents=True, exist_ok=True)

    traces = sorted(tdir.glob("*.trace"))
    if not traces:
        print("No .trace files found.", file=sys.stderr)
        sys.exit(2)

    # Ranking: primarily by hit_ratio (if range given), else by variance;
    # break ties by var then n. All keys sort descending.
    has_range = args.range_min is not None or args.range_max is not None
    sort_keys = ["hit_ratio", "var", "n"] if has_range else ["var", "n"]

    global_rows = []
    processed = 0
    for tr in traces:
        df = parse_trace(tr, rx_only=args.rx_only)
        if df.empty:
            # Report the skip instead of silently counting the file as processed.
            print(f"Skipping {tr.name}: no matching frames.", file=sys.stderr)
            continue
        processed += 1

        stats = analyze_one_trace(df, args.scale, args.offset, args.range_min, args.range_max)
        stats = stats.sort_values(sort_keys, ascending=False)

        # write per-trace csv
        per_csv = outdir / f"{tr.stem}_combostats.csv"
        stats.to_csv(per_csv, index=False)

        # append top rows with trace id hint
        stem = tr.stem  # e.g., 0x208_log1
        for record in stats.head(args.top).to_dict("records"):
            record["trace"] = stem
            global_rows.append(record)

        # plots (optional) into a subdir per trace
        if args.plots:
            plot_dir = outdir / f"{tr.stem}_plots"
            plot_one_trace(df, plot_dir, prefix=tr.stem)

    # global summary
    if global_rows:
        gdf = pd.DataFrame(global_rows)
        gdf.to_csv(outdir / "summary_top_combinations.csv", index=False)
        print(f"Global summary written: {outdir/'summary_top_combinations.csv'}")

    # Count only traces that were actually analyzed, not every file found.
    print(f"Processed {processed} trace files. Results at: {outdir}")


if __name__ == "__main__":
    main()