# Copyright (C) 2021-2022, Xilinx, Inc.
# Copyright (C) 2022-2023, Advanced Micro Devices, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
import re
import sys
import time
from collections import defaultdict, deque
from enum import Enum
from pathlib import Path
from typing import Optional, Union, Dict, List, Set, Any, NewType, Literal
from struct import pack, unpack
from chipscopy.api import DMNodeListener
from chipscopy.api._detail.ltx import parse_ltx_files # noqa
from chipscopy.api._detail.trace import Trace # noqa
from chipscopy.api.containers import QueryList
from chipscopy.api.ddr.ddr import DDR
from chipscopy.api.device.client_program_log import ClientProgramLogStrategy
from chipscopy.api.device.device_spec import DeviceSpec
from chipscopy.api.hbm.hbm import HBM
from chipscopy.api.ibert import IBERT
from chipscopy.api.ila import ILA
from chipscopy.api.memory import Memory
from chipscopy.api.noc.noc import NocPerfmon
from chipscopy.api.pcie import PCIe
from chipscopy.api.sysmon import Sysmon
from chipscopy.api.vio import VIO
from chipscopy.client import ServerInfo
from chipscopy.client.axis_ila_core_client import AxisIlaCoreClient
from chipscopy.client.axis_pcie_core_client import AxisPCIeCoreClient
from chipscopy.client.axis_trace_core_client import AxisTraceClient
from chipscopy.client.axis_vio_core_client import AxisVIOCoreClient
from chipscopy.client.core import CoreParent
from chipscopy.client.ddrmc_client import DDRMCClient
from chipscopy.client.hbm_client import HBMClient
from chipscopy.client.ibert_core_client import IBERTCoreClient
from chipscopy.client.jtagdevice import JtagCable, JtagDevice
from chipscopy.client.mem import MemoryNode
from chipscopy.client.noc_perfmon_core_client import NoCPerfMonCoreClient
from chipscopy.client.sysmon_core_client import SysMonCoreClient
from chipscopy.dm import chipscope, request, Node
from chipscopy.utils.printer import printer, PercentProgressBar
from chipscopy.api.device.device_scanner import (
scan_all_views,
)
from chipscopy.api.device.device_util import copy_node_props
from chipscopy.utils.logger import log
class FeatureNotAvailableError(Exception):
def __init__(self):
super().__init__("Feature not available on this architecture")
class DeviceException(Exception):
pass
class DeviceState(Enum):
VALID = 0
NEEDS_REFRESH = 1
INVALID = 2
class DeviceFamily(Enum):
GENERIC = "generic"
UPLUS = "ultrascaleplus"
VERSAL = "versal"
XVC = "xvc"
@staticmethod
def get_family_for_name(family_name: str) -> "DeviceFamily":
if family_name == "versal":
return DeviceFamily.VERSAL
elif family_name.startswith("xvc"):
return DeviceFamily.XVC
elif family_name.endswith("uplus") or family_name.endswith("u"):
return DeviceFamily.UPLUS
else:
return DeviceFamily.GENERIC
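# Illustrative results of get_family_for_name() (family names below are examples only,
# not an exhaustive mapping):
#   DeviceFamily.get_family_for_name("versal")    -> DeviceFamily.VERSAL
#   DeviceFamily.get_family_for_name("zynquplus") -> DeviceFamily.UPLUS    (ends with "uplus")
#   DeviceFamily.get_family_for_name("kintexu")   -> DeviceFamily.UPLUS    (ends with "u")
#   DeviceFamily.get_family_for_name("xvc_dev")   -> DeviceFamily.XVC      (starts with "xvc")
#   DeviceFamily.get_family_for_name("spartan7")  -> DeviceFamily.GENERIC  (fallback)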
class Device:
    """The Device class represents a single device. It is the base class for other
    Xilinx or generic devices. Depending on the device type, some features may be
    available or unavailable. Device exposes the complete set of properties and
    functions to make documentation and type hints easy. Child classes raise
    FeatureNotAvailableError for features they do not support.
    """
_CORE_TYPE_TO_CLASS_MAP = {
"vio": AxisVIOCoreClient,
"ila": AxisIlaCoreClient,
"trace": AxisTraceClient,
# todo: remove after hw_server is updated.
"Core": AxisTraceClient,
"pcie": AxisPCIeCoreClient,
"npi_nir": NoCPerfMonCoreClient,
"ibert": IBERTCoreClient,
"sysmon": SysMonCoreClient,
"ddrmc_main": DDRMCClient,
"hbm": HBMClient,
}
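    # Note: the keys above are the hw_server core "type" strings. They are also the keys of
    # self.cores_to_scan built by discover_and_setup_cores() from its *_scan keyword
    # arguments (e.g. ila_scan=True allows nodes whose type is "ila" to be scanned).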
def __init__(
self,
*,
hw_server: ServerInfo,
cs_server: Optional[ServerInfo],
device_spec: DeviceSpec,
disable_core_scan: bool,
disable_cache: bool,
):
self.state = DeviceState.NEEDS_REFRESH
self.cached_props = {}
self.hw_server = hw_server
self.cs_server = cs_server
self._device_spec = device_spec
self.default_target: Literal["DPC", "DAP"] = "DPC"
self.family_name = self._device_spec.get_arch_name()
self.device_family = DeviceFamily.get_family_for_name(self.family_name)
self.part_name = self._device_spec.get_part_name()
self.dna = self._device_spec.get_dna()
self.jtag_index = self._device_spec.jtag_index
self.disable_core_scan = disable_core_scan
self.disable_cache = disable_cache
self.ltx = None
self.ltx_sources = []
self.cores_to_scan = {} # set during discover_and_setup_cores
self.programming_error: Optional[str] = None # Keep error for query after programming
self.refresh()
# The following filter_by becomes a dictionary with architecture, jtag_index, context, etc.
# This is used when asking for a device by filter from the device list like:
# report_devices(session.devices.get(dna=".*1234"))
# Needs improvement because QueryList filtering does not work with hierarchy
# Should always be the last part of __init__ because of to_dict() call...
self.filter_by = self.to_dict()
def __str__(self) -> str:
return f"{self.part_name}:{self.dna}:{self.context}"
def __repr__(self) -> str:
return self.to_json()
def __getitem__(self, item):
props = self.to_dict()
if item in props:
return props[item]
else:
raise AttributeError(f"No property {str(item)}")
def _raise_if_state_invalid(self):
if self.state == DeviceState.INVALID:
raise DeviceException(f"Device {str(self)} is invalid - session rescan required")
def refresh(self, *, force_update=True):
# Refresh device properties from hardware
self._raise_if_state_invalid()
self.cached_props = self.scan_props(force_update=force_update)
if not self.disable_cache:
self.state = DeviceState.VALID
def node_callback(self, action: DMNodeListener.NodeAction, node: Node, props: Optional[Set]):
# Called from session when a TCF node event happens
# TODO: TypeError: 'ParodyLogger' object is not callable
# log.client.trace(f"node_callback: action={action}, node={node.ctx}")
if not node or action == DMNodeListener.NodeAction.NODE_ADDED:
# Maybe in the future we will track if a child node was added to anything in the
# Device tree... For now just ignore.
return
# Top level device contexts that can represent the device - track for device removal
root_device_contexts = {self._device_spec.device_ctx, self._device_spec.jtag_device_ctx}
# Expanded set of device contexts that are important to track for node changes
all_device_contexts = {
self._device_spec.device_ctx,
self._device_spec.jtag_device_ctx,
self._device_spec.debugcore_dap_ctx,
self._device_spec.debugcore_dpc_ctx,
self._device_spec.memory_dap_ctx,
self._device_spec.memory_dpc_ctx,
self._device_spec.chipscope_dap_ctx,
self._device_spec.chipscope_dpc_ctx,
}
if action == DMNodeListener.NodeAction.NODE_REMOVED and node.ctx in root_device_contexts:
# Removed a node that is the anchor for this device... Invalidate the device so any subsequent
# user API calls return an invalid device message
# TODO: TypeError: 'ParodyLogger' object is not callable
# log.client.warn(f"DEVICE {str(self)} marked invalid -- rescan required!")
self.state = DeviceState.INVALID
device_node = self._device_spec.get_device_node()
if device_node:
# Remove any cached information for the invalid device
device_node.device_wrapper = None
elif action == DMNodeListener.NodeAction.NODE_CHANGED and node.ctx in all_device_contexts:
# One of our watched nodes changed - need to re-scan properties from hardware on next call
# TODO: TypeError: 'ParodyLogger' object is not callable
# log.client.debug(
# f"NODE_CHANGED on device {str(self)}, Node: {node.ctx} - setting NEEDS_REFRESH"
# )
self.state = DeviceState.NEEDS_REFRESH
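    # In practice the state is handled lazily: public accessors call
    # _raise_if_state_invalid() first, and to_dict() triggers refresh() when the state is
    # NEEDS_REFRESH, e.g.
    #
    #   device.to_dict()   # re-scans properties if a watched TCF node changed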
    def scan_props(self, *, force_update=True) -> Dict[str, Any]:
"""
Scan device properties from hardware.
Returns: Dict of properties
"""
self._raise_if_state_invalid()
props = {}
if self.hw_server and self.jtag_node:
node = self.jtag_node
props["jtag"] = copy_node_props(node, node.props, force_update=force_update)
props["is_valid"] = self.is_valid
props["context"] = self.context
props["is_programmed"] = self.scan_is_programmed(force_update=force_update)
props["is_programmable"] = self.scan_is_programmable()
props["part"] = self.part_name
props["family"] = self.family_name
props["dna"] = self.dna
props["jtag_index"] = self.jtag_index
props["cable_context"] = self._device_spec.jtag_cable_ctx
props["jtag_context"] = self._device_spec.jtag_device_ctx
props["cable_name"] = self._device_spec.get_jtag_cable_name()
props = dict(sorted(props.items())) # noqa
return props
    def to_dict(self) -> Dict[str, Any]:
"""Returns a dictionary representation of the device data"""
self._raise_if_state_invalid()
if self.state == DeviceState.NEEDS_REFRESH:
self.refresh()
return self.cached_props
    def to_json(self) -> str:
"""Returns a json representation of the device data"""
raw_json = json.dumps(self.to_dict(), indent=4, default=lambda o: str(o))
return raw_json
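    # Minimal usage sketch (assumes a `session` created with chipscopy.create_session and
    # at least one discovered device):
    #
    #   device = session.devices[0]
    #   print(device.to_dict()["part"])   # part name, e.g. "xcvc1902"
    #   print(device.to_json())           # same data, JSON formatted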
# NODE / LOW LEVEL
@property
def context(self) -> str:
"""Returns the unique context for this device. Normally this is the
jtag node context. If this device is not on a jtag chain, the top
level device context will be a non-jtag node.
"""
return self._device_spec.device_ctx
    def scan_is_programmed(self, *, force_update=True) -> bool:
"""Returns True if this Device is programmed, False otherwise."""
# TODO: Queries hardware for jtag program status. This could be slow -
# is there a way to collect this once, then track by subsequent events pushed from
# the hw_server if we detect DONE went low?
self._raise_if_state_invalid()
is_programmed_ = False
try:
node = self.jtag_node
is_programmable = node.props.get("is_programmable")
if is_programmable:
regs = node.props.get("regs")
if regs:
                    jtag_status = regs.get("jtag_status")
                    config_status = regs.get("config_status")
if jtag_status:
if force_update:
node.update_regs(reg_names=("jtag_status",), force=True, done=None)
done_reg = jtag_status.fields[0]["DONE"]
is_programmed_ = done_reg["value"] == 1
elif config_status:
if force_update:
node.update_regs(reg_names=("config_status",), force=True, done=None)
done_reg = config_status.fields["DONE_PIN"]
is_programmed_ = done_reg["value"] == 1
except Exception: # noqa
is_programmed_ = False
return is_programmed_
@property
def is_programmed(self) -> bool:
"""Returns True if this Device is programmed, False otherwise."""
d = self.to_dict()
return d.get("is_programmed", False)
    def scan_is_programmable(self) -> bool:
        """Returns True if this Device is programmable, False otherwise."""
        is_programmable_ = False
        node = self.jtag_node
        if node:
            is_programmable_ = node.props.get("is_programmable", False)
        return is_programmable_
@property
def is_programmable(self) -> bool:
"""Returns True if this Device is programmable, False otherwise"""
d = self.to_dict()
return d.get("is_programmable", False)
@property
def is_valid(self) -> bool:
"""Returns True if this Node is valid, False otherwise."""
return self._device_spec.is_valid
@property
def jtag_cable_node(self) -> Optional[JtagCable]:
"""Low level TCF node access - For advanced use"""
return self._device_spec.get_jtag_cable_node()
@property
def jtag_node(self) -> Optional[JtagDevice]:
"""Low level TCF node access - For advanced use"""
return self._device_spec.get_jtag_node()
@property
def debugcore_node(self) -> Optional[CoreParent]:
"""Low level TCF node access - For advanced use"""
return self._device_spec.get_debugcore_node(target=None, default_target=self.default_target)
@property
def chipscope_node(self) -> Optional[CoreParent]:
"""Low level TCF node access - For advanced use"""
return self._device_spec.get_chipscope_node(target=None, default_target=self.default_target)
# DEBUG CORES
@property
def ibert_cores(self) -> QueryList[IBERT]:
"""Returns:
list of detected IBERT cores in the device"""
self._raise_if_family_not(DeviceFamily.VERSAL, DeviceFamily.UPLUS)
self._raise_if_state_invalid()
return self._get_debug_core_wrappers(IBERTCoreClient)
@property
def vio_cores(self) -> QueryList[VIO]:
"""Returns:
list of detected VIO cores in the device"""
self._raise_if_family_not(DeviceFamily.VERSAL, DeviceFamily.UPLUS)
self._raise_if_state_invalid()
return self._get_debug_core_wrappers(AxisVIOCoreClient)
@property
def ila_cores(self) -> QueryList[ILA]:
"""Returns:
list of detected ILA cores in the device"""
self._raise_if_family_not(DeviceFamily.VERSAL, DeviceFamily.UPLUS)
self._raise_if_state_invalid()
return self._get_debug_core_wrappers(AxisIlaCoreClient)
@property
def trace_cores(self) -> QueryList[Trace]:
"""Returns:
list of detected Trace cores in the device"""
self._raise_if_family_not(DeviceFamily.VERSAL)
self._raise_if_state_invalid()
return self._get_debug_core_wrappers(AxisTraceClient)
@property
def pcie_cores(self) -> QueryList[PCIe]:
"""Returns:
list of detected PCIe cores in the device"""
self._raise_if_family_not(DeviceFamily.VERSAL)
self._raise_if_state_invalid()
return self._get_debug_core_wrappers(AxisPCIeCoreClient)
@property
def noc_core(self) -> QueryList[NocPerfmon]:
"""Returns:
List with NOC roots for the device"""
self._raise_if_family_not(DeviceFamily.VERSAL)
self._raise_if_state_invalid()
return self._get_debug_core_wrappers(NoCPerfMonCoreClient)
@property
def ddrs(self) -> QueryList[DDR]:
"""Returns:
list of detected DDRMC cores in the device"""
self._raise_if_family_not(DeviceFamily.VERSAL)
self._raise_if_state_invalid()
return self._get_debug_core_wrappers(DDRMCClient)
@property
def hbms(self) -> QueryList[HBM]:
"""Returns:
list of detected HBM cores in the device"""
self._raise_if_family_not(DeviceFamily.VERSAL)
self._raise_if_state_invalid()
return self._get_debug_core_wrappers(HBMClient)
@property
def sysmon_root(self) -> QueryList[Sysmon]:
"""Returns:
List with one SysMon core for the device"""
self._raise_if_family_not(DeviceFamily.VERSAL)
self._raise_if_state_invalid()
return self._get_debug_core_wrappers(SysMonCoreClient)
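    # Sketch of typical core access after discover_and_setup_cores() has run (the LTX path
    # and core name are hypothetical):
    #
    #   device.discover_and_setup_cores(ltx_file="my_design.ltx")
    #   for ila in device.ila_cores:
    #       print(ila.name)
    #   my_vio = device.vio_cores.get(name="my_vio_core")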
@staticmethod
def _set_client_wrapper(node, api_client_wrapper):
# Tacks a wrapper onto a node for tracking. Always call this
# instead of setting directly on node - in case we change the
# client wrapper tracking process.
node.api_client_wrapper = api_client_wrapper
@staticmethod
def _get_client_wrapper(node):
# Gets the tacked on wrapper from a node.
return node.api_client_wrapper
def _create_debugcore_wrapper(self, node):
# Factory to build debug_core_wrappers
# Instantiate debug core wrappers for a known core type. The wrapper gets attached to a
# node for safe keeping. Any time we need to grab a debug core wrapper (like ILA)
# it is available on the node using
# wrapper = self._get_client_wrapper(node)
if node.type == "ila":
debug_core_wrapper = ILA(node, self, ltx=self.ltx)
elif node.type == "vio":
debug_core_wrapper = VIO(node, ltx=self.ltx)
elif node.type == "trace":
debug_core_wrapper = Trace(node)
# todo: remove after hw_server is updated.
elif node.type == "Core":
debug_core_wrapper = Trace(node)
elif node.type == "pcie":
debug_core_wrapper = PCIe(node)
elif node.type == "npi_nir":
debug_core_wrapper = NocPerfmon(node)
elif node.type == "ibert":
debug_core_wrapper = IBERT(node)
elif node.type == "sysmon":
debug_core_wrapper = Sysmon(node)
elif node.type == "ddrmc_main":
debug_core_wrapper = DDR(node)
elif node.type == "hbm":
debug_core_wrapper = HBM(node)
else:
# TODO: extend this factory to other core types with a plugin model for additional types
debug_core_wrapper = None
return debug_core_wrapper
def _find_all_debugcore_nodes(self) -> Dict:
# Traverse the chipscope view for debug cores in this device.
# Returns a dict mapping core_types to a list of found nodes.
        # Nodes are returned in upgraded form.
# Example return:
# {
# AxisIlaCoreClient: [ila_node_1, ila_node_2, ...],
# AxisVIOCoreClient: [vio_node_1, vio_node_2, ...],
# ...
# }
# The first time calling this function is slow because nodes have
# not been upgraded yet. Slow is around 1 second. Subsequent calls are
# faster, around 1 ms.
#
# Best to avoid calling this function in an inner loop.
cs_view = self.cs_server.get_view(chipscope)
found_cores = defaultdict(list)
def check_and_upgrade_node(node):
            if self.cores_to_scan.get(node.type, True):
core_class_type = Device._CORE_TYPE_TO_CLASS_MAP.get(node.type, None)
if core_class_type and core_class_type.is_compatible(node):
upgraded_node = cs_view.get_node(node.ctx, core_class_type)
found_cores[core_class_type].append(upgraded_node)
# Using the self.chipscope_node restricts nodes to this device only
for cs_node in cs_view.get_children(self.chipscope_node):
if cs_node.type == "debug_hub":
debug_hub_children = cs_view.get_children(cs_node)
for child in debug_hub_children:
check_and_upgrade_node(child)
else:
check_and_upgrade_node(cs_node)
return found_cores
def _get_debug_core_wrappers(self, core_type) -> QueryList:
# Returns a QueryList of the debug_core_wrappers matching the given core_type
# core_type is a class from CORE_TYPE_TO_CLASS_MAP
# returned wrapper_type is ILA, VIO, etc. wrapping the node
#
# debug_core_wrappers are initialized if they don't yet exist for the
# node. After initialization, they are hung on the node for future reference.
# This is convenient because their lifecycle is tied to the node.
found_cores = QueryList()
all_debug_core_nodes = self._find_all_debugcore_nodes()
for node in all_debug_core_nodes.get(core_type, []):
debug_core_wrapper = Device._get_client_wrapper(node)
if debug_core_wrapper is None:
# Lazy debugcore initialization here.
# Initialization happens the first time we access the debug core.
debug_core_wrapper = self._create_debugcore_wrapper(node)
self._set_client_wrapper(node, debug_core_wrapper)
# Now the node should have a proper debug_core_wrapper attached.
if Device._get_client_wrapper(node):
found_cores.append(Device._get_client_wrapper(node))
return found_cores
    def discover_and_setup_cores(
self,
*,
hub_address_list: Optional[List[int]] = None,
ltx_file: Union[Path, str] = None,
**kwargs,
):
"""Scan device for debug cores. This may take some time depending on
what gets scanned.
Args:
hub_address_list: List of debug hub addresses to scan
ltx_file: LTX file (which contains debug hub addresses)
Keyword Arguments:
ila_scan: True=Scan Device for ILAs
vio_scan: True=Scan Device for VIOs
ibert_scan: True=Scan Device for IBERTs
pcie_scan: True=Scan Device for PCIEs
noc_scan: True=Scan Device for NOC
ddr_scan: True=Scan Device for DDRs
hbm_scan: True=Scan Device for HBMs
sysmon_scan: True=Scan Device for System Monitor
"""
# Selectively disable scanning of cores depending on what comes in
# This is second priority to the disable_core_scan in __init__.
self._raise_if_family_not(DeviceFamily.VERSAL, DeviceFamily.UPLUS)
self._raise_if_state_invalid()
self.cores_to_scan = {}
if self.disable_core_scan:
for core_name in Device._CORE_TYPE_TO_CLASS_MAP.keys():
self.cores_to_scan[core_name] = False
else:
self.cores_to_scan = {
"ila": kwargs.get("ila_scan", True),
"vio": kwargs.get("vio_scan", True),
"npi_nir": kwargs.get("noc_scan", False),
"ddrmc_main": kwargs.get("ddr_scan", True),
"hbm": kwargs.get("hbm_scan", True),
"pcie": kwargs.get("pcie_scan", True),
"ibert": kwargs.get("ibert_scan", True),
"sysmon": kwargs.get("sysmon_scan", False),
}
if not self.cs_server:
raise RuntimeError("cs_server is not connected")
self.ltx = None
        # Make sure hub_address_list is a list of hub addresses.
        # It may come in as an int, a list, or None
if isinstance(hub_address_list, int):
hub_address_list = [hub_address_list]
if not hub_address_list:
hub_address_list = []
if ltx_file:
self.ltx, self.ltx_sources, _ = parse_ltx_files(ltx_file) # noqa
hub_address_list.extend(self.ltx.get_debug_hub_addresses())
# Remove any duplicate hub addresses in list, in case same one
# was passed to function as well as in ltx
# Below is a quick python trick to uniquify a list.
hub_address_list = list(dict.fromkeys(hub_address_list))
# Set up all debug cores in hw_server for the given hub addresses
# Do this by default unless the user has explicitly told us not to with
# "disable_core_scan" as an argument.
if not self.disable_core_scan or self.device_family == DeviceFamily.UPLUS:
if not self.cs_server:
raise RuntimeError("No chipscope server connection. Could not get chipscope view")
cs_node = self.chipscope_node
if cs_node:
cs_node.setup_cores(debug_hub_addrs=hub_address_list)
else:
# print(self._device_spec.to_json())
raise RuntimeError("chipscope_node not available")
# MEMORY
@property
def memory_target_names(self) -> List[str]:
"""Returns:
list of supported memory targets"""
self._raise_if_family_not(DeviceFamily.VERSAL)
self._raise_if_state_invalid()
valid_memory_targets = (
r"(Versal .*)|(DPC)|(APU)|(RPU)|(PPU)|(PSM)|(Cortex.*)|(MicroBlaze.*)|(RISC-V.*)"
)
memory_view = self.hw_server.get_view("memory")
memory_node_list = []
mem_dpc_node = self._device_spec.get_memory_node(target="DPC")
if mem_dpc_node:
memory_node_list.append(mem_dpc_node)
mem_dap_node = self._device_spec.get_memory_node(target="DAP")
if mem_dap_node:
memory_node_list.append(mem_dap_node)
nodes_to_travel = deque(memory_node_list)
memory_node_list = []
while nodes_to_travel:
node = nodes_to_travel.popleft()
if MemoryNode.is_compatible(node):
memory_node = memory_view.get_node(node.ctx, MemoryNode)
memory_node_list.append(memory_node)
nodes_to_travel.extendleft(memory_view.get_children(node))
# Now the memory_node_list has all memory nodes.
# Make the list of what we found and return it
memory_target_names = []
for node in memory_node_list:
memory_target_name = node.props.get("Name")
if memory_target_name and re.search(valid_memory_targets, memory_target_name):
memory_target_names.append(memory_target_name)
return memory_target_names
@property
def memory(self) -> QueryList[Memory]:
"""Returns:
the memory access for the device"""
self._raise_if_family_not(DeviceFamily.VERSAL)
self._raise_if_state_invalid()
target_nodes = self._device_spec.get_all_memory_nodes()
node_list = QueryList()
for node in target_nodes:
node_list.append(node)
return node_list
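    # Sketch: list memory targets, then read through a named target (the address is
    # illustrative; available target names depend on the hardware):
    #
    #   print(device.memory_target_names)      # e.g. ["DPC", "APU", "RPU", ...]
    #   dpc = device.memory.get(name="DPC")
    #   words = dpc.memory_read(address=0xF2010000, num=4)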
    def memory_read(self, address: int, num: int = 1, *, size="w", target=None) -> List[int]:
        """Read num values from the given memory address.
        Note: This method is slow because it locates the memory context in the target tree on
        every call, so it should not be used in inner loops. To execute a large number of memory
        operations, grab a memory target once and do the operations on it in batch.
        """
self._raise_if_family_not(DeviceFamily.VERSAL)
self._raise_if_state_invalid()
node = self._device_spec.search_memory_node_deep(target, default_target=self.default_target)
if not node:
raise (ValueError(f"Could not find memory node for target {target}"))
retval = node.memory_read(address=address, num=num, size=size)
return retval
    def memory_write(self, address: int, values: List[int], *, size="w", target=None):
        """Write a list of values to the given memory address.
        Note: This method is slow because it locates the memory context in the target tree on
        every call, so it should not be used in inner loops. To execute a large number of memory
        operations, grab a memory target once and do the operations on it in batch.
        """
self._raise_if_family_not(DeviceFamily.VERSAL)
node = self._device_spec.search_memory_node_deep(target, default_target=self.default_target)
if not node:
raise (ValueError(f"Could not find memory node for target {target}"))
node.memory_write(address=address, values=values, size=size)
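    # Quick read/write round trip on the default target (sketch; the address is a
    # placeholder; use a location that is actually writable in your design):
    #
    #   device.memory_write(0x10000000, [0x12345678])
    #   assert device.memory_read(0x10000000, num=1)[0] == 0x12345678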
def _dpc_read_bytes(
self,
address: int,
data: bytearray,
*,
offset: int = 0,
byte_size: int = None,
show_progress_bar: bool = True,
):
"""
Reads byte data from memory, using the DPC.
Args:
address (int): Start address
data (bytearray): result buffer which gets filled with read data.
            offset (int): start index in *data* at which to begin storing read data.
byte_size (int): Number of bytes to read. Default is the size of the *data* buffer.
show_progress_bar (bool): Show if True.
"""
self._raise_if_state_invalid()
node = self.debugcore_node
if not show_progress_bar:
node.read_bytes(address, data, offset, byte_size)
return
# With progress bar
transfer_byte_count = byte_size if byte_size else len(data)
desc = (
f"Transferring {transfer_byte_count/(1024 * 1024):0.1f} MB "
f"[bold](Read {hex(address)})[/]"
)
bar = PercentProgressBar()
bar.add_task(description=desc, status=PercentProgressBar.Status.STARTING)
try:
node.read_bytes(
address,
data,
offset,
byte_size,
progress=lambda progress: bar.update(
completed=int(progress * 100), status=PercentProgressBar.Status.IN_PROGRESS
),
)
except Exception as ex:
bar.update(status=PercentProgressBar.Status.ABORTED)
raise ex
bar.update(status=PercentProgressBar.Status.DONE)
def _dpc_write_bytes(
self,
address: int,
data: bytearray,
*,
offset: int = 0,
byte_size: int = None,
show_progress_bar: bool = True,
):
"""
        Writes byte data to memory, using the DPC.
Args:
address (int): Start address
data (bytearray): data to write.
            offset (int): start index in *data* from which to begin writing.
byte_size (int): Number of bytes to write. Default is the size of the *data* buffer.
show_progress_bar (bool): Show if True.
"""
self._raise_if_state_invalid()
node = self.debugcore_node
if not show_progress_bar:
node.write_bytes(address, data, offset, byte_size)
return
# With progress bar
transfer_byte_count = byte_size if byte_size else len(data)
desc = (
f"Transferring {transfer_byte_count/(1024 * 1024):0.1f} MB "
f"[bold](Write {hex(address)})[/]"
)
bar = PercentProgressBar()
bar.add_task(description=desc, status=PercentProgressBar.Status.STARTING)
try:
node.write_bytes(
address,
data,
offset,
byte_size,
progress=lambda progress: bar.update(
completed=int(progress * 100), status=PercentProgressBar.Status.IN_PROGRESS
),
)
except Exception as ex:
bar.update(status=PercentProgressBar.Status.ABORTED)
raise ex
bar.update(status=PercentProgressBar.Status.DONE)
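    # Sketch of the private byte-level DPC helpers above (buffer size and address are
    # illustrative):
    #
    #   buf = bytearray(256)
    #   device._dpc_read_bytes(0x10000000, buf, show_progress_bar=False)
    #   device._dpc_write_bytes(0x10000000, buf, show_progress_bar=False)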
# PROGRAMMING
    def get_plm_log(self, memory_target="Versal.+", slr_index: int = 0) -> str:
"""
Returns: Raw log read from hardware
Args:
memory_target: Optional name of memory target such as APU, RPU, DPC
slr_index: Optional SLR index for multi-SLR devices
"""
self._raise_if_family_not(DeviceFamily.VERSAL)
self._raise_if_state_invalid()
try:
mem = self.memory.get(name=memory_target)
except ValueError:
raise ValueError(f"Memory target {memory_target} not available")
return mem.plm_log(slr_index=slr_index)
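    # Sketch: dump the PLM log to stdout (Versal only; uses the default memory target regex):
    #
    #   print(device.get_plm_log())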
    def get_program_log(self, memory_target="Versal.+") -> str:
        """
        Returns: Programming log read from hardware
        Args:
            memory_target: Optional name of memory target such as APU, RPU, DPC
        """
self._raise_if_family_not(DeviceFamily.VERSAL)
self._raise_if_state_invalid()
try:
mem = self.memory.get(name=memory_target)
except ValueError:
raise ValueError(f"Memory target {memory_target} not available")
report_generator = ClientProgramLogStrategy(
regs=self.jtag_node.props.get("regs", {}), plm_log_fn=lambda x: mem.plm_log(slr_index=x)
)
return report_generator.generate_program_log()
    def write_program_log(self, outfile: Union[str, Path], *, memory_target="Versal.+"):
"""
        Write an enhanced program log to *outfile*, or to stdout if *outfile* is None.
"""
if isinstance(outfile, str):
outfile = Path(outfile)
try:
report_str = self.get_program_log(memory_target=memory_target)
if outfile is None:
sys.stdout.write(report_str)
else:
with outfile.open("w") as file:
file.write(report_str)
except Exception as e:
raise IOError(f"Failed to write report to {outfile}: {str(e)}")
    def reset(self):
"""Reset the device to a non-programmed state."""
self._raise_if_family_not(DeviceFamily.VERSAL, DeviceFamily.UPLUS)
self._raise_if_state_invalid()
jtag_programming_node = self.jtag_node
assert jtag_programming_node is not None
self.state = DeviceState.NEEDS_REFRESH
jtag_programming_node.future().reset()
time.sleep(0.5)
self.refresh()
    def program(
self,
programming_file: Union[str, Path],
*,
skip_reset: bool = False,
delay_after_program: int = 0,
show_progress_bar: bool = True,
progress: request.ProgressFutureCallback = None,
done: request.DoneFutureCallback = None,
):
"""Program the device with a given programming file (bit or pdi).
Args:
            programming_file: Path to the programming file (.bit or .pdi)
            skip_reset: True = Do not reset device prior to programming
            delay_after_program: Seconds to delay at end of programming (default=0)
            show_progress_bar: False = Do not show the progress bar
progress: Optional progress callback
done: Optional async future callback
"""
self.programming_error = None
self._raise_if_family_not(DeviceFamily.VERSAL, DeviceFamily.UPLUS)
self._raise_if_state_invalid()
program_future = request.CsFutureSync(done=done, progress=progress)
self.state = DeviceState.NEEDS_REFRESH
if isinstance(programming_file, str):
programming_file = Path(programming_file)
if not programming_file.exists():
raise FileNotFoundError(
f"Programming file: {str(programming_file.resolve())} doesn't exists!"
)
printer(f"Programming device with: {str(programming_file.resolve())}\n", level="info")
if show_progress_bar:
progress_ = PercentProgressBar()
progress_.add_task(
description="Device program progress",
status=PercentProgressBar.Status.STARTING,
visible=show_progress_bar,
)
def progress_update(future):
if program_future:
program_future.set_progress(future.progress)
if show_progress_bar:
progress_.update(
completed=future.progress * 100, status=PercentProgressBar.Status.IN_PROGRESS
)
def pre_done_programming(future):
# refresh jtag node properties after program (1190699)
if future.error:
self.programming_error = future.error
jtag_programming_node.future().update_regs(
reg_names=[], force=True, done=done_programming
)
def done_programming(future):
if not self.programming_error:
self.programming_error = future.error
if show_progress_bar:
if self.programming_error is not None:
progress_.update(status=PercentProgressBar.Status.ABORTED)
else:
for i in range(delay_after_program):
# This pushes end-of-config node events to the listeners
# They seem to happen within about 2-3 seconds - need a more
# reliable way to wait for them to finish.
# If we don't wait, the DPC node may not be ready for use.
time.sleep(0.5)
if self.cs_server:
self.cs_server.get_view("chipscope").run_events()
time.sleep(0.5)
# refresh device jtag node properties after program (1190699)
self.refresh(force_update=False)
progress_.update(completed=100, status=PercentProgressBar.Status.DONE)
if self.programming_error is not None:
program_future.set_exception(self.programming_error)
else:
program_future.set_result(None)
def finalize_program():
if self.programming_error is not None:
raise RuntimeError(self.programming_error)
jtag_programming_node = self.jtag_node
assert jtag_programming_node is not None
if not skip_reset:
jtag_programming_node.future().reset()
jtag_programming_node.future(
progress=progress_update, done=pre_done_programming, final=finalize_program
).config(str(programming_file.resolve()))
return program_future if done else program_future.result
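    # Typical blocking program call followed by core discovery (sketch; the file paths are
    # hypothetical):
    #
    #   device.program("my_design.pdi")
    #   device.discover_and_setup_cores(ltx_file="my_design.ltx")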
def _raise_if_family_not(self, *args):
if self.device_family == DeviceFamily.XVC:
# special partial setup case for xvc sw testing - always ok
return
if self.device_family not in args:
raise FeatureNotAvailableError()
# Legacy support - to not break old code that may have referenced the classes
# that were removed (and now all implemented by Device)
VersalDevice = NewType("VersalDevice", Device)
UltrascaleDevice = NewType("UltrascaleDevice", Device)
GenericDevice = NewType("GenericDevice", Device)
def discover_devices(
hw_server: ServerInfo,
cs_server: ServerInfo = None,
disable_core_scan: bool = False,
cable_ctx: Optional[str] = None,
disable_cache: bool = False,
) -> QueryList[Device]:
    log.client.debug(
        f"discover_devices: hw_server={hw_server}, cs_server={cs_server}, "
        f"disable_core_scan={disable_core_scan}, cable_ctx={cable_ctx}, disable_cache={disable_cache}"
    )
include_dna = True
devices: QueryList[Device] = QueryList()
idx = 0
for key, device_record_list in scan_all_views(hw_server, cs_server, include_dna).items():
device_spec = DeviceSpec.create_from_device_records(
hw_server, cs_server, device_record_list
)
if not device_spec.is_valid:
# This can happen when a chipscope server is serving multiple sessions.
# The cs_server reports back connection information for the
# other sessions which should be ignored.
log.client.debug(f"discover_devices: No device_node for dna key: {key} --> ignored.\n")
continue
if cable_ctx is None or device_spec.jtag_cable_ctx == cable_ctx:
device_node = device_spec.get_device_node()
assert device_node is not None
if device_node.device_wrapper is None:
device_node.device_wrapper = Device(
hw_server=hw_server,
cs_server=cs_server,
device_spec=device_spec,
disable_core_scan=disable_core_scan,
disable_cache=disable_cache,
)
log.client.debug(
f"discover_devices: created new device: {device_node.device_wrapper}"
)
else:
log.client.debug(
f"discover_devices: reusing device wrapper: {device_node.device_wrapper}"
)
devices.append(device_node.device_wrapper)
log.client.info(f"discover_devices: {idx}: {device_node.device_wrapper}")
idx += 1
return devices