Skip to content

Commit

Permalink
[FAB-3105] Gossip needs to use comm package CA support
Browse files Browse the repository at this point in the history
The current gossip implementation is initialized with a
set of dial options which don't change during the lifecycle
of the peer.

In order to resolve this, the gossip comm implementation
has been modified to accept a function which returns the
appropriate dial options for secure communincations with
other peers.

When used in the peer, the function passed in
leverages the core/comm packages CASupport.

Change-Id: I99cc4204e27c67f93f8f1ba91891fdb97081c99e
Signed-off-by: Gari Singh <gari.r.singh@gmail.com>
  • Loading branch information
mastersingh24 committed Apr 30, 2017
1 parent 90e36d1 commit f0acc68
Show file tree
Hide file tree
Showing 16 changed files with 187 additions and 92 deletions.
26 changes: 26 additions & 0 deletions core/comm/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,32 @@ func (cas *CASupport) GetDeliverServiceCredentials() credentials.TransportCreden
return creds
}

// GetPeerCredentials returns GRPC transport credentials for use by GRPC
// clients which communicate with remote peer endpoints.
func (cas *CASupport) GetPeerCredentials() credentials.TransportCredentials {
var creds credentials.TransportCredentials
var tlsConfig = &tls.Config{}
var certPool = x509.NewCertPool()
// loop through the orderer CAs
roots, _ := cas.GetServerRootCAs()
for _, root := range roots {
block, _ := pem.Decode(root)
if block != nil {
cert, err := x509.ParseCertificate(block.Bytes)
if err == nil {
certPool.AddCert(cert)
} else {
commLogger.Warningf("Failed to add root cert to credentials (%s)", err)
}
} else {
commLogger.Warning("Failed to add root cert to credentials")
}
}
tlsConfig.RootCAs = certPool
creds = credentials.NewTLS(tlsConfig)
return creds
}

