Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented UDP socket utility function for Nodes #53

Open
wants to merge 46 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
985fac5
added UDP socket node for Nengo
studywolf Sep 26, 2017
46843b1
squash! added UDP socket node for Nengo
tbekolay Sep 28, 2017
179559e
squash! added UDP socket node for Nengo
tbekolay Oct 3, 2017
82d8dcc
fixup! squash! added UDP socket node for Nengo
tbekolay Oct 3, 2017
61de86d
fixup! squash! added UDP socket node for Nengo
tbekolay Oct 4, 2017
3bb9516
Remove the SocketCloseThread.
jgosmann Oct 5, 2017
ed09c10
Remove reopen and backoff code.
jgosmann Oct 5, 2017
19b4af0
Wait with the socket creation until the make_step.
jgosmann Oct 5, 2017
3b90b22
Prevent exceptions from being hidden in SimThread.
jgosmann Oct 5, 2017
0203bf3
fixup! Remove the SocketCloseThread.
jgosmann Oct 5, 2017
684bc9a
fixup! Wait with the socket creation until the make_step.
jgosmann Oct 5, 2017
dd53b6b
fixup! Wait with the socket creation until the make_step.
jgosmann Oct 5, 2017
d58f8d5
Do not use shutdown when closing sockets.
jgosmann Oct 5, 2017
0647cc0
Make one of the two socket tests pass.
jgosmann Oct 5, 2017
5160116
squash! Remove the SocketCloseThread.
jgosmann Oct 8, 2017
73f57f4
Make sync test pass.
jgosmann Oct 8, 2017
86d4929
Refactor _UDPSocket.
jgosmann Oct 8, 2017
b48003e
Do not catch AttributeError.
jgosmann Oct 8, 2017
50e8e7e
Simplify ignore_timestamp logic.
jgosmann Oct 8, 2017
aea408d
Simplify recv/timeout logic.
jgosmann Oct 8, 2017
438b93d
Introduce limit on packet loss.
jgosmann Oct 8, 2017
350981f
Use interval +/-(dt/2) for timestep verification.
jgosmann Oct 8, 2017
604bdf9
Handle differing remote dt in recv.
jgosmann Oct 8, 2017
2c4df8c
Test recv timestamp check logic.
jgosmann Oct 8, 2017
0d077b2
Do not store value_t.
jgosmann Oct 8, 2017
805d085
Add more tests, fix minor problems.
jgosmann Oct 8, 2017
0f1601a
Update docstrings and parameters.
jgosmann Oct 8, 2017
082a989
Obtain dt from make_step.
jgosmann Oct 8, 2017
8db0335
More documentation.
jgosmann Oct 8, 2017
a0d91bf
More code documentation.
jgosmann Oct 8, 2017
69f315d
Change default timeout.
jgosmann Oct 8, 2017
5b04888
Implement separate timeout for initial package.
jgosmann Oct 12, 2017
e09bc11
Fix some pylint warnings.
jgosmann Oct 12, 2017
1936e28
Add IPv6 todo note.
jgosmann Oct 12, 2017
91bfda0
fixup! Implement separate timeout for initial package.
jgosmann Oct 12, 2017
1339105
Document timestamp checking logic.
jgosmann Oct 13, 2017
bbab041
fixup! Document timestamp checking logic.
jgosmann Oct 13, 2017
1155105
fixup! Document timestamp checking logic.
jgosmann Oct 13, 2017
d59c832
Put adaptive socket timeout back in.
jgosmann Oct 13, 2017
e053a22
Ensure compatibility with different platforms.
jgosmann Oct 13, 2017
25547bf
fixup! squash! added UDP socket node for Nengo
jgosmann Oct 17, 2017
c7cf147
Add some debug logging.
jgosmann Oct 17, 2017
61391d7
fixup! Put adaptive socket timeout back in.
jgosmann Oct 17, 2017
6a8fcb8
fixup! Add some debug logging.
jgosmann Oct 17, 2017
3e5ba35
Fix compatibility with Python 3.6.
jgosmann Oct 17, 2017
0f1288d
Remove unused/not implemented argument.
jgosmann Dec 13, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
385 changes: 385 additions & 0 deletions nengo_extras/sockets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,385 @@
from __future__ import absolute_import

import errno
import socket
import struct
import threading
import time
import warnings
from timeit import default_timer

from .utils import queue


