Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Opentracing end to end encryption #5687

Closed
wants to merge 47 commits into from
Closed
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
823c34a
One tracing decorator to rule them all.
JorikSchellekens Jul 22, 2019
565544b
Trace e2e
JorikSchellekens Jun 28, 2019
1e7099d
Fix e2e bugs
JorikSchellekens Jun 28, 2019
d876cda
Trace more e2e stuff and less e2e stuff
JorikSchellekens Jun 28, 2019
fd669e5
Trace devices
JorikSchellekens Jun 28, 2019
c988c02
Trace device messages.
JorikSchellekens Jun 28, 2019
21940ca
Update to new access pattern
JorikSchellekens Jul 2, 2019
d94897e
Include servletname in incoming-request trace
JorikSchellekens Jul 3, 2019
28113ad
typo
JorikSchellekens Jul 3, 2019
d9f0c7f
How did that half of the statement get deleted?
JorikSchellekens Jul 3, 2019
7ae7e79
These functions were not deferreds!
JorikSchellekens Jul 4, 2019
bfc5005
The great logging/ migration
JorikSchellekens Jul 4, 2019
957cd77
Opentracing across streams
JorikSchellekens Jul 17, 2019
1ed790d
Some tracing
JorikSchellekens Jul 8, 2019
01229a4
Clean up room key tracing
JorikSchellekens Jul 8, 2019
794c9e2
Cleanup key upload tracing
JorikSchellekens Jul 8, 2019
d4bdc2b
Trace key claiming
JorikSchellekens Jul 8, 2019
2fd49ce
Nicer tracing
JorikSchellekens Jul 11, 2019
d44f303
Isort of ran out of puns for this one.
JorikSchellekens Jul 17, 2019
a293759
Though style is subjective it depends on a ruthless objectivity: you …
JorikSchellekens Jul 17, 2019
ab191f9
A little extra device_list tracing
JorikSchellekens Jul 15, 2019
e49487f
Better args wrapper
JorikSchellekens Jul 16, 2019
4824e30
Add user _id
JorikSchellekens Jul 16, 2019
30738e9
newsfile
JorikSchellekens Jul 17, 2019
6944c99
Make sure there is an active span here.
JorikSchellekens Jul 17, 2019
d26cbb4
Unbreak json parsing.
JorikSchellekens Jul 17, 2019
cffba28
Trailing .d
JorikSchellekens Jul 17, 2019
d395650
Use better decorator names.
JorikSchellekens Jul 17, 2019
beea2e3
I wish python had a good type system.
JorikSchellekens Jul 17, 2019
1801578
Use unified trace method
JorikSchellekens Jul 22, 2019
08aaad0
Nicer changelog
JorikSchellekens Aug 5, 2019
8b53f8e
Typo
JorikSchellekens Aug 5, 2019
35eb018
Double negatives do not make code that isn't unclear..
JorikSchellekens Aug 5, 2019
46e41ee
Comment for 'context' column in device_lists_outbound_pokes'
JorikSchellekens Aug 5, 2019
d859c34
Nicer use of dict update methods.
JorikSchellekens Aug 5, 2019
e7f4285
Refactor return value so we don't create identical lists each time.
JorikSchellekens Aug 5, 2019
50964d2
String concatenation without the '+'
JorikSchellekens Aug 5, 2019
0395869
Use underscores.
JorikSchellekens Aug 5, 2019
7ab2088
Remove redundent tagging.
JorikSchellekens Aug 5, 2019
f76b071
Remove redundent spans.
JorikSchellekens Aug 5, 2019
22b7e6a
Debug comments gone rampant.
JorikSchellekens Aug 5, 2019
6a355ca
Refactor for clarity.
JorikSchellekens Aug 5, 2019
bd0ed7b
Docstrings shouldn't lie.
JorikSchellekens Aug 5, 2019
4f36a2d
Create and use a method to get the span context as a dict.
JorikSchellekens Aug 5, 2019
82d6eb1
Remove unused import
JorikSchellekens Aug 5, 2019
a68119e
Merge remote-tracking branch 'origin/develop' into joriks/opentracing…
JorikSchellekens Aug 5, 2019
eee4eff
Bind exception to name
JorikSchellekens Aug 5, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/5687.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Opentrace e2e code paths.
JorikSchellekens marked this conversation as resolved.
Show resolved Hide resolved
11 changes: 11 additions & 0 deletions synapse/api/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

from twisted.internet import defer

import synapse.logging.opentracing as opentracing
JorikSchellekens marked this conversation as resolved.
Show resolved Hide resolved
import synapse.types
from synapse import event_auth
from synapse.api.constants import EventTypes, JoinRules, Membership
Expand Down Expand Up @@ -178,6 +179,7 @@ def can_federate(self, event, auth_events):
def get_public_keys(self, invite_event):
return event_auth.get_public_keys(invite_event)

