diff --git a/docs/api/flink/Aggregator.md b/docs/api/flink/Aggregator.md
new file mode 100644
index 0000000000..e886ded6c7
--- /dev/null
+++ b/docs/api/flink/Aggregator.md
@@ -0,0 +1,27 @@
+## ST_Envelope_Aggr
+
+Introduction: Return the entire envelope boundary of all geometries in A
+
+Format: `ST_Envelope_Aggr (A:geometryColumn)`
+
+Since: `v1.3.0`
+
+SQL example:
+```SQL
+SELECT ST_Envelope_Aggr(pointdf.arealandmark)
+FROM pointdf
+```
+
+## ST_Union_Aggr
+
+Introduction: Return the polygon union of all polygons in A. All inputs must be polygons.
+
+Format: `ST_Union_Aggr (A:geometryColumn)`
+
+Since: `v1.3.0`
+
+SQL example:
+```SQL
+SELECT ST_Union_Aggr(polygondf.polygonshape)
+FROM polygondf
+```
\ No newline at end of file
diff --git a/docs/api/flink/Overview.md b/docs/api/flink/Overview.md
index 07b5fcc135..95e1b7d629 100644
--- a/docs/api/flink/Overview.md
+++ b/docs/api/flink/Overview.md
@@ -5,8 +5,10 @@ SedonaSQL supports SQL/MM Part3 Spatial SQL Standard. Please read the programmin
Sedona includes SQL operators as follows.
* Constructor: Construct a Geometry given an input string or coordinates
- * Example: ST_GeomFromWKT (string). Create a Geometry from a WKT String.
+ * Example: ST_GeomFromWKT (string). Create a Geometry from a WKT String.
* Function: Execute a function on the given column or columns
- * Example: ST_Distance (A, B). Given two Geometry A and B, return the Euclidean distance of A and B.
+ * Example: ST_Distance (A, B). Given two Geometry A and B, return the Euclidean distance of A and B.
+* Aggregator: Return a single aggregated value on the given column
+ * Example: ST_Envelope_Aggr (Geometry column). Given a Geometry column, calculate the entire envelope boundary of this column.
* Predicate: Execute a logic judgement on the given columns and return true or false
- * Example: ST_Contains (A, B). Check if A fully contains B. Return "True" if yes, else return "False".
+ * Example: ST_Contains (A, B). Check if A fully contains B. Return "True" if yes, else return "False".
diff --git a/flink/src/main/java/org/apache/sedona/flink/Catalog.java b/flink/src/main/java/org/apache/sedona/flink/Catalog.java
index adbeb85bb3..887c8cb933 100644
--- a/flink/src/main/java/org/apache/sedona/flink/Catalog.java
+++ b/flink/src/main/java/org/apache/sedona/flink/Catalog.java
@@ -19,6 +19,8 @@
public class Catalog {
public static UserDefinedFunction[] getFuncs() {
return new UserDefinedFunction[]{
+ new Aggregators.ST_Envelope_Aggr(),
+ new Aggregators.ST_Union_Aggr(),
new Constructors.ST_Point(),
new Constructors.ST_PointFromText(),
new Constructors.ST_LineStringFromText(),
diff --git a/flink/src/main/java/org/apache/sedona/flink/expressions/Accumulators.java b/flink/src/main/java/org/apache/sedona/flink/expressions/Accumulators.java
new file mode 100644
index 0000000000..fad05837d1
--- /dev/null
+++ b/flink/src/main/java/org/apache/sedona/flink/expressions/Accumulators.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sedona.flink.expressions;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.locationtech.jts.geom.Geometry;
+
+/**
+ * Mutable accumulator of structured type for the aggregate function
+ */
+public class Accumulators {
+
+ public static class Envelope {
+ public double minX = Double.MAX_VALUE;
+ public double minY = Double.MAX_VALUE;
+ public double maxX = Double.MIN_VALUE;
+ public double maxY = Double.MIN_VALUE;
+ void reset() {
+ minX = Double.MAX_VALUE;
+ minY = Double.MAX_VALUE;
+ maxX = Double.MIN_VALUE;
+ maxY = Double.MIN_VALUE;
+ }
+ }
+ public static class AccGeometry {
+ @DataTypeHint(value = "RAW", bridgedTo = Geometry.class)
+ public Geometry geom;
+ }
+
+ public static class AccGeometry2 {
+ public Geometry geom1;
+ public Geometry geom2;
+ }
+
+ public static class AccGeometryN {
+ public Geometry[] geoms;
+ public int numGeoms;
+ AccGeometryN(int numGeoms) {
+ this.geoms = new Geometry[numGeoms];
+ this.numGeoms = numGeoms;
+ }
+ }
+}
diff --git a/flink/src/main/java/org/apache/sedona/flink/expressions/Aggregators.java b/flink/src/main/java/org/apache/sedona/flink/expressions/Aggregators.java
new file mode 100644
index 0000000000..3b019907f6
--- /dev/null
+++ b/flink/src/main/java/org/apache/sedona/flink/expressions/Aggregators.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sedona.flink.expressions;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.locationtech.jts.geom.Coordinate;
+import org.locationtech.jts.geom.Envelope;
+import org.locationtech.jts.geom.Geometry;
+import org.locationtech.jts.geom.GeometryFactory;
+
+public class Aggregators {
+ // Compute the rectangular boundary of a number of geometries
+ @DataTypeHint(value = "RAW", bridgedTo = Geometry.class)
+ public static class ST_Envelope_Aggr extends AggregateFunction {
+
+ Geometry createPolygon(double minX, double minY, double maxX, double maxY) {
+ Coordinate[] coords = new Coordinate[5];
+ coords[0] = new Coordinate(minX, minY);
+ coords[1] = new Coordinate(minX, maxY);
+ coords[2] = new Coordinate(maxX, maxY);
+ coords[3] = new Coordinate(maxX, minY);
+ coords[4] = coords[0];
+ GeometryFactory geomFact = new GeometryFactory();
+ return geomFact.createPolygon(coords);
+ }
+
+ @Override
+ public Accumulators.Envelope createAccumulator() {
+ return new Accumulators.Envelope();
+ }
+
+ @Override
+ @DataTypeHint(value = "RAW", bridgedTo = Geometry.class)
+ public Geometry getValue(Accumulators.Envelope acc) {
+ return createPolygon(acc.minX, acc.minY, acc.maxX, acc.maxY);
+ }
+
+ public void accumulate(Accumulators.Envelope acc,
+ @DataTypeHint(value = "RAW", bridgedTo = Geometry.class) Object o) {
+ Envelope envelope = ((Geometry) o).getEnvelopeInternal();
+ acc.minX = Math.min(acc.minX, envelope.getMinX());
+ acc.minY = Math.min(acc.minY, envelope.getMinY());
+ acc.maxX = Math.max(acc.maxX, envelope.getMaxX());
+ acc.maxY = Math.max(acc.maxY, envelope.getMaxY());
+ }
+
+ /**
+ * TODO: find an efficient algorithm to incrementally and decrementally update the accumulator
+ *
+ * @param acc
+ * @param o
+ */
+ public void retract(Accumulators.Envelope acc,
+ @DataTypeHint(value = "RAW", bridgedTo = Geometry.class) Object o) {
+ Geometry geometry = (Geometry) o;
+ assert(false);
+ }
+
+ public void merge(Accumulators.Envelope acc, Iterable it) {
+ for (Accumulators.Envelope a : it) {
+ acc.minX = Math.min(acc.minX, a.minX);
+ acc.minY = Math.min(acc.minY, a.minY);
+ acc.maxX = Math.max(acc.maxX, a.maxX);
+ acc.maxY = Math.max(acc.maxY, a.maxY);
+ }
+ }
+
+ public void resetAccumulator(Accumulators.Envelope acc) {
+ acc.reset();
+ }
+ }
+
+ // Compute the Union boundary of numbers of geometries
+ //
+ @DataTypeHint(value = "RAW", bridgedTo = Geometry.class)
+ public static class ST_Union_Aggr extends AggregateFunction {
+
+ @Override
+ public Accumulators.AccGeometry createAccumulator() {
+ return new Accumulators.AccGeometry();
+ }
+
+ @Override
+ @DataTypeHint(value = "RAW", bridgedTo = Geometry.class)
+ public Geometry getValue(Accumulators.AccGeometry acc) {
+ return acc.geom;
+ }
+
+ public void accumulate(Accumulators.AccGeometry acc,
+ @DataTypeHint(value = "RAW", bridgedTo = Geometry.class) Object o) {
+ if (acc.geom == null){
+ acc.geom = (Geometry) o;
+ } else {
+ acc.geom = acc.geom.union((Geometry) o);
+ }
+ }
+
+ /**
+ * TODO: find an efficient algorithm to incrementally and decrementally update the accumulator
+ *
+ * @param acc
+ * @param o
+ */
+ public void retract(Accumulators.AccGeometry acc,
+ @DataTypeHint(value = "RAW", bridgedTo = Geometry.class) Object o) {
+ Geometry geometry = (Geometry) o;
+ assert (false);
+ }
+
+ public void merge (Accumulators.AccGeometry acc, Iterable < Accumulators.AccGeometry > it){
+ for (Accumulators.AccGeometry a : it) {
+ if (acc.geom == null){
+ // make accumulate equal to acc
+ acc.geom = a.geom;
+ } else {
+ acc.geom = acc.geom.union(a.geom);
+ }
+ }
+ }
+
+ public void resetAccumulator (Accumulators.AccGeometry acc){
+ acc.geom = null;
+ }
+ }
+}
+
+
diff --git a/flink/src/test/java/org/apache/sedona/flink/AdapterTest.java b/flink/src/test/java/org/apache/sedona/flink/AdapterTest.java
index 6e429e954f..3a24730cf7 100644
--- a/flink/src/test/java/org/apache/sedona/flink/AdapterTest.java
+++ b/flink/src/test/java/org/apache/sedona/flink/AdapterTest.java
@@ -48,13 +48,13 @@ public void testTableToDS()
$(polygonColNames[0])).as(polygonColNames[0]),
$(polygonColNames[1]));
Row result = last(geomTable);
- assertEquals(data.get(data.size() - 1).toString(), result.toString());
+ assertEquals(data.get(data.size() - 1).getField(0).toString(), result.getField(0).toString());
// GeomTable to GeomDS
DataStream geomStream = tableEnv.toDataStream(geomTable);
- assertEquals(data.get(0).toString(), geomStream.executeAndCollect(1).get(0).toString());
+ assertEquals(data.get(0).getField(0).toString(), geomStream.executeAndCollect(1).get(0).getField(0).toString());
// GeomDS to GeomTable
geomTable = tableEnv.fromDataStream(geomStream);
result = last(geomTable);
- assertEquals(data.get(data.size() - 1).toString(), result.toString());
+ assertEquals(data.get(data.size() - 1).getField(0).toString(), result.getField(0).toString());
}
}
diff --git a/flink/src/test/java/org/apache/sedona/flink/AggregatorTest.java b/flink/src/test/java/org/apache/sedona/flink/AggregatorTest.java
new file mode 100644
index 0000000000..1e8e3fe30d
--- /dev/null
+++ b/flink/src/test/java/org/apache/sedona/flink/AggregatorTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sedona.flink;
+
+import org.apache.flink.table.api.*;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.sedona.flink.expressions.Functions;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.locationtech.jts.geom.Polygon;
+
+import static org.apache.flink.table.api.Expressions.*;
+import static org.junit.Assert.assertEquals;
+
+public class AggregatorTest extends TestBase{
+ @BeforeClass
+ public static void onceExecutedBeforeAll() {
+ initialize();
+ }
+
+ @Test
+ public void testEnvelop_Aggr() {
+ Table pointTable = createPointTable(testDataSize);
+ Table result = pointTable.select(call("ST_Envelope_Aggr", $(pointColNames[0])));
+ Row last = last(result);
+ assertEquals(String.format("POLYGON ((0 0, 0 %s, %s %s, %s 0, 0 0))", testDataSize - 1, testDataSize - 1,
+ testDataSize - 1, testDataSize - 1), last.getField(0).toString());
+ }
+
+ @Test
+ public void testKNN() {
+ Table pointTable = createPointTable(testDataSize);
+ pointTable = pointTable.select($(pointColNames[0]), call(Functions.ST_Distance.class.getSimpleName(), $(pointColNames[0])
+ , call("ST_GeomFromWKT", "POINT (0 0)")).as("distance"));
+ tableEnv.createTemporaryView(pointTableName, pointTable);
+ Table resultTable = tableEnv.sqlQuery("SELECT distance, " + pointColNames[0] + " " +
+ "FROM (" +
+ "SELECT *, ROW_NUMBER() OVER (ORDER BY distance ASC) AS row_num " +
+ "FROM " + pointTableName +
+ ")" +
+ "WHERE row_num <= 5");
+ assertEquals(0.0, first(resultTable).getField(0));
+ assertEquals(5.656854249492381, last(resultTable).getField(0));
+ }
+
+ @Test
+ public void testUnion_Aggr(){
+ Table polygonTable = createPolygonOverlappingTable(testDataSize);
+ Table result = polygonTable.select(call("ST_Union_Aggr", $(polygonColNames[0])));
+ Row last = last(result);
+ assertEquals(1001, ((Polygon) last.getField(0)).getArea(), 0);
+ }
+}
diff --git a/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java b/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java
index 7585b4f450..c216bc2f71 100644
--- a/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java
+++ b/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java
@@ -24,6 +24,7 @@
import org.wololo.jts2geojson.GeoJSONReader;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import static org.apache.flink.table.api.Expressions.$;
@@ -74,7 +75,7 @@ public void test2DPoint() {
public void testPointFromText() {
List data = createPointWKT(testDataSize);
Row result = last(createPointTable(testDataSize));
- assertEquals(data.get(data.size() - 1).toString(), result.toString());
+ assertEquals(data.get(data.size() - 1).getField(0).toString(), result.getField(0).toString());
}
@Test
@@ -86,7 +87,7 @@ public void testLineFromText() {
$(linestringColNames[1]));
Row result = last(lineStringTable);
- assertEquals(data.get(data.size() - 1).toString(), result.toString());
+ assertEquals(data.get(data.size() - 1).getField(0).toString(), result.getField(0).toString());
}
@Test
@@ -98,14 +99,14 @@ public void testLineStringFromText() {
$(linestringColNames[1]));
Row result = last(lineStringTable);
- assertEquals(data.get(data.size() - 1).toString(), result.toString());
+ assertEquals(data.get(data.size() - 1).getField(0).toString(), result.getField(0).toString());
}
@Test
public void testPolygonFromText() {
List data = createPolygonWKT(testDataSize);
Row result = last(createPolygonTable(testDataSize));
- assertEquals(data.get(data.size() - 1).toString(), result.toString());
+ assertEquals(data.get(data.size() - 1).getField(0).toString(), result.getField(0).toString());
}
@Test
@@ -116,7 +117,7 @@ public void testGeomFromWKT() {
$(polygonColNames[0])).as(polygonColNames[0]),
$(polygonColNames[1]));
Row result = last(geomTable);
- assertEquals(data.get(data.size() - 1).toString(), result.toString());
+ assertEquals(data.get(data.size() - 1).getField(0).toString(), result.getField(0).toString());
}
@Test
@@ -127,7 +128,7 @@ public void testGeomFromText() {
$(polygonColNames[0])).as(polygonColNames[0]),
$(polygonColNames[1]));
Row result = last(geomTable);
- assertEquals(data.get(data.size() - 1).toString(), result.toString());
+ assertEquals(data.get(data.size() - 1).getField(0).toString(), result.getField(0).toString());
}
@Test
@@ -179,7 +180,7 @@ public void testGeomFromWKBBytes()
TypeInformation>[] colTypes = {
PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO};
- RowTypeInfo typeInfo = new RowTypeInfo(colTypes, polygonColNames);
+ RowTypeInfo typeInfo = new RowTypeInfo(colTypes, Arrays.copyOfRange(polygonColNames, 0, 2));
DataStream wkbDS = env.fromCollection(data).returns(typeInfo);
Table wkbTable = tableEnv.fromDataStream(wkbDS, $(polygonColNames[0]), $(polygonColNames[1]));
@@ -199,7 +200,7 @@ public void testGeomFromWKBBytes()
public void testGeomFromGeoHash() {
Integer precision = 2;
List data = new ArrayList<>();
- data.add(Row.of("2131s12fd", "polygon"));
+ data.add(Row.of("2131s12fd", "polygon", 0L));
Table geohashTable = createTextTable(data, polygonColNames);
Table geomTable = geohashTable
@@ -217,7 +218,7 @@ public void testGeomFromGeoHash() {
@Test
public void testGeomFromGeoHashNullPrecision() {
List data = new ArrayList<>();
- data.add(Row.of("2131s12fd", "polygon"));
+ data.add(Row.of("2131s12fd", "polygon", 0L));
Table geohashTable = createTextTable(data, polygonColNames);
Table geomTable = geohashTable
diff --git a/flink/src/test/java/org/apache/sedona/flink/FunctionTest.java b/flink/src/test/java/org/apache/sedona/flink/FunctionTest.java
index 9e3a20c4b9..1ee2a74384 100644
--- a/flink/src/test/java/org/apache/sedona/flink/FunctionTest.java
+++ b/flink/src/test/java/org/apache/sedona/flink/FunctionTest.java
@@ -54,7 +54,7 @@ public void testFlipCoordinates() {
Table pointTable = createPointTable_real(testDataSize);
Table flippedTable = pointTable.select(call(Functions.ST_FlipCoordinates.class.getSimpleName(), $(pointColNames[0])));
Geometry result = (Geometry) first(flippedTable).getField(0);
- assertEquals("POINT (-118 32)", result.toString());
+ assertEquals("POINT (-117.99 32.01)", result.toString());
}
@Test
@@ -63,7 +63,7 @@ public void testTransform() {
Table transformedTable = pointTable.select(call(Functions.ST_Transform.class.getSimpleName(), $(pointColNames[0])
, "epsg:4326", "epsg:3857"));
String result = first(transformedTable).getField(0).toString();
- assertEquals("POINT (-13135699.91360628 3763310.6271446524)", result);
+ assertEquals("POINT (-13134586.718698347 3764623.3541299687)", result);
}
@Test
@@ -107,7 +107,7 @@ public void testPointOnSurface() {
Table pointTable = createPointTable_real(testDataSize);
Table surfaceTable = pointTable.select(call(Functions.ST_PointOnSurface.class.getSimpleName(), $(pointColNames[0])));
Geometry result = (Geometry) first(surfaceTable).getField(0);
- assertEquals("POINT (32 -118)", result.toString());
+ assertEquals("POINT (32.01 -117.99)", result.toString());
}
@Test
diff --git a/flink/src/test/java/org/apache/sedona/flink/TestBase.java b/flink/src/test/java/org/apache/sedona/flink/TestBase.java
index 81e2cbd1e1..26c5f14c2f 100644
--- a/flink/src/test/java/org/apache/sedona/flink/TestBase.java
+++ b/flink/src/test/java/org/apache/sedona/flink/TestBase.java
@@ -13,6 +13,7 @@
*/
package org.apache.sedona.flink;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -30,7 +31,9 @@
import org.locationtech.jts.geom.*;
import org.wololo.jts2geojson.GeoJSONWriter;
+import java.sql.Timestamp;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import static org.apache.flink.table.api.Expressions.$;
@@ -40,11 +43,13 @@ public class TestBase {
protected static StreamExecutionEnvironment env;
protected static StreamTableEnvironment tableEnv;
static int testDataSize = 1000;
- static String[] pointColNames = {"geom_point", "name_point"};
- static String[] linestringColNames = {"geom_linestring", "name_linestring"};
- static String[] polygonColNames = {"geom_polygon", "name_polygon"};
+ static String[] pointColNames = {"geom_point", "name_point", "event_time", "proc_time"};
+ static String[] linestringColNames = {"geom_linestring", "name_linestring", "event_time", "proc_time"};
+ static String[] polygonColNames = {"geom_polygon", "name_polygon", "event_time", "proc_time"};
static String pointTableName = "point_table";
static String polygonTableName = "polygon_table";
+ static Long timestamp_base = new Timestamp(System.currentTimeMillis()).getTime();
+ static Long time_interval = 1L; // Generate a record per this interval. Unit is second
public void setTestDataSize(int testDataSize) {
this.testDataSize = testDataSize;
@@ -68,7 +73,7 @@ static List createPointText(int size){
List data = new ArrayList<>();
for (int i = 0; i < size; i++) {
// Create a numer of points (1, 1) (2, 2) ...
- data.add(Row.of(i + "," + i, "point" + i));
+ data.add(Row.of(i + "," + i, "point" + i, timestamp_base + time_interval * 1000 * i));
}
return data;
}
@@ -86,10 +91,13 @@ static List creatPoint(int size){
// Simulate some points in the US
static List createPointText_real(int size){
List data = new ArrayList<>();
- for (double i = 0; i < 10.0; i = i + 10.0/size) {
- double x = 32.0 + i;
- double y = -118.0 + i;
- data.add(Row.of(x + "," + y, "point"));
+ double x = 32.0;
+ double y = -118.0;
+ double increment = 10.0/size;
+ for (int i = 0; i < size; i++) {
+ x += increment;
+ y += increment;
+ data.add(Row.of(x + "," + y, "point" + i, timestamp_base + time_interval * 1000 * i));
}
return data;
}
@@ -98,7 +106,7 @@ static List createPointWKT(int size){
List data = new ArrayList<>();
for (int i = 0; i < size; i++) {
// Create a numer of points (1, 1) (2, 2) ...
- data.add(Row.of("POINT (" + i + " " + i +")", "point" + i));
+ data.add(Row.of("POINT (" + i + " " + i +")", "point" + i, timestamp_base + time_interval * 1000 * i));
}
return data;
}
@@ -118,7 +126,7 @@ static List createPolygonText(int size) {
polygon.add(maxX);polygon.add(maxY);
polygon.add(maxX);polygon.add(minY);
polygon.add(minX);polygon.add(minY);
- data.add(Row.of(String.join(",", polygon), "polygon" + i));
+ data.add(Row.of(String.join(",", polygon), "polygon" + i, timestamp_base + time_interval * 1000 * i));
}
return data;
}
@@ -138,7 +146,7 @@ static List createLineStringText(int size) {
linestring.add(maxX);
linestring.add(maxY);
- data.add(Row.of(String.join(",", linestring), "linestring" + i));
+ data.add(Row.of(String.join(",", linestring), "linestring" + i, timestamp_base + time_interval * 1000 * i));
}
return data;
}
@@ -156,7 +164,30 @@ static List createLineStringWKT(int size) {
linestring.add(minX + " " + minY);
linestring.add(maxX + " " + maxY);
- data.add(Row.of("LINESTRING (" + String.join(", ", linestring) + ")", "linestring" + i));
+ data.add(Row.of("LINESTRING (" + String.join(", ", linestring) + ")", "linestring" + i, timestamp_base + time_interval * 1000 * i));
+ }
+ return data;
+ }
+
+ // createPolyOverlapping
+ static List createPolygonOverlapping(int size) {
+ List data = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ // Create polygons each of which only has 1 match in points
+ // The polygons are like (-1, 0, 1, 1)
+ // (0, 0, 2, 1)
+ // (1, 0, 3, 1)
+ String minX = String.valueOf(i - 1);
+ String minY = String.valueOf(0);
+ String maxX = String.valueOf(i + 1);
+ String maxY = String.valueOf(1);
+ List polygon = new ArrayList<>();
+ polygon.add(minX);polygon.add(minY);
+ polygon.add(minX);polygon.add(maxY);
+ polygon.add(maxX);polygon.add(maxY);
+ polygon.add(maxX);polygon.add(minY);
+ polygon.add(minX);polygon.add(minY);
+ data.add(Row.of(String.join(",", polygon),"polygon" + i, timestamp_base + time_interval * 1000 * i));
}
return data;
}
@@ -176,7 +207,7 @@ static List createPolygonWKT(int size) {
polygon.add(maxX + " " + maxY);
polygon.add(maxX + " " + minY);
polygon.add(minX + " " + minY);
- data.add(Row.of("POLYGON ((" + String.join(", ", polygon) + "))", "polygon" + i));
+ data.add(Row.of("POLYGON ((" + String.join(", ", polygon) + "))", "polygon" + i, timestamp_base + time_interval * 1000 * i));
}
return data;
}
@@ -202,7 +233,7 @@ static List createPolygonGeoJSON(int size) {
Geometry polygon = geometryFactory.createPolygon(points);
String geoJson = writer.write(polygon).toString();
- data.add(Row.of(geoJson, "polygon" + i));
+ data.add(Row.of(geoJson, "polygon" + i, timestamp_base + time_interval * 1000 * i));
}
return data;
@@ -211,10 +242,17 @@ static List createPolygonGeoJSON(int size) {
static Table createTextTable(List data, String[] colNames){
TypeInformation>[] colTypes = {
BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO};
- RowTypeInfo typeInfo = new RowTypeInfo(colTypes, colNames);
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO
+ };
+ RowTypeInfo typeInfo = new RowTypeInfo(colTypes, Arrays.copyOfRange(colNames, 0, 3));
DataStream ds = env.fromCollection(data).returns(typeInfo);
- return tableEnv.fromDataStream(ds, $(colNames[0]), $(colNames[1]));
+ // Generate Time Attribute
+ WatermarkStrategy wmStrategy =
+ WatermarkStrategy
+ .forMonotonousTimestamps()
+ .withTimestampAssigner((event, timestamp) -> event.getFieldAs(2));
+ return tableEnv.fromDataStream(ds.assignTimestampsAndWatermarks(wmStrategy), $(colNames[0]), $(colNames[1]), $(colNames[2]).rowtime(), $(colNames[3]).proctime());
}
static Table createPointTextTable(int size){
@@ -233,36 +271,49 @@ static Table createPolygonTextTable(int size) {
return createTextTable(createPolygonText(size), polygonColNames);
}
+ static Table createPolygonTextOverlappingTable(int size) {
+ return createTextTable(createPolygonOverlapping(size), polygonColNames);
+ }
+
static Table createPointTable(int size){
return createPointTextTable(size)
.select(call(Constructors.ST_PointFromText.class.getSimpleName(),
$(pointColNames[0])).as(pointColNames[0]),
- $(pointColNames[1]));
+ $(pointColNames[1]), $(pointColNames[2]), $(pointColNames[3]));
}
static Table createPointTable_real(int size){
return createPointTextTable_real(size)
.select(call(Constructors.ST_PointFromText.class.getSimpleName(),
$(pointColNames[0])).as(pointColNames[0]),
- $(pointColNames[1]));
+ $(pointColNames[1]), $(pointColNames[2]), $(pointColNames[3]));
}
static Table createLineStringTable(int size) {
return createLineStringTextTable(size)
.select(call(Constructors.ST_LineStringFromText.class.getSimpleName(),
$(linestringColNames[0])).as(linestringColNames[0]),
- $(linestringColNames[1]));
+ $(linestringColNames[1]), $(linestringColNames[2]), $(linestringColNames[3]));
}
Table createPolygonTable(int size) {
return createPolygonTextTable(size)
.select(call(Constructors.ST_PolygonFromText.class.getSimpleName(),
$(polygonColNames[0])).as(polygonColNames[0]),
- $(polygonColNames[1]));
+ $(polygonColNames[1]), $(polygonColNames[2]), $(polygonColNames[3]));
+ }
+
+ //createPolygonTextOverlapping
+
+ Table createPolygonOverlappingTable(int size) {
+ return createPolygonTextOverlappingTable(size)
+ .select(call(Constructors.ST_PolygonFromText.class.getSimpleName(),
+ $(polygonColNames[0])).as(polygonColNames[0]),
+ $(polygonColNames[1]), $(polygonColNames[2]), $(polygonColNames[3]));
}
/**
- * Get the iterator of the flink
+ * Get the iterator of the table
* @param table
* @return
*/
@@ -271,7 +322,7 @@ static CloseableIterator iterate(Table table) {
}
/**
- * Iterate to the last row of the flink
+ * Iterate to the last row of the table
* @param table
* @return
*/
@@ -282,6 +333,11 @@ static Row last(Table table) {
return lastRow;
}
+ /**
+ * Get the first row of the table
+ * @param table
+ * @return
+ */
static Row first(Table table) {
CloseableIterator it = iterate(table);
assert(it.hasNext());
diff --git a/mkdocs.yml b/mkdocs.yml
index def129a9e3..a0b898b809 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -76,6 +76,7 @@ nav:
- Overview: api/flink/Overview.md
- Constructor: api/flink/Constructor.md
- Function: api/flink/Function.md
+ - Aggregator: api/flink/Aggregator.md
- Predicate: api/flink/Predicate.md
- Community:
- Community: community/contact.md