
[SEDONA-455] geoparquet.metadata data source for inspecting GeoParquet metadata #1180

Merged · 8 commits · Jan 5, 2024
49 changes: 49 additions & 0 deletions docs/tutorial/sql.md
@@ -376,6 +376,55 @@ root

Sedona supports spatial predicate push-down for GeoParquet files. Please refer to the [SedonaSQL query optimizer](../api/sql/Optimizer.md) documentation for details.

### Inspect GeoParquet metadata

Since v`1.5.1`, Sedona provides a Spark SQL data source `"geoparquet.metadata"` for inspecting GeoParquet metadata. The resulting dataframe contains
the "geo" metadata for each input file.

=== "Scala/Java"

```scala
val df = sedona.read.format("geoparquet.metadata").load(geoparquetdatalocation1)
df.printSchema()
```

=== "Java"

```java
Dataset<Row> df = sedona.read.format("geoparquet.metadata").load(geoparquetdatalocation1);
df.printSchema();
```

=== "Python"

```python
df = sedona.read.format("geoparquet.metadata").load(geoparquetdatalocation1)
df.printSchema()
```

The output will be as follows:

```
root
|-- path: string (nullable = true)
|-- version: string (nullable = true)
|-- primary_column: string (nullable = true)
|-- columns: map (nullable = true)
| |-- key: string
| |-- value: struct (valueContainsNull = true)
| | |-- encoding: string (nullable = true)
| | |-- geometry_types: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- bbox: array (nullable = true)
| | | |-- element: double (containsNull = true)
| | |-- crs: string (nullable = true)
```
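
As a sketch of how this schema can be queried (field names follow the schema shown above; `df` is the metadata dataframe loaded earlier), the per-column metadata can be flattened by exploding the `columns` map:

```scala
import org.apache.spark.sql.functions.{col, explode}

// One row per (file, geometry column) pair, with encoding, bbox and CRS
// pulled out of the `columns` map.
val perColumn = df
  .select(col("path"), explode(col("columns")).as(Seq("column", "meta")))
  .select("path", "column", "meta.encoding", "meta.bbox", "meta.crs")
perColumn.show(truncate = false)
```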

If the input Parquet file does not have GeoParquet metadata, the `version`, `primary_column`, and `columns` fields of the resulting dataframe will be `null`.
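
For instance, a minimal sketch (assuming a directory that mixes GeoParquet and plain Parquet files; the path is hypothetical) that keeps only the files carrying GeoParquet metadata:

```scala
// Plain Parquet files show up with null `version`/`primary_column`/`columns`,
// so filtering on `version IS NOT NULL` keeps only true GeoParquet files.
val geoFiles = sedona.read.format("geoparquet.metadata")
  .load("/path/to/mixed/parquet/dir") // hypothetical path
  .where("version IS NOT NULL")
  .select("path")
geoFiles.show(truncate = false)
```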

!!! note
`geoparquet.metadata` only supports reading GeoParquet specific metadata. Users can use [G-Research/spark-extension](https://github.com/G-Research/spark-extension/blob/13109b8e43dfba9272c85896ba5e30cfe280426f/PARQUET.md) to read comprehensive metadata of generic Parquet files.

## Load data from JDBC data sources

The 'query' option in Spark SQL's JDBC data source can be used to convert geometry columns to a format that Sedona can interpret.
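
A minimal sketch of that pattern, assuming a PostGIS source (connection details, table, and column names are hypothetical):

```scala
import org.apache.spark.sql.functions.expr

// Serialize the geometry to WKB inside the JDBC query, then rebuild it with
// Sedona's ST_GeomFromWKB after loading.
val df = sedona.read.format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/gis") // hypothetical connection
  .option("query", "SELECT id, ST_AsBinary(geom) AS geom FROM my_table")
  .load()
  .withColumn("geom", expr("ST_GeomFromWKB(geom)"))
df.printSchema()
```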
14 changes: 14 additions & 0 deletions python/tests/sql/test_geoparquet.py
@@ -17,6 +17,7 @@

import pytest
import os.path
import json

from shapely.geometry import Point
from shapely.geometry import LineString
@@ -68,3 +69,16 @@ def test_load_plain_parquet_file(self):
with pytest.raises(Exception) as excinfo:
self.spark.read.format("geoparquet").load(plain_parquet_input_location)
assert "does not contain valid geo metadata" in str(excinfo.value)

def test_inspect_geoparquet_metadata(self):
df = self.spark.read.format("geoparquet.metadata").load(geoparquet_input_location)
rows = df.collect()
assert len(rows) == 1
row = rows[0]
assert row['path'].endswith('.parquet')
assert len(row['version'].split('.')) == 3
assert row['primary_column'] == 'geometry'
column_metadata = row['columns']['geometry']
assert column_metadata['encoding'] == 'WKB'
assert len(column_metadata['bbox']) == 4
assert isinstance(json.loads(column_metadata['crs']), dict)
@@ -1 +1,2 @@
org.apache.spark.sql.execution.datasources.parquet.GeoParquetFileFormat
org.apache.spark.sql.execution.datasources.v2.geoparquet.metadata.GeoParquetMetadataDataSource
@@ -0,0 +1,47 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.v2.geoparquet.metadata

import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
* Data source for reading GeoParquet metadata. This could be accessed using the `spark.read` interface:
* {{{
* val df = spark.read.format("geoparquet.metadata").load("path/to/geoparquet")
* }}}
*/
class GeoParquetMetadataDataSource extends FileDataSourceV2 with DataSourceRegister {
override val shortName: String = "geoparquet.metadata"

override def fallbackFileFormat: Class[_ <: FileFormat] = null

override def getTable(options: CaseInsensitiveStringMap): Table = {
val paths = getPaths(options)
val tableName = getTableName(options, paths)
val optionsWithoutPaths = getOptionsWithoutPaths(options)
GeoParquetMetadataTable(tableName, sparkSession, optionsWithoutPaths, paths, None, fallbackFileFormat)
}

override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
val paths = getPaths(options)
val tableName = getTableName(options, paths)
val optionsWithoutPaths = getOptionsWithoutPaths(options)
GeoParquetMetadataTable(tableName, sparkSession, optionsWithoutPaths, paths, Some(schema), fallbackFileFormat)
}
}
@@ -0,0 +1,95 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.v2.geoparquet.metadata

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
import org.apache.spark.sql.connector.read.PartitionReader
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.GeoParquetMetaData
import org.apache.spark.sql.execution.datasources.v2._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.SerializableConfiguration
import org.json4s.jackson.JsonMethods.{compact, render}

case class GeoParquetMetadataPartitionReaderFactory(
sqlConf: SQLConf,
broadcastedConf: Broadcast[SerializableConfiguration],
dataSchema: StructType,
readDataSchema: StructType,
partitionSchema: StructType,
filters: Seq[Filter]) extends FilePartitionReaderFactory {

override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = {
val iter = GeoParquetMetadataPartitionReaderFactory.readFile(
broadcastedConf.value.value,
partitionedFile,
readDataSchema)
val fileReader = new PartitionReaderFromIterator[InternalRow](iter)
new PartitionReaderWithPartitionValues(fileReader, readDataSchema,
partitionSchema, partitionedFile.partitionValues)
}
}

object GeoParquetMetadataPartitionReaderFactory {
private def readFile(configuration: Configuration,
partitionedFile: PartitionedFile,
readDataSchema: StructType): Iterator[InternalRow] = {
val filePath = partitionedFile.filePath
val metadata = ParquetFileReader.open(
HadoopInputFile.fromPath(new Path(filePath), configuration))
.getFooter.getFileMetaData.getKeyValueMetaData
val row = GeoParquetMetaData.parseKeyValueMetaData(metadata) match {
case Some(geo) =>
val geoColumnsMap = geo.columns.map { case (columnName, columnMetadata) =>
val columnMetadataFields: Array[Any] = Array(
UTF8String.fromString(columnMetadata.encoding),
new GenericArrayData(columnMetadata.geometryTypes.map(UTF8String.fromString).toArray),
new GenericArrayData(columnMetadata.bbox.toArray),
columnMetadata.crs.map(projjson => UTF8String.fromString(compact(render(projjson)))).orNull)
val columnMetadataStruct = new GenericInternalRow(columnMetadataFields)
UTF8String.fromString(columnName) -> columnMetadataStruct
}
val fields: Array[Any] = Array(
UTF8String.fromString(filePath),
UTF8String.fromString(geo.version.orNull),
UTF8String.fromString(geo.primaryColumn),
ArrayBasedMapData(geoColumnsMap))
new GenericInternalRow(fields)
case None =>
// Not a GeoParquet file, return a row with null metadata values.
val fields: Array[Any] = Array(UTF8String.fromString(filePath), null, null, null)
new GenericInternalRow(fields)
}
Iterator(pruneBySchema(row, GeoParquetMetadataTable.schema, readDataSchema))
}

private def pruneBySchema(row: InternalRow, schema: StructType, readDataSchema: StructType): InternalRow = {
// Projection push down for nested fields is not enabled, so this very simple implementation is enough.
val values: Array[Any] = readDataSchema.fields.map { field =>
val index = schema.fieldIndex(field.name)
row.get(index, field.dataType)
}
new GenericInternalRow(values)
}
}
@@ -0,0 +1,59 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.v2.geoparquet.metadata

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.connector.read.PartitionReaderFactory
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.execution.datasources.v2.FileScan
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.SerializableConfiguration

import scala.collection.JavaConverters._

case class GeoParquetMetadataScan(
sparkSession: SparkSession,
fileIndex: PartitioningAwareFileIndex,
dataSchema: StructType,
readDataSchema: StructType,
readPartitionSchema: StructType,
options: CaseInsensitiveStringMap,
pushedFilters: Array[Filter],
partitionFilters: Seq[Expression] = Seq.empty,
dataFilters: Seq[Expression] = Seq.empty)
extends FileScan {
override def createReaderFactory(): PartitionReaderFactory = {
val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
// Hadoop Configurations are case sensitive.
val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
val broadcastedConf = sparkSession.sparkContext.broadcast(
new SerializableConfiguration(hadoopConf))
// The partition values are already truncated in `FileScan.partitions`.
// We should use `readPartitionSchema` as the partition schema here.
GeoParquetMetadataPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf,
dataSchema, readDataSchema, readPartitionSchema, pushedFilters)
}

override def getFileUnSplittableReason(path: Path): String =
"Reading parquet file metadata does not require splitting the file"

// This is for compatibility with Spark 3.0. Spark 3.3 does not have this method
def withFilters(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): FileScan = {
copy(partitionFilters = partitionFilters, dataFilters = dataFilters)
}
}
@@ -0,0 +1,79 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.v2.geoparquet.metadata

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class GeoParquetMetadataScanBuilder(
sparkSession: SparkSession,
fileIndex: PartitioningAwareFileIndex,
schema: StructType,
dataSchema: StructType,
options: CaseInsensitiveStringMap)
extends FileScanBuilder(sparkSession, fileIndex, dataSchema) {
override def build(): Scan = {
GeoParquetMetadataScan(
sparkSession,
fileIndex,
dataSchema,
readDataSchema(),
readPartitionSchema(),
options,
getPushedDataFilters,
getPartitionFilters,
getDataFilters)
}

// The following methods use reflection to address compatibility issues for Spark 3.0 ~ 3.2

private def getPushedDataFilters: Array[Filter] = {
try {
val field = classOf[FileScanBuilder].getDeclaredField("pushedDataFilters")
field.setAccessible(true)
field.get(this).asInstanceOf[Array[Filter]]
} catch {
case _: NoSuchFieldException =>
Array.empty
}
}

private def getPartitionFilters: Seq[Expression] = {
try {
val field = classOf[FileScanBuilder].getDeclaredField("partitionFilters")
field.setAccessible(true)
field.get(this).asInstanceOf[Seq[Expression]]
} catch {
case _: NoSuchFieldException =>
Seq.empty
}
}

private def getDataFilters: Seq[Expression] = {
try {
val field = classOf[FileScanBuilder].getDeclaredField("dataFilters")
field.setAccessible(true)
field.get(this).asInstanceOf[Seq[Expression]]
} catch {
case _: NoSuchFieldException =>
Seq.empty
}
}
}