Skip to content

Commit

Permalink
Revert "Add cache state to task summaries on real runs" (#4390)
Browse files Browse the repository at this point in the history
Reverts #4225

Fixes #4389
  • Loading branch information
tknickman committed Mar 29, 2023
1 parent 4e626df commit 0bb52f9
Show file tree
Hide file tree
Showing 11 changed files with 59 additions and 129 deletions.
28 changes: 2 additions & 26 deletions cli/integration_tests/basic_monorepo/run_summary/run_summary.t
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,14 @@ Setup

$ TURBO_RUN_SUMMARY=true ${TURBO} run build -- someargs > /dev/null # first run (should be cache miss)

# HACK: Generated run summaries are named with a ksuid, which is a time-sorted ID. This _generally_ works
# but we're seeing in this test that sometimes a summary file is not sorted (with /bin/ls) in the order we expect
# causing intermittent test failures.
# Add a sleep statement so we can be sure that the second run is a later timestamp,
# so we can reliably get it with `|head -n1` and `|tail -n1` later in this test.
# When we start emitting the path to the run summary file that was generated, or a way to specify
# the output file, we can remove this and look for the file directly.
# If you find this sleep statement, try running this test 10 times in a row. If there are no
# failures, it *should* be safe to remove.
$ sleep 1
$ TURBO_RUN_SUMMARY=true ${TURBO} run build -- someargs > /dev/null # run again (expecting full turbo here)

# no output, just check for 0 status code, which means the directory was created
$ test -d .turbo/runs
# expect 2 run summaries are created
$ ls .turbo/runs/*.json | wc -l
\s*2 (re)
\s*1 (re)

# get jq-parsed output of each run summary
$ FIRST=$(/bin/ls .turbo/runs/*.json | head -n1)
$ SECOND=$(/bin/ls .turbo/runs/*.json | tail -n1)

# some top level run summary validation
$ cat $FIRST | jq '.tasks | length'
Expand All @@ -51,7 +38,6 @@ Setup

# Extract some task-specific summaries from each
$ FIRST_APP_BUILD=$("$TESTDIR/get-build.sh" "$FIRST" "my-app")
$ SECOND_APP_BUILD=$("$TESTDIR/get-build.sh" "$SECOND" "my-app")
$ FIRST_UTIL_BUILD=$("$TESTDIR/get-build.sh" "$FIRST" "util")

$ echo $FIRST_APP_BUILD | jq '.execution'
Expand All @@ -66,23 +52,13 @@ Setup
[
"someargs"
]

$ echo $FIRST_APP_BUILD | jq '.hashOfExternalDependencies'
"ccab0b28617f1f56"
$ echo $FIRST_APP_BUILD | jq '.expandedOutputs'
[
"apps/my-app/.turbo/turbo-build.log"
]
# validate that cache state updates in second run
$ echo $FIRST_APP_BUILD | jq '.cacheState'
{
"local": false,
"remote": false
}
$ echo $SECOND_APP_BUILD | jq '.cacheState'
{
"local": true,
"remote": false
}

# Some validation of util#build
$ echo $FIRST_UTIL_BUILD | jq '.execution'
Expand Down
2 changes: 1 addition & 1 deletion cli/internal/cache/async_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (c *asyncCache) Put(anchor turbopath.AbsoluteSystemPath, key string, durati
return nil
}

func (c *asyncCache) Fetch(anchor turbopath.AbsoluteSystemPath, key string, files []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) {
func (c *asyncCache) Fetch(anchor turbopath.AbsoluteSystemPath, key string, files []string) (bool, []turbopath.AnchoredSystemPath, int, error) {
return c.realCache.Fetch(anchor, key, files)
}

Expand Down
21 changes: 5 additions & 16 deletions cli/internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
type Cache interface {
// Fetch returns true if there is a cache hit. It is expected to move files
// into their correct position as a side effect
Fetch(anchor turbopath.AbsoluteSystemPath, hash string, files []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error)
Fetch(anchor turbopath.AbsoluteSystemPath, hash string, files []string) (bool, []turbopath.AnchoredSystemPath, int, error)
Exists(hash string) ItemStatus
// Put caches files for a given hash
Put(anchor turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error
Expand Down Expand Up @@ -232,24 +232,17 @@ func (mplex *cacheMultiplexer) removeCache(removal *cacheRemoval) {
}
}

func (mplex *cacheMultiplexer) Fetch(anchor turbopath.AbsoluteSystemPath, key string, files []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) {
func (mplex *cacheMultiplexer) Fetch(anchor turbopath.AbsoluteSystemPath, key string, files []string) (bool, []turbopath.AnchoredSystemPath, int, error) {
// Make a shallow copy of the caches, since storeUntil can call removeCache
mplex.mu.RLock()
caches := make([]Cache, len(mplex.caches))
copy(caches, mplex.caches)
mplex.mu.RUnlock()

// We need to return a composite cache status from multiple caches
// Initialize the empty struct so we can assign values to it. This is similar
// to how the Exists() method works.
combinedCacheState := ItemStatus{}

// Retrieve from caches sequentially; if we did them simultaneously we could
// easily write the same file from two goroutines at once.
for i, cache := range caches {
itemStatus, actualFiles, duration, err := cache.Fetch(anchor, key, files)
ok := itemStatus.Local || itemStatus.Remote

ok, actualFiles, duration, err := cache.Fetch(anchor, key, files)
if err != nil {
cd := &util.CacheDisabledError{}
if errors.As(err, &cd) {
Expand All @@ -268,15 +261,11 @@ func (mplex *cacheMultiplexer) Fetch(anchor turbopath.AbsoluteSystemPath, key st
// we have previously successfully stored in a higher-priority cache, and so the overall
// result is a success at fetching. Storing in lower-priority caches is an optimization.
_ = mplex.storeUntil(anchor, key, duration, actualFiles, i)

// If another cache had already set this to true, we don't need to set it again from this cache
combinedCacheState.Local = combinedCacheState.Local || itemStatus.Local
combinedCacheState.Remote = combinedCacheState.Remote || itemStatus.Remote
return combinedCacheState, actualFiles, duration, err
return ok, actualFiles, duration, err
}
}

return ItemStatus{Local: false, Remote: false}, nil, 0, nil
return false, nil, 0, nil
}

func (mplex *cacheMultiplexer) Exists(target string) ItemStatus {
Expand Down
16 changes: 8 additions & 8 deletions cli/internal/cache/cache_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func newFsCache(opts Opts, recorder analytics.Recorder, repoRoot turbopath.Absol
}

// Fetch returns true if items are cached. It moves them into position as a side effect.
func (f *fsCache) Fetch(anchor turbopath.AbsoluteSystemPath, hash string, _ []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) {
func (f *fsCache) Fetch(anchor turbopath.AbsoluteSystemPath, hash string, _unusedOutputGlobs []string) (bool, []turbopath.AnchoredSystemPath, int, error) {
uncompressedCachePath := f.cacheDirectory.UntypedJoin(hash + ".tar")
compressedCachePath := f.cacheDirectory.UntypedJoin(hash + ".tar.zst")

Expand All @@ -45,33 +45,33 @@ func (f *fsCache) Fetch(anchor turbopath.AbsoluteSystemPath, hash string, _ []st
} else {
// It's not in the cache, bail now
f.logFetch(false, hash, 0)
return ItemStatus{Local: false}, nil, 0, nil
return false, nil, 0, nil
}

cacheItem, openErr := cacheitem.Open(actualCachePath)
if openErr != nil {
return ItemStatus{Local: false}, nil, 0, openErr
return false, nil, 0, openErr
}

restoredFiles, restoreErr := cacheItem.Restore(anchor)
if restoreErr != nil {
_ = cacheItem.Close()
return ItemStatus{Local: false}, nil, 0, restoreErr
return false, nil, 0, restoreErr
}

meta, err := ReadCacheMetaFile(f.cacheDirectory.UntypedJoin(hash + "-meta.json"))
if err != nil {
_ = cacheItem.Close()
return ItemStatus{Local: false}, nil, 0, fmt.Errorf("error reading cache metadata: %w", err)
return false, nil, 0, fmt.Errorf("error reading cache metadata: %w", err)
}
f.logFetch(true, hash, meta.Duration)

// Wait to see what happens with close.
closeErr := cacheItem.Close()
if closeErr != nil {
return ItemStatus{Local: false}, restoredFiles, 0, closeErr
return false, restoredFiles, 0, closeErr
}
return ItemStatus{Local: true}, restoredFiles, meta.Duration, nil
return true, restoredFiles, meta.Duration, nil
}

func (f *fsCache) Exists(hash string) ItemStatus {
Expand Down Expand Up @@ -129,7 +129,7 @@ func (f *fsCache) Put(anchor turbopath.AbsoluteSystemPath, hash string, duration
return cacheItem.Close()
}

func (f *fsCache) Clean(_ turbopath.AbsoluteSystemPath) {
func (f *fsCache) Clean(anchor turbopath.AbsoluteSystemPath) {
fmt.Println("Not implemented yet")
}

Expand Down
3 changes: 1 addition & 2 deletions cli/internal/cache/cache_fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,8 @@ func TestFetch(t *testing.T) {

outputDir := turbopath.AbsoluteSystemPath(t.TempDir())
dstOutputPath := "some-package"
cacheStatus, files, _, err := cache.Fetch(outputDir, "the-hash", []string{})
hit, files, _, err := cache.Fetch(outputDir, "the-hash", []string{})
assert.NilError(t, err, "Fetch")
hit := cacheStatus.Local || cacheStatus.Remote
if !hit {
t.Error("Fetch got false, want true")
}
Expand Down
10 changes: 5 additions & 5 deletions cli/internal/cache/cache_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ var mtime = time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)
// nobody is the usual uid / gid of the 'nobody' user.
const nobody = 65534

func (cache *httpCache) Put(_ turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error {
func (cache *httpCache) Put(anchor turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error {
// if cache.writable {
cache.requestLimiter.acquire()
defer cache.requestLimiter.release()
Expand Down Expand Up @@ -143,16 +143,16 @@ func (cache *httpCache) storeFile(tw *tar.Writer, repoRelativePath turbopath.Anc
return err
}

func (cache *httpCache) Fetch(_ turbopath.AbsoluteSystemPath, key string, _ []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) {
func (cache *httpCache) Fetch(anchor turbopath.AbsoluteSystemPath, key string, _unusedOutputGlobs []string) (bool, []turbopath.AnchoredSystemPath, int, error) {
cache.requestLimiter.acquire()
defer cache.requestLimiter.release()
hit, files, duration, err := cache.retrieve(key)
if err != nil {
// TODO: analytics event?
return ItemStatus{Remote: false}, files, duration, fmt.Errorf("failed to retrieve files from HTTP cache: %w", err)
return false, files, duration, fmt.Errorf("failed to retrieve files from HTTP cache: %w", err)
}
cache.logFetch(hit, key, duration)
return ItemStatus{Remote: hit}, files, duration, err
return hit, files, duration, err
}

func (cache *httpCache) Exists(key string) ItemStatus {
Expand Down Expand Up @@ -349,7 +349,7 @@ func restoreSymlink(root turbopath.AbsoluteSystemPath, hdr *tar.Header, allowNon
return nil
}

func (cache *httpCache) Clean(_ turbopath.AbsoluteSystemPath) {
func (cache *httpCache) Clean(anchor turbopath.AbsoluteSystemPath) {
// Not possible; this implementation can only clean for a hash.
}

Expand Down
14 changes: 7 additions & 7 deletions cli/internal/cache/cache_noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@ func newNoopCache() *noopCache {
return &noopCache{}
}

func (c *noopCache) Put(_ turbopath.AbsoluteSystemPath, _ string, _ int, _ []turbopath.AnchoredSystemPath) error {
func (c *noopCache) Put(anchor turbopath.AbsoluteSystemPath, key string, duration int, files []turbopath.AnchoredSystemPath) error {
return nil
}
func (c *noopCache) Fetch(_ turbopath.AbsoluteSystemPath, _ string, _ []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) {
return ItemStatus{Local: false, Remote: false}, nil, 0, nil
func (c *noopCache) Fetch(anchor turbopath.AbsoluteSystemPath, key string, files []string) (bool, []turbopath.AnchoredSystemPath, int, error) {
return false, nil, 0, nil
}
func (c *noopCache) Exists(_ string) ItemStatus {
func (c *noopCache) Exists(key string) ItemStatus {
return ItemStatus{}
}

func (c *noopCache) Clean(_ turbopath.AbsoluteSystemPath) {}
func (c *noopCache) CleanAll() {}
func (c *noopCache) Shutdown() {}
func (c *noopCache) Clean(anchor turbopath.AbsoluteSystemPath) {}
func (c *noopCache) CleanAll() {}
func (c *noopCache) Shutdown() {}
22 changes: 10 additions & 12 deletions cli/internal/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@ type testCache struct {
entries map[string][]turbopath.AnchoredSystemPath
}

func (tc *testCache) Fetch(_ turbopath.AbsoluteSystemPath, hash string, _ []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) {
func (tc *testCache) Fetch(anchor turbopath.AbsoluteSystemPath, hash string, files []string) (bool, []turbopath.AnchoredSystemPath, int, error) {
if tc.disabledErr != nil {
return ItemStatus{}, nil, 0, tc.disabledErr
return false, nil, 0, tc.disabledErr
}
foundFiles, ok := tc.entries[hash]
if ok {
duration := 5
return ItemStatus{Local: true}, foundFiles, duration, nil
return true, foundFiles, duration, nil
}
return ItemStatus{}, nil, 0, nil
return false, nil, 0, nil
}

func (tc *testCache) Exists(hash string) ItemStatus {
Expand All @@ -40,17 +40,17 @@ func (tc *testCache) Exists(hash string) ItemStatus {
return ItemStatus{}
}

func (tc *testCache) Put(_ turbopath.AbsoluteSystemPath, hash string, _ int, files []turbopath.AnchoredSystemPath) error {
func (tc *testCache) Put(anchor turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error {
if tc.disabledErr != nil {
return tc.disabledErr
}
tc.entries[hash] = files
return nil
}

func (tc *testCache) Clean(_ turbopath.AbsoluteSystemPath) {}
func (tc *testCache) CleanAll() {}
func (tc *testCache) Shutdown() {}
func (tc *testCache) Clean(anchor turbopath.AbsoluteSystemPath) {}
func (tc *testCache) CleanAll() {}
func (tc *testCache) Shutdown() {}

func newEnabledCache() *testCache {
return &testCache{
Expand Down Expand Up @@ -106,11 +106,10 @@ func TestPutCachingDisabled(t *testing.T) {
mplex.mu.RUnlock()

// subsequent Fetch should still work
cacheStatus, _, _, err := mplex.Fetch("unused-target", "some-hash", []string{"unused", "files"})
hit, _, _, err := mplex.Fetch("unused-target", "some-hash", []string{"unused", "files"})
if err != nil {
t.Errorf("got error fetching files: %v", err)
}
hit := cacheStatus.Local || cacheStatus.Remote
if !hit {
t.Error("failed to find previously stored files")
}
Expand Down Expand Up @@ -186,12 +185,11 @@ func TestFetchCachingDisabled(t *testing.T) {
},
}

cacheStatus, _, _, err := mplex.Fetch("unused-target", "some-hash", []string{"unused", "files"})
hit, _, _, err := mplex.Fetch("unused-target", "some-hash", []string{"unused", "files"})
if err != nil {
// don't leak the cache removal
t.Errorf("Fetch got error %v, want <nil>", err)
}
hit := cacheStatus.Local || cacheStatus.Remote
if hit {
t.Error("hit on empty cache, expected miss")
}
Expand Down
6 changes: 1 addition & 5 deletions cli/internal/run/real_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ func RealRun(
if taskExecutionSummary != nil {
taskSummary.ExpandedOutputs = taskHashTracker.GetExpandedOutputs(taskSummary.TaskID)
taskSummary.Execution = taskExecutionSummary
taskSummary.CacheState = taskHashTracker.GetCacheStatus(taskSummary.TaskID)

// lock since multiple things may be appending to this array at the same time
mu.Lock()
Expand Down Expand Up @@ -247,10 +246,7 @@ func (ec *execContext) exec(ctx gocontext.Context, packageTask *nodes.PackageTas
ErrorPrefix: prettyPrefix,
WarnPrefix: prettyPrefix,
}
cacheStatus, err := taskCache.RestoreOutputs(ctx, prefixedUI, progressLogger)
ec.taskHashTracker.SetCacheStatus(packageTask.TaskID, cacheStatus)

hit := cacheStatus.Local || cacheStatus.Remote
hit, err := taskCache.RestoreOutputs(ctx, prefixedUI, progressLogger)
if err != nil {
prefixedUI.Error(fmt.Sprintf("error fetching from cache: %s", err))
} else if hit {
Expand Down
Loading

0 comments on commit 0bb52f9

Please sign in to comment.