Source code for pySimBlocks.core.simulator

# ******************************************************************************
#                                  pySimBlocks
#                     Copyright (c) 2026 Université de Lille & INRIA
# ******************************************************************************
#  This program is free software: you can redistribute it and/or modify it
#  under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or (at your
#  option) any later version.
#
#  This program is distributed in the hope that it will be useful, but WITHOUT
#  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
#  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License
#  for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with this program.  If not, see <https://www.gnu.org/licenses/>.
# ******************************************************************************
#  Authors: see Authors.txt
# ******************************************************************************

from typing import Dict, List

import numpy as np

from pySimBlocks.core.block import Block
from pySimBlocks.core.config import SimulationConfig
from pySimBlocks.core.fixed_time_manager import FixedStepTimeManager
from pySimBlocks.core.model import Model
from pySimBlocks.core.scheduler import Scheduler
from pySimBlocks.core.task import Task


[docs] class Simulator: """Discrete-time simulator with strict Simulink-like semantics. Each simulation step follows four phases: 1. **output_update** — blocks compute y[k] from x[k] and u[k]. 2. **Propagate** — outputs are forwarded to downstream inputs. 3. **state_update** — blocks compute x[k+1] from x[k] and u[k]. 4. **Commit** — x[k+1] is copied into x[k]. This guarantees proper separation of outputs and state transitions, correct causal behavior for feedback loops, and algebraic loop detection through the model's topological ordering. Attributes: model: The block-diagram model to simulate. sim_cfg: Simulation execution configuration. verbose: If True, print step-by-step execution logs. logs: Logged signal values keyed by variable name. """ def __init__( self, model: Model, sim_cfg: SimulationConfig, verbose: bool = False, ): """Initialize and compile the simulator. Args: model: The block-diagram model to simulate. sim_cfg: Simulation execution configuration. verbose: If True, print execution logs. """ self.model = model self.sim_cfg = sim_cfg self.verbose = verbose self.model.verbose = verbose self.sim_cfg.validate() self._compile() self.logs: Dict[str, List[np.ndarray]] = {"time": []} # -------------------------------------------------------------------------- # Public methods # --------------------------------------------------------------------------
[docs] def initialize(self, t0: float = 0.0) -> None: """Initialize all blocks and propagate initial outputs. Args: t0: Initial simulation time in seconds. Raises: RuntimeError: If any block raises during initialization. """ self.t = float(t0) self.t_step = float(t0) self.logs = {"time": []} self._log_shapes: Dict[str, tuple[int, int]] = {} for block in self.output_order: try: block.initialize(self.t) self._propagate_from(block) except Exception as e: raise RuntimeError( f"Error during initialization of block '{block.name}': {e}" ) from e for task in self.tasks: task.update_state_blocks()
[docs] def step(self, dt_override: float | None = None) -> None: """Perform one simulation step. With an internal clock, dt is provided by the time manager. With an external clock, dt_override must be supplied by the caller. Args: dt_override: Time step in seconds, required when using an external clock. Must not be provided for an internal clock. Raises: RuntimeError: If dt_override is missing for an external clock, or provided for an internal clock. ValueError: If dt_override is not strictly positive. """ # 0) Choose dt for this tick if self.sim_cfg.clock == "external": if dt_override is None: raise RuntimeError( "[Simulator] dt_override must be provided when using external clock." ) dt_scheduler = float(dt_override) if dt_scheduler <= 0.0: raise ValueError(f"[Simulator] dt_override must be > 0. Got {dt_scheduler}") else: if dt_override is not None: raise RuntimeError( "[Simulator] dt_override should not be provided when using internal clock." ) dt_scheduler = self.time_manager.next_dt(self.t) # 1) Accumulate dt for all tasks for task in self.tasks: task.accumulate(dt_scheduler) active_tasks = self.scheduler.active_tasks() # PHASE 1 — outputs for task in active_tasks: dt_task = task.accumulated_dt for block in task.output_blocks: block.output_update(self.t, dt_task) self._propagate_from(block) # PHASE 2 — states for task in active_tasks: dt_task = task.accumulated_dt for block in task.state_blocks: block.state_update(self.t, dt_task) # PHASE 3 — commit states for task in active_tasks: for block in task.state_blocks: block.commit_state() # 4) Advance all countdowns and reset accumulated dt for active tasks self.scheduler.tick() for task in active_tasks: task.reset_accumulated_dt() self.t_step = self.t self.t += dt_scheduler
[docs] def run( self, T: float | None = None, t0: float | None = None, logging: list[str] | None = None, ) -> Dict[str, List[np.ndarray]]: """Run the simulation from t0 to T. Falls back to sim_cfg values for any argument not provided. Args: T: Simulation end time in seconds. t0: Simulation start time in seconds. logging: List of variable names to log (e.g. ``"BlockName.outputs.port"``). Returns: Dict mapping variable names to their logged values over time. Raises: RuntimeError: If called with an external clock configuration. """ if self.sim_cfg.clock == "external": raise RuntimeError("Simulator.run() is not supported with external clock. Use step(dt_override=...)") sim_duration = T if T is not None else self.sim_cfg.T t0_run = t0 if t0 is not None else self.sim_cfg.t0 logging_run = logging if logging is not None else self.sim_cfg.logging self.initialize(t0_run) eps = 1e-12 while self.t_step < sim_duration - eps: self.step() self._log(logging_run) if self.verbose: print(f"\nTime: {self.t_step}/{sim_duration}") for variable in logging_run: print(f"{variable}: {self.logs[variable][-1]}") for block in self.model.blocks.values(): try: block.finalize() except Exception as e: print(f"[WARNING] finalize() failed for block {block.name}: {e}") return self.logs
[docs] def get_data(self, variable: str | None = None, block:str | None = None, port: str | None = None) -> np.ndarray: """Retrieve logged data for a variable as a NumPy array. Provide either variable or the (block, port) pair: - ``variable``: full log key, e.g. ``"BlockName.outputs.port"``. - ``block`` + ``port``: shorthand for ``"block.outputs.port"``. Args: variable: Full variable name as logged. block: Block name (used with port). port: Output port name (used with block). Returns: Array of shape ``(n_steps, *signal_shape)`` containing the logged values. Raises: ValueError: If neither variable nor (block, port) is provided, if the variable is not found in logs, or if the log is empty or cannot be converted to a NumPy array. """ if variable is not None: var_name = variable elif block is not None and port is not None: var_name = f"{block}.outputs.{port}" else: raise ValueError("Either variable or (block, port) must be provided.") if var_name not in self.logs: raise ValueError(f"Variable '{var_name}' is not logged. Available logs: {list(self.logs.keys())}") data = self.logs.get(var_name) if data is None: raise ValueError(f"No data found for variable '{var_name}'.") length = len(data) if length == 0: raise ValueError(f"Log for variable '{var_name}' is empty.") shape = data[0].shape try: data_array = np.array(data).reshape(length, *shape) except Exception as e: raise ValueError(f"Failed to convert log data for variable '{var_name}' to numpy array: {e}") from e return data_array
# -------------------------------------------------------------------------- # Private methods # -------------------------------------------------------------------------- def _compile(self) -> None: """Prepare the simulator for execution. Builds execution order, groups blocks into tasks by sample time, and initializes the scheduler and time manager. Raises: NotImplementedError: If solver is ``"variable"``. ValueError: If solver is unknown. """ self.output_order = self.model.build_execution_order() self.model.resolve_sample_times(self.sim_cfg.dt) self.model._rebuild_downstream_map() sample_times = [b._effective_sample_time for b in self.model.blocks.values()] tasks_by_ts = {} for b in self.model.blocks.values(): sample_time = b._effective_sample_time tasks_by_ts.setdefault(sample_time, []).append(b) self.tasks = [ Task(sample_time, period_ticks=round(sample_time / self.sim_cfg.dt), blocks=blocks, global_output_order=self.output_order) for sample_time, blocks in tasks_by_ts.items() ] self.scheduler = Scheduler(self.tasks) if self.sim_cfg.solver == "fixed": self.time_manager = FixedStepTimeManager( dt_base=self.sim_cfg.dt, sample_times=list(set(sample_times)) ) elif self.sim_cfg.solver == "variable": raise NotImplementedError( "Variable-step simulation is not implemented yet." ) else: raise ValueError( f"Unknown simulation mode '{self.sim_cfg.solver}'. " "Supported modes are: 'fixed', 'variable'." ) def _propagate_from(self, block: Block) -> None: """Forward outputs of block to its direct downstream inputs.""" blocks = self.model.blocks for (src, dst) in self.model.downstream_of(block.name): _, src_port = src dst_block, dst_port = dst value = block.outputs[src_port] if value is not None: blocks[dst_block].inputs[dst_port] = value def _log(self, variables_to_log: List[str]) -> None: """Log specified variables at the current timestep. Raises: ValueError: If a variable format is invalid or the container is unknown. RuntimeError: If a logged value is None, not 2D, or changes shape across timesteps. """ for var in variables_to_log: block_name, container, key = var.split(".") block = self.model.blocks[block_name] if container == "outputs": value = block.outputs[key] elif container == "state": value = block.state[key] else: raise ValueError(f"Unknown container '{container}' in '{var}'.") if value is None: raise RuntimeError( f"[Simulator] Cannot log '{var}' at t={self.t_step}: value is None." ) arr = np.asarray(value) if arr.ndim != 2: raise RuntimeError( f"[Simulator] Cannot log '{var}' at t={self.t_step}: expected a 2D array, " f"got ndim={arr.ndim} with shape {arr.shape}." ) if var not in self._log_shapes: self._log_shapes[var] = arr.shape else: expected_shape = self._log_shapes[var] if arr.shape != expected_shape: raise RuntimeError( f"[Simulator] Logged signal '{var}' changed shape over time at t={self.t_step}: " f"expected {expected_shape}, got {arr.shape}." ) if var not in self.logs: self.logs[var] = [] self.logs[var].append(np.copy(arr)) self.logs["time"].append(np.array([self.t_step]))