From 9b54b80f1783808c5ae2a30e189f24a7404a8c95 Mon Sep 17 00:00:00 2001 From: Taoyu Li Date: Thu, 27 Jul 2017 19:52:01 -0700 Subject: [PATCH] Add support to subscribe to config DB change (#8) --- src/swsssdk/configdb.py | 134 ++++++++++++++++++++++++++++++++++----- src/swsssdk/interface.py | 2 +- 2 files changed, 118 insertions(+), 18 deletions(-) diff --git a/src/swsssdk/configdb.py b/src/swsssdk/configdb.py index 9007be4a8e21..5eb04125edbc 100644 --- a/src/swsssdk/configdb.py +++ b/src/swsssdk/configdb.py @@ -2,28 +2,120 @@ SONiC ConfigDB connection module Example: + # Write to config DB config_db = ConfigDBConnector() config_db.connect() - config_db.set_table('bgp_neighbor', '10.0.0.1', { + config_db.set_entry('BGP_NEIGHBOR', '10.0.0.1', { 'admin_status': state }) -TODO: - Will add API on keyspace subscription for incremental configuration support - in the incoming versions. + # Daemon to watch config change in certain table: + config_db = ConfigDBConnector() + handler = lambda table, key, data: print (key, data) + config_db.subscribe('BGP_NEIGHBOR', handler) + config_db.connect() + config_db.listen() + """ +import time from .dbconnector import SonicV2Connector class ConfigDBConnector(SonicV2Connector): + INIT_INDICATOR = 'CONFIG_DB_INITIALIZED' + def __init__(self): - """Connect to Redis through TCP, which does not requires root. - """ + # Connect to Redis through TCP, which does not requires root. super(ConfigDBConnector, self).__init__(host='127.0.0.1') + self.handlers = {} - def connect(self): + def __wait_for_db_init(self): + client = self.redis_clients[self.CONFIG_DB] + pubsub = client.pubsub() + initialized = client.get(self.INIT_INDICATOR) + if not initialized: + pattern = "__keyspace@{}__:{}".format(self.db_map[self.CONFIG_DB]['db'], self.INIT_INDICATOR) + pubsub.psubscribe(pattern) + for item in pubsub.listen(): + if item['type'] == 'pmessage': + key = item['channel'].split(':', 1)[1] + if key == self.INIT_INDICATOR: + initialized = client.get(self.INIT_INDICATOR) + if initialized: + break + pubsub.punsubscribe(pattern) + + + def connect(self, wait_for_init=True): SonicV2Connector.connect(self, self.CONFIG_DB, False) + if wait_for_init: + self.__wait_for_db_init() + + def subscribe(self, table, handler): + """Set a handler to handle config change in certain table. + Note that a single handler can be registered to different tables by + calling this fuction multiple times. + Args: + table: Table name. + handler: a handler function that has signature of handler(table_name, key, data) + """ + self.handlers[table] = handler + + def unsubscribe(self, table): + """Remove registered handler from a certain table. + Args: + table: Table name. + """ + if self.handlers.has_key(table): + self.handlers.pop(table) + + def __fire(self, table, key, data): + if self.handlers.has_key(table): + handler = self.handlers[table] + handler(table, key, data) + + def listen(self): + """Start listen Redis keyspace events and will trigger corresponding handlers when content of a table changes. + """ + self.pubsub = self.redis_clients[self.CONFIG_DB].pubsub() + self.pubsub.psubscribe("__keyspace@{}__:*".format(self.db_map[self.CONFIG_DB]['db'])) + for item in self.pubsub.listen(): + if item['type'] == 'pmessage': + key = item['channel'].split(':', 1)[1] + try: + (table, row) = key.split(':', 1) + if self.handlers.has_key(table): + client = self.redis_clients[self.CONFIG_DB] + data = self.__raw_to_typed(client.hgetall(key)) + self.__fire(table, row, data) + except ValueError: + pass #Ignore non table-formated redis entries + + def __raw_to_typed(self, raw_data): + if raw_data == None: + return {} + typed_data = {} + for key in raw_data: + # A column key with ending '@' is used to mark list-typed table items + # TODO: Replace this with a schema-based typing mechanism. + if key.endswith("@"): + typed_data[key[:-1]] = raw_data[key].split(',') + else: + typed_data[key] = raw_data[key] + return typed_data + + def __typed_to_raw(self, typed_data): + if typed_data == None: + return None + raw_data = {} + for key in typed_data: + value = typed_data[key] + if type(value) is list: + raw_data[key+'@'] = ','.join(value) + else: + raw_data[key] = value + return raw_data def set_entry(self, table, key, data): """Write a table entry to config db. @@ -34,7 +126,7 @@ def set_entry(self, table, key, data): """ client = self.redis_clients[self.CONFIG_DB] _hash = '{}:{}'.format(table.upper(), key) - client.hmset(_hash, data) + client.hmset(_hash, self.__typed_to_raw(data)) def get_entry(self, table, key): """Read a table entry from config db. @@ -47,7 +139,7 @@ def get_entry(self, table, key): """ client = self.redis_clients[self.CONFIG_DB] _hash = '{}:{}'.format(table.upper(), key) - return client.hgetall(_hash) + return self.__raw_to_typed(client.hgetall(_hash)) def get_table(self, table): """Read an entire table from config db. @@ -63,7 +155,13 @@ def get_table(self, table): keys = client.keys(pattern) data = {} for key in keys: - data[key.split(':')[1]] = client.hgetall(key) + try: + (_, row) = key.split(':', 1) + entry = self.__raw_to_typed(client.hgetall(key)) + if entry: + data[row] = entry + except ValueError: + pass #Ignore non table-formated redis entries return data def set_config(self, data): @@ -90,13 +188,15 @@ def get_config(self): } """ client = self.redis_clients[self.CONFIG_DB] - hashes = client.keys('*') + keys = client.keys('*') data = {} - for _hash in hashes: - table_name = _hash.split(':', 1)[0] - key = _hash.split(':', 1)[1] - if not data.has_key(table_name): - data[table_name] = {} - data[table_name][key] = client.hgetall(_hash) + for key in keys: + try: + (table_name, row) = key.split(':', 1) + entry = self.__raw_to_typed(client.hgetall(key)) + if entry: + data.setdefault(table_name, {})[row] = entry + except ValueError: + pass #Ignore non table-formated redis entries return data diff --git a/src/swsssdk/interface.py b/src/swsssdk/interface.py index 9c68bb668e02..d2f73513b3b7 100644 --- a/src/swsssdk/interface.py +++ b/src/swsssdk/interface.py @@ -121,7 +121,7 @@ class DBInterface(object): Pub-sub keyspace pattern """ - KEYSPACE_EVENTS = 'KEgh' + KEYSPACE_EVENTS = 'KEA' """ In Redis, by default keyspace events notifications are disabled because while not very sensible the feature uses some CPU power. Notifications are enabled using