Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Clean up code for sending federation EDUs. #5381

Merged
merged 1 commit into from
Jun 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/5381.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Clean up code for sending federation EDUs.
40 changes: 26 additions & 14 deletions synapse/federation/sender/per_destination_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,21 @@ def _transaction_transmission_loop(self):

pending_pdus = []
while True:
device_message_edus, device_stream_id, dev_list_id = (
# We have to keep 2 free slots for presence and rr_edus
yield self._get_new_device_messages(MAX_EDUS_PER_TRANSACTION - 2)
# We have to keep 2 free slots for presence and rr_edus
limit = MAX_EDUS_PER_TRANSACTION - 2

device_update_edus, dev_list_id = (
yield self._get_device_update_edus(limit)
)

limit -= len(device_update_edus)

to_device_edus, device_stream_id = (
yield self._get_to_device_message_edus(limit)
)

pending_edus = device_update_edus + to_device_edus

# BEGIN CRITICAL SECTION
#
# In order to avoid a race condition, we need to make sure that
Expand All @@ -208,10 +218,6 @@ def _transaction_transmission_loop(self):
# We can only include at most 50 PDUs per transactions
pending_pdus, self._pending_pdus = pending_pdus[:50], pending_pdus[50:]

pending_edus = []

# We can only include at most 100 EDUs per transactions
# rr_edus and pending_presence take at most one slot each
pending_edus.extend(self._get_rr_edus(force_flush=False))
pending_presence = self._pending_presence
self._pending_presence = {}
Expand All @@ -232,7 +238,6 @@ def _transaction_transmission_loop(self):
)
)

pending_edus.extend(device_message_edus)
pending_edus.extend(
self._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus))
)
Expand Down Expand Up @@ -272,10 +277,13 @@ def _transaction_transmission_loop(self):
sent_edus_by_type.labels(edu.edu_type).inc()
# Remove the acknowledged device messages from the database
# Only bother if we actually sent some device messages
if device_message_edus:
if to_device_edus:
yield self._store.delete_device_msgs_for_remote(
self._destination, device_stream_id
)

# also mark the device updates as sent
if device_update_edus:
logger.info(
"Marking as sent %r %r", self._destination, dev_list_id
)
Expand Down Expand Up @@ -347,7 +355,7 @@ def _pop_pending_edus(self, limit):
return pending_edus

@defer.inlineCallbacks
def _get_new_device_messages(self, limit):
def _get_device_update_edus(self, limit):
last_device_list = self._last_device_list_stream_id
# Will return at most 20 entries
now_stream_id, results = yield self._store.get_devices_by_remote(
Expand All @@ -365,22 +373,26 @@ def _get_new_device_messages(self, limit):

assert len(edus) <= limit, "get_devices_by_remote returned too many EDUs"

defer.returnValue((edus, now_stream_id))

@defer.inlineCallbacks
def _get_to_device_message_edus(self, limit):
last_device_stream_id = self._last_device_stream_id
to_device_stream_id = self._store.get_to_device_stream_token()
contents, stream_id = yield self._store.get_new_device_msgs_for_remote(
self._destination,
last_device_stream_id,
to_device_stream_id,
limit - len(edus),
limit,
)
edus.extend(
edus = [
Edu(
origin=self._server_name,
destination=self._destination,
edu_type="m.direct_to_device",
content=content,
)
for content in contents
)
]

defer.returnValue((edus, stream_id, now_stream_id))
defer.returnValue((edus, stream_id))