Source code for chipscopy.api.ibert.serial_object_base

# Copyright (C) 2021-2022, Xilinx, Inc.
# Copyright (C) 2022-2023, Advanced Micro Devices, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import builtins
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    Generic,
    List,
    Set,
    Tuple,
    Type,
    TypeVar,
    Union,
)

from rich.tree import Tree

from chipscopy.api._detail.property import PropertyCommands, Watchlist, WatchlistEventHandler
from chipscopy.api.containers import QueryList
from chipscopy.api.ibert.aliases import (
    DISPLAY_NAME,
    TYPE,
    HANDLE_NAME,
    PROPERTY_ENDPOINT,
    ALIAS_DICT,
    MODIFIABLE_ALIASES,
)
from chipscopy.client.ibert_core_client import IBERTCoreClient
from typing_extensions import Final

if TYPE_CHECKING:  # pragma: no cover
    from chipscopy.api.ibert import IBERT
    from chipscopy.api.ibert.gt_group import GTGroup


class IBERTWatchlist(Watchlist["IBERTPropertyCommands"]):
    def __init__(self, parent):
        Watchlist.__init__(self, parent)

    def add(
        self,
        property_names: Union[str, List[str]],
        *,
        listeners: Union[WatchlistEventHandler, List[WatchlistEventHandler]] = None,
    ):
        """
        Add properties to the watchlist

        Args:
            property_names: Property name(s)
            listeners: Callback to be called when the property update event is received from cs_server

        """
        if listeners is not None and not isinstance(listeners, list):
            listeners = [listeners]

        sanitized_data = PropertyCommands.sanitize_input(property_names, list)

        watch_id = self._get_uuid()
        with self._lock:
            if not self._tcf_event_listener_registered:
                # NOTE - Imp to use the "endpoint_tcf_node" and not the "core_tcf_node"
                # cs_server will send updates via the node that is the property endpoint
                self.parent.endpoint_tcf_node.add_listener(self._mandatory_event_listener)
                self._tcf_event_listener_registered = True

            self._listeners[watch_id].extend(listeners)

            for property_name in sanitized_data:
                self._watch_ids_for_property[property_name].add(watch_id)

        try:
            self.core_tcf_node.add_to_property_watchlist(sanitized_data, self.parent.endpoint_name)
        except Exception:
            with self._lock:
                for property_name in sanitized_data:
                    self._watch_ids_for_property[property_name].remove(watch_id)
                del self._listeners[watch_id]

    def remove(self, property_names: Union[str, List[str]]):
        """
        Remove properties from watchlist

        Args:
            property_names: Property name(s)

        """
        sanitized_data = {
            prop
            for prop in PropertyCommands.sanitize_input(property_names, list)
            if prop in self._watch_ids_for_property
        }

        self.core_tcf_node.remove_from_property_watchlist(sanitized_data, self.parent.endpoint_name)

        for prop in sanitized_data:
            del self._watch_ids_for_property[prop]


