Kafka Producer中buffer.memory参数和max.block.ms参数的含义?

amwayy 2019-11-22

一、buffer.memory 参数

The buffer.memory controls the total amount of memory available to the producer for buffering. If records are sent faster than they can be transmitted to the server then this buffer space will be exhausted. When the buffer space is exhausted additional send calls will block. The threshold for time to block is determined by max.block.ms after which it throws a TimeoutException.

buffer.memory设置决定了Producer缓存区整个可用的内存。如果记录记录发送速度总是比推送到集群速度快,那么缓存区将被耗尽。当缓存区资源耗尽,消息发送send方法调用将被阻塞,阻塞的时间由max.block.ms设定,阻塞超过限定时间会抛出TimeoutException异常。

默认值: 33554432(32MB)

二、max.block.ms 参数

The configuration controls how long KafkaProducer.send() and KafkaProducer.partitionsFor() will block.These methods can be blocked either because the buffer is full or metadata unavailable.

Blocking in the user-supplied serializers or partitioner will not be counted against this timeout.

max.block.ms 参数决定KafkaProducer.send() 和 KafkaProducer.partitionsFor() 方法被阻塞的时间。当缓存区满了或者元数据不可用的时间将产生阻塞。用户提供的序列化器或分区器中的阻塞将不计入此超时。

默认值: 60000

三、request.timeout.ms 参数

这个参数容易和上面的max.block.ms 参数相混淆,这里也一同说明一下。

生产者producer发送消息后等待响应的最大时间,如果在配置时间内没有得到响应,生产者会重试。

默认值: 30000

四、示例代码

在kafka producer初始化时,设置相关的参数, 一般而言,使用默认值即可。

package com.rickie.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerParamsDemo {
 public static void main(String[] args) {
 Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("acks", "all");
 props.put("retries", 0);
 props.put("batch.size", 16384);
 props.put("linger.ms", 1);
 props.put("buffer.memory", 33554432);
 props.put("max.block.ms", 30000);
 props.put("request.timeout.ms", 30000);
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

 Producer<String, String> producer = new KafkaProducer<>(props);
 for(int i = 0; i < 100; i++) {
 producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
 }
 producer.close();
 }
}

相关推荐