Source code for pySimBlocks.blocks.operators.demux

# ******************************************************************************
#                                  pySimBlocks
#                     Copyright (c) 2026 Université de Lille & INRIA
# ******************************************************************************
#  This program is free software: you can redistribute it and/or modify it
#  under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or (at your
#  option) any later version.
#
#  This program is distributed in the hope that it will be useful, but WITHOUT
#  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
#  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License
#  for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with this program.  If not, see <https://www.gnu.org/licenses/>.
# ******************************************************************************
#  Authors: see Authors.txt
# ******************************************************************************

from __future__ import annotations

import numpy as np
from numpy.typing import ArrayLike

from pySimBlocks.core.block import Block


[docs] class Demux(Block): """Vector split block (inverse of Mux). Splits one input column vector of length n into p output segments. Segment sizes are distributed as evenly as possible: let q = n // p and m = n % p, then the first m outputs have size q+1 and the remaining p-m outputs have size q. Attributes: num_outputs: Number of output segments to produce. """ direct_feedthrough = True def __init__(self, name: str, num_outputs: int = 2, sample_time: float | None = None): """Initialize a Demux block. Args: name: Unique identifier for this block instance. num_outputs: Number of output segments. Must be >= 1. sample_time: Sampling period in seconds, or None to use the global simulation dt. Raises: ValueError: If ``num_outputs`` is not a positive integer. """ super().__init__(name, sample_time) if not isinstance(num_outputs, int) or num_outputs < 1: raise ValueError(f"[{self.name}] num_outputs must be a positive integer.") self.num_outputs = num_outputs self.inputs["in"] = None for i in range(num_outputs): self.outputs[f"out{i+1}"] = None # -------------------------------------------------------------------------- # Public methods # --------------------------------------------------------------------------
[docs] def initialize(self, t0: float) -> None: """Compute initial outputs, or set zero placeholders if input is unavailable. Args: t0: Initial simulation time in seconds. """ if self.inputs["in"] is None: for i in range(self.num_outputs): self.outputs[f"out{i+1}"] = np.zeros((1, 1), dtype=float) return self._compute_outputs()
[docs] def output_update(self, t: float, dt: float) -> None: """Split the input vector and write the segments to the output ports. Args: t: Current simulation time in seconds. dt: Current time step in seconds. Raises: RuntimeError: If input ``'in'`` is not connected. ValueError: If input is not a column vector or has fewer elements than ``num_outputs``. """ self._compute_outputs()
[docs] def state_update(self, t: float, dt: float) -> None: """No-op: Demux is a stateless block. Args: t: Current simulation time in seconds. dt: Current time step in seconds. """ return
# -------------------------------------------------------------------------- # Private methods # -------------------------------------------------------------------------- def _to_vector(self, value: ArrayLike) -> np.ndarray: """Validate and return the input as a (n,1) column vector.""" arr = np.asarray(value, dtype=float) if arr.ndim != 2 or arr.shape[1] != 1: raise ValueError( f"[{self.name}] Input 'in' must be a column vector (n,1). " f"Got shape {arr.shape}." ) return arr def _compute_outputs(self) -> None: """Split the input vector into output segments.""" u = self.inputs["in"] if u is None: raise RuntimeError(f"[{self.name}] Input 'in' is not connected or not set.") vec = self._to_vector(u) n = vec.shape[0] p = self.num_outputs if p > n: raise ValueError( f"[{self.name}] num_outputs ({p}) must be <= input vector length ({n})." ) q = n // p m = n % p start = 0 for i in range(p): seg_len = q + 1 if i < m else q end = start + seg_len self.outputs[f"out{i+1}"] = vec[start:end].copy() start = end