当先锋百科网

首页 1 2 3 4 5 6 7

1. RabbitMQ的优点和缺点

优点

解耦:系统A在代码中直接调用系统B和系统C的代码,如果将来D系统接入,系统A还需要修改代码

异步:将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度

削峰:并发量大的时候,请求可以先转发到消息队列中,然后MQ再处理请求

缺点

系统可用性降低:如果MQ出问题,导致业务系统出问题

系统复杂性增加:要多考虑很多方面的问题,比如一致性问题、如何保证消息不被重复消费,如何保证保证消息可靠传输等

2. RabbitMQ基础概念介绍

Broker:rabbitmq的服务节点

ConnectionFactory:制造connection的工厂

Connection:应用程序与Server的网络连接,TCP连接

Channel:消息读写等操作在信道中进行。客户端可以建立多个信道,每个信道代表一个会话任务。

Queue:Queue(队列)是RabbitMQ的内部对象,用于存储消息

Message acknowledgment:消费者收到Queue中的消息,但没有处理完成就宕机,可能会导致消息丢失。为了避免这种情况发生,消费者在消费完后需要发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除;如果RabbitMQ没有收到回执并检测到消费者的RabbitMQ连接断开,则RabbitMQ会将该消息发送给其他消费者(如果存在多个消费者)进行处理。这里不存在超时的概念,哪怕消费者处理的时间再长,除非它的RabbitMQ连接断开。

Message:消息,应用程序和服务器之间传送的数据,消息可以非常简单,也可以很复杂。有Properties和Body组成。Properties为外包装,可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body就是消息体内容

Message durability:我们可以将Queue与Message都设置为可持久化的(durable),这样可以保证绝大部分情况下我们的RabbitMQ消息不会丢失;(RabbitMQ服务器已经接收到生产者的消息,但还没来得及持久化该消息时RabbitMQ服务器就断电了,这种小概率事件只能用MQ事务解决)

Prefetch count:如果有多个消费者同时订阅同一个Queue中的消息,Queue中的消息会被平摊给多个消费者。但是每个消费者的处理效率不一样,这时我们可以通过设置prefetchCount来限制Queue每次发送给每个消费者的消息数,比如我们设置prefetchCount=1,则Queue每次给每个消费者发送一条消息;消费者处理完这条消息后Queue会再给该消费者发送一条消息。

Exchange:生产者不会直接发消息到队列,而是将消息发送到Exchange(交换器),由Exchange将消息路由到一个或多个Queue中(或者丢弃);RabbitMQ中常用的Exchange有四种类型,不同的类型有着不同的路由策略

  • direct:该类型的交换器将所有发送到该交换器的消息转发到RoutingKey指定的队列中,也就是说路由到BindingKey和RoutingKey完全匹配的队列中。
  • topic:该类型的交换器将所有发送到Topic Exchange的消息转发到所有RoutingKey中指定的Topic的队列上面。Exchange将RoutingKey和某Topic进行模糊匹配;#:匹配一个或多个词,比如:audit.#:能够匹配audit.irs.corporate 或者 audit.irs*:匹配不多不少恰好1个词,比如:audit.*:只能匹配audit.irs;不同的单词用句号.分割开
  • fanout:该类型交换器不处理路由键,会把所有发送到交换器的消息路由到所有绑定的队列中。优点是转发消息最快,性能最好。
  • headers:该类型的交换器不依赖路由规则来路由消息,而是根据消息内容中的headers属性进行匹配。headers类型交换器性能差,在实际中并不常用

routing key:生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效,routing key设定的长度限制为255 bytes

Binding:RabbitMQ中通过Binding将Exchange与Queue关联起来

Binding key:在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key;消费者将消息发送给Exchange时,一般会指定一个routing key;当binding key与routing key相匹配时,消息将会被路由到对应的Queue中;在绑定多个Queue到同一个Exchange的时候,这些Binding允许使用相同的binding key。binding key 并不是在所有情况下都生效,它依赖于Exchange Type,比如fanout类型的Exchange就会无视binding key,而是将消息路由到所有绑定到该Exchange的Queue。

Virtual Host:虚拟主机,用于逻辑隔离。一个虚拟主机里面可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名称的Exchange或Queue。

3. 如何确保消息正确的发送至MQ,如何确保消费者消费了消息

发送方确认模式:

将信道设置成confirm模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的ID。一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个ack确认给生产者(包含消息唯一ID)。如果RabbitMQ发生内部错误从而导致消息丢失,会发送一条nack(not acknowledged,未确认)消息。发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。

接收方确认机制:

消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操作)。只有消费者确认了消息,RabbitMQ才能安全地把消息从队列中删除。这里并没有用到超时机制,RabbitMQ仅通过Consumer的连接中断来确认是否需要重新发送消息。也就是说,只要连接不中断,RabbitMQ给了Consumer足够长的时间来处理消息。

