From 4042a8505cdac94dba5718b6a89f82d478fef0d6 Mon Sep 17 00:00:00 2001 From: Wang Date: Thu, 3 Feb 2022 07:59:08 +0800 Subject: [PATCH] fix(core): use selectors to poll connections instead of raw select in threading,gevent,eventlet (#656) Co-authored-by: lawrentwang Solve the select limitation on the maximum file descriptor value and dynamically choose the best poller available on the system. --- kazoo/handlers/eventlet.py | 8 ++- kazoo/handlers/gevent.py | 9 ++- kazoo/handlers/threading.py | 87 ++----------------------- kazoo/handlers/utils.py | 89 ++++++++++++++++++++++++++ kazoo/tests/test_eventlet_handler.py | 22 +++++++ kazoo/tests/test_gevent_handler.py | 22 +++++++ kazoo/tests/test_selectors_select.py | 91 +++++++++++++++++++++++++++ kazoo/tests/test_threading_handler.py | 11 +--- requirements.txt | 1 + setup.py | 3 +- 10 files changed, 245 insertions(+), 98 deletions(-) create mode 100644 kazoo/tests/test_selectors_select.py diff --git a/kazoo/handlers/eventlet.py b/kazoo/handlers/eventlet.py index c13f8860..3f67ccdb 100644 --- a/kazoo/handlers/eventlet.py +++ b/kazoo/handlers/eventlet.py @@ -5,15 +5,15 @@ import logging import eventlet -from eventlet.green import select as green_select from eventlet.green import socket as green_socket from eventlet.green import time as green_time from eventlet.green import threading as green_threading +from eventlet.green import selectors as green_selectors from eventlet import queue as green_queue from kazoo.handlers import utils import kazoo.python2atexit as python2atexit - +from kazoo.handlers.utils import selector_select LOG = logging.getLogger(__name__) @@ -41,6 +41,7 @@ class TimeoutError(Exception): class AsyncResult(utils.AsyncResult): """A one-time event that stores a value or an exception""" + def __init__(self, handler): super(AsyncResult, self).__init__(handler, green_threading.Condition, @@ -164,7 +165,8 @@ def create_connection(self, *args, **kwargs): def select(self, *args, **kwargs): with _yield_before_after(): - return 
green_select.select(*args, **kwargs) + return selector_select(*args, selectors_module=green_selectors, + **kwargs) def async_result(self): return AsyncResult(self) diff --git a/kazoo/handlers/gevent.py b/kazoo/handlers/gevent.py index 96ee765d..7f2948ef 100644 --- a/kazoo/handlers/gevent.py +++ b/kazoo/handlers/gevent.py @@ -9,6 +9,10 @@ import gevent.queue import gevent.select import gevent.thread +import gevent.selectors + +from kazoo.handlers.utils import selector_select + try: from gevent.lock import Semaphore, RLock except ImportError: @@ -17,7 +21,6 @@ from kazoo.handlers import utils from kazoo import python2atexit - _using_libevent = gevent.__version__.startswith('0.') log = logging.getLogger(__name__) @@ -84,6 +87,7 @@ def greenlet_worker(): del func # release before possible idle except self.queue_empty: continue + return gevent.spawn(greenlet_worker) def start(self): @@ -122,7 +126,8 @@ def stop(self): python2atexit.unregister(self.stop) def select(self, *args, **kwargs): - return gevent.select.select(*args, **kwargs) + return selector_select(*args, selectors_module=gevent.selectors, + **kwargs) def socket(self, *args, **kwargs): return utils.create_tcp_socket(socket) diff --git a/kazoo/handlers/threading.py b/kazoo/handlers/threading.py index 21925237..2389f336 100644 --- a/kazoo/handlers/threading.py +++ b/kazoo/handlers/threading.py @@ -12,11 +12,7 @@ """ from __future__ import absolute_import -from collections import defaultdict -import errno -from itertools import chain import logging -import select import socket import threading import time @@ -25,20 +21,18 @@ import kazoo.python2atexit as python2atexit from kazoo.handlers import utils +from kazoo.handlers.utils import selector_select try: import Queue except ImportError: # pragma: nocover import queue as Queue - # sentinel objects _STOP = object() log = logging.getLogger(__name__) -_HAS_EPOLL = hasattr(select, "epoll") - def _to_fileno(obj): if isinstance(obj, six.integer_types): @@ -65,6 +59,7 @@ 
class KazooTimeoutError(Exception): class AsyncResult(utils.AsyncResult): """A one-time event that stores a value or an exception""" + def __init__(self, handler): super(AsyncResult, self).__init__(handler, threading.Condition, @@ -133,6 +128,7 @@ def _thread_worker(): # pragma: nocover del func # release before possible idle except self.queue_empty: continue + t = self.spawn(_thread_worker) return t @@ -173,82 +169,7 @@ def stop(self): python2atexit.unregister(self.stop) def select(self, *args, **kwargs): - # if we have epoll, and select is not expected to work - # use an epoll-based "select". Otherwise don't touch - # anything to minimize changes - if _HAS_EPOLL: - # if the highest fd we've seen is > 1023 - if max(map(_to_fileno, chain.from_iterable(args[:3]))) > 1023: - return self._epoll_select(*args, **kwargs) - return self._select(*args, **kwargs) - - def _select(self, *args, **kwargs): - timeout = kwargs.pop('timeout', None) - # either the time to give up, or None - end = (time.time() + timeout) if timeout else None - while end is None or time.time() < end: - if end is not None: - # make a list, since tuples aren't mutable - args = list(args) - - # set the timeout to the remaining time - args[3] = end - time.time() - try: - return select.select(*args, **kwargs) - except select.error as ex: - # if the system call was interrupted, we'll retry until timeout - # in Python 3, system call interruptions are a native exception - # in Python 2, they are not - errnum = ex.errno if isinstance(ex, OSError) else ex[0] - if errnum == errno.EINTR: - continue - raise - # if we hit our timeout, lets return as a timeout - return ([], [], []) - - def _epoll_select(self, rlist, wlist, xlist, timeout=None): - """epoll-based drop-in replacement for select to overcome select - limitation on a maximum filehandle value - """ - if timeout is None: - timeout = -1 - eventmasks = defaultdict(int) - rfd2obj = defaultdict(list) - wfd2obj = defaultdict(list) - xfd2obj = defaultdict(list) - 
read_evmask = select.EPOLLIN | select.EPOLLPRI # Just in case - - def store_evmasks(obj_list, evmask, fd2obj): - for obj in obj_list: - fileno = _to_fileno(obj) - eventmasks[fileno] |= evmask - fd2obj[fileno].append(obj) - - store_evmasks(rlist, read_evmask, rfd2obj) - store_evmasks(wlist, select.EPOLLOUT, wfd2obj) - store_evmasks(xlist, select.EPOLLERR, xfd2obj) - - poller = select.epoll() - - for fileno in eventmasks: - poller.register(fileno, eventmasks[fileno]) - - try: - events = poller.poll(timeout) - revents = [] - wevents = [] - xevents = [] - for fileno, event in events: - if event & read_evmask: - revents += rfd2obj.get(fileno, []) - if event & select.EPOLLOUT: - wevents += wfd2obj.get(fileno, []) - if event & select.EPOLLERR: - xevents += xfd2obj.get(fileno, []) - finally: - poller.close() - - return revents, wevents, xevents + return selector_select(*args, **kwargs) def socket(self): return utils.create_tcp_socket(socket) diff --git a/kazoo/handlers/utils.py b/kazoo/handlers/utils.py index fa561fe0..9647a246 100644 --- a/kazoo/handlers/utils.py +++ b/kazoo/handlers/utils.py @@ -2,11 +2,21 @@ import errno import functools +import os import select import ssl import socket import time +from collections import defaultdict + +import six + +if six.PY34: + import selectors +else: + import selectors2 as selectors + HAS_FNCTL = True try: import fcntl @@ -19,6 +29,7 @@ class AsyncResult(object): """A one-time event that stores a value or an exception""" + def __init__(self, handler, condition_factory, timeout_factory): self._handler = handler self._exception = _NONE @@ -126,6 +137,7 @@ def _do_callbacks(self): else: functools.partial(callback, self)() + def _set_fd_cloexec(fd): flags = fcntl.fcntl(fd, fcntl.F_GETFD) fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC) @@ -272,6 +284,7 @@ def capture_exceptions(async_result): :param async_result: An async result implementing :class:`IAsyncResult` """ + def capture(function): @functools.wraps(function) def 
captured_function(*args, **kwargs): @@ -279,7 +292,9 @@ def captured_function(*args, **kwargs): return function(*args, **kwargs) except Exception as exc: async_result.set_exception(exc) + return captured_function + return capture @@ -291,6 +306,7 @@ def wrap(async_result): :param async_result: An async result implementing :class:`IAsyncResult` """ + def capture(function): @capture_exceptions(async_result) def captured_function(*args, **kwargs): @@ -298,5 +314,78 @@ def captured_function(*args, **kwargs): if value is not None: async_result.set(value) return value + return captured_function + return capture + + +def fileobj_to_fd(fileobj): + """Return a file descriptor from a file object. + + Parameters: + fileobj -- file object or file descriptor + + Returns: + corresponding file descriptor + + Raises: + TypeError if the object is invalid + """ + if isinstance(fileobj, int): + fd = fileobj + else: + try: + fd = int(fileobj.fileno()) + except (AttributeError, TypeError, ValueError): + raise TypeError("Invalid file object: " + "{!r}".format(fileobj)) + if fd < 0: + raise TypeError("Invalid file descriptor: {}".format(fd)) + os.fstat(fd) + return fd + + +def selector_select(rlist, wlist, xlist, timeout=None, + selectors_module=selectors): + """Selector-based drop-in replacement for select to overcome select + limitation on a maximum filehandle value. + + Need backport selectors2 package in python 2. 
+ """ + if timeout is not None: + if not (isinstance(timeout, six.integer_types) or isinstance( + timeout, float)): + raise TypeError('timeout must be a number') + if timeout < 0: + raise ValueError('timeout must be non-negative') + + events_mapping = {selectors_module.EVENT_READ: rlist, + selectors_module.EVENT_WRITE: wlist} + fd_events = defaultdict(int) + fd_fileobjs = defaultdict(list) + + for event, fileobjs in events_mapping.items(): + for fileobj in fileobjs: + fd = fileobj_to_fd(fileobj) + fd_events[fd] |= event + fd_fileobjs[fd].append(fileobj) + + selector = selectors_module.DefaultSelector() + for fd, events in fd_events.items(): + selector.register(fd, events) + + revents, wevents, xevents = [], [], [] + try: + ready = selector.select(timeout) + finally: + selector.close() + + for info in ready: + k, events = info + if events & selectors.EVENT_READ: + revents.extend(fd_fileobjs[k.fd]) + elif events & selectors.EVENT_WRITE: + wevents.extend(fd_fileobjs[k.fd]) + + return revents, wevents, xevents diff --git a/kazoo/tests/test_eventlet_handler.py b/kazoo/tests/test_eventlet_handler.py index 69af4001..43ec4f92 100644 --- a/kazoo/tests/test_eventlet_handler.py +++ b/kazoo/tests/test_eventlet_handler.py @@ -130,6 +130,28 @@ def broken(): with pytest.raises(IOError): r.get() + def test_huge_file_descriptor(self): + import resource + from eventlet.green import socket + from kazoo.handlers.utils import create_tcp_socket + + try: + resource.setrlimit(resource.RLIMIT_NOFILE, (4096, 4096)) + except (ValueError, resource.error): + self.skipTest('couldnt raise fd limit high enough') + fd = 0 + socks = [] + while fd < 4000: + sock = create_tcp_socket(socket) + fd = sock.fileno() + socks.append(sock) + with start_stop_one() as h: + h.start() + h.select(socks, [], [], 0) + h.stop() + for sock in socks: + sock.close() + class TestEventletClient(test_client.TestClient): def setUp(self): diff --git a/kazoo/tests/test_gevent_handler.py b/kazoo/tests/test_gevent_handler.py 
index 9bf10299..5515114a 100644 --- a/kazoo/tests/test_gevent_handler.py +++ b/kazoo/tests/test_gevent_handler.py @@ -142,6 +142,28 @@ def changed(d, stat): ev.wait() client.stop() + def test_huge_file_descriptor(self): + import resource + from gevent import socket + from kazoo.handlers.utils import create_tcp_socket + + try: + resource.setrlimit(resource.RLIMIT_NOFILE, (4096, 4096)) + except (ValueError, resource.error): + self.skipTest('couldnt raise fd limit high enough') + fd = 0 + socks = [] + while fd < 4000: + sock = create_tcp_socket(socket) + fd = sock.fileno() + socks.append(sock) + h = self._makeOne() + h.start() + h.select(socks, [], [], 0) + h.stop() + for sock in socks: + sock.close() + class TestGeventClient(test_client.TestClient): def setUp(self): diff --git a/kazoo/tests/test_selectors_select.py b/kazoo/tests/test_selectors_select.py new file mode 100644 index 00000000..b5cdd336 --- /dev/null +++ b/kazoo/tests/test_selectors_select.py @@ -0,0 +1,91 @@ +""" +The official python select function test case copied from python source + to test the selector_select function. 
+""" + +import errno +import os +import socket +import sys +import unittest +from test import support +from kazoo.handlers.utils import selector_select + +select = selector_select + + +@unittest.skipIf((sys.platform[:3] == 'win'), + "can't easily test on this system") +class SelectTestCase(unittest.TestCase): + class Nope: + pass + + class Almost: + def fileno(self): + return 'fileno' + + def test_error_conditions(self): + self.assertRaises(TypeError, select, 1, 2, 3) + self.assertRaises(TypeError, select, [self.Nope()], [], []) + self.assertRaises(TypeError, select, [self.Almost()], [], []) + self.assertRaises(TypeError, select, [], [], [], "not a number") + self.assertRaises(ValueError, select, [], [], [], -1) + + # Issue #12367: http://www.freebsd.org/cgi/query-pr.cgi?pr=kern/155606 + @unittest.skipIf(sys.platform.startswith('freebsd'), + 'skip because of a FreeBSD bug: kern/155606') + def test_errno(self): + with open(__file__, 'rb') as fp: + fd = fp.fileno() + fp.close() + try: + select([fd], [], [], 0) + except OSError as err: + self.assertEqual(err.errno, errno.EBADF) + else: + self.fail("exception not raised") + + def test_returned_list_identity(self): + # See issue #8329 + r, w, x = select([], [], [], 1) + self.assertIsNot(r, w) + self.assertIsNot(r, x) + self.assertIsNot(w, x) + + def test_select(self): + cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done' + p = os.popen(cmd, 'r') + for tout in (0, 1, 2, 4, 8, 16) + (None,) * 10: + if support.verbose: + print('timeout =', tout) + rfd, wfd, xfd = select([p], [], [], tout) + if (rfd, wfd, xfd) == ([], [], []): + continue + if (rfd, wfd, xfd) == ([p], [], []): + line = p.readline() + if support.verbose: + print(repr(line)) + if not line: + if support.verbose: + print('EOF') + break + continue + self.fail('Unexpected return values from select():', rfd, wfd, xfd) + p.close() + + # Issue 16230: Crash on select resized list + def test_select_mutated(self): + with socket.socket(socket.AF_INET, 
socket.SOCK_STREAM) as s: + a = [] + + class F: + def fileno(self): + del a[-1] + return s.fileno() + + a[:] = [F()] * 10 + self.assertEqual(select([], a, []), ([], a[:5], [])) + + +def tearDownModule(): + support.reap_children() diff --git a/kazoo/tests/test_threading_handler.py b/kazoo/tests/test_threading_handler.py index 43769572..dbdccd75 100644 --- a/kazoo/tests/test_threading_handler.py +++ b/kazoo/tests/test_threading_handler.py @@ -45,10 +45,6 @@ def test_double_start_stop(self): assert h._running is False def test_huge_file_descriptor(self): - from kazoo.handlers.threading import _HAS_EPOLL - - if not _HAS_EPOLL: - self.skipTest('only run on systems with epoll()') import resource import socket from kazoo.handlers.utils import create_tcp_socket @@ -65,11 +61,10 @@ def test_huge_file_descriptor(self): socks.append(sock) h = self._makeOne() h.start() - h.select(socks, [], []) - with pytest.raises(ValueError): - h._select(socks, [], []) - h._epoll_select(socks, [], []) + h.select(socks, [], [], 0) h.stop() + for sock in socks: + sock.close() class TestThreadingAsync(unittest.TestCase): diff --git a/requirements.txt b/requirements.txt index ffe2fce4..83adeb7a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ six +selectors2>=2.0.2; python_version < "3.4.0" \ No newline at end of file diff --git a/setup.py b/setup.py index 15a7a135..c9f9dc2e 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,6 @@ from setuptools import setup, find_packages import sys - here = os.path.abspath(os.path.dirname(__file__)) with open(os.path.join(here, 'README.md')) as f: README = f.read() @@ -16,7 +15,7 @@ PYPY = getattr(sys, 'pypy_version_info', False) and True or False -install_requires = ['six'] +install_requires = ['six', 'selectors2>=2.0.2; python_version < "3.4.0"'] tests_require = install_requires + [ 'mock',