该系列文章主要记录阅读理解ceph代码时可能遇到的一些难点,可能跳跃比较大。如果有描述错误或任何疑问欢迎交流讨论。
状态机本身机制是比较好理解的,看状态机代码时只要理解几个特殊的地方就很容易。
默认子状态
状态机的部分状态有默认子状态,从该状态到其默认子状态是不需要事件触发的。比如:
struct Primary : boost::statechart::state< Primary, Started, Peering >, NamedState
{
Primary(my_context ctx);
void exit();
typedef boost::mpl::list <
boost::statechart::custom_reaction< ActMap >,
boost::statechart::custom_reaction< MNotifyRec >,
boost::statechart::transition< NeedActingChange, WaitActingChange >
> reactions;
boost::statechart::result react(const ActMap&);
boost::statechart::result react(const MNotifyRec&);
};
这里定义了Primary状态,其父状态是Started,默认子状态是Peering。因而从Primary进入Peering状态不需要任何事件触发。
状态跳转
状态机是由事件驱动的,定义事件的响应方法有2种方式。
- 注册事件处理函数,比如上面Primary在处理ActMap事件就注册了boost::statechart::result react(const ActMap&)函数。
- 跳转到另一个状态。比如上面Primary处理NeedActingChange事件。
在事件处理中直接跳转状态是一种方式。
还有一种状态跳转方式,就是显式的调用transit。比如:
boost::statechart::result PG::RecoveryState::Reset::react(const ActMap&)
{
PG *pg = context< RecoveryMachine >().pg;
...
...
pg->update_heartbeat_peers();
pg->take_waiters();
return transit< Started >();
}
事件处理
注意事件不一定都是在当前状态下处理,如果当前状态处理不了,可以在其父状态处理,相当于子状态继承了父状态的事件处理方法。
这里还有一个小疑问,我们看到有些地方执行post_event, 这些event是什么时候由谁执行的呢? 可以想象应该是状态机在处理完当前事件之后就开始执行了。查看boost源码也确实如此。
post_event的实现:
void post_event_impl( const event_base_ptr_type & pEvent )
{
BOOST_ASSERT( get_pointer( pEvent ) != 0 );
eventQueue_.push_back( pEvent );
}
eventQueue_是在process_event中循环处理的。
void process_event( const event_base_type & evt )
{
if ( send_event( evt ) == detail::do_defer_event )
{
deferredEventQueue_.push_back( evt.intrusive_from_this() );
}
process_queued_events();
}
void process_queued_events()
{
while ( !eventQueue_.empty() )
{
event_base_ptr_type pEvent = eventQueue_.front();
eventQueue_.pop_front();
if ( send_event( *pEvent ) == detail::do_defer_event )
{
deferredEventQueue_.push_back( pEvent );
}
}
}
有了上面的基础之后,再去找各种状态是怎么变化的就非常简单了。
状态机的事务
还有一点对于理解ceph状态机也比较重要,我们经常看到状态内部执行事务,但是没看到事务在哪里提交。答案在这里:
if (!advance_pg(curmap->get_epoch(), pg, handle, &rctx, &split_pgs))
{
// we need to requeue the PG explicitly since we didn't actually
// handle an event
peering_wq.queue(pg);
}
else
{
assert(!pg->peering_queue.empty());
PG::CephPeeringEvtRef evt = pg->peering_queue.front();
pg->peering_queue.pop_front();
pg->handle_peering_event(evt, &rctx);
}
need_up_thru = pg->need_up_thru || need_up_thru;
same_interval_since = MAX(pg->info.history.same_interval_since,
same_interval_since);
pg->write_if_dirty(*rctx.transaction);
if (!split_pgs.empty())
{
rctx.on_applied->add(new C_CompleteSplits(this, split_pgs));
split_pgs.clear();
}
dispatch_context_transaction(rctx, pg, &handle);
就在最后一行。
然后,好像没了,自己对着状态机那个图研究吧 : )
网友评论