diff --git a/CHANGELOG.md b/CHANGELOG.md index 8788767fe..230ee1d6f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,7 +17,7 @@ * [CHANGE] grpcutil: Convert Resolver into concrete type. #105 * [CHANGE] grpcutil.Resolver.Resolve: Take a service parameter. #102 * [CHANGE] grpcutil.Update: Remove gRPC LB related metadata. #102 -* [CHANGE] concurrency.ForEach: replaced by `concurrency.ForEachJob`. #113 +* [CHANGE] concurrency.ForEach: deprecated and reimplemented by new `concurrency.ForEachJob`. #113 * [ENHANCEMENT] Add middleware package. #38 * [ENHANCEMENT] Add the ring package #45 * [ENHANCEMENT] Add limiter package. #41 diff --git a/concurrency/runner.go b/concurrency/runner.go index 0386c36d2..7019e1aa9 100644 --- a/concurrency/runner.go +++ b/concurrency/runner.go @@ -61,6 +61,27 @@ func ForEachUser(ctx context.Context, userIDs []string, concurrency int, userFun return errs.Err() } +// ForEach runs the provided jobFunc for each job up to concurrency concurrent workers. +// The execution breaks on first error encountered. +// Deprecated. +// Use ForEachJob instead. +func ForEach(ctx context.Context, jobs []interface{}, concurrency int, jobFunc func(ctx context.Context, job interface{}) error) error { + return ForEachJob(ctx, len(jobs), concurrency, func(ctx context.Context, idx int) error { + return jobFunc(ctx, jobs[idx]) + }) +} + +// CreateJobsFromStrings is an utility to create jobs from an slice of strings. +// Deprecated. +// Will be removed. Not needed when using ForEachJob. +func CreateJobsFromStrings(values []string) []interface{} { + jobs := make([]interface{}, len(values)) + for i := 0; i < len(values); i++ { + jobs[i] = values[i] + } + return jobs +} + // ForEachJob runs the provided jobFunc for each job index in [0, jobs) up to concurrency concurrent workers. // The execution breaks on first error encountered. func ForEachJob(ctx context.Context, jobs int, concurrency int, jobFunc func(ctx context.Context, idx int) error) error { diff --git a/concurrency/runner_test.go b/concurrency/runner_test.go index ab4eed7d3..30b7477f5 100644 --- a/concurrency/runner_test.go +++ b/concurrency/runner_test.go @@ -164,3 +164,98 @@ func TestForEachJob_ShouldReturnImmediatelyOnNoJobsProvided(t *testing.T) { })) require.Zero(t, processed.Load()) } + +func TestForEach(t *testing.T) { + var ( + ctx = context.Background() + + // Keep track of processed jobs. + processedMx sync.Mutex + processed []string + ) + + jobs := []string{"a", "b", "c"} + + err := ForEach(ctx, CreateJobsFromStrings(jobs), 2, func(ctx context.Context, job interface{}) error { + processedMx.Lock() + defer processedMx.Unlock() + processed = append(processed, job.(string)) + return nil + }) + + require.NoError(t, err) + assert.ElementsMatch(t, jobs, processed) +} + +func TestForEach_ShouldBreakOnFirstError_ContextCancellationHandled(t *testing.T) { + var ( + ctx = context.Background() + + // Keep the processed jobs count. + processed atomic.Int32 + ) + + err := ForEach(ctx, []interface{}{"a", "b", "c"}, 2, func(ctx context.Context, job interface{}) error { + if processed.CAS(0, 1) { + return errors.New("the first request is failing") + } + + // Wait 1s and increase the number of processed jobs, unless the context get canceled earlier. + select { + case <-time.After(time.Second): + processed.Add(1) + case <-ctx.Done(): + return ctx.Err() + } + + return nil + }) + + require.EqualError(t, err, "the first request is failing") + + // Since we expect the first error interrupts the workers, we should only see + // 1 job processed (the one which immediately returned error). + assert.Equal(t, int32(1), processed.Load()) +} + +func TestForEach_ShouldBreakOnFirstError_ContextCancellationUnhandled(t *testing.T) { + var ( + ctx = context.Background() + + // Keep the processed jobs count. + processed atomic.Int32 + ) + + // waitGroup to await the start of the first two jobs + var wg sync.WaitGroup + wg.Add(2) + + err := ForEach(ctx, []interface{}{"a", "b", "c"}, 2, func(ctx context.Context, job interface{}) error { + wg.Done() + + if processed.CAS(0, 1) { + // wait till two jobs have been started + wg.Wait() + return errors.New("the first request is failing") + } + + // Wait till context is cancelled to add processed jobs. + <-ctx.Done() + processed.Add(1) + + return nil + }) + + require.EqualError(t, err, "the first request is failing") + + // Since we expect the first error interrupts the workers, we should only + // see 2 job processed (the one which immediately returned error and the + // job with "b"). + assert.Equal(t, int32(2), processed.Load()) +} + +func TestForEach_ShouldReturnImmediatelyOnNoJobsProvided(t *testing.T) { + require.NoError(t, ForEach(context.Background(), nil, 2, func(ctx context.Context, job interface{}) error { + return nil + })) +}