Skip to content

Commit

Permalink
[bugfix](paimon)Fixed the reading of timestamp with time zone type da…
Browse files Browse the repository at this point in the history
…ta (#37716)

## Proposed changes

1. When reading timestamp-with-time-zone values through JNI, the time must
be converted to local time.
2. Paimon 0.8 adds time zone information (isAdjustToUTC) to its Parquet
files, so Doris can parse the data directly according to that time zone
information.
  • Loading branch information
wuwenchi authored and dataroaring committed Jul 17, 2024
1 parent 4ebb45c commit fd5f0d4
Show file tree
Hide file tree
Showing 7 changed files with 400 additions and 51 deletions.
15 changes: 2 additions & 13 deletions be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -785,20 +785,9 @@ Status VFileScanner::_get_next_reader() {
break;
}
case TFileFormatType::FORMAT_PARQUET: {
static const cctz::time_zone utc0 = cctz::utc_time_zone();
cctz::time_zone* tz;
if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "paimon") {
// The timestmap generated by paimon does not carry metadata information (e.g., isAdjustToUTC, etc.),
// and the stored data is UTC0 by default, so it is directly set to the UTC time zone.
// In version 0.7, paimon fixed this issue and can remove the judgment here
tz = const_cast<cctz::time_zone*>(&utc0);
} else {
tz = const_cast<cctz::time_zone*>(&_state->timezone_obj());
}
std::unique_ptr<ParquetReader> parquet_reader = ParquetReader::create_unique(
_profile, *_params, range, _state->query_options().batch_size, tz,
_io_ctx.get(), _state,
_profile, *_params, range, _state->query_options().batch_size,
const_cast<cctz::time_zone*>(&_state->timezone_obj()), _io_ctx.get(), _state,
_shoudl_enable_file_meta_cache() ? ExecEnv::GetInstance()->file_meta_cache()
: nullptr,
_state->query_options().enable_parquet_lazy_mat);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,7 @@ docker exec iceberg-rest bash -c 'cp /tmp/iceberg_rest_mode\=memory /mnt/data/in

# save iceberg from s3
docker exec mc bash -c 'mc cp -r minio/warehouse /mnt/data/input/minio'

# package zip
cp -r data iceberg_data
zip -rq iceberg_data.zip iceberg_data
Original file line number Diff line number Diff line change
Expand Up @@ -24,33 +24,43 @@
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.List;

public class PaimonColumnValue implements ColumnValue {
private static final Logger LOG = LoggerFactory.getLogger(PaimonColumnValue.class);
private int idx;
private DataGetters record;
private ColumnType dorisType;
private DataType dataType;

/**
 * Creates an empty column value with no record, index, or type set.
 *
 * <p>The fields are expected to be filled in afterwards through
 * {@code setOffsetRow} and {@code setIdx} before any getter is called.
 */
public PaimonColumnValue() {
}

/**
 * Creates a column value bound to one position of a Paimon data container.
 *
 * @param record     the Paimon container (row, array, ...) the value is read from
 * @param idx        position of the value inside {@code record}
 * @param columnType the Doris-side type of the value, stored as {@code dorisType}
 * @param dataType   the Paimon-side type of the value; kept so that getters can
 *                   distinguish Paimon types that share one Doris type and so that
 *                   nested values can look up their element/key/value/field types
 */
// Pre-change signature (the line this commit removes):
public PaimonColumnValue(DataGetters record, int idx, ColumnType columnType) {
// Post-change signature (the line this commit adds):
public PaimonColumnValue(DataGetters record, int idx, ColumnType columnType, DataType dataType) {
this.idx = idx;
this.record = record;
this.dorisType = columnType;
this.dataType = dataType;
}

/**
 * Re-points this value at another column of the current record.
 *
 * @param idx       position of the column inside the current record
 * @param dorisType the Doris-side type of that column
 * @param dataType  the Paimon-side type of that column
 */
// Pre-change signature (the line this commit removes):
public void setIdx(int idx, ColumnType dorisType) {
// Post-change signature (the line this commit adds):
public void setIdx(int idx, ColumnType dorisType, DataType dataType) {
this.idx = idx;
this.dorisType = dorisType;
this.dataType = dataType;
}

public void setOffsetRow(InternalRow record) {
Expand Down Expand Up @@ -124,7 +134,12 @@ public LocalDate getDate() {

/**
 * Reads the timestamp at {@code idx} from {@code record}, using the precision of
 * {@code dorisType}, and returns it as a {@link LocalDateTime}.
 *
 * <p>When the Paimon type is {@link LocalZonedTimestampType}, the value is treated as an
 * instant and rendered in {@link ZoneId#systemDefault()}; for any other type the
 * timestamp's own local date-time is returned unchanged.
 *
 * NOTE(review): the zone used is the JVM default, not a per-query setting visible here;
 * confirm that this matches the time zone callers expect.
 */
@Override
public LocalDateTime getDateTime() {
// Pre-change implementation (the line this commit removes):
return record.getTimestamp(idx, dorisType.getPrecision()).toLocalDateTime();
// Post-change implementation (the lines this commit adds):
Timestamp ts = record.getTimestamp(idx, dorisType.getPrecision());
if (dataType instanceof LocalZonedTimestampType) {
// Local-zoned timestamps: convert the stored instant into the JVM default zone.
return LocalDateTime.ofInstant(ts.toInstant(), ZoneId.systemDefault());
} else {
// Timestamps without a time zone: no zone conversion is applied.
return ts.toLocalDateTime();
}
}

@Override
Expand All @@ -142,7 +157,7 @@ public void unpackArray(List<ColumnValue> values) {
InternalArray recordArray = record.getArray(idx);
for (int i = 0; i < recordArray.size(); i++) {
PaimonColumnValue arrayColumnValue = new PaimonColumnValue((DataGetters) recordArray, i,
dorisType.getChildTypes().get(0));
dorisType.getChildTypes().get(0), ((ArrayType) dataType).getElementType());
values.add(arrayColumnValue);
}
}
Expand All @@ -153,13 +168,13 @@ public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
InternalArray key = map.keyArray();
for (int i = 0; i < key.size(); i++) {
PaimonColumnValue keyColumnValue = new PaimonColumnValue((DataGetters) key, i,
dorisType.getChildTypes().get(0));
dorisType.getChildTypes().get(0), ((MapType) dataType).getKeyType());
keys.add(keyColumnValue);
}
InternalArray value = map.valueArray();
for (int i = 0; i < value.size(); i++) {
PaimonColumnValue valueColumnValue = new PaimonColumnValue((DataGetters) value, i,
dorisType.getChildTypes().get(1));
dorisType.getChildTypes().get(1), ((MapType) dataType).getValueType());
values.add(valueColumnValue);
}
}
Expand All @@ -169,7 +184,8 @@ public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> value
// todo: support pruned struct fields
InternalRow row = record.getRow(idx, structFieldIndex.size());
for (int i : structFieldIndex) {
values.add(new PaimonColumnValue(row, i, dorisType.getChildTypes().get(i)));
values.add(new PaimonColumnValue(row, i, dorisType.getChildTypes().get(i),
((RowType) dataType).getFields().get(i).type()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class PaimonJniScanner extends JniScanner {
private RecordReader<InternalRow> reader;
private final PaimonColumnValue columnValue = new PaimonColumnValue();
private List<String> paimonAllFieldNames;
private List<DataType> paimonDataTypeList;

private long ctlId;
private long dbId;
Expand Down Expand Up @@ -99,7 +100,7 @@ public void open() throws IOException {
initTable();
initReader();
resetDatetimeV2Precision();
} catch (Exception e) {
} catch (Throwable e) {
LOG.warn("Failed to open paimon_scanner: " + e.getMessage(), e);
throw e;
}
Expand All @@ -114,9 +115,12 @@ private void initReader() throws IOException {
+ " Please refresh table and try again",
fields.length, paimonAllFieldNames.size()));
}
readBuilder.withProjection(getProjected());
int[] projected = getProjected();
readBuilder.withProjection(projected);
readBuilder.withFilter(getPredicates());
reader = readBuilder.newRead().createReader(getSplit());
paimonDataTypeList =
Arrays.stream(projected).mapToObj(i -> table.rowType().getTypeAt(i)).collect(Collectors.toList());
}

private int[] getProjected() {
Expand Down Expand Up @@ -175,7 +179,7 @@ protected int getNext() throws IOException {
while ((record = recordIterator.next()) != null) {
columnValue.setOffsetRow(record);
for (int i = 0; i < fields.length; i++) {
columnValue.setIdx(i, types[i]);
columnValue.setIdx(i, types[i], paimonDataTypeList.get(i));
appendData(i, columnValue);
}
rows++;
Expand All @@ -189,8 +193,8 @@ protected int getNext() throws IOException {
} catch (Exception e) {
close();
LOG.warn("Failed to get the next batch of paimon. "
+ "split: {}, requiredFieldNames: {}, paimonAllFieldNames: {}",
getSplit(), params.get("required_fields"), paimonAllFieldNames, e);
+ "split: {}, requiredFieldNames: {}, paimonAllFieldNames: {}, dataType: {}",
getSplit(), params.get("required_fields"), paimonAllFieldNames, paimonDataTypeList, e);
throw new IOException(e);
}
return rows;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public Optional<SchemaCacheValue> initSchema() {
}

private Type paimonPrimitiveTypeToDorisType(org.apache.paimon.types.DataType dataType) {
int tsScale = 3; // default
switch (dataType.getTypeRoot()) {
case BOOLEAN:
return Type.BOOLEAN;
Expand All @@ -114,20 +115,26 @@ private Type paimonPrimitiveTypeToDorisType(org.apache.paimon.types.DataType dat
case DATE:
return ScalarType.createDateV2Type();
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
int scale = 3; // default
if (dataType instanceof org.apache.paimon.types.TimestampType) {
scale = ((org.apache.paimon.types.TimestampType) dataType).getPrecision();
if (scale > 6) {
scale = 6;
tsScale = ((org.apache.paimon.types.TimestampType) dataType).getPrecision();
if (tsScale > 6) {
tsScale = 6;
}
} else if (dataType instanceof org.apache.paimon.types.LocalZonedTimestampType) {
scale = ((org.apache.paimon.types.LocalZonedTimestampType) dataType).getPrecision();
if (scale > 6) {
scale = 6;
tsScale = ((org.apache.paimon.types.LocalZonedTimestampType) dataType).getPrecision();
if (tsScale > 6) {
tsScale = 6;
}
}
return ScalarType.createDatetimeV2Type(tsScale);
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
if (dataType instanceof org.apache.paimon.types.LocalZonedTimestampType) {
tsScale = ((org.apache.paimon.types.LocalZonedTimestampType) dataType).getPrecision();
if (tsScale > 6) {
tsScale = 6;
}
}
return ScalarType.createDatetimeV2Type(scale);
return ScalarType.createDatetimeV2Type(tsScale);
case ARRAY:
ArrayType arrayType = (ArrayType) dataType;
Type innerType = paimonPrimitiveTypeToDorisType(arrayType.getElementType());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !c1 --
1 2024-01-02T10:04:05.100 2024-01-02T10:04:05.120 2024-01-02T10:04:05.123 2024-01-02T10:04:05.123400 2024-01-02T10:04:05.123450 2024-01-02T10:04:05.123456 2024-01-02T10:04:05.123456 2024-01-02T10:04:05.123456 2024-01-02T10:04:05.123456 2024-01-02T10:04:05.100 2024-01-02T10:04:05.120 2024-01-02T10:04:05.123 2024-01-02T10:04:05.123400 2024-01-02T10:04:05.123450 2024-01-02T10:04:05.123456 2024-01-02T10:04:05.123456 2024-01-02T10:04:05.123456 2024-01-02T10:04:05.123456

-- !c2 --
1 2024-01-02T10:04:05.100 2024-01-02T10:04:05.120 2024-01-02T10:04:05.123 2024-01-02T10:04:05.123400 2024-01-02T10:04:05.123450 2024-01-02T10:04:05.123456 2024-01-02T10:04:05.123456 2024-01-02T10:04:05.123456 2024-01-02T10:04:05.123456 2024-01-02T10:04:05.100 2024-01-02T10:04:05.120 2024-01-02T10:04:05.123 2024-01-02T10:04:05.123400 2024-01-02T10:04:05.123450 2024-01-02T10:04:05.123456 2024-01-02T10:04:05.123456 2024-01-02T10:04:05.123456 2024-01-02T10:04:05.123456

-- !ltz_ntz_simple2 --
1 {"2024-01-01 10:12:34.123456":"2024-01-02 10:12:34.123456", "2024-01-03 10:12:34.123456":"2024-01-04 10:12:34.123456"} {"2024-01-03 10:12:34.123456":"2024-01-04 10:12:34.123456", "2024-01-01 10:12:34.123456":"2024-01-02 10:12:34.123456"} ["2024-01-01 10:12:34.123456", "2024-01-02 10:12:34.123456", "2024-01-03 10:12:34.123456"] ["2024-01-01 10:12:34.123456", "2024-01-02 10:12:34.123456", "2024-01-03 10:12:34.123456"] {"crow1":"2024-01-01 10:12:34.123456", "crow2":"2024-01-02 10:12:34.123456"}

-- !ltz_ntz_simple3 --
{"2024-01-01 10:12:34.123456":"2024-01-02 10:12:34.123456", "2024-01-03 10:12:34.123456":"2024-01-04 10:12:34.123456"}

-- !ltz_ntz_simple4 --
2024-01-02T10:12:34.123456

-- !ltz_ntz_simple5 --
2024-01-04T10:12:34.123456

-- !ltz_ntz_simple6 --
{"2024-01-03 10:12:34.123456":"2024-01-04 10:12:34.123456", "2024-01-01 10:12:34.123456":"2024-01-02 10:12:34.123456"}

-- !ltz_ntz_simple7 --
2024-01-02T10:12:34.123456

-- !ltz_ntz_simple8 --
2024-01-04T10:12:34.123456

-- !ltz_ntz_simple9 --
["2024-01-01 10:12:34.123456", "2024-01-02 10:12:34.123456", "2024-01-03 10:12:34.123456"]

-- !ltz_ntz_simple10 --
2024-01-02T10:12:34.123456

-- !ltz_ntz_simple11 --
["2024-01-01 10:12:34.123456", "2024-01-02 10:12:34.123456", "2024-01-03 10:12:34.123456"]

-- !ltz_ntz_simple12 --
2024-01-02T10:12:34.123456

-- !ltz_ntz_simple13 --
{"crow1":"2024-01-01 10:12:34.123456", "crow2":"2024-01-02 10:12:34.123456"}

-- !ltz_ntz_simple14 --
2024-01-01T10:12:34.123456

-- !ltz_ntz_simple15 --
2024-01-02T10:12:34.123456

-- !c1 --
1 2024-01-02T10:04:05.100 2024-01-02T10:04:05.120 2024-01-02T10:04:05.123 2024-01-02T10:04:05.123400 2024-01-02T10:04:05.123450 2024-01-02T10:04:05.123456 2024-01-02T10:04:05.123456 2024-01-02T10:04:05.123456 2024-01-02T10:04:05.123456 2024-01-02T02:04:05.100 2024-01-02T02:04:05.120 2024-01-02T02:04:05.123 2024-01-02T02:04:05.123400 2024-01-02T02:04:05.123450 2024-01-02T02:04:05.123456 2024-01-02T02:04:05.123456 2024-01-02T02:04:05.123456 2024-01-02T02:04:05.123456

-- !c2 --
1 2024-01-02T10:04:05.100 2024-01-02T10:04:05.120 2024-01-02T10:04:05.123 2024-01-02T10:04:05.123400 2024-01-02T10:04:05.123450 2024-01-02T10:04:05.123456 2024-01-02T18:04:05.123456 2024-01-02T18:04:05.123456 2024-01-02T18:04:05.123456 2024-01-02T10:04:05.100 2024-01-02T10:04:05.120 2024-01-02T10:04:05.123 2024-01-02T10:04:05.123400 2024-01-02T10:04:05.123450 2024-01-02T10:04:05.123456 2024-01-02T10:04:05.123456 2024-01-02T10:04:05.123456 2024-01-02T10:04:05.123456

-- !ltz_ntz_simple2 --
1 {"2024-01-01 10:12:34.123456":"2024-01-02 10:12:34.123456", "2024-01-03 10:12:34.123456":"2024-01-04 10:12:34.123456"} {"2024-01-03 02:12:34.123456":"2024-01-04 02:12:34.123456", "2024-01-01 02:12:34.123456":"2024-01-02 02:12:34.123456"} ["2024-01-01 10:12:34.123456", "2024-01-02 10:12:34.123456", "2024-01-03 10:12:34.123456"] ["2024-01-01 02:12:34.123456", "2024-01-02 02:12:34.123456", "2024-01-03 02:12:34.123456"] {"crow1":"2024-01-01 10:12:34.123456", "crow2":"2024-01-02 02:12:34.123456"}

-- !ltz_ntz_simple3 --
{"2024-01-01 10:12:34.123456":"2024-01-02 10:12:34.123456", "2024-01-03 10:12:34.123456":"2024-01-04 10:12:34.123456"}

-- !ltz_ntz_simple4 --
2024-01-02T10:12:34.123456

-- !ltz_ntz_simple5 --
2024-01-04T10:12:34.123456

-- !ltz_ntz_simple6 --
{"2024-01-03 02:12:34.123456":"2024-01-04 02:12:34.123456", "2024-01-01 02:12:34.123456":"2024-01-02 02:12:34.123456"}

-- !ltz_ntz_simple7 --
\N

-- !ltz_ntz_simple8 --
\N

-- !ltz_ntz_simple9 --
["2024-01-01 10:12:34.123456", "2024-01-02 10:12:34.123456", "2024-01-03 10:12:34.123456"]

-- !ltz_ntz_simple10 --
2024-01-02T10:12:34.123456

-- !ltz_ntz_simple11 --
["2024-01-01 02:12:34.123456", "2024-01-02 02:12:34.123456", "2024-01-03 02:12:34.123456"]

-- !ltz_ntz_simple12 --
2024-01-02T02:12:34.123456

-- !ltz_ntz_simple13 --
{"crow1":"2024-01-01 10:12:34.123456", "crow2":"2024-01-02 02:12:34.123456"}

-- !ltz_ntz_simple14 --
2024-01-01T10:12:34.123456

-- !ltz_ntz_simple15 --
2024-01-02T02:12:34.123456

-- !ltz_ntz_simple2 --
1 {"2024-01-01 10:12:34.123456":"2024-01-02 10:12:34.123456", "2024-01-03 10:12:34.123456":"2024-01-04 10:12:34.123456"} {"2024-01-03 10:12:34.123456":"2024-01-04 10:12:34.123456", "2024-01-01 10:12:34.123456":"2024-01-02 10:12:34.123456"} ["2024-01-01 10:12:34.123456", "2024-01-02 10:12:34.123456", "2024-01-03 10:12:34.123456"] ["2024-01-01 10:12:34.123456", "2024-01-02 10:12:34.123456", "2024-01-03 10:12:34.123456"] {"crow1":"2024-01-01 10:12:34.123456", "crow2":"2024-01-02 10:12:34.123456"}

-- !ltz_ntz_simple3 --
{"2024-01-01 10:12:34.123456":"2024-01-02 10:12:34.123456", "2024-01-03 10:12:34.123456":"2024-01-04 10:12:34.123456"}

-- !ltz_ntz_simple4 --
2024-01-02T10:12:34.123456

-- !ltz_ntz_simple5 --
2024-01-04T10:12:34.123456

-- !ltz_ntz_simple6 --
{"2024-01-03 10:12:34.123456":"2024-01-04 10:12:34.123456", "2024-01-01 10:12:34.123456":"2024-01-02 10:12:34.123456"}

-- !ltz_ntz_simple7 --
2024-01-02T10:12:34.123456

-- !ltz_ntz_simple8 --
2024-01-04T10:12:34.123456

-- !ltz_ntz_simple9 --
["2024-01-01 10:12:34.123456", "2024-01-02 10:12:34.123456", "2024-01-03 10:12:34.123456"]

-- !ltz_ntz_simple10 --
2024-01-02T10:12:34.123456

-- !ltz_ntz_simple11 --
["2024-01-01 10:12:34.123456", "2024-01-02 10:12:34.123456", "2024-01-03 10:12:34.123456"]

-- !ltz_ntz_simple12 --
2024-01-02T10:12:34.123456

-- !ltz_ntz_simple13 --
{"crow1":"2024-01-01 10:12:34.123456", "crow2":"2024-01-02 10:12:34.123456"}

-- !ltz_ntz_simple14 --
2024-01-01T10:12:34.123456

-- !ltz_ntz_simple15 --
2024-01-02T10:12:34.123456

Loading

0 comments on commit fd5f0d4

Please sign in to comment.