Skip to content

Commit

Permalink
Squashed commit of the following:
Browse files Browse the repository at this point in the history
commit fcf0930
Author: Fabio Battaglia <tabaglio@posteo.net>
Date:   Tue Sep 3 07:28:34 2024 +0200

    Better check for CXFER response after a transfer

commit d602734
Author: Fabio Battaglia <tabaglio@posteo.net>
Date:   Mon Sep 2 20:56:34 2024 +0200

    replace some magic numbers with named fields

commit 837ca6b
Author: Fabio Battaglia <tabaglio@posteo.net>
Date:   Mon Sep 2 16:33:58 2024 +0200

    Correctly purge remaining data in serial port, and use correct endianneess for received checksum

commit cd3f467
Author: Fabio Battaglia <tabaglio@posteo.net>
Date:   Mon Sep 2 15:47:59 2024 +0200

    pad the cxfer commands, and transfer blocks of 1024k by default

commit 23f327e
Author: Fabio Battaglia <tabaglio@posteo.net>
Date:   Mon Sep 2 13:26:26 2024 +0200

    Add the correct command code to cxfer commands

commit a096845
Author: Fabio Battaglia <tabaglio@posteo.net>
Date:   Mon Sep 2 13:02:03 2024 +0200

    update board command interface for cxfer_read

commit 460edaf
Author: Fabio Battaglia <tabaglio@posteo.net>
Date:   Mon Sep 2 11:39:35 2024 +0200

    bump version and changelog

commit bfc56ed
Author: Fabio Battaglia <tabaglio@posteo.net>
Date:   Mon Sep 2 11:36:49 2024 +0200

    wire up the call to the cxfer read command

commit 0c67be2
Author: Fabio Battaglia <tabaglio@posteo.net>
Date:   Mon Sep 2 10:55:09 2024 +0200

    Implement code to read data with a cxfer

commit e8966be
Author: Fabio Battaglia <tabaglio@posteo.net>
Date:   Sun Sep 1 19:56:29 2024 +0200

    send command to start the transfer

commit 3e1c9f3
Author: Fabio Battaglia <tabaglio@posteo.net>
Date:   Sun Sep 1 19:51:36 2024 +0200

    Make checksum calculator available to everyone, add option to ignore response, test the grouper

commit d3385a8
Author: Fabio Battaglia <tabaglio@posteo.net>
Date:   Sun Sep 1 18:54:34 2024 +0200

    send address and shift map

commit 6350ded
Author: Fabio Battaglia <tabaglio@posteo.net>
Date:   Sun Sep 1 18:39:14 2024 +0200

    Add new utils file with utility code

commit 418d70d
Author: Fabio Battaglia <tabaglio@posteo.net>
Date:   Sun Sep 1 17:56:31 2024 +0200

    cxfer: send clear command

commit 9cdbac2
Author: Fabio Battaglia <tabaglio@posteo.net>
Date:   Sun Sep 1 17:51:48 2024 +0200

    begin writing cxfer code

commit 3b40abd
Author: Fabio Battaglia <tabaglio@posteo.net>
Date:   Sun Sep 1 16:34:53 2024 +0200

    removed useless code from hardware_board_commands, define empty cxfer_read in m3_board_commands

commit 5406509
Author: Fabio Battaglia <tabaglio@posteo.net>
Date:   Sun Sep 1 16:27:55 2024 +0200

    bump version, begin defining the cxfer read command
  • Loading branch information
hkzlab committed Sep 8, 2024
1 parent fdaf760 commit 1aa2bae
Show file tree
Hide file tree
Showing 10 changed files with 244 additions and 85 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
# Changelog
Changelog for dupicolib

## [0.5.0] - 2024-09-02
### Added
- Support for CXFER read mode

