Files
Kettenoeler/Reverse-Engineering CAN-Bus/main.py
Marcel Peterkau a9053997a1
All checks were successful
CI-Build/Kettenoeler/pipeline/head This commit looks good
update of trace_signal_fitter.py and some doc
2025-08-27 23:59:57 +02:00

974 lines
43 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import json
import os
import sys
import threading
import subprocess
import shutil
from pathlib import Path
from datetime import datetime
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
import tempfile
SCRIPT_NAME = "can_universal_signal_finder.py"
SPLIT_SCRIPT = "can_split_by_id.py"
EXPLORE_SCRIPT = "id_signal_explorer.py"
TRACE_BATCH = "trace_batch_analyzer.py"
RANGE_FITTER = "trace_signal_fitter.py"
LOG_PATTERNS = ("*.log", "*.txt")
TRACE_PATTERNS = ("*.trace",)
# ---------------- helpers ----------------
def now_stamp():
return datetime.now().strftime("%Y%m%d_%H%M%S")
def find_logs(root: Path, rel_logs_dir: str):
base = (root / rel_logs_dir) if rel_logs_dir else root
found = []
if not base.exists():
return found
for pat in LOG_PATTERNS:
found += [str(p) for p in base.glob(pat)]
found += [str(p) for p in base.rglob(pat)] # include subdirs
return sorted(set(found))
def find_traces(base: Path):
"""Liste .trace im Basisordner und eine Ebene tiefer."""
files = []
if not base.exists():
return files
for pat in TRACE_PATTERNS:
files += [str(p) for p in base.glob(pat)]
files += [str(p) for p in base.glob(f"*/*{pat[1:]}")] # eine Ebene tiefer
return sorted(set(files))
def ensure_dir(p: Path):
p.mkdir(parents=True, exist_ok=True)
return p
def latest_subdir(base: Path) -> Path:
"""Neuester Unterordner in base, sonst base selbst."""
if not base.exists():
return base
subs = [p for p in base.iterdir() if p.is_dir()]
if not subs:
return base
return max(subs, key=lambda p: p.stat().st_mtime)
# ---------------- shared app state ----------------
class AppState:
def __init__(self):
# core paths
self.workdir = tk.StringVar(value="")
self.logs_dir = tk.StringVar(value="logs")
self.traces_dir = tk.StringVar(value="traces")
self.analyze_out_base = tk.StringVar(value="analyze_out")
# discovered logs
self.available_logs = [] # absolute paths
self.selected_log_indices = [] # indices in header listbox
# project defaults
self.timestamp_runs = tk.BooleanVar(value=True)
# shared traces directory + file list
self.traces_current_dir = tk.StringVar(value="") # absoluter Pfad zum aktuell angezeigten Traces-Ordner
self.traces_files = [] # Liste der .trace in current_dir (inkl. eine Ebene tiefer)
self._trace_observers = [] # callbacks, die Liste aktualisieren
# hook: wenn der Pfad geändert wird, scannen
self.traces_current_dir.trace_add("write", self._on_traces_dir_changed)
# --- path helpers ---
def workdir_path(self) -> Path:
wd = self.workdir.get().strip() or "."
return Path(wd)
def logs_base_path(self) -> Path:
return self.workdir_path() / (self.logs_dir.get().strip() or "logs")
def traces_base_path(self) -> Path:
return self.workdir_path() / (self.traces_dir.get().strip() or "traces")
def analyze_out_root(self) -> Path:
return self.workdir_path() / (self.analyze_out_base.get().strip() or "analyze_out")
# --- traces state ---
def add_trace_observer(self, cb):
if cb not in self._trace_observers:
self._trace_observers.append(cb)
def _notify_trace_observers(self):
for cb in list(self._trace_observers):
try:
cb(self.traces_files)
except Exception:
pass
def _on_traces_dir_changed(self, *_):
base = Path(self.traces_current_dir.get().strip() or str(self.traces_base_path()))
self.traces_files = find_traces(base)
self._notify_trace_observers()
def set_traces_dir(self, path: str):
self.traces_current_dir.set(path) # löst automatisch scan + notify aus
def refresh_traces(self):
# retrigger write to force refresh
self._on_traces_dir_changed()
def set_traces_to_default_or_latest(self):
base = self.traces_base_path()
target = latest_subdir(base)
self.set_traces_dir(str(target))
# ---------------- header (workdir + logs selection) ----------------
class Header(ttk.Frame):
def __init__(self, master, state: AppState):
super().__init__(master, padding=8)
self.state = state
self._build_ui()
def _build_ui(self):
self.columnconfigure(1, weight=1)
self.columnconfigure(3, weight=1)
# row 0: workdir + scan
ttk.Label(self, text="Workdir:").grid(row=0, column=0, sticky="w")
self.ent_workdir = ttk.Entry(self, textvariable=self.state.workdir)
self.ent_workdir.grid(row=0, column=1, sticky="ew", padx=6)
ttk.Button(self, text="Wählen…", command=self.pick_workdir).grid(row=0, column=2, padx=5)
ttk.Button(self, text="Logs scannen", command=self.scan_logs).grid(row=0, column=3, padx=5)
# row 1: subfolders + timestamp checkbox
ttk.Label(self, text="Logs-Unterordner:").grid(row=1, column=0, sticky="w")
ttk.Entry(self, textvariable=self.state.logs_dir, width=24).grid(row=1, column=1, sticky="w", padx=6)
ttk.Label(self, text="Traces-Unterordner:").grid(row=1, column=2, sticky="w")
ttk.Entry(self, textvariable=self.state.traces_dir, width=24).grid(row=1, column=3, sticky="w", padx=6)
ttk.Label(self, text="Analyze-Output:").grid(row=2, column=0, sticky="w")
ttk.Entry(self, textvariable=self.state.analyze_out_base, width=24).grid(row=2, column=1, sticky="w", padx=6)
ttk.Checkbutton(self, text="Zeitstempel-Unterordner pro Run", variable=self.state.timestamp_runs).grid(row=2, column=2, columnspan=2, sticky="w")
# row 3: logs list
frm = ttk.LabelFrame(self, text="Gefundene Logdateien (Mehrfachauswahl möglich)")
frm.grid(row=3, column=0, columnspan=4, sticky="nsew", pady=(8,0))
self.rowconfigure(3, weight=1)
frm.columnconfigure(0, weight=1)
frm.rowconfigure(0, weight=1)
self.lst_logs = tk.Listbox(frm, height=6, selectmode=tk.EXTENDED)
self.lst_logs.grid(row=0, column=0, sticky="nsew", padx=(8,4), pady=8)
btns = ttk.Frame(frm)
btns.grid(row=0, column=1, sticky="ns", padx=(4,8), pady=8)
ttk.Button(btns, text="Alle wählen", command=self.select_all).pack(fill="x", pady=2)
ttk.Button(btns, text="Keine", command=self.select_none).pack(fill="x", pady=2)
ttk.Separator(btns, orient="horizontal").pack(fill="x", pady=6)
ttk.Button(btns, text="Manuell hinzufügen…", command=self.add_logs_manual).pack(fill="x", pady=2)
ttk.Button(btns, text="Entfernen", command=self.remove_selected_logs).pack(fill="x", pady=2)
ttk.Button(btns, text="Liste leeren", command=self.clear_logs).pack(fill="x", pady=2)
ttk.Separator(btns, orient="horizontal").pack(fill="x", pady=6)
ttk.Button(btns, text="Projekt speichern…", command=self.save_project).pack(fill="x", pady=2)
ttk.Button(btns, text="Projekt laden…", command=self.load_project).pack(fill="x", pady=2)
# ---- actions ----
def pick_workdir(self):
d = filedialog.askdirectory(title="Workdir auswählen")
if d:
self.state.workdir.set(d)
self.scan_logs()
# automatisch auch traces default/latest setzen
self.state.set_traces_to_default_or_latest()
def scan_logs(self):
wd = self.state.workdir_path()
logs_dir = self.state.logs_dir.get().strip()
found = find_logs(wd, logs_dir)
self.state.available_logs = found
self.lst_logs.delete(0, tk.END)
for p in found:
self.lst_logs.insert(tk.END, p)
# default-select all
self.lst_logs.select_set(0, tk.END)
self.state.selected_log_indices = list(range(len(found)))
def select_all(self):
self.lst_logs.select_set(0, tk.END)
self.state.selected_log_indices = list(range(self.lst_logs.size()))
def select_none(self):
self.lst_logs.select_clear(0, tk.END)
self.state.selected_log_indices = []
def add_logs_manual(self):
paths = filedialog.askopenfilenames(title="Logdateien auswählen", filetypes=[("Logfiles","*.log *.txt"),("Alle Dateien","*.*")])
if not paths: return
for p in paths:
if p not in self.state.available_logs:
self.state.available_logs.append(p)
self.lst_logs.insert(tk.END, p)
# if workdir empty, infer from first added
if not self.state.workdir.get().strip():
self.state.workdir.set(str(Path(paths[0]).resolve().parent))
# auch traces default/latest
self.state.set_traces_to_default_or_latest()
def remove_selected_logs(self):
sel = list(self.lst_logs.curselection())
sel.reverse()
for i in sel:
p = self.lst_logs.get(i)
if p in self.state.available_logs:
self.state.available_logs.remove(p)
self.lst_logs.delete(i)
self.state.selected_log_indices = [i for i in range(self.lst_logs.size()) if self.lst_logs.select_includes(i)]
def clear_logs(self):
self.state.available_logs = []
self.lst_logs.delete(0, tk.END)
self.state.selected_log_indices = []
def selected_logs(self):
idx = self.lst_logs.curselection()
if not idx:
return []
return [self.lst_logs.get(i) for i in idx]
# ---- project save/load ----
def collect_project(self):
return {
"workdir": self.state.workdir.get(),
"logs_dir": self.state.logs_dir.get(),
"traces_dir": self.state.traces_dir.get(),
"analyze_out_base": self.state.analyze_out_base.get(),
"timestamp_runs": bool(self.state.timestamp_runs.get()),
"available_logs": self.state.available_logs,
"selected_indices": list(self.lst_logs.curselection()),
"traces_current_dir": self.state.traces_current_dir.get(),
}
def apply_project(self, cfg):
self.state.workdir.set(cfg.get("workdir",""))
self.state.logs_dir.set(cfg.get("logs_dir","logs"))
self.state.traces_dir.set(cfg.get("traces_dir","traces"))
self.state.analyze_out_base.set(cfg.get("analyze_out_base","analyze_out"))
self.state.timestamp_runs.set(cfg.get("timestamp_runs", True))
# restore logs
self.scan_logs()
# If project contained explicit available_logs, merge
for p in cfg.get("available_logs", []):
if p not in self.state.available_logs:
self.state.available_logs.append(p)
self.lst_logs.insert(tk.END, p)
# re-select indices if valid
self.lst_logs.select_clear(0, tk.END)
for i in cfg.get("selected_indices", []):
if 0 <= i < self.lst_logs.size():
self.lst_logs.select_set(i)
# traces current dir: sofern vorhanden nutzen, sonst default/latest
tdir = cfg.get("traces_current_dir", "")
if tdir and Path(tdir).exists():
self.state.set_traces_dir(tdir)
else:
self.state.set_traces_to_default_or_latest()
def save_project(self):
cfg = self.collect_project()
path = filedialog.asksaveasfilename(title="Projekt speichern", defaultextension=".json", filetypes=[("Projektdatei","*.json")])
if not path: return
with open(path, "w", encoding="utf-8") as f:
json.dump(cfg, f, indent=2)
messagebox.showinfo("Gespeichert", f"Projekt gespeichert:\n{path}")
def load_project(self):
path = filedialog.askopenfilename(title="Projekt laden", filetypes=[("Projektdatei","*.json"),("Alle Dateien","*.*")])
if not path: return
with open(path, "r", encoding="utf-8") as f:
cfg = json.load(f)
self.apply_project(cfg)
messagebox.showinfo("Geladen", f"Projekt geladen:\n{path}")
# ---------------- shared Trace Panel ----------------
class TracePanel(ttk.LabelFrame):
"""
Einheitliche Trace-Auswahl: Liste links, Buttons rechts.
Nutzt AppState.traces_current_dir + AppState.traces_files.
single_select=True => Listbox SINGLE, versteckt 'Alle/Keine'.
"""
def __init__(self, master, state: AppState, title="Traces", single_select=False, height=10):
super().__init__(master, text=title)
self.state = state
self.single_select = single_select
self.height = height
self._build_ui()
# subscribe to state updates
self.state.add_trace_observer(self._on_traces_updated)
# initial fill from state
self._on_traces_updated(self.state.traces_files)
def _build_ui(self):
self.columnconfigure(0, weight=1)
self.rowconfigure(0, weight=1)
selectmode = tk.SINGLE if self.single_select else tk.EXTENDED
self.lst = tk.Listbox(self, height=self.height, selectmode=selectmode)
self.lst.grid(row=0, column=0, sticky="nsew", padx=(8,4), pady=8)
btns = ttk.Frame(self)
btns.grid(row=0, column=1, sticky="ns", padx=(4,8), pady=8)
ttk.Button(btns, text="Traces-Ordner wählen…", command=self._pick_traces_dir).pack(fill="x", pady=2)
ttk.Button(btns, text="Workdir/traces", command=self._use_default_traces).pack(fill="x", pady=2)
ttk.Button(btns, text="Neuester Split", command=self._use_latest_split).pack(fill="x", pady=2)
ttk.Button(btns, text="Refresh", command=self._refresh_traces).pack(fill="x", pady=6)
if not self.single_select:
ttk.Button(btns, text="Alle wählen", command=lambda: self.lst.select_set(0, tk.END)).pack(fill="x", pady=2)
ttk.Button(btns, text="Keine", command=lambda: self.lst.select_clear(0, tk.END)).pack(fill="x", pady=2)
# --- state sync ---
def _on_traces_updated(self, files):
# refresh list content
cur_sel_paths = self.get_selected()
self.lst.delete(0, tk.END)
for p in files:
self.lst.insert(tk.END, p)
# try to restore selection
if cur_sel_paths:
path_to_index = {self.lst.get(i): i for i in range(self.lst.size())}
for p in cur_sel_paths:
if p in path_to_index:
self.lst.select_set(path_to_index[p])
def _pick_traces_dir(self):
d = filedialog.askdirectory(title="Traces-Ordner wählen", initialdir=str(self.state.traces_base_path()))
if d:
self.state.set_traces_dir(d)
def _use_default_traces(self):
# default or latest under Workdir/traces
self.state.set_traces_to_default_or_latest()
def _use_latest_split(self):
base = self.state.traces_base_path()
target = latest_subdir(base)
self.state.set_traces_dir(str(target))
def _refresh_traces(self):
self.state.refresh_traces()
def get_selected(self):
idx = self.lst.curselection()
return [self.lst.get(i) for i in idx]
# ---------------- Tab 1: Multi-Log Analyse (ranking optional) ----------------
class TabAnalyze(ttk.Frame):
def __init__(self, master, state: AppState, header: Header):
super().__init__(master, padding=10)
self.state = state
self.header = header
self._build_ui()
def _build_ui(self):
self.columnconfigure(0, weight=1)
self.rowconfigure(2, weight=1)
# params
params = ttk.LabelFrame(self, text="Analyse-Parameter")
params.grid(row=0, column=0, sticky="nsew", padx=5, pady=5)
for c in (1,3):
params.columnconfigure(c, weight=1)
ttk.Label(params, text="Include-IDs (z.B. 0x208,0x209):").grid(row=0, column=0, sticky="w")
self.include_var = tk.StringVar(value="")
ttk.Entry(params, textvariable=self.include_var).grid(row=0, column=1, sticky="ew", padx=5)
ttk.Label(params, text="Exclude-IDs:").grid(row=0, column=2, sticky="w")
self.exclude_var = tk.StringVar(value="")
ttk.Entry(params, textvariable=self.exclude_var).grid(row=0, column=3, sticky="ew", padx=5)
ttk.Label(params, text="Scale:").grid(row=1, column=0, sticky="w")
self.scale_var = tk.DoubleVar(value=1.0)
ttk.Entry(params, textvariable=self.scale_var, width=12).grid(row=1, column=1, sticky="w", padx=5)
ttk.Label(params, text="Offset:").grid(row=1, column=2, sticky="w")
self.offset_var = tk.DoubleVar(value=0.0)
ttk.Entry(params, textvariable=self.offset_var, width=12).grid(row=1, column=3, sticky="w", padx=5)
ttk.Label(params, text="Range-Min:").grid(row=2, column=0, sticky="w")
self.rmin_var = tk.StringVar(value="")
ttk.Entry(params, textvariable=self.rmin_var, width=12).grid(row=2, column=1, sticky="w", padx=5)
ttk.Label(params, text="Range-Max:").grid(row=2, column=2, sticky="w")
self.rmax_var = tk.StringVar(value="")
ttk.Entry(params, textvariable=self.rmax_var, width=12).grid(row=2, column=3, sticky="w", padx=5)
ttk.Label(params, text="Range-Hit-Ratio (0..1):").grid(row=3, column=0, sticky="w")
self.hit_ratio_var = tk.DoubleVar(value=0.6)
ttk.Entry(params, textvariable=self.hit_ratio_var, width=12).grid(row=3, column=1, sticky="w", padx=5)
ttk.Label(params, text="Top-N (Fallback):").grid(row=3, column=2, sticky="w")
self.top_var = tk.IntVar(value=20)
ttk.Entry(params, textvariable=self.top_var, width=12).grid(row=3, column=3, sticky="w", padx=5)
ttk.Label(params, text="Per-ID-Limit:").grid(row=4, column=0, sticky="w")
self.per_id_limit_var = tk.IntVar(value=2)
ttk.Entry(params, textvariable=self.per_id_limit_var, width=12).grid(row=4, column=1, sticky="w", padx=5)
self.run_separately_var = tk.BooleanVar(value=False)
ttk.Checkbutton(params, text="Jede Logdatei separat laufen lassen", variable=self.run_separately_var).grid(row=4, column=2, columnspan=2, sticky="w", padx=5)
# run + console
run = ttk.Frame(self)
run.grid(row=1, column=0, sticky="ew", padx=5, pady=5)
ttk.Button(run, text="Analyse starten (Ranking)", command=self.on_run).pack(side="left", padx=5)
out = ttk.LabelFrame(self, text="Ausgabe")
out.grid(row=2, column=0, sticky="nsew", padx=5, pady=5)
out.columnconfigure(0, weight=1); out.rowconfigure(0, weight=1)
self.txt = tk.Text(out, height=12); self.txt.grid(row=0, column=0, sticky="nsew")
sb = ttk.Scrollbar(out, orient="vertical", command=self.txt.yview); sb.grid(row=0, column=1, sticky="ns")
self.txt.configure(yscrollcommand=sb.set)
def on_run(self):
logs = self.header.selected_logs()
if not logs:
messagebox.showwarning("Hinweis", "Bitte oben im Header Logdateien auswählen.")
return
t = threading.Thread(target=self._run_worker, args=(logs,), daemon=True)
self.txt.delete("1.0", tk.END)
self._append("Starte Analyse…\n")
t.start()
def _run_worker(self, logs):
script_path = Path(__file__).parent / SCRIPT_NAME
if not script_path.exists():
self._append(f"[Fehler] Script nicht gefunden: {script_path}\n"); return
# output root: workdir/analyze_out/<ts>_multilog
out_root = self.state.analyze_out_root()
stamp = now_stamp() + "_multilog"
outdir = ensure_dir(out_root / stamp)
def build_args():
args = [sys.executable, str(script_path)]
if self.include_var.get().strip():
args += ["--include-ids", self.include_var.get().strip()]
if self.exclude_var.get().strip():
args += ["--exclude-ids", self.exclude_var.get().strip()]
args += ["--scale", str(self.scale_var.get()), "--offset", str(self.offset_var.get())]
if self.rmin_var.get().strip(): args += ["--range-min", self.rmin_var.get().strip()]
if self.rmax_var.get().strip(): args += ["--range-max", self.rmax_var.get().strip()]
args += ["--range-hit-ratio", str(self.hit_ratio_var.get())]
args += ["--top", str(self.top_var.get()), "--per-id-limit", str(self.per_id_limit_var.get())]
return args
if self.run_separately_var.get():
for p in logs:
sub = ensure_dir(outdir / Path(p).stem)
cmd = build_args() + ["--outdir", str(sub), p]
self._run_cmd(cmd)
else:
cmd = build_args() + ["--outdir", str(outdir)] + logs
self._run_cmd(cmd)
self._append(f"\nDone. Output: {outdir}\n")
def _run_cmd(self, cmd):
self._append(f"\n>>> RUN: {' '.join(cmd)}\n")
try:
with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True) as proc:
for line in proc.stdout: self._append(line)
rc = proc.wait()
if rc != 0: self._append(f"[Exit-Code {rc}]\n")
except Exception as e:
self._append(f"[Fehler] {e}\n")
def _append(self, s): self.txt.insert(tk.END, s); self.txt.see(tk.END)
# ---------------- Tab 2: ID Explorer (split + single-ID analyze) ----------------
class TabExplorer(ttk.Frame):
def __init__(self, master, state: AppState, header: Header):
super().__init__(master, padding=10)
self.state = state
self.header = header
self._build_ui()
def _build_ui(self):
self.columnconfigure(0, weight=1)
self.rowconfigure(3, weight=1)
# split controls
frm_split = ttk.LabelFrame(self, text="Split: Logs → per-ID Traces")
frm_split.grid(row=0, column=0, sticky="ew", padx=5, pady=5)
frm_split.columnconfigure(1, weight=1)
self.rx_only_var = tk.BooleanVar(value=False)
self.ts_split_var = tk.BooleanVar(value=True)
ttk.Label(frm_split, text="Ziel (Workdir/traces[/timestamp])").grid(row=0, column=0, sticky="w", padx=5)
ttk.Label(frm_split, textvariable=self.state.traces_dir).grid(row=0, column=1, sticky="w")
ttk.Checkbutton(frm_split, text="nur RX", variable=self.rx_only_var).grid(row=1, column=0, sticky="w", padx=5)
ttk.Checkbutton(frm_split, text="Zeitstempel-Unterordner", variable=self.ts_split_var).grid(row=1, column=1, sticky="w", padx=5)
ttk.Button(frm_split, text="Split starten", command=self.on_split).grid(row=1, column=2, sticky="e", padx=5)
# unified trace panel (multi-select)
self.trace_panel = TracePanel(self, self.state, title="Traces im ausgewählten Ordner", single_select=False, height=10)
self.trace_panel.grid(row=1, column=0, sticky="nsew", padx=5, pady=(8,10))
# single-ID analyze
frm_one = ttk.LabelFrame(self, text="Einzel-ID Analyse (Plots + summary_stats)")
frm_one.grid(row=2, column=0, sticky="nsew", padx=5, pady=5)
frm_one.columnconfigure(1, weight=1)
ttk.Label(frm_one, text="Output-Basis (unter Workdir/analyze_out):").grid(row=0, column=0, sticky="w")
self.one_out_base = tk.StringVar(value="id_explore")
ttk.Entry(frm_one, textvariable=self.one_out_base).grid(row=0, column=1, sticky="ew", padx=5)
self.ts_one = tk.BooleanVar(value=True)
ttk.Checkbutton(frm_one, text="Zeitstempel-Unterordner", variable=self.ts_one).grid(row=0, column=2, sticky="w", padx=5)
ttk.Button(frm_one, text="Analyse starten", command=self.on_one_analyze).grid(row=0, column=3, sticky="e", padx=5)
# console
out = ttk.LabelFrame(self, text="Ausgabe")
out.grid(row=3, column=0, sticky="nsew", padx=5, pady=5)
out.columnconfigure(0, weight=1); out.rowconfigure(0, weight=1)
self.txt = tk.Text(out, height=12); self.txt.grid(row=0, column=0, sticky="nsew")
sb = ttk.Scrollbar(out, orient="vertical", command=self.txt.yview); sb.grid(row=0, column=1, sticky="ns")
self.txt.configure(yscrollcommand=sb.set)
def on_split(self):
logs = self.header.selected_logs()
if not logs:
messagebox.showwarning("Hinweis", "Bitte oben im Header Logdateien auswählen."); return
outdir = self.state.traces_base_path()
if self.ts_split_var.get(): outdir = outdir / now_stamp()
ensure_dir(outdir)
cmd = [sys.executable, str(Path(__file__).parent / SPLIT_SCRIPT), "--outdir", str(outdir)]
if self.rx_only_var.get(): cmd.append("--rx-only")
cmd += logs
self._run_cmd(cmd)
# nach dem Split: globalen Traces-Ordner setzen (neuester Ordner)
self.state.set_traces_dir(str(outdir))
def on_one_analyze(self):
sel = self.trace_panel.get_selected()
if not sel:
messagebox.showwarning("Hinweis", "Bitte mindestens eine .trace auswählen."); return
out_root = self.state.analyze_out_root()
stamp = now_stamp() + "_id_explore" if self.ts_one.get() else "id_explore"
outdir = ensure_dir(out_root / stamp)
for trace in sel:
cmd = [sys.executable, str(Path(__file__).parent / EXPLORE_SCRIPT), "--outdir", str(outdir), trace]
self._run_cmd(cmd)
self._append(f"\nDone. Output: {outdir}\n")
def _run_cmd(self, cmd):
self._append(f"\n>>> RUN: {' '.join(cmd)}\n")
try:
with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True) as proc:
for line in proc.stdout: self._append(line)
rc = proc.wait()
if rc != 0: self._append(f"[Exit-Code {rc}]\n")
except Exception as e:
self._append(f"[Fehler] {e}\n")
def _append(self, s): self.txt.insert(tk.END, s); self.txt.see(tk.END)
# ---------------- Tab 3: Traces Batch-Analyse ----------------
class TabTraceBatch(ttk.Frame):
def __init__(self, master, state: AppState, header: Header):
super().__init__(master, padding=10)
self.state = state
self.header = header
self._build_ui()
def _build_ui(self):
self.columnconfigure(0, weight=1)
self.rowconfigure(2, weight=1)
# unified trace panel (multi-select)
self.trace_panel = TracePanel(self, self.state, title="Traces (Ordner/Subset wählen)", single_select=False, height=10)
self.trace_panel.grid(row=0, column=0, sticky="nsew", padx=5, pady=(5,10))
# Params
pr = ttk.LabelFrame(self, text="Analyse-Parameter")
pr.grid(row=1, column=0, sticky="nsew", padx=5, pady=5)
for c in (1,3):
pr.columnconfigure(c, weight=1)
self.rx_only = tk.BooleanVar(value=False)
ttk.Checkbutton(pr, text="nur RX", variable=self.rx_only).grid(row=0, column=0, sticky="w", padx=5)
ttk.Label(pr, text="Scale").grid(row=0, column=1, sticky="e")
self.scale = tk.DoubleVar(value=1.0)
ttk.Entry(pr, textvariable=self.scale, width=12).grid(row=0, column=2, sticky="w", padx=5)
ttk.Label(pr, text="Offset").grid(row=0, column=3, sticky="e")
self.offset = tk.DoubleVar(value=0.0)
ttk.Entry(pr, textvariable=self.offset, width=12).grid(row=0, column=4, sticky="w", padx=5)
ttk.Label(pr, text="Range-Min").grid(row=1, column=1, sticky="e")
self.rmin = tk.StringVar(value="")
ttk.Entry(pr, textvariable=self.rmin, width=12).grid(row=1, column=2, sticky="w", padx=5)
ttk.Label(pr, text="Range-Max").grid(row=1, column=3, sticky="e")
self.rmax = tk.StringVar(value="")
ttk.Entry(pr, textvariable=self.rmax, width=12).grid(row=1, column=4, sticky="w", padx=5)
ttk.Label(pr, text="Top pro Trace").grid(row=2, column=1, sticky="e")
self.top = tk.IntVar(value=8)
ttk.Entry(pr, textvariable=self.top, width=12).grid(row=2, column=2, sticky="w", padx=5)
self.use_ts = tk.BooleanVar(value=True)
ttk.Checkbutton(pr, text="Zeitstempel-Unterordner", variable=self.use_ts).grid(row=2, column=3, sticky="w", padx=5)
# Run & console
run = ttk.Frame(self)
run.grid(row=3, column=0, sticky="ew", padx=5, pady=5)
ttk.Button(run, text="Batch starten", command=self.on_run).pack(side="left", padx=5)
out = ttk.LabelFrame(self, text="Ausgabe")
out.grid(row=4, column=0, sticky="nsew", padx=5, pady=5)
out.columnconfigure(0, weight=1); out.rowconfigure(0, weight=1)
self.txt = tk.Text(out, height=12); self.txt.grid(row=0, column=0, sticky="nsew")
sb = ttk.Scrollbar(out, orient="vertical", command=self.txt.yview); sb.grid(row=0, column=1, sticky="ns")
self.txt.configure(yscrollcommand=sb.set)
def on_run(self):
# nutze Auswahl oder falls leer kompletten Ordner
selected = self.trace_panel.get_selected()
traces_dir = Path(self.state.traces_current_dir.get().strip() or str(self.state.traces_base_path()))
if not traces_dir.exists():
messagebox.showwarning("Hinweis", "Bitte gültigen Traces-Ordner wählen."); return
out_root = self.state.analyze_out_root()
label = "trace_batch"
stamp = now_stamp() + "_" + label if self.use_ts.get() else label
outdir = ensure_dir(out_root / stamp)
# falls Auswahl getroffen wurde, temporären Subset-Ordner bauen
subset_dir = None
if selected:
subset_dir = ensure_dir(outdir / "_subset")
for p in selected:
src = Path(p)
dst = subset_dir / src.name
try:
# versuchen Hardlink (schnell, platzsparend)
if dst.exists():
dst.unlink()
os.link(src, dst)
except Exception:
# Fallback: Kopieren
shutil.copy2(src, dst)
run_dir = subset_dir if subset_dir else traces_dir
cmd = [sys.executable, str(Path(__file__).parent/TRACE_BATCH),
"--traces-dir", str(run_dir), "--outdir", str(outdir),
"--scale", str(self.scale.get()), "--offset", str(self.offset.get()),
"--top", str(self.top.get()), "--plots"]
if self.rmin.get().strip(): cmd += ["--range-min", self.rmin.get().strip()]
if self.rmax.get().strip(): cmd += ["--range-max", self.rmax.get().strip()]
if self.rx_only.get(): cmd.append("--rx-only")
self._run_cmd(cmd)
self._append(f"\nDone. Output: {outdir}\n")
def _run_cmd(self, cmd):
self._append(f"\n>>> RUN: {' '.join(cmd)}\n")
try:
with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True) as proc:
for line in proc.stdout: self._append(line)
rc = proc.wait()
if rc != 0: self._append(f"[Exit-Code {rc}]\n")
except Exception as e:
self._append(f"[Fehler] {e}\n")
def _append(self, s): self.txt.insert(tk.END, s); self.txt.see(tk.END)
# ---------------- Tab 4: Range-Fit (supervised + unsupervised, mit Physik-Constraints) ----------------
class TabRangeFit(ttk.Frame):
def __init__(self, master, state: AppState, header: Header):
super().__init__(master, padding=10)
self.state = state
self.header = header
self._last_outdir = None
self._build_ui()
def _build_ui(self):
self.columnconfigure(0, weight=1)
self.rowconfigure(3, weight=1)
# unified trace panel (single-select)
self.trace_panel = TracePanel(self, self.state, title="Trace wählen (Single)", single_select=True, height=10)
self.trace_panel.grid(row=0, column=0, sticky="nsew", padx=5, pady=(5,10))
# Parameter Frames
frm_params = ttk.Frame(self)
frm_params.grid(row=1, column=0, sticky="nsew", padx=5, pady=5)
for c in range(6):
frm_params.columnconfigure(c, weight=1)
# --- Supervised (Range & Physik) ---
box_sup = ttk.LabelFrame(frm_params, text="Supervised (Range-Fit) lasse leer für Unsupervised")
box_sup.grid(row=0, column=0, columnspan=6, sticky="nsew", padx=5, pady=5)
for c in range(6):
box_sup.columnconfigure(c, weight=1)
ttk.Label(box_sup, text="Range-Min").grid(row=0, column=0, sticky="e")
self.rmin = tk.StringVar(value="")
ttk.Entry(box_sup, textvariable=self.rmin, width=12).grid(row=0, column=1, sticky="w", padx=5)
ttk.Label(box_sup, text="Range-Max").grid(row=0, column=2, sticky="e")
self.rmax = tk.StringVar(value="")
ttk.Entry(box_sup, textvariable=self.rmax, width=12).grid(row=0, column=3, sticky="w", padx=5)
ttk.Label(box_sup, text="Min. Hit-Ratio (0..1)").grid(row=0, column=4, sticky="e")
self.min_hit = tk.DoubleVar(value=0.5)
ttk.Entry(box_sup, textvariable=self.min_hit, width=10).grid(row=0, column=5, sticky="w", padx=5)
self.allow_neg = tk.BooleanVar(value=False)
ttk.Checkbutton(box_sup, text="negative Scale erlauben", variable=self.allow_neg).grid(row=1, column=0, columnspan=2, sticky="w")
ttk.Label(box_sup, text="Rate-Min (Hz)").grid(row=1, column=2, sticky="e")
self.rate_min = tk.StringVar(value="")
ttk.Entry(box_sup, textvariable=self.rate_min, width=10).grid(row=1, column=3, sticky="w", padx=5)
ttk.Label(box_sup, text="Rate-Max (Hz)").grid(row=1, column=4, sticky="e")
self.rate_max = tk.StringVar(value="")
ttk.Entry(box_sup, textvariable=self.rate_max, width=10).grid(row=1, column=5, sticky="w", padx=5)
ttk.Label(box_sup, text="Jitter-Max (ms)").grid(row=2, column=0, sticky="e")
self.jitter_max = tk.StringVar(value="")
ttk.Entry(box_sup, textvariable=self.jitter_max, width=10).grid(row=2, column=1, sticky="w", padx=5)
ttk.Label(box_sup, text="Max-Slope-Abs (phys/s)").grid(row=2, column=2, sticky="e")
self.slope_abs = tk.StringVar(value="")
ttk.Entry(box_sup, textvariable=self.slope_abs, width=12).grid(row=2, column=3, sticky="w", padx=5)
ttk.Label(box_sup, text="Max-Slope-Frac (/s)").grid(row=2, column=4, sticky="e")
self.slope_frac = tk.StringVar(value="")
ttk.Entry(box_sup, textvariable=self.slope_frac, width=12).grid(row=2, column=5, sticky="w", padx=5)
ttk.Label(box_sup, text="Slope-Quantile").grid(row=3, column=0, sticky="e")
self.slope_q = tk.DoubleVar(value=0.95) # 0.95 oder 0.99
ttk.Entry(box_sup, textvariable=self.slope_q, width=10).grid(row=3, column=1, sticky="w", padx=5)
ttk.Label(box_sup, text="Min-Unique-Ratio").grid(row=3, column=2, sticky="e")
self.min_uniq = tk.StringVar(value="")
ttk.Entry(box_sup, textvariable=self.min_uniq, width=10).grid(row=3, column=3, sticky="w", padx=5)
# --- Unsupervised ---
box_uns = ttk.LabelFrame(frm_params, text="Unsupervised (ohne Range)")
box_uns.grid(row=1, column=0, columnspan=6, sticky="nsew", padx=5, pady=5)
for c in range(6):
box_uns.columnconfigure(c, weight=1)
ttk.Label(box_uns, text="Min. Smoothness (0..1)").grid(row=0, column=0, sticky="e")
self.min_smooth = tk.DoubleVar(value=0.2)
ttk.Entry(box_uns, textvariable=self.min_smooth, width=12).grid(row=0, column=1, sticky="w", padx=5)
ttk.Label(box_uns, text="Max-Slope-Frac-RAW (/s)").grid(row=0, column=2, sticky="e")
self.max_slope_frac_raw = tk.StringVar(value="")
ttk.Entry(box_uns, textvariable=self.max_slope_frac_raw, width=12).grid(row=0, column=3, sticky="w", padx=5)
# --- Allgemein/Output ---
box_out = ttk.LabelFrame(frm_params, text="Allgemein & Output")
box_out.grid(row=2, column=0, columnspan=6, sticky="nsew", padx=5, pady=5)
for c in range(6):
box_out.columnconfigure(c, weight=1)
self.rx_only = tk.BooleanVar(value=False)
ttk.Checkbutton(box_out, text="nur RX", variable=self.rx_only).grid(row=0, column=0, sticky="w")
ttk.Label(box_out, text="Plots Top-N").grid(row=0, column=1, sticky="e")
self.plots_top = tk.IntVar(value=8)
ttk.Entry(box_out, textvariable=self.plots_top, width=10).grid(row=0, column=2, sticky="w", padx=5)
ttk.Label(box_out, text="Output-Label").grid(row=0, column=3, sticky="e")
self.out_label = tk.StringVar(value="rangefit")
ttk.Entry(box_out, textvariable=self.out_label, width=18).grid(row=0, column=4, sticky="w", padx=5)
self.use_ts = tk.BooleanVar(value=True)
ttk.Checkbutton(box_out, text="Zeitstempel-Unterordner", variable=self.use_ts).grid(row=0, column=5, sticky="w")
# Start + Konsole + Aktionen
frm_run = ttk.Frame(self)
frm_run.grid(row=2, column=0, sticky="ew", padx=5, pady=5)
ttk.Button(frm_run, text="Start Range-/Unsupervised-Fit", command=self._on_run).pack(side="left", padx=5)
ttk.Button(frm_run, text="Report öffnen", command=self._open_last_report).pack(side="left", padx=5)
ttk.Button(frm_run, text="Output-Ordner öffnen", command=self._open_last_outdir).pack(side="left", padx=5)
frm_out = ttk.LabelFrame(self, text="Ausgabe")
frm_out.grid(row=3, column=0, sticky="nsew", padx=5, pady=5)
frm_out.columnconfigure(0, weight=1); frm_out.rowconfigure(0, weight=1)
self.txt = tk.Text(frm_out, height=14); self.txt.grid(row=0, column=0, sticky="nsew")
sbo = ttk.Scrollbar(frm_out, orient="vertical", command=self.txt.yview); sbo.grid(row=0, column=1, sticky="ns")
self.txt.configure(yscrollcommand=sbo.set)
# --- helpers ---
def _append(self, s):
self.txt.insert(tk.END, s); self.txt.see(tk.END)
def _stamp(self):
import datetime as _dt
return _dt.datetime.now().strftime("%Y%m%d_%H%M%S")
def _build_outdir(self, supervised: bool) -> Path:
out_root = self.state.analyze_out_root()
label = (self.out_label.get().strip() or ("rangefit" if supervised else "unsupervised"))
stamp = f"{self._stamp()}_{label}" if self.use_ts.get() else label
outdir = out_root / stamp
outdir.mkdir(parents=True, exist_ok=True)
self._last_outdir = outdir
return outdir
def _selected_trace(self):
sel = self.trace_panel.get_selected()
if not sel:
messagebox.showwarning("Hinweis", "Bitte genau eine .trace-Datei auswählen.")
return None
if len(sel) != 1:
messagebox.showwarning("Hinweis", "Range-Fit benötigt genau eine .trace-Datei (Single-Select).")
return None
return sel[0]
def _maybe(self, val: str, flag: str, args: list):
v = (val or "").strip()
if v != "":
args += [flag, v]
def _open_path(self, p: Path):
try:
if sys.platform.startswith("darwin"):
subprocess.Popen(["open", str(p)])
elif os.name == "nt":
os.startfile(str(p)) # type: ignore
else:
subprocess.Popen(["xdg-open", str(p)])
except Exception as e:
messagebox.showwarning("Fehler", f"Konnte nicht öffnen:\n{p}\n{e}")
def _open_last_outdir(self):
if self._last_outdir and self._last_outdir.exists():
self._open_path(self._last_outdir)
else:
messagebox.showinfo("Hinweis", "Noch kein Output-Ordner vorhanden.")
def _open_last_report(self):
if not (self._last_outdir and self._last_outdir.exists()):
messagebox.showinfo("Hinweis", "Noch kein Report erzeugt.")
return
# versuche ein *_report.md im letzten Outdir zu finden
md = list(Path(self._last_outdir).glob("*_report.md"))
if not md:
messagebox.showinfo("Hinweis", "Kein Report gefunden.")
return
self._open_path(md[0])
def _on_run(self):
trace = self._selected_trace()
if not trace:
return
# supervised?
rmin = self.rmin.get().strip()
rmax = self.rmax.get().strip()
supervised = bool(rmin) and bool(rmax)
outdir = self._build_outdir(supervised)
cmd = [
sys.executable,
str(Path(__file__).parent / RANGE_FITTER),
trace,
"--outdir", str(outdir),
"--plots-top", str(self.plots_top.get()),
]
if self.rx_only.get():
cmd.append("--rx-only")
if supervised:
cmd += ["--rmin", rmin, "--rmax", rmax, "--min-hit", str(self.min_hit.get())]
if self.allow_neg.get():
cmd.append("--allow-neg-scale")
self._maybe(self.rate_min.get(), "--rate-min", cmd)
self._maybe(self.rate_max.get(), "--rate-max", cmd)
self._maybe(self.jitter_max.get(), "--jitter-max-ms", cmd)
self._maybe(self.slope_abs.get(), "--max-slope-abs", cmd)
self._maybe(self.slope_frac.get(), "--max-slope-frac", cmd)
cmd += ["--slope-quantile", str(self.slope_q.get())]
self._maybe(self.min_uniq.get(), "--min-uniq-ratio", cmd)
else:
# unsupervised
cmd += ["--min-smooth", str(self.min_smooth.get())]
self._maybe(self.max_slope_frac_raw.get(), "--max-slope-frac-raw", cmd)
cmd += ["--slope-quantile", str(self.slope_q.get())] # wird intern für p95/p99 gewählt
self._append(f"\n>>> RUN: {' '.join(cmd)}\n")
t = threading.Thread(target=self._run_cmd, args=(cmd,), daemon=True)
t.start()
def _run_cmd(self, cmd):
try:
with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True) as proc:
for line in proc.stdout:
self._append(line)
rc = proc.wait()
if rc != 0:
self._append(f"[Exit-Code {rc}]\n")
else:
self._append(f"\nDone. Output: {self._last_outdir}\n")
except Exception as e:
self._append(f"[Fehler] {e}\n")
# ---------------- App Shell ----------------
class App(tk.Tk):
def __init__(self):
super().__init__()
self.title("CAN Universal Signal Finder GUI")
self.geometry("1180x860")
self.configure(padx=8, pady=8)
# shared state
self.state = AppState()
# header (always visible)
self.header = Header(self, self.state)
self.header.pack(fill="x", side="top")
# Tabs
nb = ttk.Notebook(self)
nb.pack(fill="both", expand=True)
self.tab_analyze = TabAnalyze(nb, self.state, self.header)
self.tab_explorer = TabExplorer(nb, self.state, self.header)
self.tab_batch = TabTraceBatch(nb, self.state, self.header)
self.tab_rangefit = TabRangeFit(nb, self.state, self.header)
nb.add(self.tab_analyze, text="Multi-Log Analyse")
nb.add(self.tab_explorer, text="ID Explorer")
nb.add(self.tab_batch, text="Traces Batch-Analyse")
nb.add(self.tab_rangefit, text="Range-Fit")
# init: traces auf default/latest stellen
self.state.set_traces_to_default_or_latest()
if __name__ == "__main__":
app = App()
app.mainloop()