Managed BigQueryIO #31486

Open · wants to merge 19 commits into master
Changes from 14 commits
1 change: 1 addition & 0 deletions sdks/java/io/google-cloud-platform/build.gradle
@@ -159,6 +159,7 @@ dependencies {
testImplementation project(path: ":sdks:java:extensions:google-cloud-platform-core", configuration: "testRuntimeMigration")
testImplementation project(path: ":sdks:java:extensions:protobuf", configuration: "testRuntimeMigration")
testImplementation project(path: ":runners:direct-java", configuration: "shadow")
testImplementation project(":sdks:java:managed")
testImplementation project(path: ":sdks:java:io:common")
testImplementation project(path: ":sdks:java:testing:test-utils")
testImplementation library.java.commons_math3
@@ -36,6 +36,9 @@ dependencies {
permitUnusedDeclared project(":sdks:java:io:google-cloud-platform") // BEAM-11761
implementation project(":sdks:java:extensions:schemaio-expansion-service")
permitUnusedDeclared project(":sdks:java:extensions:schemaio-expansion-service") // BEAM-11761
implementation project(":sdks:java:managed")
permitUnusedDeclared project(":sdks:java:managed") // BEAM-11761
Member:
Notes, no action required on this PR:

  • This is a link to Jira, so there is probably a GitHub issue it was migrated to.
  • This should be equivalent to runtimeOnly, because it is declared as implementation but nothing references it statically. I would guess this works the same, or else the uber-jar plugin might not treat it correctly.
  • Putting these deps into a Docker container without making an uber jar would honestly be better in the case where it does end up in a container, so that we keep the original jar metadata.


runtimeOnly library.java.slf4j_jdk14
}


This file was deleted.

@@ -17,34 +17,26 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import static org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryWriteConfiguration;

import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;

/**
* An implementation of {@link TypedSchemaTransformProvider} for BigQuery write jobs configured
* using {@link BigQueryFileLoadsWriteSchemaTransformConfiguration}.
* using {@link BigQueryWriteConfiguration}.
*
* <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
* provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
@@ -56,201 +48,84 @@
@Internal
@AutoService(SchemaTransformProvider.class)
public class BigQueryFileLoadsWriteSchemaTransformProvider
extends TypedSchemaTransformProvider<BigQueryFileLoadsWriteSchemaTransformConfiguration> {
extends TypedSchemaTransformProvider<BigQueryWriteConfiguration> {

private static final String IDENTIFIER =
"beam:schematransform:org.apache.beam:bigquery_fileloads_write:v1";
static final String INPUT_TAG = "INPUT";

/** Returns the expected class of the configuration. */
@Override
protected Class<BigQueryFileLoadsWriteSchemaTransformConfiguration> configurationClass() {
return BigQueryFileLoadsWriteSchemaTransformConfiguration.class;
}
"beam:schematransform:org.apache.beam:bigquery_fileloads:v1";
static final String INPUT_TAG = "input";

/** Returns the expected {@link SchemaTransform} of the configuration. */
@Override
protected SchemaTransform from(BigQueryFileLoadsWriteSchemaTransformConfiguration configuration) {
protected SchemaTransform from(BigQueryWriteConfiguration configuration) {
return new BigQueryWriteSchemaTransform(configuration);
}

/** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
@Override
public String identifier() {
return IDENTIFIER;
}

/**
* Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since a
* single input is expected, this returns a list with a single name.
*/
@Override
public List<String> inputCollectionNames() {
return Collections.singletonList(INPUT_TAG);
}

/**
* Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
* no output is expected, this returns an empty list.
*/
@Override
public List<String> outputCollectionNames() {
return Collections.emptyList();
}

/**
* A {@link SchemaTransform} that performs {@link BigQueryIO.Write}s based on a {@link
* BigQueryFileLoadsWriteSchemaTransformConfiguration}.
*/
protected static class BigQueryWriteSchemaTransform extends SchemaTransform {
/** An instance of {@link BigQueryServices} used for testing. */
private BigQueryServices testBigQueryServices = null;

private final BigQueryFileLoadsWriteSchemaTransformConfiguration configuration;
private final BigQueryWriteConfiguration configuration;

BigQueryWriteSchemaTransform(BigQueryFileLoadsWriteSchemaTransformConfiguration configuration) {
BigQueryWriteSchemaTransform(BigQueryWriteConfiguration configuration) {
configuration.validate();
this.configuration = configuration;
}

@Override
public void validate(PipelineOptions options) {
if (!configuration.getCreateDisposition().equals(CreateDisposition.CREATE_NEVER.name())) {
return;
}
public PCollectionRowTuple expand(PCollectionRowTuple input) {
PCollection<Row> rowPCollection = input.getSinglePCollection();
BigQueryIO.Write<Row> write = toWrite();
rowPCollection.apply(write);

BigQueryOptions bigQueryOptions = options.as(BigQueryOptions.class);
return PCollectionRowTuple.empty(input.getPipeline());
}

BigQueryServices bigQueryServices = new BigQueryServicesImpl();
if (testBigQueryServices != null) {
bigQueryServices = testBigQueryServices;
BigQueryIO.Write<Row> toWrite() {
BigQueryIO.Write<Row> write =
BigQueryIO.<Row>write()
.to(configuration.getTable())
.withMethod(BigQueryIO.Write.Method.FILE_LOADS)
.withFormatFunction(BigQueryUtils.toTableRow())
.useBeamSchema();

if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) {
CreateDisposition createDisposition =
CreateDisposition.valueOf(configuration.getCreateDisposition().toUpperCase());
Contributor:
As a larger point, I think we should do any transform overriding at job submission (BQ modes for batch/streaming, etc.) so that we can just upgrade it in the backend (at least in the first version).

Contributor Author:
Do you mean making this switch in the SDK (i.e. at construction time)? I assumed we had settled on making it a runner-side decision.

Some decisions are actually dependent on the runner (e.g. the at-least-once streaming mode in Dataflow).

Contributor:
> Do you mean making this switch in the SDK (i.e. at construction time)? I assumed we had settled on making it a runner-side decision.

Yeah. Added some comments to the relevant doc.
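
For illustration, here is a minimal sketch of the construction-time alternative discussed in this thread: choosing the BigQuery write method from the input PCollection's boundedness. This is not code from this PR; the helper class and method names are hypothetical, and a runner-side override could make the same decision during job submission instead.

import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

// Hypothetical helper, not part of this PR.
class WriteMethodChooser {
  // Bounded (batch) inputs take the file-loads path; unbounded (streaming)
  // inputs take the Storage Write API path.
  static BigQueryIO.Write.Method choose(PCollection<Row> rows) {
    return rows.isBounded() == PCollection.IsBounded.BOUNDED
        ? BigQueryIO.Write.Method.FILE_LOADS
        : BigQueryIO.Write.Method.STORAGE_WRITE_API;
  }
}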

write = write.withCreateDisposition(createDisposition);
}

DatasetService datasetService = bigQueryServices.getDatasetService(bigQueryOptions);
TableReference tableReference = BigQueryUtils.toTableReference(configuration.getTableSpec());

try {
Table table = datasetService.getTable(tableReference);
if (table == null) {
throw new NullPointerException();
}

if (table.getSchema() == null) {
throw new InvalidConfigurationException(
String.format("could not fetch schema for table: %s", configuration.getTableSpec()));
}

} catch (NullPointerException | InterruptedException | IOException ex) {
throw new InvalidConfigurationException(
String.format(
"could not fetch table %s, error: %s",
configuration.getTableSpec(), ex.getMessage()));
if (!Strings.isNullOrEmpty(configuration.getWriteDisposition())) {
WriteDisposition writeDisposition =
WriteDisposition.valueOf(configuration.getWriteDisposition().toUpperCase());
write = write.withWriteDisposition(writeDisposition);
}
if (!Strings.isNullOrEmpty(configuration.getKmsKey())) {
write = write.withKmsKey(configuration.getKmsKey());
}
}

@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
validate(input);
PCollection<Row> rowPCollection = input.get(INPUT_TAG);
Schema schema = rowPCollection.getSchema();
BigQueryIO.Write<TableRow> write = toWrite(schema);
if (testBigQueryServices != null) {
write = write.withTestServices(testBigQueryServices);
}

PCollection<TableRow> tableRowPCollection =
rowPCollection.apply(
MapElements.into(TypeDescriptor.of(TableRow.class)).via(BigQueryUtils::toTableRow));
tableRowPCollection.apply(write);
return PCollectionRowTuple.empty(input.getPipeline());
}

/** Instantiates a {@link BigQueryIO.Write<TableRow>} from a {@link Schema}. */
BigQueryIO.Write<TableRow> toWrite(Schema schema) {
TableSchema tableSchema = BigQueryUtils.toTableSchema(schema);
CreateDisposition createDisposition =
CreateDisposition.valueOf(configuration.getCreateDisposition());
WriteDisposition writeDisposition =
WriteDisposition.valueOf(configuration.getWriteDisposition());

return BigQueryIO.writeTableRows()
.to(configuration.getTableSpec())
.withCreateDisposition(createDisposition)
.withWriteDisposition(writeDisposition)
.withSchema(tableSchema);
return write;
}

/** Setter for testing using {@link BigQueryServices}. */
@VisibleForTesting
void setTestBigQueryServices(BigQueryServices testBigQueryServices) {
this.testBigQueryServices = testBigQueryServices;
}

/** Validate a {@link PCollectionRowTuple} input. */
void validate(PCollectionRowTuple input) {
if (!input.has(INPUT_TAG)) {
throw new IllegalArgumentException(
String.format(
"%s %s is missing expected tag: %s",
getClass().getSimpleName(), input.getClass().getSimpleName(), INPUT_TAG));
}

PCollection<Row> rowInput = input.get(INPUT_TAG);
Schema sourceSchema = rowInput.getSchema();

if (sourceSchema == null) {
throw new IllegalArgumentException(
String.format("%s is null for input of tag: %s", Schema.class, INPUT_TAG));
}

if (!configuration.getCreateDisposition().equals(CreateDisposition.CREATE_NEVER.name())) {
return;
}

BigQueryOptions bigQueryOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);

BigQueryServices bigQueryServices = new BigQueryServicesImpl();
if (testBigQueryServices != null) {
bigQueryServices = testBigQueryServices;
}

DatasetService datasetService = bigQueryServices.getDatasetService(bigQueryOptions);
TableReference tableReference = BigQueryUtils.toTableReference(configuration.getTableSpec());

try {
Table table = datasetService.getTable(tableReference);
if (table == null) {
throw new NullPointerException();
}

TableSchema tableSchema = table.getSchema();
if (tableSchema == null) {
throw new NullPointerException();
}

Schema destinationSchema = BigQueryUtils.fromTableSchema(tableSchema);
if (destinationSchema == null) {
throw new NullPointerException();
}

validateMatching(sourceSchema, destinationSchema);

} catch (NullPointerException | InterruptedException | IOException e) {
throw new InvalidConfigurationException(
String.format(
"could not validate input for create disposition: %s and table: %s, error: %s",
configuration.getCreateDisposition(),
configuration.getTableSpec(),
e.getMessage()));
}
}

void validateMatching(Schema sourceSchema, Schema destinationSchema) {
if (!sourceSchema.equals(destinationSchema)) {
throw new IllegalArgumentException(
String.format(
"source and destination schema mismatch for table: %s",
configuration.getTableSpec()));
}
}
}
}
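
Taken together, the reworked transform consumes a single Row PCollection under the "input" tag and produces no output. Below is a minimal usage sketch, assuming this PR also exposes the sink through the Managed API as its title suggests; the Managed.BIGQUERY constant and the snake_case config keys are inferred from the PR title and from BigQueryWriteConfiguration's getters, not confirmed by this diff.

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;

class ManagedBigQueryWriteExample {
  // Writes schema'd Rows to BigQuery through the Managed API. The "input" tag
  // matches the INPUT_TAG rename in this diff; the config keys are assumed to
  // mirror BigQueryWriteConfiguration's getters (table, create_disposition,
  // write_disposition).
  static void write(PCollection<Row> rows) {
    Map<String, Object> config = new HashMap<>();
    config.put("table", "my-project:my_dataset.my_table"); // hypothetical table spec
    config.put("create_disposition", "CREATE_IF_NEEDED");
    config.put("write_disposition", "WRITE_APPEND");

    // Managed.BIGQUERY is assumed to be introduced by this PR.
    PCollectionRowTuple.of("input", rows)
        .apply(Managed.write(Managed.BIGQUERY).withConfig(config));
  }
}

Since expand() returns PCollectionRowTuple.empty(...), the apply yields no output collections to consume.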