#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2021-12-16
# @Filename: apogee.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)
from __future__ import annotations
import asyncio
import enum
from typing import TYPE_CHECKING
from sdsstools.utils import cancel_task
from hal import config
from hal.exceptions import HALError
from . import SpectrographHelper
if TYPE_CHECKING:
from hal.actor import HALActor, HALCommandType
__all__ = ["APOGEEHelper"]
[docs]
class APOGEEHelper(SpectrographHelper):
"""APOGEE instrument helper."""
name = "apogee"
def __init__(self, actor: HALActor):
super().__init__(actor)
self.gang_helper = APOGEEGangHelper(actor)
[docs]
async def shutter(
self,
command: HALCommandType,
open: bool = True,
shutter: str = "apogee",
force: bool = False,
):
"""Opens/closes the shutter.
Parameters
----------
command
The command instance to use to command the shutter.
open
If `True`, opens the shutter, otherwise closes it.
shutter
The shutter to query. Can be ``apogee``, ``calbox``, or ``fpi``.
force
If `True`, sends the command to the shutter even if it reports to
already be in that position.
Returns
-------
shutter_command
The command sent to the shutter after been awaited for completion or
`None` if the shutter is already at that position.
"""
position = "open" if open is True else "close"
if force is False:
current_position = self.get_shutter_position(shutter)
if current_position is None and current_position == open:
return None
if shutter == "apogee":
if self.actor.observatory != "APO":
return True
shutter_command = await self._send_command(
command,
"apogee",
f"shutter {position}",
time_limit=config["timeouts"]["apogee_shutter"],
)
elif shutter == "fpi":
shutter_command = await self._send_command(
command,
"apogeefpi",
position,
time_limit=config["timeouts"]["apogee_shutter"],
)
elif shutter == "calbox":
shutter_command = await self._send_command(
command,
"apogeecal",
"shutterOpen" if open else "shutterClose",
time_limit=config["timeouts"]["apogee_shutter"],
)
else:
raise ValueError(f"Invalid shutter {shutter}.")
return shutter_command
[docs]
def get_shutter_position(self, shutter: str = "apogee") -> bool | None:
"""Returns the shutter status.
Parameters
----------
shutter
The shutter to query. Can be ``apogee``, ``calbox``, or ``fpi``.
Returns
-------
shutter_status
`True` if the shutter is open, `False` if closed, `None` if unknown.
"""
shutter = shutter.lower()
if shutter == "apogee":
limit_switch = self.actor.models["apogee"]["shutterLimitSwitch"]
if limit_switch is None or None in limit_switch.value:
return None
if limit_switch.value[0] is False and limit_switch.value[1] is True:
return False
elif limit_switch.value[1] is False and limit_switch.value[0] is True:
return True
else:
return None
elif shutter == "fpi":
shutter_position = self.actor.models["apogeefpi"]["shutter_position"]
if (
shutter_position is None
or shutter_position.value[0] is None
or shutter_position.value[0] == "?"
):
return None
elif shutter_position.value[0].lower() == "closed":
return False
elif shutter_position.value[0].lower() == "open":
return True
else:
return None
elif shutter == "calbox":
shutter_position = self.actor.models["apogeecal"]["calShutter"]
if (
shutter_position is None
or shutter_position.value[0] is None
or shutter_position.value[0] == "?"
):
return None
else:
return shutter_position.value[0]
else:
raise ValueError(f"Invalid shutter {shutter}.")
[docs]
def get_dither_position(self) -> str | None:
"""Returns the dither position or `None` if unknown."""
position = self.actor.models["apogee"]["ditherPosition"]
if position is None or None in position.value:
return None
return position.value[1]
[docs]
async def set_dither_position(
self,
command: HALCommandType,
position: str,
force: bool = False,
):
"""Sets the dither mechanism to the commanded position."""
position = position.upper()
if position not in ["A", "B"]:
raise HALError(f"Invalid dither position {position}.")
current_position = self.get_dither_position()
if current_position is None:
command.warning("Current dither position is unknown.")
if current_position == position and force is False:
return None
dither_command = await self._send_command(
command,
"apogee",
f"dither namedpos={position}",
time_limit=config["timeouts"]["apogee_dither"],
)
return dither_command
[docs]
def is_exposing(self):
"""Returns `True` if APOGEE is exposing or stopping."""
state = self.get_exposure_state()
if state in ["exposing", "stopping"]:
return True
else:
return False
def get_exposure_state(self) -> str | None:
exposure_state = self.actor.models["apogee"]["exposureState"]
if exposure_state.value is None or None in exposure_state.value:
raise ValueError("Unknown APOGEE exposure state.")
return exposure_state.value[0].lower()
[docs]
async def expose(
self,
command: HALCommandType,
exp_time: float,
exp_type: str = "object",
dither_position: str | None = None,
):
"""Exposes APOGEE.
Parameters
----------
command
The command used to interact with the APOGEE actor.
exp_time
The exposure time.
exp_type
The exposure type. Valid values are ``object``, ``dark``, ``flat``,
and ``DomeFlat``.
dither_position
The dither position. If `None`, uses the current position.
"""
if exp_type.lower() not in ["object", "dark", "flat", "domeflat"]:
raise HALError(f"Invalid exposure type {exp_type}.")
if dither_position:
await self.set_dither_position(command, dither_position)
expose_command = await self._send_command(
command,
"apogee",
f"expose time={exp_time:.1f} object={exp_type.lower()}",
time_limit=exp_time + config["timeouts"]["expose"],
)
return expose_command
[docs]
async def expose_dither_pair(
self,
command: HALCommandType,
exp_time: float,
dither_sequence: str | None = None,
exp_type: str = "object",
):
"""Takes an APOGEE dither set.
Parameters
----------
command
The command used to interact with the APOGEE actor.
exp_time
The exposure time for each exposure in the dither sequence.
exp_type
The exposure type. Valid values are ``object``, ``dark``, and ``flat``.
dither_sequence
The dither sequence. If `None` the first dither will be taken at the
current position and the mechanism will be switched after it.
Alternatively, a string ``"AB"``, ``"BA"``, etc.
"""
if dither_sequence is None:
current = self.get_dither_position()
if current is None:
raise HALError("Cannot determine current APOGEE dither position.")
dither_sequence = current.upper()
dither_sequence = "AB" if dither_sequence == "A" else "BA"
else:
dither_sequence = dither_sequence.upper()
if dither_sequence not in ["AB", "BA", "AA", "BB"]:
raise HALError(f"Invalid dither sequence {dither_sequence}.")
self._exposure_time_remaining = exp_time * len(dither_sequence)
self._exposure_time_remaining_timer = asyncio.create_task(self._timer())
for dither_position in dither_sequence:
await self.expose(
command,
exp_time,
exp_type=exp_type,
dither_position=dither_position,
)
await cancel_task(self._exposure_time_remaining_timer)
[docs]
async def abort(self, command: HALCommandType):
"""Aborts the ongoing exposure."""
if not self.is_exposing():
return True
await self._send_command(command, "apogee", "expose stop", time_limit=60)
await self.shutter(command, open=False, shutter="apogee")
return True
class APOGEEGangHelper:
"""Helper for the APOGEE gang connector."""
def __init__(self, actor: HALActor):
self.actor = actor
self.flag: APOGEEGang = APOGEEGang.UNKNWON
if self.actor.observatory == "APO":
actor.models["mcp"]["apogeeGang"].register_callback(self._update_flag)
async def _update_flag(self, value: list):
"""Callback to update the gang connector flag."""
value = value or [0]
self.flag = APOGEEGang(int(value[0]))
def get_position(self) -> APOGEEGang:
"""Return the position of the gang connector."""
return self.flag
def at_podium(self):
"""Return True if the gang connector is on the podium."""
ok = (self.get_position() & APOGEEGang.AT_PODIUM).value > 0
return ok
def at_cartridge(self):
"""Returns True if the gang connector is at the cartridge."""
pos = self.get_position()
ok = (pos == APOGEEGang.DISCONNECTED_FPI) or (pos == APOGEEGang.AT_FPS_FPI)
return ok
class APOGEEGang(enum.Flag):
"""Flags for the ``mcp.apogeeGang`` keyword."""
UNKNWON = 0
DISCONNECTED = 1
AT_CART = 2
AT_PODIUM = 4
PODIUM_DENSE = 12
DISCONNECTED_FPI = 17
AT_FPS_FPI = 18
AT_PODIUM_SPARSE = 20
AT_PODIUM_DENSE_FPI = 28
AT_PODIUM_ONEM = 36
AT_PODIUM_ONEM_FPI = 52