Skip to content

Commit

Permalink
PR feedback on job order and ctx.Err check
Browse files Browse the repository at this point in the history
Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>
  • Loading branch information
colega committed Jan 11, 2022
1 parent 6faae52 commit a491840
Showing 1 changed file with 6 additions and 9 deletions.
15 changes: 6 additions & 9 deletions concurrency/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,27 +89,24 @@ func ForEachJob(ctx context.Context, jobs int, concurrency int, jobFunc func(ctx
return nil
}

indexes := atomic.Int64{}
indexes.Add(int64(jobs))
// Initialise indexes with -1 so first Inc() returns index 0.
indexes := atomic.NewInt64(-1)

// Start workers to process jobs.
g, ctx := errgroup.WithContext(ctx)
for ix := 0; ix < math.Min(concurrency, jobs); ix++ {
g.Go(func() error {
for {
idx := int(indexes.Dec())
if idx < 0 {
for ctx.Err() == nil {
idx := int(indexes.Inc())
if idx >= jobs {
return nil
}

if err := ctx.Err(); err != nil {
return err
}

if err := jobFunc(ctx, idx); err != nil {
return err
}
}
return ctx.Err()
})
}

Expand Down

0 comments on commit a491840

Please sign in to comment.