Skip to content

Commit

Permalink
Optimize (#780)
Browse files Browse the repository at this point in the history
* Optimize `find_http_line` which is in critical path

* Update benchmark results

* Keep the loop hot, TCP no delay, cleanup inactive check periodically

* Check for shutdown signal with tick

* Use non-reentrant `NonBlockingQueue` implementation instead of `queue.Queue`

* Fix listener test

* lint and doc
  • Loading branch information
abhinavsingh authored Nov 24, 2021
1 parent 833eeba commit 99fc17b
Show file tree
Hide file tree
Showing 14 changed files with 173 additions and 115 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ lib-profile:
--plugin proxy.plugin.WebServerPlugin \
--local-executor \
--backlog 65536 \
--open-file-limit 65536
--open-file-limit 65536 \
--log-file /dev/null

devtools:
Expand Down
56 changes: 28 additions & 28 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,47 +128,47 @@
```console
# On Macbook Pro 2019 / 2.4 GHz 8-Core Intel Core i9 / 32 GB RAM
❯ ./helper/benchmark.sh
CONCURRENCY: 100 workers, TOTAL REQUESTS: 100000 req, QPS: 5000 req/sec, TIMEOUT: 1 sec
CONCURRENCY: 100 workers, TOTAL REQUESTS: 100000 req, QPS: 8000 req/sec, TIMEOUT: 1 sec

Summary:
Total: 3.1560 secs
Slowest: 0.0375 secs
Fastest: 0.0006 secs
Average: 0.0031 secs
Requests/sec: 31685.9140
Total: 3.1217 secs
Slowest: 0.0499 secs
Fastest: 0.0004 secs
Average: 0.0030 secs
Requests/sec: 32033.7261

Total data: 1900000 bytes
Size/request: 19 bytes

Response time histogram:
0.001 [1] |
0.004 [91680] |■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■
0.008 [7929] |■■■
0.012 [263] |
0.015 [29] |
0.019 [8] |
0.023 [23] |
0.026 [15] |
0.030 [27] |
0.034 [16] |
0.037 [9] |
0.000 [1] |
0.005 [92268] |■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■
0.010 [7264] |■■■
0.015 [318] |
0.020 [102] |
0.025 [32] |
0.030 [6] |
0.035 [4] |
0.040 [1] |
0.045 [2] |
0.050 [2] |


Latency distribution:
10% in 0.0022 secs
25% in 0.0025 secs
50% in 0.0029 secs
75% in 0.0034 secs
90% in 0.0041 secs
95% in 0.0048 secs
99% in 0.0066 secs
10% in 0.0017 secs
25% in 0.0020 secs
50% in 0.0025 secs
75% in 0.0036 secs
90% in 0.0050 secs
95% in 0.0060 secs
99% in 0.0087 secs

Details (average, fastest, slowest):
DNS+dialup: 0.0000 secs, 0.0006 secs, 0.0375 secs
DNS+dialup: 0.0000 secs, 0.0004 secs, 0.0499 secs
DNS-lookup: 0.0000 secs, 0.0000 secs, 0.0000 secs
req write: 0.0000 secs, 0.0000 secs, 0.0046 secs
resp wait: 0.0030 secs, 0.0006 secs, 0.0320 secs
resp read: 0.0000 secs, 0.0000 secs, 0.0029 secs
req write: 0.0000 secs, 0.0000 secs, 0.0020 secs
resp wait: 0.0030 secs, 0.0004 secs, 0.0462 secs
resp read: 0.0000 secs, 0.0000 secs, 0.0027 secs

Status code distribution:
[200] 100000 responses
Expand Down
1 change: 1 addition & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@
(_py_class_role, 'HttpWebServerBasePlugin'),
(_py_class_role, 'multiprocessing.context.Process'),
(_py_class_role, 'multiprocessing.synchronize.Lock'),
(_py_class_role, 'NonBlockingQueue'),
(_py_class_role, 'paramiko.channel.Channel'),
(_py_class_role, 'proxy.http.parser.parser.T'),
(_py_class_role, 'proxy.plugin.cache.store.base.CacheStore'),
Expand Down
11 changes: 1 addition & 10 deletions helper/benchmark.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ if [ $(basename $PWD) != "proxy.py" ]; then
fi

TIMEOUT=1
QPS=20000
QPS=8000
CONCURRENCY=100
TOTAL_REQUESTS=100000
OPEN_FILE_LIMIT=65536
Expand All @@ -41,15 +41,6 @@ PID_FILE=/tmp/proxy.pid

ulimit -n $OPEN_FILE_LIMIT

# time python -m \
# proxy \
# --enable-web-server \
# --plugin proxy.plugin.WebServerPlugin \
# --backlog $BACKLOG \
# --open-file-limit $OPEN_FILE_LIMIT \
# --pid-file $PID_FILE \
# --log-file /dev/null

PID=$(cat $PID_FILE)
if [[ -z "$PID" ]]; then
echo "Either pid file doesn't exist or no pid found in the pid file"
Expand Down
38 changes: 37 additions & 1 deletion proxy/common/backports.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@
:license: BSD, see LICENSE for more details.
"""
import time
import threading

from typing import Any
from typing import Any, Deque
from queue import Empty
from collections import deque


class cached_property:
Expand Down Expand Up @@ -80,3 +83,36 @@ def __get__(self, inst: Any, owner: Any) -> Any:
finally:
cache[self.__name__] = (value, now)
return value


class NonBlockingQueue:
    '''Simple, unbounded, non-blocking FIFO queue.

    Supports only a single consumer.

    NOTE: This is available in Python since 3.7 as SimpleQueue.
    Here because proxy.py still supports 3.6
    '''

    def __init__(self) -> None:
        # deque gives O(1) append / popleft for FIFO order.
        self._queue: Deque[Any] = deque()
        # Tracks the number of items available to the consumer; acquired
        # non-blocking in get() so an empty queue raises instead of waiting.
        self._count: threading.Semaphore = threading.Semaphore(0)

    def put(self, item: Any) -> None:
        '''Append ``item`` and signal its availability to the consumer.'''
        self._queue.append(item)
        self._count.release()

    def get(self) -> Any:
        '''Pop and return the oldest item.

        Raises ``queue.Empty`` when no item is available.'''
        if self._count.acquire(blocking=False):
            return self._queue.popleft()
        raise Empty

    def empty(self) -> bool:
        '''Return True if the queue is empty, False otherwise (not reliable!).'''
        return not self._queue

    def qsize(self) -> int:
        '''Return the approximate size of the queue (not reliable!).'''
        return len(self._queue)
5 changes: 4 additions & 1 deletion proxy/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,10 @@ def _env_threadless_compliant() -> bool:
DEFAULT_MAX_SEND_SIZE = 16 * 1024
DEFAULT_WORK_KLASS = 'proxy.http.HttpProtocolHandler'
DEFAULT_ENABLE_PROXY_PROTOCOL = False
DEFAULT_SELECTOR_SELECT_TIMEOUT = 0.1
# 25 milliseconds to keep the loops hot
# Will consume ~0.3-0.6% CPU when idle.
DEFAULT_SELECTOR_SELECT_TIMEOUT = 25 / 1000
DEFAULT_INACTIVE_CONN_CLEANUP_TIMEOUT = 1 # in seconds

DEFAULT_DEVTOOLS_DOC_URL = 'http://proxy'
DEFAULT_DEVTOOLS_FRAME_ID = secrets.token_hex(8)
Expand Down
8 changes: 4 additions & 4 deletions proxy/common/utils.py
Original file line number Diff line number Diff line change
def find_http_line(raw: bytes) -> Tuple[Optional[bytes], bytes]:
    """Find and return the first line ending in CRLF along with the
    following buffer.

    Returns ``(line, rest)`` where ``line`` excludes the CRLF itself.
    If no CRLF is found, returns ``(None, raw)`` unchanged.
    """
    # partition() scans the buffer once and builds no intermediate list,
    # which matters because this helper sits on the request parsing hot path.
    line, sep, rest = raw.partition(CRLF)
    return (line, rest) if sep else (None, raw)


def wrap_socket(
Expand Down
9 changes: 4 additions & 5 deletions proxy/core/acceptor/acceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
acceptor
pre
"""
import queue
import socket
import logging
import argparse
Expand All @@ -26,11 +25,11 @@
from multiprocessing.reduction import recv_handle

from typing import List, Optional, Tuple
from typing import Any # noqa: W0611 pylint: disable=unused-import

from ...common.flag import flags
from ...common.utils import is_threadless
from ...common.logger import Logger
from ...common.backports import NonBlockingQueue
from ...common.constants import DEFAULT_LOCAL_EXECUTOR

from ..event import EventQueue
Expand Down Expand Up @@ -103,7 +102,7 @@ def __init__(
self.sock: Optional[socket.socket] = None
# Internals
self._total: Optional[int] = None
self._local_work_queue: Optional['queue.Queue[Any]'] = None
self._local_work_queue: Optional['NonBlockingQueue'] = None
self._local: Optional[LocalExecutor] = None
self._lthread: Optional[threading.Thread] = None

Expand All @@ -118,7 +117,7 @@ def accept(self, events: List[Tuple[selectors.SelectorKey, int]]) -> None:
work = (conn, addr or None)
if self.flags.local_executor:
assert self._local_work_queue
self._local_work_queue.put_nowait(work)
self._local_work_queue.put(work)
else:
self._work(*work)

Expand Down Expand Up @@ -171,7 +170,7 @@ def run(self) -> None:

def _start_local(self) -> None:
assert self.sock
self._local_work_queue = queue.Queue()
self._local_work_queue = NonBlockingQueue()
self._local = LocalExecutor(
work_queue=self._local_work_queue,
flags=self.flags,
Expand Down
2 changes: 2 additions & 0 deletions proxy/core/acceptor/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,15 @@ def shutdown(self) -> None:
def _listen_unix_socket(self) -> None:
self._socket = socket.socket(self.flags.family, socket.SOCK_STREAM)
self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self._socket.bind(self.flags.unix_socket_path)
self._socket.listen(self.flags.backlog)
self._socket.setblocking(False)

def _listen_server_port(self) -> None:
self._socket = socket.socket(self.flags.family, socket.SOCK_STREAM)
self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self._socket.bind((str(self.flags.hostname), self.flags.port))
self._socket.listen(self.flags.backlog)
self._socket.setblocking(False)
Expand Down
8 changes: 5 additions & 3 deletions proxy/core/acceptor/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@
import contextlib

from typing import Optional
from typing import Any # noqa: W0611 pylint: disable=unused-import
from typing import Any

from ...common.backports import NonBlockingQueue # noqa: W0611, F401 pylint: disable=unused-import

from .threadless import Threadless

logger = logging.getLogger(__name__)


class LocalExecutor(Threadless['queue.Queue[Any]']):
class LocalExecutor(Threadless['NonBlockingQueue']):
"""A threadless executor implementation which uses a queue to receive new work."""

def __init__(self, *args: Any, **kwargs: Any) -> None:
Expand All @@ -44,7 +46,7 @@ def work_queue_fileno(self) -> Optional[int]:

def receive_from_work_queue(self) -> bool:
with contextlib.suppress(queue.Empty):
work = self.work_queue.get(block=False)
work = self.work_queue.get()
if isinstance(work, bool) and work is False:
return True
assert isinstance(work, tuple)
Expand Down
44 changes: 26 additions & 18 deletions proxy/core/acceptor/threadless.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

from ...common.logger import Logger
from ...common.types import Readables, Writables
from ...common.constants import DEFAULT_SELECTOR_SELECT_TIMEOUT
from ...common.constants import DEFAULT_INACTIVE_CONN_CLEANUP_TIMEOUT, DEFAULT_SELECTOR_SELECT_TIMEOUT

from ..connection import TcpClientConnection
from ..event import eventNames, EventQueue
Expand Down Expand Up @@ -86,6 +86,7 @@ def __init__(
Dict[int, int],
] = {}
self.wait_timeout: float = DEFAULT_SELECTOR_SELECT_TIMEOUT
self.cleanup_inactive_timeout: float = DEFAULT_INACTIVE_CONN_CLEANUP_TIMEOUT

@property
@abstractmethod
Expand Down Expand Up @@ -216,12 +217,9 @@ async def _selected_events(self) -> Tuple[
work_by_ids[key.data][1].append(key.fileobj)
return (work_by_ids, new_work_available)

async def _wait_for_tasks(
self,
pending: Set['asyncio.Task[bool]'],
) -> None:
async def _wait_for_tasks(self) -> None:
finished, self.unfinished = await asyncio.wait(
pending,
self.unfinished,
timeout=self.wait_timeout,
return_when=asyncio.FIRST_COMPLETED,
)
Expand All @@ -236,10 +234,6 @@ def _fromfd(self, fileno: int) -> socket.socket:
type=socket.SOCK_STREAM,
)

# TODO: Use cached property to avoid execution repeatedly
# within a second interval. Note that our selector timeout
# is 0.1 second which can unnecessarily result in cleanup
# checks within a second boundary.
def _cleanup_inactive(self) -> None:
inactive_works: List[int] = []
for work_id in self.works:
Expand Down Expand Up @@ -294,21 +288,35 @@ async def _run_once(self) -> bool:
if teardown:
return teardown
if len(work_by_ids) == 0:
self._cleanup_inactive()
return False
# Invoke Threadless.handle_events
self.unfinished.update(self._create_tasks(work_by_ids))
# logger.debug('Executing {0} works'.format(len(self.unfinished)))
await self._wait_for_tasks(self.unfinished)
await self._wait_for_tasks()
# logger.debug(
# 'Done executing works, {0} pending, {1} registered'.format(
# len(self.unfinished), len(self.registered_events_by_work_ids),
# ),
# )
# Remove and shutdown inactive workers
self._cleanup_inactive()
return False

async def _run_forever(self) -> None:
    """Repeatedly await ``_run_once`` until teardown or shutdown.

    Housekeeping — inactive connection cleanup and the shutdown-signal
    check — runs only about once every ``cleanup_inactive_timeout``
    seconds rather than on every iteration, keeping the loop hot.
    """
    ticks_since_housekeeping = 0
    try:
        while True:
            teardown = await self._run_once()
            if teardown:
                break
            elapsed = ticks_since_housekeeping * DEFAULT_SELECTOR_SELECT_TIMEOUT
            if elapsed > self.cleanup_inactive_timeout:
                self._cleanup_inactive()
                if self.running.is_set():
                    break
                ticks_since_housekeeping = 0
            ticks_since_housekeeping += 1
    finally:
        # Stop the event loop so run() can unwind and close it.
        if self.loop:
            self.loop.stop()

def run(self) -> None:
Logger.setup(
self.flags.log_file, self.flags.log_level,
Expand All @@ -324,10 +332,9 @@ def run(self) -> None:
data=wqfileno,
)
assert self.loop
while not self.running.is_set():
# logger.debug('Working on {0} works'.format(len(self.works)))
if self.loop.run_until_complete(self._run_once()):
break
# logger.debug('Working on {0} works'.format(len(self.works)))
self.loop.create_task(self._run_forever())
self.loop.run_forever()
except KeyboardInterrupt:
pass
finally:
Expand All @@ -336,4 +343,5 @@ def run(self) -> None:
self.selector.unregister(wqfileno)
self.close_work_queue()
assert self.loop is not None
self.loop.run_until_complete(self.loop.shutdown_asyncgens())
self.loop.close()
Loading

0 comments on commit 99fc17b

Please sign in to comment.