Merge pull request #172 from ethcore/sync_filter

Sync ConnectionFilter to support `filterload`, `filteradd` && `filterclear` in the future
This commit is contained in:
Nikolay Volf 2016-11-23 11:55:54 +03:00 committed by GitHub
commit a961bcdbed
8 changed files with 267 additions and 1 deletions

13
Cargo.lock generated
View File

@ -357,6 +357,14 @@ dependencies = [
"ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "murmur3"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"byteorder 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "net2"
version = "0.2.26"
@ -640,6 +648,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
name = "sync"
version = "0.1.0"
dependencies = [
"bit-vec 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
"chain 0.1.0",
"db 0.1.0",
"ethcore-devtools 1.3.0",
@ -649,9 +658,12 @@ dependencies = [
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"message 0.1.0",
"miner 0.1.0",
"murmur3 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"p2p 0.1.0",
"parking_lot 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"primitives 0.1.0",
"script 0.1.0",
"serialization 0.1.0",
"test-data 0.1.0",
"time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-core 0.1.0 (git+https://github.com/debris/tokio-core)",
@ -803,6 +815,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum memchr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "d8b629fb514376c675b98c1421e80b151d3817ac42d7c667717d282761418d20"
"checksum mio 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "410a1a0ff76f5a226f1e4e3ff1756128e65cd30166e39c3892283e2ac09d5b67"
"checksum miow 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "d5bfc6782530ac8ace97af10a540054a37126b63b0702ddaaa243b73b5745b9a"
"checksum murmur3 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ece7fe85164ffce69891c581b6b035a3c2f66dd3459f299d43d5efff90663e22"
"checksum net2 0.2.26 (registry+https://github.com/rust-lang/crates.io-index)" = "5edf9cb6be97212423aed9413dd4729d62b370b5e1c571750e882cebbbc1e3e2"
"checksum nix 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a0d95c5fa8b641c10ad0b8887454ebaafa3c92b5cd5350f8fc693adafd178e7b"
"checksum nodrop 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "0dbbadd3f4c98dea0bd3d9b4be4c0cdaf1ab57035cb2e41fce3983db5add7cc5"

View File

@ -31,6 +31,7 @@ pub use self::blocktxn::BlockTxn;
pub use self::compactblock::CompactBlock;
pub use self::feefilter::FeeFilter;
pub use self::filterload::FilterLoad;
pub use self::filterload::FilterFlags;
pub use self::filterclear::FilterClear;
pub use self::filteradd::FilterAdd;
pub use self::getaddr::GetAddr;

View File

@ -14,6 +14,12 @@ impl Bytes {
}
}
impl<'a> From<&'a [u8]> for Bytes {
	/// Copies the slice into a newly allocated `Bytes`.
	fn from(v: &[u8]) -> Self {
		Bytes(v.to_vec())
	}
}
impl From<Vec<u8>> for Bytes {
fn from(v: Vec<u8>) -> Self {
Bytes(v)

View File

@ -12,6 +12,8 @@ futures-cpupool = "0.1"
tokio-core = { git = "https://github.com/debris/tokio-core" }
linked-hash-map = "0.3"
ethcore-devtools = { path = "../devtools" }
bit-vec = "0.4.3"
murmur3 = "0.3"
chain = { path = "../chain" }
db = { path = "../db" }
@ -19,6 +21,8 @@ message = { path = "../message" }
miner = { path = "../miner" }
p2p = { path = "../p2p" }
primitives = { path = "../primitives" }
script = { path = "../script" }
serialization = { path = "../serialization" }
test-data = { path = "../test-data" }
verification = { path = "../verification" }

View File

@ -0,0 +1,235 @@
#![allow(dead_code)] // TODO: remove after connecting with Client
use bit_vec::BitVec;
use murmur3::murmur3_32;
use chain::{Transaction, OutPoint};
use ser::serialize;
use message::types;
use script::Script;
/// Constant optimized to create large differences in the seed for different values of `hash_functions_num`.
const SEED_OFFSET: u32 = 0xFBA4C795;
/// Filter, which controls data relayed over connection.
/// Filter, which controls data relayed over connection.
/// Holds an optional BIP37-style bloom filter plus the update flags
/// the peer requested in its `filterload` message.
#[derive(Debug)]
pub struct ConnectionFilter {
	/// Bloom filter, if set. `None` means "relay everything".
	bloom: Option<ConnectionBloom>,
	/// Filter update type (how the filter is extended on a match).
	filter_flags: types::FilterFlags,
}
/// Connection bloom filter (backed by murmur3 hashes, as in BIP37).
#[derive(Debug)]
struct ConnectionBloom {
	/// Filter storage (bit array).
	filter: BitVec,
	/// Number of hash functions to use in bloom filter.
	hash_functions_num: u32,
	/// Value to add to Murmur3 hash seed when calculating hash.
	tweak: u32,
}
impl Default for ConnectionFilter {
	/// By default no bloom filter is installed => all data is relayed.
	fn default() -> Self {
		Self {
			bloom: None,
			filter_flags: types::FilterFlags::None,
		}
	}
}
impl ConnectionFilter {
#[cfg(test)]
/// Create new connection with given filter params
pub fn with_filterload(message: &types::FilterLoad) -> Self {
ConnectionFilter {
bloom: Some(ConnectionBloom::new(message)),
filter_flags: message.flags,
}
}
/// Check if transaction is matched && update filter
pub fn match_update_transaction(&mut self, transaction: &Transaction) -> bool {
match self.bloom {
/// if no filter is set for the connection => match everything
None => true,
/// filter using bloom filter, then update
Some(ref mut bloom) => {
let transaction_hash = transaction.hash();
let mut is_match = false;
// match if filter contains any arbitrary script data element in any scriptPubKey in tx
for (output_index, output) in transaction.outputs.iter().enumerate() {
let script = Script::new(output.script_pubkey.clone());
for instruction in script.iter().filter_map(|i| i.ok()) {
if let Some(instruction_data) = instruction.data {
if bloom.contains(instruction_data) {
is_match = true;
let is_update_needed = self.filter_flags == types::FilterFlags::All
|| (self.filter_flags == types::FilterFlags::PubKeyOnly && (script.is_pay_to_public_key() || script.is_multisig_script()));
if is_update_needed {
bloom.insert(&serialize(&OutPoint {
hash: transaction_hash.clone(),
index: output_index as u32,
}));
}
}
}
}
}
if is_match {
return is_match;
}
// match if filter contains transaction itself
if bloom.contains(&*transaction_hash) {
return true;
}
// match if filter contains an outpoint this transaction spends
for input in &transaction.inputs {
// check if match previous output
let previous_output = serialize(&input.previous_output);
is_match = bloom.contains(&*previous_output);
if is_match {
return true;
}
// check if match any arbitrary script data element in any scriptSig in tx
let script = Script::new(input.script_sig.clone());
for instruction in script.iter().filter_map(|i| i.ok()) {
if let Some(instruction_data) = instruction.data {
is_match = bloom.contains(&*instruction_data);
if is_match {
return true;
}
}
}
}
// no matches
false
},
}
}
/// Load filter
pub fn load(&mut self, message: &types::FilterLoad) {
self.bloom = Some(ConnectionBloom::new(message));
self.filter_flags = message.flags;
}
/// Add filter
pub fn add(&mut self, message: &types::FilterAdd) {
// ignore if filter is not currently set
if let Some(ref mut bloom) = self.bloom {
bloom.insert(&message.data);
}
}
/// Clear filter
pub fn clear(&mut self) {
self.bloom = None;
}
}
impl ConnectionBloom {
/// Create with given parameters
pub fn new(message: &types::FilterLoad) -> Self {
ConnectionBloom {
filter: BitVec::from_bytes(&message.filter),
hash_functions_num: message.hash_functions,
tweak: message.tweak,
}
}
/// True if filter contains given bytes
pub fn contains(&self, data: &[u8]) -> bool {
for hash_function_idx in 0..self.hash_functions_num {
let murmur_seed = hash_function_idx.overflowing_mul(SEED_OFFSET).0.overflowing_add(self.tweak).0;
let murmur_hash = murmur3_32(&mut data.as_ref(), murmur_seed) as usize % self.filter.len();
if !self.filter.get(murmur_hash).expect("mod operation above") {
return false;
}
}
true
}
/// Add bytes to the filter
pub fn insert(&mut self, data: &[u8]) {
for hash_function_idx in 0..self.hash_functions_num {
let murmur_seed = hash_function_idx.overflowing_mul(SEED_OFFSET).0.overflowing_add(self.tweak).0;
let murmur_hash = murmur3_32(&mut data.as_ref(), murmur_seed) as usize % self.filter.len();
self.filter.set(murmur_hash, true);
}
}
}
#[cfg(test)]
mod tests {
	use std::iter::{Iterator, repeat};
	use test_data;
	use message::types;
	use chain::Transaction;
	use primitives::hash::H256;
	use primitives::bytes::Bytes;
	use super::{ConnectionFilter, ConnectionBloom};

	// 1024-byte all-zero filter with 10 hash functions: an initially empty bloom filter.
	fn default_filterload() -> types::FilterLoad {
		types::FilterLoad {
			filter: Bytes::from(repeat(0u8).take(1024).collect::<Vec<_>>()),
			hash_functions: 10,
			tweak: 5,
			flags: types::FilterFlags::None,
		}
	}

	// Wrap raw bytes into a `filteradd` message.
	fn make_filteradd(data: &[u8]) -> types::FilterAdd {
		types::FilterAdd {
			data: data.into(),
		}
	}

	#[test]
	fn bloom_insert_data() {
		// data absent before insert, present after insert
		let mut bloom = ConnectionBloom::new(&default_filterload());
		assert!(!bloom.contains(&*H256::default()));
		bloom.insert(&*H256::default());
		assert!(bloom.contains(&*H256::default()));
	}

	#[test]
	fn connection_filter_matches_transaction_by_hash() {
		let tx1: Transaction = test_data::TransactionBuilder::with_output(10).into();
		let tx2: Transaction = test_data::TransactionBuilder::with_output(20).into();

		let mut filter = ConnectionFilter::with_filterload(&default_filterload());

		// empty filter matches no transaction
		assert!(!filter.match_update_transaction(&tx1));
		assert!(!filter.match_update_transaction(&tx2));

		// after adding tx1's hash, only tx1 matches
		filter.add(&make_filteradd(&*tx1.hash()));

		assert!(filter.match_update_transaction(&tx1));
		assert!(!filter.match_update_transaction(&tx2));
	}

	#[test]
	fn connection_filter_matches_transaction_by_output_script_data_element() {
		// TODO
	}

	#[test]
	fn connection_filter_matches_transaction_by_previous_output_point() {
		// TODO
	}

	#[test]
	fn connection_filter_matches_transaction_by_input_script_data_element() {
		// TODO
	}
}

View File

@ -9,16 +9,21 @@ extern crate message;
extern crate p2p;
extern crate parking_lot;
extern crate linked_hash_map;
extern crate bit_vec;
extern crate murmur3;
extern crate primitives;
extern crate test_data;
extern crate time;
extern crate verification;
extern crate miner;
extern crate script;
extern crate serialization as ser;
#[cfg(test)]
extern crate ethcore_devtools as devtools;
mod best_headers_chain;
mod blocks_writer;
mod connection_filter;
mod hash_queue;
mod inbound_connection;
mod inbound_connection_factory;

View File

@ -11,6 +11,8 @@ use synchronization_server::{Server, SynchronizationServer};
use synchronization_verifier::AsyncVerifier;
use primitives::hash::H256;
// TODO: check messages before processing (filterload's filter is max 36000 bytes, nHashFunc is <= 50, etc)
pub type LocalNodeRef = Arc<LocalNode<LocalSynchronizationTaskExecutor, SynchronizationServer, SynchronizationClient<LocalSynchronizationTaskExecutor, AsyncVerifier>>>;
/// Local synchronization node

View File

@ -365,7 +365,7 @@ impl Server for SynchronizationServer {
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: Option<u32>) -> Option<IndexedServerTask> {
if let Some(best_common_block) = self.locate_known_block_header(message.block_locator_hashes) {
trace!(target: "sync", "Best common block header with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash.to_reversed_str());
let server_task_index = id.map_or_else(|| ServerTaskIndex::None, |id| ServerTaskIndex::Final(id));
let server_task_index = id.map_or_else(|| ServerTaskIndex::None, ServerTaskIndex::Final);
let task = IndexedServerTask::new(ServerTask::ServeGetHeaders(best_common_block, message.hash_stop), server_task_index);
Some(task)
}