Apache Airflow at Pandora
Big data pipelines don’t run themselves. Beyond the obvious need to scale pipelines up to large data volumes, there’s a lot of work that goes into managing dependencies, handling retries, alerting, etc. This need only becomes more pressing as the organization grows and new teams seek to analyze existing data in different ways. As new teams add new pipelines, they generally want to build off the output of existing pipelines, and they want to copy well-defined patterns around retries, alerts, etc., so they can focus on the logic of their particular pipeline.
Pandora faced this issue long before the current plethora of open source schedulers was available. Faced with a problem without any obvious off-the-shelf solution, we built our own around the Quartz Scheduler (for the core scheduling logic) and Jetty (to serve up a web interface). While this solution suited our needs at the time, its limitations became more and more pronounced as the company grew:
- There was a high barrier to entry. We have a wide range of pipeline creators including Analysts, Scientists, and Engineers. Compiling and deploying Java code can be tough.
- For many folks, it was easier to use cron even though cron has no built-in mechanisms for managing dependencies, triggering alerts, etc.
- Understanding the structure of jobs was tricky. The team had added a lot of useful features to the web UI over the years, but there was little to no ability to visualize the structure of tasks within jobs or dependencies across jobs.
- Many of our jobs are long running jobs consisting of many different tasks. Occasionally, a flaky task would fail in the middle of a particularly long-running job. Faced with this sort of failure, folks had to choose between manually re-running the failed task and any downstream tasks or re-running the entire job. Not surprisingly, many chose the easier path of re-running the entire job even if it meant redoing a substantial amount of work.
- The team that owned our in-house scheduling framework was focused on other problems that were more critical to the business. When it came time to decide whether folks should work on new pipelines to support Pandora Premium or improve our in-house scheduler, pushing off any scheduler-related work was almost a foregone conclusion. When bugs were found or new features were requested, there were no dedicated resources to maintain and develop our in-house solution.
With these limitations becoming increasingly apparent, it was time to take a step back and see what other solutions existed. We wanted something that was widely used, with lots of examples and documentation to help lower the learning curve, and we wanted it to be open source so we could pick up bug fixes and new releases as needed. After evaluating our options, we settled on Apache Airflow for several reasons:
- DAGs (Directed Acyclic Graphs) are written in Python — Python is more familiar than Java to most analysts and scientists. It’s also easier to get started and iterate quickly when you’re not waiting for builds, etc.
- It’s easy to update individual DAGs — With our old system, pushing out updates was disruptive to all jobs running on that particular instance. We either had to wait for a window when nothing was running or kill currently running jobs and manually restart them afterwards.
- Airflow’s visualizations are helpful in so many different ways. Having a graphical overview of the DAG makes it easy to spot-check the structure as you build it and makes it easier for other folks on the team to get a sense of how things are organized (whether it’s because they need to change/update it or because they are responding to an alert in the middle of the night).
- With nearly 400 contributors to date there’s no doubt that Airflow is a popular open source project with a strong and growing community.
What is Airflow?
As the Airflow docs put it, “Apache Airflow is a way to programmatically author, schedule, and monitor data pipelines.”
There are extensive explanations of what Airflow does and how it works available online. For the unfamiliar, the key concepts are as follows:
- DAG: a directed acyclic graph object that ties together all the tasks in a cohesive workflow and dictates the execution frequency (i.e. schedule).
- Task: a unit of work to be executed that should be both atomic and idempotent. In Airflow there are two types of tasks: Operators and Sensors.
- Operator: a specific type of work to be executed. Airflow comes with many types out of the box such as the BashOperator which executes a bash command, the HiveOperator which executes a Hive command, the SqoopOperator, etc. It’s easy to create new ones for specific types of tasks. Some operators, like the BashOperator and the PythonOperator, are particularly flexible while others are more constrained.
- Sensor: a blocking task that runs until a condition is met or until it times out. Sensor tasks are used as upstream dependencies of Operators. They prevent work from starting until certain preconditions are met. For example, if you have a Hive script that depends on certain table partitions existing, you may use a HivePartitionSensor to block until they are created.
Besides the basic building blocks, Airflow includes a powerful UI and an even more versatile CLI. In addition, Airflow allows us to add customizations via its support for plugins.
Our Infrastructure & Deployment
When a team is interested in using Airflow, they make a request to the Infrastructure team to set up a new instance for them. The Infrastructure team uses Ansible to deploy instances in a repeatable/scalable way (we are up to 26 production instances). The Infrastructure team is responsible for the overall health of the system (e.g. monitoring and alerting) and each team is responsible for building and maintaining their DAG definitions.
Ansible is software that automates software provisioning, configuration management, and application deployment.
As mentioned above, we use Ansible to automate setting up and updating all of our Airflow instances. Our Ansible playbook creates a supervisor instance, installs the Airflow dependencies, creates all the necessary directories (such as the DAGs, logs, and plugin directories), installs our plugin, registers this instance in Consul (amongst other things, we use Consul to define basic health checks and define a scrape target for Prometheus; more on this later), creates the necessary databases in Postgres, adds some custom connections and more.
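A heavily simplified sketch of what such a playbook looks like follows. The task names, paths, and variables here are illustrative, not our actual playbook:

```yaml
# Simplified Airflow-provisioning playbook (illustrative names and paths)
- hosts: airflow_nodes
  tasks:
    - name: Install Airflow and its dependencies
      pip:
        name: "apache-airflow=={{ airflow_version }}"

    - name: Create the DAG, log, and plugin directories
      file:
        path: "{{ item }}"
        state: directory
      with_items:
        - /opt/airflow/dags
        - /opt/airflow/logs
        - /opt/airflow/plugins

    - name: Register the instance in Consul
      copy:
        src: airflow-service.json
        dest: /etc/consul.d/airflow-service.json
```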
One of the drawbacks of this approach is that we must ensure the machines we are deploying to via Ansible have certain utilities the playbook is expecting. We have ambitions to “containerize” things so that we can deploy to our internal cloud infrastructure using Nomad and Docker. However, this is still a work in progress.
Supervisor is a client/server system that allows its users to monitor and control a number of processes on UNIX-like operating systems.
We use supervisor to control all of our Airflow processes: the webserver, the scheduler, and the workers. Having the supervisor UI to check process logs and perform actions on the processes (stop, restart, etc.) has proven very valuable and makes maintenance easier.
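A stripped-down version of what the supervisor configuration for one instance looks like (program names, ports, and options are illustrative):

```ini
; Simplified supervisor config for a single Airflow instance (illustrative)
[program:airflow-webserver]
command=airflow webserver --port 9001
autorestart=true

[program:airflow-scheduler]
command=airflow scheduler
autorestart=true

[program:airflow-worker]
command=airflow worker
autorestart=true
```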
Consul is a tool for discovering and configuring services in your infrastructure. It provides several key features: Service Discovery, Health Checking, KV Store, and Multi Datacenter.
Fabio is a fast, modern, zero-conf load balancing HTTP(S) and TCP router for deploying applications managed by consul.
We have a lot of different teams with their own Airflow instance. It would be a pain to keep track of where each instance was running if it had to be done manually. Thankfully, Consul makes it easy for us to register the instances as we create them. We then have a central system to turn to when folks ask “where is instance XYZ running?” Additionally, we use certain tags in the service definition to identify these instances as Prometheus scrape targets (more on this later).
Beyond service registration, we also define basic health checks in Consul. We have a simple HTTP check which gives us a basic gauge of the health of the webserver. We also wanted to have a health check that gave us some visibility into the health of Airflow’s scheduler and workers. We settled on using a TTL check which is updated by a simple DAG we schedule on all of our instances by default.
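The canary DAG itself boils down to a trivial task that "passes" the TTL check through the local Consul agent's HTTP API (`PUT /v1/agent/check/pass/<check_id>`). A stdlib-only sketch of that task's callable follows; the check id and agent address are illustrative:

```python
import urllib.request

CONSUL_AGENT = "http://localhost:8500"  # local Consul agent (assumed address)

def ttl_pass_url(check_id, agent=CONSUL_AGENT):
    """Build the Consul agent URL that marks a TTL check as passing."""
    return "%s/v1/agent/check/pass/%s" % (agent, check_id)

def pass_ttl_check(check_id="airflow-canary"):  # illustrative check id
    """Run from a task on a short schedule; if the scheduler or workers
    stop executing this task, the TTL expires and Consul flags the check."""
    req = urllib.request.Request(ttl_pass_url(check_id), method="PUT")
    urllib.request.urlopen(req)
```

If both the scheduler and workers are healthy, the check keeps getting refreshed; any failure anywhere in that path lets the TTL lapse, which is exactly the signal we wanted.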
By registering our Airflow instances in Consul, we were also able to leverage consul-template to quickly build a dashboard that shows where all of our instances are deployed (i.e. hostname and port) and the status of the health checks for that instance. Although all of this information is available by navigating around the Consul UI, it was convenient to have a simplified and consolidated dashboard where we could check on instances “at a glance.”
Lastly, we use Fabio to provide more memorable, user-friendly URLs. We didn’t want our users to have to access their instances using URLs like machineXYZ:9001. Fabio gave us an easy way to give teams unique, team-specific names for their instances.
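Fabio discovers routes from Consul tags of the form `urlprefix-…`, so the registration and the friendly URL live in one place. A sketch of what a service definition can look like (the service name, port, and route prefix are illustrative):

```json
{
  "service": {
    "name": "airflow-teamXYZ",
    "port": 9001,
    "tags": ["urlprefix-/airflow/teamXYZ"],
    "checks": [
      {"http": "http://localhost:9001/", "interval": "30s"},
      {"id": "airflow-canary", "ttl": "90m"}
    ]
  }
}
```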
Prometheus is a systems and service monitoring system. It collects metrics from configured targets at given intervals, evaluates rule expressions, displays the results, and can trigger alerts if some condition is observed to be true.
Grafana is an open source metric analytics & visualization suite. It is most commonly used for visualizing time series data for infrastructure analytics.
We use Prometheus to monitor our Airflow instances and configure alerts around certain key metrics. Our internal plugin adds a ‘/metrics’ endpoint to each Airflow instance. More specifically, Airflow enables the addition of new web views via Flask Blueprints. Within the context of the plugin, it’s easy to calculate instance-specific metrics such as log size, number of database connections, and DagBag processing time. We use Prometheus’ Alertmanager to configure alerts for some of these metrics and for general instance health (e.g. are either of the Consul health checks failing?).
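The endpoint just has to emit Prometheus’ plain-text exposition format. A stdlib-only sketch of the rendering follows; the metric names and values are illustrative, and in the real plugin the resulting string is returned from a route registered via a Flask Blueprint:

```python
def render_prometheus_metrics(metrics):
    """Render {name: (help_text, value)} into Prometheus' text exposition format."""
    lines = []
    for name, (help_text, value) in sorted(metrics.items()):
        lines.append("# HELP %s %s" % (name, help_text))
        lines.append("# TYPE %s gauge" % name)
        lines.append("%s %s" % (name, value))
    return "\n".join(lines) + "\n"

# Example instance-level metrics (illustrative names and values):
sample = {
    "airflow_log_dir_bytes": ("Total size of the log directory.", 123456),
    "airflow_dagbag_process_seconds": ("Time to process the DagBag.", 2.5),
}
```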
Once Prometheus starts scraping metrics, it’s easy to build dashboards in Grafana to visualize those metrics. This is really helpful for eyeballing if something seems off and allows us to be proactive in catching discrepancies before users notice.
Aside from metrics we collect via Prometheus, Airflow does have some support for sending stats to StatsD. We initially wanted to use statsd_exporter to forward these stats along to Prometheus, but ultimately decided not to after learning that some of these stats were incorrect/inaccurate (see AIRFLOW-774).
We use Airflow’s support for LDAP to authenticate users and limit access to the UI to members of the relevant team. The Infrastructure team is also granted access so that they can quickly debug issues if/when they arise. This can be accomplished by using the following “trick” for the user filter value in the airflow.cfg:
user_filter = |(memberOf=cn=teamXYZ,ou=groups,dc=...,dc=...)(memberOf=cn=infrastructure,ou=groups,dc=...,dc=...)
In the above expression, the “|” operates as a logical OR allowing members of teamXYZ and the infrastructure team to login to this particular instance.
After some debate, we decided to maintain our own internal fork of Airflow. Airflow still has its fair share of bugs, but with such a large and active community we find that fixes are often available and committed to master by the time we run into them. We didn’t want to subject ourselves to the instability of master, but we did want the flexibility to cherry-pick certain fixes and backport them to more stable release branches. Right now our internal release is based on the 1.8.1 release branch, and we plan to upgrade to 1.9.1 when that release is made available.
For the times when we run into a bug that hasn’t yet been addressed by the community, we also commit fixes to our internal fork. In order to prevent our fork from drifting too far from the publicly available versions we almost always contribute these changes back. So far we have submitted and merged small UI fixes, additions to the HiveOperator, corrections to the SqoopOperator, success/failure callbacks on the DAG object level, and more.
We have used Airflow’s plugin mechanism to add a number of customizations to our Airflow instances including adding Pandora-specific Operators, Sensors, Views, Blueprints, Connections and Menu Links. For example, we’ve added a view that allows users to add new DAGs or sync existing DAGs from their BitBucket repositories and another view that allows folks to backfill directly from the UI instead of needing to use the CLI.
We have also added our own preloaded DAGs to these instances to do simple tasks. As mentioned previously, we have a “canary” DAG that exists solely to satisfy the TTL health check we’ve setup in Consul. If there are any issues with the instance that prevent this DAG from running, the Consul health check will start failing and the infrastructure team will receive an alert so that any issues can be addressed as soon as possible. Some of the other preloaded DAGs are simple examples showcasing some of the custom operators we’ve created. Other DAGs assist with basic maintenance tasks like clearing out logs older than a certain limit. It’s interesting to use Airflow to monitor and maintain itself.
One of these maintenance DAGs watches the metadata database for DAG import errors using a custom sensor from our plugin. A sketch of how it is wired up (the task_id and keyword arguments below are illustrative, not our exact internal API):

from pandora_airflow_plugin.operators.postgres_sensors import PostgresTableSensor

airflow_import_error_psql_sensor = \
    PostgresTableSensor(
        task_id='airflow_import_error_sensor',  # illustrative task id
        table='import_error',                   # metadata table to watch
        postgres_conn_id='airflow_db',          # assumed connection id
        dag=dag)
As mentioned before, we still run into little bugs a bit more often than we’d like. The good news is that scouring Airflow JIRA tickets often reveals a fix or a discussion that sheds light on what is happening.
Some of the issues we continue to face include:
- The UI has some issues and idiosyncrasies that can throw off newcomers. One example: when new DAGs are added or updated, the webserver processes don’t sync immediately, which confuses users. Another common pain point for us is the task view. Clicking on a specific execution date of a task doesn’t actually take you to the run for that execution date; instead, you are taken to the DAG page, which defaults to showing the latest run. Luckily the fix is easy and just needs to be implemented. There are a handful of other bugs like these that simply don’t match expectations.
- Manual, backfill, and scheduled runs are all treated differently. Users often ask for help getting a manual run to actually run (the DAG’s schedule must be turned on) and getting backfills to respect max_active_runs. There are a handful of other inconsistencies that repeatedly cause confusion.
- It should be easier to kill DAGs in bulk. If you accidentally set off multiple DagRuns (and if you use Airflow, you’ve probably done this at some point or another) there’s no easy way to stop all the executions.
- The scheduler will sometimes stall out on our more heavily used instances. Tweaking the num_runs and run-duration options helped, but it’s still a bit of a mystery to us why this happens. This particular issue is usually preceded by a sharp increase in DAG file processing times.
- Airflow is a fast-growing open source project, which is awesome, but with so many contributors it can sometimes be difficult to look through the source code and understand what the intention was. Code styles and conventions are different, and sometimes “fixes” get through that could benefit from further discussion (for example AIRFLOW-1349).
- Airflow currently has no way for a task to dynamically create other tasks at runtime. There are cases, for example, where you may want to kick off a task per log file but you don’t know how many log files will exist before the DAG starts. Currently you’d have to bundle the processing of all those log files into a single task.
Just like Airflow has grown popular in the tech community, it is quickly gaining traction within Pandora. Even with some shortcomings, it’s a very powerful tool that continues to get better over time. We hope you find this post on how we deployed Airflow here at Pandora helpful. We would be happy to respond to any questions or comments. Thank you!