# ******************************************************************************
# pySimBlocks
# Copyright (c) 2026 Université de Lille & INRIA
# ******************************************************************************
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
# for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# ******************************************************************************
# Authors: see Authors.txt
# ******************************************************************************
from __future__ import annotations
import numpy as np
from numpy.typing import ArrayLike
from pySimBlocks.core.block import Block
[docs]
class Delay(Block):
"""N-step discrete delay block.
Outputs a delayed version of the input signal by a fixed number of discrete
time steps. The output at time k is the input at time k − N:
y[k] = u[k - N]
The buffer shape is inferred from the first non-None input unless an
explicit ``initial_output`` of non-scalar shape is provided. A scalar (1,1)
initial value is broadcast to match the first input. Once the shape is
fixed, any mismatch raises an error.
Attributes:
num_delays: Number of discrete steps N (>= 1).
"""
direct_feedthrough = False
def __init__(
self,
name: str,
num_delays: int = 1,
initial_output: ArrayLike | None = None,
sample_time: float | None = None,
):
"""Initialize a Delay block.
Args:
name: Unique identifier for this block instance.
num_delays: Number of discrete steps to delay the input. Must be
>= 1.
initial_output: Initial value used to fill the delay buffer.
Accepted shapes: scalar, 1D, or 2D. A non-scalar 2D value
fixes the buffer shape immediately.
sample_time: Sampling period in seconds, or None to use the global
simulation dt.
Raises:
ValueError: If ``num_delays`` is not a positive integer.
"""
super().__init__(name, sample_time)
if not isinstance(num_delays, int) or num_delays < 1:
raise ValueError(f"[{self.name}] num_delays must be >= 1.")
self.num_delays = num_delays
self.inputs["in"] = None
self.inputs["reset"] = None
self.outputs["out"] = None
self.state["buffer"] = None
self.next_state["buffer"] = None
self._shape_fixed: bool = False
self._buffer_shape: tuple[int, int] | None = None
self._initial_output = initial_output
init = np.zeros((1, 1), dtype=float)
if initial_output is not None:
arr = self._to_2d_array("initial_output", initial_output)
init = arr.astype(float, copy=False)
if not self._is_scalar_2d(init):
self._shape_fixed = True
self._buffer_shape = init.shape
self.state["buffer"] = [init.copy() for _ in range(self.num_delays)]
self.next_state["buffer"] = None
# --------------------------------------------------------------------------
# Public methods
# --------------------------------------------------------------------------
[docs]
def initialize(self, t0: float) -> None:
"""Set the initial output from the buffer, resolving shape if input is available.
Args:
t0: Initial simulation time in seconds.
Raises:
ValueError: If the initial output shape is inconsistent with the
resolved buffer shape.
"""
out = self.state["buffer"][0]
u = self.inputs["in"]
if u is not None:
u_arr = np.asarray(u, dtype=float)
self._ensure_shape_and_buffer(u_arr)
if self._is_scalar_2d(out) and self._buffer_shape != (1, 1):
out = np.full(self._buffer_shape, float(out[0, 0]), dtype=float)
else:
if out.shape != self._buffer_shape:
raise ValueError(
f"[{self.name}] Initial output shape mismatch: expected {self._buffer_shape}, got {out.shape}."
)
self.outputs["out"] = out
[docs]
def output_update(self, t: float, dt: float) -> None:
"""Output the oldest buffer entry.
Args:
t: Current simulation time in seconds.
dt: Current time step in seconds.
"""
if not self._shape_fixed:
u = self.inputs["in"]
if u is not None:
u_arr = np.asarray(u, dtype=float)
self._ensure_shape_and_buffer(u_arr)
self.outputs["out"] = self.state["buffer"][0].copy()
[docs]
def state_update(self, t: float, dt: float) -> None:
"""Shift the buffer left and append the current input.
Args:
t: Current simulation time in seconds.
dt: Current time step in seconds.
Raises:
RuntimeError: If input ``'in'`` is not connected.
ValueError: If the input is not 2D or its shape is inconsistent
with the buffer.
"""
if self._is_reset_active():
self._apply_reset()
return
u = self.inputs["in"]
if u is None:
raise RuntimeError(f"[{self.name}] Input 'in' is not connected or not set.")
u_arr = np.asarray(u, dtype=float)
self._ensure_shape_and_buffer(u_arr)
buffer = self.state["buffer"]
new_buffer = []
for i in range(self.num_delays - 1):
new_buffer.append(buffer[i + 1].copy())
new_buffer.append(u_arr.copy())
self.next_state["buffer"] = new_buffer
# --------------------------------------------------------------------------
# Private methods
# --------------------------------------------------------------------------
def _ensure_shape_and_buffer(self, u: np.ndarray) -> None:
"""Validate input shape and fix the buffer shape on the first non-None input."""
if u.ndim != 2:
raise ValueError(
f"[{self.name}] Input 'in' must be a 2D array. Got ndim={u.ndim} with shape {u.shape}."
)
buf0 = self.state["buffer"][0]
assert buf0 is not None
if self._shape_fixed:
expected = buf0.shape
if u.shape != expected:
raise ValueError(
f"[{self.name}] Input 'in' shape mismatch: expected {expected}, got {u.shape}."
)
return
target_shape = u.shape
if self._is_scalar_2d(buf0) and target_shape != (1, 1):
scalar = float(buf0[0, 0])
self.state["buffer"] = [
np.full(target_shape, scalar, dtype=float) for _ in range(self.num_delays)
]
buf0 = self.state["buffer"][0]
if buf0.shape != target_shape:
raise ValueError(
f"[{self.name}] Cannot infer a consistent delay shape: "
f"buffer currently {buf0.shape} but first input is {target_shape}."
)
self._shape_fixed = True
self._buffer_shape = target_shape
def _is_reset_active(self) -> bool:
"""Return True if the reset signal is active (truthy scalar)."""
reset_signal = self.inputs.get("reset", None)
if reset_signal is None:
return False
reset_arr = np.asarray(reset_signal)
if reset_arr.ndim == 0:
return bool(reset_arr)
elif reset_arr.ndim == 1 and reset_arr.size == 1:
return bool(reset_arr[0])
elif reset_arr.ndim == 2 and reset_arr.shape == (1, 1):
return bool(reset_arr[0, 0])
else:
raise ValueError(
f"[{self.name}] Reset signal must be a scalar or single-element array. Got shape {reset_arr.shape}."
)
def _apply_reset(self) -> None:
"""Reset the buffer to the initial output or zeros."""
if self._initial_output is not None:
arr = self._to_2d_array("initial_output", self._initial_output)
init = arr.astype(float, copy=False)
if self._shape_fixed and self._buffer_shape is not None:
if self._is_scalar_2d(init) and self._buffer_shape != (1, 1):
scalar = float(init[0, 0])
init = np.full(self._buffer_shape, scalar, dtype=float)
elif self._shape_fixed and self._buffer_shape is not None:
init = np.zeros(self._buffer_shape, dtype=float)
else:
init = np.zeros((1, 1), dtype=float)
self.state["buffer"] = [init.copy() for _ in range(self.num_delays)]
self.next_state["buffer"] = [init.copy() for _ in range(self.num_delays)]