Skip to content

Commit

Permalink
Fix GCSUtils IT after Gcs gRPC launch (#32927)
Browse files Browse the repository at this point in the history
  • Loading branch information
Abacn authored Oct 25, 2024
1 parent 2dfa281 commit 6e3e70d
Show file tree
Hide file tree
Showing 3 changed files with 2 additions and 27 deletions.
1 change: 1 addition & 0 deletions .github/trigger_files/beam_PostCommit_Java.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{}
2 changes: 0 additions & 2 deletions sdks/java/extensions/google-cloud-platform-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,10 @@ task integrationTestKms(type: Test) {
group = "Verification"
def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
def gcpTempRoot = project.findProperty('gcpTempRootKms') ?: 'gs://temp-storage-for-end-to-end-tests-cmek'
def gcpGrpcTempRoot = project.findProperty('gcpGrpcTempRoot') ?: 'gs://gcs-grpc-team-apache-beam-testing'
def dataflowKmsKey = project.findProperty('dataflowKmsKey') ?: "projects/apache-beam-testing/locations/global/keyRings/beam-it/cryptoKeys/test"
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--project=${gcpProject}",
"--tempRoot=${gcpTempRoot}",
"--grpcTempRoot=${gcpGrpcTempRoot}",
"--dataflowKmsKey=${dataflowKmsKey}",
])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;

import com.google.protobuf.ByteString;
import java.io.IOException;
Expand All @@ -35,10 +34,7 @@
import org.apache.beam.sdk.extensions.gcp.util.GcsUtil.CreateOptions;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.testing.UsesKms;
Expand Down Expand Up @@ -99,8 +95,6 @@ public void testWriteAndReadGcsWithGrpc() throws IOException {
"%s/GcsUtilIT-%tF-%<tH-%<tM-%<tS-%<tL.testWriteAndReadGcsWithGrpc.txt";
final String testContent = "This is a test string.";

PipelineOptionsFactory.register(GcsGrpcOptions.class);

TestPipelineOptions options =
TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class);

Expand All @@ -112,17 +106,8 @@ public void testWriteAndReadGcsWithGrpc() throws IOException {
GcsUtil gcsUtil = gcsOptions.getGcsUtil();
assertNotNull(gcsUtil);

// Write a test file in a bucket without gRPC enabled.
// This assumes that GCS gRPC feature is not enabled in every bucket by default.
assertNotNull(options.getTempRoot());
String tempLocationWithoutGrpc = options.getTempRoot() + "/temp";
String wrongFilename = String.format(outputPattern, tempLocationWithoutGrpc, new Date());
assertThrows(IOException.class, () -> writeGcsTextFile(gcsUtil, wrongFilename, testContent));

// Write a test file in a bucket with gRPC enabled.
GcsGrpcOptions grpcOptions = options.as(GcsGrpcOptions.class);
assertNotNull(grpcOptions.getGrpcTempRoot());
String tempLocationWithGrpc = grpcOptions.getGrpcTempRoot() + "/temp";
String tempLocationWithGrpc = options.getTempRoot() + "/temp";
String filename = String.format(outputPattern, tempLocationWithGrpc, new Date());
writeGcsTextFile(gcsUtil, filename, testContent);

Expand All @@ -132,15 +117,6 @@ public void testWriteAndReadGcsWithGrpc() throws IOException {
gcsUtil.remove(Collections.singletonList(filename));
}

public interface GcsGrpcOptions extends PipelineOptions {
/** Get tempRoot in a gRPC-enabled bucket. */
@Description("TempRoot in a gRPC-enabled bucket")
String getGrpcTempRoot();

/** Set the tempRoot in a gRPC-enabled bucket. */
void setGrpcTempRoot(String grpcTempRoot);
}

void writeGcsTextFile(GcsUtil gcsUtil, String filename, String content) throws IOException {
GcsPath gcsPath = GcsPath.fromUri(filename);
try (WritableByteChannel channel =
Expand Down

0 comments on commit 6e3e70d

Please sign in to comment.