Skip to content

Commit

Permalink
[FAB-2778] Msg store update
Browse files Browse the repository at this point in the history
Update message store to automatically expire messages.

Includes new go routine and expiration callback functions

Change-Id: I7c4ec11c7392015e65de5d0553bd1e4c8aca3e5c
Signed-off-by: Gennady Laventman <gennady@il.ibm.com>
  • Loading branch information
gennadylaventman committed Apr 9, 2017
1 parent 2663d8b commit aa84135
Show file tree
Hide file tree
Showing 2 changed files with 294 additions and 11 deletions.
164 changes: 153 additions & 11 deletions gossip/gossip/msgstore/msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@ package msgstore
import (
"sync"

"time"

"github.com/hyperledger/fabric/gossip/common"
)

var noopLock = func() {}

// invalidationTrigger is invoked on each message that was invalidated because of a message addition
// i.e: if add(0), add(1) was called one after the other, and the store has only {1} after the sequence of invocations
// then the invalidation trigger on 0 was called when 1 was added.
Expand All @@ -30,7 +34,49 @@ type invalidationTrigger func(message interface{})
// NewMessageStore returns a new MessageStore with the message replacing
// policy and invalidation trigger passed.
func NewMessageStore(pol common.MessageReplacingPolicy, trigger invalidationTrigger) MessageStore {
return &messageStoreImpl{pol: pol, lock: &sync.RWMutex{}, messages: make([]*msg, 0), invTrigger: trigger}
return newMsgStore(pol, trigger)
}

// NewMessageStoreExpirable returns a new MessageStore with the message replacing
// policy and invalidation trigger passed. It supports old message expiration after msgTTL, during expiration first external
// lock taken, expiration callback invoked and external lock released. Callback and external lock can be nil.
func NewMessageStoreExpirable(pol common.MessageReplacingPolicy, trigger invalidationTrigger, msgTTL time.Duration, externalLock func(), externalUnlock func(), externalExpire func(interface{})) MessageStore {
store := newMsgStore(pol, trigger)

store.expirable = true
store.msgTTL = msgTTL

if externalLock != nil {
store.externalLock = externalLock
}

if externalUnlock != nil {
store.externalUnlock = externalUnlock
}

if externalExpire != nil {
store.expireMsgCallback = externalExpire
}

go store.expirationRoutine()
return store
}

func newMsgStore(pol common.MessageReplacingPolicy, trigger invalidationTrigger) *messageStoreImpl {
return &messageStoreImpl{
pol: pol,
messages: make([]*msg, 0),
invTrigger: trigger,

expirable: false,
externalLock: noopLock,
externalUnlock: noopLock,
expireMsgCallback: func(m interface{}) {},
expiredCount: 0,

doneCh: make(chan struct{}),
}

}

// MessageStore adds messages to an internal buffer.
Expand All @@ -44,22 +90,42 @@ type MessageStore interface {
// returns true or false whether the message was added to the store
Add(msg interface{}) bool

// Checks if message is valid for insertion to store
// returns true or false whether the message can be added to the store
CheckValid(msg interface{}) bool

// size returns the amount of messages in the store
Size() int

// get returns all messages in the store
Get() []interface{}

// Stop all associated go routines
Stop()
}

type messageStoreImpl struct {
pol common.MessageReplacingPolicy
lock *sync.RWMutex
lock sync.RWMutex
messages []*msg
invTrigger invalidationTrigger

expirable bool
msgTTL time.Duration
expiredCount int

externalLock func()
externalUnlock func()
expireMsgCallback func(msg interface{})

doneCh chan struct{}
stopOnce sync.Once
}

type msg struct {
data interface{}
data interface{}
created time.Time
expired bool
}

// add adds a message to the store
Expand All @@ -78,32 +144,108 @@ func (s *messageStoreImpl) Add(message interface{}) bool {
s.messages = append(s.messages[:i], s.messages[i+1:]...)
n--
i--
break
default:
break
}
}

