【译】RabbitMQ系列(二)-Work模式

zhuxue 2019-07-01

Work模式

原文地址
【译】RabbitMQ系列(二)-Work模式

在第一章中,我们写了通过一个queue来发送和接收message的简单程序。在这一章中,我们会创建一个workqueue,来将执行时间敏感的任务分发到多个worker中。

work模式主要的意图是要避免等待完成一个耗时的任务。取而代之地,我们延迟任务的执行,将任务封装成消息,将之发送到queue。一个运行着的worker进程会弹出这个任务并执行它。当运行多个worker进程时,任务会在它们之间分派。

这种模式在web应用中特别有用,因为在一个较短的HTTP请求窗口中不会去执行一个复杂的任务。

准备工作

在上一章中,我们发送了一个”Hello World!"的message。现在我们将发送一个代表了复杂任务的字符串。这不是一个实际的任务,比如像调整图片大小或是重新渲染pdf文档,我们通Thead.sleep() 来模拟一个耗时的任务。message中的小圆点表示其复杂度,圆点越多则任务的执行越耗时。比如“Hello..."的message将耗时3秒。

我们简单的修改上一章的Send.java代码,允许在命令行发送任意message。新的类叫做NewTask.java

String message = String.join(" ", argv);
channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

同样的,我们修改上一章中的Recv.java,让它在处理message的时候根据小圆点进行睡眠。新的类叫Worker.java

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  String message = new String(delivery.getBody(), "UTF-8");
  System.out.println(" [x] Received '" + message + "'");
  try {
    doWork(message);
  } finally {
    System.out.println(" [x] Done");
  }
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
private static void doWork(String task) throws InterruptedException {
    for (char ch: task.toCharArray()) {
        if (ch == '.') Thread.sleep(1000);
    }
}

像在第一章一样编译这两个类

javac -cp $CP NewTask.java Worker.java

Round-robin分派

使用Task模式的一个明显的优势是让并行执行任务变得简单。我们只需要启动更多的worker就可以消减堆积的message,系统水平扩展简单。

首先,我们在同一时间启动两个worker。他们都会从queue获得message,来看一下具体细节。

打开了三个终端,两个是跑worker的。

java -cp $CP Worker
java -cp $CP Worker

第三个终端里来发布新的任务message。

java -cp $CP NewTask First message.
java -cp $CP NewTask Second message..
java -cp $CP NewTask Third message...
java -cp $CP NewTask Fourth message....
java -cp $CP NewTask Fifth message.....

让我们看看worker的处理message的情况.第一个worker收到了第1,3,5message,第二个worker收到了第2,4个message。

默认情况下,RabbitMQ会顺序的将message发给下一个消费者。每个消费者会得到平均数量的message。这种方式称之为round-robin(轮询).

Message 确认

执行任务需要一定的时间。你可能会好奇如果一个worker开始执行任务,但是中途异常退出,会是什么结果。在我们现在的代码中,一旦RabbitMQ将消息发送出去了,它会立即将该message删除。这样的话,就可能丢失message。

在实际场景中,我们不想丢失任何一个task。如果一个worker异常中断了,我们希望这个task能分派给另一个worker。

为了确保不会丢失message,RabbitMQ采用message确认机制。RabbitMQ只有收到该message的Ack之后,才会删除该消息。

如果worker中断退出了( channel关闭了,connection关闭了,或是TCP连接丢失了)而没有发送Ack,RabbitMQ会认为该消息没有完整的执行,会将该消息重新入队。该消息会被发送给其他的worker。这样就不用message丢失,即使是在worker经常异常中断退出的场景下。

不会有任何message会timeout。当消费者中断退出,RabbitMQ会重新分派message。即使消息的执行会花费很长的时间。

默认情况下,message是需要人工确认的。在上面的例子中,我们通过autoAck=true来关闭了人工确认。像下面这样,我们将该标志设置为false,worker就需要在完成了任务之后,发送确认。

