Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented UDP socket utility function for Nodes #53

Open
wants to merge 46 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
985fac5
added UDP socket node for Nengo
studywolf Sep 26, 2017
46843b1
squash! added UDP socket node for Nengo
tbekolay Sep 28, 2017
179559e
squash! added UDP socket node for Nengo
tbekolay Oct 3, 2017
82d8dcc
fixup! squash! added UDP socket node for Nengo
tbekolay Oct 3, 2017
61de86d
fixup! squash! added UDP socket node for Nengo
tbekolay Oct 4, 2017
3bb9516
Remove the SocketCloseThread.
jgosmann Oct 5, 2017
ed09c10
Remove reopen and backoff code.
jgosmann Oct 5, 2017
19b4af0
Wait with the socket creation until the make_step.
jgosmann Oct 5, 2017
3b90b22
Prevent exceptions from being hidden in SimThread.
jgosmann Oct 5, 2017
0203bf3
fixup! Remove the SocketCloseThread.
jgosmann Oct 5, 2017
684bc9a
fixup! Wait with the socket creation until the make_step.
jgosmann Oct 5, 2017
dd53b6b
fixup! Wait with the socket creation until the make_step.
jgosmann Oct 5, 2017
d58f8d5
Do not use shutdown when closing sockets.
jgosmann Oct 5, 2017
0647cc0
Make one of the two socket tests pass.
jgosmann Oct 5, 2017
5160116
squash! Remove the SocketCloseThread.
jgosmann Oct 8, 2017
73f57f4
Make sync test pass.
jgosmann Oct 8, 2017
86d4929
Refactor _UDPSocket.
jgosmann Oct 8, 2017
b48003e
Do not catch AttributeError.
jgosmann Oct 8, 2017
50e8e7e
Simplify ignore_timestamp logic.
jgosmann Oct 8, 2017
aea408d
Simplify recv/timeout logic.
jgosmann Oct 8, 2017
438b93d
Introduce limit on packet loss.
jgosmann Oct 8, 2017
350981f
Use interval +/-(dt/2) for timestep verification.
jgosmann Oct 8, 2017
604bdf9
Handle differing remote dt in recv.
jgosmann Oct 8, 2017
2c4df8c
Test recv timestamp check logic.
jgosmann Oct 8, 2017
0d077b2
Do not store value_t.
jgosmann Oct 8, 2017
805d085
Add more tests, fix minor problems.
jgosmann Oct 8, 2017
0f1601a
Update docstrings and parameters.
jgosmann Oct 8, 2017
082a989
Obtain dt from make_step.
jgosmann Oct 8, 2017
8db0335
More documentation.
jgosmann Oct 8, 2017
a0d91bf
More code documentation.
jgosmann Oct 8, 2017
69f315d
Change default timeout.
jgosmann Oct 8, 2017
5b04888
Implement separate timeout for initial package.
jgosmann Oct 12, 2017
e09bc11
Fix some pylint warnings.
jgosmann Oct 12, 2017
1936e28
Add IPv6 todo note.
jgosmann Oct 12, 2017
91bfda0
fixup! Implement separate timeout for initial package.
jgosmann Oct 12, 2017
1339105
Document timestamp checking logic.
jgosmann Oct 13, 2017
bbab041
fixup! Document timestamp checking logic.
jgosmann Oct 13, 2017
1155105
fixup! Document timestamp checking logic.
jgosmann Oct 13, 2017
d59c832
Put adaptive socket timeout back in.
jgosmann Oct 13, 2017
e053a22
Ensure compatibility with different platforms.
jgosmann Oct 13, 2017
25547bf
fixup! squash! added UDP socket node for Nengo
jgosmann Oct 17, 2017
c7cf147
Add some debug logging.
jgosmann Oct 17, 2017
61391d7
fixup! Put adaptive socket timeout back in.
jgosmann Oct 17, 2017
6a8fcb8
fixup! Add some debug logging.
jgosmann Oct 17, 2017
3e5ba35
Fix compatibility with Python 3.6.
jgosmann Oct 17, 2017
0f1288d
Remove unused/not implemented argument.
jgosmann Dec 13, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
391 changes: 391 additions & 0 deletions nengo_extras/sockets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,391 @@
from __future__ import absolute_import

import struct
import socket
import time
import threading
import errno
import warnings

from nengo.utils.compat import queue


