Add Flink Pinot connector #8233
Conversation
@walterddr please review
Codecov Report
@@ Coverage Diff @@
## master #8233 +/- ##
============================================
- Coverage 70.83% 69.51% -1.33%
Complexity 4245 4245
============================================
Files 1631 1631
Lines 85462 85490 +28
Branches 12877 12878 +1
============================================
- Hits 60539 59427 -1112
- Misses 20746 21923 +1177
+ Partials 4177 4140 -37
Response response = mock(Response.class);
MultivaluedMap<String, Object> headers = getRequestHeadersToPinot();
String expectedFullURL = "http://localhost:9000/tables/demand?type=realtime";
final Map<String, Object> resEntity = new HashMap<>() {
This does not compile on jdk8:
Error: Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.0:testCompile (default-testCompile) on project flink-pinot-sink: Compilation failure
Error: /home/runner/work/pinot/pinot/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/http/PinotControllerClientTest.java:[124,53] error: cannot infer type arguments for HashMap<K,V>
Error: reason: cannot use '<>' with anonymous inner classes
Error: where K,V are type-variables:
Error: K extends Object declared in class HashMap
Error: V extends Object declared in class HashMap
Error: -> [Help 1]
I thought we dropped jdk8 support? not yet?
btw, this `new HashMap<>()` usage is all over the codebase
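For context, the failure is easy to reproduce: the diamond operator on an anonymous subclass is only legal since Java 9, so the jdk8-safe fix is to drop the anonymous subclass and populate a plain HashMap. A minimal sketch (the map contents are illustrative, not the actual test fixture):

```java
import java.util.HashMap;
import java.util.Map;

public class DiamondDemo {
    // On jdk8 this would NOT compile ("cannot use '<>' with anonymous inner classes"):
    //   Map<String, Object> m = new HashMap<>() { { put("REALTIME", new HashMap<>()); } };
    // jdk8-safe alternative: a plain HashMap populated with put().
    static Map<String, Object> buildResponseEntity() {
        Map<String, Object> entity = new HashMap<String, Object>();
        entity.put("REALTIME", new HashMap<String, Object>());
        return entity;
    }

    public static void main(String[] args) {
        System.out.println(buildResponseEntity().containsKey("REALTIME")); // true
    }
}
```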
"transformFunction": null
},
"primaryKeyColumns" : [ "demand_uuid" ]
}
nit: add a trailing newline; same for the other files
/** This file is partially copied from RTAUMS gateway package */
public class PinotControllerClient extends HttpClient {
I feel this is a very helpful client. Just dropping a random thought here: shall we rename this to PinotAdminClient, move it to the pinot-java-client module, and then import it back?
TBH we already have something very similar, but it is only used in tests: ControllerTestUtils. We can probably consider promoting it to a first-class version.
yup, I'm fine with consolidating/extracting the client into a more reusable lib. But I think it's better to do this in a later PR, to reduce the scope of this PR, which is already quite large.
Sounds good. Can you create a new issue for consolidating the reusable utils, with potential candidates that can be merged?
sure. tracked at #8246
/** This is copied from {@link HttpClient} in RTA-UMS project. */
public abstract class HttpClient {
Do we really need this to be abstract? I feel this class is more like static util methods if we make _httpClient static.
Also, we have implementations in AbstractBaseAdminCommand in pinot-tools for the same purpose; you can also consider moving this logic to pinot-common :p
yeah, it's just some helper methods that can be reused by subclasses, for example if we want to add other clients for the broker/server etc.
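To make the trade-off in this exchange concrete, here is a minimal sketch of the two shapes being discussed; all class and method names are illustrative stand-ins, not the actual Pinot code:

```java
// Option A (reviewer's suggestion): static util methods, no subclassing needed.
final class HttpUtil {
    static String sendGetRequest(String url) {
        // a real implementation would use a shared static http client here
        return "GET " + url;
    }

    private HttpUtil() {
    }
}

// Option B (what the PR does): an abstract base class whose protected
// helpers are reused by concrete per-component clients (controller/broker/server).
abstract class BaseHttpClient {
    protected String sendGetRequest(String url) {
        return "GET " + url;
    }
}

public class ControllerClientSketch extends BaseHttpClient {
    public String listTables() {
        return sendGetRequest("http://localhost:9000/tables");
    }

    public static void main(String[] args) {
        System.out.println(new ControllerClientSketch().listTables());
        System.out.println(HttpUtil.sendGetRequest("http://localhost:9000/tables"));
    }
}
```

Option B keeps per-component clients open for extension; Option A is simpler when no subclass-specific state is needed.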
this looks super great. thanks @yupeng9 for the contribution. I will take a thorough look this weekend
addressed comments
thanks @yupeng9 for the contribution. I briefly looked over most of the implementations and found some issues. please kindly take a look.
Since this is largely based on my previous POC: https://github.com/walterddr/flink-pinot-sink my review might've been biased. it would be great if @npawar can also take another look at the segment write/upload logic.
thanks
import org.apache.pinot.spi.data.readers.GenericRow;

public class PinotMapRecordConverter implements RecordConverter<Map> {
- public class PinotMapRecordConverter implements RecordConverter<Map> {
+ public class PinotMapRecordConverter implements RecordConverter<Map<String, Object>> {
to avoid unchecked casts.
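A minimal sketch of the suggested parameterization, using a stand-in RecordConverter interface since the real Pinot interface (which converts to GenericRow) is not shown here:

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for the real org.apache.pinot.spi RecordConverter interface;
// the actual Pinot signature converts to GenericRow and may differ.
interface RecordConverter<T> {
    Map<String, Object> convertToRow(T value);
}

public class MapRecordConverterSketch implements RecordConverter<Map<String, Object>> {
    @Override
    public Map<String, Object> convertToRow(Map<String, Object> value) {
        // The element type is known at compile time, so no unchecked
        // cast from a raw Map is needed inside the converter.
        return new HashMap<>(value);
    }

    public static void main(String[] args) {
        Map<String, Object> in = new HashMap<>();
        in.put("demand_uuid", "abc-123");
        System.out.println(new MapRecordConverterSketch().convertToRow(in).get("demand_uuid"));
    }
}
```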
private BatchIngestionConfig _batchIngestionConfig;

public FlinkSegmentUploader(int indexOfSubtask) {
_indexOfSubtask = indexOfSubtask;
indexOfSubtask is not used in the uploader. I think it only matters in the segment writer.
* data and also share that data with checkpoint state. Instead it uses an internal buffer within
* PinotSinkFunction for checkpoint.
*
* <p>This should change once we introduce FlinkPinotSegmentWriter
this javadoc is outdated.
// TODO improve segment uploader to use in-memory buffer then flush to tar file.
_segmentWriter = new FlinkSegmentWriter(indexOfSubtask, getRuntimeContext().getMetricGroup());
_segmentWriter.init(_tableConfig, _schema);
// TODO improve segment uploader to take in-memory tar
// TODO launch segment uploader as separate thread for uploading (non-blocking?)
clean up TODOs.
// ...
}

private void flush()
potential race condition when snapshot and invoke both call flush?
Is it possible to have a race condition? Only invoke can call it, since snapshotState is not used in batch processing.
In that case can we just throw an exception, rather than invoking flush in snapshotState, if it is not used anyway?
sure
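The agreed change can be sketched as follows; the class and method names are illustrative, not the actual PinotSinkFunction code:

```java
public class SinkGuardDemo {
    private int _bufferedRows = 0;
    private int _flushCount = 0;

    // invoke() is the only legal flush trigger in batch mode.
    public void invoke(Object record) {
        _bufferedRows++;
        if (_bufferedRows >= 2) {
            flush();
        }
    }

    // Fail fast instead of flushing: snapshotState is never called in
    // batch processing, so a call here indicates a misconfiguration.
    public void snapshotState() {
        throw new UnsupportedOperationException(
            "snapshotState is not supported; flush only happens in invoke()");
    }

    private void flush() {
        _bufferedRows = 0;
        _flushCount++;
    }

    public int getFlushCount() {
        return _flushCount;
    }

    public static void main(String[] args) {
        SinkGuardDemo sink = new SinkGuardDemo();
        sink.invoke("a");
        sink.invoke("b"); // second record triggers a flush
        System.out.println(sink.getFlushCount()); // 1
    }
}
```

Throwing in snapshotState also removes the race-condition question entirely, since flush can then only ever run on the invoke path.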
_reusableRecord = new GenericData.Record(_avroSchema);

// Create tmp dir
// TODO staging Dir also need to be subtask separated otherwise there will be write conflict.
clean up TODO
private transient Counter _processedRecords;
private transient volatile long _lastRecordProcessingTimeMs = 0;

public FlinkSegmentWriter(int indexOfSubtask, MetricGroup metricGrp) {
nit: refactor the common code between FlinkSegmentWriter and FileBasedSegmentWriter; this can be done in a separate PR.
@@ -0,0 +1,130 @@
/**
I might've been wrong, but I don't see the entire package org.apache.pinot.connector.flink.http used anywhere outside of this package.
Is this module relevant to the flink connector PR at all? Or is it simply used as an easy util for integration/e2e tests?
I've deleted this entire module and the tests run just fine, so if it is not relevant I suggest we delete this module.
It's used in the quickstart to show how to use the REST API to fetch the schema and table config to configure the connector. Otherwise developers need to manually craft those configs.
Also, you can take a look at PinotConnectionUtils for the useful config management.
yes, but it is not related to the flink connector. For the sake of this PR's scope I would suggest we drop it and add it in a separate PR, mainly for the following reasons:
- it doesn't relate to the flink connector. At first I thought we were utilizing http to directly ingest into pinot via some sort of REST API, but if that is not the case it shouldn't be part of the flink-pinot connector but rather in pinot-tools.
- @xiangfu0 also thinks we can refactor this part out later. It doesn't make sense to get this in and refactor it out if it is not part of the critical path.
- there's already one util in test called ControllerUtils that can be used for demonstration purposes.
Not saying it is not useful, but it would be much easier to address it separately.
Not really. I think this is an integral part to show how the connector is used, as in the QuickStart; without it I don't think the connector is complete. It doesn't make sense to expect developers to manually create the Schema as well as the TableConfig, not to mention the several configuration decorations needed in PinotConnectionUtils.
Xiang's point is that in a later PR we can refactor this part of the code into a reusable client lib. However, from the connector user's perspective, it doesn't change the way the connector is used.
Makes sense?
hmm. I must have missed something. Here is what I did:
- deleted the entire folder under src/main/java/org/apache/pinot/connector/flink/http
- ran mvn test -pl pinot-connectors/pinot-flink-connector
Everything passes, which proves that the http folder has nothing to do with this PR, yes?
Did you forget to add a Quickstart? I don't see a quickstart in this PR or any changes outside of the pinot-flink-connector module. (And if so, I suggest we add it in a separate PR.)
hmm, I see. This file was filtered out by one of the .gitignore rules. Renamed it and pushed again.
<version>0.10.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<artifactId>flink-pinot-sink</artifactId>
- <artifactId>flink-pinot-sink</artifactId>
+ <artifactId>pinot-flink-connector</artifactId>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
add <scope>test</scope> to these 2 dependencies.
@walterddr Thanks for the review. Addressed the comments PTAL.
*/
@SuppressWarnings("NullAway")
@NotThreadSafe
public class FlinkSegmentWriter implements SegmentWriter {
why was creating a Flink specific impl of SegmentWriter needed? Why couldn't we just use the FileBasedSegmentWriter? most of the code looks the same, with some additional metadata like seqId and rowCount etc. Aren't those concepts generic enough that we could just add them to FileBasedSegmentWriter and enhance it?
We could, with a later refactoring. I think one of the reasons is that we don't have an abstract class of SegmentWriter that allows customization like in this Flink case. Also, initially this connector was planned to be added not to the Pinot repo but to the Flink repo, which means the connector had to depend on the published pinot library only.
Similar to the consolidation of the client utils, I think we can consolidate the SegmentWriter later, after they are in the same repo.
the Sink should have been in Flink, but the Writer should have always been in Pinot, and it should have been from the impls offered. If by consolidate later, you mean soon enough, I'm fine with starting with this. But this can easily get deprioritized and fall off the plate once it is non-blocking :) And the code really is identical, except a few tweaks here and there which the default impl can benefit from anyway..
That's fair. Let me address this in a follow-up PR, and it shouldn't be hard. But I try not to modify other modules in this PR, so it'll be a pure module addition.
/**
* Default implementation of {@link SegmentUploader} with support for all push modes The configs for
javadoc needs to be updated
import org.slf4j.LoggerFactory;

/**
this is the javadoc from FileBasedSegmentWriter. call out here why FlinkSegmentWriter needed to be added, why it is different from FileBased, and any todos to unify the 2. imo, FlinkSegmentWriter should just extend the FileBasedSegmentWriter for the flink metrics related special casing. Everything else should get folded into FileBased impl.
* Default implementation of {@link SegmentUploader} with support for all push modes The configs for
* push are fetched from batchConfigMaps of tableConfig
*/
@SuppressWarnings("NullAway")
This class seems exactly identical to SegmentUploaderDefault. Can we just use that one?
} catch (InterruptedException e) {
_executor.shutdownNow();
}
_segmentWriter.close();
do this in finally?
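The suggested change moves the writer close into a finally block so it runs even when the executor shutdown path is interrupted; a minimal sketch with stand-in types (the real segment writer type differs):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CloseDemo {
    // Stand-in for the segment writer's close(); the real type differs.
    interface Writer {
        void close();
    }

    static void shutdownAndClose(ExecutorService executor, Writer writer) {
        try {
            if (!executor.awaitTermination(100, TimeUnit.MILLISECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        } finally {
            // close the writer even if awaitTermination was interrupted
            writer.close();
        }
    }

    public static void main(String[] args) {
        final boolean[] closed = {false};
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.shutdown();
        shutdownAndClose(executor, () -> closed[0] = true);
        System.out.println(closed[0]); // true: the writer was closed
    }
}
```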
public PinotRowRecordConverter(RowTypeInfo rowTypeInfo) {
_rowTypeInfo = rowTypeInfo;
_fieldNames = rowTypeInfo.getFieldNames();
_fieldTypes = rowTypeInfo.getFieldTypes();
this is unused. remove?
import org.apache.pinot.spi.data.readers.GenericRow;

public class PinotMapRecordConverter implements RecordConverter<Map<String, Object>> {
(optional) this name is not very intuitive. How about PinotGenericRowConverter as the interface, with MapGenericRowConverter and FlinkRowGenericRowConverter as impls?
* the offline table of all Starbucks store locations in US, and then run this quick start to populate other Starbucks
* stores in the rest of the world.
*/
public final class FlinkQuickStart {
can this just be in pinot-tools with the rest of the Quickstarts?
not sure. pinot-tools doesn't have a dependency on connectors; actually we do not build connectors for the core.
import org.testng.Assert;

public final class TestUtils {
can the integration test move to pinot-integration-tests? 1. it should ideally be there with all the others; 2. you won't have to write this util class just for the flink connector, as it's already there for use in ITs.
pinot-integration-tests does not have a dependency on connectors.
it is fine to add the test dependency for connectors to pinot-integration-tests; we add dependencies on the required plugin/connector there anyway for the test. fine if this is addressed in the next PR.
hmm, the integration test in the spark connector is also in the spark module. I'm fine with the suggestion, though it'll make the connectors build non-optional in the Pinot project build. What do you think, @xiangfu0?
@npawar thanks for the review. comments addressed.
"transformFunction": null
}
],
"dateTimeFieldSpecs": [
how is this not failing with 2 dateTimeFieldSpecs!
oh, it's just a dummy file to test the RPC calls.
Description
Add a Flink connector to write data to Pinot directly. This is useful for backfilling or bootstrapping tables,
including the upsert tables. You can read more about the motivation and design in this design proposal.
Upgrade Notes