From 1aa2634c9c16767d352090ad5ac3eb54a5d8bcea Mon Sep 17 00:00:00 2001 From: duyalei <> Date: Mon, 28 Aug 2023 17:22:33 +0800 Subject: [PATCH] rebase to apache-iceberg-1.3.1 --- README.md | 8 -- .../expressions/GeoMinMaxAggregate.java | 11 +- .../iceberg/expressions/MaxAggregate.java | 3 +- .../iceberg/expressions/MaxAggregator.java | 41 +++---- .../iceberg/expressions/MinAggregate.java | 3 +- .../iceberg/expressions/MinAggregator.java | 41 +++---- .../iceberg/expressions/ValueAggregate.java | 8 +- .../expressions/TestAggregateEvaluator.java | 102 ++++++++++-------- .../TestInclusiveMetricsEvaluator.java | 3 +- .../apache/iceberg/data/IcebergGenerics.java | 2 +- .../spark/PruneColumnsWithoutReordering.java | 7 +- .../iceberg/spark/source/SparkScan.java | 2 - version.txt | 2 +- 13 files changed, 120 insertions(+), 113 deletions(-) diff --git a/README.md b/README.md index 32e5edb8f..3d8016097 100644 --- a/README.md +++ b/README.md @@ -35,11 +35,3 @@ WHERE ST_Contains(geom, ST_Point(0.5, 0.5)); ## Quickstart Check this repo [docker-spark-geolake](https://github.com/spatialx-project/docker-spark-geolake) for early access, there are some [notebooks](https://github.com/spatialx-project/docker-spark-geolake/tree/main/spark/notebooks) inside. - -Source code and documentation will be released soon. - -## PVLDB Artifact - -We are submitting a paper titled "GeoLake: Bringing Geospatial Support to Lakehouses" to VLDB (Very Large Data Bases), and we have made the experiment-related code, data, and results available at this repository. Specialy, check [parquet-benchmark](https://github.com/spatialx-project/geplake-parquet-benchmark) for Parquet-related experiments(paper's section 7.2), check [serde-benckmark](https://github.com/Kontinuation/play-with-geometry-serde) for Serde-related experiments(paper's section 7.3), check [Partition-Resolution](https://github.com/spatialx-project/docker-spark-geolake/blob/main/spark/notebooks/benchmark-portotaxi.ipynb) for Partition-related experiments(paper's section 7.4), check [end-2-end](https://github.com/spatialx-project/docker-spark-geolake/blob/main/spark/notebooks/benchmark-portotaxi.ipynb) for end-2-end experiments(paper's section 7.5). - -It is noteworthy that, for Partition-related experiments and end-2-end experiments, the corresponding repository only contains code for the Portotaxi dataset. For the TIGER2018 and MSBuildings datasets, you only need to modify the logic for reading the dataset in the code. diff --git a/api/src/main/java/org/apache/iceberg/expressions/GeoMinMaxAggregate.java b/api/src/main/java/org/apache/iceberg/expressions/GeoMinMaxAggregate.java index 1cec348a4..81502fac7 100644 --- a/api/src/main/java/org/apache/iceberg/expressions/GeoMinMaxAggregate.java +++ b/api/src/main/java/org/apache/iceberg/expressions/GeoMinMaxAggregate.java @@ -18,14 +18,13 @@ */ package org.apache.iceberg.expressions; +import java.util.Comparator; import org.apache.iceberg.DataFile; import org.apache.iceberg.types.Comparators; import org.apache.iceberg.types.Conversions; import org.apache.iceberg.types.Type.PrimitiveType; import org.apache.iceberg.types.Types; -import java.util.Comparator; - public class GeoMinMaxAggregate extends ValueAggregate { private final int fieldId; @@ -57,13 +56,17 @@ protected Object evaluateRef(DataFile file) { } Pair res; if (this.geomOp == Operation.ST_MINX || this.geomOp == Operation.ST_MINY) { - res = Conversions.fromByteBuffer(Types.GeometryBoundType.get(), safeGet(file.lowerBounds(), fieldId)); + res = + Conversions.fromByteBuffer( + Types.GeometryBoundType.get(), safeGet(file.lowerBounds(), fieldId)); if (res == null) { return null; } return this.geomOp == Operation.ST_MINX ? res.first() : res.second(); } else if (this.geomOp == Operation.ST_MAXX || this.geomOp == Operation.ST_MAXY) { - res = Conversions.fromByteBuffer(Types.GeometryBoundType.get(), safeGet(file.upperBounds(), fieldId)); + res = + Conversions.fromByteBuffer( + Types.GeometryBoundType.get(), safeGet(file.upperBounds(), fieldId)); if (res == null) { return null; } diff --git a/api/src/main/java/org/apache/iceberg/expressions/MaxAggregate.java b/api/src/main/java/org/apache/iceberg/expressions/MaxAggregate.java index 78354cbef..a13728c1d 100644 --- a/api/src/main/java/org/apache/iceberg/expressions/MaxAggregate.java +++ b/api/src/main/java/org/apache/iceberg/expressions/MaxAggregate.java @@ -45,7 +45,8 @@ protected boolean hasValue(DataFile file) { @Override protected Object evaluateRef(DataFile file) { - PrimitiveType resType = type.equals(Types.GeometryType.get()) ? Types.GeometryBoundType.get() : type; + PrimitiveType resType = + type.equals(Types.GeometryType.get()) ? Types.GeometryBoundType.get() : type; return Conversions.fromByteBuffer(resType, safeGet(file.upperBounds(), fieldId)); } diff --git a/api/src/main/java/org/apache/iceberg/expressions/MaxAggregator.java b/api/src/main/java/org/apache/iceberg/expressions/MaxAggregator.java index d4a8b97c9..078f9133d 100644 --- a/api/src/main/java/org/apache/iceberg/expressions/MaxAggregator.java +++ b/api/src/main/java/org/apache/iceberg/expressions/MaxAggregator.java @@ -1,24 +1,21 @@ /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * * Licensed to the Apache Software Foundation (ASF) under one - * * or more contributor license agreements. See the NOTICE file - * * distributed with this work for additional information - * * regarding copyright ownership. The ASF licenses this file - * * to you under the Apache License, Version 2.0 (the - * * "License"); you may not use this file except in compliance - * * with the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, - * * software distributed under the License is distributed on an - * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * * KIND, either express or implied. See the License for the - * * specific language governing permissions and limitations - * * under the License. + * http://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ - package org.apache.iceberg.expressions; import java.util.Comparator; @@ -41,10 +38,14 @@ protected void update(T value) { // this only happens when the value is a Pair (bounds of a geometry) Pair valuePair = (Pair) value; Pair maxPair = (Pair) max; - this.max = (T) Pair.of(Math.max(valuePair.first(), maxPair.first()), - Math.max(valuePair.second(), maxPair.second())); + this.max = + (T) + Pair.of( + Math.max(valuePair.first(), maxPair.first()), + Math.max(valuePair.second(), maxPair.second())); } catch (ClassCastException e) { - throw new UnsupportedOperationException("MaxAggregator only supports Pair values"); + throw new UnsupportedOperationException( + "MaxAggregator only supports Pair values"); } } else if (comparator.compare(value, max) > 0) { this.max = value; diff --git a/api/src/main/java/org/apache/iceberg/expressions/MinAggregate.java b/api/src/main/java/org/apache/iceberg/expressions/MinAggregate.java index c2b3149ca..43b4c6669 100644 --- a/api/src/main/java/org/apache/iceberg/expressions/MinAggregate.java +++ b/api/src/main/java/org/apache/iceberg/expressions/MinAggregate.java @@ -45,7 +45,8 @@ protected boolean hasValue(DataFile file) { @Override protected Object evaluateRef(DataFile file) { - PrimitiveType resType = type.equals(Types.GeometryType.get()) ? Types.GeometryBoundType.get() : type; + PrimitiveType resType = + type.equals(Types.GeometryType.get()) ? Types.GeometryBoundType.get() : type; return Conversions.fromByteBuffer(resType, safeGet(file.lowerBounds(), fieldId)); } diff --git a/api/src/main/java/org/apache/iceberg/expressions/MinAggregator.java b/api/src/main/java/org/apache/iceberg/expressions/MinAggregator.java index 58045d42e..940ceab9d 100644 --- a/api/src/main/java/org/apache/iceberg/expressions/MinAggregator.java +++ b/api/src/main/java/org/apache/iceberg/expressions/MinAggregator.java @@ -1,24 +1,21 @@ /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * * Licensed to the Apache Software Foundation (ASF) under one - * * or more contributor license agreements. See the NOTICE file - * * distributed with this work for additional information - * * regarding copyright ownership. The ASF licenses this file - * * to you under the Apache License, Version 2.0 (the - * * "License"); you may not use this file except in compliance - * * with the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, - * * software distributed under the License is distributed on an - * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * * KIND, either express or implied. See the License for the - * * specific language governing permissions and limitations - * * under the License. + * http://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ - package org.apache.iceberg.expressions; import java.util.Comparator; @@ -41,10 +38,14 @@ protected void update(T value) { // this only happens when the value is a Pair (bounds of a geometry) Pair valuePair = (Pair) value; Pair minPair = (Pair) min; - this.min = (T) Pair.of(Math.min(valuePair.first(), minPair.first()), - Math.min(valuePair.second(), minPair.second())); + this.min = + (T) + Pair.of( + Math.min(valuePair.first(), minPair.first()), + Math.min(valuePair.second(), minPair.second())); } catch (ClassCastException e) { - throw new UnsupportedOperationException("MinAggregator only supports Pair values"); + throw new UnsupportedOperationException( + "MinAggregator only supports Pair values"); } } else if (comparator.compare(value, min) < 0) { this.min = value; diff --git a/api/src/main/java/org/apache/iceberg/expressions/ValueAggregate.java b/api/src/main/java/org/apache/iceberg/expressions/ValueAggregate.java index beb608821..15f3a31b6 100644 --- a/api/src/main/java/org/apache/iceberg/expressions/ValueAggregate.java +++ b/api/src/main/java/org/apache/iceberg/expressions/ValueAggregate.java @@ -74,10 +74,10 @@ boolean hasValue(DataFile file, int fieldId) { Long valueCount = safeGet(file.valueCounts(), fieldId); Long nullCount = safeGet(file.nullValueCounts(), fieldId); boolean boundAllNull = - valueCount != null - && valueCount > 0 - && nullCount != null - && nullCount.longValue() == valueCount.longValue(); + valueCount != null + && valueCount > 0 + && nullCount != null + && nullCount.longValue() == valueCount.longValue(); return hasBound || boundAllNull; } } diff --git a/api/src/test/java/org/apache/iceberg/expressions/TestAggregateEvaluator.java b/api/src/test/java/org/apache/iceberg/expressions/TestAggregateEvaluator.java index db4d6e354..8b413cfb8 100644 --- a/api/src/test/java/org/apache/iceberg/expressions/TestAggregateEvaluator.java +++ b/api/src/test/java/org/apache/iceberg/expressions/TestAggregateEvaluator.java @@ -18,8 +18,10 @@ */ package org.apache.iceberg.expressions; -import static org.apache.iceberg.expressions.TestGeometryHelpers.MetricEvalData.*; +import static org.apache.iceberg.expressions.TestGeometryHelpers.MetricEvalData.GEOM_X_MAX; +import static org.apache.iceberg.expressions.TestGeometryHelpers.MetricEvalData.GEOM_X_MIN; import static org.apache.iceberg.expressions.TestGeometryHelpers.MetricEvalData.GEOM_Y_MAX; +import static org.apache.iceberg.expressions.TestGeometryHelpers.MetricEvalData.GEOM_Y_MIN; import static org.apache.iceberg.types.Conversions.toByteBuffer; import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; @@ -54,7 +56,7 @@ public class TestAggregateEvaluator { // any value counts, including nulls ImmutableMap.of(1, 50L, 3, 50L, 4, 50L, 5, 50L), // null value counts - ImmutableMap.of(1, 10L, 3, 50L, 4, 10L, 5,0L), + ImmutableMap.of(1, 10L, 3, 50L, 4, 10L, 5, 0L), // nan value counts null, // lower bounds @@ -95,68 +97,78 @@ public class TestAggregateEvaluator { ImmutableMap.of(1, toByteBuffer(IntegerType.get(), 3333))); private static final DataFile GEOM_FILE = - new TestDataFile( - "file.avro", - Row.of(), - 50, - // any value counts, including nulls - ImmutableMap.of(1, 50L, 3, 50L, 4, 50L, 5, 50L), - // null value counts - ImmutableMap.of(1, 10L, 3, 50L, 4, 10L, 5,0L), - // nan value counts - null, - // lower bounds - ImmutableMap.of(5, toByteBuffer(Types.GeometryBoundType.get(), Pair.of(GEOM_X_MIN + 1, GEOM_Y_MIN))), - // upper bounds - ImmutableMap.of(5, toByteBuffer(Types.GeometryBoundType.get(), Pair.of(GEOM_X_MAX, GEOM_Y_MAX - 1)))); + new TestDataFile( + "file.avro", + Row.of(), + 50, + // any value counts, including nulls + ImmutableMap.of(1, 50L, 3, 50L, 4, 50L, 5, 50L), + // null value counts + ImmutableMap.of(1, 10L, 3, 50L, 4, 10L, 5, 0L), + // nan value counts + null, + // lower bounds + ImmutableMap.of( + 5, toByteBuffer(Types.GeometryBoundType.get(), Pair.of(GEOM_X_MIN + 1, GEOM_Y_MIN))), + // upper bounds + ImmutableMap.of( + 5, toByteBuffer(Types.GeometryBoundType.get(), Pair.of(GEOM_X_MAX, GEOM_Y_MAX - 1)))); private static final DataFile MISSING_SOME_NULLS_GEOM_FILE = - new TestDataFile( - "file.avro", - Row.of(), - 50, - // any value counts, including nulls - ImmutableMap.of(1, 50L, 3, 50L, 4, 50L, 5, 40L), - // null value counts - ImmutableMap.of(1, 10L, 3, 50L, 4, 10L, 5,10L), - // nan value counts - null, - // lower bounds - ImmutableMap.of(5, toByteBuffer(Types.GeometryBoundType.get(), Pair.of(GEOM_X_MIN, GEOM_Y_MIN + 1))), - // upper bounds - ImmutableMap.of(5, toByteBuffer(Types.GeometryBoundType.get(), Pair.of(GEOM_X_MAX - 1, GEOM_Y_MAX)))); + new TestDataFile( + "file.avro", + Row.of(), + 50, + // any value counts, including nulls + ImmutableMap.of(1, 50L, 3, 50L, 4, 50L, 5, 40L), + // null value counts + ImmutableMap.of(1, 10L, 3, 50L, 4, 10L, 5, 10L), + // nan value counts + null, + // lower bounds + ImmutableMap.of( + 5, toByteBuffer(Types.GeometryBoundType.get(), Pair.of(GEOM_X_MIN, GEOM_Y_MIN + 1))), + // upper bounds + ImmutableMap.of( + 5, toByteBuffer(Types.GeometryBoundType.get(), Pair.of(GEOM_X_MAX - 1, GEOM_Y_MAX)))); private static final DataFile[] dataFiles = { FILE, MISSING_SOME_NULLS_STATS_1, MISSING_SOME_NULLS_STATS_2 }; - private static final DataFile[] geoDataFiles = { - GEOM_FILE, MISSING_SOME_NULLS_GEOM_FILE - }; + private static final DataFile[] geoDataFiles = {GEOM_FILE, MISSING_SOME_NULLS_GEOM_FILE}; @Test public void testGeomAggregate() { List list = - ImmutableList.of( - Expressions.countStar(), - Expressions.count("geom"), - Expressions.max("geom"), - Expressions.min("geom"), - Expressions.stMinX("geom"), - Expressions.stMinY("geom"), - Expressions.stMaxX("geom"), - Expressions.stMaxY("geom") - ); + ImmutableList.of( + Expressions.countStar(), + Expressions.count("geom"), + Expressions.max("geom"), + Expressions.min("geom"), + Expressions.stMinX("geom"), + Expressions.stMinY("geom"), + Expressions.stMaxX("geom"), + Expressions.stMaxY("geom")); AggregateEvaluator aggregateEvaluator = AggregateEvaluator.create(SCHEMA, list); for (DataFile dataFile : geoDataFiles) { aggregateEvaluator.update(dataFile); } Assert.assertTrue(aggregateEvaluator.allAggregatorsValid()); StructLike result = aggregateEvaluator.result(); - for (int i=0; i { private final StructType requestedType; @@ -200,7 +199,7 @@ public Type map(Types.MapType map, Supplier keyResult, Supplier valu public Type primitive(Type.PrimitiveType primitive) { Set> expectedType = TYPES.get(primitive.typeId()); Preconditions.checkArgument( - expectedType != null && expectedType.contains(current.getClass()), + expectedType != null && expectedType.stream().anyMatch(t -> t.isInstance(current)), "Cannot project %s to incompatible type: %s", primitive, current); @@ -247,6 +246,6 @@ public Type primitive(Type.PrimitiveType primitive) { .put(TypeID.STRING, ImmutableSet.of(StringType$.class)) .put(TypeID.FIXED, ImmutableSet.of(BinaryType$.class)) .put(TypeID.BINARY, ImmutableSet.of(BinaryType$.class)) - .put(TypeID.GEOMETRY, ImmutableSet.of(UserDefinedType$.class)) + .put(TypeID.GEOMETRY, ImmutableSet.of(UserDefinedType.class)) .buildOrThrow(); } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java index e4b07bb7a..c46e2e86b 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java @@ -31,7 +31,6 @@ import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.expressions.Binder; import org.apache.iceberg.expressions.Expression; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.Spark3Util; @@ -98,7 +97,6 @@ public abstract class SparkScan implements Scan, SupportsReportStatistics { this.caseSensitive = readConf.caseSensitive(); this.expectedSchema = expectedSchema; this.filterExpressions = filters != null ? filters : Collections.emptyList(); - this.readTimestampWithoutZone = readConf.handleTimestampWithoutZone(); this.branch = readConf.branch(); } diff --git a/version.txt b/version.txt index ea1f4d98a..815e50b55 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -1.1.0-geolake +1.3.1-geolake