// GetClientRootCAs returns the PEM-encoded root certificates for all of the
// application and orderer organizations defined for all chains. The root
// certificates returned should be used to set the trusted client roots for
Expand Down
6 changes: 6 additions & 0 deletions core/comm/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ func TestCASupport(t *testing.T) {
assert.Exactly(t, casClone, cas, "Expected GetCASupport to be a singleton")

creds := cas.GetDeliverServiceCredentials()
assert.Equal(t, "1.2", creds.Info().SecurityVersion,
"Expected Security version to be 1.2")
creds = cas.GetPeerCredentials()
assert.Equal(t, "1.2", creds.Info().SecurityVersion,
"Expected Security version to be 1.2")

Expand All @@ -148,5 +151,8 @@ func TestCASupport(t *testing.T) {
creds = cas.GetDeliverServiceCredentials()
assert.Equal(t, "1.2", creds.Info().SecurityVersion,
"Expected Security version to be 1.2")
creds = cas.GetPeerCredentials()
assert.Equal(t, "1.2", creds.Info().SecurityVersion,
"Expected Security version to be 1.2")

}
7 changes: 6 additions & 1 deletion core/peer/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,15 @@ func TestCreateChainFromBlock(t *testing.T) {
identity, _ := mgmt.GetLocalSigningIdentityOrPanic().Serialize()
messageCryptoService := peergossip.NewMCS(&mocks.ChannelPolicyManagerGetter{}, localmsp.NewSigner(), mgmt.NewDeserializersManager())
secAdv := peergossip.NewSecurityAdvisor(mgmt.NewDeserializersManager())
var defaultSecureDialOpts = func() []grpc.DialOption {
var dialOpts []grpc.DialOption
dialOpts = append(dialOpts, grpc.WithInsecure())
return dialOpts
}
service.InitGossipServiceCustomDeliveryFactory(
identity, "localhost:13611", grpcServer,
&mockDeliveryClientFactory{},
messageCryptoService, secAdv)
messageCryptoService, secAdv, defaultSecureDialOpts)

err = CreateChainFromBlock(block)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion core/scc/cscc/configure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func TestConfigerInvokeJoinChainCorrectParams(t *testing.T) {
identity, _ := mgmt.GetLocalSigningIdentityOrPanic().Serialize()
messageCryptoService := peergossip.NewMCS(&mocks.ChannelPolicyManagerGetter{}, localmsp.NewSigner(), mgmt.NewDeserializersManager())
secAdv := peergossip.NewSecurityAdvisor(mgmt.NewDeserializersManager())
service.InitGossipServiceCustomDeliveryFactory(identity, peerEndpoint, nil, &mockDeliveryClientFactory{}, messageCryptoService, secAdv)
service.InitGossipServiceCustomDeliveryFactory(identity, peerEndpoint, nil, &mockDeliveryClientFactory{}, messageCryptoService, secAdv, nil)

// Successful path for JoinChain
blockBytes := mockConfigBlock()
Expand Down
9 changes: 8 additions & 1 deletion gossip/api/crypto.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ limitations under the License.

package api

import "github.com/hyperledger/fabric/gossip/common"
import (
"github.com/hyperledger/fabric/gossip/common"
"google.golang.org/grpc"
)

// MessageCryptoService is the contract between the gossip component and the
// peer's cryptographic layer and is used by the gossip component to verify,
Expand Down Expand Up @@ -61,3 +64,7 @@ type PeerIdentityType []byte
// PeerSuspector returns whether a peer with a given identity is suspected
// as being revoked, or its CA is revoked
type PeerSuspector func(identity PeerIdentityType) bool

// PeerSecureDialOpts returns the gRPC DialOptions to use for connection level
// security when communicating with remote peer endpoints
type PeerSecureDialOpts func() []grpc.DialOption
117 changes: 69 additions & 48 deletions gossip/comm/comm_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,37 +65,38 @@ func (c *commImpl) SetDialOpts(opts ...grpc.DialOption) {
}

// NewCommInstanceWithServer creates a comm instance that creates an underlying gRPC server
func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity api.PeerIdentityType, dialOpts ...grpc.DialOption) (Comm, error) {
func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity api.PeerIdentityType,
secureDialOpts api.PeerSecureDialOpts, dialOpts ...grpc.DialOption) (Comm, error) {

var ll net.Listener
var s *grpc.Server
var secOpt grpc.DialOption
var certHash []byte

if len(dialOpts) == 0 {
dialOpts = []grpc.DialOption{grpc.WithTimeout(util.GetDurationOrDefault("peer.gossip.dialTimeout", defDialTimeout))}
}

if port > 0 {
s, ll, secOpt, certHash = createGRPCLayer(port)
dialOpts = append(dialOpts, secOpt)
s, ll, secureDialOpts, certHash = createGRPCLayer(port)
}

commInst := &commImpl{
selfCertHash: certHash,
PKIID: idMapper.GetPKIidOfCert(peerIdentity),
idMapper: idMapper,
logger: util.GetLogger(util.LoggingCommModule, fmt.Sprintf("%d", port)),
peerIdentity: peerIdentity,
opts: dialOpts,
port: port,
lsnr: ll,
gSrv: s,
msgPublisher: NewChannelDemultiplexer(),
lock: &sync.RWMutex{},
deadEndpoints: make(chan common.PKIidType, 100),
stopping: int32(0),
exitChan: make(chan struct{}, 1),
subscriptions: make([]chan proto.ReceivedMessage, 0),
selfCertHash: certHash,
PKIID: idMapper.GetPKIidOfCert(peerIdentity),
idMapper: idMapper,
logger: util.GetLogger(util.LoggingCommModule, fmt.Sprintf("%d", port)),
peerIdentity: peerIdentity,
opts: dialOpts,
secureDialOpts: secureDialOpts,
port: port,
lsnr: ll,
gSrv: s,
msgPublisher: NewChannelDemultiplexer(),
lock: &sync.RWMutex{},
deadEndpoints: make(chan common.PKIidType, 100),
stopping: int32(0),
exitChan: make(chan struct{}, 1),
subscriptions: make([]chan proto.ReceivedMessage, 0),
}
commInst.connStore = newConnStore(commInst, commInst.logger)
commInst.idMapper.Put(idMapper.GetPKIidOfCert(peerIdentity), peerIdentity)
Expand All @@ -117,9 +118,12 @@ func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity
}

// NewCommInstance creates a new comm instance that binds itself to the given gRPC server
func NewCommInstance(s *grpc.Server, cert *tls.Certificate, idStore identity.Mapper, peerIdentity api.PeerIdentityType, dialOpts ...grpc.DialOption) (Comm, error) {
func NewCommInstance(s *grpc.Server, cert *tls.Certificate, idStore identity.Mapper,
peerIdentity api.PeerIdentityType, secureDialOpts api.PeerSecureDialOpts,
dialOpts ...grpc.DialOption) (Comm, error) {

dialOpts = append(dialOpts, grpc.WithTimeout(util.GetDurationOrDefault("peer.gossip.dialTimeout", defDialTimeout)))
commInst, err := NewCommInstanceWithServer(-1, idStore, peerIdentity, dialOpts...)
commInst, err := NewCommInstanceWithServer(-1, idStore, peerIdentity, secureDialOpts, dialOpts...)
if err != nil {
return nil, err
}
Expand All @@ -139,24 +143,25 @@ func NewCommInstance(s *grpc.Server, cert *tls.Certificate, idStore identity.Map
}

type commImpl struct {
skipHandshake bool
selfCertHash []byte
peerIdentity api.PeerIdentityType
idMapper identity.Mapper
logger *logging.Logger
opts []grpc.DialOption
connStore *connectionStore
PKIID []byte
port int
deadEndpoints chan common.PKIidType
msgPublisher *ChannelDeMultiplexer
lock *sync.RWMutex
lsnr net.Listener
gSrv *grpc.Server
exitChan chan struct{}
stopping int32
stopWG sync.WaitGroup
subscriptions []chan proto.ReceivedMessage
skipHandshake bool
selfCertHash []byte
peerIdentity api.PeerIdentityType
idMapper identity.Mapper
logger *logging.Logger
opts []grpc.DialOption
secureDialOpts func() []grpc.DialOption
connStore *connectionStore
PKIID []byte
port int
deadEndpoints chan common.PKIidType
msgPublisher *ChannelDeMultiplexer
lock *sync.RWMutex
lsnr net.Listener
gSrv *grpc.Server
exitChan chan struct{}
stopping int32
stopWG sync.WaitGroup
subscriptions []chan proto.ReceivedMessage
}

func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (*connection, error) {
Expand All @@ -165,14 +170,18 @@ func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidT
var stream proto.Gossip_GossipStreamClient
var pkiID common.PKIidType
var connInfo *proto.ConnectionInfo
var dialOpts []grpc.DialOption

c.logger.Debug("Entering", endpoint, expectedPKIID)
defer c.logger.Debug("Exiting")

if c.isStopping() {
return nil, errors.New("Stopping")
}
cc, err = grpc.Dial(endpoint, append(c.opts, grpc.WithBlock())...)
dialOpts = append(dialOpts, c.secureDialOpts()...)
dialOpts = append(dialOpts, grpc.WithBlock())
dialOpts = append(dialOpts, c.opts...)
cc, err = grpc.Dial(endpoint, dialOpts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -257,13 +266,18 @@ func (c *commImpl) isStopping() bool {
}

func (c *commImpl) Probe(remotePeer *RemotePeer) error {
var dialOpts []grpc.DialOption
endpoint := remotePeer.Endpoint
pkiID := remotePeer.PKIID
if c.isStopping() {
return errors.New("Stopping")
}
c.logger.Debug("Entering, endpoint:", endpoint, "PKIID:", pkiID)
cc, err := grpc.Dial(remotePeer.Endpoint, append(c.opts, grpc.WithBlock())...)
dialOpts = append(dialOpts, c.secureDialOpts()...)
dialOpts = append(dialOpts, grpc.WithBlock())
dialOpts = append(dialOpts, c.opts...)

cc, err := grpc.Dial(remotePeer.Endpoint, dialOpts...)
if err != nil {
c.logger.Debug("Returning", err)
return err
Expand All @@ -276,7 +290,12 @@ func (c *commImpl) Probe(remotePeer *RemotePeer) error {
}

func (c *commImpl) Handshake(remotePeer *RemotePeer) (api.PeerIdentityType, error) {
cc, err := grpc.Dial(remotePeer.Endpoint, append(c.opts, grpc.WithBlock())...)
var dialOpts []grpc.DialOption
dialOpts = append(dialOpts, c.secureDialOpts()...)
dialOpts = append(dialOpts, grpc.WithBlock())
dialOpts = append(dialOpts, c.opts...)

cc, err := grpc.Dial(remotePeer.Endpoint, dialOpts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -590,13 +609,13 @@ type stream interface {
grpc.Stream
}

func createGRPCLayer(port int) (*grpc.Server, net.Listener, grpc.DialOption, []byte) {
func createGRPCLayer(port int) (*grpc.Server, net.Listener, api.PeerSecureDialOpts, []byte) {
var returnedCertHash []byte
var s *grpc.Server
var ll net.Listener
var err error
var serverOpts []grpc.ServerOption
var dialOpts grpc.DialOption
var dialOpts []grpc.DialOption

keyFileName := fmt.Sprintf("key.%d.pem", util.RandomUInt64())
certFileName := fmt.Sprintf("cert.%d.pem", util.RandomUInt64())
Expand Down Expand Up @@ -627,17 +646,19 @@ func createGRPCLayer(port int) (*grpc.Server, net.Listener, grpc.DialOption, []b
Certificates: []tls.Certificate{cert},
InsecureSkipVerify: true,
})
dialOpts = grpc.WithTransportCredentials(&authCreds{tlsCreds: ta})
dialOpts = append(dialOpts, grpc.WithTransportCredentials(&authCreds{tlsCreds: ta}))
} else {
dialOpts = grpc.WithInsecure()
dialOpts = append(dialOpts, grpc.WithInsecure())
}

listenAddress := fmt.Sprintf("%s:%d", "", port)
ll, err = net.Listen("tcp", listenAddress)
if err != nil {
panic(err)
}

secureDialOpts := func() []grpc.DialOption {
return dialOpts
}
s = grpc.NewServer(serverOpts...)
return s, ll, dialOpts, returnedCertHash
return s, ll, secureDialOpts, returnedCertHash
}
2 changes: 1 addition & 1 deletion gossip/comm/comm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (*naiveSecProvider) VerifyByChannel(_ common.ChainID, _ api.PeerIdentityTyp

func newCommInstance(port int, sec api.MessageCryptoService) (Comm, error) {
endpoint := fmt.Sprintf("localhost:%d", port)
inst, err := NewCommInstanceWithServer(port, identity.NewIdentityMapper(sec), []byte(endpoint))
inst, err := NewCommInstanceWithServer(port, identity.NewIdentityMapper(sec), []byte(endpoint), nil)
return inst, err
}

Expand Down
24 changes: 15 additions & 9 deletions gossip/gossip/gossip_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,18 @@ type gossipServiceImpl struct {
}

// NewGossipService creates a gossip instance attached to a gRPC server
func NewGossipService(conf *Config, s *grpc.Server, secAdvisor api.SecurityAdvisor, mcs api.MessageCryptoService, idMapper identity.Mapper, selfIdentity api.PeerIdentityType, dialOpts ...grpc.DialOption) Gossip {
func NewGossipService(conf *Config, s *grpc.Server, secAdvisor api.SecurityAdvisor,
mcs api.MessageCryptoService, idMapper identity.Mapper, selfIdentity api.PeerIdentityType,
secureDialOpts api.PeerSecureDialOpts) Gossip {

var c comm.Comm
var err error

lgr := util.GetLogger(util.LoggingGossipModule, conf.ID)
if s == nil {
c, err = createCommWithServer(conf.BindPort, idMapper, selfIdentity)
c, err = createCommWithServer(conf.BindPort, idMapper, selfIdentity, secureDialOpts)
} else {
c, err = createCommWithoutServer(s, conf.TLSServerCert, idMapper, selfIdentity, dialOpts...)
c, err = createCommWithoutServer(s, conf.TLSServerCert, idMapper, selfIdentity, secureDialOpts)
}

if err != nil {
Expand Down Expand Up @@ -159,17 +162,20 @@ func newChannelState(g *gossipServiceImpl) *channelState {
}
}

func createCommWithoutServer(s *grpc.Server, cert *tls.Certificate, idStore identity.Mapper, identity api.PeerIdentityType, dialOpts ...grpc.DialOption) (comm.Comm, error) {
return comm.NewCommInstance(s, cert, idStore, identity, dialOpts...)
func createCommWithoutServer(s *grpc.Server, cert *tls.Certificate, idStore identity.Mapper,
identity api.PeerIdentityType, secureDialOpts api.PeerSecureDialOpts) (comm.Comm, error) {
return comm.NewCommInstance(s, cert, idStore, identity, secureDialOpts)
}

// NewGossipServiceWithServer creates a new gossip instance with a gRPC server
func NewGossipServiceWithServer(conf *Config, secAdvisor api.SecurityAdvisor, mcs api.MessageCryptoService, mapper identity.Mapper, identity api.PeerIdentityType) Gossip {
return NewGossipService(conf, nil, secAdvisor, mcs, mapper, identity)
func NewGossipServiceWithServer(conf *Config, secAdvisor api.SecurityAdvisor, mcs api.MessageCryptoService,
mapper identity.Mapper, identity api.PeerIdentityType, secureDialOpts api.PeerSecureDialOpts) Gossip {
return NewGossipService(conf, nil, secAdvisor, mcs, mapper, identity, secureDialOpts)
}

func createCommWithServer(port int, idStore identity.Mapper, identity api.PeerIdentityType) (comm.Comm, error) {
return comm.NewCommInstanceWithServer(port, idStore, identity)
func createCommWithServer(port int, idStore identity.Mapper, identity api.PeerIdentityType,
secureDialOpts api.PeerSecureDialOpts) (comm.Comm, error) {
return comm.NewCommInstanceWithServer(port, idStore, identity, secureDialOpts)
}

func (g *gossipServiceImpl) toDie() bool {
Expand Down
6 changes: 4 additions & 2 deletions gossip/gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ func newGossipInstanceWithCustomMCS(portPrefix int, id int, maxMsgCount int, mcs
}

idMapper := identity.NewIdentityMapper(mcs)
g := NewGossipServiceWithServer(conf, &orgCryptoService{}, mcs, idMapper, api.PeerIdentityType(conf.InternalEndpoint))
g := NewGossipServiceWithServer(conf, &orgCryptoService{}, mcs, idMapper,
api.PeerIdentityType(conf.InternalEndpoint), nil)

return g
}
Expand Down Expand Up @@ -251,7 +252,8 @@ func newGossipInstanceWithOnlyPull(portPrefix int, id int, maxMsgCount int, boot
cryptoService := &naiveCryptoService{}
idMapper := identity.NewIdentityMapper(cryptoService)

g := NewGossipServiceWithServer(conf, &orgCryptoService{}, cryptoService, idMapper, api.PeerIdentityType(conf.InternalEndpoint))
g := NewGossipServiceWithServer(conf, &orgCryptoService{}, cryptoService, idMapper,
api.PeerIdentityType(conf.InternalEndpoint), nil)
return g
}

Expand Down
3 changes: 2 additions & 1 deletion gossip/gossip/orgs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ func newGossipInstanceWithExternalEndpoint(portPrefix int, id int, mcs *configur
}

idMapper := identity.NewIdentityMapper(mcs)
g := NewGossipServiceWithServer(conf, mcs, mcs, idMapper, api.PeerIdentityType(conf.InternalEndpoint))
g := NewGossipServiceWithServer(conf, mcs, mcs, idMapper, api.PeerIdentityType(conf.InternalEndpoint),
nil)

return g
}
Expand Down
Loading

0 comments on commit f0acc68

Please sign in to comment.