# ******************************************************************************
# pySimBlocks
# Copyright (c) 2026 Université de Lille & INRIA
# ******************************************************************************
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
# for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# ******************************************************************************
# Authors: see Authors.txt
# ******************************************************************************
from __future__ import annotations
from pathlib import Path
from typing import Any, Dict, List
import numpy as np
import Sofa
from pySimBlocks import Model, Simulator
from pySimBlocks.project.load_project_config import load_project_config
from pySimBlocks.project.build_model import build_model_from_dict
try:
import Sofa.ImGui as MyGui
_imgui = hasattr(MyGui, "MyRobotWindow")
except ImportError:
_imgui = False
[docs]
class SofaPysimBlocksController(Sofa.Core.Controller):
"""Base SOFA controller class bridging the SOFA simulation loop and pySimBlocks.
Supports two operating modes:
**SOFA_MASTER** (``SOFA_MASTER=True``): SOFA drives the time loop. A
``project_yaml`` must be provided. At each pySimBlocks step the
controller reads SOFA outputs, runs one pySimBlocks step, and applies
the resulting inputs back to SOFA.
**pySimBlocks master** (``SOFA_MASTER=False``): pySimBlocks drives the
time loop. The controller acts as a pure I/O shell — no model is built
or executed internally.
Subclasses must implement :meth:`set_inputs` and :meth:`get_outputs`.
Attributes:
IS_READY: Set to True by :meth:`prepare_scene` when the scene is
ready to start the control loop.
SOFA_MASTER: If True, SOFA is the time master.
inputs: Dict of input signals written by :meth:`set_inputs`.
outputs: Dict of output signals populated by :meth:`get_outputs`.
dt: SOFA simulation time step in seconds. Set automatically at runtime.
project_yaml: Path to the pySimBlocks YAML project file.
sim: The pySimBlocks :class:`Simulator` instance, or None.
variables_to_log: List of signal names to log at each step.
node: The SOFA node to which the controller is attached, set automatically at runtime.
step_index: Total number of SOFA animation steps executed.
verbose: If True, print logged variables at each control step.
"""
def __init__(self, name: str = "SofaControllerGui"):
"""Initialize the SOFA–pySimBlocks controller.
Args:
name: Name passed to the SOFA controller base class.
"""
super().__init__(name=name)
self.IS_READY = False
self.SOFA_MASTER = True
self._imgui = _imgui
self.inputs: Dict[str, np.ndarray] = {}
self.outputs: Dict[str, np.ndarray] = {}
self.variables_to_log: List[str] = []
self.verbose = False
self.node: Sofa.Core.Node | None = None
self.dt: float | None = None
self.sim: Simulator | None = None
self.step_index: int = 0
self.project_yaml: str | None = None
self._init_failed = False
# --------------------------------------------------------------------------
# Public methods
# --------------------------------------------------------------------------
[docs]
def prepare_scene(self) -> None:
"""Optional hook executed before the pySimBlocks control loop starts.
Override this method to wait for a preparation condition (e.g. a
fixed number of warm-up steps or scene stabilization). Set
``self.IS_READY = True`` when the scene is ready. The default
implementation sets ``IS_READY`` immediately.
"""
self.IS_READY = True
[docs]
def get_outputs(self) -> None:
"""Read state from SOFA components and populate ``self.outputs``.
Must always succeed and return consistent shapes across calls.
Raises:
NotImplementedError: Always — must be implemented by subclasses.
"""
raise NotImplementedError("[pySimBlocks] ERROR: get_outputs() must be implemented by subclass.")
[docs]
def save(self) -> None:
"""Optional hook executed at each control step.
Override to save logs or export custom data. The default
implementation does nothing.
"""
[docs]
def get_block(self, block_name: str):
"""Return a block from the pySimBlocks model by name.
Args:
block_name: Name of the block to retrieve.
Returns:
The block instance with the specified name.
Raises:
RuntimeError: If the simulator is not initialized or if the
block is not found in the model.
"""
if self.sim is None:
raise RuntimeError("[pySimBlocks] ERROR: Simulator not initialized. Cannot get block.")
if block_name not in self.sim.model.blocks:
raise RuntimeError(f"[pySimBlocks] ERROR: Block '{block_name}' not found in the model.")
return self.sim.model.blocks[block_name]
[docs]
def onAnimateBeginEvent(self, event) -> None:
"""SOFA callback executed before each physical integration step.
When ``SOFA_MASTER=True``, runs the following sequence at each
pySimBlocks step:
1. Read SOFA outputs via :meth:`get_outputs`.
2. Push them into the exchange block.
3. Advance pySimBlocks one step.
4. Retrieve controller inputs from the exchange block.
5. Apply them to SOFA via :meth:`set_inputs`.
Args:
event: SOFA animation event (unused).
"""
if self.SOFA_MASTER:
if self._init_failed:
return
if self.sim is None:
self._init_sofa_context()
self._prepare_pysimblocks()
self._get_sofa_outputs()
self._set_sofa_plot()
self._set_sofa_slider()
if not self.IS_READY:
self.prepare_scene()
if self.IS_READY:
if self.counter % self.ratio == 0:
self._get_sofa_outputs()
self.sim.step()
self.sim._log(self.sim_cfg.logging)
self._set_sofa_inputs()
if self.verbose:
self._print_logs()
self.save()
self._update_sofa_slider()
self._update_sofa_plot()
self.sim_index += 1
self.counter = 0
self.counter += 1
self.step_index += 1
# --------------------------------------------------------------------------
# Private methods
# --------------------------------------------------------------------------
def _init_sofa_context(self):
"""Resolve the SOFA node and dt from the controller context."""
node = self.getContext()
if not isinstance(node, Sofa.Core.Node):
self._init_failed = True
raise RuntimeError("[pySimBlocks] ERROR: Controller context is not a SOFA node.")
self.node = node
try:
self.dt = float(self.node.getRootContext().dt.value)
except AttributeError:
self._init_failed = True
raise RuntimeError("[pySimBlocks] ERROR: Could not read SOFA time step (dt).")
def _build_model(self) -> None:
"""Load the pySimBlocks model from ``project_yaml``."""
project_path = self.project_yaml
if project_path is None:
raise RuntimeError("[pySimBlocks] ERROR: SOFA_MASTER=True requires project_yaml to be set.")
self.sim_cfg, model_dict, self.plot_cfg, _, params_dir = load_project_config(project_path)
model_dict = self._adapt_model_for_sofa(model_dict)
self.model = Model("sofa_model")
build_model_from_dict(self.model, model_dict, params_dir=params_dir)
def _prepare_pysimblocks(self) -> None:
"""Initialize the pySimBlocks simulator once SOFA is ready."""
try:
if self.SOFA_MASTER and self.project_yaml is None:
self._init_failed = True
raise RuntimeError("[pySimBlocks] ERROR: SOFA_MASTER=True requires project_yaml.")
self._build_model()
self._detect_sofa_exchange_block()
self._secure_keys()
self.sim = Simulator(self.model, self.sim_cfg, verbose=self.verbose)
self._get_sofa_outputs()
self.sim.initialize()
self.sim_index = 0
ratio = self.sim_cfg.dt / self.dt
if abs(ratio - round(ratio)) > 1e-12:
self._init_failed = True
raise ValueError(
"[pySimBlocks] ERROR: Sample time mismatch.\n"
f"pySimBlocks sample time={self.sim_cfg.dt} "
f"is not a multiple of Sofa sample time={self.dt}."
)
self.ratio = int(round(ratio))
self.counter = 0
except Exception as e:
self._init_failed = True
raise
def _secure_keys(self) -> None:
"""Validate that model port keys are a subset of SOFA controller keys."""
model_inputs_keys = set(self._sofa_block.inputs.keys())
sofa_inputs_keys = set(self.inputs.keys())
if not model_inputs_keys.issubset(sofa_inputs_keys):
self._init_failed = True
raise RuntimeError(
"[pySimBlocks] ERROR: model input_keys are missing from controller inputs.\n"
f"SOFA controller inputs: {sofa_inputs_keys}\n"
f"Model block inputs: {model_inputs_keys}\n"
f"Ensure that the controller in the SOFA block contains at least the same input keys as the SofaExchangeIO block."
)
model_outputs_keys = set(self._sofa_block.outputs.keys())
sofa_outputs_keys = set(self.outputs.keys())
if not model_outputs_keys.issubset(sofa_outputs_keys):
self._init_failed = True
raise RuntimeError(
"[pySimBlocks] ERROR: model output_keys are missing from controller outputs.\n"
f"SOFA controller outputs: {sofa_outputs_keys}\n"
f"Model block outputs: {model_outputs_keys}\n"
f"Ensure that the controller in the SOFA block contains at least the same output keys as the SofaExchangeIO block."
)
def _print_logs(self) -> None:
"""Print selected logged variables at the current control step."""
print(f"\nStep: {self.sim_index}")
for variable in self.sim_cfg.logging:
print(f"{variable}: {self.sim.logs[variable][-1]}")
def _detect_sofa_exchange_block(self) -> None:
"""Find the unique SofaExchangeIO block inside the model."""
from pySimBlocks.blocks.systems.sofa.sofa_exchange_i_o import SofaExchangeIO
candidates = [blk for blk in self.model.blocks.values() if isinstance(blk, SofaExchangeIO)]
if len(candidates) == 0:
self._init_failed = True
raise RuntimeError(
"[pySimBlocks] ERROR: No SofaExchangeIO block found in the model.\n"
"The controller must include exactly one SOFA exchange block."
)
if len(candidates) > 1:
self._init_failed = True
raise RuntimeError(
"[pySimBlocks] ERROR: Multiple SofaExchangeIO blocks found ({len(candidates)}).\n"
"Only one SOFA IO block is allowed."
)
self._sofa_block = candidates[0]
def _get_sofa_outputs(self) -> None:
"""Read SOFA outputs and push them into the exchange block."""
self.get_outputs()
for keys, val in self.outputs.items():
self._sofa_block.outputs[keys] = val
def _set_sofa_inputs(self) -> None:
"""Pull inputs from the exchange block and apply them to SOFA."""
for key, val in self._sofa_block.inputs.items():
self.inputs[key] = val
self.set_inputs()
def _set_sofa_plot(self) -> None:
"""Set up ImGui plotting nodes for the configured signals."""
if not self._imgui:
return
if self.sim is None:
raise RuntimeError("[pySimBlocks] ERROR: Simulator not initialized.")
self._plot_node = self.node.addChild("PLOT")
self._plot_data = {}
for plot in self.plot_cfg.plots:
for var in plot["signals"]:
block_name, _, key = var.split(".")
block = self.get_block(block_name)
self._plot_data[f"{block_name}.{key}"] = self._plot_node.addChild(f"{block_name}_{key}")
value = block.outputs[key].flatten()
for i in range(len(value)):
self._plot_data[f"{block_name}.{key}"].addData(name=f"value{i}", type="float", value=value[i])
MyGui.PlottingWindow.addData(f"{block_name}.{key}[{i}]", self._plot_data[f"{block_name}.{key}"].getData(f"value{i}"))
def _update_sofa_plot(self) -> None:
"""Update ImGui plot values for the configured signals."""
if not self._imgui:
return
for name, node in self._plot_data.items():
block_name, key = name.split(".")
block = self.get_block(block_name)
value = block.outputs[key].flatten()
for i in range(len(value)):
node.getData(f"value{i}").value = float(value[i])
def _set_sofa_slider(self) -> None:
"""Set up ImGui slider nodes for the configured block attributes."""
if not self._imgui:
return
if self.sim is None:
raise RuntimeError("[pySimBlocks] ERROR: Simulator not initialized.")
data = self._sofa_block.slider_params
data = data if data is not None else {}
self._slider_node = self.node.addChild("SLIDERS")
self._slider_data = {}
for var, extremum in data.items():
block_name, key = var.split(".")
node = self._slider_node.addChild(f"{block_name}_{key}")
block = self.get_block(block_name)
value = getattr(block, key)
self._slider_data[f"{block_name}.{key}"] = {"node": node, "shape": value.shape}
value = value.flatten()
for i in range(len(value)):
d = node.addData(name=f"value{i}", type="float", value=value[i])
MyGui.MyRobotWindow.addSettingInGroup(f"{key}[{i}]", d, extremum[0], extremum[1], f"{block_name}")
def _update_sofa_slider(self) -> None:
"""Read ImGui slider values and apply them to the corresponding block attributes."""
if not self._imgui:
return
for var in self._slider_data:
block_name, key = var.split(".")
block = self.get_block(block_name)
node = self._slider_data[var]["node"]
shape = self._slider_data[var]["shape"]
new_values = []
for i in range(np.prod(shape)):
new_values.append(node.getData(f"value{i}").value)
setattr(block, key, np.array(new_values).reshape(shape))
def _adapt_model_for_sofa(self, model_data: Dict[str, Any]) -> Dict[str, Any]:
"""Replace any SofaPlant block with a SofaExchangeIO block.
Preserves block names and connections so the model topology is
unchanged. Used when SOFA itself runs the simulation and the plant
block is not needed.
Args:
model_data: Model dictionary loaded from the YAML project file.
Returns:
Adapted model dictionary with SofaPlant replaced by SofaExchangeIO.
"""
adapted = dict(model_data)
adapted_blocks = []
for block in model_data.get("blocks", []):
if not isinstance(block, dict):
adapted_blocks.append(block)
continue
block_type = str(block.get("type", "")).lower()
if block_type == "sofa_plant":
patched = dict(block).copy()
patched["type"] = "sofa_exchange_i_o"
patched.pop("scene_file", None)
adapted_blocks.append(patched)
else:
adapted_blocks.append(block)
adapted["blocks"] = adapted_blocks
return adapted