Преодоление разрыва между тем, что я знал, и тем, что я хотел узнать. Это первая история, которую я решил написать о своем процессе изучения Apache Iceberg.

Мотивация

Некоторое время я интересовался Data Engineering. Хотя моя должность официально называется «инженер-программист», большая часть моей работы связана с написанием ETL, SQL-запросов, терраформирования и проверок DQ. И я не претендую на звание Data Engineer, но меня интересует эта область.

Одним из первых инструментов, которые я использовал, был Airflow, и сейчас я изучаю (Py)Spark.

Я слышал о терминах Data Lake, Data Mesh и Data Lakehouse, и поначалу эти термины были довольно запутанными. Я думаю, все хотят описать то, что они делают, как можно короче? Так или иначе, в какой-то момент я узнал об Apache Iceberg. И основная идея, которая меня заинтересовала, заключается в следующем:

Эй, у нас есть куча файлов, давайте рассматривать их как таблицы

Итак, я начал изучать Айсберг. Исходя из опыта разработки программного обеспечения, было много вопросов. Может быть, больше, чем должно было быть. В этом посте я попытаюсь представить эти вопросы и то, как я пришел к ответам.

Первый раз попал на https://iceberg.apache.org/

Когда вы попадаете на домашнюю страницу Apache Iceberg, первое, что вы видите, это:

Формат открытых таблиц для аналитических наборов данных.

Хорошо…. что? Если немного прокрутить вниз, то попадаешь в описание:

Iceberg — это высокопроизводительный формат для огромных аналитических таблиц. Iceberg привносит надежность и простоту таблиц SQL в большие данные, позволяя таким механизмам, как Spark, Trino, Flink, Presto, Hive и Impala, безопасно работать с одними и теми же таблицами в одно и то же время.

У меня, как у инженера-программиста, к этому моменту возникло еще больше вопросов. Что такое открытый формат таблицы? Хорошо, если мне нужен SQL с моими большими данными, не должен ли я просто использовать какую-нибудь базу данных, например PostgreSQL? В чем подвох? И что все эти инструменты перечислены?

Я начал с ввода что такое формат открытой таблицы в Google. И красивое объяснение дано здесь. По сути, вы хотите иметь в своем озере данных функции, подобные базе данных. Например, мы хотели бы иметь поддержку ACID. Итак, что такое озеро данных?

Озеро данных — это репозиторий для больших объемов данных, в каком бы формате вы их ни хранили — в структурированном, полуструктурированном или неструктурированном (например, в файлах). Итак, в основном вы хотите иметь возможность запрашивать ваши неструктурированные данные, как если бы они были загружены в базу данных.

Предпосылки

Итак, чтобы использовать Iceberg, нам нужно кое-что знать о вычислительном движке, который он использует. Поскольку Spark — один из наиболее популярных инструментов и движок, поддерживающий большинство функций, давайте использовать Spark. У меня есть некоторый опыт работы со Spark, когда я читал книгу (Анализ данных с помощью Python и PySpark) и пытался настроить автономный кластер Spark (здесь) и Hadoop Yarn (здесь).

Я знаю о Spark как о механизме анализа больших данных. По сути, если у вас есть большие объемы данных, которые необходимо проанализировать, Spark — проверенный и безопасный вариант. Но как Айсберг вписывается в картину со Спарком?

Если вы более осведомлены или нетерпеливы, чем я, вам, вероятно, следует запустить настройку докера, представленную в этом репозитории GitHub, и просто войти в нее. Однако в таких ситуациях я всегда вспоминаю цитату из статьи г-на Сорвы Notional Machines and Introductory Programming Education:

… учащимся необходимо понимание «на один уровень ниже» основного целевого уровня (программного кода), которое подходит для объяснения явлений на целевом уровне.

Имея это в виду, я решил попробовать настроить среду самостоятельно 😊.

Настройка среды

Я хотел бы разделить этот раздел на несколько этапов. Также вы можете найти код на GitHub здесь.

«Копирование» файлов докеров с GitHub

Я хотел попытаться включить то, что я узнаю, в существующие знания, которые у меня есть. Я хотел знать:

  1. как Iceberg вписывается в Spark? Это фреймворк поверх Spark? Что-то другое?
  2. может ли он работать удаленно из Spark? Как другой сервис, который может обмениваться данными через порт?
  3. как данные попадают в эти так называемые таблицы Iceberg? Что происходит тогда?
  4. что означало бы изменение данных в этих таблицах?
  5. Могу ли я использовать уже подготовленный автономный кластер?

