Skip to content

Commit

Permalink
feat: Add MQTT Will configuration for MQTT Export (#1495)
Browse files Browse the repository at this point in the history
closes #1117

Signed-off-by: Leonard Goodell <leonard.goodell@intel.com>
  • Loading branch information
Lenny Goodell authored Oct 18, 2023
1 parent 659344e commit 03e14d6
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 13 deletions.
4 changes: 1 addition & 3 deletions app-service-template/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ module app-new-service

go 1.21

toolchain go1.21.0

// To build local docker image of the template App you must
// comment out this replace statement and update the SDK version to latest
replace github.com/edgexfoundry/app-functions-sdk-go/v3 => ../
Expand All @@ -24,7 +22,7 @@ require (
github.com/armon/go-metrics v0.4.1 // indirect
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/diegoholiveira/jsonlogic/v3 v3.3.0 // indirect
github.com/diegoholiveira/jsonlogic/v3 v3.3.1 // indirect
github.com/eclipse/paho.mqtt.golang v1.4.3 // indirect
github.com/edgexfoundry/go-mod-bootstrap/v3 v3.1.0-dev.49 // indirect
github.com/edgexfoundry/go-mod-configuration/v3 v3.1.0-dev.7 // indirect
Expand Down
4 changes: 2 additions & 2 deletions app-service-template/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/diegoholiveira/jsonlogic/v3 v3.3.0 h1:XdIxQ+ICFcQB9tVf46cmiCkc5K9MN8Sh/x+XDHL+iXM=
github.com/diegoholiveira/jsonlogic/v3 v3.3.0/go.mod h1:9oE8z9G+0OMxOoLHF3fhek3KuqD5CBqM0B6XFL08MSg=
github.com/diegoholiveira/jsonlogic/v3 v3.3.1 h1:XVHpFel6ZTVYiXQlgDrlxJYYDNZoQXP54BR33w2nAJ4=
github.com/diegoholiveira/jsonlogic/v3 v3.3.1/go.mod h1:9oE8z9G+0OMxOoLHF3fhek3KuqD5CBqM0B6XFL08MSg=
github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik=
github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE=
github.com/edgexfoundry/go-mod-bootstrap/v3 v3.1.0-dev.49 h1:HmfusqiLSGlC0+ojDeYc2Cg9g2kMe7kizvuV8471hOo=
Expand Down
70 changes: 65 additions & 5 deletions internal/app/configurable.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ import (
"strconv"
"strings"

"github.com/edgexfoundry/app-functions-sdk-go/v3/internal/common"
"github.com/edgexfoundry/app-functions-sdk-go/v3/pkg/interfaces"
"github.com/edgexfoundry/app-functions-sdk-go/v3/pkg/transforms"
"github.com/edgexfoundry/app-functions-sdk-go/v3/pkg/util"
"github.com/edgexfoundry/go-mod-core-contracts/v3/clients/logger"
"github.com/edgexfoundry/go-mod-core-contracts/v3/common"
coreCommon "github.com/edgexfoundry/go-mod-core-contracts/v3/common"
)

const (
Expand Down Expand Up @@ -82,10 +83,15 @@ const (
IsEventData = "iseventdata"
MergeOnSend = "mergeonsend"
HttpRequestHeaders = "httprequestheaders"
WillEnabled = "willenabled"
WillTopic = "willtopic"
WillQos = "willqos"
WillPayload = "willpayload"
WillRetained = "willretained"
)

// Configurable contains the helper functions that return the function pointers for building the configurable function pipeline.
// They transform the parameters map from the Pipeline configuration in to the actual actual parameters required by the function.
// They transform the parameters map from the Pipeline configuration in to the actual parameters required by the function.
type Configurable struct {
lc logger.LoggingClient
}
Expand Down Expand Up @@ -225,14 +231,14 @@ func (app *Configurable) WrapIntoEvent(parameters map[string]string) interfaces.
var transform *transforms.EventWrapper

// Converts to upper case and validates it is a valid ValueType
valueType, err := common.NormalizeValueType(valueType)
valueType, err := coreCommon.NormalizeValueType(valueType)
if err != nil {
app.lc.Error(err.Error())
return nil
}

switch valueType {
case common.ValueTypeBinary:
case coreCommon.ValueTypeBinary:
mediaType, ok := parameters[MediaType]
if !ok {
app.lc.Error("Could not find " + MediaType)
Expand All @@ -247,7 +253,7 @@ func (app *Configurable) WrapIntoEvent(parameters map[string]string) interfaces.
}

transform = transforms.NewEventWrapperBinaryReading(profileName, deviceName, resourceName, mediaType)
case common.ValueTypeObject:
case coreCommon.ValueTypeObject:
transform = transforms.NewEventWrapperObjectReading(profileName, deviceName, resourceName)

default:
Expand Down Expand Up @@ -381,6 +387,7 @@ func (app *Configurable) MQTTExport(parameters map[string]string) interfaces.App
retain := false
autoReconnect := false
skipCertVerify := false
willEnabled := false

brokerAddress, ok := parameters[BrokerAddress]
if !ok {
Expand Down Expand Up @@ -441,6 +448,57 @@ func (app *Configurable) MQTTExport(parameters map[string]string) interfaces.App
}
}

will := common.WillConfig{}

willEnabledVal := parameters[WillEnabled]
if len(willEnabledVal) > 0 {
willEnabled, err = strconv.ParseBool(willEnabledVal)
if err != nil {
app.lc.Errorf("Could not parse '%s' to a bool for '%s' parameter: %s", willEnabledVal, WillEnabled, err.Error())
return nil
}
}

if willEnabled {
will.Enabled = true

payloadVal := parameters[WillPayload]
if len(payloadVal) == 0 {
app.lc.Errorf("WillPayload must be present and non-empty when WillEnabled set to true")
return nil
}
will.Payload = payloadVal

topicVal := parameters[WillTopic]
if len(topicVal) == 0 {
app.lc.Errorf("WillTopic must be present and non-empty when WillEnabled set to true")
return nil
}
will.Topic = topicVal

qosVal := parameters[WillQos]
if len(qosVal) > 0 {
qos, err = strconv.Atoi(qosVal)
if err != nil {
app.lc.Errorf("Could not parse '%s' to a int for '%s' parameter: %s", qosVal, WillQos, err.Error())
return nil
}

will.Qos = byte(qos)
}

retainedVal := parameters[WillRetained]
if len(retainedVal) > 0 {
retained, err := strconv.ParseBool(retainedVal)
if err != nil {
app.lc.Errorf("Could not parse '%s' to a bool for '%s' parameter: %s", retainedVal, WillRetained, err.Error())
return nil
}

will.Retained = retained
}
}

// These are optional and blank values result in MQTT defaults being used.
keepAlive := parameters[KeepAlive]
connectTimeout := parameters[ConnectTimeout]
Expand All @@ -457,7 +515,9 @@ func (app *Configurable) MQTTExport(parameters map[string]string) interfaces.App
SecretName: secretName,
Topic: topic,
AuthMode: authMode,
Will: will,
}

// PersistOnError is optional and is false by default.
persistOnError := false
value, ok := parameters[PersistOnError]
Expand Down
82 changes: 82 additions & 0 deletions internal/app/configurable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,11 +356,93 @@ func TestMQTTExport(t *testing.T) {
params[AuthMode] = "none"
params[ConnectTimeout] = "5s"
params[KeepAlive] = "6s"
params[WillEnabled] = "true"
params[WillTopic] = "will"
params[WillPayload] = "goodbye"

trx := configurable.MQTTExport(params)
assert.NotNil(t, trx, "return result from MQTTSecretSend should not be nil")
}

func TestMQTTExportWillOptions(t *testing.T) {
configurable := Configurable{lc: lc}

goodWillOptions := make(map[string]string)
goodWillOptions[WillEnabled] = "true"
goodWillOptions[WillTopic] = "will"
goodWillOptions[WillPayload] = "goodbye"
goodWillOptions[WillRetained] = "true"
goodWillOptions[WillQos] = "2"
emptyWill := make(map[string]string)

willDisabled := make(map[string]string)
willDisabled[WillEnabled] = "false"

badEnabled := make(map[string]string)
badEnabled[WillEnabled] = "junk"

missingTopic := copyMap(goodWillOptions)
delete(missingTopic, WillTopic)
emptyTopic := copyMap(goodWillOptions)
emptyTopic[WillTopic] = ""

missingPayload := copyMap(goodWillOptions)
delete(missingPayload, WillPayload)
emptyPayload := copyMap(goodWillOptions)
emptyPayload[WillTopic] = ""

badRetained := copyMap(goodWillOptions)
badRetained[WillRetained] = "junk"

badQos := copyMap(goodWillOptions)
badQos[WillQos] = "junk"

tests := []struct {
Name string
Params map[string]string
ExpectError bool
}{
{"Happy Path all options", goodWillOptions, false},
{"Happy Path no options", emptyWill, false},
{"Happy Path will disabled", emptyWill, false},
{"Bad WillEnabled", badEnabled, true},
{"Bad WillRetained", badRetained, true},
{"Bad WillQos", badQos, true},
{"Missing Topic", missingTopic, true},
{"Empty Topic", emptyTopic, true},
{"Missing Payload", missingPayload, true},
{"Empty Payload", emptyPayload, true},
}

for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
test.Params[BrokerAddress] = "mqtt://broker:8883"
test.Params[Topic] = "topic"
test.Params[SecretName] = "my-secret"
test.Params[ClientID] = "clientid"
test.Params[AuthMode] = "none"

trx := configurable.MQTTExport(test.Params)

if test.ExpectError {
assert.Nil(t, trx)
return
}

assert.NotNil(t, trx)
})
}
}

