Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple Runner and Locust code by introducing Locust.start and Locust.stop methods #1306

Merged
merged 6 commits into from
Apr 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 89 additions & 25 deletions locust/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
from .clients import HttpSession
from .exception import (InterruptTaskSet, LocustError, RescheduleTask,
RescheduleTaskImmediately, StopLocust, MissingWaitTimeError)
from .runners import STATE_CLEANUP, LOCUST_STATE_RUNNING, LOCUST_STATE_STOPPING, LOCUST_STATE_WAITING
from .util import deprecation


logger = logging.getLogger(__name__)


LOCUST_STATE_RUNNING, LOCUST_STATE_WAITING, LOCUST_STATE_STOPPING = ["running", "waiting", "stopping"]


def task(weight=1):
"""
Used as a convenience decorator to be able to declare tasks for a Locust or a TaskSet
Expand Down Expand Up @@ -253,13 +255,24 @@ def _set_setup_flag(cls):
def _set_teardown_flag(cls):
cls._teardown_is_set = True

def on_start(self):
"""
Hook for end-user scripts for running code when a Locust user starts running
"""
pass

def on_stop(self):
"""
Hook for end-user scripts for running code when a Locust user stops running
"""
pass

def run(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs

try:
if hasattr(self, "on_start"):
self.on_start()
self.on_start()
except InterruptTaskSet as e:
if e.reschedule:
raise RescheduleTaskImmediately(e.reschedule).with_traceback(sys.exc_info()[2])
Expand All @@ -272,27 +285,22 @@ def run(self, *args, **kwargs):
self.schedule_task(self.get_next_task())

try:
if self.locust._state == LOCUST_STATE_STOPPING:
raise GreenletExit()
self._check_stop_condition()
self.execute_next_task()
if self.locust._state == LOCUST_STATE_STOPPING:
raise GreenletExit()
except RescheduleTaskImmediately:
if self.locust._state == LOCUST_STATE_STOPPING:
raise GreenletExit()
pass
except RescheduleTask:
self.wait()
else:
self.wait()
except InterruptTaskSet as e:
self.on_stop()
if e.reschedule:
raise RescheduleTaskImmediately(e.reschedule) from e
else:
raise RescheduleTask(e.reschedule) from e
except StopLocust:
raise
except GreenletExit:
except (StopLocust, GreenletExit):
self.on_stop()
raise
except Exception as e:
self.locust.environment.events.locust_error.fire(locust_instance=self, exception=e, tb=sys.exc_info()[2])
Expand Down Expand Up @@ -361,13 +369,19 @@ class Tasks(TaskSet):
))

def wait(self):
self._check_stop_condition()
self.locust._state = LOCUST_STATE_WAITING
self._sleep(self.wait_time())
self._check_stop_condition()
self.locust._state = LOCUST_STATE_RUNNING

def _sleep(self, seconds):
gevent.sleep(seconds)

def _check_stop_condition(self):
if self.locust._state == LOCUST_STATE_STOPPING:
raise StopLocust()

def interrupt(self, reschedule=True):
"""
Interrupt the TaskSet and hand over execution control back to the parent TaskSet.
Expand Down Expand Up @@ -527,7 +541,8 @@ class ForumPage(TaskSet):
_setup_has_run = False # Internal state to see if we have already run
_teardown_is_set = False # Internal state to see if we have already run
_lock = gevent.lock.Semaphore() # Lock to make sure setup is only run once
_state = False
_state = None
_greenlet = None

def __init__(self, environment):
super(Locust, self).__init__()
Expand Down Expand Up @@ -556,21 +571,70 @@ def _set_setup_flag(cls):
def _set_teardown_flag(cls):
cls._teardown_is_set = True

def run(self, runner=None):
def on_start(self):
"""
Hook for end-user scripts for running code when a Locust user starts running
"""
pass

def on_stop(self):
"""
Hook for end-user scripts for running code when a Locust user stops running
"""
pass

def run(self):
self._state = LOCUST_STATE_RUNNING
task_set_instance = DefaultTaskSet(self)
try:
if hasattr(self, "on_start"):
self.on_start()
# run the task_set on_start method, if it has one
self.on_start()

task_set_instance.run()
except StopLocust:
pass
except GreenletExit as e:
if runner:
runner.state = STATE_CLEANUP
# Run the task_set on_stop method, if it has one
if hasattr(task_set_instance, "on_stop"):
task_set_instance.on_stop()
raise # Maybe something relies on this except being raised?
except (GreenletExit, StopLocust) as e:
# run the on_stop method, if it has one
self.on_stop()

def start(self, gevent_group):
"""
Start a greenlet that runs this locust instance.

*Arguments*:

* gevent_group: gevent.pool.Group instance where the greenlet will be spawned.

Returns the spawned greenlet.
"""
def run_locust(user):
"""
Main function for Locust user greenlet. It's important that this function takes the locust
instance as argument, since we use greenlet_instance.args[0] to retrieve a reference to the
locust instance.
"""
user.run()
self._greenlet = gevent_group.spawn(run_locust, self)
return self._greenlet

