streaming version of select query #3307
Conversation
Do we need granularity for the scan query?
@@ -0,0 +1,28 @@
<?xml version="1.0" encoding="UTF-8"?>
missing license header
Can we do this by adding
force-pushed from 5acde1c to 4b20b35
The scan query differs from the select query in many ways, not only in batchSize. So I think it's better to create a new query type instead of reusing the select query.
Thanks @kaijianding for posting this! To me, the differences (result format, lack of ordering, different processing style) are substantial enough that this should be a different query type from select. We might also want to:
@@ -108,6 +108,7 @@
<module>extensions-contrib/parquet-extensions</module>
<module>extensions-contrib/statsd-emitter</module>
<module>extensions-contrib/orc-extensions</module>
<module>extensions-contrib/scan-query</module>
nit: I heard this list is supposed to be sorted. orc-extensions was already out of order, but we can fix both here.
Besides orc-extensions, I found more exceptions in the list, so I think ordering is not a requirement.
It would be nice if this were sorted :)
@kaijianding, do you have any thoughts on #3307 (comment)?
@gianm
@kaijianding do you have any benchmarks around how fast data can be read using this query? Can we compare it against select? I suspect the difference will be substantial.
@JsonProperty("limit") int limit, | ||
@JsonProperty("filter") DimFilter dimFilter, | ||
@JsonProperty("columns") List<String> columns, | ||
@JsonProperty("context") Map<String, Object> context |
We will need documentation, similar to the select query's, on how to use the scan query.
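As a hedged illustration of how such usage might look (the field names come from the constructor quoted above; ScanQuerySpecSketch and the simplified spec are assumptions of this sketch, not the PR's actual class), Jackson binds a JSON query spec to the @JsonProperty constructor parameters:

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.util.Map;

class ScanQuerySpecSketch
{
  final int limit;
  final List<String> columns;
  final Map<String, Object> context;

  // Simplified stand-in for the PR's constructor (DimFilter omitted).
  @JsonCreator
  ScanQuerySpecSketch(
      @JsonProperty("limit") int limit,
      @JsonProperty("columns") List<String> columns,
      @JsonProperty("context") Map<String, Object> context
  )
  {
    this.limit = limit;
    this.columns = columns;
    this.context = context;
  }

  public static void main(String[] args) throws Exception
  {
    // Hypothetical query spec, just to show the binding.
    String json = "{\"limit\": 100, \"columns\": [\"timestamp\", \"page\"], \"context\": {}}";
    ScanQuerySpecSketch q = new ObjectMapper().readValue(json, ScanQuerySpecSketch.class);
    System.out.println(q.limit + " " + q.columns);
  }
}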
I looked this over and it looks very good to me. One main comment is that the query doesn't use the processing pool and hence will not respect query priorities and may have multi-tenancy impacts, but I am still in favor of the approach taken. I think we need to note that in the docs at a minimum. 👍 after docs are added. I almost want to move this up to 0.9.2 :P
@kaijianding @KurtYoung is this still WIP? It looks ready.
Still figuring out how to apply a global limit from the broker side, to avoid sending requests to more historicals/realtimes when data from those servers definitely won't appear in the final results after the final merge (usually the first server can provide enough results if query.limit is set). In other words, can the broker send requests to servers in sequence instead of in parallel?
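A minimal sketch of the sequential idea being discussed here, assuming a hypothetical ServerClient stand-in for a per-node scan request (none of these names are Druid APIs): query servers one at a time and stop as soon as the accumulated rows satisfy the limit.

import java.util.ArrayList;
import java.util.List;

class SequentialLimitSketch
{
  // Hypothetical stand-in for issuing a scan request to one data node.
  interface ServerClient
  {
    List<Object[]> fetchRows(int maxRows);
  }

  // Query servers one by one; stop once the global limit is satisfied,
  // so later servers are never contacted at all.
  static List<Object[]> scanWithGlobalLimit(List<ServerClient> servers, int limit)
  {
    List<Object[]> results = new ArrayList<>();
    for (ServerClient server : servers) {
      if (results.size() >= limit) {
        break; // remaining servers are skipped entirely
      }
      results.addAll(server.fetchRows(limit - results.size()));
    }
    return results.size() > limit ? new ArrayList<>(results.subList(0, limit)) : results;
  }
}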
@kaijianding no, we don't need caching for the scan query IMO. A large result set would just fill up the entire cache.
}
private Object rowsToValueVector() {
// only supports a list for now; we could support ValueVector or Arrow in the future
Throwing UnsupportedOperationException is probably safer; otherwise callers will get things they don't expect.
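A sketch of the reviewer's suggestion, with hypothetical rowsToList and resultFormat stand-ins (not the PR's actual signatures): unimplemented formats fail loudly instead of silently returning a list.

import java.util.Collections;
import java.util.List;

class ResultFormatSketch
{
  // Hypothetical: materialize rows as a plain list, the only format
  // implemented so far.
  private static List<Object> rowsToList()
  {
    return Collections.emptyList();
  }

  // Reviewer's suggestion: fail loudly on formats that are not
  // implemented yet, instead of returning something callers don't expect.
  static Object rowsToValueVector(String resultFormat)
  {
    if ("list".equals(resultFormat)) {
      return rowsToList();
    }
    throw new UnsupportedOperationException(
        "resultFormat [" + resultFormat + "] is not supported yet");
  }
}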
@kaijianding IMO if there's a global limit it's fine to hit every data node and just throw away some data past the limit. This isn't optimal performance-wise, but I think the common use case for scan is likely to be with no limit and/or hitting data nodes directly. So I think it's okay if we don't fully optimize for the case where there is a limit and the query hits a broker. I think caching is unnecessary, so we can just return a null caching strategy. Docs and tests are important though. @fjy I'm ok with committing to this as a core feature of Druid rather than a contrib extension if the authors are into that. It seems like it'd be useful for integrating Druid with other systems.
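For contrast with the sequential sketch above, here is a minimal sketch of the "hit every data node and trim afterwards" approach described in this comment (the ExecutorService fan-out and perNodeScans callables are assumptions, not the PR's code):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

class BrokerTrimSketch
{
  // Fan out to every data node in parallel, concatenate the results,
  // then discard rows past the global limit. Simple but wasteful when
  // the limit is small relative to the data returned.
  static List<Object[]> scanAll(
      ExecutorService pool,
      List<Callable<List<Object[]>>> perNodeScans,
      int limit
  ) throws InterruptedException, ExecutionException
  {
    List<Object[]> merged = new ArrayList<>();
    for (Future<List<Object[]>> f : pool.invokeAll(perNodeScans)) {
      merged.addAll(f.get());
    }
    // throw away everything past the limit
    return merged.size() > limit ? new ArrayList<>(merged.subList(0, limit)) : merged;
  }
}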
@kaijianding have you had a chance to work on this recently? As mentioned in #3307 (comment), I think it's ok to pass the global limit down to each data node for the first version of the broker-side query. IMO it would be nice to get something working in master and then improve on it.
force-pushed from f72b83c to 11bdfd0
force-pushed from 11bdfd0 to 8d17ebb
force-pushed from d6e189e to f098c26
@fjy it is ready for review now.
@KenjiTakahashi that needs QueryResource to be modified to return line-delimited JSON batches, maybe in another PR.
@kaijianding Agreed that this can be considered a separate feature, for another PR. I've tried the "compactedList" format and it is indeed noticeably (not by much, but still) faster than the other one. Thanks again for working on this, we really appreciate it!
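For illustration, a hedged sketch of the two result formats as described in this thread (field names and sample values are made up): "list" repeats the column names in every row, while "compactedList" states the columns once and ships each row as a bare array, which serializes faster.

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ResultFormatsSketch
{
  // "list" format: one map per row, column names repeated every time.
  static List<Map<String, Object>> asList()
  {
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("timestamp", "2016-08-01T00:00:00Z");
    row.put("page", "druid");
    row.put("count", 3);
    return Arrays.asList(row);
  }

  // "compactedList" format: column names stated once, then bare value
  // arrays, which is cheaper to serialize/deserialize as JSON.
  static Map<String, Object> asCompactedList()
  {
    Map<String, Object> result = new LinkedHashMap<>();
    result.put("columns", Arrays.asList("timestamp", "page", "count"));
    result.put("events", Arrays.asList(
        Arrays.asList("2016-08-01T00:00:00Z", "druid", 3)
    ));
    return result;
  }
}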
}
// at the point where this code is called, only one datasource should exist.
String dataSource = Iterables.getOnlyElement(query.getDataSource().getNames());
Use Preconditions.checkArgument(...) instead; it looks like dataSource isn't used and this line is there only for validation?
It's unused code, will remove it.
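The reviewer's suggestion would look roughly like this sketch (Guava's Preconditions is real; checkSingleDataSource and the List<String> parameter are assumptions of the sketch):

import com.google.common.base.Preconditions;
import java.util.List;

class DataSourceCheckSketch
{
  // Validate without binding an unused local: fail fast if the query
  // somehow references more than one datasource.
  static void checkSingleDataSource(List<String> dataSourceNames)
  {
    Preconditions.checkArgument(
        dataSourceNames.size() == 1,
        "expected exactly one dataSource, got %s",
        dataSourceNames
    );
  }
}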
  allDims.retainAll(query.getColumns());
  allMetrics.retainAll(query.getColumns());
}
if (query.getColumns() == null || query.getColumns().isEmpty()) {
Isn't this the else clause of the preceding if? Why not else { ... }?
Yes, will change it to else.
filter,
intervals.get(0),
VirtualColumns.EMPTY,
QueryGranularities.ALL,
Not sure why this is truncating the timestamps completely... did you really mean QueryGranularities.NONE?
It should be ALL. I only want one cursor, so I can read all the data in one iterator.
If it were NONE, it would create one cursor per millisecond; see CursorSequenceBuilder: Iterable<Long> iterable = gran.iterable(interval.getStartMillis(), interval.getEndMillis()); and BaseQueryGranularity.iterable().
AllGranularity.iterable() returns a single-element iterable, which is what I actually need.
  events = rowsToList();
}
responseContext.put("count", (int) responseContext.get("count") + (offset - lastOffset));
responseContext.put("timeoutAt", timeoutAt - (System.currentTimeMillis() - start));
nit: "count" and "timeoutAt" can be pulled in some static variable somewhere
Modified.
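i.e., something along these lines (the class and constant names are hypothetical, not the ones the PR ended up with):

// Hypothetical home for the response-context keys, per the review nit.
class ScanQueryContextKeys
{
  static final String COUNT = "count";
  static final String TIMEOUT_AT = "timeoutAt";
}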
public class ScanResultValue implements Comparable<ScanResultValue>
{
  public static final String timestampKey = "timestamp";
In other places we use __time, e.g. in groupBy queries etc. It might make sense to be consistent. Also, you can probably refer to Column.TIME_COLUMN_NAME.
I followed the select query: io.druid.query.select.EventHolder.timestampKey. Do you think it is better to use __time?
Didn't realize that the select query was using "timestamp"; I guess it's better to be consistent with the select query, so it's fine.
@himanshug why didn't you use "__time" instead of "timestamp"?
final Iterable<QueryRunner<ScanResultValue>> queryRunners | ||
) | ||
{ | ||
// in single thread and in jetty thread instead of processing thread |
Any reason for not using the queryExecutor, which would scan segments in parallel and follow the query cancellation behavior?
I guess you've done it to avoid parallelization so that you can limit the number of rows across segments. In that case, you could still use the queryExecutor but give it a single callable that scans all segments, checks whether the thread was interrupted before scanning each segment, and aborts if it was.
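A sketch of what the reviewer describes, using a hypothetical Segment interface in place of Druid's real types: one callable on the query executor scans segments sequentially (so a row limit still works across segments) but checks for interruption before each one, so cancellation works.

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

class SingleCallableScanSketch
{
  // Hypothetical per-segment scan.
  interface Segment
  {
    void scan();
  }

  // Segments are scanned sequentially inside a single task, but the task
  // aborts between segments if the query has been cancelled (interrupted).
  static Future<?> submit(ExecutorService queryExecutor, List<Segment> segments)
  {
    return queryExecutor.submit((Callable<Void>) () -> {
      for (Segment segment : segments) {
        if (Thread.currentThread().isInterrupted()) {
          throw new InterruptedException("scan query cancelled");
        }
        segment.scan();
      }
      return null;
    });
  }
}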
I want to limit memory usage when scanning segments; memory is the pain point of the select query, which buffers too much data in memory before returning results to the client. So I decided to prepare the next batch only when the client asks for it.
Another reason for using a qtp thread instead of the queryExecutor is that processing threads are valuable and usually limited in number (because direct memory is configured per processing thread). A scan query can usually last a very long time, and it occupies a qtp thread anyway, so I think it is not a good idea to also occupy a processing thread for a very long time, which may block other queries.
With a processing thread you would reserve a ByteBuffer, but beyond that, memory usage could stay as in your current implementation.
OK, I see the argument about not taking up a processing thread for a long time so they remain available for other regular queries... but then query cancellation does not work right now. That doesn't matter, though, if you only run the scan query directly against the historical.
@kaijianding it appears that if I send a query with a limit to the broker, it could give me different results on different calls depending on which historicals responded to the broker first? In that case we should at least note that in the docs, or maybe remove the limit at the broker, in which case the limit really becomes "per historical".
The CachingClusteredClient will sort the sequences by query.getResultOrdering(); for the scan query that depends on the ordering of segmentId (see ScanResultValue.compareTo()), so the broker limit should always return the same result.
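A hedged sketch of the determinism argument (ScanResultValueSketch is a stand-in, not the PR's ScanResultValue): if per-segment results compare by segmentId, the broker-side merge order is stable, so applying a limit after the merge always keeps the same rows.

import java.util.List;

class ScanResultValueSketch implements Comparable<ScanResultValueSketch>
{
  final String segmentId;
  final List<Object> events;

  ScanResultValueSketch(String segmentId, List<Object> events)
  {
    this.segmentId = segmentId;
    this.events = events;
  }

  // Ordering by segmentId makes the broker-side merge deterministic,
  // regardless of which historical responded first.
  @Override
  public int compareTo(ScanResultValueSketch other)
  {
    return segmentId.compareTo(other.segmentId);
  }
}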
* streaming version of select query
* use columns instead of dimensions and metrics; prepare for valueVector; remove granularity
* respect query limit within historical
* use constant
* fix thread name corrupted bug when using jetty qtp thread rather than processing thread while working with SpecificSegmentQueryRunner
* add some tests for scan query
* add scan query document
* fix merge conflicts
* add compactedList resultFormat, this format is better for json ser/der
* respect query timeout
* respect query limit on broker
* use static consts and remove unused code
It is part of this discussion: https://groups.google.com/d/msg/druid-development/FK5D162ao74/13F3ixfJEQAJ
This PR is a streaming version of select.
The query is similar to the select query.
The differences between the select query and the scan query are:
It is tested by directly talking to historicals, and not fully tested from the broker side.