KAFKA

jiaomrswang 2018-06-05

Kafka核心组件
  Topic:消息根据Topic进行归类,可以理解为一个队里。
  Producer:消息生产者,就是向kafka broker发消息的客户端。
  Consumer:消息消费者,向kafka broker取消息的客户端。
  broker:每个kafka实例(server),一台kafka服务器就是一个broker,一个集群由多个broker组成,一个broker可以容纳多个topic。
  Zookeeper:依赖集群保存meta信息。

测试实例:

  1. 查看topic 列表(--zookeeper 指定任意一个zk节点即可,也可以全部列出,用于获取集群信息) 
    /usr/hdp/2.6.5.0-292/kafka/bin/kafka-topics.sh --zookeeper slave227:2181,slave223:2181,slave228:2181 --describe

  2. 创建topic(--replication-factor表示复制到多少个节点,--partitions表示分区数,一般设置为2或与节点数相等,不能大于总节点数)
    /usr/hdp/2.6.5.0-292/kafka/bin/kafka-topics.sh --zookeeper slave227:2181,slave223:2181,slave228:2181 --create --topic topic1 --replication-factor 2 --partitions 2

  3. 发送消息(--topic 指定topic ,--6667端口按个人配置的listeners监听端口)
    /usr/hdp/2.6.5.0-292/kafka/bin/kafka-console-producer.sh --broker-list 172.69.1.221:6667  --topic topic1
    >message1
    >message2

  4. 消费消息
    /usr/hdp/2.6.5.0-292/kafka/bin/kafka-console-consumer.sh --zookeeper slave227:2181,slave223:2181,slave228:2181 --topic topic1 
    或者
    /usr/hdp/2.6.5.0-292/kafka/bin/kafka-console-consumer.sh --zookeeper slave227:2181,slave223:2181,slave228:2181 --topic topic1 --from-beginning

  5. replica检查
    /usr/hdp/2.6.5.0-292/kafka/bin/kafka-replica-verification.sh --broker-list master:6667

  6. 查询已创建的topic
    /usr/hdp/2.6.5.0-292/kafka/bin/kafka-topics.sh --zookeeper slave227:2181,slave223:2181,slave228:2181 --list
删除topic以及清除topic数据
  1. Kafka 删除topic(如果kafaka启动时加载的配置文件中server.properties没有配置delete.topic.enable=true,那么此时的删除并不是真正的删除,而是把topic标记为:marked for deletion)
    /usr/hdp/2.6.5.0-292/kafka/bin/kafka-topics.sh  --delete --zookeeper 【zookeeper server】  --topic 【topic name】

  2. zookeeper client 删除topic(通过zookeeper-client 删除掉broker下的topic)
    /usr/hdp/2.6.5.0-292/zookeeper/bin/zkCli.sh -server slave227:2181,slave223:2181,slave228:2181
    找到topic所在的目录:ls /brokers/topics
    找到要删除的topic,执行命令:rmr /brokers/topics/【topic name】即可,此时topic被彻底删除。
    另外被标记为marked for deletion的topic你可以在zookeeper客户端中通过命令获得:ls /admin/delete_topics/【topic name】,如果你删除了此处的topic,那么marked for deletion 标记消失

  3. 最后删除kafka存储目录(server.properties文件log.dirs配置,默认为"/tmp/kafka-logs")相关topic目录

  4. 清楚topic数据
    log.cleanup.policy=delete启用删除策略
    直接删除,删除后的消息不可恢复。可配置以下两个策略:
    清理超过指定时间清理:  
    log.retention.hours=16
    超过指定大小后,删除旧的消息:
    log.retention.bytes=1073741824
    为了避免在删除时阻塞读操作,采用了copy-on-write形式的实现,删除操作进行时,读取操作的二分查找功能实际是在一个静态的快照副本上进行的,这类似于Java的CopyOnWriteArrayList。
    主要看server.properties配置
    log.cleanup.interval.mins=10
    log.dirs=/kafka-logs
    log.index.interval.bytes=4096
    log.index.size.max.bytes=10485760
    log.retention.bytes=-1
    log.retention.hours=168
    log.roll.hours=168
    log.segment.bytes=1073741824

 总结:

彻底删除topic:

 1、删除kafka存储目录(server.properties文件log.dirs配置,默认为"/tmp/kafka-logs")相关topic目录

  2、如果配置了delete.topic.enable=true直接通过命令删除,如果命令删除不掉,直接通过zookeeper-client 删除掉broker下的topic即可。

相关推荐

xinglun / 0评论 2020-06-14