Source code for pySimBlocks.blocks.systems.sofa.sofa_plant

# ******************************************************************************
#                                  pySimBlocks
#                     Copyright (c) 2026 Université de Lille & INRIA
# ******************************************************************************
#  This program is free software: you can redistribute it and/or modify it
#  under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or (at your
#  option) any later version.
#
#  This program is distributed in the hope that it will be useful, but WITHOUT
#  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
#  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License
#  for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with this program.  If not, see <https://www.gnu.org/licenses/>.
# ******************************************************************************
#  Authors: see Authors.txt
# ******************************************************************************

from __future__ import annotations

from multiprocessing import Pipe, Process
from pathlib import Path
from typing import Any, Dict, List

import numpy as np

from pySimBlocks.core.block import Block


[docs] def sofa_worker(conn, scene_file, input_keys, output_keys, sample_time, block_name): """Worker function executed in a subprocess to run the SOFA simulation.""" import os import sys import Sofa import importlib.util scene_dir = os.path.dirname(os.path.abspath(scene_file)) if scene_dir not in sys.path: sys.path.insert(0, scene_dir) spec = importlib.util.spec_from_file_location("scene", scene_file) mod = importlib.util.module_from_spec(spec) spec.loader.exec_module(mod) root = Sofa.Core.Node("root") root, controller = mod.createScene(root) sofa_outputs_keys = set(controller.outputs.keys()) if not set(output_keys).issubset(sofa_outputs_keys): conn.send({ "cmd": "error", "message": ( f"\n[pySimBlocks] ERROR: Output key not found in controller outputs.\n" f"Available keys: {sofa_outputs_keys}\n" f"Provided keys: {set(output_keys)}\n" f"Check the 'output_keys' parameter in your project.yaml." ) }) conn.close() return sofa_inputs_keys = set(controller.inputs.keys()) if not set(input_keys).issubset(sofa_inputs_keys): conn.send({ "cmd": "error", "message": ( f"[pySimBlocks] ERROR: Input key not found in controller inputs.\n" f"Available keys: {sofa_inputs_keys}\n" f"Provided keys: {set(input_keys)}\n" f"Check the 'input_keys' parameter in your project.yaml." ) }) conn.close() return dt = float(root.dt.value) ratio = sample_time / dt if abs(ratio - round(ratio)) > 1e-9 or round(ratio) < 1: conn.send({ "cmd": "error", "message": ( f"[pySimBlocks] ERROR [{block_name}]: SofaPlant sample_time={sample_time}s " f"is not a positive integer multiple of SOFA scene dt={dt}s " f"(ratio={ratio:.6g})." ) }) conn.close() return ratio = int(round(ratio)) controller.SOFA_MASTER = False Sofa.Simulation.initRoot(root) while not controller.IS_READY: controller.prepare_scene() if controller.IS_READY: break Sofa.Simulation.animate(root, dt) try: controller.get_outputs() initial = {k: np.asarray(controller.outputs[k]).reshape(-1, 1) for k in output_keys} conn.send(initial) except Exception as e: conn.send({ "cmd": "error", "message": f"[pySimBlocks] ERROR: Failed to get initial outputs.\n{e}" }) conn.close() return while True: msg = conn.recv() if msg["cmd"] == "step": try: for key, val in msg["inputs"].items(): controller.inputs[key] = val controller.set_inputs() for _ in range(ratio): Sofa.Simulation.animate(root, dt) controller.get_outputs() outputs = {k: np.asarray(controller.outputs[k]).reshape(-1, 1) for k in output_keys} conn.send(outputs) except Exception as e: conn.send({ "cmd": "error", "message": f"[pySimBlocks] ERROR during step execution.\n{e}" }) break elif msg["cmd"] == "stop": break conn.close()
[docs] class SofaPlant(Block): """SOFA-based dynamic plant block. Executes a SOFA simulation as a dynamic system driven by pySimBlocks. SOFA runs in a separate subprocess. At each control step, inputs are sent to the worker process, the SOFA scene advances by one step, and updated outputs are returned. Attributes: scene_file: Resolved path to the SOFA scene file. input_keys: Names of input ports sent to SOFA at each step. output_keys: Names of output ports received from SOFA at each step. slider_params: Optional ImGui slider configuration, mapping ``"BlockName.attr"`` to ``[min, max]`` bounds. """ direct_feedthrough = False need_first = True def __init__( self, name: str, scene_file: str, input_keys: list[str], output_keys: list[str], slider_params: Dict[str, List[float]] | None = None, sample_time: float | None = None, ): """Initialize a SofaPlant block. Args: name: Unique identifier for this block instance. scene_file: Path to the SOFA scene file. Relative paths are resolved against the project file directory via ``adapt_params``. input_keys: Names of input ports to send to SOFA. output_keys: Names of output ports to receive from SOFA. slider_params: Optional ImGui slider configuration. None to disable sliders. sample_time: Sampling period in seconds, or None to use the global simulation dt. """ super().__init__(name, sample_time) self.scene_file = scene_file self.input_keys = input_keys self.output_keys = output_keys self.slider_params = slider_params for k in input_keys: self.inputs[k] = None self.next_outputs = {} for k in output_keys: self.outputs[k] = None self.state[k] = None self.next_state[k] = None self.process = None self.conn = None # -------------------------------------------------------------------------- # Class methods # --------------------------------------------------------------------------
[docs] @classmethod def adapt_params( cls, params: Dict[str, Any], params_dir: Path | None = None, ) -> Dict[str, Any]: """Resolve a relative ``scene_file`` path against the project directory. Args: params: Raw parameter dict loaded from the YAML project file. params_dir: Directory of the project file, for resolving relative paths. Must not be None. Returns: Parameter dict with ``scene_file`` resolved to an absolute path. Raises: ValueError: If ``params_dir`` is None or ``scene_file`` is missing from ``params``. """ if params_dir is None: raise ValueError("params_dir must be provided for SofaPlant adaptation") scene_file = params.get("scene_file") if scene_file is None: raise ValueError("Missing 'scene_file' parameter") path = Path(scene_file).expanduser() if not path.is_absolute(): path = (params_dir / path).resolve() adapted = dict(params) adapted["scene_file"] = str(path) return adapted
# -------------------------------------------------------------------------- # Public methods # --------------------------------------------------------------------------
[docs] def initialize(self, t0: float) -> None: """Start the SOFA worker process and receive initial outputs. Args: t0: Initial simulation time in seconds. Raises: RuntimeError: If the SOFA worker reports an error during startup. """ parent_conn, child_conn = Pipe() self.conn = parent_conn self.process = Process( target=sofa_worker, args=(child_conn, self.scene_file, self.input_keys, self.output_keys, self._effective_sample_time, self.name) ) self.process.start() initial_outputs = self._recv_or_raise() for k in self.output_keys: self.outputs[k] = initial_outputs[k] self.state[k] = initial_outputs[k] self.next_state[k] = initial_outputs[k]
[docs] def output_update(self, t: float, dt: float) -> None: """Forward the committed state outputs to the output ports. Args: t: Current simulation time in seconds. dt: Current time step in seconds. """ for key in self.output_keys: self.outputs[key] = self.state[key]
[docs] def state_update(self, t: float, dt: float) -> None: """Send inputs to SOFA, advance one step, and store the new outputs. Args: t: Current simulation time in seconds. dt: Current time step in seconds. Raises: RuntimeError: If any input is missing or the SOFA worker reports an error. """ msg = {"cmd": "step", "inputs": {}} for k in self.input_keys: val = self.inputs[k] if val is None: raise RuntimeError( f"[{self.name}] Input '{k}' is missing at time {t}." ) msg["inputs"][k] = val self.conn.send(msg) outputs = self._recv_or_raise(timeout=5.) for k in self.output_keys: self.next_state[k] = outputs[k]
[docs] def finalize(self) -> None: """Shut down the SOFA worker process cleanly.""" if self.conn: try: self.conn.send({"cmd": "stop"}) except Exception: pass try: self.conn.close() except Exception: pass if self.process: self.process.join(timeout=1.0) if self.process.is_alive(): self.process.kill()
# -------------------------------------------------------------------------- # Private methods # -------------------------------------------------------------------------- def _recv_or_raise(self, timeout: float = 30.0) -> Any: """Receive a message from the SOFA worker with timeout and crash detection. Args: timeout: Maximum seconds to wait for a response. Returns: The message received from the worker. Raises: RuntimeError: If the worker times out, has died, or reports an error. """ if not self.conn.poll(timeout): if not self.process.is_alive(): raise RuntimeError( f"[{self.name}] SOFA worker process died unexpectedly." ) raise RuntimeError( f"[{self.name}] SOFA worker timed out after {timeout}s." ) result = self.conn.recv() if isinstance(result, dict) and result.get("cmd") == "error": raise RuntimeError(result["message"]) return result def __del__(self) -> None: """Attempt to stop the worker process on garbage collection.""" if self.conn: try: self.conn.send({"cmd": "stop"}) except Exception: pass if self.process: self.process.join(timeout=0.5)