Skip to content

Commit

Permalink
refactor timeout function
Browse files Browse the repository at this point in the history
  • Loading branch information
lkingland committed Jun 12, 2023
1 parent 57f4e76 commit 04d2e68
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 27 deletions.
4 changes: 2 additions & 2 deletions pkg/functions/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ func TestClient_RunTimeout(t *testing.T) {
client := fn.New(
fn.WithBuilder(oci.NewBuilder("", true)),
fn.WithVerbose(true),
fn.WithStartTimeout(1000*time.Millisecond))
fn.WithStartTimeout(2*time.Second))

// Initialize
f, err := client.Init(fn.Function{Root: root, Runtime: "go", Registry: TestRegistry})
Expand Down Expand Up @@ -784,7 +784,7 @@ func TestClient_RunTimeout(t *testing.T) {
}

// Run
// with a fairly short timeout
// with a fairly short timeout so as not to hold up tests.
_, err = client.Run(ctx, f, fn.RunWithStartTimeout(1*time.Second))
if !errors.As(err, &fn.ErrRunTimeout{}) {
t.Fatalf("did not receive ErrRunTimeout. Got %v", err)
Expand Down
1 change: 1 addition & 0 deletions pkg/functions/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var (
ErrTemplateMissingRepository = errors.New("template name missing repository prefix")
ErrTemplateNotFound = errors.New("template not found")
ErrTemplatesNotFound = errors.New("templates path (runtimes) not found")
ErrContextCanceled = errors.New("the operation was canceled")

// TODO: change the wording of this error to not be CLI-specific;
// eg "registry required". Then catch the error in the CLI and add the
Expand Down
73 changes: 48 additions & 25 deletions pkg/functions/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package functions

import (
"context"
"errors"
"fmt"
"io"
"net"
Expand Down Expand Up @@ -147,47 +148,69 @@ func runGo(ctx context.Context, job *Job) (err error) {
return
}

func waitFor(ctx context.Context, job *Job, timeout time.Duration) (err error) {
func waitFor(ctx context.Context, job *Job, timeout time.Duration) error {
var (
uri = fmt.Sprintf("http://%s:%s/%s", job.Host, job.Port, readinessEndpoint)
uri = fmt.Sprintf("http://%s:%s%s", job.Host, job.Port, readinessEndpoint)
interval = 500 * time.Millisecond
deadline = time.Now().Add(timeout)
)
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

if job.verbose {
fmt.Printf("Waiting for %v\n", uri)
}

ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

for {
req, err := http.NewRequestWithContext(ctx, "GET", uri, nil)
if err != nil {
return fmt.Errorf("error creating readiness check context. %w", err)
ok, err := isReady(ctx, uri, timeout, job.verbose)
if ok || err != nil {
return err
}

res, err := http.DefaultClient.Do(req)
select {
case <-ctx.Done():
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
return ErrRunTimeout{timeout}
}
return ErrContextCanceled
case <-time.After(interval):
continue
}
}
}

// isReady returns true if the uri could be reached and returned an HTTP 200.
// False is returned if a nonfatal error was encountered (which will have been
// printed to stderr), and an error is returned when an error is encountered
// that is unlikely to be due to startup (malformed requests).
func isReady(ctx context.Context, uri string, timeout time.Duration, verbose bool) (ok bool, err error) {
req, err := http.NewRequestWithContext(ctx, "GET", uri, nil)
if err != nil {
return false, fmt.Errorf("error creating readiness check context. %w", err)
}

res, err := http.DefaultClient.Do(req)
if err != nil {
if err, ok := err.(net.Error); ok && err.Timeout() {
return ErrRunTimeout{timeout}
} else if time.Now().After(deadline) {
return ErrRunTimeout{timeout}
} else if res != nil && res.StatusCode == 200 {
return nil // success
return false, ErrRunTimeout{timeout}
}
defer res.Body.Close()

if job.verbose {
if err != nil {
fmt.Printf("endpoint not available. %v", err)
} else {
fmt.Printf("endpoint returned HTTP %v.\n", res.StatusCode)
dump, _ := httputil.DumpResponse(res, true)
fmt.Println(dump)
}
if verbose {
fmt.Fprintf(os.Stderr, "endpoint not available. %v\n", err)
}
return false, nil // nonfatal. May still be starting up.
}
defer res.Body.Close()

time.Sleep(interval)
if res.StatusCode != 200 {
if verbose {
fmt.Fprintf(os.Stderr, "endpoint returned HTTP %v:\n", res.StatusCode)
dump, _ := httputil.DumpResponse(res, true)
fmt.Println(string(dump))
}
return false, nil // nonfatal. May still be starting up
}

return true, nil
}

// choosePort returns an unused port on the given interface (host)
Expand Down

0 comments on commit 04d2e68

Please sign in to comment.