Source code for hal.macros.goto_field

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2021-10-10
# @Filename: goto_field.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio
from time import time

import numpy

from hal import config
from hal.exceptions import HALError, MacroError
from hal.helpers.lamps import LampsHelperAPO, LampsHelperLCO
from hal.macros import Macro


__all__ = ["GotoFieldAPOMacro"]


class _GotoFieldBaseMacro(Macro):
    """Go to field macro."""

    name = "goto_field"

    __PRECONDITIONS__ = ["prepare"]
    __STAGES__ = [
        ("slew", "reconfigure"),
        "fvc",
        ("reslew", "lamps"),
        "boss_flat",
        "boss_hartmann",
        "boss_arcs",
        "acquire",
        "guide",
    ]
    __CLEANUP__ = ["cleanup"]

    _lamps_task: asyncio.Task | None = None

    async def prepare(self):
        """Check configuration and run pre-slew checks."""

        self._lamps_task = None

        stages = self.flat_stages

        if "reconfigure" in stages:
            configuration_loaded = self.actor.models["jaeger"]["configuration_loaded"]
            last_seen = configuration_loaded.last_seen

            if last_seen is None:
                self.command.warning("The age of the loaded configuration is unknown.")
            elif time() - last_seen > 3600:  # One hour
                raise MacroError("Configuration is too old. Load a new configuration.")

        do_fvc = "fvc" in stages
        do_flat = "boss_flat" in stages
        do_arcs = "boss_hartmann" in stages or "boss_arcs" in stages

        # Stop the guider.
        # TODO: this will probably be different at LCO.
        if do_fvc or do_flat or do_arcs:
            if self.observatory == "APO":
                assert self.helpers.tcc

                await self.helpers.cherno.stop_guiding(self.command)
                await self.helpers.tcc.axis_stop(self.command)

            else:
                if "slew" in stages:
                    await self.helpers.cherno.stop_guiding(self.command)

        # Ensure the APOGEE shutter is closed but don't wait for it.
        if "apogee" in self.actor.config["enabled_instruments"]:
            asyncio.create_task(
                self.helpers.apogee.shutter(
                    self.command,
                    open=False,
                    shutter="apogee",
                )
            )

        # Reset cherno offsets.
        self.command.debug("Resetting cherno offsets.")
        await self.helpers.cherno.reset_offsets(self.command)

        # Start closing the FFS if they are open but do not block. Only close the FFS
        # if we're going to do BOSS cals, otherwise it's about 20 seconds of lost time.
        if do_flat or do_arcs:
            await self._close_ffs(wait=False)

        # If lamps are needed, turn them on now but do not wait for them to warm up.
        # Do not turn lamps if we are going to take an FVC image. We add a delay
        # since the APOGEE shutter is closing and we don't want to start turning on
        # the lamps until it's fully closed.
        if self.observatory == "APO":
            if do_flat and not do_fvc:
                self._lamps_task = asyncio.create_task(
                    self.helpers.lamps.turn_lamp(
                        self.command,
                        ["ff"],
                        True,
                        turn_off_others=True,
                        delay=10,
                    )
                )
            elif not do_flat and do_arcs:
                if do_fvc:
                    lamps = ["HgCd"]
                else:
                    lamps = ["HgCd", "Ne"]

                self._lamps_task = asyncio.create_task(
                    self.helpers.lamps.turn_lamp(
                        self.command,
                        lamps,
                        True,
                        turn_off_others=True,
                        delay=10,
                    )
                )
            else:
                await self._all_lamps_off(wait=False)

        else:
            await self._all_lamps_off(wait=False)

    async def slew(self):
        """Slew to field but keep the rotator at a fixed position."""

        raise NotImplementedError()

    async def reconfigure(self):
        """Reconfigures the FPS."""

        self.command.info("Reconfiguring FPS array.")

        if not (await self.helpers.jaeger.is_folded(self.command)):
            try:
                await self.send_command("jaeger", "configuration reverse")
            except MacroError:
                self.command.warning("Reverse trajectory failed. Will try to unwind.")

                if not (await self.helpers.jaeger.unwind(self.command)):
                    raise MacroError("Failed unwindind the FPS array.")

        await self.send_command("jaeger", "configuration execute")

    async def fvc(self):
        """Run the FVC loop."""

        if "slew" in self.flat_stages and self.observatory == "APO":
            assert self.helpers.tcc

            self.command.info("Halting the rotator.")
            await self.helpers.tcc.axis_stop(self.command, axis="rot")

        self.command.info("Running FVC loop.")
        fvc_command = await self.send_command(
            "jaeger",
            "fvc loop",
            time_limit=config["timeouts"]["fvc"],
            raise_on_fail=False,
        )

        if fvc_command.status.did_fail:
            raise MacroError("FVC loop failed.")

    async def reslew(self):
        """Re-slew to field."""

        raise NotImplementedError()

    async def lamps(self):
        """Ensures the correct lamps for calibrations are on."""

        cal_stages = ["boss_flat", "boss_hartmann", "boss_arcs"]
        if all([stage not in self.flat_stages for stage in cal_stages]):
            return

        # If we are going to take BOSS cals, start warming up lamps now (some may
        # already be on).
        if "boss_flat" in self.flat_stages:
            mode = "flat"
        elif "boss_hartmann" in self.flat_stages:
            mode = "hartmann"
        else:
            mode = "arcs"

        await self._ensure_lamps(mode)

    async def boss_flat(self):
        """Takes the BOSS flat."""

        self.command.info("Taking BOSS flat.")

        await self._ensure_lamps(mode="flat")

        # Now take the flat. Do not read it yet.
        flat_time = self.config["flat_time"][self.observatory]

        self.command.debug("Starting BOSS flat exposure.")

        await self.helpers.boss.expose(
            self.command,
            flat_time,
            exp_type="flat",
            readout=True,
            read_async=True,
        )

        ff_lamp = "ff" if self.observatory == "APO" else "TCS_FF"
        asyncio.create_task(self.helpers.lamps.turn_lamp(self.command, ff_lamp, False))

    async def boss_hartmann(self):
        """Takes the hartmann sequence."""

        await self._ensure_lamps(mode="hartmann")

        if self.helpers.boss.readout_pending:  # Potential readout from the flat.
            self.command.info("Waiting for BOSS to read out.")
            await self.helpers.boss.readout(self.command)

        # Run hartmann and adjust the collimator but ignore residuals.
        self.command.info("Running hartmann collimate.")

        if self.observatory == "APO":
            command_string = "collimate ignoreResiduals"
        else:
            command_string = "collimate --keep-lamps"

        await self.send_command(
            "hartmann",
            command_string,
            time_limit=config["timeouts"]["hartmann"],
        )

        # Now check if there are residuals that require modifying the blue ring.
        residuals_kw = "sp1Residuals" if self.observatory == "APO" else "sp2Residuals"
        residuals = self.actor.models["hartmann"][residuals_kw][2]
        if residuals != "OK":
            raise MacroError(
                "Please adjust the blue ring and run goto-field again. "
                "The collimator has been adjusted."
            )

        if "boss_arcs" not in self.flat_stages:
            await self._all_lamps_off(wait=False)

    async def boss_arcs(self):
        """Takes BOSS arcs."""

        await self._ensure_lamps(mode="arcs")

        if self.helpers.boss.readout_pending:
            self.command.info("Waiting for BOSS to read out.")
            await self.helpers.boss.readout(self.command)

        self.command.info("Taking BOSS arc.")

        arc_time = self.config["arc_time"][self.observatory]

        await self.helpers.boss.expose(
            self.command,
            arc_time,
            exp_type="arc",
            readout=True,
            read_async=True,
        )

        await self._all_lamps_off(wait=False)

    async def acquire(self):
        """Acquires the field."""

        self._mark_design_as_goto_complete()

        if self.helpers.cherno.is_guiding():
            self.command.info("Already guiding.")
            return

        await self._guide_preconditions("acquire")

        acquisition_config = self.config["acquisition"][self.observatory]

        exposure_time = acquisition_config["exposure_time"]
        max_exposure_time = acquisition_config["max_exposure_time"]
        dynamic_exposure_time = acquisition_config["dynamic_exposure_time"]
        target_rms = acquisition_config["target_rms"]
        min_rms = acquisition_config["min_rms"]
        max_iterations = acquisition_config["max_iterations"]
        wait_time = acquisition_config["wait_time"]

        self.command.info("Acquiring field.")

        try:
            await self.helpers.cherno.acquire(
                self.command,
                exposure_time=exposure_time,
                max_exposure_time=max_exposure_time,
                dynamic_exposure_time=dynamic_exposure_time,
                target_rms=target_rms,
                max_iterations=max_iterations,
                wait_time=wait_time,
            )
        except HALError:
            # If we have exhausted the number of exposures and not reached
            # the target RMS, check if we reach the minimum RMS. If so, warn
            # but continue.
            if self.helpers.cherno.guiding_at_rms(min_rms, allow_not_guiding=True):
                self.command.warning(
                    f"Target RMS not reached but RMS < {min_rms} arcsec. Will continue."
                )
            else:
                raise

    async def guide(self):
        """Starts the guide loop."""

        self._mark_design_as_goto_complete()

        if self.helpers.cherno.is_guiding():
            self.command.info("Already guiding.")
            return

        await self._guide_preconditions("guide")

        guide_config = self.config["guide"][self.observatory]
        if self.config["guider_time"] is not None:
            exposure_time = self.config["guider_time"]
        else:
            exposure_time = guide_config["exposure_time"]

        self.command.info("Starting guide loop.")
        await self.helpers.cherno.guide(
            self.command,
            exposure_time=exposure_time,
            wait_time=guide_config["wait_time"],
            wait=False,
        )

    async def cleanup(self):
        """Turns off all lamps."""

        if self.observatory == "LCO":
            await asyncio.sleep(3)

        # If enough stages have run, mark this configuration as goto_complete.
        self._mark_design_as_goto_complete()

        if self._lamps_task is not None and not self._lamps_task.done():
            self._lamps_task.cancel()

        await self._all_lamps_off()

        # Read any pending BOSS exposure.
        if self.helpers.boss.readout_pending:
            await self.helpers.boss.readout(self.command)

    def _mark_design_as_goto_complete(self):
        """Marks the design as goto_complete."""

        if self.helpers.jaeger.configuration is not None:
            if self._is_goto_complete():
                self.helpers.jaeger.configuration.goto_complete = True
            else:
                self.helpers.jaeger.configuration.goto_complete = False

    def _get_pointing(self):
        """Returns the configuration pointing."""

        configuration_loaded = self.actor.models["jaeger"]["configuration_loaded"]
        ra, dec, pa = configuration_loaded[3:6]

        if any([ra is None, dec is None, pa is None]):
            raise MacroError("Unknown RA/Dec/PA coordinates for field.")

        ra_off = self.config[f"slew_offsets.{self.observatory}.ra"]
        dec_off = self.config[f"slew_offsets.{self.observatory}.dec"]
        rot_off = self.config[f"slew_offsets.{self.observatory}.rot"]

        if ra_off is not None:
            ra += ra_off / numpy.cos(numpy.radians(dec))

        dec += dec_off or 0.0
        pa += rot_off or 0.0

        return ra, dec, pa

    async def _set_guider_offset(self):
        """Sets the guider offset."""

        offset = self.config["guider_offset"]
        if offset is not None:
            offset = " ".join(map(str, offset))
            self.command.info(f"Setting guide offset to {offset}.")
            await self.send_command("cherno", f"offset {offset}")

    def _is_goto_complete(self):
        """Determines whether we can mark the configuration as ``goto_complete```.

        The stage at which we mark the configuration as ``goto_complete`` depends
        on what stages we are running, but it is always after the last stage
        before acquisition.

        """

        for stage in self.flat_stages:
            if stage in ["acquire", "guide", "cleanup"]:
                continue
            if not self.is_stage_done(stage):
                return False

        return True

    async def _close_ffs(self, wait: bool = True):
        """Closes the FFS."""

        pass

    async def _guide_preconditions(self, stage: str):
        """Tasks to be run before acquisition/guiding can start."""

        pass

    async def _all_lamps_off(self, wait: bool = True):
        """Turns all the lamps off after checking them."""

        raise NotImplementedError()

    async def _ensure_lamps(self, mode: str):
        """Ensures the lamps for flats/arcs/hartmann are on."""

        raise NotImplementedError()


