MapReduce编程实践-基于IDEA/Maven实现单词统计分析

yhblog 2019-06-01

在开始MapReduce编程之前,需要做好如下准备工作。

(1)搭建好单机版本或者伪分布式Hadoop环境;

CentOS 7 单机安装最新版Hadoop v3.1.2以及配置和简单测试

Hadoop v3.1.2 伪分布式安装(Pseudo-Distributed Operation)

(2)在HDFS中创建好input文件夹,并上传文本文件到HDFS中的input文件夹中;

创建input文件夹

bin/hadoop fs -mkdir input

上传本地input文件夹中文件到HDFS中的input文件夹中

bin/hadoop fs -put input/*.txt input

查看HDFS 目录(-R 是ls命令的递归选项)

bin/hadoop fs -ls -R

如果output 目录已经存在,则删除output 文件夹(重新运行应用时,也需要首先将HDFS中的output文件夹删除,然后在运行)

bin/hadoop fs -rm -r output

再次查看HDFS目录

bin/hadoop fs -ls -R

(3)删除output 文件夹,MapReduce应用运行时会将结果存放在该目录中;

编写MapReduce应用,主要包括如下几个步骤:

(1)编写Map处理逻辑;

(2)编写Reduce处理逻辑;

(3)编写main 方法;

(4)编译打包代码,以及运行应用程序

MapReduce编程实践-基于IDEA/Maven实现单词统计分析

一,编写Map处理逻辑

首先,通过IDEA创建一个Maven项目,并添加对hadoop-client的引用。

MapReduce编程实践-基于IDEA/Maven实现单词统计分析

添加对hadoop-client jar包的引用:

<dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-client</artifactId>

<version>3.1.2</version>

</dependency>

MapReduce编程实践-基于IDEA/Maven实现单词统计分析

通过继承Mapper类来实现Map处理逻辑。

package com.rickie.wordcount;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

import java.util.StringTokenizer;

public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

private static final IntWritable one = new IntWritable(1);

private Text word = new Text();

public TokenizerMapper() {

}

@Override

public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context)

throws IOException, InterruptedException {

StringTokenizer itr = new StringTokenizer(value.toString());

while(itr.hasMoreTokens()) {

this.word.set(itr.nextToken());

context.write(this.word, one);

}

}

}

Map过程需要继承org.apache.hadoop.mapreduce包中 Mapper 类,并重写其map方法。

public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>

其中的模板参数:第一个Object表示输入key的类型;第二个Text表示输入value的类型;第三个Text表示表示输出键的类型;第四个IntWritable表示输出值的类型。

作为map方法输入的键值对,其value值存储的是文本文件中的一行(以回车符为行结束标记),而key值为该行的首字母相对于文本文件的首地址的偏移量。然后StringTokenizer类将每一行拆分成为一个个的单词,并将<word,1>作为map方法的结果输出,其余的工作都交有 MapReduce框架处理。

StringTokenizer是Java工具包中的一个类,用于将字符串进行拆分—默认情况下使用空格作为分隔符进行分割。

二,编写Reduce处理逻辑

在Map运行结束得到中间结果后,接下来进入Shuffle阶段,在这个阶段中Hadoop自动将Map的输出结果进行分区、排序、合并,然后分发给对应的Reduce任务去处理。

下面是Reduce处理逻辑的具体代码:

package com.rickie.wordcount;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

import java.util.Iterator;

public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

private IntWritable result = new IntWritable();

public IntSumReducer() {}

@Override

public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context)

throws IOException, InterruptedException {

int sum = 0;

IntWritable val;

for(Iterator i = values.iterator(); i.hasNext(); sum +=val.get()){

val = (IntWritable)i.next();

}

this.result.set(sum);

context.write(key, this.result);

}

}

继承Hadoop提供的类(Reducer),并override其方法(reduce)。

public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>

其中模板参数同Map一样,依次表示是输入键类型,输入值类型,输出键类型,输出值类型。

public void reduce(Text key, Iterable<IntWritable> values, Context context)

reduce 方法的输入参数 key 为单个单词,而 values 是由各Mapper上对应单词的计数值所组成的列表(一个实现了 Iterable 接口的变量,可以理解成 values 里包含若干个 IntWritable 整数,可以通过迭代的方式遍历所有的值),所以只要遍历 values 并求和,即可得到某个单词出现的总次数。

当Reduce过程结束时,就可以得到最终需要的数据了。

三,编写main方法

为了让前面的Map/Reduce处理类能够协同工作,需要在main方法中通过Job 类设置Hadoop应用程序运行时的环境变量。

package com.rickie;

import com.rickie.wordcount.IntSumReducer;

import com.rickie.wordcount.TokenizerMapper;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;

/**

* Word Count

*

*/

public class WordCount

{

public static void main( String[] args ) throws IOException, ClassNotFoundException, InterruptedException {

System.out.println( "Hello World!" );

Configuration conf = new Configuration();

String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();

if(otherArgs.length < 2) {

System.err.println("Usage: wordcount <in> [<in>...] <out>");

}

// 设置环境参数

Job job = Job.getInstance(conf, "word count");

// 设置整个应用的类名

job.setJarByClass(WordCount.class);

// 添加Mapper 类

job.setMapperClass(TokenizerMapper.class);

// 添加Reducer 类

job.setReducerClass(IntSumReducer.class);

// 设置key 输出类型

job.setOutputKeyClass(Text.class);

// 设置value 输出类型

job.setOutputValueClass(IntWritable.class);

for(int i=0; i<otherArgs.length - 1; ++i) {

// 设置输入文件

FileInputFormat.addInputPath(job, new Path(otherArgs[i]));

}

// 设置输出文件

FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length -1]));

System.exit(job.waitForCompletion(true)?0:1);

}

}

在代码的开始部分,通过类 Configuration 获取程序运行时的参数情况,并存储到 otherArgs 变量中。

然后,通过类 Job 设置环境参数。

最后,根据之前获得的程序运行时参数,设置输入/输出文件路径。

完成相应任务的参数设定后,即可调用 job.waitForCompletion() 方法执行任务。

四,编译打包代码,以及运行应用程序

完整Maven项目,如图所示。

MapReduce编程实践-基于IDEA/Maven实现单词统计分析

通过maven命令,mvn clean package,打包wordcount.jar 包文件,并复制到hadoop 存储节点。

MapReduce编程实践-基于IDEA/Maven实现单词统计分析

根据MapReduce设计理念:计算向数据靠拢。将jar 包复制到hadoop 存储节点。

MapReduce编程实践-基于IDEA/Maven实现单词统计分析

在运行之前,先检查一下input / output目录。

bin/hadoop fs -ls -R

显示input 目录中有2个文件,output目录不存在。

运行 wordcount.jar 应用

bin/hadoop jar wordcount.jar com.rickie.WordCount input output

MapReduce编程实践-基于IDEA/Maven实现单词统计分析

查看输出结果

bin/hdfs dfs -cat output/*

或者

bin/hadoop fs -cat output/*

MapReduce编程实践-基于IDEA/Maven实现单词统计分析

相关推荐