diff --git a/.github/workflows/regression-tests.yml b/.github/workflows/regression-tests.yml index 9d37951dd..21d641208 100644 --- a/.github/workflows/regression-tests.yml +++ b/.github/workflows/regression-tests.yml @@ -67,7 +67,7 @@ jobs: - name: Install Python dependencies shell: bash -l {0} run: | - pip install pyvisa pyvisa-sim pytest coveralls pytest-cov cocotb==1.4.0 + pip install pyvisa pyvisa-sim pytest coveralls pytest-cov cocotb==1.5.2 - name: Install Verilator if: matrix.sim == 'verilator' diff --git a/basil/utils/sim/BasilBusDriver.py b/basil/utils/sim/BasilBusDriver.py index cf28b378a..461c30aa4 100644 --- a/basil/utils/sim/BasilBusDriver.py +++ b/basil/utils/sim/BasilBusDriver.py @@ -10,16 +10,14 @@ # pylint: disable=pointless-statement, expression-not-assigned -import cocotb from cocotb.binary import BinaryValue from cocotb.triggers import RisingEdge, Timer -from cocotb.drivers import BusDriver -from cocotb.clock import Clock +from cocotb_bus.drivers import BusDriver class BasilBusDriver(BusDriver): - """Abastract away interactions with the control bus. - """ + """Abastract away interactions with the control bus.""" + _signals = ["BUS_CLK", "BUS_RST", "BUS_DATA", "BUS_ADD", "BUS_RD", "BUS_WR"] _optional_signals = ["BUS_BYTE_ACCESS"] @@ -36,9 +34,6 @@ def __init__(self, entity): self._has_byte_acces = False - # Kick off a clock generator - cocotb.fork(Clock(self.clock, 5000).start()) - async def init(self): # Defaults self.bus.BUS_RST <= 1 @@ -57,7 +52,7 @@ async def init(self): # why this does not work? hasattr(self.bus, 'BUS_BYTE_ACCESS'): try: - getattr(self.bus, 'BUS_BYTE_ACCESS') + getattr(self.bus, "BUS_BYTE_ACCESS") except Exception: self._has_byte_acces = False else: @@ -73,8 +68,8 @@ async def read(self, address, size): await RisingEdge(self.clock) byte = 0 - while(byte <= size): - if(byte == size): + while byte <= size: + if byte == size: self.bus.BUS_RD <= 0 else: self.bus.BUS_RD <= 1 @@ -83,20 +78,20 @@ async def read(self, address, size): await RisingEdge(self.clock) - if(byte != 0): - if(self._has_byte_acces and self.bus.BUS_BYTE_ACCESS.value.integer == 0): - result.append(self.bus.BUS_DATA.value.integer & 0x000000ff) - result.append((self.bus.BUS_DATA.value.integer & 0x0000ff00) >> 8) - result.append((self.bus.BUS_DATA.value.integer & 0x00ff0000) >> 16) - result.append((self.bus.BUS_DATA.value.integer & 0xff000000) >> 24) + if byte != 0: + if self._has_byte_acces and self.bus.BUS_BYTE_ACCESS.value.integer == 0: + result.append(self.bus.BUS_DATA.value.integer & 0x000000FF) + result.append((self.bus.BUS_DATA.value.integer & 0x0000FF00) >> 8) + result.append((self.bus.BUS_DATA.value.integer & 0x00FF0000) >> 16) + result.append((self.bus.BUS_DATA.value.integer & 0xFF000000) >> 24) else: # result.append(self.bus.BUS_DATA.value[24:31].integer & 0xff) if len(self.bus.BUS_DATA.value) == 8: - result.append(self.bus.BUS_DATA.value.integer & 0xff) + result.append(self.bus.BUS_DATA.value.integer & 0xFF) else: - result.append(self.bus.BUS_DATA.value[24:31].integer & 0xff) + result.append(self.bus.BUS_DATA.value[24:31].integer & 0xFF) - if(self._has_byte_acces and self.bus.BUS_BYTE_ACCESS.value.integer == 0): + if self._has_byte_acces and self.bus.BUS_BYTE_ACCESS.value.integer == 0: byte += 4 else: byte += 1 @@ -126,7 +121,7 @@ async def write(self, address, data): await RisingEdge(self.clock) - if(self._has_byte_acces and self.bus.BUS_BYTE_ACCESS.value.integer == 0): + if self._has_byte_acces and self.bus.BUS_BYTE_ACCESS.value.integer == 0: raise NotImplementedError("BUS_BYTE_ACCESS for write to be implemented.") self.bus.BUS_DATA <= self._high_impedence diff --git a/basil/utils/sim/BasilSbusDriver.py b/basil/utils/sim/BasilSbusDriver.py index 1ad7abbd8..5111cd278 100644 --- a/basil/utils/sim/BasilSbusDriver.py +++ b/basil/utils/sim/BasilSbusDriver.py @@ -4,22 +4,18 @@ # SiLab, Institute of Physics, University of Bonn # ------------------------------------------------------------ # -# Initial version by Chris Higgs -# # pylint: disable=pointless-statement, expression-not-assigned -import cocotb from cocotb.binary import BinaryValue -from cocotb.triggers import RisingEdge -from cocotb.drivers import BusDriver -from cocotb.clock import Clock +from cocotb.triggers import RisingEdge, ReadOnly +from cocotb_bus.drivers import BusDriver class BasilSbusDriver(BusDriver): - """Abastract away interactions with the control bus. - """ + """Abastract away interactions with the control bus.""" + _signals = ["BUS_CLK", "BUS_RST", "BUS_DATA_IN", "BUS_DATA_OUT", "BUS_ADD", "BUS_RD", "BUS_WR"] _optional_signals = ["BUS_BYTE_ACCESS"] @@ -36,9 +32,6 @@ def __init__(self, entity): self._has_byte_acces = False - # Kick off a clock generator - cocotb.fork(Clock(self.clock, 5000).start()) - async def init(self): # Defaults self.bus.BUS_RST <= 1 @@ -57,7 +50,7 @@ async def init(self): # why this does not work? hasattr(self.bus, 'BUS_BYTE_ACCESS'): try: - getattr(self.bus, 'BUS_BYTE_ACCESS') + getattr(self.bus, "BUS_BYTE_ACCESS") except Exception: self._has_byte_acces = False else: @@ -66,55 +59,52 @@ async def init(self): async def read(self, address, size): result = [] - self.bus.BUS_ADD <= self._x - self.bus.BUS_DATA_IN <= self._high_impedance - self.bus.BUS_RD <= 0 - await RisingEdge(self.clock) + if size == 0: + return result + + self.bus.BUS_RD <= 1 + self.bus.BUS_ADD <= address + byte = 0 - while(byte <= size): - if(byte == size): - self.bus.BUS_RD <= 0 - else: - self.bus.BUS_RD <= 1 - self.bus.BUS_ADD <= address + byte + while byte < size: await RisingEdge(self.clock) - if(byte != 0): - if(self._has_byte_acces and self.bus.BUS_BYTE_ACCESS.value.integer == 0): - result.append(self.bus.BUS_DATA_OUT.value.integer & 0x000000ff) - result.append((self.bus.BUS_DATA_OUT.value.integer & 0x0000ff00) >> 8) - result.append((self.bus.BUS_DATA_OUT.value.integer & 0x00ff0000) >> 16) - result.append((self.bus.BUS_DATA_OUT.value.integer & 0xff000000) >> 24) - else: - # result.append(self.bus.BUS_DATA_OUT.value[24:31].integer & 0xff) - if len(self.bus.BUS_DATA_OUT.value) == 8: - result.append(self.bus.BUS_DATA_OUT.value.integer & 0xff) - else: - result.append(self.bus.BUS_DATA_OUT.value[24:31].integer & 0xff) - - if(self._has_byte_acces and self.bus.BUS_BYTE_ACCESS.value.integer == 0): + if self._has_byte_acces and self.bus.BUS_BYTE_ACCESS.value.integer == 0: byte += 4 else: byte += 1 - self.bus.BUS_ADD <= self._x - self.bus.BUS_DATA_IN <= self._high_impedance - self.bus.BUS_RD <= 0 + self.bus.BUS_ADD <= address + byte + if byte >= size: + self.bus.BUS_RD <= 0 + + await ReadOnly() + + value = self.bus.BUS_DATA_OUT.value + + if self._has_byte_acces and self.bus.BUS_BYTE_ACCESS.value.integer == 0: + result.append(value.integer & 0x000000FF) + result.append((value.integer & 0x0000FF00) >> 8) + result.append((value.integer & 0x00FF0000) >> 16) + result.append((value.integer & 0xFF000000) >> 24) + elif len(value) == 8: + result.append(value.integer & 0xFF) + else: + result.append(value[24:31].integer & 0xFF) await RisingEdge(self.clock) + self.bus.BUS_ADD <= self._x + self.bus.BUS_RD <= 0 + return result async def write(self, address, data): - self.bus.BUS_ADD <= self._x - self.bus.BUS_DATA_IN <= self._high_impedance - self.bus.BUS_WR <= 0 - await RisingEdge(self.clock) for index, byte in enumerate(data): @@ -124,11 +114,9 @@ async def write(self, address, data): await RisingEdge(self.clock) - if(self._has_byte_acces and self.bus.BUS_BYTE_ACCESS.value.integer == 0): + if self._has_byte_acces and self.bus.BUS_BYTE_ACCESS.value.integer == 0: raise NotImplementedError("BUS_BYTE_ACCESS for write to be implemented.") self.bus.BUS_ADD <= self._x self.bus.BUS_DATA_IN <= self._high_impedance self.bus.BUS_WR <= 0 - - await RisingEdge(self.clock) diff --git a/basil/utils/sim/Protocol.py b/basil/utils/sim/Protocol.py index b4caadd8c..dccaebc37 100644 --- a/basil/utils/sim/Protocol.py +++ b/basil/utils/sim/Protocol.py @@ -21,7 +21,6 @@ class ProtocolBase(object): class WriteRequest(ProtocolBase): - def __init__(self, address, data): self.address = address self.data = data @@ -31,7 +30,6 @@ def __str__(self): class ReadRequest(ProtocolBase): - def __init__(self, address, size): self.address = address self.size = size @@ -41,7 +39,6 @@ def __str__(self): class ReadResponse(ProtocolBase): - def __init__(self, data): self.data = data @@ -50,7 +47,6 @@ def __str__(self): class PickleInterface(ProtocolBase): - def __init__(self, sock): self.sock = sock @@ -79,7 +75,7 @@ def try_recv(self): def _get_next_obj(self, length): """Assumes we've already read the object length""" - data = b'' + data = b"" while len(data) < length: data += self.sock.recv(length - len(data)) diff --git a/basil/utils/sim/SiLibUsb.py b/basil/utils/sim/SiLibUsb.py index e21ce3932..1515423c6 100644 --- a/basil/utils/sim/SiLibUsb.py +++ b/basil/utils/sim/SiLibUsb.py @@ -37,9 +37,9 @@ class SiUSBDevice(object): HIGH_ADDRESS_EXTERNAL = 0x10000 + 0x10000 BASE_ADDRESS_BLOCK = 0x0001000000000000 - HIGH_ADDRESS_BLOCK = 0xffffffffffffffff + HIGH_ADDRESS_BLOCK = 0xFFFFFFFFFFFFFFFF - def __init__(self, device=None, simulation_host='localhost', simulation_port=12345): + def __init__(self, device=None, simulation_host="localhost", simulation_port=12345): self._sock = None self.simulation_host = simulation_host self.simulation_port = simulation_port @@ -70,7 +70,7 @@ def ReadExternal(self, address, size): resp = self._iface.recv() if not isinstance(resp, ReadResponse): raise ValueError("Communication error with Simulation: got %s" % repr(resp)) - return array.array('B', resp.data) + return array.array("B", resp.data) def FastBlockRead(self, size): req = ReadRequest(self.BASE_ADDRESS_BLOCK, size) @@ -78,15 +78,15 @@ def FastBlockRead(self, size): resp = self._iface.recv() if not isinstance(resp, ReadResponse): raise ValueError("Communication error with Simulation: got %s" % repr(resp)) - return array.array('B', resp.data) + return array.array("B", resp.data) def FastBlockWrite(self, data): req = WriteRequest(self.BASE_ADDRESS_BLOCK, data) self._iface.send(req) def WriteI2C(self, address, data): - print('SiUSBDevice:WriteI2C: {} {}'.format(address, data)) + print("SiUSBDevice:WriteI2C: {} {}".format(address, data)) def ReadI2C(self, address, size): - print('SiUSBDevice:ReadI2C') - return array.array('B', range(size)) + print("SiUSBDevice:ReadI2C") + return array.array("B", range(size)) diff --git a/basil/utils/sim/SiLibUsbBusDriver.py b/basil/utils/sim/SiLibUsbBusDriver.py index 3840aa3ef..b825a5b31 100644 --- a/basil/utils/sim/SiLibUsbBusDriver.py +++ b/basil/utils/sim/SiLibUsbBusDriver.py @@ -12,11 +12,9 @@ """ Abastract away interactions with the control bus """ -import cocotb from cocotb.binary import BinaryValue from cocotb.triggers import RisingEdge, ReadOnly, Timer -from cocotb.drivers import BusDriver -from cocotb.clock import Clock +from cocotb_bus.drivers import BusDriver class SiLibUsbBusDriver(BusDriver): @@ -30,7 +28,7 @@ class SiLibUsbBusDriver(BusDriver): HIGH_ADDRESS_EXTERNAL = 0x10000 + 0x10000 BASE_ADDRESS_BLOCK = 0x0001000000000000 - HIGH_ADDRESS_BLOCK = 0xffffffffffffffff + HIGH_ADDRESS_BLOCK = 0xFFFFFFFFFFFFFFFF def __init__(self, entity): BusDriver.__init__(self, entity, "", entity.FCLK_IN) @@ -43,9 +41,6 @@ def __init__(self, entity): self._x = BinaryValue(n_bits=16) self._x.binstr = "x" * 16 - # Kick off a clock generator - cocotb.fork(Clock(self.clock, 20800).start()) - async def init(self): # Defaults # self.bus.BUS_RST<= 1 @@ -64,15 +59,15 @@ async def init(self): async def read(self, address, size): result = [] - if(address >= self.BASE_ADDRESS_I2C and address < self.HIGH_ADDRESS_I2C): + if address >= self.BASE_ADDRESS_I2C and address < self.HIGH_ADDRESS_I2C: self.entity._log.warning("I2C address space supported in simulation!") for byte in range(size): result.append(0) - elif(address >= self.BASE_ADDRESS_EXTERNAL and address < self.HIGH_ADDRESS_EXTERNAL): + elif address >= self.BASE_ADDRESS_EXTERNAL and address < self.HIGH_ADDRESS_EXTERNAL: for byte in range(size): val = await self.read_external(address - self.BASE_ADDRESS_EXTERNAL + byte) result.append(val) - elif(address >= self.BASE_ADDRESS_BLOCK and address < self.HIGH_ADDRESS_BLOCK): + elif address >= self.BASE_ADDRESS_BLOCK and address < self.HIGH_ADDRESS_BLOCK: for byte in range(size): val = await self.fast_block_read() result.append(val) @@ -82,12 +77,12 @@ async def read(self, address, size): return result async def write(self, address, data): - if(address >= self.BASE_ADDRESS_I2C and address < self.HIGH_ADDRESS_I2C): + if address >= self.BASE_ADDRESS_I2C and address < self.HIGH_ADDRESS_I2C: self.entity._log.warning("I2C address space supported in simulation!") - elif(address >= self.BASE_ADDRESS_EXTERNAL and address < self.HIGH_ADDRESS_EXTERNAL): + elif address >= self.BASE_ADDRESS_EXTERNAL and address < self.HIGH_ADDRESS_EXTERNAL: for index, byte in enumerate(data): await self.write_external(address - self.BASE_ADDRESS_EXTERNAL + index, byte) - elif(address >= self.BASE_ADDRESS_BLOCK and address < self.HIGH_ADDRESS_BLOCK): + elif address >= self.BASE_ADDRESS_BLOCK and address < self.HIGH_ADDRESS_BLOCK: raise NotImplementedError("Unsupported request") # self._sidev.FastBlockWrite(data) else: diff --git a/basil/utils/sim/Test.py b/basil/utils/sim/Test.py index 86e492235..53a65ef9d 100644 --- a/basil/utils/sim/Test.py +++ b/basil/utils/sim/Test.py @@ -14,35 +14,42 @@ import yaml import cocotb -from cocotb.triggers import RisingEdge +from cocotb.triggers import Timer +from cocotb.clock import Clock from basil.utils.sim.Protocol import WriteRequest, ReadRequest, ReadResponse, PickleInterface def get_bus(): bus_name_path = os.getenv("SIMULATION_BUS", "basil.utils.sim.BasilBusDriver") - bus_name = bus_name_path.split('.')[-1] + bus_name = bus_name_path.split(".")[-1] return getattr(__import__(bus_name_path, fromlist=[bus_name]), bus_name) def import_driver(path): - name = path.split('.')[-1] + name = path.split(".")[-1] return getattr(__import__(path, fromlist=[name]), name) @cocotb.test(skip=False) -def socket_test(dut, debug=False): +async def test(dut, debug=False): """Testcase that uses a socket to drive the DUT""" - host = os.getenv("SIMULATION_HOST", 'localhost') - port = os.getenv("SIMULATION_PORT", '12345') + host = os.getenv("SIMULATION_HOST", "localhost") + port = os.getenv("SIMULATION_PORT", "12345") + bus_clk_freq = int(os.getenv("SIMULATION_BUS_CLK_PERIOD", "5000")) + bus_transaction_wait = int(os.getenv("SIMULATION_TRANSACTION_WAIT", "50000")) + bus_clock = bool(int(os.getenv("SIMULATION_BUS_CLOCK", "1"))) if debug: dut._log.setLevel(logging.DEBUG) bus = get_bus()(dut) - dut._log.info("Using bus driver : %s" % (type(bus).__name__)) + dut._log.info(f"Using bus driver : {type(bus).__name__}") + dut._log.info(f"Bus clock : {bus_clock}") + dut._log.info(f"Bus clock period : {bus_clk_freq}") + dut._log.info(f"Bus transaction wait : {bus_transaction_wait}") sim_modules = [] sim_modules_data = os.getenv("SIMULATION_MODULES", "") @@ -64,11 +71,15 @@ def socket_test(dut, debug=False): s = None raise + # Kick off a clock generator + if bus_clock: + cocotb.fork(Clock(bus.clock, bus_clk_freq).start()) + # start sim_modules for mod in sim_modules: cocotb.fork(mod.run()) - yield bus.init() + await bus.init() while True: dut._log.info("Waiting for incoming connection on %s:%d" % (host, int(port))) @@ -78,7 +89,7 @@ def socket_test(dut, debug=False): while True: # uncomment for constantly advancing clock - # yield RisingEdge(bus.clock) + # await RisingEdge(bus.clock) try: req = iface.try_recv() @@ -93,13 +104,14 @@ def socket_test(dut, debug=False): dut._log.debug("Received: %s" % str(req)) # add few clocks - for _ in range(10): - yield RisingEdge(bus.clock) + # for _ in range(10): + # await RisingEdge(bus.clock) + await Timer(bus_transaction_wait) if isinstance(req, WriteRequest): - yield bus.write(req.address, req.data) + await bus.write(req.address, req.data) elif isinstance(req, ReadRequest): - result = yield bus.read(req.address, req.size) + result = await bus.read(req.address, req.size) resp = ReadResponse(result) dut._log.debug("Send: %s" % str(resp)) iface.send(resp) @@ -107,35 +119,12 @@ def socket_test(dut, debug=False): raise NotImplementedError("Unsupported request type: %s" % str(type(req))) # add few clocks - for _ in range(10): - yield RisingEdge(bus.clock) + # for _ in range(10): + # await RisingEdge(bus.clock) + await Timer(bus_transaction_wait) - if(os.getenv("SIMULATION_END_ON_DISCONNECT")): + if os.getenv("SIMULATION_END_ON_DISCONNECT"): break s.shutdown(socket.SHUT_RDWR) s.close() - - -@cocotb.test(skip=True) -def bringup_test(dut): - """Initial test to see if simulation works""" - - bus = get_bus()(dut) - - yield bus.init() - - for _ in range(10): - yield RisingEdge(bus.clock) - - yield bus.write(0, [0xff, 0xf2, 0xf3, 0xa4]) - - for _ in range(10): - yield RisingEdge(bus.clock) - - ret = yield bus.read(0, 4) - - print('bus.read {}'.format(ret)) - - for _ in range(10): - yield RisingEdge(bus.clock) diff --git a/examples/mio_sram_test/tests/test_Sim.py b/examples/mio_sram_test/tests/test_Sim.py index d00147dcd..fc2cd8bae 100644 --- a/examples/mio_sram_test/tests/test_Sim.py +++ b/examples/mio_sram_test/tests/test_Sim.py @@ -69,10 +69,10 @@ def test_simple(self): ret = np.hstack((ret, self.chip['FIFO'].get_data())) - x = np.arange(175 * 4, dtype=np.uint8) + x = np.arange(170 * 4, dtype=np.uint8) x.dtype = np.uint32 - self.assertTrue(np.alltrue(ret == x)) + np.testing.assert_array_equal(ret, x) self.chip['FIFO'].reset() @@ -87,10 +87,10 @@ def test_simple(self): ret = np.hstack((ret, self.chip['FIFO'].get_data())) - x = np.arange(245 * 4, dtype=np.uint8) + x = np.arange(238 * 4, dtype=np.uint8) x.dtype = np.uint32 - self.assertEqual(ret.tolist(), x.tolist()) + np.testing.assert_array_equal(ret, x) def test_full(self): self.chip['CONTROL']['COUNTER_EN'] = 1 @@ -111,10 +111,10 @@ def test_full(self): ret = self.chip['FIFO'].get_data() ret = np.hstack((ret, self.chip['FIFO'].get_data())) - x = np.arange(203 * 4, dtype=np.uint8) + x = np.arange(200 * 4, dtype=np.uint8) x.dtype = np.uint32 - self.assertTrue(np.alltrue(ret == x)) + np.testing.assert_array_equal(ret, x) def test_overflow(self): self.chip['CONTROL']['COUNTER_EN'] = 1 @@ -136,7 +136,7 @@ def test_overflow(self): x = np.arange((128 + 1023) * 4, dtype=np.uint8) x.dtype = np.uint32 - self.assertTrue(np.alltrue(ret == x)) + np.testing.assert_array_equal(ret, x) self.chip['PULSE'].set_DELAY(1) self.chip['PULSE'].set_WIDTH(1) @@ -146,7 +146,7 @@ def test_overflow(self): x = np.arange((128 + 1023) * 4, (128 + 1023 + 1) * 4, dtype=np.uint8) x.dtype = np.uint32 - self.assertEqual(ret, x) + np.testing.assert_array_equal(ret, x) def test_single(self): @@ -171,7 +171,7 @@ def test_pattern(self): for _ in range(5): self.chip['CONTROL'].write() - self.assertEqual(self.chip['FIFO'].get_data().tolist(), [0xaa5555aa] * 35) + self.assertEqual(self.chip['FIFO'].get_data().tolist(), [0xaa5555aa] * 34) def test_direct(self): self.chip['CONTROL']['COUNTER_DIRECT'] = 1