Skip to content

Commit

Permalink
feat!: replace REST device callbacks with System Events
Browse files Browse the repository at this point in the history
BREAKING CHANGE:
the following device callback REST endpoints are removed:
- POST /callback/device
- PUT /callback/device
- DELETE /callback/device/name/{name}

closes #1259

Signed-off-by: Chris Hung <chris@iotechsys.com>
  • Loading branch information
Chris Hung committed Jan 11, 2023
1 parent 8025d41 commit 96b3ac1
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 62 deletions.
1 change: 1 addition & 0 deletions example/cmd/device-simple/res/configuration.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ PublishTopicPrefix = "edgex/events/device" # /<device-profile-name>/<device-name
[MessageQueue.Topics]
CommandRequestTopic = "edgex/device/command/request/device-simple/#" # subscribing for inbound command requests
CommandResponseTopicPrefix = "edgex/device/command/response" # publishing outbound command responses; <device-service>/<device-name>/<command-name>/<method> will be added to this publish topic prefix
SystemEventTopic = "edgex/system-events/core-metadata/device/#/device-simple/#"

# Example SecretStore configuration.
# Only used when EDGEX_SECURITY_SECRET_STORE=true
Expand Down
57 changes: 1 addition & 56 deletions internal/controller/http/callback.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// -*- Mode: Go; indent-tabs-mode: t -*-
//
// Copyright (C) 2020-2021 IOTech Ltd
// Copyright (C) 2020-2023 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

Expand All @@ -19,61 +19,6 @@ import (
"github.com/edgexfoundry/device-sdk-go/v3/internal/application"
)

func (c *RestController) DeleteDevice(writer http.ResponseWriter, request *http.Request) {
vars := mux.Vars(request)
name := vars[common.Name]

err := application.DeleteDevice(name, c.dic)
if err == nil {
res := commonDTO.NewBaseResponse("", "", http.StatusOK)
c.sendResponse(writer, request, common.ApiDeviceCallbackNameRoute, res, http.StatusOK)
} else {
c.sendEdgexError(writer, request, err, common.ApiDeviceCallbackNameRoute)
}
}

func (c *RestController) AddDevice(writer http.ResponseWriter, request *http.Request) {
defer request.Body.Close()

var addDeviceRequest requests.AddDeviceRequest

err := json.NewDecoder(request.Body).Decode(&addDeviceRequest)
if err != nil {
edgexErr := errors.NewCommonEdgeX(errors.KindServerError, "failed to decode JSON", err)
c.sendEdgexError(writer, request, edgexErr, common.ApiDeviceCallbackRoute)
return
}

edgexErr := application.AddDevice(addDeviceRequest, c.dic)
if edgexErr == nil {
res := commonDTO.NewBaseResponse(addDeviceRequest.RequestId, "", http.StatusOK)
c.sendResponse(writer, request, common.ApiDeviceCallbackRoute, res, http.StatusOK)
} else {
c.sendEdgexError(writer, request, edgexErr, common.ApiDeviceCallbackRoute)
}
}

func (c *RestController) UpdateDevice(writer http.ResponseWriter, request *http.Request) {
defer request.Body.Close()

var updateDeviceRequest requests.UpdateDeviceRequest

err := json.NewDecoder(request.Body).Decode(&updateDeviceRequest)
if err != nil {
edgexErr := errors.NewCommonEdgeX(errors.KindServerError, "failed to decode JSON", err)
c.sendEdgexError(writer, request, edgexErr, common.ApiDeviceCallbackRoute)
return
}

edgexErr := application.UpdateDevice(updateDeviceRequest, c.dic)
if edgexErr == nil {
res := commonDTO.NewBaseResponse(updateDeviceRequest.RequestId, "", http.StatusOK)
c.sendResponse(writer, request, common.ApiDeviceCallbackRoute, res, http.StatusOK)
} else {
c.sendEdgexError(writer, request, edgexErr, common.ApiDeviceCallbackRoute)
}
}

