Source code for chipscopy.api.device.device

# Copyright (C) 2021-2022, Xilinx, Inc.
# Copyright (C) 2022-2023, Advanced Micro Devices, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
import re
import sys
import time
from collections import defaultdict, deque
from enum import Enum
from pathlib import Path
from typing import Optional, Union, Dict, List, Set, Any, NewType, Literal
from struct import pack, unpack

from chipscopy.api import DMNodeListener
from chipscopy.api._detail.ltx import parse_ltx_files  # noqa
from chipscopy.api._detail.trace import Trace  # noqa
from chipscopy.api.containers import QueryList
from chipscopy.api.ddr.ddr import DDR
from chipscopy.api.device.client_program_log import ClientProgramLogStrategy
from chipscopy.api.device.device_spec import DeviceSpec
from chipscopy.api.hbm.hbm import HBM
from chipscopy.api.ibert import IBERT
from chipscopy.api.ila import ILA
from chipscopy.api.memory import Memory
from chipscopy.api.noc.noc import NocPerfmon
from chipscopy.api.pcie import PCIe
from chipscopy.api.sysmon import Sysmon
from chipscopy.api.vio import VIO
from chipscopy.client import ServerInfo
from chipscopy.client.axis_ila_core_client import AxisIlaCoreClient
from chipscopy.client.axis_pcie_core_client import AxisPCIeCoreClient
from chipscopy.client.axis_trace_core_client import AxisTraceClient
from chipscopy.client.axis_vio_core_client import AxisVIOCoreClient
from chipscopy.client.core import CoreParent
from chipscopy.client.ddrmc_client import DDRMCClient
from chipscopy.client.hbm_client import HBMClient
from chipscopy.client.ibert_core_client import IBERTCoreClient
from chipscopy.client.jtagdevice import JtagCable, JtagDevice
from chipscopy.client.mem import MemoryNode
from chipscopy.client.noc_perfmon_core_client import NoCPerfMonCoreClient
from chipscopy.client.sysmon_core_client import SysMonCoreClient
from chipscopy.dm import chipscope, request, Node
from chipscopy.utils.printer import printer, PercentProgressBar
from chipscopy.api.device.device_scanner import (
    scan_all_views,
)
from chipscopy.api.device.device_util import copy_node_props
from chipscopy.utils.logger import log


