diff --git a/internal/common/config.go b/internal/common/config.go index 11fe0f400..8d58ccfbb 100644 --- a/internal/common/config.go +++ b/internal/common/config.go @@ -85,6 +85,19 @@ type HttpConfig struct { HTTPSKeyName string } +type WillConfig struct { + // Enabled enables Last Will capability on the client connection + Enabled bool + // Payload is the Last Will Message sent to other clients that are subscribed to the will topic + Payload string + // Qos is the Quality of Service for the will topic + Qos byte + // Retained is the "retain" setting for the will topic + Retained bool + // Topic is the topic for Last Will + Topic string +} + // ExternalMqttConfig contains the MQTT broker configuration for MQTT Trigger type ExternalMqttConfig struct { // Url contains the fully qualified URL to connect to the MQTT broker @@ -112,6 +125,8 @@ type ExternalMqttConfig struct { RetryDuration int // RetryInterval indicates the time (in seconds) that will be waited between attempts to create MQTT client RetryInterval int + // Will contains the Last Will configuration for the MQTT Client + Will WillConfig } // PipelineInfo defines the top level data for configurable pipelines diff --git a/internal/trigger/mqtt/mqtt.go b/internal/trigger/mqtt/mqtt.go index 7a1de5ba6..bed7f48ff 100644 --- a/internal/trigger/mqtt/mqtt.go +++ b/internal/trigger/mqtt/mqtt.go @@ -69,7 +69,7 @@ func NewTrigger(bnd trigger.ServiceBinding, mp trigger.MessageProcessor) *Trigge // Initialize initializes the Trigger for an external MQTT broker func (trigger *Trigger) Initialize(_ *sync.WaitGroup, ctx context.Context, background <-chan interfaces.BackgroundMessage) (bootstrap.Deferred, error) { - // Convenience short cuts + // Convenience shortcuts lc := trigger.serviceBinding.LoggingClient() config := trigger.serviceBinding.Config() @@ -109,6 +109,12 @@ func (trigger *Trigger) Initialize(_ *sync.WaitGroup, ctx context.Context, backg opts.KeepAlive = brokerConfig.KeepAlive opts.Servers = []*url.URL{brokerUrl} + will := brokerConfig.Will + if will.Enabled { + opts.SetWill(will.Topic, will.Payload, will.Qos, will.Retained) + lc.Infof("Last Will options set for MQTT Trigger: %+v", will) + } + if brokerConfig.RetryDuration <= 0 { brokerConfig.RetryDuration = defaultRetryDuration } @@ -148,7 +154,7 @@ func (trigger *Trigger) Initialize(_ *sync.WaitGroup, ctx context.Context, backg } func (trigger *Trigger) onConnectHandler(mqttClient pahoMqtt.Client) { - // Convenience short cuts + // Convenience shortcuts lc := trigger.serviceBinding.LoggingClient() config := trigger.serviceBinding.Config() topics := util.DeleteEmptyAndTrim(strings.FieldsFunc(config.Trigger.SubscribeTopics, util.SplitComma)) @@ -166,7 +172,7 @@ func (trigger *Trigger) onConnectHandler(mqttClient pahoMqtt.Client) { } func (trigger *Trigger) messageHandler(_ pahoMqtt.Client, mqttMessage pahoMqtt.Message) { - // Convenience short cuts + // Convenience shortcuts lc := trigger.serviceBinding.LoggingClient() data := mqttMessage.Payload()