Моделирование данных, Python, DAG, форматы файлов больших данных, затраты… Он охватывает все

Это реальный сценарий, когда мне поручили создать масштабируемый конвейер машинного обучения с необработанными данными о событиях, отправляемыми из мобильного приложения.

История предлагает набор передовых методов, которые могут быть полезны для подготовки к интервью.

Узнайте, как работать с необработанными данными, преобразовывать их, обогащать для подготовки к машинному обучению, экспортировать их в озеро данных и архивировать необработанные данные, когда они больше не нужны.

Все, что описано в этой статье, предполагает, что у вас есть учетная запись Google Cloud Platform (GCP) и вы знакомы с основными понятиями Python и хранилищами данных.

Если нет, не волнуйтесь. Я постараюсь объяснить это подробно.

Конвейер данных

Конвейеры данных не всегда просты, и я уже писал об этом ранее.



Вот как это работает в реальной жизни.

Давайте представим, что у нас есть огромное количество необработанных данных о событиях (больших данных), поступающих из нашего мобильного приложения. Само приложение создано для IOS и Android, и мы подключаем его к Google Firebase (Google Analytics 4) для сбора данных о взаимодействии с пользователями.

Теперь мы хотим использовать этот набор данных, чтобы активировать поведение пользователей или прогнозы, т. е. отток пользователей, персоналии, уведомления и т. д. В какой-то момент мне пришлось выгрузить около 150 ТБ данных в архив облачного хранилища, чтобы оптимизировать затраты на хранилище BigQuery. Мы также хотели бы преобразовать необработанные данные о событиях и создать набор данных для конвейера машинного обучения (ML).

Данные будут передаваться, как описано ниже:

  • Мобильное приложение отправляет данные о событиях в Firebase
  • Firebase выводит данные в набор данных BigQuery
  • Мы проверяем качество и преобразуем данные с помощью SQL, чтобы создать новый набор данных для машинного обучения.
  • Мы экспортируем набор данных ML в корзину Cloud Storage (стандартный класс)
  • Архивируем необработанные данные о событиях в Cloud Storage (тип архива)

Иногда конвейеры данных немного нетрадиционны, как в этом случае. Типичный конвейер данных начинается в озере данных.

Озера данных дешевле в эксплуатации по сравнению с решениями для хранилищ данных.

Однако этот конкретный конвейер начинается с экспорта событий Firebase в BigQuery. Это естественная интеграция данных, существующая в экосистеме GCP. Никаких знаний в области кодирования не требуется, и мы можем подключить его без проблем.

Зачем экспортировать данные в архив или облачное хранилище?

До определенного момента мы никогда не думали об экспорте данных из хранилища данных как таковом. Хранилище там уже оптимизировано и через 90 дней все таблицы и партиции переходят в класс хранения near-line, который на 50% дешевле стандартного. strong> (или активное хранилище в терминах BigQuery).

Однако эти `events_` шаблонные таблицы очень тяжелые, и через пару лет это может привести к созданию огромного набора данных с петабайтами данных.

Ниже приведен пример долгосрочных затрат на хранение BigQuery. Несмотря на то, что класс хранилища был изменен через 90 дней, потенциал для оптимизации затрат сохраняется:

Зачем экспортировать данные для машинного обучения?

Мы хотели бы обучить специальную модель машинного обучения с помощью Spark/PySpark. Конечно, вы можете справедливо отметить, что у BigQuery есть собственные встроенные возможности машинного обучения.

Однако этого может быть недостаточно, если мы используем модель ценообразования с фиксированной ставкой, а нашему приложению для обучения модели требуется больше вычислительной мощности.

В этом случае нам понадобится что-то, что хорошо масштабируется и может работать с данными озера данных. В идеале он должен быть разделен и иметь определенный макет разделения, например Hive. О том, как добавить разметку Hive, я писал в этой статье:



Шаг 1. Создайте набор данных ML из данных событий Firebase/Google Analytics 4.

Мы можем использовать общедоступные данные Firebase из `firebase-public-project`.

Например, у Google есть образец набора данных для мобильного игрового приложения под названием Flood It! (Android, iOS) и вы можете найти его здесь: https://console.cloud.google.com/bigquery?p=firebase-public-project&d=analytics_153293282&t=events_20181003&page=table&_ga=2.124992394.-1293267939.1657258995

