#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2021-09-26
# @Filename: tcc.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)
from __future__ import annotations
from typing import TYPE_CHECKING
from hal import config
from hal.exceptions import HALError
from . import HALHelper
if TYPE_CHECKING:
from hal.actor import HALCommandType
__all__ = ["TCCHelper"]
[docs]
class TCCHelper(HALHelper):
"""Helper for the TCC."""
name = "tcc"
is_slewing: bool = False
[docs]
async def goto_position(
self,
command: HALCommandType,
where: str | dict[str, float],
**kwargs,
):
"""Executes the goto command.
Parameters
----------
command
The actor command.
where
The name of the goto entry in the configuration file, or a
dictionary with the keys ``(alt, az, rot)`` in degrees.
kwargs
Arguments to pass to `~.TCCHelper.do_slew`.
"""
config = command.actor.config
if isinstance(where, str):
if where not in config["goto"]:
raise HALError(f"Cannot find goto position '{where}'.")
alt = config["goto"][where]["alt"]
az = config["goto"][where]["az"]
rot = config["goto"][where]["rot"]
where = {"alt": alt, "az": az, "rot": rot}
if self.is_slewing:
raise HALError("TCC is already slewing.")
# await self.axis_init(command)
# Even if this is already checked in axis_init(), let's check again that the
# axes are ok, but if alt < limit, we only check az and alt because we won't
# move in altitude.
result = self.axes_are_clear()
if not result:
raise HALError("Some axes are not clear. Cannot continue.")
# Now do the actual slewing.
slew_result = await self.do_slew(command, where, **kwargs)
if slew_result is False:
raise HALError(f"Failed going to position {where}.")
return command.info(text=f"At position {where}.")
[docs]
async def axis_init(self, command: HALCommandType) -> bool:
"""Executes TCC axis init or fails."""
status = await self._send_command(
command,
"tcc",
"axis status",
time_limit=20.0,
raise_on_fail=False,
)
if status.status.did_fail:
raise HALError("'tcc status' failed. Is the TCC connected?")
if self.check_stop_in() is True:
raise HALError(
"Cannot tcc axis init because of bad axis status: "
"Check stop buttons on Interlocks panel."
)
sem = await self.mcp_semaphore_ok(command)
if sem is False:
raise HALError("Failed getting the semaphore information.")
if sem == "TCC:0:0" and self.axes_are_clear():
command.debug(
text="Axes clear and TCC has semaphore. "
"No axis init needed, so none sent."
)
return True
command.debug(text="Sending tcc axis init.")
axis_init_cmd_str = "axis init"
if self.below_alt_limit():
command.warning(
text="Altitude below interlock limit! Only initializing altitude "
"and rotator: cannot move in az."
)
axis_init_cmd_str += " rot,alt"
axis_init_cmd = await self._send_command(
command,
"tcc",
axis_init_cmd_str,
time_limit=20.0,
raise_on_fail=False,
)
if axis_init_cmd.status.did_fail:
command.error("Cannot slew telescope: failed tcc axis init.")
raise HALError("Cannot slew telescope: check and clear interlocks?")
return True
[docs]
async def axis_stop(self, command: HALCommandType, axis: str = "") -> bool:
"""Issues an axis stop to the TCC."""
axis_stop_cmd = await self._send_command(
command,
"tcc",
f"axis stop {axis}".strip(),
time_limit=30.0,
raise_on_fail=False,
)
if axis_stop_cmd.status.did_fail:
raise HALError("Error: failed to cleanly stop telescope via tcc axis stop.")
self.is_slewing = False
return True
[docs]
def get_bad_axis_bits(self, axes=("az", "alt", "rot"), mask=None):
"""Return the bad status bits for the requested axes."""
if mask is None:
mask = self.actor.models["tcc"]["axisBadStatusMask"][0]
if mask is None:
self.actor.write("e", text="Cannot retrieve TCC axisBadStatusMask.")
return [1, 1, 1]
return [(self.actor.models["tcc"][f"{axis}Stat"][3] & mask) for axis in axes]
[docs]
def check_stop_in(self, axes=("az", "alt", "rot")) -> bool:
"""Returns `True` if any stop bit is set in the ``<axis>Stat`` TCC keywords.
The [az,alt,rot]Stat[3] bits show the exact status:
http://www.apo.nmsu.edu/Telescopes/HardwareControllers/AxisControllers.html#25mStatusBits
"""
try:
# 0x2000 is "stop button in"
return any(self.get_bad_axis_bits(axes=axes, mask=0x2000))
except TypeError:
# Some axisStat is unknown (and thus None)
return False
[docs]
async def mcp_semaphore_ok(self, command: HALCommandType):
"""Returns the semaphore if the semaphore is owned by the TCC or nobody."""
mcp_model = self.actor.models["mcp"]
sem = mcp_model["semaphoreOwner"]
if sem is None:
sem_show_cmd = await self._send_command(
command,
"mcp",
"sem.show",
time_limit=20.0,
raise_on_fail=False,
)
if sem_show_cmd.status.did_fail:
raise HALError("Cannot get mcp semaphore. Is the MCP alive?")
sem = mcp_model["semaphoreOwner"]
if (
(sem[0] != "TCC:0:0")
and (sem[0] != "")
and (sem[0] != "None")
and (sem[0] is not None)
):
raise HALError(
f"Cannot axis init: Semaphore is owned by {sem[0]}. "
"If you are the owner (e.g., via MCP Menu), release it and try again. "
"If you are not the owner, confirm that you can steal "
"it from them, then issue: mcp sem.steal"
)
return sem
[docs]
def axes_are_clear(self, axes=("az", "alt", "rot")) -> bool:
"""Checks that no bits are set in any axis status field."""
if "tcc" in self.actor.helpers.bypasses:
self.actor.write(
"w",
text="Saying all axes are clear because TCC is bypassed.",
)
return True
axes = [ax for ax in axes if ax != "alt" or self.below_alt_limit() is False]
try:
tcc_model = self.actor.models["tcc"]
return all((tcc_model[f"{axis}Stat"][3] == 0) for axis in axes)
except TypeError:
# Some axisStat is unknown (and thus None)
return False
[docs]
def below_alt_limit(self) -> bool:
"""Check if we are below the alt=18 limit that prevents init/motion in az."""
limit = self.actor.config["goto"]["alt_limit"]
return self.actor.models["tcc"]["axePos"][1] < limit
[docs]
def check_axes_status(self, status: str) -> bool:
"""Returns `True` if all the axes are at ``status``."""
axes_status = self.actor.models["tcc"]["AxisCmdState"].value
return all([axis.lower() == status.lower() for axis in axes_status])
[docs]
async def do_slew(
self,
command,
coords: dict[str, float] | None = None,
track_command: str | None = None,
keep_offsets: bool = True,
offset: bool = False,
rotwrap: str | None = None,
) -> bool:
"""Correctly handle a slew command, given what parse_args had received.
Parameters
----------
command
The actor command used to send child commands to the TCC.
coords
The coordinates where to slew. It must be a dictionary with keys
``ra, dec, rot`` or ``alt, az, rot``.
track_command
A raw TCC ``track`` command (without the ``tcc`` target) to send.
In this case other arguments like ``keep_offsets`` or ``rotwrap``
are ignored.
keep_offsets
Whether to keep the existing offsets.
rotwrap
The type of ``/RotWrap`` to use.
offset
If defined, the coordinates will be treated as an offset and not
absolute positions.
"""
tcc_model = self.actor.models["tcc"]
# Check axes.
result = self.axes_are_clear()
if not result:
raise HALError("Some axes are not clear. Cannot continue.")
# NOTE: TBD: We should limit which offsets are kept.
keep_args = "/keep=(arc,gcorr,calib,bore)" if keep_offsets else ""
rotwrap = f"/rotwrap={rotwrap}" if rotwrap else ""
slew_cmd = None
if track_command:
slew_cmd = self._send_command(
command,
"tcc",
track_command,
time_limit=config["timeouts"]["slew"],
raise_on_fail=False,
)
else:
if coords is None:
raise HALError("coords needed if track_command is None.")
if not offset:
if "ra" in coords and "dec" in coords and "rot" in coords:
ra = coords["ra"]
dec = coords["dec"]
rot = coords["rot"]
command.info(
text="Slewing to (ra, dec, rot) == "
f"({ra:.4f}, {dec:.4f}, {rot:g})"
)
if keep_args:
command.warning(text="keeping all offsets")
slew_cmd = self._send_command(
command,
"tcc",
f"track {ra}, {dec} icrs /rottype=object/rotang={rot:g} "
f"{rotwrap} {keep_args}",
time_limit=config["timeouts"]["slew"],
raise_on_fail=False,
)
elif "az" in coords and "alt" in coords and "rot" in coords:
alt = coords["alt"]
az = coords["az"]
rot = coords["rot"]
command.info(
text="Slewing to (az, alt, rot) == "
f"({az:.4f}, {alt:.4f}, {rot:.4f})"
)
slew_cmd = self._send_command(
command,
"tcc",
f"track {az:f}, {alt:f} mount /rottype=mount "
f"/rotangle={rot:f} {rotwrap} {keep_args}",
time_limit=config["timeouts"]["slew"],
raise_on_fail=False,
)
else:
raise HALError("Not enough coordinates information provided.")
else:
if "alt" not in coords or "az" not in coords:
raise HALError("No alt/az offsets provided.")
# In arcsec
alt = coords["alt"] or 0.0
az = coords["az"] or 0.0
rot = coords["rot"] or 0.0
command.info(text=f"Offseting alt={alt:.3f}, az={az:.3f}")
slew_cmd = self._send_command(
command,
"tcc",
f"offset guide {az / 3600.0:g},{alt / 3600.0:g},{rot / 3600.0:g} /computed",
time_limit=config["timeouts"]["slew"],
raise_on_fail=False,
)
# "tcc track" in the new TCC is only Done successfully when all requested
# axes are in the "tracking" state. All other conditions mean the command
# failed, and the appropriate axisCmdState and axisErrCode will be set.
# However, if an axis becomes bad during the slew, the TCC will try to
# finish it anyway, so we need to explicitly check for bad bits.
try:
self.is_slewing = True
slew_cmd = await slew_cmd
finally:
self.is_slewing = False
if slew_cmd.status.did_fail:
str_axis_state = ",".join(tcc_model["axisCmdState"].value)
str_axis_code = ",".join(tcc_model["axisErrCode"].value)
command.error(
f"tcc track command failed with axis states: {str_axis_state} "
f"and error codes: {str_axis_code}"
)
raise HALError("Failed to complete slew: see TCC messages for details.")
if self.axes_are_clear() is False:
axis_bits = self.get_bad_axis_bits()
command.error(
"TCC slew command ended with some bad bits set: "
"0x{:x},0x{:x},0x{:x}".format(*axis_bits)
)
raise HALError("Failed to complete slew: see TCC messages for details.")
return True