diff --git a/Cargo.lock b/Cargo.lock index 993b20fd..8489662d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -357,6 +357,14 @@ dependencies = [ "ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "murmur3" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "byteorder 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "net2" version = "0.2.26" @@ -640,6 +648,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" name = "sync" version = "0.1.0" dependencies = [ + "bit-vec 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "chain 0.1.0", "db 0.1.0", "ethcore-devtools 1.3.0", @@ -649,9 +658,12 @@ dependencies = [ "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "message 0.1.0", "miner 0.1.0", + "murmur3 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "p2p 0.1.0", "parking_lot 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "primitives 0.1.0", + "script 0.1.0", + "serialization 0.1.0", "test-data 0.1.0", "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.0 (git+https://github.com/debris/tokio-core)", @@ -803,6 +815,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum memchr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "d8b629fb514376c675b98c1421e80b151d3817ac42d7c667717d282761418d20" "checksum mio 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "410a1a0ff76f5a226f1e4e3ff1756128e65cd30166e39c3892283e2ac09d5b67" "checksum miow 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "d5bfc6782530ac8ace97af10a540054a37126b63b0702ddaaa243b73b5745b9a" +"checksum murmur3 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ece7fe85164ffce69891c581b6b035a3c2f66dd3459f299d43d5efff90663e22" "checksum net2 0.2.26 (registry+https://github.com/rust-lang/crates.io-index)" = "5edf9cb6be97212423aed9413dd4729d62b370b5e1c571750e882cebbbc1e3e2" "checksum nix 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a0d95c5fa8b641c10ad0b8887454ebaafa3c92b5cd5350f8fc693adafd178e7b" "checksum nodrop 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "0dbbadd3f4c98dea0bd3d9b4be4c0cdaf1ab57035cb2e41fce3983db5add7cc5" diff --git a/message/src/types/mod.rs b/message/src/types/mod.rs index 914c0818..d6c0e865 100644 --- a/message/src/types/mod.rs +++ b/message/src/types/mod.rs @@ -31,6 +31,7 @@ pub use self::blocktxn::BlockTxn; pub use self::compactblock::CompactBlock; pub use self::feefilter::FeeFilter; pub use self::filterload::FilterLoad; +pub use self::filterload::FilterFlags; pub use self::filterclear::FilterClear; pub use self::filteradd::FilterAdd; pub use self::getaddr::GetAddr; diff --git a/primitives/src/bytes.rs b/primitives/src/bytes.rs index 17e7d133..37958e75 100644 --- a/primitives/src/bytes.rs +++ b/primitives/src/bytes.rs @@ -14,6 +14,12 @@ impl Bytes { } } +impl<'a> From<&'a [u8]> for Bytes { + fn from(v: &[u8]) -> Self { + Bytes(v.into()) + } +} + impl From> for Bytes { fn from(v: Vec) -> Self { Bytes(v) diff --git a/sync/Cargo.toml b/sync/Cargo.toml index c97deb5d..d57b7fa5 100644 --- a/sync/Cargo.toml +++ b/sync/Cargo.toml @@ -12,6 +12,8 @@ futures-cpupool = "0.1" tokio-core = { git = "https://github.com/debris/tokio-core" } linked-hash-map = "0.3" ethcore-devtools = { path = "../devtools" } +bit-vec = "0.4.3" +murmur3 = "0.3" chain = { path = "../chain" } db = { path = "../db" } @@ -19,6 +21,8 @@ message = { path = "../message" } miner = { path = "../miner" } p2p = { path = "../p2p" } primitives = { path = "../primitives" } +script = { path = "../script" } +serialization = { path = "../serialization" } test-data = { path = "../test-data" } verification = { path = "../verification" } diff --git a/sync/src/connection_filter.rs b/sync/src/connection_filter.rs new file mode 100644 index 00000000..9cfeccbd --- /dev/null +++ b/sync/src/connection_filter.rs @@ -0,0 +1,235 @@ +#![allow(dead_code)] // TODO: remove after connecting with Client + +use bit_vec::BitVec; +use murmur3::murmur3_32; +use chain::{Transaction, OutPoint}; +use ser::serialize; +use message::types; +use script::Script; + +/// Constant optimized to create large differences in the seed for different values of `hash_functions_num`. +const SEED_OFFSET: u32 = 0xFBA4C795; + +/// Filter, which controls data relayed over connection. +#[derive(Debug)] +pub struct ConnectionFilter { + /// Bloom filter, if set. + bloom: Option, + /// Filter update type. + filter_flags: types::FilterFlags, +} + +/// Connection bloom filter +#[derive(Debug)] +struct ConnectionBloom { + /// Filter storage. + filter: BitVec, + /// Number of hash functions to use in bloom filter. + hash_functions_num: u32, + /// Value to add to Murmur3 hash seed when calculating hash. + tweak: u32, +} + +impl Default for ConnectionFilter { + fn default() -> Self { + ConnectionFilter { + bloom: None, + filter_flags: types::FilterFlags::None, + } + } +} + +impl ConnectionFilter { + #[cfg(test)] + /// Create new connection with given filter params + pub fn with_filterload(message: &types::FilterLoad) -> Self { + ConnectionFilter { + bloom: Some(ConnectionBloom::new(message)), + filter_flags: message.flags, + } + } + + /// Check if transaction is matched && update filter + pub fn match_update_transaction(&mut self, transaction: &Transaction) -> bool { + match self.bloom { + /// if no filter is set for the connection => match everything + None => true, + /// filter using bloom filter, then update + Some(ref mut bloom) => { + let transaction_hash = transaction.hash(); + let mut is_match = false; + + // match if filter contains any arbitrary script data element in any scriptPubKey in tx + for (output_index, output) in transaction.outputs.iter().enumerate() { + let script = Script::new(output.script_pubkey.clone()); + for instruction in script.iter().filter_map(|i| i.ok()) { + if let Some(instruction_data) = instruction.data { + if bloom.contains(instruction_data) { + is_match = true; + + let is_update_needed = self.filter_flags == types::FilterFlags::All + || (self.filter_flags == types::FilterFlags::PubKeyOnly && (script.is_pay_to_public_key() || script.is_multisig_script())); + if is_update_needed { + bloom.insert(&serialize(&OutPoint { + hash: transaction_hash.clone(), + index: output_index as u32, + })); + } + } + } + } + } + if is_match { + return is_match; + } + + // match if filter contains transaction itself + if bloom.contains(&*transaction_hash) { + return true; + } + + // match if filter contains an outpoint this transaction spends + for input in &transaction.inputs { + // check if match previous output + let previous_output = serialize(&input.previous_output); + is_match = bloom.contains(&*previous_output); + if is_match { + return true; + } + + // check if match any arbitrary script data element in any scriptSig in tx + let script = Script::new(input.script_sig.clone()); + for instruction in script.iter().filter_map(|i| i.ok()) { + if let Some(instruction_data) = instruction.data { + is_match = bloom.contains(&*instruction_data); + if is_match { + return true; + } + } + } + } + + // no matches + false + }, + } + } + + /// Load filter + pub fn load(&mut self, message: &types::FilterLoad) { + self.bloom = Some(ConnectionBloom::new(message)); + self.filter_flags = message.flags; + } + + /// Add filter + pub fn add(&mut self, message: &types::FilterAdd) { + // ignore if filter is not currently set + if let Some(ref mut bloom) = self.bloom { + bloom.insert(&message.data); + } + } + + /// Clear filter + pub fn clear(&mut self) { + self.bloom = None; + } +} + +impl ConnectionBloom { + /// Create with given parameters + pub fn new(message: &types::FilterLoad) -> Self { + ConnectionBloom { + filter: BitVec::from_bytes(&message.filter), + hash_functions_num: message.hash_functions, + tweak: message.tweak, + } + } + + /// True if filter contains given bytes + pub fn contains(&self, data: &[u8]) -> bool { + for hash_function_idx in 0..self.hash_functions_num { + let murmur_seed = hash_function_idx.overflowing_mul(SEED_OFFSET).0.overflowing_add(self.tweak).0; + let murmur_hash = murmur3_32(&mut data.as_ref(), murmur_seed) as usize % self.filter.len(); + if !self.filter.get(murmur_hash).expect("mod operation above") { + return false; + } + } + true + } + + /// Add bytes to the filter + pub fn insert(&mut self, data: &[u8]) { + for hash_function_idx in 0..self.hash_functions_num { + let murmur_seed = hash_function_idx.overflowing_mul(SEED_OFFSET).0.overflowing_add(self.tweak).0; + let murmur_hash = murmur3_32(&mut data.as_ref(), murmur_seed) as usize % self.filter.len(); + self.filter.set(murmur_hash, true); + } + } +} + +#[cfg(test)] +mod tests { + use std::iter::{Iterator, repeat}; + use test_data; + use message::types; + use chain::Transaction; + use primitives::hash::H256; + use primitives::bytes::Bytes; + use super::{ConnectionFilter, ConnectionBloom}; + + fn default_filterload() -> types::FilterLoad { + types::FilterLoad { + filter: Bytes::from(repeat(0u8).take(1024).collect::>()), + hash_functions: 10, + tweak: 5, + flags: types::FilterFlags::None, + } + } + + fn make_filteradd(data: &[u8]) -> types::FilterAdd { + types::FilterAdd { + data: data.into(), + } + } + + #[test] + fn bloom_insert_data() { + let mut bloom = ConnectionBloom::new(&default_filterload()); + + assert!(!bloom.contains(&*H256::default())); + + bloom.insert(&*H256::default()); + assert!(bloom.contains(&*H256::default())); + } + + #[test] + fn connection_filter_matches_transaction_by_hash() { + let tx1: Transaction = test_data::TransactionBuilder::with_output(10).into(); + let tx2: Transaction = test_data::TransactionBuilder::with_output(20).into(); + + let mut filter = ConnectionFilter::with_filterload(&default_filterload()); + + assert!(!filter.match_update_transaction(&tx1)); + assert!(!filter.match_update_transaction(&tx2)); + + filter.add(&make_filteradd(&*tx1.hash())); + + assert!(filter.match_update_transaction(&tx1)); + assert!(!filter.match_update_transaction(&tx2)); + } + + #[test] + fn connection_filter_matches_transaction_by_output_script_data_element() { + // TODO + } + + #[test] + fn connection_filter_matches_transaction_by_previous_output_point() { + // TODO + } + + #[test] + fn connection_filter_matches_transaction_by_input_script_data_element() { + // TODO + } +} diff --git a/sync/src/lib.rs b/sync/src/lib.rs index bcf96928..29c6c50b 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -9,16 +9,21 @@ extern crate message; extern crate p2p; extern crate parking_lot; extern crate linked_hash_map; +extern crate bit_vec; +extern crate murmur3; extern crate primitives; extern crate test_data; extern crate time; extern crate verification; extern crate miner; +extern crate script; +extern crate serialization as ser; #[cfg(test)] extern crate ethcore_devtools as devtools; mod best_headers_chain; mod blocks_writer; +mod connection_filter; mod hash_queue; mod inbound_connection; mod inbound_connection_factory; diff --git a/sync/src/local_node.rs b/sync/src/local_node.rs index 64306c1a..f97e1c61 100644 --- a/sync/src/local_node.rs +++ b/sync/src/local_node.rs @@ -11,6 +11,8 @@ use synchronization_server::{Server, SynchronizationServer}; use synchronization_verifier::AsyncVerifier; use primitives::hash::H256; +// TODO: check messages before processing (filterload' filter is max 36000, nHashFunc is <= 50, etc) + pub type LocalNodeRef = Arc>>; /// Local synchronization node diff --git a/sync/src/synchronization_server.rs b/sync/src/synchronization_server.rs index 63dc6006..67185ea9 100644 --- a/sync/src/synchronization_server.rs +++ b/sync/src/synchronization_server.rs @@ -365,7 +365,7 @@ impl Server for SynchronizationServer { fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: Option) -> Option { if let Some(best_common_block) = self.locate_known_block_header(message.block_locator_hashes) { trace!(target: "sync", "Best common block header with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash.to_reversed_str()); - let server_task_index = id.map_or_else(|| ServerTaskIndex::None, |id| ServerTaskIndex::Final(id)); + let server_task_index = id.map_or_else(|| ServerTaskIndex::None, ServerTaskIndex::Final); let task = IndexedServerTask::new(ServerTask::ServeGetHeaders(best_common_block, message.hash_stop), server_task_index); Some(task) }