[SEDONA-226] Support reading and writing GeoParquet file metadata #740

Merged: 1 commit, merged on Dec 30, 2022
20 changes: 1 addition & 19 deletions docs/tutorial/sql.md
@@ -138,7 +138,7 @@ Shapefile and GeoJSON must be loaded by SpatialRDD and converted to DataFrame us

## Load GeoParquet

Since v`1.3.0`, Sedona natively supports loading GeoParquet file. GeoParquet must be loaded using DataFrame if default name is geometry.
Since v`1.3.0`, Sedona natively supports loading GeoParquet files. Sedona infers geometry fields from the "geo" metadata stored in GeoParquet files.

```Scala
val df = sparkSession.read.format("geoparquet").load(geoparquetdatalocation1)
@@ -155,24 +155,6 @@ root
|-- gdp_md_est: double (nullable = true)
|-- geometry: geometry (nullable = true)
```
If geometry column name is different

```Scala
var df = sparkSession.read.format("geoparquet").option("fieldGeometry", "new_geometry").load(geoparquetdatalocation1)
```

The output will be as follows:

```
root
|-- pop_est: long (nullable = true)
|-- continent: string (nullable = true)
|-- name: string (nullable = true)
|-- iso_a3: string (nullable = true)
|-- gdp_md_est: double (nullable = true)
|-- new_geometry: geometry (nullable = true)
```
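
Writing GeoParquet is handled by the same data source (the Python test added in this PR calls `df.write.format("geoparquet")`). A minimal sketch, assuming `df` is a DataFrame with at least one geometry column and `geoparquetoutputlocation` is a placeholder output path:

```Scala
// Write a DataFrame containing geometry columns to GeoParquet.
// The "geo" metadata describing the geometry columns is recorded in the file,
// so readers such as geopandas can recognize them as geometry.
df.write.format("geoparquet").save(geoparquetoutputlocation)
```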


## Transform the Coordinate Reference System

3 changes: 2 additions & 1 deletion python/Pipfile
@@ -13,9 +13,10 @@ pytest-cov = "*"
[packages]
shapely="==1.8.5"
pandas="*"
geopandas="==0.6.0"
geopandas="==0.10.2"
pyspark=">=2.3.0"
attrs="*"
pyarrow="*"

[requires]
python_version = "3.7"
55 changes: 55 additions & 0 deletions python/tests/sql/test_geoparquet.py
@@ -0,0 +1,55 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import os.path

from shapely.geometry import Point
from shapely.geometry import LineString
from shapely.geometry.base import BaseGeometry
from shapely.wkt import loads as wkt_loads
import geopandas

from tests.test_base import TestBase


class TestGeoParquet(TestBase):
    def test_interoperability_with_geopandas(self, tmp_path):
        test_data = [
            [1, Point(0, 0), LineString([(1, 2), (3, 4), (5, 6)])],
            [2, LineString([(1, 2), (3, 4), (5, 6)]), Point(1, 1)],
            [3, Point(1, 1), LineString([(1, 2), (3, 4), (5, 6)])]
        ]
        df = self.spark.createDataFrame(data=test_data, schema=["id", "g0", "g1"]).repartition(1)
        geoparquet_save_path = os.path.join(tmp_path, "test.parquet")
        df.write.format("geoparquet").save(geoparquet_save_path)

        # Load the GeoParquet file written by Sedona using geopandas
        gdf = geopandas.read_parquet(geoparquet_save_path)
        assert gdf.dtypes['g0'].name == 'geometry'
        assert gdf.dtypes['g1'].name == 'geometry'

        # Write a GeoParquet file with geopandas and load it using Sedona
        gdf = geopandas.GeoDataFrame([
            {'g': wkt_loads('POINT (1 2)'), 'i': 10},
            {'g': wkt_loads('LINESTRING (1 2, 3 4)'), 'i': 20}
        ], geometry='g')
        geoparquet_save_path2 = os.path.join(tmp_path, "test_2.parquet")
        gdf.to_parquet(geoparquet_save_path2)
        df2 = self.spark.read.format("geoparquet").load(geoparquet_save_path2)
        assert df2.count() == 2
        row = df2.collect()[0]
        assert isinstance(row['g'], BaseGeometry)
@@ -15,7 +15,8 @@
package org.apache.spark.sql.execution.datasources.parquet

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.parquet.filter2.compat.FilterCompat
@@ -31,6 +32,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.readParquetFootersInParallel
@@ -40,19 +42,22 @@ import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableConfiguration

import java.net.URI
import scala.collection.JavaConverters._
import scala.util.Failure
import scala.util.Try

class GeoParquetFileFormat extends ParquetFileFormat with FileFormat with DataSourceRegister with Logging with Serializable {

override def shortName(): String = "geoparquet"

override def equals(other: Any): Boolean = other.isInstanceOf[GeoParquetFileFormat]

override def hashCode(): Int = getClass.hashCode()

override def inferSchema(
sparkSession: SparkSession,
parameters: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
val fieldGeometry = new GeoParquetOptions(parameters).fieldGeometry
GeometryField.setFieldGeometry(fieldGeometry)
GeoParquetUtils.inferSchema(sparkSession, parameters, files)
}

@@ -302,6 +307,8 @@ class GeoParquetFileFormat extends ParquetFileFormat with FileFormat with DataSo
}

override def supportDataType(dataType: DataType): Boolean = super.supportDataType(dataType)

override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = false
}

object GeoParquetFileFormat extends Logging {
@@ -328,14 +335,46 @@ object GeoParquetFileFormat extends Logging {
val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp

val reader = (files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean) => {
// Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
val converter = new GeoParquetToSparkSchemaConverter(
assumeBinaryIsString = assumeBinaryIsString,
assumeInt96IsTimestamp = assumeInt96IsTimestamp)
readParquetFootersInParallel(conf, files, ignoreCorruptFiles)
.map(ParquetFileFormat.readSchemaFromFooter(_, converter))
.map { footer =>
// Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
val keyValueMetaData = footer.getParquetMetadata.getFileMetaData.getKeyValueMetaData
val converter = new GeoParquetToSparkSchemaConverter(
keyValueMetaData = keyValueMetaData,
assumeBinaryIsString = assumeBinaryIsString,
assumeInt96IsTimestamp = assumeInt96IsTimestamp)
readSchemaFromFooter(footer, converter)
}
}

GeoSchemaMergeUtils.mergeSchemasInParallel(sparkSession, parameters, filesToTouch, reader)
}

private def readSchemaFromFooter(footer: Footer, converter: GeoParquetToSparkSchemaConverter): StructType = {
val fileMetaData = footer.getParquetMetadata.getFileMetaData
fileMetaData
.getKeyValueMetaData
.asScala.toMap
.get(ParquetReadSupport.SPARK_METADATA_KEY)
.flatMap(deserializeSchemaString)
.getOrElse(converter.convert(fileMetaData.getSchema))
}

private def deserializeSchemaString(schemaString: String): Option[StructType] = {
// Tries to deserialize the schema string as JSON first, then falls back to the case class
// string parser (data generated by older versions of Spark SQL uses this format).
Try(DataType.fromJson(schemaString).asInstanceOf[StructType]).recover {
case _: Throwable =>
logInfo(
"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
"falling back to the deprecated DataType.fromCaseClassString parser.")
LegacyTypeStringParser.parseString(schemaString).asInstanceOf[StructType]
}.recoverWith {
case cause: Throwable =>
logWarning(
"Failed to parse and ignored serialized Spark schema in " +
s"Parquet key-value metadata:\n\t$schemaString", cause)
Failure(cause)
}.toOption
}
}
@@ -0,0 +1,51 @@
/**
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.parquet

/**
* A case class that holds CRS metadata for geometry columns. This class is left empty since CRS
* metadata is not yet implemented.
*/
case class CRSMetaData()

/**
* A case class that holds the metadata of a geometry column in GeoParquet metadata
* @param encoding Name of the geometry encoding format. Currently only "WKB" is supported
* @param geometryTypes The geometry types of all geometries, or an empty array if they are not known.
* @param bbox Bounding Box of the geometries in the file, formatted according to RFC 7946, section 5.
*/
case class GeometryFieldMetaData(
encoding: String,
geometryTypes: Seq[String],
bbox: Seq[Double],
crs: Option[CRSMetaData] = None)

/**
* A case class that holds the metadata of a GeoParquet file
* @param version The version identifier for the GeoParquet specification.
* @param primaryColumn The name of the "primary" geometry column.
* @param columns Metadata about geometry columns.
*/
case class GeoParquetMetaData(
version: Option[String], // defined as optional for compatibility with old GeoParquet specs
primaryColumn: String,
columns: Map[String, GeometryFieldMetaData])

object GeoParquetMetaData {
// We're conforming to version 1.0.0-beta.1 of the GeoParquet specification, please refer to
// https://github.com/opengeospatial/geoparquet/blob/v1.0.0-beta.1/format-specs/geoparquet.md
// for more details.
val VERSION = "1.0.0-beta.1"
}
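
For context, the "geo" footer metadata modeled by these case classes is a JSON document with snake_case keys, per the GeoParquet 1.0.0-beta.1 specification. Below is a minimal sketch of extracting it with json4s, mirroring the `camelizeKeys.extract[GeoParquetMetaData]` call in `GeoParquetToSparkSchemaConverter`; the sample JSON values are illustrative assumptions, and the case classes above are assumed to be on the classpath:

```Scala
import org.json4s.jackson.JsonMethods.parse

// Example "geo" key-value metadata string (values are illustrative only).
val geoJson =
  """{"version": "1.0.0-beta.1", "primary_column": "geometry",
    |  "columns": {"geometry": {"encoding": "WKB",
    |    "geometry_types": ["Point"], "bbox": [0.0, 0.0, 1.0, 1.0]}}}""".stripMargin

implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
// camelizeKeys maps the spec's snake_case keys (primary_column, geometry_types)
// onto the camelCase fields of GeoParquetMetaData and GeometryFieldMetaData.
val metadata = parse(geoJson).camelizeKeys.extract[GeoParquetMetaData]
assert(metadata.primaryColumn == "geometry")
assert(metadata.columns("geometry").encoding == "WKB")
```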


@@ -121,7 +121,7 @@ class GeoParquetReadSupport (
new GeoParquetRecordMaterializer(
parquetRequestedSchema,
GeoParquetReadSupport.expandUDT(catalystRequestedSchema),
new GeoParquetToSparkSchemaConverter(conf),
new GeoParquetToSparkSchemaConverter(keyValueMetaData, conf),
convertTz,
datetimeRebaseMode,
int96RebaseMode)
@@ -28,6 +28,7 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
import org.apache.spark.sql.types._
import org.json4s.jackson.JsonMethods.parse

/**
* This converter class is used to convert Parquet [[MessageType]] to Spark SQL [[StructType]].
@@ -43,23 +44,33 @@ import org.apache.spark.sql.types._
* [[TimestampType]] fields.
*/
class GeoParquetToSparkSchemaConverter(
assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get)
extends ParquetToSparkSchemaConverter(assumeBinaryIsString, assumeInt96IsTimestamp) {
keyValueMetaData: java.util.Map[String, String],
assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get) {

private val geoParquetMetaData: GeoParquetMetaData = {
val geo = keyValueMetaData.get("geo")
if (geo == null) {
throw new AnalysisException("GeoParquet file does not contain valid geo metadata")
}
implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
parse(geo).camelizeKeys.extract[GeoParquetMetaData]
}

def this(conf: SQLConf) = this(
def this(keyValueMetaData: java.util.Map[String, String], conf: SQLConf) = this(
keyValueMetaData = keyValueMetaData,
assumeBinaryIsString = conf.isParquetBinaryAsString,
assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp)

def this(conf: Configuration) = this(
def this(keyValueMetaData: java.util.Map[String, String], conf: Configuration) = this(
keyValueMetaData = keyValueMetaData,
assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean)


/**
* Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]].
*/
override def convert(parquetSchema: MessageType): StructType = convert(parquetSchema.asGroupType())
def convert(parquetSchema: MessageType): StructType = convert(parquetSchema.asGroupType())

private def convert(parquetSchema: GroupType): StructType = {
val fields = parquetSchema.getFields.asScala.map { field =>
@@ -90,6 +101,9 @@ extends ParquetToSparkSchemaConverter(assumeBinaryIsString, assumeInt96IsTimesta
case t: GroupType => convertGroupField(t.asGroupType())
}

private def isGeometryField(fieldName: String): Boolean =
geoParquetMetaData.columns.contains(fieldName)

private def convertPrimitiveField(field: PrimitiveType): DataType = {
val typeName = field.getPrimitiveTypeName
val originalType = field.getOriginalType
@@ -161,7 +175,7 @@ extends ParquetToSparkSchemaConverter(assumeBinaryIsString, assumeInt96IsTimesta
case BINARY =>
originalType match {
case UTF8 | ENUM | JSON => StringType
case null if GeoParquetSchemaConverter.checkGeomFieldName(field.getName) => GeometryUDT
case null if isGeometryField(field.getName) => GeometryUDT
case null if assumeBinaryIsString => StringType
case null => BinaryType
case BSON => BinaryType
@@ -558,12 +572,6 @@ extends SparkToParquetSchemaConverter(writeLegacyParquetFormat, outputTimestampT
}

private[sql] object GeoParquetSchemaConverter {
def checkGeomFieldName(name: String): Boolean = {
if (name.equals(GeometryField.getFieldGeometry())) {
true
} else false
}

def checkFieldName(name: String): Unit = {
// ,;{}()\n\t= and space are special characters in Parquet schema
checkConversionRequirement(