Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add ability to wrap data into an event #1154

Merged
merged 3 commits into from
Jul 19, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 66 additions & 2 deletions internal/app/configurable.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (c) 2021 Intel Corporation
// Copyright (c) 2022 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -24,7 +24,6 @@ import (
"github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/interfaces"
"github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/transforms"
"github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/util"

"github.com/edgexfoundry/go-mod-core-contracts/v2/clients/logger"
"github.com/edgexfoundry/go-mod-core-contracts/v2/common"
)
Expand Down Expand Up @@ -257,6 +256,71 @@ func (app *Configurable) PushToCore(parameters map[string]string) interfaces.App
return transform.PushToCoreData
}

// WrapIntoEvent wraps the provided value as an event to CoreData using the configured event/reading metadata that have been
ejlee3 marked this conversation as resolved.
Show resolved Hide resolved
// set. The new Event/Reading is returned to the next pipeline function. This function is a configuration function and
// returns a function pointer.
func (app *Configurable) WrapIntoEvent(parameters map[string]string) interfaces.AppFunction {
profileName, ok := parameters[ProfileName]
if !ok {
app.lc.Errorf("Could not find %s", ProfileName)
return nil
}
deviceName, ok := parameters[DeviceName]
if !ok {
app.lc.Errorf("Could not find %s", DeviceName)
return nil
}
resourceName, ok := parameters[ResourceName]
if !ok {
app.lc.Errorf("Could not find %s", ResourceName)
return nil
}
valueType, ok := parameters[ValueType]
if !ok {
app.lc.Errorf("Could not find %s", ValueType)
return nil
}

profileName = strings.TrimSpace(profileName)
deviceName = strings.TrimSpace(deviceName)
resourceName = strings.TrimSpace(resourceName)
valueType = strings.TrimSpace(valueType)

var transform *transforms.EventWrapper

// Converts to upper case and validates it is a valid ValueType
valueType, err := common.NormalizeValueType(valueType)
if err != nil {
app.lc.Error(err.Error())
return nil
}

switch valueType {
case common.ValueTypeBinary:
mediaType, ok := parameters[MediaType]
if !ok {
app.lc.Error("Could not find " + MediaType)
return nil
}

mediaType = strings.TrimSpace(mediaType)

if len(mediaType) == 0 {
app.lc.Error("MediaType can not be empty when ValueType=Binary")
return nil
}

transform = transforms.NewEventWrapperBinaryReading(profileName, deviceName, resourceName, mediaType)
case common.ValueTypeObject:
transform = transforms.NewEventWrapperObjectReading(profileName, deviceName, resourceName)

default:
transform = transforms.NewEventWrapperSimpleReading(profileName, deviceName, resourceName, valueType)
}

return transform.Wrap
}

// Compress compresses data received as either a string,[]byte, or json.Marshaller using the specified algorithm (GZIP or ZLIB)
// and returns a base64 encoded string as a []byte.
// This function is a configuration function and returns a function pointer.
Expand Down
120 changes: 120 additions & 0 deletions pkg/transforms/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
//
// Copyright (c) 2022 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package transforms

import (
"fmt"

"github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/interfaces"
"github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/util"
"github.com/edgexfoundry/go-mod-core-contracts/v2/dtos/requests"

"github.com/edgexfoundry/go-mod-core-contracts/v2/common"
"github.com/edgexfoundry/go-mod-core-contracts/v2/dtos"
)

type EventWrapper struct {
profileName string
deviceName string
resourceName string
valueType string
mediaType string
}

// NewEventWrapperSimpleReading is provided to interact with EventWrapper to add a simple reading
func NewEventWrapperSimpleReading(profileName string, deviceName string, resourceName string, valueType string) *EventWrapper {
eventWrapper := &EventWrapper{
profileName: profileName,
deviceName: deviceName,
resourceName: resourceName,
valueType: valueType,
}
return eventWrapper
}

// NewEventWrapperBinaryReading is provided to interact with EventWrapper to add a binary reading
func NewEventWrapperBinaryReading(profileName string, deviceName string, resourceName string, mediaType string) *EventWrapper {
eventWrapper := &EventWrapper{
profileName: profileName,
deviceName: deviceName,
resourceName: resourceName,
valueType: common.ValueTypeBinary,
mediaType: mediaType,
}
return eventWrapper
}

// NewEventWrapperObjectReading is provided to interact with EventWrapper to add a object reading type
func NewEventWrapperObjectReading(profileName string, deviceName string, resourceName string) *EventWrapper {
eventWrapper := &EventWrapper{
profileName: profileName,
deviceName: deviceName,
resourceName: resourceName,
valueType: common.ValueTypeObject,
}
return eventWrapper
}

