diff --git a/internal/common/utils.go b/internal/common/utils.go index 0b4754fd..ebde4252 100644 --- a/internal/common/utils.go +++ b/internal/common/utils.go @@ -71,7 +71,9 @@ func SendEvent(event *dtos.Event, correlationID string, dic *di.Container) { ctx = context.WithValue(ctx, common.ContentType, encoding) // nolint: staticcheck envelope := types.NewMessageEnvelope(bytes, ctx) serviceName := container.DeviceServiceFrom(dic.Get).Name - publishTopic := common.BuildTopic(configuration.MessageBus.GetBaseTopicPrefix(), common.EventsPublishTopic, DeviceServiceEventPrefix, common.URLEncode(serviceName), common.URLEncode(event.ProfileName), common.URLEncode(event.DeviceName), common.URLEncode(event.SourceName)) + publishTopic := common.NewPathBuilder().EnableNameFieldEscape(configuration.Service.EnableNameFieldEscape). + SetPath(configuration.MessageBus.GetBaseTopicPrefix()).SetPath(common.EventsPublishTopic).SetPath(DeviceServiceEventPrefix). + SetNameFieldPath(serviceName).SetNameFieldPath(event.ProfileName).SetNameFieldPath(event.DeviceName).SetNameFieldPath(event.SourceName).BuildPath() err = mc.Publish(envelope, publishTopic) if err != nil { lc.Errorf("Failed to publish event to MessageBus: %s", err) diff --git a/internal/controller/messaging/callback.go b/internal/controller/messaging/callback.go index fd0b6722..4ce686e5 100644 --- a/internal/controller/messaging/callback.go +++ b/internal/controller/messaging/callback.go @@ -25,10 +25,11 @@ import ( func MetadataSystemEventsCallback(ctx context.Context, serviceBaseName string, dic *di.Container) errors.EdgeX { lc := bootstrapContainer.LoggingClientFrom(dic.Get) + configuration := container.ConfigurationFrom(dic.Get) messageBusInfo := container.ConfigurationFrom(dic.Get).MessageBus deviceService := container.DeviceServiceFrom(dic.Get) - metadataSystemEventTopic := common.BuildTopic(messageBusInfo.GetBaseTopicPrefix(), - common.MetadataSystemEventSubscribeTopic, common.URLEncode(deviceService.Name), "#") + metadataSystemEventTopic := common.NewPathBuilder().EnableNameFieldEscape(configuration.Service.EnableNameFieldEscape). + SetPath(messageBusInfo.GetBaseTopicPrefix()).SetPath(common.MetadataSystemEventSubscribeTopic).SetNameFieldPath(deviceService.Name).SetPath("#").BuildPath() lc.Infof("Subscribing to System Events on topic: %s", metadataSystemEventTopic) @@ -48,8 +49,8 @@ func MetadataSystemEventsCallback(ctx context.Context, serviceBaseName string, d if serviceBaseName != deviceService.Name { // Must replace the first wildcard with the type for Provision Watchers baseSubscribeTopic := strings.Replace(common.MetadataSystemEventSubscribeTopic, "+", common.ProvisionWatcherSystemEventType, 1) - provisionWatcherSystemEventSubscribeTopic := common.BuildTopic(messageBusInfo.GetBaseTopicPrefix(), - baseSubscribeTopic, common.URLEncode(serviceBaseName), "#") + provisionWatcherSystemEventSubscribeTopic := common.NewPathBuilder().EnableNameFieldEscape(configuration.Service.EnableNameFieldEscape). + SetPath(messageBusInfo.GetBaseTopicPrefix()).SetPath(baseSubscribeTopic).SetNameFieldPath(serviceBaseName).SetPath("#").BuildPath() topics = append(topics, types.TopicChannel{ Topic: provisionWatcherSystemEventSubscribeTopic, diff --git a/internal/controller/messaging/command.go b/internal/controller/messaging/command.go index f677b863..6cd8fdfb 100644 --- a/internal/controller/messaging/command.go +++ b/internal/controller/messaging/command.go @@ -27,14 +27,16 @@ import ( func SubscribeCommands(ctx context.Context, dic *di.Container) errors.EdgeX { lc := bootstrapContainer.LoggingClientFrom(dic.Get) + configuration := container.ConfigurationFrom(dic.Get) messageBusInfo := container.ConfigurationFrom(dic.Get).MessageBus deviceService := container.DeviceServiceFrom(dic.Get) - escapedDeviceServiceName := common.URLEncode(deviceService.Name) - requestSubscribeTopic := common.BuildTopic(messageBusInfo.GetBaseTopicPrefix(), common.CommandRequestSubscribeTopic, escapedDeviceServiceName, "#") + requestSubscribeTopic := common.NewPathBuilder().EnableNameFieldEscape(configuration.Service.EnableNameFieldEscape). + SetPath(messageBusInfo.GetBaseTopicPrefix()).SetPath(common.CommandRequestSubscribeTopic).SetNameFieldPath(deviceService.Name).SetPath("#").BuildPath() lc.Infof("Subscribing to command requests on topic: %s", requestSubscribeTopic) - responsePublishTopicPrefix := common.BuildTopic(messageBusInfo.GetBaseTopicPrefix(), common.ResponseTopic, escapedDeviceServiceName) + responsePublishTopicPrefix := common.NewPathBuilder().EnableNameFieldEscape(configuration.Service.EnableNameFieldEscape). + SetPath(messageBusInfo.GetBaseTopicPrefix()).SetPath(common.ResponseTopic).SetNameFieldPath(deviceService.Name).BuildPath() lc.Infof("Responses to command requests will be published on topic: %s/", responsePublishTopicPrefix) messages := make(chan types.MessageEnvelope, 1) diff --git a/internal/controller/messaging/validation.go b/internal/controller/messaging/validation.go index ff5a3a21..70ed70cb 100644 --- a/internal/controller/messaging/validation.go +++ b/internal/controller/messaging/validation.go @@ -22,13 +22,16 @@ import ( func SubscribeDeviceValidation(ctx context.Context, dic *di.Container) errors.EdgeX { lc := bootstrapContainer.LoggingClientFrom(dic.Get) + configuration := container.ConfigurationFrom(dic.Get) messageBusInfo := container.ConfigurationFrom(dic.Get).MessageBus - serviceName := common.URLEncode(container.DeviceServiceFrom(dic.Get).Name) + serviceName := container.DeviceServiceFrom(dic.Get).Name - requestTopic := common.BuildTopic(messageBusInfo.GetBaseTopicPrefix(), serviceName, common.ValidateDeviceSubscribeTopic) + requestTopic := common.NewPathBuilder().EnableNameFieldEscape(configuration.Service.EnableNameFieldEscape). + SetPath(messageBusInfo.GetBaseTopicPrefix()).SetNameFieldPath(serviceName).SetPath(common.ValidateDeviceSubscribeTopic).BuildPath() lc.Infof("Subscribing to device validation requests on topic: %s", requestTopic) - responseTopicPrefix := common.BuildTopic(messageBusInfo.GetBaseTopicPrefix(), common.ResponseTopic, serviceName) + responseTopicPrefix := common.NewPathBuilder().EnableNameFieldEscape(configuration.Service.EnableNameFieldEscape). + SetPath(messageBusInfo.GetBaseTopicPrefix()).SetPath(common.ResponseTopic).SetNameFieldPath(serviceName).BuildPath() lc.Infof("Responses to device validation requests will be published on topic: %s/", responseTopicPrefix) messages := make(chan types.MessageEnvelope, 1)