# app/simulation/simulator.py
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Dict, Any, List, Optional
import importlib
import inspect
import pathlib
import pkgutil


# ---------------------- Core: Vehicle + Accumulator API ----------------------
@dataclass
class Vehicle:
    """State/config container, dashboard registry and per-frame accumulators.

    - ``set`` / ``get`` / ``ensure``: plain state values.
    - ``push(key, delta, source)``: additive contribution for the current
      frame (source vs. sink is expressed through the sign of ``delta``).
    - ``acc_total(key)``: sum of all contributions pushed to ``key``.
    - ``acc_breakdown(key)``: contributions per source (debug/transparency).
    - ``acc_reset()``: drop all accumulators (called at the start of a frame).
    """

    state: Dict[str, Any] = field(default_factory=dict)
    config: Dict[str, Any] = field(default_factory=dict)
    dtc: Dict[str, bool] = field(default_factory=dict)
    dashboard_specs: Dict[str, Dict[str, Any]] = field(default_factory=dict)
    # Accumulator: key -> {source_name: float}
    _acc: Dict[str, Dict[str, float]] = field(default_factory=dict)

    # ---- state helpers ----
    def get(self, key: str, default: Any = None) -> Any:
        """Return the state value for ``key`` or ``default`` if it is unset."""
        return self.state.get(key, default)

    def set(self, key: str, value: Any) -> None:
        """Store ``value`` under ``key`` in the state."""
        self.state[key] = value

    def ensure(self, key: str, default: Any) -> Any:
        """Initialise ``key`` with ``default`` if missing; return the stored value."""
        if key not in self.state:
            self.state[key] = default
        return self.state[key]

    # ---- dashboard helpers ----
    def register_metric(
        self,
        key: str,
        *,
        label: Optional[str] = None,
        unit: Optional[str] = None,
        fmt: Optional[str] = None,
        source: Optional[str] = None,
        priority: int = 100,
        overwrite: bool = False,
    ) -> None:
        """Register (or complete) the dashboard spec for ``key``.

        If a spec already exists and ``overwrite`` is false, only fields that
        are still empty in the existing spec are filled in; values that are
        already set are kept. Otherwise the spec is (re)created from the
        given arguments, with ``label`` defaulting to ``key``.
        """
        spec = self.dashboard_specs.get(key)
        if spec and not overwrite:
            # Fill gaps only; never replace a value that is already set.
            if label and not spec.get("label"):
                spec["label"] = label
            if unit and not spec.get("unit"):
                spec["unit"] = unit
            if fmt and not spec.get("fmt"):
                spec["fmt"] = fmt
            if source and not spec.get("source"):
                spec["source"] = source
            if spec.get("priority") is None:
                spec["priority"] = priority
            return
        self.dashboard_specs[key] = {
            "key": key,
            "label": label or key,
            "unit": unit,
            "fmt": fmt,
            "source": source,
            "priority": priority,
        }

    def dashboard_snapshot(self) -> Dict[str, Any]:
        """Return shallow copies of the dashboard specs and the state values."""
        return {"specs": dict(self.dashboard_specs), "values": dict(self.state)}

    def snapshot(self) -> Dict[str, Any]:
        """Return a shallow copy of the state."""
        return dict(self.state)

    # ---- generic accumulators (per frame) ----
    def acc_reset(self) -> None:
        """Remove all accumulated contributions."""
        self._acc.clear()

    def push(self, key: str, delta: float, source: Optional[str] = None) -> None:
        """Add ``delta`` to the accumulator ``key`` on behalf of ``source``.

        Contributions without a (truthy) source are booked under ``"anon"``.
        """
        src = source or "anon"
        bucket = self._acc.setdefault(key, {})
        bucket[src] = bucket.get(src, 0.0) + float(delta)

    def acc_total(self, key: str) -> float:
        """Return the sum of all contributions to ``key`` (0.0 if there are none)."""
        bucket = self._acc.get(key)
        return 0.0 if not bucket else sum(bucket.values())

    def acc_breakdown(self, key: str) -> Dict[str, float]:
        """Return a copy of the per-source contributions to ``key``."""
        return dict(self._acc.get(key, {}))


# ---------------------------- Module Base + Loader ----------------------------
class Module:
    """Base class for simulation modules discovered by ``_discover_modules``.

    ``PRIO`` is the primary sort key of the execution order (ascending) and
    ``NAME`` the secondary one; ``NAME`` is also used as the config namespace.
    """

    PRIO: int = 100
    NAME: str = "module"

    def apply(self, v: Vehicle, dt: float) -> None:
        """Apply this module to vehicle ``v`` for a step of ``dt``; must be overridden."""
        raise NotImplementedError


