Skip to content

Commit

Permalink
Merge pull request #1616 from jinlinGuan/issue-1611
Browse files Browse the repository at this point in the history
feat: Publish System Events for device discovery and profile scan progress
  • Loading branch information
cloudxxx8 authored Aug 27, 2024
2 parents 160aeea + 6d5cd89 commit 6fbfd52
Show file tree
Hide file tree
Showing 17 changed files with 513 additions and 61 deletions.
2 changes: 1 addition & 1 deletion example/cmd/device-simple/res/configuration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@ SimpleCustom:
OnImageLocation: ./res/on.png
OffImageLocation: ./res/off.jpg
Writable:
DiscoverSleepDurationSecs: 10
DiscoverSleepDurationSecs: 3
4 changes: 2 additions & 2 deletions example/config/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ func (scc *SimpleCustomConfig) Validate() error {
return errors.New("SimpleCustom.OffImageLocation configuration setting can not be blank")
}

if scc.Writable.DiscoverSleepDurationSecs < 10 {
return errors.New("SimpleCustom.Writable.DiscoverSleepDurationSecs configuration setting must be 10 or greater")
if scc.Writable.DiscoverSleepDurationSecs <= 0 {
return errors.New("SimpleCustom.Writable.DiscoverSleepDurationSecs configuration setting must be greater than 0")
}

return nil
Expand Down
8 changes: 7 additions & 1 deletion example/driver/simpledriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,12 @@ func (s *SimpleDriver) Discover() error {
Labels: []string{"auto-discovery"},
}

res := []sdkModels.DiscoveredDevice{device2, device3}
res := []sdkModels.DiscoveredDevice{device2}
time.Sleep(time.Duration(s.serviceConfig.SimpleCustom.Writable.DiscoverSleepDurationSecs) * time.Second)
s.sdk.PublishDeviceDiscoveryProgressSystemEvent(50, len(res), "")

time.Sleep(time.Duration(s.serviceConfig.SimpleCustom.Writable.DiscoverSleepDurationSecs) * time.Second)
res = append(res, device3)
s.deviceCh <- res
return nil
}
Expand Down Expand Up @@ -351,5 +354,8 @@ func (s *SimpleDriver) ValidateDevice(device models.Device) error {
}

func (s *SimpleDriver) ProfileScan(payload sdkModels.ProfileScanRequest) (models.DeviceProfile, error) {
time.Sleep(time.Duration(s.serviceConfig.SimpleCustom.Writable.DiscoverSleepDurationSecs) * time.Second)
s.sdk.PublishProfileScanProgressSystemEvent(payload.RequestId, 50, "")
time.Sleep(time.Duration(s.serviceConfig.SimpleCustom.Writable.DiscoverSleepDurationSecs) * time.Second)
return models.DeviceProfile{Name: payload.ProfileName}, nil
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/panjf2000/ants/v2 v2.10.0
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
github.com/stretchr/testify v1.9.0
golang.org/x/net v0.27.0
gopkg.in/yaml.v3 v3.0.1
)

