diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml new file mode 100644 index 0000000..2cf7139 --- /dev/null +++ b/.github/workflows/lint.yaml @@ -0,0 +1,35 @@ +name: lint +on: + # Runs on all pushes to branches + push: + # Runs on all PRs + pull_request: + # Manual Dispatch + workflow_dispatch: + +jobs: + lint_python: + name: Lint Python Code + runs-on: ubuntu-latest + + env: + BLACK_OPTS: --line-length 136 + # E203 difference of opinion between black and flake8 + # E203 whitespace before ':' + FLAKE8_OPTS: --max-line-length 136 --ignore=E203 + + steps: + - name: Checkout + uses: actions/checkout@main + + - name: Install Linters + run: python3 -m pip install -r ./requirements_lint.txt + + - name: Ensure Black Formatting + run: black --diff --color $BLACK_OPTS . + + - name: Ensure Black Formatting + run: black --check $BLACK_OPTS . + + - name: Lint with Flake8 + run: flake8 $FLAKE8_OPTS --exclude venv . diff --git a/AhbLite3.py b/AhbLite3.py index 442a9f0..7269ae2 100644 --- a/AhbLite3.py +++ b/AhbLite3.py @@ -3,53 +3,57 @@ import cocotb from cocotb.result import TestFailure from cocotb.triggers import RisingEdge, Edge +from cocotb.decorators import coroutine -from cocotblib.misc import log2Up, BoolRandomizer, assertEquals +from .misc import log2Up, BoolRandomizer, assertEquals def AhbLite3MasterIdle(ahb): - ahb.HADDR <= 0 - ahb.HWRITE <= 0 - ahb.HSIZE <= 0 - ahb.HBURST <= 0 - ahb.HPROT <= 0 - ahb.HTRANS <= 0 - ahb.HMASTLOCK <= 0 - ahb.HWDATA <= 0 - + ahb.HADDR.value = 0 + ahb.HWRITE.value = 0 + ahb.HSIZE.value = 0 + ahb.HBURST.value = 0 + ahb.HPROT.value = 0 + ahb.HTRANS.value = 0 + ahb.HMASTLOCK.value = 0 + ahb.HWDATA.value = 0 class AhbLite3Transaction: def __init__(self): - self.HADDR = 0 - self.HWRITE = 0 - self.HSIZE = 0 - self.HBURST = 0 - self.HPROT = 0 - self.HTRANS = 0 + self.HADDR = 0 + self.HWRITE = 0 + self.HSIZE = 0 + self.HBURST = 0 + self.HPROT = 0 + self.HTRANS = 0 self.HMASTLOCK = 0 - self.HWDATA = 0 + self.HWDATA = 0 + class AhbLite3TraficGenerator: - def __init__(self,addressWidth,dataWidth): + def __init__(self, addressWidth, dataWidth): self.addressWidth = addressWidth self.dataWidth = dataWidth + def genRandomAddress(self): - return random.randint(0,(1 << self.addressWidth)-1) + return random.randint(0, (1 << self.addressWidth) - 1) def getTransactions(self): if random.random() < 0.8: trans = AhbLite3Transaction() return [trans] else: - OneKiB = 1 << 10 # this pesky 1 KiB wall a burst must not cross - hSize = random.randint(0,log2Up(self.dataWidth//8)) + OneKiB = 1 << 10 # this pesky 1 KiB wall a burst must not cross + hSize = random.randint(0, log2Up(self.dataWidth // 8)) bytesPerBeat = 1 << hSize - maxBurst = 5 if hSize == 7 else 7 # a full-width 1024 bit bus can only burst up to 8 beats for not crossing a 1 KiB boundary - burst = random.randint(0,maxBurst) + maxBurst = ( + 5 if hSize == 7 else 7 + ) # a full-width 1024 bit bus can only burst up to 8 beats for not crossing a 1 KiB boundary + burst = random.randint(0, maxBurst) write = random.random() < 0.5 - prot = random.randint(0,15) - address = self.genRandomAddress() & ~(bytesPerBeat-1) + prot = random.randint(0, 15) + address = self.genRandomAddress() & ~(bytesPerBeat - 1) incrUnspecified = burst == 1 incrFixed = burst != 1 and burst & 1 == 1 @@ -57,31 +61,31 @@ def getTransactions(self): if incrUnspecified: maxBeats = (OneKiB - (address % OneKiB)) // bytesPerBeat - burstBeats = random.randint(1,maxBeats) + burstBeats = random.randint(1, maxBeats) else: burstCase = burst >> 1 - burstBeats = [1,4,8,16][burstCase] + burstBeats = [1, 4, 8, 16][burstCase] - burstBytes = bytesPerBeat*burstBeats + burstBytes = bytesPerBeat * burstBeats while incrFixed and ((address % OneKiB) + burstBytes) > OneKiB: address = address - bytesPerBeat - addressBase = address - address % burstBytes # for wrapFixed bursts + addressBase = address - address % burstBytes # for wrapFixed bursts buffer = [] for beat in range(burstBeats): if beat > 0: busyProp = random.random() - 0.8 - for busyBeat in range(int(busyProp/0.05)): + for busyBeat in range(int(busyProp / 0.05)): trans = AhbLite3Transaction() trans.HWRITE = write trans.HSIZE = hSize trans.HBURST = burst trans.HPROT = prot trans.HADDR = address - trans.HTRANS = 1 # BUSY - trans.HWDATA = random.randint(0,(1 << self.dataWidth)-1) + trans.HTRANS = 1 # BUSY + trans.HWDATA = random.randint(0, (1 << self.dataWidth) - 1) buffer.append(trans) trans = AhbLite3Transaction() trans.HWRITE = write @@ -89,33 +93,34 @@ def getTransactions(self): trans.HBURST = burst trans.HPROT = prot trans.HADDR = address - trans.HTRANS = 2 if beat == 0 else 3 # first beat is NONSEQ, others are SEQ - trans.HWDATA = random.randint(0,(1 << self.dataWidth)-1) + trans.HTRANS = 2 if beat == 0 else 3 # first beat is NONSEQ, others are SEQ + trans.HWDATA = random.randint(0, (1 << self.dataWidth) - 1) address += bytesPerBeat if wrapFixed and (address == addressBase + burstBytes): address = addressBase buffer.append(trans) return buffer + class AhbLite3MasterDriver: - def __init__(self,ahb,transactor,clk,reset): + def __init__(self, ahb, transactor, clk, reset): self.ahb = ahb self.clk = clk self.reset = reset self.transactor = transactor - cocotb.fork(self.stim()) + cocotb.start_soon(self.stim()) - @cocotb.coroutine + @coroutine def stim(self): ahb = self.ahb - ahb.HADDR <= 0 - ahb.HWRITE <= 0 - ahb.HSIZE <= 0 - ahb.HBURST <= 0 - ahb.HPROT <= 0 - ahb.HTRANS <= 0 - ahb.HMASTLOCK <= 0 - ahb.HWDATA <= 0 + ahb.HADDR.value = 0 + ahb.HWRITE.value = 0 + ahb.HSIZE.value = 0 + ahb.HBURST.value = 0 + ahb.HPROT.value = 0 + ahb.HTRANS.value = 0 + ahb.HMASTLOCK.value = 0 + ahb.HWDATA.value = 0 HWDATAbuffer = 0 while True: for trans in self.transactor.getTransactions(): @@ -123,57 +128,60 @@ def stim(self): while int(self.ahb.HREADY) == 0: yield RisingEdge(self.clk) - ahb.HADDR <= trans.HADDR - ahb.HWRITE <= trans.HWRITE - ahb.HSIZE <= trans.HSIZE - ahb.HBURST <= trans.HBURST - ahb.HPROT <= trans.HPROT - ahb.HTRANS <= trans.HTRANS - ahb.HMASTLOCK <= trans.HMASTLOCK - ahb.HWDATA <= HWDATAbuffer + ahb.HADDR.value = trans.HADDR + ahb.HWRITE.value = trans.HWRITE + ahb.HSIZE.value = trans.HSIZE + ahb.HBURST.value = trans.HBURST + ahb.HPROT.value = trans.HPROT + ahb.HTRANS.value = trans.HTRANS + ahb.HMASTLOCK.value = trans.HMASTLOCK + ahb.HWDATA.value = HWDATAbuffer HWDATAbuffer = trans.HWDATA + class AhbLite3Terminaison: - def __init__(self,ahb,clk,reset): + def __init__(self, ahb, clk, reset): self.ahb = ahb self.clk = clk self.reset = reset self.randomHREADY = True - cocotb.fork(self.stim()) - cocotb.fork(self.combEvent()) + cocotb.start_soon(self.stim()) + cocotb.start_soon(self.combEvent()) - @cocotb.coroutine + @coroutine def stim(self): randomizer = BoolRandomizer() - self.ahb.HREADY <= 1 - self.ahb.HSEL <= 1 + self.ahb.HREADY.value = 1 + self.ahb.HSEL.value = 1 while True: yield RisingEdge(self.clk) self.randomHREADY = randomizer.get() self.doComb() - @cocotb.coroutine + @coroutine def combEvent(self): while True: yield Edge(self.ahb.HREADYOUT) self.doComb() def doComb(self): - self.ahb.HREADY <= (self.randomHREADY and (int(self.ahb.HREADYOUT) == 1)) + self.ahb.HREADY.value = self.randomHREADY and (int(self.ahb.HREADYOUT) == 1) class AhbLite3MasterReadChecker: - def __init__(self,ahb,buffer,clk,reset): + def __init__(self, ahb, buffer, clk, reset): self.ahb = ahb self.clk = clk self.reset = reset self.buffer = buffer self.counter = 0 - cocotb.fork(self.stim()) + cocotb.start_soon(self.stim()) - @cocotb.coroutine + @coroutine def stim(self): ahb = self.ahb + size = 0 + byteOffset = 0 readIncoming = False while True: yield RisingEdge(self.clk) @@ -183,34 +191,37 @@ def stim(self): raise TestFailure("Empty buffer ??? ") bufferData = self.buffer.get() - for i in range(byteOffset,byteOffset + size): - assertEquals((int(ahb.HRDATA) >> (i*8)) & 0xFF,(bufferData >> (i*8)) & 0xFF,"AHB master read checker faild %x " %(int(ahb.HADDR)) ) + for i in range(byteOffset, byteOffset + size): + assertEquals( + (int(ahb.HRDATA) >> (i * 8)) & 0xFF, + (bufferData >> (i * 8)) & 0xFF, + "AHB master read checker failed %x " % (int(ahb.HADDR)), + ) self.counter += 1 - # cocotb.log.info("POP " + str(self.buffer.qsize())) + # cocotb._log.info("POP " + str(self.buffer.qsize())) readIncoming = int(ahb.HTRANS) >= 2 and int(ahb.HWRITE) == 0 size = 1 << int(ahb.HSIZE) byteOffset = int(ahb.HADDR) % (len(ahb.HWDATA) // 8) - class AhbLite3SlaveMemory: - def __init__(self,ahb,base,size,clk,reset): + def __init__(self, ahb, base, size, clk, reset): self.ahb = ahb self.clk = clk self.reset = reset self.base = base self.size = size - self.ram = bytearray(b'\x00' * size) + self.ram = bytearray(b"\x00" * size) - cocotb.fork(self.stim()) - cocotb.fork(self.stimReady()) + cocotb.start_soon(self.stim()) + cocotb.start_soon(self.stimReady()) - @cocotb.coroutine + @coroutine def stimReady(self): randomizer = BoolRandomizer() - self.ahb.HREADYOUT <= 1 + self.ahb.HREADYOUT.value = 1 busy = False while True: yield RisingEdge(self.clk) @@ -221,17 +232,22 @@ def stimReady(self): if (busy or busyNew) and int(self.ahb.HREADYOUT) == 0 and int(self.ahb.HREADY) == 1: raise TestFailure("HREADYOUT == 0 but HREADY == 1 ??? " + self.ahb.HREADY._name) busy = busyNew - if (busy): - self.ahb.HREADYOUT <= randomizer.get() # make some random delay for NONSEQ and SEQ requests + if busy: + self.ahb.HREADYOUT.value = randomizer.get() # make some random delay for NONSEQ and SEQ requests else: - self.ahb.HREADYOUT <= 1 # IDLE and BUSY require 0 WS + self.ahb.HREADYOUT.value = 1 # IDLE and BUSY require 0 WS - @cocotb.coroutine + @coroutine def stim(self): ahb = self.ahb - ahb.HREADYOUT <= 1 - ahb.HRESP <= 0 - ahb.HRDATA <= 0 + ahb.HREADYOUT.value = 1 + ahb.HRESP.value = 0 + ahb.HRDATA.value = 0 + addressOffset = 0 + address = 0 + trans = 0 + size = 0 + write = 0 valid = 0 while True: yield RisingEdge(self.clk) @@ -242,7 +258,7 @@ def stim(self): if trans >= 2: if write == 1: for idx in range(size): - self.ram[address-self.base + idx] = (int(ahb.HWDATA) >> (8*(addressOffset + idx))) & 0xFF + self.ram[address - self.base + idx] = (int(ahb.HWDATA) >> (8 * (addressOffset + idx))) & 0xFF # print("write %x with %x" % (address + idx,(int(ahb.HWDATA) >> (8*(addressOffset + idx))) & 0xFF)) valid = int(ahb.HSEL) @@ -250,15 +266,15 @@ def stim(self): write = int(ahb.HWRITE) size = 1 << int(ahb.HSIZE) address = int(ahb.HADDR) - addressOffset = address % (len(ahb.HWDATA)//8) + addressOffset = address % (len(ahb.HWDATA) // 8) - ahb.HRDATA <= 0 + ahb.HRDATA.value = 0 if valid == 1: if trans >= 2: if write == 0: data = 0 for idx in range(size): - data |= self.ram[address-self.base + idx] << (8*(addressOffset + idx)) + data |= self.ram[address - self.base + idx] << (8 * (addressOffset + idx)) # print("read %x with %x" % (address + idx, self.ram[address-self.base + idx])) # print(str(data)) - ahb.HRDATA <= int(data) + ahb.HRDATA.value = int(data) diff --git a/Apb3.py b/Apb3.py index 9fe0efb..fe70130 100644 --- a/Apb3.py +++ b/Apb3.py @@ -1,26 +1,23 @@ -import random - -import cocotb +from cocotb.result import ReturnValue +from cocotb.triggers import RisingEdge from cocotb.decorators import coroutine -from cocotb.result import TestFailure, ReturnValue -from cocotb.triggers import RisingEdge, Edge -from cocotblib.misc import log2Up, BoolRandomizer, assertEquals, waitClockedCond, randSignal +from .misc import assertEquals, waitClockedCond, randSignal class Apb3: - def __init__(self, dut, name, clk = None): + def __init__(self, dut, name, clk=None): self.clk = clk - self.PADDR = dut.__getattr__(name + "_PADDR") - self.PSEL = dut.__getattr__(name + "_PSEL") - self.PENABLE = dut.__getattr__(name + "_PENABLE") - self.PREADY = dut.__getattr__(name + "_PREADY") - self.PWRITE = dut.__getattr__(name + "_PWRITE") - self.PWDATA = dut.__getattr__(name + "_PWDATA") - self.PRDATA = dut.__getattr__(name + "_PRDATA") + self.PADDR = dut.__getattr__(name + "_PADDR") + self.PSEL = dut.__getattr__(name + "_PSEL") + self.PENABLE = dut.__getattr__(name + "_PENABLE") + self.PREADY = dut.__getattr__(name + "_PREADY") + self.PWRITE = dut.__getattr__(name + "_PWRITE") + self.PWDATA = dut.__getattr__(name + "_PWDATA") + self.PRDATA = dut.__getattr__(name + "_PRDATA") def idle(self): - self.PSEL <= 0 + self.PSEL.value = 0 @coroutine def delay(self, cycle): @@ -28,55 +25,54 @@ def delay(self, cycle): yield RisingEdge(self.clk) @coroutine - def write(self, address, data, sel = 1): - self.PADDR <= address - self.PSEL <= sel - self.PENABLE <= False - self.PWRITE <= True - self.PWDATA <= data + def write(self, address, data, sel=1): + self.PADDR.value = address + self.PSEL.value = sel + self.PENABLE.value = False + self.PWRITE.value = True + self.PWDATA.value = data yield RisingEdge(self.clk) - self.PENABLE <= True - yield waitClockedCond(self.clk, lambda : self.PREADY == True) + self.PENABLE.value = True + yield waitClockedCond(self.clk, lambda: self.PREADY.value == 1) randSignal(self.PADDR) - self.PSEL <= 0 + self.PSEL.value = 0 randSignal(self.PENABLE) randSignal(self.PWRITE) randSignal(self.PWDATA) @coroutine - def writeMasked(self, address, data, mask, sel = 1): - readThread = self.read(address,sel) + def writeMasked(self, address, data, mask, sel=1): + readThread = self.read(address, sel) yield readThread - yield self.write(address,(readThread.retval & ~mask) | (data & mask),sel) + yield self.write(address, (readThread.retval & ~mask) | (data & mask), sel) @coroutine def read(self, address, sel=1): - self.PADDR <= address - self.PSEL <= sel - self.PENABLE <= False - self.PWRITE <= False + self.PADDR.value = address + self.PSEL.value = sel + self.PENABLE.value = False + self.PWRITE.value = False randSignal(self.PWDATA) yield RisingEdge(self.clk) - self.PENABLE <= True - yield waitClockedCond(self.clk, lambda: self.PREADY == True) + self.PENABLE.value = True + yield waitClockedCond(self.clk, lambda: self.PREADY.value == 1) randSignal(self.PADDR) - self.PSEL <= 0 + self.PSEL.value = 0 randSignal(self.PENABLE) randSignal(self.PWRITE) raise ReturnValue(int(self.PRDATA)) - @coroutine def readAssert(self, address, data, sel=1): - readThread = self.read(address,sel) + readThread = self.read(address, sel) yield readThread - assertEquals(int(readThread.retval), data," APB readAssert failure") + assertEquals(int(readThread.retval), data, " APB readAssert failure") @coroutine def readAssertMasked(self, address, data, mask, sel=1): - readThread = self.read(address,sel) + readThread = self.read(address, sel) yield readThread - assertEquals(int(readThread.retval) & mask, data," APB readAssert failure") + assertEquals(int(readThread.retval) & mask, data, " APB readAssert failure") @coroutine def pull(self, address, dataValue, dataMask, sel=1): @@ -84,4 +80,4 @@ def pull(self, address, dataValue, dataMask, sel=1): readThread = self.read(address, sel) yield readThread if (int(readThread.retval) & dataMask) == dataValue: - break \ No newline at end of file + break diff --git a/Axi4.py b/Axi4.py index cfc21d6..357c628 100644 --- a/Axi4.py +++ b/Axi4.py @@ -1,69 +1,70 @@ import random from queue import Queue -from cocotblib.Phase import PHASE_SIM, Infrastructure -from cocotblib.Scorboard import ScorboardOutOfOrder -from cocotblib.misc import BoolRandomizer, log2Up, randBits - -from cocotblib.Stream import Stream, Transaction, StreamDriverSlave, StreamDriverMaster, StreamMonitor +from .Phase import PHASE_SIM, Infrastructure +from .Scorboard import ScorboardOutOfOrder +from .misc import BoolRandomizer, log2Up, randBits +from .Stream import Stream, Transaction, StreamDriverSlave, StreamDriverMaster, StreamMonitor class Axi4: - def __init__(self,dut,name): - self.ar = Stream(dut,name + "_ar") - self.r = Stream(dut, name + "_r") + def __init__(self, dut, name): + self.ar = Stream(dut, name + "_ar") + self.r = Stream(dut, name + "_r") self.aw = Stream(dut, name + "_aw") - self.w = Stream(dut, name + "_w") - self.b = Stream(dut, name + "_b") + self.w = Stream(dut, name + "_w") + self.b = Stream(dut, name + "_b") + class Axi4ReadOnly: - def __init__(self,dut,name): - self.ar = Stream(dut,name + "_ar") - self.r = Stream(dut, name + "_r") + def __init__(self, dut, name): + self.ar = Stream(dut, name + "_ar") + self.r = Stream(dut, name + "_r") + class Axi4WriteOnly: - def __init__(self,dut,name): + def __init__(self, dut, name): self.aw = Stream(dut, name + "_aw") - self.w = Stream(dut, name + "_w") - self.b = Stream(dut, name + "_b") + self.w = Stream(dut, name + "_w") + self.b = Stream(dut, name + "_b") + class Axi4Shared: - def __init__(self,dut,name): - self.arw = Stream(dut,name + "_arw") - self.r = Stream(dut, name + "_r") - self.w = Stream(dut, name + "_w") - self.b = Stream(dut, name + "_b") + def __init__(self, dut, name): + self.arw = Stream(dut, name + "_arw") + self.r = Stream(dut, name + "_r") + self.w = Stream(dut, name + "_w") + self.b = Stream(dut, name + "_b") -def Axi4AddrIncr(address,burst,len,size): +def Axi4AddrIncr(address, burst, length, size): if burst == 0: return address if burst == 1: return address + (1 << size) if burst == 2: - burstSize = (1 << size) * (len+1) - burstMask = burstSize-1 + burstSize = (1 << size) * (length + 1) + burstMask = burstSize - 1 base = (address + (1 << size)) & burstMask return (address & ~burstMask) | base - class Axi4SharedMemoryChecker(Infrastructure): - def __init__(self,name,parent,axi,addressWidth,clk,reset): - Infrastructure.__init__(self,name,parent) + def __init__(self, name, parent, axi, addressWidth, clk, reset): + Infrastructure.__init__(self, name, parent) self.axi = axi self.idWidth = len(axi.arw.payload.hid) self.addressWidth = addressWidth - self.ram = bytearray(b'\x00' * ((1 << addressWidth)*len(axi.w.payload.data)//8)) + self.ram = bytearray(b"\x00" * ((1 << addressWidth) * len(axi.w.payload.data) // 8)) self.doReadWriteCmdRand = BoolRandomizer() self.readWriteRand = BoolRandomizer() self.writeDataRand = BoolRandomizer() self.writeRspScoreboard = ScorboardOutOfOrder("writeRspScoreboard", self) - self.readRspScoreboard = ScorboardOutOfOrder("readRspScoreboard", self) + self.readRspScoreboard = ScorboardOutOfOrder("readRspScoreboard", self) self.writeRspScoreboard.addListener(self.freeReservatedAddresses) self.readRspScoreboard.addListener(self.freeReservatedAddresses) - self.cmdTasks = Queue() - self.writeTasks = Queue() + self.cmdTasks = Queue() + self.writeTasks = Queue() self.nonZeroReadRspCounter = 0 self.nonZeroReadRspCounterTarget = 1000 self.reservedAddresses = {} @@ -74,13 +75,13 @@ def __init__(self,name,parent,axi,addressWidth,clk,reset): StreamDriverMaster(axi.w, self.genWriteData, clk, reset) StreamMonitor(axi.r, self.onReadRsp, clk, reset) StreamMonitor(axi.b, self.onWriteRsp, clk, reset) - axi.w.payload.last <= 0 - axi.r.payload.last <= 0 + axi.w.payload.last.value = 0 + axi.r.payload.last.value = 0 - def freeReservatedAddresses(self,uut,ref,equal): - self.reservedAddresses.pop(ref,None) + def freeReservatedAddresses(self, uut, ref, equal): + self.reservedAddresses.pop(ref, None) - def isAddressRangeBusy(self,start,end): + def isAddressRangeBusy(self, start, end): for r in self.reservedAddresses.values(): if start < r[1] and end > r[0]: return True @@ -94,10 +95,10 @@ def genNewCmd(self): cmd.hid = randBits(self.idWidth) # Each master can use 4 id cmd.region = randBits(4) cmd.len = randBits(4) - cmd.size = random.randint(0,log2Up(self.dataWidth//8)) - cmd.burst = random.randint(0,2) + cmd.size = random.randint(0, log2Up(self.dataWidth // 8)) + cmd.burst = random.randint(0, 2) if cmd.burst == 2: - cmd.len = random.choice([2,4,8,16])-1 + cmd.len = random.choice([2, 4, 8, 16]) - 1 else: cmd.len = randBits(4) + (16 if random.random() < 0.1 else 0) + (32 if random.random() < 0.02 else 0) cmd.lock = randBits(1) @@ -105,59 +106,59 @@ def genNewCmd(self): cmd.qos = randBits(4) cmd.prot = randBits(3) - byteCount = (1 << cmd.size)*(cmd.len + 1) - while(True): - cmd.addr = self.genRandomeAddress() & ~((1 << cmd.size)-1) + byteCount = (1 << cmd.size) * (cmd.len + 1) + while True: + cmd.addr = self.genRandomeAddress() & ~((1 << cmd.size) - 1) if cmd.burst == 1: - if cmd.addr + byteCount >= (1<= (1 << self.addressWidth): continue if cmd.burst == 0: start = cmd.addr - end = start + cmd.size + end = start + cmd.size if cmd.burst == 1: start = cmd.addr end = start + byteCount if cmd.burst == 2: - start = cmd.addr & ~(byteCount-1) + start = cmd.addr & ~(byteCount - 1) end = start + byteCount - if self.isAddressRangeBusy(start,end): + if self.isAddressRangeBusy(start, end): continue break if self.readWriteRand.get(): cmd.write = 1 beatAddr = cmd.addr - for i in range(cmd.len+1): + for i in range(cmd.len + 1): dataTrans = Transaction() dataTrans.data = randBits(self.dataWidth) - dataTrans.strb = randBits(self.dataWidth//8) + dataTrans.strb = randBits(self.dataWidth // 8) dataTrans.last = 1 if cmd.len == i else 0 self.writeTasks.put(dataTrans) - for s in range(self.dataWidth//8): + for s in range(self.dataWidth // 8): if (dataTrans.strb >> s) & 1 == 1: - self.ram[(beatAddr & ~(self.dataWidth//8-1)) + s] = (dataTrans.data >> (s*8)) & 0xFF - beatAddr = Axi4AddrIncr(beatAddr,cmd.burst,cmd.len,cmd.size) + self.ram[(beatAddr & ~(self.dataWidth // 8 - 1)) + s] = (dataTrans.data >> (s * 8)) & 0xFF + beatAddr = Axi4AddrIncr(beatAddr, cmd.burst, cmd.len, cmd.size) writeRsp = Transaction() writeRsp.resp = 0 writeRsp.hid = cmd.hid - self.reservedAddresses[writeRsp] = [start,end] - self.writeRspScoreboard.refPush(writeRsp,writeRsp.hid) + self.reservedAddresses[writeRsp] = [start, end] + self.writeRspScoreboard.refPush(writeRsp, writeRsp.hid) else: cmd.write = 0 beatAddr = cmd.addr for s in range(cmd.len + 1): readRsp = Transaction() - addrBase = beatAddr & ~(self.dataWidth//8-1) + addrBase = beatAddr & ~(self.dataWidth // 8 - 1) readRsp.data = 0 for i in range(self.dataWidth // 8): - readRsp.data |= self.ram[addrBase + i] << (i*8) + readRsp.data |= self.ram[addrBase + i] << (i * 8) readRsp.resp = 0 readRsp.last = 1 if cmd.len == s else 0 readRsp.hid = cmd.hid @@ -169,7 +170,6 @@ def genNewCmd(self): self.cmdTasks.put(cmd) # print(str(len(self.cmdTasks.queue)) + " " + str(len(self.writeTasks.queue))) - def genReadWriteCmd(self): if self.doReadWriteCmdRand.get(): while self.cmdTasks.empty(): @@ -186,8 +186,8 @@ def genWriteData(self): self.genNewCmd() return self.writeTasks.get() - def onWriteRsp(self,trans): - self.writeRspScoreboard.uutPush(trans,trans.hid) + def onWriteRsp(self, trans): + self.writeRspScoreboard.uutPush(trans, trans.hid) def onReadRsp(self, trans): self.readRspScoreboard.uutPush(trans, trans.hid) diff --git a/ClockDomain.py b/ClockDomain.py index 39bf48b..c7f2532 100644 --- a/ClockDomain.py +++ b/ClockDomain.py @@ -1,5 +1,7 @@ import cocotb from cocotb.triggers import Timer, RisingEdge, Event +from cocotb.decorators import coroutine +from cocotb.utils import get_time_from_sim_steps ############################################################################### @@ -7,7 +9,7 @@ # class RESET_ACTIVE_LEVEL: HIGH = 1 - LOW = 0 + LOW = 0 ############################################################################### @@ -17,11 +19,9 @@ class RESET_ACTIVE_LEVEL: # # # Create a clock with a reset active high # clockDomain = ClockDomain(dut.clk, 400, dut.reset, RESET_ACTIVE_LEVEL.HIGH) -# cocobt.fork( clockDomain.start() ) +# cocotb.start_soon( clockDomain.start() ) # class ClockDomain: - - ########################################################################## # Constructor # @@ -30,63 +30,56 @@ class ClockDomain: # @param reset : Reset generated # @param resetactiveLevel : Reset active low or high def __init__(self, clk, halfPeriod, reset=None, resetActiveLevel=RESET_ACTIVE_LEVEL.LOW): - self.halfPeriod = halfPeriod + self.frequency = 1 / get_time_from_sim_steps(halfPeriod * 2, units="us") - self.clk = clk - self.reset = reset + self.clk = clk + self.reset = reset self.typeReset = resetActiveLevel self.event_endReset = Event() - ########################################################################## # Generate the clock signals - @cocotb.coroutine + @coroutine def start(self): - - self.fork_gen = cocotb.fork(self._clkGen()) - if self.reset != None : - cocotb.fork(self._waitEndReset()) + self.fork_gen = cocotb.start_soon(self._clkGen()) + if self.reset is not None: + cocotb.start_soon(self._waitEndReset()) if self.reset: - self.reset <= self.typeReset + self.reset.value = self.typeReset yield Timer(self.halfPeriod * 5) if self.reset: - self.reset <= int(1 if self.typeReset == RESET_ACTIVE_LEVEL.LOW else 0) - + self.reset.value = int(1 if self.typeReset == RESET_ACTIVE_LEVEL.LOW else 0) ########################################################################## # Stop all processes def stop(self): - self.fork_gen.kill() - ########################################################################## # Generate the clk - @cocotb.coroutine + @coroutine def _clkGen(self): while True: - self.clk <= 0 + self.clk.value = 0 yield Timer(self.halfPeriod) - self.clk <= 1 + self.clk.value = 1 yield Timer(self.halfPeriod) - ########################################################################## # Wait the end of the reset - @cocotb.coroutine + @coroutine def _waitEndReset(self): while True: yield RisingEdge(self.clk) valueReset = int(1 if self.typeReset == RESET_ACTIVE_LEVEL.LOW else 0) if int(self.reset) == valueReset: self.event_endReset.set() - break; - + break ########################################################################## # Display the frequency of the clock domain diff --git a/Flow.py b/Flow.py index 8514594..202e288 100644 --- a/Flow.py +++ b/Flow.py @@ -1,47 +1,44 @@ import cocotb from cocotb.triggers import RisingEdge, Event -from cocotblib.misc import Bundle +from cocotb.decorators import coroutine + +from .misc import Bundle ############################################################################### # Flow # class Flow: - - #========================================================================== + # ========================================================================== # Constructor - #========================================================================== + # ========================================================================== def __init__(self, dut, name): - # interface self.valid = dut.__getattr__(name + "_valid") - self.payload = Bundle(dut,name + "_payload") + self.payload = Bundle(dut, name + "_payload") # Event self.event_valid = Event() - - #========================================================================== + # ========================================================================== # Start to monitor the valid signal - #========================================================================== + # ========================================================================== def startMonitoringValid(self, clk): - self.clk = clk - self.fork_valid = cocotb.fork(self.monitor_valid()) - + self.clk = clk + self.fork_valid = cocotb.start_soon(self.monitor_valid()) - #========================================================================== + # ========================================================================== # Stop monitoring - #========================================================================== + # ========================================================================== def stopMonitoring(self): self.fork_valid.kill() - - #========================================================================== + # ========================================================================== # Monitor the valid signal - #========================================================================== - @cocotb.coroutine + # ========================================================================== + @coroutine def monitor_valid(self): while True: yield RisingEdge(self.clk) if int(self.valid) == 1: - self.event_valid.set( self.payload ) + self.event_valid.set(self.payload) diff --git a/Phase.py b/Phase.py index f53a622..7b4b964 100644 --- a/Phase.py +++ b/Phase.py @@ -1,6 +1,5 @@ -import cocotb -from cocotb.result import TestFailure, TestError from cocotb.triggers import Timer +from cocotb.decorators import coroutine PHASE_NULL = 0 PHASE_SIM = 100 @@ -10,18 +9,19 @@ class Infrastructure: - def __init__(self,name,parent): + def __init__(self, name, parent): self.name = name self.parent = parent - if parent != None: + if parent is not None: parent.addChild(self) self.children = [] + self.error = False def getPhase(self): return self.parent.getPhase() def startPhase(self, phase): - error = False + self.error = False for child in self.children: child.startPhase(phase) @@ -40,12 +40,12 @@ def endPhase(self, phase): for child in self.children: child.endPhase(phase) - def addChild(self,child): + def addChild(self, child): if child not in self.children: self.children.append(child) def getPath(self): - if self.parent != None: + if self.parent is not None: return self.parent.getPath() + "/" + self.name else: return self.name @@ -59,10 +59,10 @@ def __init__(self): self.waitTasksEndTime = 0 # setSimManager(self) - def setWaitTasksEndTime(self,value): + def setWaitTasksEndTime(self, value): self.waitTasksEndTime = value - @cocotb.coroutine + @coroutine def waitChild(self): while True: if self.canPhaseProgress(self.phase): @@ -72,14 +72,14 @@ def waitChild(self): def getPhase(self): return self.phase - def switchPhase(self,phase): + def switchPhase(self, phase): for infra in self.children: infra.endPhase(self.phase) self.phase = phase for infra in self.children: infra.startPhase(self.phase) - @cocotb.coroutine + @coroutine def run(self): self.switchPhase(PHASE_SIM) yield self.waitChild() @@ -89,6 +89,7 @@ def run(self): self.switchPhase(PHASE_CHECK_SCORBOARDS) self.switchPhase(PHASE_DONE) + # _simManager = None # # def getSimManager(): @@ -98,6 +99,3 @@ def run(self): # global _simManager # _simManager = that # - - - diff --git a/Scorboard.py b/Scorboard.py index 5841904..b3af2ee 100644 --- a/Scorboard.py +++ b/Scorboard.py @@ -3,23 +3,23 @@ import cocotb from cocotb.result import TestFailure -from cocotblib.Phase import Infrastructure, PHASE_CHECK_SCORBOARDS +from .Phase import Infrastructure, PHASE_CHECK_SCORBOARDS class ScorboardInOrder(Infrastructure): - def __init__(self,name,parent): - Infrastructure.__init__(self,name,parent) + def __init__(self, name, parent): + Infrastructure.__init__(self, name, parent) self.refs = Queue() self.uuts = Queue() self.refsCounter = 0 self.uutsCounter = 0 - def refPush(self,ref): + def refPush(self, ref): self.refs.put(ref) self.refsCounter += 1 self.update() - def uutPush(self,uut): + def uutPush(self, uut): self.uuts.put(uut) self.uutsCounter += 1 self.update() @@ -29,27 +29,25 @@ def update(self): ref = self.refs.get() uut = self.uuts.get() - self.match(uut,ref) + self.match(uut, ref) - - def match(self,uut,ref): + def match(self, uut, ref): if not uut.equalRef(ref): - cocotb.log.error("Missmatch detected in " + self.getPath()) + cocotb._log.error("Mismatch detected in " + self.getPath()) uut.assertEqualRef(ref) def startPhase(self, phase): Infrastructure.startPhase(self, phase) if phase == PHASE_CHECK_SCORBOARDS: if (not self.refs.empty()) or (not self.uuts.empty()): - error = self.getPath() + " has some remaining transaction :\n" + error = self.getPath() + " has some remaining transaction(s) :\n" for e in self.refs.queue: error += "REF:\n" + str(e) + "\n" for e in self.uuts.queue: error += "UUT:\n" + str(e) + "\n" - cocotb.log.error(error) - + cocotb._log.error(error) def endPhase(self, phase): Infrastructure.endPhase(self, phase) @@ -59,17 +57,16 @@ def endPhase(self, phase): class ScorboardOutOfOrder(Infrastructure): - def __init__(self,name,parent): - Infrastructure.__init__(self,name,parent) + def __init__(self, name, parent): + Infrastructure.__init__(self, name, parent) self.refsDic = {} self.uutsDic = {} self.listeners = [] - - def addListener(self,func): + def addListener(self, func): self.listeners.append(func) - def refPush(self,ref,oooid): + def refPush(self, ref, oooid): if oooid not in self.refsDic: self.refsDic[oooid] = Queue() self.refsDic[oooid].put(ref) @@ -81,7 +78,7 @@ def uutPush(self, uut, oooid): self.uutsDic[oooid].put(uut) self.update(oooid) - def update(self,oooid): + def update(self, oooid): if oooid in self.uutsDic and oooid in self.refsDic: refs = self.refsDic[oooid] uuts = self.uutsDic[oooid] @@ -89,43 +86,40 @@ def update(self,oooid): ref = refs.get() uut = uuts.get() - self.match(uut,ref) + self.match(uut, ref) - #Clean + # Clean if refs.empty(): self.refsDic.pop(oooid) if uuts.empty(): self.uutsDic.pop(oooid) - - def match(self,uut,ref): + def match(self, uut, ref): equal = uut.equalRef(ref) - for l in self.listeners: - l(uut,ref,equal) + for handler in self.listeners: + handler(uut, ref, equal) if not equal: - cocotb.log.error("Missmatch detected in " + self.getPath()) + cocotb._log.error("Mismatch detected in " + self.getPath()) uut.assertEqualRef(ref) def startPhase(self, phase): Infrastructure.startPhase(self, phase) if phase == PHASE_CHECK_SCORBOARDS: if len(self.refsDic) != 0 or len(self.uutsDic) != 0: - error = self.getPath() + " has some remaining transaction :\n" - for l in self.refsDic.values(): - for e in l.queue: + error = self.getPath() + " has some remaining transaction(s) :\n" + for it in self.refsDic.values(): + for e in it.queue: error += "REF:\n" + str(e) + "\n" - for l in self.uutsDic.values(): - for e in l.queue: + for it in self.uutsDic.values(): + for e in it.queue: error += "UUT:\n" + str(e) + "\n" - cocotb.log.error(error) - + cocotb._log.error(error) def endPhase(self, phase): Infrastructure.endPhase(self, phase) if phase == PHASE_CHECK_SCORBOARDS: if len(self.refsDic) != 0 or len(self.uutsDic) != 0: raise TestFailure("Scoreboard not empty") - diff --git a/Spi.py b/Spi.py index 7b1d7e3..667af91 100644 --- a/Spi.py +++ b/Spi.py @@ -1,12 +1,9 @@ -import random - -import cocotb +from cocotb.result import ReturnValue +from cocotb.triggers import Timer from cocotb.decorators import coroutine -from cocotb.result import TestFailure, ReturnValue -from cocotb.triggers import RisingEdge, Edge, Timer -from cocotblib.TriState import TriStateOutput -from cocotblib.misc import log2Up, BoolRandomizer, assertEquals, testBit +from .TriState import TriStateOutput +from .misc import testBit class SpiMaster: @@ -14,23 +11,17 @@ def __init__(self, dut, name): self.sclk = dut.__getattr__(name + "_sclk") self.mosi = dut.__getattr__(name + "_mosi") self.miso = dut.__getattr__(name + "_miso") - self.ss = dut.__getattr__(name + "_ss") - - - + self.ss = dut.__getattr__(name + "_ss") class SpiSlave: def __init__(self, dut, name): self.sclk = dut.__getattr__(name + "_sclk") self.mosi = dut.__getattr__(name + "_mosi") - self.miso = TriStateOutput(dut,name + "_miso") + self.miso = TriStateOutput(dut, name + "_miso") self.ss = dut.__getattr__(name + "_ss") - - - class SpiSlaveMaster: def __init__(self, spi): self.spi = spi @@ -39,23 +30,23 @@ def __init__(self, spi): self.baudPeriode = 1000 self.dataWidth = 8 - def init(self, cpol, cpha, baudrate, dataWidth = 8): - self.spi.ss <= True + def init(self, cpol, cpha, baudrate, dataWidth=8): + self.spi.ss.value = True self.cpol = cpol self.cpha = cpha self.baudPeriode = baudrate self.dataWidth = dataWidth - self.spi.sclk <= cpol + self.spi.sclk.value = cpol @coroutine def enable(self): - self.spi.ss <= False + self.spi.ss.value = False yield Timer(self.baudPeriode) @coroutine def disable(self): yield Timer(self.baudPeriode) - self.spi.ss <= True + self.spi.ss.value = True yield Timer(self.baudPeriode) @coroutine @@ -63,19 +54,19 @@ def exchange(self, masterData): buffer = "" if not self.cpha: for i in range(self.dataWidth): - self.spi.mosi <= testBit(masterData, self.dataWidth - 1 - i) + self.spi.mosi.value = testBit(masterData, self.dataWidth - 1 - i) yield Timer(self.baudPeriode >> 1) buffer = buffer + str(self.spi.miso.write) if bool(self.spi.miso.writeEnable) else "x" - self.spi.sclk <= (not self.cpol) + self.spi.sclk.value = not self.cpol yield Timer(self.baudPeriode >> 1) - self.spi.sclk <= (self.cpol) + self.spi.sclk.value = self.cpol else: for i in range(self.dataWidth): - self.spi.mosi <= testBit(masterData, self.dataWidth -1 - i) - self.spi.sclk <= (not self.cpol) + self.spi.mosi.value = testBit(masterData, self.dataWidth - 1 - i) + self.spi.sclk.value = not self.cpol yield Timer(self.baudPeriode >> 1) buffer = buffer + str(self.spi.miso.write) if bool(self.spi.miso.writeEnable) else "x" - self.spi.sclk <= (self.cpol) + self.spi.sclk.value = self.cpol yield Timer(self.baudPeriode >> 1) raise ReturnValue(buffer) @@ -84,4 +75,4 @@ def exchange(self, masterData): def exchangeCheck(self, masterData, slaveData): c = self.exchange(masterData) yield c - assert slaveData == int(c.retval,2) \ No newline at end of file + assert slaveData == int(c.retval, 2) diff --git a/Stream.py b/Stream.py index 400489e..827413b 100644 --- a/Stream.py +++ b/Stream.py @@ -1,73 +1,73 @@ +import types import cocotb -import types from cocotb.result import TestFailure from cocotb.triggers import RisingEdge, Timer, Event -from cocotblib.Phase import Infrastructure, PHASE_WAIT_TASKS_END -from cocotblib.Scorboard import ScorboardInOrder +from cocotb.decorators import coroutine -from cocotblib.misc import Bundle, BoolRandomizer +from .Phase import Infrastructure, PHASE_WAIT_TASKS_END +from .Scorboard import ScorboardInOrder +from .misc import Bundle, BoolRandomizer class Stream: def __init__(self, dut, name): - self.valid = dut.__getattr__(name + "_valid") - self.ready = dut.__getattr__(name + "_ready") - self.payload = Bundle(dut,name + "_payload") + self.valid = dut.__getattr__(name + "_valid") + self.ready = dut.__getattr__(name + "_ready") + self.payload = Bundle(dut, name + "_payload") # Event self.event_ready = Event() self.event_valid = Event() def startMonitoringReady(self, clk): - self.clk = clk - self.fork_ready = cocotb.fork(self.monitor_ready()) + self.clk = clk + self.fork_ready = cocotb.start_soon(self.monitor_ready()) def startMonitoringValid(self, clk): - self.clk = clk - self.fork_valid = cocotb.fork(self.monitor_valid()) + self.clk = clk + self.fork_valid = cocotb.start_soon(self.monitor_valid()) def stopMonitoring(self): self.fork_ready.kill() self.fork_valid.kill() - @cocotb.coroutine + @coroutine def monitor_ready(self): while True: yield RisingEdge(self.clk) if int(self.ready) == 1: - self.event_ready.set( self.payload ) + self.event_ready.set(self.payload) - @cocotb.coroutine + @coroutine def monitor_valid(self): while True: yield RisingEdge(self.clk) if int(self.valid) == 1: - self.event_valid.set( self.payload ) + self.event_valid.set(self.payload) class Transaction(object): def __init__(self): - object.__setattr__(self,"_nameToElement",{}) + object.__setattr__(self, "_nameToElement", {}) def __setattr__(self, key, value): # print("set " + key) - if key[0] != '_': + if key[0] != "_": self._nameToElement[key] = value - object.__setattr__(self,key,value) + object.__setattr__(self, key, value) - def equalRef(self,ref): + def equalRef(self, ref): # if(len(self._nameToElement) != len(ref._nameToElement)): # return False for name in self._nameToElement: - refValue = getattr(ref,name) - if refValue != None and self._nameToElement[name] != getattr(ref,name): + refValue = getattr(ref, name) + if refValue is not None and self._nameToElement[name] != getattr(ref, name): return False return True - def assertEqualRef(self,ref): + def assertEqualRef(self, ref): if not self.equalRef(ref): - raise TestFailure("\nFAIL transaction not equal\ntransaction =>\n%s\nref =>\n%s\n\n" % (self,ref)) - + raise TestFailure("\nFAIL transaction not equal\ntransaction =>\n%s\nref =>\n%s\n\n" % (self, ref)) def __str__(self): buffer = "" @@ -77,83 +77,85 @@ def __str__(self): biggerName = len(n) for name in self._nameToElement: e = self._nameToElement[name] - buffer += "%s %s: 0x%x\n" % (name," "*(biggerName-len(name)), 0 if e == None else e) + buffer += "%s %s: 0x%x\n" % (name, " " * (biggerName - len(name)), 0 if e is None else e) return buffer + # Transaction = type('Transaction', (object,), {}) + class StreamDriverMaster: - def __init__(self,stream,transactor,clk,reset): + def __init__(self, stream, transactor, clk, reset): self.stream = stream self.clk = clk self.reset = reset self.transactor = transactor - cocotb.fork(self.stim()) + cocotb.start_soon(self.stim()) - @cocotb.coroutine + @coroutine def stim(self): stream = self.stream - stream.valid <= 0 + stream.valid.value = 0 + next_delay = 0 while True: yield RisingEdge(self.clk) if int(stream.valid) == 1 and int(stream.ready) == 1: - stream.valid <= 0 - for i in range(nextDelay): + stream.valid.value = 0 + for i in range(next_delay): yield RisingEdge(self.clk) - if self.transactor != None and (int(stream.valid) == 0 or int(stream.ready) == 1): - if isinstance(self.transactor,types.GeneratorType): + if self.transactor is not None and (int(stream.valid) == 0 or int(stream.ready) == 1): + if isinstance(self.transactor, types.GeneratorType): trans = next(self.transactor) else: trans = self.transactor() - if trans != None: - if hasattr(trans,"nextDelay"): - nextDelay = trans.nextDelay + if trans is not None: + if hasattr(trans, "nextDelay"): + next_delay = trans.nextDelay else: - nextDelay = 0 - stream.valid <= 1 + next_delay = 0 + stream.valid.value = 1 for name in stream.payload.nameToElement: - if hasattr(trans,name) == False: + if hasattr(trans, name) is False: raise Exception("Missing element in bundle :" + name) - e = stream.payload.nameToElement[name] <= getattr(trans,name) - + stream.payload.nameToElement[name].value = getattr(trans, name) class StreamDriverSlave: - def __init__(self,stream,clk,reset): + def __init__(self, stream, clk, reset): self.stream = stream self.clk = clk self.reset = reset self.randomizer = BoolRandomizer() - cocotb.fork(self.stim()) + cocotb.start_soon(self.stim()) - @cocotb.coroutine + @coroutine def stim(self): stream = self.stream - stream.ready <= 1 + stream.ready.value = 1 while True: yield RisingEdge(self.clk) - stream.ready <= self.randomizer.get() + stream.ready.value = self.randomizer.get() def TransactionFromBundle(bundle): trans = Transaction() for name in bundle.nameToElement: - setattr(trans,name, int(bundle.nameToElement[name])) + setattr(trans, name, int(bundle.nameToElement[name])) return trans class StreamMonitor: - def __init__(self,stream,callback,clk,reset): + def __init__(self, stream, callback, clk, reset): self.stream = stream self.callback = callback self.clk = clk self.reset = reset - cocotb.fork(self.stim()) + cocotb.start_soon(self.stim()) - @cocotb.coroutine + @coroutine def stim(self): stream = self.stream while True: @@ -164,11 +166,9 @@ def stim(self): self.callback(trans) - - class StreamFifoTester(Infrastructure): - def __init__(self,name,parent,pushStream,popStream,transactionGenerator,dutCounterTarget,clk,reset): - Infrastructure.__init__(self,name,parent) + def __init__(self, name, parent, pushStream, popStream, transactionGenerator, dutCounterTarget, clk, reset): + Infrastructure.__init__(self, name, parent) self.pushStream = pushStream self.popStream = popStream self.clk = clk @@ -204,5 +204,3 @@ def onRef(self, uut): def canPhaseProgress(self, phase): return self.dutCounter > self.dutCounterTarget - - diff --git a/TriState.py b/TriState.py index c2e06d7..8f86bbc 100644 --- a/TriState.py +++ b/TriState.py @@ -2,25 +2,21 @@ # Tristate # class TriState: - - #========================================================================== + # ========================================================================== # Constructor - #========================================================================== + # ========================================================================== def __init__(self, dut, name): - # interface - self.read = dut.__getattr__(name + "_read") - self.write = dut.__getattr__(name + "_write") + self.read = dut.__getattr__(name + "_read") + self.write = dut.__getattr__(name + "_write") self.writeEnable = dut.__getattr__(name + "_writeEnable") class TriStateOutput: - - #========================================================================== + # ========================================================================== # Constructor - #========================================================================== + # ========================================================================== def __init__(self, dut, name): - # interface - self.write = dut.__getattr__(name + "_write") - self.writeEnable = dut.__getattr__(name + "_writeEnable") \ No newline at end of file + self.write = dut.__getattr__(name + "_write") + self.writeEnable = dut.__getattr__(name + "_writeEnable") diff --git a/misc.py b/misc.py index 939e0ab..3529531 100644 --- a/misc.py +++ b/misc.py @@ -1,110 +1,123 @@ import random +import time import cocotb from cocotb.binary import BinaryValue -from cocotb.decorators import coroutine from cocotb.result import TestFailure from cocotb.triggers import Timer, RisingEdge +from cocotb.decorators import coroutine def cocotbXHack(): - if hasattr(BinaryValue,"_resolve_to_0"): + if hasattr(BinaryValue, "_resolve_to_0"): # cocotb <= 1.4.0 - BinaryValue._resolve_to_0 = BinaryValue._resolve_to_0 + BinaryValue._resolve_to_error + BinaryValue._resolve_to_0 = BinaryValue._resolve_to_0 + BinaryValue._resolve_to_error BinaryValue._resolve_to_error = "" elif hasattr(cocotb.binary, "resolve_x_to"): # cocotb 1.5.0+ cocotb.binary.resolve_x_to = "ZEROS" cocotb.binary._resolve_table = cocotb.binary._ResolveTable() + def log2Up(value): - return value.bit_length()-1 + return value.bit_length() - 1 + + +def randInt(lower, upper): + return random.randint(lower, upper) -def randInt(min,max): - return random.randint(min, max) def randBool(): return bool(random.getrandbits(1)) + def randBits(width): return random.getrandbits(width) + def randSignal(that): - that <= random.getrandbits(len(that)) + that.value = random.getrandbits(len(that)) + -def randBoolSignal(that,prob): - that <= (random.random() < prob) +def randBoolSignal(that, prob): + that.value = random.random() < prob @coroutine -def clockedWaitTrue(clk,that): +def clockedWaitTrue(clk, that): while True: yield RisingEdge(clk) - if that == True: + if that is True: break + def assertEquals(a, b, name): if int(a) != int(b): - raise TestFailure("FAIL %s %d != %d" % (name,int(a),int(b))) + raise TestFailure("FAIL %s %d != %d" % (name, int(a), int(b))) + def truncUInt(value, signal): - if isinstance( signal, int ): - return value & ((1 << signal)-1) + if isinstance(signal, int): + return value & ((1 << signal) - 1) else: return value & ((1 << len(signal)) - 1) + def truncSInt(value, signal): - if isinstance( signal, int ): + if isinstance(signal, int): bitCount = signal else: bitCount = len(signal) - masked = value & ((1 << bitCount)-1) - if (masked & (1 << bitCount-1)) != 0: - return - (1 << bitCount) + masked + masked = value & ((1 << bitCount) - 1) + if (masked & (1 << bitCount - 1)) != 0: + return -(1 << bitCount) + masked else: return masked def setBit(v, index, x): - mask = 1 << index - v &= ~mask - if x: - v |= mask - return v + mask = 1 << index + v &= ~mask + if x: + v |= mask + return v + def testBit(int_type, offset): - mask = 1 << offset - return (int_type & mask) != 0 + mask = 1 << offset + return (int_type & mask) != 0 + def uint(signal): return signal.value.integer + def sint(signal): return signal.value.signed_integer -@cocotb.coroutine -def ClockDomainAsyncReset(clk,reset,period = 1000): +@coroutine +def ClockDomainAsyncReset(clk, reset, period=1000): if reset: - reset <= 1 - clk <= 0 + reset.value = 1 + clk.value = 0 yield Timer(period) if reset: - reset <= 0 + reset.value = 0 while True: - clk <= 0 - yield Timer(period/2) - clk <= 1 - yield Timer(period/2) + clk.value = 0 + yield Timer(period / 2) + clk.value = 1 + yield Timer(period / 2) + -@cocotb.coroutine +@coroutine def SimulationTimeout(duration): yield Timer(duration) raise TestFailure("Simulation timeout") -import time -@cocotb.coroutine +@coroutine def simulationSpeedPrinter(clk): counter = 0 lastTime = time.time() @@ -114,11 +127,10 @@ def simulationSpeedPrinter(clk): thisTime = time.time() if thisTime - lastTime >= 1.0: lastTime = thisTime - print("Sim speed : %f khz" %(counter/1000.0)) + print("Sim speed : %f khz" % (counter / 1000.0)) counter = 0 - class BoolRandomizer: def __init__(self): self.prob = 0.5 @@ -134,9 +146,8 @@ def get(self): return random.random() < self.prob - # class Stream: -# def __init__(self,name,dut): +# def __init__(self, name, dut): # self.valid = getattr(dut, name + "_valid") # self.ready = getattr(dut, name + "_ready") # payloads = [a for a in dut if a._name.startswith(name + "_payload")] @@ -144,25 +155,25 @@ def get(self): # self.payload = payloads[0] +MyObject = type("MyObject", (object,), {}) -MyObject = type('MyObject', (object,), {}) -@cocotb.coroutine -def StreamRandomizer(streamName, onNew,handle, dut, clk): +@coroutine +def StreamRandomizer(streamName, onNew, handle, dut, clk): validRandomizer = BoolRandomizer() valid = getattr(dut, streamName + "_valid") ready = getattr(dut, streamName + "_ready") payloads = [a for a in dut if a._name.startswith(streamName + "_payload")] - valid <= 0 + valid.value = 0 while True: yield RisingEdge(clk) if int(ready) == 1: - valid <= 0 + valid.value = 0 if int(valid) == 0 or int(ready) == 1: if validRandomizer.get(): - valid <= 1 + valid.value = 1 for e in payloads: randSignal(e) yield Timer(1) @@ -171,21 +182,22 @@ def StreamRandomizer(streamName, onNew,handle, dut, clk): else: payload = MyObject() for e in payloads: - payload.__setattr__(e._name[len(streamName + "_payload_"):], int(e)) + payload.__setattr__(e._name[len(streamName + "_payload_") :], int(e)) if onNew: - onNew(payload,handle) + onNew(payload, handle) + -@cocotb.coroutine -def FlowRandomizer(streamName, onNew,handle, dut, clk): +@coroutine +def FlowRandomizer(streamName, onNew, handle, dut, clk): validRandomizer = BoolRandomizer() valid = getattr(dut, streamName + "_valid") payloads = [a for a in dut if a._name.startswith(streamName + "_payload")] - valid <= 0 + valid.value = 0 while True: yield RisingEdge(clk) if validRandomizer.get(): - valid <= 1 + valid.value = 1 for e in payloads: randSignal(e) yield Timer(1) @@ -194,50 +206,52 @@ def FlowRandomizer(streamName, onNew,handle, dut, clk): else: payload = MyObject() for e in payloads: - payload.__setattr__(e._name[len(streamName + "_payload_"):], int(e)) + payload.__setattr__(e._name[len(streamName + "_payload_") :], int(e)) if onNew: - onNew(payload,handle) + onNew(payload, handle) else: - valid <= 0 + valid.value = 0 + -@cocotb.coroutine +@coroutine def StreamReader(streamName, onTransaction, handle, dut, clk): validRandomizer = BoolRandomizer() valid = getattr(dut, streamName + "_valid") ready = getattr(dut, streamName + "_ready") payloads = [a for a in dut if a._name.startswith(streamName + "_payload")] - ready <= 0 + ready.value = 0 while True: yield RisingEdge(clk) - ready <= validRandomizer.get() + ready.value = validRandomizer.get() if int(valid) == 1 and int(ready) == 1: if len(payloads) == 1 and payloads[0]._name == streamName + "_payload": payload = int(payloads[0]) else: payload = MyObject() for e in payloads: - payload.__setattr__(e._name[len(streamName + "_payload_"):], int(e)) + payload.__setattr__(e._name[len(streamName + "_payload_") :], int(e)) if onTransaction: - onTransaction(payload,handle) - + onTransaction(payload, handle) class Bundle: - def __init__(self,dut,name): + def __init__(self, dut, name): self.nameToElement = {} - self.elements = [a for a in dut if (a._name.lower().startswith(name.lower() + "_") and not a._name.lower().endswith("_readablebuffer"))] + self.elements = [ + a for a in dut if (a._name.lower().startswith(name.lower() + "_") and not a._name.lower().endswith("_readablebuffer")) + ] for e in [a for a in dut if a._name == name]: self.elements.append(e) for element in self.elements: - # print("append " + element._name + " with name : " + element._name[len(name) + 1:]) + # print("append " + element._name + " with name : " + element._name[len(name) + 1 :]) if len(name) == len(element._name): eName = "itself" else: - eName = element._name[len(name) + 1:] + eName = element._name[len(name) + 1 :] if eName == "id": eName = "hid" @@ -250,29 +264,27 @@ def __getattr__(self, name): return self.nameToElement[name] - -def readIHex(path, callback,context): +def readIHex(path, callback, context): with open(path) as f: offset = 0 for line in f: if len(line) > 0: - assert line[0] == ':' + assert line[0] == ":" byteCount = int(line[1:3], 16) nextAddr = int(line[3:7], 16) + offset key = int(line[7:9], 16) if key == 0: - array = [int(line[9 + i * 2:11 + i * 2], 16) for i in range(0, byteCount)] - callback(nextAddr,array,context) + array = [int(line[9 + i * 2 : 11 + i * 2], 16) for i in range(0, byteCount)] + callback(nextAddr, array, context) elif key == 2: offset = int(line[9:13], 16) else: pass - @coroutine def TriggerAndCond(trigger, cond): - while(True): + while True: yield trigger if cond: break @@ -280,14 +292,13 @@ def TriggerAndCond(trigger, cond): @coroutine def waitClockedCond(clk, cond): - while(True): + while True: yield RisingEdge(clk) if cond(): break - @coroutine def TimerClk(clk, count): for i in range(count): - yield RisingEdge(clk) \ No newline at end of file + yield RisingEdge(clk) diff --git a/requirements_lint.txt b/requirements_lint.txt new file mode 100644 index 0000000..c49ca57 --- /dev/null +++ b/requirements_lint.txt @@ -0,0 +1,3 @@ +black~=22.3.0 +flake8~=4.0.1 +flake8-no-implicit-concat==0.3.3