def _discover_modules(pkg_name: str = "app.simulation.modules") -> List[Module]:
    """Import all plain submodules of ``pkg_name`` and instantiate their modules.

    Every ``Module`` subclass visible in a submodule is instantiated with no
    arguments. Each class is instantiated at most once, even when several
    submodules expose it (for example because one submodule imports a class
    from another). Import and instantiation errors are printed and skipped.
    Sub-packages are not descended into.

    Returns the instances sorted by ``(PRIO, NAME)``.
    """
    pkg = importlib.import_module(pkg_name)

    # Prefer __path__: it also exists for namespace packages, whose __file__
    # is None. Fall back to the directory of __file__ for anything else.
    search_paths = [str(p) for p in getattr(pkg, "__path__", [])]
    if not search_paths and getattr(pkg, "__file__", None):
        search_paths = [str(pathlib.Path(pkg.__file__).parent)]

    mods: List[Module] = []
    seen_classes = set()  # classes already handled, to avoid duplicate instances
    for _, modname, ispkg in pkgutil.iter_modules(search_paths):
        if ispkg:
            continue
        full_name = f"{pkg_name}.{modname}"
        try:
            m = importlib.import_module(full_name)
        except Exception as exc:
            print(f"[loader] Fehler beim Import {full_name}: {exc}")
            continue
        for _, obj in inspect.getmembers(m, inspect.isclass):
            if obj is Module or not issubclass(obj, Module):
                continue
            if obj in seen_classes:
                # Same class re-exported by another submodule: already handled.
                continue
            seen_classes.add(obj)
            try:
                inst = obj()
            except Exception as exc:
                print(f"[loader] Kann {obj.__name__} nicht instanziieren: {exc}")
                continue
            mods.append(inst)

    mods.sort(
        key=lambda x: (
            getattr(x, "PRIO", 100),
            getattr(x, "NAME", x.__class__.__name__),
        )
    )
    return mods


# ------------------------------- Simulator API --------------------------------
class VehicleSimulator:
    """Loads modules dynamically and runs them in PRIO order on every tick."""

    def __init__(self, modules_package: str = "app.simulation.modules"):
        """Discover the modules in ``modules_package`` and collect their defaults.

        Convention: a module whose namespace is ``ns`` (lower-cased ``NAME``,
        or the lower-cased class name if ``NAME`` is empty) may define a
        module-level dict ``<NS>_DEFAULTS`` in the file that defines its class.
        """
        self.v = Vehicle()
        self.modules: List[Module] = _discover_modules(modules_package)
        self.module_defaults: Dict[str, Dict[str, Any]] = {}
        for m in self.modules:
            ns = getattr(m, "NAME", "").lower() or m.__class__.__name__.lower()
            mod = importlib.import_module(m.__class__.__module__)
            # Convention: UPPER(NAME) + _DEFAULTS
            key = f"{ns.upper()}_DEFAULTS"
            defaults = getattr(mod, key, None)
            if isinstance(defaults, dict):
                self.module_defaults[ns] = dict(defaults)

    def update(self, dt: float) -> None:
        """Run one simulation tick of length ``dt``.

        The accumulators are cleared first; afterwards every module is
        applied in order. A failing module is reported via ``print`` and does
        not stop the remaining modules.
        """
        self.v.acc_reset()  # clear the accumulators once per frame
        for m in self.modules:
            try:
                m.apply(self.v, dt)
            except Exception as exc:
                print(f"[sim] Modul {getattr(m, 'NAME', m.__class__.__name__)} Fehler: {exc}")

    def snapshot(self) -> Dict[str, Any]:
        """Return a shallow copy of the vehicle state."""
        return self.v.snapshot()

    def load_config(self, cfg: Dict[str, Any]) -> None:
        """Merge ``cfg`` into the vehicle configuration.

        Every top-level key becomes a namespace in ``v.config``; dict values
        are merged into it, non-dict values only create the (empty)
        namespace. A ``"dtc"`` entry is additionally merged into ``v.dtc``;
        like any other section it is ignored there unless it is a dict.
        """
        for k, sub in cfg.items():
            self.v.config.setdefault(k, {}).update(sub if isinstance(sub, dict) else {})
        dtc = cfg.get("dtc")
        if isinstance(dtc, dict):
            self.v.dtc.update(dtc)

    def export_config(self) -> Dict[str, Any]:
        """Export a *complete* configuration snapshot.

        - module defaults merged with overrides (so that no keys are missing)
        - all remaining namespaces copied as they are
        - DTCs stored separately under ``"dtc"``
        """
        out: Dict[str, Any] = {}
        # 1) Module namespaces: merge defaults with overrides.
        for ns, defs in self.module_defaults.items():
            merged = dict(defs)
            merged.update(self.v.config.get(ns, {}))
            out[ns] = merged
        # 2) Remaining namespaces (without known module defaults): copy 1:1.
        for ns, data in self.v.config.items():
            if ns not in out:
                out[ns] = dict(data)
        # 3) Attach the DTCs (takes precedence over a "dtc" config namespace).
        out["dtc"] = dict(self.v.dtc)
        return out

    # In case these are still in use:
    def set_gear(self, g: int) -> None:
        """Store ``g`` as state ``"gear"``, converted to int and clamped to 0..10."""
        self.v.set("gear", max(0, min(10, int(g))))

    def set_throttle(self, t: int) -> None:
        """Store ``t`` as state ``"throttle_pct"``, converted to int and clamped to 0..100."""
        self.v.set("throttle_pct", max(0, min(100, int(t))))