Source code for pySimBlocks.core.block

# ******************************************************************************
#                                  pySimBlocks
#                     Copyright (c) 2026 Université de Lille & INRIA
# ******************************************************************************
#  This program is free software: you can redistribute it and/or modify it
#  under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or (at your
#  option) any later version.
#
#  This program is distributed in the hope that it will be useful, but WITHOUT
#  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
#  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License
#  for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with this program.  If not, see <https://www.gnu.org/licenses/>.
# ******************************************************************************
#  Authors: see Authors.txt
# ******************************************************************************

from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Dict

import numpy as np


[docs] class Block(ABC): """Base class for all discrete-time blocks (Simulink-like). A block follows two-phase execution per timestep: output_update computes y[k] from x[k] and u[k], then state_update computes x[k+1] from x[k] and u[k]. Attributes: name: Unique identifier for this block instance. sample_time: Sampling period in seconds, or None to use the global dt. inputs: Input port values, set by the simulator each step. outputs: Output port values, written by output_update. state: Committed state x[k]. next_state: Pending state x[k+1], written by state_update. """ direct_feedthrough = True """True if outputs depend directly on inputs.""" is_source = False """True if the block produces signals with no inputs.""" def __init__(self, name: str, sample_time: float | None = None): """Initialize a block. Args: name: Unique identifier for this block instance. sample_time: Sampling period in seconds, or None to use the global simulation dt. Raises: ValueError: If sample_time is provided but not strictly positive. """ self.name = name if sample_time is not None and sample_time <= 0: raise ValueError(f"[{self.name}] sample_time must be > 0.") self.sample_time = sample_time self.inputs: Dict[str, np.ndarray] = {} self.outputs: Dict[str, np.ndarray] = {} self.state: Dict[str, np.ndarray] = {} self.next_state: Dict[str, np.ndarray] = {} self._effective_sample_time = 0. # -------------------------------------------------------------------------- # Class Methods # --------------------------------------------------------------------------
[docs] @classmethod def adapt_params(cls, params: Dict[str, Any], params_dir: Path | None = None) -> Dict[str, Any]: """Adapt parameters from YAML format to constructor format. Args: params: Raw parameter dict loaded from the YAML project file. params_dir: Directory of the project file, for resolving relative paths. None if not applicable. Returns: Parameter dict ready to be passed to the block constructor. """ return params
# -------------------------------------------------------------------------- # Public Methods # -------------------------------------------------------------------------- @property def has_state(self) -> bool: """True if the block carries internal state.""" return bool(self.state) or bool(self.next_state)
[docs] @abstractmethod def initialize(self, t0: float): """Initialize state x[0] and outputs y[0]. Must populate self.state and self.outputs before the first step. Args: t0: Initial simulation time in seconds. """
[docs] @abstractmethod def output_update(self, t: float, dt: float): """Compute outputs y[k] from x[k] and inputs u[k]. Called before state_update each timestep. Must write to self.outputs. Args: t: Current simulation time in seconds. dt: Current time step in seconds. """
[docs] @abstractmethod def state_update(self, t: float, dt: float): """Compute next state x[k+1] from x[k] and inputs u[k]. Must write to self.next_state. Args: t: Current simulation time in seconds. dt: Current time step in seconds. """
[docs] def commit_state(self): """Copy x[k+1] into x[k] to finalize the timestep. Called by the simulator after all blocks have completed state_update. """ for key, value in self.next_state.items(): self.state[key] = np.copy(value)
[docs] def finalize(self): """Clean up resources at the end of the simulation."""
# -------------------------------------------------------------------------- # Private Methods # -------------------------------------------------------------------------- @staticmethod def _is_scalar_2d(arr: np.ndarray) -> bool: """True if arr has shape (1, 1).""" return arr.shape == (1, 1) def _to_2d_array(self, param_name: str, value, *, dtype=float) -> np.ndarray: """Normalize value to a 2D column-oriented array. scalar -> (1,1), 1D (n,) -> (n,1), 2D -> preserved, ndim>2 -> error. Args: param_name: Name of the parameter, used in error messages. value: Input value to normalize. dtype: Target NumPy dtype. Raises: ValueError: If value has more than 2 dimensions. """ arr = np.asarray(value, dtype=dtype) if arr.ndim == 0: return arr.reshape(1, 1) if arr.ndim == 1: return arr.reshape(-1, 1) if arr.ndim == 2: if arr.shape[0] == 1 and arr.shape[1] != 1: return arr.reshape(-1, 1) return arr raise ValueError( f"[{self.name}] '{param_name}' must be scalar, 1D, or 2D array-like. " f"Got ndim={arr.ndim} with shape {arr.shape}." )