И как ими управлять

В этом посте я собираюсь обсудить интересный паттерн с удобной трансляцией. Прежде чем вдаваться в подробности, давайте освежим, что такое искровые аккумуляторы.

Общая переменная, которая может накапливаться, т. Е. Имеет коммутативную и ассоциативную операцию «сложения». Рабочие задачи в кластере Spark могут добавлять значения в аккумулятор с помощью оператора + =, но только программе драйвера разрешен доступ к его значению с помощью value. Обновления от рабочих автоматически распространяются на программу драйвера.

Источник: https://spark.apache.org/docs/2.3.1/api/python/pyspark.html#pyspark.Accumulator

Три заповеди накопителя

  1. Аккумуляторы можно использовать только для коммутативной и ассоциативной операции «сложения». Для любой другой операции мы должны использовать индивидуальную реализацию. Подробнее об этом позже.
  2. Накопитель может быть «обновлен» для рабочей задачи, но эта задача не может получить доступ к его значению.
  3. Аккумулятор можно обновить и получить к нему доступ в программе драйвера.

Несколько строк кода лучше тысячи слов

Давайте рассмотрим простой пример аккумулятора.

В приведенном выше примере кода cnt определен на глобальном уровне. add_items добавляет ввод x в cnt. Метод add_items позже применяется к каждому элементу rdd в методе global_accumulator. Это типичное использование аккумулятора, и в конце вызов global_accumulator выведет 6, что является суммой 1, 2 и 3. Обратите внимание, что нам нужно определить cnt as global, иначе различные методы не смогут получить к нему доступ, и он будет неопределенным.

Тогда в чем проблема….

Теперь, чтобы мотивировать потребность в широковещательном аккумуляторе, давайте разделим наш код на пару файлов, что было бы в модульной кодовой базе.

global_accumulator_module вызывает метод process_data из другого модуля Accumulator_process (код ниже).

Если мы выполним global_accumulator_module, он выйдет из строя со следующей ошибкой.

NameError: name ‘cnt’ is not defined

Проблема в том, что глобальные переменные в Python являются глобальными для модуля.

Глобальные переменные в Python являются глобальными для модуля, а не для всех модулей. (В отличие от C, где глобальное значение одинаково для всех файлов реализации, если вы явно не сделаете его статическим.) Если вам нужны действительно глобальные переменные из импортированных модулей, вы можете установить их в атрибуте модуля, в который вы его импортируете.

Источник: https://www.tutorialspoint.com/Explain-the-visibility-of-global-variables-in-imported-modules-in-Python

Так как же сделать глобальную переменную доступной для разных модулей? В документах Python есть несколько идей.

Канонический способ обмена информацией между модулями в рамках одной программы - создать специальный модуль (часто называемый config или cfg). Просто импортируйте модуль конфигурации во все модули вашего приложения; тогда модуль становится доступным как глобальное имя. Поскольку существует только один экземпляр каждого модуля, любые изменения, внесенные в объект модуля, отражаются повсюду. Например:

config.py:

x = 0 # Значение по умолчанию для параметра конфигурации 'x'

mod.py:

конфигурация импорта
config.x = 1

main.py:

конфигурация импорта
мод импорта
печать (config.x)

Обратите внимание, что использование модуля также является основой для реализации шаблона проектирования Singleton по той же причине.

Источник: https://docs.python.org/3/faq/programming.html?highlight=global#how-do-i-share-global-variables-across-modules

Хотя это не относится к PySpark, причина в том, что на каждом рабочем узле будет отдельный экземпляр модуля. Отсюда необходимость в аккумуляторе, поскольку типичный способ использования совместно используемых переменных в Spark не работает.

Накопитель трансляций

В этом случае метод broadcast_accumulator вызывает метод process_data_accumulator в модуле аккумулятор_процесс с аккумулятором acc.

На самом деле здесь происходит то, что мы отправляем ссылку на объект acc рабочим узлам. Таким образом, нам не нужно определять его как глобальный.

Теперь я собираюсь обсудить две настройки аккумулятора:

  1. Передавать статические значения вместе с аккумулятором
  2. Рассылка нескольких аккумуляторов

Фильтр-аккумулятор

В этом примере класс FilterAccumulator содержит список элементов. Накопитель acc обновляется, если элемент, переданный в методе process, отсутствует в списке элементов. Как отмечено в коде, init и count должны выполняться в драйвере, а process - на рабочем узле. Выполнение вышеуказанного дает значение 4, что является правильным результатом.

Несколько аккумуляторов

В этом примере у нас есть два аккумулятора sum и num, которые накапливают значение элементов и количество элементов соответственно. Опять же, как отмечено в коде, init и mean должны выполняться в драйвере, а process - на рабочем узле. Хотя среднее значение может быть получено с использованием других собственных методов Spark, это пример того, как управлять несколькими аккумуляторами с помощью одного класса, не объявляя их глобальными (что не работает в случае модульного кода).

Вывод

Надеюсь, теперь понятно, как можно управлять аккумуляторами с помощью классов и широковещания, а не объявлять глобальными. Я надеюсь опубликовать в будущем, как создавать аккумуляторы, которые не являются int и float, поскольку Spark позволяет настраивать аккумуляторы.