diff --git a/sync/src/synchronization_chain.rs b/sync/src/synchronization_chain.rs
index 38bc8d7c..7dac3e2a 100644
--- a/sync/src/synchronization_chain.rs
+++ b/sync/src/synchronization_chain.rs
@@ -25,22 +25,10 @@ const NUMBER_OF_QUEUES: usize = 3;
 /// Block insertion result
 #[derive(Debug, Default, PartialEq)]
 pub struct BlockInsertionResult {
-	/// Hashes of blocks, which were canonized during this insertion procedure. Order matters
-	pub canonized_blocks_hashes: Vec<H256>,
-	/// Transaction to 'reverify'. Order matters
+	/// Transaction to 'reverify'
 	pub transactions_to_reverify: Vec<(H256, Transaction)>,
 }
 
-impl BlockInsertionResult {
-	#[cfg(test)]
-	pub fn with_canonized_blocks(canonized_blocks_hashes: Vec<H256>) -> Self {
-		BlockInsertionResult {
-			canonized_blocks_hashes: canonized_blocks_hashes,
-			transactions_to_reverify: Vec::new(),
-		}
-	}
-}
-
 /// Block synchronization state
 #[derive(Debug, Copy, Clone, Eq, PartialEq)]
 pub enum BlockState {
@@ -335,7 +323,6 @@ impl Chain {
 
 				// no transactions to reverify, because we have just appended new transactions to the blockchain
 				Ok(BlockInsertionResult {
-					canonized_blocks_hashes: vec![hash],
 					transactions_to_reverify: Vec::new(),
 				})
 			}
@@ -353,18 +340,15 @@ impl Chain {
 				// + all transactions from previous blocks of this fork were accepted
 				// => delete accepted transactions from verification queue and from the memory pool
 				let this_block_transactions_hashes = block.transactions.iter().map(|tx| tx.hash());
-				let mut canonized_blocks_hashes: Vec<H256> = Vec::new();
 				let mut new_main_blocks_transactions_hashes: Vec<H256> = Vec::new();
 				while let Some(canonized_block_hash) = reorganization.pop_canonized() {
-					let canonized_transactions_hashes = self.storage.block_transaction_hashes(db::BlockRef::Hash(canonized_block_hash.clone()));
+					let canonized_transactions_hashes = self.storage.block_transaction_hashes(db::BlockRef::Hash(canonized_block_hash));
 					new_main_blocks_transactions_hashes.extend(canonized_transactions_hashes);
-					canonized_blocks_hashes.push(canonized_block_hash);
 				}
 				for transaction_accepted in this_block_transactions_hashes.chain(new_main_blocks_transactions_hashes.into_iter()) {
 					self.memory_pool.remove_by_hash(&transaction_accepted);
 					self.verifying_transactions.remove(&transaction_accepted);
 				}
-				canonized_blocks_hashes.reverse();
 
 				// reverify all transactions from old main branch' blocks
 				let mut old_main_blocks_transactions_hashes: Vec<H256> = Vec::new();
@@ -394,7 +378,6 @@ impl Chain {
 				self.verifying_transactions.clear();
 
 				Ok(BlockInsertionResult {
-					canonized_blocks_hashes: canonized_blocks_hashes,
 					// order matters: db transactions, then ordered mempool transactions, then ordered verifying transactions
 					transactions_to_reverify: old_main_blocks_transactions.into_iter()
 						.chain(memory_pool_transactions.into_iter())
@@ -1102,25 +1085,24 @@ mod tests {
 		chain.insert_verified_transaction(tx4);
 		chain.insert_verified_transaction(tx5);
 
-		assert_eq!(chain.insert_best_block(b0.hash(), &b0).expect("block accepted"), BlockInsertionResult::with_canonized_blocks(vec![b0.hash()]));
+		assert_eq!(chain.insert_best_block(b0.hash(), &b0).expect("block accepted"), BlockInsertionResult::default());
 		assert_eq!(chain.information().transactions.transactions_count, 3);
-		assert_eq!(chain.insert_best_block(b1.hash(), &b1).expect("block accepted"), BlockInsertionResult::with_canonized_blocks(vec![b1.hash()]));
+		assert_eq!(chain.insert_best_block(b1.hash(), &b1).expect("block accepted"), BlockInsertionResult::default());
 		assert_eq!(chain.information().transactions.transactions_count, 3);
-		assert_eq!(chain.insert_best_block(b2.hash(), &b2).expect("block accepted"), BlockInsertionResult::with_canonized_blocks(vec![b2.hash()]));
+		assert_eq!(chain.insert_best_block(b2.hash(), &b2).expect("block accepted"), BlockInsertionResult::default());
 		assert_eq!(chain.information().transactions.transactions_count, 3);
 		assert_eq!(chain.insert_best_block(b3.hash(), &b3).expect("block accepted"), BlockInsertionResult::default());
 		assert_eq!(chain.information().transactions.transactions_count, 3);
 		assert_eq!(chain.insert_best_block(b4.hash(), &b4).expect("block accepted"), BlockInsertionResult::default());
 		assert_eq!(chain.information().transactions.transactions_count, 3);
 		// order matters
-		let insert_result = chain.insert_best_block(b5.hash(), &b5).expect("block accepted");
-		let transactions_to_reverify_hashes: Vec<_> = insert_result
+		let transactions_to_reverify_hashes: Vec<_> = chain.insert_best_block(b5.hash(), &b5)
+			.expect("block accepted")
 			.transactions_to_reverify
 			.into_iter()
 			.map(|(h, _)| h)
 			.collect();
 		assert_eq!(transactions_to_reverify_hashes, vec![tx1_hash, tx2_hash]);
-		assert_eq!(insert_result.canonized_blocks_hashes, vec![b3.hash(), b4.hash(), b5.hash()]);
 		assert_eq!(chain.information().transactions.transactions_count, 0); // tx3, tx4, tx5 are added to the database
 	}
 }
diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs
index 0d3988d0..71ae4440 100644
--- a/sync/src/synchronization_client.rs
+++ b/sync/src/synchronization_client.rs
@@ -629,10 +629,8 @@ impl<T> VerificationSink for SynchronizationClientCore<T> where T: TaskExecutor
 		self.execute_synchronization_tasks(None);
 
 		// relay block to our peers
-		if self.state.is_saturated() || self.state.is_nearly_saturated() {
-			// TODO: remember peer' last N blocks and send only if peer has no canonized blocks
-			// TODO: send `headers` if peer has not send `sendheaders` command
-			self.executor.lock().execute(Task::BroadcastBlocksHashes(insert_result.canonized_blocks_hashes));
+		if self.state.is_saturated() {
+			// TODO: Task::BroadcastBlock
 		}
 
 		// deal with block transactions
@@ -1245,15 +1243,14 @@ pub mod tests {
 		sync.on_new_blocks_headers(1, vec![block.block_header.clone()]);
 		sync.on_new_blocks_headers(2, vec![block.block_header.clone()]);
 		executor.lock().take_tasks();
-		sync.on_peer_block(2, block.clone());
+		sync.on_peer_block(2, block);
 
 		let tasks = executor.lock().take_tasks();
-		assert_eq!(tasks.len(), 5);
+		assert_eq!(tasks.len(), 4);
 		assert!(tasks.iter().any(|t| t == &Task::RequestBlocksHeaders(1)));
 		assert!(tasks.iter().any(|t| t == &Task::RequestBlocksHeaders(2)));
 		assert!(tasks.iter().any(|t| t == &Task::RequestMemoryPool(1)));
 		assert!(tasks.iter().any(|t| t == &Task::RequestMemoryPool(2)));
-		assert!(tasks.iter().any(|t| t == &Task::BroadcastBlocksHashes(vec![block.hash()])));
 	}
 
 	#[test]
@@ -1767,51 +1764,4 @@ pub mod tests {
 
 		sync.on_new_blocks_headers(2, vec![b10.block_header.clone(), b21.block_header.clone(), b22.block_header.clone(), b23.block_header.clone()]);
 	}
-
-	#[test]
-	fn relay_new_block_when_in_saturated_state() {
-		let (_, _, executor, _, sync) = create_sync(None, None);
-		let genesis = test_data::genesis();
-		let b0 = test_data::block_builder().header().parent(genesis.hash()).build().build();
-		let b1 = test_data::block_builder().header().parent(b0.hash()).build().build();
-		let b2 = test_data::block_builder().header().parent(b1.hash()).build().build();
-		let b3 = test_data::block_builder().header().parent(b2.hash()).build().build();
-
-		let mut sync = sync.lock();
-		sync.on_new_blocks_headers(1, vec![b0.block_header.clone(), b1.block_header.clone()]);
-		sync.on_peer_block(1, b0.clone());
-		sync.on_peer_block(1, b1.clone());
-
-		// we were in synchronization state => block is not relayed
-		{
-			let tasks = executor.lock().take_tasks();
-			assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1),
-				Task::RequestBlocks(1, vec![b0.hash(), b1.hash()]),
-				Task::RequestBlocksHeaders(1),
-				Task::RequestMemoryPool(1)
-			]);
-		}
-
-		sync.on_peer_block(1, b2.clone());
-
-		// we were in saturated state => block is relayed
-		{
-			let tasks = executor.lock().take_tasks();
-			assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1), Task::BroadcastBlocksHashes(vec![b2.hash()])]);
-		}
-
-		sync.on_new_blocks_headers(1, vec![b3.block_header.clone()]);
-		sync.on_peer_block(1, b3.clone());
-
-		// we were in nearly saturated state => block is relayed
-		{
-			let tasks = executor.lock().take_tasks();
-			assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1),
-				Task::RequestBlocks(1, vec![b3.hash()]),
-				Task::BroadcastBlocksHashes(vec![b3.hash()]),
-				Task::RequestBlocksHeaders(1),
-				Task::RequestMemoryPool(1)
-			]);
-		}
-	}
 }
diff --git a/sync/src/synchronization_executor.rs b/sync/src/synchronization_executor.rs
index 286ccdf6..8a8f57ca 100644
--- a/sync/src/synchronization_executor.rs
+++ b/sync/src/synchronization_executor.rs
@@ -30,8 +30,6 @@ pub enum Task {
 	RequestMemoryPool(usize),
 	/// Send block.
 	SendBlock(usize, Block, ServerTaskIndex),
-	/// Broadcast block.
-	BroadcastBlocksHashes(Vec<H256>),
 	/// Send notfound
 	SendNotFound(usize, Vec<InventoryVector>, ServerTaskIndex),
 	/// Send inventory
@@ -134,20 +132,6 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
 					connection.send_block(&block_message);
 				}
 			},
-			Task::BroadcastBlocksHashes(blocks_hashes) => {
-				let inventory = types::Inv {
-					inventory: blocks_hashes.into_iter().map(|h| InventoryVector {
-							inv_type: InventoryType::MessageBlock,
-							hash: h,
-						})
-						.collect(),
-				};
-
-				for (peer_index, connection) in self.peers.iter() {
-					trace!(target: "sync", "Sending inventory with {} blocks hashes to peer#{}", inventory.inventory.len(), peer_index);
-					connection.send_inventory(&inventory);
-				}
-			},
 			Task::SendNotFound(peer_index, unknown_inventory, id) => {
 				let notfound = types::NotFound {
 					inventory: unknown_inventory,
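
Note on the simplified result type: after this change, Chain::insert_best_block reports only the transactions that need re-verification; callers no longer receive the list of canonized block hashes. The following standalone sketch is not part of the diff or the repository: H256, Transaction, and the verification queue are stubbed so the example compiles on its own, and it only illustrates how a caller might consume the slimmed-down BlockInsertionResult by re-queueing the returned (hash, transaction) pairs in order.

// Standalone illustration only: the real H256 and Transaction types live in the
// primitives/chain crates; here they are stand-ins so this compiles by itself.
type H256 = [u8; 32];

#[derive(Debug, Clone, PartialEq)]
struct Transaction; // placeholder for chain::Transaction

/// Mirrors the simplified result type from the diff: only transactions that must
/// be re-verified are reported; canonized block hashes are no longer returned.
#[derive(Debug, Default, PartialEq)]
struct BlockInsertionResult {
	transactions_to_reverify: Vec<(H256, Transaction)>,
}

/// Toy stand-in for the client's transaction verification queue.
#[derive(Default)]
struct VerificationQueue {
	pending: Vec<(H256, Transaction)>,
}

impl VerificationQueue {
	fn push(&mut self, hash: H256, transaction: Transaction) {
		self.pending.push((hash, transaction));
	}
}

/// Re-queue every returned transaction, preserving order: per the comment in the
/// diff, db transactions come first, then mempool transactions, then transactions
/// that were already being verified.
fn handle_insert_result(result: BlockInsertionResult, queue: &mut VerificationQueue) {
	for (hash, transaction) in result.transactions_to_reverify {
		queue.push(hash, transaction);
	}
}

fn main() {
	let mut queue = VerificationQueue::default();
	let result = BlockInsertionResult {
		transactions_to_reverify: vec![([1u8; 32], Transaction)],
	};
	handle_insert_result(result, &mut queue);
	assert_eq!(queue.pending.len(), 1);
}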