Skip to content

Commit

Permalink
fix (dot/rpc, dot/state): state_subscribeStorage to only notify for v…
Browse files Browse the repository at this point in the history
…alue changes (#1460)

* move websocket messages and listeners into own files

* fix notifyStorageSubscriptions to only notify for changes

* address PR comments

* add to websocket tests

* repair append, cleanup filter declareation

* fix anti-pattern in log message

* create notifyStorageSubscription for individual sub notify

* add websocket listeners unit tests

* cleanup merge conflicts

* lint

* add sleep timer

* refactor websocket files

* lint

* a locks to fix data race

* implement observer design pattern

* fix race conditions

* add tests

* add tests

* add tests

* add tests

* add tests

* add tests

* add troubleshooting stuff for testing transactions

* save commit

* address PR comments

* lint

* remove unused printf and comments

* fix test

* update tests

* add return from error
  • Loading branch information
edwardmack authored Apr 22, 2021
1 parent e1f9f42 commit e5ae30d
Show file tree
Hide file tree
Showing 14 changed files with 1,075 additions and 655 deletions.
9 changes: 4 additions & 5 deletions dot/rpc/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,8 @@ func (h *HTTPServer) Stop() error {
for _, conn := range h.wsConns {
for _, sub := range conn.Subscriptions {
switch v := sub.(type) {
case *subscription.StorageChangeListener:
h.serverConfig.StorageAPI.UnregisterStorageChangeChannel(v.ChanID)
close(v.Channel)
case *subscription.StorageObserver:
h.serverConfig.StorageAPI.UnregisterStorageObserver(v)
case *subscription.BlockListener:
h.serverConfig.BlockAPI.UnregisterImportedChannel(v.ChanID)
close(v.Channel)
Expand Down Expand Up @@ -234,8 +233,8 @@ func (h *HTTPServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
func NewWSConn(conn *websocket.Conn, cfg *HTTPServerConfig) *subscription.WSConn {
c := &subscription.WSConn{
Wsconn: conn,
Subscriptions: make(map[int]subscription.Listener),
BlockSubChannels: make(map[int]byte),
Subscriptions: make(map[uint]subscription.Listener),
BlockSubChannels: make(map[uint]byte),
StorageSubChannels: make(map[int]byte),
StorageAPI: cfg.StorageAPI,
BlockAPI: cfg.BlockAPI,
Expand Down
4 changes: 2 additions & 2 deletions dot/rpc/modules/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ type StorageAPI interface {
GetStorage(root *common.Hash, key []byte) ([]byte, error)
GetStorageByBlockHash(bhash common.Hash, key []byte) ([]byte, error)
Entries(root *common.Hash) (map[string][]byte, error)
RegisterStorageChangeChannel(sub state.StorageSubscription) (byte, error)
UnregisterStorageChangeChannel(id byte)
GetStateRootFromBlock(bhash *common.Hash) (*common.Hash, error)
GetKeysWithPrefix(root *common.Hash, prefix []byte) ([][]byte, error)
RegisterStorageObserver(observer state.Observer)
UnregisterStorageObserver(observer state.Observer)
}

// BlockAPI is the interface for the block state
Expand Down
241 changes: 50 additions & 191 deletions dot/rpc/subscription/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,119 +30,67 @@ type Listener interface {
Listen()
}

func (c *WSConn) startListener(lid int) {
go c.Subscriptions[lid].Listen()
// WSConnAPI interface defining methors a WSConn should have
type WSConnAPI interface {
safeSend(interface{})
}

func (c *WSConn) initStorageChangeListener(reqID float64, params interface{}) (int, error) {
scl := &StorageChangeListener{
Channel: make(chan *state.SubscriptionResult),
wsconn: c,
}
sub := &state.StorageSubscription{
Filter: make(map[string]bool),
Listener: scl.Channel,
}
// StorageObserver struct to hold data for observer (Observer Design Pattern)
type StorageObserver struct {
id uint
filter map[string][]byte
wsconn WSConnAPI
}

pA := params.([]interface{})
for _, param := range pA {
switch p := param.(type) {
case []interface{}:
for _, pp := range param.([]interface{}) {
sub.Filter[pp.(string)] = true
}
case string:
sub.Filter[p] = true
default:
return 0, fmt.Errorf("unknow parameter type")
}
}
// Change type defining key value pair representing change
type Change [2]string

if c.StorageAPI == nil {
c.safeSendError(reqID, nil, "error StorageAPI not set")
return 0, fmt.Errorf("error StorageAPI not set")
}
// ChangeResult struct to hold change result data
type ChangeResult struct {
Changes []Change `json:"changes"`
Block string `json:"block"`
}

chanID, err := c.StorageAPI.RegisterStorageChangeChannel(*sub)
if err != nil {
return 0, err
// Update is called to notify observer of new value
func (s *StorageObserver) Update(change *state.SubscriptionResult) {
if change == nil {
return
}
scl.ChanID = chanID

c.qtyListeners++
scl.subID = c.qtyListeners
c.Subscriptions[scl.subID] = scl
c.StorageSubChannels[scl.subID] = chanID

initRes := newSubscriptionResponseJSON(scl.subID, reqID)
c.safeSend(initRes)
changeResult := ChangeResult{
Block: change.Hash.String(),
Changes: make([]Change, len(change.Changes)),
}
for i, v := range change.Changes {
changeResult.Changes[i] = Change{common.BytesToHex(v.Key), common.BytesToHex(v.Value)}
}

return scl.subID, nil
res := newSubcriptionBaseResponseJSON()
res.Method = "state_storage"
res.Params.Result = changeResult
res.Params.SubscriptionID = s.GetID()
s.wsconn.safeSend(res)
}

// StorageChangeListener for listening to state change channels
type StorageChangeListener struct {
Channel chan *state.SubscriptionResult
wsconn *WSConn
ChanID byte
subID int
// GetID the id for the Observer
func (s *StorageObserver) GetID() uint {
return s.id
}

// Listen implementation of Listen interface to listen for importedChan changes
func (l *StorageChangeListener) Listen() {
for change := range l.Channel {
if change == nil {
continue
}

result := make(map[string]interface{})
result["block"] = change.Hash.String()
changes := [][]string{}
for _, v := range change.Changes {
kv := []string{common.BytesToHex(v.Key), common.BytesToHex(v.Value)}
changes = append(changes, kv)
}
result["changes"] = changes

res := newSubcriptionBaseResponseJSON()
res.Method = "state_storage"
res.Params.Result = result
res.Params.SubscriptionID = l.subID
l.wsconn.safeSend(res)
}
// GetFilter returns the filter the Observer is using
func (s *StorageObserver) GetFilter() map[string][]byte {
return s.filter
}

// Listen to satisfy Listener interface (but is no longer used by StorageObserver)
func (s *StorageObserver) Listen() {}

// BlockListener to handle listening for blocks importedChan
type BlockListener struct {
Channel chan *types.Block
wsconn *WSConn
wsconn WSConnAPI
ChanID byte
subID int
}

func (c *WSConn) initBlockListener(reqID float64) (int, error) {
bl := &BlockListener{
Channel: make(chan *types.Block),
wsconn: c,
}

if c.BlockAPI == nil {
c.safeSendError(reqID, nil, "error BlockAPI not set")
return 0, fmt.Errorf("error BlockAPI not set")
}
chanID, err := c.BlockAPI.RegisterImportedChannel(bl.Channel)
if err != nil {
return 0, err
}
bl.ChanID = chanID
c.qtyListeners++
bl.subID = c.qtyListeners
c.Subscriptions[bl.subID] = bl
c.BlockSubChannels[bl.subID] = chanID
initRes := newSubscriptionResponseJSON(bl.subID, reqID)
c.safeSend(initRes)

return bl.subID, nil
subID uint
}

// Listen implementation of Listen interface to listen for importedChan changes
Expand All @@ -167,34 +115,9 @@ func (l *BlockListener) Listen() {
// BlockFinalizedListener to handle listening for finalized blocks
type BlockFinalizedListener struct {
channel chan *types.Header
wsconn *WSConn
wsconn WSConnAPI
chanID byte
subID int
}

func (c *WSConn) initBlockFinalizedListener(reqID float64) (int, error) {
bfl := &BlockFinalizedListener{
channel: make(chan *types.Header),
wsconn: c,
}

if c.BlockAPI == nil {
c.safeSendError(reqID, nil, "error BlockAPI not set")
return 0, fmt.Errorf("error BlockAPI not set")
}
chanID, err := c.BlockAPI.RegisterFinalizedChannel(bfl.channel)
if err != nil {
return 0, err
}
bfl.chanID = chanID
c.qtyListeners++
bfl.subID = c.qtyListeners
c.Subscriptions[bfl.subID] = bfl
c.BlockSubChannels[bfl.subID] = chanID
initRes := newSubscriptionResponseJSON(bfl.subID, reqID)
c.safeSend(initRes)

return bfl.subID, nil
subID uint
}

// Listen implementation of Listen interface to listen for importedChan changes
Expand All @@ -217,8 +140,8 @@ func (l *BlockFinalizedListener) Listen() {

// ExtrinsicSubmitListener to handle listening for extrinsic events
type ExtrinsicSubmitListener struct {
wsconn *WSConn
subID int
wsconn WSConnAPI
subID uint
extrinsic types.Extrinsic

importedChan chan *types.Block
Expand All @@ -231,55 +154,6 @@ type ExtrinsicSubmitListener struct {
// AuthorExtrinsicUpdates method name
const AuthorExtrinsicUpdates = "author_extrinsicUpdate"

func (c *WSConn) initExtrinsicWatch(reqID float64, params interface{}) (int, error) {
pA := params.([]interface{})
extBytes, err := common.HexToBytes(pA[0].(string))
if err != nil {
return 0, err
}

// listen for built blocks
esl := &ExtrinsicSubmitListener{
importedChan: make(chan *types.Block),
wsconn: c,
extrinsic: types.Extrinsic(extBytes),
finalizedChan: make(chan *types.Header),
}

if c.BlockAPI == nil {
return 0, fmt.Errorf("error BlockAPI not set")
}
esl.importedChanID, err = c.BlockAPI.RegisterImportedChannel(esl.importedChan)
if err != nil {
return 0, err
}

esl.finalizedChanID, err = c.BlockAPI.RegisterFinalizedChannel(esl.finalizedChan)
if err != nil {
return 0, err
}

c.qtyListeners++
esl.subID = c.qtyListeners
c.Subscriptions[esl.subID] = esl
c.BlockSubChannels[esl.subID] = esl.importedChanID

err = c.CoreAPI.HandleSubmittedExtrinsic(extBytes)
if err != nil {
return 0, err
}
c.safeSend(newSubscriptionResponseJSON(esl.subID, reqID))

// TODO (ed) since HandleSubmittedExtrinsic has been called we assume the extrinsic is in the tx queue
// should we add a channel to tx queue so we're notified when it's in the queue
if c.CoreAPI.IsBlockProducer() {
c.safeSend(newSubscriptionResponse(AuthorExtrinsicUpdates, esl.subID, "ready"))
}

// todo (ed) determine which peer extrinsic has been broadcast to, and set status
return esl.subID, err
}

// Listen implementation of Listen interface to listen for importedChan changes
func (l *ExtrinsicSubmitListener) Listen() {
// listen for imported blocks with extrinsic
Expand Down Expand Up @@ -319,28 +193,13 @@ func (l *ExtrinsicSubmitListener) Listen() {
// RuntimeVersionListener to handle listening for Runtime Version
type RuntimeVersionListener struct {
wsconn *WSConn
subID int
}

func (c *WSConn) initRuntimeVersionListener(reqID float64) (int, error) {
rvl := &RuntimeVersionListener{
wsconn: c,
}
if c.CoreAPI == nil {
c.safeSendError(reqID, nil, "error CoreAPI not set")
return 0, fmt.Errorf("error CoreAPI not set")
}
c.qtyListeners++
rvl.subID = c.qtyListeners
c.Subscriptions[rvl.subID] = rvl
initRes := newSubscriptionResponseJSON(rvl.subID, reqID)
c.safeSend(initRes)

return rvl.subID, nil
subID uint
}

// Listen implementation of Listen interface to listen for runtime version changes
func (l *RuntimeVersionListener) Listen() {
// This sends current runtime version once when subscription is created
// TODO (ed) add logic to send updates when runtime version changes
rtVersion, err := l.wsconn.CoreAPI.GetRuntimeVersion(nil)
if err != nil {
return
Expand Down
Loading

0 comments on commit e5ae30d

Please sign in to comment.