diff --git a/docs/tutorial/sql.md b/docs/tutorial/sql.md
index 446345a398..6070111eda 100644
--- a/docs/tutorial/sql.md
+++ b/docs/tutorial/sql.md
@@ -376,6 +376,55 @@ root
 Sedona supports spatial predicate push-down for GeoParquet files, please refer to the [SedonaSQL query optimizer](../api/sql/Optimizer.md) documentation for details.
 
+### Inspect GeoParquet metadata
+
+Since v`1.5.1`, Sedona provides a Spark SQL data source `geoparquet.metadata` for inspecting GeoParquet metadata. The resulting dataframe contains the "geo" metadata for each input file.
+
+=== "Scala"
+
+    ```scala
+    val df = sedona.read.format("geoparquet.metadata").load(geoparquetdatalocation1)
+    df.printSchema()
+    ```
+
+=== "Java"
+
+    ```java
+    Dataset<Row> df = sedona.read().format("geoparquet.metadata").load(geoparquetdatalocation1);
+    df.printSchema();
+    ```
+
+=== "Python"
+
+    ```python
+    df = sedona.read.format("geoparquet.metadata").load(geoparquetdatalocation1)
+    df.printSchema()
+    ```
+
+The output will be as follows:
+
+```
+root
+ |-- path: string (nullable = true)
+ |-- version: string (nullable = true)
+ |-- primary_column: string (nullable = true)
+ |-- columns: map (nullable = true)
+ |    |-- key: string
+ |    |-- value: struct (valueContainsNull = true)
+ |    |    |-- encoding: string (nullable = true)
+ |    |    |-- geometry_types: array (nullable = true)
+ |    |    |    |-- element: string (containsNull = true)
+ |    |    |-- bbox: array (nullable = true)
+ |    |    |    |-- element: double (containsNull = true)
+ |    |    |-- crs: string (nullable = true)
+```
+
+If the input Parquet file does not have GeoParquet metadata, the `version`, `primary_column` and `columns` fields of the resulting dataframe will be `null`.
+
+!!! note
+    `geoparquet.metadata` only supports reading GeoParquet-specific metadata. Users can use [G-Research/spark-extension](https://github.com/G-Research/spark-extension/blob/13109b8e43dfba9272c85896ba5e30cfe280426f/PARQUET.md) to read comprehensive metadata of generic Parquet files.
+
 ## Load data from JDBC data sources
 
 The 'query' option in Spark SQL's JDBC data source can be used to convert geometry columns to a format that Sedona can interpret.
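Because the metadata source returns an ordinary dataframe, plain Spark SQL is enough to summarize the "geo" metadata across many files. Below is a minimal sketch (illustration only, not part of this patch), assuming a Sedona-enabled SparkSession named `sedona` and a hypothetical input directory:

```scala
import org.apache.spark.sql.functions._

// Load per-file GeoParquet metadata, keep only files that actually carry "geo" metadata,
// and flatten the `columns` map into one row per (file, geometry column).
val meta = sedona.read.format("geoparquet.metadata").load("/data/geoparquet/")
meta
  .where(col("version").isNotNull)
  .select(col("path"), explode(col("columns")).as(Seq("column", "meta")))
  .select(
    col("path"),
    col("column"),
    col("meta.encoding").as("encoding"),
    col("meta.geometry_types").as("geometry_types"),
    col("meta.bbox").as("bbox"))
  .show(truncate = false)
```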
diff --git a/python/tests/sql/test_geoparquet.py b/python/tests/sql/test_geoparquet.py index 69a7a9ef29..478c8aacd7 100644 --- a/python/tests/sql/test_geoparquet.py +++ b/python/tests/sql/test_geoparquet.py @@ -17,6 +17,7 @@ import pytest import os.path +import json from shapely.geometry import Point from shapely.geometry import LineString @@ -68,3 +69,16 @@ def test_load_plain_parquet_file(self): with pytest.raises(Exception) as excinfo: self.spark.read.format("geoparquet").load(plain_parquet_input_location) assert "does not contain valid geo metadata" in str(excinfo.value) + + def test_inspect_geoparquet_metadata(self): + df = self.spark.read.format("geoparquet.metadata").load(geoparquet_input_location) + rows = df.collect() + assert len(rows) == 1 + row = rows[0] + assert row['path'].endswith('.parquet') + assert len(row['version'].split('.')) == 3 + assert row['primary_column'] == 'geometry' + column_metadata = row['columns']['geometry'] + assert column_metadata['encoding'] == 'WKB' + assert len(column_metadata['bbox']) == 4 + assert isinstance(json.loads(column_metadata['crs']), dict) diff --git a/spark/spark-3.0/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/spark/spark-3.0/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister index 0fdcab5a3a..e5f994e203 100644 --- a/spark/spark-3.0/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister +++ b/spark/spark-3.0/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -1 +1,2 @@ org.apache.spark.sql.execution.datasources.parquet.GeoParquetFileFormat +org.apache.spark.sql.execution.datasources.v2.geoparquet.metadata.GeoParquetMetadataDataSource diff --git a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataDataSource.scala b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataDataSource.scala new file mode 100644 index 0000000000..6cf4817634 --- /dev/null +++ b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataDataSource.scala @@ -0,0 +1,47 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.v2.geoparquet.metadata + +import org.apache.spark.sql.connector.catalog.Table +import org.apache.spark.sql.execution.datasources.FileFormat +import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2 +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +/** + * Data source for reading GeoParquet metadata. 
This could be accessed using the `spark.read` interface: + * {{{ + * val df = spark.read.format("geoparquet.metadata").load("path/to/geoparquet") + * }}} + */ +class GeoParquetMetadataDataSource extends FileDataSourceV2 with DataSourceRegister { + override val shortName: String = "geoparquet.metadata" + + override def fallbackFileFormat: Class[_ <: FileFormat] = null + + override def getTable(options: CaseInsensitiveStringMap): Table = { + val paths = getPaths(options) + val tableName = getTableName(options, paths) + val optionsWithoutPaths = getOptionsWithoutPaths(options) + GeoParquetMetadataTable(tableName, sparkSession, optionsWithoutPaths, paths, None, fallbackFileFormat) + } + + override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = { + val paths = getPaths(options) + val tableName = getTableName(options, paths) + val optionsWithoutPaths = getOptionsWithoutPaths(options) + GeoParquetMetadataTable(tableName, sparkSession, optionsWithoutPaths, paths, Some(schema), fallbackFileFormat) + } +} diff --git a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala new file mode 100644 index 0000000000..fe1b4f8c5f --- /dev/null +++ b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala @@ -0,0 +1,95 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.datasources.v2.geoparquet.metadata + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.parquet.hadoop.ParquetFileReader +import org.apache.parquet.hadoop.util.HadoopInputFile +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData} +import org.apache.spark.sql.connector.read.PartitionReader +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.execution.datasources.parquet.GeoParquetMetaData +import org.apache.spark.sql.execution.datasources.v2._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.SerializableConfiguration +import org.json4s.jackson.JsonMethods.{compact, render} + +case class GeoParquetMetadataPartitionReaderFactory( + sqlConf: SQLConf, + broadcastedConf: Broadcast[SerializableConfiguration], + dataSchema: StructType, + readDataSchema: StructType, + partitionSchema: StructType, + filters: Seq[Filter]) extends FilePartitionReaderFactory { + + override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = { + val iter = GeoParquetMetadataPartitionReaderFactory.readFile( + broadcastedConf.value.value, + partitionedFile, + readDataSchema) + val fileReader = new PartitionReaderFromIterator[InternalRow](iter) + new PartitionReaderWithPartitionValues(fileReader, readDataSchema, + partitionSchema, partitionedFile.partitionValues) + } +} + +object GeoParquetMetadataPartitionReaderFactory { + private def readFile(configuration: Configuration, + partitionedFile: PartitionedFile, + readDataSchema: StructType): Iterator[InternalRow] = { + val filePath = partitionedFile.filePath + val metadata = ParquetFileReader.open( + HadoopInputFile.fromPath(new Path(filePath), configuration)) + .getFooter.getFileMetaData.getKeyValueMetaData + val row = GeoParquetMetaData.parseKeyValueMetaData(metadata) match { + case Some(geo) => + val geoColumnsMap = geo.columns.map { case (columnName, columnMetadata) => + val columnMetadataFields: Array[Any] = Array( + UTF8String.fromString(columnMetadata.encoding), + new GenericArrayData(columnMetadata.geometryTypes.map(UTF8String.fromString).toArray), + new GenericArrayData(columnMetadata.bbox.toArray), + columnMetadata.crs.map(projjson => UTF8String.fromString(compact(render(projjson)))).orNull) + val columnMetadataStruct = new GenericInternalRow(columnMetadataFields) + UTF8String.fromString(columnName) -> columnMetadataStruct + } + val fields: Array[Any] = Array( + UTF8String.fromString(filePath), + UTF8String.fromString(geo.version.orNull), + UTF8String.fromString(geo.primaryColumn), + ArrayBasedMapData(geoColumnsMap)) + new GenericInternalRow(fields) + case None => + // Not a GeoParquet file, return a row with null metadata values. + val fields: Array[Any] = Array(UTF8String.fromString(filePath), null, null, null) + new GenericInternalRow(fields) + } + Iterator(pruneBySchema(row, GeoParquetMetadataTable.schema, readDataSchema)) + } + + private def pruneBySchema(row: InternalRow, schema: StructType, readDataSchema: StructType): InternalRow = { + // Projection push down for nested fields is not enabled, so this very simple implementation is enough. 
+ val values: Array[Any] = readDataSchema.fields.map { field => + val index = schema.fieldIndex(field.name) + row.get(index, field.dataType) + } + new GenericInternalRow(values) + } +} diff --git a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataScan.scala b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataScan.scala new file mode 100644 index 0000000000..ff417e334d --- /dev/null +++ b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataScan.scala @@ -0,0 +1,59 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.v2.geoparquet.metadata + +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.connector.read.PartitionReaderFactory +import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex +import org.apache.spark.sql.execution.datasources.v2.FileScan +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.spark.util.SerializableConfiguration + +import scala.collection.JavaConverters._ + +case class GeoParquetMetadataScan( + sparkSession: SparkSession, + fileIndex: PartitioningAwareFileIndex, + dataSchema: StructType, + readDataSchema: StructType, + readPartitionSchema: StructType, + options: CaseInsensitiveStringMap, + pushedFilters: Array[Filter], + partitionFilters: Seq[Expression] = Seq.empty, + dataFilters: Seq[Expression] = Seq.empty) + extends FileScan { + override def createReaderFactory(): PartitionReaderFactory = { + val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap + // Hadoop Configurations are case sensitive. + val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap) + val broadcastedConf = sparkSession.sparkContext.broadcast( + new SerializableConfiguration(hadoopConf)) + // The partition values are already truncated in `FileScan.partitions`. + // We should use `readPartitionSchema` as the partition schema here. + GeoParquetMetadataPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, + dataSchema, readDataSchema, readPartitionSchema, pushedFilters) + } + + override def getFileUnSplittableReason(path: Path): String = + "Reading parquet file metadata does not require splitting the file" + + // This is for compatibility with Spark 3.0. 
Spark 3.3 does not have this method + def withFilters(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): FileScan = { + copy(partitionFilters = partitionFilters, dataFilters = dataFilters) + } +} diff --git a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataScanBuilder.scala b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataScanBuilder.scala new file mode 100644 index 0000000000..1b35b563df --- /dev/null +++ b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataScanBuilder.scala @@ -0,0 +1,79 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.v2.geoparquet.metadata + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.connector.read.Scan +import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex +import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class GeoParquetMetadataScanBuilder( + sparkSession: SparkSession, + fileIndex: PartitioningAwareFileIndex, + schema: StructType, + dataSchema: StructType, + options: CaseInsensitiveStringMap) + extends FileScanBuilder(sparkSession, fileIndex, dataSchema) { + override def build(): Scan = { + GeoParquetMetadataScan( + sparkSession, + fileIndex, + dataSchema, + readDataSchema(), + readPartitionSchema(), + options, + getPushedDataFilters, + getPartitionFilters, + getDataFilters) + } + + // The following methods uses reflection to address compatibility issues for Spark 3.0 ~ 3.2 + + private def getPushedDataFilters: Array[Filter] = { + try { + val field = classOf[FileScanBuilder].getDeclaredField("pushedDataFilters") + field.setAccessible(true) + field.get(this).asInstanceOf[Array[Filter]] + } catch { + case _: NoSuchFieldException => + Array.empty + } + } + + private def getPartitionFilters: Seq[Expression] = { + try { + val field = classOf[FileScanBuilder].getDeclaredField("partitionFilters") + field.setAccessible(true) + field.get(this).asInstanceOf[Seq[Expression]] + } catch { + case _: NoSuchFieldException => + Seq.empty + } + } + + private def getDataFilters: Seq[Expression] = { + try { + val field = classOf[FileScanBuilder].getDeclaredField("dataFilters") + field.setAccessible(true) + field.get(this).asInstanceOf[Seq[Expression]] + } catch { + case _: NoSuchFieldException => + Seq.empty + } + } +} diff --git a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala new file mode 100644 index 0000000000..13599ffd78 
--- /dev/null +++ b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala @@ -0,0 +1,63 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.v2.geoparquet.metadata + +import org.apache.hadoop.fs.FileStatus +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.connector.catalog.TableCapability +import org.apache.spark.sql.connector.read.ScanBuilder +import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} +import org.apache.spark.sql.execution.datasources.FileFormat +import org.apache.spark.sql.execution.datasources.v2.FileTable +import org.apache.spark.sql.types._ +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +case class GeoParquetMetadataTable( + name: String, + sparkSession: SparkSession, + options: CaseInsensitiveStringMap, + paths: Seq[String], + userSpecifiedSchema: Option[StructType], + fallbackFileFormat: Class[_ <: FileFormat]) + extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { + override def formatName: String = "GeoParquet Metadata" + + override def inferSchema(files: Seq[FileStatus]): Option[StructType] = + Some(GeoParquetMetadataTable.schema) + + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = + new GeoParquetMetadataScanBuilder(sparkSession, fileIndex, schema, dataSchema, options) + + override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = null + + override def capabilities: java.util.Set[TableCapability] = java.util.EnumSet.of(TableCapability.BATCH_READ) +} + +object GeoParquetMetadataTable { + private val columnMetadataType = StructType(Seq( + StructField("encoding", StringType, nullable = true), + StructField("geometry_types", ArrayType(StringType), nullable = true), + StructField("bbox", ArrayType(DoubleType), nullable = true), + StructField("crs", StringType, nullable = true) + )) + + private val columnsType = MapType(StringType, columnMetadataType, valueContainsNull = false) + + val schema: StructType = StructType(Seq( + StructField("path", StringType, nullable = false), + StructField("version", StringType, nullable = true), + StructField("primary_column", StringType, nullable = true), + StructField("columns", columnsType, nullable = true) + )) +} diff --git a/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala b/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala new file mode 100644 index 0000000000..71e1595422 --- /dev/null +++ b/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala @@ -0,0 +1,67 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sedona.sql + +import org.apache.spark.sql.Row +import org.scalatest.BeforeAndAfterAll + +import scala.collection.JavaConverters._ + +class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll { + val geoparquetdatalocation: String = resourceFolder + "geoparquet/" + + describe("GeoParquet Metadata tests") { + it("Reading GeoParquet Metadata") { + val df = sparkSession.read.format("geoparquet.metadata").load(geoparquetdatalocation) + val metadataArray = df.collect() + assert(metadataArray.length > 1) + assert(metadataArray.exists(_.getAs[String]("path").endsWith(".parquet"))) + assert(metadataArray.exists(_.getAs[String]("version") == "1.0.0-dev")) + assert(metadataArray.exists(_.getAs[String]("primary_column") == "geometry")) + assert(metadataArray.exists { row => + val columnsMap = row.getJavaMap(row.fieldIndex("columns")) + columnsMap != null && columnsMap.containsKey("geometry") && columnsMap.get("geometry").isInstanceOf[Row] + }) + assert(metadataArray.forall { row => + val columnsMap = row.getJavaMap(row.fieldIndex("columns")) + if (columnsMap == null || !columnsMap.containsKey("geometry")) true else { + val columnMetadata = columnsMap.get("geometry").asInstanceOf[Row] + columnMetadata.getAs[String]("encoding") == "WKB" && + columnMetadata.getList[Any](columnMetadata.fieldIndex("bbox")).asScala.forall(_.isInstanceOf[Double]) && + columnMetadata.getList[Any](columnMetadata.fieldIndex("geometry_types")).asScala.forall(_.isInstanceOf[String]) && + columnMetadata.getAs[String]("crs").nonEmpty + } + }) + } + + it("Reading GeoParquet Metadata with column pruning") { + val df = sparkSession.read.format("geoparquet.metadata").load(geoparquetdatalocation) + val metadataArray = df.selectExpr("path", "substring(primary_column, 1, 2) AS partial_primary_column").collect() + assert(metadataArray.length > 1) + assert(metadataArray.forall(_.length == 2)) + assert(metadataArray.exists(_.getAs[String]("path").endsWith(".parquet"))) + assert(metadataArray.exists(_.getAs[String]("partial_primary_column") == "ge")) + } + + it("Reading GeoParquet Metadata of plain parquet files") { + val df = sparkSession.read.format("geoparquet.metadata").load(geoparquetdatalocation) + val metadataArray = df.where("path LIKE '%plain.parquet'").collect() + assert(metadataArray.nonEmpty) + assert(metadataArray.forall(_.getAs[String]("path").endsWith("plain.parquet"))) + assert(metadataArray.forall(_.getAs[String]("version") == null)) + assert(metadataArray.forall(_.getAs[String]("primary_column") == null)) + assert(metadataArray.forall(_.getAs[String]("columns") == null)) + } + } +} diff --git a/spark/spark-3.4/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/spark/spark-3.4/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister index 0fdcab5a3a..e5f994e203 100644 --- a/spark/spark-3.4/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister +++ b/spark/spark-3.4/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -1 +1,2 @@ 
org.apache.spark.sql.execution.datasources.parquet.GeoParquetFileFormat +org.apache.spark.sql.execution.datasources.v2.geoparquet.metadata.GeoParquetMetadataDataSource diff --git a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataDataSource.scala b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataDataSource.scala new file mode 100644 index 0000000000..6cf4817634 --- /dev/null +++ b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataDataSource.scala @@ -0,0 +1,47 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.v2.geoparquet.metadata + +import org.apache.spark.sql.connector.catalog.Table +import org.apache.spark.sql.execution.datasources.FileFormat +import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2 +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +/** + * Data source for reading GeoParquet metadata. This could be accessed using the `spark.read` interface: + * {{{ + * val df = spark.read.format("geoparquet.metadata").load("path/to/geoparquet") + * }}} + */ +class GeoParquetMetadataDataSource extends FileDataSourceV2 with DataSourceRegister { + override val shortName: String = "geoparquet.metadata" + + override def fallbackFileFormat: Class[_ <: FileFormat] = null + + override def getTable(options: CaseInsensitiveStringMap): Table = { + val paths = getPaths(options) + val tableName = getTableName(options, paths) + val optionsWithoutPaths = getOptionsWithoutPaths(options) + GeoParquetMetadataTable(tableName, sparkSession, optionsWithoutPaths, paths, None, fallbackFileFormat) + } + + override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = { + val paths = getPaths(options) + val tableName = getTableName(options, paths) + val optionsWithoutPaths = getOptionsWithoutPaths(options) + GeoParquetMetadataTable(tableName, sparkSession, optionsWithoutPaths, paths, Some(schema), fallbackFileFormat) + } +} diff --git a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala new file mode 100644 index 0000000000..874d2e743d --- /dev/null +++ b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala @@ -0,0 +1,95 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.v2.geoparquet.metadata + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.ParquetFileReader +import org.apache.parquet.hadoop.util.HadoopInputFile +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow} +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData} +import org.apache.spark.sql.connector.read.PartitionReader +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.execution.datasources.parquet.GeoParquetMetaData +import org.apache.spark.sql.execution.datasources.v2._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.SerializableConfiguration +import org.json4s.jackson.JsonMethods.{compact, render} + +case class GeoParquetMetadataPartitionReaderFactory( + sqlConf: SQLConf, + broadcastedConf: Broadcast[SerializableConfiguration], + dataSchema: StructType, + readDataSchema: StructType, + partitionSchema: StructType, + options: FileSourceOptions, + filters: Seq[Filter]) extends FilePartitionReaderFactory { + + override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = { + val iter = GeoParquetMetadataPartitionReaderFactory.readFile( + broadcastedConf.value.value, + partitionedFile, + readDataSchema) + val fileReader = new PartitionReaderFromIterator[InternalRow](iter) + new PartitionReaderWithPartitionValues(fileReader, readDataSchema, + partitionSchema, partitionedFile.partitionValues) + } +} + +object GeoParquetMetadataPartitionReaderFactory { + private def readFile(configuration: Configuration, + partitionedFile: PartitionedFile, + readDataSchema: StructType): Iterator[InternalRow] = { + val filePath = partitionedFile.toPath.toString + val metadata = ParquetFileReader.open( + HadoopInputFile.fromPath(partitionedFile.toPath, configuration)) + .getFooter.getFileMetaData.getKeyValueMetaData + val row = GeoParquetMetaData.parseKeyValueMetaData(metadata) match { + case Some(geo) => + val geoColumnsMap = geo.columns.map { case (columnName, columnMetadata) => + val columnMetadataFields: Array[Any] = Array( + UTF8String.fromString(columnMetadata.encoding), + new GenericArrayData(columnMetadata.geometryTypes.map(UTF8String.fromString).toArray), + new GenericArrayData(columnMetadata.bbox.toArray), + columnMetadata.crs.map(projjson => UTF8String.fromString(compact(render(projjson)))).orNull) + val columnMetadataStruct = new GenericInternalRow(columnMetadataFields) + UTF8String.fromString(columnName) -> columnMetadataStruct + } + val fields: Array[Any] = Array( + UTF8String.fromString(filePath), + UTF8String.fromString(geo.version.orNull), + UTF8String.fromString(geo.primaryColumn), + ArrayBasedMapData(geoColumnsMap)) + new GenericInternalRow(fields) + case None => + // Not a GeoParquet file, return a row 
with null metadata values. + val fields: Array[Any] = Array(UTF8String.fromString(filePath), null, null, null) + new GenericInternalRow(fields) + } + Iterator(pruneBySchema(row, GeoParquetMetadataTable.schema, readDataSchema)) + } + + private def pruneBySchema(row: InternalRow, schema: StructType, readDataSchema: StructType): InternalRow = { + // Projection push down for nested fields is not enabled, so this very simple implementation is enough. + val values: Array[Any] = readDataSchema.fields.map { field => + val index = schema.fieldIndex(field.name) + row.get(index, field.dataType) + } + new GenericInternalRow(values) + } +} diff --git a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataScan.scala b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataScan.scala new file mode 100644 index 0000000000..890b1cb0c6 --- /dev/null +++ b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataScan.scala @@ -0,0 +1,56 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.v2.geoparquet.metadata + +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.FileSourceOptions +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.connector.read.PartitionReaderFactory +import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex +import org.apache.spark.sql.execution.datasources.v2.FileScan +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.spark.util.SerializableConfiguration + +import scala.collection.JavaConverters._ + +case class GeoParquetMetadataScan( + sparkSession: SparkSession, + fileIndex: PartitioningAwareFileIndex, + dataSchema: StructType, + readDataSchema: StructType, + readPartitionSchema: StructType, + options: CaseInsensitiveStringMap, + pushedFilters: Array[Filter], + partitionFilters: Seq[Expression] = Seq.empty, + dataFilters: Seq[Expression] = Seq.empty) + extends FileScan { + override def createReaderFactory(): PartitionReaderFactory = { + val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap + // Hadoop Configurations are case sensitive. + val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap) + val broadcastedConf = sparkSession.sparkContext.broadcast( + new SerializableConfiguration(hadoopConf)) + // The partition values are already truncated in `FileScan.partitions`. + // We should use `readPartitionSchema` as the partition schema here. 
+ val fileSourceOptions = new FileSourceOptions(caseSensitiveMap) + GeoParquetMetadataPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, + dataSchema, readDataSchema, readPartitionSchema, fileSourceOptions, pushedFilters) + } + + override def getFileUnSplittableReason(path: Path): String = + "Reading parquet file metadata does not require splitting the file" +} diff --git a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataScanBuilder.scala b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataScanBuilder.scala new file mode 100644 index 0000000000..560cf1b613 --- /dev/null +++ b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataScanBuilder.scala @@ -0,0 +1,42 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.v2.geoparquet.metadata + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.connector.read.Scan +import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex +import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class GeoParquetMetadataScanBuilder( + sparkSession: SparkSession, + fileIndex: PartitioningAwareFileIndex, + schema: StructType, + dataSchema: StructType, + options: CaseInsensitiveStringMap) + extends FileScanBuilder(sparkSession, fileIndex, dataSchema) { + override def build(): Scan = { + GeoParquetMetadataScan( + sparkSession, + fileIndex, + dataSchema, + readDataSchema(), + readPartitionSchema(), + options, + pushedDataFilters, + partitionFilters, + dataFilters) + } +} diff --git a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala new file mode 100644 index 0000000000..13599ffd78 --- /dev/null +++ b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala @@ -0,0 +1,63 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.datasources.v2.geoparquet.metadata + +import org.apache.hadoop.fs.FileStatus +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.connector.catalog.TableCapability +import org.apache.spark.sql.connector.read.ScanBuilder +import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} +import org.apache.spark.sql.execution.datasources.FileFormat +import org.apache.spark.sql.execution.datasources.v2.FileTable +import org.apache.spark.sql.types._ +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +case class GeoParquetMetadataTable( + name: String, + sparkSession: SparkSession, + options: CaseInsensitiveStringMap, + paths: Seq[String], + userSpecifiedSchema: Option[StructType], + fallbackFileFormat: Class[_ <: FileFormat]) + extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { + override def formatName: String = "GeoParquet Metadata" + + override def inferSchema(files: Seq[FileStatus]): Option[StructType] = + Some(GeoParquetMetadataTable.schema) + + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = + new GeoParquetMetadataScanBuilder(sparkSession, fileIndex, schema, dataSchema, options) + + override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = null + + override def capabilities: java.util.Set[TableCapability] = java.util.EnumSet.of(TableCapability.BATCH_READ) +} + +object GeoParquetMetadataTable { + private val columnMetadataType = StructType(Seq( + StructField("encoding", StringType, nullable = true), + StructField("geometry_types", ArrayType(StringType), nullable = true), + StructField("bbox", ArrayType(DoubleType), nullable = true), + StructField("crs", StringType, nullable = true) + )) + + private val columnsType = MapType(StringType, columnMetadataType, valueContainsNull = false) + + val schema: StructType = StructType(Seq( + StructField("path", StringType, nullable = false), + StructField("version", StringType, nullable = true), + StructField("primary_column", StringType, nullable = true), + StructField("columns", columnsType, nullable = true) + )) +} diff --git a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala new file mode 100644 index 0000000000..71e1595422 --- /dev/null +++ b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala @@ -0,0 +1,67 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sedona.sql + +import org.apache.spark.sql.Row +import org.scalatest.BeforeAndAfterAll + +import scala.collection.JavaConverters._ + +class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll { + val geoparquetdatalocation: String = resourceFolder + "geoparquet/" + + describe("GeoParquet Metadata tests") { + it("Reading GeoParquet Metadata") { + val df = sparkSession.read.format("geoparquet.metadata").load(geoparquetdatalocation) + val metadataArray = df.collect() + assert(metadataArray.length > 1) + assert(metadataArray.exists(_.getAs[String]("path").endsWith(".parquet"))) + assert(metadataArray.exists(_.getAs[String]("version") == "1.0.0-dev")) + assert(metadataArray.exists(_.getAs[String]("primary_column") == "geometry")) + assert(metadataArray.exists { row => + val columnsMap = row.getJavaMap(row.fieldIndex("columns")) + columnsMap != null && columnsMap.containsKey("geometry") && columnsMap.get("geometry").isInstanceOf[Row] + }) + assert(metadataArray.forall { row => + val columnsMap = row.getJavaMap(row.fieldIndex("columns")) + if (columnsMap == null || !columnsMap.containsKey("geometry")) true else { + val columnMetadata = columnsMap.get("geometry").asInstanceOf[Row] + columnMetadata.getAs[String]("encoding") == "WKB" && + columnMetadata.getList[Any](columnMetadata.fieldIndex("bbox")).asScala.forall(_.isInstanceOf[Double]) && + columnMetadata.getList[Any](columnMetadata.fieldIndex("geometry_types")).asScala.forall(_.isInstanceOf[String]) && + columnMetadata.getAs[String]("crs").nonEmpty + } + }) + } + + it("Reading GeoParquet Metadata with column pruning") { + val df = sparkSession.read.format("geoparquet.metadata").load(geoparquetdatalocation) + val metadataArray = df.selectExpr("path", "substring(primary_column, 1, 2) AS partial_primary_column").collect() + assert(metadataArray.length > 1) + assert(metadataArray.forall(_.length == 2)) + assert(metadataArray.exists(_.getAs[String]("path").endsWith(".parquet"))) + assert(metadataArray.exists(_.getAs[String]("partial_primary_column") == "ge")) + } + + it("Reading GeoParquet Metadata of plain parquet files") { + val df = sparkSession.read.format("geoparquet.metadata").load(geoparquetdatalocation) + val metadataArray = df.where("path LIKE '%plain.parquet'").collect() + assert(metadataArray.nonEmpty) + assert(metadataArray.forall(_.getAs[String]("path").endsWith("plain.parquet"))) + assert(metadataArray.forall(_.getAs[String]("version") == null)) + assert(metadataArray.forall(_.getAs[String]("primary_column") == null)) + assert(metadataArray.forall(_.getAs[String]("columns") == null)) + } + } +} diff --git a/spark/spark-3.5/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/spark/spark-3.5/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister index 0fdcab5a3a..e5f994e203 100644 --- a/spark/spark-3.5/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister +++ b/spark/spark-3.5/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -1 +1,2 @@ org.apache.spark.sql.execution.datasources.parquet.GeoParquetFileFormat +org.apache.spark.sql.execution.datasources.v2.geoparquet.metadata.GeoParquetMetadataDataSource diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataDataSource.scala 
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataDataSource.scala new file mode 100644 index 0000000000..6cf4817634 --- /dev/null +++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataDataSource.scala @@ -0,0 +1,47 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.v2.geoparquet.metadata + +import org.apache.spark.sql.connector.catalog.Table +import org.apache.spark.sql.execution.datasources.FileFormat +import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2 +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +/** + * Data source for reading GeoParquet metadata. This could be accessed using the `spark.read` interface: + * {{{ + * val df = spark.read.format("geoparquet.metadata").load("path/to/geoparquet") + * }}} + */ +class GeoParquetMetadataDataSource extends FileDataSourceV2 with DataSourceRegister { + override val shortName: String = "geoparquet.metadata" + + override def fallbackFileFormat: Class[_ <: FileFormat] = null + + override def getTable(options: CaseInsensitiveStringMap): Table = { + val paths = getPaths(options) + val tableName = getTableName(options, paths) + val optionsWithoutPaths = getOptionsWithoutPaths(options) + GeoParquetMetadataTable(tableName, sparkSession, optionsWithoutPaths, paths, None, fallbackFileFormat) + } + + override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = { + val paths = getPaths(options) + val tableName = getTableName(options, paths) + val optionsWithoutPaths = getOptionsWithoutPaths(options) + GeoParquetMetadataTable(tableName, sparkSession, optionsWithoutPaths, paths, Some(schema), fallbackFileFormat) + } +} diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala new file mode 100644 index 0000000000..874d2e743d --- /dev/null +++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala @@ -0,0 +1,95 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.datasources.v2.geoparquet.metadata + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.ParquetFileReader +import org.apache.parquet.hadoop.util.HadoopInputFile +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow} +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData} +import org.apache.spark.sql.connector.read.PartitionReader +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.execution.datasources.parquet.GeoParquetMetaData +import org.apache.spark.sql.execution.datasources.v2._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.SerializableConfiguration +import org.json4s.jackson.JsonMethods.{compact, render} + +case class GeoParquetMetadataPartitionReaderFactory( + sqlConf: SQLConf, + broadcastedConf: Broadcast[SerializableConfiguration], + dataSchema: StructType, + readDataSchema: StructType, + partitionSchema: StructType, + options: FileSourceOptions, + filters: Seq[Filter]) extends FilePartitionReaderFactory { + + override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = { + val iter = GeoParquetMetadataPartitionReaderFactory.readFile( + broadcastedConf.value.value, + partitionedFile, + readDataSchema) + val fileReader = new PartitionReaderFromIterator[InternalRow](iter) + new PartitionReaderWithPartitionValues(fileReader, readDataSchema, + partitionSchema, partitionedFile.partitionValues) + } +} + +object GeoParquetMetadataPartitionReaderFactory { + private def readFile(configuration: Configuration, + partitionedFile: PartitionedFile, + readDataSchema: StructType): Iterator[InternalRow] = { + val filePath = partitionedFile.toPath.toString + val metadata = ParquetFileReader.open( + HadoopInputFile.fromPath(partitionedFile.toPath, configuration)) + .getFooter.getFileMetaData.getKeyValueMetaData + val row = GeoParquetMetaData.parseKeyValueMetaData(metadata) match { + case Some(geo) => + val geoColumnsMap = geo.columns.map { case (columnName, columnMetadata) => + val columnMetadataFields: Array[Any] = Array( + UTF8String.fromString(columnMetadata.encoding), + new GenericArrayData(columnMetadata.geometryTypes.map(UTF8String.fromString).toArray), + new GenericArrayData(columnMetadata.bbox.toArray), + columnMetadata.crs.map(projjson => UTF8String.fromString(compact(render(projjson)))).orNull) + val columnMetadataStruct = new GenericInternalRow(columnMetadataFields) + UTF8String.fromString(columnName) -> columnMetadataStruct + } + val fields: Array[Any] = Array( + UTF8String.fromString(filePath), + UTF8String.fromString(geo.version.orNull), + UTF8String.fromString(geo.primaryColumn), + ArrayBasedMapData(geoColumnsMap)) + new GenericInternalRow(fields) + case None => + // Not a GeoParquet file, return a row with null metadata values. 
+ val fields: Array[Any] = Array(UTF8String.fromString(filePath), null, null, null) + new GenericInternalRow(fields) + } + Iterator(pruneBySchema(row, GeoParquetMetadataTable.schema, readDataSchema)) + } + + private def pruneBySchema(row: InternalRow, schema: StructType, readDataSchema: StructType): InternalRow = { + // Projection push down for nested fields is not enabled, so this very simple implementation is enough. + val values: Array[Any] = readDataSchema.fields.map { field => + val index = schema.fieldIndex(field.name) + row.get(index, field.dataType) + } + new GenericInternalRow(values) + } +} diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataScan.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataScan.scala new file mode 100644 index 0000000000..2d01ea0709 --- /dev/null +++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataScan.scala @@ -0,0 +1,58 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.v2.geoparquet.metadata + +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.FileSourceOptions +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.connector.read.PartitionReaderFactory +import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex +import org.apache.spark.sql.execution.datasources.v2.FileScan +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.spark.util.SerializableConfiguration + +import scala.collection.JavaConverters._ + +case class GeoParquetMetadataScan( + sparkSession: SparkSession, + fileIndex: PartitioningAwareFileIndex, + dataSchema: StructType, + readDataSchema: StructType, + readPartitionSchema: StructType, + options: CaseInsensitiveStringMap, + pushedFilters: Array[Filter], + partitionFilters: Seq[Expression] = Seq.empty, + dataFilters: Seq[Expression] = Seq.empty) + extends FileScan { + override def createReaderFactory(): PartitionReaderFactory = { + val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap + // Hadoop Configurations are case sensitive. + val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap) + val broadcastedConf = sparkSession.sparkContext.broadcast( + new SerializableConfiguration(hadoopConf)) + // The partition values are already truncated in `FileScan.partitions`. + // We should use `readPartitionSchema` as the partition schema here. 
+ val fileSourceOptions = new FileSourceOptions(caseSensitiveMap) + GeoParquetMetadataPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, + dataSchema, readDataSchema, readPartitionSchema, fileSourceOptions, pushedFilters) + } + + override def isSplitable(path: Path): Boolean = false + + override def getFileUnSplittableReason(path: Path): String = + "Reading parquet file metadata does not require splitting the file" +} diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataScanBuilder.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataScanBuilder.scala new file mode 100644 index 0000000000..560cf1b613 --- /dev/null +++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataScanBuilder.scala @@ -0,0 +1,42 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.v2.geoparquet.metadata + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.connector.read.Scan +import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex +import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class GeoParquetMetadataScanBuilder( + sparkSession: SparkSession, + fileIndex: PartitioningAwareFileIndex, + schema: StructType, + dataSchema: StructType, + options: CaseInsensitiveStringMap) + extends FileScanBuilder(sparkSession, fileIndex, dataSchema) { + override def build(): Scan = { + GeoParquetMetadataScan( + sparkSession, + fileIndex, + dataSchema, + readDataSchema(), + readPartitionSchema(), + options, + pushedDataFilters, + partitionFilters, + dataFilters) + } +} diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala new file mode 100644 index 0000000000..13599ffd78 --- /dev/null +++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala @@ -0,0 +1,63 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.spark.sql.execution.datasources.v2.geoparquet.metadata
+
+import org.apache.hadoop.fs.FileStatus
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connector.catalog.TableCapability
+import org.apache.spark.sql.connector.read.ScanBuilder
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
+import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.v2.FileTable
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+case class GeoParquetMetadataTable(
+    name: String,
+    sparkSession: SparkSession,
+    options: CaseInsensitiveStringMap,
+    paths: Seq[String],
+    userSpecifiedSchema: Option[StructType],
+    fallbackFileFormat: Class[_ <: FileFormat])
+  extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
+  override def formatName: String = "GeoParquet Metadata"
+
+  override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
+    Some(GeoParquetMetadataTable.schema)
+
+  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
+    new GeoParquetMetadataScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
+
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = null
+
+  override def capabilities: java.util.Set[TableCapability] = java.util.EnumSet.of(TableCapability.BATCH_READ)
+}
+
+object GeoParquetMetadataTable {
+  private val columnMetadataType = StructType(Seq(
+    StructField("encoding", StringType, nullable = true),
+    StructField("geometry_types", ArrayType(StringType), nullable = true),
+    StructField("bbox", ArrayType(DoubleType), nullable = true),
+    StructField("crs", StringType, nullable = true)
+  ))
+
+  private val columnsType = MapType(StringType, columnMetadataType, valueContainsNull = false)
+
+  val schema: StructType = StructType(Seq(
+    StructField("path", StringType, nullable = false),
+    StructField("version", StringType, nullable = true),
+    StructField("primary_column", StringType, nullable = true),
+    StructField("columns", columnsType, nullable = true)
+  ))
+}
diff --git a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
new file mode 100644
index 0000000000..71e1595422
--- /dev/null
+++ b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sedona.sql
+
+import org.apache.spark.sql.Row
+import org.scalatest.BeforeAndAfterAll
+
+import scala.collection.JavaConverters._
+
+class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
+  val geoparquetdatalocation: String = resourceFolder + "geoparquet/"
+
+  describe("GeoParquet Metadata tests") {
+    it("Reading GeoParquet Metadata") {
+      val df = sparkSession.read.format("geoparquet.metadata").load(geoparquetdatalocation)
+      val metadataArray = df.collect()
+      assert(metadataArray.length > 1)
+      assert(metadataArray.exists(_.getAs[String]("path").endsWith(".parquet")))
+      assert(metadataArray.exists(_.getAs[String]("version") == "1.0.0-dev"))
+      assert(metadataArray.exists(_.getAs[String]("primary_column") == "geometry"))
+      assert(metadataArray.exists { row =>
+        val columnsMap = row.getJavaMap(row.fieldIndex("columns"))
+        columnsMap != null && columnsMap.containsKey("geometry") && columnsMap.get("geometry").isInstanceOf[Row]
+      })
+      assert(metadataArray.forall { row =>
+        val columnsMap = row.getJavaMap(row.fieldIndex("columns"))
+        if (columnsMap == null || !columnsMap.containsKey("geometry")) true else {
+          val columnMetadata = columnsMap.get("geometry").asInstanceOf[Row]
+          columnMetadata.getAs[String]("encoding") == "WKB" &&
+            columnMetadata.getList[Any](columnMetadata.fieldIndex("bbox")).asScala.forall(_.isInstanceOf[Double]) &&
+            columnMetadata.getList[Any](columnMetadata.fieldIndex("geometry_types")).asScala.forall(_.isInstanceOf[String]) &&
+            columnMetadata.getAs[String]("crs").nonEmpty
+        }
+      })
+    }
+
+    it("Reading GeoParquet Metadata with column pruning") {
+      val df = sparkSession.read.format("geoparquet.metadata").load(geoparquetdatalocation)
+      val metadataArray = df.selectExpr("path", "substring(primary_column, 1, 2) AS partial_primary_column").collect()
+      assert(metadataArray.length > 1)
+      assert(metadataArray.forall(_.length == 2))
+      assert(metadataArray.exists(_.getAs[String]("path").endsWith(".parquet")))
+      assert(metadataArray.exists(_.getAs[String]("partial_primary_column") == "ge"))
+    }
+
+    it("Reading GeoParquet Metadata of plain parquet files") {
+      val df = sparkSession.read.format("geoparquet.metadata").load(geoparquetdatalocation)
+      val metadataArray = df.where("path LIKE '%plain.parquet'").collect()
+      assert(metadataArray.nonEmpty)
+      assert(metadataArray.forall(_.getAs[String]("path").endsWith("plain.parquet")))
+      assert(metadataArray.forall(_.getAs[String]("version") == null))
+      assert(metadataArray.forall(_.getAs[String]("primary_column") == null))
+      assert(metadataArray.forall(_.getAs[String]("columns") == null))
+    }
+  }
+}
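For anyone trying the new data source end to end, a minimal usage sketch in the spirit of the tests above may help. The input path is a placeholder, and the sketch assumes the Sedona jars are on the Spark classpath so the `geoparquet.metadata` format registered via `DataSourceRegister` can be discovered.

```scala
import org.apache.spark.sql.SparkSession

object GeoParquetMetadataExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("geoparquet-metadata-example")
      .getOrCreate()

    // One row per Parquet file found under the (hypothetical) input path.
    val metadata = spark.read
      .format("geoparquet.metadata")
      .load("/path/to/geoparquet/")

    // Column pruning works as exercised in the tests: select only the fields you need.
    metadata.select("path", "primary_column").show(truncate = false)

    // As the plain-parquet test checks, files without GeoParquet metadata come back with
    // null version/primary_column/columns, so an ordinary predicate is enough to locate them.
    metadata.where("version IS NULL").select("path").show(truncate = false)
  }
}
```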