From a714f9c172c912f6dd7dc01833615ea3eddc9a2e Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Thu, 1 Dec 2016 19:40:23 +0300 Subject: [PATCH 1/2] dealing with slow peers + empty verification queue --- sync/src/hash_queue.rs | 20 ++++ sync/src/synchronization_chain.rs | 8 ++ sync/src/synchronization_client.rs | 156 ++++++++++++++++++++++++++-- sync/src/synchronization_manager.rs | 1 + sync/src/synchronization_peers.rs | 2 +- 5 files changed, 178 insertions(+), 9 deletions(-) diff --git a/sync/src/hash_queue.rs b/sync/src/hash_queue.rs index 14b7acc4..462a461a 100644 --- a/sync/src/hash_queue.rs +++ b/sync/src/hash_queue.rs @@ -82,6 +82,11 @@ impl HashQueue { self.set.contains(hash) } + /// Returns n elements from the front of the queue + pub fn front_n(&self, n: u32) -> Vec { + self.queue.iter().cloned().take(n as usize).collect() + } + /// Removes element from the front of the queue. pub fn pop_front(&mut self) -> Option { match self.queue.pop_front() { @@ -255,6 +260,11 @@ impl HashQueueChain { None } + /// Returns n elements from the front of the given queue + pub fn front_n_at(&self, queue_index: usize, n: u32) -> Vec { + self.chain[queue_index].front_n(n) + } + /// Remove a number of hashes from the front of the given queue. pub fn pop_front_n_at(&mut self, queue_index: usize, n: u32) -> Vec { self.chain[queue_index].pop_front_n(n) @@ -373,4 +383,14 @@ mod tests { assert_eq!(chain.contains_in(&H256::from(5)), Some(2)); assert_eq!(chain.contains_in(&H256::from(9)), None); } + + #[test] + fn hash_queue_front_n() { + let mut queue = HashQueue::new(); + queue.push_back_n(vec![H256::from(0), H256::from(1)]); + assert_eq!(queue.front_n(3), vec![H256::from(0), H256::from(1)]); + assert_eq!(queue.front_n(3), vec![H256::from(0), H256::from(1)]); + assert_eq!(queue.pop_front_n(3), vec![H256::from(0), H256::from(1)]); + assert_eq!(queue.pop_front_n(3), vec![]); + } } diff --git a/sync/src/synchronization_chain.rs b/sync/src/synchronization_chain.rs index cb6c6422..c58284b6 100644 --- a/sync/src/synchronization_chain.rs +++ b/sync/src/synchronization_chain.rs @@ -196,6 +196,14 @@ impl Chain { } } + /// Get n best blocks of given state + pub fn best_n_of_blocks_state(&self, state: BlockState, n: u32) -> Vec { + match state { + BlockState::Scheduled | BlockState::Requested | BlockState::Verifying => self.hash_chain.front_n_at(state.to_queue_index(), n), + _ => unreachable!("must be checked by caller"), + } + } + /// Get best block pub fn best_block(&self) -> db::BestBlock { match self.hash_chain.back() { diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs index a7d3e0b2..9a913192 100644 --- a/sync/src/synchronization_client.rs +++ b/sync/src/synchronization_client.rs @@ -156,6 +156,14 @@ const MAX_VERIFYING_BLOCKS: u32 = 256; const MIN_BLOCKS_IN_REQUEST: u32 = 32; /// Maximum number of blocks to request from peer const MAX_BLOCKS_IN_REQUEST: u32 = 128; +/// Number of blocks to receive since synchronization start to begin duplicating blocks requests +const NEAR_EMPTY_VERIFICATION_QUEUE_THRESHOLD_BLOCKS: usize = 20; +/// Number of seconds left before verification queue will be empty to count it as 'near empty queue' +const NEAR_EMPTY_VERIFICATION_QUEUE_THRESHOLD_S: f64 = 20_f64; +/// Number of blocks to inspect when calculating average sync speed +const SYNC_SPEED_BLOCKS_TO_INSPECT: usize = 512; +/// Number of blocks to inspect when calculating average blocks speed +const BLOCKS_SPEED_BLOCKS_TO_INSPECT: usize = 512; /// Synchronization state #[derive(Debug, Clone, Copy)] @@ -298,6 +306,10 @@ pub struct SynchronizationClientCore { verifying_blocks_futures: HashMap, Vec>)>, /// Hashes of items we do not want to relay after verification is completed do_not_relay: HashSet, + /// Block processing speed meter + block_speed_meter: AverageSpeedMeter, + /// Block synchronization speed meter + sync_speed_meter: AverageSpeedMeter, } /// Block headers provider from `headers` message @@ -312,6 +324,18 @@ pub struct MessageBlockHeadersProvider<'a> { headers_order: Vec, } +/// Speed meter +struct AverageSpeedMeter { + /// Number of items to inspect + inspect_items: usize, + /// Number of items currently inspected + inspected_items: VecDeque, + /// Current speed + speed: f64, + /// Last timestamp + last_timestamp: Option, +} + impl Config { pub fn new() -> Self { Config { @@ -689,6 +713,8 @@ impl ClientCore for SynchronizationClientCore where T: TaskExecutor { fn on_peer_block(&mut self, peer_index: usize, block: IndexedBlock) -> Option> { let block_hash = block.hash().clone(); + // update synchronization speed + self.sync_speed_meter.checkpoint(); // update peers to select next tasks self.peers.on_block_received(peer_index, &block_hash); @@ -807,18 +833,88 @@ impl ClientCore for SynchronizationClientCore where T: TaskExecutor { } } - // check if we can move some blocks from scheduled to requested queue let blocks_idle_peers_len = blocks_idle_peers.len() as u32; if blocks_idle_peers_len != 0 { let mut chain = self.chain.write(); - let scheduled_hashes_len = chain.length_of_blocks_state(BlockState::Scheduled); + + // check if verification queue is empty/almost empty + // && there are pending blocks requests + // && there are idle block peers + // => we may need to duplicate pending blocks requests to idle peers + // this will result in additional network load, but verification queue will be filled up earlier + // it is very useful when dealing with large blocks + some peer is responding, but with very low speed: + // requested: [B1, B2, B3, B4] from peer1 + // orphans: [B5, B6, B7, B8, ... B1024] ===> 1GB of RAM + // verifying: None <=== we are waiting for B1 to come + // idle: [peer2] + // peer1 responds with single block in ~20 seconds + // => we could ask idle peer2 about [B3, B4] + // => then after receiving, if not yet received B2 => about B2 + // => then finally about B1 + // these requests has priority over new blocks requests below let requested_hashes_len = chain.length_of_blocks_state(BlockState::Requested); let verifying_hashes_len = chain.length_of_blocks_state(BlockState::Verifying); - if requested_hashes_len + verifying_hashes_len < MAX_REQUESTED_BLOCKS + MAX_VERIFYING_BLOCKS && scheduled_hashes_len != 0 { - let chunk_size = min(MAX_BLOCKS_IN_REQUEST, max(scheduled_hashes_len / blocks_idle_peers_len, MIN_BLOCKS_IN_REQUEST)); - let hashes_to_request_len = chunk_size * blocks_idle_peers_len; - let hashes_to_request = chain.request_blocks_hashes(hashes_to_request_len); - blocks_requests = Some(hashes_to_request); + if requested_hashes_len != 0 { + let verification_speed: f64 = self.block_speed_meter.speed(); + let synchronization_speed: f64 = self.sync_speed_meter.speed(); + // estimate time when verification queue will be empty + let verification_queue_will_be_empty_in = if verifying_hashes_len == 0 { + // verification queue is already empty + if self.block_speed_meter.inspected_items_len() < NEAR_EMPTY_VERIFICATION_QUEUE_THRESHOLD_BLOCKS { + // the very beginning of synchronization + // => peers have not yet responded with a single requested blocks + 60_f64 + } else { + // blocks were are already received + // => bad situation + 0_f64 + } + } else { + if verification_speed < 0.01_f64 { + // verification speed is too slow + 60_f64 + } else { + // blocks / (blocks / second) -> second + verifying_hashes_len as f64 / verification_speed + } + }; + // estimate time when all synchronization requests will complete + let synchronization_queue_will_be_full_in = if synchronization_speed < 0.01_f64 { + // synchronization speed is too slow + 60_f64 + } else { + // blocks / (blocks / second) -> second + requested_hashes_len as f64 / synchronization_speed + }; + // if verification queue will be empty before all synchronization requests will be completed + // + do not spam with duplicated blocks requests if blocks are too big && there are still blocks left for NEAR_EMPTY_VERIFICATION_QUEUE_THRESHOLD_S + // => duplicate blocks requests + if synchronization_queue_will_be_full_in > verification_queue_will_be_empty_in && + verification_queue_will_be_empty_in < NEAR_EMPTY_VERIFICATION_QUEUE_THRESHOLD_S { + // blocks / second * second -> blocks + let hashes_requests_to_duplicate_len = synchronization_speed * (synchronization_queue_will_be_full_in - verification_queue_will_be_empty_in); + // do not ask for too many blocks + let hashes_requests_to_duplicate_len = min(MAX_BLOCKS_IN_REQUEST, hashes_requests_to_duplicate_len as u32); + // ask for at least 1 block + let hashes_requests_to_duplicate_len = max(1, min(requested_hashes_len, hashes_requests_to_duplicate_len)); + blocks_requests = Some(chain.best_n_of_blocks_state(BlockState::Requested, hashes_requests_to_duplicate_len)); + + trace!(target: "sync", "Duplicating {} blocks requests. Sync speed: {} * {}, blocks speed: {} * {}.", hashes_requests_to_duplicate_len, synchronization_speed, requested_hashes_len, verification_speed, verifying_hashes_len); + } + } + + // check if we can move some blocks from scheduled to requested queue + { + let scheduled_hashes_len = chain.length_of_blocks_state(BlockState::Scheduled); + if requested_hashes_len + verifying_hashes_len < MAX_REQUESTED_BLOCKS + MAX_VERIFYING_BLOCKS && scheduled_hashes_len != 0 { + let chunk_size = min(MAX_BLOCKS_IN_REQUEST, max(scheduled_hashes_len / blocks_idle_peers_len, MIN_BLOCKS_IN_REQUEST)); + let hashes_to_request_len = chunk_size * blocks_idle_peers_len; + let hashes_to_request = chain.request_blocks_hashes(hashes_to_request_len); + match blocks_requests { + Some(ref mut blocks_requests) => blocks_requests.extend(hashes_to_request), + None => blocks_requests = Some(hashes_to_request), + } + } } } } @@ -854,9 +950,12 @@ impl ClientCore for SynchronizationClientCore where T: TaskExecutor { impl VerificationSink for SynchronizationClientCore where T: TaskExecutor { /// Process successful block verification fn on_block_verification_success(&mut self, block: IndexedBlock) -> Option> { + // update block processing speed + self.block_speed_meter.checkpoint(); + + // insert block to the storage let hash = block.hash(); let needs_relay = !self.do_not_relay.remove(hash); - // insert block to the storage match { let mut chain = self.chain.write(); @@ -995,6 +1094,8 @@ impl SynchronizationClientCore where T: TaskExecutor { verifying_blocks_by_peer: HashMap::new(), verifying_blocks_futures: HashMap::new(), do_not_relay: HashSet::new(), + block_speed_meter: AverageSpeedMeter::with_inspect_items(SYNC_SPEED_BLOCKS_TO_INSPECT), + sync_speed_meter: AverageSpeedMeter::with_inspect_items(BLOCKS_SPEED_BLOCKS_TO_INSPECT), } )); @@ -1328,6 +1429,7 @@ impl SynchronizationClientCore where T: TaskExecutor { fn prepare_blocks_requests_tasks(&mut self, peers: Vec, mut hashes: Vec) -> Vec { use std::mem::swap; + // TODO: ask most fast peers for hashes at the beginning of `hashes` let chunk_size = min(MAX_BLOCKS_IN_REQUEST, max(hashes.len() as u32, MIN_BLOCKS_IN_REQUEST)); let last_peer_index = peers.len() - 1; let mut tasks: Vec = Vec::new(); @@ -1474,6 +1576,44 @@ impl<'a> BlockHeaderProvider for MessageBlockHeadersProvider<'a> { } } +impl AverageSpeedMeter { + pub fn with_inspect_items(inspect_items: usize) -> Self { + assert!(inspect_items > 0); + AverageSpeedMeter { + inspect_items: inspect_items, + inspected_items: VecDeque::with_capacity(inspect_items), + speed: 0_f64, + last_timestamp: None, + } + } + + pub fn speed(&self) -> f64 { + let items_per_second = 1_f64 / self.speed; + if items_per_second.is_normal() { items_per_second } else { 0_f64 } + } + + pub fn inspected_items_len(&self) -> usize { + self.inspected_items.len() + } + + pub fn checkpoint(&mut self) { + // if inspected_items is already full => remove oldest item from average + if self.inspected_items.len() == self.inspect_items { + let oldest = self.inspected_items.pop_front().expect("len() is not zero; qed"); + self.speed = (self.inspect_items as f64 * self.speed - oldest) / (self.inspect_items as f64 - 1_f64); + } + + // add new item + let now = time::precise_time_s(); + if let Some(last_timestamp) = self.last_timestamp { + let newest = now - last_timestamp; + self.speed = (self.inspected_items.len() as f64 * self.speed + newest) / (self.inspected_items.len() as f64 + 1_f64); + self.inspected_items.push_back(newest); + } + self.last_timestamp = Some(now); + } +} + #[cfg(test)] pub mod tests { use std::sync::Arc; diff --git a/sync/src/synchronization_manager.rs b/sync/src/synchronization_manager.rs index b2a4adcc..e876955a 100644 --- a/sync/src/synchronization_manager.rs +++ b/sync/src/synchronization_manager.rs @@ -75,6 +75,7 @@ impl Default for ManageOrphanTransactionsConfig { pub fn manage_synchronization_peers_blocks(config: &ManagePeersConfig, peers: &mut Peers) -> Option> { let mut blocks_to_request: Vec = Vec::new(); let now = precise_time_s(); + // reset tasks for peers, which has not responded during given period for (worst_peer_index, worst_peer_time) in peers.ordered_blocks_requests() { // check if peer has not responded within given time diff --git a/sync/src/synchronization_peers.rs b/sync/src/synchronization_peers.rs index a7540896..47759385 100644 --- a/sync/src/synchronization_peers.rs +++ b/sync/src/synchronization_peers.rs @@ -121,7 +121,7 @@ impl Peers { peers.difference(&except).cloned().collect() } - /// Get idle peers. + /// Get idle peers for blocks request. pub fn idle_peers_for_blocks(&self) -> Vec { let peers: HashSet<_> = self.idle.iter().cloned() .chain(self.inventory_requests.iter().cloned()) From daacd3c7105258905b3db2400620d4245f5a8b64 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Fri, 2 Dec 2016 12:21:01 +0300 Subject: [PATCH 2/2] fixed doc --- sync/src/synchronization_client.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs index 9a913192..53860d29 100644 --- a/sync/src/synchronization_client.rs +++ b/sync/src/synchronization_client.rs @@ -848,9 +848,7 @@ impl ClientCore for SynchronizationClientCore where T: TaskExecutor { // verifying: None <=== we are waiting for B1 to come // idle: [peer2] // peer1 responds with single block in ~20 seconds - // => we could ask idle peer2 about [B3, B4] - // => then after receiving, if not yet received B2 => about B2 - // => then finally about B1 + // => we could ask idle peer2 about [B1, B2, B3, B4] // these requests has priority over new blocks requests below let requested_hashes_len = chain.length_of_blocks_state(BlockState::Requested); let verifying_hashes_len = chain.length_of_blocks_state(BlockState::Verifying);