class SocketCheckAliveThread(threading.Thread):
"""Check UDPSocket class for inactivity, and close if a message has
not been sent or received in the specified time.

Parameters
----------
socket_class : UDPSocket
Monitor the activity of this UDPSocket class.
"""
def __init__(self, socket_class):
threading.Thread.__init__(self)
self.socket_class = socket_class

def run(self):
# Keep checking if the socket class is still being used.
while (time.time() - self.socket_class.last_active <
self.socket_class.idle_time_limit):
time.sleep(self.socket_class.alive_thread_sleep_time)
# If the socket class is idle, terminate the sockets
self.socket_class._close_recv_socket()
self.socket_class._close_send_socket()


class UDPSocket(object):
""" A class for UDP communication to/from a Nengo model.

A UDPSocket can be send only, receive only, or both send and receive.
For each of these cases, different parameter sets must be specified.

If the ``local_addr`` or ``dest_addr`` are not specified, then a local
connection is assumed.

For a send only socket, the user must specify:
(send_dim, dest_port)
and may optionally specify:
(dest_addr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this formatting will be lost when the documentation is built with sphinx. Do we care about that?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docstrings have been changed in the new commit; this block was explaining the oddities involved with the old organization anyhow.


For a receive only socket, the user must specify:
(recv_dim, local_port)
and may optionally specify:
(local_addr, socket_timeout, thread_timeout)

For a send and receive socket, the user must specify:
(send_dim, recv_dim, local_port, dest_port)
and may optionally specify:
(local_addr, dest_addr, dt_remote, socket_timeout, thread_timeout)

For examples of the UDPSocket communicating between models all running
on a local machine, please see the tests/test_socket.py file.

To communicate between two models in send and receive mode over a network,
one running on machine A with IP address 10.10.21.1 and one running on
machine B, with IP address 10.10.21.25, we add the following socket to the
model on machine A::

socket_send_recv_A = UDPSocket(
send_dim=A_output_dims, recv_dim=B_output_dims,
local_addr='10.10.21.1', local_port=5001,
dest_addr='10.10.21.25', dest_port=5002)
node_send_recv_A = nengo.Node(
socket_send_recv_A.run,
size_in=A_output_dims, # input to this node is data to send
size_out=B_output_dims) # output from this node is data received

and the following socket on machine B::

socket_send_recv_B = UDPSocket(
send_dim=B_output_dims, recv_dim=A_output_dims,
local_addr='10.10.21.25', local_port=5002,
dest_addr='10.10.21.1', dest_port=5001)
node_send_recv_B = nengo.Node(
socket_send_recv_B.run,
size_in=B_output_dims, # input to this node is data to send
size_out=A_output_dims) # output from this node is data received

and then connect the ``UDPSocket.input`` and ``UDPSocket.output`` nodes to
the communicating Nengo model elements.

Parameters
----------
send_dim : int, optional (Default: 1)
Number of dimensions of the vector data being sent.
recv_dim : int, optional (Default: 1)
Number of dimensions of the vector data being received.
dt_remote : float, optional (Default: 0)
The time step of the remote simulation, only relevant for send and
receive nodes. Used to regulate how often data is sent to the remote
machine, handling cases where simulation time steps are not the same.
local_addr : str, optional (Default: '127.0.0.1')
The local IP address data is received over.
local_port : int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't state that this is optional, but the function signature has a default value.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's because if you want to use local_addr this parameter is not optional. The default value specified in the constructor is meant to tell the constructor to ignore the local_addr setting.

Copy link
Member

@tbekolay tbekolay Sep 29, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One way around this (let me know if this seems good) is to split this into 3 processes, one for sending, one for receiving, one for doing both. We can still make sure there's no code duplication by having the step function returned by make_step be similar to the main socket class now (but a bit simpler). If this seems good I can implement it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good to me.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. The only reason why it's all in one function right now is because of code duplication. If 3 separate processes can be made without code duplication, that'll be ideal.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done now.

The local port data is receive over.
dest_addr : str, optional (Default: '127.0.0.1')
The local or remote IP address data is sent to.
dest_port: int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again optional according the the function signature, but not according to the documentation.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the local_port parameter.

The local or remote port data is sent to.
socket_timeout : float, optional (Default: 30)
The time a socket waits before throwing an inactivity exception.
thread_timeout : float, optional (Default: 1)
The amount of inactive time allowed before closing a thread running
a socket.
byte_order : str, optional (Default: '!')
Specify 'big' or 'little' endian data format.
'!' uses the system default.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we state that '<' and '>' are also supported? When integrating with other Python code, one might to use those?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in the new commit.

ignore_timestamp : boolean, optional (Default: False)
Relevant to send and receive sockets. If True, does not try to
regulate how often packets are sent to remote system based by
comparing to remote simulation time step. Simply sends a packet
every time step.
"""
def __init__(self, send_dim=1, recv_dim=1, dt_remote=0,
local_addr='127.0.0.1', local_port=-1,
dest_addr='127.0.0.1', dest_port=-1,
socket_timeout=30, thread_timeout=1,
byte_order='!', ignore_timestamp=False):
self.local_addr = local_addr
self.local_port = local_port
self.dest_addr = (dest_addr if isinstance(dest_addr, list)
else [dest_addr])
self.dest_port = (dest_port if isinstance(dest_port, list)
else [dest_port])
self.socket_timeout = socket_timeout

