Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

cmu-db/15721-s24-ee1

Repository files navigation

Eggstrain

eggstrain is an discrete Execution Engine targeting analytical workloads. Written in Rust, eggstrain is built on top of and inspired by DataFusion.

eggstrain is being built by Connor Tsui, Kyle Booker, and Sarvesh Tandon.

Architectural Design

eggstrain is closely tied to the high-performance asynchronous runtime tokio. By relying on tokio to manage all dataflow, eggstrain is able to offload the complexity of managing dataflow between operators to an asynchronous scheduler, while focusing on high parallel performance.

eggstrain is neither a push- nor pull-based Execution Engine in the traditional sense. Since data flow is asynchronous, the tokio scheduler gets to decide whether a call to execute from a parent operator results in the parent waiting for data to be pushed from the child operator, or if another operator gets to run while the parent operator yields. More specifically, it is possible that data is pushed all the way from the bottom to the top of a pipeline on a single thread (without any contention from other threads), but is is also possible that data gets pushed to a parent operator without the parent operator running with that data for a long time.

The integration with tokio channels, rayon, and other crates is explained later in 3rd-party Crates.

Operators

eggstrain supports a few operators, and there are more to come.

  • TableScan
  • Filter (Completed)
  • Project (Completed)
  • HashAggregate (in progress)
  • HashProbe + HashBuild (in progress)
  • MergeJoin
  • Sort (Completed)
  • Exchange

All intermediate operators (non-leaf operators) must implement either the UnaryOperator or BinaryOperator trait, depending on the number of logical children an operator has. Special nodes like TableScan and Exchange are dealt with separately. Both of these traits rely on the async-trait crate, as asynchronous sendable trait objects are not stable as of Rust 1.76.

3rd-party Crates

eggstrain is tightly coupled to tokio and datafusion. rayon and arrow are also used heavily in eggstrain. The tracing crate may be implemented in the future for logging and tracking statistics.

tokio is a high-performance asynchronous runtime that provides a well-tested work-stealing scheduler over a lightweight thread pool of asynchronous tasks (otherwise known as coroutines). tokio is open-source, very well-documented, and has many features and a high degree of community support.

Other than the runtime provided by tokio::main, tokio provides asynchronous channels that allow data to be sent between threads in a completely thread-safe, efficient manner. tokio channels come in 4 flavors, but eggstrain only cares about the oneshot and broadcast (it is possible that in the future, broadcast will be replaced with a combination of mpsc and an Exchange operator). See the Data Flow section for more information on how channels are used in eggstrain.

One final thing to note is that the tokio scheduler has a feature called a LIFO slot. Since tokio has a thread pool that takes work off of a shared work queue, it is possible that in a naive implementation, data is sent upwards to a parent task, but the parent task gets placed at the back of the queue, and thus the parent task does not get to run for a "long" time ("long" meaning however long all other queued tasks take to run). If this were to happen, all memory locality would be completely lost, since other tasks may run and replace the pages in the CPU cache and TLB.

The LIFO slot is implemented on tokio's channels to prevent this from happening. When data is passed through a channel, the task on the receiving end of the channel is placed in the LIFO slot. Before worker threads look at the work queue, they will check if there is a task in the LIFO slot, and if there is, they will run that task instead of the first item on the work queue. This reduces the number of cache and TLB misses that might take place if the parent task runs a "long" time after the child task has sent the data upwards. See this blog from tokio for more details.


rayon is a parallelism crate that allows us to write parallelized and safe code using a OS thread pool with almost no developer cost. As opposed to tokio, which uses lightweight tasks (coroutines) which are completely in userspace, rayon is much more "heavyweight", as OS threads will incur system calls and other synchronization-related costs.

The reason eggstrain needs both tokio and rayon is subtle, but it comes down to using tokio for interacting with anything that might need to wait for a bit of time without doing computation (the storage client from the I/O Service component, disk reads and writes, and even data movement between threads), and using rayon for any CPU-intensive workloads (actual execution and computation). Since tokio provides the scheduling for eggstrain, all eggstrain needs to focus on is making operators run as fast as possible.


There is a Rust-native arrow crate that gives bindings into the Apache Arrow data format. Notably, it provides a RecordBatch type that can hold vectorized batches of columnar data. This will be the main form of intermediate data communicated between eggstrain and other components, as well as between the operators of eggstrain itself.

eggstrain is very similar to and makes heavy use of code from datafusion. eggstrain takes as input a DataFusion ExecutionPlan, and outputs a stream of RecordBatches.

Note: Since eggstrain is using datafusion's physical plan representation, we will make heavy use of their proprietary data types (for now). We will also use the DataFusion implementations for the operators we do not plan to implement ourselves.

Data Flow

eggstrain takes as input a datafusion ExecutionPlan as input, which is a DAG of physical plan nodes, connected by pointers (Arc<ExecutionPlan>). eggstrain mimics this DAG by creating a DAG of operator tasks, connected by tokio channels.

Every intermediate operator (non-leaf) runs in its own tokio task (lightweight thread). Depending on if the logical node has 1 or 2 children (a unary or binary node), the task will have one or two input channel receivers over the RecordBatch type. For non-pipeline breakers, the operator will process RecordBatches immediately, and then send the data up via an input channel sender, which sends the data to the parent operator's task. For pipeline breaker such as Sort or HashBuild, data is gathered until all input data has been sent from the child, and from there all data is processed together.

Project Example

