diff --git a/db/src/lib.rs b/db/src/lib.rs index cc04ad11..78abf148 100644 --- a/db/src/lib.rs +++ b/db/src/lib.rs @@ -64,7 +64,7 @@ pub type SharedStore = std::sync::Arc; pub use best_block::BestBlock; pub use storage::{Storage, Store}; -pub use error::Error; +pub use error::{Error, ConsistencyError}; pub use kvdb::Database; pub use transaction_provider::{TransactionProvider, AsTransactionProvider, PreviousTransactionOutputProvider}; pub use transaction_meta_provider::TransactionMetaProvider; diff --git a/db/src/storage.rs b/db/src/storage.rs index e4d7ae4a..d12abec1 100644 --- a/db/src/storage.rs +++ b/db/src/storage.rs @@ -224,7 +224,7 @@ impl Storage { /// all transaction meta is removed /// DOES NOT update best block fn decanonize_block(&self, context: &mut UpdateContext, hash: &H256) -> Result<(), Error> { - trace!(target: "reorg", "Decanonizing block {}", hash.to_reversed_str()); + trace!(target: "db", "Decanonizing block {}", hash.to_reversed_str()); // ensure that block is of the main chain try!(self.block_number(hash).ok_or(Error::not_main(hash))); @@ -299,6 +299,8 @@ impl Storage { } fn canonize_block(&self, context: &mut UpdateContext, at_height: u32, hash: &H256) -> Result<(), Error> { + trace!(target: "db", "Canonizing block {}", hash.to_reversed_str()); + let block = try!(self.block_by_hash(hash).ok_or(Error::unknown_hash(hash))); try!(self.update_transactions_meta(context, at_height, &mut block.transactions())); @@ -377,7 +379,7 @@ impl Storage { // lock will be held until the end of the routine let mut best_block = self.best_block.write(); - let mut context = UpdateContext::new(&self.database); + let mut context = UpdateContext::new(&self.database, hash); let mut result = Vec::new(); let mut best_number = try!(self.best_number().ok_or(Error::Consistency(ConsistencyError::NoBestBlock))); loop { @@ -454,7 +456,7 @@ impl BlockStapler for Storage { // ! lock will be held during the entire insert routine let mut best_block = self.best_block.write(); - let mut context = UpdateContext::new(&self.database); + let mut context = UpdateContext::new(&self.database, block.hash()); let block_hash = block.hash(); @@ -577,6 +579,7 @@ impl BlockStapler for Storage { // write accumulated transactions meta try!(context.apply(&self.database)); + trace!(target: "db", "Best block now ({}, {})", &new_best_hash.to_reversed_str(), &new_best_number); // updating locked best block *best_block = Some(BestBlock { hash: new_best_hash, number: new_best_number }); @@ -1179,7 +1182,7 @@ mod tests { .expect("Transaction meta for the genesis coinbase transaction should exist"); assert!(genesis_meta.is_spent(0), "Genesis coinbase should be recorded as spent because block#1 transaction spends it"); - let mut update_context = UpdateContext::new(&store.database); + let mut update_context = UpdateContext::new(&store.database, &block_hash); store.decanonize_block(&mut update_context, &block_hash) .expect("Decanonizing block #1 which was just inserted should not fail"); update_context.apply(&store.database).unwrap(); diff --git a/db/src/test_storage.rs b/db/src/test_storage.rs index 22750aad..5dbc60ba 100644 --- a/db/src/test_storage.rs +++ b/db/src/test_storage.rs @@ -26,6 +26,7 @@ struct TestData { blocks: HashMap, heights: HashMap, hashes: HashMap, + insert_errors: HashMap, } impl TestStorage { @@ -61,6 +62,10 @@ impl TestStorage { let genesis_block: Block = "0100000000000000000000000000000000000000000000000000000000000000000000003ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4a29ab5f49ffff001d1dac2b7c0101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000".into(); TestStorage::with_blocks(&vec![genesis_block]) } + + pub fn insert_error(&mut self, hash: H256, err: Error) { + self.data.write().insert_errors.insert(hash, err); + } } impl BlockProvider for TestStorage { @@ -120,6 +125,11 @@ impl BlockStapler for TestStorage { fn insert_block(&self, block: &chain::Block) -> Result { let hash = block.hash(); let mut data = self.data.write(); + + if let Some(err) = data.insert_errors.remove(&hash) { + return Err(err); + } + match data.blocks.entry(hash.clone()) { Entry::Occupied(mut entry) => { replace(entry.get_mut(), block.clone()); diff --git a/db/src/update_context.rs b/db/src/update_context.rs index 7a2537f9..eb93e17f 100644 --- a/db/src/update_context.rs +++ b/db/src/update_context.rs @@ -9,14 +9,16 @@ pub struct UpdateContext { pub meta: HashMap, pub db_transaction: DBTransaction, meta_snapshot: Option>, + target: H256, } impl UpdateContext { - pub fn new(db: &Database) -> Self { + pub fn new(db: &Database, target: &H256) -> Self { UpdateContext { meta: HashMap::new(), db_transaction: db.transaction(), meta_snapshot: None, + target: target.clone(), } } @@ -27,6 +29,8 @@ impl UpdateContext { } try!(db.write(self.db_transaction)); + + trace!("Applied transaction for block {:?}", &self.target.to_reversed_str()); Ok(()) } diff --git a/import/src/blk.rs b/import/src/blk.rs index 8d2a4ec8..85d71c5b 100644 --- a/import/src/blk.rs +++ b/import/src/blk.rs @@ -1,4 +1,5 @@ use std::{io, fs, path}; +use std::collections::BTreeSet; use ser::{ReadIterator, deserialize_iterator, Error as ReaderError}; use block::Block; use fs::read_blk_dir; @@ -25,9 +26,11 @@ impl Iterator for BlkFile { } pub fn open_blk_dir

(path: P) -> Result where P: AsRef { - let iter = try!(read_blk_dir(path)) + let files = read_blk_dir(path)?.collect::, _>>()?; + + let iter = files.into_iter() // flatten results... - .flat_map(|entry| entry.and_then(|file| open_blk_file(file.path))) + .flat_map(|file| open_blk_file(file.path)) // flat iterators over each block in each file .flat_map(|file| file); diff --git a/import/src/fs.rs b/import/src/fs.rs index d10b749c..1933bdfe 100644 --- a/import/src/fs.rs +++ b/import/src/fs.rs @@ -1,4 +1,4 @@ -use std::{io, fs, path}; +use std::{io, fs, path, cmp}; /// Creates an iterator over all blk .dat files pub fn read_blk_dir

(path: P) -> Result where P: AsRef { @@ -13,10 +13,23 @@ pub struct ReadBlkDir { read_dir: fs::ReadDir, } +#[derive(PartialEq, Eq)] pub struct BlkEntry { pub path: path::PathBuf, } +impl cmp::PartialOrd for BlkEntry { + fn partial_cmp(&self, other: &Self) -> Option { + cmp::PartialOrd::partial_cmp(&self.path, &other.path) + } +} + +impl cmp::Ord for BlkEntry { + fn cmp(&self, other: &Self) -> cmp::Ordering { + cmp::Ord::cmp(&self.path, &other.path) + } +} + fn is_blk_file_name(file_name: &str) -> bool { if file_name.len() != 12 || !file_name.starts_with("blk") || !file_name.ends_with(".dat") { return false; diff --git a/network/src/consensus.rs b/network/src/consensus.rs index 3890a73f..be7629d7 100644 --- a/network/src/consensus.rs +++ b/network/src/consensus.rs @@ -31,8 +31,8 @@ impl ConsensusParams { } pub fn is_bip30_exception(&self, hash: &H256, height: u32) -> bool { - (height == 91842 && hash == &H256::from_reversed_str("0x00000000000a4d0a398161ffc163c503763b1f4360639393e0e4c8e300e0caec")) || - (height == 91880 && hash == &H256::from_reversed_str("0x00000000000743f190a18c5577a3c2d2a1f610ae9601ac046a38084ccb7cd721")) + (height == 91842 && hash == &H256::from_reversed_str("00000000000a4d0a398161ffc163c503763b1f4360639393e0e4c8e300e0caec")) || + (height == 91880 && hash == &H256::from_reversed_str("00000000000743f190a18c5577a3c2d2a1f610ae9601ac046a38084ccb7cd721")) } } diff --git a/sync/src/blocks_writer.rs b/sync/src/blocks_writer.rs index bb618b2a..0c885ace 100644 --- a/sync/src/blocks_writer.rs +++ b/sync/src/blocks_writer.rs @@ -9,7 +9,7 @@ use synchronization_verifier::{Verifier, SyncVerifier, VerificationSink, Verific use primitives::hash::H256; use super::Error; -pub const MAX_ORPHANED_BLOCKS: usize = 64; +pub const MAX_ORPHANED_BLOCKS: usize = 1024; pub struct BlocksWriter { storage: db::SharedStore, diff --git a/sync/src/local_node.rs b/sync/src/local_node.rs index e3c23e6d..d46fe4be 100644 --- a/sync/src/local_node.rs +++ b/sync/src/local_node.rs @@ -92,7 +92,8 @@ impl LocalNode where T: SynchronizationTaskExecutor + PeersCon self.client.lock().on_new_transactions_inventory(peer_index, transactions_inventory); } - // TODO: process other inventory types + // currently we do not setup connection filter => skip InventoryType::MessageFilteredBlock + // currently we do not send sendcmpct message => skip InventoryType::MessageCompactBlock } pub fn on_peer_getdata(&self, peer_index: usize, message: types::GetData) { @@ -442,4 +443,36 @@ mod tests { } } } + + #[test] + fn local_node_serves_compactblock() { + let (_, _, _, server, local_node) = create_local_node(); + + let genesis = test_data::genesis(); + let b1 = test_data::block_builder().header().parent(genesis.hash()).build() + .transaction().output().value(10).build().build() + .build(); // genesis -> b1 + let b1_hash = b1.hash(); + + // This peer will provide blocks + let peer_index1 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new()); + local_node.on_peer_block(peer_index1, types::Block { block: b1.clone() }); + + // This peer will receive compact block + let peer_index2 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new()); + local_node.on_peer_getdata(peer_index2, types::GetData {inventory: vec![ + InventoryVector { inv_type: InventoryType::MessageCompactBlock, hash: b1_hash.clone() }, + ]}); + let tasks = server.take_tasks(); + assert_eq!(tasks.len(), 1); + match tasks[0] { + (_, ServerTask::ServeGetData(ref gd)) => { + assert_eq!(gd.filtered.len(), 0); + assert_eq!(gd.unfiltered.len(), 0); + assert_eq!(gd.notfound.len(), 0); + assert_eq!(gd.compacted.len(), 1); + }, + _ => panic!("unexpected"), + } + } } diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs index f962f6b4..1bb07f48 100644 --- a/sync/src/synchronization_client.rs +++ b/sync/src/synchronization_client.rs @@ -238,7 +238,9 @@ pub struct Config { pub struct FilteredInventory { /// Merkleblock messages + transactions to send after pub filtered: Vec<(types::MerkleBlock, Vec<(H256, Transaction)>)>, - /// Rest of inventory with MessageTx, MessageBlock, MessageCompactBlock inventory types + /// Compactblock messages + pub compacted: Vec, + /// Rest of inventory with MessageTx, MessageBlock inventory types pub unfiltered: Vec, /// Items that were supposed to be filtered, but we know nothing about these pub notfound: Vec, @@ -302,6 +304,7 @@ impl FilteredInventory { pub fn with_unfiltered(unfiltered: Vec) -> Self { FilteredInventory { filtered: Vec::new(), + compacted: Vec::new(), unfiltered: unfiltered, notfound: Vec::new(), } @@ -311,6 +314,7 @@ impl FilteredInventory { pub fn with_notfound(notfound: Vec) -> Self { FilteredInventory { filtered: Vec::new(), + compacted: Vec::new(), unfiltered: Vec::new(), notfound: notfound, } @@ -467,9 +471,10 @@ impl ClientCore for SynchronizationClientCore where T: TaskExecutor { /// Filter inventory from `getdata` message for given peer fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec) -> FilteredInventory { - let chain = self.chain.read(); + let storage = { self.chain.read().storage() }; let mut filter = self.peers.filter_mut(peer_index); let mut filtered: Vec<(types::MerkleBlock, Vec<(H256, Transaction)>)> = Vec::new(); + let mut compacted: Vec = Vec::new(); let mut unfiltered: Vec = Vec::new(); let mut notfound: Vec = Vec::new(); @@ -480,12 +485,30 @@ impl ClientCore for SynchronizationClientCore where T: TaskExecutor { // 2) build && send `merkleblock` message for this block // 3) send all matching transactions after this block InventoryType::MessageFilteredBlock => { - match chain.storage().block(db::BlockRef::Hash(item.hash.clone())) { + match storage.block(db::BlockRef::Hash(item.hash.clone())) { None => notfound.push(item), Some(block) => match filter.build_merkle_block(block) { None => notfound.push(item), Some(merkleblock) => filtered.push((merkleblock.merkleblock, merkleblock.matching_transactions)), - } + }, + } + }, + // peer asks for compact block: + InventoryType::MessageCompactBlock => { + match storage.block(db::BlockRef::Hash(item.hash.clone())) { + None => notfound.push(item), + Some(block) => { + let indexed_block: IndexedBlock = block.into(); + let prefilled_transactions_indexes = indexed_block.transactions().enumerate() + // we do not filter by fee rate here, because it only reasonable for non-mined transactions + .filter(|&(_, (h, t))| filter.filter_transaction(h, t, None)) + .map(|(idx, _)| idx) + .collect(); + let compact_block = types::CompactBlock { + header: build_compact_block(indexed_block, prefilled_transactions_indexes), + }; + compacted.push(compact_block); + }, } }, // these will be filtered (found/not found) in sync server @@ -495,6 +518,7 @@ impl ClientCore for SynchronizationClientCore where T: TaskExecutor { FilteredInventory { filtered: filtered, + compacted: compacted, unfiltered: unfiltered, notfound: notfound, } @@ -926,8 +950,6 @@ impl SynchronizationClientCore where T: TaskExecutor { } )); - // TODO: start management worker only when synchronization is started - // currently impossible because there is no way to call Interval::new with Remote && Handle is not-Send { let peers_config = ManagePeersConfig::default(); let unknown_config = ManageUnknownBlocksConfig::default(); @@ -1024,7 +1046,7 @@ impl SynchronizationClientCore where T: TaskExecutor { }) .collect(); - Some(Task::SendCompactBlocks(peer_index, block_header_and_ids, ServerTaskIndex::None)) + Some(Task::SendCompactBlocks(peer_index, block_header_and_ids)) }, BlockAnnouncementType::SendInventory => { let inventory: Vec<_> = new_blocks_hashes.iter() @@ -1035,7 +1057,7 @@ impl SynchronizationClientCore where T: TaskExecutor { }) .collect(); if !inventory.is_empty() { - Some(Task::SendInventory(peer_index, inventory, ServerTaskIndex::None)) + Some(Task::SendInventory(peer_index, inventory)) } else { None } @@ -1065,7 +1087,7 @@ impl SynchronizationClientCore where T: TaskExecutor { }) .collect(); if !inventory.is_empty() { - Some(Task::SendInventory(peer_index, inventory, ServerTaskIndex::None)) + Some(Task::SendInventory(peer_index, inventory)) } else { None } @@ -1216,17 +1238,12 @@ impl SynchronizationClientCore where T: TaskExecutor { } /// Process new peer transaction - fn process_peer_transaction(&mut self, peer_index: Option, hash: H256, transaction: Transaction, relay: bool) -> Option> { + fn process_peer_transaction(&mut self, _peer_index: Option, hash: H256, transaction: Transaction, relay: bool) -> Option> { // if we are in synchronization state, we will ignore this message if self.state.is_synchronizing() { return None; } - // mark peer as useful (TODO: remove after self.all_peers() would be all peers, not sync one) - if let Some(peer_index) = peer_index { - self.peers.useful_peer(peer_index); - } - // else => verify transaction + it's orphans and then add to the memory pool let mut chain = self.chain.write(); @@ -1576,7 +1593,7 @@ pub mod tests { assert!(tasks.iter().any(|t| t == &Task::RequestMemoryPool(2))); let inventory = vec![InventoryVector { inv_type: InventoryType::MessageBlock, hash: block.hash() }]; - assert!(tasks.iter().any(|t| t == &Task::SendInventory(1, inventory.clone(), ServerTaskIndex::None))); + assert!(tasks.iter().any(|t| t == &Task::SendInventory(1, inventory.clone()))); } #[test] @@ -1854,7 +1871,19 @@ pub mod tests { #[test] fn sync_after_db_insert_nonfatal_fail() { - // TODO: implement me + use db::Store; + + let mut storage = db::TestStorage::with_genesis_block(); + let block = test_data::block_h1(); + storage.insert_error(block.hash(), db::Error::Consistency(db::ConsistencyError::NoBestBlock)); + let best_genesis = storage.best_block().unwrap(); + + let (_, _, _, chain, sync) = create_sync(Some(Arc::new(storage)), None); + let mut sync = sync.lock(); + + sync.on_peer_block(1, block.into()); + + assert_eq!(chain.read().best_block(), best_genesis); } #[test] @@ -2121,7 +2150,7 @@ pub mod tests { { let tasks = executor.lock().take_tasks(); let inventory = vec![InventoryVector { inv_type: InventoryType::MessageBlock, hash: b2.hash() }]; - assert_eq!(tasks, vec![Task::RequestBlocksHeaders(2), Task::SendInventory(1, inventory, ServerTaskIndex::None)]); + assert_eq!(tasks, vec![Task::RequestBlocksHeaders(2), Task::SendInventory(1, inventory)]); } sync.on_new_blocks_headers(1, vec![b3.block_header.clone()]); @@ -2131,7 +2160,7 @@ pub mod tests { { let tasks = executor.lock().take_tasks(); let inventory = vec![InventoryVector { inv_type: InventoryType::MessageBlock, hash: b3.hash() }]; - assert!(tasks.iter().any(|t| t == &Task::SendInventory(2, inventory.clone(), ServerTaskIndex::None))); + assert!(tasks.iter().any(|t| t == &Task::SendInventory(2, inventory.clone()))); } } @@ -2139,17 +2168,16 @@ pub mod tests { fn relay_new_transaction_when_in_saturated_state() { let (_, _, executor, _, sync) = create_sync(None, None); - let tx1: Transaction = test_data::TransactionBuilder::with_output(10).into(); - let tx2: Transaction = test_data::TransactionBuilder::with_output(20).into(); - let tx2_hash = tx2.hash(); + let tx: Transaction = test_data::TransactionBuilder::with_output(20).into(); + let tx_hash = tx.hash(); let mut sync = sync.lock(); - sync.on_peer_transaction(1, tx1); - sync.on_peer_transaction(2, tx2); + sync.on_peer_connected(1); + sync.on_peer_transaction(2, tx); let tasks = { executor.lock().take_tasks() }; - let inventory = vec![InventoryVector { inv_type: InventoryType::MessageTx, hash: tx2_hash }]; - assert_eq!(tasks, vec![Task::SendInventory(1, inventory, ServerTaskIndex::None)]); + let inventory = vec![InventoryVector { inv_type: InventoryType::MessageTx, hash: tx_hash }]; + assert_eq!(tasks, vec![Task::SendInventory(1, inventory)]); } #[test] @@ -2190,9 +2218,9 @@ pub mod tests { let tasks = { executor.lock().take_tasks() }; let inventory = vec![InventoryVector { inv_type: InventoryType::MessageTx, hash: tx1_hash }]; assert_eq!(tasks, vec![ - Task::SendInventory(1, inventory.clone(), ServerTaskIndex::None), - Task::SendInventory(3, inventory.clone(), ServerTaskIndex::None), - Task::SendInventory(4, inventory.clone(), ServerTaskIndex::None), + Task::SendInventory(1, inventory.clone()), + Task::SendInventory(3, inventory.clone()), + Task::SendInventory(4, inventory.clone()), ]); // tx2 is relayed to peers: 2, 3, 4 @@ -2201,9 +2229,9 @@ pub mod tests { let tasks = { executor.lock().take_tasks() }; let inventory = vec![InventoryVector { inv_type: InventoryType::MessageTx, hash: tx2_hash }]; assert_eq!(tasks, vec![ - Task::SendInventory(2, inventory.clone(), ServerTaskIndex::None), - Task::SendInventory(3, inventory.clone(), ServerTaskIndex::None), - Task::SendInventory(4, inventory.clone(), ServerTaskIndex::None), + Task::SendInventory(2, inventory.clone()), + Task::SendInventory(3, inventory.clone()), + Task::SendInventory(4, inventory.clone()), ]); } @@ -2229,7 +2257,7 @@ pub mod tests { let headers = vec![b0.block_header.clone()]; assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1), Task::SendHeaders(2, headers, ServerTaskIndex::None), - Task::SendInventory(3, inventory, ServerTaskIndex::None), + Task::SendInventory(3, inventory), ]); } @@ -2273,13 +2301,13 @@ pub mod tests { inv_type: InventoryType::MessageTx, hash: tx1_hash.clone(), } - ], ServerTaskIndex::None), + ]), Task::SendInventory(4, vec![ InventoryVector { inv_type: InventoryType::MessageTx, hash: tx1_hash.clone(), } - ], ServerTaskIndex::None), + ]), ]); } @@ -2316,9 +2344,9 @@ pub mod tests { assert_eq!(tasks.len(), 3); assert_eq!(tasks[0], Task::RequestBlocksHeaders(1)); match tasks[1] { - Task::SendCompactBlocks(2, _, _) => (), + Task::SendCompactBlocks(2, _) => (), _ => panic!("unexpected task"), } - assert_eq!(tasks[2], Task::SendInventory(3, inventory, ServerTaskIndex::None)); + assert_eq!(tasks[2], Task::SendInventory(3, inventory)); } } diff --git a/sync/src/synchronization_executor.rs b/sync/src/synchronization_executor.rs index 54a2b3c7..d26c21a8 100644 --- a/sync/src/synchronization_executor.rs +++ b/sync/src/synchronization_executor.rs @@ -17,7 +17,6 @@ pub trait TaskExecutor : Send + 'static { fn execute(&mut self, task: Task); } -// TODO: get rid of unneeded ServerTaskIndex-es /// Synchronization task for the peer. #[derive(Debug, PartialEq)] pub enum Task { @@ -30,7 +29,7 @@ pub enum Task { /// Request memory pool contents RequestMemoryPool(usize), /// Send block. - SendBlock(usize, Block, ServerTaskIndex), + SendBlock(usize, Block), /// Send merkleblock SendMerkleBlock(usize, types::MerkleBlock), /// Send transaction @@ -38,13 +37,13 @@ pub enum Task { /// Send block transactions SendBlockTxn(usize, H256, Vec), /// Send notfound - SendNotFound(usize, Vec, ServerTaskIndex), + SendNotFound(usize, Vec), /// Send inventory - SendInventory(usize, Vec, ServerTaskIndex), + SendInventory(usize, Vec), /// Send headers SendHeaders(usize, Vec, ServerTaskIndex), /// Send compact blocks - SendCompactBlocks(usize, Vec, ServerTaskIndex), + SendCompactBlocks(usize, Vec), /// Notify io about ignored request Ignore(usize, u32), } @@ -130,13 +129,12 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor { connection.send_getdata(&getdata); } }, - Task::SendBlock(peer_index, block, id) => { + Task::SendBlock(peer_index, block) => { let block_message = types::Block { block: block, }; if let Some(connection) = self.peers.get_mut(&peer_index) { - assert_eq!(id.raw(), None); trace!(target: "sync", "Sending block {:?} to peer#{}", block_message.block.hash().to_reversed_str(), peer_index); connection.send_block(&block_message); } @@ -170,24 +168,22 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor { connection.send_block_txn(&transactions_message); } }, - Task::SendNotFound(peer_index, unknown_inventory, id) => { + Task::SendNotFound(peer_index, unknown_inventory) => { let notfound = types::NotFound { inventory: unknown_inventory, }; if let Some(connection) = self.peers.get_mut(&peer_index) { - assert_eq!(id.raw(), None); trace!(target: "sync", "Sending notfound to peer#{} with {} items", peer_index, notfound.inventory.len()); connection.send_notfound(¬found); } }, - Task::SendInventory(peer_index, inventory, id) => { + Task::SendInventory(peer_index, inventory) => { let inventory = types::Inv { inventory: inventory, }; if let Some(connection) = self.peers.get_mut(&peer_index) { - assert_eq!(id.raw(), None); trace!(target: "sync", "Sending inventory to peer#{} with {} items", peer_index, inventory.inventory.len()); connection.send_inventory(&inventory); } @@ -205,9 +201,8 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor { } } }, - Task::SendCompactBlocks(peer_index, compact_blocks, id) => { + Task::SendCompactBlocks(peer_index, compact_blocks) => { if let Some(connection) = self.peers.get_mut(&peer_index) { - assert_eq!(id.raw(), None); for compact_block in compact_blocks { trace!(target: "sync", "Sending compact_block {:?} to peer#{}", compact_block.header.hash(), peer_index); connection.send_compact_block(&types::CompactBlock { diff --git a/sync/src/synchronization_server.rs b/sync/src/synchronization_server.rs index 3c24e981..5dd4d218 100644 --- a/sync/src/synchronization_server.rs +++ b/sync/src/synchronization_server.rs @@ -104,6 +104,7 @@ pub enum ServerTask { ReturnNotFound(Vec), ReturnBlock(H256), ReturnMerkleBlock(types::MerkleBlock), + ReturnCompactBlock(types::CompactBlock), ReturnTransaction(Transaction), } @@ -168,6 +169,10 @@ impl SynchronizationServer { new_tasks.extend(transactions.into_iter().map(|(_, t)| IndexedServerTask::new(ServerTask::ReturnTransaction(t), ServerTaskIndex::None))); } + // process compactblock items + for compactblock in inventory.compacted { + new_tasks.push(IndexedServerTask::new(ServerTask::ReturnCompactBlock(compactblock), ServerTaskIndex::None)); + } // extend with unknown merkleitems unknown_items.extend(inventory.notfound); // process unfiltered items @@ -191,7 +196,11 @@ impl SynchronizationServer { None => unknown_items.push(item), } }, - _ => (), // TODO: process other inventory types + // we have no enough information here => it must be filtered by caller + InventoryType::MessageCompactBlock => unreachable!(), + // we have no enough information here => it must be filtered by caller + InventoryType::MessageFilteredBlock => unreachable!(), + _ => (), } } } @@ -224,7 +233,7 @@ impl SynchronizationServer { inv_type: InventoryType::MessageBlock, hash: hash, }).collect(); - executor.lock().execute(Task::SendInventory(peer_index, inventory, indexed_task.id)); + executor.lock().execute(Task::SendInventory(peer_index, inventory)); } } else { @@ -309,7 +318,7 @@ impl SynchronizationServer { .collect(); if !inventory.is_empty() { trace!(target: "sync", "Going to respond with {} memory-pool transactions ids to peer#{}", inventory.len(), peer_index); - executor.lock().execute(Task::SendInventory(peer_index, inventory, indexed_task.id)); + executor.lock().execute(Task::SendInventory(peer_index, inventory)); } else { assert_eq!(indexed_task.id, ServerTaskIndex::None); } @@ -318,7 +327,7 @@ impl SynchronizationServer { }, // `notfound` ServerTask::ReturnNotFound(inventory) => { - executor.lock().execute(Task::SendNotFound(peer_index, inventory, indexed_task.id)); + executor.lock().execute(Task::SendNotFound(peer_index, inventory)); // inform that we have processed task for peer queue.lock().task_processed(peer_index); }, @@ -326,7 +335,7 @@ impl SynchronizationServer { ServerTask::ReturnBlock(block_hash) => { let block = chain.read().storage().block(db::BlockRef::Hash(block_hash)) .expect("we have checked that block exists in ServeGetData; db is append-only; qed"); - executor.lock().execute(Task::SendBlock(peer_index, block, indexed_task.id)); + executor.lock().execute(Task::SendBlock(peer_index, block)); // inform that we have processed task for peer queue.lock().task_processed(peer_index); }, @@ -334,6 +343,10 @@ impl SynchronizationServer { ServerTask::ReturnMerkleBlock(merkleblock) => { executor.lock().execute(Task::SendMerkleBlock(peer_index, merkleblock)); }, + // `cmpctblock` + ServerTask::ReturnCompactBlock(compactblock) => { + executor.lock().execute(Task::SendCompactBlocks(peer_index, vec![compactblock.header])) + } // `tx` ServerTask::ReturnTransaction(transaction) => { executor.lock().execute(Task::SendTransaction(peer_index, transaction)); @@ -609,7 +622,7 @@ pub mod tests { server.serve_getdata(0, FilteredInventory::with_unfiltered(inventory.clone())).map(|t| server.add_task(0, t)); // => respond with notfound let tasks = DummyTaskExecutor::wait_tasks(executor); - assert_eq!(tasks, vec![Task::SendNotFound(0, inventory, ServerTaskIndex::None)]); + assert_eq!(tasks, vec![Task::SendNotFound(0, inventory)]); } #[test] @@ -625,7 +638,7 @@ pub mod tests { server.serve_getdata(0, FilteredInventory::with_unfiltered(inventory)).map(|t| server.add_task(0, t)); // => respond with block let tasks = DummyTaskExecutor::wait_tasks(executor); - assert_eq!(tasks, vec![Task::SendBlock(0, test_data::genesis(), ServerTaskIndex::None)]); + assert_eq!(tasks, vec![Task::SendBlock(0, test_data::genesis())]); } #[test] @@ -659,7 +672,7 @@ pub mod tests { hash: test_data::block_h1().hash(), }]; let tasks = DummyTaskExecutor::wait_tasks(executor); - assert_eq!(tasks, vec![Task::SendInventory(0, inventory, ServerTaskIndex::None)]); + assert_eq!(tasks, vec![Task::SendInventory(0, inventory)]); } #[test] @@ -722,7 +735,7 @@ pub mod tests { hash: transaction_hash, }]; let tasks = DummyTaskExecutor::wait_tasks(executor); - assert_eq!(tasks, vec![Task::SendInventory(0, inventory, ServerTaskIndex::None)]); + assert_eq!(tasks, vec![Task::SendInventory(0, inventory)]); } #[test] @@ -764,7 +777,7 @@ pub mod tests { server.serve_getdata(0, FilteredInventory::with_unfiltered(inventory.clone())).map(|t| server.add_task(0, t)); // => respond with notfound let tasks = DummyTaskExecutor::wait_tasks(executor); - assert_eq!(tasks, vec![Task::SendNotFound(0, inventory, ServerTaskIndex::None)]); + assert_eq!(tasks, vec![Task::SendNotFound(0, inventory)]); } #[test]