// Plugin start-up entry point. The only statement visible in this excerpt hands
// control to producer_plugin_impl::schedule_production_loop() through `my`.
// NOTE(review): the leading "........." marks code the article elided, so any other
// start-up work done here cannot be seen from this excerpt.
void producer_plugin::plugin_startup() {
......... my->schedule_production_loop(); }
// Calls start_block() and, depending on its result and on _pending_block_mode,
// arms _timer for the next step of the production loop (retry, produce at the
// deadline, produce immediately, or wake up for the next local producer slot).
// NOTE(review): this excerpt is truncated by the article — closing braces, numeric
// literals and the bodies of several timer callbacks are missing — so it does not
// compile as shown.
void producer_plugin_impl::schedule_production_loop() {
chain::controller& chain = app().get_plugin<chain_plugin>().chain(); // (translated) cancel all previous timers, i.e. cancel every pending asynchronous wait — NOTE(review): the statement this note refers to is not visible in this excerpt
std::weak_ptr<producer_plugin_impl> weak_this = shared_from_this(); bool last_block; // (translated) refresh all of this node's scheduling information
auto result = start_block(last_block);
// English rendering of the article's note below (the note itself is kept verbatim
// because its comment opener is never closed in this excerpt):
//   1. If fetching the scheduling information fails, fetch it again and reschedule.
//   2. If another node is currently producing, wait.
//   3. If it is this node's turn, produce the block.
//   4. Compute when the next producer is due, then schedule again.
/* 1. 获取各种调度信息异常,则重新获取数据进行调度;
2. 其它节点正在出块,则进行等待
3. 轮到本节点出块,则进行出块操作;
4. 计算下一个生产者出块的时间,然后再进行系统调度
// Failure branch: log the failure and re-arm the timer to try again later.
// NOTE(review): the divisor after `config::block_interval_us /` is missing here.
if (result == start_block_result::failed) {
elog("Failed to start a pending block, will try again later");
_timer.expires_from_now( boost::posix_time::microseconds( config::block_interval_us / )); // we failed to start a block, so try again later?
_timer.async_wait([weak_this,cid=++_timer_corelation_id](const boost::system::error_code& ec) {
auto self = weak_this.lock();
// The callback only acts when the owner is still alive, the wait was not aborted,
// and no newer timer has been armed since (cid still matches the member counter).
if (self && ec != boost::asio::error::operation_aborted && cid == self->_timer_corelation_id) {
} else if (result == start_block_result::waiting) {
// nothing to do until more blocks arrive } else if (_pending_block_mode == pending_block_mode::producing) { // we succeeded but block may be exhausted
if (result == start_block_result::succeeded) {
// ship this block off no later than its deadline
// NOTE(review): the date arguments of the epoch constant are missing in this excerpt.
static const boost::posix_time::ptime epoch(boost::gregorian::date(, , ));
_timer.expires_at(epoch + boost::posix_time::microseconds(chain.pending_block_time().time_since_epoch().count() + (last_block ? _last_block_time_offset_us : _produce_time_offset_us)));
fc_dlog(_log, "Scheduling Block Production on Normal Block #${num} for ${time}", ("num", chain.pending_block_state()->block_num)("time",chain.pending_block_time()));
} else {
// ship this block off immediately
_timer.expires_from_now( boost::posix_time::microseconds( ));
fc_dlog(_log, "Scheduling Block Production on Exhausted Block #${num} immediately", ("num", chain.pending_block_state()->block_num));
} _timer.async_wait([&chain,weak_this,cid=++_timer_corelation_id](const boost::system::error_code& ec) {
auto self = weak_this.lock();
if (self && ec != boost::asio::error::operation_aborted && cid == self->_timer_corelation_id) {
auto res = self->maybe_produce_block();
fc_dlog(_log, "Producing Block #${num} returned: ${res}", ("num", chain.pending_block_state()->block_num)("res", res) );
} else if (_pending_block_mode == pending_block_mode::speculating && !_producers.empty() && !production_disabled_by_policy()){
// if we have any producers then we should at least set a timer for our next available slot
optional<fc::time_point> wake_up_time;
// Keep the earliest wake-up time over all local producers that have a next block time.
for (const auto&p: _producers) {
auto next_producer_block_time = calculate_next_block_time(p);
if (next_producer_block_time) {
auto producer_wake_up_time = *next_producer_block_time - fc::microseconds(config::block_interval_us);
if (wake_up_time) {
// wake up with a full block interval to the deadline
wake_up_time = std::min<fc::time_point>(*wake_up_time, producer_wake_up_time);
} else {
wake_up_time = producer_wake_up_time;
} if (wake_up_time) {
fc_dlog(_log, "Specualtive Block Created; Scheduling Speculative/Production Change at ${time}", ("time", wake_up_time));
static const boost::posix_time::ptime epoch(boost::gregorian::date(, , ));
_timer.expires_at(epoch + boost::posix_time::microseconds(wake_up_time->time_since_epoch().count()));
_timer.async_wait([weak_this,cid=++_timer_corelation_id](const boost::system::error_code& ec) {
auto self = weak_this.lock();
if (self && ec != boost::asio::error::operation_aborted && cid == self->_timer_corelation_id) {
} else {
fc_dlog(_log, "Speculative Block Created; Not Scheduling Speculative/Production, no local producers had valid wake up times");
} else {
fc_dlog(_log, "Speculative Block Created");
出块的原则是:21个超级节点轮流出块,每个节点连续出块12个,一个块是0.5s,所以在不出现异常的情况下,一个流程为:21*12*0.5=126s。在这个流程中,如果有其它节点被投票选举成为超级节点,是不会马上出块的,必须等到一个流程结束后才会把第21个超级节点挤下去进行出块(注:投票是实时的,只不过计数是缓存在database里面的,超级节点出块的整个流程完成后会重新从database里面读取数据,确定由哪些节点出块,开始下一个出块流程)。还有就是,如果在这个流程中有一个节点由于某种原因导致出块中断,那么在本次该节点出块的时间段内是没有办法出块的,这里便会形成一个没有块产生的时间段。
// Starts a new pending block for the next block slot and decides whether this node
// is producing it or only speculating.
//
// last_block: out-parameter; set from the slot index of the chosen block time
//             relative to config::producer_repetitions.
// Returns one of the start_block_result values visible below: waiting, failed,
// exhausted or succeeded.
//
// NOTE(review): this excerpt is truncated by the article — numeric literals, closing
// braces and several loop/branch bodies are missing — so it does not compile as shown.
producer_plugin_impl::start_block_result producer_plugin_impl::start_block(bool &last_block) {
chain::controller& chain = app().get_plugin<chain_plugin>().chain(); // (translated) fetch the block state of this node's highest (head) block
const auto& hbs = chain.head_block_state(); //Schedule for the next second's tick regardless of chain state
// If we would wait less than 50ms (1/10 of block_interval), wait for the whole block interval.
fc::time_point now = fc::time_point::now();
fc::time_point base = std::max<fc::time_point>(now, chain.head_block_time());
// Distance from `base` to the next multiple of the block interval.
int64_t min_time_to_next_block = (config::block_interval_us) - (base.time_since_epoch().count() % (config::block_interval_us) );
fc::time_point block_time = base + fc::microseconds(min_time_to_next_block); if((block_time - now) < fc::microseconds(config::block_interval_us/) ) { // we must sleep for at least 50ms
// ilog("Less than ${t}us to next block time, time_to_next_block_time ${bt}",
// ("t", config::block_interval_us/10)("bt", block_time));
block_time += fc::microseconds(config::block_interval_us);
} _pending_block_mode = pending_block_mode::producing; // Not our turn
last_block = ((block_timestamp_type(block_time).slot % config::producer_repetitions) == config::producer_repetitions - ); // (translated) work out which producer is scheduled to produce the next block
const auto& scheduled_producer = hbs->get_scheduled_producer(block_time);
auto currrent_watermark_itr = _producer_watermarks.find(scheduled_producer.producer_name);
auto signature_provider_itr = _signature_providers.find(scheduled_producer.block_signing_key);
auto irreversible_block_age = get_irreversible_block_age(); // If the next block production opportunity is in the present or future, we're synced. // (translated) the next block time computed by this node must be later than the time of the newest block this node has received; i.e. if this node produces a block, its timestamp is necessarily later than that of the current latest block
// Each branch below is a reason to fall back from producing to speculating.
if( !_production_enabled ) {
_pending_block_mode = pending_block_mode::speculating;
} else if( _producers.find(scheduled_producer.producer_name) == _producers.end()) {
_pending_block_mode = pending_block_mode::speculating;
} else if (signature_provider_itr == _signature_providers.end()) {
elog("Not producing block because I don't have the private key for ${scheduled_key}", ("scheduled_key", scheduled_producer.block_signing_key));
_pending_block_mode = pending_block_mode::speculating;
} else if ( _pause_production ) {
elog("Not producing block because production is explicitly paused");
_pending_block_mode = pending_block_mode::speculating;
} else if ( _max_irreversible_block_age_us.count() >= && irreversible_block_age >= _max_irreversible_block_age_us ) {
elog("Not producing block because the irreversible block is too old [age:${age}s, max:${max}s]", ("age", irreversible_block_age.count() / '')( "max", _max_irreversible_block_age_us.count() / '' ));
_pending_block_mode = pending_block_mode::speculating;
} if (_pending_block_mode == pending_block_mode::producing) {
// determine if our watermark excludes us from producing at this point
if (currrent_watermark_itr != _producer_watermarks.end()) {
if (currrent_watermark_itr->second >= hbs->block_num + ) {
elog("Not producing block because \"${producer}\" signed a BFT confirmation OR block at a higher block number (${watermark}) than the current fork's head (${head_block_num})",
("producer", scheduled_producer.producer_name)
("watermark", currrent_watermark_itr->second)
("head_block_num", hbs->block_num));
_pending_block_mode = pending_block_mode::speculating;
} if (_pending_block_mode == pending_block_mode::speculating) {
// When only speculating, a head block older than the (elided) threshold means wait.
auto head_block_age = now - chain.head_block_time();
if (head_block_age > fc::seconds())
return start_block_result::waiting;
} try {
uint16_t blocks_to_confirm = ; if (_pending_block_mode == pending_block_mode::producing) {
// determine how many blocks this producer can confirm
// 1) if it is not a producer from this node, assume no confirmations (we will discard this block anyway)
// 2) if it is a producer on this node that has never produced, the conservative approach is to assume no
// confirmations to make sure we don't double sign after a crash TODO: make these watermarks durable?
// 3) if it is a producer on this node where this node knows the last block it produced, safely set it -UNLESS-
// 4) the producer on this node's last watermark is higher (meaning on a different fork)
if (currrent_watermark_itr != _producer_watermarks.end()) {
auto watermark = currrent_watermark_itr->second;
if (watermark < hbs->block_num) {
blocks_to_confirm = std::min<uint16_t>(std::numeric_limits<uint16_t>::max(), (uint16_t)(hbs->block_num - watermark));
} chain.abort_block();
chain.start_block(block_time, blocks_to_confirm);
} FC_LOG_AND_DROP(); const auto& pbs = chain.pending_block_state();
if (pbs) { if (_pending_block_mode == pending_block_mode::producing && pbs->block_signing_key != scheduled_producer.block_signing_key) {
elog("Block Signing Key is not expected value, reverting to speculative mode! [expected: \"${expected}\", actual: \"${actual\"", ("expected", scheduled_producer.block_signing_key)("actual", pbs->block_signing_key));
_pending_block_mode = pending_block_mode::speculating;
} // attempt to play persisted transactions first
bool exhausted = false;
auto unapplied_trxs = chain.get_unapplied_transactions(); // remove all persisted transactions that have now expired
auto& persisted_by_id = _persistent_transactions.get<by_id>();
auto& persisted_by_expiry = _persistent_transactions.get<by_expiry>();
// NOTE(review): the body of this expiry loop is missing in the excerpt.
while(!persisted_by_expiry.empty() && persisted_by_expiry.begin()->expiry <= pbs->header.timestamp.to_time_point()) {
} try {
for (auto itr = unapplied_trxs.begin(); itr != unapplied_trxs.end(); ++itr) {
const auto& trx = *itr;
if (persisted_by_id.find(trx->id) != persisted_by_id.end()) {
// this is a persisted transaction, push it into the block (even if we are speculating) with
// no deadline as it has already passed the subjective deadlines once and we want to represent
// the state of the chain including this transaction
try {
chain.push_transaction(trx, fc::time_point::maximum());
} catch ( const guard_exception& e ) {
return start_block_result::failed;
} FC_LOG_AND_DROP(); // remove it from further consideration as it is applied
*itr = nullptr;
} if (_pending_block_mode == pending_block_mode::producing) {
// Second pass over the unapplied transactions, only when producing.
for (const auto& trx : unapplied_trxs) {
if (exhausted) {
} if (!trx) {
// nulled in the loop above, skip it
} if (trx->packed_trx.expiration() < pbs->header.timestamp.to_time_point()) {
// expired, drop it
} try {
auto deadline = fc::time_point::now() + fc::milliseconds(_max_transaction_time_ms);
bool deadline_is_subjective = false;
// The deadline is replaced by block_time (and flagged subjective) when this condition holds.
if (_max_transaction_time_ms < || (_pending_block_mode == pending_block_mode::producing && block_time < deadline)) {
deadline_is_subjective = true;
deadline = block_time;
} auto trace = chain.push_transaction(trx, deadline);
if (trace->except) {
if (failure_is_subjective(*trace->except, deadline_is_subjective)) {
exhausted = true;
} else {
// this failed our configured maximum transaction time, we don't want to replay it
} catch ( const guard_exception& e ) {
return start_block_result::failed;
} auto& blacklist_by_id = _blacklisted_transactions.get<by_id>();
auto& blacklist_by_expiry = _blacklisted_transactions.get<by_expiry>();
auto now = fc::time_point::now();
// NOTE(review): the body of this blacklist-expiry loop is missing in the excerpt.
while (!blacklist_by_expiry.empty() && blacklist_by_expiry.begin()->expiry <= now) {
} auto scheduled_trxs = chain.get_scheduled_transactions();
// Scheduled transactions get the same deadline handling as the unapplied ones above.
for (const auto& trx : scheduled_trxs) {
if (exhausted) {
} if (blacklist_by_id.find(trx) != blacklist_by_id.end()) {
} try {
auto deadline = fc::time_point::now() + fc::milliseconds(_max_transaction_time_ms);
bool deadline_is_subjective = false;
if (_max_transaction_time_ms < || (_pending_block_mode == pending_block_mode::producing && block_time < deadline)) {
deadline_is_subjective = true;
deadline = block_time;
} auto trace = chain.push_scheduled_transaction(trx, deadline);
if (trace->except) {
if (failure_is_subjective(*trace->except, deadline_is_subjective)) {
exhausted = true;
} else {
auto expiration = fc::time_point::now() + fc::seconds(chain.get_global_properties().configuration.deferred_trx_expiration_window);
// this failed our configured maximum transaction time, we don't want to replay it add it to a blacklist
_blacklisted_transactions.insert(transaction_id_with_expiry{trx, expiration});
} catch ( const guard_exception& e ) {
return start_block_result::failed;
} if (exhausted) {
return start_block_result::exhausted;
} else {
// attempt to apply any pending incoming transactions
if (!_pending_incoming_transactions.empty()) {
// Move the queue out first, then replay each entry through on_incoming_transaction_async.
auto old_pending = std::move(_pending_incoming_transactions);
for (auto& e: old_pending) {
on_incoming_transaction_async(std::get<>(e), std::get<>(e), std::get<>(e));
} return start_block_result::succeeded;
} } catch ( boost::interprocess::bad_alloc& ) {
return start_block_result::failed;
} } return start_block_result::failed;
void net_plugin_impl::handle_message( connection_ptr c, const signed_block &msg)
这一段函数在《EOS多节点同步代码分析》一文中有解析过,这里不再做分析。通过这里的调用和验证后,会来到producer_plugin.cpp文件中的on_incoming_block这个函数,下面才是真正的区块数据处理:
// Handles a block received from the network.
// Visible steps: assert the block timestamp is not too far in the future, return early
// if the chain already knows the block id, abort the pending block, publish the block
// on the rejected_block channel when an fc::exception was caught, possibly re-enable
// production, and log a summary line.
// NOTE(review): the bodies of the scoped-exit lambda, the `try` block and two of the
// catch handlers are missing in this excerpt, as are several numeric literals.
void on_incoming_block(const signed_block_ptr& block) {
fc_dlog(_log, "received incoming block ${id}", ("id", block->id())); EOS_ASSERT( block->timestamp < (fc::time_point::now() + fc::seconds()), block_from_the_future, "received a block from the future, ignoring it" ); chain::controller& chain = app().get_plugin<chain_plugin>().chain(); /* de-dupe here... no point in aborting block if we already know the block */
auto id = block->id();
auto existing = chain.fetch_block_by_id( id );
if( existing ) { return; } // abort the pending block
chain.abort_block(); // exceptions throw out, make sure we restart our loop
auto ensure = fc::make_scoped_exit([this](){
}); // push the new block
bool except = false;
try {
} catch ( const guard_exception& e ) {
} catch( const fc::exception& e ) {
except = true;
} catch ( boost::interprocess::bad_alloc& ) {
} if( except ) {
app().get_channel<channels::rejected_block>().publish( block );
// Production is enabled once the slot after the head block is not in the past.
if( chain.head_block_state()->header.timestamp.next().to_time_point() >= fc::time_point::now() ) {
_production_enabled = true;
} if( fc::time_point::now() - block->timestamp < fc::minutes() || (block->block_num() % == ) ) {
ilog("Received block ${id}... #${n} @ ${t} signed by ${p} [trxs: ${count}, lib: ${lib}, conf: ${confs}, latency: ${latency} ms]",
("count",block->transactions.size())("lib",chain.last_irreversible_block_num())("confs", block->confirmed)("latency", (fc::time_point::now() - block->timestamp).count()/ ) );
// Accepts a completed block `b` with status `s`.
// Requires that no pending block exists, that `b` is non-null and that `s` is not
// `incomplete`. Adds the block to fork_db, emits accepted_block_header (and
// irreversible_block when `s` is irreversible), then calls maybe_switch_forks unless
// read_mode is IRREVERSIBLE.
// NOTE(review): the closing braces and the end of the `try` block are missing in this excerpt.
void push_block( const signed_block_ptr& b, controller::block_status s ) {
// idump((fc::json::to_pretty_string(*b)));
EOS_ASSERT(!pending, block_validate_exception, "it is not valid to push a block when there is a pending block");
try {
EOS_ASSERT( b, block_validate_exception, "trying to push empty block" );
EOS_ASSERT( s != controller::block_status::incomplete, block_validate_exception, "invalid block status for a completed block" );
emit( self.pre_accepted_block, b );
// `trust` is passed to fork_db.add; it is set only when full checks are not forced
// and the block is already irreversible or validated.
bool trust = !conf.force_all_checks && (s == controller::block_status::irreversible || s == controller::block_status::validated); // (translated) update the fork_db data
auto new_header_state = fork_db.add( b, trust );
emit( self.accepted_block_header, new_header_state );
// on replay irreversible is not emitted by fork database, so emit it explicitly here
if( s == controller::block_status::irreversible )
emit( self.irreversible_block, new_header_state );
if ( read_mode != db_read_mode::IRREVERSIBLE ) {
maybe_switch_forks( s );
// Builds a block_state for signed block `b` on top of its predecessor (looked up by
// `b->previous`) and forwards it to the add(block_state) overload.
// Asserts that `b` is non-null, that a head is set, that the block id is not already
// indexed, and that the predecessor is present (otherwise unlinkable_block_exception).
// `trust` is passed through to the block_state constructor.
// NOTE(review): the closing brace is missing in this excerpt.
block_state_ptr fork_database::add( signed_block_ptr b, bool trust ) {
EOS_ASSERT( b, fork_database_exception, "attempt to add null block" );
EOS_ASSERT( my->head, fork_db_block_not_found, "no head block set" );
const auto& by_id_idx = my->index.get<by_block_id>();
auto existing = by_id_idx.find( b->id() );
EOS_ASSERT( existing == by_id_idx.end(), fork_database_exception, "we already know about this block" );
auto prior = by_id_idx.find( b->previous );
EOS_ASSERT( prior != by_id_idx.end(), unlinkable_block_exception, "unlinkable block", ("id", string(b->id()))("previous", string(b->previous)) );
auto result = std::make_shared<block_state>( **prior, move(b), trust );
EOS_ASSERT( result, fork_database_exception , "fail to add new block state" );
return add(result);
// Brings the controller's `head` in line with fork_db.head().
// If the new head directly extends the current head, the block is applied and marked
// valid. If the new head is a different block, the controller switches forks: blocks of
// the old branch are unmarked, blocks of the new branch are applied in order, and on
// failure the old branch is re-applied before the exception is rethrown.
// NOTE(review): this excerpt is truncated by the article — several closing braces and
// statements between the visible lines are missing — and the final `} /// push_block`
// line appears to belong to the preceding push_block excerpt.
void maybe_switch_forks( controller::block_status s = controller::block_status::complete ) {
auto new_head = fork_db.head();
// (translated) new_head is the latest block_header_state stored in fork_db above, while head is the older block_header_state held by the controller (in abnormal situations head is not necessarily older than new_head), so normally the first branch is taken; the abnormal case is what is commonly called a chain fork
if( new_head->header.previous == head->id ) {
try {
apply_block( new_head->block, s );
fork_db.mark_in_current_chain( new_head, true );
fork_db.set_validity( new_head, true );
head = new_head;
} catch ( const fc::exception& e ) {
fork_db.set_validity( new_head, false ); // Removes new_head from fork_db index, so no need to mark it as not in the current chain.
} // (translated) handling for when the chain has forked
} else if( new_head->id != head->id ) {
ilog("switching forks from ${current_head_id} (block number ${current_head_num}) to ${new_head_id} (block number ${new_head_num})",
("current_head_id", head->id)("current_head_num", head->block_num)("new_head_id", new_head->id)("new_head_num", new_head->block_num) );
// branches.first is the branch leading to new_head and branches.second the branch
// leading to the current head, as used by the loops below.
auto branches = fork_db.fetch_branch_from( new_head->id, head->id ); for( auto itr = branches.second.begin(); itr != branches.second.end(); ++itr ) {
fork_db.mark_in_current_chain( *itr , false );
EOS_ASSERT( self.head_block_id() == branches.second.back()->header.previous, fork_database_exception,
"loss of sync between fork_db and chainbase during fork switch" ); // _should_ never fail for( auto ritr = branches.first.rbegin(); ritr != branches.first.rend(); ++ritr) {
optional<fc::exception> except;
try {
apply_block( (*ritr)->block, (*ritr)->validated ? controller::block_status::validated : controller::block_status::complete );
head = *ritr;
fork_db.mark_in_current_chain( *ritr, true );
(*ritr)->validated = true;
catch (const fc::exception& e) { except = e; }
if (except) {
elog("exception thrown while switching forks ${e}", ("e",except->to_detail_string())); // ritr currently points to the block that threw
// if we mark it invalid it will automatically remove all forks built off it.
fork_db.set_validity( *ritr, false ); // pop all blocks from the bad fork
// ritr base is a forward itr to the last block successfully applied
auto applied_itr = ritr.base();
for( auto itr = applied_itr; itr != branches.first.end(); ++itr ) {
fork_db.mark_in_current_chain( *itr , false );
EOS_ASSERT( self.head_block_id() == branches.second.back()->header.previous, fork_database_exception,
"loss of sync between fork_db and chainbase during fork switch reversal" ); // _should_ never fail // re-apply good blocks
for( auto ritr = branches.second.rbegin(); ritr != branches.second.rend(); ++ritr ) {
apply_block( (*ritr)->block, controller::block_status::validated /* we previously validated these blocks*/ );
head = *ritr;
fork_db.mark_in_current_chain( *ritr, true );
throw *except;
} // end if exception
} /// end for each block in branch
ilog("successfully switched fork to new head ${new_head_id}", ("new_head_id", new_head->id));
} /// push_block
// Re-executes block `b` locally with status `s`.
// Starts a pending block with b's timestamp and confirmation count, replays every
// transaction receipt (packed transactions via push_transaction, scheduled ones via
// push_scheduled_transaction), checks that each replay produced a matching receipt,
// then finalizes the block and signs it with the producer's signature taken from `b`.
// A failed transaction is rethrown unless its receipt is a hard_fail for a scheduled
// transaction.
// NOTE(review): numeric literals, closing parentheses/braces and the body of the inner
// catch handler are missing in this excerpt.
void apply_block( const signed_block_ptr& b, controller::block_status s ) { try {
try {
EOS_ASSERT( b->block_extensions.size() == , block_validate_exception, "no supported extensions" );
start_block( b->timestamp, b->confirmed, s ); transaction_trace_ptr trace;
for( const auto& receipt : b->transactions ) {
// Remember the receipt count so the checks below can confirm exactly one was added.
auto num_pending_receipts = pending->_pending_block_state->block->transactions.size();
if( receipt.trx.contains<packed_transaction>() ) {
auto& pt = receipt.trx.get<packed_transaction>();
auto mtrx = std::make_shared<transaction_metadata>(pt);
trace = push_transaction( mtrx, fc::time_point::maximum(), false, receipt.cpu_usage_us);
} else if( receipt.trx.contains<transaction_id_type>() ) {
trace = push_scheduled_transaction( receipt.trx.get<transaction_id_type>(), fc::time_point::maximum(), receipt.cpu_usage_us );
} else {
EOS_ASSERT( false, block_validate_exception, "encountered unexpected receipt type" );
} bool transaction_failed = trace && trace->except;
bool transaction_can_fail = receipt.status == transaction_receipt_header::hard_fail && receipt.trx.contains<transaction_id_type>();
if( transaction_failed && !transaction_can_fail) {
throw *trace->except;
} EOS_ASSERT( pending->_pending_block_state->block->transactions.size() > ,
block_validate_exception, "expected a receipt",
("block", *b)("expected_receipt", receipt)
EOS_ASSERT( pending->_pending_block_state->block->transactions.size() == num_pending_receipts + ,
block_validate_exception, "expected receipt was not added",
("block", *b)("expected_receipt", receipt)
// The locally produced receipt header must equal the one carried in the block.
const transaction_receipt_header& r = pending->_pending_block_state->block->transactions.back();
EOS_ASSERT( r == static_cast<const transaction_receipt_header&>(receipt),
block_validate_exception, "receipt does not match",
("producer_receipt", receipt)("validator_receipt", pending->_pending_block_state->block->transactions.back()) );
} finalize_block(); // (translated) verify the producer's signature over the block data to check whether the data has been tampered with
sign_block( [&]( const auto& ){ return b->producer_signature; }, false ); //trust ); // this is implied by the signature passing
//FC_ASSERT( b->id() == pending->_pending_block_state->block->id(),
// "applying block didn't produce expected block id" );
} catch ( const fc::exception& e ) {
} FC_CAPTURE_AND_RETHROW() } /// apply_block
// Opens a new pending block at time `when` on top of `head`.
// Asserts that no pending block exists and that the db revision equals the head block
// number, starts an undo session, creates the pending block_state, records the
// confirmation count, promotes a proposed producer schedule to pending when the
// conditions below hold, and pushes the implicit on-block transaction.
// NOTE(review): this excerpt is truncated by the article — the body of the guard_pending
// lambda, several closing braces/parentheses and a numeric literal are missing, and four
// lines of article text appear without comment markers in the middle of the function.
void start_block( block_timestamp_type when, uint16_t confirm_block_count, controller::block_status s ) {
EOS_ASSERT( !pending, block_validate_exception, "pending block is not available" ); EOS_ASSERT( db.revision() == head->block_num, database_exception, "db revision is not on par with head block",
("db.revision()", db.revision())("controller_head_block", head->block_num)("fork_db_head_block", fork_db.head()->block_num) ); auto guard_pending = fc::make_scoped_exit([this](){
pending = db.start_undo_session(true); pending->_block_status = s;
pending->_pending_block_state = std::make_shared<block_state>( *head, when ); // promotes pending schedule (if any) to active
pending->_pending_block_state->in_current_chain = true; // (translated) this is where the DPoS consensus logic comes in
pending->_pending_block_state->set_confirmed(confirm_block_count); auto was_pending_promoted = pending->_pending_block_state->maybe_promote_pending(); //modify state in speculative block only if we are speculative reads mode (other wise we need clean state for head or irreversible reads)
if ( read_mode == db_read_mode::SPECULATIVE || pending->_block_status != controller::block_status::incomplete ) { const auto& gpo = db.get<global_property_object>();
if( gpo.proposed_schedule_block_num.valid() && // if there is a proposed schedule that was proposed in a block ...
( *gpo.proposed_schedule_block_num <= pending->_pending_block_state->dpos_irreversible_blocknum ) && // ... that has now become irreversible ...
pending->_pending_block_state->pending_schedule.producers.size() == && // ... and there is room for a new pending schedule ...
!was_pending_promoted // ... and not just because it was promoted to active at the start of this block, then:
// Promote proposed schedule to pending schedule.
if( !replaying ) {
ilog( "promoting proposed schedule (set in block ${proposed_num}) to pending; current block: ${n} lib: ${lib} schedule: ${schedule} ",
("proposed_num", *gpo.proposed_schedule_block_num)("n", pending->_pending_block_state->block_num)
("lib", pending->_pending_block_state->dpos_irreversible_blocknum)
("schedule", static_cast<producer_schedule_type>(gpo.proposed_schedule) ) );
pending->_pending_block_state->set_new_producers( gpo.proposed_schedule );
// Clear the proposed-schedule marker now that the schedule has been promoted.
db.modify( gpo, [&]( auto& gp ) {
gp.proposed_schedule_block_num = optional<block_num_type>();
} try {
auto onbtrx = std::make_shared<transaction_metadata>( get_on_block_transaction() );
// Restore in_trx_requiring_checks to its previous value when this scope exits.
auto reset_in_trx_requiring_checks = fc::make_scoped_exit([old_value=in_trx_requiring_checks,this](){
in_trx_requiring_checks = old_value;
in_trx_requiring_checks = true; // (translated) push_transaction is called here to process the transactions in the block on this node — NOTE(review): the article's sentence is cut off; the numbered list below appears to be its continuation
// English rendering of the four lines of article text below (kept verbatim because
// they are not syntactically comments in this excerpt):
//   1. Checks against this node's whitelists and blacklists (when configuring them,
//      mind the system accounts that EOS uses heavily).
//   2. Adding notifications for the transaction's recipients.
//   3. Computing and updating the user's net / cpu / ram resources.
//   4. Permission verification.
1. 对本节点各种白名单和黑名单的校验(要设置黑白名单,请注意eos中大量使用的系统用户)
2. 对交易的接收者增加通知消息
3. 用户net cpu ram资源的计算和更新
4. 对权限的验证
push_transaction( onbtrx, fc::time_point::maximum(), true, self.get_global_properties().configuration.min_transaction_cpu_usage ); } catch( const boost::interprocess::bad_alloc& e ) { elog( "on block transaction failed due to a bad allocation" ); throw; } catch( const fc::exception& e ) { wlog( "on block transaction failed, but shouldn't impact block generation, system contract needs update" ); edump((e.to_detail_string())); } catch( ... ) { } clear_expired_input_transactions(); // (translated) the permissions of the system account eosio.prods are updated here; its permission threshold changes with the number of producers — NOTE(review): the statement this note refers to is not visible in this excerpt
} guard_pending.cancel();
} // start_block
// Executes transaction `trx` inside the current pending block and returns its trace.
//
// trx:                the transaction metadata to execute.
// deadline:           must not be a default-constructed time_point (asserted below).
// implicit:           when true the transaction cannot subjectively fail, key recovery
//                     is skipped, and the receipt is a bare header that is not pushed
//                     through push_receipt.
// billed_cpu_time_us: copied into the transaction context before execution.
//
// On an fc::exception the exception is stored in the trace; the transaction is removed
// from unapplied_transactions unless the failure is subjective.
// NOTE(review): this excerpt is truncated by the article — the opening brace, several
// closing braces, call arguments and numeric literals are missing, and two `/*`
// comments are never closed.
transaction_trace_ptr push_transaction( const transaction_metadata_ptr& trx,
fc::time_point deadline,
bool implicit,
uint32_t billed_cpu_time_us)
EOS_ASSERT(deadline != fc::time_point(), transaction_exception, "deadline cannot be uninitialized"); transaction_trace_ptr trace;
try {
transaction_context trx_context(self, trx->trx, trx->id);
// The subjective CPU leeway is applied only while the pending block is incomplete.
if ((bool)subjective_cpu_leeway && pending->_block_status == controller::block_status::incomplete) {
trx_context.leeway = *subjective_cpu_leeway;
trx_context.deadline = deadline;
trx_context.billed_cpu_time_us = billed_cpu_time_us;
trace = trx_context.trace;
try {
if( implicit ) {
trx_context.can_subjectively_fail = false;
} else { // (translated) validate the trx data
trx_context.init_for_input_trx( trx->packed_trx.get_unprunable_size(),
} if( trx_context.can_subjectively_fail && pending->_block_status == controller::block_status::incomplete ) {
check_actor_list( trx_context.bill_to_accounts ); // Assumes bill_to_accounts is the set of actors authorizing the transaction
} trx_context.delay = fc::seconds(trx->trx.delay_sec); if( !self.skip_auth_check() && !implicit ) {
trx->recover_keys( chain_id ),
/*std::bind(&transaction_context::add_cpu_usage_and_check_time, &trx_context,
// English rendering of the article's note below (kept verbatim because its comment
// opener is never closed in this excerpt):
//   Processing of the actions of every transaction in the block includes, but is
//   not limited to:
//   1. Checks against this node's whitelists and blacklists (when configuring them,
//      mind the system accounts that EOS uses heavily).
//   2. Adding notifications for the transaction's recipients.
//   3. Computing and updating the user's net / cpu / ram resources.
} /*对区块中所有交易的action的处理包括如下但不限于:
1. 对本节点各种白名单和黑名单的校验(要设置黑白名单,请注意eos中大量使用的系统用户)
2. 对交易的接收者增加通知消息
3. 用户net cpu ram资源的计算和更新
trx_context.finalize(); // Automatically rounds up network and CPU usage in trace and bills payers if successful auto restore = make_block_restore_point(); if (!implicit) {
// The receipt status depends on whether the transaction carries a delay.
transaction_receipt::status_enum s = (trx_context.delay == fc::seconds())
? transaction_receipt::executed
: transaction_receipt::delayed;
trace->receipt = push_receipt(trx->packed_trx, s, trx_context.billed_cpu_time_us, trace->net_usage);
} else {
transaction_receipt_header r;
r.status = transaction_receipt::executed;
r.cpu_usage_us = trx_context.billed_cpu_time_us;
r.net_usage_words = trace->net_usage / ;
trace->receipt = r;
} fc::move_append(pending->_actions, move(trx_context.executed)); // call the accept signal but only once for this transaction
if (!trx->accepted) {
emit( self.accepted_transaction, trx);
trx->accepted = true;
} emit(self.applied_transaction, trace); if ( read_mode != db_read_mode::SPECULATIVE && pending->_block_status == controller::block_status::incomplete ) {
//this may happen automatically in destructor, but I prefere make it more explicit
} else {
} if (!implicit) {
unapplied_transactions.erase( trx->signed_id );
return trace;
} catch (const fc::exception& e) {
trace->except = e;
trace->except_ptr = std::current_exception();
} if (!failure_is_subjective(*trace->except)) {
unapplied_transactions.erase( trx->signed_id );
} return trace;
} /// push_transaction
到最后在apply_block()中调用的commit_block(false)函数,如下:
// Reacts to a block state `bsp`.
// Returns early when the block is not newer than the last signed block (by timestamp
// or number) or not newer than _start_time. For each locally configured producer that
// is in the active schedule and is not the block's own producer, signs the block's
// signature digest with that producer's signature provider and emits confirmed_block.
// Finally, when generating the next block state promotes a pending schedule with a new
// version, sets the watermark of the newly active local producers to this block number.
// NOTE(review): this excerpt is truncated by the article — the bodies of several loops,
// closing braces and a numeric literal are missing — so how `active_producers` and
// `new_producers` are populated is not visible here.
void on_block( const block_state_ptr& bsp ) {
if( bsp->header.timestamp <= _last_signed_block_time ) return;
if( bsp->header.timestamp <= _start_time ) return;
if( bsp->block_num <= _last_signed_block_num ) return; const auto& active_producer_to_signing_key = bsp->active_schedule.producers; flat_set<account_name> active_producers;
for (const auto& p: bsp->active_schedule.producers) {
} std::set_intersection( _producers.begin(), _producers.end(),
active_producers.begin(), active_producers.end(),
boost::make_function_output_iterator( [&]( const chain::account_name& producer )
if( producer != bsp->header.producer ) {
// Look up the signing key the active schedule assigns to this producer.
auto itr = std::find_if( active_producer_to_signing_key.begin(), active_producer_to_signing_key.end(),
[&](const producer_key& k){ return k.producer_name == producer; } );
if( itr != active_producer_to_signing_key.end() ) {
auto private_key_itr = _signature_providers.find( itr->block_signing_key );
if( private_key_itr != _signature_providers.end() ) {
auto d = bsp->sig_digest();
auto sig = private_key_itr->second( d );
_last_signed_block_time = bsp->header.timestamp;
_last_signed_block_num = bsp->block_num; // ilog( "${n} confirmed", ("n",name(producer)) );
_self->confirmed_block( { bsp->id, d, producer, sig } );
} ) ); // since the watermark has to be set before a block is created, we are looking into the future to
// determine the new schedule to identify producers that have become active
chain::controller& chain = app().get_plugin<chain_plugin>().chain();
const auto hbn = bsp->block_num;
auto new_block_header = bsp->header;
new_block_header.timestamp = new_block_header.timestamp.next();
new_block_header.previous = bsp->id;
auto new_bs = bsp->generate_next(new_block_header.timestamp); // for newly installed producers we can set their watermarks to the block they became active
if (new_bs.maybe_promote_pending() && bsp->active_schedule.version != new_bs.active_schedule.version) {
flat_set<account_name> new_producers;
for( const auto& p: new_bs.active_schedule.producers) {
if (_producers.count(p.producer_name) > )
} for( const auto& p: bsp->active_schedule.producers) {
} for (const auto& new_producer: new_producers) {
_producer_watermarks[new_producer] = hbn;