Skip to content
110 changes: 107 additions & 3 deletions canbus/CAN.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,88 @@ class CanError:
ERROR_OK = ERROR.ERROR_OK
ERROR_FAIL = ERROR.ERROR_FAIL

# Decodes contents of various MCP2015 status registers
# Returns blank-delimited string, one for each keyword-value,
# e.g. for 3 keywords:
# {status_reg_flags} {interrupt_reg_flags} {error_reg_flags}
# e.g. for 2 keywords (status= and error=):
# {status_reg_flags} {error_reg_flags}
# e.g. for 1 keyword (interrupt=):
# {interrupt_reg_flags}
# txctl: TXBnCTRL

@classmethod
def decode(cls,status=None,interrupt=None,error=None,txctl=None):

def cat(txt):
nonlocal pfx
res, pfx = pfx + txt, ','
return res

res, nxt, pfx = '', '', ''
if status is not None:
res += '{'
bits = status & 0xe0
if bits == 0x00: res += cat('Normal-mode')
if bits == 0x20: res += cat('Sleep-mode')
if bits == 0x40: res += cat('Loopback-mode')
if bits == 0x60: res += cat('Listen-only-mode')
if bits == 0x80: res += cat('Config-mode')
bits = status & 0x0e
if bits == 0x00: res += cat('No-interrupt')
if bits == 0x02: res += cat('Error-interrupt')
if bits == 0x04: res += cat('Wake-up-interrupt')
if bits == 0x06: res += cat('TXB0-interrupt')
if bits == 0x08: res += cat('TXB1-interrupt')
if bits == 0x0a: res += cat('TXB2-interrupt')
if bits == 0x0c: res += cat('RXB0-interrupt')
if bits == 0x0e: res += cat('RXB1-interrupt')
res += '}'
nxt = ' '

if txctl is not None:
res += nxt + '{'
pfx = ''
if txctl & 0x80: res += cat('<unknown>')
if txctl & 0x40: res += cat('Msg-abort')
if txctl & 0x20: res += cat('Msg-lostarb')
if txctl & 0x10: res += cat('Msg-xmt-error')
if txctl & 0x08: res += cat('Msg-xmt-req')
if txctl & 0x04: res += cat('<unknown>')
if txctl & 0x02: res += cat('Msg-prio1')
if txctl & 0x01: res += cat('Msg-prio0')
res += '}'

if interrupt is not None:
res += nxt + '{'
pfx = ''
if interrupt & 0x80: res += cat('Message-error')
if interrupt & 0x40: res += cat('Wake-up')
if interrupt & 0x20: res += cat('Error')
if interrupt & 0x10: res += cat('Xmtbuf2-empty')
if interrupt & 0x08: res += cat('Xmtbuf1-empty')
if interrupt & 0x04: res += cat('Xmtbuf0-empty')
if interrupt & 0x02: res += cat('Rcvbuf1-full')
if interrupt & 0x02: res += cat('Rcvbuf0-full')
res += '}'
nxt = ' '

if error is not None:
res += nxt + '{'
pfx = ''
if error & 0x80: res += cat('Rcvbuf1-overflow')
if error & 0x40: res += cat('Rcvbuf0-overflow')
if error & 0x20: res += cat('Bus-off')
if error & 0x10: res += cat('Xmterror-pasv')
if error & 0x08: res += cat('Rcverror-pasv')
if error & 0x04: res += cat('Xmterror-warn')
if error & 0x02: res += cat('Rcverror-warn')
if error & 0x01: res += cat('Error-warn')
res += '}'

return res


