当先锋百科网

首页 1 2 3 4 5 6 7

MQ(Message Queue)

消息队列是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自用户。消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的数据,包含发生的时间,输入设备的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列互交。消息会保存在队列中,直到接收者取回它。 ——维基百科

消息队列的简单架构:如图所示,先进先出。
在这里插入图片描述
Producer:消息生产者,负责产生和发送消息到 Broker;
Broker:消息处理中心。负责消息存储、确认、重试等,一般其中会包含多个 queue;
Consumer:消息消费者,负责从 Broker 中获取消息,并进行相应处理。

MQ的特点
①异步,消息队列是异步的,它允许消费者在生产者发出信息很长时间后再进行处理,而且生产者不需要等待消费者回应,应用方式即将一些非必要的业务逻辑通过写入消息队列的方式以异步方式运行,减少了主逻辑的响应时间,加快整个逻辑的响应速度;产生的缺点,消费者必须轮询消息队列才能接收到最近的消息。

②解耦,消息队列减少了服务之间的耦合性,不同的服务可以通过消息队列进行通信,而不用关心彼此的实现细节(产生的异常也只与异常双方有关,不会牵连到第三方),只需要事先商定消息的格式,这样做的好处是各个系统/服务之间不需要相互调用,且在有新系统接入时,不需要修改代码,只需要让新系统读取消息队列即可。

③削峰/限流,当上下游的处理能力存在差异时,使用消息队列做一个中间载体,消费者按照自己的逻辑处理能力从消息队列中拉取任务执行,如此不会造成突然的高并发引起系统/数据库的崩溃,虽然会造成任务暂时积压,但在生产中,这个短暂的高峰期积压是允许的。

适合的应用场景:对应于MQ的特点,当业务逻辑需要异步/解耦/限流时,引入消息队列的机制。
不适合的应用场景:当调用方实时依赖调用结果时,不能使用MQ。

使用MQ可能遇到的问题
①可用性降低,在原有的系统基础之上添加了新的组件,增加了故障的可能性,因此可用性降低,一般项目中的MQ都是集群/分布式的。

②消息重复消费,正常情况下,消费者在消费消息时候,消费完毕后,会发送一个确认信息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除;不同的消息队列发送的确认信息形式不同(如RabbitMQ发送一个ACK确认消息,RocketMQ返回一个CONSUME_SUCCESS成功标志,kafka的每一个消息都有一个offset,kafka消费过消息后,需要提交offset,让消息队列知道自己已经消费过了),当因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息时,其会再次将该消息分发给其他的消费者;这个问题的解决方案一般根据业务场景的不同可以灵活变化,例如:
Ⅰ拿到这个消息做数据库的insert操作,可以给这个消息做一个唯一主键,那么就算出现重复消费的情况,会导致主键冲突,避免数据库出现脏数据;
Ⅱ拿到这个消息做redis的set的操作,那么无需操作解决,set操作本身就是幂等操作;
Ⅲ也可以准备一个第三方介质,来做消费记录,以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入redis,那消费者开始消费前,先去redis中查询有没消费记录即可。

③消息的可靠性传输,每种MQ都要从三个角度来分析:生产者弄丢数据、消息队列弄丢数据、消费者弄丢数据。
以RabbitMQ为例:
Ⅰ生产者丢数据,RabbitMQ提供transaction(开启事务,出现异常则回滚,但会影响吞吐量)和confirm(所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个Ack给生产者,否则为Nack,可以进行重试)模式来确保生产者不丢消息。
Ⅱ消息队列丢数据,一般是通过开启持久化磁盘的配置进行备份,重启后也可恢复数据。
Ⅲ消费者丢数据,一般是因为采用了自动确认消息模式,这种模式下,消费者会自动确认收到信息,这时rahbitMQ会立即将消息删除,如果消费者出现异常而没能处理该消息,就会丢失该消息。

④保证消息的顺序性,一般是通过一些算法,将需要保持先后顺序的消息放到同一个消息队列中,只用一个消费者去消费该队列。

⑤消费者从消息队列中拿取消息的方法,一般分为两种,消息队列有数据时主动通知消费者(push),消费者不断轮询消息队列查看消息状态(pull)。

RabbitMQ

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

AMQP
Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。 AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

AMQP的一些基本概念
在这里插入图片描述
①Message 消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

②Publisher 消息的生产者,也是一个向交换器发布消息的客户端应用程序。

③Exchange 交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

④Binding 绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

⑤Queue 消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

⑥Connection 网络连接,比如一个TCP连接。

