美文网首页
node下如何使用rabbitmq?

node下如何使用rabbitmq?

作者: 姜治宇 | 来源:发表于2022-06-15 16:59 被阅读0次

安装

首先在docker下安装mq

docker run -d --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management

启动后,确保http://localhost:15672可以访问到。
如果无法访问,可以执行如下命令:

docker exec -it rabbit bash
rabbitmq-plugins enable rabbitmq_management

然后安装 Node.js 的 RabbitMQ 客户端库 amqplib。

npm install amqplib -S

config

config.js:

// Shared settings read by both the publisher and the consumers.
// Broker connection URL handed to amqp.connect.
const url = 'amqp://guest:guest@localhost';
// Name of the exchange that messages are published to and queues are bound to.
const exchange = 'test';
// Routing keys used when publishing and binding (index 0: 'info', index 1: 'error').
const routingKeys = ['info', 'error'];

const config = { url, exchange, routingKeys };
module.exports = config;

生产者

publisher.js

const amqp = require('amqplib/callback_api');
const config = require('./config');

// Run the producer. A failure (connect, channel creation, publish, close) is
// logged and turned into a non-zero exit code instead of an unhandled rejection.
publisher().catch((error) => {
    console.error(error);
    process.exitCode = 1;
});
/**
 * Producer: connects to the broker, declares the direct exchange named in
 * `config.exchange`, publishes one message with the 'error' routing key and
 * one with the 'info' routing key, then closes the connection.
 *
 * The connection is always closed, even when channel creation or publishing
 * fails, and the function only settles once the close callback has fired.
 *
 * @returns {Promise<void>} Resolves after the connection has been closed.
 * @throws Rejects with the first error raised while connecting, creating the
 *         channel, publishing, or (if nothing failed earlier) closing.
 */
async function publisher() {

    const conn = await initConn(config.url);
    let published = false;

    try {
        const channel = await createChannel(conn);

        channel.assertExchange(config.exchange, 'direct', { durable: false });
        // Publish the messages
        channel.publish(config.exchange, config.routingKeys[1], Buffer.from('这是个错误'));

        channel.publish(config.exchange, config.routingKeys[0], Buffer.from('这是一条普通消息'));
        // Pause for one second before closing the connection.
        await sleep(1);
        published = true;
    } finally {
        try {
            // The callback API's close() does not return a promise, so wrap it
            // to actually wait for the close to complete.
            await new Promise((resolve, reject) => {
                conn.close((error) => (error ? reject(error) : resolve()));
            });
        } catch (closeError) {
            // Surface a close failure only when publishing succeeded, so it
            // never hides the original error.
            if (published) {
                throw closeError;
            }
        }
    }

}

/**
 * Promise adapter around the callback-style `amqp.connect`.
 *
 * @param {string} url - Broker URL passed straight to `amqp.connect`.
 * @returns {Promise<*>} Resolves with the connection handed to the callback,
 *          or rejects with the error it reports.
 */
function initConn(url) {
    const executor = (resolve, reject) => {
        amqp.connect(url, (error, conn) => (error ? reject(error) : resolve(conn)));
    };

    return new Promise(executor);
}
/**
 * Promise adapter around the callback-style `conn.createChannel`.
 *
 * @param {object} conn - Connection exposing `createChannel(callback)`.
 * @returns {Promise<*>} Resolves with the channel handed to the callback,
 *          or rejects with the error it reports.
 */
function createChannel(conn) {
    const executor = (resolve, reject) => {
        conn.createChannel((error, channel) => (error ? reject(error) : resolve(channel)));
    };

    return new Promise(executor);
}
/**
 * Returns a promise that resolves after a delay.
 *
 * @param {number} time - Delay in seconds (converted to milliseconds for setTimeout).
 * @returns {Promise<void>} Resolves once the timer fires.
 */
function sleep(time) {
    const delayMs = time * 1000;

    return new Promise((done) => {
        setTimeout(done, delayMs);
    });
}

消费者

customer.js

