Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add --processes parameter to automatically fork worker processes #2472

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
5d631f3
Add --processes parameter: Number of times to fork the locust process…
cyberw Nov 18, 2023
cb0c9f7
integration tests: ensure port 5557 is closed after each test
cyberw Nov 18, 2023
1d25c8c
test_main: Ensure port 5557 is available when starting test cases.
cyberw Nov 18, 2023
d1ecb13
test_main: Only check port availability in distributed tests, and fai…
cyberw Nov 18, 2023
6aeab6b
flake8. And take extra care to communicate with child processes in te…
cyberw Nov 18, 2023
08a5659
test_main: Give more time for process to finish.
cyberw Nov 18, 2023
cb02329
test_main: allow test_processes_ctrl_c to take a little longer...
cyberw Nov 18, 2023
ecd2a1d
Add a "fast" integration test to run in parallel with the normal suit…
cyberw Nov 18, 2023
783320c
fix fast_integration_test run
cyberw Nov 18, 2023
d2b58ca
make broken test fail even faster
cyberw Nov 18, 2023
aa170aa
Show debug log in messy test case.
cyberw Nov 18, 2023
d16ae25
dont use a subshell in tests cases if you intend to signal it...
cyberw Nov 18, 2023
935e596
Correct fast_integration_test. Remove workarounds in test_processes_c…
cyberw Nov 18, 2023
7a86d40
Rename fail-fast test stage to fail_fast_test_main.
cyberw Nov 18, 2023
fb3947e
Add workaround for python 3.8. Probably want to remove support for th…
cyberw Nov 18, 2023
9f746ce
fix the workaround for python 3.8
cyberw Nov 18, 2023
7a1aa21
define sigint_handler where it is needed, and not before.
cyberw Nov 19, 2023
b0e6548
fix the workaround for python 3.8. again.
cyberw Nov 19, 2023
5f275e7
fix typo in tox
cyberw Nov 19, 2023
169c63c
final final workaround for 3.8
cyberw Nov 19, 2023
c9390cf
cleaner way to detect whether it is the second time sigint_handler is…
cyberw Nov 19, 2023
bc37634
fix
cyberw Nov 19, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ jobs:
fail-fast: false
matrix:
include:
#- {name: Linux, python: '3.9', os: ubuntu-latest, tox: py39}
#- {name: Windows, python: '3.9', os: windows-latest, tox: py39}
#- {name: Mac, python: '3.9', os: macos-latest, tox: py39}
- { name: "flake8", python: "3.11", os: ubuntu-latest, tox: "flake8" }
- { name: "black", python: "3.11", os: ubuntu-latest, tox: "black" }
- { name: "mypy", python: "3.10", os: ubuntu-latest, tox: "mypy" }
# run some integration tests and abort immediately if they fail, for faster feedback
- { name: "fail_fast_test_main", python: "3.12", os: ubuntu-latest, tox: fail_fast_test_main }
- { name: "3.12", python: "3.12", os: ubuntu-latest, tox: py312 }
- { name: "3.11", python: "3.11", os: ubuntu-latest, tox: py311 }
- { name: "3.10", python: "3.10", os: ubuntu-latest, tox: py310 }
Expand Down
6 changes: 6 additions & 0 deletions locust/argument_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,12 @@ def setup_parser_arguments(parser):
help="Set locust to run in distributed mode with this process as worker",
env_var="LOCUST_MODE_WORKER",
)
worker_group.add_argument(
"--processes",
type=int,
help="Number of times to fork the locust process, to enable using multiple CPU cores. Use -1 to launch one process per CPU core in your system. Combine with --worker flag or let it automatically set --worker and --master flags for an all-in-one-solution. Not available on Windows. Experimental.",
env_var="LOCUST_PROCESSES",
)
worker_group.add_argument(
"--slave",
action="store_true",
Expand Down
81 changes: 79 additions & 2 deletions locust/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from .input_events import input_listener
from .html import get_html_report
from .util.load_locustfile import load_locustfile
import traceback

try:
# import locust_plugins if it is installed, to allow it to register custom arguments etc
Expand All @@ -37,7 +38,6 @@

version = locust.__version__


# Options to ignore when using a custom shape class without `use_common_options=True`
# See: https://docs.locust.io/en/stable/custom-load-shape.html#use-common-options
COMMON_OPTIONS = {
Expand Down Expand Up @@ -162,6 +162,84 @@ def is_valid_percentile(parameter):
sys.stderr.write("Invalid --loglevel. Valid values are: DEBUG/INFO/WARNING/ERROR/CRITICAL\n")
sys.exit(1)

children = []

if options.processes:
if os.name == "nt":
raise Exception("--processes is not supported in Windows (except in WSL)")
if options.processes == -1:
options.processes = os.cpu_count()
assert options.processes, "couldnt detect number of cpus!?"
elif options.processes < -1:
raise Exception(f"invalid processes count {options.processes}")
for _ in range(options.processes):
child_pid = gevent.fork()
if child_pid:
children.append(child_pid)
logging.debug(f"Started child worker with pid #{child_pid}")
else:
# the child is always a worker, even when --worker wasn't set on the command line
options.worker = True
# remove options that don't make sense on a worker
options.run_time = None
options.autostart = None
break
else:
# we're in the parent process
if options.worker:
# ignore the first sigint in parent, and wait for the children to handle sigint
def sigint_handler(_signal, _frame):
    """Swallow the first SIGINT; on any later one, SIGKILL every child and exit with status 1.

    Whether the handler has already fired is tracked in a ``has_run`` attribute
    stored on the function object itself.
    """
    if not getattr(sigint_handler, "has_run", False):
        # first SIGINT: only remember that it happened
        sigint_handler.has_run = True
        return

    # if parent gets repeated sigint, we kill the children hard
    for pid in children:
        try:
            logging.debug(f"Sending SIGKILL to child with pid {pid}")
            os.kill(pid, signal.SIGKILL)
        except ProcessLookupError:
            pass  # process already dead
        except Exception:
            logging.error(traceback.format_exc())
    sys.exit(1)

signal.signal(signal.SIGINT, sigint_handler)
exit_code = 0
# nothing more to do, just wait for the children to exit
for child_pid in children:
_, child_status = os.waitpid(child_pid, 0)
try:
if sys.version_info > (3, 8):
child_exit_code = os.waitstatus_to_exitcode(child_status)
exit_code = max(exit_code, child_exit_code)
except AttributeError:
pass # dammit python 3.8...
sys.exit(exit_code)
else:
options.master = True
options.expect_workers = options.processes

def kill_workers(children):
    """Send SIGINT to every forked worker and wait for all of them to exit.

    Registered with ``atexit`` so the forked workers are told to stop when this process exits.

    :param children: pids of the forked worker processes.
    :return: None. If any worker reports an exit code above 1, that is logged as an error.
    """

    def exit_code_from_status(status):
        # os.waitstatus_to_exitcode() was only added in Python 3.9, so probe for the
        # function itself instead of comparing version tuples and decode the wait
        # status by hand when it is missing.
        if hasattr(os, "waitstatus_to_exitcode"):
            return os.waitstatus_to_exitcode(status)
        if os.WIFEXITED(status):
            return os.WEXITSTATUS(status)
        if os.WIFSIGNALED(status):
            # same convention as os.waitstatus_to_exitcode: negative signal number
            return -os.WTERMSIG(status)
        return 0

    worst_exit_code = 0
    logging.debug("Sending SIGINT to children")
    for child_pid in children:
        try:
            os.kill(child_pid, signal.SIGINT)
        except ProcessLookupError:
            pass  # never mind, process was already dead
    logging.debug("waiting for children to terminate")
    for child_pid in children:
        try:
            _, child_status = os.waitpid(child_pid, 0)
        except ChildProcessError:
            # the child was already reaped, so there is no status left to collect
            continue
        worst_exit_code = max(worst_exit_code, exit_code_from_status(child_status))
    if worst_exit_code > 1:
        logging.error(f"bad response code from worker children: {worst_exit_code}")

atexit.register(kill_workers, children)

logger = logging.getLogger(__name__)
greenlet_exception_handler = greenlet_exception_logger(logger)

Expand Down Expand Up @@ -486,7 +564,6 @@ def shutdown():
print_stats(runner.stats, current=False)
print_percentile_stats(runner.stats)
print_error_report(runner.stats)

environment.events.quit.fire(exit_code=code)
sys.exit(code)

Expand Down
6 changes: 5 additions & 1 deletion locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -1360,7 +1360,11 @@ def _send_stats(self) -> None:
def connect_to_master(self):
self.retry += 1
self.client.send(Message("client_ready", __version__, self.client_id))
success = self.connection_event.wait(timeout=CONNECT_TIMEOUT)
try:
success = self.connection_event.wait(timeout=CONNECT_TIMEOUT)
except KeyboardInterrupt:
# don't complain about getting CTRL-C
sys.exit(1)
if not success:
if self.retry < 3:
logger.debug(
Expand Down
149 changes: 141 additions & 8 deletions locust/test/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import sys
import platform
import unittest

import socket
import pty
import signal
import subprocess
Expand All @@ -20,6 +20,11 @@
from .util import temporary_file, get_free_tcp_port, patch_env


def is_port_in_use(port: int) -> bool:
    """Return True when a TCP connection to ``port`` on localhost succeeds."""
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # connect_ex() reports failure through its return value; 0 means the connect succeeded
        status = probe.connect_ex(("localhost", port))
        return status == 0
    finally:
        probe.close()


MOCK_LOCUSTFILE_CONTENT_A = textwrap.dedent(
"""
from locust import User, task, constant, events
Expand Down Expand Up @@ -52,6 +57,13 @@ def tearDown(self):
self.timeout.cancel()
super().tearDown()

def assert_run(self, cmd: list[str], timeout: int = 5) -> subprocess.CompletedProcess[str]:
    """Run ``cmd`` to completion and assert that it exited with status 0.

    stdout and stderr are captured as text. On a non-zero exit code the
    assertion message contains the exit code and the captured stderr.

    :param cmd: the command line, one list element per argument.
    :param timeout: seconds to wait, passed through to ``subprocess.run``.
    :return: the ``CompletedProcess`` of the finished command.
    """
    completed = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        timeout=timeout,
    )
    failure_message = f"locust run failed with exit code {completed.returncode}:\n{completed.stderr}"
    self.assertEqual(0, completed.returncode, failure_message)
    return completed


class StandaloneIntegrationTests(ProcessIntegrationTest):
def test_help_arg(self):
output = subprocess.check_output(
["locust", "--help"],
Expand All @@ -65,13 +77,6 @@ def test_help_arg(self):
self.assertIn("Logging options:", output)
self.assertIn("--skip-log-setup Disable Locust's logging setup.", output)

def assert_run(self, cmd: list[str], timeout: int = 5) -> subprocess.CompletedProcess[str]:
out = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
self.assertEqual(0, out.returncode, f"locust run failed with exit code {out.returncode}:\n{out.stderr}")
return out


class StandaloneIntegrationTests(ProcessIntegrationTest):
def test_custom_arguments(self):
port = get_free_tcp_port()
with temporary_file(
Expand Down Expand Up @@ -1322,6 +1327,22 @@ def my_task(self):


class DistributedIntegrationTests(ProcessIntegrationTest):
failed_port_check = False

def setUp(self):
    """Make sure port 5557 is free before a distributed test starts.

    Polls the port up to five times, one second apart. If it is still busy after
    that, the failure is recorded on the class so that every later test in this
    class fails immediately instead of repeating the five second wait.

    :raises Exception: if port 5557 is busy, or was already found busy by an earlier test.
    """
    busy_message = "Port 5557 was (still) busy when starting a new test case"
    if self.failed_port_check:
        # fail immediately
        raise Exception(busy_message)
    for _ in range(5):
        if not is_port_in_use(5557):
            break
        gevent.sleep(1)
    else:
        # unittest creates a fresh TestCase instance for every test method, so the
        # flag has to live on the class; an instance attribute would be lost
        # before the next test's setUp could see it.
        type(self).failed_port_check = True
        raise Exception(busy_message)
    super().setUp()

def test_expect_workers(self):
with mock_locustfile() as mocked:
proc = subprocess.Popen(
Expand Down Expand Up @@ -1727,3 +1748,115 @@ def my_task(self):
for i in range(2):
if found[i] != i:
raise Exception(f"expected index {i} but got", found[i])

def test_processes(self):
    """Run locust headless with ``--processes 4`` and check the log output of a clean run.

    Checks that a worker with index 3 reports as ready and that the run shuts
    down with exit code 0 without any traceback.
    """
    with mock_locustfile() as mocked:
        command = [
            "locust",
            "-f",
            mocked.file_path,
            "--processes",
            "4",
            "--headless",
            "--run-time",
            "1",
            "--exit-code-on-error",
            "0",
        ]
        # no shell=True: proc.kill() below must hit locust itself, not a shell wrapper
        proc = subprocess.Popen(
            command,
            stdout=PIPE,
            stderr=PIPE,
            text=True,
        )
        try:
            _, stderr = proc.communicate(timeout=9)
        except subprocess.TimeoutExpired:
            proc.kill()
            # self.fail() instead of `assert False`, which is stripped under `python -O`
            self.fail(f"locust process never finished: {' '.join(command)}")
        self.assertNotIn("Traceback", stderr)
        self.assertIn("(index 3) reported as ready", stderr)
        self.assertIn("Shutting down (exit code 0)", stderr)

def test_processes_autodetect(self):
    """Run locust headless with ``--processes -1`` and check the log output of a clean run.

    Only worker index 0 is checked, since the number of forked workers is not fixed
    by this test. The run must shut down with exit code 0 without any traceback.
    """
    with mock_locustfile() as mocked:
        command = [
            "locust",
            "-f",
            mocked.file_path,
            "--processes",
            "-1",
            "--headless",
            "--run-time",
            "1",
            "--exit-code-on-error",
            "0",
        ]
        # no shell=True: proc.kill() below must hit locust itself, not a shell wrapper
        proc = subprocess.Popen(
            command,
            stdout=PIPE,
            stderr=PIPE,
            text=True,
        )
        try:
            _, stderr = proc.communicate(timeout=9)
        except subprocess.TimeoutExpired:
            proc.kill()
            # self.fail() instead of `assert False`, which is stripped under `python -O`
            self.fail(f"locust process never finished: {' '.join(command)}")
        self.assertNotIn("Traceback", stderr)
        self.assertIn("(index 0) reported as ready", stderr)
        self.assertIn("Shutting down (exit code 0)", stderr)

def test_processes_separate_worker(self):
    """Run a headless master and a separate ``--processes 4 --worker`` process together.

    Waits for the worker parent first and then the master, each for up to nine
    seconds. The master log must show a worker with index 3 reporting as ready
    and a shutdown with exit code 0; neither log may contain a traceback.
    """
    with mock_locustfile() as mocked:
        # argument lists instead of shell=True, so kill() reaches locust itself
        master_proc = subprocess.Popen(
            [
                "locust",
                "-f",
                mocked.file_path,
                "--master",
                "--headless",
                "--run-time",
                "1",
                "--exit-code-on-error",
                "0",
                "--expect-workers-max-wait",
                "2",
            ],
            stdout=PIPE,
            stderr=PIPE,
            text=True,
        )

        worker_parent_proc = subprocess.Popen(
            ["locust", "-f", mocked.file_path, "--processes", "4", "--worker"],
            stdout=PIPE,
            stderr=PIPE,
            text=True,
        )

        def kill_both():
            # kill both processes and return (worker stderr, master stderr)
            master_proc.kill()
            worker_parent_proc.kill()
            _, worker_err = worker_parent_proc.communicate()
            _, master_err = master_proc.communicate()
            return worker_err, master_err

        try:
            _, worker_stderr = worker_parent_proc.communicate(timeout=9)
        except subprocess.TimeoutExpired:
            worker_stderr, _ = kill_both()
            # self.fail() instead of `assert False`, which is stripped under `python -O`
            self.fail(f"worker never finished: {worker_stderr}")

        try:
            _, master_stderr = master_proc.communicate(timeout=9)
        except subprocess.TimeoutExpired:
            _, master_stderr = kill_both()
            self.fail(f"master never finished: {master_stderr}")

        self.assertNotIn("Traceback", worker_stderr)
        self.assertNotIn("Traceback", master_stderr)
        self.assertNotIn("Gave up waiting for workers to connect", master_stderr)
        self.assertIn("(index 3) reported as ready", master_stderr)
        self.assertIn("Shutting down (exit code 0)", master_stderr)

def test_processes_ctrl_c(self):
    """Interrupt a headless ``--processes 4`` run with SIGINT and check that it shuts down cleanly.

    The SIGINT is sent after three seconds and locust then gets three more
    seconds to exit. The debug log must show a worker with index 3 reporting as
    ready, the last worker quitting, and a shutdown with exit code 0.
    """
    with mock_locustfile() as mocked:
        locust_args = ["locust", "-f", mocked.file_path]
        locust_args += ["--processes", "4"]
        locust_args += ["--headless"]
        locust_args += ["-L", "DEBUG"]
        locust_proc = subprocess.Popen(locust_args, stdout=PIPE, stderr=PIPE, text=True)
        # wait three seconds before interrupting the run
        gevent.sleep(3)
        locust_proc.send_signal(signal.SIGINT)
        try:
            output = locust_proc.communicate(timeout=3)
        except Exception:
            locust_proc.kill()
            output = locust_proc.communicate()
            assert False, f"locust process never finished: {output[1]}"
        log = output[1]
        self.assertNotIn("Traceback", log)
        for expected in (
            "(index 3) reported as ready",
            "The last worker quit, stopping test",
            "Shutting down (exit code 0)",
        ):
            self.assertIn(expected, log)
6 changes: 6 additions & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ envlist =
black
mypy
py{37,38,39,310,311,312}
fail_fast_test_main

[flake8]
extend-exclude = build,examples/issue_*.py,src/readthedocs-sphinx-search/
Expand Down Expand Up @@ -34,6 +35,11 @@ commands =
; bash -ec '! grep . {temp_dir}/err.txt' # should be empty
bash -ec 'PYTHONUNBUFFERED=1 python3 examples/debugging_advanced.py | grep done'

[testenv:fail_fast_test_main]
commands =
python3 -m pip install .
python3 -m unittest -f locust.test.test_main

[testenv:black]
deps = black==23.10.1
commands = black --check .
Expand Down
Loading