diff --git a/Cargo.lock b/Cargo.lock index 693c51c6..b8e3fc97 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -324,6 +324,7 @@ name = "miner" version = "0.1.0" dependencies = [ "chain 0.1.0", + "db 0.1.0", "heapsize 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "primitives 0.1.0", "serialization 0.1.0", diff --git a/miner/Cargo.toml b/miner/Cargo.toml index a2beeeec..463ea252 100644 --- a/miner/Cargo.toml +++ b/miner/Cargo.toml @@ -6,6 +6,7 @@ authors = ["Ethcore "] [dependencies] heapsize = "0.3" chain = { path = "../chain" } +db = { path = "../db" } primitives = { path = "../primitives" } serialization = { path = "../serialization" } test-data = { path = "../test-data" } diff --git a/miner/src/fee.rs b/miner/src/fee.rs new file mode 100644 index 00000000..783d3b0b --- /dev/null +++ b/miner/src/fee.rs @@ -0,0 +1,64 @@ +use chain::Transaction; +use db::SharedStore; + +// TODO: &TransactionProvider after AsTransactionProvider is done +pub fn transaction_fee(store: SharedStore, transaction: &Transaction) -> u64 { + let inputs_sum = transaction.inputs.iter() + .fold(0, |accumulator, input| { + let input_transaction = store.transaction(&input.previous_output.hash) + .expect("transaction must be verified by caller"); + accumulator + input_transaction.outputs[input.previous_output.index as usize].value + }); + let outputs_sum = transaction.outputs.iter() + .fold(0, |accumulator, output| accumulator + output.value); + inputs_sum.saturating_sub(outputs_sum) +} + +// TODO: &TransactionProvider after AsTransactionProvider is done +pub fn transaction_fee_rate(store: SharedStore, transaction: &Transaction) -> u64 { + use ser::Serializable; + + transaction_fee(store, transaction) / transaction.serialized_size() as u64 +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + use db::TestStorage; + use test_data; + use super::*; + + #[test] + fn test_transaction_fee() { + let b0 = test_data::block_builder().header().nonce(1).build() + .transaction() + .output().value(1_000_000).build() + .build() + .transaction() + .output().value(2_000_000).build() + .build() + .build(); + let tx0 = b0.transactions[0].clone(); + let tx0_hash = tx0.hash(); + let tx1 = b0.transactions[1].clone(); + let tx1_hash = tx1.hash(); + let b1 = test_data::block_builder().header().nonce(2).build() + .transaction() + .input().hash(tx0_hash).index(0).build() + .input().hash(tx1_hash).index(0).build() + .output().value(2_500_000).build() + .build() + .build(); + let tx2 = b1.transactions[0].clone(); + + let db = Arc::new(TestStorage::with_blocks(&vec![b0, b1])); + + assert_eq!(transaction_fee(db.clone(), &tx0), 0); + assert_eq!(transaction_fee(db.clone(), &tx1), 0); + assert_eq!(transaction_fee(db.clone(), &tx2), 500_000); + + assert_eq!(transaction_fee_rate(db.clone(), &tx0), 0); + assert_eq!(transaction_fee_rate(db.clone(), &tx1), 0); + assert_eq!(transaction_fee_rate(db.clone(), &tx2), 4_950); + } +} diff --git a/miner/src/lib.rs b/miner/src/lib.rs index 130a9b98..f81836da 100644 --- a/miner/src/lib.rs +++ b/miner/src/lib.rs @@ -1,9 +1,12 @@ extern crate chain; +extern crate db; extern crate heapsize; extern crate primitives; extern crate serialization as ser; extern crate test_data; +mod fee; mod memory_pool; +pub use fee::{transaction_fee, transaction_fee_rate}; pub use memory_pool::{MemoryPool, Information as MemoryPoolInformation, OrderingStrategy as MemoryPoolOrderingStrategy}; diff --git a/sync/src/connection_filter.rs b/sync/src/connection_filter.rs index 4e959a0c..5a1e68aa 100644 --- a/sync/src/connection_filter.rs +++ b/sync/src/connection_filter.rs @@ -27,6 +27,8 @@ pub struct ConnectionFilter { last_blocks: LinkedHashMap, /// Last transactions from peer. last_transactions: LinkedHashMap, + /// Minimal fee in satoshis per 1000 bytes + fee_rate: Option, } /// Connection bloom filter @@ -70,6 +72,7 @@ impl Default for ConnectionFilter { filter_flags: types::FilterFlags::None, last_blocks: LinkedHashMap::new(), last_transactions: LinkedHashMap::new(), + fee_rate: None, } } } @@ -83,6 +86,7 @@ impl ConnectionFilter { filter_flags: message.flags, last_blocks: LinkedHashMap::new(), last_transactions: LinkedHashMap::new(), + fee_rate: None, } } @@ -119,12 +123,104 @@ impl ConnectionFilter { } /// Check if transaction should be sent to this connection && optionally update filter - pub fn filter_transaction(&mut self, transaction_hash: &H256, transaction: &Transaction) -> bool { + pub fn filter_transaction(&mut self, transaction_hash: &H256, transaction: &Transaction, transaction_fee_rate: u64) -> bool { // check if transaction is known if self.last_transactions.contains_key(transaction_hash) { return false; } + // check if transaction fee rate is high enough for this peer + if let Some(fee_rate) = self.fee_rate { + if transaction_fee_rate < fee_rate { + return false; + } + } + + // check with bloom filter, if set + self.filter_transaction_with_bloom(transaction_hash, transaction) + } + + /// Load filter + pub fn load(&mut self, message: &types::FilterLoad) { + self.bloom = Some(ConnectionBloom::new(message)); + self.filter_flags = message.flags; + } + + /// Add filter + pub fn add(&mut self, message: &types::FilterAdd) { + // ignore if filter is not currently set + if let Some(ref mut bloom) = self.bloom { + bloom.insert(&message.data); + } + } + + /// Clear filter + pub fn clear(&mut self) { + self.bloom = None; + } + + /// Limit transaction announcing by transaction fee + pub fn set_fee_rate(&mut self, fee_rate: u64) { + if fee_rate == 0 { + self.fee_rate = None; + } + else { + self.fee_rate = Some(fee_rate); + } + } + + /// Convert `Block` to `MerkleBlock` using this filter + pub fn build_merkle_block(&mut self, block: Block) -> Option { + if self.bloom.is_none() { + return None; + } + + // prepare result + let all_len = block.transactions.len(); + let mut result = MerkleBlockArtefacts { + merkleblock: types::MerkleBlock { + block_header: block.block_header.clone(), + total_transactions: all_len as u32, + hashes: Vec::default(), + flags: Bytes::default(), + }, + matching_transactions: Vec::new(), + }; + + // calculate hashes && match flags for all transactions + let (all_hashes, all_flags) = block.transactions.into_iter() + .fold((Vec::::with_capacity(all_len), BitVec::with_capacity(all_len)), |(mut all_hashes, mut all_flags), t| { + let hash = t.hash(); + let flag = self.filter_transaction_with_bloom(&hash, &t); + if flag { + result.matching_transactions.push((hash.clone(), t)); + } + + all_flags.push(flag); + all_hashes.push(hash); + (all_hashes, all_flags) + }); + + // build partial merkle tree + let (hashes, flags) = PartialMerkleTree::build(all_hashes, all_flags); + result.merkleblock.hashes.extend(hashes); + // to_bytes() converts [true, false, true] to 0b10100000 + // while protocol requires [true, false, true] to be serialized as 0x00000101 + result.merkleblock.flags = flags.to_bytes().into_iter() + .map(|b| + ((b & 0b10000000) >> 7) | + ((b & 0b01000000) >> 5) | + ((b & 0b00100000) >> 3) | + ((b & 0b00010000) >> 1) | + ((b & 0b00001000) << 1) | + ((b & 0b00000100) << 3) | + ((b & 0b00000010) << 5) | + ((b & 0b00000001) << 7)).collect::>().into(); + Some(result) + } + + /// Check if transaction should be sent to this connection using bloom filter && optionally update filter + fn filter_transaction_with_bloom(&mut self, transaction_hash: &H256, transaction: &Transaction) -> bool { // check with bloom filter, if set match self.bloom { /// if no filter is set for the connection => match everything @@ -188,75 +284,6 @@ impl ConnectionFilter { }, } } - - /// Load filter - pub fn load(&mut self, message: &types::FilterLoad) { - self.bloom = Some(ConnectionBloom::new(message)); - self.filter_flags = message.flags; - } - - /// Add filter - pub fn add(&mut self, message: &types::FilterAdd) { - // ignore if filter is not currently set - if let Some(ref mut bloom) = self.bloom { - bloom.insert(&message.data); - } - } - - /// Clear filter - pub fn clear(&mut self) { - self.bloom = None; - } - - /// Convert `Block` to `MerkleBlock` using this filter - pub fn build_merkle_block(&mut self, block: Block) -> Option { - if self.bloom.is_none() { - return None; - } - - // prepare result - let all_len = block.transactions.len(); - let mut result = MerkleBlockArtefacts { - merkleblock: types::MerkleBlock { - block_header: block.block_header.clone(), - total_transactions: all_len as u32, - hashes: Vec::default(), - flags: Bytes::default(), - }, - matching_transactions: Vec::new(), - }; - - // calculate hashes && match flags for all transactions - let (all_hashes, all_flags) = block.transactions.into_iter() - .fold((Vec::::with_capacity(all_len), BitVec::with_capacity(all_len)), |(mut all_hashes, mut all_flags), t| { - let hash = t.hash(); - let flag = self.filter_transaction(&hash, &t); - if flag { - result.matching_transactions.push((hash.clone(), t)); - } - - all_flags.push(flag); - all_hashes.push(hash); - (all_hashes, all_flags) - }); - - // build partial merkle tree - let (hashes, flags) = PartialMerkleTree::build(all_hashes, all_flags); - result.merkleblock.hashes.extend(hashes); - // to_bytes() converts [true, false, true] to 0b10100000 - // while protocol requires [true, false, true] to be serialized as 0x00000101 - result.merkleblock.flags = flags.to_bytes().into_iter() - .map(|b| - ((b & 0b10000000) >> 7) | - ((b & 0b01000000) >> 5) | - ((b & 0b00100000) >> 3) | - ((b & 0b00010000) >> 1) | - ((b & 0b00001000) << 1) | - ((b & 0b00000100) << 3) | - ((b & 0b00000010) << 5) | - ((b & 0b00000001) << 7)).collect::>().into(); - Some(result) - } } impl ConnectionBloom { @@ -493,13 +520,13 @@ pub mod tests { let mut filter = ConnectionFilter::with_filterload(&default_filterload()); - assert!(!filter.filter_transaction(&tx1.hash(), &tx1)); - assert!(!filter.filter_transaction(&tx2.hash(), &tx2)); + assert!(!filter.filter_transaction(&tx1.hash(), &tx1, 1000)); + assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000)); filter.add(&make_filteradd(&*tx1.hash())); - assert!(filter.filter_transaction(&tx1.hash(), &tx1)); - assert!(!filter.filter_transaction(&tx2.hash(), &tx2)); + assert!(filter.filter_transaction(&tx1.hash(), &tx1, 1000)); + assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000)); } #[test] @@ -512,13 +539,13 @@ pub mod tests { let mut filter = ConnectionFilter::with_filterload(&default_filterload()); - assert!(!filter.filter_transaction(&tx1.hash(), &tx1)); - assert!(!filter.filter_transaction(&tx2.hash(), &tx2)); + assert!(!filter.filter_transaction(&tx1.hash(), &tx1, 1000)); + assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000)); filter.add(&make_filteradd(&tx1_out_data)); - assert!(filter.filter_transaction(&tx1.hash(), &tx1)); - assert!(!filter.filter_transaction(&tx2.hash(), &tx2)); + assert!(filter.filter_transaction(&tx1.hash(), &tx1, 1000)); + assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000)); } #[test] @@ -530,13 +557,13 @@ pub mod tests { let mut filter = ConnectionFilter::with_filterload(&default_filterload()); - assert!(!filter.filter_transaction(&tx1.hash(), &tx1)); - assert!(!filter.filter_transaction(&tx2.hash(), &tx2)); + assert!(!filter.filter_transaction(&tx1.hash(), &tx1, 1000)); + assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000)); filter.add(&make_filteradd(&tx1_previous_output)); - assert!(filter.filter_transaction(&tx1.hash(), &tx1)); - assert!(!filter.filter_transaction(&tx2.hash(), &tx2)); + assert!(filter.filter_transaction(&tx1.hash(), &tx1, 1000)); + assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000)); } #[test] @@ -549,13 +576,39 @@ pub mod tests { let mut filter = ConnectionFilter::with_filterload(&default_filterload()); - assert!(!filter.filter_transaction(&tx1.hash(), &tx1)); - assert!(!filter.filter_transaction(&tx2.hash(), &tx2)); + assert!(!filter.filter_transaction(&tx1.hash(), &tx1, 1000)); + assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000)); filter.add(&make_filteradd(&tx1_input_data)); - assert!(filter.filter_transaction(&tx1.hash(), &tx1)); - assert!(!filter.filter_transaction(&tx2.hash(), &tx2)); + assert!(filter.filter_transaction(&tx1.hash(), &tx1, 1000)); + assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000)); + } + + #[test] + fn connection_filter_matches_transaction_by_fee_rate() { + let tx1: Transaction = test_data::TransactionBuilder::with_version(1).into(); + let tx2: Transaction = test_data::TransactionBuilder::with_version(2).into(); + + let mut filter = ConnectionFilter::default(); + + assert!(filter.filter_transaction(&tx1.hash(), &tx1, 1000)); + assert!(filter.filter_transaction(&tx2.hash(), &tx2, 2000)); + + filter.set_fee_rate(1500); + + assert!(!filter.filter_transaction(&tx1.hash(), &tx1, 1000)); + assert!(filter.filter_transaction(&tx2.hash(), &tx2, 2000)); + + filter.set_fee_rate(3000); + + assert!(!filter.filter_transaction(&tx1.hash(), &tx1, 1000)); + assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 2000)); + + filter.set_fee_rate(0); + + assert!(filter.filter_transaction(&tx1.hash(), &tx1, 1000)); + assert!(filter.filter_transaction(&tx2.hash(), &tx2, 2000)); } #[test] diff --git a/sync/src/local_node.rs b/sync/src/local_node.rs index 39bd4797..f80017fd 100644 --- a/sync/src/local_node.rs +++ b/sync/src/local_node.rs @@ -186,8 +186,9 @@ impl LocalNode where T: SynchronizationTaskExecutor + PeersCon self.client.lock().on_peer_sendheaders(peer_index); } - pub fn on_peer_feefilter(&self, peer_index: usize, _message: types::FeeFilter) { + pub fn on_peer_feefilter(&self, peer_index: usize, message: types::FeeFilter) { trace!(target: "sync", "Got `feefilter` message from peer#{}", peer_index); + self.client.lock().on_peer_feefilter(peer_index, &message); } pub fn on_peer_send_compact(&self, peer_index: usize, _message: types::SendCompact) { diff --git a/sync/src/synchronization_chain.rs b/sync/src/synchronization_chain.rs index 45c312a5..14904a9a 100644 --- a/sync/src/synchronization_chain.rs +++ b/sync/src/synchronization_chain.rs @@ -182,6 +182,11 @@ impl Chain { self.storage.clone() } + /// Get storage, which contains all storage transaction && mempool transactions + // pub fn mempool_transaction_storage() -> db::SharedStore { + // TODO: implement TransactionProvider Storage + MemoryPool + //} + /// Get number of blocks in given state pub fn length_of_blocks_state(&self, state: BlockState) -> u32 { match state { diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs index ae29cce2..4d1f153d 100644 --- a/sync/src/synchronization_client.rs +++ b/sync/src/synchronization_client.rs @@ -195,6 +195,7 @@ pub trait Client : Send + 'static { fn on_peer_filteradd(&mut self, peer_index: usize, message: &types::FilterAdd); fn on_peer_filterclear(&mut self, peer_index: usize); fn on_peer_sendheaders(&mut self, peer_index: usize); + fn on_peer_feefilter(&mut self, peer_index: usize, message: &types::FeeFilter); fn on_peer_disconnected(&mut self, peer_index: usize); fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: BoxFuture<(), ()>); } @@ -215,6 +216,7 @@ pub trait ClientCore : VerificationSink { fn on_peer_filteradd(&mut self, peer_index: usize, message: &types::FilterAdd); fn on_peer_filterclear(&mut self, peer_index: usize); fn on_peer_sendheaders(&mut self, peer_index: usize); + fn on_peer_feefilter(&mut self, peer_index: usize, message: &types::FeeFilter); fn on_peer_disconnected(&mut self, peer_index: usize); fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: BoxFuture<(), ()>); fn execute_synchronization_tasks(&mut self, forced_blocks_requests: Option>); @@ -401,6 +403,10 @@ impl Client for SynchronizationClient where T: TaskExecutor, U: Veri self.core.lock().on_peer_sendheaders(peer_index); } + fn on_peer_feefilter(&mut self, peer_index: usize, message: &types::FeeFilter) { + self.core.lock().on_peer_feefilter(peer_index, message); + } + fn on_peer_disconnected(&mut self, peer_index: usize) { self.core.lock().on_peer_disconnected(peer_index); } @@ -637,6 +643,13 @@ impl ClientCore for SynchronizationClientCore where T: TaskExecutor { } } + /// Peer wants to limit transaction announcing by transaction fee + fn on_peer_feefilter(&mut self, peer_index: usize, message: &types::FeeFilter) { + if self.peers.is_known_peer(peer_index) { + self.peers.on_peer_feefilter(peer_index, message.fee_rate); + } + } + /// Peer disconnected. fn on_peer_disconnected(&mut self, peer_index: usize) { // when last peer is disconnected, reset, but let verifying blocks be verified @@ -823,7 +836,7 @@ impl VerificationSink for SynchronizationClientCore where T: TaskExecutor fn on_transaction_verification_success(&mut self, transaction: Transaction) { let hash = transaction.hash(); - { + let transaction_fee_rate = { // insert transaction to the memory pool let mut chain = self.chain.write(); @@ -835,10 +848,17 @@ impl VerificationSink for SynchronizationClientCore where T: TaskExecutor // transaction was in verification queue => insert to memory pool chain.insert_verified_transaction(transaction.clone()); - } + + // calculate transaction fee rate + // TODO: uncomment me: after implementation + // use miner::transaction_fee_rate; + // transaction_fee_rate(chain.mempool_transaction_storage(), &transaction) + use std::u64::MAX; + MAX + }; // relay transaction to peers - self.relay_new_transactions(vec![(hash, &transaction)]); + self.relay_new_transactions(vec![(hash, &transaction, transaction_fee_rate)]); } /// Process failed transaction verification @@ -971,12 +991,14 @@ impl SynchronizationClientCore where T: TaskExecutor { } /// Relay new transactions - fn relay_new_transactions(&mut self, new_transactions: Vec<(H256, &Transaction)>) { + fn relay_new_transactions(&mut self, new_transactions: Vec<(H256, &Transaction, u64)>) { let tasks: Vec<_> = self.peers.all_peers().into_iter() .filter_map(|peer_index| { let inventory: Vec<_> = new_transactions.iter() - .filter(|&&(ref h, tx)| self.peers.filter_mut(peer_index).filter_transaction(h, tx)) - .map(|&(ref h, _)| InventoryVector { + .filter(|&&(ref h, tx, tx_fee_rate)| { + self.peers.filter_mut(peer_index).filter_transaction(h, tx, tx_fee_rate) + }) + .map(|&(ref h, _, _)| InventoryVector { inv_type: InventoryType::MessageTx, hash: h.clone(), }) diff --git a/sync/src/synchronization_peers.rs b/sync/src/synchronization_peers.rs index 9bc64742..9c996b68 100644 --- a/sync/src/synchronization_peers.rs +++ b/sync/src/synchronization_peers.rs @@ -198,6 +198,11 @@ impl Peers { self.send_headers.insert(peer_index); } + /// Peer wants to limit transaction announcing by transaction fee + pub fn on_peer_feefilter(&mut self, peer_index: usize, fee_rate: u64) { + self.filter_mut(peer_index).set_fee_rate(fee_rate); + } + /// Peer has been disconnected pub fn on_peer_disconnected(&mut self, peer_index: usize) -> Option> { // forget this peer without any chances to reuse