Skip to content

Commit

Permalink
Fixes a transform upgrade compatibility issue related to BigqueryIO
Browse files Browse the repository at this point in the history
  • Loading branch information
chamikaramj committed Sep 26, 2024
1 parent 80960d1 commit 4d254a0
Showing 1 changed file with 21 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand Down Expand Up @@ -751,18 +752,27 @@ public Write<?> fromConfigRow(Row configRow, PipelineOptions options) {
if (numStorageWriteApiStreams != null) {
builder = builder.setNumStorageWriteApiStreams(numStorageWriteApiStreams);
}
Boolean propagateSuccessfulStorageApiWrites =
configRow.getBoolean("propagate_successful_storage_api_writes");
if (propagateSuccessfulStorageApiWrites != null) {
builder =
builder.setPropagateSuccessfulStorageApiWrites(propagateSuccessfulStorageApiWrites);
}
byte[] predicate = configRow.getBytes("propagate_successful_storage_api_writes_predicate");
if (predicate != null) {
builder =
builder.setPropagateSuccessfulStorageApiWritesPredicate(
(Predicate<String>) fromByteArray(predicate));

if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.60.0") >= 0) {
Boolean propagateSuccessfulStorageApiWrites =
configRow.getBoolean("propagate_successful_storage_api_writes");
if (propagateSuccessfulStorageApiWrites != null) {
builder =
builder.setPropagateSuccessfulStorageApiWrites(propagateSuccessfulStorageApiWrites);
}

byte[] predicate =
configRow.getBytes("propagate_successful_storage_api_writes_predicate");
if (predicate != null) {
builder =
builder.setPropagateSuccessfulStorageApiWritesPredicate(
(Predicate<String>) fromByteArray(predicate));
}
} else {
builder.setPropagateSuccessfulStorageApiWrites(false);
builder.setPropagateSuccessfulStorageApiWritesPredicate(Predicates.alwaysTrue());
}

Integer maxFilesPerPartition = configRow.getInt32("max_files_per_partition");
if (maxFilesPerPartition != null) {
builder = builder.setMaxFilesPerPartition(maxFilesPerPartition);
Expand Down

0 comments on commit 4d254a0

Please sign in to comment.