if byte_order.lower() == "little":
self.byte_order = '<'
elif byte_order.lower() == "big":
self.byte_order = '>'
else:
self.byte_order = byte_order

self.last_t = 0.0 # local sim time last time run was called
self.last_packet_t = 0.0 # remote sim time from last packet received
self.dt = 0.0 # local simulation dt
self.dt_remote = max(dt_remote, self.dt) # dt between each packet sent

self.last_active = time.time()
self.idle_time_limit_min = thread_timeout
self.idle_time_limit_max = max(thread_timeout, socket_timeout + 1)
# threshold time for closing inactive socket thread
self.idle_time_limit = self.idle_time_limit_max
self.alive_check_thread = None
# how often to check the socket threads for inactivity
self.alive_thread_sleep_time = thread_timeout / 2.0
self.retry_backoff_time = 1

self.send_socket = None
self.recv_socket = None
self.is_sender = dest_port != -1
self.is_receiver = local_port != -1
self.ignore_timestamp = ignore_timestamp

self.send_dim = send_dim
self.recv_dim = recv_dim

self.max_recv_len = (recv_dim + 1) * 4
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why +1? Why times 4? (4 bytes per value?)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to account for the timestamp, *4 because the packet data is sent as float: struct.pack(self.byte_order + 'f' * (self.send_dim + 1), *send_data), which are 4 bytes long.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should then probably use struct.calcsize to determine the byte length.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suuure... okay.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the new commit, we do not track the max_recv_len; instead, we use NumPy to hold data associated with the socket, and do socket.recv_into(ndarray.data) to stream the data from the socket directly into the numpy array.

self.value = [0.0] * recv_dim
self.buffer = queue.PriorityQueue()

def __del__(self):
self.close()

def _open_socket(self):
"""Startup socket and thread for communication inactivity."""

# Close socket, terminate alive check thread
self.close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a bit surprising to me that _open_sockets closes sockets. Maybe it should just fail with an acception if the sockets are already open? (I think it sometimes also can be a problem to close a socket and reopen it in short succession.)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It originally failed with an exception, but that means the simple act of resetting the simulation in the GUI will cause an exception to occur.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sockets are now only opened once, so no need to close, except in the reopen method, in which calling close should hopefully make more sense.


if self.is_sender:
try:
self.send_socket = socket.socket(socket.AF_INET,
socket.SOCK_DGRAM)
if self.dest_addr == self.local_addr:
self.send_socket.bind((self.local_addr, 0))
except socket.error as error:
raise RuntimeError("UDPSocket: Error str: " + str(error))

if self.is_receiver:
self._open_recv_socket()

self.last_active = time.time()
self.alive_check_thread = SocketCheckAliveThread(self)
self.alive_check_thread.start()

def _open_recv_socket(self):
"""Create a socket for receiving data."""
try:
self.recv_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.recv_socket.bind((self.local_addr, self.local_port))
self.recv_socket.settimeout(self.socket_timeout)
except socket.error:
raise RuntimeError("UDPSocket: Could not bind to socket. "
"Address: %s, Port: %s, is in use. If "
"simulation has been run before, wait for "
"previous UDPSocket to release the port. "
"(See 'socket_timeout' argument in class "
"constructor, currently set to %f seconds)" %
(self.local_addr, self.local_port,
self.idle_time_limit))

