Migrate gRPC channels to GCS v2 APIs #590

Merged
merged 14 commits on Aug 17, 2021
3 changes: 3 additions & 0 deletions .gitmodules
@@ -0,0 +1,3 @@
[submodule "gcsio/googleapis"]
path = gcsio/googleapis
url = https://github.com/googleapis/googleapis
16 changes: 11 additions & 5 deletions cloudbuild/cloudbuild.yaml
@@ -1,10 +1,16 @@
steps:
# 1. Create a Docker image containing hadoop-connectors repo
# 1. Clone git repo with submodules
- name: gcr.io/cloud-builders/git
id: 'git-clone'
args: [ 'submodule', 'update', '--init', '--recursive' ]

# 2. Create a Docker image containing hadoop-connectors repo
- name: 'gcr.io/cloud-builders/docker'
id: 'docker-build'
args: ['build', '--tag=gcr.io/$PROJECT_ID/dataproc-hadoop-connectors-presubmit', '-f', 'cloudbuild/Dockerfile', '.']
waitFor: ['git-clone']

# 2. Run Hadoop 2 unit tests concurrently
# 3. Run Hadoop 2 unit tests concurrently
- name: 'gcr.io/$PROJECT_ID/dataproc-hadoop-connectors-presubmit'
id: 'unit-tests-hadoop2'
waitFor: ['docker-build']
@@ -17,7 +23,7 @@ steps:
- 'VCS_TAG=$TAG_NAME'
- 'CI_BUILD_ID=$BUILD_ID'

# 3. Run Hadoop 3 unit tests concurrently
# 4. Run Hadoop 3 unit tests concurrently
- name: 'gcr.io/$PROJECT_ID/dataproc-hadoop-connectors-presubmit'
id: 'unit-tests-hadoop3'
waitFor: ['docker-build']
@@ -30,7 +36,7 @@ steps:
- 'VCS_TAG=$TAG_NAME'
- 'CI_BUILD_ID=$BUILD_ID'

# 4. Run Hadoop 2 integration tests concurrently with Hadoop 2 and Hadoop 3 unit tests
# 5. Run Hadoop 2 integration tests concurrently with Hadoop 2 and Hadoop 3 unit tests
- name: 'gcr.io/$PROJECT_ID/dataproc-hadoop-connectors-presubmit'
id: 'integration-tests-hadoop2'
waitFor: ['docker-build']
@@ -44,7 +50,7 @@ steps:
- 'VCS_TAG=$TAG_NAME'
- 'CI_BUILD_ID=$BUILD_ID'

# 5. Run Hadoop 3 integration tests concurrently with Hadoop 2 integration tests
# 6. Run Hadoop 3 integration tests concurrently with Hadoop 2 integration tests
- name: 'gcr.io/$PROJECT_ID/dataproc-hadoop-connectors-presubmit'
id: 'integration-tests-hadoop3'
waitFor: ['unit-tests-hadoop2', 'unit-tests-hadoop3']
1 change: 1 addition & 0 deletions gcsio/googleapis
Submodule googleapis added at 6cf3b8
1 change: 1 addition & 0 deletions gcsio/pom.xml
@@ -184,6 +184,7 @@
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<configuration>
<protoSourceRoot>googleapis/google/storage/v2</protoSourceRoot>
<protocArtifact>
com.google.protobuf:protoc:${google.protobuf.version}:exe:${os.detected.classifier}
</protocArtifact>
GoogleCloudStorageGrpcReadChannel.java
@@ -33,11 +33,11 @@
import com.google.common.base.Preconditions;
import com.google.common.flogger.GoogleLogger;
import com.google.common.hash.Hashing;
import com.google.google.storage.v1.GetObjectMediaRequest;
import com.google.google.storage.v1.GetObjectMediaResponse;
import com.google.google.storage.v1.StorageGrpc;
import com.google.google.storage.v1.StorageGrpc.StorageBlockingStub;
import com.google.protobuf.ByteString;
import com.google.storage.v2.ReadObjectRequest;
import com.google.storage.v2.ReadObjectResponse;
import com.google.storage.v2.StorageGrpc;
import com.google.storage.v2.StorageGrpc.StorageBlockingStub;
import io.grpc.Context;
import io.grpc.Context.CancellableContext;
import io.grpc.MethodDescriptor;
@@ -60,10 +60,10 @@ public class GoogleCloudStorageGrpcReadChannel implements SeekableByteChannel {

// ZeroCopy version of GetObjectMedia Method
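  // Rebuilding the descriptor with ZeroCopyMessageMarshaller lets gRPC surface each
  // ReadObjectResponse over the transport's own buffers instead of copying them; the stream
  // backing a response must then be closed once the message is consumed (see the note in
  // readObjectContentFromGCS below).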
private static final ZeroCopyMessageMarshaller getObjectMediaResponseMarshaller =
new ZeroCopyMessageMarshaller(GetObjectMediaResponse.getDefaultInstance());
private static final MethodDescriptor<GetObjectMediaRequest, GetObjectMediaResponse>
new ZeroCopyMessageMarshaller(ReadObjectResponse.getDefaultInstance());
private static final MethodDescriptor<ReadObjectRequest, ReadObjectResponse>
getObjectMediaMethod =
StorageGrpc.getGetObjectMediaMethod()
StorageGrpc.getReadObjectMethod()
.toBuilder()
.setResponseMarshaller(getObjectMediaResponseMarshaller)
.build();
@@ -104,7 +104,7 @@ public class GoogleCloudStorageGrpcReadChannel implements SeekableByteChannel {
@Nullable private InputStream streamForBufferedContent = null;

// The streaming read operation. If null, there is not an in-flight read in progress.
@Nullable private Iterator<GetObjectMediaResponse> resIterator = null;
@Nullable private Iterator<ReadObjectResponse> resIterator = null;

// Fine-grained options.
private final GoogleCloudStorageReadOptions readOptions;
@@ -256,20 +256,20 @@ private static ByteString getFooterContent(
long footerOffset)
throws IOException {
try {
Iterator<GetObjectMediaResponse> footerContentResponse =
Iterator<ReadObjectResponse> footerContentResponse =
stub.withDeadlineAfter(readOptions.getGrpcReadTimeoutMillis(), MILLISECONDS)
.getObjectMedia(
GetObjectMediaRequest.newBuilder()
.readObject(
ReadObjectRequest.newBuilder()
.setReadOffset(footerOffset)
.setBucket(resourceId.getBucketName())
.setBucket(StringPaths.toV2BucketName(resourceId.getBucketName()))
.setObject(resourceId.getObjectName())
.build());

ByteString footerContent = null;
while (footerContentResponse.hasNext()) {
GetObjectMediaResponse objectMediaResponse = footerContentResponse.next();
if (objectMediaResponse.hasChecksummedData()) {
ByteString content = objectMediaResponse.getChecksummedData().getContent();
ReadObjectResponse readObjectResponse = footerContentResponse.next();
if (readObjectResponse.hasChecksummedData()) {
ByteString content = readObjectResponse.getChecksummedData().getContent();
if (footerContent == null) {
footerContent = content;
} else {
@@ -439,7 +439,8 @@ private boolean isByteBufferBeyondCurrentRequestRange(ByteBuffer byteBuffer) {
private int readObjectContentFromGCS(ByteBuffer byteBuffer) throws IOException {
int bytesRead = 0;
while (moreServerContent() && byteBuffer.hasRemaining()) {
GetObjectMediaResponse res = resIterator.next();
ReadObjectResponse res = resIterator.next();

// When the zero-copy marshaller is used, the stream that backs ReadObjectResponse
// should be closed when the message is no longer needed so that all buffers in the
// stream can be reclaimed. If zero-copy is not used, the stream will be null.
@@ -484,11 +485,11 @@ private int readObjectContentFromGCS(ByteBuffer byteBuffer) throws IOException {
return bytesRead;
}

private void validateChecksum(GetObjectMediaResponse res) throws IOException {
private void validateChecksum(ReadObjectResponse res) throws IOException {
// TODO: Concatenate all these hashes together and compare the result at the end.
int calculatedChecksum =
Hashing.crc32c().hashBytes(res.getChecksummedData().getContent().toByteArray()).asInt();
int expectedChecksum = res.getChecksummedData().getCrc32C().getValue();
int expectedChecksum = res.getChecksummedData().getCrc32C();
if (calculatedChecksum != expectedChecksum) {
throw new IOException(
String.format(
@@ -534,14 +535,14 @@ private int readFooterContentIntoBuffer(ByteBuffer byteBuffer) {
}

private void requestObjectMedia(OptionalLong bytesToRead) throws IOException {
GetObjectMediaRequest.Builder requestBuilder =
GetObjectMediaRequest.newBuilder()
.setBucket(resourceId.getBucketName())
ReadObjectRequest.Builder requestBuilder =
ReadObjectRequest.newBuilder()
.setBucket(StringPaths.toV2BucketName(resourceId.getBucketName()))
.setObject(resourceId.getObjectName())
.setGeneration(objectGeneration)
.setReadOffset(positionInGrpcStream);
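    // The v2 ReadObjectRequest addresses the bucket by its full resource name (roughly
    // "projects/_/buckets/<bucket>"), which StringPaths.toV2BucketName is presumably
    // deriving from the plain bucket name above.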
bytesToRead.ifPresent(requestBuilder::setReadLimit);
GetObjectMediaRequest request = requestBuilder.build();
ReadObjectRequest request = requestBuilder.build();
try {
ResilientOperation.retry(
() -> {
@@ -559,7 +560,7 @@ private void requestObjectMedia(OptionalLong bytesToRead) throws IOException {
blockingStub.getCallOptions(),
request);
} else {
resIterator = blockingStub.getObjectMedia(request);
resIterator = blockingStub.readObject(request);
}
} finally {
requestContext.detach(toReattach);
@@ -642,8 +643,7 @@ public long position() throws IOException {
}
// Our real position is tracked in "positionInGrpcStream," but if the user is skipping
// forwards
// a bit, we
// pretend we're at the new position already.
// a bit, we pretend we're at the new position already.
return positionInGrpcStream + bytesToSkipBeforeReading;
}
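
Taken together, the changes in GoogleCloudStorageGrpcReadChannel swap the v1 GetObjectMedia surface for the v2 ReadObject one: renamed request/response messages, a full bucket resource name instead of a bare bucket string, and a per-chunk CRC32C that is now a plain int field. The following is a minimal, self-contained sketch of that v2 read path, assuming an already-configured StorageBlockingStub and using a toy toV2BucketName helper as a stand-in for the connector's StringPaths utility; it illustrates the shape of the API, not the connector's actual implementation.

import com.google.common.hash.Hashing;
import com.google.protobuf.ByteString;
import com.google.storage.v2.ReadObjectRequest;
import com.google.storage.v2.ReadObjectResponse;
import com.google.storage.v2.StorageGrpc.StorageBlockingStub;
import java.io.IOException;
import java.util.Iterator;

final class GcsV2ReadSketch {

  // Stand-in for StringPaths.toV2BucketName: v2 addresses buckets by resource name.
  private static String toV2BucketName(String bucketName) {
    return "projects/_/buckets/" + bucketName;
  }

  static ByteString readAll(
      StorageBlockingStub stub, String bucket, String object, long generation)
      throws IOException {
    ReadObjectRequest request =
        ReadObjectRequest.newBuilder()
            .setBucket(toV2BucketName(bucket)) // v1 took the bare bucket name here
            .setObject(object)
            .setGeneration(generation)
            .setReadOffset(0)
            .build();

    // readObject replaces the v1 getObjectMedia RPC and streams the object in chunks.
    Iterator<ReadObjectResponse> responses = stub.readObject(request);

    ByteString content = ByteString.EMPTY;
    while (responses.hasNext()) {
      ReadObjectResponse res = responses.next();
      if (!res.hasChecksummedData()) {
        continue;
      }
      ByteString chunk = res.getChecksummedData().getContent();
      // In v2 the per-chunk CRC32C is a plain int field (v1 wrapped it, hence the old getValue() call).
      int expected = res.getChecksummedData().getCrc32C();
      int actual = Hashing.crc32c().hashBytes(chunk.toByteArray()).asInt();
      if (actual != expected) {
        throw new IOException(String.format("CRC32C mismatch reading %s/%s", bucket, object));
      }
      content = content.concat(chunk);
    }
    return content;
  }
}

The connector's actual channel setup, retry handling, and zero-copy marshalling (shown in the diff above) sit on top of this same call shape.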
