# ******************************************************************************
# pySimBlocks
# Copyright (c) 2026 Université de Lille & INRIA
# ******************************************************************************
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
# for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# ******************************************************************************
# Authors: see Authors.txt
# ******************************************************************************
import os
import shutil
from pathlib import Path
from PySide6.QtCore import QProcess, QProcessEnvironment
from pySimBlocks.gui.models.project_state import ProjectState
from pySimBlocks.gui.project_controller import ProjectController
from pySimBlocks.gui.services.yaml_tools import (
cleanup_runtime_project_yaml,
runtime_project_yaml_path,
save_yaml,
)
from pySimBlocks.project.generate_sofa_controller import generate_sofa_controller
[docs]
class SofaService:
"""Manage SOFA-specific validation, export, and execution workflows.
Attributes:
project_state: Project state used to resolve blocks and files.
project_controller: Controller used to access current view state.
sofa_path: Path to the ``runSofa`` executable.
gui: Selected SOFA GUI backend.
scene_file: Resolved SOFA scene file path.
"""
def __init__(self, project_state: ProjectState, project_controller: ProjectController):
"""Initialize the SOFA service.
Args:
project_state: Project state used to resolve blocks and files.
project_controller: Controller used to access current view state.
Raises:
None.
"""
self.project_state = project_state
self.project_controller = project_controller
self.sofa_path = ""
self.gui = "imgui"
self.scene_file = ""
self._detect_sofa()
# --------------------------------------------------------------------------
# Public Methods
# --------------------------------------------------------------------------
[docs]
def get_scene_file(self):
"""Resolve and cache the scene file used by the SOFA block.
Returns:
Tuple containing success flag, title, and details message.
"""
flag, msg, details = self.can_use_sofa()
if flag:
sofa_block = [b for b in self.project_state.blocks if b.meta.type in ["sofa_plant", "sofa_exchange_i_o"]]
scene_param = sofa_block[0].parameters.get("scene_file")
if not scene_param:
return False, "No scene file", "scene_file parameter is missing."
try:
scene_path = self._resolve_scene_file(scene_param)
except Exception as e:
return False, "Invalid scene file", str(e)
if not scene_path.exists():
return False, "Incorrect Scene File", "The scene file does not exist."
self.scene_file = str(scene_path)
return True, "Scene File set", ""
else:
return flag, msg, details
[docs]
def can_use_sofa(self):
"""Check whether the current project can be driven by SOFA.
Returns:
Tuple containing success flag, title, and details message.
"""
sofa_block = [b for b in self.project_state.blocks if b.meta.type in ["sofa_plant", "sofa_exchange_i_o"]]
if len(sofa_block) == 0:
return False, "No SOFA block", "Please Add at least one sofa system."
elif len(sofa_block) > 1:
return False, "Multiple SOFA blocks", "Only one sofa system can be set to run sofa."
else:
return True, "Sofa can be master", "Only one system found. Diagram can be used from controller."
[docs]
def export_controller(self, window, saver):
"""Export the generated SOFA controller for the current project.
Args:
window: Main window used for save confirmation.
saver: Project saver used to persist the project before export.
Raises:
ValueError: If the project directory is not defined.
"""
if window.confirm_discard_or_save("exporting sofa"):
saver.save(self.project_controller.project_state, self.project_controller.view.block_items)
if self.project_state.directory_path is None:
raise ValueError("Project directory is not set.\nPlease define it in settings.")
generate_sofa_controller(self.project_state.directory_path)
[docs]
def run(self):
"""Run the configured SOFA scene and collect its execution output.
Returns:
Tuple containing success flag, title, and details message.
"""
env_ok, msg = self._check_sofa_environnment()
if not env_ok:
return False, "Environment error", msg
if not self.sofa_path or not os.path.exists(self.sofa_path):
return False, "runSofa not found", ""
if not self.scene_file or not os.path.exists(self.scene_file):
return False, "scene file not found", ""
project_dir = self.project_state.directory_path
if project_dir is None:
return {}, False, "Project directory is not set.\nPlease define it in settings."
runtime_yaml = runtime_project_yaml_path(project_dir)
cleanup_runtime_project_yaml(project_dir)
save_yaml(project_state=self.project_state, runtime=True)
try:
generate_sofa_controller(project_yaml=runtime_yaml)
except Exception as e:
cleanup_runtime_project_yaml(project_dir)
return False, "Could not update SOFA controller", str(e)
# set command
plugins = "SofaPython3"
if self.gui == "imgui":
plugins += ",SofaImgui"
args = ["-l", plugins, "-g", self.gui, self.scene_file]
self._full_log = ""
self.process = QProcess()
env = QProcessEnvironment.systemEnvironment()
self.process.setProcessEnvironment(env)
self.process.setWorkingDirectory(str(Path(self.scene_file).parent))
self.process.setProgram(self.sofa_path)
self.process.setArguments(args)
self.process.setProcessChannelMode(QProcess.MergedChannels)
self.process.readyReadStandardOutput.connect(
lambda: self._accumulate_output()
)
try:
self.process.start()
if not self.process.waitForStarted():
return False, "Launch failed", "runSofa could not start"
self.process.waitForFinished(-1)
try:
generate_sofa_controller(project_dir)
except Exception as e:
return False, "Could not regenerate controller", "project.yaml does not exist.\n" + str(e)
# get output results
full_log = self._full_log
exit_code = self.process.exitCode()
if exit_code != 0:
return False, "SOFA exited with error", f"exit code = {exit_code}\n\n{full_log}"
pysimblocks_errors = [
line for line in full_log.splitlines()
if "[pySimBlocks] ERROR" in line
]
if pysimblocks_errors:
return False, "pySimBlocks configuration error", "\n".join(pysimblocks_errors)
return True, "SOFA finished", "Process terminated correctly"
finally:
cleanup_runtime_project_yaml(project_dir)
# --------------------------------------------------------------------------
# Private Methods
# --------------------------------------------------------------------------
def _accumulate_output(self):
"""Append the latest process output chunk to the accumulated log."""
chunk = self.process.readAllStandardOutput().data().decode()
print(chunk, end="")
self._full_log += chunk
def _check_sofa_environnment(self):
"""Validate the environment variables required to run SOFA."""
sofa_root = os.environ.get("SOFA_ROOT")
if not sofa_root:
return False, "SOFA_ROOT is not set."
return True, "OK"
def _detect_sofa(self):
"""Detect the ``runSofa`` executable from environment or PATH."""
detected = None
sofa_root = os.environ.get("SOFA_ROOT")
if sofa_root:
bin_dir = Path(sofa_root) / "bin"
for candidate in ("runSofa", "runSofa.exe"):
potential_path = bin_dir / candidate
if potential_path.exists():
detected = str(potential_path)
break
if not detected:
detected = shutil.which("runSofa")
if not detected:
detected = shutil.which("runsofa")
if detected:
self.sofa_path = detected
def _resolve_scene_file(self, scene_file: str) -> Path:
"""Resolve a scene file path relative to the project directory."""
project_dir = self.project_state.directory_path
if project_dir is None:
raise RuntimeError("Project directory is not set")
path = Path(scene_file).expanduser()
if not path.is_absolute():
path = (project_dir / path).resolve()
return path