Моделирование данных, Python, DAG, форматы файлов больших данных, затраты… Он охватывает все
Это реальный сценарий, когда мне поручили создать масштабируемый конвейер машинного обучения с необработанными данными о событиях, отправляемыми из мобильного приложения.
История предлагает набор передовых методов, которые могут быть полезны для подготовки к интервью.
Узнайте, как работать с необработанными данными, преобразовывать их, обогащать для подготовки к машинному обучению, экспортировать их в озеро данных и архивировать необработанные данные, когда они больше не нужны.
Все, что описано в этой статье, предполагает, что у вас есть учетная запись Google Cloud Platform (GCP) и вы знакомы с основными понятиями Python и хранилищами данных.
Если нет, не волнуйтесь. Я постараюсь объяснить это подробно.
Конвейер данных
Конвейеры данных не всегда просты, и я уже писал об этом ранее.
Вот как это работает в реальной жизни.
Давайте представим, что у нас есть огромное количество необработанных данных о событиях (больших данных), поступающих из нашего мобильного приложения. Само приложение создано для IOS и Android, и мы подключаем его к Google Firebase (Google Analytics 4) для сбора данных о взаимодействии с пользователями.
Теперь мы хотим использовать этот набор данных, чтобы активировать поведение пользователей или прогнозы, т. е. отток пользователей, персоналии, уведомления и т. д. В какой-то момент мне пришлось выгрузить около 150 ТБ данных в архив облачного хранилища, чтобы оптимизировать затраты на хранилище BigQuery. Мы также хотели бы преобразовать необработанные данные о событиях и создать набор данных для конвейера машинного обучения (ML).
Данные будут передаваться, как описано ниже:
- Мобильное приложение отправляет данные о событиях в Firebase
- Firebase выводит данные в набор данных BigQuery
- Мы проверяем качество и преобразуем данные с помощью SQL, чтобы создать новый набор данных для машинного обучения.
- Мы экспортируем набор данных ML в корзину Cloud Storage (стандартный класс)
- Архивируем необработанные данные о событиях в Cloud Storage (тип архива)
Иногда конвейеры данных немного нетрадиционны, как в этом случае. Типичный конвейер данных начинается в озере данных.
Озера данных дешевле в эксплуатации по сравнению с решениями для хранилищ данных.
Однако этот конкретный конвейер начинается с экспорта событий Firebase в BigQuery. Это естественная интеграция данных, существующая в экосистеме GCP. Никаких знаний в области кодирования не требуется, и мы можем подключить его без проблем.
Зачем экспортировать данные в архив или облачное хранилище?
До определенного момента мы никогда не думали об экспорте данных из хранилища данных как таковом. Хранилище там уже оптимизировано и через 90 дней все таблицы и партиции переходят в класс хранения near-line, который на 50% дешевле стандартного. strong> (или активное хранилище в терминах BigQuery).
Однако эти `events_`
шаблонные таблицы очень тяжелые, и через пару лет это может привести к созданию огромного набора данных с петабайтами данных.
Ниже приведен пример долгосрочных затрат на хранение BigQuery. Несмотря на то, что класс хранилища был изменен через 90 дней, потенциал для оптимизации затрат сохраняется:
Зачем экспортировать данные для машинного обучения?
Мы хотели бы обучить специальную модель машинного обучения с помощью Spark/PySpark. Конечно, вы можете справедливо отметить, что у BigQuery есть собственные встроенные возможности машинного обучения.
Однако этого может быть недостаточно, если мы используем модель ценообразования с фиксированной ставкой, а нашему приложению для обучения модели требуется больше вычислительной мощности.
В этом случае нам понадобится что-то, что хорошо масштабируется и может работать с данными озера данных. В идеале он должен быть разделен и иметь определенный макет разделения, например Hive. О том, как добавить разметку Hive, я писал в этой статье:
Шаг 1. Создайте набор данных ML из данных событий Firebase/Google Analytics 4.
Мы можем использовать общедоступные данные Firebase из `firebase-public-project`
.
Например, у Google есть образец набора данных для мобильного игрового приложения под названием Flood It! (Android, iOS) и вы можете найти его здесь: https://console.cloud.google.com/bigquery?p=firebase-public-project&d=analytics_153293282&t=events_20181003&page=table&_ga=2.124992394.-1293267939.1657258995
Этот набор данных содержит 5,7 млн событий от более чем 15 000 пользователей. Откройте ссылку выше и нажмите Предварительный просмотр. Запустить Preview на любой таблице ничего не будет стоить:
Он выглядит довольно просто и содержит всего 9,7 МБ данных. У нас есть пользователи в приложении с идентификаторами их устройств (`user_pseudo_id`
), а также параметры событий из данных о вовлечении пользователей.
Одним из основных требований к данным машинного обучения будет наличие этого набора данных, доступного извне в озере данных, разделенного на
`date`
и`event_name_category`
.
В этом случае команде специалистов по обработке и анализу данных не нужно будет загружать все необработанные данные о событиях для обучения модели, и они смогут извлекать только необходимые типы событий.
Мы хотели бы создать новую таблицу, используя оператор DML и необходимые преобразования. Мы будем использовать необработанные данные только один раз для создания этого набора данных. После этого необработанные данные отправляются в архив, а данные машинного обучения передаются в посадочную область машинного обучения в Cloud Storage.
Во время этой операции мы собираемся `unnest`
нужных нам `event_params`
и `user_properties`
. Давайте рассмотрим этот пример только для одного дня данных и только для двух событий, то есть `event_name in ('use_extra_steps', 'completed_5_levels')`
. Столбец разделения должен быть полем верхнего уровня.
К сожалению, мы не можем использовать конечное поле из RECORD ( STRUCT ) в качестве столбца разделения, т.е.
`partition by (dt, event_category)`
не будет работать.
Однако мы работаем только с данными за один день (таблица с подстановочными знаками), поэтому мы можем просто разбить их на `event_category`
и экспортировать в хранилище, т.е.
gs://firebase-events-export/public-project/dt=2018-10-02/category=1/partitionKey
После этого мы можем создать собственный скрипт для перебора категорий и экспорта данных в озеро данных with `category=1/partitionKey`
.
Вот пример скрипта для этого. Не стесняйтесь добавлять любые вложенные параметры из событий и т. д. Можно запланировать ежедневное выполнение только один раз и сэкономить много денег:
create table if not exists `your-project.analytics.ml_data_20181003` ( dt DATE , event_timestamp TIMESTAMP , user_id STRING , user_pseudo_id STRING , platform STRING , language STRING , country STRING , event_name STRING , use_extra_steps_virtual_currency_name STRING , plays_quickplay STRING , event_category INT64 ) partition by range_bucket(event_category, generate_array(0, 10, 1)) cluster by user_id, user_pseudo_id ; INSERT `your-project.analytics.ml_data_20181003` with event_category as ( select 1 as event_category ,'use_extra_steps' as event_name union all select 2 as event_category ,'completed`event_name_category`
levels' as event_name ) ,data as ( SELECT PARSE_DATE('%Y%m%d', event_date) as dt , timestamp_micros(event_timestamp) as event_timestamp , user_id , user_pseudo_id , platform , device.language , geo.country , event_name , IF(user_properties.key = 'plays_quickplay', user_properties.value.string_value, NULL) as plays_quickplay , IF(event_params.key = 'virtual_currency_name', event_params.value.string_value, NULL) as use_extra_steps_virtual_currency_name FROM `firebase-public-project.analytics_153293282.events_*` , UNNEST(event_params) AS event_params , UNNEST(user_properties) AS user_properties WHERE _TABLE_SUFFIX >= '20181003' and _TABLE_SUFFIX <= '20181003' and event_name in ('use_extra_steps', 'completed`event_name_category`
levels') ) select d.* ,e.event_category from data d join event_category e on e.event_name = d.event_name order by user_pseudo_id , event_name , event_timestamp ; select * from `your-project.analytics.ml_data_20181003` where event_category = 1 ; select * from `your-project.analytics.ml_data_20181003` where event_category = 2 ;
В результатах запроса вы увидите, что мы можем использовать `event_category`
в качестве раздела, чтобы избежать полного сканирования таблицы в будущем.
Мы обработали необработанные данные только один раз и теперь можем создать внешнее секционированное ведро озера данных с макетом раздела Hive.
Мы можем перебрать каждую таблицу с подстановочными знаками и каждый раздел `event_category`
, чтобы экспортировать данные, если это необходимо.
Мы знаем, что некоторые операции позволяют добавлять к идентификатору таблицы декоратор раздела, например sample_table$20190123
. Итак, в нашем случае это будет:
bq head --max_rows=10 'your-project:analytics.ml_data_20181002$1'
Мы можем использовать его для экспорта данных в озеро данных с разделом `category`
, т. е. gs://firebase-events-archive-avro/dt=2018–10–03/category=1/partitionKey/events_*.avro
.
Я объясню, как это сделать в следующем шаге.
Что такое макет секционирования Hive?
Это просто способ форматирования имен объектов в озере данных.
Если позже мы решим использовать внешние секционированные данные, мы захотим сохранить их в облачном хранилище, используя макет секционирования Hive по умолчанию.
В этом случае мы можем создавать внешне секционированные таблицы в файлах Avro, CSV, JSON, ORC и Parquet и
использовать озеро данных в качестве исходного слоя для инструментов Hadoop и EMR.
Пример:
gs://events-export-avro/public-project/avro_external_test/dt=2018-10-01/lang=en/partitionKey gs://events-export-avro/public-project/avro_external_test/dt=2018-10-02/lang=fr/partitionKey
Как выбрать правильный формат файла больших данных?
Avro, CSV, JSON, ORC или паркет?
Может быть трудно определить, какой формат будет лучше другого, потому что каждый предлагает преимущества и разные формы сжатия.
Когда нам нужна лучшая степень сжатия, нам больше подойдет ORC или Parquet. На самом деле это зависит от того, какой инструмент мы собираемся использовать для выполнения аналитических запросов к нашим данным. ORC лучше оптимизирован для рабочих нагрузок среды HIVE и Pig, тогда как Parquet является форматом файла по умолчанию для Искра.
Ранее я писал об этом здесь:
Когда все поля должны быть доступны, хранилище на основе строк делает AVRO лучшим вариантом. Он оказался очень быстрым с запросами с интенсивным записью и имеет расширенную поддержку эволюции схемы. Поэтому это может быть лучшим выбором для зоны приземления и загрузки данных.
Шаг 2. Экспорт данных в облачное хранилище
Во многих современных решениях для хранилищ данных есть функция экспорта данных в хранилище с помощью SQL. Итак, теоретически мы могли бы сделать что-то подобное, используя BigQuery и общедоступный проект Firebase:
EXPORT DATA OPTIONS ( uri = 'gs://firebase-events-export/public-project/dt=2018-10-01/partitionKey/*.json', format = 'JSON', overwrite = true ) AS ( SELECT * FROM `firebase-public-project.analytics_153293282.events_20181001` );
Параметр `uri`
определяет схему выходного хранилища, т. е. `uri = 'gs://firebase-events-export/public-project/dt=2018–10–01/partitionKey/*.avro'`
Однако есть одна вещь, которую следует учитывать... Когда мы используем SQL и `SELECT * …`
, он выполнит полное сканирование этой таблицы.
Так что «экспорт» в этом случае не совсем бесплатный.
На самом деле это распространенное заблуждение, т. е. в документации BigQuery это бесплатно, но нам придется платить за запрос, который мы используем в операции экспорта данных.
Как бесплатно экспортировать данные из BigQuery
Мы можем использовать общий пул для бесплатного экспорта данных из набора данных BigQuery в облачное хранилище. Давайте напишем код на Python. Ссылку на разделяемый пул и экспорт данных я положу внизу этой статьи.
Мы хотели бы создать простой микросервис, который работает в направленном ациклическом графе (DAG), и, возможно, запланировать экспорт данных через 60 дней.
Что такое ДАГ?
За этим термином стоит множество умных математических терминов, но в инженерии данных мы имеем в виду конвейер данных, в котором некоторые действия вызываются некоторыми событиями или результатами других действий.
Папка нашего приложения будет выглядеть так:
. ├── stack └── bq_extractor ├── app.py ├── bq_extractor_env ├── event.json └── requirements.txt
Давайте создадим виртуальную среду со всеми необходимыми библиотеками, которые мы собираемся использовать. В нашем `requirements.txt`
будут установлены следующие библиотеки Python:
google-auth==2.15.0 google-cloud-bigquery==3.4.0 requests==2.28.1 pyyaml==6.0 python-lambda-local==0.1.13
Давайте установим их.
cd stack cd bq_extractor virtualenv bq_extractor_env source bq_extractor_env/bin/activate pip install -r requirements.txt
Теперь давайте создадим наш микросервис. Я быстро набросал этот фрагмент ниже для этой статьи. Не стесняйтесь изменять код в соответствии с вашими потребностями. Он будет использовать библиотеки `google`
для аутентификации клиента BigQuery и запуска задания `export`
.
# https://googleapis.dev/python/bigquery/latest/index.html import json import requests from datetime import datetime, date, timedelta from google.api_core import retry from google.cloud import bigquery from google.oauth2 import service_account # Test your service locally by ruunning # python-lambda-local -f lambda_handler -t 10 app.py event.json # It should be able to do a request response = requests.get('https://api.github.com') print(response) # Paste your JSON service account credentials here: service_acount_str = { "type": "service_account", "project_id": "your-project", "private_key_id": "", "private_key": "-----BEGIN PRIVATE KEY----...\n-----END PRIVATE KEY-----\n", "client_email": "[email protected]", "client_id": "123", "auth_uri": "https://accounts.google.com/o/oauth2/auth", "token_uri": "https://oauth2.googleapis.com/token", "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/bigquery-adminsdk%40client.iam.gserviceaccount.com" } credentials = service_account.Credentials.from_service_account_info(service_acount_str) # ? https://googleapis.dev/python/google-api-core/latest/auth.html#overview print(credentials.project_id) # Simple function to check connectivity: def bigquery_hello(txt): client = bigquery.Client(credentials=credentials, project=credentials.project_id) QUERY = ('SELECT "{} nice to meet you";'.format(txt)) query_job = client.query(QUERY) # API request rows = query_job.result() # Waits for query to finish greet = list(rows)[0][0] return greet # Main helper function def export_table_to_storage(table_name, bucket_partition): # Connect to BigQuery to run jobs programmatically client = bigquery.Client(credentials=credentials, project=credentials.project_id) # Public project source and test staging buucket project = 'firebase-public-project' dataset_id = 'analytics_153293282' bucket_name = 'firebase-events-archive-avro' destination_uri = "gs://{}/{}/partitionKey/events_*.avro".format(bucket_name, bucket_partition) dataset_ref = bigquery.DatasetReference(project, dataset_id) table_ref = dataset_ref.table(table_name) job_config = bigquery.job.ExtractJobConfig() # job_config.compression = bigquery.Compression.GZIP # https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.ExtractJobConfig job_config.destination_format = bigquery.DestinationFormat.AVRO job_config.compression = bigquery.Compression.SNAPPY extract_job = client.extract_table( table_ref, destination_uri, # Location must match that of the source table. location="US", job_config=job_config, ) # API request # extract_job.result() # Waits for job to complete. Calling client.extract_table starts the job. No need to wait to finish print("Export table to {}".format(destination_uri)) def lambda_handler(event, context): print(event) start_date = date(2018,9,1) end_date = date(2018,9,3) dates= [start_date+timedelta(days=x) for x in range((end_date-start_date).days)] for dt in dates: table_name = dt.strftime('events_%Y%m%d') partition_name = dt.strftime('dt=%Y-%m-%d') export_table_to_storage(table_name, partition_name) bigquery_message = bigquery_hello('it is ') message = 'Hello {} {}, {}!'.format(event['first_name'], event['last_name'], bigquery_message) return { 'message' : message }
Наш сервис будет подключаться к BigQuery для программного запуска заданий, включая `extract`
.
Давайте сначала создадим нашу корзину облачного хранилища. Мы можем сделать это с помощью веб-консоли или с помощью инструментов командной строки. Если у нас установлен `gsutil`
, запустите это в командной строке:
gsutil mb -c archive -l US-CENTRAL1 -p your-project-name gs://firebase-events-archive-avro
Теперь давайте запустим наш микросервис.
Когда app.py
будет готов, вы можете протестировать его локально:
# Test your service locally by running in command line python-lambda-local -f lambda_handler -t 10 app.py event.json
Давайте перечислим наше ведро, чтобы увидеть, есть ли там данные:
gsutil ls gs://firebase-events-archive-avro/
Используйте этот скрипт bash, чтобы получить все размеры каталогов в ведре
Например, мы можем захотеть проверить, действительно ли операция экспорта работает.
gsutil ls -l gs://firebase-events-archive-avro/ | xargs -I{} gsutil du -sh {}
Как добавить дополнительный раздел ведра с макетом Hive?
Если нам нужно экспортировать данные с дополнительным `category`
ключом раздела, мы можем использовать что-то вроде этого:
category_number = 1 for dt in dates: table_name = dt.strftime('ml_data_%Y%m%d$1') partition_name = dt.strftime('dt=%Y-%m-%d') category = "category={}".format(category_number) export_table_to_storage(table_name, partition_name, category)
Мы можем просто перебрать все даты и все категории, чтобы создать вывод озера данных следующим образом:
`gs://firebase-events-archive-avro/dt=2018–10-03/category=1/partitionKey/events_*.avro`
Результат будет:
Export table to gs://firebase-events-archive-avro/dt=2018-10-03/category=1/partitionKey/events_*.avro [root - INFO - 2023-02-01 15:30:22,716] END RequestId: ac865a0b-05dd-4ef3-9b5f-9bfe7f0ce5b7 [root - INFO - 2023-02-01 15:30:22,717] REPORT RequestId: ac865a0b-05dd-4ef3-9b5f-9bfe7f0ce5b7 Duration: 2284.26 ms
Давайте проверим числа на случай, если мы решим загрузить его обратно:
select count(*) from `your-project.analytics.ml_data_20181003` where event_category = 1 ; LOAD DATA INTO your-project.source.ml_data_20181003_1 FROM FILES( format='AVRO', uris = ['gs://firebase-events-archive-avro/dt=2018-10-03/category=1/*'] ) ; select count(*) from your-project.source.ml_data_20181003_1 ;
Заключение
Надеюсь, эта история будет для вас полезной. Это реальный сценарий обработки данных, в котором нам нужно подготовить необработанные данные о событиях и передать их в службу машинного обучения дальше по конвейеру. Моделирование данных — один из важнейших навыков инженера данных. В этой статье рассказывается, как мы применяем его для оптимизации схем наборов данных, разделов и хранилища, когда данные больше не нужны.
Если есть способ сделать это бесплатно, то почему бы и нет?
Мы создали простой микросервис с AWS Lambda для экспорта данных, но мы можем сделать с ним гораздо больше. Мы можем подключить его к шлюзу API, создать еще один веб-сервис для управления конвейерами (например, DataHub), использовать другие события в качестве триггеров и т. д.
После необходимых преобразований DML в наших данных о событиях они сохраняются в озере данных, где другие службы ML могут получить к ним доступ и обрабатывать их более эффективным и масштабируемым способом для обучения моделей машинного обучения.
Репозиторий
https://github.com/mshakhomirov/bigquery_extractor
Рекомендуем прочитать
- https://cloud.google.com/bigquery/docs/managing-partitioned-table-data
- https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client
- https://cloud.google.com/bigquery/docs/external-data-cloud-storage
- https://cloud.google.com/bigquery/docs/hive-partitioned-queries
- https://cloud.google.com/bigquery/docs/exporting-data
- https://cloud.google.com/bigquery/quotas#export_jobs