Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Start fewer opentracing spans #8640

Merged
merged 7 commits into from
Oct 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8640.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Reduce number of OpenTracing spans started.
50 changes: 43 additions & 7 deletions synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.

import logging
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Union

from prometheus_client import Counter

Expand All @@ -30,7 +30,10 @@
event_processing_loop_counter,
event_processing_loop_room_count,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
)
from synapse.types import Collection, JsonDict, RoomStreamToken, UserID
from synapse.util.metrics import Measure

Expand All @@ -53,7 +56,7 @@ def __init__(self, hs):
self.current_max = 0
self.is_processing = False

async def notify_interested_services(self, max_token: RoomStreamToken):
def notify_interested_services(self, max_token: RoomStreamToken):
"""Notifies (pushes) all application services interested in this event.

Pushing is done asynchronously, so this method won't block for any
Expand All @@ -72,6 +75,12 @@ async def notify_interested_services(self, max_token: RoomStreamToken):
if self.is_processing:
return

# We only start a new background process if necessary rather than
# optimistically (to cut down on overhead).
self._notify_interested_services(max_token)

@wrap_as_background_process("notify_interested_services")
async def _notify_interested_services(self, max_token: RoomStreamToken):
with Measure(self.clock, "notify_interested_services"):
self.is_processing = True
try:
Expand Down Expand Up @@ -166,8 +175,11 @@ async def handle_room_events(events):
finally:
self.is_processing = False

async def notify_interested_services_ephemeral(
self, stream_key: str, new_token: Optional[int], users: Collection[UserID] = [],
def notify_interested_services_ephemeral(
self,
stream_key: str,
new_token: Optional[int],
users: Collection[Union[str, UserID]] = [],
):
"""This is called by the notifier in the background
when an ephemeral event is handled by the homeserver.
Expand All @@ -183,13 +195,34 @@ async def notify_interested_services_ephemeral(
new_token: The latest stream token
users: The user(s) involved with the event.
"""
if not self.notify_appservices:
return

if stream_key not in ("typing_key", "receipt_key", "presence_key"):
return

services = [
service
for service in self.store.get_app_services()
if service.supports_ephemeral
]
if not services or not self.notify_appservices:
if not services:
return

# We only start a new background process if necessary rather than
# optimistically (to cut down on overhead).
self._notify_interested_services_ephemeral(
services, stream_key, new_token, users
)

@wrap_as_background_process("notify_interested_services_ephemeral")
async def _notify_interested_services_ephemeral(
self,
services: List[ApplicationService],
stream_key: str,
new_token: Optional[int],
users: Collection[Union[str, UserID]],
):
logger.info("Checking interested services for %s" % (stream_key))
with Measure(self.clock, "notify_interested_services_ephemeral"):
for service in services:
Expand Down Expand Up @@ -237,14 +270,17 @@ async def _handle_receipts(self, service: ApplicationService):
return receipts

async def _handle_presence(
self, service: ApplicationService, users: Collection[UserID]
self, service: ApplicationService, users: Collection[Union[str, UserID]]
):
events = [] # type: List[JsonDict]
presence_source = self.event_sources.sources["presence"]
from_key = await self.store.get_type_stream_id_for_appservice(
service, "presence"
)
for user in users:
if isinstance(user, str):
user = UserID.from_string(user)
Comment on lines +281 to +282
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this just a separate bug?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the commit message:

In practice this wasn't an issue as we always pass UserID objects in that particular code path.


interested = await service.is_interested_in_presence(user, self.store)
if not interested:
continue
Expand Down
10 changes: 5 additions & 5 deletions synapse/logging/opentracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ def ensure_active_span_inner_2(*args, **kwargs):


@contextlib.contextmanager
def _noop_context_manager(*args, **kwargs):
def noop_context_manager(*args, **kwargs):
"""Does exactly what it says on the tin"""
yield

Expand Down Expand Up @@ -413,7 +413,7 @@ def start_active_span(
"""

if opentracing is None:
return _noop_context_manager()
return noop_context_manager()

return opentracing.tracer.start_active_span(
operation_name,
Expand All @@ -428,7 +428,7 @@ def start_active_span(

def start_active_span_follows_from(operation_name, contexts):
if opentracing is None:
return _noop_context_manager()
return noop_context_manager()

references = [opentracing.follows_from(context) for context in contexts]
scope = start_active_span(operation_name, references=references)
Expand Down Expand Up @@ -459,7 +459,7 @@ def start_active_span_from_request(
# Also, twisted uses byte arrays while opentracing expects strings.

if opentracing is None:
return _noop_context_manager()
return noop_context_manager()

header_dict = {
k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders()
Expand Down Expand Up @@ -497,7 +497,7 @@ def start_active_span_from_edu(
"""

if opentracing is None:
return _noop_context_manager()
return noop_context_manager()

carrier = json_decoder.decode(edu_content.get("context", "{}")).get(
"opentracing", {}
Expand Down
12 changes: 9 additions & 3 deletions synapse/metrics/background_process_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from twisted.internet import defer

from synapse.logging.context import LoggingContext, PreserveLoggingContext
from synapse.logging.opentracing import start_active_span
from synapse.logging.opentracing import noop_context_manager, start_active_span

if TYPE_CHECKING:
import resource
Expand Down Expand Up @@ -167,7 +167,7 @@ def update_metrics(self):
)


def run_as_background_process(desc: str, func, *args, **kwargs):
def run_as_background_process(desc: str, func, *args, bg_start_span=True, **kwargs):
"""Run the given function in its own logcontext, with resource metrics

This should be used to wrap processes which are fired off to run in the
Expand All @@ -181,6 +181,9 @@ def run_as_background_process(desc: str, func, *args, **kwargs):
Args:
desc: a description for this background process type
func: a function, which may return a Deferred or a coroutine
bg_start_span: Whether to start an opentracing span. Defaults to True.
Should only be disabled for processes that will not log to or tag
a span.
args: positional args for func
kwargs: keyword args for func

Expand All @@ -199,7 +202,10 @@ async def run():
with BackgroundProcessLoggingContext(desc) as context:
context.request = "%s-%i" % (desc, count)
try:
with start_active_span(desc, tags={"request_id": context.request}):
ctx = noop_context_manager()
if bg_start_span:
ctx = start_active_span(desc, tags={"request_id": context.request})
with ctx:
result = func(*args, **kwargs)

if inspect.isawaitable(result):
Expand Down
34 changes: 11 additions & 23 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
from synapse.logging.context import PreserveLoggingContext
from synapse.logging.utils import log_function
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.streams.config import PaginationConfig
from synapse.types import (
Collection,
Expand Down Expand Up @@ -310,44 +309,37 @@ def _on_updated_room_token(self, max_room_stream_token: RoomStreamToken):
"""

# poke any interested application service.
run_as_background_process(
"_notify_app_services", self._notify_app_services, max_room_stream_token
)

run_as_background_process(
"_notify_pusher_pool", self._notify_pusher_pool, max_room_stream_token
)
self._notify_app_services(max_room_stream_token)
self._notify_pusher_pool(max_room_stream_token)

if self.federation_sender:
self.federation_sender.notify_new_events(max_room_stream_token)

async def _notify_app_services(self, max_room_stream_token: RoomStreamToken):
def _notify_app_services(self, max_room_stream_token: RoomStreamToken):
try:
await self.appservice_handler.notify_interested_services(
max_room_stream_token
)
self.appservice_handler.notify_interested_services(max_room_stream_token)
except Exception:
logger.exception("Error notifying application services of event")

async def _notify_app_services_ephemeral(
def _notify_app_services_ephemeral(
self,
stream_key: str,
new_token: Union[int, RoomStreamToken],
users: Collection[UserID] = [],
users: Collection[Union[str, UserID]] = [],
):
try:
stream_token = None
if isinstance(new_token, int):
stream_token = new_token
await self.appservice_handler.notify_interested_services_ephemeral(
self.appservice_handler.notify_interested_services_ephemeral(
stream_key, stream_token, users
)
except Exception:
logger.exception("Error notifying application services of event")

async def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
try:
await self._pusher_pool.on_new_notifications(max_room_stream_token)
self._pusher_pool.on_new_notifications(max_room_stream_token)
except Exception:
logger.exception("Error pusher pool of event")

Expand Down Expand Up @@ -384,12 +376,8 @@ def on_new_event(
self.notify_replication()

# Notify appservices
run_as_background_process(
"_notify_app_services_ephemeral",
self._notify_app_services_ephemeral,
stream_key,
new_token,
users,
self._notify_app_services_ephemeral(
stream_key, new_token, users,
)

def on_new_replication_data(self) -> None:
Expand Down
18 changes: 16 additions & 2 deletions synapse/push/pusherpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@

from prometheus_client import Gauge

from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
)
from synapse.push import PusherConfigException
from synapse.push.emailpusher import EmailPusher
from synapse.push.httppusher import HttpPusher
Expand Down Expand Up @@ -187,7 +190,7 @@ async def remove_pushers_by_access_token(self, user_id, access_tokens):
)
await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])

async def on_new_notifications(self, max_token: RoomStreamToken):
def on_new_notifications(self, max_token: RoomStreamToken):
if not self.pushers:
# nothing to do here.
return
Expand All @@ -201,6 +204,17 @@ async def on_new_notifications(self, max_token: RoomStreamToken):
# Nothing to do
return

# We only start a new background process if necessary rather than
# optimistically (to cut down on overhead).
self._on_new_notifications(max_token)

@wrap_as_background_process("on_new_notifications")
async def _on_new_notifications(self, max_token: RoomStreamToken):
# We just use the minimum stream ordering and ignore the vector clock
# component. This is safe to do as long as we *always* ignore the vector
# clock components.
max_stream_id = max_token.stream

prev_stream_id = self._last_room_stream_id_seen
self._last_room_stream_id_seen = max_stream_id

Expand Down
4 changes: 3 additions & 1 deletion synapse/replication/tcp/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,9 @@ def send_command(self, cmd: Command):
Args:
cmd (Command)
"""
run_as_background_process("send-cmd", self._async_send_command, cmd)
run_as_background_process(
"send-cmd", self._async_send_command, cmd, bg_start_span=False
)

async def _async_send_command(self, cmd: Command):
"""Encode a replication command and send it over our outbound connection"""
Expand Down
20 changes: 8 additions & 12 deletions tests/handlers/test_appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ def setUp(self):
hs.get_clock.return_value = MockClock()
self.handler = ApplicationServicesHandler(hs)

@defer.inlineCallbacks
def test_notify_interested_services(self):
interested_service = self._mkservice(is_interested=True)
services = [
Expand All @@ -62,14 +61,12 @@ def test_notify_interested_services(self):
defer.succeed((0, [event])),
defer.succeed((0, [])),
]
yield defer.ensureDeferred(
self.handler.notify_interested_services(RoomStreamToken(None, 0))
)
self.handler.notify_interested_services(RoomStreamToken(None, 0))

self.mock_scheduler.submit_event_for_as.assert_called_once_with(
interested_service, event
)

@defer.inlineCallbacks
def test_query_user_exists_unknown_user(self):
user_id = "@someone:anywhere"
services = [self._mkservice(is_interested=True)]
Expand All @@ -83,12 +80,11 @@ def test_query_user_exists_unknown_user(self):
defer.succeed((0, [event])),
defer.succeed((0, [])),
]
yield defer.ensureDeferred(
self.handler.notify_interested_services(RoomStreamToken(None, 0))
)

self.handler.notify_interested_services(RoomStreamToken(None, 0))

self.mock_as_api.query_user.assert_called_once_with(services[0], user_id)

@defer.inlineCallbacks
def test_query_user_exists_known_user(self):
user_id = "@someone:anywhere"
services = [self._mkservice(is_interested=True)]
Expand All @@ -102,9 +98,9 @@ def test_query_user_exists_known_user(self):
defer.succeed((0, [event])),
defer.succeed((0, [])),
]
yield defer.ensureDeferred(
self.handler.notify_interested_services(RoomStreamToken(None, 0))
)

self.handler.notify_interested_services(RoomStreamToken(None, 0))

self.assertFalse(
self.mock_as_api.query_user.called,
"query_user called when it shouldn't have been.",
Expand Down