Преодоление разрыва между тем, что я знал, и тем, что я хотел узнать. Это первая история, которую я решил написать о своем процессе изучения Apache Iceberg.
Мотивация
Некоторое время я интересовался Data Engineering. Хотя моя должность официально называется «инженер-программист», большая часть моей работы связана с написанием ETL, SQL-запросов, терраформирования и проверок DQ. И я не претендую на звание Data Engineer, но меня интересует эта область.
Одним из первых инструментов, которые я использовал, был Airflow, и сейчас я изучаю (Py)Spark.
Я слышал о терминах Data Lake, Data Mesh и Data Lakehouse, и поначалу эти термины были довольно запутанными. Я думаю, все хотят описать то, что они делают, как можно короче? Так или иначе, в какой-то момент я узнал об Apache Iceberg. И основная идея, которая меня заинтересовала, заключается в следующем:
Эй, у нас есть куча файлов, давайте рассматривать их как таблицы
Итак, я начал изучать Айсберг. Исходя из опыта разработки программного обеспечения, было много вопросов. Может быть, больше, чем должно было быть. В этом посте я попытаюсь представить эти вопросы и то, как я пришел к ответам.
Первый раз попал на https://iceberg.apache.org/
Когда вы попадаете на домашнюю страницу Apache Iceberg, первое, что вы видите, это:
Формат открытых таблиц для аналитических наборов данных.
Хорошо…. что? Если немного прокрутить вниз, то попадаешь в описание:
Iceberg — это высокопроизводительный формат для огромных аналитических таблиц. Iceberg привносит надежность и простоту таблиц SQL в большие данные, позволяя таким механизмам, как Spark, Trino, Flink, Presto, Hive и Impala, безопасно работать с одними и теми же таблицами в одно и то же время.
У меня, как у инженера-программиста, к этому моменту возникло еще больше вопросов. Что такое открытый формат таблицы? Хорошо, если мне нужен SQL с моими большими данными, не должен ли я просто использовать какую-нибудь базу данных, например PostgreSQL? В чем подвох? И что все эти инструменты перечислены?
Я начал с ввода что такое формат открытой таблицы в Google. И красивое объяснение дано здесь. По сути, вы хотите иметь в своем озере данных функции, подобные базе данных. Например, мы хотели бы иметь поддержку ACID. Итак, что такое озеро данных?
Озеро данных — это репозиторий для больших объемов данных, в каком бы формате вы их ни хранили — в структурированном, полуструктурированном или неструктурированном (например, в файлах). Итак, в основном вы хотите иметь возможность запрашивать ваши неструктурированные данные, как если бы они были загружены в базу данных.
Предпосылки
Итак, чтобы использовать Iceberg, нам нужно кое-что знать о вычислительном движке, который он использует. Поскольку Spark — один из наиболее популярных инструментов и движок, поддерживающий большинство функций, давайте использовать Spark. У меня есть некоторый опыт работы со Spark, когда я читал книгу (Анализ данных с помощью Python и PySpark) и пытался настроить автономный кластер Spark (здесь) и Hadoop Yarn (здесь).
Я знаю о Spark как о механизме анализа больших данных. По сути, если у вас есть большие объемы данных, которые необходимо проанализировать, Spark — проверенный и безопасный вариант. Но как Айсберг вписывается в картину со Спарком?
Если вы более осведомлены или нетерпеливы, чем я, вам, вероятно, следует запустить настройку докера, представленную в этом репозитории GitHub, и просто войти в нее. Однако в таких ситуациях я всегда вспоминаю цитату из статьи г-на Сорвы Notional Machines and Introductory Programming Education:
… учащимся необходимо понимание «на один уровень ниже» основного целевого уровня (программного кода), которое подходит для объяснения явлений на целевом уровне.
Имея это в виду, я решил попробовать настроить среду самостоятельно 😊.
Настройка среды
Я хотел бы разделить этот раздел на несколько этапов. Также вы можете найти код на GitHub здесь.
«Копирование» файлов докеров с GitHub
Я хотел попытаться включить то, что я узнаю, в существующие знания, которые у меня есть. Я хотел знать:
- как Iceberg вписывается в Spark? Это фреймворк поверх Spark? Что-то другое?
- может ли он работать удаленно из Spark? Как другой сервис, который может обмениваться данными через порт?
- как данные попадают в эти так называемые таблицы Iceberg? Что происходит тогда?
- что означало бы изменение данных в этих таблицах?
- Могу ли я использовать уже подготовленный автономный кластер?
Я начал с того, что зашел в репозиторий GitHub, о котором я упоминал, и просмотрел файлы docker compose и Dockerfile. Я решил скопировать части, которые мне (думаю) нужны, т.е.
FROM pyspark as spark-iceberg # Download iceberg spark runtime RUN curl https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.3_2.12/1.1.0/iceberg-spark-runtime-3.3_2.12-1.1.0.jar -Lo /opt/spark/jars/iceberg-spark-runtime-3.3_2.12-1.1.0.jar ENV TABULAR_VERSION=0.50.4 RUN curl https://tabular-repository-public.s3.amazonaws.com/releases/io/tabular/tabular-client-runtime/${TABULAR_VERSION}/tabular-client-runtime-${TABULAR_VERSION}.jar -Lo /opt/spark/jars/tabular-client-runtime-${TABULAR_VERSION}.jar # Download Java AWS SDK RUN curl https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.17.257/bundle-2.17.257.jar -Lo /opt/spark/jars/bundle-2.17.257.jar # Download URL connection client required for S3FileIO RUN curl https://repo1.maven.org/maven2/software/amazon/awssdk/url-connection-client/2.17.257/url-connection-client-2.17.257.jar -Lo /opt/spark/jars/url-connection-client-2.17.257.jar # Install AWS CLI RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" \ && unzip awscliv2.zip \ && sudo ./aws/install \ && rm awscliv2.zip \ && rm -rf aws/ # Add iceberg spark runtime jar to IJava classpath ENV IJAVA_CLASSPATH=/opt/spark/jars/* RUN mkdir -p /home/iceberg/localwarehouse /home/iceberg/notebooks /home/iceberg/warehouse /home/iceberg/spark-events /home/iceberg ARG jupyterlab_version=3.6.1 RUN apt-get update -y && \ apt-get install -y python3-pip python3-dev && \ pip3 install --upgrade pip && \ pip3 install wget jupyterlab==${jupyterlab_version} # Add a notebook command RUN echo '#! /bin/sh' >> /bin/notebook \ && echo 'export PYSPARK_DRIVER_PYTHON=jupyter' >> /bin/notebook \ && echo "export PYSPARK_DRIVER_PYTHON_OPTS=\"lab --notebook-dir=/home/iceberg/notebooks --ip='0.0.0.0' --NotebookApp.token='' --port=8888 --no-browser --allow-root\"" >> /bin/notebook \ && echo 'pyspark' >> /bin/notebook \ && chmod u+x /bin/notebook ENTRYPOINT ["./entrypoint.sh"] CMD ["notebook"]
Что касается референтного репозитория, я решил использовать лабораторию jupyter вместо ноутбука jupyter. Так же убрал загрузку файлов и некоторые другие команды. Файл spark-defaults.conf
выглядел так:
spark.master spark://spark-iceberg:7077 spark.eventLog.enabled true spark.eventLog.dir /opt/spark/spark-events spark.history.fs.logDirectory /opt/spark/spark-events spark.sql.catalogImplementation in-memory spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
Очевидно, я не настроил все, что указано в документах. Я просто добавил расширение айсберга. И это вместе со следующей строкой из документов Quickstart:
spark-sql —packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.1.0
сказал мне, что Iceberg на самом деле является пакетом для Spark. Итак, вернемся к вопросам:
- как Iceberg вписывается в Spark? Это фреймворк поверх Spark? Что-то другое? — это пакет
- может ли он работать удаленно из Spark? Как еще один сервис, который может обмениваться данными через порт? — Не думаю, что может.
Файл компоновки докера на этом этапе:
version: '3.8' services: spark-iceberg: image: spark-iceberg container_name: spark-iceberg build: ./spark entrypoint: [ './entrypoint.sh', 'master' ] volumes: - ./warehouse:/home/iceberg/warehouse - ./notebooks:/home/iceberg/notebooks - ./data:/opt/spark/data - ./spark_apps:/opt/spark/apps - spark-logs:/opt/spark/spark-events ports: - '8888:8888' - '8080:8080' - '10000:10000' - '10001:10001' spark-worker: container_name: spark-worker image: spark-iceberg entrypoint: [ './entrypoint.sh', 'worker' ] depends_on: - spark-iceberg env_file: - spark/.env volumes: - ./data:/opt/spark/data - ./spark_apps:/opt/spark/apps - spark-logs:/opt/spark/spark-events ports: - '8081:8081' spark-history-server: container_name: spark-history image: spark-iceberg entrypoint: [ './entrypoint.sh', 'history' ] depends_on: - spark-iceberg env_file: - spark/.env volumes: - spark-logs:/opt/spark/spark-events ports: - '18080:18080' volumes: spark-logs:
Запуск первого примера
Создайте образы докеров с помощью docker compose build
и запустите с помощью docker compose up
. И перейдите к https://localhost:8888
, и я приготовил небольшой блокнот, чтобы попытаться сохранить некоторые данные в виде таблицы. Вот соответствующие части приложения:
data = [("James","","Smith","36636","M",3000), ("Michael","Rose","","40288","M",4000), ("Robert","","Williams","42114","M",4000), ("Maria","Anne","Jones","39192","F",4000), ("Jen","Mary","Brown","","F",-1) ] schema = StructType([ \ StructField("firstname",StringType(),True), \ StructField("middlename",StringType(),True), \ StructField("lastname",StringType(),True), \ StructField("id", StringType(), True), \ StructField("gender", StringType(), True), \ StructField("salary", IntegerType(), True) \ ]) df = spark.createDataFrame(data=data, schema=schema) df.writeTo("db.test").create()
Предлагаемый подход к написанию таблиц в документации Iceberg заключается в использовании V2 DataFrame API. Который я использую в приведенном выше примере. После выполнения этой строки df.writeTo("db.test").create()
я получил следующий вывод:
23/03/05 17:23:04 ПРЕДУПРЕЖДЕНИЕ ResolveSessionCatalog: будет создана таблица Hive serde, так как не указан поставщик таблиц. Вы можете установить для spark.sql.legacy.createHiveTableByDefault значение false, чтобы вместо этого была создана собственная таблица источника данных.
…
AnalysisException: поддержка Hive требуется для CREATE Hive TABLE (AS SELECT);
'CreateTable `db`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, ErrorIfExists
+- LogicalRDD [имя#0, отчество#1, фамилия#2, id#3, пол № 4, зарплата № 5], ложь
У меня нет опыта работы с Hive. Тааак... Я немного погуглил про Hive и пришел к выводу, что пока не хочу его изучать. По сути, это уровень хранения поверх Hadoop с собственным языком запросов [3]. Я узнал, что в spark 3.0.0-preview2 spark.sql.legacy.createHiveTableByDefault
было установлено на false
, о чем свидетельствует документация здесь. Однако, когда я запустил искровую оболочку внутри докера и проверил значение конфигурации, оказалось, что true
:
>>> spark.conf.get("spark.sql.legacy.createHiveTableByDefault") 'true'
Похоже, что значение по умолчанию изменилось в какой-то момент. Поэтому Spark пытается создать таблицу Hive, но Hive нам не нужен, и углубляться в него было бы не в рамках этой истории.
В конце этой части я создал коммит git под названием: Medium — setting up environment
.
Настройка каталога
Итак, предыдущее не сработало… сюрприз-сюрприз. Следующим моим побуждением было попытаться настроить каталог, который будет использовать Iceberg. Из документации здесь Iceberg поддерживает каталог на основе каталогов. Это казалось достаточно простым. Итак, я изменил свой spark-defaults.conf
:
spark.master spark://spark-iceberg:7077 spark.eventLog.enabled true spark.eventLog.dir /opt/spark/spark-events spark.history.fs.logDirectory /opt/spark/spark-events spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions spark.sql.catalog.data org.apache.iceberg.spark.SparkCatalog spark.sql.catalog.data.type hadoop spark.sql.catalog.data.warehouse /home/iceberg/warehouse spark.sql.defaultCatalog data spark.sql.catalogImplementation in-memory
Я добавил новый каталог с именем data
, который должен быть SparkCatalog типа hadoop
, что должно позволять хранить данные в файловой системе. Я также настроил каталог для хранения данных и установил каталог по умолчанию data
.
Когда мы теперь запускаем нашу записную книжку, в основном последнюю ячейку, кажется, что все было успешно завершено:
Том /home/iceberg/warehouse
сопоставляется с местоположением ./warehouse
в docker compose, поэтому, когда мы смотрим на этот каталог, мы видим это:
Итак, метаданные сохраняются, но данные, похоже, отсутствуют. Если мы завершим работающее ядро или приложение и запустим другой блокнот: iceberg-read-data
, мы увидим, что получаем данные:
Теперь, как это может быть? Если мы войдем в контейнер рабочего докера и выполним следующее:
root@e80874902d50:/opt/spark# ls -al /home/iceberg/warehouse/db/test/data/ total 40 drwxr-xr-x 2 root root 4096 Mar 7 20:33 . drwxr-xr-x 3 root root 4096 Mar 7 20:33 .. -rw-r--r-- 1 root root 24 Mar 7 20:33 .00000-0-8eff3bba-21dc-46e8-b20e-58b81b1640ea-00001.parquet.crc -rw-r--r-- 1 root root 24 Mar 7 20:33 .00001-1-7e33edcb-f5e4-486f-9f9b-3f811375aa2f-00001.parquet.crc -rw-r--r-- 1 root root 24 Mar 7 20:33 .00002-2-de082f5c-147b-49a4-b316-6fcfa8acbd92-00001.parquet.crc -rw-r--r-- 1 root root 24 Mar 7 20:33 .00003-3-9511efc6-1658-4a99-8d01-72b9184025df-00001.parquet.crc -rw-r--r-- 1 root root 1662 Mar 7 20:33 00000-0-8eff3bba-21dc-46e8-b20e-58b81b1640ea-00001.parquet -rw-r--r-- 1 root root 1670 Mar 7 20:33 00001-1-7e33edcb-f5e4-486f-9f9b-3f811375aa2f-00001.parquet -rw-r--r-- 1 root root 1691 Mar 7 20:33 00002-2-de082f5c-147b-49a4-b316-6fcfa8acbd92-00001.parquet -rw-r--r-- 1 root root 1705 Mar 7 20:33 00003-3-9511efc6-1658-4a99-8d01-72b9184025df-00001.parquet
мы видим, что файлы находятся в рабочем контейнере. Хорошо, что я забыл сделать на этом этапе, так это сопоставить каталог хранилища рабочего процесса с тем же каталогом хранилища, который использует главный узел. Если мы снова попробуем первый блокнот (с небольшим изменением использования createOrReplace
вместо create)
, мы увидим, что данные находятся в структуре каталогов:
Для этой части я создал коммит: Medium — configuring catalog
. Если вы редактор, который может прочитать .parquet
файлов, вы можете открыть один из файлов данных и увидеть что-то вроде этого:
Дополнительное примечание
Пока я работал над этой историей, я запустил пример с GitHub вместе со своей настройкой. Я заметил, что сеанс искры ноутбука Jupyter потребляет все ресурсы драйвера. Это означает, что вы не можете отправлять другие задания или другие записные книжки во время работы. И я спрашивал себя: «Почему это было?».
Ну а при настройке на спарке автономный режим кластера с рабочими контейнерами, как уже было сказано, драйвер автоматически жрет все ресурсы. Для мастера задан URL-адрес: spark://spark-iceberg:7077
.Существует способ запустить блокнот в локальном режиме. Таким образом драйвер не будет потреблять все ресурсы, но вам может понадобиться запустить рабочий процесс в главном контейнере. Блокнот не будет использовать рабочие контейнеры.
Вы можете сделать это, изменив строку в Dockerfile на следующую:
# Add a notebook command RUN echo '#! /bin/sh' >> /bin/notebook \ && echo 'export PYSPARK_DRIVER_PYTHON=jupyter' >> /bin/notebook \ && echo "export PYSPARK_DRIVER_PYTHON_OPTS=\"lab --notebook-dir=/home/iceberg/notebooks --ip='0.0.0.0' --NotebookApp.token='' --port=8888 --no-browser --allow-root\"" >> /bin/notebook \ && echo 'pyspark --master local[*]' >> /bin/notebook \ && chmod u+x /bin/notebook
Это запустит искру локально с «столько рабочих потоков, сколько есть логических ядер» при запуске ноутбука [5]. Приложение не будет отображаться в пользовательском интерфейсе мастера искры.
Краткое содержание
Эта история началась как попытка объяснить, какие проблемы у меня были с пониманием того, что такое Apache Iceberg на самом деле.
Apache Iceberg — это пакет, который можно установить вместе со Spark, чтобы обеспечить открытый формат таблиц для озер данных. Это позволяет нам иметь поддержку ACID среди других функций, которые могут вам понадобиться в озере данных. Подробнее читайте здесь.
В этой истории мы настроили каталог данных и использовали Apache Iceberg с минимальной конфигурацией для хранения данных в локальной файловой системе.
Я не ответил на все вопросы, которые у меня были. Тем не менее, я планирую продолжить играть с Apache Iceberg, чтобы шаг за шагом отвечать на них.
Код находится на GitHub.
Рекомендации
- https://github.com/tabular-io/docker-spark-iceberg
- https://iceberg.apache.org/docs/latest/spark-configuration/#catalogs
- https://jaceklaskowski.medium.com/why-is-spark-sql-so-obsessed-with-hive-after-just-a-single-day-with-hive-289e75fa6f2b
- https://spark.apache.org/docs/3.0.0-preview2/sql-migration-guide.html#upgrading-from-spark-sql-24-to-30
- https://stackoverflow.com/questions/32356143/what-does-setmaster-local-mean-in-spark