⑦Channel 信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

⑧Consumer 消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

⑨Virtual Host 虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定。

⑩Broker 表示消息队列服务器实体。

RabbitMQ的技术亮点
①可靠性,RabbitMQ提供了多种技术可以让你在性能和可靠性之间进行权衡。这些技术包括持久性机制、投递确认、发布者证实和高可用性机制。

②灵活的路由,消息在到达队列前是通过交换机进行路由的。RabbitMQ为典型的路由逻辑提供了多种内置交换机类型。如果你有更复杂的路由需求,可以将这些交换机组合起来使用,你甚至可以实现自己的交换机类型,并且当做RabbitMQ的插件来使用。

③集群,在相同局域网中的多个RabbitMQ服务器可以聚合在一起,作为一个独立的逻辑代理来使用。

④联合,对于服务器来说,它比集群需要更多的松散和非可靠链接。为此RabbitMQ提供了联合模型。

⑤高可用,在同一个集群里,队列可以被镜像到多个机器中,以确保当其中某些硬件出现故障后,你的消息仍然安全。

⑥多协议,RabbitMQ 支持多种消息协议的消息传递。

⑦广泛的客户端,只要是你能想到的编程语言几乎都有与其相适配的RabbitMQ客户端。

⑧可视化管理工具,RabbitMQ附带了一个易于使用的可视化管理工具,它可以帮助你监控消息代理的每一个环节。

⑨追踪,如果你的消息系统有异常行为,RabbitMQ还提供了追踪的支持,让你能够发现问题所在。

⑩插件系统,RabbitMQ附带了各种各样的插件来对自己进行扩展,也可以写自己的插件来使用。
且围绕着RabbitMQ有一个大型的成熟的社区,有着各种各样的客户端、插件、指南等等。

RabbitMQ的一些重要概念
①生产者/生产:producer/producing,队列:queue,消费者/消费:consumer/consuming/receiving,这些概念与MQ中的概念相同,其中消费与接收是同一个意思。

②工作队列:Task Queues,它可以并行的处理队列,即可以将消息分发给多个consumer,默认的模式为轮询(round-robin),即按顺序把消息发送给每个消费者。

③消息确认:Message acknowledgment,在实际应用中,可能会发生消费者收到Queue中的消息,但没有处理完成就宕机(或出现其他意外)的情况,这种情况下就可能会导致消息丢失,为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除;如果RabbitMQ没有收到回执并检测到消费者的RabbitMQ连接断开,则RabbitMQ会将该消息发送给其他消费者(如果存在多个消费者)进行处理。这里不存在timeout概念,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的RabbitMQ连接断开。此处注意如果忘记确认,则Queue中堆积的消息会越来越多,消费者重启后会重复消费这些消息并重复执行业务逻辑。

④消息持久化:Message durability,如果我们希望即使在RabbitMQ服务重启的情况下,也不会丢失消息,我们可以将Queue与Message都设置为可持久化的(durable),这样可以保证绝大部分情况下我们的RabbitMQ消息不会丢失。但依然解决不了小概率丢失事件的发生(比如RabbitMQ服务器已经接收到生产者的消息,但还没来得及持久化该消息时RabbitMQ服务器就断电了),如果一定要保证持久化,需要使用事务。

⑤公平调度:Prefetch count,在默认情况下,如果每个消息的处理时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况,可以通过设置prefetchCount来限制Queue每次发送给每个消费者的消息数,比如我们设置prefetchCount=1,则Queue每次给每个消费者发送一条消息;消费者处理完这条消息后Queue会再给该消费者发送一条消息。

