Source code for hal.helpers.lamps

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2021-12-20
# @Filename: lamps.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio
import time

from hal import HALCommandType, config
from hal.exceptions import HALError

from . import HALHelper


__all__ = ["LampsHelper"]


class LampsHelper(HALHelper):
    """Control for lamps.

    Lamp commands are sent to the ``mcp`` actor through ``_send_command``
    and the lamp state is read back from the ``mcp`` keyword model exposed
    by ``self.actor.models``.

    """

    # Names of the lamps handled by this helper. The names are used to build
    # both the mcp command (``<lamp>_on``/``<lamp>_off``, lower-cased) and the
    # mcp keywords (``<lamp>Lamp``, ``<lamp>LampCommandedOn``).
    LAMPS = ["ff", "HgCd", "Ne"]

    # Per-lamp warm-up time, compared against elapsed seconds. This is a copy
    # so that modifying it does not alter the loaded configuration.
    WARMUP = config["lamp_warmup"].copy()

    name = "lamps"

    def _command_one(self, command: HALCommandType, lamp: str, state: bool):
        """Commands one lamp.

        Parameters
        ----------
        command
            The command that issues the lamp switching.
        lamp
            Name of the lamp. It is lower-cased to build the mcp command.
        state
            `True` to send ``<lamp>_on``; any other value sends ``<lamp>_off``.

        Returns
        -------
        result
            Whatever ``_send_command`` returns (it is gathered/awaited by
            the callers in this class).

        """

        state_str = "on" if state is True else "off"

        return self._send_command(
            command,
            "mcp",
            f"{lamp.lower()}_{state_str}",
            time_limit=config["timeouts"]["lamps"],
        )

    @staticmethod
    def _is_settled(
        lamp_status: tuple[bool, bool, float, bool],
        state: bool,
    ) -> bool:
        """Returns whether a lamp already is in the requested ``state``.

        ``lamp_status`` is one of the tuples returned by `.list_status`. A
        lamp is considered settled *on* when it has been commanded on and has
        warmed up, and settled *off* when it has been commanded off and is
        actually off. The warmed-up flag cannot be used for the off case
        because `.list_status` always reports it as `False` for lamps that
        are not commanded on.

        """

        commanded_on, is_on, _, warmed = lamp_status

        if state is True:
            return bool(commanded_on) and warmed is True
        if state is False:
            return not commanded_on and is_on is False

        # Not a strict boolean: never consider the lamp settled.
        return False

    async def all_off(self, command: HALCommandType, force: bool = False):
        """Turn off all the lamps.

        Parameters
        ----------
        command
            The command that issues the lamp switching.
        force
            If `True`, send the off command to every lamp regardless of its
            status. Otherwise lamps that are already commanded off and
            actually off are skipped.

        """

        status = self.list_status()

        tasks = [
            self.turn_lamp(command, lamp, False, force=force)
            for lamp in self.LAMPS
            if force is not False or not self._is_settled(status[lamp], False)
        ]

        await asyncio.gather(*tasks)

    def list_status(self) -> dict[str, tuple[bool, bool, float, bool]]:
        """Returns a dictionary with the state of the lamps.

        For each lamp the first value in the returned tuple is whether the
        lamp has been commanded on. The second value is whether the lamps is
        actually on. The third value is the time, in seconds, since the
        lamp keyword was last seen (used as the time since the lamp changed
        states). The final value indicates whether the lamp has warmed up;
        it is always `False` for a lamp that is not commanded on.

        Raises
        ------
        HALError
            If a keyword has no value or the lamp is neither completely
            on nor completely off.

        """

        state = {}

        for lamp in self.LAMPS:
            commanded_key = f"{lamp}LampCommandedOn"
            commanded_on = self.actor.models["mcp"][commanded_key][0]
            if commanded_on is None:
                raise HALError(f"Failed getting {commanded_key}.")

            if lamp in ["wht", "UV"]:
                # For these lamps only the commanded state is used: they are
                # reported as immediately on/off and warmed up.
                is_on = bool(commanded_on)
                state[lamp] = (is_on, is_on, 0.0, is_on)
                continue

            lamp_key = f"{lamp}Lamp"
            lamp_keyword = self.actor.models["mcp"][lamp_key]
            if any(lv is None for lv in lamp_keyword):
                raise HALError(f"Failed getting {lamp_key}.")

            last_seen = lamp_keyword.last_seen

            # NOTE(review): presumably the keyword holds four 0/1 flags, so
            # 4 means fully on and 0 fully off -- confirm against the mcp
            # keyword definition. Anything in between is treated as an error.
            n_on = sum(lamp_keyword.value)
            if n_on == 4:
                lamp_on = True
            elif n_on == 0:
                lamp_on = False
            else:
                raise HALError(f"Failed determining state for lamp {lamp}.")

            elapsed = time.time() - last_seen
            warmed = (elapsed >= self.WARMUP[lamp]) if bool(commanded_on) else False

            state[lamp] = (bool(commanded_on), lamp_on, elapsed, warmed)

        return state

    async def turn_lamp(
        self,
        command: HALCommandType,
        lamps: str | list[str],
        state: bool,
        turn_off_others: bool = False,
        force: bool = False,
    ):
        """Turns a lamp on or off.

        This routine always blocks until the lamps are on and warmed up (or
        off, when turning them off). If you don't want to block, call it as
        a task. Note that there is no timeout on the wait itself: the status
        is polled once per second until the requested state is reached.

        Parameters
        ----------
        command
            The command that issues the lamp switching.
        lamps
            Name of the lamp(s) to turn on or off.
        state
            `True` to turn the lamp(s) on, `False` to turn them off.
        turn_off_others
            Turn off all other lamps.
        force
            If `True`, send the on/off command regardless of status.

        Raises
        ------
        HALError
            If any of the lamp names is not in ``LAMPS`` or the lamp status
            cannot be determined.

        """

        if isinstance(lamps, str):
            lamps = [lamps]

        for lamp in lamps:
            if lamp not in self.LAMPS:
                raise HALError(f"Invalid lamp {lamp}.")

        status = self.list_status()

        tasks = []
        turn_off_tasks = []
        for ll in self.LAMPS:
            if ll in lamps:
                # Skip lamps that already are where we want them, unless forced.
                if force is not False or not self._is_settled(status[ll], state):
                    tasks.append(self._command_one(command, ll, state))
            elif turn_off_others is True:
                if status[ll][0] is not False or force is True:
                    turn_off_tasks.append(self._command_one(command, ll, False))

        # Other lamps are turned off before the requested lamps are commanded.
        if len(turn_off_tasks) > 0:
            await asyncio.gather(*turn_off_tasks)

        await asyncio.gather(*tasks)

        done_lamps: list[bool] = [False] * len(lamps)
        warmed: list[bool] = [False] * len(lamps)

        n_iter = 0
        while True:
            if all(done_lamps):
                if state is False:
                    command.info(f"Lamp(s) {','.join(lamps)} are off.")
                    return
                elif all(warmed):
                    command.info(f"Lamp(s) {','.join(lamps)} are on and warmed up.")
                    return

            new_status = self.list_status()
            for i, lamp in enumerate(lamps):
                # Don't do anything if we're already at the state.
                if done_lamps[i] and (state is False or warmed[i]):
                    continue

                # Index 1 is if the lamp is actually on/off, not only commanded.
                if new_status[lamp][1] is state:
                    done_lamps[i] = True

                    if state is True:
                        # Index 2 is the elapsed time since it was completely on/off.
                        elapsed = new_status[lamp][2]
                        if elapsed >= self.WARMUP[lamp]:
                            command.debug(f"Lamp {lamp}: warm-up complete.")
                            warmed[i] = True
                        elif (n_iter % 5) == 0:
                            # Only report the remaining time every five polls.
                            remaining = int(self.WARMUP[lamp] - elapsed)
                            command.debug(
                                f"Warming up lamp {lamp}: "
                                f"{remaining} s remaining."
                            )

            await asyncio.sleep(1)
            n_iter += 1