Этот набор данных содержит 5,7 млн ​​событий от более чем 15 000 пользователей. Откройте ссылку выше и нажмите Предварительный просмотр. Запустить Preview на любой таблице ничего не будет стоить:

Он выглядит довольно просто и содержит всего 9,7 МБ данных. У нас есть пользователи в приложении с идентификаторами их устройств (`user_pseudo_id`), а также параметры событий из данных о вовлечении пользователей.

Одним из основных требований к данным машинного обучения будет наличие этого набора данных, доступного извне в озере данных, разделенного на `date` и `event_name_category`.

В этом случае команде специалистов по обработке и анализу данных не нужно будет загружать все необработанные данные о событиях для обучения модели, и они смогут извлекать только необходимые типы событий.

Мы хотели бы создать новую таблицу, используя оператор DML и необходимые преобразования. Мы будем использовать необработанные данные только один раз для создания этого набора данных. После этого необработанные данные отправляются в архив, а данные машинного обучения передаются в посадочную область машинного обучения в Cloud Storage.

Во время этой операции мы собираемся `unnest` нужных нам `event_params` и `user_properties`. Давайте рассмотрим этот пример только для одного дня данных и только для двух событий, то есть `event_name in ('use_extra_steps', 'completed_5_levels')`. Столбец разделения должен быть полем верхнего уровня.

К сожалению, мы не можем использовать конечное поле из RECORD ( STRUCT ) в качестве столбца разделения, т.е. `partition by (dt, event_category)` не будет работать.

Однако мы работаем только с данными за один день (таблица с подстановочными знаками), поэтому мы можем просто разбить их на `event_category` и экспортировать в хранилище, т.е.

gs://firebase-events-export/public-project/dt=2018-10-02/category=1/partitionKey

После этого мы можем создать собственный скрипт для перебора категорий и экспорта данных в озеро данных with `category=1/partitionKey`.

Вот пример скрипта для этого. Не стесняйтесь добавлять любые вложенные параметры из событий и т. д. Можно запланировать ежедневное выполнение только один раз и сэкономить много денег:

create table if not exists `your-project.analytics.ml_data_20181003` (
 
  dt                DATE 
, event_timestamp   TIMESTAMP
, user_id           STRING
, user_pseudo_id    STRING
, platform          STRING
, language          STRING
, country           STRING
, event_name        STRING
, use_extra_steps_virtual_currency_name        STRING
, plays_quickplay       STRING
, event_category        INT64
)
partition by range_bucket(event_category, generate_array(0, 10, 1))
cluster by user_id, user_pseudo_id
;

INSERT `your-project.analytics.ml_data_20181003`

with 
event_category as (
   select 
      1 as event_category
      ,'use_extra_steps' as event_name
   union all 

   select
      2 as event_category
      ,'completed`event_name_category`levels' as event_name
)

,data as (
SELECT
   PARSE_DATE('%Y%m%d', event_date) as dt
 , timestamp_micros(event_timestamp) as event_timestamp
 , user_id            
 , user_pseudo_id     
 , platform           
 , device.language    
 , geo.country        
 , event_name         
 , IF(user_properties.key = 'plays_quickplay', user_properties.value.string_value, NULL)                      as plays_quickplay
 , IF(event_params.key = 'virtual_currency_name', event_params.value.string_value, NULL)                      as use_extra_steps_virtual_currency_name


FROM `firebase-public-project.analytics_153293282.events_*`
     , UNNEST(event_params)    AS event_params
     , UNNEST(user_properties) AS user_properties
WHERE
   _TABLE_SUFFIX >= '20181003'
and _TABLE_SUFFIX <= '20181003'
and event_name in ('use_extra_steps', 'completed`event_name_category`levels')
)
select d.*  ,e.event_category
from data d
join event_category e on e.event_name = d.event_name
order by
    user_pseudo_id
    , event_name
    , event_timestamp
;

select * from `your-project.analytics.ml_data_20181003` where event_category = 1
;
select * from `your-project.analytics.ml_data_20181003` where event_category = 2
;

