[SEDONA-166] Support type-safe dataframe style API #693
Conversation
```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.sedona_sql.expressions_udaf.{ST_Envelope_Aggr => ST_Envelope_Aggr_udaf, ST_Intersection_Aggr => ST_Intersection_Aggr_udaf, ST_Union_Aggr => ST_Union_Aggr_udaf}

object st_aggregates extends DataFrameAPI {
```
The Sedona aggregators are already using the type-safe aggregator: https://github.com/apache/incubator-sedona/blob/master/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/AggregateFunctions.scala#L76
The old UDAF-based aggregators (https://github.com/apache/incubator-sedona/blob/master/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions_udaf/AggregateFunctions.scala#L27) are not used in the Spark 3.0 version of Sedona: https://github.com/apache/incubator-sedona/blob/master/sql/src/main/scala/org/apache/sedona/sql/UDF/UdfRegistrator.scala#L44
Spark 2.X support will be dropped in Sedona 1.3.0 (the next release), so I don't think we need this part. What do you think?
Are you asking whether the st_aggregates object is needed? If so, I think it is. It allows for something like this:
`df.groupBy().agg(ST_Union_Aggr("geometry").as("geometry"))`
If that's not what you meant, then apologies.
I didn't realize that UserDefinedAggregateFunction was deprecated, though. I was actually using this tutorial as a reference to understand how this works on the Scala side: https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html
From what else I've read, it looks like I'm supposed to pass an Aggregator to the udaf function, so I'll plan to do that.
UserDefinedAggregateFunction has been deprecated since Spark 3.0, which recommends the type-safe aggregator instead: https://spark.apache.org/docs/latest/sql-ref-functions-udf-aggregate.html#examples
So Sedona for Spark 3.0 uses the type-safe aggregator. Sedona for Spark 2.0 still uses UserDefinedAggregateFunction, which will be dropped completely in Sedona 1.3.0 (the next release).
The URL you pasted actually uses the deprecated UserDefinedAggregateFunction in its example.
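For context, a minimal sketch of the type-safe `Aggregator` pattern the Spark docs recommend. This is illustrative only (a simple numeric sum, not Sedona's actual geometry aggregator, which lives in the AggregateFunctions.scala file linked above):

```scala
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.functions.udaf

// Illustrative type-safe aggregator that sums Double values.
// A geometry aggregator would use geometry types for IN/BUF/OUT instead.
object SumAggregator extends Aggregator[Double, Double, Double] {
  def zero: Double = 0.0
  def reduce(buffer: Double, value: Double): Double = buffer + value
  def merge(b1: Double, b2: Double): Double = b1 + b2
  def finish(reduction: Double): Double = reduction
  def bufferEncoder: Encoder[Double] = Encoders.scalaDouble
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// Wrapping with udaf() lets the typed aggregator be used with untyped
// Columns, e.g. df.agg(sumAgg(col("value")))
val sumAgg = udaf(SumAggregator)
```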
I'm shaking my fist at Databricks right now...
This makes sense. I'll switch to the type-safe Aggregator classes and use the Spark udaf function so they can take columns.
This is pretty embarrassing. This was meant to be a PR against my own fork's master to get the build action to run. While most of these changes are approaching done (on the Scala side, anyway), they were still subject to change as I sort out the Python and Java interfaces. The cat is out of the bag, though, so please consider this a draft for the moment.

@douglasdennis Haha, no problem. This PR looks pretty nice. Keep up the good work!

@jiayuasu Thank you :) I should also mention that this is some of the first Scala I've written, so if there are any language faux pas or better ways to accomplish what I'm doing, let me know and I'll fix it.
@douglasdennis Another interesting finding: since the type-safe dataframe APIs do not need to call "udf.register()" to register all functions, is it possible that, as a side effect, predicate pushdown is finally supported in Sedona?

If you don't know what a predicate pushdown is, see this: https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-Optimizer-PushDownPredicate.html

Since Sedona 1.3.0 will natively read GeoParquet (PR already merged to the master), Sedona user RJ Marcus is working hard to get predicate pushdown working on GeoParquet. See SEDONA-156. Quoted my reply to RJ Marcus:
I believe the current mechanism technically doesn't use UDFs; it registers SQL functions directly, which should theoretically be usable by predicate pushdown (if you can figure out that mechanism in V1). That's why we can do things like join detection with pattern matching on the expressions.
@jiayuasu It does not appear to be so. Here are some results I ran this morning using the example1.parquet file in the library.

Checking that predicate pushdown happens with native types:

```scala
val geoparquetdatalocation1: String = resourceFolder + "geoparquet/example1.parquet"
val basicPredicateDf = sparkSession.read.format("geoparquet").load(geoparquetdatalocation1).where(col("name").equalTo("Fiji"))
basicPredicateDf.explain()
```

The plan shows a push down.

Checking with a geometry-type predicate:

```scala
val geoparquetdatalocation1: String = resourceFolder + "geoparquet/example1.parquet"
val basicGeomPredicateDf = sparkSession.read.format("geoparquet").load(geoparquetdatalocation1).where(ST_GeometryType("geometry").equalTo(lit("ST_Polygon")))
basicGeomPredicateDf.explain()
```

This plan does not show a push down.

And with a distance predicate:

```scala
val geoparquetdatalocation1: String = resourceFolder + "geoparquet/example1.parquet"
val basicGeomPredicateDf = sparkSession.read.format("geoparquet").load(geoparquetdatalocation1).where(ST_Distance("geometry", ST_Point(2, 2)) <= 50.0)
basicGeomPredicateDf.explain()
```

As expected, no push down here either.
@douglasdennis this looks like an awesome PR, thank you for putting in this hard work. I agree that this PR won't shortcut to geometry-based predicate pushdowns; the issue comes with handling Expressions/Predicates correctly during the FileScan, since a lot of parquet code deals with "Filter" instead of Expression or Predicate. Thanks Dennis for doing this most recent test to confirm. I'm a bit busy today, but I'm taking a look at Jia's comments on SEDONA-156 and making a response to continue the predicate-pushdown conversation in that thread.
@jiayuasu This should be, at least structurally, complete. I still have docs and docstrings to finish. A couple of questions:
I believe that @Imbruced is the Python reviewer, so I'm pinging them on this as well. If any comments, suggestions, or code changes are needed, please let me know.
python/sedona/sql/dataframe_api.py
```python
elif isinstance(arg, str):
    return f.col(arg)._jc
elif isinstance(arg, Iterable):
    return f.array(*[Column(x) for x in map(_convert_argument_to_java_column, arg)])._jc
```
What's the advantage of using map and a list comprehension together here, instead of applying the two functions within a single list comprehension?
No advantage. Just me being silly when I refactored. I originally wanted this to use recursion in a different way, but I wasn't feeling it, so I defaulted to this out of haste :) I'll refactor to use function composition. Thanks for catching that.
This is fixed.
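The refactor being discussed can be shown with a pure-Python sketch (the helper names here are stand-ins, not Sedona's actual functions): mixing `map()` with a list comprehension is equivalent to composing the two calls inside a single comprehension.

```python
# Stand-in for _convert_argument_to_java_column in the PR.
def convert(s):
    return s.upper()

# Stand-in for the Column(...) wrapper applied in the comprehension.
def wrap(s):
    return f"Column({s})"

args = ["point", "geom"]

# Original style: map() nested inside a list comprehension.
mixed = [wrap(x) for x in map(convert, args)]

# Refactored style: apply both functions in one comprehension.
flat = [wrap(convert(x)) for x in args]

assert mixed == flat
print(flat)  # ['Column(POINT)', 'Column(GEOM)']
```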
```python
_call_constructor_function = partial(call_sedona_function, "st_constructors")
```
I like the idea of using partial :)
Great improvement! The main idea is to provide type-safe functions, and what I'm missing most is validation of input types and checks against None values. I would also like to have test cases for that. WDYT? Maybe in another PR, since this one is massive :) Thanks for your effort.
```python
class TestDataFrameAPI(TestBase):

    def test_call_function_using_columns(self):
```
Do you think parametrized tests could help simplify the tests? I mean https://docs.pytest.org/en/6.2.x/parametrize.html
Then you can use an argument list, the function name to apply, and also some fixtures to provide the dataframe?
Ooohhhh! I have not used parameterized tests before. I'm excited to give them a shot.
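A minimal example of the pytest parametrization being suggested (the functions and expected values here are illustrative stand-ins, not Sedona's API):

```python
import pytest

# Hypothetical stand-ins for the dataframe-API functions under test.
def st_point(x, y):
    return (x, y)

def st_buffer(geom, dist):
    return ("buffer", geom, dist)

# Each tuple supplies the function to apply, its argument list, and the
# expected result; pytest generates one test case per tuple.
@pytest.mark.parametrize(
    "func,args,expected",
    [
        (st_point, (1.0, 2.0), (1.0, 2.0)),
        (st_buffer, ((1.0, 2.0), 3.0), ("buffer", (1.0, 2.0), 3.0)),
    ],
)
def test_call_function(func, args, expected):
    assert func(*args) == expected
```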
Alright. I gave this a shot. Let me know if it isn't what you were hoping for.
Also: I don't know what the proper etiquette is on GitHub. Do I click the "Resolve conversation" button when I think I've addressed something, or do the folks requesting the code changes do that when they're satisfied?
Doh! I had meant to add input validation and I completely forgot about it. I'd like to add it in this PR just for completeness. The JVM call will throw if it can't find a method that accepts the given argument types, but that would be cryptic to the user. For reference, I intend to use a decorator to manage the type checking. Oh, and the tests are a great idea; I will add those as well.
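One way the decorator-based type checking could look, as a pure-Python sketch (not Sedona's actual code; the function and parameter names are hypothetical). The goal is a clear Python exception up front instead of a cryptic JVM error later:

```python
import functools

def validate_argument_types(*expected_types):
    """Decorator factory: expected_types[i] is the allowed types tuple
    for positional argument i of the wrapped function."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args):
            for arg, expected in zip(args, expected_types):
                if arg is None:
                    raise ValueError(f"{func.__name__}: argument may not be None")
                if not isinstance(arg, expected):
                    raise TypeError(
                        f"{func.__name__}: expected {expected}, got {type(arg).__name__}"
                    )
            return func(*args)
        return wrapper
    return decorator

@validate_argument_types((str,), (str, float, int))
def st_buffer(geometry, distance):  # hypothetical wrapper function
    return f"ST_Buffer({geometry}, {distance})"

print(st_buffer("geom", 1.0))  # ST_Buffer(geom, 1.0)
```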
@douglasdennis Thanks! A decorator sounds great! It's good to have a Python exception raised, which users can more easily handle and debug later :)

@douglasdennis Please let me know once you finish this PR :-)

@douglasdennis Is this PR ready to merge? :-)

Just working on a couple of the notes now. Will be done by the end of this weekend.

This looks good to me. I will merge it for now, as a few other PRs are waiting on this. If @Imbruced has any comments, @douglasdennis can make a follow-up PR to address them.
Did you read the Contributor Guide?
Is this PR related to a JIRA ticket?
[SEDONA-XXX] my subject
What changes were proposed in this PR?
Introduce a type-safe DataFrame style API similar to the standard Spark functions. The API can operate on Spark `Column` types directly or on function-specific native Scala/Java types; for example, the function `ST_CollectionExtract` can be called in either style. The general rule for the API is that methods have two overloaded forms: one accepting function-specific native values, and one accepting `Column` arguments, which is the most versatile. This API is made available to Scala, Java, and Python.
How was this patch tested?
Scala/Java is tested through unit tests in dataFrameAPITestScala.scala and Python is tested through unit tests in test_dataframe_api.py.
Did this PR include necessary documentation updates?
vX.Y.Z format.