Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add MQTT Will configuration for MQTT Export #1495

Merged
merged 1 commit into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions app-service-template/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ module app-new-service

go 1.21

toolchain go1.21.0

// To build local docker image of the template App you must
// comment out this replace statement and update the SDK version to latest
replace github.com/edgexfoundry/app-functions-sdk-go/v3 => ../
Expand All @@ -24,7 +22,7 @@ require (
github.com/armon/go-metrics v0.4.1 // indirect
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/diegoholiveira/jsonlogic/v3 v3.3.0 // indirect
github.com/diegoholiveira/jsonlogic/v3 v3.3.1 // indirect
github.com/eclipse/paho.mqtt.golang v1.4.3 // indirect
github.com/edgexfoundry/go-mod-bootstrap/v3 v3.1.0-dev.49 // indirect
github.com/edgexfoundry/go-mod-configuration/v3 v3.1.0-dev.7 // indirect
Expand Down
4 changes: 2 additions & 2 deletions app-service-template/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/diegoholiveira/jsonlogic/v3 v3.3.0 h1:XdIxQ+ICFcQB9tVf46cmiCkc5K9MN8Sh/x+XDHL+iXM=
github.com/diegoholiveira/jsonlogic/v3 v3.3.0/go.mod h1:9oE8z9G+0OMxOoLHF3fhek3KuqD5CBqM0B6XFL08MSg=
github.com/diegoholiveira/jsonlogic/v3 v3.3.1 h1:XVHpFel6ZTVYiXQlgDrlxJYYDNZoQXP54BR33w2nAJ4=
github.com/diegoholiveira/jsonlogic/v3 v3.3.1/go.mod h1:9oE8z9G+0OMxOoLHF3fhek3KuqD5CBqM0B6XFL08MSg=
github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik=
github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE=
github.com/edgexfoundry/go-mod-bootstrap/v3 v3.1.0-dev.49 h1:HmfusqiLSGlC0+ojDeYc2Cg9g2kMe7kizvuV8471hOo=
Expand Down
70 changes: 65 additions & 5 deletions internal/app/configurable.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ import (
"strconv"
"strings"

"github.com/edgexfoundry/app-functions-sdk-go/v3/internal/common"
"github.com/edgexfoundry/app-functions-sdk-go/v3/pkg/interfaces"
"github.com/edgexfoundry/app-functions-sdk-go/v3/pkg/transforms"
"github.com/edgexfoundry/app-functions-sdk-go/v3/pkg/util"
"github.com/edgexfoundry/go-mod-core-contracts/v3/clients/logger"
"github.com/edgexfoundry/go-mod-core-contracts/v3/common"
coreCommon "github.com/edgexfoundry/go-mod-core-contracts/v3/common"
)

const (
Expand Down Expand Up @@ -82,10 +83,15 @@ const (
IsEventData = "iseventdata"
MergeOnSend = "mergeonsend"
HttpRequestHeaders = "httprequestheaders"
WillEnabled = "willenabled"
WillTopic = "willtopic"
WillQos = "willqos"
WillPayload = "willpayload"
WillRetained = "willretained"
)

// Configurable contains the helper functions that return the function pointers for building the configurable function pipeline.
// They transform the parameters map from the Pipeline configuration in to the actual actual parameters required by the function.
// They transform the parameters map from the Pipeline configuration in to the actual parameters required by the function.
type Configurable struct {
lc logger.LoggingClient
}
Expand Down Expand Up @@ -225,14 +231,14 @@ func (app *Configurable) WrapIntoEvent(parameters map[string]string) interfaces.
var transform *transforms.EventWrapper

// Converts to upper case and validates it is a valid ValueType
valueType, err := common.NormalizeValueType(valueType)
valueType, err := coreCommon.NormalizeValueType(valueType)
if err != nil {
app.lc.Error(err.Error())
return nil
}

switch valueType {
case common.ValueTypeBinary:
case coreCommon.ValueTypeBinary:
mediaType, ok := parameters[MediaType]
if !ok {
app.lc.Error("Could not find " + MediaType)
Expand All @@ -247,7 +253,7 @@ func (app *Configurable) WrapIntoEvent(parameters map[string]string) interfaces.
}

transform = transforms.NewEventWrapperBinaryReading(profileName, deviceName, resourceName, mediaType)
case common.ValueTypeObject:
case coreCommon.ValueTypeObject:
transform = transforms.NewEventWrapperObjectReading(profileName, deviceName, resourceName)

default:
Expand Down Expand Up @@ -381,6 +387,7 @@ func (app *Configurable) MQTTExport(parameters map[string]string) interfaces.App
retain := false
autoReconnect := false
skipCertVerify := false
willEnabled := false

brokerAddress, ok := parameters[BrokerAddress]
if !ok {
Expand Down Expand Up @@ -441,6 +448,57 @@ func (app *Configurable) MQTTExport(parameters map[string]string) interfaces.App
}
}

will := common.WillConfig{}

willEnabledVal := parameters[WillEnabled]
if len(willEnabledVal) > 0 {
willEnabled, err = strconv.ParseBool(willEnabledVal)
if err != nil {
app.lc.Errorf("Could not parse '%s' to a bool for '%s' parameter: %s", willEnabledVal, WillEnabled, err.Error())
return nil
}
}

if willEnabled {
will.Enabled = true

payloadVal := parameters[WillPayload]
if len(payloadVal) == 0 {
app.lc.Errorf("WillPayload must be present and non-empty when WillEnabled set to true")
return nil
}
will.Payload = payloadVal

topicVal := parameters[WillTopic]
if len(topicVal) == 0 {
app.lc.Errorf("WillTopic must be present and non-empty when WillEnabled set to true")
return nil
}
will.Topic = topicVal

qosVal := parameters[WillQos]
if len(qosVal) > 0 {
qos, err = strconv.Atoi(qosVal)
if err != nil {
app.lc.Errorf("Could not parse '%s' to a int for '%s' parameter: %s", qosVal, WillQos, err.Error())
return nil
}

will.Qos = byte(qos)
}

retainedVal := parameters[WillRetained]
if len(retainedVal) > 0 {
retained, err := strconv.ParseBool(retainedVal)
if err != nil {
app.lc.Errorf("Could not parse '%s' to a bool for '%s' parameter: %s", retainedVal, WillRetained, err.Error())
return nil
}

will.Retained = retained
}
}

// These are optional and blank values result in MQTT defaults being used.
keepAlive := parameters[KeepAlive]
connectTimeout := parameters[ConnectTimeout]
Expand All @@ -457,7 +515,9 @@ func (app *Configurable) MQTTExport(parameters map[string]string) interfaces.App
SecretName: secretName,
Topic: topic,
AuthMode: authMode,
Will: will,
}

// PersistOnError is optional and is false by default.
persistOnError := false
value, ok := parameters[PersistOnError]
Expand Down
82 changes: 82 additions & 0 deletions internal/app/configurable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,11 +356,93 @@ func TestMQTTExport(t *testing.T) {
params[AuthMode] = "none"
params[ConnectTimeout] = "5s"
params[KeepAlive] = "6s"
params[WillEnabled] = "true"
params[WillTopic] = "will"
params[WillPayload] = "goodbye"

trx := configurable.MQTTExport(params)
assert.NotNil(t, trx, "return result from MQTTSecretSend should not be nil")
}

func TestMQTTExportWillOptions(t *testing.T) {
configurable := Configurable{lc: lc}

goodWillOptions := make(map[string]string)
goodWillOptions[WillEnabled] = "true"
goodWillOptions[WillTopic] = "will"
goodWillOptions[WillPayload] = "goodbye"
goodWillOptions[WillRetained] = "true"
goodWillOptions[WillQos] = "2"
emptyWill := make(map[string]string)

willDisabled := make(map[string]string)
willDisabled[WillEnabled] = "false"

badEnabled := make(map[string]string)
badEnabled[WillEnabled] = "junk"

missingTopic := copyMap(goodWillOptions)
delete(missingTopic, WillTopic)
emptyTopic := copyMap(goodWillOptions)
emptyTopic[WillTopic] = ""

missingPayload := copyMap(goodWillOptions)
delete(missingPayload, WillPayload)
emptyPayload := copyMap(goodWillOptions)
emptyPayload[WillTopic] = ""

badRetained := copyMap(goodWillOptions)
badRetained[WillRetained] = "junk"

badQos := copyMap(goodWillOptions)
badQos[WillQos] = "junk"

tests := []struct {
Name string
Params map[string]string
ExpectError bool
}{
{"Happy Path all options", goodWillOptions, false},
{"Happy Path no options", emptyWill, false},
{"Happy Path will disabled", emptyWill, false},
{"Bad WillEnabled", badEnabled, true},
{"Bad WillRetained", badRetained, true},
{"Bad WillQos", badQos, true},
{"Missing Topic", missingTopic, true},
{"Empty Topic", emptyTopic, true},
{"Missing Payload", missingPayload, true},
{"Empty Payload", emptyPayload, true},
}

for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
test.Params[BrokerAddress] = "mqtt://broker:8883"
test.Params[Topic] = "topic"
test.Params[SecretName] = "my-secret"
test.Params[ClientID] = "clientid"
test.Params[AuthMode] = "none"

trx := configurable.MQTTExport(test.Params)

if test.ExpectError {
assert.Nil(t, trx)
return
}

assert.NotNil(t, trx)
})
}
}

