From 57291f0ab3da273ac051c1b0f4dc25fca27e5291 Mon Sep 17 00:00:00 2001 From: lenny Date: Mon, 3 May 2021 12:03:21 -0700 Subject: [PATCH 1/2] feat: Add secure MessageBus capability closes #689 Signed-off-by: lenny --- .../cmd/device-simple/res/configuration.toml | 12 +++++----- go.mod | 6 +++-- internal/clients/init.go | 5 ---- internal/common/utils.go | 2 +- internal/config/config.go | 7 +++++- internal/config/types.go | 2 ++ internal/controller/http/common.go | 2 +- internal/controller/http/common_test.go | 4 ++-- internal/handler/messaging.go | 23 +++++++++++++++++++ pkg/service/main.go | 3 ++- 10 files changed, 47 insertions(+), 19 deletions(-) create mode 100644 internal/handler/messaging.go diff --git a/example/cmd/device-simple/res/configuration.toml b/example/cmd/device-simple/res/configuration.toml index cde9b0707..0545c224a 100644 --- a/example/cmd/device-simple/res/configuration.toml +++ b/example/cmd/device-simple/res/configuration.toml @@ -23,6 +23,7 @@ AsyncBufferSize = 1 # MaxRequestSize limit the request body size in byte of put command # value 0 unlimit the request size. MaxRequestSize = 0 +UseMessageBus = false [Registry] Host = 'localhost' @@ -41,11 +42,12 @@ Type = 'consul' Port = 48081 [MessageQueue] -Enabled = false -Protocol = 'tcp' +Protocol = 'redis' Host = 'localhost' -Port = 1883 -Type = 'mqtt' +Port = 6379 +Type = 'redisstreams' +AuthMode = 'usernamepassword' # required for redis messagebus (secure or insecure). +SecretName = "redisdb" PublishTopicPrefix = 'edgex/events' # /// will be added to this Publish Topic prefix [MessageQueue.Optional] # Default MQTT Specific options that need to be here to enable environment variable overrides of them @@ -58,8 +60,6 @@ PublishTopicPrefix = 'edgex/events' # /// ../MODS/go-mod-bootstrap + go 1.15 diff --git a/internal/clients/init.go b/internal/clients/init.go index 9b2535d78..8ff05cd4f 100644 --- a/internal/clients/init.go +++ b/internal/clients/init.go @@ -43,7 +43,6 @@ func BootstrapHandler( // The initialization process should be pending until Metadata Service and Core Data Service are both available. func InitDependencyClients(ctx context.Context, wg *sync.WaitGroup, startupTimer startup.Timer, dic *di.Container) bool { lc := bootstrapContainer.LoggingClientFrom(dic.Get) - configuration := container.ConfigurationFrom(dic.Get) err := validateClientConfig(container.ConfigurationFrom(dic.Get)) if err != nil { @@ -56,10 +55,6 @@ func InitDependencyClients(ctx context.Context, wg *sync.WaitGroup, startupTimer } initCoreServiceClients(dic) - if configuration.MessageQueue.Enabled && initMessagingClient(ctx, wg, startupTimer, dic) == false { - return false - } - lc.Info("Service clients initialize successful.") return true } diff --git a/internal/common/utils.go b/internal/common/utils.go index 4d1cbebf6..2efe32876 100644 --- a/internal/common/utils.go +++ b/internal/common/utils.go @@ -57,7 +57,7 @@ func SendEvent(event *dtos.Event, correlationID string, dic *di.Container) { ctx := context.WithValue(context.Background(), CorrelationHeader, correlationID) req := requests.NewAddEventRequest(*event) - if configuration.MessageQueue.Enabled { + if configuration.Service.UseMessageBus { mc := container.MessagingClientFrom(dic.Get) bytes, encoding, err := req.Encode() if err != nil { diff --git a/internal/config/config.go b/internal/config/config.go index 625a3401f..d26475bcc 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -27,7 +27,7 @@ type ConfigurationStruct struct { // SecretStore contains information for connecting to the secure SecretStore (Vault) to retrieve or store secrets SecretStore bootstrapConfig.SecretStoreInfo // MessageQueue contains information for connecting to MessageBus which provides alternative way to publish event - MessageQueue MessageQueueInfo + MessageQueue bootstrapConfig.MessageBusInfo } // UpdateFromRaw converts configuration received from the registry to a service-specific configuration struct which is @@ -87,3 +87,8 @@ func (c *ConfigurationStruct) GetRegistryInfo() bootstrapConfig.RegistryInfo { func (c *ConfigurationStruct) GetInsecureSecrets() bootstrapConfig.InsecureSecrets { return c.Writable.InsecureSecrets } + +// GetMessageBusInfo returns the MessageBus configuration +func (c *ConfigurationStruct) GetMessageBusInfo() bootstrapConfig.MessageBusInfo { + return c.MessageQueue +} diff --git a/internal/config/types.go b/internal/config/types.go index 41cc77404..c04b6919f 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -56,6 +56,8 @@ type ServiceInfo struct { AsyncBufferSize int // MaxRequestSize defines the maximum size of http request body in bytes MaxRequestSize int64 + // UseMessageBus indicates whether or not the Event are published directly to the MessageBus + UseMessageBus bool } // DeviceInfo is a struct which contains device specific configuration settings. diff --git a/internal/controller/http/common.go b/internal/controller/http/common.go index b70a6db6c..610c2c603 100644 --- a/internal/controller/http/common.go +++ b/internal/controller/http/common.go @@ -78,7 +78,7 @@ func (c *RestController) Secret(writer http.ResponseWriter, request *http.Reques path, secret := c.prepareSecret(secretRequest) - if err := provider.StoreSecrets(path, secret); err != nil { + if err := provider.StoreSecret(path, secret); err != nil { edgexError := errors.NewCommonEdgeX(errors.KindServerError, "Storing secret failed", err) c.sendEdgexError(writer, request, edgexError, sdkCommon.APIV2SecretRoute) return diff --git a/internal/controller/http/common_test.go b/internal/controller/http/common_test.go index 6ed9e9af7..08737e086 100644 --- a/internal/controller/http/common_test.go +++ b/internal/controller/http/common_test.go @@ -168,8 +168,8 @@ func TestSecretRequest(t *testing.T) { config := &config.ConfigurationStruct{} mockProvider := &mocks.SecretProvider{} - mockProvider.On("StoreSecrets", "/mqtt", map[string]string{"password": "password", "username": "username"}).Return(nil) - mockProvider.On("StoreSecrets", "/no", map[string]string{"password": "password", "username": "username"}).Return(errors.New("Invalid w/o Vault")) + mockProvider.On("StoreSecret", "/mqtt", map[string]string{"password": "password", "username": "username"}).Return(nil) + mockProvider.On("StoreSecret", "/no", map[string]string{"password": "password", "username": "username"}).Return(errors.New("Invalid w/o Vault")) dic := di.NewContainer(di.ServiceConstructorMap{ container.ConfigurationName: func(get di.Get) interface{} { diff --git a/internal/handler/messaging.go b/internal/handler/messaging.go new file mode 100644 index 000000000..336281b6b --- /dev/null +++ b/internal/handler/messaging.go @@ -0,0 +1,23 @@ +package handler + +import ( + "context" + "sync" + + "github.com/edgexfoundry/device-sdk-go/v2/internal/container" + bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container" + bootstrapMessaging "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/messaging" + "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/startup" + "github.com/edgexfoundry/go-mod-bootstrap/v2/di" +) + +func MessagingBootstrapHandler(ctx context.Context, wg *sync.WaitGroup, startupTimer startup.Timer, dic *di.Container) bool { + lc := bootstrapContainer.LoggingClientFrom(dic.Get) + config := container.ConfigurationFrom(dic.Get) + if config.Service.UseMessageBus == true { + return bootstrapMessaging.BootstrapHandler(ctx, wg, startupTimer, dic) + } + + lc.Info("Use of MessageBus disabled, skipping creation of messaging client") + return true +} diff --git a/pkg/service/main.go b/pkg/service/main.go index 1d8100c4c..8e94aa979 100644 --- a/pkg/service/main.go +++ b/pkg/service/main.go @@ -15,7 +15,7 @@ import ( "github.com/edgexfoundry/device-sdk-go/v2/internal/clients" "github.com/edgexfoundry/device-sdk-go/v2/internal/common" "github.com/edgexfoundry/device-sdk-go/v2/internal/container" - + "github.com/edgexfoundry/device-sdk-go/v2/internal/handler" "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap" "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/flags" "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/handlers" @@ -74,6 +74,7 @@ func Main(serviceName string, serviceVersion string, proto interface{}, ctx cont true, []interfaces.BootstrapHandler{ httpServer.BootstrapHandler, + handler.MessagingBootstrapHandler, clients.BootstrapHandler, autoevent.BootstrapHandler, NewBootstrap(router).BootstrapHandler, From 692b28680fefed51e933ff85e2bc551ffbcd2762 Mon Sep 17 00:00:00 2001 From: lenny Date: Fri, 7 May 2021 10:24:18 -0700 Subject: [PATCH 2/2] fix: Addressed PR feedback Signed-off-by: lenny --- .../cmd/device-simple/res/configuration.toml | 2 +- go.mod | 10 ++-- internal/clients/init.go | 54 ------------------- internal/config/types.go | 27 ---------- 4 files changed, 5 insertions(+), 88 deletions(-) diff --git a/example/cmd/device-simple/res/configuration.toml b/example/cmd/device-simple/res/configuration.toml index 0545c224a..b660bbcd9 100644 --- a/example/cmd/device-simple/res/configuration.toml +++ b/example/cmd/device-simple/res/configuration.toml @@ -45,7 +45,7 @@ Type = 'consul' Protocol = 'redis' Host = 'localhost' Port = 6379 -Type = 'redisstreams' +Type = 'redis' AuthMode = 'usernamepassword' # required for redis messagebus (secure or insecure). SecretName = "redisdb" PublishTopicPrefix = 'edgex/events' # /// will be added to this Publish Topic prefix diff --git a/go.mod b/go.mod index d53258623..8f5cc2471 100644 --- a/go.mod +++ b/go.mod @@ -3,10 +3,10 @@ module github.com/edgexfoundry/device-sdk-go/v2 require ( bitbucket.org/bertimus9/systemstat v0.0.0-20180207000608-0eeff89b0690 github.com/OneOfOne/xxhash v1.2.8 - github.com/edgexfoundry/go-mod-bootstrap/v2 v2.0.0-dev.34 - github.com/edgexfoundry/go-mod-core-contracts/v2 v2.0.0-dev.80 - github.com/edgexfoundry/go-mod-messaging/v2 v2.0.0-dev.11 - github.com/edgexfoundry/go-mod-registry/v2 v2.0.0-dev.5 + github.com/edgexfoundry/go-mod-bootstrap/v2 v2.0.0-dev.46 + github.com/edgexfoundry/go-mod-core-contracts/v2 v2.0.0-dev.82 + github.com/edgexfoundry/go-mod-messaging/v2 v2.0.0-dev.13 + github.com/edgexfoundry/go-mod-registry/v2 v2.0.0-dev.7 github.com/google/uuid v1.2.0 github.com/gorilla/mux v1.8.0 github.com/pelletier/go-toml v1.9.0 @@ -14,6 +14,4 @@ require ( gopkg.in/yaml.v2 v2.4.0 ) -replace github.com/edgexfoundry/go-mod-bootstrap/v2 => ../MODS/go-mod-bootstrap - go 1.15 diff --git a/internal/clients/init.go b/internal/clients/init.go index 8ff05cd4f..c9888fcd7 100644 --- a/internal/clients/init.go +++ b/internal/clients/init.go @@ -20,8 +20,6 @@ import ( "github.com/edgexfoundry/go-mod-core-contracts/v2/clients/logger" "github.com/edgexfoundry/go-mod-core-contracts/v2/errors" v2clients "github.com/edgexfoundry/go-mod-core-contracts/v2/v2/clients/http" - "github.com/edgexfoundry/go-mod-messaging/v2/messaging" - "github.com/edgexfoundry/go-mod-messaging/v2/pkg/types" "github.com/edgexfoundry/go-mod-registry/v2/registry" "github.com/edgexfoundry/device-sdk-go/v2/internal/config" @@ -189,55 +187,3 @@ func initCoreServiceClients(dic *di.Container) { }, }) } - -func initMessagingClient(ctx context.Context, wg *sync.WaitGroup, startupTimer startup.Timer, dic *di.Container) bool { - lc := bootstrapContainer.LoggingClientFrom(dic.Get) - configuration := container.ConfigurationFrom(dic.Get) - - msgClient, err := messaging.NewMessageClient( - types.MessageBusConfig{ - PublishHost: types.HostInfo{ - Host: configuration.MessageQueue.Host, - Port: configuration.MessageQueue.Port, - Protocol: configuration.MessageQueue.Protocol, - }, - Type: configuration.MessageQueue.Type, - Optional: configuration.MessageQueue.Optional, - }) - if err != nil { - lc.Errorf("Failed to create MessageClient: %v", err) - return false - } - - for startupTimer.HasNotElapsed() { - select { - case <-ctx.Done(): - return false - default: - err = msgClient.Connect() - if err != nil { - lc.Warnf("Unable to connect MessageBus: %v", err) - } else { - wg.Add(1) - go func() { - defer wg.Done() - select { - case <-ctx.Done(): - msgClient.Disconnect() - lc.Infof("Disconnecting from MessageBus") - } - }() - dic.Update(di.ServiceConstructorMap{ - container.MessagingClientName: func(get di.Get) interface{} { - return msgClient - }, - }) - return true - } - startupTimer.SleepForInterval() - } - } - - lc.Error("Connecting to MessageBus time out") - return false -} diff --git a/internal/config/types.go b/internal/config/types.go index c04b6919f..209d8e016 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -9,8 +9,6 @@ package config import ( - "fmt" - "github.com/edgexfoundry/go-mod-bootstrap/v2/config" ) @@ -103,31 +101,6 @@ type DiscoveryInfo struct { Interval string } -// MessageQueueInfo provides parameters related to connecting to a message queue -type MessageQueueInfo struct { - Enabled bool - // Protocol indicates the protocol to use when accessing the message queue. - Protocol string - // Host is the hostname or IP address of the broker, if applicable. - Host string - // Port defines the port on which to access the message queue. - Port int - // Indicates the message queue platform being used. - Type string - // Indicates the topic prefix the data is published to. Note that // will be - // added to this Publish Topic prefix as the complete publish topic - PublishTopicPrefix string - // Provides additional configuration properties which do not fit within the existing field. - // Typically the key is the name of the configuration property and the value is a string representation of the - // desired value for the configuration property. - Optional map[string]string -} - -// URL constructs a URL from the protocol, host and port and returns that as a string. -func (m MessageQueueInfo) URL() string { - return fmt.Sprintf("%s://%s:%v", m.Protocol, m.Host, m.Port) -} - func (s ServiceInfo) GetBootstrapServiceInfo() config.ServiceInfo { return config.ServiceInfo{ BootTimeout: s.BootTimeout,