diff --git a/example/cmd/device-simple/res/configuration.toml b/example/cmd/device-simple/res/configuration.toml index 92ba129fc..4e30ccae3 100644 --- a/example/cmd/device-simple/res/configuration.toml +++ b/example/cmd/device-simple/res/configuration.toml @@ -85,6 +85,7 @@ PublishTopicPrefix = "edgex/events/device" # ///// will be added to this publish topic prefix + SystemEventTopic = "edgex/system-events/core-metadata/device/#/device-simple/#" # Example SecretStore configuration. # Only used when EDGEX_SECURITY_SECRET_STORE=true diff --git a/internal/controller/http/callback.go b/internal/controller/http/callback.go index e7bc0d4ee..130d6f3dc 100644 --- a/internal/controller/http/callback.go +++ b/internal/controller/http/callback.go @@ -1,6 +1,6 @@ // -*- Mode: Go; indent-tabs-mode: t -*- // -// Copyright (C) 2020-2021 IOTech Ltd +// Copyright (C) 2020-2023 IOTech Ltd // // SPDX-License-Identifier: Apache-2.0 @@ -19,61 +19,6 @@ import ( "github.com/edgexfoundry/device-sdk-go/v3/internal/application" ) -func (c *RestController) DeleteDevice(writer http.ResponseWriter, request *http.Request) { - vars := mux.Vars(request) - name := vars[common.Name] - - err := application.DeleteDevice(name, c.dic) - if err == nil { - res := commonDTO.NewBaseResponse("", "", http.StatusOK) - c.sendResponse(writer, request, common.ApiDeviceCallbackNameRoute, res, http.StatusOK) - } else { - c.sendEdgexError(writer, request, err, common.ApiDeviceCallbackNameRoute) - } -} - -func (c *RestController) AddDevice(writer http.ResponseWriter, request *http.Request) { - defer request.Body.Close() - - var addDeviceRequest requests.AddDeviceRequest - - err := json.NewDecoder(request.Body).Decode(&addDeviceRequest) - if err != nil { - edgexErr := errors.NewCommonEdgeX(errors.KindServerError, "failed to decode JSON", err) - c.sendEdgexError(writer, request, edgexErr, common.ApiDeviceCallbackRoute) - return - } - - edgexErr := application.AddDevice(addDeviceRequest, c.dic) - if edgexErr == nil { - res := commonDTO.NewBaseResponse(addDeviceRequest.RequestId, "", http.StatusOK) - c.sendResponse(writer, request, common.ApiDeviceCallbackRoute, res, http.StatusOK) - } else { - c.sendEdgexError(writer, request, edgexErr, common.ApiDeviceCallbackRoute) - } -} - -func (c *RestController) UpdateDevice(writer http.ResponseWriter, request *http.Request) { - defer request.Body.Close() - - var updateDeviceRequest requests.UpdateDeviceRequest - - err := json.NewDecoder(request.Body).Decode(&updateDeviceRequest) - if err != nil { - edgexErr := errors.NewCommonEdgeX(errors.KindServerError, "failed to decode JSON", err) - c.sendEdgexError(writer, request, edgexErr, common.ApiDeviceCallbackRoute) - return - } - - edgexErr := application.UpdateDevice(updateDeviceRequest, c.dic) - if edgexErr == nil { - res := commonDTO.NewBaseResponse(updateDeviceRequest.RequestId, "", http.StatusOK) - c.sendResponse(writer, request, common.ApiDeviceCallbackRoute, res, http.StatusOK) - } else { - c.sendEdgexError(writer, request, edgexErr, common.ApiDeviceCallbackRoute) - } -} - func (c *RestController) UpdateProfile(writer http.ResponseWriter, request *http.Request) { defer request.Body.Close() diff --git a/internal/controller/http/restrouter.go b/internal/controller/http/restrouter.go index 3f91ffaea..4aee88de9 100644 --- a/internal/controller/http/restrouter.go +++ b/internal/controller/http/restrouter.go @@ -1,7 +1,7 @@ // -*- Mode: Go; indent-tabs-mode: t -*- // // Copyright (C) 2017-2018 Canonical Ltd -// Copyright (C) 2018-2022 IOTech Ltd +// Copyright (C) 2018-2023 IOTech Ltd // Copyright (c) 2019 Intel Corporation // // SPDX-License-Identifier: Apache-2.0 @@ -68,9 +68,6 @@ func (c *RestController) InitRestRoutes() { c.addReservedRoute(common.ApiDeviceNameCommandNameRoute, c.GetCommand).Methods(http.MethodGet) c.addReservedRoute(common.ApiDeviceNameCommandNameRoute, c.SetCommand).Methods(http.MethodPut) // callback - c.addReservedRoute(common.ApiDeviceCallbackRoute, c.AddDevice).Methods(http.MethodPost) - c.addReservedRoute(common.ApiDeviceCallbackRoute, c.UpdateDevice).Methods(http.MethodPut) - c.addReservedRoute(common.ApiDeviceCallbackNameRoute, c.DeleteDevice).Methods(http.MethodDelete) c.addReservedRoute(common.ApiProfileCallbackRoute, c.UpdateProfile).Methods(http.MethodPut) c.addReservedRoute(common.ApiWatcherCallbackRoute, c.AddProvisionWatcher).Methods(http.MethodPost) c.addReservedRoute(common.ApiWatcherCallbackRoute, c.UpdateProvisionWatcher).Methods(http.MethodPut) diff --git a/internal/controller/messaging/callback.go b/internal/controller/messaging/callback.go new file mode 100644 index 000000000..6ffde79d7 --- /dev/null +++ b/internal/controller/messaging/callback.go @@ -0,0 +1,108 @@ +// +// Copyright (C) 2023 IOTech Ltd +// +// SPDX-License-Identifier: Apache-2.0 + +package messaging + +import ( + "context" + "encoding/json" + + bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/container" + "github.com/edgexfoundry/go-mod-bootstrap/v3/di" + "github.com/edgexfoundry/go-mod-core-contracts/v3/common" + "github.com/edgexfoundry/go-mod-core-contracts/v3/dtos" + "github.com/edgexfoundry/go-mod-core-contracts/v3/dtos/requests" + "github.com/edgexfoundry/go-mod-core-contracts/v3/errors" + "github.com/edgexfoundry/go-mod-messaging/v3/pkg/types" + + "github.com/edgexfoundry/device-sdk-go/v3/internal/application" + "github.com/edgexfoundry/device-sdk-go/v3/internal/container" +) + +const SystemEventTopic = "SystemEventTopic" + +func DeviceCallback(ctx context.Context, dic *di.Container) errors.EdgeX { + lc := bootstrapContainer.LoggingClientFrom(dic.Get) + messageBusInfo := container.ConfigurationFrom(dic.Get).MessageQueue + systemEventTopic := messageBusInfo.Topics[SystemEventTopic] + + messages := make(chan types.MessageEnvelope) + messageErrors := make(chan error) + topics := []types.TopicChannel{ + { + Topic: systemEventTopic, + Messages: messages, + }, + } + + messageBus := bootstrapContainer.MessagingClientFrom(dic.Get) + err := messageBus.Subscribe(topics, messageErrors) + if err != nil { + return errors.NewCommonEdgeXWrapper(err) + } + + go func() { + for { + select { + case <-ctx.Done(): + lc.Infof("Exiting waiting for MessageBus '%s' topic messages", systemEventTopic) + return + case err = <-messageErrors: + lc.Error(err.Error()) + case msgEnvelope := <-messages: + lc.Debugf("System event received on message queue. Topic: %s, Correlation-id: %s ", systemEventTopic, msgEnvelope.CorrelationID) + + var systemEvent dtos.SystemEvent + err := json.Unmarshal(msgEnvelope.Payload, &systemEvent) + if err != nil { + lc.Errorf("failed to JSON decoding system event: %s", err.Error()) + continue + } + + serviceName := container.DeviceServiceFrom(dic.Get).Name + if systemEvent.Owner != serviceName { + lc.Errorf("unmatched system event owner %s with service name %s", systemEvent.Owner, serviceName) + continue + } + + if systemEvent.Type != common.DeviceSystemEventType { + lc.Errorf("unknown system event type %s", systemEvent.Type) + continue + } + + var device dtos.Device + err = systemEvent.DecodeDetails(&device) + if err != nil { + lc.Errorf("failed to decode system event details: %s", err.Error()) + continue + } + + switch systemEvent.Action { + case common.DeviceSystemEventActionAdd: + err = application.AddDevice(requests.NewAddDeviceRequest(device), dic) + if err != nil { + lc.Error(err.Error(), common.CorrelationHeader, msgEnvelope.CorrelationID) + } + case common.DeviceSystemEventActionUpdate: + deviceModel := dtos.ToDeviceModel(device) + updateDeviceDTO := dtos.FromDeviceModelToUpdateDTO(deviceModel) + err = application.UpdateDevice(requests.NewUpdateDeviceRequest(updateDeviceDTO), dic) + if err != nil { + lc.Error(err.Error(), common.CorrelationHeader, msgEnvelope.CorrelationID) + } + case common.DeviceSystemEventActionDelete: + err = application.DeleteDevice(device.Name, dic) + if err != nil { + lc.Error(err.Error(), common.CorrelationHeader, msgEnvelope.CorrelationID) + } + default: + lc.Errorf("unknown device system event action %s", systemEvent.Action) + } + } + } + }() + + return nil +} diff --git a/pkg/service/main.go b/pkg/service/main.go index 8972faf51..e6881736b 100644 --- a/pkg/service/main.go +++ b/pkg/service/main.go @@ -1,6 +1,6 @@ // -*- Mode: Go; indent-tabs-mode: t -*- // -// Copyright (C) 2020-2022 IOTech Ltd +// Copyright (C) 2020-2023 IOTech Ltd // // SPDX-License-Identifier: Apache-2.0 @@ -112,12 +112,20 @@ func messageBusBootstrapHandler(ctx context.Context, wg *sync.WaitGroup, startup if !handlers.MessagingBootstrapHandler(ctx, wg, startupTimer, dic) { return false } + + lc := bootstrapContainer.LoggingClientFrom(dic.Get) + err := messaging.SubscribeCommands(ctx, dic) if err != nil { - lc := bootstrapContainer.LoggingClientFrom(dic.Get) lc.Errorf("Failed to subscribe internal command request: %v", err) return false } + + err = messaging.DeviceCallback(ctx, dic) + if err != nil { + lc.Errorf("Failed to subscribe Metadata system event: %v", err) + return false + } } return true