Skip to content

Commit

Permalink
Merge pull request #907 from lenny-intel/secure-bus
Browse files Browse the repository at this point in the history
feat: Add secure MessageBus capability
  • Loading branch information
cloudxxx8 authored May 10, 2021
2 parents 70dff52 + 692b286 commit edddd13
Show file tree
Hide file tree
Showing 10 changed files with 47 additions and 102 deletions.
12 changes: 6 additions & 6 deletions example/cmd/device-simple/res/configuration.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ AsyncBufferSize = 1
# MaxRequestSize limit the request body size in byte of put command
# value 0 unlimit the request size.
MaxRequestSize = 0
UseMessageBus = false

[Registry]
Host = 'localhost'
Expand All @@ -41,11 +42,12 @@ Type = 'consul'
Port = 48081

[MessageQueue]
Enabled = false
Protocol = 'tcp'
Protocol = 'redis'
Host = 'localhost'
Port = 1883
Type = 'mqtt'
Port = 6379
Type = 'redis'
AuthMode = 'usernamepassword' # required for redis messagebus (secure or insecure).
SecretName = "redisdb"
PublishTopicPrefix = 'edgex/events' # /<device-profile-name>/<device-name>/<source-name> will be added to this Publish Topic prefix
[MessageQueue.Optional]
# Default MQTT Specific options that need to be here to enable environment variable overrides of them
Expand All @@ -58,8 +60,6 @@ PublishTopicPrefix = 'edgex/events' # /<device-profile-name>/<device-name>/<sour
AutoReconnect = "true"
ConnectTimeout = "5" # Seconds
SkipCertVerify = "false" # Only used if Cert/Key file or Cert/Key PEMblock are specified
ClientAuth = "none" # Valid values are: `none`, `usernamepassword` or `clientcert`
Secretpath = "messagebus" # Path in secret store used if ClientAuth not `none`

# Example SecretStore configuration.
# Only used when EDGEX_SECURITY_SECRET_STORE=true
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ module github.com/edgexfoundry/device-sdk-go/v2
require (
bitbucket.org/bertimus9/systemstat v0.0.0-20180207000608-0eeff89b0690
github.com/OneOfOne/xxhash v1.2.8
github.com/edgexfoundry/go-mod-bootstrap/v2 v2.0.0-dev.34
github.com/edgexfoundry/go-mod-core-contracts/v2 v2.0.0-dev.77
github.com/edgexfoundry/go-mod-messaging/v2 v2.0.0-dev.11
github.com/edgexfoundry/go-mod-registry/v2 v2.0.0-dev.4
github.com/edgexfoundry/go-mod-bootstrap/v2 v2.0.0-dev.46
github.com/edgexfoundry/go-mod-core-contracts/v2 v2.0.0-dev.82
github.com/edgexfoundry/go-mod-messaging/v2 v2.0.0-dev.13
github.com/edgexfoundry/go-mod-registry/v2 v2.0.0-dev.7
github.com/google/uuid v1.2.0
github.com/gorilla/mux v1.8.0
github.com/pelletier/go-toml v1.9.0
Expand Down
59 changes: 0 additions & 59 deletions internal/clients/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import (
"github.com/edgexfoundry/go-mod-core-contracts/v2/clients/logger"
"github.com/edgexfoundry/go-mod-core-contracts/v2/errors"
v2clients "github.com/edgexfoundry/go-mod-core-contracts/v2/v2/clients/http"
"github.com/edgexfoundry/go-mod-messaging/v2/messaging"
"github.com/edgexfoundry/go-mod-messaging/v2/pkg/types"
"github.com/edgexfoundry/go-mod-registry/v2/registry"

"github.com/edgexfoundry/device-sdk-go/v2/internal/config"
Expand All @@ -43,7 +41,6 @@ func BootstrapHandler(
// The initialization process should be pending until Metadata Service and Core Data Service are both available.
func InitDependencyClients(ctx context.Context, wg *sync.WaitGroup, startupTimer startup.Timer, dic *di.Container) bool {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
configuration := container.ConfigurationFrom(dic.Get)

err := validateClientConfig(container.ConfigurationFrom(dic.Get))
if err != nil {
Expand All @@ -56,10 +53,6 @@ func InitDependencyClients(ctx context.Context, wg *sync.WaitGroup, startupTimer
}
initCoreServiceClients(dic)

if configuration.MessageQueue.Enabled && initMessagingClient(ctx, wg, startupTimer, dic) == false {
return false
}

lc.Info("Service clients initialize successful.")
return true
}
Expand Down Expand Up @@ -194,55 +187,3 @@ func initCoreServiceClients(dic *di.Container) {
},
})
}

func initMessagingClient(ctx context.Context, wg *sync.WaitGroup, startupTimer startup.Timer, dic *di.Container) bool {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
configuration := container.ConfigurationFrom(dic.Get)

msgClient, err := messaging.NewMessageClient(
types.MessageBusConfig{
PublishHost: types.HostInfo{
Host: configuration.MessageQueue.Host,
Port: configuration.MessageQueue.Port,
Protocol: configuration.MessageQueue.Protocol,
},
Type: configuration.MessageQueue.Type,
Optional: configuration.MessageQueue.Optional,
})
if err != nil {
lc.Errorf("Failed to create MessageClient: %v", err)
return false
}

for startupTimer.HasNotElapsed() {
select {
case <-ctx.Done():
return false
default:
err = msgClient.Connect()
if err != nil {
lc.Warnf("Unable to connect MessageBus: %v", err)
} else {
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-ctx.Done():
msgClient.Disconnect()
lc.Infof("Disconnecting from MessageBus")
}
}()
dic.Update(di.ServiceConstructorMap{
container.MessagingClientName: func(get di.Get) interface{} {
return msgClient
},
})
return true
}
startupTimer.SleepForInterval()
}
}

