Skip to content

Commit

Permalink
feat: Expect V2 Event DTO from triggers.
Browse files Browse the repository at this point in the history
Triggers and appropriate pipeline functions now expect a V2 Event DTO.

V1 Event model temporarily supported by converting V1 Event to a V2 Event DTO.

closes #595

Signed-off-by: lenny <leonard.goodell@intel.com>
  • Loading branch information
lenny committed Dec 29, 2020
1 parent fb6bb54 commit ddfe917
Show file tree
Hide file tree
Showing 13 changed files with 449 additions and 291 deletions.
92 changes: 62 additions & 30 deletions appcontext/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,24 @@
package appcontext

import (
syscontext "context"
"context"
"errors"
"fmt"
"time"

"github.com/edgexfoundry/app-functions-sdk-go/internal/common"
"github.com/edgexfoundry/app-functions-sdk-go/pkg/util"

"github.com/edgexfoundry/go-mod-bootstrap/bootstrap/interfaces"
"github.com/edgexfoundry/go-mod-core-contracts/clients"
"github.com/edgexfoundry/go-mod-core-contracts/clients/command"
"github.com/edgexfoundry/go-mod-core-contracts/clients/coredata"
"github.com/edgexfoundry/go-mod-core-contracts/clients/logger"
"github.com/edgexfoundry/go-mod-core-contracts/clients/notifications"
"github.com/edgexfoundry/go-mod-core-contracts/models"
"github.com/edgexfoundry/go-mod-core-contracts/v2"
"github.com/edgexfoundry/go-mod-core-contracts/v2/dtos"
commonDTO "github.com/edgexfoundry/go-mod-core-contracts/v2/dtos/common"
"github.com/google/uuid"
)

