Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert two native queries to use operators #13187

Closed
wants to merge 4 commits into from

Conversation

paul-rogers
Copy link
Contributor

@paul-rogers paul-rogers commented Oct 5, 2022

@gianm and the Druid team recently introduced the new MSQ framework for batch queries (which we will informally call MSQb here.) We would like to bring the MSQ goodness to Druid's interactive queries, to provide an MSQ-interactive (or MSQi). The needs of a batch engine are vastly different than the needs of an interactive engine: this is why both Spark and Presto (or Drill or Impala) exist. It is why MSQb runs via the Overlord, while interactive queries run via the Broker. This PR is a first of many steps to evolve Druid's low-latency query path toward being multi-stage. That is, this is the first step toward MSQi.

This PR uses a the industry-standard notion of an operator DAG as discussed in Issue #11933. Please see that issue for the motivation. Please see the README.md file for a technical overview. The basic idea (thanks to @imply-cheddar) is to retain (for now) the existing QueryRunner structure, but "shim in" operators in place of Sequences. This then sets us up to evolve the QueryRunner classes in later PRs. The result is that we move from one fully-working state to another, with each step moving us toward the MSQi goal.

This PR is large and complex, but it does present the entire operator architecture. We may find that we wish to divide this PR into smaller chunks. Those smaller PRs can refer back to this one for the "big picture."

This PR replaces an earlier PR #12641 that presented an earlier version of this work which was closed to allow us to focus on MSQb at that time.

The PR is currently a draft to allow us to cross-check that each converted operator incorporates any recent changes from the corresponding QueryRunner and Sequence classes.

Highlights of This PR

Included here:

  • Operator definition and associated "helper" classes.
  • Query and fragment structure to run a DAG of operators.
  • A "generic" set of operators for common operations (limit, merge, etc.)
  • Conversion of the Scan query to use operators in place of Sequences.
  • Conversion of the TimeSeries query as above.
  • A configuration layer that allows the new path to be enabled. (It is disabled by default.)
  • Revised QueryLifecycle and QueryResponse classes that can run the query either as old-school Sequences or as the new Fragment structure.
  • Unit tests for all of the new code.
  • Rerun of all "Calcite query" tests with the operator path enabled.

The primary goal of this PR is to introduce the basics of the operator approach. Functionally, the new and old approaches produce identical results. In a few places, the new approach exploited optimizations which skip unnecessary steps. Extreme stress tests (reading 5 million generated rows) shows that the operator path has lower overhead, but that gain is unlikely to be visible except for very large result sets.

Operators

This PR is based on the operator concept discussed in detail in the issue cited above. It is may be helpful to summarize the key ideas. An operator does one task in a data pipeline. Calcite converts a SQL query to a tree of "logical operators". In most engines, a rewrite step then converts the logical plan to a physical plan, complete with distribution decisions. The physical plan is executed as a DAG of physical operators.

Historically, Druid converts the Calcite logical plan tree to a native query, then executes the native query as a set of QueryRunner/Sequence pairs. At a very high conceptual level, a "fragment" is somewhat the execution of a native query, while operators are like a Sequence. The design, however, is much different at the next level down.

An operator does one thing, and is independent of the rest of the code except at three interface points:

  • The "parameters" given to a specific operator instance (what to sort, the limit to apply, what to merge, etc.)
  • The shape of the incoming (upstream) rows.
  • The shape of the outgoing (downstream) rows.

Operators are simple, stateful classes that can be composed in any number of DAG structures, and can be unit tested in isolation. Later in the MSQi project, we will convert directly from the Calcite logical operators to a description (plan) of the physical operators, which is then used to create operator instances. For now, we stick with the detour to native queries so we can make small, incremental steps that move from one working state to the next.

The key operator abstractions include:

  • Operator: an interface for a data pipeline component. An operator can be opened to provide an iterator over results, then closed. An operator can have zero inputs (a leaf operator), one input (a filter, limit or projection operator) or multiple inputs (join, merge, union, etc.)
  • ResultIterator: a super-simple iterator over the results (rows, batches) which an operator produces. Uses an exception to signal EOF, which reduces the code needed in a data pipeline relative to the Java iterator protocol.

