当先锋百科网

首页 1 2 3 4 5 6 7

首先下载并安装rabbitmq,与springboot整合:
pom依赖:

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

配置文件:

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
         http://www.springframework.org/schema/rabbit
         http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd">

    <!--配置connection-factory,指定连接rabbit server参数 -->
    <rabbit:connection-factory id="rabbitConnectionFactory"
                               username="${rabbit_username}"
                               password="${rabbit_password}"
                               host="${rabbit_host}"
                               port="${rabbit_port}" />

    <!--定义rabbit template用于数据的接收和发送 -->

    <rabbit:template id="amqpTemplate"  connection-factory="rabbitConnectionFactory"
                     exchange="exchangeTest" />

    <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
    <rabbit:admin connection-factory="rabbitConnectionFactory" />

    <!--定义queue -->
    <rabbit:queue name="queueTest" durable="true" auto-delete="false" exclusive="false" />

    <!-- 定义direct exchange,绑定queueTest -->
    <rabbit:direct-exchange name="exchangeTest" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="queueTest"  key="queueTestKey"> </rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>

</beans>

消息生产者:

@RequestMapping(value = "/send",method = RequestMethod.GET)
public void send(@RequestParam String message) {
    amqpTemplate.convertAndSend("queueTestKey",message);
}

消息消费者

@Component
public class testListener implements MessageListener {
    @Override
    @RabbitListener(queues = "queueTest")
    public void onMessage(Message message) {
        System.out.println("------消费者处理消息------");
        System.out.println("receive message" + message);
    }
}

其中Spring AMQP提供了一个发送和接收消息的操作模板类AmqpTemplate。 AmqpTemplate它定义包含了发送和接收消息等的一些基本的操作功能。RabbitTemplate是AmqpTemplate的一个实现。

消息的可靠性在四个步骤中都有所体现:
1.生产者到交换机
2.交换机到对应队列
3.消息在队列上存储
4.消费者成功消费

对应的解决:
1.事务/confirm机制
(confirm可以异步发送消息,不必等待消费者的返回)
2.mandatory/备份交换机
(mandatory需要添加ReturnListener的编程逻辑,生产者的代码将变得复杂化)
3.声明队列时将durable参数置为true
4.autoAck参数,分为了自动确认和手工确认

下文将介绍其中confirm机制和手动ack机制