当先锋百科网

首页 1 2 3 4 5 6 7

安装及配置RabbitMQ

使用docker安装:
//拉取镜像

docker pull rabbitmq:3.6.5-management

//创建RabbitMQ容器

docker run -id --name=rabbitmq --restart=always -p 5670:5670 -p 15670:15670 -p 5674:5674 -p 15674:15674 --publish 5671:5671 --publish 5672:5672 --publish 4369:4369 --publish 25672:25672 --publish 15671:15671 --publish 15672:15672 rabbitmq:3.6.5-management

//进入容器内部

docker exec -it rabbitmq /bin/bash

//开启相关的插件

rabbitmq-plugins enable rabbitmq_stomp rabbitmq_web_stomp rabbitmq_web_stomp_examples

//退出容器

exit

//重启容器

docker restart rabbitmq

1. MQ是什么?作用?

  1. 解耦,降低系统复杂度
  2. 异步,提高响应速度
  3. 削峰填谷 , 高并发访问

2. 什么情况下,系统采用MQ

  1. 不需要返回数据,增删改
  2. 允许短暂的不一致
  3. 大型的高并发,大数据量的应用

RabbitMQ的工作模式:

1.简单模式:
没有交换机,使用""默认交换机,只有一个消费者.一个生产者发送消息,被一个消费者消费.

2.工作队列模式:
在这里插入图片描述
一个生产者发送消息,多个消费者轮询接受(一人一次)

3.pub/sub订阅发布模式
在这里插入图片描述
群发的效果,交换机定义fanout(),所有绑定在fanout类型的交换机上的队列,都会收到消息

4.Route 路由模式
绑定:指定队列名称,交换机名称,routing key(匹配的规则)
一个交换机和队列,可以绑定多个routing key
生产者:定义交换机,指定名称,指定类型为 Direct 类型。发送消息(交换机名称,“routing key”,消息内容)
特点: 发送的消息了,根据规则routing key,可以选择性的发送到某些队列,或者是全部队列。

5.Topice 通配符模式
在这里插入图片描述
绑定: 可以使用通配符指定routing key,*表示一个任意字符串,#表示0个或者多个字符串
生产者:定义交换机,指定名称,指定类型为 TOPIC 类型。发送消息(交换机名称,“routing key”,消息内容) 注意: 发消息的时候,routing key要固定,根据发消息的时候routing key去匹配绑定的时候通配符。
配置类可以用通配符
特点:功能上和路由模式一样的,简化了绑定的流程,并且更加的灵活。

在这里插入图片描述

使用RabbitMQ

1.生产者导入POM坐标

 <!--springboot工程需要继承的父工程-->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.8.RELEASE</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>

2.创建springboot启动类

@SpringBootApplication
public class ProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class,args);
    }
}

3.创建RabbitMQ配置类

@Configuration
public class RabbitMQConfig {

    public static final String EXCHANGE_NAME ="boot_topic_exchange";
    public static final String QUEUE_NAME ="boot_queue1";
    //1.配置交换机
    @Bean("bootExchange")
    public Exchange bootExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }
    //2.配置Queue队列
    @Bean("bootQueue")
    public Queue bootQueue(){
        return QueueBuilder.durable(QUEUE_NAME).build();
    }
    //3.队列和交换机绑定关系 binding
    /*  1.知道哪个队列
        2.知道哪个交换机
        3.routing key
    * */
    @Bean
    public Binding bindingExchange(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
    }

4.配置文件yml

spring:
  rabbitmq:
    host: 192.168.200.129 # ip
    port: 5672
    username: guest
    password: guest
    virtual-host: /

5.测试类生产方发送信息

@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {
    //1.注入RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSend(){
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.haha","boot..mq.hello~~~~~");
    }
}

消费方

1.导入POM坐标

 <!--springboot工程需要继承的父工程-->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.8.RELEASE</version>
    </parent>
    <dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    </dependencies>

2.yml配置文件同上
3.编写启动类
4.消费方消费生产方

@Component
public class RabbitmqConsumer {
    @RabbitListener(queues = "boot_queue1")
    public void ListenerQueue(Message message){
        System.out.println(message);
    }
}

RabbitMQ 高级

如何保证消息的高可靠性传输?

1.持久化

• exchange(交换机)要持久化

• queue(队列)要持久化

