Программирование| Большие данные | ПиСпарк
3 способа агрегирования данных в PySpark
Основные агрегаты PySpark объясняются примерами кодирования.
Рекомендуемые курсы по запросу
Несколько моих читателей обратились ко мне с просьбой о курсах по запросу, чтобы узнать больше об Apache Sparkс Python. Вот 3 отличных ресурса, которые я бы порекомендовал:
- Потоковая передача данных с помощью Apache Kafka и Apache Spark Nano-Degree (UDACITY) →Очень качественный курс!
- Нано-диплом инженера данных (UDACITY)
- Spark и Python для больших данных с PySpark (UDEMY)
Еще не являетесь участником Medium? Рассмотрите возможность регистрации по моей реферальной ссылке, чтобы получить доступ ко всему, что может предложить Medium, всего за 5 долларов США в месяц!
Введение
Apache Spark — это механизм обработки данных, исключительно быстро выполняющий агрегацию больших наборов данных. В зависимости от типа выполненной агрегации выходной набор данных обычно будет представлять:
- Меньшая количественность по сравнению с исходным набором данных → это происходит, когда агрегирование применяется к группе параметров.
- Идентичная количество элементов по сравнению с исходным набором данных→ это происходит, когда агрегирование применяется к окну записей.
В этом руководстве я расскажу о трех методах выполнения агрегирования в PySpark DataFrame с использованием Python и объясню, когда имеет смысл использовать каждый из них в зависимости от цели.
Для примеров кодирования я буду использовать вымышленный набор данных, включающий 5 миллионов транзакций продаж по регионам мира. Если вы хотите продолжить, вы можете загрузить CSV-файл с этого веб-сайта.
Исходный набор данных был упрощен, чтобы выглядеть как таблица ниже:
Агрегирование данных в PySpark
В этом разделе я представляю три способа агрегирования данных при работе с PySpark DataFrame.
В следующих фрагментах кода я буду использовать только функцию SUM()
, однако те же рассуждения и синтаксис применимы к функциям MEAN()
, AVG()
, MAX()
, MIN()
, COUNT()
и PIVOT()
.
Метод №1: Использование GroupBy() + функция
Самый простой способ запустить агрегацию в PySpark DataFrame — использовать groupBy()
в сочетании с функцией агрегации.
Этот метод очень похож на использование предложения SQL GROUP BY
, поскольку он эффективно сворачивает входной набор данных по группе измерений, что приводит к выходному набору данных с более низкой детализацией (что означает меньшее количество записей).
Например, если выбрана функция sum()
, синтаксис будет таким:
dataframe.groupBy(‘dimension_1’, 'dimension_2', ...).sum(‘metric_1’)
Возвращаясь к набору данных о продажах, предположим, что задача заключалась в вычислении:
Total Revenue (£M)
отRegion
Total Revenue (£M)
отRegion
иItem Type
В этом случае вы могли бы написать:
Как и в случае с SQL, groupBy()
можно использовать для выполнения агрегатов по нескольким столбцам. Ниже найдите выходной набор данных, агрегированный Region
:
Выход 1
Принимая во внимание, что следующий набор выходных данных агрегирован Region
и Item Type
:
Вывод 2
Когда использовать / избегать →этот метод следует использовать, когда форматирование не столь необходимо, и вы просто хотите запустить быструю агрегацию на лету при изучении набора данных.
Вместо этого его следует избегать, если вы хотите выполнить множественное агрегирование или применить ряд преобразований к агрегированному выводу.
Например, округление агрегированного вывода и переименование столбца во что-то более аккуратное довольно громоздко, так как требует двух отдельных преобразований.
Метод № 2: Использование GroupBy() + AGG()
Другой способ выполнить агрегацию с помощью groupBy()
— это обернуть нужную функцию внутри метода AGG()
.
Что касается METHOD 1
, этот метод также ведет себя аналогично предложению SQL GROUP BY
, поскольку он создает набор данных с меньшим количеством элементов по сравнению с его источником. Однако, как вы убедитесь, это намного удобнее, чем METHOD 1
, когда требуется выполнить несколько преобразований агрегированного вывода.
Опять же, если предположить, что выбрана функция SUM()
, синтаксис будет таким:
dataframe.groupBy('dimension_1', 'dimension_2', ...).agg(sum(“column_name”))
Например, если вы хотите воспроизвести то, что было достигнуто с помощью METHOD 1
в наборах данных о продажах, вы можете написать следующее:
Это снова приводит к двум результатам, один сгруппирован по миру Region
, а другой сгруппирован по Region
и Item Type
:
Выход 1
Вывод 2
Когда использовать / избегать → этот метод должен быть вашим предпочтительным решением при выполнении агрегатов на PySpark DataFrame в рабочей среде. На самом деле, использование AGG()
позволяет вам применять несколько конкатенированных преобразований в строке, делая код более читабельным и лаконичным.
С другой стороны, вам следует избегать использования этого метода, когда ваша цель состоит в том, чтобы выполнять агрегаты, сохраняя нетронутой степень детализации исходного набора данных (это означает, что записи не сворачиваются по группам измерений).
Действительно, это требование приводит к METHOD 3
.
Способ № 3: использование оконной функции
Последний способ агрегирования данных в PySpark DataFrame — применение функции к окну строк.
Это действительно эквивалентно оконной функции SQL, так что, взяв в качестве примера SUM()
, синтаксис будет таким:
# WINDOW DEFINITION Window.partitionBy(‘dimension_1’, 'dimension_2', ...) # DF AGGREGATION USING WINDOW FUNCTION dataframe.withColumn(‘new_column_name’, functions.sum(‘metric_1’)\ .over(Window.partitionBy(‘dimension_1’)))
Как и в SQL, предложение partitionBy
используется вместо groupBy()
для применения функции SUM()
к определенному окну строк. В случае набора данных о продажах вы можете написать:
Как видно из выходных данных ниже, на этот раз столбец Total Revenue (£M)
остался без изменений, а вместо него был рассчитан новый столбец Total Revenue (£M) wndw
— показывающий общий доход в окне.
Выход 1
Вывод 2
Когда использовать / избегать → этот метод следует отдавать предпочтение, если вы хотите сохранить детализацию набора данных. По сути, одно из основных преимуществ оконных функций заключается в том, что агрегирование применяется к окну записей, а затем отображается для каждой строки без свертывания записей в исходном наборе данных.
С другой стороны, вам следует избегать этого метода, когда вы работаете с очень большими наборами данных и хотите выполнить агрегацию для получения меньшего, более управляемого результата.
Заключение
В этом руководстве я обсудил, как существуют по крайней мере три основных метода для выполнения агрегации с Python при работе с Spark DataFrame.
Концептуально агрегирование данных в PySpark очень похоже на агрегирование данных в SQL с предложением GROUP BY
или с использованием оконной функции.
Специалисты по работе с данными с большим опытом работы с SQL обычно находят переход на PySpark довольно простым, особенно при использовании модуля pyspark.sql.
А как насчет тебя? Вы когда-нибудь пробовали запускать более продвинутые агрегации, используя свертывание и куб?
Примечание для моих читателей
Этот пост содержит партнерские ссылки, за которые я могу получить небольшую комиссию без дополнительных затрат (но на самом деле со скидкой) для вас, если вы совершите покупку.