[docs] class GotoFieldAPOMacro(_GotoFieldBaseMacro): """Goto field macro for APO.""" observatory = "APO"
[docs] async def slew(self): """Slew to field but keep the rotator at a fixed position.""" assert self.helpers.tcc # Wait five seconds to give time for the axis stop we issued in Prepare to # take effect. await asyncio.sleep(5) ra, dec, pa = self._get_pointing() result = self.helpers.tcc.axes_are_clear() if not result: raise HALError("Some axes are not clear. Cannot continue.") # The fixed position at which to slew for the FVC loop. alt = self.config["fvc"]["alt"] az = self.config["fvc"]["az"] rot = self.config["fvc"]["rot"] keep_offsets = self.config["keep_offsets"] if self.config["fixed_rot"] is False: self.command.info("Slewing to field RA/Dec/PA.") await self.helpers.tcc.goto_position( self.command, {"ra": ra, "dec": dec, "rot": pa}, rotwrap="middle", keep_offsets=keep_offsets, ) else: if self.config["fixed_altaz"]: self.command.info("Slewing to field with fixed rotator angle.") track_command = f"track {az}, {alt} mount /rota={rot} /rottype=mount" else: self.command.info("Slewing to field with fixed alt/az/rot position.") track_command = f"track {ra}, {dec} icrs /rota={rot} /rottype=mount" slew_result = await self.helpers.tcc.do_slew( self.command, track_command=track_command, keep_offsets=keep_offsets, ) if slew_result is False: raise HALError("Failed slewing to position.") self.command.info(text="Position reached.") self.command.info("Halting the rotator.") await self.helpers.tcc.axis_stop(self.command, axis="rot")
[docs] async def reslew(self): """Re-slew to field.""" assert self.helpers.tcc ra, dec, pa = self._get_pointing() self.command.info("Re-slewing to field.") # Start going to position asynchronously. await self.helpers.tcc.goto_position( self.command, {"ra": ra, "dec": dec, "rot": pa}, rotwrap="nearest", keep_offsets=self.config["keep_offsets"], )
async def _all_lamps_off(self, wait: bool = True): """Turns all the lamps off after checking them.""" # Check lamp status. command_off: bool = False lamp_status = self.helpers.lamps.list_status() for name, ls in lamp_status.items(): if ls[1] is not False: self.command.warning(f"Lamp {name} is on, will turn off.") command_off = True if command_off: task = asyncio.create_task(self.helpers.lamps.all_off(self.command)) if wait: await task async def _ensure_lamps(self, mode: str): """Ensures the lamps for flats/arcs/hartmann are on.""" assert isinstance(self.helpers.lamps, LampsHelperAPO) # Make sure FFS are closed. close_ffs = asyncio.create_task(self._close_ffs()) # Check lamps. Depending on the other stages HgCd may be on but Ne not. Loop # over each on of the lamps and if it's not on, turn it on. If any of them is # not on wait for 10 seconds, which is enough for the Hartmanns. lamp_status = self.helpers.lamps.list_status() if mode == "flat": lamp_status = self.helpers.lamps.list_status() if lamp_status["ff"][3] is False: if self._lamps_task and not self._lamps_task.done(): # Lamps have been commanded on but are not warmed up yet. await self._lamps_task else: self.command.warning("Turning FF lamp on.") await self.helpers.lamps.turn_lamp( self.command, ["ff"], True, turn_off_others=True, ) else: wait: float = 0 for lamp in ["HgCd", "Ne"]: if lamp_status[lamp][0] is False: self.command.warning(f"Turning {lamp} lamp on.") asyncio.create_task( self.helpers.lamps.turn_lamp( self.command, [lamp], True, turn_off_others=False, ) ) # For hartmann we don't need to wait until the lamps have fully # warmed up. For arcs we wait until they are. if mode == "hartmann": if lamp == "HgCd": wait = 10 if wait < 10 else wait else: wait = 5 if wait < 5 else wait else: if LampsHelperAPO.WARMUP[lamp] > wait: wait = LampsHelperAPO.WARMUP[lamp] elif mode == "arcs" and lamp_status[lamp][3] is False: elapsed = lamp_status[lamp][2] wait_lamp = LampsHelperAPO.WARMUP[lamp] - elapsed if wait < wait_lamp: wait = wait_lamp if wait > 0: # TODO: maybe here we should confirm that the lamps are turning on. # We don't want to await the tasks though, because in some cases we # are waiting less time than the full warm up time. self.command.info(f"Waiting {wait} seconds for the lamps to warm-up.") await asyncio.sleep(wait) # Ensure FFS have fully closed. await close_ffs async def _close_ffs(self, wait: bool = True): """Closes the FFS.""" assert self.helpers.ffs if not self.helpers.ffs.all_closed(): self.command.info("Closing FFS") task = self.helpers.ffs.close(self.command) if wait: await task else: asyncio.create_task(task) async def _guide_preconditions(self, stage: str): """Run guide preconditions.""" assert self.helpers.ffs and self.helpers.tcc pretasks = [] if (stage == "acquire" and "reslew" not in self.flat_stages) or ( stage == "guide" and "acquire" not in self.flat_stages ): self.command.info("Re-slewing to field.") pretasks.append(self.reslew()) if not self.helpers.ffs.all_open(): self.command.info("Opening FFS") pretasks.append(self.helpers.ffs.open(self.command)) # Open FFS and re-slew at the same time. await asyncio.gather(*pretasks) # A bit of delay to make sure the axis status keyword is updated. await asyncio.sleep(1) if not self.helpers.tcc.check_axes_status("Tracking"): raise MacroError("Axes must be tracking for acquisition.")
class GotoFieldLCOMacro(_GotoFieldBaseMacro): # pragma: no cover """Goto field macro for LCO.""" observatory = "LCO" def __init__(self): super().__init__() self.screen_on: bool = False def _reset_internal(self, **opts): self.screen_on = False return super()._reset_internal(**opts) async def slew(self): """Slews the telescope.""" do_flat = "boss_flat" in self.flat_stages do_arcs = "boss_hartmann" in self.flat_stages or "boss_arcs" in self.flat_stages do_screen = True if do_flat or do_arcs else False await self._slew_telescope(screen=do_screen) async def reslew(self): """Reslew to position after the FVC stage.""" # No need for this at LCO. return async def _all_lamps_off(self, wait: bool = True): """Turns all the lamps off.""" task = asyncio.create_task(self.helpers.lamps.all_off(self.command)) if wait: await task async def _ensure_lamps(self, mode: str): """Makes sure the lamps are configured properly.""" assert isinstance(self.helpers.lamps, LampsHelperLCO) if self._lamps_task and not self._lamps_task.done(): # Lamps have been commanded on but are not warmed up yet. await self._lamps_task if mode == "flat": await self.helpers.lamps.turn_lamp( self.command, ["TCS_FF"], True, turn_off_others=True, ) else: await self.helpers.lamps.turn_lamp( self.command, ["HeAr", "Ne"], True, turn_off_others=True, ) async def _slew_telescope(self, screen: bool = False): """Slews the du Pont telescope.""" ra, dec, pa = self._get_pointing() command_string = f"target {ra}, {dec} /posAngle={pa:.3f}" if screen: command_string += " /screen" self.command.info("Slewing to field RA/Dec/PA.") # The command times out when it gets there, so for now we ignore the failure. await self.send_command("lcotcc", command_string, raise_on_fail=False) self.screen_on = screen async def _remove_screen(self): """Ensures the screen is not in front of the telescope.""" if self.screen_on: await self._slew_telescope(False) async def _guide_preconditions(self, stage: str): """Ensure the system is ready to guide/acquire.""" await self._remove_screen() async def _close_ffs(self, wait: bool = True): return True