Skip to content

Commit

Permalink
refactor!: Update message bus topic wild cards (#1291)
Browse files Browse the repository at this point in the history
BREAKING CHANGE: use MQTT wild cards + for single level and # for multiple levels

Signed-off-by: Ginny Guan <ginny@iotechsys.com>
  • Loading branch information
jinlinGuan authored Feb 7, 2023
1 parent 83561e9 commit 3ca42d5
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 26 deletions.
2 changes: 1 addition & 1 deletion app-service-template/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ require (
github.com/eclipse/paho.mqtt.golang v1.4.2 // indirect
github.com/edgexfoundry/go-mod-bootstrap/v3 v3.0.0-dev.11 // indirect
github.com/edgexfoundry/go-mod-configuration/v3 v3.0.0-dev.2 // indirect
github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.4 // indirect
github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.6 // indirect
github.com/edgexfoundry/go-mod-registry/v3 v3.0.0-dev.3 // indirect
github.com/edgexfoundry/go-mod-secrets/v3 v3.0.0-dev.2 // indirect
github.com/fatih/color v1.9.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions app-service-template/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ github.com/edgexfoundry/go-mod-configuration/v3 v3.0.0-dev.2 h1:xp5MsP+qf/fuJxy8
github.com/edgexfoundry/go-mod-configuration/v3 v3.0.0-dev.2/go.mod h1:1Vv4uWAo6r7k6jUlqVJW8JOL6YKVBc6sRL8Al3DrMck=
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.6 h1:RQFs/HjVOi1X3YxJ8sm4vuX8nhKgH0caSf9RtjQvdeI=
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.6/go.mod h1:7RwSq896VqelvSU7zYKs2tpZhgELVFECkiGf6XGLKfQ=
github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.4 h1:swPZOjoQ/IUIWSJpZCmQENtP/plFRx5tgiCEZgnfxFU=
github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.4/go.mod h1:8pxuYvh2zcq1GuKqmk1MAuH1yuN40iOMmL0g2myIfwk=
github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.6 h1:JaCP/iw7ahuBCCLuZG9Z2JDDRgQa9V+lZ6ZHZtSb+yQ=
github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.6/go.mod h1:1Vtp3Zwsie1ODeF2CjHbp6Vhgjmx4URyCQ4rJHQg89I=
github.com/edgexfoundry/go-mod-registry/v3 v3.0.0-dev.3 h1:QgZF9f70Cwpvkjw3tP1aiVGHc+yNFJNzW6hO8pDs3fg=
github.com/edgexfoundry/go-mod-registry/v3 v3.0.0-dev.3/go.mod h1:2w8v0sv+i21nY+DY6JV4PFxsNTuxpGAjlNFlFMTfZkk=
github.com/edgexfoundry/go-mod-secrets/v3 v3.0.0-dev.2 h1:Lu1gJr2fAUTuogE/SwgVgGpxDlMsC4PLE0Y8oXRUvkI=
Expand Down
4 changes: 2 additions & 2 deletions app-service-template/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (app *myApp) CreateAndRunAppService(serviceKey string, newServiceFactory fu
// Note: This example with default above causes Events from Random-Float-Device device to be processed twice
// resulting in the XML to be published back to the MessageBus twice.
// See https://docs.edgexfoundry.org/latest/microservices/application/AdvancedTopics/#pipeline-per-topics for more details.
err = app.service.AddFunctionsPipelineForTopics("Floats", []string{"edgex/events/#/#/Random-Float-Device/#"},
err = app.service.AddFunctionsPipelineForTopics("Floats", []string{"edgex/events/+/+/Random-Float-Device/#"},
transforms.NewFilterFor(deviceNames).FilterByDeviceName,
sample.LogEventDetails,
sample.ConvertEventToXML,
Expand All @@ -133,7 +133,7 @@ func (app *myApp) CreateAndRunAppService(serviceKey string, newServiceFactory fu
}
// Note: This example with default above causes Events from Int32 source to be processed twice
// resulting in the XML to be published back to the MessageBus twice.
err = app.service.AddFunctionsPipelineForTopics("Int32s", []string{"edgex/events/#/#/#/Int32"},
err = app.service.AddFunctionsPipelineForTopics("Int32s", []string{"edgex/events/+/+/+/Int32"},
transforms.NewFilterFor(deviceNames).FilterByDeviceName,
sample.LogEventDetails,
sample.ConvertEventToXML,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/eclipse/paho.mqtt.golang v1.4.2
github.com/edgexfoundry/go-mod-bootstrap/v3 v3.0.0-dev.11
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.6
github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.4
github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.6
github.com/edgexfoundry/go-mod-registry/v3 v3.0.0-dev.3
github.com/fxamacker/cbor/v2 v2.4.0
github.com/gomodule/redigo v1.8.9
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ github.com/edgexfoundry/go-mod-configuration/v3 v3.0.0-dev.2 h1:xp5MsP+qf/fuJxy8
github.com/edgexfoundry/go-mod-configuration/v3 v3.0.0-dev.2/go.mod h1:1Vv4uWAo6r7k6jUlqVJW8JOL6YKVBc6sRL8Al3DrMck=
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.6 h1:RQFs/HjVOi1X3YxJ8sm4vuX8nhKgH0caSf9RtjQvdeI=
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.6/go.mod h1:7RwSq896VqelvSU7zYKs2tpZhgELVFECkiGf6XGLKfQ=
github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.4 h1:swPZOjoQ/IUIWSJpZCmQENtP/plFRx5tgiCEZgnfxFU=
github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.4/go.mod h1:8pxuYvh2zcq1GuKqmk1MAuH1yuN40iOMmL0g2myIfwk=
github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.6 h1:JaCP/iw7ahuBCCLuZG9Z2JDDRgQa9V+lZ6ZHZtSb+yQ=
github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.6/go.mod h1:1Vtp3Zwsie1ODeF2CjHbp6Vhgjmx4URyCQ4rJHQg89I=
github.com/edgexfoundry/go-mod-registry/v3 v3.0.0-dev.3 h1:QgZF9f70Cwpvkjw3tP1aiVGHc+yNFJNzW6hO8pDs3fg=
github.com/edgexfoundry/go-mod-registry/v3 v3.0.0-dev.3/go.mod h1:2w8v0sv+i21nY+DY6JV4PFxsNTuxpGAjlNFlFMTfZkk=
github.com/edgexfoundry/go-mod-secrets/v3 v3.0.0-dev.2 h1:Lu1gJr2fAUTuogE/SwgVgGpxDlMsC4PLE0Y8oXRUvkI=
Expand Down
10 changes: 7 additions & 3 deletions internal/runtime/runtime.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//
// Copyright (c) 2022 Intel Corporation
// Copyright (c) 2021 One Track Consulting
// Copyright (C) 2023 IOTech Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -48,8 +49,9 @@ import (
)

const (
TopicWildCard = "#"
TopicLevelSeparator = "/"
TopicWildCard = "#"
TopicSingleLevelWildcard = "+"
TopicLevelSeparator = "/"
)

func NewFunctionPipeline(id string, topics []string, transforms []interfaces.AppFunction) interfaces.FunctionPipeline {
Expand Down Expand Up @@ -484,7 +486,7 @@ func topicMatches(incomingTopic string, pipelineTopics []string) bool {
return true
}

wildcardCount := strings.Count(pipelineTopic, TopicWildCard)
wildcardCount := strings.Count(pipelineTopic, TopicWildCard) + strings.Count(pipelineTopic, TopicSingleLevelWildcard)
switch wildcardCount {
case 0:
if incomingTopic == pipelineTopic {
Expand All @@ -501,6 +503,8 @@ func topicMatches(incomingTopic string, pipelineTopics []string) bool {
for index, level := range pipelineLevels {
if level == TopicWildCard {
incomingLevels[index] = TopicWildCard
} else if level == TopicSingleLevelWildcard {
incomingLevels[index] = TopicSingleLevelWildcard
}
}

Expand Down
31 changes: 16 additions & 15 deletions internal/runtime/runtime_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright (c) 2022 Intel Corporation
// Copyright (c) 2021 One Track Consulting
// Copyright (C) 2023 IOTech Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -561,29 +562,29 @@ func TestTopicMatches(t *testing.T) {
{"Match - Default all", incomingTopic, []string{TopicWildCard}, true},
{"Match - Not First Topic", incomingTopic, []string{"not-edgex/#", TopicWildCard}, true},
{"Match - Exact", incomingTopic, []string{incomingTopic}, true},
{"Match - Any Profile for Device and Source", incomingTopic, []string{"edgex/events/#/D/S"}, true},
{"Match - Any Profile for Device and Source", incomingTopic, []string{"edgex/events/#/D/S"}, true},
{"Match - Any Device for Profile and Source", incomingTopic, []string{"edgex/events/P/#/S"}, true},
{"Match - Any Profile for Device and Source", incomingTopic, []string{"edgex/events/+/D/S"}, true},
{"Match - Any Profile for Device and Source", incomingTopic, []string{"edgex/events/+/D/S"}, true},
{"Match - Any Device for Profile and Source", incomingTopic, []string{"edgex/events/P/+/S"}, true},
{"Match - Any Source for Profile and Device", incomingTopic, []string{"edgex/events/P/D/#"}, true},
{"Match - All Events ", incomingTopic, []string{"edgex/events/#"}, true},
{"Match - First Topic Deeper ", incomingTopic, []string{"edgex/events/P/D/S/Z", "edgex/events/#"}, true},
{"Match - All Devices and Sources for Profile ", incomingTopic, []string{"edgex/events/P/#"}, true},
{"Match - All Sources for Profile and Device ", incomingTopic, []string{"edgex/events/P/D/#"}, true},
{"Match - All Sources for a Device for any Profile ", incomingTopic, []string{"edgex/events/#/D/#"}, true},
{"Match - Source for any Profile and any Device ", incomingTopic, []string{"edgex/events/#/#/S"}, true},
{"NoMatch - SourceX for any Profile and any Device ", incomingTopic, []string{"edgex/events/#/#/Sx"}, false},
{"NoMatch - All Sources for DeviceX and any Profile ", incomingTopic, []string{"edgex/events/#/Dx/#"}, false},
{"Match - All Sources for a Device for any Profile ", incomingTopic, []string{"edgex/events/+/D/#"}, true},
{"Match - Source for any Profile and any Device ", incomingTopic, []string{"edgex/events/+/+/S"}, true},
{"NoMatch - SourceX for any Profile and any Device ", incomingTopic, []string{"edgex/events/+/+/Sx"}, false},
{"NoMatch - All Sources for DeviceX and any Profile ", incomingTopic, []string{"edgex/events/+/Dx/#"}, false},
{"NoMatch - All Sources for ProfileX and Device ", incomingTopic, []string{"edgex/events/Px/D/#"}, false},
{"NoMatch - All Sources for Profile and DeviceX ", incomingTopic, []string{"edgex/events/P/Dx/#"}, false},
{"NoMatch - All Sources for ProfileX and DeviceX ", incomingTopic, []string{"edgex/events/Px/Dx/#"}, false},
{"NoMatch - All Devices and Sources for ProfileX ", incomingTopic, []string{"edgex/events/Px/#"}, false},
{"NoMatch - Any Profile for DeviceX and Source", incomingTopic, []string{"edgex/events/#/Dx/S"}, false},
{"NoMatch - Any Profile for DeviceX and Source", incomingTopic, []string{"edgex/events/#/Dx/S"}, false},
{"NoMatch - Any Profile for Device and SourceX", incomingTopic, []string{"edgex/events/#/D/Sx"}, false},
{"NoMatch - Any Profile for DeviceX and SourceX", incomingTopic, []string{"edgex/events/#/Dx/Sx"}, false},
{"NoMatch - Any Device for Profile and SourceX", incomingTopic, []string{"edgex/events/P/#/Sx"}, false},
{"NoMatch - Any Device for ProfileX and Source", incomingTopic, []string{"edgex/events/Px/#/S"}, false},
{"NoMatch - Any Device for ProfileX and SourceX", incomingTopic, []string{"edgex/events/Px/#/Sx"}, false},
{"NoMatch - Any Profile for DeviceX and Source", incomingTopic, []string{"edgex/events/+/Dx/S"}, false},
{"NoMatch - Any Profile for DeviceX and Source", incomingTopic, []string{"edgex/events/+/Dx/S"}, false},
{"NoMatch - Any Profile for Device and SourceX", incomingTopic, []string{"edgex/events/+/D/Sx"}, false},
{"NoMatch - Any Profile for DeviceX and SourceX", incomingTopic, []string{"edgex/events/+/Dx/Sx"}, false},
{"NoMatch - Any Device for Profile and SourceX", incomingTopic, []string{"edgex/events/P/+/Sx"}, false},
{"NoMatch - Any Device for ProfileX and Source", incomingTopic, []string{"edgex/events/Px/+/S"}, false},
{"NoMatch - Any Device for ProfileX and SourceX", incomingTopic, []string{"edgex/events/Px/+/Sx"}, false},
{"NoMatch - Any Source for ProfileX and Device", incomingTopic, []string{"edgex/events/Px/D/#"}, false},
{"NoMatch - Any Source for Profile and DeviceX", incomingTopic, []string{"edgex/events/P/Dx/#"}, false},
{"NoMatch - Any Source for ProfileX and DeviceX", incomingTopic, []string{"edgex/events/Px/Dx/#"}, false},
Expand Down Expand Up @@ -638,7 +639,7 @@ func TestGetMatchingPipelines(t *testing.T) {
transforms.NewResponseData().SetResponseData,
}

err := target.AddFunctionsPipeline("one", []string{"edgex/events/#/D1/#"}, expectedTransforms)
err := target.AddFunctionsPipeline("one", []string{"edgex/events/+/D1/#"}, expectedTransforms)
require.NoError(t, err)
err = target.AddFunctionsPipeline("two", []string{"edgex/events/P1/#"}, expectedTransforms)
require.NoError(t, err)
Expand Down

0 comments on commit 3ca42d5

Please sign in to comment.