Source code for pySimBlocks.blocks.operators.mux
# ******************************************************************************
# pySimBlocks
# Copyright (c) 2026 Université de Lille & INRIA
# ******************************************************************************
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
# for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# ******************************************************************************
# Authors: see Authors.txt
# ******************************************************************************
from __future__ import annotations
import numpy as np
from numpy.typing import ArrayLike
from pySimBlocks.core.block import Block
[docs]
class Mux(Block):
"""Vertical signal concatenation block.
Concatenates multiple scalar or column-vector inputs vertically into a
single output column vector. Each input must be a scalar, a 1D array, or
a column vector (n,1). Any 2D non-column input (k,m) with m != 1 is
rejected.
Attributes:
num_inputs: Number of input ports to concatenate.
"""
direct_feedthrough = True
def __init__(self, name: str, num_inputs: int = 2, sample_time: float | None = None):
"""Initialize a Mux block.
Args:
name: Unique identifier for this block instance.
num_inputs: Number of input ports to concatenate. Must be >= 1.
sample_time: Sampling period in seconds, or None to use the global
simulation dt.
Raises:
ValueError: If ``num_inputs`` is not a positive integer.
"""
super().__init__(name, sample_time)
if not isinstance(num_inputs, int) or num_inputs < 1:
raise ValueError(f"[{self.name}] num_inputs must be a positive integer.")
self.num_inputs = num_inputs
for i in range(num_inputs):
self.inputs[f"in{i+1}"] = None
self.outputs["out"] = None
# --------------------------------------------------------------------------
# Public methods
# --------------------------------------------------------------------------
[docs]
def initialize(self, t0: float) -> None:
"""Compute the initial output if all inputs are available.
Args:
t0: Initial simulation time in seconds.
"""
for i in range(self.num_inputs):
if self.inputs[f"in{i+1}"] is None:
self.outputs["out"] = None
return
self.outputs["out"] = self._compute_output()
[docs]
def output_update(self, t: float, dt: float) -> None:
"""Concatenate all inputs vertically and write the result to the output port.
Args:
t: Current simulation time in seconds.
dt: Current time step in seconds.
Raises:
RuntimeError: If any input port is not connected.
ValueError: If any input is not a scalar, 1D, or column vector.
"""
self.outputs["out"] = self._compute_output()
[docs]
def state_update(self, t: float, dt: float) -> None:
"""No-op: Mux is a stateless block.
Args:
t: Current simulation time in seconds.
dt: Current time step in seconds.
"""
return
# --------------------------------------------------------------------------
# Private methods
# --------------------------------------------------------------------------
def _to_column_vector(self, input_name: str, value: ArrayLike) -> np.ndarray:
"""Convert a scalar, 1D array, or column vector to a (n,1) array."""
arr = np.asarray(value, dtype=float)
if arr.ndim == 0:
return arr.reshape(1, 1)
if arr.ndim == 1:
return arr.reshape(-1, 1)
if arr.ndim == 2:
if arr.shape[1] != 1:
raise ValueError(
f"[{self.name}] Input '{input_name}' must be a column vector (n,1). "
f"Got shape {arr.shape}."
)
return arr
raise ValueError(
f"[{self.name}] Input '{input_name}' must be scalar, 1D, or a column vector (n,1). "
f"Got ndim={arr.ndim} with shape {arr.shape}."
)
def _compute_output(self) -> np.ndarray:
"""Collect and concatenate all input column vectors."""
vectors = []
for i in range(self.num_inputs):
key = f"in{i+1}"
u = self.inputs[key]
if u is None:
raise RuntimeError(f"[{self.name}] Input '{key}' is not connected or not set.")
vectors.append(self._to_column_vector(key, u))
return np.vstack(vectors)