This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add instance name to RDATA/POSITION commands #7364

Merged
merged 8 commits into from
Apr 29, 2020
Changes from 5 commits
1 change: 1 addition & 0 deletions changelog.d/7364.misc
@@ -0,0 +1 @@
Add an `instance_name` to `RDATA` and `POSITION` replication commands.
41 changes: 24 additions & 17 deletions docs/tcp_replication.md
@@ -15,15 +15,17 @@ example flow would be (where '>' indicates master to worker and

> SERVER example.com
< REPLICATE
> POSITION events 53
> RDATA events 54 ["$foo1:bar.com", ...]
> RDATA events 55 ["$foo4:bar.com", ...]
> POSITION events master 53
> RDATA events master 54 ["$foo1:bar.com", ...]
> RDATA events master 55 ["$foo4:bar.com", ...]

The example shows the server accepting a new connection and sending its identity
with the `SERVER` command, followed by the client sending `REPLICATE`, to which
the server responds with the position of all streams. The server then
periodically sends `RDATA` commands
which have the format `RDATA <stream_name> <token> <row>`, where the format of
`<row>` is defined by the individual streams.
which have the format `RDATA <stream_name> <instance_name> <token> <row>`, where
the format of `<row>` is defined by the individual streams. The
`<instance_name>` is the name of the Synapse process that generated the data
(e.g. usually "master").
Member


Suggested change
(e.g. usually "master").
(usually "master").


Error reporting happens by either the client or server sending an ERROR
command, and usually the connection will be closed.
@@ -52,7 +54,7 @@ The basic structure of the protocol is line based, where the initial
word of each line specifies the command. The rest of the line is parsed
based on the command. For example, the RDATA command is defined as:

RDATA <stream_name> <token> <row_json>
RDATA <stream_name> <instance_name> <token> <row_json>

(Note that <row_json> may contain spaces, but cannot contain
newlines.)
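
As a rough illustration of the note above (not the code from this PR), splitting on at most three spaces keeps any spaces inside `<row_json>` intact. The names `Rdata` and `parse_rdata` are hypothetical, and the input is assumed to be the text that follows the `RDATA` command word:

```python
import json
from typing import Any, NamedTuple, Optional


class Rdata(NamedTuple):
    """Hypothetical container for one parsed RDATA payload."""

    stream_name: str
    instance_name: str
    token: Optional[int]  # None stands for the literal token "batch"
    row: Any


def parse_rdata(payload: str) -> Rdata:
    """Parse the text that follows the RDATA command word."""
    # maxsplit=3 stops splitting after the third space, so spaces inside
    # the JSON row are left untouched.
    stream_name, instance_name, token, row_json = payload.split(" ", 3)
    return Rdata(
        stream_name,
        instance_name,
        None if token == "batch" else int(token),
        json.loads(row_json),
    )


parsed = parse_rdata(
    'caches master 550953771 ["get_user_by_id", ["@bob:example.com"], 1550574873251]'
)
```

Here the row contains spaces after each comma, yet it arrives at `json.loads` as a single string.
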
@@ -136,11 +138,11 @@ the wire:
< NAME synapse.app.appservice
< PING 1490197665618
< REPLICATE
> POSITION events 1
> POSITION backfill 1
> POSITION caches 1
> RDATA caches 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513]
> RDATA events 14 ["$149019767112vOHxz:localhost:8823",
> POSITION events master 1
> POSITION backfill master 1
> POSITION caches master 1
> RDATA caches master 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513]
> RDATA events master 14 ["$149019767112vOHxz:localhost:8823",
"!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]
< PING 1490197675618
> ERROR server stopping
@@ -151,10 +153,10 @@ position without needing to send data with the `RDATA` command.

An example of a batched set of `RDATA` is:

> RDATA caches batch ["get_user_by_id",["@test:localhost:8823"],1490197670513]
> RDATA caches batch ["get_user_by_id",["@test2:localhost:8823"],1490197670513]
> RDATA caches batch ["get_user_by_id",["@test3:localhost:8823"],1490197670513]
> RDATA caches 54 ["get_user_by_id",["@test4:localhost:8823"],1490197670513]
> RDATA caches master batch ["get_user_by_id",["@test:localhost:8823"],1490197670513]
> RDATA caches master batch ["get_user_by_id",["@test2:localhost:8823"],1490197670513]
> RDATA caches master batch ["get_user_by_id",["@test3:localhost:8823"],1490197670513]
> RDATA caches master 54 ["get_user_by_id",["@test4:localhost:8823"],1490197670513]

In this case the client shouldn't advance their caches token until it
sees the last `RDATA`.
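
The batching rule above could be sketched on the client side roughly as follows. This is an assumption-laden illustration, not the handler from this PR: `BatchingReceiver` is a hypothetical name, and a token of `None` is taken to mean the wire token `batch`.

```python
from collections import defaultdict
from typing import Any, Dict, List, Optional, Tuple


class BatchingReceiver:
    """Hypothetical client-side buffer for batched RDATA rows."""

    def __init__(self) -> None:
        # Rows received with a "batch" token, keyed by stream name.
        self._pending: Dict[str, List[Any]] = defaultdict(list)
        # Last token the client has advanced to, keyed by stream name.
        self.tokens: Dict[str, int] = {}

    def on_rdata(
        self, stream_name: str, token: Optional[int], row: Any
    ) -> Optional[Tuple[int, List[Any]]]:
        """Buffer the row; return (token, rows) once a numeric token arrives."""
        self._pending[stream_name].append(row)
        if token is None:
            # Still inside a batch: do not advance the stream token yet.
            return None
        rows = self._pending.pop(stream_name)
        self.tokens[stream_name] = token
        return token, rows
```

Only the final row of the batch carries a numeric token, so that is the only point at which the stored token for `caches` changes.
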
@@ -178,6 +180,11 @@ client (C):
updates, and if so then fetch them out of band. Sent in response to a
REPLICATE command (but can happen at any time).

The POSITION command includes the source of the stream. Currently all streams
are written by a single process (usually "master"). If fetching missing
updates via HTTP API, rather than via the DB, then processes should make the
request to the appropriate process.
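
A minimal sketch of how a receiver might use the instance name carried by `POSITION`, under stated assumptions: `plan_catch_up` and `CatchUp` are hypothetical names, local positions are held in a plain dict, and an unknown stream is treated as being at token 0. The real handler in this PR is structured differently.

```python
from typing import Dict, NamedTuple, Optional


class CatchUp(NamedTuple):
    """Hypothetical description of an out-of-band fetch to perform."""

    instance_name: str  # process to ask for the missing updates
    stream_name: str
    from_token: int
    to_token: int


def plan_catch_up(
    our_instance: str,
    local_tokens: Dict[str, int],
    stream_name: str,
    instance_name: str,
    token: int,
) -> Optional[CatchUp]:
    """Decide whether a received POSITION requires fetching missed rows."""
    if instance_name == our_instance:
        # The command is our own echo, so there is nothing to catch up on.
        return None
    current = local_tokens.get(stream_name, 0)  # assumption: default of 0
    if token <= current:
        # We are already at (or past) the advertised position.
        return None
    # Direct the request at the process that wrote the stream.
    return CatchUp(instance_name, stream_name, current, token)
```

The returned `instance_name` is what lets an HTTP-based fetch be routed to the writer of the stream rather than to a fixed process.
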

#### ERROR (S, C)

There was an error
@@ -234,12 +241,12 @@ Each individual cache invalidation results in a row being sent down
replication, which includes the cache name (the name of the function)
and the key to invalidate. For example:

> RDATA caches 550953771 ["get_user_by_id", ["@bob:example.com"], 1550574873251]
> RDATA caches master 550953771 ["get_user_by_id", ["@bob:example.com"], 1550574873251]

Alternatively, an entire cache can be invalidated by sending down a `null`
instead of the key. For example:

> RDATA caches 550953772 ["get_user_by_id", null, 1550574873252]
> RDATA caches master 550953772 ["get_user_by_id", null, 1550574873252]
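
To make the two row shapes concrete, here is a hedged sketch of applying a `caches` row to in-memory caches. `apply_invalidation` is a hypothetical helper, each cache is modelled as a dict keyed by a tuple of arguments, and the third element of the row is assumed to be a timestamp that this sketch ignores:

```python
from typing import Any, Dict, List, Tuple

# Hypothetical model: cache name -> {key tuple -> cached value}
Caches = Dict[str, Dict[Tuple[Any, ...], Any]]


def apply_invalidation(caches: Caches, row: List[Any]) -> None:
    """Apply one cache invalidation row of the form [name, keys, ts]."""
    cache_name, keys, _ts = row  # _ts: assumed to be a timestamp, unused here
    cache = caches.get(cache_name)
    if cache is None:
        # This process does not hold the named cache.
        return
    if keys is None:
        # A null key invalidates the entire cache.
        cache.clear()
    else:
        # Otherwise drop just the entry for the given key, if present.
        cache.pop(tuple(keys), None)


caches: Caches = {
    "get_user_by_id": {("@bob:example.com",): "bob", ("@alice:example.com",): "alice"}
}
apply_invalidation(caches, ["get_user_by_id", ["@bob:example.com"], 1550574873251])
```

After the call, only the entry for `@alice:example.com` remains; sending `null` as the key would empty the cache.
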

However, there are times when a number of caches need to be invalidated
at the same time with the same key. To reduce traffic we batch those
37 changes: 26 additions & 11 deletions synapse/replication/tcp/commands.py
@@ -95,7 +95,7 @@ class RdataCommand(Command):

Format::

RDATA <stream_name> <token> <row_json>
RDATA <stream_name> <instance_name> <token> <row_json>

The `<token>` may either be a numeric stream id OR "batch". The latter case
is used to support sending multiple updates with the same stream ID. This
@@ -105,33 +105,40 @@ class RdataCommand(Command):
The client should batch all incoming RDATA with a token of "batch" (per
stream_name) until it sees an RDATA with a numeric stream ID.

The `<instance_name>` is the source of the new data (usually "master").

`<token>` of "batch" maps to the instance variable `token` being None.

An example of a batched series of RDATA::

RDATA presence batch ["@foo:example.com", "online", ...]
RDATA presence batch ["@bar:example.com", "online", ...]
RDATA presence 59 ["@baz:example.com", "online", ...]
RDATA presence master batch ["@foo:example.com", "online", ...]
RDATA presence master batch ["@bar:example.com", "online", ...]
RDATA presence master 59 ["@baz:example.com", "online", ...]
"""

NAME = "RDATA"

def __init__(self, stream_name, token, row):
def __init__(self, stream_name, instance_name, token, row):
self.stream_name = stream_name
self.instance_name = instance_name
self.token = token
self.row = row

@classmethod
def from_line(cls, line):
stream_name, token, row_json = line.split(" ", 2)
stream_name, instance_name, token, row_json = line.split(" ", 3)
return cls(
stream_name, None if token == "batch" else int(token), json.loads(row_json)
stream_name,
instance_name,
None if token == "batch" else int(token),
json.loads(row_json),
)

def to_line(self):
return " ".join(
(
self.stream_name,
self.instance_name,
str(self.token) if self.token is not None else "batch",
_json_encoder.encode(self.row),
)
@@ -145,23 +152,31 @@ class PositionCommand(Command):
"""Sent by the server to tell the client the stream position without
needing to send an RDATA.

Format::

POSITION <stream_name> <instance_name> <token>

On receipt of a POSITION command clients should check if they have missed
any updates, and if so then fetch them out of band.

The `<instance_name>` is the process that sent the command and is the source
of the stream.
"""

NAME = "POSITION"

def __init__(self, stream_name, token):
def __init__(self, stream_name, instance_name, token):
self.stream_name = stream_name
self.instance_name = instance_name
self.token = token

@classmethod
def from_line(cls, line):
stream_name, token = line.split(" ", 1)
return cls(stream_name, int(token))
stream_name, instance_name, token = line.split(" ", 2)
return cls(stream_name, instance_name, int(token))

def to_line(self):
return " ".join((self.stream_name, str(self.token)))
return " ".join((self.stream_name, self.instance_name, str(self.token)))


class ErrorCommand(_SimpleCommand):
15 changes: 13 additions & 2 deletions synapse/replication/tcp/handler.py
@@ -79,6 +79,7 @@ def __init__(self, hs):
self._notifier = hs.get_notifier()
self._clock = hs.get_clock()
self._instance_id = hs.get_instance_id()
self._instance_name = hs.get_instance_name()

# Set of streams that we've caught up with.
self._streams_connected = set() # type: Set[str]
@@ -169,7 +170,9 @@ async def on_REPLICATE(self, cmd: ReplicateCommand):

for stream_name, stream in self._streams.items():
current_token = stream.current_token()
self.send_command(PositionCommand(stream_name, current_token))
self.send_command(
PositionCommand(stream_name, self._instance_name, current_token)
)

async def on_USER_SYNC(self, cmd: UserSyncCommand):
user_sync_counter.inc()
@@ -226,6 +229,10 @@ async def on_USER_IP(self, cmd: UserIpCommand):
await self._server_notices_sender.on_user_ip(cmd.user_id)

async def on_RDATA(self, cmd: RdataCommand):
if cmd.instance_name == self._instance_name:
# Ignore RDATA that are just our own echoes
return

stream_name = cmd.stream_name
inbound_rdata_count.labels(stream_name).inc()

@@ -277,6 +284,10 @@ async def on_rdata(self, stream_name: str, token: int, rows: list):
await self._replication_data_handler.on_rdata(stream_name, token, rows)

async def on_POSITION(self, cmd: PositionCommand):
if cmd.instance_name == self._instance_name:
# Ignore POSITION that are just our own echoes
return

stream = self._streams.get(cmd.stream_name)
if not stream:
logger.error("Got POSITION for unknown stream: %s", cmd.stream_name)
@@ -448,7 +459,7 @@ def stream_update(self, stream_name: str, token: str, data: Any):

We need to check if the client is interested in the stream or not
"""
self.send_command(RdataCommand(stream_name, token, data))
self.send_command(RdataCommand(stream_name, self._instance_name, token, data))


UpdateToken = TypeVar("UpdateToken")
9 changes: 9 additions & 0 deletions synapse/server.py
@@ -235,6 +235,7 @@ def __init__(self, hostname: str, config: HomeServerConfig, reactor=None, **kwar
self.start_time = None

self.instance_id = random_string(5)
self.instance_name = config.worker_name or "master"
Member


this should probably be underscored if it's intended that we use get_instance_name to access it.


self.clock = Clock(reactor)
self.distributor = Distributor()
@@ -256,6 +257,14 @@ def get_instance_id(self):
"""
return self.instance_id

def get_instance_name(self) -> str:
Member


it'd be nice to actually use this in the various places that currently use config.worker_name (I found a few dotted around), rather than having two ways of doing the same thing.

"""A unique name for this synapse process.

Used to identify the process over replication and in config. Does not
change over restarts.
"""
return self.instance_name

def setup(self):
logger.info("Setting up.")
self.start_time = int(self.get_clock().time())
2 changes: 2 additions & 0 deletions synapse/server.pyi
@@ -121,3 +121,5 @@ class HomeServer(object):
pass
def get_instance_id(self) -> str:
pass
def get_instance_name(self) -> str:
pass
1 change: 1 addition & 0 deletions tests/replication/slave/storage/_base.py
@@ -57,6 +57,7 @@ def prepare(self, reactor, clock, hs):
# We now do some gut wrenching so that we have a client that is based
# off of the slave store rather than the main store.
self.replication_handler = ReplicationCommandHandler(self.hs)
self.replication_handler._instance_name = "worker"
self.replication_handler._replication_data_handler = ReplicationDataHandler(
self.slaved_store
)
6 changes: 4 additions & 2 deletions tests/replication/tcp/test_commands.py
@@ -28,15 +28,17 @@ def test_parse_one_word_command(self):
self.assertIsInstance(cmd, ReplicateCommand)

def test_parse_rdata(self):
line = 'RDATA events 6287863 ["ev", ["$eventid", "!roomid", "type", null, null, null]]'
line = 'RDATA events master 6287863 ["ev", ["$eventid", "!roomid", "type", null, null, null]]'
cmd = parse_command_from_line(line)
self.assertIsInstance(cmd, RdataCommand)
self.assertEqual(cmd.stream_name, "events")
self.assertEqual(cmd.instance_name, "master")
self.assertEqual(cmd.token, 6287863)

def test_parse_rdata_batch(self):
line = 'RDATA presence batch ["@foo:example.com", "online"]'
line = 'RDATA presence master batch ["@foo:example.com", "online"]'
cmd = parse_command_from_line(line)
self.assertIsInstance(cmd, RdataCommand)
self.assertEqual(cmd.stream_name, "presence")
self.assertEqual(cmd.instance_name, "master")
self.assertIsNone(cmd.token)