From 5818c8fafa4c7b2ade92794e8597a55dc6a2617c Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Thu, 24 Nov 2016 10:10:20 +0300 Subject: [PATCH] filter merkleblocks in getdata --- sync/src/connection_filter.rs | 55 +++++++++++++++---- sync/src/local_node.rs | 81 ++++++++++++++++++++++++++-- sync/src/synchronization_client.rs | 73 +++++++++++++++++++++++++ sync/src/synchronization_executor.rs | 24 ++++++++- sync/src/synchronization_server.rs | 45 +++++++++++----- 5 files changed, 249 insertions(+), 29 deletions(-) diff --git a/sync/src/connection_filter.rs b/sync/src/connection_filter.rs index 61ec321c..77ea54cf 100644 --- a/sync/src/connection_filter.rs +++ b/sync/src/connection_filter.rs @@ -5,6 +5,7 @@ use murmur3::murmur3_32; use chain::{Block, Transaction, OutPoint, merkle_root}; use ser::serialize; use message::types; +use primitives::bytes::Bytes; use primitives::hash::H256; use script::Script; @@ -39,6 +40,14 @@ struct ConnectionBloom { tweak: u32, } +/// `merkleblock` build artefacts +pub struct MerkleBlockArtefacts { + /// `merkleblock` message + pub merkleblock: types::MerkleBlock, + /// All matching transactions + pub matching_transactions: Vec<(H256, Transaction)>, +} + /// Service structure to construct `merkleblock` message. pub struct PartialMerkleTree { /// All transactions length. @@ -199,29 +208,42 @@ impl ConnectionFilter { } /// Convert `Block` to `MerkleBlock` using this filter - pub fn make_merkle_block(&mut self, block: &Block) -> Option { + pub fn build_merkle_block(&mut self, block: Block) -> Option { if self.bloom.is_none() { return None; } - // calculate hashes && match flags for all transactions + // prepare result let all_len = block.transactions.len(); - let (all_hashes, all_flags) = block.transactions.iter() + let mut result = MerkleBlockArtefacts { + merkleblock: types::MerkleBlock { + block_header: block.block_header.clone(), + total_transactions: all_len as u32, + hashes: Vec::default(), + flags: Bytes::default(), + }, + matching_transactions: Vec::new(), + }; + + // calculate hashes && match flags for all transactions + let (all_hashes, all_flags) = block.transactions.into_iter() .fold((Vec::::with_capacity(all_len), BitVec::with_capacity(all_len)), |(mut all_hashes, mut all_flags), t| { let hash = t.hash(); - all_flags.push(self.filter_transaction(&hash, t)); + let flag = self.filter_transaction(&hash, &t); + if flag { + result.matching_transactions.push((hash.clone(), t)); + } + + all_flags.push(flag); all_hashes.push(hash); (all_hashes, all_flags) }); // build partial merkle tree let (hashes, flags) = PartialMerkleTree::build(all_hashes, all_flags); - Some(types::MerkleBlock { - block_header: block.block_header.clone(), - total_transactions: all_len as u32, - hashes: hashes, - flags: flags.to_bytes().into(), - }) + result.merkleblock.hashes.extend(hashes); + result.merkleblock.flags = flags.to_bytes().into(); + Some(result) } } @@ -255,6 +277,11 @@ impl ConnectionBloom { self.filter.set(murmur_hash, true); } } + + #[cfg(test)] + pub fn bytes(&self) -> Bytes { + self.filter.to_bytes().into() + } } impl PartialMerkleTree { @@ -336,6 +363,14 @@ pub mod tests { } } + pub fn make_filterload(data: &[u8]) -> types::FilterLoad { + let mut filterload = default_filterload(); + let mut bloom = ConnectionBloom::new(&filterload); + bloom.insert(data); + filterload.filter = bloom.bytes(); + filterload + } + pub fn make_filteradd(data: &[u8]) -> types::FilterAdd { types::FilterAdd { data: data.into(), diff --git a/sync/src/local_node.rs b/sync/src/local_node.rs index cdc35118..3530331c 100644 --- a/sync/src/local_node.rs +++ b/sync/src/local_node.rs @@ -100,7 +100,11 @@ impl LocalNode where T: SynchronizationTaskExecutor + PeersCon pub fn on_peer_getdata(&self, peer_index: usize, message: types::GetData) { trace!(target: "sync", "Got `getdata` message from peer#{}", peer_index); - self.server.serve_getdata(peer_index, message).map(|t| self.server.add_task(peer_index, t)); + let filtered_inventory = { + let mut client = self.client.lock(); + client.filter_getdata_inventory(peer_index, message.inventory) + }; + self.server.serve_getdata(peer_index, filtered_inventory).map(|t| self.server.add_task(peer_index, t)); } pub fn on_peer_getblocks(&self, peer_index: usize, message: types::GetBlocks) { @@ -227,9 +231,10 @@ impl LocalNode where T: SynchronizationTaskExecutor + PeersCon mod tests { use std::sync::Arc; use parking_lot::{Mutex, RwLock}; + use connection_filter::tests::make_filterload; use synchronization_executor::Task; use synchronization_executor::tests::DummyTaskExecutor; - use synchronization_client::{Config, SynchronizationClient, SynchronizationClientCore}; + use synchronization_client::{Config, SynchronizationClient, SynchronizationClientCore, FilteredInventory}; use synchronization_chain::Chain; use p2p::{event_loop, OutboundSyncConnection, OutboundSyncConnectionRef}; use message::types; @@ -241,6 +246,7 @@ mod tests { use synchronization_server::tests::DummyServer; use synchronization_verifier::tests::DummyVerifier; use tokio_core::reactor::{Core, Handle}; + use primitives::bytes::Bytes; struct DummyOutboundSyncConnection; @@ -317,6 +323,75 @@ mod tests { }); // => `getdata` is served let tasks = server.take_tasks(); - assert_eq!(tasks, vec![(peer_index, ServerTask::ServeGetData(inventory))]); + assert_eq!(tasks, vec![(peer_index, ServerTask::ServeGetData(FilteredInventory::with_unfiltered(inventory)))]); } + + /*#[test] + fn local_node_serves_merkleblock() { + let (_, _, _, server, local_node) = create_local_node(); + + let genesis = test_data::genesis(); + let b1 = test_data::block_builder().header().parent(genesis.hash()).build() + .transaction().output().value(10).build().build() + .build(); // genesis -> b1 + let b2 = test_data::block_builder().header().parent(b1.hash()).build() + .transaction().output().value(20).build().build() + .build(); // genesis -> b1 -> b2 + let b1_hash = b1.hash(); + let b2_hash = b1.hash(); + let match_bytes = Bytes::from(vec![0x80]); + + // This peer will provide blocks + let peer_index1 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new()); + local_node.on_peer_block(peer_index1, types::Block { block: b1.clone() }); + local_node.on_peer_block(peer_index1, types::Block { block: b2.clone() }); + + // This peer won't get any blocks, because it has not set filter for the connection + let peer_index2 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new()); + local_node.on_peer_getdata(peer_index2, types::GetData {inventory: vec![ + InventoryVector { inv_type: InventoryType::MessageFilteredBlock, hash: b1_hash.clone() }, + InventoryVector { inv_type: InventoryType::MessageFilteredBlock, hash: b2_hash.clone() }, + ]}); + assert_eq!(server.take_tasks(), vec![(peer_index2, ServerTask::ServeGetData(FilteredInventory::with_notfound(vec![ + InventoryVector { inv_type: InventoryType::MessageFilteredBlock, hash: b1_hash.clone() }, + InventoryVector { inv_type: InventoryType::MessageFilteredBlock, hash: b2_hash.clone() }, + ])))]); + + // This peer will get filtered b1 + let peer_index3 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new()); + local_node.on_peer_filterload(peer_index3, make_filterload(&*b1.transactions[0].hash())); + local_node.on_peer_getdata(peer_index3, types::GetData {inventory: vec![ + InventoryVector { inv_type: InventoryType::MessageFilteredBlock, hash: b1_hash.clone() }, + InventoryVector { inv_type: InventoryType::MessageFilteredBlock, hash: b2_hash.clone() }, + ]}); + assert_eq!(server.take_tasks(), vec![(3, ServerTask::ServeGetData(FilteredInventory { + filtered: vec![ + ( + types::MerkleBlock { + block_header: b1.block_header.clone(), + total_transactions: 1, + hashes: vec!["48de599e797bc0aee90c09f32aead1fce19bd8857c9407d76c8809ad076bbe98".into()], + flags: Bytes::new_with_len(1) + }, vec![] + ), + ( + types::MerkleBlock { + block_header: b2.block_header.clone(), + total_transactions: 1, + hashes: vec!["48de599e797bc0aee90c09f32aead1fce19bd8857c9407d76c8809ad076bbe98".into()], + flags: Bytes::new_with_len(1) + }, vec![] + ) + ], + unfiltered: vec![], + notfound: vec![] + }))]); + + // This peer will consume filtered b2 + //let peer_index4 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new()); + // This peer will consume filtered b1 + b2 + //let peer_index5 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new()); + // This peer will consume some other filtered block + //let peer_index6 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new()); + }*/ } diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs index 2b69a013..208dcdd9 100644 --- a/sync/src/synchronization_client.rs +++ b/sync/src/synchronization_client.rs @@ -183,6 +183,7 @@ pub struct Information { pub trait Client : Send + 'static { fn best_block(&self) -> db::BestBlock; fn state(&self) -> State; + fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec) -> FilteredInventory; fn on_peer_connected(&mut self, peer_index: usize); fn on_new_blocks_inventory(&mut self, peer_index: usize, blocks_hashes: Vec); fn on_new_transactions_inventory(&mut self, peer_index: usize, transactions_hashes: Vec); @@ -201,6 +202,7 @@ pub trait Client : Send + 'static { pub trait ClientCore : VerificationSink { fn best_block(&self) -> db::BestBlock; fn state(&self) -> State; + fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec) -> FilteredInventory; fn on_peer_connected(&mut self, peer_index: usize); fn on_new_blocks_inventory(&mut self, peer_index: usize, blocks_hashes: Vec); fn on_new_transactions_inventory(&mut self, peer_index: usize, transactions_hashes: Vec); @@ -219,11 +221,23 @@ pub trait ClientCore : VerificationSink { /// Synchronization client configuration options. +#[derive(Debug)] pub struct Config { /// Number of threads to allocate in synchronization CpuPool. pub threads_num: usize, } +/// Filtered `getdata` inventory. +#[derive(Debug, PartialEq)] +pub struct FilteredInventory { + /// Merkleblock messages + transactions to send after + pub filtered: Vec<(types::MerkleBlock, Vec<(H256, Transaction)>)>, + /// Rest of inventory with MessageTx, MessageBlock, MessageCompactBlock inventory types + pub unfiltered: Vec, + /// Items that were supposed to be filtered, but we know nothing about these + pub notfound: Vec, +} + /// Synchronization client facade pub struct SynchronizationClient { /// Client core @@ -264,6 +278,26 @@ impl Config { } } +impl FilteredInventory { + #[cfg(test)] + pub fn with_unfiltered(unfiltered: Vec) -> Self { + FilteredInventory { + filtered: Vec::new(), + unfiltered: unfiltered, + notfound: Vec::new(), + } + } + + #[cfg(test)] + pub fn with_notfound(notfound: Vec) -> Self { + FilteredInventory { + filtered: Vec::new(), + unfiltered: Vec::new(), + notfound: notfound, + } + } +} + impl State { pub fn is_saturated(&self) -> bool { match *self { @@ -296,6 +330,10 @@ impl Client for SynchronizationClient where T: TaskExecutor, U: Veri self.core.lock().state() } + fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec) -> FilteredInventory { + self.core.lock().filter_getdata_inventory(peer_index, inventory) + } + fn on_peer_connected(&mut self, peer_index: usize) { self.core.lock().on_peer_connected(peer_index); } @@ -395,6 +433,41 @@ impl ClientCore for SynchronizationClientCore where T: TaskExecutor { self.state } + /// Filter inventory from `getdata` message for given peer + fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec) -> FilteredInventory { + let chain = self.chain.read(); + let mut filter = self.peers.filter_mut(peer_index); + let mut filtered: Vec<(types::MerkleBlock, Vec<(H256, Transaction)>)> = Vec::new(); + let mut unfiltered: Vec = Vec::new(); + let mut notfound: Vec = Vec::new(); + + for item in inventory.into_iter() { + match item.inv_type { + // if peer asks for filtered block => we should: + // 1) check if block has any transactions, matching connection bloom filter + // 2) build && send `merkleblock` message for this block + // 3) send all matching transactions after this block + InventoryType::MessageFilteredBlock => { + match chain.storage().block(db::BlockRef::Hash(item.hash.clone())) { + None => notfound.push(item), + Some(block) => match filter.build_merkle_block(block) { + None => notfound.push(item), + Some(merkleblock) => filtered.push((merkleblock.merkleblock, merkleblock.matching_transactions)), + } + } + }, + // these will be filtered (found/not found) in sync server + _ => unfiltered.push(item), + } + } + + FilteredInventory { + filtered: filtered, + unfiltered: unfiltered, + notfound: notfound, + } + } + /// Called when new peer connection is established fn on_peer_connected(&mut self, peer_index: usize) { // unuseful until respond with headers message diff --git a/sync/src/synchronization_executor.rs b/sync/src/synchronization_executor.rs index 16f13a50..c0c9867a 100644 --- a/sync/src/synchronization_executor.rs +++ b/sync/src/synchronization_executor.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use std::collections::HashMap; use parking_lot::Mutex; -use chain::{Block, BlockHeader}; +use chain::{Block, BlockHeader, Transaction}; use message::common::{InventoryVector, InventoryType}; use message::types; use primitives::hash::H256; @@ -30,6 +30,10 @@ pub enum Task { RequestMemoryPool(usize), /// Send block. SendBlock(usize, Block, ServerTaskIndex), + /// Send merkleblock + SendMerkleBlock(usize, types::MerkleBlock), + /// Send transaction + SendTransaction(usize, Transaction), /// Send notfound SendNotFound(usize, Vec, ServerTaskIndex), /// Send inventory @@ -128,10 +132,26 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor { if let Some(connection) = self.peers.get_mut(&peer_index) { assert_eq!(id.raw(), None); - trace!(target: "sync", "Sending block {:?} to peer#{}", block_message.block.hash(), peer_index); + trace!(target: "sync", "Sending block {:?} to peer#{}", block_message.block.hash().to_reversed_str(), peer_index); connection.send_block(&block_message); } }, + Task::SendMerkleBlock(peer_index, merkleblock) => { + if let Some(connection) = self.peers.get_mut(&peer_index) { + trace!(target: "sync", "Sending merkleblock {:?} to peer#{}", merkleblock.block_header.hash().to_reversed_str(), peer_index); + connection.send_merkleblock(&merkleblock); + } + }, + Task::SendTransaction(peer_index, transaction) => { + let transaction_message = types::Tx { + transaction: transaction, + }; + + if let Some(connection) = self.peers.get_mut(&peer_index) { + trace!(target: "sync", "Sending transaction {:?} to peer#{}", transaction_message.transaction.hash().to_reversed_str(), peer_index); + connection.send_transaction(&transaction_message); + } + }, Task::SendNotFound(peer_index, unknown_inventory, id) => { let notfound = types::NotFound { inventory: unknown_inventory, diff --git a/sync/src/synchronization_server.rs b/sync/src/synchronization_server.rs index 67185ea9..a6b71762 100644 --- a/sync/src/synchronization_server.rs +++ b/sync/src/synchronization_server.rs @@ -7,15 +7,16 @@ use futures::{Future, BoxFuture, lazy, finished}; use parking_lot::{Mutex, Condvar}; use message::common::{InventoryVector, InventoryType}; use db; -use chain::BlockHeader; +use chain::{BlockHeader, Transaction}; use primitives::hash::H256; use synchronization_chain::{ChainRef, TransactionState}; use synchronization_executor::{Task, TaskExecutor}; +use synchronization_client::FilteredInventory; use message::types; /// Synchronization requests server trait pub trait Server : Send + Sync + 'static { - fn serve_getdata(&self, peer_index: usize, message: types::GetData) -> Option; + fn serve_getdata(&self, peer_index: usize, inventory: FilteredInventory) -> Option; fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) -> Option; fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: Option) -> Option; fn serve_mempool(&self, peer_index: usize) -> Option; @@ -99,12 +100,14 @@ impl IndexedServerTask { #[derive(Debug, PartialEq)] pub enum ServerTask { - ServeGetData(Vec), + ServeGetData(FilteredInventory), ServeGetBlocks(db::BestBlock, H256), ServeGetHeaders(db::BestBlock, H256), ServeMempool, ReturnNotFound(Vec), ReturnBlock(H256), + ReturnMerkleBlock(types::MerkleBlock), + ReturnTransaction(Transaction), Ignore, } @@ -164,7 +167,16 @@ impl SynchronizationServer { { let chain = chain.read(); let storage = chain.storage(); - for item in inventory { + // process merkleblock items + for (merkleblock, transactions) in inventory.filtered { + new_tasks.push(IndexedServerTask::new(ServerTask::ReturnMerkleBlock(merkleblock), ServerTaskIndex::None)); + new_tasks.extend(transactions.into_iter().map(|(_, t)| + IndexedServerTask::new(ServerTask::ReturnTransaction(t), ServerTaskIndex::None))); + } + // extend with unknown merkleitems + unknown_items.extend(inventory.notfound); + // process unfiltered items + for item in inventory.unfiltered { match item.inv_type { InventoryType::MessageBlock => { match storage.block_number(&item.hash) { @@ -256,6 +268,14 @@ impl SynchronizationServer { // inform that we have processed task for peer queue.lock().task_processed(peer_index); }, + // `merkleblock` + ServerTask::ReturnMerkleBlock(merkleblock) => { + executor.lock().execute(Task::SendMerkleBlock(peer_index, merkleblock)); + }, + // `tx` + ServerTask::ReturnTransaction(transaction) => { + executor.lock().execute(Task::SendTransaction(peer_index, transaction)); + } // ignore ServerTask::Ignore => { let response_id = indexed_task.id.raw().expect("do not schedule redundant ignore task"); @@ -345,8 +365,8 @@ impl Drop for SynchronizationServer { } impl Server for SynchronizationServer { - fn serve_getdata(&self, _peer_index: usize, message: types::GetData) -> Option { - let task = IndexedServerTask::new(ServerTask::ServeGetData(message.inventory), ServerTaskIndex::None); + fn serve_getdata(&self, _peer_index: usize, inventory: FilteredInventory) -> Option { + let task = IndexedServerTask::new(ServerTask::ServeGetData(inventory), ServerTaskIndex::None); Some(task) } @@ -478,6 +498,7 @@ pub mod tests { use synchronization_executor::Task; use synchronization_executor::tests::DummyTaskExecutor; use synchronization_chain::Chain; + use synchronization_client::FilteredInventory; use super::{Server, ServerTask, SynchronizationServer, ServerTaskIndex, IndexedServerTask}; pub struct DummyServer { @@ -497,8 +518,8 @@ pub mod tests { } impl Server for DummyServer { - fn serve_getdata(&self, peer_index: usize, message: types::GetData) -> Option { - self.tasks.lock().push((peer_index, ServerTask::ServeGetData(message.inventory))); + fn serve_getdata(&self, peer_index: usize, inventory: FilteredInventory) -> Option { + self.tasks.lock().push((peer_index, ServerTask::ServeGetData(inventory))); None } @@ -544,9 +565,7 @@ pub mod tests { hash: H256::default(), } ]; - server.serve_getdata(0, types::GetData { - inventory: inventory.clone(), - }).map(|t| server.add_task(0, t)); + server.serve_getdata(0, FilteredInventory::with_unfiltered(inventory.clone())).map(|t| server.add_task(0, t)); // => respond with notfound let tasks = DummyTaskExecutor::wait_tasks(executor); assert_eq!(tasks, vec![Task::SendNotFound(0, inventory, ServerTaskIndex::None)]); @@ -562,9 +581,7 @@ pub mod tests { hash: test_data::genesis().hash(), } ]; - server.serve_getdata(0, types::GetData { - inventory: inventory.clone(), - }).map(|t| server.add_task(0, t)); + server.serve_getdata(0, FilteredInventory::with_unfiltered(inventory)).map(|t| server.add_task(0, t)); // => respond with block let tasks = DummyTaskExecutor::wait_tasks(executor); assert_eq!(tasks, vec![Task::SendBlock(0, test_data::genesis(), ServerTaskIndex::None)]);