Post

Airflow(에어플로우) 시작하기

이 글은, 프로그래머스에서 진행되는 실리콘밸리에서 날아온 데이터 엔지니어링 키트 with Python에서 배운 내용을 바탕으로 이루어져 있습니다.

Apache Airflow란?

Airflow는 AriBnB에서 만든 workflow management tool입니다. workflow는 일련의 작업 흐름으로, 예를 들어 ETL처럼 데이터를 Extract -> Transform -> Load 의 흐름을 갖는 workflow가 있습니다. 이런 workflow를 관리하는 툴이 바로 airflow입니다. 여기서 관리라는 것은 worklfow를 작성, 스케줄링, 모니터링하는 작업을 말합니다.

Airflow는 특징이 되는 컴포넌트들이 있으며 각 component들 간의 아키텍쳐는 아래와 같습니다.
image

사진 출처 : airflow.apache.org

Airflow는 크게 4가지 구성요소를 갖습니다.

  1. Webserver
  2. Scheduler
  3. Executor
  4. Worker

Airflow Webserver

Airflow Webserver는 Airflow의 log를 보여주거나 Scheduler에 의해 생성된 DAG 목록, Task 상태 들을 UI를 통해 사용자에게 시각적으로 정보를 제공해줍니다.

Airflow Scheduler

Airflow Scheduler는 Airflow로 할당된 work들을 스케줄링 해주는 component입니다. Scheduled된 worklow들의 실행과 triggering을 위해 executor에게 task를 제공하는 역할을 합니다.

Airflow Executor

Airflow Executor는 worker들에게 작업을 실행시키는 역할을 합니다. Default로 순차적으로 역할을 진행하는 SequentialExecutor로 설정되는데 거의 사용되지 않습니다. 다른 Executor로 LocalExecutor, CeleryExecutor, KubernetesExecutor 등이 있습니다.

Airflow Worker

실제 task를 실행하는 주체자라고 보시면 됩니다.


추가적으로 Database, Queue라는 구성요소도 있는데 Queue는 멀티노드로 구성되는 경우에만 사용됩니다. 그리고 이 경우 Executor로 CeleryExecutor, KubernetesExecutor 같은걸 사용해아합니다.
Worker의 수보다 실행되어야 하는 task의 수가 많을 수 있기 때문에 Scheduler가 Queue에 작업들을 넣어두고 Worker가 하던 작업이 끝나면 Queue를 보고 다음 작업을 합니다.

Airflow DAG

Airflow에서 ETL은 DAG라고 부릅니다. DAG는 Directed Acyclic Graph의 줄임말로 DAG의 task는 E, T, L로 구성되는데 ETL순으로 앞의 task가 이루어져야 뒤의 task를 할 수 있기 때문에 Directed(방향이 있는)합니다. 따라서, 이 task의 흐름을 Graph로 그려보면 방향이 있는 세 개의 노드 Graph가 그려집니다. Acyclic은 루프가 있으면 안된다는 뜻으로, Airflow의 DAG는 한번 실행되면 끝, 끝까지가면 끝이지 거기서 다시 앞으로 돌아와서 루프를 돌며 task가 계속해서 돌아가는 형태로는 사용할 수 없습니다.

  • DAG는 task로 구성됩니다.
    • ETL을 3개의 task로 구성한다면 Extract, Transfomr, Load로 구성
  • Task는 Airflow Operator로 만들어집니다.
    • Airflow에서 이미 다양한 종류의 operator를 제공합니다.
    • 경우에 맞게 operator를 결정하거나 필요하다면 직접 개발해 사용합니다.
      • example : Redshift writing, Postgres query, S3 Read/Write, Hive query, Spark job, shell script

Airflow 구조

Airflow를 운영할 때, 처음엔 서버 한대에 모든 구성요소를 다 담아서 운영하지만 DAG의 수가 늘어나면 Worker의 수를 늘리는 형태로, 다수의 서버로 구성되는 형태로 scale됩니다.

서버 한대의 경우

https://user-images.githubusercontent.com/105477856/231068734-17ade6ff-cdb5-4255-a9c4-5aa820e50950.png

Worker의 수는 CPU의 수만큼 존재한다고 보시면 됩니다.

서버 한대로 쓰다가 운영하기에 너무 벅차면 스케일링을 합니다.