// // Wrap creates an EventRequest using the Event/Reading metadata that have been set.
func (ew *EventWrapper) Wrap(ctx interfaces.AppFunctionContext, data interface{}) (bool, interface{}) {
ctx.LoggingClient().Info("Creating Event...")

if data == nil {
return false, fmt.Errorf("function EventWrapper in pipeline '%s': No Data Received", ctx.PipelineId())
}

event := dtos.NewEvent(ew.profileName, ew.deviceName, ew.resourceName)

switch ew.valueType {
case common.ValueTypeBinary:
reading, err := util.CoerceType(data)
if err != nil {
return false, err
}
event.AddBinaryReading(ew.resourceName, reading, ew.mediaType)

case common.ValueTypeString:
reading, err := util.CoerceType(data)
if err != nil {
return false, err
}
err = event.AddSimpleReading(ew.resourceName, ew.valueType, string(reading))
if err != nil {
return false, fmt.Errorf("error adding Reading in pipeline '%s': %s", ctx.PipelineId(), err.Error())
}

case common.ValueTypeObject:
event.AddObjectReading(ew.resourceName, data)

default:
err := event.AddSimpleReading(ew.resourceName, ew.valueType, data)
if err != nil {
return false, fmt.Errorf("error adding Reading in pipeline '%s': %s", ctx.PipelineId(), err.Error())
}
}

// unsetting content type to send back as event
ctx.SetResponseContentType("")
ctx.AddValue(interfaces.PROFILENAME, ew.profileName)
ctx.AddValue(interfaces.DEVICENAME, ew.deviceName)
ctx.AddValue(interfaces.SOURCENAME, ew.resourceName)

// need to wrap in Add Event Request for Core Data to process it if published to the MessageBus
eventRequest := requests.NewAddEventRequest(event)

return true, eventRequest
}
92 changes: 92 additions & 0 deletions pkg/transforms/event_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
//
// Copyright (c) 2022 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package transforms

import (
"strconv"
"testing"

"github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/interfaces"
"github.com/edgexfoundry/go-mod-core-contracts/v2/common"
"github.com/edgexfoundry/go-mod-core-contracts/v2/dtos/requests"
"github.com/stretchr/testify/require"
)

func TestEventWrapper_Wrap(t *testing.T) {
obj := struct {
myInt int
myStr string
}{
myInt: 3,
myStr: "hello world",
}
tests := []struct {
Name string
ProfileName string
DeviceName string
ResourceName string
ValueType string
MediaType string
Data interface{}
ExpectedError interface{}
}{
{"Successful Binary Reading", "MyProfile", "MyDevice", "BinaryEvent", common.ValueTypeBinary, "stuff", true, ""},
{"Successful Object Reading", "MyProfile", "MyDevice", "ObjectEvent", common.ValueTypeObject, "", obj, ""},
{"Successful Simple Reading", "MyProfile", "MyDevice", "ObjectEvent", common.ValueTypeString, "", "hello there", ""},
}

for _, test := range tests {
var transform *EventWrapper
switch test.ValueType {
case common.ValueTypeBinary:
transform = NewEventWrapperBinaryReading(test.ProfileName, test.DeviceName, test.ResourceName, test.MediaType)
case common.ValueTypeObject:
transform = NewEventWrapperObjectReading(test.ProfileName, test.DeviceName, test.ResourceName)
default:
transform = NewEventWrapperSimpleReading(test.ProfileName, test.DeviceName, test.ResourceName, test.ValueType)
}
actualBool, actualInterface := transform.Wrap(ctx, test.Data)
if test.ExpectedError == "" {
require.True(t, actualBool)
require.Equal(t, "", ctx.ResponseContentType())
ctxValues := ctx.GetAllValues()
require.Equal(t, test.ProfileName, ctxValues[interfaces.PROFILENAME])
require.Equal(t, test.DeviceName, ctxValues[interfaces.DEVICENAME])
require.Equal(t, test.ResourceName, ctxValues[interfaces.SOURCENAME])
eventRequest := actualInterface.(requests.AddEventRequest)
require.Equal(t, test.DeviceName, eventRequest.Event.DeviceName)
require.Equal(t, test.ProfileName, eventRequest.Event.ProfileName)
require.Equal(t, test.ResourceName, eventRequest.Event.SourceName)
require.Equal(t, test.DeviceName, eventRequest.Event.Readings[0].DeviceName)
require.Equal(t, test.ProfileName, eventRequest.Event.Readings[0].ProfileName)
require.Equal(t, test.ResourceName, eventRequest.Event.Readings[0].ResourceName)
switch test.ValueType {
case common.ValueTypeBinary:
value, err := strconv.ParseBool(string(eventRequest.Event.Readings[0].BinaryReading.BinaryValue))
require.NoError(t, err)
require.Equal(t, test.Data, value)
case common.ValueTypeObject:
require.Equal(t, test.Data, eventRequest.Event.Readings[0].ObjectReading.ObjectValue)
default:
require.Equal(t, test.Data, eventRequest.Event.Readings[0].SimpleReading.Value)
}
return
}
require.False(t, actualBool)
require.Equal(t, test.ExpectedError, actualInterface)
}
}