Source code for pySimBlocks.blocks.observers.luenberger

# ******************************************************************************
#                                  pySimBlocks
#                     Copyright (c) 2026 Université de Lille & INRIA
# ******************************************************************************
#  This program is free software: you can redistribute it and/or modify it
#  under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or (at your
#  option) any later version.
#
#  This program is distributed in the hope that it will be useful, but WITHOUT
#  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
#  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License
#  for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with this program.  If not, see <https://www.gnu.org/licenses/>.
# ******************************************************************************
#  Authors: see Authors.txt
# ******************************************************************************

from __future__ import annotations

import numpy as np
from numpy.typing import ArrayLike

from pySimBlocks.core.block import Block


[docs] class Luenberger(Block): """Discrete-time Luenberger state observer block. Estimates the state of a linear system using the correction law: y_hat[k] = C x_hat[k] x_hat[k+1] = A x_hat[k] + B u[k] + L (y[k] - y_hat[k]) The D matrix is intentionally not supported. Input column-vector shapes are frozen after the first call and must remain constant. Attributes: A: State transition matrix of shape (n, n). B: Input matrix of shape (n, m). C: Output matrix of shape (p, n). L: Observer gain matrix of shape (n, p). """ direct_feedthrough = False def __init__( self, name: str, A: ArrayLike, B: ArrayLike, C: ArrayLike, L: ArrayLike, x0: ArrayLike | None = None, sample_time: float | None = None, ): """Initialize a Luenberger observer block. Args: name: Unique identifier for this block instance. A: State transition matrix, array-like of shape (n, n). B: Input matrix, array-like of shape (n, m). C: Output matrix, array-like of shape (p, n). L: Observer gain matrix, array-like of shape (n, p). x0: Initial state estimate, array-like of shape (n, 1). Defaults to zeros. sample_time: Sampling period in seconds, or None to use the global simulation dt. Raises: ValueError: If any matrix is not 2D, if dimensions are inconsistent, or if x0 does not have shape (n, 1). """ super().__init__(name, sample_time) self.A = np.asarray(A, dtype=float) self.B = np.asarray(B, dtype=float) self.C = np.asarray(C, dtype=float) self.L = np.asarray(L, dtype=float) for M_name, M in (("A", self.A), ("B", self.B), ("C", self.C), ("L", self.L)): if M.ndim != 2: raise ValueError(f"[{self.name}] {M_name} must be a 2D array. Got shape {M.shape}.") n = self.A.shape[0] if self.A.shape[1] != n: raise ValueError(f"[{self.name}] A must be square (n,n). Got {self.A.shape}.") if self.B.shape[0] != n: raise ValueError(f"[{self.name}] B must have n rows to match A. Got B={self.B.shape}, A={self.A.shape}.") if self.C.shape[1] != n: raise ValueError(f"[{self.name}] C must have n columns to match A. Got C={self.C.shape}, A={self.A.shape}.") p = self.C.shape[0] m = self.B.shape[1] if self.L.shape != (n, p): raise ValueError(f"[{self.name}] L must have shape (n,p) = ({n},{p}). Got {self.L.shape}.") self._n = n self._m = m self._p = p if x0 is None: x0_arr = np.zeros((n, 1), dtype=float) else: x0_arr = np.asarray(x0, dtype=float) if x0_arr.ndim != 2 or x0_arr.shape != (n, 1): raise ValueError(f"[{self.name}] x0 must have shape ({n},1). Got {x0_arr.shape}.") self.state["x_hat"] = x0_arr.copy() self.next_state["x_hat"] = x0_arr.copy() self.inputs["u"] = None self.inputs["y"] = None self.outputs["y_hat"] = None self.outputs["x_hat"] = None self._input_shapes = {} # -------------------------------------------------------------------------- # Public methods # --------------------------------------------------------------------------
[docs] def initialize(self, t0: float) -> None: """Set initial outputs from the initial state estimate. Args: t0: Initial simulation time in seconds. """ x_hat = self.state["x_hat"] self.outputs["x_hat"] = x_hat.copy() self.outputs["y_hat"] = self.C @ x_hat self.next_state["x_hat"] = x_hat.copy()
[docs] def output_update(self, t: float, dt: float) -> None: """Compute x_hat and y_hat outputs from the committed state. Args: t: Current simulation time in seconds. dt: Current time step in seconds. """ x_hat = self.state["x_hat"] self.outputs["x_hat"] = x_hat.copy() self.outputs["y_hat"] = self.C @ x_hat
[docs] def state_update(self, t: float, dt: float) -> None: """Update the state estimate using the observer correction law. Args: t: Current simulation time in seconds. dt: Current time step in seconds. Raises: RuntimeError: If inputs ``u`` or ``y`` are not connected. ValueError: If input shapes are incompatible or have changed. """ u = self._require_col_vector("u", self._m) y = self._require_col_vector("y", self._p) x_hat = self.state["x_hat"] y_hat = self.C @ x_hat self.next_state["x_hat"] = self.A @ x_hat + self.B @ u + self.L @ (y - y_hat)
# -------------------------------------------------------------------------- # Private methods # -------------------------------------------------------------------------- def _require_col_vector(self, port: str, expected_rows: int) -> np.ndarray: """Validate and return an input port value as a column vector. Args: port: Name of the input port to read. expected_rows: Expected number of rows in the column vector. Returns: The input value as a 2D (n, 1) float array. Raises: RuntimeError: If the port value is None. ValueError: If the array is not a column vector, has the wrong number of rows, or its shape has changed since the first call. """ val = self.inputs[port] if val is None: raise RuntimeError(f"[{self.name}] Input '{port}' is not connected or not set.") arr = np.asarray(val, dtype=float) if arr.ndim != 2 or arr.shape[1] != 1: raise ValueError(f"[{self.name}] Input '{port}' must be a column vector (n,1). Got {arr.shape}.") if arr.shape[0] != expected_rows: raise ValueError( f"[{self.name}] Input '{port}' has wrong dimension: expected ({expected_rows},1), got {arr.shape}." ) if port not in self._input_shapes: self._input_shapes[port] = arr.shape elif arr.shape != self._input_shapes[port]: raise ValueError( f"[{self.name}] Input '{port}' shape changed: expected {self._input_shapes[port]}, got {arr.shape}." ) return arr