def stop(self, gevent_group, force=False):
"""
Stop the locust user greenlet that exists in the gevent_group.
This method is not meant to be called from within of the Locust's greenlet.

*Arguments*:

* gevent_group: gevent.pool.Group instance where the greenlet will be spawned.
* force: If False (the default) the stopping is done gracefully by setting the state to LOCUST_STATE_STOPPING
which will make the Locust instance stop once any currently running task is complete and on_stop
methods are called. If force is True the greenlet will be killed immediately.

Returns True if the greenlet was killed immediately, otherwise False
"""
if force or self._state == LOCUST_STATE_WAITING:
gevent_group.killone(self._greenlet)
return True
elif self._state == LOCUST_STATE_RUNNING:
self._state = LOCUST_STATE_STOPPING
return False


class HttpLocust(Locust):
Expand Down
50 changes: 20 additions & 30 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import gevent
import psutil
from gevent import GreenletExit
from gevent.pool import Group

from .rpc import Message, rpc
Expand All @@ -24,7 +23,6 @@
HEARTBEAT_INTERVAL = 1
HEARTBEAT_LIVENESS = 3

LOCUST_STATE_RUNNING, LOCUST_STATE_WAITING, LOCUST_STATE_STOPPING = ["running", "waiting", "stopping"]

class LocustRunner(object):
def __init__(self, environment, locust_classes):
Expand Down Expand Up @@ -135,15 +133,10 @@ def hatch():
self.environment.events.hatch_complete.fire(user_count=len(self.locusts))
return

locust = bucket.pop(random.randint(0, len(bucket)-1))
occurrence_count[locust.__name__] += 1
new_locust = locust(self.environment)
def start_locust(_):
try:
new_locust.run(runner=self)
except GreenletExit:
pass
self.locusts.spawn(start_locust, new_locust)
locust_class = bucket.pop(random.randint(0, len(bucket)-1))
occurrence_count[locust_class.__name__] += 1
new_locust = locust_class(self.environment)
new_locust.start(self.locusts)
if len(self.locusts) % 10 == 0:
logger.debug("%i locusts hatched" % len(self.locusts))
if bucket:
Expand All @@ -161,36 +154,32 @@ def kill_locusts(self, kill_count):
bucket = self.weight_locusts(kill_count)
kill_count = len(bucket)
logger.info("Killing %i locusts" % kill_count)
dying = []
to_kill = []
for g in self.locusts:
for l in bucket:
if l == type(g.args[0]):
dying.append(g)
user = g.args[0]
if l == type(user):
to_kill.append(user)
bucket.remove(l)
break
self.kill_locust_greenlets(dying)
self.kill_locust_instances(to_kill)
self.environment.events.hatch_complete.fire(user_count=self.user_count)

def kill_locust_greenlets(self, greenlets):
"""
Kill running locust greenlets. If environment.stop_timeout is set, we try to stop the
Locust users gracefully
"""

def kill_locust_instances(self, users):
if self.environment.stop_timeout:
dying = Group()
for g in greenlets:
locust = g.args[0]
if locust._state == LOCUST_STATE_WAITING:
self.locusts.killone(g)
else:
locust._state = LOCUST_STATE_STOPPING
dying.add(g)
for user in users:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess you're using "user/s" instead of "locust/s" because we're renaming it? it is confusing when the comments talk about locusts but the code about users :P

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I figured we're renaming it, and even before we've done that I don't think user can be mistaken for anything else in that context.

if not user.stop(self.locusts, force=False):
# Locust.stop() returns False if the greenlet was not killed, so we'll need
# to add it's greenlet to our dying Group so we can wait for it to finish it's task
dying.add(user._greenlet)
if not dying.join(timeout=self.environment.stop_timeout):
logger.info("Not all locusts finished their tasks & terminated in %s seconds. Killing them..." % self.environment.stop_timeout)
dying.kill(block=True)
else:
for g in greenlets:
self.locusts.killone(g)
for user in users:
user.stop(self.locusts, force=True)

def monitor_cpu(self):
process = psutil.Process()
Expand Down Expand Up @@ -252,10 +241,11 @@ def stepload_worker(self, hatch_rate, step_clients_growth, step_duration):
gevent.sleep(step_duration)

def stop(self):
self.state = STATE_CLEANUP
# if we are currently hatching locusts we need to kill the hatching greenlet first
if self.hatching_greenlet and not self.hatching_greenlet.ready():
self.hatching_greenlet.kill(block=True)
self.kill_locust_greenlets([g for g in self.locusts])
self.kill_locust_instances([g.args[0] for g in self.locusts])
self.state = STATE_STOPPED
self.cpu_log_warning()
self.environment.events.locust_stop_hatching.fire()
Expand Down
Loading