This Weekend in DataFusion (2/18/18)

February 18, 2018

I had limited time to work on DataFusion this weekend, but I have started refactoring the code base based on feedback I received on Reddit last week, and I have also been working on some benchmarks.

Generating Closures

DataFusion uses the Expr enum to represent the different types of expressions that can be used in projections, selections, and so on. Here is the current definition of Expr, which supports only a very limited set of expressions today.

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Expr {
    /// index into a value within the tuple
    TupleValue(usize),
    /// literal value
    Literal(Value),
    /// binary expression e.g. "age > 21"
    BinaryExpr { left: Box<Expr>, op: Operator, right: Box<Expr> },
    /// sort expression
    Sort { expr: Box<Expr>, asc: bool },
    /// scalar function
    ScalarFunction { name: String, args: Vec<Expr> }
}
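
To make the tree structure concrete, here is a minimal sketch of how the expression "age > 21" could be represented. It uses cut-down, self-contained copies of the types rather than DataFusion's real ones: the Int64 variant of Value is an assumption (the post only shows Value::Boolean), the serde derives and the Sort and ScalarFunction variants are omitted, and the helper age_gt_21 is hypothetical and assumes that age is the first value in the tuple.

```rust
// Simplified stand-ins for DataFusion's types. The `Int64` variant and the
// choice of tuple index 0 for `age` are assumptions for illustration.
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    Boolean(bool),
    Int64(i64),
}

#[derive(Debug, Clone, PartialEq)]
pub enum Operator {
    Eq,
    NotEq,
    Lt,
    LtEq,
    Gt,
    GtEq,
}

#[derive(Debug, Clone, PartialEq)]
pub enum Expr {
    /// index into a value within the tuple
    TupleValue(usize),
    /// literal value
    Literal(Value),
    /// binary expression e.g. "age > 21"
    BinaryExpr { left: Box<Expr>, op: Operator, right: Box<Expr> },
}

/// Hypothetical helper: builds the tree for `age > 21`, assuming `age` is at tuple index 0.
pub fn age_gt_21() -> Expr {
    Expr::BinaryExpr {
        left: Box::new(Expr::TupleValue(0)),
        op: Operator::Gt,
        right: Box::new(Expr::Literal(Value::Int64(21))),
    }
}
```

Each operand of a BinaryExpr is itself an Expr, so arbitrarily nested expressions are built the same way.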

Prior to this weekend, the execution runtime interpreted each expression dynamically, pattern matching on the Expr tree for every tuple it processed. For example, here’s a portion of the original code.

match expr {
    &Expr::BinaryExpr { ref left, ref op, ref right } => {
        let left_value = self.evaluate(tuple, tt, left)?;
        let right_value = self.evaluate(tuple, tt, right)?;
        match op {
            &Operator::Eq => Ok(Value::Boolean(left_value == right_value)),
            &Operator::NotEq => Ok(Value::Boolean(left_value != right_value)),
            &Operator::Lt => Ok(Value::Boolean(left_value < right_value)),
            &Operator::LtEq => Ok(Value::Boolean(left_value <= right_value)),
            &Operator::Gt => Ok(Value::Boolean(left_value > right_value)),
            &Operator::GtEq => Ok(Value::Boolean(left_value >= right_value)),
        }
    },
    &Expr::TupleValue(index) => Ok(tuple.values[index].clone()),
    &Expr::Literal(ref value) => Ok(value.clone()),
    ...
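
For comparison, here is a minimal, self-contained sketch of this interpret-on-every-row approach. It assumes simplified types: Row, the Int64 variant, and the String error type are stand-ins rather than DataFusion's actual definitions, and the original's tt argument is dropped.

```rust
// Simplified stand-ins; `Row`, `Int64`, and the `String` error are assumptions.
#[derive(Debug, Clone, PartialEq, PartialOrd)]
pub enum Value {
    Boolean(bool),
    Int64(i64),
}

pub struct Row {
    pub values: Vec<Value>,
}

pub enum Operator { Eq, NotEq, Lt, LtEq, Gt, GtEq }

pub enum Expr {
    TupleValue(usize),
    Literal(Value),
    BinaryExpr { left: Box<Expr>, op: Operator, right: Box<Expr> },
}

/// Walks the tree for one row, re-matching on every node each time it is called.
pub fn evaluate(row: &Row, expr: &Expr) -> Result<Value, String> {
    match expr {
        Expr::BinaryExpr { left, op, right } => {
            let l = evaluate(row, left)?;
            let r = evaluate(row, right)?;
            // The operator is inspected again for every row.
            let result = match op {
                Operator::Eq => l == r,
                Operator::NotEq => l != r,
                Operator::Lt => l < r,
                Operator::LtEq => l <= r,
                Operator::Gt => l > r,
                Operator::GtEq => l >= r,
            };
            Ok(Value::Boolean(result))
        }
        Expr::TupleValue(index) => row
            .values
            .get(*index)
            .cloned()
            .ok_or_else(|| format!("tuple index {} out of range", index)),
        Expr::Literal(value) => Ok(value.clone()),
    }
}
```

Because every match runs again for every row, the dispatch cost is paid once per node per row.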

Starting with DataFusion 0.1.6, this matching logic runs once, during query planning, and each expression is compiled into a closure that the runtime calls to evaluate it.

Here’s a subset of the code that generates the closures:

/// Compiled Expression
pub type CompiledExpr = Box<dyn Fn(&Row) -> Value>;

/// Compiles a relational expression into a closure
pub fn compile_expr(ctx: &ExecutionContext, expr: &Expr) -> Result<CompiledExpr, ExecutionError> {
    match expr {
        &Expr::Literal(ref lit) => {
            // capture an owned copy of the literal so the closure is 'static
            let literal_value = lit.clone();
            Ok(Box::new(move |_: &Row| literal_value.clone()))
        }
        &Expr::TupleValue(index) => Ok(Box::new(move |row: &Row| row.values[index].clone())),
        &Expr::BinaryExpr { ref left, ref op, ref right } => {
            // compile both operands once, then select the comparison once
            let l = compile_expr(ctx, left)?;
            let r = compile_expr(ctx, right)?;
            match op {
                &Operator::Eq => Ok(Box::new(move |row: &Row| Value::Boolean(l(row) == r(row)))),
                &Operator::NotEq => Ok(Box::new(move |row: &Row| Value::Boolean(l(row) != r(row)))),
                &Operator::Lt => Ok(Box::new(move |row: &Row| Value::Boolean(l(row) < r(row)))),
                &Operator::LtEq => Ok(Box::new(move |row: &Row| Value::Boolean(l(row) <= r(row)))),
                &Operator::Gt => Ok(Box::new(move |row: &Row| Value::Boolean(l(row) > r(row)))),
                &Operator::GtEq => Ok(Box::new(move |row: &Row| Value::Boolean(l(row) >= r(row)))),
            }
        }
        // ... remaining expression types elided
    }
}
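
For experimenting outside the code base, here is a minimal, self-contained sketch of the same closure-compilation technique. It uses simplified stand-in types (Row, the Int64 variant, and a String error type are assumptions) and drops the ExecutionContext argument. The match on op runs once at compile time; each returned closure only calls its children and compares the results.

```rust
// Simplified stand-ins; `Row`, `Int64`, and the `String` error are assumptions.
#[derive(Debug, Clone, PartialEq, PartialOrd)]
pub enum Value {
    Boolean(bool),
    Int64(i64),
}

pub struct Row {
    pub values: Vec<Value>,
}

pub enum Operator { Eq, NotEq, Lt, LtEq, Gt, GtEq }

pub enum Expr {
    TupleValue(usize),
    Literal(Value),
    BinaryExpr { left: Box<Expr>, op: Operator, right: Box<Expr> },
}

/// A compiled expression: a closure that is called once per row.
pub type CompiledExpr = Box<dyn Fn(&Row) -> Value>;

/// Matches on the expression tree once, at planning time, and returns a closure.
pub fn compile_expr(expr: &Expr) -> Result<CompiledExpr, String> {
    match expr {
        Expr::Literal(lit) => {
            // Capture an owned copy so the closure is 'static.
            let literal_value = lit.clone();
            Ok(Box::new(move |_: &Row| literal_value.clone()))
        }
        Expr::TupleValue(index) => {
            let index = *index;
            // Panics if the row has fewer than index + 1 values.
            Ok(Box::new(move |row: &Row| row.values[index].clone()))
        }
        Expr::BinaryExpr { left, op, right } => {
            let l = compile_expr(left)?;
            let r = compile_expr(right)?;
            // The operator is resolved here; the closure never inspects `op`.
            let compiled: CompiledExpr = match op {
                Operator::Eq => Box::new(move |row: &Row| Value::Boolean(l(row) == r(row))),
                Operator::NotEq => Box::new(move |row: &Row| Value::Boolean(l(row) != r(row))),
                Operator::Lt => Box::new(move |row: &Row| Value::Boolean(l(row) < r(row))),
                Operator::LtEq => Box::new(move |row: &Row| Value::Boolean(l(row) <= r(row))),
                Operator::Gt => Box::new(move |row: &Row| Value::Boolean(l(row) > r(row))),
                Operator::GtEq => Box::new(move |row: &Row| Value::Boolean(l(row) >= r(row))),
            };
            Ok(compiled)
        }
    }
}
```

Compiling age > 21 once and then calling the resulting closure on a row holding Int64(30) yields Boolean(true), and on a row holding Int64(18) yields Boolean(false), with no matching performed per row.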

This means that at runtime, DataFusion can simply invoke the generated Fn(&Row) -> Value closure to evaluate an expression. Here’s a portion of the code for evaluating a projection against a Row.

let values: Vec<Value> = self.expr.iter()
    .map(|expr| (*expr)(&row))
    .collect();
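
A minimal sketch of a projection operator built on this idea, assuming a hypothetical Projection struct whose expr field holds one compiled closure per output column (mirroring self.expr above). Row and Value are simplified stand-ins, and project is a hypothetical method name.

```rust
// Simplified stand-ins; `Projection` and `project` are hypothetical names.
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    Boolean(bool),
    Int64(i64),
}

pub struct Row {
    pub values: Vec<Value>,
}

pub type CompiledExpr = Box<dyn Fn(&Row) -> Value>;

pub struct Projection {
    /// One compiled expression per output column.
    pub expr: Vec<CompiledExpr>,
}

impl Projection {
    /// Calls every compiled expression on `row` to build the output row.
    pub fn project(&self, row: &Row) -> Row {
        let values: Vec<Value> = self.expr.iter().map(|expr| expr(row)).collect();
        Row { values }
    }
}
```

The per-row work is just a loop of indirect calls; all decisions about what each column computes were made when the closures were built.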

This change has given DataFusion a nice speed boost.

Read/Write Buffers

Previously, there was no write buffer for writing data to disk, and although there was a read buffer, it used a small default size. DataFusion now uses 8MB read and write buffers for improved performance.
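
The post does not show DataFusion's I/O code, so the following is only a sketch of how 8MB buffers can be configured with Rust's standard library, using BufReader::with_capacity and BufWriter::with_capacity. The helper names (buffered_reader, buffered_writer, write_lines, read_lines) and the line-oriented file format are assumptions for illustration.

```rust
use std::fs::File;
use std::io::{self, BufRead, BufReader, BufWriter, Write};
use std::path::Path;

/// 8 MB, matching the buffer size described above.
pub const BUFFER_SIZE: usize = 8 * 1024 * 1024;

/// Hypothetical helper: opens `path` for reading through an 8 MB buffer.
pub fn buffered_reader(path: &Path) -> io::Result<BufReader<File>> {
    Ok(BufReader::with_capacity(BUFFER_SIZE, File::open(path)?))
}

/// Hypothetical helper: creates (or truncates) `path` for writing through an 8 MB buffer.
pub fn buffered_writer(path: &Path) -> io::Result<BufWriter<File>> {
    Ok(BufWriter::with_capacity(BUFFER_SIZE, File::create(path)?))
}

/// Writes each line to `path`, then flushes so buffered bytes reach the file.
pub fn write_lines(path: &Path, lines: &[&str]) -> io::Result<()> {
    let mut writer = buffered_writer(path)?;
    for line in lines {
        writeln!(writer, "{}", line)?;
    }
    writer.flush()
}

/// Reads `path` back line by line through the buffered reader.
pub fn read_lines(path: &Path) -> io::Result<Vec<String>> {
    buffered_reader(path)?.lines().collect()
}
```

The explicit flush matters: BufWriter also flushes when dropped, but any error raised during that implicit flush is silently ignored.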

Performance Impact

Together, these changes improved DataFusion's performance by 70% on the first benchmark I am working on, and DataFusion is now approximately 3.5x faster than Apache Spark on this particular benchmark.

Benchmark Repo

There is a new datafusion-benchmarks repo where I am creating benchmarks to compare DataFusion's performance with Apache Spark's.

I am in the process of generating large data sets on some dedicated hardware and will be publishing some benchmarks soon, once I’ve verified they are working correctly and that they are as fair as I can reasonably make them.

