Skip to content

Commit

Permalink
fix: fix TestScheduleRetention (#9069)
Browse files Browse the repository at this point in the history
  • Loading branch information
salonig23 authored Mar 28, 2024
1 parent 1e45918 commit 53bf20e
Showing 1 changed file with 204 additions and 61 deletions.
265 changes: 204 additions & 61 deletions master/internal/api_logretention_intg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,24 @@ func setRetentionTime(timestamp string) error {
return err
}

// completeExpAndTrials sets the state column to model.CompletedState for the
// experiment identified by expID and for every run whose id is in trialIDs.
//
// The experiment row is updated first; if that update fails, the runs are not
// touched. The two updates are issued as two separate statements. Any error is
// returned wrapped with a description of the statement that failed, so a
// failing test shows which update went wrong.
func completeExpAndTrials(ctx context.Context, expID int32, trialIDs []int) error {
	if _, err := db.Bun().NewUpdate().Table("experiments").
		Set("state = ?", model.CompletedState).
		Where("id = ?", expID).
		Exec(ctx); err != nil {
		return fmt.Errorf("marking experiment %d as completed: %w", expID, err)
	}

	if _, err := db.Bun().NewUpdate().Table("runs").
		Set("state = ?", model.CompletedState).
		Where("id IN (?)", bun.In(trialIDs)).
		Exec(ctx); err != nil {
		return fmt.Errorf("marking runs %v of experiment %d as completed: %w", trialIDs, expID, err)
	}

	return nil
}

func quoteSetRetentionTime(timestamp time.Time) error {
return setRetentionTime(fmt.Sprintf("'%s'", timestamp.Format(pgTimeFormat)))
}
Expand Down Expand Up @@ -261,7 +279,7 @@ func incrementScheduler(
return timestamp, fakeClock
}

func TestScheduleRetention(t *testing.T) {
func TestScheduleRetentionNoConfig(t *testing.T) {
// Reset retention time to transaction time on exit.
defer func() {
require.NoError(t, resetRetentionTime())
Expand All @@ -284,32 +302,10 @@ func TestScheduleRetention(t *testing.T) {
require.NoError(t, err)

// Create an experiment with 5 trials and no special config.
experiment1, trialIDs1, taskIDs1 := createTestRetentionExperiment(ctx, t, api, "", 5)
require.Nil(t, experiment1.EndTime)
require.Len(t, trialIDs1, 5)
require.Len(t, taskIDs1, 5)

// Create an experiment1 with 5 trials and a config to expire in 1000 days.
experiment2, trialIDs2, taskIDs2 := createTestRetentionExperiment(ctx, t, api, "log_retention_days: 1000", 5)
require.Nil(t, experiment2.EndTime)
require.Len(t, trialIDs2, 5)
require.Len(t, taskIDs2, 5)

// Create an experiment1 with 5 trials and config to never expire.
experiment3, trialIDs3, taskIDs3 := createTestRetentionExperiment(ctx, t, api, "log_retention_days: -1", 5)
require.Nil(t, experiment3.EndTime)
require.Len(t, trialIDs3, 5)
require.Len(t, taskIDs3, 5)

taskIDs := []model.TaskID{}
taskIDs = append(taskIDs, taskIDs1...)
taskIDs = append(taskIDs, taskIDs2...)
taskIDs = append(taskIDs, taskIDs3...)

trialIDs := []int{}
trialIDs = append(trialIDs, trialIDs1...)
trialIDs = append(trialIDs, trialIDs2...)
trialIDs = append(trialIDs, trialIDs3...)
experiment, trialIDs, taskIDs := createTestRetentionExperiment(ctx, t, api, "", 5)
require.Nil(t, experiment.EndTime)
require.Len(t, trialIDs, 5)
require.Len(t, taskIDs, 5)

// Add logs for each task.
for _, taskID := range taskIDs {
Expand Down Expand Up @@ -337,7 +333,7 @@ func TestScheduleRetention(t *testing.T) {
// Verify that the logs are still there.
count, err := countTaskLogs(api.m.db, taskIDs)
require.NoError(t, err)
require.Equal(t, 30, count)
require.Equal(t, 10, count)

// Add an end time to the task logs.
for _, taskID := range taskIDs {
Expand All @@ -355,86 +351,233 @@ func TestScheduleRetention(t *testing.T) {
}

// Mark experiments and trials as completed.
_, err = db.Bun().NewUpdate().Table("experiments").
Set("state = ?", model.CompletedState).
Where("id IN (?)", bun.In([]int32{experiment1.Id, experiment2.Id, experiment3.Id})).
Exec(ctx)
require.NoError(t, err)
_, err = db.Bun().NewUpdate().Table("runs").
Set("state = ?", model.CompletedState).
Where("id IN (?)", bun.In(trialIDs)).
Exec(ctx)
err = completeExpAndTrials(ctx, experiment.Id, trialIDs)
require.NoError(t, err)

// Advance time by 1 day.
midnight, fakeClock = incrementScheduler(t, midnight, fakeClock, 1)
// Verify that the logs are still there.
count, err = countTaskLogs(api.m.db, taskIDs)
require.NoError(t, err)
require.Equal(t, 30, count)
require.Equal(t, 10, count)

// Advance time by 98 days.
midnight, fakeClock = incrementScheduler(t, midnight, fakeClock, 98)
// Verify that some logs are deleted.
_, _ = incrementScheduler(t, midnight, fakeClock, 98)
// Verify that logs are deleted.
count, err = countTaskLogs(api.m.db, taskIDs)
require.NoError(t, err)
require.Equal(t, 20, count)
require.Equal(t, 0, count)

// Ensure that experiment1 logs are deleted.
for _, taskID := range taskIDs1 {
for _, taskID := range taskIDs {
logCount, err := api.m.db.TaskLogsCount(taskID, nil)
require.NoError(t, err)
require.Equal(t, 0, logCount)
}
// Ensure that experiment2 logs are not deleted.
for _, taskID := range taskIDs2 {
}

func TestScheduleRetention1000days(t *testing.T) {
// Reset retention time to transaction time on exit.
defer func() {
require.NoError(t, resetRetentionTime())
}()

fakeClock := clockwork.NewFakeClock()
logretention.SetupScheduler(gocron.WithClock(fakeClock))
logretention.TestingOnlySynchronizationHelper = &sync.WaitGroup{}

api, _, ctx := setupAPITest(t, nil)

err := logretention.Schedule(model.LogRetentionPolicy{
Days: ptrs.Ptr(int16(100)),
Schedule: ptrs.Ptr("0 0 * * *"),
})
require.NoError(t, err)

// Clear all logs.
_, err = db.Bun().NewDelete().Model(&model.TaskLog{}).Where("TRUE").Exec(context.Background())
require.NoError(t, err)

// Create an experiment with 5 trials and a config to expire in 1000 days.
experiment, trialIDs, taskIDs := createTestRetentionExperiment(ctx, t, api, "log_retention_days: 1000", 5)
require.Nil(t, experiment.EndTime)
require.Len(t, trialIDs, 5)
require.Len(t, taskIDs, 5)

// Add logs for each task.
for _, taskID := range taskIDs {
task, err := db.TaskByID(ctx, taskID)
require.NoError(t, err)
require.Nil(t, task.EndTime)
require.NoError(t, api.m.db.AddTaskLogs(
[]*model.TaskLog{{TaskID: string(taskID), Log: "log1\n"}}))
require.NoError(t, api.m.db.AddTaskLogs(
[]*model.TaskLog{{TaskID: string(taskID), Log: "log2\n"}}))
}

// Check that the logs are there.
for _, taskID := range taskIDs {
logCount, err := api.m.db.TaskLogsCount(taskID, nil)
require.NoError(t, err)
require.Equal(t, 2, logCount)
require.Equal(t, logCount, 2)
}
// Ensure that experiment3 logs are not deleted.
for _, taskID := range taskIDs3 {

// Advance time to midnight.
now := time.Now()
midnight := time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, 0, now.Location())
midnight, fakeClock = incrementScheduler(t, midnight, fakeClock, 1)

// Verify that the logs are still there.
count, err := countTaskLogs(api.m.db, taskIDs)
require.NoError(t, err)
require.Equal(t, 10, count)

// Add an end time to the task logs.
for _, taskID := range taskIDs {
logCount, err := api.m.db.TaskLogsCount(taskID, nil)
require.NoError(t, err)
require.Equal(t, 2, logCount)
task, err := db.TaskByID(context.Background(), taskID)
require.NoError(t, err)
task.EndTime = ptrs.Ptr(time.Now())
res, err := db.Bun().NewUpdate().Model(task).Where("task_id = ?", taskID).Exec(context.Background())
require.NoError(t, err)
rows, err := res.RowsAffected()
require.NoError(t, err)
require.Equal(t, int64(1), rows)
}

// Move time 899 days in the future.
midnight, fakeClock = incrementScheduler(t, midnight, fakeClock, 899)
// Mark experiments and trials as completed.
err = completeExpAndTrials(ctx, experiment.Id, trialIDs)
require.NoError(t, err)

// Advance time by 998 days.
midnight, fakeClock = incrementScheduler(t, midnight, fakeClock, 998)
// Verify that no logs are deleted.
count, err = countTaskLogs(api.m.db, taskIDs)
require.NoError(t, err)
require.Equal(t, 20, count)
require.Equal(t, 10, count)

// Ensure that experiment logs are not deleted.
for _, taskID := range taskIDs {
logCount, err := api.m.db.TaskLogsCount(taskID, nil)
require.NoError(t, err)
require.Equal(t, 2, logCount)
}

// Move time 1 day in the future.
midnight, fakeClock = incrementScheduler(t, midnight, fakeClock, 1)
// Verify that no logs are deleted.
_, _ = incrementScheduler(t, midnight, fakeClock, 1)
// Verify that logs are deleted.
count, err = countTaskLogs(api.m.db, taskIDs)
require.NoError(t, err)
require.Equal(t, 10, count)
require.Equal(t, 0, count)

// Ensure that experiment2 logs are deleted.
for _, taskID := range taskIDs2 {
// Ensure that experiment logs are deleted.
for _, taskID := range taskIDs {
logCount, err := api.m.db.TaskLogsCount(taskID, nil)
require.NoError(t, err)
require.Equal(t, 0, logCount)
}
// Ensure that experiment3 logs are not deleted.
for _, taskID := range taskIDs3 {
}

func TestScheduleRetentionNeverExpire(t *testing.T) {
// Reset retention time to transaction time on exit.
defer func() {
require.NoError(t, resetRetentionTime())
}()

fakeClock := clockwork.NewFakeClock()
logretention.SetupScheduler(gocron.WithClock(fakeClock))
logretention.TestingOnlySynchronizationHelper = &sync.WaitGroup{}

api, _, ctx := setupAPITest(t, nil)

err := logretention.Schedule(model.LogRetentionPolicy{
Days: ptrs.Ptr(int16(100)),
Schedule: ptrs.Ptr("0 0 * * *"),
})
require.NoError(t, err)

// Clear all logs.
_, err = db.Bun().NewDelete().Model(&model.TaskLog{}).Where("TRUE").Exec(context.Background())
require.NoError(t, err)

// Create an experiment with 5 trials and config to never expire.
experiment, trialIDs, taskIDs := createTestRetentionExperiment(ctx, t, api, "log_retention_days: -1", 5)
require.Nil(t, experiment.EndTime)
require.Len(t, trialIDs, 5)
require.Len(t, taskIDs, 5)

// Add logs for each task.
for _, taskID := range taskIDs {
task, err := db.TaskByID(ctx, taskID)
require.NoError(t, err)
require.Nil(t, task.EndTime)
require.NoError(t, api.m.db.AddTaskLogs(
[]*model.TaskLog{{TaskID: string(taskID), Log: "log1\n"}}))
require.NoError(t, api.m.db.AddTaskLogs(
[]*model.TaskLog{{TaskID: string(taskID), Log: "log2\n"}}))
}

// Check that the logs are there.
for _, taskID := range taskIDs {
logCount, err := api.m.db.TaskLogsCount(taskID, nil)
require.NoError(t, err)
require.Equal(t, logCount, 2)
}

// Advance time to midnight.
now := time.Now()
midnight := time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, 0, now.Location())
midnight, fakeClock = incrementScheduler(t, midnight, fakeClock, 1)

// Verify that the logs are still there.
count, err := countTaskLogs(api.m.db, taskIDs)
require.NoError(t, err)
require.Equal(t, 10, count)

// Add an end time to the task logs.
for _, taskID := range taskIDs {
logCount, err := api.m.db.TaskLogsCount(taskID, nil)
require.NoError(t, err)
require.Equal(t, 2, logCount)
task, err := db.TaskByID(context.Background(), taskID)
require.NoError(t, err)
task.EndTime = ptrs.Ptr(time.Now())
res, err := db.Bun().NewUpdate().Model(task).Where("task_id = ?", taskID).Exec(context.Background())
require.NoError(t, err)
rows, err := res.RowsAffected()
require.NoError(t, err)
require.Equal(t, int64(1), rows)
}

// Move time 10 years in the future.
_, _ = incrementScheduler(t, midnight, fakeClock, 365*10)
// Mark experiments and trials as completed.
err = completeExpAndTrials(ctx, experiment.Id, trialIDs)
require.NoError(t, err)

// Advance time by 100 days.
midnight, fakeClock = incrementScheduler(t, midnight, fakeClock, 100)
// Verify that no logs are deleted.
count, err = countTaskLogs(api.m.db, taskIDs)
require.NoError(t, err)
require.Equal(t, 10, count)

// Ensure that experiment3 logs are not deleted.
for _, taskID := range taskIDs3 {
// Ensure that experiment logs are not deleted.
for _, taskID := range taskIDs {
logCount, err := api.m.db.TaskLogsCount(taskID, nil)
require.NoError(t, err)
require.Equal(t, 2, logCount)
}

// Move time 2 years in the future.
_, _ = incrementScheduler(t, midnight, fakeClock, 365*2)
// Verify that no logs are deleted.
count, err = countTaskLogs(api.m.db, taskIDs)
require.NoError(t, err)
require.Equal(t, 10, count)

// Ensure that experiment logs are not deleted.
for _, taskID := range taskIDs {
logCount, err := api.m.db.TaskLogsCount(taskID, nil)
require.NoError(t, err)
require.Equal(t, 2, logCount)
Expand Down

0 comments on commit 53bf20e

Please sign in to comment.