From adfdef609537d8f50d5f2e7f692cae581e1182b2 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Mon, 7 Nov 2016 12:12:34 +0300 Subject: [PATCH 1/7] Forks support in sync (#91) * intersect_with_inventory * process_new_blocks_inventory fixed to support forks * fixed sync issues * fixed on_peer_block to support forks * forks support in on_block_verification_* * cleaning up * cleaning up * forget about best inventory * ask for inventory after saturating * '000..000'.into() -> H256::from(0) --- sync/src/hash_queue.rs | 97 ++----- sync/src/synchronization_chain.rs | 406 +++++++++++++++------------ sync/src/synchronization_client.rs | 384 +++++++++++++------------ sync/src/synchronization_executor.rs | 16 -- sync/src/synchronization_peers.rs | 3 +- sync/src/synchronization_server.rs | 42 +-- 6 files changed, 466 insertions(+), 482 deletions(-) diff --git a/sync/src/hash_queue.rs b/sync/src/hash_queue.rs index 0d2b0d2b..7a4dd19a 100644 --- a/sync/src/hash_queue.rs +++ b/sync/src/hash_queue.rs @@ -39,11 +39,6 @@ impl HashQueue { self.queue.len() as u32 } - /// Returns true if queue is empty. - pub fn is_empty(&self) -> bool { - self.queue.is_empty() - } - /// Returns front element from the given queue. pub fn front(&self) -> Option { self.queue.front().cloned() @@ -63,19 +58,6 @@ impl HashQueue { Some(self.queue[queue_len - 2].clone()) } - /// Returns n-th element (n is starting from 0), starting from the back-element in the queue. - /// If there are no n-th element - returns (n-1) element & etc. - pub fn back_skip_n(&self, n: usize) -> Option { - let queue_len = self.queue.len(); - if queue_len == 0 { - return None; - } - if n + 1 > queue_len { - return Some(self.queue[0].clone()) - } - return Some(self.queue[queue_len - n - 1].clone()) - } - /// Returns true if queue contains element. pub fn contains(&self, hash: &H256) -> bool { self.set.contains(hash) @@ -175,11 +157,6 @@ impl HashQueueChain { self.chain[queue_index].len() } - /// Returns true if given queue is empty. - pub fn is_empty_at(&self, queue_index: usize) -> bool { - self.chain[queue_index].is_empty() - } - /// Returns element at the front of the given queue. pub fn front_at(&self, queue_index: usize) -> Option { let ref queue = self.chain[queue_index]; @@ -198,13 +175,6 @@ impl HashQueueChain { queue.pre_back() } - /// Returns n-th element (n is starting from 0), starting from the back-element in given queue. - /// If there are no n-th element - returns (n-1) element & etc. - pub fn back_skip_n_at(&self, chain_index: usize, n: usize) -> Option { - let ref queue = self.chain[chain_index]; - queue.back_skip_n(n) - } - /// Returns the back of the whole chain. pub fn back(&self) -> Option { let mut queue_index = self.chain.len() - 1; @@ -223,6 +193,7 @@ impl HashQueueChain { } /// Checks if hash is contained in given queue. + #[cfg(test)] pub fn is_contained_in(&self, queue_index: usize, hash: &H256) -> bool { self.chain[queue_index].contains(hash) } @@ -283,16 +254,15 @@ impl Index for HashQueueChain { #[cfg(test)] mod tests { use super::{HashQueue, HashQueueChain, HashPosition}; + use primitives::hash::H256; #[test] fn hash_queue_empty() { let mut queue = HashQueue::new(); assert_eq!(queue.len(), 0); - assert_eq!(queue.is_empty(), true); assert_eq!(queue.front(), None); assert_eq!(queue.back(), None); assert_eq!(queue.pre_back(), None); - assert_eq!(queue.back_skip_n(100), None); assert_eq!(queue.contains(&"000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f".into()), false); assert_eq!(queue.pop_front(), None); assert_eq!(queue.pop_front_n(100), vec![]); @@ -304,11 +274,9 @@ mod tests { let mut chain = HashQueueChain::with_number_of_queues(3); assert_eq!(chain.len(), 0); assert_eq!(chain.len_of(0), 0); - assert_eq!(chain.is_empty_at(0), true); assert_eq!(chain.front_at(0), None); assert_eq!(chain.back_at(0), None); assert_eq!(chain.pre_back_at(0), None); - assert_eq!(chain.back_skip_n_at(0, 100), None); assert_eq!(chain.back(), None); assert_eq!(chain.is_contained_in(0, &"000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f".into()), false); assert_eq!(chain.contains_in(&"000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f".into()), None); @@ -320,16 +288,16 @@ mod tests { fn hash_queue_chain_not_empty() { let mut chain = HashQueueChain::with_number_of_queues(4); chain.push_back_n_at(0, vec![ - "0000000000000000000000000000000000000000000000000000000000000000".into(), - "0000000000000000000000000000000000000000000000000000000000000001".into(), - "0000000000000000000000000000000000000000000000000000000000000002".into(), + H256::from(0), + H256::from(1), + H256::from(2), ]); chain.push_back_n_at(1, vec![ - "0000000000000000000000000000000000000000000000000000000000000003".into(), - "0000000000000000000000000000000000000000000000000000000000000004".into(), + H256::from(3), + H256::from(4), ]); chain.push_back_n_at(2, vec![ - "0000000000000000000000000000000000000000000000000000000000000005".into(), + H256::from(5), ]); assert_eq!(chain.len(), 6); @@ -337,42 +305,25 @@ mod tests { assert_eq!(chain.len_of(1), 2); assert_eq!(chain.len_of(2), 1); assert_eq!(chain.len_of(3), 0); - assert_eq!(chain.is_empty_at(0), false); - assert_eq!(chain.is_empty_at(1), false); - assert_eq!(chain.is_empty_at(2), false); - assert_eq!(chain.is_empty_at(3), true); - assert_eq!(chain.front_at(0), Some("0000000000000000000000000000000000000000000000000000000000000000".into())); - assert_eq!(chain.front_at(1), Some("0000000000000000000000000000000000000000000000000000000000000003".into())); - assert_eq!(chain.front_at(2), Some("0000000000000000000000000000000000000000000000000000000000000005".into())); + assert_eq!(chain.front_at(0), Some(H256::from(0))); + assert_eq!(chain.front_at(1), Some(H256::from(3))); + assert_eq!(chain.front_at(2), Some(H256::from(5))); assert_eq!(chain.front_at(3), None); - assert_eq!(chain.back_at(0), Some("0000000000000000000000000000000000000000000000000000000000000002".into())); - assert_eq!(chain.back_at(1), Some("0000000000000000000000000000000000000000000000000000000000000004".into())); - assert_eq!(chain.back_at(2), Some("0000000000000000000000000000000000000000000000000000000000000005".into())); + assert_eq!(chain.back_at(0), Some(H256::from(2))); + assert_eq!(chain.back_at(1), Some(H256::from(4))); + assert_eq!(chain.back_at(2), Some(H256::from(5))); assert_eq!(chain.back_at(3), None); - assert_eq!(chain.pre_back_at(0), Some("0000000000000000000000000000000000000000000000000000000000000001".into())); - assert_eq!(chain.pre_back_at(1), Some("0000000000000000000000000000000000000000000000000000000000000003".into())); + assert_eq!(chain.pre_back_at(0), Some(H256::from(1))); + assert_eq!(chain.pre_back_at(1), Some(H256::from(3))); assert_eq!(chain.pre_back_at(2), None); assert_eq!(chain.pre_back_at(3), None); - assert_eq!(chain.back(), Some("0000000000000000000000000000000000000000000000000000000000000005".into())); - assert_eq!(chain.is_contained_in(0, &"0000000000000000000000000000000000000000000000000000000000000002".into()), true); - assert_eq!(chain.is_contained_in(1, &"0000000000000000000000000000000000000000000000000000000000000002".into()), false); - assert_eq!(chain.is_contained_in(2, &"0000000000000000000000000000000000000000000000000000000000000002".into()), false); - assert_eq!(chain.is_contained_in(3, &"0000000000000000000000000000000000000000000000000000000000000002".into()), false); - assert_eq!(chain.contains_in(&"0000000000000000000000000000000000000000000000000000000000000002".into()), Some(0)); - assert_eq!(chain.contains_in(&"0000000000000000000000000000000000000000000000000000000000000005".into()), Some(2)); - assert_eq!(chain.contains_in(&"0000000000000000000000000000000000000000000000000000000000000009".into()), None); - - assert_eq!(chain.back_skip_n_at(0, 0), Some("0000000000000000000000000000000000000000000000000000000000000002".into())); - assert_eq!(chain.back_skip_n_at(1, 0), Some("0000000000000000000000000000000000000000000000000000000000000004".into())); - assert_eq!(chain.back_skip_n_at(2, 0), Some("0000000000000000000000000000000000000000000000000000000000000005".into())); - assert_eq!(chain.back_skip_n_at(3, 0), None); - assert_eq!(chain.back_skip_n_at(0, 1), Some("0000000000000000000000000000000000000000000000000000000000000001".into())); - assert_eq!(chain.back_skip_n_at(1, 1), Some("0000000000000000000000000000000000000000000000000000000000000003".into())); - assert_eq!(chain.back_skip_n_at(2, 1), Some("0000000000000000000000000000000000000000000000000000000000000005".into())); - assert_eq!(chain.back_skip_n_at(3, 1), None); - assert_eq!(chain.back_skip_n_at(0, 2), Some("0000000000000000000000000000000000000000000000000000000000000000".into())); - assert_eq!(chain.back_skip_n_at(1, 2), Some("0000000000000000000000000000000000000000000000000000000000000003".into())); - assert_eq!(chain.back_skip_n_at(2, 2), Some("0000000000000000000000000000000000000000000000000000000000000005".into())); - assert_eq!(chain.back_skip_n_at(3, 2), None); + assert_eq!(chain.back(), Some(H256::from(5))); + assert_eq!(chain.is_contained_in(0, &H256::from(2)), true); + assert_eq!(chain.is_contained_in(1, &H256::from(2)), false); + assert_eq!(chain.is_contained_in(2, &H256::from(2)), false); + assert_eq!(chain.is_contained_in(3, &H256::from(2)), false); + assert_eq!(chain.contains_in(&H256::from(2)), Some(0)); + assert_eq!(chain.contains_in(&H256::from(5)), Some(2)); + assert_eq!(chain.contains_in(&H256::from(9)), None); } } diff --git a/sync/src/synchronization_chain.rs b/sync/src/synchronization_chain.rs index fb3206bd..c9bfe55c 100644 --- a/sync/src/synchronization_chain.rs +++ b/sync/src/synchronization_chain.rs @@ -1,7 +1,7 @@ use std::fmt; use std::sync::Arc; use parking_lot::RwLock; -use chain::{Block, RepresentH256}; +use chain::Block; use db; use primitives::hash::H256; use hash_queue::{HashQueueChain, HashPosition}; @@ -47,6 +47,23 @@ pub struct Information { pub stored: u32, } +/// Result of intersecting chain && inventory +#[derive(Debug, PartialEq)] +pub enum InventoryIntersection { + /// 3.2: No intersection with in-memory queue && no intersection with db + NoKnownBlocks(usize), + /// 2.3: Inventory has no new blocks && some of blocks in inventory are in in-memory queue + InMemoryNoNewBlocks, + /// 2.4.2: Inventory has new blocks && these blocks are right after chain' best block + InMemoryMainNewBlocks(usize), + /// 2.4.3: Inventory has new blocks && these blocks are forked from our chain' best block + InMemoryForkNewBlocks(usize), + /// 3.3: No intersection with in-memory queue && has intersection with db && all blocks are already stored in db + DbAllBlocksKnown, + /// 3.4: No intersection with in-memory queue && has intersection with db && some blocks are not yet stored in db + DbForkNewBlocks(usize), +} + /// Blockchain from synchroniation point of view, consisting of: /// 1) all blocks from the `storage` [oldest blocks] /// 2) all blocks currently verifying by `verification_queue` @@ -109,7 +126,7 @@ impl Chain { scheduled: self.hash_chain.len_of(SCHEDULED_QUEUE), requested: self.hash_chain.len_of(REQUESTED_QUEUE), verifying: self.hash_chain.len_of(VERIFYING_QUEUE), - stored: self.storage.best_block().map_or(0, |block| block.number + 1), + stored: self.best_storage_block.number + 1, } } @@ -137,68 +154,14 @@ impl Chain { } } - /// Returns true if has blocks of given type - pub fn has_blocks_of_state(&self, state: BlockState) -> bool { - match state { - BlockState::Stored => true, // storage with genesis block is required - _ => !self.hash_chain.is_empty_at(state.to_queue_index()), - } - } - /// Get best block pub fn best_block(&self) -> db::BestBlock { - let storage_best_block = self.storage.best_block().expect("storage with genesis block is required"); match self.hash_chain.back() { Some(hash) => db::BestBlock { - number: storage_best_block.number + self.hash_chain.len(), + number: self.best_storage_block.number + self.hash_chain.len(), hash: hash.clone(), }, - None => db::BestBlock { - number: storage_best_block.number, - hash: storage_best_block.hash, - } - } - } - - /// Get best block of given state - pub fn best_block_of_state(&self, state: BlockState) -> Option { - match state { - BlockState::Scheduled => self.hash_chain.back_at(SCHEDULED_QUEUE) - .map(|hash| db::BestBlock { - hash: hash, - number: self.storage.best_block().expect("storage with genesis block is required").number + 1 - + self.hash_chain.len_of(VERIFYING_QUEUE) - + self.hash_chain.len_of(REQUESTED_QUEUE) - + self.hash_chain.len_of(SCHEDULED_QUEUE) - }), - BlockState::Requested => self.hash_chain.back_at(REQUESTED_QUEUE) - .map(|hash| db::BestBlock { - hash: hash, - number: self.storage.best_block().expect("storage with genesis block is required").number + 1 - + self.hash_chain.len_of(VERIFYING_QUEUE) - + self.hash_chain.len_of(REQUESTED_QUEUE) - }), - BlockState::Verifying => self.hash_chain.back_at(VERIFYING_QUEUE) - .map(|hash| db::BestBlock { - hash: hash, - number: self.storage.best_block().expect("storage with genesis block is required").number + 1 - + self.hash_chain.len_of(VERIFYING_QUEUE) - }), - BlockState::Stored => { - self.storage.best_block() - }, - _ => panic!("not supported"), - } - } - - /// Check if block has given state - pub fn block_has_state(&self, hash: &H256, state: BlockState) -> bool { - match state { - BlockState::Scheduled => self.hash_chain.is_contained_in(SCHEDULED_QUEUE, hash), - BlockState::Requested => self.hash_chain.is_contained_in(REQUESTED_QUEUE, hash), - BlockState::Verifying => self.hash_chain.is_contained_in(VERIFYING_QUEUE, hash), - BlockState::Stored => self.storage.contains_block(db::BlockRef::Hash(hash.clone())), - BlockState::Unknown => self.block_state(hash) == BlockState::Unknown, + None => self.best_storage_block.clone(), } } @@ -214,39 +177,12 @@ impl Chain { } } - /// Get block number from storage - pub fn storage_block_number(&self, hash: &H256) -> Option { - self.storage.block_number(hash) - } - - /// Get block hash from storage - pub fn storage_block_hash(&self, number: u32) -> Option { - self.storage.block_hash(number) - } - - /// Get block from the storage - pub fn storage_block(&self, hash: &H256) -> Option { - self.storage.block(db::BlockRef::Hash(hash.clone())) - } - - /// Prepare best block locator hashes - pub fn best_block_locator_hashes(&self) -> Vec { - let mut result: Vec = Vec::with_capacity(4); - if let Some(pre_best_block) = self.hash_chain.back_skip_n_at(SCHEDULED_QUEUE, 2) { - result.push(pre_best_block); - } - if let Some(pre_best_block) = self.hash_chain.back_skip_n_at(REQUESTED_QUEUE, 2) { - result.push(pre_best_block); - } - if let Some(pre_best_block) = self.hash_chain.back_skip_n_at(VERIFYING_QUEUE, 2) { - result.push(pre_best_block); - } - result.push(self.best_storage_block.hash.clone()); - result - } - /// Prepare block locator hashes, as described in protocol documentation: /// https://en.bitcoin.it/wiki/Protocol_documentation#getblocks + /// When there are forked blocks in the queue, this method can result in + /// mixed block locator hashes ([0 - from fork1, 1 - from fork2, 2 - from fork1]). + /// Peer will respond with blocks of fork1 || fork2 => we could end up in some side fork + /// To resolve this, after switching to saturated state, we will also ask all peers for inventory. pub fn block_locator_hashes(&self) -> Vec { let mut block_locator_hashes: Vec = Vec::new(); @@ -254,8 +190,7 @@ impl Chain { let (local_index, step) = self.block_locator_hashes_for_queue(&mut block_locator_hashes); // calculate for storage - let storage_best_block_number = self.storage.best_block().expect("storage with genesis block is required").number; - let storage_index = if storage_best_block_number < local_index { 0 } else { storage_best_block_number - local_index }; + let storage_index = if self.best_storage_block.number < local_index { 0 } else { self.best_storage_block.number - local_index }; self.block_locator_hashes_for_storage(storage_index, step, &mut block_locator_hashes); block_locator_hashes } @@ -287,16 +222,21 @@ impl Chain { /// Insert new best block to storage pub fn insert_best_block(&mut self, block: Block) -> Result<(), db::Error> { - if block.block_header.previous_header_hash != self.best_storage_block.hash { - return Err(db::Error::DB("Trying to insert out-of-order block".into())); - } + // insert to storage + try!(self.storage.insert_block(&block)); // remember new best block hash - self.best_storage_block.number += 1; - self.best_storage_block.hash = block.hash(); + self.best_storage_block = self.storage.best_block().expect("Inserted block above"); - // insert to storage - self.storage.insert_block(&block) + Ok(()) + } + + /// Remove block + pub fn remove_block(&mut self, hash: &H256) { + if self.hash_chain.remove_at(SCHEDULED_QUEUE, hash) == HashPosition::Missing + && self.hash_chain.remove_at(REQUESTED_QUEUE, hash) == HashPosition::Missing { + self.hash_chain.remove_at(VERIFYING_QUEUE, hash); + } } /// Remove block by hash if it is currently in given state @@ -309,6 +249,57 @@ impl Chain { self.hash_chain.remove_all_at(state.to_queue_index()); } + /// Intersect chain with inventory + pub fn intersect_with_inventory(&self, inventory: &Vec) -> InventoryIntersection { + let inventory_len = inventory.len(); + assert!(inventory_len != 0); + + // giving that blocks in inventory are ordered + match self.block_state(&inventory[0]) { + // if first block of inventory is unknown => all other blocks are also unknown + BlockState::Unknown => { + InventoryIntersection::NoKnownBlocks(0) + }, + // else if first block is known + first_block_state @ _ => match self.block_state(&inventory[inventory_len - 1]) { + // if last block is known to be in db => all inventory blocks are also in db + BlockState::Stored => { + InventoryIntersection::DbAllBlocksKnown + }, + // if first block is known && last block is unknown => intersection with queue or with db + BlockState::Unknown => { + // find last known block + let mut previous_state = first_block_state; + for index in 1..inventory_len { + let state = self.block_state(&inventory[index]); + if state == BlockState::Unknown { + // previous block is stored => fork from stored block + if previous_state == BlockState::Stored { + return InventoryIntersection::DbForkNewBlocks(index); + } + // previous block is best block => no fork + else if &self.best_block().hash == &inventory[index - 1] { + return InventoryIntersection::InMemoryMainNewBlocks(index); + } + // previous block is not a best block => fork + else { + return InventoryIntersection::InMemoryForkNewBlocks(index); + } + } + previous_state = state; + } + + // unreachable because last block is unknown && in above loop we search for unknown blocks + unreachable!(); + }, + // if first block is known && last block is also known && is in queue => queue intersection with no new block + _ => { + InventoryIntersection::InMemoryNoNewBlocks + } + } + } + } + /// Calculate block locator hashes for hash queue fn block_locator_hashes_for_queue(&self, hashes: &mut Vec) -> (u32, u32) { let queue_len = self.hash_chain.len(); @@ -359,7 +350,7 @@ impl fmt::Debug for Chain { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { try!(writeln!(f, "chain: [")); { - let mut num = self.storage.best_block().expect("Storage with genesis block is required").number; + let mut num = self.best_storage_block.number; try!(writeln!(f, "\tworse(stored): {} {:?}", 0, self.storage.block_hash(0))); try!(writeln!(f, "\tbest(stored): {} {:?}", num, self.storage.block_hash(num))); @@ -389,8 +380,9 @@ mod tests { use std::sync::Arc; use chain::RepresentH256; use hash_queue::HashPosition; - use super::{Chain, BlockState}; + use super::{Chain, BlockState, InventoryIntersection}; use db::{self, Store, BestBlock}; + use primitives::hash::H256; use test_data; #[test] @@ -406,19 +398,9 @@ mod tests { assert_eq!(chain.length_of_state(BlockState::Requested), 0); assert_eq!(chain.length_of_state(BlockState::Verifying), 0); assert_eq!(chain.length_of_state(BlockState::Stored), 1); - assert_eq!(chain.has_blocks_of_state(BlockState::Scheduled), false); - assert_eq!(chain.has_blocks_of_state(BlockState::Requested), false); - assert_eq!(chain.has_blocks_of_state(BlockState::Verifying), false); - assert_eq!(chain.has_blocks_of_state(BlockState::Stored), true); assert_eq!(&chain.best_block(), &db_best_block); - assert_eq!(chain.best_block_of_state(BlockState::Scheduled), None); - assert_eq!(chain.best_block_of_state(BlockState::Requested), None); - assert_eq!(chain.best_block_of_state(BlockState::Verifying), None); - assert_eq!(chain.best_block_of_state(BlockState::Stored), Some(db_best_block.clone())); - assert_eq!(chain.block_has_state(&db_best_block.hash, BlockState::Requested), false); - assert_eq!(chain.block_has_state(&db_best_block.hash, BlockState::Stored), true); assert_eq!(chain.block_state(&db_best_block.hash), BlockState::Stored); - assert_eq!(chain.block_state(&"0000000000000000000000000000000000000000000000000000000000000000".into()), BlockState::Unknown); + assert_eq!(chain.block_state(&H256::from(0)), BlockState::Unknown); } #[test] @@ -428,12 +410,12 @@ mod tests { // add 6 blocks to scheduled queue chain.schedule_blocks_hashes(vec![ - "0000000000000000000000000000000000000000000000000000000000000000".into(), - "0000000000000000000000000000000000000000000000000000000000000001".into(), - "0000000000000000000000000000000000000000000000000000000000000002".into(), - "0000000000000000000000000000000000000000000000000000000000000003".into(), - "0000000000000000000000000000000000000000000000000000000000000004".into(), - "0000000000000000000000000000000000000000000000000000000000000005".into(), + H256::from(0), + H256::from(1), + H256::from(2), + H256::from(3), + H256::from(4), + H256::from(5), ]); assert!(chain.information().scheduled == 6 && chain.information().requested == 0 && chain.information().verifying == 0 && chain.information().stored == 1); @@ -452,22 +434,22 @@ mod tests { && chain.information().verifying == 0 && chain.information().stored == 1); // try to remove block 0 from scheduled queue => missing - assert_eq!(chain.remove_block_with_state(&"0000000000000000000000000000000000000000000000000000000000000000".into(), BlockState::Scheduled), HashPosition::Missing); + assert_eq!(chain.remove_block_with_state(&H256::from(0), BlockState::Scheduled), HashPosition::Missing); assert!(chain.information().scheduled == 3 && chain.information().requested == 3 && chain.information().verifying == 0 && chain.information().stored == 1); // remove blocks 0 & 1 from requested queue - assert_eq!(chain.remove_block_with_state(&"0000000000000000000000000000000000000000000000000000000000000001".into(), BlockState::Requested), HashPosition::Inside); - assert_eq!(chain.remove_block_with_state(&"0000000000000000000000000000000000000000000000000000000000000000".into(), BlockState::Requested), HashPosition::Front); + assert_eq!(chain.remove_block_with_state(&H256::from(1), BlockState::Requested), HashPosition::Inside); + assert_eq!(chain.remove_block_with_state(&H256::from(0), BlockState::Requested), HashPosition::Front); assert!(chain.information().scheduled == 3 && chain.information().requested == 1 && chain.information().verifying == 0 && chain.information().stored == 1); // mark 0 & 1 as verifying - chain.verify_block_hash("0000000000000000000000000000000000000000000000000000000000000001".into()); - chain.verify_block_hash("0000000000000000000000000000000000000000000000000000000000000002".into()); + chain.verify_block_hash(H256::from(1)); + chain.verify_block_hash(H256::from(2)); assert!(chain.information().scheduled == 3 && chain.information().requested == 1 && chain.information().verifying == 2 && chain.information().stored == 1); // mark block 0 as verified - assert_eq!(chain.remove_block_with_state(&"0000000000000000000000000000000000000000000000000000000000000001".into(), BlockState::Verifying), HashPosition::Front); + assert_eq!(chain.remove_block_with_state(&H256::from(1), BlockState::Verifying), HashPosition::Front); assert!(chain.information().scheduled == 3 && chain.information().requested == 1 && chain.information().verifying == 1 && chain.information().stored == 1); // insert new best block to the chain @@ -496,85 +478,149 @@ mod tests { assert_eq!(chain.block_locator_hashes(), vec![block2_hash.clone(), block1_hash.clone(), genesis_hash.clone()]); chain.schedule_blocks_hashes(vec![ - "0000000000000000000000000000000000000000000000000000000000000000".into(), - "0000000000000000000000000000000000000000000000000000000000000001".into(), - "0000000000000000000000000000000000000000000000000000000000000002".into(), - "0000000000000000000000000000000000000000000000000000000000000003".into(), - "0000000000000000000000000000000000000000000000000000000000000004".into(), - "0000000000000000000000000000000000000000000000000000000000000005".into(), - "0000000000000000000000000000000000000000000000000000000000000006".into(), - "0000000000000000000000000000000000000000000000000000000000000007".into(), - "0000000000000000000000000000000000000000000000000000000000000008".into(), - "0000000000000000000000000000000000000000000000000000000000000009".into(), - "0000000000000000000000000000000000000000000000000000000000000010".into(), + H256::from(0), + H256::from(1), + H256::from(2), + H256::from(3), + H256::from(4), + H256::from(5), + H256::from(6), + H256::from(7), + H256::from(8), + H256::from(9), + H256::from(10), ]); chain.request_blocks_hashes(10); chain.verify_blocks_hashes(10); - assert_eq!(chain.best_block_locator_hashes()[0], "0000000000000000000000000000000000000000000000000000000000000010".into()); assert_eq!(chain.block_locator_hashes(), vec![ - "0000000000000000000000000000000000000000000000000000000000000010".into(), - "0000000000000000000000000000000000000000000000000000000000000009".into(), - "0000000000000000000000000000000000000000000000000000000000000008".into(), - "0000000000000000000000000000000000000000000000000000000000000007".into(), - "0000000000000000000000000000000000000000000000000000000000000006".into(), - "0000000000000000000000000000000000000000000000000000000000000005".into(), - "0000000000000000000000000000000000000000000000000000000000000004".into(), - "0000000000000000000000000000000000000000000000000000000000000003".into(), - "0000000000000000000000000000000000000000000000000000000000000002".into(), - "0000000000000000000000000000000000000000000000000000000000000001".into(), + H256::from(10), + H256::from(9), + H256::from(8), + H256::from(7), + H256::from(6), + H256::from(5), + H256::from(4), + H256::from(3), + H256::from(2), + H256::from(1), block2_hash.clone(), genesis_hash.clone(), ]); chain.schedule_blocks_hashes(vec![ - "0000000000000000000000000000000000000000000000000000000000000011".into(), - "0000000000000000000000000000000000000000000000000000000000000012".into(), - "0000000000000000000000000000000000000000000000000000000000000013".into(), - "0000000000000000000000000000000000000000000000000000000000000014".into(), - "0000000000000000000000000000000000000000000000000000000000000015".into(), - "0000000000000000000000000000000000000000000000000000000000000016".into(), + H256::from(11), + H256::from(12), + H256::from(13), + H256::from(14), + H256::from(15), + H256::from(16), ]); chain.request_blocks_hashes(10); - assert_eq!(chain.best_block_locator_hashes()[0], "0000000000000000000000000000000000000000000000000000000000000014".into()); assert_eq!(chain.block_locator_hashes(), vec![ - "0000000000000000000000000000000000000000000000000000000000000016".into(), - "0000000000000000000000000000000000000000000000000000000000000015".into(), - "0000000000000000000000000000000000000000000000000000000000000014".into(), - "0000000000000000000000000000000000000000000000000000000000000013".into(), - "0000000000000000000000000000000000000000000000000000000000000012".into(), - "0000000000000000000000000000000000000000000000000000000000000011".into(), - "0000000000000000000000000000000000000000000000000000000000000010".into(), - "0000000000000000000000000000000000000000000000000000000000000009".into(), - "0000000000000000000000000000000000000000000000000000000000000008".into(), - "0000000000000000000000000000000000000000000000000000000000000007".into(), - "0000000000000000000000000000000000000000000000000000000000000005".into(), - "0000000000000000000000000000000000000000000000000000000000000001".into(), + H256::from(16), + H256::from(15), + H256::from(14), + H256::from(13), + H256::from(12), + H256::from(11), + H256::from(10), + H256::from(9), + H256::from(8), + H256::from(7), + H256::from(5), + H256::from(1), genesis_hash.clone(), ]); chain.schedule_blocks_hashes(vec![ - "0000000000000000000000000000000000000000000000000000000000000020".into(), - "0000000000000000000000000000000000000000000000000000000000000021".into(), - "0000000000000000000000000000000000000000000000000000000000000022".into(), + H256::from(20), + H256::from(21), + H256::from(22), ]); - assert_eq!(chain.best_block_locator_hashes()[0], "0000000000000000000000000000000000000000000000000000000000000020".into()); assert_eq!(chain.block_locator_hashes(), vec![ - "0000000000000000000000000000000000000000000000000000000000000022".into(), - "0000000000000000000000000000000000000000000000000000000000000021".into(), - "0000000000000000000000000000000000000000000000000000000000000020".into(), - "0000000000000000000000000000000000000000000000000000000000000016".into(), - "0000000000000000000000000000000000000000000000000000000000000015".into(), - "0000000000000000000000000000000000000000000000000000000000000014".into(), - "0000000000000000000000000000000000000000000000000000000000000013".into(), - "0000000000000000000000000000000000000000000000000000000000000012".into(), - "0000000000000000000000000000000000000000000000000000000000000011".into(), - "0000000000000000000000000000000000000000000000000000000000000010".into(), - "0000000000000000000000000000000000000000000000000000000000000008".into(), - "0000000000000000000000000000000000000000000000000000000000000004".into(), + H256::from(22), + H256::from(21), + H256::from(20), + H256::from(16), + H256::from(15), + H256::from(14), + H256::from(13), + H256::from(12), + H256::from(11), + H256::from(10), + H256::from(8), + H256::from(4), genesis_hash.clone(), ]); } + + #[test] + fn chain_intersect_with_inventory() { + let mut chain = Chain::new(Arc::new(db::TestStorage::with_genesis_block())); + // append 2 db blocks + chain.insert_best_block(test_data::block_h1()).expect("Error inserting new block"); + chain.insert_best_block(test_data::block_h2()).expect("Error inserting new block"); + // append 3 verifying blocks + chain.schedule_blocks_hashes(vec![ + H256::from(0), + H256::from(1), + H256::from(2), + ]); + chain.request_blocks_hashes(3); + chain.verify_blocks_hashes(3); + // append 3 requested blocks + chain.schedule_blocks_hashes(vec![ + H256::from(10), + H256::from(11), + H256::from(12), + ]); + chain.request_blocks_hashes(10); + // append 3 scheduled blocks + chain.schedule_blocks_hashes(vec![ + H256::from(20), + H256::from(21), + H256::from(22), + ]); + + assert_eq!(chain.intersect_with_inventory(&vec![ + H256::from(30), + H256::from(31), + ]), InventoryIntersection::NoKnownBlocks(0)); + + assert_eq!(chain.intersect_with_inventory(&vec![ + H256::from(2), + H256::from(10), + H256::from(11), + H256::from(12), + H256::from(20), + ]), InventoryIntersection::InMemoryNoNewBlocks); + + assert_eq!(chain.intersect_with_inventory(&vec![ + H256::from(21), + H256::from(22), + H256::from(30), + H256::from(31), + ]), InventoryIntersection::InMemoryMainNewBlocks(2)); + + assert_eq!(chain.intersect_with_inventory(&vec![ + H256::from(20), + H256::from(21), + H256::from(30), + H256::from(31), + ]), InventoryIntersection::InMemoryForkNewBlocks(2)); + + assert_eq!(chain.intersect_with_inventory(&vec![ + test_data::block_h1().hash(), + test_data::block_h2().hash(), + ]), InventoryIntersection::DbAllBlocksKnown); + + assert_eq!(chain.intersect_with_inventory(&vec![ + test_data::block_h2().hash(), + H256::from(30), + H256::from(31), + ]), InventoryIntersection::DbForkNewBlocks(1)); + } } diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs index a5ab582d..4e613fad 100644 --- a/sync/src/synchronization_client.rs +++ b/sync/src/synchronization_client.rs @@ -1,7 +1,7 @@ use std::thread; use std::sync::Arc; use std::cmp::{min, max}; -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::collections::hash_map::Entry; use std::sync::mpsc::{channel, Sender, Receiver}; use parking_lot::Mutex; @@ -12,10 +12,9 @@ use futures_cpupool::CpuPool; use db; use chain::{Block, RepresentH256}; use primitives::hash::H256; -use hash_queue::HashPosition; use synchronization_peers::Peers; #[cfg(test)] use synchronization_peers::{Information as PeersInformation}; -use synchronization_chain::{ChainRef, BlockState}; +use synchronization_chain::{ChainRef, BlockState, InventoryIntersection}; #[cfg(test)] use synchronization_chain::{Information as ChainInformation}; use verification::{ChainVerifier, Error as VerificationError, Verify}; @@ -39,20 +38,21 @@ use std::time::Duration; ///! 2.4) if !inventory_rest.is_empty(): ===> has new unknown blocks in inventory ///! 2.4.1) queue_rest = queue after intersection ///! 2.4.2) if queue_rest.is_empty(): ===> has new unknown blocks in inventory, no fork +///! 2.4.2.1) scheduled_blocks.append(inventory_rest) +///! 2.4.2.2) stop (2.4.2) +///! 2.4.3) if !queue_rest.is_empty(): ===> has new unknown blocks in inventory, fork ///! 2.4.3.1) scheduled_blocks.append(inventory_rest) ///! 2.4.3.2) stop (2.4.3) -///! 2.4.4) if !queue_rest.is_empty(): ===> has new unknown blocks in inventory, fork -///! 2.4.4.1) scheduled_blocks.append(inventory_rest) -///! 2.4.4.2) stop (2.4.4) -///! 2.4.5) stop (2.4) +///! 2.4.3) stop (2.4) ///! 2.5) stop (2) ///! 3) if queue_intersection.is_empty(): ===> responded with out-of-sync-window blocks ///! 3.1) last_known_block = inventory.last(b => b.is_known()) ///! 3.2) if last_known_block == None: ===> we know nothing about these blocks & we haven't asked for these -///! 3.2.1) peer will be excluded later by management thread +///! 3.2.1) if !synchronizing => remember peer as useful + ask for blocks +///! 3.2.1) if synchronizing => peer will be excluded later by management thread ///! 3.2.2) stop (3.2) ///! 3.3) if last_known_block == last(inventory): ===> responded with all-known-blocks -///! 3.3.1) remember peer as useful (possibly had failures before && have been excluded from sync) +///! 3.3.1) if syncing, remember peer as useful (possibly had failures before && have been excluded from sync) ///! 3.3.2) stop (3.3) ///! 3.4) if last_known_block in the middle of inventory: ===> responded with forked blocks ///! 3.4.1) remember peer as useful @@ -62,24 +62,40 @@ use std::time::Duration; ///! 3.5) stop (3) ///! ///! on_peer_block: After receiving `block` message: -///! 1) if block_state(block) in (Scheduled, Verifying, Stored): ===> late delivery +///! 1) if block_state(block) in (Verifying, Stored): ===> late delivery ///! 1.1) remember peer as useful ///! 1.2) stop (1) -///! 2) if block_state(block) == Requested: ===> on-time delivery +///! 2) if block_state(block) in (Scheduled, Requested): ===> future/on-time delivery ///! 2.1) remember peer as useful -///! 2.2) move block from requested to verifying queue -///! 2.2) queue verification().and_then(insert).or_else(reset_sync) -///! 2.3) stop (2) +///! 2.2) if block_state(block.parent) in (Verifying, Stored): ===> we can proceed with verification +///! 2.2.1) remove block from current queue (Verifying || Stored) +///! 2.2.2) append block to the verification queue +///! 2.2.3) queue verification().and_then(on_block_verification_success).or_else(on_block_verification_error) +///! 2.2.4) try to verify orphan blocks +///! 2.2.5) stop (2.2) +///! 2.3) if block_state(block.parent) in (Requested, Scheduled): ===> we have found an orphan block +///! 2.3.1) remove block from current queue (Verifying || Stored) +///! 2.3.2) append block to the orphans +///! 2.3.3) stop (2.3) +///! 2.4) if block_state(block.parent) == Unknown: ===> bad block found +///! 2.4.1) remove block from current queue (Verifying || Stored) +///! 2.4.2) stop (2.4) +///! 2.5) stop (2) ///! 3) if block_state(block) == Unknown: ===> maybe we are on-top of chain && new block is announced? ///! 3.1) if block_state(block.parent_hash) == Unknown: ===> we do not know parent ///! 3.1.1) ignore this block ///! 3.1.2) stop (3.1) -///! 3.2) if block_state(block.parent_hash) != Unknown: ===> fork found +///! 3.2) if block_state(block.parent_hash) in (Verifying, Stored): ===> fork found, can verify ///! 3.2.1) ask peer for best inventory (after this block) ///! 3.2.2) append block to verifying queue -///! 3.2.3) queue verification().and_then(insert).or_else(reset_sync) +///! 3.2.3) queue verification().and_then(on_block_verification_success).or_else(on_block_verification_error) ///! 3.2.4) stop (3.2) -///! 2.3) stop (2) +///! 3.3) if block_state(block.parent_hash) in (Requested, Scheduled): ===> fork found, add as orphan +///! 3.3.1) ask peer for best inventory (after this block) +///! 3.3.2) append block to orphan +///! 3.3.3) stop (3.3) +///! 3.4) stop (2) +///! + if no blocks left in scheduled + requested queue => we are saturated => ask all peers for inventory & forget ///! ///! execute_synchronization_tasks: After receiving `inventory` message OR receiving `block` message OR when management thread schedules tasks: ///! 1) if there are blocks in `scheduled` queue AND we can fit more blocks into memory: ===> ask for blocks @@ -113,7 +129,8 @@ use std::time::Duration; ///! ///! on_block_verification_error: When verification completes with an error: ///! 1) remove block from verification queue -///! 2) remove all known children from all queues [so that new `block` messages will be ignored in on_peer_block.3.1.1] +///! 2) remove all known children from all queues [so that new `block` messages will be ignored in on_peer_block.3.1.1] (TODO: not implemented currently!!!) +///! ///! /// Approximate maximal number of blocks hashes in scheduled queue. @@ -162,7 +179,6 @@ pub trait Client : Send + 'static { fn on_new_blocks_inventory(&mut self, peer_index: usize, peer_hashes: Vec); fn on_peer_block(&mut self, peer_index: usize, block: Block); fn on_peer_disconnected(&mut self, peer_index: usize); - fn reset(&mut self, is_hard: bool); fn on_block_verification_success(&mut self, block: Block); fn on_block_verification_error(&mut self, err: &VerificationError, hash: &H256); } @@ -190,7 +206,7 @@ pub struct SynchronizationClient { /// Chain reference. chain: ChainRef, /// Blocks from requested_hashes, but received out-of-order. - orphaned_blocks: HashMap, + orphaned_blocks: HashMap>, /// Verification work transmission channel. verification_work_sender: Option>, /// Verification thread. @@ -218,10 +234,11 @@ impl State { impl Drop for SynchronizationClient where T: TaskExecutor { fn drop(&mut self) { if let Some(join_handle) = self.verification_worker_thread.take() { - self.verification_work_sender + // ignore send error here <= destructing anyway + let _ = self.verification_work_sender .take() .expect("Some(join_handle) => Some(verification_work_sender)") - .send(VerificationTask::Stop).expect("TODO"); + .send(VerificationTask::Stop); join_handle.join().expect("Clean shutdown."); } } @@ -246,45 +263,25 @@ impl Client for SynchronizationClient where T: TaskExecutor { // update peers to select next tasks self.peers.on_block_received(peer_index, &block_hash); - self.process_peer_block(block_hash, block); + self.process_peer_block(peer_index, block_hash, block); self.execute_synchronization_tasks(); } /// Peer disconnected. fn on_peer_disconnected(&mut self, peer_index: usize) { - self.peers.on_peer_disconnected(peer_index); - // when last peer is disconnected, reset, but let verifying blocks be verified - self.reset(false); - } - - /// Reset synchronization process - fn reset(&mut self, is_hard: bool) { - self.peers.reset(); - self.orphaned_blocks.clear(); - // TODO: reset verification queue - - let mut chain = self.chain.write(); - chain.remove_blocks_with_state(BlockState::Requested); - chain.remove_blocks_with_state(BlockState::Scheduled); - if is_hard { - self.state = State::Synchronizing(time::precise_time_s(), chain.best_block().number); - chain.remove_blocks_with_state(BlockState::Verifying); - warn!(target: "sync", "Synchronization process restarting from block {:?}", chain.best_block()); - } - else { - self.state = State::Saturated; + if self.peers.on_peer_disconnected(peer_index) { + self.switch_to_saturated_state(false); } } /// Process successful block verification fn on_block_verification_success(&mut self, block: Block) { { - let hash = block.hash(); let mut chain = self.chain.write(); - // remove from verifying queue - assert_eq!(chain.remove_block_with_state(&hash, BlockState::Verifying), HashPosition::Front); + // remove block from verification queue + chain.remove_block_with_state(&block.hash(), BlockState::Verifying); // insert to storage chain.insert_best_block(block) @@ -299,8 +296,12 @@ impl Client for SynchronizationClient where T: TaskExecutor { fn on_block_verification_error(&mut self, err: &VerificationError, hash: &H256) { warn!(target: "sync", "Block {:?} verification failed with error {:?}", hash, err); - // reset synchronization process - self.reset(true); + { + let mut chain = self.chain.write(); + + // remove block from verification queue + chain.remove_block_with_state(&hash, BlockState::Verifying); + } // start new tasks self.execute_synchronization_tasks(); @@ -351,8 +352,10 @@ impl SynchronizationClient where T: TaskExecutor { None => return Ok(()), }; let mut client = client.lock(); - manage_synchronization_peers(&mut client.peers); - client.execute_synchronization_tasks(); + if client.state.is_synchronizing() { + manage_synchronization_peers(&mut client.peers); + client.execute_synchronization_tasks(); + } Ok(()) }) .for_each(|_| Ok(())) @@ -376,144 +379,101 @@ impl SynchronizationClient where T: TaskExecutor { } /// Process new blocks inventory - fn process_new_blocks_inventory(&mut self, peer_index: usize, mut peer_hashes: Vec) { - // | requested | QUEUED | - // --- [1] - // --- [2] + - // --- [3] + - // --- [4] - // -+- [5] + - // -+- [6] + - // -+- [7] + - // ---+---------+--- [8] + - // ---+--------+--- [9] + - // ---+---------+--------+--- [10] - + fn process_new_blocks_inventory(&mut self, peer_index: usize, mut inventory: Vec) { let mut chain = self.chain.write(); - - 'outer: loop { - // when synchronization is idling - // => request full inventory - if !chain.has_blocks_of_state(BlockState::Scheduled) - && !chain.has_blocks_of_state(BlockState::Requested) { - let unknown_blocks: Vec<_> = peer_hashes.into_iter() - .filter(|hash| chain.block_has_state(&hash, BlockState::Unknown)) - .collect(); - - // no new blocks => no need to switch to the synchronizing state - if unknown_blocks.is_empty() { - return; - } - - chain.schedule_blocks_hashes(unknown_blocks); - self.peers.insert(peer_index); - break; - } - - // cases: [2], [5], [6], [8] - // if last block from peer_hashes is in window { requested_hashes + queued_hashes } - // => no new blocks for synchronization, but we will use this peer in synchronization - let peer_hashes_len = peer_hashes.len(); - if chain.block_has_state(&peer_hashes[peer_hashes_len - 1], BlockState::Scheduled) - || chain.block_has_state(&peer_hashes[peer_hashes_len - 1], BlockState::Requested) { - self.peers.insert(peer_index); - return; - } - - // cases: [1], [3], [4], [7], [9], [10] - // try to find new blocks for synchronization from inventory - let mut last_known_peer_hash_index = peer_hashes_len - 1; - loop { - if chain.block_state(&peer_hashes[last_known_peer_hash_index]) != BlockState::Unknown { - // we have found first block which is known to us - // => blocks in range [(last_known_peer_hash_index + 1)..peer_hashes_len] are unknown - // && must be scheduled for request - let unknown_peer_hashes = peer_hashes.split_off(last_known_peer_hash_index + 1); - - chain.schedule_blocks_hashes(unknown_peer_hashes); + match chain.intersect_with_inventory(&inventory) { + InventoryIntersection::NoKnownBlocks(_) if self.state.is_synchronizing() => (), + InventoryIntersection::DbAllBlocksKnown => { + if self.state.is_synchronizing() { + // remember peer as useful self.peers.insert(peer_index); - break 'outer; } - - if last_known_peer_hash_index == 0 { - // either these are blocks from the future or blocks from the past - // => TODO: ignore this peer during synchronization - return; + }, + InventoryIntersection::InMemoryNoNewBlocks => { + // remember peer as useful + self.peers.insert(peer_index); + }, + InventoryIntersection::InMemoryMainNewBlocks(new_block_index) + | InventoryIntersection::InMemoryForkNewBlocks(new_block_index) + | InventoryIntersection::DbForkNewBlocks(new_block_index) + | InventoryIntersection::NoKnownBlocks(new_block_index) => { + // schedule new blocks + let new_blocks_hashes = inventory.split_off(new_block_index); + chain.schedule_blocks_hashes(new_blocks_hashes); + // remember peer as useful + self.peers.insert(peer_index); + // switch to synchronization state + if !self.state.is_synchronizing() { + self.state = State::Synchronizing(time::precise_time_s(), chain.best_block().number); } - last_known_peer_hash_index -= 1; } } - - // move to synchronizing state - if !self.state.is_synchronizing() { - self.state = State::Synchronizing(time::precise_time_s(), chain.best_block().number); - } } /// Process new peer block - fn process_peer_block(&mut self, block_hash: H256, block: Block) { - // this block is not requested for synchronization - let mut chain = self.chain.write(); - let block_position = chain.remove_block_with_state(&block_hash, BlockState::Requested); - if block_position == HashPosition::Missing { - return; - } + fn process_peer_block(&mut self, peer_index: usize, block_hash: H256, block: Block) { + let switch_to_saturated = { + let mut chain = self.chain.write(); + match chain.block_state(&block_hash) { + BlockState::Verifying | BlockState::Stored => { + // remember peer as useful + self.peers.insert(peer_index); + }, + BlockState::Unknown | BlockState::Scheduled | BlockState::Requested => { + // remove block from current queue + chain.remove_block(&block_hash); + // check parent block state + match chain.block_state(&block.block_header.previous_header_hash) { + BlockState::Unknown => (), + BlockState::Verifying | BlockState::Stored => { + // remember peer as useful + self.peers.insert(peer_index); + // schedule verification + let mut blocks: VecDeque<(H256, Block)> = VecDeque::new(); + blocks.push_back((block_hash, block)); + while let Some((block_hash, block)) = blocks.pop_front() { + // queue block for verification + chain.remove_block(&block_hash); - // requested block is received => move to saturated state if there are no more blocks - if !chain.has_blocks_of_state(BlockState::Scheduled) - && !chain.has_blocks_of_state(BlockState::Requested) - && !chain.has_blocks_of_state(BlockState::Verifying) { - self.state = State::Saturated; - } + match self.verification_work_sender { + Some(ref verification_work_sender) => { + chain.verify_block_hash(block_hash.clone()); + verification_work_sender + .send(VerificationTask::VerifyBlock(block)) + .expect("Verification thread have the same lifetime as `Synchronization`") + }, + None => chain.insert_best_block(block) + .expect("Error inserting to db."), + } - // check if this block is next block in the blockchain - if block_position == HashPosition::Front { - // check if this parent of this block is current best_block - let expecting_previous_header_hash = chain.best_block_of_state(BlockState::Verifying) - .unwrap_or_else(|| { - chain.best_block_of_state(BlockState::Stored) - .expect("storage with genesis block is required") - }).hash; - if block.block_header.previous_header_hash != expecting_previous_header_hash { - // TODO: penalize peer - warn!(target: "sync", "Out-of-order block {:?} was dropped. Expecting block with parent hash {:?}", block_hash, expecting_previous_header_hash); - return; - } - - // this is next block in the blockchain => queue for verification - // also unwrap all dependent orphan blocks - let mut current_block = block; - let mut current_block_hash = block_hash; - loop { - match self.verification_work_sender { - Some(ref verification_work_sender) => { - chain.verify_block_hash(current_block_hash.clone()); - verification_work_sender - .send(VerificationTask::VerifyBlock(current_block)) - .expect("Verification thread have the same lifetime as `Synchronization`"); - }, - None => { - chain.insert_best_block(current_block) - .expect("Error inserting to db."); + // process orphan blocks + if let Entry::Occupied(entry) = self.orphaned_blocks.entry(block_hash) { + let (_, orphaned) = entry.remove_entry(); + blocks.extend(orphaned); + } + } + }, + BlockState::Requested | BlockState::Scheduled => { + // remember peer as useful + self.peers.insert(peer_index); + // remember as orphan block + self.orphaned_blocks + .entry(block.block_header.previous_header_hash.clone()) + .or_insert(Vec::new()) + .push((block_hash, block)) + } } - }; - - // proceed to the next orphaned block - if let Entry::Occupied(orphaned_block_entry) = self.orphaned_blocks.entry(current_block_hash) { - let (_, orphaned_block) = orphaned_block_entry.remove_entry(); - current_block = orphaned_block; - current_block_hash = current_block.hash(); - } - else { - break; - } + }, } - return; - } + // requested block is received => move to saturated state if there are no more blocks + chain.length_of_state(BlockState::Scheduled) == 0 + && chain.length_of_state(BlockState::Requested) == 0 + }; - // this block is not the next one => mark it as orphaned - self.orphaned_blocks.insert(block.block_header.previous_header_hash.clone(), block); + if switch_to_saturated { + self.switch_to_saturated_state(true); + } } /// Schedule new synchronization tasks, if any. @@ -540,18 +500,11 @@ impl SynchronizationClient where T: TaskExecutor { } } - // TODO: instead of issuing duplicated inventory requests, wait until enough new blocks are verified, then issue // check if we can query some blocks hashes let scheduled_hashes_len = chain.length_of_state(BlockState::Scheduled); if scheduled_hashes_len < MAX_SCHEDULED_HASHES { - if self.state.is_synchronizing() { - tasks.push(Task::RequestBestInventory(idle_peers[0])); - self.peers.on_inventory_requested(idle_peers[0]); - } - else { - tasks.push(Task::RequestInventory(idle_peers[0])); - self.peers.on_inventory_requested(idle_peers[0]); - } + tasks.push(Task::RequestInventory(idle_peers[0])); + self.peers.on_inventory_requested(idle_peers[0]); } // check if we can move some blocks from scheduled to requested queue @@ -578,6 +531,27 @@ impl SynchronizationClient where T: TaskExecutor { } } + /// Switch to saturated state + fn switch_to_saturated_state(&mut self, ask_for_inventory: bool) { + self.state = State::Saturated; + self.orphaned_blocks.clear(); + self.peers.reset(); + + { + let mut chain = self.chain.write(); + chain.remove_blocks_with_state(BlockState::Requested); + chain.remove_blocks_with_state(BlockState::Scheduled); + } + + if ask_for_inventory { + let mut executor = self.executor.lock(); + for idle_peer in self.peers.idle_peers() { + self.peers.on_inventory_requested(idle_peer); + executor.execute(Task::RequestInventory(idle_peer)); + } + } + } + /// Thread procedure for handling verification tasks fn verification_worker_proc(sync: Arc>, storage: Arc, work_receiver: Receiver) { let verifier = ChainVerifier::new(storage); @@ -612,6 +586,7 @@ pub mod tests { use p2p::event_loop; use test_data; use db; + use primitives::hash::H256; fn create_sync() -> (Core, Handle, Arc>, Arc>>) { let event_loop = event_loop(); @@ -638,13 +613,13 @@ pub mod tests { let (_, _, executor, sync) = create_sync(); let mut sync = sync.lock(); - let block1: Block = "010000006fe28c0ab6f1b372c1a6a246ae63f74f931e8365e15a089c68d6190000000000982051fd1e4ba744bbbe680e1fee14677ba1a3c3540bf7b1cdb606e857233e0e61bc6649ffff001d01e362990101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d0104ffffffff0100f2052a0100000043410496b538e853519c726a2c91e61ec11600ae1390813a627c66fb8be7947be63c52da7589379515d4e0a604f8141781e62294721166bf621e73a82cbf2342c858eeac00000000".into(); - let block2: Block = "010000004860eb18bf1b1620e37e9490fc8a427514416fd75159ab86688e9a8300000000d5fdcc541e25de1c7a5addedf24858b8bb665c9f36ef744ee42c316022c90f9bb0bc6649ffff001d08d2bd610101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d010bffffffff0100f2052a010000004341047211a824f55b505228e4c3d5194c1fcfaa15a456abdf37f9b9d97a4040afc073dee6c89064984f03385237d92167c13e236446b417ab79a0fcae412ae3316b77ac00000000".into(); + let block1: Block = test_data::block_h1(); + let block2: Block = test_data::block_h2(); sync.on_new_blocks_inventory(5, vec![block1.hash()]); let tasks = executor.lock().take_tasks(); assert_eq!(tasks.len(), 2); - assert_eq!(tasks[0], Task::RequestBestInventory(5)); + assert_eq!(tasks[0], Task::RequestInventory(5)); assert_eq!(tasks[1], Task::RequestBlocks(5, vec![block1.hash()])); assert!(sync.information().state.is_synchronizing()); assert_eq!(sync.information().orphaned, 0); @@ -654,23 +629,23 @@ pub mod tests { assert_eq!(sync.information().peers.idle, 0); assert_eq!(sync.information().peers.active, 1); - // push unknown block => nothing should change + // push unknown block => will be queued as orphan sync.on_peer_block(5, block2); assert!(sync.information().state.is_synchronizing()); - assert_eq!(sync.information().orphaned, 0); + assert_eq!(sync.information().orphaned, 1); assert_eq!(sync.information().chain.scheduled, 0); assert_eq!(sync.information().chain.requested, 1); assert_eq!(sync.information().chain.stored, 1); assert_eq!(sync.information().peers.idle, 0); assert_eq!(sync.information().peers.active, 1); - // push requested block => should be moved to the test storage + // push requested block => should be moved to the test storage && orphan should be moved sync.on_peer_block(5, block1); assert!(!sync.information().state.is_synchronizing()); assert_eq!(sync.information().orphaned, 0); assert_eq!(sync.information().chain.scheduled, 0); assert_eq!(sync.information().chain.requested, 0); - assert_eq!(sync.information().chain.stored, 2); + assert_eq!(sync.information().chain.stored, 3); // we have just requested new `inventory` from the peer => peer is forgotten assert_eq!(sync.information().peers.idle, 0); assert_eq!(sync.information().peers.active, 0); @@ -681,7 +656,7 @@ pub mod tests { let (_, _, _, sync) = create_sync(); let mut sync = sync.lock(); - let block2: Block = "010000004860eb18bf1b1620e37e9490fc8a427514416fd75159ab86688e9a8300000000d5fdcc541e25de1c7a5addedf24858b8bb665c9f36ef744ee42c316022c90f9bb0bc6649ffff001d08d2bd610101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d010bffffffff0100f2052a010000004341047211a824f55b505228e4c3d5194c1fcfaa15a456abdf37f9b9d97a4040afc073dee6c89064984f03385237d92167c13e236446b417ab79a0fcae412ae3316b77ac00000000".into(); + let block2: Block = test_data::block_h169(); sync.on_new_blocks_inventory(5, vec![block2.hash()]); sync.on_peer_block(5, block2); @@ -702,8 +677,8 @@ pub mod tests { fn synchronization_parallel_peers() { let (_, _, executor, sync) = create_sync(); - let block1: Block = "010000006fe28c0ab6f1b372c1a6a246ae63f74f931e8365e15a089c68d6190000000000982051fd1e4ba744bbbe680e1fee14677ba1a3c3540bf7b1cdb606e857233e0e61bc6649ffff001d01e362990101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d0104ffffffff0100f2052a0100000043410496b538e853519c726a2c91e61ec11600ae1390813a627c66fb8be7947be63c52da7589379515d4e0a604f8141781e62294721166bf621e73a82cbf2342c858eeac00000000".into(); - let block2: Block = "010000004860eb18bf1b1620e37e9490fc8a427514416fd75159ab86688e9a8300000000d5fdcc541e25de1c7a5addedf24858b8bb665c9f36ef744ee42c316022c90f9bb0bc6649ffff001d08d2bd610101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d010bffffffff0100f2052a010000004341047211a824f55b505228e4c3d5194c1fcfaa15a456abdf37f9b9d97a4040afc073dee6c89064984f03385237d92167c13e236446b417ab79a0fcae412ae3316b77ac00000000".into(); + let block1: Block = test_data::block_h1(); + let block2: Block = test_data::block_h2(); { let mut sync = sync.lock(); @@ -715,7 +690,7 @@ pub mod tests { // synchronization has started && new blocks have been requested let tasks = executor.lock().take_tasks(); assert!(sync.information().state.is_synchronizing()); - assert_eq!(tasks, vec![Task::RequestBestInventory(1), Task::RequestBlocks(1, vec![block1.hash()])]); + assert_eq!(tasks, vec![Task::RequestInventory(1), Task::RequestBlocks(1, vec![block1.hash()])]); } { @@ -726,7 +701,7 @@ pub mod tests { // synchronization has started && new blocks have been requested let tasks = executor.lock().take_tasks(); assert!(sync.information().state.is_synchronizing()); - assert_eq!(tasks, vec![Task::RequestBestInventory(2), Task::RequestBlocks(2, vec![block2.hash()])]); + assert_eq!(tasks, vec![Task::RequestInventory(2), Task::RequestBlocks(2, vec![block2.hash()])]); } { @@ -750,7 +725,7 @@ pub mod tests { // request new blocks { let mut sync = sync.lock(); - sync.on_new_blocks_inventory(1, vec!["0000000000000000000000000000000000000000000000000000000000000000".into()]); + sync.on_new_blocks_inventory(1, vec![H256::from(0)]); assert!(sync.information().state.is_synchronizing()); } @@ -774,4 +749,21 @@ pub mod tests { let tasks = executor.lock().take_tasks(); assert_eq!(tasks, vec![]); } + + #[test] + fn synchronization_asks_for_inventory_after_saturating() { + let (_, _, executor, sync) = create_sync(); + let mut sync = sync.lock(); + let block = test_data::block_h1(); + let block_hash = block.hash(); + sync.on_new_blocks_inventory(1, vec![block_hash.clone()]); + sync.on_new_blocks_inventory(2, vec![block_hash.clone()]); + executor.lock().take_tasks(); + sync.on_peer_block(2, block); + + let tasks = executor.lock().take_tasks(); + assert_eq!(tasks.len(), 2); + assert!(tasks.iter().any(|t| t == &Task::RequestInventory(1))); + assert!(tasks.iter().any(|t| t == &Task::RequestInventory(2))); + } } diff --git a/sync/src/synchronization_executor.rs b/sync/src/synchronization_executor.rs index 5b35cca6..e80d3054 100644 --- a/sync/src/synchronization_executor.rs +++ b/sync/src/synchronization_executor.rs @@ -23,8 +23,6 @@ pub enum Task { RequestBlocks(usize, Vec), /// Request full inventory using block_locator_hashes. RequestInventory(usize), - /// Request inventory using best block locator only. - RequestBestInventory(usize), /// Send block. SendBlock(usize, Block), /// Send notfound @@ -96,20 +94,6 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor { connection.send_getblocks(&getblocks); } }, - Task::RequestBestInventory(peer_index) => { - let block_locator_hashes = self.chain.read().best_block_locator_hashes(); - let getblocks = types::GetBlocks { - version: 0, - block_locator_hashes: block_locator_hashes, - hash_stop: H256::default(), - }; - - if let Some(connection) = self.peers.get_mut(&peer_index) { - let connection = &mut *connection; - trace!(target: "sync", "Querying best inventory from peer#{}", peer_index); - connection.send_getblocks(&getblocks); - } - }, Task::SendBlock(peer_index, block) => { let block_message = types::Block { block: block, diff --git a/sync/src/synchronization_peers.rs b/sync/src/synchronization_peers.rs index cb296916..e5ea086f 100644 --- a/sync/src/synchronization_peers.rs +++ b/sync/src/synchronization_peers.rs @@ -73,11 +73,12 @@ impl Peers { } /// Peer has been disconnected - pub fn on_peer_disconnected(&mut self, peer_index: usize) { + pub fn on_peer_disconnected(&mut self, peer_index: usize) -> bool { self.idle.remove(&peer_index); self.requests.remove(&peer_index); self.failures.remove(&peer_index); self.times.remove(&peer_index); + (self.idle.len() + self.requests.len()) == 0 } /// Block is received from peer. diff --git a/sync/src/synchronization_server.rs b/sync/src/synchronization_server.rs index 5fccf341..e9b5fc0c 100644 --- a/sync/src/synchronization_server.rs +++ b/sync/src/synchronization_server.rs @@ -62,9 +62,9 @@ impl SynchronizationServer { fn locate_known_block(&self, block_locator_hashes: Vec) -> Option { let chain = self.chain.read(); + let storage = chain.storage(); block_locator_hashes.into_iter() - .filter_map(|hash| chain - .storage_block_number(&hash) + .filter_map(|hash| storage.block_number(&hash) .map(|number| db::BestBlock { number: number, hash: hash, @@ -93,15 +93,19 @@ impl SynchronizationServer { (peer_index, ServerTask::ServeGetData(inventory)) => { let mut unknown_items: Vec = Vec::new(); let mut new_tasks: Vec = Vec::new(); - for item in inventory { - match item.inv_type { - InventoryType::MessageBlock => { - match chain.read().storage_block_number(&item.hash) { - Some(_) => new_tasks.push(ServerTask::ReturnBlock(item.hash.clone())), - None => unknown_items.push(item), - } - }, - _ => (), // TODO: process other inventory types + { + let chain = chain.read(); + let storage = chain.storage(); + for item in inventory { + match item.inv_type { + InventoryType::MessageBlock => { + match storage.block_number(&item.hash) { + Some(_) => new_tasks.push(ServerTask::ReturnBlock(item.hash.clone())), + None => unknown_items.push(item), + } + }, + _ => (), // TODO: process other inventory types + } } } // respond with `notfound` message for unknown data @@ -132,7 +136,12 @@ impl SynchronizationServer { let blocks_hashes = SynchronizationServer::blocks_hashes_after(&chain, &best_block, &hash_stop, 2000); if !blocks_hashes.is_empty() { trace!(target: "sync", "Going to respond with blocks headers with {} items to peer#{}", blocks_hashes.len(), peer_index); - let blocks_headers = blocks_hashes.into_iter().filter_map(|hash| chain.read().storage_block(&hash).map(|block| block.block_header)).collect(); + let chain = chain.read(); + let storage = chain.storage(); + // TODO: read block_header only + let blocks_headers = blocks_hashes.into_iter() + .filter_map(|hash| storage.block(db::BlockRef::Hash(hash)).map(|block| block.block_header)) + .collect(); executor.lock().execute(Task::SendHeaders(peer_index, blocks_headers)); } }, @@ -158,8 +167,7 @@ impl SynchronizationServer { }, // `block` (peer_index, ServerTask::ReturnBlock(block_hash)) => { - let storage_block = chain.read().storage_block(&block_hash); - if let Some(storage_block) = storage_block { + if let Some(storage_block) = chain.read().storage().block(db::BlockRef::Hash(block_hash)) { executor.lock().execute(Task::SendBlock(peer_index, storage_block)); } }, @@ -172,7 +180,9 @@ impl SynchronizationServer { fn blocks_hashes_after(chain: &ChainRef, best_block: &db::BestBlock, hash_stop: &H256, max_hashes: u32) -> Vec { let mut hashes: Vec = Vec::new(); - let storage_block_hash = chain.read().storage_block_hash(best_block.number); + let chain = chain.read(); + let storage = chain.storage(); + let storage_block_hash = storage.block_hash(best_block.number); if let Some(hash) = storage_block_hash { // check that chain has not reorganized since task was queued if hash == best_block.hash { @@ -180,7 +190,7 @@ impl SynchronizationServer { let last_block_number = first_block_number + max_hashes; // `max_hashes` hashes after best_block.number OR hash_stop OR blockchain end for block_number in first_block_number..last_block_number { - match chain.read().storage_block_hash(block_number) { + match storage.block_hash(block_number) { Some(ref block_hash) if block_hash == hash_stop => break, None => break, Some(block_hash) => { From d30520c98189e1840a42c11f8eb5315fb3567b93 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Mon, 7 Nov 2016 12:15:15 +0300 Subject: [PATCH 2/7] added support for regnet && testnet genesis blocks --- message/src/common/magic.rs | 9 +++++++++ pbtc/commands/import.rs | 4 ++-- pbtc/commands/start.rs | 6 +++--- pbtc/util.rs | 20 +++++++++++--------- 4 files changed, 25 insertions(+), 14 deletions(-) diff --git a/message/src/common/magic.rs b/message/src/common/magic.rs index 28d60563..fdf7228d 100644 --- a/message/src/common/magic.rs +++ b/message/src/common/magic.rs @@ -2,6 +2,7 @@ //! https://www.anintegratedworld.com/unravelling-the-mysterious-block-chain-magic-number/ use ser::{Stream, Serializable}; +use chain::Block; use Error; const MAGIC_MAINNET: u32 = 0xD9B4BEF9; @@ -53,6 +54,14 @@ impl Magic { Magic::Regtest => 18443, } } + + pub fn genesis_block(&self) -> Block { + match *self { + Magic::Mainnet => "0100000000000000000000000000000000000000000000000000000000000000000000003ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4a29ab5f49ffff001d1dac2b7c0101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000".into(), + Magic::Testnet => "0100000000000000000000000000000000000000000000000000000000000000000000003ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4adae5494dffff001d1aa4ae180101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000".into(), + Magic::Regtest => "0100000000000000000000000000000000000000000000000000000000000000000000003ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4adae5494dffff7f20020000000101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000".into(), + } + } } impl Serializable for Magic { diff --git a/pbtc/commands/import.rs b/pbtc/commands/import.rs index 97344bbc..5653ef99 100644 --- a/pbtc/commands/import.rs +++ b/pbtc/commands/import.rs @@ -4,9 +4,9 @@ use config::Config; use util::{open_db, init_db}; pub fn import(cfg: Config, matches: &ArgMatches) -> Result<(), String> { - let db = open_db(cfg.use_disk_database); + let db = open_db(&cfg); // TODO: this might be unnecessary here! - init_db(&db); + init_db(&cfg, &db); let mut writer = create_sync_blocks_writer(db); diff --git a/pbtc/commands/start.rs b/pbtc/commands/start.rs index f7fb7f06..1bf3ef44 100644 --- a/pbtc/commands/start.rs +++ b/pbtc/commands/start.rs @@ -7,6 +7,9 @@ use {config, p2p}; pub fn start(cfg: config::Config) -> Result<(), String> { let mut el = p2p::event_loop(); + let db = open_db(&cfg); + init_db(&cfg, &db); + let p2p_cfg = p2p::Config { threads: 4, protocol_minimum: 70001, @@ -26,9 +29,6 @@ pub fn start(cfg: config::Config) -> Result<(), String> { node_table_path: node_table_path(), }; - let db = open_db(cfg.use_disk_database); - init_db(&db); - let sync_handle = el.handle(); let sync_connection_factory = create_sync_connection_factory(&sync_handle, db); diff --git a/pbtc/util.rs b/pbtc/util.rs index f61d788d..112d6723 100644 --- a/pbtc/util.rs +++ b/pbtc/util.rs @@ -1,11 +1,12 @@ use std::sync::Arc; use std::path::PathBuf; use app_dirs::{app_dir, AppDataType}; -use chain::Block; +use chain::RepresentH256; use {db, APP_INFO}; +use config::Config; -pub fn open_db(use_disk_database: bool) -> Arc { - match use_disk_database { +pub fn open_db(cfg: &Config) -> Arc { + match cfg.use_disk_database { true => { let db_path = app_dir(AppDataType::UserData, &APP_INFO, "db").expect("Failed to get app dir"); Arc::new(db::Storage::new(db_path).expect("Failed to open database")) @@ -22,12 +23,13 @@ pub fn node_table_path() -> PathBuf { node_table } -pub fn init_db(db: &Arc) { +pub fn init_db(cfg: &Config, db: &Arc) { // insert genesis block if db is empty - if db.best_block().is_none() { - // TODO: move to config - let genesis_block: Block = "0100000000000000000000000000000000000000000000000000000000000000000000003ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4a29ab5f49ffff001d1dac2b7c0101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000".into(); - db.insert_block(&genesis_block) - .expect("Failed to insert genesis block to the database"); + let genesis_block = cfg.magic.genesis_block(); + match db.block_hash(0) { + Some(ref db_genesis_block_hash) if db_genesis_block_hash != &genesis_block.hash() => panic!("Trying to open database with incompatible genesis block"), + Some(_) => (), + None => db.insert_block(&genesis_block) + .expect("Failed to insert genesis block to the database"), } } From d9f032edce4d7a99f52605c5a714bd9501b1fc4c Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Mon, 7 Nov 2016 12:31:11 +0300 Subject: [PATCH 3/7] fixed couple of issues from #91 --- sync/src/synchronization_server.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sync/src/synchronization_server.rs b/sync/src/synchronization_server.rs index e9b5fc0c..10deee56 100644 --- a/sync/src/synchronization_server.rs +++ b/sync/src/synchronization_server.rs @@ -133,6 +133,8 @@ impl SynchronizationServer { }, // `getheaders` => `headers` (peer_index, ServerTask::ServeGetHeaders(best_block, hash_stop)) => { + // What if we have no common blocks with peer at all? Maybe drop connection or penalize peer? + // https://github.com/ethcore/parity-bitcoin/pull/91#discussion_r86734568 let blocks_hashes = SynchronizationServer::blocks_hashes_after(&chain, &best_block, &hash_stop, 2000); if !blocks_hashes.is_empty() { trace!(target: "sync", "Going to respond with blocks headers with {} items to peer#{}", blocks_hashes.len(), peer_index); @@ -167,9 +169,9 @@ impl SynchronizationServer { }, // `block` (peer_index, ServerTask::ReturnBlock(block_hash)) => { - if let Some(storage_block) = chain.read().storage().block(db::BlockRef::Hash(block_hash)) { - executor.lock().execute(Task::SendBlock(peer_index, storage_block)); - } + let block = chain.read().storage().block(db::BlockRef::Hash(block_hash)) + .expect("we have checked that block exists in ServeGetData; db is append-only; qed"); + executor.lock().execute(Task::SendBlock(peer_index, block)); }, }, // no tasks after wake-up => stopping From 22bd2427bfd2e0b4d81c57d1d150e65f1c528b8c Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Mon, 7 Nov 2016 13:20:01 +0300 Subject: [PATCH 4/7] adder txn_count (always zero) to headers message --- message/src/types/headers.rs | 33 ++++++++++++++++++++++++++++++--- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/message/src/types/headers.rs b/message/src/types/headers.rs index 5427e111..531a9aca 100644 --- a/message/src/types/headers.rs +++ b/message/src/types/headers.rs @@ -1,6 +1,6 @@ use std::io; use chain::BlockHeader; -use ser::{Stream, Reader}; +use ser::{Stream, Reader, Serializable, Deserializable, CompactInteger, Error as ReaderError}; use {Payload, MessageResult}; #[derive(Debug, PartialEq)] @@ -9,6 +9,12 @@ pub struct Headers { pub headers: Vec, } +#[derive(Debug, PartialEq)] +struct HeaderWithTxnCount { + pub header: BlockHeader, + pub txn_count: u64, +} + impl Payload for Headers { fn version() -> u32 { 0 @@ -19,15 +25,36 @@ impl Payload for Headers { } fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult where T: io::Read { + let headers_with_txn_count: Vec = try!(reader.read_list()); let headers = Headers { - headers: try!(reader.read_list()), + headers: headers_with_txn_count.into_iter().map(|h| h.header).collect(), }; Ok(headers) } fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> { - stream.append_list(&self.headers); + let headers_with_txn_count: Vec<_> = self.headers.iter().map(|h| HeaderWithTxnCount { header: h.clone(), txn_count: 0 }).collect(); + stream.append_list(&headers_with_txn_count); Ok(()) } } + +impl Serializable for HeaderWithTxnCount { + fn serialize(&self, stream: &mut Stream) { + stream + .append(&self.header) + .append(&CompactInteger::from(0u32)); + } +} + +impl Deserializable for HeaderWithTxnCount { + fn deserialize(reader: &mut Reader) -> Result where T: io::Read { + let header = HeaderWithTxnCount { + header: try!(reader.read()), + txn_count: try!(reader.read::()).into(), + }; + + Ok(header) + } +} From 9e501fe5d0e030791628c07480f25291913b3f34 Mon Sep 17 00:00:00 2001 From: debris Date: Mon, 7 Nov 2016 11:20:27 +0100 Subject: [PATCH 5/7] shutdown gracefully on incompatible db open --- pbtc/commands/import.rs | 2 +- pbtc/commands/start.rs | 2 +- pbtc/util.rs | 12 +++++++----- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/pbtc/commands/import.rs b/pbtc/commands/import.rs index 5653ef99..39b045cf 100644 --- a/pbtc/commands/import.rs +++ b/pbtc/commands/import.rs @@ -6,7 +6,7 @@ use util::{open_db, init_db}; pub fn import(cfg: Config, matches: &ArgMatches) -> Result<(), String> { let db = open_db(&cfg); // TODO: this might be unnecessary here! - init_db(&cfg, &db); + try!(init_db(&cfg, &db)); let mut writer = create_sync_blocks_writer(db); diff --git a/pbtc/commands/start.rs b/pbtc/commands/start.rs index 1bf3ef44..5860893d 100644 --- a/pbtc/commands/start.rs +++ b/pbtc/commands/start.rs @@ -8,7 +8,7 @@ pub fn start(cfg: config::Config) -> Result<(), String> { let mut el = p2p::event_loop(); let db = open_db(&cfg); - init_db(&cfg, &db); + try!(init_db(&cfg, &db)); let p2p_cfg = p2p::Config { threads: 4, diff --git a/pbtc/util.rs b/pbtc/util.rs index 112d6723..768f247d 100644 --- a/pbtc/util.rs +++ b/pbtc/util.rs @@ -23,13 +23,15 @@ pub fn node_table_path() -> PathBuf { node_table } -pub fn init_db(cfg: &Config, db: &Arc) { +pub fn init_db(cfg: &Config, db: &Arc) -> Result<(), String> { // insert genesis block if db is empty let genesis_block = cfg.magic.genesis_block(); match db.block_hash(0) { - Some(ref db_genesis_block_hash) if db_genesis_block_hash != &genesis_block.hash() => panic!("Trying to open database with incompatible genesis block"), - Some(_) => (), - None => db.insert_block(&genesis_block) - .expect("Failed to insert genesis block to the database"), + Some(ref db_genesis_block_hash) if db_genesis_block_hash != &genesis_block.hash() => Err("Trying to open database with incompatible genesis block".into()), + Some(_) => Ok(()), + None => { + db.insert_block(&genesis_block).expect("Failed to insert genesis block to the database"); + Ok(()) + } } } From 5d587e20f69a1d4d6679ab6d5ad9fcb7e01e247c Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Mon, 7 Nov 2016 13:57:23 +0300 Subject: [PATCH 6/7] Reorganization in chain database (#93) * tests first * decanonize & test * fix denote bug * fork route & test * some refactoring of the transaction processing * reorganization on insert * non-reorg tests * fix warnings * fix doc comment * long fork reorg test * todo for shorter reorgs --- db/src/storage.rs | 535 +++++++++++++++++++++++++++++++++++-- db/src/transaction_meta.rs | 6 + 2 files changed, 518 insertions(+), 23 deletions(-) diff --git a/db/src/storage.rs b/db/src/storage.rs index b46250f5..e8cdf4a0 100644 --- a/db/src/storage.rs +++ b/db/src/storage.rs @@ -28,6 +28,8 @@ const _COL_RESERVED6: u32 = 10; const DB_VERSION: u32 = 1; +const MAX_FORK_ROUTE_PRESET: usize = 128; + /// Blockchain storage interface pub trait Store : Send + Sync { /// get best block @@ -89,6 +91,16 @@ pub enum Error { Io(std::io::Error), /// Invalid meta info Meta(MetaError), + /// Unknown hash + Unknown(H256), + /// Not the block from the main chain + NotMain(H256), + /// Fork too long + ForkTooLong, + /// Main chain block transaction attempts to double-spend + DoubleSpend(H256), + /// Chain has no best block + NoBestBlock, } impl From for Error { @@ -113,6 +125,30 @@ const KEY_VERSION: &'static[u8] = b"version"; const KEY_BEST_BLOCK_NUMBER: &'static[u8] = b"best_block_number"; const KEY_BEST_BLOCK_HASH: &'static[u8] = b"best_block_hash"; +struct UpdateContext { + pub meta: HashMap, + pub db_transaction: DBTransaction, +} + +impl UpdateContext { + pub fn new(db: &Database) -> Self { + UpdateContext { + meta: HashMap::new(), + db_transaction: db.transaction(), + } + } + + pub fn apply(mut self, db: &Database) -> Result<(), Error> { + // actually saving meta + for (hash, meta) in self.meta.drain() { + self.db_transaction.put(Some(COL_TRANSACTIONS_META), &*hash, &meta.to_bytes()); + } + + try!(db.write(self.db_transaction)); + Ok(()) + } +} + impl Storage { /// new storage at the selected path @@ -218,21 +254,34 @@ impl Storage { .collect() } - /// update transactions metadata in the specified database transaction - fn update_transactions_meta(&self, db_transaction: &mut DBTransaction, number: u32, accepted_txs: &[chain::Transaction]) { - let mut meta_buf = HashMap::::new(); + fn block_header_by_hash(&self, h: &H256) -> Option { + self.get(COL_BLOCK_HEADERS, &**h).and_then(|val| + serialization::deserialize(val.as_ref()).map_err( + |e| self.db_error(format!("Error deserializing block header, possible db corruption ({:?})", e)) + ).ok() + ) + } + + /// update transactions metadata in the specified database transaction + fn update_transactions_meta(&self, context: &mut UpdateContext, number: u32, accepted_txs: &[chain::Transaction]) + -> Result<(), Error> + { // inserting new meta for coinbase transaction for accepted_tx in accepted_txs.iter() { // adding unspent transaction meta - meta_buf.insert(accepted_tx.hash(), TransactionMeta::new(number, accepted_tx.outputs.len())); + context.meta.insert(accepted_tx.hash(), TransactionMeta::new(number, accepted_tx.outputs.len())); } // another iteration skipping coinbase transaction for accepted_tx in accepted_txs.iter().skip(1) { for input in accepted_tx.inputs.iter() { - if !match meta_buf.get_mut(&input.previous_output.hash) { + if !match context.meta.get_mut(&input.previous_output.hash) { Some(ref mut meta) => { + if meta.is_spent(input.previous_output.index as usize) { + return Err(Error::DoubleSpend(input.previous_output.hash.clone())); + } + meta.note_used(input.previous_output.index as usize); true }, @@ -245,19 +294,149 @@ impl Storage { &input.previous_output.hash )); + if meta.is_spent(input.previous_output.index as usize) { + return Err(Error::DoubleSpend(input.previous_output.hash.clone())); + } + meta.note_used(input.previous_output.index as usize); - meta_buf.insert( + context.meta.insert( input.previous_output.hash.clone(), meta); } } } - // actually saving meta - for (hash, meta) in meta_buf.drain() { - db_transaction.put(Some(COL_TRANSACTIONS_META), &*hash, &meta.to_bytes()); + Ok(()) + } + + /// block decanonization + /// all transaction outputs used are marked as not used + /// all transaction meta is removed + /// DOES NOT update best block + fn decanonize_block(&self, context: &mut UpdateContext, hash: &H256) -> Result<(), Error> { + // ensure that block is of the main chain + try!(self.block_number(hash).ok_or(Error::NotMain(hash.clone()))); + + // transaction de-provisioning + let tx_hashes = self.block_transaction_hashes_by_hash(hash); + for (tx_hash_num, tx_hash) in tx_hashes.iter().enumerate() { + let tx = self.transaction(tx_hash) + .expect("Transaction in the saved block should exist as a separate entity indefinitely"); + + // remove meta + context.db_transaction.delete(Some(COL_TRANSACTIONS_META), &**tx_hash); + + // denote outputs used + if tx_hash_num == 0 { continue; } // coinbase transaction does not have inputs + for input in tx.inputs.iter() { + if !match context.meta.get_mut(&input.previous_output.hash) { + Some(ref mut meta) => { + meta.denote_used(input.previous_output.index as usize); + true + }, + None => false, + } { + let mut meta = + self.transaction_meta(&input.previous_output.hash) + .unwrap_or_else(|| panic!( + "No transaction metadata for {}! Corrupted DB? Reindex?", + &input.previous_output.hash + )); + + meta.denote_used(input.previous_output.index as usize); + + context.meta.insert( + input.previous_output.hash.clone(), + meta); + } + } } + + Ok(()) + } + + /// Returns the height where the fork occurred and chain up to this place (not including last canonical hash) + fn fork_route(&self, max_route: usize, hash: &H256) -> Result<(u32, Vec), Error> { + let header = try!(self.block_header_by_hash(hash).ok_or(Error::Unknown(hash.clone()))); + + // only main chain blocks has block numbers + // so if it has, it is not a fork and we return empty route + if let Some(number) = self.block_number(hash) { + return Ok((number, Vec::new())); + } + + let mut next_hash = header.previous_header_hash; + let mut result = Vec::new(); + + for _ in 0..max_route { + if let Some(number) = self.block_number(&next_hash) { + return Ok((number, result)); + } + result.push(next_hash.clone()); + next_hash = try!(self.block_header_by_hash(&next_hash).ok_or(Error::Unknown(hash.clone()))) + .previous_header_hash; + } + Err(Error::ForkTooLong) + } + + fn best_number(&self) -> Option { + self.read_meta_u32(KEY_BEST_BLOCK_NUMBER) + } + + fn best_hash(&self) -> Option { + self.get(COL_META, KEY_BEST_BLOCK_HASH).map(|val| H256::from(&**val)) + } + + fn canonize_block(&self, context: &mut UpdateContext, at_height: u32, hash: &H256) -> Result<(), Error> { + let transactions = self.block_transactions_by_hash(hash); + try!(self.update_transactions_meta(context, at_height, &transactions)); + + // only canonical blocks are allowed to wield a number + context.db_transaction.put(Some(COL_BLOCK_HASHES), &u32_key(at_height), std::ops::Deref::deref(hash)); + context.db_transaction.write_u32(Some(COL_BLOCK_NUMBERS), std::ops::Deref::deref(hash), at_height); + + Ok(()) + } + + // maybe reorganize to the _known_ block + // it will actually reorganize only when side chain is at least the same length as main + fn maybe_reorganize(&self, context: &mut UpdateContext, hash: &H256) -> Result, Error> { + if self.block_number(hash).is_some() { + return Ok(None); // cannot reorganize to canonical block + } + + // find the route of the block with hash `hash` to the main chain + let (at_height, route) = try!(self.fork_route(MAX_FORK_ROUTE_PRESET, hash)); + + // reorganization is performed only if length of side chain is at least the same as main chain + // todo: shorter chain may actualy become canonical during difficulty updates, though with rather low probability + if (route.len() as i32 + 1) < (self.best_number().unwrap_or(0) as i32 - at_height as i32) { + return Ok(None); + } + + let mut now_best = try!(self.best_number().ok_or(Error::NoBestBlock)); + + // decanonizing main chain to the split point + loop { + let next_to_decanonize = try!(self.best_hash().ok_or(Error::NoBestBlock)); + try!(self.decanonize_block(context, &next_to_decanonize)); + + now_best -= 1; + + if now_best == at_height { break; } + } + + // canonizing all route from the split point + for new_canonical_hash in route.iter() { + now_best += 1; + try!(self.canonize_block(context, now_best, &new_canonical_hash)); + } + + // finaly canonizing the top block we are reorganizing to + try!(self.canonize_block(context, now_best + 1, hash)); + + Ok(Some((now_best+1, hash.clone()))) } } @@ -314,16 +493,20 @@ impl Store for Storage { } fn insert_block(&self, block: &chain::Block) -> Result<(), Error> { + + // ! lock will be held during the entire insert routine let mut best_block = self.best_block.write(); + let mut context = UpdateContext::new(&self.database); + let block_hash = block.hash(); - let new_best_hash = match best_block.as_ref().map(|bb| &bb.hash) { + let mut new_best_hash = match best_block.as_ref().map(|bb| &bb.hash) { Some(best_hash) if &block.header().previous_header_hash != best_hash => best_hash.clone(), _ => block_hash.clone(), }; - let new_best_number = match best_block.as_ref().map(|b| b.number) { + let mut new_best_number = match best_block.as_ref().map(|b| b.number) { Some(best_number) => { if block.hash() == new_best_hash { best_number + 1 } else { best_number } @@ -331,40 +514,57 @@ impl Store for Storage { None => 0, }; - let mut transaction = self.database.transaction(); - let tx_space = block.transactions().len() * 32; let mut tx_refs = Vec::with_capacity(tx_space); for tx in block.transactions() { let tx_hash = tx.hash(); tx_refs.extend(&*tx_hash); - transaction.put( + context.db_transaction.put( Some(COL_TRANSACTIONS), &*tx_hash, &serialization::serialize(tx), ); } - transaction.put(Some(COL_BLOCK_TRANSACTIONS), &*block_hash, &tx_refs); + context.db_transaction.put(Some(COL_BLOCK_TRANSACTIONS), &*block_hash, &tx_refs); - transaction.put( + context.db_transaction.put( Some(COL_BLOCK_HEADERS), &*block_hash, &serialization::serialize(block.header()) ); + // the block is continuing the main chain if best_block.as_ref().map(|b| b.number) != Some(new_best_number) { - self.update_transactions_meta(&mut transaction, new_best_number, block.transactions()); - transaction.write_u32(Some(COL_META), KEY_BEST_BLOCK_NUMBER, new_best_number); + try!(self.update_transactions_meta(&mut context, new_best_number, block.transactions())); + context.db_transaction.write_u32(Some(COL_META), KEY_BEST_BLOCK_NUMBER, new_best_number); // updating main chain height reference - transaction.put(Some(COL_BLOCK_HASHES), &u32_key(new_best_number), std::ops::Deref::deref(&block_hash)); - transaction.write_u32(Some(COL_BLOCK_NUMBERS), std::ops::Deref::deref(&block_hash), new_best_number); + context.db_transaction.put(Some(COL_BLOCK_HASHES), &u32_key(new_best_number), std::ops::Deref::deref(&block_hash)); + context.db_transaction.write_u32(Some(COL_BLOCK_NUMBERS), std::ops::Deref::deref(&block_hash), new_best_number); } - transaction.put(Some(COL_META), KEY_BEST_BLOCK_HASH, std::ops::Deref::deref(&new_best_hash)); + // the block does not continue the main chain + // but can cause reorganization here + // this can canonize the block parent if block parent + this block is longer than the main chain + else if let Some((reorg_number, _)) = self.maybe_reorganize(&mut context, &block.header().previous_header_hash).unwrap_or(None) { + // if so, we have new best main chain block + new_best_number = reorg_number + 1; + new_best_hash = block_hash; - try!(self.database.write(transaction)); + // and we canonize it also by provisioning transactions + try!(self.update_transactions_meta(&mut context, new_best_number, block.transactions())); + context.db_transaction.write_u32(Some(COL_META), KEY_BEST_BLOCK_NUMBER, new_best_number); + context.db_transaction.put(Some(COL_BLOCK_HASHES), &u32_key(new_best_number), std::ops::Deref::deref(&new_best_hash)); + context.db_transaction.write_u32(Some(COL_BLOCK_NUMBERS), std::ops::Deref::deref(&new_best_hash), new_best_number); + } + // we always update best hash even if it is not changed + context.db_transaction.put(Some(COL_META), KEY_BEST_BLOCK_HASH, std::ops::Deref::deref(&new_best_hash)); + + // write accumulated transactions meta + try!(context.apply(&self.database)); + + // updating locked best block *best_block = Some(BestBlock { hash: new_best_hash, number: new_best_number }); Ok(()) @@ -388,7 +588,7 @@ impl Store for Storage { #[cfg(test)] mod tests { - use super::{Storage, Store}; + use super::{Storage, Store, UpdateContext}; use devtools::RandomTempPath; use chain::{Block, RepresentH256}; use super::super::BlockRef; @@ -591,4 +791,293 @@ mod tests { assert!(!meta.is_spent(1), "Transaction #1 output #1 in the new block should be recorded as unspent"); assert!(!meta.is_spent(3), "Transaction #1 second #3 in the new block should be recorded as unspent"); } + + #[test] + fn reorganize_simple() { + let path = RandomTempPath::create_dir(); + let store = Storage::new(path.as_path()).unwrap(); + + let genesis = test_data::genesis(); + store.insert_block(&genesis).unwrap(); + + let (main_hash1, main_block1) = test_data::block_hash_builder() + .block() + .header().parent(genesis.hash()) + .nonce(1) + .build() + .build() + .build(); + + store.insert_block(&main_block1).expect("main block 1 should insert with no problems"); + + let (_, side_block1) = test_data::block_hash_builder() + .block() + .header().parent(genesis.hash()) + .nonce(2) + .build() + .build() + .build(); + + store.insert_block(&side_block1).expect("side block 1 should insert with no problems"); + + // chain should not reorganize to side_block1 + assert_eq!(store.best_block().unwrap().hash, main_hash1); + } + + #[test] + fn fork_smoky() { + + let path = RandomTempPath::create_dir(); + let store = Storage::new(path.as_path()).unwrap(); + + let genesis = test_data::genesis(); + store.insert_block(&genesis).unwrap(); + + let (_, main_block1) = test_data::block_hash_builder() + .block() + .header().parent(genesis.hash()) + .nonce(1) + .build() + .build() + .build(); + + store.insert_block(&main_block1).expect("main block 1 should insert with no problems"); + + let (side_hash1, side_block1) = test_data::block_hash_builder() + .block() + .header().parent(genesis.hash()) + .nonce(2) + .build() + .build() + .build(); + + store.insert_block(&side_block1).expect("side block 1 should insert with no problems"); + + let (side_hash2, side_block2) = test_data::block_hash_builder() + .block() + .header().parent(side_hash1) + .nonce(3) + .build() + .build() + .build(); + + store.insert_block(&side_block2).expect("side block 2 should insert with no problems"); + + // store should reorganize to side hash 2, because it represents the longer chain + assert_eq!(store.best_block().unwrap().hash, side_hash2); + } + + #[test] + fn fork_long() { + + let path = RandomTempPath::create_dir(); + let store = Storage::new(path.as_path()).unwrap(); + + let genesis = test_data::genesis(); + store.insert_block(&genesis).unwrap(); + + let mut last_main_block_hash = genesis.hash(); + let mut last_side_block_hash = genesis.hash(); + + for n in 0..32 { + let (new_main_hash, main_block) = test_data::block_hash_builder() + .block() + .header().parent(last_main_block_hash) + .nonce(n*2) + .build() + .build() + .build(); + store.insert_block(&main_block).expect(&format!("main block {} should insert with no problems", n)); + last_main_block_hash = new_main_hash; + + let (new_side_hash, side_block) = test_data::block_hash_builder() + .block() + .header().parent(last_side_block_hash) + .nonce(n*2 + 1) + .build() + .build() + .build(); + store.insert_block(&side_block).expect(&format!("side block {} should insert with no problems", n)); + last_side_block_hash = new_side_hash; + } + + + let (reorg_side_hash, reorg_side_block) = test_data::block_hash_builder() + .block() + .header().parent(last_side_block_hash) + .nonce(3) + .build() + .build() + .build(); + store.insert_block(&reorg_side_block).expect("last side block should insert with no problems"); + + // store should reorganize to side hash 2, because it represents the longer chain + assert_eq!(store.best_block().unwrap().hash, reorg_side_hash); + } + + // test simulates when main chain and side chain are competing all along, each adding + // block one by one + #[test] + fn fork_competing() { + + let path = RandomTempPath::create_dir(); + let store = Storage::new(path.as_path()).unwrap(); + + let genesis = test_data::genesis(); + store.insert_block(&genesis).unwrap(); + + let (main_hash1, main_block1) = test_data::block_hash_builder() + .block() + .header().parent(genesis.hash()) + .nonce(1) + .build() + .build() + .build(); + + store.insert_block(&main_block1).expect("main block 1 should insert with no problems"); + + let (side_hash1, side_block1) = test_data::block_hash_builder() + .block() + .header().parent(genesis.hash()) + .nonce(2) + .build() + .build() + .build(); + + store.insert_block(&side_block1).expect("side block 1 should insert with no problems"); + + let (main_hash2, main_block2) = test_data::block_hash_builder() + .block() + .header().parent(main_hash1) + .nonce(3) + .build() + .build() + .build(); + + store.insert_block(&main_block2).expect("main block 2 should insert with no problems"); + + let (_side_hash2, side_block2) = test_data::block_hash_builder() + .block() + .header().parent(side_hash1) + .nonce(4) + .build() + .build() + .build(); + + store.insert_block(&side_block2).expect("side block 2 should insert with no problems"); + + // store should not reorganize to side hash 2, because it competing chains are of the equal length + assert_eq!(store.best_block().unwrap().hash, main_hash2); + } + + #[test] + fn decanonize() { + let path = RandomTempPath::create_dir(); + let store = Storage::new(path.as_path()).unwrap(); + + let genesis = test_data::genesis(); + store.insert_block(&genesis).unwrap(); + let genesis_coinbase = genesis.transactions()[0].hash(); + + let block = test_data::block_builder() + .header().parent(genesis.hash()).build() + .transaction().coinbase().build() + .transaction() + .input().hash(genesis_coinbase.clone()).build() + .build() + .build(); + + store.insert_block(&block).expect("inserting first block in the decanonize test should not fail"); + + let genesis_meta = store.transaction_meta(&genesis_coinbase) + .expect("Transaction meta for the genesis coinbase transaction should exist"); + assert!(genesis_meta.is_spent(0), "Genesis coinbase should be recorded as spent because block#1 transaction spends it"); + + let mut update_context = UpdateContext::new(&store.database); + store.decanonize_block(&mut update_context, &block.hash()) + .expect("Decanonizing block #1 which was just inserted should not fail"); + update_context.apply(&store.database).unwrap(); + + let genesis_meta = store.transaction_meta(&genesis_coinbase) + .expect("Transaction meta for the genesis coinbase transaction should exist"); + assert!(!genesis_meta.is_spent(0), "Genesis coinbase should be recorded as unspent because we retracted block #1"); + } + + #[test] + fn fork_route() { + let path = RandomTempPath::create_dir(); + let store = Storage::new(path.as_path()).unwrap(); + + let genesis = test_data::genesis(); + store.insert_block(&genesis).unwrap(); + + let (main_hash1, main_block1) = test_data::block_hash_builder() + .block() + .header().parent(genesis.hash()) + .nonce(1) + .build() + .build() + .build(); + store.insert_block(&main_block1).expect("main block 1 should insert with no problems"); + + let (main_hash2, main_block2) = test_data::block_hash_builder() + .block() + .header().parent(main_hash1) + .nonce(2) + .build() + .build() + .build(); + store.insert_block(&main_block2).expect("main block 2 should insert with no problems"); + + let (main_hash3, main_block3) = test_data::block_hash_builder() + .block() + .header().parent(main_hash2) + .nonce(3) + .build() + .build() + .build(); + store.insert_block(&main_block3).expect("main block 3 should insert with no problems"); + + let (_, main_block4) = test_data::block_hash_builder() + .block() + .header().parent(main_hash3) + .nonce(4) + .build() + .build() + .build(); + store.insert_block(&main_block4).expect("main block 4 should insert with no problems"); + + let (side_hash1, side_block1) = test_data::block_hash_builder() + .block() + .header().parent(genesis.hash()) + .nonce(5) + .build() + .build() + .build(); + store.insert_block(&side_block1).expect("side block 1 should insert with no problems"); + + let (side_hash2, side_block2) = test_data::block_hash_builder() + .block() + .header().parent(side_hash1.clone()) + .nonce(6) + .build() + .build() + .build(); + store.insert_block(&side_block2).expect("side block 2 should insert with no problems"); + + let (side_hash3, side_block3) = test_data::block_hash_builder() + .block() + .header().parent(side_hash2.clone()) + .nonce(7) + .build() + .build() + .build(); + store.insert_block(&side_block3).expect("side block 3 should insert with no problems"); + + + let (h, route) = store.fork_route(16, &side_hash3).expect("Fork route should have been built"); + + assert_eq!(h, 0); + assert_eq!(route, vec![side_hash2, side_hash1]); + } } diff --git a/db/src/transaction_meta.rs b/db/src/transaction_meta.rs index 7be05890..3ae7408d 100644 --- a/db/src/transaction_meta.rs +++ b/db/src/transaction_meta.rs @@ -29,6 +29,12 @@ impl TransactionMeta { self.spent.set(index, true); } + + /// note that particular output has been used + pub fn denote_used(&mut self, index: usize) { + self.spent.set(index, false); + } + pub fn to_bytes(self) -> Vec { let mut result = vec![0u8; 4]; LittleEndian::write_u32(&mut result[0..4], self.block_height); From 54e2ae2a4b3f9814a4c90cd3210594fb517918ef Mon Sep 17 00:00:00 2001 From: debris Date: Mon, 7 Nov 2016 12:38:52 +0100 Subject: [PATCH 7/7] avoid redundant clone on header serialize --- message/src/types/headers.rs | 36 +++++++++++++++++++++++++++++------- 1 file changed, 29 insertions(+), 7 deletions(-) diff --git a/message/src/types/headers.rs b/message/src/types/headers.rs index 531a9aca..9a2aeeb7 100644 --- a/message/src/types/headers.rs +++ b/message/src/types/headers.rs @@ -11,8 +11,26 @@ pub struct Headers { #[derive(Debug, PartialEq)] struct HeaderWithTxnCount { - pub header: BlockHeader, - pub txn_count: u64, + header: BlockHeader, +} + +impl From for BlockHeader { + fn from(header: HeaderWithTxnCount) -> BlockHeader { + header.header + } +} + +#[derive(Debug, PartialEq)] +struct HeaderWithTxnCountRef<'a> { + header: &'a BlockHeader, +} + +impl<'a> From<&'a BlockHeader> for HeaderWithTxnCountRef<'a> { + fn from(header: &'a BlockHeader) -> Self { + HeaderWithTxnCountRef { + header: header, + } + } } impl Payload for Headers { @@ -27,23 +45,23 @@ impl Payload for Headers { fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult where T: io::Read { let headers_with_txn_count: Vec = try!(reader.read_list()); let headers = Headers { - headers: headers_with_txn_count.into_iter().map(|h| h.header).collect(), + headers: headers_with_txn_count.into_iter().map(Into::into).collect(), }; Ok(headers) } fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> { - let headers_with_txn_count: Vec<_> = self.headers.iter().map(|h| HeaderWithTxnCount { header: h.clone(), txn_count: 0 }).collect(); + let headers_with_txn_count: Vec = self.headers.iter().map(Into::into).collect(); stream.append_list(&headers_with_txn_count); Ok(()) } } -impl Serializable for HeaderWithTxnCount { +impl<'a> Serializable for HeaderWithTxnCountRef<'a> { fn serialize(&self, stream: &mut Stream) { stream - .append(&self.header) + .append(self.header) .append(&CompactInteger::from(0u32)); } } @@ -52,9 +70,13 @@ impl Deserializable for HeaderWithTxnCount { fn deserialize(reader: &mut Reader) -> Result where T: io::Read { let header = HeaderWithTxnCount { header: try!(reader.read()), - txn_count: try!(reader.read::()).into(), }; + let txn_count: CompactInteger = try!(reader.read()); + if txn_count != 0u32.into() { + return Err(ReaderError::MalformedData); + } + Ok(header) } }