func copyMap(src map[string]string) map[string]string {
dst := make(map[string]string)
for k, v := range src {
dst[k] = v
}

return dst
}

func TestAddTags(t *testing.T) {
configurable := Configurable{lc: lc}

Expand Down
2 changes: 1 addition & 1 deletion internal/app/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ var mockSecretProvider bootstrapInterfaces.SecretProvider

func TestMain(m *testing.M) {
// No remote and no file results in STDOUT logging only
lc = logger.NewMockClient()
lc = logger.NewClient("test", "DEBUG")
mockMetricsManager := &mocks.MetricsManager{}
mockMetricsManager.On("Register", mock.Anything, mock.Anything, mock.Anything).Return(nil)
mockMetricsManager.On("Unregister", mock.Anything)
Expand Down
14 changes: 12 additions & 2 deletions pkg/transforms/mqttsecret.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import (

MQTT "github.com/eclipse/paho.mqtt.golang"
"github.com/edgexfoundry/app-functions-sdk-go/v3/internal"
"github.com/edgexfoundry/go-mod-core-contracts/v3/common"
"github.com/edgexfoundry/app-functions-sdk-go/v3/internal/common"
coreCommon "github.com/edgexfoundry/go-mod-core-contracts/v3/common"
gometrics "github.com/rcrowley/go-metrics"

"github.com/edgexfoundry/app-functions-sdk-go/v3/pkg/interfaces"
Expand Down Expand Up @@ -70,6 +71,8 @@ type MQTTSecretConfig struct {
// AuthMode indicates what to use when connecting to the broker. Options are "none", "cacert" , "usernamepassword", "clientcert".
// If a CA Cert exists in the SecretName then it will be used for all modes except "none".
AuthMode string
// Will contains the Last Will configuration for the MQTT Client
Will common.WillConfig
}

// NewMQTTSecretSender ...
Expand Down Expand Up @@ -111,6 +114,8 @@ func (sender *MQTTSecretSender) initializeMQTTClient(ctx interfaces.AppFunctionC
return nil
}

ctx.LoggingClient().Info("Initializing MQTT Client")

config := sender.mqttConfig
mqttFactory := secure.NewMqttFactory(ctx.SecretProvider(), ctx.LoggingClient(), config.AuthMode, config.SecretName, config.SkipCertVerify)

Expand All @@ -132,6 +137,11 @@ func (sender *MQTTSecretSender) initializeMQTTClient(ctx interfaces.AppFunctionC
sender.opts.SetConnectTimeout(timeout)
}

if config.Will.Enabled {
sender.opts.SetWill(config.Will.Topic, config.Will.Payload, config.Will.Qos, config.Will.Retained)
ctx.LoggingClient().Infof("Last Will options set for MQTT Export: %+v", config.Will)
}

client, err := mqttFactory.Create(sender.opts)
if err != nil {
return fmt.Errorf("in pipeline '%s', unable to create MQTT Client: %s", ctx.PipelineId(), err.Error())
Expand Down Expand Up @@ -240,7 +250,7 @@ func (sender *MQTTSecretSender) MQTTSend(ctx interfaces.AppFunctionContext, data
sender.mqttSizeMetrics.Update(int64(exportDataBytes))

ctx.LoggingClient().Debugf("Sent %d bytes of data to MQTT Broker in pipeline '%s'", exportDataBytes, ctx.PipelineId())
ctx.LoggingClient().Tracef("Data exported", "Transport", "MQTT", "pipeline", ctx.PipelineId(), common.CorrelationHeader, ctx.CorrelationID())
ctx.LoggingClient().Tracef("Data exported", "Transport", "MQTT", "pipeline", ctx.PipelineId(), coreCommon.CorrelationHeader, ctx.CorrelationID())

return true, nil
}
Expand Down

0 comments on commit 03e14d6

Please sign in to comment.