Source code for chipscopy.api.ila.ila_waveform
# Copyright (C) 2021-2022, Xilinx, Inc.
# Copyright (C) 2022-2025, Advanced Micro Devices, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import enum
import json
import sys
import csv
import zipfile
import re
from chipscopy.api.ila.ila_protocol_processing import Rule, TransactionSpec
from abc import abstractmethod
from collections import defaultdict, deque
from dataclasses import dataclass, asdict, field
from datetime import datetime
from io import TextIOBase, BytesIO, StringIO
from itertools import islice
from pprint import pformat
from typing import Generator, Dict, List, Union, Optional, Sequence, Any, Tuple, Iterable
from zipfile import ZipFile
from enum import Enum
from chipscopy.api.ila import ILABitRange, ILAProbeRadix
import chipscopy
import os
from chipscopy.shared.ila_util import bin_reversed_to_hex_values
from chipscopy.utils import Enum2StrEncoder
[docs]@dataclass
class ILAWaveformProbe:
"""Probe location in a data sample."""
name: str
"""Name of probe."""
map: str
"""Location string"""
map_range: List[ILABitRange]
"""List of bit ranges. See :class:`~chipscopy.api.ila.ILABitRange`"""
is_bus: bool
"""True for bus probes"""
bus_left_index: Optional[int] = None
""" Bus left index. E.g. 5 in probe ``counter[5:0]``"""
bus_right_index: Optional[int] = None
"""Bus right index. E.g. 0 in probe ``counter[5:0]``"""
display_radix: ILAProbeRadix = ILAProbeRadix.HEX
"""Display radix, when exporting waveform data. Default is ILAProbeRadix.HEX"""
enum_def: Optional[enum.EnumMeta] = None
"""Enum class defining {name:int} enum values, for this probe."""
def length(self) -> int:
return sum(mr.length for mr in self.map_range)
def bus_range_str(self) -> str:
if not self.is_bus:
res = ""
elif self.bus_left_index == self.bus_right_index:
res = f"[{self.bus_left_index}]"
else:
res = f"[{self.bus_left_index}:{self.bus_right_index}]"
return res
@dataclass
class ProbeGroup:
"""Represents a group of related probes in the waveform."""
name: str
probes: List[str] = field(default_factory=list)
description: Optional[str] = None
is_expanded: bool = True # For UI state
color: Optional[str] = None # For visual grouping
rules: List["Rule"] = field(default_factory=list)
rule_results: Dict[str, Dict[str, any]] = field(default_factory=dict) # Store results
enum_mappings: Dict[str, int] = None
channel_name: str = None
def add_probe(self, probe_name: str) -> None:
"""Add a probe to this group."""
if probe_name not in self.probes:
self.probes.append(probe_name)
def remove_probe(self, probe_name: str) -> None:
"""Remove a probe from this group."""
if probe_name in self.probes:
self.probes.remove(probe_name)
def add_rule(self, rule: "Rule") -> None:
"""Add a rule to this probe group."""
self.rules.append(rule)
def write_enum_mappings_to_text_file(
self, filename: str, enum_mappings: Dict[str, Dict[str, int]]
):
"""
Write enum mappings to a simple text file.
Args:
filename: Output filename
enum_mappings: Dictionary of signal names to their enum mappings
"""
output_lines = []
if enum_mappings is None:
return
# Sort by value
sorted_mapping = sorted(enum_mappings.items(), key=lambda x: x[1])
# Get bit width
max_value = max(enum_mappings.values()) if enum_mappings else 0
bit_width = max_value.bit_length() if max_value > 0 else 1
for label, value in sorted_mapping:
binary = format(value, f"0{bit_width}b")
if label == "":
display_label = "NOEVENTS"
else:
display_label = f"{label}"
output_lines.append(f"{binary} {display_label}")
with open(filename, "w") as f:
f.write("\n".join(output_lines))
print(f"Enum mappings written to {filename}")
def calculate_values(self, waveform: "ILAWaveform", slot: int = 0) -> None:
"""
Apply rules specific to this probe group and create interval signals.
"""
if not self.rules:
return
refactor_rules = False
if waveform.num_slots > 1:
refactor_rules = True
all_data = waveform.get_data()
for rule in self.rules:
# Create signal mapping only from probes in this group
signal_mapping = {}
for signal in rule.signals:
# Find matching probes within this group's probes
tsignal = signal
if refactor_rules:
tsignal = f"SLOT_{slot}_AXI/{signal}"
matching_probes = []
for probe_name in self.probes:
if probe_name.upper() == tsignal.upper():
matching_probes.append(probe_name)
if len(matching_probes) == 1:
signal_mapping[signal] = matching_probes[0]
elif len(matching_probes) > 1:
print(
f"Warning: Multiple probes in group '{self.name}' match '{signal}': {matching_probes}"
)
signal_mapping[signal] = matching_probes[0]
else:
print(f"Warning: No probe in group '{self.name}' found for signal '{signal}'")
break
# Skip if we couldn't find all required signals
if len(signal_mapping) != len(rule.signals):
continue
# Get signal data
signal_data = {
simple_name: all_data[full_name]
for simple_name, full_name in signal_mapping.items()
}
# Create context with helper methods
class Context:
def rising_edge(self, signal, idx):
if idx == 0:
return False
data = signal_data[signal]
return data[idx - 1] == 0 and data[idx] == 1
def both_asserted(self, sig1, sig2, idx):
return signal_data[sig1][idx] == 1 and signal_data[sig2][idx] == 1
def signal(self, signal, idx):
return signal_data[signal][idx]
context = Context()
# Build hits list
hits = []
for idx in range(waveform.sample_count):
try:
hits.append(rule.applies(context, idx))
except Exception as e:
print(
f"Error applying rule '{rule.name}' at index {idx} in group '{self.name}': {e}"
)
hits.append(False)
# Find intervals
intervals = []
start = None
for idx, fire in enumerate(hits):
if fire and start is None:
start = idx
elif not fire and start is not None:
intervals.append((start, idx - 1))
start = None
if start is not None:
intervals.append((start, waveform.sample_count - 1))
# Create manual interval signal for this rule
# Store results instead of creating signals
self.rule_results[rule.name] = {
"hits": hits, # Boolean array of where rule fires
"intervals": intervals, # List of (start, end) tuples
"signal_mapping": signal_mapping, # Which signals were used
"hit_count": sum(hits), # Total number of hits
"hit_indices": [i for i, h in enumerate(hits) if h], # Indices where rule fired
}
def get_rule_hits(self, rule_name: str) -> List[bool]:
"""Get the hit array for a specific rule."""
if rule_name in self.rule_results:
return self.rule_results[rule_name]["hits"]
return []
def get_rule_intervals(self, rule_name: str) -> List["Tuple"]:
"""Get the intervals where a specific rule fired."""
if rule_name in self.rule_results:
return self.rule_results[rule_name]["intervals"]
return []
def create_interval_signals(
self, waveform: "ILAWaveform", rule_names: List[str] = None, slot: int = 0
):
"""Create a single enum signal for this group combining all rules."""
"""Create a single combined enum signal for all rules in this probe group."""
# Get rule names from rule_results if not provided
if rule_names is None:
rule_names = list(self.rule_results.keys())
rules_to_process = rule_names
if not rules_to_process:
return
# Build enum mapping: each rule gets a unique bit
# Example: rule1=0b01, rule2=0b10, both=0b11, neither=0b00
enum_mapping = {"-": 0} # Empty string for no rules active
rule_bit_map = {}
# Assign each rule a unique bit position
for i, rule_name in enumerate(rules_to_process):
if rule_name in self.rule_results:
bit_value = 1 << i # 2^i
rule_bit_map[rule_name] = bit_value
enum_mapping[rule_name] = bit_value
# Generate all possible combinations
num_rules = len(rule_bit_map)
for combo in range(1, 2**num_rules): # Skip 0 (empty)
# Build label for this combination
active_rules = []
for rule_name, bit_value in rule_bit_map.items():
if combo & bit_value:
active_rules.append(rule_name)
if len(active_rules) > 1:
# Multiple rules active
label = " & ".join(active_rules)
enum_mapping[label] = combo
# Create values array for each sample
values = [""] * waveform.sample_count
# Move this to gtkw in export waveform
# For each sample, determine which rules are active
for idx in range(waveform.sample_count):
active_value = 0
active_rules = []
for rule_name in rules_to_process:
if rule_name not in self.rule_results:
continue
# Check if this rule fires at this index
hits = self.rule_results[rule_name]["hits"]
if idx < len(hits) and hits[idx]:
active_value |= rule_bit_map[rule_name]
active_rules.append(rule_name)
# Set the appropriate label
if active_value == 0:
values[idx] = "-"
elif len(active_rules) == 1:
values[idx] = active_rules[0]
else:
values[idx] = " & ".join(active_rules)
# Create the combined signal
if waveform.num_slots > 1:
signal_name = f"SLOT_{slot}_{self.name}_Events"
else:
signal_name = f"{self.name}_Events"
self.channel_name = signal_name
self.enum_mappings = enum_mapping
waveform.append_manual_enum_signal(signal_name, values, enum_mapping)
def __len__(self) -> int:
return len(self.probes)
@dataclass
class Transaction:
kind: str # "read" | "write"
id: Optional[int] # ARID/AWID (0 if not present)
addr: Optional[int] # ARADDR/AWADDR (int if available)
start_idx: int
probes: Dict[str, Any] = field(default_factory=dict) # captured open fields
slot: int = 0
end_idx: Optional[int] = None
resp: Optional[int] = None
channel_name: Optional[str] = None # e.g. "AXI_ID_2_Write
enum: Dict[int, str] = field(default_factory=lambda: {0: "-", 1: "ACTIVE"})
meta: Dict[str, Any] = field(default_factory=defaultdict)
@dataclass
class TopTransactionAssembler:
specs: List["TransactionSpec"]
slot_indices: Optional[List[int]] = None
name_prefix: str = "AXI"
# per-slot assemblers and results
assemblers: List["SimpleTransactionAssembler"] = field(default_factory=list)
transactions_by_slot: Dict[int, List["Transaction"]] = field(default_factory=dict)
def prepare(self, waveform: "ILAWaveform"):
# Determine slots from waveform if not provided
waveform.transactionAssembler = self
if self.slot_indices is None:
num_slots = getattr(waveform, "num_slots", 1) or 1
self.slot_indices = list(range(num_slots))
# Create per-slot assemblers once
if not self.assemblers:
for s in self.slot_indices:
self.assemblers.append(SimpleTransactionAssembler(self.specs, slot=s))
def calculate(self, waveform: "ILAWaveform") -> Dict[int, List["Transaction"]]:
self.prepare(waveform)
self.transactions_by_slot.clear()
for asm in self.assemblers:
txns = asm.calculate_values(waveform)
self.transactions_by_slot[asm.slot] = txns
return self.transactions_by_slot
def append_lanes(self, waveform: "ILAWaveform", separate_by_kind: bool = True):
# Append per-slot lanes to the waveform
for asm in self.assemblers:
# Set a per-slot prefix to avoid name collisions between slots
prefix = f"{self.name_prefix}_SLOT{asm.slot}"
asm.append_per_id_binary_lanes(waveform, name_prefix=prefix)
def write_gtkw_translate(
self, vcd_path: str, out_dir: str, separate_by_kind: bool = True
) -> List[str]:
"""Write per-slot translate tables + .gtkw files, returns list of file paths."""
written = []
os.makedirs(out_dir, exist_ok=True)
for asm in self.assemblers:
prefix = f"{self.name_prefix}_SLOT{asm.slot}"
gtkw = asm.write_gtkw_with_translate(
out_dir=out_dir,
name_prefix=prefix,
)
written.append(gtkw)
return written
def all_transactions(self) -> List["Transaction"]:
# Flatten across slots if needed
result = []
for _, lst in sorted(self.transactions_by_slot.items()):
result.extend(lst)
return result
def get_transactions_by_slot(self) -> Dict[int, List["Transaction"]]:
# Flatten across slots if needed
return self.transactions_by_slot
class SimpleTransactionAssembler:
"""
Evaluates each TransactionSpec over the waveform and pairs opens->closes
using a per-ID FIFO. Ignores counters/overflows for now.
"""
def __init__(self, specs: List[TransactionSpec], slot: int = 0):
self.specs = specs
self.transactions: List[Transaction] = []
self.slot = slot
def _bind_sigmap(self, all_data: Dict[str, List], names: List[str]) -> Optional[Dict[str, str]]:
# Case-insensitive exact-key binding: "ARID" -> actual key in all_data
sigmap: Dict[str, str] = {}
for n in names:
# gather candidates
cands = [k for k in all_data.keys() if k.upper() == n.upper()]
if not cands:
return None
sigmap[n] = cands[0]
return sigmap
@staticmethod
def _to_int(val) -> Optional[int]:
if val is None:
return None
if isinstance(val, str):
try:
return int(val, 16) if val.lower().startswith("0x") else int(val)
except Exception:
return None
try:
return int(val)
except Exception:
return None
def append_per_id_binary_lanes(
self,
waveform: "ILAWaveform",
name_prefix: str = None,
):
"""
For each ID, create a 0/1 lane indicating transaction activity over time.
If separate_by_kind=True, create distinct lanes per (ID, kind).
"""
if not self.transactions:
return
n = waveform.sample_count
base_name = name_prefix or "AXI"
# Collect IDs (and kinds if requested)
groups = {}
for t in self.transactions:
if t.end_idx is None:
continue
key = (t.id or 0, t.kind)
groups.setdefault(key, []).append(t) # Enum mapping for binary 0/1
for (cur_id, cur_kind), txns in groups.items():
# Sort by start time, then end time for stability
txns.sort(
key=lambda x: (x.start_idx, x.end_idx if x.end_idx is not None else x.start_idx)
)
# Greedy packing into non-overlapping lanes
lane_last_end = [] # last end index for each lane
lanes = [] # list of lists of transactions per lane
for t in txns:
placed = False
for li, last_end in enumerate(lane_last_end):
# Place in the first lane that doesn't overlap (strictly after last end)
if t.start_idx > last_end:
lanes[li].append(t)
lane_last_end[li] = t.end_idx
placed = True
break
if not placed:
lanes.append([t])
lane_last_end.append(t.end_idx)
# Emit one enum lane per lane index
for li, lane_txns in enumerate(lanes):
lane_name = f"{base_name}_ID{cur_id}_{cur_kind.capitalize()}_{li}"
values = ["-"] * n
enum_map = {"-": 0}
next_code = 1
for t in lane_txns:
addr_str = f"0x{t.addr:X}" if isinstance(t.addr, int) else "N/A"
resp_str = str(t.resp) if t.resp is not None else "N/A"
label = f"{t.kind.upper()} ID={t.id} ADDR={addr_str} RESP={resp_str}"
if label not in enum_map:
enum_map[label] = next_code
next_code += 1
# Optionally attach mapping and lane name back to the txn
t.enum = {code: lab for lab, code in enum_map.items()}
t.channel_name = lane_name
s = max(0, t.start_idx)
e = min(n - 1, t.end_idx)
for i in range(s, e + 1):
values[i] = label
waveform.append_manual_enum_signal(lane_name, values, enum_map)
def calculate_values(self, waveform: "ILAWaveform") -> List[Transaction]:
from collections import defaultdict, deque
# if waveform.num_slots >
#
# else:
pgs = (
waveform.probe_groups[self.slot] if waveform.num_slots > 1 else waveform.probe_groups[0]
)
probes = []
# Iterate through all groups in that slot
for group_name, probe_group in pgs.items():
# process each probe_group
probes.extend(probe_group.probes)
data = waveform.get_data(probes)
N = waveform.sample_count
txns_all: List[Transaction] = []
def bind(names: list[str]) -> dict[str, str] | None:
m = {}
for n in names:
if waveform.num_slots > 1:
pattern = re.compile(rf"(?<=/|_){re.escape(n)}", re.IGNORECASE)
ks = [k for k in data if pattern.search(k)]
else:
ks = [k for k in data if k.upper() == n.upper()]
if not ks:
continue
m[n] = ks[0]
if len(m) == 0:
return None
return m
def make_ctx(sigmap: dict[str, str]):
class Ctx:
def rising_edge(self, s, i):
if i == 0:
return False
if s not in sigmap:
return False
d = data[sigmap[s]]
return d[i - 1] == 0 and d[i] == 1
def both_asserted(self, a, b, i):
if a not in sigmap or b not in sigmap:
return False
return data[sigmap[a]][i] == 1 and data[sigmap[b]][i] == 1
def signal(self, s, i):
key = sigmap.get(s)
if key is None:
return None
values = data.get(key)
if values is None or i < 0 or i >= len(values):
return None
return values[i]
return Ctx()
def to_int(v):
if v is None:
return None
if isinstance(v, str):
v = v.strip()
if v.lower().startswith("0x"):
try:
return int(v, 16)
except:
return None
try:
return int(v)
except:
return None
for spec in self.specs:
open_cnt_name = "AR_CNT" if spec.txn_type == "read" else "AW_CNT"
close_cnt_name = "R_CNT" if spec.txn_type == "read" else "B_CNT"
overflowed = False
# length
next_ticket: Dict[int, int] = defaultdict(int)
pending_by_id_ticket: Dict[int, Dict[int, Transaction]] = defaultdict(dict)
# Build signal maps for open/close including fields you’ll read
open_need = list(
{
*spec.open_rule.signals,
spec.open_addr_field,
*spec.open_fields,
open_cnt_name,
}
)
if spec.open_id_field:
open_need.append(spec.open_id_field)
# Everything the close rule needs, plus fields we want at close (ID + resp if present)
close_need = list(
{
*spec.close_rule.signals,
spec.close_resp_field,
*spec.close_fields,
close_cnt_name,
}
)
if spec.close_id_field:
close_need.append(spec.close_id_field)
open_map = bind(open_need)
close_map = bind(close_need)
if open_map is None or close_map is None:
continue
open_ctx = make_ctx(open_map)
close_ctx = make_ctx(close_map)
has_open_cnt = open_cnt_name in open_map
has_close_cnt = close_cnt_name in close_map
def derive_overflow_thresholds(width: int):
return (1 << (width - 1)), (1 << width) - 1
# Per-ID FIFO for this spec
pending: dict[int, deque[Transaction]] = defaultdict(deque)
completed: List[Transaction] = []
for i in range(N):
# Open: if rule fires, enqueue a Transaction with start_idx and fields
if spec.open_rule.applies(open_ctx, i):
tid = to_int(open_ctx.signal(spec.open_id_field, i)) or 0
addr = to_int(open_ctx.signal(spec.open_addr_field, i))
resp = (
(to_int(close_ctx.signal(spec.close_resp_field, i)) == 0)
if spec.close_resp_field
else None
)
ticket = next_ticket[tid]
pending_by_id_ticket[tid][ticket] = Transaction(
kind=spec.txn_type, id=tid, addr=addr, start_idx=i
)
next_ticket[tid] += 1
pending[tid].append(
Transaction(kind=spec.txn_type, id=tid, addr=addr, start_idx=i)
)
# Close: if rule fires, pop from that ID’s queue and finalize
if spec.close_rule.applies(close_ctx, i):
tid = to_int(close_ctx.signal(spec.close_id_field, i)) or 0
resp = (
(to_int(close_ctx.signal(spec.close_resp_field, i)) == 0)
if spec.close_resp_field
else None
)
# I think 00 means good response.
t = None
if has_close_cnt:
cc = to_int(close_ctx.signal(close_cnt_name, i))
if cc is not None:
# Oldest outstanding ticket (the one completing now)
ticket_to_close = next_ticket[tid] - cc
t = pending_by_id_ticket[tid].pop(ticket_to_close, None)
# Fallback to FIFO if counters not present or lookup failed
if t is None:
dq = pending[tid]
if dq:
t = dq.popleft()
if t:
t.end_idx = i
t.resp = resp
t.slot = self.slot
t.probes = (open_map or {}) | (close_map or {})
completed.append(t)
else:
# Orphan close; ignore or record a violation if desired
pass
txns_all.extend(completed)
self.transactions = txns_all
# self.append_per_id_binary_lanes(waveform, "SLOT_" + str(self.slot) if waveform.num_slots > 1 else "AXI")
return txns_all
def write_gtkw_with_translate(
self,
out_dir: str = ".",
name_prefix: str = "AXI",
) -> str:
"""
Write translate tables (.tt) and a .gtkw savefile for GTKWave that
pipes each lane signal through its translate table.
Args:
transactions: list of Transaction; defaults to self.transactions.
vcd_path: path to the VCD that contains the lane signals.
out_dir: directory to write .tt files and the .gtkw file.
name_prefix: lane name prefix (e.g., "AXI4MM" or "AXI4MM_SLOT2").
separate_by_kind: if True and channel_name missing, synthesize lanes per (ID, kind).
gtkw_path: optional explicit output path for the .gtkw; defaults to "<out_dir>/<name_prefix>.gtkw".
hierarchy_prefix: optional VCD hierarchy prefix (e.g., "top/dut/") to prepend to lane names in .gtkw.
Returns:
Absolute path to the generated .gtkw file.
"""
lanes: Dict[str, Dict[int, str]] = {}
# Helper to merge a single transaction into the lanes dict
def _merge_txn_lane(t):
lane_name = getattr(t, "channel_name", None)
enum_map = getattr(t, "enum", None) # expected code -> label
if not lane_name or not enum_map:
return
lane_map = lanes.setdefault(lane_name, {})
# Merge codes; keep first-seen label for determinism
for code, label in enum_map.items():
lane_map.setdefault(code, label)
# 1) Flatten transactions across slots (or fall back to asm.transactions)
for t in self.transactions:
_merge_txn_lane(t)
# 2) Normalize and attach to target_columns once per lane
if not lanes:
return []
# 2) Write one .tt per lane in the desired format
written_files: List[str] = []
for lane_name, code_to_label in sorted(lanes.items()):
# Ensure code 0 exists for idle
if 0 not in code_to_label:
code_to_label[0] = "-"
# Sort by code (value) ascending
items = sorted(code_to_label.items(), key=lambda kv: kv[0])
# Compute bit width from the max code
max_code = items[-1][0] if items else 0
bit_width = max_code.bit_length() if max_code > 0 else 1
# Build lines in the requested format
output_lines: List[str] = []
for code, label in items:
binary = format(code, f"0{bit_width}b")
# Use "'<label>" and map empty to NOEVENTS (or '-' if you prefer strictly '-')
display_label = f"'{label if label != '' else '-'}"
output_lines.append(f"{binary} {display_label}")
# File name per lane (change extension if you prefer .gtkw for a GTKW savefile)
tt_path = os.path.abspath(os.path.join(out_dir, f"{lane_name}.gtkw"))
with open(tt_path, "w") as f:
f.write("\n".join(output_lines))
written_files.append(tt_path)
return written_files
[docs]@dataclass
class ILAWaveform:
"""Waveform data, with data probe information."""
width: int
"""Sample bit width."""
sample_count: int
"""Number of data samples."""
trigger_position: List[int]
"""Trigger position index, for each data window."""
window_size: int
"""Number of samples in a window."""
probes: Dict[str, ILAWaveformProbe]
"""Dict of {probe name, waveform probe} See :class:`ILAWaveformProbe`"""
data: bytearray
"""
Waveform data.
Samples are aligned on byte boundary.
This formula can be used to read a bit from the data:
::
bytes_per_sample = len(data) // sample_count
def get_bit_value(data: bytearray, bytes_per_sample: int,
sample_index: int, data_bit_index: int) -> bool:
byte_value = data[sample_index * bytes_per_sample + data_bit_index // 8]
mask = 1 << (data_bit_index & 0x7)
return (byte_value & mask) != 0
"""
gap_index: Optional[int] = None
"""
None or 0, if the waveform has no gaps.
If the value is >0, one sample bit is reserved to indicate which samples are gaps,
i.e. the samples with unknown values. 'gap_index' gives the bit location within the sample data.
"""
probe_groups: List[Dict[str, ProbeGroup]] = field(default_factory=list)
transactionAssembler: Optional[TopTransactionAssembler] = None
num_slots: int = 1
def bytes_per_sample(self) -> int:
# CR-1244881 - partial window captures are not framed correctly.
# Changed so that all windows (partial or full) use the full window size
# when calculating the offset for probes in a buffer. The ILA core
# uploads an entire buffer regardless of partial of full window, so
# alignment does not change for the last partial.
# old_result = len(self.data) // self.sample_count
assert len(self.trigger_position) > 0
assert self.window_size > 0
result = len(self.data) // (len(self.trigger_position) * self.window_size)
return result
def _leaf_name(self, name: str) -> str:
# Safely extract the last path segment and uppercase it for matching
return name.split("/")[-1].upper()
def _match_prefix(self, leaf: str, prefixes: Sequence[str]) -> Optional[str]:
leaf = leaf.upper()
for pfx in prefixes:
if leaf.startswith(pfx.upper()):
return pfx # group name == prefix
return None
def _extract_slot_id(self, probe_name: str) -> Optional[int]:
"""
Extract slot ID from probe name of the form:
SLOT_<n>_.../...
Returns the integer slot id if found, else None.
"""
# Example: "SLOT_4_AXI_AWVALID" -> slot 4
if "cnt" in probe_name.lower():
m = re.search(r"SLOT_(\d+)", probe_name.upper())
else:
m = re.search(r"SLOT_(\d+)", probe_name)
return int(m.group(1)) if m else None
def _create_probe_groups(
self,
prefixes: Optional[Sequence[str]] = None,
rules: Optional[List["Rule"]] = None,
descriptions: Optional[Dict[str, str]] = None,
create_misc_group: bool = False,
misc_group_name: str = "MISC",
slot: int = 0,
) -> Dict[str, "ProbeGroup"]:
"""
Create probe groups based on a list of signal name prefixes for a specific slot.
Each prefix becomes a group. The first matching prefix (by list order) wins.
If self.num_slots > 1, only probes whose hierarchical name indicates the
specified slot (e.g., 'SLOT_<slot>_.../LEAF') are included.
Args:
prefixes: List/sequence of prefixes, e.g., ["AW", "W", "B", "AR", "R"].
If None, defaults to AXI channels.
rules: Optional list of Rule objects; assigned by the prefix match of the
rule's first signal. Rules are included only if their first signal
is from the specified slot.
descriptions: Optional mapping of group name -> description string.
create_misc_group: If True, unmatched signals/rules are placed in a MISC group.
misc_group_name: Name for the MISC group.
slot: Slot index to build groups for.
Returns:
Dictionary of non-empty ProbeGroup objects keyed by group name for the given slot.
"""
# Default to AXI channel-style prefixes
if prefixes is None:
prefixes = ["AW", "W", "B", "AR", "R"]
# Initialize groups (group name == prefix)
groups: Dict[str, ProbeGroup] = {}
for pfx in prefixes:
desc = (descriptions or {}).get(pfx, f"{pfx} Group")
groups[pfx] = ProbeGroup(pfx, description=desc)
# Populate groups with probes for the specified slot
for probe_name in self.probes.keys():
# If multiple slots exist, filter probes by the requested slot
# Use leaf name for prefix matching (e.g., "AWVALID")
if self.num_slots > 1 and self._extract_slot_id(probe_name) != slot:
continue
leaf = self._leaf_name(probe_name)
if "cnt" in probe_name.lower():
parts = leaf.split("_")
leaf = "_".join(parts[-2:])
else:
leaf = leaf.rsplit("_", 1)[-1]
group_key = self._match_prefix(leaf, prefixes)
# Example: "SLOT_4_AXI_AWVALID" -> slot 4
if group_key is not None:
groups[group_key].add_probe(probe_name)
# Attach rules, if any (only rules whose first signal is in this slot)
if rules:
for rule in rules:
group_key = None
if getattr(rule, "signals", None):
first_signal = rule.signals[0]
first_signal_leaf = self._leaf_name(first_signal)
group_key = self._match_prefix(first_signal_leaf, prefixes)
if group_key is not None and group_key in groups:
groups[group_key].add_rule(rule)
# Build result with only non-empty groups
result: Dict[str, ProbeGroup] = {
name: group for name, group in groups.items() if len(group) > 0
}
# Calculate values for groups that have rules
for group in result.values():
if getattr(group, "rules", None) and len(group.rules) > 0:
group.calculate_values(self, slot=slot)
# breakpoint()
return result
def create_protocol_waveforms(
self, rules: Dict[str, List[Rule]], probe_prefixes: List[str] = None
) -> None:
if not self.enable_experimental:
return
for i in range(self.num_slots):
self.probe_groups.append(
self._create_probe_groups(prefixes=probe_prefixes, rules=rules, slot=i)
)
for i in range(len(self.probe_groups)):
for group in self.probe_groups[i].values():
group.create_interval_signals(self, slot=i)
def create_transactions(
self, specs: List["TransactionSpec"], name_prefix: str
) -> Dict[int, List["Transaction"]]:
if self.enable_experimental:
top = TopTransactionAssembler(specs=specs, name_prefix=name_prefix)
# Decode all slots
txns_by_slot = top.calculate(self)
# Render per-slot lanes
top.append_lanes(self, separate_by_kind=True)
return txns_by_slot
else:
return []
[docs] def append_manual_enum_signal(
self, name: str, values: list[str], mapping: dict[str, int]
) -> None:
"""
Append a multi‐bit manual signal to an ILAWaveform, using an Enum display.
Now supports empty‐string labels by automatically giving them a valid
enum member name under the hood.
"""
# sanity checks
if len(values) != self.sample_count:
raise ValueError(f"{name}: must supply one value per sample ({self.sample_count})")
missing = set(values) - set(mapping)
if missing:
raise KeyError(f"{name}: no mapping for {missing}")
# -- step 1: build a "sanitized" mapping that Enum() will accept --
# we also keep a reverse‐map so we can show the original strings.
enum_to_display: dict[str, str] = {}
sanitized_map: dict[str, int] = {}
for label, code in mapping.items():
# pick a valid Python identifier for the enum member
if label:
member_name = label
else:
# e.g. '' → '_EMPTY', or anything else that isn't a valid name
member_name = f"VAL_{code}"
sanitized_map[member_name] = code
enum_to_display[member_name] = label
# determine code width
max_code = max(sanitized_map.values())
bit_width = max_code.bit_length() or 1
# assign bit‐range
old_width = self.width
bit_index = old_width
self.width = old_width + bit_width
# RESIZE THE DATA BUFFER TO ACCOMMODATE NEW BITS
old_bps = self.bytes_per_sample()
new_bps = (self.width + 7) // 8 # Calculate new bytes per sample
if new_bps > old_bps:
# Need to expand the data buffer
new_data = bytearray(new_bps * self.sample_count)
# Copy old data sample by sample
for i in range(self.sample_count):
old_start = i * old_bps
old_end = old_start + old_bps
new_start = i * new_bps
new_data[new_start : new_start + old_bps] = self.data[old_start:old_end]
self.data = new_data
# create the Enum class with the format expected by decode_waveform_from_json
enum_name = f"{name}_Enum"
EnumDef = Enum(enum_name, sanitized_map)
# Store the enum in the format that can be serialized/deserialized correctly
# The JSON decoder expects: ["EnumName", {"member1": value1, "member2": value2}]
enum_def_serializable = [enum_name, sanitized_map]
# build the probe
probe = ILAWaveformProbe(
name=name,
map="manual",
map_range=[ILABitRange(bit_index, bit_width)],
is_bus=True,
bus_left_index=bit_width - 1,
bus_right_index=0,
display_radix=ILAProbeRadix.ENUM,
enum_def=EnumDef, # Store the actual Enum class
)
# Store serialization info and display map for later use
probe._enum_def_serializable = enum_def_serializable # For JSON export
probe._display_map = enum_to_display # For showing original labels
self.probes[name] = probe
# inject the bits - now use the new bytes_per_sample
bps = self.bytes_per_sample() # This will return the NEW bps
for i, sval in enumerate(values):
# find the sanitized member name that produced this label
for member_name, disp in enum_to_display.items():
if disp == sval:
code = sanitized_map[member_name]
break
else:
# should never happen—sanity check
raise AssertionError(f"no enum member for {sval!r}")
base = i * bps
for bit in range(bit_width):
byte_off = (bit_index + bit) // 8
mask = 1 << ((bit_index + bit) % 8)
idx = base + byte_off
if (code >> bit) & 1:
self.data[idx] |= mask
else:
self.data[idx] &= ~mask
def get_window_count(self) -> int:
return len(self.trigger_position)
[docs] def set_sample(self, sample_index: int, sample: bytearray) -> None:
"""Sample may have more bytes than waveform samples have. Erase any gap bit."""
sample_byte_count = self.bytes_per_sample()
copy_byte_count = min(sample_byte_count, len(sample))
start = sample_byte_count * sample_index
self.data[start : start + copy_byte_count] = sample[0:copy_byte_count]
if not self.gap_index:
return
gap_byte_index, gap_bit_index = divmod(self.width, 8)
mask = 0xFF ^ (1 << gap_bit_index)
self.data[start + gap_byte_index] &= mask
[docs] def export_waveform(
self,
export_format: str = "CSV",
fh_or_filepath: Union[TextIOBase, BytesIO, str] = sys.stdout,
probe_names: Optional[List[str]] = None,
start_window_idx: int = 0,
window_count: Optional[int] = None,
start_sample_idx: int = 0,
sample_count: Optional[int] = None,
include_gap: bool = False,
compression: int = zipfile.ZIP_DEFLATED,
compresslevel=None,
) -> None:
"""
Export a waveform in CSV, VCD or CITF format, to a file or in-memory buffer.
By default, all samples for all probes are exported, but it is
possible to select which probes and window/sample ranges for CSV/VCD formats.
================================ ======================== ==============================
Argument/Parameter Type Supported by Export Format
================================ ======================== ==============================
export_format str CSV, VCD, CITF
fh_or_filepath TextIOBase CSV, VCD
fh_or_filepath BytesIO CITF
fh_or_filepath str CSV, VCD, CITF
probe_names List[str] CSV, VCD
start_window int CSV, VCD
start_sample_idx int CSV, VCD
sample_count int CSV, VCD
include_gap bool CSV, VCD
include_gap bool CSV, VCD
compression int CITF
compresslevel int CITF
================================ ======================== ==============================
Args:
export_format (str): Alternatives for output format.
- 'CSV' - Comma Separated Value Format. Default.
- 'VCD' - Value Change Dump.
- 'CITF' - ChipScoPy ILA Trace Format. Export of a whole ILA waveform to a compressed archive.
fh_or_filepath (TextIOBase, BytesIO, str): File object handle or filepath string. Default is `sys.stdout`.
If the argument is a file object, closing and opening the file is the responsibility of the caller.
If argument is a string, the file will be opened and closed by the function.
probe_names (Optional[List[str]]): List of probe names. Default 'None' means export all probes.
start_window_idx (int): Starting window index. Default is first window.
window_count (Optional[int]): Number of windows to export. Default is all windows.
start_sample_idx (int): Starting sample within window. Default is first sample.
sample_count (Optional[int]): Number of samples per window. Default is all samples.
include_gap (bool): Default is False. Include the pseudo "gap" 1-bit probe in the result.
compression: Default is zipfile.ZIP_DEFLATED. See zipfile.ZipFile at https://docs.python.org/.
compresslevel: See zipfile.ZipFile at https://docs.python.org/.
"""
if export_format.upper() == "CITF":
export_compressed_waveform(self, fh_or_filepath, compression, compresslevel)
return
if export_format.upper() != "VCD" and export_format.upper() != "CSV":
raise ValueError(
f'ILAWaveform.export() called with unknown export_format:"{export_format}"'
"Supported export formats are VCD, CSV and CITF."
)
if isinstance(fh_or_filepath, str):
with open(fh_or_filepath, "w", buffering=16384) as fh:
export_waveform_to_stream(
self,
export_format,
fh,
probe_names,
start_window_idx,
window_count,
start_sample_idx,
sample_count,
include_gap,
)
if export_format.upper() == "CSV" and len(self.probe_groups) > 0:
self.apply_probe_group_enums_to_csv(fh_or_filepath, strict=True)
elif export_format.upper() == "VCD" and len(self.probe_groups) > 0:
for pg in self.probe_groups[
0
].values(): # This will have to be later updated. Just too clunky to be used rn.
pg.write_enum_mappings_to_text_file(f"{pg.name}.gtkw", pg.enum_mappings)
if self.transactionAssembler:
self.transactionAssembler.write_gtkw_translate(
vcd_path=fh_or_filepath, out_dir="gtkw_files", separate_by_kind=True
)
return
else:
export_waveform_to_stream(
self,
export_format,
fh_or_filepath,
probe_names,
start_window_idx,
window_count,
start_sample_idx,
sample_count,
include_gap,
)
if export_format.upper() == "CSV" and len(self.probe_groups) > 0:
self.apply_probe_group_enums_to_csv(fh_or_filepath, strict=True)
elif export_format.upper() == "VCD" and len(self.probe_groups) > 0:
for pg in self.probe_groups[0].values():
pg.write_enum_mappings_to_text_file(f"{pg.name}.gtkw", pg.enum_mappings)
if self.transactionAssembler:
self.transactionAssembler.write_gtkw_translate(
vcd_path=fh_or_filepath, out_dir="gtkw_files", separate_by_kind=True
)
[docs] def apply_probe_group_enums_to_csv(
self,
csv_in_path: str,
strict: bool = False,
) -> None:
"""
Post-process a CSV: replace numeric values in ProbeGroup signal columns with enum labels.
Args:
csv_in_path: Path to the input CSV (already exported).
csv_out_path: Path to write the updated CSV. If None, overwrites csv_in_path.
probe_groups: Dict of ProbeGroup where each group has:
- signal_name: str - the column header to target
- enum_mappings: Dict[str, int] - mapping label -> code
idle_label: Label used when a mapping resolves to an empty string or missing label.
strict: If True, raises on missing columns or unparsable numeric values. Otherwise logs and passes through.
Behavior:
- Keeps comment lines starting with '#' intact.
- Finds the header row (first non-comment row) and replaces values only in columns whose header matches
a group's signal_name.
- Expected CSV cell values are integers (decimal or 0x-prefixed hex). If a value is already non-numeric,
it will be left as-is unless strict=True.
"""
csv_out_path = csv_in_path # overwrite
# 1) Read all lines
with open(csv_in_path, "r", newline="") as fh:
lines = fh.readlines()
# 2) Identify header line (first non-comment)
header_idx = None
for i, line in enumerate(lines):
if not line.lstrip().startswith("#"):
header_idx = i
break
if header_idx is None:
raise ValueError("No header row found (no non-comment lines).")
# 3) Parse header
header = next(csv.reader([lines[header_idx]]))
col_index_by_name = {name: idx for idx, name in enumerate(header)}
# 4) For each ProbeGroup, find target column and build reverse mapping (code -> label)
# Also collect display override if ProbeGroup stores it (optional).
target_columns: Dict[int, "Tuple"[str, Dict[int, str]]] = {}
def _find_col_index_by_prefix(header: list[str], signal_name: str) -> Optional[int]:
# Exact match first
if signal_name in col_index_by_name:
return col_index_by_name[signal_name]
# Case-insensitive startswith fallback
sig_lower = signal_name.lower()
matches = [idx for idx, name in enumerate(header) if name.lower().startswith(sig_lower)]
if len(matches) == 1:
return matches[0]
elif len(matches) > 1:
# If multiple candidates, prefer the shortest name or the one equal ignoring case
exact_ci = [idx for idx, name in enumerate(header) if name.lower() == sig_lower]
if exact_ci:
return exact_ci[0]
# Otherwise pick the shortest match deterministically
shortest = min(matches, key=lambda idx: len(header[idx]))
return shortest
return None
for slot_group in self.probe_groups:
for group_name, pg in slot_group.items():
signal_name = getattr(pg, "channel_name", None)
enum_mapping = getattr(pg, "enum_mappings", None) # label -> code
if not signal_name or not enum_mapping:
continue
col_idx = _find_col_index_by_prefix(header, signal_name)
if col_idx is None:
if strict:
raise KeyError(f"Signal column '{signal_name}' not found in CSV header.")
else:
# Skip this group if column not present
continue
# Invert mapping: code -> label
code_to_label = {code: label for label, code in enum_mapping.items()}
target_columns[col_idx] = (signal_name, code_to_label)
lanes: Dict[str, Dict[int, str]] = {}
if self.transactionAssembler is not None:
# Aggregate per-lane enum mappings from all transactions (across slots)
lanes: Dict[str, Dict[int, str]] = {}
asm = self.transactionAssembler
txns_by_slot = getattr(asm, "transactions_by_slot", None)
# Helper to merge a single transaction into the lanes dict
def _merge_txn_lane(t):
lane_name = getattr(t, "channel_name", None)
enum_map = getattr(t, "enum", None) # expected code -> label
if not lane_name or not enum_map:
return
lane_map = lanes.setdefault(lane_name, {})
# Merge codes; keep first-seen label for determinism
for code, label in enum_map.items():
lane_map.setdefault(code, label)
# 1) Flatten transactions across slots (or fall back to asm.transactions)
if isinstance(txns_by_slot, dict):
for slot, txns in txns_by_slot.items():
for t in txns:
_merge_txn_lane(t)
else:
for t in getattr(asm, "transactions", []):
_merge_txn_lane(t)
# 2) Normalize and attach to target_columns once per lane
for lane_name, code_to_label in lanes.items():
# Ensure a code 0 mapping exists (choose label convention to match your CSV tools)
if 0 not in code_to_label:
code_to_label[0] = "-" # or "-" if that’s what your exporter expects
col_idx = _find_col_index_by_prefix(header, lane_name)
if col_idx is not None:
target_columns[col_idx] = (lane_name, code_to_label)
elif strict:
raise KeyError(
f"Transaction lane column '{lane_name}' not found in CSV header."
)
if not target_columns:
# No work to do; just copy file
with open(csv_out_path, "w", newline="") as outfh:
outfh.writelines(lines)
return
# 5) Helper: parse numeric value (supports int or hex like 0x..)
def parse_code(val: str) -> Optional[int]:
s = val.strip()
if s == "":
return None
try:
return int(s, 16)
except ValueError:
return None
# 6) Write out modified CSV
with open(csv_out_path, "w", newline="") as outfh:
# Copy any leading comment lines unchanged
for i in range(header_idx):
outfh.write(lines[i])
writer = csv.writer(outfh)
writer.writerow(header)
# Process rows after header
for raw_line in lines[header_idx + 1 :]:
# Keep empty or comment-like lines intact
if not raw_line.strip():
outfh.write(raw_line)
continue
row = next(csv.reader([raw_line]))
# Ensure row has at least as many columns as header (pad if necessary)
if len(row) < len(header):
row += [""] * (len(header) - len(row))
# For each target column, map numeric code -> text label
for col_idx, (signal_name, code_to_label) in target_columns.items():
cell = row[col_idx]
code = parse_code(cell)
# if col_idx == 45 and cell == '0A':
# breakpoint()
if code is None:
# If non-numeric leave as-is.
# if col_idx == 45:
# breakpoint()
continue
label = code_to_label.get(code, "")
if label == "":
label = "-"
row[col_idx] = label
writer.writerow(row)
@staticmethod
def import_waveform(
import_format: str,
filepath_or_buffer: Union[str, BytesIO],
):
"""
Create an ILAWaveform object from a ChipScoPy ILA Trace Format (CITF) compressed archive.
The archive must contain these two files:
- waveform.cfg, waveform and probe meta information.
- waveform.data, binary waveform samples.
Args:
import_format (str): Format "CITF" is supported.
filepath_or_buffer (str, BytesIO): Filepath string or in-memory buffer.
Returns (ILAWaveform):
Waveform object.
"""
if import_format.upper() != "CITF":
raise (
f'import_waveform command called with import_format "{import_format}".'
f' Only "CITF" format is supported.'
)
return import_compressed_waveform(filepath_or_buffer)
[docs] def get_data(
self,
probe_names: Optional[List[str]] = None,
start_window_idx: int = 0,
window_count: Optional[int] = None,
start_sample_idx: int = 0,
sample_count: Optional[int] = None,
include_trigger: bool = False,
include_sample_info: bool = False,
include_gap: bool = False,
) -> Dict[str, List[int]]:
"""
Get probe waveform data as a list of int values for each probe.
By default, all samples for all probes are included in return data,
but it is possible to select which probes and window/sample ranges.
Args:
probe_names (Optional[List[str]]): List of probe names. Default 'None' means export all probes.
start_window_idx (int): Starting window index. Default is first window.
window_count (Optional[int]): Number of windows to export. Default is all windows.
start_sample_idx (int): Starting sample within window. Default is first sample.
sample_count (Optional[int]): Number of samples per window. Default is all samples.
include_trigger (bool): Include pseudo probe with name '__TRIGGER' in result. Default is False.
include_sample_info (bool): Default is False. Include the following pseudo probes in result:
- '__SAMPLE_INDEX' - Sample index
- '__WINDOW_INDEX' - Window index.
- '__WINDOW_SAMPLE_INDEX' - Sample index within window.
include_gap (bool): Default is False. If True, include the pseudo probe '__GAP' in result. \
Value 1 for a gap sample. Value 0 for a regular sample.
Returns (Dict[str, List[int]]):
Ordered dict, in order:
- '__TRIGGER', if argument **include_trigger** is True
- '__SAMPLE_INDEX', if argument **include_sample_info** is True
- '__WINDOW_INDEX', if argument **include_sample_info** is True
- '__WINDOW_SAMPLE_INDEX', if argument **include_sample_info** is True
- '__GAP', if argument **include_gap** is True
- probe values in order of argument **probe_names**.
Dict key: probe name. Dict value is list of int values, for a probe.
"""
return get_waveform_data_values(
self,
probe_names,
start_window_idx,
window_count,
start_sample_idx,
sample_count,
include_trigger,
include_sample_info,
include_gap,
)
[docs] def get_probe_data(
self,
probe_name: str,
start_window_idx: int = 0,
window_count: Optional[int] = None,
start_sample_idx: int = 0,
sample_count: Optional[int] = None,
) -> List[int]:
"""
Get waveform data as a list of int values for one probe.
By default, all samples for the probe are returned,
It is possible to select window range and sample range.
Args:
probe_name (str): probe name.
start_window_idx (int): Starting window index. Default is first window.
window_count (Optional[int]): Number of windows to export. Default is all windows.
start_sample_idx (int): Starting sample within window. Default is first sample.
sample_count (Optional[int]): Number of samples per window. Default is all samples.
Returns (List[int]):
List probe values.
"""
res_dict = get_waveform_data_values(
self,
[probe_name],
start_window_idx,
window_count,
start_sample_idx,
sample_count,
include_trigger=False,
include_sample_info=False,
include_gap=False,
)
return res_dict[probe_name]
def __str__(self) -> str:
items = {key: val for key, val in self.__dict__.items() if key != "data"}
return pformat(items, 2)
def __repr__(self) -> str:
data_size = len(self.data) if self.data else 0
json_dict = {
"width": self.width,
"sample_count": self.sample_count,
"trigger_position": self.trigger_position,
"window_size": self.window_size,
"probes": {key: asdict(val) for key, val in self.probes.items()},
"data size": hex(data_size),
}
ret = json.dumps(json_dict, cls=Enum2StrEncoder, indent=4)
return ret
def get_txn_data(self, slot: int, kind: str) -> List[Dict[str, Any]]:
if not self.transactionAssembler:
return []
transactions = self.transactionAssembler.get_transactions_by_slot()
slot_txns = transactions.get(slot, [])
read_txns = [
t
for t in slot_txns
if getattr(t, "kind", None) == kind and getattr(t, "end_idx", None) is not None
]
if not read_txns:
raise ValueError("No completed read transactions in slot 0")
start_sample_idx = min(t.start_idx for t in read_txns)
latest_end_idx = max(t.end_idx for t in read_txns)
sample_count = latest_end_idx - start_sample_idx + 1
if sample_count < 0:
raise ValueError(
f"Invalid sample range: start={start_sample_idx}, end={latest_end_idx}"
)
unique_probes = []
seen = set()
def _add_probe(name: str):
if name and name not in seen:
seen.add(name)
unique_probes.append(name)
for t in read_txns:
p = t.probes
for full in p.values():
_add_probe(full)
if not unique_probes:
raise ValueError("No probe names found on read transactions")
# Render per-slot lanes
pdata = self.get_data(
probe_names=unique_probes, start_sample_idx=start_sample_idx, sample_count=sample_count
)
return pdata
def tcf_get_waveform_data(tcf_node) -> {}:
tcf_props = tcf_node.get_property_group(["data"])
props = {
"width": tcf_props["trace_width"],
"sample_count": tcf_props["trace_sample_count"],
"trigger_position": tcf_props["trace_trigger_position"],
"window_size": tcf_props["trace_window_size"],
"data": tcf_props["trace_data"],
}
return props
class WaveformWriter(object):
def __init__(self, file_handle: Union[TextIOBase, None], probes: [ILAWaveformProbe]):
self._file_handle = file_handle
self._probes = probes
self._probe_count = len(probes)
self._probe_names = self.make_probe_names()
self._probe_widths = [p.length() for p in probes]
def handle(self) -> TextIOBase:
return self._file_handle
def write(self, msg: str):
self._file_handle.write(msg)
@staticmethod
def make_values_unknown(in_values: List[str], unknown_ch: str) -> List[str]:
return [unknown_ch * len(val) for val in in_values]
@abstractmethod
def make_probe_names(self) -> [str]:
return [p.name for p in self._probes]
@abstractmethod
def init(self):
pass
@abstractmethod
def write_sample(
self,
sample_position: int,
window_index: int,
sample_in_window_index: int,
is_trigger: bool,
probe_binary_reversed_values: [str],
is_last_sample: bool,
is_gap: bool,
) -> None:
"""
Args:
sample_position (int):
window_index (int):
sample_in_window_index (int):
is_trigger (bool):
probe_binary_reversed_values ([str]): binary string values each start lsb at position zero.
is_last_sample(bool): Last sample in waveform.
is_gap (bool): True ig no data available for the sample.
Returns:
"""
pass
class WaveformWriterCSV(WaveformWriter):
def __init__(self, file_handle: TextIOBase, probes: [ILAWaveformProbe], include_gap: bool):
WaveformWriter.__init__(self, file_handle, probes)
self._include_gap = include_gap
def init(self):
self.write("Sample in Buffer,Sample in Window,TRIGGER,")
if self._include_gap:
self.write("GAP,")
self.write(",".join(self._probe_names))
self.write("\nRadix - UNSIGNED,UNSIGNED,UNSIGNED,")
# Currently, HEX is the only supported radix
self.write(",".join(["HEX"] * self._probe_count))
self.write("\n")
def make_probe_names(self) -> [str]:
return [p.name + p.bus_range_str() for p in self._probes]
def write_sample(
self,
sample_position: int,
window_index: int,
sample_in_window_index: int,
is_trigger: bool,
probe_binary_reversed_values: [str],
is_last_sample: bool,
is_gap: bool,
) -> None:
hex_values = bin_reversed_to_hex_values(probe_binary_reversed_values)
if is_gap:
hex_values = WaveformWriter.make_values_unknown(hex_values, "X")
hex_values_str = ",".join(hex_values)
trig_mark = "1" if is_trigger else "0"
if self._include_gap:
gap_value = "1" if is_gap else "0"
self.write(
f"{sample_position},{sample_in_window_index},{trig_mark},{gap_value},{hex_values_str}\n"
)
else:
self.write(f"{sample_position},{sample_in_window_index},{trig_mark},{hex_values_str}\n")
class WaveformWriterToDict(WaveformWriter):
def __init__(
self,
probes: [ILAWaveformProbe],
include_trigger: bool,
include_sample_info: bool,
include_gap: bool,
):
WaveformWriter.__init__(self, None, probes)
self._include_trigger = include_trigger
self._include_sample_info = include_sample_info
self._include_gap = include_gap
self._result = defaultdict(list)
def init(self):
return
def get_result(self) -> Dict[str, List[int]]:
return self._result
def write_sample(
self,
sample_position: int,
window_index: int,
sample_in_window_index: int,
is_trigger: bool,
probe_binary_reversed_values: [str],
is_last_sample: bool,
is_gap: bool,
) -> None:
if self._include_trigger:
self._result["__TRIGGER"].append(1 if is_trigger else 0)
if self._include_sample_info:
self._result["__SAMPLE_INDEX"].append(sample_position)
self._result["__WINDOW_INDEX"].append(window_index)
self._result["__WINDOW_SAMPLE_INDEX"].append(sample_in_window_index)
if self._include_gap:
self._result["__GAP"].append(1 if is_gap else 0)
int_values = [int(r_val[::-1], 2) for r_val in probe_binary_reversed_values]
for probe_name, val in zip(self._probe_names, int_values):
self._result[probe_name].append(val)
class WaveformWriterVCD(WaveformWriter):
"""Value Change Dump format. See Wikipedia and IEEE Std 1364-2001."""
def __init__(self, file_handle: TextIOBase, probes: [ILAWaveformProbe], include_gap: bool):
WaveformWriter.__init__(self, file_handle, probes)
self._values = None
self._vars: List[str] = None
self._trigger_var = None
self._window_var = None
self._gap_var = None
self._include_gap = include_gap
self._prev_sample_is_trigger = None
self._prev_window_index = None
self._prev_sample_is_gap = None
@staticmethod
def _generate_vars() -> Generator[str, None, None]:
"""Supports up to 94 + 94*94 + 94*94*94 = 839,514 probes (or variables)."""
for x in range(ord("!"), ord("~")):
yield chr(x)
for xx in range(ord("!"), ord("~")):
for yy in range(ord("!"), ord("~")):
yield chr(xx) + chr(yy)
for xxx in range(ord("!"), ord("~")):
for yyy in range(ord("!"), ord("~")):
for zzz in range(ord("!"), ord("~")):
yield chr(xxx) + chr(yyy) + chr(zzz)
def _write_variable_definitions(self):
generate_vars = WaveformWriterVCD._generate_vars
self._values = [None] * self._probe_count
self._trigger_var, self._window_var, self._gap_var, *self._vars = islice(
generate_vars(),
self._probe_count + 3,
)
for width, var, probe_name in zip(self._probe_widths, self._vars, self._probe_names):
self.write(f"$var reg {width} {var} {probe_name} $end\n")
self.write(
f"$var reg 1 {self._trigger_var} _TRIGGER $end\n"
f"$var reg 1 {self._window_var} _WINDOW $end\n"
)
if self._include_gap:
self.write(f"$var reg 1 {self._gap_var} _GAP $end\n")
def make_probe_names(self) -> [str]:
return [p.name + " " + p.bus_range_str() for p in self._probes]
def init(self):
now = "{:%Y-%m-%d %H:%M:%S}".format(datetime.now())
# adjust for trigger position
hdr_1 = f"$date\n {now}\n$end\n"
hdr_2 = (
"$version\n"
+ f" ChipScoPy Version {chipscopy.__version__}\n"
+ "$end\n"
+ "$timescale\n 1ps\n$end\n"
+ "$scope module dut $end\n"
)
hdr_3 = "$upscope $end\n$enddefinitions $end\n"
self.write(hdr_1 + hdr_2)
self._write_variable_definitions()
self.write(hdr_3)
def write_sample(
self,
sample_position: int,
window_index,
sample_in_window_index,
is_trigger,
probe_binary_reversed_values: [str],
is_last_sample,
is_gap: bool,
):
time_written = False
def write_value(var: str, reversed_value: str) -> None:
nonlocal time_written
if not time_written:
self.write(f"#{sample_position}\n")
time_written = True
if len(reversed_value) == 1:
self.write(f"{reversed_value}{var}\n")
return
# Remove leading zeroes
if is_gap and reversed_value[0] == "x":
value = reversed_value
else:
# Remove leading zeroes
msb_idx = reversed_value.rfind("1")
if msb_idx < 0:
msb_idx = 0
value = reversed_value[msb_idx::-1]
self.write(f"b{value} {var}\n")
# Write time, for last sample, even if no changes.
if is_last_sample:
self.write(f"#{sample_position}\n")
time_written = True
# Trigger value
if is_trigger != self._prev_sample_is_trigger:
write_value(self._trigger_var, "1" if is_trigger else "0")
self._prev_sample_is_trigger = is_trigger
# Window marker.
if window_index != self._prev_window_index:
write_value(self._window_var, "1" if window_index % 2 else "0")
self._prev_window_index = window_index
# gap value
if self._include_gap and is_gap != self._prev_sample_is_gap:
write_value(self._gap_var, "1" if is_gap else "0")
self._prev_sample_is_gap = is_gap
# Regular values
if is_gap:
probe_binary_reversed_values = WaveformWriter.make_values_unknown(
probe_binary_reversed_values, "x"
)
for idx, new_val in enumerate(probe_binary_reversed_values):
if new_val == self._values[idx]:
continue
self._values[idx] = new_val
write_value(self._vars[idx], new_val)
def export_waveform_to_stream(
waveform: ILAWaveform,
export_format: str,
stream_handle: TextIOBase,
probe_names: [str],
start_window_idx: int,
window_count: Optional[int],
start_sample_idx: int,
sample_count: Optional[int],
include_gap: bool,
) -> None:
"""Arguments documented in calling API function"""
if probe_names:
probes = [waveform.probes.get(p_name, None) for p_name in probe_names]
if not all(probes):
bad_names = set(probe_names) - set(waveform.probes.keys())
raise KeyError(f"export_waveform() called with non-existent probe_name:\n {bad_names}")
else:
probes = waveform.probes.values()
if export_format.upper() == "CSV":
waveform_writer = WaveformWriterCSV(stream_handle, probes, include_gap)
elif export_format.upper() == "VCD":
waveform_writer = WaveformWriterVCD(stream_handle, probes, include_gap)
else:
raise ValueError(
f'ILAWaveform.export_waveform() called with non-supported format "{export_format}"'
)
_export_waveform(
waveform,
waveform_writer,
probes,
start_window_idx,
window_count,
start_sample_idx,
sample_count,
"export_waveform()",
)
def get_waveform_data_values(
waveform: ILAWaveform,
probe_names: [str],
start_window_idx: int,
window_count: Optional[int],
start_sample_idx: int,
sample_count: Optional[int],
include_trigger: bool,
include_sample_info: bool,
include_gap: bool,
) -> Dict[str, List[int]]:
"""Arguments documented in calling API function"""
if probe_names:
probes = [waveform.probes.get(p_name, None) for p_name in probe_names]
if not all(probes):
bad_names = set(probe_names) - set(waveform.probes.keys())
raise KeyError(
f"ILAWaveform.get_data() called with non-existent probe name(s):\n {bad_names}"
)
else:
probes = waveform.probes.values()
waveform_writer = WaveformWriterToDict(
probes, include_trigger, include_sample_info, include_gap
)
_export_waveform(
waveform,
waveform_writer,
probes,
start_window_idx,
window_count,
start_sample_idx,
sample_count,
"ILAWaveform.get_data()",
)
return waveform_writer.get_result()
def _export_waveform(
waveform: ILAWaveform,
writer: WaveformWriter,
probes: [ILAWaveformProbe],
start_window_idx: int,
window_count: Optional[int],
start_sample_idx: int,
sample_count: Optional[int],
calling_function: str,
) -> None:
# It is at this point where W_events dissapears for whatever reason.
def get_sample_binary_values(sample_data: memoryview) -> [str]:
sample_int_value = int.from_bytes(sample_data, byteorder="little", signed=False)
reversed_bin_value = f"{sample_int_value:0{waveform.width}b}"[::-1]
values = []
for probe in probes:
# Each probe value can be made up of multiple bit range slices.
slices = [reversed_bin_value[br.index : br.index + br.length] for br in probe.map_range]
p_val = "".join(slices)
values.append(p_val)
return values
def convert_to_display_values(bin_values: [str], probes: [ILAWaveformProbe]) -> [str]:
"""Convert binary values to display values based on probe settings."""
display_values = []
for i, (bin_val, probe) in enumerate(zip(bin_values, probes)):
if probe.display_radix == ILAProbeRadix.ENUM and probe.enum_def:
# Convert binary to integer
int_val = int(bin_val[::-1], 2) if bin_val else 0
try:
# Get enum member
enum_member = probe.enum_def(int_val)
# Check if probe has display map for custom labels
if hasattr(probe, "_display_map"):
display_val = probe._display_map.get(enum_member.name, enum_member.name)
else:
display_val = enum_member.name
display_values.append(display_val)
except ValueError:
# Value not in enum, show as hex
display_values.append(f"0x{int_val:X}")
else:
# Keep binary value for non-enum probes
display_values.append(bin_val)
return display_values
def is_gap(sample_high_byte: int, gap_index: Optional[int]) -> bool:
if gap_index and sample_high_byte & (1 << (gap_index % 8)):
return True
return False
if not window_count:
window_count = waveform.get_window_count()
if not sample_count:
sample_count = waveform.window_size
w_size = waveform.window_size
max_window_count = (waveform.sample_count + w_size - 1) // w_size
if start_window_idx < 0 or start_window_idx >= max_window_count:
raise ValueError(
f'{calling_function} function argument start_window="{start_window_idx}" '
f"must be in the range [0-{max_window_count - 1}]"
)
if start_sample_idx < 0 or start_sample_idx >= w_size:
raise ValueError(
f'{calling_function} function argument "start_sample="{start_sample_idx}" '
f"must be in the range [0-{w_size - 1}]"
)
if sample_count < 1 or sample_count > w_size - start_sample_idx:
raise ValueError(
f'{calling_function} function argument "sample_count="{sample_count}" '
f"must be in the range [1-{w_size - start_sample_idx}], "
f'since start_sample_idx="{start_sample_idx}".'
)
if window_count < 1 or window_count > max_window_count - start_window_idx:
raise ValueError(
f'{calling_function} function argument "window_count="{window_count}" '
f"must be in the range [1-{max_window_count - start_window_idx}], "
f'since start_window_idx="{start_window_idx}".'
)
sample_byte_count = waveform.bytes_per_sample()
raw_samples = memoryview(waveform.data)
sample_is_gap = False
writer.init()
for window_idx in range(start_window_idx, start_window_idx + window_count):
window_start_sample_idx = window_idx * w_size
trigger_sample_idx = waveform.trigger_position[window_idx]
for sample_idx in range(
window_start_sample_idx + start_sample_idx,
window_start_sample_idx + start_sample_idx + sample_count,
):
if sample_idx >= waveform.sample_count:
# last window is not full.
break
raw_sample_idx = sample_idx * sample_byte_count
raw_sample_idx_end = raw_sample_idx + sample_byte_count
sample_idx_in_window = sample_idx - window_start_sample_idx
bin_values = get_sample_binary_values(raw_samples[raw_sample_idx:raw_sample_idx_end])
if waveform.gap_index:
sample_is_gap = is_gap(raw_samples[raw_sample_idx_end - 1], waveform.gap_index)
writer.write_sample(
sample_idx,
window_idx,
sample_idx_in_window,
sample_idx_in_window == trigger_sample_idx,
bin_values,
sample_idx + 1 == waveform.sample_count,
sample_is_gap,
)
WAVEFORM_ARCHIVE_VERSION = 1
class Waveform2StrEncoder(json.JSONEncoder):
@staticmethod
def encode_map_range(obj):
# Convert ILABitRange tuple to a dict, e.g. [{"index": 0, "length": 2}]
if isinstance(obj, list):
return [elem._asdict() if isinstance(elem, ILABitRange) else elem for elem in obj]
return obj
def default(self, obj):
if isinstance(obj, ILAWaveformProbe):
return {
key: Waveform2StrEncoder.encode_map_range(val) for key, val in asdict(obj).items()
}
if isinstance(obj, ILABitRange):
return obj._asdict()
if isinstance(obj, ILAProbeRadix):
return str(obj.name)
if isinstance(obj, enum.EnumMeta):
# Encode as 2-item list: [<name>, <dict of enum value items>]
items = {item.name: item.value for item in list(obj)}
return [obj.__name__, items]
return json.JSONEncoder.default(self, obj)
def export_compressed_waveform(
waveform: ILAWaveform,
filepath_or_buffer: Union[str, BytesIO],
compression: int,
compresslevel: int,
) -> None:
waveform_dict = {
"version": 1,
"gap_index": waveform.gap_index,
"probes": waveform.probes,
"sample_count": waveform.sample_count,
"trigger_position": waveform.trigger_position,
"window_size": waveform.window_size,
"width": waveform.width,
}
json_str = json.dumps(waveform_dict, cls=Waveform2StrEncoder, indent=4)
with ZipFile(
filepath_or_buffer,
mode="w",
allowZip64=True,
compression=compression,
compresslevel=compresslevel,
) as zf:
zf.writestr("waveform.cfg", json_str)
zf.writestr("waveform.data", waveform.data)
def decode_waveform_from_json(json_str: str, in_data: bytearray) -> ILAWaveform:
def decode_map_range(in_range: List[Dict[str, int]]) -> List[ILABitRange]:
res = [ILABitRange(**dd) for dd in in_range]
return res
def decode_probe(probe_name: str, in_dict: Dict) -> ILAWaveformProbe:
pd = {}
for key, value in in_dict.items():
try:
if key == "map_range":
pd[key] = decode_map_range(value)
elif key == "display_radix":
pd[key] = ILAProbeRadix[value]
elif key == "enum_def":
if isinstance(value, list) and len(value) == 2:
pd[key] = enum.Enum(value[0], value[1])
else:
pd[key] = None
else:
pd[key] = value
except Exception as ex:
raise ValueError(f'Error reading ILA waveform probe "{probe_name}"') from ex
res = ILAWaveformProbe(**pd)
return res
def decode_probes(in_probes: Dict) -> Dict[str, ILAWaveformProbe]:
res = {name: decode_probe(name, probe_dict) for name, probe_dict in in_probes.items()}
return res
# Decode waveform json string.
json_dict = json.load(StringIO(json_str))
wd = dict()
wd["data"] = in_data
wd["probes"] = decode_probes(json_dict.get("probes", dict()))
for key, value in json_dict.items():
if key in ["data", "probes"]:
# data and probes, handled above.
pass
elif key == "version":
if value > WAVEFORM_ARCHIVE_VERSION:
raise ValueError(
f'waveform archive version "{value}" is not supported.'
f'Only versions "<={WAVEFORM_ARCHIVE_VERSION}" are supported.'
)
else:
wd[key] = value
return ILAWaveform(**wd)
def import_compressed_waveform(filepath_or_buffer: Union[str, BytesIO]) -> ILAWaveform:
with ZipFile(filepath_or_buffer, mode="r") as zf:
json_str = zf.read("waveform.cfg").decode()
data = bytearray(zf.read("waveform.data"))
waveform = decode_waveform_from_json(json_str, data)
return waveform
class ProbeDataMapper:
"""Wraps ProbeData to map simple signal names to full probe names."""
def __init__(self, probe_data, signal_mapping):
self._probe_data = probe_data
self._mapping = signal_mapping
def rising_edge(self, signal, idx):
full_name = self._mapping.get(signal, signal)
return self._probe_data.rising_edge(full_name, idx)
def both_asserted(self, sig1, sig2, idx):
full_sig1 = self._mapping.get(sig1, sig1)
full_sig2 = self._mapping.get(sig2, sig2)
return self._probe_data.both_asserted(full_sig1, full_sig2, idx)
def signal(self, signal, idx):
full_name = self._mapping.get(signal, signal)
return self._probe_data.signal(full_name, idx)