美文网首页WebRTCwebrtc数据流分析
WebRTC PacedSender 原理分析(一)

WebRTC PacedSender 原理分析(一)

作者: JeffreyLau | 来源:发表于2020-05-16 18:58 被阅读0次

    PacedSender 的族普关系

    paced_class_uml_001.png
    • PacedSender继承Module类,实现其Process和TimeUntilNextProcess方法,其中TimeUntilNextProcess的实现便是相隔多少时间Process函数会被paced_thread回调一次

    • PacedSender类依赖PacingController类事实上,PacedSender把大部分工作都交给了PacingController

      和PacketRouter

    PacedSender 入队操作

    #modules/pacing/paced_sender.cc
    void PacedSender::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
      rtc::CritScope cs(&critsect_);
      pacing_controller_.EnqueuePacket(std::move(packet));
    }
    
    • 通过RTPSenderVideo::SendVideoPacket将rtp包通过回调EnqueuePacket将rtp包存入PacedSender所管理的队列当中
    • PacedSender::EnqueuePacket把工作交给PacingController
    #modules/pacing/pacing_controller.cc
    void PacingController::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
      RTC_DCHECK(pacing_bitrate_ > DataRate::Zero())
          << "SetPacingRate must be called before InsertPacket.";
    
      Timestamp now = CurrentTime();
      prober_.OnIncomingPacket(packet->payload_size());
    
      if (packet->capture_time_ms() < 0) {
        packet->set_capture_time_ms(now.ms());
      }
    
      RTC_CHECK(packet->packet_type());
      int priority = GetPriorityForType(*packet->packet_type());
      packet_queue_.Push(priority, now, packet_counter_++, std::move(packet));
    }
    
    • 如果capture_time_ms小于0,在这里为期初始化时间
    • 获取优先级,kAudio(0),kRetransmission(1),kVideo(2),kPadding(3)
    • 最终根据优先级将packet放入packet_queue_队列

    RoundRobinPacketQueue原理分析

    RoundRobinPacketQueue_01.png
    • RoundRobinPacketQueue队列的核心实现是内部管理4个数据结构
    • streams_容器用来管理以ssrc为key,以Stream对象为value的容器,依次可以看出,对于不同ssrc的流都会被该容器所管理
    • rtp_packets_列表用来托管真正的rtp流对应std::unique_ptr<RtpPacketToSend>,所有的的发送真实的rtp流都会存到这里,后续发送到网络通过从该列表中获得发送
    • enqueue_times_集合用来记录每次rtp流入队列的时间
    • 在每个数据包插入到队列的时候会创建一个QueuedPacket,同时会根据QueuedPacket的优先级创建一个StreamPrioKey对象,并且会以此对象为key,该包的ssrc值为value,将其插入到stream_priorities_集合
    #modules/pacing/round_robin_packet_queue.cc
    void RoundRobinPacketQueue::Push(int priority,
                                     Timestamp enqueue_time,
                                     uint64_t enqueue_order,
                                     std::unique_ptr<RtpPacketToSend> packet) {
      uint32_t ssrc = packet->Ssrc();
      uint16_t sequence_number = packet->SequenceNumber();
      int64_t capture_time_ms = packet->capture_time_ms();
      DataSize size =
          DataSize::bytes(send_side_bwe_with_overhead_
                              ? packet->size()
                              : packet->payload_size() + packet->padding_size());
      auto type = packet->packet_type();
      RTC_DCHECK(type.has_value());
    
      rtp_packets_.push_front(std::move(packet));
      Push(QueuedPacket(
          priority, *type, ssrc, sequence_number, capture_time_ms, enqueue_time,
          size, *type == RtpPacketToSend::Type::kRetransmission, enqueue_order,
          enqueue_times_.insert(enqueue_time), rtp_packets_.begin()));
    }
    
    • 首先得到ssrc,sequence_number,capture_time_ms,size(rtp包的大小)
    • 将RtpPacketToSend包通过rtp_packets_.push_front存入rtp_packets_列表
    • 以各参数创建QueuedPacket,由此可见每个RtpPacketToSend对应一个QueuedPacket,但是它并不正在存放RtpPacketToSend数据,只是记录了其szie,ssrc,sequence_number,以及rtp_packets_.begin()迭代器头,因为每次将RtpPacketToSend插入到rtp_packets_列表都是从头部插入,这里相当于得到其索引,便于后续发送到网络使用

    QueuedPacket数据结构的实现

    RoundRobinPacketQueue_QueuedPacket.png
    • QueuedPacket重要的成员变量就是packet_it_,它就是真实rtp包的索引所在
    • QueuedPacket提供了如下函数用于获取当前QueuedPacket对应的RtpPacketToSend包
    #modules/pacing/round_robin_packet_queue.cc
    std::unique_ptr<RtpPacketToSend>
    RoundRobinPacketQueue::QueuedPacket::ReleasePacket() {
      return packet_it_ ? std::move(**packet_it_) : nullptr;
    }
    
    • 与上面分析对应通过std::move(**packet_it_)返回

    Stream数据结构的实现

    RoundRobinPacketQueue_Stream.png
    • 以下结合代码来分析该数据结构
    ##modules/pacing/round_robin_packet_queue.cc
    void RoundRobinPacketQueue::Push(QueuedPacket packet) {
      auto stream_info_it = streams_.find(packet.ssrc());
      if (stream_info_it == streams_.end()) {
        stream_info_it = streams_.emplace(packet.ssrc(), Stream()).first;
        stream_info_it->second.priority_it = stream_priorities_.end();
        stream_info_it->second.ssrc = packet.ssrc();
      }
    
      Stream* stream = &stream_info_it->second;
    
      if (stream->priority_it == stream_priorities_.end()) {
        // If the SSRC is not currently scheduled, add it to |stream_priorities_|.
        RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
        stream->priority_it = stream_priorities_.emplace(
            StreamPrioKey(packet.priority(), stream->size), packet.ssrc());
      } else if (packet.priority() < stream->priority_it->first.priority) {
        // If the priority of this SSRC increased, remove the outdated StreamPrioKey
        // and insert a new one with the new priority. Note that |priority_| uses
        // lower ordinal for higher priority.
        stream_priorities_.erase(stream->priority_it);
        stream->priority_it = stream_priorities_.emplace(
            StreamPrioKey(packet.priority(), stream->size), packet.ssrc());
      }
      RTC_CHECK(stream->priority_it != stream_priorities_.end());
    
      // In order to figure out how much time a packet has spent in the queue while
      // not in a paused state, we subtract the total amount of time the queue has
      // been paused so far, and when the packet is popped we subtract the total
      // amount of time the queue has been paused at that moment. This way we
      // subtract the total amount of time the packet has spent in the queue while
      // in a paused state.
      UpdateQueueTime(packet.enqueue_time());
      packet.SubtractPauseTime(pause_time_sum_);
    
      size_packets_ += 1;
      size_ += packet.size();
    
      stream->packet_queue.push(packet);
    }
    
    • 根据QueuedPacket的ssrc来查询streams_集合中是否有该ssrc对应的Stream对象,如果没有则根据该ssrc实例化一个Stream对象并以ssrc为key将其插入到streams_集合
    • 在插入后Stream的成员变量priority_it是指向stream_priorities_.end的
    • 下面的处理如果Stream的成员变量priority_it是指向stream_priorities_.end则为当前的QueuedPacket包通过stream_priorities__.emplace 以StreamPrioKey对象为key,以ssrc为value插入到stream_priorities_集合当中并放回当前迭代器赋值给Stream的成员变量priority_it
    • 假设同一路stream也就是同一个ssrc,在插入的时候,本次的priority小于上一次的priority(越小优先级越高?),那么首先需要将原来stream_priorities_管理的擦除,然后在重新创建StreamPrioKey插入到stream_priorities_
    • 最后通过stream->packet_queue.push(packet)将QueuedPacket插入到Stream管理的packet_queue集合当中
    • 经过以上分析大致可得出如下关系
    RoundRobinPacketQueue_Stream_2.png
    • 每一个RtpPacketToSend包对应一个QueuedPacket对象
    • 每一路ssrc对应的stream对应一个Stream,而每一个Stream对象管理着入队的多个QueuedPacket

    PacedSender 出队操作

    • PacedSender 出队操作是一个十分复杂的过程,涉及到动态码率估计,webrtc经过bwe发送端码率估计评测出新码率后会将码率作用到paced模块,让PacedSender按照新的码率进行数据发送,本文为便于分析不考虑码率估计进行分析假设码率已知
    • PacedSender 出队操作要从PacedSender派生Module模块谈起,经paced_thread_处理,检测PacedSender重载的TimeUntilNextProcess函数判断下一次回调PacedSender::Process函数
    • webrtc初始化创建PacedSender过程会通过SetPacingRates设置初始化码率
    #modules/pacing/paced_sender.cc
    int64_t PacedSender::TimeUntilNextProcess() {
      rtc::CritScope cs(&critsect_);
    
      // When paused we wake up every 500 ms to send a padding packet to ensure
      // we won't get stuck in the paused state due to no feedback being received.
      TimeDelta elapsed_time = pacing_controller_.TimeElapsedSinceLastProcess();
      if (pacing_controller_.IsPaused()) {
        return std::max(PacingController::kPausedProcessInterval - elapsed_time,
                        TimeDelta::Zero())
            .ms();
      }
    
      auto next_probe = pacing_controller_.TimeUntilNextProbe();
      if (next_probe) {
        return next_probe->ms();
      }
    
      const TimeDelta min_packet_limit = TimeDelta::ms(5);
      return std::max(min_packet_limit - elapsed_time, TimeDelta::Zero()).ms();
    }
    
    • 首先通过pacing_controller_.TimeElapsedSinceLastProcess()得到已经流逝的时间,也就是当前时间和上一次处理时间相减
    • 假设next_probe为-1或nullptr也就是不做码率探测
    • 默认最小发包间隔是5ms,这里将min_packet_limit - elapsed_time和0取最大值,超过5ms则立即执行
    void PacedSender::Process() {
      rtc::CritScope cs(&critsect_);
      pacing_controller_.ProcessPackets();
    }
    
    • PacedSender将真正的处理交给PacingController
    void PacingController::ProcessPackets() {
      Timestamp now = CurrentTime();
      TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now);
      ....
      if (paused_)
        return;
    
      if (elapsed_time > TimeDelta::Zero()) {
        DataRate target_rate = pacing_bitrate_;
        DataSize queue_size_data = packet_queue_.Size();
        if (queue_size_data > DataSize::Zero()) {
          // Assuming equal size packets and input/output rate, the average packet
          // has avg_time_left_ms left to get queue_size_bytes out of the queue, if
          // time constraint shall be met. Determine bitrate needed for that.
          packet_queue_.UpdateQueueTime(CurrentTime());
          if (drain_large_queues_) {
            TimeDelta avg_time_left =
                std::max(TimeDelta::ms(1),
                         queue_time_limit - packet_queue_.AverageQueueTime());
            DataRate min_rate_needed = queue_size_data / avg_time_left;
            if (min_rate_needed > target_rate) {
              target_rate = min_rate_needed;
              RTC_LOG(LS_VERBOSE) << "bwe:large_pacing_queue pacing_rate_kbps="
                                  << target_rate.kbps();
            }
          }
        }
    
        media_budget_.set_target_rate_kbps(target_rate.kbps());
        UpdateBudgetWithElapsedTime(elapsed_time);
      }
    
      bool is_probing = prober_.IsProbing();
      PacedPacketInfo pacing_info;
      absl::optional<DataSize> recommended_probe_size;
      if (is_probing) {
        pacing_info = prober_.CurrentCluster();
        recommended_probe_size = DataSize::bytes(prober_.RecommendedMinProbeSize());
      }
    
      DataSize data_sent = DataSize::Zero();
      // The paused state is checked in the loop since it leaves the critical
      // section allowing the paused state to be changed from other code.
      while (!paused_) {
        auto* packet = GetPendingPacket(pacing_info);
        if (packet == nullptr) {
          // No packet available to send, check if we should send padding.
          DataSize padding_to_add = PaddingToAdd(recommended_probe_size, data_sent);
          if (padding_to_add > DataSize::Zero()) {
            std::vector<std::unique_ptr<RtpPacketToSend>> padding_packets =
                packet_sender_->GeneratePadding(padding_to_add);
            if (padding_packets.empty()) {
              // No padding packets were generated, quite send loop.
              break;
            }
            for (auto& packet : padding_packets) {
              EnqueuePacket(std::move(packet));
            }
            // Continue loop to send the padding that was just added.
            continue;
          }
    
          // Can't fetch new packet and no padding to send, exit send loop.
          break;
        }
    
        std::unique_ptr<RtpPacketToSend> rtp_packet = packet->ReleasePacket();
        RTC_DCHECK(rtp_packet);
        packet_sender_->SendRtpPacket(std::move(rtp_packet), pacing_info);
    
        data_sent += packet->size();
        // Send succeeded, remove it from the queue.
        OnPacketSent(packet);
        if (recommended_probe_size && data_sent > *recommended_probe_size)
          break;
      }
    
      if (is_probing) {
        probing_send_failure_ = data_sent == DataSize::Zero();
        if (!probing_send_failure_) {
          prober_.ProbeSent(CurrentTime().ms(), data_sent.bytes());
        }
      }
    }
    
    • 获取流逝的时间病更新上一次处理的时间为当前时间,流逝时间不得大于2s,如果大于2s则elapsed_time为2s
    • 在drain_large_queues_支持的情况下(一次处理可以发送多个数据包?),根据时间差来计算本次发送的最小码率,如果当前的码率比实际发送的最小码率要小则通过media_budget_.set_target_rate_kbps(target_rate.kbps())设置码率
    • 如果正在进行码率探测,则获取本次码率探测得出的本次推荐发送的数(推荐发送多少数据)
    • 进入while循环通过GetPendingPacket()从RoundRobinPacketQueue中获取QueuedPacket包,然后通过packet->ReleasePacket()得到RtpPacketToSend,最后通过packet_sender_->SendRtpPacket进行发送
    • GetPendingPacket如果在网络拥塞并且码率探测其未进入探测的的情况下会返回空,并且会将从RoundRobinPacketQueue弹出的QueuedPacket重新插入到队列当中,同时跳出循环
    • 如果RoundRobinPacketQueue为空GetPendingPacket获取不到数据while循环会跳出
    • 如果成功发送后recommended_probe_size的值大于0并且实际发送值已经大于或等于recommended_probe_size也会跳出循环结束本次process
    RoundRobinPacketQueue::QueuedPacket* PacingController::GetPendingPacket(
        const PacedPacketInfo& pacing_info) {
      if (packet_queue_.Empty()) {
        return nullptr;
      }
    
      // Since we need to release the lock in order to send, we first pop the
      // element from the priority queue but keep it in storage, so that we can
      // reinsert it if send fails.
      RoundRobinPacketQueue::QueuedPacket* packet = packet_queue_.BeginPop();
      bool audio_packet = packet->type() == RtpPacketToSend::Type::kAudio;
      bool apply_pacing = !audio_packet || pace_audio_;
      if (apply_pacing && (Congested() || (media_budget_.bytes_remaining() == 0 &&
                                           pacing_info.probe_cluster_id ==
                                               PacedPacketInfo::kNotAProbe))) {
        packet_queue_.CancelPop();
        return nullptr;
      }
      return packet;
    }
    
    • packet_queue_.BeginPop()弹出QueuedPacket
    • packet_queue_.CancelPop()重新将QueuedPacket加入到队列
    • BeginPop的原理是首先通过GetHighestPriorityStream遍历stream_priorities_获取优先发送的流对应的ssrc
    • 其次通过对应的ssrc查找streams_集合得到Stream,然后通过Stream得到依次要发送的QueuedPacket

    相关文章

      网友评论

        本文标题:WebRTC PacedSender 原理分析(一)

        本文链接:https://www.haomeiwen.com/subject/zfvrohtx.html