Source code for pySimBlocks.blocks.operators.product
# ******************************************************************************
# pySimBlocks
# Copyright (c) 2026 Université de Lille & INRIA
# ******************************************************************************
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
# for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# ******************************************************************************
# Authors: see Authors.txt
# ******************************************************************************
from __future__ import annotations
import numpy as np
from pySimBlocks.core.block import Block
[docs]
class Product(Block):
"""Multi-input product block.
Computes a product or division of multiple 2D input signals. The number of
inputs is ``len(operations) + 1``. Two multiplication modes are supported:
- **Element-wise**: applies ``*`` and ``/`` component-wise with scalar
(1,1) broadcasting only.
- **Matrix**: applies ``@`` sequentially; division is not supported.
Input shapes are frozen per port after their first use.
Attributes:
operations: String of ``'*'`` and ``'/'`` operators, one per adjacent
pair of inputs.
multiplication: Active multiplication mode string.
num_inputs: Total number of input ports.
"""
direct_feedthrough = True
def __init__(
self,
name: str,
operations: str | None = None,
multiplication: str = "Element-wise (*)",
sample_time: float | None = None,
):
"""Initialize a Product block.
Args:
name: Unique identifier for this block instance.
operations: String of ``'*'`` and ``'/'`` operators between inputs.
Defaults to ``'*'`` (two inputs, one multiplication).
multiplication: Multiplication mode. Must be ``'Element-wise (*)'``
or ``'Matrix (@)'``.
sample_time: Sampling period in seconds, or None to use the global
simulation dt.
Raises:
TypeError: If ``operations`` or ``multiplication`` are not strings.
ValueError: If ``operations`` contains unsupported characters, if
``multiplication`` is not a valid mode, or if ``'/'`` is used
in matrix mode.
"""
super().__init__(name, sample_time)
if operations is None:
operations = "*"
if not isinstance(operations, str):
raise TypeError(f"[{self.name}] 'operations' must be a str.")
if any(op not in ("*", "/") for op in operations):
raise ValueError(f"[{self.name}] 'operations' must contain only '*' or '/'.")
if not isinstance(multiplication, str):
raise TypeError(f"[{self.name}] 'multiplication' must be a str.")
if multiplication not in ("Element-wise (*)", "Matrix (@)"):
raise ValueError(
f"[{self.name}] 'multiplication' must be 'Element-wise (*)' or 'Matrix (@)'."
)
if multiplication == "Matrix (@)" and "/" in operations:
raise ValueError(f"[{self.name}] Division '/' is not supported in 'Matrix (@)' mode.")
self.operations = operations
self.multiplication = multiplication
self.num_inputs = len(self.operations) + 1
for i in range(self.num_inputs):
self.inputs[f"in{i+1}"] = None
self.outputs["out"] = None
self._input_shapes: dict[str, tuple[int, int]] = {}
# --------------------------------------------------------------------------
# Public methods
# --------------------------------------------------------------------------
[docs]
def initialize(self, t0: float) -> None:
"""Compute the initial output if all inputs are available.
Args:
t0: Initial simulation time in seconds.
"""
for i in range(self.num_inputs):
if self.inputs[f"in{i+1}"] is None:
self.outputs["out"] = None
return
self.outputs["out"] = self._compute_output()
[docs]
def output_update(self, t: float, dt: float) -> None:
"""Compute the product and write the result to the output port.
Args:
t: Current simulation time in seconds.
dt: Current time step in seconds.
Raises:
RuntimeError: If any input port is not connected.
ValueError: If input shapes are inconsistent or incompatible with
the multiplication mode.
"""
self.outputs["out"] = self._compute_output()
[docs]
def state_update(self, t: float, dt: float) -> None:
"""No-op: Product is a stateless block.
Args:
t: Current simulation time in seconds.
dt: Current time step in seconds.
"""
pass
# --------------------------------------------------------------------------
# Private methods
# --------------------------------------------------------------------------
def _get_input_2d(self, port: str) -> np.ndarray:
"""Retrieve, validate, and shape-freeze a single input port."""
u = self.inputs[port]
if u is None:
raise RuntimeError(f"[{self.name}] Input '{port}' is not connected or not set.")
u_arr = self._to_2d_array(port, u)
if port not in self._input_shapes:
self._input_shapes[port] = u_arr.shape
elif u_arr.shape != self._input_shapes[port]:
raise ValueError(
f"[{self.name}] Input '{port}' shape changed: expected {self._input_shapes[port]}, got {u_arr.shape}."
)
return u_arr
def _compute_output(self) -> np.ndarray:
"""Compute the product of all inputs according to the multiplication mode."""
arrays = [self._get_input_2d(f"in{i+1}") for i in range(self.num_inputs)]
if self.multiplication == "Element-wise (*)":
non_scalar_shapes = {a.shape for a in arrays if not self._is_scalar_2d(a)}
if len(non_scalar_shapes) > 1:
raise ValueError(
f"[{self.name}] Incompatible input shapes for element-wise product: {sorted(non_scalar_shapes)}."
)
target_shape = (1, 1) if len(non_scalar_shapes) == 0 else next(iter(non_scalar_shapes))
def expand(a: np.ndarray) -> np.ndarray:
if self._is_scalar_2d(a) and target_shape != (1, 1):
return np.full(target_shape, float(a[0, 0]), dtype=float)
return a.astype(float)
arrays = [expand(a) for a in arrays]
result = arrays[0].copy()
for op, a in zip(self.operations, arrays[1:]):
if op == "*":
result = result * a
else:
result = result / a
return result
result = arrays[0].astype(float)
for a in arrays[1:]:
a = a.astype(float)
if self._is_scalar_2d(result) and not self._is_scalar_2d(a):
result = float(result[0, 0]) * a
continue
if not self._is_scalar_2d(result) and self._is_scalar_2d(a):
result = result * float(a[0, 0])
continue
if self._is_scalar_2d(result) and self._is_scalar_2d(a):
result = np.array([[float(result[0, 0]) * float(a[0, 0])]], dtype=float)
continue
if result.shape[1] != a.shape[0]:
raise ValueError(
f"[{self.name}] Incompatible dimensions for matrix product: "
f"{result.shape} @ {a.shape}."
)
result = result @ a
return result