当先锋百科网

首页 1 2 3 4 5 6 7

目录

一、简述RabbitMQ的架构设计

 交换器类型

二、RabbitMQ如何确保消息发送 ? 消息接收?

发送方确认机制

接收方确认机制

三、RabbitMQ事务消息

四、RabbitMQ死信队列、延时队列

五、Rabbitmq的持久化机制

六、RabbitMQ如何保证消息的可靠性传输

七、Rabbitmq的普通集群原理

元数据

为什么只同步元数据

集群节点类型

八、Rabbitmq的镜像队列原理


一、简述RabbitMQ的架构设计

Broker rabbitmq 的服务节点。
Queue :队列,是 RabbitMQ 的内部对象,用于存储消息。RabbitMQ中消息只能存储在队列中。生产者投递消息到队列,消费者从队列中获取消息并消费。多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊 ( 轮询 ) 给多个消费者进行消费,而不是每个消费者都收到所有的消息进行消费。 (注意: RabbitMQ 不支持队列层面的广播消费,如果需要广播消费,可以采用一个交换器通过路由Key绑定多个队列,由多个消费者来订阅这些队列的方式。
Exchange :交换器。生产者将消息发送到 Exchange,由交换器将消息路由到一个或多个队列中。如果路由不到,或返回给生产者,或直接丢弃,或做其它处理。
RoutingKey :路由 Key 。生产者将消息发送给交换器的时候,一般会指定一个 RoutingKey,用来指定这个消息的路由规则。这个路由 Key 需要与交换器类型和绑定键 (BindingKey)联合使用才能最终生效。
在交换器类型和绑定键固定的情况下,生产者可以在发送消息给交换器时通过指定 RoutingKey来决定消息流向哪里。
Binding :通过绑定将交换器和队列关联起来,在绑定的时候一般会指定一个绑定键,这样RabbitMQ就可以指定如何正确的路由到队列了。
交换器和队列实际上是多对多关系。就像关系数据库中的两张表。他们通过 BindingKey 做关联 (多对多关系表 ) 。在投递消息时,可以通过 Exchange RoutingKey( 对应 BindingKey)就可以找到相对应的队列。
信道 :信道是建立在 Connection 之上的虚拟连接。当应用程序与 Rabbit Broker 建立 TCP连接的时候,客户端紧接着可以创建一个 AMQP 信道 (Channel) ,每个信道都会被指派一个唯一的 D RabbitMQ 处理的每条 AMQP 指令都是通过信道完成的。信道就像电缆里的光纤束。一条电缆内含有许多光纤束,允许所有的连接通过多条光线束进行传输和接收。
vhost :虚拟主机,每一个应用可以指定不同的 vhost ,此时对于应用来说、 vhost 就是 broker。可以用于不同环境,如开发环境、测试环境,如用户A使用,用户B使用,进行隔离。

 

 交换器类型

direct 点对点:消息中的路由键和绑定规则中的路由key一致,交换器就会把消息发到对应的队列,是完全匹配才会发送,一个发送者和接收者,多个接受者我们用的点对点。

fanout 广播:每个fanout类型的交换器会将消息分发到所有绑定的队列上。不处理路由键 速度最快用的最多。

topic 通过模糊匹配分配消息的路由键。将路由键和某个模式进行匹配 单词 用点分割 *一个单词 # 多个单词。

header 性能较差,一般不用。

二、RabbitMQ如何确保消息发送 ? 消息接收?

发送方确认机制

信道需要设置为 confirm 模式,则所有在信道上发布的消息都会分配一个唯一 ID

一旦消息被投递到 queue(可持久化的消息需要写入磁盘),信道会发送一个确认给生产者(包含消息唯一ID )。
如果 RabbitMQ 发生内部错误从而导致消息丢失,会发送一条 nack (未确认)消息给生产者。
所有被发送的消息都将被 confirm (即 ack ) 或者被 nack 一次。
但是没有对消息被 confirm 的快慢做任何保证,并且同一条消息不会既被 confirm 又被 nack。
发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者,生产者的回调方法会被触发。
ConfirmCallback 接口:只确认是否正确到达 Exchange 中,成功到达则回调。
ReturnCallback 接口:消息失败返回时回调。

接收方确认机制

消费者在声明队列时,可以指定noAck参数,当noAck=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(或者磁盘,持久化消息)中移去消息。否则,消息被消费后会被立即删除。

消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操作)。只有消费者确认了消息, RabbitMQ 才能安全地把消息从队列中删除。
RabbitMQ 不会为未 ack的消息设置超时时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开。这么设计的原因是 RabbitMQ允许消费者消费一条消息的时间可以很长。保证数据的最终一致性;
如果消费者返回 ack 之前断开了链接, RabbitMQ 会重新分发给下一个订阅的消费者。(可能存在消息重复消费的隐患,需要去重)

三、RabbitMQ事务消息