Here is a diagram of the Project operator, an intermediate operator that is not a pipeline breaker.

                            ▲
                            │
                            │
                            │
                            │ RecordBatch
   Project Operator Task    │
                            │
            ┌──────────┬────┴──────┬────────────┐
            │          │           │            │
            │          │    tx     │            │
            │          │           │            │
            │          └────▲──────┘            │
            │               │                   │
            │                                   │
            │       project(record_batch);      │
            │               ▲                   │
            │               │                   │
            │          ┌────┴──────┐            │
            │          │           │            │
            │          │    rx     │            │
            │          │           │            │
            └──────────┴────▲──────┴────────────┘
                            │
                            │
                            │
                            │ RecordBatch
                            │
                            │
                            │

Since Project is not a pipeline breaker, it will apply a projection to every RecordBatch it encounters from rx, and then immediately send it to its parent via tx.

Sort Example

Here is a diagram of the Sort operator, which is a pipeline breaker (as it needs all input data before it can sort anything).

                                                         │
                                                         │
                        ▲             Asynchronous Land  │  Synchronous Land
                        │                  (tokio)       │      (rayon)
                        │                                │
                        │ RecordBatch                    │
 Sort Operator Task     │                                │
                        │                                │
┌──────────────────┬────┴─────┬──────────────────────┐   │
│                  │          │                      │   │
│                  │    tx    │                      │   │
│                  │          │                      │   │
│                  └────▲─────┘                      │   │   Rayon Thread
│                       │              ┌─────────────┤   │  ┌─────────────┐
│                       │              │             │   │  │             │
│                       └──────────────┤ oneshot::rx ◄───┼──┤ oneshot::tx │
│                                      │             │   │  │             │
│                                      └─────────────┤   │  ├──────▲──────┤
│         Temporary Buffer                           │   │  │      │      │
│   ┌───────────────────────────┐                    │   │  │      │      │
│   │                           │                    │   │  │      │      │
│   │  Buffer of RecordBatches  │   rayon::spawn();  │   │  │             │
│   │                           ├────────────────────┼───┼──► sort(data); │
│   │   Stored in memory and    │                    │   │  │             │
│   │     spilled to disk       │                    │   │  └─────────────┘
│   │                           │                    │   │
│   └─────────────▲─────────────┘                    │   │
│                 │                                  │   │
│          ┌──────┘                                  │   │
│          │                                         │   │
│     ┌────┴─────┐                                   │   │
│     │          │                                   │   │
│     │    rx    │                                   │   │
│     │          │                                   │   │
└─────┴────▲─────┴───────────────────────────────────┘   │
           │                                             │
           │                                             │
           │ RecordBatch                                 │
           │                                             │
           │

Sort will continuously poll its child task by calling rx.recv() on the input channel, until it returns a RecvError::Closed. All RecordBatches are stored in some temporary buffer, either completely in memory, or with some contents spilled to temporary files on disk. See the async I/O section for more details.

Once all data has been buffered, the Sort operator can begin the sorting phase. Instead of computing the computationally heavy sort algorithm in the tokio task, Sort will instead spawn a rayon-managed OS thread to offload the computational workload to a more "heavyweight" worker. Since tokio is a completely userspace thread library, this is incredibly important, since work done by a Sort operator within a tokio task would cause other tokio tasks to block on the sort computation. By instead sending the data to a synchronous OS thread, other lightweight tasks do not need to block on the sort computation, and the CPU running those threads can choose to run something else.

This is only possible via the oneshot::channel, which provides a synchronous method tx.send() to send data from a synchronous thread to an asynchronous task. On the synchronous side, the OS thread only needs to call tx.send(sorted_data), while the asynchronous task only needs to call rx.await. This is key, since the .await call will allow the tokio scheduler to run other tasks while the sort is running.

Once the rx.await call returns with the sorted data, Sort can send data up to the next operator in fixed-sized batches.




In Progress Sections

All of these sections are incomplete, but give some insight into our plans for the future.

Asynchronous I/O and Buffer Pool Manager

Since eggstrain is so tightly coupled with an asynchronous runtime, it makes sense to use that to our benefit. Typically, we have to wait for disk reads and writes to finish before we can get access to a memory page. In synchronous land, there is nothing that we can do but yield while the OS runs a system call. The OS is allowed to context switch while this disk I/O is happening, meaning the read/write might take a "long" time to return.

It is very possible we could see large performance benefits from using asynchronous I/O for a buffer pool manager. More specifically, we could implement a buffer pool manager that has both synchronous (blocking) and asynchronous methods for reading and writing, which would give as plenty of flexibility in when we want to use one or the other.

This would probably take more time, but it might also be a great contribution we could make, since as of right now, our individual operators will never be able to outperform datafusion.

Interface and API

eggstrain can be treated as a mathematical function. It receives as input an ExecutionPlan physical plan, and outputs a stream of Apache Arrow RecordBatches, or a SendableRecordBatchStream.

The ExecutionPlan that eggstrain receives as input has nodes corresponding to one of the operators listed above. These plans will be given to each eggstrain instance by a Scheduler / Coordinator as DataFusion query plan fragments. More specifically, eggstrain will parse out the specific nodes in the relational DAG, and construct its own DAG of operators, connected by channels instead of pointers.

Once eggstrain parses the plan, it will figure out what data it requires. From there, it will make a high-level request for data it needs from the IO Service (e.g. logical columns from a table).

TODO something about TableScan, need to talk more with the I/O teams.

About

15-721 Spring 2024 - Execution Engine #1

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published