【Flume】自定义分区实现kafka有序

chenguangchun 2019-12-28

1)Source中使用拦截器

kafka_key.sources.sources1.interceptors = i1
kafka_key.sources.sources1.interceptors.i1.type = com.bigdata.flume.MyInterceptor$Builder

kafka_key.sources.sources1.interceptors = i1
kafka_key.sources.sources1.interceptors.i1.type = regex_extractor
kafka_key.sources.sources1.interceptors.i1.regex = .*?\\|(.*?)\\|.*
kafka_key.sources.sources1.interceptors.i1.serializers = s1
kafka_key.sources.sources1.interceptors.i1.serializers.s1.name = key

2)sink中使用自定义的partitioner类:
kafka_key.sinks.sink1.kafka.partitioner.class = com.lxw1234.flume17.SimplePartitioner

自定义的分区类:
import java.util.List;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import com.google.common.collect.Lists;
import com.nebo.kafka_study.an2.model.UserBehaviorRequestModel;
import com.nebo.kafka_study.an2.utils.JSONUtil;
public class BehaviorInterceptor implements Interceptor
{
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        //如果event为空过滤掉
        if(event == null || event.getBody() == null || event.getBody().length == 0){
            return null;
        }

        long userId = 0;

        //解析日志
        try{
            UserBehaviorRequestModel model = JSONUtil.json2Object(new String(event.getBody()),UserBehaviorRequestModel.class);
            userId = model.getUserId();
        }catch (Exception e){
            e.printStackTrace();
        }

        if(userId == 0){
            return null;
        }
        //将userId赋值给key
        event.getHeaders().put("key",userId+"");

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {

        List<Event> out = Lists.newArrayList();
        for (Event event : events) {
            Event outEvent = intercept(event);
            //event 为空过滤掉
            if (outEvent != null) { out.add(outEvent); }
        }
        return out;
    }

    @Override
    public void close() {

    }
    //程序入口
    public static class BehaviorBuilder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new BehaviorInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

Json工具:
package com.nebo.kafka_study.an2.utils;
import com.google.gson.Gson;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.map.DeserializationConfig;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.StringWriter;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public final class JSONUtil
{

  private static final Logger LOG = LoggerFactory.getLogger(JSONUtil.class);
  private static final ObjectMapper MAPPER = new ObjectMapper();
  private static final ObjectMapper NEWMAPPER = new ObjectMapper();
  private static JSONUtil jsonUtil;
  private static Gson GSON = new Gson();

  static {
    //NEWMAPPER.setVisibility(JsonMethod.FIELD, JsonAutoDetect.Visibility.ANY);
    NEWMAPPER.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
  }

  private JSONUtil()
  {
  }

  public static JSONUtil getInstance()
  {
    synchronized (JSONUtil.class) {
      if (jsonUtil == null) {
        jsonUtil = new JSONUtil();
      }
    }

    return jsonUtil;
  }

  public static String fromObject(Object obj) throws IOException, JsonGenerationException, JsonMappingException
  {
    StringWriter stringWriter = new StringWriter();
    MAPPER.writeValue(stringWriter, obj);
    return stringWriter.toString();
  }

  public static String writeValueAsString(Object value)
  {
    try {
      return MAPPER.writeValueAsString(value);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  public static String fromListForData(List<?> list) throws IOException, JsonGenerationException,
    JsonMappingException
  {
    StringWriter stringWriter = new StringWriter();
    stringWriter.write("{data:[");
    for (int i = 0; i < list.size(); i++) {
      stringWriter.write(fromObject(list.get(i)));
      if (i != list.size() - 1) {
        stringWriter.write(",");
      }
    }
    stringWriter.write("]}");
    return stringWriter.toString();
  }

  public static List<?> toList(String json) throws IOException, JsonGenerationException, JsonMappingException
  {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Get json string is:" + json);
    }
    MAPPER.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
    return MAPPER.readValue(json, List.class);
  }

  public static Map<?, ?> toMap(String json) throws IOException, JsonGenerationException, JsonMappingException
  {
    MAPPER.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
    return MAPPER.readValue(json, Map.class);
  }

  /**
   * 从Json字串中得到指定属性值
   */
  public static Object getFromJson(String jsonStr, String proertyName)
  {
    Map map = new HashMap();
    try {
      map = JSONUtil.getInstance().toMap(jsonStr);
    } catch (Exception e) {
      LOG.error("", e);
    }
    return (Object) map.get(proertyName);
  }

  public static <T> T json2Object(String json, Class<T> clazz) throws JsonParseException, JsonMappingException, IOException
  {
    return MAPPER.readValue(json, clazz);
  }

  public static <T> T json2ObjectIgnoreDifference(String json, Class<T> clazz) throws JsonParseException, JsonMappingException, IOException
  {
    return NEWMAPPER.readValue(json, clazz);
  }

  public static String listToJsonStr(List<?> list, Type type)
  {
    //Type listType = new TypeToken<List<?>>(){}.getType();
    return GSON.toJson(list, type);
  }

  /**
   * Type listType = new TypeToken<List<T>>(){}.getType();
   */
  public static List<?> listFromJsonStr(String jsonStr, Type type)
  {
    List resultList = GSON.fromJson(jsonStr, type);

    return resultList;
  }
}

相关推荐

wsong / 0评论 2020-04-15