Получите максимальную отдачу от поддержки PyArrow в pandas и Dask прямо сейчас
Введение
В этом посте исследуется, где мы можем использовать PyArrow для улучшения наших рабочих процессов pandas и Dask прямо сейчас. Общая поддержка dtypes PyArrow была добавлена с pandas 2.0 в pandas и Dask. Это решает кучу давних проблем пользователей обеих библиотек. Пользователи pandas часто жалуются мне, что pandas не поддерживает пропущенные значения в произвольных dtypes или что нестандартные dtypes не очень хорошо поддерживаются. Особенно раздражающей проблемой для пользователей Dask является нехватка памяти для больших наборов данных. Строковые столбцы, поддерживаемые PyArrow, потребляют до 70 % меньше памяти по сравнению со столбцами объектов NumPy и, таким образом, могут смягчить эту проблему, а также обеспечить значительное улучшение производительности.
Поддержка dtypes PyArrow в пандах и, соответственно, в Dask все еще является относительно новой. Я бы рекомендовал проявлять осторожность при выборе PyArrow dtype_backend
до тех пор, пока не будет выпущена как минимум pandas 2.1. Не каждая часть обоих API еще оптимизирована. Тем не менее, вы сможете добиться значительного улучшения в определенных рабочих процессах. В этом посте будет рассмотрено несколько примеров, в которых я бы порекомендовал сразу перейти на PyArrow, потому что он уже дает огромные преимущества.
Сам Dask может по-разному извлечь выгоду из dtypes PyArrow. Мы рассмотрим, как строки, поддерживаемые PyArrow, могут легко смягчить проблему нехватки памяти в кластерах Dask и как мы можем повысить производительность за счет использования PyArrow.
Я являюсь частью основной команды pandas и принимал активное участие в реализации и улучшении поддержки PyArrow в pandas. Я недавно присоединился к Coiled, где работаю над Dask. Одна из моих задач — улучшить интеграцию PyArrow с Dask.
Общий обзор поддержки PyArrow
PyArrow dtypes изначально были представлены в pandas 1.5. Реализация была экспериментальной, и я бы не рекомендовал использовать ее на пандах 1.5.x. Их поддержка все еще относительно новая. pandas 2.0 обеспечивает огромное улучшение, в том числе упрощает выбор поддерживаемых PyArrow DataFrames. Мы все еще работаем над их правильной поддержкой везде, поэтому их следует использовать с осторожностью, по крайней мере, до выхода pandas 2.1. Оба проекта постоянно работают над улучшением поддержки Dask и pandas.
Мы призываем пользователей попробовать их! Это поможет нам лучше понять, что еще не поддерживается или работает недостаточно быстро. Предоставление отзывов помогает нам улучшить поддержку и значительно сократить время, необходимое для обеспечения бесперебойного взаимодействия с пользователем.
Набор данных
Мы будем использовать набор данных такси из Нью-Йорка, который содержит все поездки Uber и Lyft. У него есть некоторые интересные атрибуты, такие как цена, чаевые, оплата водителя и многое другое. Набор данных находится здесь (см. условия использования) и хранится в паркетных файлах. При анализе запросов Dask мы будем использовать общедоступную корзину S3, чтобы упростить наши запросы: s3://coiled-datasets/uber-lyft-tlc/
. Мы будем использовать набор данных за декабрь 2022 года для наших запросов pandas, так как это максимум, который удобно помещается в памяти на моей машине (24 ГБ ОЗУ). Мы должны избегать акцентирования внимания на использовании оперативной памяти, так как это может привести к побочным эффектам при анализе производительности.
Мы также изучим производительность read_csv
. Мы будем использовать набор данных Crimes in Chicago, который можно найти здесь.
Даск-кластер
Существуют различные варианты настройки кластера Dask, неполный список вариантов развертывания см. в Документации по Dask. Я буду использовать Coiled для создания кластера на AWS с 30 машинами через:
import coiled cluster = coiled.Cluster( n_workers=30, name="dask-performance-comparisons", region="us-east-2", # this is the region of our dataset worker_vm_type="m6i.large", )
Coiled подключен к моей учетной записи AWS. Он создает кластер в моей учетной записи и управляет всеми ресурсами для меня. 30 машин достаточно, чтобы комфортно работать с нашим набором данных. Мы исследуем, как мы можем уменьшить необходимое количество рабочих до 15 с помощью небольших модификаций.
панды StringDtype
при поддержке PyArrow
Мы начнем с функции, которая была впервые представлена более 3 лет назад в pandas 1.0. Установка dtype в pandas или Dask на string
возвращает объект с StringDtype
. Эта функция является относительно зрелой и должна обеспечивать бесперебойную работу пользователей.
Исторически панды представляли строковые данные через массивы NumPy с типом dtype object
. Данные объекта NumPy хранятся в виде массива указателей, указывающих на фактические данные в памяти. Это делает итерацию по массиву, содержащему строки, очень медленным. Pandas 1.0 изначально представил указанный StringDtype
, который позволял более простые и согласованные операции со строками. Этот dtype по-прежнему поддерживался строками Python и поэтому тоже не отличался высокой производительностью. Скорее, он обеспечивал четкую абстракцию строковых данных.
pandas 1.3, наконец, представил улучшение для создания эффективного строкового dtype. Этот тип данных поддерживается массивами PyArrow. PyArrow предоставляет структуру данных, которая обеспечивает высокую производительность и эффективное использование памяти. Начиная с этого момента, пользователи могли использовать строковый dtype, который был непрерывным в памяти и, следовательно, очень быстрым. Этот dtype можно запросить через string[pyarrow]
. В качестве альтернативы мы можем запросить его, указав string
в качестве dtype и настроив:
pd.options.mode.string_storage = "pyarrow"
Поскольку Dask строится поверх pandas, здесь также доступна эта строка dtype. Кроме того, Dask предлагает удобную опцию, которая автоматически преобразует все строковые данные в string[pyarrow]
.
dask.config.set({"dataframe.convert-string": True})
Это удобный способ избежать dtype объекта NumPy для строковых столбцов. Кроме того, у него есть то преимущество, что он изначально создает массивы PyArrow для методов ввода-вывода, которые работают с объектами Arrow. Помимо значительного повышения производительности, строки PyArrow потребляют значительно меньше памяти. В среднем Dask DataFrame со строками PyArrow потребляет около 33–50% исходной памяти по сравнению с объектом NumPy. Это решает самую большую проблему для пользователей Dask, связанную с нехваткой памяти при работе с большими наборами данных. Опция включает глобальное тестирование в наборе тестов Dask. Это гарантирует, что поддерживаемые PyArrow строки будут достаточно зрелыми, чтобы обеспечить бесперебойную работу пользователей.
Давайте рассмотрим несколько операций, представляющих типичные строковые операции. Мы начнем с нескольких примеров pandas, прежде чем переключиться на операции в нашем кластере Dask.
Мы будем использовать df.convert_dtypes
для преобразования столбцов наших объектов в строковые массивы PyArrow. Существуют более эффективные способы получения dtypes PyArrow в pandas, которые мы рассмотрим позже. Мы будем использовать набор данных Uber-Lyft за декабрь 2022 года, этот файл удобно помещается в памяти моей машины.
import pandas as pd pd.options.mode.string_storage = "pyarrow" df = pd.read_parquet( "fhvhv_tripdata_2022-10.parquet", columns=[ "tips", "hvfhs_license_num", "driver_pay", "base_passenger_fare", "dispatching_base_num", ], ) df = df.convert_dtypes( convert_boolean=False, convert_floating=False, convert_integer=False, )
Наш DataFrame имеет dtypes NumPy для всех нестроковых столбцов в этом примере. Начнем с фильтрации всех поездок, которыми управляет Uber.
df[df["hvfhs_license_num"] == "HV0003"]
Эта операция создает маску со значениями True/False, которые указывают, выполнял ли Uber поездку. Здесь не используются какие-либо специальные строковые методы, но сравнение на равенство отправляется в PyArrow. Далее мы будем использовать метод доступа String, который реализован в pandas и дает вам доступ ко всем видам строковых операций для каждого элемента. Мы хотим найти все поездки, отправленные с базы, начинающейся с "B028"
.
df[df["dispatching_base_num"].str.startswith("B028")]
startswith
перебирает наш массив и проверяет, начинается ли каждая строка с указанной подстроки. Преимущество PyArrow легко увидеть. Данные непрерывны в памяти, что означает, что мы можем эффективно перебирать их. Кроме того, эти массивы имеют второй массив с указателями, указывающими на первый адрес памяти каждой строки, что еще больше ускоряет вычисление начальной последовательности.
Наконец, мы рассмотрим операцию GroupBy
, которая группирует строковые столбцы PyArrow. Вычисление групп также может быть отправлено в PyArrow, что более эффективно, чем факторизация массива объектов NumPy.
df.groupby( ["hvfhs_license_num", "dispatching_base_num"] ).mean(numeric_only=True)
Давайте посмотрим, как эти операции сочетаются с DataFrames, где строковые столбцы представлены типом объекта NumPy.
Результаты более-менее соответствуют нашим ожиданиям. Сравнения на основе строк выполняются значительно быстрее при выполнении строк PyArrow. Большинство методов доступа к строкам должны обеспечить значительное улучшение производительности. Еще одним интересным наблюдением является использование памяти, оно уменьшено примерно на 50% по сравнению с типом объекта NumPy. Мы подробнее рассмотрим это с Даском.
Dask отражает API pandas и отправляет в pandas большинство операций. Следовательно, мы можем использовать тот же API для доступа к строкам PyArrow. Удобным вариантом для глобального запроса является вариант, упомянутый выше, который мы будем использовать здесь:
dask.config.set({"dataframe.convert-string": True})
Одним из самых больших преимуществ этой опции во время разработки является то, что она позволяет легко тестировать строки PyArrow глобально в Dask, чтобы убедиться, что все работает гладко. Мы будем использовать набор данных Uber-Lyft для наших исследований. Набор данных занимает около 240 ГБ памяти в нашем кластере. В нашем начальном кластере 30 машин, этого достаточно для комфортного выполнения наших вычислений.
import dask import dask.dataframe as dd from distributed import wait dask.config.set({"dataframe.convert-string": True}) df = dd.read_parquet( "s3://coiled-datasets/uber-lyft-tlc/", storage_options={"anon": True}, ) df = df.persist() wait(df) # Wait till the computation is finished
Мы сохраняем данные в памяти, чтобы производительность ввода-вывода не влияла на наши измерения производительности. Наши данные теперь доступны в памяти, что делает доступ быстрым. Мы будем выполнять вычисления, аналогичные нашим вычислениям панд. Одна из основных целей — показать, что преимущества панд перейдут на вычисления в распределенной среде с помощью Dask.
Одним из первых наблюдений является то, что DataFrame со строковыми столбцами, поддерживаемыми PyArrow, потребляет всего 130 ГБ памяти, что составляет лишь половину того, что он потребляет со столбцами объектов NumPy. У нас есть только несколько строковых столбцов в нашем DataFrame, а это означает, что экономия памяти для строковых столбцов на самом деле выше, чем около 50% при переключении на строки PyArrow. Следовательно, мы уменьшим размер нашего кластера до 15 рабочих процессов при выполнении наших операций со строковыми столбцами PyArrow.
cluster.scale(15)
Мы измеряем производительность операции-маски и одного из методов доступа String вместе посредством последующей фильтрации DataFrame.
df = df[df["hvfhs_license_num"] == "HV0003"] df = df[df["dispatching_base_num"].str.startswith("B028")] df = df.persist() wait(df)
Мы видим, что можем использовать те же методы, что и в предыдущем примере. Это делает переход от pandas к Dask относительно простым.
Кроме того, мы снова вычислим операцию GroupBy
для наших данных. Это значительно сложнее в распределенной среде, что делает результаты более интересными. Предыдущие операции относительно легко распараллеливаются на большом кластере, в то время как с GroupBy
это сложнее.
df = df.groupby( ["hvfhs_license_num", "dispatching_base_num"] ).mean(numeric_only=True) df = df.persist() wait(df)
Мы получаем хорошие улучшения в 2 и 3 раза. Это особенно интригует, поскольку мы уменьшили размер нашего кластера с 30 машин до 15, снизив стоимость на 50%. Впоследствии мы также сократили наши вычислительные ресурсы в 2 раза, что делает наше улучшение производительности еще более впечатляющим. Таким образом, производительность улучшилась в 4 и 6 раз соответственно. Мы можем выполнять те же вычисления на меньшем кластере, что экономит деньги и в целом более эффективно, и при этом получать прирост производительности.
Подводя итог, мы увидели, что строковые столбцы PyArrow являются огромным улучшением по сравнению с столбцами объектов NumPy в DataFrames. Переход на строки PyArrow — это относительно небольшое изменение, которое может повысить производительность и эффективность обычного рабочего процесса, который сильно зависит от строковых данных. Эти улучшения одинаково заметны в pandas и Dask!
Ключевое слово Engine в методах ввода/вывода
Теперь мы рассмотрим функции ввода-вывода в pandas и Dask. Некоторые функции имеют собственные реализации, например read_csv
, в то время как другие отправляются в другую библиотеку, например, с read_excel
по openpyxl
. Некоторые из этих функций получили новое ключевое слово engine
, которое позволяет нам выполнять диспетчеризацию в PyArrow
. Парсеры PyArrow по умолчанию являются многопоточными и, следовательно, могут обеспечить значительное улучшение производительности.
pd.read_csv("Crimes_-_2001_to_Present.csv", engine="pyarrow")
Эта конфигурация вернет те же результаты, что и другие механизмы. Единственное отличие состоит в том, что PyArrow используется для чтения данных. Та же опция доступна для read_json
. Были добавлены PyArrow-движки, чтобы обеспечить более быстрый способ чтения данных. Улучшенная скорость — только одно из преимуществ. Парсеры PyArrow возвращают данные в виде таблицы PyArrow. Таблица PyArrow предоставляет встроенную функциональность для преобразования в pandas DataFrame
. В зависимости от данных может потребоваться копирование при преобразовании в NumPy (строка, целые числа с отсутствующими значениями,...), что приводит к ненужному замедлению. Вот тут-то и появляется PyArrow dtype_backend
. Он реализован как класс ArrowExtensionArray
в pandas, который поддерживается PyArrow ChunkedArray. Как прямое следствие, преобразование таблицы PyArrow в pandas чрезвычайно дешево, поскольку не требует никаких копий.
pd.read_csv( "Crimes_-_2001_to_Present.csv", engine="pyarrow", dtype_backend="pyarrow", )
Это возвращает DataFrame
, который поддерживается массивами PyArrow. pandas еще не везде оптимизирован, поэтому это может замедлить последующие операции. Это может стоить того, если рабочая нагрузка особенно велика по вводу-выводу. Давайте посмотрим на прямое сравнение:
Мы видим, что PyArrow-engine и PyArrow dtypes обеспечивают 15-кратное ускорение по сравнению с C-engine.
Те же преимущества относятся и к Dask. Dask является оболочкой для чтения csv pandas и, следовательно, получает те же функции бесплатно.
Сравнение с Dask немного сложнее. Во-первых, мой пример считывает данные с моей локальной машины, а наши примеры Dask считывают данные из корзины S3. Скорость сети будет важным компонентом. Кроме того, распределенные вычисления имеют некоторые накладные расходы, которые мы должны учитывать.
Здесь нам нужна исключительно скорость, поэтому мы будем считывать некоторые данные временных рядов из общедоступной корзины S3.
import dask.dataframe as dd from distributed import wait df = dd.read_csv( "s3://coiled-datasets/timeseries/20-years/csv/", storage_options={"anon": True}, engine="pyarrow", parse_dates=["timestamp"], ) df = df.persist() wait(df)
Мы выполним этот фрагмент кода для engine="c"
, engine="pyarrow"
и дополнительно engine="pyarrow"
с dtype_backend="pyarrow"
. Давайте посмотрим на некоторые сравнения производительности. Оба примера были выполнены с 30 машинами в кластере.
Движок PyArrow работает примерно в 2 раза быстрее, чем C-движок. Обе реализации использовали одинаковое количество машин. Использование памяти было уменьшено на 50 % с помощью PyArrow dtype_backend
. Такое же сокращение доступно, если только столбцы объектов преобразуются в строки PyArrow, что обеспечивает лучший опыт в последующих операциях.
Мы видели, что движки Arrow обеспечивают значительное ускорение по сравнению с пользовательскими реализациями C. Они еще не поддерживают все функции пользовательских реализаций, но если ваш вариант использования совместим с поддерживаемыми параметрами, вы должны получить значительное ускорение бесплатно.
Случай с PyArrow dtype_backend
немного сложнее. Еще не все области API оптимизированы. Если вы тратите много времени на обработку данных вне функций ввода-вывода, то это может не дать вам того, что вам нужно. Это ускорит вашу обработку, если ваш рабочий процесс тратит много времени на чтение данных.
dtype_backend в встроенных в PyArrow считывателях ввода-вывода
Некоторые другие методы ввода-вывода также имеют ключевое слово двигателя. read_parquet
— самый популярный пример. Хотя тут немного другая ситуация. Эти методы ввода-вывода уже использовали механизм PyArrow по умолчанию. Так парсинг максимально эффективен. Еще одно потенциальное преимущество в производительности — использование ключевого слова dtype_backend
. Обычно PyArrow возвращает таблицу PyArrow, которая затем преобразуется в DataFrame pandas. Типы PyArrow преобразуются в их эквивалент NumPy. Параметр dtype_backend="pyarrow"
позволяет избежать этого преобразования. Это дает приличное улучшение производительности и экономит много памяти.
Давайте посмотрим на одно сравнение производительности панд. Мы читаем данные такси Uber-Lyft за декабрь 2022 года.
pd.read_parquet("fhvhv_tripdata_2022-10.parquet")
Читаем данные с dtype_backend="pyarrow"
и без.
Мы легко видим, что больше всего времени уходит на преобразование после завершения чтения файла Parquet. Функция работает в 3 раза быстрее, если не выполнять преобразование в dtypes NumPy.
Dask имеет специализированную реализацию для read_parquet
, которая имеет некоторые преимущества, адаптированные для распределенных рабочих нагрузок по сравнению с реализацией pandas. Общим знаменателем является то, что обе функции отправляются в PyArrow для чтения файла паркета. Оба имеют общее то, что данные преобразуются в dtypes NumPy после успешного чтения файла. Мы читаем весь набор данных Uber-Lyft, который занимает около 240 ГБ памяти в нашем кластере.
import dask.dataframe as dd from distributed import wait df = dd.read_parquet( "s3://coiled-datasets/uber-lyft-tlc/", storage_options={"anon": True}, ) df = df.persist() wait(df)
Мы читаем набор данных в 3 разных конфигурациях. Сначала с типами dtypes NumPy по умолчанию, затем с включенной строковой опцией PyArrow:
dask.config.set({"dataframe.convert-string": True})
И наконец с dtype_backend="pyarrow"
. Давайте посмотрим, что это означает с точки зрения производительности:
Как и в нашем примере с пандами, мы видим, что преобразование в dtypes NumPy занимает огромную часть времени выполнения. Типы PyArrow дают нам хорошее улучшение производительности. Обе конфигурации PyArrow используют половину памяти, которую используют dtypes NumPy.
Строки PyArrow намного более зрелые, чем обычные PyArrow dtype_backend
. Основываясь на полученной нами диаграмме производительности, мы получаем примерно такой же прирост производительности при использовании строк PyArrow и dtypes NumPy для всех остальных dtypes. Если рабочий процесс еще недостаточно хорошо работает с PyArrow dtypes, я бы рекомендовал включить только строки PyArrow.
Заключение
Мы увидели, как мы можем использовать PyArrow в pandas в Dask прямо сейчас. Строковые столбцы, поддерживаемые PyArrow, могут положительно повлиять на большинство рабочих процессов и обеспечить бесперебойную работу пользователей с pandas 2.0. Dask имеет удобную возможность глобально избегать dtype объекта NumPy, когда это возможно, что еще больше упрощает использование строк, поддерживаемых PyArrow. PyArrow также обеспечивает значительное ускорение в других областях, где это возможно. PyArrow dtype_backend
все еще довольно новый и уже сейчас может значительно сократить время ввода-вывода. Безусловно, стоит изучить, может ли он устранить узкие места в производительности. Ведется большая работа по улучшению поддержки общих dtypes PyArrow с потенциалом ускорения среднего рабочего процесса в ближайшем будущем.
В настоящее время в pandas есть предложение начать выводить строки как строки, поддерживаемые PyArrow, по умолчанию, начиная с pandas 3.0. Кроме того, он включает в себя гораздо больше областей, в которых имеет смысл больше опираться на PyArrow (например, десятичные дроби, структурированные данные и т. д.). Ознакомиться с предложением можно здесь.
Спасибо за чтение. Не стесняйтесь обращаться в комментариях, чтобы поделиться своими мыслями и отзывами о поддержке PyArrow в обеих библиотеках. Я буду писать последующие посты, посвященные этой теме и пандам в целом. Подпишитесь на меня на Medium, если хотите узнать больше о пандах и Dask.