const amqp = require('amqplib/callback_api');
const config = require('./config');
// Consumers: start both, in order. A startup failure in either one is logged
// and turned into a non-zero exit code instead of an unhandled rejection.
[customer1, customer2].forEach((startConsumer) => {
    startConsumer().catch((error) => {
        console.error(error);
        process.exitCode = 1;
    });
});
/**
 * Consumer 1: opens its own connection and channel, declares the direct
 * exchange from `config.exchange`, declares queue 'c1', binds it to both
 * routing keys (`config.routingKeys[0]` and `[1]`), and logs every message
 * it receives. Messages are consumed with `noAck: true`.
 *
 * @returns {Promise<void>} Resolves once the consumer has been registered;
 *          the connection is left open so messages keep arriving.
 * @throws Rejects if connecting, creating the channel, or declaring the
 *         queue fails.
 */
async function customer1() {
    const conn = await initConn(config.url);
    const channel = await createChannel(conn);

    channel.assertExchange(config.exchange, 'direct', { durable: false });

    // Consumer 1's queue. The callback API does not return a promise from
    // assertQueue, so adapt it here to get the real declare-ok reply (and its
    // `queue` name) instead of awaiting a non-promise value.
    const { queue } = await new Promise((resolve, reject) => {
        channel.assertQueue('c1', {}, (error, ok) => (error ? reject(error) : resolve(ok)));
    });

    // c1 is bound with both routing keys
    channel.bindQueue(queue, config.exchange, config.routingKeys[0]);
    channel.bindQueue(queue, config.exchange, config.routingKeys[1]);

    // Start consuming
    channel.consume(queue, (data) => {
        // A null delivery means the broker cancelled this consumer; there is
        // no content to print.
        if (data === null) {
            return;
        }
        console.log('消费者1', data.content.toString());
    }, { noAck: true });

}
/**
 * Consumer 2: opens its own connection and channel, declares the direct
 * exchange from `config.exchange`, declares queue 'c2', binds it to
 * `config.routingKeys[1]` only, and logs every message it receives.
 * Messages are consumed with `noAck: true`.
 *
 * @returns {Promise<void>} Resolves once the consumer has been registered;
 *          the connection is left open so messages keep arriving.
 * @throws Rejects if connecting, creating the channel, or declaring the
 *         queue fails.
 */
async function customer2() {
    const conn = await initConn(config.url);
    const channel = await createChannel(conn);

    channel.assertExchange(config.exchange, 'direct', { durable: false });

    // Consumer 2's queue. The callback API does not return a promise from
    // assertQueue, so adapt it here to get the real declare-ok reply (and its
    // `queue` name) instead of awaiting a non-promise value.
    const { queue } = await new Promise((resolve, reject) => {
        channel.assertQueue('c2', {}, (error, ok) => (error ? reject(error) : resolve(ok)));
    });

    // c2 is bound with a single routing key
    channel.bindQueue(queue, config.exchange, config.routingKeys[1]);

    // Start consuming
    channel.consume(queue, (data) => {
        // A null delivery means the broker cancelled this consumer; there is
        // no content to print.
        if (data === null) {
            return;
        }
        console.log('消费者2', data.content.toString());
    }, { noAck: true });

}

/**
 * Wraps the callback-style `amqp.connect` in a promise.
 *
 * @param {string} url - Broker URL forwarded to `amqp.connect`.
 * @returns {Promise<*>} Settles with the connection, or with the error the
 *          connect callback reports.
 */
function initConn(url) {
    return new Promise((resolve, reject) => {
        const onConnected = (error, conn) => {
            if (error) {
                reject(error);
                return;
            }
            resolve(conn);
        };

        amqp.connect(url, onConnected);
    });
}
/**
 * Wraps the callback-style `conn.createChannel` in a promise.
 *
 * @param {object} conn - Connection exposing `createChannel(callback)`.
 * @returns {Promise<*>} Settles with the channel, or with the error the
 *          callback reports.
 */
function createChannel(conn) {
    return new Promise((resolve, reject) => {
        const onChannel = (error, channel) => {
            if (error) {
                reject(error);
                return;
            }
            resolve(channel);
        };

        conn.createChannel(onChannel);
    });
}

运行结果

1.png

相关文章

网友评论

      本文标题:node下如何使用rabbitmq?

      本文链接:https://www.haomeiwen.com/subject/jemxvrtx.html