Merge pull request #163 from ethcore/revert_of_revert

Revert "revert "relay canonized blocks""
This commit is contained in:
Svyatoslav Nikolsky 2016-11-21 20:02:59 +03:00 committed by GitHub
commit a675ab992b
3 changed files with 95 additions and 11 deletions

View File

@ -25,10 +25,22 @@ const NUMBER_OF_QUEUES: usize = 3;
/// Block insertion result
#[derive(Debug, Default, PartialEq)]
pub struct BlockInsertionResult {
/// Transaction to 'reverify'
/// Hashes of blocks, which were canonized during this insertion procedure. Order matters
pub canonized_blocks_hashes: Vec<H256>,
/// Transaction to 'reverify'. Order matters
pub transactions_to_reverify: Vec<(H256, Transaction)>,
}
impl BlockInsertionResult {
#[cfg(test)]
pub fn with_canonized_blocks(canonized_blocks_hashes: Vec<H256>) -> Self {
BlockInsertionResult {
canonized_blocks_hashes: canonized_blocks_hashes,
transactions_to_reverify: Vec::new(),
}
}
}
/// Block synchronization state
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum BlockState {
@ -323,6 +335,7 @@ impl Chain {
// no transactions to reverify, because we have just appended new transactions to the blockchain
Ok(BlockInsertionResult {
canonized_blocks_hashes: vec![hash],
transactions_to_reverify: Vec::new(),
})
}
@ -340,15 +353,18 @@ impl Chain {
// + all transactions from previous blocks of this fork were accepted
// => delete accepted transactions from verification queue and from the memory pool
let this_block_transactions_hashes = block.transactions.iter().map(|tx| tx.hash());
let mut canonized_blocks_hashes: Vec<H256> = Vec::new();
let mut new_main_blocks_transactions_hashes: Vec<H256> = Vec::new();
while let Some(canonized_block_hash) = reorganization.pop_canonized() {
let canonized_transactions_hashes = self.storage.block_transaction_hashes(db::BlockRef::Hash(canonized_block_hash));
let canonized_transactions_hashes = self.storage.block_transaction_hashes(db::BlockRef::Hash(canonized_block_hash.clone()));
new_main_blocks_transactions_hashes.extend(canonized_transactions_hashes);
canonized_blocks_hashes.push(canonized_block_hash);
}
for transaction_accepted in this_block_transactions_hashes.chain(new_main_blocks_transactions_hashes.into_iter()) {
self.memory_pool.remove_by_hash(&transaction_accepted);
self.verifying_transactions.remove(&transaction_accepted);
}
canonized_blocks_hashes.reverse();
// reverify all transactions from old main branch' blocks
let mut old_main_blocks_transactions_hashes: Vec<H256> = Vec::new();
@ -378,6 +394,7 @@ impl Chain {
self.verifying_transactions.clear();
Ok(BlockInsertionResult {
canonized_blocks_hashes: canonized_blocks_hashes,
// order matters: db transactions, then ordered mempool transactions, then ordered verifying transactions
transactions_to_reverify: old_main_blocks_transactions.into_iter()
.chain(memory_pool_transactions.into_iter())
@ -1085,24 +1102,25 @@ mod tests {
chain.insert_verified_transaction(tx4);
chain.insert_verified_transaction(tx5);
assert_eq!(chain.insert_best_block(b0.hash(), &b0).expect("block accepted"), BlockInsertionResult::default());
assert_eq!(chain.insert_best_block(b0.hash(), &b0).expect("block accepted"), BlockInsertionResult::with_canonized_blocks(vec![b0.hash()]));
assert_eq!(chain.information().transactions.transactions_count, 3);
assert_eq!(chain.insert_best_block(b1.hash(), &b1).expect("block accepted"), BlockInsertionResult::default());
assert_eq!(chain.insert_best_block(b1.hash(), &b1).expect("block accepted"), BlockInsertionResult::with_canonized_blocks(vec![b1.hash()]));
assert_eq!(chain.information().transactions.transactions_count, 3);
assert_eq!(chain.insert_best_block(b2.hash(), &b2).expect("block accepted"), BlockInsertionResult::default());
assert_eq!(chain.insert_best_block(b2.hash(), &b2).expect("block accepted"), BlockInsertionResult::with_canonized_blocks(vec![b2.hash()]));
assert_eq!(chain.information().transactions.transactions_count, 3);
assert_eq!(chain.insert_best_block(b3.hash(), &b3).expect("block accepted"), BlockInsertionResult::default());
assert_eq!(chain.information().transactions.transactions_count, 3);
assert_eq!(chain.insert_best_block(b4.hash(), &b4).expect("block accepted"), BlockInsertionResult::default());
assert_eq!(chain.information().transactions.transactions_count, 3);
// order matters
let transactions_to_reverify_hashes: Vec<_> = chain.insert_best_block(b5.hash(), &b5)
.expect("block accepted")
let insert_result = chain.insert_best_block(b5.hash(), &b5).expect("block accepted");
let transactions_to_reverify_hashes: Vec<_> = insert_result
.transactions_to_reverify
.into_iter()
.map(|(h, _)| h)
.collect();
assert_eq!(transactions_to_reverify_hashes, vec![tx1_hash, tx2_hash]);
assert_eq!(insert_result.canonized_blocks_hashes, vec![b3.hash(), b4.hash(), b5.hash()]);
assert_eq!(chain.information().transactions.transactions_count, 0); // tx3, tx4, tx5 are added to the database
}
}

View File

@ -629,8 +629,10 @@ impl<T> VerificationSink for SynchronizationClientCore<T> where T: TaskExecutor
self.execute_synchronization_tasks(None);
// relay block to our peers
if self.state.is_saturated() {
// TODO: Task::BroadcastBlock
if self.state.is_saturated() || self.state.is_nearly_saturated() {
// TODO: remember peer' last N blocks and send only if peer has no canonized blocks
// TODO: send `headers` if peer has not send `sendheaders` command
self.executor.lock().execute(Task::BroadcastBlocksHashes(insert_result.canonized_blocks_hashes));
}
// deal with block transactions
@ -1243,14 +1245,15 @@ pub mod tests {
sync.on_new_blocks_headers(1, vec![block.block_header.clone()]);
sync.on_new_blocks_headers(2, vec![block.block_header.clone()]);
executor.lock().take_tasks();
sync.on_peer_block(2, block);
sync.on_peer_block(2, block.clone());
let tasks = executor.lock().take_tasks();
assert_eq!(tasks.len(), 4);
assert_eq!(tasks.len(), 5);
assert!(tasks.iter().any(|t| t == &Task::RequestBlocksHeaders(1)));
assert!(tasks.iter().any(|t| t == &Task::RequestBlocksHeaders(2)));
assert!(tasks.iter().any(|t| t == &Task::RequestMemoryPool(1)));
assert!(tasks.iter().any(|t| t == &Task::RequestMemoryPool(2)));
assert!(tasks.iter().any(|t| t == &Task::BroadcastBlocksHashes(vec![block.hash()])));
}
#[test]
@ -1764,4 +1767,51 @@ pub mod tests {
sync.on_new_blocks_headers(2, vec![b10.block_header.clone(), b21.block_header.clone(),
b22.block_header.clone(), b23.block_header.clone()]);
}
#[test]
fn relay_new_block_when_in_saturated_state() {
let (_, _, executor, _, sync) = create_sync(None, None);
let genesis = test_data::genesis();
let b0 = test_data::block_builder().header().parent(genesis.hash()).build().build();
let b1 = test_data::block_builder().header().parent(b0.hash()).build().build();
let b2 = test_data::block_builder().header().parent(b1.hash()).build().build();
let b3 = test_data::block_builder().header().parent(b2.hash()).build().build();
let mut sync = sync.lock();
sync.on_new_blocks_headers(1, vec![b0.block_header.clone(), b1.block_header.clone()]);
sync.on_peer_block(1, b0.clone());
sync.on_peer_block(1, b1.clone());
// we were in synchronization state => block is not relayed
{
let tasks = executor.lock().take_tasks();
assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1),
Task::RequestBlocks(1, vec![b0.hash(), b1.hash()]),
Task::RequestBlocksHeaders(1),
Task::RequestMemoryPool(1)
]);
}
sync.on_peer_block(1, b2.clone());
// we were in saturated state => block is relayed
{
let tasks = executor.lock().take_tasks();
assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1), Task::BroadcastBlocksHashes(vec![b2.hash()])]);
}
sync.on_new_blocks_headers(1, vec![b3.block_header.clone()]);
sync.on_peer_block(1, b3.clone());
// we were in nearly saturated state => block is relayed
{
let tasks = executor.lock().take_tasks();
assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1),
Task::RequestBlocks(1, vec![b3.hash()]),
Task::BroadcastBlocksHashes(vec![b3.hash()]),
Task::RequestBlocksHeaders(1),
Task::RequestMemoryPool(1)
]);
}
}
}

View File

@ -30,6 +30,8 @@ pub enum Task {
RequestMemoryPool(usize),
/// Send block.
SendBlock(usize, Block, ServerTaskIndex),
/// Broadcast block.
BroadcastBlocksHashes(Vec<H256>),
/// Send notfound
SendNotFound(usize, Vec<InventoryVector>, ServerTaskIndex),
/// Send inventory
@ -132,6 +134,20 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
connection.send_block(&block_message);
}
},
Task::BroadcastBlocksHashes(blocks_hashes) => {
let inventory = types::Inv {
inventory: blocks_hashes.into_iter().map(|h| InventoryVector {
inv_type: InventoryType::MessageBlock,
hash: h,
})
.collect(),
};
for (peer_index, connection) in self.peers.iter() {
trace!(target: "sync", "Sending inventory with {} blocks hashes to peer#{}", inventory.inventory.len(), peer_index);
connection.send_inventory(&inventory);
}
},
Task::SendNotFound(peer_index, unknown_inventory, id) => {
let notfound = types::NotFound {
inventory: unknown_inventory,