当先锋百科网

首页 1 2 3 4 5 6 7

基础架构

1.入门案例

在这里插入图片描述

生产者(Producer)示例

//      1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
//      2.设置参数
        factory.setHost("172.25.196.240");//默认为localhost
        factory.setPort(5672);//端口默认也是5672
        factory.setVirtualHost("/zzk");//设置虚拟机
        factory.setUsername("zzk");//用户名 默认guest
        factory.setPassword("200079111zzk");//密码 默认guest
//      3.创建连接
        Connection connection = factory.newConnection();
//      4.创建Channel
        Channel channel = connection.createChannel();
//      5.创建队列Queue
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        参数:
            queue:队列名称
            durable:是否持久化,即mq重启之后是否还在
            exclusive:
                是否独占,即是否只有一个消费者监听
                当connection关闭时,是否删除队列
            autoDelete:当没有consumer时是否自动删除
            arguments:参数
        */
        channel.queueDeclare("hello",true,false,false,null);
//      6.发送消息
        /*
        basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException {
        参数:
            1.exchange:交换机名称.简单模式下交换机默认
            2.routingKey:路由名称
            3.props:配置信息
            4.body:发送消息数据
        */
        channel.basicPublish("","hello",null,"hello rabbitmq".getBytes());
//      7.释放资源
        channel.close();
        connection.close();

消费者(Consumer)示例

//      1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
//      2.设置参数
        factory.setHost("172.25.196.240");//默认为localhost
        factory.setPort(5672);//端口默认也是5672
        factory.setVirtualHost("/zzk");//设置虚拟机
        factory.setUsername("zzk");//用户名 默认guest
        factory.setPassword("200079111zzk");//密码 默认guest
//      3.创建连接
        Connection connection = factory.newConnection();
//      4.创建Channel
        Channel channel = connection.createChannel();
//      5.创建队列Queue
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        参数:
            queue:队列名称
            durable:是否持久化,即mq重启之后是否还在
            exclusive:
                是否独占,即是否只有一个消费者监听
                当connection关闭时,是否删除队列
            autoDelete:当没有consumer时是否自动删除
            arguments:参数
        */
        channel.queueDeclare("hello",true,false,false,null);
//      6.接收消息
        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        参数:
            1.queue: 队列名称
            2.autoAck: 是否自动确认
            3.callback: 回调对象
        */
        Consumer consumer = new DefaultConsumer(channel){
            /*回调方法
            1.consumerTag: 标识
            2.envelope: 获取一些信息,交换机
            3.properties: 配置信息
            4.body: 数据
            */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                super.handleDelivery(consumerTag, envelope, properties, body);
                System.out.println(consumerTag);
                System.out.println(envelope.getExchange());
                System.out.println(envelope.getRoutingKey());
                System.out.println(properties);
                System.out.println(new String(body));
            }
        };
        channel.basicConsume("hello",true,consumer);

2.Work queues

在这里插入图片描述
该模式下,在一个队列中多个消费者之间对于同一个消息的关系是竞争的关系。
对于任务过重或者任务较多的情况使用工作队列可以提高任务处理的速度。例如:短信服务部署多个,只需要有一个节点成功即可。

3.Publish/Subscribe

在这里插入图片描述
PubSub模式通过交换机(exchange)对多个队列同时发送信息,达到一条生产者消息多个消费者共享的效果

生产者代码

//      1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
//      2.设置参数
        factory.setPort(5672);//端口默认也是5672
        factory.setVirtualHost("/zzk");//设置虚拟机
        factory.setUsername("zzk");//用户名 默认guest
        factory.setPassword("200079111zzk");//密码 默认guest
//      3.创建连接
        Connection connection = factory.newConnection();
//      4.创建Channel
        Channel channel = connection.createChannel();
//      5.创建交换机
        String exchangeName = "test_fanout";
        //广播模式
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
//      6.创建队列
        String queueName = "test_fanout_queue";
        String queueName2 = "test_fanout_queue2";
        channel.queueDeclare(queueName,true,false,false,null);
        channel.queueDeclare(queueName2,true,false,false,null);
//      7.绑定队列和交换机
        channel.queueBind(queueName,exchangeName,"");
        channel.queueBind(queueName2,exchangeName,"");

