diff --git a/tango/pyaml/attribute.py b/tango/pyaml/attribute.py index 9ddd267..88aedbc 100644 --- a/tango/pyaml/attribute.py +++ b/tango/pyaml/attribute.py @@ -1,3 +1,4 @@ +import copy import logging from typing import Optional, Tuple @@ -27,11 +28,16 @@ class ConfigModel(BaseModel): The unit of the attribute. range : tuple(min, max), optional Range of valid values. Use null for -∞ or +∞. + index : int, optional + Zero-based index into a SPECTRUM attribute. When set, the instance + behaves as a read-only scalar view of one vector element; writes are + always rejected and a SPECTRUM data_format is enforced on init. """ attribute: str unit: str = "" range: Optional[Tuple[Optional[float], Optional[float]]] = None + index: Optional[int] = None class Attribute(DeviceAccess, InitializableElement): @@ -52,7 +58,9 @@ class Attribute(DeviceAccess, InitializableElement): def __init__(self, cfg: ConfigModel, writable=True): super().__init__() self._cfg = cfg - self._writable = writable + self._index = cfg.index + # Indexed access never writes individual array elements. + self._writable = writable and self._index is None self._attribute_dev: tango.DeviceProxy = None self._attr_config: tango.AttributeConfig = None self._attribute_dev_name: str = None @@ -72,6 +80,13 @@ def initialize(self): self._attribute_dev.get_attribute_config(self._attr_name, wait=True) ) + if self._index is not None: + if self._attr_config.data_format != tango.AttrDataFormat.SPECTRUM: + raise pyaml.PyAMLException( + f"Tango attribute '{self._cfg.attribute}' is not a SPECTRUM; " + "indexed access requires a vector attribute." + ) + if self._writable: if self._attr_config.writable not in [ tango.AttrWriteType.READ_WRITE, @@ -97,8 +112,13 @@ def set(self, value: float): Raises ------ pyaml.PyAMLException - If the Tango write fails. + If the Tango write fails or this is an indexed attribute. """ + if self._index is not None: + raise pyaml.PyAMLException( + f"Indexed attribute '{self._cfg.attribute}[{self._index}]' " + "does not support individual element writes." + ) self._ensure_initialized() logger.log( logging.DEBUG, f"Setting asynchronously {self._cfg.attribute} to {value}" @@ -120,8 +140,13 @@ def set_and_wait(self, value: float): Raises ------ pyaml.PyAMLException - If the Tango write fails. + If the Tango write fails or this is an indexed attribute. """ + if self._index is not None: + raise pyaml.PyAMLException( + f"Indexed attribute '{self._cfg.attribute}[{self._index}]' " + "does not support individual element writes." + ) self._ensure_initialized() logger.log(logging.DEBUG, f"Setting {self._cfg.attribute} to {value}") try: @@ -150,7 +175,8 @@ def readback(self) -> Value: quality = Quality[ attr_value.quality.name.rsplit("_", 1)[1] ] # AttrQuality.ATTR_VALID gives Quality.VALID - value = Value(attr_value.value, quality, attr_value.time.todatetime()) + raw = attr_value.value[self._index] if self._index is not None else attr_value.value + value = Value(raw, quality, attr_value.time.todatetime()) except tango.DevFailed as df: raise tango_to_PyAMLException(df) return value @@ -173,10 +199,38 @@ def name(self) -> str: Returns ------- str - The attribute path (e.g., 'my/ps/device/current'). + The attribute path (e.g., 'my/ps/device/current'), or with index + notation when indexed (e.g., 'my/ps/device/current[2]'). + """ + if self._index is not None: + return f"{self._cfg.attribute}[{self._index}]" + return self._cfg.attribute + + def get_tango_attribute(self) -> str: + """ + Return the raw Tango attribute path without index decoration. + + Returns + ------- + str + Tango attribute path stored in the configuration. """ return self._cfg.attribute + def clone_with_tango_attribute(self, attribute: str) -> "Attribute": + """ + Return a shallow copy configured with another Tango attribute path. + + Parameters + ---------- + attribute : str + Tango attribute path to store in the cloned instance. + """ + new_obj = copy.copy(self) + new_obj._cfg = copy.copy(self._cfg) + new_obj._cfg.attribute = attribute + return new_obj + def measure_name(self) -> str: """ Return the short attribute name (last component). @@ -184,14 +238,21 @@ def measure_name(self) -> str: Returns ------- str - The attribute name (e.g., 'current'). + The attribute name (e.g., 'current'), with index notation when + indexed (e.g., 'current[2]'). """ - return self._cfg.attribute.rsplit("/", 1)[1] + short = self._cfg.attribute.rsplit("/", 1)[1] + if self._index is not None: + return f"{short}[{self._index}]" + return short def get(self) -> float: """ Get the last written value of the attribute. + For indexed attributes, returns the setpoint element at the configured + index (``w_value[index]``). + Returns ------- float @@ -204,7 +265,10 @@ def get(self) -> float: """ self._ensure_initialized() try: - return self._attribute_dev.read_attribute(self._attr_name).w_value + attr_val = self._attribute_dev.read_attribute(self._attr_name) + if self._index is not None: + return attr_val.w_value[self._index] + return attr_val.w_value except tango.DevFailed as df: raise tango_to_PyAMLException(df) diff --git a/tango/pyaml/attribute_list.py b/tango/pyaml/attribute_list.py index 4d1eddf..44156e5 100644 --- a/tango/pyaml/attribute_list.py +++ b/tango/pyaml/attribute_list.py @@ -85,6 +85,17 @@ def measure_name(self) -> str: """ return self._cfg.name + def get_tango_attributes(self) -> list[str]: + """ + Return the raw Tango attribute paths stored in the configuration. + + Returns + ------- + list[str] + Tango attribute paths in configured order. + """ + return self._cfg.attributes + def set(self, value: float): """ Write a value asynchronously to all Tango attributes. diff --git a/tango/pyaml/attribute_list_read_only.py b/tango/pyaml/attribute_list_read_only.py index 104e561..32828b5 100644 --- a/tango/pyaml/attribute_list_read_only.py +++ b/tango/pyaml/attribute_list_read_only.py @@ -1,13 +1,17 @@ import logging import pyaml -from .attribute_list import AttributeList, ConfigModel +from .attribute_list import AttributeList, ConfigModel as AttributeListConfigModel PYAMLCLASS: str = "AttributeListReadOnly" logger = logging.getLogger(__name__) +class ConfigModel(AttributeListConfigModel): + """Configuration model for a read-only Tango attribute list.""" + + class AttributeListReadOnly(AttributeList): """ Handle a list of Tango attributes using Tango Groups. diff --git a/tango/pyaml/attribute_read_only.py b/tango/pyaml/attribute_read_only.py index 1dccc03..614fe94 100644 --- a/tango/pyaml/attribute_read_only.py +++ b/tango/pyaml/attribute_read_only.py @@ -1,6 +1,6 @@ import logging -from .attribute import Attribute, ConfigModel +from .attribute import Attribute, ConfigModel as AttributeConfigModel from .tango_pyaml_utils import * PYAMLCLASS: str = "AttributeReadOnly" @@ -8,6 +8,10 @@ logger = logging.getLogger(__name__) +class ConfigModel(AttributeConfigModel): + """Configuration model for a read-only Tango attribute.""" + + class AttributeReadOnly(Attribute): """ Read-only Tango attribute. diff --git a/tango/pyaml/controlsystem.py b/tango/pyaml/controlsystem.py index 744eb35..6b82dce 100644 --- a/tango/pyaml/controlsystem.py +++ b/tango/pyaml/controlsystem.py @@ -1,10 +1,21 @@ import logging -import copy -from pydantic import BaseModel +from pydantic import BaseModel, ConfigDict +from pyaml import PyAMLException +from pyaml.configuration.catalog import Catalog from pyaml.control.controlsystem import ControlSystem from pyaml.control.deviceaccess import DeviceAccess from . import __version__ +from .attribute import Attribute, ConfigModel as AttributeConfigModel +from .attribute_list import AttributeList, ConfigModel as AttributeListConfigModel +from .attribute_list_read_only import ( + AttributeListReadOnly, + ConfigModel as AttributeListReadOnlyConfigModel, +) +from .attribute_read_only import ( + AttributeReadOnly, + ConfigModel as AttributeReadOnlyConfigModel, +) PYAMLCLASS: str = "TangoControlSystem" @@ -21,6 +32,8 @@ class ConfigModel(BaseModel): Name of the control system. tango_host : str Tango host URL. Default is the TANGO_HOST variable. + catalog : Catalog | str | None + Catalog instance or catalog name used to resolve PyAML device keys. debug_level : int Debug verbosity level. scalar_aggregator : str @@ -31,9 +44,12 @@ class ConfigModel(BaseModel): Device timeout in milli seconds. """ + model_config = ConfigDict(arbitrary_types_allowed=True, extra="forbid") + name: str tango_host: str | None = None - debug_level: str = None + catalog: Catalog | str | None = None + debug_level: str | None = None lazy_devices: bool = True scalar_aggregator: str | None = "tango.pyaml.multi_attribute" vector_aggregator: str | None = None @@ -66,15 +82,6 @@ def __init__(self, cfg: ConfigModel): f" and TANGO_HOST={self._cfg.tango_host}", ) - def __newref(self, obj, new_name: str): - # Shallow copy the object - newObj = copy.copy(obj) - # Shallow copy the config object - # to allow a new attribute name - newObj._cfg = copy.copy(obj._cfg) - newObj._cfg.attribute = new_name - return newObj - def attach_array(self, devs: list[DeviceAccess]) -> list[DeviceAccess]: return self._attach(devs) @@ -86,17 +93,112 @@ def _attach(self, devs: list[DeviceAccess]) -> list[DeviceAccess]: newDevs = [] for d in devs: if d is not None: - if self._cfg.tango_host: - full_name = "//" + self._cfg.tango_host + "/" + d._cfg.attribute + try: + attribute = d.get_tango_attribute() + except AttributeError as exc: + raise PyAMLException( + f"Cannot attach device {d!r}: expected a Tango attribute with get_tango_attribute()." + ) from exc + + tango_host = self.get_tango_host() + if tango_host: + full_name = "//" + tango_host + "/" + attribute else: - full_name = d._cfg.attribute + full_name = attribute if full_name not in self.__devices: - self.__devices[full_name] = self.__newref(d, full_name) + self.__devices[full_name] = d.clone_with_tango_attribute(full_name) newDevs.append(self.__devices[full_name]) else: newDevs.append(None) return newDevs + def get_device(self, ref: str | BaseModel | None) -> DeviceAccess | None: + """ + Resolve a public device reference for this Tango control system. + + YAML references are opaque strings resolved by the configured backend + catalog. Public Python APIs may pass Tango backend configuration models. + Already constructed DeviceAccess instances are intentionally rejected: + attach() remains the internal compatibility API for those. + """ + if ref is None: + return None + + if isinstance(ref, DeviceAccess): + raise PyAMLException( + "TangoControlSystem.get_device() expects a catalog key, Tango " + "ConfigModel, or None. Use attach() for already constructed " + "DeviceAccess objects." + ) + + if isinstance(ref, str): + catalog = self.get_catalog() + if catalog is None: + raise PyAMLException( + f"TangoControlSystem '{self.name()}' has no catalog configured." + ) + if not isinstance(catalog, Catalog): + raise PyAMLException( + f"TangoControlSystem '{self.name()}' has unsupported catalog type " + f"{type(catalog).__name__}." + ) + try: + resolve = catalog.resolve + except AttributeError as exc: + raise PyAMLException( + f"Catalog '{catalog.get_name()}' cannot resolve key '{ref}': " + "missing backend resolve() method." + ) from exc + device = resolve(ref, self) + return self.attach([device])[0] + + if isinstance(ref, AttributeReadOnlyConfigModel): + return self.attach([AttributeReadOnly(ref)])[0] + + if isinstance(ref, AttributeConfigModel): + return self.attach([Attribute(ref)])[0] + + if isinstance(ref, AttributeListReadOnlyConfigModel): + return AttributeListReadOnly(self._attach_attribute_list_config(ref)) + + if isinstance(ref, AttributeListConfigModel): + return AttributeList(self._attach_attribute_list_config(ref)) + + if isinstance(ref, BaseModel): + raise PyAMLException( + f"TangoControlSystem cannot construct a device from config model " + f"{type(ref).__name__}." + ) + + raise PyAMLException( + f"TangoControlSystem.get_device() cannot resolve references of type " + f"{type(ref).__name__}; expected str, Tango ConfigModel, or None." + ) + + def get_catalog_config(self) -> Catalog | str | None: + """ + Return the catalog configured for this Tango control system. + + PyAML keeps this value as backend configuration only; runtime catalog + resolution is owned by ``get_device()``. + """ + return self._cfg.catalog + + def _attach_attribute_list_config( + self, cfg: AttributeListConfigModel + ) -> AttributeListConfigModel: + tango_host = self.get_tango_host() + if not tango_host: + return cfg + + return cfg.model_copy( + update={ + "attributes": [ + f"//{tango_host}/{attribute}" for attribute in cfg.attributes + ] + } + ) + def name(self) -> str: """ Return the name of the control system. @@ -108,6 +210,17 @@ def name(self) -> str: """ return self._cfg.name + def get_tango_host(self) -> str | None: + """ + Return the Tango host configured for this control system. + + Returns + ------- + str | None + Tango host URL, or ``None`` when unconfigured. + """ + return self._cfg.tango_host + def scalar_aggregator(self) -> str | None: """ Returns the module name used for handling aggregator of DeviceAccess diff --git a/tango/pyaml/static_catalog.py b/tango/pyaml/static_catalog.py new file mode 100644 index 0000000..3a97428 --- /dev/null +++ b/tango/pyaml/static_catalog.py @@ -0,0 +1,91 @@ +from pydantic import ConfigDict + +from pyaml import PyAMLException +from pyaml.configuration.catalog import Catalog, CatalogConfigModel +from pyaml.control.deviceaccess import DeviceAccess + +from .static_catalog_entry import StaticCatalogEntry + +PYAMLCLASS = "StaticCatalog" + + +class ConfigModel(CatalogConfigModel): + """ + Configuration model for a static catalog. + + Attributes + ---------- + name : str + Catalog identifier. + entries : list[StaticCatalogEntry] + Explicit list of key-to-device mappings. Must contain at least one + entry, and keys must be unique within the catalog. + """ + + model_config = ConfigDict(arbitrary_types_allowed=True, extra="forbid") + + entries: list[StaticCatalogEntry] + + +class StaticCatalog(Catalog): + """ + Catalog backed by a fixed list of key-to-device mappings. + + All entries are validated at construction time: the list must be + non-empty and every key must be unique. Resolution is an O(1) dictionary + lookup; no Tango connection is required. + + Parameters + ---------- + cfg : ConfigModel + Configuration containing the catalog name and its entries. + + Raises + ------ + pyaml.PyAMLException + If ``cfg.entries`` is empty or contains duplicate keys. + """ + + def __init__(self, cfg: ConfigModel): + super().__init__(cfg) + if len(cfg.entries) == 0: + raise PyAMLException( + "StaticCatalog.entries must contain at least one entry" + ) + self._refs: dict[str, DeviceAccess] = {} + for entry in cfg.entries: + key = entry.get_key() + if key in self._refs: + raise PyAMLException( + f"StaticCatalog.entries contains duplicate key '{key}'" + ) + self._refs[key] = entry.get_device() + + def resolve(self, key: str, control_system: object | None = None) -> DeviceAccess: + """ + Return the device associated with ``key``. + + Parameters + ---------- + key : str + Catalog key to resolve. + control_system : object | None + Optional backend context. Static catalogs do not need it, but the + argument keeps the backend catalog API uniform. + + Returns + ------- + DeviceAccess + The device access object registered under ``key``. + + Raises + ------ + pyaml.PyAMLException + If ``key`` is not present in the catalog. + """ + try: + return self._refs[key] + except KeyError as exc: + raise PyAMLException( + f"Catalog '{self.get_name()}' cannot resolve key '{key}'" + ) from exc diff --git a/tango/pyaml/static_catalog_entry.py b/tango/pyaml/static_catalog_entry.py new file mode 100644 index 0000000..600fcb1 --- /dev/null +++ b/tango/pyaml/static_catalog_entry.py @@ -0,0 +1,45 @@ +from pydantic import BaseModel, ConfigDict + +from pyaml.control.deviceaccess import DeviceAccess + +PYAMLCLASS = "StaticCatalogEntry" + + +class ConfigModel(BaseModel): + """ + Configuration model for a static catalog entry. + + Attributes + ---------- + key : str + Catalog key used to look up the device. + device : DeviceAccess + Device access object returned when the key is resolved. + """ + + model_config = ConfigDict(arbitrary_types_allowed=True, extra="forbid") + + key: str + device: DeviceAccess + + +class StaticCatalogEntry: + """ + A single key-to-device mapping in a static catalog. + + Parameters + ---------- + cfg : ConfigModel + Configuration containing the key and device. + """ + + def __init__(self, cfg: ConfigModel): + self._cfg = cfg + + def get_key(self) -> str: + """Return the catalog key for this entry.""" + return self._cfg.key + + def get_device(self) -> DeviceAccess: + """Return the device access object associated with this entry.""" + return self._cfg.device diff --git a/tango/pyaml/tango_catalog.py b/tango/pyaml/tango_catalog.py new file mode 100644 index 0000000..c13b88c --- /dev/null +++ b/tango/pyaml/tango_catalog.py @@ -0,0 +1,308 @@ +import tango +import pyaml + +from pydantic import ConfigDict +from pyaml.configuration.catalog import Catalog, CatalogConfigModel +from pyaml.control.deviceaccess import DeviceAccess + +from .attribute import Attribute, ConfigModel as AttributeConfigModel +from .attribute_read_only import AttributeReadOnly +from .tango_pyaml_utils import tango_to_PyAMLException, to_float_or_none + +PYAMLCLASS = "TangoCatalog" + + +class ConfigModel(CatalogConfigModel): + """ + Configuration model for a Tango catalog. + + Attributes + ---------- + name : str + Catalog identifier. + disconnected : bool + If true, resolve Tango attribute names without querying Tango. + """ + + model_config = ConfigDict(arbitrary_types_allowed=True, extra="forbid") + + disconnected: bool = False + + +class TangoCatalog(Catalog): + """ + Catalog resolving keys that are direct Tango attribute references. + + Keys can be plain Tango attribute paths (``domain/family/member/attribute``) + or indexed references into a SPECTRUM attribute + (``domain/family/member/attribute@index``). + """ + + _WRITABLE_TYPES = { + tango.AttrWriteType.READ_WRITE, + tango.AttrWriteType.WRITE, + tango.AttrWriteType.READ_WITH_WRITE, + } + + def __init__(self, cfg: ConfigModel): + super().__init__(cfg) + # Resolved DeviceAccess objects are bound to one control-system context + # because metadata lookup depends on that control system's Tango host. + self._refs: dict[tuple[int, str], DeviceAccess] = {} + self._data_formats: dict[tuple[int, str], tango.AttrDataFormat] = {} + + def resolve(self, key: str, control_system: object | None = None) -> DeviceAccess: + """ + Resolve a Tango attribute reference into a DeviceAccess. + + Supports two key formats: + + - ``domain/family/member/attribute`` — resolves to a scalar + :class:`~tango.pyaml.attribute.Attribute` or + :class:`~tango.pyaml.attribute_read_only.AttributeReadOnly`. + - ``domain/family/member/attribute@index`` — resolves to a scalar view + of one element in a SPECTRUM attribute + (:class:`~tango.pyaml.attribute_indexed.AttributeIndexed` or + :class:`~tango.pyaml.attribute_indexed_read_only.AttributeIndexedReadOnly`). + + In connected mode (``disconnected=False``) indexed keys additionally verify + that the Tango attribute is a SPECTRUM. + + Parameters + ---------- + key : str + Plain attribute path or indexed path (``attribute@index``). + control_system : object + Tango control-system context used for Tango host handling. + + Returns + ------- + DeviceAccess + Resolved device access, cached for subsequent calls. + + Raises + ------ + pyaml.PyAMLException + If the key is malformed, the Tango call fails, or (in connected + mode) an indexed key targets a non-SPECTRUM attribute. + """ + self._validate_control_system(control_system, key) + attr_path, index = self._parse_key(key) + cache_key = (id(control_system), key) + + if cache_key not in self._refs: + if index is not None: + if self.is_disconnected(): + self._refs[cache_key] = self._build_disconnected_indexed( + cache_key, attr_path, index + ) + else: + self._refs[cache_key] = self._build_connected_indexed( + cache_key, control_system, attr_path, index + ) + else: + if self.is_disconnected(): + self._refs[cache_key] = self._build_disconnected_attribute( + cache_key, key + ) + else: + self._refs[cache_key] = self._build_connected_attribute( + cache_key, control_system, key + ) + + return self._refs[cache_key] + + def is_disconnected(self) -> bool: + return self._cfg.disconnected + + def get_data_format( + self, key: str, control_system: object | None = None + ) -> tango.AttrDataFormat: + """ + Return the Tango data format for a resolved attribute. + + Parameters + ---------- + key : str + Catalog key (must have been resolved at least once, or will be + resolved now). + control_system : object + Tango control-system context used for Tango host handling. + + Returns + ------- + tango.AttrDataFormat + Data format reported by Tango, or ``FMT_UNKNOWN`` in disconnected + mode. + """ + self.resolve(key, control_system) + return self._data_formats[(id(control_system), key)] + + def _validate_control_system(self, control_system: object | None, key: str) -> None: + from .controlsystem import TangoControlSystem + + if control_system is None: + raise pyaml.PyAMLException( + f"Tango catalog '{self.get_name()}' needs a TangoControlSystem context " + f"before resolving key '{key}'" + ) + + if not isinstance(control_system, TangoControlSystem): + raise pyaml.PyAMLException( + f"Tango catalog '{self.get_name()}' can only resolve through TangoControlSystem" + ) + + def _parse_key(self, key: str) -> tuple[str, int | None]: + """ + Validate and split a catalog key into ``(attr_path, index)``. + + The ``index`` is ``None`` for plain attribute paths and an integer for + indexed paths (``attr_path@index``). + + Raises + ------ + pyaml.PyAMLException + If the key is not a string, the attribute path does not have + exactly four slash-separated components, or the index suffix is + not a valid integer. + """ + if not isinstance(key, str): + raise pyaml.PyAMLException( + f"Tango catalog '{self.get_name()}' expects string keys, got {type(key).__name__}" + ) + + if "@" in key: + attr_path, idx_str = key.rsplit("@", 1) + try: + index = int(idx_str) + except ValueError as exc: + raise pyaml.PyAMLException( + f"Tango catalog '{self.get_name()}' invalid index '{idx_str}' in key '{key}'." + ) from exc + else: + attr_path = key + index = None + + parts = attr_path.split("/") + if len(parts) != 4 or any(part == "" for part in parts): + raise pyaml.PyAMLException( + f"Tango catalog '{self.get_name()}' cannot resolve invalid Tango attribute " + f"reference '{key}'. Expected 'domain/family/member/attribute' or " + f"'domain/family/member/attribute@index'." + ) + + return attr_path, index + + def _build_disconnected_attribute( + self, cache_key: tuple[int, str], key: str + ) -> DeviceAccess: + # In disconnected mode, keep all metadata local. In particular, setting + # range avoids Attribute.get_range() from lazily querying Tango later. + self._data_formats[cache_key] = tango.AttrDataFormat.FMT_UNKNOWN + return Attribute(AttributeConfigModel(attribute=key, range=(None, None))) + + def _build_disconnected_indexed( + self, cache_key: tuple[int, str], attr_path: str, index: int + ) -> DeviceAccess: + # Cannot verify SPECTRUM in disconnected mode; store FMT_UNKNOWN. + self._data_formats[cache_key] = tango.AttrDataFormat.FMT_UNKNOWN + return Attribute( + AttributeConfigModel(attribute=attr_path, index=index, range=(None, None)) + ) + + def _build_connected_attribute( + self, cache_key: tuple[int, str], control_system: object, key: str + ) -> DeviceAccess: + tango_attr_name = self._tango_attribute_name(control_system, key) + try: + # AttributeProxy.get_config() is the most direct way to retrieve + # writability, unit, range and data format from Tango. + attr_config = tango.AttributeProxy(tango_attr_name).get_config() + except tango.DevFailed as df: + pyaml_exception = tango_to_PyAMLException(df) + raise pyaml.PyAMLException( + f"Tango catalog '{self.get_name()}' cannot resolve '{key}': {pyaml_exception}" + ) from df + + unit, attr_range, data_format, writable = self._read_config_metadata( + attr_config, key + ) + self._data_formats[cache_key] = data_format + cfg = AttributeConfigModel(attribute=key, unit=unit, range=attr_range) + + if writable in self._WRITABLE_TYPES: + return Attribute(cfg) + return AttributeReadOnly(cfg) + + def _build_connected_indexed( + self, + cache_key: tuple[int, str], + control_system: object, + attr_path: str, + index: int, + ) -> DeviceAccess: + """ + Build an indexed device access after verifying the attribute is a SPECTRUM. + + Raises + ------ + pyaml.PyAMLException + If the Tango call fails or the attribute is not a SPECTRUM. + """ + key = f"{attr_path}@{index}" + tango_attr_name = self._tango_attribute_name(control_system, attr_path) + try: + attr_config = tango.AttributeProxy(tango_attr_name).get_config() + except tango.DevFailed as df: + pyaml_exception = tango_to_PyAMLException(df) + raise pyaml.PyAMLException( + f"Tango catalog '{self.get_name()}' cannot resolve '{key}': {pyaml_exception}" + ) from df + + unit, attr_range, data_format, writable = self._read_config_metadata( + attr_config, key + ) + if data_format != tango.AttrDataFormat.SPECTRUM: + raise pyaml.PyAMLException( + f"Tango catalog '{self.get_name()}' cannot use '{key}' as an indexed " + "key: the Tango attribute is not a SPECTRUM." + ) + + self._data_formats[cache_key] = tango.AttrDataFormat.SPECTRUM + cfg = AttributeConfigModel( + attribute=attr_path, index=index, unit=unit, range=attr_range + ) + + if writable in self._WRITABLE_TYPES: + return Attribute(cfg) + return AttributeReadOnly(cfg) + + def _read_config_metadata( + self, attr_config, key: str + ) -> tuple[ + str, + tuple[float | None, float | None], + tango.AttrDataFormat, + tango.AttrWriteType, + ]: + try: + unit = attr_config.unit or "" + attr_range = ( + to_float_or_none(attr_config.min_value), + to_float_or_none(attr_config.max_value), + ) + data_format = attr_config.data_format + writable = attr_config.writable + except AttributeError as exc: + raise pyaml.PyAMLException( + f"Tango catalog '{self.get_name()}' cannot resolve '{key}': " + f"incomplete Tango attribute config, missing '{exc.name}'." + ) from exc + + return unit, attr_range, data_format, writable + + def _tango_attribute_name(self, control_system: object, attr_path: str) -> str: + tango_host = control_system.get_tango_host() + if tango_host: + return f"//{tango_host}/{attr_path}" + return attr_path diff --git a/tests/mocked_device_proxy.py b/tests/mocked_device_proxy.py index 7e2c220..f6d7f7f 100644 --- a/tests/mocked_device_proxy.py +++ b/tests/mocked_device_proxy.py @@ -10,11 +10,15 @@ def __init__( writable=tango.AttrWriteType.READ_WRITE, min_value: str = "", max_value: str = "", + unit: str = "", + data_format=tango.AttrDataFormat.SCALAR, ): self.name = name self.writable = writable + self.unit = unit self.min_value = min_value self.max_value = max_value + self.data_format = data_format class MockedDeviceAttribute: @@ -134,13 +138,18 @@ def ping(self, green_mode=None, wait=True, timeout=True) -> int: class MockedAttributeProxy(MagicMock): - def __init__(self, attr_full_name, *args, **kwargs): + def __init__(self, attr_full_name, attr_config=None, *args, **kwargs): super().__init__(*args, **kwargs) self.attr_full_name = attr_full_name self.device_name, self._attr_name = attr_full_name.rsplit("/", 1) self.device_proxy = MockedDeviceProxy(self.device_name) + # Tests can inject a specific config to exercise catalog metadata + # handling without creating a dedicated proxy class each time. + self.attr_config = attr_config def get_config(self, *args, **kwds): + if self.attr_config is not None: + return self.attr_config return self.device_proxy.get_attribute_config(self.name(), *args, **kwds) def read(self, *args, **kwds): diff --git a/tests/test_attribute_indexed.py b/tests/test_attribute_indexed.py new file mode 100644 index 0000000..3d36aa9 --- /dev/null +++ b/tests/test_attribute_indexed.py @@ -0,0 +1,153 @@ +import numpy as np +import pytest +import tango +from unittest.mock import patch + +import pyaml + +from tango.pyaml.attribute import Attribute, ConfigModel +from tango.pyaml.attribute_read_only import AttributeReadOnly +from .mocked_device_proxy import MockedAttributeInfoEx, MockedDeviceProxy, MockedDeviceAttribute + + +SPECTRUM_ARRAY = np.array([10.0, 20.0, 30.0]) + + +class MockedSpectrumDeviceProxy(MockedDeviceProxy): + """DeviceProxy that returns a SPECTRUM (READ_WRITE) attribute.""" + + def attribute_query(self, name): + return MockedAttributeInfoEx( + name, + writable=tango.AttrWriteType.READ_WRITE, + data_format=tango.AttrDataFormat.SPECTRUM, + unit="mm", + ) + + def read_attribute(self, name): + return MockedDeviceAttribute(name, SPECTRUM_ARRAY) + + +class MockedSpectrumRODeviceProxy(MockedDeviceProxy): + """DeviceProxy that returns a SPECTRUM (READ) attribute.""" + + def attribute_query(self, name): + return MockedAttributeInfoEx( + name, + writable=tango.AttrWriteType.READ, + data_format=tango.AttrDataFormat.SPECTRUM, + unit="mm", + ) + + def read_attribute(self, name): + return MockedDeviceAttribute(name, SPECTRUM_ARRAY) + + +class MockedScalarDeviceProxy(MockedDeviceProxy): + """DeviceProxy that returns a SCALAR attribute.""" + + def attribute_query(self, name): + return MockedAttributeInfoEx( + name, + data_format=tango.AttrDataFormat.SCALAR, + ) + + +# --- Attribute with index --- + + +def test_attribute_indexed_get_returns_w_value_at_index(): + cfg = ConfigModel(attribute="domain/family/member/position", index=1, unit="mm") + with patch("tango.DeviceProxy", new=MockedSpectrumDeviceProxy): + attr = Attribute(cfg) + assert attr.get() == SPECTRUM_ARRAY[1] + + +def test_attribute_indexed_readback_returns_value_at_index(): + cfg = ConfigModel(attribute="domain/family/member/position", index=0, unit="mm") + with patch("tango.DeviceProxy", new=MockedSpectrumDeviceProxy): + attr = Attribute(cfg) + rb = attr.readback() + assert rb.value == SPECTRUM_ARRAY[0] + + +def test_attribute_indexed_set_raises(): + cfg = ConfigModel(attribute="domain/family/member/position", index=0) + with patch("tango.DeviceProxy", new=MockedSpectrumDeviceProxy): + attr = Attribute(cfg) + with pytest.raises(pyaml.PyAMLException, match="does not support individual element writes"): + attr.set(99.0) + + +def test_attribute_indexed_set_and_wait_raises(): + cfg = ConfigModel(attribute="domain/family/member/position", index=0) + with patch("tango.DeviceProxy", new=MockedSpectrumDeviceProxy): + attr = Attribute(cfg) + with pytest.raises(pyaml.PyAMLException, match="does not support individual element writes"): + attr.set_and_wait(99.0) + + +def test_attribute_indexed_name_includes_index(): + cfg = ConfigModel(attribute="domain/family/member/position", index=2) + attr = Attribute(cfg) + assert attr.name() == "domain/family/member/position[2]" + + +def test_attribute_indexed_measure_name_includes_index(): + cfg = ConfigModel(attribute="domain/family/member/position", index=2) + attr = Attribute(cfg) + assert attr.measure_name() == "position[2]" + + +def test_attribute_indexed_unit(): + cfg = ConfigModel(attribute="domain/family/member/position", index=0, unit="mm") + attr = Attribute(cfg) + assert attr.unit() == "mm" + + +def test_attribute_indexed_raises_when_not_spectrum(): + cfg = ConfigModel(attribute="domain/family/member/current", index=0) + with patch("tango.DeviceProxy", new=MockedScalarDeviceProxy): + attr = Attribute(cfg) + with pytest.raises(pyaml.PyAMLException, match="not a SPECTRUM"): + attr.get() + + +def test_attribute_indexed_range_from_config(): + cfg = ConfigModel( + attribute="domain/family/member/position", index=0, unit="mm", range=(-5.0, 5.0) + ) + attr = Attribute(cfg) + assert attr.get_range() == [-5.0, 5.0] + + +# --- AttributeReadOnly with index --- + + +def test_attribute_indexed_read_only_get_returns_measured_value(): + cfg = ConfigModel(attribute="domain/family/member/position", index=2, unit="mm") + with patch("tango.DeviceProxy", new=MockedSpectrumRODeviceProxy): + attr = AttributeReadOnly(cfg) + assert attr.get() == SPECTRUM_ARRAY[2] + + +def test_attribute_indexed_read_only_readback_returns_value_at_index(): + cfg = ConfigModel(attribute="domain/family/member/position", index=1, unit="mm") + with patch("tango.DeviceProxy", new=MockedSpectrumRODeviceProxy): + attr = AttributeReadOnly(cfg) + assert attr.readback().value == SPECTRUM_ARRAY[1] + + +def test_attribute_indexed_read_only_set_raises(): + cfg = ConfigModel(attribute="domain/family/member/position", index=0) + with patch("tango.DeviceProxy", new=MockedSpectrumRODeviceProxy): + attr = AttributeReadOnly(cfg) + with pytest.raises(pyaml.PyAMLException): + attr.set(1.0) + + +def test_attribute_indexed_read_only_get_equals_readback(): + cfg = ConfigModel(attribute="domain/family/member/position", index=0, unit="mm") + with patch("tango.DeviceProxy", new=MockedSpectrumRODeviceProxy): + attr = AttributeReadOnly(cfg) + assert attr.get() == attr.readback().value diff --git a/tests/test_controlsystem.py b/tests/test_controlsystem.py index f8f7255..45d5142 100644 --- a/tests/test_controlsystem.py +++ b/tests/test_controlsystem.py @@ -1,11 +1,26 @@ import logging -from tango.pyaml.controlsystem import TangoControlSystem - +import pyaml +import pytest +from tango.pyaml.static_catalog import ConfigModel as StaticCatalogConfigModel +from tango.pyaml.static_catalog import StaticCatalog +from tango.pyaml.static_catalog_entry import ( + ConfigModel as StaticCatalogEntryConfigModel, +) +from tango.pyaml.static_catalog_entry import StaticCatalogEntry +from tango.pyaml.controlsystem import ConfigModel, TangoControlSystem from .mocked_device_proxy import MockedDeviceProxy from unittest.mock import patch -from tango.pyaml.attribute import Attribute +from tango.pyaml.attribute_list import AttributeList +from tango.pyaml.attribute_list import ConfigModel as AttributeListConfigModel +from tango.pyaml.attribute_list_read_only import AttributeListReadOnly +from tango.pyaml.attribute_list_read_only import ( + ConfigModel as AttributeListReadOnlyConfigModel, +) +from tango.pyaml.attribute import Attribute, ConfigModel as AttributeConfigModel +from tango.pyaml.attribute_read_only import AttributeReadOnly +from tango.pyaml.attribute_read_only import ConfigModel as AttributeReadOnlyConfigModel from tango.pyaml import __version__ @@ -32,3 +47,205 @@ def test_laziness_init_cs_attribute(config_tango_cs_lazy_default, config): attr.set_and_wait(42.0) mock_ctor.assert_called_once() assert attr.get() == 42.0 + + +def test_catalog_can_be_configured_and_resolved(): + device = AttributeReadOnly( + AttributeConfigModel(attribute="sys/tg_test/1/float_scalar", unit="A") + ) + catalog = StaticCatalog( + StaticCatalogConfigModel( + name="device-catalog", + entries=[ + StaticCatalogEntry( + StaticCatalogEntryConfigModel( + key="BPM_C01-01/x", + device=device, + ) + ) + ], + ) + ) + cs = TangoControlSystem( + ConfigModel( + name="test_tango_cs", + tango_host="tangodb:10000", + catalog=catalog, + ) + ) + + cs.set_catalog(catalog) + resolved = cs.get_device("BPM_C01-01/x") + + assert cs.get_catalog_config() is catalog + assert cs.get_catalog() is catalog + assert catalog.resolve("BPM_C01-01/x") is device + assert resolved.name() == "//tangodb:10000/sys/tg_test/1/float_scalar" + + +def test_configured_catalog_instance_is_not_runtime_catalog_until_set(): + device = Attribute(AttributeConfigModel(attribute="sys/tg_test/1/float_scalar")) + catalog = StaticCatalog( + StaticCatalogConfigModel( + name="device-catalog", + entries=[ + StaticCatalogEntry( + StaticCatalogEntryConfigModel(key="BPM_C01-01/x", device=device) + ) + ], + ) + ) + cs = TangoControlSystem(ConfigModel(name="test_tango_cs", catalog=catalog)) + + assert cs.get_catalog_config() is catalog + assert cs.get_catalog() is None + with pytest.raises(pyaml.PyAMLException, match="has no catalog configured"): + cs.get_device("BPM_C01-01/x") + + +def test_get_device_builds_attribute_from_config_model(): + cs = TangoControlSystem( + ConfigModel(name="test_tango_cs", tango_host="tangodb:10000") + ) + + resolved = cs.get_device( + AttributeConfigModel(attribute="sys/tg_test/1/float_scalar", unit="A") + ) + + assert isinstance(resolved, Attribute) + assert resolved.name() == "//tangodb:10000/sys/tg_test/1/float_scalar" + assert resolved.unit() == "A" + + +def test_get_device_builds_read_only_attribute_from_config_model(): + cs = TangoControlSystem( + ConfigModel(name="test_tango_cs", tango_host="tangodb:10000") + ) + + resolved = cs.get_device( + AttributeReadOnlyConfigModel(attribute="sys/tg_test/1/float_scalar", unit="A") + ) + + assert isinstance(resolved, AttributeReadOnly) + assert resolved.name() == "//tangodb:10000/sys/tg_test/1/float_scalar" + assert resolved.unit() == "A" + + +def test_get_device_builds_attribute_list_from_config_model(): + cs = TangoControlSystem( + ConfigModel(name="test_tango_cs", tango_host="tangodb:10000") + ) + + resolved = cs.get_device( + AttributeListConfigModel( + name="group", + attributes=[ + "sys/tg_test/1/float_scalar", + "sys/tg_test/2/float_scalar", + ], + unit="A", + ) + ) + + assert isinstance(resolved, AttributeList) + assert not isinstance(resolved, AttributeListReadOnly) + assert resolved.name() == "group" + assert resolved.unit() == "A" + assert resolved.get_tango_attributes() == [ + "//tangodb:10000/sys/tg_test/1/float_scalar", + "//tangodb:10000/sys/tg_test/2/float_scalar", + ] + + +def test_get_device_builds_read_only_attribute_list_from_config_model(): + cs = TangoControlSystem( + ConfigModel(name="test_tango_cs", tango_host="tangodb:10000") + ) + + resolved = cs.get_device( + AttributeListReadOnlyConfigModel( + name="group", + attributes=[ + "sys/tg_test/1/float_scalar", + "sys/tg_test/2/float_scalar", + ], + unit="A", + ) + ) + + assert isinstance(resolved, AttributeListReadOnly) + assert resolved.name() == "group" + assert resolved.unit() == "A" + assert resolved.get_tango_attributes() == [ + "//tangodb:10000/sys/tg_test/1/float_scalar", + "//tangodb:10000/sys/tg_test/2/float_scalar", + ] + + +def test_get_device_none_returns_none(): + cs = TangoControlSystem(ConfigModel(name="test_tango_cs")) + + assert cs.get_device(None) is None + + +def test_get_device_rejects_preconstructed_device_access(config): + cs = TangoControlSystem(ConfigModel(name="test_tango_cs")) + + with pytest.raises(pyaml.PyAMLException, match="Use attach\\(\\)"): + cs.get_device(Attribute(config)) + + +def test_get_device_requires_catalog_for_string_key(): + cs = TangoControlSystem(ConfigModel(name="test_tango_cs")) + + with pytest.raises(pyaml.PyAMLException, match="has no catalog configured"): + cs.get_device("BPM_C01-01/x") + + +def test_get_device_rejects_unloaded_named_catalog(): + cs = TangoControlSystem(ConfigModel(name="test_tango_cs", catalog="device-catalog")) + + with pytest.raises(pyaml.PyAMLException, match="has no catalog configured"): + cs.get_device("BPM_C01-01/x") + + +def test_get_device_reports_unknown_catalog_key(): + device = Attribute(AttributeConfigModel(attribute="sys/tg_test/1/float_scalar")) + catalog = StaticCatalog( + StaticCatalogConfigModel( + name="device-catalog", + entries=[ + StaticCatalogEntry( + StaticCatalogEntryConfigModel(key="BPM_C01-01/x", device=device) + ) + ], + ) + ) + cs = TangoControlSystem(ConfigModel(name="test_tango_cs", catalog=catalog)) + cs.set_catalog(catalog) + + with pytest.raises(pyaml.PyAMLException, match="cannot resolve key 'BPM_C01-02/x'"): + cs.get_device("BPM_C01-02/x") + + +def test_get_device_rejects_unknown_reference_type(): + cs = TangoControlSystem(ConfigModel(name="test_tango_cs")) + + with pytest.raises(pyaml.PyAMLException, match="type int"): + cs.get_device(42) + + +def test_named_catalog_config_is_accepted(): + cfg = ConfigModel(name="test_tango_cs", catalog="device-catalog") + cs = TangoControlSystem(cfg) + + assert cfg.catalog == "device-catalog" + assert cs.get_catalog_config() == "device-catalog" + + +def test_tango_control_system_exposes_tango_host(): + cs = TangoControlSystem( + ConfigModel(name="test_tango_cs", tango_host="tangodb:10000") + ) + + assert cs.get_tango_host() == "tangodb:10000" diff --git a/tests/test_static_catalog.py b/tests/test_static_catalog.py new file mode 100644 index 0000000..670a285 --- /dev/null +++ b/tests/test_static_catalog.py @@ -0,0 +1,146 @@ +import pytest + +import pyaml +from tango.pyaml.attribute import Attribute, ConfigModel as AttributeConfigModel +from tango.pyaml.attribute_read_only import AttributeReadOnly +from tango.pyaml.controlsystem import ConfigModel as TangoControlSystemConfigModel +from tango.pyaml.controlsystem import TangoControlSystem +from tango.pyaml.static_catalog import ConfigModel as StaticCatalogConfigModel +from tango.pyaml.static_catalog import StaticCatalog +from tango.pyaml.static_catalog_entry import ConfigModel as EntryConfigModel +from tango.pyaml.static_catalog_entry import StaticCatalogEntry + + +def make_attribute( + path: str = "domain/family/member/attr", unit: str = "mm" +) -> Attribute: + return Attribute(AttributeConfigModel(attribute=path, unit=unit)) + + +def make_entry(key: str, device=None) -> StaticCatalogEntry: + if device is None: + device = make_attribute() + return StaticCatalogEntry(EntryConfigModel(key=key, device=device)) + + +def make_catalog(name: str = "static", entries=None) -> StaticCatalog: + if entries is None: + entries = [make_entry("default/key")] + return StaticCatalog(StaticCatalogConfigModel(name=name, entries=entries)) + + +# --- StaticCatalogEntry --- + + +def test_static_catalog_entry_returns_key(): + entry = make_entry("BPM/x") + assert entry.get_key() == "BPM/x" + + +def test_static_catalog_entry_returns_device(): + device = make_attribute("sr/bpm/c01-01/x", unit="mm") + entry = make_entry("BPM/x", device=device) + assert entry.get_device() is device + + +# --- StaticCatalog construction --- + + +def test_static_catalog_rejects_empty_entries(): + with pytest.raises(pyaml.PyAMLException, match="must contain at least one entry"): + StaticCatalog(StaticCatalogConfigModel(name="empty", entries=[])) + + +def test_static_catalog_rejects_duplicate_keys(): + entries = [make_entry("BPM/x"), make_entry("BPM/x")] + with pytest.raises(pyaml.PyAMLException, match="duplicate key 'BPM/x'"): + StaticCatalog(StaticCatalogConfigModel(name="dup", entries=entries)) + + +def test_static_catalog_get_name(): + catalog = make_catalog(name="my-catalog") + assert catalog.get_name() == "my-catalog" + + +# --- StaticCatalog.resolve --- + + +def test_static_catalog_resolves_known_key(): + device = make_attribute("sr/bpm/c01-01/position") + catalog = make_catalog(entries=[make_entry("BPM_C01-01/x", device=device)]) + + resolved = catalog.resolve("BPM_C01-01/x") + + assert resolved is device + + +def test_static_catalog_resolves_multiple_entries(): + device_x = make_attribute("sr/bpm/c01-01/x") + device_y = make_attribute("sr/bpm/c01-01/y") + catalog = make_catalog( + entries=[ + make_entry("BPM/x", device=device_x), + make_entry("BPM/y", device=device_y), + ] + ) + + assert catalog.resolve("BPM/x") is device_x + assert catalog.resolve("BPM/y") is device_y + + +def test_static_catalog_raises_on_unknown_key(): + catalog = make_catalog(entries=[make_entry("BPM/x")]) + + with pytest.raises(pyaml.PyAMLException, match="cannot resolve key 'BPM/y'"): + catalog.resolve("BPM/y") + + +def test_static_catalog_error_includes_catalog_name(): + catalog = make_catalog(name="my-catalog", entries=[make_entry("BPM/x")]) + + with pytest.raises(pyaml.PyAMLException, match="Catalog 'my-catalog'"): + catalog.resolve("missing") + + +def test_static_catalog_is_shared_across_control_systems(): + device = make_attribute() + catalog = make_catalog(entries=[make_entry("BPM/x", device=device)]) + live = TangoControlSystem(TangoControlSystemConfigModel(name="live")) + ops = TangoControlSystem(TangoControlSystemConfigModel(name="ops")) + + live.set_catalog(catalog) + ops.set_catalog(catalog) + + assert live.get_catalog() is catalog + assert ops.get_catalog() is catalog + assert catalog.resolve("BPM/x") is device + assert live.get_device("BPM/x") is not device + assert ops.get_device("BPM/x") is not device + assert live.get_device("BPM/x") is not ops.get_device("BPM/x") + + +# --- Integration with DeviceAccess types --- + + +def test_static_catalog_works_with_attribute_read_only(): + device = AttributeReadOnly( + AttributeConfigModel(attribute="sr/bpm/c01-01/pos", unit="mm") + ) + catalog = make_catalog(entries=[make_entry("BPM/x", device=device)]) + + resolved = catalog.resolve("BPM/x") + + assert isinstance(resolved, AttributeReadOnly) + assert resolved.unit() == "mm" + + +def test_static_catalog_can_be_used_through_tango_control_system(): + device = make_attribute("sr/bpm/c01-01/x", unit="mm") + catalog = make_catalog(entries=[make_entry("BPM/x", device=device)]) + control_system = TangoControlSystem(TangoControlSystemConfigModel(name="live")) + control_system.set_catalog(catalog) + + resolved = control_system.get_device("BPM/x") + + assert resolved is not device + assert resolved.name() == "sr/bpm/c01-01/x" diff --git a/tests/test_tango_catalog.py b/tests/test_tango_catalog.py new file mode 100644 index 0000000..050ee9d --- /dev/null +++ b/tests/test_tango_catalog.py @@ -0,0 +1,344 @@ +from unittest.mock import call, patch + +import pyaml +import pytest +import tango +from pyaml.control.controlsystem import ControlSystemAdapter + +from .mocked_device_proxy import MockedAttributeInfoEx, MockedAttributeProxy +from tango.pyaml.attribute import Attribute +from tango.pyaml.attribute_read_only import AttributeReadOnly +from tango.pyaml.controlsystem import ConfigModel as TangoControlSystemConfigModel +from tango.pyaml.controlsystem import TangoControlSystem +from tango.pyaml.tango_catalog import ConfigModel, TangoCatalog + + +def build_control_system(catalog: TangoCatalog, name="live"): + control_system = TangoControlSystem(TangoControlSystemConfigModel(name=name)) + control_system.set_catalog(catalog) + return control_system + + +def test_tango_catalog_disconnected_resolves_without_querying_tango(): + catalog = TangoCatalog(ConfigModel(name="tango-direct", disconnected=True)) + control_system = build_control_system(catalog) + + with patch("tango.AttributeProxy") as attr_proxy: + device = catalog.resolve("domain/family/member/attribute", control_system) + + attr_proxy.assert_not_called() + assert isinstance(device, Attribute) + assert device.name() == "domain/family/member/attribute" + assert device.unit() == "" + assert device.get_range() == [None, None] + + +def test_tango_catalog_connected_resolves_writable_attribute(): + attr_config = MockedAttributeInfoEx( + name="current", + writable=tango.AttrWriteType.READ_WRITE, + unit="A", + min_value="-10.5", + max_value="12.0", + data_format=tango.AttrDataFormat.SPECTRUM, + ) + catalog = TangoCatalog(ConfigModel(name="tango-direct")) + control_system = build_control_system(catalog) + + with patch( + "tango.AttributeProxy", + return_value=MockedAttributeProxy("domain/family/member/current", attr_config), + ): + device = catalog.resolve("domain/family/member/current", control_system) + + assert isinstance(device, Attribute) + assert not isinstance(device, AttributeReadOnly) + assert device.name() == "domain/family/member/current" + assert device.unit() == "A" + assert device.get_range() == [-10.5, 12.0] + assert ( + catalog.get_data_format("domain/family/member/current", control_system) + == tango.AttrDataFormat.SPECTRUM + ) + + +def test_tango_catalog_connected_resolves_read_only_attribute(): + attr_config = MockedAttributeInfoEx( + name="position", writable=tango.AttrWriteType.READ, unit="mm" + ) + catalog = TangoCatalog(ConfigModel(name="tango-direct")) + control_system = build_control_system(catalog) + + with patch( + "tango.AttributeProxy", + return_value=MockedAttributeProxy("domain/family/member/position", attr_config), + ): + device = catalog.resolve("domain/family/member/position", control_system) + + assert isinstance(device, AttributeReadOnly) + assert device.unit() == "mm" + + +def test_tango_catalog_caches_resolved_devices(): + catalog = TangoCatalog(ConfigModel(name="tango-direct")) + control_system = build_control_system(catalog) + + with patch( + "tango.AttributeProxy", + return_value=MockedAttributeProxy("domain/family/member/attribute"), + ) as attr_proxy: + first = catalog.resolve("domain/family/member/attribute", control_system) + second = catalog.resolve("domain/family/member/attribute", control_system) + + attr_proxy.assert_called_once_with("domain/family/member/attribute") + assert first is second + + +def test_tango_catalog_cache_is_bound_to_control_system_resolver(): + catalog = TangoCatalog(ConfigModel(name="tango-direct")) + live = build_control_system(catalog, name="live") + ops = build_control_system(catalog, name="ops") + + with patch( + "tango.AttributeProxy", + return_value=MockedAttributeProxy("domain/family/member/attribute"), + ) as attr_proxy: + live_first = catalog.resolve("domain/family/member/attribute", live) + live_second = catalog.resolve("domain/family/member/attribute", live) + ops_device = catalog.resolve("domain/family/member/attribute", ops) + + assert attr_proxy.call_count == 2 + assert live_first is live_second + assert ops_device is not live_first + + +def test_tango_catalog_connected_metadata_uses_control_system_tango_host(): + key = "domain/family/member/current" + catalog = TangoCatalog(ConfigModel(name="tango-direct")) + live = TangoControlSystem( + TangoControlSystemConfigModel(name="live", tango_host="live-db:10000") + ) + ops = TangoControlSystem( + TangoControlSystemConfigModel(name="ops", tango_host="ops-db:10000") + ) + live.set_catalog(catalog) + ops.set_catalog(catalog) + + attr_configs = { + "//live-db:10000/domain/family/member/current": MockedAttributeInfoEx( + name="current", + min_value="-10.0", + max_value="10.0", + ), + "//ops-db:10000/domain/family/member/current": MockedAttributeInfoEx( + name="current", + min_value="-2.5", + max_value="2.5", + ), + } + + def attribute_proxy(attr_full_name): + return MockedAttributeProxy(attr_full_name, attr_configs[attr_full_name]) + + with patch("tango.AttributeProxy", side_effect=attribute_proxy) as attr_proxy: + live_device = live.get_device(key) + ops_device = ops.get_device(key) + + assert attr_proxy.call_args_list == [ + call("//live-db:10000/domain/family/member/current"), + call("//ops-db:10000/domain/family/member/current"), + ] + assert live_device.name() == "//live-db:10000/domain/family/member/current" + assert ops_device.name() == "//ops-db:10000/domain/family/member/current" + assert live_device.get_range() == [-10.0, 10.0] + assert ops_device.get_range() == [-2.5, 2.5] + + +def test_tango_catalog_can_be_used_through_tango_control_system(): + catalog = TangoCatalog(ConfigModel(name="tango-direct", disconnected=True)) + control_system = TangoControlSystem(TangoControlSystemConfigModel(name="live")) + control_system.set_catalog(catalog) + + device = control_system.get_device("domain/family/member/attribute") + + assert isinstance(device, Attribute) + assert control_system.get_catalog() is catalog + + +def test_tango_catalog_rejects_non_tango_control_system(): + catalog = TangoCatalog(ConfigModel(name="tango-direct")) + + with pytest.raises( + pyaml.PyAMLException, match="can only resolve through TangoControlSystem" + ): + catalog.resolve("domain/family/member/attribute", ControlSystemAdapter()) + + +def test_tango_catalog_rejects_external_tango_control_system_class(): + class TangoControlSystem: + pass + + catalog = TangoCatalog(ConfigModel(name="tango-direct")) + + with pytest.raises( + pyaml.PyAMLException, match="can only resolve through TangoControlSystem" + ): + catalog.resolve("domain/family/member/attribute", TangoControlSystem()) + + +def test_tango_catalog_requires_control_system_attachment(): + catalog = TangoCatalog(ConfigModel(name="tango-direct")) + + with pytest.raises( + pyaml.PyAMLException, match="needs a TangoControlSystem context" + ): + catalog.resolve("domain/family/member/attribute") + + +def test_tango_catalog_rejects_invalid_tango_reference(): + catalog = TangoCatalog(ConfigModel(name="tango-direct")) + control_system = build_control_system(catalog) + + with pytest.raises( + pyaml.PyAMLException, match="Expected 'domain/family/member/attribute'" + ): + catalog.resolve("domain/family/member", control_system) + + +def test_tango_catalog_rejects_invalid_index(): + catalog = TangoCatalog(ConfigModel(name="tango-direct")) + control_system = build_control_system(catalog) + + with pytest.raises(pyaml.PyAMLException, match="invalid index"): + catalog.resolve("domain/family/member/attribute@notanint", control_system) + + +def test_tango_catalog_disconnected_resolves_indexed_attribute(): + catalog = TangoCatalog(ConfigModel(name="tango-direct", disconnected=True)) + control_system = build_control_system(catalog) + + with patch("tango.AttributeProxy") as attr_proxy: + device = catalog.resolve("domain/family/member/attribute@1", control_system) + + attr_proxy.assert_not_called() + assert isinstance(device, Attribute) and device._index is not None + assert device.name() == "domain/family/member/attribute[1]" + assert device.unit() == "" + assert device.get_range() == [None, None] + + +def test_tango_catalog_connected_resolves_indexed_writable_spectrum(): + attr_config = MockedAttributeInfoEx( + name="position", + writable=tango.AttrWriteType.READ_WRITE, + unit="mm", + data_format=tango.AttrDataFormat.SPECTRUM, + ) + catalog = TangoCatalog(ConfigModel(name="tango-direct")) + control_system = build_control_system(catalog) + + with patch( + "tango.AttributeProxy", + return_value=MockedAttributeProxy("domain/family/member/position", attr_config), + ): + device = catalog.resolve("domain/family/member/position@0", control_system) + + assert isinstance(device, Attribute) and device._index is not None + assert not isinstance(device, AttributeReadOnly) + assert device.name() == "domain/family/member/position[0]" + assert device.unit() == "mm" + assert ( + catalog.get_data_format("domain/family/member/position@0", control_system) + == tango.AttrDataFormat.SPECTRUM + ) + + +def test_tango_catalog_connected_resolves_indexed_read_only_spectrum(): + attr_config = MockedAttributeInfoEx( + name="position", + writable=tango.AttrWriteType.READ, + unit="mm", + data_format=tango.AttrDataFormat.SPECTRUM, + ) + catalog = TangoCatalog(ConfigModel(name="tango-direct")) + control_system = build_control_system(catalog) + + with patch( + "tango.AttributeProxy", + return_value=MockedAttributeProxy("domain/family/member/position", attr_config), + ): + device = catalog.resolve("domain/family/member/position@2", control_system) + + assert isinstance(device, AttributeReadOnly) and device._index is not None + assert device.unit() == "mm" + + +def test_tango_catalog_connected_rejects_indexed_scalar_attribute(): + attr_config = MockedAttributeInfoEx( + name="current", + writable=tango.AttrWriteType.READ_WRITE, + data_format=tango.AttrDataFormat.SCALAR, + ) + catalog = TangoCatalog(ConfigModel(name="tango-direct")) + control_system = build_control_system(catalog) + + with patch( + "tango.AttributeProxy", + return_value=MockedAttributeProxy("domain/family/member/current", attr_config), + ): + with pytest.raises(pyaml.PyAMLException, match="not a SPECTRUM"): + catalog.resolve("domain/family/member/current@0", control_system) + + +def test_tango_catalog_indexed_caches_resolved_devices(): + attr_config = MockedAttributeInfoEx( + name="position", + data_format=tango.AttrDataFormat.SPECTRUM, + ) + catalog = TangoCatalog(ConfigModel(name="tango-direct")) + control_system = build_control_system(catalog) + + with patch( + "tango.AttributeProxy", + return_value=MockedAttributeProxy("domain/family/member/position", attr_config), + ) as attr_proxy: + first = catalog.resolve("domain/family/member/position@1", control_system) + second = catalog.resolve("domain/family/member/position@1", control_system) + + attr_proxy.assert_called_once_with("domain/family/member/position") + assert first is second + + +def test_tango_catalog_wraps_tango_errors(): + catalog = TangoCatalog(ConfigModel(name="tango-direct")) + control_system = build_control_system(catalog) + + with patch("tango.AttributeProxy", side_effect=tango.DevFailed()): + with pytest.raises( + pyaml.PyAMLException, + match="Tango catalog 'tango-direct' cannot resolve 'domain/family/member/attribute'", + ): + catalog.resolve("domain/family/member/attribute", control_system) + + +def test_tango_catalog_rejects_incomplete_tango_config(): + class IncompleteAttributeConfig: + unit = "A" + min_value = "-1" + max_value = "1" + data_format = tango.AttrDataFormat.SCALAR + + catalog = TangoCatalog(ConfigModel(name="tango-direct")) + control_system = build_control_system(catalog) + + with patch( + "tango.AttributeProxy", + return_value=MockedAttributeProxy( + "domain/family/member/attribute", IncompleteAttributeConfig() + ), + ): + with pytest.raises( + pyaml.PyAMLException, + match="incomplete Tango attribute config, missing 'writable'", + ): + catalog.resolve("domain/family/member/attribute", control_system)