Skip to content

Commit

Permalink
[SEDONA-151] Add ST aggregators to Sedona Flink (#672)
Browse files Browse the repository at this point in the history
  • Loading branch information
jiayuasu authored Aug 23, 2022
1 parent 547ff40 commit d7b1382
Show file tree
Hide file tree
Showing 11 changed files with 388 additions and 41 deletions.
27 changes: 27 additions & 0 deletions docs/api/flink/Aggregator.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
## ST_Envelope_Aggr

Introduction: Return the envelope (the rectangular bounding box) that encloses all geometries in A.

Format: `ST_Envelope_Aggr (A:geometryColumn)`

Since: `v1.3.0`

SQL example:
```SQL
SELECT ST_Envelope_Aggr(pointdf.arealandmark)
FROM pointdf
```

## ST_Union_Aggr

Introduction: Return the polygon union of all polygons in A. All inputs must be polygons.

Format: `ST_Union_Aggr (A:geometryColumn)`

Since: `v1.3.0`

SQL example:
```SQL
SELECT ST_Union_Aggr(polygondf.polygonshape)
FROM polygondf
```
8 changes: 5 additions & 3 deletions docs/api/flink/Overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ SedonaSQL supports SQL/MM Part3 Spatial SQL Standard. Please read the programmin
Sedona includes SQL operators as follows.

* Constructor: Construct a Geometry given an input string or coordinates
* Example: ST_GeomFromWKT (string). Create a Geometry from a WKT String.
* Example: ST_GeomFromWKT (string). Create a Geometry from a WKT String.
* Function: Execute a function on the given column or columns
* Example: ST_Distance (A, B). Given two Geometry A and B, return the Euclidean distance of A and B.
* Example: ST_Distance (A, B). Given two Geometry A and B, return the Euclidean distance of A and B.
* Aggregator: Return a single aggregated value on the given column
* Example: ST_Envelope_Aggr (Geometry column). Given a Geometry column, calculate the entire envelope boundary of this column.
* Predicate: Evaluate a logical condition on the given columns and return true or false
* Example: ST_Contains (A, B). Check if A fully contains B. Return "True" if yes, else return "False".
* Example: ST_Contains (A, B). Check if A fully contains B. Return "True" if yes, else return "False".
2 changes: 2 additions & 0 deletions flink/src/main/java/org/apache/sedona/flink/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
public class Catalog {
public static UserDefinedFunction[] getFuncs() {
return new UserDefinedFunction[]{
new Aggregators.ST_Envelope_Aggr(),
new Aggregators.ST_Union_Aggr(),
new Constructors.ST_Point(),
new Constructors.ST_PointFromText(),
new Constructors.ST_LineStringFromText(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sedona.flink.expressions;

import org.apache.flink.table.annotation.DataTypeHint;
import org.locationtech.jts.geom.Geometry;

/**
 * Mutable accumulators of structured type used by the aggregate functions.
 */
public class Accumulators {

    /**
     * Running axis-aligned bounding box.
     *
     * <p>The bounds start "inverted" (minimums at the largest finite double,
     * maximums at the most negative finite double) so that the first value
     * folded in with {@code Math.min}/{@code Math.max} always replaces them.
     */
    public static class Envelope {
        public double minX = Double.MAX_VALUE;
        public double minY = Double.MAX_VALUE;
        // Double.MIN_VALUE is the smallest POSITIVE double (~4.9e-324), not the
        // most negative one. Seeding the maximums with it would make
        // Math.max(...) ignore every negative coordinate, so use
        // -Double.MAX_VALUE instead.
        public double maxX = -Double.MAX_VALUE;
        public double maxY = -Double.MAX_VALUE;

        /**
         * Restores the inverted initial bounds so the accumulator can be reused.
         */
        void reset() {
            minX = Double.MAX_VALUE;
            minY = Double.MAX_VALUE;
            maxX = -Double.MAX_VALUE;
            maxY = -Double.MAX_VALUE;
        }
    }

    /**
     * Accumulator holding a single geometry; {@code geom} is {@code null}
     * until a value has been stored.
     */
    public static class AccGeometry {
        @DataTypeHint(value = "RAW", bridgedTo = Geometry.class)
        public Geometry geom;
    }

    /**
     * Accumulator holding two geometries.
     */
    public static class AccGeometry2 {
        public Geometry geom1;
        public Geometry geom2;
    }

    /**
     * Accumulator holding a fixed-size array of geometries.
     */
    public static class AccGeometryN {
        public Geometry[] geoms;
        public int numGeoms;

        /**
         * Allocates the geometry array.
         *
         * @param numGeoms length of the {@code geoms} array
         */
        AccGeometryN(int numGeoms) {
            this.geoms = new Geometry[numGeoms];
            this.numGeoms = numGeoms;
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sedona.flink.expressions;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.AggregateFunction;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.Envelope;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.geom.GeometryFactory;

/**
 * Spatial aggregate functions exposed to Flink.
 */
public class Aggregators {

    /**
     * Computes the rectangular boundary (envelope) of a number of geometries.
     */
    @DataTypeHint(value = "RAW", bridgedTo = Geometry.class)
    public static class ST_Envelope_Aggr extends AggregateFunction<Geometry, Accumulators.Envelope> {

        /**
         * Builds a closed rectangular polygon from the given bounds.
         *
         * @param minX lower X bound
         * @param minY lower Y bound
         * @param maxX upper X bound
         * @param maxY upper Y bound
         * @return a polygon whose shell is the five-point closed ring of the rectangle
         */
        Geometry createPolygon(double minX, double minY, double maxX, double maxY) {
            Coordinate[] coords = new Coordinate[5];
            coords[0] = new Coordinate(minX, minY);
            coords[1] = new Coordinate(minX, maxY);
            coords[2] = new Coordinate(maxX, maxY);
            coords[3] = new Coordinate(maxX, minY);
            // Close the ring by repeating the first vertex.
            coords[4] = coords[0];
            GeometryFactory geomFact = new GeometryFactory();
            return geomFact.createPolygon(coords);
        }

        @Override
        public Accumulators.Envelope createAccumulator() {
            return new Accumulators.Envelope();
        }

        /**
         * @param acc the accumulated bounds
         * @return the rectangle described by the accumulated bounds
         */
        @Override
        @DataTypeHint(value = "RAW", bridgedTo = Geometry.class)
        public Geometry getValue(Accumulators.Envelope acc) {
            return createPolygon(acc.minX, acc.minY, acc.maxX, acc.maxY);
        }

        /**
         * Expands the accumulated bounds to cover the given geometry.
         * {@code null} inputs and empty geometries are ignored.
         *
         * @param acc the accumulator to update
         * @param o   the input value, expected to be a {@link Geometry}
         */
        public void accumulate(Accumulators.Envelope acc,
                               @DataTypeHint(value = "RAW", bridgedTo = Geometry.class) Object o) {
            if (o == null) {
                // A NULL row value contributes nothing to the aggregate.
                return;
            }
            Envelope envelope = ((Geometry) o).getEnvelopeInternal();
            if (envelope.isNull()) {
                // An empty geometry has a "null" envelope whose getters return
                // placeholder values (0 / -1); folding those in would corrupt
                // the running bounds.
                return;
            }
            acc.minX = Math.min(acc.minX, envelope.getMinX());
            acc.minY = Math.min(acc.minY, envelope.getMinY());
            acc.maxX = Math.max(acc.maxX, envelope.getMaxX());
            acc.maxY = Math.max(acc.maxY, envelope.getMaxY());
        }

        /**
         * Retraction is not implemented. With JVM assertions enabled this
         * fails; with assertions disabled it leaves the accumulator untouched.
         *
         * TODO: find an efficient algorithm to incrementally and decrementally update the accumulator
         *
         * @param acc the accumulator
         * @param o   the value to retract
         */
        public void retract(Accumulators.Envelope acc,
                            @DataTypeHint(value = "RAW", bridgedTo = Geometry.class) Object o) {
            assert (false);
        }

        /**
         * Folds the bounds of every accumulator in {@code it} into {@code acc}.
         *
         * @param acc the accumulator receiving the merged bounds
         * @param it  the accumulators to merge in
         */
        public void merge(Accumulators.Envelope acc, Iterable<Accumulators.Envelope> it) {
            for (Accumulators.Envelope a : it) {
                acc.minX = Math.min(acc.minX, a.minX);
                acc.minY = Math.min(acc.minY, a.minY);
                acc.maxX = Math.max(acc.maxX, a.maxX);
                acc.maxY = Math.max(acc.maxY, a.maxY);
            }
        }

        public void resetAccumulator(Accumulators.Envelope acc) {
            acc.reset();
        }
    }

    /**
     * Computes the union of a number of geometries.
     */
    @DataTypeHint(value = "RAW", bridgedTo = Geometry.class)
    public static class ST_Union_Aggr extends AggregateFunction<Geometry, Accumulators.AccGeometry> {

        @Override
        public Accumulators.AccGeometry createAccumulator() {
            return new Accumulators.AccGeometry();
        }

        /**
         * @param acc the accumulator
         * @return the accumulated union, or {@code null} if nothing has been accumulated
         */
        @Override
        @DataTypeHint(value = "RAW", bridgedTo = Geometry.class)
        public Geometry getValue(Accumulators.AccGeometry acc) {
            return acc.geom;
        }

        /**
         * Unions the given geometry into the accumulator. {@code null} inputs
         * are ignored.
         *
         * @param acc the accumulator to update
         * @param o   the input value, expected to be a {@link Geometry}
         */
        public void accumulate(Accumulators.AccGeometry acc,
                               @DataTypeHint(value = "RAW", bridgedTo = Geometry.class) Object o) {
            if (o == null) {
                // Passing null to union(...) would throw; a NULL row value
                // simply contributes nothing.
                return;
            }
            Geometry input = (Geometry) o;
            if (acc.geom == null) {
                acc.geom = input;
            } else {
                acc.geom = acc.geom.union(input);
            }
        }

        /**
         * Retraction is not implemented. With JVM assertions enabled this
         * fails; with assertions disabled it leaves the accumulator untouched.
         *
         * TODO: find an efficient algorithm to incrementally and decrementally update the accumulator
         *
         * @param acc the accumulator
         * @param o   the value to retract
         */
        public void retract(Accumulators.AccGeometry acc,
                            @DataTypeHint(value = "RAW", bridgedTo = Geometry.class) Object o) {
            assert (false);
        }

        /**
         * Unions the geometry of every accumulator in {@code it} into
         * {@code acc}. Accumulators that hold no geometry are skipped.
         *
         * @param acc the accumulator receiving the merged union
         * @param it  the accumulators to merge in
         */
        public void merge(Accumulators.AccGeometry acc, Iterable<Accumulators.AccGeometry> it) {
            for (Accumulators.AccGeometry a : it) {
                if (a.geom == null) {
                    // Nothing was accumulated on that side; union(null) would throw.
                    continue;
                }
                if (acc.geom == null) {
                    // First non-empty partial result: adopt it as-is.
                    acc.geom = a.geom;
                } else {
                    acc.geom = acc.geom.union(a.geom);
                }
            }
        }

        public void resetAccumulator(Accumulators.AccGeometry acc) {
            acc.geom = null;
        }
    }
}


6 changes: 3 additions & 3 deletions flink/src/test/java/org/apache/sedona/flink/AdapterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ public void testTableToDS()
$(polygonColNames[0])).as(polygonColNames[0]),
$(polygonColNames[1]));
Row result = last(geomTable);
assertEquals(data.get(data.size() - 1).toString(), result.toString());
assertEquals(data.get(data.size() - 1).getField(0).toString(), result.getField(0).toString());
// GeomTable to GeomDS
DataStream<Row> geomStream = tableEnv.toDataStream(geomTable);
assertEquals(data.get(0).toString(), geomStream.executeAndCollect(1).get(0).toString());
assertEquals(data.get(0).getField(0).toString(), geomStream.executeAndCollect(1).get(0).getField(0).toString());
// GeomDS to GeomTable
geomTable = tableEnv.fromDataStream(geomStream);
result = last(geomTable);
assertEquals(data.get(data.size() - 1).toString(), result.toString());
assertEquals(data.get(data.size() - 1).getField(0).toString(), result.getField(0).toString());
}
}
65 changes: 65 additions & 0 deletions flink/src/test/java/org/apache/sedona/flink/AggregatorTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sedona.flink;

import org.apache.flink.table.api.*;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.sedona.flink.expressions.Functions;
import org.junit.BeforeClass;
import org.junit.Test;
import org.locationtech.jts.geom.Polygon;

import static org.apache.flink.table.api.Expressions.*;
import static org.junit.Assert.assertEquals;

/**
 * Tests for the spatial aggregate functions registered with the Flink table
 * environment, plus a nearest-neighbour style query built on ST_Distance.
 */
public class AggregatorTest extends TestBase {

    @BeforeClass
    public static void onceExecutedBeforeAll() {
        initialize();
    }

    /**
     * The envelope of the generated point table is compared against a
     * rectangle anchored at the origin with side {@code testDataSize - 1}.
     */
    @Test
    public void testEnvelop_Aggr() {
        Table points = createPointTable(testDataSize);
        Table aggregated = points.select(call("ST_Envelope_Aggr", $(pointColNames[0])));
        Row finalRow = last(aggregated);
        // The same upper bound appears four times in the expected WKT.
        Object upper = testDataSize - 1;
        String expectedWkt = String.format("POLYGON ((0 0, 0 %s, %s %s, %s 0, 0 0))", upper, upper, upper, upper);
        assertEquals(expectedWkt, finalRow.getField(0).toString());
    }

    /**
     * Ranks points by distance to the origin and keeps the five closest.
     */
    @Test
    public void testKNN() {
        Table points = createPointTable(testDataSize);
        String distanceFunction = Functions.ST_Distance.class.getSimpleName();
        Table withDistance = points.select(
                $(pointColNames[0]),
                call(distanceFunction, $(pointColNames[0]), call("ST_GeomFromWKT", "POINT (0 0)")).as("distance"));
        tableEnv.createTemporaryView(pointTableName, withDistance);

        String query = "SELECT distance, " + pointColNames[0] + " " +
                "FROM (" +
                "SELECT *, ROW_NUMBER() OVER (ORDER BY distance ASC) AS row_num " +
                "FROM " + pointTableName +
                ")" +
                "WHERE row_num <= 5";
        Table nearest = tableEnv.sqlQuery(query);

        Object closestDistance = first(nearest).getField(0);
        assertEquals(0.0, closestDistance);
        Object fifthDistance = last(nearest).getField(0);
        assertEquals(5.656854249492381, fifthDistance);
    }

    /**
     * The union of the overlapping polygon table is checked by its area.
     */
    @Test
    public void testUnion_Aggr() {
        Table polygons = createPolygonOverlappingTable(testDataSize);
        Table aggregated = polygons.select(call("ST_Union_Aggr", $(polygonColNames[0])));
        Row finalRow = last(aggregated);
        Polygon unioned = (Polygon) finalRow.getField(0);
        assertEquals(1001, unioned.getArea(), 0);
    }
}
Loading

0 comments on commit d7b1382

Please sign in to comment.