draft for HTTP Target PR #14448

Draft
wants to merge 1 commit into
base: develop

Conversation

Contributor

@jinhoonbang jinhoonbang commented Sep 16, 2024

  • implements the HTTP Target capability, which sends HTTP requests in a round-robin manner with retries and timeouts
  • implements a gateway handler that receives a request from a node and forwards the outgoing request to users
  • implements a node handler that receives the response back from the gateway handler
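
As an illustration of the first bullet, here is a minimal sketch of round-robin selection over a fixed list of endpoints. The roundRobin type, its method names, and the endpoint strings are hypothetical; the PR's actual selection, retry, and timeout logic is not shown in this thread.

```go
package main

import (
    "fmt"
    "sync/atomic"
)

// roundRobin hands out targets in rotation.
// Hypothetical type, not the PR's implementation.
type roundRobin struct {
    targets []string
    next    atomic.Uint64
}

func newRoundRobin(targets []string) *roundRobin {
    return &roundRobin{targets: targets}
}

// pick returns the next target in rotation and is safe for concurrent use.
// It reports false when no targets are configured.
func (r *roundRobin) pick() (string, bool) {
    if len(r.targets) == 0 {
        return "", false
    }
    n := r.next.Add(1) - 1
    return r.targets[n%uint64(len(r.targets))], true
}

func main() {
    rr := newRoundRobin([]string{"endpoint-a", "endpoint-b", "endpoint-c"})
    for i := 0; i < 4; i++ {
        target, _ := rr.pick()
        fmt.Println(target)
    }
}
```

The atomic counter keeps selection lock-free; a mutex-guarded index would work equally well if the target list needs to change at runtime.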

TODO:

  • unit tests
  • rate limiting and allowlist checks

…ds HTTP request in a round robin manner with retries and timeouts. Implements handlers for gateways and the capability.
Contributor

I see you updated files related to core. Please run pnpm changeset in the root directory to add a changeset, and include at least one of the following tags in its text:

  • #added For any new functionality added.
  • #breaking_change For any functionality that requires manual action for the node to boot.
  • #bugfix For bug fixes.
  • #changed For any change to the existing functionality.
  • #db_update For any feature that introduces updates to database schema.
  • #deprecation_notice For any upcoming deprecation functionality.
  • #internal For changesets that need to be excluded from the final changelog.
  • #nops For any feature that is NOP facing and needs to be in the official Release Notes for the release.
  • #removed For any functionality/config that is removed.
  • #updated For any functionality that is updated.
  • #wip For any change that is not ready yet and external communication about it should be held off till it is feature complete.

🎖️ No JIRA issue number found in: PR title, commit message, or branch name. Please include the issue ID in one of these.

@jinhoonbang jinhoonbang changed the title from "draft for HTTP Trigger PR" to "draft for HTTP Target PR" on Sep 16, 2024
@DavidOrchard
Contributor

Likely a question from ignorance, but why do you have 3 separate components, Launcher, Handler, Capability, all with start/stop etc overhead?

)

const (
    FunctionsHandlerType HandlerType = "functions"
    DummyHandlerType     HandlerType = "dummy"
    WorkflowHandlerType  HandlerType = "workflow"
Contributor

I know I used "workflow" as an example in my skeleton PR but it's not a great name. The name proposed in the design doc is "web-capabilities" - let's use that please (same for the const name).

@@ -54,7 +56,12 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) (services
    if err2 != nil {
        return nil, errors.Wrap(err2, "unmarshal gateway config")
    }
    handlerFactory := NewHandlerFactory(d.legacyChains, d.ds, d.lggr)
    // TODO: pass in TLS config and default HTTP timeout
    httpClient, err := network.NewHttpClient(nil, time.Duration(10)*time.Millisecond, d.lggr)
Contributor

@bolekk bolekk Sep 16, 2024

I'm not convinced that the HTTP Client should be created in the delegate and passed via HandlerFactory. Is there a good reason to do so?

It feels like it should be a library that capabilities could include whenever needed. That way, any config for HTTP client can become part of a handler config (and also differ across different handlers, if needed). OTOH a possible issue I see is some Proxy config that could be global to the node, rather than to handler. But maybe it's not a big deal. WDYT?

Contributor Author

Some networking configs such as max response size and write timeout seem like global node settings. One other potential advantage is it can have its own state and allow additional logic in the future (e.g. keep track of endpoint health to determine wait time before retrying)

Contributor

OK, fair.

@@ -0,0 +1,105 @@
package network
Contributor

I'd put it in gateway/handlers/common and create inside handlers that need it.

type HttpResponse struct {
    StatusCode int               // HTTP status code
    Headers    map[string]string // HTTP headers
    Body       []byte            // Base64-encoded binary body
Contributor

@bolekk bolekk Sep 17, 2024

It's not really base64-encoded here yet, is it? It's a byte array, not a string.

Contributor Author

oops copy-pasta. fixed.
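
For context on this exchange: Go's encoding/json marshals a []byte field as a base64 string and decodes it back on unmarshal, so the field holds raw bytes in memory and is base64 only on the wire. A minimal sketch, where the response type and its JSON tags are assumptions rather than the PR's struct:

```go
package main

import (
    "encoding/json"
    "fmt"
)

// response is a hypothetical stand-in for the struct under review.
type response struct {
    StatusCode int               `json:"statusCode"`
    Headers    map[string]string `json:"headers,omitempty"`
    Body       []byte            `json:"body,omitempty"` // raw bytes; base64 only once serialized
}

// encode serializes a response to its JSON wire form.
func encode(r response) (string, error) {
    b, err := json.Marshal(r)
    return string(b), err
}

// decode parses the JSON wire form back into a response.
func decode(s string) (response, error) {
    var r response
    err := json.Unmarshal([]byte(s), &r)
    return r, err
}

func main() {
    wire, err := encode(response{StatusCode: 200, Body: []byte("hello")})
    if err != nil {
        panic(err)
    }
    fmt.Println(wire) // {"statusCode":200,"body":"aGVsbG8="}

    back, err := decode(wire)
    if err != nil {
        panic(err)
    }
    fmt.Println(string(back.Body)) // hello
}
```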


func NewHttpClient(tlsConfig *tls.Config, defaultTimeout time.Duration, lggr logger.Logger) (HttpClient, error) {
    transport := http.DefaultTransport
    if tlsConfig != nil {
Contributor

Likely won't be needed for an HTTPClient. This is useful for client-side certs, which we won't support. I think you can ignore this param for simplicity and just leave a comment saying it could be extended in the future.

        Timeout:    time.Duration(targetPayload.TimeoutMs) * time.Millisecond,
        RetryCount: targetPayload.RetryCount,
    }
    resp, err := d.httpClient.Send(ctx, req)
Contributor

Handlers have to be non-blocking so we can't do a Send() here directly. You should spawn a new goroutine (or have a pool of them and a queue). Handler should return success once the message is parsed and sent to HTTP client. If a real error appears later, it also needs to be sent back to the node.

    RetryCount uint8 `json:"retryCount,omitempty"` // Number of retries, defaults to 0.
}

type TargetResponsePayload struct {
Contributor

This should also have a "higher-level" error code, if a Gateway is not even able to connect or parse or process the request. Check out ResponseBase in the Functions handler.

    if err := validation.ValidateWorkflowOrExecutionID(req.Metadata.WorkflowExecutionID); err != nil {
        return "", fmt.Errorf("workflow execution ID is invalid: %w", err)
    }
    messageID := []string{
Contributor

great!

@@ -0,0 +1,172 @@
package target
Contributor

Does it make sense to put this in webapi instead and have the constructor be NewTarget? I think this will stutter less and also potentially lead to fewer conflicts: the unmodified name of the capability atm is target.NewCapability, which doesn't say much.

Contributor Author

@jinhoonbang jinhoonbang Sep 17, 2024

I'm open to doing that. I think that would make the code clearer overall. We should enforce the same convention in other capabilities though, to be consistent (i.e. avoid NewCapability and use New${CapabilityName}Capability).

    responseChs map[string]chan *api.Message, responseChsMu *sync.Mutex) (*Capability, error) {
    return &Capability{
        responseChs:   make(map[string]chan *api.Message),
        responseChsMu: responseChsMu,
Contributor

Note: I realise this is a draft but just fyi the registry is missing from here

func (c *Capability) Execute(ctx context.Context, req capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
    c.lggr.Debugw("executing http target", "capabilityRequest", req)

    var input WorkflowInput
Contributor

Calling this WorkflowInput is slightly misleading IMO: the capability defines the input, not the workflow

Contributor Author

makes sense. this can be webapi.TargetInput if we start enforcing the convention above.

    URL     string            `json:"url"`               // URL to query, only http and https protocols are supported.
    Method  string            `json:"method,omitempty"`  // HTTP verb, defaults to GET.
    Headers map[string]string `json:"headers,omitempty"` // HTTP headers, defaults to empty.
    Body    string            `json:"body,omitempty"`    // Base64-encoded binary body, defaults to empty.
Contributor

Note: can we just use bytes here? Why are we expecting the user to encode the body ahead of time; that seems to leak implementation details.

Contributor Author

I thought mapstructure used inside Map.UnwrapTo can't deserialize to []byte here. Will test it out.

        Headers:    targetPayload.Headers,
        Body:       targetPayload.Body,
        Timeout:    time.Duration(targetPayload.TimeoutMs) * time.Millisecond,
        RetryCount: targetPayload.RetryCount,
Contributor

I'm not sure how this is generated, but this should be different from the retry count from the gateway. And it doesn't make sense to retry on many 4xx codes like 401, 403, 404.

Contributor Author

This will be the customer-provided retry count. I agree that two levels of retries are needed:

  1. user-defined retries - these are defined in the workflows
  2. node-level retries - retries between gateway and node.

I decided to leave a TODO for the retry logic, but yes, fully agreed that we shouldn't retry on all errors.
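
A minimal sketch of what a user-level retry policy along these lines could look like. Which status codes count as retriable (408, 429, and 5xx here) is an assumption, not something this thread settles, and the function names are hypothetical. Backoff between attempts is left out for brevity.

```go
package main

import (
    "fmt"
    "net/http"
)

// shouldRetry reports whether a response status is worth retrying.
// The chosen set is an assumption: client errors such as 401, 403,
// and 404 are treated as final.
func shouldRetry(statusCode int) bool {
    switch statusCode {
    case http.StatusRequestTimeout, http.StatusTooManyRequests:
        return true
    }
    return statusCode >= 500 && statusCode <= 599
}

// doWithRetries calls attempt once, then up to retryCount more times while
// the outcome is retriable. It returns the last status, the number of
// attempts made, and the last error.
func doWithRetries(retryCount uint8, attempt func() (int, error)) (status int, attempts int, err error) {
    for attempts = 1; ; attempts++ {
        status, err = attempt()
        retriable := err != nil || shouldRetry(status)
        if !retriable || attempts > int(retryCount) {
            return status, attempts, err
        }
    }
}

func main() {
    // A 404 is final, so only one attempt is made despite retryCount = 3.
    status, attempts, _ := doWithRetries(3, func() (int, error) {
        return http.StatusNotFound, nil
    })
    fmt.Println(status, attempts)

    // A 503 followed by a 200 succeeds on the second attempt.
    calls := 0
    status, attempts, _ = doWithRetries(3, func() (int, error) {
        calls++
        if calls == 1 {
            return http.StatusServiceUnavailable, nil
        }
        return http.StatusOK, nil
    })
    fmt.Println(status, attempts)
}
```

Returning the attempt count alongside the status also gives the caller a way to tell an intermediate failure from the final one.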

    return nil
}

func (d *workflowHandler) HandleNodeMessage(ctx context.Context, msg *api.Message, nodeAddr string) error {
Contributor

Pardon my ignorance, but if HandleNodeMessage fails because handleWebAPITargetMessage fails, how do we tell the difference between a retriable message that is not the last attempt and the final retried message?

4 participants