两种特殊情况:

  • 如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ会认为消息没有被分发,然后重新分发给下一个订阅的消费者。(可能存在消息重复消费的隐患,需要根据bizId去重)
  • 如果消费者接收到消息却没有确认消息,连接也未断开,则RabbitMQ认为该消费者繁忙,将不会给该消费者分发更多的消息。

4. 如何避免消息重复投递或重复消费

在消息生产时,MQ内部针对每条生产者发送的消息生成一个inner-msg-id,作为去重和幂等的依据(消息投递失败并重传),避免重复的消息进入队列;在消息消费时,要求消息体中必须要有一个bizId(对于同一业务全局唯一,如支付ID、订单ID、帖子ID等)作为去重和幂等的依据,避免同一条消息被重复消费。

5. 如何解决丢数据的问题

生产者丢数据

  • transaction机制解决方案:发送消息前,开启事物(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事物就会回滚(channel.txRollback()),如果发送成功则提交事物(channel.txCommit())。缺点就是吞吐量下降
  • confirm模式解决方案:进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个Ack给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了.如果rabiitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。

消息队列丢数据

开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。

如何持久化

1、将queue的持久化标识durable设置为true,则代表是一个持久的队列
2、发送消息的时候将deliveryMode=2

消费者丢数据

启用手动确认模式可以解决这个问题

①自动确认模式,消费者挂掉,待ack的消息回归到队列中。消费者抛出异常,消息会不断的被重发,直到处理成功。不会丢失消息,即便服务挂掉,没有处理完成的消息会重回队列,但是异常会让消息不断重试。

②手动确认模式,如果消费者来不及处理就死掉时,没有响应ack时会重复发送一条信息给其他消费者;如果监听程序处理异常了,且未对异常进行捕获,会一直重复接收消息,然后一直抛异常;如果对异常进行了捕获,但是没有在finally里ack,也会一直重复发送消息(重试机制)。

③不确认模式,acknowledge=“none” 不使用确认机制,只要消息发送完成会立即在队列移除,无论客户端异常还是断开,只要发送完就移除,不会重发。

6. RabbitMQ事务消息

生产者使用事务进行发送消息:

  • channel.txSelect() 开启事务
  • channel.basicPublish 发送消息
  • channel.txCommit() 提交事务
  • channel.txRollback() 回滚事务

消费者也可以使用事务:

  • autoAck=false ,手动提交ack无效,会以事务提交或回滚为准
  • autoAck=true,不支持事务

7. RabbitMQ死信队列和延时队列

当一条消息在队列中出现以下三种情况的时候,该消息就会变成一条死信

  • 消息被拒绝(Basic.Reject或Basic.Nack)并且设置 requeue 参数的值为 false
  • 消息在队列的存活时间超过了设置的TTL时间
  • 队列达到最大长度

当消息在一个队列中变成一个死信之后,如果配置了死信队列,它将被重新publish到死信交换机,死信交换机将死信投递到一个队列上,这个队列就是死信队列

过期消息:

在 rabbitmq 中存在2种方可设置消息的过期时间,第一种通过对队列进行设置,这种设置后,该队列中所有的消息都存在相同的过期时间,第二种通过对消息本身进行设置,那么每条消息的过期时间都不一样。如果同时使用这2种方法,那么以过期时间小的那个数值为准。当消息达到过期时间还没有被消费,那么那个消息就成为了一个 死信 消息。

队列设置:在队列申明的时候使用 x-message-ttl 参数,单位为 毫秒
单个消息设置:是设置消息属性的 expiration 参数的值,单位为 毫秒

延时队列:在rabbitmq中不存在延时队列,但是我们可以通过设置消息的过期时间和死信队列来模拟出延时队列。消费者监听死信交换器绑定的队列,而不要监听消息发送的队列。

8. RabbitMQ集群

rabbitmq是依据erlang的分布式特性(RabbitMQ底层是通过Erlang架构来实现的,所以rabbitmqctl会启动Erlang节点,并基于Erlang节点来使用Erlang系统连接RabbitMQ节点,在连接过程中需要正确的Erlang Cookie和节点名称,Erlang节点通过交换Erlang Cookie以获得认证)来实现的,所以部署rabbitmq分布式集群时要先安装erlang,并把其中一个服务的cookie复制到另外的节点。

RabbitMQ的Cluster集群模式一般分为两种,普通模式和镜像模式

普通模式下,以两个节点(rabbit01、rabbit02)为例来进行说明。对于Queue来说,消息实体只存在于其中一个节点rabbit01(或者rabbit02),rabbit01和rabbit02两个节点仅有相同的元数据,即队列的结构。当消息进入rabbit01节点的Queue后,consumer从rabbit02节点消费时,RabbitMQ会临时在rabbit01、rabbit02间进行消息传输,把A中的消息实体取出并经过B发送给consumer。

镜像模式下,将需要消费的队列变为镜像队列,存在于多个节点,这样就可以实现RabbitMQ的HA高可用性。作用就是消息实体会主动在镜像节点之间实现同步,而不是像普通模式那样,在consumer消费数据时临时读取。缺点就是,集群内部的同步通讯会占用大量的网络带宽。