Expand Down Expand Up @@ -127,7 +128,6 @@ require (
go.opentelemetry.io/otel/trace v1.28.0 // indirect
golang.org/x/crypto v0.25.0 // indirect
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/oauth2 v0.21.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.22.0 // indirect
Expand Down
27 changes: 21 additions & 6 deletions internal/application/profilescan.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ package application

import (
"context"
"fmt"
"sync"

"github.com/edgexfoundry/device-sdk-go/v3/internal/controller/http/correlation"
"github.com/edgexfoundry/device-sdk-go/v3/internal/utils"
"github.com/edgexfoundry/device-sdk-go/v3/pkg/interfaces"
sdkModels "github.com/edgexfoundry/device-sdk-go/v3/pkg/models"
bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/container"
Expand All @@ -25,7 +28,7 @@ type profileScanLocker struct {

var locker = profileScanLocker{busyMap: make(map[string]bool)}

func ProfileScanWrapper(busy chan bool, ps interfaces.ProfileScan, req sdkModels.ProfileScanRequest, correlationId string, dic *di.Container) {
func ProfileScanWrapper(busy chan bool, ps interfaces.ProfileScan, req sdkModels.ProfileScanRequest, ctx context.Context, dic *di.Container) {
locker.mux.Lock()
b := locker.busyMap[req.DeviceName]
busy <- b
Expand All @@ -39,28 +42,40 @@ func ProfileScanWrapper(busy chan bool, ps interfaces.ProfileScan, req sdkModels
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
dpc := bootstrapContainer.DeviceProfileClientFrom(dic.Get)
dc := bootstrapContainer.DeviceClientFrom(dic.Get)
ctx := context.WithValue(context.Background(), common.CorrelationHeader, correlationId) //nolint: staticcheck
if correlation.IdFromContext(ctx) != req.RequestId {
// ensure the correlation id matches the request id.
ctx = context.WithValue(ctx, common.CorrelationHeader, req.RequestId) //nolint: staticcheck
}

lc.Debugf("profile scan triggered with device name '%s' and profile name '%s', with Correlation Id '%s'", req.DeviceName, req.ProfileName, correlationId)
utils.PublishProfileScanProgressSystemEvent(req.RequestId, 0, "", ctx, dic)
lc.Debugf("profile scan triggered with device name '%s' and profile name '%s', Correlation Id: %s", req.DeviceName, req.ProfileName, req.RequestId)
profile, err := ps.ProfileScan(req)
if err != nil {
lc.Errorf("failed to trigger profile scan: %v, with Correlation Id '%s'", err.Error(), correlationId)
errMsg := fmt.Sprintf("failed to trigger profile scan: %v, Correlation Id: %s", err.Error(), req.RequestId)
utils.PublishProfileScanProgressSystemEvent(req.RequestId, -1, errMsg, ctx, dic)
lc.Error(errMsg)
releaseLock(req.DeviceName)
return
}
// Add profile to metadata
profileReq := requests.NewDeviceProfileRequest(dtos.FromDeviceProfileModelToDTO(profile))
_, err = dpc.Add(ctx, []requests.DeviceProfileRequest{profileReq})
if err != nil {
lc.Errorf("failed to add device profile '%s': %v, with Correlation Id '%s'", profile.Name, err, correlationId)
errMsg := fmt.Sprintf("failed to add device profile '%s': %v, Correlation Id: %s", profile.Name, err, req.RequestId)
utils.PublishProfileScanProgressSystemEvent(req.RequestId, -1, errMsg, ctx, dic)
lc.Error(errMsg)
releaseLock(req.DeviceName)
return
}
// Update device
deviceReq := requests.NewUpdateDeviceRequest(dtos.UpdateDevice{Name: &req.DeviceName, ProfileName: &profile.Name})
_, err = dc.Update(ctx, []requests.UpdateDeviceRequest{deviceReq})
if err != nil {
lc.Errorf("failed to update device '%s' with profile '%s': %v, with Correlation Id '%s'", req.DeviceName, profile.Name, err, correlationId)
errMsg := fmt.Sprintf("failed to update device '%s' with profile '%s': %v, Correlation Id: %s", req.DeviceName, profile.Name, err, req.RequestId)
utils.PublishProfileScanProgressSystemEvent(req.RequestId, -1, errMsg, ctx, dic)
lc.Error(errMsg)
} else {
utils.PublishProfileScanProgressSystemEvent(req.RequestId, 100, "", ctx, dic)
}

// ReleaseLock
Expand Down
6 changes: 3 additions & 3 deletions internal/autodiscovery/autodiscovery.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// -*- Mode: Go; indent-tabs-mode: t -*-
//
// Copyright (C) 2020-2023 IOTech Ltd
// Copyright (C) 2020-2024 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

Expand Down Expand Up @@ -47,13 +47,13 @@ func BootstrapHandler(
defer wg.Done()

lc.Infof("Starting auto-discovery with duration %v", duration)
DiscoveryWrapper(driver, lc)
DiscoveryWrapper(driver, ctx, dic)
for {
select {
case <-ctx.Done():
return
case <-time.After(duration):
DiscoveryWrapper(driver, lc)
DiscoveryWrapper(driver, ctx, dic)
}
}
}()
Expand Down
41 changes: 36 additions & 5 deletions internal/autodiscovery/discovery.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
// -*- Mode: Go; indent-tabs-mode: t -*-
//
// Copyright (C) 2020-2023 IOTech Ltd
// Copyright (C) 2020-2024 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

package autodiscovery

import (
"fmt"
"sync"

"github.com/edgexfoundry/go-mod-core-contracts/v3/clients/logger"
"github.com/google/uuid"
"golang.org/x/net/context"

"github.com/edgexfoundry/device-sdk-go/v3/internal/container"
"github.com/edgexfoundry/device-sdk-go/v3/internal/controller/http/correlation"
"github.com/edgexfoundry/device-sdk-go/v3/internal/utils"
"github.com/edgexfoundry/device-sdk-go/v3/pkg/interfaces"
bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/container"
"github.com/edgexfoundry/go-mod-bootstrap/v3/di"
"github.com/edgexfoundry/go-mod-core-contracts/v3/common"
)

type discoveryLocker struct {
Expand All @@ -21,7 +29,8 @@ type discoveryLocker struct {

var locker discoveryLocker

func DiscoveryWrapper(driver interfaces.ProtocolDriver, lc logger.LoggingClient) {
func DiscoveryWrapper(driver interfaces.ProtocolDriver, ctx context.Context, dic *di.Container) {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
locker.mux.Lock()
if locker.busy {
lc.Info("another device discovery process is currently running")
Expand All @@ -31,14 +40,36 @@ func DiscoveryWrapper(driver interfaces.ProtocolDriver, lc logger.LoggingClient)
locker.busy = true
locker.mux.Unlock()

lc.Debug("protocol discovery triggered")
requestId := correlation.IdFromContext(ctx)
if len(requestId) == 0 {
requestId = uuid.NewString()
ctx = context.WithValue(ctx, common.CorrelationHeader, requestId) // nolint: staticcheck
lc.Debugf("device discovery correlation id is empty, set it to %s", requestId)
}
dic.Update(di.ServiceConstructorMap{
container.DiscoveryRequestIdName: func(get di.Get) any {
return requestId
},
})

utils.PublishDeviceDiscoveryProgressSystemEvent(requestId, 0, 0, "", ctx, dic)
lc.Debugf("protocol discovery triggered with correlation id: %s", requestId)
err := driver.Discover()
if err != nil {
lc.Error("failed to trigger protocol discovery", err.Error())
errMsg := fmt.Sprintf("failed to trigger protocol discovery with correlation id: %s, err: %s", requestId, err.Error())
utils.PublishDeviceDiscoveryProgressSystemEvent(requestId, -1, 0, errMsg, ctx, dic)
lc.Error(errMsg)
} else {
utils.PublishDeviceDiscoveryProgressSystemEvent(requestId, 100, 0, "", ctx, dic)
}

// ReleaseLock
locker.mux.Lock()
locker.busy = false
dic.Update(di.ServiceConstructorMap{
container.DiscoveryRequestIdName: func(get di.Get) any {
return ""
},
})
locker.mux.Unlock()
}
5 changes: 5 additions & 0 deletions internal/common/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ const (
SDKReservedPrefix = "ds-"
)

const (
SystemEventActionDiscovery = "discovery"
SystemEventActionProfileScan = "profilescan"
)

// SDKVersion indicates the version of the SDK - will be overwritten by build
var SDKVersion string = "0.0.0"

Expand Down
8 changes: 8 additions & 0 deletions internal/container/deviceservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,11 @@ func ProfileScanFrom(get di.Get) interfaces.ProfileScan {
}
return nil
}

// DiscoveryRequestIdName contains the name of discovery request id implementation in the DIC.
var DiscoveryRequestIdName = di.TypeInstanceToName(new(string))

// DiscoveryRequestIdFrom helper function queries the DIC and returns discovery request id.
func DiscoveryRequestIdFrom(get di.Get) string {
return get(DiscoveryRequestIdName).(string)
}
38 changes: 25 additions & 13 deletions internal/controller/http/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package http

import (
"context"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -45,13 +46,15 @@ func (c *RestController) Discovery(e echo.Context) error {

driver := container.ProtocolDriverFrom(c.dic.Get)

correlationId := correlation.IdFromContext(request.Context())
// Use correlation id as request id since there is no request body
requestId := correlation.IdFromContext(request.Context())
go func() {
c.lc.Info("Discovery triggered.", common.CorrelationHeader, correlationId)
go autodiscovery.DiscoveryWrapper(driver, c.lc)
c.lc.Info("Discovery end.", common.CorrelationHeader, correlationId)
c.lc.Infof("Discovery triggered. Correlation Id: %s", requestId)
autodiscovery.DiscoveryWrapper(driver, request.Context(), c.dic)
c.lc.Infof("Discovery end. Correlation Id: %s", requestId)
}()
response := commonDTO.NewBaseResponse("", "Trigger discovery with correlationId "+correlationId, http.StatusAccepted)

response := commonDTO.NewBaseResponse(requestId, "Device Discovery is triggered.", http.StatusAccepted)
return c.sendResponse(writer, request, common.ApiDiscoveryRoute, response, http.StatusAccepted)
}

Expand All @@ -74,29 +77,28 @@ func (c *RestController) ProfileScan(e echo.Context) error {
return c.sendEdgexError(writer, request, edgexErr, common.ApiProfileScan)
}

req, edgexErr := profileScanValidation(body, c.dic)
req, edgexErr := profileScanValidation(body, request.Context(), c.dic)
if edgexErr != nil {
return c.sendEdgexError(writer, request, edgexErr, common.ApiProfileScan)
}

correlationId := correlation.IdFromContext(request.Context())
busy := make(chan bool)
go func() {
c.lc.Info("Profile scanning is triggered.", common.CorrelationHeader, correlationId)
application.ProfileScanWrapper(busy, ps, req, correlationId, c.dic)
c.lc.Info("Profile scanning is end.", common.CorrelationHeader, correlationId)
c.lc.Infof("Profile scanning is triggered. Correlation Id: %s", req.RequestId)
application.ProfileScanWrapper(busy, ps, req, request.Context(), c.dic)
c.lc.Infof("Profile scanning is end. Correlation Id: %s", req.RequestId)
}()
b := <-busy
if b {
edgexErr := errors.NewCommonEdgeX(errors.KindStatusConflict, fmt.Sprintf("Another profile scan process for %s is currently running", req.DeviceName), nil)
return c.sendEdgexError(writer, request, edgexErr, common.ApiProfileScan)
}

response := commonDTO.NewBaseResponse("", "Trigger profile scan with correlationId "+correlationId, http.StatusAccepted)
response := commonDTO.NewBaseResponse(req.RequestId, "Device ProfileScan is triggered.", http.StatusAccepted)
return c.sendResponse(writer, request, common.ApiProfileScan, response, http.StatusAccepted)
}

func profileScanValidation(request []byte, dic *di.Container) (sdkModels.ProfileScanRequest, errors.EdgeX) {
func profileScanValidation(request []byte, ctx context.Context, dic *di.Container) (sdkModels.ProfileScanRequest, errors.EdgeX) {
var r sdkModels.ProfileScanRequest
// check device service AdminState
ds := container.DeviceServiceFrom(dic.Get)
Expand All @@ -118,15 +120,25 @@ func profileScanValidation(request []byte, dic *di.Container) (sdkModels.Profile
}

// check profile should not exist
if req.ProfileName != "" {
if len(req.ProfileName) > 0 {
if _, exist := cache.Profiles().ForName(req.ProfileName); exist {
return r, errors.NewCommonEdgeX(errors.KindStatusConflict, fmt.Sprintf("profile name %s is duplicated", req.ProfileName), nil)
}
} else {
req.ProfileName = fmt.Sprintf("%s_profile_%d", req.DeviceName, time.Now().UnixMilli())
}

requestId := req.RequestId
if len(requestId) == 0 {
// Use correlation id as request id if request id is not provided
requestId = correlation.IdFromContext(ctx)
}

r = sdkModels.ProfileScanRequest{
BaseRequest: commonDTO.BaseRequest{
Versionable: commonDTO.NewVersionable(),
RequestId: requestId,
},
DeviceName: req.DeviceName,
ProfileName: req.ProfileName,
Options: req.Options,
Expand Down
Loading

0 comments on commit 6fbfd52

Please sign in to comment.