Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add stream methods for Page #1425

Merged
merged 25 commits into from
Apr 11, 2023
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
ca6f08d
feat: add stream methods
JoeWang1127 Feb 27, 2023
3a08ea9
add clirr ignore rule
JoeWang1127 Feb 27, 2023
99adcac
Merge branch 'main' into feat/add-stream-methods
JoeWang1127 Feb 27, 2023
00e847e
add comments for stream methods
JoeWang1127 Feb 27, 2023
9688e3d
fix format
JoeWang1127 Feb 27, 2023
647d333
add tests for stream methods
JoeWang1127 Feb 27, 2023
580c6ef
Merge branch 'main' into feat/add-stream-methods
JoeWang1127 Feb 28, 2023
206c46f
modify tests
JoeWang1127 Feb 28, 2023
6fcfa9b
add showcase tests
JoeWang1127 Mar 1, 2023
9fbfa93
Revert "add showcase tests"
JoeWang1127 Mar 1, 2023
8206560
Merge branch 'main' into feat/add-stream-methods
JoeWang1127 Mar 1, 2023
d214420
add a integration test for stream methods
JoeWang1127 Mar 5, 2023
7b257d1
add copyright
JoeWang1127 Mar 5, 2023
9694bb1
change client builder
JoeWang1127 Mar 5, 2023
82a9d72
set page token
JoeWang1127 Mar 5, 2023
a7d26fe
remove page token in stream all
JoeWang1127 Mar 5, 2023
89128c1
remove page token in stream values
JoeWang1127 Mar 6, 2023
974712d
modify paged request
JoeWang1127 Mar 6, 2023
3da41ea
modify tests
JoeWang1127 Mar 6, 2023
df4c7e7
Merge remote-tracking branch 'origin/main' into feat/add-stream-methods
JoeWang1127 Mar 6, 2023
7682f22
create users only once
JoeWang1127 Mar 6, 2023
654fedb
delete showcase it
JoeWang1127 Mar 6, 2023
069d9a5
add comments
JoeWang1127 Mar 6, 2023
17a0a9c
Merge branch 'main' into feat/add-stream-methods
JoeWang1127 Apr 11, 2023
f133040
change unit test name
JoeWang1127 Apr 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions gax-java/gax/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>
<difference>
<!-- add default stream methods to `Page` interface -->
<differenceType>7012</differenceType>
suztomo marked this conversation as resolved.
Show resolved Hide resolved
<className>com/google/api/gax/paging/Page</className>
<method>* stream*(*)</method>
Comment on lines +7 to +8
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's strange that CLIRR is complaining the change. Adding default method in interface shouldn't cause a breaking changes.

Copy link
Member

