# Source code for pySimBlocks.core.model

# ******************************************************************************
#                                  pySimBlocks
#                     Copyright (c) 2026 Université de Lille & INRIA
# ******************************************************************************
#  This program is free software: you can redistribute it and/or modify it
#  under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or (at your
#  option) any later version.
#
#  This program is distributed in the hope that it will be useful, but WITHOUT
#  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
#  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License
#  for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with this program.  If not, see <https://www.gnu.org/licenses/>.
# ******************************************************************************
#  Authors: see Authors.txt
# ******************************************************************************

from collections import deque
from pathlib import Path
from typing import Any, Dict, Iterator, List, Tuple

from pySimBlocks.core.block import Block

# Type alias for one signal connection between two block ports:
#    ( (src_block, src_port), (dst_block, dst_port) )
# Every element is a string: blocks are referred to by name, ports by port name.
Connection = Tuple[Tuple[str, str], Tuple[str, str]]


class Model:
    """Discrete-time block-diagram model (Simulink-like).

    Stores blocks and signal connections, builds the topological execution
    order, and provides fast access to downstream connections.

    Topological sorting is applied only to the combinational (direct-
    feedthrough) graph. Stateful blocks act as cycle breakers, exactly as
    Simulink handles algebraic loops (see Simulink PDF p.7).

    Attributes:
        name: Identifier for this model.
        verbose: If True, print detailed execution-order build logs.
        blocks: Registry of blocks keyed by name.
        connections: List of signal connections.
    """

    def __init__(
        self,
        name: str = "model",
        model_data: Dict[str, Any] | None = None,
        params_dir: Path | None = None,
        verbose: bool = False,
    ):
        """Initialize a model.

        Args:
            name: Identifier for this model.
            model_data: Optional dict loaded from a YAML project file. If
                provided, blocks and connections are built immediately.
            params_dir: Directory of the project file, for resolving relative
                paths. None if not applicable.
            verbose: If True, print execution-order build logs.
        """
        self.name = name
        self.verbose = verbose

        self.blocks: Dict[str, Block] = {}
        self.connections: List[Connection] = []

        # Cached result of build_execution_order().
        self._output_execution_order: List[Block] = []
        # Initialized here but not populated by any method of this class.
        self._state_execution_order: List[Block] = []

        # Lazily rebuilt index: source block name -> outgoing connections.
        self._downstream_map: Dict[str, List[Connection]] = {}
        self._connections_dirty: bool = True

        # True whenever blocks or connections changed since the last
        # successful build_execution_order(); forces execution_order() to
        # rebuild instead of handing out a stale cached order.
        self._execution_order_dirty: bool = True

        if model_data is not None:
            # Imported locally, only when a project dict is actually supplied.
            from pySimBlocks.project.build_model import build_model_from_dict

            build_model_from_dict(self, model_data, params_dir=params_dir)

    # --------------------------------------------------------------------------
    # Public methods
    # --------------------------------------------------------------------------
    def add_block(self, block: Block) -> Block:
        """Add a block to the model.

        Args:
            block: Block instance to register.

        Returns:
            The registered block.

        Raises:
            ValueError: If a block with the same name already exists.
        """
        if block.name in self.blocks:
            raise ValueError(f"Block name '{block.name}' already exists.")

        self.blocks[block.name] = block
        # A new block must appear in the execution order.
        self._execution_order_dirty = True
        return block

    def get_block_by_name(self, name: str) -> Block:
        """Return a block by its name.

        Args:
            name: Name of the block to retrieve.

        Returns:
            The matching Block instance.

        Raises:
            ValueError: If no block with that name exists.
        """
        if name not in self.blocks:
            raise ValueError(
                f"Block name '{name}' not found. Known blocks: {list(self.blocks.keys())}"
            )
        return self.blocks[name]

    def connect(self, src_block: str, src_port: str, dst_block: str, dst_port: str) -> None:
        """Connect an output port to an input port.

        Registers a connection from ``blocks[src_block].outputs[src_port]``
        to ``blocks[dst_block].inputs[dst_port]``.

        Args:
            src_block: Name of the source block.
            src_port: Name of the source output port.
            dst_block: Name of the destination block.
            dst_port: Name of the destination input port.

        Raises:
            ValueError: If src_block or dst_block is not registered.
        """
        if src_block not in self.blocks:
            raise ValueError(
                f"Unknown source block '{src_block}'. "
                f"Known blocks: {list(self.blocks.keys())}"
            )
        if dst_block not in self.blocks:
            raise ValueError(
                f"Unknown destination block '{dst_block}'. "
                f"Known blocks: {list(self.blocks.keys())}"
            )

        self.connections.append(
            ((src_block, src_port), (dst_block, dst_port))
        )
        # Both derived structures depend on the connection list.
        self._connections_dirty = True
        self._execution_order_dirty = True

    def build_execution_order(self) -> List[Block]:
        """Build the Simulink-like output execution order.

        Runs a Kahn topological sort on the direct-feedthrough dependency
        graph. Blocks without direct feedthrough act as cycle breakers.

        Returns:
            Ordered list of blocks for output_update execution.

        Raises:
            RuntimeError: If a direct-feedthrough cycle (algebraic loop) is
                detected.
        """
        blocks = self.blocks
        names = list(blocks.keys())

        # No-op logger when not verbose, so the algorithm stays unconditional.
        vprint = print if self.verbose else (lambda *a, **k: None)

        vprint("\n================= BUILD EXECUTION ORDER =================")
        vprint(f"Blocks in model: {names}")

        # STEP 1 — Build dependency graph
        vprint("\n--- STEP 1: CONNECTION ANALYSIS (direct-feedthrough rules) ---")

        graph = {name: [] for name in names}
        indegree = {name: 0 for name in names}

        for (src, dst) in self.connections:
            src_block, src_port = src
            dst_block, dst_port = dst

            # Only a direct-feedthrough destination needs its input computed
            # earlier in the same step; other destinations add no edge.
            if blocks[dst_block].direct_feedthrough:
                graph[src_block].append(dst_block)
                indegree[dst_block] += 1
                vprint(f" DEPENDENCY: {src_block}.{src_port} -> {dst_block}.{dst_port} "
                       f"(direct-feedthrough)")
            else:
                vprint(f" NO DEPENDENCY: {src_block}.{src_port} -> {dst_block}.{dst_port} "
                       f"(destination NOT direct-feedthrough)")

        # Show resulting graph
        vprint("\nGraph adjacency list:")
        for k, v in graph.items():
            vprint(f" {k}: {v}")

        vprint("\nInitial indegree:")
        for k, v in indegree.items():
            vprint(f" {k}: {v}")

        # STEP 2 — Kahn topological sort
        vprint("\n--- STEP 2: TOPOLOGICAL SORT ---")

        ready = deque([b for b in names if indegree[b] == 0])
        vprint(f"Initial READY queue: {list(ready)}")

        execution_order = []

        while ready:
            current = ready.popleft()
            execution_order.append(current)
            vprint(f"\n==> EXECUTE: '{current}'")

            # Decrease indegree for successors
            for succ in graph[current]:
                indegree[succ] -= 1
                vprint(f" indegree[{succ}] -> {indegree[succ]}")
                if indegree[succ] == 0:
                    ready.append(succ)
                    vprint(f" '{succ}' added to READY")

        # STEP 3 — Detect algebraic loops: blocks on a cycle never reach
        # indegree 0 and are therefore missing from the result.
        if len(execution_order) != len(names):
            vprint("\n!!! ALGEBRAIC LOOP DETECTED !!!")
            raise RuntimeError(
                "Algebraic loop detected: direct-feedthrough cycle exists."
            )

        # STEP 4 — Final result
        vprint("\n--- FINAL SIMULINK-LIKE EXECUTION ORDER ---")
        for i, b in enumerate(execution_order, 1):
            vprint(f" {i}. {b}")
        vprint("========================================================\n")

        # Final storage; the cache is valid again only after a successful sort.
        self._output_execution_order = [blocks[n] for n in execution_order]
        self._execution_order_dirty = False
        return self._output_execution_order

    def downstream_of(self, block_name: str) -> List[Connection]:
        """Return all connections where block_name is the source.

        The lookup map is rebuilt lazily when connections changed since the
        last rebuild (or when the map is still empty).

        Args:
            block_name: Name of the source block.

        Returns:
            List of connections originating from block_name; an empty list
            if block_name has no entry in the map.
        """
        if self._connections_dirty or not self._downstream_map:
            self._rebuild_downstream_map()
        return self._downstream_map.get(block_name, [])

    def execution_order(self) -> List[Block]:
        """Return the output execution order, building it if necessary.

        The order is rebuilt when none is cached yet or when blocks or
        connections were added since the last successful build.

        Returns:
            Ordered list of blocks for output_update execution.

        Raises:
            RuntimeError: If a rebuild is needed and an algebraic loop is
                detected.
        """
        if self._execution_order_dirty or not self._output_execution_order:
            return self.build_execution_order()
        return self._output_execution_order

    def predecessors_of(self, block_name: str) -> Iterator[str]:
        """Yield the names of all blocks that feed into block_name.

        A source block is yielded once per connection, so it may appear
        several times if it drives several ports of block_name.

        Args:
            block_name: Name of the destination block.

        Yields:
            Source block names connected to block_name.
        """
        for (src, dst) in self.connections:
            if dst[0] == block_name:
                yield src[0]

    def resolve_sample_times(self, dt: float) -> None:
        """Resolve effective sample times for all blocks.

        Blocks with an explicit sample_time keep it; others inherit dt. The
        result is stored on each block as ``_effective_sample_time``.

        Args:
            dt: Global simulation time step in seconds.
        """
        for b in self.blocks.values():
            if b.sample_time is None:
                b._effective_sample_time = dt
            else:
                b._effective_sample_time = b.sample_time

    # --------------------------------------------------------------------------
    # Private methods
    # --------------------------------------------------------------------------
    def _rebuild_downstream_map(self) -> None:
        """Rebuild the downstream connection map from the current connections."""
        # Every registered block gets an entry, even without outgoing signals.
        downstream = {name: [] for name in self.blocks.keys()}
        for (src, dst) in self.connections:
            downstream[src[0]].append((src, dst))
        self._downstream_map = downstream
        self._connections_dirty = False