func (c *RestController) UpdateProfile(writer http.ResponseWriter, request *http.Request) {
defer request.Body.Close()

Expand Down
5 changes: 1 addition & 4 deletions internal/controller/http/restrouter.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// -*- Mode: Go; indent-tabs-mode: t -*-
//
// Copyright (C) 2017-2018 Canonical Ltd
// Copyright (C) 2018-2022 IOTech Ltd
// Copyright (C) 2018-2023 IOTech Ltd
// Copyright (c) 2019 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0
Expand Down Expand Up @@ -68,9 +68,6 @@ func (c *RestController) InitRestRoutes() {
c.addReservedRoute(common.ApiDeviceNameCommandNameRoute, c.GetCommand).Methods(http.MethodGet)
c.addReservedRoute(common.ApiDeviceNameCommandNameRoute, c.SetCommand).Methods(http.MethodPut)
// callback
c.addReservedRoute(common.ApiDeviceCallbackRoute, c.AddDevice).Methods(http.MethodPost)
c.addReservedRoute(common.ApiDeviceCallbackRoute, c.UpdateDevice).Methods(http.MethodPut)
c.addReservedRoute(common.ApiDeviceCallbackNameRoute, c.DeleteDevice).Methods(http.MethodDelete)
c.addReservedRoute(common.ApiProfileCallbackRoute, c.UpdateProfile).Methods(http.MethodPut)
c.addReservedRoute(common.ApiWatcherCallbackRoute, c.AddProvisionWatcher).Methods(http.MethodPost)
c.addReservedRoute(common.ApiWatcherCallbackRoute, c.UpdateProvisionWatcher).Methods(http.MethodPut)
Expand Down
108 changes: 108 additions & 0 deletions internal/controller/messaging/callback.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
//
// Copyright (C) 2023 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

package messaging

import (
"context"
"encoding/json"

bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/container"
"github.com/edgexfoundry/go-mod-bootstrap/v3/di"
"github.com/edgexfoundry/go-mod-core-contracts/v3/common"
"github.com/edgexfoundry/go-mod-core-contracts/v3/dtos"
"github.com/edgexfoundry/go-mod-core-contracts/v3/dtos/requests"
"github.com/edgexfoundry/go-mod-core-contracts/v3/errors"
"github.com/edgexfoundry/go-mod-messaging/v3/pkg/types"

"github.com/edgexfoundry/device-sdk-go/v3/internal/application"
"github.com/edgexfoundry/device-sdk-go/v3/internal/container"
)

const SystemEventTopic = "SystemEventTopic"

func DeviceCallback(ctx context.Context, dic *di.Container) errors.EdgeX {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
messageBusInfo := container.ConfigurationFrom(dic.Get).MessageQueue
systemEventTopic := messageBusInfo.Topics[SystemEventTopic]

messages := make(chan types.MessageEnvelope)
messageErrors := make(chan error)
topics := []types.TopicChannel{
{
Topic: systemEventTopic,
Messages: messages,
},
}

messageBus := bootstrapContainer.MessagingClientFrom(dic.Get)
err := messageBus.Subscribe(topics, messageErrors)
if err != nil {
return errors.NewCommonEdgeXWrapper(err)
}

go func() {
for {
select {
case <-ctx.Done():
lc.Infof("Exiting waiting for MessageBus '%s' topic messages", systemEventTopic)
return
case err = <-messageErrors:
lc.Error(err.Error())
case msgEnvelope := <-messages:
lc.Debugf("System event received on message queue. Topic: %s, Correlation-id: %s ", systemEventTopic, msgEnvelope.CorrelationID)

var systemEvent dtos.SystemEvent
err := json.Unmarshal(msgEnvelope.Payload, &systemEvent)
if err != nil {
lc.Errorf("failed to JSON decoding system event: %s", err.Error())
continue
}

serviceName := container.DeviceServiceFrom(dic.Get).Name
if systemEvent.Owner != serviceName {
lc.Errorf("unmatched system event owner %s with service name %s", systemEvent.Owner, serviceName)
continue
}

if systemEvent.Type != common.DeviceSystemEventType {
lc.Errorf("unknown system event type %s", systemEvent.Type)
continue
}

var device dtos.Device
err = systemEvent.DecodeDetails(&device)
if err != nil {
lc.Errorf("failed to decode system event details: %s", err.Error())
continue
}

switch systemEvent.Action {
case common.DeviceSystemEventActionAdd:
err = application.AddDevice(requests.NewAddDeviceRequest(device), dic)
if err != nil {
lc.Error(err.Error(), common.CorrelationHeader, msgEnvelope.CorrelationID)
}
case common.DeviceSystemEventActionUpdate:
deviceModel := dtos.ToDeviceModel(device)
updateDeviceDTO := dtos.FromDeviceModelToUpdateDTO(deviceModel)
err = application.UpdateDevice(requests.NewUpdateDeviceRequest(updateDeviceDTO), dic)
if err != nil {
lc.Error(err.Error(), common.CorrelationHeader, msgEnvelope.CorrelationID)
}
case common.DeviceSystemEventActionDelete:
err = application.DeleteDevice(device.Name, dic)
if err != nil {
lc.Error(err.Error(), common.CorrelationHeader, msgEnvelope.CorrelationID)
}
default:
lc.Errorf("unknown device system event action %s", systemEvent.Action)
}
}
}
}()

return nil
}
12 changes: 10 additions & 2 deletions pkg/service/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// -*- Mode: Go; indent-tabs-mode: t -*-
//
// Copyright (C) 2020-2022 IOTech Ltd
// Copyright (C) 2020-2023 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

Expand Down Expand Up @@ -112,12 +112,20 @@ func messageBusBootstrapHandler(ctx context.Context, wg *sync.WaitGroup, startup
if !handlers.MessagingBootstrapHandler(ctx, wg, startupTimer, dic) {
return false
}

lc := bootstrapContainer.LoggingClientFrom(dic.Get)

err := messaging.SubscribeCommands(ctx, dic)
if err != nil {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
lc.Errorf("Failed to subscribe internal command request: %v", err)
return false
}

err = messaging.DeviceCallback(ctx, dic)
if err != nil {
lc.Errorf("Failed to subscribe Metadata system event: %v", err)
return false
}
}

return true
Expand Down

0 comments on commit 96b3ac1

Please sign in to comment.