diff --git a/internal/app/configupdates.go b/internal/app/configupdates.go index e046650d5..e342e32ee 100644 --- a/internal/app/configupdates.go +++ b/internal/app/configupdates.go @@ -17,6 +17,9 @@ package app import ( "context" + "fmt" + "reflect" + "strings" "sync" "time" @@ -174,3 +177,26 @@ func (svc *Service) stopStoreForward() { svc.ctx.storeForwardCancelCtx() svc.ctx.storeForwardWg.Wait() } + +func (svc *Service) findMatchingFunction(configurable reflect.Value, functionName string) (reflect.Value, reflect.Type, error) { + var functionValue reflect.Value + count := configurable.Type().NumMethod() + + for index := 0; index < count; index++ { + method := configurable.Type().Method(index) + // If the target configuration function name starts with actual method name then it is a match + if strings.Index(functionName, method.Name) == 0 { + functionValue = configurable.MethodByName(method.Name) + break + } + } + + if functionValue.Kind() == reflect.Invalid { + return functionValue, nil, fmt.Errorf("function %s is not a built in SDK function", functionName) + } else if functionValue.IsNil() { + return functionValue, nil, fmt.Errorf("invalid/missing configuration for %s", functionName) + } + + functionType := functionValue.Type() + return functionValue, functionType, nil +} diff --git a/internal/app/configurable.go b/internal/app/configurable.go index 342702511..691811322 100644 --- a/internal/app/configurable.go +++ b/internal/app/configurable.go @@ -42,6 +42,8 @@ const ( ExportMethodPut = "put" MimeType = "mimetype" PersistOnError = "persistonerror" + ContinueOnSendError = "continueonsenderror" + ReturnInputData = "returninputdata" SkipVerify = "skipverify" Qos = "qos" Retain = "retain" @@ -76,13 +78,15 @@ const ( ) type postPutParameters struct { - method string - url string - mimeType string - persistOnError bool - headerName string - secretPath string - secretName string + method string + url string + mimeType string + persistOnError bool + continueOnSendError bool + returnInputData bool + headerName string + secretPath string + secretName string } // Configurable contains the helper functions that return the function pointers for building the configurable function pipeline. @@ -642,6 +646,36 @@ func (app *Configurable) processHttpExportParameters( } } + // ContinueOnSendError is optional and is false by default. + result.continueOnSendError = false + value, ok = parameters[ContinueOnSendError] + if ok { + var err error + result.continueOnSendError, err = strconv.ParseBool(value) + if err != nil { + return nil, + fmt.Errorf("HTTPExport Could not parse '%s' to a bool for '%s' parameter: %s", + value, + ContinueOnSendError, + err.Error()) + } + } + + // ReturnInputData is optional and is false by default. + result.returnInputData = false + value, ok = parameters[ReturnInputData] + if ok { + var err error + result.returnInputData, err = strconv.ParseBool(value) + if err != nil { + return nil, + fmt.Errorf("HTTPExport Could not parse '%s' to a bool for '%s' parameter: %s", + value, + ReturnInputData, + err.Error()) + } + } + result.url = strings.TrimSpace(result.url) result.mimeType = strings.TrimSpace(result.mimeType) result.headerName = strings.TrimSpace(parameters[HeaderName]) diff --git a/internal/app/configurable_test.go b/internal/app/configurable_test.go index 6a577f338..457ce1a4e 100644 --- a/internal/app/configurable_test.go +++ b/internal/app/configurable_test.go @@ -157,41 +157,50 @@ func TestHTTPExport(t *testing.T) { testMimeType := clients.ContentTypeJSON testPersistOnError := "false" testBadPersistOnError := "bogus" + testContinueOnSendError := "true" + testBadContinueOnSendError := "bogus" + testReturnInputData := "true" + testBadReturnInputData := "bogus" + testHeaderName := "My-Header" testSecretPath := "/path" testSecretName := "header" tests := []struct { - Name string - Method string - Url *string - MimeType *string - PersistOnError *string - HeaderName *string - SecretPath *string - SecretName *string - ExpectValid bool + Name string + Method string + Url *string + MimeType *string + PersistOnError *string + ContinueOnSendError *string + ReturnInputData *string + HeaderName *string + SecretPath *string + SecretName *string + ExpectValid bool }{ - {"Valid Post - ony required params", ExportMethodPost, &testUrl, &testMimeType, nil, nil, nil, nil, true}, - {"Valid Post - w/o secrets", http.MethodPost, &testUrl, &testMimeType, &testPersistOnError, nil, nil, nil, true}, - {"Valid Post - with secrets", ExportMethodPost, &testUrl, &testMimeType, nil, &testHeaderName, &testSecretPath, &testSecretName, true}, - {"Valid Post - with all params", ExportMethodPost, &testUrl, &testMimeType, &testPersistOnError, &testHeaderName, &testSecretPath, &testSecretName, true}, - {"Invalid Post - no url", ExportMethodPost, nil, &testMimeType, nil, nil, nil, nil, false}, - {"Invalid Post - no mimeType", ExportMethodPost, &testUrl, nil, nil, nil, nil, nil, false}, - {"Invalid Post - bad persistOnError", ExportMethodPost, &testUrl, &testMimeType, &testBadPersistOnError, nil, nil, nil, false}, - {"Invalid Post - missing headerName", ExportMethodPost, &testUrl, &testMimeType, &testPersistOnError, nil, &testSecretPath, &testSecretName, false}, - {"Invalid Post - missing secretPath", ExportMethodPost, &testUrl, &testMimeType, &testPersistOnError, &testHeaderName, nil, &testSecretName, false}, - {"Invalid Post - missing secretName", ExportMethodPost, &testUrl, &testMimeType, &testPersistOnError, &testHeaderName, &testSecretPath, nil, false}, - {"Valid Put - ony required params", ExportMethodPut, &testUrl, &testMimeType, nil, nil, nil, nil, true}, - {"Valid Put - w/o secrets", ExportMethodPut, &testUrl, &testMimeType, &testPersistOnError, nil, nil, nil, true}, - {"Valid Put - with secrets", http.MethodPut, &testUrl, &testMimeType, nil, &testHeaderName, &testSecretPath, &testSecretName, true}, - {"Valid Put - with all params", ExportMethodPut, &testUrl, &testMimeType, &testPersistOnError, &testHeaderName, &testSecretPath, &testSecretName, true}, - {"Invalid Put - no url", ExportMethodPut, nil, &testMimeType, nil, nil, nil, nil, false}, - {"Invalid Put - no mimeType", ExportMethodPut, &testUrl, nil, nil, nil, nil, nil, false}, - {"Invalid Put - bad persistOnError", ExportMethodPut, &testUrl, &testMimeType, &testBadPersistOnError, nil, nil, nil, false}, - {"Invalid Put - missing headerName", ExportMethodPut, &testUrl, &testMimeType, &testPersistOnError, nil, &testSecretPath, &testSecretName, false}, - {"Invalid Put - missing secretPath", ExportMethodPut, &testUrl, &testMimeType, &testPersistOnError, &testHeaderName, nil, &testSecretName, false}, - {"Invalid Put - missing secretName", ExportMethodPut, &testUrl, &testMimeType, &testPersistOnError, &testHeaderName, &testSecretPath, nil, false}, + {"Valid Post - ony required params", ExportMethodPost, &testUrl, &testMimeType, nil, nil, nil, nil, nil, nil, true}, + {"Valid Post - w/o secrets", http.MethodPost, &testUrl, &testMimeType, &testPersistOnError, nil, nil, nil, nil, nil, true}, + {"Valid Post - with secrets", ExportMethodPost, &testUrl, &testMimeType, nil, nil, nil, &testHeaderName, &testSecretPath, &testSecretName, true}, + {"Valid Post - with all params", ExportMethodPost, &testUrl, &testMimeType, &testPersistOnError, &testContinueOnSendError, &testReturnInputData, &testHeaderName, &testSecretPath, &testSecretName, true}, + {"Invalid Post - no url", ExportMethodPost, nil, &testMimeType, nil, nil, nil, nil, nil, nil, false}, + {"Invalid Post - no mimeType", ExportMethodPost, &testUrl, nil, nil, nil, nil, nil, nil, nil, false}, + {"Invalid Post - bad persistOnError", ExportMethodPost, &testUrl, &testMimeType, &testBadPersistOnError, nil, nil, nil, nil, nil, false}, + {"Invalid Post - missing headerName", ExportMethodPost, &testUrl, &testMimeType, &testPersistOnError, nil, nil, nil, &testSecretPath, &testSecretName, false}, + {"Invalid Post - missing secretPath", ExportMethodPost, &testUrl, &testMimeType, &testPersistOnError, nil, nil, &testHeaderName, nil, &testSecretName, false}, + {"Invalid Post - missing secretName", ExportMethodPost, &testUrl, &testMimeType, &testPersistOnError, nil, nil, &testHeaderName, &testSecretPath, nil, false}, + {"Valid Put - ony required params", ExportMethodPut, &testUrl, &testMimeType, nil, nil, nil, nil, nil, nil, true}, + {"Valid Put - w/o secrets", ExportMethodPut, &testUrl, &testMimeType, &testPersistOnError, nil, nil, nil, nil, nil, true}, + {"Valid Put - with secrets", http.MethodPut, &testUrl, &testMimeType, nil, nil, nil, &testHeaderName, &testSecretPath, &testSecretName, true}, + {"Valid Put - with all params", ExportMethodPut, &testUrl, &testMimeType, &testPersistOnError, nil, nil, &testHeaderName, &testSecretPath, &testSecretName, true}, + {"Invalid Put - no url", ExportMethodPut, nil, &testMimeType, nil, nil, nil, nil, nil, nil, false}, + {"Invalid Put - no mimeType", ExportMethodPut, &testUrl, nil, nil, nil, nil, nil, nil, nil, false}, + {"Invalid Put - bad persistOnError", ExportMethodPut, &testUrl, &testMimeType, &testBadPersistOnError, nil, nil, nil, nil, nil, false}, + {"Invalid Put - bad continueOnSendError", ExportMethodPut, &testUrl, &testMimeType, nil, &testBadContinueOnSendError, nil, nil, nil, nil, false}, + {"Invalid Put - bad returnInputData", ExportMethodPut, &testUrl, &testMimeType, nil, nil, &testBadReturnInputData, nil, nil, nil, false}, + {"Invalid Put - missing headerName", ExportMethodPut, &testUrl, &testMimeType, &testPersistOnError, nil, nil, nil, &testSecretPath, &testSecretName, false}, + {"Invalid Put - missing secretPath", ExportMethodPut, &testUrl, &testMimeType, &testPersistOnError, nil, nil, &testHeaderName, nil, &testSecretName, false}, + {"Invalid Put - missing secretName", ExportMethodPut, &testUrl, &testMimeType, &testPersistOnError, nil, nil, &testHeaderName, &testSecretPath, nil, false}, } for _, test := range tests { @@ -211,6 +220,14 @@ func TestHTTPExport(t *testing.T) { params[PersistOnError] = *test.PersistOnError } + if test.ContinueOnSendError != nil { + params[ContinueOnSendError] = *test.ContinueOnSendError + } + + if test.ReturnInputData != nil { + params[ReturnInputData] = *test.ReturnInputData + } + if test.HeaderName != nil { params[HeaderName] = *test.HeaderName } diff --git a/internal/app/service.go b/internal/app/service.go index bacdfc64e..36dad9899 100644 --- a/internal/app/service.go +++ b/internal/app/service.go @@ -227,9 +227,7 @@ func (svc *Service) LoadConfigurablePipeline() ([]interfaces.AppFunction, error) svc.targetType = &[]byte{} } - configurable := NewConfigurable(svc.lc) - - valueOfType := reflect.ValueOf(configurable) + configurable := reflect.ValueOf(NewConfigurable(svc.lc)) pipelineConfig := svc.config.Writable.Pipeline executionOrder := util.DeleteEmptyAndTrim(strings.FieldsFunc(pipelineConfig.ExecutionOrder, util.SplitComma)) @@ -247,15 +245,13 @@ func (svc *Service) LoadConfigurablePipeline() ([]interfaces.AppFunction, error) return nil, fmt.Errorf("function '%s' configuration not found in Pipeline.Functions section", functionName) } - result := valueOfType.MethodByName(functionName) - if result.Kind() == reflect.Invalid { - return nil, fmt.Errorf("function %s is not a built in SDK function", functionName) - } else if result.IsNil() { - return nil, fmt.Errorf("invalid/missing configuration for %s", functionName) + functionValue, functionType, err := svc.findMatchingFunction(configurable, functionName) + if err != nil { + return nil, err } // determine number of parameters required for function call - inputParameters := make([]reflect.Value, result.Type().NumIn()) + inputParameters := make([]reflect.Value, functionType.NumIn()) // set keys to be all lowercase to avoid casing issues from configuration for key := range configuration.Parameters { value := configuration.Parameters[key] @@ -263,7 +259,7 @@ func (svc *Service) LoadConfigurablePipeline() ([]interfaces.AppFunction, error) configuration.Parameters[strings.ToLower(key)] = value } for index := range inputParameters { - parameter := result.Type().In(index) + parameter := functionType.In(index) switch parameter { case reflect.TypeOf(map[string]string{}): @@ -278,7 +274,7 @@ func (svc *Service) LoadConfigurablePipeline() ([]interfaces.AppFunction, error) } } - function, ok := result.Call(inputParameters)[0].Interface().(interfaces.AppFunction) + function, ok := functionValue.Call(inputParameters)[0].Interface().(interfaces.AppFunction) if !ok { return nil, fmt.Errorf("failed to cast function %s as AppFunction type", functionName) } diff --git a/internal/app/service_test.go b/internal/app/service_test.go index b3b38f974..2f28b7e98 100644 --- a/internal/app/service_test.go +++ b/internal/app/service_test.go @@ -517,3 +517,38 @@ func TestMakeItStop(t *testing.T) { sdk.ctx.stop = nil sdk.MakeItStop() //should avoid nil pointer } + +func TestFindMatchingFunction(t *testing.T) { + svc := Service{ + lc: lc, + serviceKey: "MyAppService", + profileSuffixPlaceholder: interfaces.ProfileSuffixPlaceholder, + } + + configurable := reflect.ValueOf(NewConfigurable(svc.lc)) + + tests := []struct { + Name string + FunctionName string + ExpectError bool + }{ + {"valid exact match AddTags", "AddTags", false}, + {"valid exact match HTTPExport", "HTTPExport", false}, + {"valid starts with match AddTags", "AddTagsExtra", false}, + {"valid starts with match HTTPExport", "HTTPExport2", false}, + {"invalid no match", "Bogus", true}, + {"invalid doesn't start with", "NextHTTPExport", true}, + } + + for _, test := range tests { + t.Run(test.Name, func(t *testing.T) { + _, _, err := svc.findMatchingFunction(configurable, test.FunctionName) + if test.ExpectError { + require.Error(t, err) + return + } + + require.NoError(t, err) + }) + } +} diff --git a/pkg/transforms/http.go b/pkg/transforms/http.go index f6fb6e0cf..24f061eda 100644 --- a/pkg/transforms/http.go +++ b/pkg/transforms/http.go @@ -25,18 +25,19 @@ import ( "github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/interfaces" "github.com/edgexfoundry/app-functions-sdk-go/v2/pkg/util" - "github.com/edgexfoundry/go-mod-core-contracts/v2/clients" ) // HTTPSender ... type HTTPSender struct { - URL string - MimeType string - PersistOnError bool - HttpHeaderName string - SecretName string - SecretPath string + URL string + MimeType string + PersistOnError bool + ContinueOnSendError bool + ReturnInputData bool + HttpHeaderName string + SecretName string + SecretPath string } // NewHTTPSender creates, initializes and returns a new instance of HTTPSender @@ -77,11 +78,21 @@ func (sender HTTPSender) HTTPPut(ctx interfaces.AppFunctionContext, data interfa func (sender HTTPSender) httpSend(ctx interfaces.AppFunctionContext, data interface{}, method string) (bool, interface{}) { lc := ctx.LoggingClient() + lc.Debug("HTTP Exporting") + if data == nil { // We didn't receive a result return false, errors.New("No Data Received") } + if sender.PersistOnError && sender.ContinueOnSendError { + return false, errors.New("PersistOnError & ContinueOnSendError can not both be set to true for HTTP Export") + } + + if sender.ContinueOnSendError && !sender.ReturnInputData { + return false, errors.New("ContinueOnSendError can only be used in conjunction ReturnInputData for multiple HTTP Export") + } + if sender.MimeType == "" { sender.MimeType = "application/json" } @@ -118,30 +129,50 @@ func (sender HTTPSender) httpSend(ctx interfaces.AppFunctionContext, data interf req.Header.Set("Content-Type", sender.MimeType) - ctx.LoggingClient().Debug("POSTing data") + ctx.LoggingClient().Debugf("POSTing data to %s", sender.URL) + response, err := client.Do(req) - if err != nil { - sender.setRetryData(ctx, exportData) - return false, err - } - defer func() { _ = response.Body.Close() }() - ctx.LoggingClient().Debugf("Response: %s", response.Status) - ctx.LoggingClient().Debugf("Sent data: %s", string(exportData)) - bodyBytes, errReadingBody := ioutil.ReadAll(response.Body) - if errReadingBody != nil { - sender.setRetryData(ctx, exportData) - return false, errReadingBody + // Pipeline continues if we get a 2xx response, non-2xx response may stop pipeline + if err != nil || response.StatusCode < 200 || response.StatusCode >= 300 { + if err == nil { + err = fmt.Errorf("export failed with %d HTTP status code", response.StatusCode) + } else { + err = fmt.Errorf("export failed: %w", err) + } + + // If continuing on send error then can't be persisting on error since Store and Forward retries starting + // with the function that failed and stopped the execution of the pipeline. + if !sender.ContinueOnSendError { + sender.setRetryData(ctx, exportData) + return false, err + } + + // Continuing pipeline on error + // This is in support of sending to multiple export destinations by chaining export functions in the pipeline. + ctx.LoggingClient().Errorf("Continuing pipeline on error: %s", err.Error()) + + // Return the input data since must have some data for the next function to operate on. + return true, data } + ctx.LoggingClient().Debugf("Sent %s bytes of data. Response status is %s", len(exportData), response.Status) ctx.LoggingClient().Trace("Data exported", "Transport", "HTTP", clients.CorrelationHeader, ctx.CorrelationID) - // continues the pipeline if we get a 2xx response, stops pipeline if non-2xx response - if response.StatusCode < 200 || response.StatusCode >= 300 { + // This allows multiple HTTP Exports to be chained in the pipeline to send the same data to different destinations + // Don't need to read the response data since not going to return it so just return now. + if sender.ReturnInputData { + return true, data + } + + defer func() { _ = response.Body.Close() }() + responseData, errReadingBody := ioutil.ReadAll(response.Body) + if errReadingBody != nil { + // Can't have ContinueOnSendError=true when ReturnInputData=false, so no need to check for it here sender.setRetryData(ctx, exportData) - return false, fmt.Errorf("export failed with %d HTTP status code", response.StatusCode) + return false, errReadingBody } - return true, bodyBytes + return true, responseData } func (sender HTTPSender) determineIfUsingSecrets() (bool, error) { diff --git a/pkg/transforms/http_test.go b/pkg/transforms/http_test.go index 23047aa7b..97ab9734b 100644 --- a/pkg/transforms/http_test.go +++ b/pkg/transforms/http_test.go @@ -36,6 +36,7 @@ import ( const ( msgStr = "test message" path = "/some-path/foo" + path2 = "/some-path/foo2" badPath = "/some-path/bad" ) @@ -75,18 +76,27 @@ func TestHTTPPostPut(t *testing.T) { require.NoError(t, err) tests := []struct { - Name string - Path string - PersistOnFail bool - RetryDataSet bool - ExpectedMethod string + Name string + Path string + PersistOnFail bool + RetryDataSet bool + ReturnInputData bool + ContinueOnSendError bool + ExpectedContinueExecuting bool + ExpectedMethod string }{ - {"Successful POST", path, true, false, http.MethodPost}, - {"Failed POST no persist", badPath, false, false, http.MethodPost}, - {"Failed POST with persist", badPath, true, true, http.MethodPost}, - {"Successful PUT", path, false, false, http.MethodPut}, - {"Failed PUT no persist", badPath, false, false, http.MethodPut}, - {"Failed PUT with persist", badPath, true, true, http.MethodPut}, + {"Successful POST", path, true, false, false, false, true, http.MethodPost}, + {"Successful POST", path, true, false, false, false, true, http.MethodPost}, + {"Successful PUT", path, false, false, false, false, true, http.MethodPut}, + {"Failed POST no persist", badPath, false, false, false, false, false, http.MethodPost}, + {"Failed POST continue on error", badPath, false, false, true, true, true, http.MethodPost}, + {"Failed POST with persist", badPath, true, true, false, false, false, http.MethodPost}, + {"Failed PUT no persist", badPath, false, false, false, false, false, http.MethodPut}, + {"Failed PUT with persist", badPath, true, true, false, false, false, http.MethodPut}, + {"Successful return inputData", path, false, false, true, false, true, http.MethodPost}, + {"Failed with persist and ReturnInputData", badPath, true, true, true, false, false, http.MethodPut}, + {"Failed ContinueOnSendError w/o ReturnInputData", path, false, false, false, true, false, ""}, + {"Failed ContinueOnSendError with PersistOnFail", path, true, false, true, true, false, ""}, } for _, test := range tests { @@ -94,13 +104,25 @@ func TestHTTPPostPut(t *testing.T) { context.SetRetryData(nil) methodUsed = "" sender := NewHTTPSender(`http://`+targetUrl.Host+test.Path, "", test.PersistOnFail) + sender.ReturnInputData = test.ReturnInputData + sender.ContinueOnSendError = test.ContinueOnSendError + var continueExecuting bool + var resultData interface{} if test.ExpectedMethod == http.MethodPost { - sender.HTTPPost(context, msgStr) + continueExecuting, resultData = sender.HTTPPost(context, msgStr) } else { - sender.HTTPPut(context, msgStr) + continueExecuting, resultData = sender.HTTPPut(context, msgStr) } + assert.Equal(t, test.ExpectedContinueExecuting, continueExecuting) + if test.ExpectedContinueExecuting { + if test.ReturnInputData { + assert.Equal(t, msgStr, resultData) + } else { + assert.NotEqual(t, msgStr, resultData) + } + } assert.Equal(t, test.RetryDataSet, context.RetryData() != nil) assert.Equal(t, test.ExpectedMethod, methodUsed) })