Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add ability to shard the federation sender #7798

Merged
merged 13 commits into from
Jul 10, 2020
13 changes: 7 additions & 6 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,12 @@ def send_read_receipt(self, receipt: ReadReceipt):

# Work out which remote servers should be poked and poke them.
domains = yield self.state.get_current_hosts_in_room(room_id)
domains = [d for d in domains if d != self.server_name]
domains = [
d
for d in domains
if d != self.server_name
and self._federation_shard_config.should_send_to(self._instance_name, d)
]
if not domains:
return

Expand All @@ -346,11 +351,6 @@ def send_read_receipt(self, receipt: ReadReceipt):
self._schedule_rr_flush_for_room(room_id, len(domains))

for domain in domains:
if not self._federation_shard_config.should_send_to(
self._instance_name, domain
):
continue

queue = self._get_per_destination_queue(domain)
queue.queue_read_receipt(receipt)

Expand Down Expand Up @@ -459,6 +459,7 @@ def _process_presence_inner(self, states: List[UserPresenceState]):
for destination in destinations:
if destination == self.server_name:
continue
Comment on lines 460 to 461
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth lifting the check for not sending to ourselves into `should_send_to`? It seems to be done in many places already!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly, but I think that'll risk conflating different ideas in a way that is going to lead to confusion down the line


if not self._federation_shard_config.should_send_to(
self._instance_name, destination
):
Expand Down
6 changes: 3 additions & 3 deletions synapse/federation/sender/per_destination_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ def __init__(
self._instance_name = hs.get_instance_name()
self._federation_shard_config = hs.config.federation.federation_shard_config

self._should_send_on_this_instance = True
if not self._federation_shard_config.should_send_to(
self._instance_name, destination
):
Expand All @@ -86,6 +87,7 @@ def __init__(
logger.error(
"Create a per destination queue for %s on wrong worker", destination,
)
self._should_send_on_this_instance = False

self._destination = destination
self.transmission_loop_running = False
Expand Down Expand Up @@ -192,9 +194,7 @@ def attempt_new_transaction(self) -> None:
logger.debug("TX [%s] Transaction already in progress", self._destination)
return

if not self._federation_shard_config.should_send_to(
self._instance_name, self._destination
):
if self._should_send_on_this_instance:
clokep marked this conversation as resolved.
Show resolved Hide resolved
# We don't raise an exception here to avoid taking out any other
# processing.
logger.error(
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/data_stores/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -840,7 +840,7 @@ def _reset_federation_positions_txn(self, txn):
if not self._send_federation:
return

# Pull out the configured instancs. If we don't have a shard config then
# Pull out the configured instances. If we don't have a shard config then
# we assume that we're the only instance sending.
configured_instances = self._federation_shard_config.instances
if not configured_instances:
Expand Down
14 changes: 12 additions & 2 deletions tests/replication/test_federation_sender_shard.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2019 New Vector Ltd
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,7 +50,6 @@ def prepare(self, reactor, clock, hs):
store = hs.get_datastore()
self.database = store.db

# Make a new HomeServer object for the worker
self.reactor.lookups["testserv"] = "1.2.3.4"

def default_config(self):
Expand All @@ -75,6 +74,8 @@ def make_worker_hs(self, extra_config={}):
store = worker_hs.get_datastore()
store.db._db_pool = self.database._db_pool

# We run this manually to work around the fact that this doesn't get
# correctly applied when using sqlite :memory: databases.
self.get_success(
store.db.runInteraction("reset", store._reset_federation_positions_txn)
)
Expand Down Expand Up @@ -145,6 +146,9 @@ class FederationSenderTestCase(BaseStreamTestCase):
]

def test_send_event_single_sender(self):
"""Test that using a single federation sender worker correctly sends a
new event.
"""
worker_hs = self.make_worker_hs({"send_federation": True})
mock_client = worker_hs.get_http_client()

Expand All @@ -164,6 +168,9 @@ def test_send_event_single_sender(self):
self.assertTrue(mock_client.put_json.call_args[1]["data"].get("pdus"))

def test_send_event_sharded(self):
"""Test that using two federation sender workers correctly sends
new events.
"""
worker1 = self.make_worker_hs(
{
"send_federation": True,
Expand Down Expand Up @@ -218,6 +225,9 @@ def test_send_event_sharded(self):
self.assertTrue(sent_on_2)

def test_send_typing_sharded(self):
clokep marked this conversation as resolved.
Show resolved Hide resolved
"""Test that using two federation sender workers correctly sends
new typing EDUs.
"""
worker1 = self.make_worker_hs(
{
"send_federation": True,
Expand Down