Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple Runner and Locust code by introducing Locust.start and Locust.stop methods #1306

Merged
merged 6 commits into from
Apr 3, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 60 additions & 18 deletions locust/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
from .clients import HttpSession
from .exception import (InterruptTaskSet, LocustError, RescheduleTask,
RescheduleTaskImmediately, StopLocust, MissingWaitTimeError)
from .runners import STATE_CLEANUP, LOCUST_STATE_RUNNING, LOCUST_STATE_STOPPING, LOCUST_STATE_WAITING
from .util import deprecation


logger = logging.getLogger(__name__)


LOCUST_STATE_RUNNING, LOCUST_STATE_WAITING, LOCUST_STATE_STOPPING = ["running", "waiting", "stopping"]


def task(weight=1):
"""
Used as a convenience decorator to be able to declare tasks for a Locust or a TaskSet
Expand Down Expand Up @@ -272,14 +274,9 @@ def run(self, *args, **kwargs):
self.schedule_task(self.get_next_task())

try:
if self.locust._state == LOCUST_STATE_STOPPING:
raise GreenletExit()
self._check_stop_condition()
self.execute_next_task()
if self.locust._state == LOCUST_STATE_STOPPING:
raise GreenletExit()
except RescheduleTaskImmediately:
if self.locust._state == LOCUST_STATE_STOPPING:
raise GreenletExit()
pass
except RescheduleTask:
self.wait()
Expand Down Expand Up @@ -361,13 +358,19 @@ class Tasks(TaskSet):
))

def wait(self):
self._check_stop_condition()
self.locust._state = LOCUST_STATE_WAITING
self._sleep(self.wait_time())
self._check_stop_condition()
self.locust._state = LOCUST_STATE_RUNNING

def _sleep(self, seconds):
gevent.sleep(seconds)

def _check_stop_condition(self):
if self.locust._state == LOCUST_STATE_STOPPING:
raise StopLocust()

def interrupt(self, reschedule=True):
"""
Interrupt the TaskSet and hand over execution control back to the parent TaskSet.
Expand Down Expand Up @@ -527,7 +530,8 @@ class ForumPage(TaskSet):
_setup_has_run = False # Internal state to see if we have already run
_teardown_is_set = False # Internal state to see if we have already run
_lock = gevent.lock.Semaphore() # Lock to make sure setup is only run once
_state = False
_state = None
_greenlet = None

def __init__(self, environment):
super(Locust, self).__init__()
Expand Down Expand Up @@ -556,21 +560,59 @@ def _set_setup_flag(cls):
def _set_teardown_flag(cls):
cls._teardown_is_set = True

def run(self, runner=None):
def run(self):
self._state = LOCUST_STATE_RUNNING
task_set_instance = self._task_set(self)
try:
if hasattr(self, "on_start"):
Copy link
Collaborator

@cyberw cyberw Apr 2, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it be possible to define an on_start/on_stop method on the TaskSet base class that does nothing? Instead of checking whether they exist...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that might be better. I guess that any performance difference should be negligible (haven't measure so I don't even know which one is fastest).

# run the task_set on_start method, if it has one
self.on_start()
task_set_instance.run()
except StopLocust:
pass
except GreenletExit as e:
if runner:
runner.state = STATE_CLEANUP
# Run the task_set on_stop method, if it has one
if hasattr(task_set_instance, "on_stop"):
task_set_instance.on_stop()
raise # Maybe something relies on this except being raised?
except (GreenletExit, StopLocust) as e:
if hasattr(self, "on_stop"):
# run the task_set on_stop method, if it has one
self.on_stop()

def start(self, gevent_group):
"""
Start a greenlet that runs this locust instance.

*Arguments*:

* gevent_group: gevent.pool.Group instance where the greenlet will be spawned.

Returns the spawned greenlet.
"""
def run_locust(user):
"""
Main function gor Locust user greenlet. It's important that this function takes the locust
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gor? :P

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, will fix :)

instance as argument, since we use greenlet_instance.args[0] to retrieve a reference to the
locust instance.
"""
user.run()
self._greenlet = gevent_group.spawn(run_locust, self)
return self._greenlet

