[SEDONA-314] Support optimized join on ST_HausdorffDistance #878

Merged: 53 commits, Jun 30, 2023
Changes from 52 commits
85ae113
Remove prebuild_index config from search plugin
iGN5117 Jun 6, 2023
8e829fe
Update compile the code documentation for sedona
iGN5117 Jun 6, 2023
4bcf99c
approximate float comparisons in python test cases
iGN5117 Jun 6, 2023
c7f6236
Update compile the code documentation for sedona
iGN5117 Jun 6, 2023
ceed7a4
Remove prebuild_index config from search plugin
iGN5117 Jun 6, 2023
15c6255
Revert "Update compile the code documentation for sedona"
iGN5117 Jun 7, 2023
471e281
Revert "Remove prebuild_index config from search plugin"
iGN5117 Jun 7, 2023
2a5a6d8
Fixed incorrect indentation
iGN5117 Jun 7, 2023
6e9883d
Addressed PR comments for documentation changes
iGN5117 Jun 7, 2023
e66895e
Merge branch 'master' of https://github.com/iGN5117/sedona into docum…
iGN5117 Jun 7, 2023
cf7e78d
Add ST_NumPoints
iGN5117 Jun 8, 2023
fc23fc9
Update available version to 1.4.1 in documentation
iGN5117 Jun 8, 2023
fea1daa
Merge branch 'apache:master' into develop_Nilesh_1.4.0
iGN5117 Jun 8, 2023
816e4ed
Fix failing scala test case
iGN5117 Jun 8, 2023
472c383
Merge branch 'develop_Nilesh_1.4.0' of https://github.com/iGN5117/sed…
iGN5117 Jun 8, 2023
471a87e
Updated documentation to include negative flow.
iGN5117 Jun 8, 2023
b33499d
Removed Spark SQL from flink documentation
iGN5117 Jun 8, 2023
0d68337
Add ST_Force3D to sedona
iGN5117 Jun 9, 2023
d1e4365
Merge branch 'sedona-master' into develop_Nilesh_1.4.0
iGN5117 Jun 9, 2023
da0314a
Updated force3D logic to handle empty geometries
iGN5117 Jun 9, 2023
020e9b9
Updated force3D dataframe test
iGN5117 Jun 10, 2023
86de054
fix error in test case
iGN5117 Jun 10, 2023
307c060
Updated documentation for Force3D to include Z format WKT in the outp…
iGN5117 Jun 11, 2023
dbab04c
Added default zValue test case in sedona flink
iGN5117 Jun 11, 2023
c0eb8bc
Added default zValue dataframe test case
iGN5117 Jun 11, 2023
479b264
Added default zValue scala test case
iGN5117 Jun 11, 2023
641d2d9
fix dataframe testcase
iGN5117 Jun 11, 2023
c2e27d8
Addressed PR comments
iGN5117 Jun 12, 2023
4fa254d
Merge branch 'develop_Nilesh_1.4.0' into documentation_update_Nilesh
iGN5117 Jun 12, 2023
9815161
Update community/develop to include steps to run python test cases.
iGN5117 Jun 12, 2023
5bdf476
Merge branch 'apache:master' into documentation_update_Nilesh
iGN5117 Jun 12, 2023
b5cf30c
Add ST_NRings
iGN5117 Jun 13, 2023
b89cec1
Add ST_Translate
iGN5117 Jun 13, 2023
b49d789
Merge branch 'develop_Nilesh_1.4.1_Translate' of https://github.com/i…
iGN5117 Jun 13, 2023
ece4802
Updated GeomUtils to remove redundant geom return type
iGN5117 Jun 14, 2023
78be0f1
Simplified ST_Translate implementation, updated test cases and docs
iGN5117 Jun 14, 2023
cd12ce2
Updated tests for Translate
iGN5117 Jun 15, 2023
19016ae
temp affine commit
iGN5117 Jun 19, 2023
959b35c
core logic implementation of BoundingDiagonal (without fits parameter)
iGN5117 Jun 19, 2023
00d0e48
Revert "temp affine commit"
iGN5117 Jun 20, 2023
0e23430
Implement BoundingDiagonal
iGN5117 Jun 20, 2023
e04f668
Merge branch 'sedona-master' into develop_Nilesh_1.4.1_BoundingDiagonal
iGN5117 Jun 20, 2023
f18099e
Add flink test case
iGN5117 Jun 20, 2023
ea6ea55
Reduce loops in ST_BoundingDiagonal
iGN5117 Jun 20, 2023
73e28c0
Add ST_HausdorffDistance
iGN5117 Jun 22, 2023
170a5d3
Merge branch 'master' into develop_Nilesh_1.4.1_HausdorffDistance
iGN5117 Jun 23, 2023
da353af
Merge branch 'sedona-master' into develop_Nilesh_1.4.1_HausdorffDistance
iGN5117 Jun 27, 2023
a1fea92
backmerge with master
iGN5117 Jun 28, 2023
be96258
Support optimized join for ST_HausdorffDistance
iGN5117 Jun 28, 2023
0a4e1e7
Updated optimizer documentation for hausdorffDistance
iGN5117 Jun 28, 2023
859bf8f
Merge branch 'master' into develop_Nilesh_1.4.1_HausdorffDistance
iGN5117 Jun 29, 2023
b20716f
fix query mistake
iGN5117 Jun 29, 2023
7e777d8
Added distance candidates with results > 100
iGN5117 Jun 29, 2023
18 changes: 15 additions & 3 deletions docs/api/sql/Optimizer.md
Original file line number Diff line number Diff line change
@@ -46,7 +46,7 @@ RangeJoin polygonshape#20: geometry, pointshape#43: geometry, false

## Distance join

Introduction: Find geometries from A and geometries from B such that the distance between each geometry pair is less than or equal to a given threshold. It supports the planar Euclidean distance calculator `ST_Distance` and the meter-based geodesic distance calculators `ST_DistanceSpheroid` and `ST_DistanceSphere`.
Introduction: Find geometries from A and geometries from B such that the distance between each geometry pair is less than or equal to a given threshold. It supports the planar Euclidean distance calculators `ST_Distance` and `ST_HausdorffDistance`, and the meter-based geodesic distance calculators `ST_DistanceSpheroid` and `ST_DistanceSphere`.

Spark SQL Example for planar Euclidean distance:

@@ -57,13 +57,25 @@ FROM pointdf1, pointdf2
WHERE ST_Distance(pointdf1.pointshape1,pointdf2.pointshape2) < 2
```

```sql
SELECT *
FROM pointDf, polygonDf
WHERE ST_HausdorffDistance(pointDf.pointshape, polygonDf.polygonshape, 0.3) < 2
```

*Consider ==intersects within a certain distance==*
```sql
SELECT *
FROM pointdf1, pointdf2
WHERE ST_Distance(pointdf1.pointshape1,pointdf2.pointshape2) <= 2
```

```sql
SELECT *
FROM pointDf, polygonDf
WHERE ST_HausdorffDistance(pointDf.pointshape, polygonDf.polygonshape) <= 2
```

Spark SQL Physical plan:
```
== Physical Plan ==
@@ -75,7 +87,7 @@ DistanceJoin pointshape1#12: geometry, pointshape2#33: geometry, 2.0, true
```

!!!warning
If you use `ST_Distance` as the predicate, Sedona does not control the distance's unit (degree or meter); it is simply the unit of the geometry's coordinates. If your coordinates are in longitude and latitude, the unit of `distance` is degrees, not meters or miles. To change the unit, transform the coordinate reference system to a meter-based one; see [ST_Transform](Function.md#st_transform). If you don't want to transform your data, consider using `ST_DistanceSpheroid` or `ST_DistanceSphere`.
    If you use planar Euclidean distance functions such as `ST_Distance` or `ST_HausdorffDistance` as the predicate, Sedona does not control the distance's unit (degree or meter); it is simply the unit of the geometry's coordinates. If your coordinates are in longitude and latitude, the unit of `distance` is degrees, not meters or miles. To change the unit, transform the coordinate reference system to a meter-based one; see [ST_Transform](Function.md#st_transform). If you don't want to transform your data, consider using `ST_DistanceSpheroid` or `ST_DistanceSphere`.
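The transform approach mentioned in the warning above can be sketched as follows. This is a hedged illustration only: the DataFrame and column names (`pointDf1`, `pointshape`) and the EPSG codes are assumptions for the example, not taken from this PR.

```scala
import org.apache.spark.sql.functions.expr

// Sketch: reproject lon/lat geometries to a meter-based CRS before a distance join.
val metricDf1 = pointDf1.withColumn(
  "pointshape_m", expr("ST_Transform(pointshape, 'EPSG:4326', 'EPSG:3857')"))
val metricDf2 = pointDf2.withColumn(
  "pointshape_m", expr("ST_Transform(pointshape, 'EPSG:4326', 'EPSG:3857')"))

// The threshold is now in meters (Web Mercator meters, approximate away from the equator).
val joined = metricDf1.alias("a").join(
  metricDf2.alias("b"),
  expr("ST_Distance(a.pointshape_m, b.pointshape_m) <= 1000"))
```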

Spark SQL Example for meter-based geodesic distance `ST_DistanceSpheroid` (works for `ST_DistanceSphere` too):

@@ -126,7 +138,7 @@ BroadcastIndexJoin pointshape#52: geometry, BuildRight, BuildRight, false ST_Con
+- FileScan csv
```

This also works for distance joins with `ST_Distance`, `ST_DistanceSpheroid` or `ST_DistanceSphere`:
This also works for distance joins with `ST_Distance`, `ST_DistanceSpheroid`, `ST_DistanceSphere`, or `ST_HausdorffDistance`:

```scala
pointDf1.alias("pointDf1").join(broadcast(pointDf2).alias("pointDf2"), expr("ST_Distance(pointDf1.pointshape, pointDf2.pointshape) <= 2"))
@@ -1036,7 +1036,7 @@ case class ST_BoundingDiagonal(inputExpressions: Seq[Expression])
}

case class ST_HausdorffDistance(inputExpressions: Seq[Expression])
extends InferredExpression(inferrableFunction3(Functions.hausdorffDistance)) {
extends InferredExpression(inferrableFunction3(Functions.hausdorffDistance), inferrableFunction2(Functions.hausdorffDistance)) {
protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = {
copy(inputExpressions = newChildren)
}
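With the additional `inferrableFunction2` overload registered above, both arities of `ST_HausdorffDistance` now resolve. A minimal usage sketch; the DataFrame `geomDf` and its columns `geomA`/`geomB` are illustrative assumptions, not names from this PR:

```scala
// Sketch: the two call shapes of ST_HausdorffDistance after this change.
val twoArg = geomDf.selectExpr("ST_HausdorffDistance(geomA, geomB) AS d")

// Optional third argument: a density fraction in (0, 1] used to densify
// segments before the discrete Hausdorff distance is computed.
val threeArg = geomDf.selectExpr("ST_HausdorffDistance(geomA, geomB, 0.5) AS d")
```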
@@ -1062,4 +1062,3 @@ case class ST_Degrees(inputExpressions: Seq[Expression])
copy(inputExpressions = newChildren)
}
}

@@ -149,6 +149,32 @@ class JoinQueryDetector(sparkSession: SparkSession) extends Strategy {
Some(JoinQueryDetection(left, right, leftShape, rightShape, SpatialPredicate.INTERSECTS, true, condition, Some(distance)))
case Some(And(_, LessThan(ST_DistanceSpheroid(Seq(leftShape, rightShape)), distance))) =>
Some(JoinQueryDetection(left, right, leftShape, rightShape, SpatialPredicate.INTERSECTS, true, condition, Some(distance)))
// ST_HausdorffDistance, default (two-argument) variant
case Some(LessThanOrEqual(ST_HausdorffDistance(Seq(leftShape, rightShape)), distance)) =>
Some(JoinQueryDetection(left, right, leftShape, rightShape, SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
case Some(And(LessThanOrEqual(ST_HausdorffDistance(Seq(leftShape, rightShape)), distance), _)) =>
Some(JoinQueryDetection(left, right, leftShape, rightShape, SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
case Some(And(_, LessThanOrEqual(ST_HausdorffDistance(Seq(leftShape, rightShape)), distance))) =>
Some(JoinQueryDetection(left, right, leftShape, rightShape, SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
case Some(LessThan(ST_HausdorffDistance(Seq(leftShape, rightShape)), distance)) =>
Some(JoinQueryDetection(left, right, leftShape, rightShape, SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
case Some(And(LessThan(ST_HausdorffDistance(Seq(leftShape, rightShape)), distance), _)) =>
Some(JoinQueryDetection(left, right, leftShape, rightShape, SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
case Some(And(_, LessThan(ST_HausdorffDistance(Seq(leftShape, rightShape)), distance))) =>
Some(JoinQueryDetection(left, right, leftShape, rightShape, SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
// ST_HausdorffDistance with a density fraction (three-argument variant)
case Some(LessThanOrEqual(ST_HausdorffDistance(Seq(leftShape, rightShape, densityFrac)), distance)) =>
Some(JoinQueryDetection(left, right, leftShape, rightShape, SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
case Some(And(LessThanOrEqual(ST_HausdorffDistance(Seq(leftShape, rightShape, densityFrac)), distance), _)) =>
Some(JoinQueryDetection(left, right, leftShape, rightShape, SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
case Some(And(_, LessThanOrEqual(ST_HausdorffDistance(Seq(leftShape, rightShape, densityFrac)), distance))) =>
Some(JoinQueryDetection(left, right, leftShape, rightShape, SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
case Some(LessThan(ST_HausdorffDistance(Seq(leftShape, rightShape, densityFrac)), distance)) =>
Some(JoinQueryDetection(left, right, leftShape, rightShape, SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
case Some(And(LessThan(ST_HausdorffDistance(Seq(leftShape, rightShape, densityFrac)), distance), _)) =>
Some(JoinQueryDetection(left, right, leftShape, rightShape, SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
case Some(And(_, LessThan(ST_HausdorffDistance(Seq(leftShape, rightShape, densityFrac)), distance))) =>
Some(JoinQueryDetection(left, right, leftShape, rightShape, SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
case _ =>
None
}
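The twelve Hausdorff arms above are structurally identical (both `LessThan` and `LessThanOrEqual` map to the same `INTERSECTS`/`false` detection). As a hedged refactoring sketch, not part of this PR, a small extractor object could collapse each family; the name `HausdorffWithin` and the surrounding wiring are illustrative assumptions:

```scala
// Sketch only: collapse repeated pattern-match arms with a helper extractor.
object HausdorffWithin {
  def unapply(e: Expression): Option[(Expression, Expression, Expression)] = e match {
    // Both comparison operators are detected the same way in the arms above.
    case LessThanOrEqual(ST_HausdorffDistance(args), d) => Some((args.head, args(1), d))
    case LessThan(ST_HausdorffDistance(args), d)        => Some((args.head, args(1), d))
    case _ => None
  }
}

// Three arms would then replace twelve:
// case Some(HausdorffWithin(l, r, d)) => ...
// case Some(And(HausdorffWithin(l, r, d), _)) => ...
// case Some(And(_, HausdorffWithin(l, r, d))) => ...
```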
@@ -350,6 +350,43 @@ class BroadcastIndexJoinSuite extends TestBaseScala {
assert(distanceJoinDf.count() == expected)
})
}

it("Passed ST_HausdorffDistance with densityFrac <= distance in a broadcast join") {
val sampleCount = 100
val distance = 1.0
val densityFrac = 0.5
val polygonDf = buildPolygonDf.limit(sampleCount).repartition(3)
val pointDf = buildPointDf.limit(sampleCount).repartition(5)
val expected = bruteForceDistanceJoinHausdorff(sampleCount, distance, densityFrac, true)

var distanceJoinDF = pointDf.alias("pointDf").join(
broadcast(polygonDf).alias("polygonDf"), expr(s"ST_HausdorffDistance(pointDf.pointshape, polygonDf.polygonshape, $densityFrac) <= $distance"))
assert(distanceJoinDF.queryExecution.sparkPlan.collect{case p: BroadcastIndexJoinExec => p}.size == 1)
assert(distanceJoinDF.count() == expected)

distanceJoinDF = broadcast(pointDf).alias("pointDf").join(polygonDf.alias("polygonDf"), expr(s"ST_HausdorffDistance(pointDf.pointshape, polygonDf.polygonshape, $densityFrac) <= $distance"))

assert(distanceJoinDF.queryExecution.sparkPlan.collect { case p: BroadcastIndexJoinExec => p }.size == 1)
assert(distanceJoinDF.count() == expected)
}

it("Passed ST_HausdorffDistance <= distance in a broadcast join") {
val sampleCount = 200
val distance = 2.0
val polygonDf = buildPolygonDf.limit(sampleCount).repartition(3)
val pointDf = buildPointDf.limit(sampleCount).repartition(5)
val expected = bruteForceDistanceJoinHausdorff(sampleCount, distance, 0, true)

var distanceJoinDF = pointDf.alias("pointDf").join(
broadcast(polygonDf).alias("polygonDf"), expr(s"ST_HausdorffDistance(pointDf.pointshape, polygonDf.polygonshape) <= $distance"))
assert(distanceJoinDF.queryExecution.sparkPlan.collect { case p: BroadcastIndexJoinExec => p }.size == 1)
assert(distanceJoinDF.count() == expected)

distanceJoinDF = broadcast(pointDf).alias("pointDf").join(polygonDf.alias("polygonDf"), expr(s"ST_HausdorffDistance(pointDf.pointshape, polygonDf.polygonshape) <= $distance"))

assert(distanceJoinDF.queryExecution.sparkPlan.collect { case p: BroadcastIndexJoinExec => p }.size == 1)
assert(distanceJoinDF.count() == expected)
}
}

describe("Sedona-SQL Broadcast Index Join Test for left semi joins") {
@@ -21,6 +21,7 @@ package org.apache.sedona.sql
import com.google.common.math.DoubleMath
import org.apache.log4j.{Level, Logger}
import org.apache.sedona.common.sphere.{Haversine, Spheroid}
import org.apache.sedona.common.Functions.hausdorffDistance
import org.apache.sedona.spark.SedonaContext
import org.apache.spark.sql.DataFrame
import org.locationtech.jts.geom.{CoordinateSequence, CoordinateSequenceComparator}
@@ -117,4 +118,25 @@ trait TestBaseScala extends FunSpec with BeforeAndAfterAll {
}).sum
}

protected def bruteForceDistanceJoinHausdorff(sampleCount: Int, distance: Double, densityFrac: Double, intersects: Boolean): Int = {
val inputPolygon = buildPolygonDf.limit(sampleCount).collect()
val inputPoint = buildPointDf.limit(sampleCount).collect()
inputPoint.map(row => {
val point = row.getAs[org.locationtech.jts.geom.Point](0)
inputPolygon.map(row => {
val polygon = row.getAs[org.locationtech.jts.geom.Polygon](0)
// densityFrac == 0 means "not specified": use the two-argument variant.
val dist = if (densityFrac == 0) hausdorffDistance(point, polygon) else hausdorffDistance(point, polygon, densityFrac)
// intersects == true models the <= predicate; false models strict <.
val within = if (intersects) dist <= distance else dist < distance
if (within) 1 else 0
}).sum
}).sum
}
}
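The brute-force helper above ultimately delegates to JTS's discrete Hausdorff distance. A standalone sketch of the underlying computation; the WKT geometries here are illustrative, not test data from this PR:

```scala
import org.locationtech.jts.io.WKTReader
import org.locationtech.jts.algorithm.distance.DiscreteHausdorffDistance

val reader = new WKTReader()
val a = reader.read("LINESTRING (0 0, 2 0)")
val b = reader.read("LINESTRING (0 1, 2 1)")

// Discrete Hausdorff distance, sampled at the geometries' vertices.
val d = DiscreteHausdorffDistance.distance(a, b)

// With a densify fraction, each segment is subdivided first, tightening the
// discrete approximation toward the true Hausdorff distance.
val dDense = DiscreteHausdorffDistance.distance(a, b, 0.25)
```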
@@ -403,7 +403,41 @@ class predicateJoinTestScala extends TestBaseScala {
assert(distanceJoinDf.queryExecution.sparkPlan.collect { case p: DistanceJoinExec => p }.size === 1)
assert(distanceJoinDf.count() == expected)
})
}

it("Passed ST_HausdorffDistance in a spatial join") {
val sampleCount = 100
val distanceCandidates = Seq(1, 2)
val densityFrac = 0.6
val inputPoint = buildPointDf.limit(sampleCount).repartition(5)
val inputPolygon = buildPolygonDf.limit(sampleCount).repartition(3)

distanceCandidates.foreach(distance => {

//DensityFrac specified, <= distance
val expectedDensityIntersects = bruteForceDistanceJoinHausdorff(sampleCount, distance, densityFrac, true)
val distanceDensityIntersectsDF = inputPoint.alias("pointDF").join(inputPolygon.alias("polygonDF"), expr(s"ST_HausdorffDistance(pointDF.pointshape, polygonDF.polygonshape, $densityFrac) <= $distance"))
assert(distanceDensityIntersectsDF.queryExecution.sparkPlan.collect { case p: DistanceJoinExec => p }.size === 1)
assert(distanceDensityIntersectsDF.count() == expectedDensityIntersects)
Member: I wonder what the number of expected intersects is in these test cases? Can you report the numbers here?

Contributor (author): Sure, both intersects (<=) and non-intersects (<) return a count of 100 for this dataset. The polygon-point pairs are also exactly the same, since there is no pair with hausdorffDistance == 1.

Member: Can you add more distanceCandidates? Most importantly, add some candidates that lead to results > 100.

Contributor (author): Sure.

Contributor (author): Have added candidates 1, 2, 5, 10. These yield results of 100, 298, 688, and 1258 respectively for a sample size of 100.


//DensityFrac specified, < distance
val expectedDensityNoIntersect = bruteForceDistanceJoinHausdorff(sampleCount, distance, densityFrac, false)
val distanceDensityNoIntersectDF = inputPoint.alias("pointDF").join(inputPolygon.alias("polygonDF"), expr(s"ST_HausdorffDistance(pointDF.pointshape, polygonDF.polygonshape, $densityFrac) < $distance"))
assert(distanceDensityNoIntersectDF.queryExecution.sparkPlan.collect { case p: DistanceJoinExec => p }.size === 1)
assert(distanceDensityNoIntersectDF.count() == expectedDensityNoIntersect)

//DensityFrac not specified, <= distance
val expectedDefaultIntersects = bruteForceDistanceJoinHausdorff(sampleCount, distance, 0.0, true)
val distanceDefaultIntersectsDF = inputPoint.alias("pointDF").join(inputPolygon.alias("polygonDF"), expr(s"ST_HausdorffDistance(pointDF.pointshape, polygonDF.polygonshape) <= $distance"))
assert(distanceDefaultIntersectsDF.queryExecution.sparkPlan.collect { case p: DistanceJoinExec => p }.size === 1)
assert(distanceDefaultIntersectsDF.count() == expectedDefaultIntersects)

//DensityFrac not specified, < distance
val expectedDefaultNoIntersects = bruteForceDistanceJoinHausdorff(sampleCount, distance, 0.0, false)
val distanceDefaultNoIntersectsDF = inputPoint.alias("pointDF").join(inputPolygon.alias("polygonDF"), expr(s"ST_HausdorffDistance(pointDF.pointshape, polygonDF.polygonshape) < $distance"))
assert(distanceDefaultNoIntersectsDF.queryExecution.sparkPlan.collect { case p: DistanceJoinExec => p }.size === 1)
assert(distanceDefaultNoIntersectsDF.count() == expectedDefaultNoIntersects)
})
}
}
}