@opentracing.trace
@defer.inlineCallbacks
def get_user_by_req(
self, request, allow_guest=False, rights="access", allow_expired=False
Expand Down Expand Up @@ -209,6 +211,10 @@ def get_user_by_req(
user_id, app_service = yield self._get_appservice_user_id(request)
if user_id:
request.authenticated_entity = user_id
opentracing.set_tag("authenticated_entity", user_id)
# there is at least one other place where authenticated entity is
JorikSchellekens marked this conversation as resolved.
Show resolved Hide resolved
# set. user_id is tagged incase authenticated_entity is clobbered
JorikSchellekens marked this conversation as resolved.
Show resolved Hide resolved
opentracing.set_tag("user_id", user_id)

if ip_addr and self.hs.config.track_appservice_user_ips:
yield self.store.insert_client_ip(
Expand Down Expand Up @@ -260,6 +266,11 @@ def get_user_by_req(

request.authenticated_entity = user.to_string()

opentracing.set_tag("authenticated_entity", user.to_string())
richvdh marked this conversation as resolved.
Show resolved Hide resolved
# there is at least one other place where authenticated entity is
# set. user_id is tagged incase authenticated_entity is clobbered
opentracing.set_tag("user_id", user.to_string())

return synapse.types.create_requester(
user, token_id, is_guest, device_id, app_service=app_service
)
Expand Down
18 changes: 12 additions & 6 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from twisted.internet.abstract import isIPAddress
from twisted.python import failure

import synapse.logging.opentracing as opentracing
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import (
AuthError,
Expand Down Expand Up @@ -507,6 +508,7 @@ def on_query_client_keys(self, origin, content):
def on_query_user_devices(self, origin, user_id):
return self.on_query_request("user_devices", user_id)

@opentracing.trace
@defer.inlineCallbacks
@log_function
def on_claim_client_keys(self, origin, content):
Expand All @@ -515,6 +517,9 @@ def on_claim_client_keys(self, origin, content):
for device_id, algorithm in device_keys.items():
query.append((user_id, device_id, algorithm))

opentracing.log_kv(
{"message": "Claiming one time keys.", "user, device pairs": query}
)
results = yield self.store.claim_e2e_one_time_keys(query)

json_result = {}
Expand Down Expand Up @@ -808,12 +813,13 @@ def on_edu(self, edu_type, origin, content):
if not handler:
logger.warn("No handler registered for EDU type %s", edu_type)

try:
yield handler(origin, content)
except SynapseError as e:
logger.info("Failed to handle edu %r: %r", edu_type, e)
except Exception:
logger.exception("Failed to handle edu %r", edu_type)
with opentracing.start_active_span_from_edu(content, "handle_edu"):
try:
yield handler(origin, content)
except SynapseError as e:
logger.info("Failed to handle edu %r: %r", edu_type, e)
except Exception:
logger.exception("Failed to handle edu %r", edu_type)

def on_query(self, query_type, args):
handler = self.query_handlers.get(query_type)
Expand Down
209 changes: 128 additions & 81 deletions synapse/federation/sender/per_destination_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
import datetime
import logging

from canonicaljson import json
from prometheus_client import Counter

from twisted.internet import defer

import synapse.logging.opentracing as opentracing
from synapse.api.errors import (
FederationDeniedError,
HttpResponseException,
Expand Down Expand Up @@ -204,97 +206,142 @@ def _transaction_transmission_loop(self):

pending_edus = device_update_edus + to_device_edus

# BEGIN CRITICAL SECTION
#
# In order to avoid a race condition, we need to make sure that
# the following code (from popping the queues up to the point
# where we decide if we actually have any pending messages) is
# atomic - otherwise new PDUs or EDUs might arrive in the
# meantime, but not get sent because we hold the
# transmission_loop_running flag.

pending_pdus = self._pending_pdus

# We can only include at most 50 PDUs per transactions
pending_pdus, self._pending_pdus = pending_pdus[:50], pending_pdus[50:]

pending_edus.extend(self._get_rr_edus(force_flush=False))
pending_presence = self._pending_presence
self._pending_presence = {}
if pending_presence:
pending_edus.append(
Edu(
origin=self._server_name,
destination=self._destination,
edu_type="m.presence",
content={
"push": [
format_user_presence_state(
presence, self._clock.time_msec()
)
for presence in pending_presence.values()
]
},
)
# Make a transaction sending span, this span follows on from all the
JorikSchellekens marked this conversation as resolved.
Show resolved Hide resolved
# edus in that transaction. This needs to be done because if the edus
# are never received on the remote the span effectively has no causality.

span_contexts = [
opentracing.extract_text_map(
json.loads(
edu.get_dict().get("content", {}).get("context", "{}")
).get("opentracing", {})
)
for edu in pending_edus
]

pending_edus.extend(
self._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus))
)
while (
len(pending_edus) < MAX_EDUS_PER_TRANSACTION
and self._pending_edus_keyed
with opentracing.start_active_span_follows_from(
"send_transaction", span_contexts
):
_, val = self._pending_edus_keyed.popitem()
pending_edus.append(val)

if pending_pdus:
logger.debug(
"TX [%s] len(pending_pdus_by_dest[dest]) = %d",
self._destination,
len(pending_pdus),
# Link each sent edu to this transaction's span
_pending_edus = []
for edu in pending_edus:
edu_dict = edu.get_dict()
span_context = json.loads(
edu_dict.get("content", {}).get("context", "{}")
).get("opentracing", {})
# If there is no span context then we are either blacklisting
# this destination or we are not tracing
if not span_context == {}:
JorikSchellekens marked this conversation as resolved.
Show resolved Hide resolved
if "references" not in span_context:
JorikSchellekens marked this conversation as resolved.
Show resolved Hide resolved
span_context["references"] = [
opentracing.active_span_context_as_string()
]
else:
span_context["references"].append(
opentracing.active_span_context_as_string()
)
edu_dict["content"]["context"] = json.dumps(
{"opentracing": span_context}
)
_pending_edus.append(Edu(**edu_dict))
pending_edus = _pending_edus

# BEGIN CRITICAL SECTION
#
# In order to avoid a race condition, we need to make sure that
# the following code (from popping the queues up to the point
# where we decide if we actually have any pending messages) is
# atomic - otherwise new PDUs or EDUs might arrive in the
# meantime, but not get sent because we hold the
# transmission_loop_running flag.

pending_pdus = self._pending_pdus

# We can only include at most 50 PDUs per transactions
pending_pdus, self._pending_pdus = (
pending_pdus[:50],
pending_pdus[50:],
)

if not pending_pdus and not pending_edus:
logger.debug("TX [%s] Nothing to send", self._destination)
self._last_device_stream_id = device_stream_id
return

# if we've decided to send a transaction anyway, and we have room, we
# may as well send any pending RRs
if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
pending_edus.extend(self._get_rr_edus(force_flush=True))

# END CRITICAL SECTION

success = yield self._transaction_manager.send_new_transaction(
self._destination, pending_pdus, pending_edus
)
if success:
sent_transactions_counter.inc()
sent_edus_counter.inc(len(pending_edus))
for edu in pending_edus:
sent_edus_by_type.labels(edu.edu_type).inc()
# Remove the acknowledged device messages from the database
# Only bother if we actually sent some device messages
if to_device_edus:
yield self._store.delete_device_msgs_for_remote(
self._destination, device_stream_id
pending_edus.extend(self._get_rr_edus(force_flush=False))
pending_presence = self._pending_presence
self._pending_presence = {}
if pending_presence:
pending_edus.append(
Edu(
origin=self._server_name,
destination=self._destination,
edu_type="m.presence",
content={
"push": [
format_user_presence_state(
presence, self._clock.time_msec()
)
for presence in pending_presence.values()
]
},
)
)

# also mark the device updates as sent
if device_update_edus:
logger.info(
"Marking as sent %r %r", self._destination, dev_list_id
pending_edus.extend(
self._pop_pending_edus(
MAX_EDUS_PER_TRANSACTION - len(pending_edus)
)
yield self._store.mark_as_sent_devices_by_remote(
self._destination, dev_list_id
)
while (
len(pending_edus) < MAX_EDUS_PER_TRANSACTION
and self._pending_edus_keyed
):
_, val = self._pending_edus_keyed.popitem()
pending_edus.append(val)

if pending_pdus:
logger.debug(
"TX [%s] len(pending_pdus_by_dest[dest]) = %d",
self._destination,
len(pending_pdus),
)

self._last_device_stream_id = device_stream_id
self._last_device_list_stream_id = dev_list_id
else:
break
if not pending_pdus and not pending_edus:
logger.debug("TX [%s] Nothing to send", self._destination)
self._last_device_stream_id = device_stream_id
return

# if we've decided to send a transaction anyway, and we have room, we
# may as well send any pending RRs
if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
pending_edus.extend(self._get_rr_edus(force_flush=True))

# END CRITICAL SECTION

success = yield self._transaction_manager.send_new_transaction(
self._destination, pending_pdus, pending_edus
)
if success:
sent_transactions_counter.inc()
sent_edus_counter.inc(len(pending_edus))
for edu in pending_edus:
sent_edus_by_type.labels(edu.edu_type).inc()
# Remove the acknowledged device messages from the database
# Only bother if we actually sent some device messages
if to_device_edus:
yield self._store.delete_device_msgs_for_remote(
self._destination, device_stream_id
)

# also mark the device updates as sent
if device_update_edus:
logger.info(
"Marking as sent %r %r", self._destination, dev_list_id
)
yield self._store.mark_as_sent_devices_by_remote(
self._destination, dev_list_id
)

self._last_device_stream_id = device_stream_id
self._last_device_list_stream_id = dev_list_id
else:
break
except NotRetryingDestination as e:
logger.debug(
"TX [%s] not ready for retry yet (next retry at %s) - "
Expand Down
1 change: 1 addition & 0 deletions synapse/federation/transport/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ async def new_func(request, *args, **kwargs):
opentracing.tags.HTTP_URL: request.get_redacted_uri(),
opentracing.tags.PEER_HOST_IPV6: request.getClientIP(),
"authenticated_entity": origin,
"servlet_name": request.request_metrics.name,
},
):
if origin:
Expand Down
Loading