Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Solve Kombu filesystem transport not thread safe" #1595

Merged
merged 1 commit into from
Sep 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 28 additions & 59 deletions kombu/transport/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ def callback(body, message):

import os
import shutil
import signal
import tempfile
import uuid
from collections import namedtuple
Expand All @@ -112,26 +111,6 @@ def callback(body, message):
VERSION = (1, 0, 0)
__version__ = '.'.join(map(str, VERSION))


@contextmanager
def timeout_manager(seconds: int):
def timeout_handler(signum, frame):
# Now that flock retries automatically when interrupted, we need
# an exception to stop it
# This exception will propagate on the main thread,
# make sure you're calling flock there
raise InterruptedError

original_handler = signal.signal(signal.SIGALRM, timeout_handler)

try:
signal.alarm(seconds)
yield
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, original_handler)


# needs win32all to work on Windows
if os.name == 'nt':

Expand Down Expand Up @@ -159,7 +138,7 @@ def unlock(file):
elif os.name == 'posix':

import fcntl
from fcntl import LOCK_EX, LOCK_SH
from fcntl import LOCK_EX, LOCK_NB, LOCK_SH # noqa

def lock(file, flags):
"""Create file lock."""
Expand All @@ -175,21 +154,6 @@ def unlock(file):
'Filesystem plugin only defined for NT and POSIX platforms')


@contextmanager
def lock_with_timeout(file, flags, timeout: int = 1):
with timeout_manager(timeout):
try:
lock(file, flags)
yield
except InterruptedError:
# Catch the exception raised by the handler
# If we weren't raising an exception,
# flock would automatically retry on signals
raise BlockingIOError("Lock timed out")
finally:
unlock(file)


exchange_queue_t = namedtuple("exchange_queue_t",
["routing_key", "pattern", "queue"])

Expand All @@ -204,14 +168,18 @@ def _get_exchange_file_obj(self, exchange, mode="rb"):
file = self.control_folder / f"{exchange}.exchange"
if "w" in mode:
self.control_folder.mkdir(exist_ok=True)
lock_mode = LOCK_EX if "w" in mode else LOCK_SH
f_obj = file.open(mode)

with file.open(mode) as f_obj:
try:
with lock_with_timeout(f_obj, lock_mode):
yield f_obj
except OSError as err:
raise ChannelError(f"Cannot open {file}") from err
try:
if "w" in mode:
lock(f_obj, LOCK_EX)
yield f_obj
except OSError:
raise ChannelError(f"Cannot open {file}")
finally:
if "w" in mode:
unlock(f_obj)
f_obj.close()

def get_table(self, exchange):
try:
Expand Down Expand Up @@ -241,12 +209,15 @@ def _put(self, queue, payload, **kwargs):
filename = os.path.join(self.data_folder_out, filename)

try:
with open(filename, 'wb') as f:
with lock_with_timeout(f, LOCK_EX):
f.write(str_to_bytes(dumps(payload)))
except OSError as err:
f = open(filename, 'wb')
lock(f, LOCK_EX)
f.write(str_to_bytes(dumps(payload)))
except OSError:
raise ChannelError(
f'Cannot add file {filename!r} to directory') from err
f'Cannot add file {filename!r} to directory')
finally:
unlock(f)
f.close()

def _get(self, queue):
"""Get next message from `queue`."""
Expand Down Expand Up @@ -274,14 +245,14 @@ def _get(self, queue):

filename = os.path.join(processed_folder, filename)
try:
with open(filename, 'rb') as f:
with lock_with_timeout(f, LOCK_SH):
payload = f.read()
if not self.store_processed:
os.remove(filename)
except OSError as err:
f = open(filename, 'rb')
payload = f.read()
f.close()
if not self.store_processed:
os.remove(filename)
except OSError:
raise ChannelError(
f'Cannot read file {filename!r} from queue.') from err
f'Cannot read file {filename!r} from queue.')

return loads(bytes_to_str(payload))

Expand All @@ -301,9 +272,7 @@ def _purge(self, queue):
continue

filename = os.path.join(self.data_folder_in, filename)
with open(filename, 'wb') as f:
with lock_with_timeout(f, LOCK_EX):
os.remove(filename)
os.remove(filename)

count += 1

Expand Down
112 changes: 1 addition & 111 deletions t/unit/transport/test_filesystem.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
from __future__ import annotations

