Source code for pySimBlocks.blocks.operators.sum
# ******************************************************************************
# pySimBlocks
# Copyright (c) 2026 Université de Lille & INRIA
# ******************************************************************************
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
# for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# ******************************************************************************
# Authors: see Authors.txt
# ******************************************************************************
from __future__ import annotations
import numpy as np
from pySimBlocks.core.block import Block
[docs]
class Sum(Block):
"""Multi-input signed summation block.
Computes an element-wise signed sum of multiple 2D input signals. All
non-scalar inputs must share the same shape; scalar (1,1) inputs are
broadcast to that shape.
Attributes:
signs: List of +1.0 or -1.0 coefficients, one per input port.
num_inputs: Number of input ports.
"""
direct_feedthrough = True
def __init__(
self,
name: str,
signs: str | None = None,
sample_time: float | None = None,
):
"""Initialize a Sum block.
Args:
name: Unique identifier for this block instance.
signs: Sequence of ``'+'`` and ``'-'`` defining the sign of each
input (e.g. ``'++-'``, ``'+-'``). Defaults to ``'++'`` (two
positive inputs).
sample_time: Sampling period in seconds, or None to use the global
simulation dt.
Raises:
TypeError: If ``signs`` is not a string.
ValueError: If ``signs`` is empty or contains characters other than
``'+'`` and ``'-'``.
"""
super().__init__(name, sample_time)
if signs is None:
signs = "++"
if not isinstance(signs, str):
raise TypeError(f"[{self.name}] 'signs' must be a str.")
if len(signs) == 0:
raise ValueError(f"[{self.name}] 'signs' must not be empty.")
if any(s not in ("+", "-") for s in signs):
raise ValueError(f"[{self.name}] 'signs' must contain only '+' or '-'.")
self.signs = [1.0 if s == "+" else -1.0 for s in signs]
self.num_inputs = len(self.signs)
for i in range(self.num_inputs):
self.inputs[f"in{i+1}"] = None
self.outputs["out"] = None
# --------------------------------------------------------------------------
# Public methods
# --------------------------------------------------------------------------
[docs]
def initialize(self, t0: float) -> None:
"""Compute the initial output if all inputs are available.
Args:
t0: Initial simulation time in seconds.
"""
if any(self.inputs[f"in{i+1}"] is None for i in range(self.num_inputs)):
self.outputs["out"] = None
return
self.outputs["out"] = self._compute_output()
[docs]
def output_update(self, t: float, dt: float) -> None:
"""Compute the signed element-wise sum and write it to the output port.
Args:
t: Current simulation time in seconds.
dt: Current time step in seconds.
Raises:
RuntimeError: If any input port is not connected.
ValueError: If any input is not 2D or non-scalar inputs have
inconsistent shapes.
"""
arrays = []
for i in range(self.num_inputs):
key = f"in{i+1}"
u = self.inputs[key]
if u is None:
raise RuntimeError(f"[{self.name}] Input '{key}' is not connected or not set.")
a = np.asarray(u, dtype=float)
if a.ndim != 2:
raise ValueError(
f"[{self.name}] Input '{key}' must be a 2D array. Got ndim={a.ndim} with shape {a.shape}."
)
arrays.append(a)
self.outputs["out"] = self._compute_output(prevalidated_arrays=arrays)
[docs]
def state_update(self, t: float, dt: float) -> None:
"""No-op: Sum is a stateless block.
Args:
t: Current simulation time in seconds.
dt: Current time step in seconds.
"""
return
# --------------------------------------------------------------------------
# Private methods
# --------------------------------------------------------------------------
def _resolve_common_shape(self, arrays: list[np.ndarray]) -> tuple[int, int]:
"""Determine the target shape from the set of input arrays."""
non_scalar_shapes = {a.shape for a in arrays if not self._is_scalar_2d(a)}
if len(non_scalar_shapes) == 0:
return (1, 1)
if len(non_scalar_shapes) == 1:
return next(iter(non_scalar_shapes))
raise ValueError(
f"[{self.name}] Inconsistent input shapes for Sum: "
f"{[a.shape for a in arrays]}. All non-scalar inputs must have the same shape."
)
def _broadcast_scalar_only(self, arr: np.ndarray, target_shape: tuple[int, int], input_name: str) -> np.ndarray:
"""Broadcast scalar (1,1) to target shape; reject non-scalar shape mismatches."""
if self._is_scalar_2d(arr):
if target_shape == (1, 1):
return arr.astype(float, copy=False)
return np.full(target_shape, float(arr[0, 0]), dtype=float)
if arr.shape != target_shape:
raise ValueError(
f"[{self.name}] Input '{input_name}' shape {arr.shape} is incompatible with target shape {target_shape}. "
f"Only scalar (1,1) inputs can be broadcast."
)
return arr.astype(float, copy=False)
def _compute_output(self, prevalidated_arrays: list[np.ndarray] | None = None) -> np.ndarray:
"""Compute the signed element-wise sum with scalar-only broadcasting."""
if prevalidated_arrays is None:
arrays = [np.asarray(self.inputs[f"in{i+1}"], dtype=float) for i in range(self.num_inputs)]
else:
arrays = prevalidated_arrays
target_shape = self._resolve_common_shape(arrays)
total = np.zeros(target_shape, dtype=float)
for i, (s, a) in enumerate(zip(self.signs, arrays), start=1):
a2 = self._broadcast_scalar_only(a, target_shape, input_name=f"in{i}")
total += s * a2
return total