[docs]class FeatureNotAvailableError(Exception): def __init__(self): super().__init__("Feature not available on this architecture")
[docs]class DeviceException(Exception): pass
[docs]class DeviceState(Enum): VALID = 0 NEEDS_REFRESH = 1 INVALID = 2
[docs]class DeviceFamily(Enum): GENERIC = "generic" UPLUS = "ultrascaleplus" VERSAL = "versal" XVC = "xvc" @staticmethod def get_family_for_name(family_name: str) -> "DeviceFamily": if family_name == "versal": return DeviceFamily.VERSAL elif family_name.startswith("xvc"): return DeviceFamily.XVC elif family_name.endswith("uplus") or family_name.endswith("u"): return DeviceFamily.UPLUS else: return DeviceFamily.GENERIC
[docs]class Device: """The Device class represents a single device. It is the base class for other Xilinx or Generic devices. Depending on the device type, some features may be available or unavailable. Device has the kitchen sink of properties functions to make documentation and type hints easy. Child classes raise FeatureNotAvailable for functions they do not support. """ _CORE_TYPE_TO_CLASS_MAP = { "vio": AxisVIOCoreClient, "ila": AxisIlaCoreClient, "trace": AxisTraceClient, # todo: remove after hw_server is updated. "Core": AxisTraceClient, "pcie": AxisPCIeCoreClient, "npi_nir": NoCPerfMonCoreClient, "ibert": IBERTCoreClient, "sysmon": SysMonCoreClient, "ddrmc_main": DDRMCClient, "hbm": HBMClient, } def __init__( self, *, hw_server: ServerInfo, cs_server: Optional[ServerInfo], device_spec: DeviceSpec, disable_core_scan: bool, disable_cache: bool, ): self.state = DeviceState.NEEDS_REFRESH self.cached_props = {} self.hw_server = hw_server self.cs_server = cs_server self._device_spec = device_spec self.default_target: Literal["DPC", "DAP"] = "DPC" self.family_name = self._device_spec.get_arch_name() self.device_family = DeviceFamily.get_family_for_name(self.family_name) self.part_name = self._device_spec.get_part_name() self.dna = self._device_spec.get_dna() self.jtag_index = self._device_spec.jtag_index self.disable_core_scan = disable_core_scan self.disable_cache = disable_cache self.ltx = None self.ltx_sources = [] self.cores_to_scan = {} # set during discover_and_setup_cores self.programming_error: Optional[str] = None # Keep error for query after programming self.refresh() # The following filter_by becomes a dictionary with architecture, jtag_index, context, etc. # This is used when asking for a device by filter from the device list like: # report_devices(session.devices.get(dna=".*1234")) # Needs improvement because QueryList filtering does not work with hierarchy # Should always be the last part of __init__ because of to_dict() call... self.filter_by = self.to_dict() def __str__(self) -> str: return f"{self.part_name}:{self.dna}:{self.context}" def __repr__(self) -> str: return self.to_json() def __getitem__(self, item): props = self.to_dict() if item in props: return props[item] else: raise AttributeError(f"No property {str(item)}") def _raise_if_state_invalid(self): if self.state == DeviceState.INVALID: raise DeviceException(f"Device {str(self)} is invalid - session rescan required") def refresh(self, *, force_update=True): # Refresh device properties from hardware self._raise_if_state_invalid() self.cached_props = self.scan_props(force_update=force_update) if not self.disable_cache: self.state = DeviceState.VALID def node_callback(self, action: DMNodeListener.NodeAction, node: Node, props: Optional[Set]): # Called from session when a TCF node event happens # TODO: TypeError: 'ParodyLogger' object is not callable # log.client.trace(f"node_callback: action={action}, node={node.ctx}") if not node or action == DMNodeListener.NodeAction.NODE_ADDED: # Maybe in the future we will track if a child node was added to anything in the # Device tree... For now just ignore. return # Top level device contexts that can represent the device - track for device removal root_device_contexts = {self._device_spec.device_ctx, self._device_spec.jtag_device_ctx} # Expanded set of device contexts that are important to track for node changes all_device_contexts = { self._device_spec.device_ctx, self._device_spec.jtag_device_ctx, self._device_spec.debugcore_dap_ctx, self._device_spec.debugcore_dpc_ctx, self._device_spec.memory_dap_ctx, self._device_spec.memory_dpc_ctx, self._device_spec.chipscope_dap_ctx, self._device_spec.chipscope_dpc_ctx, } if action == DMNodeListener.NodeAction.NODE_REMOVED and node.ctx in root_device_contexts: # Removed a node that is the anchor for this device... Invalidate the device so any subsequent # user API calls return an invalid device message # TODO: TypeError: 'ParodyLogger' object is not callable # log.client.warn(f"DEVICE {str(self)} marked invalid -- rescan required!") self.state = DeviceState.INVALID device_node = self._device_spec.get_device_node() if device_node: # Remove any cached information for the invalid device device_node.device_wrapper = None elif action == DMNodeListener.NodeAction.NODE_CHANGED and node.ctx in all_device_contexts: # One of our watched nodes changed - need to re-scan properties from hardware on next call # TODO: TypeError: 'ParodyLogger' object is not callable # log.client.debug( # f"NODE_CHANGED on device {str(self)}, Node: {node.ctx} - setting NEEDS_REFRESH" # ) self.state = DeviceState.NEEDS_REFRESH
[docs] def scan_props(self, *, force_update=True) -> Dict[str, Any]: """ Scan device properties from hardware. Returns: Dict of properties """ self._raise_if_state_invalid() props = {} if self.hw_server and self.jtag_node: node = self.jtag_node props["jtag"] = copy_node_props(node, node.props, force_update=force_update) props["is_valid"] = self.is_valid props["context"] = self.context props["is_programmed"] = self.scan_is_programmed(force_update=force_update) props["is_programmable"] = self.scan_is_programmable() props["part"] = self.part_name props["family"] = self.family_name props["dna"] = self.dna props["jtag_index"] = self.jtag_index props["cable_context"] = self._device_spec.jtag_cable_ctx props["jtag_context"] = self._device_spec.jtag_device_ctx props["cable_name"] = self._device_spec.get_jtag_cable_name() props = dict(sorted(props.items())) # noqa return props
[docs] def to_dict(self) -> Dict[str, Any]: """Returns a dictionary representation of the device data""" self._raise_if_state_invalid() if self.state == DeviceState.NEEDS_REFRESH: self.refresh() return self.cached_props
[docs] def to_json(self) -> str: """Returns a json representation of the device data""" raw_json = json.dumps(self.to_dict(), indent=4, default=lambda o: str(o)) return raw_json
# NODE / LOW LEVEL @property def context(self) -> str: """Returns the unique context for this device. Normally this is the jtag node context. If this device is not on a jtag chain, the top level device context will be a non-jtag node. """ return self._device_spec.device_ctx
[docs] def scan_is_programmed(self, *, force_update=True) -> bool: """Returns True if this Device is programmed, False otherwise.""" # TODO: Queries hardware for jtag program status. This could be slow - # is there a way to collect this once, then track by subsequent events pushed from # the hw_server if we detect DONE went low? self._raise_if_state_invalid() is_programmed_ = False try: node = self.jtag_node is_programmable = node.props.get("is_programmable") if is_programmable: regs = node.props.get("regs") if regs: jtag_status = node.props.get("regs").get("jtag_status") config_status = node.props.get("regs").get("config_status") if jtag_status: if force_update: node.update_regs(reg_names=("jtag_status",), force=True, done=None) done_reg = jtag_status.fields[0]["DONE"] is_programmed_ = done_reg["value"] == 1 elif config_status: if force_update: node.update_regs(reg_names=("config_status",), force=True, done=None) done_reg = config_status.fields["DONE_PIN"] is_programmed_ = done_reg["value"] == 1 except Exception: # noqa is_programmed_ = False return is_programmed_
@property def is_programmed(self) -> bool: """Returns True if this Device is programmed, False otherwise.""" d = self.to_dict() return d.get("is_programmed", False)
[docs] def scan_is_programmable(self) -> bool: """Returns True if this Device is programmable, False otherwise""" is_programmable_ = False node = self.jtag_node if node: return node.props.get("is_programmable", False)
@property def is_programmable(self) -> bool: """Returns True if this Device is programmable, False otherwise""" d = self.to_dict() return d.get("is_programmable", False) @property def is_valid(self) -> bool: """Returns True if this Node is valid, False otherwise.""" return self._device_spec.is_valid @property def jtag_cable_node(self) -> Optional[JtagCable]: """Low level TCF node access - For advanced use""" return self._device_spec.get_jtag_cable_node() @property def jtag_node(self) -> Optional[JtagDevice]: """Low level TCF node access - For advanced use""" return self._device_spec.get_jtag_node() @property def debugcore_node(self) -> Optional[CoreParent]: """Low level TCF node access - For advanced use""" return self._device_spec.get_debugcore_node(target=None, default_target=self.default_target) @property def chipscope_node(self) -> Optional[CoreParent]: """Low level TCF node access - For advanced use""" return self._device_spec.get_chipscope_node(target=None, default_target=self.default_target) # DEBUG CORES @property def ibert_cores(self) -> QueryList[IBERT]: """Returns: list of detected IBERT cores in the device""" self._raise_if_family_not(DeviceFamily.VERSAL, DeviceFamily.UPLUS) self._raise_if_state_invalid() return self._get_debug_core_wrappers(IBERTCoreClient) @property def vio_cores(self) -> QueryList[VIO]: """Returns: list of detected VIO cores in the device""" self._raise_if_family_not(DeviceFamily.VERSAL, DeviceFamily.UPLUS) self._raise_if_state_invalid() return self._get_debug_core_wrappers(AxisVIOCoreClient) @property def ila_cores(self) -> QueryList[ILA]: """Returns: list of detected ILA cores in the device""" self._raise_if_family_not(DeviceFamily.VERSAL, DeviceFamily.UPLUS) self._raise_if_state_invalid() return self._get_debug_core_wrappers(AxisIlaCoreClient) @property def trace_cores(self) -> QueryList[Trace]: """Returns: list of detected Trace cores in the device""" self._raise_if_family_not(DeviceFamily.VERSAL) self._raise_if_state_invalid() return self._get_debug_core_wrappers(AxisTraceClient) @property def pcie_cores(self) -> QueryList[PCIe]: """Returns: list of detected PCIe cores in the device""" self._raise_if_family_not(DeviceFamily.VERSAL) self._raise_if_state_invalid() return self._get_debug_core_wrappers(AxisPCIeCoreClient) @property def noc_core(self) -> QueryList[NocPerfmon]: """Returns: List with NOC roots for the device""" self._raise_if_family_not(DeviceFamily.VERSAL) self._raise_if_state_invalid() return self._get_debug_core_wrappers(NoCPerfMonCoreClient) @property def ddrs(self) -> QueryList[DDR]: """Returns: list of detected DDRMC cores in the device""" self._raise_if_family_not(DeviceFamily.VERSAL) self._raise_if_state_invalid() return self._get_debug_core_wrappers(DDRMCClient) @property def hbms(self) -> QueryList[HBM]: """Returns: list of detected HBM cores in the device""" self._raise_if_family_not(DeviceFamily.VERSAL) self._raise_if_state_invalid() return self._get_debug_core_wrappers(HBMClient) @property def sysmon_root(self) -> QueryList[Sysmon]: """Returns: List with one SysMon core for the device""" self._raise_if_family_not(DeviceFamily.VERSAL) self._raise_if_state_invalid() return self._get_debug_core_wrappers(SysMonCoreClient) @staticmethod def _set_client_wrapper(node, api_client_wrapper): # Tacks a wrapper onto a node for tracking. Always call this # instead of setting directly on node - in case we change the # client wrapper tracking process. node.api_client_wrapper = api_client_wrapper @staticmethod def _get_client_wrapper(node): # Gets the tacked on wrapper from a node. return node.api_client_wrapper def _create_debugcore_wrapper(self, node): # Factory to build debug_core_wrappers # Instantiate debug core wrappers for a known core type. The wrapper gets attached to a # node for safe keeping. Any time we need to grab a debug core wrapper (like ILA) # it is available on the node using # wrapper = self._get_client_wrapper(node) if node.type == "ila": debug_core_wrapper = ILA(node, self, ltx=self.ltx) elif node.type == "vio": debug_core_wrapper = VIO(node, ltx=self.ltx) elif node.type == "trace": debug_core_wrapper = Trace(node) # todo: remove after hw_server is updated. elif node.type == "Core": debug_core_wrapper = Trace(node) elif node.type == "pcie": debug_core_wrapper = PCIe(node) elif node.type == "npi_nir": debug_core_wrapper = NocPerfmon(node) elif node.type == "ibert": debug_core_wrapper = IBERT(node) elif node.type == "sysmon": debug_core_wrapper = Sysmon(node) elif node.type == "ddrmc_main": debug_core_wrapper = DDR(node) elif node.type == "hbm": debug_core_wrapper = HBM(node) else: # TODO: extend this factory to other core types with a plugin model for additional types debug_core_wrapper = None return debug_core_wrapper def _find_all_debugcore_nodes(self) -> Dict: # Traverse the chipscope view for debug cores in this device. # Returns a dict mapping core_types to a list of found nodes. # Notes are returned in upgraded form. # Example return: # { # AxisIlaCoreClient: [ila_node_1, ila_node_2, ...], # AxisVIOCoreClient: [vio_node_1, vio_node_2, ...], # ... # } # The first time calling this function is slow because nodes have # not been upgraded yet. Slow is around 1 second. Subsequent calls are # faster, around 1 ms. # # Best to avoid calling this function in an inner loop. cs_view = self.cs_server.get_view(chipscope) found_cores = defaultdict(list) def check_and_upgrade_node(node): if self.cores_to_scan.get(cs_node.type, True): core_class_type = Device._CORE_TYPE_TO_CLASS_MAP.get(node.type, None) if core_class_type and core_class_type.is_compatible(node): upgraded_node = cs_view.get_node(node.ctx, core_class_type) found_cores[core_class_type].append(upgraded_node) # Using the self.chipscope_node restricts nodes to this device only for cs_node in cs_view.get_children(self.chipscope_node): if cs_node.type == "debug_hub": debug_hub_children = cs_view.get_children(cs_node) for child in debug_hub_children: check_and_upgrade_node(child) else: check_and_upgrade_node(cs_node) return found_cores def _get_debug_core_wrappers(self, core_type) -> QueryList: # Returns a QueryList of the debug_core_wrappers matching the given core_type # core_type is a class from CORE_TYPE_TO_CLASS_MAP # returned wrapper_type is ILA, VIO, etc. wrapping the node # # debug_core_wrappers are initialized if they don't yet exist for the # node. After initialization, they are hung on the node for future reference. # This is convenient because their lifecycle is tied to the node. found_cores = QueryList() all_debug_core_nodes = self._find_all_debugcore_nodes() for node in all_debug_core_nodes.get(core_type, []): debug_core_wrapper = Device._get_client_wrapper(node) if debug_core_wrapper is None: # Lazy debugcore initialization here. # Initialization happens the first time we access the debug core. debug_core_wrapper = self._create_debugcore_wrapper(node) self._set_client_wrapper(node, debug_core_wrapper) # Now the node should have a proper debug_core_wrapper attached. if Device._get_client_wrapper(node): found_cores.append(Device._get_client_wrapper(node)) return found_cores
[docs] def discover_and_setup_cores( self, *, hub_address_list: Optional[List[int]] = None, ltx_file: Union[Path, str] = None, **kwargs, ): """Scan device for debug cores. This may take some time depending on what gets scanned. Args: hub_address_list: List of debug hub addresses to scan ltx_file: LTX file (which contains debug hub addresses) Keyword Arguments: ila_scan: True=Scan Device for ILAs vio_scan: True=Scan Device for VIOs ibert_scan: True=Scan Device for IBERTs pcie_scan: True=Scan Device for PCIEs noc_scan: True=Scan Device for NOC ddr_scan: True=Scan Device for DDRs hbm_scan: True=Scan Device for HBMs sysmon_scan: True=Scan Device for System Monitor """ # Selectively disable scanning of cores depending on what comes in # This is second priority to the disable_core_scan in __init__. self._raise_if_family_not(DeviceFamily.VERSAL, DeviceFamily.UPLUS) self._raise_if_state_invalid() self.cores_to_scan = {} if self.disable_core_scan: for core_name in Device._CORE_TYPE_TO_CLASS_MAP.keys(): self.cores_to_scan[core_name] = False else: self.cores_to_scan = { "ila": kwargs.get("ila_scan", True), "vio": kwargs.get("vio_scan", True), "npi_nir": kwargs.get("noc_scan", False), "ddrmc_main": kwargs.get("ddr_scan", True), "hbm": kwargs.get("hbm_scan", True), "pcie": kwargs.get("pcie_scan", True), "ibert": kwargs.get("ibert_scan", True), "sysmon": kwargs.get("sysmon_scan", False), } if not self.cs_server: raise RuntimeError("cs_server is not connected") self.ltx = None # Make sure debug_hub_addrs is a list of hub addresses. # It may come as an int or a list or None if isinstance(hub_address_list, int): hub_address_list = [hub_address_list] if not hub_address_list: hub_address_list = [] if ltx_file: self.ltx, self.ltx_sources, _ = parse_ltx_files(ltx_file) # noqa hub_address_list.extend(self.ltx.get_debug_hub_addresses()) # Remove any duplicate hub addresses in list, in case same one # was passed to function as well as in ltx # Below is a quick python trick to uniquify a list. hub_address_list = list(dict.fromkeys(hub_address_list)) # Set up all debug cores in hw_server for the given hub addresses # Do this by default unless the user has explicitly told us not to with # "disable_core_scan" as an argument. if not self.disable_core_scan or self.device_family == DeviceFamily.UPLUS: if not self.cs_server: raise RuntimeError("No chipscope server connection. Could not get chipscope view") cs_node = self.chipscope_node if cs_node: cs_node.setup_cores(debug_hub_addrs=hub_address_list) else: # print(self._device_spec.to_json()) raise RuntimeError("chipscope_node not available")
# MEMORY @property def memory_target_names(self) -> List[str]: """Returns: list of supported memory targets""" self._raise_if_family_not(DeviceFamily.VERSAL) self._raise_if_state_invalid() valid_memory_targets = ( r"(Versal .*)|(DPC)|(APU)|(RPU)|(PPU)|(PSM)|(Cortex.*)|(MicroBlaze.*)|(RISC-V.*)" ) memory_view = self.hw_server.get_view("memory") memory_node_list = [] mem_dpc_node = self._device_spec.get_memory_node(target="DPC") if mem_dpc_node: memory_node_list.append(mem_dpc_node) mem_dap_node = self._device_spec.get_memory_node(target="DAP") if mem_dap_node: memory_node_list.append(mem_dap_node) nodes_to_travel = deque(memory_node_list) memory_node_list = [] while nodes_to_travel: node = nodes_to_travel.popleft() if MemoryNode.is_compatible(node): memory_node = memory_view.get_node(node.ctx, MemoryNode) memory_node_list.append(memory_node) nodes_to_travel.extendleft(memory_view.get_children(node)) # Now the memory_node_list has all memory nodes. # Make the list of what we found and return it memory_target_names = [] for node in memory_node_list: memory_target_name = node.props.get("Name") if memory_target_name and re.search(valid_memory_targets, memory_target_name): memory_target_names.append(memory_target_name) return memory_target_names @property def memory(self) -> QueryList[Memory]: """Returns: the memory access for the device""" self._raise_if_family_not(DeviceFamily.VERSAL) self._raise_if_state_invalid() target_nodes = self._device_spec.get_all_memory_nodes() node_list = QueryList() for node in target_nodes: node_list.append(node) return node_list
[docs] def memory_read(self, address: int, num: int = 1, *, size="w", target=None) -> List[int]: """Read num values from given memory address. This method is slow because it locates the memory context in the target tree every time. If you want to execute a large number of memory operations, grab a memory context and do all the operations in batch. Note: This method should not be used in inner loops. It is not the fastest because it looks up the memory target every time. Inner loops should just call the find_memory_target once. """ self._raise_if_family_not(DeviceFamily.VERSAL) self._raise_if_state_invalid() node = self._device_spec.search_memory_node_deep(target, default_target=self.default_target) if not node: raise (ValueError(f"Could not find memory node for target {target}")) retval = node.memory_read(address=address, num=num, size=size) return retval
[docs] def memory_write(self, address: int, values: List[int], *, size="w", target=None): """Write list of values to given memory address. This method is slow because it locates the memory context in the target tree every time. If you want to execute a large number of memory operations, grab a memory context and do the operations in batch. Note: This method should not be used in inner loops. It is not the fastest because it looks up the memory target every time. Inner loops should just call the find_memory_target once. """ self._raise_if_family_not(DeviceFamily.VERSAL) node = self._device_spec.search_memory_node_deep(target, default_target=self.default_target) if not node: raise (ValueError(f"Could not find memory node for target {target}")) node.memory_write(address=address, values=values, size=size)
def _dpc_read_bytes( self, address: int, data: bytearray, *, offset: int = 0, byte_size: int = None, show_progress_bar: bool = True, ): """ Reads byte data from memory, using the DPC. Args: address (int): Start address data (bytearray): result buffer which gets filled with read data. offset (int): start index in *data* to start write to. byte_size (int): Number of bytes to read. Default is the size of the *data* buffer. show_progress_bar (bool): Show if True. """ self._raise_if_state_invalid() node = self.debugcore_node if not show_progress_bar: node.read_bytes(address, data, offset, byte_size) return # With progress bar transfer_byte_count = byte_size if byte_size else len(data) desc = ( f"Transferring {transfer_byte_count/(1024 * 1024):0.1f} MB " f"[bold](Read {hex(address)})[/]" ) bar = PercentProgressBar() bar.add_task(description=desc, status=PercentProgressBar.Status.STARTING) try: node.read_bytes( address, data, offset, byte_size, progress=lambda progress: bar.update( completed=int(progress * 100), status=PercentProgressBar.Status.IN_PROGRESS ), ) except Exception as ex: bar.update(status=PercentProgressBar.Status.ABORTED) raise ex bar.update(status=PercentProgressBar.Status.DONE) def _dpc_write_bytes( self, address: int, data: bytearray, *, offset: int = 0, byte_size: int = None, show_progress_bar: bool = True, ): """ Write byte data from memory, using the DPC. Args: address (int): Start address data (bytearray): data to write. offset (int): start index in *data* to start write from. byte_size (int): Number of bytes to write. Default is the size of the *data* buffer. show_progress_bar (bool): Show if True. """ self._raise_if_state_invalid() node = self.debugcore_node if not show_progress_bar: node.write_bytes(address, data, offset, byte_size) return # With progress bar transfer_byte_count = byte_size if byte_size else len(data) desc = ( f"Transferring {transfer_byte_count/(1024 * 1024):0.1f} MB " f"[bold](Write {hex(address)})[/]" ) bar = PercentProgressBar() bar.add_task(description=desc, status=PercentProgressBar.Status.STARTING) try: node.write_bytes( address, data, offset, byte_size, progress=lambda progress: bar.update( completed=int(progress * 100), status=PercentProgressBar.Status.IN_PROGRESS ), ) except Exception as ex: bar.update(status=PercentProgressBar.Status.ABORTED) raise ex bar.update(status=PercentProgressBar.Status.DONE) # PROGRAMMING
[docs] def get_plm_log(self, memory_target="Versal.+", slr_index: int = 0) -> str: """ Returns: Raw log read from hardware Args: memory_target: Optional name of memory target such as APU, RPU, DPC slr_index: Optional SLR index for multi-SLR devices """ self._raise_if_family_not(DeviceFamily.VERSAL) self._raise_if_state_invalid() try: mem = self.memory.get(name=memory_target) except ValueError: raise ValueError(f"Memory target {memory_target} not available") return mem.plm_log(slr_index=slr_index)
[docs] def get_program_log(self, memory_target="Versal.+") -> str: """ Returns: Programming log read from hardware (None=Use default transport) Args: memory_target: Optional name of memory target such as APU, RPU, DPC slr_index: Optional SLR index for multi-SLR devices """ self._raise_if_family_not(DeviceFamily.VERSAL) self._raise_if_state_invalid() try: mem = self.memory.get(name=memory_target) except ValueError: raise ValueError(f"Memory target {memory_target} not available") report_generator = ClientProgramLogStrategy( regs=self.jtag_node.props.get("regs", {}), plm_log_fn=lambda x: mem.plm_log(slr_index=x) ) return report_generator.generate_program_log()
[docs] def write_program_log(self, outfile: Union[str, Path], *, memory_target="Versal.+"): """ Write an enhanced program log to a file. """ if isinstance(outfile, str): outfile = Path(outfile) try: report_str = self.get_program_log(memory_target=memory_target) if outfile is None: sys.stdout.write(report_str) else: with outfile.open("w") as file: file.write(report_str) except Exception as e: raise IOError(f"Failed to write report to {outfile}: {str(e)}")
[docs] def reset(self): """Reset the device to a non-programmed state.""" self._raise_if_family_not(DeviceFamily.VERSAL, DeviceFamily.UPLUS) self._raise_if_state_invalid() jtag_programming_node = self.jtag_node assert jtag_programming_node is not None self.state = DeviceState.NEEDS_REFRESH jtag_programming_node.future().reset() time.sleep(0.5) self.refresh()
[docs] def program( self, programming_file: Union[str, Path], *, skip_reset: bool = False, delay_after_program: int = 0, show_progress_bar: bool = True, progress: request.ProgressFutureCallback = None, done: request.DoneFutureCallback = None, ): """Program the device with a given programming file (bit or pdi). Args: programming_file: PDI file path skip_reset: False = Do not reset device prior to program delay_after_program: Seconds to delay at end of programming (default=0) show_progress_bar: False if the progress bar doesn't need to be shown progress: Optional progress callback done: Optional async future callback """ self.programming_error = None self._raise_if_family_not(DeviceFamily.VERSAL, DeviceFamily.UPLUS) self._raise_if_state_invalid() program_future = request.CsFutureSync(done=done, progress=progress) self.state = DeviceState.NEEDS_REFRESH if isinstance(programming_file, str): programming_file = Path(programming_file) if not programming_file.exists(): raise FileNotFoundError( f"Programming file: {str(programming_file.resolve())} doesn't exists!" ) printer(f"Programming device with: {str(programming_file.resolve())}\n", level="info") if show_progress_bar: progress_ = PercentProgressBar() progress_.add_task( description="Device program progress", status=PercentProgressBar.Status.STARTING, visible=show_progress_bar, ) def progress_update(future): if program_future: program_future.set_progress(future.progress) if show_progress_bar: progress_.update( completed=future.progress * 100, status=PercentProgressBar.Status.IN_PROGRESS ) def pre_done_programming(future): # refresh jtag node properties after program (1190699) if future.error: self.programming_error = future.error jtag_programming_node.future().update_regs( reg_names=[], force=True, done=done_programming ) def done_programming(future): if not self.programming_error: self.programming_error = future.error if show_progress_bar: if self.programming_error is not None: progress_.update(status=PercentProgressBar.Status.ABORTED) else: for i in range(delay_after_program): # This pushes end-of-config node events to the listeners # They seem to happen within about 2-3 seconds - need a more # reliable way to wait for them to finish. # If we don't wait, the DPC node may not be ready for use. time.sleep(0.5) if self.cs_server: self.cs_server.get_view("chipscope").run_events() time.sleep(0.5) # refresh device jtag node properties after program (1190699) self.refresh(force_update=False) progress_.update(completed=100, status=PercentProgressBar.Status.DONE) if self.programming_error is not None: program_future.set_exception(self.programming_error) else: program_future.set_result(None) def finalize_program(): if self.programming_error is not None: raise RuntimeError(self.programming_error) jtag_programming_node = self.jtag_node assert jtag_programming_node is not None if not skip_reset: jtag_programming_node.future().reset() jtag_programming_node.future( progress=progress_update, done=pre_done_programming, final=finalize_program ).config(str(programming_file.resolve())) return program_future if done else program_future.result
def _raise_if_family_not(self, *args): if self.device_family == DeviceFamily.XVC: # special partial setup case for xvc sw testing - always ok return if self.device_family not in args: raise FeatureNotAvailableError()
# Legacy support - to not break old code that may have referenced the classes # that were removed (and now all implemented by Device) VersalDevice = NewType("VersalDevice", Device) UltrascaleDevice = NewType("UltrascaleDevice", Device) GenericDevice = NewType("GenericDevice", Device) def discover_devices( hw_server: ServerInfo, cs_server: ServerInfo = None, disable_core_scan: bool = False, cable_ctx: Optional[str] = None, disable_cache: bool = False, ) -> QueryList[Device]: log.client.debug( f"discover_devices: hw_server={hw_server}, cs_server={cs_server}, disable_core_scan={disable_core_scan}, cable_ctx={cable_ctx}", disable_cache={disable_cache}, ) include_dna = True devices: QueryList[Device] = QueryList() idx = 0 for key, device_record_list in scan_all_views(hw_server, cs_server, include_dna).items(): device_spec = DeviceSpec.create_from_device_records( hw_server, cs_server, device_record_list ) if not device_spec.is_valid: # This can happen when a chipscope server is serving multiple sessions. # The cs_server reports back connection information for the # other sessions which should be ignored. log.client.debug(f"discover_devices: No device_node for dna key: {key} --> ignored.\n") continue if cable_ctx is None or device_spec.jtag_cable_ctx == cable_ctx: device_node = device_spec.get_device_node() assert device_node is not None if device_node.device_wrapper is None: device_node.device_wrapper = Device( hw_server=hw_server, cs_server=cs_server, device_spec=device_spec, disable_core_scan=disable_core_scan, disable_cache=disable_cache, ) log.client.debug( f"discover_devices: created new device: {device_node.device_wrapper}" ) else: log.client.debug( f"discover_devices: reusing device wrapper: {device_node.device_wrapper}" ) devices.append(device_node.device_wrapper) log.client.info(f"discover_devices: {idx}: {device_node.device_wrapper}") idx += 1 return devices