diff --git a/changelog.d/5687.misc b/changelog.d/5687.misc
new file mode 100644
index 000000000000..9d98c4abb157
--- /dev/null
+++ b/changelog.d/5687.misc
@@ -0,0 +1 @@
+Add OpenTracing instrumentation for e2e code paths.
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 179644852a2c..bfd08db52116 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -22,6 +22,7 @@
 
 from twisted.internet import defer
 
+import synapse.logging.opentracing as opentracing
 import synapse.types
 from synapse import event_auth
 from synapse.api.constants import EventTypes, JoinRules, Membership
@@ -178,6 +179,7 @@ def can_federate(self, event, auth_events):
     def get_public_keys(self, invite_event):
         return event_auth.get_public_keys(invite_event)
 
+    @opentracing.trace
     @defer.inlineCallbacks
     def get_user_by_req(
         self, request, allow_guest=False, rights="access", allow_expired=False
@@ -209,6 +211,10 @@ def get_user_by_req(
             user_id, app_service = yield self._get_appservice_user_id(request)
             if user_id:
                 request.authenticated_entity = user_id
+                opentracing.set_tag("authenticated_entity", user_id)
+                # there is at least one other place where authenticated entity is
+                # set. user_id is tagged in case authenticated_entity is clobbered
+                opentracing.set_tag("user_id", user_id)
 
                 if ip_addr and self.hs.config.track_appservice_user_ips:
                     yield self.store.insert_client_ip(
@@ -260,6 +266,11 @@ def get_user_by_req(
 
             request.authenticated_entity = user.to_string()
 
+            opentracing.set_tag("authenticated_entity", user.to_string())
+            # there is at least one other place where authenticated entity is
+            # set. user_id is tagged in case authenticated_entity is clobbered
+            opentracing.set_tag("user_id", user.to_string())
+
             return synapse.types.create_requester(
                 user, token_id, is_guest, device_id, app_service=app_service
             )
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index d216c46dfee0..2d27ae942bb7 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -25,6 +25,7 @@
 from twisted.internet.abstract import isIPAddress
 from twisted.python import failure
 
+import synapse.logging.opentracing as opentracing
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import (
     AuthError,
@@ -507,6 +508,7 @@ def on_query_client_keys(self, origin, content):
     def on_query_user_devices(self, origin, user_id):
         return self.on_query_request("user_devices", user_id)
 
+    @opentracing.trace
     @defer.inlineCallbacks
     @log_function
     def on_claim_client_keys(self, origin, content):
@@ -515,6 +517,9 @@ def on_claim_client_keys(self, origin, content):
             for device_id, algorithm in device_keys.items():
                 query.append((user_id, device_id, algorithm))
 
+        opentracing.log_kv(
+            {"message": "Claiming one time keys.", "user, device pairs": query}
+        )
         results = yield self.store.claim_e2e_one_time_keys(query)
 
         json_result = {}
@@ -808,12 +813,13 @@ def on_edu(self, edu_type, origin, content):
         if not handler:
             logger.warn("No handler registered for EDU type %s", edu_type)
 
-        try:
-            yield handler(origin, content)
-        except SynapseError as e:
-            logger.info("Failed to handle edu %r: %r", edu_type, e)
-        except Exception:
-            logger.exception("Failed to handle edu %r", edu_type)
+        with opentracing.start_active_span_from_edu(content, "handle_edu"):
+            try:
+                yield handler(origin, content)
+            except SynapseError as e:
+                logger.info("Failed to handle edu %r: %r", edu_type, e)
+            except Exception:
+                logger.exception("Failed to handle edu %r", edu_type)
 
    def on_query(self, query_type, args):
        handler = self.query_handlers.get(query_type)
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index fad980b89307..0d7246a613da 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -16,10 +16,12 @@
 import datetime
 import logging
 
+from canonicaljson import json
 from prometheus_client import Counter
 
 from twisted.internet import defer
 
+import synapse.logging.opentracing as opentracing
 from synapse.api.errors import (
     FederationDeniedError,
     HttpResponseException,
@@ -204,97 +206,137 @@ def _transaction_transmission_loop(self):
                 pending_edus = device_update_edus + to_device_edus
 
-                # BEGIN CRITICAL SECTION
-                #
-                # In order to avoid a race condition, we need to make sure that
-                # the following code (from popping the queues up to the point
-                # where we decide if we actually have any pending messages) is
-                # atomic - otherwise new PDUs or EDUs might arrive in the
-                # meantime, but not get sent because we hold the
-                # transmission_loop_running flag.
-
-                pending_pdus = self._pending_pdus
-
-                # We can only include at most 50 PDUs per transactions
-                pending_pdus, self._pending_pdus = pending_pdus[:50], pending_pdus[50:]
-
-                pending_edus.extend(self._get_rr_edus(force_flush=False))
-                pending_presence = self._pending_presence
-                self._pending_presence = {}
-                if pending_presence:
-                    pending_edus.append(
-                        Edu(
-                            origin=self._server_name,
-                            destination=self._destination,
-                            edu_type="m.presence",
-                            content={
-                                "push": [
-                                    format_user_presence_state(
-                                        presence, self._clock.time_msec()
-                                    )
-                                    for presence in pending_presence.values()
-                                ]
-                            },
-                        )
+                # Make a transaction-sending span; this span follows on from
+                # all the EDUs in that transaction. This is needed because if
+                # the EDUs are never received on the remote server, the span
+                # would otherwise have no causal link to their handling.
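+                # For example, a whitelisted destination's to-device EDU carries
+                # a serialised carrier in content["context"], along the lines of
+                # '{"opentracing": {"uber-trace-id": "..."}}' (the exact carrier
+                # keys depend on the configured tracer; this is illustrative).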
+
+                span_contexts = [
+                    opentracing.extract_text_map(
+                        json.loads(
+                            edu.get_dict().get("content", {}).get("context", "{}")
+                        ).get("opentracing", {})
                     )
+                    for edu in pending_edus
+                ]
 
-                pending_edus.extend(
-                    self._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus))
-                )
-                while (
-                    len(pending_edus) < MAX_EDUS_PER_TRANSACTION
-                    and self._pending_edus_keyed
+                with opentracing.start_active_span_follows_from(
+                    "send_transaction", span_contexts
                 ):
-                    _, val = self._pending_edus_keyed.popitem()
-                    pending_edus.append(val)
-
-                if pending_pdus:
-                    logger.debug(
-                        "TX [%s] len(pending_pdus_by_dest[dest]) = %d",
-                        self._destination,
-                        len(pending_pdus),
+                    # Link each sent edu to this transaction's span
+                    _pending_edus = []
+                    for edu in pending_edus:
+                        edu_dict = edu.get_dict()
+                        span_context = json.loads(
+                            edu_dict.get("content", {}).get("context", "{}")
+                        ).get("opentracing", {})
+                        # If there is no span context then either the destination
+                        # is not whitelisted for tracing or we are not tracing
+                        if span_context:
+                            span_context.setdefault("references", []).append(
+                                opentracing.active_span_context_as_string()
+                            )
+                            edu_dict["content"]["context"] = json.dumps(
+                                {"opentracing": span_context}
+                            )
+                        _pending_edus.append(Edu(**edu_dict))
+                    pending_edus = _pending_edus
+
+                    # BEGIN CRITICAL SECTION
+                    #
+                    # In order to avoid a race condition, we need to make sure that
+                    # the following code (from popping the queues up to the point
+                    # where we decide if we actually have any pending messages) is
+                    # atomic - otherwise new PDUs or EDUs might arrive in the
+                    # meantime, but not get sent because we hold the
+                    # transmission_loop_running flag.
+
+                    pending_pdus = self._pending_pdus
+
+                    # We can only include at most 50 PDUs per transaction
+                    pending_pdus, self._pending_pdus = (
+                        pending_pdus[:50],
+                        pending_pdus[50:],
                     )
-                if not pending_pdus and not pending_edus:
-                    logger.debug("TX [%s] Nothing to send", self._destination)
-                    self._last_device_stream_id = device_stream_id
-                    return
-
-                # if we've decided to send a transaction anyway, and we have room, we
-                # may as well send any pending RRs
-                if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
-                    pending_edus.extend(self._get_rr_edus(force_flush=True))
-
-                # END CRITICAL SECTION
-
-                success = yield self._transaction_manager.send_new_transaction(
-                    self._destination, pending_pdus, pending_edus
-                )
-                if success:
-                    sent_transactions_counter.inc()
-                    sent_edus_counter.inc(len(pending_edus))
-                    for edu in pending_edus:
-                        sent_edus_by_type.labels(edu.edu_type).inc()
-                    # Remove the acknowledged device messages from the database
-                    # Only bother if we actually sent some device messages
-                    if to_device_edus:
-                        yield self._store.delete_device_msgs_for_remote(
-                            self._destination, device_stream_id
+                    pending_edus.extend(self._get_rr_edus(force_flush=False))
+                    pending_presence = self._pending_presence
+                    self._pending_presence = {}
+                    if pending_presence:
+                        pending_edus.append(
+                            Edu(
+                                origin=self._server_name,
+                                destination=self._destination,
+                                edu_type="m.presence",
+                                content={
+                                    "push": [
+                                        format_user_presence_state(
+                                            presence, self._clock.time_msec()
+                                        )
+                                        for presence in pending_presence.values()
+                                    ]
+                                },
+                            )
                         )
-                    # also mark the device updates as sent
-                    if device_update_edus:
-                        logger.info(
-                            "Marking as sent %r %r", self._destination, dev_list_id
                         )
-                        yield self._store.mark_as_sent_devices_by_remote(
-                            self._destination, dev_list_id
+                    pending_edus.extend(
+                        self._pop_pending_edus(
+                            MAX_EDUS_PER_TRANSACTION - len(pending_edus)
                         )
+                    )
+                    while (
+                        len(pending_edus) < MAX_EDUS_PER_TRANSACTION
+                        and self._pending_edus_keyed
+                    ):
+                        _, val = self._pending_edus_keyed.popitem()
+                        pending_edus.append(val)
+
+                    if pending_pdus:
+                        logger.debug(
+                            "TX [%s] len(pending_pdus_by_dest[dest]) = %d",
+                            self._destination,
+                            len(pending_pdus),
                         )
+
+                    if not pending_pdus and not pending_edus:
+                        logger.debug("TX [%s] Nothing to send", self._destination)
+                        self._last_device_stream_id = device_stream_id
+                        return
+
+                    # if we've decided to send a transaction anyway, and we have room, we
+                    # may as well send any pending RRs
+                    if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
+                        pending_edus.extend(self._get_rr_edus(force_flush=True))
+
+                    # END CRITICAL SECTION
+
+                    success = yield self._transaction_manager.send_new_transaction(
+                        self._destination, pending_pdus, pending_edus
+                    )
+                    if success:
+                        sent_transactions_counter.inc()
+                        sent_edus_counter.inc(len(pending_edus))
+                        for edu in pending_edus:
+                            sent_edus_by_type.labels(edu.edu_type).inc()
+                        # Remove the acknowledged device messages from the database
+                        # Only bother if we actually sent some device messages
+                        if to_device_edus:
+                            yield self._store.delete_device_msgs_for_remote(
+                                self._destination, device_stream_id
+                            )
+
+                        # also mark the device updates as sent
+                        if device_update_edus:
+                            logger.info(
+                                "Marking as sent %r %r", self._destination, dev_list_id
+                            )
+                            yield self._store.mark_as_sent_devices_by_remote(
+                                self._destination, dev_list_id
+                            )
+
+                        self._last_device_stream_id = device_stream_id
+                        self._last_device_list_stream_id = dev_list_id
+                    else:
+                        break
         except NotRetryingDestination as e:
             logger.debug(
                 "TX [%s] not ready for retry yet (next retry at %s) - "
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index ea4e1b6d0f28..3a80083d0a9a 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -297,6 +297,7 @@ async def new_func(request, *args, **kwargs):
                     opentracing.tags.HTTP_URL: request.get_redacted_uri(),
                     opentracing.tags.PEER_HOST_IPV6: request.getClientIP(),
                     "authenticated_entity": origin,
+                    "servlet_name": request.request_metrics.name,
                 },
             ):
                 if origin:
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 5c1cf83c9dd1..d7e1935d0f11 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -18,6 +18,7 @@
 
 from twisted.internet import defer
 
+import synapse.logging.opentracing as opentracing
 from synapse.api import errors
 from synapse.api.constants import EventTypes
 from synapse.api.errors import (
@@ -45,6 +46,7 @@ def __init__(self, hs):
         self.state = hs.get_state_handler()
         self._auth_handler = hs.get_auth_handler()
 
+    @opentracing.trace
     @defer.inlineCallbacks
     def get_devices_by_user(self, user_id):
         """
@@ -56,6 +58,7 @@ def get_devices_by_user(self, user_id):
             defer.Deferred: list[dict[str, X]]: info on each device
         """
 
+        opentracing.set_tag("user_id", user_id)
         device_map = yield self.store.get_devices_by_user(user_id)
 
         ips = yield self.store.get_last_client_ip_by_device(user_id, device_id=None)
@@ -64,8 +67,10 @@ def get_devices_by_user(self, user_id):
         for device in devices:
             _update_device_from_client_ips(device, ips)
 
+        opentracing.log_kv(device_map)
         return devices
 
+    @opentracing.trace
     @defer.inlineCallbacks
     def get_device(self, user_id, device_id):
         """ Retrieve the given device
@@ -85,9 +90,14 @@ def get_device(self, user_id, device_id):
             raise errors.NotFoundError
 
         ips = yield self.store.get_last_client_ip_by_device(user_id, device_id)
        _update_device_from_client_ips(device, ips)
+
+        opentracing.set_tag("device", device)
+        opentracing.set_tag("ips", ips)
+
         return device
 
     @measure_func("device.get_user_ids_changed")
+    @opentracing.trace
     @defer.inlineCallbacks
     def get_user_ids_changed(self, user_id, from_token):
         """Get list of users that have had the devices updated, or have newly
@@ -97,6 +107,9 @@ def get_user_ids_changed(self, user_id, from_token):
             user_id (str)
             from_token (StreamToken)
         """
+
+        opentracing.set_tag("user_id", user_id)
+        opentracing.set_tag("from_token", from_token)
         now_room_key = yield self.store.get_room_events_max_id()
 
         room_ids = yield self.store.get_rooms_for_user(user_id)
@@ -133,7 +146,7 @@ def get_user_ids_changed(self, user_id, from_token):
                     if etype != EventTypes.Member:
                         continue
                     possibly_left.add(state_key)
-                    continue
+                    continue
 
             # Fetch the current state at the time.
             try:
@@ -148,6 +161,9 @@ def get_user_ids_changed(self, user_id, from_token):
                 # special-case for an empty prev state: include all members
                 # in the changed list
                 if not event_ids:
+                    opentracing.log_kv(
+                        {"event": "encountered empty previous state", "room_id": room_id}
+                    )
                     for key, event_id in iteritems(current_state_ids):
                         etype, state_key = key
                         if etype != EventTypes.Member:
@@ -200,7 +216,11 @@ def get_user_ids_changed(self, user_id, from_token):
             possibly_joined = []
             possibly_left = []
 
-        return {"changed": list(possibly_joined), "left": list(possibly_left)}
+        result = {"changed": list(possibly_joined), "left": list(possibly_left)}
+
+        opentracing.log_kv(result)
+
+        return result
 
 
 class DeviceHandler(DeviceWorkerHandler):
@@ -267,6 +287,7 @@ def check_device_registered(
 
         raise errors.StoreError(500, "Couldn't generate a device ID.")
 
+    @opentracing.trace
     @defer.inlineCallbacks
     def delete_device(self, user_id, device_id):
         """ Delete the given device
@@ -284,6 +305,10 @@ def delete_device(self, user_id, device_id):
         except errors.StoreError as e:
             if e.code == 404:
                 # no match
+                opentracing.set_tag("error", True)
+                opentracing.log_kv(
+                    {"reason": "User doesn't have device id.", "device_id": device_id}
+                )
                 pass
             else:
                 raise
@@ -296,6 +321,7 @@ def delete_device(self, user_id, device_id):
 
         yield self.notify_device_update(user_id, [device_id])
 
+    @opentracing.trace
     @defer.inlineCallbacks
     def delete_all_devices_for_user(self, user_id, except_device_id=None):
         """Delete all of the user's devices
@@ -331,6 +357,8 @@ def delete_devices(self, user_id, device_ids):
         except errors.StoreError as e:
             if e.code == 404:
                 # no match
+                opentracing.set_tag("error", True)
+                opentracing.set_tag("reason", "User doesn't have that device id.")
                 pass
             else:
                 raise
@@ -371,6 +399,7 @@ def update_device(self, user_id, device_id, content):
             else:
                 raise
 
+    @opentracing.trace
     @measure_func("notify_device_update")
     @defer.inlineCallbacks
     def notify_device_update(self, user_id, device_ids):
@@ -386,6 +415,8 @@ def notify_device_update(self, user_id, device_ids):
         hosts.update(get_domain_from_id(u) for u in users_who_share_room)
         hosts.discard(self.server_name)
 
+        opentracing.set_tag("hosts to update", hosts)
+
         position = yield self.store.add_device_change_to_streams(
             user_id, device_ids, list(hosts)
         )
@@ -405,6 +436,9 @@ def notify_device_update(self, user_id, device_ids):
             )
             for host in hosts:
                 self.federation_sender.send_device_messages(host)
+                opentracing.log_kv(
+                    {"message": "sent device update to host", "host": host}
+                )
 
     @defer.inlineCallbacks
     def on_federation_query_user_devices(self, user_id):
@@ -451,12 +485,15 @@ def __init__(self, hs, device_handler):
            iterable=True,
         )
 
+    @opentracing.trace
     @defer.inlineCallbacks
     def incoming_device_list_update(self, origin, edu_content):
         """Called on incoming device list update from federation. Responsible
         for parsing the EDU and adding to pending updates list.
         """
 
+        opentracing.set_tag("origin", origin)
+        opentracing.set_tag("edu_content", edu_content)
         user_id = edu_content.pop("user_id")
         device_id = edu_content.pop("device_id")
         stream_id = str(edu_content.pop("stream_id"))  # They may come as ints
@@ -471,12 +508,30 @@ def incoming_device_list_update(self, origin, edu_content):
                 device_id,
                 origin,
             )
+
+            opentracing.set_tag("error", True)
+            opentracing.log_kv(
+                {
+                    "message": "Got a device list update edu from a user and "
+                    "device which does not match the origin of the request.",
+                    "user_id": user_id,
+                    "device_id": device_id,
+                }
+            )
             return
 
         room_ids = yield self.store.get_rooms_for_user(user_id)
         if not room_ids:
             # We don't share any rooms with this user. Ignore update, as we
             # probably won't get any further updates.
+            opentracing.set_tag("error", True)
+            opentracing.log_kv(
+                {
+                    "message": "Got an update from a user with whom "
+                    "we don't share any rooms",
+                    "other user_id": user_id,
+                }
+            )
             logger.warning(
                 "Got device list update edu for %r/%r, but don't share a room",
                 user_id,
@@ -578,6 +633,7 @@ def user_device_resync(self, user_id):
             request:
             https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid
         """
+        opentracing.log_kv({"message": "Doing resync to update device list."})
         # Fetch all devices for the user.
         origin = get_domain_from_id(user_id)
         try:
@@ -594,13 +650,20 @@ def user_device_resync(self, user_id):
             # eventually become consistent.
             return
         except FederationDeniedError as e:
+            opentracing.set_tag("error", True)
+            opentracing.log_kv({"reason": "FederationDeniedError"})
             logger.info(e)
             return
-        except Exception:
+        except Exception as e:
             # TODO: Remember that we are now out of sync and try again
             # later
+            opentracing.set_tag("error", True)
+            opentracing.log_kv(
+                {"message": "Exception raised by federation request", "exception": e}
+            )
             logger.exception("Failed to handle device list update for %s", user_id)
             return
+        opentracing.log_kv({"result": result})
         stream_id = result["stream_id"]
         devices = result["devices"]
 
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index e1ebb6346c3a..424965368e55 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -15,8 +15,11 @@
 
 import logging
 
+from canonicaljson import json
+
 from twisted.internet import defer
 
+import synapse.logging.opentracing as opentracing
 from synapse.api.errors import SynapseError
 from synapse.types import UserID, get_domain_from_id
 from synapse.util.stringutils import random_string
@@ -78,7 +81,8 @@ def on_direct_to_device_edu(self, origin, content):
 
     @defer.inlineCallbacks
     def send_device_message(self, sender_user_id, message_type, messages):
-
+        opentracing.set_tag("number_of_messages", len(messages))
+        opentracing.set_tag("sender", sender_user_id)
         local_messages = {}
         remote_messages = {}
         for user_id, by_device in messages.items():
@@ -100,15 +104,24 @@ def send_device_message(self, sender_user_id, message_type, messages):
 
         message_id = random_string(16)
 
+        context = {"opentracing": {}}
+        opentracing.inject_active_span_text_map(context["opentracing"])
+
         remote_edu_contents = {}
         for destination, messages in remote_messages.items():
-            remote_edu_contents[destination] = {
-                "messages": messages,
-                "sender": sender_user_id,
-                "type": message_type,
-                "message_id": message_id,
-            }
+            with opentracing.start_active_span("to_device_for_user"):
+                opentracing.set_tag("destination", destination)
+                remote_edu_contents[destination] = {
+                    "messages": messages,
+                    "sender": sender_user_id,
+                    "type": message_type,
+                    "message_id": message_id,
+                    "context": json.dumps(context)
+                    if opentracing.whitelisted_homeserver(destination)
+                    else "{}",
+                }
 
+        opentracing.log_kv({"local_messages": local_messages})
         stream_id = yield self.store.add_messages_to_device_inbox(
             local_messages, remote_edu_contents
         )
@@ -117,6 +130,7 @@ def send_device_message(self, sender_user_id, message_type, messages):
             "to_device_key", stream_id, users=local_messages.keys()
         )
 
+        opentracing.log_kv({"remote_messages": remote_messages})
         for destination in remote_messages.keys():
             # Enqueue a new federation transaction to send the new
             # device messages to each remote destination.
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 1f90b0d27864..5aed78e5cc08 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -22,6 +22,7 @@
 
 from twisted.internet import defer
 
+import synapse.logging.opentracing as opentracing
 from synapse.api.errors import CodeMessageException, SynapseError
 from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.types import UserID, get_domain_from_id
@@ -46,6 +47,7 @@ def __init__(self, hs):
             "client_keys", self.on_federation_query_client_keys
         )
 
+    @opentracing.trace
     @defer.inlineCallbacks
     def query_devices(self, query_body, timeout):
         """ Handle a device key query from a client
@@ -81,6 +83,9 @@ def query_devices(self, query_body, timeout):
             else:
                 remote_queries[user_id] = device_ids
 
+        opentracing.set_tag("local_key_query", local_query)
+        opentracing.set_tag("remote_key_query", remote_queries)
+
         # First get local devices.
         failures = {}
         results = {}
@@ -121,6 +126,7 @@ def query_devices(self, query_body, timeout):
                     r[user_id] = remote_queries[user_id]
 
         # Now fetch any devices that we don't have in our cache
+        @opentracing.trace
         @defer.inlineCallbacks
         def do_remote_query(destination):
             """This is called when we are querying the device list of a user on
@@ -131,6 +137,7 @@ def do_remote_query(destination):
 
             destination_query = remote_queries_not_in_cache[destination]
 
+            opentracing.set_tag("key_query", destination_query)
             # We first consider whether we wish to update the device list cache with
             # the users device list. We want to track a user's devices when the
            # authenticated user shares a room with the queried user and the query
@@ -185,6 +192,8 @@ def do_remote_query(destination):
             except Exception as e:
                 failure = _exception_to_failure(e)
                 failures[destination] = failure
+                opentracing.set_tag("error", True)
+                opentracing.set_tag("reason", failure)
 
         yield make_deferred_yieldable(
             defer.gatherResults(
@@ -198,6 +207,7 @@ def do_remote_query(destination):
 
         return {"device_keys": results, "failures": failures}
 
+    @opentracing.trace
     @defer.inlineCallbacks
     def query_local_devices(self, query):
         """Get E2E device keys for local users
@@ -210,6 +220,7 @@ def query_local_devices(self, query):
             defer.Deferred: (resolves to dict[string, dict[string, dict]]):
                 map from user_id -> device_id -> device details
         """
+        opentracing.set_tag("local_query", query)
         local_query = []
 
         result_dict = {}
@@ -217,6 +228,14 @@ def query_local_devices(self, query):
             # we use UserID.from_string to catch invalid user ids
             if not self.is_mine(UserID.from_string(user_id)):
                 logger.warning("Request for keys for non-local user %s", user_id)
+                opentracing.log_kv(
+                    {
+                        "message": "Requested a local key for a user who"
+                        + " was not local to the homeserver",
+                        "user_id": user_id,
+                    }
+                )
+                opentracing.set_tag("error", True)
                 raise SynapseError(400, "Not a user here")
 
             if not device_ids:
@@ -241,6 +260,7 @@ def query_local_devices(self, query):
                     r["unsigned"]["device_display_name"] = display_name
                 result_dict[user_id][device_id] = r
 
+        opentracing.log_kv(results)
         return result_dict
 
     @defer.inlineCallbacks
@@ -251,6 +271,7 @@ def on_federation_query_client_keys(self, query_body):
         res = yield self.query_local_devices(device_keys_query)
         return {"device_keys": res}
 
+    @opentracing.trace
     @defer.inlineCallbacks
     def claim_one_time_keys(self, query, timeout):
         local_query = []
@@ -265,6 +286,9 @@ def claim_one_time_keys(self, query, timeout):
                 domain = get_domain_from_id(user_id)
                 remote_queries.setdefault(domain, {})[user_id] = device_keys
 
+        opentracing.set_tag("local_key_query", local_query)
+        opentracing.set_tag("remote_key_query", remote_queries)
+
         results = yield self.store.claim_e2e_one_time_keys(local_query)
 
         json_result = {}
@@ -276,8 +300,10 @@ def claim_one_time_keys(self, query, timeout):
                         key_id: json.loads(json_bytes)
                     }
 
+        @opentracing.trace
         @defer.inlineCallbacks
         def claim_client_keys(destination):
+            opentracing.set_tag("destination", destination)
             device_keys = remote_queries[destination]
             try:
                 remote_result = yield self.federation.claim_client_keys(
@@ -290,6 +316,8 @@ def claim_client_keys(destination):
             except Exception as e:
                 failure = _exception_to_failure(e)
                 failures[destination] = failure
+                opentracing.set_tag("error", True)
+                opentracing.set_tag("reason", failure)
 
         yield make_deferred_yieldable(
             defer.gatherResults(
@@ -313,9 +341,11 @@ def claim_client_keys(destination):
                 ),
             )
 
+        opentracing.log_kv({"one_time_keys": json_result, "failures": failures})
         return {"one_time_keys": json_result, "failures": failures}
 
     @defer.inlineCallbacks
+    @opentracing.tag_args
     def upload_keys_for_user(self, user_id, device_id, keys):
 
         time_now = self.clock.time_msec()
@@ -329,6 +359,13 @@ def upload_keys_for_user(self, user_id, device_id, keys):
             user_id,
             time_now,
         )
+        opentracing.log_kv(
+            {
+                "message": "Updating device_keys for user.",
+                "user_id": user_id,
+                "device_id": device_id,
+            }
+        )
         # TODO: Sign the JSON with the server key
         changed = yield self.store.set_e2e_device_keys(
             user_id, device_id, time_now, device_keys
@@ -336,12 +373,26 @@ def upload_keys_for_user(self, user_id, device_id, keys):
        if changed:
             # Only notify about device updates *if* the keys actually changed
             yield self.device_handler.notify_device_update(user_id, [device_id])
-
+        else:
+            opentracing.log_kv(
+                {"message": "Not updating device_keys for user", "user_id": user_id}
+            )
         one_time_keys = keys.get("one_time_keys", None)
         if one_time_keys:
+            opentracing.log_kv(
+                {
+                    "message": "Updating one_time_keys for device.",
+                    "user_id": user_id,
+                    "device_id": device_id,
+                }
+            )
             yield self._upload_one_time_keys_for_user(
                 user_id, device_id, time_now, one_time_keys
             )
+        else:
+            opentracing.log_kv(
+                {"message": "Did not update one_time_keys", "reason": "no keys given"}
+            )
 
         # the device should have been registered already, but it may have been
         # deleted due to a race with a DELETE request. Or we may be using an
@@ -352,6 +403,7 @@ def upload_keys_for_user(self, user_id, device_id, keys):
 
         result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
 
+        opentracing.set_tag("one_time_key_counts", result)
         return {"one_time_key_counts": result}
 
     @defer.inlineCallbacks
@@ -395,6 +447,9 @@ def _upload_one_time_keys_for_user(
                 (algorithm, key_id, encode_canonical_json(key).decode("ascii"))
             )
 
+    opentracing.log_kv(
+        {"message": "Inserting new one_time_keys.", "keys": new_keys}
+    )
     yield self.store.add_e2e_one_time_keys(user_id, device_id, time_now, new_keys)
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index 41b871fc5953..f6a95bce18d7 100644
--- a/synapse/handlers/e2e_room_keys.py
+++ b/synapse/handlers/e2e_room_keys.py
@@ -19,6 +19,7 @@
 
 from twisted.internet import defer
 
+import synapse.logging.opentracing as opentracing
 from synapse.api.errors import (
     Codes,
     NotFoundError,
@@ -49,6 +50,7 @@ def __init__(self, hs):
         # changed.
        self._upload_linearizer = Linearizer("upload_room_keys_lock")
 
+    @opentracing.trace
     @defer.inlineCallbacks
     def get_room_keys(self, user_id, version, room_id=None, session_id=None):
         """Bulk get the E2E room keys for a given backup, optionally filtered to a given
@@ -84,8 +86,10 @@ def get_room_keys(self, user_id, version, room_id=None, session_id=None):
                 user_id, version, room_id, session_id
             )
 
+            opentracing.log_kv(results)
             return results
 
+    @opentracing.trace
     @defer.inlineCallbacks
     def delete_room_keys(self, user_id, version, room_id=None, session_id=None):
         """Bulk delete the E2E room keys for a given backup, optionally filtered to a given
@@ -107,6 +111,7 @@ def delete_room_keys(self, user_id, version, room_id=None, session_id=None):
         with (yield self._upload_linearizer.queue(user_id)):
             yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
 
+    @opentracing.trace
     @defer.inlineCallbacks
     def upload_room_keys(self, user_id, version, room_keys):
         """Bulk upload a list of room keys into a given backup version, asserting
@@ -186,7 +191,14 @@ def _upload_room_key(self, user_id, version, room_id, session_id, room_key):
             session_id(str): the session whose room_key we're setting
             room_key(dict): the room_key being set
         """
-
+        opentracing.log_kv(
+            {
+                "message": "Trying to upload room key",
+                "room_id": room_id,
+                "session_id": session_id,
+                "user_id": user_id,
+            }
+        )
         # get the room_key for this particular row
         current_room_key = None
         try:
@@ -195,14 +207,23 @@ def _upload_room_key(self, user_id, version, room_id, session_id, room_key):
             )
         except StoreError as e:
             if e.code == 404:
-                pass
+                opentracing.log_kv(
+                    {
+                        "message": "Room key not found.",
+                        "room_id": room_id,
+                        "user_id": user_id,
+                    }
+                )
             else:
                 raise
 
         if self._should_replace_room_key(current_room_key, room_key):
+            opentracing.log_kv({"message": "Replacing room key."})
             yield self.store.set_e2e_room_key(
                 user_id, version, room_id, session_id, room_key
             )
+        else:
+            opentracing.log_kv({"message": "Not replacing room_key."})
 
     @staticmethod
     def _should_replace_room_key(current_room_key, room_key):
@@ -236,6 +257,7 @@ def _should_replace_room_key(current_room_key, room_key):
                 return False
         return True
 
+    @opentracing.trace
     @defer.inlineCallbacks
     def create_version(self, user_id, version_info):
         """Create a new backup version.  This automatically becomes the new
@@ -294,6 +316,7 @@ def get_version_info(self, user_id, version=None):
                 raise
             return res
 
+    @opentracing.trace
     @defer.inlineCallbacks
     def delete_version(self, user_id, version=None):
         """Deletes a given version of the user's e2e_room_keys backup
@@ -314,6 +337,7 @@ def delete_version(self, user_id, version=None):
             else:
                 raise
 
+    @opentracing.trace
     @defer.inlineCallbacks
     def update_version(self, user_id, version, version_info):
         """Update the info about a given version of the user's backup
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index d2c209c471fa..bbc609a56973 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -527,6 +527,31 @@ def inject_active_span_text_map(carrier, destination=None):
     )
 
 
+def get_active_span_text_map(destination=None):
+    """
+    Gets a span context as a dict. This can be used instead of manually
+    injecting a span into an empty carrier.
+
+    Args:
+        destination (str): the name of the remote server. The dict will only
+            contain a span context if the destination matches the
+            homeserver_whitelist or if destination is None.
+
+    Returns:
+        A dict containing the span context.
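+
+        Example (illustrative only; the exact carrier keys depend on the
+        configured tracer, e.g. Jaeger's text map):
+            {"uber-trace-id": "<trace_id>:<span_id>:<parent_id>:<flags>"}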
+    """
+
+    if not opentracing or (destination and not whitelisted_homeserver(destination)):
+        return {}
+
+    carrier = {}
+    opentracing.tracer.inject(
+        opentracing.tracer.active_span, opentracing.Format.TEXT_MAP, carrier
+    )
+
+    return carrier
+
+
 def active_span_context_as_string():
     """
     Returns:
diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py
index 6008adec7cf3..3282b5589291 100644
--- a/synapse/rest/client/v2_alpha/keys.py
+++ b/synapse/rest/client/v2_alpha/keys.py
@@ -17,6 +17,7 @@
 
 from twisted.internet import defer
 
+import synapse.logging.opentracing as opentracing
 from synapse.api.errors import SynapseError
 from synapse.http.servlet import (
     RestServlet,
@@ -68,6 +69,7 @@ def __init__(self, hs):
         self.auth = hs.get_auth()
         self.e2e_keys_handler = hs.get_e2e_keys_handler()
 
+    @opentracing.trace_using_operation_name("upload_keys")
     @defer.inlineCallbacks
     def on_POST(self, request, device_id):
         requester = yield self.auth.get_user_by_req(request, allow_guest=True)
@@ -78,6 +80,14 @@ def on_POST(self, request, device_id):
             # passing the device_id here is deprecated; however, we allow it
             # for now for compatibility with older clients.
             if requester.device_id is not None and device_id != requester.device_id:
+                opentracing.set_tag("error", True)
+                opentracing.log_kv(
+                    {
+                        "message": "Client uploading keys for a different device",
+                        "logged_in_id": requester.device_id,
+                        "key_being_uploaded": device_id,
+                    }
+                )
                 logger.warning(
                     "Client uploading keys for a different device "
                     "(logged in as %s, uploading for %s)",
@@ -178,10 +188,11 @@ def on_GET(self, request):
         requester = yield self.auth.get_user_by_req(request, allow_guest=True)
 
         from_token_string = parse_string(request, "from")
+        opentracing.set_tag("from", from_token_string)
 
         # We want to enforce they do pass us one, but we ignore it and return
         # changes after the "to" as well as before.
-        parse_string(request, "to")
+        opentracing.set_tag("to", parse_string(request, "to"))
 
         from_token = StreamToken.from_string(from_token_string)
 
diff --git a/synapse/rest/client/v2_alpha/sendtodevice.py b/synapse/rest/client/v2_alpha/sendtodevice.py
index 2613648d821b..db8103bd5dba 100644
--- a/synapse/rest/client/v2_alpha/sendtodevice.py
+++ b/synapse/rest/client/v2_alpha/sendtodevice.py
@@ -17,6 +17,7 @@
 
 from twisted.internet import defer
 
+import synapse.logging.opentracing as opentracing
 from synapse.http import servlet
 from synapse.http.servlet import parse_json_object_from_request
 from synapse.rest.client.transactions import HttpTransactionCache
@@ -42,7 +43,10 @@ def __init__(self, hs):
         self.txns = HttpTransactionCache(hs)
         self.device_message_handler = hs.get_device_message_handler()
 
+    @opentracing.trace_using_operation_name("sendToDevice")
     def on_PUT(self, request, message_type, txn_id):
+        opentracing.set_tag("message_type", message_type)
+        opentracing.set_tag("txn_id", txn_id)
         return self.txns.fetch_or_execute_request(
             request, self._put, request, message_type, txn_id
         )
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index 79bb0ea46db5..5f09b86bf0ac 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -19,6 +19,7 @@
 
 from twisted.internet import defer
 
+import synapse.logging.opentracing as opentracing
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.background_updates import BackgroundUpdateStore
 from synapse.util.caches.expiringcache import ExpiringCache
@@ -72,6 +73,7 @@ def get_new_messages_for_device_txn(txn):
             "get_new_messages_for_device", get_new_messages_for_device_txn
         )
 
+    @opentracing.trace
     @defer.inlineCallbacks
     def delete_messages_for_device(self, user_id, device_id, up_to_stream_id):
         """
@@ -87,11 +89,15 @@ def delete_messages_for_device(self, user_id, device_id, up_to_stream_id):
         last_deleted_stream_id = self._last_device_delete_cache.get(
             (user_id, device_id), None
         )
+
+        opentracing.set_tag("last_deleted_stream_id", last_deleted_stream_id)
+
         if last_deleted_stream_id:
             has_changed = self._device_inbox_stream_cache.has_entity_changed(
                 user_id, last_deleted_stream_id
             )
             if not has_changed:
+                opentracing.log_kv({"message": "No changes in cache since last check"})
                 return 0
 
         def delete_messages_for_device_txn(txn):
@@ -107,6 +113,10 @@ def delete_messages_for_device_txn(txn):
             "delete_messages_for_device", delete_messages_for_device_txn
         )
 
+        opentracing.log_kv(
+            {"message": "deleted {} messages for device".format(count), "count": count}
+        )
+
         # Update the cache, ensuring that we only ever increase the value
         last_deleted_stream_id = self._last_device_delete_cache.get(
             (user_id, device_id), 0
@@ -117,6 +127,7 @@ def delete_messages_for_device_txn(txn):
 
         return count
 
+    @opentracing.trace
     def get_new_device_msgs_for_remote(
         self, destination, last_stream_id, current_stream_id, limit
     ):
@@ -132,16 +143,23 @@ def get_new_device_msgs_for_remote(
             in the stream the messages got to.
         """
 
+        opentracing.set_tag("destination", destination)
+        opentracing.set_tag("last_stream_id", last_stream_id)
+        opentracing.set_tag("current_stream_id", current_stream_id)
+        opentracing.set_tag("limit", limit)
+
         has_changed = self._device_federation_outbox_stream_cache.has_entity_changed(
             destination, last_stream_id
         )
         if not has_changed or last_stream_id == current_stream_id:
+            opentracing.log_kv({"message": "No new messages in stream"})
             return defer.succeed(([], current_stream_id))
 
         if limit <= 0:
             # This can happen if we run out of room for EDUs in the transaction.
             return defer.succeed(([], last_stream_id))
 
+        @opentracing.trace
         def get_new_messages_for_remote_destination_txn(txn):
             sql = (
                 "SELECT stream_id, messages_json FROM device_federation_outbox"
@@ -156,6 +174,9 @@ def get_new_messages_for_remote_destination_txn(txn):
                 stream_pos = row[0]
                 messages.append(json.loads(row[1]))
             if len(messages) < limit:
+                opentracing.log_kv(
+                    {"message": "Set stream position to current position"}
+                )
                 stream_pos = current_stream_id
             return (messages, stream_pos)
 
@@ -164,6 +185,7 @@ def get_new_messages_for_remote_destination_txn(txn):
             get_new_messages_for_remote_destination_txn,
         )
 
+    @opentracing.trace
     def delete_device_msgs_for_remote(self, destination, up_to_stream_id):
         """Used to delete messages when the remote destination acknowledges
         their receipt.
@@ -214,6 +236,7 @@ def __init__(self, db_conn, hs):
             expiry_ms=30 * 60 * 1000,
         )
 
+    @opentracing.trace
     @defer.inlineCallbacks
     def add_messages_to_device_inbox(
         self, local_messages_by_user_then_device, remote_messages_by_destination
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 8f72d9289555..724c5b76222e 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -20,6 +20,7 @@
 
 from twisted.internet import defer
 
+import synapse.logging.opentracing as opentracing
 from synapse.api.errors import StoreError
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import Cache, SQLBaseStore, db_to_json
@@ -73,6 +74,7 @@ def get_devices_by_user(self, user_id):
 
         return {d["device_id"]: d for d in devices}
 
+    @opentracing.trace
     @defer.inlineCallbacks
     def get_devices_by_remote(self, destination, from_stream_id, limit):
         """Get stream of updates to send to remote servers
@@ -127,8 +129,10 @@ def get_devices_by_remote(self, destination, from_stream_id, limit):
         # (user_id, device_id) entries into a map, with the value being
         # the max stream_id across each set of duplicate entries
         #
-        # maps (user_id, device_id) -> stream_id
+        # maps (user_id, device_id) -> (stream_id, context)
         # as long as their stream_id does not match that of the last row
+        # where context is any metadata attached to the update, such as
+        # opentracing data
         query_map = {}
         for update in updates:
             if stream_id_cutoff is not None and update[2] >= stream_id_cutoff:
@@ -136,7 +140,10 @@ def get_devices_by_remote(self, destination, from_stream_id, limit):
                 break
 
             key = (update[0], update[1])
-            query_map[key] = max(query_map.get(key, 0), update[2])
+            context = update[3]
+            stream_id = max(query_map.get(key, (0, None))[0], update[2])
+
+            query_map[key] = (stream_id, context)
 
         # If we didn't find any updates with a stream_id lower than the cutoff, it
         # means that there are more than limit updates all of which have the same
@@ -171,7 +178,7 @@ def _get_devices_by_remote_txn(
             List: List of device updates
         """
         sql = """
-            SELECT user_id, device_id, stream_id FROM device_lists_outbound_pokes
+            SELECT user_id, device_id, stream_id, context FROM device_lists_outbound_pokes
             WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ?
             ORDER BY stream_id
             LIMIT ?
@@ -187,8 +194,8 @@ def _get_device_update_edus_by_remote(self, destination, from_stream_id, query_m
         Args:
             destination (str): The host the device updates are intended for
            from_stream_id (int): The minimum stream_id to filter updates by, exclusive
-            query_map (Dict[(str, str): int]): Dictionary mapping
-                user_id/device_id to update stream_id
+            query_map (Dict[(str, str): (int, str)]): Dictionary mapping
+                user_id/device_id to update stream_id and the relevant opentracing context
 
         Returns:
             List[Dict]: List of objects representing an device update EDU
 
@@ -210,12 +217,15 @@ def _get_device_update_edus_by_remote(self, destination, from_stream_id, query_m
                 destination, user_id, from_stream_id
             )
             for device_id, device in iteritems(user_devices):
-                stream_id = query_map[(user_id, device_id)]
+                stream_id, _ = query_map[(user_id, device_id)]
                 result = {
                     "user_id": user_id,
                     "device_id": device_id,
                     "prev_id": [prev_id] if prev_id else [],
                     "stream_id": stream_id,
+                    "context": query_map[(user_id, device_id)][1]
+                    if opentracing.whitelisted_homeserver(destination)
+                    else "{}",
                 }
 
                 prev_id = stream_id
@@ -299,6 +309,7 @@ def _mark_as_sent_devices_by_remote_txn(self, txn, destination, stream_id):
     def get_device_stream_token(self):
         return self._device_list_id_gen.get_current_token()
 
+    @opentracing.trace
     @defer.inlineCallbacks
     def get_user_devices_from_cache(self, query_list):
         """Get the devices (and keys if any) for remote users from the cache.
@@ -330,6 +341,9 @@ def get_user_devices_from_cache(self, query_list):
             else:
                 results[user_id] = yield self._get_cached_devices_for_user(user_id)
 
+        opentracing.set_tag("in_cache", results)
+        opentracing.set_tag("not_in_cache", user_ids_not_in_cache)
+
         return (user_ids_not_in_cache, results)
 
     @cachedInlineCallbacks(num_args=2, tree=True)
@@ -814,6 +828,8 @@ def _add_device_change_txn(self, txn, user_id, device_ids, hosts, stream_id):
             ],
         )
 
+        context = {"opentracing": opentracing.get_active_span_text_map()}
+
         self._simple_insert_many_txn(
             txn,
             table="device_lists_outbound_pokes",
@@ -825,6 +841,9 @@ def _add_device_change_txn(self, txn, user_id, device_ids, hosts, stream_id):
                     "device_id": device_id,
                     "sent": False,
                     "ts": now,
+                    "context": json.dumps(context)
+                    if opentracing.whitelisted_homeserver(destination)
+                    else "{}",
                 }
                 for destination in hosts
                 for device_id in device_ids
diff --git a/synapse/storage/e2e_room_keys.py b/synapse/storage/e2e_room_keys.py
index 99128f2df70e..4a6b2be6f617 100644
--- a/synapse/storage/e2e_room_keys.py
+++ b/synapse/storage/e2e_room_keys.py
@@ -17,6 +17,7 @@
 
 from twisted.internet import defer
 
+import synapse.logging.opentracing as opentracing
 from synapse.api.errors import StoreError
 
 from ._base import SQLBaseStore
@@ -94,7 +95,16 @@ def set_e2e_room_key(self, user_id, version, room_id, session_id, room_key):
             },
             lock=False,
         )
+        opentracing.log_kv(
+            {
+                "message": "Set room key",
+                "room_id": room_id,
+                "session_id": session_id,
+                "room_key": room_key,
+            }
+        )
 
+    @opentracing.trace
     @defer.inlineCallbacks
     def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
         """Bulk get the E2E room keys for a given backup, optionally filtered to a given
@@ -153,6 +163,7 @@ def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
 
         return sessions
 
+    @opentracing.trace
     @defer.inlineCallbacks
     def delete_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
         """Bulk delete the E2E room keys for a given backup, optionally filtered to a given
@@ -236,6 +247,7 @@ def _get_e2e_room_keys_version_info_txn(txn):
             "get_e2e_room_keys_version_info", _get_e2e_room_keys_version_info_txn
         )
 
+    @opentracing.trace
     def create_e2e_room_keys_version(self, user_id, info):
         """Atomically creates a new version of this user's e2e_room_keys store
         with the given version info.
@@ -276,6 +288,7 @@ def _create_e2e_room_keys_version_txn(txn):
             "create_e2e_room_keys_version_txn", _create_e2e_room_keys_version_txn
         )
 
+    @opentracing.trace
     def update_e2e_room_keys_version(self, user_id, version, info):
         """Update a given backup version
@@ -292,6 +305,7 @@ def update_e2e_room_keys_version(self, user_id, version, info):
             desc="update_e2e_room_keys_version",
         )
 
+    @opentracing.trace
     def delete_e2e_room_keys_version(self, user_id, version=None):
         """Delete a given backup version of the user's room keys.
         Doesn't delete their actual key data.
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index 1e07474e706a..7ee973a8bb3c 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -18,12 +18,14 @@
 
 from twisted.internet import defer
 
+import synapse.logging.opentracing as opentracing
 from synapse.util.caches.descriptors import cached
 
 from ._base import SQLBaseStore, db_to_json
 
 
 class EndToEndKeyWorkerStore(SQLBaseStore):
+    @opentracing.trace
     @defer.inlineCallbacks
     def get_e2e_device_keys(
         self, query_list, include_all_devices=False, include_deleted_devices=False
@@ -40,6 +42,7 @@ def get_e2e_device_keys(
             Dict mapping from user-id to dict mapping from device_id to
             dict containing "key_json", "device_display_name".
         """
+        opentracing.set_tag("query_list", query_list)
         if not query_list:
             return {}
 
@@ -57,9 +60,13 @@ def get_e2e_device_keys(
 
         return results
 
+    @opentracing.trace
     def _get_e2e_device_keys_txn(
         self, txn, query_list, include_all_devices=False, include_deleted_devices=False
     ):
+        opentracing.set_tag("include_all_devices", include_all_devices)
+        opentracing.set_tag("include_deleted_devices", include_deleted_devices)
+
         query_clauses = []
         query_params = []
 
@@ -104,6 +111,7 @@ def _get_e2e_device_keys_txn(
         for user_id, device_id in deleted_devices:
             result.setdefault(user_id, {})[device_id] = None
 
+        opentracing.log_kv(result)
         return result
 
     @defer.inlineCallbacks
@@ -129,8 +137,11 @@ def get_e2e_one_time_keys(self, user_id, device_id, key_ids):
             keyvalues={"user_id": user_id, "device_id": device_id},
             desc="add_e2e_one_time_keys_check",
         )
-
-        return {(row["algorithm"], row["key_id"]): row["key_json"] for row in rows}
+        result = {(row["algorithm"], row["key_id"]): row["key_json"] for row in rows}
+        opentracing.log_kv(
+            {"message": "Fetched one time keys for user", "one_time_keys": result}
+        )
+        return result
 
     @defer.inlineCallbacks
     def add_e2e_one_time_keys(self, user_id, device_id, time_now, new_keys):
@@ -146,6 +157,9 @@ def add_e2e_one_time_keys(self, user_id, device_id, time_now, new_keys):
         """
 
         def _add_e2e_one_time_keys(txn):
+            opentracing.set_tag("user_id", user_id)
+            opentracing.set_tag("device_id", device_id)
+            opentracing.set_tag("new_keys", new_keys)
             # We are protected from race between lookup and insertion due to
             # a unique constraint. If there is a race of two calls to
            # `add_e2e_one_time_keys` then they'll conflict and we will only
@@ -202,6 +216,11 @@ def set_e2e_device_keys(self, user_id, device_id, time_now, device_keys):
         """
 
         def _set_e2e_device_keys_txn(txn):
+            opentracing.set_tag("user_id", user_id)
+            opentracing.set_tag("device_id", device_id)
+            opentracing.set_tag("time_now", time_now)
+            opentracing.set_tag("device_keys", device_keys)
+
             old_key_json = self._simple_select_one_onecol_txn(
                 txn,
                 table="e2e_device_keys_json",
@@ -215,6 +234,7 @@ def _set_e2e_device_keys_txn(txn):
             new_key_json = encode_canonical_json(device_keys).decode("utf-8")
 
             if old_key_json == new_key_json:
+                opentracing.log_kv({"message": "Device key already stored."})
                 return False
 
             self._simple_upsert_txn(
                 txn,
                 table="e2e_device_keys_json",
                 keyvalues={"user_id": user_id, "device_id": device_id},
                 values={"ts_added_ms": time_now, "key_json": new_key_json},
             )
-
+            opentracing.log_kv({"message": "Device keys stored."})
             return True
 
         return self.runInteraction("set_e2e_device_keys", _set_e2e_device_keys_txn)
@@ -231,6 +251,7 @@ def _set_e2e_device_keys_txn(txn):
     def claim_e2e_one_time_keys(self, query_list):
         """Take a list of one time keys out of the database"""
 
+        @opentracing.trace
        def _claim_e2e_one_time_keys(txn):
             sql = (
                 "SELECT key_id, key_json FROM e2e_one_time_keys_json"
                 " WHERE user_id = ? AND device_id = ? AND algorithm = ?"
                 " LIMIT 1"
             )
                 " AND key_id = ?"
             )
             for user_id, device_id, algorithm, key_id in delete:
+                opentracing.log_kv(
+                    {
+                        "message": "Executing claim e2e_one_time_keys transaction on database."
+                    }
+                )
                 txn.execute(sql, (user_id, device_id, algorithm, key_id))
+                opentracing.log_kv(
+                    {"message": "finished executing and invalidating cache"}
+                )
                 self._invalidate_cache_and_stream(
                     txn, self.count_e2e_one_time_keys, (user_id, device_id)
                 )
@@ -262,6 +291,13 @@ def _claim_e2e_one_time_keys(txn):
 
     def delete_e2e_keys_by_device(self, user_id, device_id):
         def delete_e2e_keys_by_device_txn(txn):
+            opentracing.log_kv(
+                {
+                    "message": "Deleting keys for device",
+                    "device_id": device_id,
+                    "user_id": user_id,
+                }
+            )
             self._simple_delete_txn(
                 txn,
                 table="e2e_device_keys_json",
diff --git a/synapse/storage/schema/delta/55/add_spans_to_device_lists.sql b/synapse/storage/schema/delta/55/add_spans_to_device_lists.sql
new file mode 100644
index 000000000000..bbd5d58486e5
--- /dev/null
+++ b/synapse/storage/schema/delta/55/add_spans_to_device_lists.sql
@@ -0,0 +1,23 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Opentracing needs to inject a span context into any data item which leaves
+ * the current execution context. Since device list updates are dumped into
+ * this table and processed later, they need to include the span context for
+ * opentracing. Since we may later include other tracking information, the
+ * column has just been called "context"; the structure of the data may change.
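+ *
+ * For example, a row for a whitelisted destination might store (illustrative
+ * only; the exact carrier keys depend on the configured tracer):
+ *   '{"opentracing": {"uber-trace-id": "<trace>:<span>:<parent>:<flags>"}}'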
+ */
+ALTER TABLE device_lists_outbound_pokes ADD context TEXT;
\ No newline at end of file
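For reference, the span-context round-trip that this diff threads through to-device EDUs and the new `device_lists_outbound_pokes.context` column boils down to the pattern sketched below. This is a minimal sketch using the plain `opentracing` package, not the actual Synapse implementation: `whitelisted_homeserver` stands in for the whitelist check in `synapse/logging/opentracing.py`, and the carrier keys produced by `inject` are tracer-dependent.

```python
# Minimal sketch of the span-context round-trip used above (illustrative,
# assuming an opentracing-compatible tracer such as jaeger_client is configured).
import json

import opentracing


def serialize_context_for_edu(destination, whitelisted_homeserver):
    """Sender side: dump the active span's context into a text-map carrier.

    Mirrors get_active_span_text_map(): an empty carrier is serialised when
    the destination is not whitelisted or there is no active span.
    """
    carrier = {}
    if opentracing.tracer.active_span and whitelisted_homeserver(destination):
        opentracing.tracer.inject(
            opentracing.tracer.active_span, opentracing.Format.TEXT_MAP, carrier
        )
    # Stored under "context" in the EDU content / the new `context` column
    return json.dumps({"opentracing": carrier})


def extract_context_from_edu(edu_content):
    """Receiver side: rebuild a span context to use as a follows-from reference."""
    carrier = json.loads(edu_content.get("context", "{}")).get("opentracing", {})
    if not carrier:
        return None  # sender was not whitelisted, or tracing was disabled
    return opentracing.tracer.extract(opentracing.Format.TEXT_MAP, carrier)
```

On the sending path, `start_active_span_follows_from("send_transaction", span_contexts)` then turns these extracted contexts into `follows_from` references on the transaction span, which is what gives each EDU causality even when the remote server never reports receiving it.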