Experimental kafka simple consumer based firehose #1609
Conversation
@@ -352,9 +390,13 @@ public void doRun()
{
  try {
    for (Pair<FireHydrant, Interval> pair : indexesToPersist) {
      metrics.incrementRowOutputCount(persistHydrant(pair.lhs, schema, pair.rhs));
      metrics.incrementRowOutputCount(
          persistHydrant(
outstanding review comment --
@gianm - Will we lose data if one hydrant is persisted with the metadata, then the plumber crashes? If I'm reading the code right, that would cause the next bootstrap to think that all the previously read data was persisted.
@himanshug - hmmm... that sounds correct, still thinking about what the right thing to do here would be...
Maybe create a marker file at the end at /persist_dir/datasource/, and use the commit metadata information only if the marker file is present?
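The marker-file idea above could look roughly like the following sketch. All names here (`commit.done`, `markerFor`, `isCommitValid`) are illustrative stand-ins, not actual Druid APIs: write an empty marker only after every hydrant has been persisted, and trust the stored commit metadata on bootstrap only if the marker exists.

```java
import java.io.File;
import java.io.IOException;

// Hypothetical sketch of the marker-file approach discussed above.
public class CommitMarker
{
  public static File markerFor(File persistDir, String dataSource)
  {
    return new File(new File(persistDir, dataSource), "commit.done");
  }

  // Called at the very end of a successful persist of all hydrants.
  public static void markCommitted(File persistDir, String dataSource) throws IOException
  {
    File marker = markerFor(persistDir, dataSource);
    marker.getParentFile().mkdirs();
    marker.createNewFile();
  }

  // On bootstrap: only trust the persisted commit metadata if the marker exists,
  // so a crash between persisting a hydrant and finishing the commit is detectable.
  public static boolean isCommitValid(File persistDir, String dataSource)
  {
    return markerFor(persistDir, dataSource).isFile();
  }
}
```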
@cheddar what do you think?
I think it makes sense to store the metadata outside the segments in a separate file. This is because the commit metadata isn't really associated with an individual segment-- it's associated with a set of segments that are persisted at the same time. So storing it in the segments is asking for problems.
Sort of like this:
{
  "metadata" : {"foo": "bar"},
  "segments": [
    {"id": "datasource_2000_2001_2000_1", "hydrant": 10},
    {"id": "datasource_2001_2002_2001_1", "hydrant": 12}
  ]
}
When a realtime node crashes and starts back up, it would delete any hydrants numbered higher than the ones in the commit file.
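The recovery rule described here could be sketched as follows, assuming a commit file shaped like the JSON above has been parsed into a map of segment id to last committed hydrant number. The class and method names are hypothetical, not the actual plumber code:

```java
import java.util.Map;

// Hypothetical sketch of the crash-recovery rule: delete any on-disk hydrant
// numbered higher than what the commit file recorded for its segment.
public class HydrantRecovery
{
  // committed maps segment id -> last committed hydrant number,
  // as read from the commit file sketched above.
  public static boolean shouldDelete(Map<String, Integer> committed, String segmentId, int hydrantNum)
  {
    Integer last = committed.get(segmentId);
    // A segment absent from the commit file was never committed at all.
    return last == null || hydrantNum > last;
  }
}
```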
As I look more at this code, I think I agree that commitMetadata really is associated with the whole datasource at this level, not with individual segments.
We could also just include the set of segments for the same chunk of metadata in more metadata on each of the segments.
I think there is value in storing it inside the segment as a form of lineage.
I also don't feel so strongly about it that I would necessarily be against a separate file. I don't think that has to be changed in this initial PR, however. It actually unravels and creeps out the scope quite a bit, because it also requires us to consider hand-off in terms of the full set of segments being handed off instead of individual segments (that is, if one segment of the set succeeds in handing off and the others fail, the realtime node would believe that it needs to re-ingest the data).
noted the discussion in comments for future.
👍 after merge conflicts / travis are resolved
@gianm resolved the merge conflict; the jdk8 build actually failed on an unrelated test and hopefully will pass this time.
bouncing for travis
Thanks @himanshug. Hmm, does @cheddar's +1 on #1482 count towards this one? Anyone else want to / available to take a look?
@cheddar's +1 should count
Ok, sgtm. Will merge in a bit unless there are further comments. /cc @drcrallen @nishantmonu51 @xvrl who had commented on the previous PR.
byte[] stringBytes = new byte[length];
in.get(stringBytes);
return new String(stringBytes, UTF8);
return new String(readBytes(in, length), UTF8);
Can StringUtils.fromUtf8 be used?
will change
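For context, the pattern under discussion is reading a length-prefixed UTF-8 string out of a ByteBuffer. A minimal self-contained sketch (class and method names are illustrative; in Druid the decode step can use `StringUtils.fromUtf8` instead of the explicit `new String(..., UTF_8)` shown here):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Sketch of the readBytes/readString helpers referenced in the diff above.
public class SerdeHelper
{
  public static byte[] readBytes(ByteBuffer in, int length)
  {
    byte[] bytes = new byte[length];
    in.get(bytes);  // advances the buffer position by length
    return bytes;
  }

  public static String readString(ByteBuffer in, int length)
  {
    return new String(readBytes(in, length), StandardCharsets.UTF_8);
  }
}
```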
{
  return new File(
      persistDir.getAbsolutePath()
          .replace(schema.getDataSource(), "corrupted/" + schema.getDataSource())
Better to use Path to build path than to assume "/" is the proper delimiter.
will change that to use File.separator to remove the delimiter assumption
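A possible shape for that change, using `File.separator` rather than a hard-coded "/" (the `corruptedVersionOf` name and the exact replacement scheme are illustrative, not the actual patch):

```java
import java.io.File;

// Sketch: build the "corrupted" sibling path without assuming "/" is the
// path delimiter, by using File.separator in the replacement strings.
public class CorruptedPaths
{
  public static File corruptedVersionOf(File persistDir, String dataSource)
  {
    String path = persistDir.getAbsolutePath();
    // Replace the dataSource path element with corrupted/<dataSource>,
    // keeping the code portable across platforms.
    return new File(
        path.replace(
            dataSource + File.separator,
            "corrupted" + File.separator + dataSource + File.separator
        )
    );
  }
}
```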
Is it possible to test the corruption code paths?
@drcrallen addressed all your review comments in latest commit.
yeah, I was hoping to rebase the other PR off this one and then build on that.
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
(not blocking for this PR) we may need to consider better ways to handle the "different scala version require different artifact IDs" thing.
@drcrallen updated code to have messaging around metadata parsing failure in IndexIO
I'm 👍 on this. It is experimental and not extremely impacting.
I think we have all the votes we need for this PR. Any more blockers/concerns?
👍 Provided that it will be revisited to add better corruption testing, and address some of the firehose concerns that could be deferred until later.
lgtm, @himanshug do you want to squash the commits a little? Maybe into one for the original patch and one for your changes, or however you want to do it
firehoseV2 addition to Realtime[Manager|Plumber], essential segment metadata persist support, kafka-simple-consumer-firehose extension patch
@gianm rebased/squashed in 2 commits, 1 from original patch and another with the changes to address review comments
@gianm ok, finally the build has passed :)
Experimental kafka simple consumer based firehose
tracks and addresses the review comments on #1482
copied description from #1482
This feature introduces a simple consumer implementation for the realtime firehose. It keeps track of the current offset metadata by storing it in metadata.drd with the smooshed files, and is able to recover the previous offset position after a restart.
On restart, it takes the offset from the sequentially "last" valid persisted file, and renames all incomplete persist directories to a path for corrupted data. For example, if there are sub dirs /1, /2, /3, ..../8 under directory mydatasource/20150630T10:00:00-11:00:00/, and /7 doesn't contain meta.smoosh, RealtimePlumber will rename both /7 and /8 to corrupted/mydatasource/20150630T10:00:00-11:00:00/* and use the offset from /6 as the starting point.
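The restart scan described above could be sketched like this (names are illustrative, not the actual RealtimePlumber code): walk the numbered persist sub-directories in order, treat the last one that still contains meta.smoosh as the recovery point, and everything from the first invalid directory onward is what gets renamed to the corrupted path.

```java
import java.io.File;
import java.util.Arrays;

// Hypothetical sketch of the restart scan over numbered persist directories.
public class PersistDirScan
{
  // Returns the highest directory number, scanning in ascending order, whose
  // directory contains meta.smoosh; -1 if none is valid. Directories at and
  // after the first invalid one are the candidates for the corrupted/ rename.
  public static int lastValidPersist(File intervalDir)
  {
    String[] names = intervalDir.list();
    if (names == null) {
      return -1;
    }
    int[] nums = Arrays.stream(names).mapToInt(Integer::parseInt).sorted().toArray();
    int lastValid = -1;
    for (int n : nums) {
      if (!new File(new File(intervalDir, String.valueOf(n)), "meta.smoosh").isFile()) {
        break;  // this dir and everything after it would be moved to corrupted/
      }
      lastValid = n;
    }
    return lastValid;
  }
}
```

In the PR's example, /7 lacks meta.smoosh, so the scan stops there: /6 is the recovery point and /7 and /8 get moved aside.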
When indexing, it follows this logic:
Note that it will only advance when the current row has been successfully processed, which means saving the end offset of the current message.
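The advance rule can be sketched as follows. This is a simplified stand-in for the firehose internals (the `OffsetTracker` class and its methods are hypothetical): the committed offset only moves forward once the current row is processed successfully, so a crash re-reads at most the in-flight message.

```java
// Hypothetical sketch of the "advance only on success" offset rule.
public class OffsetTracker
{
  private long committedOffset;

  public OffsetTracker(long startOffset)
  {
    this.committedOffset = startOffset;
  }

  // Process one message; advance the committed offset only if processing succeeds.
  public boolean consume(long messageEndOffset, Runnable processRow)
  {
    try {
      processRow.run();
      committedOffset = messageEndOffset;  // save the end offset of the current message
      return true;
    }
    catch (RuntimeException e) {
      return false;  // offset unchanged; the message is re-read after a restart
    }
  }

  public long getCommittedOffset()
  {
    return committedOffset;
  }
}
```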
Google Group reference:
https://groups.google.com/forum/#!topic/druid-development/9HB9hCcqvuI
The goal of this PR is to
Even after this is merged, firehoseV2 is expected to be experimental and should not be the go-to firehose for realtime ingestion. That will likely come after more adjustments. Or, it's possible that this initial attempt informs things such that we actually go and change the interfaces or add a firehoseV3. As it stands, the PR does the useful thing we initially need it to do and is hopefully a good springboard for further evolution.