From cc48f0a75fc1fbdacef4299055d83879a958e9a4 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Tue, 22 Nov 2016 18:03:07 +0300 Subject: [PATCH 1/3] initial connection filter commit --- Cargo.lock | 13 ++ primitives/src/bytes.rs | 6 + sync/Cargo.toml | 4 + sync/src/connection_filter.rs | 226 ++++++++++++++++++++++++++++++++++ sync/src/lib.rs | 5 + sync/src/local_node.rs | 2 + 6 files changed, 256 insertions(+) create mode 100644 sync/src/connection_filter.rs diff --git a/Cargo.lock b/Cargo.lock index 6a748c59..15386579 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -357,6 +357,14 @@ dependencies = [ "ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "murmur3" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "byteorder 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "net2" version = "0.2.26" @@ -639,6 +647,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" name = "sync" version = "0.1.0" dependencies = [ + "bit-vec 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "chain 0.1.0", "db 0.1.0", "ethcore-devtools 1.3.0", @@ -648,9 +657,12 @@ dependencies = [ "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "message 0.1.0", "miner 0.1.0", + "murmur3 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "p2p 0.1.0", "parking_lot 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "primitives 0.1.0", + "script 0.1.0", + "serialization 0.1.0", "test-data 0.1.0", "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.0 (git+https://github.com/debris/tokio-core)", @@ -802,6 +814,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum memchr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "d8b629fb514376c675b98c1421e80b151d3817ac42d7c667717d282761418d20" "checksum mio 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "410a1a0ff76f5a226f1e4e3ff1756128e65cd30166e39c3892283e2ac09d5b67" "checksum miow 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "d5bfc6782530ac8ace97af10a540054a37126b63b0702ddaaa243b73b5745b9a" +"checksum murmur3 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ece7fe85164ffce69891c581b6b035a3c2f66dd3459f299d43d5efff90663e22" "checksum net2 0.2.26 (registry+https://github.com/rust-lang/crates.io-index)" = "5edf9cb6be97212423aed9413dd4729d62b370b5e1c571750e882cebbbc1e3e2" "checksum nix 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a0d95c5fa8b641c10ad0b8887454ebaafa3c92b5cd5350f8fc693adafd178e7b" "checksum nodrop 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "0dbbadd3f4c98dea0bd3d9b4be4c0cdaf1ab57035cb2e41fce3983db5add7cc5" diff --git a/primitives/src/bytes.rs b/primitives/src/bytes.rs index 17e7d133..37958e75 100644 --- a/primitives/src/bytes.rs +++ b/primitives/src/bytes.rs @@ -14,6 +14,12 @@ impl Bytes { } } +impl<'a> From<&'a [u8]> for Bytes { + fn from(v: &[u8]) -> Self { + Bytes(v.into()) + } +} + impl From> for Bytes { fn from(v: Vec) -> Self { Bytes(v) diff --git a/sync/Cargo.toml b/sync/Cargo.toml index c97deb5d..d57b7fa5 100644 --- a/sync/Cargo.toml +++ b/sync/Cargo.toml @@ -12,6 +12,8 @@ futures-cpupool = "0.1" tokio-core = { git = "https://github.com/debris/tokio-core" } linked-hash-map = "0.3" ethcore-devtools = { path = "../devtools" } +bit-vec = "0.4.3" +murmur3 = "0.3" chain = { path = "../chain" } db = { path = "../db" } @@ -19,6 +21,8 @@ message = { path = "../message" } miner = { path = "../miner" } p2p = { path = "../p2p" } primitives = { path = "../primitives" } +script = { path = "../script" } +serialization = { path = "../serialization" } test-data = { path = "../test-data" } verification = { path = "../verification" } diff --git a/sync/src/connection_filter.rs b/sync/src/connection_filter.rs new file mode 100644 index 00000000..63281748 --- /dev/null +++ b/sync/src/connection_filter.rs @@ -0,0 +1,226 @@ +#![allow(dead_code)] + +use bit_vec::BitVec; +use murmur3::murmur3_32; +use chain::{Transaction, OutPoint}; +use ser::serialize; +use message::types; +use script::Script; + +/// Constant optimized to create large differences in the seed for different values of hash_functions_num. +const SEED_OFFSET: u32 = 0xFBA4C795; + +/// Filter, which controls data relayed over connection. +#[derive(Debug, Default)] +pub struct ConnectionFilter { + /// Bloom filter, if set. + bloom: Option, + /// Filter update type. + filter_flags: u8, +} + +/// Connection bloom filter +#[derive(Debug)] +struct ConnectionBloom { + /// Filter storage. + filter: BitVec, + /// Number of hash functions to use in bloom filter. + hash_functions_num: u32, + /// Value to add to Murmur3 hash seed when calculating hash. + tweak: u32, +} + +impl ConnectionFilter { + #[cfg(test)] + /// Create new connection with given filter params + pub fn with_filterload(message: &types::FilterLoad) -> Self { + ConnectionFilter { + bloom: Some(ConnectionBloom::new(message)), + filter_flags: message.flags, + } + } + + /// Check if transaction is matched && update filter + pub fn match_update_transaction(&mut self, transaction: &Transaction) -> bool { + match self.bloom { + /// if no filter is set for the connection => match everything + None => true, + /// filter using bloom filter, then update + Some(ref mut bloom) => { + let transaction_hash = transaction.hash(); + let mut is_match = false; + + // match if filter contains any arbitrary script data element in any scriptPubKey in tx + for (output_index, output) in transaction.outputs.iter().enumerate() { + let script = Script::new(output.script_pubkey.clone()); + for instruction in script.iter().filter_map(|i| i.ok()) { + if let Some(instruction_data) = instruction.data { + if bloom.contains(instruction_data) { + is_match = true; + + let is_update_needed = self.filter_flags == 1 + || (self.filter_flags == 2 && (script.is_pay_to_public_key() || script.is_multisig_script())); + if is_update_needed { + bloom.insert(&serialize(&OutPoint { + hash: transaction_hash.clone(), + index: output_index as u32, + })); + } + } + } + } + } + if is_match { + return is_match; + } + + // match if filter contains transaction itself + if bloom.contains(&*transaction_hash) { + return true; + } + + // match if filter contains an outpoint this transaction spends + for input in &transaction.inputs { + // check if match previous output + let previous_output = serialize(&input.previous_output); + is_match = bloom.contains(&*previous_output); + if is_match { + return true; + } + + // check if match any arbitrary script data element in any scriptSig in tx + let script = Script::new(input.script_sig.clone()); + for instruction in script.iter().filter_map(|i| i.ok()) { + if let Some(instruction_data) = instruction.data { + is_match = bloom.contains(&*instruction_data); + if is_match { + return true; + } + } + } + } + + // no matches + false + }, + } + } + + /// Load filter + pub fn load(&mut self, message: &types::FilterLoad) { + self.bloom = Some(ConnectionBloom::new(message)); + self.filter_flags = message.flags; + } + + /// Add filter + pub fn add(&mut self, message: &types::FilterAdd) { + // ignore if filter is not currently set + if let Some(ref mut bloom) = self.bloom { + bloom.insert(&message.data); + } + } + + /// Clear filter + pub fn clear(&mut self) { + self.bloom = None; + } +} + +impl ConnectionBloom { + /// Create with given parameters + pub fn new(message: &types::FilterLoad) -> Self { + ConnectionBloom { + filter: BitVec::from_bytes(&message.filter), + hash_functions_num: message.hash_functions, + tweak: message.tweak, + } + } + + /// True if filter contains given bytes + pub fn contains(&self, data: &[u8]) -> bool { + for hash_function_idx in 0..self.hash_functions_num { + let murmur_seed = hash_function_idx.overflowing_mul(SEED_OFFSET).0.overflowing_add(self.tweak).0; + let murmur_hash = murmur3_32(&mut data.as_ref(), murmur_seed) as usize % self.filter.len(); + if !self.filter.get(murmur_hash).expect("mod operation above") { + return false; + } + } + true + } + + /// Add bytes to the filter + pub fn insert(&mut self, data: &[u8]) { + for hash_function_idx in 0..self.hash_functions_num { + let murmur_seed = hash_function_idx.overflowing_mul(SEED_OFFSET).0.overflowing_add(self.tweak).0; + let murmur_hash = murmur3_32(&mut data.as_ref(), murmur_seed) as usize % self.filter.len(); + self.filter.set(murmur_hash, true); + } + } +} + +#[cfg(test)] +mod tests { + use std::iter::{Iterator, repeat}; + use test_data; + use message::types; + use chain::{Transaction, RepresentH256}; + use primitives::hash::H256; + use primitives::bytes::Bytes; + use super::{ConnectionFilter, ConnectionBloom}; + + fn default_filterload() -> types::FilterLoad { + types::FilterLoad { + filter: Bytes::from(repeat(0u8).take(1024).collect::>()), + hash_functions: 10, + tweak: 5, + flags: 0, + } + } + + fn make_filteradd(data: &[u8]) -> types::FilterAdd { + types::FilterAdd { + data: data.into(), + } + } + + #[test] + fn bloom_insert_data() { + let mut bloom = ConnectionBloom::new(&default_filterload()); + + assert!(!bloom.contains(&*H256::default())); + + bloom.insert(&*H256::default()); + assert!(bloom.contains(&*H256::default())); + } + + #[test] + fn connection_filter_matches_transaction_by_hash() { + let tx1: Transaction = test_data::TransactionBuilder::with_output(10).into(); + let tx2: Transaction = test_data::TransactionBuilder::with_output(20).into(); + + let mut filter = ConnectionFilter::with_filterload(&default_filterload()); + + assert!(!filter.match_update_transaction(&tx1)); + assert!(!filter.match_update_transaction(&tx2)); + + filter.add(&make_filteradd(&*tx1.hash())); + + assert!(filter.match_update_transaction(&tx1)); + assert!(!filter.match_update_transaction(&tx2)); + } + + #[test] + fn connection_filter_matches_transaction_by_output_script_data_element() { + // TODO + } + + #[test] + fn connection_filter_matches_transaction_by_previous_output_point() { + // TODO + } + + #[test] + fn connection_filter_matches_transaction_by_input_script_data_element() { + // TODO + } +} \ No newline at end of file diff --git a/sync/src/lib.rs b/sync/src/lib.rs index bcf96928..29c6c50b 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -9,16 +9,21 @@ extern crate message; extern crate p2p; extern crate parking_lot; extern crate linked_hash_map; +extern crate bit_vec; +extern crate murmur3; extern crate primitives; extern crate test_data; extern crate time; extern crate verification; extern crate miner; +extern crate script; +extern crate serialization as ser; #[cfg(test)] extern crate ethcore_devtools as devtools; mod best_headers_chain; mod blocks_writer; +mod connection_filter; mod hash_queue; mod inbound_connection; mod inbound_connection_factory; diff --git a/sync/src/local_node.rs b/sync/src/local_node.rs index deaaf0fe..b6c83389 100644 --- a/sync/src/local_node.rs +++ b/sync/src/local_node.rs @@ -12,6 +12,8 @@ use synchronization_server::{Server, SynchronizationServer}; use synchronization_verifier::AsyncVerifier; use primitives::hash::H256; +// TODO: check messages before processing (filterload' filter is max 36000, nHashFunc is <= 50, etc) + pub type LocalNodeRef = Arc>>; /// Local synchronization node From 3a7aae5188893e1f2f958ea00c115064e393669d Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Tue, 22 Nov 2016 18:11:46 +0300 Subject: [PATCH 2/3] fix after merge --- message/src/types/mod.rs | 1 + sync/src/connection_filter.rs | 25 +++++++++++++++++-------- sync/src/synchronization_server.rs | 2 +- 3 files changed, 19 insertions(+), 9 deletions(-) diff --git a/message/src/types/mod.rs b/message/src/types/mod.rs index 914c0818..d6c0e865 100644 --- a/message/src/types/mod.rs +++ b/message/src/types/mod.rs @@ -31,6 +31,7 @@ pub use self::blocktxn::BlockTxn; pub use self::compactblock::CompactBlock; pub use self::feefilter::FeeFilter; pub use self::filterload::FilterLoad; +pub use self::filterload::FilterFlags; pub use self::filterclear::FilterClear; pub use self::filteradd::FilterAdd; pub use self::getaddr::GetAddr; diff --git a/sync/src/connection_filter.rs b/sync/src/connection_filter.rs index 63281748..54a5d41f 100644 --- a/sync/src/connection_filter.rs +++ b/sync/src/connection_filter.rs @@ -1,4 +1,4 @@ -#![allow(dead_code)] +#![allow(dead_code)] // TODO: remove after connecting with Client use bit_vec::BitVec; use murmur3::murmur3_32; @@ -7,16 +7,16 @@ use ser::serialize; use message::types; use script::Script; -/// Constant optimized to create large differences in the seed for different values of hash_functions_num. +/// Constant optimized to create large differences in the seed for different values of `hash_functions_num`. const SEED_OFFSET: u32 = 0xFBA4C795; /// Filter, which controls data relayed over connection. -#[derive(Debug, Default)] +#[derive(Debug)] pub struct ConnectionFilter { /// Bloom filter, if set. bloom: Option, /// Filter update type. - filter_flags: u8, + filter_flags: types::FilterFlags, } /// Connection bloom filter @@ -30,6 +30,15 @@ struct ConnectionBloom { tweak: u32, } +impl Default for ConnectionFilter { + fn default() -> Self { + ConnectionFilter { + bloom: None, + filter_flags: types::FilterFlags::None, + } + } +} + impl ConnectionFilter { #[cfg(test)] /// Create new connection with given filter params @@ -58,8 +67,8 @@ impl ConnectionFilter { if bloom.contains(instruction_data) { is_match = true; - let is_update_needed = self.filter_flags == 1 - || (self.filter_flags == 2 && (script.is_pay_to_public_key() || script.is_multisig_script())); + let is_update_needed = self.filter_flags == types::FilterFlags::All + || (self.filter_flags == types::FilterFlags::PubKeyOnly && (script.is_pay_to_public_key() || script.is_multisig_script())); if is_update_needed { bloom.insert(&serialize(&OutPoint { hash: transaction_hash.clone(), @@ -163,7 +172,7 @@ mod tests { use std::iter::{Iterator, repeat}; use test_data; use message::types; - use chain::{Transaction, RepresentH256}; + use chain::Transaction; use primitives::hash::H256; use primitives::bytes::Bytes; use super::{ConnectionFilter, ConnectionBloom}; @@ -173,7 +182,7 @@ mod tests { filter: Bytes::from(repeat(0u8).take(1024).collect::>()), hash_functions: 10, tweak: 5, - flags: 0, + flags: types::FilterFlags::None, } } diff --git a/sync/src/synchronization_server.rs b/sync/src/synchronization_server.rs index 63dc6006..67185ea9 100644 --- a/sync/src/synchronization_server.rs +++ b/sync/src/synchronization_server.rs @@ -365,7 +365,7 @@ impl Server for SynchronizationServer { fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: Option) -> Option { if let Some(best_common_block) = self.locate_known_block_header(message.block_locator_hashes) { trace!(target: "sync", "Best common block header with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash.to_reversed_str()); - let server_task_index = id.map_or_else(|| ServerTaskIndex::None, |id| ServerTaskIndex::Final(id)); + let server_task_index = id.map_or_else(|| ServerTaskIndex::None, ServerTaskIndex::Final); let task = IndexedServerTask::new(ServerTask::ServeGetHeaders(best_common_block, message.hash_stop), server_task_index); Some(task) } From 98605ee7b3481209076c1aaac71b209ede9f9425 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Tue, 22 Nov 2016 18:14:43 +0300 Subject: [PATCH 3/3] newline at the end of file --- sync/src/connection_filter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sync/src/connection_filter.rs b/sync/src/connection_filter.rs index 54a5d41f..9cfeccbd 100644 --- a/sync/src/connection_filter.rs +++ b/sync/src/connection_filter.rs @@ -232,4 +232,4 @@ mod tests { fn connection_filter_matches_transaction_by_input_script_data_element() { // TODO } -} \ No newline at end of file +}