#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2022-01-13
# @Filename: expose.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)
from __future__ import annotations
import asyncio
import os
from contextlib import suppress
from dataclasses import dataclass
from time import time
from typing import TYPE_CHECKING
import numpy
from hal import config
from hal.exceptions import MacroError
from hal.macros import Macro
if TYPE_CHECKING:
from hal.helpers.jaeger import Configuration
__all__ = ["ExposeMacro"]
@dataclass
class ExposeParameters:
"""Expose macro parameters."""
boss_exptime: float | None = config["macros"]["expose"]["fallback"]["exptime"]["default"] # fmt: skip # noqa
apogee_exptime: float | None = None
count_apogee: int | None = 1
count_boss: int | None = 1
pairs: bool = True
dither: bool = True
initial_apogee_dither: str = "A"
readout_matching: bool = True
@dataclass
class BossExposure:
"""Parameters for a BOSS exposure."""
n: int
exptime: float
actual_exptime: float
read_sync: bool = True
done: bool = False
@dataclass
class ApogeeExposure:
"""Parameters for an APOGEE exposure."""
n: int
exptime: float
dither_position: str
done: bool = False
class ExposeHelper:
"""Track exposure status, add/remove exposures, etc."""
def __init__(self, macro: ExposeMacro, **opts):
self.macro = macro
self.observatory = os.environ.get("OBSERVATORY", "APO").upper()
# Currently running exposure.
self.n_apogee = 0
self.n_boss = 0
# Information about each exposure to take.
self.apogee_exps: list[ApogeeExposure] = []
self.boss_exps: list[BossExposure] = []
self._apogee_exp_start_time: float = 0
self._boss_exp_start_time: float = 0
self.etr: float = 0.0
self.interval: float = 10
self._monitor_task: asyncio.Task | None = None
self.params: ExposeParameters = ExposeParameters()
self.update_params(**opts)
self.refresh()
def update_params(self, **opts):
"""Update parameters from the macro config."""
# Exclude options that are not in the dataclass.
valid_opts = {
opt: opts[opt]
for opt in opts
if opt in ExposeParameters.__dataclass_fields__
}
if self.running:
valid_opts["initial_apogee_dither"] = self.params.initial_apogee_dither
self.params.__dict__.update(**valid_opts)
if "expose_boss" not in self.macro.flat_stages:
self.params.count_boss = None
self.params.readout_matching = False
elif "expose_apogee" not in self.macro.flat_stages:
self.params.count_apogee = None
return self.params
async def start(self):
"""Indicate that exposures are starting. Output states on a timer."""
self._monitor_task = asyncio.create_task(self._monitor())
async def stop(self):
"""Stops the monitoring task."""
if self._monitor_task:
self._monitor_task.cancel()
with suppress(asyncio.CancelledError):
await self._monitor_task
self._monitor_task = None
async def _monitor(self):
"""Outputs states at intervals."""
while True:
self.update_status()
await asyncio.sleep(self.interval)
@property
def running(self):
"""Returns whether the exposures are running."""
n_apogee = len(self.apogee_exps)
n_boss = len(self.boss_exps)
apogee_running = self.n_apogee > 0 and self.n_apogee < n_apogee
boss_running = self.n_boss > 0 and self.n_boss < n_boss
return apogee_running or boss_running
def refresh(self):
"""Refreshes the list of exposures."""
self._refresh_boss()
self._refresh_apogee()
self.update_status()
def _refresh_boss(self):
"""Refreshes the list of BOSS exposures."""
count = self.params.count_boss
exptime = self.params.boss_exptime
if exptime is None or count is None or count == 0:
return
# We don't allow to reduce the number of exposures
# below the one already being taken.
if count < self.n_boss:
count = self.params.count_boss = self.n_boss
flushing = config["durations"]["boss"][self.observatory]["flushing"]
readout = config["durations"]["boss"][self.observatory]["readout"]
if count < len(self.boss_exps):
# Pop any exposures beyond the count number. Count cannot be
# smaller than the current exposure.
while len(self.boss_exps) > count and len(self.boss_exps) >= self.n_boss:
self.boss_exps.pop()
# Replace/append exposures, but only for those that have not been executed
# or are not being executed.
# Minimum exposure index that we can modify.
min_exp_idx = 0 if self.n_boss == 0 else self.n_boss
for ii in range(min_exp_idx, count):
# Set actual_exptime to zero for now. We'll set it next.
new = BossExposure(n=ii + 1, exptime=exptime, actual_exptime=0.0)
# Replace existing exposures, or append if extending.
if len(self.boss_exps) >= ii + 1:
self.boss_exps[ii] = new
else:
self.boss_exps.append(new)
# Reset the actual time an exposure will take. This is fine to do
# at any point (I think). For the last exposure, we do not read the
# exposure synchronously, and we assume the actual exposure time does
# not include readout.
for nexp, exp in enumerate(self.boss_exps):
exp.actual_exptime = exp.exptime + flushing + readout
exp.read_sync = True
if nexp == len(self.boss_exps) - 1:
exp.actual_exptime -= readout
exp.read_sync = False
def _refresh_apogee(self):
"""Refreshes the list of APOGEE exposures."""
initial_dither = self.params.initial_apogee_dither
pairs = self.params.pairs
n_exp = self.params.count_apogee
if n_exp is None or n_exp == 0:
return
# We consider n_exp the number of actual exposures, not dither pairs.
if pairs:
n_exp *= 2
# We don't allow to reduce the number of exposures below the one
# already being taken. If doing dither pairs, we require completing them.
if n_exp < self.n_apogee:
if pairs:
n_exp = self.n_apogee + self.n_apogee % 2
else:
n_exp = self.n_apogee
exposure_times: list[float] = []
if self.params.readout_matching:
# We are matching exposure times. We use the BOSS exposure time as
# reference to determine the full APOGEE exposure time.
boss_exptime = self.params.boss_exptime
flushing: float = config["durations"]["boss"][self.observatory]["flushing"]
readout: float = config["durations"]["boss"][self.observatory]["readout"]
if boss_exptime is None or boss_exptime == 0:
return
apogee_exptime = boss_exptime + flushing + readout
# If we are doing pairs, the full exposure time corresponds to a pair.
if pairs:
apogee_exptime /= 2.0
exposure_times = [apogee_exptime] * n_exp
# The last exposure (or two exposures if doing pairs) must be one BOSS
# readout shorter. If pairs, we distribute that between the two dither
# positions.
if pairs:
last_exptime = apogee_exptime - readout / 2.0
exposure_times[-2:] = [last_exptime, last_exptime]
else:
exposure_times[-1] = apogee_exptime - readout
else:
# We are not matching readout times. Just exposure as many APOGEE
# exposures/pairs as needed.
apogee_exptime = self.params.apogee_exptime
if apogee_exptime is None or apogee_exptime == 0:
return
exposure_times = [apogee_exptime] * n_exp
# Round up exposure times.
exposure_times = list(numpy.ceil(exposure_times))
dither_sequence = ""
if self.params.dither is False:
# If disable_dithering, just keep using the current dither position.
dither_sequence = initial_dither * n_exp
else:
# If we are dithering, the sequence starts on the current position
# and changes every two dithers to minimise how much we move the
# mechanism, e.g., ABBAABBA.
current_dither_position = initial_dither
dither_sequence = current_dither_position
for i in range(1, n_exp):
if i % 2 != 0:
if dither_sequence[-1] == "A":
dither_sequence += "B"
else:
dither_sequence += "A"
else:
dither_sequence += dither_sequence[-1]
if n_exp < len(self.apogee_exps):
n_apogee = self.n_apogee
# Pop any exposures beyond n_exp. n_exp cannot be
# smaller than the current exposure.
while len(self.apogee_exps) > n_exp and len(self.apogee_exps) >= n_apogee:
self.apogee_exps.pop()
# Replace/append exposures, but only for those that have not been executed
# or are not being executed.
# Minimum exposure index that we can modify. Preserve full dither sets.
if pairs is False:
min_exp_idx = 0 if self.n_apogee == 0 else self.n_apogee
else:
min_exp_idx = 0 if self.n_apogee == 0 else self.n_apogee + self.n_apogee % 2
for ii in range(min_exp_idx, n_exp):
new = ApogeeExposure(
n=ii + 1,
exptime=exposure_times[ii],
dither_position=dither_sequence[ii],
)
# Replace existing exposures, or append if extending.
if len(self.apogee_exps) >= ii + 1:
self.apogee_exps[ii] = new
else:
self.apogee_exps.append(new)
def yield_apogee(self):
"""Returns an iterator of APOGEE exposures."""
while self.n_apogee < len(self.apogee_exps):
self.n_apogee += 1
self._apogee_exp_start_time = time()
self.update_status("apogee")
yield self.apogee_exps[self.n_apogee - 1]
self.update_status("apogee")
yield None
def yield_boss(self):
"""Returns an iterator of BOSS exposures."""
while self.n_boss < len(self.boss_exps):
self.n_boss += 1
self._boss_exp_start_time = time()
self.update_status("boss")
yield self.boss_exps[self.n_boss - 1]
self.update_status("boss")
yield None
def update_status(self, instrument: str | None = None):
"""Emits the exposure status keywords."""
# Incomplete state. Just filling out the easy bits.
state_apogee: dict = {
"current": self.n_apogee,
"n": len(self.apogee_exps),
"pairs": self.params.pairs,
"dither": "A",
"etr": 0.0,
"total_time": 0.0,
"timestamp": 0,
}
state_boss: dict = {
"current": self.n_boss,
"n": len(self.boss_exps),
"etr": 0.0,
"total_time": 0.0,
"timestamp": 0,
}
if not hasattr(self.macro, "command"):
return
if (instrument is None or instrument == "apogee") and len(self.apogee_exps) > 0:
exps = self.apogee_exps
this_exp = exps[self.n_apogee - 1] if self.n_apogee > 0 else None
if self.n_apogee == 0:
state_apogee["dither"] = exps[0].dither_position
else:
state_apogee["dither"] = exps[self.n_apogee - 1].dither_position
state_apogee["total_time"] = int(round(sum([exp.exptime for exp in exps])))
state_apogee["timestamp"] = round(time(), 1)
n_completed = 0 if self.n_apogee == 0 else self.n_apogee - 1
if this_exp and this_exp.done:
n_completed += 1
etr = state_apogee["total_time"]
etr -= sum([exps[ii].exptime for ii in range(0, n_completed)])
if self._apogee_exp_start_time > 0 and this_exp and not this_exp.done:
exp_elapsed = time() - self._apogee_exp_start_time
etr -= exp_elapsed
if etr < 0:
etr = 0
state_apogee["etr"] = int(round(etr))
self.macro.command.debug(exposure_state_apogee=list(state_apogee.values()))
if (instrument is None or instrument == "boss") and len(self.boss_exps) > 0:
exps = self.boss_exps
this_exp = exps[self.n_boss - 1] if self.n_boss > 0 else None
total_time = sum([exp.actual_exptime for exp in exps])
state_boss["total_time"] = int(round(total_time))
state_boss["timestamp"] = round(time(), 1)
n_completed = 0 if self.n_boss == 0 else self.n_boss - 1
if this_exp and this_exp.done:
n_completed += 1
etr = state_boss["total_time"]
etr -= sum([exps[ii].actual_exptime for ii in range(0, n_completed)])
if self._boss_exp_start_time > 0 and this_exp and not this_exp.done:
exp_elapsed = time() - self._boss_exp_start_time
etr -= exp_elapsed
if etr < 0:
etr = 0
state_boss["etr"] = int(round(etr))
self.macro.command.debug(exposure_state_boss=list(state_boss.values()))
self.etr = max(state_apogee["etr"], state_boss["etr"])
[docs]
class ExposeMacro(Macro):
"""Takes a science exposure with APOGEE and/or BOSS."""
name = "expose"
__PRECONDITIONS__ = ["prepare"]
__STAGES__ = [("expose_boss", "expose_apogee")]
__CLEANUP__ = ["cleanup"]
expose_helper: ExposeHelper
_pause_event = asyncio.Event()
def __init__(self):
super().__init__()
self.configuration: Configuration | None = None
def _reset_internal(self, **opts):
"""Reset the exposure status."""
self.expose_helper = ExposeHelper(self, **opts)
# An event blocker for when we wait to pause the execution of the macro.
if not self._pause_event.is_set():
self._pause_event.set()
[docs]
async def prepare(self):
"""Prepare for exposures and run checks."""
self.command.debug(expose_is_paused=False)
do_apogee = "expose_apogee" in self.flat_stages
do_boss = "expose_boss" in self.flat_stages
# First check if we are exposing and if we are fail before doing anything else.
if do_apogee and self.helpers.apogee.is_exposing():
raise MacroError("APOGEE is already exposing.")
if do_boss and self.helpers.boss.is_exposing():
raise MacroError("BOSS is already exposing.")
if do_apogee and self.command.actor.observatory != "LCO":
if not self.command.actor.helpers.apogee.gang_helper.at_cartridge():
raise MacroError("The APOGEE gang connector is not at the cart.")
# Check that IEB FBI are off.
try:
cmd = await self.send_command("jaeger", "ieb status")
fbi_led1, fbi_led2, *_ = cmd.replies.get("fbi_led")
except (MacroError, KeyError):
fbi_led1 = fbi_led2 = 0.0
self.command.warning("Failed getting FBI levels but moving on.")
if float(fbi_led1) > 0.1 or float(fbi_led2) > 0.1:
raise MacroError("FBI LEDs are not off.")
# Check lamps. They must be turned off manually (but maybe add a parameter?)
if self.command.actor.observatory == "APO":
lamp_st = [lamp[0] for lamp in self.helpers.lamps.list_status().values()]
if any(lamp_st):
raise MacroError("Some lamps are on.")
else:
self.command.warning("Skipping lamps check for now.")
# Concurrent tasks to run.
tasks = []
if self.command.actor.observatory == "APO":
assert self.helpers.ffs
tasks.append(self.helpers.ffs.open(self.command))
if do_apogee:
initial_dither = self.config["initial_apogee_dither"]
if initial_dither:
tasks.append(
self.helpers.apogee.set_dither_position(
self.command,
initial_dither,
)
)
tasks.append(
self.helpers.apogee.shutter(
self.command,
open=True,
shutter="apogee",
)
)
if self.config["with_fpi"]:
tasks.append(
self.helpers.apogee.shutter(
self.command,
open=True,
shutter="fpi",
)
)
await asyncio.gather(*tasks)
# If we are about to expose that means the goto-field is done, so let's mark it.
# This is useful in cases where the goto failed and it was manually completed.
if self.helpers.jaeger.configuration:
self.helpers.jaeger.configuration.goto_complete = True
# Tell the helper that we're about to start exposing.
await self.expose_helper.start()
[docs]
async def expose_boss(self):
"""Exposes BOSS."""
# Just in case there's a readout pending from some previous error. Although
# maybe it's better to have to manually force this. Note that this does not
# abort or clear an exposure at the ICC level, just the internal tracking
# in HAL.
self.helpers.boss.clear_readout()
await self._pause_event.wait()
for exposure_info in self.expose_helper.yield_boss():
if exposure_info is None:
break
await self.helpers.boss.expose(
self.command,
exposure_info.exptime,
read_async=True,
)
# Except for the final exposure, we wait until the readout is complete.
if exposure_info.read_sync:
await self.helpers.boss.readout(self.command)
exposure_info.done = True
# We wait at the end of the loop so that if a new exposure is
# added while we are paused, the next iteration of the loop
# will yield it.
await self._pause_event.wait()
# If we have added exposures while paused and the expose, then
# this exposure readout will have become sync so we give it another
# try.
if exposure_info.read_sync and self.helpers.boss.readout_pending:
await self.helpers.boss.readout(self.command)
[docs]
async def expose_apogee(self):
"""Exposes APOGEE."""
await self._pause_event.wait()
for exposure_info in self.expose_helper.yield_apogee():
if exposure_info is None:
break
await self.helpers.apogee.expose(
self.command,
exposure_info.exptime,
exp_type="object",
dither_position=exposure_info.dither_position,
)
exposure_info.done = True
await self._pause_event.wait()
[docs]
async def cleanup(self):
"""Cancel any running exposures."""
await self.expose_helper.stop()
# Close the APOGEE cold shutter.
if "expose_apogee" in self.flat_stages:
self.command.info("Closing APOGEE shutter.")
await self.helpers.apogee.shutter(
self.command,
open=False,
shutter="apogee",
)
if self.helpers.apogee.is_exposing():
self.command.warning("APOGEE exposure is running. Not cancelling it.")
if "expose_boss" in self.flat_stages and self.helpers.boss.is_exposing():
if self.helpers.boss.is_reading():
self.command.info("BOSS is reading.")
else:
self.command.warning("BOSS exposure is running. Not cancelling it.")
if not self.failed and not self.cancelled and self.configuration:
self.configuration.observed = True
async def _pause(self):
"""Pauses the execution of the macro."""
if self._pause_event.is_set():
self._pause_event.clear()
self.command.warning("Pausing execution of the expose macro.")
self.command.debug(expose_is_paused=True)
else:
self.command.warning("Macro is already paused.")
self.command.debug(expose_is_paused=True)
async def _resume(self):
"""Resumes the execution of the macro."""
if not self._pause_event.is_set():
self._pause_event.set()
self.command.warning("Resuming execution of the expose macro.")
self.command.debug(expose_is_paused=False)
else:
self.command.warning("Macro is already running.")
self.command.debug(expose_is_paused=False)