channel.basicQos(1); // accept only one unack-ed message at a time (see below)

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  String message = new String(delivery.getBody(), "UTF-8");

  System.out.println(" [x] Received '" + message + "'");
  try {
    doWork(message);
  } finally {
    System.out.println(" [x] Done");
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  }
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

上面的代码保证即使当worker还在处理一条消息,而强制它退出,也不会丢失message。然后不久,所有未被确认的消息都会被重新分派。

发送确认必须和接收相同的channel。使用不同的channel进行确认会导致channel-level protocol 异常。

忘记确认消息是一个比较常见的错误,但是其后果是很严重的。当client退出时,message会被重新分派,但是RabbitMQ会占用越来越多的内存,因它无法释放那些未被确认的message。
可以通过rabbitmqctl来打印messages_unacknowledged:
##linux
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged 
##windows
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

Message 持久化

我们学习了在消费者出现问题的时候不丢失message。但是如果RabbitMQ服务器宕机了,我们还是会丢失message。

当RabbitMQ宕机时,默认情况下,它会”忘记“所有的queue和message。为了确保message不丢失,我们需要确认两件事情:我们要使得queue和message都是持久的。

首先,我们要确保RabbitMQ不会丢失我们设置好的queue。所以,我们要把它声明成持久的:

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

虽然代码没有任何问题,但是光这样是无效的。因为我们之前已经定义过名字为hello的queue。RabbitMQ不允许你使用不同的参数去重新定义一个已经存在的queue,而且这还不会反悔任何错误信息。但是我们还是有别的方法,让我们使用一个别的名字,比如task_queue:

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

声明queue的改变要在生产者和消费者的代码里都进行修改。

接着我们要设置message的持久性,我们通过设置MessageProperties为PERSISTENT_TEXT_PLAIN:

import com.rabbitmq.client.MessageProperties;
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
将message标记成持久的不能100%保证message不会丢失,虽然这告诉RabbitMQ将message保存到磁盘,然而在RabbitMQ从接到message到保存之间,仍然有一小段时间。同时RabbitMQ不会给每一条message执行fsync(2) -- 可能只是保存到了cache而没有写到磁盘上去。所以持久的保证也不是非常强,然后对我们简单的task queue来说则足够了。如果需要一个非常强的保证,则可以使用发布确认的方式。

Fair 分派

你可能已经注意到分派的工作没有如我们所期望的来执行。比如在有2个worker的情况系,所有偶数的message耗时很长,而所有奇数的message则耗时很短,这样其中一个worker则一直被分派到偶数的message,而另一个则一直是奇数的message。RabbitMQ对此并不知晓,进而继续这样分派着message。

这样的原因是RabbitMQ是在message入queue的时候确定分派的。它不关心消费者ack的情况。
【译】RabbitMQ系列(二)-Work模式

我们可以通过basicQos方法和prefetchCount(1)来解决这个问题。这个设置是让RabbitMQ给worker一次一个message。或者这么说,直到worker处理完之前的message并发送ack,才给worker下一个message。否则,Rabbitmq会将message发送给其它不忙的worker。

int prefetchCount = 1;
channel.basicQos(prefetchCount);
注意queue的大小。如果所有的worker都处于忙碌状态,queue可能会被装满。必须监控queue深度,可能要开启更多的worker,或者采取其他的措施。

开始执行

NewTask.java的最终版本

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

        String message = String.join(" ", argv);

        channel.basicPublish("", TASK_QUEUE_NAME,
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");
    }
  }

}

Worker.java的最终版本

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Worker {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    final Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    channel.basicQos(1);

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");

        System.out.println(" [x] Received '" + message + "'");
        try {
            doWork(message);
        } finally {
            System.out.println(" [x] Done");
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    };
    channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
  }

  private static void doWork(String task) {
    for (char ch : task.toCharArray()) {
        if (ch == '.') {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException _ignored) {
                Thread.currentThread().interrupt();
            }
        }
    }
  }
}

使用message ack和prefetchCount,来设定work queue。持久化选项则在RabbitMQ重启后能让任务得以恢复。

相关推荐