diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json
index a03c067d2c4e..1efc8e9e4405 100644
--- a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json
+++ b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json
@@ -1,3 +1,4 @@
{
- "comment": "Modify this file in a trivial way to cause this test suite to run"
+ "comment": "Modify this file in a trivial way to cause this test suite to run",
+ "modification": 1
}
diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
index b03350966d6c..e7990e2f1085 100644
--- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
+++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
@@ -70,6 +70,10 @@ message ManagedTransforms {
"beam:schematransform:org.apache.beam:kafka_read:v1"];
KAFKA_WRITE = 3 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:schematransform:org.apache.beam:kafka_write:v1"];
+ BIGQUERY_READ = 4 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+ "beam:schematransform:org.apache.beam:bigquery_storage_read:v1"];
+ BIGQUERY_WRITE = 5 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+ "beam:schematransform:org.apache.beam:bigquery_storage_write:v2"];
}
}
diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle
index 3e322d976c1a..2acce3e94cc2 100644
--- a/sdks/java/io/google-cloud-platform/build.gradle
+++ b/sdks/java/io/google-cloud-platform/build.gradle
@@ -159,6 +159,7 @@ dependencies {
testImplementation project(path: ":sdks:java:extensions:google-cloud-platform-core", configuration: "testRuntimeMigration")
testImplementation project(path: ":sdks:java:extensions:protobuf", configuration: "testRuntimeMigration")
testImplementation project(path: ":runners:direct-java", configuration: "shadow")
+ testImplementation project(":sdks:java:managed")
testImplementation project(path: ":sdks:java:io:common")
testImplementation project(path: ":sdks:java:testing:test-utils")
testImplementation library.java.commons_math3
diff --git a/sdks/java/io/google-cloud-platform/expansion-service/build.gradle b/sdks/java/io/google-cloud-platform/expansion-service/build.gradle
index 1288d91964e1..f6c6f07d0cdf 100644
--- a/sdks/java/io/google-cloud-platform/expansion-service/build.gradle
+++ b/sdks/java/io/google-cloud-platform/expansion-service/build.gradle
@@ -36,6 +36,9 @@ dependencies {
permitUnusedDeclared project(":sdks:java:io:google-cloud-platform") // BEAM-11761
implementation project(":sdks:java:extensions:schemaio-expansion-service")
permitUnusedDeclared project(":sdks:java:extensions:schemaio-expansion-service") // BEAM-11761
+ implementation project(":sdks:java:managed")
+ permitUnusedDeclared project(":sdks:java:managed") // BEAM-11761
+
runtimeOnly library.java.slf4j_jdk14
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformConfiguration.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformConfiguration.java
deleted file mode 100644
index f634b5ec6f60..000000000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformConfiguration.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.bigquery;
-
-import com.google.auto.value.AutoValue;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
-import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
-
-/**
- * Configuration for writing to BigQuery.
- *
- * <p>This class is meant to be used with {@link BigQueryFileLoadsWriteSchemaTransformProvider}.
- *
- * <p>Internal only: This class is actively being worked on, and it will likely change. We
- * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
- * repository.
- */
-@DefaultSchema(AutoValueSchema.class)
-@AutoValue
-public abstract class BigQueryFileLoadsWriteSchemaTransformConfiguration {
-
- /** Instantiates a {@link BigQueryFileLoadsWriteSchemaTransformConfiguration.Builder}. */
- public static Builder builder() {
- return new AutoValue_BigQueryFileLoadsWriteSchemaTransformConfiguration.Builder();
- }
-
- /**
- * Writes to the given table specification. See {@link BigQueryIO.Write#to(String)}} for the
- * expected format.
- */
- public abstract String getTableSpec();
-
- /** Specifies whether the table should be created if it does not exist. */
- public abstract String getCreateDisposition();
-
- /** Specifies what to do with existing data in the table, in case the table already exists. */
- public abstract String getWriteDisposition();
-
- @AutoValue.Builder
- public abstract static class Builder {
-
- /**
- * Writes to the given table specification. See {@link BigQueryIO.Write#to(String)}} for the
- * expected format.
- */
- public abstract Builder setTableSpec(String value);
-
- /** Specifies whether the table should be created if it does not exist. */
- public abstract Builder setCreateDisposition(String value);
-
- /** Specifies what to do with existing data in the table, in case the table already exists. */
- public abstract Builder setWriteDisposition(String value);
-
- /** Builds the {@link BigQueryFileLoadsWriteSchemaTransformConfiguration} configuration. */
- public abstract BigQueryFileLoadsWriteSchemaTransformConfiguration build();
- }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java
index 3212e2a30348..7c89cb09ef6d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java
@@ -17,34 +17,25 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
import com.google.auto.service.AutoService;
-import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryWriteConfiguration;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
-import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
/**
* An implementation of {@link TypedSchemaTransformProvider} for BigQuery write jobs configured
- * using {@link BigQueryFileLoadsWriteSchemaTransformConfiguration}.
+ * using {@link BigQueryWriteConfiguration}.
*
* <p>Internal only: This class is actively being worked on, and it will likely change. We
* provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
@@ -56,127 +47,78 @@
@Internal
@AutoService(SchemaTransformProvider.class)
public class BigQueryFileLoadsWriteSchemaTransformProvider
- extends TypedSchemaTransformProvider {
+ extends TypedSchemaTransformProvider {
private static final String IDENTIFIER =
- "beam:schematransform:org.apache.beam:bigquery_fileloads_write:v1";
- static final String INPUT_TAG = "INPUT";
+ "beam:schematransform:org.apache.beam:bigquery_fileloads:v1";
+ static final String INPUT_TAG = "input";
- /** Returns the expected class of the configuration. */
@Override
- protected Class configurationClass() {
- return BigQueryFileLoadsWriteSchemaTransformConfiguration.class;
- }
-
- /** Returns the expected {@link SchemaTransform} of the configuration. */
- @Override
- protected SchemaTransform from(BigQueryFileLoadsWriteSchemaTransformConfiguration configuration) {
+ protected SchemaTransform from(BigQueryWriteConfiguration configuration) {
return new BigQueryWriteSchemaTransform(configuration);
}
- /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
@Override
public String identifier() {
return IDENTIFIER;
}
- /**
- * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since a
- * single is expected, this returns a list with a single name.
- */
@Override
public List inputCollectionNames() {
return Collections.singletonList(INPUT_TAG);
}
- /**
- * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
- * no output is expected, this returns an empty list.
- */
@Override
public List outputCollectionNames() {
return Collections.emptyList();
}
- /**
- * A {@link SchemaTransform} that performs {@link BigQueryIO.Write}s based on a {@link
- * BigQueryFileLoadsWriteSchemaTransformConfiguration}.
- */
protected static class BigQueryWriteSchemaTransform extends SchemaTransform {
/** An instance of {@link BigQueryServices} used for testing. */
private BigQueryServices testBigQueryServices = null;
- private final BigQueryFileLoadsWriteSchemaTransformConfiguration configuration;
+ private final BigQueryWriteConfiguration configuration;
- BigQueryWriteSchemaTransform(BigQueryFileLoadsWriteSchemaTransformConfiguration configuration) {
+ BigQueryWriteSchemaTransform(BigQueryWriteConfiguration configuration) {
+ // Run the configuration's own validation before storing it, i.e. at construction time
+ // rather than during expansion.
+ configuration.validate();
this.configuration = configuration;
}
+ /**
+ * Applies the {@link BigQueryIO.Write} built by {@code toWrite()} to the single input
+ * PCollection and returns an empty tuple, since this transform declares no output collections.
+ */
@Override
- public void validate(PipelineOptions options) {
- if (!configuration.getCreateDisposition().equals(CreateDisposition.CREATE_NEVER.name())) {
- return;
- }
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ PCollection rowPCollection = input.getSinglePCollection();
+ BigQueryIO.Write write = toWrite();
+ rowPCollection.apply(write);
- BigQueryOptions bigQueryOptions = options.as(BigQueryOptions.class);
+ return PCollectionRowTuple.empty(input.getPipeline());
+ }
- BigQueryServices bigQueryServices = new BigQueryServicesImpl();
- if (testBigQueryServices != null) {
- bigQueryServices = testBigQueryServices;
+ /**
+ * Builds the {@link BigQueryIO.Write} for this transform from the configuration: a FILE_LOADS
+ * write to the configured table that formats rows with {@link BigQueryUtils#toTableRow()} and
+ * uses the Beam schema. Create/write dispositions and the KMS key are applied only when they
+ * are set; disposition names are matched case-insensitively. Test services, when injected, are
+ * applied last.
+ *
+ * @throws IllegalArgumentException if a configured disposition does not name an enum constant
+ */
+ BigQueryIO.Write toWrite() {
+ BigQueryIO.Write write =
+ BigQueryIO.write()
+ .to(configuration.getTable())
+ .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
+ .withFormatFunction(BigQueryUtils.toTableRow())
+ .useBeamSchema();
+
+ if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) {
+ // Upper-case with Locale.ROOT so the enum lookup does not depend on the JVM default locale
+ // (under a Turkish locale "i" upper-cases to a dotted capital I and valueOf would fail).
+ CreateDisposition createDisposition =
+ CreateDisposition.valueOf(
+ configuration.getCreateDisposition().toUpperCase(java.util.Locale.ROOT));
+ write = write.withCreateDisposition(createDisposition);
}
-
- DatasetService datasetService = bigQueryServices.getDatasetService(bigQueryOptions);
- TableReference tableReference = BigQueryUtils.toTableReference(configuration.getTableSpec());
-
- try {
- Table table = datasetService.getTable(tableReference);
- if (table == null) {
- throw new NullPointerException();
- }
-
- if (table.getSchema() == null) {
- throw new InvalidConfigurationException(
- String.format("could not fetch schema for table: %s", configuration.getTableSpec()));
- }
-
- } catch (NullPointerException | InterruptedException | IOException ex) {
- throw new InvalidConfigurationException(
- String.format(
- "could not fetch table %s, error: %s",
- configuration.getTableSpec(), ex.getMessage()));
+ if (!Strings.isNullOrEmpty(configuration.getWriteDisposition())) {
+ // Same locale-independent normalization as for the create disposition.
+ WriteDisposition writeDisposition =
+ WriteDisposition.valueOf(
+ configuration.getWriteDisposition().toUpperCase(java.util.Locale.ROOT));
+ write = write.withWriteDisposition(writeDisposition);
+ }
+ if (!Strings.isNullOrEmpty(configuration.getKmsKey())) {
+ write = write.withKmsKey(configuration.getKmsKey());
}
- }
-
- @Override
- public PCollectionRowTuple expand(PCollectionRowTuple input) {
- validate(input);
- PCollection rowPCollection = input.get(INPUT_TAG);
- Schema schema = rowPCollection.getSchema();
- BigQueryIO.Write write = toWrite(schema);
if (testBigQueryServices != null) {
write = write.withTestServices(testBigQueryServices);
}
- PCollection tableRowPCollection =
- rowPCollection.apply(
- MapElements.into(TypeDescriptor.of(TableRow.class)).via(BigQueryUtils::toTableRow));
- tableRowPCollection.apply(write);
- return PCollectionRowTuple.empty(input.getPipeline());
- }
-
- /** Instantiates a {@link BigQueryIO.Write} from a {@link Schema}. */
- BigQueryIO.Write toWrite(Schema schema) {
- TableSchema tableSchema = BigQueryUtils.toTableSchema(schema);
- CreateDisposition createDisposition =
- CreateDisposition.valueOf(configuration.getCreateDisposition());
- WriteDisposition writeDisposition =
- WriteDisposition.valueOf(configuration.getWriteDisposition());
-
- return BigQueryIO.writeTableRows()
- .to(configuration.getTableSpec())
- .withCreateDisposition(createDisposition)
- .withWriteDisposition(writeDisposition)
- .withSchema(tableSchema);
+ return write;
}
/** Setter for testing using {@link BigQueryServices}. */
@@ -184,73 +126,5 @@ BigQueryIO.Write toWrite(Schema schema) {
void setTestBigQueryServices(BigQueryServices testBigQueryServices) {
this.testBigQueryServices = testBigQueryServices;
}
-
- /** Validate a {@link PCollectionRowTuple} input. */
- void validate(PCollectionRowTuple input) {
- if (!input.has(INPUT_TAG)) {
- throw new IllegalArgumentException(
- String.format(
- "%s %s is missing expected tag: %s",
- getClass().getSimpleName(), input.getClass().getSimpleName(), INPUT_TAG));
- }
-
- PCollection rowInput = input.get(INPUT_TAG);
- Schema sourceSchema = rowInput.getSchema();
-
- if (sourceSchema == null) {
- throw new IllegalArgumentException(
- String.format("%s is null for input of tag: %s", Schema.class, INPUT_TAG));
- }
-
- if (!configuration.getCreateDisposition().equals(CreateDisposition.CREATE_NEVER.name())) {
- return;
- }
-
- BigQueryOptions bigQueryOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
-
- BigQueryServices bigQueryServices = new BigQueryServicesImpl();
- if (testBigQueryServices != null) {
- bigQueryServices = testBigQueryServices;
- }
-
- DatasetService datasetService = bigQueryServices.getDatasetService(bigQueryOptions);
- TableReference tableReference = BigQueryUtils.toTableReference(configuration.getTableSpec());
-
- try {
- Table table = datasetService.getTable(tableReference);
- if (table == null) {
- throw new NullPointerException();
- }
-
- TableSchema tableSchema = table.getSchema();
- if (tableSchema == null) {
- throw new NullPointerException();
- }
-
- Schema destinationSchema = BigQueryUtils.fromTableSchema(tableSchema);
- if (destinationSchema == null) {
- throw new NullPointerException();
- }
-
- validateMatching(sourceSchema, destinationSchema);
-
- } catch (NullPointerException | InterruptedException | IOException e) {
- throw new InvalidConfigurationException(
- String.format(
- "could not validate input for create disposition: %s and table: %s, error: %s",
- configuration.getCreateDisposition(),
- configuration.getTableSpec(),
- e.getMessage()));
- }
- }
-
- void validateMatching(Schema sourceSchema, Schema destinationSchema) {
- if (!sourceSchema.equals(destinationSchema)) {
- throw new IllegalArgumentException(
- String.format(
- "source and destination schema mismatch for table: %s",
- configuration.getTableSpec()));
- }
- }
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformTranslation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformTranslation.java
new file mode 100644
index 000000000000..102a1840e177
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformTranslation.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryDirectReadSchemaTransformProvider.BigQueryDirectReadSchemaTransform;
+import static org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransform;
+
+import com.google.auto.service.AutoService;
+import java.util.Map;
+import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryDirectReadSchemaTransformProvider;
+import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformTranslation;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.construction.PTransformTranslation;
+import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * Payload translators for the BigQuery Storage API read and write SchemaTransforms, plus the
+ * registrar that exposes them through {@link TransformPayloadTranslatorRegistrar}.
+ *
+ * <p>Each translator pairs a transform class with its {@link SchemaTransformProvider} and with the
+ * configuration {@link Row} obtained from the transform's {@code getConfigurationRow()}.
+ */
+public class BigQuerySchemaTransformTranslation {
+  /** Translator for {@link BigQueryDirectReadSchemaTransform} (Storage API read). */
+  public static class BigQueryStorageReadSchemaTransformTranslator
+      extends SchemaTransformTranslation.SchemaTransformPayloadTranslator<
+          BigQueryDirectReadSchemaTransform> {
+    /** Returns a new {@link BigQueryDirectReadSchemaTransformProvider}. */
+    @Override
+    public SchemaTransformProvider provider() {
+      return new BigQueryDirectReadSchemaTransformProvider();
+    }
+
+    /** Returns the configuration row reported by the given transform. */
+    @Override
+    public Row toConfigRow(BigQueryDirectReadSchemaTransform transform) {
+      return transform.getConfigurationRow();
+    }
+  }
+
+  /** Translator for {@link BigQueryStorageWriteApiSchemaTransform} (Storage Write API). */
+  public static class BigQueryStorageWriteSchemaTransformTranslator
+      extends SchemaTransformTranslation.SchemaTransformPayloadTranslator<
+          BigQueryStorageWriteApiSchemaTransform> {
+    /** Returns a new {@link BigQueryStorageWriteApiSchemaTransformProvider}. */
+    @Override
+    public SchemaTransformProvider provider() {
+      return new BigQueryStorageWriteApiSchemaTransformProvider();
+    }
+
+    /** Returns the configuration row reported by the given transform. */
+    @Override
+    public Row toConfigRow(BigQueryStorageWriteApiSchemaTransform transform) {
+      return transform.getConfigurationRow();
+    }
+  }
+
+  /** Registers the read and write translators above, keyed by their transform class. */
+  @AutoService(TransformPayloadTranslatorRegistrar.class)
+  public static class ReadWriteRegistrar implements TransformPayloadTranslatorRegistrar {
+    @Override
+    @SuppressWarnings({
+      "rawtypes",
+    })
+    public Map<
+            ? extends Class<? extends PTransform>,
+            ? extends PTransformTranslation.TransformPayloadTranslator>
+        getTransformPayloadTranslators() {
+      // Explicit type arguments are needed: the two entries have different translator types, so
+      // the builder must be typed to the common raw supertype.
+      return ImmutableMap
+          .<Class<? extends PTransform>, PTransformTranslation.TransformPayloadTranslator>builder()
+          .put(
+              BigQueryDirectReadSchemaTransform.class,
+              new BigQueryStorageReadSchemaTransformTranslator())
+          .put(
+              BigQueryStorageWriteApiSchemaTransform.class,
+              new BigQueryStorageWriteSchemaTransformTranslator())
+          .build();
+    }
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProvider.java
index 8b8e8179ce7d..76ba186d2e82 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProvider.java
@@ -33,7 +33,9 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryDirectReadSchemaTransformProvider.BigQueryDirectReadSchemaTransformConfiguration;
import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
@@ -62,7 +64,7 @@
public class BigQueryDirectReadSchemaTransformProvider
extends TypedSchemaTransformProvider {
- private static final String OUTPUT_TAG = "OUTPUT_ROWS";
+ public static final String OUTPUT_TAG = "output";
@Override
protected Class configurationClass() {
@@ -76,7 +78,7 @@ protected SchemaTransform from(BigQueryDirectReadSchemaTransformConfiguration co
@Override
public String identifier() {
- return "beam:schematransform:org.apache.beam:bigquery_storage_read:v1";
+ // TODO: derive this via getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_READ) instead
+ // of duplicating the literal; it must stay equal to the BIGQUERY_READ URN declared in
+ // external_transforms.proto.
+ return "beam:schematransform:org.apache.beam:bigquery_storage_read:v1";
}
@Override
@@ -139,6 +141,10 @@ public static Builder builder() {
@Nullable
public abstract List getSelectedFields();
+ @SchemaFieldDescription("Use this Cloud KMS key to encrypt your data")
+ @Nullable
+ public abstract String getKmsKey();
+
@Nullable
/** Builder for the {@link BigQueryDirectReadSchemaTransformConfiguration}. */
@AutoValue.Builder
@@ -151,6 +157,8 @@ public abstract static class Builder {
public abstract Builder setSelectedFields(List selectedFields);
+ public abstract Builder setKmsKey(String kmsKey);
+
/** Builds a {@link BigQueryDirectReadSchemaTransformConfiguration} instance. */
public abstract BigQueryDirectReadSchemaTransformConfiguration build();
}
@@ -161,7 +169,7 @@ public abstract static class Builder {
* BigQueryDirectReadSchemaTransformConfiguration} and instantiated by {@link
* BigQueryDirectReadSchemaTransformProvider}.
*/
- protected static class BigQueryDirectReadSchemaTransform extends SchemaTransform {
+ public static class BigQueryDirectReadSchemaTransform extends SchemaTransform {
private BigQueryServices testBigQueryServices = null;
private final BigQueryDirectReadSchemaTransformConfiguration configuration;
@@ -172,6 +180,20 @@ protected static class BigQueryDirectReadSchemaTransform extends SchemaTransform
this.configuration = configuration;
}
+ /**
+ * Returns this transform's configuration converted to a {@link Row} through the default
+ * {@link SchemaRegistry}, with fields sorted and names converted to snake_case.
+ */
+ public Row getConfigurationRow() {
+ final Row configRow;
+ try {
+ configRow =
+ SchemaRegistry.createDefault()
+ .getToRowFunction(BigQueryDirectReadSchemaTransformConfiguration.class)
+ .apply(configuration);
+ } catch (NoSuchSchemaException e) {
+ throw new RuntimeException(e);
+ }
+ // Sorting lexicographically keeps the row consistent with our SchemaTransform configuration
+ // naming conventions.
+ return configRow.sorted().toSnakeCase();
+ }
+
@VisibleForTesting
public void setBigQueryServices(BigQueryServices testBigQueryServices) {
this.testBigQueryServices = testBigQueryServices;
@@ -211,6 +233,9 @@ BigQueryIO.TypedRead createDirectReadTransform() {
} else {
read = read.fromQuery(configuration.getQuery());
}
+ if (!Strings.isNullOrEmpty(configuration.getKmsKey())) {
+ read = read.withKmsKey(configuration.getKmsKey());
+ }
if (this.testBigQueryServices != null) {
read = read.withTestServices(testBigQueryServices);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
index c1c06fc592f4..403e13264e20 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
@@ -17,20 +17,16 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery.providers;
+import static org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryWriteConfiguration.DYNAMIC_DESTINATIONS;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
-import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
import com.google.api.services.bigquery.model.TableConstraints;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.auto.service.AutoService;
-import com.google.auto.value.AutoValue;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
@@ -42,15 +38,13 @@
import org.apache.beam.sdk.io.gcp.bigquery.RowMutationInformation;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
-import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransformConfiguration;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
-import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
@@ -65,12 +59,11 @@
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.Duration;
/**
* An implementation of {@link TypedSchemaTransformProvider} for BigQuery Storage Write API jobs
- * configured via {@link BigQueryStorageWriteApiSchemaTransformConfiguration}.
+ * configured via {@link BigQueryWriteConfiguration}.
*
* Internal only: This class is actively being worked on, and it will likely change. We
* provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
@@ -81,7 +74,7 @@
})
@AutoService(SchemaTransformProvider.class)
public class BigQueryStorageWriteApiSchemaTransformProvider
- extends TypedSchemaTransformProvider {
+ extends TypedSchemaTransformProvider {
private static final Integer DEFAULT_TRIGGER_FREQUENCY_SECS = 5;
private static final Duration DEFAULT_TRIGGERING_FREQUENCY =
Duration.standardSeconds(DEFAULT_TRIGGER_FREQUENCY_SECS);
@@ -89,7 +82,6 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
private static final String FAILED_ROWS_TAG = "FailedRows";
private static final String FAILED_ROWS_WITH_ERRORS_TAG = "FailedRowsWithErrors";
- // magic string that tells us to write to dynamic destinations
- protected static final String DYNAMIC_DESTINATIONS = "DYNAMIC_DESTINATIONS";
protected static final String ROW_PROPERTY_MUTATION_INFO = "row_mutation_info";
protected static final String ROW_PROPERTY_MUTATION_TYPE = "mutation_type";
protected static final String ROW_PROPERTY_MUTATION_SQN = "change_sequence_number";
@@ -100,14 +92,13 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
.build();
@Override
- protected SchemaTransform from(
- BigQueryStorageWriteApiSchemaTransformConfiguration configuration) {
+ protected SchemaTransform from(BigQueryWriteConfiguration configuration) {
return new BigQueryStorageWriteApiSchemaTransform(configuration);
}
@Override
public String identifier() {
- return String.format("beam:schematransform:org.apache.beam:bigquery_storage_write:v2");
+ // TODO: derive this via getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_WRITE)
+ // instead of duplicating the literal; it must stay equal to the BIGQUERY_WRITE URN declared in
+ // external_transforms.proto.
+ return "beam:schematransform:org.apache.beam:bigquery_storage_write:v2";
}
@Override
@@ -130,201 +121,17 @@ public List outputCollectionNames() {
return Arrays.asList(FAILED_ROWS_TAG, FAILED_ROWS_WITH_ERRORS_TAG, "errors");
}
- /** Configuration for writing to BigQuery with Storage Write API. */
- @DefaultSchema(AutoValueSchema.class)
- @AutoValue
- public abstract static class BigQueryStorageWriteApiSchemaTransformConfiguration {
-
- static final Map CREATE_DISPOSITIONS =
- ImmutableMap.builder()
- .put(CreateDisposition.CREATE_IF_NEEDED.name(), CreateDisposition.CREATE_IF_NEEDED)
- .put(CreateDisposition.CREATE_NEVER.name(), CreateDisposition.CREATE_NEVER)
- .build();
-
- static final Map WRITE_DISPOSITIONS =
- ImmutableMap.builder()
- .put(WriteDisposition.WRITE_TRUNCATE.name(), WriteDisposition.WRITE_TRUNCATE)
- .put(WriteDisposition.WRITE_EMPTY.name(), WriteDisposition.WRITE_EMPTY)
- .put(WriteDisposition.WRITE_APPEND.name(), WriteDisposition.WRITE_APPEND)
- .build();
-
- @AutoValue
- public abstract static class ErrorHandling {
- @SchemaFieldDescription("The name of the output PCollection containing failed writes.")
- public abstract String getOutput();
-
- public static Builder builder() {
- return new AutoValue_BigQueryStorageWriteApiSchemaTransformProvider_BigQueryStorageWriteApiSchemaTransformConfiguration_ErrorHandling
- .Builder();
- }
-
- @AutoValue.Builder
- public abstract static class Builder {
- public abstract Builder setOutput(String output);
-
- public abstract ErrorHandling build();
- }
- }
-
- public void validate() {
- String invalidConfigMessage = "Invalid BigQuery Storage Write configuration: ";
-
- // validate output table spec
- checkArgument(
- !Strings.isNullOrEmpty(this.getTable()),
- invalidConfigMessage + "Table spec for a BigQuery Write must be specified.");
-
- // if we have an input table spec, validate it
- if (!this.getTable().equals(DYNAMIC_DESTINATIONS)) {
- checkNotNull(BigQueryHelpers.parseTableSpec(this.getTable()));
- }
-
- // validate create and write dispositions
- if (!Strings.isNullOrEmpty(this.getCreateDisposition())) {
- checkNotNull(
- CREATE_DISPOSITIONS.get(this.getCreateDisposition().toUpperCase()),
- invalidConfigMessage
- + "Invalid create disposition (%s) was specified. Available dispositions are: %s",
- this.getCreateDisposition(),
- CREATE_DISPOSITIONS.keySet());
- }
- if (!Strings.isNullOrEmpty(this.getWriteDisposition())) {
- checkNotNull(
- WRITE_DISPOSITIONS.get(this.getWriteDisposition().toUpperCase()),
- invalidConfigMessage
- + "Invalid write disposition (%s) was specified. Available dispositions are: %s",
- this.getWriteDisposition(),
- WRITE_DISPOSITIONS.keySet());
- }
-
- if (this.getErrorHandling() != null) {
- checkArgument(
- !Strings.isNullOrEmpty(this.getErrorHandling().getOutput()),
- invalidConfigMessage + "Output must not be empty if error handling specified.");
- }
-
- if (this.getAutoSharding() != null
- && this.getAutoSharding()
- && this.getNumStreams() != null) {
- checkArgument(
- this.getNumStreams() == 0,
- invalidConfigMessage
- + "Cannot set a fixed number of streams when auto-sharding is enabled. Please pick only one of the two options.");
- }
- }
-
- /**
- * Instantiates a {@link BigQueryStorageWriteApiSchemaTransformConfiguration.Builder} instance.
- */
- public static Builder builder() {
- return new AutoValue_BigQueryStorageWriteApiSchemaTransformProvider_BigQueryStorageWriteApiSchemaTransformConfiguration
- .Builder();
- }
-
- @SchemaFieldDescription(
- "The bigquery table to write to. Format: [${PROJECT}:]${DATASET}.${TABLE}")
- public abstract String getTable();
-
- @SchemaFieldDescription(
- "Optional field that specifies whether the job is allowed to create new tables. "
- + "The following values are supported: CREATE_IF_NEEDED (the job may create the table), CREATE_NEVER ("
- + "the job must fail if the table does not exist already).")
- @Nullable
- public abstract String getCreateDisposition();
-
- @SchemaFieldDescription(
- "Specifies the action that occurs if the destination table already exists. "
- + "The following values are supported: "
- + "WRITE_TRUNCATE (overwrites the table data), "
- + "WRITE_APPEND (append the data to the table), "
- + "WRITE_EMPTY (job must fail if the table is not empty).")
- @Nullable
- public abstract String getWriteDisposition();
-
- @SchemaFieldDescription(
- "Determines how often to 'commit' progress into BigQuery. Default is every 5 seconds.")
- @Nullable
- public abstract Long getTriggeringFrequencySeconds();
-
- @SchemaFieldDescription(
- "This option enables lower latency for insertions to BigQuery but may ocassionally "
- + "duplicate data elements.")
- @Nullable
- public abstract Boolean getUseAtLeastOnceSemantics();
-
- @SchemaFieldDescription(
- "This option enables using a dynamically determined number of Storage Write API streams to write to "
- + "BigQuery. Only applicable to unbounded data.")
- @Nullable
- public abstract Boolean getAutoSharding();
-
- @SchemaFieldDescription(
- "Specifies the number of write streams that the Storage API sink will use. "
- + "This parameter is only applicable when writing unbounded data.")
- @Nullable
- public abstract Integer getNumStreams();
-
- @SchemaFieldDescription("This option specifies whether and where to output unwritable rows.")
- @Nullable
- public abstract ErrorHandling getErrorHandling();
-
- @SchemaFieldDescription(
- "This option enables the use of BigQuery CDC functionality. The expected PCollection"
- + " should contain Beam Rows with a schema wrapping the record to be inserted and"
- + " adding the CDC info similar to: {row_mutation_info: {mutation_type:\"...\", "
- + "change_sequence_number:\"...\"}, record: {...}}")
- @Nullable
- public abstract Boolean getUseCdcWrites();
-
- @SchemaFieldDescription(
- "If CREATE_IF_NEEDED disposition is set, BigQuery table(s) will be created with this"
- + " columns as primary key. Required when CDC writes are enabled with CREATE_IF_NEEDED.")
- @Nullable
- public abstract List getPrimaryKey();
-
- /** Builder for {@link BigQueryStorageWriteApiSchemaTransformConfiguration}. */
- @AutoValue.Builder
- public abstract static class Builder {
-
- public abstract Builder setTable(String table);
-
- public abstract Builder setCreateDisposition(String createDisposition);
-
- public abstract Builder setWriteDisposition(String writeDisposition);
-
- public abstract Builder setTriggeringFrequencySeconds(Long seconds);
-
- public abstract Builder setUseAtLeastOnceSemantics(Boolean use);
-
- public abstract Builder setAutoSharding(Boolean autoSharding);
-
- public abstract Builder setNumStreams(Integer numStreams);
-
- public abstract Builder setErrorHandling(ErrorHandling errorHandling);
-
- public abstract Builder setUseCdcWrites(Boolean cdcWrites);
-
- public abstract Builder setPrimaryKey(List pkColumns);
-
- /** Builds a {@link BigQueryStorageWriteApiSchemaTransformConfiguration} instance. */
- public abstract BigQueryStorageWriteApiSchemaTransformProvider
- .BigQueryStorageWriteApiSchemaTransformConfiguration
- build();
- }
- }
-
/**
* A {@link SchemaTransform} for BigQuery Storage Write API, configured with {@link
- * BigQueryStorageWriteApiSchemaTransformConfiguration} and instantiated by {@link
+ * BigQueryWriteConfiguration} and instantiated by {@link
* BigQueryStorageWriteApiSchemaTransformProvider}.
*/
- protected static class BigQueryStorageWriteApiSchemaTransform extends SchemaTransform {
+ public static class BigQueryStorageWriteApiSchemaTransform extends SchemaTransform {
private BigQueryServices testBigQueryServices = null;
- private final BigQueryStorageWriteApiSchemaTransformConfiguration configuration;
+ private final BigQueryWriteConfiguration configuration;
- BigQueryStorageWriteApiSchemaTransform(
- BigQueryStorageWriteApiSchemaTransformConfiguration configuration) {
+ BigQueryStorageWriteApiSchemaTransform(BigQueryWriteConfiguration configuration) {
configuration.validate();
this.configuration = configuration;
}
@@ -420,8 +227,7 @@ public TableConstraints getTableConstraints(String destination) {
@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
// Check that the input exists
- checkArgument(input.has(INPUT_ROWS_TAG), "Missing expected input tag: %s", INPUT_ROWS_TAG);
- PCollection inputRows = input.get(INPUT_ROWS_TAG);
+ PCollection inputRows = input.getSinglePCollection();
BigQueryIO.Write write = createStorageWriteApiTransform(inputRows.getSchema());
@@ -505,6 +311,20 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
}
}
+    /**
+     * Returns this transform's configuration as a {@link Row}, with fields sorted
+     * lexicographically and named in snake_case.
+     *
+     * @throws RuntimeException wrapping {@link NoSuchSchemaException} if no schema is registered
+     *     for {@link BigQueryWriteConfiguration}
+     */
+    public Row getConfigurationRow() {
+      Row configurationRow;
+      try {
+        configurationRow =
+            SchemaRegistry.createDefault()
+                .getToRowFunction(BigQueryWriteConfiguration.class)
+                .apply(configuration);
+      } catch (NoSuchSchemaException e) {
+        throw new RuntimeException(e);
+      }
+      // To stay consistent with our SchemaTransform configuration naming conventions,
+      // we sort lexicographically
+      return configurationRow.sorted().toSnakeCase();
+    }
+ }
+
void validateDynamicDestinationsExpectedSchema(Schema schema) {
checkArgument(
schema.getFieldNames().containsAll(Arrays.asList("destination", "record")),
@@ -540,18 +360,18 @@ BigQueryIO.Write createStorageWriteApiTransform(Schema schema) {
if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) {
CreateDisposition createDisposition =
- BigQueryStorageWriteApiSchemaTransformConfiguration.CREATE_DISPOSITIONS.get(
- configuration.getCreateDisposition().toUpperCase());
+ CreateDisposition.valueOf(configuration.getCreateDisposition().toUpperCase());
write = write.withCreateDisposition(createDisposition);
}
if (!Strings.isNullOrEmpty(configuration.getWriteDisposition())) {
WriteDisposition writeDisposition =
- BigQueryStorageWriteApiSchemaTransformConfiguration.WRITE_DISPOSITIONS.get(
- configuration.getWriteDisposition().toUpperCase());
+ WriteDisposition.valueOf(configuration.getWriteDisposition().toUpperCase());
write = write.withWriteDisposition(writeDisposition);
}
-
+ if (!Strings.isNullOrEmpty(configuration.getKmsKey())) {
+ write = write.withKmsKey(configuration.getKmsKey());
+ }
if (this.testBigQueryServices != null) {
write = write.withTestServices(testBigQueryServices);
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java
new file mode 100644
index 000000000000..acc5b1ff6ea4
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery.providers;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+
+/**
+ * Configuration for writing to BigQuery with SchemaTransforms. Used by {@link
+ * BigQueryStorageWriteApiSchemaTransformProvider} and {@link
+ * org.apache.beam.sdk.io.gcp.bigquery.BigQueryFileLoadsWriteSchemaTransformProvider}.
+ *
+ * <p>Instances are created through {@link #builder()}; call {@link #validate()} to check that the
+ * configured values are consistent before using them.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class BigQueryWriteConfiguration {
+  /** Magic table value that tells the write transform to use dynamic destinations. */
+  protected static final String DYNAMIC_DESTINATIONS = "DYNAMIC_DESTINATIONS";
+
+  /** Error-handling options: names the output PCollection that receives failed writes. */
+  @AutoValue
+  public abstract static class ErrorHandling {
+    @SchemaFieldDescription("The name of the output PCollection containing failed writes.")
+    public abstract String getOutput();
+
+    /** Instantiates an {@link ErrorHandling.Builder} instance. */
+    public static Builder builder() {
+      return new AutoValue_BigQueryWriteConfiguration_ErrorHandling.Builder();
+    }
+
+    /** Builder for {@link ErrorHandling}. */
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setOutput(String output);
+
+      public abstract ErrorHandling build();
+    }
+  }
+
+  /**
+   * Validates this configuration.
+   *
+   * <p>Checks that a table is specified (and parses it unless it is {@link
+   * #DYNAMIC_DESTINATIONS}), that the create/write dispositions, when set, name a known {@link
+   * BigQueryIO.Write.CreateDisposition} / {@link BigQueryIO.Write.WriteDisposition}
+   * (case-insensitively), that error handling, when set, names a non-empty output, and that a
+   * non-zero number of streams is not combined with auto-sharding.
+   *
+   * @throws IllegalArgumentException if any of the argument checks above fails
+   */
+  public void validate() {
+    String invalidConfigMessage = "Invalid BigQuery Storage Write configuration: ";
+
+    // validate output table spec
+    checkArgument(
+        !Strings.isNullOrEmpty(this.getTable()),
+        invalidConfigMessage + "Table spec for a BigQuery Write must be specified.");
+
+    // if we have an input table spec, validate it
+    if (!this.getTable().equals(DYNAMIC_DESTINATIONS)) {
+      checkNotNull(BigQueryHelpers.parseTableSpec(this.getTable()));
+    }
+
+    // validate create and write dispositions
+    String createDisposition = getCreateDisposition();
+    if (createDisposition != null && !createDisposition.isEmpty()) {
+      List<String> createDispositions =
+          Arrays.stream(BigQueryIO.Write.CreateDisposition.values())
+              .map(Enum::name)
+              .collect(Collectors.toList());
+      // Locale.ROOT keeps the case-insensitive match independent of the JVM default locale.
+      Preconditions.checkArgument(
+          createDispositions.contains(createDisposition.toUpperCase(Locale.ROOT)),
+          invalidConfigMessage
+              + "Invalid create disposition (%s) was specified. Available dispositions are: %s",
+          createDisposition,
+          createDispositions);
+    }
+    String writeDisposition = getWriteDisposition();
+    if (writeDisposition != null && !writeDisposition.isEmpty()) {
+      List<String> writeDispositions =
+          Arrays.stream(BigQueryIO.Write.WriteDisposition.values())
+              .map(Enum::name)
+              .collect(Collectors.toList());
+      Preconditions.checkArgument(
+          writeDispositions.contains(writeDisposition.toUpperCase(Locale.ROOT)),
+          invalidConfigMessage
+              + "Invalid write disposition (%s) was specified. Available dispositions are: %s",
+          writeDisposition,
+          writeDispositions);
+    }
+
+    ErrorHandling errorHandling = getErrorHandling();
+    if (errorHandling != null) {
+      checkArgument(
+          !Strings.isNullOrEmpty(errorHandling.getOutput()),
+          invalidConfigMessage + "Output must not be empty if error handling specified.");
+    }
+
+    // A fixed stream count and auto-sharding are mutually exclusive; 0 streams is tolerated.
+    Boolean autoSharding = getAutoSharding();
+    Integer numStreams = getNumStreams();
+    if (autoSharding != null && autoSharding && numStreams != null) {
+      checkArgument(
+          numStreams == 0,
+          invalidConfigMessage
+              + "Cannot set a fixed number of streams when auto-sharding is enabled. Please pick only one of the two options.");
+    }
+  }
+
+  /** Instantiates a {@link BigQueryWriteConfiguration.Builder} instance. */
+  public static Builder builder() {
+    return new AutoValue_BigQueryWriteConfiguration.Builder();
+  }
+
+  @SchemaFieldDescription(
+      "The bigquery table to write to. Format: [${PROJECT}:]${DATASET}.${TABLE}")
+  public abstract String getTable();
+
+  @SchemaFieldDescription(
+      "Optional field that specifies whether the job is allowed to create new tables. "
+          + "The following values are supported: CREATE_IF_NEEDED (the job may create the table), CREATE_NEVER ("
+          + "the job must fail if the table does not exist already).")
+  @Nullable
+  public abstract String getCreateDisposition();
+
+  @SchemaFieldDescription(
+      "Specifies the action that occurs if the destination table already exists. "
+          + "The following values are supported: "
+          + "WRITE_TRUNCATE (overwrites the table data), "
+          + "WRITE_APPEND (append the data to the table), "
+          + "WRITE_EMPTY (job must fail if the table is not empty).")
+  @Nullable
+  public abstract String getWriteDisposition();
+
+  @SchemaFieldDescription(
+      "Determines how often to 'commit' progress into BigQuery. Default is every 5 seconds.")
+  @Nullable
+  public abstract Long getTriggeringFrequencySeconds();
+
+  @SchemaFieldDescription(
+      "This option enables lower latency for insertions to BigQuery but may ocassionally "
+          + "duplicate data elements.")
+  @Nullable
+  public abstract Boolean getUseAtLeastOnceSemantics();
+
+  @SchemaFieldDescription(
+      "This option enables using a dynamically determined number of Storage Write API streams to write to "
+          + "BigQuery. Only applicable to unbounded data.")
+  @Nullable
+  public abstract Boolean getAutoSharding();
+
+  @SchemaFieldDescription(
+      "Specifies the number of write streams that the Storage API sink will use. "
+          + "This parameter is only applicable when writing unbounded data.")
+  @Nullable
+  public abstract Integer getNumStreams();
+
+  @SchemaFieldDescription("Use this Cloud KMS key to encrypt your data")
+  @Nullable
+  public abstract String getKmsKey();
+
+  @SchemaFieldDescription("This option specifies whether and where to output unwritable rows.")
+  @Nullable
+  public abstract ErrorHandling getErrorHandling();
+
+  @SchemaFieldDescription(
+      "This option enables the use of BigQuery CDC functionality. The expected PCollection"
+          + " should contain Beam Rows with a schema wrapping the record to be inserted and"
+          + " adding the CDC info similar to: {row_mutation_info: {mutation_type:\"...\", "
+          + "change_sequence_number:\"...\"}, record: {...}}")
+  @Nullable
+  public abstract Boolean getUseCdcWrites();
+
+  @SchemaFieldDescription(
+      "If CREATE_IF_NEEDED disposition is set, BigQuery table(s) will be created with this"
+          + " columns as primary key. Required when CDC writes are enabled with CREATE_IF_NEEDED.")
+  @Nullable
+  public abstract List<String> getPrimaryKey();
+
+  /** Builder for {@link BigQueryWriteConfiguration}. */
+  @AutoValue.Builder
+  public abstract static class Builder {
+
+    public abstract Builder setTable(String table);
+
+    public abstract Builder setCreateDisposition(String createDisposition);
+
+    public abstract Builder setWriteDisposition(String writeDisposition);
+
+    public abstract Builder setTriggeringFrequencySeconds(Long seconds);
+
+    public abstract Builder setUseAtLeastOnceSemantics(Boolean use);
+
+    public abstract Builder setAutoSharding(Boolean autoSharding);
+
+    public abstract Builder setNumStreams(Integer numStreams);
+
+    public abstract Builder setKmsKey(String kmsKey);
+
+    public abstract Builder setErrorHandling(ErrorHandling errorHandling);
+
+    public abstract Builder setUseCdcWrites(Boolean cdcWrites);
+
+    public abstract Builder setPrimaryKey(List<String> pkColumns);
+
+    /** Builds a {@link BigQueryWriteConfiguration} instance. */
+    public abstract BigQueryWriteConfiguration build();
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProviderTest.java
index dd8bb9fc8664..1e4791b94e1c 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProviderTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProviderTest.java
@@ -17,38 +17,27 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;
-import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryFileLoadsWriteSchemaTransformProvider.INPUT_TAG;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThrows;
import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
import java.io.IOException;
import java.util.Arrays;
-import java.util.HashSet;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryFileLoadsWriteSchemaTransformProvider.BigQueryWriteSchemaTransform;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryWriteConfiguration;
import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.DisplayData.Identifier;
-import org.apache.beam.sdk.transforms.display.DisplayData.Item;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
-import org.apache.commons.lang3.tuple.Pair;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -71,8 +60,6 @@ public class BigQueryFileLoadsWriteSchemaTransformProviderTest {
private static final Schema SCHEMA =
Schema.of(Field.of("name", FieldType.STRING), Field.of("number", FieldType.INT64));
- private static final TableSchema TABLE_SCHEMA = BigQueryUtils.toTableSchema(SCHEMA);
-
private static final List ROWS =
Arrays.asList(
Row.withSchema(SCHEMA).withFieldValue("name", "a").withFieldValue("number", 1L).build(),
@@ -109,9 +96,9 @@ public void tearDown() {
public void testLoad() throws IOException, InterruptedException {
BigQueryFileLoadsWriteSchemaTransformProvider provider =
new BigQueryFileLoadsWriteSchemaTransformProvider();
- BigQueryFileLoadsWriteSchemaTransformConfiguration configuration =
- BigQueryFileLoadsWriteSchemaTransformConfiguration.builder()
- .setTableSpec(BigQueryHelpers.toTableSpec(TABLE_REFERENCE))
+ BigQueryWriteConfiguration configuration =
+ BigQueryWriteConfiguration.builder()
+ .setTable(BigQueryHelpers.toTableSpec(TABLE_REFERENCE))
.setWriteDisposition(WriteDisposition.WRITE_TRUNCATE.name())
.setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name())
.build();
@@ -128,138 +115,4 @@ public void testLoad() throws IOException, InterruptedException {
assertNotNull(fakeDatasetService.getTable(TABLE_REFERENCE));
assertEquals(ROWS.size(), fakeDatasetService.getAllRows(PROJECT, DATASET, TABLE_ID).size());
}
-
- @Test
- public void testValidatePipelineOptions() {
- List<
- Pair<
- BigQueryFileLoadsWriteSchemaTransformConfiguration.Builder,
- Class extends Exception>>>
- cases =
- Arrays.asList(
- Pair.of(
- BigQueryFileLoadsWriteSchemaTransformConfiguration.builder()
- .setTableSpec("project.doesnot.exist")
- .setCreateDisposition(CreateDisposition.CREATE_NEVER.name())
- .setWriteDisposition(WriteDisposition.WRITE_APPEND.name()),
- InvalidConfigurationException.class),
- Pair.of(
- BigQueryFileLoadsWriteSchemaTransformConfiguration.builder()
- .setTableSpec(String.format("%s.%s.%s", PROJECT, DATASET, "doesnotexist"))
- .setCreateDisposition(CreateDisposition.CREATE_NEVER.name())
- .setWriteDisposition(WriteDisposition.WRITE_EMPTY.name()),
- InvalidConfigurationException.class),
- Pair.of(
- BigQueryFileLoadsWriteSchemaTransformConfiguration.builder()
- .setTableSpec("project.doesnot.exist")
- .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name())
- .setWriteDisposition(WriteDisposition.WRITE_APPEND.name()),
- null));
- for (Pair<
- BigQueryFileLoadsWriteSchemaTransformConfiguration.Builder, Class extends Exception>>
- caze : cases) {
- BigQueryWriteSchemaTransform transform = transformFrom(caze.getLeft().build());
- if (caze.getRight() != null) {
- assertThrows(caze.getRight(), () -> transform.validate(p.getOptions()));
- } else {
- transform.validate(p.getOptions());
- }
- }
- }
-
- @Test
- public void testToWrite() {
- List<
- Pair<
- BigQueryFileLoadsWriteSchemaTransformConfiguration.Builder,
- BigQueryIO.Write>>
- cases =
- Arrays.asList(
- Pair.of(
- BigQueryFileLoadsWriteSchemaTransformConfiguration.builder()
- .setTableSpec(BigQueryHelpers.toTableSpec(TABLE_REFERENCE))
- .setCreateDisposition(CreateDisposition.CREATE_NEVER.name())
- .setWriteDisposition(WriteDisposition.WRITE_EMPTY.name()),
- BigQueryIO.writeTableRows()
- .to(TABLE_REFERENCE)
- .withCreateDisposition(CreateDisposition.CREATE_NEVER)
- .withWriteDisposition(WriteDisposition.WRITE_EMPTY)
- .withSchema(TABLE_SCHEMA)),
- Pair.of(
- BigQueryFileLoadsWriteSchemaTransformConfiguration.builder()
- .setTableSpec(BigQueryHelpers.toTableSpec(TABLE_REFERENCE))
- .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name())
- .setWriteDisposition(WriteDisposition.WRITE_TRUNCATE.name()),
- BigQueryIO.writeTableRows()
- .to(TABLE_REFERENCE)
- .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
- .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
- .withSchema(TABLE_SCHEMA)));
- for (Pair<
- BigQueryFileLoadsWriteSchemaTransformConfiguration.Builder, BigQueryIO.Write>
- caze : cases) {
- BigQueryWriteSchemaTransform transform = transformFrom(caze.getLeft().build());
- Map gotDisplayData = DisplayData.from(transform.toWrite(SCHEMA)).asMap();
- Map wantDisplayData = DisplayData.from(caze.getRight()).asMap();
- Set keys = new HashSet<>();
- keys.addAll(gotDisplayData.keySet());
- keys.addAll(wantDisplayData.keySet());
- for (Identifier key : keys) {
- Item got = null;
- Item want = null;
- if (gotDisplayData.containsKey(key)) {
- got = gotDisplayData.get(key);
- }
- if (wantDisplayData.containsKey(key)) {
- want = wantDisplayData.get(key);
- }
- assertEquals(want, got);
- }
- }
- }
-
- @Test
- public void validatePCollectionRowTupleInput() {
- PCollectionRowTuple empty = PCollectionRowTuple.empty(p);
- PCollectionRowTuple valid =
- PCollectionRowTuple.of(
- INPUT_TAG, p.apply("CreateRowsWithValidSchema", Create.of(ROWS)).setRowSchema(SCHEMA));
-
- PCollectionRowTuple invalid =
- PCollectionRowTuple.of(
- INPUT_TAG,
- p.apply(
- "CreateRowsWithInvalidSchema",
- Create.of(
- Row.nullRow(
- Schema.builder().addNullableField("name", FieldType.STRING).build()))));
-
- BigQueryWriteSchemaTransform transform =
- transformFrom(
- BigQueryFileLoadsWriteSchemaTransformConfiguration.builder()
- .setTableSpec(BigQueryHelpers.toTableSpec(TABLE_REFERENCE))
- .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name())
- .setWriteDisposition(WriteDisposition.WRITE_APPEND.name())
- .build());
-
- assertThrows(IllegalArgumentException.class, () -> transform.validate(empty));
-
- assertThrows(IllegalStateException.class, () -> transform.validate(invalid));
-
- transform.validate(valid);
-
- p.run();
- }
-
- private BigQueryWriteSchemaTransform transformFrom(
- BigQueryFileLoadsWriteSchemaTransformConfiguration configuration) {
- BigQueryFileLoadsWriteSchemaTransformProvider provider =
- new BigQueryFileLoadsWriteSchemaTransformProvider();
- BigQueryWriteSchemaTransform transform =
- (BigQueryWriteSchemaTransform) provider.from(configuration);
-
- transform.setTestBigQueryServices(fakeBigQueryServices);
-
- return transform;
- }
}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryManagedIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryManagedIT.java
new file mode 100644
index 000000000000..a39e27bf7d85
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryManagedIT.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryDirectReadSchemaTransformProvider;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
+import org.apache.beam.sdk.managed.Managed;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PeriodicImpulse;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.joda.time.Duration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * This class tests the execution of {@link Managed} BigQueryIO.
+ *
+ * <p>A single BigQuery dataset is created before the tests run and deleted afterwards.
+ */
+@RunWith(JUnit4.class)
+public class BigQueryManagedIT {
+  private static final Schema SCHEMA =
+      Schema.of(
+          Schema.Field.of("str", Schema.FieldType.STRING),
+          Schema.Field.of("number", Schema.FieldType.INT64));
+
+  // Expected table contents: 20 rows whose "number" runs 0..19 and whose "str" is its decimal form.
+  private static final List<Row> ROWS =
+      LongStream.range(0, 20)
+          .mapToObj(
+              i ->
+                  Row.withSchema(SCHEMA)
+                      .withFieldValue("str", Long.toString(i))
+                      .withFieldValue("number", i)
+                      .build())
+          .collect(Collectors.toList());
+
+  private static final BigqueryClient BQ_CLIENT = new BigqueryClient("BigQueryManagedIT");
+
+  private static final String PROJECT =
+      TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+  private static final String BIG_QUERY_DATASET_ID = "bigquery_managed_" + System.nanoTime();
+
+  @BeforeClass
+  public static void setUpTestEnvironment() throws IOException, InterruptedException {
+    // Create one BQ dataset for all test cases.
+    BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID, null);
+  }
+
+  @AfterClass
+  public static void cleanup() {
+    BQ_CLIENT.deleteDataset(PROJECT, BIG_QUERY_DATASET_ID);
+  }
+
+  /** Writes {@link #ROWS} through a Managed BigQuery write, then reads them back and compares. */
+  @Test
+  public void testStreamingStorageWriteRead() {
+    String table = String.format("%s:%s.managed_storage_write_read", PROJECT, BIG_QUERY_DATASET_ID);
+
+    Map<String, Object> writeConfig =
+        ImmutableMap.<String, Object>builder().put("table", table).build();
+    Pipeline p = Pipeline.create();
+    PCollectionRowTuple.of("input", getInput(p, true))
+        .apply(Managed.write(Managed.BIGQUERY).withConfig(writeConfig));
+    p.run().waitUntilFinish();
+
+    Map<String, Object> readConfig =
+        ImmutableMap.<String, Object>builder().put("table", table).build();
+    Pipeline q = Pipeline.create();
+    // The read (and its PAssert) must be attached to q, the pipeline that is run below;
+    // attaching it to the already-executed write pipeline would leave the assertion unexecuted.
+    PCollection<Row> outputRows =
+        PCollectionRowTuple.empty(q)
+            .apply(Managed.read(Managed.BIGQUERY).withConfig(readConfig))
+            .get(BigQueryDirectReadSchemaTransformProvider.OUTPUT_TAG);
+    PAssert.that(outputRows).containsInAnyOrder(ROWS);
+    q.run().waitUntilFinish();
+  }
+
+  /**
+   * Builds the input rows for a write pipeline.
+   *
+   * @param p the pipeline to attach the source to
+   * @param isStreaming if true, rows are derived from a {@link PeriodicImpulse}; otherwise they
+   *     are created directly from {@link #ROWS}
+   * @return a {@link PCollection} of rows with {@link #SCHEMA}
+   */
+  public PCollection<Row> getInput(Pipeline p, boolean isStreaming) {
+    if (isStreaming) {
+      // Pin the impulse to timestamps 0..19 ms so the derived values line up with ROWS;
+      // without an explicit start the values would be wall-clock millis.
+      return p.apply(
+              PeriodicImpulse.create()
+                  .startAt(new org.joda.time.Instant(0))
+                  .stopAt(new org.joda.time.Instant(19))
+                  .withInterval(Duration.millis(1)))
+          .apply(
+              MapElements.into(TypeDescriptors.rows())
+                  .via(
+                      i ->
+                          Row.withSchema(SCHEMA)
+                              .withFieldValue("str", Long.toString(i.getMillis()))
+                              .withFieldValue("number", i.getMillis())
+                              .build()))
+          .setRowSchema(SCHEMA);
+    }
+    return p.apply(Create.of(ROWS)).setRowSchema(SCHEMA);
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformTranslationTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformTranslationTest.java
new file mode 100644
index 000000000000..bc6624bd9371
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformTranslationTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.apache.beam.model.pipeline.v1.ExternalTransforms.ExpansionMethods.Enum.SCHEMA_TRANSFORM;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformTranslation.BigQueryStorageReadSchemaTransformTranslator;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformTranslation.BigQueryStorageWriteSchemaTransformTranslator;
+import static org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryDirectReadSchemaTransformProvider.BigQueryDirectReadSchemaTransform;
+import static org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransform;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms.SchemaTransformPayload;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryDirectReadSchemaTransformProvider;
+import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaTranslation;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.construction.BeamUrns;
+import org.apache.beam.sdk.util.construction.PipelineTranslation;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.InvalidProtocolBufferException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class BigQuerySchemaTransformTranslationTest {
+ static final BigQueryStorageWriteApiSchemaTransformProvider WRITE_PROVIDER =
+ new BigQueryStorageWriteApiSchemaTransformProvider();
+ static final BigQueryDirectReadSchemaTransformProvider READ_PROVIDER =
+ new BigQueryDirectReadSchemaTransformProvider();
+ static final Row WRITE_CONFIG_ROW =
+ Row.withSchema(WRITE_PROVIDER.configurationSchema())
+ .withFieldValue("table", "project:dataset.table")
+ .withFieldValue("create_disposition", "create_never")
+ .withFieldValue("write_disposition", "write_append")
+ .withFieldValue("triggering_frequency_seconds", 5L)
+ .withFieldValue("use_at_least_once_semantics", false)
+ .withFieldValue("auto_sharding", false)
+ .withFieldValue("num_streams", 5)
+ .withFieldValue("error_handling", null)
+ .build();
+ static final Row READ_CONFIG_ROW =
+ Row.withSchema(READ_PROVIDER.configurationSchema())
+ .withFieldValue("query", null)
+ .withFieldValue("table_spec", "apache-beam-testing.samples.weather_stations")
+ .withFieldValue("row_restriction", "col < 5")
+ .withFieldValue("selected_fields", Arrays.asList("col1", "col2", "col3"))
+ .build();
+
+ @Test
+ public void testRecreateWriteTransformFromRow() {
+ BigQueryStorageWriteApiSchemaTransform writeTransform =
+ (BigQueryStorageWriteApiSchemaTransform) WRITE_PROVIDER.from(WRITE_CONFIG_ROW);
+
+ BigQueryStorageWriteSchemaTransformTranslator translator =
+ new BigQueryStorageWriteSchemaTransformTranslator();
+ Row translatedRow = translator.toConfigRow(writeTransform);
+
+ BigQueryStorageWriteApiSchemaTransform writeTransformFromRow =
+ translator.fromConfigRow(translatedRow, PipelineOptionsFactory.create());
+
+ assertEquals(WRITE_CONFIG_ROW, writeTransformFromRow.getConfigurationRow());
+ }
+
+ @Test
+ public void testWriteTransformProtoTranslation()
+ throws InvalidProtocolBufferException, IOException {
+ // First build a pipeline
+ Pipeline p = Pipeline.create();
+ Schema inputSchema = Schema.builder().addByteArrayField("b").build();
+ PCollection input =
+ p.apply(
+ Create.of(
+ Collections.singletonList(
+ Row.withSchema(inputSchema).addValue(new byte[] {1, 2, 3}).build())))
+ .setRowSchema(inputSchema);
+
+ BigQueryStorageWriteApiSchemaTransform writeTransform =
+ (BigQueryStorageWriteApiSchemaTransform) WRITE_PROVIDER.from(WRITE_CONFIG_ROW);
+ PCollectionRowTuple.of("input", input).apply(writeTransform);
+
+ // Then translate the pipeline to a proto and extract the BigQuery write transform proto
+ RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
+ List writeTransformProto =
+ pipelineProto.getComponents().getTransformsMap().values().stream()
+ .filter(
+ tr -> {
+ RunnerApi.FunctionSpec spec = tr.getSpec();
+ try {
+ return spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))
+ && SchemaTransformPayload.parseFrom(spec.getPayload())
+ .getIdentifier()
+ .equals(WRITE_PROVIDER.identifier());
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .collect(Collectors.toList());
+ assertEquals(1, writeTransformProto.size());
+ RunnerApi.FunctionSpec spec = writeTransformProto.get(0).getSpec();
+
+ // Check that the proto contains correct values
+ SchemaTransformPayload payload = SchemaTransformPayload.parseFrom(spec.getPayload());
+ Schema schemaFromSpec = SchemaTranslation.schemaFromProto(payload.getConfigurationSchema());
+ assertEquals(WRITE_PROVIDER.configurationSchema(), schemaFromSpec);
+ Row rowFromSpec = RowCoder.of(schemaFromSpec).decode(payload.getConfigurationRow().newInput());
+
+ assertEquals(WRITE_CONFIG_ROW, rowFromSpec);
+
+ // Use the information in the proto to recreate the BigQueryStorageWriteApiSchemaTransform
+ BigQueryStorageWriteSchemaTransformTranslator translator =
+ new BigQueryStorageWriteSchemaTransformTranslator();
+ BigQueryStorageWriteApiSchemaTransform writeTransformFromSpec =
+ translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create());
+
+ assertEquals(WRITE_CONFIG_ROW, writeTransformFromSpec.getConfigurationRow());
+ }
+
+ @Test
+ public void testReCreateReadTransformFromRow() {
+ BigQueryDirectReadSchemaTransform readTransform =
+ (BigQueryDirectReadSchemaTransform) READ_PROVIDER.from(READ_CONFIG_ROW);
+
+ BigQueryStorageReadSchemaTransformTranslator translator =
+ new BigQueryStorageReadSchemaTransformTranslator();
+ Row row = translator.toConfigRow(readTransform);
+
+ BigQueryDirectReadSchemaTransform readTransformFromRow =
+ translator.fromConfigRow(row, PipelineOptionsFactory.create());
+
+ assertEquals(READ_CONFIG_ROW, readTransformFromRow.getConfigurationRow());
+ }
+
+ @Test
+ public void testReadTransformProtoTranslation()
+ throws InvalidProtocolBufferException, IOException {
+ // First build a pipeline
+ Pipeline p = Pipeline.create();
+
+ BigQueryDirectReadSchemaTransform readTransform =
+ (BigQueryDirectReadSchemaTransform) READ_PROVIDER.from(READ_CONFIG_ROW);
+
+ PCollectionRowTuple.empty(p).apply(readTransform);
+
+ // Then translate the pipeline to a proto and extract the BigQuery read transform proto
+ RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
+ List readTransformProto =
+ pipelineProto.getComponents().getTransformsMap().values().stream()
+ .filter(
+ tr -> {
+ RunnerApi.FunctionSpec spec = tr.getSpec();
+ try {
+ return spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))
+ && SchemaTransformPayload.parseFrom(spec.getPayload())
+ .getIdentifier()
+ .equals(READ_PROVIDER.identifier());
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .collect(Collectors.toList());
+ assertEquals(1, readTransformProto.size());
+ RunnerApi.FunctionSpec spec = readTransformProto.get(0).getSpec();
+
+ // Check that the proto contains correct values
+ SchemaTransformPayload payload = SchemaTransformPayload.parseFrom(spec.getPayload());
+ Schema schemaFromSpec = SchemaTranslation.schemaFromProto(payload.getConfigurationSchema());
+ assertEquals(READ_PROVIDER.configurationSchema(), schemaFromSpec);
+ Row rowFromSpec = RowCoder.of(schemaFromSpec).decode(payload.getConfigurationRow().newInput());
+ assertEquals(READ_CONFIG_ROW, rowFromSpec);
+
+ // Use the information in the proto to recreate the BigQueryDirectReadSchemaTransform
+ BigQueryStorageReadSchemaTransformTranslator translator =
+ new BigQueryStorageReadSchemaTransformTranslator();
+ BigQueryDirectReadSchemaTransform readTransformFromSpec =
+ translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create());
+
+ assertEquals(READ_CONFIG_ROW, readTransformFromSpec.getConfigurationRow());
+ }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
index 87ba2961461a..3a23f5a3205a 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
@@ -35,7 +35,6 @@
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransform;
-import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransformConfiguration;
import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
@@ -108,15 +107,14 @@ public void setUp() throws Exception {
@Test
public void testInvalidConfig() {
- List invalidConfigs =
+ List invalidConfigs =
Arrays.asList(
- BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
- .setTable("not_a_valid_table_spec"),
- BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
+ BigQueryWriteConfiguration.builder().setTable("not_a_valid_table_spec"),
+ BigQueryWriteConfiguration.builder()
.setTable("project:dataset.table")
.setCreateDisposition("INVALID_DISPOSITION"));
- for (BigQueryStorageWriteApiSchemaTransformConfiguration.Builder config : invalidConfigs) {
+ for (BigQueryWriteConfiguration.Builder config : invalidConfigs) {
assertThrows(
Exception.class,
() -> {
@@ -125,13 +123,11 @@ public void testInvalidConfig() {
}
}
- public PCollectionRowTuple runWithConfig(
- BigQueryStorageWriteApiSchemaTransformConfiguration config) {
+ public PCollectionRowTuple runWithConfig(BigQueryWriteConfiguration config) {
return runWithConfig(config, ROWS);
}
- public PCollectionRowTuple runWithConfig(
- BigQueryStorageWriteApiSchemaTransformConfiguration config, List inputRows) {
+ public PCollectionRowTuple runWithConfig(BigQueryWriteConfiguration config, List inputRows) {
BigQueryStorageWriteApiSchemaTransformProvider provider =
new BigQueryStorageWriteApiSchemaTransformProvider();
@@ -176,8 +172,8 @@ public boolean rowEquals(Row expectedRow, TableRow actualRow) {
@Test
public void testSimpleWrite() throws Exception {
String tableSpec = "project:dataset.simple_write";
- BigQueryStorageWriteApiSchemaTransformConfiguration config =
- BigQueryStorageWriteApiSchemaTransformConfiguration.builder().setTable(tableSpec).build();
+ BigQueryWriteConfiguration config =
+ BigQueryWriteConfiguration.builder().setTable(tableSpec).build();
runWithConfig(config, ROWS);
p.run().waitUntilFinish();
@@ -189,9 +185,9 @@ public void testSimpleWrite() throws Exception {
@Test
public void testWriteToDynamicDestinations() throws Exception {
- String dynamic = BigQueryStorageWriteApiSchemaTransformProvider.DYNAMIC_DESTINATIONS;
- BigQueryStorageWriteApiSchemaTransformConfiguration config =
- BigQueryStorageWriteApiSchemaTransformConfiguration.builder().setTable(dynamic).build();
+ String dynamic = BigQueryWriteConfiguration.DYNAMIC_DESTINATIONS;
+ BigQueryWriteConfiguration config =
+ BigQueryWriteConfiguration.builder().setTable(dynamic).build();
String baseTableSpec = "project:dataset.dynamic_write_";
@@ -273,8 +269,8 @@ public void testCDCWrites() throws Exception {
String tableSpec = "project:dataset.cdc_write";
List primaryKeyColumns = ImmutableList.of("name");
- BigQueryStorageWriteApiSchemaTransformConfiguration config =
- BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
+ BigQueryWriteConfiguration config =
+ BigQueryWriteConfiguration.builder()
.setUseAtLeastOnceSemantics(true)
.setTable(tableSpec)
.setUseCdcWrites(true)
@@ -304,9 +300,9 @@ public void testCDCWrites() throws Exception {
@Test
public void testCDCWriteToDynamicDestinations() throws Exception {
List primaryKeyColumns = ImmutableList.of("name");
- String dynamic = BigQueryStorageWriteApiSchemaTransformProvider.DYNAMIC_DESTINATIONS;
- BigQueryStorageWriteApiSchemaTransformConfiguration config =
- BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
+ String dynamic = BigQueryWriteConfiguration.DYNAMIC_DESTINATIONS;
+ BigQueryWriteConfiguration config =
+ BigQueryWriteConfiguration.builder()
.setUseAtLeastOnceSemantics(true)
.setTable(dynamic)
.setUseCdcWrites(true)
@@ -338,8 +334,8 @@ public void testCDCWriteToDynamicDestinations() throws Exception {
@Test
public void testInputElementCount() throws Exception {
String tableSpec = "project:dataset.input_count";
- BigQueryStorageWriteApiSchemaTransformConfiguration config =
- BigQueryStorageWriteApiSchemaTransformConfiguration.builder().setTable(tableSpec).build();
+ BigQueryWriteConfiguration config =
+ BigQueryWriteConfiguration.builder().setTable(tableSpec).build();
runWithConfig(config);
PipelineResult result = p.run();
@@ -368,13 +364,11 @@ public void testInputElementCount() throws Exception {
@Test
public void testFailedRows() throws Exception {
String tableSpec = "project:dataset.write_with_fail";
- BigQueryStorageWriteApiSchemaTransformConfiguration config =
- BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
+ BigQueryWriteConfiguration config =
+ BigQueryWriteConfiguration.builder()
.setTable(tableSpec)
.setErrorHandling(
- BigQueryStorageWriteApiSchemaTransformConfiguration.ErrorHandling.builder()
- .setOutput("FailedRows")
- .build())
+ BigQueryWriteConfiguration.ErrorHandling.builder().setOutput("FailedRows").build())
.build();
String failValue = "fail_me";
@@ -420,13 +414,11 @@ public void testFailedRows() throws Exception {
@Test
public void testErrorCount() throws Exception {
String tableSpec = "project:dataset.error_count";
- BigQueryStorageWriteApiSchemaTransformConfiguration config =
- BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
+ BigQueryWriteConfiguration config =
+ BigQueryWriteConfiguration.builder()
.setTable(tableSpec)
.setErrorHandling(
- BigQueryStorageWriteApiSchemaTransformConfiguration.ErrorHandling.builder()
- .setOutput("FailedRows")
- .build())
+ BigQueryWriteConfiguration.ErrorHandling.builder().setOutput("FailedRows").build())
.build();
Function shouldFailRow =
diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
index 8477726686ee..adc7fc7e2684 100644
--- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
+++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
@@ -86,17 +86,20 @@ public class Managed {
// TODO: Dynamically generate a list of supported transforms
public static final String ICEBERG = "iceberg";
public static final String KAFKA = "kafka";
+ public static final String BIGQUERY = "bigquery";
// Supported SchemaTransforms
public static final Map READ_TRANSFORMS =
ImmutableMap.builder()
.put(ICEBERG, getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_READ))
.put(KAFKA, getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ))
+ .put(BIGQUERY, getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_READ))
.build();
public static final Map WRITE_TRANSFORMS =
ImmutableMap.builder()
.put(ICEBERG, getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_WRITE))
.put(KAFKA, getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_WRITE))
+ .put(BIGQUERY, getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_WRITE))
.build();
/**
diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
index 6f97983d3260..6ca883c96698 100644
--- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
+++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
@@ -117,7 +117,7 @@ protected void validate() {
"Please specify a config or a config URL, but not both.");
}
- public @Nullable String resolveUnderlyingConfig() {
+ private Map resolveUnderlyingConfig() {
String yamlTransformConfig = getConfig();
// If YAML string is empty, then attempt to read from YAML file
if (Strings.isNullOrEmpty(yamlTransformConfig)) {
@@ -131,7 +131,8 @@ protected void validate() {
throw new RuntimeException(e);
}
}
- return yamlTransformConfig;
+
+ return YamlUtils.yamlStringToMap(yamlTransformConfig);
}
}
@@ -152,24 +153,24 @@ protected SchemaTransform from(ManagedConfig managedConfig) {
static class ManagedSchemaTransform extends SchemaTransform {
private final ManagedConfig managedConfig;
- private final Row underlyingTransformConfig;
+ private final Row underlyingRowConfig;
private final SchemaTransformProvider underlyingTransformProvider;
ManagedSchemaTransform(
ManagedConfig managedConfig, SchemaTransformProvider underlyingTransformProvider) {
// parse config before expansion to check if it matches underlying transform's config schema
Schema transformConfigSchema = underlyingTransformProvider.configurationSchema();
- Row underlyingTransformConfig;
+ Row underlyingRowConfig;
try {
- underlyingTransformConfig = getRowConfig(managedConfig, transformConfigSchema);
+ underlyingRowConfig = getRowConfig(managedConfig, transformConfigSchema);
} catch (Exception e) {
throw new IllegalArgumentException(
"Encountered an error when retrieving a Row configuration", e);
}
- this.managedConfig = managedConfig;
- this.underlyingTransformConfig = underlyingTransformConfig;
+ this.underlyingRowConfig = underlyingRowConfig;
this.underlyingTransformProvider = underlyingTransformProvider;
+ this.managedConfig = managedConfig;
}
@Override
@@ -177,9 +178,9 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
LOG.debug(
"Building transform \"{}\" with Row configuration: {}",
underlyingTransformProvider.identifier(),
- underlyingTransformConfig);
+ underlyingRowConfig);
- return input.apply(underlyingTransformProvider.from(underlyingTransformConfig));
+ return input.apply(underlyingTransformProvider.from(underlyingRowConfig));
}
public ManagedConfig getManagedConfig() {
@@ -201,16 +202,14 @@ Row getConfigurationRow() {
}
}
+ // May return an empty row (perhaps the underlying transform doesn't have any required
+ // parameters)
@VisibleForTesting
static Row getRowConfig(ManagedConfig config, Schema transformSchema) {
- // May return an empty row (perhaps the underlying transform doesn't have any required
- // parameters)
- String yamlConfig = config.resolveUnderlyingConfig();
- Map configMap = YamlUtils.yamlStringToMap(yamlConfig);
-
- // The config Row object will be used to build the underlying SchemaTransform.
- // If a mapping for the SchemaTransform exists, we use it to update parameter names and align
- // with the underlying config schema
+ Map configMap = config.resolveUnderlyingConfig();
+ // Build a config Row that will be used to build the underlying SchemaTransform.
+ // If a mapping for the SchemaTransform exists, we use it to update parameter names to align
+ // with the underlying SchemaTransform config schema
Map mapping = MAPPINGS.get(config.getTransformIdentifier());
if (mapping != null && configMap != null) {
Map remappedConfig = new HashMap<>();
diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java
index 4cf752747be5..30476a30d373 100644
--- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java
+++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java
@@ -50,9 +50,27 @@ public class ManagedTransformConstants {
private static final Map KAFKA_WRITE_MAPPINGS =
ImmutableMap.builder().put("data_format", "format").build();
+ private static final Map BIGQUERY_READ_MAPPINGS =
+ ImmutableMap.builder()
+ .put("table", "table_spec")
+ .put("fields", "selected_fields")
+ .build();
+
+ private static final Map BIGQUERY_WRITE_MAPPINGS =
+ ImmutableMap.builder()
+ .put("at_least_once", "use_at_least_once_semantics")
+ .put("triggering_frequency", "triggering_frequency_seconds")
+ .build();
+
public static final Map> MAPPINGS =
ImmutableMap.>builder()
.put(getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ), KAFKA_READ_MAPPINGS)
.put(getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_WRITE), KAFKA_WRITE_MAPPINGS)
+ .put(
+ getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_READ),
+ BIGQUERY_READ_MAPPINGS)
+ .put(
+ getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_WRITE),
+ BIGQUERY_WRITE_MAPPINGS)
.build();
}
diff --git a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java
index e9edf8751e34..a287ec6260ce 100644
--- a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java
+++ b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java
@@ -88,8 +88,7 @@ public void testGetConfigRowFromYamlFile() throws URISyntaxException {
.withFieldValue("extra_integer", 123)
.build();
Row configRow =
- ManagedSchemaTransformProvider.getRowConfig(
- config, new TestSchemaTransformProvider().configurationSchema());
+ ManagedSchemaTransformProvider.getRowConfig(config, TestSchemaTransformProvider.SCHEMA);
assertEquals(expectedRow, configRow);
}