ActiveMQ简单简绍(“点对点通讯”和 “发布订阅模式”)

胡献根 2020-02-13

ActiveMQ简单简绍
MQ简介
  MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过写和检索出入列队的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。其中较为成熟的MQ产品有IBMWEBSPHERE MQ。
MQ特点
  MQ的消费-生产者模型的一个典型的代表,一端往消息队列中不断的写入消息,而另一端则可以读取或者订阅队列中的消息。MQ和JMS类似,但不同的是JMS是SUN JAVA消息中间件服务的一个标准和API定义,而MQ则是遵循了AMQP协议的具体实现和产品。
使用场景
  在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
JMS简介
  JMS即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
定义
  JMS(Java Messaging Service)是Java平台上有关面向消息中间件(MOM)的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发,翻译为Java消息服务。
简介
  JMS是一种与厂商无关的 API,用来访问消息收发系统消息。它类似于JDBC(Java DatabaseConnectivity):这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。许多厂商目前都支持JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,这只是几个例子。 JMS 使您能够通过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另一个JMS客户机发送消息。消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。根据有效负载的类型来划分,可以将消息分为几种类型,它们分别携带:简单文本(TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。
JMS和MQ的关系
  JMS是一个用于提供消息服务的技术规范,它制定了在整个消息服务提供过程中的所有数据结构和交互流程。而MQ则是消息队列服务,是面向消息中间件(MOM)的最终实现,是真正的服务提供者;MQ的实现可以基于JMS,也可以基于其他规范或标准。
支持JMS的开源MQ:
目前选择的最多的是ActiveMQ。
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
主要特点
1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WSNotification,XMPP,AMQP
2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
4. 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
6. 支持通过JDBC和journal提供高速的消息持久化
7. 从设计上保证了高性能的集群,客户端-服务器,点对点
8. 支持Ajax
9. 支持与Axis的整合
10. 可以很容易得调用内嵌JMS provider,进行测试
11. ActiveMQ速度非常快;一般要比jbossMQ快10倍。
优点
  是一个快速的开源消息组件(框架),支持集群,同等网络,自动检测,TCP,SSL,广播,持久化,XA,和J2EE1.4容器无缝结合,并且支持轻量级容器和大多数跨语言客户端上的Java虚拟机。消息异步接受,减少软件多系统集成的耦合度。消息可靠接收,确保消息在中间件可靠保存,多个消息也可以组成原子事务。
缺点
  ActiveMQ默认的配置性能偏低,需要优化配置,但是配置文件复杂,ActiveMQ本身不提供管理工具;示例代码少;主页上的文档看上去比较全面,但是缺乏一种有效的组织方式,文档只有片段,用户很难由浅入深进行了解,二、文档整体的专业性太强。在研究阶段可以通过查maillist、看Javadoc、分析源代码来了解。
 
 
个人理解总结
  activeMQ是什么?
  是Apache公司旗下的一个消息总线
ActiveMQ是一个开源兼容Java Message  Service  (JMS) 1.1面向消息的中件间. 来自Apache Software Foundation. ActiveMQ提供松耦合的应用程序架构. 
activeMQ能干什么?
  用来在服务与服务之间进行异步通信的
activeMQ优势
  1.流量肖锋
  2.任务异步处理
    特点:可以解耦合
  (学习新技术的三要素:是什么?能干什么?有什么优势?)
 
 图1:
 ActiveMQ简单简绍(“点对点通讯”和 “发布订阅模式”)
 
 
通信模式:
  1.点对点(queue)
    》一个消息只能被一个服务接收
    》消息一旦被消费,就会消失
    》如果没有被消费,就会一直等待,直到被消费
    》多个服务监听同一个消费空间,先到先得
  详解:这个特点的原理是这样的,在activeMQ

点对点通讯(“代码”)
  生产者:
  public static void main(String[] args) throws JMSException {
    //步骤一:创建连接工厂
  ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,  ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
    //步骤二:创建连接
  Connection connection = activeMQConnectionFactory.createConnection();
    //步骤三:启动连接
  connection.start();
    //步骤四:获取会话工厂
  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    //步骤五:创建队列
  Queue queue = session.createQueue("wdksoft_queue");
    //创建消息生产者
  MessageProducer producer = session.createProducer(queue);
    //消息持久化  
  producer.setDeliveryMode(2);
    //模拟消息
  TextMessage textMessage = session.createTextMessage("hello activeMQ");
    //发送消息
  producer.send(textMessage);
  System.out.println("生产者生产消息完毕~");
    //回收资源
  session.close();
  connection.close();
  }
 消费者:
  public static void main(String[] args) throws JMSException {
    //步骤一:创建连接工厂
  ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
    //步骤二:创建连接
  Connection connection = activeMQConnectionFactory.createConnection();
    //步骤三:开启连接
  connection.start();
    //创建会话对象
  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    //获取到接受消息的队列
  Queue queue = session.createQueue("wdksoft_queue");
    //创建消费者
  MessageConsumer consumer = session.createConsumer(queue);
  while(true){
    //获取消息
  TextMessage message = (TextMessage)consumer.receive();
  if(message!=null){
  System.out.println("消费者获取消息:"+message.getText());
  }else{
  break;
  }
  }
      //回收资源
  session.close();
  connection.close();

}

  2.发布/订阅模式(topic)
     》一个消息可以被多个服务接收
    》订阅一个主题的消费者,只能消费自它订阅之后发布的消息。
    》消费端如果在生产端发送消息之后启动,是接收不到消息的,除非生产端对消息进行了持久化(例如广播,只有当时听到的人能听到信息)
  

 消费者:
  public static void main(String[] args) throws JMSException {
    //步骤一:创建连接工厂
  ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
    //步骤二:创建连接
  Connection connection = activeMQConnectionFactory.createConnection();
    //步骤三:开启连接
  connection.start();
    //创建会话对象
  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    //获取到接受消息的队列
  Topic topic = session.createTopic("wdksoft_topic");
    //创建消费者
  MessageConsumer consumer = session.createConsumer(topic);
  while(true){
    //获取消息
  TextMessage message = (TextMessage)consumer.receive();
  if(message!=null){
  System.out.println("消费者获取消息:"+message.getText());
  }else{
  break;
  }
  }
    //回收资源
  session.close();
  connection.close();
  }
   生产者:
  public static void main(String[] args) throws JMSException {
    //步骤一:创建连接工厂
  ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,       ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
    //步骤二:创建连接
  Connection connection = activeMQConnectionFactory.createConnection();
    //步骤三:启动连接
  connection.start();
    //步骤四:获取会话工厂
  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    //步骤五:创建主题
  Topic topic = session.createTopic("wdksoft_topic");
    //创建消息生产者
  MessageProducer producer = session.createProducer(null);
    //消息持久化
  producer.setDeliveryMode(2);
    //模拟消息
  TextMessage textMessage = session.createTextMessage("hello activeMQ pub");
    //发送消息
  producer.send(topic,textMessage);
  System.out.println("生产者生产消息完毕~");
    //回收资源
  session.close();
  connection.close();
  }

 
   图2:
ActiveMQ简单简绍(“点对点通讯”和 “发布订阅模式”)  
 
    注:消息是被推送和拉取的(消息生产端和消费端),不是mq服务器去主动发送的
    总:一些简单常用的应用场景

相关推荐

xinglun / 0评论 2020-06-14