Skip to content

Commit

Permalink
crc9 fixed, transmission generator 1/2, more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
smarek committed Jul 13, 2022
1 parent 7afa1d8 commit fdb3fa7
Show file tree
Hide file tree
Showing 24 changed files with 455 additions and 75 deletions.
2 changes: 1 addition & 1 deletion okdmr/dmrlib/etsi/crc/crc9.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def calculate_from_parts(
):
source_data: bitarray = bytes_to_bits(data, endian="big")

if crc32 is not None:
if crc32 is not None and crc32 != 0:
if isinstance(crc32, int):
crc32 = crc32.to_bytes(4, byteorder="big")
assert len(crc32) == 4, "32-bit CRC must be exactly 4-bytes long"
Expand Down
4 changes: 3 additions & 1 deletion okdmr/dmrlib/etsi/layer2/burst.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ class Burst:
"""

def __init__(
self, full_bits: bitarray, burst_type: BurstTypes = BurstTypes.Undefined
self,
full_bits: bitarray = bitarray(264),
burst_type: BurstTypes = BurstTypes.Undefined,
):
assert (
len(full_bits) == 264
Expand Down
42 changes: 42 additions & 0 deletions okdmr/dmrlib/etsi/layer2/elements/fragment_sequence_number.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from bitarray import bitarray
from bitarray.util import ba2int, int2ba

from okdmr.dmrlib.utils.bits_interface import BitsInterface


class FragmentSequenceNumber(BitsInterface):
SINGLE_UNCONFIRMED_FRAGMENT_VALUE: int = 0b0000
SINGLE_CONFIRMED_FRAGMENT_VALUE: int = 0b1000

def __init__(self, value: int = 0):
assert (
0b0000 <= value <= 0b1111
), f"FSN value out of range 0b0000-0b1111 got {bin(value)}"
self.value: int = value

def is_last(self) -> bool:
return (
# single (last) unconfirmed fragment
self.value == FragmentSequenceNumber.SINGLE_UNCONFIRMED_FRAGMENT_VALUE
# 0b1000 is single confirmed, bigger values are last fragment
or self.value >= FragmentSequenceNumber.SINGLE_CONFIRMED_FRAGMENT_VALUE
)

def __repr__(self) -> str:
desc: str = ""
if self.value == FragmentSequenceNumber.SINGLE_UNCONFIRMED_FRAGMENT_VALUE:
desc = "Unconfirmed data single fragment"
elif self.value == FragmentSequenceNumber.SINGLE_CONFIRMED_FRAGMENT_VALUE:
desc = "Confirmed data single fragment"
else:
num: int = self.value & 0b111
is_last: str = "Last" if self.is_last() else "Subsequent"
desc = f"{is_last} confirmed data fragment, number {num}"
return f"[FSN: {desc}]"

@staticmethod
def from_bits(bits: bitarray) -> "FragmentSequenceNumber":
return FragmentSequenceNumber(value=ba2int(bits[0:4]))

def as_bits(self) -> bitarray:
return int2ba(self.value, length=4)
6 changes: 3 additions & 3 deletions okdmr/dmrlib/etsi/layer2/elements/voice_bursts.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import enum
from typing import Any


@enum.unique
Expand All @@ -17,9 +18,8 @@ class VoiceBursts(enum.Enum):
VoiceBurstF = 105

@classmethod
def _missing_(cls, value: int):
def _missing_(cls, value: int) -> Any:
assert (
100 <= value <= 105
) or value == 1, f"Unknown VoiceBurst value, got {value}"

return VoiceBursts.Unknown
# no return value here, since assert checks all available values
8 changes: 4 additions & 4 deletions okdmr/dmrlib/etsi/layer2/pdu/csbk.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ class CSBK(BitsInterface):

def __init__(
self,
last_block: Union[bool, int],
protect_flag: Union[bool, int],
csbko: CsbkOpcodes,
manufacturers_feature_set_id: FeatureSetIDs,
last_block: Union[bool, int] = True,
protect_flag: Union[bool, int] = False,
manufacturers_feature_set_id: FeatureSetIDs = FeatureSetIDs.StandardizedFID,
# default value for crc indicates, it must be recalculated on construct
crc: int = 0,
# bs outbound activation fields
Expand Down Expand Up @@ -310,7 +310,7 @@ def from_bits(bits: bitarray) -> "CSBK":
raw_data=bits[16:80],
)

raise ValueError(
raise NotImplementedError(
f"Not-implemented CSBKO {csbko} PDU {bits_to_bytes(bits).hex()}"
)

Expand Down
31 changes: 20 additions & 11 deletions okdmr/dmrlib/etsi/layer2/pdu/data_header.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
from okdmr.dmrlib.etsi.layer2.elements.csbk_opcodes import CsbkOpcodes
from okdmr.dmrlib.etsi.layer2.elements.data_packet_formats import DataPacketFormats
from okdmr.dmrlib.etsi.layer2.elements.defined_data_formats import DefinedDataFormats
from okdmr.dmrlib.etsi.layer2.elements.fragment_sequence_number import (
FragmentSequenceNumber,
)
from okdmr.dmrlib.etsi.layer2.elements.full_message_flag import FullMessageFlag
from okdmr.dmrlib.etsi.layer2.elements.resynchronize_flag import ResynchronizeFlag
from okdmr.dmrlib.etsi.layer2.elements.sap_identifier import SAPIdentifier
Expand All @@ -26,7 +29,7 @@ class DataHeader(BitsInterface):
def __init__(
self,
dpf: DataPacketFormats,
crc: Optional[bitarray],
crc: Optional[bitarray] = None,
# some common fields
is_group: Union[int, bool] = False,
is_response_requested: Union[int, bool] = False,
Expand All @@ -38,7 +41,7 @@ def __init__(
blocks_to_follow: int = 0,
resynchronize_flag: Optional[ResynchronizeFlag] = None,
send_sequence_number: int = 0,
fragment_sequence_number: int = 0,
fragment_sequence_number: Union[FragmentSequenceNumber, int] = 0,
# Confirmed Response packet Header (C_RHEAD) PDU
response_class: int = 0,
response_type: int = 0,
Expand All @@ -57,7 +60,7 @@ def __init__(
supplementary_flag: Optional[SupplementaryFlag] = None,
):
self.data_packet_format: DataPacketFormats = dpf
self.crc: bitarray = crc or bitarray()
self.crc: bitarray = crc or bitarray([0] * 16)
self.is_group: bool = is_group in (True, 1)
self.is_response_requested: bool = is_response_requested in (True, 1)
self.pad_octet_count: int = pad_octet_count
Expand All @@ -68,7 +71,11 @@ def __init__(
self.blocks_to_follow: int = blocks_to_follow
self.resynchronize_flag: Optional[ResynchronizeFlag] = resynchronize_flag
self.send_sequence_number: int = send_sequence_number
self.fragment_sequence_number: int = fragment_sequence_number
self.fragment_sequence_number: FragmentSequenceNumber = (
FragmentSequenceNumber(fragment_sequence_number)
if isinstance(fragment_sequence_number, int)
else fragment_sequence_number
)
# Confirmed Response packet Header (C_RHEAD) PDU
self.response_class: int = response_class
self.response_type: int = response_type
Expand All @@ -86,7 +93,7 @@ def __init__(
self.udt_opcode: Optional[CsbkOpcodes] = udt_opcode
self.supplementary_flag: Optional[SupplementaryFlag] = supplementary_flag

if len(self.crc) < 16 or ba2int(self.crc) <= 0:
if len(self.crc) != 16 or ba2int(self.crc) <= 0:
self.crc_ok: bool = True
self.crc = int2ba(
CRC16.calculate(self.as_bits()[:-16].tobytes(), CrcMasks.DataHeader),
Expand Down Expand Up @@ -117,7 +124,7 @@ def __repr__(self):
+ f"[PAD OCTETS: {self.pad_octet_count}] "
+ f"[SOURCE: {self.llid_source}] [DESTINATION: {self.llid_destination}] [{self.full_message_flag}] "
+ f"[BTF: {self.blocks_to_follow}] [{self.resynchronize_flag}] [N(S): {self.send_sequence_number}] "
+ f"[FSN: {self.fragment_sequence_number}]"
+ f"{repr(self.fragment_sequence_number)}"
)
elif self.data_packet_format == DataPacketFormats.DataPacketUnconfirmed:
descr += (
Expand All @@ -128,7 +135,7 @@ def __repr__(self):
+ f"[PAD OCTETS: {self.pad_octet_count}] "
+ f"[SOURCE: {self.llid_source}] [DESTINATION: {self.llid_destination}] [{self.full_message_flag}] "
+ f"[BTF: {self.blocks_to_follow}] "
+ f"[FSN: {self.fragment_sequence_number}]"
+ f"{repr(self.fragment_sequence_number)}"
)
elif self.data_packet_format == DataPacketFormats.ResponsePacket:
descr += (
Expand Down Expand Up @@ -175,7 +182,7 @@ def as_bits(self) -> bitarray:
+ int2ba(self.blocks_to_follow, length=7)
+ bitarray([self.resynchronize_flag.value])
+ int2ba(self.send_sequence_number, length=3)
+ int2ba(self.fragment_sequence_number, length=4)
+ self.fragment_sequence_number.as_bits()
+ self.crc
)
elif self.data_packet_format == DataPacketFormats.DataPacketUnconfirmed:
Expand All @@ -190,7 +197,7 @@ def as_bits(self) -> bitarray:
+ bitarray([self.full_message_flag.value])
+ int2ba(self.blocks_to_follow, length=7)
+ bitarray([0] * 4)
+ int2ba(self.fragment_sequence_number, length=4)
+ self.fragment_sequence_number.as_bits()
+ self.crc
)
elif self.data_packet_format == DataPacketFormats.ResponsePacket:
Expand Down Expand Up @@ -244,7 +251,9 @@ def as_bits(self) -> bitarray:
+ self.udt_opcode.as_bits()
+ self.crc
)
raise ValueError(f"as_bits not implemented for {self.data_packet_format}")
raise NotImplementedError(
f"as_bits not implemented for {self.data_packet_format}"
)

@staticmethod
def from_bits(bits: bitarray) -> "DataHeader":
Expand Down Expand Up @@ -326,4 +335,4 @@ def from_bits(bits: bitarray) -> "DataHeader":
udt_opcode=CsbkOpcodes.from_bits(bits[74:80]),
)
else:
raise KeyError(f"from_bits not implemented for {dpf}")
raise NotImplementedError(f"from_bits not implemented for {dpf}")
7 changes: 4 additions & 3 deletions okdmr/dmrlib/etsi/layer2/pdu/embedded_signalling.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ def __init__(
assert (
0b0 <= colour_code <= 0b1111
), f"CC (Colour Code) value must be in range 0-15, got {colour_code}"
assert (
0b00 <= link_control_start_stop <= 0b11
), f"LCSS value must be in range 0-3, got {link_control_start_stop}"
if isinstance(link_control_start_stop, int):
assert (
0b00 <= link_control_start_stop <= 0b11
), f"LCSS value must be in range 0-3, got {link_control_start_stop}"
assert (
0b0 <= preemption_and_power_control_indicator <= 0b1
), f"PI must be in range 0-1, got {preemption_and_power_control_indicator}"
Expand Down
12 changes: 7 additions & 5 deletions okdmr/dmrlib/etsi/layer2/pdu/rate12_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,11 @@ def __init__(
crc32 if isinstance(crc32, int) else int.from_bytes(crc32, byteorder="big")
)

crc9 = crc9 if isinstance(crc9, int) else ba2int(crc9)
self.crc9: int = self.calculate_crc9() if self.is_confirmed() else crc9
self.crc9_ok: bool = self.crc9 == crc9 if crc9 > 0 else False
self.crc9: int = crc9 if isinstance(crc9, int) else ba2int(crc9[::-1])
calculated_crc9 = self.calculate_crc9()
if self.crc9 <= 0:
self.crc9 = calculated_crc9
self.crc9_ok: bool = self.crc9 == calculated_crc9

@staticmethod
def validate_packet_type(packet_type: Rate12DataTypes, data_length: int) -> bool:
Expand Down Expand Up @@ -150,7 +152,7 @@ def as_bits(self):
# R_1_2_DATA PDU content for confirmed data
return (
int2ba(self.dbsn, length=7)
+ int2ba(self.crc9, length=9)
+ int2ba(self.crc9, length=9, endian="little")
+ bytes_to_bits(self.data)
)
elif self.packet_type == Rate12DataTypes.UnconfirmedLastBlock:
Expand All @@ -160,7 +162,7 @@ def as_bits(self):
# R_1_2_LDATA PDU content for confirmed data
return (
int2ba(self.dbsn, length=7)
+ int2ba(self.crc9, length=9)
+ int2ba(self.crc9, length=9, endian="little")
+ bytes_to_bits(self.data)
+ int2ba(self.crc32, length=32)
)
12 changes: 7 additions & 5 deletions okdmr/dmrlib/etsi/layer2/pdu/rate1_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,11 @@ def __init__(
crc32 if isinstance(crc32, int) else int.from_bytes(crc32, byteorder="big")
)

self.crc9: int = self.calculate_crc9()
crc9 = crc9 if isinstance(crc9, int) else ba2int(crc9)
self.crc9_ok: bool = self.crc9 == crc9 if crc9 > 0 else False
self.crc9: int = crc9 if isinstance(crc9, int) else ba2int(crc9[::-1])
calculated_crc9 = self.calculate_crc9()
if self.crc9 <= 0:
self.crc9 = calculated_crc9
self.crc9_ok: bool = self.crc9 == calculated_crc9

@staticmethod
def validate_packet_type(packet_type: Rate1DataTypes, data_length: int) -> bool:
Expand Down Expand Up @@ -155,7 +157,7 @@ def as_bits(self):
# R_1_DATA PDU content for confirmed data
return (
int2ba(self.dbsn, length=7)
+ int2ba(self.crc9, length=9)
+ int2ba(self.crc9, length=9, endian="little")
+ bytes_to_bits(self.data)
)
elif self.packet_type == Rate1DataTypes.UnconfirmedLastBlock:
Expand All @@ -165,7 +167,7 @@ def as_bits(self):
# R_1_LDATA PDU content for confirmed data
return (
int2ba(self.dbsn, length=7)
+ int2ba(self.calculate_crc9(), length=9)
+ int2ba(self.crc9, length=9, endian="little")
+ bytes_to_bits(self.data)
+ int2ba(self.crc32, length=32)
)
10 changes: 7 additions & 3 deletions okdmr/dmrlib/etsi/layer2/pdu/rate34_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,11 @@ def __init__(
crc32 if isinstance(crc32, int) else int.from_bytes(crc32, byteorder="big")
)

self.crc9: int = self.calculate_crc9()
crc9 = crc9 if isinstance(crc9, int) else ba2int(crc9)
self.crc9_ok: bool = self.crc9 == crc9 if crc9 > 0 else False
self.crc9: int = crc9 if isinstance(crc9, int) else ba2int(crc9[::-1])
calculated_crc9 = self.calculate_crc9()
if self.crc9 <= 0:
self.crc9 = calculated_crc9
self.crc9_ok: bool = self.crc9 == calculated_crc9

def calculate_crc9(self) -> int:
return CRC9.calculate_from_parts(
Expand All @@ -72,6 +74,7 @@ def __repr__(self) -> str:
elif self.packet_type == Rate34DataTypes.Confirmed:
return (
f"[RATE 3/4 DATA CONFIRMED] [DATA(16) {self.data.hex()}]"
+ f" [DBSN: {self.dbsn}]"
+ f" [CRC9: {self.crc9}]"
+ (" [CRC9 INVALID]" if not self.crc9_ok else "")
)
Expand All @@ -83,6 +86,7 @@ def __repr__(self) -> str:
elif self.packet_type == Rate34DataTypes.ConfirmedLastBlock:
return (
f"[RATE 3/4 DATA CONFIRMED - LAST BLOCK] [DATA(12) {self.data.hex()}]"
+ f" [DBSN: {self.dbsn}]"
+ f" [CRC9: {self.crc9}]"
+ (" [CRC9 INVALID]" if not self.crc9_ok else "")
+ f" [CRC32 int({self.crc32}) hex({self.crc32.to_bytes(4, byteorder='big').hex()})]"
Expand Down
3 changes: 2 additions & 1 deletion okdmr/dmrlib/motorola/arrp.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ class ARRP(MBXMLDocument):
def get_configuration(
doc_type: MBXMLDocumentIdentifier,
) -> Dict[MBXMLTokenType, Dict[int, MBXMLToken]]:
pass
# TODO implement
return {}
Loading

0 comments on commit fdb3fa7

Please sign in to comment.