diff --git a/locust/test/test_runners.py b/locust/test/test_runners.py index fc1f5e39c1..dea83c7770 100644 --- a/locust/test/test_runners.py +++ b/locust/test/test_runners.py @@ -5,7 +5,7 @@ import unittest from collections import defaultdict from contextlib import contextmanager -from operator import itemgetter +from operator import add, itemgetter import gevent import mock @@ -47,6 +47,7 @@ User, task, ) +from retry import retry NETWORK_BROKEN = "network broken" @@ -663,6 +664,84 @@ def incr_stats(self): "For some reason the master node's stats has not come in", ) + def test_distributed_rebalanced_integration_run(self): + """ + Full integration test that starts both a MasterRunner and three WorkerRunner instances + and makes sure that their stats is sent to the Master. + """ + + class TestUser(User): + wait_time = constant(0.1) + + @task + def incr_stats(self): + self.environment.events.request.fire( + request_type="GET", + name="/", + response_time=1337, + response_length=666, + exception=None, + context={}, + ) + + with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3): + # start a Master runner + options = parse_options(["--enable-rebalancing", "--rebalance-interval", "1"]) + master_env = Environment(user_classes=[TestUser], parsed_options=options) + master = master_env.create_master_runner("*", 0) + sleep(0) + # start 3 Worker runners + workers = [] + + def addWorker(): + worker_env = Environment(user_classes=[TestUser]) + worker = worker_env.create_worker_runner("127.0.0.1", master.server.port) + workers.append(worker) + + for i in range(3): + addWorker() + + # give workers time to connect + sleep(0.1) + # issue start command that should trigger TestUsers to be spawned in the Workers + master.start(6, spawn_rate=1000) + sleep(0.1) + # check that worker nodes have started locusts + for worker in workers: + self.assertEqual(2, worker.user_count) + # give time for users to generate stats, and stats to be sent to master + # Add 1 more workers (should be 4 now) + addWorker() + + @retry(AssertionError, tries=10, delay=0.5) + def checkRebalancedTrue(): + for worker in workers: + self.assertTrue(worker.user_count > 0) + + checkRebalancedTrue() + # Add 2 more workers (should be 6 now) + addWorker() + addWorker() + + @retry(AssertionError, tries=10, delay=0.5) + def checkRebalancedEquals(): + for worker in workers: + self.assertEqual(1, worker.user_count) + + checkRebalancedEquals() + sleep(1) + master.quit() + # make sure users are killed + for worker in workers: + self.assertEqual(0, worker.user_count) + + # check that stats are present in master + self.assertGreater( + master_env.runner.stats.total.num_requests, + 20, + "For some reason the master node's stats has not come in", + ) + def test_distributed_run_with_custom_args(self): """ Full integration test that starts both a MasterRunner and three WorkerRunner instances