From 03adadceffa406e7beb52062814ecb6b8307faa3 Mon Sep 17 00:00:00 2001 From: "chenweiguo.vc" Date: Thu, 26 Sep 2024 22:55:32 +0800 Subject: [PATCH 1/6] GH-44065: [Java] Implement C Data Interface for RunEndEncodedVector --- .../arrow/c/BufferImportTypeVisitor.java | 2 +- .../main/java/org/apache/arrow/c/Format.java | 4 + .../org/apache/arrow/c/RoundtripTest.java | 13 +++ .../vector/complex/RunEndEncodedVector.java | 85 ++++++++++++++----- .../arrow/vector/TestRunEndEncodedVector.java | 51 ++++++++--- .../testing/ValueVectorDataPopulator.java | 35 ++++++++ 6 files changed, 158 insertions(+), 32 deletions(-) diff --git a/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java b/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java index 150c11e41edff..2661c12cda3af 100644 --- a/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java +++ b/java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java @@ -187,7 +187,7 @@ public List visit(ArrowType.Union type) { @Override public List visit(ArrowType.RunEndEncoded type) { - throw new UnsupportedOperationException("Importing buffers for type: " + type); + return List.of(); } @Override diff --git a/java/c/src/main/java/org/apache/arrow/c/Format.java b/java/c/src/main/java/org/apache/arrow/c/Format.java index f77a555d18481..ba8a12cbd87bf 100644 --- a/java/c/src/main/java/org/apache/arrow/c/Format.java +++ b/java/c/src/main/java/org/apache/arrow/c/Format.java @@ -233,6 +233,8 @@ static String asString(ArrowType arrowType) { return "+vl"; case LargeListView: return "+vL"; + case RunEndEncoded: + return "r"; case NONE: throw new IllegalArgumentException("Arrow type ID is NONE"); default: @@ -321,6 +323,8 @@ static ArrowType asType(String format, long flags) return new ArrowType.ListView(); case "+vL": return new ArrowType.LargeListView(); + case "r": + return new ArrowType.RunEndEncoded(); default: String[] parts = format.split(":", 2); if (parts.length == 2) { diff --git 
a/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java b/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java index d8286465e475f..807680a3ba3fd 100644 --- a/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java +++ b/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java @@ -88,6 +88,7 @@ import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.ListViewVector; import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.complex.RunEndEncodedVector; import org.apache.arrow.vector.complex.StructVector; import org.apache.arrow.vector.complex.UnionVector; import org.apache.arrow.vector.complex.impl.UnionMapWriter; @@ -770,6 +771,18 @@ public void testStructVector() { } } + @Test + public void testRunEndEncodedVector() { + try (final RunEndEncodedVector vector = RunEndEncodedVector.empty("v", allocator)) { + setVector( + vector, + Arrays.stream(new int[] {1, 3}).boxed().collect(Collectors.toList()), + Arrays.stream(new int[] {1, 2}).boxed().collect(Collectors.toList())); + assertTrue(roundtrip(vector, RunEndEncodedVector.class)); + } + } + + @Test public void testExtensionTypeVector() { ExtensionTypeRegistry.register(new UuidType()); diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java index e8de86f6e9549..b612b7f748ef0 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/RunEndEncodedVector.java @@ -203,6 +203,7 @@ public void clear() { for (FieldVector v : getChildrenFromFields()) { v.clear(); } + this.valueCount = 0; } /** @@ -234,19 +235,6 @@ public MinorType getMinorType() { return MinorType.RUNENDENCODED; } - /** - * To transfer quota responsibility. 
- * - * @param allocator the target allocator - * @return a {@link org.apache.arrow.vector.util.TransferPair transfer pair}, creating a new - * target vector of the same type. - */ - @Override - public TransferPair getTransferPair(BufferAllocator allocator) { - throw new UnsupportedOperationException( - "RunEndEncodedVector does not support getTransferPair(BufferAllocator)"); - } - /** * To transfer quota responsibility. * @@ -284,8 +272,7 @@ public TransferPair getTransferPair(Field field, BufferAllocator allocator) { */ @Override public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) { - throw new UnsupportedOperationException( - "RunEndEncodedVector does not support getTransferPair(String, BufferAllocator, CallBack)"); + return new TransferImpl(ref, allocator, callBack); } /** @@ -299,8 +286,7 @@ public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallB */ @Override public TransferPair getTransferPair(Field field, BufferAllocator allocator, CallBack callBack) { - throw new UnsupportedOperationException( - "RunEndEncodedVector does not support getTransferPair(Field, BufferAllocator, CallBack)"); + return new TransferImpl(field, allocator, callBack); } /** @@ -312,8 +298,68 @@ public TransferPair getTransferPair(Field field, BufferAllocator allocator, Call */ @Override public TransferPair makeTransferPair(ValueVector target) { - throw new UnsupportedOperationException( - "RunEndEncodedVector does not support makeTransferPair(ValueVector)"); + return new TransferImpl((RunEndEncodedVector) target); + } + + private class TransferImpl implements TransferPair { + + RunEndEncodedVector to; + TransferPair dataTransferPair; + TransferPair reeTransferPair; + + public TransferImpl(String name, BufferAllocator allocator, CallBack callBack) { + this(new RunEndEncodedVector(name, allocator, field.getFieldType(), callBack)); + } + + public TransferImpl(Field field, BufferAllocator allocator, CallBack callBack) { + 
this(new RunEndEncodedVector(field, allocator, callBack)); + } + + public TransferImpl(RunEndEncodedVector to) { + this.to = to; + if (to.getRunEndsVector() instanceof ZeroVector) { + to.initializeChildrenFromFields(field.getChildren()); + } + reeTransferPair = getRunEndsVector().makeTransferPair(to.getRunEndsVector()); + dataTransferPair = getValuesVector().makeTransferPair(to.getValuesVector()); + } + + /** + * Transfer this vector's data to another vector. The memory associated with this vector is + * transferred to the allocator of the target vector for accounting and management purposes. + */ + @Override + public void transfer() { + to.clear(); + dataTransferPair.transfer(); + reeTransferPair.transfer(); + if (valueCount > 0) { + to.setValueCount(valueCount); + } + clear(); + } + + /** + * Slice this vector at desired index and length and transfer the corresponding data to the + * target vector. + * + * @param startIndex start position of the split in source vector. + * @param length length of the split.
+ */ + @Override + public void splitAndTransfer(int startIndex, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ValueVector getTo() { + return to; + } + + @Override + public void copyValueSafe(int from, int to) { + this.to.copyFrom(from, to, RunEndEncodedVector.this); + } } /** @@ -568,6 +614,7 @@ public void loadFieldBuffers(ArrowFieldNode fieldNode, List ownBuffers throw new UnsupportedOperationException( "Run-end encoded vectors do not have any associated buffers."); } + this.valueCount = fieldNode.getLength(); } /** diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestRunEndEncodedVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestRunEndEncodedVector.java index 3f4be2e52ce56..28dd57048480c 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/TestRunEndEncodedVector.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/TestRunEndEncodedVector.java @@ -32,6 +32,7 @@ import org.apache.arrow.vector.types.pojo.ArrowType.RunEndEncoded; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.util.TransferPair; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -78,6 +79,18 @@ public void testConstantValueVector() { for (int i = 0; i < logicalValueCount; i++) { assertEquals(value, reeVector.getObject(i)); } + + TransferPair transferPair = reeVector.getTransferPair(allocator); + transferPair.transfer(); + assertEquals(0, reeVector.getValueCount()); + assertEquals(0, reeVector.getValuesVector().getValueCount()); + assertEquals(0, reeVector.getRunEndsVector().getValueCount()); + try (RunEndEncodedVector toVector = (RunEndEncodedVector) transferPair.getTo()) { + assertEquals(logicalValueCount, toVector.getValueCount()); + for (int i = 0; i < logicalValueCount; i++) { + assertEquals(value, toVector.getObject(i)); + } + } } // constant null vector 
@@ -106,22 +119,36 @@ public void testBasicRunEndEncodedVector() { setBasicVector(reeVector, runCount, i -> i % 2 == 0 ? null : i + 1, i -> i + 1); assertEquals(15, reeVector.getValueCount()); - int index = 0; - for (int run = 0; run < runCount; run++) { - long expectedRunValue = (long) run + 1; - for (int j = 0; j <= run; j++) { - if (run % 2 == 0) { - assertNull(reeVector.getObject(index)); - } else { - assertEquals(expectedRunValue, reeVector.getObject(index)); - } - index++; - } - } + checkBasic(runCount, reeVector); // test index out of bound assertThrows(IndexOutOfBoundsException.class, () -> reeVector.getObject(-1)); assertThrows(IndexOutOfBoundsException.class, () -> reeVector.getObject(logicalValueCount)); + + TransferPair transferPair = reeVector.getTransferPair(allocator); + transferPair.transfer(); + assertEquals(0, reeVector.getValueCount()); + assertEquals(0, reeVector.getValuesVector().getValueCount()); + assertEquals(0, reeVector.getRunEndsVector().getValueCount()); + try (RunEndEncodedVector toVector = (RunEndEncodedVector) transferPair.getTo()) { + assertEquals(logicalValueCount, toVector.getValueCount()); + checkBasic(runCount, toVector); + } + } + } + + private static void checkBasic(int runCount, RunEndEncodedVector reeVector) { + int index = 0; + for (int run = 0; run < runCount; run++) { + long expectedRunValue = (long) run + 1; + for (int j = 0; j <= run; j++) { + if (run % 2 == 0) { + assertNull(reeVector.getObject(index)); + } else { + assertEquals(expectedRunValue, reeVector.getObject(index)); + } + index++; + } } } diff --git a/java/vector/src/test/java/org/apache/arrow/vector/testing/ValueVectorDataPopulator.java b/java/vector/src/test/java/org/apache/arrow/vector/testing/ValueVectorDataPopulator.java index afbc30f019ef6..fcdd5c66ead11 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/testing/ValueVectorDataPopulator.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/testing/ValueVectorDataPopulator.java @@ -68,10 
+68,12 @@ import org.apache.arrow.vector.complex.LargeListViewVector; import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.ListViewVector; +import org.apache.arrow.vector.complex.RunEndEncodedVector; import org.apache.arrow.vector.complex.StructVector; import org.apache.arrow.vector.holders.IntervalDayHolder; import org.apache.arrow.vector.types.Types; import org.apache.arrow.vector.types.Types.MinorType; +import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.FieldType; /** Utility for populating {@link org.apache.arrow.vector.ValueVector}. */ @@ -794,4 +796,37 @@ public static void setVector(LargeListViewVector vector, List... values dataVector.setValueCount(curPos); vector.setValueCount(values.length); } + + public static void setVector( + RunEndEncodedVector vector, List<Integer> runEnds, List<Integer> values) { + int runCount = runEnds.size(); + assert runCount == values.size(); + final FieldType valueType = FieldType.notNullable(MinorType.INT.getType()); + final FieldType runEndType = FieldType.notNullable(Types.MinorType.INT.getType()); + final Field valueField = new Field("value", valueType, null); + final Field runEndField = new Field("ree", runEndType, null); + vector.initializeChildrenFromFields(List.of(runEndField, valueField)); + + IntVector runEndsVector = (IntVector) vector.getRunEndsVector(); + runEndsVector.setValueCount(runCount); + for (int i = 0; i < runCount; i++) { + if (runEnds.get(i) == null) { + runEndsVector.setNull(i); + } else { + runEndsVector.set(i, runEnds.get(i)); + } + } + + IntVector valuesVector = (IntVector) vector.getValuesVector(); + valuesVector.setValueCount(runCount); + for (int i = 0; i < runCount; i++) { + if (values.get(i) == null) { + valuesVector.setNull(i); + } else { + valuesVector.set(i, values.get(i)); + } + } + + vector.setValueCount(runEnds.get(runCount - 1)); + } } From bd23d706d7bb7d7ee1d70a41050592af0e130d1d Mon Sep 17 00:00:00 2001 From: "chenweiguo.vc"
Date: Tue, 8 Oct 2024 21:48:20 +0800 Subject: [PATCH 2/6] integration test --- dev/archery/archery/integration/datagen.py | 1 - java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java | 1 - java/c/src/test/python/integration_tests.py | 3 +++ .../main/java/org/apache/arrow/vector/ipc/JsonFileReader.java | 4 +++- 4 files changed, 6 insertions(+), 3 deletions(-) diff --git a/dev/archery/archery/integration/datagen.py b/dev/archery/archery/integration/datagen.py index 970fe2e16bfe9..0ba7f8bf9d049 100644 --- a/dev/archery/archery/integration/datagen.py +++ b/dev/archery/archery/integration/datagen.py @@ -1929,7 +1929,6 @@ def _temp_path(): generate_run_end_encoded_case() .skip_tester('C#') - .skip_tester('Java') .skip_tester('JS') # TODO(https://github.com/apache/arrow-nanoarrow/issues/618) .skip_tester('nanoarrow') diff --git a/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java b/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java index 807680a3ba3fd..58ca9ac2ef70e 100644 --- a/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java +++ b/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java @@ -782,7 +782,6 @@ public void testRunEndEncodedVector() { } } - @Test public void testExtensionTypeVector() { ExtensionTypeRegistry.register(new UuidType()); diff --git a/java/c/src/test/python/integration_tests.py b/java/c/src/test/python/integration_tests.py index b0a86e9c66e59..e965edcabf99a 100644 --- a/java/c/src/test/python/integration_tests.py +++ b/java/c/src/test/python/integration_tests.py @@ -399,6 +399,9 @@ def recreate_batch(): return reader.read_next_batch() self.round_trip_record_batch(recreate_batch) + + def test_runendencoded_array(self): + self.round_trip_array(lambda: pa.RunEndEncodedArray.from_arrays([3, 5, 10, 12, 19], [1, 2, 1, None, 3]), check_metadata=False) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java 
b/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java index 5668325a87eeb..8e1da0d527c34 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java @@ -910,7 +910,9 @@ private void readFromJsonIntoVector(Field field, FieldVector vector) throws IOEx } int nullCount = 0; - if (type instanceof ArrowType.Null) { + if (type instanceof ArrowType.RunEndEncoded) { + nullCount = 0; + } else if (type instanceof ArrowType.Null) { nullCount = valueCount; } else if (!(type instanceof Union)) { nullCount = BitVectorHelper.getNullCount(vectorBuffers.get(0), valueCount); From 712a0d763c9ed690b51a0bf8c7b815b60ee797d8 Mon Sep 17 00:00:00 2001 From: "chenweiguo.vc" Date: Tue, 8 Oct 2024 22:11:26 +0800 Subject: [PATCH 3/6] fix --- java/c/src/main/java/org/apache/arrow/c/Format.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/c/src/main/java/org/apache/arrow/c/Format.java b/java/c/src/main/java/org/apache/arrow/c/Format.java index ba8a12cbd87bf..adf3936581961 100644 --- a/java/c/src/main/java/org/apache/arrow/c/Format.java +++ b/java/c/src/main/java/org/apache/arrow/c/Format.java @@ -323,7 +323,7 @@ static ArrowType asType(String format, long flags) return new ArrowType.ListView(); case "+vL": return new ArrowType.LargeListView(); - case "r": + case "+r": return new ArrowType.RunEndEncoded(); default: String[] parts = format.split(":", 2); From d967d5fb20044d4b8102128fe295fd369161efdf Mon Sep 17 00:00:00 2001 From: "chenweiguo.vc" Date: Tue, 8 Oct 2024 22:12:07 +0800 Subject: [PATCH 4/6] fix --- java/c/src/main/java/org/apache/arrow/c/Format.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/c/src/main/java/org/apache/arrow/c/Format.java b/java/c/src/main/java/org/apache/arrow/c/Format.java index adf3936581961..7ce99614d2a7a 100644 --- a/java/c/src/main/java/org/apache/arrow/c/Format.java +++ 
b/java/c/src/main/java/org/apache/arrow/c/Format.java @@ -234,7 +234,7 @@ static String asString(ArrowType arrowType) { case LargeListView: return "+vL"; case RunEndEncoded: - return "r"; + return "+r"; case NONE: throw new IllegalArgumentException("Arrow type ID is NONE"); default: From b34aca3a2f0ba016cdcd72d0f33e87811c5e671f Mon Sep 17 00:00:00 2001 From: "chenweiguo.vc" Date: Wed, 9 Oct 2024 12:47:26 +0800 Subject: [PATCH 5/6] test empty vector --- .../test/java/org/apache/arrow/c/RoundtripTest.java | 11 +++++++++-- .../vector/testing/ValueVectorDataPopulator.java | 6 +++++- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java b/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java index 58ca9ac2ef70e..a28e0a3984228 100644 --- a/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java +++ b/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java @@ -776,12 +776,19 @@ public void testRunEndEncodedVector() { try (final RunEndEncodedVector vector = RunEndEncodedVector.empty("v", allocator)) { setVector( vector, - Arrays.stream(new int[] {1, 3}).boxed().collect(Collectors.toList()), - Arrays.stream(new int[] {1, 2}).boxed().collect(Collectors.toList())); + List.of(1, 3), + List.of(1, 2)); assertTrue(roundtrip(vector, RunEndEncodedVector.class)); } } + @Test + public void testEmptyRunEndEncodedVector() { + try (final RunEndEncodedVector vector = RunEndEncodedVector.empty("v", allocator)) { + setVector(vector, List.of(), List.of()); + assertTrue(roundtrip(vector, RunEndEncodedVector.class)); + } + } @Test public void testExtensionTypeVector() { ExtensionTypeRegistry.register(new UuidType()); diff --git a/java/vector/src/test/java/org/apache/arrow/vector/testing/ValueVectorDataPopulator.java b/java/vector/src/test/java/org/apache/arrow/vector/testing/ValueVectorDataPopulator.java index fcdd5c66ead11..f599dfa539421 100644 --- 
a/java/vector/src/test/java/org/apache/arrow/vector/testing/ValueVectorDataPopulator.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/testing/ValueVectorDataPopulator.java @@ -827,6 +827,10 @@ public static void setVector( } } - vector.setValueCount(runEnds.get(runCount - 1)); + if (runCount > 0) { + vector.setValueCount(runEnds.get(runCount - 1)); + } else { + vector.setValueCount(0); + } } } From d4017dcdc44748b4c6c2bd1f1c9375434c0f0cd7 Mon Sep 17 00:00:00 2001 From: "chenweiguo.vc" Date: Wed, 9 Oct 2024 13:36:17 +0800 Subject: [PATCH 6/6] spotless --- java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java b/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java index a28e0a3984228..67ab282de5a32 100644 --- a/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java +++ b/java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java @@ -774,10 +774,7 @@ public void testStructVector() { @Test public void testRunEndEncodedVector() { try (final RunEndEncodedVector vector = RunEndEncodedVector.empty("v", allocator)) { - setVector( - vector, - List.of(1, 3), - List.of(1, 2)); + setVector(vector, List.of(1, 3), List.of(1, 2)); assertTrue(roundtrip(vector, RunEndEncodedVector.class)); } } @@ -789,6 +786,7 @@ public void testEmptyRunEndEncodedVector() { assertTrue(roundtrip(vector, RunEndEncodedVector.class)); } } + @Test public void testExtensionTypeVector() { ExtensionTypeRegistry.register(new UuidType());