安装
首先在docker下安装mq
docker run -d --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management
启动后,确保http://localhost:15672可以访问到。
如果无法访问,可以执行如下命令:
docker exec -it rabbit bash
rabbitmq-plugins enable rabbitmq_management
然后安装 Node.js 的 RabbitMQ 客户端库 amqplib。
npm install amqplib -S
config
config.js:
const config = {
url:'amqp://guest:guest@localhost',
exchange: 'test',
routingKeys: ['info', 'error'],
}
module.exports = config;
生产者
publisher.js
const amqp = require('amqplib/callback_api');
const config = require('./config');
// Run the producer and report any failure instead of leaving the promise unhandled.
publisher().catch((error) => {
  console.error(error);
  process.exitCode = 1;
});

/**
 * Producer: connects to the broker, declares the direct exchange and publishes
 * one message for each of the two configured routing keys, then closes the
 * connection.
 *
 * @returns {Promise<void>} Rejects if connecting, opening the channel or
 *   publishing fails; the connection is closed in either case.
 */
async function publisher() {
  const conn = await initConn(config.url);
  try {
    const channel = await createChannel(conn);
    // Non-durable direct exchange, declared with the same arguments as in customer.js.
    channel.assertExchange(config.exchange, 'direct', { durable: false });
    // Publish one message under routingKeys[1] and one under routingKeys[0].
    channel.publish(config.exchange, config.routingKeys[1], Buffer.from('这是个错误'));
    channel.publish(config.exchange, config.routingKeys[0], Buffer.from('这是一条普通消息'));
    // Wait one second before closing — presumably to let the buffered
    // publishes reach the broker before the socket is torn down.
    await sleep(1);
  } finally {
    // Always release the connection, even when channel setup or publishing threw.
    await conn.close();
  }
}
/**
 * Open an AMQP connection and expose the callback result as a promise.
 *
 * @param {string} url - Broker URL handed to `amqp.connect`.
 * @returns {Promise<object>} Resolves with the connection object, or rejects
 *   with the error reported by `amqp.connect`.
 */
function initConn(url) {
  return new Promise((resolve, reject) => {
    const onConnected = (error, conn) => {
      if (error) {
        reject(error);
        return;
      }
      resolve(conn);
    };
    amqp.connect(url, onConnected);
  });
}
/**
 * Open a channel on an existing connection and expose the callback result as
 * a promise.
 *
 * @param {object} conn - Connection exposing a callback-style `createChannel`.
 * @returns {Promise<object>} Resolves with the channel, or rejects with the
 *   error reported by `conn.createChannel`.
 */
function createChannel(conn) {
  return new Promise((resolve, reject) => {
    const onChannel = (error, channel) => {
      if (error) {
        reject(error);
        return;
      }
      resolve(channel);
    };
    conn.createChannel(onChannel);
  });
}
/**
 * Promise-based delay.
 *
 * @param {number} time - Delay in seconds.
 * @returns {Promise<undefined>} Resolves (with no value) once the timer fires.
 */
function sleep(time) {
  const delayMs = time * 1000;
  return new Promise((wake) => {
    setTimeout(wake, delayMs);
  });
}
消费者
customer.js
const amqp = require('amqplib/callback_api');
const config = require('./config');
//消费者
// Start both consumers; report startup failures instead of leaving the promises unhandled.
customer1().catch((error) => {
  console.error(error);
  process.exitCode = 1;
});
customer2().catch((error) => {
  console.error(error);
  process.exitCode = 1;
});

/**
 * Consumer 1: queue 'c1', bound to both routing keys (routingKeys[0] and
 * routingKeys[1]).
 *
 * @returns {Promise<void>} Resolves once the consumer has been set up.
 */
async function customer1() {
  await startConsumer('消费者1', 'c1', [config.routingKeys[0], config.routingKeys[1]]);
}

/**
 * Consumer 2: queue 'c2', bound to routingKeys[1] only.
 *
 * @returns {Promise<void>} Resolves once the consumer has been set up.
 */
async function customer2() {
  await startConsumer('消费者2', 'c2', [config.routingKeys[1]]);
}

/**
 * Open a dedicated connection and channel, declare the exchange and the named
 * queue, bind the queue to each routing key and start logging its messages.
 *
 * @param {string} label - Prefix printed before every received message.
 * @param {string} queueName - Queue to declare and consume from.
 * @param {string[]} routingKeys - Routing keys to bind the queue to.
 * @returns {Promise<void>} Rejects if connecting, opening the channel or
 *   declaring the queue fails.
 */
async function startConsumer(label, queueName, routingKeys) {
  const conn = await initConn(config.url);
  try {
    const channel = await createChannel(conn);
    // Non-durable direct exchange, declared with the same arguments as in publisher.js.
    channel.assertExchange(config.exchange, 'direct', { durable: false });
    const { queue } = await declareQueue(channel, queueName);
    for (const routingKey of routingKeys) {
      channel.bindQueue(queue, config.exchange, routingKey);
    }
    channel.consume(queue, (data) => {
      // amqplib invokes the callback with null when the broker cancels the consumer.
      if (data === null) {
        return;
      }
      console.log(label, data.content.toString());
    }, { noAck: true });
  } catch (error) {
    // Setup failed part-way: do not leave the connection open.
    conn.close();
    throw error;
  }
}

/**
 * Promise wrapper around the callback-style `channel.assertQueue`.
 * The callback API hands the declare result to a callback rather than
 * returning a promise, so awaiting the call directly neither waits for the
 * broker nor yields the `{ queue }` reply.
 *
 * @param {object} channel - Channel created from `amqplib/callback_api`.
 * @param {string} name - Queue name to declare.
 * @returns {Promise<object>} Resolves with the broker's reply (read here for
 *   its `queue` field); rejects with the error passed to the callback.
 */
function declareQueue(channel, name) {
  return new Promise((resolve, reject) => {
    channel.assertQueue(name, {}, (error, ok) => {
      if (error) {
        reject(error);
        return;
      }
      resolve(ok);
    });
  });
}
/**
 * Promise adapter for the callback-style `amqp.connect`.
 *
 * @param {string} url - Broker URL to connect to.
 * @returns {Promise<object>} Resolves with the connection; rejects with the
 *   error passed to the connect callback.
 */
function initConn(url) {
  // Build a node-style callback that settles the promise exactly once.
  const settleWith = (resolve, reject) => (error, conn) =>
    (error ? reject(error) : resolve(conn));
  return new Promise((resolve, reject) => {
    amqp.connect(url, settleWith(resolve, reject));
  });
}
/**
 * Promise adapter for the callback-style `conn.createChannel`.
 *
 * @param {object} conn - Connection whose `createChannel` takes a node-style callback.
 * @returns {Promise<object>} Resolves with the channel; rejects with the
 *   error passed to the callback.
 */
function createChannel(conn) {
  // Build a node-style callback that settles the promise exactly once.
  const settleWith = (resolve, reject) => (error, channel) =>
    (error ? reject(error) : resolve(channel));
  return new Promise((resolve, reject) => {
    conn.createChannel(settleWith(resolve, reject));
  });
}
运行结果
![](https://img.haomeiwen.com/i7455247/6e801185f445ca28.png)
网友评论