美文网首页
百度 Apollo 8.0 Cyber 源代码分析(四)

百度 Apollo 8.0 Cyber 源代码分析(四)

作者: RonZheng2010 | 来源:发表于2024-11-25 09:23 被阅读0次

4 cyber Reader接收数据

cyber收发数据有几种类型:进程内,进程间,跨主机。进程间的收发基于共享内存,跨主机的收发基于fastrtps网路库。这里以进程间的场景为例说明。

ShmReceiver基于共享内存实现Reciever接口,它委托ShmDispatcher从共享内存读取数据。

DataDispatcher/DataVisitor及相关类,实现一写多读的生产/消费模式。ShmReceiver读取的数据,会复制到DataDispatcher的环形缓存,这个缓存由DataVisitor提供。(ReceiverManager负责将DataDispatcher连接到receiver)。

每个Reader实例创建一个DataVisitor实例,并向DataDispatcher注册它。DataVisitor的环形缓存是在构造函数创建的。

Reader通过指定的回调函数接收消息。Reader通过RoutineFactory创建了一个协程,从DataVisitor读取消息,然后调用这个回调函数。这样Reader就收到消息了。

4.1 ConditionNotifier与MulticastNotifier

NotifierBase用于一组数据的生产/消费通知,这里一个数据单元称作block。也
就是,当写入一个block完成时,writer会通知一个或一组reader。

  • 虚拟成员函数 Notify()用于通知,Listen()用于得到通知。

作为NotfiBase的派生类,ConditionNotifier和MutlicastNotifier的区别是,前者通知一个消费者,后者通知所有消费者。

ConditionNotifier基于共享内存实现。

  • 成员key_ 是基于”/apollo/cyber/transport/shm/notifier”的hash值,用于创建共享内存
  • 成员managed_shm_指向创建的共享内存,其中存放一组Indicator实例,也就是成员indicator_。Indicator保存block信息,如host_id_、channel_id、block_index_等。

MulticastNotifier基于udp组播实现。
+notifier_fd_是socket fd,使用组播地址239.255.0.100/8888。

4.2 Segment

基于共享内存的通信使用Segment传输数据,每个channel一个Segment实例。

  • 成员channel_id_是channel id
  • 成员managed_shm_ 是共享内存块。成员state_ 和blocks_在managed_shm_分配,分别保存meta信息和block数据。每个block一块数据。
  • 如果共享内存块还没创建,OpenOrCreate()创建它,如果已经创建了,OpenOnly()打开它。
  • 成员函数AcquireBlockToWrite()/ReleaseWrittenBlocks()用于写block,AcquireBlockToRead()/ReleaseReadBlock()用于读block。

XsiSegment和PosixSegment的区别是创建共享内存块managed_shm_ 的方式不同。

SegmentFactory根据配置文件的选项创建XsiSegment/PosixSegment实例。

4.3 Dispatcher / ListenserHandler

ListenerHandlerBase定义了消息通知接口,ListenerHandler基于Signal实现这个接口。

  • 成员signal_ 是Signal实例。成员函数Connect()将指定回调函数连接到signal_ 上,以便得到通知。
  • 成员signal_conns_ 是从某种类型id到Signal实例的映射。
  • 成员函数Run()使用给定的消息,调用signal_中的回调函数。

Dispatcher从传输层接收消息,并向上层派发。

  • 成员msg_listeners_ 是从channel_id到ListenserHandleBase(实际上就是ListenerHandler)实例的映射。
  • 成员函数AdListener()/RemoveListner()注册指定的回调函数。它在msg_listeners_查找MessageListener实例,如果没有就增加一个新的;将回调函数连接MessageListener的成员signal_上。这样当MessageListener::Run()放入消息时,回调函数会被调用。

4.4 ShmDispatcher

ShmDispatcher基于共享内存实现Dispatcher接口。

  • 成员host_id_是本地主机id。
  • 成员segments_ 是从channel_id到Segement实例的映射。从segments_获取消息数据,一个channel有一个Segment。成员函数AddSegment()向segments增加Segment实例。
  • 当某个channel的Segment有消息到达时,可以从成员notifier_得到通知。notifier_负责多个channel。
  • 注意ShmDispatcher有自己版本的AddListener(),使用模板参数Message。其中定义定义了一个将Message适配到ReadableBlock的函数。后面这个函数使用模板参数ReadableBlock调用Dispatcher::AddListener()。
  • 成员thread_是接收消息的线程。它等待成员notifier_ 的通知;根据得到的channel调用ReadMessage()。
  • 在ReadMessage()中,从相应channel的Segment获取ReadableBlock类型数据,从数据中反序列化,得到MessageInfo信息(没有数据部分),调用OnMessage()。
  • 在OnMessage()中,从msg_listeners_中找到相应channel_id的ListenerHandler实例,调用ListenerHandler::Run();从msg_listeners_找到对应channel的ListenerHandler实例,然后调用功能ListenerHandler::Run()。这样之前ShmDispatcher::AddListener()中定义的适配函数会被调用。
  • 这个适配函数从ReadableBlock解析出Message,然后调用使用者的回调函数。

4.5 Receiver

Receiver负责接收消息。

  • 成员函数OnNewMessage()在消息到达被调用,然后它会调用成员msg_listener_。后者由使用者通过Receiver构造函数指定。
  • 虚拟成员函数Enable()开始接收消息,Disable()停止接收消息。派生类一般在这里注册/反注册OnNewMessage()。

