美文网首页
kurento代码分析(二)rtp流的处理

kurento代码分析(二)rtp流的处理

作者: help_youself | 来源:发表于2018-06-07 18:21 被阅读86次

     今天得闲,又翻了下kurento的代码,没忍住。学有所得,分享在这里。
     kurento在处理rtp流时,需要创建rtpbin这样一个element。我在上一篇中分析了kurento是怎么通过工厂模式创建gstreamer中的element对象的。
     这种工厂模式,提供了很大的灵活性,有新的需求的时候,就可以继承父类,构造新的处理逻辑。例如关于webrtc的rtp流的处理,在C层,KmsWebrtcEndpoint继承了KmsBaseRtpEndpoint。

    /* Instance structure of KmsWebrtcEndpoint. */
    struct _KmsWebrtcEndpoint
    {
      /* Parent instance embedded as the first member; this is how the struct
       * extends KmsBaseRtpEndpoint. */
      KmsBaseRtpEndpoint parent;
    
      /* Pointer to the private data; KmsWebrtcEndpointPrivate is declared
       * outside this excerpt. */
      KmsWebrtcEndpointPrivate *priv;
    };
    

     同样在C++代码中, WebRtcEndpoint继承了BaseRtpEndpoint。

    class WebRtcEndpointImpl : public BaseRtpEndpointImpl,
      public virtual WebRtcEndpoint
    class WebRtcEndpoint : public virtual BaseRtpEndpoint
    

     把WebRtcEndpointImpl放下,先分析BaseRtpEndpointImpl。

    void BaseRtpEndpointImpl::postConstructor ()
    {
      SdpEndpointImpl::postConstructor ();
    
      mediaStateChangedHandlerId = register_signal_handler (G_OBJECT (element),
                                   "media-state-changed",
                                   std::function <void (GstElement *, guint) > (std::bind (
                                         &BaseRtpEndpointImpl::updateMediaState, this,
                                         std::placeholders::_2) ),
                                   std::dynamic_pointer_cast<BaseRtpEndpointImpl>
                                   (shared_from_this() ) );
    
      connStateChangedHandlerId = register_signal_handler (G_OBJECT (element),
                                  "connection-state-changed",
                                  std::function <void (GstElement *, gchar *, guint) > (std::bind (
                                        &BaseRtpEndpointImpl::updateConnectionState, this,
                                        std::placeholders::_2, std::placeholders::_3) ),
                                  std::dynamic_pointer_cast<BaseRtpEndpointImpl>
                                  (shared_from_this() ) );
    }
    

     register_signal_handler调用g_signal_connect_data进行函数回调注册,将函数的地址传递到C层 _KmsBaseRtpEndpointClass中的(*media_state_changed)函数指针,可以认为是C层预留的事件插槽。查找下"media-state-changed"这个信号,存在kmsbasertpendpoint.c文件中。

      /* Registers the "media-state-changed" signal on the class and stores its
       * id in obj_signals[MEDIA_STATE_CHANGED]. The class-closure offset points
       * at the media_state_changed member of KmsBaseRtpEndpointClass. The signal
       * returns nothing (G_TYPE_NONE) and carries one parameter of type
       * KMS_TYPE_MEDIA_STATE. */
      obj_signals[MEDIA_STATE_CHANGED] =
          g_signal_new ("media-state-changed",
          G_TYPE_FROM_CLASS (klass),
          G_SIGNAL_RUN_LAST,
          G_STRUCT_OFFSET (KmsBaseRtpEndpointClass, media_state_changed), NULL,
          NULL, g_cclosure_marshal_VOID__ENUM, G_TYPE_NONE, 1,
          KMS_TYPE_MEDIA_STATE);
    

     在KmsBaseRtpEndpoint这个element创建的时候,GObject会自动调用其实例初始化函数kms_base_rtp_endpoint_init,当然也会调用kms_base_rtp_endpoint_class_init对类的方法进行初始化。rtpbin就是gstreamer中定义的一个element:

    /*
     * Instance initializer for KmsBaseRtpEndpoint (excerpt as quoted).
     *
     * Creates an "rtpbin" element, stores it in self->priv->rtpbin, and connects
     * a handler to each rtpbin signal listed below, passing `self` as user data.
     * The assignment of self->priv is not part of this excerpt.
     */
    static void
    kms_base_rtp_endpoint_init (KmsBaseRtpEndpoint * self)
    {
    /* NULL name: the element name is left for the factory to choose. */
    self->priv->rtpbin = gst_element_factory_make ("rtpbin", NULL);//create an element
      g_signal_connect (self->priv->rtpbin, "request-pt-map",
          G_CALLBACK (kms_base_rtp_endpoint_rtpbin_request_pt_map), self);
    
      g_signal_connect (self->priv->rtpbin, "pad-added",
          G_CALLBACK (kms_base_rtp_endpoint_rtpbin_pad_added), self);
    
      /* SSRC-related notifications. */
      g_signal_connect (self->priv->rtpbin, "on-new-ssrc",
          G_CALLBACK (kms_base_rtp_endpoint_rtpbin_on_new_ssrc), self);
      g_signal_connect (self->priv->rtpbin, "on-ssrc-sdes",
          G_CALLBACK (kms_base_rtp_endpoint_rtpbin_on_ssrc_sdes), self);
      g_signal_connect (self->priv->rtpbin, "on-bye-ssrc",
          G_CALLBACK (kms_base_rtp_endpoint_rtpbin_on_bye_ssrc), self);
      g_signal_connect (self->priv->rtpbin, "on-bye-timeout",
          G_CALLBACK (kms_base_rtp_endpoint_rtpbin_on_bye_timeout), self);
      g_signal_connect (self->priv->rtpbin, "new-jitterbuffer",
          G_CALLBACK (kms_base_rtp_endpoint_rtpbin_new_jitterbuffer), self);
    
      g_signal_connect (self->priv->rtpbin, "on-timeout",
          G_CALLBACK (kms_base_rtp_endpoint_rtpbin_on_timeout), self);
    
      g_signal_connect (self->priv->rtpbin, "on-ssrc-active",
          G_CALLBACK (kms_base_rtp_endpoint_rtpbin_on_ssrc_active), self);
    
      /* Auxiliary receiver/sender requests. */
      g_signal_connect (self->priv->rtpbin, "request-aux-receiver",
          G_CALLBACK (kms_base_rtp_endpoint_rtpbin_request_aux_receiver), self);
      g_signal_connect (self->priv->rtpbin, "request-aux-sender",
          G_CALLBACK (kms_base_rtp_endpoint_rtpbin_request_aux_sender), self);
    }
    /*
     * Class initializer for KmsBaseRtpEndpoint (excerpt; the quoted snippet stops
     * before the end of the function and the declaration of base_endpoint_class
     * is not shown).
     *
     * Overrides three virtual methods of the KmsBaseSdpEndpoint class with the
     * kms_base_rtp_endpoint_* implementations.
     */
    static void
    kms_base_rtp_endpoint_class_init (KmsBaseRtpEndpointClass * klass)
    {
      /* View the class structure as its KmsBaseSdpEndpoint parent class. */
      base_endpoint_class = KMS_BASE_SDP_ENDPOINT_CLASS (klass);
      base_endpoint_class->create_session_internal =
          kms_base_rtp_endpoint_create_session_internal;
      base_endpoint_class->start_transport_send =
          kms_base_rtp_endpoint_start_transport_send;
      base_endpoint_class->connect_input_elements =
          kms_base_rtp_endpoint_connect_input_elements;
    

     这个"rtpbin"来自于gstreamer中的gst-plugins-good中的gstrtpbin.c。
     创建了element之后还不行,媒体流的数据处理需要靠src、sink这样的pad,而这些pad是kms在创建session之后才创建的。session什么时候创建的呢?BaseRtpEndpointImpl::postConstructor ()中调用了SdpEndpointImpl::postConstructor ()。

    /**
     * Runs the parent class post-construction step, then emits the
     * "create-session" signal on `element` and stores the returned session id
     * in sessId.
     *
     * The string returned through the signal is copied into sessId and then
     * released with g_free.
     *
     * @throws KurentoException (SDP_END_POINT_CANNOT_CREATE_SESSON) when no
     *         session id comes back from the signal.
     */
    void SdpEndpointImpl::postConstructor ()
    {
      // Must start as nullptr: if the emission fails before the return location
      // is written (e.g. the signal cannot be resolved), the check below would
      // otherwise read an indeterminate pointer.
      gchar *sess_id = nullptr;
      SessionEndpointImpl::postConstructor ();
    
      g_signal_emit_by_name (element, "create-session", &sess_id);
    
      if (sess_id == nullptr) {
        throw KurentoException (SDP_END_POINT_CANNOT_CREATE_SESSON,
                                "Cannot create session");
      }
      // Copy before freeing: sessId keeps its own std::string copy.
      sessId = std::string (sess_id);
      g_free (sess_id);
    }
    

     这个信号发射之后,c层就会调用kms_base_sdp_endpoint_create_session创建session,创建相应的pad。在C层可以看到KmsBaseRtpEndpoint也继承了KmsBaseSdpEndpoint。说白了,kurento中C++中的继承关系,映射到C层结构体,也是相对应的继承关系。

    /* Instance structure of KmsBaseRtpEndpoint. */
    struct _KmsBaseRtpEndpoint
    {
      /* Parent instance embedded as the first member; this is how the struct
       * extends KmsBaseSdpEndpoint. */
      KmsBaseSdpEndpoint parent;
    
      /* Pointer to the private data; KmsBaseRtpEndpointPrivate is declared
       * outside this excerpt. */
      KmsBaseRtpEndpointPrivate *priv;
    };
    

     下面,我们来到kmsbasesdpendpoint.c中看看其中的方法(函数)。

    /*
     * Class initializer for KmsBaseSdpEndpoint (excerpt; the "//..." markers
     * stand for code omitted by the article).
     *
     * Fills the class virtual-method table with the default
     * kms_base_sdp_endpoint_* implementations and registers the
     * "create-session" and "release-session" action signals.
     */
    static void
    kms_base_sdp_endpoint_class_init (KmsBaseSdpEndpointClass * klass)
    {
    //...
      /* Default handlers run by the two signals registered further down. */
      klass->create_session = kms_base_sdp_endpoint_create_session;
      klass->release_session = kms_base_sdp_endpoint_release_session;
    
      /* Media handler management */
      klass->create_media_handler = kms_base_sdp_endpoint_create_media_handler_impl;
    
      /* The same three slots are reassigned by kms_base_rtp_endpoint_class_init. */
      klass->create_session_internal =
          kms_base_sdp_endpoint_create_session_internal;
      klass->start_transport_send = kms_base_sdp_endpoint_start_transport_send;
      klass->connect_input_elements = kms_base_sdp_endpoint_connect_input_elements;
    
      klass->configure_media = kms_base_sdp_endpoint_configure_media_impl;
    
      /* SDP offer/answer entry points and SDP accessors. */
      klass->generate_offer = kms_base_sdp_endpoint_generate_offer;
      klass->process_offer = kms_base_sdp_endpoint_process_offer;
      klass->process_answer = kms_base_sdp_endpoint_process_answer;
      klass->get_local_sdp = kms_base_sdp_endpoint_get_local_sdp;
      klass->get_remote_sdp = kms_base_sdp_endpoint_get_remote_sdp;
    
      /* Signals initialization */
      /* "create-session": no parameters, returns a string (G_TYPE_STRING);
       * the class closure is the create_session slot set above. */
      kms_base_sdp_endpoint_signals[SIGNAL_CREATE_SESSION] =
          g_signal_new ("create-session",
          G_TYPE_FROM_CLASS (klass),
          G_SIGNAL_ACTION | G_SIGNAL_RUN_LAST,
          G_STRUCT_OFFSET (KmsBaseSdpEndpointClass, create_session), NULL,
          NULL, __kms_core_marshal_STRING__VOID, G_TYPE_STRING, 0);
    
      /* "release-session": one string parameter, returns a boolean;
       * the class closure is the release_session slot set above. */
      kms_base_sdp_endpoint_signals[SIGNAL_RELEASE_SESSION] =
          g_signal_new ("release-session",
          G_TYPE_FROM_CLASS (klass),
          G_SIGNAL_ACTION | G_SIGNAL_RUN_LAST,
          G_STRUCT_OFFSET (KmsBaseSdpEndpointClass, release_session), NULL,
          NULL, __kms_core_marshal_BOOLEAN__STRING, G_TYPE_BOOLEAN, 1,
          G_TYPE_STRING);
    //...
    }
    

    src、sink这两种pad的创建:

    /*
     * Excerpt of kms_base_rtp_session_start_transport_send: only the call that
     * configures the connection for one media is quoted. The locals handler,
     * neg_media, rem_media and i are declared in code the article omitted
     * (presumably a per-media loop -- confirm against the upstream source).
     */
    void
    kms_base_rtp_session_start_transport_send (KmsBaseRtpSession * self,
        gboolean offerer)
    {
        /* A failure is only logged as a warning; nothing else happens here. */
        if (!kms_base_rtp_session_configure_media_connection (self, handler, neg_media,
                rem_media, offerer)) {
          GST_WARNING_OBJECT (self, "Cannot configure connection for media (id=%u)", i);
        }
    }
    /*
     * Excerpt of kms_base_rtp_session_configure_media_connection: only the final
     * delegation is quoted. `active` is computed in code the article omitted.
     * Returns the result of kms_base_rtp_session_add_gst_connection_elements.
     */
    static gboolean
    kms_base_rtp_session_configure_media_connection (KmsBaseRtpSession * self,
        KmsSdpMediaHandler * handler, const GstSDPMedia * neg_media,
        const GstSDPMedia * remote_media, gboolean offerer)
    {
      return kms_base_rtp_session_add_gst_connection_elements (self, handler,
          neg_media, active);
    }
    /*
     * Excerpt of kms_base_rtp_session_add_gst_connection_elements: only the call
     * into kms_base_rtp_session_add_gst_basic_elements is quoted. The lookup of
     * `conn` and the gboolean return statement are omitted by the article.
     */
    static gboolean
    kms_base_rtp_session_add_gst_connection_elements (KmsBaseRtpSession * self,
        KmsSdpMediaHandler * handler, const GstSDPMedia * media, gboolean active)
    {
    kms_base_rtp_session_add_gst_basic_elements (self, conn, media, active);
    }
    /*
     * Adds the connection to this session (used as a GstBin) and wires it up.
     *
     * Sequence, as written: add the connection to the bin, sync the sink side
     * state with the parent, link the sink and src sides, and only then sync
     * the src side state with the parent.
     */
    static void
    kms_base_rtp_session_add_gst_basic_elements (KmsBaseRtpSession * self,
        KmsIRtpConnection * conn, const GstSDPMedia * media, gboolean active)
    {
      kms_i_rtp_connection_add (conn, GST_BIN (self), active);
      kms_i_rtp_connection_sink_sync_state_with_parent (conn);
    
      kms_base_rtp_session_link_gst_connection_sink (self, conn, media);
      kms_base_rtp_session_link_gst_connection_src (self, conn, media);
    
      /* The src side is synced last, after both directions are linked. */
      kms_i_rtp_connection_src_sync_state_with_parent (conn);
    }
    /*
     * Last step of the chain: this is where the pads are requested.
     *
     * For RTP and then for RTCP, a src pad is requested from the session
     * manager and a sink pad from the connection, and the pair is handed to
     * kms_base_rtp_session_link_pads.
     */
    static void
    kms_base_rtp_session_link_gst_connection_sink (KmsBaseRtpSession * self,
        KmsIRtpConnection * conn, const GstSDPMedia * media)
    {
      GstPad *rtp_src, *rtp_sink;
      GstPad *rtcp_src, *rtcp_sink;
    
      /* RTP */
      rtp_src =
          kms_i_rtp_session_manager_request_rtp_src (self->manager, self, media);
      /* Per the surrounding article, this call goes through a GObject
       * interface: function pointers that each connection type implements. */
      rtp_sink = kms_i_rtp_connection_request_rtp_sink (conn);
      kms_base_rtp_session_link_pads (rtp_src, rtp_sink);
    
      /* RTCP */
      rtcp_src =
          kms_i_rtp_session_manager_request_rtcp_src (self->manager, self, media);
      rtcp_sink = kms_i_rtp_connection_request_rtcp_sink (conn);
      kms_base_rtp_session_link_pads (rtcp_src, rtcp_sink);
    }
    

     这里用到了接口的概念,可以参考[1]。这里的接口在kmsrtpconnection.c、kmswebrtcconnection.c中均有实现,例如kms_rtp_connection_request_rtp_sink。
     问题来了,这个connection是什么时候创建的?函数kms_base_rtp_session_start_transport_send又是何时被调用的?我翻了会代码,connection的创建是从kmssdpagent.c的kms_sdp_agent_create_answer_impl开始的。翻翻代码,感觉有点晕,用C实现的面向对象,到处是回调,让我如何不晕。个人感觉kmssdpagent的作用就是接收用户的sdp消息:当对端发来sdp offer时,根据该offer生成sdp answer并回复给对方。

    /*
     * Excerpt of kms_sdp_agent_create_answer_impl: only the call that builds the
     * answer from the stored remote description is quoted. The declaration of
     * `answer` and the return statement are omitted by the article.
     */
    static GstSDPMessage *
    kms_sdp_agent_create_answer_impl (KmsSdpAgent * agent, GError ** error)
    {
      answer = kms_sdp_agent_generate_answer (agent,
          agent->priv->remote_description, error);
    }
    
    /*
     * Excerpt of kms_sdp_agent_generate_answer: only the per-media iteration is
     * quoted. create_media_answer is passed as the callback for the medias of
     * `offer`, with &data as its user data. `success`, `data` and the return
     * statement are declared/written in code the article omitted.
     */
    static GstSDPMessage *
    kms_sdp_agent_generate_answer (KmsSdpAgent * agent,
        const GstSDPMessage * offer, GError ** error)
    {
      success = sdp_utils_for_each_media (offer,
          (GstSDPMediaFunc) create_media_answer, &data);
    }
    /*
     * Excerpt of create_media_answer (the callback handed to
     * sdp_utils_for_each_media): only the notification of the answer callback
     * is quoted. `sdp_handler`, `answer_media` and the gboolean return are in
     * code the article omitted.
     */
    static gboolean
    create_media_answer (const GstSDPMedia * media, struct SdpAnswerData *data)
    {
      kms_sdp_agent_fire_on_answer_callback (data->agent,
          sdp_handler->sdph->handler, answer_media);
    }
    /*
     * Invokes the agent's on_media_answer callback, if one is set, passing the
     * agent, the handler, the media and the registered user_data.
     * Does nothing when the callback is NULL.
     */
    static void
    kms_sdp_agent_fire_on_answer_callback (KmsSdpAgent * agent,
        KmsSdpMediaHandler * handler, GstSDPMedia * media)
    {
      /* Nothing to notify when no callback has been set. */
      if (agent->priv->callbacks.callbacks.on_media_answer == NULL) {
        return;
      }
    
      agent->priv->callbacks.callbacks.on_media_answer (agent, handler, media,
          agent->priv->callbacks.user_data);
    }
    /*
     * on_media_answer callback: logs the event against the session carried in
     * user_data and forwards all four arguments to
     * kms_base_sdp_endpoint_configure_media.
     */
    static void
    on_media_answer_cb (KmsSdpAgent * agent, KmsSdpMediaHandler * handler,
        GstSDPMedia * media, gpointer user_data)
    {
      KmsSdpSession *sdp_session;
    
      /* user_data carries the session; it is used here only for logging. */
      sdp_session = KMS_SDP_SESSION (user_data);
      GST_LOG_OBJECT (sdp_session, "Answer media");
    
      /* The gboolean result is not used by this callback. */
      (void) kms_base_sdp_endpoint_configure_media (agent, handler, media,
          user_data);
    }
    /*
     * Excerpt of kms_base_sdp_endpoint_configure_media: only the dispatch through
     * the class virtual method is quoted. `sess` and `base_sdp_endpoint_class`
     * are obtained in code the article omitted (presumably from user_data and
     * from the endpoint's class -- confirm against the upstream source).
     * Returns whatever the configure_media implementation returns.
     */
    static gboolean
    kms_base_sdp_endpoint_configure_media (KmsSdpAgent * agent,
        KmsSdpMediaHandler * handler, GstSDPMedia * media, gpointer user_data)
    {
      return base_sdp_endpoint_class->configure_media (sess->ep, sess, handler,
          media);
    }
    // base_sdp_endpoint_class->configure_media is an overridable function: each
    // protocol has to create its own kind of connection.
    // In kmsrtpendpoint.c, for example, it is the function below. Only the
    // connection lookup is quoted; the declaration of `conn` and the gboolean
    // return are omitted by the article.
    static gboolean
    kms_rtp_endpoint_configure_media (KmsBaseSdpEndpoint * base_sdp_endpoint,
        KmsSdpSession * sess, KmsSdpMediaHandler * handler, GstSDPMedia * media)
    {
      conn = kms_rtp_endpoint_get_connection (KMS_RTP_ENDPOINT (base_sdp_endpoint),
          sess, handler, media);
    }
    // For the WebRTC endpoint the counterpart is kms_webrtc_endpoint_configure_media.
    

     回到第二个问题,kms_base_rtp_session_start_transport_send的调用。

    /*
     * Excerpt of kms_base_sdp_endpoint_process_offer: the session processes the
     * offer to produce the answer, then media is started with offerer = FALSE.
     * The lookup of `sess`, the declaration of `answer` and the return statement
     * are omitted by the article.
     */
    static GstSDPMessage *
    kms_base_sdp_endpoint_process_offer (KmsBaseSdpEndpoint * self,
        const gchar * sess_id, GstSDPMessage * offer)
    {
      answer = kms_sdp_session_process_offer (sess, offer);
      /* FALSE: this side is answering the offer, not making it. */
      kms_base_sdp_endpoint_start_media (self, sess, FALSE);
    }
    /*
     * Excerpt of kms_base_sdp_endpoint_start_media: dispatches to the class's
     * start_transport_send virtual method. How base_sdp_endpoint_class is
     * obtained is omitted by the article.
     */
    static void
    kms_base_sdp_endpoint_start_media (KmsBaseSdpEndpoint * self,
        KmsSdpSession * sess, gboolean offerer)
    {
      base_sdp_endpoint_class->start_transport_send (self, sess, offerer);
    }
    

     另外一个问题就是,RTP数据流是从哪里接收上来的?且听下次分解。《gstreamer网络数据流的接收》

    [1]GObject 学习笔记汇总---接口的模拟

    相关文章

      网友评论

          本文标题:kurento代码分析(二)rtp流的处理

          本文链接:https://www.haomeiwen.com/subject/jqiwsftx.html