diff --git a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java
index 65810163840e349..9c4f56068a7844e 100644
--- a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java
+++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java
@@ -32,8 +32,12 @@
 import org.apache.arrow.vector.SmallIntVector;
 import org.apache.arrow.vector.TimeStampNanoVector;
 import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.ValueVector;
 import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.complex.StructVector;
 import org.apache.log4j.Logger;
 
 import java.math.BigDecimal;
@@ -49,13 +53,20 @@
 public class MaxComputeColumnValue implements ColumnValue {
     private static final Logger LOG = Logger.getLogger(MaxComputeColumnValue.class);
     private int idx;
-    private FieldVector column;
+    private ValueVector column;
 
     public MaxComputeColumnValue() {
         idx = 0;
     }
 
-    public void reset(FieldVector column) {
+    /** Creates a value that reads {@code valueVector} starting at row {@code i}. */
+    public MaxComputeColumnValue(ValueVector valueVector, int i) {
+        this.column = valueVector;
+        this.idx = i;
+    }
+
+    /** Rebinds this value to {@code column} and rewinds the cursor to row 0. */
+    public void reset(ValueVector column) {
         this.column = column;
         this.idx = 0;
     }
@@ -222,16 +233,66 @@ public byte[] getBytes() {
 
+    /**
+     * Unpacks the elements of the array at the current row into {@code values},
+     * then advances the cursor to the next row.
+     *
+     * @param values output list that receives one value per array element
+     */
     @Override
     public void unpackArray(List<ColumnValue> values) {
-
+        skippedIfNull();
+        ListVector listCol = (ListVector) column;
+        // The data vector holds the elements of every row; [start, end) is the current row's slice.
+        int start = listCol.getElementStartIndex(idx);
+        int end = listCol.getElementEndIndex(idx);
+        FieldVector elements = listCol.getDataVector();
+        for (int i = start; i < end; i++) {
+            values.add(new MaxComputeColumnValue(elements, i));
+        }
+        // Advance the cursor so the next call reads the next row, as unpackStruct does.
+        idx++;
     }
 
+    /**
+     * Unpacks the entries of the map at the current row into {@code keys} and {@code values},
+     * then advances the cursor to the next row.
+     *
+     * @param keys output list that receives one value per map key
+     * @param values output list that receives one value per map value
+     */
     @Override
     public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
-
+        skippedIfNull();
+        MapVector mapCol = (MapVector) column;
+        // Children 0 and 1 of the entries vector are the keys and the values of every row;
+        // [start, end) is the current row's slice of both.
+        int start = mapCol.getElementStartIndex(idx);
+        int end = mapCol.getElementEndIndex(idx);
+        List<FieldVector> entryFields = mapCol.getDataVector().getChildrenFromFields();
+        FieldVector keyList = entryFields.get(0);
+        FieldVector valList = entryFields.get(1);
+        for (int i = start; i < end; i++) {
+            keys.add(new MaxComputeColumnValue(keyList, i));
+            values.add(new MaxComputeColumnValue(valList, i));
+        }
+        // Advance the cursor so the next call reads the next row, as unpackStruct does.
+        idx++;
     }
 
+    /**
+     * Unpacks the requested fields of the struct at the current row into {@code values},
+     * then advances the cursor to the next row.
+     *
+     * @param structFieldIndex ordinals of the struct children to read
+     * @param values output list that receives one value per requested field
+     */
     @Override
     public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values) {
-
+        skippedIfNull();
+        StructVector structCol = (StructVector) column;
+        for (Integer fieldIndex : structFieldIndex) {
+            MaxComputeColumnValue val = new MaxComputeColumnValue(structCol.getChildByOrdinal(fieldIndex), idx);
+            values.add(val);
+        }
+        idx++;
     }
 }
diff --git a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
index 6a441a69293c9a4..89c351f7343e803 100644
--- a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
+++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
@@ -190,6 +190,17 @@ public void open() throws IOException {
     }
 
     private Column createOdpsColumn(int colIdx, ColumnType dorisType) {
+        TypeInfo odpsType = getOdpsType(dorisType);
+        return new Column(fields[colIdx], odpsType);
+    }
+
+    /**
+     * Maps a Doris column type to the matching ODPS type, recursing into the child types of
+     * ARRAY, MAP and STRUCT.
+     *
+     * @throws RuntimeException if the Doris type has no supported mapping
+     */
+    private static TypeInfo getOdpsType(ColumnType dorisType) {
         TypeInfo odpsType;
         switch (dorisType.getType()) {
             case BOOLEAN:
@@ -236,10 +247,27 @@ private Column createOdpsColumn(int colIdx, ColumnType dorisType) {
             case STRING:
                 odpsType = TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.STRING);
                 break;
+            case ARRAY:
+                TypeInfo elementType = getOdpsType(dorisType.getChildTypes().get(0));
+                odpsType = TypeInfoFactory.getArrayTypeInfo(elementType);
+                break;
+            case MAP:
+                TypeInfo keyType = getOdpsType(dorisType.getChildTypes().get(0));
+                TypeInfo valueType = getOdpsType(dorisType.getChildTypes().get(1));
+                odpsType = TypeInfoFactory.getMapTypeInfo(keyType, valueType);
+                break;
+            case STRUCT:
+                List<String> names = dorisType.getChildNames();
+                List<TypeInfo> typeInfos = new ArrayList<>();
+                for (ColumnType childType : dorisType.getChildTypes()) {
+                    typeInfos.add(getOdpsType(childType));
+                }
+                odpsType = TypeInfoFactory.getStructTypeInfo(names, typeInfos);
+                break;
             default:
                 throw new RuntimeException("Unsupported transform for column type: " + dorisType.getType());
         }
-        return new Column(fields[colIdx], odpsType);
+        return odpsType;
     }
 
     @Override