Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(connector-node): do not store sink row inside upsert iceberg sink #8625

Merged
merged 1 commit into from
Mar 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.grpc.Status;
import java.util.List;
import java.util.TreeMap;
import org.apache.iceberg.data.Record;

public class SinkRowMap {
TreeMap<List<Comparable<Object>>, SinkRowOp> map = new TreeMap<>(new PkComparator());
Expand All @@ -27,7 +28,7 @@ public void clear() {
map.clear();
}

public void insert(List<Comparable<Object>> key, SinkRow row) {
public void insert(List<Comparable<Object>> key, Record row) {
if (!map.containsKey(key)) {
map.put(key, SinkRowOp.insertOp(row));
} else {
Expand All @@ -42,19 +43,20 @@ public void insert(List<Comparable<Object>> key, SinkRow row) {
}
}

public void delete(List<Comparable<Object>> key, SinkRow row) {
public void delete(List<Comparable<Object>> key, Record row) {
if (!map.containsKey(key)) {
map.put(key, SinkRowOp.deleteOp(row));
} else {
SinkRowOp sinkRowOp = map.get(key);
SinkRow insert = sinkRowOp.getInsert();
Record insert = sinkRowOp.getInsert();
if (insert == null) {
throw Status.FAILED_PRECONDITION
.withDescription("try to double delete a primary key")
.asRuntimeException();
}
assertRowValuesEqual(insert, row);
SinkRow delete = sinkRowOp.getDelete();
// TODO: may enable it again
// assertRowValuesEqual(insert, row);
Record delete = sinkRowOp.getDelete();
if (delete != null) {
map.put(key, SinkRowOp.deleteOp(delete));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@

package com.risingwave.connector;

import com.risingwave.connector.api.sink.SinkRow;
import io.grpc.Status;
import org.apache.iceberg.data.Record;

public class SinkRowOp {
private final SinkRow delete;
private final SinkRow insert;
private final Record delete;
private final Record insert;

public static SinkRowOp insertOp(SinkRow row) {
public static SinkRowOp insertOp(Record row) {
if (row == null) {
throw Status.FAILED_PRECONDITION
.withDescription("row op must not be null to initialize insertOp")
Expand All @@ -30,7 +30,7 @@ public static SinkRowOp insertOp(SinkRow row) {
return new SinkRowOp(null, row);
}

public static SinkRowOp deleteOp(SinkRow row) {
public static SinkRowOp deleteOp(Record row) {
if (row == null) {
throw Status.FAILED_PRECONDITION
.withDescription("row op must not be null to initialize deleteOp")
Expand All @@ -39,7 +39,7 @@ public static SinkRowOp deleteOp(SinkRow row) {
return new SinkRowOp(row, null);
}

public static SinkRowOp updateOp(SinkRow delete, SinkRow insert) {
public static SinkRowOp updateOp(Record delete, Record insert) {
if (delete == null || insert == null) {
throw Status.FAILED_PRECONDITION
.withDescription("row ops must not be null initialize updateOp")
Expand All @@ -48,7 +48,7 @@ public static SinkRowOp updateOp(SinkRow delete, SinkRow insert) {
return new SinkRowOp(delete, insert);
}

/**
 * Builds an op from a delete row and an insert row; either may be {@code null}. The constructor
 * itself performs no validation: the factory methods {@code insertOp}, {@code deleteOp} and
 * {@code updateOp} reject {@code null} arguments before delegating here.
 */
private SinkRowOp(SinkRow delete, SinkRow insert) {
private SinkRowOp(Record delete, Record insert) {
this.delete = delete;
this.insert = insert;
}
Expand All @@ -57,11 +57,11 @@ public boolean isDelete() {
return insert == null && delete != null;
}

/**
 * Returns the row recorded for deletion, or {@code null} when this op carries no delete (for
 * example, an op built by {@code insertOp}).
 */
public SinkRow getDelete() {
public Record getDelete() {
return delete;
}

/**
 * Returns the row recorded for insertion, or {@code null} when this op carries no insert (for
 * example, an op built by {@code deleteOp}).
 */
public SinkRow getInsert() {
public Record getInsert() {
return insert;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public UpsertIcebergSink(
.collect(Collectors.toList());
}

private Record newRecord(Schema schema, SinkRow row) {
private static Record newRecord(Schema schema, SinkRow row) {
Record record = GenericRecord.create(schema);
for (int i = 0; i < schema.columns().size(); i++) {
record.set(i, row.get(i));
Expand Down Expand Up @@ -174,10 +174,10 @@ public void write(Iterator<SinkRow> rows) {
}
switch (row.getOp()) {
case INSERT:
sinkRowMap.insert(getKeyFromRow(row), row);
sinkRowMap.insert(getKeyFromRow(row), newRecord(rowSchema, row));
break;
case DELETE:
sinkRowMap.delete(getKeyFromRow(row), row);
sinkRowMap.delete(getKeyFromRow(row), newRecord(deleteRowSchema, row));
break;
case UPDATE_DELETE:
if (updateBufferExists) {
Expand All @@ -186,7 +186,7 @@ public void write(Iterator<SinkRow> rows) {
"an UPDATE_INSERT should precede an UPDATE_DELETE")
.asRuntimeException();
}
sinkRowMap.delete(getKeyFromRow(row), row);
sinkRowMap.delete(getKeyFromRow(row), newRecord(deleteRowSchema, row));
updateBufferExists = true;
break;
case UPDATE_INSERT:
Expand All @@ -196,7 +196,7 @@ public void write(Iterator<SinkRow> rows) {
"an UPDATE_INSERT should precede an UPDATE_DELETE")
.asRuntimeException();
}
sinkRowMap.insert(getKeyFromRow(row), row);
sinkRowMap.insert(getKeyFromRow(row), newRecord(rowSchema, row));
updateBufferExists = false;
break;
default:
Expand All @@ -217,13 +217,13 @@ public void sync() {
newEqualityDeleteWriter(entry.getKey());
DataWriter<Record> dataWriter = newDataWriter(entry.getKey());
for (SinkRowOp sinkRowOp : entry.getValue().map.values()) {
SinkRow insert = sinkRowOp.getInsert();
SinkRow delete = sinkRowOp.getDelete();
Record insert = sinkRowOp.getInsert();
Record delete = sinkRowOp.getDelete();
if (insert != null) {
dataWriter.write(newRecord(rowSchema, insert));
dataWriter.write(insert);
}
if (delete != null) {
equalityDeleteWriter.write(newRecord(deleteRowSchema, delete));
equalityDeleteWriter.write(delete);
}
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
import com.risingwave.proto.Data;
import java.util.ArrayList;
import java.util.List;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Test;

Expand All @@ -31,29 +35,42 @@ public void testInsert() {
SinkRow row = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1);
List<Comparable<Object>> key = new ArrayList<>();
key.add((Comparable<Object>) row.get(0));
Schema schema = new Schema(Types.NestedField.optional(0, "id", Types.IntegerType.get()));
Record r = GenericRecord.create(schema);
r.set(0, row.get(0));

sinkRowMap.insert(key, row);
sinkRowMap.insert(key, r);
assertEquals(1, sinkRowMap.map.size());
assertEquals(null, sinkRowMap.map.get(key).getDelete());
assertEquals(row, sinkRowMap.map.get(key).getInsert());
assertEquals(r, sinkRowMap.map.get(key).getInsert());
}

/**
 * Deleting a key and then inserting under the same key must leave a single map entry that holds
 * both rows: the first row as the delete and the second row as the insert.
 */
@Test
public void testInsertAfterDelete() {
SinkRowMap sinkRowMap = new SinkRowMap();
Schema schema =
new Schema(
Types.NestedField.optional(0, "id", Types.IntegerType.get()),
Types.NestedField.optional(1, "name", Types.StringType.get()));

SinkRow row1 = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1, "Alice");
List<Comparable<Object>> key1 = new ArrayList<>();
key1.add((Comparable<Object>) row1.get(0));
// Copy the SinkRow values into an Iceberg record, column by column.
Record r1 = GenericRecord.create(schema);
r1.set(0, row1.get(0));
r1.set(1, row1.get(1));
SinkRow row2 = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1, "Bob");
// key2 is built from the same id (1) as key1 but is never passed to the map in this test.
List<Comparable<Object>> key2 = new ArrayList<>();
key2.add((Comparable<Object>) row2.get(0));
Record r2 = GenericRecord.create(schema);
r2.set(0, row2.get(0));
r2.set(1, row2.get(1));

// Delete first, then insert under the same key.
sinkRowMap.delete(key1, row1);
sinkRowMap.insert(key1, row2);
sinkRowMap.delete(key1, r1);
sinkRowMap.insert(key1, r2);
// Both operations are expected to be merged into one entry for the key.
assertEquals(1, sinkRowMap.map.size());
assertEquals(row1, sinkRowMap.map.get(key1).getDelete());
assertEquals(row2, sinkRowMap.map.get(key1).getInsert());
assertEquals(r1, sinkRowMap.map.get(key1).getDelete());
assertEquals(r2, sinkRowMap.map.get(key1).getInsert());
}

@Test
Expand All @@ -62,11 +79,14 @@ public void testInsertAfterInsert() {
SinkRow row = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1);
List<Comparable<Object>> key = new ArrayList<>();
key.add((Comparable<Object>) row.get(0));
Schema schema = new Schema(Types.NestedField.optional(0, "id", Types.IntegerType.get()));
Record r = GenericRecord.create(schema);
r.set(0, row.get(0));

sinkRowMap.insert(key, row);
sinkRowMap.insert(key, r);
boolean exceptionThrown = false;
try {
sinkRowMap.insert(key, row);
sinkRowMap.insert(key, r);
} catch (RuntimeException e) {
exceptionThrown = true;
Assert.assertTrue(
Expand All @@ -87,10 +107,14 @@ public void testDelete() {
List<Comparable<Object>> key = new ArrayList<>();
key.add((Comparable<Object>) row.get(0));

sinkRowMap.delete(key, row);
Schema schema = new Schema(Types.NestedField.optional(0, "id", Types.IntegerType.get()));
Record r = GenericRecord.create(schema);
r.set(0, row.get(0));

sinkRowMap.delete(key, r);
assertEquals(1, sinkRowMap.map.size());
assertEquals(null, sinkRowMap.map.get(key).getInsert());
assertEquals(row, sinkRowMap.map.get(key).getDelete());
assertEquals(r, sinkRowMap.map.get(key).getDelete());
}

@Test
Expand All @@ -100,10 +124,14 @@ public void testDeleteAfterDelete() {
List<Comparable<Object>> key = new ArrayList<>();
key.add((Comparable<Object>) row.get(0));

sinkRowMap.delete(key, row);
Schema schema = new Schema(Types.NestedField.optional(0, "id", Types.IntegerType.get()));
Record r = GenericRecord.create(schema);
r.set(0, row.get(0));

sinkRowMap.delete(key, r);
boolean exceptionThrown = false;
try {
sinkRowMap.delete(key, row);
sinkRowMap.delete(key, r);
} catch (RuntimeException e) {
exceptionThrown = true;
Assert.assertTrue(
Expand All @@ -122,28 +150,44 @@ public void testDeleteAfterInsert() {
List<Comparable<Object>> key = new ArrayList<>();
key.add((Comparable<Object>) row.get(0));

sinkRowMap.insert(key, row);
sinkRowMap.delete(key, row);
Schema schema = new Schema(Types.NestedField.optional(0, "id", Types.IntegerType.get()));
Record r = GenericRecord.create(schema);
r.set(0, row.get(0));

sinkRowMap.insert(key, r);
sinkRowMap.delete(key, r);
assertEquals(0, sinkRowMap.map.size());
}

/**
 * Applies delete, insert, delete in sequence and expects a single entry that keeps only the
 * first deleted row: no insert remains, and the delete is the row from the first delete call.
 */
@Test
public void testDeleteAfterUpdate() {
SinkRowMap sinkRowMap = new SinkRowMap();

Schema schema =
new Schema(
Types.NestedField.optional(0, "id", Types.IntegerType.get()),
Types.NestedField.optional(1, "name", Types.StringType.get()));

SinkRow row1 = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1, "Alice");
List<Comparable<Object>> key1 = new ArrayList<>();
key1.add((Comparable<Object>) row1.get(0));
Record r1 = GenericRecord.create(schema);
r1.set(0, row1.get(0));
r1.set(1, row1.get(1));

SinkRow row2 = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1, "Clare");
// key2 is built from the same id value (1) as key1.
// NOTE(review): presumably the map therefore treats key1 and key2 as the same primary key;
// the comparator is not visible here - confirm.
List<Comparable<Object>> key2 = new ArrayList<>();
key2.add((Comparable<Object>) row2.get(0));
Record r2 = GenericRecord.create(schema);
r2.set(0, row2.get(0));
r2.set(1, row2.get(1));

// delete -> insert -> delete; the final delete removes the just-inserted row.
sinkRowMap.delete(key1, row1);
sinkRowMap.insert(key2, row2);
sinkRowMap.delete(key2, row2);
sinkRowMap.delete(key1, r1);
sinkRowMap.insert(key2, r2);
sinkRowMap.delete(key2, r2);
// Only the original delete should remain for the key.
assertEquals(1, sinkRowMap.map.size());
assertEquals(null, sinkRowMap.map.get(key1).getInsert());
assertEquals(row1, sinkRowMap.map.get(key1).getDelete());
assertEquals(r1, sinkRowMap.map.get(key1).getDelete());
}

@Test
Expand All @@ -153,7 +197,10 @@ public void testClear() {
SinkRow row = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1);
List<Comparable<Object>> key = new ArrayList<>();
key.add((Comparable<Object>) row.get(0));
sinkRowMap.insert(key, row);
Schema schema = new Schema(Types.NestedField.optional(0, "id", Types.IntegerType.get()));
Record r = GenericRecord.create(schema);
r.set(0, row.get(0));
sinkRowMap.insert(key, r);

sinkRowMap.clear();
assertEquals(0, sinkRowMap.map.size());
Expand Down