Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support withFormatRecordOnFailureFunction() for BigQuery STORAGE_WRITE_API and STORAGE_API_AT_LEAST_ONCE methods #31659

Merged
merged 2 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@

* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Go SDK Minimum Go Version updated to 1.21 ([#32092](https://github.com/apache/beam/pull/32092)).
* [BigQueryIO] Added support for withFormatRecordOnFailureFunction() for STORAGE_WRITE_API and STORAGE_API_AT_LEAST_ONCE methods (Java) ([#31354](https://github.com/apache/beam/issues/31354)).
* Updated Go protobuf package to new version (Go) ([#21515](https://github.com/apache/beam/issues/21515)).

## Breaking Changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2711,9 +2711,14 @@ public Write<T> withFormatFunction(SerializableFunction<T, TableRow> formatFunct
}

/**
* If an insert failure occurs, this function is applied to the originally supplied row T. The
* resulting {@link TableRow} will be accessed via {@link
* WriteResult#getFailedInsertsWithErr()}.
* If an insert failure occurs, this function is applied to the originally supplied T element.
*
* <p>For {@link Method#STREAMING_INSERTS} method, the resulting {@link TableRow} will be
* accessed via {@link WriteResult#getFailedInsertsWithErr()}.
*
* <p>For {@link Method#STORAGE_WRITE_API} and {@link Method#STORAGE_API_AT_LEAST_ONCE} methods,
* the resulting {@link TableRow} will be accessed via {@link
* WriteResult#getFailedStorageApiInserts()}.
*/
public Write<T> withFormatRecordOnFailureFunction(
SerializableFunction<T, TableRow> formatFunction) {
Expand Down Expand Up @@ -3773,6 +3778,7 @@ private <DestinationT> WriteResult continueExpandTyped(
dynamicDestinations,
elementSchema,
elementToRowFunction,
getFormatRecordOnFailureFunction(),
getRowMutationInformationFn() != null);
} else if (getWriteProtosClass() != null && getDirectWriteProtos()) {
// We could support both of these by falling back to
Expand All @@ -3795,7 +3801,9 @@ private <DestinationT> WriteResult continueExpandTyped(
storageApiDynamicDestinations =
(StorageApiDynamicDestinations<T, DestinationT>)
new StorageApiDynamicDestinationsProto(
dynamicDestinations, getWriteProtosClass());
dynamicDestinations,
getWriteProtosClass(),
getFormatRecordOnFailureFunction());
} else if (getAvroRowWriterFactory() != null) {
// we can configure the avro to storage write api proto converter for this
// assuming the format function returns an Avro GenericRecord
Expand All @@ -3818,6 +3826,7 @@ private <DestinationT> WriteResult continueExpandTyped(
dynamicDestinations,
avroSchemaFactory,
recordWriterFactory.getToAvroFn(),
getFormatRecordOnFailureFunction(),
getRowMutationInformationFn() != null);
} else {
RowWriterFactory.TableRowWriterFactory<T, DestinationT> tableRowWriterFactory =
Expand All @@ -3827,6 +3836,7 @@ private <DestinationT> WriteResult continueExpandTyped(
new StorageApiDynamicDestinationsTableRow<>(
dynamicDestinations,
tableRowWriterFactory.getToRowFn(),
getFormatRecordOnFailureFunction(),
getRowMutationInformationFn() != null,
getCreateDisposition(),
getIgnoreUnknownValues(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
Expand All @@ -42,6 +43,8 @@ abstract static class Value {
abstract ProtoRows getProtoRows();

abstract List<Instant> getTimestamps();

abstract List<@Nullable TableRow> getFailsafeTableRows();
}

interface ConvertUnknownFields {
Expand Down Expand Up @@ -96,11 +99,18 @@ public Value next() {
}

List<Instant> timestamps = Lists.newArrayList();
List<@Nullable TableRow> failsafeRows = Lists.newArrayList();
ProtoRows.Builder inserts = ProtoRows.newBuilder();
long bytesSize = 0;
while (underlyingIterator.hasNext()) {
StorageApiWritePayload payload = underlyingIterator.next();
ByteString byteString = ByteString.copyFrom(payload.getPayload());
@Nullable TableRow failsafeTableRow = null;
try {
failsafeTableRow = payload.getFailsafeTableRow();
} catch (IOException e) {
// Do nothing, table row will be generated later from row bytes
}
if (autoUpdateSchema) {
try {
@Nullable TableRow unknownFields = payload.getUnknownFields();
Expand All @@ -116,7 +126,10 @@ public Value next() {
// This generally implies that ignoreUnknownValues=false and there were still
// unknown values here.
// Reconstitute the TableRow and send it to the failed-rows consumer.
TableRow tableRow = protoToTableRow.apply(byteString);
TableRow tableRow =
failsafeTableRow != null
? failsafeTableRow
: protoToTableRow.apply(byteString);
// TODO(24926, reuvenlax): We need to merge the unknown fields in! Currently we
// only execute this
// codepath when ignoreUnknownFields==true, so we should never hit this codepath.
Expand All @@ -142,12 +155,13 @@ public Value next() {
timestamp = elementsTimestamp;
}
timestamps.add(timestamp);
failsafeRows.add(failsafeTableRow);
bytesSize += byteString.size();
if (bytesSize > splitSize) {
break;
}
}
return new AutoValue_SplittingIterable_Value(inserts.build(), timestamps);
return new AutoValue_SplittingIterable_Value(inserts.build(), timestamps, failsafeRows);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,17 @@ public void processElement(
.withTimestamp(timestamp);
o.get(successfulWritesTag).output(KV.of(element.getKey(), payload));
} catch (TableRowToStorageApiProto.SchemaConversionException conversionException) {
TableRow tableRow;
TableRow failsafeTableRow;
try {
tableRow = messageConverter.toTableRow(element.getValue());
failsafeTableRow = messageConverter.toFailsafeTableRow(element.getValue());
} catch (Exception e) {
badRecordRouter.route(o, element, elementCoder, e, "Unable to convert value to TableRow");
return;
}
o.get(failedWritesTag)
.output(new BigQueryStorageApiInsertError(tableRow, conversionException.toString()));
.output(
new BigQueryStorageApiInsertError(
failsafeTableRow, conversionException.toString()));
} catch (Exception e) {
badRecordRouter.route(
o, element, elementCoder, e, "Unable to convert value to StorageWriteApiPayload");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public interface MessageConverter<T> {
StorageApiWritePayload toMessage(
T element, @Nullable RowMutationInformation rowMutationInformation) throws Exception;

TableRow toTableRow(T element);
TableRow toFailsafeTableRow(T element);
}

StorageApiDynamicDestinations(DynamicDestinations<T, DestinationT> inner) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,20 @@ class StorageApiDynamicDestinationsBeamRow<T, DestinationT extends @NonNull Obje
extends StorageApiDynamicDestinations<T, DestinationT> {
private final TableSchema tableSchema;
private final SerializableFunction<T, Row> toRow;
private final @Nullable SerializableFunction<T, TableRow> formatRecordOnFailureFunction;

private final boolean usesCdc;

/**
 * Creates dynamic destinations for elements that are converted to Beam {@code Row}s before
 * being encoded as Storage Write API protos.
 *
 * @param inner wrapped {@code DynamicDestinations} providing per-element table resolution
 * @param schema Beam schema of the converted rows; translated once, here, into the proto
 *     table schema used for message construction
 * @param toRow function converting an input element to its Beam {@code Row} form
 * @param formatRecordOnFailureFunction optional user function producing the failsafe
 *     {@code TableRow} reported on insert failure; when {@code null}, the failsafe row is
 *     derived from {@code toRow} instead (see {@code toFailsafeTableRow})
 * @param usesCdc whether a row-mutation (CDC) function was configured by the caller —
 *     NOTE(review): inferred from the call site passing
 *     {@code getRowMutationInformationFn() != null}; confirm
 */
StorageApiDynamicDestinationsBeamRow(
DynamicDestinations<T, DestinationT> inner,
Schema schema,
SerializableFunction<T, Row> toRow,
@Nullable SerializableFunction<T, TableRow> formatRecordOnFailureFunction,
boolean usesCdc) {
super(inner);
// Translate the Beam schema to the Storage API proto table schema up front.
this.tableSchema = BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema(schema);
this.toRow = toRow;
this.formatRecordOnFailureFunction = formatRecordOnFailureFunction;
this.usesCdc = usesCdc;
}

Expand Down Expand Up @@ -96,12 +99,19 @@ public StorageApiWritePayload toMessage(
Message msg =
BeamRowToStorageApiProto.messageFromBeamRow(
descriptorToUse, toRow.apply(element), changeType, changeSequenceNum);
return StorageApiWritePayload.of(msg.toByteArray(), null);
return StorageApiWritePayload.of(
msg.toByteArray(),
null,
formatRecordOnFailureFunction != null ? toFailsafeTableRow(element) : null);
}

@Override
public TableRow toTableRow(T element) {
return BigQueryUtils.toTableRow(toRow.apply(element));
/**
 * Returns the failsafe {@link TableRow} for a failed element: the user-supplied
 * format-on-failure function when one was configured, otherwise a {@link TableRow}
 * converted from the element's Beam {@code Row} form.
 */
public TableRow toFailsafeTableRow(T element) {
  return formatRecordOnFailureFunction == null
      ? BigQueryUtils.toTableRow(toRow.apply(element))
      : formatRecordOnFailureFunction.apply(element);
}
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,21 @@ class StorageApiDynamicDestinationsGenericRecord<T, DestinationT extends @NonNul

private final SerializableFunction<AvroWriteRequest<T>, GenericRecord> toGenericRecord;
private final SerializableFunction<@Nullable TableSchema, Schema> schemaFactory;
private final @javax.annotation.Nullable SerializableFunction<T, TableRow>
formatRecordOnFailureFunction;

private boolean usesCdc;

/**
 * Creates dynamic destinations for elements that are converted to Avro {@code GenericRecord}s
 * before being encoded as Storage Write API protos.
 *
 * @param inner wrapped {@code DynamicDestinations} providing per-element table resolution
 * @param schemaFactory produces the Avro {@code Schema} for a (possibly null) BigQuery
 *     {@code TableSchema}
 * @param toGenericRecord function converting an {@code AvroWriteRequest} for an element into
 *     its {@code GenericRecord} form
 * @param formatRecordOnFailureFunction optional user function producing the failsafe
 *     {@code TableRow} reported on insert failure; when {@code null}, the failsafe row is
 *     derived from the element's Avro form instead (see {@code toFailsafeTableRow})
 * @param usesCdc whether a row-mutation (CDC) function was configured by the caller —
 *     NOTE(review): inferred from the call site passing
 *     {@code getRowMutationInformationFn() != null}; confirm
 */
StorageApiDynamicDestinationsGenericRecord(
DynamicDestinations<T, DestinationT> inner,
SerializableFunction<@Nullable TableSchema, Schema> schemaFactory,
SerializableFunction<AvroWriteRequest<T>, GenericRecord> toGenericRecord,
@Nullable SerializableFunction<T, TableRow> formatRecordOnFailureFunction,
boolean usesCdc) {
super(inner);
this.toGenericRecord = toGenericRecord;
this.schemaFactory = schemaFactory;
this.formatRecordOnFailureFunction = formatRecordOnFailureFunction;
this.usesCdc = usesCdc;
}

Expand Down Expand Up @@ -96,13 +101,20 @@ public StorageApiWritePayload toMessage(
toGenericRecord.apply(new AvroWriteRequest<>(element, avroSchema)),
changeType,
changeSequenceNum);
return StorageApiWritePayload.of(msg.toByteArray(), null);
return StorageApiWritePayload.of(
msg.toByteArray(),
null,
formatRecordOnFailureFunction != null ? toFailsafeTableRow(element) : null);
}

@Override
public TableRow toTableRow(T element) {
return BigQueryUtils.convertGenericRecordToTableRow(
toGenericRecord.apply(new AvroWriteRequest<>(element, avroSchema)), bqTableSchema);
/**
 * Returns the failsafe {@link TableRow} for a failed element: the user-supplied
 * format-on-failure function when one was configured, otherwise a {@link TableRow}
 * converted from the element's Avro {@code GenericRecord} form.
 */
public TableRow toFailsafeTableRow(T element) {
  if (formatRecordOnFailureFunction == null) {
    // No user function configured: rebuild the row from the element's Avro representation.
    GenericRecord record = toGenericRecord.apply(new AvroWriteRequest<>(element, avroSchema));
    return BigQueryUtils.convertGenericRecordToTableRow(record, bqTableSchema);
  }
  return formatRecordOnFailureFunction.apply(element);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,29 @@
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Message;
import java.lang.reflect.InvocationTargetException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.Preconditions;
import org.checkerframework.checker.nullness.qual.NonNull;

/** Storage API DynamicDestinations used when the input is a compiled protocol buffer. */
class StorageApiDynamicDestinationsProto<T extends Message, DestinationT extends @NonNull Object>
extends StorageApiDynamicDestinations<T, DestinationT> {
DescriptorProtos.DescriptorProto descriptorProto;
private final DescriptorProtos.DescriptorProto descriptorProto;
private final @Nullable SerializableFunction<T, TableRow> formatRecordOnFailureFunction;

@SuppressWarnings({"unchecked", "nullness"})
StorageApiDynamicDestinationsProto(
DynamicDestinations<T, DestinationT> inner, Class<T> protoClass) {
DynamicDestinations<T, DestinationT> inner,
Class<T> protoClass,
@Nullable SerializableFunction<T, TableRow> formatRecordOnFailureFunction) {
super(inner);
try {
this.formatRecordOnFailureFunction = formatRecordOnFailureFunction;
this.descriptorProto =
fixNestedTypes(
(Descriptors.Descriptor)
Expand Down Expand Up @@ -84,12 +90,27 @@ public StorageApiWritePayload toMessage(
// we can forward
// the through directly. This means that we don't currently support ignoreUnknownValues or
// autoUpdateSchema.
return StorageApiWritePayload.of(element.toByteArray(), null);
return StorageApiWritePayload.of(
element.toByteArray(),
null,
formatRecordOnFailureFunction != null ? toFailsafeTableRow(element) : null);
}

@Override
public TableRow toTableRow(T element) {
throw new RuntimeException("Not implemented!");
/**
 * Returns the failsafe {@link TableRow} for a failed element: the user-supplied
 * format-on-failure function when one was configured, otherwise a {@link TableRow}
 * reconstructed by re-parsing the element's proto bytes against this destination's
 * descriptor.
 */
public TableRow toFailsafeTableRow(T element) {
  if (formatRecordOnFailureFunction != null) {
    return formatRecordOnFailureFunction.apply(element);
  }
  try {
    // Re-parse the serialized element with the destination descriptor, then convert the
    // resulting DynamicMessage into a TableRow.
    DynamicMessage message =
        DynamicMessage.parseFrom(
            TableRowToStorageApiProto.wrapDescriptorProto(descriptorProto),
            element.toByteArray());
    return TableRowToStorageApiProto.tableRowFromMessage(message, true);
  } catch (Exception e) {
    // Matches the original behavior: any parse/descriptor failure surfaces unchecked.
    throw new RuntimeException(e);
  }
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
public class StorageApiDynamicDestinationsTableRow<T, DestinationT extends @NonNull Object>
extends StorageApiDynamicDestinations<T, DestinationT> {
private final SerializableFunction<T, TableRow> formatFunction;
private final @Nullable SerializableFunction<T, TableRow> formatRecordOnFailureFunction;

private final boolean usesCdc;
private final CreateDisposition createDisposition;
Expand All @@ -51,12 +52,14 @@ public class StorageApiDynamicDestinationsTableRow<T, DestinationT extends @NonN
StorageApiDynamicDestinationsTableRow(
DynamicDestinations<T, DestinationT> inner,
SerializableFunction<T, TableRow> formatFunction,
@Nullable SerializableFunction<T, TableRow> formatRecordOnFailureFunction,
boolean usesCdc,
CreateDisposition createDisposition,
boolean ignoreUnknownValues,
boolean autoSchemaUpdates) {
super(inner);
this.formatFunction = formatFunction;
this.formatRecordOnFailureFunction = formatRecordOnFailureFunction;
this.usesCdc = usesCdc;
this.createDisposition = createDisposition;
this.ignoreUnknownValues = ignoreUnknownValues;
Expand Down Expand Up @@ -151,8 +154,12 @@ public DescriptorProtos.DescriptorProto getDescriptor(boolean includeCdcColumns)
}

@Override
public TableRow toTableRow(T element) {
return formatFunction.apply(element);
/**
 * Returns the failsafe {@link TableRow} for a failed element: the user-supplied
 * format-on-failure function when one was configured, otherwise the regular format
 * function's output.
 */
public TableRow toFailsafeTableRow(T element) {
  return formatRecordOnFailureFunction == null
      ? formatFunction.apply(element)
      : formatRecordOnFailureFunction.apply(element);
}

@Override
Expand Down Expand Up @@ -183,7 +190,10 @@ public StorageApiWritePayload toMessage(
unknownFields,
changeType,
changeSequenceNum);
return StorageApiWritePayload.of(msg.toByteArray(), unknownFields);
return StorageApiWritePayload.of(
msg.toByteArray(),
unknownFields,
formatRecordOnFailureFunction != null ? toFailsafeTableRow(element) : null);
}
};
}
Loading
Loading