softleader/pagination-stream

pagination-stream

Pagination stream API

<dependency>
  <groupId>tw.com.softleader.data</groupId>
  <artifactId>pagination-stream</artifactId>
  <version>last-release-version</version>
</dependency>

You can find the latest version on the Release Page.

Usage

First, define the method that fetches a page. It must meet the following criteria:

  1. It accepts 0 to 10 arguments of any type, followed by a Pageable as the last parameter.
  2. It returns a Page<T>.

For example:

Page<MyData> data(int a, long b, String c, Pageable pageable) {
  ...
}  

Then, create a Stream<List<T>> object using PageSupport#pagedStream. Each list in the stream represents the content of a page. For example:

PageSupport
  .pagedStream(fetch::data, 1, 2L, "3", Pageable.ofSize(10))
  .forEach(page -> { // page is a List<T>
    // do something with each page
  });

If you don't care about the number of pages and just want to stream the data from each page, you can create a Stream<T> using PageSupport#stream. For example:

PageSupport
  .stream(fetch::data, 1, 2L, "3", Pageable.ofSize(10))
  .forEach(data -> { // data is a MyData
    // do something with each element
  });
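
To make the two stream shapes concrete, here is a minimal JDK-only sketch of the idea. It is not the library's implementation: SimplePage, PagedStreamSketch, and fetch are hypothetical stand-ins for Page<T>, PageSupport, and your own page-fetching method, and fetching the first page eagerly is a simplification made by this sketch. It assumes Java 16 or later for records.

```java
import java.util.List;
import java.util.function.IntFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class PagedStreamSketch {

  // Hypothetical stand-in for Page<T>: one page of content plus the total number of pages.
  public record SimplePage<T>(List<T> content, int totalPages) {}

  // One stream element per page (the Stream<List<T>> shape).
  // Assumption of this sketch: page 0 is fetched eagerly to learn the page count.
  public static <T> Stream<List<T>> pagedStream(IntFunction<SimplePage<T>> fetcher) {
    SimplePage<T> first = fetcher.apply(0);
    Stream<List<T>> rest = IntStream.range(1, first.totalPages())
        .mapToObj(n -> fetcher.apply(n).content());
    return Stream.concat(Stream.of(first.content()), rest);
  }

  // One stream element per record (the Stream<T> shape): flatten every page.
  public static <T> Stream<T> stream(IntFunction<SimplePage<T>> fetcher) {
    return pagedStream(fetcher).flatMap(List::stream);
  }

  // Hypothetical data source: the numbers 1..25, served in pages of pageSize.
  public static SimplePage<Integer> fetch(int pageNumber, int pageSize) {
    int total = 25;
    int from = Math.min(pageNumber * pageSize, total);
    int to = Math.min(from + pageSize, total);
    List<Integer> content =
        IntStream.rangeClosed(from + 1, to).boxed().collect(Collectors.toList());
    return new SimplePage<>(content, (total + pageSize - 1) / pageSize);
  }

  public static void main(String[] args) {
    // Prints "page of 10", "page of 10", "page of 5".
    pagedStream(n -> fetch(n, 10))
        .forEach(page -> System.out.println("page of " + page.size()));

    // Prints 325, the sum of 1..25 across all pages.
    int sum = stream(n -> fetch(n, 10)).mapToInt(Integer::intValue).sum();
    System.out.println(sum);
  }
}
```

The flattened variant is just the paged variant followed by flatMap, which is why the choice between the two only depends on whether page boundaries matter to you.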

Builder Pattern

PageSupport also provides a builder pattern. You can start building with PageSupport#of:

PageSupport
  .of(fetch::data)
  .args(1, 2L, "3", Pageable.ofSize(10))
  .stream()
  . ...

Using the builder pattern allows you to define the page stream configuration in advance and create multiple stream objects. For example:

var fetcher = PageSupport.of(fetch::data);

fetcher.args(1, 2L, "3", Pageable.ofSize(10))
  .stream()
  . ...
  
fetcher.args(10, 11L, "12", Pageable.ofSize(10))
  .pagedStream()
  . ...

Parallel

PageSupport also supports parallel streams, providing better performance in scenarios where page order is not important. For example:

PageSupport
  .stream(fetch::data, 1, 2L, "3", Pageable.ofSize(10))
  .parallel()
  ...

In parallel scenarios, the first page of data (P1) is fetched sequentially as a basis for splitting. Assuming P1 shows that there are four pages (P1, P2, P3, P4), the remaining three pages will be split into multiple Spliterators (S1, S2, S3). Each spliterator is an independent subtask.

+-----+-----+-----+-----+ 
|  P1 |  P2 |  P3 |  P4 | 
+-----+-----+-----+-----+ 
          |     |     |   
          |     |     |   
          v     v     v   
       +-----+-----+-----+
       |  S1 |  S2 |  S3 |
       +-----+-----+-----+

In summary, the key points of parallel processing are:

  1. The first page is fetched sequentially to serve as the basis for splitting spliterators (subtasks).
  2. A page is the smallest unit of work for a subtask.
  3. A subtask does not necessarily run on its own thread; scheduling is left to Java's parallelism mechanisms.
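
The key points above can be approximated with plain JDK streams. The sketch below is an illustration only: it swaps the custom spliterators described above for IntStream.range(...).parallel(), and SimplePage, ParallelPagesSketch, and fetch are hypothetical names that are not part of the library. It assumes Java 16 or later for records.

```java
import java.util.List;
import java.util.function.IntFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class ParallelPagesSketch {

  // Hypothetical stand-in for Page<T>: one page of content plus the total number of pages.
  public record SimplePage<T>(List<T> content, int totalPages) {}

  public static <T> Stream<T> parallelStream(IntFunction<SimplePage<T>> fetcher) {
    // Key point 1: fetch the first page on the calling thread to learn the page count.
    SimplePage<T> first = fetcher.apply(0);

    // Key point 2: each remaining page number is one indivisible unit of work.
    // Key point 3: the JDK (by default the common ForkJoinPool) decides which
    // thread fetches which page; there is no one-thread-per-page guarantee.
    Stream<T> rest = IntStream.range(1, first.totalPages())
        .parallel()
        .mapToObj(fetcher)
        .flatMap(page -> page.content().stream());

    // Stream.concat yields a parallel stream when either input is parallel.
    return Stream.concat(first.content().stream(), rest);
  }

  // Hypothetical data source: 4 pages of 3 numbers each, covering 0..11.
  public static SimplePage<Integer> fetch(int pageNumber) {
    int pageSize = 3;
    int totalPages = 4;
    List<Integer> content = IntStream
        .range(pageNumber * pageSize, (pageNumber + 1) * pageSize)
        .boxed()
        .collect(Collectors.toList());
    return new SimplePage<>(content, totalPages);
  }

  public static void main(String[] args) {
    // An order-independent reduction, so parallel execution cannot change the result.
    long sum = parallelStream(ParallelPagesSketch::fetch)
        .mapToLong(Integer::longValue)
        .sum();
    System.out.println(sum); // prints 66, the sum of 0..11
  }
}
```

Because pages may complete in any order, this style suits order-independent work such as sums or counts; an unordered forEach would see elements in a nondeterministic sequence.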

Performance Impact

Parallel streams do not always perform better. Overhead such as thread creation and management can make them slower than a sequential stream. Evaluate the specifics of your scenario before using them.

Before using, read more on the topic:

Example

Page from Repository

Suppose you have a complex computation, and the data comes from a database. The code example is as follows:

interface PersonRepository extends JpaRepository<Person, Long> {
 
  Page<Person> findAll(Specification<Person> spec, Pageable pageable);
}

class DifficultCalculationService {
  
  PersonRepository repository;
  
  long calculate(Specification<Person> spec, Pageable pageable) {
    return PageSupport.stream(repository::findAll, spec, pageable)
      .mapToLong(person -> {
        ...
      })
      .sum();
  }
}

Page from Remote API

Suppose the data comes from a remote API called through OpenFeign. The code example is as follows:

@FeignClient(name = "person-api", url = "http://somewhere/out/there")
interface PersonClient {
 
  @GetMapping("/people")
  Page<Person> findAll(PersonCriteria criteria, Pageable pageable);
}

class DifficultCalculationService {
  
  PersonClient client;
  
  long calculate(PersonCriteria criteria, Pageable pageable) {
    return PageSupport.stream(client::findAll, criteria, pageable)
      .mapToLong(person -> {
        ...
      })
      .sum();
  }
}

Caution

When the number of records cannot be controlled, prefer streaming logic wherever possible so that your app is not put at risk, for example by running out of memory (OOM).

If you find yourself calling .collect(Collectors.toList()) immediately after creating the page stream, treat it as a warning sign and reconsider whether there is a better way to handle the data.

// Good
PageSupport.stream(fetch::data, pageable)
  .map(/* process each data in each page */)
  . .... // next step

// Bad
PageSupport.stream(fetch::data, pageable)
  .collect(...) // BOOM!
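
To illustrate why streaming is the safer default, the following JDK-only sketch counts how many pages a lazily built stream actually fetches. It neither uses nor describes PageSupport: StreamingVsCollectSketch, lazyStream, fetch, and fetchCount are hypothetical names, and this sketch treats an empty page as the end of the data. It assumes Java 10 or later, where takeWhile is available and flatMap honors short-circuiting.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class StreamingVsCollectSketch {

  static final int TOTAL = 25;
  static final int PAGE_SIZE = 10;

  // Counts calls to fetch so the demo can show how many pages were loaded.
  public static final AtomicInteger fetchCount = new AtomicInteger();

  // Hypothetical data source: the numbers 0..24 in pages of 10; pages past the end are empty.
  public static List<Integer> fetch(int pageNumber) {
    fetchCount.incrementAndGet();
    int from = Math.min(pageNumber * PAGE_SIZE, TOTAL);
    int to = Math.min(from + PAGE_SIZE, TOTAL);
    return IntStream.range(from, to).boxed().collect(Collectors.toList());
  }

  // Fetches page after page on demand and stops at the first empty page.
  public static Stream<Integer> lazyStream() {
    return IntStream.iterate(0, n -> n + 1)
        .mapToObj(StreamingVsCollectSketch::fetch)
        .takeWhile(page -> !page.isEmpty())
        .flatMap(List::stream);
  }

  public static void main(String[] args) {
    // Streaming with a short-circuit: only the first page is ever loaded.
    fetchCount.set(0);
    List<Integer> firstFive = lazyStream().limit(5).collect(Collectors.toList());
    System.out.println(firstFive + " after " + fetchCount.get() + " fetch");
    // prints [0, 1, 2, 3, 4] after 1 fetch

    // Streaming reduction: every page is visited, but only one page is held at a time.
    fetchCount.set(0);
    long sum = lazyStream().mapToLong(Integer::longValue).sum();
    System.out.println(sum + " after " + fetchCount.get() + " fetches");
    // prints 300 after 4 fetches (3 data pages plus the empty page that ends the stream)
  }
}
```

Collecting the whole stream into a list would instead keep all 25 elements alive at once, which is harmless here but scales with the size of the data set.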