[airflow] airflow 테스트 코드 작성하기

저는 현재 데이터 엔지니어링 파트에서 혼자 일하고 있습니다. 저의 그 동안 목표는 “어떻게든 돌아가게끔 구축하자!”였습니다.

회사에서 일을 시작한지 9개월이 지났고, fastapi와 kinesis를 이용한 실시간 로그 수집 아키텍처도 어느정도 구현이 되었기 때문에 그동안 airflow에 하고 싶었던 2가지를 추진하려고 합니다.

  • airflow 테스트 코드 작성
  • 스테이징 환경 구성
  • KubenetesExecutor, KubenetesPodOperator 도입

그 중 첫 번째인 테스트 코드 작성을 시작했고, 그 내용을 기록합니다. Data’s Inferno: 7 Circles of Data Testing Hell with Airflow 포스팅과 Data pipelines with Apache Airflow를 참고했습니다(참고라고는 하지만… 거의 똑같습니다).


Airflow 테스트 코드

테스트 코드가 없으면 매우 불편합니다. 클라우드 환경이 아닌 온프레미스 환경에서 airflow를 운영하고 있는 저같은 경우에는 더더욱 불편합니다. 그렇기 때문에 테스트 코드가 매우매우 필요합니다.

Data’s Inferno: 7 Circles of Data Testing Hell with Airflow에서는 7단계로 airflow 검증 코드를 정의했습니다.

  1. DAG Integrity Tests: DAG 무결성 테스트
  2. Split your ingestion from your deployment: 배포 로직과 데이터 처리 로직이 분리되어 있는지?
  3. Data Tests: 결과 데이터가 예상대로 잘 나오는지?
  4. Alerting: 오류 발생시 슬렉 알람
  5. Git Enforcing: 항상 git repository 최신 버전의 코드가 돌아가고 있는지?
  6. Mock Pipeline Tests: mock 객체를 이용한 테스트 코드
  7. DTAP: 데이터를 4단계로 분리
    • Development : 개발 단계
    • Test : 무결성 체크를 위한 신뢰성 있는 sample 데이터
    • Acceptance : 퍼포먼스 테스트를 위한 Production의 카피본
    • Production : 릴리즈를 위한 실 배포

이 7 layer를 따라가되, 필요한 부분만 가져다 쓰겠습니다. 테스트는 pytest, pytest-env 라이브러리를 사용합니다.


사전 고려사항

  1. 테스트용 python 파일은 프로젝트 최상단/tests 폴더 내부에 작성했습니다.

  2. 테스트용 python 파일은 prefix나 suffix로 test_ / _test를 가집니다.

  3. 테스트용 폴더인 tests 내부에는 __init__.py 가 없습니다. 테스트는 모듈이 아니기도 하고 독립적으로 실행되어야 하기 때문에 다른 airflow 코드를 Import하지도 않고 airflow 코드에서 테스트 코드를 import하는 일 또한 없어야 합니다.

DAG Integrity Tests

무결성 테스트코드 test_dag_integrity.py 입니다.

import glob
import importlib.util
import os
import pytest
from airflow.models import DAG

DAG_PATH = os.path.join(
    os.path.dirname(__file__), "..", "..", "dags/**/*.py"
)

DAG_FILES = glob.glob(DAG_PATH, recursive=True)


@pytest.mark.parametrize("dag_file", DAG_FILES)
def test_dag_integrity(dag_file):
    module_name, _ = os.path.splitext(dag_file)
    module_path = os.path.join(DAG_PATH, dag_file)
    mod_spec = importlib.util.spec_from_file_location(module_name, module_path)
    module = importlib.util.module_from_spec(mod_spec)
    mod_spec.loader.exec_module(module)
    dag_objects = [var for var in vars(module).values() if isinstance(var, DAG)]

    assert dag_objects

    for dag in dag_objects:
        dag.test_cycle()

pytest를 가지고 테스트코드를 작성해보신 분들은 익숙하시겠지만 처음이라면 구조가 익숙하지 않을 수 있습니다. 차례대로 살펴봅시다.

DAG_PATH = os.path.join(
    os.path.dirname(__file__), "..", "..", "dags/**/*.py"
)

os.path.dirname(__file__) 메서드는 test_dag_integrity.py의 현재 위치를 반환합니다.

image

그 외에도 assert dag_object