Source code for chipscopy.api.ibert.yk_scan

# Copyright (C) 2021-2022, Xilinx, Inc.
# Copyright (C) 2022-2023, Advanced Micro Devices, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime
from dataclasses import dataclass, field
from typing import Callable, Dict, Any, Optional, TYPE_CHECKING, Set, List

from chipscopy.api.ibert.aliases import (
    MB_ELF_VERSION,
    YK_SCAN_START_TIME,
    YK_SCAN_STOP_TIME,
    YK_SCAN_SLICER_DATA,
    YK_SCAN_SNR_VALUE,
)
from chipscopy.api.ibert.rx import RX
from chipscopy.utils.printer import printer

if TYPE_CHECKING:
    from chipscopy.dm import Node


[docs]@dataclass class YKSample: slicer: List[float] snr: float
[docs]@dataclass class YKScan: """ Class for interacting with YK scans. **Please do not** create an instance of this class directly. Please use the factory method :py:func:`~chipscopy.api.ibert.create_yk_scans` instead. """ rx: RX """:py:class:`RX` object attached to this eye scan""" name: str """Name of the eye scan""" updates_callback: Callable[["YKScan"], None] = None """Callback function called when eye scan has ended""" filter_by: Dict[str, Any] = field(default_factory=dict) scan_data: List[YKSample] = field(default_factory=list) """YK scan data samples in the order they are received""" stop_time: datetime = None """Time stamp of when eye scan was stopped in cs_server""" start_time: datetime = None """Time stamp of when eye scan was started in cs_server""" elf_version: str = None """ELF version read from the MicroBlaze""" _handle_from_cs_server: Optional[str] = None def __repr__(self): return self.name def __post_init__(self): self.rx.yk_scan = self self.filter_by = {"rx": self.rx, "name": self.name} self.rx.property.endpoint_tcf_node.add_listener(self._update_event_listener)
[docs] def start(self): self._handle_from_cs_server = self.rx.core_tcf_node.start_yk_scan(rx_name=self.rx.handle)
def _update_event_listener(self, node: "Node", updated_properties: Set[str]): # NOTE - This is called on the TCF event dispatcher thread if len(updated_properties) == 0 or self._handle_from_cs_server not in updated_properties: return try: report = node.props[self._handle_from_cs_server] if self.start_time is None and YK_SCAN_START_TIME in report: self.start_time = datetime.strptime( report[YK_SCAN_START_TIME], "%Y-%m-%d %H:%M:%S.%f" ) # Not expected to change after scan stop if self.stop_time is None and YK_SCAN_STOP_TIME in report: self.stop_time = datetime.strptime( report[YK_SCAN_STOP_TIME], "%Y-%m-%d %H:%M:%S.%f" ) # Not expected to change after scan start if self.elf_version is None and MB_ELF_VERSION in report: self.elf_version = report[MB_ELF_VERSION] if YK_SCAN_SLICER_DATA in report and YK_SCAN_SNR_VALUE in report: self.scan_data.append( YKSample(report[YK_SCAN_SLICER_DATA], report[YK_SCAN_SNR_VALUE]) ) # If user has registered done callback function call it. if callable(self.updates_callback): try: self.updates_callback(self) except Exception as e: printer( f"Unhandled exception during YK scan update callback!\n" f"Exception - {str(e)}", level="warning", ) except Exception as e: printer( f"Unhandled YK scan update exception on TCF thread!\nException - {str(e)}", level="warning", )
[docs] def stop(self): """ Stop eye scan, that is in-progress in the MicroBlaze """ self.rx.core_tcf_node.terminate_yk_scan(rx_name=self.rx.handle)