hanwentan 2020-07-21
依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
代码
package com.perfect.kafka;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.jupiter.api.Test;
/**
 * Demonstrates a Kafka consumer that commits its offsets manually.
 *
 * <p>The single test connects to a broker at {@code localhost:9092}, subscribes to topic
 * {@code test2} as consumer group {@code bigdata1}, prints every record it receives to standard
 * output and commits offsets synchronously after each batch of polls.
 *
 * <p>The polling loop has no exit condition: the test only ends when the consumer throws or the
 * process is stopped, so it needs a reachable broker and is meant to be run by hand.
 */
public class KafkaComsumerTest {

    /**
     * Polls topic {@code test2} forever, printing each record and committing offsets once per
     * 10000 polls.
     *
     * <p>Because auto commit is disabled and offsets are only committed at the end of a batch,
     * records printed since the last commit are delivered again if the consumer stops mid-batch.
     */
    @Test
    public void cunsumertest() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Disable auto commit; offsets are committed explicitly below.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // Only relevant when auto commit is enabled:
        // props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        // Where to start when the group has no committed offset: "latest" or "earliest".
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "bigdata1");

        // try-with-resources closes the consumer when poll/commit throws; a close() call placed
        // after the endless loop would be unreachable and could not compile.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test2"));
            while (true) {
                for (int i = 0; i < 10000; i++) {
                    // poll(Duration) replaces the deprecated poll(long) overload.
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    records.forEach(System.out::println);
                }
                // Synchronous commit: blocks until the broker acknowledges the offsets.
                consumer.commitSync();
                // Asynchronous alternative:
                // consumer.commitAsync();
            }
        }
    }
}