Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(data): Make Core Data publish events to <TopicPrefix>/<DeviceProfile>/<DeviceName> #3002

Merged
merged 4 commits into from
Jan 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/core-data/res/configuration.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ Host = '*'
Port = 5563
Type = 'zero'
Topic = 'events'
PublishTopicPrefix = 'edgex/events' # /<device-profile-name>/<device-name> will be added to this Publish Topic prefix
[MessageQueue.Optional]
# Default MQTT Specific options that need to be here to enable evnironment variable overrides of them
# Client Identifiers
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/edgexfoundry/go-mod-bootstrap v0.0.70
github.com/edgexfoundry/go-mod-configuration v0.0.8
github.com/edgexfoundry/go-mod-core-contracts v0.1.144
github.com/edgexfoundry/go-mod-core-contracts v0.1.145
github.com/edgexfoundry/go-mod-messaging v0.1.30
github.com/edgexfoundry/go-mod-registry v0.1.27
github.com/edgexfoundry/go-mod-secrets v0.0.32
Expand Down
4 changes: 4 additions & 0 deletions internal/core/data/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ type MessageQueueInfo struct {
// Indicates the message queue platform being used.
Type string
// Indicates the topic the data is published/subscribed
// TODO this configuration shall be removed once v1 API is deprecated.
Topic string
lenny-goodell marked this conversation as resolved.
Show resolved Hide resolved
// Indicates the topic prefix the data is published to. Note that /<device-profile-name>/<device-name> will be
// added to this Publish Topic prefix as the complete publish topic
PublishTopicPrefix string
// Provides additional configuration properties which do not fit within the existing field.
// Typically the key is the name of the configuration property and the value is a string representation of the
// desired value for the configuration property.
Expand Down
50 changes: 29 additions & 21 deletions internal/core/data/v2/application/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,43 @@ import (
"github.com/edgexfoundry/go-mod-core-contracts/clients"
"github.com/edgexfoundry/go-mod-core-contracts/errors"
"github.com/edgexfoundry/go-mod-core-contracts/v2/dtos"
dto "github.com/edgexfoundry/go-mod-core-contracts/v2/dtos/requests"
"github.com/edgexfoundry/go-mod-core-contracts/v2/models"

msgTypes "github.com/edgexfoundry/go-mod-messaging/pkg/types"

"github.com/google/uuid"
)

// ValidateEvent validates if e is a valid event with corresponding device profile name and device name
// ValidateEvent throws error when profileName or deviceName doesn't match to e
func ValidateEvent(e models.Event, profileName string, deviceName string, ctx context.Context, dic *di.Container) errors.EdgeX {
if e.ProfileName != profileName {
return errors.NewCommonEdgeX(errors.KindContractInvalid, fmt.Sprintf("event's profileName %s mismatches %s", e.ProfileName, profileName), nil)
}
if e.DeviceName != deviceName {
return errors.NewCommonEdgeX(errors.KindContractInvalid, fmt.Sprintf("event's deviceName %s mismatches %s", e.DeviceName, deviceName), nil)
}
return checkDevice(e.DeviceName, ctx, dic)
}

// The AddEvent function accepts the new event model from the controller functions
// and invokes addEvent function in the infrastructure layer
func AddEvent(e models.Event, ctx context.Context, dic *di.Container) (id string, err errors.EdgeX) {
func AddEvent(e models.Event, profileName string, deviceName string, ctx context.Context, dic *di.Container) (err errors.EdgeX) {
configuration := dataContainer.ConfigurationFrom(dic.Get)
if !configuration.Writable.PersistData {
return nil
}

dbClient := v2DataContainer.DBClientFrom(dic.Get)
lc := container.LoggingClientFrom(dic.Get)

err = checkDevice(e.DeviceName, ctx, dic)
if err != nil {
return "", errors.NewCommonEdgeXWrapper(err)
}

// Add the event and readings to the database
if configuration.Writable.PersistData {
lenny-goodell marked this conversation as resolved.
Show resolved Hide resolved
correlationId := correlation.FromContext(ctx)
addedEvent, err := dbClient.AddEvent(e)
if err != nil {
return "", errors.NewCommonEdgeXWrapper(err)
return errors.NewCommonEdgeXWrapper(err)
}
e = addedEvent

Expand All @@ -54,15 +66,11 @@ func AddEvent(e models.Event, ctx context.Context, dic *di.Container) (id string
))
}

//convert Event model to Event DTO
eventDTO := dtos.FromEventModelToDTO(e)
putEventOnQueue(eventDTO, ctx, dic) // Push event DTO to message bus for App Services to consume

return e.Id, nil
return nil
}

// Put event DTO on the message queue to be processed by the rules engine
func putEventOnQueue(evt dtos.Event, ctx context.Context, dic *di.Container) {
// PublishEvent publishes incoming AddEventRequest through MessageClient
func PublishEvent(addEventReq dto.AddEventRequest, profileName string, deviceName string, ctx context.Context, dic *di.Container) {
lc := container.LoggingClientFrom(dic.Get)
msgClient := dataContainer.MessagingClientFrom(dic.Get)
configuration := dataContainer.ConfigurationFrom(dic.Get)
Expand All @@ -77,21 +85,21 @@ func putEventOnQueue(evt dtos.Event, ctx context.Context, dic *di.Container) {
ctx = context.WithValue(ctx, clients.ContentType, clients.ContentTypeJSON)
}

data, err = json.Marshal(evt)
data, err = json.Marshal(addEventReq)
if err != nil {
lc.Error(fmt.Sprintf("error marshaling V2 Event DTO: %+v", evt), clients.CorrelationHeader, correlationId)
lc.Error(fmt.Sprintf("error marshaling V2 AddEventRequest DTO: %+v", addEventReq), clients.CorrelationHeader, correlationId)
return
}

publishTopic := fmt.Sprintf("%s/%s/%s", configuration.MessageQueue.PublishTopicPrefix, profileName, deviceName)
msgEnvelope := msgTypes.NewMessageEnvelope(data, ctx)
err = msgClient.Publish(msgEnvelope, configuration.MessageQueue.Topic)
err = msgClient.Publish(msgEnvelope, publishTopic)
if err != nil {
lc.Error(fmt.Sprintf("Unable to send message for V2 API event. Correlation-id: %s, Device Name: %s, Error: %v",
correlationId, evt.DeviceName, err))
lc.Error(fmt.Sprintf("Unable to send message for V2 API event. Correlation-id: %s, Profile Name: %s, "+
"Device Name: %s, Error: %v", correlationId, profileName, deviceName, err))
} else {
lc.Debug(fmt.Sprintf(
"Event Published on message queue. Topic: %s, Correlation-id: %s ",
configuration.MessageQueue.Topic, correlationId))
"V2 API Event Published on message queue. Topic: %s, Correlation-id: %s ", publishTopic, correlationId))
}
}