• message要持久化

2.生产方确认Confirm、return 回退模式

​Confirm: 确保生产者能够将消息发送到交换机。通过ConfirmCallBack,其中的方法得知消息是否到达了交换机。

return :当消息从交换机发送到队列中的时候,失败了有回调。通过ReturnCallBack,只有当发送失败的时候才会调用。
在这里插入图片描述

3.消费方确认Ack

消费者接收消息的时候,能够在处理成功的时候,手动签收消息。如果处理失败,抛出异常等情况,可以拒签消息,消息会再次发送给消费者。

4.Broker高可用(集群):

搭建rabbitMQ集群

  1. 安装多个rabbitMQ节点(服务器)
  2. 配置镜像节点,同步所有节点之间的信息
  3. 安装HAProxy,配置所有节点的负载均衡,使用MQ连接HAProxy
如何进行消费端限流?为什么要限流?
  1. 开启ack消息确认机制
  2. 配置消费端限流,prefetch=“同时处理的消息个数”
  3. 如果消费端不进行消息的确认,就不在接收消息。
    默认情况下消息的消费者会一次性将MQ中累积的大量的消息全部拉取到自己的服务,导致服务在短时间内会处理大量的业务,可能会导致系统服务的崩溃。 所以消费端限流是非常有必要的。
消息成为死信队列的三种情况

1.队列消息长度到达限制
2.消费者拒接消费信息,并且不重回队列
3.原队列存在消息过期的设置,消息到达超时时间而未被消费
在这里插入图片描述

延迟队列

即消息进入队列后不会立即被消费,只用到达指定时间后才会被消费
提出需求:

  1. 下单后,30分钟未支付,取消订单,回滚库存。
  2. 新用户注册成功7天后,发送短信问候。
    TTL+死信队列组合实现
    在这里插入图片描述
    死信交换机,过期时间,路由key
    在这里插入图片描述

RabbitMQ应用问题

消息可靠性保障

如何能够保证消息的 100% 发送成功?

在这里插入图片描述
消息业务方面的可靠性?
消息补偿机制
保证业务逻辑正常完成,除了保证能够正常的发送,接收消息以外,还能够保证基于消息队列的业务成功。
需求:用户下单,订单系统生成订单,发送消息,通知库存系统。库存系统接收消息,扣减库存。

实现:

  1. 订单系统完成本地订单生成的操作之后,发送消息(订单ID,商品ID,商品数量)

  2. 库存系统接收该消息,操作数据库,扣减库存。

  3. 库存系统完成扣减库存,立刻向 消息检查系统 发送已经完成扣减操作。消息检查系统就会将当前消息存入检测系统的数据库。

  4. 订单系统 延迟一段时间(预估库存系统能够完成操作的时间),向消息检查系统,发送一个和刚才扣减库存相同的消息。

  5. 消息检查系统 再次接收到 订单系统 延迟消息之后,去检测系统的数据库检查是否已经存在了该消息的处理结果。

    1. 如果存在当前的消息,说明操作正常。结束。
      2. 如果没有当前的消息,说明操作失败。再次调用订单服务,再次发送调用库存的消息。

    2. 如果库存系统和延迟消息都失败的情况下,检测系统的数据库就没有消息。

    设置定时任务: 订单数据库 和 检测数据库 的数据是否一致。再次调用订单服务,再次发送调用库存的消息。

    1. 如果存在一致不成功的信息,记录一个重发次数,如果达到了3次。通知管理员,人工干预。
      问题: 如果消息检查系统消息出现了问题,就会导致消息不停的重发,但是业务已经执行成功了?怎么处理?
通过乐观锁,来保障消息的幂等性

扣减库存
库存表
id  goods  count   version
1    手机    10      2


消息内容
orderid=101, goodsid=1,count=2,version=2 (要扣减前的库存表中的version一致)

库存系统,业务操作
update 库存表 set count = count-2,version=version+1 where id =1 and version=2;

库存表
id  goods  count   version
1    手机    8      3

update 库存表 set count = count-2,version=version+1 where id =1 and version=2;
mybatis:
update 库存表 set count = count-?,version=version+1 where id =? and version=?;

思考题: 增删改。 改,删除,新增如何保证幂等性?

新增的幂等性,就是不要使用数据库的主键自增。UUID,雪花算法。