diff --git a/sync/src/local_node.rs b/sync/src/local_node.rs index 97077c34..d46fe4be 100644 --- a/sync/src/local_node.rs +++ b/sync/src/local_node.rs @@ -443,4 +443,36 @@ mod tests { } } } + + #[test] + fn local_node_serves_compactblock() { + let (_, _, _, server, local_node) = create_local_node(); + + let genesis = test_data::genesis(); + let b1 = test_data::block_builder().header().parent(genesis.hash()).build() + .transaction().output().value(10).build().build() + .build(); // genesis -> b1 + let b1_hash = b1.hash(); + + // This peer will provide blocks + let peer_index1 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new()); + local_node.on_peer_block(peer_index1, types::Block { block: b1.clone() }); + + // This peer will receive compact block + let peer_index2 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new()); + local_node.on_peer_getdata(peer_index2, types::GetData {inventory: vec![ + InventoryVector { inv_type: InventoryType::MessageCompactBlock, hash: b1_hash.clone() }, + ]}); + let tasks = server.take_tasks(); + assert_eq!(tasks.len(), 1); + match tasks[0] { + (_, ServerTask::ServeGetData(ref gd)) => { + assert_eq!(gd.filtered.len(), 0); + assert_eq!(gd.unfiltered.len(), 0); + assert_eq!(gd.notfound.len(), 0); + assert_eq!(gd.compacted.len(), 1); + }, + _ => panic!("unexpected"), + } + } } diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs index 017c3fa8..8b4841dc 100644 --- a/sync/src/synchronization_client.rs +++ b/sync/src/synchronization_client.rs @@ -238,7 +238,9 @@ pub struct Config { pub struct FilteredInventory { /// Merkleblock messages + transactions to send after pub filtered: Vec<(types::MerkleBlock, Vec<(H256, Transaction)>)>, - /// Rest of inventory with MessageTx, MessageBlock, MessageCompactBlock inventory types + /// Compactblock messages + pub compacted: Vec, + /// Rest of inventory with MessageTx, MessageBlock inventory types pub unfiltered: Vec, /// Items that were supposed to be filtered, but we know nothing about these pub notfound: Vec, @@ -300,6 +302,7 @@ impl FilteredInventory { pub fn with_unfiltered(unfiltered: Vec) -> Self { FilteredInventory { filtered: Vec::new(), + compacted: Vec::new(), unfiltered: unfiltered, notfound: Vec::new(), } @@ -309,6 +312,7 @@ impl FilteredInventory { pub fn with_notfound(notfound: Vec) -> Self { FilteredInventory { filtered: Vec::new(), + compacted: Vec::new(), unfiltered: Vec::new(), notfound: notfound, } @@ -460,9 +464,10 @@ impl ClientCore for SynchronizationClientCore where T: TaskExecutor { /// Filter inventory from `getdata` message for given peer fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec) -> FilteredInventory { - let chain = self.chain.read(); + let storage = { self.chain.read().storage() }; let mut filter = self.peers.filter_mut(peer_index); let mut filtered: Vec<(types::MerkleBlock, Vec<(H256, Transaction)>)> = Vec::new(); + let mut compacted: Vec = Vec::new(); let mut unfiltered: Vec = Vec::new(); let mut notfound: Vec = Vec::new(); @@ -473,12 +478,30 @@ impl ClientCore for SynchronizationClientCore where T: TaskExecutor { // 2) build && send `merkleblock` message for this block // 3) send all matching transactions after this block InventoryType::MessageFilteredBlock => { - match chain.storage().block(db::BlockRef::Hash(item.hash.clone())) { + match storage.block(db::BlockRef::Hash(item.hash.clone())) { None => notfound.push(item), Some(block) => match filter.build_merkle_block(block) { None => notfound.push(item), Some(merkleblock) => filtered.push((merkleblock.merkleblock, merkleblock.matching_transactions)), - } + }, + } + }, + // peer asks for compact block: + InventoryType::MessageCompactBlock => { + match storage.block(db::BlockRef::Hash(item.hash.clone())) { + None => notfound.push(item), + Some(block) => { + let indexed_block: IndexedBlock = block.into(); + let prefilled_transactions_indexes = indexed_block.transactions().enumerate() + // we do not filter by fee rate here, because it only reasonable for non-mined transactions + .filter(|&(_, (h, t))| filter.filter_transaction(h, t, None)) + .map(|(idx, _)| idx) + .collect(); + let compact_block = types::CompactBlock { + header: build_compact_block(indexed_block, prefilled_transactions_indexes), + }; + compacted.push(compact_block); + }, } }, // these will be filtered (found/not found) in sync server @@ -488,6 +511,7 @@ impl ClientCore for SynchronizationClientCore where T: TaskExecutor { FilteredInventory { filtered: filtered, + compacted: compacted, unfiltered: unfiltered, notfound: notfound, } diff --git a/sync/src/synchronization_server.rs b/sync/src/synchronization_server.rs index f0692d4f..5dd4d218 100644 --- a/sync/src/synchronization_server.rs +++ b/sync/src/synchronization_server.rs @@ -104,6 +104,7 @@ pub enum ServerTask { ReturnNotFound(Vec), ReturnBlock(H256), ReturnMerkleBlock(types::MerkleBlock), + ReturnCompactBlock(types::CompactBlock), ReturnTransaction(Transaction), } @@ -168,6 +169,10 @@ impl SynchronizationServer { new_tasks.extend(transactions.into_iter().map(|(_, t)| IndexedServerTask::new(ServerTask::ReturnTransaction(t), ServerTaskIndex::None))); } + // process compactblock items + for compactblock in inventory.compacted { + new_tasks.push(IndexedServerTask::new(ServerTask::ReturnCompactBlock(compactblock), ServerTaskIndex::None)); + } // extend with unknown merkleitems unknown_items.extend(inventory.notfound); // process unfiltered items @@ -191,7 +196,11 @@ impl SynchronizationServer { None => unknown_items.push(item), } }, - _ => (), // TODO: process other inventory types + // we have no enough information here => it must be filtered by caller + InventoryType::MessageCompactBlock => unreachable!(), + // we have no enough information here => it must be filtered by caller + InventoryType::MessageFilteredBlock => unreachable!(), + _ => (), } } } @@ -334,6 +343,10 @@ impl SynchronizationServer { ServerTask::ReturnMerkleBlock(merkleblock) => { executor.lock().execute(Task::SendMerkleBlock(peer_index, merkleblock)); }, + // `cmpctblock` + ServerTask::ReturnCompactBlock(compactblock) => { + executor.lock().execute(Task::SendCompactBlocks(peer_index, vec![compactblock.header])) + } // `tx` ServerTask::ReturnTransaction(transaction) => { executor.lock().execute(Task::SendTransaction(peer_index, transaction));