Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix typing replication not being handled on master #7959

Merged
merged 4 commits into from
Jul 27, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7959.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix typing notifications on master process when using another worker for typing stream.
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
7 changes: 0 additions & 7 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@
ReceiptsStream,
TagAccountDataStream,
ToDeviceStream,
TypingStream,
)
from synapse.rest.admin import register_servlets_for_media_repo
from synapse.rest.client.v1 import events
Expand Down Expand Up @@ -644,7 +643,6 @@ def __init__(self, hs):
super(GenericWorkerReplicationHandler, self).__init__(hs)

self.store = hs.get_datastore()
self.typing_handler = hs.get_typing_handler()
self.presence_handler = hs.get_presence_handler() # type: GenericWorkerPresence
self.notifier = hs.get_notifier()

Expand Down Expand Up @@ -681,11 +679,6 @@ async def _process_and_notify(self, stream_name, instance_name, token, rows):
await self.pusher_pool.on_new_receipts(
token, token, {row.room_id for row in rows}
)
elif stream_name == TypingStream.NAME:
self.typing_handler.process_replication_rows(token, rows)
self.notifier.on_new_event(
"typing_key", token, rooms=[row.room_id for row in rows]
)
elif stream_name == ToDeviceStream.NAME:
entities = [row.entity for row in rows if row.entity.startswith("@")]
if entities:
Expand Down
8 changes: 8 additions & 0 deletions synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from synapse.api.constants import EventTypes
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
from synapse.replication.tcp.streams import TypingStream
from synapse.replication.tcp.streams.events import (
EventsStream,
EventsStreamEventRow,
Expand Down Expand Up @@ -104,6 +105,7 @@ def __init__(self, hs: "HomeServer"):
self._clock = hs.get_clock()
self._streams = hs.get_replication_streams()
self._instance_name = hs.get_instance_name()
self._typing_handler = hs.get_typing_handler()

# Map from stream to list of deferreds waiting for the stream to
# arrive at a particular position. The lists are sorted by stream position.
Expand All @@ -127,6 +129,12 @@ async def on_rdata(
"""
self.store.process_replication_rows(stream_name, instance_name, token, rows)

if stream_name == TypingStream.NAME:
self._typing_handler.process_replication_rows(token, rows)
self.notifier.on_new_event(
"typing_key", token, rooms=[row.room_id for row in rows]
)

if stream_name == EventsStream.NAME:
# We shouldn't get multiple rows per token for events stream, so
# we don't need to optimise this for multiple rows.
Expand Down
3 changes: 3 additions & 0 deletions synapse/server.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import synapse.server_notices.server_notices_sender
import synapse.state
import synapse.storage
from synapse.events.builder import EventBuilderFactory
from synapse.handlers.typing import FollowerTypingHandler
from synapse.replication.tcp.streams import Stream

class HomeServer(object):
Expand Down Expand Up @@ -150,3 +151,5 @@ class HomeServer(object):
pass
def should_send_federation(self) -> bool:
pass
def get_typing_handler(self) -> FollowerTypingHandler:
pass