Skip to content

Commit

Permalink
Merge pull request #1276 from hahattan/issue-1259
Browse files Browse the repository at this point in the history
feat!: replace REST device callbacks with System Events
  • Loading branch information
cloudxxx8 authored Jan 18, 2023
2 parents 13fe592 + 3f884ed commit f7483b2
Show file tree
Hide file tree
Showing 7 changed files with 1,422 additions and 62 deletions.
1 change: 1 addition & 0 deletions example/cmd/device-simple/res/configuration.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ SecretName = "redisdb"
PublishTopicPrefix = "edgex/events/device" # /<device-profile-name>/<device-name>/<source-name> will be added to this Publish Topic prefix
CommandRequestTopic = "edgex/device/command/request/device-simple/#" # subscribing for inbound command requests
CommandResponseTopicPrefix = "edgex/device/command/response" # publishing outbound command responses; <device-service>/<device-name>/<command-name>/<method> will be added to this publish topic prefix
SystemEventTopic = "edgex/system-events/core-metadata/device/#/device-simple/#"
[MessageBus.Optional]
# Default MQTT Specific options that need to be here to enable environment variable overrides of them
# Client Identifiers
Expand Down
57 changes: 1 addition & 56 deletions internal/controller/http/callback.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// -*- Mode: Go; indent-tabs-mode: t -*-
//
// Copyright (C) 2020-2021 IOTech Ltd
// Copyright (C) 2020-2023 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

Expand All @@ -19,61 +19,6 @@ import (
"github.com/edgexfoundry/device-sdk-go/v3/internal/application"
)

func (c *RestController) DeleteDevice(writer http.ResponseWriter, request *http.Request) {
vars := mux.Vars(request)
name := vars[common.Name]

err := application.DeleteDevice(name, c.dic)
if err == nil {
res := commonDTO.NewBaseResponse("", "", http.StatusOK)
c.sendResponse(writer, request, common.ApiDeviceCallbackNameRoute, res, http.StatusOK)
} else {
c.sendEdgexError(writer, request, err, common.ApiDeviceCallbackNameRoute)
}
}

func (c *RestController) AddDevice(writer http.ResponseWriter, request *http.Request) {
defer request.Body.Close()

var addDeviceRequest requests.AddDeviceRequest

err := json.NewDecoder(request.Body).Decode(&addDeviceRequest)
if err != nil {
edgexErr := errors.NewCommonEdgeX(errors.KindServerError, "failed to decode JSON", err)
c.sendEdgexError(writer, request, edgexErr, common.ApiDeviceCallbackRoute)
return
}

edgexErr := application.AddDevice(addDeviceRequest, c.dic)
if edgexErr == nil {
res := commonDTO.NewBaseResponse(addDeviceRequest.RequestId, "", http.StatusOK)
c.sendResponse(writer, request, common.ApiDeviceCallbackRoute, res, http.StatusOK)
} else {
c.sendEdgexError(writer, request, edgexErr, common.ApiDeviceCallbackRoute)
}
}

func (c *RestController) UpdateDevice(writer http.ResponseWriter, request *http.Request) {
defer request.Body.Close()

var updateDeviceRequest requests.UpdateDeviceRequest

err := json.NewDecoder(request.Body).Decode(&updateDeviceRequest)
if err != nil {
edgexErr := errors.NewCommonEdgeX(errors.KindServerError, "failed to decode JSON", err)
c.sendEdgexError(writer, request, edgexErr, common.ApiDeviceCallbackRoute)
return
}

edgexErr := application.UpdateDevice(updateDeviceRequest, c.dic)
if edgexErr == nil {
res := commonDTO.NewBaseResponse(updateDeviceRequest.RequestId, "", http.StatusOK)
c.sendResponse(writer, request, common.ApiDeviceCallbackRoute, res, http.StatusOK)
} else {
c.sendEdgexError(writer, request, edgexErr, common.ApiDeviceCallbackRoute)
}
}

