Forks support in sync (#91)
* intersect_with_inventory * process_new_blocks_inventory fixed to support forks * fixed sync issues * fixed on_peer_block to support forks * forks support in on_block_verification_* * cleaning up * cleaning up * forget about best inventory * ask for inventory after saturating * '000..000'.into() -> H256::from(0)
This commit is contained in:
parent
5328b3b3bb
commit
adfdef6095
|
@ -39,11 +39,6 @@ impl HashQueue {
|
|||
self.queue.len() as u32
|
||||
}
|
||||
|
||||
/// Returns true if queue is empty.
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.queue.is_empty()
|
||||
}
|
||||
|
||||
/// Returns front element from the given queue.
|
||||
pub fn front(&self) -> Option<H256> {
|
||||
self.queue.front().cloned()
|
||||
|
@ -63,19 +58,6 @@ impl HashQueue {
|
|||
Some(self.queue[queue_len - 2].clone())
|
||||
}
|
||||
|
||||
/// Returns n-th element (n is starting from 0), starting from the back-element in the queue.
|
||||
/// If there are no n-th element - returns (n-1) element & etc.
|
||||
pub fn back_skip_n(&self, n: usize) -> Option<H256> {
|
||||
let queue_len = self.queue.len();
|
||||
if queue_len == 0 {
|
||||
return None;
|
||||
}
|
||||
if n + 1 > queue_len {
|
||||
return Some(self.queue[0].clone())
|
||||
}
|
||||
return Some(self.queue[queue_len - n - 1].clone())
|
||||
}
|
||||
|
||||
/// Returns true if queue contains element.
|
||||
pub fn contains(&self, hash: &H256) -> bool {
|
||||
self.set.contains(hash)
|
||||
|
@ -175,11 +157,6 @@ impl HashQueueChain {
|
|||
self.chain[queue_index].len()
|
||||
}
|
||||
|
||||
/// Returns true if given queue is empty.
|
||||
pub fn is_empty_at(&self, queue_index: usize) -> bool {
|
||||
self.chain[queue_index].is_empty()
|
||||
}
|
||||
|
||||
/// Returns element at the front of the given queue.
|
||||
pub fn front_at(&self, queue_index: usize) -> Option<H256> {
|
||||
let ref queue = self.chain[queue_index];
|
||||
|
@ -198,13 +175,6 @@ impl HashQueueChain {
|
|||
queue.pre_back()
|
||||
}
|
||||
|
||||
/// Returns n-th element (n is starting from 0), starting from the back-element in given queue.
|
||||
/// If there are no n-th element - returns (n-1) element & etc.
|
||||
pub fn back_skip_n_at(&self, chain_index: usize, n: usize) -> Option<H256> {
|
||||
let ref queue = self.chain[chain_index];
|
||||
queue.back_skip_n(n)
|
||||
}
|
||||
|
||||
/// Returns the back of the whole chain.
|
||||
pub fn back(&self) -> Option<H256> {
|
||||
let mut queue_index = self.chain.len() - 1;
|
||||
|
@ -223,6 +193,7 @@ impl HashQueueChain {
|
|||
}
|
||||
|
||||
/// Checks if hash is contained in given queue.
|
||||
#[cfg(test)]
|
||||
pub fn is_contained_in(&self, queue_index: usize, hash: &H256) -> bool {
|
||||
self.chain[queue_index].contains(hash)
|
||||
}
|
||||
|
@ -283,16 +254,15 @@ impl Index<u32> for HashQueueChain {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{HashQueue, HashQueueChain, HashPosition};
|
||||
use primitives::hash::H256;
|
||||
|
||||
#[test]
|
||||
fn hash_queue_empty() {
|
||||
let mut queue = HashQueue::new();
|
||||
assert_eq!(queue.len(), 0);
|
||||
assert_eq!(queue.is_empty(), true);
|
||||
assert_eq!(queue.front(), None);
|
||||
assert_eq!(queue.back(), None);
|
||||
assert_eq!(queue.pre_back(), None);
|
||||
assert_eq!(queue.back_skip_n(100), None);
|
||||
assert_eq!(queue.contains(&"000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f".into()), false);
|
||||
assert_eq!(queue.pop_front(), None);
|
||||
assert_eq!(queue.pop_front_n(100), vec![]);
|
||||
|
@ -304,11 +274,9 @@ mod tests {
|
|||
let mut chain = HashQueueChain::with_number_of_queues(3);
|
||||
assert_eq!(chain.len(), 0);
|
||||
assert_eq!(chain.len_of(0), 0);
|
||||
assert_eq!(chain.is_empty_at(0), true);
|
||||
assert_eq!(chain.front_at(0), None);
|
||||
assert_eq!(chain.back_at(0), None);
|
||||
assert_eq!(chain.pre_back_at(0), None);
|
||||
assert_eq!(chain.back_skip_n_at(0, 100), None);
|
||||
assert_eq!(chain.back(), None);
|
||||
assert_eq!(chain.is_contained_in(0, &"000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f".into()), false);
|
||||
assert_eq!(chain.contains_in(&"000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f".into()), None);
|
||||
|
@ -320,16 +288,16 @@ mod tests {
|
|||
fn hash_queue_chain_not_empty() {
|
||||
let mut chain = HashQueueChain::with_number_of_queues(4);
|
||||
chain.push_back_n_at(0, vec![
|
||||
"0000000000000000000000000000000000000000000000000000000000000000".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000001".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000002".into(),
|
||||
H256::from(0),
|
||||
H256::from(1),
|
||||
H256::from(2),
|
||||
]);
|
||||
chain.push_back_n_at(1, vec![
|
||||
"0000000000000000000000000000000000000000000000000000000000000003".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000004".into(),
|
||||
H256::from(3),
|
||||
H256::from(4),
|
||||
]);
|
||||
chain.push_back_n_at(2, vec![
|
||||
"0000000000000000000000000000000000000000000000000000000000000005".into(),
|
||||
H256::from(5),
|
||||
]);
|
||||
|
||||
assert_eq!(chain.len(), 6);
|
||||
|
@ -337,42 +305,25 @@ mod tests {
|
|||
assert_eq!(chain.len_of(1), 2);
|
||||
assert_eq!(chain.len_of(2), 1);
|
||||
assert_eq!(chain.len_of(3), 0);
|
||||
assert_eq!(chain.is_empty_at(0), false);
|
||||
assert_eq!(chain.is_empty_at(1), false);
|
||||
assert_eq!(chain.is_empty_at(2), false);
|
||||
assert_eq!(chain.is_empty_at(3), true);
|
||||
assert_eq!(chain.front_at(0), Some("0000000000000000000000000000000000000000000000000000000000000000".into()));
|
||||
assert_eq!(chain.front_at(1), Some("0000000000000000000000000000000000000000000000000000000000000003".into()));
|
||||
assert_eq!(chain.front_at(2), Some("0000000000000000000000000000000000000000000000000000000000000005".into()));
|
||||
assert_eq!(chain.front_at(0), Some(H256::from(0)));
|
||||
assert_eq!(chain.front_at(1), Some(H256::from(3)));
|
||||
assert_eq!(chain.front_at(2), Some(H256::from(5)));
|
||||
assert_eq!(chain.front_at(3), None);
|
||||
assert_eq!(chain.back_at(0), Some("0000000000000000000000000000000000000000000000000000000000000002".into()));
|
||||
assert_eq!(chain.back_at(1), Some("0000000000000000000000000000000000000000000000000000000000000004".into()));
|
||||
assert_eq!(chain.back_at(2), Some("0000000000000000000000000000000000000000000000000000000000000005".into()));
|
||||
assert_eq!(chain.back_at(0), Some(H256::from(2)));
|
||||
assert_eq!(chain.back_at(1), Some(H256::from(4)));
|
||||
assert_eq!(chain.back_at(2), Some(H256::from(5)));
|
||||
assert_eq!(chain.back_at(3), None);
|
||||
assert_eq!(chain.pre_back_at(0), Some("0000000000000000000000000000000000000000000000000000000000000001".into()));
|
||||
assert_eq!(chain.pre_back_at(1), Some("0000000000000000000000000000000000000000000000000000000000000003".into()));
|
||||
assert_eq!(chain.pre_back_at(0), Some(H256::from(1)));
|
||||
assert_eq!(chain.pre_back_at(1), Some(H256::from(3)));
|
||||
assert_eq!(chain.pre_back_at(2), None);
|
||||
assert_eq!(chain.pre_back_at(3), None);
|
||||
assert_eq!(chain.back(), Some("0000000000000000000000000000000000000000000000000000000000000005".into()));
|
||||
assert_eq!(chain.is_contained_in(0, &"0000000000000000000000000000000000000000000000000000000000000002".into()), true);
|
||||
assert_eq!(chain.is_contained_in(1, &"0000000000000000000000000000000000000000000000000000000000000002".into()), false);
|
||||
assert_eq!(chain.is_contained_in(2, &"0000000000000000000000000000000000000000000000000000000000000002".into()), false);
|
||||
assert_eq!(chain.is_contained_in(3, &"0000000000000000000000000000000000000000000000000000000000000002".into()), false);
|
||||
assert_eq!(chain.contains_in(&"0000000000000000000000000000000000000000000000000000000000000002".into()), Some(0));
|
||||
assert_eq!(chain.contains_in(&"0000000000000000000000000000000000000000000000000000000000000005".into()), Some(2));
|
||||
assert_eq!(chain.contains_in(&"0000000000000000000000000000000000000000000000000000000000000009".into()), None);
|
||||
|
||||
assert_eq!(chain.back_skip_n_at(0, 0), Some("0000000000000000000000000000000000000000000000000000000000000002".into()));
|
||||
assert_eq!(chain.back_skip_n_at(1, 0), Some("0000000000000000000000000000000000000000000000000000000000000004".into()));
|
||||
assert_eq!(chain.back_skip_n_at(2, 0), Some("0000000000000000000000000000000000000000000000000000000000000005".into()));
|
||||
assert_eq!(chain.back_skip_n_at(3, 0), None);
|
||||
assert_eq!(chain.back_skip_n_at(0, 1), Some("0000000000000000000000000000000000000000000000000000000000000001".into()));
|
||||
assert_eq!(chain.back_skip_n_at(1, 1), Some("0000000000000000000000000000000000000000000000000000000000000003".into()));
|
||||
assert_eq!(chain.back_skip_n_at(2, 1), Some("0000000000000000000000000000000000000000000000000000000000000005".into()));
|
||||
assert_eq!(chain.back_skip_n_at(3, 1), None);
|
||||
assert_eq!(chain.back_skip_n_at(0, 2), Some("0000000000000000000000000000000000000000000000000000000000000000".into()));
|
||||
assert_eq!(chain.back_skip_n_at(1, 2), Some("0000000000000000000000000000000000000000000000000000000000000003".into()));
|
||||
assert_eq!(chain.back_skip_n_at(2, 2), Some("0000000000000000000000000000000000000000000000000000000000000005".into()));
|
||||
assert_eq!(chain.back_skip_n_at(3, 2), None);
|
||||
assert_eq!(chain.back(), Some(H256::from(5)));
|
||||
assert_eq!(chain.is_contained_in(0, &H256::from(2)), true);
|
||||
assert_eq!(chain.is_contained_in(1, &H256::from(2)), false);
|
||||
assert_eq!(chain.is_contained_in(2, &H256::from(2)), false);
|
||||
assert_eq!(chain.is_contained_in(3, &H256::from(2)), false);
|
||||
assert_eq!(chain.contains_in(&H256::from(2)), Some(0));
|
||||
assert_eq!(chain.contains_in(&H256::from(5)), Some(2));
|
||||
assert_eq!(chain.contains_in(&H256::from(9)), None);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use std::fmt;
|
||||
use std::sync::Arc;
|
||||
use parking_lot::RwLock;
|
||||
use chain::{Block, RepresentH256};
|
||||
use chain::Block;
|
||||
use db;
|
||||
use primitives::hash::H256;
|
||||
use hash_queue::{HashQueueChain, HashPosition};
|
||||
|
@ -47,6 +47,23 @@ pub struct Information {
|
|||
pub stored: u32,
|
||||
}
|
||||
|
||||
/// Result of intersecting chain && inventory
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum InventoryIntersection {
|
||||
/// 3.2: No intersection with in-memory queue && no intersection with db
|
||||
NoKnownBlocks(usize),
|
||||
/// 2.3: Inventory has no new blocks && some of blocks in inventory are in in-memory queue
|
||||
InMemoryNoNewBlocks,
|
||||
/// 2.4.2: Inventory has new blocks && these blocks are right after chain' best block
|
||||
InMemoryMainNewBlocks(usize),
|
||||
/// 2.4.3: Inventory has new blocks && these blocks are forked from our chain' best block
|
||||
InMemoryForkNewBlocks(usize),
|
||||
/// 3.3: No intersection with in-memory queue && has intersection with db && all blocks are already stored in db
|
||||
DbAllBlocksKnown,
|
||||
/// 3.4: No intersection with in-memory queue && has intersection with db && some blocks are not yet stored in db
|
||||
DbForkNewBlocks(usize),
|
||||
}
|
||||
|
||||
/// Blockchain from synchroniation point of view, consisting of:
|
||||
/// 1) all blocks from the `storage` [oldest blocks]
|
||||
/// 2) all blocks currently verifying by `verification_queue`
|
||||
|
@ -109,7 +126,7 @@ impl Chain {
|
|||
scheduled: self.hash_chain.len_of(SCHEDULED_QUEUE),
|
||||
requested: self.hash_chain.len_of(REQUESTED_QUEUE),
|
||||
verifying: self.hash_chain.len_of(VERIFYING_QUEUE),
|
||||
stored: self.storage.best_block().map_or(0, |block| block.number + 1),
|
||||
stored: self.best_storage_block.number + 1,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -137,68 +154,14 @@ impl Chain {
|
|||
}
|
||||
}
|
||||
|
||||
/// Returns true if has blocks of given type
|
||||
pub fn has_blocks_of_state(&self, state: BlockState) -> bool {
|
||||
match state {
|
||||
BlockState::Stored => true, // storage with genesis block is required
|
||||
_ => !self.hash_chain.is_empty_at(state.to_queue_index()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get best block
|
||||
pub fn best_block(&self) -> db::BestBlock {
|
||||
let storage_best_block = self.storage.best_block().expect("storage with genesis block is required");
|
||||
match self.hash_chain.back() {
|
||||
Some(hash) => db::BestBlock {
|
||||
number: storage_best_block.number + self.hash_chain.len(),
|
||||
number: self.best_storage_block.number + self.hash_chain.len(),
|
||||
hash: hash.clone(),
|
||||
},
|
||||
None => db::BestBlock {
|
||||
number: storage_best_block.number,
|
||||
hash: storage_best_block.hash,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Get best block of given state
|
||||
pub fn best_block_of_state(&self, state: BlockState) -> Option<db::BestBlock> {
|
||||
match state {
|
||||
BlockState::Scheduled => self.hash_chain.back_at(SCHEDULED_QUEUE)
|
||||
.map(|hash| db::BestBlock {
|
||||
hash: hash,
|
||||
number: self.storage.best_block().expect("storage with genesis block is required").number + 1
|
||||
+ self.hash_chain.len_of(VERIFYING_QUEUE)
|
||||
+ self.hash_chain.len_of(REQUESTED_QUEUE)
|
||||
+ self.hash_chain.len_of(SCHEDULED_QUEUE)
|
||||
}),
|
||||
BlockState::Requested => self.hash_chain.back_at(REQUESTED_QUEUE)
|
||||
.map(|hash| db::BestBlock {
|
||||
hash: hash,
|
||||
number: self.storage.best_block().expect("storage with genesis block is required").number + 1
|
||||
+ self.hash_chain.len_of(VERIFYING_QUEUE)
|
||||
+ self.hash_chain.len_of(REQUESTED_QUEUE)
|
||||
}),
|
||||
BlockState::Verifying => self.hash_chain.back_at(VERIFYING_QUEUE)
|
||||
.map(|hash| db::BestBlock {
|
||||
hash: hash,
|
||||
number: self.storage.best_block().expect("storage with genesis block is required").number + 1
|
||||
+ self.hash_chain.len_of(VERIFYING_QUEUE)
|
||||
}),
|
||||
BlockState::Stored => {
|
||||
self.storage.best_block()
|
||||
},
|
||||
_ => panic!("not supported"),
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if block has given state
|
||||
pub fn block_has_state(&self, hash: &H256, state: BlockState) -> bool {
|
||||
match state {
|
||||
BlockState::Scheduled => self.hash_chain.is_contained_in(SCHEDULED_QUEUE, hash),
|
||||
BlockState::Requested => self.hash_chain.is_contained_in(REQUESTED_QUEUE, hash),
|
||||
BlockState::Verifying => self.hash_chain.is_contained_in(VERIFYING_QUEUE, hash),
|
||||
BlockState::Stored => self.storage.contains_block(db::BlockRef::Hash(hash.clone())),
|
||||
BlockState::Unknown => self.block_state(hash) == BlockState::Unknown,
|
||||
None => self.best_storage_block.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -214,39 +177,12 @@ impl Chain {
|
|||
}
|
||||
}
|
||||
|
||||
/// Get block number from storage
|
||||
pub fn storage_block_number(&self, hash: &H256) -> Option<u32> {
|
||||
self.storage.block_number(hash)
|
||||
}
|
||||
|
||||
/// Get block hash from storage
|
||||
pub fn storage_block_hash(&self, number: u32) -> Option<H256> {
|
||||
self.storage.block_hash(number)
|
||||
}
|
||||
|
||||
/// Get block from the storage
|
||||
pub fn storage_block(&self, hash: &H256) -> Option<Block> {
|
||||
self.storage.block(db::BlockRef::Hash(hash.clone()))
|
||||
}
|
||||
|
||||
/// Prepare best block locator hashes
|
||||
pub fn best_block_locator_hashes(&self) -> Vec<H256> {
|
||||
let mut result: Vec<H256> = Vec::with_capacity(4);
|
||||
if let Some(pre_best_block) = self.hash_chain.back_skip_n_at(SCHEDULED_QUEUE, 2) {
|
||||
result.push(pre_best_block);
|
||||
}
|
||||
if let Some(pre_best_block) = self.hash_chain.back_skip_n_at(REQUESTED_QUEUE, 2) {
|
||||
result.push(pre_best_block);
|
||||
}
|
||||
if let Some(pre_best_block) = self.hash_chain.back_skip_n_at(VERIFYING_QUEUE, 2) {
|
||||
result.push(pre_best_block);
|
||||
}
|
||||
result.push(self.best_storage_block.hash.clone());
|
||||
result
|
||||
}
|
||||
|
||||
/// Prepare block locator hashes, as described in protocol documentation:
|
||||
/// https://en.bitcoin.it/wiki/Protocol_documentation#getblocks
|
||||
/// When there are forked blocks in the queue, this method can result in
|
||||
/// mixed block locator hashes ([0 - from fork1, 1 - from fork2, 2 - from fork1]).
|
||||
/// Peer will respond with blocks of fork1 || fork2 => we could end up in some side fork
|
||||
/// To resolve this, after switching to saturated state, we will also ask all peers for inventory.
|
||||
pub fn block_locator_hashes(&self) -> Vec<H256> {
|
||||
let mut block_locator_hashes: Vec<H256> = Vec::new();
|
||||
|
||||
|
@ -254,8 +190,7 @@ impl Chain {
|
|||
let (local_index, step) = self.block_locator_hashes_for_queue(&mut block_locator_hashes);
|
||||
|
||||
// calculate for storage
|
||||
let storage_best_block_number = self.storage.best_block().expect("storage with genesis block is required").number;
|
||||
let storage_index = if storage_best_block_number < local_index { 0 } else { storage_best_block_number - local_index };
|
||||
let storage_index = if self.best_storage_block.number < local_index { 0 } else { self.best_storage_block.number - local_index };
|
||||
self.block_locator_hashes_for_storage(storage_index, step, &mut block_locator_hashes);
|
||||
block_locator_hashes
|
||||
}
|
||||
|
@ -287,16 +222,21 @@ impl Chain {
|
|||
|
||||
/// Insert new best block to storage
|
||||
pub fn insert_best_block(&mut self, block: Block) -> Result<(), db::Error> {
|
||||
if block.block_header.previous_header_hash != self.best_storage_block.hash {
|
||||
return Err(db::Error::DB("Trying to insert out-of-order block".into()));
|
||||
}
|
||||
// insert to storage
|
||||
try!(self.storage.insert_block(&block));
|
||||
|
||||
// remember new best block hash
|
||||
self.best_storage_block.number += 1;
|
||||
self.best_storage_block.hash = block.hash();
|
||||
self.best_storage_block = self.storage.best_block().expect("Inserted block above");
|
||||
|
||||
// insert to storage
|
||||
self.storage.insert_block(&block)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Remove block
|
||||
pub fn remove_block(&mut self, hash: &H256) {
|
||||
if self.hash_chain.remove_at(SCHEDULED_QUEUE, hash) == HashPosition::Missing
|
||||
&& self.hash_chain.remove_at(REQUESTED_QUEUE, hash) == HashPosition::Missing {
|
||||
self.hash_chain.remove_at(VERIFYING_QUEUE, hash);
|
||||
}
|
||||
}
|
||||
|
||||
/// Remove block by hash if it is currently in given state
|
||||
|
@ -309,6 +249,57 @@ impl Chain {
|
|||
self.hash_chain.remove_all_at(state.to_queue_index());
|
||||
}
|
||||
|
||||
/// Intersect chain with inventory
|
||||
pub fn intersect_with_inventory(&self, inventory: &Vec<H256>) -> InventoryIntersection {
|
||||
let inventory_len = inventory.len();
|
||||
assert!(inventory_len != 0);
|
||||
|
||||
// giving that blocks in inventory are ordered
|
||||
match self.block_state(&inventory[0]) {
|
||||
// if first block of inventory is unknown => all other blocks are also unknown
|
||||
BlockState::Unknown => {
|
||||
InventoryIntersection::NoKnownBlocks(0)
|
||||
},
|
||||
// else if first block is known
|
||||
first_block_state @ _ => match self.block_state(&inventory[inventory_len - 1]) {
|
||||
// if last block is known to be in db => all inventory blocks are also in db
|
||||
BlockState::Stored => {
|
||||
InventoryIntersection::DbAllBlocksKnown
|
||||
},
|
||||
// if first block is known && last block is unknown => intersection with queue or with db
|
||||
BlockState::Unknown => {
|
||||
// find last known block
|
||||
let mut previous_state = first_block_state;
|
||||
for index in 1..inventory_len {
|
||||
let state = self.block_state(&inventory[index]);
|
||||
if state == BlockState::Unknown {
|
||||
// previous block is stored => fork from stored block
|
||||
if previous_state == BlockState::Stored {
|
||||
return InventoryIntersection::DbForkNewBlocks(index);
|
||||
}
|
||||
// previous block is best block => no fork
|
||||
else if &self.best_block().hash == &inventory[index - 1] {
|
||||
return InventoryIntersection::InMemoryMainNewBlocks(index);
|
||||
}
|
||||
// previous block is not a best block => fork
|
||||
else {
|
||||
return InventoryIntersection::InMemoryForkNewBlocks(index);
|
||||
}
|
||||
}
|
||||
previous_state = state;
|
||||
}
|
||||
|
||||
// unreachable because last block is unknown && in above loop we search for unknown blocks
|
||||
unreachable!();
|
||||
},
|
||||
// if first block is known && last block is also known && is in queue => queue intersection with no new block
|
||||
_ => {
|
||||
InventoryIntersection::InMemoryNoNewBlocks
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Calculate block locator hashes for hash queue
|
||||
fn block_locator_hashes_for_queue(&self, hashes: &mut Vec<H256>) -> (u32, u32) {
|
||||
let queue_len = self.hash_chain.len();
|
||||
|
@ -359,7 +350,7 @@ impl fmt::Debug for Chain {
|
|||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
try!(writeln!(f, "chain: ["));
|
||||
{
|
||||
let mut num = self.storage.best_block().expect("Storage with genesis block is required").number;
|
||||
let mut num = self.best_storage_block.number;
|
||||
try!(writeln!(f, "\tworse(stored): {} {:?}", 0, self.storage.block_hash(0)));
|
||||
try!(writeln!(f, "\tbest(stored): {} {:?}", num, self.storage.block_hash(num)));
|
||||
|
||||
|
@ -389,8 +380,9 @@ mod tests {
|
|||
use std::sync::Arc;
|
||||
use chain::RepresentH256;
|
||||
use hash_queue::HashPosition;
|
||||
use super::{Chain, BlockState};
|
||||
use super::{Chain, BlockState, InventoryIntersection};
|
||||
use db::{self, Store, BestBlock};
|
||||
use primitives::hash::H256;
|
||||
use test_data;
|
||||
|
||||
#[test]
|
||||
|
@ -406,19 +398,9 @@ mod tests {
|
|||
assert_eq!(chain.length_of_state(BlockState::Requested), 0);
|
||||
assert_eq!(chain.length_of_state(BlockState::Verifying), 0);
|
||||
assert_eq!(chain.length_of_state(BlockState::Stored), 1);
|
||||
assert_eq!(chain.has_blocks_of_state(BlockState::Scheduled), false);
|
||||
assert_eq!(chain.has_blocks_of_state(BlockState::Requested), false);
|
||||
assert_eq!(chain.has_blocks_of_state(BlockState::Verifying), false);
|
||||
assert_eq!(chain.has_blocks_of_state(BlockState::Stored), true);
|
||||
assert_eq!(&chain.best_block(), &db_best_block);
|
||||
assert_eq!(chain.best_block_of_state(BlockState::Scheduled), None);
|
||||
assert_eq!(chain.best_block_of_state(BlockState::Requested), None);
|
||||
assert_eq!(chain.best_block_of_state(BlockState::Verifying), None);
|
||||
assert_eq!(chain.best_block_of_state(BlockState::Stored), Some(db_best_block.clone()));
|
||||
assert_eq!(chain.block_has_state(&db_best_block.hash, BlockState::Requested), false);
|
||||
assert_eq!(chain.block_has_state(&db_best_block.hash, BlockState::Stored), true);
|
||||
assert_eq!(chain.block_state(&db_best_block.hash), BlockState::Stored);
|
||||
assert_eq!(chain.block_state(&"0000000000000000000000000000000000000000000000000000000000000000".into()), BlockState::Unknown);
|
||||
assert_eq!(chain.block_state(&H256::from(0)), BlockState::Unknown);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -428,12 +410,12 @@ mod tests {
|
|||
|
||||
// add 6 blocks to scheduled queue
|
||||
chain.schedule_blocks_hashes(vec![
|
||||
"0000000000000000000000000000000000000000000000000000000000000000".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000001".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000002".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000003".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000004".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000005".into(),
|
||||
H256::from(0),
|
||||
H256::from(1),
|
||||
H256::from(2),
|
||||
H256::from(3),
|
||||
H256::from(4),
|
||||
H256::from(5),
|
||||
]);
|
||||
assert!(chain.information().scheduled == 6 && chain.information().requested == 0
|
||||
&& chain.information().verifying == 0 && chain.information().stored == 1);
|
||||
|
@ -452,22 +434,22 @@ mod tests {
|
|||
&& chain.information().verifying == 0 && chain.information().stored == 1);
|
||||
|
||||
// try to remove block 0 from scheduled queue => missing
|
||||
assert_eq!(chain.remove_block_with_state(&"0000000000000000000000000000000000000000000000000000000000000000".into(), BlockState::Scheduled), HashPosition::Missing);
|
||||
assert_eq!(chain.remove_block_with_state(&H256::from(0), BlockState::Scheduled), HashPosition::Missing);
|
||||
assert!(chain.information().scheduled == 3 && chain.information().requested == 3
|
||||
&& chain.information().verifying == 0 && chain.information().stored == 1);
|
||||
// remove blocks 0 & 1 from requested queue
|
||||
assert_eq!(chain.remove_block_with_state(&"0000000000000000000000000000000000000000000000000000000000000001".into(), BlockState::Requested), HashPosition::Inside);
|
||||
assert_eq!(chain.remove_block_with_state(&"0000000000000000000000000000000000000000000000000000000000000000".into(), BlockState::Requested), HashPosition::Front);
|
||||
assert_eq!(chain.remove_block_with_state(&H256::from(1), BlockState::Requested), HashPosition::Inside);
|
||||
assert_eq!(chain.remove_block_with_state(&H256::from(0), BlockState::Requested), HashPosition::Front);
|
||||
assert!(chain.information().scheduled == 3 && chain.information().requested == 1
|
||||
&& chain.information().verifying == 0 && chain.information().stored == 1);
|
||||
// mark 0 & 1 as verifying
|
||||
chain.verify_block_hash("0000000000000000000000000000000000000000000000000000000000000001".into());
|
||||
chain.verify_block_hash("0000000000000000000000000000000000000000000000000000000000000002".into());
|
||||
chain.verify_block_hash(H256::from(1));
|
||||
chain.verify_block_hash(H256::from(2));
|
||||
assert!(chain.information().scheduled == 3 && chain.information().requested == 1
|
||||
&& chain.information().verifying == 2 && chain.information().stored == 1);
|
||||
|
||||
// mark block 0 as verified
|
||||
assert_eq!(chain.remove_block_with_state(&"0000000000000000000000000000000000000000000000000000000000000001".into(), BlockState::Verifying), HashPosition::Front);
|
||||
assert_eq!(chain.remove_block_with_state(&H256::from(1), BlockState::Verifying), HashPosition::Front);
|
||||
assert!(chain.information().scheduled == 3 && chain.information().requested == 1
|
||||
&& chain.information().verifying == 1 && chain.information().stored == 1);
|
||||
// insert new best block to the chain
|
||||
|
@ -496,85 +478,149 @@ mod tests {
|
|||
assert_eq!(chain.block_locator_hashes(), vec![block2_hash.clone(), block1_hash.clone(), genesis_hash.clone()]);
|
||||
|
||||
chain.schedule_blocks_hashes(vec![
|
||||
"0000000000000000000000000000000000000000000000000000000000000000".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000001".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000002".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000003".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000004".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000005".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000006".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000007".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000008".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000009".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000010".into(),
|
||||
H256::from(0),
|
||||
H256::from(1),
|
||||
H256::from(2),
|
||||
H256::from(3),
|
||||
H256::from(4),
|
||||
H256::from(5),
|
||||
H256::from(6),
|
||||
H256::from(7),
|
||||
H256::from(8),
|
||||
H256::from(9),
|
||||
H256::from(10),
|
||||
]);
|
||||
chain.request_blocks_hashes(10);
|
||||
chain.verify_blocks_hashes(10);
|
||||
|
||||
assert_eq!(chain.best_block_locator_hashes()[0], "0000000000000000000000000000000000000000000000000000000000000010".into());
|
||||
assert_eq!(chain.block_locator_hashes(), vec![
|
||||
"0000000000000000000000000000000000000000000000000000000000000010".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000009".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000008".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000007".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000006".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000005".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000004".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000003".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000002".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000001".into(),
|
||||
H256::from(10),
|
||||
H256::from(9),
|
||||
H256::from(8),
|
||||
H256::from(7),
|
||||
H256::from(6),
|
||||
H256::from(5),
|
||||
H256::from(4),
|
||||
H256::from(3),
|
||||
H256::from(2),
|
||||
H256::from(1),
|
||||
block2_hash.clone(),
|
||||
genesis_hash.clone(),
|
||||
]);
|
||||
|
||||
chain.schedule_blocks_hashes(vec![
|
||||
"0000000000000000000000000000000000000000000000000000000000000011".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000012".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000013".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000014".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000015".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000016".into(),
|
||||
H256::from(11),
|
||||
H256::from(12),
|
||||
H256::from(13),
|
||||
H256::from(14),
|
||||
H256::from(15),
|
||||
H256::from(16),
|
||||
]);
|
||||
chain.request_blocks_hashes(10);
|
||||
|
||||
assert_eq!(chain.best_block_locator_hashes()[0], "0000000000000000000000000000000000000000000000000000000000000014".into());
|
||||
assert_eq!(chain.block_locator_hashes(), vec![
|
||||
"0000000000000000000000000000000000000000000000000000000000000016".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000015".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000014".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000013".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000012".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000011".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000010".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000009".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000008".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000007".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000005".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000001".into(),
|
||||
H256::from(16),
|
||||
H256::from(15),
|
||||
H256::from(14),
|
||||
H256::from(13),
|
||||
H256::from(12),
|
||||
H256::from(11),
|
||||
H256::from(10),
|
||||
H256::from(9),
|
||||
H256::from(8),
|
||||
H256::from(7),
|
||||
H256::from(5),
|
||||
H256::from(1),
|
||||
genesis_hash.clone(),
|
||||
]);
|
||||
|
||||
chain.schedule_blocks_hashes(vec![
|
||||
"0000000000000000000000000000000000000000000000000000000000000020".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000021".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000022".into(),
|
||||
H256::from(20),
|
||||
H256::from(21),
|
||||
H256::from(22),
|
||||
]);
|
||||
|
||||
assert_eq!(chain.best_block_locator_hashes()[0], "0000000000000000000000000000000000000000000000000000000000000020".into());
|
||||
assert_eq!(chain.block_locator_hashes(), vec![
|
||||
"0000000000000000000000000000000000000000000000000000000000000022".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000021".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000020".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000016".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000015".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000014".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000013".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000012".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000011".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000010".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000008".into(),
|
||||
"0000000000000000000000000000000000000000000000000000000000000004".into(),
|
||||
H256::from(22),
|
||||
H256::from(21),
|
||||
H256::from(20),
|
||||
H256::from(16),
|
||||
H256::from(15),
|
||||
H256::from(14),
|
||||
H256::from(13),
|
||||
H256::from(12),
|
||||
H256::from(11),
|
||||
H256::from(10),
|
||||
H256::from(8),
|
||||
H256::from(4),
|
||||
genesis_hash.clone(),
|
||||
]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn chain_intersect_with_inventory() {
|
||||
let mut chain = Chain::new(Arc::new(db::TestStorage::with_genesis_block()));
|
||||
// append 2 db blocks
|
||||
chain.insert_best_block(test_data::block_h1()).expect("Error inserting new block");
|
||||
chain.insert_best_block(test_data::block_h2()).expect("Error inserting new block");
|
||||
// append 3 verifying blocks
|
||||
chain.schedule_blocks_hashes(vec![
|
||||
H256::from(0),
|
||||
H256::from(1),
|
||||
H256::from(2),
|
||||
]);
|
||||
chain.request_blocks_hashes(3);
|
||||
chain.verify_blocks_hashes(3);
|
||||
// append 3 requested blocks
|
||||
chain.schedule_blocks_hashes(vec![
|
||||
H256::from(10),
|
||||
H256::from(11),
|
||||
H256::from(12),
|
||||
]);
|
||||
chain.request_blocks_hashes(10);
|
||||
// append 3 scheduled blocks
|
||||
chain.schedule_blocks_hashes(vec![
|
||||
H256::from(20),
|
||||
H256::from(21),
|
||||
H256::from(22),
|
||||
]);
|
||||
|
||||
assert_eq!(chain.intersect_with_inventory(&vec![
|
||||
H256::from(30),
|
||||
H256::from(31),
|
||||
]), InventoryIntersection::NoKnownBlocks(0));
|
||||
|
||||
assert_eq!(chain.intersect_with_inventory(&vec![
|
||||
H256::from(2),
|
||||
H256::from(10),
|
||||
H256::from(11),
|
||||
H256::from(12),
|
||||
H256::from(20),
|
||||
]), InventoryIntersection::InMemoryNoNewBlocks);
|
||||
|
||||
assert_eq!(chain.intersect_with_inventory(&vec![
|
||||
H256::from(21),
|
||||
H256::from(22),
|
||||
H256::from(30),
|
||||
H256::from(31),
|
||||
]), InventoryIntersection::InMemoryMainNewBlocks(2));
|
||||
|
||||
assert_eq!(chain.intersect_with_inventory(&vec![
|
||||
H256::from(20),
|
||||
H256::from(21),
|
||||
H256::from(30),
|
||||
H256::from(31),
|
||||
]), InventoryIntersection::InMemoryForkNewBlocks(2));
|
||||
|
||||
assert_eq!(chain.intersect_with_inventory(&vec![
|
||||
test_data::block_h1().hash(),
|
||||
test_data::block_h2().hash(),
|
||||
]), InventoryIntersection::DbAllBlocksKnown);
|
||||
|
||||
assert_eq!(chain.intersect_with_inventory(&vec![
|
||||
test_data::block_h2().hash(),
|
||||
H256::from(30),
|
||||
H256::from(31),
|
||||
]), InventoryIntersection::DbForkNewBlocks(1));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use std::thread;
|
||||
use std::sync::Arc;
|
||||
use std::cmp::{min, max};
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::sync::mpsc::{channel, Sender, Receiver};
|
||||
use parking_lot::Mutex;
|
||||
|
@ -12,10 +12,9 @@ use futures_cpupool::CpuPool;
|
|||
use db;
|
||||
use chain::{Block, RepresentH256};
|
||||
use primitives::hash::H256;
|
||||
use hash_queue::HashPosition;
|
||||
use synchronization_peers::Peers;
|
||||
#[cfg(test)] use synchronization_peers::{Information as PeersInformation};
|
||||
use synchronization_chain::{ChainRef, BlockState};
|
||||
use synchronization_chain::{ChainRef, BlockState, InventoryIntersection};
|
||||
#[cfg(test)]
|
||||
use synchronization_chain::{Information as ChainInformation};
|
||||
use verification::{ChainVerifier, Error as VerificationError, Verify};
|
||||
|
@ -39,20 +38,21 @@ use std::time::Duration;
|
|||
///! 2.4) if !inventory_rest.is_empty(): ===> has new unknown blocks in inventory
|
||||
///! 2.4.1) queue_rest = queue after intersection
|
||||
///! 2.4.2) if queue_rest.is_empty(): ===> has new unknown blocks in inventory, no fork
|
||||
///! 2.4.2.1) scheduled_blocks.append(inventory_rest)
|
||||
///! 2.4.2.2) stop (2.4.2)
|
||||
///! 2.4.3) if !queue_rest.is_empty(): ===> has new unknown blocks in inventory, fork
|
||||
///! 2.4.3.1) scheduled_blocks.append(inventory_rest)
|
||||
///! 2.4.3.2) stop (2.4.3)
|
||||
///! 2.4.4) if !queue_rest.is_empty(): ===> has new unknown blocks in inventory, fork
|
||||
///! 2.4.4.1) scheduled_blocks.append(inventory_rest)
|
||||
///! 2.4.4.2) stop (2.4.4)
|
||||
///! 2.4.5) stop (2.4)
|
||||
///! 2.4.3) stop (2.4)
|
||||
///! 2.5) stop (2)
|
||||
///! 3) if queue_intersection.is_empty(): ===> responded with out-of-sync-window blocks
|
||||
///! 3.1) last_known_block = inventory.last(b => b.is_known())
|
||||
///! 3.2) if last_known_block == None: ===> we know nothing about these blocks & we haven't asked for these
|
||||
///! 3.2.1) peer will be excluded later by management thread
|
||||
///! 3.2.1) if !synchronizing => remember peer as useful + ask for blocks
|
||||
///! 3.2.1) if synchronizing => peer will be excluded later by management thread
|
||||
///! 3.2.2) stop (3.2)
|
||||
///! 3.3) if last_known_block == last(inventory): ===> responded with all-known-blocks
|
||||
///! 3.3.1) remember peer as useful (possibly had failures before && have been excluded from sync)
|
||||
///! 3.3.1) if syncing, remember peer as useful (possibly had failures before && have been excluded from sync)
|
||||
///! 3.3.2) stop (3.3)
|
||||
///! 3.4) if last_known_block in the middle of inventory: ===> responded with forked blocks
|
||||
///! 3.4.1) remember peer as useful
|
||||
|
@ -62,24 +62,40 @@ use std::time::Duration;
|
|||
///! 3.5) stop (3)
|
||||
///!
|
||||
///! on_peer_block: After receiving `block` message:
|
||||
///! 1) if block_state(block) in (Scheduled, Verifying, Stored): ===> late delivery
|
||||
///! 1) if block_state(block) in (Verifying, Stored): ===> late delivery
|
||||
///! 1.1) remember peer as useful
|
||||
///! 1.2) stop (1)
|
||||
///! 2) if block_state(block) == Requested: ===> on-time delivery
|
||||
///! 2) if block_state(block) in (Scheduled, Requested): ===> future/on-time delivery
|
||||
///! 2.1) remember peer as useful
|
||||
///! 2.2) move block from requested to verifying queue
|
||||
///! 2.2) queue verification().and_then(insert).or_else(reset_sync)
|
||||
///! 2.3) stop (2)
|
||||
///! 2.2) if block_state(block.parent) in (Verifying, Stored): ===> we can proceed with verification
|
||||
///! 2.2.1) remove block from current queue (Verifying || Stored)
|
||||
///! 2.2.2) append block to the verification queue
|
||||
///! 2.2.3) queue verification().and_then(on_block_verification_success).or_else(on_block_verification_error)
|
||||
///! 2.2.4) try to verify orphan blocks
|
||||
///! 2.2.5) stop (2.2)
|
||||
///! 2.3) if block_state(block.parent) in (Requested, Scheduled): ===> we have found an orphan block
|
||||
///! 2.3.1) remove block from current queue (Verifying || Stored)
|
||||
///! 2.3.2) append block to the orphans
|
||||
///! 2.3.3) stop (2.3)
|
||||
///! 2.4) if block_state(block.parent) == Unknown: ===> bad block found
|
||||
///! 2.4.1) remove block from current queue (Verifying || Stored)
|
||||
///! 2.4.2) stop (2.4)
|
||||
///! 2.5) stop (2)
|
||||
///! 3) if block_state(block) == Unknown: ===> maybe we are on-top of chain && new block is announced?
|
||||
///! 3.1) if block_state(block.parent_hash) == Unknown: ===> we do not know parent
|
||||
///! 3.1.1) ignore this block
|
||||
///! 3.1.2) stop (3.1)
|
||||
///! 3.2) if block_state(block.parent_hash) != Unknown: ===> fork found
|
||||
///! 3.2) if block_state(block.parent_hash) in (Verifying, Stored): ===> fork found, can verify
|
||||
///! 3.2.1) ask peer for best inventory (after this block)
|
||||
///! 3.2.2) append block to verifying queue
|
||||
///! 3.2.3) queue verification().and_then(insert).or_else(reset_sync)
|
||||
///! 3.2.3) queue verification().and_then(on_block_verification_success).or_else(on_block_verification_error)
|
||||
///! 3.2.4) stop (3.2)
|
||||
///! 2.3) stop (2)
|
||||
///! 3.3) if block_state(block.parent_hash) in (Requested, Scheduled): ===> fork found, add as orphan
|
||||
///! 3.3.1) ask peer for best inventory (after this block)
|
||||
///! 3.3.2) append block to orphan
|
||||
///! 3.3.3) stop (3.3)
|
||||
///! 3.4) stop (2)
|
||||
///! + if no blocks left in scheduled + requested queue => we are saturated => ask all peers for inventory & forget
|
||||
///!
|
||||
///! execute_synchronization_tasks: After receiving `inventory` message OR receiving `block` message OR when management thread schedules tasks:
|
||||
///! 1) if there are blocks in `scheduled` queue AND we can fit more blocks into memory: ===> ask for blocks
|
||||
|
@ -113,7 +129,8 @@ use std::time::Duration;
|
|||
///!
|
||||
///! on_block_verification_error: When verification completes with an error:
|
||||
///! 1) remove block from verification queue
|
||||
///! 2) remove all known children from all queues [so that new `block` messages will be ignored in on_peer_block.3.1.1]
|
||||
///! 2) remove all known children from all queues [so that new `block` messages will be ignored in on_peer_block.3.1.1] (TODO: not implemented currently!!!)
|
||||
///!
|
||||
///!
|
||||
|
||||
/// Approximate maximal number of blocks hashes in scheduled queue.
|
||||
|
@ -162,7 +179,6 @@ pub trait Client : Send + 'static {
|
|||
fn on_new_blocks_inventory(&mut self, peer_index: usize, peer_hashes: Vec<H256>);
|
||||
fn on_peer_block(&mut self, peer_index: usize, block: Block);
|
||||
fn on_peer_disconnected(&mut self, peer_index: usize);
|
||||
fn reset(&mut self, is_hard: bool);
|
||||
fn on_block_verification_success(&mut self, block: Block);
|
||||
fn on_block_verification_error(&mut self, err: &VerificationError, hash: &H256);
|
||||
}
|
||||
|
@ -190,7 +206,7 @@ pub struct SynchronizationClient<T: TaskExecutor> {
|
|||
/// Chain reference.
|
||||
chain: ChainRef,
|
||||
/// Blocks from requested_hashes, but received out-of-order.
|
||||
orphaned_blocks: HashMap<H256, Block>,
|
||||
orphaned_blocks: HashMap<H256, Vec<(H256, Block)>>,
|
||||
/// Verification work transmission channel.
|
||||
verification_work_sender: Option<Sender<VerificationTask>>,
|
||||
/// Verification thread.
|
||||
|
@ -218,10 +234,11 @@ impl State {
|
|||
impl<T> Drop for SynchronizationClient<T> where T: TaskExecutor {
|
||||
fn drop(&mut self) {
|
||||
if let Some(join_handle) = self.verification_worker_thread.take() {
|
||||
self.verification_work_sender
|
||||
// ignore send error here <= destructing anyway
|
||||
let _ = self.verification_work_sender
|
||||
.take()
|
||||
.expect("Some(join_handle) => Some(verification_work_sender)")
|
||||
.send(VerificationTask::Stop).expect("TODO");
|
||||
.send(VerificationTask::Stop);
|
||||
join_handle.join().expect("Clean shutdown.");
|
||||
}
|
||||
}
|
||||
|
@ -246,45 +263,25 @@ impl<T> Client for SynchronizationClient<T> where T: TaskExecutor {
|
|||
// update peers to select next tasks
|
||||
self.peers.on_block_received(peer_index, &block_hash);
|
||||
|
||||
self.process_peer_block(block_hash, block);
|
||||
self.process_peer_block(peer_index, block_hash, block);
|
||||
self.execute_synchronization_tasks();
|
||||
}
|
||||
|
||||
/// Peer disconnected.
|
||||
fn on_peer_disconnected(&mut self, peer_index: usize) {
|
||||
self.peers.on_peer_disconnected(peer_index);
|
||||
|
||||
// when last peer is disconnected, reset, but let verifying blocks be verified
|
||||
self.reset(false);
|
||||
}
|
||||
|
||||
/// Reset synchronization process
|
||||
fn reset(&mut self, is_hard: bool) {
|
||||
self.peers.reset();
|
||||
self.orphaned_blocks.clear();
|
||||
// TODO: reset verification queue
|
||||
|
||||
let mut chain = self.chain.write();
|
||||
chain.remove_blocks_with_state(BlockState::Requested);
|
||||
chain.remove_blocks_with_state(BlockState::Scheduled);
|
||||
if is_hard {
|
||||
self.state = State::Synchronizing(time::precise_time_s(), chain.best_block().number);
|
||||
chain.remove_blocks_with_state(BlockState::Verifying);
|
||||
warn!(target: "sync", "Synchronization process restarting from block {:?}", chain.best_block());
|
||||
}
|
||||
else {
|
||||
self.state = State::Saturated;
|
||||
if self.peers.on_peer_disconnected(peer_index) {
|
||||
self.switch_to_saturated_state(false);
|
||||
}
|
||||
}
|
||||
|
||||
/// Process successful block verification
|
||||
fn on_block_verification_success(&mut self, block: Block) {
|
||||
{
|
||||
let hash = block.hash();
|
||||
let mut chain = self.chain.write();
|
||||
|
||||
// remove from verifying queue
|
||||
assert_eq!(chain.remove_block_with_state(&hash, BlockState::Verifying), HashPosition::Front);
|
||||
// remove block from verification queue
|
||||
chain.remove_block_with_state(&block.hash(), BlockState::Verifying);
|
||||
|
||||
// insert to storage
|
||||
chain.insert_best_block(block)
|
||||
|
@ -299,8 +296,12 @@ impl<T> Client for SynchronizationClient<T> where T: TaskExecutor {
|
|||
fn on_block_verification_error(&mut self, err: &VerificationError, hash: &H256) {
|
||||
warn!(target: "sync", "Block {:?} verification failed with error {:?}", hash, err);
|
||||
|
||||
// reset synchronization process
|
||||
self.reset(true);
|
||||
{
|
||||
let mut chain = self.chain.write();
|
||||
|
||||
// remove block from verification queue
|
||||
chain.remove_block_with_state(&hash, BlockState::Verifying);
|
||||
}
|
||||
|
||||
// start new tasks
|
||||
self.execute_synchronization_tasks();
|
||||
|
@ -351,8 +352,10 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
|
|||
None => return Ok(()),
|
||||
};
|
||||
let mut client = client.lock();
|
||||
manage_synchronization_peers(&mut client.peers);
|
||||
client.execute_synchronization_tasks();
|
||||
if client.state.is_synchronizing() {
|
||||
manage_synchronization_peers(&mut client.peers);
|
||||
client.execute_synchronization_tasks();
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
.for_each(|_| Ok(()))
|
||||
|
@ -376,144 +379,101 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
|
|||
}
|
||||
|
||||
/// Process new blocks inventory
|
||||
fn process_new_blocks_inventory(&mut self, peer_index: usize, mut peer_hashes: Vec<H256>) {
|
||||
// | requested | QUEUED |
|
||||
// --- [1]
|
||||
// --- [2] +
|
||||
// --- [3] +
|
||||
// --- [4]
|
||||
// -+- [5] +
|
||||
// -+- [6] +
|
||||
// -+- [7] +
|
||||
// ---+---------+--- [8] +
|
||||
// ---+--------+--- [9] +
|
||||
// ---+---------+--------+--- [10]
|
||||
|
||||
fn process_new_blocks_inventory(&mut self, peer_index: usize, mut inventory: Vec<H256>) {
|
||||
let mut chain = self.chain.write();
|
||||
|
||||
'outer: loop {
|
||||
// when synchronization is idling
|
||||
// => request full inventory
|
||||
if !chain.has_blocks_of_state(BlockState::Scheduled)
|
||||
&& !chain.has_blocks_of_state(BlockState::Requested) {
|
||||
let unknown_blocks: Vec<_> = peer_hashes.into_iter()
|
||||
.filter(|hash| chain.block_has_state(&hash, BlockState::Unknown))
|
||||
.collect();
|
||||
|
||||
// no new blocks => no need to switch to the synchronizing state
|
||||
if unknown_blocks.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
chain.schedule_blocks_hashes(unknown_blocks);
|
||||
self.peers.insert(peer_index);
|
||||
break;
|
||||
}
|
||||
|
||||
// cases: [2], [5], [6], [8]
|
||||
// if last block from peer_hashes is in window { requested_hashes + queued_hashes }
|
||||
// => no new blocks for synchronization, but we will use this peer in synchronization
|
||||
let peer_hashes_len = peer_hashes.len();
|
||||
if chain.block_has_state(&peer_hashes[peer_hashes_len - 1], BlockState::Scheduled)
|
||||
|| chain.block_has_state(&peer_hashes[peer_hashes_len - 1], BlockState::Requested) {
|
||||
self.peers.insert(peer_index);
|
||||
return;
|
||||
}
|
||||
|
||||
// cases: [1], [3], [4], [7], [9], [10]
|
||||
// try to find new blocks for synchronization from inventory
|
||||
let mut last_known_peer_hash_index = peer_hashes_len - 1;
|
||||
loop {
|
||||
if chain.block_state(&peer_hashes[last_known_peer_hash_index]) != BlockState::Unknown {
|
||||
// we have found first block which is known to us
|
||||
// => blocks in range [(last_known_peer_hash_index + 1)..peer_hashes_len] are unknown
|
||||
// && must be scheduled for request
|
||||
let unknown_peer_hashes = peer_hashes.split_off(last_known_peer_hash_index + 1);
|
||||
|
||||
chain.schedule_blocks_hashes(unknown_peer_hashes);
|
||||
match chain.intersect_with_inventory(&inventory) {
|
||||
InventoryIntersection::NoKnownBlocks(_) if self.state.is_synchronizing() => (),
|
||||
InventoryIntersection::DbAllBlocksKnown => {
|
||||
if self.state.is_synchronizing() {
|
||||
// remember peer as useful
|
||||
self.peers.insert(peer_index);
|
||||
break 'outer;
|
||||
}
|
||||
|
||||
if last_known_peer_hash_index == 0 {
|
||||
// either these are blocks from the future or blocks from the past
|
||||
// => TODO: ignore this peer during synchronization
|
||||
return;
|
||||
},
|
||||
InventoryIntersection::InMemoryNoNewBlocks => {
|
||||
// remember peer as useful
|
||||
self.peers.insert(peer_index);
|
||||
},
|
||||
InventoryIntersection::InMemoryMainNewBlocks(new_block_index)
|
||||
| InventoryIntersection::InMemoryForkNewBlocks(new_block_index)
|
||||
| InventoryIntersection::DbForkNewBlocks(new_block_index)
|
||||
| InventoryIntersection::NoKnownBlocks(new_block_index) => {
|
||||
// schedule new blocks
|
||||
let new_blocks_hashes = inventory.split_off(new_block_index);
|
||||
chain.schedule_blocks_hashes(new_blocks_hashes);
|
||||
// remember peer as useful
|
||||
self.peers.insert(peer_index);
|
||||
// switch to synchronization state
|
||||
if !self.state.is_synchronizing() {
|
||||
self.state = State::Synchronizing(time::precise_time_s(), chain.best_block().number);
|
||||
}
|
||||
last_known_peer_hash_index -= 1;
|
||||
}
|
||||
}
|
||||
|
||||
// move to synchronizing state
|
||||
if !self.state.is_synchronizing() {
|
||||
self.state = State::Synchronizing(time::precise_time_s(), chain.best_block().number);
|
||||
}
|
||||
}
|
||||
|
||||
/// Process new peer block
|
||||
fn process_peer_block(&mut self, block_hash: H256, block: Block) {
|
||||
// this block is not requested for synchronization
|
||||
let mut chain = self.chain.write();
|
||||
let block_position = chain.remove_block_with_state(&block_hash, BlockState::Requested);
|
||||
if block_position == HashPosition::Missing {
|
||||
return;
|
||||
}
|
||||
fn process_peer_block(&mut self, peer_index: usize, block_hash: H256, block: Block) {
|
||||
let switch_to_saturated = {
|
||||
let mut chain = self.chain.write();
|
||||
match chain.block_state(&block_hash) {
|
||||
BlockState::Verifying | BlockState::Stored => {
|
||||
// remember peer as useful
|
||||
self.peers.insert(peer_index);
|
||||
},
|
||||
BlockState::Unknown | BlockState::Scheduled | BlockState::Requested => {
|
||||
// remove block from current queue
|
||||
chain.remove_block(&block_hash);
|
||||
// check parent block state
|
||||
match chain.block_state(&block.block_header.previous_header_hash) {
|
||||
BlockState::Unknown => (),
|
||||
BlockState::Verifying | BlockState::Stored => {
|
||||
// remember peer as useful
|
||||
self.peers.insert(peer_index);
|
||||
// schedule verification
|
||||
let mut blocks: VecDeque<(H256, Block)> = VecDeque::new();
|
||||
blocks.push_back((block_hash, block));
|
||||
while let Some((block_hash, block)) = blocks.pop_front() {
|
||||
// queue block for verification
|
||||
chain.remove_block(&block_hash);
|
||||
|
||||
// requested block is received => move to saturated state if there are no more blocks
|
||||
if !chain.has_blocks_of_state(BlockState::Scheduled)
|
||||
&& !chain.has_blocks_of_state(BlockState::Requested)
|
||||
&& !chain.has_blocks_of_state(BlockState::Verifying) {
|
||||
self.state = State::Saturated;
|
||||
}
|
||||
match self.verification_work_sender {
|
||||
Some(ref verification_work_sender) => {
|
||||
chain.verify_block_hash(block_hash.clone());
|
||||
verification_work_sender
|
||||
.send(VerificationTask::VerifyBlock(block))
|
||||
.expect("Verification thread have the same lifetime as `Synchronization`")
|
||||
},
|
||||
None => chain.insert_best_block(block)
|
||||
.expect("Error inserting to db."),
|
||||
}
|
||||
|
||||
// check if this block is next block in the blockchain
|
||||
if block_position == HashPosition::Front {
|
||||
// check if this parent of this block is current best_block
|
||||
let expecting_previous_header_hash = chain.best_block_of_state(BlockState::Verifying)
|
||||
.unwrap_or_else(|| {
|
||||
chain.best_block_of_state(BlockState::Stored)
|
||||
.expect("storage with genesis block is required")
|
||||
}).hash;
|
||||
if block.block_header.previous_header_hash != expecting_previous_header_hash {
|
||||
// TODO: penalize peer
|
||||
warn!(target: "sync", "Out-of-order block {:?} was dropped. Expecting block with parent hash {:?}", block_hash, expecting_previous_header_hash);
|
||||
return;
|
||||
}
|
||||
|
||||
// this is next block in the blockchain => queue for verification
|
||||
// also unwrap all dependent orphan blocks
|
||||
let mut current_block = block;
|
||||
let mut current_block_hash = block_hash;
|
||||
loop {
|
||||
match self.verification_work_sender {
|
||||
Some(ref verification_work_sender) => {
|
||||
chain.verify_block_hash(current_block_hash.clone());
|
||||
verification_work_sender
|
||||
.send(VerificationTask::VerifyBlock(current_block))
|
||||
.expect("Verification thread have the same lifetime as `Synchronization`");
|
||||
},
|
||||
None => {
|
||||
chain.insert_best_block(current_block)
|
||||
.expect("Error inserting to db.");
|
||||
// process orphan blocks
|
||||
if let Entry::Occupied(entry) = self.orphaned_blocks.entry(block_hash) {
|
||||
let (_, orphaned) = entry.remove_entry();
|
||||
blocks.extend(orphaned);
|
||||
}
|
||||
}
|
||||
},
|
||||
BlockState::Requested | BlockState::Scheduled => {
|
||||
// remember peer as useful
|
||||
self.peers.insert(peer_index);
|
||||
// remember as orphan block
|
||||
self.orphaned_blocks
|
||||
.entry(block.block_header.previous_header_hash.clone())
|
||||
.or_insert(Vec::new())
|
||||
.push((block_hash, block))
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// proceed to the next orphaned block
|
||||
if let Entry::Occupied(orphaned_block_entry) = self.orphaned_blocks.entry(current_block_hash) {
|
||||
let (_, orphaned_block) = orphaned_block_entry.remove_entry();
|
||||
current_block = orphaned_block;
|
||||
current_block_hash = current_block.hash();
|
||||
}
|
||||
else {
|
||||
break;
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
// requested block is received => move to saturated state if there are no more blocks
|
||||
chain.length_of_state(BlockState::Scheduled) == 0
|
||||
&& chain.length_of_state(BlockState::Requested) == 0
|
||||
};
|
||||
|
||||
// this block is not the next one => mark it as orphaned
|
||||
self.orphaned_blocks.insert(block.block_header.previous_header_hash.clone(), block);
|
||||
if switch_to_saturated {
|
||||
self.switch_to_saturated_state(true);
|
||||
}
|
||||
}
|
||||
|
||||
/// Schedule new synchronization tasks, if any.
|
||||
|
@ -540,18 +500,11 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO: instead of issuing duplicated inventory requests, wait until enough new blocks are verified, then issue
|
||||
// check if we can query some blocks hashes
|
||||
let scheduled_hashes_len = chain.length_of_state(BlockState::Scheduled);
|
||||
if scheduled_hashes_len < MAX_SCHEDULED_HASHES {
|
||||
if self.state.is_synchronizing() {
|
||||
tasks.push(Task::RequestBestInventory(idle_peers[0]));
|
||||
self.peers.on_inventory_requested(idle_peers[0]);
|
||||
}
|
||||
else {
|
||||
tasks.push(Task::RequestInventory(idle_peers[0]));
|
||||
self.peers.on_inventory_requested(idle_peers[0]);
|
||||
}
|
||||
tasks.push(Task::RequestInventory(idle_peers[0]));
|
||||
self.peers.on_inventory_requested(idle_peers[0]);
|
||||
}
|
||||
|
||||
// check if we can move some blocks from scheduled to requested queue
|
||||
|
@ -578,6 +531,27 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
|
|||
}
|
||||
}
|
||||
|
||||
/// Switch to saturated state
|
||||
fn switch_to_saturated_state(&mut self, ask_for_inventory: bool) {
|
||||
self.state = State::Saturated;
|
||||
self.orphaned_blocks.clear();
|
||||
self.peers.reset();
|
||||
|
||||
{
|
||||
let mut chain = self.chain.write();
|
||||
chain.remove_blocks_with_state(BlockState::Requested);
|
||||
chain.remove_blocks_with_state(BlockState::Scheduled);
|
||||
}
|
||||
|
||||
if ask_for_inventory {
|
||||
let mut executor = self.executor.lock();
|
||||
for idle_peer in self.peers.idle_peers() {
|
||||
self.peers.on_inventory_requested(idle_peer);
|
||||
executor.execute(Task::RequestInventory(idle_peer));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Thread procedure for handling verification tasks
|
||||
fn verification_worker_proc(sync: Arc<Mutex<Self>>, storage: Arc<db::Store>, work_receiver: Receiver<VerificationTask>) {
|
||||
let verifier = ChainVerifier::new(storage);
|
||||
|
@ -612,6 +586,7 @@ pub mod tests {
|
|||
use p2p::event_loop;
|
||||
use test_data;
|
||||
use db;
|
||||
use primitives::hash::H256;
|
||||
|
||||
fn create_sync() -> (Core, Handle, Arc<Mutex<DummyTaskExecutor>>, Arc<Mutex<SynchronizationClient<DummyTaskExecutor>>>) {
|
||||
let event_loop = event_loop();
|
||||
|
@ -638,13 +613,13 @@ pub mod tests {
|
|||
let (_, _, executor, sync) = create_sync();
|
||||
|
||||
let mut sync = sync.lock();
|
||||
let block1: Block = "010000006fe28c0ab6f1b372c1a6a246ae63f74f931e8365e15a089c68d6190000000000982051fd1e4ba744bbbe680e1fee14677ba1a3c3540bf7b1cdb606e857233e0e61bc6649ffff001d01e362990101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d0104ffffffff0100f2052a0100000043410496b538e853519c726a2c91e61ec11600ae1390813a627c66fb8be7947be63c52da7589379515d4e0a604f8141781e62294721166bf621e73a82cbf2342c858eeac00000000".into();
|
||||
let block2: Block = "010000004860eb18bf1b1620e37e9490fc8a427514416fd75159ab86688e9a8300000000d5fdcc541e25de1c7a5addedf24858b8bb665c9f36ef744ee42c316022c90f9bb0bc6649ffff001d08d2bd610101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d010bffffffff0100f2052a010000004341047211a824f55b505228e4c3d5194c1fcfaa15a456abdf37f9b9d97a4040afc073dee6c89064984f03385237d92167c13e236446b417ab79a0fcae412ae3316b77ac00000000".into();
|
||||
let block1: Block = test_data::block_h1();
|
||||
let block2: Block = test_data::block_h2();
|
||||
|
||||
sync.on_new_blocks_inventory(5, vec![block1.hash()]);
|
||||
let tasks = executor.lock().take_tasks();
|
||||
assert_eq!(tasks.len(), 2);
|
||||
assert_eq!(tasks[0], Task::RequestBestInventory(5));
|
||||
assert_eq!(tasks[0], Task::RequestInventory(5));
|
||||
assert_eq!(tasks[1], Task::RequestBlocks(5, vec![block1.hash()]));
|
||||
assert!(sync.information().state.is_synchronizing());
|
||||
assert_eq!(sync.information().orphaned, 0);
|
||||
|
@ -654,23 +629,23 @@ pub mod tests {
|
|||
assert_eq!(sync.information().peers.idle, 0);
|
||||
assert_eq!(sync.information().peers.active, 1);
|
||||
|
||||
// push unknown block => nothing should change
|
||||
// push unknown block => will be queued as orphan
|
||||
sync.on_peer_block(5, block2);
|
||||
assert!(sync.information().state.is_synchronizing());
|
||||
assert_eq!(sync.information().orphaned, 0);
|
||||
assert_eq!(sync.information().orphaned, 1);
|
||||
assert_eq!(sync.information().chain.scheduled, 0);
|
||||
assert_eq!(sync.information().chain.requested, 1);
|
||||
assert_eq!(sync.information().chain.stored, 1);
|
||||
assert_eq!(sync.information().peers.idle, 0);
|
||||
assert_eq!(sync.information().peers.active, 1);
|
||||
|
||||
// push requested block => should be moved to the test storage
|
||||
// push requested block => should be moved to the test storage && orphan should be moved
|
||||
sync.on_peer_block(5, block1);
|
||||
assert!(!sync.information().state.is_synchronizing());
|
||||
assert_eq!(sync.information().orphaned, 0);
|
||||
assert_eq!(sync.information().chain.scheduled, 0);
|
||||
assert_eq!(sync.information().chain.requested, 0);
|
||||
assert_eq!(sync.information().chain.stored, 2);
|
||||
assert_eq!(sync.information().chain.stored, 3);
|
||||
// we have just requested new `inventory` from the peer => peer is forgotten
|
||||
assert_eq!(sync.information().peers.idle, 0);
|
||||
assert_eq!(sync.information().peers.active, 0);
|
||||
|
@ -681,7 +656,7 @@ pub mod tests {
|
|||
let (_, _, _, sync) = create_sync();
|
||||
let mut sync = sync.lock();
|
||||
|
||||
let block2: Block = "010000004860eb18bf1b1620e37e9490fc8a427514416fd75159ab86688e9a8300000000d5fdcc541e25de1c7a5addedf24858b8bb665c9f36ef744ee42c316022c90f9bb0bc6649ffff001d08d2bd610101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d010bffffffff0100f2052a010000004341047211a824f55b505228e4c3d5194c1fcfaa15a456abdf37f9b9d97a4040afc073dee6c89064984f03385237d92167c13e236446b417ab79a0fcae412ae3316b77ac00000000".into();
|
||||
let block2: Block = test_data::block_h169();
|
||||
|
||||
sync.on_new_blocks_inventory(5, vec![block2.hash()]);
|
||||
sync.on_peer_block(5, block2);
|
||||
|
@ -702,8 +677,8 @@ pub mod tests {
|
|||
fn synchronization_parallel_peers() {
|
||||
let (_, _, executor, sync) = create_sync();
|
||||
|
||||
let block1: Block = "010000006fe28c0ab6f1b372c1a6a246ae63f74f931e8365e15a089c68d6190000000000982051fd1e4ba744bbbe680e1fee14677ba1a3c3540bf7b1cdb606e857233e0e61bc6649ffff001d01e362990101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d0104ffffffff0100f2052a0100000043410496b538e853519c726a2c91e61ec11600ae1390813a627c66fb8be7947be63c52da7589379515d4e0a604f8141781e62294721166bf621e73a82cbf2342c858eeac00000000".into();
|
||||
let block2: Block = "010000004860eb18bf1b1620e37e9490fc8a427514416fd75159ab86688e9a8300000000d5fdcc541e25de1c7a5addedf24858b8bb665c9f36ef744ee42c316022c90f9bb0bc6649ffff001d08d2bd610101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d010bffffffff0100f2052a010000004341047211a824f55b505228e4c3d5194c1fcfaa15a456abdf37f9b9d97a4040afc073dee6c89064984f03385237d92167c13e236446b417ab79a0fcae412ae3316b77ac00000000".into();
|
||||
let block1: Block = test_data::block_h1();
|
||||
let block2: Block = test_data::block_h2();
|
||||
|
||||
{
|
||||
let mut sync = sync.lock();
|
||||
|
@ -715,7 +690,7 @@ pub mod tests {
|
|||
// synchronization has started && new blocks have been requested
|
||||
let tasks = executor.lock().take_tasks();
|
||||
assert!(sync.information().state.is_synchronizing());
|
||||
assert_eq!(tasks, vec![Task::RequestBestInventory(1), Task::RequestBlocks(1, vec![block1.hash()])]);
|
||||
assert_eq!(tasks, vec![Task::RequestInventory(1), Task::RequestBlocks(1, vec![block1.hash()])]);
|
||||
}
|
||||
|
||||
{
|
||||
|
@ -726,7 +701,7 @@ pub mod tests {
|
|||
// synchronization has started && new blocks have been requested
|
||||
let tasks = executor.lock().take_tasks();
|
||||
assert!(sync.information().state.is_synchronizing());
|
||||
assert_eq!(tasks, vec![Task::RequestBestInventory(2), Task::RequestBlocks(2, vec![block2.hash()])]);
|
||||
assert_eq!(tasks, vec![Task::RequestInventory(2), Task::RequestBlocks(2, vec![block2.hash()])]);
|
||||
}
|
||||
|
||||
{
|
||||
|
@ -750,7 +725,7 @@ pub mod tests {
|
|||
// request new blocks
|
||||
{
|
||||
let mut sync = sync.lock();
|
||||
sync.on_new_blocks_inventory(1, vec!["0000000000000000000000000000000000000000000000000000000000000000".into()]);
|
||||
sync.on_new_blocks_inventory(1, vec![H256::from(0)]);
|
||||
assert!(sync.information().state.is_synchronizing());
|
||||
}
|
||||
|
||||
|
@ -774,4 +749,21 @@ pub mod tests {
|
|||
let tasks = executor.lock().take_tasks();
|
||||
assert_eq!(tasks, vec![]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn synchronization_asks_for_inventory_after_saturating() {
|
||||
let (_, _, executor, sync) = create_sync();
|
||||
let mut sync = sync.lock();
|
||||
let block = test_data::block_h1();
|
||||
let block_hash = block.hash();
|
||||
sync.on_new_blocks_inventory(1, vec![block_hash.clone()]);
|
||||
sync.on_new_blocks_inventory(2, vec![block_hash.clone()]);
|
||||
executor.lock().take_tasks();
|
||||
sync.on_peer_block(2, block);
|
||||
|
||||
let tasks = executor.lock().take_tasks();
|
||||
assert_eq!(tasks.len(), 2);
|
||||
assert!(tasks.iter().any(|t| t == &Task::RequestInventory(1)));
|
||||
assert!(tasks.iter().any(|t| t == &Task::RequestInventory(2)));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,8 +23,6 @@ pub enum Task {
|
|||
RequestBlocks(usize, Vec<H256>),
|
||||
/// Request full inventory using block_locator_hashes.
|
||||
RequestInventory(usize),
|
||||
/// Request inventory using best block locator only.
|
||||
RequestBestInventory(usize),
|
||||
/// Send block.
|
||||
SendBlock(usize, Block),
|
||||
/// Send notfound
|
||||
|
@ -96,20 +94,6 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
|
|||
connection.send_getblocks(&getblocks);
|
||||
}
|
||||
},
|
||||
Task::RequestBestInventory(peer_index) => {
|
||||
let block_locator_hashes = self.chain.read().best_block_locator_hashes();
|
||||
let getblocks = types::GetBlocks {
|
||||
version: 0,
|
||||
block_locator_hashes: block_locator_hashes,
|
||||
hash_stop: H256::default(),
|
||||
};
|
||||
|
||||
if let Some(connection) = self.peers.get_mut(&peer_index) {
|
||||
let connection = &mut *connection;
|
||||
trace!(target: "sync", "Querying best inventory from peer#{}", peer_index);
|
||||
connection.send_getblocks(&getblocks);
|
||||
}
|
||||
},
|
||||
Task::SendBlock(peer_index, block) => {
|
||||
let block_message = types::Block {
|
||||
block: block,
|
||||
|
|
|
@ -73,11 +73,12 @@ impl Peers {
|
|||
}
|
||||
|
||||
/// Peer has been disconnected
|
||||
pub fn on_peer_disconnected(&mut self, peer_index: usize) {
|
||||
pub fn on_peer_disconnected(&mut self, peer_index: usize) -> bool {
|
||||
self.idle.remove(&peer_index);
|
||||
self.requests.remove(&peer_index);
|
||||
self.failures.remove(&peer_index);
|
||||
self.times.remove(&peer_index);
|
||||
(self.idle.len() + self.requests.len()) == 0
|
||||
}
|
||||
|
||||
/// Block is received from peer.
|
||||
|
|
|
@ -62,9 +62,9 @@ impl SynchronizationServer {
|
|||
|
||||
fn locate_known_block(&self, block_locator_hashes: Vec<H256>) -> Option<db::BestBlock> {
|
||||
let chain = self.chain.read();
|
||||
let storage = chain.storage();
|
||||
block_locator_hashes.into_iter()
|
||||
.filter_map(|hash| chain
|
||||
.storage_block_number(&hash)
|
||||
.filter_map(|hash| storage.block_number(&hash)
|
||||
.map(|number| db::BestBlock {
|
||||
number: number,
|
||||
hash: hash,
|
||||
|
@ -93,15 +93,19 @@ impl SynchronizationServer {
|
|||
(peer_index, ServerTask::ServeGetData(inventory)) => {
|
||||
let mut unknown_items: Vec<InventoryVector> = Vec::new();
|
||||
let mut new_tasks: Vec<ServerTask> = Vec::new();
|
||||
for item in inventory {
|
||||
match item.inv_type {
|
||||
InventoryType::MessageBlock => {
|
||||
match chain.read().storage_block_number(&item.hash) {
|
||||
Some(_) => new_tasks.push(ServerTask::ReturnBlock(item.hash.clone())),
|
||||
None => unknown_items.push(item),
|
||||
}
|
||||
},
|
||||
_ => (), // TODO: process other inventory types
|
||||
{
|
||||
let chain = chain.read();
|
||||
let storage = chain.storage();
|
||||
for item in inventory {
|
||||
match item.inv_type {
|
||||
InventoryType::MessageBlock => {
|
||||
match storage.block_number(&item.hash) {
|
||||
Some(_) => new_tasks.push(ServerTask::ReturnBlock(item.hash.clone())),
|
||||
None => unknown_items.push(item),
|
||||
}
|
||||
},
|
||||
_ => (), // TODO: process other inventory types
|
||||
}
|
||||
}
|
||||
}
|
||||
// respond with `notfound` message for unknown data
|
||||
|
@ -132,7 +136,12 @@ impl SynchronizationServer {
|
|||
let blocks_hashes = SynchronizationServer::blocks_hashes_after(&chain, &best_block, &hash_stop, 2000);
|
||||
if !blocks_hashes.is_empty() {
|
||||
trace!(target: "sync", "Going to respond with blocks headers with {} items to peer#{}", blocks_hashes.len(), peer_index);
|
||||
let blocks_headers = blocks_hashes.into_iter().filter_map(|hash| chain.read().storage_block(&hash).map(|block| block.block_header)).collect();
|
||||
let chain = chain.read();
|
||||
let storage = chain.storage();
|
||||
// TODO: read block_header only
|
||||
let blocks_headers = blocks_hashes.into_iter()
|
||||
.filter_map(|hash| storage.block(db::BlockRef::Hash(hash)).map(|block| block.block_header))
|
||||
.collect();
|
||||
executor.lock().execute(Task::SendHeaders(peer_index, blocks_headers));
|
||||
}
|
||||
},
|
||||
|
@ -158,8 +167,7 @@ impl SynchronizationServer {
|
|||
},
|
||||
// `block`
|
||||
(peer_index, ServerTask::ReturnBlock(block_hash)) => {
|
||||
let storage_block = chain.read().storage_block(&block_hash);
|
||||
if let Some(storage_block) = storage_block {
|
||||
if let Some(storage_block) = chain.read().storage().block(db::BlockRef::Hash(block_hash)) {
|
||||
executor.lock().execute(Task::SendBlock(peer_index, storage_block));
|
||||
}
|
||||
},
|
||||
|
@ -172,7 +180,9 @@ impl SynchronizationServer {
|
|||
|
||||
fn blocks_hashes_after(chain: &ChainRef, best_block: &db::BestBlock, hash_stop: &H256, max_hashes: u32) -> Vec<H256> {
|
||||
let mut hashes: Vec<H256> = Vec::new();
|
||||
let storage_block_hash = chain.read().storage_block_hash(best_block.number);
|
||||
let chain = chain.read();
|
||||
let storage = chain.storage();
|
||||
let storage_block_hash = storage.block_hash(best_block.number);
|
||||
if let Some(hash) = storage_block_hash {
|
||||
// check that chain has not reorganized since task was queued
|
||||
if hash == best_block.hash {
|
||||
|
@ -180,7 +190,7 @@ impl SynchronizationServer {
|
|||
let last_block_number = first_block_number + max_hashes;
|
||||
// `max_hashes` hashes after best_block.number OR hash_stop OR blockchain end
|
||||
for block_number in first_block_number..last_block_number {
|
||||
match chain.read().storage_block_hash(block_number) {
|
||||
match storage.block_hash(block_number) {
|
||||
Some(ref block_hash) if block_hash == hash_stop => break,
|
||||
None => break,
|
||||
Some(block_hash) => {
|
||||
|
|
Loading…
Reference in New Issue