Source code for pySimBlocks.project.generate_sofa_controller

# ******************************************************************************
#                                  pySimBlocks
#                     Copyright (c) 2026 Université de Lille & INRIA
# ******************************************************************************
#  This program is free software: you can redistribute it and/or modify it
#  under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or (at your
#  option) any later version.
#
#  This program is distributed in the hope that it will be useful, but WITHOUT
#  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
#  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License
#  for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with this program.  If not, see <https://www.gnu.org/licenses/>.
# ******************************************************************************
#  Authors: see Authors.txt
# ******************************************************************************

from __future__ import annotations

import importlib.util
import inspect
import os
import re
import sys
from multiprocessing import Pipe, Process
from pathlib import Path

import yaml


def _load_scene_in_subprocess(scene_path, conn) -> None:
    """Load a SOFA scene in a subprocess and send back the controller source file path."""
    try:
        scene_path = Path(scene_path).resolve()
        scene_dir = scene_path.parent
        if str(scene_dir) not in sys.path:
            sys.path.insert(0, str(scene_dir))

        spec = importlib.util.spec_from_file_location("scene", scene_path)
        mod = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(mod)

        import Sofa
        root = Sofa.Core.Node("root")

        out = mod.createScene(root)
        if not isinstance(out, (list, tuple)) or len(out) < 2:
            conn.send(None)
            return

        controller = out[1]
        controller_file = inspect.getsourcefile(controller.__class__)
        conn.send(controller_file)

    except Exception as e:
        print(f"Error {e}")
        conn.send(None)

    finally:
        conn.close()


[docs] def detect_controller_file_from_scene(scene_file: Path) -> Path: """Determine the controller source file by loading the SOFA scene in a subprocess. Args: scene_file: Path to the SOFA Python scene file. The scene's ``createScene`` function must return ``(root, controller)``. Returns: Path to the Python source file that defines the controller class. Raises: RuntimeError: If the controller file cannot be determined (e.g. the scene does not return a controller). """ parent_conn, child_conn = Pipe() p = Process(target=_load_scene_in_subprocess, args=(scene_file, child_conn)) p.start() try: controller_path = parent_conn.recv() except EOFError: controller_path = None p.join() if controller_path is None: raise RuntimeError( f"Unable to determine controller file from scene {scene_file}. " "Ensure createScene(root) returns (root, controller)." ) return Path(controller_path)
[docs] def inject_base_dir(src: str) -> str: """Inject a ``BASE_DIR`` declaration after the last import statement if not present. Args: src: Source code string of the controller file. Returns: Source code with ``BASE_DIR = Path(__file__).resolve().parent`` injected. """ if "BASE_DIR = Path(__file__).resolve().parent" in src: return src injection = ( "from pathlib import Path\n\n" "BASE_DIR = Path(__file__).resolve().parent\n\n" ) import_block = list(re.finditer(r"^(import|from)\s+.+$", src, re.MULTILINE)) if import_block: last = import_block[-1] insert_at = last.end() return src[:insert_at] + "\n\n" + injection + src[insert_at:] return injection + src
[docs] def inject_project_path_into_controller( controller_file: Path, project_yaml: Path, ) -> None: """Inject or update the ``self.project_yaml`` assignment in the controller ``__init__``. Args: controller_file: Path to the controller Python source file. project_yaml: Path to the ``project.yaml`` file to reference from the controller. """ src = controller_file.read_text() src = inject_base_dir(src) controller_dir = controller_file.parent project_yaml = project_yaml.resolve() try: rel_project = Path(os.path.relpath(project_yaml, controller_dir)) project_expr = f"(BASE_DIR / {rel_project.as_posix()!r}).resolve()" except ValueError: project_expr = f"Path({project_yaml.as_posix()!r}).resolve()" expr = f"self.project_yaml = str({project_expr})" pattern = r"self\.project_yaml\s*=.*" if re.search(pattern, src): src = re.sub(pattern, expr, src) else: src = src.replace( "super().__init__(name=name)", f"super().__init__(name=name)\n {expr}", ) controller_file.write_text(src)
def _load_project_yaml(project_yaml: Path) -> dict: """Load and return a project YAML file as a dict.""" if not project_yaml.exists(): raise FileNotFoundError(f"project.yaml not found: {project_yaml}") raw = yaml.safe_load(project_yaml.read_text()) or {} if not isinstance(raw, dict): raise ValueError("project.yaml must define a YAML mapping") return raw def _find_sofa_block(raw_project: dict) -> dict: """Return the first SofaPlant or SofaExchangeIO block dict from the project diagram.""" diagram = raw_project.get("diagram", {}) if not isinstance(diagram, dict): raise ValueError("'diagram' section must be a mapping") blocks = diagram.get("blocks", []) if not isinstance(blocks, list): raise ValueError("'diagram.blocks' section must be a list") sofa_block = next( ( b for b in blocks if isinstance(b, dict) and str(b.get("type", "")).lower() in ("sofa_plant", "sofa_exchange_i_o") ), None, ) if sofa_block is None: raise RuntimeError( "No SofaPlant or SofaExchangeIO block found in project.yaml" ) return sofa_block def _resolve_scene_file(project_yaml: Path, sofa_block: dict) -> Path: """Resolve the absolute scene file path from a SOFA block's parameters.""" params = sofa_block.get("parameters", {}) if not isinstance(params, dict): raise ValueError( f"'diagram.blocks[{sofa_block.get('name', '?')}].parameters' must be a mapping" ) scene_file = params.get("scene_file", None) if not isinstance(scene_file, str) or not scene_file: raise KeyError( f"'scene_file' must be defined in parameters for block '{sofa_block.get('name', '?')}'" ) path = Path(scene_file).expanduser() if not path.is_absolute(): path = (project_yaml.parent / path).resolve() return path
[docs] def generate_sofa_controller( project_dir: Path | None = None, project_yaml: Path | None = None, ) -> None: """Update the SOFA controller file with the project YAML path. Finds the SOFA scene file from the project, detects the controller class, and injects or replaces the ``self.project_yaml`` assignment so the controller can locate the project at runtime. Exactly one of ``project_dir`` or ``project_yaml`` must be provided. Args: project_dir: Path to a project folder containing ``project.yaml``. project_yaml: Explicit path to a ``project.yaml`` file. Raises: ValueError: If both or neither of ``project_dir`` / ``project_yaml`` are given. FileNotFoundError: If ``project.yaml`` or the scene file is not found. RuntimeError: If no SOFA block is found in the project or the controller file cannot be detected. """ has_project_path = project_yaml is not None if project_dir and has_project_path: raise ValueError("Cannot use project_dir together with project_yaml.") if not project_dir and not has_project_path: raise ValueError("You must specify either project_dir or project_yaml.") if project_dir: project_yaml = Path(project_dir).resolve() / "project.yaml" else: project_yaml = Path(project_yaml).resolve() raw_project = _load_project_yaml(project_yaml) sofa_block = _find_sofa_block(raw_project) scene_file = _resolve_scene_file(project_yaml, sofa_block) controller_file = detect_controller_file_from_scene(scene_file) inject_project_path_into_controller(controller_file, project_yaml) print(f"[pySimBlocks] SOFA controller updated: {controller_file}")