Multiple variations of operators are provided in this PR. All of these operators are simple in the sense that they only refer to other operators, but not to any of Druid's query infrastructure.

  • LimitOperator: applies a limit to a result set.
  • NullOperator: does nothing, like an empty list or empty iterator.
  • MappingOperator: takes one input and applies some form of mapping as defined by a derived class.
  • ConcatOpreator: performs a union of its inputs, emitting each one after the other.
  • WrappingOperator similar to "baggage" on sequences: an operator that does tasks at the start and end, of result set, but imposes no per-row overhead.
  • And many more.

There are also native-query-specific operators for the Scan and Timeseries queries. As it turns out, there are many code paths common to all native queries. Operators exist for this path as well so that for Scan and Timeseries, it is "operators all the way down" while for other query types it is a mix of operators and Sequences -- until we convert those other native queries later.

General Operators

Another group of operators are those common to all native queries, and are specific to Druid's implementation:

  • CpuMetricOperator a "wrapper" operator that gathers CPU metrics.
  • SegmentLockOperator implements the pin of each segment as the query runs.
  • MergeOperator generic priority queue ordered merge.
  • And many more.

Scan Operators

Operators for the scan query:

  • CursorReader reads from one or more Cursors.
  • ScanQueryOperator replaces the ScanQueryEngine, which pretty much reads from a set of cursors.
  • GroupedScanResultLimitOperator, UngroupedScanResultLimitOperator: limit operators for scans.
  • ScanResultOffsetOperator: offset operator for scans.
  • ScanListToArrayOperator, ScanCompactListToArrayOperator: unpacks scan "batches" to individual rows.

Surprisingly, these are the only scan-specific operators required.

TimeSeries Operators

For the Time Series query:

  • TimeseriesEngineOperator replaces the TimeseriesQueryEngine to read from one or more cursors (vectorize or non-vectorized).
  • IntermediateAggOperator to perform streaming aggregation of time groups.
  • GrandTotalOperator to perform the second-stage aggregation for time series including a "grand total" row.
  • Misc. plumbing classes, including one that tames the otherwise-wild VectorCursorGranularizer class.

Fragments

Operators combine to form a data pipeline. Data pipelines are distributed, as in Druid's scatter/gather architecture. A common terminology is to say that the entire query forms a DAG. The DAG is "sliced" at node boundaries, with exchanges between slices. At runtime, a slice is replicated across many nodes. Each instance of a slice is a fragment.

This PR provides the basics of the query and fragment structure. We discussed above how, in most engines, a planner converts SQL into a logical plan, then into a physical plan that describes the operator DAG. Slices of that plan are sent to nodes which then execute the fragments. Druid, however, already has a mature, existing scatter/gather structure based around QueryRunners, and we mentioned our goal is to reuse what exists, making incremental changes along the way. In the PR, we retain the majority of the native query structure. Fragments are introduced as a way of managing the group of operators needed for a single native query on one node. When queries run in the "Calcite test" framework, we simulate distribution via worker threads. In this case, we have a query with multiple fragments (one per thread.) A query structure acts as the overall structure which ties the fragments together. This PR does not introduce new exchange methods: it simply fits itself into the existing scatter/gather structure. Again, all this will evolve later, but we have to start simple.

  • FragmentContext: the state shared by all operators in a fragment. For now, this state includes the ResponseContext and, internally, the collection of all operators that form the fragment.
  • FragmentManager: orchestrates the steps to build and run an individual fragment.
  • QueryManager: is the "container" for all fragments in a query.

We will need a way to pass fragment information to QueryRunners so that they can create operators for a fragment. It turns out that QueryPlus is handy way to accomplish this: it now holds a FragmentManager used by query runners to build their operators. A simplified form, FragmentContext, makes fragment-level resources (timeout, response context, etc.) available to operators.

Query Runners

We've noted that Druid already has an existing QueryRunner based structure which we reuse in this PR. The QueryRunner.run() method can actually be seen as being a QueryPlanner.plan() method: the method decides which sequences are needed to run a query. The sequences do the actually running. In this light, it is easy to see how we convert to operators: the QueryRunner.run() method creates an operator instead.

Our long-term goal is to retire the QueryRunner classes. Thinking ahead, the "planning" code can be gathered up in a "native query planner" (akin to QueryKit in MSQb). In anticipation, each converted QueryRunner calls to a "query planner" class to decide which operator(s) to create. In some cases, the operator is a 1:1 replacement for a sequence. In other cases, it turns out we can optimize the query by omitting unneeded operators, or by using one of several finely tuned operators in place of the generalized sequence in the existing code. It should be clear how, in a later step, a new native query planner will call these methods directly without the need to first pass through a QueryRunner.

