Source code for chipscopy.api.ibert.eye_scan

# Copyright (C) 2021-2022, Xilinx, Inc.
# Copyright (C) 2022-2023, Advanced Micro Devices, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime
from threading import Event
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Set, Tuple

from rich.box import SQUARE as BOX_SQUARE
from rich.table import Table

from chipscopy.api.ibert.aliases import (
    EYE_SCAN_2D_PLOT,
    EYE_SCAN_2D_PLOT_BER_FLOOR_VALUE,
    EYE_SCAN_2D_PLOT_DATA,
    EYE_SCAN_ABORTED,
    EYE_SCAN_DONE,
    EYE_SCAN_ERROR_COUNT,
    EYE_SCAN_HORZ_RANGE,
    EYE_SCAN_IN_PROGRESS,
    EYE_SCAN_NOT_STARTED,
    EYE_SCAN_PRESCALE,
    EYE_SCAN_PROGRESS,
    EYE_SCAN_RAW_DATA,
    EYE_SCAN_SAMPLE_COUNT,
    EYE_SCAN_SCAN_PARAMETERS,
    EYE_SCAN_START_TIME,
    EYE_SCAN_STATUS,
    EYE_SCAN_STOP_TIME,
    EYE_SCAN_TOTAL_NO_OF_DATA_POINTS_EXPECTED,
    EYE_SCAN_TOTAL_NO_OF_DATA_POINTS_READ,
    EYE_SCAN_UT,
    EYE_SCAN_VERT_RANGE,
    MB_ELF_VERSION,
    EYE_SCAN_HORZ_STEP,
    EYE_SCAN_VERT_STEP,
    EYE_SCAN_MAX_HORZ_RANGE,
    EYE_SCAN_MIN_HORZ_RANGE,
    EYE_SCAN_MAX_VERT_RANGE,
    EYE_SCAN_MIN_VERT_RANGE,
)
from chipscopy.api.ibert.rx import RX
from chipscopy.api.ibert.eye_scan.params import EyeScanParam
from chipscopy.api.ibert.eye_scan.plotter import EyeScanPlot
from chipscopy.utils.printer import printer, PercentProgressBar

if TYPE_CHECKING:  # pragma: no cover
    from chipscopy.dm import Node

common_progress = PercentProgressBar()


