Skip to content

Commit

Permalink
Merge pull request #1356 from hahattan/issue-1271
Browse files Browse the repository at this point in the history
feat: publish event with updated value for PUT command
  • Loading branch information
cloudxxx8 authored Mar 13, 2023
2 parents 84b3eaa + 420f04f commit 2dcc4a6
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 45 deletions.
68 changes: 40 additions & 28 deletions internal/application/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,33 +63,34 @@ func GetCommand(ctx context.Context, deviceName string, commandName string, quer
return res, nil
}

func SetCommand(ctx context.Context, deviceName string, commandName string, queryParams string, requests map[string]any, dic *di.Container) errors.EdgeX {
func SetCommand(ctx context.Context, deviceName string, commandName string, queryParams string, requests map[string]any, dic *di.Container) (*dtos.Event, errors.EdgeX) {
if deviceName == "" {
return errors.NewCommonEdgeX(errors.KindContractInvalid, "device name is empty", nil)
return nil, errors.NewCommonEdgeX(errors.KindContractInvalid, "device name is empty", nil)
}
if commandName == "" {
return errors.NewCommonEdgeX(errors.KindContractInvalid, "command is empty", nil)
return nil, errors.NewCommonEdgeX(errors.KindContractInvalid, "command is empty", nil)
}

device, err := validateServiceAndDeviceState(deviceName, dic)
if err != nil {
return errors.NewCommonEdgeXWrapper(err)
return nil, errors.NewCommonEdgeXWrapper(err)
}

var event *dtos.Event
_, cmdExist := cache.Profiles().DeviceCommand(device.ProfileName, commandName)
if cmdExist {
err = writeDeviceCommand(device, commandName, queryParams, requests, dic)
event, err = writeDeviceCommand(device, commandName, queryParams, requests, dic)
} else {
err = writeDeviceResource(device, commandName, queryParams, requests, dic)
event, err = writeDeviceResource(device, commandName, queryParams, requests, dic)
}

if err != nil {
return errors.NewCommonEdgeXWrapper(err)
return nil, errors.NewCommonEdgeXWrapper(err)
}

lc := bootstrapContainer.LoggingClientFrom(dic.Get)
lc.Debugf("SET Device Command successfully. Device: %s, Source: %s, %s: %s", deviceName, commandName, common.CorrelationHeader, utils.FromContext(ctx, common.CorrelationHeader))
return nil
return event, nil
}

func readDeviceResource(device models.Device, resourceName string, attributes string, dic *di.Container) (res *dtos.Event, edgexErr errors.EdgeX) {
Expand Down Expand Up @@ -128,7 +129,8 @@ func readDeviceResource(device models.Device, resourceName string, attributes st
}

// convert CommandValue to Event
res, edgexErr = transformer.CommandValuesToEventDTO(results, device.Name, dr.Name, dic)
configuration := container.ConfigurationFrom(dic.Get)
res, edgexErr = transformer.CommandValuesToEventDTO(results, device.Name, dr.Name, configuration.Device.DataTransform, dic)
if edgexErr != nil {
return res, errors.NewCommonEdgeX(errors.KindServerError, "failed to convert CommandValue to Event", err)
}
Expand Down Expand Up @@ -185,24 +187,24 @@ func readDeviceCommand(device models.Device, commandName string, attributes stri
}

// convert CommandValue to Event
res, edgexErr = transformer.CommandValuesToEventDTO(results, device.Name, dc.Name, dic)
res, edgexErr = transformer.CommandValuesToEventDTO(results, device.Name, dc.Name, configuration.Device.DataTransform, dic)
if edgexErr != nil {
return res, errors.NewCommonEdgeX(errors.KindServerError, "failed to transform CommandValue to Event", edgexErr)
}

return res, nil
}

func writeDeviceResource(device models.Device, resourceName string, attributes string, requests map[string]any, dic *di.Container) errors.EdgeX {
func writeDeviceResource(device models.Device, resourceName string, attributes string, requests map[string]any, dic *di.Container) (*dtos.Event, errors.EdgeX) {
dr, ok := cache.Profiles().DeviceResource(device.ProfileName, resourceName)
if !ok {
errMsg := fmt.Sprintf("deviceResource %s not found", resourceName)
return errors.NewCommonEdgeX(errors.KindEntityDoesNotExist, errMsg, nil)
return nil, errors.NewCommonEdgeX(errors.KindEntityDoesNotExist, errMsg, nil)
}
// check deviceResource is not read-only
if dr.Properties.ReadWrite == common.ReadWrite_R {
errMsg := fmt.Sprintf("deviceResource %s is marked as read-only", dr.Name)
return errors.NewCommonEdgeX(errors.KindNotAllowed, errMsg, nil)
return nil, errors.NewCommonEdgeX(errors.KindNotAllowed, errMsg, nil)
}

// check set parameters contains provided deviceResource
Expand All @@ -212,14 +214,14 @@ func writeDeviceResource(device models.Device, resourceName string, attributes s
v = dr.Properties.DefaultValue
} else {
errMsg := fmt.Sprintf("deviceResource %s not found in request body and no default value defined", dr.Name)
return errors.NewCommonEdgeX(errors.KindServerError, errMsg, nil)
return nil, errors.NewCommonEdgeX(errors.KindServerError, errMsg, nil)
}
}

// create CommandValue
cv, edgexErr := createCommandValueFromDeviceResource(dr, v)
if edgexErr != nil {
return errors.NewCommonEdgeX(errors.Kind(edgexErr), "failed to create CommandValue", edgexErr)
return nil, errors.NewCommonEdgeX(errors.Kind(edgexErr), "failed to create CommandValue", edgexErr)
}

// prepare CommandRequest
Expand All @@ -239,7 +241,7 @@ func writeDeviceResource(device models.Device, resourceName string, attributes s
if configuration.Device.DataTransform {
edgexErr = transformer.TransformWriteParameter(cv, dr.Properties)
if edgexErr != nil {
return errors.NewCommonEdgeX(errors.KindContractInvalid, "failed to transform set parameter", edgexErr)
return nil, errors.NewCommonEdgeX(errors.KindContractInvalid, "failed to transform set parameter", edgexErr)
}
}

Expand All @@ -248,28 +250,33 @@ func writeDeviceResource(device models.Device, resourceName string, attributes s
err := driver.HandleWriteCommands(device.Name, device.Protocols, reqs, []*sdkModels.CommandValue{cv})
if err != nil {
errMsg := fmt.Sprintf("error writing DeviceResource %s for %s", dr.Name, device.Name)
return errors.NewCommonEdgeX(errors.KindServerError, errMsg, err)
return nil, errors.NewCommonEdgeX(errors.KindServerError, errMsg, err)
}

return nil
// Updated resource value will be published to MessageBus as long as it's not write-only
if dr.Properties.ReadWrite != common.ReadWrite_W {
return transformer.CommandValuesToEventDTO([]*sdkModels.CommandValue{cv}, device.Name, resourceName, false, dic)
}

return nil, nil
}

func writeDeviceCommand(device models.Device, commandName string, attributes string, requests map[string]any, dic *di.Container) errors.EdgeX {
func writeDeviceCommand(device models.Device, commandName string, attributes string, requests map[string]any, dic *di.Container) (*dtos.Event, errors.EdgeX) {
dc, ok := cache.Profiles().DeviceCommand(device.ProfileName, commandName)
if !ok {
errMsg := fmt.Sprintf("deviceCommand %s not found", commandName)
return errors.NewCommonEdgeX(errors.KindEntityDoesNotExist, errMsg, nil)
return nil, errors.NewCommonEdgeX(errors.KindEntityDoesNotExist, errMsg, nil)
}
// check deviceCommand is not read-only
if dc.ReadWrite == common.ReadWrite_R {
errMsg := fmt.Sprintf("deviceCommand %s is marked as read-only", dc.Name)
return errors.NewCommonEdgeX(errors.KindNotAllowed, errMsg, nil)
return nil, errors.NewCommonEdgeX(errors.KindNotAllowed, errMsg, nil)
}
// check ResourceOperation count does not exceed MaxCmdOps defined in configuration
configuration := container.ConfigurationFrom(dic.Get)
if len(dc.ResourceOperations) > configuration.Device.MaxCmdOps {
errMsg := fmt.Sprintf("SET command %s exceed device %s MaxCmdOps (%d)", dc.Name, device.Name, configuration.Device.MaxCmdOps)
return errors.NewCommonEdgeX(errors.KindServerError, errMsg, nil)
return nil, errors.NewCommonEdgeX(errors.KindServerError, errMsg, nil)
}

// create CommandValues
Expand All @@ -280,7 +287,7 @@ func writeDeviceCommand(device models.Device, commandName string, attributes str
dr, ok := cache.Profiles().DeviceResource(device.ProfileName, drName)
if !ok {
errMsg := fmt.Sprintf("deviceResource %s in SET commnd %s for %s not defined", drName, dc.Name, device.Name)
return errors.NewCommonEdgeX(errors.KindServerError, errMsg, nil)
return nil, errors.NewCommonEdgeX(errors.KindServerError, errMsg, nil)
}

// check request body contains the deviceResource
Expand All @@ -292,7 +299,7 @@ func writeDeviceCommand(device models.Device, commandName string, attributes str
value = dr.Properties.DefaultValue
} else {
errMsg := fmt.Sprintf("deviceResource %s not found in request body and no default value defined", dr.Name)
return errors.NewCommonEdgeX(errors.KindServerError, errMsg, nil)
return nil, errors.NewCommonEdgeX(errors.KindServerError, errMsg, nil)
}
}

Expand All @@ -312,7 +319,7 @@ func writeDeviceCommand(device models.Device, commandName string, attributes str
if err == nil {
cvs = append(cvs, cv)
} else {
return errors.NewCommonEdgeX(errors.Kind(err), "failed to create CommandValue", err)
return nil, errors.NewCommonEdgeX(errors.Kind(err), "failed to create CommandValue", err)
}
}

Expand All @@ -335,7 +342,7 @@ func writeDeviceCommand(device models.Device, commandName string, attributes str
if configuration.Device.DataTransform {
err := transformer.TransformWriteParameter(cv, dr.Properties)
if err != nil {
return errors.NewCommonEdgeX(errors.KindContractInvalid, "failed to transform set parameter", err)
return nil, errors.NewCommonEdgeX(errors.KindContractInvalid, "failed to transform set parameter", err)
}
}
}
Expand All @@ -345,10 +352,15 @@ func writeDeviceCommand(device models.Device, commandName string, attributes str
err := driver.HandleWriteCommands(device.Name, device.Protocols, reqs, cvs)
if err != nil {
errMsg := fmt.Sprintf("error writing DeviceCommand %s for %s", dc.Name, device.Name)
return errors.NewCommonEdgeX(errors.KindServerError, errMsg, err)
return nil, errors.NewCommonEdgeX(errors.KindServerError, errMsg, err)
}

// Updated resource(s) value will be published to MessageBus as long as they're not write-only
if dc.ReadWrite != common.ReadWrite_W {
return transformer.CommandValuesToEventDTO(cvs, device.Name, commandName, false, dic)
}

return nil
return nil, nil
}

func validateServiceAndDeviceState(deviceName string, dic *di.Container) (models.Device, errors.EdgeX) {
Expand Down
3 changes: 2 additions & 1 deletion internal/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ package common

import (
"context"
"net/url"

"github.com/edgexfoundry/device-sdk-go/v3/internal/cache"
"github.com/edgexfoundry/device-sdk-go/v3/internal/container"
"net/url"

bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/container"
bootstrapInterfaces "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/interfaces"
Expand Down
9 changes: 7 additions & 2 deletions internal/controller/http/command.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// -*- Mode: Go; indent-tabs-mode: t -*-
//
// Copyright (C) 2020-2022 IOTech Ltd
// Copyright (C) 2020-2023 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

Expand Down Expand Up @@ -83,12 +83,17 @@ func (c *RestController) SetCommand(w http.ResponseWriter, r *http.Request) {
return
}

err = application.SetCommand(ctx, deviceName, commandName, queryParams, requestParamsMap, c.dic)
event, err := application.SetCommand(ctx, deviceName, commandName, queryParams, requestParamsMap, c.dic)
if err != nil {
c.sendEdgexError(w, r, err, common.ApiDeviceNameCommandNameRoute)
return
}

if event != nil {
correlationId := utils.FromContext(ctx, common.CorrelationHeader)
go sdkCommon.SendEvent(event, correlationId, c.dic)
}

res := commonDTO.NewBaseResponse("", "", http.StatusOK)
c.sendResponse(w, r, common.ApiDeviceNameCommandNameRoute, res, http.StatusOK)
}
Expand Down
31 changes: 29 additions & 2 deletions internal/controller/http/command_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (C) 2022 IOTech Ltd
// Copyright (C) 2022-2023 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

Expand All @@ -12,6 +12,7 @@ import (
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"

bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/container"
Expand All @@ -23,12 +24,14 @@ import (
commonDTO "github.com/edgexfoundry/go-mod-core-contracts/v3/dtos/common"
"github.com/edgexfoundry/go-mod-core-contracts/v3/dtos/responses"
"github.com/edgexfoundry/go-mod-core-contracts/v3/models"
messagingMocks "github.com/edgexfoundry/go-mod-messaging/v3/messaging/mocks"
"github.com/gorilla/mux"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/edgexfoundry/device-sdk-go/v3/internal/cache"
sdkCommon "github.com/edgexfoundry/device-sdk-go/v3/internal/common"
"github.com/edgexfoundry/device-sdk-go/v3/internal/config"
"github.com/edgexfoundry/device-sdk-go/v3/internal/container"
"github.com/edgexfoundry/device-sdk-go/v3/pkg/interfaces/mocks"
Expand Down Expand Up @@ -318,12 +321,13 @@ func TestRestController_GetCommand_ReturnEvent(t *testing.T) {
}

func TestRestController_SetCommand(t *testing.T) {
validRequest := map[string]any{testResource: "value"}
validRequest := map[string]any{testResource: "value", writeOnlyResource: "value"}
invalidRequest := map[string]any{"invalid": "test"}
emptyValueRequest := map[string]any{objectResource: ""}

dic := mockDic()

sdkCommon.InitializeSentMetrics(logger.NewMockClient(), dic)
err := cache.InitCache(testService, dic)
require.NoError(t, err)

Expand All @@ -338,8 +342,10 @@ func TestRestController_SetCommand(t *testing.T) {
expectedStatusCode int
}{
{"valid - device resource", testDevice, testResource, validRequest, http.StatusOK},
{"valid - write-only device resource", testDevice, writeOnlyResource, validRequest, http.StatusOK},
{"valid - device resource not specified in request body but default value provided", testDevice, testResource, invalidRequest, http.StatusOK},
{"valid - device command", testDevice, testCommand, validRequest, http.StatusOK},
{"valid - write-only device command", testDevice, writeOnlyCommand, validRequest, http.StatusOK},
{"valid - device command not specified in request body but default value provided", testDevice, testCommand, invalidRequest, http.StatusOK},
{"invalid - device name parameter is empty", "", testResource, validRequest, http.StatusBadRequest},
{"invalid - command is empty", testDevice, "", validRequest, http.StatusBadRequest},
Expand All @@ -363,6 +369,22 @@ func TestRestController_SetCommand(t *testing.T) {
req = mux.SetURLVars(req, map[string]string{common.Name: testCase.deviceName, common.Command: testCase.commandName})
require.NoError(t, err)

var wg sync.WaitGroup
if testCase.commandName != writeOnlyCommand && testCase.commandName != writeOnlyResource {
wg.Add(1)
}
messagingClientMock := &messagingMocks.MessageClient{}
messagingClientMock.On("Publish", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
go func() {
defer wg.Done()
}()
}).Return(nil)
dic.Update(di.ServiceConstructorMap{
bootstrapContainer.MessagingClientName: func(get di.Get) any {
return messagingClientMock
},
})

// Act
recorder := httptest.NewRecorder()
handler := http.HandlerFunc(controller.SetCommand)
Expand All @@ -378,6 +400,11 @@ func TestRestController_SetCommand(t *testing.T) {
assert.Equal(t, testCase.expectedStatusCode, res.StatusCode, "Response status code not as expected")
if testCase.expectedStatusCode == http.StatusOK {
assert.Empty(t, res.Message, "Message should be empty when it is successful")

wg.Wait()
if testCase.commandName != writeOnlyCommand && testCase.commandName != writeOnlyResource {
messagingClientMock.AssertNumberOfCalls(t, "Publish", 1)
}
} else {
assert.NotEmpty(t, res.Message, "Response message doesn't contain the error message")
}
Expand Down
8 changes: 6 additions & 2 deletions internal/controller/messaging/command.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (C) 2022 IOTech Ltd
// Copyright (C) 2022-2023 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

Expand Down Expand Up @@ -183,7 +183,7 @@ func setCommand(ctx context.Context, msgEnvelope types.MessageEnvelope, response

// TODO: fix properly in EdgeX 3.0
ctx = context.WithValue(ctx, common.CorrelationHeader, msgEnvelope.CorrelationID) // nolint: staticcheck
edgexErr := application.SetCommand(ctx, deviceName, commandName, rawQuery, requestPayload, dic)
event, edgexErr := application.SetCommand(ctx, deviceName, commandName, rawQuery, requestPayload, dic)
if edgexErr != nil {
lc.Errorf("Failed to process set device command %s for device %s: %s", commandName, deviceName, edgexErr.Error())
responseEnvelope = types.NewMessageEnvelopeWithError(msgEnvelope.RequestID, edgexErr.Error())
Expand All @@ -210,6 +210,10 @@ func setCommand(ctx context.Context, msgEnvelope types.MessageEnvelope, response
lc.Errorf("Failed to publish command response: %s", err.Error())
return
}

if event != nil {
go sdkCommon.SendEvent(event, msgEnvelope.CorrelationID, dic)
}
}

func filterQueryParams(queries map[string]string) (string, bool, bool) {
Expand Down
Loading

0 comments on commit 2dcc4a6

Please sign in to comment.