Skip to content

Commit

Permalink
repeater/storage enhancements, more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
smarek committed Aug 28, 2023
1 parent ba16a9a commit c85c024
Show file tree
Hide file tree
Showing 10 changed files with 97 additions and 16 deletions.
3 changes: 3 additions & 0 deletions okdmr/dmrlib/protocols/hytera/hstrp_datagram_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ def hstrp_send_heartbeat(self, addr: Tuple[Union[str, Any], int]) -> HSTRP:
return hb

def connection_made(self, transport: BaseTransport) -> None:
assert isinstance(
transport, BaseTransport
), f"transport is of unexpected type {type(transport)}"
if self.transport and not self.transport.is_closing():
self.log_warning(
f"HSTRP transport being replaced, old not yet disconnected"
Expand Down
5 changes: 5 additions & 0 deletions okdmr/dmrlib/protocols/hytera/p2p_datagram_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,3 +178,8 @@ def datagram_received(self, data: bytes, address: ADDRESS_TYPE) -> None:
"Idle packet received, %d bytes from %s" % (len(data), address)
)
self.log_debug(data.hex())

def disconnect(self) -> None:
if self.transport and not self.transport.is_closing():
# reset connection state
self.transport.sendto(bytes([0x00]))
5 changes: 3 additions & 2 deletions okdmr/dmrlib/storage/repeater.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def __init__(
serial: str = "",
address_in: ADDRESS_TYPE = ADDRESS_EMPTY,
address_out: ADDRESS_TYPE = ADDRESS_EMPTY,
address_nat: ADDRESS_TYPE = ADDRESS_EMPTY,
snmp_enabled: bool = True,
nat_enabled: bool = False,
logger: Logger = None,
Expand All @@ -36,7 +37,7 @@ def __init__(
""" Address(IP+Port) from which the data come """
self.address_out: ADDRESS_TYPE = address_out
""" Address(IP+Port) for sending data to repeater """
self.address_nat: ADDRESS_TYPE = ADDRESS_EMPTY
self.address_nat: ADDRESS_TYPE = address_nat
""" Address(IP+Port) to which Repeater is sending data (NAT external IP + Forwarded Port) """
self.snmp_enabled: bool = snmp_enabled
self.nat_enabled: bool = nat_enabled
Expand Down Expand Up @@ -122,7 +123,7 @@ def read_snmp_values(
return {}

snmp_data = asyncio.run(
SNMP().walk_ip(ip=self.address_out[0], snmp_community=snmp_community)
SNMP().walk_ip(ip=self.address_in[0], snmp_community=snmp_community)
)
if patch_self:
print(snmp_data)
Expand Down
35 changes: 29 additions & 6 deletions okdmr/dmrlib/storage/repeater_storage.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from logging import Logger
from typing import Optional, Dict
from typing import Optional, Dict, List
from uuid import UUID

from okdmr.dmrlib.storage import ADDRESS_TYPE, ADDRESS_EMPTY
Expand All @@ -26,6 +26,8 @@ def match_incoming(

if not found and auto_create:
found = self.create_repeater(dmr_id=None, address_in=address)
# store the newly created repeater
self.__repeaters[found.id] = found

return self.save(rpt=found, patch=patch)

Expand Down Expand Up @@ -66,6 +68,18 @@ def match_attr(self, attr_name: str, match_value: any) -> Optional[Repeater]:
)
return found

def match_ip_incoming(self, ip: str) -> Optional[Repeater]:
found: Optional[Repeater] = None
for repeater in self.__repeaters.values():
if repeater.address_in[0] == ip:
if not found:
found = repeater
else:
self.__logger.critical(
f"match_ip_incoming found duplicate for IP:{ip}"
)
return found

def match_uuid(self, uuid: UUID) -> Optional[Repeater]:
"""
Args:
Expand All @@ -74,13 +88,17 @@ def match_uuid(self, uuid: UUID) -> Optional[Repeater]:
Returns:
Repeater or None if matching fails
"""
return self.match_attr("id", uuid)
found = self.match_attr("id", uuid)
if not found:
raise SystemError(f"match_uuid failed for {uuid}")
return found

def create_repeater(
self,
dmr_id: int = None,
address_in: ADDRESS_TYPE = ADDRESS_EMPTY,
address_out: ADDRESS_TYPE = ADDRESS_EMPTY,
address_nat: ADDRESS_TYPE = ADDRESS_EMPTY,
) -> Repeater:
"""
Override this method if you need to extend Repeater object, you can create compatible instance instead here
Expand All @@ -91,10 +109,15 @@ def create_repeater(
address_out:
"""
# create object, generating UUID
rpt = Repeater(address_in=address_in, address_out=address_out, dmr_id=dmr_id)
# save to storage
self.__repeaters[rpt.id] = rpt
return rpt
return Repeater(
address_in=address_in,
address_out=address_out,
address_nat=address_nat,
dmr_id=dmr_id,
)

def __len__(self):
return len(self.__repeaters)

def all(self) -> List[Repeater]:
return list(self.__repeaters.values())
4 changes: 2 additions & 2 deletions okdmr/tests/dmrlib/hytera/test_snmp.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ def test_snmp():
message="Experimental SNMPv1 support", category=UserWarning, action="ignore"
)

rpt = Repeater(address_out=("127.0.0.1", 0))
rpt = Repeater(address_out=("127.0.0.1", 0), address_in=("127.0.0.1", 0))
rpt.read_snmp_values()


def test_snmp_print(caplog):
caplog.set_level(logging.DEBUG)
SNMP().print_snmp_data({"key": "value", SNMP.OID_RADIO_ID: 12345}, "0.1.2.3")
SNMP().print_snmp_data({"key": "value", SNMP.OID_RADIO_ID: 12345}, "127.0.0.1")
assert len(caplog.messages)
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import pytest

from okdmr.dmrlib.hytera.pdu.hstrp import HSTRP, HSTRPPacketType
from okdmr.dmrlib.protocols.hytera.hstrp_datagram_protocol import HSTRPDatagramProtocol


def test_hstrpdp():
hstrp = HSTRPDatagramProtocol(123, be_active_peer=True)
assert hstrp.sn == 0
hstrp.hstrp_increment_sn()
assert hstrp.sn == 1
assert not hstrp.hstrp_connected
hstrp.hstrp_set_connected(True)
assert hstrp.hstrp_connected
hstrp.connection_lost(None)
assert not hstrp.hstrp_connected

# check that routines do not modify the original HSTRP object
h: HSTRP = HSTRP(
pkt_type=HSTRPPacketType(is_connect=True, have_options=False), sn=123
)
h_ack = hstrp.hstrp_send_ack(addr=("", 0), request=h)
assert h_ack.sn == h.sn
assert not h.pkt_type.is_ack
assert h_ack.pkt_type.is_ack

# check that heartbeat is correctly returned
hb = hstrp.hstrp_send_heartbeat(addr=("", 0))
assert isinstance(hb, HSTRP)

# passing none to check assert
with pytest.raises(AssertionError):
hstrp.connection_made(None)
13 changes: 7 additions & 6 deletions okdmr/tests/dmrlib/storage/test_repeater.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import sys

from okdmr.dmrlib.storage import ADDRESS_EMPTY
from okdmr.dmrlib.storage.repeater import Repeater


Expand All @@ -8,16 +9,16 @@ def test_repeater():
dmr_id=2305519,
callsign="OK1DMR",
serial="ABCDEF",
address_in=("", 1),
address_out=("", 0),
address_in=("127.0.0.1", 1),
address_out=("127.0.0.1", 0),
nat_enabled=True,
snmp_enabled=True,
)
assert rpt.callsign == "OK1DMR"
assert rpt.dmr_id == 2305519
assert rpt.serial == "ABCDEF"
assert rpt.address_out == ("", 0)
assert rpt.address_in == ("", 1)
assert rpt.address_out == ("127.0.0.1", 0)
assert rpt.address_in == ("127.0.0.1", 1)
assert rpt.nat_enabled
assert rpt.snmp_enabled

Expand All @@ -27,9 +28,9 @@ def test_repeater():
assert rpt.attr("custom_attr") == None

rpt.nat_enabled = True
assert rpt.repeater_target_address() == ("", 0)
assert rpt.repeater_target_address() == ADDRESS_EMPTY
rpt.nat_enabled = False
assert rpt.repeater_target_address() == ("", 1)
assert rpt.repeater_target_address() == ("127.0.0.1", 1)

rpt.patch({"different_attr": False, "callsign": "OK4DMR"})
assert rpt.attr("different_attr") == False
Expand Down
12 changes: 12 additions & 0 deletions okdmr/tests/dmrlib/storage/test_repeater_storage.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import uuid

import pytest

from okdmr.dmrlib.storage.repeater_storage import RepeaterStorage


Expand Down Expand Up @@ -29,3 +33,11 @@ def test_storage_auto_create(caplog):
rs.match_attr("callsign", "OK4DMR")

assert rs.match_uuid(rpt.id) == rpt
assert len(rs.all()) == 3

rpt = rs.match_ip_incoming(addr3[0])
assert rpt.address_in == addr3

with pytest.raises(SystemError):
# unknown UUID should not match
rs.match_uuid(uuid.uuid4())
3 changes: 3 additions & 0 deletions okdmr/tests/dmrlib/tools/test_hrnp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,6 @@ def test_hrnp_client():
parsed = HRNPClient.args().parse_args(["192.168.22.10"])
config = HRNPClientConfiguration(**vars(parsed))
hc: HRNPClient = HRNPClient(config)
assert not hc.is_running
hc.stop()
assert not hc.is_running

0 comments on commit c85c024

Please sign in to comment.