Skip to content

Commit

Permalink
[SEDONA-189] Prepare geometries in broadcast join (#708)
Browse files Browse the repository at this point in the history
Co-authored-by: Tanel Kiis <tanel.kiis@reach-u.com>
  • Loading branch information
tanelk and Tanel Kiis authored Nov 5, 2022
1 parent 20283d6 commit 0004580
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.sedona.core.spatialOperator;

import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.geom.prep.PreparedGeometry;

import java.io.Serializable;

Expand All @@ -32,60 +33,99 @@ private SpatialPredicateEvaluators() {}
*/
public interface SpatialPredicateEvaluator extends Serializable {
    /**
     * Tests the spatial predicate with {@code left} as the first operand and
     * {@code right} as the second.
     *
     * @param left  first operand of the predicate
     * @param right second operand of the predicate
     * @return {@code true} when the predicate holds for the pair
     */
    boolean eval(Geometry left, Geometry right);

    /**
     * Overload of {@link #eval(Geometry, Geometry)} whose first operand is
     * supplied as a {@link PreparedGeometry}.
     *
     * @param left  first operand of the predicate, in prepared form
     * @param right second operand of the predicate
     * @return {@code true} when the predicate holds for the pair
     */
    boolean eval(PreparedGeometry left, Geometry right);
}

/**
 * Evaluator for the "contains" predicate: {@code left} contains {@code right}.
 */
public interface ContainsEvaluator extends SpatialPredicateEvaluator {
    /** Plain-geometry variant; delegates to {@code left.contains(right)}. */
    default boolean eval(Geometry left, Geometry right) {
        return left.contains(right);
    }

    /** Prepared-geometry variant; delegates to the prepared {@code left.contains(right)}. */
    default boolean eval(PreparedGeometry left, Geometry right) {
        return left.contains(right);
    }
}

/**
 * Evaluator for the "intersects" predicate: {@code left} intersects {@code right}.
 */
public interface IntersectsEvaluator extends SpatialPredicateEvaluator {
    /** Plain-geometry variant; delegates to {@code left.intersects(right)}. */
    default boolean eval(Geometry left, Geometry right) {
        return left.intersects(right);
    }

    /** Prepared-geometry variant; delegates to the prepared {@code left.intersects(right)}. */
    default boolean eval(PreparedGeometry left, Geometry right) {
        return left.intersects(right);
    }
}

/**
 * Evaluator for the "within" predicate: {@code left} is within {@code right}.
 */
public interface WithinEvaluator extends SpatialPredicateEvaluator {
    /** Plain-geometry variant; delegates to {@code left.within(right)}. */
    default boolean eval(Geometry left, Geometry right) {
        return left.within(right);
    }

    /** Prepared-geometry variant; delegates to the prepared {@code left.within(right)}. */
    default boolean eval(PreparedGeometry left, Geometry right) {
        return left.within(right);
    }
}

/**
 * Evaluator for the "covers" predicate: {@code left} covers {@code right}.
 */
public interface CoversEvaluator extends SpatialPredicateEvaluator {
    /** Plain-geometry variant; delegates to {@code left.covers(right)}. */
    default boolean eval(Geometry left, Geometry right) {
        return left.covers(right);
    }

    /** Prepared-geometry variant; delegates to the prepared {@code left.covers(right)}. */
    default boolean eval(PreparedGeometry left, Geometry right) {
        return left.covers(right);
    }
}

/**
 * Evaluator for the "covered by" predicate: {@code left} is covered by {@code right}.
 */
public interface CoveredByEvaluator extends SpatialPredicateEvaluator {
    /** Plain-geometry variant; delegates to {@code left.coveredBy(right)}. */
    default boolean eval(Geometry left, Geometry right) {
        return left.coveredBy(right);
    }

    /** Prepared-geometry variant; delegates to the prepared {@code left.coveredBy(right)}. */
    default boolean eval(PreparedGeometry left, Geometry right) {
        return left.coveredBy(right);
    }
}

/**
 * Evaluator for the "touches" predicate: {@code left} touches {@code right}.
 */
public interface TouchesEvaluator extends SpatialPredicateEvaluator {
    /** Plain-geometry variant; delegates to {@code left.touches(right)}. */
    default boolean eval(Geometry left, Geometry right) {
        return left.touches(right);
    }

    /** Prepared-geometry variant; delegates to the prepared {@code left.touches(right)}. */
    default boolean eval(PreparedGeometry left, Geometry right) {
        return left.touches(right);
    }
}

/**
 * Evaluator for the "overlaps" predicate: {@code left} overlaps {@code right}.
 */
public interface OverlapsEvaluator extends SpatialPredicateEvaluator {
    /** Plain-geometry variant; delegates to {@code left.overlaps(right)}. */
    default boolean eval(Geometry left, Geometry right) {
        return left.overlaps(right);
    }

    /** Prepared-geometry variant; delegates to the prepared {@code left.overlaps(right)}. */
    default boolean eval(PreparedGeometry left, Geometry right) {
        return left.overlaps(right);
    }
}

/**
 * Evaluator for the "crosses" predicate: {@code left} crosses {@code right}.
 */
public interface CrossesEvaluator extends SpatialPredicateEvaluator {
    /** Plain-geometry variant; delegates to {@code left.crosses(right)}. */
    default boolean eval(Geometry left, Geometry right) {
        return left.crosses(right);
    }

    /** Prepared-geometry variant; delegates to the prepared {@code left.crosses(right)}. */
    default boolean eval(PreparedGeometry left, Geometry right) {
        return left.crosses(right);
    }
}

/**
 * Evaluator for the "equals" predicate, decided by checking that the symmetric
 * difference of the two operands is empty.
 */
public interface EqualsEvaluator extends SpatialPredicateEvaluator {
    /** Plain-geometry variant: true when {@code left.symDifference(right)} is empty. */
    default boolean eval(Geometry left, Geometry right) {
        final Geometry difference = left.symDifference(right);
        return difference.isEmpty();
    }

    /**
     * Prepared-geometry variant: unwraps the underlying geometry with
     * {@code getGeometry()} and applies the same symmetric-difference check.
     */
    default boolean eval(PreparedGeometry left, Geometry right) {
        final Geometry base = left.getGeometry();
        final Geometry difference = base.symDifference(right);
        return difference.isEmpty();
    }
}

/** Concrete class that adds nothing of its own; both eval overloads come from {@link ContainsEvaluator}. */
private static class ConcreteContainsEvaluator implements ContainsEvaluator {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,11 @@ import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.sedona_sql.execution.SedonaBinaryExecNode
import org.locationtech.jts.geom.Geometry
import org.locationtech.jts.geom.prep.{PreparedGeometry, PreparedGeometryFactory}
import org.locationtech.jts.index.SpatialIndex

import scala.collection.mutable

case class BroadcastIndexJoinExec(left: SparkPlan,
right: SparkPlan,
streamShape: Expression,
Expand Down Expand Up @@ -81,22 +84,30 @@ case class BroadcastIndexJoinExec(left: SparkPlan,
// override def simpleString: String = super.simpleString + s" $spatialExpression" // SPARK2 anchor

// Joins every geometry of `spatialRdd` against the broadcast spatial index and yields
// (candidate, row) pairs that the predicate evaluator accepts.
// NOTE(review): this span looks like a rendered diff that interleaves the pre-change
// (per-row flatMap) body with the post-change (mapPartitions) body; it is not compilable
// as shown. Confirm against the repository before relying on it.
private def windowBroadcastJoin(index: Broadcast[SpatialIndex], spatialRdd: SpatialRDD[Geometry]): RDD[(Geometry, Geometry)] = {
// NOTE(review): presumably the pre-change body (one flatMap call per row) — verify.
spatialRdd.getRawSpatialRDD.rdd.flatMap { row =>
val candidates = index.value.query(row.getEnvelopeInternal).iterator.asScala.asInstanceOf[Iterator[Geometry]]
// mapPartitions variant: the factory, cache and evaluator below are created once per
// invocation of this closure and shared by every row handled in rows.flatMap.
spatialRdd.getRawSpatialRDD.rdd.mapPartitions { rows =>
val factory = new PreparedGeometryFactory()
// Cache of prepared geometries keyed by candidate geometry; filled lazily via getOrElseUpdate.
val preparedGeometries = new mutable.HashMap[Geometry, PreparedGeometry]
val evaluator = SpatialPredicateEvaluators.create(spatialPredicate)
// NOTE(review): presumably the pre-change filter/map, evaluating the unprepared candidate — verify.
candidates
.filter(candidate => evaluator.eval(candidate, row))
.map(candidate => (candidate, row))
rows.flatMap { row =>
// Query the broadcast index with the row's envelope; results are cast to Iterator[Geometry].
val candidates = index.value.query(row.getEnvelopeInternal).iterator.asScala.asInstanceOf[Iterator[Geometry]]
// The candidate is prepared on first use (then served from the cache) and passed as the
// left operand; the streamed row is the right operand.
candidates
.filter(candidate => evaluator.eval(preparedGeometries.getOrElseUpdate(candidate, { factory.create(candidate) }), row))
.map(candidate => (candidate, row))
}
}
}

// Joins every geometry of `spatialRdd` against the broadcast spatial index and yields
// (row, candidate) pairs that the predicate evaluator accepts.
// NOTE(review): this span looks like a rendered diff that interleaves the pre-change
// (per-row flatMap) body with the post-change (mapPartitions) body; it is not compilable
// as shown. Confirm against the repository before relying on it.
private def objectBroadcastJoin(index: Broadcast[SpatialIndex], spatialRdd: SpatialRDD[Geometry]): RDD[(Geometry, Geometry)] = {
// NOTE(review): presumably the pre-change body, evaluating eval(row, candidate) with the
// original predicate — verify.
spatialRdd.getRawSpatialRDD.rdd.flatMap { row =>
val candidates = index.value.query(row.getEnvelopeInternal).iterator.asScala.asInstanceOf[Iterator[Geometry]]
val evaluator = SpatialPredicateEvaluators.create(spatialPredicate)
candidates
.filter(candidate => evaluator.eval(row, candidate))
.map(candidate => (row, candidate))
// mapPartitions variant: the factory, cache and evaluator below are created once per
// invocation of this closure and shared by every row handled in rows.flatMap.
spatialRdd.getRawSpatialRDD.rdd.mapPartitions { rows =>
val factory = new PreparedGeometryFactory()
// Cache of prepared geometries keyed by candidate geometry; filled lazily via getOrElseUpdate.
val preparedGeometries = new mutable.HashMap[Geometry, PreparedGeometry]
// The evaluator is built from SpatialPredicate.inverse(spatialPredicate) because the operands
// are passed below as (candidate, row) while the emitted pair stays (row, candidate).
// NOTE(review): presumably inverse() yields the predicate with operand roles swapped — confirm.
val evaluator = SpatialPredicateEvaluators.create(SpatialPredicate.inverse(spatialPredicate))
rows.flatMap { row =>
// Query the broadcast index with the row's envelope; results are cast to Iterator[Geometry].
val candidates = index.value.query(row.getEnvelopeInternal).iterator.asScala.asInstanceOf[Iterator[Geometry]]
// The candidate is prepared on first use (then served from the cache) and passed as the
// left operand; the streamed row is the right operand.
candidates
.filter(candidate => evaluator.eval(preparedGeometries.getOrElseUpdate(candidate, { factory.create(candidate) }), row))
.map(candidate => (row, candidate))
}
}
}

Expand Down

0 comments on commit 0004580

Please sign in to comment.