
Query profile mechanism #11800

paul-rogers opened this issue Oct 14, 2021 · 0 comments

This issue proposes to add a query profile feature to Druid. This project has multiple parts and we propose to describe each part as a separate issue. This issue presents the overall concept and acts as a "rollup" for the individual task tickets.

Motivation

A query profile is a collection of information about a single query used to diagnose performance issues for both developers and users. Query profiles are a standard feature of Drill, Impala (Cloudera version), Presto and other query tools. When aggregated, the information in the profile can show query access patterns needed to optimize performance of a cluster.

Each profile generally includes the query, the query plan and metrics about query execution. Metrics are unique to each product and reflect how the product operates internally. Our goal here is to design a query profile specifically for Druid.

Profiles in other products are used in two distinct ways. First, they help answer the perennial user question: "why is my query slow?" Second, they help developers understand query performance to identify further opportunities for improvement.

See an example profile here.

Goals

Druid offers blindingly fast performance, but only if the system is properly tuned for the data and queries. Historically, tuning Druid was more of an art than a science because we were missing critical information: what actually happens within a query. Instead, one had to observe performance, know what to look for in the data and queries, know how Druid operates internally, and thus know which changes might be beneficial. Then, one could intuit which change to make, try it, and repeat the cycle to verify the improvement or seek further gains. Though many experts developed the necessary skills to know what to look for, newer users are often left with trial and error since they have not yet developed a feel for how Druid works.

A profile is distinct from the metrics which Druid produces today. Metrics report on the performance of the cluster as a whole and are designed to be aggregated by time, by node, etc. A profile, by contrast, is meant to explain the performance drivers, in detail, for one specific query. The structure of the profile differs depending on the query (in Druid, on the native query type). Overall metrics tell us how many queries per second the system handles and average query latency. A profile explains exactly why query x took the time it did: the amount of data processed, the way it was processed, and so on.

The profile is also not a flame graph. A flame graph shows the execution times within a Java process. While flame graphs help developers understand their code, they are not of great use to users since users don't know the mapping from query operations to Java implementation. A profile describes the query at the level of query operations, not code implementation. Our goal here is not to optimize the code, but rather the application. Knowing that function foo() takes x% of the time might tell an expert that something is amiss, but it is unlikely to provide useful knowledge to the average user.

Within-query visibility

One way to address this issue is to make the performance drivers explicit: to show which operations within a query take time. Users will not optimize the code, but rather will change query structure, data layout or cluster configuration to improve performance: the query profile presents query details in those terms.

Similarly, measuring query time is useful to tell us where we stand, but just knowing query run time does not tell us why the query takes that amount of time, or what we can do about it.

Our premise here is that if we know, at an application level, why some part of a query takes time, then we are in a better position to make application-level changes that improve performance. For example, we want to know:

  • How many segments were hit? Why? (Was a query phrased in a way that Druid must search all segments, or did it have a tight time range that let Druid select a small subset of segments?)
  • Did the query make use of partitioning? Why or why not? (Ideally, most queries will have filters on the columns used for partitioning. If not, then perhaps the partitioning selection is not aligned with query usage.)
  • How many rows were scanned? (If the query scans far more rows than those returned, then it is likely that the query uses filters which Druid cannot optimize. Perhaps a new column can be added that would allow index lookup instead of scanning rows.)
  • How many rows were aggregated? (If the query scans many rows to aggregate down to a far smaller set of rows, then perhaps the aggregation should be done once at ingest time rather than for every query.)
  • And so on. Once one realizes we can ask these questions, the community will suggest many more.

Empower performance decisions

Users already make use of available metrics to make decisions about how to improve performance. As suggested above, query-level details provide more precise information where before experts relied on more general signals. This information empowers the user to make performance revisions such as:

  • Revisions to the data structure: roll-up "grain", number of dimensions, number of tables, etc.
  • Partitioning: a form of top-level indexing
  • Segment sizes which influence the cost of full table scans
  • Distribution (the number of data nodes), which allows us to do more work in parallel.

Experience with query profiles in other products has shown that detailed query-level information is essential to visualizing, understanding, and improving each of the above factors.

Specific goals

Query profiles in other products are the product of many years of effort: people use the profile to answer questions, realize something is missing, and modify the code to capture that information. We cannot hope to compress all that learning into this single project. So, we have more modest goals.

  • Define the basic query profile structure, and gather metrics for some of the most basic query operations as a proof-of-concept.
  • Provide a mechanism to transfer summary information about a query from the data node to the Broker, or from the Broker to the client. This builds on a "response trailer" mechanism which Gian prototyped in his personal branch.
  • Since most clients won't know what to make of a query profile, provide a way for the Broker to persist profiles. This project will persist profiles on local disk, but will provide an extension point to add mechanisms to write profiles to a log collection system or other destination.
  • A typical usage pattern is to run a SQL query from some favorite query tool, then inspect the results. So, this project will also offer a new REST call that can retrieve a profile, given a query ID, from the local profile storage (a client-side sketch follows this list).
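To make that retrieval pattern concrete, here is a purely illustrative client-side sketch. The endpoint path, port, and query ID are placeholders invented for this example; the actual REST API will be defined in the follow-on issue for the new endpoints.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical client-side sketch: fetch the profile for a completed query by ID.
// The endpoint path "/druid/v2/profile/{queryId}" is a placeholder, not the final API.
public class ProfileFetchExample
{
  public static void main(String[] args) throws Exception
  {
    String queryId = "a1b2c3d4";  // the query ID reported when the query ran
    HttpClient client = HttpClient.newHttpClient();
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://broker-host:8082/druid/v2/profile/" + queryId))
        .GET()
        .build();
    HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.body());  // the JSON profile, ready for inspection
  }
}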

We leave it to later projects to add metrics for the wide range of query operations that Druid provides. We also defer to others to add analysis, visualization or other tools on top of the basic query profile mechanism.

Usage pattern

Experience with other projects suggests the following usage patterns for this feature.

A data engineer builds an app and notices that one or more queries perform poorly. They need to understand why. In this case:

  • The user submits the problematic query, probably using a BI tool, custom application, or dashboard.
  • The user obtains the profile for the completed query, does analysis, identifies potential issues, and makes a proposed change.
  • The user repeats the process to verify that the desired improvement occurred, or identifies additional opportunities for improvement.

Another example is a vendor that supports Druid. In this case, the flow is similar, except that the "user" above is replaced by a support engineer, who is typically unfamiliar with the customer's cluster or application. The profile provides much of that information; the rest can be obtained by inspecting the Druid UI (for cluster size, etc.). Rather than wondering, "Is the data partitioned correctly? Is the query using the right dimension index? Is the query scanning too many rows?", the support engineer simply looks at the profile to get an immediate, concrete answer.

The final usage scenario is to aggregate profiles to look for patterns. For example, looking at profiles for a set of queries may reveal that most do a large amount of aggregation, and so pre-aggregation might be helpful. Or, the profiles might reveal that most queries touch the most recent week of data, and Druid tiers can be used to put that data on higher-performing data nodes.

In all these cases, the job is made easier by understanding exactly how Druid runs a given query based on table structure and cluster configuration.

Profile design

The profile is intended for users, and provides information that can drive performance decisions. The profile preserves the essence of query execution, while abstracting away those implementation details which are not relevant to user decisions. Rather than reinvent the profile, we start with knowledge of what has worked well in the products mentioned above. For example, the Drill page linked above shows a visualization of the DAG, and the metrics gathered for each node (operator) within the DAG. The Impala query profile has a similar (though far more detailed) structure. This project will not provide such a visualization. However, the visualization comes from a JSON-structured profile organized by operator as shown above. The goal here is to provide a similar profile for Druid.

The profile must capture three essential levels of information:

  • The (user submitted) query as a whole, especially for SQL queries where a planning step converts the SQL query to a native query.
  • The scatter/gather model used to distribute the query.
  • The detailed transformations of the data needed to implement the query, since these transformations are the primary within-fragment cost drivers.

The Druid query engine

Druid is a distributed system that offers queries as one of its services. As with any good REST API, each API call does one thing well. Thus, Druid has a collection of native queries, each of which does one kind of operation. Druid also offers a SQL layer that converts SQL queries into one of the native Druid queries.

A Druid query has two main parts. At the top level are a set of REST RPC calls that scatter the query across data nodes. Within each node, Druid executes queries using a unique functional design that combines query planning and execution in a set of (mostly) stateless functions.

Druid's query engine follows functional programming (FP) patterns: a query is a stateless function which accepts a query as input and returns a result set as output. A query profile is a form of state: it collects information about what the query engine actually did. The primary challenge of this project is to map from Druid's FP model to just enough state to gather the profile. Some clever (but ugly) tricks are needed.
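As a rough sketch of that shape (simplified stand-ins, not the real classes in org.apache.druid.query), the central abstraction looks something like this: a runner takes a query in and hands back a lazy Sequence of results, with no long-lived operator objects that could hold metrics.

import java.util.Map;
import java.util.function.BiFunction;

// Simplified stand-ins for Druid's query engine abstractions (the real classes are
// QueryRunner, Query, Sequence and ResponseContext in org.apache.druid.*).
interface Query<T>
{
  String getId();
}

// A Sequence is a lazy, pull-based stream of results; the work happens as it is consumed.
interface Sequence<T>
{
  <R> R accumulate(R initial, BiFunction<R, T, R> accumulator);
}

interface QueryRunner<T>
{
  // Conceptually a pure function: query in, result Sequence out. The context
  // (Druid threads a dedicated ResponseContext object through every runner;
  // a plain map stands in for it here) is the one piece of shared, mutable state.
  Sequence<T> run(Query<T> query, Map<String, Object> responseContext);
}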

Learning from existing designs

A typical distributed query plan consists of a set of logical operators that describe what is to be done. Each logical operator typically corresponds to one of the well-known "relational operators" from relational database theory: scan, filter, join, sort, union, etc. Then, a set of physical operators are distributed across the network to do the actual work. One or more physical operators are grouped in a fragment which executes on a node. Thus there can be many concurrent physical operators for each logical operator. In a full DAG-based engine, multiple exchanges shift data from one fragment to another.

A DAG includes operators: objects which implement steps within the query data flow (scan, filter, sort, join, etc.). Since operators are objects, they can easily gather performance metrics. The profile is created simply by walking the operator DAG and "harvesting" that performance information. Since most query engines use a DAG, most also provide a query profile as well.
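For contrast with Druid's design, a conventional DAG-based engine's operator might look like the heavily simplified sketch below (illustrative only, not code from Drill or Impala): because the operator is a long-lived object, attaching counters to it is trivial, and the profile is harvested by simply reading those counters after the query completes.

import java.util.Iterator;

// A very simplified "classic" operator, as found in DAG-based engines:
// a stateful object that can carry its own metrics.
abstract class ClassicOperator<T> implements Iterator<T>
{
  long rowsProduced;
  long nanosSpent;

  // Each concrete operator (scan, filter, sort, ...) implements the actual work.
  protected abstract T readNext();
  protected abstract boolean isDone();

  @Override
  public boolean hasNext()
  {
    return !isDone();
  }

  @Override
  public T next()
  {
    long start = System.nanoTime();
    T row = readNext();
    nanosSpent += System.nanoTime() - start;
    rowsProduced++;
    return row;
  }

  // Harvesting the profile is just reading the counters after the query finishes.
  public String summarize()
  {
    return getClass().getSimpleName() + ": rows=" + rowsProduced + ", nanos=" + nanosSpent;
  }
}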

Mapping from Druid's functional programming engine to the query structure

In Druid, the native query is roughly the equivalent of a fragment, and exchanges are limited to a single-level scatter/gather process between data nodes (historicals, peons) and the broker.

Operators are implicit within the implementation of each native query. Druid's query engine is based on a functional programming (FP) model, not a DAG model. So our first challenge is to decide how to structure the Druid query profile. One choice is to mimic the Druid call structure, perhaps reporting statistics for each QueryRunner.run() function. (A Druid query is simply a chain of calls to a large variety of these methods.) That is, perhaps the query profile can be structured like a flame graph with a focus on the time spent in each run() method.

However, a closer inspection reveals that the QueryRunner.run() method is more like a QueryPlanner.plan() method: each builds up the actual execution mechanism. There are more runners than there are execution steps. The runners represent the specific way that Druid is coded, not an essential description of the steps required to execute the query. The bulk of the work (the transforms) occurs as side effects of requests to get the next item from a Guava Sequence. Thus, a flame graph will not reveal anything about the aspects of a query which the user can control (data layout, query design, cluster configuration, etc.), but will instead tell us how Druid is implemented (which is not something that the user can control).

Instead, we want to focus on cost drivers: the actual steps (transforms) that take time. In Druid, these are represented using a Sequence with a large number of additional components added via various ad-hoc mechanisms. As it turns out, each point at which Druid creates a new Sequence more-or-less maps to a transform: the only reason to create a new sequence is to merge, filter, sort or otherwise transform data from the input sequence. Thus, Sequences are closer to the level at which we want to gather detailed statistics.

Still, a Sequence is a data delivery vehicle. Our interest is in the actual transform, which sits outside the sequence. In Druid, such transforms are not reified (that is, not represented as objects). Instead, they are represented by a series of lambda functions created within the QueryRunner.run() call but executed as the Sequence is read. These transforms have no name in Druid; they are just bits of code. Yet, these are what we want to instrument, so we'll call them (virtual) operators.

By making this move, we are now in a position to leverage the operator structure typical of a query profile. The profile is thus a tree (DAG) of (virtual) operators, each of which represents a bit of code in Druid that creates or transforms a Sequence as defined in some QueryRunner.run() method. The unusual details of Druid's execution are abstracted into patterns that are familiar (and relevant) to a data engineer who knows how queries typically work in Drill, Impala, Presto/Trino, Spark, Flink, etc. One advantage of such a structure is that the DAG for a query is determined by the query, and is fairly stable, while the native query implementation is subject to constant revision to increase performance. Thus, a DAG structure allows the profile to change more slowly than the actual code.
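A minimal sketch of the idea, using simplified stand-ins rather than Druid's actual Sequence class: the anonymous transform stays exactly where it is, but the Sequence it produces is wrapped so that row counts and wall time are recorded against a named, "virtual" operator node.

import java.util.Iterator;

// Simplified stand-in; Druid's real Sequence lives in
// org.apache.druid.java.util.common.guava and is not an Iterable.
interface RowSequence<T> extends Iterable<T> {}

// One node of the profile tree: the "virtual operator" that names an otherwise
// anonymous transform (e.g. "merge", "filter", "scan").
class OperatorProfileNode
{
  final String operatorType;
  long rows;
  long nanos;

  OperatorProfileNode(String operatorType)
  {
    this.operatorType = operatorType;
  }
}

class ProfilingSequences
{
  // Wrap an existing sequence so that consumption is measured and attributed
  // to the given profile node. The transform that produced it is untouched.
  static <T> RowSequence<T> profile(RowSequence<T> input, OperatorProfileNode node)
  {
    return () -> {
      Iterator<T> it = input.iterator();
      return new Iterator<T>()
      {
        @Override
        public boolean hasNext()
        {
          return it.hasNext();
        }

        @Override
        public T next()
        {
          long start = System.nanoTime();
          T row = it.next();
          node.nanos += System.nanoTime() - start;
          node.rows++;
          return row;
        }
      };
    };
  }
}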

Profile structure

The profile presented to the user is a JSON object that can be deserialized in Java or other languages. We must define the structure of that JSON.

We've said we need to show information for the three main concepts: query, scatter/gather, and transforms. We've noted we must map from Druid's FP design to the essential query structure relevant to the user. This gives rise to a profile structure with three levels: query, fragments and operators.

First, we have to understand a bit about how Druid queries operate in the large. A query arrives (typically) at a Broker (perhaps via the Router). If the query is SQL, the Broker translates the SQL into one or more native queries. If the query is already native, then we skip the SQL step. The Broker then determines the data nodes which hold data for that query, rewrites the query specifically for each node, and scatters the query to each data node. The data node then receives the query and does the necessary work. The key fact to notice is that the client could send a native query directly to a data node: there is no real difference between the internal query execution path for the broker and data nodes.

We map this unusual structure into typical distributed-system concepts. First, the point of entry into Druid is the (client) query: the query as sent from the client. This could be a SQL query, a native query sent to the Broker from a client, or a native query sent from the client to a specific data node. The query profile represents the "query" as a whole.

As explained above, Druid uses a scatter-gather pattern to execute a query. This is simply a two-level version of the fragment structure typical of distributed engines. So, we use the term fragment to refer to each scatter step. Thus a query (as defined here) consists of one or more fragments, each of which is represented by a rewritten native query executing on a data node. Note that, from the perspective of the data node, a query is a query: it does not matter if it is a client query or a fragment (often called a "subquery" in Druid). The profile, however, does make this distinction: a Druid subquery is a *fragment*; a (non-sub)query is a client query.

Then, as explained earlier, we map Druid's functional programming based query engine into a set of operators, where each operator represents a point in the execution where data is read, or one sequence of results is transformed into another.

This gives the following basic structure:

  • query (as submitted by the client)
    • fragments (the scatter stage, each fragment is a subquery)
      • operators (the transforms within the subquery)

The actual structure is a bit more subtle. The Broker does not simply issue a series of subqueries (fragments); it also combines their results. As a result, the query is actually a set of Broker-side operators, one of which sends the query to a data node. Thus, the actual structure is:

  • query (as submitted by the client)
    • operators (the transforms within the query)

And, one of those operators is an exchange which handles one specific scatter/gather data operation. An exchange has two parts: a sender on the data node side, and a receiver on the broker side. So:

  • receiver operator
    • fragment (a subquery)
      • operators

For native queries, the query and the fragment are basically the same (since the code, for the most part, handles the two cases identically.) But, for a SQL query, we have additional steps such as planning. This gives the final structure:

  • query (as submitted by the user)
    • query-level information (SQL text, query plan, etc.)
    • root fragment
      • merge operators, etc.
        • receiver
          • fragment ...
            • operator ...
              • scan (read data from a segment, etc.)

Or, in quasi-BNF form:

query: query-attribs, fragment
fragment: fragment-attribs, operator
operator: merge | receiver | scan | ...
receiver: receiver-attribs, fragment

The set of operators depends on the specific query: a simple query may have very few operator levels, a complex query may have many.

The actual set of operators will emerge as we implement the profile: defining them is a bit of an art since we must invent them as we go along.
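To make the quasi-BNF concrete, the JSON could be produced by a small family of Java classes along the following lines. This is a sketch only; the class and field names are illustrative, not the final schema.

import java.util.List;

// Illustrative only: one possible shape for the JSON-serializable profile objects.
class QueryProfile
{
  String queryId;
  String sql;               // present only for SQL queries
  String nativeQueryType;   // e.g. "scan", "groupBy"
  long totalTimeNs;
  FragmentProfile rootFragment;
}

class FragmentProfile
{
  String host;              // the node that ran this fragment (Broker or data node)
  long timeNs;
  OperatorProfile rootOperator;
}

// operator: merge | receiver | scan | ...
abstract class OperatorProfile
{
  long rows;
  long timeNs;
  List<OperatorProfile> children;
}

class MergeProfile extends OperatorProfile
{
  int inputCount;
}

class ReceiverProfile extends OperatorProfile
{
  // The fragment (subquery) whose results this receiver gathered from a data node.
  FragmentProfile fragment;
}

class ScanProfile extends OperatorProfile
{
  String segmentId;
  long rowsScanned;         // rows read from the segment before filtering
  boolean usedIndex;        // whether Druid could use a dimension index for the filter
}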

Profile construction

As noted earlier, a typical query engine is reified as a collection of stateful operator objects. Each operator typically processes multiple rows or row batches, and thus lives for the life of the query. Each object gathers state information, and the profile is created by walking the operator DAG to harvest performance information.

We've noted that Druid uses a unique (semi-)stateless FP model. So, how do we gather performance state if the design is stateless?

As it turns out, every QueryRunner.run() method takes the ResponseContext as a parameter, and the response context gathers state. Thus, the response context is a "back-door" way to gather state, and is why Druid's engine is actually "semi-stateless." We extend ResponseContext to gather and assemble operator profiles that provide an abstract data flow model of the actual FP implementation.

Druid also makes use of many detail functions to do actual work, and these are also designed to be stateless. Here, we exploit another back-door: Druid often introduces implicit state via Java closures. We leverage this pattern to pass performance information out of otherwise stateless functions that do not return performance information.
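A simplified sketch of the two back-doors working together (stand-in types; Druid's real ResponseContext and Sequence have different APIs): the runner keeps its stateless signature, but it registers a profile node in the shared context, and a closure over that node lets the lambda that does the per-row work report what it did as a side effect.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Stream;

// Stand-ins only: Druid's real ResponseContext and Sequence have different APIs.
class ProfileNode
{
  final String operatorType;
  long rows;
  final List<ProfileNode> children = new ArrayList<>();

  ProfileNode(String operatorType)
  {
    this.operatorType = operatorType;
  }
}

class FakeResponseContext
{
  // The context is the one mutable object threaded through every runner,
  // so it is where the growing profile tree can live.
  final Map<String, ProfileNode> profileByQueryId = new ConcurrentHashMap<>();
}

class FilterRunnerSketch
{
  // run() stays stateless in signature, but a closure over the profile node
  // lets the per-row lambda report what it did.
  static Stream<String> run(String queryId, Stream<String> input, FakeResponseContext context)
  {
    ProfileNode filterNode = new ProfileNode("filter");
    context.profileByQueryId
           .computeIfAbsent(queryId, id -> new ProfileNode("query"))
           .children.add(filterNode);

    Function<String, String> transform = row -> {
      filterNode.rows++;          // side effect captured via the closure
      return row.toUpperCase();   // stand-in for the real per-row work
    };
    return input.map(transform);
  }
}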

The result is decidedly ugly, but does have the virtue that it works. Details will appear in the PR for this part of the project.

Profile content

With the structure defined, we can now turn to the individual objects. The profile is a JSON-serializable set of Java objects created as the query executes. (See a later ticket for the details of how that mechanism works.) The objects follow the structure above. Each object holds information such as:

  • The way that Druid implemented an operation (when Druid has a choice and that choice is based on something in the query or fragment.)
  • The number of rows processed. (Data engineers think in terms of rows, so rows are a familiar metric.)
  • The time taken.

The profile need not repeat information readily available in the native query or in the table schema, unless that information is critical to understanding a cost driver. (For example, knowing the cardinality of one dimension in one fragment is helpful for understanding the selectivity of a filter on that dimension.)

Rather than spell out the details of the profile here, we leave that to an example and detailed comments in the Java query profile objects.

Proposed changes

The project divides into several parts:

  • The query profile design, outlined here, which will be populated throughout the query engine.
  • Revisions to ResponseContext and QueryMetrics, which will be our "back-door" to gather state from Druid's quasi-stateless functional programming implementation.
  • A response trailer mechanism, originally prototyped by Gian, which delivers fragment information from a data node to the broker.
  • An extensible ProfileConsumer mechanism to allow third-party systems to gather and process query profiles, including a basic implementation which saves profiles to local storage on the Broker.
  • Two new REST API methods to obtain profiles for previously-run queries.

This issue is already quite large, so here we focus on the overall concept. We will open additional issues to provide detailed designs for each of the above areas. The work will be submitted as a series of PRs, each focused on one specific area.

Rationale

The main question is: why a query profile? Why not more logging? More metrics? Additional information returned with the query itself? The simplest answer is that the profile mechanism here derives from designs that evolved to work well in Drill, Impala, Presto and similar projects.

  • More logging. One could imagine adding detailed logging for every "interesting" bit of information about each query. This approach is simple and easy. But, it places a huge burden on the user: how to assemble the information for one specific query from logs spread across multiple worker nodes. Doing so would take a complex data pipeline. The profile, however, does all that work: it already gathers the information for only one query, organized to show data flow and the corresponding "interesting" metrics.
  • More metrics. Druid already emits many metrics and users have built metric systems to gather them. Experience with other projects suggests that metrics systems are a poor fit for understanding the performance of an individual query, Spark job, YARN task, etc. Instead, such projects provide some other way to describe specific queries (or jobs) individually and in detail. A profile does not replace metrics; it is a different view of the data that addresses a use case distinct from the one which metrics address.
  • Tracing. In a modern micro-service architecture, shops use tracing solutions to gather metrics for each service, and then assemble them into an overall trace for a request. Multiple products exist to help assemble the trace. Could we use such an approach here? Certainly, at a cost: an Apache Druid user would need to integrate one of these systems to assemble a complete profile from the individual traces. The design here does the assembly work as part of running the query. That said, an extension could certainly omit the assembly within Druid and instead send the piece-parts to an external system for assembly.
  • Status quo. Today, it takes experts to know what Druid must be doing internally for a query, along with knowledge of the "knobs" to turn to change that behavior. The profile makes the internal cost drivers explicit so that a data engineer can understand the costs without being a Druid code expert. Once the costs are understood, the data engineer can make changes and see the results. Without a profile (that is, with the status quo), the data engineer only sees the final result (query latency), but without an understanding of why that value is obtained: without a profile the data engineer is trying to drive Druid by a sense of smell. The profile provides a windshield to reveal what is actually happening.

Operational impact

The query profile feature, if on all the time, could create a large number of profiles, most of which would never be used. To address this, we propose that the feature be disabled by default and that it offer three levels of support:

  • Never gather profiles.
  • Gather profiles only for specific queries.
  • Always gather profiles.

The "never" option is the default so that operations will see no impact from profiles unless a site specifically chooses to use them.

The specific-query option is a good setting for many sites: a user can request that a profile be created for specific queries of interest. Those profiles will reside on disk (by default), but should take little space.

The "always" option makes the most sense when combined with a profile "consumer" that sends profiles to some other system (perhaps to Kafka) for bulk processing.
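A small sketch of how the three levels might interact with a per-query request; the enum and the query-context flag are hypothetical names, to be settled in the follow-on design issues.

// Hypothetical sketch of the three-level switch; names are placeholders.
enum ProfileLevel
{
  NEVER,        // default: no profiles gathered at all
  ON_REQUEST,   // gather a profile only when the query asks for one
  ALWAYS        // gather a profile for every query
}

class ProfilePolicySketch
{
  static boolean shouldGatherProfile(ProfileLevel clusterSetting, boolean queryRequestedProfile)
  {
    switch (clusterSetting) {
      case ALWAYS:
        return true;
      case ON_REQUEST:
        return queryRequestedProfile;   // e.g. a hypothetical query-context flag
      case NEVER:
      default:
        return false;
    }
  }
}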

Profile size

The basic profile design includes one entry for each fragment (data node) which processes the query. This design is fine for small clusters. However, if a cluster has hundreds of nodes, and thousands of segments, the profile will become quite large since it will have one entry for each node, and within each node, one entry for each segment which the query processes.

The initial profile design acknowledges this problem, but does not seek to solve it. The best solution for a large cluster is to gather profiles only on request, then summarize them with some external Python script or some such.

Once we gain experience with how people use profiles on large clusters, we can consider adding additional functionality to, say, aggregate or merge fragments. We note, however, that such aggregation destroys the very detailed data that a profile is intended to provide, so some thought will be required.

Initial upgrade

In a Druid cluster, any node can be upgraded in any order. This section tackles the issue of upgrading from a pre-profile version to a Druid version that supports profiles.

The safest way to do the upgrade is to leave profiles turned off (the default) until the upgrade is complete. However, some users may be so excited about the profile feature that they enable it straight away. We must ensure that this pattern works also.

As per Gian's response trailer design, the data node returns the trailer only when requested. By default, no trailer is sent. This becomes the basis of our initial upgrade strategy.

Druid will ship with query profiles turned off. Thus, even if a data node is upgraded to one that can produce a profile, that profile will simply be discarded and not sent to the broker. This means that we can upgrade either the broker or data node first.

  • A newer data node will gather the profile, but will not send it unless the required request header is present. Since old brokers won't include the header, a newer data node will return the "traditional" response so that the cluster operates properly.
  • A newer broker will ship with query profiles turned off. Since the newer broker will not include the trailer-request header, we operate in the same mode as above, and so it does not matter if data nodes are older or the same version as the broker.
  • If an over-eager user upgrades the broker and immediately enables query profiles, then the broker will send the trailer-request header, but the old data nodes will ignore it and return the traditional response. The broker must be prepared to receive this traditional response and fill in a "stub" dummy fragment for the missing profile (a broker-side sketch follows this list). Once the data nodes are upgraded, the newer trailer response will become available, and the query profiles will contain the full fragment information for those nodes.
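The broker-side handling during such a mixed-version window might look roughly like the following sketch. The trailer representation and field names are placeholders, not a final design.

import java.util.Map;

// Sketch only: how the Broker might tolerate an older data node that ignores the
// trailer-request header and returns the traditional (trailer-less) response.
class FragmentProfileAssemblySketch
{
  static Map<String, Object> fragmentProfileFrom(String dataNodeHost, Map<String, Object> trailer)
  {
    if (trailer == null) {
      // Older data node: no trailer came back. Record a stub so the query-level
      // profile is still well formed and clearly marks the missing information.
      return Map.<String, Object>of(
          "host", dataNodeHost,
          "status", "unavailable",
          "reason", "data node did not return a response trailer"
      );
    }
    // Newer data node: pass the fragment profile through as an opaque map.
    return trailer;
  }
}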

Rolling upgrades

Druid can be used in a rolling upgrade scenario in which the broker is at a different version than a data node. In this case, as the profile evolves, a broker may receive a "fragment" which is at a version newer or older than the broker itself. The profile is designed so that the Java objects which gather information are only ever serialized, never deserialized. Specifically, the broker accepts the fragment profile as a generic Java Map, not as a specific Java object.

Consumers

Profiles are created to be consumed. Experience suggests that many sites will use the simplest approach: data engineers will look at profiles when they have a problem. In this case, the humans will have no problem if the profile changes a bit between releases.

Other shops may choose to process profiles in bulk to extract metrics of interest. Such shops need a way to adjust code based on profile version. Profiles will thus contain a profile version. This is not a software version: it is a version of the profile format. The version is incremented each time a breaking change occurs. A breaking change is defined as changing the name or type of a field or removing a field.

Clients should be prepared to handle additions, since they are the most common change. One way to do that is to store the profile as a tree of maps rather than deserializing it into a class or similar structure.

Consumers should also be prepared for the occasional "mismatch" profile produced by a rolling upgrade: the profile version of the query itself differs from that of one or more fragments within the profile. (That is, the broker was a different version than the data nodes.) The easiest approach is to simply discard such profiles in the site's profile pipeline.
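A sketch of such a version-tolerant consumer, using Jackson (which Druid already ships with) to read the profile as a tree of maps; the "profileVersion" and "fragments" field names are illustrative, not the final schema.

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.util.Map;

// Sketch of a version-tolerant profile consumer. Field names are illustrative.
class ProfileConsumerSketch
{
  private static final ObjectMapper MAPPER = new ObjectMapper();
  private static final int SUPPORTED_VERSION = 1;

  @SuppressWarnings("unchecked")
  static void consume(String profileJson) throws Exception
  {
    // Read as a tree of maps so unknown (added) fields are simply ignored.
    Map<String, Object> profile =
        MAPPER.readValue(profileJson, new TypeReference<Map<String, Object>>() {});

    int queryVersion = ((Number) profile.getOrDefault("profileVersion", 0)).intValue();
    if (queryVersion != SUPPORTED_VERSION) {
      return;  // unsupported version: skip rather than misinterpret
    }

    List<Map<String, Object>> fragments =
        (List<Map<String, Object>>) profile.getOrDefault("fragments", List.of());
    for (Map<String, Object> fragment : fragments) {
      int fragmentVersion = ((Number) fragment.getOrDefault("profileVersion", queryVersion)).intValue();
      if (fragmentVersion != queryVersion) {
        return;  // mixed-version (rolling upgrade) profile: simplest to discard it
      }
    }
    // ... extract the metrics of interest from the maps here ...
  }
}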

Security

When the disk-based profile consumer is enabled, Druid will write profiles to disk, and will provide REST API calls to obtain the profiles. In an internal or development cluster, it is probably fine to allow anyone with access to the REST API to obtain all profiles. In a production cluster, however, access may be limited to certain users.

For this reason, the profile access REST APIs provide a new security setting to control profile access. Details will appear in the PR for the new REST APIs.

Test plan

Testing occurs at two levels:

  • The basic mechanism will be covered with JUnit tests.
  • Profile contents are difficult to test automatically. Instead, profile contents will be tested by inspection. Each bit of the profile is created to answer some specific problem. The process of adding the required information should also verify that the information was, in fact, gathered correctly.

Future work

A query profile is never "done". It grows and evolves as the engine grows and as users want to answer new questions. For this reason, this task plans to add the mechanisms described above, and decent coverage of one or more native queries. (The example is for the scan query.)

We expect others to contribute additional query types, code paths, detailed metrics and other information as people find the need.

We also expect third parties to add profile consumers, and perhaps tools to do useful things with the profile, such as the DAG visualization shown in the Drill documentation.

Our goal here is to create the foundations: we encourage others to build on these foundations.

Multiple metrics mechanisms

With the introduction of the query profile, the query engine will now gather four kinds of metrics:

  • Those placed in the response header (via the response context)
  • Those gathered via QueryMetrics to be sent to a metrics system
  • Logging
  • Query profile

In an ideal world, we'd have a single "metrics collector" used by the engine code, and have implementations that shuffle each metric to where it needs to go. We leave this idea for a future enhancement, for two reasons.

First, the required changes are already large, so we want to avoid even larger changes.

Second, it turns out that there is an "impedance mismatch" between the existing mechanisms and the query profile. The query profile wants to gather metrics for specific operators: row count for THIS filter vs. THAT merge. QueryMetrics, by contrast, wants to be generic: the number of rows returned from all scans, say, or from the query as a whole. Some work is needed to invent a common mechanism.

Still, having a common mechanism would be good and is left as a future exercise.
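Purely to illustrate the deferred idea, such a collector might eventually look something like the sketch below; the names are invented and nothing like this exists in Druid today.

// Invented names: a sketch of the deferred "single metrics collector" idea only.
interface QueryMetricsCollector
{
  // Operator-scoped metric, as the profile needs ("rows" for THIS filter).
  void operatorMetric(String operatorId, String metric, long value);

  // Query-scoped metric, as QueryMetrics and the emitters need ("query/rows" overall).
  void queryMetric(String metric, long value);
}

// Implementations would fan each call out to the response context, the metrics
// emitter, the logs, and/or the query profile as appropriate.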
