sweetgirl0 2020-02-19
一、下载kafka_2.12-2.4.0.tgz并解压至/home/kafka_2.12-2.4.0
二、配置kafka
2.1 创建kafka日志文件夹:/home/kafka_2.12-2.4.0/logs
2.2 创建zookeeper数据目录:/tmp/zookeeper
2.3 配置/home/kafka_2.12-2.4.0/config/server.properties 内容如下(SSL证书在下面介绍):
ssl.keystore.location=/home/ca/server/server.keystore.jks
ssl.keystore.password=mima123
ssl.key.password=mima123
ssl.truststore.location=/home/ca/trust/server.truststore.jks
ssl.truststore.password=mima123
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
ssl.endpoint.identification.algorithm=
#security.inter.broker.protocol=SSL
inter.broker.listener.name=SSL
############################# Server Basics #############################
broker.id=0
listeners=SSL://阿里云内网IP:9093
advertised.listeners=SSL://阿里云外网IP:9093
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
############################# Log Basics #############################
log.dirs=/home/kafka_2.12-2.4.0/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
#log.flush.interval.messages=10000
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
zookeeper.connect=阿里云内网IP:2181
zookeeper.connection.timeout.ms=6000
############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
2.4 配置 /home/kafka_2.12-2.4.0/config/zookeeper.properties
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
admin.enableServer=false
2.5 配置/etc/hosts文件,增加 kafka-single 的主机名映射(即下面最后一行,原文中标红),IP为阿里云内网IP
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
172.18.54.18 iZwz9gq8vhwxtgpg21yonsZ iZwz9gq8vhwxtgpg21yonsZ
172.18.54.18 kafka-single
三、生成SSL相关证书文件
3.1、创建四个文件夹 /home/ca/root、/home/ca/trust、/home/ca/server、/home/ca/client
3.2、签发相关证书
第一步:生成server.keystore.jks文件(即:生成服务端的keystore文件)
keytool -keystore /home/ca/server/server.keystore.jks -alias ds-kafka-single -validity 10000 -genkey -keypass mima123 -keyalg RSA -dname "CN=kafka-single,OU=qmx,O=qmx,L=beijing,S=beijing,C=cn" -storepass mima123 -ext SAN=DNS:kafka-single
第二步:生成CA认证证书(为了保证整个证书的安全性,需要使用CA进行证书的签名保证)
openssl req -new -x509 -keyout /home/ca/root/ca-key -out /home/ca/root/ca-cert -days 10000 -passout pass:mima123 -subj "/C=cn/ST=beijing/L=beijing/O=qmx/OU=qmx/CN=kafka-single"
第三步:通过CA证书创建一个客户端信任证书
keytool -keystore /home/ca/trust/client.truststore.jks -alias CARoot -import -file /home/ca/root/ca-cert -storepass mima123
第四步:通过CA证书创建一个服务器端信任证书
keytool -keystore /home/ca/trust/server.truststore.jks -alias CARoot -import -file /home/ca/root/ca-cert -storepass mima123
第五步:服务器证书的签名处理
第1小步:导出服务器端证书server.cert-file
keytool -keystore /home/ca/server/server.keystore.jks -alias ds-kafka-single -certreq -file /home/ca/server/server.cert-file -storepass mima123
第2小步:用CA给服务器端证书进行签名处理
openssl x509 -req -CA /home/ca/root/ca-cert -CAkey /home/ca/root/ca-key -in /home/ca/server/server.cert-file -out /home/ca/server/server.cert-signed -days 10000 -CAcreateserial -passin pass:mima123
第3小步:将CA证书导入到服务器端keystore
keytool -keystore /home/ca/server/server.keystore.jks -alias CARoot -import -file /home/ca/root/ca-cert -storepass mima123
第4小步:将已签名的服务器证书导入到服务器keystore
keytool -keystore /home/ca/server/server.keystore.jks -alias ds-kafka-single -import -file /home/ca/server/server.cert-signed -storepass mima123
客户端SSL证书签发
第一步:生成client.keystore.jks文件
keytool -keystore /home/ca/client/client.keystore.jks -alias ds-kafka-single -validity 10000 -genkey -keypass mima123 -dname "CN=kafka-single,OU=qmx,O=qmx,L=beijing,S=beijing,C=cn" -ext SAN=DNS:kafka-single -storepass mima123
第二步:导出客户端证书client.cert-file
keytool -keystore /home/ca/client/client.keystore.jks -alias ds-kafka-single -certreq -file /home/ca/client/client.cert-file -storepass mima123
第三步:用CA给客户端证书进行签名处理
openssl x509 -req -CA /home/ca/root/ca-cert -CAkey /home/ca/root/ca-key -in /home/ca/client/client.cert-file -out /home/ca/client/client.cert-signed -days 10000 -CAcreateserial -passin pass:mima123
第四步:将CA证书导入到客户端keystore
keytool -keystore /home/ca/client/client.keystore.jks -alias CARoot -import -file /home/ca/root/ca-cert -storepass mima123
第五步:将已签名的证书导入到客户端keystore
keytool -keystore /home/ca/client/client.keystore.jks -alias ds-kafka-single -import -file /home/ca/client/client.cert-signed -storepass mima123
四、启动和停止kafka和zookeeper服务
cd /home/kafka_2.12-2.4.0/bin
启动zookeeper:
./zookeeper-server-start.sh /home/kafka_2.12-2.4.0/config/zookeeper.properties &
启动kafka:
./kafka-server-start.sh /home/kafka_2.12-2.4.0/config/server.properties &
查看topic情况:
./kafka-topics.sh --list --zookeeper localhost:2181
关闭kafka:
./kafka-server-stop.sh
关闭zookeeper:
./zookeeper-server-stop.sh
查看 kafka 的 topic 情况:
./kafka-topics.sh --list --zookeeper 172.18.54.18:2181
查看topic详细信息:
./kafka-topics.sh --describe --zookeeper 172.18.54.18:2181 --topic topic1
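生产者客户端命令(上面的 server.properties 只配置了 SSL 监听端口 9093,因此需要通过 --producer.config 指定一个客户端属性文件,例如 client-ssl.properties,其中包含 security.protocol=SSL、ssl.truststore.location/password、ssl.keystore.location/password 以及 ssl.endpoint.identification.algorithm= 等配置):
./kafka-console-producer.sh --broker-list 172.18.54.18:9093 --topic topic1 --producer.config client-ssl.properties
消费者客户端命令(同样需要通过 --consumer.config 指定包含上述 SSL 配置的客户端属性文件):
./kafka-console-consumer.sh --bootstrap-server 172.18.54.18:9093 --topic topic1 --from-beginning --consumer.config client-ssl.properties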
五、JAVA客户端对接
5.1 Producer
package com.xrh.extend.kafka;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
/**
 * Demo producer that sends a short burst of string messages to a Kafka broker
 * over SSL. Both a truststore and a client keystore are configured, so the
 * client presents its own certificate to the broker.
 */
public class Producer {

    /** Topic the demo publishes to. */
    public static final String topic = "topic2";

    /**
     * Builds the SSL producer configuration, sends 19 records (one every 500 ms)
     * and closes the producer.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while pausing between sends
     */
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        // Kafka broker address; separate multiple addresses with commas.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "阿里云外网IP:9093");
        // acks: the acknowledgement mode (the config dump at the end of this class shows 1
        // as the value in effect when it is not set explicitly).
        //   acks=0:   the producer does not wait for any response from Kafka.
        //   acks=1:   the leader writes the record to its local log but does not wait
        //             for the other brokers to acknowledge.
        //   acks=all: the leader waits for all in-sync followers to replicate the record,
        //             so the record is not lost unless every broker holding it fails.
        //             This is the strongest guarantee available.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", "D:\\ca\\client.truststore.jks");
        props.put("ssl.truststore.password", "mima123");
        props.put("ssl.keystore.location", "D:\\ca\\client.keystore.jks");
        props.put("ssl.keystore.password", "mima123");
        // An empty value turns off hostname verification of the broker certificate.
        props.setProperty("ssl.endpoint.identification.algorithm", "");

        // One generator for the whole run instead of a new instance per message.
        Random random = new Random();
        // try-with-resources closes the producer (flushing pending sends) on every exit path.
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props)) {
            for (int i = 1; i < 20; i++) {
                String msg = "测试 Hello," + random.nextInt(100);
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, "key1", msg);
                // send() is asynchronous: the actual outcome is reported to the callback.
                kafkaProducer.send(record, new MyProducerCallBack());
                System.out.println("消息发送成功:" + msg);
                Thread.sleep(500);
            }
        }
    }

    /** Prints the metadata of an acknowledged record, or the stack trace of a failed send. */
    private static class MyProducerCallBack implements Callback {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e != null) {
                e.printStackTrace();
                return;
            }
            System.out.println("时间戳,主题,分区,位移: " + recordMetadata.timestamp()
                    + ", " + recordMetadata.topic() + "," + recordMetadata.partition()
                    + " " + recordMetadata.offset());
        }
    }

    // Reference listing of producer configuration values. NOTE(review): this looks like a
    // ProducerConfig dump logged by a different, PLAINTEXT run; it does not reflect the
    // SSL settings configured in main() above.
    // acks = 1
    // batch.size = 16384 // when several records go to the same partition, the producer tries to batch them into fewer network requests, which improves efficiency
    // bootstrap.servers = [39.108.124.173:9092]
    // buffer.memory = 33554432
    // client.dns.lookup = default
    // client.id =
    // compression.type = none
    // connections.max.idle.ms = 540000
    // delivery.timeout.ms = 120000
    // enable.idempotence = false
    // interceptor.classes = []
    // key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    // linger.ms = 0
    // max.block.ms = 60000
    // max.in.flight.requests.per.connection = 5
    // max.request.size = 1048576
    // metadata.max.age.ms = 300000
    // metric.reporters = []
    // metrics.num.samples = 2
    // metrics.recording.level = INFO
    // metrics.sample.window.ms = 30000
    // partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    // receive.buffer.bytes = 32768
    // reconnect.backoff.max.ms = 1000
    // reconnect.backoff.ms = 50
    // request.timeout.ms = 30000
    // retries = 2147483647 // with a value greater than 0 the client resends a record whose send failed
    // retry.backoff.ms = 100
    // sasl.client.callback.handler.class = null
    // sasl.jaas.config = null
    // sasl.kerberos.kinit.cmd = /usr/bin/kinit
    // sasl.kerberos.min.time.before.relogin = 60000
    // sasl.kerberos.service.name = null
    // sasl.kerberos.ticket.renew.jitter = 0.05
    // sasl.kerberos.ticket.renew.window.factor = 0.8
    // sasl.login.callback.handler.class = null
    // sasl.login.class = null
    // sasl.login.refresh.buffer.seconds = 300
    // sasl.login.refresh.min.period.seconds = 60
    // sasl.login.refresh.window.factor = 0.8
    // sasl.login.refresh.window.jitter = 0.05
    // sasl.mechanism = GSSAPI
    // security.protocol = PLAINTEXT
    // security.providers = null
    // send.buffer.bytes = 131072
    // ssl.cipher.suites = null
    // ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    // ssl.endpoint.identification.algorithm = https
    // ssl.key.password = null
    // ssl.keymanager.algorithm = SunX509
    // ssl.keystore.location = null
    // ssl.keystore.password = null
    // ssl.keystore.type = JKS
    // ssl.protocol = TLS
    // ssl.provider = null
    // ssl.secure.random.implementation = null
    // ssl.trustmanager.algorithm = PKIX
    // ssl.truststore.location = null
    // ssl.truststore.password = null
    // ssl.truststore.type = JKS
    // transaction.timeout.ms = 60000
    // transactional.id = null
    // value.serializer = class org.apache.kafka.common.serialization.StringSerializer
}
5.2 Consumer
package com.xrh.extend.kafka;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import javafx.util.Duration;
/**
 * Demo consumer that subscribes to {@code Producer.topic} over SSL and prints every
 * record it receives until the thread is interrupted or an exception is thrown.
 */
public class Consumer {

    /**
     * Builds the SSL consumer configuration, subscribes to the demo topic and polls
     * in a loop, printing offset, key and value of each record.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "阿里云外网IP:9093");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", "D:\\ca\\client.truststore.jks");
        props.put("ssl.truststore.password", "mima123");
        props.put("ssl.keystore.location", "D:\\ca\\client.keystore.jks");
        props.put("ssl.keystore.password", "mima123");
        // An empty value turns off hostname verification of the broker certificate.
        props.setProperty("ssl.endpoint.identification.algorithm", "");
        // Optional: props.put("auto.offset.reset", "latest");
        //
        // Configuration notes:
        // bootstrap.servers: address of the Kafka broker(s).
        // group.id: consumer group name. Different groups consume the same data independently:
        //     if group A has already consumed 1000 records and you want to read them again
        //     without producing them again, switching to a new group name is enough.
        // enable.auto.commit: whether offsets are committed automatically; defaults to true.
        // auto.commit.interval.ms: how often offsets are auto-committed when
        //     enable.auto.commit is true.
        // session.timeout.ms: timeout after which an unresponsive consumer is considered
        //     failed by the group.
        // max.poll.records: maximum number of records returned by a single poll.
        // auto.offset.reset: what to do when no committed offset exists; defaults to latest.
        //     earliest: resume from the committed offset if there is one, otherwise start
        //               from the beginning of the partition.
        //     latest:   resume from the committed offset if there is one, otherwise consume
        //               only records produced from now on.
        //     none:     resume from the committed offset when every partition has one;
        //               throw an exception if any partition has none.
        // key.deserializer / value.deserializer: deserializer classes for key and value;
        //     both are set explicitly above to StringDeserializer.

        boolean interrupted = false;
        // try-with-resources makes sure the consumer leaves the group and releases its
        // sockets on every exit path.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(Producer.topic)); // subscribe to the topic
            while (!interrupted) {
                // poll(long) is deprecated; java.time.Duration is written fully qualified
                // because this file imports javafx.util.Duration.
                ConsumerRecords<String, String> records =
                        consumer.poll(java.time.Duration.ofMillis(100));
                if (!records.isEmpty()) {
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("offset = %d, key = %s, value = %s%n",
                                record.offset(), record.key(), record.value());
                    }
                } else {
                    System.out.println("KafkaConsumer1 is waiting message...");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Stop polling; the flag is restored only after the consumer is closed.
                        interrupted = true;
                    }
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}