Skip to content

Commit

Permalink
Merge pull request #1312 from jinlinGuan/edgex-go-issue-4273
Browse files Browse the repository at this point in the history
feat!: replace REST device profile callback with System Events
  • Loading branch information
cloudxxx8 authored Feb 13, 2023
2 parents d7de237 + 09198ff commit ec40eb7
Show file tree
Hide file tree
Showing 10 changed files with 70 additions and 341 deletions.
2 changes: 1 addition & 1 deletion example/cmd/device-simple/res/configuration.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ SecretName = "redisdb"
PublishTopicPrefix = "edgex/events/device" # /<device-profile-name>/<device-name>/<source-name> will be added to this Publish Topic prefix
DeviceCommandRequestTopic = "edgex/device/command/request/device-simple/#" # subscribing for inbound command requests
ResponseTopicPrefix = "edgex/response" # /<service-name>/<request-id> will be added to this prefix topic
SystemEventTopic = "edgex/system-events/core-metadata/device/+/device-simple/#"
MetadataSystemEventTopic = "edgex/system-events/core-metadata/+/+/device-simple/#"
[MessageBus.Optional]
# Default MQTT Specific options that need to be here to enable environment variable overrides of them
# Client Identifiers
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.18
require (
github.com/OneOfOne/xxhash v1.2.8
github.com/edgexfoundry/go-mod-bootstrap/v3 v3.0.0-dev.11
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.9
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.10
github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.7
github.com/edgexfoundry/go-mod-registry/v3 v3.0.0-dev.3
github.com/google/uuid v1.3.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ github.com/edgexfoundry/go-mod-bootstrap/v3 v3.0.0-dev.11 h1:PHkcIC9hwOG2XyumsdO
github.com/edgexfoundry/go-mod-bootstrap/v3 v3.0.0-dev.11/go.mod h1:UjrW9GZ5UjKZLF1EzEtAjvrgOvgQz3FGyVyAAX+fXW4=
github.com/edgexfoundry/go-mod-configuration/v3 v3.0.0-dev.2 h1:xp5MsP+qf/fuJxy8fT7k1N+c4j4C6w04qMCBXm6id7o=
github.com/edgexfoundry/go-mod-configuration/v3 v3.0.0-dev.2/go.mod h1:1Vv4uWAo6r7k6jUlqVJW8JOL6YKVBc6sRL8Al3DrMck=
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.9 h1:ejafyDHaVCdfKW4IQZeg4n1mBI2JkC1Y1XXelyyj+z8=
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.9/go.mod h1:4lpZUM54ZareGU/yuAJvLEw0BoJ43SvCj1LO+gsKm9c=
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.10 h1:o5yenvmLn8+0AOz0d5GIek011Tt5ZRbvPlgE4VhozEU=
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.10/go.mod h1:4lpZUM54ZareGU/yuAJvLEw0BoJ43SvCj1LO+gsKm9c=
github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.7 h1:jgDQA/7SENURXQkIX11pNgA/pX9IK9ZULenj/vF17Vw=
github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.7/go.mod h1:r6Klfz+QBDx1Z5UV0z70MKdK2/cgHwhtqTm2HFXoWug=
github.com/edgexfoundry/go-mod-registry/v3 v3.0.0-dev.3 h1:QgZF9f70Cwpvkjw3tP1aiVGHc+yNFJNzW6hO8pDs3fg=
Expand Down
6 changes: 4 additions & 2 deletions internal/application/callback.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// -*- Mode: Go; indent-tabs-mode: t -*-
//
// Copyright (C) 2020-2022 IOTech Ltd
// Copyright (C) 2020-2023 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

Expand All @@ -22,7 +22,9 @@ import (
"github.com/edgexfoundry/go-mod-core-contracts/v3/models"
)

func UpdateProfile(profileRequest requests.DeviceProfileRequest, lc logger.LoggingClient) errors.EdgeX {
func UpdateProfile(profileRequest requests.DeviceProfileRequest, dic *di.Container) errors.EdgeX {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)

_, ok := cache.Profiles().ForName(profileRequest.Profile.Name)
if !ok {
errMsg := fmt.Sprintf("failed to find profile %s", profileRequest.Profile.Name)
Expand Down
22 changes: 0 additions & 22 deletions internal/controller/http/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,6 @@ import (
"github.com/edgexfoundry/device-sdk-go/v3/internal/application"
)

func (c *RestController) UpdateProfile(writer http.ResponseWriter, request *http.Request) {
defer request.Body.Close()

var edgexErr errors.EdgeX
var profileRequest requests.DeviceProfileRequest

err := json.NewDecoder(request.Body).Decode(&profileRequest)
if err != nil {
edgexErr = errors.NewCommonEdgeX(errors.KindServerError, "failed to decode JSON", err)
c.sendEdgexError(writer, request, edgexErr, common.ApiProfileCallbackRoute)
return
}

edgexErr = application.UpdateProfile(profileRequest, c.lc)
if edgexErr == nil {
res := commonDTO.NewBaseResponse(profileRequest.RequestId, "", http.StatusOK)
c.sendResponse(writer, request, common.ApiProfileCallbackRoute, res, http.StatusOK)
} else {
c.sendEdgexError(writer, request, edgexErr, common.ApiProfileCallbackRoute)
}
}

func (c *RestController) DeleteProvisionWatcher(writer http.ResponseWriter, request *http.Request) {
vars := mux.Vars(request)
name := vars[common.Name]
Expand Down
1 change: 0 additions & 1 deletion internal/controller/http/restrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ func (c *RestController) InitRestRoutes() {
c.addReservedRoute(common.ApiDeviceNameCommandNameRoute, c.GetCommand).Methods(http.MethodGet)
c.addReservedRoute(common.ApiDeviceNameCommandNameRoute, c.SetCommand).Methods(http.MethodPut)
// callback
c.addReservedRoute(common.ApiProfileCallbackRoute, c.UpdateProfile).Methods(http.MethodPut)
c.addReservedRoute(common.ApiWatcherCallbackRoute, c.AddProvisionWatcher).Methods(http.MethodPost)
c.addReservedRoute(common.ApiWatcherCallbackRoute, c.UpdateProvisionWatcher).Methods(http.MethodPut)
c.addReservedRoute(common.ApiWatcherCallbackNameRoute, c.DeleteProvisionWatcher).Methods(http.MethodDelete)
Expand Down
88 changes: 57 additions & 31 deletions internal/controller/messaging/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package messaging
import (
"context"
"encoding/json"
"fmt"

bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/container"
"github.com/edgexfoundry/go-mod-bootstrap/v3/di"
Expand All @@ -21,18 +22,18 @@ import (
"github.com/edgexfoundry/device-sdk-go/v3/internal/container"
)

const SystemEventTopic = "SystemEventTopic"
const MetadataSystemEventTopic = "MetadataSystemEventTopic"

func DeviceCallback(ctx context.Context, dic *di.Container) errors.EdgeX {
func MetadataSystemEventCallback(ctx context.Context, dic *di.Container) errors.EdgeX {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
messageBusInfo := container.ConfigurationFrom(dic.Get).MessageBus
systemEventTopic := messageBusInfo.Topics[SystemEventTopic]
metadataSystemEventTopic := messageBusInfo.Topics[MetadataSystemEventTopic]

messages := make(chan types.MessageEnvelope)
messageErrors := make(chan error)
topics := []types.TopicChannel{
{
Topic: systemEventTopic,
Topic: metadataSystemEventTopic,
Messages: messages,
},
}
Expand All @@ -47,12 +48,12 @@ func DeviceCallback(ctx context.Context, dic *di.Container) errors.EdgeX {
for {
select {
case <-ctx.Done():
lc.Infof("Exiting waiting for MessageBus '%s' topic messages", systemEventTopic)
lc.Infof("Exiting waiting for MessageBus '%s' topic messages", metadataSystemEventTopic)
return
case err = <-messageErrors:
lc.Error(err.Error())
case msgEnvelope := <-messages:
lc.Debugf("System event received on message queue. Topic: %s, Correlation-id: %s ", systemEventTopic, msgEnvelope.CorrelationID)
lc.Debugf("System event received on message queue. Topic: %s, Correlation-id: %s ", metadataSystemEventTopic, msgEnvelope.CorrelationID)

var systemEvent dtos.SystemEvent
err := json.Unmarshal(msgEnvelope.Payload, &systemEvent)
Expand All @@ -67,42 +68,67 @@ func DeviceCallback(ctx context.Context, dic *di.Container) errors.EdgeX {
continue
}

if systemEvent.Type != common.DeviceSystemEventType {
lc.Errorf("unknown system event type %s", systemEvent.Type)
continue
}

var device dtos.Device
err = systemEvent.DecodeDetails(&device)
if err != nil {
lc.Errorf("failed to decode system event details: %s", err.Error())
continue
}

switch systemEvent.Action {
case common.DeviceSystemEventActionAdd:
err = application.AddDevice(requests.NewAddDeviceRequest(device), dic)
if err != nil {
lc.Error(err.Error(), common.CorrelationHeader, msgEnvelope.CorrelationID)
}
case common.DeviceSystemEventActionUpdate:
deviceModel := dtos.ToDeviceModel(device)
updateDeviceDTO := dtos.FromDeviceModelToUpdateDTO(deviceModel)
err = application.UpdateDevice(requests.NewUpdateDeviceRequest(updateDeviceDTO), dic)
switch systemEvent.Type {
case common.DeviceSystemEventType:
err = deviceSystemEventAction(systemEvent, dic)
if err != nil {
lc.Error(err.Error(), common.CorrelationHeader, msgEnvelope.CorrelationID)
}
case common.DeviceSystemEventActionDelete:
err = application.DeleteDevice(device.Name, dic)
case common.DeviceProfileSystemEventType:
err = deviceProfileSystemEventAction(systemEvent, dic)
if err != nil {
lc.Error(err.Error(), common.CorrelationHeader, msgEnvelope.CorrelationID)
}
default:
lc.Errorf("unknown device system event action %s", systemEvent.Action)
lc.Errorf("unknown system event type %s", systemEvent.Type)
continue
}
}
}
}()

return nil
}

func deviceSystemEventAction(systemEvent dtos.SystemEvent, dic *di.Container) error {
var device dtos.Device
err := systemEvent.DecodeDetails(&device)
if err != nil {
return fmt.Errorf("failed to decode %s system event details: %s", systemEvent.Type, err.Error())
}

switch systemEvent.Action {
case common.SystemEventActionAdd:
err = application.AddDevice(requests.NewAddDeviceRequest(device), dic)
case common.SystemEventActionUpdate:
deviceModel := dtos.ToDeviceModel(device)
updateDeviceDTO := dtos.FromDeviceModelToUpdateDTO(deviceModel)
err = application.UpdateDevice(requests.NewUpdateDeviceRequest(updateDeviceDTO), dic)
case common.SystemEventActionDelete:
err = application.DeleteDevice(device.Name, dic)
default:
return fmt.Errorf("unknown %s system event action %s", systemEvent.Type, systemEvent.Action)
}

return err
}

func deviceProfileSystemEventAction(systemEvent dtos.SystemEvent, dic *di.Container) error {
var deviceProfile dtos.DeviceProfile
err := systemEvent.DecodeDetails(&deviceProfile)
if err != nil {
return fmt.Errorf("failed to decode %s system event details: %s", systemEvent.Type, err.Error())
}

switch systemEvent.Action {
case common.SystemEventActionUpdate:
err = application.UpdateProfile(requests.NewDeviceProfileRequest(deviceProfile), dic)
// there is no action needed for Device Profile Add and Delete in Device Service
case common.SystemEventActionAdd, common.SystemEventActionDelete:
break
default:
return fmt.Errorf("unknown %s system event action %s", systemEvent.Type, systemEvent.Action)
}

return err
}
1 change: 1 addition & 0 deletions openapi/v3/changes.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ API changes from v2

* Remove /callback/device, /callback/device/name/{name} endpoints
* Change /device/name/{name}/{command} ds-pushevent and ds-returnevent parameter values to true/false
* Remove /callback/profile endpoints
Loading

0 comments on commit ec40eb7

Please sign in to comment.