Skip to content

Commit

Permalink
feat: Add Will configuration elements for External MQTT trigger config (
Browse files Browse the repository at this point in the history
#1493)

closes #1117

Signed-off-by: Leonard Goodell <leonard.goodell@intel.com>
  • Loading branch information
Lenny Goodell authored Oct 17, 2023
1 parent 2743bc4 commit 005c7e8
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 3 deletions.
15 changes: 15 additions & 0 deletions internal/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,19 @@ type HttpConfig struct {
HTTPSKeyName string
}

type WillConfig struct {
// Enabled enables Last Will capability on the client connection
Enabled bool
// Payload is the Last Will Message sent to other clients that are subscribed to the will topic
Payload string
// Qos is the Quality of Service for the will topic
Qos byte
// Retained is the "retain" setting for the will topic
Retained bool
// Topic is the topic for Last Will
Topic string
}

// ExternalMqttConfig contains the MQTT broker configuration for MQTT Trigger
type ExternalMqttConfig struct {
// Url contains the fully qualified URL to connect to the MQTT broker
Expand Down Expand Up @@ -112,6 +125,8 @@ type ExternalMqttConfig struct {
RetryDuration int
// RetryInterval indicates the time (in seconds) that will be waited between attempts to create MQTT client
RetryInterval int
// Will contains the Last Will configuration for the MQTT Client
Will WillConfig
}

// PipelineInfo defines the top level data for configurable pipelines
Expand Down
12 changes: 9 additions & 3 deletions internal/trigger/mqtt/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func NewTrigger(bnd trigger.ServiceBinding, mp trigger.MessageProcessor) *Trigge

// Initialize initializes the Trigger for an external MQTT broker
func (trigger *Trigger) Initialize(_ *sync.WaitGroup, ctx context.Context, background <-chan interfaces.BackgroundMessage) (bootstrap.Deferred, error) {
// Convenience short cuts
// Convenience shortcuts
lc := trigger.serviceBinding.LoggingClient()
config := trigger.serviceBinding.Config()

Expand Down Expand Up @@ -109,6 +109,12 @@ func (trigger *Trigger) Initialize(_ *sync.WaitGroup, ctx context.Context, backg
opts.KeepAlive = brokerConfig.KeepAlive
opts.Servers = []*url.URL{brokerUrl}

will := brokerConfig.Will
if will.Enabled {
opts.SetWill(will.Topic, will.Payload, will.Qos, will.Retained)
lc.Infof("Last Will options set for MQTT Trigger: %+v", will)
}

if brokerConfig.RetryDuration <= 0 {
brokerConfig.RetryDuration = defaultRetryDuration
}
Expand Down Expand Up @@ -148,7 +154,7 @@ func (trigger *Trigger) Initialize(_ *sync.WaitGroup, ctx context.Context, backg
}

func (trigger *Trigger) onConnectHandler(mqttClient pahoMqtt.Client) {
// Convenience short cuts
// Convenience shortcuts
lc := trigger.serviceBinding.LoggingClient()
config := trigger.serviceBinding.Config()
topics := util.DeleteEmptyAndTrim(strings.FieldsFunc(config.Trigger.SubscribeTopics, util.SplitComma))
Expand All @@ -166,7 +172,7 @@ func (trigger *Trigger) onConnectHandler(mqttClient pahoMqtt.Client) {
}

func (trigger *Trigger) messageHandler(_ pahoMqtt.Client, mqttMessage pahoMqtt.Message) {
// Convenience short cuts
// Convenience shortcuts
lc := trigger.serviceBinding.LoggingClient()

data := mqttMessage.Payload()
Expand Down

0 comments on commit 005c7e8

Please sign in to comment.