Merge branch 'master' of github.com:ethcore/parity-bitcoin into sync_block_writer_up
This commit is contained in:
commit
4713c1797c
|
@ -64,7 +64,7 @@ pub type SharedStore = std::sync::Arc<Store + Send + Sync>;
|
||||||
|
|
||||||
pub use best_block::BestBlock;
|
pub use best_block::BestBlock;
|
||||||
pub use storage::{Storage, Store};
|
pub use storage::{Storage, Store};
|
||||||
pub use error::Error;
|
pub use error::{Error, ConsistencyError};
|
||||||
pub use kvdb::Database;
|
pub use kvdb::Database;
|
||||||
pub use transaction_provider::{TransactionProvider, AsTransactionProvider, PreviousTransactionOutputProvider};
|
pub use transaction_provider::{TransactionProvider, AsTransactionProvider, PreviousTransactionOutputProvider};
|
||||||
pub use transaction_meta_provider::TransactionMetaProvider;
|
pub use transaction_meta_provider::TransactionMetaProvider;
|
||||||
|
|
|
@ -224,7 +224,7 @@ impl Storage {
|
||||||
/// all transaction meta is removed
|
/// all transaction meta is removed
|
||||||
/// DOES NOT update best block
|
/// DOES NOT update best block
|
||||||
fn decanonize_block(&self, context: &mut UpdateContext, hash: &H256) -> Result<(), Error> {
|
fn decanonize_block(&self, context: &mut UpdateContext, hash: &H256) -> Result<(), Error> {
|
||||||
trace!(target: "reorg", "Decanonizing block {}", hash.to_reversed_str());
|
trace!(target: "db", "Decanonizing block {}", hash.to_reversed_str());
|
||||||
|
|
||||||
// ensure that block is of the main chain
|
// ensure that block is of the main chain
|
||||||
try!(self.block_number(hash).ok_or(Error::not_main(hash)));
|
try!(self.block_number(hash).ok_or(Error::not_main(hash)));
|
||||||
|
@ -299,6 +299,8 @@ impl Storage {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn canonize_block(&self, context: &mut UpdateContext, at_height: u32, hash: &H256) -> Result<(), Error> {
|
fn canonize_block(&self, context: &mut UpdateContext, at_height: u32, hash: &H256) -> Result<(), Error> {
|
||||||
|
trace!(target: "db", "Canonizing block {}", hash.to_reversed_str());
|
||||||
|
|
||||||
let block = try!(self.block_by_hash(hash).ok_or(Error::unknown_hash(hash)));
|
let block = try!(self.block_by_hash(hash).ok_or(Error::unknown_hash(hash)));
|
||||||
try!(self.update_transactions_meta(context, at_height, &mut block.transactions()));
|
try!(self.update_transactions_meta(context, at_height, &mut block.transactions()));
|
||||||
|
|
||||||
|
@ -377,7 +379,7 @@ impl Storage {
|
||||||
// lock will be held until the end of the routine
|
// lock will be held until the end of the routine
|
||||||
let mut best_block = self.best_block.write();
|
let mut best_block = self.best_block.write();
|
||||||
|
|
||||||
let mut context = UpdateContext::new(&self.database);
|
let mut context = UpdateContext::new(&self.database, hash);
|
||||||
let mut result = Vec::new();
|
let mut result = Vec::new();
|
||||||
let mut best_number = try!(self.best_number().ok_or(Error::Consistency(ConsistencyError::NoBestBlock)));
|
let mut best_number = try!(self.best_number().ok_or(Error::Consistency(ConsistencyError::NoBestBlock)));
|
||||||
loop {
|
loop {
|
||||||
|
@ -454,7 +456,7 @@ impl BlockStapler for Storage {
|
||||||
// ! lock will be held during the entire insert routine
|
// ! lock will be held during the entire insert routine
|
||||||
let mut best_block = self.best_block.write();
|
let mut best_block = self.best_block.write();
|
||||||
|
|
||||||
let mut context = UpdateContext::new(&self.database);
|
let mut context = UpdateContext::new(&self.database, block.hash());
|
||||||
|
|
||||||
let block_hash = block.hash();
|
let block_hash = block.hash();
|
||||||
|
|
||||||
|
@ -577,6 +579,7 @@ impl BlockStapler for Storage {
|
||||||
// write accumulated transactions meta
|
// write accumulated transactions meta
|
||||||
try!(context.apply(&self.database));
|
try!(context.apply(&self.database));
|
||||||
|
|
||||||
|
trace!(target: "db", "Best block now ({}, {})", &new_best_hash, &new_best_number);
|
||||||
// updating locked best block
|
// updating locked best block
|
||||||
*best_block = Some(BestBlock { hash: new_best_hash, number: new_best_number });
|
*best_block = Some(BestBlock { hash: new_best_hash, number: new_best_number });
|
||||||
|
|
||||||
|
@ -1179,7 +1182,7 @@ mod tests {
|
||||||
.expect("Transaction meta for the genesis coinbase transaction should exist");
|
.expect("Transaction meta for the genesis coinbase transaction should exist");
|
||||||
assert!(genesis_meta.is_spent(0), "Genesis coinbase should be recorded as spent because block#1 transaction spends it");
|
assert!(genesis_meta.is_spent(0), "Genesis coinbase should be recorded as spent because block#1 transaction spends it");
|
||||||
|
|
||||||
let mut update_context = UpdateContext::new(&store.database);
|
let mut update_context = UpdateContext::new(&store.database, &block_hash);
|
||||||
store.decanonize_block(&mut update_context, &block_hash)
|
store.decanonize_block(&mut update_context, &block_hash)
|
||||||
.expect("Decanonizing block #1 which was just inserted should not fail");
|
.expect("Decanonizing block #1 which was just inserted should not fail");
|
||||||
update_context.apply(&store.database).unwrap();
|
update_context.apply(&store.database).unwrap();
|
||||||
|
|
|
@ -26,6 +26,7 @@ struct TestData {
|
||||||
blocks: HashMap<H256, chain::Block>,
|
blocks: HashMap<H256, chain::Block>,
|
||||||
heights: HashMap<u32, H256>,
|
heights: HashMap<u32, H256>,
|
||||||
hashes: HashMap<H256, u32>,
|
hashes: HashMap<H256, u32>,
|
||||||
|
insert_errors: HashMap<H256, Error>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TestStorage {
|
impl TestStorage {
|
||||||
|
@ -61,6 +62,10 @@ impl TestStorage {
|
||||||
let genesis_block: Block = "0100000000000000000000000000000000000000000000000000000000000000000000003ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4a29ab5f49ffff001d1dac2b7c0101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000".into();
|
let genesis_block: Block = "0100000000000000000000000000000000000000000000000000000000000000000000003ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4a29ab5f49ffff001d1dac2b7c0101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000".into();
|
||||||
TestStorage::with_blocks(&vec![genesis_block])
|
TestStorage::with_blocks(&vec![genesis_block])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn insert_error(&mut self, hash: H256, err: Error) {
|
||||||
|
self.data.write().insert_errors.insert(hash, err);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BlockProvider for TestStorage {
|
impl BlockProvider for TestStorage {
|
||||||
|
@ -120,6 +125,11 @@ impl BlockStapler for TestStorage {
|
||||||
fn insert_block(&self, block: &chain::Block) -> Result<BlockInsertedChain, Error> {
|
fn insert_block(&self, block: &chain::Block) -> Result<BlockInsertedChain, Error> {
|
||||||
let hash = block.hash();
|
let hash = block.hash();
|
||||||
let mut data = self.data.write();
|
let mut data = self.data.write();
|
||||||
|
|
||||||
|
if let Some(err) = data.insert_errors.remove(&hash) {
|
||||||
|
return Err(err);
|
||||||
|
}
|
||||||
|
|
||||||
match data.blocks.entry(hash.clone()) {
|
match data.blocks.entry(hash.clone()) {
|
||||||
Entry::Occupied(mut entry) => {
|
Entry::Occupied(mut entry) => {
|
||||||
replace(entry.get_mut(), block.clone());
|
replace(entry.get_mut(), block.clone());
|
||||||
|
|
|
@ -9,14 +9,16 @@ pub struct UpdateContext {
|
||||||
pub meta: HashMap<H256, TransactionMeta>,
|
pub meta: HashMap<H256, TransactionMeta>,
|
||||||
pub db_transaction: DBTransaction,
|
pub db_transaction: DBTransaction,
|
||||||
meta_snapshot: Option<HashMap<H256, TransactionMeta>>,
|
meta_snapshot: Option<HashMap<H256, TransactionMeta>>,
|
||||||
|
target: H256,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl UpdateContext {
|
impl UpdateContext {
|
||||||
pub fn new(db: &Database) -> Self {
|
pub fn new(db: &Database, target: &H256) -> Self {
|
||||||
UpdateContext {
|
UpdateContext {
|
||||||
meta: HashMap::new(),
|
meta: HashMap::new(),
|
||||||
db_transaction: db.transaction(),
|
db_transaction: db.transaction(),
|
||||||
meta_snapshot: None,
|
meta_snapshot: None,
|
||||||
|
target: target.clone(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -27,6 +29,8 @@ impl UpdateContext {
|
||||||
}
|
}
|
||||||
|
|
||||||
try!(db.write(self.db_transaction));
|
try!(db.write(self.db_transaction));
|
||||||
|
|
||||||
|
trace!("Applied transaction for block {:?}", &self.target);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -92,7 +92,8 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
|
||||||
self.client.lock().on_new_transactions_inventory(peer_index, transactions_inventory);
|
self.client.lock().on_new_transactions_inventory(peer_index, transactions_inventory);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: process other inventory types
|
// currently we do not setup connection filter => skip InventoryType::MessageFilteredBlock
|
||||||
|
// currently we do not send sendcmpct message => skip InventoryType::MessageCompactBlock
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn on_peer_getdata(&self, peer_index: usize, message: types::GetData) {
|
pub fn on_peer_getdata(&self, peer_index: usize, message: types::GetData) {
|
||||||
|
|
|
@ -901,8 +901,6 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
|
||||||
}
|
}
|
||||||
));
|
));
|
||||||
|
|
||||||
// TODO: start management worker only when synchronization is started
|
|
||||||
// currently impossible because there is no way to call Interval::new with Remote && Handle is not-Send
|
|
||||||
{
|
{
|
||||||
let peers_config = ManagePeersConfig::default();
|
let peers_config = ManagePeersConfig::default();
|
||||||
let unknown_config = ManageUnknownBlocksConfig::default();
|
let unknown_config = ManageUnknownBlocksConfig::default();
|
||||||
|
@ -999,7 +997,7 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
Some(Task::SendCompactBlocks(peer_index, block_header_and_ids, ServerTaskIndex::None))
|
Some(Task::SendCompactBlocks(peer_index, block_header_and_ids))
|
||||||
},
|
},
|
||||||
BlockAnnouncementType::SendInventory => {
|
BlockAnnouncementType::SendInventory => {
|
||||||
let inventory: Vec<_> = new_blocks_hashes.iter()
|
let inventory: Vec<_> = new_blocks_hashes.iter()
|
||||||
|
@ -1010,7 +1008,7 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
if !inventory.is_empty() {
|
if !inventory.is_empty() {
|
||||||
Some(Task::SendInventory(peer_index, inventory, ServerTaskIndex::None))
|
Some(Task::SendInventory(peer_index, inventory))
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
@ -1040,7 +1038,7 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
if !inventory.is_empty() {
|
if !inventory.is_empty() {
|
||||||
Some(Task::SendInventory(peer_index, inventory, ServerTaskIndex::None))
|
Some(Task::SendInventory(peer_index, inventory))
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
@ -1191,17 +1189,12 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Process new peer transaction
|
/// Process new peer transaction
|
||||||
fn process_peer_transaction(&mut self, peer_index: Option<usize>, hash: H256, transaction: Transaction) -> Option<VecDeque<(H256, Transaction)>> {
|
fn process_peer_transaction(&mut self, _peer_index: Option<usize>, hash: H256, transaction: Transaction) -> Option<VecDeque<(H256, Transaction)>> {
|
||||||
// if we are in synchronization state, we will ignore this message
|
// if we are in synchronization state, we will ignore this message
|
||||||
if self.state.is_synchronizing() {
|
if self.state.is_synchronizing() {
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
|
|
||||||
// mark peer as useful (TODO: remove after self.all_peers() would be all peers, not sync one)
|
|
||||||
if let Some(peer_index) = peer_index {
|
|
||||||
self.peers.useful_peer(peer_index);
|
|
||||||
}
|
|
||||||
|
|
||||||
// else => verify transaction + it's orphans and then add to the memory pool
|
// else => verify transaction + it's orphans and then add to the memory pool
|
||||||
let mut chain = self.chain.write();
|
let mut chain = self.chain.write();
|
||||||
|
|
||||||
|
@ -1548,7 +1541,7 @@ pub mod tests {
|
||||||
assert!(tasks.iter().any(|t| t == &Task::RequestMemoryPool(2)));
|
assert!(tasks.iter().any(|t| t == &Task::RequestMemoryPool(2)));
|
||||||
|
|
||||||
let inventory = vec![InventoryVector { inv_type: InventoryType::MessageBlock, hash: block.hash() }];
|
let inventory = vec![InventoryVector { inv_type: InventoryType::MessageBlock, hash: block.hash() }];
|
||||||
assert!(tasks.iter().any(|t| t == &Task::SendInventory(1, inventory.clone(), ServerTaskIndex::None)));
|
assert!(tasks.iter().any(|t| t == &Task::SendInventory(1, inventory.clone())));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -1826,7 +1819,19 @@ pub mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn sync_after_db_insert_nonfatal_fail() {
|
fn sync_after_db_insert_nonfatal_fail() {
|
||||||
// TODO: implement me
|
use db::Store;
|
||||||
|
|
||||||
|
let mut storage = db::TestStorage::with_genesis_block();
|
||||||
|
let block = test_data::block_h1();
|
||||||
|
storage.insert_error(block.hash(), db::Error::Consistency(db::ConsistencyError::NoBestBlock));
|
||||||
|
let best_genesis = storage.best_block().unwrap();
|
||||||
|
|
||||||
|
let (_, _, _, chain, sync) = create_sync(Some(Arc::new(storage)), None);
|
||||||
|
let mut sync = sync.lock();
|
||||||
|
|
||||||
|
sync.on_peer_block(1, block.into());
|
||||||
|
|
||||||
|
assert_eq!(chain.read().best_block(), best_genesis);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -2093,7 +2098,7 @@ pub mod tests {
|
||||||
{
|
{
|
||||||
let tasks = executor.lock().take_tasks();
|
let tasks = executor.lock().take_tasks();
|
||||||
let inventory = vec![InventoryVector { inv_type: InventoryType::MessageBlock, hash: b2.hash() }];
|
let inventory = vec![InventoryVector { inv_type: InventoryType::MessageBlock, hash: b2.hash() }];
|
||||||
assert_eq!(tasks, vec![Task::RequestBlocksHeaders(2), Task::SendInventory(1, inventory, ServerTaskIndex::None)]);
|
assert_eq!(tasks, vec![Task::RequestBlocksHeaders(2), Task::SendInventory(1, inventory)]);
|
||||||
}
|
}
|
||||||
|
|
||||||
sync.on_new_blocks_headers(1, vec![b3.block_header.clone()]);
|
sync.on_new_blocks_headers(1, vec![b3.block_header.clone()]);
|
||||||
|
@ -2103,7 +2108,7 @@ pub mod tests {
|
||||||
{
|
{
|
||||||
let tasks = executor.lock().take_tasks();
|
let tasks = executor.lock().take_tasks();
|
||||||
let inventory = vec![InventoryVector { inv_type: InventoryType::MessageBlock, hash: b3.hash() }];
|
let inventory = vec![InventoryVector { inv_type: InventoryType::MessageBlock, hash: b3.hash() }];
|
||||||
assert!(tasks.iter().any(|t| t == &Task::SendInventory(2, inventory.clone(), ServerTaskIndex::None)));
|
assert!(tasks.iter().any(|t| t == &Task::SendInventory(2, inventory.clone())));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2111,17 +2116,16 @@ pub mod tests {
|
||||||
fn relay_new_transaction_when_in_saturated_state() {
|
fn relay_new_transaction_when_in_saturated_state() {
|
||||||
let (_, _, executor, _, sync) = create_sync(None, None);
|
let (_, _, executor, _, sync) = create_sync(None, None);
|
||||||
|
|
||||||
let tx1: Transaction = test_data::TransactionBuilder::with_output(10).into();
|
let tx: Transaction = test_data::TransactionBuilder::with_output(20).into();
|
||||||
let tx2: Transaction = test_data::TransactionBuilder::with_output(20).into();
|
let tx_hash = tx.hash();
|
||||||
let tx2_hash = tx2.hash();
|
|
||||||
|
|
||||||
let mut sync = sync.lock();
|
let mut sync = sync.lock();
|
||||||
sync.on_peer_transaction(1, tx1);
|
sync.on_peer_connected(1);
|
||||||
sync.on_peer_transaction(2, tx2);
|
sync.on_peer_transaction(2, tx);
|
||||||
|
|
||||||
let tasks = { executor.lock().take_tasks() };
|
let tasks = { executor.lock().take_tasks() };
|
||||||
let inventory = vec![InventoryVector { inv_type: InventoryType::MessageTx, hash: tx2_hash }];
|
let inventory = vec![InventoryVector { inv_type: InventoryType::MessageTx, hash: tx_hash }];
|
||||||
assert_eq!(tasks, vec![Task::SendInventory(1, inventory, ServerTaskIndex::None)]);
|
assert_eq!(tasks, vec![Task::SendInventory(1, inventory)]);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -2162,9 +2166,9 @@ pub mod tests {
|
||||||
let tasks = { executor.lock().take_tasks() };
|
let tasks = { executor.lock().take_tasks() };
|
||||||
let inventory = vec![InventoryVector { inv_type: InventoryType::MessageTx, hash: tx1_hash }];
|
let inventory = vec![InventoryVector { inv_type: InventoryType::MessageTx, hash: tx1_hash }];
|
||||||
assert_eq!(tasks, vec![
|
assert_eq!(tasks, vec![
|
||||||
Task::SendInventory(1, inventory.clone(), ServerTaskIndex::None),
|
Task::SendInventory(1, inventory.clone()),
|
||||||
Task::SendInventory(3, inventory.clone(), ServerTaskIndex::None),
|
Task::SendInventory(3, inventory.clone()),
|
||||||
Task::SendInventory(4, inventory.clone(), ServerTaskIndex::None),
|
Task::SendInventory(4, inventory.clone()),
|
||||||
]);
|
]);
|
||||||
|
|
||||||
// tx2 is relayed to peers: 2, 3, 4
|
// tx2 is relayed to peers: 2, 3, 4
|
||||||
|
@ -2173,9 +2177,9 @@ pub mod tests {
|
||||||
let tasks = { executor.lock().take_tasks() };
|
let tasks = { executor.lock().take_tasks() };
|
||||||
let inventory = vec![InventoryVector { inv_type: InventoryType::MessageTx, hash: tx2_hash }];
|
let inventory = vec![InventoryVector { inv_type: InventoryType::MessageTx, hash: tx2_hash }];
|
||||||
assert_eq!(tasks, vec![
|
assert_eq!(tasks, vec![
|
||||||
Task::SendInventory(2, inventory.clone(), ServerTaskIndex::None),
|
Task::SendInventory(2, inventory.clone()),
|
||||||
Task::SendInventory(3, inventory.clone(), ServerTaskIndex::None),
|
Task::SendInventory(3, inventory.clone()),
|
||||||
Task::SendInventory(4, inventory.clone(), ServerTaskIndex::None),
|
Task::SendInventory(4, inventory.clone()),
|
||||||
]);
|
]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2201,7 +2205,7 @@ pub mod tests {
|
||||||
let headers = vec![b0.block_header.clone()];
|
let headers = vec![b0.block_header.clone()];
|
||||||
assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1),
|
assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1),
|
||||||
Task::SendHeaders(2, headers, ServerTaskIndex::None),
|
Task::SendHeaders(2, headers, ServerTaskIndex::None),
|
||||||
Task::SendInventory(3, inventory, ServerTaskIndex::None),
|
Task::SendInventory(3, inventory),
|
||||||
]);
|
]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2245,13 +2249,13 @@ pub mod tests {
|
||||||
inv_type: InventoryType::MessageTx,
|
inv_type: InventoryType::MessageTx,
|
||||||
hash: tx1_hash.clone(),
|
hash: tx1_hash.clone(),
|
||||||
}
|
}
|
||||||
], ServerTaskIndex::None),
|
]),
|
||||||
Task::SendInventory(4, vec![
|
Task::SendInventory(4, vec![
|
||||||
InventoryVector {
|
InventoryVector {
|
||||||
inv_type: InventoryType::MessageTx,
|
inv_type: InventoryType::MessageTx,
|
||||||
hash: tx1_hash.clone(),
|
hash: tx1_hash.clone(),
|
||||||
}
|
}
|
||||||
], ServerTaskIndex::None),
|
]),
|
||||||
]);
|
]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2288,9 +2292,9 @@ pub mod tests {
|
||||||
assert_eq!(tasks.len(), 3);
|
assert_eq!(tasks.len(), 3);
|
||||||
assert_eq!(tasks[0], Task::RequestBlocksHeaders(1));
|
assert_eq!(tasks[0], Task::RequestBlocksHeaders(1));
|
||||||
match tasks[1] {
|
match tasks[1] {
|
||||||
Task::SendCompactBlocks(2, _, _) => (),
|
Task::SendCompactBlocks(2, _) => (),
|
||||||
_ => panic!("unexpected task"),
|
_ => panic!("unexpected task"),
|
||||||
}
|
}
|
||||||
assert_eq!(tasks[2], Task::SendInventory(3, inventory, ServerTaskIndex::None));
|
assert_eq!(tasks[2], Task::SendInventory(3, inventory));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,6 @@ pub trait TaskExecutor : Send + 'static {
|
||||||
fn execute(&mut self, task: Task);
|
fn execute(&mut self, task: Task);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: get rid of unneeded ServerTaskIndex-es
|
|
||||||
/// Synchronization task for the peer.
|
/// Synchronization task for the peer.
|
||||||
#[derive(Debug, PartialEq)]
|
#[derive(Debug, PartialEq)]
|
||||||
pub enum Task {
|
pub enum Task {
|
||||||
|
@ -30,7 +29,7 @@ pub enum Task {
|
||||||
/// Request memory pool contents
|
/// Request memory pool contents
|
||||||
RequestMemoryPool(usize),
|
RequestMemoryPool(usize),
|
||||||
/// Send block.
|
/// Send block.
|
||||||
SendBlock(usize, Block, ServerTaskIndex),
|
SendBlock(usize, Block),
|
||||||
/// Send merkleblock
|
/// Send merkleblock
|
||||||
SendMerkleBlock(usize, types::MerkleBlock),
|
SendMerkleBlock(usize, types::MerkleBlock),
|
||||||
/// Send transaction
|
/// Send transaction
|
||||||
|
@ -38,13 +37,13 @@ pub enum Task {
|
||||||
/// Send block transactions
|
/// Send block transactions
|
||||||
SendBlockTxn(usize, H256, Vec<Transaction>),
|
SendBlockTxn(usize, H256, Vec<Transaction>),
|
||||||
/// Send notfound
|
/// Send notfound
|
||||||
SendNotFound(usize, Vec<InventoryVector>, ServerTaskIndex),
|
SendNotFound(usize, Vec<InventoryVector>),
|
||||||
/// Send inventory
|
/// Send inventory
|
||||||
SendInventory(usize, Vec<InventoryVector>, ServerTaskIndex),
|
SendInventory(usize, Vec<InventoryVector>),
|
||||||
/// Send headers
|
/// Send headers
|
||||||
SendHeaders(usize, Vec<BlockHeader>, ServerTaskIndex),
|
SendHeaders(usize, Vec<BlockHeader>, ServerTaskIndex),
|
||||||
/// Send compact blocks
|
/// Send compact blocks
|
||||||
SendCompactBlocks(usize, Vec<BlockHeaderAndIDs>, ServerTaskIndex),
|
SendCompactBlocks(usize, Vec<BlockHeaderAndIDs>),
|
||||||
/// Notify io about ignored request
|
/// Notify io about ignored request
|
||||||
Ignore(usize, u32),
|
Ignore(usize, u32),
|
||||||
}
|
}
|
||||||
|
@ -130,13 +129,12 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
|
||||||
connection.send_getdata(&getdata);
|
connection.send_getdata(&getdata);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Task::SendBlock(peer_index, block, id) => {
|
Task::SendBlock(peer_index, block) => {
|
||||||
let block_message = types::Block {
|
let block_message = types::Block {
|
||||||
block: block,
|
block: block,
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Some(connection) = self.peers.get_mut(&peer_index) {
|
if let Some(connection) = self.peers.get_mut(&peer_index) {
|
||||||
assert_eq!(id.raw(), None);
|
|
||||||
trace!(target: "sync", "Sending block {:?} to peer#{}", block_message.block.hash().to_reversed_str(), peer_index);
|
trace!(target: "sync", "Sending block {:?} to peer#{}", block_message.block.hash().to_reversed_str(), peer_index);
|
||||||
connection.send_block(&block_message);
|
connection.send_block(&block_message);
|
||||||
}
|
}
|
||||||
|
@ -170,24 +168,22 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
|
||||||
connection.send_block_txn(&transactions_message);
|
connection.send_block_txn(&transactions_message);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Task::SendNotFound(peer_index, unknown_inventory, id) => {
|
Task::SendNotFound(peer_index, unknown_inventory) => {
|
||||||
let notfound = types::NotFound {
|
let notfound = types::NotFound {
|
||||||
inventory: unknown_inventory,
|
inventory: unknown_inventory,
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Some(connection) = self.peers.get_mut(&peer_index) {
|
if let Some(connection) = self.peers.get_mut(&peer_index) {
|
||||||
assert_eq!(id.raw(), None);
|
|
||||||
trace!(target: "sync", "Sending notfound to peer#{} with {} items", peer_index, notfound.inventory.len());
|
trace!(target: "sync", "Sending notfound to peer#{} with {} items", peer_index, notfound.inventory.len());
|
||||||
connection.send_notfound(¬found);
|
connection.send_notfound(¬found);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Task::SendInventory(peer_index, inventory, id) => {
|
Task::SendInventory(peer_index, inventory) => {
|
||||||
let inventory = types::Inv {
|
let inventory = types::Inv {
|
||||||
inventory: inventory,
|
inventory: inventory,
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Some(connection) = self.peers.get_mut(&peer_index) {
|
if let Some(connection) = self.peers.get_mut(&peer_index) {
|
||||||
assert_eq!(id.raw(), None);
|
|
||||||
trace!(target: "sync", "Sending inventory to peer#{} with {} items", peer_index, inventory.inventory.len());
|
trace!(target: "sync", "Sending inventory to peer#{} with {} items", peer_index, inventory.inventory.len());
|
||||||
connection.send_inventory(&inventory);
|
connection.send_inventory(&inventory);
|
||||||
}
|
}
|
||||||
|
@ -205,9 +201,8 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Task::SendCompactBlocks(peer_index, compact_blocks, id) => {
|
Task::SendCompactBlocks(peer_index, compact_blocks) => {
|
||||||
if let Some(connection) = self.peers.get_mut(&peer_index) {
|
if let Some(connection) = self.peers.get_mut(&peer_index) {
|
||||||
assert_eq!(id.raw(), None);
|
|
||||||
for compact_block in compact_blocks {
|
for compact_block in compact_blocks {
|
||||||
trace!(target: "sync", "Sending compact_block {:?} to peer#{}", compact_block.header.hash(), peer_index);
|
trace!(target: "sync", "Sending compact_block {:?} to peer#{}", compact_block.header.hash(), peer_index);
|
||||||
connection.send_compact_block(&types::CompactBlock {
|
connection.send_compact_block(&types::CompactBlock {
|
||||||
|
|
|
@ -224,7 +224,7 @@ impl SynchronizationServer {
|
||||||
inv_type: InventoryType::MessageBlock,
|
inv_type: InventoryType::MessageBlock,
|
||||||
hash: hash,
|
hash: hash,
|
||||||
}).collect();
|
}).collect();
|
||||||
executor.lock().execute(Task::SendInventory(peer_index, inventory, indexed_task.id));
|
executor.lock().execute(Task::SendInventory(peer_index, inventory));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
@ -309,7 +309,7 @@ impl SynchronizationServer {
|
||||||
.collect();
|
.collect();
|
||||||
if !inventory.is_empty() {
|
if !inventory.is_empty() {
|
||||||
trace!(target: "sync", "Going to respond with {} memory-pool transactions ids to peer#{}", inventory.len(), peer_index);
|
trace!(target: "sync", "Going to respond with {} memory-pool transactions ids to peer#{}", inventory.len(), peer_index);
|
||||||
executor.lock().execute(Task::SendInventory(peer_index, inventory, indexed_task.id));
|
executor.lock().execute(Task::SendInventory(peer_index, inventory));
|
||||||
} else {
|
} else {
|
||||||
assert_eq!(indexed_task.id, ServerTaskIndex::None);
|
assert_eq!(indexed_task.id, ServerTaskIndex::None);
|
||||||
}
|
}
|
||||||
|
@ -318,7 +318,7 @@ impl SynchronizationServer {
|
||||||
},
|
},
|
||||||
// `notfound`
|
// `notfound`
|
||||||
ServerTask::ReturnNotFound(inventory) => {
|
ServerTask::ReturnNotFound(inventory) => {
|
||||||
executor.lock().execute(Task::SendNotFound(peer_index, inventory, indexed_task.id));
|
executor.lock().execute(Task::SendNotFound(peer_index, inventory));
|
||||||
// inform that we have processed task for peer
|
// inform that we have processed task for peer
|
||||||
queue.lock().task_processed(peer_index);
|
queue.lock().task_processed(peer_index);
|
||||||
},
|
},
|
||||||
|
@ -326,7 +326,7 @@ impl SynchronizationServer {
|
||||||
ServerTask::ReturnBlock(block_hash) => {
|
ServerTask::ReturnBlock(block_hash) => {
|
||||||
let block = chain.read().storage().block(db::BlockRef::Hash(block_hash))
|
let block = chain.read().storage().block(db::BlockRef::Hash(block_hash))
|
||||||
.expect("we have checked that block exists in ServeGetData; db is append-only; qed");
|
.expect("we have checked that block exists in ServeGetData; db is append-only; qed");
|
||||||
executor.lock().execute(Task::SendBlock(peer_index, block, indexed_task.id));
|
executor.lock().execute(Task::SendBlock(peer_index, block));
|
||||||
// inform that we have processed task for peer
|
// inform that we have processed task for peer
|
||||||
queue.lock().task_processed(peer_index);
|
queue.lock().task_processed(peer_index);
|
||||||
},
|
},
|
||||||
|
@ -609,7 +609,7 @@ pub mod tests {
|
||||||
server.serve_getdata(0, FilteredInventory::with_unfiltered(inventory.clone())).map(|t| server.add_task(0, t));
|
server.serve_getdata(0, FilteredInventory::with_unfiltered(inventory.clone())).map(|t| server.add_task(0, t));
|
||||||
// => respond with notfound
|
// => respond with notfound
|
||||||
let tasks = DummyTaskExecutor::wait_tasks(executor);
|
let tasks = DummyTaskExecutor::wait_tasks(executor);
|
||||||
assert_eq!(tasks, vec![Task::SendNotFound(0, inventory, ServerTaskIndex::None)]);
|
assert_eq!(tasks, vec![Task::SendNotFound(0, inventory)]);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -625,7 +625,7 @@ pub mod tests {
|
||||||
server.serve_getdata(0, FilteredInventory::with_unfiltered(inventory)).map(|t| server.add_task(0, t));
|
server.serve_getdata(0, FilteredInventory::with_unfiltered(inventory)).map(|t| server.add_task(0, t));
|
||||||
// => respond with block
|
// => respond with block
|
||||||
let tasks = DummyTaskExecutor::wait_tasks(executor);
|
let tasks = DummyTaskExecutor::wait_tasks(executor);
|
||||||
assert_eq!(tasks, vec![Task::SendBlock(0, test_data::genesis(), ServerTaskIndex::None)]);
|
assert_eq!(tasks, vec![Task::SendBlock(0, test_data::genesis())]);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -659,7 +659,7 @@ pub mod tests {
|
||||||
hash: test_data::block_h1().hash(),
|
hash: test_data::block_h1().hash(),
|
||||||
}];
|
}];
|
||||||
let tasks = DummyTaskExecutor::wait_tasks(executor);
|
let tasks = DummyTaskExecutor::wait_tasks(executor);
|
||||||
assert_eq!(tasks, vec![Task::SendInventory(0, inventory, ServerTaskIndex::None)]);
|
assert_eq!(tasks, vec![Task::SendInventory(0, inventory)]);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -722,7 +722,7 @@ pub mod tests {
|
||||||
hash: transaction_hash,
|
hash: transaction_hash,
|
||||||
}];
|
}];
|
||||||
let tasks = DummyTaskExecutor::wait_tasks(executor);
|
let tasks = DummyTaskExecutor::wait_tasks(executor);
|
||||||
assert_eq!(tasks, vec![Task::SendInventory(0, inventory, ServerTaskIndex::None)]);
|
assert_eq!(tasks, vec![Task::SendInventory(0, inventory)]);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -764,7 +764,7 @@ pub mod tests {
|
||||||
server.serve_getdata(0, FilteredInventory::with_unfiltered(inventory.clone())).map(|t| server.add_task(0, t));
|
server.serve_getdata(0, FilteredInventory::with_unfiltered(inventory.clone())).map(|t| server.add_task(0, t));
|
||||||
// => respond with notfound
|
// => respond with notfound
|
||||||
let tasks = DummyTaskExecutor::wait_tasks(executor);
|
let tasks = DummyTaskExecutor::wait_tasks(executor);
|
||||||
assert_eq!(tasks, vec![Task::SendNotFound(0, inventory, ServerTaskIndex::None)]);
|
assert_eq!(tasks, vec![Task::SendNotFound(0, inventory)]);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|
Loading…
Reference in New Issue