3.5 Hadoop Pipes
Hadoop Pipes提供了一个在Hadoop上运行C++程序的方法。与流不同的是,流使用的是标准输入输出作为可执行程序与Hadoop相关进程间通信的工具,而Pipes使用的是Sockets。先看一个示例程序wordcount.cpp:
include"hadoop/Pipes.hh"
include"hadoop/TemplateFactory.hh"
include"hadoop/StringUtils.hh"
const std:string WORDCOUNT="WORDCOUNT";
const std:string INPUT_WORDS="INPUT_WORDS";
const std:string OUTPUT_WORDS="OUTPUT_WORDS";
class WordCountMap:public HadoopPipes:Mapper{
public:
HadoopPipes:TaskContext:Counter*inputWords;
WordCountMap(HadoopPipes:TaskContext&context){
inputWords=context.getCounter(WORDCOUNT, INPUT_WORDS);
}
void map(HadoopPipes:MapContext&context){
std:vector<std:string>words=
HadoopUtils:splitString(context.getInputValue(),"");
for(unsigned int i=0;i<words.size();++i){
context.emit(words[i],"1");
}
context.incrementCounter(inputWords, words.size());
}
};
class WordCountReduce:public HadoopPipes:Reducer{
public:
HadoopPipes:TaskContext:Counter*outputWords;
WordCountReduce(HadoopPipes:TaskContext&context){
outputWords=context.getCounter(WORDCOUNT, OUTPUT_WORDS);
}
void reduce(HadoopPipes:ReduceContext&context){
int sum=0;
while(context.nextValue()){
sum+=HadoopUtils:toInt(context.getInputValue());
}
context.emit(context.getInputKey(),HadoopUtils:toString(sum));
context.incrementCounter(outputWords,1);
}
};
int main(int argc, char*argv[]){
return HadoopPipes:runTask(HadoopPipes:TemplateFactory<WordCountMap,
WordCountReduce>());
}
这个程序连接的是一个C++库,结构类似于Java编写的程序。如新版API一样,这个程序使用context方法读入和收集<key, value>对。在使用时要重写HadoopPipes名字空间下的Mapper和Reducer函数,并用context.emit()方法输出<key, value>对。main函数是应用程序的入口,它调用HadoopPipes:runTask方法,这个方法由一个TemplateFactory参数来创建Map和Reduce实例,也可以重载factory设置combiner()、partitioner()、record reader、record writer。
接下来,编译这个程序。这个编译命令需要用到g++,读者可以使用apt自动安装这个程序。g++的命令格式如下所示:
apt-get install g++
然后建立文件Makerfile,如下所示:
HADOOP_INSTALL="你的hadoop安装文件夹"
PLATFORM=Linux-i386-32(如果是AMD的CPU,请使用Linux-amd64-64)
CC=g++
CPPFLAGS=-m32-I$(HADOOP_INSTALL)/c++/$(PLATFORM)/include
wordcount:wordcount.cpp
$(CC)$(CPPFLAGS)$<-Wall-L$(HADOOP_INSTALL)/c++/$(PLATFORM)/lib-lhadooppipes
-lhadooputils-lpthread-g-O2-o$@
注意在$(CC)前有一个<tab>符号,这个分隔符是很关键的。
在当前目录下建立一个WordCount可执行文件。
接着,上传可执行文件到HDFS上,这是为了TaskTracker能够获得这个可执行文件。这里上传到bin文件夹内。
~/hadoop/bin/hadoop fs-mkdir bin
~/hadoop/bin/hadoop dfs-put wordcount bin
然后,就可以运行这个MapReduce程序了,可以采用两种配置方式运行这个程序。一种方式是直接在命令中运行指定配置,如下所示:
~/hadoop/bin/hadoop pipes\
-D hadoop.pipes.java.recordreader=true\
-D hadoop.pipes.java.recordwriter=true\
-input input\
-output Coutput\
-program bin/wordcount
另一种方式是预先将配置写入配置文件中,如下所示:
<?xml version="1.0"?>
<configuration>
<property>
//Set the binary path on DFS
<name>hadoop.pipes.executable</name>
<value>bin/wordcount</value>
</property>
<property>
<name>hadoop.pipes.java.recordreader</name>
<value>true</value>
</property>
<property>
<name>hadoop.pipes.java.recordwriter</name>
<value>true</value>
</property>
</configuration>
然后通过如下命令运行这个程序:
~/hadoop/bin/hadoop pipes-conf word.xml-input input-output output
将参数hadoop.pipes.executable和hadoop.pipes.java.recordreader设置为true表示使用Hadoop默认的输入输出方式(即Java的)。同样的,也可以设置一个Java语言编写的Mapper函数、Reducer函数、combiner函数和partitioner函数。实际上,在任何一个作业中,都可以混用Java类和C++类。