s.messages = append(s.messages, &msg{data: message})
s.messages = append(s.messages, &msg{data: message, created: time.Now()})
return true
}

// Checks if message is valid for insertion to store
func (s *messageStoreImpl) CheckValid(message interface{}) bool {
s.lock.RLock()
defer s.lock.RUnlock()

for _, m := range s.messages {
if s.pol(message, m.data) == common.MessageInvalidated {
return false
}
}
return true
}

// size returns the amount of messages in the store
func (s *messageStoreImpl) Size() int {
s.lock.RLock()
defer s.lock.RUnlock()
return len(s.messages)
return len(s.messages) - s.expiredCount
}

// get returns all messages in the store
func (s *messageStoreImpl) Get() []interface{} {
res := make([]interface{}, 0)

s.lock.RLock()
defer s.lock.RUnlock()

for _, msg := range s.messages {
if !msg.expired {
res = append(res, msg.data)
}
}
return res
}

func (s *messageStoreImpl) expireMessages() {
s.externalLock()
s.lock.Lock()
defer s.lock.Unlock()
defer s.externalUnlock()

n := len(s.messages)
res := make([]interface{}, n)
for i := 0; i < n; i++ {
res[i] = s.messages[i].data
m := s.messages[i]
if !m.expired {
if time.Since(m.created) > s.msgTTL {
m.expired = true
s.expireMsgCallback(m.data)
s.expiredCount++
}
} else {
if time.Since(m.created) > (s.msgTTL * 2) {
s.messages = append(s.messages[:i], s.messages[i+1:]...)
n--
i--
s.expiredCount--
}

}
}
return res
}

func (s *messageStoreImpl) needToExpire() bool {
s.lock.RLock()
defer s.lock.RUnlock()
for _, msg := range s.messages {
if !msg.expired && time.Since(msg.created) > s.msgTTL {
return true
} else if time.Since(msg.created) > (s.msgTTL * 2) {
return true
}
}
return false
}

func (s *messageStoreImpl) expirationRoutine() {
for {
select {
case <-s.doneCh:
return
case <-time.After(s.expirationCheckInterval()):
if s.needToExpire() {
s.expireMessages()
}
}
}
}

func (s *messageStoreImpl) Stop() {
stopFunc := func() {
close(s.doneCh)
}
s.stopOnce.Do(stopFunc)
}

func (s *messageStoreImpl) expirationCheckInterval() time.Duration {
return s.msgTTL / 100
}
141 changes: 141 additions & 0 deletions gossip/gossip/msgstore/msgs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"testing"
"time"

"sync"

"github.com/hyperledger/fabric/gossip/common"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -51,6 +53,16 @@ func compareInts(this interface{}, that interface{}) common.InvalidationResult {
return common.MessageInvalidated
}

func nonReplaceInts(this interface{}, that interface{}) common.InvalidationResult {
a := this.(int)
b := that.(int)
if a == b {
return common.MessageInvalidated
}

return common.MessageNoAction
}

