WedX - журнал о программировании и компьютерных науках

Pyspark: как распараллелить обработку нескольких файлов gz в HDFS

У меня есть много файлов gz, хранящихся в кластере HDFS из 20 узлов, которые необходимо агрегировать по столбцам. Файлы gz очень большие (1 ГБ каждый и всего 200 файлов). Формат данных — значение ключа с двумя значениями столбца: ['key','value1','value2'], и его необходимо сгруппировать по ключу с агрегированием по столбцу: sum(value1), count(value2).

Данные уже отсортированы по ключу, и каждый файл gz имеет эксклюзивные значения ключа.

Например:

File 1:
k1,v1,u1
k1,v2,u1
k2,v2,u2
k3,v3,u3
k3,v4,u4

File 2:
k4,v5,u6
k4,v7,u8
k5,v9,v10

File 3:
k6,...
...
...

File 200:
k200,v200,u200
k201,v201,u201

Сначала я анализирую дату и преобразовываю данные в структуру (key, list of (values)). Вывод парсера будет таким:

parser output
(k1,[v1,u1])
(k1,[v2,u1])
(k2,[v2,u2])
(k3,[v3,u3])
(k3,[v4,u4])

Затем сгруппируйте по ключевым значениям, используя функцию reduceByKey, которая более эффективна, чем функция groupByKey.

reducer output:
(k1,[[v1,u1],[v2,u1])
(k2,[[v2,u2]])
(k3,[[v3,u3],[v4,u4]])

Затем агрегируйте столбцы, используя функцию процесса:

process 
(k1, sum([v1,v2], len([u1,u3])))
(k2, sum([v2], len([u2])))
(k3, sum([v3,v4], len([u3,u4])))

Вот пример кода процесса

import pyspark
from pyspark import SparkFiles

def parser(line):
    try:
        key,val=line.split('\t)
        return (key,[val1,val2])
    except:
        return None

def process(line):
    key,gr= line[0],line[1]
    vals=zip(*gr)

    val1=sum(vals[0])
    val2=len(vals[1])
    return ('\t'.join([key,val1,val2]))

sc = pyspark.SparkContext(appName="parse")
logs=sc.textFile("hdfs:///home/user1/*.gz")
proc=logs.map(parser).filter(bool).reduceByKey(lambda acc,x: acc+x).map(process)
proc.saveAsTextFile('hdfs:///home/user1/output1')

Я думаю, что этот код не полностью использует искровой кластер. Мне нравится оптимизировать код, чтобы полностью использовать обработку.

<сильный>1. Как лучше всего обрабатывать gz-файлы в HDFS и Pyspark? — как полностью распределить обработку gz-файлов на весь кластер?

<сильный>2. Как полностью использовать все ЦП в каждом узле? для процесса агрегации и синтаксического анализа


  • Сначала загрузите файлы в фреймворк данных spark. 03.08.2019

Ответы:


1

Есть по крайней мере несколько вещей, которые вы должны учитывать:

  1. Если вы используете YARN, количество исполнителей и ядер на исполнителя, которые вы назначаете своему приложению spark. Ими можно управлять с помощью --num-executors и --executor-cores. Если вы не используете YARN, ваш планировщик, вероятно, имеет аналогичный механизм управления параллелизмом, попробуйте поискать его.
  2. Количество разделов в вашем DataFrame, которое напрямую влияет на параллелизм в вашей работе. Вы можете управлять этим с помощью переразметка и/или объединить.

Оба могут ограничивать количество ядер, используемых заданием, и, следовательно, использование кластера. Кроме того, примите во внимание, что большее количество используемых процессоров не обязательно означает лучшую производительность (или время выполнения). Это будет зависеть от размера вашего кластера и размера проблемы, и я не знаю ни одного простого правила, чтобы решить это. Для меня обычно все сводится к тому, чтобы поэкспериментировать с различными конфигурациями и посмотреть, какая из них имеет лучшую производительность.

03.08.2019
Новые материалы

Объяснение документов 02: BERT
BERT представил двухступенчатую структуру обучения: предварительное обучение и тонкая настройка. Во время предварительного обучения модель обучается на неразмеченных данных с помощью..

Как проанализировать работу вашего классификатора?
Не всегда просто знать, какие показатели использовать С развитием глубокого обучения все больше и больше людей учатся обучать свой первый классификатор. Но как только вы закончите..

Работа с цепями Маркова, часть 4 (Машинное обучение)
Нелинейные цепи Маркова с агрегатором и их приложения (arXiv) Автор : Бар Лайт Аннотация: Изучаются свойства подкласса случайных процессов, называемых дискретными нелинейными цепями Маркова..

Crazy Laravel Livewire упростил мне создание электронной коммерции (панель администратора и API) [Часть 3]
Как вы сегодня, ребята? В этой части мы создадим CRUD для данных о продукте. Думаю, в этой части я не буду слишком много делиться теорией, но чаще буду делиться своим кодом. Потому что..

Использование машинного обучения и Python для классификации 1000 сезонов новичков MLB Hitter
Чему может научиться машина, глядя на сезоны новичков 1000 игроков MLB? Это то, что исследует это приложение. В этом процессе мы будем использовать неконтролируемое обучение, чтобы..

Учебные заметки: создание моего первого пакета Node.js
Это мои обучающие заметки, когда я научился создавать свой самый первый пакет Node.js, распространяемый через npm. Оглавление Глоссарий I. Новый пакет 1.1 советы по инициализации..

Забудьте о Matplotlib: улучшите визуализацию данных с помощью умопомрачительных функций Seaborn!
Примечание. Эта запись в блоге предполагает базовое знакомство с Python и концепциями анализа данных. Привет, энтузиасты данных! Добро пожаловать в мой блог, где я расскажу о невероятных..


Для любых предложений по сайту: [email protected]