Expand All @@ -55,7 +59,7 @@ type Context struct {
EventClient coredata.EventClient
// ValueDescriptorClient exposes Core Data's ValueDescriptor API
ValueDescriptorClient coredata.ValueDescriptorClient
// CommandClient exposes Core Commands's Command API
// CommandClient exposes Core Commands' Command API
CommandClient command.CommandClient
// NotificationsClient exposes Support Notification's Notifications API
NotificationsClient notifications.NotificationsClient
Expand All @@ -69,39 +73,39 @@ type Context struct {

// Complete is optional and provides a way to return the specified data.
// In the case of an HTTP Trigger, the data will be returned as the http response.
// In the case of the message bus trigger, the data will be placed on the specifed
// In the case of the message bus trigger, the data will be placed on the specified
// message bus publish topic and host in the configuration.
func (context *Context) Complete(output []byte) {
context.OutputData = output
func (appContext *Context) Complete(output []byte) {
appContext.OutputData = output
}

// MarkAsPushed will make a request to CoreData to mark the event that triggered the pipeline as pushed.
func (context *Context) MarkAsPushed() error {
context.LoggingClient.Debug("Marking event as pushed")
if context.EventClient == nil {
func (appContext *Context) MarkAsPushed() error {
appContext.LoggingClient.Debug("Marking event as pushed")
if appContext.EventClient == nil {
return fmt.Errorf("unable to Mark As Pushed: '%s' is missing from Clients configuration", common.CoreDataClientName)
}

if context.EventID != "" {
return context.EventClient.MarkPushed(syscontext.WithValue(syscontext.Background(), clients.CorrelationHeader, context.CorrelationID), context.EventID)
} else if context.EventChecksum != "" {
return context.EventClient.MarkPushedByChecksum(syscontext.WithValue(syscontext.Background(), clients.CorrelationHeader, context.CorrelationID), context.EventChecksum)
if appContext.EventID != "" {
return appContext.EventClient.MarkPushed(context.WithValue(context.Background(), clients.CorrelationHeader, appContext.CorrelationID), appContext.EventID)
} else if appContext.EventChecksum != "" {
return appContext.EventClient.MarkPushedByChecksum(context.WithValue(context.Background(), clients.CorrelationHeader, appContext.CorrelationID), appContext.EventChecksum)
} else {
return errors.New("No EventID or EventChecksum Provided")
}
}

// SetRetryData sets the RetryData to the specified payload to be stored for later retry
// when the pipeline function returns an error.
func (context *Context) SetRetryData(payload []byte) {
context.RetryData = payload
func (appContext *Context) SetRetryData(payload []byte) {
appContext.RetryData = payload
}

// PushToCoreData pushes the provided value as an event to CoreData using the device name and reading name that have been set. If validation is turned on in
// CoreServices then your deviceName and readingName must exist in the CoreMetadata and be properly registered in EdgeX.
func (context *Context) PushToCoreData(deviceName string, readingName string, value interface{}) (*models.Event, error) {
context.LoggingClient.Debug("Pushing to CoreData")
if context.EventClient == nil {
func (appContext *Context) PushToCoreData(deviceName string, readingName string, value interface{}) (*dtos.Event, error) {
appContext.LoggingClient.Debug("Pushing to CoreData")
if appContext.EventClient == nil {
return nil, fmt.Errorf("unable to Push To CoreData: '%s' is missing from Clients configuration", common.CoreDataClientName)
}

Expand All @@ -110,36 +114,64 @@ func (context *Context) PushToCoreData(deviceName string, readingName string, va
if err != nil {
return nil, err
}
newReading := models.Reading{
Value: string(val),
Origin: now,
Device: deviceName,
Name: readingName,

// Temporary use V1 Reading until V2 EventClient is available
// TODO: Change to use dtos.Reading
v1Reading := models.Reading{
Value: string(val),
ValueType: v2.ValueTypeString,
Origin: now,
Device: deviceName,
Name: readingName,
}

readings := make([]models.Reading, 0, 1)
readings = append(readings, newReading)
readings = append(readings, v1Reading)

newEdgeXEvent := &models.Event{
// Temporary use V1 Event until V2 EventClient is available
// TODO: Change to use dtos.Event
v1Event := &models.Event{
Device: deviceName,
Origin: now,
Readings: readings,
}

correlation := uuid.New().String()
ctx := syscontext.WithValue(syscontext.Background(), clients.CorrelationHeader, correlation)
result, err := context.EventClient.Add(ctx, newEdgeXEvent)
ctx := context.WithValue(context.Background(), clients.CorrelationHeader, correlation)
result, err := appContext.EventClient.Add(ctx, v1Event) // TODO: Update to use V2 EventClient
if err != nil {
return nil, err
}
newEdgeXEvent.ID = result
return newEdgeXEvent, nil
v1Event.ID = result

// TODO: Remove once V2 EventClient is available
v2Reading := dtos.BaseReading{
Versionable: commonDTO.NewVersionable(),
Id: v1Reading.Id,
Created: v1Reading.Created,
Origin: v1Reading.Origin,
DeviceName: v1Reading.Device,
ResourceName: v1Reading.Name,
ProfileName: "",
ValueType: v1Reading.ValueType,
SimpleReading: dtos.SimpleReading{Value: v1Reading.Value},
}

// TODO: Remove once V2 EventClient is available
v2Event := dtos.Event{
Versionable: commonDTO.NewVersionable(),
Id: result,
DeviceName: v1Event.Device,
Origin: v1Event.Origin,
Readings: []dtos.BaseReading{v2Reading},
}
return &v2Event, nil
}

// GetSecrets retrieves secrets from a secret store.
// path specifies the type or location of the secrets to retrieve.
// keys specifies the secrets which to retrieve. If no keys are provided then all the keys associated with the
// specified path will be returned.
func (context *Context) GetSecrets(path string, keys ...string) (map[string]string, error) {
return context.SecretProvider.GetSecrets(path, keys...)
func (appContext *Context) GetSecrets(path string, keys ...string) (map[string]string, error) {
return appContext.SecretProvider.GetSecrets(path, keys...)
}
40 changes: 24 additions & 16 deletions appcontext/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ import (
"github.com/edgexfoundry/go-mod-core-contracts/clients/coredata"
"github.com/edgexfoundry/go-mod-core-contracts/clients/logger"
localURL "github.com/edgexfoundry/go-mod-core-contracts/clients/urlclient/local"
"github.com/edgexfoundry/go-mod-core-contracts/models"
"github.com/edgexfoundry/go-mod-core-contracts/v2"
"github.com/edgexfoundry/go-mod-core-contracts/v2/dtos"
"github.com/edgexfoundry/go-mod-core-contracts/v2/dtos/common"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -113,27 +116,32 @@ func TestPushToCore(t *testing.T) {
EventClient: eventClient,
LoggingClient: lc,
}
newEvent := &models.Event{
ID: "newId",
Device: "device-name",
Origin: 1567802840199266000,
Readings: []models.Reading{
expectedEvent := &dtos.Event{
Versionable: common.NewVersionable(),
DeviceName: "device-name",
Readings: []dtos.BaseReading{
{
Device: "device-name",
Name: "device-resource",
Value: "value",
BinaryValue: []uint8(nil),
Versionable: common.NewVersionable(),
DeviceName: "device-name",
ResourceName: "device-resource",
ValueType: v2.ValueTypeString,
SimpleReading: dtos.SimpleReading{
Value: "value",
},
},
},
}
result, err := ctx.PushToCoreData("device-name", "device-resource", "value")
actualEvent, err := ctx.PushToCoreData("device-name", "device-resource", "value")
require.NoError(t, err)

assert.NotNil(t, result)
assert.Equal(t, newEvent.ID, result.ID)
assert.Equal(t, newEvent.Device, result.Device)
assert.Equal(t, newEvent.Readings[0].Name, result.Readings[0].Name)
assert.Equal(t, newEvent.Readings[0].Value, result.Readings[0].Value)
assert.NotNil(t, actualEvent)
assert.Equal(t, expectedEvent.ApiVersion, actualEvent.ApiVersion)
assert.Equal(t, expectedEvent.DeviceName, actualEvent.DeviceName)
assert.Equal(t, expectedEvent.Readings[0].DeviceName, actualEvent.Readings[0].DeviceName)
assert.Equal(t, expectedEvent.Readings[0].ResourceName, actualEvent.Readings[0].ResourceName)
assert.Equal(t, expectedEvent.Readings[0].Value, actualEvent.Readings[0].Value)
assert.Equal(t, expectedEvent.Readings[0].ValueType, actualEvent.Readings[0].ValueType)
assert.Equal(t, expectedEvent.Readings[0].ApiVersion, actualEvent.Readings[0].ApiVersion)
}

func TestMarkAsPushedEventId(t *testing.T) {
Expand Down
101 changes: 86 additions & 15 deletions internal/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,24 @@ import (
"net/http"
"reflect"
"strconv"
"strings"
"sync"

"github.com/edgexfoundry/go-mod-core-contracts/clients"
"github.com/edgexfoundry/go-mod-core-contracts/clients/logger"
"github.com/edgexfoundry/go-mod-core-contracts/models"
v2 "github.com/edgexfoundry/go-mod-core-contracts/v2"
"github.com/edgexfoundry/go-mod-core-contracts/v2/dtos"
commonDTO "github.com/edgexfoundry/go-mod-core-contracts/v2/dtos/common"
"github.com/fxamacker/cbor/v2"

"github.com/edgexfoundry/app-functions-sdk-go/appcontext"
"github.com/edgexfoundry/app-functions-sdk-go/internal/common"
dbInterfaces "github.com/edgexfoundry/app-functions-sdk-go/internal/store/db/interfaces"

"github.com/edgexfoundry/go-mod-bootstrap/bootstrap/interfaces"
"github.com/edgexfoundry/go-mod-core-contracts/clients"
"github.com/edgexfoundry/go-mod-core-contracts/models"

"github.com/edgexfoundry/go-mod-messaging/pkg/types"
"github.com/fxamacker/cbor/v2"
)

const unmarshalErrorMessage = "Unable to unmarshal message payload as %s"
Expand All @@ -59,7 +66,7 @@ func (gr *GolangRuntime) ProcessMessage(edgexcontext *appcontext.Context, envelo
edgexcontext.LoggingClient.Debug("Processing message: " + strconv.Itoa(len(gr.transforms)) + " Transforms")

if gr.TargetType == nil {
gr.TargetType = &models.Event{}
gr.TargetType = &dtos.Event{}
}

if reflect.TypeOf(gr.TargetType).Kind() != reflect.Ptr {
Expand All @@ -74,6 +81,8 @@ func (gr *GolangRuntime) ProcessMessage(edgexcontext *appcontext.Context, envelo
// Only set when the data is binary so function receiving it knows how to deal with it.
var contentType string

_, expectingV2Event := target.(*dtos.Event)

switch target.(type) {
case *[]byte:
target = &envelope.Payload
Expand All @@ -83,7 +92,21 @@ func (gr *GolangRuntime) ProcessMessage(edgexcontext *appcontext.Context, envelo
switch envelope.ContentType {
case clients.ContentTypeJSON:

if err := json.Unmarshal([]byte(envelope.Payload), target); err != nil {
// This block is temporary until fully switched over to V2 Event DTO
// TODO: remove once fully switched over to V2 Event DTO
if expectingV2Event {
// Check if received V1 event which will not contain the apiVersion field
if !strings.Contains(string(envelope.Payload), "apiVersion") {
var err error
target, err = gr.unmarshalV1EventToV2Event(envelope.Payload, edgexcontext.LoggingClient)
if err != nil {
return &MessageError{Err: err, ErrorCode: http.StatusBadRequest}
}
break
}
}

if err := json.Unmarshal(envelope.Payload, target); err != nil {
message := fmt.Sprintf(unmarshalErrorMessage, "JSON")
edgexcontext.LoggingClient.Error(
message, "error", err.Error(),
Expand All @@ -92,14 +115,8 @@ func (gr *GolangRuntime) ProcessMessage(edgexcontext *appcontext.Context, envelo
return &MessageError{Err: err, ErrorCode: http.StatusBadRequest}
}

event, ok := target.(*models.Event)
if ok {
// Needed for Marking event as handled
edgexcontext.EventID = event.ID
}

case clients.ContentTypeCBOR:
err := cbor.Unmarshal([]byte(envelope.Payload), target)
err := cbor.Unmarshal(envelope.Payload, target)
if err != nil {
message := fmt.Sprintf(unmarshalErrorMessage, "CBOR")
edgexcontext.LoggingClient.Error(
Expand All @@ -109,9 +126,6 @@ func (gr *GolangRuntime) ProcessMessage(edgexcontext *appcontext.Context, envelo
return &MessageError{Err: err, ErrorCode: http.StatusBadRequest}
}

// Needed for Marking event as handled
edgexcontext.EventChecksum = envelope.Checksum

default:
message := "content type for input data not supported"
edgexcontext.LoggingClient.Error(message,
Expand All @@ -122,6 +136,16 @@ func (gr *GolangRuntime) ProcessMessage(edgexcontext *appcontext.Context, envelo
}
}

if expectingV2Event {
if err := v2.Validate(target); err != nil {
edgexcontext.LoggingClient.Error(
"Event DTO failed validation", "error", err.Error(),
clients.CorrelationHeader, envelope.CorrelationID)
err = fmt.Errorf("Event DTO failed validation: %s", err.Error())
return &MessageError{Err: err, ErrorCode: http.StatusBadRequest}
}
}

edgexcontext.CorrelationID = envelope.CorrelationID

// All functions expect an object, not a pointer to an object, so must use reflection to
Expand Down Expand Up @@ -205,3 +229,50 @@ func (gr *GolangRuntime) StartStoreAndForward(

gr.storeForward.startStoreAndForwardRetryLoop(appWg, appCtx, enabledWg, enabledCtx, serviceKey, config, edgeXClients)
}

// TODO: Remove when completely switched to V2 Event DTO
func (gr *GolangRuntime) unmarshalV1EventToV2Event(payload []byte, lc logger.LoggingClient) (*dtos.Event, error) {
v1Event := models.Event{}
lc.Trace("Attempting to unmarshal V1 Event model and convert to V2 Event DTO")
if err := json.Unmarshal(payload, &v1Event); err != nil {
lc.Error("Unable to unmarshal payload as V1 Event model", "error", err)
return nil, err
}

v2Event := dtos.Event{
Versionable: commonDTO.NewVersionable(),
Id: v1Event.ID,
DeviceName: v1Event.Device,
ProfileName: "Unknown",
Created: v1Event.Created,
Origin: v1Event.Origin,
Tags: v1Event.Tags,
}

for _, v1Reading := range v1Event.Readings {
v2Reading := dtos.BaseReading{
Versionable: commonDTO.NewVersionable(),
Id: v1Reading.Id,
Created: v1Reading.Created,
Origin: v1Reading.Origin,
DeviceName: v1Reading.Device,
ResourceName: v1Reading.Name,
ProfileName: "unknown",
ValueType: v1Reading.ValueType,
}

if v1Reading.ValueType == v2.ValueTypeBinary {
v2Reading.BinaryValue = v1Reading.BinaryValue
} else {
v2Reading.Value = v1Reading.Value
}

if v1Reading.FloatEncoding == models.Base64Encoding {
// TODO: convert float value to e-notation
}

v2Event.Readings = append(v2Event.Readings, v2Reading)
}

return &v2Event, nil
}
Loading

0 comments on commit ddfe917

Please sign in to comment.