From 0daad6c5078e373b61cb9d0fc070461a88f3bf88 Mon Sep 17 00:00:00 2001 From: Gao Date: Fri, 9 Sep 2022 21:37:34 +0800 Subject: [PATCH] Revert "Solve Kombu filesystem transport not thread safe (#1593)" This reverts commit 8699920e050727d385a6d5a19c939e55a86688d6. --- kombu/transport/filesystem.py | 87 +++++++-------------- t/unit/transport/test_filesystem.py | 112 +--------------------------- 2 files changed, 29 insertions(+), 170 deletions(-) diff --git a/kombu/transport/filesystem.py b/kombu/transport/filesystem.py index eb07bba6e..92d8c4e43 100644 --- a/kombu/transport/filesystem.py +++ b/kombu/transport/filesystem.py @@ -93,7 +93,6 @@ def callback(body, message): import os import shutil -import signal import tempfile import uuid from collections import namedtuple @@ -112,26 +111,6 @@ def callback(body, message): VERSION = (1, 0, 0) __version__ = '.'.join(map(str, VERSION)) - -@contextmanager -def timeout_manager(seconds: int): - def timeout_handler(signum, frame): - # Now that flock retries automatically when interrupted, we need - # an exception to stop it - # This exception will propagate on the main thread, - # make sure you're calling flock there - raise InterruptedError - - original_handler = signal.signal(signal.SIGALRM, timeout_handler) - - try: - signal.alarm(seconds) - yield - finally: - signal.alarm(0) - signal.signal(signal.SIGALRM, original_handler) - - # needs win32all to work on Windows if os.name == 'nt': @@ -159,7 +138,7 @@ def unlock(file): elif os.name == 'posix': import fcntl - from fcntl import LOCK_EX, LOCK_SH + from fcntl import LOCK_EX, LOCK_NB, LOCK_SH # noqa def lock(file, flags): """Create file lock.""" @@ -175,21 +154,6 @@ def unlock(file): 'Filesystem plugin only defined for NT and POSIX platforms') -@contextmanager -def lock_with_timeout(file, flags, timeout: int = 1): - with timeout_manager(timeout): - try: - lock(file, flags) - yield - except InterruptedError: - # Catch the exception raised by the handler - # If we weren't raising an exception, - # flock would automatically retry on signals - raise BlockingIOError("Lock timed out") - finally: - unlock(file) - - exchange_queue_t = namedtuple("exchange_queue_t", ["routing_key", "pattern", "queue"]) @@ -204,14 +168,18 @@ def _get_exchange_file_obj(self, exchange, mode="rb"): file = self.control_folder / f"{exchange}.exchange" if "w" in mode: self.control_folder.mkdir(exist_ok=True) - lock_mode = LOCK_EX if "w" in mode else LOCK_SH + f_obj = file.open(mode) - with file.open(mode) as f_obj: - try: - with lock_with_timeout(f_obj, lock_mode): - yield f_obj - except OSError as err: - raise ChannelError(f"Cannot open {file}") from err + try: + if "w" in mode: + lock(f_obj, LOCK_EX) + yield f_obj + except OSError: + raise ChannelError(f"Cannot open {file}") + finally: + if "w" in mode: + unlock(f_obj) + f_obj.close() def get_table(self, exchange): try: @@ -241,12 +209,15 @@ def _put(self, queue, payload, **kwargs): filename = os.path.join(self.data_folder_out, filename) try: - with open(filename, 'wb') as f: - with lock_with_timeout(f, LOCK_EX): - f.write(str_to_bytes(dumps(payload))) - except OSError as err: + f = open(filename, 'wb') + lock(f, LOCK_EX) + f.write(str_to_bytes(dumps(payload))) + except OSError: raise ChannelError( - f'Cannot add file {filename!r} to directory') from err + f'Cannot add file {filename!r} to directory') + finally: + unlock(f) + f.close() def _get(self, queue): """Get next message from `queue`.""" @@ -274,14 +245,14 @@ def _get(self, queue): filename = os.path.join(processed_folder, filename) try: - with open(filename, 'rb') as f: - with lock_with_timeout(f, LOCK_SH): - payload = f.read() - if not self.store_processed: - os.remove(filename) - except OSError as err: + f = open(filename, 'rb') + payload = f.read() + f.close() + if not self.store_processed: + os.remove(filename) + except OSError: raise ChannelError( - f'Cannot read file {filename!r} from queue.') from err + f'Cannot read file {filename!r} from queue.') return loads(bytes_to_str(payload)) @@ -301,9 +272,7 @@ def _purge(self, queue): continue filename = os.path.join(self.data_folder_in, filename) - with open(filename, 'wb') as f: - with lock_with_timeout(f, LOCK_EX): - os.remove(filename) + os.remove(filename) count += 1 diff --git a/t/unit/transport/test_filesystem.py b/t/unit/transport/test_filesystem.py index c452e7ca1..b22e3b8de 100644 --- a/t/unit/transport/test_filesystem.py +++ b/t/unit/transport/test_filesystem.py @@ -1,19 +1,17 @@ from __future__ import annotations import tempfile -from fcntl import LOCK_EX, LOCK_NB, LOCK_SH from queue import Empty -from unittest.mock import call, patch import pytest import t.skip from kombu import Connection, Consumer, Exchange, Producer, Queue -from kombu.transport.filesystem import lock, unlock @t.skip.if_win32 class test_FilesystemTransport: + def setup(self): self.channels = set() try: @@ -147,7 +145,6 @@ def callback2(message_data, message): @t.skip.if_win32 class test_FilesystemFanout: - def setup(self): try: data_folder_in = tempfile.mkdtemp() @@ -237,110 +234,3 @@ def callback2(message_data, message): assert self.q2(self.consume_channel).get() self.q2(self.consume_channel).purge() assert self.q2(self.consume_channel).get() is None - - -@t.skip.if_win32 -class test_FilesystemLock: - def test_lock(self): - file_obj1 = tempfile.NamedTemporaryFile() - with open(file_obj1.name) as file_obj2: - lock(file_obj1, LOCK_SH) - with pytest.raises(BlockingIOError): - lock(file_obj2, LOCK_EX | LOCK_NB) - - lock(file_obj2, LOCK_SH) - unlock(file_obj2) - - unlock(file_obj1) - lock(file_obj2, LOCK_EX) - unlock(file_obj2) - file_obj1.close() - - -@t.skip.if_win32 -class test_FilesystemLockDuringProcess: - def setup(self): - try: - data_folder_in = tempfile.mkdtemp() - data_folder_out = tempfile.mkdtemp() - control_folder = tempfile.mkdtemp() - except Exception: - pytest.skip("filesystem transport: cannot create tempfiles") - - self.consumer_connection = Connection( - transport="filesystem", - transport_options={ - "data_folder_in": data_folder_in, - "data_folder_out": data_folder_out, - "control_folder": control_folder, - }, - ) - self.consume_channel = self.consumer_connection.channel() - self.produce_connection = Connection( - transport="filesystem", - transport_options={ - "data_folder_in": data_folder_out, - "data_folder_out": data_folder_in, - "control_folder": control_folder, - }, - ) - self.producer_channel = self.produce_connection.channel() - self.exchange = Exchange("filesystem_exchange_lock", type="fanout") - self.q = Queue("queue1", exchange=self.exchange) - - def teardown(self): - # make sure we don't attempt to restore messages at shutdown. - for channel in [self.producer_channel, self.consumer_connection]: - try: - channel._qos._dirty.clear() - except AttributeError: - pass - try: - channel._qos._delivered.clear() - except AttributeError: - pass - - def test_lock_during_process(self): - producer = Producer(self.producer_channel, self.exchange) - - with patch("kombu.transport.filesystem.lock") as lock_m, patch( - "kombu.transport.filesystem.unlock" - ) as unlock_m: - consumer = Consumer(self.consume_channel, self.q) - assert unlock_m.call_count == 1 - lock_m.assert_called_once_with(unlock_m.call_args[0][0], LOCK_EX) - - self.q(self.consume_channel).declare() - with patch("kombu.transport.filesystem.lock") as lock_m, patch( - "kombu.transport.filesystem.unlock" - ) as unlock_m: - producer.publish({"foo": 1}) - assert unlock_m.call_count == 2 - assert lock_m.call_count == 2 - exchange_file_obj = unlock_m.call_args_list[0][0][0] - msg_file_obj = unlock_m.call_args_list[1][0][0] - assert lock_m.call_args_list == [call(exchange_file_obj, LOCK_SH), - call(msg_file_obj, LOCK_EX)] - - def callback(_, message): - message.ack() - - consumer.register_callback(callback) - consumer.consume() - - with patch("kombu.transport.filesystem.lock") as lock_m, patch( - "kombu.transport.filesystem.unlock" - ) as unlock_m: - self.consume_channel.drain_events() - assert lock_m.call_count == 1 - assert unlock_m.call_count == 1 - lock_m.assert_called_once_with(unlock_m.call_args[0][0], LOCK_SH) - - producer.publish({"foo": 0}) - with patch("kombu.transport.filesystem.lock") as lock_m, patch( - "kombu.transport.filesystem.unlock" - ) as unlock_m: - self.q(self.consume_channel).purge() - assert lock_m.call_count == 1 - assert unlock_m.call_count == 1 - lock_m.assert_called_once_with(unlock_m.call_args[0][0], LOCK_EX)