From 9b7952005bbed6393ce878c6b04d6e37bd4b5e18 Mon Sep 17 00:00:00 2001 From: delulu Date: Mon, 9 Mar 2020 17:28:16 +0800 Subject: [PATCH] ensure master2slave connection in heartbeat --- locust/rpc/zmqrpc.py | 5 ++++- locust/runners.py | 48 ++++++++++++++++++++++++++++++++++++++------ 2 files changed, 46 insertions(+), 7 deletions(-) diff --git a/locust/rpc/zmqrpc.py b/locust/rpc/zmqrpc.py index ec3f1c48b0..a7a6f8a7c1 100644 --- a/locust/rpc/zmqrpc.py +++ b/locust/rpc/zmqrpc.py @@ -13,7 +13,7 @@ def __init__(self, sock_type): @retry() def send(self, msg): - self.socket.send(msg.serialize()) + self.socket.send(msg.serialize(), zmq.NOBLOCK) @retry() def send_to_client(self, msg): @@ -32,6 +32,9 @@ def recv_from_client(self): msg = Message.unserialize(data[1]) return addr, msg + def close(self): + self.socket.close() + class Server(BaseSocket): def __init__(self, host, port): BaseSocket.__init__(self, zmq.ROUTER) diff --git a/locust/runners.py b/locust/runners.py index 7c48c8c337..15f4efbb55 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -24,6 +24,7 @@ STATE_INIT, STATE_HATCHING, STATE_RUNNING, STATE_CLEANUP, STATE_STOPPING, STATE_STOPPED, STATE_MISSING = ["ready", "hatching", "running", "cleanup", "stopping", "stopped", "missing"] SLAVE_REPORT_INTERVAL = 3.0 CPU_MONITOR_INTERVAL = 5.0 +FALLBACK_INTERVAL = 5.0 LOCUST_STATE_RUNNING, LOCUST_STATE_WAITING, LOCUST_STATE_STOPPING = ["running", "waiting", "stopping"] @@ -44,6 +45,7 @@ def __init__(self, locust_classes, options): self.exceptions = {} self.stats = global_stats self.step_load = options.step_load + self.connection_broken = False # register listener that resets stats when hatching is complete def on_hatch_complete(user_count): @@ -141,7 +143,8 @@ def hatch(): def start_locust(_): try: new_locust.run(runner=self) - except GreenletExit: + except GreenletExit as e: + logger.error("Greenlet Exit in locust run: %s" % ( e ) ) pass self.locusts.spawn(start_locust, new_locust) if len(self.locusts) % 10 == 0: @@ -426,6 +429,9 @@ def quit(self): def heartbeat_worker(self): while True: gevent.sleep(self.heartbeat_interval) + if self.connection_broken: + self.reset_connection() + continue for client in self.clients.all: if client.heartbeat < 0 and client.state != STATE_MISSING: logger.info('Slave %s failed to send heartbeat, setting state to missing.' % str(client.id)) @@ -434,9 +440,24 @@ def heartbeat_worker(self): else: client.heartbeat -= 1 + def reset_connection(self): + logger.info("Reset connection to slave") + try: + self.server.close() + self.server = rpc.Server(self.master_bind_host, self.master_bind_port) + except Exception as e: + logger.error("Exception found when resetting connection: %s" % ( e ) ) + def client_listener(self): while True: - client_id, msg = self.server.recv_from_client() + try: + client_id, msg = self.server.recv_from_client() + except Exception as e: + logger.error("Exception found when receiving from client: %s" % ( e ) ) + self.connection_broken = True + gevent.sleep(FALLBACK_INTERVAL) + continue + self.connection_broken = False msg.node_id = client_id if msg.type == "client_ready": id = msg.node_id @@ -521,12 +542,27 @@ def on_locust_error(locust_instance, exception, tb): def heartbeat(self): while True: - self.client.send(Message('heartbeat', {'state': self.slave_state, 'current_cpu_usage': self.current_cpu_usage}, self.client_id)) + try: + self.client.send(Message('heartbeat', {'state': self.slave_state, 'current_cpu_usage': self.current_cpu_usage}, self.client_id)) + except Exception as e: + logger.error("Exception found when sending heartbeat: %s" % ( e ) ) + self.reset_connection() gevent.sleep(self.heartbeat_interval) + def reset_connection(self): + logger.info("Reset connection to master") + try: + self.client.close() + self.client = rpc.Client(self.master_host, self.master_port, self.client_id) + except Exception as e: + logger.error("Exception found when resetting connection: %s" % ( e ) ) + def worker(self): while True: - msg = self.client.recv() + try: + msg = self.client.recv() + except Exception as e: + logger.error("Exception found when receiving from master: %s" % ( e ) ) if msg.type == "hatch": self.slave_state = STATE_HATCHING self.client.send(Message("hatching", None, self.client_id)) @@ -553,8 +589,8 @@ def stats_reporter(self): while True: try: self._send_stats() - except: - logger.error("Connection lost to master server. Aborting...") + except Exception as e: + logger.error("Connection lost to master server: %s. Aborting..." % (e)) break gevent.sleep(SLAVE_REPORT_INTERVAL)