Source code for chipscopy.api.session

# Copyright (C) 2021-2022, Xilinx, Inc.
# Copyright (C) 2022-2023, Advanced Micro Devices, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
import sys
import threading
import traceback
import socket
from typing import Optional, Dict, Any, List, Union, Callable, Set, Tuple

from chipscopy.api import DMNodeListener
from chipscopy.client.jtagdevice import JtagDevice, JtagCable
from chipscopy.dm import chipscope, Node
from chipscopy.utils.logger import log
from chipscopy.utils.version import version_consistency_check
from chipscopy.client import connect as client_connect
from chipscopy.client import disconnect as client_disconnect
from chipscopy.client.util import connect_hw, process_param_str, parse_params
from chipscopy.client.view_info import ViewInfo
from chipscopy.client.server_info import ServerInfo
from chipscopy.api.containers import QueryList
from chipscopy.api.device.device import Device, FeatureNotAvailableError, DeviceState
from chipscopy.api.device.device_util import get_jtag_view_dict
from chipscopy.api.memory import Memory
from chipscopy.api.cable import Cable, discover_devices, wait_for_all_cables_ready, discover_cables

DOMAIN_NAME = "client"


[docs]class Session: """Top level object that tracks a connection to a hardware server and optionally, chipscope server. To create and destroy a session, use the factory function create_session() and delete_session(). """ _session_lock = threading.RLock() # Global session tracking here. We keep a list of who is connected at any time. # Format: # dict[hex(id(session))] = (hw_server, cs_server, session) _connected_session_dict: Dict[str, Tuple[ServerInfo, ServerInfo, "Session"]] = {} # cs_server channel reference counting. # cs_server instances are shared for the same host/port. # Format: # dict[(normalized_host,port)] = (count, cs_server) _cs_server_channel_ref_count: Dict[Tuple[str, int], Tuple[int, ServerInfo]] = {} def __init__( self, *, hw_server_url: str, cs_server_url: Optional[str] = None, xvc_mm_server_url: Optional[str] = None, disable_core_scan: bool, bypass_version_check: bool, cable_timeout: int, disable_cache: bool, initial_device_scan: bool, cs_server_sharing: bool, ): self._cs_server_sharing: bool = cs_server_sharing self._disable_core_scan: bool = disable_core_scan self._bypass_version_check: bool = bypass_version_check self._hw_server_url: str = hw_server_url self._cs_server_url: str = cs_server_url self._xvc_mm_server_url: str = xvc_mm_server_url self._cable_timeout = cable_timeout self._cables_are_initialized = False self._disable_cache = disable_cache self._register_node_listeners = not self._disable_cache self._initial_device_scan = initial_device_scan self.hw_server: Optional[ServerInfo] = None self.cs_server: Optional[ServerInfo] = None self._need_to_scan_devices = True self._devices: QueryList[Device] = QueryList() self._device_lock = threading.Lock() self._dm_node_listener = DMNodeListener(self.node_callback) def __str__(self): return f"{self.handle}" def __repr__(self): return self.to_json() def __getitem__(self, item): props = self.to_dict() if item in props: return props[item] else: raise AttributeError(f"No property {str(item)}") def to_json(self): json_dict = json.dumps(self.to_dict(), indent=4) return json_dict def to_dict(self): ret_dict = { "name": self.handle, "hw_server_url": self._hw_server_url, "cs_server_url": self._cs_server_url, "xvc_mm_server_url": self._xvc_mm_server_url, "bypass_version_check": self._bypass_version_check, "disable_core_scan": self._disable_core_scan, "cable_timeout": self._cable_timeout, "handle": self.handle, } return ret_dict def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): if exc_type: traceback.print_exception(exc_type, exc_val, exc_tb) delete_session(self) def node_callback(self, action: DMNodeListener.NodeAction, node: Node, props: Optional[Set]): # This callback is called when nodes are added, changed, or removed. if node: valid_device_list = QueryList() devices = self._get_devices_with_lock() for device in devices: # Propagate node events to devices, so they can decide what to do with the event device.node_callback(action, node, props) if device.state == DeviceState.INVALID: # Not typical - could be a board power off situation # Anything that causes a device to be INVALID means we need to re-scan the # devices and get that one removed from the list. self._need_to_scan_devices = True else: valid_device_list.append(device) self._set_device_with_lock(valid_device_list) @classmethod def _add_connection( cls, hw_server: ServerInfo, cs_server: Optional[ServerInfo], session: "Session" ): with cls._session_lock: cls._connected_session_dict[hex(id(session))] = (hw_server, cs_server, session) @classmethod def _remove_connection(cls, session: "Session"): with cls._session_lock: del cls._connected_session_dict[hex(id(session))] @classmethod def _get_all_sessions(cls) -> List["Session"]: sessions = [] with cls._session_lock: for id_key, (_, _, session) in cls._connected_session_dict.items(): sessions.append(session) return sessions @classmethod def disconnect_all_sessions(cls): sessions_to_disconnect = [] with cls._session_lock: for id_key, (_, _, session) in cls._connected_session_dict.items(): sessions_to_disconnect.append(session) for session in sessions_to_disconnect: try: session.disconnect() except Exception: # pragma: no cover # TODO: Look into root cause later - happens intermittently in pytest infra causing tests to fail pass cls._connected_session_dict = {} cls._cs_server_channel_ref_count = {} @staticmethod def _parse_url(url: str) -> Tuple[str, int]: param_str = process_param_str(url) host_port_dict = parse_params(param_str) host = host_port_dict.get("Host") port = int(host_port_dict.get("Port")) return host, port def connect_hw_server(self): if not self._hw_server_url: # A hw_server is always required when creating a session raise ValueError("hw_server_url must point to a valid hw_server") if self.hw_server: raise RuntimeError("hw_server already connected") server = Session._connect_server("hw_server", self._hw_server_url, connect_hw) self.hw_server = server def disconnect_hw_server(self): if not self.hw_server: # silently ignore disconnect calls if no server was ever connected - no harm return client_disconnect(self.hw_server) self.hw_server = None def connect_cs_server(self): if not self._cs_server_url: # A cs_server is OPTIONAL required when creating a session - so no error return if self.cs_server: raise RuntimeError("cs_server already connected") # Due to a client limitation, we can not connect multiple times to the # server from the same process. This causes errors due to the manager being # removed on second connect. # # WORKAROUND: # Reference count to safely share the cs_server connection # host, port = Session._parse_url(self._cs_server_url) ip = socket.gethostbyname(host) # entry = Tuple(ref_count, cs_server) entry = Session._cs_server_channel_ref_count.get((ip, port)) if entry is None or not self._cs_server_sharing: # This is the first session - create ref_cnt entry and server server = Session._connect_server("cs_server", self._cs_server_url, client_connect) entry = (0, server) # Increment reference count for every session connection to same cs_server server = entry[1] entry = (entry[0] + 1, server) Session._cs_server_channel_ref_count[(ip, port)] = entry server.connect_remote(self.hw_server.url) if self._xvc_mm_server_url and server: server.connect_xvc(self._xvc_mm_server_url, self._hw_server_url) self.cs_server = server def disconnect_cs_server(self): if not self.cs_server: # silently ignore disconnect calls if no server was ever connected - no harm return host, port = Session._parse_url(self._cs_server_url) ip = socket.gethostbyname(host) # entry = Tuple(ref_count, cs_server) entry: Tuple[int, ServerInfo] = Session._cs_server_channel_ref_count.get((ip, port)) if not entry: # This should not be possible - no entry indicates something unbalanced happened. raise RuntimeError("Error during disconnect - no ref_cnt for session.cs_server") ref_cnt, server = entry ref_cnt -= 1 server.disconnect_remote(f"TCP:{self.hw_server.url}") self.cs_server = None if ref_cnt == 0 or not self._cs_server_sharing: # This is the last session - delete the ref count tracking entry from dict del Session._cs_server_channel_ref_count[(ip, port)] client_disconnect(server) else: # This is not the last session - just decrement the ref count Session._cs_server_channel_ref_count[(ip, port)] = (ref_cnt, server) def connect(self): self.connect_hw_server() with Session._session_lock: # cs_server changes are locked because they use a reference counted # shared cs_server - this is delicate self.connect_cs_server() Session._add_connection(self.hw_server, self.cs_server, self) try: # Quick sanity check - throws RuntimeError on version mismatch version_consistency_check(self.hw_server, self.cs_server, self._bypass_version_check) # Add event listeners if self._register_node_listeners: if self.hw_server: for view_name in ["memory", "jtag", "debugcore"]: self.hw_server.get_view(view_name).add_node_listener(self._dm_node_listener) if self.cs_server: self.cs_server.get_view("chipscope").add_node_listener(self._dm_node_listener) if self._initial_device_scan: # Ensures we have a set of pre-scanned devices ready to go for first use. # Downside is this does take up-front time, so it is optional. # If not done at session creation, it will be done when devices are first accessed with session.devices self.scan_devices() except RuntimeError: self.disconnect() t, v, tb = sys.exc_info() raise t(v).with_traceback(tb) def disconnect(self): # Cleanup registered event listeners if self._register_node_listeners: if self.hw_server: for view_name in ["memory", "jtag", "debugcore"]: self.hw_server.get_view(view_name).remove_node_listener(self._dm_node_listener) if self.cs_server: self.cs_server.get_view("chipscope").remove_node_listener(self._dm_node_listener) with Session._session_lock: # cs_server changes are locked because they use a reference counted # shared cs_server - this is delicate self.disconnect_cs_server() self.disconnect_hw_server() Session._remove_connection(self)
[docs] def set_param(self, params: Dict[str, Any]): """Generic parameter get and set for low level chipscope server params""" if not isinstance(params, dict): message = "Please provide the params to set as a dictionary!" log[DOMAIN_NAME].error(message) raise TypeError(message) cs_service = self.cs_server.get_sync_service("ChipScope") cs_service.set_css_param(params)
[docs] def get_param(self, params: Union[str, List[str]]) -> Dict[str, str]: """Generic parameter get and set for low level chipscope server params""" if isinstance(params, str): params = [params] cs_service = self.cs_server.get_sync_service("ChipScope") return cs_service.get_css_param(params).get()
@property def handle(self) -> str: if self.cs_server and self.hw_server: handle_str = f"{self.cs_server.url}<->{self.hw_server.url}" elif self.hw_server: handle_str = f"{self.hw_server.url}" else: handle_str = "no_hw_server<->no_cs_server" return handle_str @property def chipscope_view(self) -> ViewInfo: view = None if self.cs_server: view = self.cs_server.get_view(chipscope) return view @property def jtag_devices(self) -> QueryList[JtagDevice]: def matcher(node, key, value): if getattr(node, key, None) == value: return True elif node.props.get(key) == value: return True return False devices: QueryList[JtagDevice] = QueryList() devices.set_custom_match_function(matcher) jtag_view_dict = get_jtag_view_dict(self.hw_server) view = self.hw_server.get_view("jtag") for cable_values in jtag_view_dict.values(): devices_dict = cable_values.get("devices", {}) jtag_index = 0 for device_ctx in devices_dict.keys(): jtag_device = view.get_node(ctx=device_ctx, cls=JtagDevice) # TODO: Check the correct way to get the jtag_index. I couldn't find in the props so I just add 1 each # time we find a device on the cable... jtag_device.cable_serial = cable_values["props"].get("Serial") jtag_device.jtag_index = jtag_index jtag_index += 1 devices.append(jtag_device) return devices @property def jtag_cables(self) -> QueryList[JtagCable]: # TODO: Transition jtag_cables -> target_cables. # target_cables will return a higher level wrapper that can represent a virtual cable with virtual # devices in the future. jtag_cables here only support the JtagCable node. def matcher(node, key, value): if getattr(node, key, None) == value: return True elif node.props.get(key) == value: return True return False jtag_cables: QueryList[JtagCable] = QueryList() jtag_cables.set_custom_match_function(matcher) jtag_view_dict = get_jtag_view_dict(self.hw_server) view = self.hw_server.get_view("jtag") for jtag_cable_ctx, cable_values in jtag_view_dict.items(): jtag_cable = view.get_node(ctx=jtag_cable_ctx, cls=JtagCable) jtag_cables.append(jtag_cable) return jtag_cables @property def cables(self) -> QueryList[Cable]: """Returns a list of all cables connected to the hw_server. Similar to vivado get_hw_targets command. target_cables may be jtag or virtual. """ retval = discover_cables( hw_server=self.hw_server, cs_server=self.cs_server, disable_core_scan=self._disable_core_scan, timeout=self._cable_timeout, ) return retval def scan_devices(self) -> QueryList[Device]: if not self._cables_are_initialized: wait_for_all_cables_ready(self.hw_server, self._cable_timeout) self._cables_are_initialized = True devices = discover_devices( hw_server=self.hw_server, cs_server=self.cs_server, disable_core_scan=self._disable_core_scan, cable_ctx=None, disable_cache=self._disable_cache, ) self._set_device_with_lock(devices) return QueryList(self._devices) def _get_devices_with_lock(self) -> QueryList[Device]: with self._device_lock: return QueryList(self._devices) def _set_device_with_lock(self, devices: QueryList) -> None: with self._device_lock: self._devices = QueryList(devices) self._need_to_scan_devices = False @property def devices(self) -> QueryList[Device]: """Returns a list of devices connected to this hw_server and cable. Devices may span multiple jtag chains. No ordering is guaranteed. Devices are cached after scanning until an event changes state and requires a re-scan if caching is enabled. """ if self._disable_cache or self._need_to_scan_devices: # Force scan from hardware (slow) devices = self.scan_devices() else: # Use the cached device list (fast) devices = self._get_devices_with_lock() return devices @property def memory(self) -> QueryList[Memory]: memory_node_list = QueryList() # This is not fast - improve it later. We don't need to query each # device - but it's clear and simple now. for device in self.devices: try: memory_nodes = device.memory for node in memory_nodes: memory_node_list.append(node) except FeatureNotAvailableError: pass return memory_node_list
[docs] @staticmethod def set_log_level(level: str = None): """ Set the logging level for the ChipScoPy client. This applies to all sessions. Default is "NONE" Args: level: The minimum level to use for the logger. Valid levels are "TRACE", "DEBUG", "INFO", "SUCCESS", "WARNING", "ERROR", "CRITICAL", "NONE" """ if level is None: log.disable_domain(DOMAIN_NAME) valid_levels = ["TRACE", "DEBUG", "INFO", "SUCCESS", "WARNING", "ERROR", "CRITICAL", "NONE"] level = level.upper() assert level in valid_levels log.change_log_level(level) log.enable_domain(DOMAIN_NAME)
@staticmethod def _connect_server(server_name: str, server_url: str, connect_func: Callable) -> ServerInfo: try: return connect_func(server_url) except ConnectionRefusedError: # Shorten stack dump, by creating new exception. raise ConnectionRefusedError( f"Connection could not be opened to {server_name} @ {server_url}.\n" f" Please make sure that {server_name} is running at the URL provided, " f"and firewall is not blocking." ) except ConnectionError: # Shorten stack dump, by creating new exception. raise ConnectionRefusedError( f"Connection could not be opened to {server_name} @ {server_url}.\n" f" Please make sure that {server_name} is running at the URL provided, " f"and firewall is not blocking." ) except Exception as ex: # Chain exceptions. raise Exception( f"Connection could not be opened to {server_name} @ {server_url}." ) from ex
############################################################################### # Factory methods below ###############################################################################
[docs]def create_session(*, hw_server_url: str, cs_server_url: Optional[str] = None, **kwargs) -> Session: """ Create a new session. Connect to the specified hw_server, and optionally the cs_server. - hw_server is used for programming and Versal Memory read/write operations - cs_server is used for higher level debug core communication Example 1: Default session create :: my_session = create_session(hw_server_url="TCP:localhost:3121", cs_server_url="TCP:localhost:3042") Example 2: Disable core scanning and server version checking :: my_session = create_session(hw_server_url="TCP:localhost:3121", cs_server_url="TCP:localhost:3042", disable_core_scan=True, bypass_version_check=True) Args: hw_server_url: Hardware server URL. Format ``TCP:<hostname>:<port>`` cs_server_url: ChipScope server URL. Format ``TCP:<hostname>:<port>`` Keyword Arguments: disable_core_scan: Set True to completely disable core scanning during discover_and_setup_debug_cores bypass_version_check: Set True to change hw_server and cs_server version mismatch to warning instead of error xvc_mm_server_url: Url for the testing xvc memory map server - For special debug core testing use cases cable_timeout: Seconds before timing out when detecting devices on a jtag cable initial_device_scan: Do an initial device scan when opening session (lazy initialization otherwise) disable_cache: Control client caching (experimental) auto_connect: Automatically connect to server(s) when session is created cs_server_sharing: Enable reference count to share cs_server connections Returns: New session object. """ disable_core_scan = kwargs.get("disable_core_scan", False) bypass_version_check = kwargs.get("bypass_version_check", True) xvc_mm_server_url = kwargs.get("xvc_mm_server_url", None) cable_timeout = kwargs.get("cable_timeout", 4) initial_device_scan = kwargs.get("initial_device_scan", True) disable_cache = kwargs.get("disable_cache", False) auto_connect = kwargs.get("auto_connect", True) cs_server_sharing = kwargs.get("cs_server_sharing", False) # Create session even if there already exists a session with the same cs_server and hw_server # It *should* be safe. session = Session( cs_server_url=cs_server_url, hw_server_url=hw_server_url, xvc_mm_server_url=xvc_mm_server_url, disable_core_scan=disable_core_scan, bypass_version_check=bypass_version_check, cable_timeout=cable_timeout, disable_cache=disable_cache, initial_device_scan=initial_device_scan, cs_server_sharing=cs_server_sharing, ) if auto_connect: session.connect() return session
[docs]def delete_session(session: Session): """ Delete a session. Shuts down any open server connections. Args: session: The session object to delete """ session.disconnect()