From 01248ecd964cc61eb9946e465727e3cb89a41bcf Mon Sep 17 00:00:00 2001 From: lenny Date: Fri, 8 Jan 2021 13:28:01 -0700 Subject: [PATCH] feat: Add unit test for multiple subscriptions. Signed-off-by: lenny --- internal/trigger/messagebus/messaging_test.go | 89 +++++++++++++++++++ 1 file changed, 89 insertions(+) diff --git a/internal/trigger/messagebus/messaging_test.go b/internal/trigger/messagebus/messaging_test.go index 7dc9ea02e..f30e78c99 100644 --- a/internal/trigger/messagebus/messaging_test.go +++ b/internal/trigger/messagebus/messaging_test.go @@ -348,3 +348,92 @@ func TestInitializeAndProcessBackgroundMessage(t *testing.T) { } } } + +func TestInitializeAndProcessEventMultipleTopics(t *testing.T) { + config := common.ConfigurationStruct{ + Binding: common.BindingInfo{ + Type: "edgeX-meSsaGebus", + PublishTopic: "", + SubscribeTopics: "t1,t2", + }, + MessageBus: types.MessageBusConfig{ + Type: "zero", + PublishHost: types.HostInfo{ + Host: "*", + Port: 5592, + Protocol: "tcp", + }, + SubscribeHost: types.HostInfo{ + Host: "localhost", + Port: 5594, + Protocol: "tcp", + }, + }, + } + + expectedCorrelationID := "123" + expectedPayload := []byte(`{"id":"TBD","created":1485364897029,"modified":1485364897029,"origin":1471806386919,"pushed":0,"device":"livingroomthermostat","readings":[{"id":"5888dea0bd36573f4681d6f8","created":1485364896983,"modified":1485364896983,"origin":1471806386919,"pushed":0,"name":"temperature","value":"38","device":"livingroomthermostat"}]}`) + var expectedEvent models.Event + _ = json.Unmarshal(expectedPayload, &expectedEvent) + + transformWasCalled := common.AtomicBool{} + + done := make(chan bool) + transform1 := func(edgexcontext *appcontext.Context, params ...interface{}) (bool, interface{}) { + transformWasCalled.Set(true) + require.Equal(t, expectedEvent, params[0]) + done <- true + return false, nil + } + + goRuntime := &runtime.GolangRuntime{} + goRuntime.Initialize(nil, nil) + goRuntime.SetTransforms([]appcontext.AppFunction{transform1}) + trigger := Trigger{Configuration: &config, Runtime: goRuntime, EdgeXClients: common.EdgeXClients{LoggingClient: logClient}} + _, err := trigger.Initialize(&sync.WaitGroup{}, context.Background(), nil) + require.NoError(t, err) + + message := types.MessageEnvelope{ + CorrelationID: expectedCorrelationID, + Payload: expectedPayload, + ContentType: clients.ContentTypeJSON, + } + + testClientConfig := types.MessageBusConfig{ + PublishHost: types.HostInfo{ + Host: "*", + Port: 5594, + Protocol: "tcp", + }, + Type: "zero", + } + + testClient, err := messaging.NewMessageClient(testClientConfig) + require.NoError(t, err, "Unable to create to publisher") + assert.False(t, transformWasCalled.Value()) + + err = testClient.Publish(message, "t1") //transform1 should be called after this executes + require.NoError(t, err, "Failed to publish message") + + select { + case <-done: + // do nothing, just need to fall out. + case <-time.After(3 * time.Second): + // do nothing, just need to fall out. + } + require.True(t, transformWasCalled.Value(), "Transform never called") + + //send message through second topic + transformWasCalled.Set(false) + + err = testClient.Publish(message, "t2") //transform1 should be called after this executes + require.NoError(t, err, "Failed to publish message") + + select { + case <-done: + // do nothing, just need to fall out. + case <-time.After(3 * time.Second): + // do nothing, just need to fall out. + } + require.True(t, transformWasCalled.Value(), "Transform never called") +}