Merge pull request #246 from ethcore/sync_ask_fast_peers

Dealing with slow peers + empty verification queue
Nikolay Volf 2016-12-02 12:32:38 +03:00 committed by GitHub
commit 1ddc962c42
5 changed files with 176 additions and 9 deletions
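The core of the change: when the verification queue is about to drain while block requests are still outstanding, the client re-asks idle peers for the already-requested hashes. Below is a minimal sketch of that decision, under the assumption that speeds are given in blocks per second; `should_duplicate_requests` and its parameters are invented names for illustration, not the client's actual API, while the constants mirror the ones used in the diff below.

use std::cmp::{max, min};

// Illustrative sketch only: returns how many already-requested hashes to duplicate
// to idle peers, or None if no duplication is needed.
fn should_duplicate_requests(
    verifying_len: u32,
    requested_len: u32,
    verification_speed: f64,
    sync_speed: f64,
) -> Option<u32> {
    const NEAR_EMPTY_VERIFICATION_QUEUE_THRESHOLD_S: f64 = 20_f64;
    const MAX_BLOCKS_IN_REQUEST: u32 = 128;

    // seconds until the verification queue drains / until outstanding requests arrive;
    // fall back to a pessimistic 60s when a speed estimate is unusable
    let queue_empty_in = if verification_speed < 0.01 { 60_f64 } else { verifying_len as f64 / verification_speed };
    let requests_done_in = if sync_speed < 0.01 { 60_f64 } else { requested_len as f64 / sync_speed };

    // verifiers would go idle before the downloads catch up, and soon => re-ask idle peers
    if requests_done_in > queue_empty_in && queue_empty_in < NEAR_EMPTY_VERIFICATION_QUEUE_THRESHOLD_S {
        // blocks/second * seconds of idle time => number of requests to duplicate
        let duplicate = (sync_speed * (requests_done_in - queue_empty_in)) as u32;
        Some(max(1, min(requested_len, min(MAX_BLOCKS_IN_REQUEST, duplicate))))
    } else {
        None
    }
}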


@@ -82,6 +82,11 @@ impl HashQueue {
self.set.contains(hash)
}
/// Returns up to n elements from the front of the queue, without removing them.
pub fn front_n(&self, n: u32) -> Vec<H256> {
self.queue.iter().cloned().take(n as usize).collect()
}
/// Removes element from the front of the queue.
pub fn pop_front(&mut self) -> Option<H256> {
match self.queue.pop_front() {
@@ -255,6 +260,11 @@ impl HashQueueChain {
None
}
/// Returns up to n elements from the front of the given queue, without removing them.
pub fn front_n_at(&self, queue_index: usize, n: u32) -> Vec<H256> {
self.chain[queue_index].front_n(n)
}
/// Remove a number of hashes from the front of the given queue.
pub fn pop_front_n_at(&mut self, queue_index: usize, n: u32) -> Vec<H256> {
self.chain[queue_index].pop_front_n(n)
@@ -373,4 +383,14 @@ mod tests {
assert_eq!(chain.contains_in(&H256::from(5)), Some(2));
assert_eq!(chain.contains_in(&H256::from(9)), None);
}
#[test]
fn hash_queue_front_n() {
let mut queue = HashQueue::new();
queue.push_back_n(vec![H256::from(0), H256::from(1)]);
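// front_n only copies hashes, so calling it twice returns the same elements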
assert_eq!(queue.front_n(3), vec![H256::from(0), H256::from(1)]);
assert_eq!(queue.front_n(3), vec![H256::from(0), H256::from(1)]);
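// pop_front_n removes the returned hashes, so a second call yields an empty vec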
assert_eq!(queue.pop_front_n(3), vec![H256::from(0), H256::from(1)]);
assert_eq!(queue.pop_front_n(3), vec![]);
}
}


@@ -196,6 +196,14 @@ impl Chain {
}
}
/// Get hashes of the n best (front-of-queue) blocks in the given state
pub fn best_n_of_blocks_state(&self, state: BlockState, n: u32) -> Vec<H256> {
match state {
BlockState::Scheduled | BlockState::Requested | BlockState::Verifying => self.hash_chain.front_n_at(state.to_queue_index(), n),
_ => unreachable!("must be checked by caller"),
}
}
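// illustrative call (not in this diff): best_n_of_blocks_state(BlockState::Requested, 4)
// returns the 4 oldest requested hashes; the sync client below duplicates exactly these to idle peers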
/// Get best block
pub fn best_block(&self) -> db::BestBlock {
match self.hash_chain.back() {


@@ -156,6 +156,14 @@ const MAX_VERIFYING_BLOCKS: u32 = 256;
const MIN_BLOCKS_IN_REQUEST: u32 = 32;
/// Maximum number of blocks to request from peer
const MAX_BLOCKS_IN_REQUEST: u32 = 128;
/// Number of blocks to receive after synchronization starts before duplicating block requests
const NEAR_EMPTY_VERIFICATION_QUEUE_THRESHOLD_BLOCKS: usize = 20;
/// Number of seconds remaining until the verification queue is empty for it to be counted as 'nearly empty'
const NEAR_EMPTY_VERIFICATION_QUEUE_THRESHOLD_S: f64 = 20_f64;
/// Number of blocks to inspect when calculating average synchronization (download) speed
const SYNC_SPEED_BLOCKS_TO_INSPECT: usize = 512;
/// Number of blocks to inspect when calculating average block processing (verification) speed
const BLOCKS_SPEED_BLOCKS_TO_INSPECT: usize = 512;
/// Synchronization state
#[derive(Debug, Clone, Copy)]
@@ -298,6 +306,10 @@ pub struct SynchronizationClientCore<T: TaskExecutor> {
verifying_blocks_futures: HashMap<usize, (HashSet<H256>, Vec<BoxFuture<(), ()>>)>,
/// Hashes of items we do not want to relay after verification is completed
do_not_relay: HashSet<H256>,
/// Block processing speed meter
block_speed_meter: AverageSpeedMeter,
/// Block synchronization speed meter
sync_speed_meter: AverageSpeedMeter,
}
/// Block headers provider from `headers` message
@@ -312,6 +324,18 @@ pub struct MessageBlockHeadersProvider<'a> {
headers_order: Vec<H256>,
}
/// Speed meter
struct AverageSpeedMeter {
/// Number of items to inspect
inspect_items: usize,
/// Intervals (in seconds) between the recently inspected items
inspected_items: VecDeque<f64>,
/// Current average interval between items, in seconds per item
speed: f64,
/// Last timestamp
last_timestamp: Option<f64>,
}
impl Config {
pub fn new() -> Self {
Config {
@@ -689,6 +713,8 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
fn on_peer_block(&mut self, peer_index: usize, block: IndexedBlock) -> Option<VecDeque<(H256, IndexedBlock)>> {
let block_hash = block.hash().clone();
// update synchronization speed
self.sync_speed_meter.checkpoint();
// update peers to select next tasks
self.peers.on_block_received(peer_index, &block_hash);
@@ -807,18 +833,86 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
}
}
// check if we can move some blocks from scheduled to requested queue
let blocks_idle_peers_len = blocks_idle_peers.len() as u32;
if blocks_idle_peers_len != 0 {
let mut chain = self.chain.write();
let scheduled_hashes_len = chain.length_of_blocks_state(BlockState::Scheduled);
// check if verification queue is empty/almost empty
// && there are pending blocks requests
// && there are idle block peers
// => we may need to duplicate pending blocks requests to idle peers
// this will result in additional network load, but verification queue will be filled up earlier
// this is especially useful when dealing with large blocks and a peer that is responding, but very slowly:
// requested: [B1, B2, B3, B4] from peer1
// orphans: [B5, B6, B7, B8, ... B1024] ===> 1GB of RAM
// verifying: None <=== we are waiting for B1 to come
// idle: [peer2]
// peer1 responds with single block in ~20 seconds
// => we could ask idle peer2 about [B1, B2, B3, B4]
// these requests have priority over the new blocks requests below
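// illustrative numbers (not from this commit): 8 blocks verifying at ~1 block/s
// => verification queue empty in ~8s (below the 20s threshold), while 128 requested
// blocks arriving at ~2 blocks/s need ~64s => duplicate 2 * (64 - 8) = 112 of the
// requested hashes to idle peers (capped at MAX_BLOCKS_IN_REQUEST)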
let requested_hashes_len = chain.length_of_blocks_state(BlockState::Requested);
let verifying_hashes_len = chain.length_of_blocks_state(BlockState::Verifying);
if requested_hashes_len + verifying_hashes_len < MAX_REQUESTED_BLOCKS + MAX_VERIFYING_BLOCKS && scheduled_hashes_len != 0 {
let chunk_size = min(MAX_BLOCKS_IN_REQUEST, max(scheduled_hashes_len / blocks_idle_peers_len, MIN_BLOCKS_IN_REQUEST));
let hashes_to_request_len = chunk_size * blocks_idle_peers_len;
let hashes_to_request = chain.request_blocks_hashes(hashes_to_request_len);
blocks_requests = Some(hashes_to_request);
if requested_hashes_len != 0 {
let verification_speed: f64 = self.block_speed_meter.speed();
let synchronization_speed: f64 = self.sync_speed_meter.speed();
// estimate time when verification queue will be empty
let verification_queue_will_be_empty_in = if verifying_hashes_len == 0 {
// verification queue is already empty
if self.block_speed_meter.inspected_items_len() < NEAR_EMPTY_VERIFICATION_QUEUE_THRESHOLD_BLOCKS {
// the very beginning of synchronization
// => peers have not yet responded with a single requested block
60_f64
} else {
// blocks have already been received
// => bad situation
0_f64
}
} else {
if verification_speed < 0.01_f64 {
// verification speed is too slow
60_f64
} else {
// blocks / (blocks / second) -> second
verifying_hashes_len as f64 / verification_speed
}
};
// estimate time when all synchronization requests will complete
let synchronization_queue_will_be_full_in = if synchronization_speed < 0.01_f64 {
// synchronization speed is too slow
60_f64
} else {
// blocks / (blocks / second) -> second
requested_hashes_len as f64 / synchronization_speed
};
// if the verification queue will be empty before all synchronization requests complete
// + do not spam duplicated blocks requests if blocks are too big && there is still at least NEAR_EMPTY_VERIFICATION_QUEUE_THRESHOLD_S seconds of verification work left
// => duplicate blocks requests
if synchronization_queue_will_be_full_in > verification_queue_will_be_empty_in &&
verification_queue_will_be_empty_in < NEAR_EMPTY_VERIFICATION_QUEUE_THRESHOLD_S {
// blocks / second * second -> blocks
let hashes_requests_to_duplicate_len = synchronization_speed * (synchronization_queue_will_be_full_in - verification_queue_will_be_empty_in);
// do not ask for too many blocks
let hashes_requests_to_duplicate_len = min(MAX_BLOCKS_IN_REQUEST, hashes_requests_to_duplicate_len as u32);
// ask for at least 1 block
let hashes_requests_to_duplicate_len = max(1, min(requested_hashes_len, hashes_requests_to_duplicate_len));
blocks_requests = Some(chain.best_n_of_blocks_state(BlockState::Requested, hashes_requests_to_duplicate_len));
trace!(target: "sync", "Duplicating {} blocks requests. Sync speed: {} * {}, blocks speed: {} * {}.", hashes_requests_to_duplicate_len, synchronization_speed, requested_hashes_len, verification_speed, verifying_hashes_len);
}
}
// check if we can move some blocks from scheduled to requested queue
{
let scheduled_hashes_len = chain.length_of_blocks_state(BlockState::Scheduled);
if requested_hashes_len + verifying_hashes_len < MAX_REQUESTED_BLOCKS + MAX_VERIFYING_BLOCKS && scheduled_hashes_len != 0 {
let chunk_size = min(MAX_BLOCKS_IN_REQUEST, max(scheduled_hashes_len / blocks_idle_peers_len, MIN_BLOCKS_IN_REQUEST));
let hashes_to_request_len = chunk_size * blocks_idle_peers_len;
let hashes_to_request = chain.request_blocks_hashes(hashes_to_request_len);
match blocks_requests {
Some(ref mut blocks_requests) => blocks_requests.extend(hashes_to_request),
None => blocks_requests = Some(hashes_to_request),
}
}
}
}
}
@@ -854,9 +948,12 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
impl<T> VerificationSink for SynchronizationClientCore<T> where T: TaskExecutor {
/// Process successful block verification
fn on_block_verification_success(&mut self, block: IndexedBlock) -> Option<Vec<VerificationTask>> {
// update block processing speed
self.block_speed_meter.checkpoint();
// insert block to the storage
let hash = block.hash();
let needs_relay = !self.do_not_relay.remove(hash);
// insert block to the storage
match {
let mut chain = self.chain.write();
@@ -995,6 +1092,8 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
verifying_blocks_by_peer: HashMap::new(),
verifying_blocks_futures: HashMap::new(),
do_not_relay: HashSet::new(),
block_speed_meter: AverageSpeedMeter::with_inspect_items(BLOCKS_SPEED_BLOCKS_TO_INSPECT),
sync_speed_meter: AverageSpeedMeter::with_inspect_items(SYNC_SPEED_BLOCKS_TO_INSPECT),
}
));
@@ -1328,6 +1427,7 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
fn prepare_blocks_requests_tasks(&mut self, peers: Vec<usize>, mut hashes: Vec<H256>) -> Vec<Task> {
use std::mem::swap;
// TODO: ask the fastest peers for the hashes at the beginning of `hashes`
let chunk_size = min(MAX_BLOCKS_IN_REQUEST, max(hashes.len() as u32, MIN_BLOCKS_IN_REQUEST));
let last_peer_index = peers.len() - 1;
let mut tasks: Vec<Task> = Vec::new();
@@ -1474,6 +1574,44 @@ impl<'a> BlockHeaderProvider for MessageBlockHeadersProvider<'a> {
}
}
impl AverageSpeedMeter {
pub fn with_inspect_items(inspect_items: usize) -> Self {
assert!(inspect_items > 0);
AverageSpeedMeter {
inspect_items: inspect_items,
inspected_items: VecDeque::with_capacity(inspect_items),
speed: 0_f64,
last_timestamp: None,
}
}
pub fn speed(&self) -> f64 {
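// self.speed holds the average interval between checkpoints in seconds per item; invert it to report items per second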
let items_per_second = 1_f64 / self.speed;
if items_per_second.is_normal() { items_per_second } else { 0_f64 }
}
pub fn inspected_items_len(&self) -> usize {
self.inspected_items.len()
}
pub fn checkpoint(&mut self) {
// if inspected_items is already full => remove oldest item from average
if self.inspected_items.len() == self.inspect_items {
let oldest = self.inspected_items.pop_front().expect("len() is not zero; qed");
self.speed = (self.inspect_items as f64 * self.speed - oldest) / (self.inspect_items as f64 - 1_f64);
}
// add new item
let now = time::precise_time_s();
if let Some(last_timestamp) = self.last_timestamp {
let newest = now - last_timestamp;
self.speed = (self.inspected_items.len() as f64 * self.speed + newest) / (self.inspected_items.len() as f64 + 1_f64);
self.inspected_items.push_back(newest);
}
self.last_timestamp = Some(now);
}
}
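// A minimal usage sketch (hypothetical, not part of this commit): the meter starts
// reporting data only after two checkpoint() calls, because the first call merely
// records a timestamp; the module and test names below are illustrative.
#[cfg(test)]
mod speed_meter_sketch {
    use super::AverageSpeedMeter;

    #[test]
    fn speed_is_zero_until_an_interval_is_recorded() {
        let mut meter = AverageSpeedMeter::with_inspect_items(4);
        // no intervals yet => 1 / 0 is not a normal f64, so speed() reports 0
        assert_eq!(meter.speed(), 0_f64);
        meter.checkpoint(); // stores the first timestamp only
        assert_eq!(meter.speed(), 0_f64);
        meter.checkpoint(); // records the interval between the two calls
        assert_eq!(meter.inspected_items_len(), 1);
    }
}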
#[cfg(test)]
pub mod tests {
use std::sync::Arc;


@@ -75,6 +75,7 @@ impl Default for ManageOrphanTransactionsConfig {
pub fn manage_synchronization_peers_blocks(config: &ManagePeersConfig, peers: &mut Peers) -> Option<Vec<H256>> {
let mut blocks_to_request: Vec<H256> = Vec::new();
let now = precise_time_s();
// reset tasks for peers which have not responded during the given period
for (worst_peer_index, worst_peer_time) in peers.ordered_blocks_requests() {
// check if peer has not responded within given time


@@ -121,7 +121,7 @@ impl Peers {
peers.difference(&except).cloned().collect()
}
/// Get idle peers.
/// Get idle peers for blocks request.
pub fn idle_peers_for_blocks(&self) -> Vec<usize> {
let peers: HashSet<_> = self.idle.iter().cloned()
.chain(self.inventory_requests.iter().cloned())