Skip to content

Commit

Permalink
Migrate gRPC channels to GCS v2 APIs (#590)
Browse files Browse the repository at this point in the history
  • Loading branch information
mprashanthsagar committed Aug 17, 2021
1 parent 96acdcd commit 9e14bea
Show file tree
Hide file tree
Showing 14 changed files with 751 additions and 3,362 deletions.
2 changes: 2 additions & 0 deletions bigquery/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@
<include>com.google.http-client</include>
<include>com.google.oauth-client</include>
<include>com.google.protobuf</include>
<include>com.google.storage.v2</include>
<include>io.grpc</include>
<include>io.opencensus</include>
<include>io.perfmark</include>
Expand All @@ -306,6 +307,7 @@
<include>com.google.longrunning.**</include>
<include>com.google.protobuf.**</include>
<include>com.google.rpc.**</include>
<include>com.google.storage.**</include>
</includes>
<excludes>
<exclude>com.google.cloud.hadoop.util.AccessTokenProvider</exclude>
Expand Down
2 changes: 2 additions & 0 deletions gcs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@
<include>com.google.http-client</include>
<include>com.google.oauth-client</include>
<include>com.google.protobuf</include>
<include>com.google.storage.v2</include>
<!-- Needed by http client and versions conflict -->
<include>commons-codec:commons-codec</include>
<include>io.grpc</include>
Expand Down Expand Up @@ -297,6 +298,7 @@
<include>com.google.protobuf.**</include>
<include>com.google.thirdparty.**</include>
<include>com.google.type.**</include>
<include>com.google.storage.**</include>
</includes>
<excludes>
<exclude>com.google.cloud.hadoop.gcsio.authorization.AuthorizationHandler</exclude>
Expand Down
29 changes: 4 additions & 25 deletions gcsio/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
</dependency>
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>grpc-google-cloud-storage-v2</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
Expand Down Expand Up @@ -167,10 +171,6 @@
</dependencies>
<build>
<plugins>
<plugin>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
Expand All @@ -180,27 +180,6 @@
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<configuration>
<protocArtifact>
com.google.protobuf:protoc:${google.protobuf.version}:exe:${os.detected.classifier}
</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>
io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@
import com.google.common.base.Preconditions;
import com.google.common.flogger.GoogleLogger;
import com.google.common.hash.Hashing;
import com.google.google.storage.v1.GetObjectMediaRequest;
import com.google.google.storage.v1.GetObjectMediaResponse;
import com.google.google.storage.v1.StorageGrpc;
import com.google.google.storage.v1.StorageGrpc.StorageBlockingStub;
import com.google.protobuf.ByteString;
import com.google.storage.v2.ReadObjectRequest;
import com.google.storage.v2.ReadObjectResponse;
import com.google.storage.v2.StorageGrpc;
import com.google.storage.v2.StorageGrpc.StorageBlockingStub;
import io.grpc.Context;
import io.grpc.Context.CancellableContext;
import io.grpc.MethodDescriptor;
Expand All @@ -60,10 +60,10 @@ public class GoogleCloudStorageGrpcReadChannel implements SeekableByteChannel {

// ZeroCopy version of GetObjectMedia Method
private static final ZeroCopyMessageMarshaller getObjectMediaResponseMarshaller =
new ZeroCopyMessageMarshaller(GetObjectMediaResponse.getDefaultInstance());
private static final MethodDescriptor<GetObjectMediaRequest, GetObjectMediaResponse>
new ZeroCopyMessageMarshaller(ReadObjectResponse.getDefaultInstance());
private static final MethodDescriptor<ReadObjectRequest, ReadObjectResponse>
getObjectMediaMethod =
StorageGrpc.getGetObjectMediaMethod()
StorageGrpc.getReadObjectMethod()
.toBuilder()
.setResponseMarshaller(getObjectMediaResponseMarshaller)
.build();
Expand Down Expand Up @@ -104,7 +104,7 @@ public class GoogleCloudStorageGrpcReadChannel implements SeekableByteChannel {
@Nullable private InputStream streamForBufferedContent = null;

// The streaming read operation. If null, there is not an in-flight read in progress.
@Nullable private Iterator<GetObjectMediaResponse> resIterator = null;
@Nullable private Iterator<ReadObjectResponse> resIterator = null;

// Fine-grained options.
private final GoogleCloudStorageReadOptions readOptions;
Expand Down Expand Up @@ -256,20 +256,20 @@ private static ByteString getFooterContent(
long footerOffset)
throws IOException {
try {
Iterator<GetObjectMediaResponse> footerContentResponse =
Iterator<ReadObjectResponse> footerContentResponse =
stub.withDeadlineAfter(readOptions.getGrpcReadTimeoutMillis(), MILLISECONDS)
.getObjectMedia(
GetObjectMediaRequest.newBuilder()
.readObject(
ReadObjectRequest.newBuilder()
.setReadOffset(footerOffset)
.setBucket(resourceId.getBucketName())
.setBucket(GrpcChannelUtils.toV2BucketName(resourceId.getBucketName()))
.setObject(resourceId.getObjectName())
.build());

ByteString footerContent = null;
while (footerContentResponse.hasNext()) {
GetObjectMediaResponse objectMediaResponse = footerContentResponse.next();
if (objectMediaResponse.hasChecksummedData()) {
ByteString content = objectMediaResponse.getChecksummedData().getContent();
ReadObjectResponse readObjectResponse = footerContentResponse.next();
if (readObjectResponse.hasChecksummedData()) {
ByteString content = readObjectResponse.getChecksummedData().getContent();
if (footerContent == null) {
footerContent = content;
} else {
Expand Down Expand Up @@ -439,7 +439,8 @@ private boolean isByteBufferBeyondCurrentRequestRange(ByteBuffer byteBuffer) {
private int readObjectContentFromGCS(ByteBuffer byteBuffer) throws IOException {
int bytesRead = 0;
while (moreServerContent() && byteBuffer.hasRemaining()) {
GetObjectMediaResponse res = resIterator.next();
ReadObjectResponse res = resIterator.next();

// When zero-copy mashaller is used, the stream that backs GetObjectMediaResponse
// should be closed when the mssage is no longed needed so that all buffers in the
// stream can be reclaimed. If zero-copy is not used, stream will be null.
Expand Down Expand Up @@ -484,11 +485,11 @@ private int readObjectContentFromGCS(ByteBuffer byteBuffer) throws IOException {
return bytesRead;
}

private void validateChecksum(GetObjectMediaResponse res) throws IOException {
private void validateChecksum(ReadObjectResponse res) throws IOException {
// TODO: Concatenate all these hashes together and compare the result at the end.
int calculatedChecksum =
Hashing.crc32c().hashBytes(res.getChecksummedData().getContent().toByteArray()).asInt();
int expectedChecksum = res.getChecksummedData().getCrc32C().getValue();
int expectedChecksum = res.getChecksummedData().getCrc32C();
if (calculatedChecksum != expectedChecksum) {
throw new IOException(
String.format(
Expand Down Expand Up @@ -534,14 +535,14 @@ private int readFooterContentIntoBuffer(ByteBuffer byteBuffer) {
}

private void requestObjectMedia(OptionalLong bytesToRead) throws IOException {
GetObjectMediaRequest.Builder requestBuilder =
GetObjectMediaRequest.newBuilder()
.setBucket(resourceId.getBucketName())
ReadObjectRequest.Builder requestBuilder =
ReadObjectRequest.newBuilder()
.setBucket(GrpcChannelUtils.toV2BucketName(resourceId.getBucketName()))
.setObject(resourceId.getObjectName())
.setGeneration(objectGeneration)
.setReadOffset(positionInGrpcStream);
bytesToRead.ifPresent(requestBuilder::setReadLimit);
GetObjectMediaRequest request = requestBuilder.build();
ReadObjectRequest request = requestBuilder.build();
try {
ResilientOperation.retry(
() -> {
Expand All @@ -559,7 +560,7 @@ private void requestObjectMedia(OptionalLong bytesToRead) throws IOException {
blockingStub.getCallOptions(),
request);
} else {
resIterator = blockingStub.getObjectMedia(request);
resIterator = blockingStub.readObject(request);
}
} finally {
requestContext.detach(toReattach);
Expand Down Expand Up @@ -641,9 +642,7 @@ public long position() throws IOException {
throw new ClosedChannelException();
}
// Our real position is tracked in "positionInGrpcStream," but if the user is skipping
// forwards
// a bit, we
// pretend we're at the new position already.
// forwards a bit, we pretend we're at the new position already.
return positionInGrpcStream + bytesToSkipBeforeReading;
}

Expand Down
Loading

0 comments on commit 9e14bea

Please sign in to comment.