DataFusion 0.15.0 Release Notes

September 22, 2019

DataFusion is an extensible query execution framework implemented in Rust that uses the Apache Arrow memory model, and is part of the Apache Arrow project.

DataFusion 0.15.0 is due to be released in the next few days (as part of the Apache Arrow 0.15.0 release) and contains a preview of a new query execution implementation based on a physical query plan, as opposed to executing the logical plan directly.

The main motivations for this new implementation were:

  • Adding support for partitioned data sources, with parallel query execution using threads, making DataFusion viable for some real-world use cases.
  • Implementing a trait-based execution plan so that other projects can extend DataFusion with their own expressions and operators, for example to support distributed query execution or to call custom code as part of query execution.

This new physical query execution is in preview because it currently supports only a subset of expressions and operators and does not yet have feature parity with executing queries directly from a logical plan. I am working towards feature parity in time for the next release in approximately two months. Contributions to help accelerate this work are welcome (see ARROW-5227). The new implementation has also had very little testing beyond the unit and integration tests, so it is probably not very useful yet.

New Traits

Each physical execution plan (such as Projection, Selection, HashAggregate) implements the following traits:

use std::sync::{Arc, Mutex};

use arrow::datatypes::Schema;

// DataFusion's Result alias; BatchIterator is another DataFusion trait (not shown)
use crate::error::Result;

/// Partition-aware execution plan for a relation
pub trait ExecutionPlan {
    /// Get the schema for this execution plan
    fn schema(&self) -> Arc<Schema>;
    /// Get the partitions for this execution plan. Each partition can be
    /// executed in parallel.
    fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>>;
}

/// Represents a partition of an execution plan that can be executed on a thread
pub trait Partition: Send + Sync {
    /// Execute this partition and return an iterator over RecordBatch
    fn execute(&self) -> Result<Arc<Mutex<dyn BatchIterator>>>;
}
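To make the split between these two traits concrete, here is a minimal, self-contained sketch of how a caller might run each partition on its own thread and gather the resulting batches. It deliberately avoids DataFusion and Arrow: `Batch` (a plain `Vec<i64>` standing in for a RecordBatch), the simplified `BatchIterator` and `Partition` traits, `MemoryPartition`, and `collect_partitions` are all hypothetical names for illustration, and error handling is omitted.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for a RecordBatch: a single column of i64 values.
type Batch = Vec<i64>;

/// Simplified iterator over the batches produced by one partition.
trait BatchIterator: Send {
    /// Return the next batch, or None once the partition is exhausted.
    fn next(&mut self) -> Option<Batch>;
}

/// Simplified Partition trait with the same shape as the one above, minus errors.
trait Partition: Send + Sync {
    fn execute(&self) -> Arc<Mutex<dyn BatchIterator>>;
}

/// Iterator that yields batches held in memory, in order.
struct MemoryIterator {
    batches: VecDeque<Batch>,
}

impl BatchIterator for MemoryIterator {
    fn next(&mut self) -> Option<Batch> {
        self.batches.pop_front()
    }
}

/// A hypothetical partition whose data is already in memory.
struct MemoryPartition {
    batches: Vec<Batch>,
}

impl Partition for MemoryPartition {
    fn execute(&self) -> Arc<Mutex<dyn BatchIterator>> {
        Arc::new(Mutex::new(MemoryIterator {
            batches: VecDeque::from(self.batches.clone()),
        }))
    }
}

/// Execute each partition on its own thread and gather the batches in partition order.
fn collect_partitions(partitions: Vec<Arc<dyn Partition>>) -> Vec<Batch> {
    let handles: Vec<_> = partitions
        .into_iter()
        .map(|partition| {
            thread::spawn(move || {
                let iter = partition.execute();
                let mut guard = iter.lock().unwrap();
                let mut batches = Vec::new();
                while let Some(batch) = guard.next() {
                    batches.push(batch);
                }
                batches
            })
        })
        .collect();
    // Joining in spawn order keeps the output grouped by partition.
    handles
        .into_iter()
        .flat_map(|handle| handle.join().unwrap())
        .collect()
}
```

Because each partition hands back its own iterator, the caller decides how much parallelism to use; this sketch simply spawns one thread per partition and joins them in order.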

Additionally, physical expressions (such as literals, column references, CAST, and binary expressions) must implement the PhysicalExpr trait:

use arrow::array::ArrayRef;
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;

use crate::error::Result;

/// Expression that can be evaluated against a RecordBatch
pub trait PhysicalExpr: Send + Sync {
    /// Get the name to use in a schema to represent the result of this expression
    fn name(&self) -> String;
    /// Get the data type of this expression, given the schema of the input
    fn data_type(&self, input_schema: &Schema) -> Result<DataType>;
    /// Evaluate an expression against a RecordBatch
    fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef>;
}
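The following sketch, assuming a simplified batch of `i64` columns instead of Arrow arrays, shows how expressions implemented as trait objects compose into a tree that is evaluated against a batch. `Batch`, `Column`, `Literal`, `Add`, and the `c<index>` naming scheme are hypothetical, and `data_type` is left out because every value here is an `i64`.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a RecordBatch: equal-length columns of i64 values.
struct Batch {
    columns: Vec<Vec<i64>>,
}

/// Simplified PhysicalExpr: evaluating it produces one output column.
trait PhysicalExpr: Send + Sync {
    fn name(&self) -> String;
    fn evaluate(&self, batch: &Batch) -> Vec<i64>;
}

/// Reference to an input column by index.
struct Column(usize);

/// A constant value repeated for every row.
struct Literal(i64);

/// Row-by-row addition of two child expressions.
struct Add {
    left: Arc<dyn PhysicalExpr>,
    right: Arc<dyn PhysicalExpr>,
}

impl PhysicalExpr for Column {
    fn name(&self) -> String {
        format!("c{}", self.0)
    }
    fn evaluate(&self, batch: &Batch) -> Vec<i64> {
        batch.columns[self.0].clone()
    }
}

impl PhysicalExpr for Literal {
    fn name(&self) -> String {
        self.0.to_string()
    }
    fn evaluate(&self, batch: &Batch) -> Vec<i64> {
        // Produce one copy of the literal per row in the batch.
        let rows = batch.columns.first().map_or(0, |c| c.len());
        vec![self.0; rows]
    }
}

impl PhysicalExpr for Add {
    fn name(&self) -> String {
        format!("{} + {}", self.left.name(), self.right.name())
    }
    fn evaluate(&self, batch: &Batch) -> Vec<i64> {
        let left = self.left.evaluate(batch);
        let right = self.right.evaluate(batch);
        left.iter().zip(right.iter()).map(|(a, b)| a + b).collect()
    }
}
```

For example, `Add { left: Arc::new(Column(1)), right: Arc::new(Literal(5)) }` is named `c1 + 5` and adds 5 to every value in the second column.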

Aggregate expressions can also be added by implementing the AggregateExpr trait:

use std::cell::RefCell;
use std::rc::Rc;
use std::sync::Arc;

use arrow::datatypes::{DataType, Schema};

// DataFusion's Result alias; Accumulator is another DataFusion trait (not shown)
use crate::error::Result;

/// Aggregate expression that can be evaluated against a RecordBatch
pub trait AggregateExpr: Send + Sync {
    /// Get the name to use in a schema to represent the result of this expression
    fn name(&self) -> String;
    /// Get the data type of this expression, given the schema of the input
    fn data_type(&self, input_schema: &Schema) -> Result<DataType>;
    /// Create an accumulator for this aggregate expression
    fn create_accumulator(&self) -> Rc<RefCell<dyn Accumulator>>;
    /// Create an aggregate expression for combining the results of accumulators
    /// from partitions. For example, the partial results of a parallel SUM are
    /// combined with another SUM, and the partial results of a parallel COUNT
    /// are also combined with SUM (adding up the per-partition counts).
    fn create_combiner(&self, column_index: usize) -> Arc<dyn AggregateExpr>;
}
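The `create_combiner` comment describes two-phase aggregation: each partition runs its own accumulator, and a second aggregate combines the partial results. The sketch below illustrates the idea with plain `i64` values. The `Accumulator` trait, the `Agg` enum, and the `aggregate` function are hypothetical simplifications, and the sketch uses `Box` where the DataFusion trait returns `Rc<RefCell<dyn Accumulator>>`.

```rust
/// Simplified accumulator: consumes values one at a time and yields a result.
trait Accumulator {
    fn update(&mut self, value: i64);
    fn result(&self) -> i64;
}

struct SumAccumulator {
    sum: i64,
}

struct CountAccumulator {
    count: i64,
}

impl Accumulator for SumAccumulator {
    fn update(&mut self, value: i64) {
        self.sum += value;
    }
    fn result(&self) -> i64 {
        self.sum
    }
}

impl Accumulator for CountAccumulator {
    fn update(&mut self, _value: i64) {
        self.count += 1;
    }
    fn result(&self) -> i64 {
        self.count
    }
}

/// Hypothetical aggregate kinds.
#[derive(Clone, Copy)]
enum Agg {
    Sum,
    Count,
}

impl Agg {
    fn create_accumulator(&self) -> Box<dyn Accumulator> {
        match self {
            Agg::Sum => Box::new(SumAccumulator { sum: 0 }),
            Agg::Count => Box::new(CountAccumulator { count: 0 }),
        }
    }

    /// Partial SUMs and partial COUNTs are both combined with SUM.
    fn create_combiner(&self) -> Agg {
        match self {
            Agg::Sum | Agg::Count => Agg::Sum,
        }
    }
}

/// Aggregate each partition separately, then combine the partial results.
fn aggregate(agg: Agg, partitions: &[Vec<i64>]) -> i64 {
    let partials: Vec<i64> = partitions
        .iter()
        .map(|partition| {
            let mut acc = agg.create_accumulator();
            for &v in partition {
                acc.update(v);
            }
            acc.result()
        })
        .collect();

    let mut combiner = agg.create_combiner().create_accumulator();
    for v in partials {
        combiner.update(v);
    }
    combiner.result()
}
```

With input partitioned as `[1, 2, 3]`, `[4, 5]`, and `[6]`, COUNT produces partial counts of 3, 2, and 1, and the SUM combiner turns them into a final count of 6. Combining with COUNT instead would wrongly return 3, the number of partitions.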

Executing SQL with the Physical Plan

This code sample demonstrates how to try out the physical query execution by converting a logical plan to a physical plan and then calling ExecutionContext::collect() to execute the query and return the results in a single vector.

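// assumes `ctx` is an ExecutionContext with a table named "c" already registered
let logical_plan = ctx.create_logical_plan("SELECT a, SUM(b) FROM c GROUP BY a")?;
let logical_plan = ctx.optimize(&logical_plan)?;
// the second argument is the batch size (maximum number of rows per RecordBatch)
let physical_plan = ctx.create_physical_plan(&logical_plan, 1024)?;
let results = ctx.collect(physical_plan.as_ref())?;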

Note that this approach to creating a physical query plan is limited to the expressions and operators currently supported in the DataFusion logical plan and SQL query planner.

Manually Building a Physical Query Plan

It is also possible to construct physical execution plans directly. This is necessary when using custom implementations of ExecutionPlan that DataFusion does not provide natively, or when more control is needed over how plans are constructed. Here is an example that demonstrates how to manually build a HashAggregate query plan against a partitioned CSV data source:

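// Scan a directory of partitioned CSV files. Arguments: path, schema (assumed to be
// defined already), has_header, projection (None = all columns), batch size.
let csv = CsvExec::try_new("/path/to/partitioned/csv", schema, true, None, 1024)?;

let aggregate = HashAggregateExec::try_new(
    vec![col(1)], // GROUP BY c1
    vec![sum(col(3))], // SUM(c3)
    Arc::new(csv), // input
    // output schema: the grouping column followed by the aggregate result
    Arc::new(Schema::new(vec![
        Field::new("c1", DataType::UInt32, true),
        Field::new("c3_sum", DataType::Int64, true),
    ])),
)?;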
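Conceptually, a hash aggregate over partitioned input computes partial per-group results for each partition and then merges them by group key. The sketch below shows that two-phase approach for SUM using only the standard library, with one scoped thread per partition. The `(u32, i64)` row type and the `partial_sum` and `hash_aggregate` functions are hypothetical; this is an illustration of the technique, not the HashAggregateExec implementation.

```rust
use std::collections::HashMap;
use std::thread;

/// Hypothetical input row: (group key, value to sum).
type Row = (u32, i64);

/// Phase 1: aggregate one partition into partial sums per group key.
fn partial_sum(rows: &[Row]) -> HashMap<u32, i64> {
    let mut sums = HashMap::new();
    for &(key, value) in rows {
        *sums.entry(key).or_insert(0) += value;
    }
    sums
}

/// Phase 2: merge the per-partition results by key, combining SUMs with another SUM.
fn hash_aggregate(partitions: &[Vec<Row>]) -> Vec<Row> {
    // Run phase 1 for every partition on its own scoped thread.
    let partials: Vec<HashMap<u32, i64>> = thread::scope(|s| {
        let handles: Vec<_> = partitions
            .iter()
            .map(|p| s.spawn(move || partial_sum(p)))
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });

    let mut merged: HashMap<u32, i64> = HashMap::new();
    for partial in partials {
        for (key, sum) in partial {
            *merged.entry(key).or_insert(0) += sum;
        }
    }

    // HashMap iteration order is unspecified, so sort by key for stable output.
    let mut result: Vec<Row> = merged.into_iter().collect();
    result.sort();
    result
}
```

Merging by key is what makes the partitioning transparent: a group such as key 1 can appear in several partitions, and its partial sums are added together in the second phase.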

Next Steps

As I mentioned earlier, my focus now is on feature parity with the current query execution against the logical plan, and once that is achieved, I plan to deprecate the ExecutionContext::execute() method that takes a LogicalPlan as input.

I will also be updating my Ballista PoC to extend DataFusion with support for distributed queries, which will help test this new functionality and drive further requirements.