diff --git a/locust/core.py b/locust/core.py index 07932b9be6..cca8507451 100644 --- a/locust/core.py +++ b/locust/core.py @@ -17,13 +17,15 @@ from .clients import HttpSession from .exception import (InterruptTaskSet, LocustError, RescheduleTask, RescheduleTaskImmediately, StopLocust, MissingWaitTimeError) -from .runners import STATE_CLEANUP, LOCUST_STATE_RUNNING, LOCUST_STATE_STOPPING, LOCUST_STATE_WAITING from .util import deprecation logger = logging.getLogger(__name__) +LOCUST_STATE_RUNNING, LOCUST_STATE_WAITING, LOCUST_STATE_STOPPING = ["running", "waiting", "stopping"] + + def task(weight=1): """ Used as a convenience decorator to be able to declare tasks for a Locust or a TaskSet @@ -253,13 +255,24 @@ def _set_setup_flag(cls): def _set_teardown_flag(cls): cls._teardown_is_set = True + def on_start(self): + """ + Hook for end-user scripts for running code when a Locust user starts running + """ + pass + + def on_stop(self): + """ + Hook for end-user scripts for running code when a Locust user stops running + """ + pass + def run(self, *args, **kwargs): self.args = args self.kwargs = kwargs try: - if hasattr(self, "on_start"): - self.on_start() + self.on_start() except InterruptTaskSet as e: if e.reschedule: raise RescheduleTaskImmediately(e.reschedule).with_traceback(sys.exc_info()[2]) @@ -272,27 +285,22 @@ def run(self, *args, **kwargs): self.schedule_task(self.get_next_task()) try: - if self.locust._state == LOCUST_STATE_STOPPING: - raise GreenletExit() + self._check_stop_condition() self.execute_next_task() - if self.locust._state == LOCUST_STATE_STOPPING: - raise GreenletExit() except RescheduleTaskImmediately: - if self.locust._state == LOCUST_STATE_STOPPING: - raise GreenletExit() pass except RescheduleTask: self.wait() else: self.wait() except InterruptTaskSet as e: + self.on_stop() if e.reschedule: raise RescheduleTaskImmediately(e.reschedule) from e else: raise RescheduleTask(e.reschedule) from e - except StopLocust: - raise - except GreenletExit: + except (StopLocust, GreenletExit): + self.on_stop() raise except Exception as e: self.locust.environment.events.locust_error.fire(locust_instance=self, exception=e, tb=sys.exc_info()[2]) @@ -361,13 +369,19 @@ class Tasks(TaskSet): )) def wait(self): + self._check_stop_condition() self.locust._state = LOCUST_STATE_WAITING self._sleep(self.wait_time()) + self._check_stop_condition() self.locust._state = LOCUST_STATE_RUNNING def _sleep(self, seconds): gevent.sleep(seconds) + def _check_stop_condition(self): + if self.locust._state == LOCUST_STATE_STOPPING: + raise StopLocust() + def interrupt(self, reschedule=True): """ Interrupt the TaskSet and hand over execution control back to the parent TaskSet. @@ -527,7 +541,8 @@ class ForumPage(TaskSet): _setup_has_run = False # Internal state to see if we have already run _teardown_is_set = False # Internal state to see if we have already run _lock = gevent.lock.Semaphore() # Lock to make sure setup is only run once - _state = False + _state = None + _greenlet = None def __init__(self, environment): super(Locust, self).__init__() @@ -556,21 +571,70 @@ def _set_setup_flag(cls): def _set_teardown_flag(cls): cls._teardown_is_set = True - def run(self, runner=None): + def on_start(self): + """ + Hook for end-user scripts for running code when a Locust user starts running + """ + pass + + def on_stop(self): + """ + Hook for end-user scripts for running code when a Locust user stops running + """ + pass + + def run(self): + self._state = LOCUST_STATE_RUNNING task_set_instance = DefaultTaskSet(self) try: - if hasattr(self, "on_start"): - self.on_start() + # run the task_set on_start method, if it has one + self.on_start() + task_set_instance.run() - except StopLocust: - pass - except GreenletExit as e: - if runner: - runner.state = STATE_CLEANUP - # Run the task_set on_stop method, if it has one - if hasattr(task_set_instance, "on_stop"): - task_set_instance.on_stop() - raise # Maybe something relies on this except being raised? + except (GreenletExit, StopLocust) as e: + # run the on_stop method, if it has one + self.on_stop() + + def start(self, gevent_group): + """ + Start a greenlet that runs this locust instance. + + *Arguments*: + + * gevent_group: gevent.pool.Group instance where the greenlet will be spawned. + + Returns the spawned greenlet. + """ + def run_locust(user): + """ + Main function for Locust user greenlet. It's important that this function takes the locust + instance as argument, since we use greenlet_instance.args[0] to retrieve a reference to the + locust instance. + """ + user.run() + self._greenlet = gevent_group.spawn(run_locust, self) + return self._greenlet + + def stop(self, gevent_group, force=False): + """ + Stop the locust user greenlet that exists in the gevent_group. + This method is not meant to be called from within of the Locust's greenlet. + + *Arguments*: + + * gevent_group: gevent.pool.Group instance where the greenlet will be spawned. + * force: If False (the default) the stopping is done gracefully by setting the state to LOCUST_STATE_STOPPING + which will make the Locust instance stop once any currently running task is complete and on_stop + methods are called. If force is True the greenlet will be killed immediately. + + Returns True if the greenlet was killed immediately, otherwise False + """ + if force or self._state == LOCUST_STATE_WAITING: + gevent_group.killone(self._greenlet) + return True + elif self._state == LOCUST_STATE_RUNNING: + self._state = LOCUST_STATE_STOPPING + return False class HttpLocust(Locust): diff --git a/locust/runners.py b/locust/runners.py index be27fe26d1..cd20e49c65 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -9,7 +9,6 @@ import gevent import psutil -from gevent import GreenletExit from gevent.pool import Group from .rpc import Message, rpc @@ -24,7 +23,6 @@ HEARTBEAT_INTERVAL = 1 HEARTBEAT_LIVENESS = 3 -LOCUST_STATE_RUNNING, LOCUST_STATE_WAITING, LOCUST_STATE_STOPPING = ["running", "waiting", "stopping"] class LocustRunner(object): def __init__(self, environment, locust_classes): @@ -135,15 +133,10 @@ def hatch(): self.environment.events.hatch_complete.fire(user_count=len(self.locusts)) return - locust = bucket.pop(random.randint(0, len(bucket)-1)) - occurrence_count[locust.__name__] += 1 - new_locust = locust(self.environment) - def start_locust(_): - try: - new_locust.run(runner=self) - except GreenletExit: - pass - self.locusts.spawn(start_locust, new_locust) + locust_class = bucket.pop(random.randint(0, len(bucket)-1)) + occurrence_count[locust_class.__name__] += 1 + new_locust = locust_class(self.environment) + new_locust.start(self.locusts) if len(self.locusts) % 10 == 0: logger.debug("%i locusts hatched" % len(self.locusts)) if bucket: @@ -161,36 +154,32 @@ def kill_locusts(self, kill_count): bucket = self.weight_locusts(kill_count) kill_count = len(bucket) logger.info("Killing %i locusts" % kill_count) - dying = [] + to_kill = [] for g in self.locusts: for l in bucket: - if l == type(g.args[0]): - dying.append(g) + user = g.args[0] + if l == type(user): + to_kill.append(user) bucket.remove(l) break - self.kill_locust_greenlets(dying) + self.kill_locust_instances(to_kill) self.environment.events.hatch_complete.fire(user_count=self.user_count) - def kill_locust_greenlets(self, greenlets): - """ - Kill running locust greenlets. If environment.stop_timeout is set, we try to stop the - Locust users gracefully - """ + + def kill_locust_instances(self, users): if self.environment.stop_timeout: dying = Group() - for g in greenlets: - locust = g.args[0] - if locust._state == LOCUST_STATE_WAITING: - self.locusts.killone(g) - else: - locust._state = LOCUST_STATE_STOPPING - dying.add(g) + for user in users: + if not user.stop(self.locusts, force=False): + # Locust.stop() returns False if the greenlet was not killed, so we'll need + # to add it's greenlet to our dying Group so we can wait for it to finish it's task + dying.add(user._greenlet) if not dying.join(timeout=self.environment.stop_timeout): logger.info("Not all locusts finished their tasks & terminated in %s seconds. Killing them..." % self.environment.stop_timeout) dying.kill(block=True) else: - for g in greenlets: - self.locusts.killone(g) + for user in users: + user.stop(self.locusts, force=True) def monitor_cpu(self): process = psutil.Process() @@ -252,10 +241,11 @@ def stepload_worker(self, hatch_rate, step_clients_growth, step_duration): gevent.sleep(step_duration) def stop(self): + self.state = STATE_CLEANUP # if we are currently hatching locusts we need to kill the hatching greenlet first if self.hatching_greenlet and not self.hatching_greenlet.ready(): self.hatching_greenlet.kill(block=True) - self.kill_locust_greenlets([g for g in self.locusts]) + self.kill_locust_instances([g.args[0] for g in self.locusts]) self.state = STATE_STOPPED self.cpu_log_warning() self.environment.events.locust_stop_hatching.fire() diff --git a/locust/test/test_locust_class.py b/locust/test/test_locust_class.py index 7b38f48ce6..898d4b012d 100644 --- a/locust/test/test_locust_class.py +++ b/locust/test/test_locust_class.py @@ -1,3 +1,7 @@ +import gevent +from gevent import sleep +from gevent.pool import Group + from locust import InterruptTaskSet, ResponseError from locust.core import HttpLocust, Locust, TaskSet, task from locust.env import Environment @@ -154,6 +158,68 @@ def t2(self): self.assertRaises(RescheduleTask, lambda: l.run()) self.assertTrue(l.t1_executed) self.assertTrue(l.t2_executed) + + def test_on_stop_interrupt(self): + class MyTasks(TaskSet): + t2_executed = False + on_stop_executed = False + + def on_stop(self): + self.on_stop_executed = True + + @task + def t2(self): + self.t2_executed = True + self.interrupt(reschedule=False) + + ts = MyTasks(self.locust) + self.assertRaises(RescheduleTask, lambda: ts.run()) + self.assertTrue(ts.t2_executed) + self.assertTrue(ts.on_stop_executed) + + def test_on_stop_interrupt_reschedule(self): + class MyTasks(TaskSet): + t2_executed = False + on_stop_executed = False + + def on_stop(self): + self.on_stop_executed = True + + @task + def t2(self): + self.t2_executed = True + self.interrupt(reschedule=True) + + ts = MyTasks(self.locust) + self.assertRaises(RescheduleTaskImmediately, lambda: ts.run()) + self.assertTrue(ts.t2_executed) + self.assertTrue(ts.on_stop_executed) + + def test_on_stop_when_locust_stops(self): + class MyTasks(TaskSet): + def on_stop(self): + self.locust.on_stop_executed = True + + @task + def t2(self): + self.locust.t2_executed = True + + class MyUser(Locust): + t2_executed = False + on_stop_executed = False + + tasks = [MyTasks] + wait_time = constant(0.1) + + group = Group() + user = MyUser(self.environment) + user.start(group) + sleep(0.05) + user.stop(group) + sleep(0) + + self.assertTrue(user.t2_executed) + self.assertTrue(user.on_stop_executed) def test_schedule_task(self): self.t1_executed = False @@ -393,6 +459,96 @@ def t2(self): l.run() self.assertTrue(l.t1_executed) self.assertTrue(l.t2_executed) + + def test_locust_on_stop(self): + class MyLocust(Locust): + on_stop_executed = False + t2_executed = True + + def on_stop(self): + self.on_stop_executed = True + + @task + def t2(self): + self.t2_executed = True + raise StopLocust() + + l = MyLocust(self.environment) + l.run() + self.assertTrue(l.on_stop_executed) + self.assertTrue(l.t2_executed) + + def test_locust_start(self): + class TestUser(Locust): + wait_time = constant(0.1) + test_state = 0 + @task + def t(self): + self.test_state = 1 + sleep(0.1) + raise StopLocust() + group = Group() + user = TestUser(self.environment) + greenlet = user.start(group) + sleep(0) + self.assertEqual(1, len(group)) + self.assertIn(greenlet, group) + self.assertEqual(1, user.test_state) + timeout = gevent.Timeout(1) + timeout.start() + group.join() + timeout.cancel() + + def test_locust_graceful_stop(self): + class TestUser(Locust): + wait_time = constant(0) + test_state = 0 + @task + def t(self): + self.test_state = 1 + sleep(0.1) + self.test_state = 2 + + group = Group() + user = TestUser(self.environment) + greenlet = user.start(group) + sleep(0) + self.assertEqual(1, user.test_state) + + # stop Locust instance gracefully + user.stop(group, force=False) + sleep(0) + # make sure instance is not killed right away + self.assertIn(greenlet, group) + self.assertEqual(1, user.test_state) + sleep(0.2) + # check that locust instance has now died and that the task got to finish + self.assertEqual(0, len(group)) + self.assertEqual(2, user.test_state) + + def test_locust_forced_stop(self): + class TestUser(Locust): + wait_time = constant(0) + test_state = 0 + @task + def t(self): + self.test_state = 1 + sleep(0.1) + self.test_state = 2 + + group = Group() + user = TestUser(self.environment) + greenlet = user.start(group) + sleep(0) + self.assertIn(greenlet, group) + self.assertEqual(1, user.test_state) + + # stop Locust instance gracefully + user.stop(group, force=True) + sleep(0) + # make sure instance is killed right away, and that the task did NOT get to finish + self.assertEqual(0, len(group)) + self.assertEqual(1, user.test_state) class TestWebLocustClass(WebserverTestCase): diff --git a/locust/test/test_runners.py b/locust/test/test_runners.py index 0dc00594eb..fc2c6e9dae 100644 --- a/locust/test/test_runners.py +++ b/locust/test/test_runners.py @@ -900,11 +900,12 @@ class MyTaskSet(TaskSet): class MyTestLocust(Locust): tasks = [MyTaskSet] + wait_time = constant(0) environment = create_environment(mocked_options()) environment.stop_timeout = short_time runner = LocalLocustRunner(environment, [MyTestLocust]) - runner.start(1, 1) + runner.start(1, 1, wait=True) gevent.sleep(0) timeout = gevent.Timeout(short_time) timeout.start() @@ -916,6 +917,35 @@ class MyTestLocust(Locust): finally: timeout.cancel() + def test_stop_timeout_with_interrupt_no_reschedule(self): + state = [0] + class MySubTaskSet(TaskSet): + @task + def a_task(self): + gevent.sleep(0.1) + state[0] = 1 + self.interrupt(reschedule=False) + + class MyTestLocust(Locust): + tasks = [MySubTaskSet] + wait_time = constant(3) + + environment = create_environment(mocked_options()) + environment.stop_timeout = 0.3 + runner = LocalLocustRunner(environment, [MyTestLocust]) + runner.start(1, 1, wait=True) + gevent.sleep(0) + timeout = gevent.Timeout(0.11) + timeout.start() + try: + runner.quit() + runner.greenlet.join() + except gevent.Timeout: + self.fail("Got Timeout exception. Interrupted locusts should exit immediately during stop_timeout.") + finally: + timeout.cancel() + self.assertEqual(1, state[0]) + def test_kill_locusts_with_stop_timeout(self): short_time = 0.05 class MyTaskSet(TaskSet):