strongyoung 2020-02-20
Requirement: read the f1:name and f1:age data from the HBase table myuser and write it into the f1 column family of another table, myuser2.
Note: the column family must be the same in both tables.
Create the tables and insert some test data in the HBase shell first:

create 'myuser','f1'
put 'myuser','rk001','f1:name','tony'
put 'myuser','rk001','f1:age','12'
put 'myuser','rk001','f1:address','beijing'
put 'myuser','rk002','f1:name','wangwu'
create 'myuser2','f1'
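To sanity-check the seed data before running the job, scan the source table; given the puts above, the output should look roughly like this (timestamps elided):

hbase(main):001:0> scan 'myuser'
ROW                COLUMN+CELL
 rk001             column=f1:address, value=beijing
 rk001             column=f1:age, value=12
 rk001             column=f1:name, value=tony
 rk002             column=f1:name, value=wangwu
2 row(s)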
The pom.xml file is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>Hadoop</groupId>
    <artifactId>HbaseTang</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <hadoop.version>2.7.3</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>commons-cli</groupId>
            <artifactId>commons-cli</artifactId>
            <version>1.2</version>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-app</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-hs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>1.2.1</version>
        </dependency>
        <!-- hbase-server provides the MapReduce integration classes
             (TableMapper, TableReducer, TableMapReduceUtil) -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.2.1</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>
</project>
The Mapper code is as follows:

package com.kaikeba.hbase.demo01;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

import java.io.IOException;

/**
 * myuser f1: name&age => myuser2 f1
 */
public class HBaseReadMapper extends TableMapper<Text, Put> {

    /**
     * @param key     rowkey
     * @param value   this row's data, as a Result object read from hbase
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // get the rowkey's byte array
        byte[] rowkey_bytes = key.get();
        String rowkeyStr = Bytes.toString(rowkey_bytes);
        Text text = new Text(rowkeyStr);

        // output data -> write data -> build the Put object
        Put put = new Put(rowkey_bytes);

        // get all the Cell objects in this row
        Cell[] cells = value.rawCells();

        // emit f1:name and f1:age
        for (Cell cell : cells) {
            // is the current cell in column family f1?
            byte[] family_bytes = CellUtil.cloneFamily(cell);
            String familyStr = Bytes.toString(family_bytes);
            if ("f1".equals(familyStr)) {
                // then check whether the qualifier is name or age
                byte[] qualifier_bytes = CellUtil.cloneQualifier(cell);
                String qualifierStr = Bytes.toString(qualifier_bytes);
                if ("name".equals(qualifierStr) || "age".equals(qualifierStr)) {
                    put.add(cell);
                }
            }
        }

        // only emit the Put if it is not empty
        if (!put.isEmpty()) {
            context.write(text, put);
        }
    }
}
The Reducer code is as follows:

package com.kaikeba.hbase.demo01;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.Text;

import java.io.IOException;

/**
 * Writes the data received from the map phase into the hbase table.
 *
 * Text: map-side output key type
 * Put: map-side output value type
 * ImmutableBytesWritable: reduce-side output key type (the third type
 * parameter of TableReducer carries the rowkey information)
 */
public class HBaseWriteReducer extends TableReducer<Text, Put, ImmutableBytesWritable> {

    /**
     * @param key    the key emitted by the map side
     * @param values the values emitted by the map side: an iterable of Put objects
     */
    @Override
    protected void reduce(Text key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {
        // rowkey
        ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable();
        immutableBytesWritable.set(key.toString().getBytes());

        // iterate over the Put objects and emit each one
        for (Put put : values) {
            context.write(immutableBytesWritable, put);
        }
    }
}
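Since this reducer only forwards each Put unchanged, it could likely be replaced by HBase's stock pass-through reducer, org.apache.hadoop.hbase.mapreduce.IdentityTableReducer (Text is a Writable and Put is a Mutation, so the mapper's output types fit its generics). A sketch of the driver wiring under that assumption:

// hypothetical alternative: let HBase's identity reducer forward the Puts as-is
TableMapReduceUtil.initTableReducerJob("myuser2",
        org.apache.hadoop.hbase.mapreduce.IdentityTableReducer.class, job);

TableOutputFormat ignores the reduce output key when writing, so the manual Text-to-ImmutableBytesWritable conversion above is a demonstration rather than a requirement.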
The driver program is as follows:

package com.kaikeba.hbase.demo01;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class HBaseMR extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        // point the client at the cluster's zookeeper quorum
        configuration.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");
        int run = ToolRunner.run(configuration, new HBaseMR(), args);
        System.exit(run);
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf());
        job.setJarByClass(HBaseMR.class);

        // mapper: read f1:name and f1:age from myuser
        TableMapReduceUtil.initTableMapperJob(TableName.valueOf("myuser"), new Scan(),
                HBaseReadMapper.class, Text.class, Put.class, job);

        // reducer: write into myuser2
        TableMapReduceUtil.initTableReducerJob("myuser2", HBaseWriteReducer.class, job);

        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }
}
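An optional refinement, not part of the original code: the Scan handed to initTableMapperJob can restrict the columns server-side, so that only f1:name and f1:age are ever shipped to the map tasks (assumes org.apache.hadoop.hbase.util.Bytes is imported in the driver):

// sketch: restrict the scan to the two wanted columns before wiring up the mapper
Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"));
scan.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("age"));
TableMapReduceUtil.initTableMapperJob(TableName.valueOf("myuser"), scan,
        HBaseReadMapper.class, Text.class, Put.class, job);

With that restriction the family/qualifier checks inside HBaseReadMapper become redundant, though they are harmless to keep.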
Package the jar:
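For a standard Maven project like this one, packaging is done with:

mvn clean package

which produces target/HbaseTang-1.0-SNAPSHOT.jar.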
Run the command:
hadoop jar HbaseTang-1.0-SNAPSHOT.jar com.kaikeba.hbase.demo01.HBaseMR
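Note: hadoop jar only puts Hadoop's own jars on the classpath. If the submitting node reports missing HBase classes, one common fix, assuming the hbase command is installed there, is:

HADOOP_CLASSPATH=$(hbase classpath) hadoop jar HbaseTang-1.0-SNAPSHOT.jar com.kaikeba.hbase.demo01.HBaseMR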
Execution result:
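Assuming the job completes successfully, scanning the target table should show only the name and age cells; rk001's f1:address is filtered out by the mapper (timestamps elided, output shape approximate):

hbase(main):002:0> scan 'myuser2'
ROW                COLUMN+CELL
 rk001             column=f1:age, value=12
 rk001             column=f1:name, value=tony
 rk002             column=f1:name, value=wangwu
2 row(s)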