Программирование| Большие данные | ПиСпарк

3 способа агрегирования данных в PySpark

Основные агрегаты PySpark объясняются примерами кодирования.

Рекомендуемые курсы по запросу

Несколько моих читателей обратились ко мне с просьбой о курсах по запросу, чтобы узнать больше об Apache Sparkс Python. Вот 3 отличных ресурса, которые я бы порекомендовал:

Еще не являетесь участником Medium? Рассмотрите возможность регистрации по моей реферальной ссылке, чтобы получить доступ ко всему, что может предложить Medium, всего за 5 долларов США в месяц!

Введение

Apache Spark — это механизм обработки данных, исключительно быстро выполняющий агрегацию больших наборов данных. В зависимости от типа выполненной агрегации выходной набор данных обычно будет представлять:

  • Меньшая количественность по сравнению с исходным набором данных → это происходит, когда агрегирование применяется к группе параметров.
  • Идентичная количество элементов по ​​сравнению с исходным набором данных→ это происходит, когда агрегирование применяется к окну записей.

В этом руководстве я расскажу о трех методах выполнения агрегирования в PySpark DataFrame с использованием Python и объясню, когда имеет смысл использовать каждый из них в зависимости от цели.

Для примеров кодирования я буду использовать вымышленный набор данных, включающий 5 миллионов транзакций продаж по регионам мира. Если вы хотите продолжить, вы можете загрузить CSV-файл с этого веб-сайта.

Исходный набор данных был упрощен, чтобы выглядеть как таблица ниже:



Агрегирование данных в PySpark

В этом разделе я представляю три способа агрегирования данных при работе с PySpark DataFrame.

В следующих фрагментах кода я буду использовать только функцию SUM(), однако те же рассуждения и синтаксис применимы к функциям MEAN(), AVG(), MAX(), MIN(), COUNT() и PIVOT().

Метод №1: Использование GroupBy() + функция

Самый простой способ запустить агрегацию в PySpark DataFrame — использовать groupBy() в сочетании с функцией агрегации.

Этот метод очень похож на использование предложения SQL GROUP BY, поскольку он эффективно сворачивает входной набор данных по группе измерений, что приводит к выходному набору данных с более низкой детализацией (что означает меньшее количество записей).

Например, если выбрана функция sum(), синтаксис будет таким:

dataframe.groupBy(‘dimension_1’, 'dimension_2', ...).sum(‘metric_1’)

Возвращаясь к набору данных о продажах, предположим, что задача заключалась в вычислении:

  • Total Revenue (£M) от Region
  • Total Revenue (£M) от Region и Item Type

В этом случае вы могли бы написать:

Как и в случае с SQL, groupBy() можно использовать для выполнения агрегатов по нескольким столбцам. Ниже найдите выходной набор данных, агрегированный Region:

Выход 1

Принимая во внимание, что следующий набор выходных данных агрегирован Region и Item Type:

Вывод 2

Когда использовать / избегать →этот метод следует использовать, когда форматирование не столь необходимо, и вы просто хотите запустить быструю агрегацию на лету при изучении набора данных.

Вместо этого его следует избегать, если вы хотите выполнить множественное агрегирование или применить ряд преобразований к агрегированному выводу.

Например, округление агрегированного вывода и переименование столбца во что-то более аккуратное довольно громоздко, так как требует двух отдельных преобразований.

Метод № 2: Использование GroupBy() + AGG()

Другой способ выполнить агрегацию с помощью groupBy() — это обернуть нужную функцию внутри метода AGG().

Что касается METHOD 1, этот метод также ведет себя аналогично предложению SQL GROUP BY, поскольку он создает набор данных с меньшим количеством элементов по сравнению с его источником. Однако, как вы убедитесь, это намного удобнее, чем METHOD 1 , когда требуется выполнить несколько преобразований агрегированного вывода.

Опять же, если предположить, что выбрана функция SUM(), синтаксис будет таким:

dataframe.groupBy('dimension_1', 'dimension_2', ...).agg(sum(“column_name”))

Например, если вы хотите воспроизвести то, что было достигнуто с помощью METHOD 1 в наборах данных о продажах, вы можете написать следующее:

Это снова приводит к двум результатам, один сгруппирован по миру Region, а другой сгруппирован по Region и Item Type:

Выход 1

Вывод 2

Когда использовать / избегать → этот метод должен быть вашим предпочтительным решением при выполнении агрегатов на PySpark DataFrame в рабочей среде. На самом деле, использование AGG() позволяет вам применять несколько конкатенированных преобразований в строке, делая код более читабельным и лаконичным.

С другой стороны, вам следует избегать использования этого метода, когда ваша цель состоит в том, чтобы выполнять агрегаты, сохраняя нетронутой степень детализации исходного набора данных (это означает, что записи не сворачиваются по группам измерений).

Действительно, это требование приводит к METHOD 3.

Способ № 3: использование оконной функции

Последний способ агрегирования данных в PySpark DataFrame — применение функции к окну строк.

Это действительно эквивалентно оконной функции SQL, так что, взяв в качестве примера SUM(), синтаксис будет таким:

# WINDOW DEFINITION
Window.partitionBy(‘dimension_1’, 'dimension_2', ...)

# DF AGGREGATION USING WINDOW FUNCTION
dataframe.withColumn(‘new_column_name’, functions.sum(‘metric_1’)\
         .over(Window.partitionBy(‘dimension_1’)))

Как и в SQL, предложение partitionBy используется вместо groupBy() для применения функции SUM() к определенному окну строк. В случае набора данных о продажах вы можете написать:

Как видно из выходных данных ниже, на этот раз столбец Total Revenue (£M) остался без изменений, а вместо него был рассчитан новый столбец Total Revenue (£M) wndwпоказывающий общий доход в окне.

Выход 1

Вывод 2

Когда использовать / избегать → этот метод следует отдавать предпочтение, если вы хотите сохранить детализацию набора данных. По сути, одно из основных преимуществ оконных функций заключается в том, что агрегирование применяется к окну записей, а затем отображается для каждой строки без свертывания записей в исходном наборе данных.

С другой стороны, вам следует избегать этого метода, когда вы работаете с очень большими наборами данных и хотите выполнить агрегацию для получения меньшего, более управляемого результата.



Заключение

В этом руководстве я обсудил, как существуют по крайней мере три основных метода для выполнения агрегации с Python при работе с Spark DataFrame.

Концептуально агрегирование данных в PySpark очень похоже на агрегирование данных в SQL с предложением GROUP BY или с использованием оконной функции.

Специалисты по работе с данными с большим опытом работы с SQL обычно находят переход на PySpark довольно простым, особенно при использовании модуля pyspark.sql.

А как насчет тебя? Вы когда-нибудь пробовали запускать более продвинутые агрегации, используя свертывание и куб?

Примечание для моих читателей

Этот пост содержит партнерские ссылки, за которые я могу получить небольшую комиссию без дополнительных затрат (но на самом деле со скидкой) для вас, если вы совершите покупку.

Источники