From fa9093c7182d38f5ea687ebfbd04c2c89462f86d Mon Sep 17 00:00:00 2001 From: Tamer Ahmed Date: Wed, 22 Jul 2020 16:04:46 -0700 Subject: [PATCH 1/4] [configdb] Add Ability to Query/Update Redis Using Pipelines Redis recommend using pipeline in order to obtain optimal speen when handling large volume of data. Te pipeline API also given an ability devide work in batches. singed-off-by: Tamer Ahmed --- src/swsssdk/__init__.py | 2 +- src/swsssdk/configdb.py | 141 +++++++++++++++++++++++++++++++++++++--- 2 files changed, 134 insertions(+), 9 deletions(-) diff --git a/src/swsssdk/__init__.py b/src/swsssdk/__init__.py index d7cf391f550c..982286020342 100644 --- a/src/swsssdk/__init__.py +++ b/src/swsssdk/__init__.py @@ -9,7 +9,7 @@ try: from .dbconnector import SonicDBConfig, SonicV2Connector - from .configdb import ConfigDBConnector + from .configdb import ConfigDBConnector, ConfigDBPipeConnector from .sonic_db_dump_load import sonic_db_dump_load except (KeyError, ValueError): msg = "Failed to database connector objects -- incorrect database config schema." diff --git a/src/swsssdk/configdb.py b/src/swsssdk/configdb.py index 4ccbb0dca32e..ad99564bd925 100644 --- a/src/swsssdk/configdb.py +++ b/src/swsssdk/configdb.py @@ -108,12 +108,12 @@ def listen(self): (table, row) = key.split(self.TABLE_NAME_SEPARATOR, 1) if self.handlers.has_key(table): client = self.get_redis_client(self.db_name) - data = self.__raw_to_typed(client.hgetall(key)) + data = self.raw_to_typed(client.hgetall(key)) self.__fire(table, row, data) except ValueError: pass #Ignore non table-formated redis entries - def __raw_to_typed(self, raw_data): + def raw_to_typed(self, raw_data): if raw_data == None: return None typed_data = {} @@ -141,7 +141,7 @@ def __raw_to_typed(self, raw_data): typed_data[key] = raw_data[raw_key] return typed_data - def __typed_to_raw(self, typed_data): + def typed_to_raw(self, typed_data): if typed_data == None: return None elif typed_data == {}: @@ -187,7 +187,7 @@ def set_entry(self, table, key, data): client.delete(_hash) else: original = self.get_entry(table, key) - client.hmset(_hash, self.__typed_to_raw(data)) + client.hmset(_hash, self.typed_to_raw(data)) for k in [ k for k in original.keys() if k not in data.keys() ]: if type(original[k]) == list: k = k + '@' @@ -208,7 +208,7 @@ def mod_entry(self, table, key, data): if data == None: client.delete(_hash) else: - client.hmset(_hash, self.__typed_to_raw(data)) + client.hmset(_hash, self.typed_to_raw(data)) def get_entry(self, table, key): """Read a table entry from config db. @@ -222,7 +222,7 @@ def get_entry(self, table, key): key = self.serialize_key(key) client = self.get_redis_client(self.db_name) _hash = '{}{}{}'.format(table.upper(), self.TABLE_NAME_SEPARATOR, key) - return self.__raw_to_typed(client.hgetall(_hash)) + return self.raw_to_typed(client.hgetall(_hash)) def get_keys(self, table, split=True): """Read all keys of a table from config db. @@ -266,7 +266,7 @@ def get_table(self, table): data = {} for key in keys: try: - entry = self.__raw_to_typed(client.hgetall(key)) + entry = self.raw_to_typed(client.hgetall(key)) if entry != None: if PY3K: key = key.decode('utf-8') @@ -328,10 +328,135 @@ def get_config(self): key = key.decode('utf-8') try: (table_name, row) = key.split(self.TABLE_NAME_SEPARATOR, 1) - entry = self.__raw_to_typed(client.hgetall(key)) + entry = self.raw_to_typed(client.hgetall(key)) if entry != None: data.setdefault(table_name, {})[self.deserialize_key(row)] = entry except ValueError: pass #Ignore non table-formated redis entries return data + +class ConfigDBPipeConnector(ConfigDBConnector): + REDIS_SCAN_BATCH_SIZE = 30 + + def __init__(self, **kwargs): + super(ConfigDBPipeConnector, self).__init__(**kwargs) + + def __delete_entries(self, client, pipe, pattern, cursor): + """Helper method to delete table entries from config db using Redis pipeline + with batch size of REDIS_SCAN_BATCH_SIZE. + The caller should call pipeline execute once ready + Args: + client: Redis client + pipe: Redis DB pipe + pattern: key pattern + cursor: position to start scanning from + + Returns: + cur: poition of next item to scan + """ + cur, keys = client.scan(cursor=cursor, match=pattern, count=self.REDIS_SCAN_BATCH_SIZE) + for key in keys: + pipe.delete(key) + + return cur + + def __delete_table(self, client, pipe, table): + """Helper method to delete table entries from config db using Redis pipeline. + The caller should call pipeline execute once ready + Args: + client: Redis client + pipe: Redis DB pipe + table: Table name. + """ + pattern = '{}{}*'.format(table.upper(), self.TABLE_NAME_SEPARATOR) + cur = self.__delete_entries(client, pipe, pattern, 0) + while cur != 0: + cur = self.__delete_entries(client, pipe, pattern, cur) + + def __mod_entry(self, pipe, table, key, data): + """Modify a table entry to config db. + Args: + table: Table name. + pipe: Redis DB pipe + table: Table name. + key: Key of table entry, or a tuple of keys if it is a multi-key table. + data: Table row data in a form of dictionary {'column_key': 'value', ...}. + Pass {} as data will create an entry with no column if not already existed. + Pass None as data will delete the entry. + """ + key = self.serialize_key(key) + _hash = '{}{}{}'.format(table.upper(), self.TABLE_NAME_SEPARATOR, key) + if data == None: + pipe.delete(_hash) + else: + pipe.hmset(_hash, self.typed_to_raw(data)) + + def mod_config(self, data): + """Write multiple tables into config db. + Extra entries/fields in the db which are not in the data are kept. + Args: + data: config data in a dictionary form + { + 'TABLE_NAME': { 'row_key': {'column_key': 'value', ...}, ...}, + 'MULTI_KEY_TABLE_NAME': { ('l1_key', 'l2_key', ...) : {'column_key': 'value', ...}, ...}, + ... + } + """ + client = self.get_redis_client(self.db_name) + pipe = client.pipeline() + for table_name in data: + table_data = data[table_name] + if table_data == None: + self.__delete_table(client, pipe, table_name) + continue + for key in table_data: + self.__mod_entry(pipe, table_name, key, table_data[key]) + pipe.execute() + client.bgsave() + + def _get_config(self, client, pipe, data, cursor): + """Read config data in batches of size REDIS_SCAN_BATCH_SIZE using Redis pipelines + Args: + client: Redis client + pipe: Redis DB pipe + data: config dictionary + cursor: position to start scanning from + + Returns: + cur: poition of next item to scan + """ + cur, keys = client.scan(cursor=cursor, match='*', count=self.REDIS_SCAN_BATCH_SIZE) + keys = [key.decode('utf-8') for key in keys if key != self.INIT_INDICATOR] + for key in keys: + pipe.hgetall(key) + records = pipe.execute() + + for index, key in enumerate(keys): + (table_name, row) = key.split(self.TABLE_NAME_SEPARATOR, 1) + entry = self.raw_to_typed(records[index]) + if entry is not None: + data.setdefault(table_name, {})[self.deserialize_key(row)] = entry + + return cur + + def get_config(self): + """Read all config data. + Returns: + Config data in a dictionary form of + { + 'TABLE_NAME': { 'row_key': {'column_key': 'value', ...}, ...}, + 'MULTI_KEY_TABLE_NAME': { ('l1_key', 'l2_key', ...) : {'column_key': 'value', ...}, ...}, + ... + } + """ + client = self.get_redis_client(self.db_name) + pipe = client.pipeline() + data = {} + + cur = self._get_config(client, pipe, data, 0) + while cur != 0: + cur = self._get_config(client, pipe, data, cur) + + return data + From 2d2b7e1ae0031ccc77d0a1d6c4bfc77aab93df70 Mon Sep 17 00:00:00 2001 From: Tamer Ahmed Date: Wed, 22 Jul 2020 17:17:46 -0700 Subject: [PATCH 2/4] making lgtm happy --- src/swsssdk/configdb.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/swsssdk/configdb.py b/src/swsssdk/configdb.py index ad99564bd925..0996f023b27c 100644 --- a/src/swsssdk/configdb.py +++ b/src/swsssdk/configdb.py @@ -114,7 +114,7 @@ def listen(self): pass #Ignore non table-formated redis entries def raw_to_typed(self, raw_data): - if raw_data == None: + if raw_data is None: return None typed_data = {} for raw_key in raw_data: @@ -142,7 +142,7 @@ def raw_to_typed(self, raw_data): return typed_data def typed_to_raw(self, typed_data): - if typed_data == None: + if typed_data is None: return None elif typed_data == {}: return { "NULL": "NULL" } @@ -183,7 +183,7 @@ def set_entry(self, table, key, data): key = self.serialize_key(key) client = self.get_redis_client(self.db_name) _hash = '{}{}{}'.format(table.upper(), self.TABLE_NAME_SEPARATOR, key) - if data == None: + if data is None: client.delete(_hash) else: original = self.get_entry(table, key) @@ -205,7 +205,7 @@ def mod_entry(self, table, key, data): key = self.serialize_key(key) client = self.get_redis_client(self.db_name) _hash = '{}{}{}'.format(table.upper(), self.TABLE_NAME_SEPARATOR, key) - if data == None: + if data is None: client.delete(_hash) else: client.hmset(_hash, self.typed_to_raw(data)) @@ -387,7 +387,7 @@ def __mod_entry(self, pipe, table, key, data): """ key = self.serialize_key(key) _hash = '{}{}{}'.format(table.upper(), self.TABLE_NAME_SEPARATOR, key) - if data == None: + if data is None: pipe.delete(_hash) else: pipe.hmset(_hash, self.typed_to_raw(data)) @@ -407,7 +407,7 @@ def mod_config(self, data): pipe = client.pipeline() for table_name in data: table_data = data[table_name] - if table_data == None: + if table_data is None: self.__delete_table(client, pipe, table_name) continue for key in table_data: From 994851c8e95f143dfebc59c2e89878876bee225d Mon Sep 17 00:00:00 2001 From: Tamer Ahmed Date: Fri, 24 Jul 2020 14:44:15 -0700 Subject: [PATCH 3/4] review comments --- src/swsssdk/configdb.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/swsssdk/configdb.py b/src/swsssdk/configdb.py index 0996f023b27c..ab0ee86dc97f 100644 --- a/src/swsssdk/configdb.py +++ b/src/swsssdk/configdb.py @@ -120,7 +120,7 @@ def raw_to_typed(self, raw_data): for raw_key in raw_data: key = raw_key if PY3K: - key = raw_key.decode('utf-8') + key = raw_key.decode() # "NULL:NULL" is used as a placeholder for objects with no attributes if key == "NULL": @@ -136,7 +136,7 @@ def raw_to_typed(self, raw_data): typed_data[key[:-1]] = value else: if PY3K: - typed_data[key] = raw_data[raw_key].decode('utf-8') + typed_data[key] = raw_data[raw_key].decode() else: typed_data[key] = raw_data[raw_key] return typed_data @@ -240,7 +240,7 @@ def get_keys(self, table, split=True): for key in keys: try: if PY3K: - key = key.decode('utf-8') + key = key.decode() if split: (_, row) = key.split(self.TABLE_NAME_SEPARATOR, 1) data.append(self.deserialize_key(row)) @@ -269,7 +269,7 @@ def get_table(self, table): entry = self.raw_to_typed(client.hgetall(key)) if entry != None: if PY3K: - key = key.decode('utf-8') + key = key.decode() (_, row) = key.split(self.TABLE_NAME_SEPARATOR, 1) data[self.deserialize_key(row)] = entry else: @@ -325,7 +325,7 @@ def get_config(self): data = {} for key in keys: if PY3K: - key = key.decode('utf-8') + key = key.decode() try: (table_name, row) = key.split(self.TABLE_NAME_SEPARATOR, 1) entry = self.raw_to_typed(client.hgetall(key)) @@ -427,7 +427,7 @@ def _get_config(self, client, pipe, data, cursor): cur: poition of next item to scan """ cur, keys = client.scan(cursor=cursor, match='*', count=self.REDIS_SCAN_BATCH_SIZE) - keys = [key.decode('utf-8') for key in keys if key != self.INIT_INDICATOR] + keys = [key.decode() for key in keys if key != self.INIT_INDICATOR] for key in keys: pipe.hgetall(key) records = pipe.execute() From 198d14328bd096bd344197e554791af044931aaf Mon Sep 17 00:00:00 2001 From: Tamer Ahmed Date: Mon, 27 Jul 2020 10:26:40 -0700 Subject: [PATCH 4/4] review comments --- src/swsssdk/configdb.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/swsssdk/configdb.py b/src/swsssdk/configdb.py index ab0ee86dc97f..8fcad0b4c689 100644 --- a/src/swsssdk/configdb.py +++ b/src/swsssdk/configdb.py @@ -415,7 +415,7 @@ def mod_config(self, data): pipe.execute() client.bgsave() - def _get_config(self, client, pipe, data, cursor): + def __get_config(self, client, pipe, data, cursor): """Read config data in batches of size REDIS_SCAN_BATCH_SIZE using Redis pipelines Args: client: Redis client @@ -454,9 +454,9 @@ def get_config(self): pipe = client.pipeline() data = {} - cur = self._get_config(client, pipe, data, 0) + cur = self.__get_config(client, pipe, data, 0) while cur != 0: - cur = self._get_config(client, pipe, data, cur) + cur = self.__get_config(client, pipe, data, cur) return data