Я начал с того, что зашел в репозиторий GitHub, о котором я упоминал, и просмотрел файлы docker compose и Dockerfile. Я решил скопировать части, которые мне (думаю) нужны, т.е.

FROM pyspark as spark-iceberg

# Download iceberg spark runtime
RUN curl https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.3_2.12/1.1.0/iceberg-spark-runtime-3.3_2.12-1.1.0.jar -Lo /opt/spark/jars/iceberg-spark-runtime-3.3_2.12-1.1.0.jar

ENV TABULAR_VERSION=0.50.4

RUN curl https://tabular-repository-public.s3.amazonaws.com/releases/io/tabular/tabular-client-runtime/${TABULAR_VERSION}/tabular-client-runtime-${TABULAR_VERSION}.jar -Lo /opt/spark/jars/tabular-client-runtime-${TABULAR_VERSION}.jar

# Download Java AWS SDK
RUN curl https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.17.257/bundle-2.17.257.jar -Lo /opt/spark/jars/bundle-2.17.257.jar

# Download URL connection client required for S3FileIO
RUN curl https://repo1.maven.org/maven2/software/amazon/awssdk/url-connection-client/2.17.257/url-connection-client-2.17.257.jar -Lo /opt/spark/jars/url-connection-client-2.17.257.jar

# Install AWS CLI
RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" \
 && unzip awscliv2.zip \
 && sudo ./aws/install \
 && rm awscliv2.zip \
 && rm -rf aws/

