
Add Flink Pinot connector #8233

Merged — 12 commits merged into apache:master on Mar 7, 2022
Conversation

yupeng9 (Contributor) commented Feb 20, 2022

Description

Add a Flink connector to write data to Pinot directly. This is useful for backfilling or bootstrapping tables,
including upsert tables. You can read more about the motivation and design in this design proposal.

Upgrade Notes

  • New Flink connector plugins added

yupeng9 (Contributor Author) commented Feb 20, 2022

@walterddr please review

codecov-commenter commented Feb 20, 2022

Codecov Report

Merging #8233 (e0cd7b2) into master (46ed731) will decrease coverage by 1.32%.
The diff coverage is n/a.


@@             Coverage Diff              @@
##             master    #8233      +/-   ##
============================================
- Coverage     70.83%   69.51%   -1.33%     
  Complexity     4245     4245              
============================================
  Files          1631     1631              
  Lines         85462    85490      +28     
  Branches      12877    12878       +1     
============================================
- Hits          60539    59427    -1112     
- Misses        20746    21923    +1177     
+ Partials       4177     4140      -37     
Flag         Coverage Δ
integration1 ?
integration2 27.57% <ø> (-0.04%) ⬇️
unittests1   66.97% <ø> (+0.01%) ⬆️
unittests2   14.17% <ø> (+0.01%) ⬆️

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
...pinot/minion/exception/TaskCancelledException.java 0.00% <0.00%> (-100.00%) ⬇️
...nverttorawindex/ConvertToRawIndexTaskExecutor.java 0.00% <0.00%> (-100.00%) ⬇️
...e/pinot/common/minion/MergeRollupTaskMetadata.java 0.00% <0.00%> (-94.74%) ⬇️
...plugin/segmentuploader/SegmentUploaderDefault.java 0.00% <0.00%> (-87.10%) ⬇️
.../transform/function/MapValueTransformFunction.java 0.00% <0.00%> (-85.30%) ⬇️
...ot/common/messages/RoutingTableRebuildMessage.java 0.00% <0.00%> (-81.82%) ⬇️
...data/manager/realtime/DefaultSegmentCommitter.java 0.00% <0.00%> (-80.00%) ⬇️
...verttorawindex/ConvertToRawIndexTaskGenerator.java 5.45% <0.00%> (-80.00%) ⬇️
...ache/pinot/common/lineage/SegmentLineageUtils.java 22.22% <0.00%> (-77.78%) ⬇️
...ore/startree/executor/StarTreeGroupByExecutor.java 0.00% <0.00%> (-77.78%) ⬇️
... and 107 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 46ed731...e0cd7b2.

Response response = mock(Response.class);
MultivaluedMap<String, Object> headers = getRequestHeadersToPinot();
String expectedFullURL = "http://localhost:9000/tables/demand?type=realtime";
final Map<String, Object> resEntity = new HashMap<>() {
Contributor:

This does not compile on JDK 8:

Error:  Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.0:testCompile (default-testCompile) on project flink-pinot-sink: Compilation failure
Error:  /home/runner/work/pinot/pinot/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/http/PinotControllerClientTest.java:[124,53] error: cannot infer type arguments for HashMap<K,V>
Error:    reason: cannot use '<>' with anonymous inner classes
Error:    where K,V are type-variables:
Error:      K extends Object declared in class HashMap
Error:      V extends Object declared in class HashMap
Error:  -> [Help 1]
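For context, the diamond operator on an anonymous inner class is only legal since Java 9 (JEP 213), which is exactly what the compiler error above says. A minimal sketch of the two usual fixes (the map entry here is illustrative, not taken from the PR):

```java
import java.util.HashMap;
import java.util.Map;

public class DiamondFix {
    // JDK 8 rejects the diamond operator on an anonymous inner class:
    //   new HashMap<>() { ... }  // error: cannot use '<>' with anonymous inner classes
    // That syntax is only allowed since Java 9 (JEP 213).

    // Fix 1: spell out the type arguments on the anonymous subclass.
    static Map<String, Object> withExplicitTypeArgs() {
        return new HashMap<String, Object>() {{
            put("tableName", "demand"); // illustrative entry, not from the PR
        }};
    }

    // Fix 2 (usually preferred): skip the anonymous subclass entirely.
    static Map<String, Object> withPlainMap() {
        Map<String, Object> m = new HashMap<>(); // diamond is fine here
        m.put("tableName", "demand");
        return m;
    }

    public static void main(String[] args) {
        System.out.println(withExplicitTypeArgs().equals(withPlainMap())); // prints: true
    }
}
```

Fix 2 also avoids creating an extra anonymous subclass per call site, which is why it is generally preferred over double-brace initialization.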

yupeng9 (Contributor Author):

I thought we dropped JDK 8 support? Not yet?

yupeng9 (Contributor Author):

By the way, this usage of HashMap<> is all over the codebase.

"transformFunction": null
},
"primaryKeyColumns" : [ "demand_uuid" ]
}
Contributor:

nit: add a trailing newline; same for other files.



/** This file is partially copied from RTAUMS gateway package */
public class PinotControllerClient extends HttpClient {
xiangfu0 (Contributor) commented Feb 23, 2022

I feel this is a very helpful client. Just dropping a random thought here:
shall we rename this to PinotAdminClient and move it to the pinot-java-client module, then import it back?

cc: @KKcorps @Jackie-Jiang

Contributor:

TBH we already have something very similar, but it is only used in tests: ControllerTestUtils. We could consider a first-class version of it.

yupeng9 (Contributor Author):

Yup, I'm fine with consolidating/extracting the client into a more reusable lib. But I think it's better to do this in a later PR, to reduce the scope of this PR, which is already quite large.

Contributor:

Sounds good. Can you create a new issue for consolidating the reusable utils, with potential candidates that can be merged?

yupeng9 (Contributor Author):

Sure, tracked at #8246.



/** This is copied from {@link HttpClient} in RTA-UMS project. */
public abstract class HttpClient {
xiangfu0 (Contributor) commented Feb 23, 2022

Do we really need this to be abstract? This class feels more like static util methods if _httpClient is made static.
Also, we have implementations in AbstractBaseAdminCommand in pinot-tools for the same purpose; you can also consider moving this logic to pinot-common :p

yupeng9 (Contributor Author):

Yeah, it's just some helper methods that can be reused by subclasses, for example if we want to add other clients for the broker/server etc.

walterddr (Contributor):

This looks super great. Thanks @yupeng9 for the contribution. I will take a thorough look this weekend.

yupeng9 (Contributor Author) left a comment:

Addressed comments.


walterddr (Contributor) left a comment:

Thanks @yupeng9 for the contribution. I briefly looked over most of the implementations and found some issues; please kindly take a look.

Since this is largely based on my previous POC (https://github.com/walterddr/flink-pinot-sink), my review might be biased. It would be great if @npawar could also take a look at the segment write/upload logic.

Thanks

import org.apache.pinot.spi.data.readers.GenericRow;


public class PinotMapRecordConverter implements RecordConverter<Map> {
Contributor:
Suggested change
public class PinotMapRecordConverter implements RecordConverter<Map> {
public class PinotMapRecordConverter implements RecordConverter<Map<String, Object>> {

to avoid unchecked casts.
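To illustrate the suggestion, here is a minimal sketch of why the parameterized type helps. The RecordConverter and GenericRow below are hypothetical stand-ins, not the actual Pinot APIs:

```java
import java.util.HashMap;
import java.util.Map;

public class ConverterSketch {
    // Hypothetical stand-ins for the real Pinot types, just to illustrate the
    // generics point; the actual RecordConverter/GenericRow APIs may differ.
    interface RecordConverter<T> {
        GenericRow convert(T rec);
    }

    static class GenericRow {
        private final Map<String, Object> _fields = new HashMap<>();
        void putValue(String name, Object value) { _fields.put(name, value); }
        Object getValue(String name) { return _fields.get(name); }
    }

    // With RecordConverter<Map<String, Object>> the entry types are checked at
    // compile time; a raw RecordConverter<Map> would force unchecked casts.
    static class PinotMapRecordConverter implements RecordConverter<Map<String, Object>> {
        @Override
        public GenericRow convert(Map<String, Object> rec) {
            GenericRow row = new GenericRow();
            rec.forEach(row::putValue); // no cast needed
            return row;
        }
    }

    static Object roundTrip(String key, Object value) {
        Map<String, Object> rec = new HashMap<>();
        rec.put(key, value);
        return new PinotMapRecordConverter().convert(rec).getValue(key);
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("store_id", 42)); // prints: 42
    }
}
```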

private BatchIngestionConfig _batchIngestionConfig;

public FlinkSegmentUploader(int indexOfSubtask) {
_indexOfSubtask = indexOfSubtask;
Contributor:

indexOfSubtask is not used in the uploader. I think it only matters in the segment writer.

* data and also share that data with checkpoint state. Instead it uses an internal buffer within
* PinotSinkFunction for checkpoint.
*
* <p>This should change once we introduce FlinkPinotSegmentWriter
Contributor:

This javadoc is outdated.

Comment on lines 90 to 94
// TODO improve segment uploader to use in-memory buffer then flush to tar file.
_segmentWriter = new FlinkSegmentWriter(indexOfSubtask, getRuntimeContext().getMetricGroup());
_segmentWriter.init(_tableConfig, _schema);
// TODO improve segment uploader to take in-memory tar
// TODO launch segment uploader as separate thread for uploading (non-blocking?)
Contributor:

Clean up TODOs.

// ...
}

private void flush()
Contributor:

Potential race condition when snapshot and invoke both call flush?

yupeng9 (Contributor Author):

Is it possible to have a race condition? Only invoke can call it, since snapshotState is not used in batch processing.

Contributor:

In that case can we just throw an exception, rather than invoking flush in snapshotState, if it is not used anyway?

yupeng9 (Contributor Author):

Sure.
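A rough sketch of the agreed change — class and method names here only loosely mirror a Flink SinkFunction and are illustrative, not the actual PinotSinkFunction:

```java
public class SnapshotGuardSketch {
    // Sketch of the agreed fix: in batch execution snapshotState() is never
    // called, so instead of sharing flush() between invoke() and
    // snapshotState(), have snapshotState() fail loudly.
    private int _bufferedRecords = 0;

    public void invoke(Object value) {
        _bufferedRecords++;
        if (_bufferedRecords >= 2) { // tiny illustrative threshold
            flush(); // the only call site, so there is no concurrent caller
        }
    }

    public void snapshotState() {
        // Make any unexpected call an error rather than a second, racy entry
        // point into flush().
        throw new UnsupportedOperationException("snapshotState is not supported in batch mode");
    }

    private void flush() {
        _bufferedRecords = 0; // stand-in for segment build + upload
    }

    public int buffered() {
        return _bufferedRecords;
    }

    public static void main(String[] args) {
        SnapshotGuardSketch sink = new SnapshotGuardSketch();
        sink.invoke("a");
        sink.invoke("b"); // triggers flush
        System.out.println(sink.buffered()); // prints: 0
    }
}
```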

_reusableRecord = new GenericData.Record(_avroSchema);

// Create tmp dir
// TODO staging Dir also need to be subtask separated otherwise there will be write conflict.
Contributor:

Clean up TODO.

private transient Counter _processedRecords;
private transient volatile long _lastRecordProcessingTimeMs = 0;

public FlinkSegmentWriter(int indexOfSubtask, MetricGroup metricGrp) {
Contributor:

nit: refactor common code between FlinkSegmentWriter and FileBasedSegmentWriter, but this can be done in a separate PR.

@@ -0,0 +1,130 @@
/**
Contributor:

I might be wrong, but I don't see the entire org.apache.pinot.connector.flink.http package used anywhere outside of this package.

Is this module relevant to the Flink connector PR at all, or is it simply used as a convenience util for integration/e2e tests?

I've deleted this entire module and the tests still pass, so if it is not relevant I suggest we delete it.

yupeng9 (Contributor Author):

It's used in the quickstart to show how to use the REST API to fetch the schema and table config to configure the connector. Otherwise developers need to manually craft those configs.

Also, you can take a look at PinotConnectionUtils for the useful config management.
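For illustration, the controller URL mocked in the test snippet earlier can be assembled as below. The builder itself is a hypothetical helper; only the URL shape comes from the snippet's expectedFullURL:

```java
public class ControllerUrls {
    // Hypothetical helper; only the URL shape is taken from the test snippet
    // (expectedFullURL = "http://localhost:9000/tables/demand?type=realtime").
    static String tableConfigUrl(String host, int port, String table, String type) {
        return "http://" + host + ":" + port + "/tables/" + table + "?type=" + type;
    }

    public static void main(String[] args) {
        System.out.println(tableConfigUrl("localhost", 9000, "demand", "realtime"));
        // prints: http://localhost:9000/tables/demand?type=realtime
    }
}
```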

walterddr (Contributor) commented Feb 27, 2022

Yes, but it is not related to the Flink connector. For the sake of this PR's scope, I would suggest we drop it and add it in a separate PR, mainly for the following reasons:

  • It doesn't relate to the Flink connector. At first I thought we were using HTTP to ingest directly into Pinot via some REST API; since that is not the case, it shouldn't be part of the flink-pinot connector but rather live in pinot-tools.
  • @xiangfu0 also thinks we can refactor this part out later. It doesn't make sense to get this in and then refactor it out if it is not on the critical path.
  • There's already one util in tests called ControllerUtils that can be used for demonstration purposes.

Not saying it is not useful, but it would be much easier to address it separately.

yupeng9 (Contributor Author):

Not really. I think this is an integral part of showing how the connector is used, as in the QuickStart; without it I don't think the connector is complete. It doesn't make sense to expect developers to manually create the Schema as well as the TableConfig, not to mention the several configuration decorations needed in PinotConnectionUtils.

Xiang's point is that in a later PR we can refactor this part of the code into a reusable client lib. However, from the connector user's perspective, that doesn't change the way the connector is used.

Makes sense?

walterddr (Contributor) commented Feb 27, 2022

Hmm, I must have missed something. Here is what I did:

  1. Deleted the entire folder under src/main/java/org/apache/pinot/connector/flink/http.
  2. Ran mvn test -pl pinot-connectors/pinot-flink-connector

Everything passes, which suggests the http folder has nothing to do with this PR. Yes?

Did you forget to add a Quickstart? I don't see one in this PR, or any changes outside of the pinot-flink-connector module. (If so, I suggest we add it in a separate PR.)

yupeng9 (Contributor Author):

Hmm, I see. This file was filtered out by one of the .gitignore rules. Renamed it and pushed again.

<version>0.10.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<artifactId>flink-pinot-sink</artifactId>
Contributor:

Suggested change
<artifactId>flink-pinot-sink</artifactId>
<artifactId>pinot-flink-connector</artifactId>

Comment on lines 88 to 98
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
Contributor:

Add <scope>test</scope> to these two dependencies.
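The suggested change would look roughly like this (coordinates and version properties as in the original pom; only the added scope elements are new):

```xml
<dependency>
  <groupId>org.mockito</groupId>
  <artifactId>mockito-core</artifactId>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_${scala.version}</artifactId>
  <version>${flink.version}</version>
  <scope>test</scope>
</dependency>
```

With test scope, these artifacts are kept off the compile and runtime classpaths of downstream consumers of the connector.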

yupeng9 (Contributor Author) left a comment:

@walterddr Thanks for the review. Addressed the comments, PTAL.


Jackie-Jiang added the release-notes label (referenced by PRs that need attention when compiling the next release notes) on Feb 28, 2022
*/
@SuppressWarnings("NullAway")
@NotThreadSafe
public class FlinkSegmentWriter implements SegmentWriter {
Contributor:

Why was a Flink-specific impl of SegmentWriter needed? Why couldn't we just use the FileBasedSegmentWriter? Most of the code looks the same, with some additional metadata like seqId and rowCount. Aren't those concepts generic enough that we could just add them to FileBasedSegmentWriter and enhance it?

yupeng9 (Contributor Author):

We could, with a later refactoring. One of the reasons is that we don't have an abstract SegmentWriter class that allows customization like in this Flink case. Also, this connector was initially planned for the Flink repo, not the Pinot repo, which meant it had to depend on the published Pinot library only.

Similar to the consolidation of the client utils, I think we can consolidate the SegmentWriter later, once they are in the same repo.

Contributor:

The Sink should have been in Flink, but the Writer should have always been in Pinot, built from the impls offered. If by "consolidate later" you mean soon, I'm fine with starting with this. But this can easily get deprioritized and fall off the plate once it is non-blocking :) And the code really is identical, except for a few tweaks here and there which the default impl could benefit from anyway.

yupeng9 (Contributor Author):

That's fair. Let me address this in a follow-up PR; it shouldn't be hard. But I'm trying not to modify other modules in this PR, so it stays a pure module addition.



/**
* Default implementation of {@link SegmentUploader} with support for all push modes The configs for
Contributor:

This javadoc needs to be updated.

import org.slf4j.LoggerFactory;


/**
Contributor:

This is the javadoc from FileBasedSegmentWriter. Call out here why FlinkSegmentWriter needed to be added, why it differs from the file-based one, and any TODOs to unify the two. IMO, FlinkSegmentWriter should just extend FileBasedSegmentWriter for the Flink-metrics special casing; everything else should get folded into the file-based impl.

* Default implementation of {@link SegmentUploader} with support for all push modes The configs for
* push are fetched from batchConfigMaps of tableConfig
*/
@SuppressWarnings("NullAway")
Contributor:

This class seems exactly identical to SegmentUploaderDefault. Can we just use that one?

} catch (InterruptedException e) {
_executor.shutdownNow();
}
_segmentWriter.close();
Contributor:

Do this in a finally block?
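A minimal sketch of that suggestion, assuming the shape of the snippet above — the Writer interface below is a stand-in for the real SegmentWriter, and the names are illustrative:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CloseSketch {
    // Sketch of the reviewer's suggestion: close the writer in finally so it
    // runs even if awaiting the executor is interrupted.
    interface Writer {
        void close();
    }

    static boolean closeQuietly(ExecutorService executor, Writer writer) {
        try {
            executor.shutdown();
            if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        } finally {
            writer.close(); // runs on every path, as the review suggests
        }
        return true;
    }

    public static void main(String[] args) {
        boolean[] closed = {false};
        closeQuietly(Executors.newSingleThreadExecutor(), () -> closed[0] = true);
        System.out.println(closed[0]); // prints: true
    }
}
```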

pinot-connectors/pinot-flink-connector/README.md (outdated, resolved)
public PinotRowRecordConverter(RowTypeInfo rowTypeInfo) {
_rowTypeInfo = rowTypeInfo;
_fieldNames = rowTypeInfo.getFieldNames();
_fieldTypes = rowTypeInfo.getFieldTypes();
Contributor:

This is unused. Remove?

import org.apache.pinot.spi.data.readers.GenericRow;


public class PinotMapRecordConverter implements RecordConverter<Map<String, Object>> {
Contributor:

(optional) This name is not very intuitive.
How about PinotGenericRowConverter as the interface, and MapGenericRowConverter and FlinkRowGenericRowConverter as impls?

* the offline table of all Starbucks store locations in US, and then run this quick start to populate other Starbucks
* stores in the rest of the world.
*/
public final class FlinkQuickStart {
Contributor:

Can this just be in pinot-tools with the rest of the Quickstarts?

yupeng9 (Contributor Author):

Not sure; pinot-tools doesn't depend on connectors. Actually we do not build connectors for the core.

import org.testng.Assert;


public final class TestUtils {
Contributor:

Can the integration test move to pinot-integration-tests? 1. It should ideally be there with all the others. 2. You won't have to write this util class just for the Flink connector, as it already exists for use in ITs.

yupeng9 (Contributor Author):

pinot-integration-tests does not depend on connectors.

Contributor:

It is fine to add the test dependency on connectors to pinot-integration-tests; we add dependencies on the required plugins/connectors there anyway for tests. Fine if this is addressed in the next PR.

yupeng9 (Contributor Author):

Hmm, the integration test for the spark connector is also in the spark module. I'm fine with the suggestion, though it'll make the connectors build non-optional in the Pinot project build. What do you think, @xiangfu0?

yupeng9 (Contributor Author) left a comment:

@npawar thanks for the review; comments addressed.


"transformFunction": null
}
],
"dateTimeFieldSpecs": [
Contributor:

How is this not failing with 2 dateTimeFieldSpecs!

yupeng9 (Contributor Author):

Oh, it's just a dummy file to test the RPC calls.

yupeng9 merged commit e41d86b into apache:master on Mar 7, 2022