通过对信道的设置实现
1. channel.txSelect() ;通知服务器开启事务模式;服务端会返回 Tx.Select-Ok
2. channel.basicPublish ;发送消息,可以是多条,可以是消费消息提交 ack
3. channel.txCommit() 提交事务;
4. channel.txRollback() 回滚事务;
消费者使用事务:
1. autoAck=false ,手动提交 ack ,以事务提交或回滚为准;
2. autoAck=true,不支持事务的,也就是说你即使在收到消息之后在回滚事务也是于事无补的,队列已经把消息移除了。如果其中任意一个环节出现问题,就会抛出 IoException异常,用户可以拦截异常进行事务回滚,或决定要不要重复消息。
事务消息会降低 rabbitmq 的性能。

四、RabbitMQ死信队列、延时队列

  1. 消息被消费方否定确认,使用 channel.basicNack channel.basicReject ,并且此时requeue 属性被设置为 false
  2. 消息在队列的存活时间超过设置的TTL时间。
  3. 消息队列的消息数量已经超过最大队列长度。
那么该消息将成为 死信 死信 消息会被 RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。
为每个需要使用死信的业务队列配置一个死信交换机,这里同一个项目的死信交换机可以共用一个,然后为每个业务队列分配一个单独的路由 key,死信队列只不过是绑定在死信交换机上的队列,死信交换机也不是什么特殊的交换机,只不过是用来接受死信的交换机,所以可以为任何类型【 Direct、 Fanout Topic
TTL :一条消息或者该队列中的所有消息的最大存活时间。
如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL设置的时间内没有被消费,则会成为 死信 。如果同时配置了队列的 TTL 和消息的 TTL ,那么较小的那个值将会被使用。
只需要消费者一直消费死信队列里的消息。
延迟队列一般使用插件的形式,设置延迟时间,到时间后消息才会到达队列被消费者消费。

五、Rabbitmq的持久化机制

  1. 交换机持久化:exchange_declare创建交互机时通过参数指定。
  2. 队列持久化:queue_declare创建队列时通过参数指定。
  3. 消息持久化:new AMQPMessage创建消息时通过参数指定。
append 的方式写文件,会根据大小自动生成新的文件, rabbitmq启动时会创建两个进程,一个负责持久化消息的存储,另一个负责非持久化消息的存储 ( 内存不够时)
消息存储时会在 ets 表中记录消息在文件中的映射以及相关信息(包括id、偏移量,有效数据,左边文件,右边文件),消息读取时根据该信息到文件中读取、同时更新信息。
消息删除时只从 ets 删除,变为垃圾数据,当垃圾数据超出比例(默认 50% ),并且文件数达到3个,触发垃圾回收,锁定左右两个文件,整理左边文件有效数据、将右边文件有效数据写入左边,更新文件信息,删除右边,完成合并。当一个文件的有用数据等于 0 时,删除该文件。
写入文件前先写 buffer 缓冲区,如果 buffer 已满,则写入文件 ( 此时只是操作系统的页存 )
每隔 25ms 刷一次磁盘,不管 buffer 满没满,都将 buffer 和页存中的数据落盘。
每次消息写入后,如果没有后续写入请求,则直接刷盘。

六、RabbitMQ如何保证消息的可靠性传输

1、使用事务消息
2、使用消息确认机制
发送方确认:
channel 设置为 confirm 模式,则每条消息会被分配一个唯一 id。
消息投递成功,信道会发送 ack 给生产者,包含了 id ,回调 ConfirmCallback 接口。
如果发生错误导致消息丢失,发生 nack 给生产者。回调 ReturnCallback 接口。
ack nack 只有一个触发,且只有一次,异步触发。可以继续发送消息。
接收方确认:
声明队列时,指定 noack=false broker 会等待消费者手动返回 ack、才会删除消息,否则立刻删除。
broker ack 没有超时机制,只会判断链接是否断开,如果断开、消息会被重新发送。

七、Rabbitmq的普通集群原理

 

节点直接同步元数据。

元数据

  • 队列元数据:队列名称和它的属性。
  • 交换器元数据:交换器名称、类型和属性。
  • 绑定元数据:一张简单的表格展示了如何将消息路由到队列。
  • vhost元数据:为vhost内的队列、交换器和绑定提供命名空间和安全属性。

为什么只同步元数据

存储空间,每一个节点都保存全量数据,影响消息堆积能力。
性能,消息的发布者需要将消息复制到每一个集群节点。
客户端连接的是非队列数据所在节点:则该节点会进行路由转发,包括发送和消费。

集群节点类型

磁盘节点:将配置信息和元信息存储在磁盘上。
内存节点:将配置信息和元信息存储在内存中。性能优于磁盘节点。依赖磁盘节点进行持久化。
RabbitMQ要求集群中至少有一个磁盘节点当节点加入和离开集群时,必须通知磁盘节点(如果集群中唯一的磁盘节点崩溃了,则不能进行创建队列、创建交换器、创建绑定、添加用户、更改权限、添加和删除集群节点)。如果唯一磁盘的磁盘节点崩溃,集群是可以保持运行的,但不能更改任何东西。因此建议在集群中设置两个磁盘节点,只要一个可以,就能正常操作。

八、Rabbitmq的镜像队列原理

GM负责消息的广播,所有的GM组成gm_group,形成链表结构,负责监听相邻节点的状态,以及传递消息到相邻节点,master的GM收到消息时代表消息同步完成。
mirror_queue_master/slave负责消息的处理,操作blockingQueue Queue 负责 AMQP协议( commit rollback ack 等)
master处理读写