Merge pull request #174 from ethcore/sync_merkleblock

Respond to getdata' inventory item with `MSG_FILTERED_BLOCK` type
This commit is contained in:
Nikolay Volf 2016-11-24 15:10:54 +03:00 committed by GitHub
commit 59044bf2e0
10 changed files with 531 additions and 24 deletions

1
Cargo.lock generated
View File

@ -662,6 +662,7 @@ dependencies = [
"p2p 0.1.0", "p2p 0.1.0",
"parking_lot 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"primitives 0.1.0", "primitives 0.1.0",
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
"script 0.1.0", "script 0.1.0",
"serialization 0.1.0", "serialization 0.1.0",
"test-data 0.1.0", "test-data 0.1.0",

View File

@ -19,6 +19,7 @@ pub use primitives::{hash, bytes};
pub use self::block::Block; pub use self::block::Block;
pub use self::block_header::BlockHeader; pub use self::block_header::BlockHeader;
pub use self::merkle_root::merkle_root; pub use self::merkle_root::merkle_root;
pub use self::merkle_root::merkle_node_hash;
pub use self::transaction::{ pub use self::transaction::{
Transaction, TransactionInput, TransactionOutput, OutPoint, Transaction, TransactionInput, TransactionOutput, OutPoint,
SEQUENCE_LOCKTIME_DISABLE_FLAG, SEQUENCE_FINAL, SEQUENCE_LOCKTIME_DISABLE_FLAG, SEQUENCE_FINAL,

View File

@ -19,19 +19,24 @@ pub fn merkle_root(hashes: &[H256]) -> H256 {
let mut row = vec![]; let mut row = vec![];
let mut i = 0; let mut i = 0;
while i + 1 < hashes.len() { while i + 1 < hashes.len() {
row.push(dhash256(&*concat(&hashes[i], &hashes[i + 1]))); row.push(merkle_node_hash(&hashes[i], &hashes[i + 1]));
i += 2 i += 2
} }
// duplicate the last element if len is not even // duplicate the last element if len is not even
if hashes.len() % 2 == 1 { if hashes.len() % 2 == 1 {
let last = &hashes[hashes.len() - 1]; let last = &hashes[hashes.len() - 1];
row.push(dhash256(&*concat(last, last))); row.push(merkle_node_hash(last, last));
} }
merkle_root(&row) merkle_root(&row)
} }
/// Calculate merkle tree node hash
pub fn merkle_node_hash(left: &H256, right: &H256) -> H256 {
dhash256(&*concat(left, right))
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use hash::H256; use hash::H256;

View File

@ -14,6 +14,7 @@ linked-hash-map = "0.3"
ethcore-devtools = { path = "../devtools" } ethcore-devtools = { path = "../devtools" }
bit-vec = "0.4.3" bit-vec = "0.4.3"
murmur3 = "0.3" murmur3 = "0.3"
rand = "0.3"
chain = { path = "../chain" } chain = { path = "../chain" }
db = { path = "../db" } db = { path = "../db" }

View File

@ -1,9 +1,11 @@
use std::cmp::min;
use linked_hash_map::LinkedHashMap; use linked_hash_map::LinkedHashMap;
use bit_vec::BitVec; use bit_vec::BitVec;
use murmur3::murmur3_32; use murmur3::murmur3_32;
use chain::{Transaction, OutPoint}; use chain::{Block, Transaction, OutPoint, merkle_node_hash};
use ser::serialize; use ser::serialize;
use message::types; use message::types;
use primitives::bytes::Bytes;
use primitives::hash::H256; use primitives::hash::H256;
use script::Script; use script::Script;
@ -38,6 +40,29 @@ struct ConnectionBloom {
tweak: u32, tweak: u32,
} }
/// `merkleblock` build artefacts
#[derive(Debug, PartialEq)]
pub struct MerkleBlockArtefacts {
/// `merkleblock` message
pub merkleblock: types::MerkleBlock,
/// All matching transactions
pub matching_transactions: Vec<(H256, Transaction)>,
}
/// Service structure to construct `merkleblock` message.
struct PartialMerkleTree {
/// All transactions length.
all_len: usize,
/// All transactions hashes.
all_hashes: Vec<H256>,
/// Match flags for all transactions.
all_matches: BitVec,
/// Partial hashes.
hashes: Vec<H256>,
/// Partial match flags.
matches: BitVec,
}
impl Default for ConnectionFilter { impl Default for ConnectionFilter {
fn default() -> Self { fn default() -> Self {
ConnectionFilter { ConnectionFilter {
@ -182,6 +207,56 @@ impl ConnectionFilter {
pub fn clear(&mut self) { pub fn clear(&mut self) {
self.bloom = None; self.bloom = None;
} }
/// Convert `Block` to `MerkleBlock` using this filter
pub fn build_merkle_block(&mut self, block: Block) -> Option<MerkleBlockArtefacts> {
if self.bloom.is_none() {
return None;
}
// prepare result
let all_len = block.transactions.len();
let mut result = MerkleBlockArtefacts {
merkleblock: types::MerkleBlock {
block_header: block.block_header.clone(),
total_transactions: all_len as u32,
hashes: Vec::default(),
flags: Bytes::default(),
},
matching_transactions: Vec::new(),
};
// calculate hashes && match flags for all transactions
let (all_hashes, all_flags) = block.transactions.into_iter()
.fold((Vec::<H256>::with_capacity(all_len), BitVec::with_capacity(all_len)), |(mut all_hashes, mut all_flags), t| {
let hash = t.hash();
let flag = self.filter_transaction(&hash, &t);
if flag {
result.matching_transactions.push((hash.clone(), t));
}
all_flags.push(flag);
all_hashes.push(hash);
(all_hashes, all_flags)
});
// build partial merkle tree
let (hashes, flags) = PartialMerkleTree::build(all_hashes, all_flags);
result.merkleblock.hashes.extend(hashes);
// to_bytes() converts [true, false, true] to 0b10100000
// while protocol requires [true, false, true] to be serialized as 0x00000101
result.merkleblock.flags = flags.to_bytes().into_iter()
.map(|b|
((b & 0b10000000) >> 7) |
((b & 0b01000000) >> 5) |
((b & 0b00100000) >> 3) |
((b & 0b00010000) >> 1) |
((b & 0b00001000) << 1) |
((b & 0b00000100) << 3) |
((b & 0b00000010) << 5) |
((b & 0b00000001) << 7)).collect::<Vec<u8>>().into();
Some(result)
}
} }
impl ConnectionBloom { impl ConnectionBloom {
@ -216,16 +291,175 @@ impl ConnectionBloom {
} }
} }
impl PartialMerkleTree {
/// Build partial merkle tree as described here:
/// https://bitcoin.org/en/developer-reference#creating-a-merkleblock-message
pub fn build(all_hashes: Vec<H256>, all_matches: BitVec) -> (Vec<H256>, BitVec) {
let mut partial_merkle_tree = PartialMerkleTree {
all_len: all_hashes.len(),
all_hashes: all_hashes,
all_matches: all_matches,
hashes: Vec::new(),
matches: BitVec::new(),
};
partial_merkle_tree.build_tree();
(partial_merkle_tree.hashes, partial_merkle_tree.matches)
}
#[cfg(test)]
/// Parse partial merkle tree as described here:
/// https://bitcoin.org/en/developer-reference#parsing-a-merkleblock-message
pub fn parse(all_len: usize, hashes: Vec<H256>, matches: BitVec) -> Result<(H256, Vec<H256>, BitVec), String> {
let mut partial_merkle_tree = PartialMerkleTree {
all_len: all_len,
all_hashes: Vec::new(),
all_matches: BitVec::from_elem(all_len, false),
hashes: hashes,
matches: matches,
};
let merkle_root = try!(partial_merkle_tree.parse_tree());
Ok((merkle_root, partial_merkle_tree.all_hashes, partial_merkle_tree.all_matches))
}
fn build_tree(&mut self) {
let tree_height = self.tree_height();
self.build_branch(tree_height, 0)
}
#[cfg(test)]
fn parse_tree(&mut self) -> Result<H256, String> {
if self.all_len == 0 {
return Err("no transactions".into());
}
if self.hashes.len() > self.all_len {
return Err("too many hashes".into());
}
if self.matches.len() < self.hashes.len() {
return Err("too few matches".into());
}
// parse tree
let mut matches_used = 0usize;
let mut hashes_used = 0usize;
let tree_height = self.tree_height();
let merkle_root = try!(self.parse_branch(tree_height, 0, &mut matches_used, &mut hashes_used));
if matches_used != self.matches.len() {
return Err("not all matches used".into());
}
if hashes_used != self.hashes.len() {
return Err("not all hashes used".into());
}
Ok(merkle_root)
}
fn build_branch(&mut self, height: usize, pos: usize) {
// determine whether this node is the parent of at least one matched txid
let transactions_begin = pos << height;
let transactions_end = min(self.all_len, (pos + 1) << height);
let flag = (transactions_begin..transactions_end).any(|idx| self.all_matches[idx]);
// remember flag
self.matches.push(flag);
// proceeed with descendants
if height == 0 || !flag {
// we're at the leaf level || there is no match
let hash = self.branch_hash(height, pos);
self.hashes.push(hash);
} else {
// proceed with left child
self.build_branch(height - 1, pos << 1);
// proceed with right child if any
if (pos << 1) + 1 < self.level_width(height - 1) {
self.build_branch(height - 1, (pos << 1) + 1);
}
}
}
#[cfg(test)]
fn parse_branch(&mut self, height: usize, pos: usize, matches_used: &mut usize, hashes_used: &mut usize) -> Result<H256, String> {
if *matches_used >= self.matches.len() {
return Err("all matches used".into());
}
let flag = self.matches[*matches_used];
*matches_used += 1;
if height == 0 || !flag {
// we're at the leaf level || there is no match
if *hashes_used > self.hashes.len() {
return Err("all hashes used".into());
}
// get node hash
let ref hash = self.hashes[*hashes_used];
*hashes_used += 1;
// on leaf level && matched flag set => mark transaction as matched
if height == 0 && flag {
self.all_hashes.push(hash.clone());
self.all_matches.set(pos, true);
}
Ok(hash.clone())
} else {
// proceed with left child
let left = try!(self.parse_branch(height - 1, pos << 1, matches_used, hashes_used));
// proceed with right child if any
let has_right_child = (pos << 1) + 1 < self.level_width(height - 1);
let right = if has_right_child {
try!(self.parse_branch(height - 1, (pos << 1) + 1, matches_used, hashes_used))
} else {
left.clone()
};
if has_right_child && left == right {
Err("met same hash twice".into())
} else {
Ok(merkle_node_hash(&left, &right))
}
}
}
fn tree_height(&self) -> usize {
let mut height = 0usize;
while self.level_width(height) > 1 {
height += 1;
}
height
}
fn level_width(&self, height: usize) -> usize {
(self.all_len + (1 << height) - 1) >> height
}
fn branch_hash(&self, height: usize, pos: usize) -> H256 {
if height == 0 {
self.all_hashes[pos].clone()
} else {
let left = self.branch_hash(height - 1, pos << 1);
let right = if (pos << 1) + 1 < self.level_width(height - 1) {
self.branch_hash(height - 1, (pos << 1) + 1)
} else {
left.clone()
};
merkle_node_hash(&left, &right)
}
}
}
#[cfg(test)] #[cfg(test)]
pub mod tests { pub mod tests {
use std::iter::{Iterator, repeat}; use std::iter::{Iterator, repeat};
use test_data; use test_data;
use message::types; use message::types;
use chain::Transaction; use chain::{merkle_root, Transaction};
use primitives::hash::H256; use primitives::hash::H256;
use primitives::bytes::Bytes; use primitives::bytes::Bytes;
use ser::serialize; use ser::serialize;
use super::{ConnectionFilter, ConnectionBloom}; use super::{ConnectionFilter, ConnectionBloom, PartialMerkleTree};
pub fn default_filterload() -> types::FilterLoad { pub fn default_filterload() -> types::FilterLoad {
types::FilterLoad { types::FilterLoad {
@ -323,4 +557,47 @@ pub mod tests {
assert!(filter.filter_transaction(&tx1.hash(), &tx1)); assert!(filter.filter_transaction(&tx1.hash(), &tx1));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2)); assert!(!filter.filter_transaction(&tx2.hash(), &tx2));
} }
#[test]
// test from core implementation (slow)
// https://github.com/bitcoin/bitcoin/blob/master/src/test/pmt_tests.cpp
fn test_build_merkle_block() {
use bit_vec::BitVec;
use rand::{Rng, SeedableRng, StdRng};
let rng_seed: &[_] = &[0, 0, 0, 0];
let mut rng: StdRng = SeedableRng::from_seed(rng_seed);
// for some transactions counts
let tx_counts: Vec<usize> = vec![1, 4, 7, 17, 56, 100, 127, 256, 312, 513, 1000, 4095];
for tx_count in tx_counts {
// build block with given transactions number
let transactions: Vec<Transaction> = (0..tx_count).map(|n| test_data::TransactionBuilder::with_version(n as i32).into()).collect();
let hashes: Vec<_> = transactions.iter().map(|t| t.hash()).collect();
let merkle_root = merkle_root(&hashes);
// mark different transactions as matched
for seed_tweak in 1..15 {
let mut matches: BitVec = BitVec::with_capacity(tx_count);
let mut matched_hashes: Vec<H256> = Vec::with_capacity(tx_count);
for i in 0usize..tx_count {
let is_match = (rng.gen::<u32>() & ((1 << (seed_tweak / 2)) - 1)) == 0;
matches.push(is_match);
if is_match {
matched_hashes.push(hashes[i].clone());
}
}
// build partial merkle tree
let (built_hashes, built_flags) = PartialMerkleTree::build(hashes.clone(), matches.clone());
// parse tree back
let (parsed_root, parsed_hashes, parsed_positions) = PartialMerkleTree::parse(tx_count, built_hashes, built_flags)
.expect("no error");
assert_eq!(matched_hashes, parsed_hashes);
assert_eq!(matches, parsed_positions);
assert_eq!(merkle_root, parsed_root);
}
}
}
} }

View File

@ -20,6 +20,7 @@ extern crate script;
extern crate serialization as ser; extern crate serialization as ser;
#[cfg(test)] #[cfg(test)]
extern crate ethcore_devtools as devtools; extern crate ethcore_devtools as devtools;
extern crate rand;
mod best_headers_chain; mod best_headers_chain;
mod blocks_writer; mod blocks_writer;

View File

@ -100,7 +100,11 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
pub fn on_peer_getdata(&self, peer_index: usize, message: types::GetData) { pub fn on_peer_getdata(&self, peer_index: usize, message: types::GetData) {
trace!(target: "sync", "Got `getdata` message from peer#{}", peer_index); trace!(target: "sync", "Got `getdata` message from peer#{}", peer_index);
self.server.serve_getdata(peer_index, message).map(|t| self.server.add_task(peer_index, t)); let filtered_inventory = {
let mut client = self.client.lock();
client.filter_getdata_inventory(peer_index, message.inventory)
};
self.server.serve_getdata(peer_index, filtered_inventory).map(|t| self.server.add_task(peer_index, t));
} }
pub fn on_peer_getblocks(&self, peer_index: usize, message: types::GetBlocks) { pub fn on_peer_getblocks(&self, peer_index: usize, message: types::GetBlocks) {
@ -227,9 +231,10 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
mod tests { mod tests {
use std::sync::Arc; use std::sync::Arc;
use parking_lot::{Mutex, RwLock}; use parking_lot::{Mutex, RwLock};
use connection_filter::tests::{default_filterload, make_filteradd};
use synchronization_executor::Task; use synchronization_executor::Task;
use synchronization_executor::tests::DummyTaskExecutor; use synchronization_executor::tests::DummyTaskExecutor;
use synchronization_client::{Config, SynchronizationClient, SynchronizationClientCore}; use synchronization_client::{Config, SynchronizationClient, SynchronizationClientCore, FilteredInventory};
use synchronization_chain::Chain; use synchronization_chain::Chain;
use p2p::{event_loop, OutboundSyncConnection, OutboundSyncConnectionRef}; use p2p::{event_loop, OutboundSyncConnection, OutboundSyncConnectionRef};
use message::types; use message::types;
@ -241,6 +246,7 @@ mod tests {
use synchronization_server::tests::DummyServer; use synchronization_server::tests::DummyServer;
use synchronization_verifier::tests::DummyVerifier; use synchronization_verifier::tests::DummyVerifier;
use tokio_core::reactor::{Core, Handle}; use tokio_core::reactor::{Core, Handle};
use primitives::bytes::Bytes;
struct DummyOutboundSyncConnection; struct DummyOutboundSyncConnection;
@ -317,6 +323,106 @@ mod tests {
}); });
// => `getdata` is served // => `getdata` is served
let tasks = server.take_tasks(); let tasks = server.take_tasks();
assert_eq!(tasks, vec![(peer_index, ServerTask::ServeGetData(inventory))]); assert_eq!(tasks, vec![(peer_index, ServerTask::ServeGetData(FilteredInventory::with_unfiltered(inventory)))]);
}
#[test]
fn local_node_serves_merkleblock() {
let (_, _, _, server, local_node) = create_local_node();
let genesis = test_data::genesis();
let b1 = test_data::block_builder().header().parent(genesis.hash()).build()
.transaction().output().value(10).build().build()
.build(); // genesis -> b1
let b2 = test_data::block_builder().header().parent(b1.hash()).build()
.transaction().output().value(20).build().build()
.build(); // genesis -> b1 -> b2
let tx1 = b1.transactions[0].clone();
let tx2 = b2.transactions[0].clone();
let tx1_hash = tx1.hash();
let tx2_hash = tx2.hash();
let b1_hash = b1.hash();
let b2_hash = b2.hash();
let match_tx1 = vec![(tx1_hash.clone(), tx1)];
let match_tx2 = vec![(tx2_hash.clone(), tx2)];
let no_match_bytes = Bytes::from(vec![0x00]);
let match_bytes = Bytes::from(vec![0x01]);
// This peer will provide blocks
let peer_index1 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
local_node.on_peer_block(peer_index1, types::Block { block: b1.clone() });
local_node.on_peer_block(peer_index1, types::Block { block: b2.clone() });
// This peer won't get any blocks, because it has not set filter for the connection
let peer_index2 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
local_node.on_peer_getdata(peer_index2, types::GetData {inventory: vec![
InventoryVector { inv_type: InventoryType::MessageFilteredBlock, hash: b1_hash.clone() },
InventoryVector { inv_type: InventoryType::MessageFilteredBlock, hash: b2_hash.clone() },
]});
assert_eq!(server.take_tasks(), vec![(peer_index2, ServerTask::ServeGetData(FilteredInventory::with_notfound(vec![
InventoryVector { inv_type: InventoryType::MessageFilteredBlock, hash: b1_hash.clone() },
InventoryVector { inv_type: InventoryType::MessageFilteredBlock, hash: b2_hash.clone() },
])))]);
let peers_config = vec![
(true, false), // will get tx1
(false, true), // will get tx2
(true, true), // will get both tx
(false, false), // won't get any tx
];
for (get_tx1, get_tx2) in peers_config {
let peer_index = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
// setup filter
local_node.on_peer_filterload(peer_index, default_filterload());
if get_tx1 {
local_node.on_peer_filteradd(peer_index, make_filteradd(&*tx1_hash));
}
if get_tx2 {
local_node.on_peer_filteradd(peer_index, make_filteradd(&*tx2_hash));
}
// ask for data
local_node.on_peer_getdata(peer_index, types::GetData {inventory: vec![
InventoryVector { inv_type: InventoryType::MessageFilteredBlock, hash: b1_hash.clone() },
InventoryVector { inv_type: InventoryType::MessageFilteredBlock, hash: b2_hash.clone() },
]});
// get server tasks
let tasks = server.take_tasks();
assert_eq!(tasks.len(), 1);
match tasks[0] {
(_, ServerTask::ServeGetData(ref filtered_inventory)) => {
assert_eq!(filtered_inventory.unfiltered.len(), 0);
assert_eq!(filtered_inventory.notfound.len(), 0);
assert_eq!(filtered_inventory.filtered.len(), 2);
assert_eq!(filtered_inventory.filtered[0].0, types::MerkleBlock {
block_header: b1.block_header.clone(),
total_transactions: 1,
hashes: vec![tx1_hash.clone()],
flags: if get_tx1 { match_bytes.clone() } else { no_match_bytes.clone() },
});
if get_tx1 {
assert_eq!(filtered_inventory.filtered[0].1, match_tx1);
} else {
assert_eq!(filtered_inventory.filtered[0].1, vec![]);
}
assert_eq!(filtered_inventory.filtered[1].0, types::MerkleBlock {
block_header: b2.block_header.clone(),
total_transactions: 1,
hashes: vec![tx2_hash.clone()],
flags: if get_tx2 { match_bytes.clone() } else { no_match_bytes.clone() },
});
if get_tx2 {
assert_eq!(filtered_inventory.filtered[1].1, match_tx2);
} else {
assert_eq!(filtered_inventory.filtered[1].1, vec![]);
}
},
_ => panic!("unexpected"),
}
}
} }
} }

View File

@ -183,6 +183,7 @@ pub struct Information {
pub trait Client : Send + 'static { pub trait Client : Send + 'static {
fn best_block(&self) -> db::BestBlock; fn best_block(&self) -> db::BestBlock;
fn state(&self) -> State; fn state(&self) -> State;
fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec<InventoryVector>) -> FilteredInventory;
fn on_peer_connected(&mut self, peer_index: usize); fn on_peer_connected(&mut self, peer_index: usize);
fn on_new_blocks_inventory(&mut self, peer_index: usize, blocks_hashes: Vec<H256>); fn on_new_blocks_inventory(&mut self, peer_index: usize, blocks_hashes: Vec<H256>);
fn on_new_transactions_inventory(&mut self, peer_index: usize, transactions_hashes: Vec<H256>); fn on_new_transactions_inventory(&mut self, peer_index: usize, transactions_hashes: Vec<H256>);
@ -201,6 +202,7 @@ pub trait Client : Send + 'static {
pub trait ClientCore : VerificationSink { pub trait ClientCore : VerificationSink {
fn best_block(&self) -> db::BestBlock; fn best_block(&self) -> db::BestBlock;
fn state(&self) -> State; fn state(&self) -> State;
fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec<InventoryVector>) -> FilteredInventory;
fn on_peer_connected(&mut self, peer_index: usize); fn on_peer_connected(&mut self, peer_index: usize);
fn on_new_blocks_inventory(&mut self, peer_index: usize, blocks_hashes: Vec<H256>); fn on_new_blocks_inventory(&mut self, peer_index: usize, blocks_hashes: Vec<H256>);
fn on_new_transactions_inventory(&mut self, peer_index: usize, transactions_hashes: Vec<H256>); fn on_new_transactions_inventory(&mut self, peer_index: usize, transactions_hashes: Vec<H256>);
@ -219,11 +221,23 @@ pub trait ClientCore : VerificationSink {
/// Synchronization client configuration options. /// Synchronization client configuration options.
#[derive(Debug)]
pub struct Config { pub struct Config {
/// Number of threads to allocate in synchronization CpuPool. /// Number of threads to allocate in synchronization CpuPool.
pub threads_num: usize, pub threads_num: usize,
} }
/// Filtered `getdata` inventory.
#[derive(Debug, PartialEq)]
pub struct FilteredInventory {
/// Merkleblock messages + transactions to send after
pub filtered: Vec<(types::MerkleBlock, Vec<(H256, Transaction)>)>,
/// Rest of inventory with MessageTx, MessageBlock, MessageCompactBlock inventory types
pub unfiltered: Vec<InventoryVector>,
/// Items that were supposed to be filtered, but we know nothing about these
pub notfound: Vec<InventoryVector>,
}
/// Synchronization client facade /// Synchronization client facade
pub struct SynchronizationClient<T: TaskExecutor, U: Verifier> { pub struct SynchronizationClient<T: TaskExecutor, U: Verifier> {
/// Client core /// Client core
@ -264,6 +278,26 @@ impl Config {
} }
} }
impl FilteredInventory {
#[cfg(test)]
pub fn with_unfiltered(unfiltered: Vec<InventoryVector>) -> Self {
FilteredInventory {
filtered: Vec::new(),
unfiltered: unfiltered,
notfound: Vec::new(),
}
}
#[cfg(test)]
pub fn with_notfound(notfound: Vec<InventoryVector>) -> Self {
FilteredInventory {
filtered: Vec::new(),
unfiltered: Vec::new(),
notfound: notfound,
}
}
}
impl State { impl State {
pub fn is_saturated(&self) -> bool { pub fn is_saturated(&self) -> bool {
match *self { match *self {
@ -296,6 +330,10 @@ impl<T, U> Client for SynchronizationClient<T, U> where T: TaskExecutor, U: Veri
self.core.lock().state() self.core.lock().state()
} }
fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec<InventoryVector>) -> FilteredInventory {
self.core.lock().filter_getdata_inventory(peer_index, inventory)
}
fn on_peer_connected(&mut self, peer_index: usize) { fn on_peer_connected(&mut self, peer_index: usize) {
self.core.lock().on_peer_connected(peer_index); self.core.lock().on_peer_connected(peer_index);
} }
@ -395,6 +433,41 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
self.state self.state
} }
/// Filter inventory from `getdata` message for given peer
fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec<InventoryVector>) -> FilteredInventory {
let chain = self.chain.read();
let mut filter = self.peers.filter_mut(peer_index);
let mut filtered: Vec<(types::MerkleBlock, Vec<(H256, Transaction)>)> = Vec::new();
let mut unfiltered: Vec<InventoryVector> = Vec::new();
let mut notfound: Vec<InventoryVector> = Vec::new();
for item in inventory {
match item.inv_type {
// if peer asks for filtered block => we should:
// 1) check if block has any transactions, matching connection bloom filter
// 2) build && send `merkleblock` message for this block
// 3) send all matching transactions after this block
InventoryType::MessageFilteredBlock => {
match chain.storage().block(db::BlockRef::Hash(item.hash.clone())) {
None => notfound.push(item),
Some(block) => match filter.build_merkle_block(block) {
None => notfound.push(item),
Some(merkleblock) => filtered.push((merkleblock.merkleblock, merkleblock.matching_transactions)),
}
}
},
// these will be filtered (found/not found) in sync server
_ => unfiltered.push(item),
}
}
FilteredInventory {
filtered: filtered,
unfiltered: unfiltered,
notfound: notfound,
}
}
/// Called when new peer connection is established /// Called when new peer connection is established
fn on_peer_connected(&mut self, peer_index: usize) { fn on_peer_connected(&mut self, peer_index: usize) {
// unuseful until respond with headers message // unuseful until respond with headers message
@ -688,6 +761,11 @@ impl<T> VerificationSink for SynchronizationClientCore<T> where T: TaskExecutor
self.execute_synchronization_tasks(None); self.execute_synchronization_tasks(None);
// relay block to our peers // relay block to our peers
// TODO:
// SPV clients that wish to use Bloom filtering would normally set version.fRelay to false in the version message,
// then set a filter based on their wallet (or a subset of it, if they are overlapping different peers).
// Being able to opt-out of inv messages until the filter is set prevents a client being flooded with traffic in
// the brief window of time between finishing version handshaking and setting the filter.
if self.state.is_saturated() || self.state.is_nearly_saturated() { if self.state.is_saturated() || self.state.is_nearly_saturated() {
self.relay_new_blocks(insert_result.canonized_blocks_hashes); self.relay_new_blocks(insert_result.canonized_blocks_hashes);
} }

View File

@ -1,7 +1,7 @@
use std::sync::Arc; use std::sync::Arc;
use std::collections::HashMap; use std::collections::HashMap;
use parking_lot::Mutex; use parking_lot::Mutex;
use chain::{Block, BlockHeader}; use chain::{Block, BlockHeader, Transaction};
use message::common::{InventoryVector, InventoryType}; use message::common::{InventoryVector, InventoryType};
use message::types; use message::types;
use primitives::hash::H256; use primitives::hash::H256;
@ -30,6 +30,10 @@ pub enum Task {
RequestMemoryPool(usize), RequestMemoryPool(usize),
/// Send block. /// Send block.
SendBlock(usize, Block, ServerTaskIndex), SendBlock(usize, Block, ServerTaskIndex),
/// Send merkleblock
SendMerkleBlock(usize, types::MerkleBlock),
/// Send transaction
SendTransaction(usize, Transaction),
/// Send notfound /// Send notfound
SendNotFound(usize, Vec<InventoryVector>, ServerTaskIndex), SendNotFound(usize, Vec<InventoryVector>, ServerTaskIndex),
/// Send inventory /// Send inventory
@ -128,10 +132,26 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
if let Some(connection) = self.peers.get_mut(&peer_index) { if let Some(connection) = self.peers.get_mut(&peer_index) {
assert_eq!(id.raw(), None); assert_eq!(id.raw(), None);
trace!(target: "sync", "Sending block {:?} to peer#{}", block_message.block.hash(), peer_index); trace!(target: "sync", "Sending block {:?} to peer#{}", block_message.block.hash().to_reversed_str(), peer_index);
connection.send_block(&block_message); connection.send_block(&block_message);
} }
}, },
Task::SendMerkleBlock(peer_index, merkleblock) => {
if let Some(connection) = self.peers.get_mut(&peer_index) {
trace!(target: "sync", "Sending merkleblock {:?} to peer#{}", merkleblock.block_header.hash().to_reversed_str(), peer_index);
connection.send_merkleblock(&merkleblock);
}
},
Task::SendTransaction(peer_index, transaction) => {
let transaction_message = types::Tx {
transaction: transaction,
};
if let Some(connection) = self.peers.get_mut(&peer_index) {
trace!(target: "sync", "Sending transaction {:?} to peer#{}", transaction_message.transaction.hash().to_reversed_str(), peer_index);
connection.send_transaction(&transaction_message);
}
},
Task::SendNotFound(peer_index, unknown_inventory, id) => { Task::SendNotFound(peer_index, unknown_inventory, id) => {
let notfound = types::NotFound { let notfound = types::NotFound {
inventory: unknown_inventory, inventory: unknown_inventory,

View File

@ -7,15 +7,16 @@ use futures::{Future, BoxFuture, lazy, finished};
use parking_lot::{Mutex, Condvar}; use parking_lot::{Mutex, Condvar};
use message::common::{InventoryVector, InventoryType}; use message::common::{InventoryVector, InventoryType};
use db; use db;
use chain::BlockHeader; use chain::{BlockHeader, Transaction};
use primitives::hash::H256; use primitives::hash::H256;
use synchronization_chain::{ChainRef, TransactionState}; use synchronization_chain::{ChainRef, TransactionState};
use synchronization_executor::{Task, TaskExecutor}; use synchronization_executor::{Task, TaskExecutor};
use synchronization_client::FilteredInventory;
use message::types; use message::types;
/// Synchronization requests server trait /// Synchronization requests server trait
pub trait Server : Send + Sync + 'static { pub trait Server : Send + Sync + 'static {
fn serve_getdata(&self, peer_index: usize, message: types::GetData) -> Option<IndexedServerTask>; fn serve_getdata(&self, peer_index: usize, inventory: FilteredInventory) -> Option<IndexedServerTask>;
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) -> Option<IndexedServerTask>; fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) -> Option<IndexedServerTask>;
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: Option<u32>) -> Option<IndexedServerTask>; fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: Option<u32>) -> Option<IndexedServerTask>;
fn serve_mempool(&self, peer_index: usize) -> Option<IndexedServerTask>; fn serve_mempool(&self, peer_index: usize) -> Option<IndexedServerTask>;
@ -99,12 +100,14 @@ impl IndexedServerTask {
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub enum ServerTask { pub enum ServerTask {
ServeGetData(Vec<InventoryVector>), ServeGetData(FilteredInventory),
ServeGetBlocks(db::BestBlock, H256), ServeGetBlocks(db::BestBlock, H256),
ServeGetHeaders(db::BestBlock, H256), ServeGetHeaders(db::BestBlock, H256),
ServeMempool, ServeMempool,
ReturnNotFound(Vec<InventoryVector>), ReturnNotFound(Vec<InventoryVector>),
ReturnBlock(H256), ReturnBlock(H256),
ReturnMerkleBlock(types::MerkleBlock),
ReturnTransaction(Transaction),
Ignore, Ignore,
} }
@ -164,7 +167,16 @@ impl SynchronizationServer {
{ {
let chain = chain.read(); let chain = chain.read();
let storage = chain.storage(); let storage = chain.storage();
for item in inventory { // process merkleblock items
for (merkleblock, transactions) in inventory.filtered {
new_tasks.push(IndexedServerTask::new(ServerTask::ReturnMerkleBlock(merkleblock), ServerTaskIndex::None));
new_tasks.extend(transactions.into_iter().map(|(_, t)|
IndexedServerTask::new(ServerTask::ReturnTransaction(t), ServerTaskIndex::None)));
}
// extend with unknown merkleitems
unknown_items.extend(inventory.notfound);
// process unfiltered items
for item in inventory.unfiltered {
match item.inv_type { match item.inv_type {
InventoryType::MessageBlock => { InventoryType::MessageBlock => {
match storage.block_number(&item.hash) { match storage.block_number(&item.hash) {
@ -256,6 +268,14 @@ impl SynchronizationServer {
// inform that we have processed task for peer // inform that we have processed task for peer
queue.lock().task_processed(peer_index); queue.lock().task_processed(peer_index);
}, },
// `merkleblock`
ServerTask::ReturnMerkleBlock(merkleblock) => {
executor.lock().execute(Task::SendMerkleBlock(peer_index, merkleblock));
},
// `tx`
ServerTask::ReturnTransaction(transaction) => {
executor.lock().execute(Task::SendTransaction(peer_index, transaction));
}
// ignore // ignore
ServerTask::Ignore => { ServerTask::Ignore => {
let response_id = indexed_task.id.raw().expect("do not schedule redundant ignore task"); let response_id = indexed_task.id.raw().expect("do not schedule redundant ignore task");
@ -345,8 +365,8 @@ impl Drop for SynchronizationServer {
} }
impl Server for SynchronizationServer { impl Server for SynchronizationServer {
fn serve_getdata(&self, _peer_index: usize, message: types::GetData) -> Option<IndexedServerTask> { fn serve_getdata(&self, _peer_index: usize, inventory: FilteredInventory) -> Option<IndexedServerTask> {
let task = IndexedServerTask::new(ServerTask::ServeGetData(message.inventory), ServerTaskIndex::None); let task = IndexedServerTask::new(ServerTask::ServeGetData(inventory), ServerTaskIndex::None);
Some(task) Some(task)
} }
@ -478,6 +498,7 @@ pub mod tests {
use synchronization_executor::Task; use synchronization_executor::Task;
use synchronization_executor::tests::DummyTaskExecutor; use synchronization_executor::tests::DummyTaskExecutor;
use synchronization_chain::Chain; use synchronization_chain::Chain;
use synchronization_client::FilteredInventory;
use super::{Server, ServerTask, SynchronizationServer, ServerTaskIndex, IndexedServerTask}; use super::{Server, ServerTask, SynchronizationServer, ServerTaskIndex, IndexedServerTask};
pub struct DummyServer { pub struct DummyServer {
@ -497,8 +518,8 @@ pub mod tests {
} }
impl Server for DummyServer { impl Server for DummyServer {
fn serve_getdata(&self, peer_index: usize, message: types::GetData) -> Option<IndexedServerTask> { fn serve_getdata(&self, peer_index: usize, inventory: FilteredInventory) -> Option<IndexedServerTask> {
self.tasks.lock().push((peer_index, ServerTask::ServeGetData(message.inventory))); self.tasks.lock().push((peer_index, ServerTask::ServeGetData(inventory)));
None None
} }
@ -544,9 +565,7 @@ pub mod tests {
hash: H256::default(), hash: H256::default(),
} }
]; ];
server.serve_getdata(0, types::GetData { server.serve_getdata(0, FilteredInventory::with_unfiltered(inventory.clone())).map(|t| server.add_task(0, t));
inventory: inventory.clone(),
}).map(|t| server.add_task(0, t));
// => respond with notfound // => respond with notfound
let tasks = DummyTaskExecutor::wait_tasks(executor); let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendNotFound(0, inventory, ServerTaskIndex::None)]); assert_eq!(tasks, vec![Task::SendNotFound(0, inventory, ServerTaskIndex::None)]);
@ -562,9 +581,7 @@ pub mod tests {
hash: test_data::genesis().hash(), hash: test_data::genesis().hash(),
} }
]; ];
server.serve_getdata(0, types::GetData { server.serve_getdata(0, FilteredInventory::with_unfiltered(inventory)).map(|t| server.add_task(0, t));
inventory: inventory.clone(),
}).map(|t| server.add_task(0, t));
// => respond with block // => respond with block
let tasks = DummyTaskExecutor::wait_tasks(executor); let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendBlock(0, test_data::genesis(), ServerTaskIndex::None)]); assert_eq!(tasks, vec![Task::SendBlock(0, test_data::genesis(), ServerTaskIndex::None)]);