Source code for pySimBlocks.gui.addons.sofa.sofa_service

# ******************************************************************************
#                                  pySimBlocks
#                     Copyright (c) 2026 Université de Lille & INRIA
# ******************************************************************************
#  This program is free software: you can redistribute it and/or modify it
#  under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or (at your
#  option) any later version.
#
#  This program is distributed in the hope that it will be useful, but WITHOUT
#  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
#  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License
#  for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with this program.  If not, see <https://www.gnu.org/licenses/>.
# ******************************************************************************
#  Authors: see Authors.txt
# ******************************************************************************

import os
import shutil
from pathlib import Path

from PySide6.QtCore import QProcess, QProcessEnvironment

from pySimBlocks.gui.models.project_state import ProjectState
from pySimBlocks.gui.project_controller import ProjectController
from pySimBlocks.gui.services.yaml_tools import (
    cleanup_runtime_project_yaml,
    runtime_project_yaml_path,
    save_yaml,
)
from pySimBlocks.project.generate_sofa_controller import generate_sofa_controller


[docs] class SofaService: """Manage SOFA-specific validation, export, and execution workflows. Attributes: project_state: Project state used to resolve blocks and files. project_controller: Controller used to access current view state. sofa_path: Path to the ``runSofa`` executable. gui: Selected SOFA GUI backend. scene_file: Resolved SOFA scene file path. """ def __init__(self, project_state: ProjectState, project_controller: ProjectController): """Initialize the SOFA service. Args: project_state: Project state used to resolve blocks and files. project_controller: Controller used to access current view state. Raises: None. """ self.project_state = project_state self.project_controller = project_controller self.sofa_path = "" self.gui = "imgui" self.scene_file = "" self._detect_sofa() # -------------------------------------------------------------------------- # Public Methods # --------------------------------------------------------------------------
[docs] def get_scene_file(self): """Resolve and cache the scene file used by the SOFA block. Returns: Tuple containing success flag, title, and details message. """ flag, msg, details = self.can_use_sofa() if flag: sofa_block = [b for b in self.project_state.blocks if b.meta.type in ["sofa_plant", "sofa_exchange_i_o"]] scene_param = sofa_block[0].parameters.get("scene_file") if not scene_param: return False, "No scene file", "scene_file parameter is missing." try: scene_path = self._resolve_scene_file(scene_param) except Exception as e: return False, "Invalid scene file", str(e) if not scene_path.exists(): return False, "Incorrect Scene File", "The scene file does not exist." self.scene_file = str(scene_path) return True, "Scene File set", "" else: return flag, msg, details
[docs] def can_use_sofa(self): """Check whether the current project can be driven by SOFA. Returns: Tuple containing success flag, title, and details message. """ sofa_block = [b for b in self.project_state.blocks if b.meta.type in ["sofa_plant", "sofa_exchange_i_o"]] if len(sofa_block) == 0: return False, "No SOFA block", "Please Add at least one sofa system." elif len(sofa_block) > 1: return False, "Multiple SOFA blocks", "Only one sofa system can be set to run sofa." else: return True, "Sofa can be master", "Only one system found. Diagram can be used from controller."
[docs] def export_controller(self, window, saver): """Export the generated SOFA controller for the current project. Args: window: Main window used for save confirmation. saver: Project saver used to persist the project before export. Raises: ValueError: If the project directory is not defined. """ if window.confirm_discard_or_save("exporting sofa"): saver.save(self.project_controller.project_state, self.project_controller.view.block_items) if self.project_state.directory_path is None: raise ValueError("Project directory is not set.\nPlease define it in settings.") generate_sofa_controller(self.project_state.directory_path)
[docs] def run(self): """Run the configured SOFA scene and collect its execution output. Returns: Tuple containing success flag, title, and details message. """ env_ok, msg = self._check_sofa_environnment() if not env_ok: return False, "Environment error", msg if not self.sofa_path or not os.path.exists(self.sofa_path): return False, "runSofa not found", "" if not self.scene_file or not os.path.exists(self.scene_file): return False, "scene file not found", "" project_dir = self.project_state.directory_path if project_dir is None: return {}, False, "Project directory is not set.\nPlease define it in settings." runtime_yaml = runtime_project_yaml_path(project_dir) cleanup_runtime_project_yaml(project_dir) save_yaml(project_state=self.project_state, runtime=True) try: generate_sofa_controller(project_yaml=runtime_yaml) except Exception as e: cleanup_runtime_project_yaml(project_dir) return False, "Could not update SOFA controller", str(e) # set command plugins = "SofaPython3" if self.gui == "imgui": plugins += ",SofaImgui" args = ["-l", plugins, "-g", self.gui, self.scene_file] self._full_log = "" self.process = QProcess() env = QProcessEnvironment.systemEnvironment() self.process.setProcessEnvironment(env) self.process.setWorkingDirectory(str(Path(self.scene_file).parent)) self.process.setProgram(self.sofa_path) self.process.setArguments(args) self.process.setProcessChannelMode(QProcess.MergedChannels) self.process.readyReadStandardOutput.connect( lambda: self._accumulate_output() ) try: self.process.start() if not self.process.waitForStarted(): return False, "Launch failed", "runSofa could not start" self.process.waitForFinished(-1) try: generate_sofa_controller(project_dir) except Exception as e: return False, "Could not regenerate controller", "project.yaml does not exist.\n" + str(e) # get output results full_log = self._full_log exit_code = self.process.exitCode() if exit_code != 0: return False, "SOFA exited with error", f"exit code = {exit_code}\n\n{full_log}" pysimblocks_errors = [ line for line in full_log.splitlines() if "[pySimBlocks] ERROR" in line ] if pysimblocks_errors: return False, "pySimBlocks configuration error", "\n".join(pysimblocks_errors) return True, "SOFA finished", "Process terminated correctly" finally: cleanup_runtime_project_yaml(project_dir)
# -------------------------------------------------------------------------- # Private Methods # -------------------------------------------------------------------------- def _accumulate_output(self): """Append the latest process output chunk to the accumulated log.""" chunk = self.process.readAllStandardOutput().data().decode() print(chunk, end="") self._full_log += chunk def _check_sofa_environnment(self): """Validate the environment variables required to run SOFA.""" sofa_root = os.environ.get("SOFA_ROOT") if not sofa_root: return False, "SOFA_ROOT is not set." return True, "OK" def _detect_sofa(self): """Detect the ``runSofa`` executable from environment or PATH.""" detected = None sofa_root = os.environ.get("SOFA_ROOT") if sofa_root: bin_dir = Path(sofa_root) / "bin" for candidate in ("runSofa", "runSofa.exe"): potential_path = bin_dir / candidate if potential_path.exists(): detected = str(potential_path) break if not detected: detected = shutil.which("runSofa") if not detected: detected = shutil.which("runsofa") if detected: self.sofa_path = detected def _resolve_scene_file(self, scene_file: str) -> Path: """Resolve a scene file path relative to the project directory.""" project_dir = self.project_state.directory_path if project_dir is None: raise RuntimeError("Project directory is not set") path = Path(scene_file).expanduser() if not path.is_absolute(): path = (project_dir / path).resolve() return path