[Airflow] 코드 리팩토링 1. DAGFactory

약 8개월 전 회사에서 개발해 사용했던 airflow를 클라우드로 옮기면서, 2.0 업데이트를 하고자 마음먹었고 필연적으로 코드 리팩토링에 대한 필요성을 느껴 리팩토링도 함께 진행중입니다.

코드 : https://github.com/eprj453/airflow-factory


DAGFactory

airflow Task는 Operator 클래스로 만듭니다. 하나의 Operator 인스턴스는 하나의 Task가 되고, 1개 이상의 Task가 서로 의존성을 가지며 생성되는 DAG가 하나의 ETL Job을 이루게 됩니다.

  • Task로 구성되는 Job은 DAG 클래스의 인스턴스다.
  • 이런 구조를 가지는 DAG 인스턴스가 매우 많다.

이 2가지를 생각했을 때 가장 먼저 생각난 것이 Factory 클래스입니다. Task 구성이 항상 일정한 패턴이라면 특정 Factory 클래스 몇가지를 만들어 돌려가면서 사용하겠지만, Operator의 종류나 순서가 Job마다 다 다르기 때문에 DAGFactory라는 하나의 Factory 클래스를 만들었습니다.

여러 형태의 Factory 클래스를 만들거나 인스턴스를 만들 것도 아니기 때문에 class method를 두어 인스턴스를 생성하지 않아도 DAG를 만들 수 있도록 했습니다. 상속도 하지 않을 예정이기 때문에 static method로 만들어도 큰 상관은 없겠지만 인스턴스를 생성하지 않고 메서드를 사용하는 방식은 class method가 좀 더 직관적이고 세련되어 보이기 때문에 class method를 사용했습니다.


Class 구성

처음에는 DAG 만드는 메서드 하나만 있는 간단한 클래스를 생각했었는데, 마주친 문제는 크게 2개였습니다.

  • 메서드가 그대로 외부에 노출된다.
  • DAG 인스턴스를 만들때 Task도 함께 구성해줘야 한다.

첫번째 고민은 간단합니다. 내부에서 기능을 할 private method와 그 메서드를 호출할 public 메서드로 구분만 해주면 됩니다.

  • private : def _create_dag()
  • public : def generate_dag()

아주 해피합니다.

두번째는 좀 피곤합니다. DAG 인스턴스를 만들 때 Task도 같이 구성해야 합니다. 정확히는 Task를 만들 때 DAG 인스턴스를 인자로 던져줘야 하기 때문에 DAGFactory는 DAG 인스턴스를 만든다 -> Task를 만든다 -> DAG 인스턴스를 반환한다 라는 일련의 기능을 해야합니다. 이에 _add_tasks_to_dags 메서드를 class method로 추가했습니다.

최종 구조는 이렇습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
from datetime import datetime, timedelta
from typing import Any, Dict, Union, List

import logging

from airflow import DAG

# custom module import
....
....


class DAGFactory:

    @classmethod
    def _create_dag(cls,
                   dag_id: str,
                   default_args: Dict[str, Any],
                   cron_schedule: str,
                   catchup: int = False,  # True 설정시 start_date가 과거더라도 현재 시간부터 job 실행
                   concurrency: int = 10,
                   ) -> DAG:
        essential_default_args = {'start_date', 'retry'}

        if essential_default_args < set(default_args.keys()):
            logging.warning(f"""
            default_args 필수 인자가 없습니다. default 값으로 생성됩니다.
            {essential_default_args.difference(set(default_args.keys()))}
            """)

        DEFAULT_ARGS = {
            'owner': 'datadev-parksw2',
            'depends_on_past': True,  # 이전 task가 성공해야만 다음 task가 실행된다.
            'start_date': datetime(2021, 1, 1, 9, 0, 0),
            'email_on_failure': False,
            'email_on_retry': False,
            'retry': 1,
            'retry_delay': timedelta(minutes=5),
        }

        DEFAULT_ARGS.update(default_args)

        DAG_ARGS = {
            'default_args': DEFAULT_ARGS,
            'schedule_interval': cron_schedule,
            'catchup': catchup,
            'concurrency': concurrency
        }

        dag = DAG(dag_id=dag_id, **DAG_ARGS)
        return dag

    @classmethod
    def _add_tasks_to_dag(cls, dag: DAG, tasks: List[AirflowTask]) -> DAG:
        # task generate in here
        dependency_tasks = []
        n = len(dependency_tasks)
        for task in tasks:
            t = TaskFactory.generate_task(dag=dag, airflow_tasks=task)

            if dependency_tasks:
                upstream_task = dependency_tasks[-1]
                upstream_task.set_downstream(t)
            dependency_tasks.append(t)

        return dag

    @classmethod
    def generate_dag(cls,
                     dag_id: str,
                     default_args: Dict[str, Any],
                     cron_schedule: str,
                     tasks: Union[List[AirflowTask]],
                     catchup: bool = False,
                     concurrency: int = 10,
                     ) -> DAG:

        dag = cls._create_dag(
            dag_id=dag_id,
            default_args=default_args,
            cron_schedule=cron_schedule,
            catchup=catchup,
            concurrency=concurrency
        )
        dag = cls._add_tasks_to_dag(dag=dag, tasks=tasks)
        return dag

  • DAG 생성(_create_dag)
  • DAG에 Task 할당 (_add_tasks_to_dag)
  • 위 2개 메서드를 실행해 DAG를 만드는 public 메서드 (_generate_dag)

private 메서드들은 내부 동작이고, 외부에서는 generate_dag() 메서드를 호출해 사용합니다.

인자로 static하게 받아오는 정보들은 버전이 바뀌더라도 받아와야 하는 DAG의 본질적인 기능입니다. airflow의 Job을 Task를 받아서 DAG로 만들어야 합니다. 그렇다면 dag_id, tasks, schedule 등은 airflow에 어떤 버전 업그레이드가 일어나더라도 변하지 않는 구성요소라고 생각하는 것들은 고정 인자로 받았습니다.

바뀔 것 같은 요소들은 default_args 딕셔너리로 가변적으로 받을 수 있도록 했습니다.


TaskFactory

DAGFactory의 generate_dag() 메서드는 Task를 인자로 받습니다. 그림으로 그려본다면 이런 형태의 구조입니다.

image

여기서 문제는, DAG의 구성요소인 Task에서도 DAG와 같은 고민을 하게 된다는 것입니다.

  • Task로 구성되는 Job은 DAG 클래스의 인스턴스다.

    -> Task의 구성요소는 Operator 클래스의 인스턴스다.

  • 이런 구조를 가지는 DAG 인스턴스가 매우 많다.

    -> Operator로 이루어진 Task가 DAG에 많이 있을 수 있다.

DAGFactory의 _add_task_to_dag() 메서드를 보면 알겠지만, Task 또한 하나의 Factory 클래스의 class method를 거쳐 만들어지도록 했습니다.

단, DAG와 달리 Task는 그 구성요소인 Operator의 종류가 여러가지이기 때문에 Enum 형태를 포함한 추가 구현이 들어갔습니다. 다음 장에서 TaskFactory를 포함한 전체 코드 구조를 포스팅하겠습니다 :)