diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json index a03c067d2c4e..1efc8e9e4405 100644 --- a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json +++ b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json @@ -1,3 +1,4 @@ { - "comment": "Modify this file in a trivial way to cause this test suite to run" + "comment": "Modify this file in a trivial way to cause this test suite to run", + "modification": 1 } diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto index b03350966d6c..e7990e2f1085 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto @@ -70,6 +70,10 @@ message ManagedTransforms { "beam:schematransform:org.apache.beam:kafka_read:v1"]; KAFKA_WRITE = 3 [(org.apache.beam.model.pipeline.v1.beam_urn) = "beam:schematransform:org.apache.beam:kafka_write:v1"]; + BIGQUERY_READ = 4 [(org.apache.beam.model.pipeline.v1.beam_urn) = + "beam:schematransform:org.apache.beam:bigquery_storage_read:v1"]; + BIGQUERY_WRITE = 5 [(org.apache.beam.model.pipeline.v1.beam_urn) = + "beam:schematransform:org.apache.beam:bigquery_storage_write:v2"]; } } diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle index 3e322d976c1a..2acce3e94cc2 100644 --- a/sdks/java/io/google-cloud-platform/build.gradle +++ b/sdks/java/io/google-cloud-platform/build.gradle @@ -159,6 +159,7 @@ dependencies { testImplementation project(path: ":sdks:java:extensions:google-cloud-platform-core", configuration: "testRuntimeMigration") testImplementation project(path: ":sdks:java:extensions:protobuf", configuration: "testRuntimeMigration") testImplementation project(path: ":runners:direct-java", configuration: "shadow") + testImplementation project(":sdks:java:managed") testImplementation project(path: ":sdks:java:io:common") testImplementation project(path: ":sdks:java:testing:test-utils") testImplementation library.java.commons_math3 diff --git a/sdks/java/io/google-cloud-platform/expansion-service/build.gradle b/sdks/java/io/google-cloud-platform/expansion-service/build.gradle index 1288d91964e1..f6c6f07d0cdf 100644 --- a/sdks/java/io/google-cloud-platform/expansion-service/build.gradle +++ b/sdks/java/io/google-cloud-platform/expansion-service/build.gradle @@ -36,6 +36,9 @@ dependencies { permitUnusedDeclared project(":sdks:java:io:google-cloud-platform") // BEAM-11761 implementation project(":sdks:java:extensions:schemaio-expansion-service") permitUnusedDeclared project(":sdks:java:extensions:schemaio-expansion-service") // BEAM-11761 + implementation project(":sdks:java:managed") + permitUnusedDeclared project(":sdks:java:managed") // BEAM-11761 + runtimeOnly library.java.slf4j_jdk14 } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformConfiguration.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformConfiguration.java deleted file mode 100644 index f634b5ec6f60..000000000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformConfiguration.java +++ /dev/null @@ -1,72 
+0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.bigquery; - -import com.google.auto.value.AutoValue; -import org.apache.beam.sdk.schemas.AutoValueSchema; -import org.apache.beam.sdk.schemas.annotations.DefaultSchema; - -/** - * Configuration for writing to BigQuery. - * - *
<p>This class is meant to be used with {@link BigQueryFileLoadsWriteSchemaTransformProvider}. - * - * <p>
Internal only: This class is actively being worked on, and it will likely change. We - * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam - * repository. - */ -@DefaultSchema(AutoValueSchema.class) -@AutoValue -public abstract class BigQueryFileLoadsWriteSchemaTransformConfiguration { - - /** Instantiates a {@link BigQueryFileLoadsWriteSchemaTransformConfiguration.Builder}. */ - public static Builder builder() { - return new AutoValue_BigQueryFileLoadsWriteSchemaTransformConfiguration.Builder(); - } - - /** - * Writes to the given table specification. See {@link BigQueryIO.Write#to(String)}} for the - * expected format. - */ - public abstract String getTableSpec(); - - /** Specifies whether the table should be created if it does not exist. */ - public abstract String getCreateDisposition(); - - /** Specifies what to do with existing data in the table, in case the table already exists. */ - public abstract String getWriteDisposition(); - - @AutoValue.Builder - public abstract static class Builder { - - /** - * Writes to the given table specification. See {@link BigQueryIO.Write#to(String)}} for the - * expected format. - */ - public abstract Builder setTableSpec(String value); - - /** Specifies whether the table should be created if it does not exist. */ - public abstract Builder setCreateDisposition(String value); - - /** Specifies what to do with existing data in the table, in case the table already exists. */ - public abstract Builder setWriteDisposition(String value); - - /** Builds the {@link BigQueryFileLoadsWriteSchemaTransformConfiguration} configuration. */ - public abstract BigQueryFileLoadsWriteSchemaTransformConfiguration build(); - } -} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java index 3212e2a30348..7c89cb09ef6d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java @@ -17,34 +17,25 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; -import com.google.api.services.bigquery.model.Table; -import com.google.api.services.bigquery.model.TableReference; -import com.google.api.services.bigquery.model.TableRow; -import com.google.api.services.bigquery.model.TableSchema; import com.google.auto.service.AutoService; -import java.io.IOException; import java.util.Collections; import java.util.List; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.io.InvalidConfigurationException; +import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryWriteConfiguration; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; -import 
org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; -import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; /** * An implementation of {@link TypedSchemaTransformProvider} for BigQuery write jobs configured - * using {@link BigQueryFileLoadsWriteSchemaTransformConfiguration}. + * using {@link BigQueryWriteConfiguration}. * *
<p>
Internal only: This class is actively being worked on, and it will likely change. We * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam @@ -56,127 +47,78 @@ @Internal @AutoService(SchemaTransformProvider.class) public class BigQueryFileLoadsWriteSchemaTransformProvider - extends TypedSchemaTransformProvider { + extends TypedSchemaTransformProvider { private static final String IDENTIFIER = - "beam:schematransform:org.apache.beam:bigquery_fileloads_write:v1"; - static final String INPUT_TAG = "INPUT"; + "beam:schematransform:org.apache.beam:bigquery_fileloads:v1"; + static final String INPUT_TAG = "input"; - /** Returns the expected class of the configuration. */ @Override - protected Class configurationClass() { - return BigQueryFileLoadsWriteSchemaTransformConfiguration.class; - } - - /** Returns the expected {@link SchemaTransform} of the configuration. */ - @Override - protected SchemaTransform from(BigQueryFileLoadsWriteSchemaTransformConfiguration configuration) { + protected SchemaTransform from(BigQueryWriteConfiguration configuration) { return new BigQueryWriteSchemaTransform(configuration); } - /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */ @Override public String identifier() { return IDENTIFIER; } - /** - * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since a - * single is expected, this returns a list with a single name. - */ @Override public List inputCollectionNames() { return Collections.singletonList(INPUT_TAG); } - /** - * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since - * no output is expected, this returns an empty list. - */ @Override public List outputCollectionNames() { return Collections.emptyList(); } - /** - * A {@link SchemaTransform} that performs {@link BigQueryIO.Write}s based on a {@link - * BigQueryFileLoadsWriteSchemaTransformConfiguration}. - */ protected static class BigQueryWriteSchemaTransform extends SchemaTransform { /** An instance of {@link BigQueryServices} used for testing. 
*/ private BigQueryServices testBigQueryServices = null; - private final BigQueryFileLoadsWriteSchemaTransformConfiguration configuration; + private final BigQueryWriteConfiguration configuration; - BigQueryWriteSchemaTransform(BigQueryFileLoadsWriteSchemaTransformConfiguration configuration) { + BigQueryWriteSchemaTransform(BigQueryWriteConfiguration configuration) { + configuration.validate(); this.configuration = configuration; } @Override - public void validate(PipelineOptions options) { - if (!configuration.getCreateDisposition().equals(CreateDisposition.CREATE_NEVER.name())) { - return; - } + public PCollectionRowTuple expand(PCollectionRowTuple input) { + PCollection rowPCollection = input.getSinglePCollection(); + BigQueryIO.Write write = toWrite(); + rowPCollection.apply(write); - BigQueryOptions bigQueryOptions = options.as(BigQueryOptions.class); + return PCollectionRowTuple.empty(input.getPipeline()); + } - BigQueryServices bigQueryServices = new BigQueryServicesImpl(); - if (testBigQueryServices != null) { - bigQueryServices = testBigQueryServices; + BigQueryIO.Write toWrite() { + BigQueryIO.Write write = + BigQueryIO.write() + .to(configuration.getTable()) + .withMethod(BigQueryIO.Write.Method.FILE_LOADS) + .withFormatFunction(BigQueryUtils.toTableRow()) + .useBeamSchema(); + + if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) { + CreateDisposition createDisposition = + CreateDisposition.valueOf(configuration.getCreateDisposition().toUpperCase()); + write = write.withCreateDisposition(createDisposition); } - - DatasetService datasetService = bigQueryServices.getDatasetService(bigQueryOptions); - TableReference tableReference = BigQueryUtils.toTableReference(configuration.getTableSpec()); - - try { - Table table = datasetService.getTable(tableReference); - if (table == null) { - throw new NullPointerException(); - } - - if (table.getSchema() == null) { - throw new InvalidConfigurationException( - String.format("could not fetch schema for table: %s", configuration.getTableSpec())); - } - - } catch (NullPointerException | InterruptedException | IOException ex) { - throw new InvalidConfigurationException( - String.format( - "could not fetch table %s, error: %s", - configuration.getTableSpec(), ex.getMessage())); + if (!Strings.isNullOrEmpty(configuration.getWriteDisposition())) { + WriteDisposition writeDisposition = + WriteDisposition.valueOf(configuration.getWriteDisposition().toUpperCase()); + write = write.withWriteDisposition(writeDisposition); + } + if (!Strings.isNullOrEmpty(configuration.getKmsKey())) { + write = write.withKmsKey(configuration.getKmsKey()); } - } - - @Override - public PCollectionRowTuple expand(PCollectionRowTuple input) { - validate(input); - PCollection rowPCollection = input.get(INPUT_TAG); - Schema schema = rowPCollection.getSchema(); - BigQueryIO.Write write = toWrite(schema); if (testBigQueryServices != null) { write = write.withTestServices(testBigQueryServices); } - PCollection tableRowPCollection = - rowPCollection.apply( - MapElements.into(TypeDescriptor.of(TableRow.class)).via(BigQueryUtils::toTableRow)); - tableRowPCollection.apply(write); - return PCollectionRowTuple.empty(input.getPipeline()); - } - - /** Instantiates a {@link BigQueryIO.Write} from a {@link Schema}. 
*/ - BigQueryIO.Write toWrite(Schema schema) { - TableSchema tableSchema = BigQueryUtils.toTableSchema(schema); - CreateDisposition createDisposition = - CreateDisposition.valueOf(configuration.getCreateDisposition()); - WriteDisposition writeDisposition = - WriteDisposition.valueOf(configuration.getWriteDisposition()); - - return BigQueryIO.writeTableRows() - .to(configuration.getTableSpec()) - .withCreateDisposition(createDisposition) - .withWriteDisposition(writeDisposition) - .withSchema(tableSchema); + return write; } /** Setter for testing using {@link BigQueryServices}. */ @@ -184,73 +126,5 @@ BigQueryIO.Write toWrite(Schema schema) { void setTestBigQueryServices(BigQueryServices testBigQueryServices) { this.testBigQueryServices = testBigQueryServices; } - - /** Validate a {@link PCollectionRowTuple} input. */ - void validate(PCollectionRowTuple input) { - if (!input.has(INPUT_TAG)) { - throw new IllegalArgumentException( - String.format( - "%s %s is missing expected tag: %s", - getClass().getSimpleName(), input.getClass().getSimpleName(), INPUT_TAG)); - } - - PCollection rowInput = input.get(INPUT_TAG); - Schema sourceSchema = rowInput.getSchema(); - - if (sourceSchema == null) { - throw new IllegalArgumentException( - String.format("%s is null for input of tag: %s", Schema.class, INPUT_TAG)); - } - - if (!configuration.getCreateDisposition().equals(CreateDisposition.CREATE_NEVER.name())) { - return; - } - - BigQueryOptions bigQueryOptions = input.getPipeline().getOptions().as(BigQueryOptions.class); - - BigQueryServices bigQueryServices = new BigQueryServicesImpl(); - if (testBigQueryServices != null) { - bigQueryServices = testBigQueryServices; - } - - DatasetService datasetService = bigQueryServices.getDatasetService(bigQueryOptions); - TableReference tableReference = BigQueryUtils.toTableReference(configuration.getTableSpec()); - - try { - Table table = datasetService.getTable(tableReference); - if (table == null) { - throw new NullPointerException(); - } - - TableSchema tableSchema = table.getSchema(); - if (tableSchema == null) { - throw new NullPointerException(); - } - - Schema destinationSchema = BigQueryUtils.fromTableSchema(tableSchema); - if (destinationSchema == null) { - throw new NullPointerException(); - } - - validateMatching(sourceSchema, destinationSchema); - - } catch (NullPointerException | InterruptedException | IOException e) { - throw new InvalidConfigurationException( - String.format( - "could not validate input for create disposition: %s and table: %s, error: %s", - configuration.getCreateDisposition(), - configuration.getTableSpec(), - e.getMessage())); - } - } - - void validateMatching(Schema sourceSchema, Schema destinationSchema) { - if (!sourceSchema.equals(destinationSchema)) { - throw new IllegalArgumentException( - String.format( - "source and destination schema mismatch for table: %s", - configuration.getTableSpec())); - } - } } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformTranslation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformTranslation.java new file mode 100644 index 000000000000..102a1840e177 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformTranslation.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import static org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryDirectReadSchemaTransformProvider.BigQueryDirectReadSchemaTransform; +import static org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransform; + +import com.google.auto.service.AutoService; +import java.util.Map; +import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryDirectReadSchemaTransformProvider; +import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformTranslation; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.construction.PTransformTranslation; +import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; + +public class BigQuerySchemaTransformTranslation { + public static class BigQueryStorageReadSchemaTransformTranslator + extends SchemaTransformTranslation.SchemaTransformPayloadTranslator< + BigQueryDirectReadSchemaTransform> { + @Override + public SchemaTransformProvider provider() { + return new BigQueryDirectReadSchemaTransformProvider(); + } + + @Override + public Row toConfigRow(BigQueryDirectReadSchemaTransform transform) { + return transform.getConfigurationRow(); + } + } + + public static class BigQueryStorageWriteSchemaTransformTranslator + extends SchemaTransformTranslation.SchemaTransformPayloadTranslator< + BigQueryStorageWriteApiSchemaTransform> { + @Override + public SchemaTransformProvider provider() { + return new BigQueryStorageWriteApiSchemaTransformProvider(); + } + + @Override + public Row toConfigRow(BigQueryStorageWriteApiSchemaTransform transform) { + return transform.getConfigurationRow(); + } + } + + @AutoService(TransformPayloadTranslatorRegistrar.class) + public static class ReadWriteRegistrar implements TransformPayloadTranslatorRegistrar { + @Override + @SuppressWarnings({ + "rawtypes", + }) + public Map< + ? extends Class, + ? 
extends PTransformTranslation.TransformPayloadTranslator> + getTransformPayloadTranslators() { + return ImmutableMap + ., PTransformTranslation.TransformPayloadTranslator>builder() + .put( + BigQueryDirectReadSchemaTransform.class, + new BigQueryStorageReadSchemaTransformTranslator()) + .put( + BigQueryStorageWriteApiSchemaTransform.class, + new BigQueryStorageWriteSchemaTransformTranslator()) + .build(); + } + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProvider.java index 8b8e8179ce7d..76ba186d2e82 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProvider.java @@ -33,7 +33,9 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils; import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryDirectReadSchemaTransformProvider.BigQueryDirectReadSchemaTransformConfiguration; import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.NoSuchSchemaException; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; @@ -62,7 +64,7 @@ public class BigQueryDirectReadSchemaTransformProvider extends TypedSchemaTransformProvider { - private static final String OUTPUT_TAG = "OUTPUT_ROWS"; + public static final String OUTPUT_TAG = "output"; @Override protected Class configurationClass() { @@ -76,7 +78,7 @@ protected SchemaTransform from(BigQueryDirectReadSchemaTransformConfiguration co @Override public String identifier() { - return "beam:schematransform:org.apache.beam:bigquery_storage_read:v1"; + return "beam:schematransform:org.apache.beam:bigquery_storage_read:v1"; // getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_READ); } @Override @@ -139,6 +141,10 @@ public static Builder builder() { @Nullable public abstract List getSelectedFields(); + @SchemaFieldDescription("Use this Cloud KMS key to encrypt your data") + @Nullable + public abstract String getKmsKey(); + @Nullable /** Builder for the {@link BigQueryDirectReadSchemaTransformConfiguration}. */ @AutoValue.Builder @@ -151,6 +157,8 @@ public abstract static class Builder { public abstract Builder setSelectedFields(List selectedFields); + public abstract Builder setKmsKey(String kmsKey); + /** Builds a {@link BigQueryDirectReadSchemaTransformConfiguration} instance. */ public abstract BigQueryDirectReadSchemaTransformConfiguration build(); } @@ -161,7 +169,7 @@ public abstract static class Builder { * BigQueryDirectReadSchemaTransformConfiguration} and instantiated by {@link * BigQueryDirectReadSchemaTransformProvider}. 
*/ - protected static class BigQueryDirectReadSchemaTransform extends SchemaTransform { + public static class BigQueryDirectReadSchemaTransform extends SchemaTransform { private BigQueryServices testBigQueryServices = null; private final BigQueryDirectReadSchemaTransformConfiguration configuration; @@ -172,6 +180,20 @@ protected static class BigQueryDirectReadSchemaTransform extends SchemaTransform this.configuration = configuration; } + public Row getConfigurationRow() { + try { + // To stay consistent with our SchemaTransform configuration naming conventions, + // we sort lexicographically + return SchemaRegistry.createDefault() + .getToRowFunction(BigQueryDirectReadSchemaTransformConfiguration.class) + .apply(configuration) + .sorted() + .toSnakeCase(); + } catch (NoSuchSchemaException e) { + throw new RuntimeException(e); + } + } + @VisibleForTesting public void setBigQueryServices(BigQueryServices testBigQueryServices) { this.testBigQueryServices = testBigQueryServices; @@ -211,6 +233,9 @@ BigQueryIO.TypedRead createDirectReadTransform() { } else { read = read.fromQuery(configuration.getQuery()); } + if (!Strings.isNullOrEmpty(configuration.getKmsKey())) { + read = read.withKmsKey(configuration.getKmsKey()); + } if (this.testBigQueryServices != null) { read = read.withTestServices(testBigQueryServices); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java index c1c06fc592f4..403e13264e20 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java @@ -17,20 +17,16 @@ */ package org.apache.beam.sdk.io.gcp.bigquery.providers; +import static org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryWriteConfiguration.DYNAMIC_DESTINATIONS; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import com.google.api.services.bigquery.model.TableConstraints; import com.google.api.services.bigquery.model.TableSchema; import com.google.auto.service.AutoService; -import com.google.auto.value.AutoValue; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Optional; -import javax.annotation.Nullable; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method; @@ -42,15 +38,13 @@ import org.apache.beam.sdk.io.gcp.bigquery.RowMutationInformation; import org.apache.beam.sdk.io.gcp.bigquery.TableDestination; import org.apache.beam.sdk.io.gcp.bigquery.WriteResult; -import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransformConfiguration; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; -import org.apache.beam.sdk.schemas.AutoValueSchema; +import 
org.apache.beam.sdk.schemas.NoSuchSchemaException; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; -import org.apache.beam.sdk.schemas.annotations.DefaultSchema; -import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; @@ -65,12 +59,11 @@ import org.apache.beam.sdk.values.ValueInSingleWindow; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.joda.time.Duration; /** * An implementation of {@link TypedSchemaTransformProvider} for BigQuery Storage Write API jobs - * configured via {@link BigQueryStorageWriteApiSchemaTransformConfiguration}. + * configured via {@link BigQueryWriteConfiguration}. * *
<p>
Internal only: This class is actively being worked on, and it will likely change. We * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam @@ -81,7 +74,7 @@ }) @AutoService(SchemaTransformProvider.class) public class BigQueryStorageWriteApiSchemaTransformProvider - extends TypedSchemaTransformProvider { + extends TypedSchemaTransformProvider { private static final Integer DEFAULT_TRIGGER_FREQUENCY_SECS = 5; private static final Duration DEFAULT_TRIGGERING_FREQUENCY = Duration.standardSeconds(DEFAULT_TRIGGER_FREQUENCY_SECS); @@ -89,7 +82,6 @@ public class BigQueryStorageWriteApiSchemaTransformProvider private static final String FAILED_ROWS_TAG = "FailedRows"; private static final String FAILED_ROWS_WITH_ERRORS_TAG = "FailedRowsWithErrors"; // magic string that tells us to write to dynamic destinations - protected static final String DYNAMIC_DESTINATIONS = "DYNAMIC_DESTINATIONS"; protected static final String ROW_PROPERTY_MUTATION_INFO = "row_mutation_info"; protected static final String ROW_PROPERTY_MUTATION_TYPE = "mutation_type"; protected static final String ROW_PROPERTY_MUTATION_SQN = "change_sequence_number"; @@ -100,14 +92,13 @@ public class BigQueryStorageWriteApiSchemaTransformProvider .build(); @Override - protected SchemaTransform from( - BigQueryStorageWriteApiSchemaTransformConfiguration configuration) { + protected SchemaTransform from(BigQueryWriteConfiguration configuration) { return new BigQueryStorageWriteApiSchemaTransform(configuration); } @Override public String identifier() { - return String.format("beam:schematransform:org.apache.beam:bigquery_storage_write:v2"); + return "beam:schematransform:org.apache.beam:bigquery_storage_write:v2"; // getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_WRITE); } @Override @@ -130,201 +121,17 @@ public List outputCollectionNames() { return Arrays.asList(FAILED_ROWS_TAG, FAILED_ROWS_WITH_ERRORS_TAG, "errors"); } - /** Configuration for writing to BigQuery with Storage Write API. 
*/ - @DefaultSchema(AutoValueSchema.class) - @AutoValue - public abstract static class BigQueryStorageWriteApiSchemaTransformConfiguration { - - static final Map CREATE_DISPOSITIONS = - ImmutableMap.builder() - .put(CreateDisposition.CREATE_IF_NEEDED.name(), CreateDisposition.CREATE_IF_NEEDED) - .put(CreateDisposition.CREATE_NEVER.name(), CreateDisposition.CREATE_NEVER) - .build(); - - static final Map WRITE_DISPOSITIONS = - ImmutableMap.builder() - .put(WriteDisposition.WRITE_TRUNCATE.name(), WriteDisposition.WRITE_TRUNCATE) - .put(WriteDisposition.WRITE_EMPTY.name(), WriteDisposition.WRITE_EMPTY) - .put(WriteDisposition.WRITE_APPEND.name(), WriteDisposition.WRITE_APPEND) - .build(); - - @AutoValue - public abstract static class ErrorHandling { - @SchemaFieldDescription("The name of the output PCollection containing failed writes.") - public abstract String getOutput(); - - public static Builder builder() { - return new AutoValue_BigQueryStorageWriteApiSchemaTransformProvider_BigQueryStorageWriteApiSchemaTransformConfiguration_ErrorHandling - .Builder(); - } - - @AutoValue.Builder - public abstract static class Builder { - public abstract Builder setOutput(String output); - - public abstract ErrorHandling build(); - } - } - - public void validate() { - String invalidConfigMessage = "Invalid BigQuery Storage Write configuration: "; - - // validate output table spec - checkArgument( - !Strings.isNullOrEmpty(this.getTable()), - invalidConfigMessage + "Table spec for a BigQuery Write must be specified."); - - // if we have an input table spec, validate it - if (!this.getTable().equals(DYNAMIC_DESTINATIONS)) { - checkNotNull(BigQueryHelpers.parseTableSpec(this.getTable())); - } - - // validate create and write dispositions - if (!Strings.isNullOrEmpty(this.getCreateDisposition())) { - checkNotNull( - CREATE_DISPOSITIONS.get(this.getCreateDisposition().toUpperCase()), - invalidConfigMessage - + "Invalid create disposition (%s) was specified. Available dispositions are: %s", - this.getCreateDisposition(), - CREATE_DISPOSITIONS.keySet()); - } - if (!Strings.isNullOrEmpty(this.getWriteDisposition())) { - checkNotNull( - WRITE_DISPOSITIONS.get(this.getWriteDisposition().toUpperCase()), - invalidConfigMessage - + "Invalid write disposition (%s) was specified. Available dispositions are: %s", - this.getWriteDisposition(), - WRITE_DISPOSITIONS.keySet()); - } - - if (this.getErrorHandling() != null) { - checkArgument( - !Strings.isNullOrEmpty(this.getErrorHandling().getOutput()), - invalidConfigMessage + "Output must not be empty if error handling specified."); - } - - if (this.getAutoSharding() != null - && this.getAutoSharding() - && this.getNumStreams() != null) { - checkArgument( - this.getNumStreams() == 0, - invalidConfigMessage - + "Cannot set a fixed number of streams when auto-sharding is enabled. Please pick only one of the two options."); - } - } - - /** - * Instantiates a {@link BigQueryStorageWriteApiSchemaTransformConfiguration.Builder} instance. - */ - public static Builder builder() { - return new AutoValue_BigQueryStorageWriteApiSchemaTransformProvider_BigQueryStorageWriteApiSchemaTransformConfiguration - .Builder(); - } - - @SchemaFieldDescription( - "The bigquery table to write to. Format: [${PROJECT}:]${DATASET}.${TABLE}") - public abstract String getTable(); - - @SchemaFieldDescription( - "Optional field that specifies whether the job is allowed to create new tables. 
" - + "The following values are supported: CREATE_IF_NEEDED (the job may create the table), CREATE_NEVER (" - + "the job must fail if the table does not exist already).") - @Nullable - public abstract String getCreateDisposition(); - - @SchemaFieldDescription( - "Specifies the action that occurs if the destination table already exists. " - + "The following values are supported: " - + "WRITE_TRUNCATE (overwrites the table data), " - + "WRITE_APPEND (append the data to the table), " - + "WRITE_EMPTY (job must fail if the table is not empty).") - @Nullable - public abstract String getWriteDisposition(); - - @SchemaFieldDescription( - "Determines how often to 'commit' progress into BigQuery. Default is every 5 seconds.") - @Nullable - public abstract Long getTriggeringFrequencySeconds(); - - @SchemaFieldDescription( - "This option enables lower latency for insertions to BigQuery but may ocassionally " - + "duplicate data elements.") - @Nullable - public abstract Boolean getUseAtLeastOnceSemantics(); - - @SchemaFieldDescription( - "This option enables using a dynamically determined number of Storage Write API streams to write to " - + "BigQuery. Only applicable to unbounded data.") - @Nullable - public abstract Boolean getAutoSharding(); - - @SchemaFieldDescription( - "Specifies the number of write streams that the Storage API sink will use. " - + "This parameter is only applicable when writing unbounded data.") - @Nullable - public abstract Integer getNumStreams(); - - @SchemaFieldDescription("This option specifies whether and where to output unwritable rows.") - @Nullable - public abstract ErrorHandling getErrorHandling(); - - @SchemaFieldDescription( - "This option enables the use of BigQuery CDC functionality. The expected PCollection" - + " should contain Beam Rows with a schema wrapping the record to be inserted and" - + " adding the CDC info similar to: {row_mutation_info: {mutation_type:\"...\", " - + "change_sequence_number:\"...\"}, record: {...}}") - @Nullable - public abstract Boolean getUseCdcWrites(); - - @SchemaFieldDescription( - "If CREATE_IF_NEEDED disposition is set, BigQuery table(s) will be created with this" - + " columns as primary key. Required when CDC writes are enabled with CREATE_IF_NEEDED.") - @Nullable - public abstract List getPrimaryKey(); - - /** Builder for {@link BigQueryStorageWriteApiSchemaTransformConfiguration}. */ - @AutoValue.Builder - public abstract static class Builder { - - public abstract Builder setTable(String table); - - public abstract Builder setCreateDisposition(String createDisposition); - - public abstract Builder setWriteDisposition(String writeDisposition); - - public abstract Builder setTriggeringFrequencySeconds(Long seconds); - - public abstract Builder setUseAtLeastOnceSemantics(Boolean use); - - public abstract Builder setAutoSharding(Boolean autoSharding); - - public abstract Builder setNumStreams(Integer numStreams); - - public abstract Builder setErrorHandling(ErrorHandling errorHandling); - - public abstract Builder setUseCdcWrites(Boolean cdcWrites); - - public abstract Builder setPrimaryKey(List pkColumns); - - /** Builds a {@link BigQueryStorageWriteApiSchemaTransformConfiguration} instance. 
*/ - public abstract BigQueryStorageWriteApiSchemaTransformProvider - .BigQueryStorageWriteApiSchemaTransformConfiguration - build(); - } - } - /** * A {@link SchemaTransform} for BigQuery Storage Write API, configured with {@link - * BigQueryStorageWriteApiSchemaTransformConfiguration} and instantiated by {@link + * BigQueryWriteConfiguration} and instantiated by {@link * BigQueryStorageWriteApiSchemaTransformProvider}. */ - protected static class BigQueryStorageWriteApiSchemaTransform extends SchemaTransform { + public static class BigQueryStorageWriteApiSchemaTransform extends SchemaTransform { private BigQueryServices testBigQueryServices = null; - private final BigQueryStorageWriteApiSchemaTransformConfiguration configuration; + private final BigQueryWriteConfiguration configuration; - BigQueryStorageWriteApiSchemaTransform( - BigQueryStorageWriteApiSchemaTransformConfiguration configuration) { + BigQueryStorageWriteApiSchemaTransform(BigQueryWriteConfiguration configuration) { configuration.validate(); this.configuration = configuration; } @@ -420,8 +227,7 @@ public TableConstraints getTableConstraints(String destination) { @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { // Check that the input exists - checkArgument(input.has(INPUT_ROWS_TAG), "Missing expected input tag: %s", INPUT_ROWS_TAG); - PCollection inputRows = input.get(INPUT_ROWS_TAG); + PCollection inputRows = input.getSinglePCollection(); BigQueryIO.Write write = createStorageWriteApiTransform(inputRows.getSchema()); @@ -505,6 +311,20 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { } } + public Row getConfigurationRow() { + try { + // To stay consistent with our SchemaTransform configuration naming conventions, + // we sort lexicographically + return SchemaRegistry.createDefault() + .getToRowFunction(BigQueryWriteConfiguration.class) + .apply(configuration) + .sorted() + .toSnakeCase(); + } catch (NoSuchSchemaException e) { + throw new RuntimeException(e); + } + } + void validateDynamicDestinationsExpectedSchema(Schema schema) { checkArgument( schema.getFieldNames().containsAll(Arrays.asList("destination", "record")), @@ -540,18 +360,18 @@ BigQueryIO.Write createStorageWriteApiTransform(Schema schema) { if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) { CreateDisposition createDisposition = - BigQueryStorageWriteApiSchemaTransformConfiguration.CREATE_DISPOSITIONS.get( - configuration.getCreateDisposition().toUpperCase()); + CreateDisposition.valueOf(configuration.getCreateDisposition().toUpperCase()); write = write.withCreateDisposition(createDisposition); } if (!Strings.isNullOrEmpty(configuration.getWriteDisposition())) { WriteDisposition writeDisposition = - BigQueryStorageWriteApiSchemaTransformConfiguration.WRITE_DISPOSITIONS.get( - configuration.getWriteDisposition().toUpperCase()); + WriteDisposition.valueOf(configuration.getWriteDisposition().toUpperCase()); write = write.withWriteDisposition(writeDisposition); } - + if (!Strings.isNullOrEmpty(configuration.getKmsKey())) { + write = write.withKmsKey(configuration.getKmsKey()); + } if (this.testBigQueryServices != null) { write = write.withTestServices(testBigQueryServices); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java new file mode 100644 index 000000000000..acc5b1ff6ea4 --- 
/dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigquery.providers; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; + +import com.google.auto.value.AutoValue; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; + +/** + * Configuration for writing to BigQuery with SchemaTransforms. Used by {@link + * BigQueryStorageWriteApiSchemaTransformProvider} and {@link + * org.apache.beam.sdk.io.gcp.bigquery.BigQueryFileLoadsWriteSchemaTransformProvider}. 
+ */ +@DefaultSchema(AutoValueSchema.class) +@AutoValue +public abstract class BigQueryWriteConfiguration { + protected static final String DYNAMIC_DESTINATIONS = "DYNAMIC_DESTINATIONS"; + + @AutoValue + public abstract static class ErrorHandling { + @SchemaFieldDescription("The name of the output PCollection containing failed writes.") + public abstract String getOutput(); + + public static Builder builder() { + return new AutoValue_BigQueryWriteConfiguration_ErrorHandling.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setOutput(String output); + + public abstract ErrorHandling build(); + } + } + + public void validate() { + String invalidConfigMessage = "Invalid BigQuery Storage Write configuration: "; + + // validate output table spec + checkArgument( + !Strings.isNullOrEmpty(this.getTable()), + invalidConfigMessage + "Table spec for a BigQuery Write must be specified."); + + // if we have an input table spec, validate it + if (!this.getTable().equals(DYNAMIC_DESTINATIONS)) { + checkNotNull(BigQueryHelpers.parseTableSpec(this.getTable())); + } + + // validate create and write dispositions + String createDisposition = getCreateDisposition(); + if (createDisposition != null && !createDisposition.isEmpty()) { + List createDispositions = + Arrays.stream(BigQueryIO.Write.CreateDisposition.values()) + .map(c -> c.name()) + .collect(Collectors.toList()); + Preconditions.checkArgument( + createDispositions.contains(createDisposition.toUpperCase()), + "Invalid create disposition (%s) was specified. Available dispositions are: %s", + createDisposition, + createDispositions); + } + String writeDisposition = getWriteDisposition(); + if (writeDisposition != null && !writeDisposition.isEmpty()) { + List writeDispostions = + Arrays.stream(BigQueryIO.Write.WriteDisposition.values()) + .map(w -> w.name()) + .collect(Collectors.toList()); + Preconditions.checkArgument( + writeDispostions.contains(writeDisposition.toUpperCase()), + "Invalid write disposition (%s) was specified. Available dispositions are: %s", + writeDisposition, + writeDispostions); + } + + ErrorHandling errorHandling = getErrorHandling(); + if (errorHandling != null) { + checkArgument( + !Strings.isNullOrEmpty(errorHandling.getOutput()), + invalidConfigMessage + "Output must not be empty if error handling specified."); + } + + Boolean autoSharding = getAutoSharding(); + Integer numStreams = getNumStreams(); + if (autoSharding != null && autoSharding && numStreams != null) { + checkArgument( + numStreams == 0, + invalidConfigMessage + + "Cannot set a fixed number of streams when auto-sharding is enabled. Please pick only one of the two options."); + } + } + + /** Instantiates a {@link BigQueryWriteConfiguration.Builder} instance. */ + public static Builder builder() { + return new AutoValue_BigQueryWriteConfiguration.Builder(); + } + + @SchemaFieldDescription( + "The bigquery table to write to. Format: [${PROJECT}:]${DATASET}.${TABLE}") + public abstract String getTable(); + + @SchemaFieldDescription( + "Optional field that specifies whether the job is allowed to create new tables. " + + "The following values are supported: CREATE_IF_NEEDED (the job may create the table), CREATE_NEVER (" + + "the job must fail if the table does not exist already).") + @Nullable + public abstract String getCreateDisposition(); + + @SchemaFieldDescription( + "Specifies the action that occurs if the destination table already exists. 
" + + "The following values are supported: " + + "WRITE_TRUNCATE (overwrites the table data), " + + "WRITE_APPEND (append the data to the table), " + + "WRITE_EMPTY (job must fail if the table is not empty).") + @Nullable + public abstract String getWriteDisposition(); + + @SchemaFieldDescription( + "Determines how often to 'commit' progress into BigQuery. Default is every 5 seconds.") + @Nullable + public abstract Long getTriggeringFrequencySeconds(); + + @SchemaFieldDescription( + "This option enables lower latency for insertions to BigQuery but may ocassionally " + + "duplicate data elements.") + @Nullable + public abstract Boolean getUseAtLeastOnceSemantics(); + + @SchemaFieldDescription( + "This option enables using a dynamically determined number of Storage Write API streams to write to " + + "BigQuery. Only applicable to unbounded data.") + @Nullable + public abstract Boolean getAutoSharding(); + + @SchemaFieldDescription( + "Specifies the number of write streams that the Storage API sink will use. " + + "This parameter is only applicable when writing unbounded data.") + @Nullable + public abstract Integer getNumStreams(); + + @SchemaFieldDescription("Use this Cloud KMS key to encrypt your data") + @Nullable + public abstract String getKmsKey(); + + @SchemaFieldDescription("This option specifies whether and where to output unwritable rows.") + @Nullable + public abstract ErrorHandling getErrorHandling(); + + @SchemaFieldDescription( + "This option enables the use of BigQuery CDC functionality. The expected PCollection" + + " should contain Beam Rows with a schema wrapping the record to be inserted and" + + " adding the CDC info similar to: {row_mutation_info: {mutation_type:\"...\", " + + "change_sequence_number:\"...\"}, record: {...}}") + @Nullable + public abstract Boolean getUseCdcWrites(); + + @SchemaFieldDescription( + "If CREATE_IF_NEEDED disposition is set, BigQuery table(s) will be created with this" + + " columns as primary key. Required when CDC writes are enabled with CREATE_IF_NEEDED.") + @Nullable + public abstract List getPrimaryKey(); + + /** Builder for {@link BigQueryWriteConfiguration}. */ + @AutoValue.Builder + public abstract static class Builder { + + public abstract Builder setTable(String table); + + public abstract Builder setCreateDisposition(String createDisposition); + + public abstract Builder setWriteDisposition(String writeDisposition); + + public abstract Builder setTriggeringFrequencySeconds(Long seconds); + + public abstract Builder setUseAtLeastOnceSemantics(Boolean use); + + public abstract Builder setAutoSharding(Boolean autoSharding); + + public abstract Builder setNumStreams(Integer numStreams); + + public abstract Builder setKmsKey(String kmsKey); + + public abstract Builder setErrorHandling(ErrorHandling errorHandling); + + public abstract Builder setUseCdcWrites(Boolean cdcWrites); + + public abstract Builder setPrimaryKey(List pkColumns); + + /** Builds a {@link BigQueryWriteConfiguration} instance. 
*/ + public abstract BigQueryWriteConfiguration build(); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProviderTest.java index dd8bb9fc8664..1e4791b94e1c 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProviderTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProviderTest.java @@ -17,38 +17,27 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; -import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryFileLoadsWriteSchemaTransformProvider.INPUT_TAG; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThrows; import com.google.api.services.bigquery.model.TableReference; -import com.google.api.services.bigquery.model.TableRow; -import com.google.api.services.bigquery.model.TableSchema; import java.io.IOException; import java.util.Arrays; -import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Set; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryFileLoadsWriteSchemaTransformProvider.BigQueryWriteSchemaTransform; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; +import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryWriteConfiguration; import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices; import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService; import org.apache.beam.sdk.io.gcp.testing.FakeJobService; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; -import org.apache.beam.sdk.schemas.io.InvalidConfigurationException; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.display.DisplayData.Identifier; -import org.apache.beam.sdk.transforms.display.DisplayData.Item; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; -import org.apache.commons.lang3.tuple.Pair; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -71,8 +60,6 @@ public class BigQueryFileLoadsWriteSchemaTransformProviderTest { private static final Schema SCHEMA = Schema.of(Field.of("name", FieldType.STRING), Field.of("number", FieldType.INT64)); - private static final TableSchema TABLE_SCHEMA = BigQueryUtils.toTableSchema(SCHEMA); - private static final List ROWS = Arrays.asList( Row.withSchema(SCHEMA).withFieldValue("name", "a").withFieldValue("number", 1L).build(), @@ -109,9 +96,9 @@ public void tearDown() { public void testLoad() throws IOException, InterruptedException { BigQueryFileLoadsWriteSchemaTransformProvider provider = new BigQueryFileLoadsWriteSchemaTransformProvider(); - BigQueryFileLoadsWriteSchemaTransformConfiguration configuration = - BigQueryFileLoadsWriteSchemaTransformConfiguration.builder() - .setTableSpec(BigQueryHelpers.toTableSpec(TABLE_REFERENCE)) + BigQueryWriteConfiguration configuration = + BigQueryWriteConfiguration.builder() + 
.setTable(BigQueryHelpers.toTableSpec(TABLE_REFERENCE)) .setWriteDisposition(WriteDisposition.WRITE_TRUNCATE.name()) .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name()) .build(); @@ -128,138 +115,4 @@ public void testLoad() throws IOException, InterruptedException { assertNotNull(fakeDatasetService.getTable(TABLE_REFERENCE)); assertEquals(ROWS.size(), fakeDatasetService.getAllRows(PROJECT, DATASET, TABLE_ID).size()); } - - @Test - public void testValidatePipelineOptions() { - List< - Pair< - BigQueryFileLoadsWriteSchemaTransformConfiguration.Builder, - Class>> - cases = - Arrays.asList( - Pair.of( - BigQueryFileLoadsWriteSchemaTransformConfiguration.builder() - .setTableSpec("project.doesnot.exist") - .setCreateDisposition(CreateDisposition.CREATE_NEVER.name()) - .setWriteDisposition(WriteDisposition.WRITE_APPEND.name()), - InvalidConfigurationException.class), - Pair.of( - BigQueryFileLoadsWriteSchemaTransformConfiguration.builder() - .setTableSpec(String.format("%s.%s.%s", PROJECT, DATASET, "doesnotexist")) - .setCreateDisposition(CreateDisposition.CREATE_NEVER.name()) - .setWriteDisposition(WriteDisposition.WRITE_EMPTY.name()), - InvalidConfigurationException.class), - Pair.of( - BigQueryFileLoadsWriteSchemaTransformConfiguration.builder() - .setTableSpec("project.doesnot.exist") - .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name()) - .setWriteDisposition(WriteDisposition.WRITE_APPEND.name()), - null)); - for (Pair< - BigQueryFileLoadsWriteSchemaTransformConfiguration.Builder, Class> - caze : cases) { - BigQueryWriteSchemaTransform transform = transformFrom(caze.getLeft().build()); - if (caze.getRight() != null) { - assertThrows(caze.getRight(), () -> transform.validate(p.getOptions())); - } else { - transform.validate(p.getOptions()); - } - } - } - - @Test - public void testToWrite() { - List< - Pair< - BigQueryFileLoadsWriteSchemaTransformConfiguration.Builder, - BigQueryIO.Write>> - cases = - Arrays.asList( - Pair.of( - BigQueryFileLoadsWriteSchemaTransformConfiguration.builder() - .setTableSpec(BigQueryHelpers.toTableSpec(TABLE_REFERENCE)) - .setCreateDisposition(CreateDisposition.CREATE_NEVER.name()) - .setWriteDisposition(WriteDisposition.WRITE_EMPTY.name()), - BigQueryIO.writeTableRows() - .to(TABLE_REFERENCE) - .withCreateDisposition(CreateDisposition.CREATE_NEVER) - .withWriteDisposition(WriteDisposition.WRITE_EMPTY) - .withSchema(TABLE_SCHEMA)), - Pair.of( - BigQueryFileLoadsWriteSchemaTransformConfiguration.builder() - .setTableSpec(BigQueryHelpers.toTableSpec(TABLE_REFERENCE)) - .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name()) - .setWriteDisposition(WriteDisposition.WRITE_TRUNCATE.name()), - BigQueryIO.writeTableRows() - .to(TABLE_REFERENCE) - .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) - .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE) - .withSchema(TABLE_SCHEMA))); - for (Pair< - BigQueryFileLoadsWriteSchemaTransformConfiguration.Builder, BigQueryIO.Write> - caze : cases) { - BigQueryWriteSchemaTransform transform = transformFrom(caze.getLeft().build()); - Map gotDisplayData = DisplayData.from(transform.toWrite(SCHEMA)).asMap(); - Map wantDisplayData = DisplayData.from(caze.getRight()).asMap(); - Set keys = new HashSet<>(); - keys.addAll(gotDisplayData.keySet()); - keys.addAll(wantDisplayData.keySet()); - for (Identifier key : keys) { - Item got = null; - Item want = null; - if (gotDisplayData.containsKey(key)) { - got = gotDisplayData.get(key); - } - if (wantDisplayData.containsKey(key)) { - want = 
wantDisplayData.get(key); - } - assertEquals(want, got); - } - } - } - - @Test - public void validatePCollectionRowTupleInput() { - PCollectionRowTuple empty = PCollectionRowTuple.empty(p); - PCollectionRowTuple valid = - PCollectionRowTuple.of( - INPUT_TAG, p.apply("CreateRowsWithValidSchema", Create.of(ROWS)).setRowSchema(SCHEMA)); - - PCollectionRowTuple invalid = - PCollectionRowTuple.of( - INPUT_TAG, - p.apply( - "CreateRowsWithInvalidSchema", - Create.of( - Row.nullRow( - Schema.builder().addNullableField("name", FieldType.STRING).build())))); - - BigQueryWriteSchemaTransform transform = - transformFrom( - BigQueryFileLoadsWriteSchemaTransformConfiguration.builder() - .setTableSpec(BigQueryHelpers.toTableSpec(TABLE_REFERENCE)) - .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name()) - .setWriteDisposition(WriteDisposition.WRITE_APPEND.name()) - .build()); - - assertThrows(IllegalArgumentException.class, () -> transform.validate(empty)); - - assertThrows(IllegalStateException.class, () -> transform.validate(invalid)); - - transform.validate(valid); - - p.run(); - } - - private BigQueryWriteSchemaTransform transformFrom( - BigQueryFileLoadsWriteSchemaTransformConfiguration configuration) { - BigQueryFileLoadsWriteSchemaTransformProvider provider = - new BigQueryFileLoadsWriteSchemaTransformProvider(); - BigQueryWriteSchemaTransform transform = - (BigQueryWriteSchemaTransform) provider.from(configuration); - - transform.setTestBigQueryServices(fakeBigQueryServices); - - return transform; - } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryManagedIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryManagedIT.java new file mode 100644 index 000000000000..a39e27bf7d85 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryManagedIT.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.LongStream; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryDirectReadSchemaTransformProvider; +import org.apache.beam.sdk.io.gcp.testing.BigqueryClient; +import org.apache.beam.sdk.managed.Managed; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PeriodicImpulse; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.joda.time.Duration; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** This class tests the execution of {@link Managed} BigQueryIO. */ +@RunWith(JUnit4.class) +public class BigQueryManagedIT { + private static final Schema SCHEMA = + Schema.of( + Schema.Field.of("str", Schema.FieldType.STRING), + Schema.Field.of("number", Schema.FieldType.INT64)); + + private static final List ROWS = + LongStream.range(0, 20) + .mapToObj( + i -> + Row.withSchema(SCHEMA) + .withFieldValue("str", Long.toString(i)) + .withFieldValue("number", i) + .build()) + .collect(Collectors.toList()); + + private static final BigqueryClient BQ_CLIENT = new BigqueryClient("BigQueryManagedIT"); + + private static final String PROJECT = + TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject(); + private static final String BIG_QUERY_DATASET_ID = "bigquery_managed_" + System.nanoTime(); + + @BeforeClass + public static void setUpTestEnvironment() throws IOException, InterruptedException { + // Create one BQ dataset for all test cases. 
+ BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID, null); + } + + @AfterClass + public static void cleanup() { + BQ_CLIENT.deleteDataset(PROJECT, BIG_QUERY_DATASET_ID); + } + + @Test + public void testStreamingStorageWriteRead() { + String table = String.format("%s:%s.managed_storage_write_read", PROJECT, BIG_QUERY_DATASET_ID); + + Map<String, Object> writeConfig = + ImmutableMap.<String, Object>builder().put("table", table).build(); + Pipeline p = Pipeline.create(); + PCollectionRowTuple.of("input", getInput(p, true)) + .apply(Managed.write(Managed.BIGQUERY).withConfig(writeConfig)); + p.run().waitUntilFinish(); + + Map<String, Object> readConfig = + ImmutableMap.<String, Object>builder().put("table", table).build(); + Pipeline q = Pipeline.create(); + PCollection<Row> outputRows = + PCollectionRowTuple.empty(q) + .apply(Managed.read(Managed.BIGQUERY).withConfig(readConfig)) + .get(BigQueryDirectReadSchemaTransformProvider.OUTPUT_TAG); + PAssert.that(outputRows).containsInAnyOrder(ROWS); + q.run().waitUntilFinish(); + } + + public PCollection<Row> getInput(Pipeline p, boolean isStreaming) { + if (isStreaming) { + return p.apply( + PeriodicImpulse.create() + .stopAfter(Duration.millis(20)) + .withInterval(Duration.millis(1))) + .apply( + MapElements.into(TypeDescriptors.rows()) + .via( + i -> + Row.withSchema(SCHEMA) + .withFieldValue("str", Long.toString(i.getMillis())) + .withFieldValue("number", i.getMillis()) + .build())) + .setRowSchema(SCHEMA); + } + return p.apply(Create.of(ROWS)).setRowSchema(SCHEMA); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformTranslationTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformTranslationTest.java new file mode 100644 index 000000000000..bc6624bd9371 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformTranslationTest.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import static org.apache.beam.model.pipeline.v1.ExternalTransforms.ExpansionMethods.Enum.SCHEMA_TRANSFORM; +import static org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformTranslation.BigQueryStorageReadSchemaTransformTranslator; +import static org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformTranslation.BigQueryStorageWriteSchemaTransformTranslator; +import static org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryDirectReadSchemaTransformProvider.BigQueryDirectReadSchemaTransform; +import static org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransform; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.beam.model.pipeline.v1.ExternalTransforms.SchemaTransformPayload; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryDirectReadSchemaTransformProvider; +import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaTranslation; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.util.construction.BeamUrns; +import org.apache.beam.sdk.util.construction.PipelineTranslation; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.InvalidProtocolBufferException; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class BigQuerySchemaTransformTranslationTest { + static final BigQueryStorageWriteApiSchemaTransformProvider WRITE_PROVIDER = + new BigQueryStorageWriteApiSchemaTransformProvider(); + static final BigQueryDirectReadSchemaTransformProvider READ_PROVIDER = + new BigQueryDirectReadSchemaTransformProvider(); + static final Row WRITE_CONFIG_ROW = + Row.withSchema(WRITE_PROVIDER.configurationSchema()) + .withFieldValue("table", "project:dataset.table") + .withFieldValue("create_disposition", "create_never") + .withFieldValue("write_disposition", "write_append") + .withFieldValue("triggering_frequency_seconds", 5L) + .withFieldValue("use_at_least_once_semantics", false) + .withFieldValue("auto_sharding", false) + .withFieldValue("num_streams", 5) + .withFieldValue("error_handling", null) + .build(); + static final Row READ_CONFIG_ROW = + Row.withSchema(READ_PROVIDER.configurationSchema()) + .withFieldValue("query", null) + .withFieldValue("table_spec", "apache-beam-testing.samples.weather_stations") + .withFieldValue("row_restriction", "col < 5") + .withFieldValue("selected_fields", Arrays.asList("col1", "col2", "col3")) + .build(); + + @Test + public void testRecreateWriteTransformFromRow() { + BigQueryStorageWriteApiSchemaTransform writeTransform = + (BigQueryStorageWriteApiSchemaTransform) WRITE_PROVIDER.from(WRITE_CONFIG_ROW); + + BigQueryStorageWriteSchemaTransformTranslator translator = + new BigQueryStorageWriteSchemaTransformTranslator(); + Row translatedRow = translator.toConfigRow(writeTransform); + + 
BigQueryStorageWriteApiSchemaTransform writeTransformFromRow = + translator.fromConfigRow(translatedRow, PipelineOptionsFactory.create()); + + assertEquals(WRITE_CONFIG_ROW, writeTransformFromRow.getConfigurationRow()); + } + + @Test + public void testWriteTransformProtoTranslation() + throws InvalidProtocolBufferException, IOException { + // First build a pipeline + Pipeline p = Pipeline.create(); + Schema inputSchema = Schema.builder().addByteArrayField("b").build(); + PCollection<Row> input = + p.apply( + Create.of( + Collections.singletonList( + Row.withSchema(inputSchema).addValue(new byte[] {1, 2, 3}).build()))) + .setRowSchema(inputSchema); + + BigQueryStorageWriteApiSchemaTransform writeTransform = + (BigQueryStorageWriteApiSchemaTransform) WRITE_PROVIDER.from(WRITE_CONFIG_ROW); + PCollectionRowTuple.of("input", input).apply(writeTransform); + + // Then translate the pipeline to a proto and extract the BigQueryStorageWriteApiSchemaTransform proto + RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p); + List<RunnerApi.PTransform> writeTransformProto = + pipelineProto.getComponents().getTransformsMap().values().stream() + .filter( + tr -> { + RunnerApi.FunctionSpec spec = tr.getSpec(); + try { + return spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM)) + && SchemaTransformPayload.parseFrom(spec.getPayload()) + .getIdentifier() + .equals(WRITE_PROVIDER.identifier()); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toList()); + assertEquals(1, writeTransformProto.size()); + RunnerApi.FunctionSpec spec = writeTransformProto.get(0).getSpec(); + + // Check that the proto contains correct values + SchemaTransformPayload payload = SchemaTransformPayload.parseFrom(spec.getPayload()); + Schema schemaFromSpec = SchemaTranslation.schemaFromProto(payload.getConfigurationSchema()); + assertEquals(WRITE_PROVIDER.configurationSchema(), schemaFromSpec); + Row rowFromSpec = RowCoder.of(schemaFromSpec).decode(payload.getConfigurationRow().newInput()); + + assertEquals(WRITE_CONFIG_ROW, rowFromSpec); + + // Use the information in the proto to recreate the BigQueryStorageWriteApiSchemaTransform + BigQueryStorageWriteSchemaTransformTranslator translator = + new BigQueryStorageWriteSchemaTransformTranslator(); + BigQueryStorageWriteApiSchemaTransform writeTransformFromSpec = + translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create()); + + assertEquals(WRITE_CONFIG_ROW, writeTransformFromSpec.getConfigurationRow()); + } + + @Test + public void testReCreateReadTransformFromRow() { + BigQueryDirectReadSchemaTransform readTransform = + (BigQueryDirectReadSchemaTransform) READ_PROVIDER.from(READ_CONFIG_ROW); + + BigQueryStorageReadSchemaTransformTranslator translator = + new BigQueryStorageReadSchemaTransformTranslator(); + Row row = translator.toConfigRow(readTransform); + + BigQueryDirectReadSchemaTransform readTransformFromRow = + translator.fromConfigRow(row, PipelineOptionsFactory.create()); + + assertEquals(READ_CONFIG_ROW, readTransformFromRow.getConfigurationRow()); + } + + @Test + public void testReadTransformProtoTranslation() + throws InvalidProtocolBufferException, IOException { + // First build a pipeline + Pipeline p = Pipeline.create(); + + BigQueryDirectReadSchemaTransform readTransform = + (BigQueryDirectReadSchemaTransform) READ_PROVIDER.from(READ_CONFIG_ROW); + + PCollectionRowTuple.empty(p).apply(readTransform); + + // Then translate the pipeline to a proto and extract the BigQueryDirectReadSchemaTransform proto + RunnerApi.Pipeline pipelineProto = 
PipelineTranslation.toProto(p); + List<RunnerApi.PTransform> readTransformProto = + pipelineProto.getComponents().getTransformsMap().values().stream() + .filter( + tr -> { + RunnerApi.FunctionSpec spec = tr.getSpec(); + try { + return spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM)) + && SchemaTransformPayload.parseFrom(spec.getPayload()) + .getIdentifier() + .equals(READ_PROVIDER.identifier()); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toList()); + assertEquals(1, readTransformProto.size()); + RunnerApi.FunctionSpec spec = readTransformProto.get(0).getSpec(); + + // Check that the proto contains correct values + SchemaTransformPayload payload = SchemaTransformPayload.parseFrom(spec.getPayload()); + Schema schemaFromSpec = SchemaTranslation.schemaFromProto(payload.getConfigurationSchema()); + assertEquals(READ_PROVIDER.configurationSchema(), schemaFromSpec); + Row rowFromSpec = RowCoder.of(schemaFromSpec).decode(payload.getConfigurationRow().newInput()); + assertEquals(READ_CONFIG_ROW, rowFromSpec); + + // Use the information in the proto to recreate the BigQueryDirectReadSchemaTransform + BigQueryStorageReadSchemaTransformTranslator translator = + new BigQueryStorageReadSchemaTransformTranslator(); + BigQueryDirectReadSchemaTransform readTransformFromSpec = + translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create()); + + assertEquals(READ_CONFIG_ROW, readTransformFromSpec.getConfigurationRow()); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java index 87ba2961461a..3a23f5a3205a 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java @@ -35,7 +35,6 @@ import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers; import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransform; -import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransformConfiguration; import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices; import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService; import org.apache.beam.sdk.io.gcp.testing.FakeJobService; @@ -108,15 +107,14 @@ public void setUp() throws Exception { @Test public void testInvalidConfig() { - List<BigQueryStorageWriteApiSchemaTransformConfiguration.Builder> invalidConfigs = + List<BigQueryWriteConfiguration.Builder> invalidConfigs = Arrays.asList( - BigQueryStorageWriteApiSchemaTransformConfiguration.builder() - .setTable("not_a_valid_table_spec"), - BigQueryStorageWriteApiSchemaTransformConfiguration.builder() + BigQueryWriteConfiguration.builder().setTable("not_a_valid_table_spec"), + BigQueryWriteConfiguration.builder() .setTable("project:dataset.table") .setCreateDisposition("INVALID_DISPOSITION")); - for (BigQueryStorageWriteApiSchemaTransformConfiguration.Builder config : invalidConfigs) { + for (BigQueryWriteConfiguration.Builder config : invalidConfigs) { assertThrows( Exception.class, () -> { @@ -125,13 +123,11 @@ public void testInvalidConfig() { } } - public
PCollectionRowTuple runWithConfig( - BigQueryStorageWriteApiSchemaTransformConfiguration config) { + public PCollectionRowTuple runWithConfig(BigQueryWriteConfiguration config) { return runWithConfig(config, ROWS); } - public PCollectionRowTuple runWithConfig( - BigQueryStorageWriteApiSchemaTransformConfiguration config, List inputRows) { + public PCollectionRowTuple runWithConfig(BigQueryWriteConfiguration config, List inputRows) { BigQueryStorageWriteApiSchemaTransformProvider provider = new BigQueryStorageWriteApiSchemaTransformProvider(); @@ -176,8 +172,8 @@ public boolean rowEquals(Row expectedRow, TableRow actualRow) { @Test public void testSimpleWrite() throws Exception { String tableSpec = "project:dataset.simple_write"; - BigQueryStorageWriteApiSchemaTransformConfiguration config = - BigQueryStorageWriteApiSchemaTransformConfiguration.builder().setTable(tableSpec).build(); + BigQueryWriteConfiguration config = + BigQueryWriteConfiguration.builder().setTable(tableSpec).build(); runWithConfig(config, ROWS); p.run().waitUntilFinish(); @@ -189,9 +185,9 @@ public void testSimpleWrite() throws Exception { @Test public void testWriteToDynamicDestinations() throws Exception { - String dynamic = BigQueryStorageWriteApiSchemaTransformProvider.DYNAMIC_DESTINATIONS; - BigQueryStorageWriteApiSchemaTransformConfiguration config = - BigQueryStorageWriteApiSchemaTransformConfiguration.builder().setTable(dynamic).build(); + String dynamic = BigQueryWriteConfiguration.DYNAMIC_DESTINATIONS; + BigQueryWriteConfiguration config = + BigQueryWriteConfiguration.builder().setTable(dynamic).build(); String baseTableSpec = "project:dataset.dynamic_write_"; @@ -273,8 +269,8 @@ public void testCDCWrites() throws Exception { String tableSpec = "project:dataset.cdc_write"; List primaryKeyColumns = ImmutableList.of("name"); - BigQueryStorageWriteApiSchemaTransformConfiguration config = - BigQueryStorageWriteApiSchemaTransformConfiguration.builder() + BigQueryWriteConfiguration config = + BigQueryWriteConfiguration.builder() .setUseAtLeastOnceSemantics(true) .setTable(tableSpec) .setUseCdcWrites(true) @@ -304,9 +300,9 @@ public void testCDCWrites() throws Exception { @Test public void testCDCWriteToDynamicDestinations() throws Exception { List primaryKeyColumns = ImmutableList.of("name"); - String dynamic = BigQueryStorageWriteApiSchemaTransformProvider.DYNAMIC_DESTINATIONS; - BigQueryStorageWriteApiSchemaTransformConfiguration config = - BigQueryStorageWriteApiSchemaTransformConfiguration.builder() + String dynamic = BigQueryWriteConfiguration.DYNAMIC_DESTINATIONS; + BigQueryWriteConfiguration config = + BigQueryWriteConfiguration.builder() .setUseAtLeastOnceSemantics(true) .setTable(dynamic) .setUseCdcWrites(true) @@ -338,8 +334,8 @@ public void testCDCWriteToDynamicDestinations() throws Exception { @Test public void testInputElementCount() throws Exception { String tableSpec = "project:dataset.input_count"; - BigQueryStorageWriteApiSchemaTransformConfiguration config = - BigQueryStorageWriteApiSchemaTransformConfiguration.builder().setTable(tableSpec).build(); + BigQueryWriteConfiguration config = + BigQueryWriteConfiguration.builder().setTable(tableSpec).build(); runWithConfig(config); PipelineResult result = p.run(); @@ -368,13 +364,11 @@ public void testInputElementCount() throws Exception { @Test public void testFailedRows() throws Exception { String tableSpec = "project:dataset.write_with_fail"; - BigQueryStorageWriteApiSchemaTransformConfiguration config = - 
BigQueryStorageWriteApiSchemaTransformConfiguration.builder() + BigQueryWriteConfiguration config = + BigQueryWriteConfiguration.builder() .setTable(tableSpec) .setErrorHandling( - BigQueryStorageWriteApiSchemaTransformConfiguration.ErrorHandling.builder() - .setOutput("FailedRows") - .build()) + BigQueryWriteConfiguration.ErrorHandling.builder().setOutput("FailedRows").build()) .build(); String failValue = "fail_me"; @@ -420,13 +414,11 @@ public void testFailedRows() throws Exception { @Test public void testErrorCount() throws Exception { String tableSpec = "project:dataset.error_count"; - BigQueryStorageWriteApiSchemaTransformConfiguration config = - BigQueryStorageWriteApiSchemaTransformConfiguration.builder() + BigQueryWriteConfiguration config = + BigQueryWriteConfiguration.builder() .setTable(tableSpec) .setErrorHandling( - BigQueryStorageWriteApiSchemaTransformConfiguration.ErrorHandling.builder() - .setOutput("FailedRows") - .build()) + BigQueryWriteConfiguration.ErrorHandling.builder().setOutput("FailedRows").build()) .build(); Function shouldFailRow = diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java index 8477726686ee..adc7fc7e2684 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java @@ -86,17 +86,20 @@ public class Managed { // TODO: Dynamically generate a list of supported transforms public static final String ICEBERG = "iceberg"; public static final String KAFKA = "kafka"; + public static final String BIGQUERY = "bigquery"; // Supported SchemaTransforms public static final Map READ_TRANSFORMS = ImmutableMap.builder() .put(ICEBERG, getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_READ)) .put(KAFKA, getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ)) + .put(BIGQUERY, getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_READ)) .build(); public static final Map WRITE_TRANSFORMS = ImmutableMap.builder() .put(ICEBERG, getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_WRITE)) .put(KAFKA, getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_WRITE)) + .put(BIGQUERY, getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_WRITE)) .build(); /** diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java index 6f97983d3260..6ca883c96698 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java @@ -117,7 +117,7 @@ protected void validate() { "Please specify a config or a config URL, but not both."); } - public @Nullable String resolveUnderlyingConfig() { + private Map resolveUnderlyingConfig() { String yamlTransformConfig = getConfig(); // If YAML string is empty, then attempt to read from YAML file if (Strings.isNullOrEmpty(yamlTransformConfig)) { @@ -131,7 +131,8 @@ protected void validate() { throw new RuntimeException(e); } } - return yamlTransformConfig; + + return YamlUtils.yamlStringToMap(yamlTransformConfig); } } @@ -152,24 +153,24 @@ protected SchemaTransform from(ManagedConfig managedConfig) { static class ManagedSchemaTransform extends SchemaTransform { private final ManagedConfig managedConfig; - private final Row 
underlyingTransformConfig; + private final Row underlyingRowConfig; private final SchemaTransformProvider underlyingTransformProvider; ManagedSchemaTransform( ManagedConfig managedConfig, SchemaTransformProvider underlyingTransformProvider) { // parse config before expansion to check if it matches underlying transform's config schema Schema transformConfigSchema = underlyingTransformProvider.configurationSchema(); - Row underlyingTransformConfig; + Row underlyingRowConfig; try { - underlyingTransformConfig = getRowConfig(managedConfig, transformConfigSchema); + underlyingRowConfig = getRowConfig(managedConfig, transformConfigSchema); } catch (Exception e) { throw new IllegalArgumentException( "Encountered an error when retrieving a Row configuration", e); } - this.managedConfig = managedConfig; - this.underlyingTransformConfig = underlyingTransformConfig; + this.underlyingRowConfig = underlyingRowConfig; this.underlyingTransformProvider = underlyingTransformProvider; + this.managedConfig = managedConfig; } @Override @@ -177,9 +178,9 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { LOG.debug( "Building transform \"{}\" with Row configuration: {}", underlyingTransformProvider.identifier(), - underlyingTransformConfig); + underlyingRowConfig); - return input.apply(underlyingTransformProvider.from(underlyingTransformConfig)); + return input.apply(underlyingTransformProvider.from(underlyingRowConfig)); } public ManagedConfig getManagedConfig() { @@ -201,16 +202,14 @@ Row getConfigurationRow() { } } + // May return an empty row (perhaps the underlying transform doesn't have any required + // parameters) @VisibleForTesting static Row getRowConfig(ManagedConfig config, Schema transformSchema) { - // May return an empty row (perhaps the underlying transform doesn't have any required - // parameters) - String yamlConfig = config.resolveUnderlyingConfig(); - Map configMap = YamlUtils.yamlStringToMap(yamlConfig); - - // The config Row object will be used to build the underlying SchemaTransform. - // If a mapping for the SchemaTransform exists, we use it to update parameter names and align - // with the underlying config schema + Map configMap = config.resolveUnderlyingConfig(); + // Build a config Row that will be used to build the underlying SchemaTransform. 
+ // If a mapping for the SchemaTransform exists, we use it to update parameter names to align + // with the underlying SchemaTransform config schema Map<String, String> mapping = MAPPINGS.get(config.getTransformIdentifier()); if (mapping != null && configMap != null) { Map<String, Object> remappedConfig = new HashMap<>(); diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java index 4cf752747be5..30476a30d373 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java @@ -50,9 +50,27 @@ public class ManagedTransformConstants { private static final Map<String, String> KAFKA_WRITE_MAPPINGS = ImmutableMap.<String, String>builder().put("data_format", "format").build(); + private static final Map<String, String> BIGQUERY_READ_MAPPINGS = + ImmutableMap.<String, String>builder() + .put("table", "table_spec") + .put("fields", "selected_fields") + .build(); + + private static final Map<String, String> BIGQUERY_WRITE_MAPPINGS = + ImmutableMap.<String, String>builder() + .put("at_least_once", "use_at_least_once_semantics") + .put("triggering_frequency", "triggering_frequency_seconds") + .build(); + public static final Map<String, Map<String, String>> MAPPINGS = ImmutableMap.<String, Map<String, String>>builder() .put(getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ), KAFKA_READ_MAPPINGS) .put(getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_WRITE), KAFKA_WRITE_MAPPINGS) + .put( + getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_READ), + BIGQUERY_READ_MAPPINGS) + .put( + getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_WRITE), + BIGQUERY_WRITE_MAPPINGS) .build(); } diff --git a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java index e9edf8751e34..a287ec6260ce 100644 --- a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java +++ b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java @@ -88,8 +88,7 @@ public void testGetConfigRowFromYamlFile() throws URISyntaxException { .withFieldValue("extra_integer", 123) .build(); Row configRow = - ManagedSchemaTransformProvider.getRowConfig( - config, new TestSchemaTransformProvider().configurationSchema()); + ManagedSchemaTransformProvider.getRowConfig(config, TestSchemaTransformProvider.SCHEMA); assertEquals(expectedRow, configRow); }
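
For context, here is a minimal, hypothetical usage sketch (not part of this change) showing how a user pipeline would drive the new Managed BigQuery entries added above. The class name, table specs, and field names are placeholders; the config keys "table", "fields", "at_least_once", and "triggering_frequency" are the user-facing names that BIGQUERY_READ_MAPPINGS and BIGQUERY_WRITE_MAPPINGS remap to the providers' "table_spec", "selected_fields", "use_at_least_once_semantics", and "triggering_frequency_seconds" parameters.

```java
import java.util.Arrays;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryDirectReadSchemaTransformProvider;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;

// Hypothetical example class; illustrates the Managed API surface exercised by the tests above.
public class ManagedBigQueryExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // "table" and "fields" are remapped by BIGQUERY_READ_MAPPINGS to the direct-read
    // provider's "table_spec" and "selected_fields" parameters.
    Map<String, Object> readConfig =
        ImmutableMap.<String, Object>builder()
            .put("table", "my-project:my_dataset.my_table") // placeholder table spec
            .put("fields", Arrays.asList("col1", "col2"))
            .build();

    PCollection<Row> rows =
        PCollectionRowTuple.empty(p)
            .apply(Managed.read(Managed.BIGQUERY).withConfig(readConfig))
            .get(BigQueryDirectReadSchemaTransformProvider.OUTPUT_TAG);

    // Only "table" is set here; "at_least_once" and "triggering_frequency" would likewise be
    // remapped by BIGQUERY_WRITE_MAPPINGS onto the Storage Write API provider.
    Map<String, Object> writeConfig =
        ImmutableMap.<String, Object>builder()
            .put("table", "my-project:my_dataset.my_output_table") // placeholder table spec
            .build();

    PCollectionRowTuple.of("input", rows)
        .apply(Managed.write(Managed.BIGQUERY).withConfig(writeConfig));

    p.run().waitUntilFinish();
  }
}
```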