Hadoop之MapReduce学习(三)之ip去重、MaxScore示例、TotalScoreMapper示例

香帅 2020-04-15

一、IP去重示例

数据文件:

192.168.10.111
192.168.10.111
10.32.100.111
192.168.21.111
192.168.10.112
192.168.10.111
192.168.11.111
192.168.12.112
192.168.11.111
IPMapper:
package com.blb.ip;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class IPMapper
        extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 每一行数据就是一个ip
        context.write(value, NullWritable.get());
    }
}
IPReducer:
package com.blb.ip;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class IPReducer
        extends Reducer<Text, NullWritable, Text, NullWritable> {

    // key:ip
    // values:null,null,null...
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

        context.write(key, NullWritable.get());

    }
}
IPDriver:
package com.blb.ip;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class IPDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(IPDriver.class);
        job.setMapperClass(IPMapper.class);
        job.setReducerClass(IPReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job,
                new Path("hdfs://192.168.10.131:9000/txt/ip.txt"));

        FileOutputFormat.setOutputPath(job,
                new Path("hdfs://192.168.10.131:9000/result/ip"));

        job.waitForCompletion(true);
    }
}

二、axScore示例

数据文件:

张三 684
李四 312
王五 721
赵六 548
田七 470
王八 668
陈九 340

MaxScoreMapper:

package com.blb.maxscore;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MaxScoreMapper
extends Mapper<LongWritable, Text, Text, IntWritable> {

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

String[] arr = value.toString().split(" ");
context.write(new Text(arr[0]),
new IntWritable(Integer.parseInt(arr[1])));
}
}

MaxScoreReducer:

package com.blb.maxscore;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MaxScoreReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {

@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// 在MapReduce中,为了节省内存,减少对象的创建和销毁,采取了地址复用的机制
// 即在迭代过程中,对象只创建一次
IntWritable max = new IntWritable(0);
// IntWritable max = new IntWritable(0);
// IntWritable value = new IntWritable();
// values:684 312 721 548...
// value.set(684);
// value.get() > max.get() --- 684 > 0 --- true
// max = value; 此时max和value都是引用类型,给的是地址
// 也就意味着max和value的地址一样
// value.set(312); 此时max的值也是312
// value.get() > max.get() --- 312 > 312 --- false
// 继续迭代,max和value的地址始终一致
// max的值就是迭代的最后一个值
for (IntWritable value : values) {
if (value.get() > max.get())
// max = value;
max.set(value.get());
}
context.write(key, max);
}
}

MaxScoreDriver:

package com.blb.maxscore;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MaxScoreDriver {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

Configuration conf = new Configuration();
Job job = Job.getInstance(conf);

job.setJarByClass(MaxScoreDriver.class);
job.setMapperClass(MaxScoreMapper.class);
job.setReducerClass(MaxScoreReducer.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

FileInputFormat.addInputPath(job,
new Path("hdfs://192.168.10.131:9000/txt/score2.txt"));

FileOutputFormat.setOutputPath(job,
new Path("hdfs://192.168.10.131:9000/result/maxscore2"));

job.waitForCompletion(true);
}
}

 三、TotalScoreMapper示例

数据文件:

张三 78
李四 66
王五 73
张三 88
田七 75
张三 65
陈九 90
李四 67
王五 78

IPMapper:

package com.blb.maxscore;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MaxScoreMapper
extends Mapper<LongWritable, Text, Text, IntWritable> {

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

String[] arr = value.toString().split(" ");
context.write(new Text(arr[0]),
new IntWritable(Integer.parseInt(arr[1])));
}
}

TotalScoreMapper:

package com.blb.totalscore;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class TotalScoreMapper
extends Mapper<LongWritable, Text, Text, IntWritable> {

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 拆分名字和分数
String[] arr = value.toString().split(" ");
context.write(new Text(arr[0]),
new IntWritable(Integer.parseInt(arr[1])));
}
}

TotalScoreReducer:

package com.blb.totalscore;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class TotalScoreReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
context.write(key, new IntWritable(sum));
}

}

TotalScoreDriver:

package com.blb.totalscore;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class TotalScoreDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

Configuration conf = new Configuration();
Job job = Job.getInstance(conf);

job.setJarByClass(TotalScoreDriver.class);
job.setMapperClass(TotalScoreMapper.class);
job.setReducerClass(TotalScoreReducer.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

FileInputFormat.addInputPath(job,
new Path("hdfs://192.168.10.131:9000/txt/score2/"));

FileOutputFormat.setOutputPath(job,
new Path("hdfs://192.168.10.131:9000/result/totalscore"));

job.waitForCompletion(true);
}
}

 

相关推荐

飞鸿踏雪0 / 0评论 2020-05-07