chenguangchun 2019-12-28
1)Source中使用拦截器
kafka_key.sources.sources1.interceptors = i1
kafka_key.sources.sources1.interceptors.i1.type = com.bigdata.flume.MyInterceptor$Builderkafka_key.sources.sources1.interceptors = i1
kafka_key.sources.sources1.interceptors.i1.type = regex_extractor
kafka_key.sources.sources1.interceptors.i1.regex = .*?\\|(.*?)\\|.*
kafka_key.sources.sources1.interceptors.i1.serializers = s1
kafka_key.sources.sources1.interceptors.i1.serializers.s1.name = key
2)sink中使用自定义的partitioner类:
kafka_key.sinks.sink1.kafka.partitioner.class = com.lxw1234.flume17.SimplePartitioner
自定义的分区类:
import java.util.List;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import com.google.common.collect.Lists;
import com.nebo.kafka_study.an2.model.UserBehaviorRequestModel;
import com.nebo.kafka_study.an2.utils.JSONUtil;
public class BehaviorInterceptor implements Interceptor
{
@Override
public void initialize() {}
@Override
public Event intercept(Event event) {
//如果event为空过滤掉
if(event == null || event.getBody() == null || event.getBody().length == 0){
return null;
}long userId = 0;
//解析日志
try{
UserBehaviorRequestModel model = JSONUtil.json2Object(new String(event.getBody()),UserBehaviorRequestModel.class);
userId = model.getUserId();
}catch (Exception e){
e.printStackTrace();
}if(userId == 0){
return null;
}
//将userId赋值给key
event.getHeaders().put("key",userId+"");return event;
}@Override
public List<Event> intercept(List<Event> events) {List<Event> out = Lists.newArrayList();
for (Event event : events) {
Event outEvent = intercept(event);
//event 为空过滤掉
if (outEvent != null) { out.add(outEvent); }
}
return out;
}@Override
public void close() {}
//程序入口
public static class BehaviorBuilder implements Interceptor.Builder {@Override
public Interceptor build() {
return new BehaviorInterceptor();
}@Override
public void configure(Context context) {}
}
}Json工具:
package com.nebo.kafka_study.an2.utils;
import com.google.gson.Gson;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.map.DeserializationConfig;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.StringWriter;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public final class JSONUtil
{private static final Logger LOG = LoggerFactory.getLogger(JSONUtil.class);
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final ObjectMapper NEWMAPPER = new ObjectMapper();
private static JSONUtil jsonUtil;
private static Gson GSON = new Gson();static {
//NEWMAPPER.setVisibility(JsonMethod.FIELD, JsonAutoDetect.Visibility.ANY);
NEWMAPPER.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}private JSONUtil()
{
}public static JSONUtil getInstance()
{
synchronized (JSONUtil.class) {
if (jsonUtil == null) {
jsonUtil = new JSONUtil();
}
}return jsonUtil;
}public static String fromObject(Object obj) throws IOException, JsonGenerationException, JsonMappingException
{
StringWriter stringWriter = new StringWriter();
MAPPER.writeValue(stringWriter, obj);
return stringWriter.toString();
}public static String writeValueAsString(Object value)
{
try {
return MAPPER.writeValueAsString(value);
} catch (Exception e) {
throw new RuntimeException(e);
}
}public static String fromListForData(List<?> list) throws IOException, JsonGenerationException,
JsonMappingException
{
StringWriter stringWriter = new StringWriter();
stringWriter.write("{data:[");
for (int i = 0; i < list.size(); i++) {
stringWriter.write(fromObject(list.get(i)));
if (i != list.size() - 1) {
stringWriter.write(",");
}
}
stringWriter.write("]}");
return stringWriter.toString();
}public static List<?> toList(String json) throws IOException, JsonGenerationException, JsonMappingException
{
if (LOG.isDebugEnabled()) {
LOG.debug("Get json string is:" + json);
}
MAPPER.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
return MAPPER.readValue(json, List.class);
}public static Map<?, ?> toMap(String json) throws IOException, JsonGenerationException, JsonMappingException
{
MAPPER.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
return MAPPER.readValue(json, Map.class);
}/**
* 从Json字串中得到指定属性值
*/
public static Object getFromJson(String jsonStr, String proertyName)
{
Map map = new HashMap();
try {
map = JSONUtil.getInstance().toMap(jsonStr);
} catch (Exception e) {
LOG.error("", e);
}
return (Object) map.get(proertyName);
}public static <T> T json2Object(String json, Class<T> clazz) throws JsonParseException, JsonMappingException, IOException
{
return MAPPER.readValue(json, clazz);
}public static <T> T json2ObjectIgnoreDifference(String json, Class<T> clazz) throws JsonParseException, JsonMappingException, IOException
{
return NEWMAPPER.readValue(json, clazz);
}public static String listToJsonStr(List<?> list, Type type)
{
//Type listType = new TypeToken<List<?>>(){}.getType();
return GSON.toJson(list, type);
}/**
* Type listType = new TypeToken<List<T>>(){}.getType();
*/
public static List<?> listFromJsonStr(String jsonStr, Type type)
{
List resultList = GSON.fromJson(jsonStr, type);return resultList;
}
}