关联规则二项集Hadoop实现

NA 2012-11-08

近期看mahout的关联规则源码,颇为头痛,本来打算写一个系列分析关联规则的源码的,但是后面看到有点乱了,可能是稍微有点复杂吧,所以就打算先实现最简单的二项集关联规则。

算法的思想还是参考上次的图片:

关联规则二项集Hadoop实现

这里实现分为五个步骤:

  1. 针对原始输入计算每个项目出现的次数;
  2. 按出现次数从大到小(排除出现次数小于阈值的项目)生成frequence list file;
  3. 针对原始输入的事务按frequence list file进行排序并剪枝;
  4. 生成二项集规则;
  5. 计算二项集规则出现的次数,并删除小于阈值的二项集规则;

第一步的实现:包括步骤1和步骤2,代码如下:

GetFlist.java:

package org.fansy.date1108.fpgrowth.twodimension;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration; // fixed: was "org.apache.Hadoop.conf" (wrong case, compile error)
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  24. // the specific comparator
  25. class MyComparator implements Comparator<String>{
  26. private String splitter=",";
  27. public MyComparator(String splitter){
  28. this.splitter=splitter;
  29. }
  30. @Override
  31. publicint compare(String o1, String o2) {
  32. // TODO Auto-generated method stub
  33. String[] str1=o1.toString().split(splitter);
  34. String[] str2=o2.toString().split(splitter);
  35. int num1=Integer.parseInt(str1[1]);
  36. int num2=Integer.parseInt(str2[1]);
  37. if(num1>num2){
  38. return -1;
  39. }elseif(num1<num2){
  40. return1;
  41. }else{
  42. return str1[0].compareTo(str2[0]);
  43. }
  44. }
  45. }
  46. publicclass GetFList {
  47. /**
  48. * the program is based on the picture
  49. */
  50. // Mapper
  51. publicstaticclass MapperGF extends Mapper<LongWritable ,Text ,Text,IntWritable>{
  52. private Pattern splitter=Pattern.compile("[ ]*[ ,|\t]");
  53. privatefinal IntWritable newvalue=new IntWritable(1);
  54. publicvoid map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
  55. String [] items=splitter.split(value.toString());
  56. for(String item:items){
  57. context.write(new Text(item), newvalue);
  58. }
  59. }
  60. }
  61. // Reducer
  62. publicstaticclass ReducerGF extends Reducer<Text,IntWritable,Text ,IntWritable>{
  63. publicvoid reduce(Text key,Iterable<IntWritable> value,Context context) throws IOException, InterruptedException{
  64. int temp=0;
  65. for(IntWritable v:value){
  66. temp+=v.get();
  67. }
  68. context.write(key, new IntWritable(temp));
  69. }
  70. }
  71. publicstaticvoid main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  72. // TODO Auto-generated method stub
  73. if(args.length!=3){
  74. System.out.println("Usage: <input><output><min_support>");
  75. System.exit(1);
  76. }
  77. String input=args[0];
  78. String output=args[1];
  79. int minSupport=0;
  80. try {
  81. minSupport=Integer.parseInt(args[2]);
  82. } catch (NumberFormatException e) {
  83. // TODO Auto-generated catch block
  84. minSupport=3;
  85. }
  86. Configuration conf=new Configuration();
  87. String temp=args[1]+"_temp";
  88. Job job=new Job(conf,"the get flist job");
  89. job.setJarByClass(GetFList.class);
  90. job.setMapperClass(MapperGF.class);
  91. job.setCombinerClass(ReducerGF.class);
  92. job.setReducerClass(ReducerGF.class);
  93. job.setOutputKeyClass(Text.class);
  94. job.setOutputValueClass(IntWritable.class);
  95. FileInputFormat.setInputPaths(job, new Path(input));
  96. FileOutputFormat.setOutputPath(job, new Path(temp));
  97. boolean succeed=job.waitForCompletion(true);
  98. if(succeed){
  99. // read the temp output and write the data to the final output
  100. List<String> list=readFList(temp+"/part-r-00000",minSupport);
  101. System.out.println("the frequence list has generated ... ");
  102. // generate the frequence file
  103. generateFList(list,output);
  104. System.out.println("the frequence file has generated ... ");
  105. }else{
  106. System.out.println("the job is failed");
  107. System.exit(1);
  108. }
  109. }
  110. // read the temp_output and return the frequence list
  111. publicstatic List<String> readFList(String input,int minSupport) throws IOException{
  112. // read the hdfs file
  113. Configuration conf=new Configuration();
  114. Path path=new Path(input);
  115. FileSystem fs=FileSystem.get(path.toUri(),conf);
  116. FSDataInputStream in1=fs.open(path);
  117. PriorityQueue<String> queue=new PriorityQueue<String>(15,new MyComparator("\t"));
  118. InputStreamReader isr1=new InputStreamReader(in1);
  119. BufferedReader br=new BufferedReader(isr1);
  120. String line;
  121. while((line=br.readLine())!=null){
  122. int num=0;
  123. try {
  124. num=Integer.parseInt(line.split("\t")[1]);
  125. } catch (NumberFormatException e) {
  126. // TODO Auto-generated catch block
  127. num=0;
  128. }
  129. if(num>minSupport){
  130. queue.add(line);
  131. }
  132. }
  133. br.close();
  134. isr1.close();
  135. in1.close();
  136. List<String> list=new ArrayList<String>();
  137. while(!queue.isEmpty()){
  138. list.add(queue.poll());
  139. }
  140. return list;
  141. }
  142. // generate the frequence file
  143. publicstaticvoid generateFList(List<String> list,String output) throws IOException{
  144. Configuration conf=new Configuration();
  145. Path path=new Path(output);
  146. FileSystem fs=FileSystem.get(path.toUri(),conf);
  147. FSDataOutputStream writer=fs.create(path);
  148. Iterator<String> i=list.iterator();
  149. while(i.hasNext()){
  150. writer.writeBytes(i.next()+"\n");// in the last line add a \n which is not supposed to exist
  151. }
  152. writer.close();
  153. }
  154. }

相关推荐