Source code for pySimBlocks.core.block_source
# ******************************************************************************
# pySimBlocks
# Copyright (c) 2026 Université de Lille & INRIA
# ******************************************************************************
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
# for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# ******************************************************************************
# Authors: see Authors.txt
# ******************************************************************************
import numpy as np
from pySimBlocks.core.block import Block
[docs]
class BlockSource(Block):
"""Base class for all source blocks (Constant, Step, Ramp, Sinusoidal, ...).
Provides normalization utilities for source parameters to produce 2D
signals, strict scalar-only broadcasting to a common 2D shape, and no
state update by default.
"""
direct_feedthrough = False
is_source = True
def __init__(self, name: str, sample_time: float | None = None):
"""Initialize a source block.
Args:
name: Unique identifier for this block instance.
sample_time: Sampling period in seconds, or None to use the
global simulation dt.
"""
super().__init__(name, sample_time)
# --------------------------------------------------------------------------
# Public methods
# --------------------------------------------------------------------------
[docs]
def state_update(self, t: float, dt: float) -> None:
"""No-op: all source blocks are stateless."""
# --------------------------------------------------------------------------
# Private methods
# --------------------------------------------------------------------------
def _resolve_common_shape(self, params: dict[str, np.ndarray]) -> tuple[int, int]:
"""Determine the common target shape among parameters.
Scalars (1,1) are broadcastable to any shape. Any non-scalar fixes
the target shape. Multiple non-scalars must all share the same shape.
If all parameters are scalar, returns (1,1).
Args:
params: Mapping of parameter names to their 2D arrays.
Returns:
The common (m, n) target shape.
Raises:
ValueError: If multiple non-scalar parameters have different shapes.
"""
non_scalar_shapes = {a.shape for a in params.values() if not self._is_scalar_2d(a)}
if len(non_scalar_shapes) == 0:
return (1, 1)
if len(non_scalar_shapes) == 1:
return next(iter(non_scalar_shapes))
details = ", ".join(f"{k}={v.shape}" for k, v in params.items())
raise ValueError(
f"[{self.name}] Inconsistent parameter shapes. "
f"All non-scalar parameters must have the same (m,n) shape. Got: {details}"
)
def _broadcast_scalar_only(self,
param_name: str,
arr: np.ndarray,
target_shape: tuple[int, int]) -> np.ndarray:
"""Broadcast a scalar (1,1) array to target_shape; non-scalars must match exactly.
Args:
param_name: Name of the parameter, used in error messages.
arr: 2D array to broadcast.
target_shape: Target (m, n) shape.
Returns:
Array of shape target_shape with dtype float.
Raises:
ValueError: If arr is non-scalar and does not match target_shape.
"""
if self._is_scalar_2d(arr):
if target_shape == (1, 1):
return arr.astype(float, copy=False)
return np.full(target_shape, float(arr[0, 0]), dtype=float)
if arr.shape != target_shape:
raise ValueError(
f"[{self.name}] '{param_name}' shape {arr.shape} is incompatible with "
f"target shape {target_shape}. Only scalar-to-shape broadcasting is allowed."
)
return arr.astype(float, copy=False)