Skip to content

Commit

Permalink
enhance exception handling
Browse files Browse the repository at this point in the history
  • Loading branch information
delulu committed Mar 16, 2020
1 parent 13b6ab0 commit 8b26f51
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 27 deletions.
7 changes: 7 additions & 0 deletions locust/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,10 @@ class RescheduleTaskImmediately(Exception):
"""
When raised in a Locust task, another locust task will be rescheduled immediately
"""

class RPCError(Exception):
"""
Exception that shows bad or broken network.
When raised from zmqrpc, RPC should be reestablished.
"""
44 changes: 31 additions & 13 deletions locust/rpc/zmqrpc.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import zmq.green as zmq

from .protocol import Message
from locust.util.exception_handler import retry
from locust.exception import RPCError
import zmq.error as zmqerr
import msgpack.exceptions as msgerr

class BaseSocket(object):
def __init__(self, sock_type):
Expand All @@ -13,23 +15,37 @@ def __init__(self, sock_type):

@retry()
def send(self, msg):
self.socket.send(msg.serialize(), zmq.NOBLOCK)
try:
self.socket.send(msg.serialize(), zmq.NOBLOCK)
except zmqerr.ZMQError as e:
raise RPCError("ZMQ sent failure: %s" % (e) )

@retry()
def send_to_client(self, msg):
self.socket.send_multipart([msg.node_id.encode(), msg.serialize()])
try:
self.socket.send_multipart([msg.node_id.encode(), msg.serialize()])
except zmqerr.ZMQError as e:
raise RPCError("ZMQ sent failure: %s" % (e) )

@retry()
def recv(self):
data = self.socket.recv()
msg = Message.unserialize(data)
try:
data = self.socket.recv()
msg = Message.unserialize(data)
except msgerr.ExtraData as e:
raise RPCError("Interrupted message: %s" % (e) )
except zmqerr.ZMQError as e:
raise RPCError("Network broken: %s" % (e) )
return msg

@retry()
def recv_from_client(self):
data = self.socket.recv_multipart()
addr = data[0].decode()
msg = Message.unserialize(data[1])
try:
data = self.socket.recv_multipart()
addr = data[0].decode()
msg = Message.unserialize(data[1])
except (UnicodeDecodeError, msgerr.ExtraData) as e:
raise RPCError("Interrupted message: %s" % (e) )
except zmqerr.ZMQError as e:
raise RPCError("Network broken: %s" % (e) )
return addr, msg

def close(self):
Expand All @@ -41,12 +57,14 @@ def __init__(self, host, port):
if port == 0:
self.port = self.socket.bind_to_random_port("tcp://%s" % host)
else:
self.socket.bind("tcp://%s:%i" % (host, port))
self.port = port
try:
self.socket.bind("tcp://%s:%i" % (host, port))
self.port = port
except zmqerr.ZMQError as e:
raise RPCError("Socket bind failure: %s" % (e) )

class Client(BaseSocket):
def __init__(self, host, port, identity):
BaseSocket.__init__(self, zmq.DEALER)
self.socket.setsockopt(zmq.IDENTITY, identity.encode())
self.socket.connect("tcp://%s:%i" % (host, port))

29 changes: 15 additions & 14 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
from .rpc import Message, rpc
from .stats import RequestStats, setup_distributed_stats_event_listeners

from .exception import RPCError

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -440,15 +442,15 @@ def reset_connection(self):
try:
self.server.close()
self.server = rpc.Server(self.master_bind_host, self.master_bind_port)
except Exception as e:
logger.error("Exception found when resetting connection: %s" % ( e ) )
except RPCError as e:
logger.error("Temporay failure when resetting connection: %s, will retry later." % ( e ) )

def client_listener(self):
while True:
try:
client_id, msg = self.server.recv_from_client()
except Exception as e:
logger.error("Exception found when receiving from client: %s" % ( e ) )
except RPCError as e:
logger.error("RPCError found when receiving from client: %s" % ( e ) )
self.connection_broken = True
gevent.sleep(FALLBACK_INTERVAL)
continue
Expand Down Expand Up @@ -539,8 +541,8 @@ def heartbeat(self):
while True:
try:
self.client.send(Message('heartbeat', {'state': self.slave_state, 'current_cpu_usage': self.current_cpu_usage}, self.client_id))
except Exception as e:
logger.error("Exception found when sending heartbeat: %s" % ( e ) )
except RPCError as e:
logger.error("RPCError found when sending heartbeat: %s" % ( e ) )
self.reset_connection()
gevent.sleep(HEARTBEAT_INTERVAL)

Expand All @@ -549,15 +551,16 @@ def reset_connection(self):
try:
self.client.close()
self.client = rpc.Client(self.master_host, self.master_port, self.client_id)
except Exception as e:
logger.error("Exception found when resetting connection: %s" % ( e ) )
except RPCError as e:
logger.error("Temporary failure when resetting connection: %s, will retry later." % ( e ) )

def worker(self):
while True:
try:
msg = self.client.recv()
except Exception as e:
logger.error("Exception found when receiving from master: %s" % ( e ) )
except RPCError as e:
logger.error("RPCError found when receiving from master: %s" % ( e ) )
continue
if msg.type == "hatch":
self.slave_state = STATE_HATCHING
self.client.send(Message("hatching", None, self.client_id))
Expand All @@ -584,10 +587,8 @@ def stats_reporter(self):
while True:
try:
self._send_stats()
except Exception as e:
logger.error("Connection lost to master server: %s. Aborting..." % (e))
break

except RPCError as e:
logger.error("Temporary connection lost to master server: %s, will retry later." % (e))
gevent.sleep(SLAVE_REPORT_INTERVAL)

def _send_stats(self):
Expand Down

0 comments on commit 8b26f51

Please sign in to comment.