Я довольно много работал с конвейерами Apache Airflow, чтобы автоматизировать кластеры ElasticMapReduce (EMR) на Amazon Web Services (AWS) для инженерных потоков данных, так что это первая из серии публикаций, в которых освещаются некоторые неожиданные ситуации. проблемы, с которыми я столкнулся при использовании этого инструмента, а также некоторые обходные пути, которые могут оказаться полезными, если вы столкнетесь с аналогичными проблемами.
Итак, во-первых, что именно я пытаюсь сделать?
Я работал над небольшим личным проектом, который будет использовать набор разрозненных данных (данные о прибытии в США за год, разбитые на отдельные ежемесячные файлы и сохраненные в формате .sas7bdata, демографические данные городов США, коды аэропортов и штат данные о температуре в обычном старом формате .csv), отправьте их в кластер EMR для агрегирования и обработки, а затем создайте и заполните простую звездную схему в Amazon Redshift.
Я использую SPARK_STEPS api в моем даге Airflow, чтобы:
- Создайте кластер EMR master-slave + том hdfs для хранения.
- Скопируйте входящие данные из корзины s3 на том hdfs, подключенный к кластеру.
- Прочтите данные SAS из тома hdfs, сложите все вместе и обработайте все до чертиков.
- Запустите тесты качества данных и выключите EMR в случае успеха.
- Скопируйте результаты из hdfs обратно в s3.
- Переместите агрегированные данные из S3 в Redshift
Основная проблема заключалась в том, что у меня был набор данных SAS, состоящий из 12 отдельных файлов, которые мне нужно было прочитать и добавить вместе в единый фрейм данных Spark для обработки, и я не хотел называть каждый из файлов по отдельности.
Скорее, я хотел, чтобы мой код просто брал что-нибудь из моего каталога данных, читал его, а затем объединял воедино.
В моей тестовой настройке (которая хранила данные SAS на локальном томе и поэтому пропустил шаг 2 выше) я просто использовал listdir для предоставления объекта списка, содержащего все файлы данных, которые мне нужно было прочитать.
Однако, когда я переместил свои входящие данные SAS на S3 (а оттуда в hdfs как часть dag), мой конвейер начал давать сбой.
SSH-подключение к кластеру и проверка журналов выявили следующую ошибку.
Traceback (most recent call last): File "/mnt/tmp/spark-75af9646-c214-4e29-b6ae-25bdc7e4c744/process_i94_fact_data.py", line 259, in <module> i94_datafile_list = get_files(I94_PATH) File "/mnt/tmp/spark-75af9646-c214-4e29-b6ae-25bdc7e4c744/process_i94_fact_data.py", line 54, in get_files for item in listdir(path): FileNotFoundError: [Errno 2] No such file or directory: 'hdfs:///user/hadoop/i94/18-83510-I94-Data-2016/'
Что озадачило:
hdfs dfs -ls hdfs:///user/hadoop/i94/18-83510-I94-Data-2016
Показал, что файлы были там, где они должны быть на томе. Вытащив то, что осталось от моих волос за пару дней, я наконец понял, что проблема в том, что том hdfs, созданный с помощью кластера, не был виден библиотеке операционной системы.
Одним из решений этого может быть загрузка модуля hdfs, например hdfs или pyarrow (и на самом деле Вес Маккинни дает отличное описание каждого из многочисленных вариантов здесь), но для загрузки библиотеки мне потребовалось бы разработать сценарий оболочки начальной загрузки EMR для загрузки библиотеки и сохранения ее в ведре S3, что казалось излишним для такого относительно скромного варианта использования. (К тому же у меня никогда не было этих надоедливых сценариев начальной загрузки).
Поковырявшись в обмене стеками и сети, я наконец наткнулся на решения, которые использовали подпроцессы для запуска команд уровня ОС. Я перепробовал почти все из них, но ни один из них, похоже, не работал у меня полностью успешно. В конце концов, я использовал их как трамплин, чтобы разработать что-то, что сработало бы для моего варианта использования:
Основная хитрость, чтобы заставить эту работу работать, заключалась прежде всего в установке флага -C в команде:
args = "/usr/bin/hdfs dfs -ls -C {}".format(path)
Поскольку это означало, что ответ подпроцесса возвращал только имена объектов в указанном пути (разделенные символами новой строки), что избавляло от необходимости форматирования / обработки пост-ответа. Вторая тонкость заключалась в том, что ответ подпроцесса имел тип byte, если я не установил следующий флаг в команде subprocess.run:
universal_newlines=True
Как только эти две вещи были решены, сниппет возвратил объект списка, содержащий полностью определенные элементы имени файла, которые я мог затем обработать дальше.
Основным недостатком этого подхода является то, что для его работы требовалось:
shell=True
в предложении subprocess.run для запуска, что является потенциальной угрозой безопасности. В настоящее время я не нашел способа обойти это, но когда найду, я опубликую продолжение.
Полное репозиторий git хранится здесь, а процедура get_files_hdfs является частью файла pyspark_steps / process_i94_fact_data.py.