qingyuerji 2020-04-14
近期在研究各种消息队列方案,为了有一个直观的使用体验,我把Kafka,RocketMQ,RabbitMQ各自部署了一遍,并使用了最基本的生产与消费消息功能。在部署过程中也遇到一些问题,特此记录。本文只适用于没有使用过消息队列,还停留在安装部署阶段的新手用户,要了解一个软件,最好的开始方法是开始使用他,这样才会有一个直观的印象。本篇文章的作用也在于此,至于需要了解更深入的架构与细节,则需要查询其他的文档资料,这也不是本文的目的。我这里使用的操作系统是Centos 6.x,硬件配置一般即可。
Kafka的部署我是参考官网的步骤开始的,请直接参考其Quickstart章节。
Step1:下载安装包并解压
# wget https://mirror.bit.edu.cn/apache/kafka/2.4.1/kafka_2.12-2.4.1.tgz# tar
-xzf kafka_2.12-2.4.1.tgz<br />
# cd
kafka_2.12-2.4.1
问题:下载有可能报下面的错误(没有报错则忽略):
To connect to mirrors.tuna.tsinghua.edu.cn insecurely, use ‘--no-check-certificate’.
只需要添加报错提示的参数即可:
wget --no-check-certificate https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.1/kafka_2.12-2.4.1.tgz
Step2:启动服务器
在上述第一步执行完成之后,当前在目录kafka_2.12-2.4.1下,运行下面的命令启动zookeeper服务,Kafka依赖zookeeper服务,所以我们首先要启动zookeeper服务,由于zookeeper服务需要另外安装,Kafka的安装包中提供了一个简单的单节点的zookeeper实例,以方便快速运行Kafka服务,如果是在正式环境中使用则需要另外单独安装zookeeper服务(一般是分布式集群而非单节点)给Kafka使用。
# bin
/zookeeper-server-start
.sh config
/zookeeper
.properties
我在运行此命令时候报下面的错误:<br />
Unrecognized VM option ‘+UseGCLogFileRotation‘
Could not create the Java virtual machine.
从提示上看是因为java虚拟机不支持参数UseGCLogFileRotation,我经过查资料说这个参数可以去掉,bin
/zookeeper-server-start
.sh 脚本中调用了 bin/kafka-run-class.sh ,真正起作用的代码在这个脚本中 kafka-run-class.sh,打开这个脚本找到上述参数所在的地方,去掉 -XX:+UseGCLogFileRotation , 如下图所示:<br /><img src="http://image.mamicode.com/info/202004/20200414210328098533.png" title="Kafka,RocketMQ,RabbitMQ部署与使用体验" alt="Kafka,RocketMQ,RabbitMQ部署与使用体验"><br />
继续运行上面的脚本,继续报错:<br />
Unrecognized VM option ‘NumberOfGCLogFiles=10‘
Could not create the Java virtual machine.
像刚才一样,继续去掉 -XX:NumberOfGCLogFiles=10 ,如下图:
继续运行,得到报错:
Unrecognized VM option ‘GCLogFileSize=100M‘
Could not create the Java virtual machine.
我们继续去掉脚本中的 -XX:GCLogFileSize=100M 如下图:
继续运行,得到下面的报错:
从提示信息可以看出应该是java字节码的兼容性问题,java.lang.UnsupportedClassVersionError,表示当前运行的zookeeper的字节码是高版本的java编译器编译的,而现在的运行时环境是低版本,所以不兼容。运行下面的命令查看java版本:
# java -version
java version "1.6.0_24"
OpenJDK Runtime Environment (IcedTea6 1.11.8) (rhel-1.56.1.11.8.el6_3-x86_64)
OpenJDK 64-Bit Server VM (build 20.0-b12, mixed mode)
是jdk1.6,这个版本太低了,我们需要安装jdk1.8或者以上的版本,怎么安装呢,这里我首先搜索一下系统的安装源里面有没有,如果有我们直接从操作系统提供的源进行安装,如果没有则需要通过源码等其他办法进行安装了,运行下面命令查看:
# yum search openjdk
结果图如下:
我们安装开发环境,也就是下面这个,区别就是开发环境中有javac等编译工具,而运行时环境中没有,运行时环境只包含运行java程序所需要的工具,因为后面我们需要使用javac编译,所以我们安装开发环境,使用下面命令安装完成即可:
# yum install java-1.8.0-openjdk-devel.x86_64
成功之后我们再次运行 bin
/zookeeper-server-start
.sh config
/zookeeper
.properties 以启动zookeeper服务,没有任何错误,最后停留在如下界面:<br /><img src="http://image.mamicode.com/info/202004/20200414210330224591.png" title="Kafka,RocketMQ,RabbitMQ部署与使用体验" alt="Kafka,RocketMQ,RabbitMQ部署与使用体验"><br />
启动Kafka服务:bin
/kafka-server-start
.sh config
/server
.properties 没有任何报错,中间会打印出Kafka的配置,输出比较多,我只截取了一部分,其中 INFO KafkaConfig values 后面就是kafka的配置:
Step3:创建topic
# bin
/kafka-topics
.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic
testtopic
我们可以列出kafka服务器上的topic列表:
# bin
/kafka-topics
.sh --list --bootstrap-server localhost:9092
testtopic
Step4:发送消息
# bin
/kafka-console-producer
.sh --broker-list localhost:9092 --topic testtopic<br />
>hello message
>
运行命令之后 > 提示符等待输入消息,我们输入字符串 hello message 并回车
Step5:启动消费者
# bin
/kafka-console-consumer
.sh --bootstrap-server localhost:9092 --topic
testtopic
--from-beginning<br />hello message<br />
此时在发送消息侧不断输入消息并回车,消费者会不断打印收到的消息,至此Kafka的简单收发消息运行完毕
我们依然根据官网文档中的 Quick Start节的提示开始操作,官网建议使用64位Linux/Unix/Mac操作系统,我们使用的是64位的Centos 6.x,建议使用JDK1.8+,我们上面部署Kafka的时候已经安装了jdk1.8,需要maven3.2.x,我的系统上没有,所以首先需要安装maven:
安装maven:
1. 下载maven安装包,我这里使用的是更高一点的版本:
# wget http://mirrors.cnnic.cn/apache/maven/maven-3/3.5.4/binaries/apache-maven-3.5.4-bin.tar.gz
2. 解压maven安装包:
# tar -zxvf apache-maven-3.5.4-bin.tar.gz
3. 配置maven到profile文件中以便启动的时候自动设置环境变量 vim /etc/profile 添加下面两行:
export MAVEN_HOME=/data/rocket_mq/apache-maven-3.5.4
export PATH=$MAVEN_HOME/bin:$PATH
注意这里你应该换成你自己的对应的目录
4. 使环境变量生效:
# source /etc/profile
5. 检验是否正常:
# mvn -version
Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T02:33:14+08:00)
Maven home: /data/rocket_mq/apache-maven-3.5.4
Java version: 1.8.0_212, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.212.b04-0.el6_10.x86_64/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "4.1.0-32.el6.ucloud.x86_64", arch: "amd64", family: "unix"
下载RocketMQ的源码并编译:
# wget https://archive.apache.org/dist/rocketmq/4.7.0/rocketmq-all-4.7.0-source-release.zip
# unzip rocketmq-all-4.7.0-source-release.zip
# cd rocketmq-all-4.7.0/
# mvn -Prelease-all -DskipTests clean install -U
# cd distribution/target/rocketmq-4.7.0/rocketmq-4.7.0
编译成功之后的截图如下:
运行nameserver服务:
# sh bin/mqnamesrv
ERROR: Please set the JAVA_HOME variable in your environment, We need java(x64)! !!
首先我们在编译RocketMQ的时候我们已经将目录定位到 cd distribution/target/rocketmq-4.7.0/rocketmq-4.7.0 在此目录下运行nameserver服务报错,根据提示我们需要设置 JAVA_HOME 环境变量,如下(修改为你自己的地址):
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.212.b04-0.el6_10.x86_64/
export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin
之后再次运行:
# sh bin/mqnamesrv
OpenJDK 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
OpenJDK 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON
可以看到Name Server运行成功,这里提一下,Name Server是为RocketMQ提供名字服务的模块,其角色与zookeeper在Kafka中的作用类似
启动Broker服务:
在启动Broker这里有一点要注意的是,我们这里运行命令所在的目录跟上面运行Name Server的目录一样,都是在 distribution/target/rocketmq-4.7.0/rocketmq-4.7.0 这个目录下运行,并且上面需要导入的 JAVA_HOME 等环境变量这里都需要,除此之外,运行Broker我们还需要修改一下配置文件,在当前目录下(distribution/target/rocketmq-4.7.0/rocketmq-4.7.0) 修改文件 vim conf/broker.conf 在文件后面添加如下一行配置:
brokerIP1 = 127.0.0.1
设置Broker的ip,如果不设置的话,当机器存在多网卡的时候,例如一个内外ip和一个外网ip的情况,Broker可能会配置第一个ip地址,导致我们后面的生产者启动的时候写入消息失败报告下面的错误:
为了测试方便,我们直接设置BrokerIP1为本机回环ip地址127.0.0.1,然后通过配置文件启动Broker:
# bin/mqbroker -n localhost:9876 -c conf/broker.conf
The broker[broker-a, 127.0.0.1:10911] boot success. serializeType=JSON and name server is localhost:9876
启动生产者发送消息:
Broker启动完成之后,我们接下来启动生产者发送消息(同样要定位到 distribution/target/rocketmq-4.7.0/rocketmq-4.7.0 并导入 JAVA_HOME 等环境变量):
# export NAMESRV_ADDR=localhost:9876
# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
03:08:05.769 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
SendResult [sendStatus=SEND_OK, msgId=20020AB3FD9D000000000000000000017E931540E19D439EE28E0000, offsetMsgId=7F00000100002A9F00000000000638E4, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=500]
SendResult [sendStatus=SEND_OK, msgId=20020AB3FD9D000000000000000000017E931540E19D439EE2B00001, offsetMsgId=7F00000100002A9F00000000000639AE, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=500]
......
03:08:07.668 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
03:08:07.669 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:10911] result: true
可以看到中间我省略了很多日志,这些日志表示生产者向服务器写入了大量的消息,由于输出太多,我只列出2条
启动消费者消费消息:
接着我们启动消费者消费消息(同样要定位到 distribution/target/rocketmq-4.7.0/rocketmq-4.7.0 并导入 JAVA_HOME 等环境变量):
# export NAMESRV_ADDR=localhost:9876
# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
03:11:37.229 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=202, queueOffset=500, sysFlag=0, bornTimestamp=1586804886200, bornHost=/127.0.0.1:42523, storeTimestamp=1586804886203, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F0000000000063B42, commitLogOffset=408386, bodyCRC=1032136437, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘TopicTest‘, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=750, CONSUME_START_TIME=1586805097731, UNIQ_KEY=20020AB3FD9D000000000000000000017E931540E19D439EE2B80003, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 51], transactionId=‘null‘}]]
ConsumeMessageThread_3 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=202, queueOffset=500, sysFlag=0, bornTimestamp=1586804886195, bornHost=/127.0.0.1:42523, storeTimestamp=1586804886196, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F0000000000063A78, commitLogOffset=408184, bodyCRC=1250039395, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘TopicTest‘, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=750, CONSUME_START_TIME=1586805097731, UNIQ_KEY=20020AB3FD9D000000000000000000017E931540E19D439EE2B30002, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50], transactionId=‘null‘}]]
同样是大量消费消息的日志,这里也只列出前面2条输出,至此,RocketMQ的部署与简单消息收发使用完毕
RabbitMQ的部署我们同样参照官网给出的步骤,找到官网的 Get Started 点开按钮 “Download + Installation” ,RabbitMQ有多种运行方式,可以基于docker镜像运行,我们本次不使用docker这种方式部署。我们使用常规的部署方法,RabbitMQ使用Erlang语言编写,运行需要Erlang的运行时环境,所以我们首先需要安装Erlang环境,RabbitMQ团队为我们提供了一个Erlang安装包,其仅包含运行RabbitMQ所需要的全部组件,为了方便我们就使用此Erlang安装包,安装起来很容易:
安装Erlang环境:
# wget https://github.com/rabbitmq/erlang-rpm/releases/download/v22.3.1/erlang-22.3.1-1.el6.x86_64.rpm
# yum install erlang-22.3.1-1.el6.x86_64.rpm
没有报错,我这里是下载的Centos版本的二进制安装包,其他系统可以自行去RabbitMQ官网下载,Erlang安装好之后,我们就可以安装RabbitMQ服务了:
安装RabbitMq:
# wget https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
# rpm --import rabbitmq-release-signing-key.asc
# wget http://wangqiguoy.cn-bj.ufileos.com/rabbitmq-server-3.8.3-1.el6.noarch.rpm
# yum install rabbitmq-server-3.8.3-1.el6.noarch.rpm
我安装的时候没有任何报错,直接安装完成,比Kafka和RocketMQ要顺利多了,RabbitMQ提供了一个web界面可以对其进行管理,可以在上面看到当前的队列,通道信息,消息堆积情况等等,通过下面的命令开启插件:
开启web管理插件:
# rabbitmq-plugins enable rabbitmq_management
Enabling plugins on node :
rabbitmq_management
The following plugins have been configured:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to
The following plugins have been enabled:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
started 3 plugins.
开启成功之后,可以通过一个http的地址访问web界面管理RabbitMQ,地址形式是:http://ip:15672/ 将ip地址换成自己的服务器ip地址即可,登陆界面如下:
修改RabbitMQ的配置文件以登陆其web界面:
# vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.3/ebin/rabbit.app
{loopback_users, [<<"guest">>]},
改成
{loopback_users, [guest]}
修改好之后,重启服务生效,可以通过上面的web界面登录,用户名和密码都是guest,登陆进去之后的界面如下图:
功能还是很丰富的,可以查看connection、channel、exchange、queue等信息,还可以管理用户权限,除了通过web界面管理RabbitMQ之外,Rabbit还提供一个命令行工具rabbitmqctl来管理RabbitMQ,该工具的具体使用可以查看其官网文档,另外对RabbitMQ常用的启动停止操作可以参考如下命令:
service rabbitmq-server start
service rabbitmq-server stop
service rabbitmq-server restart
service rabbitmq-server status
发送消息与消费消息:
RabbitMQ没有提供命令行的测试方式,不过RabbitMQ提供了很多客户端的demo可供使用,这里我选择的是使用nodejs,首先要准备node的环境如下:
# yum install nodejs
# yum install npm
# npm install amqplib
最后一步安装 amqplib 即是nodejs中要使用的与RabbitMQ相关的客户端库,我们的生产者 send.js 代码如下:
//send.js var amqp = require(‘amqplib/callback_api‘); amqp.connect(‘amqp://localhost‘, function(error0, connection) { if (error0) { throw error0; } connection.createChannel(function(error1, channel) { if (error1) { throw error1; } var queue = ‘hello‘; var msg = ‘{"name":"Hello World!"}‘; channel.assertQueue(queue, { durable: false }); channel.sendToQueue(queue, new Buffer(msg)); console.log(" [x] Sent %s", msg); }); setTimeout(function() { connection.close(); process.exit(0); }, 500); });
消费者 receive.js 的代码如下:
//receive.js var amqp = require(‘amqplib/callback_api‘); amqp.connect(‘amqp://localhost‘, function(error0, connection) { if (error0) { throw error0; } connection.createChannel(function(error1, channel) { if (error1) { throw error1; } var queue = ‘hello‘; channel.assertQueue(queue, { durable: false }); console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue); channel.consume(queue, function(msg) { console.log(" [x] Received %s", msg.content.toString()); }, { noAck: true }); }); });
分别运行 send.js 与 receive.js 得到如下输出:
# node send.js
[x] Sent {"name":"Hello World!"}
# node receive.js
[*] Waiting for messages in hello. To exit press CTRL+C
[x] Received {"name":"Hello World!"}
至此RabbitMQ的部署以及简单消息收发使用完成