Skip to content

Commit

Permalink
feat: Accept Url escape in API path
Browse files Browse the repository at this point in the history
Use escaped for message bus topic
- service name
- profile name
- device name

Close edgexfoundry#4053

Signed-off-by: bruce <weichou1229@gmail.com>
  • Loading branch information
weichou1229 committed Sep 4, 2023
1 parent 23e3a0f commit 67d489b
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 31 deletions.
21 changes: 17 additions & 4 deletions internal/core/command/controller/messaging/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,12 @@ func commandQueryHandler(dic *di.Container) mqtt.MessageHandler {
// example topic scheme: edgex/commandquery/request/<device-name>
// deviceName is expected to be at last topic level.
topicLevels := strings.Split(message.Topic(), "/")
deviceName := topicLevels[len(topicLevels)-1]
deviceName, err := url.PathUnescape(topicLevels[len(topicLevels)-1])
if err != nil {
lc.Errorf("Failed to unescape device name '%s': %s", deviceName, err.Error())
lc.Warn("Not publishing error message back due to insufficient information on response topic")
return
}
if strings.EqualFold(deviceName, common.All) {
deviceName = common.All
}
Expand Down Expand Up @@ -113,8 +118,14 @@ func commandRequestHandler(requestTimeout time.Duration, dic *di.Container) mqtt

// expected external command request/response topic scheme: #/<device-name>/<command-name>/<method>
deviceName := topicLevels[length-3]
unescapedDeviceName, err := url.PathUnescape(deviceName)
if err != nil {
lc.Errorf("Failed to unescape device name '%s': %s", deviceName, err.Error())
lc.Warn("Not publishing error message back due to insufficient information on response topic")
return
}
commandName := topicLevels[length-2]
_, err = url.QueryUnescape(topicLevels[length-2])
unescapedCommandName, err := url.PathUnescape(commandName)
if err != nil {
lc.Errorf("Failed to unescape command name '%s': %s", commandName, err.Error())
lc.Warn("Not publishing error message back due to insufficient information on response topic")
Expand All @@ -132,7 +143,7 @@ func commandRequestHandler(requestTimeout time.Duration, dic *di.Container) mqtt
internalBaseTopic := container.ConfigurationFrom(dic.Get).MessageBus.GetBaseTopicPrefix()
topicPrefix := common.BuildTopic(internalBaseTopic, common.CoreCommandDeviceRequestPublishTopic)

deviceServiceName, deviceRequestTopic, err := validateRequestTopic(topicPrefix, deviceName, commandName, method, dic)
deviceServiceName, err := retrieveServiceNameByDevice(unescapedDeviceName, dic)
if err != nil {
responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, err.Error())
publishMessage(client, externalResponseTopic, qos, retain, responseEnvelope, lc)
Expand All @@ -146,7 +157,9 @@ func commandRequestHandler(requestTimeout time.Duration, dic *di.Container) mqtt
return
}

deviceResponseTopicPrefix := common.BuildTopic(internalBaseTopic, common.ResponseTopic, deviceServiceName)
// escape again to ensure that the topic is valid in the internal message bus
deviceRequestTopic := common.BuildTopic(topicPrefix, common.URLEncode(deviceServiceName), common.URLEncode(unescapedDeviceName), common.URLEncode(unescapedCommandName), method)
deviceResponseTopicPrefix := common.BuildTopic(internalBaseTopic, common.ResponseTopic, common.URLEncode(deviceServiceName))

lc.Debugf("Sending Command request to internal MessageBus. Topic: %s, Request-id: %s Correlation-id: %s", deviceRequestTopic, requestEnvelope.RequestID, requestEnvelope.CorrelationID)
lc.Debugf("Expecting response on topic: %s/%s", deviceResponseTopicPrefix, requestEnvelope.RequestID)
Expand Down
21 changes: 17 additions & 4 deletions internal/core/command/controller/messaging/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,14 @@ func processDeviceCommandRequest(

// expected internal command request/response topic scheme: #/<device>/<command-name>/<method>
deviceName := topicLevels[length-3]
commandName, err := url.QueryUnescape(topicLevels[length-2])
unescapedDeviceName, err := url.PathUnescape(deviceName)
if err != nil {
lc.Errorf("Failed to unescape device name '%s': %s", deviceName, err.Error())
lc.Warn("Not publishing error message back due to insufficient information on response topic")
return
}
commandName := topicLevels[length-2]
unescapedCommandName, err := url.PathUnescape(commandName)
if err != nil {
err = fmt.Errorf("failed to unescape command name '%s': %s", commandName, err.Error())
lc.Error(err.Error())
Expand All @@ -124,7 +131,7 @@ func processDeviceCommandRequest(

topicPrefix := common.BuildTopic(baseTopic, common.CoreCommandDeviceRequestPublishTopic)
// internal command request topic scheme: <DeviceRequestTopicPrefix>/<device-service>/<device>/<command-name>/<method>
deviceServiceName, deviceRequestTopic, err := validateRequestTopic(topicPrefix, deviceName, commandName, method, dic)
deviceServiceName, err := retrieveServiceNameByDevice(unescapedDeviceName, dic)
if err != nil {
err = fmt.Errorf("invalid request topic: %s", err.Error())
lc.Error(err.Error())
Expand All @@ -147,7 +154,8 @@ func processDeviceCommandRequest(
return
}

deviceResponseTopicPrefix := common.BuildTopic(baseTopic, common.ResponseTopic, deviceServiceName)
deviceRequestTopic := common.BuildTopic(topicPrefix, common.URLEncode(deviceServiceName), common.URLEncode(unescapedDeviceName), common.URLEncode(unescapedCommandName), method)
deviceResponseTopicPrefix := common.BuildTopic(baseTopic, common.ResponseTopic, common.URLEncode(deviceServiceName))

lc.Debugf("Sending Command Device Request to internal MessageBus. Topic: %s, Correlation-id: %s", deviceRequestTopic, requestEnvelope.CorrelationID)
lc.Debugf("Expecting response on topic: %s/%s", deviceResponseTopicPrefix, requestEnvelope.RequestID)
Expand Down Expand Up @@ -228,7 +236,12 @@ func processCommandQueryRequest(
// example topic scheme: /commandquery/request/<device>
// deviceName is expected to be at last topic level.
topicLevels := strings.Split(requestEnvelope.ReceivedTopic, "/")
deviceName := topicLevels[len(topicLevels)-1]
deviceName, err := url.PathUnescape(topicLevels[len(topicLevels)-1])
if err != nil {
lc.Errorf("Failed to unescape device name '%s': %s", deviceName, err.Error())
lc.Warn("Not publishing error message back due to insufficient information on response topic")
return
}
if strings.EqualFold(deviceName, common.All) {
deviceName = common.All
}
Expand Down
4 changes: 2 additions & 2 deletions internal/core/command/controller/messaging/internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ func TestSubscribeCommandRequests(t *testing.T) {
expectedDevice := "device1"
expectedResource := "resource"
expectedMethod := "get"
expectedDeviceResponseTopicPrefix := strings.Join([]string{expectedResponseTopicPrefix, expectedServiceName}, "/")
expectedDeviceResponseTopicPrefix := strings.Join([]string{expectedResponseTopicPrefix, common.URLEncode(expectedServiceName)}, "/")
expectedCommandResponseTopic := strings.Join([]string{expectedResponseTopicPrefix, common.CoreCommandServiceKey, expectedRequestId}, "/")
expectedCommandRequestSubscribeTopic := common.BuildTopic(baseTopic, common.CoreCommandRequestSubscribeTopic)
expectedCommandRequestReceivedTopic := common.BuildTopic(strings.Replace(expectedCommandRequestSubscribeTopic, "/#", "", 1),
expectedServiceName, expectedDevice, expectedResource, expectedMethod)
expectedDeviceCommandRequestRequestTopic := common.BuildTopic(baseTopic, common.CoreCommandDeviceRequestPublishTopic, expectedServiceName, expectedDevice, expectedResource, expectedMethod)
expectedDeviceCommandRequestRequestTopic := common.BuildTopic(baseTopic, common.CoreCommandDeviceRequestPublishTopic, common.URLEncode(expectedServiceName), common.URLEncode(expectedDevice), common.URLEncode(expectedResource), expectedMethod)
mockLogger := &lcMocks.LoggingClient{}
mockDeviceClient := &mocks2.DeviceClient{}
mockDeviceProfileClient := &mocks2.DeviceProfileClient{}
Expand Down
19 changes: 8 additions & 11 deletions internal/core/command/controller/messaging/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,32 +24,29 @@ import (
"github.com/edgexfoundry/edgex-go/internal/core/command/application"
)

// validateRequestTopic validates the request topic by checking the existence of device and device service,
// returns the internal device request topic and service name to which the command request will be sent.
func validateRequestTopic(prefix string, deviceName string, commandName string, method string, dic *di.Container) (string, string, error) {
// retrieveServiceNameByDevice validates the existence of device and device service,
// returns the service name to which the command request will be sent.
func retrieveServiceNameByDevice(deviceName string, dic *di.Container) (string, error) {
// retrieve device information through Metadata DeviceClient
dc := bootstrapContainer.DeviceClientFrom(dic.Get)
if dc == nil {
return "", "", errors.New("nil Device Client")
return "", errors.New("nil Device Client")
}
deviceResponse, err := dc.DeviceByName(context.Background(), deviceName)
if err != nil {
return "", "", fmt.Errorf("failed to get Device by name %s: %v", deviceName, err)
return "", fmt.Errorf("failed to get Device by name %s: %v", deviceName, err)
}

// retrieve device service information through Metadata DeviceClient
dsc := bootstrapContainer.DeviceServiceClientFrom(dic.Get)
if dsc == nil {
return "", "", errors.New("nil DeviceService Client")
return "", errors.New("nil DeviceService Client")
}
deviceServiceResponse, err := dsc.DeviceServiceByName(context.Background(), deviceResponse.Device.ServiceName)
if err != nil {
return "", "", fmt.Errorf("failed to get DeviceService by name %s: %v", deviceResponse.Device.ServiceName, err)
return "", fmt.Errorf("failed to get DeviceService by name %s: %v", deviceResponse.Device.ServiceName, err)
}

// expected internal command request topic scheme: <prefix>/<device-service>/<device>/<command-name>/<method>
return deviceServiceResponse.Service.Name, common.BuildTopic(prefix, deviceServiceResponse.Service.Name, deviceName, commandName, method), nil

return deviceServiceResponse.Service.Name, nil
}

// validateGetCommandQueryParameters validates the value is valid for device service's reserved query parameters
Expand Down
3 changes: 1 addition & 2 deletions internal/core/data/application/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package application
import (
"context"
"fmt"
"net/url"
"strings"

msgTypes "github.com/edgexfoundry/go-mod-messaging/v3/pkg/types"
Expand Down Expand Up @@ -83,7 +82,7 @@ func (a *CoreDataApp) PublishEvent(data []byte, serviceName string, profileName
correlationId := correlation.FromContext(ctx)

basePrefix := configuration.MessageBus.GetBaseTopicPrefix()
publishTopic := common.BuildTopic(basePrefix, common.EventsPublishTopic, CoreDataEventTopicPrefix, serviceName, profileName, deviceName, url.QueryEscape(sourceName))
publishTopic := common.BuildTopic(basePrefix, common.EventsPublishTopic, CoreDataEventTopicPrefix, common.URLEncode(serviceName), common.URLEncode(profileName), common.URLEncode(deviceName), common.URLEncode(sourceName))
lc.Debugf("Publishing AddEventRequest to MessageBus. Topic: %s; %s: %s", publishTopic, common.CorrelationHeader, correlationId)

msgEnvelope := msgTypes.NewMessageEnvelope(data, ctx)
Expand Down
10 changes: 8 additions & 2 deletions internal/core/data/controller/messaging/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,14 @@ func validateEvent(messageTopic string, e dtos.Event) errors.EdgeX {
}

len := len(fields)
profileName := fields[len-3]
deviceName := fields[len-2]
profileName, err := url.PathUnescape(fields[len-3])
if err != nil {
return errors.NewCommonEdgeXWrapper(err)
}
deviceName, err := url.PathUnescape(fields[len-2])
if err != nil {
return errors.NewCommonEdgeXWrapper(err)
}
sourceName, err := url.PathUnescape(fields[len-1])
if err != nil {
return errors.NewCommonEdgeXWrapper(err)
Expand Down
8 changes: 4 additions & 4 deletions internal/core/metadata/application/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ func validateDeviceCallback(device dtos.Device, dic *di.Container) errors.EdgeX
}

baseTopic := configuration.MessageBus.GetBaseTopicPrefix()
requestTopic := common.BuildTopic(baseTopic, device.ServiceName, common.ValidateDeviceSubscribeTopic)
responseTopicPrefix := common.BuildTopic(baseTopic, common.ResponseTopic, device.ServiceName)
requestTopic := common.BuildTopic(baseTopic, common.URLEncode(device.ServiceName), common.ValidateDeviceSubscribeTopic)
responseTopicPrefix := common.BuildTopic(baseTopic, common.ResponseTopic, common.URLEncode(device.ServiceName))
requestEnvelope := types.NewMessageEnvelopeForRequest(requestBytes, nil)

lc.Debugf("Sending Device Validation request for device=%s, CorrelationId=%s to topic: %s", device.Name, requestEnvelope.CorrelationID, requestTopic)
Expand Down Expand Up @@ -136,13 +136,13 @@ func publishSystemEvent(eventType, action, owner string, dto any, ctx context.Co
systemEvent.Source,
systemEvent.Type,
systemEvent.Action,
systemEvent.Owner,
common.URLEncode(systemEvent.Owner),
)

if profileName != "" {
publishTopic = common.BuildTopic(
publishTopic,
profileName,
common.URLEncode(profileName),
)
}

Expand Down
4 changes: 2 additions & 2 deletions internal/core/metadata/application/notify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ func TestPublishSystemEvent(t *testing.T) {
common.CoreMetaDataServiceKey,
test.Type,
test.Action,
test.Owner,
expectedDevice.ProfileName)
common.URLEncode(test.Owner),
common.URLEncode(expectedDevice.ProfileName))
mockClient.AssertCalled(t, "Publish", mock.Anything, expectedTopic)

if test.PubError {
Expand Down

0 comments on commit 67d489b

Please sign in to comment.