# Add iceberg spark runtime jar to IJava classpath
ENV IJAVA_CLASSPATH=/opt/spark/jars/*

RUN mkdir -p /home/iceberg/localwarehouse /home/iceberg/notebooks /home/iceberg/warehouse /home/iceberg/spark-events /home/iceberg

ARG jupyterlab_version=3.6.1

RUN apt-get update -y && \
    apt-get install -y python3-pip python3-dev && \
    pip3 install --upgrade pip && \
    pip3 install wget jupyterlab==${jupyterlab_version}

# Add a notebook command
RUN echo '#! /bin/sh' >> /bin/notebook \
 && echo 'export PYSPARK_DRIVER_PYTHON=jupyter' >> /bin/notebook \
 && echo "export PYSPARK_DRIVER_PYTHON_OPTS=\"lab --notebook-dir=/home/iceberg/notebooks --ip='0.0.0.0' --NotebookApp.token='' --port=8888 --no-browser --allow-root\"" >> /bin/notebook \
 && echo 'pyspark' >> /bin/notebook \
 && chmod u+x /bin/notebook


ENTRYPOINT ["./entrypoint.sh"]
CMD ["notebook"]

Что касается референтного репозитория, я решил использовать лабораторию jupyter вместо ноутбука jupyter. Так же убрал загрузку файлов и некоторые другие команды. Файл spark-defaults.conf выглядел так:

spark.master                           spark://spark-iceberg:7077
spark.eventLog.enabled                 true
spark.eventLog.dir                     /opt/spark/spark-events
spark.history.fs.logDirectory          /opt/spark/spark-events
spark.sql.catalogImplementation        in-memory
spark.sql.extensions                   org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

Очевидно, я не настроил все, что указано в документах. Я просто добавил расширение айсберга. И это вместе со следующей строкой из документов Quickstart:

spark-sql —packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.1.0

сказал мне, что Iceberg на самом деле является пакетом для Spark. Итак, вернемся к вопросам:

  1. как Iceberg вписывается в Spark? Это фреймворк поверх Spark? Что-то другое? — это пакет
  2. может ли он работать удаленно из Spark? Как еще один сервис, который может обмениваться данными через порт? — Не думаю, что может.

Файл компоновки докера на этом этапе:

version: '3.8'

services:
  spark-iceberg:
    image: spark-iceberg
    container_name: spark-iceberg
    build: ./spark
    entrypoint: [ './entrypoint.sh', 'master' ]
    volumes:
      - ./warehouse:/home/iceberg/warehouse
      - ./notebooks:/home/iceberg/notebooks
      - ./data:/opt/spark/data
      - ./spark_apps:/opt/spark/apps
      - spark-logs:/opt/spark/spark-events
    ports:
      - '8888:8888'
      - '8080:8080'
      - '10000:10000'
      - '10001:10001'


  spark-worker:
    container_name: spark-worker
    image: spark-iceberg
    entrypoint: [ './entrypoint.sh', 'worker' ]
    depends_on:
      - spark-iceberg
    env_file:
      - spark/.env
    volumes:
      - ./data:/opt/spark/data
      - ./spark_apps:/opt/spark/apps
      - spark-logs:/opt/spark/spark-events
    ports:
      - '8081:8081'

  spark-history-server:
    container_name: spark-history
    image: spark-iceberg
    entrypoint: [ './entrypoint.sh', 'history' ]
    depends_on:
      - spark-iceberg
    env_file:
      - spark/.env
    volumes:
      - spark-logs:/opt/spark/spark-events
    ports:
      - '18080:18080'


volumes:
  spark-logs:

Запуск первого примера

Создайте образы докеров с помощью docker compose build и запустите с помощью docker compose up. И перейдите к https://localhost:8888, и я приготовил небольшой блокнот, чтобы попытаться сохранить некоторые данные в виде таблицы. Вот соответствующие части приложения:

data = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

schema = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("middlename",StringType(),True), \
    StructField("lastname",StringType(),True), \
    StructField("id", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("salary", IntegerType(), True) \
  ])

df = spark.createDataFrame(data=data, schema=schema)
df.writeTo("db.test").create()

Предлагаемый подход к написанию таблиц в документации Iceberg заключается в использовании V2 DataFrame API. Который я использую в приведенном выше примере. После выполнения этой строки df.writeTo("db.test").create() я получил следующий вывод:

23/03/05 17:23:04 ПРЕДУПРЕЖДЕНИЕ ResolveSessionCatalog: будет создана таблица Hive serde, так как не указан поставщик таблиц. Вы можете установить для spark.sql.legacy.createHiveTableByDefault значение false, чтобы вместо этого была создана собственная таблица источника данных.

AnalysisException: поддержка Hive требуется для CREATE Hive TABLE (AS SELECT);
'CreateTable `db`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, ErrorIfExists
+- LogicalRDD [имя#0, отчество#1, фамилия#2, id#3, пол № 4, зарплата № 5], ложь

У меня нет опыта работы с Hive. Тааак... Я немного погуглил про Hive и пришел к выводу, что пока не хочу его изучать. По сути, это уровень хранения поверх Hadoop с собственным языком запросов [3]. Я узнал, что в spark 3.0.0-preview2 spark.sql.legacy.createHiveTableByDefault было установлено на false, о чем свидетельствует документация здесь. Однако, когда я запустил искровую оболочку внутри докера и проверил значение конфигурации, оказалось, что true:

>>> spark.conf.get("spark.sql.legacy.createHiveTableByDefault")
'true'

Похоже, что значение по умолчанию изменилось в какой-то момент. Поэтому Spark пытается создать таблицу Hive, но Hive нам не нужен, и углубляться в него было бы не в рамках этой истории.

В конце этой части я создал коммит git под названием: Medium — setting up environment.

Настройка каталога

Итак, предыдущее не сработало… сюрприз-сюрприз. Следующим моим побуждением было попытаться настроить каталог, который будет использовать Iceberg. Из документации здесь Iceberg поддерживает каталог на основе каталогов. Это казалось достаточно простым. Итак, я изменил свой spark-defaults.conf:

spark.master                           spark://spark-iceberg:7077
spark.eventLog.enabled                 true
spark.eventLog.dir                     /opt/spark/spark-events
spark.history.fs.logDirectory          /opt/spark/spark-events
spark.sql.extensions                   org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.catalog.data                 org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.data.type            hadoop
spark.sql.catalog.data.warehouse       /home/iceberg/warehouse
spark.sql.defaultCatalog               data
spark.sql.catalogImplementation        in-memory

Я добавил новый каталог с именем data, который должен быть SparkCatalog типа hadoop, что должно позволять хранить данные в файловой системе. Я также настроил каталог для хранения данных и установил каталог по умолчанию data.

Когда мы теперь запускаем нашу записную книжку, в основном последнюю ячейку, кажется, что все было успешно завершено:

Том /home/iceberg/warehouse сопоставляется с местоположением ./warehouse в docker compose, поэтому, когда мы смотрим на этот каталог, мы видим это:

Итак, метаданные сохраняются, но данные, похоже, отсутствуют. Если мы завершим работающее ядро ​​или приложение и запустим другой блокнот: iceberg-read-data, мы увидим, что получаем данные:

Теперь, как это может быть? Если мы войдем в контейнер рабочего докера и выполним следующее:

root@e80874902d50:/opt/spark# ls -al /home/iceberg/warehouse/db/test/data/
total 40
drwxr-xr-x 2 root root 4096 Mar  7 20:33 .
drwxr-xr-x 3 root root 4096 Mar  7 20:33 ..
-rw-r--r-- 1 root root   24 Mar  7 20:33 .00000-0-8eff3bba-21dc-46e8-b20e-58b81b1640ea-00001.parquet.crc
-rw-r--r-- 1 root root   24 Mar  7 20:33 .00001-1-7e33edcb-f5e4-486f-9f9b-3f811375aa2f-00001.parquet.crc
-rw-r--r-- 1 root root   24 Mar  7 20:33 .00002-2-de082f5c-147b-49a4-b316-6fcfa8acbd92-00001.parquet.crc
-rw-r--r-- 1 root root   24 Mar  7 20:33 .00003-3-9511efc6-1658-4a99-8d01-72b9184025df-00001.parquet.crc
-rw-r--r-- 1 root root 1662 Mar  7 20:33 00000-0-8eff3bba-21dc-46e8-b20e-58b81b1640ea-00001.parquet
-rw-r--r-- 1 root root 1670 Mar  7 20:33 00001-1-7e33edcb-f5e4-486f-9f9b-3f811375aa2f-00001.parquet
-rw-r--r-- 1 root root 1691 Mar  7 20:33 00002-2-de082f5c-147b-49a4-b316-6fcfa8acbd92-00001.parquet
-rw-r--r-- 1 root root 1705 Mar  7 20:33 00003-3-9511efc6-1658-4a99-8d01-72b9184025df-00001.parquet

мы видим, что файлы находятся в рабочем контейнере. Хорошо, что я забыл сделать на этом этапе, так это сопоставить каталог хранилища рабочего процесса с тем же каталогом хранилища, который использует главный узел. Если мы снова попробуем первый блокнот (с небольшим изменением использования createOrReplace вместо create), мы увидим, что данные находятся в структуре каталогов:

Для этой части я создал коммит: Medium — configuring catalog. Если вы редактор, который может прочитать .parquet файлов, вы можете открыть один из файлов данных и увидеть что-то вроде этого:

Дополнительное примечание

Пока я работал над этой историей, я запустил пример с GitHub вместе со своей настройкой. Я заметил, что сеанс искры ноутбука Jupyter потребляет все ресурсы драйвера. Это означает, что вы не можете отправлять другие задания или другие записные книжки во время работы. И я спрашивал себя: «Почему это было?».

Ну а при настройке на спарке автономный режим кластера с рабочими контейнерами, как уже было сказано, драйвер автоматически жрет все ресурсы. Для мастера задан URL-адрес: spark://spark-iceberg:7077.Существует способ запустить блокнот в локальном режиме. Таким образом драйвер не будет потреблять все ресурсы, но вам может понадобиться запустить рабочий процесс в главном контейнере. Блокнот не будет использовать рабочие контейнеры.

Вы можете сделать это, изменив строку в Dockerfile на следующую:

# Add a notebook command
RUN echo '#! /bin/sh' >> /bin/notebook \
 && echo 'export PYSPARK_DRIVER_PYTHON=jupyter' >> /bin/notebook \
 && echo "export PYSPARK_DRIVER_PYTHON_OPTS=\"lab --notebook-dir=/home/iceberg/notebooks --ip='0.0.0.0' --NotebookApp.token='' --port=8888 --no-browser --allow-root\"" >> /bin/notebook \
 && echo 'pyspark --master local[*]' >> /bin/notebook \
 && chmod u+x /bin/notebook

Это запустит искру локально с «столько рабочих потоков, сколько есть логических ядер» при запуске ноутбука [5]. Приложение не будет отображаться в пользовательском интерфейсе мастера искры.

Краткое содержание

Эта история началась как попытка объяснить, какие проблемы у меня были с пониманием того, что такое Apache Iceberg на самом деле.

Apache Iceberg — это пакет, который можно установить вместе со Spark, чтобы обеспечить открытый формат таблиц для озер данных. Это позволяет нам иметь поддержку ACID среди других функций, которые могут вам понадобиться в озере данных. Подробнее читайте здесь.

В этой истории мы настроили каталог данных и использовали Apache Iceberg с минимальной конфигурацией для хранения данных в локальной файловой системе.

Я не ответил на все вопросы, которые у меня были. Тем не менее, я планирую продолжить играть с Apache Iceberg, чтобы шаг за шагом отвечать на них.

Код находится на GitHub.

Рекомендации

  1. https://github.com/tabular-io/docker-spark-iceberg
  2. https://iceberg.apache.org/docs/latest/spark-configuration/#catalogs
  3. https://jaceklaskowski.medium.com/why-is-spark-sql-so-obsessed-with-hive-after-just-a-single-day-with-hive-289e75fa6f2b
  4. https://spark.apache.org/docs/3.0.0-preview2/sql-migration-guide.html#upgrading-from-spark-sql-24-to-30
  5. https://stackoverflow.com/questions/32356143/what-does-setmaster-local-mean-in-spark