diff --git a/pyproject.toml b/pyproject.toml index 195cc767..5f68208a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,7 +36,8 @@ test = [ "cocotb==2.0.1", "cocotb-bus==0.3.0", "cocotbext-axi==0.1.26", - "cocotbext-umi==0.0.3" + "cocotbext-umi==0.0.3", + "cocotbext-apb==0.11.0" ] [tool.check-wheel-contents] diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/adapters/tl2umi/env.py b/tests/adapters/tl2umi/env.py new file mode 100644 index 00000000..919e3b3b --- /dev/null +++ b/tests/adapters/tl2umi/env.py @@ -0,0 +1,143 @@ +# Owns the driver, monitor, and scoreboard for TL to UMI adapter tests, +# and provides common functionality for the tests. +# +# Uses cocotbext-umi UmiMemoryDevice as a pure-Python replacement for +# the Verilog umi_memagent. The DUT is tl2umi directly (no wrapper needed). + +from cocotb.clock import Clock +from cocotb.triggers import ClockCycles, Timer + +from cocotb_bus.scoreboard import Scoreboard + +from cocotbext.umi.drivers.sumi_driver import SumiDriver +from cocotbext.umi.monitors.sumi_monitor import SumiMonitor +from cocotbext.umi.models.umi_memory_device import UmiMemoryDevice + +from tl_driver import TLDriver +from tl_monitor import TLMonitor, TLDResponse, TLDOpcode + + +async def do_reset(reset, time_ns, active_level=False): + """Perform an async reset""" + reset.value = not active_level + await Timer(1, unit="step") + reset.value = active_level + await Timer(time_ns, "ns") + reset.value = not active_level + await Timer(1, unit="step") + + +class TL2UMIEnv: + """Test environment for tl2umi adapter with Python UmiMemoryDevice backend""" + + def __init__(self, dut, clk_period_ns=10): + self.dut = dut + self.clk_period_ns = clk_period_ns + + # Extract parameters from DUT + self.cw = int(dut.CW.value) # UMI command width (32) + self.aw = int(dut.AW.value) # Address width (64) + self.dw = int(dut.DW.value) # Data width (64) + + self.data_size = self.dw // 8 # 8 bytes + + self.expected_responses = [] + + self.clk = dut.clk + self.nreset = dut.nreset + + self._build() + + def _build(self): + dut = self.dut + + # TileLink A-channel driver (sends requests) + self.tl_driver = TLDriver( + entity=dut, + name="tl_a", + clock=self.clk, + bus_separator="_", + ) + + # TileLink D-channel monitor (receives responses) + self.tl_monitor = TLMonitor( + entity=dut, + name="tl_d", + clock=self.clk, + bus_separator="_", + ) + + # UMI request monitor (observes requests from tl2umi) + self.sumi_req_monitor = SumiMonitor( + entity=dut, name="uhost_req", clock=self.clk + ) + + # Drive UMI request ready (accept all requests immediately) + dut.uhost_req_ready.value = 1 + + # UMI response driver (sends responses back to tl2umi) + self.sumi_resp_driver = SumiDriver( + entity=dut, name="uhost_resp", clock=self.clk + ) + + # Python UMI memory device (replaces Verilog umi_memagent) + self.mem_device = UmiMemoryDevice( + monitor=self.sumi_req_monitor, + driver=self.sumi_resp_driver, + log=dut._log + ) + + # Scoreboard for response checking + self.scoreboard = Scoreboard(dut, fail_immediately=True) + self.scoreboard.add_interface( + monitor=self.tl_monitor, + expected_output=self.expected_responses, + ) + + async def start(self): + """Start clocks and perform reset""" + Clock(self.clk, self.clk_period_ns, unit="ns").start() + await do_reset(self.nreset, self.clk_period_ns) + + # Initialize DUT configuration signals + self.dut.srcaddr.value = 0xAE510000 + + async def wait_for_responses(self, max_cycles=1000): + """Wait for all expected responses to be received""" + cycles = 0 + while self.expected_responses: + await ClockCycles(self.clk, 1) + cycles += 1 + if cycles > max_cycles: + raise TimeoutError( + f"Timeout waiting for responses " + f"({len(self.expected_responses)} remaining)" + ) + + +def create_expected_read_response(address, size, data, source=0): + """Create expected TileLink D-channel read response""" + return TLDResponse( + opcode=TLDOpcode.AccessAckData, + param=0, + size=size, + source=source, + sink=0, + denied=False, + data=data, + corrupt=False, + ) + + +def create_expected_write_response(size, source=0): + """Create expected TileLink D-channel write response""" + return TLDResponse( + opcode=TLDOpcode.AccessAck, + param=0, + size=size, + source=source, + sink=0, + denied=False, + data=0, + corrupt=False, + ) diff --git a/tests/adapters/tl2umi/test_advanced.py b/tests/adapters/tl2umi/test_advanced.py new file mode 100644 index 00000000..d578ffdd --- /dev/null +++ b/tests/adapters/tl2umi/test_advanced.py @@ -0,0 +1,439 @@ +import cocotb + +from cocotb.handle import SimHandleBase +from cocotb.triggers import ClockCycles + +from tl_driver import TLTransaction, TLArithParam, TLLogicParam +from env import TL2UMIEnv, create_expected_write_response, create_expected_read_response + + +@cocotb.test(timeout_time=50, timeout_unit="ms") +async def test_backpressure(dut: SimHandleBase): + """ + Test backpressure handling: + 1. Send transaction with ready enabled + 2. Wait for valid to assert + 3. Apply backpressure + 4. Verify response held + 5. Release backpressure and verify response completes + """ + env = TL2UMIEnv(dut) + await env.start() + dut.tl_d_ready.value = 1 + + test_addr = 0x100 + test_data = 0xDEADBEEF + size = 2 + + print("=== Backpressure Test ===") + + # Queue expected response + env.expected_responses.append( + create_expected_write_response(size=size, source=0) + ) + + # Send write transaction + env.tl_driver.append( + TLTransaction.put_full(address=test_addr, size=size, data=test_data, source=0) + ) + print(f"Sent write: addr=0x{test_addr:x}, data=0x{test_data:08x}") + + # Wait for first response to complete + await env.wait_for_responses(max_cycles=100) + print("First transaction completed") + + # Now test backpressure: send second transaction and apply backpressure mid-flight + test_addr2 = 0x200 + test_data2 = 0xCAFEBABE + + env.expected_responses.append( + create_expected_write_response(size=size, source=1) + ) + + env.tl_driver.append( + TLTransaction.put_full(address=test_addr2, size=size, data=test_data2, source=1) + ) + print(f"Sent second write: addr=0x{test_addr2:x}, data=0x{test_data2:08x}") + + # Wait a few cycles then apply backpressure + await ClockCycles(env.clk, 5) + dut.tl_d_ready.value = 0 + print("Applied backpressure (tl_d_ready=0)") + + # Wait while backpressure is applied + await ClockCycles(env.clk, 20) + + # Response should still be pending + assert len(env.expected_responses) == 1, "Response should not have been consumed yet" + print("Response held with backpressure") + + # Release backpressure + dut.tl_d_ready.value = 1 + print("Released backpressure (tl_d_ready=1)") + + # Wait for response + await env.wait_for_responses(max_cycles=10) + + print("=== Backpressure Test PASSED ===") + raise env.scoreboard.result + + +@cocotb.test(timeout_time=50, timeout_unit="ms") +async def test_partial_write(dut: SimHandleBase): + env = TL2UMIEnv(dut) + await env.start() + dut.tl_d_ready.value = 1 + + test_addr = 0x500 + size = 2 # 4 bytes + + print("=== Partial Write Test ===") + + # Write full word + init_data = 0xAAAAAAAA + env.expected_responses.append( + create_expected_write_response(size=size, source=0) + ) + env.tl_driver.append( + TLTransaction.put_full(address=test_addr, size=size, data=init_data, source=0) + ) + await env.wait_for_responses(max_cycles=100) + print(f" Init write: 0x{init_data:08x} -> 0x{test_addr:03x}") + + # Partial write + partial_data = 0x0000BBBB + # Only supports contiguous masks + mask = 0b0011 + env.expected_responses.append( + create_expected_write_response(size=size, source=1) + ) + env.tl_driver.append( + TLTransaction.put_partial(address=test_addr, size=size, data=partial_data, mask=mask, source=1) + ) + await env.wait_for_responses(max_cycles=100) + print(f" Partial write: 0x{partial_data:08x} mask=0b{mask:04b}") + + # Read back + expected_data = 0xAAAABBBB + env.expected_responses.append( + create_expected_read_response(address=test_addr, size=size, data=expected_data, source=2) + ) + env.tl_driver.append( + TLTransaction.get(address=test_addr, size=size, source=2) + ) + await env.wait_for_responses(max_cycles=100) + print(f" Read back: expected 0x{expected_data:08x}") + + raise env.scoreboard.result + + +@cocotb.test(timeout_time=50, timeout_unit="ms") +async def test_back_to_back_writes(dut: SimHandleBase): + + env = TL2UMIEnv(dut) + await env.start() + dut.tl_d_ready.value = 1 + + size = 2 # 4 bytes + num_transactions = 8 + + print("=== Back-to-Back Writes Test ===") + + # Queue all expected responses + for i in range(num_transactions): + env.expected_responses.append( + create_expected_write_response(size=size, source=i) + ) + + # Queue all transactions + for i in range(num_transactions): + addr = 0x1000 + (i * 4) + data = 0x10000000 + i + env.tl_driver.append( + TLTransaction.put_full(address=addr, size=size, data=data, source=i) + ) + print(f" Queued write {i}: 0x{data:08x} -> 0x{addr:04x}") + + # Wait for all responses + await env.wait_for_responses(max_cycles=500) + print(f"All {num_transactions} write responses received") + + raise env.scoreboard.result + + +@cocotb.test(timeout_time=50, timeout_unit="ms") +async def test_back_to_back_reads(dut: SimHandleBase): + + env = TL2UMIEnv(dut) + await env.start() + dut.tl_d_ready.value = 1 + + size = 2 # 4 bytes + num_transactions = 4 + base_addr = 0x2000 + + print("=== Back-to-Back Reads Test ===") + + # Write data to memory + # Use 8-byte aligned addresses since RTL drops lower 3 bits + for i in range(num_transactions): + addr = base_addr + (i * 8) + data = 0xBABE0000 + i + env.expected_responses.append( + create_expected_write_response(size=size, source=i) + ) + env.tl_driver.append( + TLTransaction.put_full(address=addr, size=size, data=data, source=i) + ) + await env.wait_for_responses(max_cycles=100) + print(f" Wrote {num_transactions} words to memory") + + # Now read back + for i in range(num_transactions): + addr = base_addr + (i * 8) + expected_data = 0xBABE0000 + i + read_source = 16 + i # Different from write sources, but still <= 31 + env.expected_responses.append( + create_expected_read_response(address=addr, size=size, data=expected_data, source=read_source) + ) + env.tl_driver.append( + TLTransaction.get(address=addr, size=size, source=read_source) + ) + await env.wait_for_responses(max_cycles=100) + print(f" Read {i}: 0x{addr:04x}") + + print(f" All {num_transactions} read responses received") + + raise env.scoreboard.result + + +@cocotb.test(timeout_time=50, timeout_unit="ms") +async def test_different_source_ids(dut: SimHandleBase): + + env = TL2UMIEnv(dut) + await env.start() + dut.tl_d_ready.value = 1 + + size = 2 # 4 bytes + + print("=== Source ID Matching Test ===") + + # address, data, source_id + test_cases = [ + (0x100, 0x11111111, 7), + (0x108, 0x22222222, 3), + (0x110, 0x33333333, 15), + (0x118, 0x44444444, 1), + ] + + # Write + for addr, data, source in test_cases: + env.expected_responses.append( + create_expected_write_response(size=size, source=source) + ) + env.tl_driver.append( + TLTransaction.put_full(address=addr, size=size, data=data, source=source) + ) + await env.wait_for_responses(max_cycles=100) + print(f" Write source={source}: 0x{data:08x} -> 0x{addr:03x}") + + # Read back with different source IDs + for addr, data, source in test_cases: + read_source = source + 16 # Different source for reads + env.expected_responses.append( + create_expected_read_response(address=addr, size=size, data=data, source=read_source) + ) + env.tl_driver.append( + TLTransaction.get(address=addr, size=size, source=read_source) + ) + await env.wait_for_responses(max_cycles=100) + print(f" Read source={read_source}: 0x{addr:03x} -> 0x{data:08x}") + + raise env.scoreboard.result + + +@cocotb.test(timeout_time=50, timeout_unit="ms") +async def test_mixed_read_write_same_address(dut: SimHandleBase): + + env = TL2UMIEnv(dut) + await env.start() + dut.tl_d_ready.value = 1 + + test_addr = 0x800 + size = 2 # 4 bytes + + print("=== Mixed Read/Write Same Address Test ===") + + # Write initial val + data1 = 0xAAAAAAAA + env.expected_responses.append(create_expected_write_response(size=size, source=0)) + env.tl_driver.append(TLTransaction.put_full(address=test_addr, size=size, data=data1, source=0)) + await env.wait_for_responses(max_cycles=100) + print(f" Write 1: 0x{data1:08x}") + + # Read back + env.expected_responses.append(create_expected_read_response(address=test_addr, size=size, data=data1, source=1)) + env.tl_driver.append(TLTransaction.get(address=test_addr, size=size, source=1)) + await env.wait_for_responses(max_cycles=100) + print(f" Read 1: 0x{data1:08x}") + + # Write new value + data2 = 0xBBBBBBBB + env.expected_responses.append(create_expected_write_response(size=size, source=2)) + env.tl_driver.append(TLTransaction.put_full(address=test_addr, size=size, data=data2, source=2)) + await env.wait_for_responses(max_cycles=100) + print(f" Write 2: 0x{data2:08x}") + + # Read back new value + env.expected_responses.append(create_expected_read_response(address=test_addr, size=size, data=data2, source=3)) + env.tl_driver.append(TLTransaction.get(address=test_addr, size=size, source=3)) + await env.wait_for_responses(max_cycles=100) + print(f" Read 2: 0x{data2:08x}") + + raise env.scoreboard.result + + +@cocotb.test(timeout_time=50, timeout_unit="ms") +async def test_all_sizes(dut: SimHandleBase): + """ + Test all supported sizes: 1, 2, 4, 8 byts + """ + env = TL2UMIEnv(dut) + await env.start() + dut.tl_d_ready.value = 1 + + base_addr = 0xA00 + + print("=== All Sizes Test ===") + + # 1 byte + addr = base_addr + data = 0xAB + env.expected_responses.append(create_expected_write_response(size=0, source=0)) + env.tl_driver.append(TLTransaction.put_full(address=addr, size=0, data=data, source=0)) + await env.wait_for_responses(max_cycles=100) + env.expected_responses.append(create_expected_read_response(address=addr, size=0, data=data, source=1)) + env.tl_driver.append(TLTransaction.get(address=addr, size=0, source=1)) + await env.wait_for_responses(max_cycles=100) + print(f" Size 0 (1 byte): 0x{data:02x}") + + # 2 bytes + addr = base_addr + 0x10 + data = 0xABCD + env.expected_responses.append(create_expected_write_response(size=1, source=2)) + env.tl_driver.append(TLTransaction.put_full(address=addr, size=1, data=data, source=2)) + await env.wait_for_responses(max_cycles=100) + env.expected_responses.append(create_expected_read_response(address=addr, size=1, data=data, source=3)) + env.tl_driver.append(TLTransaction.get(address=addr, size=1, source=3)) + await env.wait_for_responses(max_cycles=100) + print(f" Size 1 (2 bytes): 0x{data:04x}") + + # 4 bytes + addr = base_addr + 0x20 + data = 0xABCD1234 + env.expected_responses.append(create_expected_write_response(size=2, source=4)) + env.tl_driver.append(TLTransaction.put_full(address=addr, size=2, data=data, source=4)) + await env.wait_for_responses(max_cycles=100) + env.expected_responses.append(create_expected_read_response(address=addr, size=2, data=data, source=5)) + env.tl_driver.append(TLTransaction.get(address=addr, size=2, source=5)) + await env.wait_for_responses(max_cycles=100) + print(f" Size 2 (4 bytes): 0x{data:08x}") + + # 8 bytes + addr = base_addr + 0x30 + data = 0xABCD1234DEADBEEF + env.expected_responses.append(create_expected_write_response(size=3, source=6)) + env.tl_driver.append(TLTransaction.put_full(address=addr, size=3, data=data, source=6)) + await env.wait_for_responses(max_cycles=100) + env.expected_responses.append(create_expected_read_response(address=addr, size=3, data=data, source=7)) + env.tl_driver.append(TLTransaction.get(address=addr, size=3, source=7)) + await env.wait_for_responses(max_cycles=100) + print(f" Size 3 (8 bytes): 0x{data:016x}") + + raise env.scoreboard.result + + +@cocotb.test(timeout_time=100, timeout_unit="ms") +async def test_atomic_add(dut: SimHandleBase): + + env = TL2UMIEnv(dut) + await env.start() + dut.tl_d_ready.value = 1 + + test_addr = 0xB00 + size = 2 # 4 bytes + initial_value = 100 + add_value = 50 + + print("=== Atomic ADD Test ===") + + # Write initial value (to be returned by atomic op) + env.expected_responses.append(create_expected_write_response(size=size, source=0)) + env.tl_driver.append(TLTransaction.put_full(address=test_addr, size=size, data=initial_value, source=0)) + await env.wait_for_responses(max_cycles=100) + print(f" Initial write: {initial_value}") + + # Atomic ADD - returns old value, stores sum + env.expected_responses.append( + create_expected_read_response(address=test_addr, size=size, data=initial_value, source=1) + ) + env.tl_driver.append( + TLTransaction.atomic_arith(address=test_addr, size=size, data=add_value, param=TLArithParam.ADD, source=1) + ) + await env.wait_for_responses(max_cycles=100) + print(f" Atomic ADD: +{add_value}, returned old value {initial_value}") + + # Read back - should be initial + add + expected_result = initial_value + add_value + env.expected_responses.append( + create_expected_read_response(address=test_addr, size=size, data=expected_result, source=2) + ) + env.tl_driver.append(TLTransaction.get(address=test_addr, size=size, source=2)) + await env.wait_for_responses(max_cycles=100) + print(f" Read back: {expected_result}") + + raise env.scoreboard.result + + +@cocotb.test(timeout_time=100, timeout_unit="ms") +async def test_atomic_xor(dut: SimHandleBase): + """ + Test logic (XOR) operation. + """ + env = TL2UMIEnv(dut) + await env.start() + dut.tl_d_ready.value = 1 + + test_addr = 0xD00 + size = 2 # 4 bytes + initial_value = 0xFF00FF00 + xor_value = 0x0F0F0F0F + + print("=== Atomic XOR Test ===") + + # Write initial value + env.expected_responses.append(create_expected_write_response(size=size, source=0)) + env.tl_driver.append(TLTransaction.put_full(address=test_addr, size=size, data=initial_value, source=0)) + await env.wait_for_responses(max_cycles=100) + print(f" Initial write: 0x{initial_value:08x}") + + # Atomic XOR - returns old value, stores old XOR operand + env.expected_responses.append( + create_expected_read_response(address=test_addr, size=size, data=initial_value, source=1) + ) + env.tl_driver.append( + TLTransaction.atomic_logic(address=test_addr, size=size, data=xor_value, param=TLLogicParam.XOR, source=1) + ) + await env.wait_for_responses(max_cycles=100) + print(f" Atomic XOR: 0x{xor_value:08x}, returned old 0x{initial_value:08x}") + + # Read back - should be XOR result + expected_result = initial_value ^ xor_value + env.expected_responses.append( + create_expected_read_response(address=test_addr, size=size, data=expected_result, source=2) + ) + env.tl_driver.append(TLTransaction.get(address=test_addr, size=size, source=2)) + await env.wait_for_responses(max_cycles=100) + print(f" Read back: 0x{expected_result:08x}") + + raise env.scoreboard.result diff --git a/tests/adapters/tl2umi/test_basic.py b/tests/adapters/tl2umi/test_basic.py new file mode 100644 index 00000000..b29fa693 --- /dev/null +++ b/tests/adapters/tl2umi/test_basic.py @@ -0,0 +1,181 @@ +import cocotb + +from cocotb.handle import SimHandleBase + +from tl_driver import TLTransaction +from env import TL2UMIEnv, create_expected_write_response, create_expected_read_response + + +@cocotb.test(timeout_time=50, timeout_unit="ms") +async def test_basic_write(dut: SimHandleBase): + """ + Basic write test: + 1. Single aligned TileLink write + 2. Verify write acknowledgment received + """ + env = TL2UMIEnv(dut) + await env.start() + dut.tl_d_ready.value = 1 + + test_addr = 0x100 + test_data = 0xDEADBEEF + size = 2 # 4 bytes + + print("=== Basic Write Test ===") + + # Queue expected write response + env.expected_responses.append( + create_expected_write_response(size=size, source=0) + ) + + # Send write transaction + env.tl_driver.append( + TLTransaction.put_full(address=test_addr, size=size, data=test_data, source=0) + ) + + # Wait for response + await env.wait_for_responses(max_cycles=100) + + print(f" Write to 0x{test_addr:08x} with data 0x{test_data:08x}") + print(" Write acknowledgment verified by scoreboard") + + raise env.scoreboard.result + + +@cocotb.test(timeout_time=50, timeout_unit="ms") +async def test_basic_read(dut: SimHandleBase): + """ + Basic read test: + 1. Write data to memory via TileLink + 2. Read it back via TileLink + 3. Verify read response contains correct data + """ + env = TL2UMIEnv(dut) + await env.start() + dut.tl_d_ready.value = 1 + + test_addr = 0x200 + test_data = 0xCAFEBABE + size = 2 # 4 bytes + + print("=== Write then Read Test ===") + + # write data to memory + env.expected_responses.append( + create_expected_write_response(size=size, source=1) + ) + env.tl_driver.append( + TLTransaction.put_full(address=test_addr, size=size, data=test_data, source=1) + ) + await env.wait_for_responses(max_cycles=100) + print(f" Write complete: 0x{test_data:08x} -> 0x{test_addr:08x}") + + # read it back + env.expected_responses.append( + create_expected_read_response(address=test_addr, size=size, data=test_data, source=2) + ) + env.tl_driver.append( + TLTransaction.get(address=test_addr, size=size, source=2) + ) + await env.wait_for_responses(max_cycles=100) + print(f" Read complete: got 0x{test_data:08x} from 0x{test_addr:08x}") + + raise env.scoreboard.result + + +@cocotb.test(timeout_time=50, timeout_unit="ms") +async def test_multiple_writes(dut: SimHandleBase): + """ + Multiple sequential writes to different addresses + """ + env = TL2UMIEnv(dut) + await env.start() + dut.tl_d_ready.value = 1 + + size = 2 # 4 bytes + test_cases = [ + (0x000, 0x11111111), + (0x004, 0x22222222), + (0x008, 0x33333333), + (0x00C, 0x44444444), + ] + + print("=== Multiple Writes Test ===") + + for i, (addr, data) in enumerate(test_cases): + env.expected_responses.append( + create_expected_write_response(size=size, source=i) + ) + env.tl_driver.append( + TLTransaction.put_full(address=addr, size=size, data=data, source=i) + ) + await env.wait_for_responses(max_cycles=100) + print(f" Write {i}: 0x{data:08x} -> 0x{addr:03x}") + + raise env.scoreboard.result + + +@cocotb.test(timeout_time=50, timeout_unit="ms") +async def test_byte_write(dut: SimHandleBase): + """ + Single byte write using size=0 + """ + env = TL2UMIEnv(dut) + await env.start() + dut.tl_d_ready.value = 1 + + test_addr = 0x300 + test_data = 0xAB + size = 0 # 1 byte + + print("=== Byte Write Test ===") + + env.expected_responses.append( + create_expected_write_response(size=size, source=0) + ) + env.tl_driver.append( + TLTransaction.put_full(address=test_addr, size=size, data=test_data, source=0) + ) + await env.wait_for_responses(max_cycles=100) + + print(f" Byte write: 0x{test_data:02x} -> 0x{test_addr:03x}") + + raise env.scoreboard.result + + +@cocotb.test(timeout_time=50, timeout_unit="ms") +async def test_64bit_write_read(dut: SimHandleBase): + """ + Full 64-bit (8 byte) write and read + """ + env = TL2UMIEnv(dut) + await env.start() + dut.tl_d_ready.value = 1 + + test_addr = 0x400 + test_data = 0xDEADBEEFCAFEBABE + size = 3 # 8 bytes + + print("=== 64-bit Write/Read Test ===") + + # Write + env.expected_responses.append( + create_expected_write_response(size=size, source=0) + ) + env.tl_driver.append( + TLTransaction.put_full(address=test_addr, size=size, data=test_data, source=0) + ) + await env.wait_for_responses(max_cycles=100) + print(f" Write: 0x{test_data:016x} -> 0x{test_addr:03x}") + + # Read + env.expected_responses.append( + create_expected_read_response(address=test_addr, size=size, data=test_data, source=1) + ) + env.tl_driver.append( + TLTransaction.get(address=test_addr, size=size, source=1) + ) + await env.wait_for_responses(max_cycles=100) + print(f" Read: got 0x{test_data:016x}") + + raise env.scoreboard.result diff --git a/tests/adapters/tl2umi/test_tl2umi_run.py b/tests/adapters/tl2umi/test_tl2umi_run.py new file mode 100644 index 00000000..2e329384 --- /dev/null +++ b/tests/adapters/tl2umi/test_tl2umi_run.py @@ -0,0 +1,75 @@ +import pytest + +from siliconcompiler import Sim, Design +from siliconcompiler.flows.dvflow import DVFlow +from siliconcompiler.tools.verilator.cocotb_compile import CocotbCompileTask as VerilatorCompileTask +from siliconcompiler.tools.verilator.cocotb_exec import CocotbExecTask as VerilatorCocotbExecTask + +from umi.adapters import TL2UMI + + +class TL2UMITestbench(Design): + """TL2UMI testbench for cocotb testing (UMI memory agent in Python)""" + + def __init__(self, aw=64, dw=64): + super().__init__() + + self.set_name("tb_tl2umi") + self.set_dataroot("tl2umi", __file__) + + with self.active_dataroot("tl2umi"): + with self.active_fileset("testbench.cocotb"): + self.set_topmodule("tl2umi") + # Add test files + self.add_file("test_basic.py", filetype="python") + self.add_file("test_advanced.py", filetype="python") + # Add helper Python modules (populates PYTHONPATH via DVFlow) + self.add_file("env.py", filetype="python") + self.add_file("tl_driver.py", filetype="python") + self.add_file("tl_monitor.py", filetype="python") + # Add RTL dependency (no Verilog wrapper needed) + self.add_depfileset(TL2UMI(), "rtl") + + # Store parameters + self.aw = aw + self.dw = dw + + +def run_tl2umi(simulator="verilator", waves=True, aw=64, dw=64, seed=None): + # Create project + project = Sim() + project.set_design(TL2UMITestbench(aw=aw, dw=dw)) + project.add_fileset("testbench.cocotb") + + # Set the cocotb design verification flow + project.set_flow(DVFlow(tool=f"{simulator}-cocotb")) + + # Configure compilation + compile_task = VerilatorCompileTask.find_task(project) + compile_task.set_verilator_trace(waves) + compile_task.add_parameter("AW", "int", "UMI address width", defvalue=aw) + compile_task.add_parameter("DW", "int", "UMI data width", defvalue=dw) + + # Run the simulation + project.run() + project.summary() + + # Check for failures + results = project.find_result( + step='simulate', + index='0', + directory="outputs", + filename="results.xml" + ) + if results: + print(f"\nCocotb results file: {results}") + + return project + + +@pytest.mark.sim +@pytest.mark.parametrize("simulator", ["verilator"]) +@pytest.mark.parametrize("aw", [32, 64]) +@pytest.mark.parametrize("dw", [64, 128]) +def test_tl2umi(simulator, aw, dw): + run_tl2umi(simulator, aw=aw, dw=dw) diff --git a/tests/adapters/tl2umi/testbench.v b/tests/adapters/tl2umi/testbench.v new file mode 100644 index 00000000..20c1833c --- /dev/null +++ b/tests/adapters/tl2umi/testbench.v @@ -0,0 +1,152 @@ +/******************************************************************************* + * Copyright 2024 Zero ASIC Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ---- + * + * Documentation: + * - TL-UH to UMI converter cocotb testbench + * + ******************************************************************************/ + +`timescale 1ns / 1ps +`default_nettype wire + +module testbench #( + parameter CW = 32, // UMI command width + parameter AW = 64, // UMI address width + parameter DW = 64, // UMI data width + parameter IDW = 48, // umi global chip ID width + parameter RAMDEPTH = 512 // Memory depth +) +( + input clk, + input nreset, + input [IDW-1:0] globalid, + + // TileLink A channel + output tl_a_ready, + input tl_a_valid, + input [2:0] tl_a_opcode, + input [2:0] tl_a_param, + input [2:0] tl_a_size, + input [4:0] tl_a_source, + input [55:0] tl_a_address, + input [7:0] tl_a_mask, + input [63:0] tl_a_data, + input tl_a_corrupt, + + // TileLink D channel + input tl_d_ready, + output tl_d_valid, + output [2:0] tl_d_opcode, + output [1:0] tl_d_param, + output [2:0] tl_d_size, + output [4:0] tl_d_source, + output tl_d_sink, + output tl_d_denied, + output [63:0] tl_d_data, + output tl_d_corrupt +); + + // Internal UMI signals between tl2umi and umi_memagent + wire uhost_req_valid; + wire [CW-1:0] uhost_req_cmd; + wire [AW-1:0] uhost_req_dstaddr; + wire [AW-1:0] uhost_req_srcaddr; + wire [DW-1:0] uhost_req_data; + wire uhost_req_ready; + + wire uhost_resp_valid; + wire [CW-1:0] uhost_resp_cmd; + wire [AW-1:0] uhost_resp_dstaddr; + wire [AW-1:0] uhost_resp_srcaddr; + wire [DW-1:0] uhost_resp_data; + wire uhost_resp_ready; + + // TL2UMI adapter + tl2umi #( + .CW (CW), + .AW (AW), + .DW (DW), + .IDW (IDW) + ) dut ( + .clk (clk), + .nreset (nreset), + .globalid (globalid), + + .tl_a_ready (tl_a_ready), + .tl_a_valid (tl_a_valid), + .tl_a_opcode (tl_a_opcode), + .tl_a_param (tl_a_param), + .tl_a_size (tl_a_size), + .tl_a_source (tl_a_source), + .tl_a_address (tl_a_address), + .tl_a_mask (tl_a_mask), + .tl_a_data (tl_a_data), + .tl_a_corrupt (tl_a_corrupt), + + .tl_d_ready (tl_d_ready), + .tl_d_valid (tl_d_valid), + .tl_d_opcode (tl_d_opcode), + .tl_d_param (tl_d_param), + .tl_d_size (tl_d_size), + .tl_d_source (tl_d_source), + .tl_d_sink (tl_d_sink), + .tl_d_denied (tl_d_denied), + .tl_d_data (tl_d_data), + .tl_d_corrupt (tl_d_corrupt), + + .uhost_req_valid (uhost_req_valid), + .uhost_req_cmd (uhost_req_cmd), + .uhost_req_dstaddr (uhost_req_dstaddr), + .uhost_req_srcaddr (uhost_req_srcaddr), + .uhost_req_data (uhost_req_data), + .uhost_req_ready (uhost_req_ready), + + .uhost_resp_valid (uhost_resp_valid), + .uhost_resp_cmd (uhost_resp_cmd), + .uhost_resp_dstaddr (uhost_resp_dstaddr), + .uhost_resp_srcaddr (uhost_resp_srcaddr), + .uhost_resp_data (uhost_resp_data), + .uhost_resp_ready (uhost_resp_ready) + ); + + // UMI memory agent (handles UMI requests and generates responses) + umi_memagent #( + .DW (DW), + .AW (AW), + .CW (CW), + .RAMDEPTH (RAMDEPTH) + ) mem_agent ( + .clk (clk), + .nreset (nreset), + .sram_ctrl (8'b0), + + .udev_req_valid (uhost_req_valid), + .udev_req_cmd (uhost_req_cmd), + .udev_req_dstaddr (uhost_req_dstaddr), + .udev_req_srcaddr (uhost_req_srcaddr), + .udev_req_data (uhost_req_data), + .udev_req_ready (uhost_req_ready), + + .udev_resp_valid (uhost_resp_valid), + .udev_resp_cmd (uhost_resp_cmd), + .udev_resp_dstaddr (uhost_resp_dstaddr), + .udev_resp_srcaddr (uhost_resp_srcaddr), + .udev_resp_data (uhost_resp_data), + .udev_resp_ready (uhost_resp_ready) + ); + +endmodule diff --git a/tests/adapters/tl2umi/tl_driver.py b/tests/adapters/tl2umi/tl_driver.py new file mode 100644 index 00000000..690e9f89 --- /dev/null +++ b/tests/adapters/tl2umi/tl_driver.py @@ -0,0 +1,185 @@ +from dataclasses import dataclass +from enum import IntEnum +from typing import Any, Optional + +from cocotb.triggers import RisingEdge +from cocotb.handle import SimHandleBase + +from cocotb_bus.drivers import ValidatedBusDriver + + +class TLOpcode(IntEnum): + """TileLink A-channel opcodes""" + PutFullData = 0 + PutPartialData = 1 + ArithmeticData = 2 + LogicalData = 3 + Get = 4 + Intent = 5 + + +class TLArithParam(IntEnum): + """TileLink arithmetic operation parameters""" + MIN = 0 + MAX = 1 + MINU = 2 + MAXU = 3 + ADD = 4 + + +class TLLogicParam(IntEnum): + """TileLink logical operation parameters""" + XOR = 0 + OR = 1 + AND = 2 + SWAP = 3 + + +@dataclass +class TLTransaction: + """TileLink A-channel transaction""" + opcode: int + address: int + size: int # log2(bytes) + mask: int = 0xFF + data: int = 0 + source: int = 0 + param: int = 0 + + @classmethod + def get(cls, address: int, size: int, source: int = 0) -> "TLTransaction": + """Create a Get (read) transaction""" + mask = (1 << (1 << size)) - 1 # All bytes valid for size + return cls( + opcode=TLOpcode.Get, + address=address, + size=size, + mask=mask, + source=source, + ) + + @classmethod + def put_full(cls, address: int, size: int, data: int, source: int = 0) -> "TLTransaction": + """Create a PutFullData (write) transaction""" + mask = (1 << (1 << size)) - 1 + return cls( + opcode=TLOpcode.PutFullData, + address=address, + size=size, + mask=mask, + data=data, + source=source, + ) + + @classmethod + def put_partial(cls, address: int, size: int, data: int, mask: int, source: int = 0) -> "TLTransaction": + """Create a PutPartialData (masked write) transaction""" + return cls( + opcode=TLOpcode.PutPartialData, + address=address, + size=size, + mask=mask, + data=data, + source=source, + ) + + @classmethod + def atomic_arith(cls, address: int, size: int, data: int, param: TLArithParam, source: int = 0) -> "TLTransaction": + """Create an ArithmeticData atomic transaction""" + mask = (1 << (1 << size)) - 1 + return cls( + opcode=TLOpcode.ArithmeticData, + address=address, + size=size, + mask=mask, + data=data, + param=int(param), + source=source, + ) + + @classmethod + def atomic_logic(cls, address: int, size: int, data: int, param: TLLogicParam, source: int = 0) -> "TLTransaction": + """Create a LogicalData atomic transaction""" + mask = (1 << (1 << size)) - 1 + return cls( + opcode=TLOpcode.LogicalData, + address=address, + size=size, + mask=mask, + data=data, + param=int(param), + source=source, + ) + + +class TLDriver(ValidatedBusDriver): + _signals = [ + "valid", + "ready", + "opcode", + "param", + "size", + "source", + "address", + "mask", + "data", + ] + + def __init__( + self, + entity: SimHandleBase, + name: str, + clock: SimHandleBase, + *, + config: Optional[dict] = None, + **kwargs: Any, + ): + ValidatedBusDriver.__init__(self, entity, name, clock, **kwargs) + + self.clock = clock + self.bus.valid.value = 0 + + async def _driver_send(self, transaction: TLTransaction, sync: bool = True) -> None: + """Drive a TileLink A-channel transaction. + + Args: + transaction: The TLTransaction to send. + sync: Synchronize the transfer by waiting for a rising edge. + """ + clk_re = RisingEdge(self.clock) + + if sync: + await clk_re + + # Insert a gap where valid is low + if not self.on: + self.bus.valid.value = 0 + for _ in range(self.off): + await clk_re + + # Grab the next set of on/off values + self._next_valids() + + # Consume a valid cycle + if self.on is not True and self.on: + self.on -= 1 + + def ready() -> bool: + return bool(self.bus.ready.value) + + # Drive signals and wait for ready + while True: + self.bus.valid.value = 1 + self.bus.opcode.value = transaction.opcode + self.bus.param.value = transaction.param + self.bus.size.value = transaction.size + self.bus.source.value = transaction.source + self.bus.address.value = transaction.address + self.bus.mask.value = transaction.mask + self.bus.data.value = transaction.data + + await clk_re + if ready(): + break + + self.bus.valid.value = 0 diff --git a/tests/adapters/tl2umi/tl_monitor.py b/tests/adapters/tl2umi/tl_monitor.py new file mode 100644 index 00000000..99632464 --- /dev/null +++ b/tests/adapters/tl2umi/tl_monitor.py @@ -0,0 +1,87 @@ +from dataclasses import dataclass +from enum import IntEnum +from typing import Any + +from cocotb.triggers import RisingEdge +from cocotb.handle import SimHandleBase + +from cocotb_bus.monitors import BusMonitor + + +class TLDOpcode(IntEnum): + """TileLink D-channel opcodes""" + AccessAck = 0 + AccessAckData = 1 + HintAck = 2 + + +@dataclass +class TLDResponse: + """TileLink D-channel response""" + opcode: int + param: int + size: int + source: int + sink: int + denied: bool + data: int + corrupt: bool + + def is_read_response(self) -> bool: + return self.opcode == TLDOpcode.AccessAckData + + def is_write_response(self) -> bool: + return self.opcode == TLDOpcode.AccessAck + + +class TLMonitor(BusMonitor): + """TileLink D-channel monitor""" + + _signals = [ + "valid", + "ready", + "opcode", + "param", + "size", + "source", + "sink", + "denied", + "data", + "corrupt", + ] + + _optional_signals = [] + + def __init__( + self, + entity: SimHandleBase, + name: str, + clock: SimHandleBase, + **kwargs: Any, + ): + BusMonitor.__init__(self, entity, name, clock, **kwargs) + self.clock = clock + + async def _monitor_recv(self) -> None: + """Monitor D-channel for responses""" + clk_re = RisingEdge(self.clock) + + while True: + await clk_re + + if self.in_reset: + continue + + # Check for valid handshake + if bool(self.bus.valid.value) and bool(self.bus.ready.value): + response = TLDResponse( + opcode=int(self.bus.opcode.value), + param=int(self.bus.param.value), + size=int(self.bus.size.value), + source=int(self.bus.source.value), + sink=int(self.bus.sink.value), + denied=bool(self.bus.denied.value), + data=int(self.bus.data.value) if self.bus.data.value.is_resolvable else 0, + corrupt=bool(self.bus.corrupt.value), + ) + self._recv(response) diff --git a/tests/adapters/umi2apb/env.py b/tests/adapters/umi2apb/env.py new file mode 100644 index 00000000..24fdffa9 --- /dev/null +++ b/tests/adapters/umi2apb/env.py @@ -0,0 +1,111 @@ +# Owns the driver, monitor, and scoreboard for UMI to APB adapter tests, +# and provides common functionality for the tests. + +import math + +from cocotb.clock import Clock +from cocotb.triggers import ClockCycles, Timer + +from cocotb_bus.scoreboard import Scoreboard +from cocotbext.apb import ApbBus, ApbSlave, MemoryRegion + +from cocotbext.umi.drivers.sumi_driver import SumiDriver +from cocotbext.umi.monitors.sumi_monitor import SumiMonitor +from cocotbext.umi.sumi import SumiTransaction, SumiCmdType, SumiCmd + + +async def do_reset(reset, time_ns, active_level=False): + """Perform an async reset""" + reset.value = not active_level + await Timer(1, unit="step") + reset.value = active_level + await Timer(time_ns, "ns") + reset.value = not active_level + await Timer(1, unit="step") + + +# Creates the umi2apb test environment +class UMI2APBEnv: + def __init__(self, dut, clk_period_ns=10, mem_size=2**16): + self.dut = dut + self.clk_period_ns = clk_period_ns + self.mem_size = mem_size + + self.data_width = int(dut.RW.value) # default of 64 + self.addr_width = int(dut.AW.value) # default of 64 + self.data_size = self.data_width // 8 + self.umi_size = int(math.log2(self.data_size)) + + self.expected_responses = [] + + self.clk = dut.apb_pclk + self.nreset = dut.apb_nreset + + self._build() + + def _build(self): + dut = self.dut + + # Instantiates UMI driver + self.sumi_driver = SumiDriver( + entity=dut, + name="udev_req", + clock=self.clk, + bus_separator="_" + ) + + # Instantiates APB slave and memory region + apb_bus = ApbBus.from_prefix(dut, "apb") + self.apb_slave = ApbSlave(apb_bus, self.clk, self.nreset) + self.region = MemoryRegion(self.mem_size) + self.apb_slave.target = self.region + + # Creates UMI monitor (for responses) + self.sumi_monitor = SumiMonitor( + entity=dut, + name="udev_resp", + clock=self.clk, + bus_separator="_" + ) + + # Creates scoreboard + self.scoreboard = Scoreboard(dut, fail_immediately=True) + self.scoreboard.add_interface(monitor=self.sumi_monitor, expected_output=self.expected_responses) + + # Prerequisites for starting tests + async def start(self): + Clock(self.clk, self.clk_period_ns, unit="ns").start() + await do_reset(self.nreset, self.clk_period_ns) + + # Waits for umi responses + async def wait_for_responses(self, max_cycles): + cycles = 0 + while self.expected_responses: + await ClockCycles(self.clk, 1) + cycles += 1 + if cycles > max_cycles: + raise TimeoutError( + f"Timeout waiting for responses " + f"({len(self.expected_responses)} remaining)" + ) + + +# Creates an ideal umi write response +def create_expected_write_response(write_txn, data_size, addr_width=64): + req_da = int(write_txn.da.value) if hasattr(write_txn.da, "value") else int(write_txn.da) + req_sa = int(write_txn.sa.value) if hasattr(write_txn.sa, "value") else int(write_txn.sa) + + req_size = int(write_txn.cmd.size) + req_len = int(write_txn.cmd.len) + + return SumiTransaction( + cmd=SumiCmd.from_fields( + cmd_type=int(SumiCmdType.UMI_RESP_WRITE), + size=req_size, + len=req_len + ), + da=req_sa, + sa=req_da, + data=bytearray(data_size), # Expect no data in write response + addr_width=addr_width + ) diff --git a/tests/adapters/umi2apb/test_backpressure.py b/tests/adapters/umi2apb/test_backpressure.py new file mode 100644 index 00000000..68ee5d3f --- /dev/null +++ b/tests/adapters/umi2apb/test_backpressure.py @@ -0,0 +1,79 @@ +import math +import cocotb + +from cocotb.handle import SimHandleBase +from cocotb.triggers import ClockCycles + +from cocotbext.umi.sumi import SumiTransaction, SumiCmdType, SumiCmd +from env import UMI2APBEnv, create_expected_write_response + + +@cocotb.test(timeout_time=50, timeout_unit="ms") +async def test_backpressure(dut: SimHandleBase): + """ + Test backpressure : + 1. Disable ready + 2. Send transactions + 3. Verify responses held + 4. Enable ready and verify responses arrive correctly + """ + + env = UMI2APBEnv(dut) + await env.start() + + umi_size = int(math.log2(env.data_size)) + + print("=== Backpressure Test ===") + + # Disable response ready + dut.udev_resp_ready.value = 0 + + # Send write + test_addr = 0x100 + test_data = 0xDEADBEEF + + write_txn = SumiTransaction( + cmd=SumiCmd.from_fields( + cmd_type=int(SumiCmdType.UMI_REQ_WRITE), + size=umi_size, + len=0, + ), + da=test_addr, + sa=0x0, + data=test_data.to_bytes(env.data_size, byteorder="little"), + ) + + env.expected_responses.append( + create_expected_write_response( + write_txn, + data_size=env.data_size, + addr_width=env.addr_width, + ) + ) + + env.sumi_driver.append(write_txn) + print(f"Sent write: addr=0x{test_addr:x}, data=0x{test_data:08x}") + + await ClockCycles(env.clk, 20) + + # Verify response held correctly + assert dut.udev_resp_valid.value == 1, "Response should be valid" + assert len(env.expected_responses) == 1, "Response should not have been consumed yet" + print("Response held with backpressure") + + # enable response + dut.udev_resp_ready.value = 1 + print("Re-enabled udev_resp_ready") + + await env.wait_for_responses(max_cycles=10) + + # Verify mem + mem_data = await env.region.read(test_addr, env.data_size) + actual_data = int.from_bytes(mem_data, byteorder="little") + assert actual_data == test_data, ( + f"Write data mismatch: expected 0x{test_data:x}, got 0x{actual_data:x}" + ) + print(f"Memory verified: 0x{actual_data:08x}") + + print("\n=== Backpressure Test PASSED ===") + raise env.scoreboard.result diff --git a/tests/adapters/umi2apb/test_basic_WR.py b/tests/adapters/umi2apb/test_basic_WR.py new file mode 100644 index 00000000..46089cd8 --- /dev/null +++ b/tests/adapters/umi2apb/test_basic_WR.py @@ -0,0 +1,98 @@ +import cocotb + +from cocotb.handle import SimHandleBase + +from cocotbext.umi.sumi import SumiTransaction, SumiCmdType, SumiCmd +from env import UMI2APBEnv, create_expected_write_response + + +@cocotb.test(timeout_time=50, timeout_unit="ms") +async def test_basic_WR(dut: SimHandleBase): + """ + Basic sanity test: + 1. Single UMI write + 2. Verify APB memory + 3. Single UMI read + 4. Verify response payload + """ + + # Grab shared test environment + env = UMI2APBEnv(dut) + await env.start() + dut.udev_resp_ready.value = 1 + + umi_size = env.umi_size + test_addr = 0x100 + test_data = 0xDEADBEEF + + print("=== Basic Write Test ===") + + # WRITE transaction + write_txn = SumiTransaction( + cmd=SumiCmd.from_fields( + cmd_type=int(SumiCmdType.UMI_REQ_WRITE), + size=umi_size, + len=0, + ), + da=test_addr, + sa=0x0, + data=test_data.to_bytes(env.data_size, byteorder="little"), + ) + + env.expected_responses.append( + create_expected_write_response( + write_txn, + data_size=env.data_size, + addr_width=env.addr_width, + ) + ) + + env.sumi_driver.append(write_txn) + + # Wait for write response + await env.wait_for_responses(max_cycles=100) + + # Verify APB memory contents + mem_data = await env.region.read(test_addr, env.data_size) + assert int.from_bytes(mem_data, byteorder="little") == test_data, ( + f"Write failed: expected 0x{test_data:x}, " + f"got 0x{int.from_bytes(mem_data, 'little'):x}" + ) + + print(f"Data written to memory: 0x{test_data:08x}") + print(" UMI write response verified by scoreboard") + + print("\n=== Basic Read Test ===") + + # READ transaction + read_txn = SumiTransaction( + cmd=SumiCmd.from_fields( + cmd_type=int(SumiCmdType.UMI_REQ_READ), + size=umi_size, + len=0, + ), + da=test_addr, + sa=0x0, + data=bytearray(env.data_size), + ) + + expected_read_resp = SumiTransaction( + cmd=SumiCmd.from_fields( + cmd_type=int(SumiCmdType.UMI_RESP_READ), + size=umi_size, + len=0, + ), + da=0x0, + sa=test_addr, + data=test_data.to_bytes(env.data_size, byteorder="little"), + addr_width=env.addr_width, + ) + + env.expected_responses.append(expected_read_resp) + env.sumi_driver.append(read_txn) + + # Wait for response + await env.wait_for_responses(max_cycles=100) + + print("Read response verified by scoreboard") + raise env.scoreboard.result diff --git a/tests/adapters/umi2apb/test_full_throughput.py b/tests/adapters/umi2apb/test_full_throughput.py new file mode 100644 index 00000000..7ec5f09b --- /dev/null +++ b/tests/adapters/umi2apb/test_full_throughput.py @@ -0,0 +1,88 @@ +import cocotb + +from env import UMI2APBEnv +from cocotbext.umi.sumi import SumiTransaction, SumiCmdType, SumiCmd + + +@cocotb.test(timeout_time=50, timeout_unit="ms") +async def test_full_throughput(dut): + """ + Back-to-back full-throughput tests alternating read/write transactions. + + """ + + env = UMI2APBEnv(dut) + await env.start() + dut.udev_resp_ready.value = 1 + + data_size = env.data_size + addr_width = env.addr_width + umi_size = env.umi_size + + num_transactions = 100 + + print("=== Back-to-Back Full Throughput Test ===") + + for i in range(num_transactions): + txn_size = i % (umi_size + 1) + txn_bytes = 1 << txn_size + addr = i * data_size + + is_read = (i % 2) == 0 + + if is_read: + txn = SumiTransaction( + cmd=SumiCmd.from_fields( + cmd_type=int(SumiCmdType.UMI_REQ_READ), + size=txn_size, + len=0, + ), + da=addr, + sa=0x0, + data=bytearray(txn_bytes), + ) + + expected_resp = SumiTransaction( + cmd=SumiCmd.from_fields( + cmd_type=int(SumiCmdType.UMI_RESP_READ), + size=txn_size, + len=0, + ), + da=0x0, + sa=addr, + data=bytearray(txn_bytes), + + addr_width=addr_width, + ) + + else: + data = bytes([i & 0xFF] * txn_bytes) + txn = SumiTransaction( + cmd=SumiCmd.from_fields( + cmd_type=int(SumiCmdType.UMI_REQ_WRITE), + size=txn_size, + len=0, + ), + da=addr, + sa=0x0, + data=data, + ) + + expected_resp = SumiTransaction( + cmd=SumiCmd.from_fields( + cmd_type=int(SumiCmdType.UMI_RESP_WRITE), + size=txn_size, + len=0, + ), + da=0x0, + sa=addr, + data=bytearray(txn_bytes), + addr_width=addr_width, + ) + + env.expected_responses.append(expected_resp) + env.sumi_driver.append(txn) + await env.wait_for_responses(max_cycles=num_transactions * 50) + + print(f"All {num_transactions} back-to-back transactions completed successfully!") + raise env.scoreboard.result diff --git a/tests/adapters/umi2apb/test_posted_write.py b/tests/adapters/umi2apb/test_posted_write.py new file mode 100644 index 00000000..82e50275 --- /dev/null +++ b/tests/adapters/umi2apb/test_posted_write.py @@ -0,0 +1,75 @@ +import math +import cocotb + +from cocotb.handle import SimHandleBase +from cocotb.triggers import ClockCycles, RisingEdge + +from cocotbext.umi.sumi import SumiTransaction, SumiCmdType, SumiCmd +from env import UMI2APBEnv + + +async def verify_no_resp_valid(dut, clk, cycles): + """Verify that udev_resp_valid never goes high for the given number of cycles.""" + for _ in range(cycles): + await RisingEdge(clk) + assert not dut.udev_resp_valid.value, ( + "Unexpected response on udev_resp channel during posted write" + ) + + +@cocotb.test(timeout_time=50, timeout_unit="ms") +async def test_posted_write(dut: SimHandleBase): + """ + Test posted writes (no UMI response): + 1. Send multiple writes to different addresses + 2. Verify no responses occur on udev_resp channel + 3. Verify memory contents + """ + + env = UMI2APBEnv(dut) + await env.start() + + umi_size = int(math.log2(env.data_size)) + + print("=== Posted Write Test ===") + + # Test data + test_data = { + 0x100: 0xDEADBEEF, + 0x200: 0xCAFEBABE, + 0x300: 0x12345678, + 0x400: 0xABCD1234, + } + + # Send writes + for addr, data in test_data.items(): + posted_txn = SumiTransaction( + cmd=SumiCmd.from_fields( + cmd_type=int(SumiCmdType.UMI_REQ_POSTED), + size=umi_size, + len=0, + ), + da=addr, + sa=0x0, + data=data.to_bytes(env.data_size, byteorder="little"), + ) + env.sumi_driver.append(posted_txn) + print(f"Sent posted write: addr=0x{addr:x}, data=0x{data:08x}") + + # Wait for transactions to complete and verify no responses occur + print("\n=== Verifying No Responses on udev_resp Channel ===") + await verify_no_resp_valid(dut, env.clk, 50) + print("Confirmed: No responses received (as expected for posted writes)") + + # Verify memory + print("\n=== Verifying Memory Contents ===") + for addr, expected_data in test_data.items(): + mem_data = await env.region.read(addr, env.data_size) + actual_data = int.from_bytes(mem_data, byteorder="little") + assert actual_data == expected_data, ( + f"Posted write failed at 0x{addr:x}: " + f"expected 0x{expected_data:x}, got 0x{actual_data:x}" + ) + print(f"Verified addr=0x{addr:x}: 0x{actual_data:08x}") + + print("\n=== Posted Write Test PASSED ===") diff --git a/tests/adapters/umi2apb/test_random_stimulus.py b/tests/adapters/umi2apb/test_random_stimulus.py new file mode 100644 index 00000000..54a6333c --- /dev/null +++ b/tests/adapters/umi2apb/test_random_stimulus.py @@ -0,0 +1,134 @@ +import random +import cocotb +from random import randint, randbytes +from cocotb.triggers import ClockCycles + +from env import UMI2APBEnv, create_expected_write_response +from cocotbext.umi.sumi import SumiTransaction, SumiCmdType, SumiCmd + + +async def random_ready_toggle(dut, clk, stop_event): + """Background task that randomly toggles udev_resp_ready for backpressure testing.""" + while not stop_event["stop"]: + # Random number of cycles to hold current ready state + cycles = randint(1, 10) + await ClockCycles(clk, cycles) + # Toggle ready with 50% probability + if random.choice([True, False]): + dut.udev_resp_ready.value = 1 - int(dut.udev_resp_ready.value) + + +@cocotb.test(timeout_time=500, timeout_unit="ms") +async def test_random_stimulus(dut): + """ + Randomized read/write stimulus. + + - Aligned addresses + - Full-width accesses + - Randomized ready/valid signaling (backpressure) + - Memory model checked at end + """ + # Grab shared test environment + env = UMI2APBEnv(dut) + await env.start() + dut.udev_resp_ready.value = 1 + + data_size = env.data_size + addr_width = env.addr_width + mem_size = env.mem_size + umi_size = env.umi_size + + num_random_transactions = 512 + + print(f"=== Randomized Test: {num_random_transactions} transactions ===") + + # Start background task for random ready toggling + stop_event = {"stop": False} + cocotb.start_soon(random_ready_toggle(dut, env.clk, stop_event)) + + # Ideal memory model for writes/reads + memory_model = {} + + for i in range(num_random_transactions): + txn_bytes = env.data_size + max_addr = (mem_size - txn_bytes) // txn_bytes + + # Randomized address and command type + addr = randint(0, max_addr) * txn_bytes + is_read = random.choice([True, False]) + + if is_read: + txn = SumiTransaction( + cmd=SumiCmd.from_fields( + cmd_type=int(SumiCmdType.UMI_REQ_READ), + size=umi_size, + len=0, + ), + da=addr, + sa=0x0, + data=bytearray(txn_bytes), + ) + + expected_data = memory_model.get(addr, bytearray(txn_bytes)) + expected_resp = SumiTransaction( + cmd=SumiCmd.from_fields( + cmd_type=int(SumiCmdType.UMI_RESP_READ), + size=umi_size, + len=0, + ), + da=0x0, + sa=addr, + data=expected_data, + addr_width=addr_width, + ) + + env.expected_responses.append(expected_resp) + + else: + data = randbytes(txn_bytes) + memory_model[addr] = data + + txn = SumiTransaction( + cmd=SumiCmd.from_fields( + cmd_type=int(SumiCmdType.UMI_REQ_WRITE), + size=umi_size, + len=0, + ), + da=addr, + sa=0x0, + data=data, + ) + + env.expected_responses.append( + create_expected_write_response(txn, txn_bytes, addr_width) + ) + + await env.sumi_driver.send(txn) + + # Wait for response before sending next transaction (ordering required for scoreboard) + await env.wait_for_responses(max_cycles=100) + + if (i + 1) % 100 == 0: + print(f" Completed {i+1}/{num_random_transactions} transactions...") + + # Stop the random ready toggle + stop_event["stop"] = True + dut.udev_resp_ready.value = 1 # Ensure ready high for memory verification + + # Memory verification + num_verified = 0 + for addr, expected_data in memory_model.items(): + mem_data = await env.region.read(addr, data_size) + assert mem_data == expected_data, ( + f"Memory mismatch at 0x{addr:x}: " + f"expected {expected_data.hex()}, got {mem_data.hex()}" + ) + num_verified += 1 + + print("\n=== Test Statistics ===") + print(f" Total transactions: {num_random_transactions}") + print(f" Unique addresses written: {len(memory_model)}") + print(f" Memory locations verified: {num_verified}") + print(" All transactions completed") + + raise env.scoreboard.result diff --git a/tests/adapters/umi2apb/test_umi2apb_run.py b/tests/adapters/umi2apb/test_umi2apb_run.py new file mode 100644 index 00000000..f704db6a --- /dev/null +++ b/tests/adapters/umi2apb/test_umi2apb_run.py @@ -0,0 +1,76 @@ +import pytest + +from siliconcompiler import Sim, Design +from siliconcompiler.flows.dvflow import DVFlow +from siliconcompiler.tools.verilator.cocotb_compile import CocotbCompileTask as VerilatorCompileTask +from siliconcompiler.tools.verilator.cocotb_exec import CocotbExecTask as VerilatorCocotbExecTask + +from umi.adapters import UMI2APB + + +class UMI2APBTestbench(Design): + """UMI2APB testbench for cocotb testing""" + + def __init__(self, aw=64, dw=256): + super().__init__() + + self.set_name("tb_umi2apb") + self.set_dataroot("umi2apb", __file__) + + with self.active_dataroot("umi2apb"): + with self.active_fileset("testbench.cocotb"): + self.set_topmodule("umi2apb") + # Add test files + self.add_file("test_basic_WR.py", filetype="python") + self.add_file("test_backpressure.py", filetype="python") + self.add_file("test_full_throughput.py", filetype="python") + self.add_file("test_posted_write.py", filetype="python") + self.add_file("test_random_stimulus.py", filetype="python") + # Add helper Python module (populates PYTHONPATH via DVFlow) + self.add_file("env.py", filetype="python") + # Add RTL dependency + self.add_depfileset(UMI2APB(), "rtl") + + # Store parameters + self.aw = aw + self.dw = dw + + +def run_umi2apb(simulator="verilator", waves=True, aw=64, dw=256, seed=None): + # Create project + project = Sim() + project.set_design(UMI2APBTestbench(aw=aw, dw=dw)) + project.add_fileset("testbench.cocotb") + + # Set the cocotb design verification flow + project.set_flow(DVFlow(tool=f"{simulator}-cocotb")) + + # Configure compilation + compile_task = VerilatorCompileTask.find_task(project) + compile_task.set_verilator_trace(waves) + compile_task.add_parameter("AW", "int", "UMI address width", defvalue=aw) + compile_task.add_parameter("DW", "int", "UMI data width", defvalue=dw) + + # Run the simulation + project.run() + project.summary() + + # Check for failures + results = project.find_result( + step='simulate', + index='0', + directory="outputs", + filename="results.xml" + ) + if results: + print(f"\nCocotb results file: {results}") + + return project + + +@pytest.mark.sim +@pytest.mark.parametrize("simulator", ["verilator"]) +@pytest.mark.parametrize("aw", [32, 64]) +@pytest.mark.parametrize("dw", [64, 128]) +def test_umi2apb(simulator, aw, dw): + run_umi2apb(simulator, aw=aw, dw=dw) diff --git a/tests/cocotb_utils.py b/tests/cocotb_utils.py new file mode 100644 index 00000000..47a32036 --- /dev/null +++ b/tests/cocotb_utils.py @@ -0,0 +1,111 @@ +import os +from pathlib import Path +from typing import List, Tuple, Optional, Mapping, Union + +from siliconcompiler import Sim + +from cocotb.triggers import Timer +from cocotb.handle import SimHandleBase + +from cocotb_tools.runner import get_runner, VerilatorControlFile +from cocotb_tools.check_results import get_results + + +async def do_reset( + reset: SimHandleBase, + time_ns: int, + active_level: bool = False): + """Perform a async reset""" + reset.value = not active_level + await Timer(1, unit="step") + reset.value = active_level + await Timer(time_ns, "ns") + reset.value = not active_level + await Timer(1, unit="step") + +def run_cocotb( + project: Sim, + test_module_name: str, + output_dir_name: Optional[str] = None, + simulator_name: str = "icarus", + build_args: Optional[List] = None, + timescale: Optional[Tuple[str, str]] = None, + parameters: Optional[Mapping[str, object]] = None, + seed: Optional[Union[str, int]] = None, + waves: bool = True): + """Launch cocotb given a SC Project""" + + if parameters is None: + parameters = {} + + if output_dir_name is None: + output_dir_name = test_module_name + + pytest_current_test = os.getenv("PYTEST_CURRENT_TEST", None) + + rootpath = Path(__file__).resolve().parent.parent + top_level_dir = rootpath + build_dir = rootpath / "build" / output_dir_name + test_dir = None + + results_xml = None + if not pytest_current_test: + results_xml = build_dir / "results.xml" + test_dir = top_level_dir + + # Get top level module name + top_lvl_module_name = None + main_filesets = project.get("option", "fileset") + if main_filesets and len(main_filesets) != 0: + main_fileset = main_filesets[0] + top_lvl_module_name = project.design.get_topmodule( + fileset=main_fileset + ) + + filesets = project.get_filesets() + idirs = [] + defines = [] + for lib, fileset in filesets: + idirs.extend(lib.find_files("fileset", fileset, "idir")) + defines.extend(lib.get("fileset", fileset, "define")) + + sources = [] + for lib, fileset in filesets: + for value in lib.get_file(fileset=fileset, filetype="systemverilog"): + sources.append(value) + for lib, fileset in filesets: + for value in lib.get_file(fileset=fileset, filetype="verilog"): + sources.append(value) + + vlt_files = [] + if simulator_name == "verilator": + for lib, fileset in filesets: + for value in lib.get_file(fileset=fileset, filetype="verilatorctrlfile"): + vlt_files.append(VerilatorControlFile(value)) + + # Build HDL in chosen simulator + runner = get_runner(simulator_name) + runner.build( + sources=vlt_files + sources, + includes=idirs, + hdl_toplevel=top_lvl_module_name, + build_args=build_args, + waves=waves, + timescale=timescale, + build_dir=build_dir, + parameters=parameters + ) + + # Run test + _, tests_failed = get_results(runner.test( + hdl_toplevel=top_lvl_module_name, + test_module=test_module_name, + test_dir=test_dir, + test_args=build_args, + results_xml=results_xml, + build_dir=build_dir, + seed=seed, + waves=waves + )) + + return tests_failed diff --git a/umi/adapters/tl2umi/rtl/tl2umi.v b/umi/adapters/tl2umi/rtl/tl2umi.v index 5a47f5ca..921d9de5 100644 --- a/umi/adapters/tl2umi/rtl/tl2umi.v +++ b/umi/adapters/tl2umi/rtl/tl2umi.v @@ -595,8 +595,8 @@ module tl2umi #( uhost_req_packet_cmd_size <= 'b0; uhost_req_packet_cmd_len <= (1 << tl_a_size) - 1; uhost_req_packet_cmd_eom <= 1'b1; - uhost_req_packet_dstaddr <= ml_tx_addr; - uhost_req_packet_srcaddr <= local_address; + uhost_req_packet_dstaddr <= ml_tx_addr[AW-1:0]; + uhost_req_packet_srcaddr <= local_address[AW-1:0]; uhost_req_packet_valid_r <= 1'b1; ml_tx_non_zero_mask_r <= ml_tx_non_zero_mask; get_ack_req <= 1'b1; @@ -616,8 +616,8 @@ module tl2umi #( uhost_req_packet_cmd_opcode <= UMI_REQ_WRITE; uhost_req_packet_cmd_size <= 'b0; uhost_req_packet_cmd_len <= ml_tx_len; - uhost_req_packet_dstaddr <= ml_tx_addr; - uhost_req_packet_srcaddr <= local_address; + uhost_req_packet_dstaddr <= ml_tx_addr[AW-1:0]; + uhost_req_packet_srcaddr <= local_address[AW-1:0]; uhost_req_packet_valid_r <= 1'b1; ml_tx_non_zero_mask_r <= ml_tx_non_zero_mask; uhost_req_packet_data[63:0] <= ml_tx_data; @@ -630,8 +630,8 @@ module tl2umi #( uhost_req_packet_cmd_size <= tl_a_size; uhost_req_packet_cmd_len <= 8'b0; uhost_req_packet_cmd_eom <= 1'b1; - uhost_req_packet_dstaddr <= ml_tx_addr; - uhost_req_packet_srcaddr <= local_address; + uhost_req_packet_dstaddr <= ml_tx_addr[AW-1:0]; + uhost_req_packet_srcaddr <= local_address[AW-1:0]; uhost_req_packet_valid_r <= 1'b1; ml_tx_non_zero_mask_r <= ml_tx_non_zero_mask; uhost_req_packet_data[63:0] <= ml_tx_data; diff --git a/umi/adapters/umi2apb/rtl/umi2apb.v b/umi/adapters/umi2apb/rtl/umi2apb.v index a8c39a23..f7cdfcd5 100644 --- a/umi/adapters/umi2apb/rtl/umi2apb.v +++ b/umi/adapters/umi2apb/rtl/umi2apb.v @@ -137,8 +137,10 @@ module umi2apb #(parameter AW = 64, // UMI address width //# APB Mapping //############################ + /* verilator lint_off SELRANGE */ assign apb_paddr = incoming_req ? udev_req_dstaddr[RAW-1:0] : udev_req_dstaddr_r[RAW-1:0]; + /* verilator lint_on SELRANGE */ assign apb_pprot = {1'b0, cmd_prot[1:0]}; assign apb_pwrite = cmd_write | cmd_posted; diff --git a/umi/sumi/umi_fifoflex/rtl/umi_fifoflex.v b/umi/sumi/umi_fifoflex/rtl/umi_fifoflex.v index 587bbb30..9d20332e 100644 --- a/umi/sumi/umi_fifoflex/rtl/umi_fifoflex.v +++ b/umi/sumi/umi_fifoflex/rtl/umi_fifoflex.v @@ -412,8 +412,10 @@ module umi_fifoflex assign addr_mask[AW-1:0] = {{AW-$clog2(ODW/8){1'b0}},{$clog2(ODW/8){1'b1}}}; assign dstaddr_masked[AW-1:0] = latch2fifo_dstaddr[AW-1:0] & addr_mask[AW-1:0]; + /* verilator lint_off WIDTHEXPAND */ assign packet_latch_en = (cmd_len_plus_one + (dstaddr_masked[9:0] >> cmd_size)) > (ODW >> cmd_size >> 3); + /* verilator lint_on WIDTHEXPAND */ assign packet_cmd[CW-1:0] = packet_latch_valid ? packet_cmd_latch[CW-1:0] : @@ -433,9 +435,11 @@ module umi_fifoflex assign latch2in_ready = ~packet_latch_valid & umi_out_ready; // Latched command for next split + /* verilator lint_off WIDTHEXPAND */ assign latch_dstaddr = latch2fifo_dstaddr + ((ODW/8) - dstaddr_masked[AW-1:0]); assign latch_srcaddr = latch2fifo_srcaddr + ((ODW/8) - dstaddr_masked[AW-1:0]); assign latch_data = latch2fifo_data >> (ODW - (dstaddr_masked[9:0] << 3)); + /* verilator lint_on WIDTHEXPAND */ assign latch_len = cmd_len - ((ODW[10:3] - dstaddr_masked[7:0]) >> cmd_size);