Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Reject concurrent transactions #9597

Merged
merged 3 commits into the target branch from the source branch on
Mar 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9597.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug introduced in Synapse 1.20 which caused incoming federation transactions to stack up, causing slow recovery from outages.
77 changes: 42 additions & 35 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,11 @@ def __init__(self, hs: "HomeServer"):
# with FederationHandlerRegistry.
hs.get_directory_handler()

self._federation_ratelimiter = hs.get_federation_ratelimiter()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should just inline the rate-limiter that used to be shared with this into TransportLayerServer now...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤷‍♂️ probably...


self._server_linearizer = Linearizer("fed_server")
self._transaction_linearizer = Linearizer("fed_txn_handler")

# origins that we are currently processing a transaction from.
# a dict from origin to txn id.
self._active_transactions = {} # type: Dict[str, str]

# We cache results for transaction with the same ID
self._transaction_resp_cache = ResponseCache(
Expand Down Expand Up @@ -169,6 +170,33 @@ async def on_incoming_transaction(

logger.debug("[%s] Got transaction", transaction_id)

# Reject malformed transactions early: reject if too many PDUs/EDUs
if len(transaction.pdus) > 50 or ( # type: ignore
hasattr(transaction, "edus") and len(transaction.edus) > 100 # type: ignore
):
logger.info("Transaction PDU or EDU count too large. Returning 400")
return 400, {}
Comment on lines +173 to +178
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this up here from _handle_incoming_transaction to keep a sytest (which was sending concurrent transactions) happy, but it seems like generally a good thing to do.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that it seems best to do this as early as possible. 👍


# we only process one transaction from each origin at a time. We need to do
# this check here, rather than in _on_incoming_transaction_inner so that we
# don't cache the rejection in _transaction_resp_cache (so that if the txn
# arrives again later, we can process it).
current_transaction = self._active_transactions.get(origin)
if current_transaction and current_transaction != transaction_id:
logger.warning(
"Received another txn %s from %s while still processing %s",
transaction_id,
origin,
current_transaction,
)
return 429, {
"errcode": Codes.UNKNOWN,
"error": "Too many concurrent transactions",
}

# CRITICAL SECTION: we must now not await until we populate _active_transactions
# in _on_incoming_transaction_inner.

# We wrap in a ResponseCache so that we de-duplicate retried
# transactions.
return await self._transaction_resp_cache.wrap(
Expand All @@ -182,26 +210,18 @@ async def on_incoming_transaction(
async def _on_incoming_transaction_inner(
self, origin: str, transaction: Transaction, request_time: int
) -> Tuple[int, Dict[str, Any]]:
# Use a linearizer to ensure that transactions from a remote are
# processed in order.
with await self._transaction_linearizer.queue(origin):
# We rate limit here *after* we've queued up the incoming requests,
# so that we don't fill up the ratelimiter with blocked requests.
#
# This is important as the ratelimiter allows N concurrent requests
# at a time, and only starts ratelimiting if there are more requests
# than that being processed at a time. If we queued up requests in
# the linearizer/response cache *after* the ratelimiting then those
# queued up requests would count as part of the allowed limit of N
# concurrent requests.
with self._federation_ratelimiter.ratelimit(origin) as d:
await d

result = await self._handle_incoming_transaction(
origin, transaction, request_time
)
# CRITICAL SECTION: the first thing we must do (before awaiting) is
# add an entry to _active_transactions.
assert origin not in self._active_transactions
self._active_transactions[origin] = transaction.transaction_id # type: ignore

return result
try:
result = await self._handle_incoming_transaction(
origin, transaction, request_time
)
return result
finally:
del self._active_transactions[origin]

async def _handle_incoming_transaction(
self, origin: str, transaction: Transaction, request_time: int
Expand All @@ -227,19 +247,6 @@ async def _handle_incoming_transaction(

logger.debug("[%s] Transaction is new", transaction.transaction_id) # type: ignore

# Reject if PDU count > 50 or EDU count > 100
if len(transaction.pdus) > 50 or ( # type: ignore
hasattr(transaction, "edus") and len(transaction.edus) > 100 # type: ignore
):

logger.info("Transaction PDU or EDU count too large. Returning 400")

response = {}
await self.transaction_actions.set_response(
origin, transaction, 400, response
)
return 400, response

# We process PDUs and EDUs in parallel. This is important as we don't
# want to block things like to device messages from reaching clients
# behind the potentially expensive handling of PDUs.
Expand Down