filter merkleblocks in getdata

This commit is contained in:
Svyatoslav Nikolsky 2016-11-24 10:10:20 +03:00
parent f527b023da
commit 5818c8fafa
5 changed files with 249 additions and 29 deletions

View File

@ -5,6 +5,7 @@ use murmur3::murmur3_32;
use chain::{Block, Transaction, OutPoint, merkle_root};
use ser::serialize;
use message::types;
use primitives::bytes::Bytes;
use primitives::hash::H256;
use script::Script;
@ -39,6 +40,14 @@ struct ConnectionBloom {
tweak: u32,
}
/// `merkleblock` build artefacts
pub struct MerkleBlockArtefacts {
/// `merkleblock` message
pub merkleblock: types::MerkleBlock,
/// All matching transactions
pub matching_transactions: Vec<(H256, Transaction)>,
}
/// Service structure to construct `merkleblock` message.
pub struct PartialMerkleTree {
/// All transactions length.
@ -199,29 +208,42 @@ impl ConnectionFilter {
}
/// Convert `Block` to `MerkleBlock` using this filter
pub fn make_merkle_block(&mut self, block: &Block) -> Option<types::MerkleBlock> {
pub fn build_merkle_block(&mut self, block: Block) -> Option<MerkleBlockArtefacts> {
if self.bloom.is_none() {
return None;
}
// calculate hashes && match flags for all transactions
// prepare result
let all_len = block.transactions.len();
let (all_hashes, all_flags) = block.transactions.iter()
let mut result = MerkleBlockArtefacts {
merkleblock: types::MerkleBlock {
block_header: block.block_header.clone(),
total_transactions: all_len as u32,
hashes: Vec::default(),
flags: Bytes::default(),
},
matching_transactions: Vec::new(),
};
// calculate hashes && match flags for all transactions
let (all_hashes, all_flags) = block.transactions.into_iter()
.fold((Vec::<H256>::with_capacity(all_len), BitVec::with_capacity(all_len)), |(mut all_hashes, mut all_flags), t| {
let hash = t.hash();
all_flags.push(self.filter_transaction(&hash, t));
let flag = self.filter_transaction(&hash, &t);
if flag {
result.matching_transactions.push((hash.clone(), t));
}
all_flags.push(flag);
all_hashes.push(hash);
(all_hashes, all_flags)
});
// build partial merkle tree
let (hashes, flags) = PartialMerkleTree::build(all_hashes, all_flags);
Some(types::MerkleBlock {
block_header: block.block_header.clone(),
total_transactions: all_len as u32,
hashes: hashes,
flags: flags.to_bytes().into(),
})
result.merkleblock.hashes.extend(hashes);
result.merkleblock.flags = flags.to_bytes().into();
Some(result)
}
}
@ -255,6 +277,11 @@ impl ConnectionBloom {
self.filter.set(murmur_hash, true);
}
}
#[cfg(test)]
pub fn bytes(&self) -> Bytes {
self.filter.to_bytes().into()
}
}
impl PartialMerkleTree {
@ -336,6 +363,14 @@ pub mod tests {
}
}
pub fn make_filterload(data: &[u8]) -> types::FilterLoad {
let mut filterload = default_filterload();
let mut bloom = ConnectionBloom::new(&filterload);
bloom.insert(data);
filterload.filter = bloom.bytes();
filterload
}
pub fn make_filteradd(data: &[u8]) -> types::FilterAdd {
types::FilterAdd {
data: data.into(),

View File

@ -100,7 +100,11 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
pub fn on_peer_getdata(&self, peer_index: usize, message: types::GetData) {
trace!(target: "sync", "Got `getdata` message from peer#{}", peer_index);
self.server.serve_getdata(peer_index, message).map(|t| self.server.add_task(peer_index, t));
let filtered_inventory = {
let mut client = self.client.lock();
client.filter_getdata_inventory(peer_index, message.inventory)
};
self.server.serve_getdata(peer_index, filtered_inventory).map(|t| self.server.add_task(peer_index, t));
}
pub fn on_peer_getblocks(&self, peer_index: usize, message: types::GetBlocks) {
@ -227,9 +231,10 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
mod tests {
use std::sync::Arc;
use parking_lot::{Mutex, RwLock};
use connection_filter::tests::make_filterload;
use synchronization_executor::Task;
use synchronization_executor::tests::DummyTaskExecutor;
use synchronization_client::{Config, SynchronizationClient, SynchronizationClientCore};
use synchronization_client::{Config, SynchronizationClient, SynchronizationClientCore, FilteredInventory};
use synchronization_chain::Chain;
use p2p::{event_loop, OutboundSyncConnection, OutboundSyncConnectionRef};
use message::types;
@ -241,6 +246,7 @@ mod tests {
use synchronization_server::tests::DummyServer;
use synchronization_verifier::tests::DummyVerifier;
use tokio_core::reactor::{Core, Handle};
use primitives::bytes::Bytes;
struct DummyOutboundSyncConnection;
@ -317,6 +323,75 @@ mod tests {
});
// => `getdata` is served
let tasks = server.take_tasks();
assert_eq!(tasks, vec![(peer_index, ServerTask::ServeGetData(inventory))]);
assert_eq!(tasks, vec![(peer_index, ServerTask::ServeGetData(FilteredInventory::with_unfiltered(inventory)))]);
}
/*#[test]
fn local_node_serves_merkleblock() {
let (_, _, _, server, local_node) = create_local_node();
let genesis = test_data::genesis();
let b1 = test_data::block_builder().header().parent(genesis.hash()).build()
.transaction().output().value(10).build().build()
.build(); // genesis -> b1
let b2 = test_data::block_builder().header().parent(b1.hash()).build()
.transaction().output().value(20).build().build()
.build(); // genesis -> b1 -> b2
let b1_hash = b1.hash();
let b2_hash = b1.hash();
let match_bytes = Bytes::from(vec![0x80]);
// This peer will provide blocks
let peer_index1 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
local_node.on_peer_block(peer_index1, types::Block { block: b1.clone() });
local_node.on_peer_block(peer_index1, types::Block { block: b2.clone() });
// This peer won't get any blocks, because it has not set filter for the connection
let peer_index2 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
local_node.on_peer_getdata(peer_index2, types::GetData {inventory: vec![
InventoryVector { inv_type: InventoryType::MessageFilteredBlock, hash: b1_hash.clone() },
InventoryVector { inv_type: InventoryType::MessageFilteredBlock, hash: b2_hash.clone() },
]});
assert_eq!(server.take_tasks(), vec![(peer_index2, ServerTask::ServeGetData(FilteredInventory::with_notfound(vec![
InventoryVector { inv_type: InventoryType::MessageFilteredBlock, hash: b1_hash.clone() },
InventoryVector { inv_type: InventoryType::MessageFilteredBlock, hash: b2_hash.clone() },
])))]);
// This peer will get filtered b1
let peer_index3 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
local_node.on_peer_filterload(peer_index3, make_filterload(&*b1.transactions[0].hash()));
local_node.on_peer_getdata(peer_index3, types::GetData {inventory: vec![
InventoryVector { inv_type: InventoryType::MessageFilteredBlock, hash: b1_hash.clone() },
InventoryVector { inv_type: InventoryType::MessageFilteredBlock, hash: b2_hash.clone() },
]});
assert_eq!(server.take_tasks(), vec![(3, ServerTask::ServeGetData(FilteredInventory {
filtered: vec![
(
types::MerkleBlock {
block_header: b1.block_header.clone(),
total_transactions: 1,
hashes: vec!["48de599e797bc0aee90c09f32aead1fce19bd8857c9407d76c8809ad076bbe98".into()],
flags: Bytes::new_with_len(1)
}, vec![]
),
(
types::MerkleBlock {
block_header: b2.block_header.clone(),
total_transactions: 1,
hashes: vec!["48de599e797bc0aee90c09f32aead1fce19bd8857c9407d76c8809ad076bbe98".into()],
flags: Bytes::new_with_len(1)
}, vec![]
)
],
unfiltered: vec![],
notfound: vec![]
}))]);
// This peer will consume filtered b2
//let peer_index4 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
// This peer will consume filtered b1 + b2
//let peer_index5 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
// This peer will consume some other filtered block
//let peer_index6 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
}*/
}

View File

@ -183,6 +183,7 @@ pub struct Information {
pub trait Client : Send + 'static {
fn best_block(&self) -> db::BestBlock;
fn state(&self) -> State;
fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec<InventoryVector>) -> FilteredInventory;
fn on_peer_connected(&mut self, peer_index: usize);
fn on_new_blocks_inventory(&mut self, peer_index: usize, blocks_hashes: Vec<H256>);
fn on_new_transactions_inventory(&mut self, peer_index: usize, transactions_hashes: Vec<H256>);
@ -201,6 +202,7 @@ pub trait Client : Send + 'static {
pub trait ClientCore : VerificationSink {
fn best_block(&self) -> db::BestBlock;
fn state(&self) -> State;
fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec<InventoryVector>) -> FilteredInventory;
fn on_peer_connected(&mut self, peer_index: usize);
fn on_new_blocks_inventory(&mut self, peer_index: usize, blocks_hashes: Vec<H256>);
fn on_new_transactions_inventory(&mut self, peer_index: usize, transactions_hashes: Vec<H256>);
@ -219,11 +221,23 @@ pub trait ClientCore : VerificationSink {
/// Synchronization client configuration options.
#[derive(Debug)]
pub struct Config {
/// Number of threads to allocate in synchronization CpuPool.
pub threads_num: usize,
}
/// Filtered `getdata` inventory.
#[derive(Debug, PartialEq)]
pub struct FilteredInventory {
/// Merkleblock messages + transactions to send after
pub filtered: Vec<(types::MerkleBlock, Vec<(H256, Transaction)>)>,
/// Rest of inventory with MessageTx, MessageBlock, MessageCompactBlock inventory types
pub unfiltered: Vec<InventoryVector>,
/// Items that were supposed to be filtered, but we know nothing about these
pub notfound: Vec<InventoryVector>,
}
/// Synchronization client facade
pub struct SynchronizationClient<T: TaskExecutor, U: Verifier> {
/// Client core
@ -264,6 +278,26 @@ impl Config {
}
}
impl FilteredInventory {
#[cfg(test)]
pub fn with_unfiltered(unfiltered: Vec<InventoryVector>) -> Self {
FilteredInventory {
filtered: Vec::new(),
unfiltered: unfiltered,
notfound: Vec::new(),
}
}
#[cfg(test)]
pub fn with_notfound(notfound: Vec<InventoryVector>) -> Self {
FilteredInventory {
filtered: Vec::new(),
unfiltered: Vec::new(),
notfound: notfound,
}
}
}
impl State {
pub fn is_saturated(&self) -> bool {
match *self {
@ -296,6 +330,10 @@ impl<T, U> Client for SynchronizationClient<T, U> where T: TaskExecutor, U: Veri
self.core.lock().state()
}
fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec<InventoryVector>) -> FilteredInventory {
self.core.lock().filter_getdata_inventory(peer_index, inventory)
}
fn on_peer_connected(&mut self, peer_index: usize) {
self.core.lock().on_peer_connected(peer_index);
}
@ -395,6 +433,41 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
self.state
}
/// Filter inventory from `getdata` message for given peer
fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec<InventoryVector>) -> FilteredInventory {
let chain = self.chain.read();
let mut filter = self.peers.filter_mut(peer_index);
let mut filtered: Vec<(types::MerkleBlock, Vec<(H256, Transaction)>)> = Vec::new();
let mut unfiltered: Vec<InventoryVector> = Vec::new();
let mut notfound: Vec<InventoryVector> = Vec::new();
for item in inventory.into_iter() {
match item.inv_type {
// if peer asks for filtered block => we should:
// 1) check if block has any transactions, matching connection bloom filter
// 2) build && send `merkleblock` message for this block
// 3) send all matching transactions after this block
InventoryType::MessageFilteredBlock => {
match chain.storage().block(db::BlockRef::Hash(item.hash.clone())) {
None => notfound.push(item),
Some(block) => match filter.build_merkle_block(block) {
None => notfound.push(item),
Some(merkleblock) => filtered.push((merkleblock.merkleblock, merkleblock.matching_transactions)),
}
}
},
// these will be filtered (found/not found) in sync server
_ => unfiltered.push(item),
}
}
FilteredInventory {
filtered: filtered,
unfiltered: unfiltered,
notfound: notfound,
}
}
/// Called when new peer connection is established
fn on_peer_connected(&mut self, peer_index: usize) {
// unuseful until respond with headers message

View File

@ -1,7 +1,7 @@
use std::sync::Arc;
use std::collections::HashMap;
use parking_lot::Mutex;
use chain::{Block, BlockHeader};
use chain::{Block, BlockHeader, Transaction};
use message::common::{InventoryVector, InventoryType};
use message::types;
use primitives::hash::H256;
@ -30,6 +30,10 @@ pub enum Task {
RequestMemoryPool(usize),
/// Send block.
SendBlock(usize, Block, ServerTaskIndex),
/// Send merkleblock
SendMerkleBlock(usize, types::MerkleBlock),
/// Send transaction
SendTransaction(usize, Transaction),
/// Send notfound
SendNotFound(usize, Vec<InventoryVector>, ServerTaskIndex),
/// Send inventory
@ -128,10 +132,26 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
if let Some(connection) = self.peers.get_mut(&peer_index) {
assert_eq!(id.raw(), None);
trace!(target: "sync", "Sending block {:?} to peer#{}", block_message.block.hash(), peer_index);
trace!(target: "sync", "Sending block {:?} to peer#{}", block_message.block.hash().to_reversed_str(), peer_index);
connection.send_block(&block_message);
}
},
Task::SendMerkleBlock(peer_index, merkleblock) => {
if let Some(connection) = self.peers.get_mut(&peer_index) {
trace!(target: "sync", "Sending merkleblock {:?} to peer#{}", merkleblock.block_header.hash().to_reversed_str(), peer_index);
connection.send_merkleblock(&merkleblock);
}
},
Task::SendTransaction(peer_index, transaction) => {
let transaction_message = types::Tx {
transaction: transaction,
};
if let Some(connection) = self.peers.get_mut(&peer_index) {
trace!(target: "sync", "Sending transaction {:?} to peer#{}", transaction_message.transaction.hash().to_reversed_str(), peer_index);
connection.send_transaction(&transaction_message);
}
},
Task::SendNotFound(peer_index, unknown_inventory, id) => {
let notfound = types::NotFound {
inventory: unknown_inventory,

View File

@ -7,15 +7,16 @@ use futures::{Future, BoxFuture, lazy, finished};
use parking_lot::{Mutex, Condvar};
use message::common::{InventoryVector, InventoryType};
use db;
use chain::BlockHeader;
use chain::{BlockHeader, Transaction};
use primitives::hash::H256;
use synchronization_chain::{ChainRef, TransactionState};
use synchronization_executor::{Task, TaskExecutor};
use synchronization_client::FilteredInventory;
use message::types;
/// Synchronization requests server trait
pub trait Server : Send + Sync + 'static {
fn serve_getdata(&self, peer_index: usize, message: types::GetData) -> Option<IndexedServerTask>;
fn serve_getdata(&self, peer_index: usize, inventory: FilteredInventory) -> Option<IndexedServerTask>;
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) -> Option<IndexedServerTask>;
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: Option<u32>) -> Option<IndexedServerTask>;
fn serve_mempool(&self, peer_index: usize) -> Option<IndexedServerTask>;
@ -99,12 +100,14 @@ impl IndexedServerTask {
#[derive(Debug, PartialEq)]
pub enum ServerTask {
ServeGetData(Vec<InventoryVector>),
ServeGetData(FilteredInventory),
ServeGetBlocks(db::BestBlock, H256),
ServeGetHeaders(db::BestBlock, H256),
ServeMempool,
ReturnNotFound(Vec<InventoryVector>),
ReturnBlock(H256),
ReturnMerkleBlock(types::MerkleBlock),
ReturnTransaction(Transaction),
Ignore,
}
@ -164,7 +167,16 @@ impl SynchronizationServer {
{
let chain = chain.read();
let storage = chain.storage();
for item in inventory {
// process merkleblock items
for (merkleblock, transactions) in inventory.filtered {
new_tasks.push(IndexedServerTask::new(ServerTask::ReturnMerkleBlock(merkleblock), ServerTaskIndex::None));
new_tasks.extend(transactions.into_iter().map(|(_, t)|
IndexedServerTask::new(ServerTask::ReturnTransaction(t), ServerTaskIndex::None)));
}
// extend with unknown merkleitems
unknown_items.extend(inventory.notfound);
// process unfiltered items
for item in inventory.unfiltered {
match item.inv_type {
InventoryType::MessageBlock => {
match storage.block_number(&item.hash) {
@ -256,6 +268,14 @@ impl SynchronizationServer {
// inform that we have processed task for peer
queue.lock().task_processed(peer_index);
},
// `merkleblock`
ServerTask::ReturnMerkleBlock(merkleblock) => {
executor.lock().execute(Task::SendMerkleBlock(peer_index, merkleblock));
},
// `tx`
ServerTask::ReturnTransaction(transaction) => {
executor.lock().execute(Task::SendTransaction(peer_index, transaction));
}
// ignore
ServerTask::Ignore => {
let response_id = indexed_task.id.raw().expect("do not schedule redundant ignore task");
@ -345,8 +365,8 @@ impl Drop for SynchronizationServer {
}
impl Server for SynchronizationServer {
fn serve_getdata(&self, _peer_index: usize, message: types::GetData) -> Option<IndexedServerTask> {
let task = IndexedServerTask::new(ServerTask::ServeGetData(message.inventory), ServerTaskIndex::None);
fn serve_getdata(&self, _peer_index: usize, inventory: FilteredInventory) -> Option<IndexedServerTask> {
let task = IndexedServerTask::new(ServerTask::ServeGetData(inventory), ServerTaskIndex::None);
Some(task)
}
@ -478,6 +498,7 @@ pub mod tests {
use synchronization_executor::Task;
use synchronization_executor::tests::DummyTaskExecutor;
use synchronization_chain::Chain;
use synchronization_client::FilteredInventory;
use super::{Server, ServerTask, SynchronizationServer, ServerTaskIndex, IndexedServerTask};
pub struct DummyServer {
@ -497,8 +518,8 @@ pub mod tests {
}
impl Server for DummyServer {
fn serve_getdata(&self, peer_index: usize, message: types::GetData) -> Option<IndexedServerTask> {
self.tasks.lock().push((peer_index, ServerTask::ServeGetData(message.inventory)));
fn serve_getdata(&self, peer_index: usize, inventory: FilteredInventory) -> Option<IndexedServerTask> {
self.tasks.lock().push((peer_index, ServerTask::ServeGetData(inventory)));
None
}
@ -544,9 +565,7 @@ pub mod tests {
hash: H256::default(),
}
];
server.serve_getdata(0, types::GetData {
inventory: inventory.clone(),
}).map(|t| server.add_task(0, t));
server.serve_getdata(0, FilteredInventory::with_unfiltered(inventory.clone())).map(|t| server.add_task(0, t));
// => respond with notfound
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendNotFound(0, inventory, ServerTaskIndex::None)]);
@ -562,9 +581,7 @@ pub mod tests {
hash: test_data::genesis().hash(),
}
];
server.serve_getdata(0, types::GetData {
inventory: inventory.clone(),
}).map(|t| server.add_task(0, t));
server.serve_getdata(0, FilteredInventory::with_unfiltered(inventory)).map(|t| server.add_task(0, t));
// => respond with block
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendBlock(0, test_data::genesis(), ServerTaskIndex::None)]);