일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- docker attach
- 티스토리챌린지
- logistic regression
- 히비스서커스
- Decision Boundary
- WSSS
- AIFFEL
- ssh
- aiffel exploration
- vscode
- HookNet
- docker
- 오블완
- 프로그래머스
- Jupyter notebook
- 사회조사분석사2급
- 기초확률론
- IVI
- Multi-Resolution Networks for Semantic Segmentation in Whole Slide Images
- docker exec
- GIT
- 도커
- cocre
- Pull Request
- numpy
- CellPin
- cs231n
- 코크리
- airflow
- 백신후원
- Today
- Total
히비스서커스의 블로그
[Airflow] Docker 환경에서 Airflow와 PostgreSQL 활용하기 본문
이번 포스팅에서는 Airflow를 docker 컨테이너에서 사용할 경우 PostgreSQL 컨테이너와 네트워크로 연결하는 방법을 정리하였다.
왜 DB container로 PostgreSQL을 사용하려는가?
일단 Airflow에서 공식적으로 제공하는 docker-compose 파일에서 PostgreSQL을 DB container로 업로드 하도록 세팅되어 있다. 그 이유는 아마도 Airflow에서 병렬처리를 하는 Executor를 사용하려면 PostgreSQL를 DB로 하는 것이 유리하기 때문인 것으로 보인다. (만약 SQLite를 DB로 사용하게 될 경우 Executor가 아닌 Sequential Executor를 사용하여야 해서 병렬이 아닌 순차적를 할 수 밖에 없다고 한다.)
Docker환경에서 Airflow 사용 시 PostgreSQL를 활용하기 위한 설정
docker-compose 파일로 airflow를 띄울 경우 생성되는 컨테이너는 아래와 같다.
(docker-compose파일이 존재하는 디렉토리명)-airflow-worker_1
(docker-compose파일이 존재하는 디렉토리명)-airflow-scheduler_1
(docker-compose파일이 존재하는 디렉토리명)-airflow-webserver_1
(docker-compose파일이 존재하는 디렉토리명)-postgres_1
(docker-compose파일이 존재하는 디렉토리명)-redis_1
(docker-compose파일이 존재하는 디렉토리명) 은 편의상 airflow로 가정하고 정리하였다.
1) docker-compose.yaml 파일에의 설정
1-1) docker 컨테이너 간 네트워크 연결 설정하기
Airflow 관련 컨테이너와 PostgreSQL 컨테이너는 격리된 공간을 할당받기 때문에 서로 통신하기 어렵다. 그 이유는 도커 컨테이너가 기본적으로 다른 컨테이너들과 격리된 환경에서 돌아가도록 설정되어 있기 때문이다. 따라서, 이들이 하나의 네트워크에 연결해주도록 직접 설정을 해주어야 한다.
먼저, local에서 도커 네트워크를 생성해준다.
docker network create test-net
docker compose로 컨테이너 운영 시 Airflow 관련 컨테이너와 PostgreSQL의 컨테이너 간의 network를 공유하기 위해 docker-compose.yaml파일에 아래의 부분도 추가해준다.
version: '3'
x-airflow-common:
(중략)
networks:
- db_net
services:
postgres:
(중략)
networks:
- db_net
(중략)
networks:
db_net:
external: true
name: test-net
1-2) docker 컨테이너들의 볼륨을 local과 마운트하기
docker 컨테이너들은 local의 데이터들을 받기 위해서 mount를 해주어야 하는데 docker-compose.yaml 파일에서 Airflow 관련 컨테이너들은 이미 설정이 되어있다. 마지막에 간단한 테스트로 해볼 것이 PostgreSQL 컨테이너에서 csv 파일을 읽어야 하므로 PostgreSQL 컨테이너에서 local의 데이터를 mount 해준다.
(생략)
services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres-db-volume:/var/lib/postgresql/data
- ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
- ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
- ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 5s
retries: 5
restart: always
networks:
- db_net
(생략)
2) 이외 Docker환경에서 Airflow 사용 시 PostgreSQL를 활용하기 위한 설정
docker-compose파일은 PostgreSQL 컨테이너를 띄우도록 되어있으나 airflow.cfg의 파일 설정은 SQLite을 위한 설정으로 되어 있기 때문에 PostgreSQL을 위한 설정으로 바꿔주어야 한다. 먼저, docker-compose를 통해 컨테이너를 먼저 띄워준다. (앞선 WandB 편에서 설명했기에 접은 글로 표현하였다.)
docker-compose.yaml 파일이 있는 위치에 여러 디렉토리들을 생성해주고 권한을 airflow 유저에게 할당한다.
mkdir -p ./dags ./logs ./plugins ./config
echo -e "AIRFLOW_UID=$(id -u)" > .env
아래의 명령어로 docker-compose 파일로 airflow와 관련된 컨테이너들을 띄워준다.
docker-compose up airflow-init
docker-compose up -d
1) airflow.cfg 파일에의 설정
이후 airflow-airflow-scheduler에 root 계정으로 접속 후
docker exec -it -u root airflow-airflow-scheduler bash
airflow.cfg 파일을 아래와 같이 수정한다.
[core]
executor = SequentialExecutor # (이전) 30 번째 줄에 존재
executor = LocalExecutor # (이후)
[database]
sql_alchemy_conn = sqlite:////opt/airflow/airflow.db # (이전) 230 번째 줄에 존재
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost/airflow # (이후)
2) 컨테이너에서의 설정
그리고 리눅스 환경에서 PostgreSQL 애플리케이션을 설정하기 위한 라이브러리를 다운받는다.
apt-get -y install gcc
apt-get -y install libpq-dev python3-dev
다음으로 이전 docker 컨테이너에서 exit 명령어로 빠져나와 airflow-scheduler 컨테이너에 airflow 계정으로 접속 후 DB를 초기화 시켜준 후 파이썬 환경에서 PostgreSQL 애플리케이션을 설정하기 위한 라이브러리를 다운받는다.
docker exec -it airflow-airflow-scheduler bash
airflow db init
pip install psycopg2
이제 postgresql container에 접속하여 PostgreSQL cli로 명령할 수 있다.
docker exec -it airflow-postgres-1 bash
psql -h localhost -p 5432 -U airflow -d airflow
(이후 PostgreSQL 명령어 사용가능)
간단한 테스트
이번에 테스트 해볼 것은 간단하게 아래의 csv파일을 PostgreSQL에 업로드하는 DAG를 구성하는 것이다. 아래의 파일들을 다운받아 간단하게 dag 파일 안에 옮겨준다.
그 후 파일 dag 안에 다음과 같은 파일을 test.py 파일을 만든다.
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.hooks.postgres_hook import PostgresHook
from datetime import datetime
import random
def get_postgres_connection():
hook = PostgresHook(postgres_conn_id='postgres_dev_db')
conn = hook.get_conn()
conn.autocommit = False
return conn.cursor()
def extract_transform(**context):
data = './airtravel.csv'
return data
def load(**context):
cur = get_postgres_connection()
sql = open('/opt/airflow/dags/test.sql').read()
data = context("task_instance").xcom_pull(key="return_value", task_ids="extract")
sql += f"""COPY test.test_info FROM '{data}' DELIMITER ',' CSV HEADER;"""
try:
cur.execute(sql)
cur.execute("COMMIT;")
except Exception as e:
cur.execute("ROLLBACK;")
raise
test_post = DAG(
dag_id = f'test_post',
start_date = datetime(2022,6,11),
catchup = False,
tags = ['hello', 'wandb'],
schedule = '@once',
)
extract_transform = PythonOperator(
task_id = 'extract_transform',
python_callable = extract_transform,
dag = test_post
)
load = PythonOperator(
task_id = 'load',
python_callable = load,
dag = test_post
)
extract_transform >> load
인터넷 브라우저에 127.0.0.1:8080을 입력한 후 ID/PW에 airflow를 입력 후 admin / connection 에서 아래의 이미지와 같이 설정해준다.
그 후 dag를 실행시켜 보면 돌아가는 것을 확인할 수 있다.
'Programming > Python' 카테고리의 다른 글
[HuggingFace] 허깅페이스 데이터 다운로드하기 (1) | 2024.11.10 |
---|---|
[Python] model의 FLOPs 계산하기 (feat.calflops) (1) | 2024.11.08 |
[Airflow] Docker 환경에서 Airflow와 Wandb 같이 활용하기 (0) | 2023.07.27 |
[mmdetection] mmdetection을 통한 object detection 데이터셋 커스터마이징 방법 (ver.3.1) (2) | 2023.07.25 |
[Pytorch] torchvision을 통한 object detection 시 Loss is nan, stopping training 에러 발생 (0) | 2023.06.19 |