Skip to content

Commit

Permalink
feat: Allow NameFieldPath configurable
Browse files Browse the repository at this point in the history
Allow NameFieldPath configurable by checking whether the NameFieldPathEscape enable.

Signed-off-by: bruce <weichou1229@gmail.com>
  • Loading branch information
weichou1229 committed Sep 13, 2023
1 parent 10d83f9 commit f16fc3d
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 11 deletions.
4 changes: 3 additions & 1 deletion internal/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ func SendEvent(event *dtos.Event, correlationID string, dic *di.Container) {
ctx = context.WithValue(ctx, common.ContentType, encoding) // nolint: staticcheck
envelope := types.NewMessageEnvelope(bytes, ctx)
serviceName := container.DeviceServiceFrom(dic.Get).Name
publishTopic := common.BuildTopic(configuration.MessageBus.GetBaseTopicPrefix(), common.EventsPublishTopic, DeviceServiceEventPrefix, common.URLEncode(serviceName), common.URLEncode(event.ProfileName), common.URLEncode(event.DeviceName), common.URLEncode(event.SourceName))
publishTopic := common.NewPathBuilder().EnableNameFieldEscape(configuration.Service.EnableNameFieldEscape).
SetPath(configuration.MessageBus.GetBaseTopicPrefix()).SetPath(common.EventsPublishTopic).SetPath(DeviceServiceEventPrefix).
SetNameFieldPath(serviceName).SetNameFieldPath(event.ProfileName).SetNameFieldPath(event.DeviceName).SetNameFieldPath(event.SourceName).BuildPath()
err = mc.Publish(envelope, publishTopic)
if err != nil {
lc.Errorf("Failed to publish event to MessageBus: %s", err)
Expand Down
9 changes: 5 additions & 4 deletions internal/controller/messaging/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ import (

func MetadataSystemEventsCallback(ctx context.Context, serviceBaseName string, dic *di.Container) errors.EdgeX {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
configuration := container.ConfigurationFrom(dic.Get)
messageBusInfo := container.ConfigurationFrom(dic.Get).MessageBus
deviceService := container.DeviceServiceFrom(dic.Get)
metadataSystemEventTopic := common.BuildTopic(messageBusInfo.GetBaseTopicPrefix(),
common.MetadataSystemEventSubscribeTopic, common.URLEncode(deviceService.Name), "#")
metadataSystemEventTopic := common.NewPathBuilder().EnableNameFieldEscape(configuration.Service.EnableNameFieldEscape).
SetPath(messageBusInfo.GetBaseTopicPrefix()).SetPath(common.MetadataSystemEventSubscribeTopic).SetNameFieldPath(deviceService.Name).SetPath("#").BuildPath()

lc.Infof("Subscribing to System Events on topic: %s", metadataSystemEventTopic)

Expand All @@ -48,8 +49,8 @@ func MetadataSystemEventsCallback(ctx context.Context, serviceBaseName string, d
if serviceBaseName != deviceService.Name {
// Must replace the first wildcard with the type for Provision Watchers
baseSubscribeTopic := strings.Replace(common.MetadataSystemEventSubscribeTopic, "+", common.ProvisionWatcherSystemEventType, 1)
provisionWatcherSystemEventSubscribeTopic := common.BuildTopic(messageBusInfo.GetBaseTopicPrefix(),
baseSubscribeTopic, common.URLEncode(serviceBaseName), "#")
provisionWatcherSystemEventSubscribeTopic := common.NewPathBuilder().EnableNameFieldEscape(configuration.Service.EnableNameFieldEscape).
SetPath(messageBusInfo.GetBaseTopicPrefix()).SetPath(baseSubscribeTopic).SetNameFieldPath(serviceBaseName).SetPath("#").BuildPath()

topics = append(topics, types.TopicChannel{
Topic: provisionWatcherSystemEventSubscribeTopic,
Expand Down
8 changes: 5 additions & 3 deletions internal/controller/messaging/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,16 @@ import (

func SubscribeCommands(ctx context.Context, dic *di.Container) errors.EdgeX {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
configuration := container.ConfigurationFrom(dic.Get)
messageBusInfo := container.ConfigurationFrom(dic.Get).MessageBus
deviceService := container.DeviceServiceFrom(dic.Get)
escapedDeviceServiceName := common.URLEncode(deviceService.Name)

requestSubscribeTopic := common.BuildTopic(messageBusInfo.GetBaseTopicPrefix(), common.CommandRequestSubscribeTopic, escapedDeviceServiceName, "#")
requestSubscribeTopic := common.NewPathBuilder().EnableNameFieldEscape(configuration.Service.EnableNameFieldEscape).
SetPath(messageBusInfo.GetBaseTopicPrefix()).SetPath(common.CommandRequestSubscribeTopic).SetNameFieldPath(deviceService.Name).SetPath("#").BuildPath()
lc.Infof("Subscribing to command requests on topic: %s", requestSubscribeTopic)

responsePublishTopicPrefix := common.BuildTopic(messageBusInfo.GetBaseTopicPrefix(), common.ResponseTopic, escapedDeviceServiceName)
responsePublishTopicPrefix := common.NewPathBuilder().EnableNameFieldEscape(configuration.Service.EnableNameFieldEscape).
SetPath(messageBusInfo.GetBaseTopicPrefix()).SetPath(common.ResponseTopic).SetNameFieldPath(deviceService.Name).BuildPath()
lc.Infof("Responses to command requests will be published on topic: %s/<requestId>", responsePublishTopicPrefix)

messages := make(chan types.MessageEnvelope, 1)
Expand Down
9 changes: 6 additions & 3 deletions internal/controller/messaging/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@ import (

func SubscribeDeviceValidation(ctx context.Context, dic *di.Container) errors.EdgeX {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
configuration := container.ConfigurationFrom(dic.Get)
messageBusInfo := container.ConfigurationFrom(dic.Get).MessageBus
serviceName := common.URLEncode(container.DeviceServiceFrom(dic.Get).Name)
serviceName := container.DeviceServiceFrom(dic.Get).Name

requestTopic := common.BuildTopic(messageBusInfo.GetBaseTopicPrefix(), serviceName, common.ValidateDeviceSubscribeTopic)
requestTopic := common.NewPathBuilder().EnableNameFieldEscape(configuration.Service.EnableNameFieldEscape).
SetPath(messageBusInfo.GetBaseTopicPrefix()).SetNameFieldPath(serviceName).SetPath(common.ValidateDeviceSubscribeTopic).BuildPath()
lc.Infof("Subscribing to device validation requests on topic: %s", requestTopic)

responseTopicPrefix := common.BuildTopic(messageBusInfo.GetBaseTopicPrefix(), common.ResponseTopic, serviceName)
responseTopicPrefix := common.NewPathBuilder().EnableNameFieldEscape(configuration.Service.EnableNameFieldEscape).
SetPath(messageBusInfo.GetBaseTopicPrefix()).SetPath(common.ResponseTopic).SetNameFieldPath(serviceName).BuildPath()
lc.Infof("Responses to device validation requests will be published on topic: %s/<requestId>", responseTopicPrefix)

messages := make(chan types.MessageEnvelope, 1)
Expand Down

0 comments on commit f16fc3d

Please sign in to comment.