약 8개월 전 회사에서 개발해 사용했던 airflow를 클라우드로 옮기면서, 2.0 업데이트를 하고자 마음먹었고 필연적으로 코드 리팩토링에 대한 필요성을 느껴 리팩토링도 함께 진행중입니다.
코드 : https://github.com/eprj453/airflow-factory
DAGFactory
airflow Task는 Operator 클래스로 만듭니다. 하나의 Operator 인스턴스는 하나의 Task가 되고, 1개 이상의 Task가 서로 의존성을 가지며 생성되는 DAG가 하나의 ETL Job을 이루게 됩니다.
- Task로 구성되는 Job은 DAG 클래스의 인스턴스다.
- 이런 구조를 가지는 DAG 인스턴스가 매우 많다.
이 2가지를 생각했을 때 가장 먼저 생각난 것이 Factory 클래스입니다. Task 구성이 항상 일정한 패턴이라면 특정 Factory 클래스 몇가지를 만들어 돌려가면서 사용하겠지만, Operator의 종류나 순서가 Job마다 다 다르기 때문에 DAGFactory라는 하나의 Factory 클래스를 만들었습니다.
여러 형태의 Factory 클래스를 만들거나 인스턴스를 만들 것도 아니기 때문에 class method를 두어 인스턴스를 생성하지 않아도 DAG를 만들 수 있도록 했습니다. 상속도 하지 않을 예정이기 때문에 static method로 만들어도 큰 상관은 없겠지만 인스턴스를 생성하지 않고 메서드를 사용하는 방식은 class method가 좀 더 직관적이고 세련되어 보이기 때문에 class method를 사용했습니다.
Class 구성
처음에는 DAG 만드는 메서드 하나만 있는 간단한 클래스를 생각했었는데, 마주친 문제는 크게 2개였습니다.
- 메서드가 그대로 외부에 노출된다.
- DAG 인스턴스를 만들때 Task도 함께 구성해줘야 한다.
첫번째 고민은 간단합니다. 내부에서 기능을 할 private method와 그 메서드를 호출할 public 메서드로 구분만 해주면 됩니다.
- private :
def _create_dag()
- public :
def generate_dag()
아주 해피합니다.
두번째는 좀 피곤합니다. DAG 인스턴스를 만들 때 Task도 같이 구성해야 합니다. 정확히는 Task를 만들 때 DAG 인스턴스를 인자로 던져줘야 하기 때문에 DAGFactory는 DAG 인스턴스를 만든다 -> Task를 만든다 -> DAG 인스턴스를 반환한다 라는 일련의 기능을 해야합니다. 이에 _add_tasks_to_dags
메서드를 class method로 추가했습니다.
최종 구조는 이렇습니다.
1 |
|
- DAG 생성(
_create_dag
) - DAG에 Task 할당 (
_add_tasks_to_dag
) - 위 2개 메서드를 실행해 DAG를 만드는 public 메서드 (
_generate_dag
)
private 메서드들은 내부 동작이고, 외부에서는 generate_dag()
메서드를 호출해 사용합니다.
인자로 static하게 받아오는 정보들은 버전이 바뀌더라도 받아와야 하는 DAG의 본질적인 기능입니다. airflow의 Job을 Task를 받아서 DAG로 만들어야 합니다. 그렇다면 dag_id, tasks, schedule 등은 airflow에 어떤 버전 업그레이드가 일어나더라도 변하지 않는 구성요소라고 생각하는 것들은 고정 인자로 받았습니다.
바뀔 것 같은 요소들은 default_args 딕셔너리로 가변적으로 받을 수 있도록 했습니다.
TaskFactory
DAGFactory의 generate_dag()
메서드는 Task를 인자로 받습니다. 그림으로 그려본다면 이런 형태의 구조입니다.
여기서 문제는, DAG의 구성요소인 Task에서도 DAG와 같은 고민을 하게 된다는 것입니다.
-
Task로 구성되는 Job은 DAG 클래스의 인스턴스다.
-> Task의 구성요소는 Operator 클래스의 인스턴스다.
-
이런 구조를 가지는 DAG 인스턴스가 매우 많다.
-> Operator로 이루어진 Task가 DAG에 많이 있을 수 있다.
DAGFactory의 _add_task_to_dag()
메서드를 보면 알겠지만, Task 또한 하나의 Factory 클래스의 class method를 거쳐 만들어지도록 했습니다.
단, DAG와 달리 Task는 그 구성요소인 Operator의 종류가 여러가지이기 때문에 Enum 형태를 포함한 추가 구현이 들어갔습니다. 다음 장에서 TaskFactory를 포함한 전체 코드 구조를 포스팅하겠습니다 :)