히비스서커스의 블로그

[Airflow] Docker 환경에서 Airflow와 PostgreSQL 활용하기 본문

Programming/Python

[Airflow] Docker 환경에서 Airflow와 PostgreSQL 활용하기

HibisCircus 2023. 7. 27. 11:29
728x90

이번 포스팅에서는 Airflow를 docker 컨테이너에서 사용할 경우 PostgreSQL 컨테이너와 네트워크로 연결하는 방법을 정리하였다.

 

Airflow

 

왜 DB container로 PostgreSQL을 사용하려는가?

 

일단 Airflow에서 공식적으로 제공하는 docker-compose 파일에서 PostgreSQL을 DB container로 업로드 하도록 세팅되어 있다. 그 이유는 아마도 Airflow에서 병렬처리를 하는 Executor를 사용하려면  PostgreSQL를 DB로 하는 것이 유리하기 때문인 것으로 보인다. (만약 SQLite를 DB로 사용하게 될 경우 Executor가 아닌 Sequential Executor를 사용하여야 해서 병렬이 아닌 순차적를 할 수 밖에 없다고 한다.)

 

 

Docker환경에서 Airflow 사용 시 PostgreSQL를 활용하기 위한 설정

 

docker-compose 파일로 airflow를 띄울 경우 생성되는 컨테이너는 아래와 같다.

 

(docker-compose파일이 존재하는 디렉토리명)-airflow-worker_1
(docker-compose파일이 존재하는 디렉토리명)-airflow-scheduler_1
(docker-compose파일이 존재하는 디렉토리명)-airflow-webserver_1
(docker-compose파일이 존재하는 디렉토리명)-postgres_1
(docker-compose파일이 존재하는 디렉토리명)-redis_1

 

(docker-compose파일이 존재하는 디렉토리명) 은 편의상 airflow로 가정하고 정리하였다.

 

1) docker-compose.yaml 파일에의 설정

 

1-1) docker 컨테이너 간 네트워크 연결 설정하기

 

Airflow 관련 컨테이너와 PostgreSQL 컨테이너는 격리된 공간을 할당받기 때문에 서로 통신하기 어렵다. 그 이유는 도커 컨테이너가 기본적으로 다른 컨테이너들과 격리된 환경에서 돌아가도록 설정되어 있기 때문이다. 따라서, 이들이 하나의 네트워크에 연결해주도록 직접 설정을 해주어야 한다. 

 

먼저, local에서 도커 네트워크를 생성해준다.

 

docker network create test-net

 

docker compose로 컨테이너 운영 시 Airflow 관련 컨테이너와 PostgreSQL의 컨테이너 간의 network를 공유하기 위해 docker-compose.yaml파일에 아래의 부분도 추가해준다.

 

 

version: '3'
x-airflow-common:

(중략)

  networks: 
    - db_net
    
    
services:
  postgres:
  
(중략)

    networks: 
      - db_net
      
 
(중략)


networks:
  db_net:
    external: true
    name: test-net

 

1-2) docker 컨테이너들의 볼륨을 local과 마운트하기

 

docker 컨테이너들은 local의 데이터들을 받기 위해서 mount를 해주어야 하는데 docker-compose.yaml 파일에서 Airflow 관련 컨테이너들은 이미 설정이 되어있다. 마지막에 간단한 테스트로 해볼 것이 PostgreSQL 컨테이너에서 csv 파일을 읽어야 하므로 PostgreSQL 컨테이너에서 local의 데이터를 mount 해준다.

 

 

(생략)

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
      - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
      - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
      - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always
    networks: 
      - db_net
      
(생략)

 

2) 이외 Docker환경에서 Airflow 사용 시 PostgreSQL를 활용하기 위한 설정

 

docker-compose파일은 PostgreSQL 컨테이너를 띄우도록 되어있으나 airflow.cfg의 파일 설정은 SQLite을 위한 설정으로 되어 있기 때문에 PostgreSQL을 위한 설정으로 바꿔주어야 한다. 먼저, docker-compose를 통해 컨테이너를 먼저 띄워준다. (앞선 WandB 편에서 설명했기에 접은 글로 표현하였다.)

 

