Source code for pySimBlocks.blocks.operators.discrete_derivator

# ******************************************************************************
#                                  pySimBlocks
#                     Copyright (c) 2026 Université de Lille & INRIA
# ******************************************************************************
#  This program is free software: you can redistribute it and/or modify it
#  under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or (at your
#  option) any later version.
#
#  This program is distributed in the hope that it will be useful, but WITHOUT
#  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
#  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License
#  for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with this program.  If not, see <https://www.gnu.org/licenses/>.
# ******************************************************************************
#  Authors: see Authors.txt
# ******************************************************************************

from __future__ import annotations

import numpy as np
from numpy.typing import ArrayLike

from pySimBlocks.core.block import Block


class DiscreteDerivator(Block):
    """Discrete-time differentiator block.

    Estimates the derivative of the input using a backward finite difference:

        y[k] = (u[k] - u[k-1]) / dt

    The output shape is resolved from the first non-scalar input and then
    frozen. If an ``initial_output`` is provided it immediately fixes the
    shape. A scalar (1,1) input is broadcast to the frozen shape once it is
    known. The output is never ``None`` — a zero placeholder is used when no
    shape information is yet available.

    Attributes:
        initial_output: Initial output value, or None if not provided.
        direct_feedthrough: Always True; the output at a step is computed
            from the input at that same step.
    """

    direct_feedthrough = True

    def __init__(
        self,
        name: str,
        initial_output: ArrayLike | None = None,
        sample_time: float | None = None,
    ):
        """Initialize a DiscreteDerivator block.

        Args:
            name: Unique identifier for this block instance.
            initial_output: Output used at the first execution step. If
                provided, it also fixes the signal shape permanently.
            sample_time: Sampling period in seconds, or None to use the
                global simulation dt.
        """
        super().__init__(name, sample_time)

        self.inputs["in"] = None
        self.outputs["out"] = None

        self.state["u_prev"] = None
        self.next_state["u_prev"] = None

        # Frozen (rows, cols) signal shape; None until it can be resolved.
        self._resolved_shape: tuple[int, int] | None = None
        # True until the first output_update() call after initialize().
        self._first_output = True
        # Scalar zero used while no shape information is available.
        self._placeholder = np.zeros((1, 1), dtype=float)
        # Pristine copy of the user-supplied initial output, kept so that
        # initialize() can restore it on every (re-)initialization.
        self._initial_output_raw: np.ndarray | None = None

        if initial_output is not None:
            # NOTE(review): _to_2d_array is inherited from Block; presumably
            # it returns a 2D ndarray — confirm against the base class.
            y0 = self._to_2d_array("initial_output", initial_output).astype(float)
            self._initial_output_raw = y0.copy()
            self._resolved_shape = y0.shape
            self.outputs["out"] = y0.copy()
        else:
            self.outputs["out"] = self._placeholder.copy()

    # --------------------------------------------------------------------------
    # Public methods
    # --------------------------------------------------------------------------

    def initialize(self, t0: float) -> None:
        """Set the previous-input state and prepare the initial output.

        The previous-input state is seeded from the current input when one is
        available, and cleared otherwise. The output port is then restored to
        ``initial_output`` (or zeros when none was given) so that a block that
        is initialized again does not start from the last output of an
        earlier run.

        Args:
            t0: Initial simulation time in seconds.

        Raises:
            ValueError: If the current input is not 2D or conflicts with the
                frozen signal shape.
        """
        u = self.inputs["in"]
        if u is None:
            self.state["u_prev"] = None
            self.next_state["u_prev"] = None
        else:
            # Normalizing may freeze the signal shape as a side effect.
            u_arr = self._normalize_input(u)
            self.state["u_prev"] = u_arr.copy()
            self.next_state["u_prev"] = u_arr.copy()

        self._first_output = True
        # Done after normalization so a freshly frozen shape is honoured.
        self._reset_output()

    def output_update(self, t: float, dt: float) -> None:
        """Compute the finite-difference derivative and write it to the output port.

        At the first call the output is held at ``initial_output`` (or zero if
        none was provided). Afterwards:

            y = (u - u_prev) / dt

        Args:
            t: Current simulation time in seconds.
            dt: Current time step in seconds.

        Raises:
            ValueError: If the input is not 2D, its shape conflicts with the
                frozen shape, or the stored previous input has a different
                shape than the current input.
        """
        u_arr = self._normalize_input(self.inputs["in"])

        if self._first_output:
            # First step: keep the held output, only broadcasting a scalar
            # to the frozen shape when the two disagree.
            self._first_output = False
            if self._resolved_shape is not None and self.outputs["out"] is not None:
                y = np.asarray(self.outputs["out"], dtype=float)
                if y.shape == (1, 1) and self._resolved_shape != (1, 1):
                    self.outputs["out"] = np.full(
                        self._resolved_shape, float(y[0, 0]), dtype=float
                    )
            return

        u_prev = self.state["u_prev"]
        if u_prev is None:
            # No history yet: report a zero derivative.
            self.outputs["out"] = np.zeros_like(u_arr)
            return

        u_prev_arr = np.asarray(u_prev, dtype=float)
        if (
            self._resolved_shape is not None
            and u_prev_arr.shape == (1, 1)
            and self._resolved_shape != (1, 1)
        ):
            # A scalar stored before the shape was frozen is broadcast now.
            u_prev_arr = np.full(
                self._resolved_shape, float(u_prev_arr[0, 0]), dtype=float
            )

        if u_prev_arr.shape != u_arr.shape:
            raise ValueError(
                f"[{self.name}] Previous input shape mismatch: u_prev={u_prev_arr.shape}, u={u_arr.shape}."
            )

        self.outputs["out"] = (u_arr - u_prev_arr) / dt

    def state_update(self, t: float, dt: float) -> None:
        """Store the current input as the previous value for the next step.

        Args:
            t: Current simulation time in seconds.
            dt: Current time step in seconds.

        Raises:
            ValueError: If the input is not 2D or conflicts with the frozen
                signal shape.
        """
        u_arr = self._normalize_input(self.inputs["in"])
        self.next_state["u_prev"] = u_arr.copy()

    # --------------------------------------------------------------------------
    # Private methods
    # --------------------------------------------------------------------------

    def _reset_output(self) -> None:
        """Restore the output port to its initial value.

        Uses the stored ``initial_output`` when one was given, otherwise zeros
        of the frozen shape, otherwise the scalar zero placeholder.
        """
        if self._initial_output_raw is not None:
            self.outputs["out"] = self._initial_output_raw.copy()
        elif self._resolved_shape is not None:
            self.outputs["out"] = np.zeros(self._resolved_shape, dtype=float)
        else:
            self.outputs["out"] = self._placeholder.copy()

    def _maybe_freeze_shape_from(self, u: np.ndarray) -> None:
        """Freeze the signal shape from the first non-scalar input.

        Does nothing when the shape is already frozen or when ``u`` is a
        (1,1) scalar. On freezing, a scalar output is broadcast to the new
        shape and the previous-input state is reseeded with ``u``.

        Args:
            u: Candidate input array.

        Raises:
            ValueError: If ``u`` is not a 2D array.
        """
        if u.ndim != 2:
            raise ValueError(
                f"[{self.name}] Input 'in' must be a 2D array. Got ndim={u.ndim} with shape {u.shape}."
            )

        if self._resolved_shape is not None:
            return

        if u.shape != (1, 1):
            self._resolved_shape = u.shape

            y = np.asarray(self.outputs["out"], dtype=float)
            if y.shape == (1, 1):
                scalar = float(y[0, 0])
                self.outputs["out"] = np.full(self._resolved_shape, scalar, dtype=float)

            self.state["u_prev"] = u.copy()
            self.next_state["u_prev"] = u.copy()

    def _normalize_input(self, u: ArrayLike | None) -> np.ndarray:
        """Normalize input to 2D, applying shape freezing and scalar broadcasting.

        Args:
            u: Raw input value, or None when the port is not connected yet.

        Returns:
            A float array. For a None input this is zeros of the frozen shape
            (or the scalar placeholder when no shape is frozen). A (1,1)
            input is broadcast to the frozen shape when that shape is not
            (1,1). Otherwise the converted input is returned as is.

        Raises:
            ValueError: If the input is not 2D, or its shape differs from the
                frozen shape and it is not a (1,1) scalar.
        """
        if u is None:
            if self._resolved_shape is not None:
                return np.zeros(self._resolved_shape, dtype=float)
            return self._placeholder.copy()

        u_arr = np.asarray(u, dtype=float)
        if u_arr.ndim != 2:
            raise ValueError(
                f"[{self.name}] Input 'in' must be a 2D array. Got ndim={u_arr.ndim} with shape {u_arr.shape}."
            )

        self._maybe_freeze_shape_from(u_arr)

        if self._resolved_shape is not None:
            if u_arr.shape == (1, 1) and self._resolved_shape != (1, 1):
                return np.full(self._resolved_shape, float(u_arr[0, 0]), dtype=float)
            if u_arr.shape != self._resolved_shape:
                raise ValueError(
                    f"[{self.name}] Input 'in' shape changed: expected {self._resolved_shape}, got {u_arr.shape}."
                )

        return u_arr