# ******************************************************************************
# pySimBlocks
# Copyright (c) 2026 Université de Lille & INRIA
# ******************************************************************************
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
# for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# ******************************************************************************
# Authors: see Authors.txt
# ******************************************************************************
from pathlib import Path
from typing import Any, Dict
import numpy as np
from pySimBlocks.core.block_source import BlockSource
[docs]
class FileSource(BlockSource):
"""Source block that plays samples loaded from a file.
Supported file formats: ``.npz``, ``.npy``, and ``.csv``. Each simulation
step emits one row of the loaded data as a column vector. When the end of
the data is reached, the block either restarts from the first sample
(``repeat=True``) or outputs zeros.
Expected data shapes:
- ``.npz`` / ``.npy``: 1D ``(N,)`` treated as ``(N, 1)``, or 2D ``(N, n)``
where N is the number of samples and n the signal dimension. Each step
outputs a ``(n, 1)`` column vector.
- ``.csv``: a single column is selected by ``key``, always producing shape
``(N, 1)``. Output per step is ``(1, 1)``.
Alternatively, when ``use_time=True``, the output is selected by
looking up the closest past timestamp in a time column bundled with
the file, rather than advancing by index.
Attributes:
file_path: Resolved path to the data file as a string.
file_type: Inferred file extension (``"npz"``, ``"npy"``, or ``"csv"``).
key: Array key (NPZ) or column name (CSV) to load. None for NPY files.
repeat: If True, restart from the first sample after the last one.
use_time: If True, select samples by time lookup instead of index.
"""
VALID_FILE_TYPES = {"npz", "npy", "csv"}
def __init__(
self,
name: str,
file_path: str,
key: str | None = None,
repeat: bool = False,
use_time: bool = False,
sample_time: float | None = None,
):
"""Initialize a FileSource block.
Args:
name: Unique identifier for this block instance.
file_path: Path to the data file. Relative paths are resolved
against the project file directory via ``adapt_params``.
key: Array key for NPZ files or column name for CSV files.
Not used for NPY files.
repeat: If True, loop back to the first sample after the last one.
use_time: If True, select samples by nearest past timestamp
instead of advancing by step index. Requires a ``"time"``
key or column in the file.
sample_time: Sampling period in seconds, or None to use the
global simulation dt.
Raises:
ValueError: If the file extension is unsupported, if ``use_time``
is combined with an NPY file or with ``repeat=True``, or if
the loaded data is invalid.
FileNotFoundError: If the file does not exist.
"""
super().__init__(name, sample_time)
self.file_path = str(file_path)
self.file_type = self._infer_file_type(self.file_path)
self.key = key
self.repeat = self._to_bool(repeat, "repeat")
self.use_time = self._to_bool(use_time, "use_time")
if self.use_time and self.file_type == "npy":
raise ValueError(
f"[{self.name}] use_time is supported only for NPZ and CSV inputs."
)
if self.use_time and self.repeat:
raise ValueError(
f"[{self.name}] repeat cannot be used when use_time=True."
)
self._time: np.ndarray | None = None
self._samples = self._load_samples()
self._index = 0
self._output_shape = (self._samples.shape[1], 1)
self.outputs["out"] = np.zeros(self._output_shape, dtype=float)
# --------------------------------------------------------------------------
# Class methods
# --------------------------------------------------------------------------
[docs]
@classmethod
def adapt_params(
cls,
params: Dict[str, Any],
params_dir: Path | None = None,
) -> Dict[str, Any]:
"""Resolve a relative ``file_path`` against the project directory.
Args:
params: Raw parameter dict loaded from the YAML project file.
params_dir: Directory of the project file, for resolving relative
paths. None if not applicable.
Returns:
Parameter dict with ``file_path`` resolved to an absolute path.
"""
adapted = dict(params)
file_path = adapted.get("file_path")
if file_path is None:
return adapted
path = Path(file_path).expanduser()
if not path.is_absolute() and params_dir is not None:
path = (params_dir / path).resolve()
adapted["file_path"] = str(path)
# Backward compatibility with older models that still contain file_type
adapted.pop("file_type", None)
return adapted
# --------------------------------------------------------------------------
# Public methods
# --------------------------------------------------------------------------
[docs]
def initialize(self, t0: float) -> None:
"""Set the output to the first sample (or time-matched sample) at t0.
Args:
t0: Initial simulation time in seconds.
"""
if self.use_time:
self.outputs["out"] = self._current_output_at_time(t0)
else:
self._index = 0
self.outputs["out"] = self._current_output()
[docs]
def output_update(self, t: float, dt: float) -> None:
"""Write the current sample to the output port and advance the index.
Args:
t: Current simulation time in seconds.
dt: Current time step in seconds.
"""
if self.use_time:
self.outputs["out"] = self._current_output_at_time(t)
else:
self.outputs["out"] = self._current_output()
self._index += 1
[docs]
def state_update(self, t: float, dt: float) -> None:
"""No-op: FileSource carries no internal state."""
# --------------------------------------------------------------------------
# Private methods
# --------------------------------------------------------------------------
def _load_samples(self) -> np.ndarray:
"""Load and validate the data array from the configured file."""
path = Path(self.file_path)
if not path.exists():
raise FileNotFoundError(f"[{self.name}] File not found: {path}")
if self.file_type == "npz":
arr, time = self._load_npz(path)
elif self.file_type == "npy":
arr, time = self._load_npy(path)
else:
arr, time = self._load_csv(path)
if arr.ndim == 1:
arr = arr.reshape(-1, 1)
elif arr.ndim != 2:
raise ValueError(
f"[{self.name}] Loaded data must be 1D or 2D. Got shape {arr.shape}."
)
if arr.shape[0] == 0:
raise ValueError(f"[{self.name}] Loaded file contains no samples.")
self._time = time
return arr.astype(float, copy=False)
def _load_npz(self, path: Path) -> tuple[np.ndarray, np.ndarray | None]:
"""Load an array and optional time vector from an NPZ archive."""
with np.load(path) as data:
keys = list(data.files)
if len(keys) == 0:
raise ValueError(f"[{self.name}] NPZ archive contains no arrays.")
selected_key = self.key
if not selected_key:
raise ValueError(
f"[{self.name}] key is mandatory for NPZ input."
)
if selected_key not in data:
raise KeyError(
f"[{self.name}] key '{selected_key}' not found in NPZ. "
f"Available keys: {keys}"
)
arr = np.asarray(data[selected_key], dtype=float)
time = None
if self.use_time:
if "time" not in data:
raise KeyError(
f"[{self.name}] use_time=True requires NPZ key 'time'."
)
time = np.asarray(data["time"], dtype=float).reshape(-1)
self._validate_time(time, arr.shape[0])
return arr, time
def _load_npy(self, path: Path) -> tuple[np.ndarray, np.ndarray | None]:
"""Load an array from a NPY file."""
if self.key not in (None, ""):
raise ValueError(
f"[{self.name}] key is not used for NPY input."
)
return np.asarray(np.load(path), dtype=float), None
def _load_csv(self, path: Path) -> tuple[np.ndarray, np.ndarray | None]:
"""Load a column array and optional time vector from a CSV file."""
if not self.key:
raise ValueError(
f"[{self.name}] key is mandatory for CSV input and must be a column name."
)
arr = np.genfromtxt(path, delimiter=",", names=True, dtype=float)
if arr.size == 0:
raise ValueError(f"[{self.name}] CSV file is empty.")
if arr.dtype.names is None:
raise ValueError(
f"[{self.name}] CSV must contain a header row with column names."
)
if self.key not in arr.dtype.names:
raise KeyError(
f"[{self.name}] column '{self.key}' not found in CSV. "
f"Available columns: {list(arr.dtype.names)}"
)
col = np.asarray(arr[self.key], dtype=float).reshape(-1, 1)
if np.isnan(col).any():
raise ValueError(
f"[{self.name}] CSV column '{self.key}' contains non-numeric or missing values."
)
time = None
if self.use_time:
if "time" not in arr.dtype.names:
raise KeyError(
f"[{self.name}] use_time=True requires CSV column 'time'."
)
time = np.asarray(arr["time"], dtype=float).reshape(-1)
self._validate_time(time, col.shape[0])
return col, time
def _to_bool(self, value: bool | str, name: str) -> bool:
"""Parse a bool or bool-like string into a Python bool."""
if isinstance(value, bool):
return value
if isinstance(value, str):
lowered = value.strip().lower()
if lowered in {"true", "1", "yes"}:
return True
if lowered in {"false", "0", "no"}:
return False
raise ValueError(f"[{self.name}] '{name}' must be a bool.")
def _infer_file_type(self, file_path: str) -> str:
"""Infer and validate the file type from the file extension."""
ext = Path(file_path).suffix.lower().lstrip(".")
if ext not in self.VALID_FILE_TYPES:
raise ValueError(
f"[{self.name}] Unsupported file extension '.{ext}'. "
f"Supported extensions: {sorted(self.VALID_FILE_TYPES)}"
)
return ext
def _current_output(self) -> np.ndarray:
"""Return the sample at the current index, handling repeat and end-of-data."""
n = self._samples.shape[0]
if self._index < n:
idx = self._index
elif self.repeat:
idx = self._index % n
else:
return np.zeros(self._output_shape, dtype=float)
row = self._samples[idx]
return np.asarray(row, dtype=float).reshape(-1, 1)
def _current_output_at_time(self, t: float) -> np.ndarray:
"""Return the sample corresponding to the nearest past timestamp."""
if self._time is None:
raise RuntimeError(
f"[{self.name}] Internal error: use_time=True but time data is missing."
)
idx = int(np.searchsorted(self._time, t, side="right") - 1)
if idx < 0:
idx = 0
row = self._samples[idx]
return np.asarray(row, dtype=float).reshape(-1, 1)
def _validate_time(self, time: np.ndarray, n_samples: int) -> None:
"""Validate that a time vector is 1D, strictly increasing, and matches n_samples."""
if time.ndim != 1:
raise ValueError(f"[{self.name}] time must be a 1D array.")
if time.shape[0] != n_samples:
raise ValueError(
f"[{self.name}] time length ({time.shape[0]}) must match number of samples ({n_samples})."
)
if np.isnan(time).any():
raise ValueError(f"[{self.name}] time contains NaN values.")
if not np.all(np.diff(time) > 0.0):
raise ValueError(
f"[{self.name}] time must be strictly increasing."
)