Введение
В предыдущем сообщении в блоге мы обсуждали инфраструктуру LyftLearn, построенную на основе Kubernetes. В этом посте мы сосредоточимся на вычислительном уровне LyftLearn и обсудим, как LyftLearn решает некоторые из основных проблем, с которыми сталкиваются специалисты по машинному обучению Lyft.
Эффективное использование вычислительных ресурсов
Болевые точки
Всегда существует компромисс между удобством пользователя и эффективным использованием ресурсов. Когда пользователям LyftLearn требуется больше вычислительной мощности, масштабирование по вертикали (более мощные машины) намного проще, чем по горизонтали (больше машин). Это меньше работы для пользователей, но недостатком является то, что большие машины часто нужны только для части рабочей нагрузки. В остальное время загрузка невелика, и возникают огромные затраты.
Например, использование 90-ядерных инстансов EC2 для запуска ноутбуков Jupyter, использующих только 60 ядер, приведет к потере 30 ядер и коэффициенту использования 67 %. Поскольку наш кластер Kubernetes работает на инстансах EC2, наша команда разработчиков платформы в первую очередь заботится о том, чтобы максимизировать его экономическую эффективность.
Решение: Kubernetes Spark
Apache Spark – это многоязычный механизм для обработки данных, обработки данных и машинного обучения в кластерах. Он обеспечивает превосходную масштабируемость, производительность и надежность. Использование Spark с Kubernetes дополнительно снижает нагрузку на запуск заданий Spark за счет кэширования изображений. Kubernetes позволяет отдельным пользователям использовать свои собственные изображения. Для каждого узла кластера архитектура модуля позволяет использовать разнородные рабочие процессы Spark, которые не мешают друг другу.
На следующей записи экрана показан пример операции карты delay
(переход в спящий режим на 5 секунд перед возвратом исходного элемента), который может последовательно занять 5 120 секунд, но в нашей среде LyftLearn занимает всего 22,9 секунды. Обратите внимание, что 22,9 секунды также включают в себя запрос и освобождение 1024 ядер (64 экземпляра, по 16 ядер каждый) в Kubernetes. Кластер Spark эфемерен и существует только в операторе with
.
Небольшое время запуска позволяет нам размещать исключительно эфемерные кластеры Spark по требованию. Пользователи могут запрашивать столько ядер, сколько им нужно, с минимальным программным интерфейсом. С эфемерным Spark они могут использовать меньшие экземпляры ноутбуков, а затем при необходимости использовать большие вычислительные ресурсы по запросу.
Удивительно, но даже при одновременном выполнении нескольких тяжелых заданий Spark количество экземпляров EC2 в ASG (группы автоматического масштабирования) редко затрагивается. Это означает, что Spark эффективно использует фрагментированные ресурсы в нашем кластере, поэтому мы получаем Spark бесплатно в LyftLearn!
Еще одним важным преимуществом Kubernetes Spark является то, что для рабочих процессов Spark не требуется начальная загрузка. В LyftLearn мы упростили пользователям создание собственных изображений, как показано в предыдущем сообщении в блоге. Мы также предоставляем пользователям настраиваемые базовые образы Spark, чтобы они не сталкивались с проблемами зависимостей при создании собственных образов.
Распространение существующей логики с минимальными усилиями
Болевые точки
В прошлом мы пытались убедить пользователей переписать существующую логику для оптимизации скорости и использования ресурсов, потому что это может сэкономить как время, так и деньги. На практике мы наблюдали:
- Существующая логика часто была слишком сложной или критической, чтобы ее можно было уверенно переписать.
- Масштабирование часто означало параллельный запуск одной и той же логики в большом количестве групп.
Хотя Spark идеально подходит для этого сценария, кривая обучения крутая. Учитывая бизнес-цели, на которых должны сосредоточиться наши клиентские команды, неоптимально для всех тратить время на изучение Spark. Кроме того, даже эксперты Spark могут по-прежнему тратить нетривиальные усилия на реализацию рабочего процесса с большими данными.
Так как же минимизировать код, чтобы перенести рабочие нагрузки в Spark?
Решение: преобразование фуги
Проект с открытым исходным кодом Fugue делает это очень просто. Это уровень абстракции, объединяющий вычислительную логику над Spark, Dask, Pandas и CUDF через Blazing SQL.
Одной из самых популярных возможностей Fugue является возможность использовать один вызов функции для распределения логики. Пользователи могут предоставлять функции с входными и выходными данными с аннотациями типов, а затем Fugue преобразует данные на основе аннотаций типов. Это делает пользовательскую логику независимой от Fugue и Spark, устраняя необходимость в каких-либо ранее существовавших знаниях.
Вот простой пример переноса пользовательской логики Python в Spark:
В приведенном выше примере предположим, что каждая задача должна заснуть на несколько секунд, определяемых параметром a
. В первой ячейке compute_tasks
описывает, как выполнять пакетную обработку задач локально. Также мы определили 100 задач, каждая из которых занимает 58-62 секунды. Последовательный запуск этого процесса занял бы около 100 минут без учета накладных расходов.
Во второй ячейке мы используем out_transform
от Fugue для запуска compute_task
с помощью Kubernetes Spark от LyftLearn. Выполнение задания занимает всего 1,5 минуты, включая накладные расходы на получение и освобождение вычислительных ресурсов.
В следующем примере мы рассмотрим вывод распределенного машинного обучения:
В этом случае пользовательская функция predict()
должна работать с каждым регионом отдельно. Для каждого региона он загрузит модель для конкретного региона и сделает прогнозы. В функции Fugue transform()
мы используем partition={"by": region"}
для разделения данных all_region
на отдельные разделы перед запуском функции predict()
для каждого из них. Прелесть transform()
и out_transform()
в том, что логика определяется независимо от масштаба.
Поскольку входными и выходными типами функции predict()
являются Pandas DataFrame
s, мы можем рассмотреть возможность использования Spark Pandas User-Defined Functions (UDFs) для повышения производительности. Хотя у PySpark для этого есть много разных интерфейсов, Fugue позаботится об использовании пользовательских функций Pandas под капотом без каких-либо изменений кода со стороны пользователя, если мы включим конфигурацию. Даже если функция использует собственные типы ввода и вывода Python, такие как List
и Dict
, Fugue все равно может выполнять ее как пользовательскую функцию Pandas.
Использование инструментов на основе SQL
Болевые точки
SQL является основным интерфейсом для различных служб данных и вычислительных сред. Однако даже для широко распространенного интерфейса существует множество вариаций, так что для выполнения простой операции, такой как SELECT * FROM <table> LIMIT 10
, требуется много разного шаблонного кода.
Presto отлично подходит для небольших сценариев вывода, и пользователи, как правило, создают небольшие задачи, в которых они вызывают службу Presto и загружают данные локально для дальнейшей обработки. В целях масштабирования они параллельно запускают сотни небольших задач, перегружая сервис Presto. Они могли бы использовать один запрос Hive для замены массовых вызовов Presto, но они не знают, как обрабатывать большие выходные данные Hive и обрабатывать данные распределенным способом.
Еще одной проблемой является разработка сложных SQL-запросов. Они подвержены ошибкам и требуют больших затрат на отладку и итерацию. Как мы можем использовать меньше итераций, чтобы написать 100% правильный сложный SQL?
Решение
В LyftLearn мы делаем Kubernetes Spark единственным вариантом для запуска Hive. В результате многие пользователи получили огромный прирост производительности при выполнении своих запросов. Это потому что:
- Во время выполнения запроса для него выделяются фиксированные и выделенные вычислительные ресурсы.
- Map-reduce — это естественное решение для большого количества запросов Hive.
- Адаптивное выполнение Spark 3 значительно повышает производительность SQL.
- Для каждого выполнения могут быть точно настроены такие конфигурации, как ЦП, память и порог широковещательной рассылки.
С эфемерными кластерами Spark мы редко видим конкуренцию ресурсов для выполнения запросов Hive. Большинство исполнений могут немедленно использовать свои настроенные ЦП.
Помимо повышения эффективности исполнения, LyftLearn стремится обеспечить лучший опыт разработки, чтобы повысить продуктивность разработчиков.
Presto и Hive — две наиболее часто используемые службы SQL. Таким образом, мы создали магии %%presto
и %%hive
, чтобы позволить писать SQL непосредственно в ячейках без шаблонного кода. Кроме того, Fugue SQL позволяет SQL работать со смешанными источниками данных, такими как Presto, Hive, Pandas и плоские файлы. Чтобы поддержать это, мы также привнесли магию fsql
из Fugue SQL.
В качестве примера предположим, что мы пишем запрос Presto, чтобы получить 2 лучших подсценария с наибольшим количеством записей в таблице. Вывод сохраняется как Pandas DataFrame
с именем top
. Это типичная агрегация с небольшим выводом, поэтому Presto великолепен:
Теперь мы используем запрос Hive, чтобы сбросить все данные для этих двух сценариев во временный файл. Это хорошо работает независимо от размера вывода:
С помощью кэшированного файла мы можем написать собственный код PySpark для анализа больших данных без повторного запуска предыдущих запросов. Здесь мы получаем отчетливое snapshot_ts
:
При желании мы можем написать запрос Fugue SQL, эквивалентный запросу Hive и собственному коду Spark. Он напрямую соединяет Pandas DataFrame
top
с таблицей Hive и выводит результат:
Ниже приведены несколько рекомендаций, которыми мы делимся с пользователями LyftLearn:
- Presto хорош для агрегации и небольших сценариев вывода — это не должно занимать более 10 минут. Если Presto работает медленно, попробуйте Hive.
- Hive медленнее, но в целом более масштабируем. Всегда старайтесь сохранять вывод в файлы, а не сбрасывать его в Pandas.
- Если запрос Hive слишком сложный, слишком медленный или требует смешанных источников данных, рассмотрите вариант Fugue SQL. Это может упростить логику и ускорить как разработку, так и выполнение.
Заключение
В этой статье мы обсудили три основные болевые точки, связанные с использованием распределенных пакетных систем. Чтобы решить их, мы создали интуитивно понятную, масштабируемую и гибкую систему на основе Kubernetes Spark и Fugue и предоставили пользователям лучшие практики.
Благодаря этим усилиям мы увидели отличные результаты для LyftLearn во второй половине 2021 года. По сравнению с первым полугодием среднее время выполнения задания Spark сократилось с 3 часов до 0,3 часа, и хотя общее использование Spark увеличилось более чем на 60 %, общая стоимость выполнения Spark снизилась более чем на 50%.
Наша цель — свести к минимуму общие усилия по разработке, выполнению и обслуживанию в жизненном цикле машинного обучения. Руководствуясь практическими потребностями Lyft, мы продолжаем строить сбалансированную экосистему с упором на интуитивно понятный пользовательский интерфейс, минимальную кривую обучения и дизайн системы, который влияет на лучшие практики.
Благодарности
Большое спасибо Аниндье Саха и Каталин Тода, неустанно работающим над созданием инфраструктуры Spark для LyftLearn.
Особая благодарность Ширазу Заману, Винаю Какаде и Майклу Ребелло за помощь в составлении этого материала.
Если вы считаете эту задачу увлекательной и хотите присоединиться к нам в Lyft, ознакомьтесь с нашими страницами объявления о вакансиях или университетская программа.