У меня есть много файлов gz
, хранящихся в кластере HDFS
из 20 узлов, которые необходимо агрегировать по столбцам. Файлы gz
очень большие (1 ГБ каждый и всего 200 файлов). Формат данных — значение ключа с двумя значениями столбца: ['key','value1','value2']
, и его необходимо сгруппировать по ключу с агрегированием по столбцу: sum(value1)
, count(value2)
.
Данные уже отсортированы по ключу, и каждый файл gz имеет эксклюзивные значения ключа.
Например:
File 1:
k1,v1,u1
k1,v2,u1
k2,v2,u2
k3,v3,u3
k3,v4,u4
File 2:
k4,v5,u6
k4,v7,u8
k5,v9,v10
File 3:
k6,...
...
...
File 200:
k200,v200,u200
k201,v201,u201
Сначала я анализирую дату и преобразовываю данные в структуру (key, list of (values))
. Вывод парсера будет таким:
parser output
(k1,[v1,u1])
(k1,[v2,u1])
(k2,[v2,u2])
(k3,[v3,u3])
(k3,[v4,u4])
Затем сгруппируйте по ключевым значениям, используя функцию reduceByKey
, которая более эффективна, чем функция groupByKey
.
reducer output:
(k1,[[v1,u1],[v2,u1])
(k2,[[v2,u2]])
(k3,[[v3,u3],[v4,u4]])
Затем агрегируйте столбцы, используя функцию процесса:
process
(k1, sum([v1,v2], len([u1,u3])))
(k2, sum([v2], len([u2])))
(k3, sum([v3,v4], len([u3,u4])))
Вот пример кода процесса
import pyspark
from pyspark import SparkFiles
def parser(line):
try:
key,val=line.split('\t)
return (key,[val1,val2])
except:
return None
def process(line):
key,gr= line[0],line[1]
vals=zip(*gr)
val1=sum(vals[0])
val2=len(vals[1])
return ('\t'.join([key,val1,val2]))
sc = pyspark.SparkContext(appName="parse")
logs=sc.textFile("hdfs:///home/user1/*.gz")
proc=logs.map(parser).filter(bool).reduceByKey(lambda acc,x: acc+x).map(process)
proc.saveAsTextFile('hdfs:///home/user1/output1')
Я думаю, что этот код не полностью использует искровой кластер. Мне нравится оптимизировать код, чтобы полностью использовать обработку.
<сильный>1. Как лучше всего обрабатывать gz-файлы в HDFS и Pyspark? — как полностью распределить обработку gz-файлов на весь кластер?
<сильный>2. Как полностью использовать все ЦП в каждом узле? для процесса агрегации и синтаксического анализа