Что ж, это просто еще один вариант использования R для обработки больших наборов данных, вместо использования Rhipe
. У меня есть некоторый опыт использования потоковой передачи Hadoop, и мне это очень подходит.
Короче говоря, вы можете использовать любой язык для написания заданий сокращения карты с помощью Hadoop Streaming.
Вот некоторый код, который я написал ранее, чтобы дать вам краткое представление о том, как он выглядит в R.
Вы можете настроить свой собственный myfunction
, и он должен быть почти готов.
#!/usr/bin/Rscript
library(dplyr)
f <- file("stdin")
open(f, open="r")
mydelimiter <- '\t'
mydelimiter_col <- rawToChar(as.raw(1))
myfunction <- function(inputarray){
...
return(result)
}
# FIRST LINE
firstline <- readLines(f, n=1)
fields <- unlist(strsplit(firstline, split=mydelimiter))
mykey_current <- fields[1]
myvalue_array <- array(fields[2])
while( length(line<-readLines(f, n=1)) > 0){
fields <- unlist(strsplit(line, split=mydelimiter))
mykey_new <- fields[1]
myvalue_new <- fields[2]
if (identical(mykey_new, mykey_current)) {
# Same Key: append new value to existing value array
myvalue_array <- c(myvalue_array, myvalue_new)
} else {
# Different Key
# (1): process existing values
result <- myfunction(myvalue_array)
cat(mykey_current); cat(mydelimiter)
cat(result); cat('\n')
# (2): reset key, value
mykey_current <- mykey_new
myvalue_array <- array(myvalue_new)
}
}
# LAST LINE
result <- myfunction(myvalue_array)
cat(mykey_current); cat(mydelimiter)
cat(result); cat('\n')
Для запуска задания потоковой передачи Hadoop. Я пишу небольшой баш, который выглядит так:
#!/bin/bash
hdfs dfs -rmr <hdfspath_output>
hadoop jar \
/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.3.0-mr1-cdh5.1.0.jar \
-files mapper.py,reducer.R \
-input <hdfspath_input> \
-output <hdfspath_output> \
-mapper mapper.py \
-reducer reducer.R
20.11.2014