⑥交换机:Exchange,在RabbitMQ中,生产者将消息发送到Exchange(如图所示X),由Exchange将消息路由到一个或多个Queue中(或者丢弃),交换机处理消息的规则是通过交换机类型(exchange type)来定义的。
在这里插入图片描述
⑦routing key,生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。 在Exchange Type与binding key固定的情况下(在正常使用时一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。 RabbitMQ为routing key设定的长度限制为255 bytes。

⑧Binding,RabbitMQ中通过Binding将Exchange与Queue关联起来,这样RabbitMQ就知道如何正确地将消息路由到指定的Queue了。
在这里插入图片描述
⑨Binding key,在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key;消费者将消息发送给Exchange时,一般会指定一个routing key;当binding key与routing key相匹配时,消息将会被路由到对应的Queue中。这个将在Exchange Types章节会列举实际的例子加以说明。 在绑定多个Queue到同一个Exchange的时候,这些Binding允许使用相同的binding key。 binding key 并不是在所有情况下都生效,它依赖于Exchange Type,比如fanout类型的Exchange就会无视binding key,而是将消息路由到所有绑定到该Exchange的Queue。

⑩Exchange Types,分为四种:
Ⅰfanout,fanout类型的Exchange路由规则会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中,即每个queue中的消息都相同。
Ⅱdirect,direct类型的Exchange路由规则会把消息路由到那些binding key与routing key完全匹配的Queue中,在direct中,允许多个绑定(Multiple bindings),即两个队列的binding key允许相同,带有与binding key相同的routing key的消息会被广播到多个匹配的队列。
在这里插入图片描述
Ⅲtopic,topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但其匹配规则不同:1routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
2binding key与routing key一样也是句点号“. ”分隔的字符串
3binding key中可以存在两种特殊字符“”与“#”,用于做模糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
Ⅳheaders,headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。 在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。

远程过程调用
RPC(Remote procedure call) MQ本身是基于异步的消息处理,前面的示例中所有的生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。 但实际的应用场景中,我们很可能需要一些同步处理,需要同步等待服务端将我的消息处理完成后再进行下一步处理。这相当于RPC(Remote Procedure Call,远程过程调用)。在RabbitMQ中也支持RPC。

RabbitMQ中RPC的工作机制
①当客户端启动的时候,它创建一个匿名独享的回调队列。
②在RPC请求中,客户端发送带有两个属性的消息:一个是设置回调队列的 reply_to 属性,另一个是设置唯一值的 correlation_id 属性。
③将请求发送到一个 rpc_queue 队列中。
④RPC工作者(又名:服务器)等待请求发送到这个队列中来。当请求出现的时候,它执行他的工作并且将带有执行结果的消息发送给reply_to字段指定的队列。
⑤客户端等待回调队列里的数据。当有消息出现的时候,它会检查correlation_id属性。如果此属性的值与请求匹配,将它返回给应用。
在这里插入图片描述

redis作为MQ与RabbitMQ的对比
①可靠消费
Redis:没有相应的机制保证消息的消费,当消费者消费失败的时候,消息体丢失,需要手动处理;
RabbitMQ:具有消息消费确认,即使消费者消费失败,也会自动使消息体返回原队列,同时可全程持久化,保证消息体被正确消费。

②可靠发布
Reids:不提供,需自行实现;
RabbitMQ:具有发布确认功能,保证消息被发布到服务器。

③高可用
Redis:采用主从模式,读写分离,但是故障转移还没有非常完善的官方解决方案;
RabbitMQ:集群采用磁盘、内存节点,任意单点故障都不会影响整个队列的操作。

④持久化
Redis:将整个Redis实例持久化到磁盘;
RabbitMQ:队列,消息,都可以选择是否持久化。

⑤消费者负载均衡
Redis:不提供,需自行实现;
RabbitMQ:根据消费者情况,进行消息的均衡分发。

⑥队列监控
Redis:不提供,需自行实现;
RabbitMQ:后台可以监控某个队列的所有信息,(内存,磁盘,消费者,生产者,速率等)。

⑦流量控制
Redis:不提供,需自行实现;
RabbitMQ:服务器过载的情况,对生产者速率会进行限制,保证服务可靠性。

⑧应用场景分析
Redis:轻量级,高并发,延迟敏感即时数据分析、秒杀计数器、缓存等;RabbitMQ:重量级,高并发,异步批量数据异步处理、并行任务串行化,高负载任务的负载均衡等。

大部分内容转载自:
https://blog.csdn.net/Joker_Fei/article/details/89391297
https://blog.csdn.net/qq_34288630/article/details/79411529
https://blog.csdn.net/alinshen/article/details/80583214
https://mp.weixin.qq.com/s?__biz=MjM5ODYxMDA5OQ==&mid=2651960012&idx=1&sn=c6af5c79ecead98daa4d742e5ad20ce5&chksm=bd2d07108a5a8e0624ae6ad95001c4efe09d7ba695f2ddb672064805d771f3f84bee8123b8a6&mpshare=1&scene=1&srcid=04054h4e90lz5Qc2YKnLNuvY
https://github.com/jasonGeng88/blog/blob/master/201705/MQ.md
https://blog.csdn.net/hellozpc/article/details/81436980
http://rabbitmq.mr-ping.com/
https://www.sojson.com/blog/48.html
https://blog.csdn.net/lilun517735159/article/details/78824819
https://blog.csdn.net/piaoslowly/article/details/81625687