Source code for hal.helpers.boss

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2021-12-20
# @Filename: boss.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio

from typing import TYPE_CHECKING

from hal import HALCommandType, config
from hal.exceptions import HALError

from . import HALHelper


if TYPE_CHECKING:
    from hal.actor import HALActor


__all__ = ["BOSSHelper"]


[docs]class BOSSHelper(HALHelper): """Control for BOSS spectrograph.""" __readout_pending: bool = False __readout_task: asyncio.Task | None = None name = "boss" def __init__(self, actor: HALActor): super().__init__(actor) @property def readout_pending(self): """True if an exposure readout is pending.""" return ( self.__readout_pending and self.__readout_task and not self.__readout_task.done() )
[docs] def clear_readout(self): """Clears any pending readouts.""" self.__readout_pending = False
[docs] def is_exposing(self): """Returns `True` if the BOSS spectrograph is currently exposing.""" exposure_state = self.actor.models["boss"]["exposureState"] if exposure_state is None or None in exposure_state.value: raise ValueError("Unknown BOSS exposure state.") state = exposure_state.value[0].lower() if state in ["idle", "aborted"]: return False else: return True
[docs] async def expose( self, command: HALCommandType, exp_time: float = 0.0, exp_type: str = "science", readout: bool = True, read_async: bool = False, ): """Exposes BOSS. If ``readout=False``, does not read the exposure.""" if self.readout_pending is not False: raise HALError( "Cannot expose. The camera is exposing or a readout is pending." ) timeout = ( exp_time + config["timeouts"]["expose"] + config["timeouts"]["boss_flushing"] ) command_parts = [f"exposure {exp_type}"] if exp_type != "bias": command_parts.append(f"itime={round(exp_time, 1)}") if readout is False or read_async is True: command_parts.append("noreadout") else: timeout += config["timeouts"]["boss_readout"] command_string = " ".join(command_parts) await self._send_command(command, "boss", command_string, time_limit=timeout) self.__readout_pending = True if readout is True and read_async is True: # We use a _send_command because readout cannot await on itself. self.__readout_task = asyncio.create_task( self._send_command( command, "boss", "exposure readout", time_limit=25.0 + config["timeouts"]["boss_readout"], ) ) return self.__readout_pending = not readout
[docs] async def readout(self, command: HALCommandType): """Reads a pending readout.""" if self.readout_pending is False: raise HALError("No pending readout.") command.debug("Reading pending BOSS exposure.") if self.readout_pending and self.__readout_task: await self.__readout_task else: await self._send_command( command, "boss", "exposure readout", time_limit=25.0 + config["timeouts"]["boss_readout"], ) self.clear_readout()