xorxos 2020-05-26
1、继承RichSinkFunction
(1)首先在构造方式传入kudu的masterAddress地址、默认表名、TableSerializationSchema、KuduTableRowConverter、Properties配置对象
(2)重写open方法
初始化KuduClient对象操作kudu,KuduSession对象并传入一堆配置
(3)重写invoke方法
核心是如果已传入TableSerializationSchema对象,则通过其serializeTable方法从输入的json数据里提取表名,如果未定义则直接取默认表名。拿到表名后就能使用KuduClient对象对其操作了
if (schema != null) { String serializeTableName = schema.serializeTable(row); if (serializeTableName == null) return; table = client.openTable(serializeTableName); } else table = client.openTable(tableName); insert = table.newInsert();
2、定义KuduTableRowConverter接口,将每一条输入数据转换成TableRow对象
public interface KuduTableRowConverter<IN> extends Serializable { TableRow convert(IN value); }
定义TableRow类,代表一行数据,key是字串型的键名,value是Object型的键值
public class TableRow implements Serializable { private static final long serialVersionUID = 1L; private Map<String, Object> pairs = new HashMap<>(); public int size() {return pairs.size();} public Map<String, Object> getPairs() {return pairs;} public Object getElement(String key) {return pairs.get(key);} public void putElement(String key, Object value) {pairs.put(key, value);} }
定义JsonKuduTableRowConverter实现KuduTableRowConverter接口,对于输入的json数据,通过一系列转换逻辑转换成TableRow对象
3、定义TableSerializationSchema接口,从每一条输入数据里提取表名
public interface TableSerializationSchema<IN> extends Serializable { String serializeTable(IN value); }
定义JsonLogidKeyTableSerializationSchema实现TableSerializationSchema接口,对于输入的json数据,使用指定key值提取value值,然后再从一个预先获取的map里找到这个value对应的表名,然后加上必要的前缀与后缀组成impala的表名