concurrent-stream: high-throughput generic stream, pipeline, and channel processing in Go.

go get github.com/planxnx/concurrent-stream

Examples

Basic

results := make(chan int)
stream := cstream.NewStream(ctx, 8, results)

go func() {
	for i := 0; i < 10; i++ {
		i := i
		stream.Go(func() int {
			return expensiveFunc(i)
		})
	}

	// Should be called to close the stream
	// after all tasks are submitted.
	stream.Close()
}()

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
	defer wg.Done()
	for result := range results {
		fmt.Println(result)
	}
}()

// Wait for all tasks to finish.
if err := stream.Wait(); err != nil {
	panic(err)
}
close(results)

wg.Wait()
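
For readers who want to see the pattern the Basic example relies on, here is a minimal standard-library sketch of bounded concurrency feeding a results channel. It is not the library's implementation: `runBounded` is a hypothetical helper, `expensiveFunc` is a stand-in for the function the example leaves undefined, and results here arrive in completion order, which may or may not match what the stream guarantees.

```go
package main

import (
	"fmt"
	"sync"
)

// expensiveFunc is a hypothetical stand-in for the undefined function
// used in the Basic example.
func expensiveFunc(i int) int { return i * i }

// runBounded (hypothetical helper) calls task(0..n-1) with at most limit
// goroutines in flight and sends each return value to results.
// Values arrive in completion order, not submission order.
func runBounded(limit, n int, results chan<- int, task func(int) int) {
	sem := make(chan struct{}, limit) // counting semaphore
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		sem <- struct{}{} // blocks while limit tasks are already running
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			results <- task(i)
		}(i)
	}
	wg.Wait() // all tasks have delivered their result
}

func main() {
	results := make(chan int)
	go func() {
		runBounded(8, 10, results, expensiveFunc)
		close(results) // safe: no task is still sending
	}()

	sum := 0
	for r := range results {
		sum += r
	}
	fmt.Println(sum) // prints 285, the sum of squares 0..9
}
```

As in the Basic example, the results channel is closed only after every task has finished, so the consumer's `range` loop terminates cleanly.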

Concurrency Mapping

data := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
mapper := cstream.NewParallelMap(ctx, 8, data, func(item int, _ int) int {
	return expensiveFunc(item)
})

.
.
.

result, err := mapper.Result()
if err != nil {
	panic(err)
}
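
The idea behind a parallel map can be sketched with the standard library alone. The helper below is hypothetical and is not the library's `NewParallelMap`: it omits the context and error handling, and it assumes the callback's second argument is the item's index, which the example above does not state. Each worker writes to its own slot of the output slice, so the result keeps the input order.

```go
package main

import (
	"fmt"
	"sync"
)

// parallelMap (hypothetical helper) applies fn to every item using up to
// `workers` goroutines and returns the results in input order.
func parallelMap[T, R any](workers int, items []T, fn func(item T, index int) R) []R {
	if workers < 1 {
		workers = 1 // avoid blocking forever with no workers
	}
	out := make([]R, len(items))
	indexes := make(chan int)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range indexes {
				// Each index is handed to exactly one worker,
				// so no two goroutines write the same slot.
				out[i] = fn(items[i], i)
			}
		}()
	}

	for i := range items {
		indexes <- i
	}
	close(indexes)
	wg.Wait()
	return out
}

func main() {
	data := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	doubled := parallelMap(8, data, func(item int, _ int) int {
		return item * 2
	})
	fmt.Println(doubled) // prints [2 4 6 8 10 12 14 16 18 20]
}
```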
