Skip to content

Commit

Permalink
functions: with error check should check the error twice
Browse files Browse the repository at this point in the history
  • Loading branch information
tychoish committed Aug 31, 2023
1 parent bd10ca7 commit b1a1df7
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 5 deletions.
7 changes: 6 additions & 1 deletion process.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,12 +303,17 @@ func (pf Processor[T]) WithoutErrors(errs ...error) Processor[T] {
// error (any error), the processor propagates that error, rather than
// running the underying processor. Useful for injecting an abort into
// an existing pipleine or chain.
//
// The error future is called before running the underlying processor,
// to short circuit the operation, and also a second time when
// processor has returned in case an error has occurred during the
// operation of the processor.
func (pf Processor[T]) WithErrorCheck(ef Future[error]) Processor[T] {
return func(ctx context.Context, in T) error {
if err := ef(); err != nil {
return err
}
return pf(ctx, in)
return ers.Join(pf(ctx, in), ef())
}
}

Expand Down
9 changes: 6 additions & 3 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (pf Producer[T]) CheckForce() (T, bool) { o, e := pf.Wait();
// original producer returns.
func (pf Producer[T]) Launch(ctx context.Context) Producer[T] {
eh, ef := HF.ErrorCollector()
pipe := Blocking(make(chan T, 2))
pipe := Blocking(make(chan T, 1))
pipe.Send().Processor().
ReadAll(pf).
Operation(eh).
Expand All @@ -235,9 +235,12 @@ func (pf Producer[T]) WithErrorCheck(ef Future[error]) Producer[T] {
if err := ef(); err != nil {
return zero, err
}
return pf.Resolve(ctx)
out, err := pf.Resolve(ctx)
if err = ers.Join(err, ef()); err != nil {
return zero, err
}
return out, nil
}

}

// Once returns a producer that only executes ones, and caches the
Expand Down
7 changes: 6 additions & 1 deletion worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,12 +367,17 @@ func (wf Worker) PostHook(op func()) Worker {
// error (any error), the worker propagates that error, rather than
// running the underying producer. Useful for injecting an abort into
// an existing pipleine or chain.
//
// The error future is called before running the underlying worker,
// to short circuit the operation, and also a second time when
// worker has returned in case an error has occurred during the
// operation of the worker.
func (wf Worker) WithErrorCheck(ef Future[error]) Worker {
return func(ctx context.Context) error {
if err := ef(); err != nil {
return err
}
return wf.Run(ctx)
return ers.Join(wf.Run(ctx), ef())
}
}

Expand Down

0 comments on commit b1a1df7

Please sign in to comment.