This article is intended for readers who already have a basic understanding of Airflow and Kubernetes.
It mainly covers how to use Airflow's KubernetesPodOperator and the KubernetesExecutor mode!
With Airflow's built-in operators, such as BashOperator and PythonOperator, each task starts a process in the local environment.
With KubernetesPodOperator, a single task runs in a container inside its own pod, and it can coexist with other operators. KubernetesExecutor goes further: every task of a DAG runs in its own pod, which makes distributed execution possible.
Recommended node specs
Each node should have at least 2 CPU cores and 4 GB of memory; otherwise Airflow will run painfully slowly.
Set up a Kubernetes cluster first
The cluster here is built with kubeadm; for setup instructions, see the earlier post 使用kubeadm建立多節點的Kubernetes Cluster.
The test environment in this article uses the following machines:
- 1 master node (control plane)
  - k8s-master: e2-standard-2 (2 vCPU, 8 GB memory)
- 2 worker nodes
  - k8s-node: e2-standard-2 (2 vCPU, 8 GB memory)
  - k8s-node2: e2-standard-2 (2 vCPU, 4 GB memory)
Why does one worker node have only 4 GB? Later we will create a task pod that requests more than 4 GB of memory and see whether it really gets scheduled onto the other machine with 8 GB.
To learn the difference between master and worker nodes, see the post Kubernetes 基礎概念/K8S架構說明.
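Once the cluster is up, you may want to double-check how much allocatable CPU and memory each node really has. One way to do that (a plain kubectl query, nothing specific to this setup) is:

```bash
# List each node's allocatable CPU and memory
kubectl get nodes -o custom-columns=NAME:.metadata.name,CPU:.status.allocatable.cpu,MEMORY:.status.allocatable.memory
```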
Using KubernetesPodOperator
We will build the Airflow environment on Kubernetes using the apache/airflow Docker image.
Required configuration files
We will run an Airflow v2.1.0 environment.
Write your own DAG
The example below uses KubernetesPodOperator to print a simple message from inside a pod. Put the k8spodoperator_example.py file into the /dags folder.
k8spodoperator_example.py
```python
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.utcnow(),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'k8spodoperator_example', default_args=default_args, schedule_interval=timedelta(minutes=10))

start = DummyOperator(task_id='start', dag=dag)

passing = KubernetesPodOperator(namespace='default',
                                image="python:3.6",
                                cmds=["python", "-c"],
                                arguments=["print('hello world')"],
                                labels={"foo": "bar"},
                                name="passing-test",
                                task_id="passing-task",
                                get_logs=True,
                                dag=dag
                                )

failing = KubernetesPodOperator(namespace='default',
                                image="ubuntu:16.04",
                                cmds=["python", "-c"],
                                arguments=["print('hello world')"],
                                labels={"foo": "bar"},
                                name="fail",
                                task_id="failing-task",
                                get_logs=True,
                                dag=dag
                                )

end = DummyOperator(task_id='end', dag=dag)

passing.set_upstream(start)
failing.set_upstream(start)
passing.set_downstream(end)
failing.set_downstream(end)
```
Dockerfile
If you have DAGs of your own, you can place them in the /opt/airflow/dags folder when building the image so that Airflow can read them. In what follows, though, we will mainly use the example described above.
```dockerfile
FROM apache/airflow:2.1.0

# Install additional packages
RUN pip install --user --upgrade apache-airflow[password]
RUN pip install --upgrade google-cloud-bigquery
RUN pip install Flask-Bcrypt

# Add your own DAGs into the airflow image
# ADD dags /opt/airflow/dags
```
Build the image. Here I build it under my own Docker Hub account, kubegut.
```bash
docker build -t kubegut/airflowk8spodoperator:0.0.1 .
```
Then push the image to Docker Hub so that Kubernetes can pull it when creating pods.
```bash
docker push kubegut/airflowk8spodoperator:0.0.1
```
In the YAML files that follow, remember: if your image name is different, replace kubegut/airflowk8spodoperator:0.0.1 with your own image name.
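For example, if your Docker Hub account were called youraccount (a placeholder, not a real account), one quick way to swap the image reference in every manifest in the current directory would be a sed one-liner, in the same spirit as the namespace trick used later in this article:

```bash
# Replace the sample image name with your own in all manifests of this directory
sed -i 's|kubegut/airflowk8spodoperator:0.0.1|youraccount/airflowk8spodoperator:0.0.1|g' *.yaml
```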
airflow.yaml
This YAML file adjusts a few environment variable settings.
The YAML below brings up a Deployment with three containers:
- airflow: serves the web UI (webserver)
- airflow-scheduler: schedules and runs tasks
- airflow-initdb: initializes the metadata database the environment needs at startup, based on the airflow.cfg settings. Note that this one is created as an initContainer, so it exits once initialization is done.
airflow.yaml
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow
  namespace: airflow-k8spodoperator
spec:
  replicas: 1
  selector:
    matchLabels:
      name: airflow
  template:
    metadata:
      labels:
        name: airflow
    spec:
      serviceAccountName: airflow-k8spodoperator-serviceaccount
      automountServiceAccountToken: true
      containers:
      - args:
        - webserver
        - -p
        - "8000"
        env:
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          value: postgresql://postgres:password@airflow-db:5432/postgres
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        - name: AIRFLOW__WEBSERVER__AUTHENTICATE
          value: "True"
        - name: AIRFLOW__WEBSERVER__AUTH_BACKEND
          value: airflow.contrib.auth.backends.password_auth
        - name: AIRFLOW__API__AUTH_BACKEND
          value: airflow.api.auth.backend.basic_auth
        image: kubegut/airflowk8spodoperator:0.0.1
        imagePullPolicy: Always
        name: airflow
        resources: {}
        volumeMounts:
        - mountPath: /opt/airflow/logs/
          mountPropagation: None
          name: airflow-logs
      - args:
        - scheduler
        env:
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          value: postgresql://postgres:password@airflow-db:5432/postgres
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        image: kubegut/airflowk8spodoperator:0.0.1
        imagePullPolicy: Always
        name: airflow-scheduler
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /opt/airflow/logs/
          mountPropagation: None
          name: airflow-logs
      dnsPolicy: ClusterFirst
      initContainers:
      - command: ['airflow', 'db', 'init']
        env:
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          value: postgresql://postgres:password@airflow-db:5432/postgres
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        image: kubegut/airflowk8spodoperator:0.0.1
        imagePullPolicy: Always
        name: airflow-initdb
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      shareProcessNamespace: false
      terminationGracePeriodSeconds: 600
      volumes:
      - emptyDir: {}
        name: airflow-data
      - emptyDir: {}
        name: airflow-logs
```
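If the webserver or scheduler later stays in a crash loop, the database initialization run by the init container is a common culprit. One way to inspect it, assuming the Deployment name airflow from the manifest above, is:

```bash
# Tail the logs of the init container that runs `airflow db init`
kubectl logs deploy/airflow -c airflow-initdb -n airflow-k8spodoperator
```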
airflow-db.yaml
airflow-db.yaml: sets up a database that stores Airflow's metadata.
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow-db
  namespace: airflow-k8spodoperator
spec:
  replicas: 1
  selector:
    matchLabels:
      name: airflow-db
  template:
    metadata:
      labels:
        name: airflow-db
    spec:
      containers:
      - env:
        - name: POSTGRES_PASSWORD
          value: password
        image: postgres:9.6
        imagePullPolicy: IfNotPresent
        name: airflow-db
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /var/lib/postgresql/data
          mountPropagation: None
          name: postgresql-data
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      terminationGracePeriodSeconds: 30
      volumes:
      - emptyDir: {}
        name: postgresql-data
```
airflow-db-svc.yaml
airflow-db-svc.yaml: exposes a Service for airflow-db so that the Airflow components running on other nodes can reach it.
```yaml
apiVersion: v1
kind: Service
metadata:
  name: airflow-db
  namespace: airflow-k8spodoperator
spec:
  type: ClusterIP
  ports:
  - port: 5432
    protocol: TCP
    targetPort: 5432
  selector:
    name: airflow-db
  sessionAffinity: None
status:
  loadBalancer: {}
```
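To sanity-check that the Service resolves and the database accepts connections, you can spin up a throwaway postgres client pod. This is only a sketch; it reuses the password from airflow-db.yaml:

```bash
# Spin up a temporary postgres client pod and list the databases via the airflow-db Service
kubectl run pgtest --rm -it --restart=Never -n airflow-k8spodoperator \
  --image=postgres:9.6 --env="PGPASSWORD=password" -- psql -h airflow-db -U postgres -c '\l'
```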
airflow-svc-internal.yaml
airflow-svc-internal.yaml: lets in-cluster services reach the airflow pod through the DNS name airflow.
```yaml
apiVersion: v1
kind: Service
metadata:
  name: airflow
  namespace: airflow-k8spodoperator
spec:
  type: ClusterIP
  ports:
  - port: 80
    protocol: TCP
    targetPort: 8000
  selector:
    name: airflow
  sessionAffinity: None
status:
  loadBalancer: {}
```
airflow-svc-external.yaml
Lets external clients reach the internal airflow pod via port 31724. externalTrafficPolicy is set to Cluster, which means that if the node receiving the request has no matching airflow pod, the traffic is forwarded to a node that does.
```yaml
apiVersion: v1
kind: Service
metadata:
  name: airflow-svc
  namespace: airflow-k8spodoperator
spec:
  type: LoadBalancer
  externalTrafficPolicy: Cluster
  ports:
  - nodePort: 31724
    port: 80
    protocol: TCP
    targetPort: 8000
  selector:
    name: airflow
  sessionAffinity: None
```
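Once everything is deployed, you can confirm the NodePort (31724) and, if your environment assigns one, the external address with:

```bash
# Show the external Service and its node port
kubectl get svc airflow-svc -n airflow-k8spodoperator
```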
namespace.yaml
```yaml
apiVersion: v1
kind: Namespace
metadata:
  name: airflow-k8spodoperator
```
serviceaccount.yaml
Creates a service account named airflow-k8spodoperator-serviceaccount in the airflow-k8spodoperator namespace, so that Kubernetes can identify the airflow-scheduler.
```yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: airflow-k8spodoperator-serviceaccount
  namespace: airflow-k8spodoperator
```
Deploy
```bash
kubectl apply -f namespace.yaml
sleep 3
kubectl apply -f airflow-db-svc.yaml
kubectl apply -f airflow-db.yaml
kubectl apply -f airflow-svc-external.yaml
kubectl apply -f airflow-svc-internal.yaml
kubectl apply -f serviceaccount.yaml
kubectl apply -f airflow.yaml
```
Deployment complete.
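A quick way to confirm that the database pod and the airflow pod (with its webserver and scheduler containers) really came up:

```bash
# Watch the pods in the namespace until they reach Running
kubectl get pods -n airflow-k8spodoperator -w
```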
Grant Airflow permissions
Because airflow-scheduler needs to call the Kubernetes API server to create pods, it has to be granted permissions.
Create a ClusterRoleBinding named airflow-admin that binds the admin ClusterRole to the airflow-k8spodoperator-serviceaccount service account in the airflow-k8spodoperator namespace.
```bash
kubectl create clusterrolebinding airflow-admin \
  --clusterrole=admin \
  --serviceaccount=airflow-k8spodoperator:airflow-k8spodoperator-serviceaccount
```
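You can verify that the binding took effect by asking the API server whether the service account may now create pods, for example in the default namespace where the example DAG launches them:

```bash
# Should print "yes" if the binding took effect
kubectl auth can-i create pods \
  --as=system:serviceaccount:airflow-k8spodoperator:airflow-k8spodoperator-serviceaccount \
  -n default
```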
Airflow with Kubernetes architecture
The Airflow environment set up by the YAML files above looks like this:
- Clients reach the pod running the Airflow webserver through the airflow-svc-external service.
- The Airflow webserver and scheduler communicate with each other through airflow-svc-internal.
- The webserver and scheduler reach the database through the airflow-db DNS name.
Create an Airflow account
Exec into the airflow-scheduler container to create a user:
```bash
kubectl exec --tty --stdin -n airflow-k8spodoperator [airflow pod name] -c airflow-scheduler -- /bin/bash
```
Create a user with the credentials airflow/airflow and the Admin role:
```bash
airflow users create -u airflow -p airflow -e test@test.com -f test -l test -r Admin
```
You will see output like the following; the line Admin user airflow created means the account was created successfully.
```
airflow users create -u airflow -p airflow -e test@test.com -f test -l test -r Admin
[2021-05-25 12:41:11,162] {providers_manager.py:299} WARNING - Exception when importing 'airflow.providers.google.common.hooks.leveldb.LevelDBHook' from 'apache-airflow-providers-google' package: No module named 'airflow.providers.google.common.hooks.leveldb'
[2021-05-25 12:41:13,884] {providers_manager.py:299} WARNING - Exception when importing 'airflow.providers.google.common.hooks.leveldb.LevelDBHook' from 'apache-airflow-providers-google' package: No module named 'airflow.providers.google.common.hooks.leveldb'
[2021-05-25 12:41:14,058] {manager.py:784} WARNING - No user yet created, use flask fab command to do it.
[2021-05-25 12:41:15,913] {providers_manager.py:299} WARNING - Exception when importing 'airflow.providers.google.common.hooks.leveldb.LevelDBHook' from 'apache-airflow-providers-google' package: No module named 'airflow.providers.google.common.hooks.leveldb'
[2021-05-25 12:41:16,059] {providers_manager.py:299} WARNING - Exception when importing 'airflow.providers.google.common.hooks.leveldb.LevelDBHook' from 'apache-airflow-providers-google' package: No module named 'airflow.providers.google.common.hooks.leveldb'
Admin user airflow created
```
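Still inside the scheduler container, you can double-check that the account exists:

```bash
# List all Airflow users; the new "airflow" admin account should appear
airflow users list
```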
Note: if a node has only 2 CPUs and 2 GB of memory, the CPU can become overloaded and the whole node may go down.
Next, log in to Airflow at http://{airflow web server ip}:31724 with the credentials airflow/airflow.
If you see the login page, the environment is up! Now run the example DAG k8spodoperator_example: remember to switch the toggle on the left to unpause it, then trigger it.
Airflow will create a pod for each KubernetesPodOperator task (failing-task and passing-task).
Execution result:
Using KubernetesExecutor
With KubernetesExecutor, every task of a DAG runs as its own pod, so KubernetesPodOperator is not used in this mode.
We therefore have to give Airflow a base pod YAML file, so that every task has a basic pod environment to run in.
First, prepare a custom DAG, k8sexecutor_example.py, which runs three tasks; the task driven by use_custom_template runs with a custom pod template. Remember to put k8sexecutor_example.py into the /dags directory.
- task-1 and task-2: simply print a log message
- use_custom_template: uses custom_template.yaml and prints the environment variable that the template provides
k8sexecutor_example.py
```python
import logging
import os
from datetime import datetime, timedelta
from pathlib import Path

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

AIRFLOW_HOME = os.environ['AIRFLOW_HOME']

logger = logging.getLogger(__name__)

dag = DAG(
    "k8sexecutor_example",
    schedule_interval="0 1 * * *",
    catchup=False,
    default_args={
        "owner": "admin",
        "depends_on_past": False,
        "start_date": datetime(2020, 8, 7),
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 2,
        "retry_delay": timedelta(seconds=30),
        "sla": timedelta(hours=23),
    },
)


def mission_one():
    logger.info('task-1 completed!')


def mission_two():
    logger.info('task-2 completed!')


def use_custom_template():
    logger.info('use custom template! echo the environment variable: HELLOWORLD')
    logger.info(os.environ['HELLOWORLD'])


with dag:
    task_1 = PythonOperator(
        task_id="task-1",
        python_callable=mission_one,
    )
    task_2 = PythonOperator(
        task_id="task-2",
        python_callable=mission_two,
    )
    task_with_podtemplate = PythonOperator(
        task_id="task_with_podtemplate",
        python_callable=use_custom_template,
        executor_config={
            "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/custom_template.yaml"),
        },
    )

    task_1 >> task_with_podtemplate
    task_with_podtemplate >> task_2
```
Required configuration files
airflow.yaml
In this YAML file, spec.template.spec.containers.env supplies the AIRFLOW__KUBERNETES__POD_TEMPLATE_FILE variable to the pod, so that Airflow knows that temp.yaml under the /opt directory is the pod template to hand to the executor.
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow
  namespace: airflow-k8sexecutor
spec:
  replicas: 1
  selector:
    matchLabels:
      name: airflow
  template:
    metadata:
      labels:
        name: airflow
    spec:
      serviceAccountName: airflow-k8sexecutor-serviceaccount
      automountServiceAccountToken: true
      containers:
      - name: airflow
        args:
        - webserver
        - -p
        - "8000"
        env:
        - name: AIRFLOW_HOME
          value: '/opt/airflow'
        - name: AIRFLOW__KUBERNETES__DELETE_WORKER_PODS
          value: 'True'
        - name: AIRFLOW__KUBERNETES__POD_TEMPLATE_FILE
          value: '/opt/temp.yaml'
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          value: postgresql://postgres:password@airflow-db:5432/postgres
        - name: AIRFLOW__CORE__EXECUTOR
          value: KubernetesExecutor
        - name: AIRFLOW__WEBSERVER__AUTHENTICATE
          value: "True"
        - name: AIRFLOW__WEBSERVER__AUTH_BACKEND
          value: airflow.contrib.auth.backends.password_auth
        - name: AIRFLOW__API__AUTH_BACKEND
          value: airflow.api.auth.backend.basic_auth
        - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY
          value: 'kubegut/airflowk8sexec'
        - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG
          value: '0.0.1'
        - name: AIRFLOW__KUBERNETES__NAMESPACE
          value: airflow-k8sexecutor
        - name: AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME
          value: default
        - name: AIRFLOW__KUBERNETES__IN_CLUSTER
          value: 'true'
        - name: AIRFLOW__KUBERNETES__DAGS_IN_IMAGE
          value: 'true'
        - name: AIRFLOW__KUBERNETES__RUN_AS_USER
          value: '50000'
        image: kubegut/airflowk8sexec:0.0.1
        imagePullPolicy: Always
        resources: {}
        volumeMounts:
        - mountPath: /opt/airflow/logs/
          mountPropagation: None
          name: airflow-logs
      - args:
        - scheduler
        env:
        - name: AIRFLOW_HOME
          value: '/opt/airflow'
        - name: AIRFLOW__KUBERNETES__DELETE_WORKER_PODS
          value: 'True'
        - name: AIRFLOW__KUBERNETES__POD_TEMPLATE_FILE
          value: '/opt/temp.yaml'
        - name: AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC
          value: "15"
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          value: postgresql://postgres:password@airflow-db:5432/postgres
        - name: AIRFLOW__CORE__EXECUTOR
          value: KubernetesExecutor
        - name: AIRFLOW__KUBERNETES__NAMESPACE
          value: airflow-k8sexecutor
        - name: AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME
          value: default
        - name: AIRFLOW__KUBERNETES__IN_CLUSTER
          value: 'true'
        - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY
          value: 'kubegut/airflowk8sexec'
        - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG
          value: '0.0.1'
        - name: AIRFLOW__KUBERNETES__DAGS_IN_IMAGE
          value: 'true'
        - name: AIRFLOW__KUBERNETES__RUN_AS_USER
          value: '50000'
        image: kubegut/airflowk8sexec:0.0.1
        imagePullPolicy: Always
        name: airflow-scheduler
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /opt/airflow/logs/
          mountPropagation: None
          name: airflow-logs
      dnsPolicy: ClusterFirst
      initContainers:
      - command: ['airflow', 'db', 'init']
        env:
        - name: AIRFLOW__KUBERNETES__DELETE_WORKER_PODS
          value: 'True'
        - name: AIRFLOW__CORE__REMOTE_LOGGING
          value: 'False'
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          value: postgresql://postgres:password@airflow-db:5432/postgres
        - name: AIRFLOW__CORE__EXECUTOR
          value: KubernetesExecutor
        - name: AIRFLOW__KUBERNETES__NAMESPACE
          value: airflow-k8sexecutor
        - name: AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME
          value: default
        - name: AIRFLOW__KUBERNETES__IN_CLUSTER
          value: 'true'
        - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY
          value: 'kubegut/airflowk8sexec'
        - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG
          value: '0.0.1'
        - name: AIRFLOW__KUBERNETES__DAGS_IN_IMAGE
          value: 'true'
        image: kubegut/airflowk8sexec:0.0.1
        imagePullPolicy: Always
        name: airflow-initdb
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      shareProcessNamespace: false
      terminationGracePeriodSeconds: 600
      volumes:
      - emptyDir: {}
        name: airflow-data
      - emptyDir: {}
        name: airflow-logs
```
temp.yaml
Because Airflow's KubernetesExecutor mode needs a base pod YAML for every task, temp.yaml supplies that base pod spec.
In temp.yaml, spec.containers.imagePullPolicy must be set to IfNotPresent; otherwise you hit the odd situation where the task pod keeps restarting while the task is shown as success in the Airflow web UI.
```yaml
apiVersion: v1
kind: Pod
metadata:
  name: dummy-name
  namespace: airflow-k8sexecutor
spec:
  serviceAccountName: default
  restartPolicy: Never
  containers:
  - name: base
    image: kubegut/airflowk8sexec:0.0.1
    imagePullPolicy: IfNotPresent
    ports: []
    command: []
```
The custom pod template
custom_template.yaml
```yaml
apiVersion: v1
kind: Pod
metadata:
  name: dummy-name
  namespace: airflow-k8sexecutor
spec:
  serviceAccountName: default
  restartPolicy: Never
  containers:
  - name: base
    image: kubegut/airflowk8sexec:0.0.1
    imagePullPolicy: IfNotPresent
    ports: []
    command: []
    env:
    - name: HELLOWORLD
      value: 'hello world!'
```
Dockerfile
Remember to put the custom YAML file into the directory that the DAG points at when it runs: $AIRFLOW_HOME/pod_templates
Also add the following environment variables so that every task pod picks them up when it starts:
- AIRFLOW__CORE__SQL_ALCHEMY_CONN
- AIRFLOW__KUBERNETES__RUN_AS_USER
Defining them only in airflow.yaml is not enough, because those values are not passed on to the pods launched by the executor.
```dockerfile
FROM apache/airflow:2.1.0

RUN pip install --user --upgrade apache-airflow[password]
RUN pip install --upgrade google-cloud-bigquery
RUN pip install Flask-Bcrypt

ENV AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql://postgres:password@airflow-db:5432/postgres
ENV AIRFLOW__KUBERNETES__RUN_AS_USER=50000

ADD temp.yaml /opt
ADD dags /opt/airflow/dags
RUN mkdir /opt/airflow/pod_templates
ADD custom_template.yaml /opt/airflow/pod_templates
```
Build the image and push it to Docker Hub:
```bash
docker build -t kubegut/airflowk8sexec:0.0.1 .
docker push kubegut/airflowk8sexec:0.0.1
```
The remaining YAML files are the same as in the KubernetesPodOperator setup
Reuse the following YAML files from the KubernetesPodOperator version, changing the namespace to airflow-k8sexecutor to keep the two environments separate:
airflow-db.yaml
airflow-db-svc.yaml
airflow-svc-internal.yaml
airflow-svc-external.yaml
namespace.yaml
serviceaccount.yaml
Change the name to airflow-k8sexecutor-serviceaccount.
Tip: put all the copied files into the same directory and run the one-liner below to switch everything over to the airflow-k8sexecutor namespace.
```bash
sed -i 's/airflow-k8spodoperator/airflow-k8sexecutor/g' *.yaml
```
Grant Airflow permissions
Remember to grant permissions again for this setup. Here the ClusterRoleBinding is named airflow-admin-2 and binds the role to the service account used by the KubernetesExecutor environment.
```bash
kubectl create clusterrolebinding airflow-admin-2 \
  --clusterrole=admin \
  --serviceaccount=airflow-k8sexecutor:airflow-k8sexecutor-serviceaccount
```
Deploy
```bash
kubectl apply -f namespace.yaml
sleep 3
kubectl apply -f airflow-db-svc.yaml
kubectl apply -f airflow-db.yaml
kubectl apply -f airflow-svc-external.yaml
kubectl apply -f airflow-svc-internal.yaml
kubectl apply -f serviceaccount.yaml
kubectl apply -f airflow.yaml
```
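Before triggering any DAG, it is worth confirming that the pod template file really sits where AIRFLOW__KUBERNETES__POD_TEMPLATE_FILE points, once the airflow Deployment is up (a sketch assuming the Deployment name airflow from airflow.yaml):

```bash
# Confirm the pod template file exists where the scheduler expects it
kubectl exec -n airflow-k8sexecutor deploy/airflow -c airflow-scheduler -- ls -l /opt/temp.yaml
```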
Create an Airflow account
Use the following command to get into the airflow-scheduler container and create a user account:
```bash
kubectl exec --tty --stdin -n airflow-k8sexecutor [your airflow pod name] -c airflow-scheduler -- /bin/bash
```
Add an account:
```bash
airflow users create -u airflow -p airflow \
  -e test@test.com -f test -l test -r Admin
```
Run the example DAG: k8sexecutor_example
You can see that in KubernetesExecutor mode, Airflow launches a separate pod for each task:
- task-1 → pod name: k8sexecutorexampletask1.4931009824d044e0ae19b2b517a61628
- task-2 → pod name: k8sexecutorexampletask2.3d4c6da134794effb517e1c1cf500b00
- task_with_podtemplate → pod name: k8sexecutorexampletaskwithpodtemplate.ff69ada0315247f5b95dd8143eea8003
Problem: the logs of finished tasks cannot be viewed…
Because the pod is deleted once its task finishes, the log disappears with it, so when we go back to the Airflow UI we see a message saying that Airflow cannot find the pod:
```
*** Trying to get logs (last 100 lines) from worker pod k8sexecutorexampletaskwithpodtemplate.ff69ada0315247f5b95dd8143 ***
*** Unable to fetch logs from worker pod k8sexecutorexampletaskwithpodtemplate.ff69ada0315247f5b95dd8143 ***
(404)
Reason: Not Found
HTTP response headers: HTTPHeaderDict({'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'Date': 'Sun, 30 May 2021 10:01:36 GMT', 'Content-Length': '294'})
HTTP response body: b'{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"pods \\"k8sexecutorexampletaskwithpodtemplate.ff69ada0315247f5b95dd8143\\" not found","reason":"NotFound","details":{"name":"k8sexecutorexampletaskwithpodtemplate.ff69ada0315247f5b95dd8143","kind":"pods"},"code":404}\n'
```
This calls for a way to persist the logs permanently, which is exactly what the next section covers: storing the logs in Google Cloud Storage and reading them back from there.
Remember why the worker nodes were given different amounts of memory at the start? Next we will use a customized pod template and check whether Kubernetes schedules a task that requests more than 4 GB of memory onto the worker node that actually has enough resources.
custom_template.yaml
```yaml
apiVersion: v1
kind: Pod
metadata:
  name: dummy-name
  namespace: airflow-k8sexecutor
spec:
  serviceAccountName: default
  restartPolicy: Never
  containers:
  - name: base
    image: kubegut/airflowk8sexec:0.0.1
    imagePullPolicy: IfNotPresent
    resources:
      limits:
        memory: "7Gi"
      requests:
        # Request 6 GB of memory here, to see whether the pod gets scheduled onto the 8 GB node (k8s-node)
        memory: "6Gi"
    ports: []
    command: []
    env:
    # Add extra environment
    - name: HELLOWORLD
      value: 'hello world!'
```
Run the k8sexecutor_example DAG again.
Use the command below to watch whether task_with_podtemplate, the task using the custom pod template, gets scheduled onto k8s-node, the node with 8 GB of memory.
```bash
watch kubectl get pod -o=custom-columns=NAME:.metadata.name,STATUS:.status.phase,NODE:.spec.nodeName --all-namespaces
```
You can see that task_with_podtemplate, the task using the custom pod template, indeed runs on k8s-node, the node with 8 GB of memory.
By giving a task its own custom pod template, we can tailor the execution environment to each individual task's needs!
References
Airflow 動手玩
3 Ways to Run Airflow on Kubernetes