4.6 ShmReceiver

ShmReceiver基于共享内存实现Receiver接口,它将主要工作委托给ShmDispatcher。

  • 成员dispatcher_ 是ShmDispatcher实例。
  • ShmReceiver定义了自己版本的Enable()/Disable(),以便把Receiver::OnNewMessage()加入到dispatcher_的监听者队列中。当dispatcher_收到消息时,OnNewMessage()会调用使用者的回调函数。这个回调函数在ShmReceiver/Receiver的构造函数中指定。

4.7 HybridReceiver

HybridReceiver是混合型Receiver实例,它实际上是将工作委托给自己的成员去处理,这些成员其他基本类型的Receiver,如ShmReceiver。

4.8 Transport

Transport负责根据指定的Mode创建Receiver/Transmiter实例。

4.9 cyber如何使用DataVisitor向多个Reader推送消息

DataVisitor在cyber中的使用场景如下:

  • 在全局函数CreateRoutineFactory()中,创建RoutineFactory的实例。它的成员create_routine是函数指针,成员data_visitor_是DataVisitor实例。如前面介绍DataVisitor时所述,它的成员notifier_是一个回调函数。
  • 用这个RoutineFactory实例作为参数,调用Scheduler::CreateTask()。
    • 创建CRoutine实例,协程处理函数设置为RoutineFactory::create_routine,
    • 定义一个函数,将它设置为RoutineFactory.data_visitor_.notifier_。
  • 当消息到达时,DataDispatcher::dispatch()被调用,消息保存到成员buffer_中;同时成员notifier_被调用,这个回调函数会通知Scheduler更新当前协程的等待状态,进入可调度状态;如果协程的处理线程处于等待状态,还会唤醒它。

  • 回到RoutineFactory::create_routine的实现。这个函数在一个for()循环中,

    • 调用DataVisitor::TryFetch(),尝试从成员buffer_ 获取消息。如果有消息,则调用用户提供的回调函数;如果没有,则调用CRoutine::Yield(),协程放弃剩余的时间片,进入等待。
  • 对于Reader,它在Reader::Init()中定义了一个回调函数。这样Reader实例就得到了消息。

4.10 ReceiverManager

ReceieverManager基于Transport创建Receiver实例。这里定义了一个函数绑定到Receiver实例上。当消息到达时,调用DataDispatcher::Dispatch()推送它,保存到DataVisitor实例中。

5 cyber Writer发送数据

cyber收发数据有几种类型:进程内,进程间,跨主机。进程间的收发基于共享内存,跨主机的收发基于fastrtps网路库。这里以进程间的场景为例说明。

5.1 Transmitter

Transmitter负责发送消息。

  • 成员函数Transmit()发送消息。
  • 虚拟成员函数Enable()使能发送消息,Disable()停止发送消息。

5.2 ShmTrasmitter

ShmTransmitter基于共享内存实现Transmitter接口。

  • 成员channel_id_ 是channel id
  • 成员segment_ 是Segment实例,数据写入这里。
  • 成员notifer_是Notifier实例,用于通知对端数据准备好了。

5.3 HybridTransmitter

HybridTransmitter是混合型Transmitter实例,它实际上是将工作委托给自己的成员去处理,这些成员其他基本类型的Transmitter,如ShmTransmitter。

6 cyber 跨主机收发数据的场景

前面以进程间的场景说明了cyber如何收发数据,这里说明跨主机收发数据的场景。

RtpsDispatcher派生自Dispatcher,负责从fastrtps接收指定channel的消息,然后派发给上层的DataDispatcher/DataVisitor实例,进而被Reader接收。

  • 成员participant是Participant实例。
  • 成员 subs_是一个从channel id到Subcriber实例的映射。Subriber的成员包括fastrtps_Subscriber和SubListener实例,用于监听fastrtps上指定channel。

RtpsTransmitter派生自Transmitter,负责通过fastrtps发送指定channel的消息。消息发送之前,会先序列化成字符串。

  • 成员participant是Participant实例。
  • 成员publisher_ 是fastrtps_Publisher实例。

7 cyber 收发消息的底层消息

进程间通信时,ShmTransmitter/ShmDispatcher传输的消息单元是ReadableBlock/WritableBlock;跨主机通信时,RtpsTranmistter/RtpsDispatcher传输的消息单元是UnderlayMessage。

MessageInfo中保存发送端的Id,消息序列号等。传输时需要它来标记消息。

8 cyber跨主机收发数据的qos

cyber可以在配置文件cyber/conf/cyber.conf中,给跨主机通信设置质量保证选项

这些选项读入后保存在RoleAttributes的成员qos_profle中,这是一个QosProfile实例。

当创建fastrtps_Subscriber/fastrtps_Publisher时,这些选项会作为创建的参数传入。

相关链接

百度 Apollo 8.0 Cyber 源代码分析(一)
百度 Apollo 8.0 Cyber 源代码分析(二)
百度 Apollo 8.0 Cyber 源代码分析(三)
百度 Apollo 8.0 Cyber 源代码分析(四)
百度 Apollo 8.0 Cyber 源代码分析(五)

相关文章

网友评论

      本文标题:百度 Apollo 8.0 Cyber 源代码分析(四)

      本文链接:https://www.haomeiwen.com/subject/tspzjjtx.html