一个人的世界 2018-08-25
贴一下代码,留作备用。
@Override public int run(String[] args) throws Exception { Configuration hbaseConf = HBaseConfiguration.create(); /* String whh = hbaseConf.get("hbase.zookeeper.quorum"); System.out.print(whh);*/ Config config = new Config(); config.initJarFile("mr_hbase.properties"); String numReduceTasksStr = config.getValue("numReduceTasks"); Integer numReduceTasks = 3; if (NumberUtils.isDigits(numReduceTasksStr)) { numReduceTasks = Integer.valueOf(numReduceTasksStr); } String hbaseZookeeperQuorum = config.getValue("hbase.zookeeper.quorum"); hbaseConf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum); String hbaseZookeeperPropertyClientPort = config.getValue("hbase.zookeeper.property.clientPort"); hbaseConf.set("hbase.zookeeper.property.clientPort", hbaseZookeeperPropertyClientPort); if (args.length > 2) { hbaseConf.set("hbase.zookeeper.quorum", args[2]); } Job job = Job.getInstance(hbaseConf); job.setJarByClass(BookKpUnitToHbaseMr.class); job.setMapperClass(BookKpUnitToHbaseMr.BookKpUnitToHbaseMapper.class); //将第一个路径参数作为输入参数 FileInputFormat.setInputPaths(job, new Path(args[0])); //将第二个HBase表参数作为输出参数 TableMapReduceUtil.initTableReducerJob(args[1], BookKpUnitToHbaseMr.BookKpUnitToHbaseReducer.class, job); -----> 设置reducer的时候,使用org.apache.hadoop.hbase.mapreduce类 job.setOutputKeyClass(Text.class); job.setOutputValueClass(StudentKpInfo.class); //设置任务个数 job.setNumReduceTasks(numReduceTasks); return job.waitForCompletion(true) ? 0 : 1; }
public static class toHbaseReducer extends TableReducer<Text, StudentScoreInfo, ImmutableBytesWritable> { ----> hbase的TableReducer设计只接受rowkey,其余的列簇,列名,列值按写入代码时灵活设置,因此这个类只有ImmutableBytesWritable @Override protected void reduce(Text key, Iterable<StudentScoreInfo> values, Context context) throws IOException, InterruptedException { try { String rowkey = key.toString(); Put put = new Put(rowkey.getBytes()); for (StudentScoreInfo studentScoreInfo : values) { put.addColumn(Bytes.toBytes("es"), Bytes.toBytes(studentScoreInfo.getStudentId()), Bytes.toBytes(studentScoreInfo.getStudentScoreValue())); // 写入列,参数1分别为 es表示列簇 参数2表示列名 参数3表示列值 } context.write(new ImmutableBytesWritable(Bytes.toBytes(rowkey)), put); // 将rowkey和这一列写入hbase } catch (Exception e) { logger.error("reduce error: ", e); } } }