Merge pull request #165 from ethcore/revert-164-revert-163-revert_of_revert

Revert "Revert "Revert "revert "relay canonized blocks""""
This commit is contained in:
Svyatoslav Nikolsky 2016-11-22 15:58:07 +03:00 committed by GitHub
commit 5255469095
10 changed files with 143 additions and 27 deletions

View File

@ -123,7 +123,6 @@ impl BestHeadersChain {
#[cfg(test)]
mod tests {
use super::BestHeadersChain;
use chain::RepresentH256;
use primitives::hash::H256;
use test_data;

View File

@ -37,7 +37,6 @@ mod tests {
use std::sync::Arc;
use super::super::Error;
use super::BlocksWriter;
use chain::RepresentH256;
use test_data;
use verification;

View File

@ -2,7 +2,6 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use parking_lot::Mutex;
use db;
use chain::RepresentH256;
use p2p::OutboundSyncConnectionRef;
use message::common::{InventoryType, InventoryVector};
use message::types;
@ -222,7 +221,6 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
mod tests {
use std::sync::Arc;
use parking_lot::{Mutex, RwLock};
use chain::RepresentH256;
use synchronization_executor::Task;
use synchronization_executor::tests::DummyTaskExecutor;
use synchronization_client::{Config, SynchronizationClient, SynchronizationClientCore};

View File

@ -25,10 +25,22 @@ const NUMBER_OF_QUEUES: usize = 3;
/// Block insertion result
#[derive(Debug, Default, PartialEq)]
pub struct BlockInsertionResult {
/// Transaction to 'reverify'
/// Hashes of blocks, which were canonized during this insertion procedure. Order matters
pub canonized_blocks_hashes: Vec<H256>,
/// Transaction to 'reverify'. Order matters
pub transactions_to_reverify: Vec<(H256, Transaction)>,
}
impl BlockInsertionResult {
#[cfg(test)]
pub fn with_canonized_blocks(canonized_blocks_hashes: Vec<H256>) -> Self {
BlockInsertionResult {
canonized_blocks_hashes: canonized_blocks_hashes,
transactions_to_reverify: Vec::new(),
}
}
}
/// Block synchronization state
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum BlockState {
@ -323,6 +335,7 @@ impl Chain {
// no transactions to reverify, because we have just appended new transactions to the blockchain
Ok(BlockInsertionResult {
canonized_blocks_hashes: vec![hash],
transactions_to_reverify: Vec::new(),
})
}
@ -340,15 +353,18 @@ impl Chain {
// + all transactions from previous blocks of this fork were accepted
// => delete accepted transactions from verification queue and from the memory pool
let this_block_transactions_hashes = block.transactions.iter().map(|tx| tx.hash());
let mut canonized_blocks_hashes: Vec<H256> = Vec::new();
let mut new_main_blocks_transactions_hashes: Vec<H256> = Vec::new();
while let Some(canonized_block_hash) = reorganization.pop_canonized() {
let canonized_transactions_hashes = self.storage.block_transaction_hashes(db::BlockRef::Hash(canonized_block_hash));
let canonized_transactions_hashes = self.storage.block_transaction_hashes(db::BlockRef::Hash(canonized_block_hash.clone()));
new_main_blocks_transactions_hashes.extend(canonized_transactions_hashes);
canonized_blocks_hashes.push(canonized_block_hash);
}
for transaction_accepted in this_block_transactions_hashes.chain(new_main_blocks_transactions_hashes.into_iter()) {
self.memory_pool.remove_by_hash(&transaction_accepted);
self.verifying_transactions.remove(&transaction_accepted);
}
canonized_blocks_hashes.reverse();
// reverify all transactions from old main branch' blocks
let mut old_main_blocks_transactions_hashes: Vec<H256> = Vec::new();
@ -378,6 +394,7 @@ impl Chain {
self.verifying_transactions.clear();
Ok(BlockInsertionResult {
canonized_blocks_hashes: canonized_blocks_hashes,
// order matters: db transactions, then ordered mempool transactions, then ordered verifying transactions
transactions_to_reverify: old_main_blocks_transactions.into_iter()
.chain(memory_pool_transactions.into_iter())
@ -682,7 +699,7 @@ impl fmt::Debug for Chain {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use chain::{Transaction, RepresentH256};
use chain::Transaction;
use hash_queue::HashPosition;
use super::{Chain, BlockState, TransactionState, HeadersIntersection, BlockInsertionResult};
use db::{self, Store, BestBlock};
@ -1085,24 +1102,25 @@ mod tests {
chain.insert_verified_transaction(tx4);
chain.insert_verified_transaction(tx5);
assert_eq!(chain.insert_best_block(b0.hash(), &b0).expect("block accepted"), BlockInsertionResult::default());
assert_eq!(chain.insert_best_block(b0.hash(), &b0).expect("block accepted"), BlockInsertionResult::with_canonized_blocks(vec![b0.hash()]));
assert_eq!(chain.information().transactions.transactions_count, 3);
assert_eq!(chain.insert_best_block(b1.hash(), &b1).expect("block accepted"), BlockInsertionResult::default());
assert_eq!(chain.insert_best_block(b1.hash(), &b1).expect("block accepted"), BlockInsertionResult::with_canonized_blocks(vec![b1.hash()]));
assert_eq!(chain.information().transactions.transactions_count, 3);
assert_eq!(chain.insert_best_block(b2.hash(), &b2).expect("block accepted"), BlockInsertionResult::default());
assert_eq!(chain.insert_best_block(b2.hash(), &b2).expect("block accepted"), BlockInsertionResult::with_canonized_blocks(vec![b2.hash()]));
assert_eq!(chain.information().transactions.transactions_count, 3);
assert_eq!(chain.insert_best_block(b3.hash(), &b3).expect("block accepted"), BlockInsertionResult::default());
assert_eq!(chain.information().transactions.transactions_count, 3);
assert_eq!(chain.insert_best_block(b4.hash(), &b4).expect("block accepted"), BlockInsertionResult::default());
assert_eq!(chain.information().transactions.transactions_count, 3);
// order matters
let transactions_to_reverify_hashes: Vec<_> = chain.insert_best_block(b5.hash(), &b5)
.expect("block accepted")
let insert_result = chain.insert_best_block(b5.hash(), &b5).expect("block accepted");
let transactions_to_reverify_hashes: Vec<_> = insert_result
.transactions_to_reverify
.into_iter()
.map(|(h, _)| h)
.collect();
assert_eq!(transactions_to_reverify_hashes, vec![tx1_hash, tx2_hash]);
assert_eq!(insert_result.canonized_blocks_hashes, vec![b3.hash(), b4.hash(), b5.hash()]);
assert_eq!(chain.information().transactions.transactions_count, 0); // tx3, tx4, tx5 are added to the database
}
}

View File

@ -8,7 +8,8 @@ use futures::stream::Stream;
use tokio_core::reactor::{Handle, Interval};
use futures_cpupool::CpuPool;
use db;
use chain::{Block, BlockHeader, Transaction, RepresentH256};
use chain::{Block, BlockHeader, Transaction};
use message::common::{InventoryVector, InventoryType};
use primitives::hash::H256;
use synchronization_peers::Peers;
#[cfg(test)] use synchronization_peers::{Information as PeersInformation};
@ -18,6 +19,7 @@ use synchronization_chain::{Information as ChainInformation};
use synchronization_executor::{Task, TaskExecutor};
use orphan_blocks_pool::OrphanBlocksPool;
use orphan_transactions_pool::OrphanTransactionsPool;
use synchronization_server::ServerTaskIndex;
use synchronization_manager::{manage_synchronization_peers_blocks, manage_synchronization_peers_inventory,
manage_unknown_orphaned_blocks, manage_orphaned_transactions, MANAGEMENT_INTERVAL_MS,
ManagePeersConfig, ManageUnknownBlocksConfig, ManageOrphanTransactionsConfig};
@ -629,8 +631,8 @@ impl<T> VerificationSink for SynchronizationClientCore<T> where T: TaskExecutor
self.execute_synchronization_tasks(None);
// relay block to our peers
if self.state.is_saturated() {
// TODO: Task::BroadcastBlock
if self.state.is_saturated() || self.state.is_nearly_saturated() {
self.relay_new_blocks(insert_result.canonized_blocks_hashes);
}
// deal with block transactions
@ -768,6 +770,26 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
}
}
/// Relay new blocks
fn relay_new_blocks(&self, new_blocks_hashes: Vec<H256>) {
let mut executor = self.executor.lock();
// TODO: use all peers here (currently sync only)
// TODO: send `headers` if peer has not send `sendheaders` command
for peer_index in self.peers.all_peers() {
let inventory: Vec<_> = new_blocks_hashes.iter()
.filter(|h| !self.peers.has_block_with_hash(peer_index, h))
.cloned()
.map(|h| InventoryVector {
inv_type: InventoryType::MessageBlock,
hash: h,
})
.collect();
if !inventory.is_empty() {
executor.execute(Task::SendInventory(peer_index, inventory, ServerTaskIndex::None));
}
}
}
/// Process new blocks inventory
fn process_new_blocks_headers(&mut self, peer_index: usize, mut hashes: Vec<H256>, mut headers: Vec<BlockHeader>) {
assert_eq!(hashes.len(), headers.len());
@ -1051,12 +1073,14 @@ pub mod tests {
use std::sync::Arc;
use parking_lot::{Mutex, RwLock};
use tokio_core::reactor::{Core, Handle};
use chain::{Block, Transaction, RepresentH256};
use chain::{Block, Transaction};
use message::common::{InventoryVector, InventoryType};
use super::{Client, Config, SynchronizationClient, SynchronizationClientCore};
use synchronization_executor::Task;
use synchronization_chain::{Chain, ChainRef};
use synchronization_executor::tests::DummyTaskExecutor;
use synchronization_verifier::tests::DummyVerifier;
use synchronization_server::ServerTaskIndex;
use primitives::hash::H256;
use p2p::event_loop;
use test_data;
@ -1243,14 +1267,17 @@ pub mod tests {
sync.on_new_blocks_headers(1, vec![block.block_header.clone()]);
sync.on_new_blocks_headers(2, vec![block.block_header.clone()]);
executor.lock().take_tasks();
sync.on_peer_block(2, block);
sync.on_peer_block(2, block.clone());
let tasks = executor.lock().take_tasks();
assert_eq!(tasks.len(), 4);
assert_eq!(tasks.len(), 5);
assert!(tasks.iter().any(|t| t == &Task::RequestBlocksHeaders(1)));
assert!(tasks.iter().any(|t| t == &Task::RequestBlocksHeaders(2)));
assert!(tasks.iter().any(|t| t == &Task::RequestMemoryPool(1)));
assert!(tasks.iter().any(|t| t == &Task::RequestMemoryPool(2)));
let inventory = vec![InventoryVector { inv_type: InventoryType::MessageBlock, hash: block.hash() }];
assert!(tasks.iter().any(|t| t == &Task::SendInventory(1, inventory.clone(), ServerTaskIndex::None)));
}
#[test]
@ -1764,4 +1791,48 @@ pub mod tests {
sync.on_new_blocks_headers(2, vec![b10.block_header.clone(), b21.block_header.clone(),
b22.block_header.clone(), b23.block_header.clone()]);
}
#[test]
fn relay_new_block_when_in_saturated_state() {
let (_, _, executor, _, sync) = create_sync(None, None);
let genesis = test_data::genesis();
let b0 = test_data::block_builder().header().parent(genesis.hash()).build().build();
let b1 = test_data::block_builder().header().parent(b0.hash()).build().build();
let b2 = test_data::block_builder().header().parent(b1.hash()).build().build();
let b3 = test_data::block_builder().header().parent(b2.hash()).build().build();
let mut sync = sync.lock();
sync.on_new_blocks_headers(1, vec![b0.block_header.clone(), b1.block_header.clone()]);
sync.on_peer_block(1, b0.clone());
sync.on_peer_block(1, b1.clone());
// we were in synchronization state => block is not relayed
{
let tasks = executor.lock().take_tasks();
assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1),
Task::RequestBlocks(1, vec![b0.hash(), b1.hash()]),
Task::RequestBlocksHeaders(1),
Task::RequestMemoryPool(1)
]);
}
sync.on_peer_block(2, b2.clone());
// we were in saturated state => block is relayed
{
let tasks = executor.lock().take_tasks();
let inventory = vec![InventoryVector { inv_type: InventoryType::MessageBlock, hash: b2.hash() }];
assert_eq!(tasks, vec![Task::RequestBlocksHeaders(2), Task::SendInventory(1, inventory, ServerTaskIndex::None)]);
}
sync.on_new_blocks_headers(1, vec![b3.block_header.clone()]);
sync.on_peer_block(1, b3.clone());
// we were in nearly saturated state => block is relayed
{
let tasks = executor.lock().take_tasks();
let inventory = vec![InventoryVector { inv_type: InventoryType::MessageBlock, hash: b3.hash() }];
assert!(tasks.iter().any(|t| t == &Task::SendInventory(2, inventory.clone(), ServerTaskIndex::None)));
}
}
}

View File

@ -1,7 +1,7 @@
use std::sync::Arc;
use std::collections::HashMap;
use parking_lot::Mutex;
use chain::{Block, BlockHeader, RepresentH256};
use chain::{Block, BlockHeader};
use message::common::{InventoryVector, InventoryType};
use message::types;
use primitives::hash::H256;

View File

@ -185,7 +185,6 @@ mod tests {
use std::collections::HashSet;
use super::{ManagePeersConfig, ManageUnknownBlocksConfig, ManageOrphanTransactionsConfig, manage_synchronization_peers_blocks,
manage_unknown_orphaned_blocks, manage_orphaned_transactions};
use chain::RepresentH256;
use synchronization_peers::Peers;
use primitives::hash::H256;
use test_data;

View File

@ -6,6 +6,8 @@ use time::precise_time_s;
/// Max peer failures # before excluding from sync process
const MAX_PEER_FAILURES: usize = 2;
/// Max last blocks to store for given peer
const MAX_LAST_BLOCKS_TO_STORE: usize = 64;
/// Set of peers selected for synchronization.
#[derive(Debug)]
@ -24,6 +26,8 @@ pub struct Peers {
inventory_requests: HashSet<usize>,
/// Last inventory message time from peer.
inventory_requests_order: LinkedHashMap<usize, f64>,
/// Last blocks from peer
last_block_responses: HashMap<usize, LinkedHashMap<H256, ()>>,
}
/// Information on synchronization peers
@ -48,6 +52,7 @@ impl Peers {
blocks_requests_order: LinkedHashMap::new(),
inventory_requests: HashSet::new(),
inventory_requests_order: LinkedHashMap::new(),
last_block_responses: HashMap::new(),
}
}
@ -73,19 +78,27 @@ impl Peers {
/// Get all peers
pub fn all_peers(&self) -> Vec<usize> {
self.idle.iter().cloned()
let mut unique: Vec<_> = self.idle.iter().cloned()
.chain(self.unuseful.iter().cloned())
.chain(self.blocks_requests.keys().cloned())
.chain(self.inventory_requests.iter().cloned())
.collect()
.collect();
// need stable (for tests) && unique peers here, as blocks_requests can intersect with inventory_requests
unique.sort();
unique.dedup();
unique
}
/// Get useful peers
pub fn useful_peers(&self) -> Vec<usize> {
self.idle.iter().cloned()
let mut unique: Vec<_> = self.idle.iter().cloned()
.chain(self.blocks_requests.keys().cloned())
.chain(self.inventory_requests.iter().cloned())
.collect()
.collect();
// need stable (for tests) && unique peers here, as blocks_requests can intersect with inventory_requests
unique.sort();
unique.dedup();
unique
}
/// Get idle peers for inventory request.
@ -125,6 +138,14 @@ impl Peers {
self.blocks_requests.get(&peer_index).cloned()
}
/// True if peer already has block with this hash
pub fn has_block_with_hash(&self, peer_index: usize, hash: &H256) -> bool {
self.last_block_responses
.get(&peer_index)
.map(|h| h.contains_key(hash))
.unwrap_or(false)
}
/// Mark peer as useful.
pub fn useful_peer(&mut self, peer_index: usize) {
// if peer is unknown => insert to idle queue
@ -162,6 +183,7 @@ impl Peers {
self.blocks_requests_order.remove(&peer_index);
self.inventory_requests.remove(&peer_index);
self.inventory_requests_order.remove(&peer_index);
self.last_block_responses.remove(&peer_index);
peer_blocks_requests
.map(|hs| hs.into_iter().collect())
}
@ -188,6 +210,16 @@ impl Peers {
if try_mark_as_idle {
self.try_mark_idle(peer_index);
}
// TODO: add test for it
// remember that peer knows about this block
let last_block_responses_entry = self.last_block_responses.entry(peer_index).or_insert_with(LinkedHashMap::default);
if !last_block_responses_entry.contains_key(block_hash) {
if last_block_responses_entry.len() == MAX_LAST_BLOCKS_TO_STORE {
last_block_responses_entry.pop_front();
}
last_block_responses_entry.insert(block_hash.clone(), ());
}
}
/// Inventory received from peer.

View File

@ -472,7 +472,7 @@ pub mod tests {
use db;
use test_data;
use primitives::hash::H256;
use chain::{Transaction, RepresentH256};
use chain::Transaction;
use message::types;
use message::common::{InventoryVector, InventoryType};
use synchronization_executor::Task;

View File

@ -2,7 +2,7 @@ use std::thread;
use std::sync::Arc;
use std::sync::mpsc::{channel, Sender, Receiver};
use parking_lot::Mutex;
use chain::{Block, Transaction, RepresentH256};
use chain::{Block, Transaction};
use message::common::ConsensusParams;
use primitives::hash::H256;
use verification::{ChainVerifier, Verify as VerificationVerify};
@ -151,7 +151,7 @@ pub mod tests {
use std::sync::Arc;
use std::collections::HashMap;
use parking_lot::Mutex;
use chain::{Block, Transaction, RepresentH256};
use chain::{Block, Transaction};
use synchronization_client::SynchronizationClientCore;
use synchronization_executor::tests::DummyTaskExecutor;
use primitives::hash::H256;