class SocketAliveThread(threading.Thread):
"""Check for inactivity, and close if timed out.

The socket class should call the ``keepalive`` method to ensure that
the thread does not time out.

Parameters
----------
timeout : float
The number of seconds before the thread times out.
close_func : function
The function to call when the thread times out.
"""

def __init__(self, timeout, close_func):
threading.Thread.__init__(self)
self.last_active = default_timer()
self.timeout = timeout
self.close_func = close_func
self.stopped = False

def keepalive(self):
self.last_active = default_timer()

def run(self):
# Keep checking if the socket class is still being used.
while default_timer() - self.last_active < self.timeout:
time.sleep(self.timeout * 0.5)
# If the socket class is idle, terminate the sockets
self.close_func()

def stop(self):
if not self.stopped:
self.stopped = True
self.last_active = 0
self.join()


class UDPSocket(object):
"""A class for UDP communication to/from a Nengo model.

A UDPSocket can be send only, receive only, or both send and receive.
For each of these cases, different parameter sets must be specified.

If the ``local_addr`` or ``dest_addr`` are not specified, then a local
connection is assumed.

For a send only socket, the user must specify:
(send_dim, dest_port)
and may optionally specify:
(dest_addr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this formatting will be lost when the documentation is built with sphinx. Do we care about that?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docstrings have been changed in the new commit; this block was explaining the oddities involved with the old organization anyhow.


For a receive only socket, the user must specify:
(recv_dim, local_port)
and may optionally specify:
(local_addr, socket_timeout, thread_timeout)

For a send and receive socket, the user must specify:
(send_dim, recv_dim, local_port, dest_port)
and may optionally specify:
(local_addr, dest_addr, dt_remote, socket_timeout, thread_timeout)

For examples of the UDPSocket communicating between models all running
on a local machine, please see the tests/test_socket.py file.

To communicate between two models in send and receive mode over a network,
one running on machine A with IP address 10.10.21.1 and one running on
machine B, with IP address 10.10.21.25, we add the following socket to the
model on machine A::

socket_send_recv_A = UDPSocket(
send_dim=A_output_dims, recv_dim=B_output_dims,
local_addr='10.10.21.1', local_port=5001,
dest_addr='10.10.21.25', dest_port=5002)
node_send_recv_A = nengo.Node(
socket_send_recv_A.run,
size_in=A_output_dims, # input to this node is data to send
size_out=B_output_dims) # output from this node is data received

and the following socket on machine B::

socket_send_recv_B = UDPSocket(
send_dim=B_output_dims, recv_dim=A_output_dims,
local_addr='10.10.21.25', local_port=5002,
dest_addr='10.10.21.1', dest_port=5001)
node_send_recv_B = nengo.Node(
socket_send_recv_B.run,
size_in=B_output_dims, # input to this node is data to send
size_out=A_output_dims) # output from this node is data received

and then connect the ``UDPSocket.input`` and ``UDPSocket.output`` nodes to
the communicating Nengo model elements.

Parameters
----------
send_dim : int, optional (Default: 1)
Number of dimensions of the vector data being sent.
recv_dim : int, optional (Default: 1)
Number of dimensions of the vector data being received.
dt_remote : float, optional (Default: 0)
The time step of the remote simulation, only relevant for send and
receive nodes. Used to regulate how often data is sent to the remote
machine, handling cases where simulation time steps are not the same.
local_addr : str, optional (Default: '127.0.0.1')
The local IP address data is received over.
local_port : int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't state that this is optional, but the function signature has a default value.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's because if you want to use local_addr this parameter is not optional. The default value specified in the constructor is meant to tell the constructor to ignore the local_addr setting.

Copy link
Member

@tbekolay tbekolay Sep 29, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One way around this (let me know if this seems good) is to split this into 3 processes, one for sending, one for receiving, one for doing both. We can still make sure there's no code duplication by having the step function returned by make_step be similar to the main socket class now (but a bit simpler). If this seems good I can implement it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good to me.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. The only reason why it's all in one function right now is because of code duplication. If 3 separate processes can be made without code duplication, that'll be ideal.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done now.

The local port data is receive over.
dest_addr : str, optional (Default: '127.0.0.1')
The local or remote IP address data is sent to.
dest_port: int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again optional according the the function signature, but not according to the documentation.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the local_port parameter.

The local or remote port data is sent to.
socket_timeout : float, optional (Default: 30)
The time a socket waits before throwing an inactivity exception.
thread_timeout : float, optional (Default: 1)
The amount of inactive time allowed before closing a thread running
a socket.
byte_order : str, optional (Default: '!')
Specify 'big' or 'little' endian data format.
'!' uses the system default.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we state that '<' and '>' are also supported? When integrating with other Python code, one might to use those?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in the new commit.

ignore_timestamp : boolean, optional (Default: False)
Relevant to send and receive sockets. If True, does not try to
regulate how often packets are sent to remote system based by
comparing to remote simulation time step. Simply sends a packet
every time step.
"""
def __init__(self, send_dim=1, recv_dim=1, dt_remote=0,
local_addr='127.0.0.1', local_port=-1,
dest_addr='127.0.0.1', dest_port=-1,
socket_timeout=30, thread_timeout=1,
byte_order='!', ignore_timestamp=False):
self.local_addr = local_addr
self.local_port = local_port
self.dest_addr = (dest_addr if isinstance(dest_addr, list)
else [dest_addr])
self.dest_port = (dest_port if isinstance(dest_port, list)
else [dest_port])
self.socket_timeout = socket_timeout

