Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

heimdall: add max retries to heimdall client #9098

Merged
merged 4 commits into from
Dec 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 87 additions & 71 deletions consensus/bor/heimdall/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ import (
"strings"
"time"

"github.com/ledgerwatch/erigon-lib/metrics"
"github.com/ledgerwatch/log/v3"

"github.com/ledgerwatch/erigon-lib/metrics"
"github.com/ledgerwatch/erigon/consensus/bor/clerk"
"github.com/ledgerwatch/erigon/consensus/bor/heimdall/checkpoint"
"github.com/ledgerwatch/erigon/consensus/bor/heimdall/milestone"
"github.com/ledgerwatch/erigon/consensus/bor/heimdall/span"
"github.com/ledgerwatch/log/v3"
)

var (
Expand All @@ -35,7 +35,8 @@ var (
const (
// stateFetchLimit is presumably the page size for state sync event queries;
// its use is not visible in this view — TODO confirm.
stateFetchLimit = 50
// apiHeimdallTimeout is the timeout set on the default http.Client and on the
// per-request context created in internalFetchWithTimeout.
apiHeimdallTimeout = 10 * time.Second
retryCall = 5 * time.Second
// retryBackOff is the default interval between fetch attempts.
retryBackOff = time.Second
// maxRetries is the default upper bound on fetch attempts per request.
maxRetries = 5
)

type StateSyncEventsResponse struct {
Expand All @@ -48,27 +49,42 @@ type SpanResponse struct {
Result span.HeimdallSpan `json:"result"`
}

type HeimdallClient struct {
urlString string
client http.Client
closeCh chan struct{}
logger log.Logger
// Client fetches data from a Heimdall node over HTTP, retrying failed
// requests a bounded number of times.
type Client struct {
// urlString is the base URL the endpoint URLs are built from.
urlString string
// client performs the HTTP requests; it is an interface so a mock can be injected.
client HttpClient
// retryBackOff is the ticker interval between attempts in FetchWithRetryEx.
retryBackOff time.Duration
// maxRetries bounds the number of fetch attempts made by FetchWithRetryEx.
maxRetries int
// closeCh is closed by Close; the retry loop selects on it to abort early.
closeCh chan struct{}
logger log.Logger
}

// Request bundles what a single fetch attempt needs: the HTTP client to use,
// the target URL, and the time at which the attempt was created.
type Request struct {
client http.Client
client HttpClient
// url is the fully built endpoint URL to fetch.
url *url.URL
// start is set to time.Now() when FetchWithRetryEx builds the request.
start time.Time
}

func NewHeimdallClient(urlString string, logger log.Logger) *HeimdallClient {
return &HeimdallClient{
urlString: urlString,
logger: logger,
client: http.Client{
Timeout: apiHeimdallTimeout,
},
closeCh: make(chan struct{}),
// HttpClient is the subset of *http.Client behaviour this package relies on.
// Depending on the interface rather than the concrete type lets a generated
// mock (see the go:generate directive below) stand in for the real client.
//
//go:generate mockgen -destination=./mock/http_client_mock.go -package=mock . HttpClient
type HttpClient interface {
// Do sends the request and returns the response, like (*http.Client).Do.
Do(req *http.Request) (*http.Response, error)
// CloseIdleConnections is invoked from Client.Close.
CloseIdleConnections()
}

// NewHeimdallClient builds a Client for the Heimdall node at urlString. The
// client is backed by an *http.Client whose timeout is apiHeimdallTimeout and
// uses the package-level retryBackOff and maxRetries defaults.
func NewHeimdallClient(urlString string, logger log.Logger) *Client {
	return newHeimdallClient(
		urlString,
		&http.Client{Timeout: apiHeimdallTimeout},
		retryBackOff,
		maxRetries,
		logger,
	)
}

// newHeimdallClient assembles a Client from explicitly supplied parts, so the
// HTTP client and the retry settings can be chosen by the caller. A fresh
// closeCh is created for every Client.
func newHeimdallClient(urlString string, httpClient HttpClient, retryBackOff time.Duration, maxRetries int, logger log.Logger) *Client {
	c := new(Client)
	c.urlString = urlString
	c.client = httpClient
	c.retryBackOff = retryBackOff
	c.maxRetries = maxRetries
	c.closeCh = make(chan struct{})
	c.logger = logger

	return c
}

Expand All @@ -90,20 +106,20 @@ const (
fetchSpanFormat = "bor/span/%d"
)

func (h *HeimdallClient) StateSyncEvents(ctx context.Context, fromID uint64, to int64) ([]*clerk.EventRecordWithTime, error) {
func (c *Client) StateSyncEvents(ctx context.Context, fromID uint64, to int64) ([]*clerk.EventRecordWithTime, error) {
eventRecords := make([]*clerk.EventRecordWithTime, 0)

for {
url, err := stateSyncURL(h.urlString, fromID, to)
url, err := stateSyncURL(c.urlString, fromID, to)
if err != nil {
return nil, err
}

h.logger.Debug("[bor.heimdall] Fetching state sync events", "queryParams", url.RawQuery)
c.logger.Debug("[bor.heimdall] Fetching state sync events", "queryParams", url.RawQuery)

ctx = withRequestType(ctx, stateSyncRequest)

response, err := FetchWithRetry[StateSyncEventsResponse](ctx, h.client, url, h.closeCh, h.logger)
response, err := FetchWithRetry[StateSyncEventsResponse](ctx, c, url)
if err != nil {
return nil, err
}
Expand All @@ -129,15 +145,15 @@ func (h *HeimdallClient) StateSyncEvents(ctx context.Context, fromID uint64, to
return eventRecords, nil
}

func (h *HeimdallClient) Span(ctx context.Context, spanID uint64) (*span.HeimdallSpan, error) {
url, err := spanURL(h.urlString, spanID)
func (c *Client) Span(ctx context.Context, spanID uint64) (*span.HeimdallSpan, error) {
url, err := spanURL(c.urlString, spanID)
if err != nil {
return nil, err
}

ctx = withRequestType(ctx, spanRequest)

response, err := FetchWithRetry[SpanResponse](ctx, h.client, url, h.closeCh, h.logger)
response, err := FetchWithRetry[SpanResponse](ctx, c, url)
if err != nil {
return nil, err
}
Expand All @@ -146,15 +162,15 @@ func (h *HeimdallClient) Span(ctx context.Context, spanID uint64) (*span.Heimdal
}

// FetchCheckpoint fetches the checkpoint from heimdall
func (h *HeimdallClient) FetchCheckpoint(ctx context.Context, number int64) (*checkpoint.Checkpoint, error) {
url, err := checkpointURL(h.urlString, number)
func (c *Client) FetchCheckpoint(ctx context.Context, number int64) (*checkpoint.Checkpoint, error) {
url, err := checkpointURL(c.urlString, number)
if err != nil {
return nil, err
}

ctx = withRequestType(ctx, checkpointRequest)

response, err := FetchWithRetry[checkpoint.CheckpointResponse](ctx, h.client, url, h.closeCh, h.logger)
response, err := FetchWithRetry[checkpoint.CheckpointResponse](ctx, c, url)
if err != nil {
return nil, err
}
Expand All @@ -168,8 +184,8 @@ func isInvalidMilestoneIndexError(err error) bool {
}

// FetchMilestone fetches a milestone from heimdall
func (h *HeimdallClient) FetchMilestone(ctx context.Context, number int64) (*milestone.Milestone, error) {
url, err := milestoneURL(h.urlString, number)
func (c *Client) FetchMilestone(ctx context.Context, number int64) (*milestone.Milestone, error) {
url, err := milestoneURL(c.urlString, number)
if err != nil {
return nil, err
}
Expand All @@ -180,7 +196,7 @@ func (h *HeimdallClient) FetchMilestone(ctx context.Context, number int64) (*mil
return !isInvalidMilestoneIndexError(err)
}

response, err := FetchWithRetryEx[milestone.MilestoneResponse](ctx, h.client, url, isRecoverableError, h.closeCh, h.logger)
response, err := FetchWithRetryEx[milestone.MilestoneResponse](ctx, c, url, isRecoverableError)
if err != nil {
if isInvalidMilestoneIndexError(err) {
return nil, fmt.Errorf("%w: number %d", ErrNotInMilestoneList, number)
Expand All @@ -192,15 +208,15 @@ func (h *HeimdallClient) FetchMilestone(ctx context.Context, number int64) (*mil
}

// FetchCheckpointCount fetches the checkpoint count from heimdall
func (h *HeimdallClient) FetchCheckpointCount(ctx context.Context) (int64, error) {
url, err := checkpointCountURL(h.urlString)
func (c *Client) FetchCheckpointCount(ctx context.Context) (int64, error) {
url, err := checkpointCountURL(c.urlString)
if err != nil {
return 0, err
}

ctx = withRequestType(ctx, checkpointCountRequest)

response, err := FetchWithRetry[checkpoint.CheckpointCountResponse](ctx, h.client, url, h.closeCh, h.logger)
response, err := FetchWithRetry[checkpoint.CheckpointCountResponse](ctx, c, url)
if err != nil {
return 0, err
}
Expand All @@ -209,15 +225,15 @@ func (h *HeimdallClient) FetchCheckpointCount(ctx context.Context) (int64, error
}

// FetchMilestoneCount fetches the milestone count from heimdall
func (h *HeimdallClient) FetchMilestoneCount(ctx context.Context) (int64, error) {
url, err := milestoneCountURL(h.urlString)
func (c *Client) FetchMilestoneCount(ctx context.Context) (int64, error) {
url, err := milestoneCountURL(c.urlString)
if err != nil {
return 0, err
}

ctx = withRequestType(ctx, milestoneCountRequest)

response, err := FetchWithRetry[milestone.MilestoneCountResponse](ctx, h.client, url, h.closeCh, h.logger)
response, err := FetchWithRetry[milestone.MilestoneCountResponse](ctx, c, url)
if err != nil {
return 0, err
}
Expand All @@ -226,15 +242,15 @@ func (h *HeimdallClient) FetchMilestoneCount(ctx context.Context) (int64, error)
}

// FetchLastNoAckMilestone fetches the last no-ack-milestone from heimdall
func (h *HeimdallClient) FetchLastNoAckMilestone(ctx context.Context) (string, error) {
url, err := lastNoAckMilestoneURL(h.urlString)
func (c *Client) FetchLastNoAckMilestone(ctx context.Context) (string, error) {
url, err := lastNoAckMilestoneURL(c.urlString)
if err != nil {
return "", err
}

ctx = withRequestType(ctx, milestoneLastNoAckRequest)

response, err := FetchWithRetry[milestone.MilestoneLastNoAckResponse](ctx, h.client, url, h.closeCh, h.logger)
response, err := FetchWithRetry[milestone.MilestoneLastNoAckResponse](ctx, c, url)
if err != nil {
return "", err
}
Expand All @@ -243,15 +259,15 @@ func (h *HeimdallClient) FetchLastNoAckMilestone(ctx context.Context) (string, e
}

// FetchNoAckMilestone fetches the last no-ack-milestone from heimdall
func (h *HeimdallClient) FetchNoAckMilestone(ctx context.Context, milestoneID string) error {
url, err := noAckMilestoneURL(h.urlString, milestoneID)
func (c *Client) FetchNoAckMilestone(ctx context.Context, milestoneID string) error {
url, err := noAckMilestoneURL(c.urlString, milestoneID)
if err != nil {
return err
}

ctx = withRequestType(ctx, milestoneNoAckRequest)

response, err := FetchWithRetry[milestone.MilestoneNoAckResponse](ctx, h.client, url, h.closeCh, h.logger)
response, err := FetchWithRetry[milestone.MilestoneNoAckResponse](ctx, c, url)
if err != nil {
return err
}
Expand All @@ -263,17 +279,17 @@ func (h *HeimdallClient) FetchNoAckMilestone(ctx context.Context, milestoneID st
return nil
}

// FetchMilestoneID fetches the bool result from Heimdal whether the ID corresponding
// FetchMilestoneID fetches the bool result from Heimdall whether the ID corresponding
// to the given milestone is in process in Heimdall
func (h *HeimdallClient) FetchMilestoneID(ctx context.Context, milestoneID string) error {
url, err := milestoneIDURL(h.urlString, milestoneID)
func (c *Client) FetchMilestoneID(ctx context.Context, milestoneID string) error {
url, err := milestoneIDURL(c.urlString, milestoneID)
if err != nil {
return err
}

ctx = withRequestType(ctx, milestoneIDRequest)

response, err := FetchWithRetry[milestone.MilestoneIDResponse](ctx, h.client, url, h.closeCh, h.logger)
response, err := FetchWithRetry[milestone.MilestoneIDResponse](ctx, c, url)

if err != nil {
return err
Expand All @@ -287,55 +303,53 @@ func (h *HeimdallClient) FetchMilestoneID(ctx context.Context, milestoneID strin
}

// FetchWithRetry fetches url from heimdall and decodes the response into T,
// retrying on failure. It delegates to FetchWithRetryEx with a nil
// isRecoverableError predicate, so no error is classified as unrecoverable by
// the caller.
func FetchWithRetry[T any](ctx context.Context, client http.Client, url *url.URL, closeCh chan struct{}, logger log.Logger) (*T, error) {
return FetchWithRetryEx[T](ctx, client, url, nil, closeCh, logger)
func FetchWithRetry[T any](ctx context.Context, client *Client, url *url.URL) (*T, error) {
return FetchWithRetryEx[T](ctx, client, url, nil)
}

// FetchWithRetryEx returns data from heimdall with retry
func FetchWithRetryEx[T any](ctx context.Context, client http.Client, url *url.URL, isRecoverableError func(error) bool, closeCh chan struct{}, logger log.Logger) (*T, error) {
func FetchWithRetryEx[T any](ctx context.Context, client *Client, url *url.URL, isRecoverableError func(error) bool) (result *T, err error) {
attempt := 0
const logEach = 5

// create a new ticker for retrying the request
ticker := time.NewTicker(retryCall)
ticker := time.NewTicker(client.retryBackOff)
defer ticker.Stop()

for {
for attempt < client.maxRetries {
attempt++

request := &Request{client: client, url: url, start: time.Now()}
result, err := Fetch[T](ctx, request)
request := &Request{client: client.client, url: url, start: time.Now()}
result, err = Fetch[T](ctx, request)
if err == nil {
return result, nil
}

// 503 (Service Unavailable) is thrown when an endpoint isn't activated
// yet in heimdall. E.g. when the hardfork hasn't hit yet but heimdall
// yet in heimdall. E.g. when the hard fork hasn't hit yet but heimdall
// is upgraded.
if errors.Is(err, ErrServiceUnavailable) {
logger.Debug("[bor.heimdall] service unavailable at the moment", "path", url.Path, "attempt", attempt, "err", err)
client.logger.Debug("[bor.heimdall] service unavailable at the moment", "path", url.Path, "attempt", attempt, "err", err)
return nil, err
}

if (isRecoverableError != nil) && !isRecoverableError(err) {
return nil, err
}

if attempt%logEach == 1 {
logger.Warn("[bor.heimdall] an error while fetching", "path", url.Path, "attempt", attempt, "err", err)
}
client.logger.Warn("[bor.heimdall] an error while fetching", "path", url.Path, "attempt", attempt, "err", err)

select {
case <-ctx.Done():
logger.Debug("[bor.heimdall] request canceled", "reason", ctx.Err(), "path", url.Path, "attempt", attempt)
client.logger.Debug("[bor.heimdall] request canceled", "reason", ctx.Err(), "path", url.Path, "attempt", attempt)
return nil, ctx.Err()
case <-closeCh:
logger.Debug("[bor.heimdall] shutdown detected, terminating request", "path", url.Path)
case <-client.closeCh:
client.logger.Debug("[bor.heimdall] shutdown detected, terminating request", "path", url.Path)
return nil, ErrShutdownDetected
case <-ticker.C:
// retry
}
}

return nil, err
}

// Fetch fetches response from heimdall
Expand Down Expand Up @@ -430,7 +444,7 @@ func makeURL(urlString, rawPath, rawQuery string) (*url.URL, error) {
}

// internal fetch method
func internalFetch(ctx context.Context, client http.Client, u *url.URL) ([]byte, error) {
func internalFetch(ctx context.Context, client HttpClient, u *url.URL) ([]byte, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
if err != nil {
return nil, err
Expand All @@ -441,10 +455,12 @@ func internalFetch(ctx context.Context, client http.Client, u *url.URL) ([]byte,
return nil, err
}

defer res.Body.Close()
defer func() {
_ = res.Body.Close()
}()

if res.StatusCode == http.StatusServiceUnavailable {
return nil, fmt.Errorf("%w: url='%s'; status=%d;", ErrServiceUnavailable, u.String(), res.StatusCode)
return nil, fmt.Errorf("%w: url='%s', status=%d", ErrServiceUnavailable, u.String(), res.StatusCode)
}

// unmarshall data from buffer
Expand All @@ -460,13 +476,13 @@ func internalFetch(ctx context.Context, client http.Client, u *url.URL) ([]byte,

// check status code
if res.StatusCode != 200 {
return nil, fmt.Errorf("%w: url='%s'; status=%d; body='%s';", ErrNotSuccessfulResponse, u.String(), res.StatusCode, string(body))
return nil, fmt.Errorf("%w: url='%s', status=%d, body='%s'", ErrNotSuccessfulResponse, u.String(), res.StatusCode, string(body))
}

return body, nil
}

func internalFetchWithTimeout(ctx context.Context, client http.Client, url *url.URL) ([]byte, error) {
func internalFetchWithTimeout(ctx context.Context, client HttpClient, url *url.URL) ([]byte, error) {
ctx, cancel := context.WithTimeout(ctx, apiHeimdallTimeout)
defer cancel()

Expand All @@ -475,7 +491,7 @@ func internalFetchWithTimeout(ctx context.Context, client http.Client, url *url.
}

// Close signals shutdown by closing closeCh, which the retry loop in
// FetchWithRetryEx selects on, and then asks the underlying HTTP client to
// close its idle connections.
//
// NOTE(review): closeCh is closed unconditionally, so a second call to Close
// would panic (close of a closed channel) — confirm callers invoke it once.
func (h *HeimdallClient) Close() {
close(h.closeCh)
h.client.CloseIdleConnections()
func (c *Client) Close() {
close(c.closeCh)
c.client.CloseIdleConnections()
}
Loading
Loading