Skip to content

Commit

Permalink
ensure master2slave connection in heartbeat
Browse files (browse the repository at this point in the history)
  • Loading branch information
delulu committed Mar 9, 2020
1 parent ed3b263 commit 105d15a
Showing 1 changed file with 42 additions and 6 deletions.
48 changes: 42 additions & 6 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
# Lifecycle state names. They are compared against a connected client's
# ``state`` (e.g. STATE_MISSING after missed heartbeats) and assigned to a
# slave's ``slave_state`` (e.g. STATE_HATCHING when a "hatch" message arrives).
STATE_INIT, STATE_HATCHING, STATE_RUNNING, STATE_CLEANUP, STATE_STOPPING, STATE_STOPPED, STATE_MISSING = ["ready", "hatching", "running", "cleanup", "stopping", "stopped", "missing"]
# Delay passed to gevent.sleep() between two stats reports in stats_reporter().
SLAVE_REPORT_INTERVAL = 3.0
# NOTE(review): presumably the CPU-usage sampling period; its use is not
# visible in this part of the file -- confirm against the monitor code.
CPU_MONITOR_INTERVAL = 5.0
# Delay passed to gevent.sleep() by client_listener() after a failed receive,
# before it retries.
FALLBACK_INTERVAL = 5.0

LOCUST_STATE_RUNNING, LOCUST_STATE_WAITING, LOCUST_STATE_STOPPING = ["running", "waiting", "stopping"]

Expand All @@ -44,6 +45,7 @@ def __init__(self, locust_classes, options):
self.exceptions = {}
self.stats = global_stats
self.step_load = options.step_load
self.connection_broken = False

# register listener that resets stats when hatching is complete
def on_hatch_complete(user_count):
Expand Down Expand Up @@ -141,7 +143,8 @@ def hatch():
def start_locust(_):
try:
new_locust.run(runner=self)
except GreenletExit:
except GreenletExit as e:
logger.error("Greenlet Exit in locust run: %s" % ( e ) )
pass
self.locusts.spawn(start_locust, new_locust)
if len(self.locusts) % 10 == 0:
Expand Down Expand Up @@ -426,6 +429,9 @@ def quit(self):
def heartbeat_worker(self):
while True:
gevent.sleep(self.heartbeat_interval)
if self.connection_broken:
self.reset_connection()
continue
for client in self.clients.all:
if client.heartbeat < 0 and client.state != STATE_MISSING:
logger.info('Slave %s failed to send heartbeat, setting state to missing.' % str(client.id))
Expand All @@ -434,9 +440,24 @@ def heartbeat_worker(self):
else:
client.heartbeat -= 1

def reset_connection(self):
    """Replace ``self.server`` with a freshly created ``rpc.Server``.

    Called from ``heartbeat_worker`` while ``self.connection_broken`` is set.
    The old server is closed on a best-effort basis and a new one is created
    from ``self.master_bind_host`` / ``self.master_bind_port``.

    Closing and re-creating are guarded separately: a failure while closing
    the old (already broken) server must not prevent the replacement from
    being created, otherwise the broken handle would be kept indefinitely.

    Never raises; every failure is logged and the caller retries on its next
    iteration.
    """
    logger.info("Reset connection to slave")
    try:
        self.server.close()
    except Exception as e:
        # Best effort only: the old server is being discarded anyway.
        logger.error("Exception found when resetting connection: %s", e)
    try:
        self.server = rpc.Server(self.master_bind_host, self.master_bind_port)
    except Exception as e:
        # Keep the previous handle; the next reset attempt will try again.
        logger.error("Exception found when resetting connection: %s", e)

def client_listener(self):
while True:
client_id, msg = self.server.recv_from_client()
try:
client_id, msg = self.server.recv_from_client()
except Exception as e:
logger.error("Exception found when receiving from client: %s" % ( e ) )
self.connection_broken = True
gevent.sleep(FALLBACK_INTERVAL)
continue
self.connection_broken = False
msg.node_id = client_id
if msg.type == "client_ready":
id = msg.node_id
Expand Down Expand Up @@ -521,12 +542,27 @@ def on_locust_error(locust_instance, exception, tb):

def heartbeat(self):
while True:
self.client.send(Message('heartbeat', {'state': self.slave_state, 'current_cpu_usage': self.current_cpu_usage}, self.client_id))
try:
self.client.send(Message('heartbeat', {'state': self.slave_state, 'current_cpu_usage': self.current_cpu_usage}, self.client_id))
except Exception as e:
logger.error("Exception found when sending heartbeat: %s" % ( e ) )
self.reset_connection()
gevent.sleep(self.heartbeat_interval)

def reset_connection(self):
    """Replace ``self.client`` with a freshly created ``rpc.Client``.

    Called from ``heartbeat`` when sending a heartbeat message raised. The
    old client is closed on a best-effort basis and a new one is created from
    ``self.master_host``, ``self.master_port`` and ``self.client_id``.

    Closing and re-creating are guarded separately: a failure while closing
    the old (already broken) client must not prevent the reconnect attempt,
    otherwise the broken handle would be kept indefinitely.

    Never raises; every failure is logged and the heartbeat loop retries on
    its next iteration.
    """
    logger.info("Reset connection to master")
    try:
        self.client.close()
    except Exception as e:
        # Best effort only: the old client is being discarded anyway.
        logger.error("Exception found when resetting connection: %s", e)
    try:
        self.client = rpc.Client(self.master_host, self.master_port, self.client_id)
    except Exception as e:
        # Keep the previous handle; the next failed heartbeat triggers a retry.
        logger.error("Exception found when resetting connection: %s", e)

def worker(self):
while True:
msg = self.client.recv()
try:
msg = self.client.recv()
except Exception as e:
logger.error("Exception found when receiving from master: %s" % ( e ) )
if msg.type == "hatch":
self.slave_state = STATE_HATCHING
self.client.send(Message("hatching", None, self.client_id))
Expand All @@ -553,8 +589,8 @@ def stats_reporter(self):
while True:
try:
self._send_stats()
except:
logger.error("Connection lost to master server. Aborting...")
except Exception as e:
logger.error("Connection lost to master server: %s. Aborting..." % (e))
break

gevent.sleep(SLAVE_REPORT_INTERVAL)
Expand Down

0 comments on commit 105d15a

Please sign in to comment.