Batch ingestion replace #12137
Conversation
Perhaps I'm missing something obvious, but why can't this be done with minimal code changes via reusing the …
@loquisgon but how does this solution avoid the same issue? Tombstones will be announced immediately, before the replacement segments are loaded, and so you still have the scenario where old data is unavailable but new data has not yet been loaded. To my understanding, you are attempting to retrofit atomic table replacement onto atomic partition replacement either way and so the same data availability issues exist here. To help me understand, what happens in your code if the historicals are under load and so take minutes to load the new physical segments? How are the tombstone segments held back in this case? More broadly, it would be a useful feature for Druid to have the ability to atomically swap a table, not just a partition (which is just a restatement of what you're trying to accomplish here I think). A common way this is done in other systems is to fully load the new version of a table and only once that's done update metadata pointers from the old to the new. Was a similar design considered here? Why or why not? If you've already considered and rejected this approach, can we add a paragraph to the design alternatives discussing why?
@JulianJaffePinterest This feature is about replacing time chunks that happen to fall in a given interval with data provided in that interval, re-ingesting with …
This pull request introduces 1 alert when merging 9f2db07 into a813816 (view on LGTM.com).
@loquisgon doesn't the same race condition exist though? Since tombstones will be loaded instantly but actual data segments will need to be read from deep storage, memory mapped, etc., if there's load on the historicals, data will be dropped from a client perspective (e.g. replaced with a tombstone) before fresh data is available. If we take your logic to determine which data segments to tombstone and instead use it to identify which segments to mark as unused (leaving the segments that will be overshadowed marked as used), what race condition does it introduce compared to your PR?
@JulianJaffePinterest Yeah, dropping the segments and replacing them with tombstones can be very similar in some cases. What I have been trying to say is that replace with tombstones will follow the standard way of segment replacement as explained in Druid's design documentation. However, let me share the following scenario where replace with tombstones produces the intended semantics as explained in the PR's design discussion and drop does not.

Start with: …

Replace with: …

However, with this new replace functionality (as stated in the design of this PR and the code), the replace would generate 12 new segments: nine of them tombstones and three segments with data (one row each). All these segments would still partially overshadow the existing DAY segment, but the net effect would be that all data in the 12 hours in the replace interval would be replaced by just the three new rows in the input (all other hours in the replace interval would not report any data since they are covered by the tombstones). I hope this example helps to clarify the semantic difference.
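The arithmetic in this scenario can be sketched with a toy calculation (plain Java, not Druid code; intervals are simplified to integer hours, and which three hours carry data is a made-up assumption for the sketch):

```java
import java.util.List;

// Toy model of the scenario above: a 12-hour replace interval with HOUR
// granularity where the input only produces rows in 3 of the hours.
// Chunks with input rows become real segments; every other chunk in the
// replace interval becomes a tombstone.
public class ReplaceExample
{
  /** Returns {number of data segments, number of tombstones}. */
  static int[] countSegments(int startHour, int endHour, List<Integer> hoursWithRows)
  {
    int data = 0;
    int tombstones = 0;
    for (int hour = startHour; hour < endHour; hour++) {
      if (hoursWithRows.contains(hour)) {
        data++;          // real segment (one row each in the example)
      } else {
        tombstones++;    // "replace with nothing"
      }
    }
    return new int[]{data, tombstones};
  }

  public static void main(String[] args)
  {
    // Hours 2, 5, and 9 are hypothetical placeholders.
    int[] counts = countSegments(0, 12, List.of(2, 5, 9));
    System.out.println(counts[0] + " data segments, " + counts[1] + " tombstones");
    // prints: 3 data segments, 9 tombstones
  }
}
```

Together the 12 new segments fully cover (and so overshadow) the replace interval, which is the semantic difference from merely dropping segments.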
Very interesting! Some comments on the design first. Afterwards, a musing 🙂
The musing I promised: IMO this is going to be a very useful feature beyond the …
This is confusing, and it's fixed by creating tombstones in (1) rather than marking the segments unused. That way, in step (4), the tombstones would block the unwanted segments from getting reloaded. So, this patch updates …
```java
Map<Interval, String> tombstonesAndVersions = new HashMap<>();
for (Interval interval : tombstoneIntervals) {
  NonnullPair<Interval, String> intervalAndVersion =
      findIntervalAndVersion(
```
Generally speaking, at this point, the time interval should've already been locked, but the method that you are calling here has a bunch of logic that is trying to deal with a lock not yet existing. Do you have a test case that exercises the code to somehow cause an interval to be hit here that isn't already locked? I'm curious about the sequence of events that can lead to that, as there is likely a bug somewhere else in the stack. This code should be able to just use the locks that already exist to get a version (that version should also generally be the same for all of the locks); it should most likely throw an error if it's gotten to this point and not locked the interval yet.
@imply-cheddar At the time that I wrote that code I could not find an easy way to use the allocators (`SegmentAllocator` implementations), since they are typically created and used in the sub-tasks, but the current code in this PR creates them after the sub-tasks have completed (in the parallel case). Since then, though, I have thought of a potential way to integrate them into the sub-tasks: have each sub-task create the tombstones after it has finished pushing all segments, create the tombstones there using its allocator, propagate the tombstones back to the supervisor task in the task report, and then have the supervisor filter out unneeded tombstones. I will work next on this new path, with the allocators, since the allocator should already be prepared to deal with issues like the one you comment on above. So please hold on for a little while while I try to implement this new path and provide an update to this PR.
@imply-cheddar I have updated the way allocation is done. I decided to use the "natural" path for the allocators. That is, I integrated the creation of tombstones into each of the paths for the different partition schemes: dynamic (linear), hash, and range. The idea is to create the tombstones at the "end" of processing of each one of those paths. At the "end" we know the actual pushed segments and the input intervals (these are now required for `dropExisting=true`), so we can compute and allocate the tombstones. I chose the right place, where the allocator is available, to create the tombstones. In general, each sub-task creates its own tombstones. This means that some tombstones will not be kept, because other sub-tasks (of the same parallel task) could have created a real segment in that interval. Thus, at the "end", when all segments are combined from the sub-tasks, there is a process that removes those "redundant" tombstones. I added unit tests to the corresponding parallel task test and tested this manually as well on a local server.
Rolled back the code where tombstones were created in the sub-tasks, because it generated incorrect tombstones. I thought they could be cleaned up/removed at the supervisor task level, but attempting that sometimes caused missing core partitions. Now the allocation is similar to before, straightforward: just use the versions from the locks. For now, the code does not support segment locking; if segment locking is requested, the code forces time chunk locking. Segment locking support is left as future work.
@gianm Thanks for your comments! Rather than a lengthy response now, let me briefly say that I will keep your points in mind in this review cycle. I promise to update the PR introductory text with a section on current limitations and future work, where I will address your points above.
I had a build conflict and need to clean the build before proceeding...
Build should be fine now...
openjdk15 test failures are due to a Travis CI issue, so it is ok to merge.
* Tombstone support for replace functionality
* A used segment interval is the interval of a current used segment that overlaps any of the input intervals for the spec
* Update compaction test to match replace behavior
* Adapt ITAutoCompactionTest to work with tombstones rather than dropping segments. Add support for tombstones in the broker.
* Style plus simple queriableindex test
* Add segment cache loader tombstone test
* Add more tests
* Add a method to the LogicalSegment to test whether it has any data
* Test filter with some empty logical segments
* Refactor more compaction/dropexisting tests
* Code coverage
* Support for all empty segments
* Skip tombstones when looking up broker's timeline. Discard changes made to tool chest to avoid empty segments since there will no longer be empty segments after lookup because we are skipping over them.
* Fix null ptr when segment does not have a queriable index
* Add support for empty replace interval (all input data has been filtered out)
* Fixed coverage & style
* Find tombstone versions from lock versions
* Test failures & style
* Interner was making this fail since the two segments were considered equal due to their ids being equal
* Cleanup tombstone version code
* Force timeChunkLock whenever replace (i.e. dropExisting=true) is being used
* Reject replace spec when input intervals are empty
* Documentation
* Style and unit test
* Restore test code deleted by mistake
* Allocate forces TIME_CHUNK locking and uses lock versions. TombstoneShardSpec added.
* Unused imports. Dead code. Test coverage.
* Coverage.
* Prevent killer from throwing an exception for tombstones. This is the killer used in the peon for killing segments.
* Fix OmniKiller + more test coverage.
* Tombstones are now marked using a shard spec
* Drop a segment factory.json in the segment cache for tombstones
* Style
* Style + coverage
* style
* Add TombstoneLoadSpec.class to mapper in test
* Update core/src/main/java/org/apache/druid/segment/loading/TombstoneLoadSpec.java Typo Co-authored-by: Jonathan Wei <jon-wei@users.noreply.github.com>
* Update docs/configuration/index.md Missing Co-authored-by: Jonathan Wei <jon-wei@users.noreply.github.com>
* Typo
* Integrated replace with an existing test since the replace part was redundant and, more importantly, the test file was very close to or exceeding the 10 min default "no output" CI Travis threshold.
* Range does not work with multi-dim

Co-authored-by: Jonathan Wei <jon-wei@users.noreply.github.com>
Druid batch ingestion replace high level design
This PR adds tombstones to Druid in order to support complete replace semantics. It supports tombstones for primary partitioning and all forms of secondary partitioning (dynamic/linear, hash and range).
Current known limitations (as of March 1, 2022)
The intention is to merge with the current limitations but to remove them in future releases.
The broker is filtering out tombstones from its timeline.
This is a limitation that can be fixed in this PR or left as future work. The consequence is that metadata queries will not consider tombstones. In the extreme case where all of a datasource's visible segments are tombstones (i.e. the datasource is practically empty), queries will fail with an exception. Future work can address this by enabling metadata queries for tombstones or by some other means.
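The broker-side filtering described above can be pictured with a toy timeline lookup (a sketch only; the real logic lives in `VersionedIntervalTimeline`, and this simplified model ignores versions and partial overshadowing):

```java
import java.util.ArrayList;
import java.util.List;

// Toy broker timeline: one visible entry per time chunk. The lookup skips
// tombstone entries, so a chunk whose visible entry is a tombstone does not
// appear in the query plan. If every visible entry is a tombstone, the lookup
// result is empty (the "practically empty datasource" case described above).
public class TimelineToy
{
  record Entry(String chunk, boolean tombstone) {}

  static List<String> lookup(List<Entry> visibleEntries)
  {
    List<String> result = new ArrayList<>();
    for (Entry e : visibleEntries) {
      if (!e.tombstone()) {
        result.add(e.chunk()); // only real segments are handed to the query
      }
    }
    return result;
  }
}
```

In this toy model, `lookup` over entries for chunks `d1` (data), `d2` (tombstone), and `d3` (data) returns only `d1` and `d3`.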
Metadata queries will not reflect metadata in tombstones
This is a consequence of the fact that brokers are filtering out tombstones. This will be fixed in a future patch.
`SEGMENT` locking is not supported

Replace will force `SEGMENT` locking to use `TIME_CHUNK` locking (a log message will be emitted in the task log). Future work can address this.

Cleanup of tombstones
If all the segments are completely overshadowed by tombstones and the overshadowed segments are all marked as not used, then the tombstones can be removed. From a cleanup perspective tombstones are the same as regular segments, so the general cleanup process should suffice for tombstones as well (i.e. the `KillUnusedSegments` coordinator duty will clean them up).

Tombstone remains after append
If data is appended to an interval with a tombstone, the tombstone remains and new data segments are appended. No query issues have been observed.
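This append behavior can be sketched with a toy model of a single time chunk (a sketch, not Druid code; in Druid the appended data becomes additional partitions in the chunk, and queries simply skip the tombstone):

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one time chunk after an append over a tombstone: the chunk
// holds several partitions, one of which is the tombstone. Queries skip the
// tombstone partition and return only the rows of the appended data segments.
public class AppendToy
{
  record Partition(boolean tombstone, List<String> rows) {}

  static List<String> query(List<Partition> chunkPartitions)
  {
    List<String> rows = new ArrayList<>();
    for (Partition p : chunkPartitions) {
      if (!p.tombstone()) {
        rows.addAll(p.rows()); // a tombstone contributes an empty result set
      }
    }
    return rows;
  }
}
```

Before the append the chunk holds only the tombstone and a query returns no rows; after appending a data partition, the query returns exactly the appended rows while the tombstone still sits in the chunk.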
Upgrade/Downgrade process
Upgrade
The `dropExisting` flag is being reused for tombstones, since tombstones subsume the previous semantics but without the race conditions inherent in the original implementation. This should be fine. If a task is executing with that flag set while an upgrade is going on, then since the tombstones (or dropped segments) are all created in the main task (the single task in sequential ingestion, the supervisor in parallel ingestion), the outcome depends on which code the main task was running. If the main task was still in the "old" code (because of a rolling upgrade), then existing segments will be dropped, which is fine since that is what the user wanted and the drop APIs are still in the code. If it is new code, then tombstones will be created, which also meets expectations since tombstones subsume dropping segments.

Downgrade
If you upgrade to a version with tombstone support, use the tombstone feature (i.e. tombstones are created), and then downgrade, your system should continue to work, but exceptions may be thrown for tombstones that the previous version does not recognize. This should not stop Druid from working, but it will pollute the logs with exceptions. The only way to stop those exceptions would be to manually delete the tombstones from the segments metadata table for the corresponding datasources.
Problem statement
Assume there is an existing data source that has data across its time chunks. Assume that you want to replace data for this data source from time `t1` to time `t2`, where `t2` > `t1`. Assume that some of the replacement time chunks generated by the input data set and its ingestion spec will not have any data. Also assume that the original data set contained data for some of those replacement time chunks. Then the original intervals for those replacement time chunks will not be "replaced"; they will keep their data in the original data source. Thus the end result is that today it is not possible to support the intended semantics of a "replace" functionality. This design describes the changes needed in Druid to support replace semantics for data.

The following example may help clarify the problem.
Example of today's functionality
Assume we have the following data set (with `DAY` segment granularity):

Now assume we re-ingest the same data, with the same segment granularity as above, but with a filter removing all rows that have "China" as their country value. If we do this, we may expect all rows with "China" to be removed from our data set. However, this is what we get:
Only the first row with country "China" was removed! This is consistent with today's design, but it may be surprising, especially for new Druid users. What is going on is that the filter removes (i.e. "throws away", in Druid ingestion terminology) all rows with country value "China", so there is no ingested data for the existing segment for DAY `4/2/20` (i.e. a DAY "time chunk"), and therefore the original segment for that time chunk remains in the data source.

What we need is a concept of "replace with nothing". In many databases this is called a "tombstone". We need a way to tell Druid in batch ingestion: "please replace all time chunks in this interval even if there is no data for them." This is what this feature is all about.
Assuming that we have changed Druid to have the replace functionality using tombstones, and assuming also that the existing flag `dropExisting` has been redefined to support it, all we have to do with the above re-ingest spec is to set the parameter `dropExisting` to `true` and expect to get the new data:

Details & design idea
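For illustration only, the ioConfig of such a replace re-ingestion might look like the sketch below. This is not taken from the PR; the input source, paths, and format are hypothetical placeholders, and only `dropExisting` is the flag under discussion:

```json
"ioConfig": {
  "type": "index_parallel",
  "inputSource": {
    "type": "local",
    "baseDir": "/tmp/replace-input",
    "filter": "*.json"
  },
  "inputFormat": { "type": "json" },
  "appendToExisting": false,
  "dropExisting": true
}
```

With `dropExisting` set to `true`, time chunks in the replace intervals that receive no input rows are covered by tombstones instead of silently keeping their old segments.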
A replace batch ingestion spec (or simply replace spec) is a batch ingestion spec with the following characteristics:
- Input data (the usual stuff)
- Batch ingestion intervals in the ioConfig section of the spec (the usual stuff) (we call these the "replace intervals")
- A flag in the ioConfig (reusing `dropExisting`) that indicates that this is a replace of time chunks (this is new behavior for an existing flag that supported similar, but not quite the same, behavior with a different implementation)

The idea is that after all sub-tasks for a particular ingestion complete and it is ready to publish the segments, then, before publishing, the task identifies real segments in the input intervals as well as "empty" intervals that should contain tombstones. The tombstones are represented as `DataSegment`s, like real segments, but their metadata clearly identifies them as tombstones. After this point, the tombstone segments are added to the real segments and published together by the Overlord. Then the coordinator takes over and assigns them to historicals, which "load" tombstones (being aware that tombstones are special and are not actually stored in the file system) and announce them to the broker. The broker then incorporates tombstones into its timeline and either filters them out when queries arrive, or does not filter them and sends the queries to the historicals, which know that queries in tombstone intervals should return no data.

Thus the following are our design goals:
Tombstones are represented as regular `DataSegment`s (but with no backing file in deep storage or in local segment caches) and should be transparent to most of the existing code.

Components and their corresponding changes
Middlemanager/peon (create, push tombstones, publish to overlord)
- Tombstones are created as `DataSegment`s with a `shardSpec` of a new type, `TombstoneShardSpec`. We call these `DataSegment`s tombstones. The `DataSegment`s for tombstones are created using versions from the `TIME_CHUNK` locks, thus `SEGMENT` locking is not supported. To minimize coordinator issues we gave tombstones a size of 1.
- Tombstones are pushed and published along with the real `DataSegment`s as usual.
- The tombstone intervals are computed from the input intervals and the `GranularitySpec`. If `dropExisting` is set to true in an ingestion spec with no input intervals, the ingestion spec is immediately rejected with an error.
- Replace forces `SEGMENT` locking to use `TIME_CHUNK` locking (a log message will be emitted in the task log).

Coordinator
Assignment: the coordinator, by means of the load rules, detects that new segments exist for the data source. The new segments may contain tombstones; the coordinator does not care about this and uses its usual assignment strategy to assign the data source's segments (some of them tombstones) to historicals.

The only potential change in the coordinator may be an assignment strategy for tombstones so that they do not all end up in the same historical, but this may be left for future work.
Historical query path
The historical "loads" a tombstone by creating a `QueriableIndex` marked as a tombstone (using a new `QueriableIndex` method: `boolean isFromTombstone()`) and announces the tombstone. Queries over tombstones are served by a `NoOpQueryRunner` (an existing query runner that always returns an empty result set, exactly what is needed for tombstones). Thus the part of the query hitting a tombstone queryable index will return an empty sequence, since that is how the `NoOpQueryRunner` works, and that is the expected result set from a tombstone, since tombstones do not have any rows.

Broker
The broker skips tombstones through a change to the `VersionedIntervalTimeline` that avoids them in its `lookup` interval method.

Compaction:
Cleaning tombstone datasegments
Since tombstones are `DataSegment`s in the segment metadata Druid table, they are candidates for overshadowing, being marked `unused`, etc., like any other `DataSegment`. Initially we will leave it to Druid to clean them up the same as non-tombstone segments. If needed, in the future we may add more aggressive cleanup mechanisms for tombstones.

Design alternatives
Probably the first idea that comes to mind instead of tombstones is simply dropping existing segments when a replace is desired. This was actually already tried and implemented (PR). However, as you can see in the discussion, that functionality is currently marked as "experimental" due to a race condition between dropping and loading segments that may make data temporarily unavailable. Besides, dropping segments by itself is not enough to support the full replace functionality. For instance, when the replacement data uses finer granularity it may not always be possible to drop the existing, coarser segment (the replace PR has an example in one of the comments). So dropping segments covers only a subset of the replace use cases, and therefore the new replace functionality discussed in this document subsumes the current `dropExisting` functionality while eliminating the race condition.

We also considered variations of assigning, announcing, and processing queries for datasources with tombstones. The tombstone creation was pretty stable, and we feel strongly that using `DataSegment` to represent tombstones together with the normal publishing process works well. For assignment and announcement we considered having the coordinator announce the tombstones and the broker incorporate them into its timeline. We discarded this option, though, reasoning that we should stick with the current design as much as possible.

Implementation status
As of Feb 15, 2022

Functionality is restricted to `TIME_CHUNK` locking and tombstone versions are obtained from the locks' versions. A new `shardSpec`, `TombstoneShardSpec`, is introduced.

As of Feb 12, 2022

Segment locking is now allowed for all forms of secondary partitioning. Tombstone allocation is done using the corresponding allocators normally used in the particular code flow where tombstones are created.

As of Feb 1, 2022

All functionality as designed has been implemented, with the following potential exceptions. The PR build is green. All existing tests for the similar `dropExisting` flag have been kept and refactored to match `replace` semantics with tombstones (`dropExisting` should continue working as-is from a functional perspective even though it is now implemented with tombstones). New unit tests have been added for the new components introduced.

This functionality will not support segment locking. If segment locking is requested at the same time that replace is requested, time chunk locking will be forced.
As of January 31, 2022

All functionality as designed has been implemented, with the following potential exceptions. The PR build is green. All existing tests for the similar `dropExisting` flag have been kept and refactored to match `replace` semantics with tombstones (`dropExisting` should continue working as-is from a functional perspective even though it is now implemented with tombstones). New unit tests have been added for the new components introduced.

The most visible potential gap at this point is the lack of complete support for `SEGMENT` locking. We could prevent replace in the case of `SEGMENT` locking and leave this support for a future release, or further work on the PR to fully support segment locking.