The general pattern is:

  • Accept a QueryPlus and an upstream QueryRunner.
  • Create an operator which runs the QueryRunner.
  • Convert the resulting sequence to an "input operator."
  • Decide which operator(s) we need to add to perform the task at hand.
  • Wrap the result in a sequence compatible with the return value of QueryRunner.run().

This PR converts two entire native queries, and the common parts of other queries. As a result, for those partially-converted queries, we have a combination of sequences and operators. A set of shims allows operators to read from sequences, and to masquerade as sequences. This allows operators to seamlessly insert themselves into a sequence-based execution pipeline. When two operators are adjacent, the intervening sequence is optimized away, leaving the the two operators to talk directly. As the conversion continues, there will be more operators and fewer in the execution pipeline.

Query Lifecycle

The QueryLifecycle class runs a native query and is where we make the decision to use the "traditional" execution path (based on Sequence) or "MSQi" execution path (based on operators). That decision is based on a configuration property, as explained below. If we execute the query the traditional way, the object handed to the caller is a Sequence. If we go the new route, the returned object is a FragmentManager. As it turns out, @imply-cheddar recently added a QueryResponse object which is perfect for this: that class wraps the two alternatives in a common interface. Our API level expects a sequence, so the fragment-based QueryResponse simply wraps the operator DAG in a sequence (plus a bit of overhead.)

Bootstrap