## [0.4.2] - 2024-08-18
### Added
- Added pin '0' toboard pin translation map to indicate an 'NC' pin
Expand Down
18 changes: 17 additions & 1 deletion dupicolib/board_commands_interface.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""This module is an abstract class to set the shape for classes providing higher-level interface to the boards"""

from typing import Dict
from typing import Callable, Dict
from abc import ABC

import serial
Expand Down Expand Up @@ -99,6 +99,22 @@ def detect_osc_pins(reads: int, ser: serial.Serial | None = None) -> int | None:
int | None: A bitmask with bits set to 1 for pins that were detected as flipping
"""
raise NotImplementedError()

@classmethod
def cxfer_read(cls, address_pins: list[int], data_pins: list[int], hi_pins: list[int], update_callback: Callable[[int], None] | None, ser: serial.Serial | None = None) -> bytes | None:
"""Uses the "Clever Transfer" mode on the dupico to read the content of an IC.
Args:
address_pins (list[int]): List of the pins composing the address, in order, starting from A0, and already mapped on the dupico socket
data_pins (list[int]): List of the pins composing the data, in order, starting from D0, and already mapped on the dupico socket
hi_pins (list[int]): List of the pins that must be always set to a high logic level during the transfer.
update_callback (Callable[[int], None] | None): A callback that will receive periodic updates of bytes read.
ser (serial.Serial | None, optional): Serial port on which to send the commands. Defaults to None.
Returns:
bytes | None: A bytes object containing the data read from the IC
"""
raise NotImplementedError()

@classmethod
def map_value_to_pins(cls, pins: list[int], value: int) -> int:
Expand Down
63 changes: 62 additions & 1 deletion dupicolib/board_interfaces/m3_board_commands.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
"""This module contains higher-level code for board interfacing"""

from typing import Dict, final
from typing import Callable, Dict, final
import struct
from enum import Enum

import serial

from dupicolib.board_interfaces.special_modes.cxfer import CXFERTransfer
from dupicolib.board_utilities import BoardUtilities
from dupicolib.hardware_board_commands import HardwareBoardCommands
import dupicolib.utils as DPUtils

_CXFER_SHIFT_BLOCK_SIZE: int = 16

class CommandCode(Enum):
WRITE = 0
Expand All @@ -16,6 +20,8 @@ class CommandCode(Enum):
POWER = 3
TEST = 5
OSC_DET = 8
CXFER = 9


@final
class M3BoardCommands(HardwareBoardCommands):
Expand Down Expand Up @@ -126,6 +132,61 @@ def detect_osc_pins(reads: int, ser: serial.Serial) -> int | None:
return struct.unpack('<Q', res)[0]
else:
return None

@classmethod
def cxfer_read(cls, address_pins: list[int], data_pins: list[int], hi_pins: list[int], update_callback: Callable[[int], None] | None, ser: serial.Serial) -> bytes | None:
address_shift_map: list[int] = []
data_shift_map: list[int] = []
hi_pin_mask: int
data_pin_mask: int

for pin in address_pins:
address_shift_map.append(cls._PIN_NUMBER_TO_INDEX_MAP[pin])

for pin in data_pins:
data_shift_map.append(cls._PIN_NUMBER_TO_INDEX_MAP[pin])

hi_pin_mask = cls.map_value_to_pins(hi_pins, 0xFFFFFFFFFFFFFFFF)
data_pin_mask = cls.map_value_to_pins(data_pins, 0xFFFFFFFFFFFFFFFF)

# Clear the configuration for CXFER on the board
BoardUtilities.send_binary_command(ser, bytes([CommandCode.CXFER.value, CXFERTransfer.CXFERSubCommand.CLEAR.value, *([0] * 16)]), 1)

# Set the address shift map
for idx, addr_chunk in enumerate(DPUtils.iter_grouper(address_shift_map, _CXFER_SHIFT_BLOCK_SIZE, 0)):
BoardUtilities.send_binary_command(ser, bytes([CommandCode.CXFER.value, CXFERTransfer.CXFERSubCommand.SET_ADDR_MAP_0.value + idx, *struct.pack(f'{len(addr_chunk)}B', *addr_chunk)]), 1)