def _retry_connection(self):
"""Close any open receive sockets, try to create a new receive
socket with increasing delays between attempts."""
self._close_recv_socket()
while self.recv_socket is None:
time.sleep(self.retry_backoff_time)
try:
self._open_recv_socket()
except socket.error:
pass
# Failed to open receiving socket, double backoff time, then retry
self.retry_backoff_time *= 2

def _terminate_alive_check_thread(self):
"""Set up situation for the alive_check_thread to shut down
this class due to communication inactivity and exit."""
self.idle_time_limit = self.idle_time_limit_max
if self.alive_check_thread is not None:
self.last_active = 0
self.alive_check_thread.join()
self.alive_check_thread = None

def _close_send_socket(self):
if self.send_socket is not None:
self.send_socket.close()
self.send_socket = None

def _close_recv_socket(self):
if self.recv_socket is not None:
self.recv_socket.close()
self.recv_socket = None

def pack_packet(self, t, x):
"""Takes a timestamp and data (x) and makes a socket packet

Default packet data type: float
Default packet structure: [t, x[0], x[1], x[2], ... , x[d]]"""

send_data = [float(t + self.dt_remote / 2.0)] + \
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why self.dt_remote / 2.0 instead of just self.dt_remote?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that ... is a good question. in a receive node it checks to make sure that the packet's t >= self.t and t < self.t + self.dt, so i assume t + self.dt_remote / 2 is used in the packet to avoid rounding errors? @xchoo can you confirm?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup. this is to mostly avoid rounding errors (because the socket class can be used to accept or send data to any device). It also makes sure that the timestamp of the data sent is squarely in one dt block.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you give an example of how this (and the other place in the code where this happens) would solve a rounding error? I'd like to make a commit to document this somewhere.

Copy link
Member

@xchoo xchoo Sep 27, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's as much to address possibilities of rounding errors as it is to avoid compatibility problems in the future. The code that processes timestamps works like this:

  • When processing timestep T,
  • If socket packet is received with timestamp between T and T + dt, use it
  • If timestamp is greater than T + dt, remember it for later
  • Otherwise, ignore it.

The potential problem with this code is at the boundary points of the code (i.e. T and T + dt). The easiest way to ensure that the send code doesn't cause the receive code to run into these problems is to timestamp the packet exactly in between T and T + dt.

This also addresses the issue of rounding. Because packets can come from different devices, processing a timestep for t == 0.1s could really be processing it for t = 0.10000000000001s (floating point rounding issues on the host machine). But, if the code gets a packet with timestamp t = 0.99999999999s (floating point or casting issues with data from the remote machine), should it be considered for the previous timestep, or for the current timestep? Using a timestamp of T + dt / 2.0 avoids this ambiguity.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I would send the packets with the correct timestamp, and leave the checking logic to the machine that's processing the timestep. That way, it's easy to change the checking procedure. You could add dt / 2. (or some other epsilon) as part of the check, or you could round all timesteps to the nearest timestep on the machine doing the processing. I think it just helps keep the two things separate.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup. That could be done too. But I'd just caution that changes to this code requires extensive (beyond the unit tests) testing. There's a bunch of packet memory code (to handle cases where you receive packets timestamped in the future or past) that needs to be rechecked if changes are made to this logic.

Copy link
Member

@xchoo xchoo Sep 27, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh. another word of caution. Because the socket code only handles packet data, there could be instances where the host and remote systems run using different dt's, so the timestamp processing code has to handle this. The way it's currently done is t >= T and t < T + dt. but it could just as easily be done as t >= T - dt/2 and t < T + dt/2. I chose the first option because it makes it clear that any packet received from T to T + dt is considered for the T timestep. Going the second option means potentially dealing with issues of having to test receiving data from the past.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So... How should we include this discussion in the documentation? Can we just in-line it or would that take too much space?

[x[i] for i in range(self.send_dim)]
packet = struct.pack(self.byte_order + 'f' * (self.send_dim + 1),
*send_data)
return packet

def unpack_packet(self, packet):
"""Takes a packet and extracts a timestamp and data (x)

Default packet data type: float
Default packet structure: [t, x[0], x[1], x[2], ... , x[d]]"""

data_len = int(len(packet) / 4)
data = list(struct.unpack(self.byte_order + 'f' * data_len, packet))
t_data = data[0]
value = data[1:]
return t_data, value

