Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-44065: [Java] Implement C Data Interface for RunEndEncodedVector #44241

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion dev/archery/archery/integration/datagen.py
Original file line number Diff line number Diff line change
Expand Up @@ -1929,7 +1929,6 @@ def _temp_path():

generate_run_end_encoded_case()
.skip_tester('C#')
.skip_tester('Java')
.skip_tester('JS')
# TODO(https://github.com/apache/arrow-nanoarrow/issues/618)
.skip_tester('nanoarrow')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public List<ArrowBuf> visit(ArrowType.Union type) {

@Override
public List<ArrowBuf> visit(ArrowType.RunEndEncoded type) {
  // A run-end encoded array has no buffers of its own to import (its content is held by its
  // child vectors), so the imported buffer list is empty. The former unconditional throw made
  // this return unreachable and has been dropped.
  return List.of();
}

@Override
Expand Down
4 changes: 4 additions & 0 deletions java/c/src/main/java/org/apache/arrow/c/Format.java
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ static String asString(ArrowType arrowType) {
return "+vl";
case LargeListView:
return "+vL";
case RunEndEncoded:
return "+r";
case NONE:
throw new IllegalArgumentException("Arrow type ID is NONE");
default:
Expand Down Expand Up @@ -321,6 +323,8 @@ static ArrowType asType(String format, long flags)
return new ArrowType.ListView();
case "+vL":
return new ArrowType.LargeListView();
case "+r":
return new ArrowType.RunEndEncoded();
default:
String[] parts = format.split(":", 2);
if (parts.length == 2) {
Expand Down
17 changes: 17 additions & 0 deletions java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.ListViewVector;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.RunEndEncodedVector;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.complex.UnionVector;
import org.apache.arrow.vector.complex.impl.UnionMapWriter;
Expand Down Expand Up @@ -770,6 +771,22 @@ public void testStructVector() {
}
}

@Test
public void testRunEndEncodedVector() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we also add a test for an empty REE vector?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, see testEmptyRunEndEncodedVector.

try (final RunEndEncodedVector vector = RunEndEncodedVector.empty("v", allocator)) {
setVector(vector, List.of(1, 3), List.of(1, 2));
assertTrue(roundtrip(vector, RunEndEncodedVector.class));
}
}

@Test
public void testEmptyRunEndEncodedVector() {
  // With no runs at all, setVector leaves the vector with a value count of zero; the vector
  // must still survive the export/import round trip.
  final List<Integer> noRunEnds = List.of();
  final List<Integer> noValues = List.of();
  try (final RunEndEncodedVector vector = RunEndEncodedVector.empty("v", allocator)) {
    setVector(vector, noRunEnds, noValues);
    final boolean roundTripped = roundtrip(vector, RunEndEncodedVector.class);
    assertTrue(roundTripped);
  }
}

@Test
public void testExtensionTypeVector() {
ExtensionTypeRegistry.register(new UuidType());
Expand Down
3 changes: 3 additions & 0 deletions java/c/src/test/python/integration_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,9 @@ def recreate_batch():
return reader.read_next_batch()

self.round_trip_record_batch(recreate_batch)

def test_runendencoded_array(self):
    # Round-trip a run-end encoded array with five runs, one of which holds a null value.
    # Metadata comparison is disabled for this case.
    def make_array():
        run_ends = [3, 5, 10, 12, 19]
        values = [1, 2, 1, None, 3]
        return pa.RunEndEncodedArray.from_arrays(run_ends, values)

    self.round_trip_array(make_array, check_metadata=False)

if __name__ == '__main__':
unittest.main(verbosity=2)
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ public void clear() {
for (FieldVector v : getChildrenFromFields()) {
v.clear();
}
this.valueCount = 0;
}

/**
Expand Down Expand Up @@ -234,19 +235,6 @@ public MinorType getMinorType() {
return MinorType.RUNENDENCODED;
}

/**
* To transfer quota responsibility.
*
* @param allocator the target allocator
* @return a {@link org.apache.arrow.vector.util.TransferPair transfer pair}, creating a new
* target vector of the same type.
*/
@Override
public TransferPair getTransferPair(BufferAllocator allocator) {
throw new UnsupportedOperationException(
"RunEndEncodedVector does not support getTransferPair(BufferAllocator)");
}

/**
* To transfer quota responsibility.
*
Expand Down Expand Up @@ -284,8 +272,7 @@ public TransferPair getTransferPair(Field field, BufferAllocator allocator) {
*/
@Override
public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) {
  // Build a transfer pair whose target is a fresh vector named `ref`. The former unconditional
  // UnsupportedOperationException made this return unreachable and has been dropped.
  return new TransferImpl(ref, allocator, callBack);
}

/**
Expand All @@ -299,8 +286,7 @@ public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallB
*/
@Override
public TransferPair getTransferPair(Field field, BufferAllocator allocator, CallBack callBack) {
  // Build a transfer pair whose target is a fresh vector created from `field`. The former
  // unconditional UnsupportedOperationException made this return unreachable and has been
  // dropped.
  return new TransferImpl(field, allocator, callBack);
}

/**
Expand All @@ -312,8 +298,68 @@ public TransferPair getTransferPair(Field field, BufferAllocator allocator, Call
*/
@Override
public TransferPair makeTransferPair(ValueVector target) {
  // The target must itself be a RunEndEncodedVector; any other type fails the cast with a
  // ClassCastException. The former unconditional UnsupportedOperationException made this return
  // unreachable and has been dropped.
  return new TransferImpl((RunEndEncodedVector) target);
}

/**
 * {@link TransferPair} that moves the contents of the enclosing {@link RunEndEncodedVector} into
 * a target vector by delegating to the transfer pairs of its run-ends and values children.
 */
private class TransferImpl implements TransferPair {

  RunEndEncodedVector to;
  TransferPair dataTransferPair;
  TransferPair reeTransferPair;

  /**
   * Creates a pair whose target is a new vector with the given name and the field type of the
   * enclosing vector.
   *
   * @param name name of the target vector
   * @param allocator allocator passed to the target vector
   * @param callBack callback passed to the target vector
   */
  public TransferImpl(String name, BufferAllocator allocator, CallBack callBack) {
    this(new RunEndEncodedVector(name, allocator, field.getFieldType(), callBack));
  }

  /**
   * Creates a pair whose target is a new vector built from the given field.
   *
   * @param field field describing the target vector
   * @param allocator allocator passed to the target vector
   * @param callBack callback passed to the target vector
   */
  public TransferImpl(Field field, BufferAllocator allocator, CallBack callBack) {
    this(new RunEndEncodedVector(field, allocator, callBack));
  }

  /**
   * Creates a pair that transfers into an existing vector.
   *
   * @param target vector that receives the data
   */
  public TransferImpl(RunEndEncodedVector target) {
    this.to = target;
    // A target whose run-ends child is still a ZeroVector placeholder gets its children created
    // from the enclosing vector's child fields before the child transfer pairs are built.
    final boolean childrenMissing = target.getRunEndsVector() instanceof ZeroVector;
    if (childrenMissing) {
      target.initializeChildrenFromFields(field.getChildren());
    }
    this.reeTransferPair =
        RunEndEncodedVector.this.getRunEndsVector().makeTransferPair(target.getRunEndsVector());
    this.dataTransferPair =
        RunEndEncodedVector.this.getValuesVector().makeTransferPair(target.getValuesVector());
  }

  /**
   * Transfers this vector's data to the target vector. The target is cleared first, then the
   * values child and the run-ends child are transferred, the target's value count is set when
   * the source's is positive, and finally the source vector is cleared.
   */
  @Override
  public void transfer() {
    to.clear();
    dataTransferPair.transfer();
    reeTransferPair.transfer();
    if (RunEndEncodedVector.this.valueCount > 0) {
      to.setValueCount(RunEndEncodedVector.this.valueCount);
    }
    RunEndEncodedVector.this.clear();
  }

  /**
   * Slicing a range of this vector into the target is not supported.
   *
   * @param startIndex start position of the split in source vector.
   * @param length length of the split.
   * @throws UnsupportedOperationException always
   */
  @Override
  public void splitAndTransfer(int startIndex, int length) {
    throw new UnsupportedOperationException();
  }

  /** Returns the target vector of this transfer pair. */
  @Override
  public ValueVector getTo() {
    return to;
  }

  /**
   * Copies one value from the enclosing vector into the target by calling the target's
   * {@code copyFrom}.
   *
   * @param from index in the source vector
   * @param to index in the target vector
   */
  @Override
  public void copyValueSafe(int from, int to) {
    final RunEndEncodedVector source = RunEndEncodedVector.this;
    this.to.copyFrom(from, to, source);
  }
}

/**
Expand Down Expand Up @@ -568,6 +614,7 @@ public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers
throw new UnsupportedOperationException(
"Run-end encoded vectors do not have any associated buffers.");
}
this.valueCount = fieldNode.getLength();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,9 @@ private void readFromJsonIntoVector(Field field, FieldVector vector) throws IOEx
}

int nullCount = 0;
if (type instanceof ArrowType.Null) {
if (type instanceof ArrowType.RunEndEncoded) {
nullCount = 0;
} else if (type instanceof ArrowType.Null) {
nullCount = valueCount;
} else if (!(type instanceof Union)) {
nullCount = BitVectorHelper.getNullCount(vectorBuffers.get(0), valueCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.arrow.vector.types.pojo.ArrowType.RunEndEncoded;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.util.TransferPair;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -78,6 +79,18 @@ public void testConstantValueVector() {
for (int i = 0; i < logicalValueCount; i++) {
assertEquals(value, reeVector.getObject(i));
}

TransferPair transferPair = reeVector.getTransferPair(allocator);
transferPair.transfer();
assertEquals(0, reeVector.getValueCount());
assertEquals(0, reeVector.getValuesVector().getValueCount());
assertEquals(0, reeVector.getRunEndsVector().getValueCount());
try (RunEndEncodedVector toVector = (RunEndEncodedVector) transferPair.getTo()) {
assertEquals(logicalValueCount, toVector.getValueCount());
for (int i = 0; i < logicalValueCount; i++) {
assertEquals(value, toVector.getObject(i));
}
}
}

// constant null vector
Expand Down Expand Up @@ -106,22 +119,36 @@ public void testBasicRunEndEncodedVector() {
setBasicVector(reeVector, runCount, i -> i % 2 == 0 ? null : i + 1, i -> i + 1);

assertEquals(15, reeVector.getValueCount());
int index = 0;
for (int run = 0; run < runCount; run++) {
long expectedRunValue = (long) run + 1;
for (int j = 0; j <= run; j++) {
if (run % 2 == 0) {
assertNull(reeVector.getObject(index));
} else {
assertEquals(expectedRunValue, reeVector.getObject(index));
}
index++;
}
}
checkBasic(runCount, reeVector);

// test index out of bound
assertThrows(IndexOutOfBoundsException.class, () -> reeVector.getObject(-1));
assertThrows(IndexOutOfBoundsException.class, () -> reeVector.getObject(logicalValueCount));

TransferPair transferPair = reeVector.getTransferPair(allocator);
transferPair.transfer();
assertEquals(0, reeVector.getValueCount());
assertEquals(0, reeVector.getValuesVector().getValueCount());
assertEquals(0, reeVector.getRunEndsVector().getValueCount());
try (RunEndEncodedVector toVector = (RunEndEncodedVector) transferPair.getTo()) {
assertEquals(logicalValueCount, toVector.getValueCount());
checkBasic(runCount, toVector);
}
}
}

/**
 * Asserts the logical contents of a vector in which run {@code r} (0-based) spans {@code r + 1}
 * consecutive logical indices: every index of an even-numbered run must read as null, and every
 * index of an odd-numbered run must read as the long value {@code r + 1}.
 *
 * @param runCount number of runs to check
 * @param reeVector vector under test
 */
private static void checkBasic(int runCount, RunEndEncodedVector reeVector) {
  int logicalIndex = 0;
  for (int run = 0; run < runCount; run++) {
    final boolean expectNull = (run % 2) == 0;
    final long expectedRunValue = run + 1L;
    final int runLength = run + 1;
    for (int offset = 0; offset < runLength; offset++, logicalIndex++) {
      if (expectNull) {
        assertNull(reeVector.getObject(logicalIndex));
      } else {
        assertEquals(expectedRunValue, reeVector.getObject(logicalIndex));
      }
    }
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,12 @@
import org.apache.arrow.vector.complex.LargeListViewVector;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.ListViewVector;
import org.apache.arrow.vector.complex.RunEndEncodedVector;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.holders.IntervalDayHolder;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;

/** Utility for populating {@link org.apache.arrow.vector.ValueVector}. */
Expand Down Expand Up @@ -794,4 +796,41 @@ public static void setVector(LargeListViewVector vector, List<Integer>... values
dataVector.setValueCount(curPos);
vector.setValueCount(values.length);
}

/**
 * Populates a run-end encoded vector from parallel lists of run ends and run values.
 *
 * <p>The vector's children are (re)initialized as two INT fields named {@code "ree"} (run ends)
 * and {@code "value"} (values). A null entry in either list is written as a null slot in the
 * corresponding child. The logical value count of the vector is set to the last run end, or to
 * zero when there are no runs.
 *
 * @param vector the vector to populate
 * @param runEnds the run end of each run
 * @param values the value of each run; must have the same size as {@code runEnds}
 */
public static void setVector(
    RunEndEncodedVector vector, List<Integer> runEnds, List<Integer> values) {
  int runCount = runEnds.size();
  assert runCount == values.size();
  final FieldType valueType = FieldType.notNullable(MinorType.INT.getType());
  final FieldType runEndType = FieldType.notNullable(Types.MinorType.INT.getType());
  final Field valueField = new Field("value", valueType, null);
  final Field runEndField = new Field("ree", runEndType, null);
  vector.initializeChildrenFromFields(List.of(runEndField, valueField));

  IntVector runEndsVector = (IntVector) vector.getRunEndsVector();
  runEndsVector.setValueCount(runCount);
  for (int i = 0; i < runCount; i++) {
    if (runEnds.get(i) == null) {
      runEndsVector.setNull(i);
    } else {
      runEndsVector.set(i, runEnds.get(i));
    }
  }

  IntVector valuesVector = (IntVector) vector.getValuesVector();
  valuesVector.setValueCount(runCount);
  for (int i = 0; i < runCount; i++) {
    // Test the value itself, not the run end: checking runEnds here let a null value fall
    // through to set(i, values.get(i)) and fail with a NullPointerException on unboxing.
    if (values.get(i) == null) {
      valuesVector.setNull(i);
    } else {
      valuesVector.set(i, values.get(i));
    }
  }

  if (runCount > 0) {
    // The last run end is the total logical length of the vector.
    vector.setValueCount(runEnds.get(runCount - 1));
  } else {
    vector.setValueCount(0);
  }
}
}
Loading