# app/simulation/simulator.py from __future__ import annotations from dataclasses import dataclass, field from typing import Dict, Any, List, Optional, Tuple, Type import importlib, pkgutil, inspect, pathlib # ---------------------- Core: Vehicle + Accumulator-API ---------------------- @dataclass class Vehicle: """ State-/Config-Container + Dashboard-Registry + generische Frame-Akkumulatoren. Grundprinzip: - set(key, value): harter Setzer (eine Quelle „besitzt“ den Wert) - get/ensure: lesen/initialisieren - push(key, delta, source): additiv beitragen (Source/Sink über Vorzeichen) - acc_total(key): Summe aller Beiträge in diesem Frame - acc_breakdown(key): Beiträge je Quelle (Debug/Transparenz) - acc_reset(): zu Framebeginn alle Akkus löschen Konvention (Empfehlung, aber nicht erzwungen): * Positive Beiträge „belasten“ (z. B. Widerstandsmoment, Laststrom) * Negative Beiträge „speisen“ (z. B. Generator-Moment, Einspeisestrom) """ state: Dict[str, Any] = field(default_factory=dict) config: Dict[str, Any] = field(default_factory=dict) dtc: Dict[str, bool] = field(default_factory=dict) dashboard_specs: Dict[str, Dict[str, Any]] = field(default_factory=dict) # Accumulatoren: key -> {source_name: float} _acc: Dict[str, Dict[str, float]] = field(default_factory=dict) # ---- state helpers ---- def get(self, key: str, default: Any = None) -> Any: return self.state.get(key, default) def set(self, key: str, value: Any) -> None: self.state[key] = value def ensure(self, key: str, default: Any) -> Any: if key not in self.state: self.state[key] = default return self.state[key] # ---- dashboard helpers ---- def register_metric( self, key: str, *, label: Optional[str] = None, unit: Optional[str] = None, fmt: Optional[str] = None, source: Optional[str] = None, priority: int = 100, overwrite: bool = False, ) -> None: spec = self.dashboard_specs.get(key) if spec and not overwrite: if label and not spec.get("label"): spec["label"] = label if unit and not spec.get("unit"): spec["unit"] = unit if fmt and not spec.get("fmt"): spec["fmt"] = fmt if source and not spec.get("source"): spec["source"] = source if spec.get("priority") is None: spec["priority"] = priority return self.dashboard_specs[key] = { "key": key, "label": label or key, "unit": unit, "fmt": fmt, "source": source, "priority": priority, } def dashboard_snapshot(self) -> Dict[str, Any]: return {"specs": dict(self.dashboard_specs), "values": dict(self.state)} def snapshot(self) -> Dict[str, Any]: return dict(self.state) # ---- generic accumulators (per-frame) ---- def acc_reset(self) -> None: self._acc.clear() def push(self, key: str, delta: float, source: Optional[str] = None) -> None: """ Additiver Beitrag zu einer Größe. Vorzeichen: + belastet / - speist (Empfehlung). """ src = source or "anon" bucket = self._acc.setdefault(key, {}) bucket[src] = bucket.get(src, 0.0) + float(delta) def acc_total(self, key: str) -> float: bucket = self._acc.get(key) if not bucket: return 0.0 return sum(bucket.values()) def acc_breakdown(self, key: str) -> Dict[str, float]: return dict(self._acc.get(key, {})) # ---- Backwards-compat convenience for your current Basic code ---- def elec_reset_frame(self) -> None: # map legacy helpers auf generisches System # loads + sources werden in einem Kanal gesammelt # (loads positiv, sources negativ) # Diese Methode ist mittlerweile redundant, acc_reset() macht alles. pass def elec_add_load(self, name: str, amps: float) -> None: self.push("elec.current", +max(0.0, float(amps)), source=name) def elec_add_source(self, name: str, amps: float) -> None: self.push("elec.current", -max(0.0, float(amps)), source=name) def elec_totals(self) -> Tuple[float, float]: """ Gibt (loads_a_positiv, sources_a_positiv) zurück. Intern liegt alles algebraisch in 'elec.current'. """ bd = self.acc_breakdown("elec.current") loads = sum(v for v in bd.values() if v > 0) sources = sum(-v for v in bd.values() if v < 0) return (loads, sources) # ---------------------------- Module Base + Loader ---------------------------- class Module: """ Basisklasse für alle Module. Jedes Modul: - deklariert PRIO (klein = früher) - hat NAME (für Debug/Registry) - implementiert apply(v, dt) """ PRIO: int = 100 NAME: str = "module" def apply(self, v: Vehicle, dt: float) -> None: raise NotImplementedError def _discover_modules(pkg_name: str = "app.simulation.modules") -> List[Module]: """ Sucht in app/simulation/modules nach Klassen, die Module erben, instanziert sie und sortiert nach PRIO. """ mods: List[Module] = [] try: pkg = importlib.import_module(pkg_name) except Exception as exc: raise RuntimeError(f"Module package '{pkg_name}' konnte nicht geladen werden: {exc}") pkg_path = pathlib.Path(pkg.__file__).parent for _, modname, ispkg in pkgutil.iter_modules([str(pkg_path)]): if ispkg: # optional: auch Subpackages zulassen continue full_name = f"{pkg_name}.{modname}" try: m = importlib.import_module(full_name) except Exception as exc: print(f"[loader] Fehler beim Import {full_name}: {exc}") continue for _, obj in inspect.getmembers(m, inspect.isclass): if not issubclass(obj, Module): continue if obj is Module: continue try: inst = obj() # Module ohne args except Exception as exc: print(f"[loader] Kann {obj.__name__} nicht instanziieren: {exc}") continue mods.append(inst) # sortieren nach PRIO; bei Gleichstand NAME als Tie-Break mods.sort(key=lambda x: (getattr(x, "PRIO", 100), getattr(x, "NAME", x.__class__.__name__))) return mods # ------------------------------- Simulator API -------------------------------- class VehicleSimulator: """ Öffentliche Fassade für GUI/Tests. Lädt Module dynamisch, führt sie pro Tick in PRIO-Reihenfolge aus. """ def __init__(self, modules_package: str = "app.simulation.modules"): self.v = Vehicle() self.modules: List[Module] = _discover_modules(modules_package) def update(self, dt: float) -> None: # pro Frame alle Akkumulatoren leeren self.v.acc_reset() for m in self.modules: try: m.apply(self.v, dt) except Exception as exc: print(f"[sim] Modul {getattr(m, 'NAME', m.__class__.__name__)} Fehler: {exc}") # Kompatible Hilfsfunktionen für GUI def snapshot(self) -> Dict[str, Any]: return self.v.snapshot() def load_config(self, cfg: Dict[str, Any]) -> None: # Namespaced-Merge; Keys bleiben modul-spezifisch for k, sub in cfg.items(): self.v.config.setdefault(k, {}).update(sub if isinstance(sub, dict) else {}) if "dtc" in cfg: self.v.dtc.update(cfg["dtc"]) def export_config(self) -> Dict[str, Any]: return {ns: dict(data) for ns, data in self.v.config.items()} | {"dtc": dict(self.v.dtc)} # für alte GUI-Knöpfe def set_gear(self, g: int) -> None: self.v.set("gear", max(0, min(10, int(g)))) def set_throttle(self, t: int) -> None: self.v.set("throttle_pct", max(0, min(100, int(t)))) # falls noch genutzt