Skip to content

Commit

Permalink
feat: Add ability to chain HTTP exports for multiple destinations
Browse files Browse the repository at this point in the history
HTTP export now has option to return the input data and optionally continue pipeline on send errors.
This enables the ability to chain multiple HTTP exports in the pipeline to send the same data to multiple destinations.
Configurable pipeline now supports multiple instances of same function to be configured.
Configuration names that start with the actual function name now match.  i.e. `HttpExport` matches `HttpExport` and `HttpExport2` also matches `HttpExport`.

closes #256

Signed-off-by: lenny <leonard.goodell@intel.com>
  • Loading branch information
lenny committed May 28, 2021
1 parent c799dd1 commit 8ac2140
Show file tree
Hide file tree
Showing 7 changed files with 244 additions and 83 deletions.
26 changes: 26 additions & 0 deletions internal/app/configupdates.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ package app

import (
"context"
"fmt"
"reflect"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -174,3 +177,26 @@ func (svc *Service) stopStoreForward() {
svc.ctx.storeForwardCancelCtx()
svc.ctx.storeForwardWg.Wait()
}

func (svc *Service) findMatchingFunction(configurable reflect.Value, functionName string) (reflect.Value, reflect.Type, error) {
var functionValue reflect.Value
count := configurable.Type().NumMethod()

for index := 0; index < count; index++ {
method := configurable.Type().Method(index)
// If the target configuration function name starts with actual method name then it is a match
if strings.Index(functionName, method.Name) == 0 {
functionValue = configurable.MethodByName(method.Name)
break
}
}

if functionValue.Kind() == reflect.Invalid {
return functionValue, nil, fmt.Errorf("function %s is not a built in SDK function", functionName)
} else if functionValue.IsNil() {
return functionValue, nil, fmt.Errorf("invalid/missing configuration for %s", functionName)
}

functionType := functionValue.Type()
return functionValue, functionType, nil
}
48 changes: 41 additions & 7 deletions internal/app/configurable.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ const (
ExportMethodPut = "put"
MimeType = "mimetype"
PersistOnError = "persistonerror"
ContinueOnSendError = "continueonsenderror"
ReturnInputData = "returninputdata"
SkipVerify = "skipverify"
Qos = "qos"
Retain = "retain"
Expand Down Expand Up @@ -76,13 +78,15 @@ const (
)

type postPutParameters struct {
method string
url string
mimeType string
persistOnError bool
headerName string
secretPath string
secretName string
method string
url string
mimeType string
persistOnError bool
continueOnSendError bool
returnInputData bool
headerName string
secretPath string
secretName string
}

// Configurable contains the helper functions that return the function pointers for building the configurable function pipeline.
Expand Down Expand Up @@ -642,6 +646,36 @@ func (app *Configurable) processHttpExportParameters(
}
}

// ContinueOnSendError is optional and is false by default.
result.continueOnSendError = false
value, ok = parameters[ContinueOnSendError]
if ok {
var err error
result.continueOnSendError, err = strconv.ParseBool(value)
if err != nil {
return nil,
fmt.Errorf("HTTPExport Could not parse '%s' to a bool for '%s' parameter: %s",
value,
ContinueOnSendError,
err.Error())
}
}

// ReturnInputData is optional and is false by default.
result.returnInputData = false
value, ok = parameters[ReturnInputData]
if ok {
var err error
result.returnInputData, err = strconv.ParseBool(value)
if err != nil {
return nil,
fmt.Errorf("HTTPExport Could not parse '%s' to a bool for '%s' parameter: %s",
value,
ReturnInputData,
err.Error())
}
}

result.url = strings.TrimSpace(result.url)
result.mimeType = strings.TrimSpace(result.mimeType)
result.headerName = strings.TrimSpace(parameters[HeaderName])
Expand Down
75 changes: 46 additions & 29 deletions internal/app/configurable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,41 +157,50 @@ func TestHTTPExport(t *testing.T) {
testMimeType := clients.ContentTypeJSON
testPersistOnError := "false"
testBadPersistOnError := "bogus"
testContinueOnSendError := "true"
testBadContinueOnSendError := "bogus"
testReturnInputData := "true"
testBadReturnInputData := "bogus"

testHeaderName := "My-Header"
testSecretPath := "/path"
testSecretName := "header"

tests := []struct {
Name string
Method string
Url *string
MimeType *string
PersistOnError *string
HeaderName *string
SecretPath *string
SecretName *string
ExpectValid bool
Name string
Method string
Url *string
MimeType *string
PersistOnError *string
ContinueOnSendError *string
ReturnInputData *string
HeaderName *string
SecretPath *string
SecretName *string
ExpectValid bool
}{
{"Valid Post - ony required params", ExportMethodPost, &testUrl, &testMimeType, nil, nil, nil, nil, true},
{"Valid Post - w/o secrets", http.MethodPost, &testUrl, &testMimeType, &testPersistOnError, nil, nil, nil, true},
{"Valid Post - with secrets", ExportMethodPost, &testUrl, &testMimeType, nil, &testHeaderName, &testSecretPath, &testSecretName, true},
{"Valid Post - with all params", ExportMethodPost, &testUrl, &testMimeType, &testPersistOnError, &testHeaderName, &testSecretPath, &testSecretName, true},
{"Invalid Post - no url", ExportMethodPost, nil, &testMimeType, nil, nil, nil, nil, false},
{"Invalid Post - no mimeType", ExportMethodPost, &testUrl, nil, nil, nil, nil, nil, false},
{"Invalid Post - bad persistOnError", ExportMethodPost, &testUrl, &testMimeType, &testBadPersistOnError, nil, nil, nil, false},
{"Invalid Post - missing headerName", ExportMethodPost, &testUrl, &testMimeType, &testPersistOnError, nil, &testSecretPath, &testSecretName, false},
{"Invalid Post - missing secretPath", ExportMethodPost, &testUrl, &testMimeType, &testPersistOnError, &testHeaderName, nil, &testSecretName, false},
{"Invalid Post - missing secretName", ExportMethodPost, &testUrl, &testMimeType, &testPersistOnError, &testHeaderName, &testSecretPath, nil, false},
{"Valid Put - ony required params", ExportMethodPut, &testUrl, &testMimeType, nil, nil, nil, nil, true},
{"Valid Put - w/o secrets", ExportMethodPut, &testUrl, &testMimeType, &testPersistOnError, nil, nil, nil, true},
{"Valid Put - with secrets", http.MethodPut, &testUrl, &testMimeType, nil, &testHeaderName, &testSecretPath, &testSecretName, true},
{"Valid Put - with all params", ExportMethodPut, &testUrl, &testMimeType, &testPersistOnError, &testHeaderName, &testSecretPath, &testSecretName, true},
{"Invalid Put - no url", ExportMethodPut, nil, &testMimeType, nil, nil, nil, nil, false},
{"Invalid Put - no mimeType", ExportMethodPut, &testUrl, nil, nil, nil, nil, nil, false},
{"Invalid Put - bad persistOnError", ExportMethodPut, &testUrl, &testMimeType, &testBadPersistOnError, nil, nil, nil, false},
{"Invalid Put - missing headerName", ExportMethodPut, &testUrl, &testMimeType, &testPersistOnError, nil, &testSecretPath, &testSecretName, false},
{"Invalid Put - missing secretPath", ExportMethodPut, &testUrl, &testMimeType, &testPersistOnError, &testHeaderName, nil, &testSecretName, false},
{"Invalid Put - missing secretName", ExportMethodPut, &testUrl, &testMimeType, &testPersistOnError, &testHeaderName, &testSecretPath, nil, false},
{"Valid Post - ony required params", ExportMethodPost, &testUrl, &testMimeType, nil, nil, nil, nil, nil, nil, true},
{"Valid Post - w/o secrets", http.MethodPost, &testUrl, &testMimeType, &testPersistOnError, nil, nil, nil, nil, nil, true},
{"Valid Post - with secrets", ExportMethodPost, &testUrl, &testMimeType, nil, nil, nil, &testHeaderName, &testSecretPath, &testSecretName, true},
{"Valid Post - with all params", ExportMethodPost, &testUrl, &testMimeType, &testPersistOnError, &testContinueOnSendError, &testReturnInputData, &testHeaderName, &testSecretPath, &testSecretName, true},
{"Invalid Post - no url", ExportMethodPost, nil, &testMimeType, nil, nil, nil, nil, nil, nil, false},
{"Invalid Post - no mimeType", ExportMethodPost, &testUrl, nil, nil, nil, nil, nil, nil, nil, false},
{"Invalid Post - bad persistOnError", ExportMethodPost, &testUrl, &testMimeType, &testBadPersistOnError, nil, nil, nil, nil, nil, false},
{"Invalid Post - missing headerName", ExportMethodPost, &testUrl, &testMimeType, &testPersistOnError, nil, nil, nil, &testSecretPath, &testSecretName, false},
{"Invalid Post - missing secretPath", ExportMethodPost, &testUrl, &testMimeType, &testPersistOnError, nil, nil, &testHeaderName, nil, &testSecretName, false},
{"Invalid Post - missing secretName", ExportMethodPost, &testUrl, &testMimeType, &testPersistOnError, nil, nil, &testHeaderName, &testSecretPath, nil, false},
{"Valid Put - ony required params", ExportMethodPut, &testUrl, &testMimeType, nil, nil, nil, nil, nil, nil, true},
{"Valid Put - w/o secrets", ExportMethodPut, &testUrl, &testMimeType, &testPersistOnError, nil, nil, nil, nil, nil, true},
{"Valid Put - with secrets", http.MethodPut, &testUrl, &testMimeType, nil, nil, nil, &testHeaderName, &testSecretPath, &testSecretName, true},
{"Valid Put - with all params", ExportMethodPut, &testUrl, &testMimeType, &testPersistOnError, nil, nil, &testHeaderName, &testSecretPath, &testSecretName, true},
{"Invalid Put - no url", ExportMethodPut, nil, &testMimeType, nil, nil, nil, nil, nil, nil, false},
{"Invalid Put - no mimeType", ExportMethodPut, &testUrl, nil, nil, nil, nil, nil, nil, nil, false},
{"Invalid Put - bad persistOnError", ExportMethodPut, &testUrl, &testMimeType, &testBadPersistOnError, nil, nil, nil, nil, nil, false},
{"Invalid Put - bad continueOnSendError", ExportMethodPut, &testUrl, &testMimeType, nil, &testBadContinueOnSendError, nil, nil, nil, nil, false},
{"Invalid Put - bad returnInputData", ExportMethodPut, &testUrl, &testMimeType, nil, nil, &testBadReturnInputData, nil, nil, nil, false},
{"Invalid Put - missing headerName", ExportMethodPut, &testUrl, &testMimeType, &testPersistOnError, nil, nil, nil, &testSecretPath, &testSecretName, false},
{"Invalid Put - missing secretPath", ExportMethodPut, &testUrl, &testMimeType, &testPersistOnError, nil, nil, &testHeaderName, nil, &testSecretName, false},
{"Invalid Put - missing secretName", ExportMethodPut, &testUrl, &testMimeType, &testPersistOnError, nil, nil, &testHeaderName, &testSecretPath, nil, false},
}

for _, test := range tests {
Expand All @@ -211,6 +220,14 @@ func TestHTTPExport(t *testing.T) {
params[PersistOnError] = *test.PersistOnError
}

if test.ContinueOnSendError != nil {
params[ContinueOnSendError] = *test.ContinueOnSendError
}

if test.ReturnInputData != nil {
params[ReturnInputData] = *test.ReturnInputData
}

if test.HeaderName != nil {
params[HeaderName] = *test.HeaderName
}
Expand Down
18 changes: 7 additions & 11 deletions internal/app/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,7 @@ func (svc *Service) LoadConfigurablePipeline() ([]interfaces.AppFunction, error)
svc.targetType = &[]byte{}
}

configurable := NewConfigurable(svc.lc)

valueOfType := reflect.ValueOf(configurable)
configurable := reflect.ValueOf(NewConfigurable(svc.lc))
pipelineConfig := svc.config.Writable.Pipeline
executionOrder := util.DeleteEmptyAndTrim(strings.FieldsFunc(pipelineConfig.ExecutionOrder, util.SplitComma))

Expand All @@ -247,23 +245,21 @@ func (svc *Service) LoadConfigurablePipeline() ([]interfaces.AppFunction, error)
return nil, fmt.Errorf("function '%s' configuration not found in Pipeline.Functions section", functionName)
}

result := valueOfType.MethodByName(functionName)
if result.Kind() == reflect.Invalid {
return nil, fmt.Errorf("function %s is not a built in SDK function", functionName)
} else if result.IsNil() {
return nil, fmt.Errorf("invalid/missing configuration for %s", functionName)
functionValue, functionType, err := svc.findMatchingFunction(configurable, functionName)
if err != nil {
return nil, err
}

// determine number of parameters required for function call
inputParameters := make([]reflect.Value, result.Type().NumIn())
inputParameters := make([]reflect.Value, functionType.NumIn())
// set keys to be all lowercase to avoid casing issues from configuration
for key := range configuration.Parameters {
value := configuration.Parameters[key]
delete(configuration.Parameters, key) // Make sure the old key has been removed so don't have multiples
configuration.Parameters[strings.ToLower(key)] = value
}
for index := range inputParameters {
parameter := result.Type().In(index)
parameter := functionType.In(index)

switch parameter {
case reflect.TypeOf(map[string]string{}):
Expand All @@ -278,7 +274,7 @@ func (svc *Service) LoadConfigurablePipeline() ([]interfaces.AppFunction, error)
}
}

function, ok := result.Call(inputParameters)[0].Interface().(interfaces.AppFunction)
function, ok := functionValue.Call(inputParameters)[0].Interface().(interfaces.AppFunction)
if !ok {
return nil, fmt.Errorf("failed to cast function %s as AppFunction type", functionName)
}
Expand Down
35 changes: 35 additions & 0 deletions internal/app/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,3 +517,38 @@ func TestMakeItStop(t *testing.T) {
sdk.ctx.stop = nil
sdk.MakeItStop() //should avoid nil pointer
}

func TestFindMatchingFunction(t *testing.T) {
svc := Service{
lc: lc,
serviceKey: "MyAppService",
profileSuffixPlaceholder: interfaces.ProfileSuffixPlaceholder,
}

configurable := reflect.ValueOf(NewConfigurable(svc.lc))

tests := []struct {
Name string
FunctionName string
ExpectError bool
}{
{"valid exact match AddTags", "AddTags", false},
{"valid exact match HTTPExport", "HTTPExport", false},
{"valid starts with match AddTags", "AddTagsExtra", false},
{"valid starts with match HTTPExport", "HTTPExport2", false},
{"invalid no match", "Bogus", true},
{"invalid doesn't start with", "NextHTTPExport", true},
}

for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
_, _, err := svc.findMatchingFunction(configurable, test.FunctionName)
if test.ExpectError {
require.Error(t, err)
return
}

require.NoError(t, err)
})
}
}
Loading

0 comments on commit 8ac2140

Please sign in to comment.