started work on feefilter message

This commit is contained in:
Svyatoslav Nikolsky 2016-11-25 09:07:29 +03:00
parent 51e3017f55
commit eb23a7d901
9 changed files with 248 additions and 93 deletions

1
Cargo.lock generated
View File

@ -324,6 +324,7 @@ name = "miner"
version = "0.1.0"
dependencies = [
"chain 0.1.0",
"db 0.1.0",
"heapsize 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
"primitives 0.1.0",
"serialization 0.1.0",

View File

@ -6,6 +6,7 @@ authors = ["Ethcore <admin@ethcore.io>"]
[dependencies]
heapsize = "0.3"
chain = { path = "../chain" }
db = { path = "../db" }
primitives = { path = "../primitives" }
serialization = { path = "../serialization" }
test-data = { path = "../test-data" }

64
miner/src/fee.rs Normal file
View File

@ -0,0 +1,64 @@
use chain::Transaction;
use db::SharedStore;
// TODO: &TransactionProvider after AsTransactionProvider is done
pub fn transaction_fee(store: SharedStore, transaction: &Transaction) -> u64 {
let inputs_sum = transaction.inputs.iter()
.fold(0, |accumulator, input| {
let input_transaction = store.transaction(&input.previous_output.hash)
.expect("transaction must be verified by caller");
accumulator + input_transaction.outputs[input.previous_output.index as usize].value
});
let outputs_sum = transaction.outputs.iter()
.fold(0, |accumulator, output| accumulator + output.value);
inputs_sum.saturating_sub(outputs_sum)
}
// TODO: &TransactionProvider after AsTransactionProvider is done
pub fn transaction_fee_rate(store: SharedStore, transaction: &Transaction) -> u64 {
use ser::Serializable;
transaction_fee(store, transaction) / transaction.serialized_size() as u64
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use db::TestStorage;
use test_data;
use super::*;
#[test]
fn test_transaction_fee() {
let b0 = test_data::block_builder().header().nonce(1).build()
.transaction()
.output().value(1_000_000).build()
.build()
.transaction()
.output().value(2_000_000).build()
.build()
.build();
let tx0 = b0.transactions[0].clone();
let tx0_hash = tx0.hash();
let tx1 = b0.transactions[1].clone();
let tx1_hash = tx1.hash();
let b1 = test_data::block_builder().header().nonce(2).build()
.transaction()
.input().hash(tx0_hash).index(0).build()
.input().hash(tx1_hash).index(0).build()
.output().value(2_500_000).build()
.build()
.build();
let tx2 = b1.transactions[0].clone();
let db = Arc::new(TestStorage::with_blocks(&vec![b0, b1]));
assert_eq!(transaction_fee(db.clone(), &tx0), 0);
assert_eq!(transaction_fee(db.clone(), &tx1), 0);
assert_eq!(transaction_fee(db.clone(), &tx2), 500_000);
assert_eq!(transaction_fee_rate(db.clone(), &tx0), 0);
assert_eq!(transaction_fee_rate(db.clone(), &tx1), 0);
assert_eq!(transaction_fee_rate(db.clone(), &tx2), 4_950);
}
}

View File

@ -1,9 +1,12 @@
extern crate chain;
extern crate db;
extern crate heapsize;
extern crate primitives;
extern crate serialization as ser;
extern crate test_data;
mod fee;
mod memory_pool;
pub use fee::{transaction_fee, transaction_fee_rate};
pub use memory_pool::{MemoryPool, Information as MemoryPoolInformation, OrderingStrategy as MemoryPoolOrderingStrategy};

View File

@ -27,6 +27,8 @@ pub struct ConnectionFilter {
last_blocks: LinkedHashMap<H256, ()>,
/// Last transactions from peer.
last_transactions: LinkedHashMap<H256, ()>,
/// Minimal fee in satoshis per 1000 bytes
fee_rate: Option<u64>,
}
/// Connection bloom filter
@ -70,6 +72,7 @@ impl Default for ConnectionFilter {
filter_flags: types::FilterFlags::None,
last_blocks: LinkedHashMap::new(),
last_transactions: LinkedHashMap::new(),
fee_rate: None,
}
}
}
@ -83,6 +86,7 @@ impl ConnectionFilter {
filter_flags: message.flags,
last_blocks: LinkedHashMap::new(),
last_transactions: LinkedHashMap::new(),
fee_rate: None,
}
}
@ -119,12 +123,104 @@ impl ConnectionFilter {
}
/// Check if transaction should be sent to this connection && optionally update filter
pub fn filter_transaction(&mut self, transaction_hash: &H256, transaction: &Transaction) -> bool {
pub fn filter_transaction(&mut self, transaction_hash: &H256, transaction: &Transaction, transaction_fee_rate: u64) -> bool {
// check if transaction is known
if self.last_transactions.contains_key(transaction_hash) {
return false;
}
// check if transaction fee rate is high enough for this peer
if let Some(fee_rate) = self.fee_rate {
if transaction_fee_rate < fee_rate {
return false;
}
}
// check with bloom filter, if set
self.filter_transaction_with_bloom(transaction_hash, transaction)
}
/// Load filter
pub fn load(&mut self, message: &types::FilterLoad) {
self.bloom = Some(ConnectionBloom::new(message));
self.filter_flags = message.flags;
}
/// Add filter
pub fn add(&mut self, message: &types::FilterAdd) {
// ignore if filter is not currently set
if let Some(ref mut bloom) = self.bloom {
bloom.insert(&message.data);
}
}
/// Clear filter
pub fn clear(&mut self) {
self.bloom = None;
}
/// Limit transaction announcing by transaction fee
pub fn set_fee_rate(&mut self, fee_rate: u64) {
if fee_rate == 0 {
self.fee_rate = None;
}
else {
self.fee_rate = Some(fee_rate);
}
}
/// Convert `Block` to `MerkleBlock` using this filter
pub fn build_merkle_block(&mut self, block: Block) -> Option<MerkleBlockArtefacts> {
if self.bloom.is_none() {
return None;
}
// prepare result
let all_len = block.transactions.len();
let mut result = MerkleBlockArtefacts {
merkleblock: types::MerkleBlock {
block_header: block.block_header.clone(),
total_transactions: all_len as u32,
hashes: Vec::default(),
flags: Bytes::default(),
},
matching_transactions: Vec::new(),
};
// calculate hashes && match flags for all transactions
let (all_hashes, all_flags) = block.transactions.into_iter()
.fold((Vec::<H256>::with_capacity(all_len), BitVec::with_capacity(all_len)), |(mut all_hashes, mut all_flags), t| {
let hash = t.hash();
let flag = self.filter_transaction_with_bloom(&hash, &t);
if flag {
result.matching_transactions.push((hash.clone(), t));
}
all_flags.push(flag);
all_hashes.push(hash);
(all_hashes, all_flags)
});
// build partial merkle tree
let (hashes, flags) = PartialMerkleTree::build(all_hashes, all_flags);
result.merkleblock.hashes.extend(hashes);
// to_bytes() converts [true, false, true] to 0b10100000
// while protocol requires [true, false, true] to be serialized as 0x00000101
result.merkleblock.flags = flags.to_bytes().into_iter()
.map(|b|
((b & 0b10000000) >> 7) |
((b & 0b01000000) >> 5) |
((b & 0b00100000) >> 3) |
((b & 0b00010000) >> 1) |
((b & 0b00001000) << 1) |
((b & 0b00000100) << 3) |
((b & 0b00000010) << 5) |
((b & 0b00000001) << 7)).collect::<Vec<u8>>().into();
Some(result)
}
/// Check if transaction should be sent to this connection using bloom filter && optionally update filter
fn filter_transaction_with_bloom(&mut self, transaction_hash: &H256, transaction: &Transaction) -> bool {
// check with bloom filter, if set
match self.bloom {
/// if no filter is set for the connection => match everything
@ -188,75 +284,6 @@ impl ConnectionFilter {
},
}
}
/// Load filter
pub fn load(&mut self, message: &types::FilterLoad) {
self.bloom = Some(ConnectionBloom::new(message));
self.filter_flags = message.flags;
}
/// Add filter
pub fn add(&mut self, message: &types::FilterAdd) {
// ignore if filter is not currently set
if let Some(ref mut bloom) = self.bloom {
bloom.insert(&message.data);
}
}
/// Clear filter
pub fn clear(&mut self) {
self.bloom = None;
}
/// Convert `Block` to `MerkleBlock` using this filter
pub fn build_merkle_block(&mut self, block: Block) -> Option<MerkleBlockArtefacts> {
if self.bloom.is_none() {
return None;
}
// prepare result
let all_len = block.transactions.len();
let mut result = MerkleBlockArtefacts {
merkleblock: types::MerkleBlock {
block_header: block.block_header.clone(),
total_transactions: all_len as u32,
hashes: Vec::default(),
flags: Bytes::default(),
},
matching_transactions: Vec::new(),
};
// calculate hashes && match flags for all transactions
let (all_hashes, all_flags) = block.transactions.into_iter()
.fold((Vec::<H256>::with_capacity(all_len), BitVec::with_capacity(all_len)), |(mut all_hashes, mut all_flags), t| {
let hash = t.hash();
let flag = self.filter_transaction(&hash, &t);
if flag {
result.matching_transactions.push((hash.clone(), t));
}
all_flags.push(flag);
all_hashes.push(hash);
(all_hashes, all_flags)
});
// build partial merkle tree
let (hashes, flags) = PartialMerkleTree::build(all_hashes, all_flags);
result.merkleblock.hashes.extend(hashes);
// to_bytes() converts [true, false, true] to 0b10100000
// while protocol requires [true, false, true] to be serialized as 0x00000101
result.merkleblock.flags = flags.to_bytes().into_iter()
.map(|b|
((b & 0b10000000) >> 7) |
((b & 0b01000000) >> 5) |
((b & 0b00100000) >> 3) |
((b & 0b00010000) >> 1) |
((b & 0b00001000) << 1) |
((b & 0b00000100) << 3) |
((b & 0b00000010) << 5) |
((b & 0b00000001) << 7)).collect::<Vec<u8>>().into();
Some(result)
}
}
impl ConnectionBloom {
@ -493,13 +520,13 @@ pub mod tests {
let mut filter = ConnectionFilter::with_filterload(&default_filterload());
assert!(!filter.filter_transaction(&tx1.hash(), &tx1));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2));
assert!(!filter.filter_transaction(&tx1.hash(), &tx1, 1000));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000));
filter.add(&make_filteradd(&*tx1.hash()));
assert!(filter.filter_transaction(&tx1.hash(), &tx1));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2));
assert!(filter.filter_transaction(&tx1.hash(), &tx1, 1000));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000));
}
#[test]
@ -512,13 +539,13 @@ pub mod tests {
let mut filter = ConnectionFilter::with_filterload(&default_filterload());
assert!(!filter.filter_transaction(&tx1.hash(), &tx1));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2));
assert!(!filter.filter_transaction(&tx1.hash(), &tx1, 1000));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000));
filter.add(&make_filteradd(&tx1_out_data));
assert!(filter.filter_transaction(&tx1.hash(), &tx1));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2));
assert!(filter.filter_transaction(&tx1.hash(), &tx1, 1000));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000));
}
#[test]
@ -530,13 +557,13 @@ pub mod tests {
let mut filter = ConnectionFilter::with_filterload(&default_filterload());
assert!(!filter.filter_transaction(&tx1.hash(), &tx1));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2));
assert!(!filter.filter_transaction(&tx1.hash(), &tx1, 1000));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000));
filter.add(&make_filteradd(&tx1_previous_output));
assert!(filter.filter_transaction(&tx1.hash(), &tx1));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2));
assert!(filter.filter_transaction(&tx1.hash(), &tx1, 1000));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000));
}
#[test]
@ -549,13 +576,39 @@ pub mod tests {
let mut filter = ConnectionFilter::with_filterload(&default_filterload());
assert!(!filter.filter_transaction(&tx1.hash(), &tx1));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2));
assert!(!filter.filter_transaction(&tx1.hash(), &tx1, 1000));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000));
filter.add(&make_filteradd(&tx1_input_data));
assert!(filter.filter_transaction(&tx1.hash(), &tx1));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2));
assert!(filter.filter_transaction(&tx1.hash(), &tx1, 1000));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000));
}
#[test]
fn connection_filter_matches_transaction_by_fee_rate() {
let tx1: Transaction = test_data::TransactionBuilder::with_version(1).into();
let tx2: Transaction = test_data::TransactionBuilder::with_version(2).into();
let mut filter = ConnectionFilter::default();
assert!(filter.filter_transaction(&tx1.hash(), &tx1, 1000));
assert!(filter.filter_transaction(&tx2.hash(), &tx2, 2000));
filter.set_fee_rate(1500);
assert!(!filter.filter_transaction(&tx1.hash(), &tx1, 1000));
assert!(filter.filter_transaction(&tx2.hash(), &tx2, 2000));
filter.set_fee_rate(3000);
assert!(!filter.filter_transaction(&tx1.hash(), &tx1, 1000));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 2000));
filter.set_fee_rate(0);
assert!(filter.filter_transaction(&tx1.hash(), &tx1, 1000));
assert!(filter.filter_transaction(&tx2.hash(), &tx2, 2000));
}
#[test]