В результатах запроса вы увидите, что мы можем использовать `event_category` в качестве раздела, чтобы избежать полного сканирования таблицы в будущем.

Мы обработали необработанные данные только один раз и теперь можем создать внешнее секционированное ведро озера данных с макетом раздела Hive.

Мы можем перебрать каждую таблицу с подстановочными знаками и каждый раздел `event_category`, чтобы экспортировать данные, если это необходимо.

Мы знаем, что некоторые операции позволяют добавлять к идентификатору таблицы декоратор раздела, например sample_table$20190123. Итак, в нашем случае это будет:

bq head --max_rows=10 'your-project:analytics.ml_data_20181002$1'

Мы можем использовать его для экспорта данных в озеро данных с разделом `category`, т. е. gs://firebase-events-archive-avro/dt=2018–10–03/category=1/partitionKey/events_*.avro.

Я объясню, как это сделать в следующем шаге.

Что такое макет секционирования Hive?

Это просто способ форматирования имен объектов в озере данных.

Если позже мы решим использовать внешние секционированные данные, мы захотим сохранить их в облачном хранилище, используя макет секционирования Hive по умолчанию.

В этом случае мы можем создавать внешне секционированные таблицы в файлах Avro, CSV, JSON, ORC и Parquet и

использовать озеро данных в качестве исходного слоя для инструментов Hadoop и EMR.

Пример:

gs://events-export-avro/public-project/avro_external_test/dt=2018-10-01/lang=en/partitionKey
gs://events-export-avro/public-project/avro_external_test/dt=2018-10-02/lang=fr/partitionKey

Как выбрать правильный формат файла больших данных?

Avro, CSV, JSON, ORC или паркет?

Может быть трудно определить, какой формат будет лучше другого, потому что каждый предлагает преимущества и разные формы сжатия.

Когда нам нужна лучшая степень сжатия, нам больше подойдет ORC или Parquet. На самом деле это зависит от того, какой инструмент мы собираемся использовать для выполнения аналитических запросов к нашим данным. ORC лучше оптимизирован для рабочих нагрузок среды HIVE и Pig, тогда как Parquet является форматом файла по умолчанию для Искра.

Ранее я писал об этом здесь:



Когда все поля должны быть доступны, хранилище на основе строк делает AVRO лучшим вариантом. Он оказался очень быстрым с запросами с интенсивным записью и имеет расширенную поддержку эволюции схемы. Поэтому это может быть лучшим выбором для зоны приземления и загрузки данных.

Шаг 2. Экспорт данных в облачное хранилище

Во многих современных решениях для хранилищ данных есть функция экспорта данных в хранилище с помощью SQL. Итак, теоретически мы могли бы сделать что-то подобное, используя BigQuery и общедоступный проект Firebase:

EXPORT DATA
  OPTIONS (
    uri = 'gs://firebase-events-export/public-project/dt=2018-10-01/partitionKey/*.json',
    format = 'JSON', 
    overwrite = true
)
AS (
SELECT *
FROM `firebase-public-project.analytics_153293282.events_20181001`
);

Параметр `uri` определяет схему выходного хранилища, т. е. `uri = 'gs://firebase-events-export/public-project/dt=2018–10–01/partitionKey/*.avro'`

Однако есть одна вещь, которую следует учитывать... Когда мы используем SQL и `SELECT * …`, он выполнит полное сканирование этой таблицы.

Так что «экспорт» в этом случае не совсем бесплатный.

На самом деле это распространенное заблуждение, т. е. в документации BigQuery это бесплатно, но нам придется платить за запрос, который мы используем в операции экспорта данных.

Как бесплатно экспортировать данные из BigQuery

Мы можем использовать общий пул для бесплатного экспорта данных из набора данных BigQuery в облачное хранилище. Давайте напишем код на Python. Ссылку на разделяемый пул и экспорт данных я положу внизу этой статьи.

Мы хотели бы создать простой микросервис, который работает в направленном ациклическом графе (DAG), и, возможно, запланировать экспорт данных через 60 дней.

Что такое ДАГ?

За этим термином стоит множество умных математических терминов, но в инженерии данных мы имеем в виду конвейер данных, в котором некоторые действия вызываются некоторыми событиями или результатами других действий.