# Set the data shift map
for idx, data_chunk in enumerate(DPUtils.iter_grouper(data_shift_map, _CXFER_SHIFT_BLOCK_SIZE, 0)):
BoardUtilities.send_binary_command(ser, bytes([CommandCode.CXFER.value, CXFERTransfer.CXFERSubCommand.SET_DATA_MAP_0.value + idx, *struct.pack(f'{len(data_chunk)}B', *data_chunk)]), 1)

# Set the hi-out mask
BoardUtilities.send_binary_command(ser, bytes([CommandCode.CXFER.value, CXFERTransfer.CXFERSubCommand.SET_HI_OUT_MASK.value, *struct.pack('<Q', hi_pin_mask), *([0] * 8)]), 1)

# Set the data mask
BoardUtilities.send_binary_command(ser, bytes([CommandCode.CXFER.value, CXFERTransfer.CXFERSubCommand.SET_DATA_MASK.value, *struct.pack('<Q', data_pin_mask), *([0] * 8)]), 1)

# Send address width
BoardUtilities.send_binary_command(ser, bytes([CommandCode.CXFER.value, CXFERTransfer.CXFERSubCommand.SET_ADDR_WIDTH.value, *struct.pack(f'B', len(address_pins)), *([0] * 15)]), 1)

# Send data width
BoardUtilities.send_binary_command(ser, bytes([CommandCode.CXFER.value, CXFERTransfer.CXFERSubCommand.SET_DATA_WIDTH.value, *struct.pack(f'B', len(data_pins)), *([0] * 15)]), 1)

data: bytes | None = CXFERTransfer.read(CommandCode.CXFER.value, ser, update_callback)

# Clear the buffer from the last response code from the dupico, and the checksum (command + parameter + checksum = 3 bytes)
resp_data: bytes = ser.read(3)

if (resp_size := len(resp_data)) != 3:
raise IOError(f'Response from transfer command is too short: {resp_size}!')
else:
if resp_data[0] != CommandCode.CXFER.value | BoardUtilities.BINARY_COMMAND_RESPONSE_FLAG:
raise IOError(f'Read wrong response type after execution of CXFER: {resp_data[0]:0{2}X}')
elif BoardUtilities.command_checksum_calculator(resp_data):
raise IOError('Wrong checksum for CXFER read command.')

return data


@classmethod
def map_value_to_pins(cls, pins: list[int], value: int) -> int:
Expand Down
96 changes: 96 additions & 0 deletions dupicolib/board_interfaces/special_modes/cxfer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
"""Code to support the CXFER transfer modes"""

from enum import Enum
import logging
import struct
from typing import Callable, final

import serial

from dupicolib.board_utilities import BoardUtilities

_LOGGER = logging.getLogger(__name__)

@final
class CXFERTransfer:
_XMIT_BLOCK_SIZE: int = 1024
_XFER_RESPONSE_SIZE: int = 4
_XFER_CHECKSUM_SIZE: int = 2

class CXFERSubCommand(Enum):
SET_ADDR_MAP_0 = 0x00
SET_ADDR_MAP_1 = 0x01
SET_ADDR_MAP_2 = 0x02
SET_ADDR_MAP_3 = 0x03
SET_DATA_MAP_0 = 0x10
SET_DATA_MAP_1 = 0x11
SET_DATA_MAP_2 = 0x12
SET_DATA_MAP_3 = 0x13
SET_HI_OUT_MASK = 0xE0
SET_DATA_MASK = 0xE1
SET_ADDR_WIDTH = 0xE2
SET_DATA_WIDTH = 0xE3
CLEAR = 0xF0
EXECUTE_READ = 0xFF

class CXFERResponse(Enum):
XFER_PKT_START = 0xDEADBEEF
XFER_PKT_FAIL = 0xBAADF00D
XFER_DONE = 0xC00FFFEE

@classmethod
def read(cls, command_code: int, ser: serial.Serial, update_callback: Callable[[int], None] | None = None) -> bytes | None:
file_data: bytearray = bytearray()
data_block: bytearray
resp: int