//      8.发送消息
        String body = "Hello PubSub";
        channel.basicPublish(exchangeName,"",null,body.getBytes());
//      9.释放资源
        channel.close();
        connection.close();

4.Routing

在这里插入图片描述

Routing模式与Publish/Subscribe模式的区别是Routing模式下交换机与队列的绑定不再是任意绑定,而是指定一个RoutingKey(路由key)
生产者将消息向交换机(Exchange)发送消息时,携带了RoutingKey,交换机(Exchange)根据RoutingKey发送给RoutingKey相同的队列

        //      1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
//      2.设置参数
        factory.setPort(5672);//端口默认也是5672
        factory.setVirtualHost("/zzk");//设置虚拟机
        factory.setUsername("zzk");//用户名 默认guest
        factory.setPassword("200079111zzk");//密码 默认guest
//      3.创建连接
        Connection connection = factory.newConnection();
//      4.创建Channel
        Channel channel = connection.createChannel();
//      5.创建交换机
        String exchangeName = "test_direct";
        //广播模式
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
//      6.创建队列
        String queueName = "test_direct_queue";
        String queueName2 = "test_direct_queue2";
        channel.queueDeclare(queueName,true,false,false,null);
        channel.queueDeclare(queueName2,true,false,false,null);
//      7.绑定队列和交换机
        channel.queueBind(queueName,exchangeName,"error");
        channel.queueBind(queueName2,exchangeName,"info");
        channel.queueBind(queueName2,exchangeName,"error");

//      8.发送消息
        String body = "Hello Routing";
        channel.basicPublish(exchangeName,"error",null,body.getBytes());
        channel.basicPublish(exchangeName,"info",null,body.getBytes());
//      9.释放资源
        channel.close();
        connection.close();

5.Topics

在这里插入图片描述
Topic模式可以实现Pub/Sub发布与订阅模式和Routing路由模式的功能,Topic在配置routingKey时可以使用通配符,更加灵活

//      1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
//      2.设置参数
        factory.setPort(5672);//端口默认也是5672
        factory.setVirtualHost("/zzk");//设置虚拟机
        factory.setUsername("zzk");//用户名 默认guest
        factory.setPassword("200079111zzk");//密码 默认guest
//      3.创建连接
        Connection connection = factory.newConnection();
//      4.创建Channel
        Channel channel = connection.createChannel();
//      5.创建交换机
        String exchangeName = "test_topic";
        //广播模式
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
//      6.创建队列
        String queueName = "test_topic_queue";
        String queueName2 = "test_topic_queue2";
        channel.queueDeclare(queueName,true,false,false,null);
        channel.queueDeclare(queueName2,true,false,false,null);
//      7.绑定队列和交换机
//      例:routingKey 系统的名称.日志级别
//      需求:所有error级别的日志都会走队列1,所有order系统的日志都会走队列2
        channel.queueBind(queueName,exchangeName,"#.error");
        channel.queueBind(queueName2,exchangeName,"order.*");

//      8.发送消息
        String body = "Hello Topic";
        channel.basicPublish(exchangeName,"order.info",null,body.getBytes());
        channel.basicPublish(exchangeName,"order.error",null,body.getBytes());
//      9.释放资源
        channel.close();
        connection.close();

SpringBoot整合RabbitMQ

发送方

创建交换机和队列

@Configuration
public class RabbitConfig {

    public static final String Exchange_name = "boot_topic_exchange";
    public static final String Queue_name = "boot_queue2";

    //1.交换机
    @Bean("bootExchange")
    public Exchange bootExchange(){
        return ExchangeBuilder.topicExchange(Exchange_name).durable(true).build();
    }

//  2. Queue 队列
    @Bean("bootQueue")
    public Queue bootQueue(){
        return QueueBuilder.durable(Queue_name).build();
    }

//  3.交换机和队列绑定
    @Bean
    public Binding binding(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("#.error").noargs();
    }
}

发送消息

@SpringBootTest
public class ProducerTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test(){
        rabbitTemplate.convertAndSend(RabbitConfig.Exchange_name,"order.error","SpringBoot-Rabbit_Topics_Test");
    }
}

接收方

@Component
public class RabbitMQListener {

    @RabbitListener(queues = "boot_queue")
    public void ListenerQueue(Message message){
        System.out.println(message);
    }
}