func (c *RestController) UpdateProfile(writer http.ResponseWriter, request *http.Request) {
defer request.Body.Close()

Expand Down
5 changes: 1 addition & 4 deletions internal/controller/http/restrouter.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// -*- Mode: Go; indent-tabs-mode: t -*-
//
// Copyright (C) 2017-2018 Canonical Ltd
// Copyright (C) 2018-2022 IOTech Ltd
// Copyright (C) 2018-2023 IOTech Ltd
// Copyright (c) 2019 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0
Expand Down Expand Up @@ -68,9 +68,6 @@ func (c *RestController) InitRestRoutes() {
c.addReservedRoute(common.ApiDeviceNameCommandNameRoute, c.GetCommand).Methods(http.MethodGet)
c.addReservedRoute(common.ApiDeviceNameCommandNameRoute, c.SetCommand).Methods(http.MethodPut)
// callback
c.addReservedRoute(common.ApiDeviceCallbackRoute, c.AddDevice).Methods(http.MethodPost)
c.addReservedRoute(common.ApiDeviceCallbackRoute, c.UpdateDevice).Methods(http.MethodPut)
c.addReservedRoute(common.ApiDeviceCallbackNameRoute, c.DeleteDevice).Methods(http.MethodDelete)
c.addReservedRoute(common.ApiProfileCallbackRoute, c.UpdateProfile).Methods(http.MethodPut)
c.addReservedRoute(common.ApiWatcherCallbackRoute, c.AddProvisionWatcher).Methods(http.MethodPost)
c.addReservedRoute(common.ApiWatcherCallbackRoute, c.UpdateProvisionWatcher).Methods(http.MethodPut)
Expand Down
108 changes: 108 additions & 0 deletions internal/controller/messaging/callback.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
//
// Copyright (C) 2023 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

package messaging

import (
"context"
"encoding/json"

bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/container"
"github.com/edgexfoundry/go-mod-bootstrap/v3/di"
"github.com/edgexfoundry/go-mod-core-contracts/v3/common"
"github.com/edgexfoundry/go-mod-core-contracts/v3/dtos"
"github.com/edgexfoundry/go-mod-core-contracts/v3/dtos/requests"
"github.com/edgexfoundry/go-mod-core-contracts/v3/errors"
"github.com/edgexfoundry/go-mod-messaging/v3/pkg/types"

"github.com/edgexfoundry/device-sdk-go/v3/internal/application"
"github.com/edgexfoundry/device-sdk-go/v3/internal/container"
)

const SystemEventTopic = "SystemEventTopic"

func DeviceCallback(ctx context.Context, dic *di.Container) errors.EdgeX {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
messageBusInfo := container.ConfigurationFrom(dic.Get).MessageBus
systemEventTopic := messageBusInfo.Topics[SystemEventTopic]

messages := make(chan types.MessageEnvelope)
messageErrors := make(chan error)
topics := []types.TopicChannel{
{
Topic: systemEventTopic,
Messages: messages,
},
}

messageBus := bootstrapContainer.MessagingClientFrom(dic.Get)
err := messageBus.Subscribe(topics, messageErrors)
if err != nil {
return errors.NewCommonEdgeXWrapper(err)
}

go func() {
for {
select {
case <-ctx.Done():
lc.Infof("Exiting waiting for MessageBus '%s' topic messages", systemEventTopic)
return
case err = <-messageErrors:
lc.Error(err.Error())
case msgEnvelope := <-messages:
lc.Debugf("System event received on message queue. Topic: %s, Correlation-id: %s ", systemEventTopic, msgEnvelope.CorrelationID)

var systemEvent dtos.SystemEvent
err := json.Unmarshal(msgEnvelope.Payload, &systemEvent)
if err != nil {
lc.Errorf("failed to JSON decoding system event: %s", err.Error())
continue
}

serviceName := container.DeviceServiceFrom(dic.Get).Name
if systemEvent.Owner != serviceName {
lc.Errorf("unmatched system event owner %s with service name %s", systemEvent.Owner, serviceName)
continue
}

if systemEvent.Type != common.DeviceSystemEventType {
lc.Errorf("unknown system event type %s", systemEvent.Type)
continue
}

var device dtos.Device
err = systemEvent.DecodeDetails(&device)
if err != nil {
lc.Errorf("failed to decode system event details: %s", err.Error())
continue
}

switch systemEvent.Action {
case common.DeviceSystemEventActionAdd:
err = application.AddDevice(requests.NewAddDeviceRequest(device), dic)
if err != nil {
lc.Error(err.Error(), common.CorrelationHeader, msgEnvelope.CorrelationID)
}
case common.DeviceSystemEventActionUpdate:
deviceModel := dtos.ToDeviceModel(device)
updateDeviceDTO := dtos.FromDeviceModelToUpdateDTO(deviceModel)
err = application.UpdateDevice(requests.NewUpdateDeviceRequest(updateDeviceDTO), dic)
if err != nil {
lc.Error(err.Error(), common.CorrelationHeader, msgEnvelope.CorrelationID)
}
case common.DeviceSystemEventActionDelete:
err = application.DeleteDevice(device.Name, dic)
if err != nil {
lc.Error(err.Error(), common.CorrelationHeader, msgEnvelope.CorrelationID)
}
default:
lc.Errorf("unknown device system event action %s", systemEvent.Action)
}
}
}
}()

return nil
}
3 changes: 3 additions & 0 deletions openapi/v3/changes.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
API changes from v2

* Remove /callback/device, /callback/device/name/{name} endpoints
Loading

0 comments on commit f7483b2

Please sign in to comment.