# Source code for pySimBlocks.real_time.real_time_runner
# ******************************************************************************
# pySimBlocks
# Copyright (c) 2026 Université de Lille & INRIA
# ******************************************************************************
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
# for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# ******************************************************************************
# Authors: see Authors.txt
# ******************************************************************************
import time
import numpy as np
from typing import Any, Dict, List, Optional
from pySimBlocks.core.simulator import Simulator
# [docs]  (leftover documentation-page link marker; not part of the module)
class RealTimeRunner:
    """Run a simulator step loop against a real-time clock.

    The runner measures or accepts an external timestep, forwards input values
    to model blocks, advances the simulator, and collects output values.

    Attributes:
        sim: Simulator instance driven by the runner.
        input_blocks: Mapping from block name to the model block that is
            updated from external inputs at each tick.
        output_blocks: Mapping from block name to the model block that is
            read to produce external outputs.
        target_dt: Optional target period in seconds, used for pacing and for
            the overrun warning.
    """

    # A timestep larger than this multiple of ``target_dt`` triggers the
    # overrun warning printed by ``tick``.
    _OVERRUN_FACTOR = 1.5

    def __init__(
        self,
        sim: "Simulator",
        input_blocks: List[str],
        output_blocks: List[str],
        *,
        target_dt: Optional[float] = None,
        time_source: str = "perf_counter",  # "perf_counter" | "time"
    ):
        """Initialize the real-time runner.

        Block names are resolved once, through
        ``sim.model.get_block_by_name``, and cached by name.

        Args:
            sim: Simulator instance; its ``model`` must expose
                ``get_block_by_name``.
            input_blocks: Names of model blocks that receive external inputs.
            output_blocks: Names of model blocks that expose external outputs.
            target_dt: Target loop period in seconds for optional pacing.
            time_source: Clock source name, either ``"perf_counter"`` or
                ``"time"``.

        Raises:
            ValueError: If ``time_source`` is not supported.
        """
        self.sim = sim
        lookup = sim.model.get_block_by_name
        self.input_blocks: Dict[str, Any] = {
            name: lookup(name) for name in input_blocks
        }
        self.output_blocks: Dict[str, Any] = {
            name: lookup(name) for name in output_blocks
        }
        self.target_dt = target_dt

        if time_source == "perf_counter":
            self._now = time.perf_counter
        elif time_source == "time":
            self._now = time.time
        else:
            raise ValueError("time_source must be 'perf_counter' or 'time'")

        # Clock reading taken at the start of the previous tick; None until
        # the first call to initialize() or tick().
        self._t_prev: Optional[float] = None

    # --- Public methods ---

    def initialize(self, t0: float = 0.0) -> None:
        """Initialize the simulator and synchronize the runner clock.

        Args:
            t0: Initial simulation time in seconds.
        """
        self.sim.initialize(t0)
        self._t_prev = self._now()

    def tick(
        self,
        inputs: Dict[str, Any],
        *,
        dt: Optional[float] = None,
        pace: bool = False,
    ) -> Dict[str, np.ndarray]:
        """Execute one real-time simulation tick.

        Args:
            inputs: External input values keyed by block name. Every name in
                ``input_blocks`` must be present; extra keys are ignored.
            dt: Explicit timestep override in seconds. If omitted, the runner
                uses the clock time elapsed since the previous tick (never
                negative).
            pace: If True and ``target_dt`` is set, sleep after the step so
                the tick lasts approximately ``target_dt``.

        Returns:
            Output values keyed by block name as float column vectors.

        Raises:
            KeyError: If a required input block value is missing. No block is
                modified and the simulator is not stepped in that case.
            RuntimeError: If the ``"out"`` value of an output block is None.
        """
        if self._t_prev is None:
            self._t_prev = self._now()
        t_now = self._now()

        if dt is not None:
            dt_used = float(dt)
        else:
            # time.time() can step backwards (clock adjustment); never hand a
            # negative measured timestep to the simulator.
            dt_used = max(0.0, float(t_now - self._t_prev))

        # Warn when the step is much larger than the target period.
        if (
            self.target_dt is not None
            and dt_used > self._OVERRUN_FACTOR * self.target_dt
        ):
            print(f"[RealTimeRunner] Warning: dt={dt_used:.3f}s exceeds target_dt={self.target_dt:.3f}s")

        # 1) push inputs
        self._push_inputs(inputs)

        # 2) step with external dt
        self.sim.step(dt_override=dt_used)

        # 3) pull outputs
        outputs = self._pull_outputs()

        # 4) bookkeeping + pacing
        self._t_prev = t_now
        if pace and self.target_dt is not None:
            # Sleep for whatever is left of the target period, measured from
            # the start of this tick.
            remaining = self.target_dt - (self._now() - t_now)
            if remaining > 0:
                time.sleep(remaining)

        return outputs

    # --- Private helpers ---

    def _push_inputs(self, inputs: Dict[str, Any]) -> None:
        """Write external values into the ``"in"`` port of every input block."""
        # Validate first so a missing value cannot leave the model with only
        # part of its inputs refreshed.
        for block_name in self.input_blocks:
            if block_name not in inputs:
                raise KeyError(f"[RealTimeRunner] Missing input '{block_name}'")
        for block_name, block in self.input_blocks.items():
            block.inputs["in"] = inputs[block_name]

    def _pull_outputs(self) -> Dict[str, np.ndarray]:
        """Read the ``"out"`` port of every output block as a column vector."""
        outputs: Dict[str, np.ndarray] = {}
        for block_name, block in self.output_blocks.items():
            y = block.outputs["out"]
            if y is None:
                raise RuntimeError(f"[RealTimeRunner] Output 'out' of block '{block_name}' is None")
            outputs[block_name] = np.asarray(y, dtype=float).reshape(-1, 1)
        return outputs