sweetgirl0 2020-01-09
//暂停kafka的消费 暂停分区的分配consumer.unsubscribe();//此处不取消订阅暂停太久会出现订阅超时的错误consumer.pause(consumer.assignment());//重新消费分区,此处不重新分配会出错
this.open(null,null,null);
if (null == consumer) { Properties props = new Properties(); props.put("bootstrap.servers", PropertiesUtil.getValue("bootstrap.servers")); // 消费者的组id props.put("group.id", constant.kafka_groupName);//Spider2 props.put("enable.auto.commit", "false");// max.poll.interval.ms(官网给得默认值为3000)的意思为,当我们从kafkaServer端poll消息时,poll()的调用之间的最大延迟。// 这提供了消费者在获取更多记录之前可以空闲的时间量的上限。 如果在此超时到期之前未调用poll(),则认为使用者失败,并且消费// 者组将重新平衡以便将分区重新分配给其他消费者,而恰好这里我们设置了Thread.sleep(6000) > max.poll.interval.ms值,// 也就是我们在手动提交的时候,实际上分区信息已经被分配到整个消费者组里面的其它消费者了 props.put("auto.commit.interval.ms", "3000"); // 从poll(拉)的回话处理时长 props.put("session.timeout.ms", "100000"); props.put("request.timeout.ms", "200000"); props.put("max.poll.records", "2"); // poll的数量限制 // props.put("max.poll.records", "100"); /* props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");*/ props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); props.put("group.name", UUID.randomUUID().toString().replaceAll("-", "")); consumer = new KafkaConsumer<String, String>(props); // 订阅主题列表topic //consumer.subscribe(Arrays.asList("test_input")); } //注册kafka rebalanceListener //consumer.subscribe(Arrays.asList("test_etl"), new ConsumerRebalanceListener(){ listener = new ConsumerRebalanceListener(){ @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { System.out.printf("threadId = {}, onPartitionsRevoked.", Thread.currentThread().getId()); consumer.commitSync(offsetsMap); consumer.commitSync(); } @Override public void onPartitionsAssigned( Collection<TopicPartition> partitions) { System.out.printf("threadId = {}, onPartitionsAssigned.", Thread.currentThread().getId()); consumer.commitSync(); offsetsMap.clear(); }}; consumer.subscribe(Arrays.asList(topicName.split(",")[0],topicName.split(",")[1],topicName.split(",")[2]), listener);
consumer.resume(consumer.assignment());