Airflow 스케일링 방법

  • 스케일 업 (더 좋은 사양의 서버 사용)
  • 스케일 아웃 (서버 추가)

https://user-images.githubusercontent.com/105477856/231068860-27732605-8465-463b-b8f4-8607fce8d933.png

Airflow 구조 : 다수 서버

https://user-images.githubusercontent.com/105477856/231068951-7aa00ca3-1751-4ece-919b-a609231576ad.png

DAG 구성

DAG는 다양한 구성을 갖을 수 있습니다. 순차적으로 실행될 수도 있고, 하나의 Task가 끝나면 두 개의 Task가 동시에 실행되는 구성을 갖을 수도 있습니다.

예시 1

https://user-images.githubusercontent.com/105477856/231069280-f1b984cf-fa9e-43f0-907e-fffbd8ee8df3.png

  • 3개의 Task로 구성된 DAG
  • 먼저 t1이 실행되고 t2, t3의 순으로 일렬로 실행

예시 2

https://user-images.githubusercontent.com/105477856/231069409-24fe08b9-8474-4427-a8bd-66751fabba7a.png

  • 3개의 Task로 구성된 DAG
  • 먼저 t1이 실행되고 여기서 t2와 t3로 분기

모든 Task에 필요한 기본정보

DAG를 만들 때 그 DAG에 속한 모든 Task들에 기본으로 지정해주는 속성들이 있습니다. 그걸 default_args 라고 부르고 Python dictionary입니다. Keyword의 명칭에 따라 역할을 갖기 때문에 읽어보시면 이해가 되실겁니다.

1
2
3
4
5
6
7
8
default_args = {
  'owner': 'jewoo',
  'start_date': datetime(2020, 8, 7, hour=0, minute=00),
  'end_date': datetime(2020, 8, 31, hour=23, minute=00),
  'email': ['jewoo@naver.com'],
  'retries': 1,
  'retry_delay': timedelta(minutes=3),
}

DAG를 만들기 위해 이름과 schedule을 어떻게 지정할 것인지, 태그 내용, default_args를 넣어줍니다.

1
2
3
4
5
6
test_dag = DAG(
  "dag_v1", # DAG name
  schedule="0 9 * * *", 
  tags=['test']
  default_args=default_args 
)

위의 schedule은 크론탭 문법을 따릅니다.
image

Operator 생성 예시

예시 1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
t1 = BashOperator(
  task_id='print_date',
  bash_command='date',
  dag=test_dag)
t2 = BashOperator(
  task_id='sleep',
  bash_command='sleep 5',
  retries=3,
  dag=test_dag)
t3 = BashOperator(
  task_id='ls',
  bash_command='ls /tmp',
  dag=test_dag)

t1 >> t2
t1 >> t3
# = t1 >> [t2, 3]

BashOperator는 Shell command를 실행시켜주는 Operator입니다. 예시는 bash_command 에 입력된 date(지금의 날짜를 출력), sleep 5(5초 동안 정지), ls /tmp(tmp 폴더에 어떤 파일, 폴더가 있는지 보여줌)을 실행시켜주는 것으로 보시면 됩니다.

Operator 코드 아래의 t1 >> t2 은 실행 순서를 지정합니다. 그걸 그래프로 그리면 이렇습니다.
image

예시 2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
start = DummyOperator(dag=dag, task_id="start", *args, **kwargs)
t1 = BashOperator(
  task_id='ls1',
  bash_command='ls /tmp/downloaded',
  retries=3,
  dag=dag)
t2 = BashOperator(
  task_id='ls2',
  bash_command='ls /tmp/downloaded',
  dag=dag)
end = DummyOperator(dag=dag, task_id='end', *args, **kwargs)

start >> t1 >> end
start >> t2 >> end           # start >> [t1, t2] >> end 과 같습니다.

DummyOperator는 말 그대로 아무것도 안하는 Task입니다. 이 operator는 다양한 숫자의 Task가 DAG안에 있으면 시작과 끝을 나타내기 위한 용도로 많이 쓰입니다.

Operator 코드 아래의 start >> [t1, t2] >> end 은 실행 순서를 지정합니다. 그걸 그래프로 그리면 이렇습니다.
image

참고 자료

  1. https://lsjsj92.tistory.com/631
  2. https://magpienote.tistory.com/225
This post is licensed under CC BY 4.0 by the author.