Source code for hal.helpers.scripts

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2021-09-27
# @Filename: scripts.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio
import pathlib
from dataclasses import dataclass

from typing import TYPE_CHECKING

import hal


if TYPE_CHECKING:
    from clu import Command

    from hal.actor import HALActor, HALCommandType


__all__ = ["Scripts"]


[docs] @dataclass class Scripts: actor: HALActor path: pathlib.Path def __post_init__(self): self.path = pathlib.Path(self.path) if not self.path.is_absolute(): self.path = pathlib.Path(hal.__file__).parent / self.path self.running: dict[str, asyncio.Task] = {}
[docs] def list_scripts(self): """Returns a list of script names.""" return [ff.name.replace(ff.suffix, "") for ff in self.path.glob("*.inp")]
[docs] def get_steps(self, name: str) -> list[tuple[str, str, float | None]]: """Returns the list of steps of the script.""" if name not in self.list_scripts(): raise ValueError(f"Cannot find script {name}.") lines = (self.path / (name + ".inp")).read_text().splitlines() steps = [] for line in lines: if line.strip().startswith("#"): continue parts = line.strip().split() try: timeout = float(parts[0]) actor = parts[1] command_string = " ".join(parts[2:]) except ValueError: actor = parts[0] command_string = " ".join(parts[1:]) timeout = None steps.append((actor, command_string, timeout)) return steps
[docs] async def run(self, name: str, command: Command[HALActor] | None = None) -> bool: """Runs a script. This coroutine creates a task with all the steps to execute and adds it to the ``running`` dictionary. The execution can be cancelled by calling `.stop`, at which point the task will be cancelled and `.run` will handle the cancellation. """ async def run_steps(steps): for n, step in enumerate(steps): actor, command_string, timeout = step if command: step_message = f"{actor} {command_string}" if timeout: step_message = f"{timeout} {step_message}" command.warning(text=f"Script {name}: {step_message}") runner = self.actor if command is None else command if command: command.debug( script_step=[ name, f"{actor} {command_string}", n + 1, len(steps), ] ) if actor == "sleep": await asyncio.sleep(float(command_string)) else: await asyncio.wait_for( runner.send_command(actor, command_string), timeout, ) if name in self.running: raise RuntimeError(f"Script {name} is already running.") steps = self.get_steps(name) task = asyncio.create_task(run_steps(steps)) self.running[name] = task self._emit_running() # Now actually await the task, but be sure to handle cancellation. # If there is another kind of error (e.g. a timeout), raise it. try: await task except asyncio.CancelledError: return False except asyncio.TimeoutError: if command: command.error(error=f"Script {name}: one of the steps timedout out.") return False except Exception: raise finally: self.running.pop(name) self._emit_running() return True
[docs] async def cancel(self, name: str): """Cancels a running script.""" if name not in self.running: raise RuntimeError(f"Script {name} is not running.") task = self.running[name] task.cancel()
def _emit_running(self, command: HALCommandType | None = None): """Emits the ``running_scripts`` keyword.""" running_scripts = list(self.running.keys()) if command: command.info(running_scripts=running_scripts) else: self.actor.write("i", running_scripts=running_scripts)