View File

@ -186,8 +186,9 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
self.client.lock().on_peer_sendheaders(peer_index);
}
pub fn on_peer_feefilter(&self, peer_index: usize, _message: types::FeeFilter) {
pub fn on_peer_feefilter(&self, peer_index: usize, message: types::FeeFilter) {
trace!(target: "sync", "Got `feefilter` message from peer#{}", peer_index);
self.client.lock().on_peer_feefilter(peer_index, &message);
}
pub fn on_peer_send_compact(&self, peer_index: usize, _message: types::SendCompact) {

View File

@ -182,6 +182,11 @@ impl Chain {
self.storage.clone()
}
/// Get storage, which contains all storage transaction && mempool transactions
// pub fn mempool_transaction_storage() -> db::SharedStore {
// TODO: implement TransactionProvider Storage + MemoryPool
//}
/// Get number of blocks in given state
pub fn length_of_blocks_state(&self, state: BlockState) -> u32 {
match state {

View File

@ -195,6 +195,7 @@ pub trait Client : Send + 'static {
fn on_peer_filteradd(&mut self, peer_index: usize, message: &types::FilterAdd);
fn on_peer_filterclear(&mut self, peer_index: usize);
fn on_peer_sendheaders(&mut self, peer_index: usize);
fn on_peer_feefilter(&mut self, peer_index: usize, message: &types::FeeFilter);
fn on_peer_disconnected(&mut self, peer_index: usize);
fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: BoxFuture<(), ()>);
}
@ -215,6 +216,7 @@ pub trait ClientCore : VerificationSink {
fn on_peer_filteradd(&mut self, peer_index: usize, message: &types::FilterAdd);
fn on_peer_filterclear(&mut self, peer_index: usize);
fn on_peer_sendheaders(&mut self, peer_index: usize);
fn on_peer_feefilter(&mut self, peer_index: usize, message: &types::FeeFilter);
fn on_peer_disconnected(&mut self, peer_index: usize);
fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: BoxFuture<(), ()>);
fn execute_synchronization_tasks(&mut self, forced_blocks_requests: Option<Vec<H256>>);
@ -401,6 +403,10 @@ impl<T, U> Client for SynchronizationClient<T, U> where T: TaskExecutor, U: Veri
self.core.lock().on_peer_sendheaders(peer_index);
}
fn on_peer_feefilter(&mut self, peer_index: usize, message: &types::FeeFilter) {
self.core.lock().on_peer_feefilter(peer_index, message);
}
fn on_peer_disconnected(&mut self, peer_index: usize) {
self.core.lock().on_peer_disconnected(peer_index);
}
@ -637,6 +643,13 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
}
}
/// Peer wants to limit transaction announcing by transaction fee
fn on_peer_feefilter(&mut self, peer_index: usize, message: &types::FeeFilter) {
if self.peers.is_known_peer(peer_index) {
self.peers.on_peer_feefilter(peer_index, message.fee_rate);
}
}
/// Peer disconnected.
fn on_peer_disconnected(&mut self, peer_index: usize) {
// when last peer is disconnected, reset, but let verifying blocks be verified
@ -823,7 +836,7 @@ impl<T> VerificationSink for SynchronizationClientCore<T> where T: TaskExecutor
fn on_transaction_verification_success(&mut self, transaction: Transaction) {
let hash = transaction.hash();
{
let transaction_fee_rate = {
// insert transaction to the memory pool
let mut chain = self.chain.write();
@ -835,10 +848,17 @@ impl<T> VerificationSink for SynchronizationClientCore<T> where T: TaskExecutor
// transaction was in verification queue => insert to memory pool
chain.insert_verified_transaction(transaction.clone());
}
// calculate transaction fee rate
// TODO: uncomment me: after implementation
// use miner::transaction_fee_rate;
// transaction_fee_rate(chain.mempool_transaction_storage(), &transaction)
use std::u64::MAX;
MAX
};
// relay transaction to peers
self.relay_new_transactions(vec![(hash, &transaction)]);
self.relay_new_transactions(vec![(hash, &transaction, transaction_fee_rate)]);
}
/// Process failed transaction verification
@ -971,12 +991,14 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
}
/// Relay new transactions
fn relay_new_transactions(&mut self, new_transactions: Vec<(H256, &Transaction)>) {
fn relay_new_transactions(&mut self, new_transactions: Vec<(H256, &Transaction, u64)>) {
let tasks: Vec<_> = self.peers.all_peers().into_iter()
.filter_map(|peer_index| {
let inventory: Vec<_> = new_transactions.iter()
.filter(|&&(ref h, tx)| self.peers.filter_mut(peer_index).filter_transaction(h, tx))
.map(|&(ref h, _)| InventoryVector {
.filter(|&&(ref h, tx, tx_fee_rate)| {
self.peers.filter_mut(peer_index).filter_transaction(h, tx, tx_fee_rate)
})
.map(|&(ref h, _, _)| InventoryVector {
inv_type: InventoryType::MessageTx,
hash: h.clone(),
})

View File

@ -198,6 +198,11 @@ impl Peers {
self.send_headers.insert(peer_index);
}
/// Peer wants to limit transaction announcing by transaction fee
pub fn on_peer_feefilter(&mut self, peer_index: usize, fee_rate: u64) {
self.filter_mut(peer_index).set_fee_rate(fee_rate);
}
/// Peer has been disconnected
pub fn on_peer_disconnected(&mut self, peer_index: usize) -> Option<Vec<H256>> {
// forget this peer without any chances to reuse