Папка нашего приложения будет выглядеть так:

.
├── stack
    └── bq_extractor
        ├── app.py
        ├── bq_extractor_env
        ├── event.json
        └── requirements.txt

Давайте создадим виртуальную среду со всеми необходимыми библиотеками, которые мы собираемся использовать. В нашем `requirements.txt` будут установлены следующие библиотеки Python:

google-auth==2.15.0
google-cloud-bigquery==3.4.0
requests==2.28.1
pyyaml==6.0
python-lambda-local==0.1.13

Давайте установим их.

cd stack
cd bq_extractor
virtualenv bq_extractor_env
source bq_extractor_env/bin/activate
pip install -r requirements.txt

Теперь давайте создадим наш микросервис. Я быстро набросал этот фрагмент ниже для этой статьи. Не стесняйтесь изменять код в соответствии с вашими потребностями. Он будет использовать библиотеки `google` для аутентификации клиента BigQuery и запуска задания `export`.

# https://googleapis.dev/python/bigquery/latest/index.html
import json
import requests
from datetime import datetime, date, timedelta
from google.api_core import retry
from google.cloud import bigquery
from google.oauth2 import service_account

# Test your service locally by ruunning
# python-lambda-local -f lambda_handler -t 10 app.py event.json
# It should be able to do a request
response = requests.get('https://api.github.com')
print(response)

# Paste your JSON service account credentials here:
service_acount_str = { "type": "service_account", "project_id": "your-project", "private_key_id": "", "private_key": "-----BEGIN PRIVATE KEY----...\n-----END PRIVATE KEY-----\n", "client_email": "[email protected]", "client_id": "123", "auth_uri": "https://accounts.google.com/o/oauth2/auth", "token_uri": "https://oauth2.googleapis.com/token", "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/bigquery-adminsdk%40client.iam.gserviceaccount.com" }


credentials = service_account.Credentials.from_service_account_info(service_acount_str)
# ? https://googleapis.dev/python/google-api-core/latest/auth.html#overview
print(credentials.project_id)

# Simple function to check connectivity:
def bigquery_hello(txt):
    client = bigquery.Client(credentials=credentials, project=credentials.project_id)
    QUERY = ('SELECT "{} nice to meet you";'.format(txt))
    query_job = client.query(QUERY)  # API request
    rows = query_job.result()  # Waits for query to finish
    greet = list(rows)[0][0]
    return greet

# Main helper function
def export_table_to_storage(table_name, bucket_partition):
    # Connect to BigQuery to run jobs programmatically
    client = bigquery.Client(credentials=credentials, project=credentials.project_id)

    # Public project source and test staging buucket
    project = 'firebase-public-project'
    dataset_id = 'analytics_153293282'
    bucket_name = 'firebase-events-archive-avro'

    destination_uri = "gs://{}/{}/partitionKey/events_*.avro".format(bucket_name, bucket_partition)
    dataset_ref = bigquery.DatasetReference(project, dataset_id)
    table_ref = dataset_ref.table(table_name)
    job_config = bigquery.job.ExtractJobConfig()
    # job_config.compression = bigquery.Compression.GZIP
    # https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.ExtractJobConfig
    job_config.destination_format = bigquery.DestinationFormat.AVRO
    job_config.compression = bigquery.Compression.SNAPPY

    extract_job = client.extract_table(
        table_ref,
        destination_uri,
        # Location must match that of the source table.
        location="US",
        job_config=job_config,
    )  # API request
    # extract_job.result()  # Waits for job to complete. Calling client.extract_table starts the job. No need to wait to finish
    print("Export table to {}".format(destination_uri))

def lambda_handler(event, context):
    print(event)

    start_date = date(2018,9,1)
    end_date = date(2018,9,3)
    
    dates= [start_date+timedelta(days=x) for x in range((end_date-start_date).days)]
    for dt in dates:
        table_name = dt.strftime('events_%Y%m%d')
        partition_name = dt.strftime('dt=%Y-%m-%d')
        export_table_to_storage(table_name, partition_name)

    bigquery_message = bigquery_hello('it is ')

    message = 'Hello {} {}, {}!'.format(event['first_name'], event['last_name'], bigquery_message)  
    return { 
        'message' : message
    }

