Ref: https://airflow.apache.org/
Ref: Extra Packages
Ref: https://github.com/tuanavu/airflow-tutorial [youtube教程和代碼]
有點太全了,還是i一步一步的學習的代碼demo為好。
Ref: How to write your first DAG in Apache Airflow - Airflow tutorials.
Ref: Airflow tutorial 2: Set up airflow environment with docker
學習嚮導
一、跟着官方教程走
安裝Airflow。
pip install \
apache-airflow[postgres,gcp]==1.10.12 \
--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-1.10.12/constraints-3.7.txt"
可能安裝過程中,會出現類似的報錯。
ERROR: Cannot uninstall 'PyYAML'. It is a distutils installed project and thus we cannot accurately determine which files belong to it which would lead to only a partial uninstall.
暫時降級pip的版本,安裝即可。安裝完畢後,再reset pip version to latest。
I used follow steps and resolved this problem
Reduced version:
pip install --upgrade --force-reinstall pip==9.0.3
Tried to re-install package:
pip install xxx --disable-pip-version-check
At last, recover the latest version for pip:
pip install --upgrade pip
Ref: Ubuntu18.04安裝Airflow
運行需要以下幾步。
export AIRFLOW_HOME=~/airflow
9、初始化數據庫
airflow initdb
10、啓動Web服務器,默認端口為8080
airflow webserver -p 8080
11、啓動調度程序
airflow scheduler
二、跟着example走
Ref: airflow環境搭建與使用
DAG:dag_id
Schedule:調度時間
Owner:dag擁有者
Recent Tasks:這裏包含9個圓圈,每個圓圈代表task的執行狀態和次數
圈1 success:現實成功的task數,基本上就是該tag包含多少個task,這裏基本上就顯示幾。
圈2 running:正在運行的task數
圈3 failed:失敗的task數
圈4 unstream_failed:
圈5 skipped:跳過的task數
圈6 up_for_retry:執行失敗的task,重新執行的task數
圈7 queued:隊列,等待執行的task數
圈8 :
圈9 scheduled:剛開始調度dag時,這一次執行總共調度了dag下面多少個task數,並且隨着task的執行成功,數值逐漸減少。
Last Run:dag最後執行的時間點
DAG Runs:這裏顯示dag的執行信息,包括3個圓圈,每個圓圈代表dag的執行狀態和次數
圈1 success:總共執行成功的dag數,執行次數
圈2 runing:正在執行dag數
圈3 faild:執行失敗的dag數
Links:
Code View: 查看任務執行代碼
Logs :查看執行日誌,比如失敗原因
Refresh :刷新dag任務
Delete Dag :刪除該dag任務
三、跟着教學視頻走
Ref: [Getting started with Airflow - 1] Installing and running Airflow using docker and docker-compose [視頻不錯]
Ref: https://www.manning.com/books/data-pipelines-with-apache-airflow
江湖地位
一、ETL
Ref: 英語流利説基礎數據平台
- 基於 S3 和 EMR 做到存儲層面的共享,而在計算資源層面做到隔離。
- Amazon EMR 在於我們可以很方便並快速的構建一個基於 Hadoop,Spark,Hive等大數據產品的計算集羣,如果不是需要長久服役,我們可以在其所有 Job 完成之後,銷燬集羣,而並不影響數據的持久化,因為所有的數據都保存在 S3。
對於集羣上的任務,他們的特點可能都不太一樣,比如
- 推薦和算法業務可能對集羣的計算能力要求較高,
- 而 ETL 類型的任務,可能又對存儲或內存要求較高。
- 任務與EMR集羣管理系統 - Execution Service
Execution Service(以下稱為ES)用來管理EMR集羣的創建,擴容,以及銷燬,另外還負責每天流利説所有調度任務的提交,執行,以及結果的反饋。 目前ES支持的任務類型包括 Bash,HQL(Hive SQL Script),以及 Spark。
所有對集羣的操作,以及提交的任務,我們統一維護到 MySQL 中。
- Airflow
流利説目前所有的 ETL 任務都是通過 Airflow 來調度的,並且 Airflow Task 之間的依賴性可通過 Python 來定義,這使得我們的學習以及維護成本更低。
ETL 的基本流程是,我們通過數據同步工具,把數據以 T+1 的方式從 MongoDB / Redis Dump 到 S3,並且在此過程中,同樣會把表的結構同步到 S3,然後利用最新的表結構和數據,在 Hive 中建立相對應的庫和表。
對於原始數據,一般數據最初以 Json 的格式保存到一個名為 raw_data 的庫中,在後續的 ETL Job 中,我們會對 raw_data 庫中的表進行清洗,計算以及轉換,最終數據以 ORC 或是 Parquet 的方式保存到 Prod 的庫中。
另外,Airflow的調度同樣承擔每天集羣的構建工作,整個生命週期類似:Start -> CreateCluster -> Jobs submission --> TerminateCluster --> End.
End.