Source code for pySimBlocks.core.task

# ******************************************************************************
#                                  pySimBlocks
#                     Copyright (c) 2026 Université de Lille & INRIA
# ******************************************************************************
#  This program is free software: you can redistribute it and/or modify it
#  under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or (at your
#  option) any later version.
#
#  This program is distributed in the hope that it will be useful, but WITHOUT
#  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
#  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License
#  for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with this program.  If not, see <https://www.gnu.org/licenses/>.
# ******************************************************************************
#  Authors: see Authors.txt
# ******************************************************************************

from typing import List

from pySimBlocks.core.block import Block


class Task:
    """A group of blocks sharing the same sample time.

    Manages the scheduling bookkeeping for a set of blocks that run at the
    same rate: which blocks produce outputs (and in which order), which of
    them carry state, and when the group is next due to run.

    Scheduling is tick-based: the task keeps an integer countdown that is
    reloaded with ``period_ticks - 1`` after each activation. This avoids
    floating-point time comparisons.

    Attributes:
        sample_time: Sampling period of this task in seconds.
        period_ticks: Number of base ticks between two activations (>= 1).
        ticks_until_next: Countdown to the next activation; the task is due
            when it is zero.
        accumulated_dt: Time summed by :meth:`accumulate` and cleared by
            :meth:`reset_accumulated_dt`.
        output_blocks: Blocks of this task, in the order in which they
            appear in the global output order.
        state_blocks: Subset of ``output_blocks`` whose ``has_state`` is
            truthy. Empty until :meth:`update_state_blocks` is called.
    """

    def __init__(
        self,
        sample_time: float,
        period_ticks: int,
        blocks: List[Block],
        global_output_order: List[Block],
    ):
        """Initialize a task.

        Args:
            sample_time: Sampling period in seconds.
            period_ticks: Number of base ticks between two activations.
                Must be at least 1.
            blocks: Blocks belonging to this task (any container that
                supports membership tests).
            global_output_order: Global ordering of all blocks, used to
                filter and preserve execution order within the task.

        Raises:
            ValueError: If ``period_ticks`` is smaller than 1.
        """
        # A non-positive period would make advance() reload the countdown
        # with a negative value that never comes back to zero, so the task
        # would silently stop running after its first activation.
        if period_ticks < 1:
            raise ValueError(
                f"period_ticks must be >= 1, got {period_ticks!r}"
            )

        self.sample_time = sample_time
        self.period_ticks = period_ticks
        # Zero means "due now": a new task runs on the very first tick.
        self.ticks_until_next = 0
        self.accumulated_dt: float = 0.0

        # Iterate over the global order (not over ``blocks``) so that the
        # task-local list inherits the global execution order.
        self.output_blocks: List[Block] = [
            b for b in global_output_order if b in blocks
        ]
        # Filled on demand by update_state_blocks().
        self.state_blocks: List[Block] = []

    # --------------------------------------------------------------------------
    # Public methods
    # --------------------------------------------------------------------------

    def update_state_blocks(self) -> None:
        """Refresh the list of stateful blocks from ``output_blocks``."""
        self.state_blocks = [b for b in self.output_blocks if b.has_state]

    def should_run(self) -> bool:
        """Return True if the task is due to run on the current tick.

        Returns:
            True if ``ticks_until_next`` is zero, False otherwise.
        """
        return self.ticks_until_next == 0

    def advance(self) -> None:
        """Advance the countdown by one tick.

        If the task was due on this tick, the countdown is reloaded with
        ``period_ticks - 1``; otherwise it is decremented by one.
        """
        if self.ticks_until_next == 0:
            self.ticks_until_next = self.period_ticks - 1
        else:
            self.ticks_until_next -= 1

    def accumulate(self, dt: float) -> None:
        """Add a time step to the accumulated time.

        Args:
            dt: Time step in seconds to accumulate.
        """
        self.accumulated_dt += dt

    def reset_accumulated_dt(self) -> None:
        """Reset the accumulated time to zero."""
        self.accumulated_dt = 0.0