From 730bc619c401a4a20313e3e335d689775f3658d9 Mon Sep 17 00:00:00 2001 From: Marek Kotewicz Date: Mon, 21 Nov 2016 18:06:14 +0100 Subject: [PATCH] Revert "Revert "Revert "revert "relay canonized blocks"""" --- sync/src/synchronization_chain.rs | 32 +++++++++++---- sync/src/synchronization_client.rs | 58 ++++++++++++++++++++++++++-- sync/src/synchronization_executor.rs | 16 ++++++++ 3 files changed, 95 insertions(+), 11 deletions(-) diff --git a/sync/src/synchronization_chain.rs b/sync/src/synchronization_chain.rs index 7dac3e2a..38bc8d7c 100644 --- a/sync/src/synchronization_chain.rs +++ b/sync/src/synchronization_chain.rs @@ -25,10 +25,22 @@ const NUMBER_OF_QUEUES: usize = 3; /// Block insertion result #[derive(Debug, Default, PartialEq)] pub struct BlockInsertionResult { - /// Transaction to 'reverify' + /// Hashes of blocks, which were canonized during this insertion procedure. Order matters + pub canonized_blocks_hashes: Vec, + /// Transaction to 'reverify'. Order matters pub transactions_to_reverify: Vec<(H256, Transaction)>, } +impl BlockInsertionResult { + #[cfg(test)] + pub fn with_canonized_blocks(canonized_blocks_hashes: Vec) -> Self { + BlockInsertionResult { + canonized_blocks_hashes: canonized_blocks_hashes, + transactions_to_reverify: Vec::new(), + } + } +} + /// Block synchronization state #[derive(Debug, Copy, Clone, Eq, PartialEq)] pub enum BlockState { @@ -323,6 +335,7 @@ impl Chain { // no transactions to reverify, because we have just appended new transactions to the blockchain Ok(BlockInsertionResult { + canonized_blocks_hashes: vec![hash], transactions_to_reverify: Vec::new(), }) } @@ -340,15 +353,18 @@ impl Chain { // + all transactions from previous blocks of this fork were accepted // => delete accepted transactions from verification queue and from the memory pool let this_block_transactions_hashes = block.transactions.iter().map(|tx| tx.hash()); + let mut canonized_blocks_hashes: Vec = Vec::new(); let mut new_main_blocks_transactions_hashes: Vec = Vec::new(); while let Some(canonized_block_hash) = reorganization.pop_canonized() { - let canonized_transactions_hashes = self.storage.block_transaction_hashes(db::BlockRef::Hash(canonized_block_hash)); + let canonized_transactions_hashes = self.storage.block_transaction_hashes(db::BlockRef::Hash(canonized_block_hash.clone())); new_main_blocks_transactions_hashes.extend(canonized_transactions_hashes); + canonized_blocks_hashes.push(canonized_block_hash); } for transaction_accepted in this_block_transactions_hashes.chain(new_main_blocks_transactions_hashes.into_iter()) { self.memory_pool.remove_by_hash(&transaction_accepted); self.verifying_transactions.remove(&transaction_accepted); } + canonized_blocks_hashes.reverse(); // reverify all transactions from old main branch' blocks let mut old_main_blocks_transactions_hashes: Vec = Vec::new(); @@ -378,6 +394,7 @@ impl Chain { self.verifying_transactions.clear(); Ok(BlockInsertionResult { + canonized_blocks_hashes: canonized_blocks_hashes, // order matters: db transactions, then ordered mempool transactions, then ordered verifying transactions transactions_to_reverify: old_main_blocks_transactions.into_iter() .chain(memory_pool_transactions.into_iter()) @@ -1085,24 +1102,25 @@ mod tests { chain.insert_verified_transaction(tx4); chain.insert_verified_transaction(tx5); - assert_eq!(chain.insert_best_block(b0.hash(), &b0).expect("block accepted"), BlockInsertionResult::default()); + assert_eq!(chain.insert_best_block(b0.hash(), &b0).expect("block accepted"), BlockInsertionResult::with_canonized_blocks(vec![b0.hash()])); assert_eq!(chain.information().transactions.transactions_count, 3); - assert_eq!(chain.insert_best_block(b1.hash(), &b1).expect("block accepted"), BlockInsertionResult::default()); + assert_eq!(chain.insert_best_block(b1.hash(), &b1).expect("block accepted"), BlockInsertionResult::with_canonized_blocks(vec![b1.hash()])); assert_eq!(chain.information().transactions.transactions_count, 3); - assert_eq!(chain.insert_best_block(b2.hash(), &b2).expect("block accepted"), BlockInsertionResult::default()); + assert_eq!(chain.insert_best_block(b2.hash(), &b2).expect("block accepted"), BlockInsertionResult::with_canonized_blocks(vec![b2.hash()])); assert_eq!(chain.information().transactions.transactions_count, 3); assert_eq!(chain.insert_best_block(b3.hash(), &b3).expect("block accepted"), BlockInsertionResult::default()); assert_eq!(chain.information().transactions.transactions_count, 3); assert_eq!(chain.insert_best_block(b4.hash(), &b4).expect("block accepted"), BlockInsertionResult::default()); assert_eq!(chain.information().transactions.transactions_count, 3); // order matters - let transactions_to_reverify_hashes: Vec<_> = chain.insert_best_block(b5.hash(), &b5) - .expect("block accepted") + let insert_result = chain.insert_best_block(b5.hash(), &b5).expect("block accepted"); + let transactions_to_reverify_hashes: Vec<_> = insert_result .transactions_to_reverify .into_iter() .map(|(h, _)| h) .collect(); assert_eq!(transactions_to_reverify_hashes, vec![tx1_hash, tx2_hash]); + assert_eq!(insert_result.canonized_blocks_hashes, vec![b3.hash(), b4.hash(), b5.hash()]); assert_eq!(chain.information().transactions.transactions_count, 0); // tx3, tx4, tx5 are added to the database } } diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs index 71ae4440..0d3988d0 100644 --- a/sync/src/synchronization_client.rs +++ b/sync/src/synchronization_client.rs @@ -629,8 +629,10 @@ impl VerificationSink for SynchronizationClientCore where T: TaskExecutor self.execute_synchronization_tasks(None); // relay block to our peers - if self.state.is_saturated() { - // TODO: Task::BroadcastBlock + if self.state.is_saturated() || self.state.is_nearly_saturated() { + // TODO: remember peer' last N blocks and send only if peer has no canonized blocks + // TODO: send `headers` if peer has not send `sendheaders` command + self.executor.lock().execute(Task::BroadcastBlocksHashes(insert_result.canonized_blocks_hashes)); } // deal with block transactions @@ -1243,14 +1245,15 @@ pub mod tests { sync.on_new_blocks_headers(1, vec![block.block_header.clone()]); sync.on_new_blocks_headers(2, vec![block.block_header.clone()]); executor.lock().take_tasks(); - sync.on_peer_block(2, block); + sync.on_peer_block(2, block.clone()); let tasks = executor.lock().take_tasks(); - assert_eq!(tasks.len(), 4); + assert_eq!(tasks.len(), 5); assert!(tasks.iter().any(|t| t == &Task::RequestBlocksHeaders(1))); assert!(tasks.iter().any(|t| t == &Task::RequestBlocksHeaders(2))); assert!(tasks.iter().any(|t| t == &Task::RequestMemoryPool(1))); assert!(tasks.iter().any(|t| t == &Task::RequestMemoryPool(2))); + assert!(tasks.iter().any(|t| t == &Task::BroadcastBlocksHashes(vec![block.hash()]))); } #[test] @@ -1764,4 +1767,51 @@ pub mod tests { sync.on_new_blocks_headers(2, vec![b10.block_header.clone(), b21.block_header.clone(), b22.block_header.clone(), b23.block_header.clone()]); } + + #[test] + fn relay_new_block_when_in_saturated_state() { + let (_, _, executor, _, sync) = create_sync(None, None); + let genesis = test_data::genesis(); + let b0 = test_data::block_builder().header().parent(genesis.hash()).build().build(); + let b1 = test_data::block_builder().header().parent(b0.hash()).build().build(); + let b2 = test_data::block_builder().header().parent(b1.hash()).build().build(); + let b3 = test_data::block_builder().header().parent(b2.hash()).build().build(); + + let mut sync = sync.lock(); + sync.on_new_blocks_headers(1, vec![b0.block_header.clone(), b1.block_header.clone()]); + sync.on_peer_block(1, b0.clone()); + sync.on_peer_block(1, b1.clone()); + + // we were in synchronization state => block is not relayed + { + let tasks = executor.lock().take_tasks(); + assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1), + Task::RequestBlocks(1, vec![b0.hash(), b1.hash()]), + Task::RequestBlocksHeaders(1), + Task::RequestMemoryPool(1) + ]); + } + + sync.on_peer_block(1, b2.clone()); + + // we were in saturated state => block is relayed + { + let tasks = executor.lock().take_tasks(); + assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1), Task::BroadcastBlocksHashes(vec![b2.hash()])]); + } + + sync.on_new_blocks_headers(1, vec![b3.block_header.clone()]); + sync.on_peer_block(1, b3.clone()); + + // we were in nearly saturated state => block is relayed + { + let tasks = executor.lock().take_tasks(); + assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1), + Task::RequestBlocks(1, vec![b3.hash()]), + Task::BroadcastBlocksHashes(vec![b3.hash()]), + Task::RequestBlocksHeaders(1), + Task::RequestMemoryPool(1) + ]); + } + } } diff --git a/sync/src/synchronization_executor.rs b/sync/src/synchronization_executor.rs index 8a8f57ca..286ccdf6 100644 --- a/sync/src/synchronization_executor.rs +++ b/sync/src/synchronization_executor.rs @@ -30,6 +30,8 @@ pub enum Task { RequestMemoryPool(usize), /// Send block. SendBlock(usize, Block, ServerTaskIndex), + /// Broadcast block. + BroadcastBlocksHashes(Vec), /// Send notfound SendNotFound(usize, Vec, ServerTaskIndex), /// Send inventory @@ -132,6 +134,20 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor { connection.send_block(&block_message); } }, + Task::BroadcastBlocksHashes(blocks_hashes) => { + let inventory = types::Inv { + inventory: blocks_hashes.into_iter().map(|h| InventoryVector { + inv_type: InventoryType::MessageBlock, + hash: h, + }) + .collect(), + }; + + for (peer_index, connection) in self.peers.iter() { + trace!(target: "sync", "Sending inventory with {} blocks hashes to peer#{}", inventory.inventory.len(), peer_index); + connection.send_inventory(&inventory); + } + }, Task::SendNotFound(peer_index, unknown_inventory, id) => { let notfound = types::NotFound { inventory: unknown_inventory,