当先锋百科网

首页 1 2 3 4 5 6 7

官方文档:
https://www.rabbitmq.com/consumer-prefetch.html
https://www.rabbitmq.com/confirms.html#channel-qos-prefetch

【问题】

测试”消息积压“场景:在消费者没有启动的情况下,生产者先生产很多消息。然后先开启一个a消费者,再开启b消费者,发现只有a消费者不断的消费旧的消息,而b消费者”无动于衷“。。。
后面再生成新消息,b消费者确实能帮忙消费一下新消息。也就是说,直到新消息产生后b队列它才开始消费。这是为什么?

这就涉及到Consumer Prefetch(消费者预取)概念。

对于大多数消费者来说,限制该窗口的大小以避免消费者端的缓冲区(堆)无限制增长问题是有意义的。
这可以通过使用 basic.qos 方法设置 "预取计数 "值来实现。该值定义了通道上允许的最大未确认交付次数。当数量达到配置的计数时,RabbitMQ 将停止在通道上交付更多消息,直到至少有一条未确认的消息被确认。
QoS 预取设置对使用 basic.get(“pull API”)获取的报文没有影响,即使在手动确认模式下也是如此。

#值为 0 表示 "无限制",允许任何数量的未确认信息:
channel.basicQos(0); // No limit for this consumer

#最多可同时接收 10 条未确认的信息:
channel.basicQos(10); // Per consumer limit 10
#QoS 设置可针对特定通道或特定消费者进行配置。
channel.basicQos(10, false); // Per consumer limit 通常我们都是写false,针对每个消费者
channel.basicQos(15, true);  // Per channel limit

有了上面这个知识补充后,继续我之前对”消息积压“测试。这次我在代码中加入:

$channel->basic_qos(null,5,false); //每个消费者最多可同时接收5条未确认的信息

然后先停止所有队列,再生产6个消息,此时积压消息数量为6。先启动a消费者,隔1秒再启动b消费者,每条消息处理时间为5秒。看到的结果是a消费者消费了5条消息,b消费者消费了1条信息。
显然,结果符合我们的预期。因为,a消费者一开始启动,就同时接收5条消息。因为每条消息处理时间为5秒。所以在a消费者第一条消息处理完成并确认之前,b消费者已启动,并接收了仅存的1条消息。

我用的扩展是php-amqplib,代码示例:

//提示: NOT_IMPLEMENTED - prefetch_size!=0 (2)(60, 10)
//$channel->basic_qos(2,2,false);
//正确用法:https://github.com/php-amqplib/php-amqplib/blob/master/demo/basic_qos.php
$channel->basic_qos(null,5,false);//每个消费者最多可同时接收 5 条未确认的信息

【消费者确认模式、预取和吞吐量】
确认模式和 QoS 预取值对消费者吞吐量有显著影响。一般来说,增加预取值会提高向用户发送信息的速度。自动确认模式可获得最佳传输速率。不过,在这两种情况下,已交付但尚未处理的信息数量也会增加,从而增加用户 RAM(内存)消耗。
应谨慎使用自动确认模式或无限制预取的手动确认模式。如果消费者在未确认的情况下消耗大量信息,将导致其所连接节点的内存消耗增长。寻找合适的预取值需要反复试验,不同的工作负载会有不同的预取值。
100 到 300 之间的值通常能提供最佳的吞吐量,而且不会给用户带来过大的压力。更高的值通常会遇到收益递减规律。
预取值为 1 是最保守的值。它会大大降低吞吐量,尤其是在消费者连接延迟较高的环境中。对于许多应用而言,更高的值是合适和最佳的。