From 0f60f001e74c05e946e6bbd69804c47774cf2bf2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Thierry=20Yg=C3=A9?= Date: Wed, 15 Sep 2021 12:42:35 +0200 Subject: [PATCH] #1884 User distribution should happen when new workers comes in --- locust/argument_parser.py | 7 +++++++ locust/runners.py | 13 +++++++++++++ 2 files changed, 20 insertions(+) diff --git a/locust/argument_parser.py b/locust/argument_parser.py index 8b539689df..5abe6b8376 100644 --- a/locust/argument_parser.py +++ b/locust/argument_parser.py @@ -464,6 +464,13 @@ def setup_parser_arguments(parser): dest="equal_weights", help="Use equally distributed task weights, overriding the weights specified in the locustfile.", ) + other_group.add_argument( + "--enable-rebalancing", + action="store_true", + default=False, + dest="enable_rebalancing", + help="Allow to automatically rebalance users if new workers are added after ramp up completed.", + ) user_classes_group = parser.add_argument_group("User classes") user_classes_group.add_argument( diff --git a/locust/runners.py b/locust/runners.py index 552348c5be..b704a7977d 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -57,6 +57,7 @@ ] WORKER_REPORT_INTERVAL = 3.0 CPU_MONITOR_INTERVAL = 5.0 +REBALANCE_INTERVAL = 5 HEARTBEAT_INTERVAL = 1 HEARTBEAT_LIVENESS = 3 FALLBACK_INTERVAL = 5 @@ -601,6 +602,7 @@ def __init__(self, environment, master_bind_host, master_bind_port): self.master_bind_host = master_bind_host self.master_bind_port = master_bind_port self.spawn_rate: float = 0 + self.spawning_completed = False self.clients = WorkerNodes() try: @@ -621,6 +623,8 @@ def __init__(self, environment, master_bind_host, master_bind_port): self.greenlet.spawn(self.heartbeat_worker).link_exception(greenlet_exception_handler) self.greenlet.spawn(self.client_listener).link_exception(greenlet_exception_handler) + if self.environment.parsed_options and self.environment.parsed_options.enable_rebalancing: + self.greenlet.spawn(self.rebalance_worker).link_exception(greenlet_exception_handler) # listener that gathers info on how many users the worker has spawned def on_worker_report(client_id, data): @@ -649,6 +653,7 @@ def cpu_log_warning(self): return warning_emitted def start(self, user_count: int, spawn_rate: float, **kwargs) -> None: + self.spawning_completed = False num_workers = len(self.clients.ready) + len(self.clients.running) + len(self.clients.spawning) if not num_workers: logger.warning( @@ -744,6 +749,7 @@ def start(self, user_count: int, spawn_rate: float, **kwargs) -> None: timeout.cancel() self.environment.events.spawning_complete.fire(user_count=sum(self.target_user_classes_count.values())) + self.spawning_completed = True logger.info("All users spawned: %s" % _format_user_classes_count_for_log(self.reported_user_classes_count)) @@ -817,6 +823,13 @@ def check_stopped(self): ): self.update_state(STATE_STOPPED) + def rebalance_worker(self): + while True: + gevent.sleep(REBALANCE_INTERVAL) + if self.state == STATE_RUNNING and self.spawning_completed and len(self.clients.ready) > 0: + logger.info(f'detected ready workers, rebalancing with target={self.target_user_count}, rate={self.spawn_rate}') + self.start(self.target_user_count, self.spawn_rate) + def heartbeat_worker(self): while True: gevent.sleep(HEARTBEAT_INTERVAL)