class CanMsgFlag:
RTR = CAN_ERR_FLAG
EFF = CAN_EFF_FLAG
Expand Down Expand Up @@ -43,7 +125,7 @@ def _get_frame(self):
class CAN_1:
ERROR = ERROR
def __init__(self, board: str = "CANBed_RP2040", spi: int = 0, spics: int = 9):
self.can = CAN(SPI(cs=9))
self.can = CAN(SPI(cs=spics))
def begin(self, bitrate: int = CAN_SPEED.CAN_500KBPS, canclock: int = CAN_CLOCK.MCP_16MHZ, mode: str = 'normal'):
ret = self.can.reset()
if ret != ERROR.ERROR_OK:
Expand All @@ -53,6 +135,23 @@ def begin(self, bitrate: int = CAN_SPEED.CAN_500KBPS, canclock: int = CAN_CLOCK.
return ret
ret = self.can.setNormalMode()
return ret
def setLoopback(self):
return self.can.setLoopbackMode()
def clearInterrupts(self):
self.can.clearInterrupts()
def getInterrupts(self):
return self.can.getInterrupts()
def getInterruptMask(self):
return self.can.getInterruptMask()
def getErrorFlags(self):
return self.can.getErrorFlags()
def clearErrorFlags(self, RXERR=False, MERR=False):
# Keyword args for clearing more conditions in future if required, e.g.
# TXBO, passive errors, warnings
if RXERR: self.can.clearRXnOVRFlags()
if MERR: self.can.clearMERR()
def getStatus(self):
return self.can.getStatus()
def init_mask(self, mask, is_ext_id, mask_id):
ret = self.can.setFilterMask(mask + 1, is_ext_id, mask_id)
if ret != ERROR.ERROR_OK:
Expand All @@ -70,9 +169,14 @@ def checkReceive(self):
def recv(self):
error, frame = self.can.readMessage()
msg = CanMsg()
msg._set_frame(frame)
if frame is not None: msg._set_frame(frame)
return error, msg
def recvinto(self,msg):
frame = msg._get_frame()
error = self.can.readMessageInto(frame)
msg._set_frame(frame)
return error
def send(self, msg):
frame = msg._get_frame()
error = self.can.sendMessage(frame)
return error
return error
20 changes: 12 additions & 8 deletions canbus/internal/can/can.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,14 @@ def __init__(self, can_id: int, data: bytes = b"") -> None:
#
# 32 bit CAN ID + EFF/RTR/ERR flags
#
self.can_id = can_id # type: int
self.data = data # type: bytes
self._can_id = can_id # type: int
self._arbitration_id = can_id & CAN_EFF_MASK # type: int
self._data = bytearray(CAN_MAX_DLEN) # type: bytes
if data is not None:
self._dlc = len(data)
self._data[0:len(data)] = data
else:
self._dlc = 0

@property
def can_id(self) -> int:
Expand All @@ -49,17 +55,15 @@ def data(self) -> bytes:

@data.setter
def data(self, data: bytes) -> None:
self._data = b"" # type: bytes
self._dlc = 0 # frame payload length in byte (0 .. CAN_MAX_DLEN)

if not data:
self._dlc = 0 # frame payload length in byte (0 .. CAN_MAX_DLEN)
return

if len(data) > CAN_MAX_DLEN:
raise Exception("The CAN frame data length exceeds the maximum")

self._data = data
self._dlc = len(data)
self._data[0:len(data)] = data[0:len(data)]

@property
def arbitration_id(self) -> int:
Expand All @@ -85,6 +89,6 @@ def __str__(self) -> str:
data = (
"remote request"
if self.is_remote_frame
else " ".join("{:02X}".format(b) for b in self.data)
else " ".join("{:02X}".format(b) for b in self._data[0:self._dlc])
)
return "{: >8X} [{}] {}".format(self.arbitration_id, self.dlc, data)
return "{:08X} [{}] {}".format(self.arbitration_id, self._dlc, data)
52 changes: 41 additions & 11 deletions canbus/internal/can/mcp2515.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,17 +167,16 @@ def readRegister(self, reg: int) -> int:

return ret

def readRegisters(self, reg: int, n: int) -> List[int]:
def readRegisters(self, reg: int, n: int, buf: bytearray = bytearray(CAN_MAX_DLEN)) -> bytearray:
self.SPI.start()
self.SPI.transfer(INSTRUCTION.INSTRUCTION_READ)
self.SPI.transfer(reg)
# MCP2515 has auto-increment of address-pointer
values = []
for i in range(n):
values.append(self.SPI.transfer(read=True))
buf[i] = self.SPI.transfer(read=True)
self.SPI.end()

return values
return buf

def setRegister(self, reg: int, value: int) -> None:
self.SPI.start()
Expand Down Expand Up @@ -392,9 +391,40 @@ def sendMessage_(self, frame: Any) -> int:

return ERROR.ERROR_ALLTXBUSY

def readMessageInto(self, frame: Any, rxbn: int = None) -> int:
if rxbn is None:
return self.readMessageInto_(frame)

rxb = RXB[rxbn]

tbufdata = self.readRegisters(rxb.SIDH, 1 + CAN_IDLEN)

id_ = (tbufdata[MCP_SIDH] << 3) + (tbufdata[MCP_SIDL] >> 5)

if (tbufdata[MCP_SIDL] & TXB_EXIDE_MASK) == TXB_EXIDE_MASK:
id_ = (id_ << 2) + (tbufdata[MCP_SIDL] & 0x03)
id_ = (id_ << 8) + tbufdata[MCP_EID8]
id_ = (id_ << 8) + tbufdata[MCP_EID0]
id_ |= CAN_EFF_FLAG

dlc = tbufdata[MCP_DLC] & DLC_MASK
if dlc > CAN_MAX_DLEN:
return ERROR.ERROR_FAIL

ctrl = self.readRegister(rxb.CTRL)
if ctrl & RXBnCTRL_RTR:
id_ |= CAN_RTR_FLAG

frame.can_id = id_

frame.data = self.readRegisters(rxb.DATA, dlc)[0:dlc]

return ERROR.ERROR_OK

def readMessage(self, rxbn: int = None) -> Tuple[int, Any]:
if rxbn is None:
return self.readMessage_()
frame = CANFrame(0)
return self.readMessageInto_(frame), frame

rxb = RXB[rxbn]

Expand All @@ -416,23 +446,23 @@ def readMessage(self, rxbn: int = None) -> Tuple[int, Any]:
if ctrl & RXBnCTRL_RTR:
id_ |= CAN_RTR_FLAG

frame = CANFrame(can_id=id_)
frame = CANFrame(id_)

frame.data = bytearray(self.readRegisters(rxb.DATA, dlc))
frame.data = copy.copy(self.readRegisters(rxb.DATA, dlc)[0:dlc])

return ERROR.ERROR_OK, frame

def readMessage_(self) -> Tuple[int, Any]:
rc = ERROR.ERROR_NOMSG, None
def readMessageInto_(self, frame: Any) -> int:
rc = ERROR.ERROR_NOMSG

stat = self.getStatus()
if stat & STAT.STAT_RX0IF and self.mcp2515_rx_index == 0:
rc = self.readMessage(RXBn.RXB0)
rc = self.readMessageInto(frame,RXBn.RXB0)
if self.getStatus() & STAT.STAT_RX1IF:
self.mcp2515_rx_index = 1
self.modifyRegister(REGISTER.MCP_CANINTF, RXB[RXBn.RXB0].CANINTFRXnIF, 0)
elif stat & STAT.STAT_RX1IF:
rc = self.readMessage(RXBn.RXB1)
rc = self.readMessageInto(frame,RXBn.RXB1)
self.mcp2515_rx_index = 0
self.modifyRegister(REGISTER.MCP_CANINTF, RXB[RXBn.RXB1].CANINTFRXnIF, 0)

Expand Down
24 changes: 15 additions & 9 deletions canbus/internal/spi/spi.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

class SPI:
def __init__(self, cs: int, baudrate: int = SPI_DEFAULT_BAUDRATE) -> None:
# Somehow the self.init() call invokes the machine.SPI .init() method?!
self._SPICS = Pin(cs, Pin.OUT)
self._SPI = self.init(baudrate=baudrate) # type: Any
self.end()
Expand All @@ -25,23 +26,28 @@ def init(self, baudrate: int) -> Any:

def start(self) -> None:
self._SPICS.value(0)
time.sleep_us(SPI_HOLD_US) # type: ignore
### time.sleep_us(SPI_HOLD_US) # type: ignore # MCP2515 spec = 50 ns not us

def end(self) -> None:
self._SPICS.value(1)
time.sleep_us(SPI_HOLD_US) # type: ignore

def transfer(self, value: int = SPI_DUMMY_INT, read: bool = False) -> Optional[int]:
### time.sleep_us(SPI_HOLD_US) # type: ignore # MCP2515 spec = 50 ns not us

def transfer(self,
value: int = SPI_DUMMY_INT,
read: bool = False,
val: bytes = bytearray(SPI_TRANSFER_LEN),
buf: bytes = bytearray(SPI_TRANSFER_LEN)
) -> Optional[int]:
"""Write int value to SPI and read SPI as int value simultaneously.
This method supports transfer single byte only,
and the system byte order doesn't matter because of that. The input and
output int value are unsigned.
val, buf used for static allocation so no changes to heap
"""
value_as_byte = value.to_bytes(SPI_TRANSFER_LEN, sys.byteorder)
val = value.to_bytes(SPI_TRANSFER_LEN, sys.byteorder)

if read:
output = bytearray(SPI_TRANSFER_LEN)
self._SPI.write_readinto(value_as_byte, output)
return int.from_bytes(output, sys.byteorder)
self._SPI.write(value_as_byte)
self._SPI.write_readinto(val, buf)
return int(buf[0])
self._SPI.write(val)
return None
7 changes: 5 additions & 2 deletions readme.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
# MicroPython CAN Bus Library

MicroPython library for MCP2515, it works for most of the MicroPython boards.
This version of the library is extended to optimize for speed and to use
static memory as much as possible for reliable use in IRQs.

With this library, you can,

- Send a CAN 2.0 frame
- Receive a CAN 2.0 frame
- Receive a CAN 2.0 frame (as a new buffer or into a pre-allocated buffer)
- Get data from OBD-II
- Set the masks and filters, there're 32 masks and filters.
- Set the masks and filters, there're 32 masks and filters
- Decode various error registers into human-readable form.

## Getting Started

Expand Down