Parallel Query Execution in Rust

April 20, 2019

I’m working on a design to create a physical execution plan for DataFusion (part of Apache Arrow) that will support parallel query execution across multiple threads.

A typical use case would be executing a SQL query against a Parquet file that has already been partitioned (basically, multiple files with the same schema in a single directory).

DataFusion only supports a small number of operations currently:

  • Projection (filter columns)
  • Selection (filter rows)
  • Limit (filter rows)
  • Aggregate

The first two listed here (projection and selection) and perfectly parallelizable, meaning that the projection and selection for each partition can run on its own thread.

Aggregate is more complex. This is a two step operation. First, an aggregate query can be run in parallel for each partition, but then there is a second step to combine the results of each aggregate query and then run another aggregate. For example SELECT COUNT(*) would produce one count per partition, but then those results need to be added together to get the final count.

Another complication for now is that the ParquetFile struct in DataFusion cannot be sent between threads, so I need to start one thread per partition to actually read the files and then use channels to communicate with other threads.

I’ve started a separate project to experiment with approaches. This is my first time working with threads in Rust in any meaningful way, so I’m keen to get advice from more experienced Rust developers.

I have created a parallel-query-poc repository containing one example to experiment with approaches.

Design 1

There is an ExecutionPlan trait with an execute method that returns one or more iterators over RecordBatch.

/// An execution plan produces one iterator over record batches per partition
pub trait ExecutionPlan {
    fn execute(&self) -> Vec<Arc<Mutex<ThreadSafeRecordBatchIterator>>>;
}

I wanted to avoid using Mutex but because the Parquet execution plan creates threads, I need to pass channels between threads, and the mpsc::Sender trait does not support this. Each call to next is going to need to call lock on this mutex, so that’s a big problem right now.

The iterator trait is pretty simple. It has a next method that returns None when there are no more batches. I did explore using Iterator<Item=Result<Option<RecordBatch>>> but that was adding complexity so I am going with a proprietary iterator for now. I may revisit this later.

/// Iterator for reading a series of record batches with a known schema
pub trait ThreadSafeRecordBatchIterator: Send {
    /// Get the schema of this iterator
    fn schema(&self) -> &Arc<Schema>;

    /// Get the next batch in this iterator
    fn next(&self) -> Result<Option<RecordBatch>>;
}

Each step of the optimized logical plan will be translated to an ExecutionPlan and most plans will have an input plan. Plans will essentially be nested as demonstrated in this pseudo-code:

let execution_plan = ProjectionExec::new(
                       FilterExec::new(
                         ParquetExec::new(parquet_args), 
                         filter_args
                       ), 
                       projection_args
                     );

Although I have working code, it requires a mutex lock on every batch. The batches will typically be large so maybe this is acceptable? I’m not sure. I would prefer that each partition is lock-free.

Design 2

Shortly after posting this, I got some great feedback suggesting that I use the crossbeam library which has an implementation of channel that allows Sender to be passed between threads. This allowed me to remove the mutexes. I’m now wondering if I can also use it to allow me to somehow pass the ParquetFile between threads … I will update the post as I make more progress …

Design 3

Simply create an ExecutionContext per partition ..

Help!

If you have ideas on how I can improve the design, please comment on the Reddit thread or join the Apache Arrow mailing list and let me know.

Leave a Comment