美文网首页
linux下使用hiredis异步API实现sub/pub消息订

linux下使用hiredis异步API实现sub/pub消息订

作者: 安妮小子被占用了 | 来源:发表于2017-10-20 18:49 被阅读0次

    转载自:http://blog.csdn.net/chenzba/article/details/51224715

    最近使用redis的c接口——hiredis,使客户端与redis服务器通信,实现消息订阅和发布(PUB/SUB)的功能,我把遇到的一些问题和解决方法列出来供大家学习。

    废话不多说,先贴代码。

    redis_publisher.h

    [cpp]view plaincopy

    /*************************************************************************

    > File Name: redis_publisher.h

    > Author: chenzengba

    > Mail: chenzengba@gmail.com

    > Created Time: Sat 23 Apr 2016 10:15:09 PM CST

    > Description: 封装hiredis,实现消息发布给redis功能

    ************************************************************************/

    #ifndef REDIS_PUBLISHER_H

    #define REDIS_PUBLISHER_H

    #include 

    #include 

    #include 

    #include 

    #include 

    #include 

    #include 

    #include 

    #include 

    classCRedisPublisher

    {

    public:

    CRedisPublisher();

    ~CRedisPublisher();

    boolinit();

    booluninit();

    boolconnect();

    booldisconnect();

    boolpublish(conststd::string &channel_name,

    conststd::string &message);

    private:

    // 下面三个回调函数供redis服务调用

    // 连接回调

    staticvoidconnect_callback(constredisAsyncContext *redis_context,

    intstatus);

    // 断开连接的回调

    staticvoiddisconnect_callback(constredisAsyncContext *redis_context,

    intstatus);

    // 执行命令回调

    staticvoidcommand_callback(redisAsyncContext *redis_context,

    void*reply,void*privdata);

    // 事件分发线程函数

    staticvoid*event_thread(void*data);

    void*event_proc();

    private:

    // libevent事件对象

    event_base *_event_base;

    // 事件线程ID

    pthread_t _event_thread;

    // 事件线程的信号量

    sem_t _event_sem;

    // hiredis异步对象

    redisAsyncContext *_redis_context;

    };

    #endif

    redis_publisher.cpp

    [cpp]view plaincopy

    /*************************************************************************

    > File Name: redis_publisher.cpp

    > Author: chenzengba

    > Mail: chenzengba@gmail.com

    > Created Time: Sat 23 Apr 2016 10:15:09 PM CST

    > Description:

    ************************************************************************/

    #include 

    #include 

    #include 

    #include "redis_publisher.h"

    CRedisPublisher::CRedisPublisher():_event_base(0), _event_thread(0),

    _redis_context(0)

    {

    }

    CRedisPublisher::~CRedisPublisher()

    {

    }

    boolCRedisPublisher::init()

    {

    // initialize the event

    _event_base = event_base_new();// 创建libevent对象

    if(NULL == _event_base)

    {

    printf(": Create redis event failed.\n");

    returnfalse;

    }

    memset(&_event_sem, 0,sizeof(_event_sem));

    intret = sem_init(&_event_sem, 0, 0);

    if(ret != 0)

    {

    printf(": Init sem failed.\n");

    returnfalse;

    }

    returntrue;

    }

    boolCRedisPublisher::uninit()

    {

    _event_base = NULL;

    sem_destroy(&_event_sem);

    returntrue;

    }

    boolCRedisPublisher::connect()

    {

    // connect redis

    _redis_context = redisAsyncConnect("127.0.0.1", 6379);// 异步连接到redis服务器上,使用默认端口

    if(NULL == _redis_context)

    {

    printf(": Connect redis failed.\n");

    returnfalse;

    }

    if(_redis_context->err)

    {

    printf(": Connect redis error: %d, %s\n",

    _redis_context->err, _redis_context->errstr);// 输出错误信息

    returnfalse;

    }

    // attach the event

    redisLibeventAttach(_redis_context, _event_base);// 将事件绑定到redis context上,使设置给redis的回调跟事件关联

    // 创建事件处理线程

    intret = pthread_create(&_event_thread, 0, &CRedisPublisher::event_thread,this);

    if(ret != 0)

    {

    printf(": create event thread failed.\n");

    disconnect();

    returnfalse;

    }

    // 设置连接回调,当异步调用连接后,服务器处理连接请求结束后调用,通知调用者连接的状态

    redisAsyncSetConnectCallback(_redis_context,

    &CRedisPublisher::connect_callback);

    // 设置断开连接回调,当服务器断开连接后,通知调用者连接断开,调用者可以利用这个函数实现重连

    redisAsyncSetDisconnectCallback(_redis_context,

    &CRedisPublisher::disconnect_callback);

    // 启动事件线程

    sem_post(&_event_sem);

    returntrue;

    }

    boolCRedisPublisher::disconnect()

    {

    if(_redis_context)

    {

    redisAsyncDisconnect(_redis_context);

    redisAsyncFree(_redis_context);

    _redis_context = NULL;

    }

    returntrue;

    }

    boolCRedisPublisher::publish(conststd::string &channel_name,

    conststd::string &message)

    {

    intret = redisAsyncCommand(_redis_context,

    &CRedisPublisher::command_callback,this,"PUBLISH %s %s",

    channel_name.c_str(), message.c_str());

    if(REDIS_ERR == ret)

    {

    printf("Publish command failed: %d\n", ret);

    returnfalse;

    }

    returntrue;

    }

    voidCRedisPublisher::connect_callback(constredisAsyncContext *redis_context,

    intstatus)

    {

    if(status != REDIS_OK)

    {

    printf(": Error: %s\n", redis_context->errstr);

    }

    else

    {

    printf(": Redis connected!\n");

    }

    }

    voidCRedisPublisher::disconnect_callback(

    constredisAsyncContext *redis_context,intstatus)

    {

    if(status != REDIS_OK)

    {

    // 这里异常退出,可以尝试重连

    printf(": Error: %s\n", redis_context->errstr);

    }

    }

    // 消息接收回调函数

    voidCRedisPublisher::command_callback(redisAsyncContext *redis_context,

    void*reply,void*privdata)

    {

    printf("command callback.\n");

    // 这里不执行任何操作

    }

    void*CRedisPublisher::event_thread(void*data)

    {

    if(NULL == data)

    {

    printf(": Error!\n");

    assert(false);

    returnNULL;

    }

    CRedisPublisher *self_this =reinterpret_cast(data);

    returnself_this->event_proc();

    }

    void*CRedisPublisher::event_proc()

    {

    sem_wait(&_event_sem);

    // 开启事件分发,event_base_dispatch会阻塞

    event_base_dispatch(_event_base);

    returnNULL;

    }

    redis_subscriber.h

    [cpp]view plaincopy

    /*************************************************************************

    > File Name: redis_subscriber.h

    > Author: chenzengba

    > Mail: chenzengba@gmail.com

    > Created Time: Sat 23 Apr 2016 10:15:09 PM CST

    > Description: 封装hiredis,实现消息订阅redis功能

    ************************************************************************/

    #ifndef REDIS_SUBSCRIBER_H

    #define REDIS_SUBSCRIBER_H

    #include 

    #include 

    #include 

    #include 

    #include 

    #include 

    #include 

    #include 

    #include 

    classCRedisSubscriber

    {

    public:

    typedef  std::tr1::function <void(const char*,const char*,int)> NotifyMessageFn;// 回调函数对象类型,当接收到消息后调用回调把消息发送出去

    CRedisSubscriber();

    ~CRedisSubscriber();

    boolinit(const NotifyMessageFn &fn);// 传入回调对象

    bool uninit();

    bool connect();

    bool disconnect();

    // 可以多次调用,订阅多个频道

    bool subscribe(conststd::string &channel_name);

    private:

    // 下面三个回调函数供redis服务调用

    // 连接回调

    static void connect_callback(constredisAsyncContext *redis_context,

    intstatus);

    // 断开连接的回调

    staticvoiddisconnect_callback(constredisAsyncContext *redis_context,

    intstatus);

    // 执行命令回调

    staticvoidcommand_callback(redisAsyncContext *redis_context,

    void*reply,void*privdata);

    // 事件分发线程函数

    staticvoid*event_thread(void*data);

    void*event_proc();

    private:

    // libevent事件对象

    event_base *_event_base;

    // 事件线程ID

    pthread_t _event_thread;

    // 事件线程的信号量

    sem_t _event_sem;

    // hiredis异步对象

    redisAsyncContext *_redis_context;

    // 通知外层的回调函数对象

    NotifyMessageFn _notify_message_fn;

    };

    #endif

    redis_subscriber.cpp:

    [cpp]view plaincopy

    /*************************************************************************

    > File Name: redis_subscriber.cpp

    > Author: chenzengba

    > Mail: chenzengba@gmail.com

    > Created Time: Sat 23 Apr 2016 10:15:09 PM CST

    > Description:

    ************************************************************************/

    #include 

    #include 

    #include 

    #include "redis_subscriber.h"

    CRedisSubscriber::CRedisSubscriber():_event_base(0), _event_thread(0),

    _redis_context(0)

    {

    }

    CRedisSubscriber::~CRedisSubscriber()

    {

    }

    boolCRedisSubscriber::init(constNotifyMessageFn &fn)

    {

    // initialize the event

    _notify_message_fn = fn;

    _event_base = event_base_new();// 创建libevent对象

    if(NULL == _event_base)

    {

    printf(": Create redis event failed.\n");

    returnfalse;

    }

    memset(&_event_sem, 0,sizeof(_event_sem));

    intret = sem_init(&_event_sem, 0, 0);

    if(ret != 0)

    {

    printf(": Init sem failed.\n");

    returnfalse;

    }

    returntrue;

    }

    boolCRedisSubscriber::uninit()

    {

    _event_base = NULL;

    sem_destroy(&_event_sem);

    returntrue;

    }

    boolCRedisSubscriber::connect()

    {

    // connect redis

    _redis_context = redisAsyncConnect("127.0.0.1", 6379);// 异步连接到redis服务器上,使用默认端口

    if(NULL == _redis_context)

    {

    printf(": Connect redis failed.\n");

    returnfalse;

    }

    if(_redis_context->err)

    {

    printf(": Connect redis error: %d, %s\n",

    _redis_context->err, _redis_context->errstr);// 输出错误信息

    returnfalse;

    }

    // attach the event

    redisLibeventAttach(_redis_context, _event_base);// 将事件绑定到redis context上,使设置给redis的回调跟事件关联

    // 创建事件处理线程

    intret = pthread_create(&_event_thread, 0, &CRedisSubscriber::event_thread,this);

    if(ret != 0)

    {

    printf(": create event thread failed.\n");

    disconnect();

    returnfalse;

    }

    // 设置连接回调,当异步调用连接后,服务器处理连接请求结束后调用,通知调用者连接的状态

    redisAsyncSetConnectCallback(_redis_context,

    &CRedisSubscriber::connect_callback);

    // 设置断开连接回调,当服务器断开连接后,通知调用者连接断开,调用者可以利用这个函数实现重连

    redisAsyncSetDisconnectCallback(_redis_context,

    &CRedisSubscriber::disconnect_callback);

    // 启动事件线程

    sem_post(&_event_sem);

    returntrue;

    }

    boolCRedisSubscriber::disconnect()

    {

    if(_redis_context)

    {

    redisAsyncDisconnect(_redis_context);

    redisAsyncFree(_redis_context);

    _redis_context = NULL;

    }

    returntrue;

    }

    boolCRedisSubscriber::subscribe(conststd::string &channel_name)

    {

    intret = redisAsyncCommand(_redis_context,

    &CRedisSubscriber::command_callback,this,"SUBSCRIBE %s",

    channel_name.c_str());

    if(REDIS_ERR == ret)

    {

    printf("Subscribe command failed: %d\n", ret);

    returnfalse;

    }

    printf(": Subscribe success: %s\n", channel_name.c_str());

    returntrue;

    }

    voidCRedisSubscriber::connect_callback(constredisAsyncContext *redis_context,

    intstatus)

    {

    if(status != REDIS_OK)

    {

    printf(": Error: %s\n", redis_context->errstr);

    }

    else

    {

    printf(": Redis connected!");

    }

    }

    voidCRedisSubscriber::disconnect_callback(

    constredisAsyncContext *redis_context,intstatus)

    {

    if(status != REDIS_OK)

    {

    // 这里异常退出,可以尝试重连

    printf(": Error: %s\n", redis_context->errstr);

    }

    }

    // 消息接收回调函数

    voidCRedisSubscriber::command_callback(redisAsyncContext *redis_context,

    void*reply,void*privdata)

    {

    if(NULL == reply || NULL == privdata) {

    return;

    }

    // 静态函数中,要使用类的成员变量,把当前的this指针传进来,用this指针间接访问

    CRedisSubscriber *self_this =reinterpret_cast(privdata);

    redisReply *redis_reply =reinterpret_cast(reply);

    // 订阅接收到的消息是一个带三元素的数组

    if(redis_reply->type == REDIS_REPLY_ARRAY &&

    redis_reply->elements == 3)

    {

    printf(": Recieve message:%s:%d:%s:%d:%s:%d\n",

    redis_reply->element[0]->str, redis_reply->element[0]->len,

    redis_reply->element[1]->str, redis_reply->element[1]->len,

    redis_reply->element[2]->str, redis_reply->element[2]->len);

    // 调用函数对象把消息通知给外层

    self_this->_notify_message_fn(redis_reply->element[1]->str,

    redis_reply->element[2]->str, redis_reply->element[2]->len);

    }

    }

    void*CRedisSubscriber::event_thread(void*data)

    {

    if(NULL == data)

    {

    printf(": Error!\n");

    assert(false);

    returnNULL;

    }

    CRedisSubscriber *self_this =reinterpret_cast(data);

    returnself_this->event_proc();

    }

    void*CRedisSubscriber::event_proc()

    {

    sem_wait(&_event_sem);

    // 开启事件分发,event_base_dispatch会阻塞

    event_base_dispatch(_event_base);

    returnNULL;

    }

    问题1:hiredis官网没有异步接口的实现例子。

    hiredis提供了几个异步通信的API,一开始根据API名字的理解,我们实现了跟redis服务器建立连接、订阅和发布的功能,可在实际使用的时候,程序并没有像我们预想的那样,除了能够建立连接外,任何事情都没发生。

    网上查了很多资料,原来hiredis的异步实现是通过事件来分发redis发送过来的消息的,hiredis可以使用libae、libev、libuv和libevent中的任何一个实现事件的分发,网上的资料提示使用libae、libev和libuv可能发生其他问题,这里为了方便就选用libevent。hireds官网并没有对libevent做任何介绍,也没用说明使用异步机制需要引入事件的接口,所以一开始走了很多弯路。

    关于libevent的使用这里就不再赘述,详情可以见libevent官网。

    libevent官网:http://libevent.org/

    libevent api文档:https://www.monkey.org/~provos/libevent/doxygen-2.0.1/include_2event2_2event_8h.html#6e9827de8c3014417b11b48f2fe688ae

    CRedisPublisher和CRedisSubscriber的初始化过程:

    初始化事件处理,并获得事件处理的实例:

    [cpp]view plaincopy

    _event_base = event_base_new();

    在获得redisAsyncContext *之后,调用

    [cpp]view plaincopy

    redisLibeventAttach(_redis_context, _event_base);

    这样就将事件处理和redis关联起来,最后在另一个线程调用

    [cpp]view plaincopy

    event_base_dispatch(_event_base);

    启动事件的分发,这是一个阻塞函数,因此,创建了一个新的线程处理事件分发,值得注意的是,这里用信号灯_event_sem控制线程的启动,意在程序调用

    [cpp]view plaincopy

    redisAsyncSetConnectCallback(_redis_context,

    &CRedisSubscriber::connect_callback);

    redisAsyncSetDisconnectCallback(_redis_context,

    &CRedisSubscriber::disconnect_callback);

    之后,能够完全捕捉到这两个回调。

    问题2 奇特的‘ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context’错误

    有些人会觉得这两个类设计有点冗余,我们发现CRedisPublisher和CRedisSubscriber很多逻辑是一样的,为什么不把他们整合到一起成一个类,既能够发布消息也能够订阅消息。其实一开始我就是这么干的,在使用的时候发现,用同个redisAsynContex *对象进行消息订阅和发布,与redis服务连接会自动断开,disconnect_callback回调会被调用,并且返回奇怪的错误:ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context,因此,不能使用同个redisAsyncContext *对象实现发布和订阅。这里为了减少设计的复杂性,就将两个类的逻辑分开了。

    当然,你也可以将相同的逻辑抽象到一个基类里,并实现publish和subscribe接口。

    问题3 相关依赖的库

    编译之前,需要安装hiredis、libevent和boost库,我是用的是Ubuntu x64系统。

    hiredis官网:https://github.com/redis/hiredis

    下载源码解压,进入解压目录,执行make && make install命令。

    libevent官网:http://libevent.org/下载最新的稳定版

    解压后进入解压目录,执行命令

    ./configure -prefix=/usr

    sudo make && make install

    boost库:直接执行安装:sudo apt-get install libboost-dev

    如果你不是用std::tr1::function的函数对象来给外层通知消息,就不需要boost库。你可以用接口的形式实现回调,把接口传给CRedisSubscribe类,让它在接收到消息后调用接口回调,通知外层。

    问题4 如何使用

    最后贴出例子代码。

    publisher.cpp,实现发布消息:

    [cpp]view plaincopy

    /*************************************************************************

    > File Name: publisher.cpp

    > Author: chenzengba

    > Mail: chenzengba@gmail.com

    > Created Time: Sat 23 Apr 2016 12:13:24 PM CST

    ************************************************************************/

    #include "redis_publisher.h"

    intmain(intargc,char*argv[])

    {

    CRedisPublisher publisher;

    boolret = publisher.init();

    if(!ret)

    {

    printf("Init failed.\n");

    return0;

    }

    ret = publisher.connect();

    if(!ret)

    {

    printf("connect failed.");

    return0;

    }

    while(true)

    {

    publisher.publish("test-channel","Test message");

    sleep(1);

    }

    publisher.disconnect();

    publisher.uninit();

    return0;

    }

    subscriber.cpp实现订阅消息:

    [cpp]view plaincopy

    /*************************************************************************

    > File Name: subscriber.cpp

    > Author: chenzengba

    > Mail: chenzengba@gmail.com

    > Created Time: Sat 23 Apr 2016 12:26:42 PM CST

    ************************************************************************/

    #include "redis_subscriber.h"

    voidrecieve_message(constchar*channel_name,

    constchar*message,intlen)

    {

    printf("Recieve message:\n    channel name: %s\n    message: %s\n",

    channel_name, message);

    }

    intmain(intargc,char*argv[])

    {

    CRedisSubscriber subscriber;

    CRedisSubscriber::NotifyMessageFn fn =

    bind(recieve_message, std::tr1::placeholders::_1,

    std::tr1::placeholders::_2, std::tr1::placeholders::_3);

    boolret = subscriber.init(fn);

    if(!ret)

    {

    printf("Init failed.\n");

    return0;

    }

    ret = subscriber.connect();

    if(!ret)

    {

    printf("Connect failed.\n");

    return0;

    }

    subscriber.subscribe("test-channel");

    while(true)

    {

    sleep(1);

    }

    subscriber.disconnect();

    subscriber.uninit();

    return0;

    }

    关于编译的问题:在g++中编译,注意要加上-lhiredis -levent参数,下面是一个简单的Makefile:

    [cpp]view plaincopy

    EXE=server_main client_main

    CC=g++

    FLAG=-lhiredis -levent

    OBJ=redis_publisher.o publisher.o redis_subscriber.o subscriber.o

    all:$(EXE)

    $(EXE):$(OBJ)

    $(CC) -o publisher redis_publisher.o publisher.o $(FLAG)

    $(CC) -o subscriber redis_subscriber.o subscriber.o $(FLAG)

    redis_publisher.o:redis_publisher.h

    redis_subscriber.o:redis_subscriber.h

    publisher.o:publisher.cpp

    $(CC) -c publisher.cpp

    subscriber.o:subscriber.cpp

    $(CC) -c subscriber.cpp

    clean:

    rm publisher subscriber *.o

    致谢:

    redis异步API使用libevent:http://www.tuicool.com/articles/N73uuu

    相关文章

      网友评论

          本文标题:linux下使用hiredis异步API实现sub/pub消息订

          本文链接:https://www.haomeiwen.com/subject/minvuxtx.html