def stop(self, gevent_group, force=False):
"""
Stop the locust user greenlet that exists in the gevent_group.
This method is not meant to be called from within of the Locust's greenlet.

*Arguments*:

* gevent_group: gevent.pool.Group instance where the greenlet will be spawned.
* force: If False (the default) the stopping is done gracefully by setting the state to LOCUST_STATE_STOPPING
which will make the Locust instance stop once any currently running task is complete and on_stop
methods are called. If force is True the greenlet will be killed immediately.

Returns True if the greenlet was killed immediately, otherwise False
"""
if force or self._state == LOCUST_STATE_WAITING:
gevent_group.killone(self._greenlet)
return True
elif self._state == LOCUST_STATE_RUNNING:
self._state = LOCUST_STATE_STOPPING
return False


class HttpLocust(Locust):
Expand Down
50 changes: 20 additions & 30 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import gevent
import psutil
from gevent import GreenletExit
from gevent.pool import Group

from .rpc import Message, rpc
Expand All @@ -24,7 +23,6 @@
HEARTBEAT_INTERVAL = 1
HEARTBEAT_LIVENESS = 3

LOCUST_STATE_RUNNING, LOCUST_STATE_WAITING, LOCUST_STATE_STOPPING = ["running", "waiting", "stopping"]

class LocustRunner(object):
def __init__(self, environment, locust_classes):
Expand Down Expand Up @@ -135,15 +133,10 @@ def hatch():
self.environment.events.hatch_complete.fire(user_count=len(self.locusts))
return

locust = bucket.pop(random.randint(0, len(bucket)-1))
occurrence_count[locust.__name__] += 1
new_locust = locust(self.environment)
def start_locust(_):
try:
new_locust.run(runner=self)
except GreenletExit:
pass
self.locusts.spawn(start_locust, new_locust)
locust_class = bucket.pop(random.randint(0, len(bucket)-1))
occurrence_count[locust_class.__name__] += 1
new_locust = locust_class(self.environment)
new_locust.start(self.locusts)
if len(self.locusts) % 10 == 0:
logger.debug("%i locusts hatched" % len(self.locusts))
if bucket:
Expand All @@ -161,36 +154,32 @@ def kill_locusts(self, kill_count):
bucket = self.weight_locusts(kill_count)
kill_count = len(bucket)
logger.info("Killing %i locusts" % kill_count)
dying = []
to_kill = []
for g in self.locusts:
for l in bucket:
if l == type(g.args[0]):
dying.append(g)
user = g.args[0]
if l == type(user):
to_kill.append(user)
bucket.remove(l)
break
self.kill_locust_greenlets(dying)
self.kill_locust_instances(to_kill)
self.environment.events.hatch_complete.fire(user_count=self.user_count)

def kill_locust_greenlets(self, greenlets):
"""
Kill running locust greenlets. If environment.stop_timeout is set, we try to stop the
Locust users gracefully
"""

def kill_locust_instances(self, users):
if self.environment.stop_timeout:
dying = Group()
for g in greenlets:
locust = g.args[0]
if locust._state == LOCUST_STATE_WAITING:
self.locusts.killone(g)
else:
locust._state = LOCUST_STATE_STOPPING
dying.add(g)
for user in users:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess you're using "user/s" instead of "locust/s" because we're renaming it? it is confusing when the comments talk about locusts but the code about users :P

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I figured we're renaming it, and even before we've done that I don't think user can be mistaken for anything else in that context.

if not user.stop(self.locusts, force=False):
# Locust.stop() returns False if the greenlet was not killed, so we'll need
# to add it's greenlet to our dying Group so we can wait for it to finish it's task
dying.add(user._greenlet)
if not dying.join(timeout=self.environment.stop_timeout):
logger.info("Not all locusts finished their tasks & terminated in %s seconds. Killing them..." % self.environment.stop_timeout)
dying.kill(block=True)
else:
for g in greenlets:
self.locusts.killone(g)
for user in users:
user.stop(self.locusts, force=True)

def monitor_cpu(self):
process = psutil.Process()
Expand Down Expand Up @@ -252,10 +241,11 @@ def stepload_worker(self, hatch_rate, step_clients_growth, step_duration):
gevent.sleep(step_duration)

