深入剖析WebRTC事件机制之Sigslot

作者: 音视频直播技术专家 | 来源:发表于2017-10-24 00:35 被阅读403次

    前言

    我最早了解到 sigslot 大概是在 2007年 左右,当时在QT中大量使用了 sigslot 的概念。 现在 WebRTC 中也大量使用了 sigslot 这种机制来处理底层的事件。它对我们阅读WebRTC代码至关重要。本篇文章就详细介绍一下 sigslot。

    Sigslot作用

    Sigslot 的作用一句话表式就是为了解耦。例如,有两个类 A 和 B,如果 B 使用 A, 就必须在 B 类中写入与 A 类有关的代码。看下代码:

    class A {
    public:
        void funcA();
    }
    
    class B {
    public:
        B(A& a){
            m_a = a;
        }
        
        void funcB(){
            m_a.funcA(); //这里调用了A类的方法
        }
      
    private:
        A m_a; //引用 A 类型成员变量。
    }
    
    void main(int argc, char *argv[]){
        A a;
        B b(a);
        b.funcB();
    }
    

    这里的弊端是 B 中必须要声名使用 A。如果我们的项目特别复杂,这样的使用方式在后期维护时很容易让我们掉入“陷阱”。有没有一种通用的办法可以做到在 B 中不用使用 A 也可以调用 A 中的方法呢?答案就是使用 sigslot。我们看下面的代码:

    class A : public sigslot::has_slot<>  
    {  
    public:  
        void  funcA();  
    };
    
    class B  
    {  
    public:  
        sigslot::signal0<> sender;  
    };  
      
    void main(int argc, char *argv[]){
    
        A a;  
        B b;
        
        //在运行时才将 a 和 b 绑定到一起  
        b.sender.connect(&a, &A::funcA);   
        b.sender();
        
    } 
    

    通过上面的代码我们可以看到 B 中没有一行与 A 相关的代码。只在 main 函数中(也就是在运行时)才知道 A 与 B 有关联关系。是不是觉得很神奇呢?下面我们就看一下它的实现原理。

    实现原理

    sigslot的原理其实非常简单,它就是一个变化的观察者模式。观察者模式如下所示:

    观察者模式,首先让 Observer(“观察者”)对象 注册到 Subject(“被观察者”) 对象中。当 Subject 状态发生变化时,遍历所有注册到自己的 Observer 对象,并调用它们的 notify方法。

    sigslot与观察者模式类似,它使用signal(“信号”)和slot("槽"),区别在于 signal 主动连接自己感兴趣的类及其方法,将它们保存到自己的列表中。当发射信号时,它遍历所有的连接,调用 slot(“槽”) 方法。

    如何使用

    下面我们看一下 WebRTC 中是如何使用 sigslot 的。

    • 首先,定义 slot("槽"),也就是事件处理函数。在WebRTC中定义槽必须继承 has_slots<>。如下图所示:
    • 其次,定义 signal (“信号”) ,也就是发送的信号。

      sigslot::signal1<AsyncSocket*,
              sigslot::multi_threaded_local> SignalWriteEvent;
      
    • 然后,将 signal 与 slot 连接到一起。在这里就是将 AsyncUDPSocket和 OnWriteEvent方法与signal绑定到一起。

      socket_->SignalWriteEvent.connect(this,
                      &AsyncUDPSocket::OnWriteEvent);
      
    • 最后,发送信号。在 WebRTC中根据参数的不同定义了许多 signal,如 signal1 说明带一个参数,signal2说明带两个参数。

      SignalWriteEvent(this);
      

    关键代码

    下面是对 sigslog 的类关系图及关键代码与其详细注释。

    ...
    
    // On our copy of sigslot.h, we set single threading as default.
    #define SIGSLOT_DEFAULT_MT_POLICY single_threaded
    
    #if defined(SIGSLOT_PURE_ISO) ||                   \
        (!defined(WEBRTC_WIN) && !defined(__GNUG__) && \
         !defined(SIGSLOT_USE_POSIX_THREADS))
    #define _SIGSLOT_SINGLE_THREADED
    #elif defined(WEBRTC_WIN)
    #define _SIGSLOT_HAS_WIN32_THREADS
    #if !defined(WIN32_LEAN_AND_MEAN)
    #define WIN32_LEAN_AND_MEAN
    #endif
    #include "webrtc/rtc_base/win32.h"
    #elif defined(__GNUG__) || defined(SIGSLOT_USE_POSIX_THREADS)
    #define _SIGSLOT_HAS_POSIX_THREADS
    #include <pthread.h>
    #else
    #define _SIGSLOT_SINGLE_THREADED
    #endif
    
    #ifndef SIGSLOT_DEFAULT_MT_POLICY
    #ifdef _SIGSLOT_SINGLE_THREADED
    #define SIGSLOT_DEFAULT_MT_POLICY single_threaded
    #else
    #define SIGSLOT_DEFAULT_MT_POLICY multi_threaded_local
    #endif
    #endif
    
    // TODO: change this namespace to rtc?
    namespace sigslot {
    
    ...
    
    //这面这大段代码是为了实现智能锁使用的。
    //它会根据不同的平台初始化不同的互斥量,并调用不同的锁函数。
    
    //如果是 Window 平台
    #ifdef _SIGSLOT_HAS_WIN32_THREADS
    // The multi threading policies only get compiled in if they are enabled.
    
    //如果是全局线程
    class multi_threaded_global {
     public:
      multi_threaded_global() {
        static bool isinitialised = false;
    
        if (!isinitialised) {
          InitializeCriticalSection(get_critsec());
          isinitialised = true;
        }
      }
    
      void lock() { EnterCriticalSection(get_critsec()); }
    
      void unlock() { LeaveCriticalSection(get_critsec()); }
    
     private:
      CRITICAL_SECTION* get_critsec() {
        static CRITICAL_SECTION g_critsec;
        return &g_critsec;
      }
    };
    
    //如果是本地线程
    class multi_threaded_local {
     public:
      multi_threaded_local() { InitializeCriticalSection(&m_critsec); }
    
      multi_threaded_local(const multi_threaded_local&) {
        InitializeCriticalSection(&m_critsec);
      }
    
      ~multi_threaded_local() { DeleteCriticalSection(&m_critsec); }
    
      void lock() { EnterCriticalSection(&m_critsec); }
    
      void unlock() { LeaveCriticalSection(&m_critsec); }
    
     private:
      CRITICAL_SECTION m_critsec;
    };
    #endif  // _SIGSLOT_HAS_WIN32_THREADS
    
    //非window平台
    #ifdef _SIGSLOT_HAS_POSIX_THREADS
    // The multi threading policies only get compiled in if they are enabled.
    
    //如果是全局线程
    class multi_threaded_global {
     public:
      void lock() { pthread_mutex_lock(get_mutex()); }
      void unlock() { pthread_mutex_unlock(get_mutex()); }
    
     private:
      static pthread_mutex_t* get_mutex();
    };
    
    //如果是本地线程
    class multi_threaded_local {
     public:
      multi_threaded_local() { pthread_mutex_init(&m_mutex, nullptr); }
      multi_threaded_local(const multi_threaded_local&) {
        pthread_mutex_init(&m_mutex, nullptr);
      }
      ~multi_threaded_local() { pthread_mutex_destroy(&m_mutex); }
      void lock() { pthread_mutex_lock(&m_mutex); }
      void unlock() { pthread_mutex_unlock(&m_mutex); }
    
     private:
      pthread_mutex_t m_mutex;
    };
    #endif  // _SIGSLOT_HAS_POSIX_THREADS
    
    //根据不同的策略调用不同的锁。
    //这里的策略就是不同的平台
    template <class mt_policy>
    class lock_block {
     public:
      mt_policy* m_mutex;
    
      lock_block(mt_policy* mtx) : m_mutex(mtx) { m_mutex->lock(); }
    
      ~lock_block() { m_mutex->unlock(); }
    };
    
    class _signal_base_interface;
    
    class has_slots_interface {
    
     ...
     
     public:
      void signal_connect(_signal_base_interface* sender) {
        ...
      }
    
      void signal_disconnect(_signal_base_interface* sender) {
        ...
      }
    
      void disconnect_all() { ... }
    };
    
    class _signal_base_interface {
     ...
    
     public:
      void slot_disconnect(has_slots_interface* pslot) {
        ...
      }
    
      void slot_duplicate(const has_slots_interface* poldslot,
                          has_slots_interface* pnewslot) {
        ...
      }
    };
    
    // 该类是一个特别重要的类
    // signal与slot绑定之前,必须先将槽对象与槽方法组成 connection
    //
    class _opaque_connection {
     private:
      typedef void (*emit_t)(const _opaque_connection*);
      
      //联合结构体,用于函数转换
      template <typename FromT, typename ToT>
      union union_caster {
        FromT from;
        ToT to;
      };
    
      //信号发射函数指针
      emit_t pemit;
      //存放“槽”对象
      has_slots_interface* pdest; 
      // Pointers to member functions may be up to 16 bytes for virtual classes,
      // so make sure we have enough space to store it.
      unsigned char pmethod[16];
    
     public:
      //构造函数
      //在构造connect时,要传入槽对象和槽类方法指针
      template <typename DestT, typename... Args>
      _opaque_connection(DestT* pd, void (DestT::*pm)(Args...)) : pdest(pd) {
        //定义成员函数指针,与C语言中的函数指针是类似的
        typedef void (DestT::*pm_t)(Args...);
        static_assert(sizeof(pm_t) <= sizeof(pmethod),
                      "Size of slot function pointer too large.");
    
        std::memcpy(pmethod, &pm, sizeof(pm_t));
    
         //定义了一个函数指针
        typedef void (*em_t)(const _opaque_connection* self, Args...);
        
        //通过下面的方法,将 pemit 函数变理指向了 emitter 函数。
        union_caster<em_t, emit_t> caster2;
        //注意 emitter后面的是模版参数,不是函数参数,这里不要弄混了。
        caster2.from = &_opaque_connection::emitter<DestT, Args...>;
        pemit = caster2.to;
      }
    
      //返回"槽"对象
      has_slots_interface* getdest() const { return pdest; }
    
      ...
    
      //因为在构造函数里已经将 pemit 设置为 emitter 了,
      //所以下面的代码就是调用 emitter 函数。为里只不过做了一次函数指针类型转换。
      //也就是说调用 connect 的 emit 方法,实际调的是 emitter。
      template <typename... Args>
      void emit(Args... args) const {
        typedef void (*em_t)(const _opaque_connection*, Args...);
        union_caster<emit_t, em_t> caster;
        caster.from = pemit;
        (caster.to)(this, args...);
      }
    
     private:
      template <typename DestT, typename... Args>
      static void emitter(const _opaque_connection* self, Args... args) {
        //pm_t是一个成员函数指针,它指向的是传进来的成员方法
        typedef void (DestT::*pm_t)(Args...);
        pm_t pm;
        std::memcpy(&pm, self->pmethod, sizeof(pm_t));
        //调用成员方法
        (static_cast<DestT*>(self->pdest)->*(pm))(args...);
      }
    };
    
    //signal_with_thread_policy类的父类。
    //该类最主要的作用是存有一个conn list。
    //在 signal_with_thread_policy中的connect方法就是对该成员变量的操作。
    
    template <class mt_policy>
    class _signal_base : public _signal_base_interface, public mt_policy {
     protected:
      typedef std::list<_opaque_connection> connections_list;
     
     public:
      ...
     
     protected:
      //在 _signal_base 中定义了一个connection list,用于绑定的 slots.
      connections_list m_connected_slots;
      ...
      
    };
    
    //该类是"槽"的实现
    template <class mt_policy = SIGSLOT_DEFAULT_MT_POLICY>
    class has_slots : public has_slots_interface, public mt_policy {
     private:
      typedef std::set<_signal_base_interface*> sender_set;
      typedef sender_set::const_iterator const_iterator;
    
     public:
      has_slots()
          : has_slots_interface(&has_slots::do_signal_connect,
                                &has_slots::do_signal_disconnect,
                                &has_slots::do_disconnect_all) {}
    
     ...
    
     private:
      has_slots& operator=(has_slots const&);
    
      //静态函数,用于与signal绑定,由父类调用
      //它是在构造函数时传给父类的
      static void do_signal_connect(has_slots_interface* p,
                                    _signal_base_interface* sender) {
        has_slots* const self = static_cast<has_slots*>(p);
        lock_block<mt_policy> lock(self);
        self->m_senders.insert(sender);
      }
    
      //静态函数,用于解绑signal,由父类调用
      //它是在构造函数时传给父类的
      static void do_signal_disconnect(has_slots_interface* p,
                                       _signal_base_interface* sender) {
        has_slots* const self = static_cast<has_slots*>(p);
        lock_block<mt_policy> lock(self);
        self->m_senders.erase(sender);
      }
    
      ...
      
      private:
      //该集合中存放的是与slog绑定的 signal
      sender_set m_senders;
    };
    
    //该类是信号的具体实现
    //为了保证信号可以在不同的平台是线程安全的,所以这里使用了策略模式
    //mt_policy参数表式的是,不同的平台使用不同的策略
    //该类中有两个重要的函数,一个是connect用于与槽进行绑定;另一个是 emit用于发射信号
    
    template <class mt_policy, typename... Args>
    class signal_with_thread_policy : public _signal_base<mt_policy> {
    public:
    
      ...
      
      template <class desttype>
      void connect(desttype* pclass, void (desttype::*pmemfun)(Args...)) {
        //这是一个智能锁,当函数结束时,自动释放锁。
        lock_block<mt_policy> lock(this);
        //先将对象与"槽"组成一个conn,然后存放到 signal的 conn list里
        //当发射信号时,调用 conn list中的每个conn的 emit方法。
        this->m_connected_slots.push_back(_opaque_connection(pclass, pmemfun));
        
        //在槽对象中也要保存 signal 对象。
        pclass->signal_connect(static_cast<_signal_base_interface*>(this));
      }
    
      //遍历所有的连接,并调用 conn 的emit方法。最终调用的是绑定"槽"的方法
      void emit(Args... args) {
        lock_block<mt_policy> lock(this);
        this->m_current_iterator = this->m_connected_slots.begin();
        while (this->m_current_iterator != this->m_connected_slots.end()) {
          _opaque_connection const& conn = *this->m_current_iterator;
          ++(this->m_current_iterator);
          
          //调 conn 的 emit 方法,最终会调用绑定的 "槽" 方法。
          conn.emit<Args...>(args...);
        }
      }
    
      //重载()操作符,这样就从直接调用emit方法变成隐含调用emit方法。
      void operator()(Args... args) { emit(args...); }
    };
    
    
    //下面的对不同参数信号的定义
    template <typename... Args>
    using signal = signal_with_thread_policy<SIGSLOT_DEFAULT_MT_POLICY, Args...>;
    
    
    template <typename mt_policy = SIGSLOT_DEFAULT_MT_POLICY>
    using signal0 = signal_with_thread_policy<mt_policy>;
    
    template <typename A1, typename mt_policy = SIGSLOT_DEFAULT_MT_POLICY>
    using signal1 = signal_with_thread_policy<mt_policy, A1>;
    
    template <typename A1,
              typename A2,
              typename mt_policy = SIGSLOT_DEFAULT_MT_POLICY>
    using signal2 = signal_with_thread_policy<mt_policy, A1, A2>;
    
    ...
    
    }  // namespace sigslot
    

    小结

    本文通过 sigslot作用、实现原理、如何使用以及详细的代码注释四个部分剖析了 WebRTC 中的 sigslot。sigslot是 WebRTC中非常底性的基础代码,它对 WebRTC 事件机制起着关键性的作用。熟悉sigslot,对我们阅读 WebRTC 代码会有非常大的帮助。

    希望本文能对你有所帮助。谢谢!

    相关文章

      网友评论

      • 226a0cf544fa:难怪我看了两天代码没看懂,原来如此。
      • acenodie:写的很好,期待大佬有时间分析下webrtc的proxy机制!感激不尽!
        音视频直播技术专家:@acenodie 打日志,效率就是低啊
        acenodie:@音视频直播技术专家 敢问大佬,在阅读webrtc代码的时候,是用什么工具调试的呢?我是android平台下的,阅读的时候不能断点调试,效率很低。。。

      本文标题:深入剖析WebRTC事件机制之Sigslot

      本文链接:https://www.haomeiwen.com/subject/lxgzuxtx.html