Source code for hal.macros.apogee_dome_flat
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2021-12-16
# @Filename: apogee_dome_flat.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)
from __future__ import annotations
import asyncio
from clu.legacy.tron import TronKey
from hal import config
from hal.exceptions import MacroError
from hal.macros import Macro
__all__ = ["APOGEEDomeFlatMacro"]
[docs]
class APOGEEDomeFlatMacro(Macro):
"""Take an APOGEE dome flat after an exposure."""
name = "apogee_dome_flat"
__PRECONDITIONS__ = ["gang_at_cart"]
__STAGES__ = [("ffs", "open_shutter"), "expose"]
__CLEANUP__ = ["cleanup"]
__ffs_initial_state: str | None = None
[docs]
async def gang_at_cart(self):
"""Checks that the gang connected is at the cart."""
if not self.command.actor.helpers.apogee.gang_helper.at_cartridge():
raise MacroError("The APOGEE gang connector is not at the cart.")
return True
[docs]
async def ffs(self):
"""Check the FFS status and closes it."""
assert self.command.actor.helpers.ffs, "FFS helper not available."
if self.command.actor.helpers.ffs.all_closed():
self.command.debug("FFS already closed.")
self.__ffs_initial_state = "closed"
return
elif self.command.actor.helpers.ffs.all_open():
self.__ffs_initial_state = "open"
else:
raise MacroError("Cannot determine state of FFS.")
self.command.info("Closing FFS.")
await self.command.actor.helpers.ffs.close(self.command)
[docs]
async def open_shutter(self):
"""Opens the APOGEE cold shutter."""
apogee = self.command.actor.helpers.apogee
if not await apogee.shutter(self.command, True):
raise MacroError("Failed opening APOGEE shutter.")
[docs]
async def expose(self):
"""Takes the dome flat. Turns on the FFS after the fourth read."""
apogee = self.command.actor.helpers.apogee
self.command.actor.models["apogee"]["utrReadState"].register_callback(
self._flash_lamps
)
self.command.info("Taking APOGEE dome flat.")
await apogee.expose(self.command, 50, exp_type="DomeFlat")
return True
async def _flash_lamps(self, key: TronKey):
"""Flashes the FF lamps."""
if len(key.value) == 0:
return
state = key.value[1]
n_read = int(key.value[2])
if n_read == 3 and state == "Reading":
time_to_flash = 4.0
self.command.info(text="Calling ff_lamp.on")
asyncio.create_task(
self.send_command(
"mcp",
"ff.on",
time_limit=config["timeouts"]["lamps"],
)
)
await asyncio.sleep(time_to_flash)
lamp_off = await self.command.send_command("mcp", "ff.off")
if lamp_off.status.did_fail:
raise MacroError("Failed flashing lamps.")
return
[docs]
async def cleanup(self):
"""Closes the shutter and does cleanup."""
assert self.command.actor.helpers.ffs, "FFS helper not available."
apogee = self.command.actor.helpers.apogee
try:
self.command.actor.models["apogee"]["utrReadState"].remove_callback(
self._flash_lamps
)
except AssertionError:
pass
if not await apogee.shutter(self.command, False):
self.command.error("Failed closing APOGEE shutter.")
if self.__ffs_initial_state == "open":
self.command.info("Reopening FFS.")
await self.command.actor.helpers.ffs.open(self.command)
else:
self.command.info("Keeping FFS closed.")