Merge branch 'master' into reorg

This commit is contained in:
NikVolf 2016-11-07 21:05:38 +03:00
commit 631842f0e4
12 changed files with 550 additions and 502 deletions

View File

@ -389,7 +389,7 @@ impl Storage {
self.read_meta_u32(KEY_BEST_BLOCK_NUMBER)
}
fn best_hash(&self) -> Option<H256> {
fn _best_hash(&self) -> Option<H256> {
self.get(COL_META, KEY_BEST_BLOCK_HASH).map(|val| H256::from(&**val))
}

View File

@ -2,6 +2,7 @@
//! https://www.anintegratedworld.com/unravelling-the-mysterious-block-chain-magic-number/
use ser::{Stream, Serializable};
use chain::Block;
use Error;
const MAGIC_MAINNET: u32 = 0xD9B4BEF9;
@ -53,6 +54,14 @@ impl Magic {
Magic::Regtest => 18443,
}
}
pub fn genesis_block(&self) -> Block {
match *self {
Magic::Mainnet => "0100000000000000000000000000000000000000000000000000000000000000000000003ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4a29ab5f49ffff001d1dac2b7c0101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000".into(),
Magic::Testnet => "0100000000000000000000000000000000000000000000000000000000000000000000003ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4adae5494dffff001d1aa4ae180101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000".into(),
Magic::Regtest => "0100000000000000000000000000000000000000000000000000000000000000000000003ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4adae5494dffff7f20020000000101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000".into(),
}
}
}
impl Serializable for Magic {

View File

@ -1,6 +1,6 @@
use std::io;
use chain::BlockHeader;
use ser::{Stream, Reader};
use ser::{Stream, Reader, Serializable, Deserializable, CompactInteger, Error as ReaderError};
use {Payload, MessageResult};
#[derive(Debug, PartialEq)]
@ -9,6 +9,30 @@ pub struct Headers {
pub headers: Vec<BlockHeader>,
}
#[derive(Debug, PartialEq)]
struct HeaderWithTxnCount {
header: BlockHeader,
}
impl From<HeaderWithTxnCount> for BlockHeader {
fn from(header: HeaderWithTxnCount) -> BlockHeader {
header.header
}
}
#[derive(Debug, PartialEq)]
struct HeaderWithTxnCountRef<'a> {
header: &'a BlockHeader,
}
impl<'a> From<&'a BlockHeader> for HeaderWithTxnCountRef<'a> {
fn from(header: &'a BlockHeader) -> Self {
HeaderWithTxnCountRef {
header: header,
}
}
}
impl Payload for Headers {
fn version() -> u32 {
0
@ -19,15 +43,40 @@ impl Payload for Headers {
}
fn deserialize_payload<T>(reader: &mut Reader<T>, _version: u32) -> MessageResult<Self> where T: io::Read {
let headers_with_txn_count: Vec<HeaderWithTxnCount> = try!(reader.read_list());
let headers = Headers {
headers: try!(reader.read_list()),
headers: headers_with_txn_count.into_iter().map(Into::into).collect(),
};
Ok(headers)
}
fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> {
stream.append_list(&self.headers);
let headers_with_txn_count: Vec<HeaderWithTxnCountRef> = self.headers.iter().map(Into::into).collect();
stream.append_list(&headers_with_txn_count);
Ok(())
}
}
impl<'a> Serializable for HeaderWithTxnCountRef<'a> {
fn serialize(&self, stream: &mut Stream) {
stream
.append(self.header)
.append(&CompactInteger::from(0u32));
}
}
impl Deserializable for HeaderWithTxnCount {
fn deserialize<T>(reader: &mut Reader<T>) -> Result<Self, ReaderError> where T: io::Read {
let header = HeaderWithTxnCount {
header: try!(reader.read()),
};
let txn_count: CompactInteger = try!(reader.read());
if txn_count != 0u32.into() {
return Err(ReaderError::MalformedData);
}
Ok(header)
}
}

View File

@ -4,9 +4,9 @@ use config::Config;
use util::{open_db, init_db};
pub fn import(cfg: Config, matches: &ArgMatches) -> Result<(), String> {
let db = open_db(cfg.use_disk_database);
let db = open_db(&cfg);
// TODO: this might be unnecessary here!
init_db(&db);
try!(init_db(&cfg, &db));
let mut writer = create_sync_blocks_writer(db);

View File

@ -7,6 +7,9 @@ use {config, p2p};
pub fn start(cfg: config::Config) -> Result<(), String> {
let mut el = p2p::event_loop();
let db = open_db(&cfg);
try!(init_db(&cfg, &db));
let p2p_cfg = p2p::Config {
threads: 4,
protocol_minimum: 70001,
@ -26,9 +29,6 @@ pub fn start(cfg: config::Config) -> Result<(), String> {
node_table_path: node_table_path(),
};
let db = open_db(cfg.use_disk_database);
init_db(&db);
let sync_handle = el.handle();
let sync_connection_factory = create_sync_connection_factory(&sync_handle, db);

View File

@ -1,11 +1,12 @@
use std::sync::Arc;
use std::path::PathBuf;
use app_dirs::{app_dir, AppDataType};
use chain::Block;
use chain::RepresentH256;
use {db, APP_INFO};
use config::Config;
pub fn open_db(use_disk_database: bool) -> Arc<db::Store> {
match use_disk_database {
pub fn open_db(cfg: &Config) -> Arc<db::Store> {
match cfg.use_disk_database {
true => {
let db_path = app_dir(AppDataType::UserData, &APP_INFO, "db").expect("Failed to get app dir");
Arc::new(db::Storage::new(db_path).expect("Failed to open database"))
@ -22,12 +23,15 @@ pub fn node_table_path() -> PathBuf {
node_table
}
pub fn init_db(db: &Arc<db::Store>) {
pub fn init_db(cfg: &Config, db: &Arc<db::Store>) -> Result<(), String> {
// insert genesis block if db is empty
if db.best_block().is_none() {
// TODO: move to config
let genesis_block: Block = "0100000000000000000000000000000000000000000000000000000000000000000000003ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4a29ab5f49ffff001d1dac2b7c0101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000".into();
db.insert_block(&genesis_block)
.expect("Failed to insert genesis block to the database");
let genesis_block = cfg.magic.genesis_block();
match db.block_hash(0) {
Some(ref db_genesis_block_hash) if db_genesis_block_hash != &genesis_block.hash() => Err("Trying to open database with incompatible genesis block".into()),
Some(_) => Ok(()),
None => {
db.insert_block(&genesis_block).expect("Failed to insert genesis block to the database");
Ok(())
}
}
}

View File

@ -39,11 +39,6 @@ impl HashQueue {
self.queue.len() as u32
}
/// Returns true if queue is empty.
pub fn is_empty(&self) -> bool {
self.queue.is_empty()
}
/// Returns front element from the given queue.
pub fn front(&self) -> Option<H256> {
self.queue.front().cloned()
@ -63,19 +58,6 @@ impl HashQueue {
Some(self.queue[queue_len - 2].clone())
}
/// Returns n-th element (n is starting from 0), starting from the back-element in the queue.
/// If there are no n-th element - returns (n-1) element & etc.
pub fn back_skip_n(&self, n: usize) -> Option<H256> {
let queue_len = self.queue.len();
if queue_len == 0 {
return None;
}
if n + 1 > queue_len {
return Some(self.queue[0].clone())
}
return Some(self.queue[queue_len - n - 1].clone())
}
/// Returns true if queue contains element.
pub fn contains(&self, hash: &H256) -> bool {
self.set.contains(hash)
@ -175,11 +157,6 @@ impl HashQueueChain {
self.chain[queue_index].len()
}
/// Returns true if given queue is empty.
pub fn is_empty_at(&self, queue_index: usize) -> bool {
self.chain[queue_index].is_empty()
}
/// Returns element at the front of the given queue.
pub fn front_at(&self, queue_index: usize) -> Option<H256> {
let ref queue = self.chain[queue_index];
@ -198,13 +175,6 @@ impl HashQueueChain {
queue.pre_back()
}
/// Returns n-th element (n is starting from 0), starting from the back-element in given queue.
/// If there are no n-th element - returns (n-1) element & etc.
pub fn back_skip_n_at(&self, chain_index: usize, n: usize) -> Option<H256> {
let ref queue = self.chain[chain_index];
queue.back_skip_n(n)
}
/// Returns the back of the whole chain.
pub fn back(&self) -> Option<H256> {
let mut queue_index = self.chain.len() - 1;
@ -223,6 +193,7 @@ impl HashQueueChain {
}
/// Checks if hash is contained in given queue.
#[cfg(test)]
pub fn is_contained_in(&self, queue_index: usize, hash: &H256) -> bool {
self.chain[queue_index].contains(hash)
}
@ -283,16 +254,15 @@ impl Index<u32> for HashQueueChain {
#[cfg(test)]
mod tests {
use super::{HashQueue, HashQueueChain, HashPosition};
use primitives::hash::H256;
#[test]
fn hash_queue_empty() {
let mut queue = HashQueue::new();
assert_eq!(queue.len(), 0);
assert_eq!(queue.is_empty(), true);
assert_eq!(queue.front(), None);
assert_eq!(queue.back(), None);
assert_eq!(queue.pre_back(), None);
assert_eq!(queue.back_skip_n(100), None);
assert_eq!(queue.contains(&"000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f".into()), false);
assert_eq!(queue.pop_front(), None);
assert_eq!(queue.pop_front_n(100), vec![]);
@ -304,11 +274,9 @@ mod tests {
let mut chain = HashQueueChain::with_number_of_queues(3);
assert_eq!(chain.len(), 0);
assert_eq!(chain.len_of(0), 0);
assert_eq!(chain.is_empty_at(0), true);
assert_eq!(chain.front_at(0), None);
assert_eq!(chain.back_at(0), None);
assert_eq!(chain.pre_back_at(0), None);
assert_eq!(chain.back_skip_n_at(0, 100), None);
assert_eq!(chain.back(), None);
assert_eq!(chain.is_contained_in(0, &"000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f".into()), false);
assert_eq!(chain.contains_in(&"000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f".into()), None);
@ -320,16 +288,16 @@ mod tests {
fn hash_queue_chain_not_empty() {
let mut chain = HashQueueChain::with_number_of_queues(4);
chain.push_back_n_at(0, vec![
"0000000000000000000000000000000000000000000000000000000000000000".into(),
"0000000000000000000000000000000000000000000000000000000000000001".into(),
"0000000000000000000000000000000000000000000000000000000000000002".into(),
H256::from(0),
H256::from(1),
H256::from(2),
]);
chain.push_back_n_at(1, vec![
"0000000000000000000000000000000000000000000000000000000000000003".into(),
"0000000000000000000000000000000000000000000000000000000000000004".into(),
H256::from(3),
H256::from(4),
]);
chain.push_back_n_at(2, vec![
"0000000000000000000000000000000000000000000000000000000000000005".into(),
H256::from(5),
]);
assert_eq!(chain.len(), 6);
@ -337,42 +305,25 @@ mod tests {
assert_eq!(chain.len_of(1), 2);
assert_eq!(chain.len_of(2), 1);
assert_eq!(chain.len_of(3), 0);
assert_eq!(chain.is_empty_at(0), false);
assert_eq!(chain.is_empty_at(1), false);
assert_eq!(chain.is_empty_at(2), false);
assert_eq!(chain.is_empty_at(3), true);
assert_eq!(chain.front_at(0), Some("0000000000000000000000000000000000000000000000000000000000000000".into()));
assert_eq!(chain.front_at(1), Some("0000000000000000000000000000000000000000000000000000000000000003".into()));
assert_eq!(chain.front_at(2), Some("0000000000000000000000000000000000000000000000000000000000000005".into()));
assert_eq!(chain.front_at(0), Some(H256::from(0)));
assert_eq!(chain.front_at(1), Some(H256::from(3)));
assert_eq!(chain.front_at(2), Some(H256::from(5)));
assert_eq!(chain.front_at(3), None);
assert_eq!(chain.back_at(0), Some("0000000000000000000000000000000000000000000000000000000000000002".into()));
assert_eq!(chain.back_at(1), Some("0000000000000000000000000000000000000000000000000000000000000004".into()));
assert_eq!(chain.back_at(2), Some("0000000000000000000000000000000000000000000000000000000000000005".into()));
assert_eq!(chain.back_at(0), Some(H256::from(2)));
assert_eq!(chain.back_at(1), Some(H256::from(4)));
assert_eq!(chain.back_at(2), Some(H256::from(5)));
assert_eq!(chain.back_at(3), None);
assert_eq!(chain.pre_back_at(0), Some("0000000000000000000000000000000000000000000000000000000000000001".into()));
assert_eq!(chain.pre_back_at(1), Some("0000000000000000000000000000000000000000000000000000000000000003".into()));
assert_eq!(chain.pre_back_at(0), Some(H256::from(1)));
assert_eq!(chain.pre_back_at(1), Some(H256::from(3)));
assert_eq!(chain.pre_back_at(2), None);
assert_eq!(chain.pre_back_at(3), None);
assert_eq!(chain.back(), Some("0000000000000000000000000000000000000000000000000000000000000005".into()));
assert_eq!(chain.is_contained_in(0, &"0000000000000000000000000000000000000000000000000000000000000002".into()), true);
assert_eq!(chain.is_contained_in(1, &"0000000000000000000000000000000000000000000000000000000000000002".into()), false);
assert_eq!(chain.is_contained_in(2, &"0000000000000000000000000000000000000000000000000000000000000002".into()), false);
assert_eq!(chain.is_contained_in(3, &"0000000000000000000000000000000000000000000000000000000000000002".into()), false);
assert_eq!(chain.contains_in(&"0000000000000000000000000000000000000000000000000000000000000002".into()), Some(0));
assert_eq!(chain.contains_in(&"0000000000000000000000000000000000000000000000000000000000000005".into()), Some(2));
assert_eq!(chain.contains_in(&"0000000000000000000000000000000000000000000000000000000000000009".into()), None);
assert_eq!(chain.back_skip_n_at(0, 0), Some("0000000000000000000000000000000000000000000000000000000000000002".into()));
assert_eq!(chain.back_skip_n_at(1, 0), Some("0000000000000000000000000000000000000000000000000000000000000004".into()));
assert_eq!(chain.back_skip_n_at(2, 0), Some("0000000000000000000000000000000000000000000000000000000000000005".into()));
assert_eq!(chain.back_skip_n_at(3, 0), None);
assert_eq!(chain.back_skip_n_at(0, 1), Some("0000000000000000000000000000000000000000000000000000000000000001".into()));
assert_eq!(chain.back_skip_n_at(1, 1), Some("0000000000000000000000000000000000000000000000000000000000000003".into()));
assert_eq!(chain.back_skip_n_at(2, 1), Some("0000000000000000000000000000000000000000000000000000000000000005".into()));
assert_eq!(chain.back_skip_n_at(3, 1), None);
assert_eq!(chain.back_skip_n_at(0, 2), Some("0000000000000000000000000000000000000000000000000000000000000000".into()));
assert_eq!(chain.back_skip_n_at(1, 2), Some("0000000000000000000000000000000000000000000000000000000000000003".into()));
assert_eq!(chain.back_skip_n_at(2, 2), Some("0000000000000000000000000000000000000000000000000000000000000005".into()));
assert_eq!(chain.back_skip_n_at(3, 2), None);
assert_eq!(chain.back(), Some(H256::from(5)));
assert_eq!(chain.is_contained_in(0, &H256::from(2)), true);
assert_eq!(chain.is_contained_in(1, &H256::from(2)), false);
assert_eq!(chain.is_contained_in(2, &H256::from(2)), false);
assert_eq!(chain.is_contained_in(3, &H256::from(2)), false);
assert_eq!(chain.contains_in(&H256::from(2)), Some(0));
assert_eq!(chain.contains_in(&H256::from(5)), Some(2));
assert_eq!(chain.contains_in(&H256::from(9)), None);
}
}

View File

@ -1,7 +1,7 @@
use std::fmt;
use std::sync::Arc;
use parking_lot::RwLock;
use chain::{Block, RepresentH256};
use chain::Block;
use db;
use primitives::hash::H256;
use hash_queue::{HashQueueChain, HashPosition};
@ -47,6 +47,23 @@ pub struct Information {
pub stored: u32,
}
/// Result of intersecting chain && inventory
#[derive(Debug, PartialEq)]
pub enum InventoryIntersection {
/// 3.2: No intersection with in-memory queue && no intersection with db
NoKnownBlocks(usize),
/// 2.3: Inventory has no new blocks && some of blocks in inventory are in in-memory queue
InMemoryNoNewBlocks,
/// 2.4.2: Inventory has new blocks && these blocks are right after chain' best block
InMemoryMainNewBlocks(usize),
/// 2.4.3: Inventory has new blocks && these blocks are forked from our chain' best block
InMemoryForkNewBlocks(usize),
/// 3.3: No intersection with in-memory queue && has intersection with db && all blocks are already stored in db
DbAllBlocksKnown,
/// 3.4: No intersection with in-memory queue && has intersection with db && some blocks are not yet stored in db
DbForkNewBlocks(usize),
}
/// Blockchain from synchroniation point of view, consisting of:
/// 1) all blocks from the `storage` [oldest blocks]
/// 2) all blocks currently verifying by `verification_queue`
@ -109,7 +126,7 @@ impl Chain {
scheduled: self.hash_chain.len_of(SCHEDULED_QUEUE),
requested: self.hash_chain.len_of(REQUESTED_QUEUE),
verifying: self.hash_chain.len_of(VERIFYING_QUEUE),
stored: self.storage.best_block().map_or(0, |block| block.number + 1),
stored: self.best_storage_block.number + 1,
}
}
@ -137,68 +154,14 @@ impl Chain {
}
}
/// Returns true if has blocks of given type
pub fn has_blocks_of_state(&self, state: BlockState) -> bool {
match state {
BlockState::Stored => true, // storage with genesis block is required
_ => !self.hash_chain.is_empty_at(state.to_queue_index()),
}
}
/// Get best block
pub fn best_block(&self) -> db::BestBlock {
let storage_best_block = self.storage.best_block().expect("storage with genesis block is required");
match self.hash_chain.back() {
Some(hash) => db::BestBlock {
number: storage_best_block.number + self.hash_chain.len(),
number: self.best_storage_block.number + self.hash_chain.len(),
hash: hash.clone(),
},
None => db::BestBlock {
number: storage_best_block.number,
hash: storage_best_block.hash,
}
}
}
/// Get best block of given state
pub fn best_block_of_state(&self, state: BlockState) -> Option<db::BestBlock> {
match state {
BlockState::Scheduled => self.hash_chain.back_at(SCHEDULED_QUEUE)
.map(|hash| db::BestBlock {
hash: hash,
number: self.storage.best_block().expect("storage with genesis block is required").number + 1
+ self.hash_chain.len_of(VERIFYING_QUEUE)
+ self.hash_chain.len_of(REQUESTED_QUEUE)
+ self.hash_chain.len_of(SCHEDULED_QUEUE)
}),
BlockState::Requested => self.hash_chain.back_at(REQUESTED_QUEUE)
.map(|hash| db::BestBlock {
hash: hash,
number: self.storage.best_block().expect("storage with genesis block is required").number + 1
+ self.hash_chain.len_of(VERIFYING_QUEUE)
+ self.hash_chain.len_of(REQUESTED_QUEUE)
}),
BlockState::Verifying => self.hash_chain.back_at(VERIFYING_QUEUE)
.map(|hash| db::BestBlock {
hash: hash,
number: self.storage.best_block().expect("storage with genesis block is required").number + 1
+ self.hash_chain.len_of(VERIFYING_QUEUE)
}),
BlockState::Stored => {
self.storage.best_block()
},
_ => panic!("not supported"),
}
}
/// Check if block has given state
pub fn block_has_state(&self, hash: &H256, state: BlockState) -> bool {
match state {
BlockState::Scheduled => self.hash_chain.is_contained_in(SCHEDULED_QUEUE, hash),
BlockState::Requested => self.hash_chain.is_contained_in(REQUESTED_QUEUE, hash),
BlockState::Verifying => self.hash_chain.is_contained_in(VERIFYING_QUEUE, hash),
BlockState::Stored => self.storage.contains_block(db::BlockRef::Hash(hash.clone())),
BlockState::Unknown => self.block_state(hash) == BlockState::Unknown,
None => self.best_storage_block.clone(),
}
}
@ -214,39 +177,12 @@ impl Chain {
}
}
/// Get block number from storage
pub fn storage_block_number(&self, hash: &H256) -> Option<u32> {
self.storage.block_number(hash)
}
/// Get block hash from storage
pub fn storage_block_hash(&self, number: u32) -> Option<H256> {
self.storage.block_hash(number)
}
/// Get block from the storage
pub fn storage_block(&self, hash: &H256) -> Option<Block> {
self.storage.block(db::BlockRef::Hash(hash.clone()))
}
/// Prepare best block locator hashes
pub fn best_block_locator_hashes(&self) -> Vec<H256> {
let mut result: Vec<H256> = Vec::with_capacity(4);
if let Some(pre_best_block) = self.hash_chain.back_skip_n_at(SCHEDULED_QUEUE, 2) {
result.push(pre_best_block);
}
if let Some(pre_best_block) = self.hash_chain.back_skip_n_at(REQUESTED_QUEUE, 2) {
result.push(pre_best_block);
}
if let Some(pre_best_block) = self.hash_chain.back_skip_n_at(VERIFYING_QUEUE, 2) {
result.push(pre_best_block);
}
result.push(self.best_storage_block.hash.clone());
result
}
/// Prepare block locator hashes, as described in protocol documentation:
/// https://en.bitcoin.it/wiki/Protocol_documentation#getblocks
/// When there are forked blocks in the queue, this method can result in
/// mixed block locator hashes ([0 - from fork1, 1 - from fork2, 2 - from fork1]).
/// Peer will respond with blocks of fork1 || fork2 => we could end up in some side fork
/// To resolve this, after switching to saturated state, we will also ask all peers for inventory.
pub fn block_locator_hashes(&self) -> Vec<H256> {
let mut block_locator_hashes: Vec<H256> = Vec::new();
@ -254,8 +190,7 @@ impl Chain {
let (local_index, step) = self.block_locator_hashes_for_queue(&mut block_locator_hashes);
// calculate for storage
let storage_best_block_number = self.storage.best_block().expect("storage with genesis block is required").number;
let storage_index = if storage_best_block_number < local_index { 0 } else { storage_best_block_number - local_index };
let storage_index = if self.best_storage_block.number < local_index { 0 } else { self.best_storage_block.number - local_index };
self.block_locator_hashes_for_storage(storage_index, step, &mut block_locator_hashes);
block_locator_hashes
}
@ -287,16 +222,21 @@ impl Chain {
/// Insert new best block to storage
pub fn insert_best_block(&mut self, block: Block) -> Result<(), db::Error> {
if block.block_header.previous_header_hash != self.best_storage_block.hash {
return Err(db::Error::DB("Trying to insert out-of-order block".into()));
}
// insert to storage
try!(self.storage.insert_block(&block));
// remember new best block hash
self.best_storage_block.number += 1;
self.best_storage_block.hash = block.hash();
self.best_storage_block = self.storage.best_block().expect("Inserted block above");
// insert to storage
self.storage.insert_block(&block)
Ok(())
}
/// Remove block
pub fn remove_block(&mut self, hash: &H256) {
if self.hash_chain.remove_at(SCHEDULED_QUEUE, hash) == HashPosition::Missing
&& self.hash_chain.remove_at(REQUESTED_QUEUE, hash) == HashPosition::Missing {
self.hash_chain.remove_at(VERIFYING_QUEUE, hash);
}
}
/// Remove block by hash if it is currently in given state
@ -309,6 +249,57 @@ impl Chain {
self.hash_chain.remove_all_at(state.to_queue_index());
}
/// Intersect chain with inventory
pub fn intersect_with_inventory(&self, inventory: &Vec<H256>) -> InventoryIntersection {
let inventory_len = inventory.len();
assert!(inventory_len != 0);
// giving that blocks in inventory are ordered
match self.block_state(&inventory[0]) {
// if first block of inventory is unknown => all other blocks are also unknown
BlockState::Unknown => {
InventoryIntersection::NoKnownBlocks(0)
},
// else if first block is known
first_block_state @ _ => match self.block_state(&inventory[inventory_len - 1]) {
// if last block is known to be in db => all inventory blocks are also in db
BlockState::Stored => {
InventoryIntersection::DbAllBlocksKnown
},
// if first block is known && last block is unknown => intersection with queue or with db
BlockState::Unknown => {
// find last known block
let mut previous_state = first_block_state;
for index in 1..inventory_len {
let state = self.block_state(&inventory[index]);
if state == BlockState::Unknown {
// previous block is stored => fork from stored block
if previous_state == BlockState::Stored {
return InventoryIntersection::DbForkNewBlocks(index);
}
// previous block is best block => no fork
else if &self.best_block().hash == &inventory[index - 1] {
return InventoryIntersection::InMemoryMainNewBlocks(index);
}
// previous block is not a best block => fork
else {
return InventoryIntersection::InMemoryForkNewBlocks(index);
}
}
previous_state = state;
}
// unreachable because last block is unknown && in above loop we search for unknown blocks
unreachable!();
},
// if first block is known && last block is also known && is in queue => queue intersection with no new block
_ => {
InventoryIntersection::InMemoryNoNewBlocks
}
}
}
}
/// Calculate block locator hashes for hash queue
fn block_locator_hashes_for_queue(&self, hashes: &mut Vec<H256>) -> (u32, u32) {
let queue_len = self.hash_chain.len();
@ -359,7 +350,7 @@ impl fmt::Debug for Chain {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
try!(writeln!(f, "chain: ["));
{
let mut num = self.storage.best_block().expect("Storage with genesis block is required").number;
let mut num = self.best_storage_block.number;
try!(writeln!(f, "\tworse(stored): {} {:?}", 0, self.storage.block_hash(0)));
try!(writeln!(f, "\tbest(stored): {} {:?}", num, self.storage.block_hash(num)));
@ -389,8 +380,9 @@ mod tests {
use std::sync::Arc;
use chain::RepresentH256;
use hash_queue::HashPosition;
use super::{Chain, BlockState};
use super::{Chain, BlockState, InventoryIntersection};
use db::{self, Store, BestBlock};
use primitives::hash::H256;
use test_data;
#[test]
@ -406,19 +398,9 @@ mod tests {
assert_eq!(chain.length_of_state(BlockState::Requested), 0);
assert_eq!(chain.length_of_state(BlockState::Verifying), 0);
assert_eq!(chain.length_of_state(BlockState::Stored), 1);
assert_eq!(chain.has_blocks_of_state(BlockState::Scheduled), false);
assert_eq!(chain.has_blocks_of_state(BlockState::Requested), false);
assert_eq!(chain.has_blocks_of_state(BlockState::Verifying), false);
assert_eq!(chain.has_blocks_of_state(BlockState::Stored), true);
assert_eq!(&chain.best_block(), &db_best_block);
assert_eq!(chain.best_block_of_state(BlockState::Scheduled), None);
assert_eq!(chain.best_block_of_state(BlockState::Requested), None);
assert_eq!(chain.best_block_of_state(BlockState::Verifying), None);
assert_eq!(chain.best_block_of_state(BlockState::Stored), Some(db_best_block.clone()));
assert_eq!(chain.block_has_state(&db_best_block.hash, BlockState::Requested), false);
assert_eq!(chain.block_has_state(&db_best_block.hash, BlockState::Stored), true);
assert_eq!(chain.block_state(&db_best_block.hash), BlockState::Stored);
assert_eq!(chain.block_state(&"0000000000000000000000000000000000000000000000000000000000000000".into()), BlockState::Unknown);
assert_eq!(chain.block_state(&H256::from(0)), BlockState::Unknown);
}
#[test]
@ -428,12 +410,12 @@ mod tests {
// add 6 blocks to scheduled queue
chain.schedule_blocks_hashes(vec![
"0000000000000000000000000000000000000000000000000000000000000000".into(),
"0000000000000000000000000000000000000000000000000000000000000001".into(),
"0000000000000000000000000000000000000000000000000000000000000002".into(),
"0000000000000000000000000000000000000000000000000000000000000003".into(),
"0000000000000000000000000000000000000000000000000000000000000004".into(),
"0000000000000000000000000000000000000000000000000000000000000005".into(),
H256::from(0),
H256::from(1),
H256::from(2),
H256::from(3),
H256::from(4),
H256::from(5),
]);
assert!(chain.information().scheduled == 6 && chain.information().requested == 0
&& chain.information().verifying == 0 && chain.information().stored == 1);
@ -452,22 +434,22 @@ mod tests {
&& chain.information().verifying == 0 && chain.information().stored == 1);
// try to remove block 0 from scheduled queue => missing
assert_eq!(chain.remove_block_with_state(&"0000000000000000000000000000000000000000000000000000000000000000".into(), BlockState::Scheduled), HashPosition::Missing);
assert_eq!(chain.remove_block_with_state(&H256::from(0), BlockState::Scheduled), HashPosition::Missing);
assert!(chain.information().scheduled == 3 && chain.information().requested == 3
&& chain.information().verifying == 0 && chain.information().stored == 1);
// remove blocks 0 & 1 from requested queue
assert_eq!(chain.remove_block_with_state(&"0000000000000000000000000000000000000000000000000000000000000001".into(), BlockState::Requested), HashPosition::Inside);
assert_eq!(chain.remove_block_with_state(&"0000000000000000000000000000000000000000000000000000000000000000".into(), BlockState::Requested), HashPosition::Front);
assert_eq!(chain.remove_block_with_state(&H256::from(1), BlockState::Requested), HashPosition::Inside);
assert_eq!(chain.remove_block_with_state(&H256::from(0), BlockState::Requested), HashPosition::Front);
assert!(chain.information().scheduled == 3 && chain.information().requested == 1
&& chain.information().verifying == 0 && chain.information().stored == 1);
// mark 0 & 1 as verifying
chain.verify_block_hash("0000000000000000000000000000000000000000000000000000000000000001".into());
chain.verify_block_hash("0000000000000000000000000000000000000000000000000000000000000002".into());
chain.verify_block_hash(H256::from(1));
chain.verify_block_hash(H256::from(2));
assert!(chain.information().scheduled == 3 && chain.information().requested == 1
&& chain.information().verifying == 2 && chain.information().stored == 1);
// mark block 0 as verified
assert_eq!(chain.remove_block_with_state(&"0000000000000000000000000000000000000000000000000000000000000001".into(), BlockState::Verifying), HashPosition::Front);
assert_eq!(chain.remove_block_with_state(&H256::from(1), BlockState::Verifying), HashPosition::Front);
assert!(chain.information().scheduled == 3 && chain.information().requested == 1
&& chain.information().verifying == 1 && chain.information().stored == 1);
// insert new best block to the chain
@ -496,85 +478,149 @@ mod tests {
assert_eq!(chain.block_locator_hashes(), vec![block2_hash.clone(), block1_hash.clone(), genesis_hash.clone()]);
chain.schedule_blocks_hashes(vec![
"0000000000000000000000000000000000000000000000000000000000000000".into(),
"0000000000000000000000000000000000000000000000000000000000000001".into(),
"0000000000000000000000000000000000000000000000000000000000000002".into(),
"0000000000000000000000000000000000000000000000000000000000000003".into(),
"0000000000000000000000000000000000000000000000000000000000000004".into(),
"0000000000000000000000000000000000000000000000000000000000000005".into(),
"0000000000000000000000000000000000000000000000000000000000000006".into(),
"0000000000000000000000000000000000000000000000000000000000000007".into(),
"0000000000000000000000000000000000000000000000000000000000000008".into(),
"0000000000000000000000000000000000000000000000000000000000000009".into(),
"0000000000000000000000000000000000000000000000000000000000000010".into(),
H256::from(0),
H256::from(1),
H256::from(2),
H256::from(3),
H256::from(4),
H256::from(5),
H256::from(6),
H256::from(7),
H256::from(8),
H256::from(9),
H256::from(10),
]);
chain.request_blocks_hashes(10);
chain.verify_blocks_hashes(10);
assert_eq!(chain.best_block_locator_hashes()[0], "0000000000000000000000000000000000000000000000000000000000000010".into());
assert_eq!(chain.block_locator_hashes(), vec![
"0000000000000000000000000000000000000000000000000000000000000010".into(),
"0000000000000000000000000000000000000000000000000000000000000009".into(),
"0000000000000000000000000000000000000000000000000000000000000008".into(),
"0000000000000000000000000000000000000000000000000000000000000007".into(),
"0000000000000000000000000000000000000000000000000000000000000006".into(),
"0000000000000000000000000000000000000000000000000000000000000005".into(),
"0000000000000000000000000000000000000000000000000000000000000004".into(),
"0000000000000000000000000000000000000000000000000000000000000003".into(),
"0000000000000000000000000000000000000000000000000000000000000002".into(),
"0000000000000000000000000000000000000000000000000000000000000001".into(),
H256::from(10),
H256::from(9),
H256::from(8),
H256::from(7),
H256::from(6),
H256::from(5),
H256::from(4),
H256::from(3),
H256::from(2),
H256::from(1),
block2_hash.clone(),
genesis_hash.clone(),
]);
chain.schedule_blocks_hashes(vec![
"0000000000000000000000000000000000000000000000000000000000000011".into(),
"0000000000000000000000000000000000000000000000000000000000000012".into(),
"0000000000000000000000000000000000000000000000000000000000000013".into(),
"0000000000000000000000000000000000000000000000000000000000000014".into(),
"0000000000000000000000000000000000000000000000000000000000000015".into(),
"0000000000000000000000000000000000000000000000000000000000000016".into(),
H256::from(11),
H256::from(12),
H256::from(13),
H256::from(14),
H256::from(15),
H256::from(16),
]);
chain.request_blocks_hashes(10);
assert_eq!(chain.best_block_locator_hashes()[0], "0000000000000000000000000000000000000000000000000000000000000014".into());
assert_eq!(chain.block_locator_hashes(), vec![
"0000000000000000000000000000000000000000000000000000000000000016".into(),
"0000000000000000000000000000000000000000000000000000000000000015".into(),
"0000000000000000000000000000000000000000000000000000000000000014".into(),
"0000000000000000000000000000000000000000000000000000000000000013".into(),
"0000000000000000000000000000000000000000000000000000000000000012".into(),
"0000000000000000000000000000000000000000000000000000000000000011".into(),
"0000000000000000000000000000000000000000000000000000000000000010".into(),
"0000000000000000000000000000000000000000000000000000000000000009".into(),
"0000000000000000000000000000000000000000000000000000000000000008".into(),
"0000000000000000000000000000000000000000000000000000000000000007".into(),
"0000000000000000000000000000000000000000000000000000000000000005".into(),
"0000000000000000000000000000000000000000000000000000000000000001".into(),
H256::from(16),
H256::from(15),
H256::from(14),
H256::from(13),
H256::from(12),
H256::from(11),
H256::from(10),
H256::from(9),
H256::from(8),
H256::from(7),
H256::from(5),
H256::from(1),
genesis_hash.clone(),
]);
chain.schedule_blocks_hashes(vec![
"0000000000000000000000000000000000000000000000000000000000000020".into(),
"0000000000000000000000000000000000000000000000000000000000000021".into(),
"0000000000000000000000000000000000000000000000000000000000000022".into(),
H256::from(20),
H256::from(21),
H256::from(22),
]);
assert_eq!(chain.best_block_locator_hashes()[0], "0000000000000000000000000000000000000000000000000000000000000020".into());
assert_eq!(chain.block_locator_hashes(), vec![
"0000000000000000000000000000000000000000000000000000000000000022".into(),
"0000000000000000000000000000000000000000000000000000000000000021".into(),
"0000000000000000000000000000000000000000000000000000000000000020".into(),
"0000000000000000000000000000000000000000000000000000000000000016".into(),
"0000000000000000000000000000000000000000000000000000000000000015".into(),
"0000000000000000000000000000000000000000000000000000000000000014".into(),
"0000000000000000000000000000000000000000000000000000000000000013".into(),
"0000000000000000000000000000000000000000000000000000000000000012".into(),
"0000000000000000000000000000000000000000000000000000000000000011".into(),
"0000000000000000000000000000000000000000000000000000000000000010".into(),
"0000000000000000000000000000000000000000000000000000000000000008".into(),
"0000000000000000000000000000000000000000000000000000000000000004".into(),
H256::from(22),
H256::from(21),
H256::from(20),
H256::from(16),
H256::from(15),
H256::from(14),
H256::from(13),
H256::from(12),
H256::from(11),
H256::from(10),
H256::from(8),
H256::from(4),
genesis_hash.clone(),
]);
}
#[test]
fn chain_intersect_with_inventory() {
let mut chain = Chain::new(Arc::new(db::TestStorage::with_genesis_block()));
// append 2 db blocks
chain.insert_best_block(test_data::block_h1()).expect("Error inserting new block");
chain.insert_best_block(test_data::block_h2()).expect("Error inserting new block");
// append 3 verifying blocks
chain.schedule_blocks_hashes(vec![
H256::from(0),
H256::from(1),
H256::from(2),
]);
chain.request_blocks_hashes(3);
chain.verify_blocks_hashes(3);
// append 3 requested blocks
chain.schedule_blocks_hashes(vec![
H256::from(10),
H256::from(11),
H256::from(12),
]);
chain.request_blocks_hashes(10);
// append 3 scheduled blocks
chain.schedule_blocks_hashes(vec![
H256::from(20),
H256::from(21),
H256::from(22),
]);
assert_eq!(chain.intersect_with_inventory(&vec![
H256::from(30),
H256::from(31),
]), InventoryIntersection::NoKnownBlocks(0));
assert_eq!(chain.intersect_with_inventory(&vec![
H256::from(2),
H256::from(10),
H256::from(11),
H256::from(12),
H256::from(20),
]), InventoryIntersection::InMemoryNoNewBlocks);
assert_eq!(chain.intersect_with_inventory(&vec![
H256::from(21),
H256::from(22),
H256::from(30),
H256::from(31),
]), InventoryIntersection::InMemoryMainNewBlocks(2));
assert_eq!(chain.intersect_with_inventory(&vec![
H256::from(20),
H256::from(21),
H256::from(30),
H256::from(31),
]), InventoryIntersection::InMemoryForkNewBlocks(2));
assert_eq!(chain.intersect_with_inventory(&vec![
test_data::block_h1().hash(),
test_data::block_h2().hash(),
]), InventoryIntersection::DbAllBlocksKnown);
assert_eq!(chain.intersect_with_inventory(&vec![
test_data::block_h2().hash(),
H256::from(30),
H256::from(31),
]), InventoryIntersection::DbForkNewBlocks(1));
}
}

View File

@ -1,7 +1,7 @@
use std::thread;
use std::sync::Arc;
use std::cmp::{min, max};
use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::collections::hash_map::Entry;
use std::sync::mpsc::{channel, Sender, Receiver};
use parking_lot::Mutex;
@ -12,10 +12,9 @@ use futures_cpupool::CpuPool;
use db;
use chain::{Block, RepresentH256};
use primitives::hash::H256;
use hash_queue::HashPosition;
use synchronization_peers::Peers;
#[cfg(test)] use synchronization_peers::{Information as PeersInformation};
use synchronization_chain::{ChainRef, BlockState};
use synchronization_chain::{ChainRef, BlockState, InventoryIntersection};
#[cfg(test)]
use synchronization_chain::{Information as ChainInformation};
use verification::{ChainVerifier, Error as VerificationError, Verify};
@ -39,20 +38,21 @@ use std::time::Duration;
///! 2.4) if !inventory_rest.is_empty(): ===> has new unknown blocks in inventory
///! 2.4.1) queue_rest = queue after intersection
///! 2.4.2) if queue_rest.is_empty(): ===> has new unknown blocks in inventory, no fork
///! 2.4.2.1) scheduled_blocks.append(inventory_rest)
///! 2.4.2.2) stop (2.4.2)
///! 2.4.3) if !queue_rest.is_empty(): ===> has new unknown blocks in inventory, fork
///! 2.4.3.1) scheduled_blocks.append(inventory_rest)
///! 2.4.3.2) stop (2.4.3)
///! 2.4.4) if !queue_rest.is_empty(): ===> has new unknown blocks in inventory, fork
///! 2.4.4.1) scheduled_blocks.append(inventory_rest)
///! 2.4.4.2) stop (2.4.4)
///! 2.4.5) stop (2.4)
///! 2.4.3) stop (2.4)
///! 2.5) stop (2)
///! 3) if queue_intersection.is_empty(): ===> responded with out-of-sync-window blocks
///! 3.1) last_known_block = inventory.last(b => b.is_known())
///! 3.2) if last_known_block == None: ===> we know nothing about these blocks & we haven't asked for these
///! 3.2.1) peer will be excluded later by management thread
///! 3.2.1) if !synchronizing => remember peer as useful + ask for blocks
///! 3.2.1) if synchronizing => peer will be excluded later by management thread
///! 3.2.2) stop (3.2)
///! 3.3) if last_known_block == last(inventory): ===> responded with all-known-blocks
///! 3.3.1) remember peer as useful (possibly had failures before && have been excluded from sync)
///! 3.3.1) if syncing, remember peer as useful (possibly had failures before && have been excluded from sync)
///! 3.3.2) stop (3.3)
///! 3.4) if last_known_block in the middle of inventory: ===> responded with forked blocks
///! 3.4.1) remember peer as useful
@ -62,24 +62,40 @@ use std::time::Duration;
///! 3.5) stop (3)
///!
///! on_peer_block: After receiving `block` message:
///! 1) if block_state(block) in (Scheduled, Verifying, Stored): ===> late delivery
///! 1) if block_state(block) in (Verifying, Stored): ===> late delivery
///! 1.1) remember peer as useful
///! 1.2) stop (1)
///! 2) if block_state(block) == Requested: ===> on-time delivery
///! 2) if block_state(block) in (Scheduled, Requested): ===> future/on-time delivery
///! 2.1) remember peer as useful
///! 2.2) move block from requested to verifying queue
///! 2.2) queue verification().and_then(insert).or_else(reset_sync)
///! 2.3) stop (2)
///! 2.2) if block_state(block.parent) in (Verifying, Stored): ===> we can proceed with verification
///! 2.2.1) remove block from current queue (Verifying || Stored)
///! 2.2.2) append block to the verification queue
///! 2.2.3) queue verification().and_then(on_block_verification_success).or_else(on_block_verification_error)
///! 2.2.4) try to verify orphan blocks
///! 2.2.5) stop (2.2)
///! 2.3) if block_state(block.parent) in (Requested, Scheduled): ===> we have found an orphan block
///! 2.3.1) remove block from current queue (Verifying || Stored)
///! 2.3.2) append block to the orphans
///! 2.3.3) stop (2.3)
///! 2.4) if block_state(block.parent) == Unknown: ===> bad block found
///! 2.4.1) remove block from current queue (Verifying || Stored)
///! 2.4.2) stop (2.4)
///! 2.5) stop (2)
///! 3) if block_state(block) == Unknown: ===> maybe we are on-top of chain && new block is announced?
///! 3.1) if block_state(block.parent_hash) == Unknown: ===> we do not know parent
///! 3.1.1) ignore this block
///! 3.1.2) stop (3.1)
///! 3.2) if block_state(block.parent_hash) != Unknown: ===> fork found
///! 3.2) if block_state(block.parent_hash) in (Verifying, Stored): ===> fork found, can verify
///! 3.2.1) ask peer for best inventory (after this block)
///! 3.2.2) append block to verifying queue
///! 3.2.3) queue verification().and_then(insert).or_else(reset_sync)
///! 3.2.3) queue verification().and_then(on_block_verification_success).or_else(on_block_verification_error)
///! 3.2.4) stop (3.2)
///! 2.3) stop (2)
///! 3.3) if block_state(block.parent_hash) in (Requested, Scheduled): ===> fork found, add as orphan
///! 3.3.1) ask peer for best inventory (after this block)
///! 3.3.2) append block to orphan
///! 3.3.3) stop (3.3)
///! 3.4) stop (2)
///! + if no blocks left in scheduled + requested queue => we are saturated => ask all peers for inventory & forget
///!
///! execute_synchronization_tasks: After receiving `inventory` message OR receiving `block` message OR when management thread schedules tasks:
///! 1) if there are blocks in `scheduled` queue AND we can fit more blocks into memory: ===> ask for blocks
@ -113,7 +129,8 @@ use std::time::Duration;
///!
///! on_block_verification_error: When verification completes with an error:
///! 1) remove block from verification queue
///! 2) remove all known children from all queues [so that new `block` messages will be ignored in on_peer_block.3.1.1]
///! 2) remove all known children from all queues [so that new `block` messages will be ignored in on_peer_block.3.1.1] (TODO: not implemented currently!!!)
///!
///!
/// Approximate maximal number of blocks hashes in scheduled queue.
@ -162,7 +179,6 @@ pub trait Client : Send + 'static {
fn on_new_blocks_inventory(&mut self, peer_index: usize, peer_hashes: Vec<H256>);
fn on_peer_block(&mut self, peer_index: usize, block: Block);
fn on_peer_disconnected(&mut self, peer_index: usize);
fn reset(&mut self, is_hard: bool);
fn on_block_verification_success(&mut self, block: Block);
fn on_block_verification_error(&mut self, err: &VerificationError, hash: &H256);
}
@ -190,7 +206,7 @@ pub struct SynchronizationClient<T: TaskExecutor> {
/// Chain reference.
chain: ChainRef,
/// Blocks from requested_hashes, but received out-of-order.
orphaned_blocks: HashMap<H256, Block>,
orphaned_blocks: HashMap<H256, Vec<(H256, Block)>>,
/// Verification work transmission channel.
verification_work_sender: Option<Sender<VerificationTask>>,
/// Verification thread.
@ -218,10 +234,11 @@ impl State {
impl<T> Drop for SynchronizationClient<T> where T: TaskExecutor {
fn drop(&mut self) {
if let Some(join_handle) = self.verification_worker_thread.take() {
self.verification_work_sender
// ignore send error here <= destructing anyway
let _ = self.verification_work_sender
.take()
.expect("Some(join_handle) => Some(verification_work_sender)")
.send(VerificationTask::Stop).expect("TODO");
.send(VerificationTask::Stop);
join_handle.join().expect("Clean shutdown.");
}
}
@ -246,45 +263,25 @@ impl<T> Client for SynchronizationClient<T> where T: TaskExecutor {
// update peers to select next tasks
self.peers.on_block_received(peer_index, &block_hash);
self.process_peer_block(block_hash, block);
self.process_peer_block(peer_index, block_hash, block);
self.execute_synchronization_tasks();
}
/// Peer disconnected.
fn on_peer_disconnected(&mut self, peer_index: usize) {
self.peers.on_peer_disconnected(peer_index);
// when last peer is disconnected, reset, but let verifying blocks be verified
self.reset(false);
}
/// Reset synchronization process
fn reset(&mut self, is_hard: bool) {
self.peers.reset();
self.orphaned_blocks.clear();
// TODO: reset verification queue
let mut chain = self.chain.write();
chain.remove_blocks_with_state(BlockState::Requested);
chain.remove_blocks_with_state(BlockState::Scheduled);
if is_hard {
self.state = State::Synchronizing(time::precise_time_s(), chain.best_block().number);
chain.remove_blocks_with_state(BlockState::Verifying);
warn!(target: "sync", "Synchronization process restarting from block {:?}", chain.best_block());
}
else {
self.state = State::Saturated;
if self.peers.on_peer_disconnected(peer_index) {
self.switch_to_saturated_state(false);
}
}
/// Process successful block verification
fn on_block_verification_success(&mut self, block: Block) {
{
let hash = block.hash();
let mut chain = self.chain.write();
// remove from verifying queue
assert_eq!(chain.remove_block_with_state(&hash, BlockState::Verifying), HashPosition::Front);
// remove block from verification queue
chain.remove_block_with_state(&block.hash(), BlockState::Verifying);
// insert to storage
chain.insert_best_block(block)
@ -299,8 +296,12 @@ impl<T> Client for SynchronizationClient<T> where T: TaskExecutor {
fn on_block_verification_error(&mut self, err: &VerificationError, hash: &H256) {
warn!(target: "sync", "Block {:?} verification failed with error {:?}", hash, err);
// reset synchronization process
self.reset(true);
{
let mut chain = self.chain.write();
// remove block from verification queue
chain.remove_block_with_state(&hash, BlockState::Verifying);
}
// start new tasks
self.execute_synchronization_tasks();
@ -351,8 +352,10 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
None => return Ok(()),
};
let mut client = client.lock();
manage_synchronization_peers(&mut client.peers);
client.execute_synchronization_tasks();
if client.state.is_synchronizing() {
manage_synchronization_peers(&mut client.peers);
client.execute_synchronization_tasks();
}
Ok(())
})
.for_each(|_| Ok(()))
@ -376,144 +379,101 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
}
/// Process new blocks inventory
fn process_new_blocks_inventory(&mut self, peer_index: usize, mut peer_hashes: Vec<H256>) {
// | requested | QUEUED |
// --- [1]
// --- [2] +
// --- [3] +
// --- [4]
// -+- [5] +
// -+- [6] +
// -+- [7] +
// ---+---------+--- [8] +
// ---+--------+--- [9] +
// ---+---------+--------+--- [10]
fn process_new_blocks_inventory(&mut self, peer_index: usize, mut inventory: Vec<H256>) {
let mut chain = self.chain.write();
'outer: loop {
// when synchronization is idling
// => request full inventory
if !chain.has_blocks_of_state(BlockState::Scheduled)
&& !chain.has_blocks_of_state(BlockState::Requested) {
let unknown_blocks: Vec<_> = peer_hashes.into_iter()
.filter(|hash| chain.block_has_state(&hash, BlockState::Unknown))
.collect();
// no new blocks => no need to switch to the synchronizing state
if unknown_blocks.is_empty() {
return;
}
chain.schedule_blocks_hashes(unknown_blocks);
self.peers.insert(peer_index);
break;
}
// cases: [2], [5], [6], [8]
// if last block from peer_hashes is in window { requested_hashes + queued_hashes }
// => no new blocks for synchronization, but we will use this peer in synchronization
let peer_hashes_len = peer_hashes.len();
if chain.block_has_state(&peer_hashes[peer_hashes_len - 1], BlockState::Scheduled)
|| chain.block_has_state(&peer_hashes[peer_hashes_len - 1], BlockState::Requested) {
self.peers.insert(peer_index);
return;
}
// cases: [1], [3], [4], [7], [9], [10]
// try to find new blocks for synchronization from inventory
let mut last_known_peer_hash_index = peer_hashes_len - 1;
loop {
if chain.block_state(&peer_hashes[last_known_peer_hash_index]) != BlockState::Unknown {
// we have found first block which is known to us
// => blocks in range [(last_known_peer_hash_index + 1)..peer_hashes_len] are unknown
// && must be scheduled for request
let unknown_peer_hashes = peer_hashes.split_off(last_known_peer_hash_index + 1);
chain.schedule_blocks_hashes(unknown_peer_hashes);
match chain.intersect_with_inventory(&inventory) {
InventoryIntersection::NoKnownBlocks(_) if self.state.is_synchronizing() => (),
InventoryIntersection::DbAllBlocksKnown => {
if self.state.is_synchronizing() {
// remember peer as useful
self.peers.insert(peer_index);
break 'outer;
}
if last_known_peer_hash_index == 0 {
// either these are blocks from the future or blocks from the past
// => TODO: ignore this peer during synchronization
return;
},
InventoryIntersection::InMemoryNoNewBlocks => {
// remember peer as useful
self.peers.insert(peer_index);
},
InventoryIntersection::InMemoryMainNewBlocks(new_block_index)
| InventoryIntersection::InMemoryForkNewBlocks(new_block_index)
| InventoryIntersection::DbForkNewBlocks(new_block_index)
| InventoryIntersection::NoKnownBlocks(new_block_index) => {
// schedule new blocks
let new_blocks_hashes = inventory.split_off(new_block_index);
chain.schedule_blocks_hashes(new_blocks_hashes);
// remember peer as useful
self.peers.insert(peer_index);
// switch to synchronization state
if !self.state.is_synchronizing() {
self.state = State::Synchronizing(time::precise_time_s(), chain.best_block().number);
}
last_known_peer_hash_index -= 1;
}
}
// move to synchronizing state
if !self.state.is_synchronizing() {
self.state = State::Synchronizing(time::precise_time_s(), chain.best_block().number);
}
}
/// Process new peer block
fn process_peer_block(&mut self, block_hash: H256, block: Block) {
// this block is not requested for synchronization
let mut chain = self.chain.write();
let block_position = chain.remove_block_with_state(&block_hash, BlockState::Requested);
if block_position == HashPosition::Missing {
return;
}
fn process_peer_block(&mut self, peer_index: usize, block_hash: H256, block: Block) {
let switch_to_saturated = {
let mut chain = self.chain.write();
match chain.block_state(&block_hash) {
BlockState::Verifying | BlockState::Stored => {
// remember peer as useful
self.peers.insert(peer_index);
},
BlockState::Unknown | BlockState::Scheduled | BlockState::Requested => {
// remove block from current queue
chain.remove_block(&block_hash);
// check parent block state
match chain.block_state(&block.block_header.previous_header_hash) {
BlockState::Unknown => (),
BlockState::Verifying | BlockState::Stored => {
// remember peer as useful
self.peers.insert(peer_index);
// schedule verification
let mut blocks: VecDeque<(H256, Block)> = VecDeque::new();
blocks.push_back((block_hash, block));
while let Some((block_hash, block)) = blocks.pop_front() {
// queue block for verification
chain.remove_block(&block_hash);
// requested block is received => move to saturated state if there are no more blocks
if !chain.has_blocks_of_state(BlockState::Scheduled)
&& !chain.has_blocks_of_state(BlockState::Requested)
&& !chain.has_blocks_of_state(BlockState::Verifying) {
self.state = State::Saturated;
}
match self.verification_work_sender {
Some(ref verification_work_sender) => {
chain.verify_block_hash(block_hash.clone());
verification_work_sender
.send(VerificationTask::VerifyBlock(block))
.expect("Verification thread have the same lifetime as `Synchronization`")
},
None => chain.insert_best_block(block)
.expect("Error inserting to db."),
}
// check if this block is next block in the blockchain
if block_position == HashPosition::Front {
// check if this parent of this block is current best_block
let expecting_previous_header_hash = chain.best_block_of_state(BlockState::Verifying)
.unwrap_or_else(|| {
chain.best_block_of_state(BlockState::Stored)
.expect("storage with genesis block is required")
}).hash;
if block.block_header.previous_header_hash != expecting_previous_header_hash {
// TODO: penalize peer
warn!(target: "sync", "Out-of-order block {:?} was dropped. Expecting block with parent hash {:?}", block_hash, expecting_previous_header_hash);
return;
}
// this is next block in the blockchain => queue for verification
// also unwrap all dependent orphan blocks
let mut current_block = block;
let mut current_block_hash = block_hash;
loop {
match self.verification_work_sender {
Some(ref verification_work_sender) => {
chain.verify_block_hash(current_block_hash.clone());
verification_work_sender
.send(VerificationTask::VerifyBlock(current_block))
.expect("Verification thread have the same lifetime as `Synchronization`");
},
None => {
chain.insert_best_block(current_block)
.expect("Error inserting to db.");
// process orphan blocks
if let Entry::Occupied(entry) = self.orphaned_blocks.entry(block_hash) {
let (_, orphaned) = entry.remove_entry();
blocks.extend(orphaned);
}
}
},
BlockState::Requested | BlockState::Scheduled => {
// remember peer as useful
self.peers.insert(peer_index);
// remember as orphan block
self.orphaned_blocks
.entry(block.block_header.previous_header_hash.clone())
.or_insert(Vec::new())
.push((block_hash, block))
}
}
};
// proceed to the next orphaned block
if let Entry::Occupied(orphaned_block_entry) = self.orphaned_blocks.entry(current_block_hash) {
let (_, orphaned_block) = orphaned_block_entry.remove_entry();
current_block = orphaned_block;
current_block_hash = current_block.hash();
}
else {
break;
}
},
}
return;
}
// requested block is received => move to saturated state if there are no more blocks
chain.length_of_state(BlockState::Scheduled) == 0
&& chain.length_of_state(BlockState::Requested) == 0
};
// this block is not the next one => mark it as orphaned
self.orphaned_blocks.insert(block.block_header.previous_header_hash.clone(), block);
if switch_to_saturated {
self.switch_to_saturated_state(true);
}
}
/// Schedule new synchronization tasks, if any.
@ -540,18 +500,11 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
}
}
// TODO: instead of issuing duplicated inventory requests, wait until enough new blocks are verified, then issue
// check if we can query some blocks hashes
let scheduled_hashes_len = chain.length_of_state(BlockState::Scheduled);
if scheduled_hashes_len < MAX_SCHEDULED_HASHES {
if self.state.is_synchronizing() {
tasks.push(Task::RequestBestInventory(idle_peers[0]));
self.peers.on_inventory_requested(idle_peers[0]);
}
else {
tasks.push(Task::RequestInventory(idle_peers[0]));
self.peers.on_inventory_requested(idle_peers[0]);
}
tasks.push(Task::RequestInventory(idle_peers[0]));
self.peers.on_inventory_requested(idle_peers[0]);
}
// check if we can move some blocks from scheduled to requested queue
@ -578,6 +531,27 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
}
}
/// Switch to saturated state
fn switch_to_saturated_state(&mut self, ask_for_inventory: bool) {
self.state = State::Saturated;
self.orphaned_blocks.clear();
self.peers.reset();
{
let mut chain = self.chain.write();
chain.remove_blocks_with_state(BlockState::Requested);
chain.remove_blocks_with_state(BlockState::Scheduled);
}
if ask_for_inventory {
let mut executor = self.executor.lock();
for idle_peer in self.peers.idle_peers() {
self.peers.on_inventory_requested(idle_peer);
executor.execute(Task::RequestInventory(idle_peer));
}
}
}
/// Thread procedure for handling verification tasks
fn verification_worker_proc(sync: Arc<Mutex<Self>>, storage: Arc<db::Store>, work_receiver: Receiver<VerificationTask>) {
let verifier = ChainVerifier::new(storage);
@ -612,6 +586,7 @@ pub mod tests {
use p2p::event_loop;
use test_data;
use db;
use primitives::hash::H256;
fn create_sync() -> (Core, Handle, Arc<Mutex<DummyTaskExecutor>>, Arc<Mutex<SynchronizationClient<DummyTaskExecutor>>>) {
let event_loop = event_loop();
@ -638,13 +613,13 @@ pub mod tests {
let (_, _, executor, sync) = create_sync();
let mut sync = sync.lock();
let block1: Block = "010000006fe28c0ab6f1b372c1a6a246ae63f74f931e8365e15a089c68d6190000000000982051fd1e4ba744bbbe680e1fee14677ba1a3c3540bf7b1cdb606e857233e0e61bc6649ffff001d01e362990101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d0104ffffffff0100f2052a0100000043410496b538e853519c726a2c91e61ec11600ae1390813a627c66fb8be7947be63c52da7589379515d4e0a604f8141781e62294721166bf621e73a82cbf2342c858eeac00000000".into();
let block2: Block = "010000004860eb18bf1b1620e37e9490fc8a427514416fd75159ab86688e9a8300000000d5fdcc541e25de1c7a5addedf24858b8bb665c9f36ef744ee42c316022c90f9bb0bc6649ffff001d08d2bd610101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d010bffffffff0100f2052a010000004341047211a824f55b505228e4c3d5194c1fcfaa15a456abdf37f9b9d97a4040afc073dee6c89064984f03385237d92167c13e236446b417ab79a0fcae412ae3316b77ac00000000".into();
let block1: Block = test_data::block_h1();
let block2: Block = test_data::block_h2();
sync.on_new_blocks_inventory(5, vec![block1.hash()]);
let tasks = executor.lock().take_tasks();
assert_eq!(tasks.len(), 2);
assert_eq!(tasks[0], Task::RequestBestInventory(5));
assert_eq!(tasks[0], Task::RequestInventory(5));
assert_eq!(tasks[1], Task::RequestBlocks(5, vec![block1.hash()]));
assert!(sync.information().state.is_synchronizing());
assert_eq!(sync.information().orphaned, 0);
@ -654,23 +629,23 @@ pub mod tests {
assert_eq!(sync.information().peers.idle, 0);
assert_eq!(sync.information().peers.active, 1);
// push unknown block => nothing should change
// push unknown block => will be queued as orphan
sync.on_peer_block(5, block2);
assert!(sync.information().state.is_synchronizing());
assert_eq!(sync.information().orphaned, 0);
assert_eq!(sync.information().orphaned, 1);
assert_eq!(sync.information().chain.scheduled, 0);
assert_eq!(sync.information().chain.requested, 1);
assert_eq!(sync.information().chain.stored, 1);
assert_eq!(sync.information().peers.idle, 0);
assert_eq!(sync.information().peers.active, 1);
// push requested block => should be moved to the test storage
// push requested block => should be moved to the test storage && orphan should be moved
sync.on_peer_block(5, block1);
assert!(!sync.information().state.is_synchronizing());
assert_eq!(sync.information().orphaned, 0);
assert_eq!(sync.information().chain.scheduled, 0);
assert_eq!(sync.information().chain.requested, 0);
assert_eq!(sync.information().chain.stored, 2);
assert_eq!(sync.information().chain.stored, 3);
// we have just requested new `inventory` from the peer => peer is forgotten
assert_eq!(sync.information().peers.idle, 0);
assert_eq!(sync.information().peers.active, 0);
@ -681,7 +656,7 @@ pub mod tests {
let (_, _, _, sync) = create_sync();
let mut sync = sync.lock();
let block2: Block = "010000004860eb18bf1b1620e37e9490fc8a427514416fd75159ab86688e9a8300000000d5fdcc541e25de1c7a5addedf24858b8bb665c9f36ef744ee42c316022c90f9bb0bc6649ffff001d08d2bd610101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d010bffffffff0100f2052a010000004341047211a824f55b505228e4c3d5194c1fcfaa15a456abdf37f9b9d97a4040afc073dee6c89064984f03385237d92167c13e236446b417ab79a0fcae412ae3316b77ac00000000".into();
let block2: Block = test_data::block_h169();
sync.on_new_blocks_inventory(5, vec![block2.hash()]);
sync.on_peer_block(5, block2);
@ -702,8 +677,8 @@ pub mod tests {
fn synchronization_parallel_peers() {
let (_, _, executor, sync) = create_sync();
let block1: Block = "010000006fe28c0ab6f1b372c1a6a246ae63f74f931e8365e15a089c68d6190000000000982051fd1e4ba744bbbe680e1fee14677ba1a3c3540bf7b1cdb606e857233e0e61bc6649ffff001d01e362990101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d0104ffffffff0100f2052a0100000043410496b538e853519c726a2c91e61ec11600ae1390813a627c66fb8be7947be63c52da7589379515d4e0a604f8141781e62294721166bf621e73a82cbf2342c858eeac00000000".into();
let block2: Block = "010000004860eb18bf1b1620e37e9490fc8a427514416fd75159ab86688e9a8300000000d5fdcc541e25de1c7a5addedf24858b8bb665c9f36ef744ee42c316022c90f9bb0bc6649ffff001d08d2bd610101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d010bffffffff0100f2052a010000004341047211a824f55b505228e4c3d5194c1fcfaa15a456abdf37f9b9d97a4040afc073dee6c89064984f03385237d92167c13e236446b417ab79a0fcae412ae3316b77ac00000000".into();
let block1: Block = test_data::block_h1();
let block2: Block = test_data::block_h2();
{
let mut sync = sync.lock();
@ -715,7 +690,7 @@ pub mod tests {
// synchronization has started && new blocks have been requested
let tasks = executor.lock().take_tasks();
assert!(sync.information().state.is_synchronizing());
assert_eq!(tasks, vec![Task::RequestBestInventory(1), Task::RequestBlocks(1, vec![block1.hash()])]);
assert_eq!(tasks, vec![Task::RequestInventory(1), Task::RequestBlocks(1, vec![block1.hash()])]);
}
{
@ -726,7 +701,7 @@ pub mod tests {
// synchronization has started && new blocks have been requested
let tasks = executor.lock().take_tasks();
assert!(sync.information().state.is_synchronizing());
assert_eq!(tasks, vec![Task::RequestBestInventory(2), Task::RequestBlocks(2, vec![block2.hash()])]);
assert_eq!(tasks, vec![Task::RequestInventory(2), Task::RequestBlocks(2, vec![block2.hash()])]);
}
{
@ -750,7 +725,7 @@ pub mod tests {
// request new blocks
{
let mut sync = sync.lock();
sync.on_new_blocks_inventory(1, vec!["0000000000000000000000000000000000000000000000000000000000000000".into()]);
sync.on_new_blocks_inventory(1, vec![H256::from(0)]);
assert!(sync.information().state.is_synchronizing());
}
@ -774,4 +749,21 @@ pub mod tests {
let tasks = executor.lock().take_tasks();
assert_eq!(tasks, vec![]);
}
#[test]
fn synchronization_asks_for_inventory_after_saturating() {
let (_, _, executor, sync) = create_sync();
let mut sync = sync.lock();
let block = test_data::block_h1();
let block_hash = block.hash();
sync.on_new_blocks_inventory(1, vec![block_hash.clone()]);
sync.on_new_blocks_inventory(2, vec![block_hash.clone()]);
executor.lock().take_tasks();
sync.on_peer_block(2, block);
let tasks = executor.lock().take_tasks();
assert_eq!(tasks.len(), 2);
assert!(tasks.iter().any(|t| t == &Task::RequestInventory(1)));
assert!(tasks.iter().any(|t| t == &Task::RequestInventory(2)));
}
}

View File

@ -23,8 +23,6 @@ pub enum Task {
RequestBlocks(usize, Vec<H256>),
/// Request full inventory using block_locator_hashes.
RequestInventory(usize),
/// Request inventory using best block locator only.
RequestBestInventory(usize),
/// Send block.
SendBlock(usize, Block),
/// Send notfound
@ -96,20 +94,6 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
connection.send_getblocks(&getblocks);
}
},
Task::RequestBestInventory(peer_index) => {
let block_locator_hashes = self.chain.read().best_block_locator_hashes();
let getblocks = types::GetBlocks {
version: 0,
block_locator_hashes: block_locator_hashes,
hash_stop: H256::default(),
};
if let Some(connection) = self.peers.get_mut(&peer_index) {
let connection = &mut *connection;
trace!(target: "sync", "Querying best inventory from peer#{}", peer_index);
connection.send_getblocks(&getblocks);
}
},
Task::SendBlock(peer_index, block) => {
let block_message = types::Block {
block: block,

View File

@ -73,11 +73,12 @@ impl Peers {
}
/// Peer has been disconnected
pub fn on_peer_disconnected(&mut self, peer_index: usize) {
pub fn on_peer_disconnected(&mut self, peer_index: usize) -> bool {
self.idle.remove(&peer_index);
self.requests.remove(&peer_index);
self.failures.remove(&peer_index);
self.times.remove(&peer_index);
(self.idle.len() + self.requests.len()) == 0
}
/// Block is received from peer.

View File

@ -62,9 +62,9 @@ impl SynchronizationServer {
fn locate_known_block(&self, block_locator_hashes: Vec<H256>) -> Option<db::BestBlock> {
let chain = self.chain.read();
let storage = chain.storage();
block_locator_hashes.into_iter()
.filter_map(|hash| chain
.storage_block_number(&hash)
.filter_map(|hash| storage.block_number(&hash)
.map(|number| db::BestBlock {
number: number,
hash: hash,
@ -93,15 +93,19 @@ impl SynchronizationServer {
(peer_index, ServerTask::ServeGetData(inventory)) => {
let mut unknown_items: Vec<InventoryVector> = Vec::new();
let mut new_tasks: Vec<ServerTask> = Vec::new();
for item in inventory {
match item.inv_type {
InventoryType::MessageBlock => {
match chain.read().storage_block_number(&item.hash) {
Some(_) => new_tasks.push(ServerTask::ReturnBlock(item.hash.clone())),
None => unknown_items.push(item),
}
},
_ => (), // TODO: process other inventory types
{
let chain = chain.read();
let storage = chain.storage();
for item in inventory {
match item.inv_type {
InventoryType::MessageBlock => {
match storage.block_number(&item.hash) {
Some(_) => new_tasks.push(ServerTask::ReturnBlock(item.hash.clone())),
None => unknown_items.push(item),
}
},
_ => (), // TODO: process other inventory types
}
}
}
// respond with `notfound` message for unknown data
@ -129,10 +133,17 @@ impl SynchronizationServer {
},
// `getheaders` => `headers`
(peer_index, ServerTask::ServeGetHeaders(best_block, hash_stop)) => {
// What if we have no common blocks with peer at all? Maybe drop connection or penalize peer?
// https://github.com/ethcore/parity-bitcoin/pull/91#discussion_r86734568
let blocks_hashes = SynchronizationServer::blocks_hashes_after(&chain, &best_block, &hash_stop, 2000);
if !blocks_hashes.is_empty() {
trace!(target: "sync", "Going to respond with blocks headers with {} items to peer#{}", blocks_hashes.len(), peer_index);
let blocks_headers = blocks_hashes.into_iter().filter_map(|hash| chain.read().storage_block(&hash).map(|block| block.block_header)).collect();
let chain = chain.read();
let storage = chain.storage();
// TODO: read block_header only
let blocks_headers = blocks_hashes.into_iter()
.filter_map(|hash| storage.block(db::BlockRef::Hash(hash)).map(|block| block.block_header))
.collect();
executor.lock().execute(Task::SendHeaders(peer_index, blocks_headers));
}
},
@ -158,10 +169,9 @@ impl SynchronizationServer {
},
// `block`
(peer_index, ServerTask::ReturnBlock(block_hash)) => {
let storage_block = chain.read().storage_block(&block_hash);
if let Some(storage_block) = storage_block {
executor.lock().execute(Task::SendBlock(peer_index, storage_block));
}
let block = chain.read().storage().block(db::BlockRef::Hash(block_hash))
.expect("we have checked that block exists in ServeGetData; db is append-only; qed");
executor.lock().execute(Task::SendBlock(peer_index, block));
},
},
// no tasks after wake-up => stopping
@ -172,7 +182,9 @@ impl SynchronizationServer {
fn blocks_hashes_after(chain: &ChainRef, best_block: &db::BestBlock, hash_stop: &H256, max_hashes: u32) -> Vec<H256> {
let mut hashes: Vec<H256> = Vec::new();
let storage_block_hash = chain.read().storage_block_hash(best_block.number);
let chain = chain.read();
let storage = chain.storage();
let storage_block_hash = storage.block_hash(best_block.number);
if let Some(hash) = storage_block_hash {
// check that chain has not reorganized since task was queued
if hash == best_block.hash {
@ -180,7 +192,7 @@ impl SynchronizationServer {
let last_block_number = first_block_number + max_hashes;
// `max_hashes` hashes after best_block.number OR hash_stop OR blockchain end
for block_number in first_block_number..last_block_number {
match chain.read().storage_block_hash(block_number) {
match storage.block_hash(block_number) {
Some(ref block_hash) if block_hash == hash_stop => break,
None => break,
Some(block_hash) => {