func TestSize(t *testing.T) {
msgStore := NewMessageStore(alwaysNoAction, noopTrigger)
msgStore.Add(0)
Expand Down Expand Up @@ -108,6 +120,7 @@ func TestNewMessagesInvalidated(t *testing.T) {
}

func TestConcurrency(t *testing.T) {
t.Parallel()
stopFlag := int32(0)
msgStore := NewMessageStore(compareInts, noopTrigger)
looper := func(f func()) func() {
Expand Down Expand Up @@ -141,3 +154,131 @@ func TestConcurrency(t *testing.T) {

atomic.CompareAndSwapInt32(&stopFlag, 0, 1)
}

func TestExpiration(t *testing.T) {
t.Parallel()
expired := make([]int, 0)
msgTTL := time.Second * 3

msgStore := NewMessageStoreExpirable(nonReplaceInts, noopTrigger, msgTTL, nil, nil, func(m interface{}) {
expired = append(expired, m.(int))
})

for i := 0; i < 10; i++ {
assert.True(t, msgStore.Add(i), "Adding", i)
}

assert.Equal(t, 10, msgStore.Size(), "Wrong number of items in store - first batch")

time.Sleep(time.Second * 2)

for i := 0; i < 10; i++ {
assert.False(t, msgStore.CheckValid(i))
assert.False(t, msgStore.Add(i))
}

for i := 10; i < 20; i++ {
assert.True(t, msgStore.CheckValid(i))
assert.True(t, msgStore.Add(i))
assert.False(t, msgStore.CheckValid(i))
}
assert.Equal(t, 20, msgStore.Size(), "Wrong number of items in store - second batch")

time.Sleep(time.Second * 2)

for i := 0; i < 20; i++ {
assert.False(t, msgStore.Add(i))
}

assert.Equal(t, 10, msgStore.Size(), "Wrong number of items in store - after first batch expiration")
assert.Equal(t, 10, len(expired), "Wrong number of expired msgs - after first batch expiration")

time.Sleep(time.Second * 4)

assert.Equal(t, 0, msgStore.Size(), "Wrong number of items in store - after second batch expiration")
assert.Equal(t, 20, len(expired), "Wrong number of expired msgs - after second batch expiration")

for i := 0; i < 10; i++ {
assert.True(t, msgStore.CheckValid(i))
assert.True(t, msgStore.Add(i))
assert.False(t, msgStore.CheckValid(i))
}

assert.Equal(t, 10, msgStore.Size(), "Wrong number of items in store - after second batch expiration and first banch re-added")

}

func TestExpirationConcurrency(t *testing.T) {
t.Parallel()
expired := make([]int, 0)
msgTTL := time.Second * 3
lock := &sync.RWMutex{}

msgStore := NewMessageStoreExpirable(nonReplaceInts, noopTrigger, msgTTL,
func() {
lock.Lock()
},
func() {
lock.Unlock()
},
func(m interface{}) {
expired = append(expired, m.(int))
})

lock.Lock()
for i := 0; i < 10; i++ {
assert.True(t, msgStore.Add(i), "Adding", i)
}
assert.Equal(t, 10, msgStore.Size(), "Wrong number of items in store - first batch")
lock.Unlock()

time.Sleep(time.Second * 2)

lock.Lock()
time.Sleep(time.Second * 2)

for i := 0; i < 10; i++ {
assert.False(t, msgStore.Add(i))
}

assert.Equal(t, 10, msgStore.Size(), "Wrong number of items in store - after first batch expiration, external lock taken")
assert.Equal(t, 0, len(expired), "Wrong number of expired msgs - after first batch expiration, external lock taken")
lock.Unlock()

time.Sleep(time.Second * 1)

lock.Lock()
for i := 0; i < 10; i++ {
assert.False(t, msgStore.Add(i))
}

assert.Equal(t, 0, msgStore.Size(), "Wrong number of items in store - after first batch expiration, expiration should run")
assert.Equal(t, 10, len(expired), "Wrong number of expired msgs - after first batch expiration, expiration should run")

lock.Unlock()
}

func TestStop(t *testing.T) {
t.Parallel()
expired := make([]int, 0)
msgTTL := time.Second * 3

msgStore := NewMessageStoreExpirable(nonReplaceInts, noopTrigger, msgTTL, nil, nil, func(m interface{}) {
expired = append(expired, m.(int))
})

for i := 0; i < 10; i++ {
assert.True(t, msgStore.Add(i), "Adding", i)
}

assert.Equal(t, 10, msgStore.Size(), "Wrong number of items in store - first batch")

msgStore.Stop()

time.Sleep(time.Second * 4)

assert.Equal(t, 10, msgStore.Size(), "Wrong number of items in store - after first batch expiration, but store was stopped, so no expiration")
assert.Equal(t, 0, len(expired), "Wrong number of expired msgs - after first batch expiration, but store was stopped, so no expiration")

msgStore.Stop()
}

0 comments on commit aa84135

Please sign in to comment.