Наш сервис будет подключаться к BigQuery для программного запуска заданий, включая `extract`.

Давайте сначала создадим нашу корзину облачного хранилища. Мы можем сделать это с помощью веб-консоли или с помощью инструментов командной строки. Если у нас установлен `gsutil`, запустите это в командной строке:

gsutil mb -c archive -l US-CENTRAL1 -p your-project-name gs://firebase-events-archive-avro

Теперь давайте запустим наш микросервис.

Когда app.py будет готов, вы можете протестировать его локально:

# Test your service locally by running in command line
python-lambda-local -f lambda_handler -t 10 app.py event.json

Давайте перечислим наше ведро, чтобы увидеть, есть ли там данные:

gsutil ls gs://firebase-events-archive-avro/

Используйте этот скрипт bash, чтобы получить все размеры каталогов в ведре

Например, мы можем захотеть проверить, действительно ли операция экспорта работает.

gsutil ls -l gs://firebase-events-archive-avro/ | xargs -I{} gsutil du -sh  {}

Как добавить дополнительный раздел ведра с макетом Hive?

Если нам нужно экспортировать данные с дополнительным `category` ключом раздела, мы можем использовать что-то вроде этого:

    category_number = 1
    for dt in dates:
        table_name = dt.strftime('ml_data_%Y%m%d$1')
        partition_name = dt.strftime('dt=%Y-%m-%d')
        category = "category={}".format(category_number)
        export_table_to_storage(table_name, partition_name, category)

Мы можем просто перебрать все даты и все категории, чтобы создать вывод озера данных следующим образом:

`gs://firebase-events-archive-avro/dt=2018–10-03/category=1/partitionKey/events_*.avro`

Результат будет:

Export table to gs://firebase-events-archive-avro/dt=2018-10-03/category=1/partitionKey/events_*.avro
[root - INFO - 2023-02-01 15:30:22,716] END RequestId: ac865a0b-05dd-4ef3-9b5f-9bfe7f0ce5b7
[root - INFO - 2023-02-01 15:30:22,717] REPORT RequestId: ac865a0b-05dd-4ef3-9b5f-9bfe7f0ce5b7  Duration: 2284.26 ms

Давайте проверим числа на случай, если мы решим загрузить его обратно:

select count(*) 
from `your-project.analytics.ml_data_20181003` 
where event_category = 1
;

LOAD DATA INTO your-project.source.ml_data_20181003_1
 FROM FILES(
   format='AVRO',
   uris = ['gs://firebase-events-archive-avro/dt=2018-10-03/category=1/*']
 )
;

select count(*) 
from your-project.source.ml_data_20181003_1
;

Заключение

Надеюсь, эта история будет для вас полезной. Это реальный сценарий обработки данных, в котором нам нужно подготовить необработанные данные о событиях и передать их в службу машинного обучения дальше по конвейеру. Моделирование данных — один из важнейших навыков инженера данных. В этой статье рассказывается, как мы применяем его для оптимизации схем наборов данных, разделов и хранилища, когда данные больше не нужны.

Если есть способ сделать это бесплатно, то почему бы и нет?

Мы создали простой микросервис с AWS Lambda для экспорта данных, но мы можем сделать с ним гораздо больше. Мы можем подключить его к шлюзу API, создать еще один веб-сервис для управления конвейерами (например, DataHub), использовать другие события в качестве триггеров и т. д.

После необходимых преобразований DML в наших данных о событиях они сохраняются в озере данных, где другие службы ML могут получить к ним доступ и обрабатывать их более эффективным и масштабируемым способом для обучения моделей машинного обучения.

Репозиторий

https://github.com/mshakhomirov/bigquery_extractor

Рекомендуем прочитать

  1. https://cloud.google.com/bigquery/docs/managing-partitioned-table-data
  2. https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client
  3. https://cloud.google.com/bigquery/docs/external-data-cloud-storage
  4. https://cloud.google.com/bigquery/docs/hive-partitioned-queries
  5. https://cloud.google.com/bigquery/docs/exporting-data
  6. https://cloud.google.com/bigquery/quotas#export_jobs