当先锋百科网

首页 1 2 3 4 5 6 7

1. 目标

本次使用NodeJs创建两个应用,一个应用负责发送消息,一个应用负责接收消息。 案例来源于官方文档 本例使用 NodeJS v12.17.0(x64)

2. 创建NodeJS项目

cd g:\
mkdir node_rabbitmq
cd node_rabbitmq
npm init -y
npm i amqplib -S

NodeJS 使用 amqplib库 完成与RabbitMQ服务之间的通信

3. 发送方(生产者)

在根目录下创建 send.js

  1. 引入 amqplib/callback_api 库
  2. 准备连接信息,包含 RabbitMQ服务器IP,端口,用户名,密码,virtual host
  3. 建立连接
  4. 连接成功后创建 channel
  5. channel 创建成功后,使用channel声明 队列,然后发送数据。
  6. 发送的数据是存放在NodeJS的 Buffer对象中的。
const amqp = require('amqplib/callback_api')

const serverConfig = {
  protocol: 'amqp',
  hostname: '192.168.1.201',
  port: 5672,
  username: 'rabbitmq',
  password: 'rabbitmq',
  vhost: '/electron'
}
// 与服务器连接成功后,执行回调
amqp.connect(serverConfig, (e, connection) => {
  if (e) {
    console.log('create connection error:', e)
    throw e
  }
  // 创建channel,创建成功后,执行回调
  connection.createChannel((e, channel) => {
    if (e) {
      console.log('create channel error:', e)
      throw error1
    }
    // 队列名称
    var queue = 'mq_hello'
    // 要发送的消息
    var msg = 'Hello World!'
    // 声明队列
    channel.assertQueue(queue, {
      durable: false // 队列不进行持久化
    })
    // 发送消息
    channel.sendToQueue(queue, Buffer.from(msg))
    console.log(' [x] Sent %s,按 ctrl+c 退出程序', msg)
  })
})
process.openStdin()

4. 接收方(消费者)

在根目录下创建 receive.js

  1. 引入 amqplib/callback_api 库

  2. 准备连接信息,包含 RabbitMQ服务器IP,端口,用户名,密码,virtual host

  3. 建立连接

  4. 连接成功后创建 channel

  5. channel 创建成功后,使用channel声明 队列

  6. 调用channel 的consume 方法开始消费队列,这个方法会阻塞整个应用,因为它要持续监听服务器上的队列。

const amqp = require('amqplib/callback_api')

const serverConfig = {
  protocol: 'amqp',
  hostname: '192.168.1.201',
  port: 5672,
  username: 'rabbitmq',
  password: 'rabbitmq',
  vhost: '/electron'
}
// 与服务器连接成功后,执行回调
amqp.connect(serverConfig, (e, connection) => {
  if (e) {
    console.log('create connection error:', e)
    throw e
  }
  // 创建channel,创建成功后,执行回调
  connection.createChannel((e, channel) => {
    if (e) {
      console.log('create channel error:', e)
      throw error1
    }
    // 队列名称
    var queue = 'mq_hello'
    // 要发送的消息
    var msg = 'Hello World!'
    // 声明队列
    channel.assertQueue(queue, {
      durable: false // 队列不进行持久化
    })

    console.log(' [*] Waiting for messages in %s. To exit press CTRL+C', queue)
    // 开始消费队列,持续监听队列
    channel.consume(
      queue,
      function (msg) {
        console.log(' [x] Received %s', msg.content.toString())
      },
      {
        noAck: true
      }
    )
  })
})

5. 运行测试

# 打开一个控制台, 运行 receive.js 接收信息
cd g:\node_rabbitmq
node receive.js
# 打开另外一个控制台,运行 send.js 发送数据
cd g:\node_rabbitmq
node send.js
# 以下是输出
[x] Sent Hello World!,按 ctrl+c 退出程序

同时接收方控制台将会收到消息:

 [x] Received Hello World!

6. 使用基于promise 的API

上面接收方和发送方所使用的API都是基于 callback方式的,amqplib 库也提供了基于 promise 的调用方式

  • const amqp = require(‘amqplib/callback_api’) 这是基于 callback的api调用方式
  • const amqp = require(‘amqplib’) 这是基于promise的api 调用方式

创建连接,创建channel,声明 queue 在消息发送和消息接收上都有相同的代码,这里将它们提取出来放在一个公共的文件中:

rabbitmq.js

// 采用 promise 形式的API
const amqp = require('amqplib')

const serverConfig = {
  protocol: 'amqp',
  hostname: '192.168.1.201',
  port: 5672,
  username: 'qinye',
  password: 'qinye',
  vhost: '/electron'
}
let queueName = 'mq_hello'

let connPromise = amqp.connect(serverConfig)
let channel = new Promise((resolve, reject) => {
  connPromise.then(
    conn => {
      conn.createChannel().then(
        ch => {
          ch.assertQueue(queueName, {
            durable: false // 队列不进行持久化
          })
          resolve(ch)
        },
        err => {
          reject(err)
        }
      )
    },
    err => {
      reject(err)
    }
  )
})
// 导出channel 和队列名称
module.exports = {
  queueName,
  channel
}

send.js

const { channel, queueName } = require('./rabbitmq')

channel.then(ch => {
  var msg = 'Hello World!'
  ch.sendToQueue(queueName, Buffer.from(msg))
  console.log(' [x] Sent %s,按 ctrl+c 退出程序', msg)
})
process.openStdin()

receive.js

const { channel, queueName } = require('./rabbitmq')
channel.then(ch => {
  ch.consume(
    queueName,
    function (msg) {
      console.log(' [x] Received %s', msg.content.toString())
    },
    {
      noAck: true
    }
  )
})