
[SEDONA-166] Support type-safe dataframe style API #693

Merged · 20 commits · Oct 2, 2022

Conversation

douglasdennis (Contributor) commented Sep 21, 2022:

Did you read the Contributor Guide?

Is this PR related to a JIRA ticket?

What changes were proposed in this PR?

Introduce a type-safe DataFrame-style API similar to the standard Spark functions. The API can operate on Spark Column types directly or on function-specific native Scala/Java types. For example, the function ST_CollectionExtract can be called in the following ways:

val df = sparkSession.sql("SELECT ST_GeomFromWKT('GEOMETRYCOLLECTION(POINT(0 0), LINESTRING(0 0, 1 0))') AS geom")

// with Column objects and the default geomType argument
df.select(ST_CollectionExtract($"geom"))

// with a String to specify a column name
df.select(ST_CollectionExtract("geom"))

// using Column objects and specifying the geomType argument
df.select(ST_CollectionExtract($"geom", lit(1)))

// using a String for the column name and an Int for geomType
df.select(ST_CollectionExtract("geom", 1))

The general rule for the API is that methods have two overloaded forms (a sketch of both is shown below):

  1. All Column arguments, which is the most versatile form.
  2. Strings for arguments that are commonly column names, and native types for arguments that are commonly constants. For example, the ST_CollectionExtract geomType argument (when given) is generally constant across the dataframe.
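As a rough illustration of this convention (a hypothetical sketch, not the exact Sedona signatures), the two forms could be shaped like this, with the String/Int overload simply delegating to the all-Column overload:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, lit}

trait CollectionExtractSketch {
  // 1. all-Column form: every argument is a Column, the most versatile shape
  def ST_CollectionExtract(geom: Column, geomType: Column): Column

  // 2. convenience form: the column name as a String and the (usually constant)
  //    geomType as an Int, delegating to the all-Column form
  def ST_CollectionExtract(geom: String, geomType: Int): Column =
    ST_CollectionExtract(col(geom), lit(geomType))
}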

This API is made available to Scala, Java, and Python.

How was this patch tested?

Scala/Java is tested through unit tests in dataFrameAPITestScala.scala and Python is tested through unit tests in test_dataframe_api.py.

Did this PR include necessary documentation updates?

import org.apache.spark.sql.Column
import org.apache.spark.sql.sedona_sql.expressions_udaf.{ST_Envelope_Aggr => ST_Envelope_Aggr_udaf, ST_Intersection_Aggr => ST_Intersection_Aggr_udaf, ST_Union_Aggr => ST_Union_Aggr_udaf}

object st_aggregates extends DataFrameAPI {
douglasdennis (author) replied:

Are you asking whether the st_aggregates object is needed? If so, then I think it is. It allows for something like this:

df.groupBy().agg(ST_Union_Aggr("geometry").as("geometry"))

If that's not what you meant then apologies.

I didn't realize that UserDefinedAggregateFunction was deprecated, though. I was actually using this tutorial for reference to understand how this works on the Scala side: https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html

From what else I've read, it looks like I'm supposed to pass an Aggregator to the udaf function, so I'll plan to do that.

Member:

UserDefinedAggregateFunction has been deprecated since Spark 3.0, which suggests the type-safe Aggregator instead: https://spark.apache.org/docs/latest/sql-ref-functions-udf-aggregate.html#examples

So Sedona for Spark 3.0 uses the type-safe aggregator. Sedona for Spark 2.0 still uses UserDefinedAggregateFunction, which will be completely dropped in Sedona 1.3.0 (next release).

The URL you pasted actually uses the deprecated UserDefinedAggregateFunction as the example.

douglasdennis (author):

I'm shaking my fist at databricks right now...

This makes sense. I'll switch to the type-safe Aggregator classes and use Spark's udaf function so they can accept columns.
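For reference, the Spark 3 pattern being described is roughly the following; this is a minimal sketch with a plain Long sum standing in for Sedona's geometry aggregators, so the names and types are illustrative only:

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.{col, udaf}

// Type-safe aggregator: Aggregator[IN, BUF, OUT]
object LongSum extends Aggregator[Long, Long, Long] {
  def zero: Long = 0L
  def reduce(buffer: Long, value: Long): Long = buffer + value
  def merge(b1: Long, b2: Long): Long = b1 + b2
  def finish(reduction: Long): Long = reduction
  def bufferEncoder: Encoder[Long] = Encoders.scalaLong
  def outputEncoder: Encoder[Long] = Encoders.scalaLong
}

// functions.udaf wraps the Aggregator so it can be applied to Columns, e.g.
//   df.agg(longSum(col("value")))
val longSum = udaf(LongSum)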

jiayuasu changed the title from "Dataframe api" to "[SEDONA-166] Support type-safe dataframe style API" on Sep 22, 2022
jiayuasu added this to the sedona-1.3.0 milestone on Sep 22, 2022
@douglasdennis (Contributor, author):

This is pretty embarrassing. This was meant to be a PR against my own fork's master to get the build action to run. While most of these changes are approaching done (on the Scala side, anyway), they are still subject to change while I sort out the Python and Java interfaces. The cat is out of the bag, though, so please consider this a draft for the moment.

@jiayuasu (Member):

@douglasdennis Haha, no problem. This PR looks pretty nice. Keep up the good work!

@douglasdennis (Contributor, author):

@jiayuasu Thank you :) I should also mention that this is some of the first Scala I've written, so if there are any language faux pas or better ways to accomplish what I'm doing, let me know and I'll fix it.

@jiayuasu (Member) commented Sep 22, 2022:

@douglasdennis Another interesting finding: since the type-safe dataframe APIs do not need to call "udf.register()" to register all functions, is it possible that, as a side effect, predicate pushdown is finally supported in Sedona?

If you don't know what a predicate pushdown is, see this: https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-Optimizer-PushDownPredicate.html

Since Sedona 1.3.0 will natively read GeoParquet (the PR is already merged to master), Sedona user RJ Marcus is working hard to get predicate pushdown working on GeoParquet. See SEDONA-156.

Quoting my reply to RJ Marcus:

Pushed filter: UDF functions can NOT be pushed down as filters if they use udf.register(). Based on my understanding, Sedona ST functions cannot be pushed down because UDFs in pure Spark SQL are a black box to Spark Catalyst, unless we do something with the current Sedona ST functions.

However, Sedona implements all functions as Spark SQL Catalyst "Expressions" [1] instead of naive UDFs. This gives you the possibility of pushing them down to the data source (see [2]). There is an ongoing effort to expose Sedona ST functions in a type-safe format which bypasses the "udf.register" step (see [3]).

So, with the current Sedona GeoParquet reader and [3], it is possible that pushed filters will finally be supported. You might want to check it out and confirm my wild guess.

[1] https://github.com/apache/incubator-sedona/blob/master/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Constructors.scala#L45

[2] https://neapowers.com/apache-spark/native-functions-catalyst-expressions/

[3] #693

@Kimahriman (Contributor):

@douglasdennis Another interesting finding: since the type-safe dataframe APIs do not need to call "udf.register()" to register all functions, is it possible that, as a side effect, predicate pushdown is finally supported in Sedona?

I believe the current mechanism technically doesn't use UDFs; it registers SQL functions directly, which should theoretically be usable by predicate pushdown (if you can figure out that mechanism in V1). That's why we can do things like join detection with pattern matching on the expressions.
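To make "pattern matching on the expressions" concrete, here is a hedged sketch (the helper below is hypothetical, not an actual Sedona rule; the package name is taken from the physical plans shown later in this thread) of how a plan can be inspected for Sedona expressions precisely because they are Catalyst Expressions rather than opaque registered UDFs:

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}

// True if the expression comes from Sedona's expression package (assumption:
// package name as printed in the plan output later in this thread).
def isSedonaExpression(e: Expression): Boolean =
  e.getClass.getName.startsWith("org.apache.spark.sql.sedona_sql.expressions")

// Walks the logical plan looking for a Filter whose condition contains a
// Sedona expression.
def hasSedonaFilter(plan: LogicalPlan): Boolean =
  plan.collectFirst {
    case Filter(condition, _) if condition.find(isSedonaExpression).isDefined => ()
  }.isDefined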

@douglasdennis (Contributor, author):

Another interesting finding: since the type-safe dataframe APIs do not need to call "udf.register()" to register all functions, is it possible that, as a side effect, predicate pushdown is finally supported in Sedona?

@jiayuasu It does not appear to be so. Here are some results I ran this morning using the example1.parquet in the library.

Checking to make sure predicate pushdown happens with native types:

val geoparquetdatalocation1: String = resourceFolder + "geoparquet/example1.parquet"
val basicPredicateDf = sparkSession.read.format("geoparquet").load(geoparquetdatalocation1).where(col("name").equalTo("Fiji"))
basicPredicateDf.explain()

The plan shows a push down:

== Physical Plan ==
*(1) Filter (isnotnull(name#5302) AND (name#5302 = Fiji))
+- FileScan geoparquet [pop_est#5300L,continent#5301,name#5302,iso_a3#5303,gdp_md_est#5304,geometry#5305] Batched: false, DataFilters: [isnotnull(name#5302), (name#5302 = Fiji)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:<removed>, PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,Fiji)], ReadSchema: struct<pop_est:bigint,continent:string,name:string,iso_a3:string,gdp_md_est:double,geometry:array...



A simple geometry based predicate:

val geoparquetdatalocation1: String = resourceFolder + "geoparquet/example1.parquet"
val basicGeomPredicateDf = sparkSession.read.format("geoparquet").load(geoparquetdatalocation1).where(ST_GeometryType("geometry").equalTo(lit("ST_Polygon")))
basicGeomPredicateDf.explain()

This plan does not show a push down:

== Physical Plan ==
Filter (st_geometrytype(geometry#5318) = ST_Polygon)
+- FileScan geoparquet [pop_est#5313L,continent#5314,name#5315,iso_a3#5316,gdp_md_est#5317,geometry#5318] Batched: false, DataFilters: [(st_geometrytype(geometry#5318) = ST_Polygon)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:<removed>, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<pop_est:bigint,continent:string,name:string,iso_a3:string,gdp_md_est:double,geometry:array...



And just to be thorough, a more complex geometry based predicate:

val geoparquetdatalocation1: String = resourceFolder + "geoparquet/example1.parquet"
val basicGeomPredicateDf = sparkSession.read.format("geoparquet").load(geoparquetdatalocation1).where(ST_Distance("geometry", ST_Point(2, 2)) <= (50.0))
basicGeomPredicateDf.explain()

As expected, no push down as well:

== Physical Plan ==
Filter (org.apache.spark.sql.sedona_sql.expressions.ST_Distance <= 50.0)
+- FileScan geoparquet [pop_est#5326L,continent#5327,name#5328,iso_a3#5329,gdp_md_est#5330,geometry#5331] Batched: false, DataFilters: [(org.apache.spark.sql.sedona_sql.expressions.ST_Distance <= 50.0)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:<removed>, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<pop_est:bigint,continent:string,name:string,iso_a3:string,gdp_md_est:double,geometry:array...

@neontty commented Sep 22, 2022:

@douglasdennis this looks like an awesome PR, thank you for putting in this hard work.

I agree that this PR won't shortcut to geometry-based predicate pushdowns; the issue comes from handling Expressions/Predicates correctly during the FileScan, since a lot of Parquet code deals with "Filter" instead of Expression or Predicate. Thanks, Dennis, for doing this most recent test to confirm.

I'm a little busy today, but I'm taking a look at Jia's comments on SEDONA-156 and will write a response to continue the predicate-pushdown conversation in that thread.

@douglasdennis (Contributor, author):

@jiayuasu This should be, at least structurally, complete. I still have docs and docstrings to finish. A couple of questions:

  1. I was planning to add a new section to this page and this page demonstrating how to use this API. Does that seem like a good way to go about it? Or would you prefer something else?
  2. Should I add scaladoc to the Scala code?
  3. I found Sphinx-style docstrings in some of the Python code. Is that the style I should use in Python?

I believe that @Imbruced is the Python reviewer, so I'm pinging them for this as well.

Any comments, suggestions, or code changes needed please let me know.

elif isinstance(arg, str):
return f.col(arg)._jc
elif isinstance(arg, Iterable):
return f.array(*[Column(x) for x in map(_convert_argument_to_java_column, arg)])._jc
Member:

What's the advantage of using map and a list comprehension in one place instead of applying two functions in a list comprehension?

douglasdennis (author):

No advantage. Just me being silly when I refactored. I originally wanted this to use recursion in a different way, but I wasn't feeling it so I defaulted to this out of haste :) Will refactor to use function composition. Thanks for catching that.

douglasdennis (author):

This is fixed.

]


_call_constructor_function = partial(call_sedona_function, "st_constructors")
Member:

I like the idea of using partial :)

@Imbruced (Member):

Great improvement! The main idea is to provide type-safe functions, and what I am missing the most is validation of input types and checks against None values. I would also like to have test cases for that. WDYT? Maybe in another PR, because this one is massive :) Thanks for your effort.


class TestDataFrameAPI(TestBase):

def test_call_function_using_columns(self):
Member:

Do you think parametrised tests could help simplify these tests? I mean https://docs.pytest.org/en/6.2.x/parametrize.html

Member:

Then you could use an argument list, the function name to apply, and also some fixtures to provide the dataframe.

douglasdennis (author):

Ooohhhh! I have not used parameterized tests before. I'm excited to give them a shot.

douglasdennis (author):

Alright. I gave this a shot. Let me know if it isn't what you were hoping for.

Also: I don't know what the proper etiquette is on GitHub. Do I click the "Resolve Conversation" button when I think I've addressed a comment, or do the folks requesting code changes do that when they are satisfied?

@douglasdennis (Contributor, author) commented Sep 24, 2022:

Great improvement! The main idea is to provide type-safe functions, and what I am missing the most is validation of input types and checks against None values. I would also like to have test cases for that. WDYT? Maybe in another PR, because this one is massive :) Thanks for your effort.

Doh! I had meant to have input validation and completely forgot about it. I'd like to add it in this PR just for completeness. The JVM call will throw if it can't find a method that accepts the given argument types, but that error would be cryptic to the user. For reference, I intend to use a decorator to manage the type checking.

Oh, and the tests are a great idea; I'll add those as well.

@Imbruced (Member):

@douglasdennis Thanks! A decorator sounds great! It's good to have a Python exception raised, which can be more easily handled and debugged by users later :)

@jiayuasu (Member):

@douglasdennis Please let me know once you finish this PR :-)

@jiayuasu (Member) commented Oct 2, 2022:

@douglasdennis Is this PR ready to merge? :-)

@douglasdennis (Contributor, author):

@douglasdennis Is this PR ready to merge? :-)

Just working on a couple of the notes now. Will be done by the end of this weekend.

@douglasdennis (Contributor, author):

Victory!

@jiayuasu As best I know, this is ready to go, assuming I have addressed the notes from @Imbruced. I can also do a follow-up PR after this merges if that would be better.

@jiayuasu (Member) commented Oct 2, 2022:

This looks good to me. I will merge it for now as a few other PRs are waiting for this. If @Imbruced has any comments, @douglasdennis can make a follow-up PR to address it.
