Airflow - Notes and Introduction on Using KubernetesPodOperator and KubernetesExecutor

Posted by Kubeguts on 2021-05-30

This article is aimed at readers who already have a basic understanding of Airflow and Kubernetes.

This post mainly covers how to use Airflow's KubernetesPodOperator and the KubernetesExecutor mode!

Airflow's stock operators, such as BashOperator and PythonOperator, run each task as a process in the local environment.

With KubernetesPodOperator, a single task runs in a container inside its own pod, and it can coexist with other operators. KubernetesExecutor goes even further: every task of a DAG runs in its own pod, which gives a truly distributed setup.

Recommended specs

Each node should have at least 2 CPU cores and 4 GB of memory; otherwise Airflow will run painfully slowly.

First, set up a Kubernetes cluster

The Kubernetes cluster here is built with kubeadm; for how to set it up, see:

Building a multi-node Kubernetes cluster with kubeadm

This post builds the test environment with the following machines:

  1. 1 Master Node (Control Plane): k8s-master
  • e2-standard-2 (2 vCPUs, 8 GB memory)
  2. 2 Worker Nodes
  • k8s-node: e2-standard-2 (2 vCPUs, 8 GB memory)
  • k8s-node2: e2-standard-2 (2 vCPUs, 4 GB memory)

Why does one of the worker nodes have only 4 GB? Later we'll create a task pod that needs more than 4 GB of memory and see whether it really gets scheduled onto the other machine with 8 GB.

To understand the difference between master and worker nodes, see the Kubernetes basics / K8s architecture article.

Using KubernetesPodOperator

We'll use the apache/airflow Docker image to build the Airflow environment on Kubernetes.

Required configuration files

We'll run an Airflow v2.1.0 environment.

Write your own custom DAG

The example below uses KubernetesPodOperator to print a simple message from inside a pod. Put the k8spodoperator_example.py file into the /dags folder.

k8spodoperator_example.py

from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.utcnow(),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'k8spodoperator_example', default_args=default_args, schedule_interval=timedelta(minutes=10))


start = DummyOperator(task_id='start', dag=dag)

# runs in its own pod and prints "hello world"
passing = KubernetesPodOperator(namespace='default',
                                image="python:3.6",
                                cmds=["python", "-c"],
                                arguments=["print('hello world')"],
                                labels={"foo": "bar"},
                                name="passing-test",
                                task_id="passing-task",
                                get_logs=True,
                                dag=dag
                                )

# intentionally fails: the ubuntu:16.04 image has no python on its PATH
failing = KubernetesPodOperator(namespace='default',
                                image="ubuntu:16.04",
                                cmds=["python", "-c"],
                                arguments=["print('hello world')"],
                                labels={"foo": "bar"},
                                name="fail",
                                task_id="failing-task",
                                get_logs=True,
                                dag=dag
                                )

end = DummyOperator(task_id='end', dag=dag)


passing.set_upstream(start)
failing.set_upstream(start)
passing.set_downstream(end)
failing.set_downstream(end)

Dockerfile

If you have DAGs of your own, place them in /opt/airflow/dags when building the image so Airflow can read them. In a moment we'll mainly use the example baked into the image.

FROM apache/airflow:2.1.0

# extra dependencies

RUN pip install --user --upgrade apache-airflow[password]
RUN pip install --upgrade google-cloud-bigquery
RUN pip install Flask-Bcrypt

# copy our own airflow DAGs into the airflow image
## remember to create a dags folder in the current directory, otherwise the build will fail
ADD dags /opt/airflow/dags

Build the image. Here I use my own Docker Hub account name, kubegut:

docker build -t kubegut/airflowk8spodoperator:0.0.1 .

Then push the image to Docker Hub so that k8s can pull it when creating pods:

docker push kubegut/airflowk8spodoperator:0.0.1

In the yaml files that follow, if you used a different image name, remember to replace kubegut/airflowk8spodoperator:0.0.1 with your own image name.

airflow.yaml

This yaml adjusts a few environment variables:

  • Set AIRFLOW__API__AUTH_BACKEND to airflow.api.auth.backend.basic_auth

  • The official yaml defines livenessProbe and readinessProbe sections, but because the server takes a while to start, the probes keep failing and reporting errors, so they are commented out here (if you'd rather keep them, see the sketch right below)
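
If you prefer to keep the probes rather than comment them out, a rough alternative is to give them a generous initialDelaySeconds so they don't fire before the webserver is up. A minimal sketch, assuming the webserver listens on port 8000 as configured below (Airflow 2.x exposes a /health endpoint):

livenessProbe:
  httpGet:
    path: /health          # Airflow 2.x webserver health endpoint
    port: 8000
    scheme: HTTP
  initialDelaySeconds: 120 # give the slow-starting webserver time before probing
  periodSeconds: 10
  failureThreshold: 20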

The yaml below brings up three containers (inside a single Deployment pod):

  1. airflow: serves the web UI (webserver)
  2. airflow-scheduler: schedules and executes tasks
  3. airflow-initdb: initializes the metadata the environment needs, based on airflow.cfg. Note that it is created as an initContainer, so it runs to completion and exits before the main containers start

airflow.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow
  namespace: airflow-k8spodoperator
spec:
  replicas: 1
  selector:
    matchLabels:
      name: airflow
  template:
    metadata:
      labels:
        name: airflow
    spec:
      serviceAccountName: airflow-k8spodoperator-serviceaccount
      automountServiceAccountToken: true
      containers:
      - args:
        - webserver
        - -p
        - "8000"
        env:
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          value: postgresql://postgres:password@airflow-db:5432/postgres
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        - name: AIRFLOW__WEBSERVER__AUTHENTICATE
          value: "True"
        - name: AIRFLOW__WEBSERVER__AUTH_BACKEND
          value: airflow.contrib.auth.backends.password_auth
        - name: AIRFLOW__API__AUTH_BACKEND
          value: airflow.api.auth.backend.basic_auth
        image: kubegut/airflowk8spodoperator:0.0.1
        imagePullPolicy: Always
        # webserver health checks (left commented out because the server starts slowly)
        # livenessProbe:
        #   failureThreshold: 20
        #   httpGet:
        #     path: /admin/
        #     port: 8000
        #     scheme: HTTP
        #   periodSeconds: 10
        #   successThreshold: 1
        #   timeoutSeconds: 3
        name: airflow
        # readinessProbe:
        #   failureThreshold: 3
        #   httpGet:
        #     path: /admin/
        #     port: 8000
        #     scheme: HTTP
        #   periodSeconds: 10
        #   successThreshold: 1
        #   timeoutSeconds: 1
        resources: {}
        volumeMounts:
        - mountPath: /opt/airflow/logs/
          mountPropagation: None
          name: airflow-logs
      - args:
        - scheduler
        env:
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          value: postgresql://postgres:password@airflow-db:5432/postgres
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        image: kubegut/airflowk8spodoperator:0.0.1
        imagePullPolicy: Always
        name: airflow-scheduler
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /opt/airflow/logs/
          mountPropagation: None
          name: airflow-logs
      dnsPolicy: ClusterFirst
      initContainers:
      - command: ['airflow', 'db', 'init']
        env:
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          value: postgresql://postgres:password@airflow-db:5432/postgres
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        image: kubegut/airflowk8spodoperator:0.0.1
        imagePullPolicy: Always
        name: airflow-initdb
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      shareProcessNamespace: false
      terminationGracePeriodSeconds: 600
      volumes:
      - emptyDir: {}
        name: airflow-data
      - emptyDir: {}
        name: airflow-logs

airflow-db.yaml

airflow-db.yaml: sets up the database that stores Airflow's metadata

apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow-db
  namespace: airflow-k8spodoperator
spec:
  replicas: 1
  selector:
    matchLabels:
      name: airflow-db
  template:
    metadata:
      labels:
        name: airflow-db
    spec:
      containers:
      - env:
        - name: POSTGRES_PASSWORD
          value: password
        image: postgres:9.6
        imagePullPolicy: IfNotPresent
        name: airflow-db
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /var/lib/postgresql/data
          mountPropagation: None
          name: postgresql-data
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      terminationGracePeriodSeconds: 30
      volumes:
      - emptyDir: {}
        name: postgresql-data

airflow-db-svc.yaml

airflow-db-svc.yaml: a Service that exposes airflow-db so the Airflow components on other nodes can reach it

apiVersion: v1
kind: Service
metadata:
  name: airflow-db
  namespace: airflow-k8spodoperator
spec:
  type: ClusterIP
  ports:
  - port: 5432
    protocol: TCP
    targetPort: 5432
  selector:
    name: airflow-db
  sessionAffinity: None
status:
  loadBalancer: {}
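
To sanity-check that the airflow-db Service resolves inside the cluster, you can spin up a throwaway pod and run nslookup against it (an optional check, not part of the original setup; busybox:1.28 is just a convenient image with a working nslookup):

kubectl run dns-check -n airflow-k8spodoperator --rm -it --restart=Never --image=busybox:1.28 -- nslookup airflow-db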

airflow-svc-internal.yaml

airflow-svc-internal.yaml lets in-cluster services reach the airflow pod through the DNS name airflow

apiVersion: v1
kind: Service
metadata:
  name: airflow
  namespace: airflow-k8spodoperator
spec:
  type: ClusterIP
  ports:
  - port: 80
    protocol: TCP
    targetPort: 8000
  selector:
    name: airflow
  sessionAffinity: None
status:
  loadBalancer: {}

airflow-svc-external.yaml

Lets external clients reach the internal airflow pod through port 31724.

externalTrafficPolicy is set to Cluster, which means that if the node receiving the traffic has no pod matching the name=airflow selector, the traffic is routed to another node.

apiVersion: v1
kind: Service
metadata:
  name: airflow-svc
  namespace: airflow-k8spodoperator
spec:
  type: LoadBalancer
  externalTrafficPolicy: Cluster
  ports:
  - nodePort: 31724
    port: 80
    protocol: TCP
    targetPort: 8000
  selector:
    name: airflow
  sessionAffinity: None

namespace.yaml

apiVersion: v1
kind: Namespace
metadata:
  name: airflow-k8spodoperator

serviceaccount.yaml

Under the airflow-k8spodoperator namespace, create a service account named airflow-k8spodoperator-serviceaccount so that k8s can identify the airflow scheduler.

apiVersion: v1
kind: ServiceAccount
metadata:
  name: airflow-k8spodoperator-serviceaccount
  namespace: airflow-k8spodoperator

Deploy

kubectl apply -f namespace.yaml
sleep 3
kubectl apply -f airflow-db-svc.yaml
kubectl apply -f airflow-db.yaml
kubectl apply -f airflow-svc-external.yaml
kubectl apply -f airflow-svc-internal.yaml
kubectl apply -f serviceaccount.yaml
kubectl apply -f airflow.yaml

Deployment complete

img05
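
To confirm everything came up, you can list the pods and services in the namespace and wait until the airflow and airflow-db pods are Running (the airflow pod may sit in Init for a while until initdb finishes):

kubectl get pods -n airflow-k8spodoperator
kubectl get svc -n airflow-k8spodoperator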

Grant Airflow permissions

Because the airflow-scheduler pod needs to call the Kubernetes API to create pods, we have to grant it the necessary permissions.

Create a clusterrolebinding named airflow-admin that binds the admin ClusterRole, cluster-wide, to the airflow-k8spodoperator-serviceaccount service account in the airflow-k8spodoperator namespace:

kubectl create clusterrolebinding airflow-admin \
--clusterrole=admin \
--serviceaccount=airflow-k8spodoperator:airflow-k8spodoperator-serviceaccount
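
You can verify the binding took effect by asking the API server whether the service account is now allowed to create pods (a quick impersonation check with kubectl auth can-i; it should answer yes):

kubectl auth can-i create pods -n airflow-k8spodoperator \
  --as=system:serviceaccount:airflow-k8spodoperator:airflow-k8spodoperator-serviceaccount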

Airflow on Kubernetes architecture

The Airflow environment set up by the yaml files above looks like this:

  • Clients reach the pod running the Airflow webserver through the external service defined in airflow-svc-external.yaml
  • The Airflow webserver and scheduler communicate with each other through airflow-svc-internal
  • The Airflow webserver and scheduler reach airflow-db through its DNS name

dg01

Create an Airflow account

Exec into the airflow-scheduler container to create a user:

kubectl exec --tty --stdin -n airflow-k8spodoperator [airflow pod name] -c airflow-scheduler -- /bin/bash
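
If you're not sure of the pod name, you can look it up by the name=airflow label used in the Deployment above:

kubectl get pods -n airflow-k8spodoperator -l name=airflow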

Create a user with username/password airflow/airflow and the Admin role:

airflow users create -u airflow -p airflow -e test@test.com -f test -l test -r Admin

You'll see output like the following; "Admin user airflow created" means the user was added successfully.

airflow users create -u airflow -p airflow -e test@test.com -f test -l test -r Admin

[2021-05-25 12:41:11,162] {providers_manager.py:299} WARNING - Exception when importing 'airflow.providers.google.common.hooks.leveldb.LevelDBHook' from 'apache-airflow-providers-google' package: No module named 'airflow.providers.google.common.hooks.leveldb'
[2021-05-25 12:41:13,884] {providers_manager.py:299} WARNING - Exception when importing 'airflow.providers.google.common.hooks.leveldb.LevelDBHook' from 'apache-airflow-providers-google' package: No module named 'airflow.providers.google.common.hooks.leveldb'
[2021-05-25 12:41:14,058] {manager.py:784} WARNING - No user yet created, use flask fab command to do it.
[2021-05-25 12:41:15,913] {providers_manager.py:299} WARNING - Exception when importing 'airflow.providers.google.common.hooks.leveldb.LevelDBHook' from 'apache-airflow-providers-google' package: No module named 'airflow.providers.google.common.hooks.leveldb'
[2021-05-25 12:41:16,059] {providers_manager.py:299} WARNING - Exception when importing 'airflow.providers.google.common.hooks.leveldb.LevelDBHook' from 'apache-airflow-providers-google' package: No module named 'airflow.providers.google.common.hooks.leveldb'
Admin user airflow created

Note: if a node has only 2 CPUs and 2 GB of memory, the CPU will be overloaded and the whole node may go down.

Next, log in to Airflow at http://{airflow web server ip}:31724

Credentials: airflow/airflow
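
Before opening the browser, you can optionally confirm the webserver is answering through the NodePort (Airflow 2.x serves a /health endpoint; replace the IP with one of your nodes' external IPs):

curl http://{airflow web server ip}:31724/health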

Once you see the login page, the environment is up! Next, run the example DAG k8spodoperator_example.
Remember to flip the toggle on the left to enable it, then trigger it.
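
If you prefer the CLI, you can also unpause and trigger the DAG from inside the scheduler container instead of using the toggle (standard Airflow 2.x commands, assuming the DAG file has already been picked up):

airflow dags unpause k8spodoperator_example
airflow dags trigger k8spodoperator_example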

img13

Airflow creates a pod for each of the KubernetesPodOperator tasks (failing-task and passing-task):

img14

img15

Execution results

img16

Using KubernetesExecutor

With KubernetesExecutor, all tasks of a DAG are executed as pods, so in this mode it is not combined with KubernetesPodOperator.

So we need to give Airflow a base pod yaml file, so that every task has a base pod environment to run in.

First, prepare a custom DAG, k8sexecutor_example.py, which runs three tasks; the use_custom_template task uses a custom pod template. Remember to put k8sexecutor_example.py into the /dags directory.

  • task_1 and task_2: simply print a log message
  • use_custom_template (task_with_podtemplate): uses custom_template.yaml and prints the environment variable that the template provides

k8sexecutor_example.py

import logging
import os
from datetime import datetime, timedelta
from pathlib import Path

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

AIRFLOW_HOME = os.environ['AIRFLOW_HOME']

logger = logging.getLogger(__name__)

dag = DAG(
    "k8sexecutor_example",
    schedule_interval="0 1 * * *",
    catchup=False,
    default_args={
        "owner": "admin",
        "depends_on_past": False,
        "start_date": datetime(2020, 8, 7),
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 2,
        "retry_delay": timedelta(seconds=30),
        "sla": timedelta(hours=23),
    },
)

def mission_one():
    logger.info('task-1 completed!')

def mission_two():
    logger.info('task-2 completed!')

def use_custom_template():
    logger.info('use custom template! echo the environment variable: HELLOWORLD')
    logger.info(os.environ['HELLOWORLD'])

with dag:
    task_1 = PythonOperator(
        task_id="task-1",
        python_callable=mission_one,
    )
    task_2 = PythonOperator(
        task_id="task-2",
        python_callable=mission_two,
    )
    task_with_podtemplate = PythonOperator(
        task_id="task_with_podtemplate",
        python_callable=use_custom_template,
        # point this task at the custom pod template baked into the image
        executor_config={
            "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/custom_template.yaml"),
        },
    )

task_1 >> task_with_podtemplate
task_with_podtemplate >> task_2

Required configuration files

airflow.yaml

In this yaml, spec.template.spec.containers.env provides the AIRFLOW__KUBERNETES__POD_TEMPLATE_FILE variable to the pod, so Airflow knows that temp.yaml under /opt is the pod template to hand to the executor.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow
  namespace: airflow-k8sexecutor
spec:
  replicas: 1
  selector:
    matchLabels:
      name: airflow
  template:
    metadata:
      labels:
        name: airflow
    spec:
      serviceAccountName: airflow-k8sexecutor-serviceaccount
      automountServiceAccountToken: true
      containers:
      # - command: ['airflow', 'users', 'create', '-u', 'airflow', '-p', 'airflow', '-e', 'test@test.com', '-f', 'test', '-l', 'test', '-r', 'Admin']
      - name: airflow
        args:
        - webserver
        - -p
        - "8000"
        env:
        - name: AIRFLOW_HOME
          value: '/opt/airflow'
        - name: AIRFLOW__KUBERNETES__DELETE_WORKER_PODS
          value: 'True'
        - name: AIRFLOW__KUBERNETES__POD_TEMPLATE_FILE
          value: '/opt/temp.yaml'
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          value: postgresql://postgres:password@airflow-db:5432/postgres
        - name: AIRFLOW__CORE__EXECUTOR
          value: KubernetesExecutor
        - name: AIRFLOW__WEBSERVER__AUTHENTICATE
          value: "True"
        - name: AIRFLOW__WEBSERVER__AUTH_BACKEND
          value: airflow.contrib.auth.backends.password_auth
        - name: AIRFLOW__API__AUTH_BACKEND
          value: airflow.api.auth.backend.basic_auth
        - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY
          value: 'kubegut/airflowk8sexec' # Supply your container worker image here, either selfbuilt with dags or apache/airflow
        - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG
          value: '0.0.1' # Your container worker image tag or 10.10.12 if using apache/airflow
        - name: AIRFLOW__KUBERNETES__NAMESPACE
          value: airflow-k8sexecutor
        - name: AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME
          value: default
        - name: AIRFLOW__KUBERNETES__IN_CLUSTER
          value: 'true'
        - name: AIRFLOW__KUBERNETES__DAGS_IN_IMAGE
          value: 'true'
        - name: AIRFLOW__KUBERNETES__RUN_AS_USER
          value: '50000'
        image: kubegut/airflowk8sexec:0.0.1 # Use your airflow docker image here, either selfbuilt with dags or apache/airflow:10.10.12
        imagePullPolicy: Always
        resources: {}
        volumeMounts:
        - mountPath: /opt/airflow/logs/
          mountPropagation: None
          name: airflow-logs
      - args:
        - scheduler
        env:
        - name: AIRFLOW_HOME
          value: '/opt/airflow'
        - name: AIRFLOW__KUBERNETES__DELETE_WORKER_PODS
          value: 'True'
        - name: AIRFLOW__KUBERNETES__POD_TEMPLATE_FILE
          value: '/opt/temp.yaml'
        - name: AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC
          value: "15"
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          value: postgresql://postgres:password@airflow-db:5432/postgres
        - name: AIRFLOW__CORE__EXECUTOR
          value: KubernetesExecutor
        - name: AIRFLOW__KUBERNETES__NAMESPACE
          value: airflow-k8sexecutor
        - name: AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME
          value: default
        - name: AIRFLOW__KUBERNETES__IN_CLUSTER
          value: 'true'
        - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY
          value: 'kubegut/airflowk8sexec' # Supply your container worker image here, either selfbuilt with dags or apache/airflow
        - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG
          value: '0.0.1' # Your container worker image tag or 10.10.12 if using apache/airflow
        - name: AIRFLOW__KUBERNETES__DAGS_IN_IMAGE
          value: 'true'
        - name: AIRFLOW__KUBERNETES__RUN_AS_USER
          value: '50000'
        image: kubegut/airflowk8sexec:0.0.1 # Use your airflow docker image here, either selfbuilt with dags or apache/airflow:10.10.12
        imagePullPolicy: Always
        name: airflow-scheduler
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /opt/airflow/logs/
          mountPropagation: None
          name: airflow-logs
      dnsPolicy: ClusterFirst
      initContainers:
      - command: ['airflow', 'db', 'init']
        # - args:
        #   - initdb
        env:
        - name: AIRFLOW__KUBERNETES__DELETE_WORKER_PODS
          value: 'True'
        - name: AIRFLOW__CORE__REMOTE_LOGGING
          value: 'False'
        # - name: AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER
        #   value: 'gs://airflow-k8sexecutor-logs'
        # - name: AIRFLOW__CORE__REMOTE_LOG_CONN_ID
        #   value: 'gcs-logs'
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          value: postgresql://postgres:password@airflow-db:5432/postgres
        - name: AIRFLOW__CORE__EXECUTOR
          value: KubernetesExecutor
        - name: AIRFLOW__KUBERNETES__NAMESPACE
          value: airflow-k8sexecutor
        - name: AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME
          value: default
        - name: AIRFLOW__KUBERNETES__IN_CLUSTER
          value: 'true'
        - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY
          value: 'kubegut/airflowk8sexec' # Supply your container worker image here, either selfbuilt with dags or apache/airflow
        - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG
          value: '0.0.1' # Your container worker image tag or 10.10.12 if using apache/airflow
        - name: AIRFLOW__KUBERNETES__DAGS_IN_IMAGE
          value: 'true'
        # Use your airflow docker image here, either self-built with dags or apache/airflow:10.10.12
        image: kubegut/airflowk8sexec:0.0.1
        imagePullPolicy: Always
        name: airflow-initdb
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      shareProcessNamespace: false
      terminationGracePeriodSeconds: 600
      volumes:
      - emptyDir: {}
        name: airflow-data
      - emptyDir: {}
        name: airflow-logs

temp.yaml

Because Airflow's KubernetesExecutor mode needs a base pod yaml for every task, temp.yaml provides the base pod definition.

In temp.yaml, spec.containers.imagePullPolicy must be set to IfNotPresent; otherwise the task pods keep restarting over and over, even though the Airflow web UI oddly shows the task as successful.

apiVersion: v1
kind: Pod
metadata:
  name: dummy-name
  namespace: airflow-k8sexecutor
spec:
  serviceAccountName: default
  restartPolicy: Never
  containers:
    - name: base
      image: kubegut/airflowk8sexec:0.0.1
      imagePullPolicy: IfNotPresent
      ports: []
      command: []

The custom pod template

custom_template.yaml

apiVersion: v1
kind: Pod
metadata:
  name: dummy-name
  namespace: airflow-k8sexecutor
spec:
  serviceAccountName: default
  restartPolicy: Never
  containers:
    - name: base
      image: kubegut/airflowk8sexec:0.0.1
      imagePullPolicy: IfNotPresent
      ports: []
      command: []
      env: # Add extra environment
        - name: HELLOWORLD
          value: 'hello world!'

Dockerfile

Remember to put the custom yaml file in the directory the DAG points to at runtime: $AIRFLOW_HOME/pod_templates

Also add the following environment variables, which every task pod will pick up when it starts:

  • AIRFLOW__CORE__SQL_ALCHEMY_CONN
  • AIRFLOW__KUBERNETES__RUN_AS_USER

This is necessary because environment variables defined only in airflow.yaml are not passed on to the pods the executor launches.

FROM apache/airflow:2.1.0

# add dependencies for http basic auth
RUN pip install --user --upgrade apache-airflow[password]
RUN pip install --upgrade google-cloud-bigquery
RUN pip install Flask-Bcrypt

## provide task pod env variables
ENV AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql://postgres:password@airflow-db:5432/postgres
ENV AIRFLOW__KUBERNETES__RUN_AS_USER=50000

# add template yaml file into /opt
ADD temp.yaml /opt
## add your custom dags in current docker build folder
ADD dags /opt/airflow/dags

# use custom pod template
RUN mkdir /opt/airflow/pod_templates
ADD custom_template.yaml /opt/airflow/pod_templates

Build the image and push it to Docker Hub:

docker build -t kubegut/airflowk8sexec:0.0.1 .
docker push kubegut/airflowk8sexec:0.0.1

The remaining yaml files are the same as in the KubernetesPodOperator setup

Reuse the following yaml files from the KubernetesPodOperator version,
changing the namespace to airflow-k8sexecutor to keep things organized:

  • airflow-db.yaml
  • airflow-db-svc.yaml
  • airflow-svc-internal.yaml
  • airflow-svc-external.yaml
  • namespace.yaml
  • serviceaccount.yaml
    • change the name to airflow-k8sexecutor-serviceaccount

Tip: put the copied files into the same directory and run the following one-liner to switch them all to the airflow-k8sexecutor namespace:

sed -i 's/airflow-k8spodoperator/airflow-k8sexecutor/g' *.yaml

Grant Airflow permissions

Remember to grant permissions again. Here the clusterrolebinding is named airflow-admin-2 and is bound to the service account used by the k8sexecutor setup:

kubectl create clusterrolebinding airflow-admin-2 \
--clusterrole=admin \
--serviceaccount=airflow-k8sexecutor:airflow-k8sexecutor-serviceaccount

Deploy

kubectl apply -f namespace.yaml
sleep 3
kubectl apply -f airflow-db-svc.yaml
kubectl apply -f airflow-db.yaml
kubectl apply -f airflow-svc-external.yaml
kubectl apply -f airflow-svc-internal.yaml
kubectl apply -f serviceaccount.yaml
kubectl apply -f airflow.yaml

img07

img08

Create an Airflow account

Use the following command to get into the airflow-scheduler container and create a user account:

kubectl exec --tty --stdin -n airflow-k8sexecutor [your airflow pod name] -c airflow-scheduler -- /bin/bash

Create an account:

airflow users create -u airflow -p airflow \
-e test@test.com -f test -l test -r Admin

Run the example DAG: k8sexecutor_example

img17

img18

You can see that in executor mode, Airflow starts a separate pod for each task.
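
You can watch the task pods come and go while the DAG runs (they are created in the airflow-k8sexecutor namespace and deleted once they finish, because AIRFLOW__KUBERNETES__DELETE_WORKER_PODS is True):

kubectl get pods -n airflow-k8sexecutor -w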

task-1 pods

pod name: k8sexecutorexampletask1.4931009824d044e0ae19b2b517a61628

img10

task-2 pods

pod name: k8sexecutorexampletask2.3d4c6da134794effb517e1c1cf500b00

img12

task_with_podtemplate

pod name: k8sexecutorexampletaskwithpodtemplate.ff69ada0315247f5b95dd8143eea8003

img19

Problem: the logs of finished tasks can't be viewed…

Because the pod is removed once the task finishes, its logs disappear with it, so when we go back to the Airflow UI we see a message saying the pod can't be found:

*** Trying to get logs (last 100 lines) from worker pod k8sexecutorexampletaskwithpodtemplate.ff69ada0315247f5b95dd8143 ***

*** Unable to fetch logs from worker pod k8sexecutorexampletaskwithpodtemplate.ff69ada0315247f5b95dd8143 ***
(404)
Reason: Not Found
HTTP response headers: HTTPHeaderDict({'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'Date': 'Sun, 30 May 2021 10:01:36 GMT', 'Content-Length': '294'})
HTTP response body: b'{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"pods \\"k8sexecutorexampletaskwithpodtemplate.ff69ada0315247f5b95dd8143\\" not found","reason":"NotFound","details":{"name":"k8sexecutorexampletaskwithpodtemplate.ff69ada0315247f5b95dd8143","kind":"pods"},"code":404}\n'

At this point we need a way to persist the logs permanently, which is exactly what the next part covers: storing the logs in Google Cloud Storage and reading them back from there.
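
As a rough preview, enabling GCS remote logging mostly comes down to flipping the env vars that are commented out in airflow.yaml above, e.g. something like the following on the webserver/scheduler containers (Airflow 2.x reads these from the [logging] section; the bucket name and the gcs-logs connection id are placeholders you would have to create yourself):

- name: AIRFLOW__LOGGING__REMOTE_LOGGING
  value: 'True'
- name: AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER
  value: 'gs://airflow-k8sexecutor-logs'
- name: AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID
  value: 'gcs-logs'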

Remember why the worker nodes were given different amounts of memory at the beginning?

Because now we want to observe whether, with a customized pod template, k8s schedules a pod that requests more than 4 GB of memory onto the worker node that actually has enough resources.

custom_template.yaml

apiVersion: v1
kind: Pod
metadata:
  name: dummy-name
  namespace: airflow-k8sexecutor
spec:
  serviceAccountName: default
  restartPolicy: Never
  containers:
    - name: base
      image: kubegut/airflowk8sexec:0.0.1
      imagePullPolicy: IfNotPresent
      resources:
        limits:
          memory: "7Gi"
        requests: # request 6 GB of memory and see whether the pod lands on the node with 8 GB
          memory: "6Gi"
      ports: []
      command: []
      env: # Add extra environment
        - name: HELLOWORLD
          value: 'hello world!'

Run the k8sexecutor_example DAG again.

Use the command below to watch whether task_with_podtemplate, the task that uses the custom pod template, gets scheduled onto the k8s-node node with 8 GB of memory:

watch kubectl get pod -o=custom-columns=NAME:.metadata.name,STATUS:.status.phase,NODE:.spec.nodeName --all-namespaces

You can see that task_with_podtemplate, the task using the custom pod template, runs on k8s-node, which has 8 GB of memory.

img20

By letting a task use its own custom pod template, we can tailor a dedicated execution environment for every single task!
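
As an aside, if you only need to tweak a few fields (such as resources) for a single task, Airflow 2.x also accepts a pod_override object directly in executor_config, instead of maintaining a separate template file. A minimal sketch, added inside the existing with dag: block of k8sexecutor_example.py (the task name and the requested amounts are just illustrative):

from kubernetes.client import models as k8s

task_with_override = PythonOperator(
    task_id="task_with_override",
    python_callable=mission_one,
    executor_config={
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base",  # must match the base container name in the pod template
                        resources=k8s.V1ResourceRequirements(
                            requests={"memory": "6Gi"},
                            limits={"memory": "7Gi"},
                        ),
                    )
                ]
            )
        )
    },
)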

References