Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BCF-3178] - Improve err handling and logs for in memory data source cache #12907

Merged
merged 4 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/brown-penguins-grin.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Fix in memory data source cache changes/bug that only allowed pipeline results where none of the data sources failed. #bugfix
47 changes: 26 additions & 21 deletions core/services/ocrcommon/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ocrcommon
import (
"context"
"encoding/json"
errjoin "errors"
"fmt"
"math/big"
"sync"
Expand Down Expand Up @@ -294,31 +293,30 @@ func (ds *inMemoryDataSourceCache) updateCache(ctx context.Context) error {
ds.mu.Lock()
defer ds.mu.Unlock()

// check for any errors
_, latestTrrs, latestUpdateErr := ds.executeRun(ctx)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This checked for any errors in the pipeline results, which doesn't make sense since multiple bridge data sources are used and not always all of them are successful

if latestTrrs.FinalResult(ds.lggr).HasErrors() {
latestUpdateErr = errjoin.Join(append(latestTrrs.FinalResult(ds.lggr).AllErrors, latestUpdateErr)...)
}

if latestUpdateErr != nil {
_, latestTrrs, err := ds.executeRun(ctx)
if err != nil {
previousUpdateErr := ds.latestUpdateErr
ds.latestUpdateErr = latestUpdateErr
// raise log severity
ds.latestUpdateErr = err
// warn log if previous cache update also errored
if previousUpdateErr != nil {
ds.lggr.Warnf("consecutive cache updates errored: previous err: %v new err: %v", previousUpdateErr, ds.latestUpdateErr)
}
return errors.Wrapf(ds.latestUpdateErr, "error executing run for spec ID %v", ds.spec.ID)

return errors.Wrapf(ds.latestUpdateErr, "error updating in memory data source cache for spec ID %v", ds.spec.ID)
}

ds.latestTrrs = latestTrrs
ds.latestResult = ds.latestTrrs.FinalResult(ds.lggr)
value, err := ds.inMemoryDataSource.parse(ds.latestResult)
value, err := ds.inMemoryDataSource.parse(latestTrrs.FinalResult(ds.lggr))
if err != nil {
return errors.Wrapf(err, "invalid result")
ds.latestUpdateErr = errors.Wrapf(err, "invalid result")
return ds.latestUpdateErr
}

// backup in case data source fails continuously and node gets rebooted
// update cache values
ds.latestTrrs = latestTrrs
ds.latestResult = ds.latestTrrs.FinalResult(ds.lggr)
ds.latestUpdateErr = nil
Copy link
Contributor Author

@ilija42 ilija42 Apr 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved this down to be more cautious since we now don't check for all errors, although median task should handle everything properly


// backup in case data source fails continuously and node gets rebooted
timePairBytes, err := json.Marshal(&ResultTimePair{Result: *serializablebig.New(value), Time: time.Now()})
if err != nil {
return fmt.Errorf("failed to marshal result time pair, err: %w", err)
Expand All @@ -341,7 +339,7 @@ func (ds *inMemoryDataSourceCache) get(ctx context.Context) (pipeline.FinalResul
ds.mu.RUnlock()

if err := ds.updateCache(ctx); err != nil {
ds.lggr.Warnf("failed to update cache err: %v, returning stale result now, err: %v", err)
ds.lggr.Warnf("failed to update cache, returning stale result now, err: %v", err)
}

ds.mu.RLock()
Expand All @@ -357,15 +355,15 @@ func (ds *inMemoryDataSourceCache) Observe(ctx context.Context, timestamp ocr2ty

timePairBytes, err := ds.kvStore.Get(ctx, dataSourceCacheKey)
if err != nil {
return nil, fmt.Errorf("failed to get result time pair bytes, err: %w", err)
return nil, fmt.Errorf("in memory data source cache is empty and failed to get backup persisted value, err: %w", err)
}

if err := json.Unmarshal(timePairBytes, &resTime); err != nil {
return nil, fmt.Errorf("failed to unmarshal result time pair bytes, err: %w", err)
if err = json.Unmarshal(timePairBytes, &resTime); err != nil {
return nil, fmt.Errorf("in memory data source cache is empty and failed to unmarshal backup persisted value, err: %w", err)
}

if time.Since(resTime.Time) >= ds.stalenessAlertThreshold {
ds.lggr.Errorf("cache hasn't been updated for over %v, latestUpdateErr is: %v", ds.stalenessAlertThreshold, ds.latestUpdateErr)
ds.lggr.Errorf("in memory data source cache is empty and the persisted value hasn't been updated for over %v, latestUpdateErr is: %v", ds.stalenessAlertThreshold, ds.latestUpdateErr)
}
return resTime.Result.ToInt(), nil
}
Expand All @@ -376,6 +374,13 @@ func (ds *inMemoryDataSourceCache) Observe(ctx context.Context, timestamp ocr2ty
ConfigDigest: timestamp.ConfigDigest.Hex(),
})

// if last update was unsuccessful, check how much time passed since a successful update
if ds.latestUpdateErr != nil {
if time.Since(ds.latestTrrs.GetTaskRunResultsFinishedAt()) >= ds.stalenessAlertThreshold {
ds.lggr.Errorf("in memory cache is old and hasn't been updated for over %v, latestUpdateErr is: %v", ds.stalenessAlertThreshold, ds.latestUpdateErr)
}

}
return ds.parse(latestResult)
}

Expand Down
11 changes: 11 additions & 0 deletions core/services/pipeline/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,17 @@ func (result *TaskRunResult) IsTerminal() bool {
// TaskRunResults represents a collection of results for all task runs for one pipeline run
type TaskRunResults []TaskRunResult

// GetTaskRunResultsFinishedAt returns latest finishedAt time from TaskRunResults.
func (trrs TaskRunResults) GetTaskRunResultsFinishedAt() time.Time {
var finishedTime time.Time
for _, trr := range trrs {
if trr.FinishedAt.Valid && trr.FinishedAt.Time.After(finishedTime) {
finishedTime = trr.FinishedAt.Time
}
}
return finishedTime
}

// FinalResult pulls the FinalResult for the pipeline_run from the task runs
// It needs to respect the output index of each task
func (trrs TaskRunResults) FinalResult(l logger.Logger) FinalResult {
Expand Down
Loading