Source code for ska_tango_base.csp_subelement_state_machine

"""
This module contains specifications of the CSP SubElement Observing state machine.
"""
from transitions import State
from transitions.extensions import LockedMachine as Machine

__all__ = ["CspSubElementObsDeviceStateMachine"]


[docs]class CspSubElementObsDeviceStateMachine(Machine): """ The observation state machine used by a generic CSP Sub-element ObsDevice (derived from SKAObsDevice). """ def __init__(self, callback=None, **extra_kwargs): """ Initialises the model. :param callback: A callback to be called when the state changes :type callback: callable :param extra_kwargs: Additional keywords arguments to pass to super class initialiser (useful for graphing) """ self._callback = callback states = [ "IDLE", "CONFIGURING", "READY", "SCANNING", "ABORTING", "ABORTED", "FAULT", ] transitions = [ { "source": "*", "trigger": "fatal_error", "dest": "FAULT", }, { "source": ["IDLE", "READY"], "trigger": "configure_started", "dest": "CONFIGURING", }, { "source": "CONFIGURING", "trigger": "configure_succeeded", "dest": "READY", }, { "source": "CONFIGURING", "trigger": "configure_failed", "dest": "FAULT", }, { "source": "READY", "trigger": "end_succeeded", "dest": "IDLE", }, { "source": "IDLE", "trigger": "end_succeeded", "dest": "IDLE", }, { "source": "READY", "trigger": "end_failed", "dest": "FAULT", }, { "source": "READY", "trigger": "scan_started", "dest": "SCANNING", }, { "source": "SCANNING", "trigger": "scan_succeeded", "dest": "READY", }, { "source": "SCANNING", "trigger": "scan_failed", "dest": "FAULT", }, { "source": "SCANNING", "trigger": "end_scan_succeeded", "dest": "READY", }, { "source": "SCANNING", "trigger": "end_scan_failed", "dest": "FAULT", }, { "source": [ "CONFIGURING", "READY", "SCANNING", "IDLE", ], "trigger": "abort_started", "dest": "ABORTING", }, { "source": "ABORTING", "trigger": "abort_succeeded", "dest": "ABORTED", }, { "source": "ABORTING", "trigger": "abort_failed", "dest": "FAULT", }, { "source": ["ABORTED", "FAULT"], "trigger": "reset_succeeded", "dest": "IDLE", }, { "source": ["ABORTED", "FAULT"], "trigger": "reset_failed", "dest": "FAULT", }, ] super().__init__( states=states, initial="IDLE", transitions=transitions, after_state_change=self._state_changed, **extra_kwargs, ) self._state_changed() def _state_changed(self): """ State machine callback that is called every time the obs_state changes. Responsible for ensuring that callbacks are called. """ if self._callback is not None: self._callback(self.state)