The QueryNGModule defines a QueryManagerFactory which is injected into QueryLifecycleFactory. That factory then creates a QueryLifecycle. The query lifecycle checks if the query is enabled (as described above) using the config attached to QueryManagerFactory `. If so, it then uses QueryManagerFactory `` to create a FragmentManager which is then attached to the `QueryPlus` for that query.

From then on, each QueryRunner checks if there is a FragmentManager instance attached to the QueryPlus. If so, the query runner creates an operator (if that query runner has been converted to do so), else it executes the "classic" code path to create a sequence.

Note that, if the mechanism is not enabled (the default in this PR), there will never be a FragmentManager attached to the QueryPlus, and so execution will ignore the operator path.

Configuration

At present, the operator path is experimental, and thus disabled by default. The new path is enabled by setting a system property: -druid.queryng.enabled=true on the command line or in a properties file. Just to be clear: even if this flag is set, Druid will happily use the existing path for any queries not yet converted to use operators.

Query, Fragment and Operator Profiles

Since this change is purely at the "plumbing" level, it is hard for the casual observer (or even the dedicated developer) to see if we execute one path or another. The new "native query planner" methods work hard to optimize away bits of functionality not needed for a query, but it is hard to see that in action. We also want to know, in general, how much data flows through operators and how long things take. To address all of these concerns, the code provides a "profile" mechanism to gather basic stats per operator, then present them for a fragment or overall query. Since we don't yet have a good place to store such profiles, they are simply written to the log when enabled. Example:

Query ID: 50496d1d-b9fd-4e3e-8dc9-42195a534eeb
Runtime (ms): 4
Query Type: TimeseriesQuery

-- Root Slice  --

project-sql-results
|   row-count: 1
| 
timeseries-to-array
|   row-count: 1
| 
cpu-time
|   cpu-time-ns: 145000
| 
finalizer
|   row-count: 1
| 
intermediate-agg
|   group-count: 1
|   row-count: 1
| 
segment-retry
|   try-count: 1
|   missing-segment-count: 0
| 
finalizer
|   row-count: 1
| 
intermediate-agg
|   group-count: 1
|   row-count: 1
| 
finalizer
|   row-count: 1
| 
intermediate-agg
|   group-count: 1
|   row-count: 1
| 
ordered-scatter-gather
|   input-count: 1
|   row-count: 1
| 
Slice 2

-- Slice 2  --

Fragment 1

timeseries-engine
  vectorized: true
  row-count: 0

Tests

One of the very handy things about operators is that they are highly modular and thus extremely easy to unit test. Tests exist for all the basic abstractions defined above. Further, all SQL CalciteQueryTest queries were run with the mechanism enabled.

Next Steps

The goal of this PR is to introduce the operator framework with two concrete implementations, starting with the simpler queries so attention can focus on the framework, less on the nuances of the more complex native queries. This PR is a fully-functional replacement for the two native queries, though it is disabled by default. As already noted, future PRs will convert other operators, then begin to convert query runners to an MSQi version of QueryKit. This then allows us to introduce the MSQb frame concept and to add multi-tier queries for expensive merges, for joins and so on.


This PR has:

  • been self-reviewed.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • been tested in a test Druid cluster. (N/A, since the code is not yet integrated into Druid.)

Partial refactoring of fragment context
Operator for test query run
Scan query exec path converted to use operators
Enabled only via a system property.
Scan query operator testing
Finish scan query op testing
Snapshot toward profiles
Query profile
Union query runner operators
Time series grouped agg operators
Query, fragment managers
Scatter/gather operator & profile
Result-level caching operator
Time series to-array operator
TImeSeries query engine
@paul-rogers paul-rogers marked this pull request as draft October 5, 2022 22:13
@paul-rogers paul-rogers changed the title Convert native queries to use operators Convert two native queries to use operators Oct 5, 2022
@lgtm-com
Copy link

lgtm-com bot commented Oct 5, 2022

This pull request introduces 3 alerts when merging 14f0abe into 41e51b2 - view on LGTM.com

new alerts:

  • 1 for Container contents are never accessed
  • 1 for Dereferenced variable may be null
  • 1 for Useless null check

@lgtm-com
Copy link

lgtm-com bot commented Oct 7, 2022

This pull request introduces 2 alerts when merging 546daca into f89496c - view on LGTM.com

new alerts:

  • 1 for Dereferenced variable may be null
  • 1 for Useless null check

@FrankChen021
Copy link
Member

I think we can give a better code name for the 'queryng' package because we might have another 'next generation' in theory😄

@paul-rogers
Copy link
Contributor Author

@FrankChen021, good point! queryng will probably get renamed to msqi or some such. It's had other names previously.

Actually, if we go this route, the code should probably just end up in the query packages where the QueryRunners now live. But, I put it all in one place for now so it's easier to manage in this early stage.

@paul-rogers
Copy link
Contributor Author

There was some offline discussion about isomorphisms between this PR and the existing design. For the benefit of other readers. per the original concept of query runners:

Existing This PR Description
Lots of code Query planners Decides what is to be done for a given query.
QueryRunner Operator Does one task in a query pipeline & returns results.
Sequence ResultIterator Mechanism to obtain the results.

Given how the code evolved into its current state:

Existing This PR Description
QueryRunner Query planners Decides what is to be done for a given query.
Sequence Operator Does one task in a query pipeline & returns results.
Sequence ResultIterator Mechanism to obtain the results.

The key notion is that, in the present PR, we separate the task of "what to do" with "go do it", while in the existing code, these two are often combined. A key reason to split the tasks is to improve testability and reusability. Operators that don't decide what do to, but just do one thing well, can be more easily composed into a large variety of query shapes. QueryRunners and Sequences may have started out this way, but today they tend to be tightly coupled to their context and to one another.

One other difference is that a Sequence is reluctant to provide its contents: it wants to do the aggregation for its "downstream" consumer. This couples the implementation of the adjacent Sequences: the upstream one has to be able to implement what the downstream needs. Yielders can coerce a Sequence into coughing up individual rows, which is what often happens in practice.

By contrast, the Operator abstraction makes a sharper split: an Operator produces a result (usually a batch) and has no desire to know what the downstream operator does with those results. Similarly, a downstream operator says, "just show me the data, baby!" It doesn't care how the batch of rows was produced. Usual arguments apply for testability, modularity and reusability.

Perhaps the goal of a Sequence was to avoid transferring any more data than necessary: transfer only the aggregates. This is ideal across the network. In memory: there is no "transfer cost", just a pointer changing hands. So, it does not matter which side of the line the aggregation is done on. (With the obvious exception of pushing things down into segments whenever possible.) This lets aggregators be aggregators, and other operators just do their own jobs, without responsibilities smearing across boundaries. For the network case, yes, put the aggregate operator on the sender side of the exchange. But, it's still just an operator.

One other thought, I can't take the credit (or blame) for this idea or naming. The "operator" name comes from "relational operator" in the relational calculus that Codd invented way back when. The operator structure has been around since at least the Volcano paper. All we're doing here is borrowing good ideas so we don't have to reinvent the wheel.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants