Skip to content

Commit

Permalink
Add support to subscribe to config DB change (sonic-net#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
taoyl-ms authored Jul 28, 2017
1 parent dfe4bac commit 9b54b80
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 18 deletions.
134 changes: 117 additions & 17 deletions src/swsssdk/configdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,120 @@
SONiC ConfigDB connection module
Example:
# Write to config DB
config_db = ConfigDBConnector()
config_db.connect()
config_db.set_table('bgp_neighbor', '10.0.0.1', {
config_db.set_entry('BGP_NEIGHBOR', '10.0.0.1', {
'admin_status': state
})
TODO:
Will add API on keyspace subscription for incremental configuration support
in the incoming versions.
# Daemon to watch config change in certain table:
config_db = ConfigDBConnector()
handler = lambda table, key, data: print (key, data)
config_db.subscribe('BGP_NEIGHBOR', handler)
config_db.connect()
config_db.listen()
"""

import time
from .dbconnector import SonicV2Connector

class ConfigDBConnector(SonicV2Connector):

INIT_INDICATOR = 'CONFIG_DB_INITIALIZED'

def __init__(self):
"""Connect to Redis through TCP, which does not requires root.
"""
# Connect to Redis through TCP, which does not requires root.
super(ConfigDBConnector, self).__init__(host='127.0.0.1')
self.handlers = {}

def connect(self):
def __wait_for_db_init(self):
client = self.redis_clients[self.CONFIG_DB]
pubsub = client.pubsub()
initialized = client.get(self.INIT_INDICATOR)
if not initialized:
pattern = "__keyspace@{}__:{}".format(self.db_map[self.CONFIG_DB]['db'], self.INIT_INDICATOR)
pubsub.psubscribe(pattern)
for item in pubsub.listen():
if item['type'] == 'pmessage':
key = item['channel'].split(':', 1)[1]
if key == self.INIT_INDICATOR:
initialized = client.get(self.INIT_INDICATOR)
if initialized:
break
pubsub.punsubscribe(pattern)


def connect(self, wait_for_init=True):
SonicV2Connector.connect(self, self.CONFIG_DB, False)
if wait_for_init:
self.__wait_for_db_init()

def subscribe(self, table, handler):
"""Set a handler to handle config change in certain table.
Note that a single handler can be registered to different tables by
calling this fuction multiple times.
Args:
table: Table name.
handler: a handler function that has signature of handler(table_name, key, data)
"""
self.handlers[table] = handler

def unsubscribe(self, table):
"""Remove registered handler from a certain table.
Args:
table: Table name.
"""
if self.handlers.has_key(table):
self.handlers.pop(table)

def __fire(self, table, key, data):
if self.handlers.has_key(table):
handler = self.handlers[table]
handler(table, key, data)

def listen(self):
"""Start listen Redis keyspace events and will trigger corresponding handlers when content of a table changes.
"""
self.pubsub = self.redis_clients[self.CONFIG_DB].pubsub()
self.pubsub.psubscribe("__keyspace@{}__:*".format(self.db_map[self.CONFIG_DB]['db']))
for item in self.pubsub.listen():
if item['type'] == 'pmessage':
key = item['channel'].split(':', 1)[1]
try:
(table, row) = key.split(':', 1)
if self.handlers.has_key(table):
client = self.redis_clients[self.CONFIG_DB]
data = self.__raw_to_typed(client.hgetall(key))
self.__fire(table, row, data)
except ValueError:
pass #Ignore non table-formated redis entries

def __raw_to_typed(self, raw_data):
if raw_data == None:
return {}
typed_data = {}
for key in raw_data:
# A column key with ending '@' is used to mark list-typed table items
# TODO: Replace this with a schema-based typing mechanism.
if key.endswith("@"):
typed_data[key[:-1]] = raw_data[key].split(',')
else:
typed_data[key] = raw_data[key]
return typed_data

def __typed_to_raw(self, typed_data):
if typed_data == None:
return None
raw_data = {}
for key in typed_data:
value = typed_data[key]
if type(value) is list:
raw_data[key+'@'] = ','.join(value)
else:
raw_data[key] = value
return raw_data

def set_entry(self, table, key, data):
"""Write a table entry to config db.
Expand All @@ -34,7 +126,7 @@ def set_entry(self, table, key, data):
"""
client = self.redis_clients[self.CONFIG_DB]
_hash = '{}:{}'.format(table.upper(), key)
client.hmset(_hash, data)
client.hmset(_hash, self.__typed_to_raw(data))

def get_entry(self, table, key):
"""Read a table entry from config db.
Expand All @@ -47,7 +139,7 @@ def get_entry(self, table, key):
"""
client = self.redis_clients[self.CONFIG_DB]
_hash = '{}:{}'.format(table.upper(), key)
return client.hgetall(_hash)
return self.__raw_to_typed(client.hgetall(_hash))

def get_table(self, table):
"""Read an entire table from config db.
Expand All @@ -63,7 +155,13 @@ def get_table(self, table):
keys = client.keys(pattern)
data = {}
for key in keys:
data[key.split(':')[1]] = client.hgetall(key)
try:
(_, row) = key.split(':', 1)
entry = self.__raw_to_typed(client.hgetall(key))
if entry:
data[row] = entry
except ValueError:
pass #Ignore non table-formated redis entries
return data

def set_config(self, data):
Expand All @@ -90,13 +188,15 @@ def get_config(self):
}
"""
client = self.redis_clients[self.CONFIG_DB]
hashes = client.keys('*')
keys = client.keys('*')
data = {}
for _hash in hashes:
table_name = _hash.split(':', 1)[0]
key = _hash.split(':', 1)[1]
if not data.has_key(table_name):
data[table_name] = {}
data[table_name][key] = client.hgetall(_hash)
for key in keys:
try:
(table_name, row) = key.split(':', 1)
entry = self.__raw_to_typed(client.hgetall(key))
if entry:
data.setdefault(table_name, {})[row] = entry
except ValueError:
pass #Ignore non table-formated redis entries
return data

2 changes: 1 addition & 1 deletion src/swsssdk/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class DBInterface(object):
Pub-sub keyspace pattern
"""

KEYSPACE_EVENTS = 'KEgh'
KEYSPACE_EVENTS = 'KEA'
"""
In Redis, by default keyspace events notifications are disabled because while not
very sensible the feature uses some CPU power. Notifications are enabled using
Expand Down

0 comments on commit 9b54b80

Please sign in to comment.