class IBERTPropertyCommands(PropertyCommands["SerialObjectBase"]):
    def __init__(self, parent, property_endpoint):
        PropertyCommands.__init__(self, parent)

        self.watchlist = IBERTWatchlist(self)
        self.endpoint_name: Final[str] = property_endpoint.name
        self.endpoint_tcf_node = self.core_tcf_node.get_child_with_name(self.endpoint_name)

    """
    NOTE - The 'endpoint_tcf_node' is different from 'core_tcf_node'.
    core_tcf_node - Points to the IBERTCoreClient class and is needed to call server commands
    _endpoint_tcf_node - Points to the Node class which is a child of core_tcf_node. 
     Needed to access props attached to a node

    Example with IBERT hierarchy 
        IBERT Versal GTY  <-- core_tcf_node
        ├── Quad_206 <-- if self.endpoint_name == "Quad_206", endpoint_tcf_node points to this
        ├── Quad_204 <-- if self.endpoint_name == "Quad_204", endpoint_tcf_node points to this 
        ├── Quad_205 <-- if self.endpoint_name == "Quad_205", endpoint_tcf_node points to this 
        ├── Quad_203 <-- if self.endpoint_name == "Quad_203", endpoint_tcf_node points to this 
        ├── Quad_202 <-- if self.endpoint_name == "Quad_202", endpoint_tcf_node points to this  
        └── Quad_201 <-- if self.endpoint_name == "Quad_201", endpoint_tcf_node points to this
    """

    def _segregate_props(self, all_property_names) -> Tuple[Set[str], Set[str]]:
        all_property_names = set(all_property_names)

        client_side = set(self.endpoint_tcf_node.props.keys()).intersection(all_property_names)
        # Only use properties that are still in the watchlist. If a prop is deleted in the server
        # TCF node, it wont get deleted in the client TCF node. The prop value in that case would
        # be stale, since it's no longer updated via TCF property update events from the server.
        client_side.intersection_update(self.watchlist.active_properties)

        server_side = all_property_names.difference(client_side)
        return client_side, server_side

    def get(self, property_names: Union[str, List[str]]) -> Dict[str, Any]:
        """
        Get the property value cached in cs_server

        Args:
            property_names: Property name(s)

        Returns:
            Dictionary with property name and value as key, value pairs.

        """
        sanitized_data = PropertyCommands.sanitize_input(property_names, list)
        client_side_props, server_side_props = self._segregate_props(sanitized_data)

        all_property_values = dict()

        # Fetch client side properties from Node.props
        for property_name in client_side_props:
            all_property_values[property_name] = self.endpoint_tcf_node.props[property_name]

        if len(server_side_props) > 0:
            # If there are properties not available with the client, go to the server to fetch them
            all_property_values.update(
                self.core_tcf_node.get_property(list(server_side_props), self.endpoint_name)
            )

        return all_property_values

    def set(self, **property_dict):
        """
        Set new values for properties in cs_server

        Args:
            \*\*property_dict:
             Unpacked dict with key as property and value as new property value

        """
        sanitized_data = PropertyCommands.sanitize_input(property_dict, dict)
        return self.core_tcf_node.set_property(sanitized_data, self.endpoint_name)

    def refresh(self, property_names: Union[str, List[str]]) -> Dict[str, Any]:
        """
        Refresh the value of properties and update it in cs_server

        Args:
            property_names: Property name(s)

        Returns:
            Dictionary with property name and refreshed value as key, value pairs.

        """
        sanitized_data = PropertyCommands.sanitize_input(property_names, list)
        return self.core_tcf_node.refresh_property(sanitized_data, self.endpoint_name)

    def commit(self, property_names: Union[str, List[str]]):
        """
        Commit the value of properties to HW

        Args:
            property_names: Property name(s)

        """
        sanitized_data = PropertyCommands.sanitize_input(property_names, list)
        return self.core_tcf_node.commit_property(sanitized_data, self.endpoint_name)

    def report(self, property_names: Union[str, List[str]] = None) -> Dict[str, Dict[str, Any]]:
        """
        Generate a report providing detailed information about the properties.

        Args:
            property_names: Property name(s)

        Returns:
            Dictionary with property name and information as key, value pairs.

        """
        sanitized_data = []
        if property_names is not None:
            sanitized_data = PropertyCommands.sanitize_input(property_names, list)
        return self.core_tcf_node.report_property(sanitized_data, self.endpoint_name)


parent_type = TypeVar("parent_type")
child_type = TypeVar("child_type")


[docs]class SerialObjectBase(Generic[parent_type, child_type]): """ Abstract base class for all serial objects """ def __init__( self, obj_info: Dict[str, Any], parent: parent_type, core_tcf_node: IBERTCoreClient ): self.name: Final[str] = obj_info[DISPLAY_NAME] """Object name""" self.type: Final[str] = obj_info[TYPE] """Object type""" self.handle: Final[str] = obj_info[HANDLE_NAME] """Object handle from cs_server""" self.parent: Final[parent_type] = parent """Parent of this object""" self._children: QueryList[child_type] = QueryList() self.core_tcf_node: Final[IBERTCoreClient] = core_tcf_node self._property_for_alias: Dict[str, str] = dict() self._modifiable_aliases: Set[str] = set() self._property_endpoint: Type[SerialObjectBase] if obj_info[PROPERTY_ENDPOINT]: self._property_endpoint = self else: if self.parent._property_endpoint is None: raise ValueError( f"Property endpoint for {self.parent.handle} is None! " f"Cannot determine property endpoint for {self.handle}" ) self._property_endpoint = self.parent._property_endpoint self._property: IBERTPropertyCommands = IBERTPropertyCommands( parent=self, property_endpoint=self._property_endpoint, ) self.filter_by = {} self.setup_done: bool = False def __repr__(self): return f"{self.handle}({self.type})" def __rich_tree__(self): root = Tree(self.name) for child in self.children: try: root.add(child.__rich_tree__()) except AttributeError: root.add(child.name) return root # Explicitly use property from builtins here, since this class has instance var with same name @builtins.property def aliases(self) -> Set[str]: """All available aliases""" return set(self.property_for_alias.keys()) @builtins.property def children(self) -> QueryList[child_type]: """Direct children of this object""" self.setup() return self._children @builtins.property def modifiable_aliases(self) -> Set[str]: """Aliases that support value modification""" self.setup() return self._modifiable_aliases @builtins.property def property(self) -> IBERTPropertyCommands: self.setup() return self._property @builtins.property def property_for_alias(self) -> Dict[str, str]: """Alias to property name mapping""" self.setup() return self._property_for_alias def reset(self): raise NotImplementedError(f"Reset is not supported for {self.handle}!") def _build_aliases(self, obj_info: Dict[str, Any]): if obj_info.get(ALIAS_DICT): self._property_for_alias = obj_info[ALIAS_DICT] if obj_info.get(MODIFIABLE_ALIASES): self._modifiable_aliases = obj_info[MODIFIABLE_ALIASES] def setup(self): if self.setup_done: return obj_info = self.core_tcf_node.get_obj_info(self.handle) self._build_aliases(obj_info) self.setup_done = True