New multiprocess manager (#2183)
* New multiprocess manager

* lint it

* Fixed test

* Fixed `Process`.`__init__`

* Fix signal handling in Multiprocess class

* Update coverage fail_under value

* Remove redundant log message

* Update coverage fail_under value

* Update coverage fail_under value

* Update fail_under value in coverage report

* Remove unused threading event

* lint

* more tests

* More tests and fix bug

* lint

* Add pytest.mark.skipif for SIGHUP test on Windows

* delete unused code

* More tests

* Try fix tests in Windows

* make linter feels great

* delete pytest-xdist

* Try fix test in windows

* Try make mypy happy

* Skip tests in windows

* lint

* Try test basic run in Windows

* Try fix error in Windows

* lint

* Skip tests in window

* Try test in window

* lint

* Add import statement and set current working directory in test_multiprocess.py

* lint

* giveup

* Refactor signal handling in Multiprocess class

* Tests in windows

* lint

* lint

* ignore mypy check in linux

* Add __init__.py

* fix warning

* coverage ignore

* Update coverage

* coverage

* Add documents

* Update docs/deployment.md

Co-authored-by: Marcelo Trylesinski <marcelotryle@gmail.com>

* Update uvicorn/supervisors/multiprocess.py

Co-authored-by: Marcelo Trylesinski <marcelotryle@gmail.com>

* Update uvicorn/supervisors/multiprocess.py

Co-authored-by: Marcelo Trylesinski <marcelotryle@gmail.com>

* Update uvicorn/supervisors/multiprocess.py

Co-authored-by: Marcelo Trylesinski <marcelotryle@gmail.com>

* Update uvicorn/supervisors/multiprocess.py

Co-authored-by: Marcelo Trylesinski <marcelotryle@gmail.com>

* Update uvicorn/supervisors/multiprocess.py

Co-authored-by: Marcelo Trylesinski <marcelotryle@gmail.com>

* Update uvicorn/supervisors/multiprocess.py

Co-authored-by: Marcelo Trylesinski <marcelotryle@gmail.com>

* Update uvicorn/supervisors/multiprocess.py

Co-authored-by: Marcelo Trylesinski <marcelotryle@gmail.com>

* Update uvicorn/supervisors/multiprocess.py

Co-authored-by: Marcelo Trylesinski <marcelotryle@gmail.com>

* Do not output the PID information repeatedly.

* Fix occasional abnormal exits.

* Update docs

* Change subprocess termination logic in Multiprocess class

* Update new_console_in_windows function to include pragma statement

* Revert coverage to 98.35

* chore: Remove pragma statements from test_multiprocess functions

* chore: Exclude test_multiprocess.py from coverage on Windows

---------

Co-authored-by: Marcelo Trylesinski <marcelotryle@gmail.com>
abersheeran and Kludex committed May 28, 2024
1 parent 22873a9 commit 53fa273
Showing 5 changed files with 351 additions and 36 deletions.
18 changes: 17 additions & 1 deletion docs/deployment.md
@@ -178,7 +178,23 @@ Running Uvicorn using a process manager ensures that you can run multiple proces

A process manager will handle the socket setup, start up multiple server processes, monitor process aliveness, and listen for signals to provide for process restarts, shutdowns, or dialing the number of running processes up and down.

Uvicorn provides a lightweight way to run multiple worker processes, for example `--workers 4`, but does not provide any process monitoring.
### Built-in

Uvicorn includes a `--workers` option that allows you to run multiple worker processes.

```bash
$ uvicorn main:app --workers 4
```

Unlike Gunicorn, Uvicorn does not use a pre-fork model. It uses [`spawn`](https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods), which allows Uvicorn's multiprocess manager to work well on Windows too.
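
To illustrate what the `spawn` start method implies, here is a minimal sketch using only the standard library; it is not Uvicorn's actual worker code, and `serve_worker` is a hypothetical target:

```python
import multiprocessing


def serve_worker(worker_id: int) -> None:
    # A spawned process starts a fresh interpreter and re-imports this module,
    # so the target must be importable and its arguments picklable.
    print(f"worker {worker_id} started")


if __name__ == "__main__":
    # The same start method is available on Linux, macOS, and Windows.
    ctx = multiprocessing.get_context("spawn")
    workers = [ctx.Process(target=serve_worker, args=(i,)) for i in range(4)]
    for p in workers:
        p.start()
    for p in workers:
        p.join()
```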

The default process manager monitors the status of the child processes and automatically restarts any child process that dies unexpectedly. It also checks child-process health through a pipe: if a child process becomes unresponsive, it is killed via a system signal that cannot be ignored (or the equivalent platform interface).
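
A rough sketch of how such a pipe-based liveness check can work is shown below. It is a simplified illustration of the idea (the manager exposes it as `Process.ping()` / `Process.always_pong()`, exercised in the test diff further down), not Uvicorn's exact implementation:

```python
import multiprocessing
from multiprocessing.connection import Connection


def child_loop(conn: Connection) -> None:
    # Echo every ping so the parent knows this process is still responsive.
    while True:
        conn.recv()
        conn.send("pong")


if __name__ == "__main__":
    parent_end, child_end = multiprocessing.Pipe()
    child = multiprocessing.Process(target=child_loop, args=(child_end,), daemon=True)
    child.start()

    parent_end.send("ping")
    healthy = parent_end.poll(timeout=5) and parent_end.recv() == "pong"
    if not healthy:
        # A stuck child ignores cooperative shutdown, so it is terminated outright
        # (SIGKILL on POSIX, TerminateProcess on Windows).
        child.kill()
```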

You can also manage the child processes by sending specific signals to the main process (not supported on Windows):

- `SIGHUP`: Worker processes are gracefully restarted one after another. If you have updated the application code, the new worker processes will run the new code.
- `SIGTTIN`: Increase the number of worker processes by one.
- `SIGTTOU`: Decrease the number of worker processes by one.
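
For example, assuming you know the main process's PID (the `12345` below is a placeholder; read it from your own PID file or process manager), you could resize or restart the worker pool from another Python process on POSIX systems:

```python
import os
import signal

supervisor_pid = 12345  # placeholder: the PID of the uvicorn main process

os.kill(supervisor_pid, signal.SIGTTIN)  # add one worker process
os.kill(supervisor_pid, signal.SIGTTOU)  # remove one worker process
os.kill(supervisor_pid, signal.SIGHUP)   # gracefully restart the workers one by one
```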

### Gunicorn

6 changes: 5 additions & 1 deletion pyproject.toml
@@ -115,7 +115,11 @@ exclude_lines = [
]

[tool.coverage.coverage_conditional_plugin.omit]
"sys_platform == 'win32'" = ["uvicorn/loops/uvloop.py"]
"sys_platform == 'win32'" = [
    "uvicorn/loops/uvloop.py",
    "uvicorn/supervisors/multiprocess.py",
    "tests/supervisors/test_multiprocess.py",
]
"sys_platform != 'win32'" = ["uvicorn/loops/asyncio.py"]

[tool.coverage.coverage_conditional_plugin.rules]
Empty file added tests/supervisors/__init__.py
Empty file.
148 changes: 145 additions & 3 deletions tests/supervisors/test_multiprocess.py
@@ -1,21 +1,66 @@
from __future__ import annotations

import functools
import os
import signal
import socket
import threading
import time
from typing import Any, Callable

import pytest

from uvicorn import Config
from uvicorn._types import ASGIReceiveCallable, ASGISendCallable, Scope
from uvicorn.supervisors import Multiprocess
from uvicorn.supervisors.multiprocess import Process


def new_console_in_windows(test_function: Callable[[], Any]) -> Callable[[], Any]:  # pragma: no cover
    if os.name != "nt":
        return test_function

    @functools.wraps(test_function)
    def new_function():
        import subprocess
        import sys

        module = test_function.__module__
        name = test_function.__name__

        subprocess.check_call(
            [
                sys.executable,
                "-c",
                f"from {module} import {name}; {name}.__wrapped__()",
            ],
            creationflags=subprocess.CREATE_NO_WINDOW,  # type: ignore[attr-defined]
        )

    return new_function


async def app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable) -> None:
    pass  # pragma: no cover


def run(sockets: list[socket.socket] | None) -> None:
    pass  # pragma: no cover
    while True:
        time.sleep(1)


def test_process_ping_pong() -> None:
    process = Process(Config(app=app), target=lambda x: None, sockets=[])
    threading.Thread(target=process.always_pong, daemon=True).start()
    assert process.ping()


def test_process_ping_pong_timeout() -> None:
    process = Process(Config(app=app), target=lambda x: None, sockets=[])
    assert not process.ping(0.1)


@new_console_in_windows
def test_multiprocess_run() -> None:
    """
    A basic sanity check.
@@ -25,5 +70,102 @@ def test_multiprocess_run() -> None:
    """
    config = Config(app=app, workers=2)
    supervisor = Multiprocess(config, target=run, sockets=[])
    supervisor.signal_handler(sig=signal.SIGINT, frame=None)
    supervisor.run()
    threading.Thread(target=supervisor.run, daemon=True).start()
    supervisor.signal_queue.append(signal.SIGINT)
    supervisor.join_all()


@new_console_in_windows
def test_multiprocess_health_check() -> None:
    """
    Ensure that the health check works as expected.
    """
    config = Config(app=app, workers=2)
    supervisor = Multiprocess(config, target=run, sockets=[])
    threading.Thread(target=supervisor.run, daemon=True).start()
    time.sleep(1)
    process = supervisor.processes[0]
    process.kill()
    assert not process.is_alive()
    time.sleep(1)
    for p in supervisor.processes:
        assert p.is_alive()
    supervisor.signal_queue.append(signal.SIGINT)
    supervisor.join_all()


@new_console_in_windows
def test_multiprocess_sigterm() -> None:
    """
    Ensure that the SIGTERM signal is handled as expected.
    """
    config = Config(app=app, workers=2)
    supervisor = Multiprocess(config, target=run, sockets=[])
    threading.Thread(target=supervisor.run, daemon=True).start()
    time.sleep(1)
    supervisor.signal_queue.append(signal.SIGTERM)
    supervisor.join_all()


@pytest.mark.skipif(not hasattr(signal, "SIGBREAK"), reason="platform unsupports SIGBREAK")
@new_console_in_windows
def test_multiprocess_sigbreak() -> None:  # pragma: py-not-win32
    """
    Ensure that the SIGBREAK signal is handled as expected.
    """
    config = Config(app=app, workers=2)
    supervisor = Multiprocess(config, target=run, sockets=[])
    threading.Thread(target=supervisor.run, daemon=True).start()
    time.sleep(1)
    supervisor.signal_queue.append(getattr(signal, "SIGBREAK"))
    supervisor.join_all()


@pytest.mark.skipif(not hasattr(signal, "SIGHUP"), reason="platform unsupports SIGHUP")
def test_multiprocess_sighup() -> None:
    """
    Ensure that the SIGHUP signal is handled as expected.
    """
    config = Config(app=app, workers=2)
    supervisor = Multiprocess(config, target=run, sockets=[])
    threading.Thread(target=supervisor.run, daemon=True).start()
    time.sleep(1)
    pids = [p.pid for p in supervisor.processes]
    supervisor.signal_queue.append(signal.SIGHUP)
    time.sleep(1)
    assert pids != [p.pid for p in supervisor.processes]
    supervisor.signal_queue.append(signal.SIGINT)
    supervisor.join_all()


@pytest.mark.skipif(not hasattr(signal, "SIGTTIN"), reason="platform unsupports SIGTTIN")
def test_multiprocess_sigttin() -> None:
    """
    Ensure that the SIGTTIN signal is handled as expected.
    """
    config = Config(app=app, workers=2)
    supervisor = Multiprocess(config, target=run, sockets=[])
    threading.Thread(target=supervisor.run, daemon=True).start()
    supervisor.signal_queue.append(signal.SIGTTIN)
    time.sleep(1)
    assert len(supervisor.processes) == 3
    supervisor.signal_queue.append(signal.SIGINT)
    supervisor.join_all()


@pytest.mark.skipif(not hasattr(signal, "SIGTTOU"), reason="platform unsupports SIGTTOU")
def test_multiprocess_sigttou() -> None:
    """
    Ensure that the SIGTTOU signal is handled as expected.
    """
    config = Config(app=app, workers=2)
    supervisor = Multiprocess(config, target=run, sockets=[])
    threading.Thread(target=supervisor.run, daemon=True).start()
    supervisor.signal_queue.append(signal.SIGTTOU)
    time.sleep(1)
    assert len(supervisor.processes) == 1
    supervisor.signal_queue.append(signal.SIGTTOU)
    time.sleep(1)
    assert len(supervisor.processes) == 1
    supervisor.signal_queue.append(signal.SIGINT)
    supervisor.join_all()