더보기

docker-compose.yaml 파일이 있는 위치에 여러 디렉토리들을 생성해주고 권한을 airflow 유저에게 할당한다.

 

mkdir -p ./dags ./logs ./plugins ./config
echo -e "AIRFLOW_UID=$(id -u)" > .env

 

아래의 명령어로 docker-compose 파일로 airflow와 관련된 컨테이너들을 띄워준다.

 

docker-compose up airflow-init
docker-compose up -d

 

 

 

1) airflow.cfg 파일에의 설정

 

이후 airflow-airflow-scheduler에 root 계정으로 접속 후

 

docker exec -it -u root airflow-airflow-scheduler bash

 

airflow.cfg 파일을 아래와 같이 수정한다.

 

[core]

executor = SequentialExecutor  # (이전) 30 번째 줄에 존재
executor = LocalExecutor  # (이후)

[database]

sql_alchemy_conn = sqlite:////opt/airflow/airflow.db  # (이전) 230 번째 줄에 존재
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost/airflow  # (이후)

 

2) 컨테이너에서의 설정

 

그리고  리눅스 환경에서 PostgreSQL 애플리케이션을 설정하기 위한 라이브러리를 다운받는다.

 

apt-get -y install gcc
apt-get -y install libpq-dev python3-dev

 

다음으로 이전 docker 컨테이너에서 exit 명령어로 빠져나와 airflow-scheduler 컨테이너에 airflow 계정으로 접속 후 DB를 초기화 시켜준 후 파이썬 환경에서 PostgreSQL 애플리케이션을 설정하기 위한 라이브러리를 다운받는다.

 

docker exec -it airflow-airflow-scheduler bash

airflow db init
pip install psycopg2

 

이제 postgresql container에 접속하여 PostgreSQL cli로 명령할 수 있다.

 

docker exec -it airflow-postgres-1 bash

psql -h localhost -p 5432 -U airflow -d airflow
(이후 PostgreSQL 명령어 사용가능)

 

간단한 테스트

 

이번에 테스트 해볼 것은 간단하게 아래의 csv파일을 PostgreSQL에 업로드하는 DAG를 구성하는 것이다. 아래의 파일들을 다운받아 간단하게 dag 파일 안에 옮겨준다.

 

test.sql
0.00MB
airtravel.csv
0.00MB

 

그 후 파일 dag 안에 다음과 같은 파일을 test.py 파일을 만든다.

 

from airflow                          import DAG
from airflow.operators.python         import PythonOperator
from airflow.hooks.postgres_hook      import PostgresHook

from datetime                         import datetime
import random


def get_postgres_connection():
    hook = PostgresHook(postgres_conn_id='postgres_dev_db')
    conn = hook.get_conn()
    conn.autocommit = False
    return conn.cursor()

def extract_transform(**context):
    data = './airtravel.csv'
    return data

def load(**context):
    cur = get_postgres_connection()
    sql = open('/opt/airflow/dags/test.sql').read()

    data = context("task_instance").xcom_pull(key="return_value", task_ids="extract")
    sql += f"""COPY test.test_info FROM '{data}' DELIMITER ',' CSV HEADER;"""

    try:
        cur.execute(sql)
        cur.execute("COMMIT;")
    except Exception as e:
        cur.execute("ROLLBACK;")
        raise


test_post = DAG(
    dag_id = f'test_post',
    start_date = datetime(2022,6,11),
    catchup = False,
    tags = ['hello', 'wandb'],
    schedule = '@once',
)

extract_transform = PythonOperator(
    task_id         = 'extract_transform',
    python_callable = extract_transform,
    dag             = test_post
)

load = PythonOperator(
    task_id         = 'load',
    python_callable = load,
    dag             = test_post
)

extract_transform >> load

 

인터넷 브라우저에 127.0.0.1:8080을 입력한 후 ID/PW에 airflow를 입력 후 admin / connection 에서 아래의 이미지와 같이 설정해준다.

 

 

 

그 후 dag를 실행시켜 보면 돌아가는 것을 확인할 수 있다.

728x90