[Перевод] Как настроить мультинодовый кластер Airflow с помощью Celery и RabbitMQ
Что такое Airflow?
Apache Airflow — это продвинутый workflow менеджер и незаменимый инструмент в арсенале современного дата инженера.
Airflow позволяет создавать рабочие процессы в виде направленных ациклических графов (DAG) задач. Разнообразные служебные программы командной строки выполняют сложные операции на DAG. Пользовательский интерфейс легко визуализирует конвейеры, работающие в производственной среде, отслеживает ход выполнения и при необходимости устраняет неполадки.
Программно создавайте, планируйте и контролируйте рабочий процесс. Он предоставляет функциональную абстракцию в виде идемпотентного DAG (направленного ациклического графа). Функция как служба абстракции для выполнения задач с заданными интервалами.
Кластер с одним узлом Airflow
В одноузловом кластере Airflow все компоненты (рабочий, планировщик, веб-сервер) установлены на одном узле, известном как “Master нода”. Чтобы масштабировать кластер с одним узлом, Airflow
должен быть настроен в режиме LocalExecutor
. Worker берет (pull) задачу из очереди IPC (межпроцессное взаимодействие), это очень хорошо масштабируется до тех пор, пока ресурсы доступны на Master нода. Чтобы масштабировать Airflow на много нод, необходимо включить Celery Executor
.
Архитектура с одной нодой Airflow
Мультинодовый кластер Airflow
В мультинодовой архитектуре Airflow его демоны распределены по всем рабочим нодам. Поскольку веб-сервер и планировщик будут установлены на главной ноде, а рабочие будут установлены на каждом отдельном рабочей ноде, поэтому он может хорошо масштабироваться как по горизонтали, так и по вертикали. Чтобы использовать этот режим архитектуры, необходимо настроить Airflow с помощью CeleryExecutor
.
Серверную часть Celery необходимо настроить для включения режима CeleryExecutor
в архитектуре Airflow. Популярными фреймворками / приложениями для бэкэнда Celery являются Redis и RabbitMQ. RabbitMQ — это брокер сообщений. Его задача — управлять обменом данными между несколькими службами задач путем управления очередями сообщений. Вместо канала связи IPC, который был бы в архитектуре с одной нодой, RabbitMQ предоставляет модель механизма публикации — подписки для обмена сообщениями в разных очередях. Каждая очередь в RabbitMQ опубликована с событиями / сообщениями в виде команд задач, работники Celery будут извлекать команды задач из каждой очереди и выполнять их как действительно распределенные и параллельные способы. Что действительно может ускорить действительно мощное одновременное и параллельное выполнение задач в кластере.
Мультинодовая архитектура Airflow
Celery:
Celery — это асинхронная очередь задач, основанная на распределенной передаче сообщений. Он ориентирован на работу в реальном времени, но также поддерживает планирование. Airflow использует его для выполнения нескольких параллельных операций на уровне задач на нескольких рабочих узлах с использованием многопроцессорности и многозадачности. Мультинодовая архитектура Airflow позволяет масштабировать Airflow, легко добавляя новые воркеры.
Устновка мультинодового кластера Airflow и настройка Celery:
Примечание. Мы используем операционную систему CentOS 7 Linux.
- Установка RabbitMQ
yum install epel-release
yum install rabbitmq-server
- Включение и запуск RabbitMQ Server
systemctl enable rabbitmq-server.service
systemctl start rabbitmq-server.service
- Включение интерфейса веб-консоли управления RabbitMQ
rabbitmq-plugins enable rabbitmq_management
Номер порта сервера rabbitmq по умолчанию — 15672
, имя пользователя и пароль по умолчанию для веб-консоли управления — admin/admin
.
- Установка протокола транспорта
pyamqp
для RabbitMQ и адаптера PostGreSQL
pip install pyamqp
amqp://
— это псевдоним, который использует librabbitmq, если он доступен, или py-amqp
, если его нет.
Вы должны использовать pyamqp://
или librabbitmq://
, если хотите точно указать, какой протокол передачи данных использовать. Протокол pyamqp://
использует библиотеку amqp
(http://github.com/celery/py-amqp)
Установка адаптера PostGreSQL: psycopg2
Psycopg — это адаптер PostgreSQL для языка программирования Python.
pip install psycopg2
- Установка Airflow.
pip install 'apache-airflow[all]'
Проверьте версию airflow
airflow version
Мы используем версию Airflow v1.10.0, рекомендованную и стабильную в настоящее время.
- Инициализация базы данных
airflow initdb
После установки и настройки вам необходимо инициализировать базу данных, прежде чем вы сможете запустить группы обеспечения доступности баз данных и ее задачу. Поэтому последние изменения будут отражены в метаданных Airflow из конфигурации.
- Установка Celery
Celery должен быть установлен на главной ноде и на всех рабочих нодах.
pip install celery==4.3.0
Проверка версии Celery
celery --version
4.3.0 (rhubarb)
- Изменение файла airflow.cfg для Celery Executor.
executor = CeleryExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@{HOSTNAME}/airflow
broker_url= pyamqp://guest:guest@{RabbitMQ-HOSTNAME}:5672/
celery_result_backend = db+postgresql://airflow:airflow@{HOSTNAME}/airflow
dags_are_paused_at_creation = True
load_examples = False
После внесения этих изменений в файл конфигурации airflow.cfg
необходимо обновить метаданные airflow с помощью команды airflow initdb
, а затем перезапустить airflow
.
Теперь вы можете запустить веб-сервер airflow с помощью следующей команды
# default port is 8080
airflow webserver -p 8000
Вы можете запустить планировщик
# start the scheduler
airflow scheduler
Вы также должны запустить airflow на каждом рабочем узле.
airflow worker
Как только вы закончите запускать различные службы airflow, вы можете проверить фантастический интерфейс airflow при помощи команды:
http://<IP-ADDRESS/HOSTNAME>:8000
поскольку мы указали порт 8000 в нашей команде запуска службы веб-сервера, в противном случае номер порта по умолчанию — 8080.
Да! Мы закончили создание кластера с мультинодовый архитектурой Airflow. 🙂