diff --git a/db/src/storage.rs b/db/src/storage.rs index 9aea281f..d13e8960 100644 --- a/db/src/storage.rs +++ b/db/src/storage.rs @@ -389,7 +389,7 @@ impl Storage { self.read_meta_u32(KEY_BEST_BLOCK_NUMBER) } - fn best_hash(&self) -> Option { + fn _best_hash(&self) -> Option { self.get(COL_META, KEY_BEST_BLOCK_HASH).map(|val| H256::from(&**val)) } diff --git a/message/src/common/magic.rs b/message/src/common/magic.rs index 28d60563..fdf7228d 100644 --- a/message/src/common/magic.rs +++ b/message/src/common/magic.rs @@ -2,6 +2,7 @@ //! https://www.anintegratedworld.com/unravelling-the-mysterious-block-chain-magic-number/ use ser::{Stream, Serializable}; +use chain::Block; use Error; const MAGIC_MAINNET: u32 = 0xD9B4BEF9; @@ -53,6 +54,14 @@ impl Magic { Magic::Regtest => 18443, } } + + pub fn genesis_block(&self) -> Block { + match *self { + Magic::Mainnet => "0100000000000000000000000000000000000000000000000000000000000000000000003ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4a29ab5f49ffff001d1dac2b7c0101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000".into(), + Magic::Testnet => "0100000000000000000000000000000000000000000000000000000000000000000000003ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4adae5494dffff001d1aa4ae180101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000".into(), + Magic::Regtest => "0100000000000000000000000000000000000000000000000000000000000000000000003ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4adae5494dffff7f20020000000101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000".into(), + } + } } impl Serializable for Magic { diff --git a/message/src/types/headers.rs b/message/src/types/headers.rs index 5427e111..9a2aeeb7 100644 --- a/message/src/types/headers.rs +++ b/message/src/types/headers.rs @@ -1,6 +1,6 @@ use std::io; use chain::BlockHeader; -use ser::{Stream, Reader}; +use ser::{Stream, Reader, Serializable, Deserializable, CompactInteger, Error as ReaderError}; use {Payload, MessageResult}; #[derive(Debug, PartialEq)] @@ -9,6 +9,30 @@ pub struct Headers { pub headers: Vec, } +#[derive(Debug, PartialEq)] +struct HeaderWithTxnCount { + header: BlockHeader, +} + +impl From for BlockHeader { + fn from(header: HeaderWithTxnCount) -> BlockHeader { + header.header + } +} + +#[derive(Debug, PartialEq)] +struct HeaderWithTxnCountRef<'a> { + header: &'a BlockHeader, +} + +impl<'a> From<&'a BlockHeader> for HeaderWithTxnCountRef<'a> { + fn from(header: &'a BlockHeader) -> Self { + HeaderWithTxnCountRef { + header: header, + } + } +} + impl Payload for Headers { fn version() -> u32 { 0 @@ -19,15 +43,40 @@ impl Payload for Headers { } fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult where T: io::Read { + let headers_with_txn_count: Vec = try!(reader.read_list()); let headers = Headers { - headers: try!(reader.read_list()), + headers: headers_with_txn_count.into_iter().map(Into::into).collect(), }; Ok(headers) } fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> { - stream.append_list(&self.headers); + let headers_with_txn_count: Vec = self.headers.iter().map(Into::into).collect(); + stream.append_list(&headers_with_txn_count); Ok(()) } } + +impl<'a> Serializable for HeaderWithTxnCountRef<'a> { + fn serialize(&self, stream: &mut Stream) { + stream + .append(self.header) + .append(&CompactInteger::from(0u32)); + } +} + +impl Deserializable for HeaderWithTxnCount { + fn deserialize(reader: &mut Reader) -> Result where T: io::Read { + let header = HeaderWithTxnCount { + header: try!(reader.read()), + }; + + let txn_count: CompactInteger = try!(reader.read()); + if txn_count != 0u32.into() { + return Err(ReaderError::MalformedData); + } + + Ok(header) + } +} diff --git a/pbtc/commands/import.rs b/pbtc/commands/import.rs index 97344bbc..39b045cf 100644 --- a/pbtc/commands/import.rs +++ b/pbtc/commands/import.rs @@ -4,9 +4,9 @@ use config::Config; use util::{open_db, init_db}; pub fn import(cfg: Config, matches: &ArgMatches) -> Result<(), String> { - let db = open_db(cfg.use_disk_database); + let db = open_db(&cfg); // TODO: this might be unnecessary here! - init_db(&db); + try!(init_db(&cfg, &db)); let mut writer = create_sync_blocks_writer(db); diff --git a/pbtc/commands/start.rs b/pbtc/commands/start.rs index f7fb7f06..5860893d 100644 --- a/pbtc/commands/start.rs +++ b/pbtc/commands/start.rs @@ -7,6 +7,9 @@ use {config, p2p}; pub fn start(cfg: config::Config) -> Result<(), String> { let mut el = p2p::event_loop(); + let db = open_db(&cfg); + try!(init_db(&cfg, &db)); + let p2p_cfg = p2p::Config { threads: 4, protocol_minimum: 70001, @@ -26,9 +29,6 @@ pub fn start(cfg: config::Config) -> Result<(), String> { node_table_path: node_table_path(), }; - let db = open_db(cfg.use_disk_database); - init_db(&db); - let sync_handle = el.handle(); let sync_connection_factory = create_sync_connection_factory(&sync_handle, db); diff --git a/pbtc/util.rs b/pbtc/util.rs index f61d788d..768f247d 100644 --- a/pbtc/util.rs +++ b/pbtc/util.rs @@ -1,11 +1,12 @@ use std::sync::Arc; use std::path::PathBuf; use app_dirs::{app_dir, AppDataType}; -use chain::Block; +use chain::RepresentH256; use {db, APP_INFO}; +use config::Config; -pub fn open_db(use_disk_database: bool) -> Arc { - match use_disk_database { +pub fn open_db(cfg: &Config) -> Arc { + match cfg.use_disk_database { true => { let db_path = app_dir(AppDataType::UserData, &APP_INFO, "db").expect("Failed to get app dir"); Arc::new(db::Storage::new(db_path).expect("Failed to open database")) @@ -22,12 +23,15 @@ pub fn node_table_path() -> PathBuf { node_table } -pub fn init_db(db: &Arc) { +pub fn init_db(cfg: &Config, db: &Arc) -> Result<(), String> { // insert genesis block if db is empty - if db.best_block().is_none() { - // TODO: move to config - let genesis_block: Block = "0100000000000000000000000000000000000000000000000000000000000000000000003ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4a29ab5f49ffff001d1dac2b7c0101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000".into(); - db.insert_block(&genesis_block) - .expect("Failed to insert genesis block to the database"); + let genesis_block = cfg.magic.genesis_block(); + match db.block_hash(0) { + Some(ref db_genesis_block_hash) if db_genesis_block_hash != &genesis_block.hash() => Err("Trying to open database with incompatible genesis block".into()), + Some(_) => Ok(()), + None => { + db.insert_block(&genesis_block).expect("Failed to insert genesis block to the database"); + Ok(()) + } } } diff --git a/sync/src/hash_queue.rs b/sync/src/hash_queue.rs index 0d2b0d2b..7a4dd19a 100644 --- a/sync/src/hash_queue.rs +++ b/sync/src/hash_queue.rs @@ -39,11 +39,6 @@ impl HashQueue { self.queue.len() as u32 } - /// Returns true if queue is empty. - pub fn is_empty(&self) -> bool { - self.queue.is_empty() - } - /// Returns front element from the given queue. pub fn front(&self) -> Option { self.queue.front().cloned() @@ -63,19 +58,6 @@ impl HashQueue { Some(self.queue[queue_len - 2].clone()) } - /// Returns n-th element (n is starting from 0), starting from the back-element in the queue. - /// If there are no n-th element - returns (n-1) element & etc. - pub fn back_skip_n(&self, n: usize) -> Option { - let queue_len = self.queue.len(); - if queue_len == 0 { - return None; - } - if n + 1 > queue_len { - return Some(self.queue[0].clone()) - } - return Some(self.queue[queue_len - n - 1].clone()) - } - /// Returns true if queue contains element. pub fn contains(&self, hash: &H256) -> bool { self.set.contains(hash) @@ -175,11 +157,6 @@ impl HashQueueChain { self.chain[queue_index].len() } - /// Returns true if given queue is empty. - pub fn is_empty_at(&self, queue_index: usize) -> bool { - self.chain[queue_index].is_empty() - } - /// Returns element at the front of the given queue. pub fn front_at(&self, queue_index: usize) -> Option { let ref queue = self.chain[queue_index]; @@ -198,13 +175,6 @@ impl HashQueueChain { queue.pre_back() } - /// Returns n-th element (n is starting from 0), starting from the back-element in given queue. - /// If there are no n-th element - returns (n-1) element & etc. - pub fn back_skip_n_at(&self, chain_index: usize, n: usize) -> Option { - let ref queue = self.chain[chain_index]; - queue.back_skip_n(n) - } - /// Returns the back of the whole chain. pub fn back(&self) -> Option { let mut queue_index = self.chain.len() - 1; @@ -223,6 +193,7 @@ impl HashQueueChain { } /// Checks if hash is contained in given queue. + #[cfg(test)] pub fn is_contained_in(&self, queue_index: usize, hash: &H256) -> bool { self.chain[queue_index].contains(hash) } @@ -283,16 +254,15 @@ impl Index for HashQueueChain { #[cfg(test)] mod tests { use super::{HashQueue, HashQueueChain, HashPosition}; + use primitives::hash::H256; #[test] fn hash_queue_empty() { let mut queue = HashQueue::new(); assert_eq!(queue.len(), 0); - assert_eq!(queue.is_empty(), true); assert_eq!(queue.front(), None); assert_eq!(queue.back(), None); assert_eq!(queue.pre_back(), None); - assert_eq!(queue.back_skip_n(100), None); assert_eq!(queue.contains(&"000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f".into()), false); assert_eq!(queue.pop_front(), None); assert_eq!(queue.pop_front_n(100), vec![]); @@ -304,11 +274,9 @@ mod tests { let mut chain = HashQueueChain::with_number_of_queues(3); assert_eq!(chain.len(), 0); assert_eq!(chain.len_of(0), 0); - assert_eq!(chain.is_empty_at(0), true); assert_eq!(chain.front_at(0), None); assert_eq!(chain.back_at(0), None); assert_eq!(chain.pre_back_at(0), None); - assert_eq!(chain.back_skip_n_at(0, 100), None); assert_eq!(chain.back(), None); assert_eq!(chain.is_contained_in(0, &"000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f".into()), false); assert_eq!(chain.contains_in(&"000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f".into()), None); @@ -320,16 +288,16 @@ mod tests { fn hash_queue_chain_not_empty() { let mut chain = HashQueueChain::with_number_of_queues(4); chain.push_back_n_at(0, vec![ - "0000000000000000000000000000000000000000000000000000000000000000".into(), - "0000000000000000000000000000000000000000000000000000000000000001".into(), - "0000000000000000000000000000000000000000000000000000000000000002".into(), + H256::from(0), + H256::from(1), + H256::from(2), ]); chain.push_back_n_at(1, vec![ - "0000000000000000000000000000000000000000000000000000000000000003".into(), - "0000000000000000000000000000000000000000000000000000000000000004".into(), + H256::from(3), + H256::from(4), ]); chain.push_back_n_at(2, vec![ - "0000000000000000000000000000000000000000000000000000000000000005".into(), + H256::from(5), ]); assert_eq!(chain.len(), 6); @@ -337,42 +305,25 @@ mod tests { assert_eq!(chain.len_of(1), 2); assert_eq!(chain.len_of(2), 1); assert_eq!(chain.len_of(3), 0); - assert_eq!(chain.is_empty_at(0), false); - assert_eq!(chain.is_empty_at(1), false); - assert_eq!(chain.is_empty_at(2), false); - assert_eq!(chain.is_empty_at(3), true); - assert_eq!(chain.front_at(0), Some("0000000000000000000000000000000000000000000000000000000000000000".into())); - assert_eq!(chain.front_at(1), Some("0000000000000000000000000000000000000000000000000000000000000003".into())); - assert_eq!(chain.front_at(2), Some("0000000000000000000000000000000000000000000000000000000000000005".into())); + assert_eq!(chain.front_at(0), Some(H256::from(0))); + assert_eq!(chain.front_at(1), Some(H256::from(3))); + assert_eq!(chain.front_at(2), Some(H256::from(5))); assert_eq!(chain.front_at(3), None); - assert_eq!(chain.back_at(0), Some("0000000000000000000000000000000000000000000000000000000000000002".into())); - assert_eq!(chain.back_at(1), Some("0000000000000000000000000000000000000000000000000000000000000004".into())); - assert_eq!(chain.back_at(2), Some("0000000000000000000000000000000000000000000000000000000000000005".into())); + assert_eq!(chain.back_at(0), Some(H256::from(2))); + assert_eq!(chain.back_at(1), Some(H256::from(4))); + assert_eq!(chain.back_at(2), Some(H256::from(5))); assert_eq!(chain.back_at(3), None); - assert_eq!(chain.pre_back_at(0), Some("0000000000000000000000000000000000000000000000000000000000000001".into())); - assert_eq!(chain.pre_back_at(1), Some("0000000000000000000000000000000000000000000000000000000000000003".into())); + assert_eq!(chain.pre_back_at(0), Some(H256::from(1))); + assert_eq!(chain.pre_back_at(1), Some(H256::from(3))); assert_eq!(chain.pre_back_at(2), None); assert_eq!(chain.pre_back_at(3), None); - assert_eq!(chain.back(), Some("0000000000000000000000000000000000000000000000000000000000000005".into())); - assert_eq!(chain.is_contained_in(0, &"0000000000000000000000000000000000000000000000000000000000000002".into()), true); - assert_eq!(chain.is_contained_in(1, &"0000000000000000000000000000000000000000000000000000000000000002".into()), false); - assert_eq!(chain.is_contained_in(2, &"0000000000000000000000000000000000000000000000000000000000000002".into()), false); - assert_eq!(chain.is_contained_in(3, &"0000000000000000000000000000000000000000000000000000000000000002".into()), false); - assert_eq!(chain.contains_in(&"0000000000000000000000000000000000000000000000000000000000000002".into()), Some(0)); - assert_eq!(chain.contains_in(&"0000000000000000000000000000000000000000000000000000000000000005".into()), Some(2)); - assert_eq!(chain.contains_in(&"0000000000000000000000000000000000000000000000000000000000000009".into()), None); - - assert_eq!(chain.back_skip_n_at(0, 0), Some("0000000000000000000000000000000000000000000000000000000000000002".into())); - assert_eq!(chain.back_skip_n_at(1, 0), Some("0000000000000000000000000000000000000000000000000000000000000004".into())); - assert_eq!(chain.back_skip_n_at(2, 0), Some("0000000000000000000000000000000000000000000000000000000000000005".into())); - assert_eq!(chain.back_skip_n_at(3, 0), None); - assert_eq!(chain.back_skip_n_at(0, 1), Some("0000000000000000000000000000000000000000000000000000000000000001".into())); - assert_eq!(chain.back_skip_n_at(1, 1), Some("0000000000000000000000000000000000000000000000000000000000000003".into())); - assert_eq!(chain.back_skip_n_at(2, 1), Some("0000000000000000000000000000000000000000000000000000000000000005".into())); - assert_eq!(chain.back_skip_n_at(3, 1), None); - assert_eq!(chain.back_skip_n_at(0, 2), Some("0000000000000000000000000000000000000000000000000000000000000000".into())); - assert_eq!(chain.back_skip_n_at(1, 2), Some("0000000000000000000000000000000000000000000000000000000000000003".into())); - assert_eq!(chain.back_skip_n_at(2, 2), Some("0000000000000000000000000000000000000000000000000000000000000005".into())); - assert_eq!(chain.back_skip_n_at(3, 2), None); + assert_eq!(chain.back(), Some(H256::from(5))); + assert_eq!(chain.is_contained_in(0, &H256::from(2)), true); + assert_eq!(chain.is_contained_in(1, &H256::from(2)), false); + assert_eq!(chain.is_contained_in(2, &H256::from(2)), false); + assert_eq!(chain.is_contained_in(3, &H256::from(2)), false); + assert_eq!(chain.contains_in(&H256::from(2)), Some(0)); + assert_eq!(chain.contains_in(&H256::from(5)), Some(2)); + assert_eq!(chain.contains_in(&H256::from(9)), None); } } diff --git a/sync/src/synchronization_chain.rs b/sync/src/synchronization_chain.rs index fb3206bd..c9bfe55c 100644 --- a/sync/src/synchronization_chain.rs +++ b/sync/src/synchronization_chain.rs @@ -1,7 +1,7 @@ use std::fmt; use std::sync::Arc; use parking_lot::RwLock; -use chain::{Block, RepresentH256}; +use chain::Block; use db; use primitives::hash::H256; use hash_queue::{HashQueueChain, HashPosition}; @@ -47,6 +47,23 @@ pub struct Information { pub stored: u32, } +/// Result of intersecting chain && inventory +#[derive(Debug, PartialEq)] +pub enum InventoryIntersection { + /// 3.2: No intersection with in-memory queue && no intersection with db + NoKnownBlocks(usize), + /// 2.3: Inventory has no new blocks && some of blocks in inventory are in in-memory queue + InMemoryNoNewBlocks, + /// 2.4.2: Inventory has new blocks && these blocks are right after chain' best block + InMemoryMainNewBlocks(usize), + /// 2.4.3: Inventory has new blocks && these blocks are forked from our chain' best block + InMemoryForkNewBlocks(usize), + /// 3.3: No intersection with in-memory queue && has intersection with db && all blocks are already stored in db + DbAllBlocksKnown, + /// 3.4: No intersection with in-memory queue && has intersection with db && some blocks are not yet stored in db + DbForkNewBlocks(usize), +} + /// Blockchain from synchroniation point of view, consisting of: /// 1) all blocks from the `storage` [oldest blocks] /// 2) all blocks currently verifying by `verification_queue` @@ -109,7 +126,7 @@ impl Chain { scheduled: self.hash_chain.len_of(SCHEDULED_QUEUE), requested: self.hash_chain.len_of(REQUESTED_QUEUE), verifying: self.hash_chain.len_of(VERIFYING_QUEUE), - stored: self.storage.best_block().map_or(0, |block| block.number + 1), + stored: self.best_storage_block.number + 1, } } @@ -137,68 +154,14 @@ impl Chain { } } - /// Returns true if has blocks of given type - pub fn has_blocks_of_state(&self, state: BlockState) -> bool { - match state { - BlockState::Stored => true, // storage with genesis block is required - _ => !self.hash_chain.is_empty_at(state.to_queue_index()), - } - } - /// Get best block pub fn best_block(&self) -> db::BestBlock { - let storage_best_block = self.storage.best_block().expect("storage with genesis block is required"); match self.hash_chain.back() { Some(hash) => db::BestBlock { - number: storage_best_block.number + self.hash_chain.len(), + number: self.best_storage_block.number + self.hash_chain.len(), hash: hash.clone(), }, - None => db::BestBlock { - number: storage_best_block.number, - hash: storage_best_block.hash, - } - } - } - - /// Get best block of given state - pub fn best_block_of_state(&self, state: BlockState) -> Option { - match state { - BlockState::Scheduled => self.hash_chain.back_at(SCHEDULED_QUEUE) - .map(|hash| db::BestBlock { - hash: hash, - number: self.storage.best_block().expect("storage with genesis block is required").number + 1 - + self.hash_chain.len_of(VERIFYING_QUEUE) - + self.hash_chain.len_of(REQUESTED_QUEUE) - + self.hash_chain.len_of(SCHEDULED_QUEUE) - }), - BlockState::Requested => self.hash_chain.back_at(REQUESTED_QUEUE) - .map(|hash| db::BestBlock { - hash: hash, - number: self.storage.best_block().expect("storage with genesis block is required").number + 1 - + self.hash_chain.len_of(VERIFYING_QUEUE) - + self.hash_chain.len_of(REQUESTED_QUEUE) - }), - BlockState::Verifying => self.hash_chain.back_at(VERIFYING_QUEUE) - .map(|hash| db::BestBlock { - hash: hash, - number: self.storage.best_block().expect("storage with genesis block is required").number + 1 - + self.hash_chain.len_of(VERIFYING_QUEUE) - }), - BlockState::Stored => { - self.storage.best_block() - }, - _ => panic!("not supported"), - } - } - - /// Check if block has given state - pub fn block_has_state(&self, hash: &H256, state: BlockState) -> bool { - match state { - BlockState::Scheduled => self.hash_chain.is_contained_in(SCHEDULED_QUEUE, hash), - BlockState::Requested => self.hash_chain.is_contained_in(REQUESTED_QUEUE, hash), - BlockState::Verifying => self.hash_chain.is_contained_in(VERIFYING_QUEUE, hash), - BlockState::Stored => self.storage.contains_block(db::BlockRef::Hash(hash.clone())), - BlockState::Unknown => self.block_state(hash) == BlockState::Unknown, + None => self.best_storage_block.clone(), } } @@ -214,39 +177,12 @@ impl Chain { } } - /// Get block number from storage - pub fn storage_block_number(&self, hash: &H256) -> Option { - self.storage.block_number(hash) - } - - /// Get block hash from storage - pub fn storage_block_hash(&self, number: u32) -> Option { - self.storage.block_hash(number) - } - - /// Get block from the storage - pub fn storage_block(&self, hash: &H256) -> Option { - self.storage.block(db::BlockRef::Hash(hash.clone())) - } - - /// Prepare best block locator hashes - pub fn best_block_locator_hashes(&self) -> Vec { - let mut result: Vec = Vec::with_capacity(4); - if let Some(pre_best_block) = self.hash_chain.back_skip_n_at(SCHEDULED_QUEUE, 2) { - result.push(pre_best_block); - } - if let Some(pre_best_block) = self.hash_chain.back_skip_n_at(REQUESTED_QUEUE, 2) { - result.push(pre_best_block); - } - if let Some(pre_best_block) = self.hash_chain.back_skip_n_at(VERIFYING_QUEUE, 2) { - result.push(pre_best_block); - } - result.push(self.best_storage_block.hash.clone()); - result - } - /// Prepare block locator hashes, as described in protocol documentation: /// https://en.bitcoin.it/wiki/Protocol_documentation#getblocks + /// When there are forked blocks in the queue, this method can result in + /// mixed block locator hashes ([0 - from fork1, 1 - from fork2, 2 - from fork1]). + /// Peer will respond with blocks of fork1 || fork2 => we could end up in some side fork + /// To resolve this, after switching to saturated state, we will also ask all peers for inventory. pub fn block_locator_hashes(&self) -> Vec { let mut block_locator_hashes: Vec = Vec::new(); @@ -254,8 +190,7 @@ impl Chain { let (local_index, step) = self.block_locator_hashes_for_queue(&mut block_locator_hashes); // calculate for storage - let storage_best_block_number = self.storage.best_block().expect("storage with genesis block is required").number; - let storage_index = if storage_best_block_number < local_index { 0 } else { storage_best_block_number - local_index }; + let storage_index = if self.best_storage_block.number < local_index { 0 } else { self.best_storage_block.number - local_index }; self.block_locator_hashes_for_storage(storage_index, step, &mut block_locator_hashes); block_locator_hashes } @@ -287,16 +222,21 @@ impl Chain { /// Insert new best block to storage pub fn insert_best_block(&mut self, block: Block) -> Result<(), db::Error> { - if block.block_header.previous_header_hash != self.best_storage_block.hash { - return Err(db::Error::DB("Trying to insert out-of-order block".into())); - } + // insert to storage + try!(self.storage.insert_block(&block)); // remember new best block hash - self.best_storage_block.number += 1; - self.best_storage_block.hash = block.hash(); + self.best_storage_block = self.storage.best_block().expect("Inserted block above"); - // insert to storage - self.storage.insert_block(&block) + Ok(()) + } + + /// Remove block + pub fn remove_block(&mut self, hash: &H256) { + if self.hash_chain.remove_at(SCHEDULED_QUEUE, hash) == HashPosition::Missing + && self.hash_chain.remove_at(REQUESTED_QUEUE, hash) == HashPosition::Missing { + self.hash_chain.remove_at(VERIFYING_QUEUE, hash); + } } /// Remove block by hash if it is currently in given state @@ -309,6 +249,57 @@ impl Chain { self.hash_chain.remove_all_at(state.to_queue_index()); } + /// Intersect chain with inventory + pub fn intersect_with_inventory(&self, inventory: &Vec) -> InventoryIntersection { + let inventory_len = inventory.len(); + assert!(inventory_len != 0); + + // giving that blocks in inventory are ordered + match self.block_state(&inventory[0]) { + // if first block of inventory is unknown => all other blocks are also unknown + BlockState::Unknown => { + InventoryIntersection::NoKnownBlocks(0) + }, + // else if first block is known + first_block_state @ _ => match self.block_state(&inventory[inventory_len - 1]) { + // if last block is known to be in db => all inventory blocks are also in db + BlockState::Stored => { + InventoryIntersection::DbAllBlocksKnown + }, + // if first block is known && last block is unknown => intersection with queue or with db + BlockState::Unknown => { + // find last known block + let mut previous_state = first_block_state; + for index in 1..inventory_len { + let state = self.block_state(&inventory[index]); + if state == BlockState::Unknown { + // previous block is stored => fork from stored block + if previous_state == BlockState::Stored { + return InventoryIntersection::DbForkNewBlocks(index); + } + // previous block is best block => no fork + else if &self.best_block().hash == &inventory[index - 1] { + return InventoryIntersection::InMemoryMainNewBlocks(index); + } + // previous block is not a best block => fork + else { + return InventoryIntersection::InMemoryForkNewBlocks(index); + } + } + previous_state = state; + } + + // unreachable because last block is unknown && in above loop we search for unknown blocks + unreachable!(); + }, + // if first block is known && last block is also known && is in queue => queue intersection with no new block + _ => { + InventoryIntersection::InMemoryNoNewBlocks + } + } + } + } + /// Calculate block locator hashes for hash queue fn block_locator_hashes_for_queue(&self, hashes: &mut Vec) -> (u32, u32) { let queue_len = self.hash_chain.len(); @@ -359,7 +350,7 @@ impl fmt::Debug for Chain { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { try!(writeln!(f, "chain: [")); { - let mut num = self.storage.best_block().expect("Storage with genesis block is required").number; + let mut num = self.best_storage_block.number; try!(writeln!(f, "\tworse(stored): {} {:?}", 0, self.storage.block_hash(0))); try!(writeln!(f, "\tbest(stored): {} {:?}", num, self.storage.block_hash(num))); @@ -389,8 +380,9 @@ mod tests { use std::sync::Arc; use chain::RepresentH256; use hash_queue::HashPosition; - use super::{Chain, BlockState}; + use super::{Chain, BlockState, InventoryIntersection}; use db::{self, Store, BestBlock}; + use primitives::hash::H256; use test_data; #[test] @@ -406,19 +398,9 @@ mod tests { assert_eq!(chain.length_of_state(BlockState::Requested), 0); assert_eq!(chain.length_of_state(BlockState::Verifying), 0); assert_eq!(chain.length_of_state(BlockState::Stored), 1); - assert_eq!(chain.has_blocks_of_state(BlockState::Scheduled), false); - assert_eq!(chain.has_blocks_of_state(BlockState::Requested), false); - assert_eq!(chain.has_blocks_of_state(BlockState::Verifying), false); - assert_eq!(chain.has_blocks_of_state(BlockState::Stored), true); assert_eq!(&chain.best_block(), &db_best_block); - assert_eq!(chain.best_block_of_state(BlockState::Scheduled), None); - assert_eq!(chain.best_block_of_state(BlockState::Requested), None); - assert_eq!(chain.best_block_of_state(BlockState::Verifying), None); - assert_eq!(chain.best_block_of_state(BlockState::Stored), Some(db_best_block.clone())); - assert_eq!(chain.block_has_state(&db_best_block.hash, BlockState::Requested), false); - assert_eq!(chain.block_has_state(&db_best_block.hash, BlockState::Stored), true); assert_eq!(chain.block_state(&db_best_block.hash), BlockState::Stored); - assert_eq!(chain.block_state(&"0000000000000000000000000000000000000000000000000000000000000000".into()), BlockState::Unknown); + assert_eq!(chain.block_state(&H256::from(0)), BlockState::Unknown); } #[test] @@ -428,12 +410,12 @@ mod tests { // add 6 blocks to scheduled queue chain.schedule_blocks_hashes(vec![ - "0000000000000000000000000000000000000000000000000000000000000000".into(), - "0000000000000000000000000000000000000000000000000000000000000001".into(), - "0000000000000000000000000000000000000000000000000000000000000002".into(), - "0000000000000000000000000000000000000000000000000000000000000003".into(), - "0000000000000000000000000000000000000000000000000000000000000004".into(), - "0000000000000000000000000000000000000000000000000000000000000005".into(), + H256::from(0), + H256::from(1), + H256::from(2), + H256::from(3), + H256::from(4), + H256::from(5), ]); assert!(chain.information().scheduled == 6 && chain.information().requested == 0 && chain.information().verifying == 0 && chain.information().stored == 1); @@ -452,22 +434,22 @@ mod tests { && chain.information().verifying == 0 && chain.information().stored == 1); // try to remove block 0 from scheduled queue => missing - assert_eq!(chain.remove_block_with_state(&"0000000000000000000000000000000000000000000000000000000000000000".into(), BlockState::Scheduled), HashPosition::Missing); + assert_eq!(chain.remove_block_with_state(&H256::from(0), BlockState::Scheduled), HashPosition::Missing); assert!(chain.information().scheduled == 3 && chain.information().requested == 3 && chain.information().verifying == 0 && chain.information().stored == 1); // remove blocks 0 & 1 from requested queue - assert_eq!(chain.remove_block_with_state(&"0000000000000000000000000000000000000000000000000000000000000001".into(), BlockState::Requested), HashPosition::Inside); - assert_eq!(chain.remove_block_with_state(&"0000000000000000000000000000000000000000000000000000000000000000".into(), BlockState::Requested), HashPosition::Front); + assert_eq!(chain.remove_block_with_state(&H256::from(1), BlockState::Requested), HashPosition::Inside); + assert_eq!(chain.remove_block_with_state(&H256::from(0), BlockState::Requested), HashPosition::Front); assert!(chain.information().scheduled == 3 && chain.information().requested == 1 && chain.information().verifying == 0 && chain.information().stored == 1); // mark 0 & 1 as verifying - chain.verify_block_hash("0000000000000000000000000000000000000000000000000000000000000001".into()); - chain.verify_block_hash("0000000000000000000000000000000000000000000000000000000000000002".into()); + chain.verify_block_hash(H256::from(1)); + chain.verify_block_hash(H256::from(2)); assert!(chain.information().scheduled == 3 && chain.information().requested == 1 && chain.information().verifying == 2 && chain.information().stored == 1); // mark block 0 as verified - assert_eq!(chain.remove_block_with_state(&"0000000000000000000000000000000000000000000000000000000000000001".into(), BlockState::Verifying), HashPosition::Front); + assert_eq!(chain.remove_block_with_state(&H256::from(1), BlockState::Verifying), HashPosition::Front); assert!(chain.information().scheduled == 3 && chain.information().requested == 1 && chain.information().verifying == 1 && chain.information().stored == 1); // insert new best block to the chain @@ -496,85 +478,149 @@ mod tests { assert_eq!(chain.block_locator_hashes(), vec![block2_hash.clone(), block1_hash.clone(), genesis_hash.clone()]); chain.schedule_blocks_hashes(vec![ - "0000000000000000000000000000000000000000000000000000000000000000".into(), - "0000000000000000000000000000000000000000000000000000000000000001".into(), - "0000000000000000000000000000000000000000000000000000000000000002".into(), - "0000000000000000000000000000000000000000000000000000000000000003".into(), - "0000000000000000000000000000000000000000000000000000000000000004".into(), - "0000000000000000000000000000000000000000000000000000000000000005".into(), - "0000000000000000000000000000000000000000000000000000000000000006".into(), - "0000000000000000000000000000000000000000000000000000000000000007".into(), - "0000000000000000000000000000000000000000000000000000000000000008".into(), - "0000000000000000000000000000000000000000000000000000000000000009".into(), - "0000000000000000000000000000000000000000000000000000000000000010".into(), + H256::from(0), + H256::from(1), + H256::from(2), + H256::from(3), + H256::from(4), + H256::from(5), + H256::from(6), + H256::from(7), + H256::from(8), + H256::from(9), + H256::from(10), ]); chain.request_blocks_hashes(10); chain.verify_blocks_hashes(10); - assert_eq!(chain.best_block_locator_hashes()[0], "0000000000000000000000000000000000000000000000000000000000000010".into()); assert_eq!(chain.block_locator_hashes(), vec![ - "0000000000000000000000000000000000000000000000000000000000000010".into(), - "0000000000000000000000000000000000000000000000000000000000000009".into(), - "0000000000000000000000000000000000000000000000000000000000000008".into(), - "0000000000000000000000000000000000000000000000000000000000000007".into(), - "0000000000000000000000000000000000000000000000000000000000000006".into(), - "0000000000000000000000000000000000000000000000000000000000000005".into(), - "0000000000000000000000000000000000000000000000000000000000000004".into(), - "0000000000000000000000000000000000000000000000000000000000000003".into(), - "0000000000000000000000000000000000000000000000000000000000000002".into(), - "0000000000000000000000000000000000000000000000000000000000000001".into(), + H256::from(10), + H256::from(9), + H256::from(8), + H256::from(7), + H256::from(6), + H256::from(5), + H256::from(4), + H256::from(3), + H256::from(2), + H256::from(1), block2_hash.clone(), genesis_hash.clone(), ]); chain.schedule_blocks_hashes(vec![ - "0000000000000000000000000000000000000000000000000000000000000011".into(), - "0000000000000000000000000000000000000000000000000000000000000012".into(), - "0000000000000000000000000000000000000000000000000000000000000013".into(), - "0000000000000000000000000000000000000000000000000000000000000014".into(), - "0000000000000000000000000000000000000000000000000000000000000015".into(), - "0000000000000000000000000000000000000000000000000000000000000016".into(), + H256::from(11), + H256::from(12), + H256::from(13), + H256::from(14), + H256::from(15), + H256::from(16), ]); chain.request_blocks_hashes(10); - assert_eq!(chain.best_block_locator_hashes()[0], "0000000000000000000000000000000000000000000000000000000000000014".into()); assert_eq!(chain.block_locator_hashes(), vec![ - "0000000000000000000000000000000000000000000000000000000000000016".into(), - "0000000000000000000000000000000000000000000000000000000000000015".into(), - "0000000000000000000000000000000000000000000000000000000000000014".into(), - "0000000000000000000000000000000000000000000000000000000000000013".into(), - "0000000000000000000000000000000000000000000000000000000000000012".into(), - "0000000000000000000000000000000000000000000000000000000000000011".into(), - "0000000000000000000000000000000000000000000000000000000000000010".into(), - "0000000000000000000000000000000000000000000000000000000000000009".into(), - "0000000000000000000000000000000000000000000000000000000000000008".into(), - "0000000000000000000000000000000000000000000000000000000000000007".into(), - "0000000000000000000000000000000000000000000000000000000000000005".into(), - "0000000000000000000000000000000000000000000000000000000000000001".into(), + H256::from(16), + H256::from(15), + H256::from(14), + H256::from(13), + H256::from(12), + H256::from(11), + H256::from(10), + H256::from(9), + H256::from(8), + H256::from(7), + H256::from(5), + H256::from(1), genesis_hash.clone(), ]); chain.schedule_blocks_hashes(vec![ - "0000000000000000000000000000000000000000000000000000000000000020".into(), - "0000000000000000000000000000000000000000000000000000000000000021".into(), - "0000000000000000000000000000000000000000000000000000000000000022".into(), + H256::from(20), + H256::from(21), + H256::from(22), ]); - assert_eq!(chain.best_block_locator_hashes()[0], "0000000000000000000000000000000000000000000000000000000000000020".into()); assert_eq!(chain.block_locator_hashes(), vec![ - "0000000000000000000000000000000000000000000000000000000000000022".into(), - "0000000000000000000000000000000000000000000000000000000000000021".into(), - "0000000000000000000000000000000000000000000000000000000000000020".into(), - "0000000000000000000000000000000000000000000000000000000000000016".into(), - "0000000000000000000000000000000000000000000000000000000000000015".into(), - "0000000000000000000000000000000000000000000000000000000000000014".into(), - "0000000000000000000000000000000000000000000000000000000000000013".into(), - "0000000000000000000000000000000000000000000000000000000000000012".into(), - "0000000000000000000000000000000000000000000000000000000000000011".into(), - "0000000000000000000000000000000000000000000000000000000000000010".into(), - "0000000000000000000000000000000000000000000000000000000000000008".into(), - "0000000000000000000000000000000000000000000000000000000000000004".into(), + H256::from(22), + H256::from(21), + H256::from(20), + H256::from(16), + H256::from(15), + H256::from(14), + H256::from(13), + H256::from(12), + H256::from(11), + H256::from(10), + H256::from(8), + H256::from(4), genesis_hash.clone(), ]); } + + #[test] + fn chain_intersect_with_inventory() { + let mut chain = Chain::new(Arc::new(db::TestStorage::with_genesis_block())); + // append 2 db blocks + chain.insert_best_block(test_data::block_h1()).expect("Error inserting new block"); + chain.insert_best_block(test_data::block_h2()).expect("Error inserting new block"); + // append 3 verifying blocks + chain.schedule_blocks_hashes(vec![ + H256::from(0), + H256::from(1), + H256::from(2), + ]); + chain.request_blocks_hashes(3); + chain.verify_blocks_hashes(3); + // append 3 requested blocks + chain.schedule_blocks_hashes(vec![ + H256::from(10), + H256::from(11), + H256::from(12), + ]); + chain.request_blocks_hashes(10); + // append 3 scheduled blocks + chain.schedule_blocks_hashes(vec![ + H256::from(20), + H256::from(21), + H256::from(22), + ]); + + assert_eq!(chain.intersect_with_inventory(&vec![ + H256::from(30), + H256::from(31), + ]), InventoryIntersection::NoKnownBlocks(0)); + + assert_eq!(chain.intersect_with_inventory(&vec![ + H256::from(2), + H256::from(10), + H256::from(11), + H256::from(12), + H256::from(20), + ]), InventoryIntersection::InMemoryNoNewBlocks); + + assert_eq!(chain.intersect_with_inventory(&vec![ + H256::from(21), + H256::from(22), + H256::from(30), + H256::from(31), + ]), InventoryIntersection::InMemoryMainNewBlocks(2)); + + assert_eq!(chain.intersect_with_inventory(&vec![ + H256::from(20), + H256::from(21), + H256::from(30), + H256::from(31), + ]), InventoryIntersection::InMemoryForkNewBlocks(2)); + + assert_eq!(chain.intersect_with_inventory(&vec![ + test_data::block_h1().hash(), + test_data::block_h2().hash(), + ]), InventoryIntersection::DbAllBlocksKnown); + + assert_eq!(chain.intersect_with_inventory(&vec![ + test_data::block_h2().hash(), + H256::from(30), + H256::from(31), + ]), InventoryIntersection::DbForkNewBlocks(1)); + } } diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs index a5ab582d..4e613fad 100644 --- a/sync/src/synchronization_client.rs +++ b/sync/src/synchronization_client.rs @@ -1,7 +1,7 @@ use std::thread; use std::sync::Arc; use std::cmp::{min, max}; -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::collections::hash_map::Entry; use std::sync::mpsc::{channel, Sender, Receiver}; use parking_lot::Mutex; @@ -12,10 +12,9 @@ use futures_cpupool::CpuPool; use db; use chain::{Block, RepresentH256}; use primitives::hash::H256; -use hash_queue::HashPosition; use synchronization_peers::Peers; #[cfg(test)] use synchronization_peers::{Information as PeersInformation}; -use synchronization_chain::{ChainRef, BlockState}; +use synchronization_chain::{ChainRef, BlockState, InventoryIntersection}; #[cfg(test)] use synchronization_chain::{Information as ChainInformation}; use verification::{ChainVerifier, Error as VerificationError, Verify}; @@ -39,20 +38,21 @@ use std::time::Duration; ///! 2.4) if !inventory_rest.is_empty(): ===> has new unknown blocks in inventory ///! 2.4.1) queue_rest = queue after intersection ///! 2.4.2) if queue_rest.is_empty(): ===> has new unknown blocks in inventory, no fork +///! 2.4.2.1) scheduled_blocks.append(inventory_rest) +///! 2.4.2.2) stop (2.4.2) +///! 2.4.3) if !queue_rest.is_empty(): ===> has new unknown blocks in inventory, fork ///! 2.4.3.1) scheduled_blocks.append(inventory_rest) ///! 2.4.3.2) stop (2.4.3) -///! 2.4.4) if !queue_rest.is_empty(): ===> has new unknown blocks in inventory, fork -///! 2.4.4.1) scheduled_blocks.append(inventory_rest) -///! 2.4.4.2) stop (2.4.4) -///! 2.4.5) stop (2.4) +///! 2.4.3) stop (2.4) ///! 2.5) stop (2) ///! 3) if queue_intersection.is_empty(): ===> responded with out-of-sync-window blocks ///! 3.1) last_known_block = inventory.last(b => b.is_known()) ///! 3.2) if last_known_block == None: ===> we know nothing about these blocks & we haven't asked for these -///! 3.2.1) peer will be excluded later by management thread +///! 3.2.1) if !synchronizing => remember peer as useful + ask for blocks +///! 3.2.1) if synchronizing => peer will be excluded later by management thread ///! 3.2.2) stop (3.2) ///! 3.3) if last_known_block == last(inventory): ===> responded with all-known-blocks -///! 3.3.1) remember peer as useful (possibly had failures before && have been excluded from sync) +///! 3.3.1) if syncing, remember peer as useful (possibly had failures before && have been excluded from sync) ///! 3.3.2) stop (3.3) ///! 3.4) if last_known_block in the middle of inventory: ===> responded with forked blocks ///! 3.4.1) remember peer as useful @@ -62,24 +62,40 @@ use std::time::Duration; ///! 3.5) stop (3) ///! ///! on_peer_block: After receiving `block` message: -///! 1) if block_state(block) in (Scheduled, Verifying, Stored): ===> late delivery +///! 1) if block_state(block) in (Verifying, Stored): ===> late delivery ///! 1.1) remember peer as useful ///! 1.2) stop (1) -///! 2) if block_state(block) == Requested: ===> on-time delivery +///! 2) if block_state(block) in (Scheduled, Requested): ===> future/on-time delivery ///! 2.1) remember peer as useful -///! 2.2) move block from requested to verifying queue -///! 2.2) queue verification().and_then(insert).or_else(reset_sync) -///! 2.3) stop (2) +///! 2.2) if block_state(block.parent) in (Verifying, Stored): ===> we can proceed with verification +///! 2.2.1) remove block from current queue (Verifying || Stored) +///! 2.2.2) append block to the verification queue +///! 2.2.3) queue verification().and_then(on_block_verification_success).or_else(on_block_verification_error) +///! 2.2.4) try to verify orphan blocks +///! 2.2.5) stop (2.2) +///! 2.3) if block_state(block.parent) in (Requested, Scheduled): ===> we have found an orphan block +///! 2.3.1) remove block from current queue (Verifying || Stored) +///! 2.3.2) append block to the orphans +///! 2.3.3) stop (2.3) +///! 2.4) if block_state(block.parent) == Unknown: ===> bad block found +///! 2.4.1) remove block from current queue (Verifying || Stored) +///! 2.4.2) stop (2.4) +///! 2.5) stop (2) ///! 3) if block_state(block) == Unknown: ===> maybe we are on-top of chain && new block is announced? ///! 3.1) if block_state(block.parent_hash) == Unknown: ===> we do not know parent ///! 3.1.1) ignore this block ///! 3.1.2) stop (3.1) -///! 3.2) if block_state(block.parent_hash) != Unknown: ===> fork found +///! 3.2) if block_state(block.parent_hash) in (Verifying, Stored): ===> fork found, can verify ///! 3.2.1) ask peer for best inventory (after this block) ///! 3.2.2) append block to verifying queue -///! 3.2.3) queue verification().and_then(insert).or_else(reset_sync) +///! 3.2.3) queue verification().and_then(on_block_verification_success).or_else(on_block_verification_error) ///! 3.2.4) stop (3.2) -///! 2.3) stop (2) +///! 3.3) if block_state(block.parent_hash) in (Requested, Scheduled): ===> fork found, add as orphan +///! 3.3.1) ask peer for best inventory (after this block) +///! 3.3.2) append block to orphan +///! 3.3.3) stop (3.3) +///! 3.4) stop (2) +///! + if no blocks left in scheduled + requested queue => we are saturated => ask all peers for inventory & forget ///! ///! execute_synchronization_tasks: After receiving `inventory` message OR receiving `block` message OR when management thread schedules tasks: ///! 1) if there are blocks in `scheduled` queue AND we can fit more blocks into memory: ===> ask for blocks @@ -113,7 +129,8 @@ use std::time::Duration; ///! ///! on_block_verification_error: When verification completes with an error: ///! 1) remove block from verification queue -///! 2) remove all known children from all queues [so that new `block` messages will be ignored in on_peer_block.3.1.1] +///! 2) remove all known children from all queues [so that new `block` messages will be ignored in on_peer_block.3.1.1] (TODO: not implemented currently!!!) +///! ///! /// Approximate maximal number of blocks hashes in scheduled queue. @@ -162,7 +179,6 @@ pub trait Client : Send + 'static { fn on_new_blocks_inventory(&mut self, peer_index: usize, peer_hashes: Vec); fn on_peer_block(&mut self, peer_index: usize, block: Block); fn on_peer_disconnected(&mut self, peer_index: usize); - fn reset(&mut self, is_hard: bool); fn on_block_verification_success(&mut self, block: Block); fn on_block_verification_error(&mut self, err: &VerificationError, hash: &H256); } @@ -190,7 +206,7 @@ pub struct SynchronizationClient { /// Chain reference. chain: ChainRef, /// Blocks from requested_hashes, but received out-of-order. - orphaned_blocks: HashMap, + orphaned_blocks: HashMap>, /// Verification work transmission channel. verification_work_sender: Option>, /// Verification thread. @@ -218,10 +234,11 @@ impl State { impl Drop for SynchronizationClient where T: TaskExecutor { fn drop(&mut self) { if let Some(join_handle) = self.verification_worker_thread.take() { - self.verification_work_sender + // ignore send error here <= destructing anyway + let _ = self.verification_work_sender .take() .expect("Some(join_handle) => Some(verification_work_sender)") - .send(VerificationTask::Stop).expect("TODO"); + .send(VerificationTask::Stop); join_handle.join().expect("Clean shutdown."); } } @@ -246,45 +263,25 @@ impl Client for SynchronizationClient where T: TaskExecutor { // update peers to select next tasks self.peers.on_block_received(peer_index, &block_hash); - self.process_peer_block(block_hash, block); + self.process_peer_block(peer_index, block_hash, block); self.execute_synchronization_tasks(); } /// Peer disconnected. fn on_peer_disconnected(&mut self, peer_index: usize) { - self.peers.on_peer_disconnected(peer_index); - // when last peer is disconnected, reset, but let verifying blocks be verified - self.reset(false); - } - - /// Reset synchronization process - fn reset(&mut self, is_hard: bool) { - self.peers.reset(); - self.orphaned_blocks.clear(); - // TODO: reset verification queue - - let mut chain = self.chain.write(); - chain.remove_blocks_with_state(BlockState::Requested); - chain.remove_blocks_with_state(BlockState::Scheduled); - if is_hard { - self.state = State::Synchronizing(time::precise_time_s(), chain.best_block().number); - chain.remove_blocks_with_state(BlockState::Verifying); - warn!(target: "sync", "Synchronization process restarting from block {:?}", chain.best_block()); - } - else { - self.state = State::Saturated; + if self.peers.on_peer_disconnected(peer_index) { + self.switch_to_saturated_state(false); } } /// Process successful block verification fn on_block_verification_success(&mut self, block: Block) { { - let hash = block.hash(); let mut chain = self.chain.write(); - // remove from verifying queue - assert_eq!(chain.remove_block_with_state(&hash, BlockState::Verifying), HashPosition::Front); + // remove block from verification queue + chain.remove_block_with_state(&block.hash(), BlockState::Verifying); // insert to storage chain.insert_best_block(block) @@ -299,8 +296,12 @@ impl Client for SynchronizationClient where T: TaskExecutor { fn on_block_verification_error(&mut self, err: &VerificationError, hash: &H256) { warn!(target: "sync", "Block {:?} verification failed with error {:?}", hash, err); - // reset synchronization process - self.reset(true); + { + let mut chain = self.chain.write(); + + // remove block from verification queue + chain.remove_block_with_state(&hash, BlockState::Verifying); + } // start new tasks self.execute_synchronization_tasks(); @@ -351,8 +352,10 @@ impl SynchronizationClient where T: TaskExecutor { None => return Ok(()), }; let mut client = client.lock(); - manage_synchronization_peers(&mut client.peers); - client.execute_synchronization_tasks(); + if client.state.is_synchronizing() { + manage_synchronization_peers(&mut client.peers); + client.execute_synchronization_tasks(); + } Ok(()) }) .for_each(|_| Ok(())) @@ -376,144 +379,101 @@ impl SynchronizationClient where T: TaskExecutor { } /// Process new blocks inventory - fn process_new_blocks_inventory(&mut self, peer_index: usize, mut peer_hashes: Vec) { - // | requested | QUEUED | - // --- [1] - // --- [2] + - // --- [3] + - // --- [4] - // -+- [5] + - // -+- [6] + - // -+- [7] + - // ---+---------+--- [8] + - // ---+--------+--- [9] + - // ---+---------+--------+--- [10] - + fn process_new_blocks_inventory(&mut self, peer_index: usize, mut inventory: Vec) { let mut chain = self.chain.write(); - - 'outer: loop { - // when synchronization is idling - // => request full inventory - if !chain.has_blocks_of_state(BlockState::Scheduled) - && !chain.has_blocks_of_state(BlockState::Requested) { - let unknown_blocks: Vec<_> = peer_hashes.into_iter() - .filter(|hash| chain.block_has_state(&hash, BlockState::Unknown)) - .collect(); - - // no new blocks => no need to switch to the synchronizing state - if unknown_blocks.is_empty() { - return; - } - - chain.schedule_blocks_hashes(unknown_blocks); - self.peers.insert(peer_index); - break; - } - - // cases: [2], [5], [6], [8] - // if last block from peer_hashes is in window { requested_hashes + queued_hashes } - // => no new blocks for synchronization, but we will use this peer in synchronization - let peer_hashes_len = peer_hashes.len(); - if chain.block_has_state(&peer_hashes[peer_hashes_len - 1], BlockState::Scheduled) - || chain.block_has_state(&peer_hashes[peer_hashes_len - 1], BlockState::Requested) { - self.peers.insert(peer_index); - return; - } - - // cases: [1], [3], [4], [7], [9], [10] - // try to find new blocks for synchronization from inventory - let mut last_known_peer_hash_index = peer_hashes_len - 1; - loop { - if chain.block_state(&peer_hashes[last_known_peer_hash_index]) != BlockState::Unknown { - // we have found first block which is known to us - // => blocks in range [(last_known_peer_hash_index + 1)..peer_hashes_len] are unknown - // && must be scheduled for request - let unknown_peer_hashes = peer_hashes.split_off(last_known_peer_hash_index + 1); - - chain.schedule_blocks_hashes(unknown_peer_hashes); + match chain.intersect_with_inventory(&inventory) { + InventoryIntersection::NoKnownBlocks(_) if self.state.is_synchronizing() => (), + InventoryIntersection::DbAllBlocksKnown => { + if self.state.is_synchronizing() { + // remember peer as useful self.peers.insert(peer_index); - break 'outer; } - - if last_known_peer_hash_index == 0 { - // either these are blocks from the future or blocks from the past - // => TODO: ignore this peer during synchronization - return; + }, + InventoryIntersection::InMemoryNoNewBlocks => { + // remember peer as useful + self.peers.insert(peer_index); + }, + InventoryIntersection::InMemoryMainNewBlocks(new_block_index) + | InventoryIntersection::InMemoryForkNewBlocks(new_block_index) + | InventoryIntersection::DbForkNewBlocks(new_block_index) + | InventoryIntersection::NoKnownBlocks(new_block_index) => { + // schedule new blocks + let new_blocks_hashes = inventory.split_off(new_block_index); + chain.schedule_blocks_hashes(new_blocks_hashes); + // remember peer as useful + self.peers.insert(peer_index); + // switch to synchronization state + if !self.state.is_synchronizing() { + self.state = State::Synchronizing(time::precise_time_s(), chain.best_block().number); } - last_known_peer_hash_index -= 1; } } - - // move to synchronizing state - if !self.state.is_synchronizing() { - self.state = State::Synchronizing(time::precise_time_s(), chain.best_block().number); - } } /// Process new peer block - fn process_peer_block(&mut self, block_hash: H256, block: Block) { - // this block is not requested for synchronization - let mut chain = self.chain.write(); - let block_position = chain.remove_block_with_state(&block_hash, BlockState::Requested); - if block_position == HashPosition::Missing { - return; - } + fn process_peer_block(&mut self, peer_index: usize, block_hash: H256, block: Block) { + let switch_to_saturated = { + let mut chain = self.chain.write(); + match chain.block_state(&block_hash) { + BlockState::Verifying | BlockState::Stored => { + // remember peer as useful + self.peers.insert(peer_index); + }, + BlockState::Unknown | BlockState::Scheduled | BlockState::Requested => { + // remove block from current queue + chain.remove_block(&block_hash); + // check parent block state + match chain.block_state(&block.block_header.previous_header_hash) { + BlockState::Unknown => (), + BlockState::Verifying | BlockState::Stored => { + // remember peer as useful + self.peers.insert(peer_index); + // schedule verification + let mut blocks: VecDeque<(H256, Block)> = VecDeque::new(); + blocks.push_back((block_hash, block)); + while let Some((block_hash, block)) = blocks.pop_front() { + // queue block for verification + chain.remove_block(&block_hash); - // requested block is received => move to saturated state if there are no more blocks - if !chain.has_blocks_of_state(BlockState::Scheduled) - && !chain.has_blocks_of_state(BlockState::Requested) - && !chain.has_blocks_of_state(BlockState::Verifying) { - self.state = State::Saturated; - } + match self.verification_work_sender { + Some(ref verification_work_sender) => { + chain.verify_block_hash(block_hash.clone()); + verification_work_sender + .send(VerificationTask::VerifyBlock(block)) + .expect("Verification thread have the same lifetime as `Synchronization`") + }, + None => chain.insert_best_block(block) + .expect("Error inserting to db."), + } - // check if this block is next block in the blockchain - if block_position == HashPosition::Front { - // check if this parent of this block is current best_block - let expecting_previous_header_hash = chain.best_block_of_state(BlockState::Verifying) - .unwrap_or_else(|| { - chain.best_block_of_state(BlockState::Stored) - .expect("storage with genesis block is required") - }).hash; - if block.block_header.previous_header_hash != expecting_previous_header_hash { - // TODO: penalize peer - warn!(target: "sync", "Out-of-order block {:?} was dropped. Expecting block with parent hash {:?}", block_hash, expecting_previous_header_hash); - return; - } - - // this is next block in the blockchain => queue for verification - // also unwrap all dependent orphan blocks - let mut current_block = block; - let mut current_block_hash = block_hash; - loop { - match self.verification_work_sender { - Some(ref verification_work_sender) => { - chain.verify_block_hash(current_block_hash.clone()); - verification_work_sender - .send(VerificationTask::VerifyBlock(current_block)) - .expect("Verification thread have the same lifetime as `Synchronization`"); - }, - None => { - chain.insert_best_block(current_block) - .expect("Error inserting to db."); + // process orphan blocks + if let Entry::Occupied(entry) = self.orphaned_blocks.entry(block_hash) { + let (_, orphaned) = entry.remove_entry(); + blocks.extend(orphaned); + } + } + }, + BlockState::Requested | BlockState::Scheduled => { + // remember peer as useful + self.peers.insert(peer_index); + // remember as orphan block + self.orphaned_blocks + .entry(block.block_header.previous_header_hash.clone()) + .or_insert(Vec::new()) + .push((block_hash, block)) + } } - }; - - // proceed to the next orphaned block - if let Entry::Occupied(orphaned_block_entry) = self.orphaned_blocks.entry(current_block_hash) { - let (_, orphaned_block) = orphaned_block_entry.remove_entry(); - current_block = orphaned_block; - current_block_hash = current_block.hash(); - } - else { - break; - } + }, } - return; - } + // requested block is received => move to saturated state if there are no more blocks + chain.length_of_state(BlockState::Scheduled) == 0 + && chain.length_of_state(BlockState::Requested) == 0 + }; - // this block is not the next one => mark it as orphaned - self.orphaned_blocks.insert(block.block_header.previous_header_hash.clone(), block); + if switch_to_saturated { + self.switch_to_saturated_state(true); + } } /// Schedule new synchronization tasks, if any. @@ -540,18 +500,11 @@ impl SynchronizationClient where T: TaskExecutor { } } - // TODO: instead of issuing duplicated inventory requests, wait until enough new blocks are verified, then issue // check if we can query some blocks hashes let scheduled_hashes_len = chain.length_of_state(BlockState::Scheduled); if scheduled_hashes_len < MAX_SCHEDULED_HASHES { - if self.state.is_synchronizing() { - tasks.push(Task::RequestBestInventory(idle_peers[0])); - self.peers.on_inventory_requested(idle_peers[0]); - } - else { - tasks.push(Task::RequestInventory(idle_peers[0])); - self.peers.on_inventory_requested(idle_peers[0]); - } + tasks.push(Task::RequestInventory(idle_peers[0])); + self.peers.on_inventory_requested(idle_peers[0]); } // check if we can move some blocks from scheduled to requested queue @@ -578,6 +531,27 @@ impl SynchronizationClient where T: TaskExecutor { } } + /// Switch to saturated state + fn switch_to_saturated_state(&mut self, ask_for_inventory: bool) { + self.state = State::Saturated; + self.orphaned_blocks.clear(); + self.peers.reset(); + + { + let mut chain = self.chain.write(); + chain.remove_blocks_with_state(BlockState::Requested); + chain.remove_blocks_with_state(BlockState::Scheduled); + } + + if ask_for_inventory { + let mut executor = self.executor.lock(); + for idle_peer in self.peers.idle_peers() { + self.peers.on_inventory_requested(idle_peer); + executor.execute(Task::RequestInventory(idle_peer)); + } + } + } + /// Thread procedure for handling verification tasks fn verification_worker_proc(sync: Arc>, storage: Arc, work_receiver: Receiver) { let verifier = ChainVerifier::new(storage); @@ -612,6 +586,7 @@ pub mod tests { use p2p::event_loop; use test_data; use db; + use primitives::hash::H256; fn create_sync() -> (Core, Handle, Arc>, Arc>>) { let event_loop = event_loop(); @@ -638,13 +613,13 @@ pub mod tests { let (_, _, executor, sync) = create_sync(); let mut sync = sync.lock(); - let block1: Block = "010000006fe28c0ab6f1b372c1a6a246ae63f74f931e8365e15a089c68d6190000000000982051fd1e4ba744bbbe680e1fee14677ba1a3c3540bf7b1cdb606e857233e0e61bc6649ffff001d01e362990101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d0104ffffffff0100f2052a0100000043410496b538e853519c726a2c91e61ec11600ae1390813a627c66fb8be7947be63c52da7589379515d4e0a604f8141781e62294721166bf621e73a82cbf2342c858eeac00000000".into(); - let block2: Block = "010000004860eb18bf1b1620e37e9490fc8a427514416fd75159ab86688e9a8300000000d5fdcc541e25de1c7a5addedf24858b8bb665c9f36ef744ee42c316022c90f9bb0bc6649ffff001d08d2bd610101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d010bffffffff0100f2052a010000004341047211a824f55b505228e4c3d5194c1fcfaa15a456abdf37f9b9d97a4040afc073dee6c89064984f03385237d92167c13e236446b417ab79a0fcae412ae3316b77ac00000000".into(); + let block1: Block = test_data::block_h1(); + let block2: Block = test_data::block_h2(); sync.on_new_blocks_inventory(5, vec![block1.hash()]); let tasks = executor.lock().take_tasks(); assert_eq!(tasks.len(), 2); - assert_eq!(tasks[0], Task::RequestBestInventory(5)); + assert_eq!(tasks[0], Task::RequestInventory(5)); assert_eq!(tasks[1], Task::RequestBlocks(5, vec![block1.hash()])); assert!(sync.information().state.is_synchronizing()); assert_eq!(sync.information().orphaned, 0); @@ -654,23 +629,23 @@ pub mod tests { assert_eq!(sync.information().peers.idle, 0); assert_eq!(sync.information().peers.active, 1); - // push unknown block => nothing should change + // push unknown block => will be queued as orphan sync.on_peer_block(5, block2); assert!(sync.information().state.is_synchronizing()); - assert_eq!(sync.information().orphaned, 0); + assert_eq!(sync.information().orphaned, 1); assert_eq!(sync.information().chain.scheduled, 0); assert_eq!(sync.information().chain.requested, 1); assert_eq!(sync.information().chain.stored, 1); assert_eq!(sync.information().peers.idle, 0); assert_eq!(sync.information().peers.active, 1); - // push requested block => should be moved to the test storage + // push requested block => should be moved to the test storage && orphan should be moved sync.on_peer_block(5, block1); assert!(!sync.information().state.is_synchronizing()); assert_eq!(sync.information().orphaned, 0); assert_eq!(sync.information().chain.scheduled, 0); assert_eq!(sync.information().chain.requested, 0); - assert_eq!(sync.information().chain.stored, 2); + assert_eq!(sync.information().chain.stored, 3); // we have just requested new `inventory` from the peer => peer is forgotten assert_eq!(sync.information().peers.idle, 0); assert_eq!(sync.information().peers.active, 0); @@ -681,7 +656,7 @@ pub mod tests { let (_, _, _, sync) = create_sync(); let mut sync = sync.lock(); - let block2: Block = "010000004860eb18bf1b1620e37e9490fc8a427514416fd75159ab86688e9a8300000000d5fdcc541e25de1c7a5addedf24858b8bb665c9f36ef744ee42c316022c90f9bb0bc6649ffff001d08d2bd610101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d010bffffffff0100f2052a010000004341047211a824f55b505228e4c3d5194c1fcfaa15a456abdf37f9b9d97a4040afc073dee6c89064984f03385237d92167c13e236446b417ab79a0fcae412ae3316b77ac00000000".into(); + let block2: Block = test_data::block_h169(); sync.on_new_blocks_inventory(5, vec![block2.hash()]); sync.on_peer_block(5, block2); @@ -702,8 +677,8 @@ pub mod tests { fn synchronization_parallel_peers() { let (_, _, executor, sync) = create_sync(); - let block1: Block = "010000006fe28c0ab6f1b372c1a6a246ae63f74f931e8365e15a089c68d6190000000000982051fd1e4ba744bbbe680e1fee14677ba1a3c3540bf7b1cdb606e857233e0e61bc6649ffff001d01e362990101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d0104ffffffff0100f2052a0100000043410496b538e853519c726a2c91e61ec11600ae1390813a627c66fb8be7947be63c52da7589379515d4e0a604f8141781e62294721166bf621e73a82cbf2342c858eeac00000000".into(); - let block2: Block = "010000004860eb18bf1b1620e37e9490fc8a427514416fd75159ab86688e9a8300000000d5fdcc541e25de1c7a5addedf24858b8bb665c9f36ef744ee42c316022c90f9bb0bc6649ffff001d08d2bd610101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d010bffffffff0100f2052a010000004341047211a824f55b505228e4c3d5194c1fcfaa15a456abdf37f9b9d97a4040afc073dee6c89064984f03385237d92167c13e236446b417ab79a0fcae412ae3316b77ac00000000".into(); + let block1: Block = test_data::block_h1(); + let block2: Block = test_data::block_h2(); { let mut sync = sync.lock(); @@ -715,7 +690,7 @@ pub mod tests { // synchronization has started && new blocks have been requested let tasks = executor.lock().take_tasks(); assert!(sync.information().state.is_synchronizing()); - assert_eq!(tasks, vec![Task::RequestBestInventory(1), Task::RequestBlocks(1, vec![block1.hash()])]); + assert_eq!(tasks, vec![Task::RequestInventory(1), Task::RequestBlocks(1, vec![block1.hash()])]); } { @@ -726,7 +701,7 @@ pub mod tests { // synchronization has started && new blocks have been requested let tasks = executor.lock().take_tasks(); assert!(sync.information().state.is_synchronizing()); - assert_eq!(tasks, vec![Task::RequestBestInventory(2), Task::RequestBlocks(2, vec![block2.hash()])]); + assert_eq!(tasks, vec![Task::RequestInventory(2), Task::RequestBlocks(2, vec![block2.hash()])]); } { @@ -750,7 +725,7 @@ pub mod tests { // request new blocks { let mut sync = sync.lock(); - sync.on_new_blocks_inventory(1, vec!["0000000000000000000000000000000000000000000000000000000000000000".into()]); + sync.on_new_blocks_inventory(1, vec![H256::from(0)]); assert!(sync.information().state.is_synchronizing()); } @@ -774,4 +749,21 @@ pub mod tests { let tasks = executor.lock().take_tasks(); assert_eq!(tasks, vec![]); } + + #[test] + fn synchronization_asks_for_inventory_after_saturating() { + let (_, _, executor, sync) = create_sync(); + let mut sync = sync.lock(); + let block = test_data::block_h1(); + let block_hash = block.hash(); + sync.on_new_blocks_inventory(1, vec![block_hash.clone()]); + sync.on_new_blocks_inventory(2, vec![block_hash.clone()]); + executor.lock().take_tasks(); + sync.on_peer_block(2, block); + + let tasks = executor.lock().take_tasks(); + assert_eq!(tasks.len(), 2); + assert!(tasks.iter().any(|t| t == &Task::RequestInventory(1))); + assert!(tasks.iter().any(|t| t == &Task::RequestInventory(2))); + } } diff --git a/sync/src/synchronization_executor.rs b/sync/src/synchronization_executor.rs index 5b35cca6..e80d3054 100644 --- a/sync/src/synchronization_executor.rs +++ b/sync/src/synchronization_executor.rs @@ -23,8 +23,6 @@ pub enum Task { RequestBlocks(usize, Vec), /// Request full inventory using block_locator_hashes. RequestInventory(usize), - /// Request inventory using best block locator only. - RequestBestInventory(usize), /// Send block. SendBlock(usize, Block), /// Send notfound @@ -96,20 +94,6 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor { connection.send_getblocks(&getblocks); } }, - Task::RequestBestInventory(peer_index) => { - let block_locator_hashes = self.chain.read().best_block_locator_hashes(); - let getblocks = types::GetBlocks { - version: 0, - block_locator_hashes: block_locator_hashes, - hash_stop: H256::default(), - }; - - if let Some(connection) = self.peers.get_mut(&peer_index) { - let connection = &mut *connection; - trace!(target: "sync", "Querying best inventory from peer#{}", peer_index); - connection.send_getblocks(&getblocks); - } - }, Task::SendBlock(peer_index, block) => { let block_message = types::Block { block: block, diff --git a/sync/src/synchronization_peers.rs b/sync/src/synchronization_peers.rs index cb296916..e5ea086f 100644 --- a/sync/src/synchronization_peers.rs +++ b/sync/src/synchronization_peers.rs @@ -73,11 +73,12 @@ impl Peers { } /// Peer has been disconnected - pub fn on_peer_disconnected(&mut self, peer_index: usize) { + pub fn on_peer_disconnected(&mut self, peer_index: usize) -> bool { self.idle.remove(&peer_index); self.requests.remove(&peer_index); self.failures.remove(&peer_index); self.times.remove(&peer_index); + (self.idle.len() + self.requests.len()) == 0 } /// Block is received from peer. diff --git a/sync/src/synchronization_server.rs b/sync/src/synchronization_server.rs index 5fccf341..10deee56 100644 --- a/sync/src/synchronization_server.rs +++ b/sync/src/synchronization_server.rs @@ -62,9 +62,9 @@ impl SynchronizationServer { fn locate_known_block(&self, block_locator_hashes: Vec) -> Option { let chain = self.chain.read(); + let storage = chain.storage(); block_locator_hashes.into_iter() - .filter_map(|hash| chain - .storage_block_number(&hash) + .filter_map(|hash| storage.block_number(&hash) .map(|number| db::BestBlock { number: number, hash: hash, @@ -93,15 +93,19 @@ impl SynchronizationServer { (peer_index, ServerTask::ServeGetData(inventory)) => { let mut unknown_items: Vec = Vec::new(); let mut new_tasks: Vec = Vec::new(); - for item in inventory { - match item.inv_type { - InventoryType::MessageBlock => { - match chain.read().storage_block_number(&item.hash) { - Some(_) => new_tasks.push(ServerTask::ReturnBlock(item.hash.clone())), - None => unknown_items.push(item), - } - }, - _ => (), // TODO: process other inventory types + { + let chain = chain.read(); + let storage = chain.storage(); + for item in inventory { + match item.inv_type { + InventoryType::MessageBlock => { + match storage.block_number(&item.hash) { + Some(_) => new_tasks.push(ServerTask::ReturnBlock(item.hash.clone())), + None => unknown_items.push(item), + } + }, + _ => (), // TODO: process other inventory types + } } } // respond with `notfound` message for unknown data @@ -129,10 +133,17 @@ impl SynchronizationServer { }, // `getheaders` => `headers` (peer_index, ServerTask::ServeGetHeaders(best_block, hash_stop)) => { + // What if we have no common blocks with peer at all? Maybe drop connection or penalize peer? + // https://github.com/ethcore/parity-bitcoin/pull/91#discussion_r86734568 let blocks_hashes = SynchronizationServer::blocks_hashes_after(&chain, &best_block, &hash_stop, 2000); if !blocks_hashes.is_empty() { trace!(target: "sync", "Going to respond with blocks headers with {} items to peer#{}", blocks_hashes.len(), peer_index); - let blocks_headers = blocks_hashes.into_iter().filter_map(|hash| chain.read().storage_block(&hash).map(|block| block.block_header)).collect(); + let chain = chain.read(); + let storage = chain.storage(); + // TODO: read block_header only + let blocks_headers = blocks_hashes.into_iter() + .filter_map(|hash| storage.block(db::BlockRef::Hash(hash)).map(|block| block.block_header)) + .collect(); executor.lock().execute(Task::SendHeaders(peer_index, blocks_headers)); } }, @@ -158,10 +169,9 @@ impl SynchronizationServer { }, // `block` (peer_index, ServerTask::ReturnBlock(block_hash)) => { - let storage_block = chain.read().storage_block(&block_hash); - if let Some(storage_block) = storage_block { - executor.lock().execute(Task::SendBlock(peer_index, storage_block)); - } + let block = chain.read().storage().block(db::BlockRef::Hash(block_hash)) + .expect("we have checked that block exists in ServeGetData; db is append-only; qed"); + executor.lock().execute(Task::SendBlock(peer_index, block)); }, }, // no tasks after wake-up => stopping @@ -172,7 +182,9 @@ impl SynchronizationServer { fn blocks_hashes_after(chain: &ChainRef, best_block: &db::BestBlock, hash_stop: &H256, max_hashes: u32) -> Vec { let mut hashes: Vec = Vec::new(); - let storage_block_hash = chain.read().storage_block_hash(best_block.number); + let chain = chain.read(); + let storage = chain.storage(); + let storage_block_hash = storage.block_hash(best_block.number); if let Some(hash) = storage_block_hash { // check that chain has not reorganized since task was queued if hash == best_block.hash { @@ -180,7 +192,7 @@ impl SynchronizationServer { let last_block_number = first_block_number + max_hashes; // `max_hashes` hashes after best_block.number OR hash_stop OR blockchain end for block_number in first_block_number..last_block_number { - match chain.read().storage_block_hash(block_number) { + match storage.block_hash(block_number) { Some(ref block_hash) if block_hash == hash_stop => break, None => break, Some(block_hash) => {