# Start the transfer!
BoardUtilities.send_binary_command(ser, bytes([command_code, cls.CXFERSubCommand.EXECUTE_READ.value, *([0] * 16)]), 0)

while True:
data: bytes = ser.read(cls._XFER_RESPONSE_SIZE)

if (data_len := len(data)) != cls._XFER_RESPONSE_SIZE:
raise IOError(f'Received {data_len} data for starting block!')

resp, = struct.unpack('>I', data)

if resp == cls.CXFERResponse.XFER_PKT_START.value:
_LOGGER.debug(f'Received a XFER_PKT_START packet, current file size {len(file_data)}')
elif resp == cls.CXFERResponse.XFER_DONE.value:
_LOGGER.info(f'Received a XFER_DONE packet, current file size {len(file_data)}')
break
else:
raise IOError(f'Received {resp:0{4}X} while expecting a start block.')

data_block = bytearray()
rem_data: int = cls._XMIT_BLOCK_SIZE

while rem_data > 0:
data = ser.read(16)

if (data_len := len(data)) <= 0:
raise IOError('Timed out while waiting to read data...')

rem_data = rem_data - data_len
data_block.extend(data)

calc_checksum: int = BoardUtilities.cxfer_checksum_calculator(bytes(data_block))
data = ser.read(cls._XFER_CHECKSUM_SIZE)
if (data_len := len(data)) != cls._XFER_CHECKSUM_SIZE:
raise IOError(f'Received {data_len} data for checksum!')
resp, = struct.unpack('<H', data)

if resp != calc_checksum:
raise IOError(f'Calculated checksum is {calc_checksum:0{4}X}, received is {resp:0{4}X}')

# Append the block data to the file buffer
file_data.extend(data_block)

# Once verified, send the checksum back
ser.write(data)

if update_callback:
update_callback(len(file_data))

return bytes(file_data)
25 changes: 17 additions & 8 deletions dupicolib/board_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ class BoardUtilities:
"""
This class contains basic utilities for board access.
"""

BINARY_COMMAND_RESPONSE_FLAG: int = 0x80

_MAX_RESPONSE_SIZE: int = 32
_ENCODING: str = 'ASCII'

_BINARY_COMMAND_RESPONSE_FLAG: int = 0x80

_LOGGER = logging.getLogger(__name__)

@classmethod
Expand Down Expand Up @@ -73,12 +73,17 @@ def initialize_connection(cls, ser: serial.Serial, retries: int = 2) -> bool:
return False

@classmethod
def send_binary_command(cls, ser: serial.Serial, cmd: bytes, resp_data_len: int) -> bytes | None:
chks: int = cls._checksum_calculator(cmd)
def send_binary_command(cls, ser: serial.Serial, cmd: bytes, resp_data_len: int = 0) -> bytes | None:
chks: int = cls.command_checksum_calculator(cmd)

ser.write(bytes([*cmd, chks]))

expected_resp = cmd[0] | cls._BINARY_COMMAND_RESPONSE_FLAG
# In this case, we just send the command and ignore any response, that we expect to be handled by the caller
if resp_data_len <= 0:
cls._LOGGER.debug(f'Sending command {cmd}, ignoring any response.')
return None

expected_resp = cmd[0] | cls.BINARY_COMMAND_RESPONSE_FLAG
resp_code: bytes = ser.read(1)
if (len(resp_code) != 1 or resp_code[0] != expected_resp):
cls._LOGGER.error(f'Got response {resp_code[0]:0{2}X} while expected was {expected_resp:0{2}X}')
Expand All @@ -91,13 +96,17 @@ def send_binary_command(cls, ser: serial.Serial, cmd: bytes, resp_data_len: int)
ser.reset_input_buffer()
return None

if cls._checksum_calculator(resp_code + resp_data) != 0:
if cls.command_checksum_calculator(resp_code + resp_data) != 0:
cls._LOGGER.error(f'Command has wrong checksum')
ser.reset_input_buffer()
return None

return resp_data[:-1] # Avoid returning the checksum

