Skip to content

Commit

Permalink
Add heartbeat to detect down slaves
Browse files Browse the repository at this point in the history
  • Loading branch information
Jonathan McCall committed Dec 4, 2018
1 parent 79cd5a1 commit 9dbe76f
Showing 1 changed file with 28 additions and 5 deletions.
33 changes: 28 additions & 5 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import traceback
import warnings
from uuid import uuid4
from time import time
from time import time, sleep

import gevent
import six
Expand All @@ -23,8 +23,9 @@
# global locust runner singleton
locust_runner = None

STATE_INIT, STATE_HATCHING, STATE_RUNNING, STATE_CLEANUP, STATE_STOPPED = ["ready", "hatching", "running", "cleanup", "stopped"]
STATE_INIT, STATE_HATCHING, STATE_RUNNING, STATE_CLEANUP, STATE_STOPPED, STATE_MISSING = ["ready", "hatching", "running", "cleanup", "stopped", "missing"]
SLAVE_REPORT_INTERVAL = 3.0
HEARTBEAT_THRESHOLD = 3


class LocustRunner(object):
Expand Down Expand Up @@ -223,6 +224,7 @@ def __init__(self, id, state=STATE_INIT):
self.id = id
self.state = state
self.user_count = 0
self.heartbeat = HEARTBEAT_THRESHOLD

class MasterLocustRunner(DistributedLocustRunner):
def __init__(self, *args, **kwargs):
Expand All @@ -247,6 +249,7 @@ def running(self):
self.clients = SlaveNodesDict()
self.server = rpc.Server(self.master_bind_host, self.master_bind_port)
self.greenlet = Group()
self.greenlet.spawn(self.heartbeat_worker).link_exception(callback=self.noop)
self.greenlet.spawn(self.client_listener).link_exception(callback=self.noop)

# listener that gathers info on how many locust users the slaves have spawned
Expand Down Expand Up @@ -286,7 +289,7 @@ def start_hatching(self, locust_count, hatch_rate):
self.exceptions = {}
events.master_start_hatching.fire()

for client in six.itervalues(self.clients):
for client in xrange(num_slaves):
data = {
"hatch_rate":slave_hatch_rate,
"num_clients":slave_num_clients,
Expand All @@ -304,7 +307,7 @@ def start_hatching(self, locust_count, hatch_rate):
self.state = STATE_HATCHING

def stop(self):
for client in self.clients.hatching + self.clients.running:
for client in xrange(len(self.clients)):
self.server.send(Message("stop", None, None))
events.master_stop_hatching.fire()

Expand All @@ -313,12 +316,25 @@ def quit(self):
self.server.send(Message("quit", None, None))
self.greenlet.kill(block=True)

def heartbeat_worker(self):
    """Tick every slave's heartbeat counter once per second and flag silent slaves.

    Runs forever; intended to be spawned as its own greenlet. On each tick,
    every node in ``self.clients`` is visited:

    * a node whose ``heartbeat`` counter is still non-negative has it
      decremented by one;
    * a node whose counter has dropped below zero and that is not already
      marked missing is logged once and moved to ``STATE_MISSING``.

    The counter is reset to ``HEARTBEAT_THRESHOLD`` by ``client_listener``
    whenever a "stats" message arrives from that node, so only slaves that
    stop reporting run their counter down.
    """
    while True:
        # gevent.sleep always yields to the hub, so the other greenlets keep
        # running even if the time module was imported before monkey-patching.
        gevent.sleep(1)
        for client in self.clients.values():
            if client.heartbeat >= 0:
                client.heartbeat -= 1
            elif client.state != STATE_MISSING:
                # Warn only on the transition; without this guard the same
                # warning would be repeated every second for a dead slave.
                logger.warning('Slave %s failed to send heartbeat, setting state to missing.', client.id)
                client.state = STATE_MISSING

def client_listener(self):
while True:
msg = self.server.recv()
if msg.type == "client_ready":
id = msg.node_id
self.clients[id] = SlaveNode(id)
if not id in self.clients:
self.clients[id] = SlaveNode(id)
else:
self.clients[id].state = STATE_INIT
logger.info("Client %r reported as ready. Currently %i clients ready to swarm." % (id, len(self.clients.ready)))
## emit a warning if the slave's clock seems to be out of sync with our clock
#if abs(time() - msg.data["time"]) > 5.0:
Expand All @@ -329,6 +345,8 @@ def client_listener(self):
self.state = STATE_STOPPED
logger.info("Removing %s client from running clients" % (msg.node_id))
elif msg.type == "stats":
self.clients[msg.node_id].state = msg.data["state"]
self.clients[msg.node_id].heartbeat = HEARTBEAT_THRESHOLD
events.slave_report.fire(client_id=msg.node_id, data=msg.data)
elif msg.type == "hatching":
self.clients[msg.node_id].state = STATE_HATCHING
Expand Down Expand Up @@ -360,15 +378,18 @@ def __init__(self, *args, **kwargs):
self.greenlet.spawn(self.worker).link_exception(callback=self.noop)
self.client.send(Message("client_ready", None, self.client_id))
self.greenlet.spawn(self.stats_reporter).link_exception(callback=self.noop)
self.slave_state = STATE_INIT

# register listener for when all locust users have hatched, and report it to the master node
def on_hatch_complete(user_count):
self.client.send(Message("hatch_complete", {"count":user_count}, self.client_id))
self.slave_state = STATE_RUNNING
events.hatch_complete += on_hatch_complete

# register listener that adds the current number of spawned locusts to the report that is sent to the master node
def on_report_to_master(client_id, data):
data["user_count"] = self.user_count
data["state"] = self.slave_state
events.report_to_master += on_report_to_master

# register listener that sends quit message to master
Expand All @@ -386,6 +407,7 @@ def worker(self):
while True:
msg = self.client.recv()
if msg.type == "hatch":
self.slave_state = STATE_HATCHING
self.client.send(Message("hatching", None, self.client_id))
job = msg.data
self.hatch_rate = job["hatch_rate"]
Expand All @@ -396,6 +418,7 @@ def worker(self):
self.stop()
self.client.send(Message("client_stopped", None, self.client_id))
self.client.send(Message("client_ready", None, self.client_id))
self.slave_state = STATE_INIT
elif msg.type == "quit":
logger.info("Got quit message from master, shutting down...")
self.stop()
Expand Down

0 comments on commit 9dbe76f

Please sign in to comment.