fixed sync::scheduled queue

This commit is contained in:
Svyatoslav Nikolsky 2016-10-31 14:26:39 +03:00
parent 543d66ce03
commit 4e3ea35835
5 changed files with 189 additions and 39 deletions

View File

@ -14,12 +14,14 @@ pub enum HashPosition {
Inside,
}
/// Ordered queue with O(1) contains() && random access operations cost.
#[derive(Clone)]
pub struct HashQueue {
queue: VecDeque<H256>,
set: HashSet<H256>
}
/// Chain of linked queues. First queue has index zero.
pub struct HashQueueChain {
chain: Vec<HashQueue>,
}
@ -32,22 +34,54 @@ impl HashQueue {
}
}
/// Returns len of the given queue.
pub fn len(&self) -> usize {
self.queue.len()
}
/// Returns true if queue is empty.
pub fn is_empty(&self) -> bool {
self.queue.is_empty()
}
pub fn back<'a>(&'a self) -> Option<&'a H256> {
self.queue.back()
/// Returns front element from the given queue.
pub fn front(&self) -> Option<H256> {
self.queue.front().cloned()
}
/// Returns back element from the given queue.
pub fn back(&self) -> Option<H256> {
self.queue.back().cloned()
}
/// Returns previous-to back element from the given queue.
pub fn pre_back(&self) -> Option<H256> {
let queue_len = self.queue.len();
if queue_len <= 1 {
return None;
}
Some(self.queue[queue_len - 2].clone())
}
/// Returns n-th element (n is starting from 0), starting from the back-element in the queue.
/// If there are no n-th element - returns (n-1) element & etc.
pub fn back_skip_n(&self, n: usize) -> Option<H256> {
let queue_len = self.queue.len();
if queue_len == 0 {
return None;
}
if n + 1 > queue_len {
return Some(self.queue[0].clone())
}
return Some(self.queue[queue_len - n - 1].clone())
}
/// Returns true if queue contains element.
pub fn contains(&self, hash: &H256) -> bool {
self.set.contains(hash)
}
/// Removes element from the front of the queue.
pub fn pop_front(&mut self) -> Option<H256> {
match self.queue.pop_front() {
Some(hash) => {
@ -58,6 +92,7 @@ impl HashQueue {
}
}
/// Removes n elements from the front of the queue.
pub fn pop_front_n(&mut self, n: usize) -> Vec<H256> {
let mut result: Vec<H256> = Vec::new();
for _ in 0..n {
@ -69,6 +104,7 @@ impl HashQueue {
result
}
/// Adds element to the back of the queue.
pub fn push_back(&mut self, hash: H256) {
if !self.set.insert(hash.clone()) {
panic!("must be checked by caller");
@ -76,12 +112,14 @@ impl HashQueue {
self.queue.push_back(hash);
}
/// Adds elements to the back of the queue.
pub fn push_back_n(&mut self, hashes: Vec<H256>) {
for hash in hashes {
self.push_back(hash);
}
}
/// Removes element from the queue, returning its position.
pub fn remove(&mut self, hash: &H256) -> HashPosition {
if !self.set.remove(hash) {
return HashPosition::Missing;
@ -103,6 +141,7 @@ impl HashQueue {
unreachable!()
}
/// Removes all elements from the queue.
pub fn remove_all(&mut self) {
self.queue.clear();
self.set.clear();
@ -118,6 +157,7 @@ impl Index<usize> for HashQueue {
}
impl HashQueueChain {
/// Creates chain with given number of queues.
pub fn with_number_of_queues(number_of_queues: usize) -> Self {
assert!(number_of_queues != 0);
HashQueueChain {
@ -125,30 +165,54 @@ impl HashQueueChain {
}
}
/// Returns length of the whole chain.
pub fn len(&self) -> usize {
self.chain.iter().fold(0, |total, chain| total + chain.len())
}
/// Returns length of the given queue.
pub fn len_of(&self, chain_index: usize) -> usize {
self.chain[chain_index].len()
}
/// Returns true if given queue is empty.
pub fn is_empty_at(&self, chain_index: usize) -> bool {
self.chain[chain_index].is_empty()
}
pub fn back_at(&self, chain_index: usize) -> Option<H256> {
/// Returns element at the front of the given queue.
pub fn front_at(&self, chain_index: usize) -> Option<H256> {
let ref queue = self.chain[chain_index];
queue.back().cloned()
queue.front()
}
/// Returns element at the back of the given queue.
pub fn back_at(&self, chain_index: usize) -> Option<H256> {
let ref queue = self.chain[chain_index];
queue.back()
}
/// Returns previous-to back element from the given queue.
pub fn pre_back_at(&self, chain_index: usize) -> Option<H256> {
let ref queue = self.chain[chain_index];
queue.pre_back()
}
/// Returns n-th element (n is starting from 0), starting from the back-element in given queue.
/// If there are no n-th element - returns (n-1) element & etc.
pub fn back_skip_n_at(&self, chain_index: usize, n: usize) -> Option<H256> {
let ref queue = self.chain[chain_index];
queue.back_skip_n(n)
}
/// Returns the back of the whole chain.
pub fn back(&self) -> Option<H256> {
let mut queue_index = self.chain.len() - 1;
loop {
let ref queue = self.chain[queue_index];
let queue_back = queue.back();
if queue_back.is_some() {
return queue_back.cloned();
return queue_back;
}
queue_index = queue_index - 1;
@ -158,10 +222,12 @@ impl HashQueueChain {
}
}
/// Checks if hash is contained in given queue.
pub fn is_contained_in(&self, queue_index: usize, hash: &H256) -> bool {
self.chain[queue_index].contains(hash)
}
/// Returns the index of queue, hash is contained in.
pub fn contains_in(&self, hash: &H256) -> Option<usize> {
for i in 0..self.chain.len() {
if self.chain[i].contains(hash) {
@ -171,22 +237,27 @@ impl HashQueueChain {
None
}
/// Remove a number of hashes from the front of the given queue.
pub fn pop_front_n_at(&mut self, queue_index: usize, n: usize) -> Vec<H256> {
self.chain[queue_index].pop_front_n(n)
}
/// Push hash onto the back of the given queue.
pub fn push_back_at(&mut self, queue_index: usize, hash: H256) {
self.chain[queue_index].push_back(hash)
}
/// Push a number of hashes onto the back of the given queue.
pub fn push_back_n_at(&mut self, queue_index: usize, hashes: Vec<H256>) {
self.chain[queue_index].push_back_n(hashes)
}
/// Remove hash from given queue.
pub fn remove_at(&mut self, queue_index: usize, hash: &H256) -> HashPosition {
self.chain[queue_index].remove(hash)
}
/// Remove all items from given queue.
pub fn remove_all_at(&mut self, queue_index: usize) {
self.chain[queue_index].remove_all();
}
@ -208,3 +279,41 @@ impl Index<usize> for HashQueueChain {
panic!("invalid index");
}
}
#[cfg(test)]
mod tests {
use primitives::hash::H256;
use super::{HashQueue, HashQueueChain, HashPosition};
#[test]
fn hash_queue_empty() {
let queue = HashQueue::new();
assert_eq!(queue.len(), 1);
assert_eq!(queue.is_empty(), true);
assert_eq!(queue.front(), None);
assert_eq!(queue.back(), None);
assert_eq!(queue.pre_back(), None);
assert_eq!(queue.back_skip_n(), None);
assert_eq!(queue.contains("000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f".into()), false);
assert_eq!(queue.pop_front(), None);
assert_eq!(queue.pop_front_n(100), vec![]);
assert_eq!(queue.remove("000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f".into()), HashPosition::Missing);
}
#[test]
fn hash_queue_chain_empty() {
let chain = HashQueueChain::with_number_of_queues(3);
assert_eq!(chain.len(), 0);
assert_eq!(chain.len_of(0), 0);
assert_eq!(chain.is_empty_at(0), true);
assert_eq!(chain.front_at(0), None);
assert_eq!(chain.back_at(0), None);
assert_eq!(chain.pre_back_at(0), None);
assert_eq!(chain.back_skip_n_at(0, 100), None);
assert_eq!(chain.back(), None);
assert_eq!(chain.is_contained_in(0, &"000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f".into()), false);
assert_eq!(chain.contains_in(&"000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f".into()), None);
assert_eq!(chain.pop_front_n_at(0, 100), vec![]);
assert_eq!(chain.remove_at(0, &"000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f".into()), HashPosition::Missing);
}
}

View File

@ -8,7 +8,7 @@ use p2p::OutboundSyncConnectionRef;
use message::common::InventoryType;
use message::types;
use synchronization::{Synchronization, SynchronizationRef, Config as SynchronizationConfig, Task as SynchronizationTask, TaskExecutor as SynchronizationTaskExecutor};
use synchronization_chain::{Chain, ChainRef, BlockState};
use synchronization_chain::{Chain, ChainRef};
use synchronization_executor::LocalSynchronizationTaskExecutor;
use best_block::BestBlock;
@ -74,19 +74,15 @@ impl LocalNode {
// (2) with 500 entries
// what is (1)?
// process unknown blocks
let unknown_blocks: Vec<_> = {
let chain = self.chain.read();
message.inventory.iter()
.filter(|item| item.inv_type == InventoryType::MessageBlock)
.filter(|item| chain.block_state(&item.hash) == BlockState::Unknown)
.map(|item| item.hash.clone())
.collect()
};
// process blocks first
let blocks_inventory: Vec<_> = message.inventory.iter()
.filter(|item| item.inv_type == InventoryType::MessageBlock)
.map(|item| item.hash.clone())
.collect();
// if there are unknown blocks => start synchronizing with peer
if !unknown_blocks.is_empty() {
self.sync.lock().on_unknown_blocks(peer_index, unknown_blocks);
if !blocks_inventory.is_empty() {
self.sync.lock().on_new_blocks_inventory(peer_index, blocks_inventory);
}
// TODO: process unknown transactions, etc...
@ -104,7 +100,8 @@ impl LocalNode {
trace!(target: "sync", "Got `getheaders` message from peer#{}", peer_index);
}
pub fn on_peer_transaction(&self, _peer_index: usize, _message: types::Tx) {
pub fn on_peer_transaction(&self, peer_index: usize, message: types::Tx) {
trace!(target: "sync", "Got `transaction` message from peer#{}. Transaction hash: {}", peer_index, message.transaction.hash());
}
pub fn on_peer_block(&self, peer_index: usize, message: types::Block) {

View File

@ -214,8 +214,8 @@ impl<T> Synchronization<T> where T: TaskExecutor + Send + 'static {
}
/// Try to queue synchronization of unknown blocks when new inventory is received.
pub fn on_unknown_blocks(&mut self, peer_index: usize, peer_hashes: Vec<H256>) {
self.process_unknown_blocks(peer_index, peer_hashes);
pub fn on_new_blocks_inventory(&mut self, peer_index: usize, peer_hashes: Vec<H256>) {
self.process_new_blocks_inventory(peer_index, peer_hashes);
self.execute_synchronization_tasks();
}
@ -241,10 +241,12 @@ impl<T> Synchronization<T> where T: TaskExecutor + Send + 'static {
chain.remove_blocks_with_state(BlockState::Requested);
chain.remove_blocks_with_state(BlockState::Scheduled);
chain.remove_blocks_with_state(BlockState::Verifying);
warn!(target: "sync", "Synchronization process restarting from block {:?}", chain.best_block());
}
/// Process new unknown blocks
fn process_unknown_blocks(&mut self, peer_index: usize, mut peer_hashes: Vec<H256>) {
/// Process new blocks inventory
fn process_new_blocks_inventory(&mut self, peer_index: usize, mut peer_hashes: Vec<H256>) {
// | requested | QUEUED |
// --- [1]
// --- [2] +
@ -376,7 +378,7 @@ impl<T> Synchronization<T> where T: TaskExecutor + Send + 'static {
fn execute_synchronization_tasks(&mut self) {
let mut tasks: Vec<Task> = Vec::new();
// prepar synchronization tasks
// prepare synchronization tasks
{
// display information if processed many blocks || enough time has passed since sync start
let mut chain = self.chain.write();
@ -462,15 +464,20 @@ impl<T> Synchronization<T> where T: TaskExecutor + Send + 'static {
/// Process successful block verification
fn on_block_verification_success(&mut self, block: Block) {
let hash = block.hash();
let mut chain = self.chain.write();
{
let hash = block.hash();
let mut chain = self.chain.write();
// remove from verifying queue
assert_eq!(chain.remove_block_with_state(&hash, BlockState::Verifying), HashPosition::Front);
// remove from verifying queue
assert_eq!(chain.remove_block_with_state(&hash, BlockState::Verifying), HashPosition::Front);
// insert to storage
chain.insert_best_block(block)
.expect("Error inserting to db.");
// insert to storage
chain.insert_best_block(block)
.expect("Error inserting to db.");
}
// continue with synchronization
self.execute_synchronization_tasks();
}
/// Process failed block verification
@ -479,6 +486,9 @@ impl<T> Synchronization<T> where T: TaskExecutor + Send + 'static {
// reset synchronization process
self.reset();
// start new tasks
self.execute_synchronization_tasks();
}
}

View File

@ -1,3 +1,4 @@
use std::fmt;
use std::sync::Arc;
use parking_lot::RwLock;
use chain::{Block, RepresentH256};
@ -202,14 +203,14 @@ impl Chain {
/// Prepare best block locator hashes
pub fn best_block_locator_hashes(&self) -> Vec<H256> {
let mut result: Vec<H256> = Vec::with_capacity(4);
if let Some(best_block) = self.hash_chain.back_at(SCHEDULED_QUEUE) {
result.push(best_block);
if let Some(pre_best_block) = self.hash_chain.back_skip_n_at(SCHEDULED_QUEUE, 2) {
result.push(pre_best_block);
}
if let Some(best_block) = self.hash_chain.back_at(REQUESTED_QUEUE) {
result.push(best_block);
if let Some(pre_best_block) = self.hash_chain.back_skip_n_at(REQUESTED_QUEUE, 2) {
result.push(pre_best_block);
}
if let Some(best_block) = self.hash_chain.back_at(VERIFYING_QUEUE) {
result.push(best_block);
if let Some(pre_best_block) = self.hash_chain.back_skip_n_at(VERIFYING_QUEUE, 2) {
result.push(pre_best_block);
}
result.push(self.best_storage_block_hash.clone());
result
@ -324,6 +325,33 @@ impl Chain {
}
}
impl fmt::Debug for Chain {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
try!(writeln!(f, "chain: ["));
let mut num = self.storage.best_block_number().unwrap() as usize;
try!(writeln!(f, "\tworse(stored): {} {:?}", 0, self.storage.block_hash(0)));
try!(writeln!(f, "\tbest(stored): {} {:?}", num, self.storage.block_hash(num as u32)));
let queues = vec![
("verifying", VERIFYING_QUEUE),
("requested", REQUESTED_QUEUE),
("scheduled", SCHEDULED_QUEUE),
];
for (state, queue) in queues {
let queue_len = self.hash_chain.len_of(queue);
if queue_len != 0 {
try!(writeln!(f, "\tworse({}): {} {:?}", state, num + 1, self.hash_chain.front_at(queue)));
num += 1 + queue_len;
if let Some(pre_best) = self.hash_chain.pre_back_at(queue) {
try!(writeln!(f, "\tpre-best({}): {} {:?}", state, num - 1, pre_best));
}
try!(writeln!(f, "\tbest({}): {} {:?}", state, num, self.hash_chain.back_at(queue)));
}
}
writeln!(f, "]")
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;

View File

@ -77,13 +77,19 @@ impl Peers {
/// Blocks have been requested from peer.
pub fn on_blocks_requested(&mut self, peer_index: usize, blocks_hashes: &Vec<H256>) {
// blocks can only be requested from idle peers
assert_eq!(self.idle_peers.remove(&peer_index), true);
self.blocks_requests.entry(peer_index).or_insert(HashSet::new()).extend(blocks_hashes.iter().cloned());
self.idle_peers.remove(&peer_index);
}
/// Inventory has been requested from peer.
pub fn on_inventory_requested(&mut self, _peer_index: usize) {
// TODO
pub fn on_inventory_requested(&mut self, peer_index: usize) {
// inventory can only be requested from idle peers
assert_eq!(self.idle_peers.remove(&peer_index), true);
// peer is now out-of-synchronization process, because:
// 1) if it has new blocks, it will respond with `inventory` message && will be insrted back here
// 2) if it has no new blocks => either synchronization is completed, or it is behind us in sync
}
/// Reset peers state