From 066e5e58cad8be8b92b1353b0f8e9bea985ae3f9 Mon Sep 17 00:00:00 2001 From: jinhoonbang Date: Wed, 18 Sep 2024 17:37:41 -0700 Subject: [PATCH 01/16] implement HTTP target capability and connector handler --- core/capabilities/webapi/target/capability.go | 166 ++++++++++++ .../webapi/target/connector_handler.go | 100 ++++++++ .../capabilities/webapi/target/target_test.go | 240 ++++++++++++++++++ core/capabilities/webapi/target/types.go | 30 +++ core/services/gateway/api/message.go | 6 +- core/services/gateway/connector/connector.go | 39 ++- .../connector/mocks/gateway_connector.go | 47 ++++ core/services/gateway/handler_factory.go | 7 +- .../handlers/webcapabilities/handler.go | 6 + .../handlers/webcapabilities/webapi.go | 18 ++ .../services/standardcapabilities/delegate.go | 28 ++ 11 files changed, 682 insertions(+), 5 deletions(-) create mode 100644 core/capabilities/webapi/target/capability.go create mode 100644 core/capabilities/webapi/target/connector_handler.go create mode 100644 core/capabilities/webapi/target/target_test.go create mode 100644 core/capabilities/webapi/target/types.go create mode 100644 core/services/gateway/handlers/webcapabilities/handler.go create mode 100644 core/services/gateway/handlers/webcapabilities/webapi.go diff --git a/core/capabilities/webapi/target/capability.go b/core/capabilities/webapi/target/capability.go new file mode 100644 index 00000000000..eb3bc261453 --- /dev/null +++ b/core/capabilities/webapi/target/capability.go @@ -0,0 +1,166 @@ +package target + +import ( + "context" + "encoding/json" + "fmt" + "strings" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/types/core" + "github.com/smartcontractkit/chainlink-common/pkg/values" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/validation" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/api" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/webcapabilities" +) + +const ID = "web-api-target@1.0.0" + +var _ capabilities.TargetCapability = &Capability{} + +var capabilityInfo = capabilities.MustNewCapabilityInfo( + ID, + capabilities.CapabilityTypeTarget, + "A target that sends HTTP requests to external clients via the Chainlink Gateway.", +) + +type Capability struct { + capabilityInfo capabilities.CapabilityInfo + connectorHandler *ConnectorHandler + lggr logger.Logger + registry core.CapabilitiesRegistry + config Config + activeWorkflows map[string]struct{} // tracks registered workflows +} + +func NewCapability(config Config, registry core.CapabilitiesRegistry, connectorHandler *ConnectorHandler, lggr logger.Logger) (*Capability, error) { + return &Capability{ + capabilityInfo: capabilityInfo, + config: config, + registry: registry, + connectorHandler: connectorHandler, + activeWorkflows: make(map[string]struct{}), + lggr: lggr, + }, nil +} + +func (c *Capability) Start(ctx context.Context) error { + return c.registry.Add(ctx, c) +} + +func (c *Capability) Close() error { + return nil +} + +func (c *Capability) Info(ctx context.Context) (capabilities.CapabilityInfo, error) { + return capabilityInfo, nil +} + +func getMessageID(req capabilities.CapabilityRequest) (string, error) { + if err := validation.ValidateWorkflowOrExecutionID(req.Metadata.WorkflowID); err != nil { + return "", fmt.Errorf("workflow ID is invalid: %w", err) + } + if err := validation.ValidateWorkflowOrExecutionID(req.Metadata.WorkflowExecutionID); err != nil { + return "", fmt.Errorf("workflow execution ID is invalid: %w", err) + } + messageID := []string{ + req.Metadata.WorkflowID, + req.Metadata.WorkflowExecutionID, + webcapabilities.MethodWebAPITarget, + } + return strings.Join(messageID, "/"), nil +} + +func (c *Capability) Execute(ctx context.Context, req capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) { + c.lggr.Debugw("executing http target", "capabilityRequest", req) + + var input Input + err := req.Inputs.UnwrapTo(&input) + if err != nil { + return capabilities.CapabilityResponse{}, err + } + + var workflowCfg WorkflowConfig + err = req.Config.UnwrapTo(&workflowCfg) + if err != nil { + return capabilities.CapabilityResponse{}, err + } + + messageID, err := getMessageID(req) + if err != nil { + return capabilities.CapabilityResponse{}, err + } + + if _, ok := c.activeWorkflows[req.Metadata.WorkflowID]; !ok { + return capabilities.CapabilityResponse{}, fmt.Errorf("workflow is not registered: %v", req.Metadata.WorkflowID) + } + + payload := webcapabilities.TargetRequestPayload{ + URL: input.URL, + Method: input.Method, + Headers: input.Headers, + Body: []byte(input.Body), + TimeoutMs: workflowCfg.TimeoutMs, + RetryCount: workflowCfg.RetryCount, + } + + payloadBytes, err := json.Marshal(payload) + if err != nil { + return capabilities.CapabilityResponse{}, err + } + + gatewayReq := &api.MessageBody{ + MessageId: messageID, + Method: webcapabilities.MethodWebAPITarget, + Payload: payloadBytes, + } + + switch workflowCfg.Schedule { + case SingleNode: + // blocking call to handle single node request. waits for response from gateway + resp, err := c.connectorHandler.HandleSingleNodeRequest(ctx, gatewayReq) + if err != nil { + return capabilities.CapabilityResponse{}, err + } + c.lggr.Debugw("received gateway response", "resp", resp) + var payload webcapabilities.TargetResponsePayload + err = json.Unmarshal(resp.Body.Payload, &payload) + if err != nil { + return capabilities.CapabilityResponse{}, err + } + + // TODO: check target response format and fields + values, err := values.NewMap(map[string]any{ + "statusCode": payload.StatusCode, + "headers": payload.Headers, + "body": string(payload.Body), + }) + if err != nil { + return capabilities.CapabilityResponse{}, err + } + return capabilities.CapabilityResponse{ + Value: values, + }, nil + default: + return capabilities.CapabilityResponse{}, fmt.Errorf("unsupported schedule: %v", workflowCfg.Schedule) + } +} + +func (c *Capability) RegisterToWorkflow(ctx context.Context, req capabilities.RegisterToWorkflowRequest) error { + if err := validation.ValidateWorkflowOrExecutionID(req.Metadata.WorkflowID); err != nil { + return fmt.Errorf("workflow ID is invalid: %w", err) + } + c.activeWorkflows[req.Metadata.WorkflowID] = struct{}{} + return nil +} + +func (c *Capability) UnregisterFromWorkflow(ctx context.Context, req capabilities.UnregisterFromWorkflowRequest) error { + // TODO: is best-effort removal of workflow sufficient here? or should it error if workflow is not found for some reason? + if _, ok := c.activeWorkflows[req.Metadata.WorkflowID]; !ok { + c.lggr.Warnw("workflow not found", "workflowID", req.Metadata.WorkflowID) + } else { + delete(c.activeWorkflows, req.Metadata.WorkflowID) + } + return nil +} diff --git a/core/capabilities/webapi/target/connector_handler.go b/core/capabilities/webapi/target/connector_handler.go new file mode 100644 index 00000000000..797cfe1f315 --- /dev/null +++ b/core/capabilities/webapi/target/connector_handler.go @@ -0,0 +1,100 @@ +package target + +import ( + "context" + "encoding/json" + "sync" + + "github.com/pkg/errors" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/api" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/webcapabilities" +) + +var _ connector.GatewayConnectorHandler = &ConnectorHandler{} + +type ConnectorHandler struct { + gc connector.GatewayConnector + lggr logger.Logger + responseChs map[string]chan *api.Message + responseChsMu sync.Mutex + rateLimiter *common.RateLimiter +} + +func NewConnectorHandler(gc connector.GatewayConnector, config Config, lgger logger.Logger) (*ConnectorHandler, error) { + rateLimiter, err := common.NewRateLimiter(config.RateLimiter) + if err != nil { + return nil, err + } + responseChs := make(map[string]chan *api.Message) + return &ConnectorHandler{ + gc: gc, + responseChs: responseChs, + responseChsMu: sync.Mutex{}, + rateLimiter: rateLimiter, + lggr: lgger, + }, nil +} + +// HandleSingleNodeRequest sends a request to first available gateway node and blocks until response is received +// TODO: handle retries and timeouts +func (c *ConnectorHandler) HandleSingleNodeRequest(ctx context.Context, msg *api.MessageBody) (*api.Message, error) { + ch := make(chan *api.Message, 1) + c.responseChsMu.Lock() + c.responseChs[msg.MessageId] = ch + c.responseChsMu.Unlock() + l := logger.With(c.lggr, "messageId", msg.MessageId) + l.Debugw("sending request to gateway") + + err := c.gc.SendToAvailableGateway(ctx, msg) + if err != nil { + return nil, errors.Wrap(err, "failed to send request to gateway") + } + + select { + case resp := <-ch: + return resp, nil + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +func (c *ConnectorHandler) HandleGatewayMessage(ctx context.Context, gatewayID string, msg *api.Message) { + body := &msg.Body + l := logger.With(c.lggr, "gatewayID", gatewayID, "method", body.Method, "messageID", msg.Body.MessageId) + if !c.rateLimiter.Allow(body.Sender) { + c.lggr.Errorw("request rate-limited") + return + } + l.Debugw("handling gateway request") + switch body.Method { + case webcapabilities.MethodWebAPITarget: + var payload webcapabilities.TargetResponsePayload + err := json.Unmarshal(body.Payload, &payload) + if err != nil { + l.Errorw("failed to unmarshal payload", "err", err) + return + } + c.responseChsMu.Lock() + defer c.responseChsMu.Unlock() + ch, ok := c.responseChs[body.MessageId] + if !ok { + l.Errorw("no response channel found") + return + } + ch <- msg + default: + l.Errorw("unsupported method") + } +} + +func (c *ConnectorHandler) Start(ctx context.Context) error { + return c.gc.AddHandler([]string{webcapabilities.MethodWebAPITarget}, c) +} + +func (c *ConnectorHandler) Close() error { + return nil +} diff --git a/core/capabilities/webapi/target/target_test.go b/core/capabilities/webapi/target/target_test.go new file mode 100644 index 00000000000..cccad6d54b9 --- /dev/null +++ b/core/capabilities/webapi/target/target_test.go @@ -0,0 +1,240 @@ +package target + +import ( + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + registrymock "github.com/smartcontractkit/chainlink-common/pkg/types/core/mocks" + "github.com/smartcontractkit/chainlink-common/pkg/values" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/api" + gcmocks "github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector/mocks" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/webcapabilities" +) + +const ( + workflowID1 = "15c631d295ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0" + workflowExecutionID1 = "95ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0abbadeed" + owner1 = "0x00000000000000000000000000000000000000aa" +) + +type testHarness struct { + registry *registrymock.CapabilitiesRegistry + connector *gcmocks.GatewayConnector + lggr logger.Logger + config Config + connectorHandler *ConnectorHandler + capability *Capability +} + +func setup(t *testing.T) testHarness { + registry := registrymock.NewCapabilitiesRegistry(t) + connector := gcmocks.NewGatewayConnector(t) + lggr := logger.Test(t) + config := Config{ + RateLimiter: common.RateLimiterConfig{ + GlobalRPS: 100.0, + GlobalBurst: 100, + PerSenderRPS: 100.0, + PerSenderBurst: 100, + }, + } + connectorHandler, err := NewConnectorHandler(connector, config, lggr) + require.NoError(t, err) + + capability, err := NewCapability(config, registry, connectorHandler, lggr) + require.NoError(t, err) + + return testHarness{ + registry: registry, + connector: connector, + lggr: lggr, + config: config, + connectorHandler: connectorHandler, + capability: capability, + } +} + +func capabilityRequest(t *testing.T) capabilities.CapabilityRequest { + data := map[string]string{ + "key": "value", + } + jsonData, err := json.Marshal(data) + require.NoError(t, err) + encoded := base64.StdEncoding.EncodeToString(jsonData) + targetInput := map[string]any{ + "url": "http://example.com", + "method": "POST", + "headers": map[string]string{"Content-Type": "application/json"}, + "body": encoded, + } + inputs, err := values.NewMap(targetInput) + require.NoError(t, err) + wfConfig, err := values.NewMap(map[string]interface{}{ + "timeoutMs": 1000, + "schedule": SingleNode, + }) + require.NoError(t, err) + + return capabilities.CapabilityRequest{ + Metadata: capabilities.RequestMetadata{ + WorkflowID: workflowID1, + WorkflowExecutionID: workflowExecutionID1, + }, + Inputs: inputs, + Config: wfConfig, + } +} + +func gatewayResponse(t *testing.T, msgID string) *api.Message { + headers := map[string]string{"Content-Type": "application/json"} + body := []byte("response body") + responsePayload, err := json.Marshal(webcapabilities.TargetResponsePayload{ + StatusCode: 200, + Headers: headers, + Body: body, + Success: true, + }) + require.NoError(t, err) + return &api.Message{ + Body: api.MessageBody{ + MessageId: msgID, + Method: webcapabilities.MethodWebAPITarget, + Payload: responsePayload, + }, + } +} + +func TestRegisterUnregister(t *testing.T) { + th := setup(t) + ctx := testutils.Context(t) + + regReq := capabilities.RegisterToWorkflowRequest{ + Metadata: capabilities.RegistrationMetadata{ + WorkflowID: workflowID1, + WorkflowOwner: owner1, + }, + } + err := th.capability.RegisterToWorkflow(ctx, regReq) + require.NoError(t, err) + + err = th.capability.UnregisterFromWorkflow(ctx, capabilities.UnregisterFromWorkflowRequest{ + Metadata: capabilities.RegistrationMetadata{ + WorkflowID: workflowID1, + WorkflowOwner: owner1, + }, + }) + require.NoError(t, err) +} + +func TestCapability_Execute(t *testing.T) { + th := setup(t) + ctx := testutils.Context(t) + + t.Run("unregistered workflow", func(t *testing.T) { + req := capabilityRequest(t) + _, err := th.capability.Execute(ctx, req) + require.Error(t, err) + require.Contains(t, err.Error(), "workflow is not registered") + }) + + t.Run("happy case", func(t *testing.T) { + regReq := capabilities.RegisterToWorkflowRequest{ + Metadata: capabilities.RegistrationMetadata{ + WorkflowID: workflowID1, + WorkflowOwner: owner1, + }, + } + err := th.capability.RegisterToWorkflow(ctx, regReq) + require.NoError(t, err) + + req := capabilityRequest(t) + msgID, err := getMessageID(req) + require.NoError(t, err) + + gatewayResp := gatewayResponse(t, msgID) + + th.connector.On("SendToAvailableGateway", mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) { + th.connectorHandler.HandleGatewayMessage(ctx, "gateway1", gatewayResp) + }).Once() + + resp, err := th.capability.Execute(ctx, req) + require.NoError(t, err) + var values map[string]any + err = resp.Value.UnwrapTo(&values) + require.NoError(t, err) + fmt.Printf("values %+v", values) + statusCode, ok := values["statusCode"].(int64) + require.True(t, ok) + require.Equal(t, int64(200), statusCode) + respBody, ok := values["body"].(string) + require.True(t, ok) + require.Equal(t, "response body", respBody) + }) + + t.Run("unsupported schedule", func(t *testing.T) { + regReq := capabilities.RegisterToWorkflowRequest{ + Metadata: capabilities.RegistrationMetadata{ + WorkflowID: workflowID1, + WorkflowOwner: owner1, + }, + } + err := th.capability.RegisterToWorkflow(ctx, regReq) + require.NoError(t, err) + + targetInput := map[string]any{ + "url": "http://example.com", + "method": "POST", + "headers": map[string]string{"Content-Type": "application/json"}, + } + inputs, err := values.NewMap(targetInput) + + require.NoError(t, err) + wfConfig, err := values.NewMap(map[string]interface{}{ + "timeoutMs": 1000, + "schedule": "invalid", + }) + require.NoError(t, err) + + req := capabilities.CapabilityRequest{ + Metadata: capabilities.RequestMetadata{ + WorkflowID: workflowID1, + WorkflowExecutionID: workflowExecutionID1, + }, + Inputs: inputs, + Config: wfConfig, + } + + _, err = th.capability.Execute(ctx, req) + require.Error(t, err) + require.Contains(t, err.Error(), "unsupported schedule") + }) + + t.Run("gateway connector error", func(t *testing.T) { + regReq := capabilities.RegisterToWorkflowRequest{ + Metadata: capabilities.RegistrationMetadata{ + WorkflowID: workflowID1, + WorkflowOwner: owner1, + }, + } + err := th.capability.RegisterToWorkflow(ctx, regReq) + require.NoError(t, err) + + req := capabilityRequest(t) + require.NoError(t, err) + + th.connector.On("SendToAvailableGateway", mock.Anything, mock.Anything).Return(errors.New("gateway error")).Once() + _, err = th.capability.Execute(ctx, req) + require.Error(t, err) + require.Contains(t, err.Error(), "gateway error") + }) +} diff --git a/core/capabilities/webapi/target/types.go b/core/capabilities/webapi/target/types.go new file mode 100644 index 00000000000..d469727db20 --- /dev/null +++ b/core/capabilities/webapi/target/types.go @@ -0,0 +1,30 @@ +package target + +import "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common" + +const ( + AllAtOnce string = "AllAtOnce" + SingleNode string = "SingleNode" +) + +type Input struct { + URL string `json:"url"` // URL to query, only http and https protocols are supported. + Method string `json:"method,omitempty"` // HTTP verb, defaults to GET. + Headers map[string]string `json:"headers,omitempty"` // HTTP headers, defaults to empty. + Body string `json:"body,omitempty"` // Base64-encoded binary body, defaults to empty. +} + +// WorkflowConfig is the configuration of the workflow that is passed in the workflow execute request +type WorkflowConfig struct { + TimeoutMs uint32 `json:"timeoutMs,omitempty"` // Timeout in milliseconds + RetryCount uint8 `json:"retryCount,omitempty"` // Number of retries, defaults to 0. + Schedule string `json:"schedule,omitempty"` // schedule, defaults to empty. +} + +// CapabilityConfigConfig is the configuration for the Target capability and handler +// TODO: handle retry configurations here +// Note that workflow executions have their own internal timeouts and retries set by the user +// that are separate from this configuration +type Config struct { + RateLimiter common.RateLimiterConfig `toml:"rateLimiter"` +} diff --git a/core/services/gateway/api/message.go b/core/services/gateway/api/message.go index 5e6c8e49247..86813c26b35 100644 --- a/core/services/gateway/api/message.go +++ b/core/services/gateway/api/message.go @@ -93,7 +93,7 @@ func (m *Message) Sign(privateKey *ecdsa.PrivateKey) error { if m == nil { return errors.New("nil message") } - rawData := getRawMessageBody(&m.Body) + rawData := GetRawMessageBody(&m.Body) signature, err := gw_common.SignData(privateKey, rawData...) if err != nil { return err @@ -107,7 +107,7 @@ func (m *Message) ExtractSigner() (signerAddress []byte, err error) { if m == nil { return nil, errors.New("nil message") } - rawData := getRawMessageBody(&m.Body) + rawData := GetRawMessageBody(&m.Body) signatureBytes, err := hex.DecodeString(m.Signature) if err != nil { return nil, err @@ -115,7 +115,7 @@ func (m *Message) ExtractSigner() (signerAddress []byte, err error) { return gw_common.ExtractSigner(signatureBytes, rawData...) } -func getRawMessageBody(msgBody *MessageBody) [][]byte { +func GetRawMessageBody(msgBody *MessageBody) [][]byte { alignedMessageId := make([]byte, MessageIdMaxLen) copy(alignedMessageId, msgBody.MessageId) alignedMethod := make([]byte, MessageMethodMaxLen) diff --git a/core/services/gateway/connector/connector.go b/core/services/gateway/connector/connector.go index 82ff1e9e13a..d34531cbe3f 100644 --- a/core/services/gateway/connector/connector.go +++ b/core/services/gateway/connector/connector.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "math/rand" "net/url" "sync" "time" @@ -27,7 +28,10 @@ type GatewayConnector interface { network.ConnectionInitiator AddHandler(methods []string, handler GatewayConnectorHandler) error + // SendToGateway takes a signed message as argument and sends it to the specified gateway SendToGateway(ctx context.Context, gatewayId string, msg *api.Message) error + // SendToAvailableGateway selects first available gateway node, signs the message and sends then message + SendToAvailableGateway(ctx context.Context, msg *api.MessageBody) error } // Signer implementation needs to be provided by a GatewayConnector user (node) @@ -112,8 +116,9 @@ func NewGatewayConnector(config *ConnectorConfig, signer Signer, clock clockwork if err != nil { return nil, err } + l := lggr.With("URL", parsedURL) gateway := &gatewayState{ - conn: network.NewWSConnectionWrapper(lggr), + conn: network.NewWSConnectionWrapper(l), config: gw, url: parsedURL, wsClient: network.NewWebSocketClient(config.WsClientConfig, connector, lggr), @@ -157,6 +162,38 @@ func (c *gatewayConnector) SendToGateway(ctx context.Context, gatewayId string, return gateway.conn.Write(ctx, websocket.BinaryMessage, data) } +func (c *gatewayConnector) SendToAvailableGateway(ctx context.Context, msg *api.MessageBody) error { + // select a gateway to broadcast in round robin manner + var gids []string + for gid := range c.gateways { + gids = append(gids, gid) + } + idx := rand.Intn(len(gids)) + gatewayID := gids[idx] + + m := &api.Message{ + Body: api.MessageBody{ + MessageId: msg.MessageId, + DonId: c.config.DonId, + Method: msg.Method, + Payload: msg.Payload, + Receiver: gatewayID, + }, + } + signature, err := c.signer.Sign(api.GetRawMessageBody(&m.Body)...) + if err != nil { + return err + } + m.Signature = utils.StringToHex(string(signature)) + m.Body.Sender = utils.StringToHex(string(c.nodeAddress)) + + err = c.SendToGateway(ctx, gatewayID, m) + if err != nil { + return fmt.Errorf("failed to send message to gateway %s: %v", gatewayID, err) + } + return nil +} + func (c *gatewayConnector) readLoop(gatewayState *gatewayState) { ctx, cancel := c.shutdownCh.NewCtx() defer cancel() diff --git a/core/services/gateway/connector/mocks/gateway_connector.go b/core/services/gateway/connector/mocks/gateway_connector.go index 76e3ff5c86e..9f8f23b4909 100644 --- a/core/services/gateway/connector/mocks/gateway_connector.go +++ b/core/services/gateway/connector/mocks/gateway_connector.go @@ -235,6 +235,53 @@ func (_c *GatewayConnector_NewAuthHeader_Call) RunAndReturn(run func(*url.URL) ( return _c } +// SendToAvailableGateway provides a mock function with given fields: ctx, msg +func (_m *GatewayConnector) SendToAvailableGateway(ctx context.Context, msg *api.MessageBody) error { + ret := _m.Called(ctx, msg) + + if len(ret) == 0 { + panic("no return value specified for SendToAvailableGateway") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *api.MessageBody) error); ok { + r0 = rf(ctx, msg) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// GatewayConnector_SendToAvailableGateway_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SendToAvailableGateway' +type GatewayConnector_SendToAvailableGateway_Call struct { + *mock.Call +} + +// SendToAvailableGateway is a helper method to define mock.On call +// - ctx context.Context +// - msg *api.MessageBody +func (_e *GatewayConnector_Expecter) SendToAvailableGateway(ctx interface{}, msg interface{}) *GatewayConnector_SendToAvailableGateway_Call { + return &GatewayConnector_SendToAvailableGateway_Call{Call: _e.mock.On("SendToAvailableGateway", ctx, msg)} +} + +func (_c *GatewayConnector_SendToAvailableGateway_Call) Run(run func(ctx context.Context, msg *api.MessageBody)) *GatewayConnector_SendToAvailableGateway_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*api.MessageBody)) + }) + return _c +} + +func (_c *GatewayConnector_SendToAvailableGateway_Call) Return(_a0 error) *GatewayConnector_SendToAvailableGateway_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *GatewayConnector_SendToAvailableGateway_Call) RunAndReturn(run func(context.Context, *api.MessageBody) error) *GatewayConnector_SendToAvailableGateway_Call { + _c.Call.Return(run) + return _c +} + // SendToGateway provides a mock function with given fields: ctx, gatewayId, msg func (_m *GatewayConnector) SendToGateway(ctx context.Context, gatewayId string, msg *api.Message) error { ret := _m.Called(ctx, gatewayId, msg) diff --git a/core/services/gateway/handler_factory.go b/core/services/gateway/handler_factory.go index ca6b98e55aa..520f1b946c2 100644 --- a/core/services/gateway/handler_factory.go +++ b/core/services/gateway/handler_factory.go @@ -15,6 +15,7 @@ import ( const ( FunctionsHandlerType HandlerType = "functions" DummyHandlerType HandlerType = "dummy" + WebCapabilitiesType HandlerType = "web-capabilities" ) type handlerFactory struct { @@ -26,7 +27,11 @@ type handlerFactory struct { var _ HandlerFactory = (*handlerFactory)(nil) func NewHandlerFactory(legacyChains legacyevm.LegacyChainContainer, ds sqlutil.DataSource, lggr logger.Logger) HandlerFactory { - return &handlerFactory{legacyChains, ds, lggr} + return &handlerFactory{ + legacyChains, + ds, + lggr, + } } func (hf *handlerFactory) NewHandler(handlerType HandlerType, handlerConfig json.RawMessage, donConfig *config.DONConfig, don handlers.DON) (handlers.Handler, error) { diff --git a/core/services/gateway/handlers/webcapabilities/handler.go b/core/services/gateway/handlers/webcapabilities/handler.go new file mode 100644 index 00000000000..15fe6dbdecb --- /dev/null +++ b/core/services/gateway/handlers/webcapabilities/handler.go @@ -0,0 +1,6 @@ +package webcapabilities + +const ( + // NOTE: more methods will go here. HTTP trigger/action/target; etc. + MethodWebAPITarget = "web_api_target" +) diff --git a/core/services/gateway/handlers/webcapabilities/webapi.go b/core/services/gateway/handlers/webcapabilities/webapi.go new file mode 100644 index 00000000000..ce964b29c44 --- /dev/null +++ b/core/services/gateway/handlers/webcapabilities/webapi.go @@ -0,0 +1,18 @@ +package webcapabilities + +type TargetRequestPayload struct { + URL string `json:"url"` // URL to query, only http and https protocols are supported. + Method string `json:"method,omitempty"` // HTTP verb, defaults to GET. + Headers map[string]string `json:"headers,omitempty"` // HTTP headers, defaults to empty. + Body []byte `json:"body,omitempty"` // HTTP request body + TimeoutMs uint32 `json:"timeoutMs,omitempty"` // Timeout in milliseconds + RetryCount uint8 `json:"retryCount,omitempty"` // Number of retries, defaults to 0. +} + +type TargetResponsePayload struct { + Success bool `json:"success"` // true if HTTP request was successful + ErrorMessage string `json:"error_message,omitempty"` // error message in case of failure + StatusCode uint8 `json:"statusCode"` // HTTP status code + Headers map[string]string `json:"headers,omitempty"` // HTTP headers + Body []byte `json:"body,omitempty"` // HTTP response body +} diff --git a/core/services/standardcapabilities/delegate.go b/core/services/standardcapabilities/delegate.go index 64134995b2b..a5da209308e 100644 --- a/core/services/standardcapabilities/delegate.go +++ b/core/services/standardcapabilities/delegate.go @@ -14,6 +14,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types/core" gatewayconnector "github.com/smartcontractkit/chainlink/v2/core/capabilities/gateway_connector" "github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi" + webapitarget "github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi/target" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/generic" @@ -43,6 +44,7 @@ type Delegate struct { const ( commandOverrideForWebAPITrigger = "__builtin_web-api-trigger" + commandOverrideForWebAPITarget = "__builtin_web-api-target" ) func NewDelegate(logger logger.Logger, ds sqlutil.DataSource, jobORM job.ORM, registry core.CapabilitiesRegistry, @@ -87,6 +89,32 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser return []job.ServiceCtx{triggerSrvc}, nil } + if spec.StandardCapabilitiesSpec.Command == commandOverrideForWebAPITarget { + if d.gatewayConnectorWrapper == nil { + return nil, errors.New("gateway connector is required for web API Target capability") + } + connector := d.gatewayConnectorWrapper.GetGatewayConnector() + if len(spec.StandardCapabilitiesSpec.Config) == 0 { + return nil, errors.New("config is empty") + } + // TODO: check if TOML is the only supported format here + var targetCfg webapitarget.Config + err := toml.Unmarshal([]byte(spec.StandardCapabilitiesSpec.Config), &targetCfg) + if err != nil { + return nil, err + } + lggr := d.logger.Named("WebAPITarget") + handler, err := webapitarget.NewConnectorHandler(connector, targetCfg, lggr) + if err != nil { + return nil, err + } + capability, err := webapitarget.NewCapability(targetCfg, d.registry, handler, lggr) + if err != nil { + return nil, err + } + return []job.ServiceCtx{capability, handler}, nil + } + standardCapability := newStandardCapabilities(log, spec.StandardCapabilitiesSpec, d.cfg, telemetryService, kvStore, d.registry, errorLog, pr, relayerSet) From d8e22695ee7499eb47418e3f75b0a9e500fa3798 Mon Sep 17 00:00:00 2001 From: jinhoonbang Date: Thu, 19 Sep 2024 13:53:26 -0700 Subject: [PATCH 02/16] self-review --- .changeset/six-frogs-juggle.md | 5 + core/capabilities/webapi/target/capability.go | 64 ++++---- .../webapi/target/connector_handler.go | 23 ++- .../capabilities/webapi/target/target_test.go | 6 +- core/services/gateway/connector/connector.go | 56 ++++--- .../connector/mocks/gateway_connector.go | 153 ++++++++++++++---- .../handlers/webcapabilities/webapi.go | 11 +- 7 files changed, 220 insertions(+), 98 deletions(-) create mode 100644 .changeset/six-frogs-juggle.md diff --git a/.changeset/six-frogs-juggle.md b/.changeset/six-frogs-juggle.md new file mode 100644 index 00000000000..2b960c4be0f --- /dev/null +++ b/.changeset/six-frogs-juggle.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +#added HTTP target capability and gateway connector handler diff --git a/core/capabilities/webapi/target/capability.go b/core/capabilities/webapi/target/capability.go index eb3bc261453..a93bfac0c70 100644 --- a/core/capabilities/webapi/target/capability.go +++ b/core/capabilities/webapi/target/capability.go @@ -5,13 +5,13 @@ import ( "encoding/json" "fmt" "strings" + "sync" "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/types/core" "github.com/smartcontractkit/chainlink-common/pkg/values" "github.com/smartcontractkit/chainlink/v2/core/capabilities/validation" - "github.com/smartcontractkit/chainlink/v2/core/services/gateway/api" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/webcapabilities" ) @@ -25,23 +25,26 @@ var capabilityInfo = capabilities.MustNewCapabilityInfo( "A target that sends HTTP requests to external clients via the Chainlink Gateway.", ) +// Capability is a target capability that sends HTTP requests to external clients via the Chainlink Gateway. type Capability struct { - capabilityInfo capabilities.CapabilityInfo - connectorHandler *ConnectorHandler - lggr logger.Logger - registry core.CapabilitiesRegistry - config Config - activeWorkflows map[string]struct{} // tracks registered workflows + capabilityInfo capabilities.CapabilityInfo + connectorHandler *ConnectorHandler + lggr logger.Logger + registry core.CapabilitiesRegistry + config Config + registeredWorkflows map[string]struct{} + registeredWorkflowsMu sync.Mutex } func NewCapability(config Config, registry core.CapabilitiesRegistry, connectorHandler *ConnectorHandler, lggr logger.Logger) (*Capability, error) { return &Capability{ - capabilityInfo: capabilityInfo, - config: config, - registry: registry, - connectorHandler: connectorHandler, - activeWorkflows: make(map[string]struct{}), - lggr: lggr, + capabilityInfo: capabilityInfo, + config: config, + registry: registry, + connectorHandler: connectorHandler, + registeredWorkflows: make(map[string]struct{}), + registeredWorkflowsMu: sync.Mutex{}, + lggr: lggr, }, nil } @@ -92,17 +95,18 @@ func (c *Capability) Execute(ctx context.Context, req capabilities.CapabilityReq return capabilities.CapabilityResponse{}, err } - if _, ok := c.activeWorkflows[req.Metadata.WorkflowID]; !ok { + c.registeredWorkflowsMu.Lock() + defer c.registeredWorkflowsMu.Unlock() + if _, ok := c.registeredWorkflows[req.Metadata.WorkflowID]; !ok { return capabilities.CapabilityResponse{}, fmt.Errorf("workflow is not registered: %v", req.Metadata.WorkflowID) } payload := webcapabilities.TargetRequestPayload{ - URL: input.URL, - Method: input.Method, - Headers: input.Headers, - Body: []byte(input.Body), - TimeoutMs: workflowCfg.TimeoutMs, - RetryCount: workflowCfg.RetryCount, + URL: input.URL, + Method: input.Method, + Headers: input.Headers, + Body: []byte(input.Body), + TimeoutMs: workflowCfg.TimeoutMs, } payloadBytes, err := json.Marshal(payload) @@ -110,16 +114,10 @@ func (c *Capability) Execute(ctx context.Context, req capabilities.CapabilityReq return capabilities.CapabilityResponse{}, err } - gatewayReq := &api.MessageBody{ - MessageId: messageID, - Method: webcapabilities.MethodWebAPITarget, - Payload: payloadBytes, - } - switch workflowCfg.Schedule { case SingleNode: // blocking call to handle single node request. waits for response from gateway - resp, err := c.connectorHandler.HandleSingleNodeRequest(ctx, gatewayReq) + resp, err := c.connectorHandler.HandleSingleNodeRequest(ctx, messageID, payloadBytes) if err != nil { return capabilities.CapabilityResponse{}, err } @@ -151,16 +149,20 @@ func (c *Capability) RegisterToWorkflow(ctx context.Context, req capabilities.Re if err := validation.ValidateWorkflowOrExecutionID(req.Metadata.WorkflowID); err != nil { return fmt.Errorf("workflow ID is invalid: %w", err) } - c.activeWorkflows[req.Metadata.WorkflowID] = struct{}{} + c.registeredWorkflowsMu.Lock() + defer c.registeredWorkflowsMu.Unlock() + c.registeredWorkflows[req.Metadata.WorkflowID] = struct{}{} return nil } func (c *Capability) UnregisterFromWorkflow(ctx context.Context, req capabilities.UnregisterFromWorkflowRequest) error { - // TODO: is best-effort removal of workflow sufficient here? or should it error if workflow is not found for some reason? - if _, ok := c.activeWorkflows[req.Metadata.WorkflowID]; !ok { + // if workflow is not found for some reason, just log a warning + c.registeredWorkflowsMu.Lock() + defer c.registeredWorkflowsMu.Unlock() + if _, ok := c.registeredWorkflows[req.Metadata.WorkflowID]; !ok { c.lggr.Warnw("workflow not found", "workflowID", req.Metadata.WorkflowID) } else { - delete(c.activeWorkflows, req.Metadata.WorkflowID) + delete(c.registeredWorkflows, req.Metadata.WorkflowID) } return nil } diff --git a/core/capabilities/webapi/target/connector_handler.go b/core/capabilities/webapi/target/connector_handler.go index 797cfe1f315..fa19cc5d6de 100644 --- a/core/capabilities/webapi/target/connector_handler.go +++ b/core/capabilities/webapi/target/connector_handler.go @@ -3,6 +3,7 @@ package target import ( "context" "encoding/json" + "sort" "sync" "github.com/pkg/errors" @@ -41,15 +42,27 @@ func NewConnectorHandler(gc connector.GatewayConnector, config Config, lgger log // HandleSingleNodeRequest sends a request to first available gateway node and blocks until response is received // TODO: handle retries and timeouts -func (c *ConnectorHandler) HandleSingleNodeRequest(ctx context.Context, msg *api.MessageBody) (*api.Message, error) { +func (c *ConnectorHandler) HandleSingleNodeRequest(ctx context.Context, messageID string, payload []byte) (*api.Message, error) { ch := make(chan *api.Message, 1) c.responseChsMu.Lock() - c.responseChs[msg.MessageId] = ch + c.responseChs[messageID] = ch c.responseChsMu.Unlock() - l := logger.With(c.lggr, "messageId", msg.MessageId) + l := logger.With(c.lggr, "messageID", messageID) l.Debugw("sending request to gateway") - err := c.gc.SendToAvailableGateway(ctx, msg) + body := &api.MessageBody{ + MessageId: messageID, + DonId: c.gc.DonId(), + Method: webcapabilities.MethodWebAPITarget, + Payload: payload, + } + + // simply, send request to first available gateway node from sorted list + // this allows for deterministic selection of gateay node receiver for easier debugging + gatewayIds := c.gc.GatewayIds() + sort.Strings(gatewayIds) + + err := c.gc.SignAndSendToGateway(ctx, gatewayIds[0], body) if err != nil { return nil, errors.Wrap(err, "failed to send request to gateway") } @@ -66,6 +79,8 @@ func (c *ConnectorHandler) HandleGatewayMessage(ctx context.Context, gatewayID s body := &msg.Body l := logger.With(c.lggr, "gatewayID", gatewayID, "method", body.Method, "messageID", msg.Body.MessageId) if !c.rateLimiter.Allow(body.Sender) { + // error is logged here instead of warning because if a message from gateway is rate-limited, + // the workflow will eventually fail with timeout as there are no retries in place yet c.lggr.Errorw("request rate-limited") return } diff --git a/core/capabilities/webapi/target/target_test.go b/core/capabilities/webapi/target/target_test.go index cccad6d54b9..f2e2e63977e 100644 --- a/core/capabilities/webapi/target/target_test.go +++ b/core/capabilities/webapi/target/target_test.go @@ -139,6 +139,8 @@ func TestRegisterUnregister(t *testing.T) { func TestCapability_Execute(t *testing.T) { th := setup(t) ctx := testutils.Context(t) + th.connector.On("DonId").Return("donId") + th.connector.On("GatewayIds").Return([]string{"gateway2", "gateway1"}) t.Run("unregistered workflow", func(t *testing.T) { req := capabilityRequest(t) @@ -163,7 +165,7 @@ func TestCapability_Execute(t *testing.T) { gatewayResp := gatewayResponse(t, msgID) - th.connector.On("SendToAvailableGateway", mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) { + th.connector.On("SignAndSendToGateway", "gateway1", mock.Anything).Return(nil).Run(func(args mock.Arguments) { th.connectorHandler.HandleGatewayMessage(ctx, "gateway1", gatewayResp) }).Once() @@ -232,7 +234,7 @@ func TestCapability_Execute(t *testing.T) { req := capabilityRequest(t) require.NoError(t, err) - th.connector.On("SendToAvailableGateway", mock.Anything, mock.Anything).Return(errors.New("gateway error")).Once() + th.connector.On("SignAndSendToGateway", mock.Anything, mock.Anything).Return(errors.New("gateway error")).Once() _, err = th.capability.Execute(ctx, req) require.Error(t, err) require.Contains(t, err.Error(), "gateway error") diff --git a/core/services/gateway/connector/connector.go b/core/services/gateway/connector/connector.go index d34531cbe3f..db7e74ea815 100644 --- a/core/services/gateway/connector/connector.go +++ b/core/services/gateway/connector/connector.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "math/rand" "net/url" "sync" "time" @@ -30,8 +29,12 @@ type GatewayConnector interface { AddHandler(methods []string, handler GatewayConnectorHandler) error // SendToGateway takes a signed message as argument and sends it to the specified gateway SendToGateway(ctx context.Context, gatewayId string, msg *api.Message) error - // SendToAvailableGateway selects first available gateway node, signs the message and sends then message - SendToAvailableGateway(ctx context.Context, msg *api.MessageBody) error + // SignAndSendToGateway signs the message and sends the message to the specified gateway + SignAndSendToGateway(ctx context.Context, gatewayId string, msg *api.MessageBody) error + // GatewayIds returns the list of Gateway Ids + GatewayIds() []string + // DonId returns the DON ID + DonId() string } // Signer implementation needs to be provided by a GatewayConnector user (node) @@ -162,38 +165,41 @@ func (c *gatewayConnector) SendToGateway(ctx context.Context, gatewayId string, return gateway.conn.Write(ctx, websocket.BinaryMessage, data) } -func (c *gatewayConnector) SendToAvailableGateway(ctx context.Context, msg *api.MessageBody) error { - // select a gateway to broadcast in round robin manner - var gids []string - for gid := range c.gateways { - gids = append(gids, gid) +func (c *gatewayConnector) SignAndSendToGateway(ctx context.Context, gatewayId string, body *api.MessageBody) error { + signature, err := c.signer.Sign(api.GetRawMessageBody(body)...) + if err != nil { + return err } - idx := rand.Intn(len(gids)) - gatewayID := gids[idx] - - m := &api.Message{ + msg := &api.Message{ Body: api.MessageBody{ - MessageId: msg.MessageId, - DonId: c.config.DonId, - Method: msg.Method, - Payload: msg.Payload, - Receiver: gatewayID, + MessageId: body.MessageId, + DonId: body.DonId, + Method: body.Method, + Payload: body.Payload, + Sender: utils.StringToHex(string(c.nodeAddress)), }, + Signature: utils.StringToHex(string(signature)), } - signature, err := c.signer.Sign(api.GetRawMessageBody(&m.Body)...) - if err != nil { - return err - } - m.Signature = utils.StringToHex(string(signature)) - m.Body.Sender = utils.StringToHex(string(c.nodeAddress)) - err = c.SendToGateway(ctx, gatewayID, m) + err = c.SendToGateway(ctx, gatewayId, msg) if err != nil { - return fmt.Errorf("failed to send message to gateway %s: %v", gatewayID, err) + return fmt.Errorf("failed to send message to gateway %s: %v", gatewayId, err) } return nil } +func (c *gatewayConnector) GatewayIds() []string { + var gids []string + for gid := range c.gateways { + gids = append(gids, gid) + } + return gids +} + +func (c *gatewayConnector) DonId() string { + return c.config.DonId +} + func (c *gatewayConnector) readLoop(gatewayState *gatewayState) { ctx, cancel := c.shutdownCh.NewCtx() defer cancel() diff --git a/core/services/gateway/connector/mocks/gateway_connector.go b/core/services/gateway/connector/mocks/gateway_connector.go index 9f8f23b4909..88b81898cbf 100644 --- a/core/services/gateway/connector/mocks/gateway_connector.go +++ b/core/services/gateway/connector/mocks/gateway_connector.go @@ -177,6 +177,98 @@ func (_c *GatewayConnector_Close_Call) RunAndReturn(run func() error) *GatewayCo return _c } +// DonId provides a mock function with given fields: +func (_m *GatewayConnector) DonId() string { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for DonId") + } + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// GatewayConnector_DonId_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DonId' +type GatewayConnector_DonId_Call struct { + *mock.Call +} + +// DonId is a helper method to define mock.On call +func (_e *GatewayConnector_Expecter) DonId() *GatewayConnector_DonId_Call { + return &GatewayConnector_DonId_Call{Call: _e.mock.On("DonId")} +} + +func (_c *GatewayConnector_DonId_Call) Run(run func()) *GatewayConnector_DonId_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *GatewayConnector_DonId_Call) Return(_a0 string) *GatewayConnector_DonId_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *GatewayConnector_DonId_Call) RunAndReturn(run func() string) *GatewayConnector_DonId_Call { + _c.Call.Return(run) + return _c +} + +// GatewayIds provides a mock function with given fields: +func (_m *GatewayConnector) GatewayIds() []string { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for GatewayIds") + } + + var r0 []string + if rf, ok := ret.Get(0).(func() []string); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]string) + } + } + + return r0 +} + +// GatewayConnector_GatewayIds_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GatewayIds' +type GatewayConnector_GatewayIds_Call struct { + *mock.Call +} + +// GatewayIds is a helper method to define mock.On call +func (_e *GatewayConnector_Expecter) GatewayIds() *GatewayConnector_GatewayIds_Call { + return &GatewayConnector_GatewayIds_Call{Call: _e.mock.On("GatewayIds")} +} + +func (_c *GatewayConnector_GatewayIds_Call) Run(run func()) *GatewayConnector_GatewayIds_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *GatewayConnector_GatewayIds_Call) Return(_a0 []string) *GatewayConnector_GatewayIds_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *GatewayConnector_GatewayIds_Call) RunAndReturn(run func() []string) *GatewayConnector_GatewayIds_Call { + _c.Call.Return(run) + return _c +} + // NewAuthHeader provides a mock function with given fields: _a0 func (_m *GatewayConnector) NewAuthHeader(_a0 *url.URL) ([]byte, error) { ret := _m.Called(_a0) @@ -235,17 +327,17 @@ func (_c *GatewayConnector_NewAuthHeader_Call) RunAndReturn(run func(*url.URL) ( return _c } -// SendToAvailableGateway provides a mock function with given fields: ctx, msg -func (_m *GatewayConnector) SendToAvailableGateway(ctx context.Context, msg *api.MessageBody) error { - ret := _m.Called(ctx, msg) +// SendToGateway provides a mock function with given fields: ctx, gatewayId, msg +func (_m *GatewayConnector) SendToGateway(ctx context.Context, gatewayId string, msg *api.Message) error { + ret := _m.Called(ctx, gatewayId, msg) if len(ret) == 0 { - panic("no return value specified for SendToAvailableGateway") + panic("no return value specified for SendToGateway") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *api.MessageBody) error); ok { - r0 = rf(ctx, msg) + if rf, ok := ret.Get(0).(func(context.Context, string, *api.Message) error); ok { + r0 = rf(ctx, gatewayId, msg) } else { r0 = ret.Error(0) } @@ -253,45 +345,46 @@ func (_m *GatewayConnector) SendToAvailableGateway(ctx context.Context, msg *api return r0 } -// GatewayConnector_SendToAvailableGateway_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SendToAvailableGateway' -type GatewayConnector_SendToAvailableGateway_Call struct { +// GatewayConnector_SendToGateway_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SendToGateway' +type GatewayConnector_SendToGateway_Call struct { *mock.Call } -// SendToAvailableGateway is a helper method to define mock.On call +// SendToGateway is a helper method to define mock.On call // - ctx context.Context -// - msg *api.MessageBody -func (_e *GatewayConnector_Expecter) SendToAvailableGateway(ctx interface{}, msg interface{}) *GatewayConnector_SendToAvailableGateway_Call { - return &GatewayConnector_SendToAvailableGateway_Call{Call: _e.mock.On("SendToAvailableGateway", ctx, msg)} +// - gatewayId string +// - msg *api.Message +func (_e *GatewayConnector_Expecter) SendToGateway(ctx interface{}, gatewayId interface{}, msg interface{}) *GatewayConnector_SendToGateway_Call { + return &GatewayConnector_SendToGateway_Call{Call: _e.mock.On("SendToGateway", ctx, gatewayId, msg)} } -func (_c *GatewayConnector_SendToAvailableGateway_Call) Run(run func(ctx context.Context, msg *api.MessageBody)) *GatewayConnector_SendToAvailableGateway_Call { +func (_c *GatewayConnector_SendToGateway_Call) Run(run func(ctx context.Context, gatewayId string, msg *api.Message)) *GatewayConnector_SendToGateway_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(*api.MessageBody)) + run(args[0].(context.Context), args[1].(string), args[2].(*api.Message)) }) return _c } -func (_c *GatewayConnector_SendToAvailableGateway_Call) Return(_a0 error) *GatewayConnector_SendToAvailableGateway_Call { +func (_c *GatewayConnector_SendToGateway_Call) Return(_a0 error) *GatewayConnector_SendToGateway_Call { _c.Call.Return(_a0) return _c } -func (_c *GatewayConnector_SendToAvailableGateway_Call) RunAndReturn(run func(context.Context, *api.MessageBody) error) *GatewayConnector_SendToAvailableGateway_Call { +func (_c *GatewayConnector_SendToGateway_Call) RunAndReturn(run func(context.Context, string, *api.Message) error) *GatewayConnector_SendToGateway_Call { _c.Call.Return(run) return _c } -// SendToGateway provides a mock function with given fields: ctx, gatewayId, msg -func (_m *GatewayConnector) SendToGateway(ctx context.Context, gatewayId string, msg *api.Message) error { +// SignAndSendToGateway provides a mock function with given fields: ctx, gatewayId, msg +func (_m *GatewayConnector) SignAndSendToGateway(ctx context.Context, gatewayId string, msg *api.MessageBody) error { ret := _m.Called(ctx, gatewayId, msg) if len(ret) == 0 { - panic("no return value specified for SendToGateway") + panic("no return value specified for SignAndSendToGateway") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string, *api.Message) error); ok { + if rf, ok := ret.Get(0).(func(context.Context, string, *api.MessageBody) error); ok { r0 = rf(ctx, gatewayId, msg) } else { r0 = ret.Error(0) @@ -300,32 +393,32 @@ func (_m *GatewayConnector) SendToGateway(ctx context.Context, gatewayId string, return r0 } -// GatewayConnector_SendToGateway_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SendToGateway' -type GatewayConnector_SendToGateway_Call struct { +// GatewayConnector_SignAndSendToGateway_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SignAndSendToGateway' +type GatewayConnector_SignAndSendToGateway_Call struct { *mock.Call } -// SendToGateway is a helper method to define mock.On call +// SignAndSendToGateway is a helper method to define mock.On call // - ctx context.Context // - gatewayId string -// - msg *api.Message -func (_e *GatewayConnector_Expecter) SendToGateway(ctx interface{}, gatewayId interface{}, msg interface{}) *GatewayConnector_SendToGateway_Call { - return &GatewayConnector_SendToGateway_Call{Call: _e.mock.On("SendToGateway", ctx, gatewayId, msg)} +// - msg *api.MessageBody +func (_e *GatewayConnector_Expecter) SignAndSendToGateway(ctx interface{}, gatewayId interface{}, msg interface{}) *GatewayConnector_SignAndSendToGateway_Call { + return &GatewayConnector_SignAndSendToGateway_Call{Call: _e.mock.On("SignAndSendToGateway", ctx, gatewayId, msg)} } -func (_c *GatewayConnector_SendToGateway_Call) Run(run func(ctx context.Context, gatewayId string, msg *api.Message)) *GatewayConnector_SendToGateway_Call { +func (_c *GatewayConnector_SignAndSendToGateway_Call) Run(run func(ctx context.Context, gatewayId string, msg *api.MessageBody)) *GatewayConnector_SignAndSendToGateway_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string), args[2].(*api.Message)) + run(args[0].(context.Context), args[1].(string), args[2].(*api.MessageBody)) }) return _c } -func (_c *GatewayConnector_SendToGateway_Call) Return(_a0 error) *GatewayConnector_SendToGateway_Call { +func (_c *GatewayConnector_SignAndSendToGateway_Call) Return(_a0 error) *GatewayConnector_SignAndSendToGateway_Call { _c.Call.Return(_a0) return _c } -func (_c *GatewayConnector_SendToGateway_Call) RunAndReturn(run func(context.Context, string, *api.Message) error) *GatewayConnector_SendToGateway_Call { +func (_c *GatewayConnector_SignAndSendToGateway_Call) RunAndReturn(run func(context.Context, string, *api.MessageBody) error) *GatewayConnector_SignAndSendToGateway_Call { _c.Call.Return(run) return _c } diff --git a/core/services/gateway/handlers/webcapabilities/webapi.go b/core/services/gateway/handlers/webcapabilities/webapi.go index ce964b29c44..ec6781ca71c 100644 --- a/core/services/gateway/handlers/webcapabilities/webapi.go +++ b/core/services/gateway/handlers/webcapabilities/webapi.go @@ -1,12 +1,11 @@ package webcapabilities type TargetRequestPayload struct { - URL string `json:"url"` // URL to query, only http and https protocols are supported. - Method string `json:"method,omitempty"` // HTTP verb, defaults to GET. - Headers map[string]string `json:"headers,omitempty"` // HTTP headers, defaults to empty. - Body []byte `json:"body,omitempty"` // HTTP request body - TimeoutMs uint32 `json:"timeoutMs,omitempty"` // Timeout in milliseconds - RetryCount uint8 `json:"retryCount,omitempty"` // Number of retries, defaults to 0. + URL string `json:"url"` // URL to query, only http and https protocols are supported. + Method string `json:"method,omitempty"` // HTTP verb, defaults to GET. + Headers map[string]string `json:"headers,omitempty"` // HTTP headers, defaults to empty. + Body []byte `json:"body,omitempty"` // HTTP request body + TimeoutMs uint32 `json:"timeoutMs,omitempty"` // Timeout in milliseconds } type TargetResponsePayload struct { From 6064aba4f00f442763c8ff5a1a84080c58660031 Mon Sep 17 00:00:00 2001 From: jinhoonbang Date: Thu, 19 Sep 2024 15:05:32 -0700 Subject: [PATCH 03/16] fix linter --- .../webapi/target/connector_handler.go | 8 ++-- .../capabilities/webapi/target/target_test.go | 8 ++-- core/services/gateway/connector/connector.go | 12 ++--- .../connector/mocks/gateway_connector.go | 44 +++++++++---------- 4 files changed, 36 insertions(+), 36 deletions(-) diff --git a/core/capabilities/webapi/target/connector_handler.go b/core/capabilities/webapi/target/connector_handler.go index fa19cc5d6de..de17fc9ee78 100644 --- a/core/capabilities/webapi/target/connector_handler.go +++ b/core/capabilities/webapi/target/connector_handler.go @@ -52,17 +52,17 @@ func (c *ConnectorHandler) HandleSingleNodeRequest(ctx context.Context, messageI body := &api.MessageBody{ MessageId: messageID, - DonId: c.gc.DonId(), + DonId: c.gc.DonID(), Method: webcapabilities.MethodWebAPITarget, Payload: payload, } // simply, send request to first available gateway node from sorted list // this allows for deterministic selection of gateay node receiver for easier debugging - gatewayIds := c.gc.GatewayIds() - sort.Strings(gatewayIds) + gatewayIDs := c.gc.GatewayIDs() + sort.Strings(gatewayIDs) - err := c.gc.SignAndSendToGateway(ctx, gatewayIds[0], body) + err := c.gc.SignAndSendToGateway(ctx, gatewayIDs[0], body) if err != nil { return nil, errors.Wrap(err, "failed to send request to gateway") } diff --git a/core/capabilities/webapi/target/target_test.go b/core/capabilities/webapi/target/target_test.go index f2e2e63977e..79b3c9a872b 100644 --- a/core/capabilities/webapi/target/target_test.go +++ b/core/capabilities/webapi/target/target_test.go @@ -139,8 +139,8 @@ func TestRegisterUnregister(t *testing.T) { func TestCapability_Execute(t *testing.T) { th := setup(t) ctx := testutils.Context(t) - th.connector.On("DonId").Return("donId") - th.connector.On("GatewayIds").Return([]string{"gateway2", "gateway1"}) + th.connector.On("DonID").Return("donID") + th.connector.On("GatewayIDs").Return([]string{"gateway2", "gateway1"}) t.Run("unregistered workflow", func(t *testing.T) { req := capabilityRequest(t) @@ -165,7 +165,7 @@ func TestCapability_Execute(t *testing.T) { gatewayResp := gatewayResponse(t, msgID) - th.connector.On("SignAndSendToGateway", "gateway1", mock.Anything).Return(nil).Run(func(args mock.Arguments) { + th.connector.On("SignAndSendToGateway", mock.Anything, "gateway1", mock.Anything).Return(nil).Run(func(args mock.Arguments) { th.connectorHandler.HandleGatewayMessage(ctx, "gateway1", gatewayResp) }).Once() @@ -234,7 +234,7 @@ func TestCapability_Execute(t *testing.T) { req := capabilityRequest(t) require.NoError(t, err) - th.connector.On("SignAndSendToGateway", mock.Anything, mock.Anything).Return(errors.New("gateway error")).Once() + th.connector.On("SignAndSendToGateway", mock.Anything, "gateway1", mock.Anything).Return(errors.New("gateway error")).Once() _, err = th.capability.Execute(ctx, req) require.Error(t, err) require.Contains(t, err.Error(), "gateway error") diff --git a/core/services/gateway/connector/connector.go b/core/services/gateway/connector/connector.go index db7e74ea815..7f0e3db991c 100644 --- a/core/services/gateway/connector/connector.go +++ b/core/services/gateway/connector/connector.go @@ -31,10 +31,10 @@ type GatewayConnector interface { SendToGateway(ctx context.Context, gatewayId string, msg *api.Message) error // SignAndSendToGateway signs the message and sends the message to the specified gateway SignAndSendToGateway(ctx context.Context, gatewayId string, msg *api.MessageBody) error - // GatewayIds returns the list of Gateway Ids - GatewayIds() []string - // DonId returns the DON ID - DonId() string + // GatewayIDs returns the list of Gateway IDs + GatewayIDs() []string + // DonID returns the DON ID + DonID() string } // Signer implementation needs to be provided by a GatewayConnector user (node) @@ -188,7 +188,7 @@ func (c *gatewayConnector) SignAndSendToGateway(ctx context.Context, gatewayId s return nil } -func (c *gatewayConnector) GatewayIds() []string { +func (c *gatewayConnector) GatewayIDs() []string { var gids []string for gid := range c.gateways { gids = append(gids, gid) @@ -196,7 +196,7 @@ func (c *gatewayConnector) GatewayIds() []string { return gids } -func (c *gatewayConnector) DonId() string { +func (c *gatewayConnector) DonID() string { return c.config.DonId } diff --git a/core/services/gateway/connector/mocks/gateway_connector.go b/core/services/gateway/connector/mocks/gateway_connector.go index 88b81898cbf..8bf53d909a0 100644 --- a/core/services/gateway/connector/mocks/gateway_connector.go +++ b/core/services/gateway/connector/mocks/gateway_connector.go @@ -177,12 +177,12 @@ func (_c *GatewayConnector_Close_Call) RunAndReturn(run func() error) *GatewayCo return _c } -// DonId provides a mock function with given fields: -func (_m *GatewayConnector) DonId() string { +// DonID provides a mock function with given fields: +func (_m *GatewayConnector) DonID() string { ret := _m.Called() if len(ret) == 0 { - panic("no return value specified for DonId") + panic("no return value specified for DonID") } var r0 string @@ -195,39 +195,39 @@ func (_m *GatewayConnector) DonId() string { return r0 } -// GatewayConnector_DonId_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DonId' -type GatewayConnector_DonId_Call struct { +// GatewayConnector_DonID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DonID' +type GatewayConnector_DonID_Call struct { *mock.Call } -// DonId is a helper method to define mock.On call -func (_e *GatewayConnector_Expecter) DonId() *GatewayConnector_DonId_Call { - return &GatewayConnector_DonId_Call{Call: _e.mock.On("DonId")} +// DonID is a helper method to define mock.On call +func (_e *GatewayConnector_Expecter) DonID() *GatewayConnector_DonID_Call { + return &GatewayConnector_DonID_Call{Call: _e.mock.On("DonID")} } -func (_c *GatewayConnector_DonId_Call) Run(run func()) *GatewayConnector_DonId_Call { +func (_c *GatewayConnector_DonID_Call) Run(run func()) *GatewayConnector_DonID_Call { _c.Call.Run(func(args mock.Arguments) { run() }) return _c } -func (_c *GatewayConnector_DonId_Call) Return(_a0 string) *GatewayConnector_DonId_Call { +func (_c *GatewayConnector_DonID_Call) Return(_a0 string) *GatewayConnector_DonID_Call { _c.Call.Return(_a0) return _c } -func (_c *GatewayConnector_DonId_Call) RunAndReturn(run func() string) *GatewayConnector_DonId_Call { +func (_c *GatewayConnector_DonID_Call) RunAndReturn(run func() string) *GatewayConnector_DonID_Call { _c.Call.Return(run) return _c } -// GatewayIds provides a mock function with given fields: -func (_m *GatewayConnector) GatewayIds() []string { +// GatewayIDs provides a mock function with given fields: +func (_m *GatewayConnector) GatewayIDs() []string { ret := _m.Called() if len(ret) == 0 { - panic("no return value specified for GatewayIds") + panic("no return value specified for GatewayIDs") } var r0 []string @@ -242,29 +242,29 @@ func (_m *GatewayConnector) GatewayIds() []string { return r0 } -// GatewayConnector_GatewayIds_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GatewayIds' -type GatewayConnector_GatewayIds_Call struct { +// GatewayConnector_GatewayIDs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GatewayIDs' +type GatewayConnector_GatewayIDs_Call struct { *mock.Call } -// GatewayIds is a helper method to define mock.On call -func (_e *GatewayConnector_Expecter) GatewayIds() *GatewayConnector_GatewayIds_Call { - return &GatewayConnector_GatewayIds_Call{Call: _e.mock.On("GatewayIds")} +// GatewayIDs is a helper method to define mock.On call +func (_e *GatewayConnector_Expecter) GatewayIDs() *GatewayConnector_GatewayIDs_Call { + return &GatewayConnector_GatewayIDs_Call{Call: _e.mock.On("GatewayIDs")} } -func (_c *GatewayConnector_GatewayIds_Call) Run(run func()) *GatewayConnector_GatewayIds_Call { +func (_c *GatewayConnector_GatewayIDs_Call) Run(run func()) *GatewayConnector_GatewayIDs_Call { _c.Call.Run(func(args mock.Arguments) { run() }) return _c } -func (_c *GatewayConnector_GatewayIds_Call) Return(_a0 []string) *GatewayConnector_GatewayIds_Call { +func (_c *GatewayConnector_GatewayIDs_Call) Return(_a0 []string) *GatewayConnector_GatewayIDs_Call { _c.Call.Return(_a0) return _c } -func (_c *GatewayConnector_GatewayIds_Call) RunAndReturn(run func() []string) *GatewayConnector_GatewayIds_Call { +func (_c *GatewayConnector_GatewayIDs_Call) RunAndReturn(run func() []string) *GatewayConnector_GatewayIDs_Call { _c.Call.Return(run) return _c } From 96cb70213a925ad13b73f5fc1536c1f8ad82c1ee Mon Sep 17 00:00:00 2001 From: jinhoonbang Date: Thu, 19 Sep 2024 15:18:21 -0700 Subject: [PATCH 04/16] more linting fixes --- core/services/gateway/connector/connector.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/services/gateway/connector/connector.go b/core/services/gateway/connector/connector.go index 7f0e3db991c..64b1c0a6c52 100644 --- a/core/services/gateway/connector/connector.go +++ b/core/services/gateway/connector/connector.go @@ -30,7 +30,7 @@ type GatewayConnector interface { // SendToGateway takes a signed message as argument and sends it to the specified gateway SendToGateway(ctx context.Context, gatewayId string, msg *api.Message) error // SignAndSendToGateway signs the message and sends the message to the specified gateway - SignAndSendToGateway(ctx context.Context, gatewayId string, msg *api.MessageBody) error + SignAndSendToGateway(ctx context.Context, gatewayID string, msg *api.MessageBody) error // GatewayIDs returns the list of Gateway IDs GatewayIDs() []string // DonID returns the DON ID @@ -165,7 +165,7 @@ func (c *gatewayConnector) SendToGateway(ctx context.Context, gatewayId string, return gateway.conn.Write(ctx, websocket.BinaryMessage, data) } -func (c *gatewayConnector) SignAndSendToGateway(ctx context.Context, gatewayId string, body *api.MessageBody) error { +func (c *gatewayConnector) SignAndSendToGateway(ctx context.Context, gatewayID string, body *api.MessageBody) error { signature, err := c.signer.Sign(api.GetRawMessageBody(body)...) if err != nil { return err From 42643fc4df8f2dd68031accae5a5759eac305735 Mon Sep 17 00:00:00 2001 From: jinhoonbang Date: Thu, 19 Sep 2024 15:34:32 -0700 Subject: [PATCH 05/16] fix build --- core/services/gateway/connector/connector.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/services/gateway/connector/connector.go b/core/services/gateway/connector/connector.go index 64b1c0a6c52..7d964494e0a 100644 --- a/core/services/gateway/connector/connector.go +++ b/core/services/gateway/connector/connector.go @@ -181,9 +181,9 @@ func (c *gatewayConnector) SignAndSendToGateway(ctx context.Context, gatewayID s Signature: utils.StringToHex(string(signature)), } - err = c.SendToGateway(ctx, gatewayId, msg) + err = c.SendToGateway(ctx, gatewayID, msg) if err != nil { - return fmt.Errorf("failed to send message to gateway %s: %v", gatewayId, err) + return fmt.Errorf("failed to send message to gateway %s: %v", gatewayID, err) } return nil } From 342fa66f575471d673dfb7e398370383ac470391 Mon Sep 17 00:00:00 2001 From: jinhoonbang Date: Thu, 19 Sep 2024 15:51:30 -0700 Subject: [PATCH 06/16] regenerate mocks --- .../gateway/connector/mocks/gateway_connector.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/core/services/gateway/connector/mocks/gateway_connector.go b/core/services/gateway/connector/mocks/gateway_connector.go index 8bf53d909a0..9cb5632b313 100644 --- a/core/services/gateway/connector/mocks/gateway_connector.go +++ b/core/services/gateway/connector/mocks/gateway_connector.go @@ -375,9 +375,9 @@ func (_c *GatewayConnector_SendToGateway_Call) RunAndReturn(run func(context.Con return _c } -// SignAndSendToGateway provides a mock function with given fields: ctx, gatewayId, msg -func (_m *GatewayConnector) SignAndSendToGateway(ctx context.Context, gatewayId string, msg *api.MessageBody) error { - ret := _m.Called(ctx, gatewayId, msg) +// SignAndSendToGateway provides a mock function with given fields: ctx, gatewayID, msg +func (_m *GatewayConnector) SignAndSendToGateway(ctx context.Context, gatewayID string, msg *api.MessageBody) error { + ret := _m.Called(ctx, gatewayID, msg) if len(ret) == 0 { panic("no return value specified for SignAndSendToGateway") @@ -385,7 +385,7 @@ func (_m *GatewayConnector) SignAndSendToGateway(ctx context.Context, gatewayId var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, *api.MessageBody) error); ok { - r0 = rf(ctx, gatewayId, msg) + r0 = rf(ctx, gatewayID, msg) } else { r0 = ret.Error(0) } @@ -400,13 +400,13 @@ type GatewayConnector_SignAndSendToGateway_Call struct { // SignAndSendToGateway is a helper method to define mock.On call // - ctx context.Context -// - gatewayId string +// - gatewayID string // - msg *api.MessageBody -func (_e *GatewayConnector_Expecter) SignAndSendToGateway(ctx interface{}, gatewayId interface{}, msg interface{}) *GatewayConnector_SignAndSendToGateway_Call { - return &GatewayConnector_SignAndSendToGateway_Call{Call: _e.mock.On("SignAndSendToGateway", ctx, gatewayId, msg)} +func (_e *GatewayConnector_Expecter) SignAndSendToGateway(ctx interface{}, gatewayID interface{}, msg interface{}) *GatewayConnector_SignAndSendToGateway_Call { + return &GatewayConnector_SignAndSendToGateway_Call{Call: _e.mock.On("SignAndSendToGateway", ctx, gatewayID, msg)} } -func (_c *GatewayConnector_SignAndSendToGateway_Call) Run(run func(ctx context.Context, gatewayId string, msg *api.MessageBody)) *GatewayConnector_SignAndSendToGateway_Call { +func (_c *GatewayConnector_SignAndSendToGateway_Call) Run(run func(ctx context.Context, gatewayID string, msg *api.MessageBody)) *GatewayConnector_SignAndSendToGateway_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(context.Context), args[1].(string), args[2].(*api.MessageBody)) }) From c9bf494cd7f1e490f8f5c6801408c5681440ed6a Mon Sep 17 00:00:00 2001 From: jinhoonbang Date: Fri, 20 Sep 2024 16:29:31 -0700 Subject: [PATCH 07/16] Update core/capabilities/webapi/target/connector_handler.go Co-authored-by: Street <5597260+MStreet3@users.noreply.github.com> --- core/capabilities/webapi/target/connector_handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/capabilities/webapi/target/connector_handler.go b/core/capabilities/webapi/target/connector_handler.go index de17fc9ee78..7812fe822b9 100644 --- a/core/capabilities/webapi/target/connector_handler.go +++ b/core/capabilities/webapi/target/connector_handler.go @@ -58,7 +58,7 @@ func (c *ConnectorHandler) HandleSingleNodeRequest(ctx context.Context, messageI } // simply, send request to first available gateway node from sorted list - // this allows for deterministic selection of gateay node receiver for easier debugging + // this allows for deterministic selection of gateway node receiver for easier debugging gatewayIDs := c.gc.GatewayIDs() sort.Strings(gatewayIDs) From 06f468d4c47a0e7c8854ec1ce63deac72de4a57e Mon Sep 17 00:00:00 2001 From: jinhoonbang Date: Mon, 23 Sep 2024 08:15:12 -0700 Subject: [PATCH 08/16] address feedback --- .../webapi/target/connector_handler.go | 20 ++- .../target/{capability.go => target.go} | 17 +- .../capabilities/webapi/target/target_test.go | 165 +++++++++++++++--- core/services/gateway/handler_factory.go | 6 +- .../handler.go | 2 +- .../webapi.go | 2 +- .../services/standardcapabilities/delegate.go | 1 - 7 files changed, 169 insertions(+), 44 deletions(-) rename core/capabilities/webapi/target/{capability.go => target.go} (93%) rename core/services/gateway/handlers/{webcapabilities => webapicapabilities}/handler.go (81%) rename core/services/gateway/handlers/{webcapabilities => webapicapabilities}/webapi.go (97%) diff --git a/core/capabilities/webapi/target/connector_handler.go b/core/capabilities/webapi/target/connector_handler.go index 7812fe822b9..e52d2bbb2c6 100644 --- a/core/capabilities/webapi/target/connector_handler.go +++ b/core/capabilities/webapi/target/connector_handler.go @@ -12,7 +12,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/gateway/api" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common" - "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/webcapabilities" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/webapicapabilities" ) var _ connector.GatewayConnectorHandler = &ConnectorHandler{} @@ -53,13 +53,16 @@ func (c *ConnectorHandler) HandleSingleNodeRequest(ctx context.Context, messageI body := &api.MessageBody{ MessageId: messageID, DonId: c.gc.DonID(), - Method: webcapabilities.MethodWebAPITarget, + Method: webapicapabilities.MethodWebAPITarget, Payload: payload, } // simply, send request to first available gateway node from sorted list // this allows for deterministic selection of gateway node receiver for easier debugging gatewayIDs := c.gc.GatewayIDs() + if len(gatewayIDs) == 0 { + return nil, errors.New("no gateway nodes available") + } sort.Strings(gatewayIDs) err := c.gc.SignAndSendToGateway(ctx, gatewayIDs[0], body) @@ -86,8 +89,8 @@ func (c *ConnectorHandler) HandleGatewayMessage(ctx context.Context, gatewayID s } l.Debugw("handling gateway request") switch body.Method { - case webcapabilities.MethodWebAPITarget: - var payload webcapabilities.TargetResponsePayload + case webapicapabilities.MethodWebAPITarget: + var payload webapicapabilities.TargetResponsePayload err := json.Unmarshal(body.Payload, &payload) if err != nil { l.Errorw("failed to unmarshal payload", "err", err) @@ -100,14 +103,19 @@ func (c *ConnectorHandler) HandleGatewayMessage(ctx context.Context, gatewayID s l.Errorw("no response channel found") return } - ch <- msg + select { + case ch <- msg: + delete(c.responseChs, body.MessageId) + case <-ctx.Done(): + return + } default: l.Errorw("unsupported method") } } func (c *ConnectorHandler) Start(ctx context.Context) error { - return c.gc.AddHandler([]string{webcapabilities.MethodWebAPITarget}, c) + return c.gc.AddHandler([]string{webapicapabilities.MethodWebAPITarget}, c) } func (c *ConnectorHandler) Close() error { diff --git a/core/capabilities/webapi/target/capability.go b/core/capabilities/webapi/target/target.go similarity index 93% rename from core/capabilities/webapi/target/capability.go rename to core/capabilities/webapi/target/target.go index a93bfac0c70..ee765c06336 100644 --- a/core/capabilities/webapi/target/capability.go +++ b/core/capabilities/webapi/target/target.go @@ -12,7 +12,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types/core" "github.com/smartcontractkit/chainlink-common/pkg/values" "github.com/smartcontractkit/chainlink/v2/core/capabilities/validation" - "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/webcapabilities" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/webapicapabilities" ) const ID = "web-api-target@1.0.0" @@ -33,7 +33,7 @@ type Capability struct { registry core.CapabilitiesRegistry config Config registeredWorkflows map[string]struct{} - registeredWorkflowsMu sync.Mutex + registeredWorkflowsMu sync.RWMutex } func NewCapability(config Config, registry core.CapabilitiesRegistry, connectorHandler *ConnectorHandler, lggr logger.Logger) (*Capability, error) { @@ -43,7 +43,7 @@ func NewCapability(config Config, registry core.CapabilitiesRegistry, connectorH registry: registry, connectorHandler: connectorHandler, registeredWorkflows: make(map[string]struct{}), - registeredWorkflowsMu: sync.Mutex{}, + registeredWorkflowsMu: sync.RWMutex{}, lggr: lggr, }, nil } @@ -70,7 +70,7 @@ func getMessageID(req capabilities.CapabilityRequest) (string, error) { messageID := []string{ req.Metadata.WorkflowID, req.Metadata.WorkflowExecutionID, - webcapabilities.MethodWebAPITarget, + webapicapabilities.MethodWebAPITarget, } return strings.Join(messageID, "/"), nil } @@ -95,13 +95,14 @@ func (c *Capability) Execute(ctx context.Context, req capabilities.CapabilityReq return capabilities.CapabilityResponse{}, err } - c.registeredWorkflowsMu.Lock() - defer c.registeredWorkflowsMu.Unlock() + c.registeredWorkflowsMu.RLock() if _, ok := c.registeredWorkflows[req.Metadata.WorkflowID]; !ok { + c.registeredWorkflowsMu.RUnlock() return capabilities.CapabilityResponse{}, fmt.Errorf("workflow is not registered: %v", req.Metadata.WorkflowID) } + c.registeredWorkflowsMu.RUnlock() - payload := webcapabilities.TargetRequestPayload{ + payload := webapicapabilities.TargetRequestPayload{ URL: input.URL, Method: input.Method, Headers: input.Headers, @@ -122,7 +123,7 @@ func (c *Capability) Execute(ctx context.Context, req capabilities.CapabilityReq return capabilities.CapabilityResponse{}, err } c.lggr.Debugw("received gateway response", "resp", resp) - var payload webcapabilities.TargetResponsePayload + var payload webapicapabilities.TargetResponsePayload err = json.Unmarshal(resp.Body.Payload, &payload) if err != nil { return capabilities.CapabilityResponse{}, err diff --git a/core/capabilities/webapi/target/target_test.go b/core/capabilities/webapi/target/target_test.go index 79b3c9a872b..481bd2e6ec4 100644 --- a/core/capabilities/webapi/target/target_test.go +++ b/core/capabilities/webapi/target/target_test.go @@ -1,6 +1,7 @@ package target import ( + "context" "encoding/base64" "encoding/json" "errors" @@ -18,15 +19,25 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/gateway/api" gcmocks "github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector/mocks" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common" - "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/webcapabilities" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/webapicapabilities" ) const ( workflowID1 = "15c631d295ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0" + workflowID2 = "44f129ea13948d1c4eaa2bbc0e72319266364cba12b789174732b2f72b57088d" workflowExecutionID1 = "95ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0abbadeed" owner1 = "0x00000000000000000000000000000000000000aa" ) +var defaultConfig = Config{ + RateLimiter: common.RateLimiterConfig{ + GlobalRPS: 100.0, + GlobalBurst: 100, + PerSenderRPS: 100.0, + PerSenderBurst: 100, + }, +} + type testHarness struct { registry *registrymock.CapabilitiesRegistry connector *gcmocks.GatewayConnector @@ -36,18 +47,10 @@ type testHarness struct { capability *Capability } -func setup(t *testing.T) testHarness { +func setup(t *testing.T, config Config) testHarness { registry := registrymock.NewCapabilitiesRegistry(t) connector := gcmocks.NewGatewayConnector(t) lggr := logger.Test(t) - config := Config{ - RateLimiter: common.RateLimiterConfig{ - GlobalRPS: 100.0, - GlobalBurst: 100, - PerSenderRPS: 100.0, - PerSenderBurst: 100, - }, - } connectorHandler, err := NewConnectorHandler(connector, config, lggr) require.NoError(t, err) @@ -64,7 +67,7 @@ func setup(t *testing.T) testHarness { } } -func capabilityRequest(t *testing.T) capabilities.CapabilityRequest { +func inputsAndConfig(t *testing.T) (*values.Map, *values.Map) { data := map[string]string{ "key": "value", } @@ -84,6 +87,11 @@ func capabilityRequest(t *testing.T) capabilities.CapabilityRequest { "schedule": SingleNode, }) require.NoError(t, err) + return inputs, wfConfig +} + +func capabilityRequest(t *testing.T) capabilities.CapabilityRequest { + inputs, wfConfig := inputsAndConfig(t) return capabilities.CapabilityRequest{ Metadata: capabilities.RequestMetadata{ @@ -98,7 +106,7 @@ func capabilityRequest(t *testing.T) capabilities.CapabilityRequest { func gatewayResponse(t *testing.T, msgID string) *api.Message { headers := map[string]string{"Content-Type": "application/json"} body := []byte("response body") - responsePayload, err := json.Marshal(webcapabilities.TargetResponsePayload{ + responsePayload, err := json.Marshal(webapicapabilities.TargetResponsePayload{ StatusCode: 200, Headers: headers, Body: body, @@ -108,14 +116,14 @@ func gatewayResponse(t *testing.T, msgID string) *api.Message { return &api.Message{ Body: api.MessageBody{ MessageId: msgID, - Method: webcapabilities.MethodWebAPITarget, + Method: webapicapabilities.MethodWebAPITarget, Payload: responsePayload, }, } } func TestRegisterUnregister(t *testing.T) { - th := setup(t) + th := setup(t, defaultConfig) ctx := testutils.Context(t) regReq := capabilities.RegisterToWorkflowRequest{ @@ -127,20 +135,43 @@ func TestRegisterUnregister(t *testing.T) { err := th.capability.RegisterToWorkflow(ctx, regReq) require.NoError(t, err) - err = th.capability.UnregisterFromWorkflow(ctx, capabilities.UnregisterFromWorkflowRequest{ - Metadata: capabilities.RegistrationMetadata{ - WorkflowID: workflowID1, - WorkflowOwner: owner1, - }, + t.Run("happy case", func(t *testing.T) { + err = th.capability.UnregisterFromWorkflow(ctx, capabilities.UnregisterFromWorkflowRequest{ + Metadata: capabilities.RegistrationMetadata{ + WorkflowID: workflowID1, + WorkflowOwner: owner1, + }, + }) + require.NoError(t, err) + }) + + t.Run("unregister non-existent workflow no error", func(t *testing.T) { + err = th.capability.UnregisterFromWorkflow(ctx, capabilities.UnregisterFromWorkflowRequest{ + Metadata: capabilities.RegistrationMetadata{ + WorkflowID: workflowID2, + WorkflowOwner: owner1, + }, + }) + require.NoError(t, err) + }) + + t.Run("reregister idempotent", func(t *testing.T) { + regReq := capabilities.RegisterToWorkflowRequest{ + Metadata: capabilities.RegistrationMetadata{ + WorkflowID: workflowID1, + WorkflowOwner: owner1, + }, + } + err := th.capability.RegisterToWorkflow(ctx, regReq) + require.NoError(t, err) }) - require.NoError(t, err) } func TestCapability_Execute(t *testing.T) { - th := setup(t) + th := setup(t, defaultConfig) ctx := testutils.Context(t) - th.connector.On("DonID").Return("donID") - th.connector.On("GatewayIDs").Return([]string{"gateway2", "gateway1"}) + th.connector.EXPECT().DonID().Return("donID") + th.connector.EXPECT().GatewayIDs().Return([]string{"gateway2", "gateway1"}) t.Run("unregistered workflow", func(t *testing.T) { req := capabilityRequest(t) @@ -183,6 +214,92 @@ func TestCapability_Execute(t *testing.T) { require.Equal(t, "response body", respBody) }) + t.Run("context cancelled while waiting for gateway response", func(t *testing.T) { + regReq := capabilities.RegisterToWorkflowRequest{ + Metadata: capabilities.RegistrationMetadata{ + WorkflowID: workflowID1, + WorkflowOwner: owner1, + }, + } + err := th.capability.RegisterToWorkflow(ctx, regReq) + require.NoError(t, err) + + req := capabilityRequest(t) + _, err = getMessageID(req) + require.NoError(t, err) + + newCtx, cancel := context.WithCancel(ctx) + th.connector.On("SignAndSendToGateway", mock.Anything, "gateway1", mock.Anything).Return(nil).Run(func(args mock.Arguments) { + cancel() + }).Once() + + _, err = th.capability.Execute(newCtx, req) + require.Error(t, err) + require.Contains(t, err.Error(), "context canceled") + }) + + t.Run("invalid workflow ID during registration", func(t *testing.T) { + regReq := capabilities.RegisterToWorkflowRequest{ + Metadata: capabilities.RegistrationMetadata{ + WorkflowID: "invalid", + WorkflowOwner: owner1, + }, + } + err := th.capability.RegisterToWorkflow(ctx, regReq) + require.Error(t, err) + require.ErrorContains(t, err, "workflow ID is invalid") + }) + + t.Run("invalid workflow ID during execute", func(t *testing.T) { + regReq := capabilities.RegisterToWorkflowRequest{ + Metadata: capabilities.RegistrationMetadata{ + WorkflowID: workflowID1, + WorkflowOwner: owner1, + }, + } + err := th.capability.RegisterToWorkflow(ctx, regReq) + require.NoError(t, err) + + inputs, wfConfig := inputsAndConfig(t) + req := capabilities.CapabilityRequest{ + Metadata: capabilities.RequestMetadata{ + WorkflowID: "invalid", + WorkflowExecutionID: workflowExecutionID1, + }, + Inputs: inputs, + Config: wfConfig, + } + + _, err = th.capability.Execute(ctx, req) + require.Error(t, err) + require.ErrorContains(t, err, "workflow ID is invalid") + }) + + t.Run("invalid workflow execution ID during execute", func(t *testing.T) { + regReq := capabilities.RegisterToWorkflowRequest{ + Metadata: capabilities.RegistrationMetadata{ + WorkflowID: workflowID1, + WorkflowOwner: owner1, + }, + } + err := th.capability.RegisterToWorkflow(ctx, regReq) + require.NoError(t, err) + + inputs, wfConfig := inputsAndConfig(t) + req := capabilities.CapabilityRequest{ + Metadata: capabilities.RequestMetadata{ + WorkflowID: workflowID1, + WorkflowExecutionID: "invalid", + }, + Inputs: inputs, + Config: wfConfig, + } + + _, err = th.capability.Execute(ctx, req) + require.Error(t, err) + require.ErrorContains(t, err, "workflow execution ID is invalid") + }) + t.Run("unsupported schedule", func(t *testing.T) { regReq := capabilities.RegisterToWorkflowRequest{ Metadata: capabilities.RegistrationMetadata{ @@ -234,7 +351,7 @@ func TestCapability_Execute(t *testing.T) { req := capabilityRequest(t) require.NoError(t, err) - th.connector.On("SignAndSendToGateway", mock.Anything, "gateway1", mock.Anything).Return(errors.New("gateway error")).Once() + th.connector.EXPECT().SignAndSendToGateway(mock.Anything, "gateway1", mock.Anything).Return(errors.New("gateway error")).Once() _, err = th.capability.Execute(ctx, req) require.Error(t, err) require.Contains(t, err.Error(), "gateway error") diff --git a/core/services/gateway/handler_factory.go b/core/services/gateway/handler_factory.go index 520f1b946c2..6793350f317 100644 --- a/core/services/gateway/handler_factory.go +++ b/core/services/gateway/handler_factory.go @@ -13,9 +13,9 @@ import ( ) const ( - FunctionsHandlerType HandlerType = "functions" - DummyHandlerType HandlerType = "dummy" - WebCapabilitiesType HandlerType = "web-capabilities" + FunctionsHandlerType HandlerType = "functions" + DummyHandlerType HandlerType = "dummy" + WebAPICapabilitiesType HandlerType = "web-api-capabilities" ) type handlerFactory struct { diff --git a/core/services/gateway/handlers/webcapabilities/handler.go b/core/services/gateway/handlers/webapicapabilities/handler.go similarity index 81% rename from core/services/gateway/handlers/webcapabilities/handler.go rename to core/services/gateway/handlers/webapicapabilities/handler.go index 15fe6dbdecb..a38651d40fc 100644 --- a/core/services/gateway/handlers/webcapabilities/handler.go +++ b/core/services/gateway/handlers/webapicapabilities/handler.go @@ -1,4 +1,4 @@ -package webcapabilities +package webapicapabilities const ( // NOTE: more methods will go here. HTTP trigger/action/target; etc. diff --git a/core/services/gateway/handlers/webcapabilities/webapi.go b/core/services/gateway/handlers/webapicapabilities/webapi.go similarity index 97% rename from core/services/gateway/handlers/webcapabilities/webapi.go rename to core/services/gateway/handlers/webapicapabilities/webapi.go index ec6781ca71c..e300b61d85b 100644 --- a/core/services/gateway/handlers/webcapabilities/webapi.go +++ b/core/services/gateway/handlers/webapicapabilities/webapi.go @@ -1,4 +1,4 @@ -package webcapabilities +package webapicapabilities type TargetRequestPayload struct { URL string `json:"url"` // URL to query, only http and https protocols are supported. diff --git a/core/services/standardcapabilities/delegate.go b/core/services/standardcapabilities/delegate.go index a5da209308e..15c829fbf84 100644 --- a/core/services/standardcapabilities/delegate.go +++ b/core/services/standardcapabilities/delegate.go @@ -97,7 +97,6 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser if len(spec.StandardCapabilitiesSpec.Config) == 0 { return nil, errors.New("config is empty") } - // TODO: check if TOML is the only supported format here var targetCfg webapitarget.Config err := toml.Unmarshal([]byte(spec.StandardCapabilitiesSpec.Config), &targetCfg) if err != nil { From 7496048b77cab14feeb3276451a32041a121f2c7 Mon Sep 17 00:00:00 2001 From: jinhoonbang Date: Mon, 23 Sep 2024 12:18:59 -0700 Subject: [PATCH 09/16] add gateway handler for node messages and http client for outgoing messages. pending rate limiting and unit tests. --- core/services/gateway/delegate.go | 8 +- core/services/gateway/gateway.go | 1 + core/services/gateway/handler_factory.go | 8 +- core/services/gateway/handlers/handler.go | 1 + .../handlers/webapicapabilities/handler.go | 137 ++++++++++++++++++ core/services/gateway/network/httpclient.go | 88 +++++++++++ 6 files changed, 241 insertions(+), 2 deletions(-) create mode 100644 core/services/gateway/network/httpclient.go diff --git a/core/services/gateway/delegate.go b/core/services/gateway/delegate.go index 5a30228db4c..4b2daeb6a46 100644 --- a/core/services/gateway/delegate.go +++ b/core/services/gateway/delegate.go @@ -12,6 +12,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/config" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/network" "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/keystore" ) @@ -54,7 +55,12 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) (services if err2 != nil { return nil, errors.Wrap(err2, "unmarshal gateway config") } - handlerFactory := NewHandlerFactory(d.legacyChains, d.ds, d.lggr) + //TODO: pass in config + httpClient, err := network.NewHTTPClient(nil, d.lggr) + if err != nil { + return nil, err + } + handlerFactory := NewHandlerFactory(d.legacyChains, d.ds, httpClient, d.lggr) gateway, err := NewGatewayFromConfig(&gatewayConfig, handlerFactory, d.lggr) if err != nil { return nil, err diff --git a/core/services/gateway/gateway.go b/core/services/gateway/gateway.go index 93ecc474bec..2e2e34d038c 100644 --- a/core/services/gateway/gateway.go +++ b/core/services/gateway/gateway.go @@ -47,6 +47,7 @@ type gateway struct { codec api.Codec httpServer gw_net.HttpServer + httpClient gw_net.HttpClient handlers map[string]handlers.Handler connMgr ConnectionManager lggr logger.Logger diff --git a/core/services/gateway/handler_factory.go b/core/services/gateway/handler_factory.go index 6793350f317..acfe0198d51 100644 --- a/core/services/gateway/handler_factory.go +++ b/core/services/gateway/handler_factory.go @@ -10,6 +10,8 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/gateway/config" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/functions" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/webapicapabilities" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/network" ) const ( @@ -22,15 +24,17 @@ type handlerFactory struct { legacyChains legacyevm.LegacyChainContainer ds sqlutil.DataSource lggr logger.Logger + httpClient network.HttpClient } var _ HandlerFactory = (*handlerFactory)(nil) -func NewHandlerFactory(legacyChains legacyevm.LegacyChainContainer, ds sqlutil.DataSource, lggr logger.Logger) HandlerFactory { +func NewHandlerFactory(legacyChains legacyevm.LegacyChainContainer, ds sqlutil.DataSource, httpClient network.HttpClient, lggr logger.Logger) HandlerFactory { return &handlerFactory{ legacyChains, ds, lggr, + httpClient, } } @@ -40,6 +44,8 @@ func (hf *handlerFactory) NewHandler(handlerType HandlerType, handlerConfig json return functions.NewFunctionsHandlerFromConfig(handlerConfig, donConfig, don, hf.legacyChains, hf.ds, hf.lggr) case DummyHandlerType: return handlers.NewDummyHandler(donConfig, don, hf.lggr) + case WebAPICapabilitiesType: + return webapicapabilities.NewHandler(donConfig, don, hf.httpClient, hf.lggr) default: return nil, fmt.Errorf("unsupported handler type %s", handlerType) } diff --git a/core/services/gateway/handlers/handler.go b/core/services/gateway/handlers/handler.go index 6994488707f..9b2158652d5 100644 --- a/core/services/gateway/handlers/handler.go +++ b/core/services/gateway/handlers/handler.go @@ -32,6 +32,7 @@ type Handler interface { HandleUserMessage(ctx context.Context, msg *api.Message, callbackCh chan<- UserCallbackPayload) error // Handlers should not make any assumptions about goroutines calling HandleNodeMessage + // HandleNodeMessage should not be blocking HandleNodeMessage(ctx context.Context, msg *api.Message, nodeAddr string) error } diff --git a/core/services/gateway/handlers/webapicapabilities/handler.go b/core/services/gateway/handlers/webapicapabilities/handler.go index a38651d40fc..f79c0473e0b 100644 --- a/core/services/gateway/handlers/webapicapabilities/handler.go +++ b/core/services/gateway/handlers/webapicapabilities/handler.go @@ -1,6 +1,143 @@ package webapicapabilities +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/api" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/config" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/network" +) + const ( // NOTE: more methods will go here. HTTP trigger/action/target; etc. MethodWebAPITarget = "web_api_target" ) + +type handler struct { + don handlers.DON + lggr logger.Logger + httpClient network.HttpClient +} + +type savedCallback struct { + id string + callbackCh chan<- handlers.UserCallbackPayload +} + +var _ handlers.Handler = (*handler)(nil) + +func NewHandler(donConfig *config.DONConfig, don handlers.DON, httpClient network.HttpClient, lggr logger.Logger) (*handler, error) { + return &handler{ + don: don, + lggr: lggr.Named("WebAPIHandler." + donConfig.DonId), + httpClient: httpClient, + }, nil +} + +func (h *handler) HandleUserMessage(ctx context.Context, msg *api.Message, callbackCh chan<- handlers.UserCallbackPayload) error { + return nil +} + +// sendHttpMessageToClient is an outgoing message from the gateway to external endpoints +// returns message to be sent back to the capability node +func (h *handler) sendHttpMessageToClient(ctx context.Context, req network.HttpRequest, msg *api.Message) (*api.Message, error) { + var payload TargetResponsePayload + resp, err := h.httpClient.Send(ctx, req) + if err != nil { + return nil, err + } else { + payload = TargetResponsePayload{ + Success: true, + StatusCode: uint8(resp.StatusCode), + Headers: resp.Headers, + Body: resp.Body, + } + } + payloadBytes, err := json.Marshal(payload) + if err != nil { + return nil, err + } + return &api.Message{ + Body: api.MessageBody{ + MessageId: msg.Body.MessageId, + Method: msg.Body.Method, + DonId: msg.Body.DonId, + Payload: payloadBytes, + }, + }, nil +} + +func (h *handler) handleWebAPITargetMessage(ctx context.Context, msg *api.Message, nodeAddr string) error { + h.lggr.Debugw("handling web api target message", "messageId", msg.Body.MessageId) + var targetPayload TargetRequestPayload + err := json.Unmarshal(msg.Body.Payload, &targetPayload) + if err != nil { + return err + } + // send message to target + req := network.HttpRequest{ + Method: targetPayload.Method, + URL: targetPayload.URL, + Headers: targetPayload.Headers, + Body: targetPayload.Body, + Timeout: time.Duration(targetPayload.TimeoutMs) * time.Millisecond, + } + // this handle method must be non-blocking + // send response to node (target capability) async + // if there is a non-HTTP error (e.g. malformed request), send payload with success set to false and error messages + go func() { + l := h.lggr.With("url", targetPayload.URL, "messageId", msg.Body.MessageId, "method", targetPayload.Method) + respMsg, err := h.sendHttpMessageToClient(ctx, req, msg) + if err != nil { + l.Errorw("error while sending HTTP request to external endpoint", "err", err) + payload := TargetResponsePayload{ + Success: false, + ErrorMessage: err.Error(), + } + payloadBytes, err := json.Marshal(payload) + if err != nil { + // should not happen + l.Errorw("error while marshalling payload", "err", err) + return + } + respMsg = &api.Message{ + Body: api.MessageBody{ + MessageId: msg.Body.MessageId, + Method: msg.Body.Method, + DonId: msg.Body.DonId, + Payload: payloadBytes, + }, + } + } + err = h.don.SendToNode(ctx, nodeAddr, respMsg) + if err != nil { + l.Errorw("failed to send to node", "err", err, "to", nodeAddr) + return + } + }() + return nil +} + +func (h *handler) HandleNodeMessage(ctx context.Context, msg *api.Message, nodeAddr string) error { + // TODO: rate limiting + switch msg.Body.Method { + case MethodWebAPITarget: + return h.handleWebAPITargetMessage(ctx, msg, nodeAddr) + default: + return fmt.Errorf("unsupported method: %s", msg.Body.Method) + } +} + +func (h *handler) Start(context.Context) error { + // TODO: do anything here? + return nil +} + +func (h *handler) Close() error { + return nil +} diff --git a/core/services/gateway/network/httpclient.go b/core/services/gateway/network/httpclient.go new file mode 100644 index 00000000000..80063c2b7d2 --- /dev/null +++ b/core/services/gateway/network/httpclient.go @@ -0,0 +1,88 @@ +package network + +import ( + "bytes" + "context" + "io" + "net/http" + "strings" + "time" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" +) + +// HttpClient interfaces defines a method to send HTTP requests +// TODO: handle retries +type HttpClient interface { + Send(ctx context.Context, req HttpRequest) (*HttpResponse, error) +} + +type HttpClientConfig interface { + MaxResponseBytes() int64 + DefaultTimeout() time.Duration +} + +type HttpRequest struct { + Method string + URL string + Headers map[string]string + Body []byte + Timeout time.Duration +} +type HttpResponse struct { + StatusCode int // HTTP status code + Headers map[string]string // HTTP headers + Body []byte // HTTP response body +} + +type httpClient struct { + client *http.Client + config HttpClientConfig + lggr logger.Logger +} + +// NewHttpClient creates a new HttpClient +// As of now, the client does not support TLS configuration but may be extended in the future +func NewHTTPClient(config HttpClientConfig, lggr logger.Logger) (HttpClient, error) { + return &httpClient{ + client: &http.Client{ + Timeout: config.DefaultTimeout(), + Transport: http.DefaultTransport, + }, + lggr: lggr, + }, nil +} + +func (c *httpClient) Send(ctx context.Context, req HttpRequest) (*HttpResponse, error) { + timeoutCtx, cancel := context.WithTimeout(ctx, req.Timeout) + defer cancel() + r, err := http.NewRequestWithContext(timeoutCtx, req.Method, req.URL, bytes.NewBuffer(req.Body)) + if err != nil { + return nil, err + } + + resp, err := c.client.Do(r) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + reader := http.MaxBytesReader(nil, resp.Body, c.config.MaxResponseBytes()) + body, err := io.ReadAll(reader) + if err != nil { + return nil, err + } + headers := make(map[string]string) + for k, v := range resp.Header { + // header values are usually an array of size 1 + // joining them to a single string in case array size is greater than 1 + headers[k] = strings.Join(v, ",") + } + c.lggr.Debugw("received HTTP response", "statusCode", resp.StatusCode, "body", string(body), "url", req.URL, "headers", headers) + + return &HttpResponse{ + Headers: headers, + StatusCode: resp.StatusCode, + Body: body, + }, nil +} From ce2df98ed0e29010c8fc9b1eb1ab8a1dff37556a Mon Sep 17 00:00:00 2001 From: jinhoonbang Date: Mon, 23 Sep 2024 15:45:55 -0700 Subject: [PATCH 10/16] implement rate limiter and add unit test http client --- core/services/gateway/config/config.go | 4 +- core/services/gateway/delegate.go | 3 +- core/services/gateway/gateway.go | 2 +- core/services/gateway/handler_factory.go | 6 +-- core/services/gateway/handlers/handler.go | 4 +- .../handlers/webapicapabilities/handler.go | 47 ++++++++++++------- core/services/gateway/network/httpclient.go | 32 ++++++------- 7 files changed, 56 insertions(+), 42 deletions(-) diff --git a/core/services/gateway/config/config.go b/core/services/gateway/config/config.go index a4d94155c8f..02c1b44869f 100644 --- a/core/services/gateway/config/config.go +++ b/core/services/gateway/config/config.go @@ -10,7 +10,9 @@ type GatewayConfig struct { UserServerConfig gw_net.HTTPServerConfig NodeServerConfig gw_net.WebSocketServerConfig ConnectionManagerConfig ConnectionManagerConfig - Dons []DONConfig + // HTTPClientConfig is configuration for outbound HTTP calls to external endpoints + HTTPClientConfig gw_net.HTTPClientConfig + Dons []DONConfig } type ConnectionManagerConfig struct { diff --git a/core/services/gateway/delegate.go b/core/services/gateway/delegate.go index 4b2daeb6a46..ba059b15a35 100644 --- a/core/services/gateway/delegate.go +++ b/core/services/gateway/delegate.go @@ -55,8 +55,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) (services if err2 != nil { return nil, errors.Wrap(err2, "unmarshal gateway config") } - //TODO: pass in config - httpClient, err := network.NewHTTPClient(nil, d.lggr) + httpClient, err := network.NewHTTPClient(gatewayConfig.HTTPClientConfig, d.lggr) if err != nil { return nil, err } diff --git a/core/services/gateway/gateway.go b/core/services/gateway/gateway.go index 2e2e34d038c..13f2e95b15c 100644 --- a/core/services/gateway/gateway.go +++ b/core/services/gateway/gateway.go @@ -47,7 +47,7 @@ type gateway struct { codec api.Codec httpServer gw_net.HttpServer - httpClient gw_net.HttpClient + httpClient gw_net.HTTPClient handlers map[string]handlers.Handler connMgr ConnectionManager lggr logger.Logger diff --git a/core/services/gateway/handler_factory.go b/core/services/gateway/handler_factory.go index acfe0198d51..0c1eeaf676e 100644 --- a/core/services/gateway/handler_factory.go +++ b/core/services/gateway/handler_factory.go @@ -24,12 +24,12 @@ type handlerFactory struct { legacyChains legacyevm.LegacyChainContainer ds sqlutil.DataSource lggr logger.Logger - httpClient network.HttpClient + httpClient network.HTTPClient } var _ HandlerFactory = (*handlerFactory)(nil) -func NewHandlerFactory(legacyChains legacyevm.LegacyChainContainer, ds sqlutil.DataSource, httpClient network.HttpClient, lggr logger.Logger) HandlerFactory { +func NewHandlerFactory(legacyChains legacyevm.LegacyChainContainer, ds sqlutil.DataSource, httpClient network.HTTPClient, lggr logger.Logger) HandlerFactory { return &handlerFactory{ legacyChains, ds, @@ -45,7 +45,7 @@ func (hf *handlerFactory) NewHandler(handlerType HandlerType, handlerConfig json case DummyHandlerType: return handlers.NewDummyHandler(donConfig, don, hf.lggr) case WebAPICapabilitiesType: - return webapicapabilities.NewHandler(donConfig, don, hf.httpClient, hf.lggr) + return webapicapabilities.NewHandler(handlerConfig, donConfig, don, hf.httpClient, hf.lggr) default: return nil, fmt.Errorf("unsupported handler type %s", handlerType) } diff --git a/core/services/gateway/handlers/handler.go b/core/services/gateway/handlers/handler.go index 9b2158652d5..b9fe4234d25 100644 --- a/core/services/gateway/handlers/handler.go +++ b/core/services/gateway/handlers/handler.go @@ -31,8 +31,8 @@ type Handler interface { // 2. waits on callbackCh with a timeout HandleUserMessage(ctx context.Context, msg *api.Message, callbackCh chan<- UserCallbackPayload) error - // Handlers should not make any assumptions about goroutines calling HandleNodeMessage - // HandleNodeMessage should not be blocking + // Handlers should not make any assumptions about goroutines calling HandleNodeMessage. + // should be non-blocking HandleNodeMessage(ctx context.Context, msg *api.Message, nodeAddr string) error } diff --git a/core/services/gateway/handlers/webapicapabilities/handler.go b/core/services/gateway/handlers/webapicapabilities/handler.go index f79c0473e0b..5bd80f9db52 100644 --- a/core/services/gateway/handlers/webapicapabilities/handler.go +++ b/core/services/gateway/handlers/webapicapabilities/handler.go @@ -10,6 +10,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/gateway/api" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/config" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/network" ) @@ -19,23 +20,34 @@ const ( ) type handler struct { - don handlers.DON - lggr logger.Logger - httpClient network.HttpClient + don handlers.DON + lggr logger.Logger + httpClient network.HTTPClient + nodeRateLimiter *common.RateLimiter } -type savedCallback struct { - id string - callbackCh chan<- handlers.UserCallbackPayload +type HandlerConfig struct { + NodeRateLimiter common.RateLimiterConfig `json:"nodeRateLimiter"` } var _ handlers.Handler = (*handler)(nil) -func NewHandler(donConfig *config.DONConfig, don handlers.DON, httpClient network.HttpClient, lggr logger.Logger) (*handler, error) { +func NewHandler(handlerConfig json.RawMessage, donConfig *config.DONConfig, don handlers.DON, httpClient network.HTTPClient, lggr logger.Logger) (*handler, error) { + var cfg HandlerConfig + err := json.Unmarshal(handlerConfig, &cfg) + if err != nil { + return nil, err + } + nodeRateLimiter, err := common.NewRateLimiter(cfg.NodeRateLimiter) + if err != nil { + return nil, err + } + return &handler{ - don: don, - lggr: lggr.Named("WebAPIHandler." + donConfig.DonId), - httpClient: httpClient, + don: don, + lggr: lggr.Named("WebAPIHandler." + donConfig.DonId), + httpClient: httpClient, + nodeRateLimiter: nodeRateLimiter, }, nil } @@ -43,9 +55,9 @@ func (h *handler) HandleUserMessage(ctx context.Context, msg *api.Message, callb return nil } -// sendHttpMessageToClient is an outgoing message from the gateway to external endpoints +// sendHTTPMessageToClient is an outgoing message from the gateway to external endpoints // returns message to be sent back to the capability node -func (h *handler) sendHttpMessageToClient(ctx context.Context, req network.HttpRequest, msg *api.Message) (*api.Message, error) { +func (h *handler) sendHTTPMessageToClient(ctx context.Context, req network.HTTPRequest, msg *api.Message) (*api.Message, error) { var payload TargetResponsePayload resp, err := h.httpClient.Send(ctx, req) if err != nil { @@ -73,14 +85,17 @@ func (h *handler) sendHttpMessageToClient(ctx context.Context, req network.HttpR } func (h *handler) handleWebAPITargetMessage(ctx context.Context, msg *api.Message, nodeAddr string) error { - h.lggr.Debugw("handling web api target message", "messageId", msg.Body.MessageId) + h.lggr.Debugw("handling web api target message", "messageId", msg.Body.MessageId, "nodeAddr", nodeAddr) + if !h.nodeRateLimiter.Allow(nodeAddr) { + return fmt.Errorf("rate limit exceeded for node %s", nodeAddr) + } var targetPayload TargetRequestPayload err := json.Unmarshal(msg.Body.Payload, &targetPayload) if err != nil { return err } // send message to target - req := network.HttpRequest{ + req := network.HTTPRequest{ Method: targetPayload.Method, URL: targetPayload.URL, Headers: targetPayload.Headers, @@ -92,7 +107,7 @@ func (h *handler) handleWebAPITargetMessage(ctx context.Context, msg *api.Messag // if there is a non-HTTP error (e.g. malformed request), send payload with success set to false and error messages go func() { l := h.lggr.With("url", targetPayload.URL, "messageId", msg.Body.MessageId, "method", targetPayload.Method) - respMsg, err := h.sendHttpMessageToClient(ctx, req, msg) + respMsg, err := h.sendHTTPMessageToClient(ctx, req, msg) if err != nil { l.Errorw("error while sending HTTP request to external endpoint", "err", err) payload := TargetResponsePayload{ @@ -124,7 +139,6 @@ func (h *handler) handleWebAPITargetMessage(ctx context.Context, msg *api.Messag } func (h *handler) HandleNodeMessage(ctx context.Context, msg *api.Message, nodeAddr string) error { - // TODO: rate limiting switch msg.Body.Method { case MethodWebAPITarget: return h.handleWebAPITargetMessage(ctx, msg, nodeAddr) @@ -134,7 +148,6 @@ func (h *handler) HandleNodeMessage(ctx context.Context, msg *api.Message, nodeA } func (h *handler) Start(context.Context) error { - // TODO: do anything here? return nil } diff --git a/core/services/gateway/network/httpclient.go b/core/services/gateway/network/httpclient.go index 80063c2b7d2..4aecaaed3cd 100644 --- a/core/services/gateway/network/httpclient.go +++ b/core/services/gateway/network/httpclient.go @@ -11,25 +11,24 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" ) -// HttpClient interfaces defines a method to send HTTP requests -// TODO: handle retries -type HttpClient interface { - Send(ctx context.Context, req HttpRequest) (*HttpResponse, error) +// HTTPClient interfaces defines a method to send HTTP requests +type HTTPClient interface { + Send(ctx context.Context, req HTTPRequest) (*HTTPResponse, error) } -type HttpClientConfig interface { - MaxResponseBytes() int64 - DefaultTimeout() time.Duration +type HTTPClientConfig struct { + MaxResponseBytes uint32 + DefaultTimeout time.Duration } -type HttpRequest struct { +type HTTPRequest struct { Method string URL string Headers map[string]string Body []byte Timeout time.Duration } -type HttpResponse struct { +type HTTPResponse struct { StatusCode int // HTTP status code Headers map[string]string // HTTP headers Body []byte // HTTP response body @@ -37,23 +36,24 @@ type HttpResponse struct { type httpClient struct { client *http.Client - config HttpClientConfig + config HTTPClientConfig lggr logger.Logger } -// NewHttpClient creates a new HttpClient +// NewHTTPClient creates a new NewHTTPClient // As of now, the client does not support TLS configuration but may be extended in the future -func NewHTTPClient(config HttpClientConfig, lggr logger.Logger) (HttpClient, error) { +func NewHTTPClient(config HTTPClientConfig, lggr logger.Logger) (HTTPClient, error) { return &httpClient{ + config: config, client: &http.Client{ - Timeout: config.DefaultTimeout(), + Timeout: config.DefaultTimeout, Transport: http.DefaultTransport, }, lggr: lggr, }, nil } -func (c *httpClient) Send(ctx context.Context, req HttpRequest) (*HttpResponse, error) { +func (c *httpClient) Send(ctx context.Context, req HTTPRequest) (*HTTPResponse, error) { timeoutCtx, cancel := context.WithTimeout(ctx, req.Timeout) defer cancel() r, err := http.NewRequestWithContext(timeoutCtx, req.Method, req.URL, bytes.NewBuffer(req.Body)) @@ -67,7 +67,7 @@ func (c *httpClient) Send(ctx context.Context, req HttpRequest) (*HttpResponse, } defer resp.Body.Close() - reader := http.MaxBytesReader(nil, resp.Body, c.config.MaxResponseBytes()) + reader := http.MaxBytesReader(nil, resp.Body, int64(c.config.MaxResponseBytes)) body, err := io.ReadAll(reader) if err != nil { return nil, err @@ -80,7 +80,7 @@ func (c *httpClient) Send(ctx context.Context, req HttpRequest) (*HttpResponse, } c.lggr.Debugw("received HTTP response", "statusCode", resp.StatusCode, "body", string(body), "url", req.URL, "headers", headers) - return &HttpResponse{ + return &HTTPResponse{ Headers: headers, StatusCode: resp.StatusCode, Body: body, From cef8acfefbaa82ebddc870b8dbf3f053e8d4e2ca Mon Sep 17 00:00:00 2001 From: jinhoonbang Date: Mon, 23 Sep 2024 17:18:10 -0700 Subject: [PATCH 11/16] add http client mock and unit tests --- .mockery.yaml | 1 + .../capabilities/webapi/target/target_test.go | 8 +- .../handlers/webapicapabilities/handler.go | 18 +- .../webapicapabilities/handler_test.go | 161 ++++++++++++++++++ .../handlers/webapicapabilities/webapi.go | 10 +- .../gateway/network/httpclient_test.go | 143 ++++++++++++++++ .../gateway/network/mocks/http_client.go | 96 +++++++++++ 7 files changed, 419 insertions(+), 18 deletions(-) create mode 100644 core/services/gateway/handlers/webapicapabilities/handler_test.go create mode 100644 core/services/gateway/network/httpclient_test.go create mode 100644 core/services/gateway/network/mocks/http_client.go diff --git a/.mockery.yaml b/.mockery.yaml index b22875e3f9f..709134b05bd 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -212,6 +212,7 @@ packages: HttpServer: HTTPRequestHandler: WebSocketServer: + HTTPClient: github.com/smartcontractkit/chainlink/v2/core/services/job: interfaces: ServiceCtx: diff --git a/core/capabilities/webapi/target/target_test.go b/core/capabilities/webapi/target/target_test.go index 481bd2e6ec4..a608a9f0070 100644 --- a/core/capabilities/webapi/target/target_test.go +++ b/core/capabilities/webapi/target/target_test.go @@ -107,10 +107,10 @@ func gatewayResponse(t *testing.T, msgID string) *api.Message { headers := map[string]string{"Content-Type": "application/json"} body := []byte("response body") responsePayload, err := json.Marshal(webapicapabilities.TargetResponsePayload{ - StatusCode: 200, - Headers: headers, - Body: body, - Success: true, + StatusCode: 200, + Headers: headers, + Body: body, + ExecutionError: false, }) require.NoError(t, err) return &api.Message{ diff --git a/core/services/gateway/handlers/webapicapabilities/handler.go b/core/services/gateway/handlers/webapicapabilities/handler.go index 5bd80f9db52..413cbd6cb16 100644 --- a/core/services/gateway/handlers/webapicapabilities/handler.go +++ b/core/services/gateway/handlers/webapicapabilities/handler.go @@ -64,10 +64,10 @@ func (h *handler) sendHTTPMessageToClient(ctx context.Context, req network.HTTPR return nil, err } else { payload = TargetResponsePayload{ - Success: true, - StatusCode: uint8(resp.StatusCode), - Headers: resp.Headers, - Body: resp.Body, + ExecutionError: false, + StatusCode: uint16(resp.StatusCode), + Headers: resp.Headers, + Body: resp.Body, } } payloadBytes, err := json.Marshal(payload) @@ -111,13 +111,13 @@ func (h *handler) handleWebAPITargetMessage(ctx context.Context, msg *api.Messag if err != nil { l.Errorw("error while sending HTTP request to external endpoint", "err", err) payload := TargetResponsePayload{ - Success: false, - ErrorMessage: err.Error(), + ExecutionError: true, + ErrorMessage: err.Error(), } - payloadBytes, err := json.Marshal(payload) - if err != nil { + payloadBytes, err2 := json.Marshal(payload) + if err2 != nil { // should not happen - l.Errorw("error while marshalling payload", "err", err) + l.Errorw("error while marshalling payload", "err", err2) return } respMsg = &api.Message{ diff --git a/core/services/gateway/handlers/webapicapabilities/handler_test.go b/core/services/gateway/handlers/webapicapabilities/handler_test.go new file mode 100644 index 00000000000..b66facc1751 --- /dev/null +++ b/core/services/gateway/handlers/webapicapabilities/handler_test.go @@ -0,0 +1,161 @@ +package webapicapabilities + +import ( + "encoding/json" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/api" + gwcommon "github.com/smartcontractkit/chainlink/v2/core/services/gateway/common" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/config" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common" + handlermocks "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/mocks" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/network" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/network/mocks" +) + +func setupHandler(t *testing.T) (*handler, *mocks.HTTPClient, *handlermocks.DON, []gwcommon.TestNode) { + lggr := logger.TestLogger(t) + httpClient := mocks.NewHTTPClient(t) + don := handlermocks.NewDON(t) + nodeRateLimiterConfig := common.RateLimiterConfig{ + GlobalRPS: 100.0, + GlobalBurst: 100, + PerSenderRPS: 100.0, + PerSenderBurst: 100, + } + handlerConfig := HandlerConfig{ + NodeRateLimiter: nodeRateLimiterConfig, + } + cfgBytes, err := json.Marshal(handlerConfig) + require.NoError(t, err) + donConfig := &config.DONConfig{ + Members: []config.NodeConfig{}, + F: 1, + } + nodes := gwcommon.NewTestNodes(t, 2) + for id, n := range nodes { + donConfig.Members = append(donConfig.Members, config.NodeConfig{ + Name: fmt.Sprintf("node_%d", id), + Address: n.Address, + }) + } + + handler, err := NewHandler(json.RawMessage(cfgBytes), donConfig, don, httpClient, lggr) + require.NoError(t, err) + return handler, httpClient, don, nodes +} + +func TestHandler_SendHTTPMessageToClient(t *testing.T) { + handler, httpClient, don, nodes := setupHandler(t) + ctx := testutils.Context(t) + nodeAddr := nodes[0].Address + payload := TargetRequestPayload{ + Method: "GET", + URL: "http://example.com", + Headers: map[string]string{}, + Body: nil, + TimeoutMs: 2000, + } + payloadBytes, err := json.Marshal(payload) + require.NoError(t, err) + msg := &api.Message{ + Body: api.MessageBody{ + MessageId: "123", + Method: MethodWebAPITarget, + DonId: "testDonId", + Payload: json.RawMessage(payloadBytes), + }, + } + + t.Run("happy case", func(t *testing.T) { + httpClient.EXPECT().Send(ctx, mock.Anything).Return(&network.HTTPResponse{ + StatusCode: 200, + Headers: map[string]string{}, + Body: []byte("response body"), + }, nil).Once() + + don.EXPECT().SendToNode(ctx, nodes[0].Address, mock.MatchedBy(func(m *api.Message) bool { + var payload TargetResponsePayload + err = json.Unmarshal(m.Body.Payload, &payload) + if err != nil { + return false + } + return "123" == m.Body.MessageId && + MethodWebAPITarget == m.Body.Method && + "testDonId" == m.Body.DonId && + 200 == payload.StatusCode && + 0 == len(payload.Headers) && + string(payload.Body) == "response body" && + !payload.ExecutionError + })).Return(nil).Once() + + err = handler.HandleNodeMessage(ctx, msg, nodeAddr) + require.NoError(t, err) + + require.Eventually(t, func() bool { + return httpClient.AssertExpectations(t) && don.AssertExpectations(t) + }, tests.WaitTimeout(t), 100*time.Millisecond) + }) + + t.Run("http client non-HTTP error", func(t *testing.T) { + httpClient.EXPECT().Send(ctx, mock.Anything).Return(&network.HTTPResponse{ + StatusCode: 404, + Headers: map[string]string{}, + Body: []byte("access denied"), + }, nil).Once() + + don.EXPECT().SendToNode(ctx, nodes[0].Address, mock.MatchedBy(func(m *api.Message) bool { + var payload TargetResponsePayload + err = json.Unmarshal(m.Body.Payload, &payload) + if err != nil { + return false + } + return "123" == m.Body.MessageId && + MethodWebAPITarget == m.Body.Method && + "testDonId" == m.Body.DonId && + 404 == payload.StatusCode && + string(payload.Body) == "access denied" && + 0 == len(payload.Headers) && + !payload.ExecutionError + })).Return(nil).Once() + + err = handler.HandleNodeMessage(ctx, msg, nodeAddr) + require.NoError(t, err) + + require.Eventually(t, func() bool { + return httpClient.AssertExpectations(t) && don.AssertExpectations(t) + }, tests.WaitTimeout(t), 100*time.Millisecond) + }) + + t.Run("http client non-HTTP error", func(t *testing.T) { + httpClient.EXPECT().Send(ctx, mock.Anything).Return(nil, fmt.Errorf("error while marshalling")).Once() + + don.EXPECT().SendToNode(ctx, nodes[0].Address, mock.MatchedBy(func(m *api.Message) bool { + var payload TargetResponsePayload + err = json.Unmarshal(m.Body.Payload, &payload) + if err != nil { + return false + } + return "123" == m.Body.MessageId && + MethodWebAPITarget == m.Body.Method && + "testDonId" == m.Body.DonId && + payload.ExecutionError && + "error while marshalling" == payload.ErrorMessage + })).Return(nil).Once() + + err = handler.HandleNodeMessage(ctx, msg, nodeAddr) + require.NoError(t, err) + + require.Eventually(t, func() bool { + return httpClient.AssertExpectations(t) && don.AssertExpectations(t) + }, tests.WaitTimeout(t), 100*time.Millisecond) + }) +} diff --git a/core/services/gateway/handlers/webapicapabilities/webapi.go b/core/services/gateway/handlers/webapicapabilities/webapi.go index e300b61d85b..0bb5e299962 100644 --- a/core/services/gateway/handlers/webapicapabilities/webapi.go +++ b/core/services/gateway/handlers/webapicapabilities/webapi.go @@ -9,9 +9,9 @@ type TargetRequestPayload struct { } type TargetResponsePayload struct { - Success bool `json:"success"` // true if HTTP request was successful - ErrorMessage string `json:"error_message,omitempty"` // error message in case of failure - StatusCode uint8 `json:"statusCode"` // HTTP status code - Headers map[string]string `json:"headers,omitempty"` // HTTP headers - Body []byte `json:"body,omitempty"` // HTTP response body + ExecutionError bool `json:"executionError"` // true if there were non-HTTP errors. false if HTTP request was sent regardless of status (2xx, 4xx, 5xx) + ErrorMessage string `json:"errorMessage,omitempty"` // error message in case of failure + StatusCode uint16 `json:"statusCode,omitempty"` // HTTP status code + Headers map[string]string `json:"headers,omitempty"` // HTTP headers + Body []byte `json:"body,omitempty"` // HTTP response body } diff --git a/core/services/gateway/network/httpclient_test.go b/core/services/gateway/network/httpclient_test.go new file mode 100644 index 00000000000..cb76755ea1c --- /dev/null +++ b/core/services/gateway/network/httpclient_test.go @@ -0,0 +1,143 @@ +package network_test + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/network" +) + +func TestHTTPClient_Send(t *testing.T) { + t.Parallel() + + // Setup the test environment + lggr := logger.Test(t) + config := network.HTTPClientConfig{ + MaxResponseBytes: 1024, + DefaultTimeout: 5 * time.Second, + } + client, err := network.NewHTTPClient(config, lggr) + require.NoError(t, err) + + // Define test cases + tests := []struct { + name string + setupServer func() *httptest.Server + request network.HTTPRequest + expectedError error + expectedResp *network.HTTPResponse + }{ + { + name: "successful request", + setupServer: func() *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte("success")) + })) + }, + request: network.HTTPRequest{ + Method: "GET", + URL: "/", + Headers: map[string]string{}, + Body: nil, + Timeout: 2 * time.Second, + }, + expectedError: nil, + expectedResp: &network.HTTPResponse{ + StatusCode: http.StatusOK, + Headers: map[string]string{"Content-Length": "7"}, + Body: []byte("success"), + }, + }, + { + name: "request timeout", + setupServer: func() *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(10 * time.Second) + w.WriteHeader(http.StatusOK) + w.Write([]byte("success")) + })) + }, + request: network.HTTPRequest{ + Method: "GET", + URL: "/", + Headers: map[string]string{}, + Body: nil, + Timeout: 1 * time.Second, + }, + expectedError: context.DeadlineExceeded, + expectedResp: nil, + }, + { + name: "server error", + setupServer: func() *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte("error")) + })) + }, + request: network.HTTPRequest{ + Method: "GET", + URL: "/", + Headers: map[string]string{}, + Body: nil, + Timeout: 2 * time.Second, + }, + expectedError: nil, + expectedResp: &network.HTTPResponse{ + StatusCode: http.StatusInternalServerError, + Headers: map[string]string{"Content-Length": "5"}, + Body: []byte("error"), + }, + }, + { + name: "response too long", + setupServer: func() *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write(make([]byte, 2048)) // Response body longer than MaxResponseBytes + })) + }, + request: network.HTTPRequest{ + Method: "GET", + URL: "/", + Headers: map[string]string{}, + Body: nil, + Timeout: 2 * time.Second, + }, + expectedError: &http.MaxBytesError{}, + expectedResp: nil, + }, + } + + // Execute test cases + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + server := tt.setupServer() + defer server.Close() + + tt.request.URL = server.URL + tt.request.URL + + resp, err := client.Send(context.Background(), tt.request) + if tt.expectedError != nil { + require.Error(t, err) + require.ErrorContains(t, err, tt.expectedError.Error()) + } else { + require.NoError(t, err) + require.Equal(t, tt.expectedResp.StatusCode, resp.StatusCode) + for k, v := range tt.expectedResp.Headers { + value, ok := resp.Headers[k] + require.True(t, ok) + require.Equal(t, v, value) + } + require.Equal(t, tt.expectedResp.Body, resp.Body) + } + }) + } +} diff --git a/core/services/gateway/network/mocks/http_client.go b/core/services/gateway/network/mocks/http_client.go new file mode 100644 index 00000000000..8b5bff2cccf --- /dev/null +++ b/core/services/gateway/network/mocks/http_client.go @@ -0,0 +1,96 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. + +package mocks + +import ( + context "context" + + network "github.com/smartcontractkit/chainlink/v2/core/services/gateway/network" + mock "github.com/stretchr/testify/mock" +) + +// HTTPClient is an autogenerated mock type for the HTTPClient type +type HTTPClient struct { + mock.Mock +} + +type HTTPClient_Expecter struct { + mock *mock.Mock +} + +func (_m *HTTPClient) EXPECT() *HTTPClient_Expecter { + return &HTTPClient_Expecter{mock: &_m.Mock} +} + +// Send provides a mock function with given fields: ctx, req +func (_m *HTTPClient) Send(ctx context.Context, req network.HTTPRequest) (*network.HTTPResponse, error) { + ret := _m.Called(ctx, req) + + if len(ret) == 0 { + panic("no return value specified for Send") + } + + var r0 *network.HTTPResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, network.HTTPRequest) (*network.HTTPResponse, error)); ok { + return rf(ctx, req) + } + if rf, ok := ret.Get(0).(func(context.Context, network.HTTPRequest) *network.HTTPResponse); ok { + r0 = rf(ctx, req) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*network.HTTPResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, network.HTTPRequest) error); ok { + r1 = rf(ctx, req) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// HTTPClient_Send_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Send' +type HTTPClient_Send_Call struct { + *mock.Call +} + +// Send is a helper method to define mock.On call +// - ctx context.Context +// - req network.HTTPRequest +func (_e *HTTPClient_Expecter) Send(ctx interface{}, req interface{}) *HTTPClient_Send_Call { + return &HTTPClient_Send_Call{Call: _e.mock.On("Send", ctx, req)} +} + +func (_c *HTTPClient_Send_Call) Run(run func(ctx context.Context, req network.HTTPRequest)) *HTTPClient_Send_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(network.HTTPRequest)) + }) + return _c +} + +func (_c *HTTPClient_Send_Call) Return(_a0 *network.HTTPResponse, _a1 error) *HTTPClient_Send_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *HTTPClient_Send_Call) RunAndReturn(run func(context.Context, network.HTTPRequest) (*network.HTTPResponse, error)) *HTTPClient_Send_Call { + _c.Call.Return(run) + return _c +} + +// NewHTTPClient creates a new instance of HTTPClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewHTTPClient(t interface { + mock.TestingT + Cleanup(func()) +}) *HTTPClient { + mock := &HTTPClient{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} From feccfe5a13ef253319eefbd242d320efc8fae8d5 Mon Sep 17 00:00:00 2001 From: jinhoonbang Date: Mon, 30 Sep 2024 09:57:28 -0700 Subject: [PATCH 12/16] fix failing tests --- .changeset/empty-bees-fix.md | 5 +++++ core/capabilities/webapi/target/target_test.go | 4 ++-- core/scripts/gateway/run_gateway.go | 2 +- core/services/gateway/gateway_test.go | 12 ++++++------ .../integration_tests/gateway_integration_test.go | 9 ++++++++- 5 files changed, 22 insertions(+), 10 deletions(-) create mode 100644 .changeset/empty-bees-fix.md diff --git a/.changeset/empty-bees-fix.md b/.changeset/empty-bees-fix.md new file mode 100644 index 00000000000..59804fe7743 --- /dev/null +++ b/.changeset/empty-bees-fix.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +implement gateway handler that forwards outgoing request from http target capability. introduce gateway http client diff --git a/core/capabilities/webapi/target/target_test.go b/core/capabilities/webapi/target/target_test.go index faa901d4010..a4064c7e7fe 100644 --- a/core/capabilities/webapi/target/target_test.go +++ b/core/capabilities/webapi/target/target_test.go @@ -202,9 +202,9 @@ func TestCapability_Execute(t *testing.T) { statusCode, ok := values["statusCode"].(int64) require.True(t, ok) require.Equal(t, int64(200), statusCode) - respBody, ok := values["body"].(string) + respBody, ok := values["body"].([]byte) require.True(t, ok) - require.Equal(t, "response body", respBody) + require.Equal(t, "response body", string(respBody)) }) t.Run("context cancelled while waiting for gateway response", func(t *testing.T) { diff --git a/core/scripts/gateway/run_gateway.go b/core/scripts/gateway/run_gateway.go index 2daca5190a5..5dbcd02bf56 100644 --- a/core/scripts/gateway/run_gateway.go +++ b/core/scripts/gateway/run_gateway.go @@ -48,7 +48,7 @@ func main() { lggr, _ := logger.NewLogger() - handlerFactory := gateway.NewHandlerFactory(nil, nil, lggr) + handlerFactory := gateway.NewHandlerFactory(nil, nil, nil, lggr) gw, err := gateway.NewGatewayFromConfig(&cfg, handlerFactory, lggr) if err != nil { fmt.Println("error creating Gateway object:", err) diff --git a/core/services/gateway/gateway_test.go b/core/services/gateway/gateway_test.go index 3218c5428a2..7a5457c788c 100644 --- a/core/services/gateway/gateway_test.go +++ b/core/services/gateway/gateway_test.go @@ -57,7 +57,7 @@ Address = "0x0001020304050607080900010203040506070809" `) lggr := logger.TestLogger(t) - _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, lggr), lggr) + _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr) require.NoError(t, err) } @@ -75,7 +75,7 @@ HandlerName = "dummy" `) lggr := logger.TestLogger(t) - _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, lggr), lggr) + _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr) require.Error(t, err) } @@ -89,7 +89,7 @@ HandlerName = "no_such_handler" `) lggr := logger.TestLogger(t) - _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, lggr), lggr) + _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr) require.Error(t, err) } @@ -103,7 +103,7 @@ SomeOtherField = "abcd" `) lggr := logger.TestLogger(t) - _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, lggr), lggr) + _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr) require.Error(t, err) } @@ -121,7 +121,7 @@ Address = "0xnot_an_address" `) lggr := logger.TestLogger(t) - _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, lggr), lggr) + _, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, tomlConfig), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr) require.Error(t, err) } @@ -129,7 +129,7 @@ func TestGateway_CleanStartAndClose(t *testing.T) { t.Parallel() lggr := logger.TestLogger(t) - gateway, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, buildConfig("")), gateway.NewHandlerFactory(nil, nil, lggr), lggr) + gateway, err := gateway.NewGatewayFromConfig(parseTOMLConfig(t, buildConfig("")), gateway.NewHandlerFactory(nil, nil, nil, lggr), lggr) require.NoError(t, err) servicetest.Run(t, gateway) } diff --git a/core/services/gateway/integration_tests/gateway_integration_test.go b/core/services/gateway/integration_tests/gateway_integration_test.go index 59418819b61..0ddf47bec04 100644 --- a/core/services/gateway/integration_tests/gateway_integration_test.go +++ b/core/services/gateway/integration_tests/gateway_integration_test.go @@ -10,6 +10,7 @@ import ( "strings" "sync/atomic" "testing" + "time" "github.com/jonboulle/clockwork" "github.com/onsi/gomega" @@ -24,6 +25,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/gateway/common" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/config" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/network" ) const gatewayConfigTemplate = ` @@ -143,7 +145,12 @@ func TestIntegration_Gateway_NoFullNodes_BasicConnectionAndMessage(t *testing.T) // Launch Gateway lggr := logger.TestLogger(t) gatewayConfig := fmt.Sprintf(gatewayConfigTemplate, nodeKeys.Address) - gateway, err := gateway.NewGatewayFromConfig(parseGatewayConfig(t, gatewayConfig), gateway.NewHandlerFactory(nil, nil, lggr), lggr) + c, err := network.NewHTTPClient(network.HTTPClientConfig{ + DefaultTimeout: 5 * time.Second, + MaxResponseBytes: 1000, + }, lggr) + require.NoError(t, err) + gateway, err := gateway.NewGatewayFromConfig(parseGatewayConfig(t, gatewayConfig), gateway.NewHandlerFactory(nil, nil, c, lggr), lggr) require.NoError(t, err) servicetest.Run(t, gateway) userPort, nodePort := gateway.GetUserPort(), gateway.GetNodePort() From f8154714ee867248d971f88d6d3d987ef2809c73 Mon Sep 17 00:00:00 2001 From: jinhoonbang Date: Mon, 30 Sep 2024 10:01:46 -0700 Subject: [PATCH 13/16] add tag to changeset --- .changeset/empty-bees-fix.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changeset/empty-bees-fix.md b/.changeset/empty-bees-fix.md index 59804fe7743..e76ee621253 100644 --- a/.changeset/empty-bees-fix.md +++ b/.changeset/empty-bees-fix.md @@ -2,4 +2,4 @@ "chainlink": minor --- -implement gateway handler that forwards outgoing request from http target capability. introduce gateway http client +#wip implement gateway handler that forwards outgoing request from http target capability. introduce gateway http client From 6936a1272f93acbbfb2cb04a56fa2c83f9761782 Mon Sep 17 00:00:00 2001 From: jinhoonbang Date: Mon, 30 Sep 2024 10:20:59 -0700 Subject: [PATCH 14/16] fix linting issue --- core/services/gateway/gateway.go | 1 - .../gateway/handlers/webapicapabilities/handler.go | 13 ++++++------- .../handlers/webapicapabilities/handler_test.go | 8 ++++---- .../gateway/handlers/webapicapabilities/webapi.go | 2 +- core/services/gateway/network/httpclient_test.go | 12 ++++++++---- 5 files changed, 19 insertions(+), 17 deletions(-) diff --git a/core/services/gateway/gateway.go b/core/services/gateway/gateway.go index 13f2e95b15c..93ecc474bec 100644 --- a/core/services/gateway/gateway.go +++ b/core/services/gateway/gateway.go @@ -47,7 +47,6 @@ type gateway struct { codec api.Codec httpServer gw_net.HttpServer - httpClient gw_net.HTTPClient handlers map[string]handlers.Handler connMgr ConnectionManager lggr logger.Logger diff --git a/core/services/gateway/handlers/webapicapabilities/handler.go b/core/services/gateway/handlers/webapicapabilities/handler.go index 4fffdcce9e3..69e448bfcfa 100644 --- a/core/services/gateway/handlers/webapicapabilities/handler.go +++ b/core/services/gateway/handlers/webapicapabilities/handler.go @@ -65,13 +65,12 @@ func (h *handler) sendHTTPMessageToClient(ctx context.Context, req network.HTTPR resp, err := h.httpClient.Send(ctx, req) if err != nil { return nil, err - } else { - payload = TargetResponsePayload{ - ExecutionError: false, - StatusCode: uint16(resp.StatusCode), - Headers: resp.Headers, - Body: resp.Body, - } + } + payload = TargetResponsePayload{ + ExecutionError: false, + StatusCode: resp.StatusCode, + Headers: resp.Headers, + Body: resp.Body, } payloadBytes, err := json.Marshal(payload) if err != nil { diff --git a/core/services/gateway/handlers/webapicapabilities/handler_test.go b/core/services/gateway/handlers/webapicapabilities/handler_test.go index 17d6afe2df5..769fd400e2d 100644 --- a/core/services/gateway/handlers/webapicapabilities/handler_test.go +++ b/core/services/gateway/handlers/webapicapabilities/handler_test.go @@ -102,8 +102,8 @@ func TestHandler_SendHTTPMessageToClient(t *testing.T) { require.Eventually(t, func() bool { // ensure all goroutines close - err := handler.Close() - require.NoError(t, err) + err2 := handler.Close() + require.NoError(t, err2) return httpClient.AssertExpectations(t) && don.AssertExpectations(t) }, tests.WaitTimeout(t), 100*time.Millisecond) }) @@ -135,8 +135,8 @@ func TestHandler_SendHTTPMessageToClient(t *testing.T) { require.Eventually(t, func() bool { // ensure all goroutines close - err := handler.Close() - require.NoError(t, err) + err2 := handler.Close() + require.NoError(t, err2) return httpClient.AssertExpectations(t) && don.AssertExpectations(t) }, tests.WaitTimeout(t), 100*time.Millisecond) }) diff --git a/core/services/gateway/handlers/webapicapabilities/webapi.go b/core/services/gateway/handlers/webapicapabilities/webapi.go index 0bb5e299962..3e80d924e54 100644 --- a/core/services/gateway/handlers/webapicapabilities/webapi.go +++ b/core/services/gateway/handlers/webapicapabilities/webapi.go @@ -11,7 +11,7 @@ type TargetRequestPayload struct { type TargetResponsePayload struct { ExecutionError bool `json:"executionError"` // true if there were non-HTTP errors. false if HTTP request was sent regardless of status (2xx, 4xx, 5xx) ErrorMessage string `json:"errorMessage,omitempty"` // error message in case of failure - StatusCode uint16 `json:"statusCode,omitempty"` // HTTP status code + StatusCode int `json:"statusCode,omitempty"` // HTTP status code Headers map[string]string `json:"headers,omitempty"` // HTTP headers Body []byte `json:"body,omitempty"` // HTTP response body } diff --git a/core/services/gateway/network/httpclient_test.go b/core/services/gateway/network/httpclient_test.go index cb76755ea1c..2f4cc448ef5 100644 --- a/core/services/gateway/network/httpclient_test.go +++ b/core/services/gateway/network/httpclient_test.go @@ -38,7 +38,8 @@ func TestHTTPClient_Send(t *testing.T) { setupServer: func() *httptest.Server { return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) - w.Write([]byte("success")) + _, err2 := w.Write([]byte("success")) + require.NoError(t, err2) })) }, request: network.HTTPRequest{ @@ -61,7 +62,8 @@ func TestHTTPClient_Send(t *testing.T) { return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { time.Sleep(10 * time.Second) w.WriteHeader(http.StatusOK) - w.Write([]byte("success")) + _, err2 := w.Write([]byte("success")) + require.NoError(t, err2) })) }, request: network.HTTPRequest{ @@ -79,7 +81,8 @@ func TestHTTPClient_Send(t *testing.T) { setupServer: func() *httptest.Server { return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusInternalServerError) - w.Write([]byte("error")) + _, err2 := w.Write([]byte("error")) + require.NoError(t, err2) })) }, request: network.HTTPRequest{ @@ -101,7 +104,8 @@ func TestHTTPClient_Send(t *testing.T) { setupServer: func() *httptest.Server { return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) - w.Write(make([]byte, 2048)) // Response body longer than MaxResponseBytes + _, err2 := w.Write(make([]byte, 2048)) + require.NoError(t, err2) })) }, request: network.HTTPRequest{ From 27bfa02675f4005d5998bc6300530b72e382131e Mon Sep 17 00:00:00 2001 From: jinhoonbang Date: Mon, 30 Sep 2024 11:06:19 -0700 Subject: [PATCH 15/16] fix failing race --- .../webapicapabilities/handler_test.go | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/core/services/gateway/handlers/webapicapabilities/handler_test.go b/core/services/gateway/handlers/webapicapabilities/handler_test.go index 769fd400e2d..e8202b36cb8 100644 --- a/core/services/gateway/handlers/webapicapabilities/handler_test.go +++ b/core/services/gateway/handlers/webapicapabilities/handler_test.go @@ -84,8 +84,8 @@ func TestHandler_SendHTTPMessageToClient(t *testing.T) { don.EXPECT().SendToNode(mock.Anything, nodes[0].Address, mock.MatchedBy(func(m *api.Message) bool { var payload TargetResponsePayload - err = json.Unmarshal(m.Body.Payload, &payload) - if err != nil { + err2 := json.Unmarshal(m.Body.Payload, &payload) + if err2 != nil { return false } return "123" == m.Body.MessageId && @@ -117,8 +117,8 @@ func TestHandler_SendHTTPMessageToClient(t *testing.T) { don.EXPECT().SendToNode(mock.Anything, nodes[0].Address, mock.MatchedBy(func(m *api.Message) bool { var payload TargetResponsePayload - err = json.Unmarshal(m.Body.Payload, &payload) - if err != nil { + err2 := json.Unmarshal(m.Body.Payload, &payload) + if err2 != nil { return false } return "123" == m.Body.MessageId && @@ -134,7 +134,7 @@ func TestHandler_SendHTTPMessageToClient(t *testing.T) { require.NoError(t, err) require.Eventually(t, func() bool { - // ensure all goroutines close + // // ensure all goroutines close err2 := handler.Close() require.NoError(t, err2) return httpClient.AssertExpectations(t) && don.AssertExpectations(t) @@ -146,8 +146,8 @@ func TestHandler_SendHTTPMessageToClient(t *testing.T) { don.EXPECT().SendToNode(mock.Anything, nodes[0].Address, mock.MatchedBy(func(m *api.Message) bool { var payload TargetResponsePayload - err = json.Unmarshal(m.Body.Payload, &payload) - if err != nil { + err2 := json.Unmarshal(m.Body.Payload, &payload) + if err2 != nil { return false } return "123" == m.Body.MessageId && @@ -161,9 +161,9 @@ func TestHandler_SendHTTPMessageToClient(t *testing.T) { require.NoError(t, err) require.Eventually(t, func() bool { - // ensure all goroutines close - err := handler.Close() - require.NoError(t, err) + // // ensure all goroutines close + err2 := handler.Close() + require.NoError(t, err2) return httpClient.AssertExpectations(t) && don.AssertExpectations(t) }, tests.WaitTimeout(t), 100*time.Millisecond) }) From 19b4d18a00d991883225f64b415b2fad8b66a841 Mon Sep 17 00:00:00 2001 From: jinhoonbang Date: Mon, 30 Sep 2024 17:02:46 -0700 Subject: [PATCH 16/16] fix linting --- .../services/gateway/handlers/webapicapabilities/handler_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/core/services/gateway/handlers/webapicapabilities/handler_test.go b/core/services/gateway/handlers/webapicapabilities/handler_test.go index 599ac1291e2..e631111ff1d 100644 --- a/core/services/gateway/handlers/webapicapabilities/handler_test.go +++ b/core/services/gateway/handlers/webapicapabilities/handler_test.go @@ -68,6 +68,7 @@ func setupHandler(t *testing.T) (*handler, *mocks.HTTPClient, *handlermocks.DON, }) } handler, err := NewHandler(json.RawMessage(cfgBytes), donConfig, don, httpClient, lggr) + require.NoError(t, err) return handler, httpClient, don, nodes }