[docs]@dataclass class ScanPoint: x: int y: int ber: float errors: int samples: int
[docs]@dataclass(frozen=True) class Plot2DData: scan_points: Dict[Tuple[int, int], ScanPoint] """Collection of :py:class:`ScanPoint` instances to represent the 2D eye scan plot""" ber_floor_value: float """This value is used as the ``ber`` value for any X, Y coordinate whose computed BER was 0"""
[docs]@dataclass class RawData: """ Class for storing raw data from the MicroBlaze. The size of all the lists in this class will be the same """ ut: List[int] prescale: List[int] error_count: List[int] sample_count: List[int] vertical_range: List[int] horizontal_range: List[int]
[docs]@dataclass class ScanData: """ Class for storing raw scan data and 2D plot data """ raw: RawData """Raw scan data from the MicroBlaze""" all_params: dict """All parameters the server used to run the scan in the MicroBlaze""" processed: Plot2DData = None """2D plot data. This will only be available if the scan completed successfully"""
[docs]@dataclass class MetricData: """ Class for storing scan metric data """ open_area: int """open area in the eye scan""" open_percentage: float """open area percentage of the eye scan""" horizontal_opening: int """zero offset horizontal opening of the eye scan""" horizontal_percentage: float """zero offset horizontal opening percent of the eye scan""" vertical_opening: int """zero offset vertical opening of the eye scan""" vertical_percentage: float """zero offset horizontal opening percent of the eye scan"""
[docs]@dataclass class EyeScan: """ Class for interacting with eye scans. **Please do not** create an instance of this class directly. Please use the factory method :py:func:`~chipscopy.api.ibert.create_eye_scans` instead. """ rx: RX """:py:class:`RX` object attached to this eye scan""" name: str """Name of the eye scan""" params: Dict[str, EyeScanParam] = field(default_factory=dict) """Dictionary containing the py:class:`EyeScanParam` instance for every available param""" done_callback: Callable[["EyeScan"], None] = None """Callback function called when eye scan has ended""" progress_callback: Callable[[float], None] = None """Callback function called when eye scan update is received""" error: str = "" """String detailing reason for scan abort""" filter_by: Dict[str, Any] = field(default_factory=dict) status: str = EYE_SCAN_NOT_STARTED """Status of the eye scan""" progress: float = -1.0 """Progress of the eye scan in %""" stop_time: datetime = None """Time stamp of when eye scan was stopped in cs_server""" start_time: datetime = None """Time stamp of when eye scan was started in cs_server""" elf_version: str = None """ELF version read from the MicroBlaze""" data_points_read: int = 0 """Number of data points scanned by the MicroBlaze""" data_points_expected: int = 0 """Total number of data points expected to be scanned by the MicroBlaze""" open_data_points: int = 0 """Number of data points scanned by the MicroBlaze""" scan_data: ScanData = None """Scan data stored in an instance of the :py:class:`ScanData` class""" metric_data: MetricData = None """metric data stored in an instance of :py:class:`MetricData` class""" plot: EyeScanPlot = None """`EyeScanPlot` instance for interacting with the plot""" _handle_from_cs_server: Optional[str] = None _task_id = None _scan_done_event: Event = field(default_factory=Event) def __repr__(self) -> str: return self.name def __post_init__(self): self.rx.eye_scan = self self.rx.eye_scan_names.append(self.name) self.filter_by = {"rx": self.rx, "name": self.name} if self.rx.link is not None: self.filter_by["link"] = self.rx.link all_params = self.rx.core_tcf_node.get_eye_scan_parameters(rx_name=self.rx.handle) valid_values = all_params["Valid Values"] default_values = all_params["Default Value"] for param in all_params["Parameter Names"]: self.params[param] = EyeScanParam(param) if param in valid_values: self.params[param].modifiable = True self.params[param].valid_values = valid_values[param] self.params[param].default_value = default_values[param] # Todo - Remove event listener after done with everything self.rx.property.endpoint_tcf_node.add_listener(self._scan_update_event_listener) def _clear_out_old_data(self): self.status = EYE_SCAN_NOT_STARTED self.progress = -1.0 self.stop_time = None self.start_time = None self.elf_version = None self.scan_data = None self.metric_data = None self.open_data_points = 0 self.data_points_read = 0 self.data_points_expected = 0 self._scan_done_event.clear() def _get_status(self) -> str: if self.status == EYE_SCAN_IN_PROGRESS: return PercentProgressBar.Status.IN_PROGRESS.value elif self.status == EYE_SCAN_ABORTED: return PercentProgressBar.Status.ABORTED.value elif self.status == EYE_SCAN_DONE: return PercentProgressBar.Status.DONE.value else: return f"[bold]{self.status}[/]"
[docs] def invalidate(self): self.rx.eye_scan = None self.rx.eye_scan_names.remove(self.name) self.rx = None self.name = "" self._clear_out_old_data()
def _calculate_vertical_horizontal_opening(self, plot_params): horizontal_middle = 0 max_horizontal = plot_params[EYE_SCAN_MAX_HORZ_RANGE] min_horizontal = plot_params[EYE_SCAN_MIN_HORZ_RANGE] horizontal_step = plot_params[EYE_SCAN_HORZ_STEP] if max_horizontal != -min_horizontal: middle = ((max_horizontal - min_horizontal) + 1) / 2 horizontal_middle = min_horizontal + middle - (min_horizontal % horizontal_step) vertical_middle = 0 max_vertical = plot_params[EYE_SCAN_MAX_VERT_RANGE] min_vertical = plot_params[EYE_SCAN_MIN_VERT_RANGE] vertical_step = plot_params[EYE_SCAN_VERT_STEP] if max_vertical != -min_vertical: middle = ((max_vertical - min_vertical) + 1) / 2 vertical_middle = min_vertical + middle - (min_vertical % vertical_step) zero_offset_horz_min = 1000 zero_offset_horz_max = -1000 zero_offset_vert_min = 1000 zero_offset_vert_max = -1000 zero_offset_vertical_count = 0 zero_offset_horizontal_count = 0 for _, data in self.scan_data.processed.scan_points.items(): if data.errors == 0: if data.x == horizontal_middle: zero_offset_vertical_count += 1 if data.y < zero_offset_vert_min: zero_offset_vert_min = data.y if data.y > zero_offset_vert_max: zero_offset_vert_max = data.y if data.y == vertical_middle: zero_offset_horizontal_count += 1 if data.x < zero_offset_horz_min: zero_offset_horz_min = data.x if data.x > zero_offset_horz_max: zero_offset_horz_max = data.x zero_offset_vert_opening = zero_offset_vert_max - zero_offset_vert_min zero_offset_horz_opening = zero_offset_horz_max - zero_offset_horz_min if zero_offset_vert_opening < 0: zero_offset_vert_opening = 0 if zero_offset_horz_opening < 0: zero_offset_horz_opening = 0 if zero_offset_vert_max >= 0 >= zero_offset_horz_min: zero_offset_vert_opening += 1 if zero_offset_horz_max >= 0 >= zero_offset_horz_min: zero_offset_horz_opening += 1 self.metric_data.vertical_opening = zero_offset_vert_opening self.metric_data.horizontal_opening = zero_offset_horz_opening num_of_columns = (max_horizontal - min_horizontal) / horizontal_step + 1 self.metric_data.horizontal_percentage = round( float((zero_offset_horizontal_count * 100) / num_of_columns), 2 ) num_of_rows = (max_vertical - min_vertical) / vertical_step + 1 self.metric_data.vertical_percentage = round( float((zero_offset_vertical_count * 100) / num_of_rows), 2 )
[docs] def start(self, *, show_progress_bar: bool = True): """ Send command to cs_server to start the eye scan in the HW. Args: show_progress_bar: Set to true to show progress bar on stdout """ self._clear_out_old_data() scan_params = dict() for param in self.params.values(): if param.value is not None: if not param.modifiable: printer( f"Scan parameter '{param.name} is not modifiable! It will be ignored", level="warning", ) continue scan_params[param.name] = param.value self._handle_from_cs_server = self.rx.core_tcf_node.start_eye_scan( rx_name=self.rx.handle, scan_parameters=scan_params, ) self._task_id = common_progress.add_task( description=f"{self.name} progress ", status=self._get_status(), visible=show_progress_bar, )
def _scan_update_event_listener(self, node: "Node", updated_properties: Set[str]): # NOTE - This is called on the TCF event dispatcher thread if len(updated_properties) == 0: return try: # TODO - Merge this condition with the one at the top if self._handle_from_cs_server not in updated_properties: return scan_report = node.props[self._handle_from_cs_server] # Not expected to change after scan start if self.start_time is None and EYE_SCAN_START_TIME in scan_report: self.start_time = datetime.strptime( scan_report[EYE_SCAN_START_TIME], "%Y-%m-%d %H:%M:%S.%f" ) # Not expected to change after scan stop if self.stop_time is None and EYE_SCAN_STOP_TIME in scan_report: self.stop_time = datetime.strptime( scan_report[EYE_SCAN_STOP_TIME], "%Y-%m-%d %H:%M:%S.%f" ) # Not expected to change after scan start if self.elf_version is None and MB_ELF_VERSION in scan_report: self.elf_version = scan_report[MB_ELF_VERSION] # Not expected to change after scan start if ( self.data_points_expected == 0 and EYE_SCAN_TOTAL_NO_OF_DATA_POINTS_EXPECTED in scan_report ): self.data_points_expected = int( scan_report[EYE_SCAN_TOTAL_NO_OF_DATA_POINTS_EXPECTED] ) if EYE_SCAN_TOTAL_NO_OF_DATA_POINTS_READ in scan_report: self.data_points_read = int(scan_report[EYE_SCAN_TOTAL_NO_OF_DATA_POINTS_READ]) if EYE_SCAN_SCAN_PARAMETERS in scan_report and EYE_SCAN_RAW_DATA in scan_report: new_ut = scan_report[EYE_SCAN_RAW_DATA][EYE_SCAN_UT] new_prescale = scan_report[EYE_SCAN_RAW_DATA][EYE_SCAN_PRESCALE] new_error_count = scan_report[EYE_SCAN_RAW_DATA][EYE_SCAN_ERROR_COUNT] new_sample_count = scan_report[EYE_SCAN_RAW_DATA][EYE_SCAN_SAMPLE_COUNT] new_vertical_range = scan_report[EYE_SCAN_RAW_DATA][EYE_SCAN_VERT_RANGE] new_horizontal_range = scan_report[EYE_SCAN_RAW_DATA][EYE_SCAN_HORZ_RANGE] if self.scan_data is None: self.scan_data = ScanData( raw=RawData( ut=new_ut, prescale=new_prescale, error_count=new_error_count, sample_count=new_sample_count, vertical_range=new_vertical_range, horizontal_range=new_horizontal_range, ), all_params=scan_report[EYE_SCAN_SCAN_PARAMETERS], ) else: self.scan_data.raw.ut.extend(new_ut) self.scan_data.raw.prescale.extend(new_prescale) self.scan_data.raw.error_count.extend(new_error_count) self.scan_data.raw.sample_count.extend(new_sample_count) self.scan_data.raw.vertical_range.extend(new_vertical_range) self.scan_data.raw.horizontal_range.extend(new_horizontal_range) if EYE_SCAN_2D_PLOT in scan_report: ber_floor_value = scan_report[EYE_SCAN_2D_PLOT][EYE_SCAN_2D_PLOT_BER_FLOOR_VALUE] plot_params = scan_report[EYE_SCAN_SCAN_PARAMETERS] if self.metric_data is None: self.metric_data = MetricData( open_area=0, open_percentage=0, vertical_opening=0, vertical_percentage=0, horizontal_opening=0, horizontal_percentage=0, ) scan_points: Dict[Tuple[int, int], ScanPoint] = dict() for key, data in scan_report[EYE_SCAN_2D_PLOT][EYE_SCAN_2D_PLOT_DATA].items(): x, y = [int(coordinate) for coordinate in key.split(", ")] ber = data["BER"] errors = data["Errors"] samples = data["Sample"] data_point = (x, y) if data_point in scan_points: scan_point = scan_points[data_point] # Combine existing BER with new BER scan_point.ber += ber # Combine errors if new error != 0 and old error == 0 if errors != 0 and scan_point.errors == 0: scan_point.errors = errors self.open_data_points -= 1 self.metric_data.open_area -= ( plot_params[EYE_SCAN_HORZ_STEP] * plot_params[EYE_SCAN_VERT_STEP] ) else: scan_points[data_point] = ScanPoint(x, y, ber, errors, samples) if errors == 0: self.open_data_points += 1 self.metric_data.open_area += ( plot_params[EYE_SCAN_HORZ_STEP] * plot_params[EYE_SCAN_VERT_STEP] ) self.scan_data.processed = Plot2DData( scan_points=scan_points, ber_floor_value=ber_floor_value ) self.metric_data.open_percentage = round( float((self.open_data_points * 100) / len(scan_points)), 2 ) self._calculate_vertical_horizontal_opening(plot_params) if EYE_SCAN_PROGRESS in scan_report: self.progress = round(float(scan_report[EYE_SCAN_PROGRESS]), 2) if EYE_SCAN_STATUS in scan_report: self.status = scan_report[EYE_SCAN_STATUS] # If rich pkg is available send update common_progress.update( task_id=self._task_id, completed=int(self.progress), status=self._get_status(), ) if self.status in {EYE_SCAN_ABORTED, EYE_SCAN_DONE}: # If scan is done create the plot if self.status == EYE_SCAN_DONE: self.plot = EyeScanPlot(self) else: assert self.status == EYE_SCAN_ABORTED self.error = scan_report.get("Error", "Not available") printer(f"Reason for aborting {self.name} - {self.error}", level="info") # If user has registered done callback function call it. if callable(self.done_callback): try: self.done_callback(self) except Exception as e: printer( f"Unhandled exception during eye scan done callback!\n" f"Exception - {str(e)}", level="warning", ) self._scan_done_event.set() else: # If user has registered progress callback function call it. if callable(self.progress_callback): try: self.progress_callback(self.progress) except Exception as e: printer( f"Unhandled exception during eye scan progress callback!\n" f"Exception - {str(e)}", level="warning", ) except Exception as e: printer( f"Unhandled scan update exception on TCF thread!\nException - {str(e)}", level="warning", )
[docs] def wait_till_done(self): """ Block current thread execution till the eye scan completes OR it is aborted by cs_server/HW. """ if self._handle_from_cs_server is None: raise RuntimeError(f"Please start the scan before waiting for it to finish!") self._scan_done_event.wait() self._handle_from_cs_server = None
[docs] def stop(self): """ Stop eye scan, that is in-progress in the MicroBlaze """ self.rx.core_tcf_node.terminate_eye_scan(rx_name=self.rx.handle)
[docs] def generate_report(self): """ Generate a report for this eye scan object and send it to the "printer" for printing """ report = Table(title=f"{self.name} report", box=BOX_SQUARE) report.add_column("Property", justify="right") report.add_column("Value", justify="left") report.add_row(f"RX", f"{self.rx.handle}") report.add_row(f"Status", self._get_status()) report.add_row(f"Progress", f"{self.progress}%") report.add_row(f"Start time", f"{self.start_time}") report.add_row(f"Stop time", f"{self.stop_time}") report.add_row(f"MB ELF version", f"{self.elf_version}") report.add_row( f"#Data points expected", f"{self.data_points_expected}", ) report.add_row(f"#Data points read", f"{self.data_points_read}") report.add_row("", "") scan_params = Table(box=BOX_SQUARE) scan_params.add_column("Parameter", justify="right") scan_params.add_column("Value", justify="left") for param in self.params.values(): scan_params.add_row( f"{param.name}", f"{param.default_value if param.value is None else param.value}", ) report.add_row(f"Scan parameters", scan_params) printer("\n") printer(report) printer("\n")