Skip to content

Commit

Permalink
[FAB-2828] Add resilient delivery client to peer
Browse files Browse the repository at this point in the history
This commit adds a resilient delivery client to the peer's deliveryService.
This client:
	- Accepts the following arguments at creation:
		- Connection Producer (previous change set)
		- clientFactory (creates AtomicBroadcastClient from ClientConn)
		- broadcastSetup (function that is going to be used
		  to send a Send() with SeekInfo to orderer)
		- backoffStrategy - retry logic descriptor,
		  a function that can implement any kind of backoff policy,
		  i.e exponential backoff, etc. etc.
	- Able to reconnect to the ordering service when the connection is broken
	- Hides all failure handling and reconnection logic from its user

In a later commit I will:
	- Move the connection creation code that is invoked only once at creation
          instead of when needed, to the factory of the core/comm/producer.go
          that was created in the previous commit.
	- Change the blocksprovider accordingly to use this
	  newly introduced broadcastClient

Provided 12 test cases with code coverage of 100%

Change-Id: I96a46b76e8fb227eb8bea4c8ded9b788e4fd0eef
Signed-off-by: Yacov Manevich <yacovm@il.ibm.com>
  • Loading branch information
yacovm committed Mar 27, 2017
1 parent 046a667 commit f9fa8d6
Show file tree
Hide file tree
Showing 3 changed files with 776 additions and 0 deletions.
170 changes: 170 additions & 0 deletions core/deliverservice/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
Copyright IBM Corp. 2017 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package deliverclient

import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/hyperledger/fabric/core/comm"
"github.com/hyperledger/fabric/core/deliverservice/blocksprovider"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/orderer"
"golang.org/x/net/context"
"google.golang.org/grpc"
)

// broadcastSetup is a function that is called by the broadcastClient immediately after each
// successful connection to the ordering service
type broadcastSetup func(blocksprovider.BlocksDeliverer) error

// retryPolicy receives as parameters the number of times the attempt has failed
// and a duration that specifies the total elapsed time passed since the first attempt.
// If further attempts should be made, it returns:
// - a time duration after which the next attempt would be made, true
// Else, a zero duration, false
type retryPolicy func(attemptNum int, elapsedTime time.Duration) (time.Duration, bool)

// clientFactory creates a gRPC broadcast client out of a ClientConn
type clientFactory func(*grpc.ClientConn) orderer.AtomicBroadcastClient

type broadcastClient struct {
stopFlag int32
sync.RWMutex
stopChan chan struct{}
createClient clientFactory
shouldRetry retryPolicy
onConnect broadcastSetup
prod comm.ConnectionProducer
blocksprovider.BlocksDeliverer
conn *grpc.ClientConn
}

// NewBroadcastClient returns a broadcastClient with the given params
func NewBroadcastClient(prod comm.ConnectionProducer, clFactory clientFactory, onConnect broadcastSetup, bos retryPolicy) *broadcastClient {
return &broadcastClient{prod: prod, onConnect: onConnect, shouldRetry: bos, createClient: clFactory, stopChan: make(chan struct{}, 1)}
}

// Recv receives a message from the ordering service
func (bc *broadcastClient) Recv() (*orderer.DeliverResponse, error) {
o, err := bc.try(func() (interface{}, error) {
return bc.BlocksDeliverer.Recv()
})
if err != nil {
return nil, err
}
return o.(*orderer.DeliverResponse), nil
}

// Send sends a message to the ordering service
func (bc *broadcastClient) Send(msg *common.Envelope) error {
_, err := bc.try(func() (interface{}, error) {
return nil, bc.BlocksDeliverer.Send(msg)
})
return err
}

func (bc *broadcastClient) try(action func() (interface{}, error)) (interface{}, error) {
attempt := 0
start := time.Now()
var backoffDuration time.Duration
retry := true
for retry && !bc.shouldStop() {
attempt++
resp, err := bc.doAction(action)
if err != nil {
backoffDuration, retry = bc.shouldRetry(attempt, time.Since(start))
if !retry {
break
}
bc.sleep(backoffDuration)
continue
}
return resp, nil
}
if bc.shouldStop() {
return nil, errors.New("Client is closing")
}
return nil, fmt.Errorf("Attempts (%d) or elapsed time (%v) exhausted", attempt, time.Since(start))
}

func (bc *broadcastClient) doAction(action func() (interface{}, error)) (interface{}, error) {
if bc.BlocksDeliverer == nil {
err := bc.connect()
if err != nil {
return nil, err
}
}
resp, err := action()
if err != nil {
bc.conn.Close()
bc.BlocksDeliverer = nil
bc.conn = nil
return nil, err
}
return resp, nil
}

func (bc *broadcastClient) sleep(duration time.Duration) {
select {
case <-time.After(duration):
case <-bc.stopChan:
}
}

func (bc *broadcastClient) connect() error {
conn, endpoint, err := bc.prod.NewConnection()
if err != nil {
logger.Error("Failed obtaining connection:", err)
return err
}
abc, err := bc.createClient(conn).Deliver(context.Background())
if err != nil {
logger.Error("Connection to ", endpoint, "established but was unable to create gRPC stream:", err)
conn.Close()
return err
}
err = bc.onConnect(bc)
if err == nil {
bc.Lock()
bc.conn = conn
bc.Unlock()
bc.BlocksDeliverer = abc
return nil
}
logger.Error("Failed setting up broadcast:", err)
conn.Close()
return err
}

func (bc *broadcastClient) shouldStop() bool {
return atomic.LoadInt32(&bc.stopFlag) == int32(1)
}

func (bc *broadcastClient) Close() {
atomic.StoreInt32(&bc.stopFlag, int32(1))
bc.stopChan <- struct{}{}
bc.RLock()
defer bc.RUnlock()
if bc.conn == nil {
return
}
bc.conn.Close()
}
Loading

0 comments on commit f9fa8d6

Please sign in to comment.