# ******************************************************************************
# pySimBlocks
# Copyright (c) 2026 Université de Lille & INRIA
# ******************************************************************************
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
# for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# ******************************************************************************
# Authors: see Authors.txt
# ******************************************************************************
from collections import deque
from pathlib import Path
from typing import Any, Dict, List, Tuple
from pySimBlocks.core.block import Block
# A connection is:
# ( (src_block, src_port), (dst_block, dst_port) )
Connection = Tuple[Tuple[str, str], Tuple[str, str]]
[docs]
class Model:
"""Discrete-time block-diagram model (Simulink-like).
Stores blocks and signal connections, builds the topological execution
order, and provides fast access to downstream connections.
Topological sorting is applied only to the combinational (direct-
feedthrough) graph. Stateful blocks act as cycle breakers, exactly as
Simulink handles algebraic loops (see Simulink PDF p.7).
Attributes:
name: Identifier for this model.
verbose: If True, print detailed execution-order build logs.
blocks: Registry of blocks keyed by name.
connections: List of signal connections.
"""
def __init__(
self,
name: str = "model",
model_data: Dict[str, Any] | None = None,
params_dir: Path | None = None,
verbose: bool = False,
):
"""Initialize a model.
Args:
name: Identifier for this model.
model_data: Optional dict loaded from a YAML project file.
If provided, blocks and connections are built immediately.
params_dir: Directory of the project file, for resolving
relative paths. None if not applicable.
verbose: If True, print execution-order build logs.
"""
self.name = name
self.verbose = verbose
self.blocks: Dict[str, Block] = {}
self.connections: List[Connection] = []
self._output_execution_order: List[Block] = []
self._state_execution_order: List[Block] = []
self._downstream_map: Dict[str, List[Connection]] = {}
self._connections_dirty: bool = True
if model_data is not None:
from pySimBlocks.project.build_model import build_model_from_dict
build_model_from_dict(self, model_data, params_dir=params_dir)
# --------------------------------------------------------------------------
# Public methods
# --------------------------------------------------------------------------
[docs]
def add_block(self, block: Block) -> Block:
"""Add a block to the model.
Args:
block: Block instance to register.
Returns:
The registered block.
Raises:
ValueError: If a block with the same name already exists.
"""
if block.name in self.blocks:
raise ValueError(f"Block name '{block.name}' already exists.")
self.blocks[block.name] = block
return block
[docs]
def get_block_by_name(self, name: str) -> Block:
"""Return a block by its name.
Args:
name: Name of the block to retrieve.
Returns:
The matching Block instance.
Raises:
ValueError: If no block with that name exists.
"""
if name not in self.blocks:
raise ValueError(
f"Block name '{name}' not found. Known blocks: {list(self.blocks.keys())}"
)
return self.blocks[name]
[docs]
def connect(self, src_block: str, src_port: str,
dst_block: str, dst_port: str) -> None:
"""Connect an output port to an input port.
Registers a connection from ``blocks[src_block].outputs[src_port]``
to ``blocks[dst_block].inputs[dst_port]``.
Args:
src_block: Name of the source block.
src_port: Name of the source output port.
dst_block: Name of the destination block.
dst_port: Name of the destination input port.
Raises:
ValueError: If src_block or dst_block is not registered.
"""
if src_block not in self.blocks:
raise ValueError(
f"Unknown source block '{src_block}'. "
f"Known blocks: {list(self.blocks.keys())}"
)
if dst_block not in self.blocks:
raise ValueError(
f"Unknown destination block '{dst_block}'. "
f"Known blocks: {list(self.blocks.keys())}"
)
self.connections.append(
((src_block, src_port), (dst_block, dst_port))
)
self._connections_dirty = True
[docs]
def build_execution_order(self):
"""Build the Simulink-like output execution order.
Runs a Kahn topological sort on the direct-feedthrough dependency
graph. Blocks without direct feedthrough act as cycle breakers.
Returns:
Ordered list of blocks for output_update execution.
Raises:
RuntimeError: If a direct-feedthrough cycle (algebraic loop)
is detected.
"""
blocks = self.blocks
names = list(blocks.keys())
vprint = print if self.verbose else (lambda *a, **k: None)
vprint("\n================= BUILD EXECUTION ORDER =================")
vprint(f"Blocks in model: {names}")
# STEP 1 — Build dependency graph
vprint("\n--- STEP 1: CONNECTION ANALYSIS (direct-feedthrough rules) ---")
graph = {name: [] for name in names}
indegree = {name: 0 for name in names}
for (src, dst) in self.connections:
src_block, src_port = src
dst_block, dst_port = dst
block_dist = blocks[dst_block]
if block_dist.direct_feedthrough:
graph[src_block].append(dst_block)
indegree[dst_block] += 1
vprint(f" DEPENDENCY: {src_block}.{src_port} -> {dst_block}.{dst_port} "
f"(direct-feedthrough)")
else:
vprint(f" NO DEPENDENCY: {src_block}.{src_port} -> {dst_block}.{dst_port} "
f"(destination NOT direct-feedthrough)")
# Show resulting graph
vprint("\nGraph adjacency list:")
for k, v in graph.items():
vprint(f" {k}: {v}")
vprint("\nInitial indegree:")
for k, v in indegree.items():
vprint(f" {k}: {v}")
# STEP 1b — Inject virtual edges from Goto → BusFrom (same tag)
for (goto_name, bus_from_name) in self._build_virtual_edges():
if goto_name in graph and bus_from_name in graph:
graph[goto_name].append(bus_from_name)
indegree[bus_from_name] += 1
vprint(f" VIRTUAL EDGE: {goto_name} -> {bus_from_name} (shared tag)")
# STEP 2 — Kahn topological sort
vprint("\n--- STEP 2: TOPOLOGICAL SORT ---")
ready = deque([b for b in names if indegree[b] == 0])
vprint(f"Initial READY queue: {list(ready)}")
execution_order = []
while ready:
current = ready.popleft()
execution_order.append(current)
vprint(f"\n==> EXECUTE: '{current}'")
# Decrease indegree for successors
for succ in graph[current]:
indegree[succ] -= 1
vprint(f" indegree[{succ}] -> {indegree[succ]}")
if indegree[succ] == 0:
ready.append(succ)
vprint(f" '{succ}' added to READY")
# STEP 3 — Detect algebraic loops
if len(execution_order) != len(names):
vprint("\n!!! ALGEBRAIC LOOP DETECTED !!!")
raise RuntimeError(
"Algebraic loop detected: direct-feedthrough cycle exists."
)
# STEP 4 — Final result
vprint("\n--- FINAL SIMULINK-LIKE EXECUTION ORDER ---")
for i, b in enumerate(execution_order, 1):
vprint(f" {i}. {b}")
vprint("========================================================\n")
# Final storage
self._output_execution_order = [blocks[n] for n in execution_order]
return self._output_execution_order
[docs]
def downstream_of(self, block_name: str) -> List[Connection]:
"""Return all connections where block_name is the source.
Args:
block_name: Name of the source block.
Returns:
List of connections originating from block_name.
"""
"""
Returns all connections where block_name is the source.
"""
if self._connections_dirty or not self._downstream_map:
self._rebuild_downstream_map()
return self._downstream_map.get(block_name, [])
[docs]
def execution_order(self) -> List[Block]:
"""Return the output execution order, building it if necessary.
Returns:
Ordered list of blocks for output_update execution.
"""
if not self._output_execution_order:
return self.build_execution_order()
return self._output_execution_order
[docs]
def predecessors_of(self, block_name):
"""Yield the names of all blocks that feed into block_name.
Args:
block_name: Name of the destination block.
Yields:
Source block names connected to block_name.
"""
for (src, dst) in self.connections:
if dst[0] == block_name:
yield src[0]
[docs]
def resolve_sample_times(self, dt) -> None:
"""Resolve effective sample times for all blocks.
Blocks with an explicit sample_time keep it; others inherit dt.
Args:
dt: Global simulation time step in seconds.
"""
for b in self.blocks.values():
if b.sample_time is None:
b._effective_sample_time = dt
else:
b._effective_sample_time = b.sample_time
# --------------------------------------------------------------------------
# Private methods
# --------------------------------------------------------------------------
def _rebuild_downstream_map(self) -> None:
"""Rebuild the downstream connection map from the current connections."""
downstream = {name: [] for name in self.blocks.keys()}
for (src, dst) in self.connections:
downstream[src[0]].append((src, dst))
self._downstream_map = downstream
self._connections_dirty = False
def _build_virtual_edges(self) -> List[Tuple[str, str]]:
"""Return virtual Goto → BusFrom edges for matching signal bus tags.
Iterates over all blocks in the model, collects Goto and BusFrom
instances grouped by tag, and returns one directed edge per
(Goto, BusFrom) pair that shares a tag. These edges are injected into
the topological sort graph so that every BusFrom executes after its
corresponding Goto within the same tick.
Local imports are used to avoid circular imports between the core
package and the blocks package.
Returns:
List of ``(goto_block_name, bus_from_block_name)`` tuples.
"""
from pySimBlocks.blocks.interfaces.goto import Goto
from pySimBlocks.blocks.interfaces.bus_from import BusFrom
tag_to_gotos: Dict[str, List[str]] = {}
tag_to_bus_froms: Dict[str, List[str]] = {}
for name, block in self.blocks.items():
if isinstance(block, Goto):
tag_to_gotos.setdefault(block.tag, []).append(name)
elif isinstance(block, BusFrom):
tag_to_bus_froms.setdefault(block.tag, []).append(name)
edges: List[Tuple[str, str]] = []
for tag, goto_names in tag_to_gotos.items():
for bus_from_name in tag_to_bus_froms.get(tag, []):
for goto_name in goto_names:
edges.append((goto_name, bus_from_name))
return edges