forked from bobjacobsen/python-openlcb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
example_node_implementation.py
151 lines (107 loc) · 4.27 KB
/
example_node_implementation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
'''
Demo of using the datagram service to send and receive a datagram
Usage:
python3 example_node_implementation.py [host|host:port]
Options:
host|host:port (optional) Set the address (or using a colon,
the address and port). Defaults to a hard-coded test
address and port.
'''
from openlcb.canbus.tcpsocket import TcpSocket
from openlcb.canbus.canphysicallayergridconnect import (
CanPhysicalLayerGridConnect,
)
from openlcb.canbus.canlink import CanLink
from openlcb.nodeid import NodeID
from openlcb.datagramservice import DatagramService
from openlcb.memoryservice import MemoryService
from openlcb.message import Message
from openlcb.mti import MTI
from openlcb.localnodeprocessor import LocalNodeProcessor
from openlcb.pip import PIP
from openlcb.snip import SNIP
from openlcb.node import Node
# specify connection information
# region moved to settings
# host = "192.168.16.212"
# port = 12021
# localNodeID = "05.01.01.01.03.01"
# farNodeID = "09.00.99.03.00.35"
# endregion moved to settings
# region same code as other examples
from examples_settings import Settings
settings = Settings()
if __name__ == "__main__":
settings.load_cli_args(docstring=__doc__)
# endregion same code as other examples
s = TcpSocket()
# s.settimeout(30)
s.connect(settings['host'], settings['port'])
print("RR, SR are raw socket interface receive and send;"
" RL, SL are link interface; RM, SM are message interface")
def sendToSocket(string):
print(" SR: {}".format(string.strip()))
s.send(string)
def printFrame(frame):
print(" RL: {}".format(frame))
canPhysicalLayerGridConnect = CanPhysicalLayerGridConnect(sendToSocket)
canPhysicalLayerGridConnect.registerFrameReceivedListener(printFrame)
def printMessage(message):
print("RM: {} from {}".format(message, message.source))
canLink = CanLink(NodeID(settings['localNodeID']))
canLink.linkPhysicalLayer(canPhysicalLayerGridConnect)
canLink.registerMessageReceivedListener(printMessage)
datagramService = DatagramService(canLink)
canLink.registerMessageReceivedListener(datagramService.process)
def printDatagram(memo):
"""create a call-back to print datagram contents when received
Args:
memo (DatagramReadMemo): The datagram received
Returns:
bool: Always False (True would mean we sent a reply to the datagram,
but let the MemoryService do that).
"""
print("Datagram receive call back: {}".format(memo.data))
return False
datagramService.registerDatagramReceivedListener(printDatagram)
memoryService = MemoryService(datagramService)
# createcallbacks to get results of memory read
def memoryReadSuccess(memo):
print("successful memory read: {}".format(memo.data))
def memoryReadFail(memo):
print("memory read failed: {}".format(memo.data))
# create a node and connect it update
# This is a very minimal node, which just takes part in the low-level common
# protocols
localNode = Node(
NodeID(settings['localNodeID']),
SNIP("PythonOlcbNode", "example_node_implementation",
"0.1", "0.2", "User Name Here", "User Description Here"),
set([PIP.SIMPLE_NODE_IDENTIFICATION_PROTOCOL, PIP.DATAGRAM_PROTOCOL])
)
localNodeProcessor = LocalNodeProcessor(canLink, localNode)
canLink.registerMessageReceivedListener(localNodeProcessor.process)
def displayOtherNodeIds(message) :
"""Listener to identify connected nodes
Args:
message (_type_): _description_
"""
print("[displayOtherNodeIds] type(message): {}"
"".format(type(message).__name__))
if message.mti == MTI.Verified_NodeID :
print("Detected farNodeID is {}".format(message.source))
canLink.registerMessageReceivedListener(displayOtherNodeIds)
#######################
# have the socket layer report up to bring the link layer up and get an alias
print(" SL : link up")
canPhysicalLayerGridConnect.physicalLayerUp()
# request that nodes identify themselves so that we can print their node IDs
message = Message(MTI.Verify_NodeID_Number_Global,
NodeID(settings['localNodeID']), None)
canLink.sendMessage(message)
# process resulting activity
while True:
input = s.receive()
print(" RR: "+input.strip())
# pass to link processor
canPhysicalLayerGridConnect.receiveString(input)