ongoing sync refactoring
This commit is contained in:
parent
24803433b3
commit
32e21d6e37
|
@ -2,6 +2,7 @@ use chain;
|
||||||
use primitives::hash::H256;
|
use primitives::hash::H256;
|
||||||
use serialization::Serializable;
|
use serialization::Serializable;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
pub struct IndexedBlock {
|
pub struct IndexedBlock {
|
||||||
header: chain::BlockHeader,
|
header: chain::BlockHeader,
|
||||||
header_hash: H256,
|
header_hash: H256,
|
||||||
|
|
|
@ -19,12 +19,13 @@ impl BlocksWriter {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn append_block(&mut self, block: chain::Block) -> Result<(), Error> {
|
pub fn append_block(&mut self, block: chain::Block) -> Result<(), Error> {
|
||||||
|
let indexed_block: db::IndexedBlock = block.into();
|
||||||
// TODO: share same verification code with synchronization_client
|
// TODO: share same verification code with synchronization_client
|
||||||
if self.storage.best_block().map_or(false, |bb| bb.hash != block.block_header.previous_header_hash) {
|
if self.storage.best_block().map_or(false, |bb| bb.hash != indexed_block.header().previous_header_hash) {
|
||||||
return Err(Error::OutOfOrderBlock);
|
return Err(Error::OutOfOrderBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
match self.verifier.verify(&block) {
|
match self.verifier.verify(&indexed_block) {
|
||||||
Err(err) => Err(Error::Verification(err)),
|
Err(err) => Err(Error::Verification(err)),
|
||||||
Ok(_chain) => { try!(self.storage.insert_block(&block).map_err(Error::Database)); Ok(()) }
|
Ok(_chain) => { try!(self.storage.insert_block(&block).map_err(Error::Database)); Ok(()) }
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,13 +4,14 @@ use linked_hash_map::LinkedHashMap;
|
||||||
use time;
|
use time;
|
||||||
use chain::Block;
|
use chain::Block;
|
||||||
use primitives::hash::H256;
|
use primitives::hash::H256;
|
||||||
|
use db::IndexedBlock;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
/// Storage for blocks, for which we have no parent yet.
|
/// Storage for blocks, for which we have no parent yet.
|
||||||
/// Blocks from this storage are either moved to verification queue, or removed at all.
|
/// Blocks from this storage are either moved to verification queue, or removed at all.
|
||||||
pub struct OrphanBlocksPool {
|
pub struct OrphanBlocksPool {
|
||||||
/// Blocks from requested_hashes, but received out-of-order.
|
/// Blocks from requested_hashes, but received out-of-order.
|
||||||
orphaned_blocks: HashMap<H256, HashMap<H256, Block>>,
|
orphaned_blocks: HashMap<H256, HashMap<H256, IndexedBlock>>,
|
||||||
/// Blocks that we have received without requesting with receiving time.
|
/// Blocks that we have received without requesting with receiving time.
|
||||||
unknown_blocks: LinkedHashMap<H256, f64>,
|
unknown_blocks: LinkedHashMap<H256, f64>,
|
||||||
}
|
}
|
||||||
|
@ -41,7 +42,7 @@ impl OrphanBlocksPool {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Insert orphaned block, for which we have already requested its parent block
|
/// Insert orphaned block, for which we have already requested its parent block
|
||||||
pub fn insert_orphaned_block(&mut self, hash: H256, block: Block) {
|
pub fn insert_orphaned_block(&mut self, hash: H256, block: IndexedBlock) {
|
||||||
self.orphaned_blocks
|
self.orphaned_blocks
|
||||||
.entry(block.block_header.previous_header_hash.clone())
|
.entry(block.block_header.previous_header_hash.clone())
|
||||||
.or_insert_with(HashMap::new)
|
.or_insert_with(HashMap::new)
|
||||||
|
@ -49,7 +50,7 @@ impl OrphanBlocksPool {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Insert unknown block, for which we know nothing about its parent block
|
/// Insert unknown block, for which we know nothing about its parent block
|
||||||
pub fn insert_unknown_block(&mut self, hash: H256, block: Block) {
|
pub fn insert_unknown_block(&mut self, hash: H256, block: IndexedBlock) {
|
||||||
let previous_value = self.unknown_blocks.insert(hash.clone(), time::precise_time_s());
|
let previous_value = self.unknown_blocks.insert(hash.clone(), time::precise_time_s());
|
||||||
assert_eq!(previous_value, None);
|
assert_eq!(previous_value, None);
|
||||||
|
|
||||||
|
@ -67,7 +68,7 @@ impl OrphanBlocksPool {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Remove all blocks, depending on this parent
|
/// Remove all blocks, depending on this parent
|
||||||
pub fn remove_blocks_for_parent(&mut self, hash: &H256) -> Vec<(H256, Block)> {
|
pub fn remove_blocks_for_parent(&mut self, hash: &H256) -> Vec<(H256, IndexedBlock)> {
|
||||||
let mut queue: VecDeque<H256> = VecDeque::new();
|
let mut queue: VecDeque<H256> = VecDeque::new();
|
||||||
queue.push_back(hash.clone());
|
queue.push_back(hash.clone());
|
||||||
|
|
||||||
|
@ -86,7 +87,7 @@ impl OrphanBlocksPool {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Remove blocks with given hashes + all dependent blocks
|
/// Remove blocks with given hashes + all dependent blocks
|
||||||
pub fn remove_blocks(&mut self, hashes: &HashSet<H256>) -> Vec<(H256, Block)> {
|
pub fn remove_blocks(&mut self, hashes: &HashSet<H256>) -> Vec<(H256, IndexedBlock)> {
|
||||||
// TODO: excess clone
|
// TODO: excess clone
|
||||||
let mut removed: Vec<(H256, Block)> = Vec::new();
|
let mut removed: Vec<(H256, Block)> = Vec::new();
|
||||||
let parent_orphan_keys: Vec<_> = self.orphaned_blocks.keys().cloned().collect();
|
let parent_orphan_keys: Vec<_> = self.orphaned_blocks.keys().cloned().collect();
|
||||||
|
|
|
@ -4,7 +4,7 @@ use std::collections::VecDeque;
|
||||||
use linked_hash_map::LinkedHashMap;
|
use linked_hash_map::LinkedHashMap;
|
||||||
use parking_lot::RwLock;
|
use parking_lot::RwLock;
|
||||||
use chain::{Block, BlockHeader, Transaction};
|
use chain::{Block, BlockHeader, Transaction};
|
||||||
use db;
|
use db::{self, IndexedBlock};
|
||||||
use best_headers_chain::{BestHeadersChain, Information as BestHeadersInformation};
|
use best_headers_chain::{BestHeadersChain, Information as BestHeadersInformation};
|
||||||
use primitives::bytes::Bytes;
|
use primitives::bytes::Bytes;
|
||||||
use primitives::hash::H256;
|
use primitives::hash::H256;
|
||||||
|
@ -309,7 +309,7 @@ impl Chain {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Insert new best block to storage
|
/// Insert new best block to storage
|
||||||
pub fn insert_best_block(&mut self, hash: H256, block: &Block) -> Result<BlockInsertionResult, db::Error> {
|
pub fn insert_best_block(&mut self, hash: H256, block: &IndexedBlock) -> Result<BlockInsertionResult, db::Error> {
|
||||||
let is_appending_to_main_branch = self.best_storage_block.hash == block.block_header.previous_header_hash;
|
let is_appending_to_main_branch = self.best_storage_block.hash == block.block_header.previous_header_hash;
|
||||||
|
|
||||||
// insert to storage
|
// insert to storage
|
||||||
|
@ -353,7 +353,7 @@ impl Chain {
|
||||||
// all transactions from this block were accepted
|
// all transactions from this block were accepted
|
||||||
// + all transactions from previous blocks of this fork were accepted
|
// + all transactions from previous blocks of this fork were accepted
|
||||||
// => delete accepted transactions from verification queue and from the memory pool
|
// => delete accepted transactions from verification queue and from the memory pool
|
||||||
let this_block_transactions_hashes = block.transactions.iter().map(|tx| tx.hash());
|
let this_block_transactions_hashes = block.transaction_hashes();
|
||||||
let mut canonized_blocks_hashes: Vec<H256> = Vec::new();
|
let mut canonized_blocks_hashes: Vec<H256> = Vec::new();
|
||||||
let mut new_main_blocks_transactions_hashes: Vec<H256> = Vec::new();
|
let mut new_main_blocks_transactions_hashes: Vec<H256> = Vec::new();
|
||||||
while let Some(canonized_block_hash) = reorganization.pop_canonized() {
|
while let Some(canonized_block_hash) = reorganization.pop_canonized() {
|
||||||
|
|
|
@ -7,7 +7,7 @@ use futures::{BoxFuture, Future, finished};
|
||||||
use futures::stream::Stream;
|
use futures::stream::Stream;
|
||||||
use tokio_core::reactor::{Handle, Interval};
|
use tokio_core::reactor::{Handle, Interval};
|
||||||
use futures_cpupool::CpuPool;
|
use futures_cpupool::CpuPool;
|
||||||
use db;
|
use db::{self, IndexedBlock};
|
||||||
use chain::{Block, BlockHeader, Transaction};
|
use chain::{Block, BlockHeader, Transaction};
|
||||||
use message::types;
|
use message::types;
|
||||||
use message::common::{InventoryVector, InventoryType};
|
use message::common::{InventoryVector, InventoryType};
|
||||||
|
@ -191,7 +191,7 @@ pub trait Client : Send + 'static {
|
||||||
fn on_new_transactions_inventory(&mut self, peer_index: usize, transactions_hashes: Vec<H256>);
|
fn on_new_transactions_inventory(&mut self, peer_index: usize, transactions_hashes: Vec<H256>);
|
||||||
fn on_new_blocks_headers(&mut self, peer_index: usize, blocks_headers: Vec<BlockHeader>);
|
fn on_new_blocks_headers(&mut self, peer_index: usize, blocks_headers: Vec<BlockHeader>);
|
||||||
fn on_peer_blocks_notfound(&mut self, peer_index: usize, blocks_hashes: Vec<H256>);
|
fn on_peer_blocks_notfound(&mut self, peer_index: usize, blocks_hashes: Vec<H256>);
|
||||||
fn on_peer_block(&mut self, peer_index: usize, block: Block);
|
fn on_peer_block(&mut self, peer_index: usize, block: IndexedBlock);
|
||||||
fn on_peer_transaction(&mut self, peer_index: usize, transaction: Transaction);
|
fn on_peer_transaction(&mut self, peer_index: usize, transaction: Transaction);
|
||||||
fn on_peer_filterload(&mut self, peer_index: usize, message: &types::FilterLoad);
|
fn on_peer_filterload(&mut self, peer_index: usize, message: &types::FilterLoad);
|
||||||
fn on_peer_filteradd(&mut self, peer_index: usize, message: &types::FilterAdd);
|
fn on_peer_filteradd(&mut self, peer_index: usize, message: &types::FilterAdd);
|
||||||
|
@ -212,7 +212,7 @@ pub trait ClientCore : VerificationSink {
|
||||||
fn on_new_transactions_inventory(&mut self, peer_index: usize, transactions_hashes: Vec<H256>);
|
fn on_new_transactions_inventory(&mut self, peer_index: usize, transactions_hashes: Vec<H256>);
|
||||||
fn on_new_blocks_headers(&mut self, peer_index: usize, blocks_headers: Vec<BlockHeader>);
|
fn on_new_blocks_headers(&mut self, peer_index: usize, blocks_headers: Vec<BlockHeader>);
|
||||||
fn on_peer_blocks_notfound(&mut self, peer_index: usize, blocks_hashes: Vec<H256>);
|
fn on_peer_blocks_notfound(&mut self, peer_index: usize, blocks_hashes: Vec<H256>);
|
||||||
fn on_peer_block(&mut self, peer_index: usize, block: Block) -> Option<VecDeque<(H256, Block)>>;
|
fn on_peer_block(&mut self, peer_index: usize, block: IndexedBlock) -> Option<VecDeque<(H256, IndexedBlock)>>;
|
||||||
fn on_peer_transaction(&mut self, peer_index: usize, transaction: Transaction) -> Option<VecDeque<(H256, Transaction)>>;
|
fn on_peer_transaction(&mut self, peer_index: usize, transaction: Transaction) -> Option<VecDeque<(H256, Transaction)>>;
|
||||||
fn on_peer_filterload(&mut self, peer_index: usize, message: &types::FilterLoad);
|
fn on_peer_filterload(&mut self, peer_index: usize, message: &types::FilterLoad);
|
||||||
fn on_peer_filteradd(&mut self, peer_index: usize, message: &types::FilterAdd);
|
fn on_peer_filteradd(&mut self, peer_index: usize, message: &types::FilterAdd);
|
||||||
|
@ -371,8 +371,8 @@ impl<T, U> Client for SynchronizationClient<T, U> where T: TaskExecutor, U: Veri
|
||||||
self.core.lock().on_peer_blocks_notfound(peer_index, blocks_hashes);
|
self.core.lock().on_peer_blocks_notfound(peer_index, blocks_hashes);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn on_peer_block(&mut self, peer_index: usize, block: Block) {
|
fn on_peer_block(&mut self, peer_index: usize, block: IndexedBlock) {
|
||||||
let blocks_to_verify = { self.core.lock().on_peer_block(peer_index, block) };
|
let blocks_to_verify = self.core.lock().on_peer_block(peer_index, block);
|
||||||
|
|
||||||
// verify selected blocks
|
// verify selected blocks
|
||||||
if let Some(mut blocks_to_verify) = blocks_to_verify {
|
if let Some(mut blocks_to_verify) = blocks_to_verify {
|
||||||
|
@ -609,13 +609,13 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Process new block.
|
/// Process new block.
|
||||||
fn on_peer_block(&mut self, peer_index: usize, block: Block) -> Option<VecDeque<(H256, Block)>> {
|
fn on_peer_block(&mut self, peer_index: usize, block: IndexedBlock) -> Option<VecDeque<(H256, IndexedBlock)>> {
|
||||||
let block_hash = block.hash();
|
let block_hash = block.hash();
|
||||||
|
|
||||||
// update peers to select next tasks
|
// update peers to select next tasks
|
||||||
self.peers.on_block_received(peer_index, &block_hash);
|
self.peers.on_block_received(peer_index, &block_hash);
|
||||||
|
|
||||||
self.process_peer_block(peer_index, block_hash, block)
|
self.process_peer_block(peer_index, block_hash.clone(), block)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Process new transaction.
|
/// Process new transaction.
|
||||||
|
@ -776,7 +776,7 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
|
||||||
|
|
||||||
impl<T> VerificationSink for SynchronizationClientCore<T> where T: TaskExecutor {
|
impl<T> VerificationSink for SynchronizationClientCore<T> where T: TaskExecutor {
|
||||||
/// Process successful block verification
|
/// Process successful block verification
|
||||||
fn on_block_verification_success(&mut self, block: Block) {
|
fn on_block_verification_success(&mut self, block: IndexedBlock) {
|
||||||
let hash = block.hash();
|
let hash = block.hash();
|
||||||
// insert block to the storage
|
// insert block to the storage
|
||||||
match {
|
match {
|
||||||
|
@ -1114,9 +1114,9 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Process new peer block
|
/// Process new peer block
|
||||||
fn process_peer_block(&mut self, peer_index: usize, block_hash: H256, block: Block) -> Option<VecDeque<(H256, Block)>> {
|
fn process_peer_block(&mut self, peer_index: usize, block_hash: H256, block: IndexedBlock) -> Option<VecDeque<(H256, IndexedBlock)>> {
|
||||||
// prepare list of blocks to verify + make all required changes to the chain
|
// prepare list of blocks to verify + make all required changes to the chain
|
||||||
let mut result: Option<VecDeque<(H256, Block)>> = None;
|
let mut result: Option<VecDeque<(H256, IndexedBlock)>> = None;
|
||||||
let mut chain = self.chain.write();
|
let mut chain = self.chain.write();
|
||||||
match chain.block_state(&block_hash) {
|
match chain.block_state(&block_hash) {
|
||||||
BlockState::Verifying | BlockState::Stored => {
|
BlockState::Verifying | BlockState::Stored => {
|
||||||
|
@ -1125,7 +1125,7 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
|
||||||
},
|
},
|
||||||
BlockState::Unknown | BlockState::Scheduled | BlockState::Requested => {
|
BlockState::Unknown | BlockState::Scheduled | BlockState::Requested => {
|
||||||
// check parent block state
|
// check parent block state
|
||||||
match chain.block_state(&block.block_header.previous_header_hash) {
|
match chain.block_state(&block.header().previous_header_hash) {
|
||||||
BlockState::Unknown => {
|
BlockState::Unknown => {
|
||||||
if self.state.is_synchronizing() {
|
if self.state.is_synchronizing() {
|
||||||
// when synchronizing, we tend to receive all blocks in-order
|
// when synchronizing, we tend to receive all blocks in-order
|
||||||
|
@ -1153,14 +1153,14 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
|
||||||
// remember peer as useful
|
// remember peer as useful
|
||||||
self.peers.useful_peer(peer_index);
|
self.peers.useful_peer(peer_index);
|
||||||
// schedule verification
|
// schedule verification
|
||||||
let mut blocks_to_verify: VecDeque<(H256, Block)> = VecDeque::new();
|
let mut blocks_to_verify: VecDeque<(H256, IndexedBlock)> = VecDeque::new();
|
||||||
blocks_to_verify.push_back((block_hash.clone(), block));
|
blocks_to_verify.push_back((block_hash.clone(), block));
|
||||||
blocks_to_verify.extend(self.orphaned_blocks_pool.remove_blocks_for_parent(&block_hash));
|
blocks_to_verify.extend(self.orphaned_blocks_pool.remove_blocks_for_parent(&block_hash));
|
||||||
// forget blocks we are going to process
|
// forget blocks we are going to process
|
||||||
let blocks_hashes_to_forget: Vec<_> = blocks_to_verify.iter().map(|t| t.0.clone()).collect();
|
let blocks_hashes_to_forget: Vec<_> = blocks_to_verify.iter().map(|t| t.0.clone()).collect();
|
||||||
chain.forget_blocks_leave_header(&blocks_hashes_to_forget);
|
chain.forget_blocks_leave_header(&blocks_hashes_to_forget);
|
||||||
// remember that we are verifying these blocks
|
// remember that we are verifying these blocks
|
||||||
let blocks_headers_to_verify: Vec<_> = blocks_to_verify.iter().map(|&(ref h, ref b)| (h.clone(), b.block_header.clone())).collect();
|
let blocks_headers_to_verify: Vec<_> = blocks_to_verify.iter().map(|&(ref h, ref b)| (h.clone(), b.header().clone())).collect();
|
||||||
chain.verify_blocks(blocks_headers_to_verify);
|
chain.verify_blocks(blocks_headers_to_verify);
|
||||||
// remember that we are verifying block from this peer
|
// remember that we are verifying block from this peer
|
||||||
self.verifying_blocks_by_peer.insert(block_hash.clone(), peer_index);
|
self.verifying_blocks_by_peer.insert(block_hash.clone(), peer_index);
|
||||||
|
|
|
@ -7,11 +7,12 @@ use network::{Magic, ConsensusParams};
|
||||||
use primitives::hash::H256;
|
use primitives::hash::H256;
|
||||||
use verification::{ChainVerifier, Verify as VerificationVerify};
|
use verification::{ChainVerifier, Verify as VerificationVerify};
|
||||||
use synchronization_chain::ChainRef;
|
use synchronization_chain::ChainRef;
|
||||||
|
use db::IndexedBlock;
|
||||||
|
|
||||||
/// Verification events sink
|
/// Verification events sink
|
||||||
pub trait VerificationSink : Send + 'static {
|
pub trait VerificationSink : Send + 'static {
|
||||||
/// When block verification has completed successfully.
|
/// When block verification has completed successfully.
|
||||||
fn on_block_verification_success(&mut self, block: Block);
|
fn on_block_verification_success(&mut self, block: IndexedBlock);
|
||||||
/// When block verification has failed.
|
/// When block verification has failed.
|
||||||
fn on_block_verification_error(&mut self, err: &str, hash: &H256);
|
fn on_block_verification_error(&mut self, err: &str, hash: &H256);
|
||||||
/// When transaction verification has completed successfully.
|
/// When transaction verification has completed successfully.
|
||||||
|
@ -23,7 +24,7 @@ pub trait VerificationSink : Send + 'static {
|
||||||
/// Verification thread tasks
|
/// Verification thread tasks
|
||||||
enum VerificationTask {
|
enum VerificationTask {
|
||||||
/// Verify single block
|
/// Verify single block
|
||||||
VerifyBlock(Block),
|
VerifyBlock(IndexedBlock),
|
||||||
/// Verify single transaction
|
/// Verify single transaction
|
||||||
VerifyTransaction(Transaction),
|
VerifyTransaction(Transaction),
|
||||||
/// Stop verification thread
|
/// Stop verification thread
|
||||||
|
@ -33,7 +34,7 @@ enum VerificationTask {
|
||||||
/// Synchronization verifier
|
/// Synchronization verifier
|
||||||
pub trait Verifier : Send + 'static {
|
pub trait Verifier : Send + 'static {
|
||||||
/// Verify block
|
/// Verify block
|
||||||
fn verify_block(&self, block: Block);
|
fn verify_block(&self, block: IndexedBlock);
|
||||||
/// Verify transaction
|
/// Verify transaction
|
||||||
fn verify_transaction(&self, transaction: Transaction);
|
fn verify_transaction(&self, transaction: Transaction);
|
||||||
}
|
}
|
||||||
|
@ -73,7 +74,7 @@ impl AsyncVerifier {
|
||||||
match task {
|
match task {
|
||||||
VerificationTask::VerifyBlock(block) => {
|
VerificationTask::VerifyBlock(block) => {
|
||||||
// for changes that are not relying on block#
|
// for changes that are not relying on block#
|
||||||
let is_bip16_active_on_block = block.block_header.time >= bip16_time_border;
|
let is_bip16_active_on_block = block.header().time >= bip16_time_border;
|
||||||
let force_parameters_change = is_bip16_active_on_block != is_bip16_active;
|
let force_parameters_change = is_bip16_active_on_block != is_bip16_active;
|
||||||
if force_parameters_change {
|
if force_parameters_change {
|
||||||
parameters_change_steps = Some(0);
|
parameters_change_steps = Some(0);
|
||||||
|
@ -132,7 +133,7 @@ impl Drop for AsyncVerifier {
|
||||||
|
|
||||||
impl Verifier for AsyncVerifier {
|
impl Verifier for AsyncVerifier {
|
||||||
/// Verify block
|
/// Verify block
|
||||||
fn verify_block(&self, block: Block) {
|
fn verify_block(&self, block: IndexedBlock) {
|
||||||
self.verification_work_sender
|
self.verification_work_sender
|
||||||
.send(VerificationTask::VerifyBlock(block))
|
.send(VerificationTask::VerifyBlock(block))
|
||||||
.expect("Verification thread have the same lifetime as `AsyncVerifier`");
|
.expect("Verification thread have the same lifetime as `AsyncVerifier`");
|
||||||
|
|
Loading…
Reference in New Issue