func copyMap(src map[string]string) map[string]string {
dst := make(map[string]string)
for k, v := range src {
dst[k] = v
}

return dst
}

func TestAddTags(t *testing.T) {
configurable := Configurable{lc: lc}

Expand Down
2 changes: 1 addition & 1 deletion internal/app/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ var mockSecretProvider bootstrapInterfaces.SecretProvider

func TestMain(m *testing.M) {
// No remote and no file results in STDOUT logging only
lc = logger.NewMockClient()
lc = logger.NewClient("test", "DEBUG")
mockMetricsManager := &mocks.MetricsManager{}
mockMetricsManager.On("Register", mock.Anything, mock.Anything, mock.Anything).Return(nil)
mockMetricsManager.On("Unregister", mock.Anything)
Expand Down
14 changes: 12 additions & 2 deletions pkg/transforms/mqttsecret.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import (

MQTT "github.com/eclipse/paho.mqtt.golang"
"github.com/edgexfoundry/app-functions-sdk-go/v3/internal"
"github.com/edgexfoundry/go-mod-core-contracts/v3/common"
"github.com/edgexfoundry/app-functions-sdk-go/v3/internal/common"
coreCommon "github.com/edgexfoundry/go-mod-core-contracts/v3/common"
gometrics "github.com/rcrowley/go-metrics"

"github.com/edgexfoundry/app-functions-sdk-go/v3/pkg/interfaces"
Expand Down Expand Up @@ -70,6 +71,8 @@ type MQTTSecretConfig struct {
// AuthMode indicates what to use when connecting to the broker. Options are "none", "cacert" , "usernamepassword", "clientcert".
// If a CA Cert exists in the SecretName then it will be used for all modes except "none".
AuthMode string
// Will contains the Last Will configuration for the MQTT Client
Will common.WillConfig
}

// NewMQTTSecretSender ...
Expand Down Expand Up @@ -111,6 +114,8 @@ func (sender *MQTTSecretSender) initializeMQTTClient(ctx interfaces.AppFunctionC
return nil
}

ctx.LoggingClient().Info("Initializing MQTT Client")

config := sender.mqttConfig
mqttFactory := secure.NewMqttFactory(ctx.SecretProvider(), ctx.LoggingClient(), config.AuthMode, config.SecretName, config.SkipCertVerify)

Expand All @@ -132,6 +137,11 @@ func (sender *MQTTSecretSender) initializeMQTTClient(ctx interfaces.AppFunctionC
sender.opts.SetConnectTimeout(timeout)
}

if config.Will.Enabled {
sender.opts.SetWill(config.Will.Topic, config.Will.Payload, config.Will.Qos, config.Will.Retained)
ctx.LoggingClient().Infof("Last Will options set for MQTT Export: %+v", config.Will)
}

client, err := mqttFactory.Create(sender.opts)
if err != nil {
return fmt.Errorf("in pipeline '%s', unable to create MQTT Client: %s", ctx.PipelineId(), err.Error())
Expand Down Expand Up @@ -240,7 +250,7 @@ func (sender *MQTTSecretSender) MQTTSend(ctx interfaces.AppFunctionContext, data
sender.mqttSizeMetrics.Update(int64(exportDataBytes))

ctx.LoggingClient().Debugf("Sent %d bytes of data to MQTT Broker in pipeline '%s'", exportDataBytes, ctx.PipelineId())
ctx.LoggingClient().Tracef("Data exported", "Transport", "MQTT", "pipeline", ctx.PipelineId(), common.CorrelationHeader, ctx.CorrelationID())
ctx.LoggingClient().Tracef("Data exported", "Transport", "MQTT", "pipeline", ctx.PipelineId(), coreCommon.CorrelationHeader, ctx.CorrelationID())

return true, nil
}
Expand Down