Предположим, вы хотите создать систему, которая периодически запускается и выполняет в ней какие-то задачи. Теперь это может быть как очень простой процесс очистки данных, так и очень сложный, например, извлечение, преобразование и загрузка данных для обычного аналитического отчета или переобучение модели машинного обучения.
Таким образом, мы можем использовать некоторые ручные методы для выполнения всех видов операций, что занимает много времени, или мы можем автоматизировать наш процесс и планировать задачи с помощью Airflow.
Так что же такое воздушный поток?
Airflow — это проект управления данными с открытым исходным кодом, написанный на Python и созданный в Airbnb в 2014 году для обработки большого количества данных в режиме реального времени и планирования управления их рабочими процессами, а также после того, как Airflow будет поддерживаться Apache. Airflow позволяет легко автоматизировать процесс с помощью концепции DAG, которую мы рассмотрим далее в этой статье.
Airflow позволяет нам писать сложные автоматические расписания процессов, а также имеет богатый веб-интерфейс для визуализации и мониторинга процесса.
Основные концепции воздушного потока
DAG (направленный ациклический граф):
DAG — это самая важная концепция Airflow. Группы DAG позволяют нам упростить процесс, упорядочивая задачи и управляя ими. связаны и как они зависят друг от друга, но концепция DAG гласит, что «в графе не должно быть цикла, что означает, что граф должен быть ациклическим и все ребра должны быть направлены».
DAG собирает задачи и упорядочивает их с помощью зависимостей, чтобы указать, как они должны выполняться. Даги касаются только того, как они должны работать, и определяют серию выполнения.
Рассмотрим эти два примера:
У нас есть четыре узла (A, B, C и D), которые подключены в определенном порядке, в котором они должны выполняться. Он также определяет свойство Dags иметь ациклический граф. Таким образом, мы можем сказать, что это действительный DAG.
Здесь у нас также есть четыре узла (A, B, C и D), которые соединены в определенном порядке, но не содержат свойства наличия ацикличности, поэтому это недопустимый Dag.
Таким образом, DAG в airflow — это скрипт на Python, который содержит набор задач, которые необходимо выполнить, с их зависимостями.
Задания
Задача — это основная единица выполнения в Airflow. Задачи объединяются в DAG, а затем между ними устанавливаются восходящие и нисходящие зависимости, чтобы выразить порядок, в котором они должны выполняться.
Жизненный цикл задачи
no_status:- планировщик создал пустой экземпляр задачи
поставлен в очередь: планировщик отправил задачу исполнителю для выполнения в очереди
running:- рабочий взял задачу и теперь выполняет ее
shutdown: - выполнение задачи было остановлено
запланировано: - экземпляр задачи, определенный планировщиком, должен быть запущен
upstream_failed: — ошибка восходящей задачи задачи
up_for_retry :- перезапустить задачу
up_for_reschedule:- перепланировать задачу каждый определенный интервал времени
успех: - задача выполнена успешно
Оператор
Это классы, которые инкапсулируют логику для выполнения единицы работы. В Airflow у нас есть много типов операторов, таких как BashOperator, PythonOperator и EmailOperator. Мы также можем определить собственный пользовательский оператор с помощью CustomizedOperator.
Теперь давайте разберемся, как даг, задача и оператор работают вместе с этой простой диаграммой.
Мы можем сказать, что DAG состоит из TASK, а TASK состоит из оператора, которые вместе образуют рабочий процесс, который необходимо выполнить.
Установка воздушного потока:
Поэтому для установки Airflow рекомендуется создать виртуальную среду, иначе мы можем получить некоторые ошибки.
Шаг 1. Создание виртуальной среды
python3 -m venv venv
Шаг 2. Установка Airflow
pip install ‘apache-airflow==2.3.2’ \ — constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.3.2/constraints-3.8.txt"
Шаг 3. Экспортируйте воздушный поток домой (САМОЕ ВАЖНОЕ)
Нам нужно экспортировать airflow домой в наш текущий каталог, чтобы запустить dags. Для этого в терминальном типе
export AIRFLOW_HOME=.
Шаг 4. Инициализируйте базу данных воздушного потока
airflow db init
Шаг 5. Создайте пользователя для входа
airflow users create — username admin — firstname XXXX — lastname XXX — role Admin — email [email protected]
Шаг 6. Войдите в веб-интерфейс
В терминале введите "airflow webserver -p 8080" для запуска сервера, а во втором терминале введите "airflow scheduler" для запуска планировщика.
Напишите свой первый DAG
В этом мы просто увидим, как мы можем начать работу с DAG и как мы можем написать нашу первую DAG, используя очень простые «операторы bash».
Поехали!!
Мы собираемся создать простой dag, который печатает какое-то сообщение в наших журналах воздушного потока, используя операторы bash.
Для создания DAG необходимо, чтобы они находились внутри папки с именем «dags».
Внутри дага папка создает ваш файл «python» для записи вашего первого дага.
# Importing packages from airflow import DAG from datetime import datetime, timedelta from airflow.operators.bash import BashOperator # for working with Bash commands
Каждая DAG должна начинаться с
With DAG( // dag_id, // args, …. // schedule_interval ) as dag: # define operators E.X (python, bash, GCP. and all)
Определение аргументов DAG и оператора Bash
default_args = { “owner”: “iamhimanshu0”, “retries”: 5, “retry_delay”: timedelta(minutes=2) } with DAG( dag_id=”First_Dag”, # dag_id default_args=default_args, # passing arguments description=”This is first dag”, # description of the DAG start_date=datetime(2022, 7, 2, 2), # DAG starting time schedule_interval=”@daily” # DAG schedule interval ) as dag: # initalize the bash operator task1 = BashOperator( task_id=”first_task”, bash_command=”echo hello world, this is first task” # execute the bash command )
Полный код операторов Bash
Теперь пришло время запустить нашу первую группу обеспечения доступности баз данных, перейти к веб-интерфейсу и обновить ее, и вы должны увидеть свою первую группу доступности базы данных.
Теперь нажмите на First Dag, вы должны увидеть что-то вроде этого пользовательского интерфейса.
Теперь нам просто нужно щелкнуть значок запуска и дождаться, пока наша задача не перейдет в состояние успеха. Как только она будет успешной, вы должны увидеть, что цвет вашей первой_задачи изменился на зеленый, что означает, что задача выполнена успешно.
Теперь пришло время увидеть вывод нашей команды Bash, поэтому для этого нам нужно нажать на нашу задачу и перейти в «журнал».
Нажмите на журнал и прокрутите вниз, вы должны увидеть свой вывод dag.
Это все, что нужно для создания вашей первой DAG с помощью BashOperator. В следующей статье мы узнаем о том, как мы можем создавать расширенные даги с помощью операторов Python и как мы можем создать полную систему ETL.
Заключение:
В этой статье мы узнали, что такое Apache Airflow, Dag и где мы можем его использовать, а также создали простой Bash DAG. Основные выводы из статьи:
- Что такое воздушный поток Apache?
- Что такое DAG и как его использовать?
- Как написать свой первый даг.
На этом пока все. Надеюсь, вы узнали что-то новое из этой статьи, увидимся в следующей статье.
Посетите мой канал на Youtube, чтобы узнать о проектах, связанных с машинным обучением, искусственным интеллектом, НЛП и т. д. -› Youtube
На этом пока все, увидимся в следующей статье.
давайте подключаться в Linkedin, Twitter, Instagram, Github и Facebook.
Спасибо, что прочитали!
Ссылки:
Я создаю все изображения в Figma