Eighteen months ago, I started the DataFusion project with the goal of building a distributed compute platform in Rust that could (eventually) rival Apache Spark. Unsurprisingly, this turned out to be an overly ambitious goal at the time and I fell short of achieving that. However, some very good things came out of this effort. We now have a Rust implementation of Apache Arrow with a growing community of committers, and DataFusion was donated to the Apache Arrow project as an in-memory query execution engine and is now starting to see some early adoption. I even saw the first DataFusion job listing recently, which shows that this effort is already having an impact on the industry.
After taking a couple of months away from Arrow and DataFusion to focus on some deliverables at work, I have now started a new PoC project as a second attempt at building a distributed platform with Rust. This time around I have the advantage of already having some foundational pieces in place, namely Arrow and DataFusion. I have also been gaining experience with Kubernetes recently, and I can clearly see how it would simplify the creation of a distributed platform. The pieces really are starting to fall into place.
The new project is called Ballista and is a fast-moving PoC taking a top-down approach to building a distributed platform. Here are some key technical details on the architecture of the current PoC:
- A Ballista cluster currently consists of a number of individual pods within a Kubernetes cluster and can be created and destroyed via the Ballista CLI
- Executor nodes are created based on Kubernetes YAML templates, giving users the flexibility to customize deployments (to mount volumes, for example)
- Ballista applications also get deployed to Kubernetes using the Ballista CLI (they must be packaged as a Docker image first) and they use Kubernetes service discovery to connect to the cluster
- Because there is no distributed query planner yet, Ballista applications must currently build the query plans for the cluster manually. The plans are sent to the executors using gRPC. The executors run them using DataFusion, so both CSV and Parquet data sources are supported, and the results are returned to the application over gRPC. If further processing is required to combine the results from the executors, the application has to do this itself using DataFusion (again, because there is no distributed query planner yet)
- Applications can use SQL or a Table/DataFrame-style API to build query plans
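To illustrate what "manually building the query plans" means in practice, here is a minimal sketch in plain Rust. It does not use Ballista's or DataFusion's actual types: the `Plan` and `AggregateExpr` enums, the `plans_for_partitions` function, and the idea of passing in one file path per executor are all hypothetical stand-ins, assumed only for illustration. The point is that, without a distributed planner, the application itself decides how the work is split and produces one plan per executor.

```rust
/// Hypothetical, simplified logical plan (NOT the real DataFusion or Ballista types).
#[derive(Debug, Clone, PartialEq)]
pub enum Plan {
    /// Scan a CSV file at `path`.
    CsvScan { path: String },
    /// Group the rows produced by `input` and compute aggregates per group.
    Aggregate {
        input: Box<Plan>,
        group_by: Vec<String>,
        aggregates: Vec<AggregateExpr>,
    },
}

/// Hypothetical aggregate expressions over a named column.
#[derive(Debug, Clone, PartialEq)]
pub enum AggregateExpr {
    Min(String),
    Max(String),
}

/// Build one aggregate plan per partition file. In this sketch the
/// application would then send each plan to a different executor.
pub fn plans_for_partitions(paths: &[String]) -> Vec<Plan> {
    paths
        .iter()
        .map(|path| Plan::Aggregate {
            input: Box::new(Plan::CsvScan { path: path.clone() }),
            group_by: vec!["passenger_count".to_string()],
            aggregates: vec![
                AggregateExpr::Min("fare_amount".to_string()),
                AggregateExpr::Max("fare_amount".to_string()),
            ],
        })
        .collect()
}
```

Calling `plans_for_partitions` with twelve paths (one per month of data) yields twelve independent plans. How those plans are serialized and sent over gRPC is outside the scope of this sketch.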
The example application runs the following simple aggregate query against a cluster of 12 executors, with each executor querying one month of data from the 2018 Yellow Taxi Trip Data dataset:
SELECT passenger_count, MIN(fare_amount), MAX(fare_amount) FROM tripdata GROUP BY passenger_count
The application could have used the DataFusion Table API to build the query as an alternative to using SQL:
    let t = csv("/path/to/tripdata")?;
    let passenger_count = t.col("passenger_count")?;
    let fare_amount = t.col("fare_amount")?;
    let group_expr = vec![passenger_count];
    let aggr_expr = vec![t.min(&fare_amount)?, t.max(&fare_amount)?];
    let aggr_query = t.aggregate(group_expr, aggr_expr)?;
    let plan = aggr_query.to_logical_plan();
The application then runs a secondary query on the union of the results from the executors to arrive at the final aggregate result:
SELECT passenger_count, MIN(min_fare_amount), MAX(max_fare_amount) FROM tripdata GROUP BY passenger_count
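The two-step approach above (a partial aggregate on each executor, then a final aggregate over the union of the partial results) can be sketched in plain Rust without DataFusion. This is only an illustration of the idea, not the code the PoC runs: the `FareRange` type and the `partial_aggregate` and `merge_partials` functions are hypothetical names, and rows are modeled as simple `(passenger_count, fare_amount)` tuples.

```rust
use std::collections::BTreeMap;

/// Partial result for one group: the MIN and MAX fare seen so far.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct FareRange {
    pub min: f64,
    pub max: f64,
}

/// Phase 1 (per executor): group rows by passenger_count and compute
/// MIN(fare_amount) and MAX(fare_amount) for each group.
pub fn partial_aggregate(rows: &[(u32, f64)]) -> BTreeMap<u32, FareRange> {
    let mut groups: BTreeMap<u32, FareRange> = BTreeMap::new();
    for &(passenger_count, fare) in rows {
        groups
            .entry(passenger_count)
            .and_modify(|r| {
                r.min = r.min.min(fare);
                r.max = r.max.max(fare);
            })
            .or_insert(FareRange { min: fare, max: fare });
    }
    groups
}

/// Phase 2 (in the application): combine the partial results by taking
/// the MIN of the partial MINs and the MAX of the partial MAXes.
pub fn merge_partials(partials: &[BTreeMap<u32, FareRange>]) -> BTreeMap<u32, FareRange> {
    let mut merged: BTreeMap<u32, FareRange> = BTreeMap::new();
    for partial in partials {
        for (&passenger_count, range) in partial {
            merged
                .entry(passenger_count)
                .and_modify(|r| {
                    r.min = r.min.min(range.min);
                    r.max = r.max.max(range.max);
                })
                .or_insert(*range);
        }
    }
    merged
}
```

This works because MIN and MAX can be computed from partial results: the minimum of the per-executor minimums is the overall minimum. Not every aggregate is this simple. An average, for example, cannot be obtained by averaging per-executor averages, so each executor would need to return a sum and a count instead.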
Here is a video showing the current PoC in action.
This PoC has limited functionality and usability (and some hacky code) but demonstrates that it is already possible to execute distributed queries on a cluster using Rust.
To progress this beyond a PoC and into a usable product, here are some of the things on the roadmap for v1.0.0:
- Implement a distributed query planner
- Implement support for all DataFusion logical plans and expressions
- Support user code as part of distributed query execution
- Support for interactive SQL queries against a cluster (via gRPC)
- Ability to persist results on executor nodes (to CSV and Parquet)
- Support for Arrow Flight protocol
- Java bindings (supporting Java, Kotlin, Scala)
How does this affect Apache Arrow and DataFusion development?
This PoC is being used to help drive requirements for DataFusion. In fact, even though this PoC was started less than two weeks ago, it has already led to three DataFusion PRs being merged into the Apache Arrow codebase.
I fully expect the work in Ballista to lead to further contributions to DataFusion. Specifically:
- Query planning for distributed query execution is largely the same as query planning for parallel query execution and the query planner and optimizer should live in DataFusion (but should be extensible). Parallel query execution should also be in DataFusion.
- The Ballista proto file is based on the Gandiva proto file but adds the concept of query plans. I will start a discussion on the Arrow mailing list about having one Arrow proto file for query plans and expressions that can be leveraged by both Gandiva and DataFusion as well as other projects.
- Although Ballista is using DataFusion today, there is no reason why it couldn’t support executor nodes based on Gandiva in the future since the interface is gRPC.
The source code is available at https://github.com/andygrove/ballista.
Follow me on Twitter at @andygrove73 to stay up to date with news on this project.