EOS源码备忘-Push Transaction机制
这里我们讨论EOS Push Transaction 的逻辑,这块EOS与Eosforce实现有一些区别,我们会着重点出。 关于wasm相关的内容我们会有一片专门的文档分析。
我们这里通常将Transaction译做交易,其实这里应该是事务的意思。
1. Transaction与Action
在EOS中Transaction与Action是最重要的几个类型, 在EOS中,所有的链上行为都是Action,Transaction是一系列Action组成的事务。
EOS中使用继承体系划分trx与action结构,关系图如下:
transaction_header <- transaction <- signed_transaction <- deferred_transaction
|
packed_transaction
1.1 Action
我们这里先看一下Action的声明:
// 权限结构structpermission_level{account_name actor; permission_name permission; }; ...structaction{account_name account; action_name name;// 执行所需的权限vector authorization; bytes data; ...// 打包成二进制templateTdata_as()const{ ... } };
Action没有什么特别的内容,但要注意:
!> 在EOS中一个transaction中包含很多个action,而在Eosforce中一个trx只能包括一个action。
1.2 Transaction
下面我们分析一下transaction,这里简写为trx。
首先看下
/**
* The transaction header contains the fixed-sized data
* associated with each transaction. It is separated from
* the transaction body to facilitate partial parsing of
* transactions without requiring dynamic memory allocation.
*
* All transactions have an expiration time after which they
* may no longer be included in the blockchain. Once a block
* with a block_header::timestamp greater than expiration is
* deemed irreversible, then a user can safely trust the transaction
* will never be included.
*
* Each region is an independent blockchain, it is included as routing
* information for inter-blockchain communication. A contract in this
* region might generate or authorize a transaction intended for a foreign
* region.
*/structtransaction_header{time_point_sec expiration;///< trx超时时间uint16_tref_block_num =0U;// 包含trx的block num 注意这个值是后2^16个块中uint32_tref_block_prefix =0UL;// blockid的低32位fc::unsigned_int max_net_usage_words =0UL;// 网络资源上限uint8_tmax_cpu_usage_ms =0;// cpu资源上限fc::unsigned_int delay_sec =0UL;/// 延迟交易的延迟时间/**
* @return the absolute block number given the relative ref_block_num
* 计算ref_block_num
*/block_num_typeget_ref_blocknum( block_num_type head_blocknum )const{return((head_blocknum/0xffff)*0xffff) + head_blocknum%0xffff; }voidset_reference_block(constblock_id_type& reference_block );boolverify_reference_block(constblock_id_type& reference_block )const;voidvalidate()const; };
transaction_header包含一个trx中固定长度的数据,这里之所以要单独提出来主要是为了优化。
transaction视为交易体数据,这里主要是存储这个trx包含的action。
/**
* A transaction consits of a set of messages which must all be applied or
* all are rejected. These messages have access to data within the given
* read and write scopes.
*/
// 在EOS中一个交易中 action要么全部执行,要么都不执行
struct transaction : public transaction_header {
vector<action> context_free_actions;
vector<action> actions;
extensions_type transaction_extensions;
// 获取trx id
transaction_id_type id()const;
digest_type sig_digest( const chain_id_type& chain_id, const vector<bytes>& cfd = vector<bytes>() )const;
...
};
注意这里的context_free_actions,这里指上下文无关的Action,具体信息可以参见这里: https://medium.com/@bytemaster/eosio-development-update-272198df22c1 和 https://github.com/EOSIO/eos/issues/1387。 如果一个Action执行时只依赖与transaction的数据,而不依赖与链上的状态,这样的action可以并发的执行。
另外一个值得注意的是trx id:
transaction_id_type transaction::id()const{ digest_type::encoder enc; fc::raw::pack( enc, *this);returnenc.result();}
!> Eosforce不同
在Eosforce中为了添加手续费信息,trx与EOS结构不同,主要是增加了fee, 在transaction中:
struct transaction : public transaction_header {
vector<action> context_free_actions;
vector<action> actions;
extensions_type transaction_extensions;
asset fee; // EOSForce 增加的手续费,在客户端push trx时需要写入
transaction_id_type id()const;
digest_type sig_digest( const chain_id_type& chain_id, const vector<bytes>& cfd = vector<bytes>() )const;
flat_set<public_key_type> get_signature_keys( const vector<signature_type>& signatures,
const chain_id_type& chain_id,
const vector<bytes>& cfd = vector<bytes>(),
bool allow_duplicate_keys = false,
bool use_cache = true )const;
uint32_t total_actions()const { return context_free_actions.size() + actions.size(); }
account_name first_authorizor()const {
for( const auto& a : actions ) {
for( const auto& u : a.authorization )
return u.actor;
}
return account_name();
}
};
在 https://eosforce.github.io/Documentation/#/zh-cn/eosforce_client_develop_guild 这篇文档里也有说明。
这里计算trx id时完全使用trx的数据,这意味着,如果是两个trx数据完全一致,特别的他们在一个区块中,那么这两个trx的id就会是一样的。
1.3 signed_transaction
一个trx签名之后会得到一个signed_transaction,
structsigned_transaction:publictransaction { ...vector signatures;// 签名vector context_free_data;// 上下文无关的action所使用的数据// 签名constsignature_type&sign(constprivate_key_type& key,constchain_id_type& chain_id);signature_typesign(constprivate_key_type& key,constchain_id_type& chain_id)const; flat_set get_signature_keys(constchain_id_type& chain_id,boolallow_duplicate_keys =false,booluse_cache =true)const; };
signed_transaction包含签名数据和上下文无关的action所使用的数据,
这里要谈一下context_free_data,可以参见 https://github.com/EOSIO/eos/commit/a41b4d56b5cbfd0346de34b0e03819f72e834041 ,之前我们看过context_free_actions, 在上下文无关的action中可以去从context_free_data获取数据,可以参见在api_tests.cpp中的测试用例:
... {// back to normal actionactionact1(pl, da); signed_transaction trx; trx.context_free_actions.push_back(act); trx.context_free_data.emplace_back(fc::raw::pack(100));// verify payload matches context free datatrx.context_free_data.emplace_back(fc::raw::pack(200)); trx.actions.push_back(act1);// attempt to access non context free apifor(uint32_ti =200; i <=211; ++i) { trx.context_free_actions.clear(); trx.context_free_data.clear(); cfa.payload = i; cfa.cfd_idx =1;actioncfa_act({}, cfa); trx.context_free_actions.emplace_back(cfa_act); trx.signatures.clear(); set_transaction_headers(trx); sigs = trx.sign(get_private_key(N(testapi),"active"), control->get_chain_id()); BOOST_CHECK_EXCEPTION(push_transaction(trx), unaccessible_api, [](constfc::exception& e) {returnexpect_assert_message(e,"only context free api's can be used in this context"); } ); }...
这里可以作为context_free_action的一个例子,在test_api.cpp中的合约会调用void test_action::test_cf_action()函数:
// 这个是测试`context_free_action`的action
void test_action::test_cf_action() {
eosio::action act = eosio::get_action( 0, 0 );
cf_action cfa = act.data_as<cf_action>();
if ( cfa.payload == 100 ) {
// verify read of get_context_free_data, also verifies system api access
// 测试在合约中通过 get_context_free_data 获取 context_free_data
int size = get_context_free_data( cfa.cfd_idx, nullptr, 0 );
eosio_assert( size > 0, "size determination failed" );
eosio::bytes cfd( static_cast<size_t>(size) );
size = get_context_free_data( cfa.cfd_idx, &cfd[0], static_cast<size_t>(size) );
eosio_assert(static_cast<size_t>(size) == cfd.size(), "get_context_free_data failed" );
uint32_t v = eosio::unpack<uint32_t>( &cfd[0], cfd.size() );
eosio_assert( v == cfa.payload, "invalid value" );
// 以下是测试一些功能
// verify crypto api access
checksum256 hash;
char test[] = "test";
...
// verify context_free_system_api
eosio_assert( true, "verify eosio_assert can be called" );
// 下面是测试一些在上下文无关action中不能使用的功能
} else if ( cfa.payload == 200 ) {
// attempt to access non context free api, privileged_api
is_privileged(act.name);
eosio_assert( false, "privileged_api should not be allowed" );
} else if ( cfa.payload == 201 ) {
// attempt to access non context free api, producer_api
get_active_producers( nullptr, 0 );
eosio_assert( false, "producer_api should not be allowed" );
...
} else if ( cfa.payload == 211 ) {
send_deferred( N(testapi), N(testapi), "hello", 6 );
eosio_assert( false, "transaction_api should not be allowed" );
}
}
接下来我们来看一看packed_transaction,通过这个类我们可以将trx打包,这样可以最大的节省空间,关于它的功能,会在下面使用的提到。
2. Transaction的接收和转发流程
了解Transaction类定义之后,我们先来看一下trx在EOS系统中的接收和转发流程,确定发起trx的入口, 在EOS中,大部分trx都是由用户所操纵的客户端发向同步节点,再通过同步网络发送给超级节点,超级节点会把trx打包进块,这里我们梳理一下这里的逻辑,
首先,关于客户端提交trx的流程,可以参见 https://eosforce.github.io/Documentation/#/zh-cn/eosforce_client_develop_guild , 我们这里从node的角度看是怎么处理收到的trx的。
对于一个节点,trx可能是其他节点同步过来的,也可能是客户端通过api请求的,我们先看看api:
EOS中通过http_plugin插件响应http请求,这里我们只看处理逻辑,在chain_api_plugin.cpp中注册的这两个:
voidchain_api_plugin::plugin_startup() { ilog("starting chain_api_plugin"); my.reset(newchain_api_plugin_impl(app().get_plugin().chain()));autoro_api = app().get_plugin().get_read_only_api();autorw_api = app().get_plugin().get_read_write_api(); app().get_plugin().add_api({ ... CHAIN_RW_CALL_ASYNC(push_transaction, chain_apis::read_write::push_transaction_results,202), CHAIN_RW_CALL_ASYNC(push_transactions, chain_apis::read_write::push_transactions_results,202) });}
最终实际调用的是这里:
// 调用流程 push_transactions -> push_recurse -> push_transactionvoidread_write::push_transaction(constread_write::push_transaction_params& params, next_function next) {try{autopretty_input =std::make_shared();autoresolver = make_resolver(this, abi_serializer_max_time);try{// 这里在使用 packed_transaction 解包abi_serializer::from_variant(params, *pretty_input, resolver, abi_serializer_max_time); } EOS_RETHROW_EXCEPTIONS(chain::packed_transaction_type_exception,"Invalid packed transaction")// 这里调用 incoming::methods::transaction_async 函数app().get_method()(pretty_input,true, [this, next](constfc::static_variant& result) ->void{ ...// 返回返回值, 略去}); }catch( boost::interprocess::bad_alloc& ) { raise(SIGUSR1); } CATCH_AND_CALL(next);}
注意这里的 persist_until_expired 参数,我们在 EOS源码备忘-Block Produce机制 这篇文档中分析过。 incoming::methods::transaction_async注册的是on_incoming_transaction_async函数:
my->_incoming_transaction_async_provider = app().get_method().register_provider([this](constpacked_transaction_ptr& trx,boolpersist_until_expired, next_function next) ->void{returnmy->on_incoming_transaction_async(trx, persist_until_expired, next ); });
on_incoming_transaction_async如下:
voidon_incoming_transaction_async(constpacked_transaction_ptr& trx,boolpersist_until_expired, next_function next){ chain::controller& chain = app().get_plugin().chain();if(!chain.pending_block_state()) { _pending_incoming_transactions.emplace_back(trx, persist_until_expired, next);return; }autoblock_time = chain.pending_block_state()->header.timestamp.to_time_point();// 返回结果的回调autosend_response = [this, &trx, &next](constfc::static_variant& response) { next(response);if(response.contains()) { _transaction_ack_channel.publish(std::pair(response.get(), trx)); }else{ _transaction_ack_channel.publish(std::pair(nullptr, trx)); } };autoid = trx->id();// 超时时间检查if( fc::time_point(trx->expiration()) < block_time ) { send_response(std::static_pointer_cast(std::make_shared(FC_LOG_MESSAGE(error,"expired transaction ${id}", ("id", id)) )));return; }// 检查是否是已处理过的trxif( chain.is_known_unexpired_transaction(id) ) { send_response(std::static_pointer_cast(std::make_shared(FC_LOG_MESSAGE(error,"duplicate transaction ${id}", ("id", id)) )));return; }// 看看是否超过最大的执行时间了autodeadline = fc::time_point::now() + fc::milliseconds(_max_transaction_time_ms);booldeadline_is_subjective =false;if(_max_transaction_time_ms <0|| (_pending_block_mode == pending_block_mode::producing && block_time < deadline) ) { deadline_is_subjective =true; deadline = block_time; }try{// 这里直接调用`push_transaction`来执行trxautotrace = chain.push_transaction(std::make_shared(*trx), deadline);if(trace->except) {if(failure_is_subjective(*trace->except, deadline_is_subjective)) { _pending_incoming_transactions.emplace_back(trx, persist_until_expired, next); }else{autoe_ptr = trace->except->dynamic_copy_exception(); send_response(e_ptr); } }else{if(persist_until_expired) {// if this trx didnt fail/soft-fail and the persist flag is set, store its ID so that we can// ensure its applied to all future speculative blocks as well._persistent_transactions.insert(transaction_id_with_expiry{trx->id(), trx->expiration()}); } send_response(trace); } }catch(constguard_exception& e ) { app().get_plugin().handle_guard_exception(e); }catch( boost::interprocess::bad_alloc& ) { raise(SIGUSR1); } CATCH_AND_CALL(send_response); }
注意上面的is_known_unexpired_transaction,代码如下:
boolcontroller::is_known_unexpired_transaction(consttransaction_id_type& id)const{returndb().find(id);}
与之对应的是这个函数:
voidtransaction_context::record_transaction(consttransaction_id_type& id, fc::time_point_sec expire ) {try{ control.db().create([&](transaction_object& transaction) { transaction.trx_id = id; transaction.expiration = expire; }); }catch(constboost::interprocess::bad_alloc& ) {throw; }catch( ... ) { EOS_ASSERT(false, tx_duplicate,"duplicate transaction ${id}", ("id", id ) ); } }/// record_transaction
在push_transaction中会调用到,记录trx已经被处理过了。
下面我们来看看send_response这个回调:
autosend_response = [this, &trx, &next](constfc::static_variant& response) { next(response);if(response.contains()) { _transaction_ack_channel.publish(std::pair(response.get(), trx)); }else{ _transaction_ack_channel.publish(std::pair(nullptr, trx)); } };
在执行之后会调用send_response,这里是将结果发送到_transaction_ack_channel中,对于_transaction_ack_channel, 这个实际对应的是下面这个类型:
namespacecompat {namespacechannels {usingtransaction_ack = channel_decl>; } }
在EOS中在net_plugin注册响应这个channel的函数:
my->incoming_transaction_ack_subscription =
app().get_channel<channels::transaction_ack>().subscribe(
boost::bind(&net_plugin_impl::transaction_ack, my.get(), _1));
处理的函数如下:
voidnet_plugin_impl::transaction_ack(conststd::pair& results) { transaction_id_type id = results.second->id();if(results.first) { fc_ilog(logger,"signaled NACK, trx-id = ${id} : ${why}",("id", id)("why", results.first->to_detail_string())); dispatcher->rejected_transaction(id); }else{ fc_ilog(logger,"signaled ACK, trx-id = ${id}",("id", id)); dispatcher->bcast_transaction(*results.second); } }
这里会将运行正常的广播给其他节点,这其中会发送给超级节点打包入块,打包过程可以参见 https://eosforce.github.io/Documentation/#/zh-cn/code/block_produce 。
网友评论