if byte_order.lower() == "little":
self.byte_order = '<'
elif byte_order.lower() == "big":
self.byte_order = '>'
else:
self.byte_order = byte_order

self.last_t = 0.0 # local sim time last time run was called
self.last_packet_t = 0.0 # remote sim time from last packet received
self.dt = 0.0 # local simulation dt
self.dt_remote = max(dt_remote, self.dt) # dt between each packet sent

self.retry_backoff_time = 1

self.send_socket = None
self.recv_socket = None
self.is_sender = dest_port != -1
self.is_receiver = local_port != -1
self.ignore_timestamp = ignore_timestamp

self.send_dim = send_dim
self.recv_dim = recv_dim

self.max_recv_len = (recv_dim + 1) * 4
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why +1? Why times 4? (4 bytes per value?)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to account for the timestamp, *4 because the packet data is sent as float: struct.pack(self.byte_order + 'f' * (self.send_dim + 1), *send_data), which are 4 bytes long.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should then probably use struct.calcsize to determine the byte length.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suuure... okay.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the new commit, we do not track the max_recv_len; instead, we use NumPy to hold data associated with the socket, and do socket.recv_into(ndarray.data) to stream the data from the socket directly into the numpy array.

self.value = [0.0] * recv_dim
self.buffer = queue.PriorityQueue()

self.timeout_min = thread_timeout
self.timeout_max = max(thread_timeout, socket_timeout + 1)
self.timeout_thread = None

def __del__(self):
self.close()

def _open_sockets(self):
"""Startup sockets and timeout thread."""
# Close existing sockets and thread
self.close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a bit surprising to me that _open_sockets closes sockets. Maybe it should just fail with an acception if the sockets are already open? (I think it sometimes also can be a problem to close a socket and reopen it in short succession.)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It originally failed with an exception, but that means the simple act of resetting the simulation in the GUI will cause an exception to occur.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sockets are now only opened once, so no need to close, except in the reopen method, in which calling close should hopefully make more sense.


# Open new sockets and thread
if self.is_receiver:
self._open_recv_socket()
if self.is_sender:
self._open_send_socket()
self.timeout_thread = SocketAliveThread(self.timeout_max, self.close)
self.timeout_thread.start()

