美文网首页
rabbitMQ+thrift传送消息

rabbitMQ+thrift传送消息

作者: kangkangz4 | 来源:发表于2017-05-18 15:12 被阅读558次

Apache thrift是一个开源的RPC框架,看到跟protocol buffer一样也适用多种语言,就想着用rabbitMQ来处理thrift的消息,因为thrift字节比pb更少,可以适用于大量传送数据的场景,例如,每个消息10K,传送100条这样的消息,就是10*100=1M,但用thrift可以压缩40%,这里数据就少得很可观了。
这里就记录一下,我的使用过程:
首先定义一个Message.thrift

struct Message {
    1: i32 messageid,
    2: string message
}

然后用thrift生成相应的js文件

thrift --gen js:node Message.thrift

这里生成了一个Message_types.js文件
这就是我们用来序列化数据的文件
好了,我们再用node写一个rabbitMQ的发送文件
这里我们就叫send.js

var thrift = require('thrift');
var Message = require('./gen-nodejs/Message_types').Message;
var amqp = require('amqplib/callback_api');

var transport = new thrift.TBufferedTransport();
var protocol = new thrift.TBinaryProtocol(transport);

var AMPQ_URI = 'amqp://localhost:5672';

amqp.connect(AMPQ_URI, function(err, conn){
    conn.createChannel(function(err, ch){
        var q = 'hello';

        var buf = obj2buf({messageid:1,message:'{message:"1234"}'});

        ch.assertQueue(q, {durable: false});
        ch.sendToQueue(q, buf);
        console.log(" [x] Send Data Finish");
    });
    setTimeout(function(){
        conn.close();
        process.exit(0);
    }, 500);
})

/**
 * 将对象转换成buffer
 * @param  {[type]} obj [description]
 * @return {[type]}     [description]
 */
var obj2buf = function(obj){
    var message = new Message(obj);
    message.write(protocol);
    var outBuffers = transport.outBuffers;
    var outCount = transport.outCount;
    var result = new Buffer(outCount);
    var pos = 0;
    outBuffers.forEach(function(buf) {
      buf.copy(result, pos, 0);
      pos += buf.length;
    });
    return result;
}

其中obj2buf就是thrift将数据转换成buffer的方法,别问我怎么得来的,我也是从网上找的,但这个方法能用,自己亲测
我们再写一个receiver.js,这个方法是用来处理rabbitMQ消息的

var thrift = require('thrift');
var Message = require('./gen-nodejs/Message_types').Message;
var amqp = require('amqplib/callback_api');

var transport = new thrift.TBufferedTransport();
var protocol = new thrift.TBinaryProtocol(transport);

var AMQP_URI = 'amqp://localhost:5672';

amqp.connect(AMQP_URI, function(err, conn){
    conn.createChannel(function(err, ch){
        var q = 'hello';

        ch.assertQueue(q, {durable: false});
        console.log('[*] Waiting for message in %s. To exit press CTRL+C', q);
        ch.consume(q, function(msg){
            // console.log(msg);

            var message = buf2obj(msg.content);
            
            console.log(message);
            console.log('[x] Received Data Finish');
        }, {noAck: true});
    })
})

/**
 * 将buffer转换成对象
 * @param  {[type]} buffer [description]
 * @return {[type]}     [description]
 */
var buf2obj = function(buffer){
    var data = buffer;
    data.copy(transport.inBuf, transport.writeCursor, 0);
    transport.writeCursor += data.length;
    var message = new Message();
    message.read(protocol);
    return message;
}

这里的buf2obj就是将buffer转换成对象,
rabbitMQ里面传送消息都是以buffer类型。
好了,我们可以先跑

node receiver.js
屏幕快照 2017-05-18 下午3.10.07.png

再开一个窗口运行

node send.js
屏幕快照 2017-05-18 下午3.10.18.png

相关文章

  • rabbitMQ+thrift传送消息

    Apache thrift是一个开源的RPC框架,看到跟protocol buffer一样也适用多种语言,就想着用...

  • rabbitMQ+protocol buffers传送消息

    前面一篇文章写的是rabbitMQ+thrift传送消息,这里我也写一下用pb来传送消息的方法吧首先我们定义一个d...

  • 4.JMS的规范(二)

    5. 消息的持久化 什么是持久化消息? 保证消息只被传送一次和成功使用一次。在持久性消息传送至目标时,消息服务将其...

  • JMS消息确认和事务

    保证消息传送 保证消息传送有3个主要部分:消息自主性,存储并转发以及底层消息确认,下面具体看一下这些概念; 1.消...

  • 大数据开发:Kafka 异步发送消息可靠性策略

    Kafka作为分布式消息队列,对于消息传送有实时传送和异步发送的区别,而异步发送消息,就会涉及到一个很关键的问题,...

  • 2019-04-19

    数字基带传输 通信的目的是传送消息;信息的传输与交换 信息:表示和传送的对象(本质) 消息:信息的载体(形式)语言...

  • 消息队列概述

    一、什么是消息队列(WHAT ) 1.1 消息与消息队列 消息(Message)是指在应用间传送的数据。消息可以非...

  • RabbitMQ-1使用

    一. RabbitMQ使用 什么叫消息队列 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只...

  • H5跨文档消息传送

    由于同源策略的限制,JavaScript 跨域的问题,一直是一个颇为棘手的问题。但是,HTML5 提供了在网页文档...

  • 什么是消息中间件,为什么要使用?

    先理解 消息 ,消息队列 这两个概念,才能更好的理解什么是 消息中间件 消息(message):指在服务之间传送的...

网友评论

      本文标题:rabbitMQ+thrift传送消息

      本文链接:https://www.haomeiwen.com/subject/vhusxxtx.html