Skip to content

Commit

Permalink
Bring ForEach back, with proper deprecation notice
Browse files Browse the repository at this point in the history
Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>
  • Loading branch information
colega committed Jan 11, 2022
1 parent 5b0be33 commit 6faae52
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 1 deletion.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* [CHANGE] grpcutil: Convert Resolver into concrete type. #105
* [CHANGE] grpcutil.Resolver.Resolve: Take a service parameter. #102
* [CHANGE] grpcutil.Update: Remove gRPC LB related metadata. #102
* [CHANGE] concurrency.ForEach: replaced by `concurrency.ForEachJob`. #113
* [CHANGE] concurrency.ForEach: deprecated and reimplemented by new `concurrency.ForEachJob`. #113
* [ENHANCEMENT] Add middleware package. #38
* [ENHANCEMENT] Add the ring package #45
* [ENHANCEMENT] Add limiter package. #41
Expand Down
21 changes: 21 additions & 0 deletions concurrency/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,27 @@ func ForEachUser(ctx context.Context, userIDs []string, concurrency int, userFun
return errs.Err()
}

// ForEach runs the provided jobFunc for each job up to concurrency concurrent workers.
// The execution breaks on first error encountered.
// Deprecated.
// Use ForEachJob instead.
func ForEach(ctx context.Context, jobs []interface{}, concurrency int, jobFunc func(ctx context.Context, job interface{}) error) error {
return ForEachJob(ctx, len(jobs), concurrency, func(ctx context.Context, idx int) error {
return jobFunc(ctx, jobs[idx])
})
}

// CreateJobsFromStrings is an utility to create jobs from an slice of strings.
// Deprecated.
// Will be removed. Not needed when using ForEachJob.
func CreateJobsFromStrings(values []string) []interface{} {
jobs := make([]interface{}, len(values))
for i := 0; i < len(values); i++ {
jobs[i] = values[i]
}
return jobs
}

// ForEachJob runs the provided jobFunc for each job index in [0, jobs) up to concurrency concurrent workers.
// The execution breaks on first error encountered.
func ForEachJob(ctx context.Context, jobs int, concurrency int, jobFunc func(ctx context.Context, idx int) error) error {
Expand Down
95 changes: 95 additions & 0 deletions concurrency/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,98 @@ func TestForEachJob_ShouldReturnImmediatelyOnNoJobsProvided(t *testing.T) {
}))
require.Zero(t, processed.Load())
}

func TestForEach(t *testing.T) {
var (
ctx = context.Background()

// Keep track of processed jobs.
processedMx sync.Mutex
processed []string
)

jobs := []string{"a", "b", "c"}

err := ForEach(ctx, CreateJobsFromStrings(jobs), 2, func(ctx context.Context, job interface{}) error {
processedMx.Lock()
defer processedMx.Unlock()
processed = append(processed, job.(string))
return nil
})

require.NoError(t, err)
assert.ElementsMatch(t, jobs, processed)
}

func TestForEach_ShouldBreakOnFirstError_ContextCancellationHandled(t *testing.T) {
var (
ctx = context.Background()

// Keep the processed jobs count.
processed atomic.Int32
)

err := ForEach(ctx, []interface{}{"a", "b", "c"}, 2, func(ctx context.Context, job interface{}) error {
if processed.CAS(0, 1) {
return errors.New("the first request is failing")
}

// Wait 1s and increase the number of processed jobs, unless the context get canceled earlier.
select {
case <-time.After(time.Second):
processed.Add(1)
case <-ctx.Done():
return ctx.Err()
}

return nil
})

require.EqualError(t, err, "the first request is failing")

// Since we expect the first error interrupts the workers, we should only see
// 1 job processed (the one which immediately returned error).
assert.Equal(t, int32(1), processed.Load())
}

func TestForEach_ShouldBreakOnFirstError_ContextCancellationUnhandled(t *testing.T) {
var (
ctx = context.Background()

// Keep the processed jobs count.
processed atomic.Int32
)

// waitGroup to await the start of the first two jobs
var wg sync.WaitGroup
wg.Add(2)

err := ForEach(ctx, []interface{}{"a", "b", "c"}, 2, func(ctx context.Context, job interface{}) error {
wg.Done()

if processed.CAS(0, 1) {
// wait till two jobs have been started
wg.Wait()
return errors.New("the first request is failing")
}

// Wait till context is cancelled to add processed jobs.
<-ctx.Done()
processed.Add(1)

return nil
})

require.EqualError(t, err, "the first request is failing")

// Since we expect the first error interrupts the workers, we should only
// see 2 job processed (the one which immediately returned error and the
// job with "b").
assert.Equal(t, int32(2), processed.Load())
}

func TestForEach_ShouldReturnImmediatelyOnNoJobsProvided(t *testing.T) {
require.NoError(t, ForEach(context.Background(), nil, 2, func(ctx context.Context, job interface{}) error {
return nil
}))
}

0 comments on commit 6faae52

Please sign in to comment.