Skip to content

Commit

Permalink
Merge pull request #776 from garrettjonesgoogle/pubsub-alpha
Browse files Browse the repository at this point in the history
Adding bundling support for PublisherApi.publish
  • Loading branch information
garrettjonesgoogle committed Mar 22, 2016
2 parents e32afa6 + e369b3c commit 1a0e970
Show file tree
Hide file tree
Showing 10 changed files with 378 additions and 168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@

package com.google.gcloud.pubsub.spi.v1;

import com.google.api.gax.grpc.ApiCallSettings;
import com.google.api.gax.grpc.ApiCallable;
import com.google.api.gax.grpc.ApiCallable.BundlableApiCallableInfo;
import com.google.api.gax.grpc.BundlerFactory;
import com.google.api.gax.protobuf.PathTemplate;
import com.google.protobuf.Empty;
import com.google.pubsub.v1.DeleteTopicRequest;
Expand Down Expand Up @@ -65,9 +66,25 @@
*/
@javax.annotation.Generated("by GAPIC")
public class PublisherApi implements AutoCloseable {
// ========
// Members
// ========

private final ManagedChannel channel;
private final List<AutoCloseable> closeables = new ArrayList<>();

private final ApiCallable<Topic, Topic> createTopicCallable;
private final ApiCallable<PublishRequest, PublishResponse> publishCallable;
private final ApiCallable<GetTopicRequest, Topic> getTopicCallable;
private final ApiCallable<ListTopicsRequest, ListTopicsResponse> listTopicsCallable;
private final ApiCallable<ListTopicsRequest, Iterable<Topic>> listTopicsIterableCallable;
private final ApiCallable<ListTopicSubscriptionsRequest, ListTopicSubscriptionsResponse>
listTopicSubscriptionsCallable;
private final ApiCallable<ListTopicSubscriptionsRequest, Iterable<String>>
listTopicSubscriptionsIterableCallable;
private final ApiCallable<DeleteTopicRequest, Empty> deleteTopicCallable;

public static class ResourceNames {
private ResourceNames() {}

// =======================
// ResourceNames Constants
Expand All @@ -93,6 +110,8 @@ private ResourceNames() {}
private static final PathTemplate TOPIC_PATH_TEMPLATE =
PathTemplate.create("projects/{project}/topics/{topic}");

private ResourceNames() {}

// ==============================
// Resource Name Helper Functions
// ==============================
Expand Down Expand Up @@ -153,24 +172,6 @@ public static final String parseTopicFromTopicPath(String topicPath) {
}
}

// ========
// Members
// ========

private final ManagedChannel channel;
private final List<AutoCloseable> closeables = new ArrayList<>();

private final ApiCallable<Topic, Topic> createTopicCallable;
private final ApiCallable<PublishRequest, PublishResponse> publishCallable;
private final ApiCallable<GetTopicRequest, Topic> getTopicCallable;
private final ApiCallable<ListTopicsRequest, ListTopicsResponse> listTopicsCallable;
private final ApiCallable<ListTopicsRequest, Iterable<Topic>> listTopicsIterableCallable;
private final ApiCallable<ListTopicSubscriptionsRequest, ListTopicSubscriptionsResponse>
listTopicSubscriptionsCallable;
private final ApiCallable<ListTopicSubscriptionsRequest, Iterable<String>>
listTopicSubscriptionsIterableCallable;
private final ApiCallable<DeleteTopicRequest, Empty> deleteTopicCallable;

// ===============
// Factory Methods
// ===============
Expand All @@ -186,8 +187,9 @@ public static PublisherApi create() throws IOException {
}

/**
* Constructs an instance of PublisherApi, using the given settings. The channels are created based
* on the settings passed in, or defaults for any settings that are not set.
* Constructs an instance of PublisherApi, using the given settings.
* The channels are created based on the settings passed in, or defaults for any
* settings that are not set.
*
* <!-- manual edit -->
* <!-- end manual edit -->
Expand All @@ -197,8 +199,9 @@ public static PublisherApi create(PublisherSettings settings) throws IOException
}

/**
* Constructs an instance of PublisherApi, using the given settings. This is protected so that it
* easy to make a subclass, but otherwise, the static factory methods should be preferred.
* Constructs an instance of PublisherApi, using the given settings.
* This is protected so that it easy to make a subclass, but otherwise, the static
* factory methods should be preferred.
*
* <!-- manual edit -->
* <!-- end manual edit -->
Expand All @@ -207,7 +210,14 @@ protected PublisherApi(PublisherSettings settings) throws IOException {
this.channel = settings.getChannel();

this.createTopicCallable = settings.createTopicMethod().build(settings);
this.publishCallable = settings.publishMethod().build(settings);
BundlableApiCallableInfo<PublishRequest, PublishResponse> bundlablePublish =
settings.publishMethod().buildBundlable(settings);
this.publishCallable = bundlablePublish.getApiCallable();
BundlerFactory<PublishRequest, PublishResponse> publishBundlerFactory =
bundlablePublish.getBundlerFactory();
if (publishBundlerFactory != null) {
this.closeables.add(publishBundlerFactory);
}
this.getTopicCallable = settings.getTopicMethod().build(settings);
this.listTopicsCallable = settings.listTopicsMethod().build(settings);
this.listTopicsIterableCallable = settings.listTopicsMethod().buildPageStreaming(settings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@
import com.google.api.gax.core.RetryParams;
import com.google.api.gax.grpc.ApiCallSettings;
import com.google.api.gax.grpc.ApiCallable.ApiCallableBuilder;
import com.google.api.gax.grpc.ApiCallable.BundlableApiCallableBuilder;
import com.google.api.gax.grpc.ApiCallable.PageStreamingApiCallableBuilder;
import com.google.api.gax.grpc.PageDescriptor;
import com.google.api.gax.grpc.BundlingDescriptor;
import com.google.api.gax.grpc.PageStreamingDescriptor;
import com.google.api.gax.grpc.RequestIssuer;
import com.google.api.gax.grpc.ServiceApiSettings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand All @@ -56,8 +59,12 @@
import com.google.pubsub.v1.PublishRequest;
import com.google.pubsub.v1.PublishResponse;
import com.google.pubsub.v1.PublisherGrpc;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.Topic;
import io.grpc.Status;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Manually-added imports: add custom (non-generated) imports after this point.

Expand Down Expand Up @@ -132,9 +139,11 @@ public class PublisherSettings extends ServiceApiSettings {
RETRY_PARAM_DEFINITIONS = definitions.build();
}

private final MethodBuilders methods;

private static class MethodBuilders {
private final ApiCallableBuilder<Topic, Topic> createTopicMethod;
private final ApiCallableBuilder<PublishRequest, PublishResponse> publishMethod;
private final BundlableApiCallableBuilder<PublishRequest, PublishResponse> publishMethod;
private final ApiCallableBuilder<GetTopicRequest, Topic> getTopicMethod;
private final PageStreamingApiCallableBuilder<ListTopicsRequest, ListTopicsResponse, Topic>
listTopicsMethod;
Expand All @@ -149,7 +158,8 @@ public MethodBuilders() {
createTopicMethod.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("idempotent"));
createTopicMethod.setRetryParams(RETRY_PARAM_DEFINITIONS.get("default"));

publishMethod = new ApiCallableBuilder<>(PublisherGrpc.METHOD_PUBLISH);
publishMethod =
new BundlableApiCallableBuilder<>(PublisherGrpc.METHOD_PUBLISH, PUBLISH_BUNDLING_DESC);
publishMethod.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("non_idempotent"));
publishMethod.setRetryParams(RETRY_PARAM_DEFINITIONS.get("default"));

Expand Down Expand Up @@ -187,8 +197,6 @@ public MethodBuilders() {
}
}

private final MethodBuilders methods;

// ===============
// Factory Methods
// ===============
Expand All @@ -211,8 +219,9 @@ public static PublisherSettings create() {
}

/**
* Constructs an instance of PublisherSettings with default settings. This is protected so that it
* easy to make a subclass, but otherwise, the static factory methods should be preferred.
* Constructs an instance of PublisherSettings with default settings. This is protected
* so that it easy to make a subclass, but otherwise, the static factory methods should be
* preferred.
*
* <!-- manual edit -->
* <!-- end manual edit -->
Expand All @@ -223,7 +232,7 @@ protected PublisherSettings(MethodBuilders methods) {
}

/**
* Returns the ApiCallableBuilder for the API method createTopic.
* Returns the builder for the API method createTopic.
*
* <!-- manual edit -->
* <!-- end manual edit -->
Expand All @@ -233,17 +242,17 @@ public ApiCallableBuilder<Topic, Topic> createTopicMethod() {
}

/**
* Returns the ApiCallableBuilder for the API method publish.
* Returns the builder for the API method publish.
*
* <!-- manual edit -->
* <!-- end manual edit -->
*/
public ApiCallableBuilder<PublishRequest, PublishResponse> publishMethod() {
public BundlableApiCallableBuilder<PublishRequest, PublishResponse> publishMethod() {
return methods.publishMethod;
}

/**
* Returns the ApiCallableBuilder for the API method getTopic.
* Returns the builder for the API method getTopic.
*
* <!-- manual edit -->
* <!-- end manual edit -->
Expand All @@ -253,7 +262,7 @@ public ApiCallableBuilder<GetTopicRequest, Topic> getTopicMethod() {
}

/**
* Returns the PageStreamingApiCallableBuilder for the API method listTopics.
* Returns the builder for the API method listTopics.
*
* <!-- manual edit -->
* <!-- end manual edit -->
Expand All @@ -264,7 +273,7 @@ public ApiCallableBuilder<GetTopicRequest, Topic> getTopicMethod() {
}

/**
* Returns the PageStreamingApiCallableBuilder for the API method listTopicSubscriptions.
* Returns the builder for the API method listTopicSubscriptions.
*
* <!-- manual edit -->
* <!-- end manual edit -->
Expand All @@ -276,7 +285,7 @@ public ApiCallableBuilder<GetTopicRequest, Topic> getTopicMethod() {
}

/**
* Returns the ApiCallableBuilder for the API method deleteTopic.
* Returns the builder for the API method deleteTopic.
*
* <!-- manual edit -->
* <!-- end manual edit -->
Expand All @@ -285,9 +294,9 @@ public ApiCallableBuilder<DeleteTopicRequest, Empty> deleteTopicMethod() {
return methods.deleteTopicMethod;
}

private static PageDescriptor<ListTopicsRequest, ListTopicsResponse, Topic>
private static PageStreamingDescriptor<ListTopicsRequest, ListTopicsResponse, Topic>
LIST_TOPICS_PAGE_STR_DESC =
new PageDescriptor<ListTopicsRequest, ListTopicsResponse, Topic>() {
new PageStreamingDescriptor<ListTopicsRequest, ListTopicsResponse, Topic>() {
@Override
public Object emptyToken() {
return "";
Expand All @@ -309,10 +318,10 @@ public Iterable<Topic> extractResources(ListTopicsResponse payload) {
}
};

private static PageDescriptor<
private static PageStreamingDescriptor<
ListTopicSubscriptionsRequest, ListTopicSubscriptionsResponse, String>
LIST_TOPIC_SUBSCRIPTIONS_PAGE_STR_DESC =
new PageDescriptor<
new PageStreamingDescriptor<
ListTopicSubscriptionsRequest, ListTopicSubscriptionsResponse, String>() {
@Override
public Object emptyToken() {
Expand All @@ -337,4 +346,66 @@ public Iterable<String> extractResources(ListTopicSubscriptionsResponse payload)
return payload.getSubscriptionsList();
}
};

private static BundlingDescriptor<PublishRequest, PublishResponse> PUBLISH_BUNDLING_DESC =
new BundlingDescriptor<PublishRequest, PublishResponse>() {
@Override
public String getBundlePartitionKey(PublishRequest request) {
return request.getTopic();
}

@Override
public PublishRequest mergeRequests(Collection<PublishRequest> requests) {
PublishRequest firstRequest = requests.iterator().next();

List<PubsubMessage> elements = new ArrayList<>();
for (PublishRequest request : requests) {
elements.addAll(request.getMessagesList());
}

PublishRequest bundleRequest =
PublishRequest.newBuilder()
.setTopic(firstRequest.getTopic())
.addAllMessages(elements)
.build();
return bundleRequest;
}

@Override
public void splitResponse(
PublishResponse bundleResponse,
Collection<? extends RequestIssuer<PublishRequest, PublishResponse>> bundle) {
int bundleMessageIndex = 0;
for (RequestIssuer<PublishRequest, PublishResponse> responder : bundle) {
List<String> subresponseElements = new ArrayList<>();
int subresponseCount = responder.getRequest().getMessagesCount();
for (int i = 0; i < subresponseCount; i++) {
subresponseElements.add(bundleResponse.getMessageIds(bundleMessageIndex));
bundleMessageIndex += 1;
}
PublishResponse response =
PublishResponse.newBuilder().addAllMessageIds(subresponseElements).build();
responder.setResponse(response);
}
}

@Override
public void splitException(
Throwable throwable,
Collection<? extends RequestIssuer<PublishRequest, PublishResponse>> bundle) {
for (RequestIssuer<PublishRequest, PublishResponse> responder : bundle) {
responder.setException(throwable);
}
}

@Override
public long countElements(PublishRequest request) {
return request.getMessagesCount();
}

@Override
public long countBytes(PublishRequest request) {
return request.getSerializedSize();
}
};
}
Loading

0 comments on commit 1a0e970

Please sign in to comment.