-
Notifications
You must be signed in to change notification settings - Fork 2
/
ksql.patch
78 lines (74 loc) · 3.67 KB
/
ksql.patch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
diff --git a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonDeserializer.java b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonDeserializer.java
index 1bbdab224..0e09a734c 100644
--- a/ksqldb-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonDeserializer.java
+++ b/ksqldb-serde/src/main/java/io/confluent/ksql/serde/json/KsqlJsonDeserializer.java
@@ -146,9 +146,11 @@ public class KsqlJsonDeserializer implements Deserializer<Object> {
});
return handler.apply(context);
} catch (final CoercionException e) {
- throw new CoercionException(e.getRawMessage(), pathPart + e.getPath(), e);
+ return null;
+ //throw new CoercionException(e.getRawMessage(), pathPart + e.getPath(), e);
} catch (final Exception e) {
- throw new CoercionException(e.getMessage(), pathPart, e);
+ return null;
+ //throw new CoercionException(e.getMessage(), pathPart, e);
}
}
@@ -237,12 +239,19 @@ public class KsqlJsonDeserializer implements Deserializer<Object> {
final Map<String, JsonNode> upperCasedFields = upperCaseKeys(jsonFields);
for (final Field ksqlField : context.schema.fields()) {
+
+ JsonNode fieldValue;
+ if (ksqlField.name().equals("$")) {
+ fieldValue = jsonFields;
+ } else {
+ fieldValue = jsonFields.get(ksqlField.name());
+ }
+
// the "case insensitive" strategy leverages that all KSQL fields are internally
// case sensitive - if they were specified without quotes, then they are upper-cased
// during parsing. any ksql fields that are case insensitive, therefore, will be matched
// in this case insensitive field map without modification but the quoted fields will not
// (unless they were all uppercase to start off with, which is expected to match)
- JsonNode fieldValue = jsonFields.get(ksqlField.name());
if (fieldValue == null) {
fieldValue = upperCasedFields.get(ksqlField.name());
}
@@ -315,4 +324,4 @@ public class KsqlJsonDeserializer implements Deserializer<Object> {
return path;
}
}
-}
\ No newline at end of file
+}
diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StreamAggregateBuilder.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StreamAggregateBuilder.java
index 9eac3eb5a..b23058459 100644
--- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StreamAggregateBuilder.java
+++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StreamAggregateBuilder.java
@@ -48,6 +48,7 @@ import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
@@ -273,7 +274,8 @@ public final class StreamAggregateBuilder {
valueSerde,
StreamsUtil.buildOpName(queryContext),
window.getRetention().map(WindowTimeClause::toDuration))
- );
+ )
+ .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
}
@Override
@@ -316,7 +318,8 @@ public final class StreamAggregateBuilder {
valueSerde,
StreamsUtil.buildOpName(queryContext),
window.getRetention().map(WindowTimeClause::toDuration))
- );
+ )
+ .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
}
}