【消息队列-Kafka】01-Kafka入门使用

猫咪的一生 2020-06-26

一、引入kafka pom三方配置

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.0</version>
</dependency>

二、生产者使用实例

1.生产者配置

public class KafkaProducerClient {
    private Producer<String, String> producer;

    private KafkaProducerClient() {
    }

    /**
     * 获取kafka消费端实例
     */
    public KafkaProducerClient(Properties props) {
        // kafka生产者配置详解:https://www.jianshu.com/p/9a31538ea4b3
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer(props);
    }


    /**
     * 消息同步阻塞发送
     */
    public String sendSyncMessage(String topic, String message) throws Exception {
        RecordMetadata recordMetadata = producer.send(new ProducerRecord<String, String>(topic, message)).get();
        return recordMetadata.offset() + "|" + recordMetadata.partition();  
    }

    /**
     * 消息异步非阻塞发送
     */
    public void sendAsyncMessage(String topic, String message) {
        producer.send(new ProducerRecord<String, String>(topic, message));
    }

}

2.单元测试

public class KafkaProducerClientTest {
    private static final String TOPIC="topic-zcx";

    @Test
    public void sendMessage() throws Exception {
        Properties props=new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "47.97.167.84:19090");
        //配置retry时数据的幂等性,避免数据重复提交
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true");
        //配置数据压缩
        //props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        KafkaProducerClient kafkaProducerClient=new KafkaProducerClient(props);
        String offset=kafkaProducerClient.sendSyncMessage(TOPIC,"hello kafka");
        System.out.printf("offset:"+offset);
    }
}

三、消费者使用实例

1.消费者配置

public class KafkaConsumerClient {
    private KafkaConsumer consumer;

    private KafkaConsumerClient() {
    }

    /**
     * 获取kafka消费端实例
     */
    public KafkaConsumerClient(Properties properties) {
        // kafka消费端配置详解:https://blog.csdn.net/Dongguabai/article/details/86524023?utm_medium=distribute.pc_relevant.none-task-blog-baidujs-1
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer(properties);
    }


    /**
     * 监听消费指定topic数据
     */
    public void receive(String topic, IConsumerListener callable) {
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            callable.doReceive(consumer);
        }
    }

     public interface IConsumerListener {
        void doReceive(KafkaConsumer consumer);
    }
}

2.单元测试

public class KafkaConsumerClientTest {
    private static final String TOPIC="topic-zcx";

    @Test
    public void receive() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "47.97.167.84:19090,47.97.167.84:19091,47.97.167.84:19092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-test");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
        KafkaConsumerClient kafkaConsumerClient=new KafkaConsumerClient(properties);
        kafkaConsumerClient.receive(TOPIC,consumer -> {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(30));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %s,value = %s\n", record.partition()+"|"+record.offset(),record.value());
            }
        });
    }
}

相关推荐