@classmethod
def _checksum_calculator(cls, data: bytes) -> int:
@staticmethod
def command_checksum_calculator(data: bytes) -> int:
return reduce(operator.sub, bytes([0, *data])) & 0xFF

@staticmethod
def cxfer_checksum_calculator(data: bytes) -> int:
return reduce(operator.add, bytes([0, *data])) & 0xFFFF
71 changes: 0 additions & 71 deletions dupicolib/hardware_board_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,73 +49,6 @@ def get_version(ser: serial.Serial) -> str | None:
else:
return None

@staticmethod
def test_board(ser: serial.Serial) -> bool | None:
"""Perform a minimal self-test of the board
Args:
ser (serial.Serial): serial port on which to send the command
Returns:
bool | None: True if test passed correctly, False otherwise
"""
raise NotImplementedError()

@staticmethod
def set_power(state: bool, ser: serial.Serial) -> bool | None:
"""Enable or disable the power on the socket VCC
Args:
state (bool): True if we wish power applied, False otherwise
ser (serial.Serial): serial port on which to send the command
Returns:
bool | None: True if power was applied, False otherwise, None in case we did not read the response correctly
"""
raise NotImplementedError()

@staticmethod
def write_pins(pins: int, ser: serial.Serial) -> int | None:
"""Toggle the specified pins and read their status back
Args:
ser (serial.Serial): serial port on which to send the command
pins (int): value that the pins will be set to. A bit set to '1' means that the pin will be pulled high
Returns:
int | None: The value we read back from the pins, or None in case of parsing issues
"""
raise NotImplementedError()

@staticmethod
def read_pins(ser: serial.Serial) -> int | None:
"""Read the value of the pins
Args:
ser (serial.Serial): serial port on which to send the command
Returns:
int | None: The value we read back from the pins, or None in case of parsing issues
"""
raise NotImplementedError()

@staticmethod
def detect_osc_pins(reads: int, ser: serial.Serial) -> int | None:
"""Repeat reads a number of times and reports which pins changed their state in at least one of the reads
Args:
tries (int): Number of reads to perform
ser (serial.Serial | None, optional): serial port on which to send the command. Defaults to None.
Returns:
int | None: A bitmask with bits set to 1 for pins that were detected as flipping
"""
raise NotImplementedError()

@classmethod
def map_value_to_pins(cls, pins: list[int], value: int) -> int:
raise NotImplementedError()

@staticmethod
def _map_value_to_pins(map: Dict[int, int], pins: list[int], value: int) -> int:
"""This method takes a number to set on selected pins and uses a list of said pins to
Expand All @@ -138,10 +71,6 @@ def _map_value_to_pins(map: Dict[int, int], pins: list[int], value: int) -> int:
ret_val = ret_val | (1 << pin_pos)

return ret_val

@classmethod
def map_pins_to_value(cls, pins: list[int], value: int) -> int:
raise NotImplementedError()

@staticmethod
def _map_pins_to_value(map: Dict[int, int], pins: list[int], value: int) -> int:
Expand Down
16 changes: 16 additions & 0 deletions dupicolib/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""Miscellaneous utilities used by the library"""

from itertools import zip_longest
from collections.abc import Iterable
from typing import Iterator, Tuple, TypeVar

T = TypeVar('T')


# From https://stackoverflow.com/questions/434287/how-to-iterate-over-a-list-in-chunks
# See https://docs.python.org/3/library/itertools.html#itertools.zip_longest
def iter_grouper(iterable: Iterable[T], group_size: int, fillvalue: T | None = None) -> Iterator[Tuple[T]]:
# We are feeding multiple copies of the same iterator to zip_longest, so it will consume from the same
# source when zipping values, resulting in grouping the values
args = [iter(iterable)] * group_size
return zip_longest(*args, fillvalue=fillvalue)
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "dupicolib"
version = "0.4.2"
version = "0.5.0"
description = "Library to interface with the dupico hardware"
authors = [
{ name = "Fabio Battaglia", email = "hkzlabnet@gmail.com" }
Expand Down
Loading

0 comments on commit 1aa2bae

Please sign in to comment.