美文网首页
rabbitMQ+redis+socket.io实现消息推送

rabbitMQ+redis+socket.io实现消息推送

作者: web_柚子小记 | 来源:发表于2020-03-06 16:15 被阅读0次

    用pm2起一个nodejs服务,安装amqp-connection-manager,ioredis:
    1)建立amqp连接,创建channels,绑定queue,并消费:

    const amqp = require('amqp-connection-manager');
    const amqpConfig = {
      "amqp": {
        "cluster": [
          "amqp://用户名:密码@服务器ip:5672"
        ],
        "exchange": "****"
      }
    }
    amqp.connect(amqpConfig.amqp.cluster, { json: true })
            .createChannel({ setup: channelSetup })
            .waitForConnect().then(() => {
                console.log("MQ client is listening for messages.");
            }).catch(err => {
                console.log("MQ client setup failed.", err);
            });
    
    function channelSetup(ch) {
            return ch.assertExchange(Config.amqp.exchange, 'direct', { durable: true })
                .then(function (exchg) {
                    //消息类型
                    var qp_new_task = `push.queue.task.new`;
                    var qks = [
                        { "queue": qp_new_task, "key": qp_new_task, "durable": true }
                    ];
    
                    _.each(qks, function (qk) {
                        ch.assertQueue(qk.queue, { durable: qk.durable }).then(function (qok) {
                            return qok.queue;
                        }).then(function (queue) {
                            return ch.bindQueue(queue, exchg.exchange, qk.key).then(function () {
                                return queue;
                            });
                        }).then(function (queue) {
                            console.log("*** Message Queue [" + queue + "] ready");
    
                            return ch.consume(queue, function (msg) {
                                console.log('[receive message from amqp]')
                            }, { noAck: true });
                        });
                    });
                });
        }
    
    如果amqp连接成功,会在amqp的管理界面看到如下节点信息: image.png

    2)建立redis连接:

    const Redis = require('ioredis');
    function createRedisClient(){
    const config = {
    "redis": {
       "password": "***",
       "cluster": [
         {
           "host": "192.168.0.0",
           "port": 1234
         },
         {
           "host": "192.168.0.1",
           "port": 1234
         }
       ]
     }}
    //建立单个redis service连接:
    return new Redis({
         host: config.redis.cluster[0].host,
         port: config.redis.cluster[0].port,
         password: config.redis.password
    });
    //建立多个redis service连接:
    //new Redis.Cluster(config.redis.cluster, {
    //    redisOptions: {
    //      password: config.redis.password
    //    }
    //});
    }
    //向redis注册用户信息:
    createRedisClient().sadd(userId, userToken);
    

    3)socket.io的存在是桥梁作用,socket.io分为socket.io和socket.io-client。在web端实例化socket.io-client,用户登录后,获取到用户信息和token,推送给nodejs服务中的socket.io。socket.io接收到用户信息后在redis中注册。当amqp推送消息过来时,nodejs服务会消费到消息队列中的这条消息,然后找到对应的userId,利用socket.io推送给socket.io-client,从而实现聊天系统中的消息推送。

    实现效果: image.png image.png

    采坑记录:
    首先检查服务器上rabbitMQ和redis的端口是否正常访问
    -- 检查方法:linux和centos:telnet 10.20.66.37 5672
    macos: nc -vz -w 2 10.20.66.37 5672

    -- 如果rabbitMQ和redis使用了docker容器,需配置rabbitMQ和redis端口的对外映射。打开docker_compose,配置对外映射: image.png

    补充说明:

    exchange和消息类型的定义可以在amqp的管理界面中添加: image.png image.png

    --------补充更新-------
    当我们在聊天系统中发送消息时,首先会调用后台的api,后台api会将消息放到amqp的消息队列中,其次,每个连接amqp的nodejs服务相当于一个消费者,消息队列中一旦有消息会被消费者消费。假设当前有两个nodejs服务同时在消费,当amqp消息队列中有消息产生时,如果被其中一个nodejs服务消费掉,则另外一个nodejs将消费不到这条消息。换句话说,当有多个nodejs存在时,只会有一个nodejs接收到amqp发来的消息

    后续会整理个demo出来,更不动了。。

    相关文章

      网友评论

          本文标题:rabbitMQ+redis+socket.io实现消息推送

          本文链接:https://www.haomeiwen.com/subject/vkmirhtx.html