From 7cf8b2ed7ca1964aabc85f24262cc26e1316f8e4 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 18 Mar 2022 18:00:47 +0000 Subject: [PATCH 01/25] Upsert many client IPs at once, more efficiently --- synapse/storage/databases/main/client_ips.py | 31 +++++++++++++------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py index 8480ea4e1c3c..a333785f98a4 100644 --- a/synapse/storage/databases/main/client_ips.py +++ b/synapse/storage/databases/main/client_ips.py @@ -634,20 +634,29 @@ def _update_client_ips_batch_txn( ): self.database_engine.lock_table(txn, "user_ips") + # Keys and values for the `user_ips` upsert. + key_columns = "user_id", "access_token", "ip" + keys = [] + value_columns = "user_agent", "device_id", "last_seen" + values = [] + for entry in to_update.items(): (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry + keys.append((user_id, access_token, ip)) + values.append((user_agent, device_id, last_seen)) - self.db_pool.simple_upsert_txn( - txn, - table="user_ips", - keyvalues={"user_id": user_id, "access_token": access_token, "ip": ip}, - values={ - "user_agent": user_agent, - "device_id": device_id, - "last_seen": last_seen, - }, - lock=False, - ) + self.db_pool.simple_upsert_many_txn( + txn, + table="user_ips", + key_names=key_columns, + key_values=keys, + value_names=value_columns, + value_values=values, + # TODO lock=False + ) + + for entry in to_update.items(): + (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry # Technically an access token might not be associated with # a device so we need to check. From c7cfbf5522b18a17366b5d81c79bfc48917921b5 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 18 Mar 2022 18:02:48 +0000 Subject: [PATCH 02/25] Newsfile Signed-off-by: Olivier Wilkinson (reivilibre) --- changelog.d/12252.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/12252.feature diff --git a/changelog.d/12252.feature b/changelog.d/12252.feature new file mode 100644 index 000000000000..82b9e82f868b --- /dev/null +++ b/changelog.d/12252.feature @@ -0,0 +1 @@ +Move `update_client_ip` background job from the main process to the background worker. \ No newline at end of file From 45c98e0b43967ae16b914e9b865b1865e3d244ab Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 24 Mar 2022 12:01:00 +0000 Subject: [PATCH 03/25] Add a simple update many txn --- synapse/storage/database.py | 50 ++++++++++++++++++++++++++++++++++++- 1 file changed, 49 insertions(+), 1 deletion(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 12750d9b89d4..016745271839 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -63,7 +63,7 @@ from synapse.server import HomeServer # python 3 does not have a maximum int value -MAX_TXN_ID = 2**63 - 1 +MAX_TXN_ID = 2 ** 63 - 1 logger = logging.getLogger(__name__) @@ -1792,6 +1792,54 @@ def simple_update_txn( return txn.rowcount + @staticmethod + def simple_update_many_txn( + txn: LoggingTransaction, + table: str, + key_names: Collection[str], + key_values: Collection[Iterable[Any]], + value_names: Collection[str], + value_values: Iterable[Iterable[Any]], + ) -> None: + """ + Update, many times, using batching where possible. + + Args: + table: The table to upsert into + key_names: The key column names. + key_values: A list of each row's key column values. + value_names: The names of value columns to update. + value_values: A list of each row's value column values. + """ + + # List of value names, then key names + allnames: List[str] = [] + allnames.extend(value_names) + allnames.extend(key_names) + + # List of tuples of (value values, then key values) + # (This matches the order of `allnames` and the order of the query) + args = [tuple(x) + tuple(y) for x, y in zip(value_values, key_values)] + + for ks, vs in zip(key_values, value_values): + args.append(tuple(vs) + tuple(ks)) + + # 'col1 = ?, col2 = ?, ...' + set_clause = ", ".join(f"{n} = ?" for n in value_names) + + if key_names: + # 'WHERE col3 = ? AND col4 = ? AND col5 = ?' + where_clause = "WHERE " + (" AND ".join(f"{n} = ?" for n in key_names)) + else: + where_clause = "" + + # UPDATE mytable SET col1 = ?, col2 = ? WHERE col3 = ? AND col4 = ? + sql = f""" + UPDATE {table} SET {set_clause} {where_clause} + """ + + txn.execute_batch(sql, args) + async def simple_update_one( self, table: str, From 0646b4b5483abd972ad135ccda9d7c61bd50cf64 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 24 Mar 2022 12:05:00 +0000 Subject: [PATCH 04/25] Add non-txn simple update many --- synapse/storage/database.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 016745271839..0bbaa9e89835 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -1792,6 +1792,25 @@ def simple_update_txn( return txn.rowcount + async def simple_update_many( + self, + table: str, + key_names: Collection[str], + key_values: Collection[Iterable[Any]], + value_names: Collection[str], + value_values: Iterable[Iterable[Any]], + desc: str, + ) -> None: + return await self.runInteraction( + desc, + self.simple_update_many_txn, + table, + key_names, + key_values, + value_names, + value_values, + ) + @staticmethod def simple_update_many_txn( txn: LoggingTransaction, From 1acd8cd23636a0904c3b84331f27f61b1f13acf9 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 25 Mar 2022 11:48:40 +0000 Subject: [PATCH 05/25] Use simple_update_many for devices --- synapse/storage/databases/main/client_ips.py | 30 +++++++++++--------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py index a333785f98a4..29516910588c 100644 --- a/synapse/storage/databases/main/client_ips.py +++ b/synapse/storage/databases/main/client_ips.py @@ -655,25 +655,29 @@ def _update_client_ips_batch_txn( # TODO lock=False ) + # Keys and values for the `devices` update. + key_columns = "user_id", "device_id" + keys = [] + value_columns = "user_agent", "last_seen", "ip" + values = [] + for entry in to_update.items(): (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry # Technically an access token might not be associated with # a device so we need to check. if device_id: - # this is always an update rather than an upsert: the row should - # already exist, and if it doesn't, that may be because it has been - # deleted, and we don't want to re-create it. - self.db_pool.simple_update_txn( - txn, - table="devices", - keyvalues={"user_id": user_id, "device_id": device_id}, - updatevalues={ - "user_agent": user_agent, - "last_seen": last_seen, - "ip": ip, - }, - ) + keys.append((user_id, device_id)) + values.append((user_agent, last_seen, ip)) + + self.db_pool.simple_update_many_txn( + txn, + table="devices", + key_names=key_columns, + key_values=keys, + value_names=value_columns, + value_values=values, + ) async def get_last_client_ip_by_device( self, user_id: str, device_id: Optional[str] From 1e961ad370d049e923d3898470a5be385c1eb0bf Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 25 Mar 2022 11:50:19 +0000 Subject: [PATCH 06/25] Isolate locals by splitting into two smaller functions --- synapse/storage/databases/main/client_ips.py | 93 +++++++++++--------- 1 file changed, 50 insertions(+), 43 deletions(-) diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py index 29516910588c..85dc5070f758 100644 --- a/synapse/storage/databases/main/client_ips.py +++ b/synapse/storage/databases/main/client_ips.py @@ -634,50 +634,57 @@ def _update_client_ips_batch_txn( ): self.database_engine.lock_table(txn, "user_ips") - # Keys and values for the `user_ips` upsert. - key_columns = "user_id", "access_token", "ip" - keys = [] - value_columns = "user_agent", "device_id", "last_seen" - values = [] - - for entry in to_update.items(): - (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry - keys.append((user_id, access_token, ip)) - values.append((user_agent, device_id, last_seen)) - - self.db_pool.simple_upsert_many_txn( - txn, - table="user_ips", - key_names=key_columns, - key_values=keys, - value_names=value_columns, - value_values=values, - # TODO lock=False - ) + def update_user_ips() -> None: + # Keys and values for the `user_ips` upsert. + key_columns = "user_id", "access_token", "ip" + keys = [] + value_columns = "user_agent", "device_id", "last_seen" + values = [] + + for entry in to_update.items(): + (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry + keys.append((user_id, access_token, ip)) + values.append((user_agent, device_id, last_seen)) + + self.db_pool.simple_upsert_many_txn( + txn, + table="user_ips", + key_names=key_columns, + key_values=keys, + value_names=value_columns, + value_values=values, + # TODO lock=False + ) - # Keys and values for the `devices` update. - key_columns = "user_id", "device_id" - keys = [] - value_columns = "user_agent", "last_seen", "ip" - values = [] - - for entry in to_update.items(): - (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry - - # Technically an access token might not be associated with - # a device so we need to check. - if device_id: - keys.append((user_id, device_id)) - values.append((user_agent, last_seen, ip)) - - self.db_pool.simple_update_many_txn( - txn, - table="devices", - key_names=key_columns, - key_values=keys, - value_names=value_columns, - value_values=values, - ) + def update_devices() -> None: + # Keys and values for the `devices` update. + key_columns = "user_id", "device_id" + keys = [] + value_columns = "user_agent", "last_seen", "ip" + values = [] + + for entry in to_update.items(): + (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry + + # Technically an access token might not be associated with + # a device so we need to check. + if device_id: + keys.append((user_id, device_id)) + values.append((user_agent, last_seen, ip)) + + self.db_pool.simple_update_many_txn( + txn, + table="devices", + key_names=key_columns, + key_values=keys, + value_names=value_columns, + value_values=values, + ) + + # This update is split into two smaller functions so that we can + # be sure their locals doesn't overlap + update_user_ips() + update_devices() async def get_last_client_ip_by_device( self, user_id: str, device_id: Optional[str] From 503f5c8f25032999783c86c88dfe79f01b6e4392 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Mon, 28 Mar 2022 11:38:53 +0100 Subject: [PATCH 07/25] Make _dump_to_tuple dump the entire table without undue ceremony --- tests/storage/test__base.py | 24 +++++++++--------------- 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/tests/storage/test__base.py b/tests/storage/test__base.py index 366398e39de6..cc53732ca9e7 100644 --- a/tests/storage/test__base.py +++ b/tests/storage/test__base.py @@ -46,9 +46,13 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: ) ) - def _dump_to_tuple( - self, res: List[Dict[str, Any]] - ) -> Generator[Tuple[int, str, str], None, None]: + def _dump_to_tuple(self) -> Generator[Tuple[int, str, str], None, None]: + res = self.get_success( + self.storage.db_pool.simple_select_list( + self.table_name, None, ["id, username, value"] + ) + ) + for i in res: yield (i["id"], i["username"], i["value"]) @@ -75,13 +79,8 @@ def test_upsert_many(self) -> None: ) # Check results are what we expect - res = self.get_success( - self.storage.db_pool.simple_select_list( - self.table_name, None, ["id, username, value"] - ) - ) self.assertEqual( - set(self._dump_to_tuple(res)), + set(self._dump_to_tuple()), {(1, "user1", "hello"), (2, "user2", "there")}, ) @@ -102,12 +101,7 @@ def test_upsert_many(self) -> None: ) # Check results are what we expect - res = self.get_success( - self.storage.db_pool.simple_select_list( - self.table_name, None, ["id, username, value"] - ) - ) self.assertEqual( - set(self._dump_to_tuple(res)), + set(self._dump_to_tuple()), {(1, "user1", "hello"), (2, "user2", "bleb")}, ) From 5f2909968456a4d07fb399b9cdabc1c076841daf Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Mon, 28 Mar 2022 13:42:07 +0100 Subject: [PATCH 08/25] Rename some things to clarify --- tests/storage/test__base.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/storage/test__base.py b/tests/storage/test__base.py index cc53732ca9e7..db815ae85a90 100644 --- a/tests/storage/test__base.py +++ b/tests/storage/test__base.py @@ -24,7 +24,7 @@ from tests import unittest -class UpsertManyTests(unittest.HomeserverTestCase): +class UpdateUpsertManyTests(unittest.HomeserverTestCase): def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.storage = hs.get_datastores().main @@ -46,7 +46,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: ) ) - def _dump_to_tuple(self) -> Generator[Tuple[int, str, str], None, None]: + def _dump_table_to_tuple(self) -> Generator[Tuple[int, str, str], None, None]: res = self.get_success( self.storage.db_pool.simple_select_list( self.table_name, None, ["id, username, value"] @@ -80,7 +80,7 @@ def test_upsert_many(self) -> None: # Check results are what we expect self.assertEqual( - set(self._dump_to_tuple()), + set(self._dump_table_to_tuple()), {(1, "user1", "hello"), (2, "user2", "there")}, ) @@ -102,6 +102,6 @@ def test_upsert_many(self) -> None: # Check results are what we expect self.assertEqual( - set(self._dump_to_tuple()), + set(self._dump_table_to_tuple()), {(1, "user1", "hello"), (2, "user2", "bleb")}, ) From e7c4907a28c38f94e708a951fc42626d281fbad6 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Mon, 28 Mar 2022 13:51:04 +0100 Subject: [PATCH 09/25] Add a test for simple_update_many --- tests/storage/test__base.py | 47 +++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/tests/storage/test__base.py b/tests/storage/test__base.py index db815ae85a90..62e42dc50e9d 100644 --- a/tests/storage/test__base.py +++ b/tests/storage/test__base.py @@ -105,3 +105,50 @@ def test_upsert_many(self) -> None: set(self._dump_table_to_tuple()), {(1, "user1", "hello"), (2, "user2", "bleb")}, ) + + def test_simple_update_many(self): + """ + simple_update_many performs many updates at once. + """ + # First add some data. + self.get_success( + self.storage.db_pool.simple_insert_many( + table=self.table_name, + keys=("id", "username", "value"), + values=[(1, "alice", "A"), (2, "bob", "B"), (3, "charlie", "C")], + desc="insert", + ) + ) + + # Check the data made it to the table + self.assertEqual( + set(self._dump_table_to_tuple()), + {(1, "alice", "A"), (2, "bob", "B"), (3, "charlie", "C")}, + ) + + # Now use simple_update_many + self.get_success( + self.storage.db_pool.simple_update_many( + table=self.table_name, + key_names=("username",), + key_values=( + ("alice",), + ("bob",), + ("stranger",), + ), + value_names=("value",), + value_values=( + ("aaa!",), + ("bbb!",), + ("???",), + ), + desc="update_many1", + ) + ) + + # Check the table is how we expect: + # charlie has been left alone + self.assertEqual( + set(self._dump_table_to_tuple()), + {(1, "alice", "aaa!"), (2, "bob", "bbb!"), (3, "charlie", "C")}, + ) From 5675a942f726d7bc3a1dfcc1f64cd2642fdd5de4 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Mon, 28 Mar 2022 13:51:46 +0100 Subject: [PATCH 10/25] Add an exception for when the number of value rows and key rows don't match --- synapse/storage/database.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 0bbaa9e89835..90721c117f0f 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -1818,7 +1818,7 @@ def simple_update_many_txn( key_names: Collection[str], key_values: Collection[Iterable[Any]], value_names: Collection[str], - value_values: Iterable[Iterable[Any]], + value_values: Collection[Iterable[Any]], ) -> None: """ Update, many times, using batching where possible. @@ -1831,6 +1831,11 @@ def simple_update_many_txn( value_values: A list of each row's value column values. """ + if len(value_values) != len(key_values): + raise ValueError( + f"{len(key_values)} key rows and {len(value_values)} value rows: should be the same number." + ) + # List of value names, then key names allnames: List[str] = [] allnames.extend(value_names) From e7985d2b0bcad52b3ef9dc40c03f72ae300bcfc1 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 6 Apr 2022 10:18:42 +0100 Subject: [PATCH 11/25] Antilint --- synapse/storage/database.py | 2 +- tests/storage/test__base.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 90721c117f0f..4efc308a470d 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -63,7 +63,7 @@ from synapse.server import HomeServer # python 3 does not have a maximum int value -MAX_TXN_ID = 2 ** 63 - 1 +MAX_TXN_ID = 2**63 - 1 logger = logging.getLogger(__name__) diff --git a/tests/storage/test__base.py b/tests/storage/test__base.py index 62e42dc50e9d..09cb06d614a1 100644 --- a/tests/storage/test__base.py +++ b/tests/storage/test__base.py @@ -14,7 +14,7 @@ # limitations under the License. import secrets -from typing import Any, Dict, Generator, List, Tuple +from typing import Generator, Tuple from twisted.test.proto_helpers import MemoryReactor From 7edf6f774e091708584e50f2cefe75e311194754 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 7 Apr 2022 14:42:46 +0100 Subject: [PATCH 12/25] Add lock=True for the emulated many-upsert case --- synapse/storage/database.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 4efc308a470d..a3bd3e3de784 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -1268,6 +1268,7 @@ async def simple_upsert_many( value_names: Collection[str], value_values: Collection[Collection[Any]], desc: str, + lock: bool = True, ) -> None: """ Upsert, many times. @@ -1279,6 +1280,7 @@ async def simple_upsert_many( value_names: The value column names value_values: A list of each row's value column values. Ignored if value_names is empty. + lock: True to lock the table when doing the upsert. """ # We can autocommit if we are going to use native upserts @@ -1294,6 +1296,7 @@ async def simple_upsert_many( key_values, value_names, value_values, + lock=lock, db_autocommit=autocommit, ) @@ -1305,6 +1308,7 @@ def simple_upsert_many_txn( key_values: Collection[Iterable[Any]], value_names: Collection[str], value_values: Iterable[Iterable[Any]], + lock: bool = True, ) -> None: """ Upsert, many times. @@ -1316,6 +1320,7 @@ def simple_upsert_many_txn( value_names: The value column names value_values: A list of each row's value column values. Ignored if value_names is empty. + lock: True to lock the table when doing the upsert. """ if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables: return self.simple_upsert_many_txn_native_upsert( @@ -1323,7 +1328,7 @@ def simple_upsert_many_txn( ) else: return self.simple_upsert_many_txn_emulated( - txn, table, key_names, key_values, value_names, value_values + txn, table, key_names, key_values, value_names, value_values, lock=lock ) def simple_upsert_many_txn_emulated( @@ -1334,6 +1339,7 @@ def simple_upsert_many_txn_emulated( key_values: Collection[Iterable[Any]], value_names: Collection[str], value_values: Iterable[Iterable[Any]], + lock: bool = True, ) -> None: """ Upsert, many times, but without native UPSERT support or batching. @@ -1345,6 +1351,7 @@ def simple_upsert_many_txn_emulated( value_names: The value column names value_values: A list of each row's value column values. Ignored if value_names is empty. + lock: True to lock the table when doing the upsert. """ # No value columns, therefore make a blank list so that the following # zip() works correctly. @@ -1355,7 +1362,7 @@ def simple_upsert_many_txn_emulated( _keys = {x: y for x, y in zip(key_names, keyv)} _vals = {x: y for x, y in zip(value_names, valv)} - self.simple_upsert_txn_emulated(txn, table, _keys, _vals) + self.simple_upsert_txn_emulated(txn, table, _keys, _vals, lock=lock) def simple_upsert_many_txn_native_upsert( self, From 9556aae616e76c371bc58f55ddc7ab1fca8e06ac Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 7 Apr 2022 14:43:21 +0100 Subject: [PATCH 13/25] Don't double-lock the table --- synapse/storage/databases/main/client_ips.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py index 85dc5070f758..b46d8ca9fcf0 100644 --- a/synapse/storage/databases/main/client_ips.py +++ b/synapse/storage/databases/main/client_ips.py @@ -653,7 +653,7 @@ def update_user_ips() -> None: key_values=keys, value_names=value_columns, value_values=values, - # TODO lock=False + lock=False ) def update_devices() -> None: From 303fba6f0dcb9f430ada37cd09a4a14f706ce7b2 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 7 Apr 2022 14:43:35 +0100 Subject: [PATCH 14/25] Inline column names --- synapse/storage/databases/main/client_ips.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py index b46d8ca9fcf0..043f83243470 100644 --- a/synapse/storage/databases/main/client_ips.py +++ b/synapse/storage/databases/main/client_ips.py @@ -636,9 +636,7 @@ def _update_client_ips_batch_txn( def update_user_ips() -> None: # Keys and values for the `user_ips` upsert. - key_columns = "user_id", "access_token", "ip" keys = [] - value_columns = "user_agent", "device_id", "last_seen" values = [] for entry in to_update.items(): @@ -649,18 +647,16 @@ def update_user_ips() -> None: self.db_pool.simple_upsert_many_txn( txn, table="user_ips", - key_names=key_columns, + key_names=("user_id", "access_token", "ip"), key_values=keys, - value_names=value_columns, + value_names=("user_agent", "device_id", "last_seen"), value_values=values, lock=False ) def update_devices() -> None: # Keys and values for the `devices` update. - key_columns = "user_id", "device_id" keys = [] - value_columns = "user_agent", "last_seen", "ip" values = [] for entry in to_update.items(): @@ -675,9 +671,9 @@ def update_devices() -> None: self.db_pool.simple_update_many_txn( txn, table="devices", - key_names=key_columns, + key_names=("user_id", "device_id"), key_values=keys, - value_names=value_columns, + value_names=("user_agent", "last_seen", "ip"), value_values=values, ) From ac4b1d58ce3b103d586f4f80d9a91803ab735ac1 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 7 Apr 2022 14:45:13 +0100 Subject: [PATCH 15/25] Flatten user_ips builder --- synapse/storage/databases/main/client_ips.py | 33 ++++++++++---------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py index 043f83243470..4e6f5976ef59 100644 --- a/synapse/storage/databases/main/client_ips.py +++ b/synapse/storage/databases/main/client_ips.py @@ -634,25 +634,24 @@ def _update_client_ips_batch_txn( ): self.database_engine.lock_table(txn, "user_ips") - def update_user_ips() -> None: - # Keys and values for the `user_ips` upsert. - keys = [] - values = [] + # Keys and values for the `user_ips` upsert. + user_ips_keys = [] + user_ips_values = [] - for entry in to_update.items(): - (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry - keys.append((user_id, access_token, ip)) - values.append((user_agent, device_id, last_seen)) + for entry in to_update.items(): + (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry + user_ips_keys.append((user_id, access_token, ip)) + user_ips_values.append((user_agent, device_id, last_seen)) - self.db_pool.simple_upsert_many_txn( - txn, - table="user_ips", - key_names=("user_id", "access_token", "ip"), - key_values=keys, - value_names=("user_agent", "device_id", "last_seen"), - value_values=values, - lock=False - ) + self.db_pool.simple_upsert_many_txn( + txn, + table="user_ips", + key_names=("user_id", "access_token", "ip"), + key_values=user_ips_keys, + value_names=("user_agent", "device_id", "last_seen"), + value_values=user_ips_values, + lock=False, + ) def update_devices() -> None: # Keys and values for the `devices` update. From 34403cb77edb44cf4063e251353cbdaf54df23ea Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 7 Apr 2022 14:46:23 +0100 Subject: [PATCH 16/25] Flatten devices builder --- synapse/storage/databases/main/client_ips.py | 45 ++++++++------------ 1 file changed, 18 insertions(+), 27 deletions(-) diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py index 4e6f5976ef59..e78344a68612 100644 --- a/synapse/storage/databases/main/client_ips.py +++ b/synapse/storage/databases/main/client_ips.py @@ -638,11 +638,21 @@ def _update_client_ips_batch_txn( user_ips_keys = [] user_ips_values = [] + # Keys and values for the `devices` update. + devices_keys = [] + devices_values = [] + for entry in to_update.items(): (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry user_ips_keys.append((user_id, access_token, ip)) user_ips_values.append((user_agent, device_id, last_seen)) + # Technically an access token might not be associated with + # a device so we need to check. + if device_id: + devices_keys.append((user_id, device_id)) + devices_values.append((user_agent, last_seen, ip)) + self.db_pool.simple_upsert_many_txn( txn, table="user_ips", @@ -653,33 +663,14 @@ def _update_client_ips_batch_txn( lock=False, ) - def update_devices() -> None: - # Keys and values for the `devices` update. - keys = [] - values = [] - - for entry in to_update.items(): - (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry - - # Technically an access token might not be associated with - # a device so we need to check. - if device_id: - keys.append((user_id, device_id)) - values.append((user_agent, last_seen, ip)) - - self.db_pool.simple_update_many_txn( - txn, - table="devices", - key_names=("user_id", "device_id"), - key_values=keys, - value_names=("user_agent", "last_seen", "ip"), - value_values=values, - ) - - # This update is split into two smaller functions so that we can - # be sure their locals doesn't overlap - update_user_ips() - update_devices() + self.db_pool.simple_update_many_txn( + txn, + table="devices", + key_names=("user_id", "device_id"), + key_values=devices_keys, + value_names=("user_agent", "last_seen", "ip"), + value_values=devices_values, + ) async def get_last_client_ip_by_device( self, user_id: str, device_id: Optional[str] From 0f8e98b7a2b304659bdbd78f2d5fa519cb0049a4 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 7 Apr 2022 14:48:19 +0100 Subject: [PATCH 17/25] Fix up docstring --- synapse/storage/database.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index a3bd3e3de784..fb2b76086a21 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -1808,6 +1808,18 @@ async def simple_update_many( value_values: Iterable[Iterable[Any]], desc: str, ) -> None: + """ + Update, many times, using batching where possible. + If the keys don't match anything, nothing will be updated. + + Args: + table: The table to update + key_names: The key column names. + key_values: A list of each row's key column values. + value_names: The names of value columns to update. + value_values: A list of each row's value column values. + """ + return await self.runInteraction( desc, self.simple_update_many_txn, @@ -1829,9 +1841,10 @@ def simple_update_many_txn( ) -> None: """ Update, many times, using batching where possible. + If the keys don't match anything, nothing will be updated. Args: - table: The table to upsert into + table: The table to update key_names: The key column names. key_values: A list of each row's key column values. value_names: The names of value columns to update. From 481b730710c7b50d5de24e43e9c3034df0858415 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 7 Apr 2022 14:49:11 +0100 Subject: [PATCH 18/25] Remove dead variable --- synapse/storage/database.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index fb2b76086a21..278782efa01a 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -1856,11 +1856,6 @@ def simple_update_many_txn( f"{len(key_values)} key rows and {len(value_values)} value rows: should be the same number." ) - # List of value names, then key names - allnames: List[str] = [] - allnames.extend(value_names) - allnames.extend(key_names) - # List of tuples of (value values, then key values) # (This matches the order of `allnames` and the order of the query) args = [tuple(x) + tuple(y) for x, y in zip(value_values, key_values)] From 99e6b6658d48c2dd9883bcbf75a6db4f2d14700c Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 7 Apr 2022 14:50:00 +0100 Subject: [PATCH 19/25] Don't return for None --- synapse/storage/database.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 278782efa01a..f94511ccacec 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -1288,7 +1288,7 @@ async def simple_upsert_many( self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables ) - return await self.runInteraction( + await self.runInteraction( desc, self.simple_upsert_many_txn, table, @@ -1820,7 +1820,7 @@ async def simple_update_many( value_values: A list of each row's value column values. """ - return await self.runInteraction( + await self.runInteraction( desc, self.simple_update_many_txn, table, From 3f7f6592197c3fec92e3224d31186eef4ac83153 Mon Sep 17 00:00:00 2001 From: reivilibre Date: Thu, 7 Apr 2022 14:59:28 +0100 Subject: [PATCH 20/25] Update synapse/storage/database.py Co-authored-by: Patrick Cloke --- synapse/storage/database.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index f94511ccacec..f929b673a1fd 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -1857,7 +1857,7 @@ def simple_update_many_txn( ) # List of tuples of (value values, then key values) - # (This matches the order of `allnames` and the order of the query) + # (This matches the order needed for the query) args = [tuple(x) + tuple(y) for x, y in zip(value_values, key_values)] for ks, vs in zip(key_values, value_values): From 50f2b9179095235417f7fb283b38ffdfab2928c8 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 7 Apr 2022 15:59:49 +0100 Subject: [PATCH 21/25] Bail out when there's nothing to do --- synapse/storage/databases/main/client_ips.py | 24 +++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py index e78344a68612..d9e1fe09e7ac 100644 --- a/synapse/storage/databases/main/client_ips.py +++ b/synapse/storage/databases/main/client_ips.py @@ -616,9 +616,10 @@ async def _update_client_ips_batch(self) -> None: to_update = self._batch_row_update self._batch_row_update = {} - await self.db_pool.runInteraction( - "_update_client_ips_batch", self._update_client_ips_batch_txn, to_update - ) + if to_update: + await self.db_pool.runInteraction( + "_update_client_ips_batch", self._update_client_ips_batch_txn, to_update + ) def _update_client_ips_batch_txn( self, @@ -663,14 +664,15 @@ def _update_client_ips_batch_txn( lock=False, ) - self.db_pool.simple_update_many_txn( - txn, - table="devices", - key_names=("user_id", "device_id"), - key_values=devices_keys, - value_names=("user_agent", "last_seen", "ip"), - value_values=devices_values, - ) + if devices_values: + self.db_pool.simple_update_many_txn( + txn, + table="devices", + key_names=("user_id", "device_id"), + key_values=devices_keys, + value_names=("user_agent", "last_seen", "ip"), + value_values=devices_values, + ) async def get_last_client_ip_by_device( self, user_id: str, device_id: Optional[str] From 93e12375cd74d5ba73f5da59e5ba5039fd0c5666 Mon Sep 17 00:00:00 2001 From: reivilibre Date: Fri, 8 Apr 2022 10:15:09 +0100 Subject: [PATCH 22/25] Apply suggestions from code review Co-authored-by: Patrick Cloke --- synapse/storage/database.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index f929b673a1fd..73ad316f522d 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -1280,7 +1280,8 @@ async def simple_upsert_many( value_names: The value column names value_values: A list of each row's value column values. Ignored if value_names is empty. - lock: True to lock the table when doing the upsert. + lock: True to lock the table when doing the upsert. Unused if the database engine + supports native upserts. """ # We can autocommit if we are going to use native upserts @@ -1320,7 +1321,8 @@ def simple_upsert_many_txn( value_names: The value column names value_values: A list of each row's value column values. Ignored if value_names is empty. - lock: True to lock the table when doing the upsert. + lock: True to lock the table when doing the upsert. Unused if the database engine + supports native upserts. """ if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables: return self.simple_upsert_many_txn_native_upsert( From c0aaec40bd8b06575c4e371b2ef6a6155b8f544d Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 8 Apr 2022 10:41:47 +0100 Subject: [PATCH 23/25] Antilint --- synapse/storage/database.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 73ad316f522d..77dc9df34487 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -1281,7 +1281,7 @@ async def simple_upsert_many( value_values: A list of each row's value column values. Ignored if value_names is empty. lock: True to lock the table when doing the upsert. Unused if the database engine - supports native upserts. + supports native upserts. """ # We can autocommit if we are going to use native upserts @@ -1322,7 +1322,7 @@ def simple_upsert_many_txn( value_values: A list of each row's value column values. Ignored if value_names is empty. lock: True to lock the table when doing the upsert. Unused if the database engine - supports native upserts. + supports native upserts. """ if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables: return self.simple_upsert_many_txn_native_upsert( From c456958c4a3923fa0c609a2143ce0992b114ae4b Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 8 Apr 2022 10:53:59 +0100 Subject: [PATCH 24/25] Lock only once for batch emulated upserts --- synapse/storage/database.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 77dc9df34487..5eb545c86e2a 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -1360,11 +1360,17 @@ def simple_upsert_many_txn_emulated( if not value_names: value_values = [() for x in range(len(key_values))] + if lock: + # Lock the table just once, to prevent it being done once per row. + # Note that, according to Postgres' documentation, once obtained, + # the lock is held for the remainder of the current transaction. + self.engine.lock_table(txn, "user_ips") + for keyv, valv in zip(key_values, value_values): _keys = {x: y for x, y in zip(key_names, keyv)} _vals = {x: y for x, y in zip(value_names, valv)} - self.simple_upsert_txn_emulated(txn, table, _keys, _vals, lock=lock) + self.simple_upsert_txn_emulated(txn, table, _keys, _vals, lock=False) def simple_upsert_many_txn_native_upsert( self, From 214aacf40354567680150415e4b0920793cb1ef5 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 8 Apr 2022 10:57:20 +0100 Subject: [PATCH 25/25] No need to manually lock --- synapse/storage/databases/main/client_ips.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py index d9e1fe09e7ac..0df160d2b089 100644 --- a/synapse/storage/databases/main/client_ips.py +++ b/synapse/storage/databases/main/client_ips.py @@ -630,11 +630,6 @@ def _update_client_ips_batch_txn( self._update_on_this_worker ), "This worker is not designated to update client IPs" - if "user_ips" in self.db_pool._unsafe_to_upsert_tables or ( - not self.database_engine.can_native_upsert - ): - self.database_engine.lock_table(txn, "user_ips") - # Keys and values for the `user_ips` upsert. user_ips_keys = [] user_ips_values = [] @@ -661,7 +656,6 @@ def _update_client_ips_batch_txn( key_values=user_ips_keys, value_names=("user_agent", "device_id", "last_seen"), value_values=user_ips_values, - lock=False, ) if devices_values: