Appenderators, DataSource metadata, KafkaIndexTask #2220
Conversation
I am so excited!

[wip] because TODOs remain, but would really appreciate comments
  toolbox.getEmitter().emit(metricBuilder.build("segment/txn/success", 1));
} else {
  toolbox.getEmitter().emit(metricBuilder.build("segment/txn/failure", 1));
}
i understand the usefulness of segment/txn/failure but not sure how segment/txn/success is going to be useful.
maybe you want to confirm that txns are actually happening? I dunno? It seemed like if one existed then the other should too
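For what it's worth, emitting both counters lets downstream monitoring normalize failures into a rate, which failures alone cannot provide. A minimal stand-alone sketch, with `LongAdder` as an illustrative substitute for Druid's metrics emitter (all names here are hypothetical):

```java
import java.util.concurrent.atomic.LongAdder;

public class TxnMetrics
{
  // Stand-ins for the segment/txn/success and segment/txn/failure counters;
  // LongAdder here is an illustrative substitute for Druid's emitter.
  static final LongAdder success = new LongAdder();
  static final LongAdder failure = new LongAdder();

  static void recordTxn(boolean committed)
  {
    (committed ? success : failure).increment();
  }

  // With both counters available, a dashboard can compute a failure *rate*;
  // a failure count on its own has no denominator.
  static double failureRate()
  {
    final long total = success.sum() + failure.sum();
    return total == 0 ? 0.0 : (double) failure.sum() / total;
  }
}
```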
did a scan, looks like a step in the right direction, waiting to see the impl for SegmentAllocator. also, i believe, we should set the "scope" for this PR to at least result in a working (even if experimental) version of no-window period realtime ingestion with tranquility.
@himanshug SegmentAllocator has an implementation in KafkaIndexTask. Do you have a suggestion for a better name for kafka-indexing-service? I just called it that because it has Kafka stuff for the indexing service. Hmm IMO this PR should be scoped to be the base for tranquility working and for kafka working. It actually doesn't fully achieve either one but it was getting big enough that I thought it made sense to cut it off at this point and do the rest in follow on PRs. I think the follow on PRs would be:
we can rename kafka-indexing-service to just kafka-indexing, or maybe I'm overthinking? regarding scope, I would move the kafka module in a separate PR and do (2) in this PR itself. That said, I will let you take the final call on that.
@himanshug kafka-indexing sounds cool Hmm, I would prefer to have this PR have the stuff it has for now, mostly because this stuff is "done" (ish) and the follow-on stuff is not done :). I did start working on (2) though, so depending on how long this PR is open for, we could potentially include that instead of the kafka stuff. Out of curiosity why do you prefer to have the tranquility stuff here instead of the kafka stuff?
"Out of curiosity why do you prefer to have the tranquility stuff here instead of the kafka stuff?" also, I didn't realize that (2) was going to be "tranquility specific" code, i thought that was core druid change and not a new module.
Haha, ok :) I actually had been doing the tranquility stuff first and then took a detour through the kafka stuff as I think it is a lot simpler, and wanted to get some kind of initial thing working. The tranquility stuff ended up getting kind of complicated. I plan to go back to that soon though. But in the meantime, some review on this stuff would be great.
final long sleepMillis = computeNextRetrySleep(++nTry);
log.warn(e, "Failed publishAll (try %d), retrying in %,dms.", nTry, sleepMillis);
Thread.sleep(sleepMillis);
}
In various places where we have long-running stuff, we need to check for thread interrupt status and finish early, so that processes stop properly.
an interrupt will cause publishAll to throw an InterruptedException (due to the Thread.sleep), is that enough?
what if the Exception e caught here is an InterruptedException?
good point, will fix.
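A sketch of the fix being agreed on here: catch InterruptedException before the broad Exception handler and rethrow it, so a shutdown request is not swallowed by the retry loop. publishAll's signature and the backoff policy below are hypothetical stand-ins, not Druid's actual code:

```java
import java.util.concurrent.Callable;

public class PublishRetry
{
  // Illustrative backoff only; the real computeNextRetrySleep policy may differ.
  static long computeNextRetrySleep(int nTry)
  {
    return Math.min(1_000L, 10L * nTry);
  }

  static void publishAllWithRetries(Callable<Void> publishAll) throws Exception
  {
    int nTry = 0;
    while (true) {
      try {
        publishAll.call();
        return;
      }
      catch (InterruptedException e) {
        // Rethrow instead of retrying: otherwise the broad catch below would
        // swallow the interrupt and the task could never stop properly.
        throw e;
      }
      catch (Exception e) {
        final long sleepMillis = computeNextRetrySleep(++nTry);
        // (a real implementation would log.warn(e, ...) here)
        Thread.sleep(sleepMillis); // an interrupt during sleep also exits the loop
      }
    }
  }
}
```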
log.makeAlert("Unknown query type, [%s]", query.getClass())
   .addData("dataSource", query.getDataSource())
   .emit();
return new NoopQueryRunner<>();
this should probably be an exception instead of responding with empty result.
sounds good, will change to ISE
👍
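A minimal sketch of the agreed change: instead of emitting an alert and returning a NoopQueryRunner (silently yielding empty results), fail fast with an IllegalStateException. The lookup map and runner interface here are hypothetical stand-ins for Druid's query machinery:

```java
import java.util.Map;

public class QueryDispatch
{
  // Hypothetical stand-in for Druid's QueryRunner.
  interface QueryRunner { String run(); }

  static final Map<String, QueryRunner> RUNNERS = Map.<String, QueryRunner>of(
      "timeseries", () -> "timeseries results",
      "groupBy", () -> "groupBy results"
  );

  static QueryRunner runnerFor(String queryType)
  {
    final QueryRunner runner = RUNNERS.get(queryType);
    if (runner == null) {
      // Previously: emit an alert and return new NoopQueryRunner<>().
      // Throwing makes the misconfiguration visible to the caller.
      throw new IllegalStateException("Unknown query type [" + queryType + "]");
    }
    return runner;
  }
}
```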
handoffNotifier.start();

final FiniteAppenderatorDriverMetadata metadata = objectMapper.convertValue(
    appenderator.startJob(),
appenderator.startJob() might give null in many cases.
that's ok, convertValue turns null into null, and then there's a null check in this method.
ok, didn't know that convertValue could handle null.
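The pattern being described — a possibly-null startJob() result flowing through a null-tolerant conversion into a later null check — can be sketched like this. This is a pure-Java analogy; the real code uses Jackson's convertValue, which (as the thread notes) maps null to null:

```java
import java.util.function.Function;

public class StartJobSketch
{
  // Hypothetical stand-in for appenderator.startJob(), which may return null
  // (e.g. no previously committed metadata to restore).
  static Object startJob()
  {
    return null;
  }

  // Analogy for objectMapper.convertValue: null in, null out.
  static <T, R> R convertValue(T value, Function<T, R> converter)
  {
    return value == null ? null : converter.apply(value);
  }

  static String restoredMetadataOrDefault()
  {
    final String metadata = convertValue(startJob(), Object::toString);
    // The null check mentioned in the thread lives downstream of the
    // conversion, so a null startJob() result is handled safely.
    return metadata == null ? "no restored metadata" : metadata;
  }
}
```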
@gianm had some comments but overall looks good and from what I see this PR does not change anything in RealtimeIndexTask so it should have no impact on existing ingestion mechanism. regarding the discussion points..
Appenderators are a way of getting more control over the ingestion process than a Plumber allows. The idea is that existing Plumbers could be implemented using Appenderators, but you could also implement things that Plumbers can't do. FiniteAppenderatorDrivers help simplify indexing a finite stream of data. Also:
- Sink: Ability to consider itself "finished" vs "still writable".
- Sink: Ability to return the number of rows contained within the sink.
Geared towards supporting transactional inserts of new segments. This involves an interface "DataSourceMetadata" that allows combining of partially specified metadata (useful for partitioned ingestion). DataSource metadata is stored in a new "dataSource" table.
@himanshug updated with most of your review comments addressed, but I am not entirely sure what you mean on the thread w/ #2220 (comment).
Reads a specific offset range from specific partitions, and can use dataSource metadata transactions to guarantee exactly-once ingestion. Each task has a finite lifecycle, so it is expected that some process will be supervising existing tasks and creating new ones when needed.
👍 @gianm looks good overall, can you pls cleanup your commit history as needed.
@himanshug I had it split up as 6 commits on purpose (I think the 6 commits are pretty distinct), do you think I should combine some of them?
i did say, "as needed" in there. if it's already done then, great. 👍
cool, just checking 😄
Appenderators, DataSource metadata, KafkaIndexTask
See also #1642. FYI we are running this in our cluster as of 2016-03-02, things seem to work so far…
Set of three related things (each one a separate commit):
kafka-indexing-service core extension
Includes KafkaIndexTask, which reads a specific offset range from specific partitions, and can use dataSource metadata transactions to guarantee exactly-once ingestion.
Each task has a finite lifecycle, so it is expected that a process will be supervising existing tasks and creating new ones when needed. @dclim is working on this process in https://github.com/dclim/druid/tree/kafka-supervisor.
This extension requires the other two features (DataSource metadata and Appenderators).
DataSource metadata
Geared towards supporting transactional inserts of new segments. This involves an interface `DataSourceMetadata` that allows combining of partially specified metadata (useful for partitioned ingestion). It also involves changes to the `SegmentInsertAction` to allow it to take a `startMetadata` and `endMetadata` for compare-and-swap. DataSource metadata is stored in a new "dataSource" table.
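A toy illustration of the compare-and-swap contract just described: the insert succeeds only if the stored dataSource metadata still equals startMetadata, in which case it is replaced by endMetadata. A synchronized static field stands in for the "dataSource" table and plain strings stand in for real DataSourceMetadata; both are assumptions of this sketch, not Druid's implementation:

```java
import java.util.Objects;

public class DataSourceMetadataCas
{
  // Stand-in for the row in the "dataSource" table; null means no metadata
  // has ever been committed for this dataSource.
  private static String stored = null;

  // Mirrors the startMetadata/endMetadata contract: commit endMetadata only
  // if the stored metadata still matches what the writer observed at start.
  static synchronized boolean insertSegments(String startMetadata, String endMetadata)
  {
    if (!Objects.equals(stored, startMetadata)) {
      return false; // another writer advanced the metadata; the transaction fails
    }
    stored = endMetadata;
    return true;
  }
}
```

Retrying after a failed swap means re-reading the current metadata and re-deciding what to ingest, which is what gives the exactly-once behavior described for the KafkaIndexTask.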
Appenderators
Like Plumbers, but different. Appenderators are a way of getting more control over the ingestion process than a Plumber allows. They are less ambitious than Plumbers, but more flexible. In particular, they offer facilities to deal with:
But they do not do any of these things:
So you can think of Appenderators as a way of separating out the mechanical functionality of Plumbers from their decision-making processes. The idea is that existing Plumbers could be implemented using Appenderators, but you could also use them to implement workflows that the existing Plumbers can't support.
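One way to picture that separation (a conceptual toy, not Druid's actual Appenderator interface): the appenderator exposes only mechanical operations, and the caller supplies the decision-making, here a trivial "persist every N rows" policy of the kind a Plumber would have baked in:

```java
import java.util.ArrayList;
import java.util.List;

public class AppenderatorSketch
{
  // Hypothetical, heavily simplified version of the Appenderator idea:
  // mechanics only, no persist/merge/handoff policy of its own.
  interface Appenderator
  {
    void add(String row);
    int rowCount();
    void persist();
    int persistCount();
  }

  static class InMemoryAppenderator implements Appenderator
  {
    private final List<String> rows = new ArrayList<>();
    private int persists = 0;

    @Override public void add(String row) { rows.add(row); }
    @Override public int rowCount() { return rows.size(); }
    @Override public void persist() { persists++; }
    @Override public int persistCount() { return persists; }
  }

  // Caller-side policy: the "decision-making" half, persisting every
  // maxRowsInMemory rows. Different callers can plug in different policies.
  static void ingest(Appenderator appenderator, List<String> rows, int maxRowsInMemory)
  {
    for (String row : rows) {
      appenderator.add(row);
      if (appenderator.rowCount() % maxRowsInMemory == 0) {
        appenderator.persist();
      }
    }
  }
}
```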
Discussion?
Some open questions (and reasons this is still marked Discuss):
`AppenderatorImpl` actually has a lot of functionality and code overlap with RealtimePlumber (the query-runner stuff is particularly annoying since it is mostly similar but also kinda different). I think the main stumbling block is that the RealtimePlumber and the AppenderatorImpl have different persist directory layouts (mostly because the RealtimePlumber never has to have more than one shard per interval, but the AppenderatorImpl might) and so the code will need to be able to migrate existing data.

BTW, I think if we do either of these things, they should be in future PRs rather than this one.