Source code for hal.macros.expose

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2022-01-13
# @Filename: expose.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)


from __future__ import annotations

import asyncio
from time import time

import numpy

from hal import config
from hal.exceptions import MacroError
from hal.macros import Macro


__all__ = ["ExposeMacro"]


[docs]class ExposeMacro(Macro): """Takes a science exposure with APOGEE and/or BOSS.""" name = "expose" __PRECONDITIONS__ = ["prepare"] __STAGES__ = [("expose_boss", "expose_apogee")] __CLEANUP__ = ["cleanup"] _state_apogee: dict = { "current": 0, "n": 0, "pairs": True, "dither": "A", "etr": 0.0, "total_time": 0.0, "timestamp": 0, } _state_boss: dict = { "current": 0, "n": 0, "etr": 0.0, "total_time": 0.0, "timestamp": 0, } def _reset_internal(self, **opts): """Reset the exposure status.""" self._state_apogee = ExposeMacro._state_apogee.copy() self._state_boss = ExposeMacro._state_boss.copy()
[docs] async def prepare(self): """Prepare for exposures and run checks.""" do_apogee = "expose_apogee" in self._flat_stages do_boss = "expose_boss" in self._flat_stages # First check if we are exposing and if we are fail before doing anything else. if do_apogee and self.helpers.apogee.is_exposing(): raise MacroError("APOGEE is already exposing.") if do_boss and self.helpers.boss.is_exposing(): raise MacroError("BOSS is already exposing.") if do_apogee: if not self.command.actor.helpers.apogee.gang_helper.at_cartridge(): raise MacroError("The APOGEE gang connector is not at the cart.") # Check lamps. They must be turned off manually (but maybe add a parameter?) lamp_status = [lamp[0] for lamp in self.helpers.lamps.list_status().values()] if any(lamp_status): raise MacroError("Some lamps are on.") # Concurrent tasks to run. tasks = [self.helpers.ffs.open(self.command)] if do_apogee: initial_dither = self.config["initial_apogee_dither"] if initial_dither: tasks.append( self.helpers.apogee.set_dither_position( self.command, initial_dither, ) ) tasks.append( self.helpers.apogee.shutter( self.command, open=True, shutter="apogee", ) ) tasks.append( self.helpers.apogee.shutter( self.command, open=self.config["with_fpi"], shutter="fpi", ) ) await asyncio.gather(*tasks) self._state_apogee["dither"] = self.helpers.apogee.get_dither_position() self.command.info(exposure_state_apogee=list(self._state_apogee.values())) self.command.info(exposure_state_boss=list(self._state_boss.values()))
[docs] async def expose_boss(self): """Exposes BOSS.""" if self.config["boss"] is False: return count: int = self.config["count_boss"] or self.config["count"] assert count, "Invalid number of exposures." self._state_boss["n"] = count exp_time: float = self.config["boss_exposure_time"] assert exp_time, "Invalid exposure time." # Just in case there's a readout pending from some previous error. Although # maybe it's better to have to manually force this. self.helpers.boss.clear_readout() etr_one = ( config["durations"]["boss_flushing"] + exp_time + config["durations"]["boss_readout"] ) self._state_boss["total_time"] = count * etr_one for n_exp in range(count): self._state_boss["current"] = n_exp + 1 self._state_boss["etr"] = (count - n_exp) * etr_one # Timestamp self._state_boss["timestamp"] = time() self.command.info(exposure_state_boss=list(self._state_boss.values())) await self.helpers.boss.expose(self.command, exp_time)
[docs] async def expose_apogee(self): """Exposes APOGEE.""" if self.config["apogee"] is False: return count: int = self.config["count_apogee"] or self.config["count"] assert count, "Invalid number of exposures." # For APOGEE we report the number of dithers. pairs = self.config["pairs"] if pairs: count *= 2 self._state_apogee["n"] = count self._state_apogee["pairs"] = pairs boss_exp_time: float = self.config["boss_exposure_time"] boss_flushing: float = config["durations"]["boss_flushing"] boss_readout: float = config["durations"]["boss_readout"] # If the exposure time for APOGEE is not set, use the one for BOSS # but divide by two if we are doing dither pairs. Otherwise just use # apogee_exposure_time is always per exposure (rewardless of whether we # are doing dither pairs or single exposures). apogee_exp_time: float | None = self.config["apogee_exposure_time"] if apogee_exp_time is None: readout_match = True apogee_exp_time = boss_exp_time + boss_flushing + boss_readout if pairs: apogee_exp_time /= 2.0 else: readout_match = False # Initially we assume all the exposures have the same exposure time. exposure_times = [apogee_exp_time] * count # We want the final exposure or dither pair to finish as the BOSS readout # begins. This allows us to do something during readout like slewing or # folding the FPS. We calculate an exposure time for the last exposure/pair # by removing the BOSS readout time. if "expose_boss" in self._flat_stages and readout_match is True: if pairs: last_exp_time = apogee_exp_time - boss_readout / 2.0 exposure_times[-2:] = [last_exp_time, last_exp_time] else: last_exp_time = apogee_exp_time - boss_readout exposure_times[-1] = last_exp_time exposure_times = numpy.ceil(exposure_times) self._state_apogee["total_time"] = sum(exposure_times) # Set the first dither and determine the dither sequence. if self.config["initial_apogee_dither"]: await self.helpers.apogee.set_dither_position( self.command, self.config["initial_apogee_dither"], ) current_dither_position = self.helpers.apogee.get_dither_position() if current_dither_position is None: raise MacroError("Invalid current dither position.") if self.config["disable_dithering"]: # If disable_dithering, just keep using the current dither position. dither_sequence = current_dither_position * count else: # If we are dithering, the sequence starts on the current position # and changes every two dithers to minimise how much we move the # mechanism, e.g., ABBAABBA. dither_sequence = current_dither_position for i in range(1, count): if i % 2 != 0: if dither_sequence[-1] == "A": dither_sequence += "B" else: dither_sequence += "A" else: dither_sequence += dither_sequence[-1] for n_exp in range(count): self._state_apogee["current"] = n_exp + 1 etr = sum(exposure_times[n_exp:]) self._state_apogee["etr"] = etr new_dither_position = dither_sequence[n_exp] self._state_apogee["dither"] = new_dither_position # Timestamp self._state_apogee["timestamp"] = time() self.command.info(exposure_state_apogee=list(self._state_apogee.values())) await self.helpers.apogee.expose( self.command, exposure_times[n_exp], exp_type="object", dither_position=new_dither_position, )
[docs] async def cleanup(self): """Cancel any running exposures.""" if not self.failed: return # Wait a bit to give APOGEE or BOSS time to update exposureStatus if the # macro failed very quickly. await asyncio.sleep(5) if self.helpers.apogee.is_exposing(): self.command.warning("APOGEE exposure is running. Not cancelling it.") if self.helpers.boss.is_exposing(): self.command.warning("BOSS exposure is running. Not cancelling it.") # Close the APOGEE cold shutter. if "expose_apogee" in self._flat_stages: await self.helpers.apogee.shutter( self.command, open=False, shutter="apogee", )