Skip to content

Commit

Permalink
[Improvement](multi-catalog) Support parsing of array and map types
Browse files Browse the repository at this point in the history
  • Loading branch information
wsjz committed Aug 13, 2024
1 parent a5fd824 commit 951247d
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,12 @@
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.TimeStampNanoVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.log4j.Logger;

import java.math.BigDecimal;
Expand All @@ -49,13 +53,18 @@
public class MaxComputeColumnValue implements ColumnValue {
private static final Logger LOG = Logger.getLogger(MaxComputeColumnValue.class);
private int idx;
private FieldVector column;
private ValueVector column;

/**
 * Creates a column value that is not yet bound to any vector; the read
 * cursor starts at row 0.
 */
public MaxComputeColumnValue() {
    this.idx = 0;
}

public void reset(FieldVector column) {
/**
 * Creates a column value bound to {@code valueVector} with the read cursor
 * positioned at index {@code i}.
 *
 * @param valueVector the vector to read from
 * @param i           the index inside {@code valueVector} at which reading starts
 */
public MaxComputeColumnValue(ValueVector valueVector, int i) {
    this.idx = i;
    this.column = valueVector;
}

/**
 * Rebinds this value to another vector and rewinds the read cursor to row 0.
 *
 * @param column the vector that subsequent reads are served from
 */
public void reset(ValueVector column) {
    this.idx = 0;
    this.column = column;
}
Expand Down Expand Up @@ -222,16 +231,38 @@ public byte[] getBytes() {

/**
 * Unpacks the list stored at the current row ({@code idx}) into {@code values}.
 *
 * <p>Each appended element is a {@link MaxComputeColumnValue} positioned on
 * one slot of the list's child data vector. After unpacking, the cursor is
 * advanced to the next row, in the same way {@code unpackStruct} does.
 *
 * @param values output list that receives one entry per element of the current row
 */
@Override
public void unpackArray(List<ColumnValue> values) {
    // NOTE(review): skippedIfNull() is defined outside this view; presumably it
    // moves idx past a null row before the read - confirm against its body.
    skippedIfNull();
    ListVector listCol = (ListVector) column;
    ValueVector elements = listCol.getDataVector();
    // The child data vector stores the elements of ALL rows back to back, so
    // only the [start, end) offset range that belongs to row idx may be read.
    // Iterating up to getValueCount() would return every row's elements.
    int start = listCol.getElementStartIndex(idx);
    int end = listCol.getElementEndIndex(idx);
    for (int i = start; i < end; i++) {
        values.add(new MaxComputeColumnValue(elements, i));
    }
    // Move on to the next row so the following call does not re-read this one.
    idx++;
}

/**
 * Unpacks the map stored at the current row ({@code idx}) into parallel
 * {@code keys} and {@code values} lists.
 *
 * <p>The i-th entry of {@code keys} and the i-th entry of {@code values}
 * come from the same slot of the map's entries vector. After unpacking, the
 * cursor is advanced to the next row, in the same way {@code unpackStruct} does.
 *
 * @param keys   output list that receives one entry per key of the current row
 * @param values output list that receives one entry per value of the current row
 */
@Override
public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
    // NOTE(review): skippedIfNull() is defined outside this view; presumably it
    // moves idx past a null row before the read - confirm against its body.
    skippedIfNull();
    MapVector mapCol = (MapVector) column;
    // Child 0 of the entries vector holds the keys, child 1 the values.
    List<FieldVector> entryChildren = mapCol.getDataVector().getChildrenFromFields();
    FieldVector keyList = entryChildren.get(0);
    FieldVector valList = entryChildren.get(1);
    // The key/value vectors store the entries of ALL rows back to back, so
    // only the [start, end) offset range that belongs to row idx may be read.
    int start = mapCol.getElementStartIndex(idx);
    int end = mapCol.getElementEndIndex(idx);
    for (int i = start; i < end; i++) {
        keys.add(new MaxComputeColumnValue(keyList, i));
        values.add(new MaxComputeColumnValue(valList, i));
    }
    // Move on to the next row so the following call does not re-read this one.
    idx++;
}

/**
 * Unpacks the requested fields of the struct at the current row into
 * {@code values}, then advances the cursor by one row.
 *
 * @param structFieldIndex ordinals of the struct children to extract, in output order
 * @param values           output list that receives one entry per requested field
 */
@Override
public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values) {
    skippedIfNull();
    StructVector structCol = (StructVector) column;
    // Every child vector is read at the same row as the struct itself.
    final int row = idx;
    for (int ordinal : structFieldIndex) {
        values.add(new MaxComputeColumnValue(structCol.getChildByOrdinal(ordinal), row));
    }
    idx = row + 1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ public void open() throws IOException {
}

/**
 * Builds the ODPS column descriptor for the field at {@code colIdx}, using
 * the ODPS type that {@code getOdpsType} derives from {@code dorisType}.
 *
 * @param colIdx    index into {@code fields} that selects the column name
 * @param dorisType the Doris column type to translate
 * @return a new {@code Column} named {@code fields[colIdx]} with the translated type
 */
private Column createOdpsColumn(int colIdx, ColumnType dorisType) {
    return new Column(fields[colIdx], getOdpsType(dorisType));
}

private static TypeInfo getOdpsType(ColumnType dorisType) {
TypeInfo odpsType;
switch (dorisType.getType()) {
case BOOLEAN:
Expand Down Expand Up @@ -236,10 +241,27 @@ private Column createOdpsColumn(int colIdx, ColumnType dorisType) {
case STRING:
odpsType = TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.STRING);
break;
case ARRAY:
TypeInfo elementType = getOdpsType(dorisType.getChildTypes().get(0));
odpsType = TypeInfoFactory.getArrayTypeInfo(elementType);
break;
case MAP:
TypeInfo keyType = getOdpsType(dorisType.getChildTypes().get(0));
TypeInfo valueType = getOdpsType(dorisType.getChildTypes().get(1));
odpsType = TypeInfoFactory.getMapTypeInfo(keyType, valueType);
break;
case STRUCT:
List<String> names = dorisType.getChildNames();
List<TypeInfo> typeInfos = new ArrayList<>();
for (ColumnType childType : dorisType.getChildTypes()) {
typeInfos.add(getOdpsType(childType));
}
odpsType = TypeInfoFactory.getStructTypeInfo(names, typeInfos);
break;
default:
throw new RuntimeException("Unsupported transform for column type: " + dorisType.getType());
}
return new Column(fields[colIdx], odpsType);
return odpsType;
}

@Override
Expand Down

0 comments on commit 951247d

Please sign in to comment.