Source code for hal.helpers.lamps

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2021-12-20
# @Filename: lamps.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio
import time

from typing import TYPE_CHECKING

from hal import config
from hal.exceptions import HALError

from . import HALHelper


if TYPE_CHECKING:
    from hal.actor import HALCommandType


__all__ = ["LampsHelperAPO", "LampsHelperLCO"]


[docs] class LampsHelperAPO(HALHelper): """Control for lamps.""" LAMPS = ["ff", "HgCd", "Ne"] WARMUP = config["lamp_warmup"].copy() name = "lamps" def _command_one(self, command: HALCommandType, lamp: str, state: bool): """Commands one lamp.""" state_str = "on" if state is True else "off" return self._send_command( command, "mcp", f"{lamp.lower()}_{state_str}", time_limit=config["timeouts"]["lamps"], )
[docs] async def all_off(self, command: HALCommandType, force: bool = False): """Turn off all the lamps.""" status = self.list_status() tasks = [] for lamp in self.LAMPS: if force is False and status[lamp][0] is False and status[lamp][-1] is True: continue tasks.append(self.turn_lamp(command, lamp, False, force=force)) await asyncio.gather(*tasks)
[docs] def list_status(self) -> dict[str, tuple[bool, bool, float, bool]]: """Returns a dictionary with the state of the lamps. For each lamp the first value in the returned tuple is whether the lamp has been commanded on. The second value is whether the lamps is actually on. The third value is how long since it changed states. The final value indicates whether the lamp has warmed up. """ state = {} for lamp in self.LAMPS: commanded_key = f"{lamp}LampCommandedOn" commanded_on = self.actor.models["mcp"][commanded_key][0] if commanded_on is None: raise HALError(f"Failed getting {commanded_key}.") if lamp in ["wht", "UV"]: is_on = bool(commanded_on) state[lamp] = (is_on, is_on, 0.0, is_on) else: lamp_key = f"{lamp}Lamp" lamp_state = self.actor.models["mcp"][lamp_key] if any([lv is None for lv in lamp_state]): raise HALError(f"Failed getting {lamp_key}.") last_seen = lamp_state.last_seen if sum(lamp_state.value) == 4: lamp_state = True else: # Sometimes when a lamp is turning on we'll have, e.g., 1,0,0,1. lamp_state = False elapsed = time.time() - last_seen warmed = (elapsed >= self.WARMUP[lamp]) if bool(commanded_on) else False state[lamp] = (bool(commanded_on), lamp_state, elapsed, warmed) return state
[docs] async def turn_lamp( self, command: HALCommandType, lamps: str | list[str], state: bool, turn_off_others: bool = False, delay: float = 0.0, force: bool = False, ): """Turns a lamp on or off. This routine always blocks until the lamps are on and warmed up. If you don't want to block, call it as a task. Parameters ---------- command The command that issues the lamp switching. lamps Name of the lamp(s) to turn on or off. state Whether to turn lamp on or off. turn_off_others Turn off all other lamps. delay Wait this amount of seconds before actually commanding the lamps. force If `True`, send the on/off command regardless of status. """ if isinstance(lamps, str): lamps = [lamps] for lamp in lamps: if lamp not in self.LAMPS: raise HALError(f"Invalid lamp {lamp}.") await asyncio.sleep(delay) status = self.list_status() tasks = [] turn_off_tasks = [] for ll in self.LAMPS: if ll in lamps: if force is False and status[ll][0] is state and status[ll][-1] is True: pass else: tasks.append(self._command_one(command, ll, state)) else: if turn_off_others is True: if status[ll][0] is not False or force is True: turn_off_tasks.append(self._command_one(command, ll, False)) if len(turn_off_tasks) > 0: await asyncio.gather(*turn_off_tasks) await asyncio.gather(*tasks) done_lamps: list[bool] = [False] * len(lamps) warmed: list[bool] = [False] * len(lamps) n_iter = 0 while True: if all(done_lamps): if state is False: if len(lamps) > 0: command.info(f"Lamps {','.join(lamps)} are off.") else: command.info(f"Lamp {','.join(lamps)} is off.") return elif all(warmed): if len(lamps) > 0: command.info(f"Lamps {','.join(lamps)} are on and warmed up.") else: command.info(f"Lamp {','.join(lamps)} is on and warmed up.") return new_status = self.list_status() for i, lamp in enumerate(lamps): # Don't don't anything if we're already at the state. if done_lamps[i] and (state is False or warmed[i]): continue # Index 1 is if the lamp is actually on/off, not only commanded. if new_status[lamp][1] is state: done_lamps[i] = True if state is True: # Index 2 is the elapsed time since it was completely on/off. elapsed = new_status[lamp][2] if elapsed >= self.WARMUP[lamp]: command.debug(f"Lamp {lamp}: warm-up complete.") warmed[i] = True elif (n_iter % 5) == 0: remaining = int(self.WARMUP[lamp] - elapsed) command.debug( f"Warming up lamp {lamp}: {remaining} s remaining." ) await asyncio.sleep(1) n_iter += 1
[docs] class LampsHelperLCO(HALHelper): """Control for lamps at LCO.""" LAMPS = ["TCS_FF", "HeAr", "Ne"] name = "lamps" async def _command_one(self, command: HALCommandType, lamp_name: str, state: bool): """Commands one lamp.""" state_str = "on" if state is True else "off" # Blocks until warmed up. await self._send_command(command, "lcolamps", f"{state_str} {lamp_name}")
[docs] async def all_off(self, command: HALCommandType): """Turn off all the lamps.""" await self._send_command(command, "lcolamps", "off")
[docs] def list_status(self) -> dict[str, tuple[bool, bool]]: """Returns a dictionary with the state of the lamps. For each lamp the first value in the returned tuple is whether the lamp has been commanded on with the second being whether the lamps has finished warming up. """ state = {} for lamp in self.LAMPS: current_state = self.actor.models["lcolamps"][lamp][0] if current_state == "ON": state[lamp] = (True, True) elif current_state == "WARMING": state[lamp] = (True, False) elif current_state == "OFF": state[lamp] = (False, False) else: raise HALError(f"Failed getting status for lamp {lamp}.") return state
[docs] async def turn_lamp( self, command: HALCommandType, lamps: str | list[str], state: bool, turn_off_others: bool = False, delay: float = 0.0, is_retry: bool = False, ): """Turns a lamp on or off. This routine always blocks until the lamps are on and warmed up. If you don't want to block, call it as a task. Parameters ---------- command The command that issues the lamp switching. lamps Name of the lamp(s) to turn on or off. state Whether to turn lamp on or off. turn_off_others Turn off all other lamps. delay Wait this amount of seconds before actually commanding the lamps. is_retry Flag to track whether the method is being called as a retry. """ if isinstance(lamps, str): lamps = [lamps] for lamp in lamps: if lamp not in self.LAMPS: raise HALError(f"Invalid lamp {lamp}.") await asyncio.sleep(delay) status = self.list_status() tasks = [] turn_off_tasks = [] for ll in self.LAMPS: if ll in lamps: if status[ll][0] is state and status[ll][1] is state: # All done with this lamp. pass else: # Turn on/off this lamp. tasks.append(self._command_one(command, ll, state)) else: if turn_off_others is True: if status[ll][0] is not False: turn_off_tasks.append(self._command_one(command, ll, False)) try: if len(turn_off_tasks) > 0: await asyncio.gather(*turn_off_tasks) await asyncio.gather(*tasks) except Exception as err: if is_retry: raise else: command.warning(f"Failed commanding lamps: {err}") command.warning("Retrying lamps.") return await self.turn_lamp( command, lamps, state, turn_off_others=turn_off_others, delay=delay, is_retry=True, )