def stop(self):
self.state = STATE_CLEANUP
# if we are currently hatching locusts we need to kill the hatching greenlet first
if self.hatching_greenlet and not self.hatching_greenlet.ready():
self.hatching_greenlet.kill(block=True)
self.kill_locust_greenlets([g for g in self.locusts])
self.kill_locust_instances([g.args[0] for g in self.locusts])
self.state = STATE_STOPPED
self.cpu_log_warning()
self.environment.events.locust_stop_hatching.fire()
Expand Down
76 changes: 76 additions & 0 deletions locust/test/test_locust_class.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import gevent
from gevent import sleep
from gevent.pool import Group

from locust import InterruptTaskSet, ResponseError
from locust.core import HttpLocust, Locust, TaskSet, task
from locust.env import Environment
Expand Down Expand Up @@ -393,6 +397,78 @@ def t2(self):
l.run()
self.assertTrue(l.t1_executed)
self.assertTrue(l.t2_executed)

def test_locust_start(self):
class TestUser(Locust):
wait_time = constant(0.1)
test_state = 0
@task
def t(self):
self.test_state = 1
sleep(0.1)
raise StopLocust()
group = Group()
user = TestUser(self.environment)
greenlet = user.start(group)
sleep(0)
self.assertEqual(1, len(group))
self.assertIn(greenlet, group)
self.assertEqual(1, user.test_state)
timeout = gevent.Timeout(1)
timeout.start()
group.join()
timeout.cancel()

def test_locust_graceful_stop(self):
class TestUser(Locust):
wait_time = constant(0)
test_state = 0
@task
def t(self):
self.test_state = 1
sleep(0.1)
self.test_state = 2

group = Group()
user = TestUser(self.environment)
greenlet = user.start(group)
sleep(0)
self.assertEqual(1, user.test_state)

# stop Locust instance gracefully
user.stop(group, force=False)
sleep(0)
# make sure instance is not killed right away
self.assertIn(greenlet, group)
self.assertEqual(1, user.test_state)
sleep(0.2)
# check that locust instance has now died and that the task got to finish
self.assertEqual(0, len(group))
self.assertEqual(2, user.test_state)

def test_locust_forced_stop(self):
class TestUser(Locust):
wait_time = constant(0)
test_state = 0
@task
def t(self):
self.test_state = 1
sleep(0.1)
self.test_state = 2

group = Group()
user = TestUser(self.environment)
greenlet = user.start(group)
sleep(0)
self.assertIn(greenlet, group)
self.assertEqual(1, user.test_state)

# stop Locust instance gracefully
user.stop(group, force=True)
sleep(0)
# make sure instance is killed right away, and that the task did NOT get to finish
self.assertEqual(0, len(group))
self.assertEqual(1, user.test_state)


class TestWebLocustClass(WebserverTestCase):
Expand Down
32 changes: 31 additions & 1 deletion locust/test/test_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -901,11 +901,12 @@ class MyTaskSet(TaskSet):

class MyTestLocust(Locust):
tasks = [MyTaskSet]
wait_time = constant(0)

environment = create_environment(mocked_options())
environment.stop_timeout = short_time
runner = LocalLocustRunner(environment, [MyTestLocust])
runner.start(1, 1)
runner.start(1, 1, wait=True)
gevent.sleep(0)
timeout = gevent.Timeout(short_time)
timeout.start()
Expand All @@ -917,6 +918,35 @@ class MyTestLocust(Locust):
finally:
timeout.cancel()

def test_stop_timeout_with_interrupt_no_reschedule(self):
state = [0]
class MySubTaskSet(TaskSet):
@task
def a_task(self):
gevent.sleep(0.1)
state[0] = 1
self.interrupt(reschedule=False)

class MyTestLocust(Locust):
tasks = [MySubTaskSet]
wait_time = constant(3)

environment = create_environment(mocked_options())
environment.stop_timeout = 0.3
runner = LocalLocustRunner(environment, [MyTestLocust])
runner.start(1, 1, wait=True)
gevent.sleep(0)
timeout = gevent.Timeout(0.11)
timeout.start()
try:
runner.quit()
runner.greenlet.join()
except gevent.Timeout:
self.fail("Got Timeout exception. Interrupted locusts should exit immediately during stop_timeout.")
finally:
timeout.cancel()
self.assertEqual(1, state[0])

def test_kill_locusts_with_stop_timeout(self):
short_time = 0.05
class MyTaskSet(TaskSet):
Expand Down