import tempfile
from fcntl import LOCK_EX, LOCK_NB, LOCK_SH
from queue import Empty
from unittest.mock import call, patch

import pytest

import t.skip
from kombu import Connection, Consumer, Exchange, Producer, Queue
from kombu.transport.filesystem import lock, unlock


@t.skip.if_win32
class test_FilesystemTransport:

def setup(self):
self.channels = set()
try:
Expand Down Expand Up @@ -147,7 +145,6 @@ def callback2(message_data, message):

@t.skip.if_win32
class test_FilesystemFanout:

def setup(self):
try:
data_folder_in = tempfile.mkdtemp()
Expand Down Expand Up @@ -237,110 +234,3 @@ def callback2(message_data, message):
assert self.q2(self.consume_channel).get()
self.q2(self.consume_channel).purge()
assert self.q2(self.consume_channel).get() is None


@t.skip.if_win32
class test_FilesystemLock:
def test_lock(self):
file_obj1 = tempfile.NamedTemporaryFile()
with open(file_obj1.name) as file_obj2:
lock(file_obj1, LOCK_SH)
with pytest.raises(BlockingIOError):
lock(file_obj2, LOCK_EX | LOCK_NB)

lock(file_obj2, LOCK_SH)
unlock(file_obj2)

unlock(file_obj1)
lock(file_obj2, LOCK_EX)
unlock(file_obj2)
file_obj1.close()


@t.skip.if_win32
class test_FilesystemLockDuringProcess:
def setup(self):
try:
data_folder_in = tempfile.mkdtemp()
data_folder_out = tempfile.mkdtemp()
control_folder = tempfile.mkdtemp()
except Exception:
pytest.skip("filesystem transport: cannot create tempfiles")

self.consumer_connection = Connection(
transport="filesystem",
transport_options={
"data_folder_in": data_folder_in,
"data_folder_out": data_folder_out,
"control_folder": control_folder,
},
)
self.consume_channel = self.consumer_connection.channel()
self.produce_connection = Connection(
transport="filesystem",
transport_options={
"data_folder_in": data_folder_out,
"data_folder_out": data_folder_in,
"control_folder": control_folder,
},
)
self.producer_channel = self.produce_connection.channel()
self.exchange = Exchange("filesystem_exchange_lock", type="fanout")
self.q = Queue("queue1", exchange=self.exchange)

def teardown(self):
# make sure we don't attempt to restore messages at shutdown.
for channel in [self.producer_channel, self.consumer_connection]:
try:
channel._qos._dirty.clear()
except AttributeError:
pass
try:
channel._qos._delivered.clear()
except AttributeError:
pass

def test_lock_during_process(self):
producer = Producer(self.producer_channel, self.exchange)

with patch("kombu.transport.filesystem.lock") as lock_m, patch(
"kombu.transport.filesystem.unlock"
) as unlock_m:
consumer = Consumer(self.consume_channel, self.q)
assert unlock_m.call_count == 1
lock_m.assert_called_once_with(unlock_m.call_args[0][0], LOCK_EX)

self.q(self.consume_channel).declare()
with patch("kombu.transport.filesystem.lock") as lock_m, patch(
"kombu.transport.filesystem.unlock"
) as unlock_m:
producer.publish({"foo": 1})
assert unlock_m.call_count == 2
assert lock_m.call_count == 2
exchange_file_obj = unlock_m.call_args_list[0][0][0]
msg_file_obj = unlock_m.call_args_list[1][0][0]
assert lock_m.call_args_list == [call(exchange_file_obj, LOCK_SH),
call(msg_file_obj, LOCK_EX)]

def callback(_, message):
message.ack()

consumer.register_callback(callback)
consumer.consume()

with patch("kombu.transport.filesystem.lock") as lock_m, patch(
"kombu.transport.filesystem.unlock"
) as unlock_m:
self.consume_channel.drain_events()
assert lock_m.call_count == 1
assert unlock_m.call_count == 1
lock_m.assert_called_once_with(unlock_m.call_args[0][0], LOCK_SH)

producer.publish({"foo": 0})
with patch("kombu.transport.filesystem.lock") as lock_m, patch(
"kombu.transport.filesystem.unlock"
) as unlock_m:
self.q(self.consume_channel).purge()
assert lock_m.call_count == 1
assert unlock_m.call_count == 1
lock_m.assert_called_once_with(unlock_m.call_args[0][0], LOCK_EX)