Expand Down
73 changes: 63 additions & 10 deletions internal/core/data/v2/application/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
const (
testDeviceResourceName = "TestDeviceResource"
testDeviceName = "TestDevice"
testProfileName = "TestProfile"
testUUIDString = "ca93c8fa-9919-4ec5-85d3-f81b2b6a7bc1"
testCreatedTime = 1600666214495
testOriginTime = 1600666185705354000
Expand Down Expand Up @@ -130,20 +131,68 @@ func newMockDB(persist bool) *dbMock.DBClient {
return myMock
}

func TestValidateEvent(t *testing.T) {
evt := models.Event{
Id: testUUIDString,
DeviceName: testDeviceName,
ProfileName: testProfileName,
Origin: testOriginTime,
Readings: buildReadings(),
}

tests := []struct {
Name string
event models.Event
profileName string
deviceName string
errorExpected bool
}{
{"Valid - profileName/deviceName matches", persistedEvent, testProfileName, testDeviceName, false},
{"Invalid - empty profile name", persistedEvent, "", testDeviceName, true},
{"Invalid - inconsistent profile name", persistedEvent, "inconsistent", testDeviceName, true},
{"Invalid - empty device name", persistedEvent, testProfileName, "", true},
{"Invalid - inconsistent profile name", persistedEvent, testProfileName, "inconsistent", true},
}

for _, testCase := range tests {
t.Run(testCase.Name, func(t *testing.T) {
dbClientMock := newMockDB(true)

dic := mocks.NewMockDIC()
dic.Update(di.ServiceConstructorMap{
v2DataContainer.DBClientInterfaceName: func(get di.Get) interface{} {
return dbClientMock
},
})
err := ValidateEvent(evt, testCase.profileName, testCase.deviceName, context.Background(), dic)

if testCase.errorExpected {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
})
}
}

func TestAddEvent(t *testing.T) {
evt := models.Event{
Id: testUUIDString,
DeviceName: testDeviceName,
Origin: testOriginTime,
Readings: buildReadings(),
Id: testUUIDString,
DeviceName: testDeviceName,
ProfileName: testProfileName,
Origin: testOriginTime,
Readings: buildReadings(),
}

tests := []struct {
Name string
Persistence bool
Name string
Persistence bool
profileName string
deviceName string
errorExpected bool
}{
{Name: "Add Event with persistence", Persistence: true},
{Name: "Add Event without persistence", Persistence: false},
{"Valid - Add Event with persistence", true, testProfileName, testDeviceName, false},
{"Valid - Add Event without persistence", false, testProfileName, testDeviceName, false},
}

for _, testCase := range tests {
Expand All @@ -163,9 +212,13 @@ func TestAddEvent(t *testing.T) {
return dbClientMock
},
})
_, err := AddEvent(evt, context.Background(), dic)
err := AddEvent(evt, testCase.profileName, testCase.deviceName, context.Background(), dic)

require.NoError(t, err)
if testCase.errorExpected {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}

if !testCase.Persistence {
// assert there is no db client function called
Expand Down
61 changes: 30 additions & 31 deletions internal/core/data/v2/controller/http/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,50 +49,49 @@ func (ec *EventController) AddEvent(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
correlationId := correlation.FromContext(ctx)

reader := io.NewEventRequestReader()
addEventReqDTOs, err := reader.ReadAddEventRequest(r.Body)
// URL parameters
vars := mux.Vars(r)
profileName := vars[v2.ProfileName]
deviceName := vars[v2.DeviceName]

addEventReqDTO, err := ec.reader.ReadAddEventRequest(r.Body)
if err != nil {
lc.Error(err.Error(), clients.CorrelationHeader, correlationId)
lc.Debug(err.DebugMessages(), clients.CorrelationHeader, correlationId)
errResponses := commonDTO.NewBaseResponse(
"",
err.Message(),
err.Code())
errResponses := commonDTO.NewBaseResponse("", err.Message(), err.Code())
utils.WriteHttpHeader(w, ctx, err.Code())
// encode and send out the response
pkg.Encode(errResponses, w, lc)
return
}
events := requestDTO.AddEventReqToEventModels(addEventReqDTOs)

// map Event models to AddEventResponse DTOs
var addResponses []interface{}
for i, e := range events {
newId, err := application.AddEvent(e, ctx, ec.dic)
var addEventResponse interface{}
// get the requestID from AddEventRequestDTO
reqId := addEventReqDTOs[i].RequestId
var addEventResponse interface{}
var statusCode int

if err != nil {
lc.Error(err.Error(), clients.CorrelationHeader, correlationId)
lc.Debug(err.DebugMessages(), clients.CorrelationHeader, correlationId)
addEventResponse = commonDTO.NewBaseResponse(
reqId,
err.Message(),
err.Code())
} else {
addEventResponse = commonDTO.NewBaseWithIdResponse(
reqId,
"",
http.StatusCreated,
newId)
}
addResponses = append(addResponses, addEventResponse)
event := requestDTO.AddEventReqToEventModel(addEventReqDTO)
err = application.ValidateEvent(event, profileName, deviceName, ctx, ec.dic)
if err == nil {
err = application.AddEvent(event, profileName, deviceName, ctx, ec.dic)
}

utils.WriteHttpHeader(w, ctx, http.StatusMultiStatus)
if err != nil {
lc.Error(err.Error(), clients.CorrelationHeader, correlationId)
lc.Debug(err.DebugMessages(), clients.CorrelationHeader, correlationId)
addEventResponse = commonDTO.NewBaseResponse(addEventReqDTO.RequestId, err.Message(), err.Code())
statusCode = err.Code()
} else {
addEventResponse = commonDTO.NewBaseWithIdResponse(
addEventReqDTO.RequestId,
"",
http.StatusCreated,
event.Id)
statusCode = http.StatusCreated
application.PublishEvent(addEventReqDTO, profileName, deviceName, ctx, ec.dic)
}

utils.WriteHttpHeader(w, ctx, statusCode)
// encode and send out the response
pkg.Encode(addResponses, w, lc)
pkg.Encode(addEventResponse, w, lc)
}

func (ec *EventController) EventById(w http.ResponseWriter, r *http.Request) {
Expand Down
Loading