Airflow 是一個流行的工作流程管理工具,常用來作為資料管線 (data pipeline) 以完成常見的 ETL ⎘ 任務。本文山姆鍋示範如何以 Airflow 作為殺雞用牛刀、大材小用的例子。
定期執行某些背景任務是常見的系統需求,如果沒有特別需要,在 Unix-like 系統可以使用 Cron ⎘ 或者類似的服務來達成;對於 Python 應用來說,Celery Beat 也是常見的方案。在山姆鍋目前開發的產品中,使用 Airflow ⎘ 負責背景工作流程的排程工作,同時也使用 Celery Beat 作定時任務的排程。既然決定使用 Airflow, Celery Beat 的定時排程功能就重複了。所以,有了以 Airflow 來取代 Celery Beat 的想法。
架構組態
下圖是此範例使用的元件組成:
PostgreSQL Database
Airflow 使用 PostgreSQL Database 作為狀態儲存以及排程協調工具 (透過 SQL SELECT FOR UPDATE)。
Airflow Scheduler
使用兩個 Scheduler,A 跟 B,如此可以測試將其中任意一個 Scheduler 終止會有什麼效果。由於使用 LocalExecutor, DAG 的任務 (task) 也在此容器執行 (使用 sub-process 執行不同的任務)。
Airflow Webserver
Airflow Webserver 提供 Airflow 的 Web 使用者介面以及 REST API。
Airflow Init
Airflow Init 負責資料庫初始化工作,會建立資料庫表格以及預設的使用者。
準備工作
此文的範例只在 OS X(x86_64 架構) 的筆電上測試過,假設測試環境已經安裝好 Docker 以及 Docker compose 工具。
為了測試目的,在 dags/ 目錄中已經有 job-1.py 以及 job-2.py 這兩個 DAGs。兩個都只是簡單地每分鐘執行一次且在標準輸出顯示不同的簡短訊息。
本文範例程式碼在此 Repo ⎘
操作步驟
首先,需要初始化設定資料:
docker-compose up airflow-init
一旦資料設定完成,就可以啟用整個環境:
docker-compose up
環境啟動後,每一分鐘應該可以看到 Scheduler A 或者 B 執行任務的紀錄輸出,Airflow Web UI 可以透過 http://localhost:8080 ⎘ 網址,使用瀏覽器存取。
此環境共有 Scheduler A 以及 Scheduler B, 可以使用下列指令將其中一個終止 (這裡以 Scheduler A 為例)
docker-compose stop airflow-scheduler-a
小結
只把 Airflow 當作定時器看似大材小用,但在無外部高可用 (HA) 定時排程服務 (e.g. Cloud Scheduler, K8S Cronjob) 時,使用 Airflow 可以實現一個高可用且負載均衡的工作排程器。山姆鍋對於 Airflow 並不夠熟悉也還沒太多經驗,請把本文內容純粹視為個人的試驗結果。