Skip to content

Commit

Permalink
locustio#1884 User distribution should happen when new workers comes in
Browse files Browse the repository at this point in the history
  • Loading branch information
tyge68 committed Sep 15, 2021
1 parent 1c65699 commit 0f60f00
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 0 deletions.
7 changes: 7 additions & 0 deletions locust/argument_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,13 @@ def setup_parser_arguments(parser):
dest="equal_weights",
help="Use equally distributed task weights, overriding the weights specified in the locustfile.",
)
other_group.add_argument(
"--enable-rebalancing",
action="store_true",
default=False,
dest="enable_rebalancing",
help="Allow to automatically rebalance users if new workers are added after ramp up completed.",
)

user_classes_group = parser.add_argument_group("User classes")
user_classes_group.add_argument(
Expand Down
13 changes: 13 additions & 0 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
]
WORKER_REPORT_INTERVAL = 3.0
CPU_MONITOR_INTERVAL = 5.0
REBALANCE_INTERVAL = 5
HEARTBEAT_INTERVAL = 1
HEARTBEAT_LIVENESS = 3
FALLBACK_INTERVAL = 5
Expand Down Expand Up @@ -601,6 +602,7 @@ def __init__(self, environment, master_bind_host, master_bind_port):
self.master_bind_host = master_bind_host
self.master_bind_port = master_bind_port
self.spawn_rate: float = 0
self.spawning_completed = False

self.clients = WorkerNodes()
try:
Expand All @@ -621,6 +623,8 @@ def __init__(self, environment, master_bind_host, master_bind_port):

self.greenlet.spawn(self.heartbeat_worker).link_exception(greenlet_exception_handler)
self.greenlet.spawn(self.client_listener).link_exception(greenlet_exception_handler)
if self.environment.parsed_options and self.environment.parsed_options.enable_rebalancing:
self.greenlet.spawn(self.rebalance_worker).link_exception(greenlet_exception_handler)

# listener that gathers info on how many users the worker has spawned
def on_worker_report(client_id, data):
Expand Down Expand Up @@ -649,6 +653,7 @@ def cpu_log_warning(self):
return warning_emitted

def start(self, user_count: int, spawn_rate: float, **kwargs) -> None:
self.spawning_completed = False
num_workers = len(self.clients.ready) + len(self.clients.running) + len(self.clients.spawning)
if not num_workers:
logger.warning(
Expand Down Expand Up @@ -744,6 +749,7 @@ def start(self, user_count: int, spawn_rate: float, **kwargs) -> None:
timeout.cancel()

self.environment.events.spawning_complete.fire(user_count=sum(self.target_user_classes_count.values()))
self.spawning_completed = True

logger.info("All users spawned: %s" % _format_user_classes_count_for_log(self.reported_user_classes_count))

Expand Down Expand Up @@ -817,6 +823,13 @@ def check_stopped(self):
):
self.update_state(STATE_STOPPED)

def rebalance_worker(self):
while True:
gevent.sleep(REBALANCE_INTERVAL)
if self.state == STATE_RUNNING and self.spawning_completed and len(self.clients.ready) > 0:
logger.info(f'detected ready workers, rebalancing with target={self.target_user_count}, rate={self.spawn_rate}')
self.start(self.target_user_count, self.spawn_rate)

def heartbeat_worker(self):
while True:
gevent.sleep(HEARTBEAT_INTERVAL)
Expand Down

0 comments on commit 0f60f00

Please sign in to comment.