lc.Error("Connecting to MessageBus time out")
return false
}
2 changes: 1 addition & 1 deletion internal/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func SendEvent(event *dtos.Event, correlationID string, dic *di.Container) {
ctx := context.WithValue(context.Background(), CorrelationHeader, correlationID)
req := requests.NewAddEventRequest(*event)

if configuration.MessageQueue.Enabled {
if configuration.Service.UseMessageBus {
mc := container.MessagingClientFrom(dic.Get)
bytes, encoding, err := req.Encode()
if err != nil {
Expand Down
7 changes: 6 additions & 1 deletion internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type ConfigurationStruct struct {
// SecretStore contains information for connecting to the secure SecretStore (Vault) to retrieve or store secrets
SecretStore bootstrapConfig.SecretStoreInfo
// MessageQueue contains information for connecting to MessageBus which provides alternative way to publish event
MessageQueue MessageQueueInfo
MessageQueue bootstrapConfig.MessageBusInfo
}

// UpdateFromRaw converts configuration received from the registry to a service-specific configuration struct which is
Expand Down Expand Up @@ -87,3 +87,8 @@ func (c *ConfigurationStruct) GetRegistryInfo() bootstrapConfig.RegistryInfo {
func (c *ConfigurationStruct) GetInsecureSecrets() bootstrapConfig.InsecureSecrets {
return c.Writable.InsecureSecrets
}

// GetMessageBusInfo returns the MessageBus configuration
func (c *ConfigurationStruct) GetMessageBusInfo() bootstrapConfig.MessageBusInfo {
return c.MessageQueue
}
29 changes: 2 additions & 27 deletions internal/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
package config

import (
"fmt"

"github.com/edgexfoundry/go-mod-bootstrap/v2/config"
)

Expand Down Expand Up @@ -56,6 +54,8 @@ type ServiceInfo struct {
AsyncBufferSize int
// MaxRequestSize defines the maximum size of http request body in bytes
MaxRequestSize int64
// UseMessageBus indicates whether or not the Event are published directly to the MessageBus
UseMessageBus bool
}

// DeviceInfo is a struct which contains device specific configuration settings.
Expand Down Expand Up @@ -101,31 +101,6 @@ type DiscoveryInfo struct {
Interval string
}

// MessageQueueInfo provides parameters related to connecting to a message queue
type MessageQueueInfo struct {
Enabled bool
// Protocol indicates the protocol to use when accessing the message queue.
Protocol string
// Host is the hostname or IP address of the broker, if applicable.
Host string
// Port defines the port on which to access the message queue.
Port int
// Indicates the message queue platform being used.
Type string
// Indicates the topic prefix the data is published to. Note that /<device-profile-name>/<device-name> will be
// added to this Publish Topic prefix as the complete publish topic
PublishTopicPrefix string
// Provides additional configuration properties which do not fit within the existing field.
// Typically the key is the name of the configuration property and the value is a string representation of the
// desired value for the configuration property.
Optional map[string]string
}

// URL constructs a URL from the protocol, host and port and returns that as a string.
func (m MessageQueueInfo) URL() string {
return fmt.Sprintf("%s://%s:%v", m.Protocol, m.Host, m.Port)
}

func (s ServiceInfo) GetBootstrapServiceInfo() config.ServiceInfo {
return config.ServiceInfo{
BootTimeout: s.BootTimeout,
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/http/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (c *RestController) Secret(writer http.ResponseWriter, request *http.Reques

path, secret := c.prepareSecret(secretRequest)

if err := provider.StoreSecrets(path, secret); err != nil {
if err := provider.StoreSecret(path, secret); err != nil {
edgexError := errors.NewCommonEdgeX(errors.KindServerError, "Storing secret failed", err)
c.sendEdgexError(writer, request, edgexError, sdkCommon.APIV2SecretRoute)
return
Expand Down
4 changes: 2 additions & 2 deletions internal/controller/http/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ func TestSecretRequest(t *testing.T) {
config := &config.ConfigurationStruct{}

mockProvider := &mocks.SecretProvider{}
mockProvider.On("StoreSecrets", "/mqtt", map[string]string{"password": "password", "username": "username"}).Return(nil)
mockProvider.On("StoreSecrets", "/no", map[string]string{"password": "password", "username": "username"}).Return(errors.New("Invalid w/o Vault"))
mockProvider.On("StoreSecret", "/mqtt", map[string]string{"password": "password", "username": "username"}).Return(nil)
mockProvider.On("StoreSecret", "/no", map[string]string{"password": "password", "username": "username"}).Return(errors.New("Invalid w/o Vault"))

dic := di.NewContainer(di.ServiceConstructorMap{
container.ConfigurationName: func(get di.Get) interface{} {
Expand Down
23 changes: 23 additions & 0 deletions internal/handler/messaging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package handler

import (
"context"
"sync"

"github.com/edgexfoundry/device-sdk-go/v2/internal/container"
bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container"
bootstrapMessaging "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/messaging"
"github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/startup"
"github.com/edgexfoundry/go-mod-bootstrap/v2/di"
)

func MessagingBootstrapHandler(ctx context.Context, wg *sync.WaitGroup, startupTimer startup.Timer, dic *di.Container) bool {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
config := container.ConfigurationFrom(dic.Get)
if config.Service.UseMessageBus == true {
return bootstrapMessaging.BootstrapHandler(ctx, wg, startupTimer, dic)
}

lc.Info("Use of MessageBus disabled, skipping creation of messaging client")
return true
}
3 changes: 2 additions & 1 deletion pkg/service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/edgexfoundry/device-sdk-go/v2/internal/clients"
"github.com/edgexfoundry/device-sdk-go/v2/internal/common"
"github.com/edgexfoundry/device-sdk-go/v2/internal/container"

"github.com/edgexfoundry/device-sdk-go/v2/internal/handler"
"github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap"
"github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/flags"
"github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/handlers"
Expand Down Expand Up @@ -74,6 +74,7 @@ func Main(serviceName string, serviceVersion string, proto interface{}, ctx cont
true,
[]interfaces.BootstrapHandler{
httpServer.BootstrapHandler,
handler.MessagingBootstrapHandler,
clients.BootstrapHandler,
autoevent.BootstrapHandler,
NewBootstrap(router).BootstrapHandler,
Expand Down

0 comments on commit edddd13

Please sign in to comment.