@suztomo suztomo Mar 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JoeWang1127 Just in case, would you build this gax-java locally (with -SNAPSHOT version) and try to use in java-storage to see anything breaks? You ran some storage samples (https://github.com/JoeWang1127/storage-demo) that you used to understand the request.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Specifying ignored differences of Clirr Maven Plugin doc, 7012 has the following definition:

7012 (Method Added to Interface): className, method

So adding a method to interface, default or not, is a breaking change to this rule.

Also, there's 7013:

7013 (Abstract Method Added to Class): className, method

which is not complain.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clirr isn't aware of java 8 default methods. It is looking at the bytecode from the perspective of java7 binary compatibility. When adding a default method it is safe to specify an ignore rule.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@BenWhitehead Thanks for the clarification.

</difference>
</differences>
21 changes: 19 additions & 2 deletions gax-java/gax/src/main/java/com/google/api/gax/paging/Page.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
*/
package com.google.api.gax.paging;

import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
* A Page object wraps an API list method response.
*
Expand All @@ -52,12 +55,26 @@ public interface Page<ResourceT> {
Page<ResourceT> getNextPage();

/**
* Returns an iterable that traverses all of the elements of the underlying data source. The data
* is fetched lazily page by page, where each page may contain multiple elements. A new page is
* Returns an iterable that traverses all the elements of the underlying data source. The data is
* fetched lazily page by page, where each page may contain multiple elements. A new page is
* fetched whenever the elements of any particular page are exhausted.
*/
Iterable<ResourceT> iterateAll();

/** Returns an iterable over the elements in this page. */
Iterable<ResourceT> getValues();

/**
* Returns a stream that traverses all the elements of the underlying data source. The data is
* fetched lazily page by page, where each page may contain multiple elements. A new page is
* fetched whenever the elements of any particular page are exhausted.
*/
default Stream<ResourceT> streamAll() {
return StreamSupport.stream(iterateAll().spliterator(), false);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@BenWhitehead Would you elaborate what's not ideal? Parallel Stream.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is great for a first step of getting it on the api and saving our customers from having to do the same hoop jumping.

In the case of future improvements, right now this approach will not allow of parallel processing the same way a natively parallel friendly stream would. In all actuality, Page isn't fully parallelizable due to the fact that each page contains the nextPageToken necessary in order to fetch the next page. But, Page could be more parallel friendly where processing of the actual resources can happen on a thread separate from the thread performing the actual get.

Copy link
Member

@suztomo suztomo Mar 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JoeWang1127 Do you have question or comment about parallelism? (I don't think of an implementation as of now and don't think we need to implement it right now)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, parallelism does not need to block this PR and can be an additional improvement in the future.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@suztomo I'm not understand about the separate thread in Ben's comment:

Page could be more parallel friendly where processing of the actual resources can happen on a thread separate from the thread performing the actual get.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the reference.

I don't have a question about it.

}

/** Returns a stream over the elements in this page. */
default Stream<ResourceT> streamValues() {
return StreamSupport.stream(getValues().spliterator(), false);
}
}
48 changes: 48 additions & 0 deletions gax-java/gax/src/test/java/com/google/api/gax/rpc/PagingTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,54 @@ public void pagedByPage() {
Truth.assertThat(requestCapture.getAllValues()).containsExactly(0, 2, 4).inOrder();
}

@Test
public void streamedByPage() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we update the test names based on the best practices and a recent TotT?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

ArgumentCaptor<Integer> requestCapture = ArgumentCaptor.forClass(Integer.class);
Mockito.when(callIntList.futureCall(requestCapture.capture(), Mockito.any()))
.thenReturn(ApiFutures.immediateFuture(Arrays.asList(0, 1, 2)))
.thenReturn(ApiFutures.immediateFuture(Arrays.asList(3, 4)))
.thenReturn(ApiFutures.immediateFuture(Collections.emptyList()));

Page<Integer> page =
FakeCallableFactory.createPagedCallable(
callIntList,
PagedCallSettings.newBuilder(new ListIntegersPagedResponseFactory()).build(),
clientContext)
.call(0)
.getPage();

Truth.assertThat(page.streamValues().count()).isEqualTo(3);
Truth.assertThat(page.hasNextPage()).isTrue();

page = page.getNextPage();
Truth.assertThat(page.streamValues().count()).isEqualTo(2);
Truth.assertThat(page.hasNextPage()).isTrue();

page = page.getNextPage();
Truth.assertThat(page.streamValues().count()).isEqualTo(0);
Truth.assertThat(page.hasNextPage()).isFalse();
Truth.assertThat(page.getNextPage()).isNull();
}

@Test
public void streamedAll() {
ArgumentCaptor<Integer> requestCapture = ArgumentCaptor.forClass(Integer.class);
Mockito.when(callIntList.futureCall(requestCapture.capture(), Mockito.any()))
.thenReturn(ApiFutures.immediateFuture(Arrays.asList(0, 1, 2)))
.thenReturn(ApiFutures.immediateFuture(Arrays.asList(3, 4)))
.thenReturn(ApiFutures.immediateFuture(Collections.emptyList()));

Page<Integer> page =
FakeCallableFactory.createPagedCallable(
callIntList,
PagedCallSettings.newBuilder(new ListIntegersPagedResponseFactory()).build(),
clientContext)
.call(0)
.getPage();

Truth.assertThat(page.streamAll().count()).isEqualTo(5);
}

@Test
public void pagedByFixedSizeCollection() {
ArgumentCaptor<Integer> requestCapture = ArgumentCaptor.forClass(Integer.class);
Expand Down