def __call__(self, t, x=None):
return self.run(t, x)

def _t_check(self, t_lim, t):
"""Check to see if current or next time step is closer to t_lim."""
return (t_lim >= t and t_lim < t + self.dt) or self.ignore_timestamp

def run(self, t, x=None): # noqa: C901
"""Function to pass into Nengo node. In case of both sending and
receiving the sending frequency is regulated by comparing the local
and remote time steps. Information is sent when the current local
time step is closer to the remote time step than the next local time
step.
"""
# If t == 0, return array of zeros and reset state of class,
# empty queue of messages, close any open sockets
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we doing this? Because this indicates a simulator reset?
Maybe the socket functionality rather be implemented as a nengo.Process to properly support resets?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code was written before nengo.Process was implemented. So... if anyone has the free time to switch the code over and test it, by all means! 😄

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

if t == 0:
self.value = [0.0] * self.recv_dim
self.last_t = 0.0
self.last_packet_t = 0.0

# Empty the buffer
while not self.buffer.empty():
self.buffer.get()

self.close()
return self.value
# Initialize socket if t > 0, and it has not been initialized
if t > 0 and ((self.recv_socket is None and self.is_receiver) or
(self.send_socket is None and self.is_sender)):
self._open_socket()

# Calculate dt
self.dt = t - self.last_t
# most often that an update can be sent is every self.dt,
# so if remote dt is smaller just use self.dt for check
self.dt_remote = max(self.dt_remote, self.dt)
self.last_t = t * 1.0
self.last_active = time.time()

if self.is_sender:
# Calculate if it is time to send the next packet.
# Ideal time to send is last_packet_t + dt_remote, and we
# want to find out if current or next local time step is closest.
if (t + self.dt / 2.0) >= (self.last_packet_t + self.dt_remote):
for addr in self.dest_addr:
for port in self.dest_port:
self.send_socket.sendto(self.pack_packet(t * 1.0, x),
(addr, port))
self.last_packet_t = t * 1.0 # Copy t (which is a scalar)

if self.is_receiver:
found_item = False
if not self.buffer.empty():
# There are items (packets with future timestamps) in the
# buffer. Check the buffer for appropriate information
t_peek = self.buffer.queue[0][0]
if self._t_check(t_peek, t):
# Time stamp of first item in buffer is >= t and < t+dt,
# meaning that this is the information for the current
# time step, so it should be used.
data = self.buffer.get()
self.value = data[1]
found_item = True
elif (t_peek >= t + self.dt):
# Time stamp of first item in buffer is > t+dt (i.e. all
# items in the buffer are future packets). Assume packet
# for current time step has been lost, don't read the info,
# and wait for local sim time to catch up.
found_item = True

while not found_item:
try:
packet, addr = self.recv_socket.recvfrom(self.max_recv_len)
t_data, value = self.unpack_packet(packet)
if self._t_check(t_data, t):
self.value = value
found_item = True
elif (t_data >= t + self.dt):
self.buffer.put((t_data, value))
found_item = True

# Packet recv success! Decay idle_time_limit time
# to the user specified max_idle_time (which can be
# smaller than the socket timeout time)
self.idle_time_limit = \
max(self.idle_time_limit_min,
self.idle_time_limit * 0.9)
self.retry_backoff_time = \
max(1, self.retry_backoff_time / 2)

except (socket.error, AttributeError) as error:
found_item = False

# Socket error has occurred. Probably a timeout.
# Assume worst case, set idle_time_limit to
# idle_time_limit_max to wait for more timeouts to
# occur (this is so that the socket isn't constantly
# closed by the check_alive thread)
self.idle_time_limit = self.idle_time_limit_max

# Timeout occurred, assume packet lost.
if isinstance(error, socket.timeout):
found_item = True

# If connection was reset (somehow?), or closed by the
# idle timer (prematurely), retry the connection, and
# retry receiving the packet again.
if (hasattr(error, 'errno') and error.errno ==
errno.ECONNRESET) or self.recv_socket is None:
self._retry_connection()

warnings.warn("UDPSocket Error @ t = " + str(t) +
"s: " + str(error))

# Return retrieved value
return self.value

def close(self):
"""Close all threads and sockets."""
self._terminate_alive_check_thread()
# Double make sure all sockets are closed
self._close_send_socket()
self._close_recv_socket()
Loading