diff --git a/internal/core/command/controller/messaging/external.go b/internal/core/command/controller/messaging/external.go index 553dfaf617..2b5140b296 100644 --- a/internal/core/command/controller/messaging/external.go +++ b/internal/core/command/controller/messaging/external.go @@ -70,7 +70,12 @@ func commandQueryHandler(dic *di.Container) mqtt.MessageHandler { // example topic scheme: edgex/commandquery/request/ // deviceName is expected to be at last topic level. topicLevels := strings.Split(message.Topic(), "/") - deviceName := topicLevels[len(topicLevels)-1] + deviceName, err := url.PathUnescape(topicLevels[len(topicLevels)-1]) + if err != nil { + lc.Errorf("Failed to unescape device name '%s': %s", deviceName, err.Error()) + lc.Warn("Not publishing error message back due to insufficient information on response topic") + return + } if strings.EqualFold(deviceName, common.All) { deviceName = common.All } @@ -113,8 +118,14 @@ func commandRequestHandler(requestTimeout time.Duration, dic *di.Container) mqtt // expected external command request/response topic scheme: #/// deviceName := topicLevels[length-3] + unescapedDeviceName, err := url.PathUnescape(deviceName) + if err != nil { + lc.Errorf("Failed to unescape device name '%s': %s", deviceName, err.Error()) + lc.Warn("Not publishing error message back due to insufficient information on response topic") + return + } commandName := topicLevels[length-2] - _, err = url.QueryUnescape(topicLevels[length-2]) + unescapedCommandName, err := url.PathUnescape(commandName) if err != nil { lc.Errorf("Failed to unescape command name '%s': %s", commandName, err.Error()) lc.Warn("Not publishing error message back due to insufficient information on response topic") @@ -132,7 +143,7 @@ func commandRequestHandler(requestTimeout time.Duration, dic *di.Container) mqtt internalBaseTopic := container.ConfigurationFrom(dic.Get).MessageBus.GetBaseTopicPrefix() topicPrefix := common.BuildTopic(internalBaseTopic, common.CoreCommandDeviceRequestPublishTopic) - deviceServiceName, deviceRequestTopic, err := validateRequestTopic(topicPrefix, deviceName, commandName, method, dic) + deviceServiceName, err := retrieveServiceNameByDevice(unescapedDeviceName, dic) if err != nil { responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, err.Error()) publishMessage(client, externalResponseTopic, qos, retain, responseEnvelope, lc) @@ -146,7 +157,9 @@ func commandRequestHandler(requestTimeout time.Duration, dic *di.Container) mqtt return } - deviceResponseTopicPrefix := common.BuildTopic(internalBaseTopic, common.ResponseTopic, deviceServiceName) + // escape again to ensure that the topic is valid in the internal message bus + deviceRequestTopic := common.BuildTopic(topicPrefix, common.URLEncode(deviceServiceName), common.URLEncode(unescapedDeviceName), common.URLEncode(unescapedCommandName), method) + deviceResponseTopicPrefix := common.BuildTopic(internalBaseTopic, common.ResponseTopic, common.URLEncode(deviceServiceName)) lc.Debugf("Sending Command request to internal MessageBus. Topic: %s, Request-id: %s Correlation-id: %s", deviceRequestTopic, requestEnvelope.RequestID, requestEnvelope.CorrelationID) lc.Debugf("Expecting response on topic: %s/%s", deviceResponseTopicPrefix, requestEnvelope.RequestID) diff --git a/internal/core/command/controller/messaging/internal.go b/internal/core/command/controller/messaging/internal.go index 6227e5f59b..e18abac4a0 100644 --- a/internal/core/command/controller/messaging/internal.go +++ b/internal/core/command/controller/messaging/internal.go @@ -99,7 +99,14 @@ func processDeviceCommandRequest( // expected internal command request/response topic scheme: #/// deviceName := topicLevels[length-3] - commandName, err := url.QueryUnescape(topicLevels[length-2]) + unescapedDeviceName, err := url.PathUnescape(deviceName) + if err != nil { + lc.Errorf("Failed to unescape device name '%s': %s", deviceName, err.Error()) + lc.Warn("Not publishing error message back due to insufficient information on response topic") + return + } + commandName := topicLevels[length-2] + unescapedCommandName, err := url.PathUnescape(commandName) if err != nil { err = fmt.Errorf("failed to unescape command name '%s': %s", commandName, err.Error()) lc.Error(err.Error()) @@ -124,7 +131,7 @@ func processDeviceCommandRequest( topicPrefix := common.BuildTopic(baseTopic, common.CoreCommandDeviceRequestPublishTopic) // internal command request topic scheme: //// - deviceServiceName, deviceRequestTopic, err := validateRequestTopic(topicPrefix, deviceName, commandName, method, dic) + deviceServiceName, err := retrieveServiceNameByDevice(unescapedDeviceName, dic) if err != nil { err = fmt.Errorf("invalid request topic: %s", err.Error()) lc.Error(err.Error()) @@ -147,7 +154,8 @@ func processDeviceCommandRequest( return } - deviceResponseTopicPrefix := common.BuildTopic(baseTopic, common.ResponseTopic, deviceServiceName) + deviceRequestTopic := common.BuildTopic(topicPrefix, common.URLEncode(deviceServiceName), common.URLEncode(unescapedDeviceName), common.URLEncode(unescapedCommandName), method) + deviceResponseTopicPrefix := common.BuildTopic(baseTopic, common.ResponseTopic, common.URLEncode(deviceServiceName)) lc.Debugf("Sending Command Device Request to internal MessageBus. Topic: %s, Correlation-id: %s", deviceRequestTopic, requestEnvelope.CorrelationID) lc.Debugf("Expecting response on topic: %s/%s", deviceResponseTopicPrefix, requestEnvelope.RequestID) @@ -228,7 +236,12 @@ func processCommandQueryRequest( // example topic scheme: /commandquery/request/ // deviceName is expected to be at last topic level. topicLevels := strings.Split(requestEnvelope.ReceivedTopic, "/") - deviceName := topicLevels[len(topicLevels)-1] + deviceName, err := url.PathUnescape(topicLevels[len(topicLevels)-1]) + if err != nil { + lc.Errorf("Failed to unescape device name '%s': %s", deviceName, err.Error()) + lc.Warn("Not publishing error message back due to insufficient information on response topic") + return + } if strings.EqualFold(deviceName, common.All) { deviceName = common.All } diff --git a/internal/core/command/controller/messaging/internal_test.go b/internal/core/command/controller/messaging/internal_test.go index 1545533fc1..160196c7bb 100644 --- a/internal/core/command/controller/messaging/internal_test.go +++ b/internal/core/command/controller/messaging/internal_test.go @@ -42,12 +42,12 @@ func TestSubscribeCommandRequests(t *testing.T) { expectedDevice := "device1" expectedResource := "resource" expectedMethod := "get" - expectedDeviceResponseTopicPrefix := strings.Join([]string{expectedResponseTopicPrefix, expectedServiceName}, "/") + expectedDeviceResponseTopicPrefix := strings.Join([]string{expectedResponseTopicPrefix, common.URLEncode(expectedServiceName)}, "/") expectedCommandResponseTopic := strings.Join([]string{expectedResponseTopicPrefix, common.CoreCommandServiceKey, expectedRequestId}, "/") expectedCommandRequestSubscribeTopic := common.BuildTopic(baseTopic, common.CoreCommandRequestSubscribeTopic) expectedCommandRequestReceivedTopic := common.BuildTopic(strings.Replace(expectedCommandRequestSubscribeTopic, "/#", "", 1), expectedServiceName, expectedDevice, expectedResource, expectedMethod) - expectedDeviceCommandRequestRequestTopic := common.BuildTopic(baseTopic, common.CoreCommandDeviceRequestPublishTopic, expectedServiceName, expectedDevice, expectedResource, expectedMethod) + expectedDeviceCommandRequestRequestTopic := common.BuildTopic(baseTopic, common.CoreCommandDeviceRequestPublishTopic, common.URLEncode(expectedServiceName), common.URLEncode(expectedDevice), common.URLEncode(expectedResource), expectedMethod) mockLogger := &lcMocks.LoggingClient{} mockDeviceClient := &mocks2.DeviceClient{} mockDeviceProfileClient := &mocks2.DeviceProfileClient{} diff --git a/internal/core/command/controller/messaging/utils.go b/internal/core/command/controller/messaging/utils.go index 560a935251..afef09d7bc 100644 --- a/internal/core/command/controller/messaging/utils.go +++ b/internal/core/command/controller/messaging/utils.go @@ -24,32 +24,29 @@ import ( "github.com/edgexfoundry/edgex-go/internal/core/command/application" ) -// validateRequestTopic validates the request topic by checking the existence of device and device service, -// returns the internal device request topic and service name to which the command request will be sent. -func validateRequestTopic(prefix string, deviceName string, commandName string, method string, dic *di.Container) (string, string, error) { +// retrieveServiceNameByDevice validates the existence of device and device service, +// returns the service name to which the command request will be sent. +func retrieveServiceNameByDevice(deviceName string, dic *di.Container) (string, error) { // retrieve device information through Metadata DeviceClient dc := bootstrapContainer.DeviceClientFrom(dic.Get) if dc == nil { - return "", "", errors.New("nil Device Client") + return "", errors.New("nil Device Client") } deviceResponse, err := dc.DeviceByName(context.Background(), deviceName) if err != nil { - return "", "", fmt.Errorf("failed to get Device by name %s: %v", deviceName, err) + return "", fmt.Errorf("failed to get Device by name %s: %v", deviceName, err) } // retrieve device service information through Metadata DeviceClient dsc := bootstrapContainer.DeviceServiceClientFrom(dic.Get) if dsc == nil { - return "", "", errors.New("nil DeviceService Client") + return "", errors.New("nil DeviceService Client") } deviceServiceResponse, err := dsc.DeviceServiceByName(context.Background(), deviceResponse.Device.ServiceName) if err != nil { - return "", "", fmt.Errorf("failed to get DeviceService by name %s: %v", deviceResponse.Device.ServiceName, err) + return "", fmt.Errorf("failed to get DeviceService by name %s: %v", deviceResponse.Device.ServiceName, err) } - - // expected internal command request topic scheme: //// - return deviceServiceResponse.Service.Name, common.BuildTopic(prefix, deviceServiceResponse.Service.Name, deviceName, commandName, method), nil - + return deviceServiceResponse.Service.Name, nil } // validateGetCommandQueryParameters validates the value is valid for device service's reserved query parameters diff --git a/internal/core/data/application/event.go b/internal/core/data/application/event.go index c56d292ef3..d86c74bf21 100644 --- a/internal/core/data/application/event.go +++ b/internal/core/data/application/event.go @@ -8,7 +8,6 @@ package application import ( "context" "fmt" - "net/url" "strings" msgTypes "github.com/edgexfoundry/go-mod-messaging/v3/pkg/types" @@ -83,7 +82,7 @@ func (a *CoreDataApp) PublishEvent(data []byte, serviceName string, profileName correlationId := correlation.FromContext(ctx) basePrefix := configuration.MessageBus.GetBaseTopicPrefix() - publishTopic := common.BuildTopic(basePrefix, common.EventsPublishTopic, CoreDataEventTopicPrefix, serviceName, profileName, deviceName, url.QueryEscape(sourceName)) + publishTopic := common.BuildTopic(basePrefix, common.EventsPublishTopic, CoreDataEventTopicPrefix, common.URLEncode(serviceName), common.URLEncode(profileName), common.URLEncode(deviceName), common.URLEncode(sourceName)) lc.Debugf("Publishing AddEventRequest to MessageBus. Topic: %s; %s: %s", publishTopic, common.CorrelationHeader, correlationId) msgEnvelope := msgTypes.NewMessageEnvelope(data, ctx) diff --git a/internal/core/data/controller/messaging/subscriber.go b/internal/core/data/controller/messaging/subscriber.go index 2923c42495..6c2a1c9d87 100644 --- a/internal/core/data/controller/messaging/subscriber.go +++ b/internal/core/data/controller/messaging/subscriber.go @@ -118,8 +118,14 @@ func validateEvent(messageTopic string, e dtos.Event) errors.EdgeX { } len := len(fields) - profileName := fields[len-3] - deviceName := fields[len-2] + profileName, err := url.PathUnescape(fields[len-3]) + if err != nil { + return errors.NewCommonEdgeXWrapper(err) + } + deviceName, err := url.PathUnescape(fields[len-2]) + if err != nil { + return errors.NewCommonEdgeXWrapper(err) + } sourceName, err := url.PathUnescape(fields[len-1]) if err != nil { return errors.NewCommonEdgeXWrapper(err) diff --git a/internal/core/metadata/application/notify.go b/internal/core/metadata/application/notify.go index fea3daef1e..6a79ed4beb 100644 --- a/internal/core/metadata/application/notify.go +++ b/internal/core/metadata/application/notify.go @@ -42,8 +42,8 @@ func validateDeviceCallback(device dtos.Device, dic *di.Container) errors.EdgeX } baseTopic := configuration.MessageBus.GetBaseTopicPrefix() - requestTopic := common.BuildTopic(baseTopic, device.ServiceName, common.ValidateDeviceSubscribeTopic) - responseTopicPrefix := common.BuildTopic(baseTopic, common.ResponseTopic, device.ServiceName) + requestTopic := common.BuildTopic(baseTopic, common.URLEncode(device.ServiceName), common.ValidateDeviceSubscribeTopic) + responseTopicPrefix := common.BuildTopic(baseTopic, common.ResponseTopic, common.URLEncode(device.ServiceName)) requestEnvelope := types.NewMessageEnvelopeForRequest(requestBytes, nil) lc.Debugf("Sending Device Validation request for device=%s, CorrelationId=%s to topic: %s", device.Name, requestEnvelope.CorrelationID, requestTopic) @@ -136,13 +136,13 @@ func publishSystemEvent(eventType, action, owner string, dto any, ctx context.Co systemEvent.Source, systemEvent.Type, systemEvent.Action, - systemEvent.Owner, + common.URLEncode(systemEvent.Owner), ) if profileName != "" { publishTopic = common.BuildTopic( publishTopic, - profileName, + common.URLEncode(profileName), ) } diff --git a/internal/core/metadata/application/notify_test.go b/internal/core/metadata/application/notify_test.go index 96c049e66f..bdce0efd9c 100644 --- a/internal/core/metadata/application/notify_test.go +++ b/internal/core/metadata/application/notify_test.go @@ -171,8 +171,8 @@ func TestPublishSystemEvent(t *testing.T) { common.CoreMetaDataServiceKey, test.Type, test.Action, - test.Owner, - expectedDevice.ProfileName) + common.URLEncode(test.Owner), + common.URLEncode(expectedDevice.ProfileName)) mockClient.AssertCalled(t, "Publish", mock.Anything, expectedTopic) if test.PubError {