do not boomerang-relay new blocks
This commit is contained in:
parent
730bc619c4
commit
5e80dd69bd
|
@ -123,7 +123,6 @@ impl BestHeadersChain {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::BestHeadersChain;
|
||||
use chain::RepresentH256;
|
||||
use primitives::hash::H256;
|
||||
use test_data;
|
||||
|
||||
|
|
|
@ -37,7 +37,6 @@ mod tests {
|
|||
use std::sync::Arc;
|
||||
use super::super::Error;
|
||||
use super::BlocksWriter;
|
||||
use chain::RepresentH256;
|
||||
use test_data;
|
||||
use verification;
|
||||
|
||||
|
|
|
@ -2,7 +2,6 @@ use std::sync::Arc;
|
|||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use parking_lot::Mutex;
|
||||
use db;
|
||||
use chain::RepresentH256;
|
||||
use p2p::OutboundSyncConnectionRef;
|
||||
use message::common::{InventoryType, InventoryVector};
|
||||
use message::types;
|
||||
|
@ -222,7 +221,6 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
|
|||
mod tests {
|
||||
use std::sync::Arc;
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use chain::RepresentH256;
|
||||
use synchronization_executor::Task;
|
||||
use synchronization_executor::tests::DummyTaskExecutor;
|
||||
use synchronization_client::{Config, SynchronizationClient, SynchronizationClientCore};
|
||||
|
|
|
@ -699,7 +699,7 @@ impl fmt::Debug for Chain {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
use chain::{Transaction, RepresentH256};
|
||||
use chain::Transaction;
|
||||
use hash_queue::HashPosition;
|
||||
use super::{Chain, BlockState, TransactionState, HeadersIntersection, BlockInsertionResult};
|
||||
use db::{self, Store, BestBlock};
|
||||
|
|
|
@ -8,7 +8,8 @@ use futures::stream::Stream;
|
|||
use tokio_core::reactor::{Handle, Interval};
|
||||
use futures_cpupool::CpuPool;
|
||||
use db;
|
||||
use chain::{Block, BlockHeader, Transaction, RepresentH256};
|
||||
use chain::{Block, BlockHeader, Transaction};
|
||||
use message::common::{InventoryVector, InventoryType};
|
||||
use primitives::hash::H256;
|
||||
use synchronization_peers::Peers;
|
||||
#[cfg(test)] use synchronization_peers::{Information as PeersInformation};
|
||||
|
@ -18,6 +19,7 @@ use synchronization_chain::{Information as ChainInformation};
|
|||
use synchronization_executor::{Task, TaskExecutor};
|
||||
use orphan_blocks_pool::OrphanBlocksPool;
|
||||
use orphan_transactions_pool::OrphanTransactionsPool;
|
||||
use synchronization_server::ServerTaskIndex;
|
||||
use synchronization_manager::{manage_synchronization_peers_blocks, manage_synchronization_peers_inventory,
|
||||
manage_unknown_orphaned_blocks, manage_orphaned_transactions, MANAGEMENT_INTERVAL_MS,
|
||||
ManagePeersConfig, ManageUnknownBlocksConfig, ManageOrphanTransactionsConfig};
|
||||
|
@ -630,9 +632,7 @@ impl<T> VerificationSink for SynchronizationClientCore<T> where T: TaskExecutor
|
|||
|
||||
// relay block to our peers
|
||||
if self.state.is_saturated() || self.state.is_nearly_saturated() {
|
||||
// TODO: remember peer' last N blocks and send only if peer has no canonized blocks
|
||||
// TODO: send `headers` if peer has not send `sendheaders` command
|
||||
self.executor.lock().execute(Task::BroadcastBlocksHashes(insert_result.canonized_blocks_hashes));
|
||||
self.relay_new_blocks(insert_result.canonized_blocks_hashes);
|
||||
}
|
||||
|
||||
// deal with block transactions
|
||||
|
@ -770,6 +770,26 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
|
|||
}
|
||||
}
|
||||
|
||||
/// Relay new blocks
|
||||
fn relay_new_blocks(&self, new_blocks_hashes: Vec<H256>) {
|
||||
let mut executor = self.executor.lock();
|
||||
// TODO: use all peers here (currently sync only)
|
||||
// TODO: send `headers` if peer has not send `sendheaders` command
|
||||
for peer_index in self.peers.all_peers() {
|
||||
let inventory: Vec<_> = new_blocks_hashes.iter()
|
||||
.filter(|h| !self.peers.has_block_with_hash(peer_index, h))
|
||||
.cloned()
|
||||
.map(|h| InventoryVector {
|
||||
inv_type: InventoryType::MessageBlock,
|
||||
hash: h,
|
||||
})
|
||||
.collect();
|
||||
if !inventory.is_empty() {
|
||||
executor.execute(Task::SendInventory(peer_index, inventory, ServerTaskIndex::None));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Process new blocks inventory
|
||||
fn process_new_blocks_headers(&mut self, peer_index: usize, mut hashes: Vec<H256>, mut headers: Vec<BlockHeader>) {
|
||||
assert_eq!(hashes.len(), headers.len());
|
||||
|
@ -1053,12 +1073,14 @@ pub mod tests {
|
|||
use std::sync::Arc;
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use tokio_core::reactor::{Core, Handle};
|
||||
use chain::{Block, Transaction, RepresentH256};
|
||||
use chain::{Block, Transaction};
|
||||
use message::common::{InventoryVector, InventoryType};
|
||||
use super::{Client, Config, SynchronizationClient, SynchronizationClientCore};
|
||||
use synchronization_executor::Task;
|
||||
use synchronization_chain::{Chain, ChainRef};
|
||||
use synchronization_executor::tests::DummyTaskExecutor;
|
||||
use synchronization_verifier::tests::DummyVerifier;
|
||||
use synchronization_server::ServerTaskIndex;
|
||||
use primitives::hash::H256;
|
||||
use p2p::event_loop;
|
||||
use test_data;
|
||||
|
@ -1253,7 +1275,9 @@ pub mod tests {
|
|||
assert!(tasks.iter().any(|t| t == &Task::RequestBlocksHeaders(2)));
|
||||
assert!(tasks.iter().any(|t| t == &Task::RequestMemoryPool(1)));
|
||||
assert!(tasks.iter().any(|t| t == &Task::RequestMemoryPool(2)));
|
||||
assert!(tasks.iter().any(|t| t == &Task::BroadcastBlocksHashes(vec![block.hash()])));
|
||||
|
||||
let inventory = vec![InventoryVector { inv_type: InventoryType::MessageBlock, hash: block.hash() }];
|
||||
assert!(tasks.iter().any(|t| t == &Task::SendInventory(1, inventory.clone(), ServerTaskIndex::None)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -1792,12 +1816,13 @@ pub mod tests {
|
|||
]);
|
||||
}
|
||||
|
||||
sync.on_peer_block(1, b2.clone());
|
||||
sync.on_peer_block(2, b2.clone());
|
||||
|
||||
// we were in saturated state => block is relayed
|
||||
{
|
||||
let tasks = executor.lock().take_tasks();
|
||||
assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1), Task::BroadcastBlocksHashes(vec![b2.hash()])]);
|
||||
let inventory = vec![InventoryVector { inv_type: InventoryType::MessageBlock, hash: b2.hash() }];
|
||||
assert_eq!(tasks, vec![Task::RequestBlocksHeaders(2), Task::SendInventory(1, inventory, ServerTaskIndex::None)]);
|
||||
}
|
||||
|
||||
sync.on_new_blocks_headers(1, vec![b3.block_header.clone()]);
|
||||
|
@ -1806,11 +1831,14 @@ pub mod tests {
|
|||
// we were in nearly saturated state => block is relayed
|
||||
{
|
||||
let tasks = executor.lock().take_tasks();
|
||||
let inventory = vec![InventoryVector { inv_type: InventoryType::MessageBlock, hash: b3.hash() }];
|
||||
assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1),
|
||||
Task::RequestBlocks(1, vec![b3.hash()]),
|
||||
Task::BroadcastBlocksHashes(vec![b3.hash()]),
|
||||
Task::RequestBlocks(2, vec![b3.hash()]),
|
||||
Task::SendInventory(2, inventory, ServerTaskIndex::None),
|
||||
Task::RequestBlocksHeaders(1),
|
||||
Task::RequestMemoryPool(1)
|
||||
Task::RequestMemoryPool(1),
|
||||
Task::RequestBlocksHeaders(2),
|
||||
Task::RequestMemoryPool(2),
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use std::sync::Arc;
|
||||
use std::collections::HashMap;
|
||||
use parking_lot::Mutex;
|
||||
use chain::{Block, BlockHeader, RepresentH256};
|
||||
use chain::{Block, BlockHeader};
|
||||
use message::common::{InventoryVector, InventoryType};
|
||||
use message::types;
|
||||
use primitives::hash::H256;
|
||||
|
@ -30,8 +30,6 @@ pub enum Task {
|
|||
RequestMemoryPool(usize),
|
||||
/// Send block.
|
||||
SendBlock(usize, Block, ServerTaskIndex),
|
||||
/// Broadcast block.
|
||||
BroadcastBlocksHashes(Vec<H256>),
|
||||
/// Send notfound
|
||||
SendNotFound(usize, Vec<InventoryVector>, ServerTaskIndex),
|
||||
/// Send inventory
|
||||
|
@ -134,20 +132,6 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
|
|||
connection.send_block(&block_message);
|
||||
}
|
||||
},
|
||||
Task::BroadcastBlocksHashes(blocks_hashes) => {
|
||||
let inventory = types::Inv {
|
||||
inventory: blocks_hashes.into_iter().map(|h| InventoryVector {
|
||||
inv_type: InventoryType::MessageBlock,
|
||||
hash: h,
|
||||
})
|
||||
.collect(),
|
||||
};
|
||||
|
||||
for (peer_index, connection) in self.peers.iter() {
|
||||
trace!(target: "sync", "Sending inventory with {} blocks hashes to peer#{}", inventory.inventory.len(), peer_index);
|
||||
connection.send_inventory(&inventory);
|
||||
}
|
||||
},
|
||||
Task::SendNotFound(peer_index, unknown_inventory, id) => {
|
||||
let notfound = types::NotFound {
|
||||
inventory: unknown_inventory,
|
||||
|
|
|
@ -185,7 +185,6 @@ mod tests {
|
|||
use std::collections::HashSet;
|
||||
use super::{ManagePeersConfig, ManageUnknownBlocksConfig, ManageOrphanTransactionsConfig, manage_synchronization_peers_blocks,
|
||||
manage_unknown_orphaned_blocks, manage_orphaned_transactions};
|
||||
use chain::RepresentH256;
|
||||
use synchronization_peers::Peers;
|
||||
use primitives::hash::H256;
|
||||
use test_data;
|
||||
|
|
|
@ -6,6 +6,8 @@ use time::precise_time_s;
|
|||
|
||||
/// Max peer failures # before excluding from sync process
|
||||
const MAX_PEER_FAILURES: usize = 2;
|
||||
/// Max last blocks to store for given peer
|
||||
const MAX_LAST_BLOCKS_TO_STORE: usize = 64;
|
||||
|
||||
/// Set of peers selected for synchronization.
|
||||
#[derive(Debug)]
|
||||
|
@ -24,6 +26,8 @@ pub struct Peers {
|
|||
inventory_requests: HashSet<usize>,
|
||||
/// Last inventory message time from peer.
|
||||
inventory_requests_order: LinkedHashMap<usize, f64>,
|
||||
/// Last blocks from peer
|
||||
last_block_responses: HashMap<usize, LinkedHashMap<H256, ()>>,
|
||||
}
|
||||
|
||||
/// Information on synchronization peers
|
||||
|
@ -48,6 +52,7 @@ impl Peers {
|
|||
blocks_requests_order: LinkedHashMap::new(),
|
||||
inventory_requests: HashSet::new(),
|
||||
inventory_requests_order: LinkedHashMap::new(),
|
||||
last_block_responses: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -73,19 +78,27 @@ impl Peers {
|
|||
|
||||
/// Get all peers
|
||||
pub fn all_peers(&self) -> Vec<usize> {
|
||||
self.idle.iter().cloned()
|
||||
let mut unique: Vec<_> = self.idle.iter().cloned()
|
||||
.chain(self.unuseful.iter().cloned())
|
||||
.chain(self.blocks_requests.keys().cloned())
|
||||
.chain(self.inventory_requests.iter().cloned())
|
||||
.collect()
|
||||
.collect();
|
||||
// need stable (for tests) && unique peers here, as blocks_requests can intersect with inventory_requests
|
||||
unique.sort();
|
||||
unique.dedup();
|
||||
unique
|
||||
}
|
||||
|
||||
/// Get useful peers
|
||||
pub fn useful_peers(&self) -> Vec<usize> {
|
||||
self.idle.iter().cloned()
|
||||
let mut unique: Vec<_> = self.idle.iter().cloned()
|
||||
.chain(self.blocks_requests.keys().cloned())
|
||||
.chain(self.inventory_requests.iter().cloned())
|
||||
.collect()
|
||||
.collect();
|
||||
// need stable (for tests) && unique peers here, as blocks_requests can intersect with inventory_requests
|
||||
unique.sort();
|
||||
unique.dedup();
|
||||
unique
|
||||
}
|
||||
|
||||
/// Get idle peers for inventory request.
|
||||
|
@ -125,6 +138,14 @@ impl Peers {
|
|||
self.blocks_requests.get(&peer_index).cloned()
|
||||
}
|
||||
|
||||
/// True if peer already has block with this hash
|
||||
pub fn has_block_with_hash(&self, peer_index: usize, hash: &H256) -> bool {
|
||||
self.last_block_responses
|
||||
.get(&peer_index)
|
||||
.map(|h| h.contains_key(hash))
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
/// Mark peer as useful.
|
||||
pub fn useful_peer(&mut self, peer_index: usize) {
|
||||
// if peer is unknown => insert to idle queue
|
||||
|
@ -162,6 +183,7 @@ impl Peers {
|
|||
self.blocks_requests_order.remove(&peer_index);
|
||||
self.inventory_requests.remove(&peer_index);
|
||||
self.inventory_requests_order.remove(&peer_index);
|
||||
self.last_block_responses.remove(&peer_index);
|
||||
peer_blocks_requests
|
||||
.map(|hs| hs.into_iter().collect())
|
||||
}
|
||||
|
@ -188,6 +210,16 @@ impl Peers {
|
|||
if try_mark_as_idle {
|
||||
self.try_mark_idle(peer_index);
|
||||
}
|
||||
|
||||
// TODO: add test for it
|
||||
// remember that peer knows about this block
|
||||
let last_block_responses_entry = self.last_block_responses.entry(peer_index).or_insert_with(LinkedHashMap::default);
|
||||
if !last_block_responses_entry.contains_key(block_hash) {
|
||||
if last_block_responses_entry.len() == MAX_LAST_BLOCKS_TO_STORE {
|
||||
last_block_responses_entry.pop_front();
|
||||
}
|
||||
last_block_responses_entry.insert(block_hash.clone(), ());
|
||||
}
|
||||
}
|
||||
|
||||
/// Inventory received from peer.
|
||||
|
|
|
@ -472,7 +472,7 @@ pub mod tests {
|
|||
use db;
|
||||
use test_data;
|
||||
use primitives::hash::H256;
|
||||
use chain::{Transaction, RepresentH256};
|
||||
use chain::Transaction;
|
||||
use message::types;
|
||||
use message::common::{InventoryVector, InventoryType};
|
||||
use synchronization_executor::Task;
|
||||
|
|
|
@ -2,7 +2,7 @@ use std::thread;
|
|||
use std::sync::Arc;
|
||||
use std::sync::mpsc::{channel, Sender, Receiver};
|
||||
use parking_lot::Mutex;
|
||||
use chain::{Block, Transaction, RepresentH256};
|
||||
use chain::{Block, Transaction};
|
||||
use message::common::ConsensusParams;
|
||||
use primitives::hash::H256;
|
||||
use verification::{ChainVerifier, Verify as VerificationVerify};
|
||||
|
@ -151,7 +151,7 @@ pub mod tests {
|
|||
use std::sync::Arc;
|
||||
use std::collections::HashMap;
|
||||
use parking_lot::Mutex;
|
||||
use chain::{Block, Transaction, RepresentH256};
|
||||
use chain::{Block, Transaction};
|
||||
use synchronization_client::SynchronizationClientCore;
|
||||
use synchronization_executor::tests::DummyTaskExecutor;
|
||||
use primitives::hash::H256;
|
||||
|
|
Loading…
Reference in New Issue