def _open_recv_socket(self):
"""Create a socket for receiving data."""
try:
self.recv_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.recv_socket.bind((self.local_addr, self.local_port))
self.recv_socket.settimeout(self.socket_timeout)
except socket.error:
raise RuntimeError(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to use a custom exception type? That would allow to catch precisely this exception, while RuntimeError might be much more general.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We no longer raise this error and let the socket error raise itself if need be (there wasn't much of a point to reraising this).

"UDPSocket: Could not bind to socket. Address: %s, Port: %s, "
"is in use. If simulation has been run before, wait for "
"previous UDPSocket to release the port. See 'socket_timeout'"
" argument, currently set to %g seconds." %
(self.local_addr,
self.local_port,
self.timeout_thread.timeout))

def _close_recv_socket(self):
if self.recv_socket is not None:
self.recv_socket.close()
self.recv_socket = None

def _open_send_socket(self):
"""Create a socket for sending data."""
try:
self.send_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
if self.dest_addr == self.local_addr:
self.send_socket.bind((self.local_addr, 0))
except socket.error as error:
raise RuntimeError("UDPSocket: Error str: %s" % (error,))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Custom exception type? (see above)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or keep it as socket.error?


def _close_send_socket(self):
if self.send_socket is not None:
self.send_socket.close()
self.send_socket = None

def _retry_connection(self):
"""Try to create a new receive socket with backoff."""
self._close_recv_socket()
while self.recv_socket is None:
time.sleep(self.retry_backoff_time)
try:
self._open_recv_socket()
except socket.error:
pass
# Failed to open receiving socket, double backoff time, then retry
self.retry_backoff_time *= 2

def close(self):
"""Close all threads and sockets."""
if self.timeout_thread is not None:
self.timeout_thread.stop()
# Double make sure all sockets are closed
self._close_send_socket()
self._close_recv_socket()

def pack_packet(self, t, x):
"""Takes a timestamp and data (x) and makes a socket packet

Default packet data type: float
Default packet structure: [t, x[0], x[1], x[2], ... , x[d]]
"""
send_data = ([float(t + self.dt_remote / 2.0)] +
[x[i] for i in range(self.send_dim)])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect that a list(x) or tuple(x) might be more efficient than the list comprehension (and this gets called for every time step I suppose?).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hopefully NumPy is even more efficient!

return struct.pack(self.byte_order + 'f' * (self.send_dim + 1),
*send_data)

def unpack_packet(self, packet):
"""Takes a packet and extracts a timestamp and data (x)

Default packet data type: float
Default packet structure: [t, x[0], x[1], x[2], ... , x[d]]
"""
data_len = int(len(packet) // 4)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The explicit int conversion shouldn't be required? len should return an int, and it is integer division

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better safe than sorry. But it can probably be taken out.. maybe?

data = list(struct.unpack(self.byte_order + 'f' * data_len, packet))
t_data = data[0]
value = data[1:]
return t_data, value

def __call__(self, t, x=None):
return self.run(t, x)

def _get_or_buffer(self, t_data, value, t):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find the get in the name a bit strange because this function does not return anything and it doesn't even get value from somewhere, but the value is passed in.

Copy link
Member

@xchoo xchoo Sep 29, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

feel free to suggest another name for this function.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Method is removed in the new commit.

if (t_data >= t and t_data < t + self.dt) or self.ignore_timestamp:
self.value = value
elif t_data >= t + self.dt:
self.buffer.put((t_data, value))
else:
raise RuntimeError("Data from the past is buffered")

def run_recv(self, t):
if not self.buffer.empty():
# There are items (packets with future timestamps) in the
# buffer. Check the buffer for appropriate information
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there always items with future timestamps in the buffer? Or should this say that there might be such items?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the buffer is not empty for a specific t, then the things in the buffer have future timestamps. It might not always be the case that the buffer has items though.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Look at the new recv method in SocketStep; we don't actually keep track of a buffer, we only have the last received value, which we check for validity and wait if it's not valid.

t_data, value = self.buffer.get()
self._get_or_buffer(t_data, value, t)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find these two lines hard to understand confusing.
Is this supposed to set self.value with the data for the current timestep and if only data for a future timestep is available to put it back into the buffer? If so, there should be a better way to express this intent.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's supposed to cycle through the buffer and find any packets with timestamps that can be used. If it sees a packet with a timestamp in the future, it puts it back in the buffer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still confused by this. You say

If the buffer is not empty for a specific t, then the things in the buffer have future timestamps.

but also

If it sees a packet with a timestamp in the future, it puts it back in the buffer.

So we know at that point the buffer is not empty, so all timestamps are in the future, so all packets get put back into the buffer?

I suppose there are items in the buffer of which some (or all, but not necessarily all) have future time stamps?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because if the buffer is not empty, it means that there are packets that have either a future or current timestamp in it. Keep in mind that the buffer is checked every time step, so eventually, the code will run into a packet with an appropriate timestamp to use.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check the new recv method to see if it makes more sense. @xchoo, it seemed in the previous implementation that the buffer could never hold more than one item. Is that correct? If not, then I'm very confused.

return

while True:
try:
packet, addr = self.recv_socket.recvfrom(self.max_recv_len)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this return less then max_recv_len bytes? The Python documentation doesn't really say ...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. You can ask for X bytes of a packet, where X is <= to the packet data length.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, what I'm wondering about is whether this might return y < X bytes when X bytes are requested. Which would require to handle that case because otherwise the unpacking would fail because we've got only a partial packet.

Copy link
Member

@xchoo xchoo Sep 29, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. But only if the send code has super screwed up the send format (i.e. the send and recv formats are different). Every packet received is a full packet. If the packet is corrupted, the python socket code should drop it (it's udp for a reason).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's what I'm essentially asking. I think with TCP connections you don't have that guarantee (but I didn't know about UDP).

t_data, value = self.unpack_packet(packet)
self._get_or_buffer(t_data, value, t)

# Packet recv success! Decay timeout to the user specified
# thread timeout (which can be smaller than the socket timeout)
self.timeout_thread.timeout = max(
self.timeout_min, self.timeout_thread.timeout * 0.9)
self.retry_backoff_time = max(1, self.retry_backoff_time * 0.5)
break

except (socket.error, AttributeError) as error:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would we expect an AttributeError here? That seems like something completely different than a socket.error.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's the exception I received when I coded this part to take care of the specific exception.

# Socket error has occurred. Probably a timeout.
# Assume worst case, set thread timeout to
# timeout_max to wait for more timeouts to
# occur (this is so that the socket isn't constantly
# closed by the check_alive thread)
self.timeout_thread.timeout = self.timeout_max

# Timeout occurred, assume packet lost.
if isinstance(error, socket.timeout):
break

# If connection was reset (somehow?), or closed by the
# idle timer (prematurely), retry the connection, and
# retry receiving the packet again.
connreset = (hasattr(error, 'errno')
and error.errno == errno.ECONNRESET)
if connreset or self.recv_socket is None:
self._retry_connection()
warnings.warn("UDPSocket Error at t=%g: %s" % (t, error))

def run_send(self, t, x):
# Calculate if it is time to send the next packet.
# Ideal time to send is last_packet_t + dt_remote, and we
# want to find out if current or next local time step is closest.
if (t + self.dt / 2.0) >= (self.last_packet_t + self.dt_remote):
for addr in self.dest_addr:
for port in self.dest_port:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really want to send the packet to all listed ports on each address? Shouldn't it be pairs of addresses and ports?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't remember if I put this in... It's probably a bug, and multiple address/port pairs should not be allowed? I can't remember the motivation for doing that... haha

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed in the new commit.

self.send_socket.sendto(
self.pack_packet(t, x), (addr, port))
self.last_packet_t = t # Copy t (which is a scalar)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment does not seem helpful. The assignment tells me that t is assigned (“copied”) to the attribute. I might not know that t is a scalar, but that depends on the caller, so I would rather document the argument list saying that t is supposed to be a scalar.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uh. The comment is in reference to t not being a scalar/float, and the copy is needed so that the class doesn't just have a reference to t.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But this is just storing the reference to t and not doing a copy? Why is t not a scalar/float?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dunno. haha. I can't remember why this is done tbh. 😆

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was done because previously the t and x provided to a node function was a live signal; in newer versions of Nengo it's a copy, so keeping a reference to it is now safe.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, but I think the line wasn't doing what is was intended to do ... anyways I suppose this is now obsolete.


def run(self, t, x=None):
"""Function that will be passed into the Nengo node.

When both sending and receiving, the sending frequency is
regulated by comparing the local and remote time steps. Information
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the sending frequency only regulated when both sending and receiving, i.e. when only sending it is not controlled?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. When sending only, the sending frequency is determined by the local t. When sending and receiving, it tries to sync up the packets so neither side gets flooded with packets.

is sent when the current local timestep is closer to the remote
time step than the next local timestep.
"""

# If t == 0, return array of zeros and reset state of class,
# empty queue of messages, close any open sockets
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we doing this? Because this indicates a simulator reset?
Maybe the socket functionality rather be implemented as a nengo.Process to properly support resets?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code was written before nengo.Process was implemented. So... if anyone has the free time to switch the code over and test it, by all means! 😄

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

if t == 0:
self.value = [0.0] * self.recv_dim
self.last_t = 0.0
self.last_packet_t = 0.0

# Empty the buffer
while not self.buffer.empty():
self.buffer.get()

self.close()
return self.value

# Initialize socket if t > 0, and it has not been initialized
if t > 0 and ((self.recv_socket is None and self.is_receiver) or
(self.send_socket is None and self.is_sender)):
self._open_sockets()

# Calculate dt
self.dt = t - self.last_t
# An update can be sent, at most, every self.dt.
# If remote dt is smaller use self.dt to check.
self.dt_remote = max(self.dt_remote, self.dt)
self.last_t = t
self.timeout_thread.keepalive()

if self.is_sender:
assert x is not None
self.run_send(t, x)

if self.is_receiver:
self.run_recv(t)

# Return retrieved value
return self.value
Loading