Merge pull request #187 from ethcore/sync_feefilter

Process `feefilter` message
This commit is contained in:
Nikolay Volf 2016-11-25 15:46:45 +03:00 committed by GitHub
commit 01dfe36a66
13 changed files with 328 additions and 96 deletions

1
Cargo.lock generated
View File

@ -337,6 +337,7 @@ name = "miner"
version = "0.1.0"
dependencies = [
"chain 0.1.0",
"db 0.1.0",
"heapsize 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
"primitives 0.1.0",
"serialization 0.1.0",

View File

@ -46,7 +46,7 @@ pub use best_block::BestBlock;
pub use storage::{Storage, Store};
pub use error::Error;
pub use kvdb::Database;
pub use transaction_provider::TransactionProvider;
pub use transaction_provider::{TransactionProvider, AsTransactionProvider};
pub use transaction_meta_provider::TransactionMetaProvider;
pub use block_stapler::{BlockStapler, BlockInsertedChain};
pub use block_provider::BlockProvider;

View File

@ -2,7 +2,7 @@
use super::{
BlockRef, Store, Error, BestBlock, BlockLocation, BlockInsertedChain, BlockProvider,
BlockStapler, TransactionMetaProvider, TransactionProvider,
BlockStapler, TransactionMetaProvider, TransactionProvider, AsTransactionProvider
};
use chain::{self, Block};
use primitives::hash::H256;
@ -163,6 +163,12 @@ impl TransactionProvider for TestStorage {
}
}
impl AsTransactionProvider for TestStorage {
fn as_transaction_provider(&self) -> &TransactionProvider {
&*self
}
}
impl TransactionMetaProvider for TestStorage {
// just spawns new meta so far, use real store for proper tests
fn transaction_meta(&self, hash: &H256) -> Option<TransactionMeta> {

View File

@ -16,3 +16,8 @@ pub trait TransactionProvider {
fn transaction(&self, hash: &H256) -> Option<chain::Transaction>;
}
pub trait AsTransactionProvider {
/// returns `TransactionProvider`
fn as_transaction_provider(&self) -> &TransactionProvider;
}

View File

@ -6,6 +6,7 @@ authors = ["Ethcore <admin@ethcore.io>"]
[dependencies]
heapsize = "0.3"
chain = { path = "../chain" }
db = { path = "../db" }
primitives = { path = "../primitives" }
serialization = { path = "../serialization" }
test-data = { path = "../test-data" }

62
miner/src/fee.rs Normal file
View File

@ -0,0 +1,62 @@
use chain::Transaction;
use db::TransactionProvider;
pub fn transaction_fee(store: &TransactionProvider, transaction: &Transaction) -> u64 {
let inputs_sum = transaction.inputs.iter()
.fold(0, |accumulator, input| {
let input_transaction = store.transaction(&input.previous_output.hash)
.expect("transaction must be verified by caller");
accumulator + input_transaction.outputs[input.previous_output.index as usize].value
});
let outputs_sum = transaction.outputs.iter()
.fold(0, |accumulator, output| accumulator + output.value);
inputs_sum.saturating_sub(outputs_sum)
}
pub fn transaction_fee_rate(store: &TransactionProvider, transaction: &Transaction) -> u64 {
use ser::Serializable;
transaction_fee(store, transaction) / transaction.serialized_size() as u64
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use db::{TestStorage, AsTransactionProvider};
use test_data;
use super::*;
#[test]
fn test_transaction_fee() {
let b0 = test_data::block_builder().header().nonce(1).build()
.transaction()
.output().value(1_000_000).build()
.build()
.transaction()
.output().value(2_000_000).build()
.build()
.build();
let tx0 = b0.transactions[0].clone();
let tx0_hash = tx0.hash();
let tx1 = b0.transactions[1].clone();
let tx1_hash = tx1.hash();
let b1 = test_data::block_builder().header().nonce(2).build()
.transaction()
.input().hash(tx0_hash).index(0).build()
.input().hash(tx1_hash).index(0).build()
.output().value(2_500_000).build()
.build()
.build();
let tx2 = b1.transactions[0].clone();
let db = Arc::new(TestStorage::with_blocks(&vec![b0, b1]));
assert_eq!(transaction_fee(db.as_transaction_provider(), &tx0), 0);
assert_eq!(transaction_fee(db.as_transaction_provider(), &tx1), 0);
assert_eq!(transaction_fee(db.as_transaction_provider(), &tx2), 500_000);
assert_eq!(transaction_fee_rate(db.as_transaction_provider(), &tx0), 0);
assert_eq!(transaction_fee_rate(db.as_transaction_provider(), &tx1), 0);
assert_eq!(transaction_fee_rate(db.as_transaction_provider(), &tx2), 4_950);
}
}

View File

@ -1,9 +1,12 @@
extern crate chain;
extern crate db;
extern crate heapsize;
extern crate primitives;
extern crate serialization as ser;
extern crate test_data;
mod fee;
mod memory_pool;
pub use fee::{transaction_fee, transaction_fee_rate};
pub use memory_pool::{MemoryPool, Information as MemoryPoolInformation, OrderingStrategy as MemoryPoolOrderingStrategy};

View File

@ -5,13 +5,15 @@
//! transactions.
//! It also guarantees that ancestor-descendant relation won't break during ordered removal (ancestors always removed
//! before descendants). Removal using `remove_by_hash` can break this rule.
use db::TransactionProvider;
use primitives::bytes::Bytes;
use primitives::hash::H256;
use chain::Transaction;
use std::cmp::Ordering;
use std::collections::HashMap;
use std::collections::HashSet;
use std::collections::BTreeSet;
use ser::Serializable;
use ser::{Serializable, serialize};
use heapsize::HeapSizeOf;
/// Transactions ordering strategy
@ -684,6 +686,16 @@ impl MemoryPool {
}
}
impl TransactionProvider for MemoryPool {
fn transaction_bytes(&self, hash: &H256) -> Option<Bytes> {
self.get(hash).map(|t| serialize(t))
}
fn transaction(&self, hash: &H256) -> Option<Transaction> {
self.get(hash).cloned()
}
}
impl HeapSizeOf for MemoryPool {
fn heap_size_of_children(&self) -> usize {
self.storage.heap_size_of_children()

View File

@ -27,6 +27,8 @@ pub struct ConnectionFilter {
last_blocks: LinkedHashMap<H256, ()>,
/// Last transactions from peer.
last_transactions: LinkedHashMap<H256, ()>,
/// Minimal fee in satoshis per 1000 bytes
fee_rate: Option<u64>,
}
/// Connection bloom filter
@ -70,6 +72,7 @@ impl Default for ConnectionFilter {
filter_flags: types::FilterFlags::None,
last_blocks: LinkedHashMap::new(),
last_transactions: LinkedHashMap::new(),
fee_rate: None,
}
}
}
@ -83,6 +86,7 @@ impl ConnectionFilter {
filter_flags: message.flags,
last_blocks: LinkedHashMap::new(),
last_transactions: LinkedHashMap::new(),
fee_rate: None,
}
}
@ -119,12 +123,104 @@ impl ConnectionFilter {
}
/// Check if transaction should be sent to this connection && optionally update filter
pub fn filter_transaction(&mut self, transaction_hash: &H256, transaction: &Transaction) -> bool {
pub fn filter_transaction(&mut self, transaction_hash: &H256, transaction: &Transaction, transaction_fee_rate: u64) -> bool {
// check if transaction is known
if self.last_transactions.contains_key(transaction_hash) {
return false;
}
// check if transaction fee rate is high enough for this peer
if let Some(fee_rate) = self.fee_rate {
if transaction_fee_rate < fee_rate {
return false;
}
}
// check with bloom filter, if set
self.filter_transaction_with_bloom(transaction_hash, transaction)
}
/// Load filter
pub fn load(&mut self, message: &types::FilterLoad) {
self.bloom = Some(ConnectionBloom::new(message));
self.filter_flags = message.flags;
}
/// Add filter
pub fn add(&mut self, message: &types::FilterAdd) {
// ignore if filter is not currently set
if let Some(ref mut bloom) = self.bloom {
bloom.insert(&message.data);
}
}
/// Clear filter
pub fn clear(&mut self) {
self.bloom = None;
}
/// Limit transaction announcing by transaction fee
pub fn set_fee_rate(&mut self, fee_rate: u64) {
if fee_rate == 0 {
self.fee_rate = None;
}
else {
self.fee_rate = Some(fee_rate);
}
}
/// Convert `Block` to `MerkleBlock` using this filter
pub fn build_merkle_block(&mut self, block: Block) -> Option<MerkleBlockArtefacts> {
if self.bloom.is_none() {
return None;
}
// prepare result
let all_len = block.transactions.len();
let mut result = MerkleBlockArtefacts {
merkleblock: types::MerkleBlock {
block_header: block.block_header.clone(),
total_transactions: all_len as u32,
hashes: Vec::default(),
flags: Bytes::default(),
},
matching_transactions: Vec::new(),
};
// calculate hashes && match flags for all transactions
let (all_hashes, all_flags) = block.transactions.into_iter()
.fold((Vec::<H256>::with_capacity(all_len), BitVec::with_capacity(all_len)), |(mut all_hashes, mut all_flags), t| {
let hash = t.hash();
let flag = self.filter_transaction_with_bloom(&hash, &t);
if flag {
result.matching_transactions.push((hash.clone(), t));
}
all_flags.push(flag);
all_hashes.push(hash);
(all_hashes, all_flags)
});
// build partial merkle tree
let (hashes, flags) = PartialMerkleTree::build(all_hashes, all_flags);
result.merkleblock.hashes.extend(hashes);
// to_bytes() converts [true, false, true] to 0b10100000
// while protocol requires [true, false, true] to be serialized as 0x00000101
result.merkleblock.flags = flags.to_bytes().into_iter()
.map(|b|
((b & 0b10000000) >> 7) |
((b & 0b01000000) >> 5) |
((b & 0b00100000) >> 3) |
((b & 0b00010000) >> 1) |
((b & 0b00001000) << 1) |
((b & 0b00000100) << 3) |
((b & 0b00000010) << 5) |
((b & 0b00000001) << 7)).collect::<Vec<u8>>().into();
Some(result)
}
/// Check if transaction should be sent to this connection using bloom filter && optionally update filter
fn filter_transaction_with_bloom(&mut self, transaction_hash: &H256, transaction: &Transaction) -> bool {
// check with bloom filter, if set
match self.bloom {
/// if no filter is set for the connection => match everything
@ -188,75 +284,6 @@ impl ConnectionFilter {
},
}
}
/// Load filter
pub fn load(&mut self, message: &types::FilterLoad) {
self.bloom = Some(ConnectionBloom::new(message));
self.filter_flags = message.flags;
}
/// Add filter
pub fn add(&mut self, message: &types::FilterAdd) {
// ignore if filter is not currently set
if let Some(ref mut bloom) = self.bloom {
bloom.insert(&message.data);
}
}
/// Clear filter
pub fn clear(&mut self) {
self.bloom = None;
}
/// Convert `Block` to `MerkleBlock` using this filter
pub fn build_merkle_block(&mut self, block: Block) -> Option<MerkleBlockArtefacts> {
if self.bloom.is_none() {
return None;
}
// prepare result
let all_len = block.transactions.len();
let mut result = MerkleBlockArtefacts {
merkleblock: types::MerkleBlock {
block_header: block.block_header.clone(),
total_transactions: all_len as u32,
hashes: Vec::default(),
flags: Bytes::default(),
},
matching_transactions: Vec::new(),
};
// calculate hashes && match flags for all transactions
let (all_hashes, all_flags) = block.transactions.into_iter()
.fold((Vec::<H256>::with_capacity(all_len), BitVec::with_capacity(all_len)), |(mut all_hashes, mut all_flags), t| {
let hash = t.hash();
let flag = self.filter_transaction(&hash, &t);
if flag {
result.matching_transactions.push((hash.clone(), t));
}
all_flags.push(flag);
all_hashes.push(hash);
(all_hashes, all_flags)
});
// build partial merkle tree
let (hashes, flags) = PartialMerkleTree::build(all_hashes, all_flags);
result.merkleblock.hashes.extend(hashes);
// to_bytes() converts [true, false, true] to 0b10100000
// while protocol requires [true, false, true] to be serialized as 0x00000101
result.merkleblock.flags = flags.to_bytes().into_iter()
.map(|b|
((b & 0b10000000) >> 7) |
((b & 0b01000000) >> 5) |
((b & 0b00100000) >> 3) |
((b & 0b00010000) >> 1) |
((b & 0b00001000) << 1) |
((b & 0b00000100) << 3) |
((b & 0b00000010) << 5) |
((b & 0b00000001) << 7)).collect::<Vec<u8>>().into();
Some(result)
}
}
impl ConnectionBloom {
@ -493,13 +520,13 @@ pub mod tests {
let mut filter = ConnectionFilter::with_filterload(&default_filterload());
assert!(!filter.filter_transaction(&tx1.hash(), &tx1));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2));
assert!(!filter.filter_transaction(&tx1.hash(), &tx1, 1000));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000));
filter.add(&make_filteradd(&*tx1.hash()));
assert!(filter.filter_transaction(&tx1.hash(), &tx1));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2));
assert!(filter.filter_transaction(&tx1.hash(), &tx1, 1000));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000));
}
#[test]
@ -512,13 +539,13 @@ pub mod tests {
let mut filter = ConnectionFilter::with_filterload(&default_filterload());
assert!(!filter.filter_transaction(&tx1.hash(), &tx1));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2));
assert!(!filter.filter_transaction(&tx1.hash(), &tx1, 1000));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000));
filter.add(&make_filteradd(&tx1_out_data));
assert!(filter.filter_transaction(&tx1.hash(), &tx1));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2));
assert!(filter.filter_transaction(&tx1.hash(), &tx1, 1000));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000));
}
#[test]
@ -530,13 +557,13 @@ pub mod tests {
let mut filter = ConnectionFilter::with_filterload(&default_filterload());
assert!(!filter.filter_transaction(&tx1.hash(), &tx1));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2));
assert!(!filter.filter_transaction(&tx1.hash(), &tx1, 1000));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000));
filter.add(&make_filteradd(&tx1_previous_output));
assert!(filter.filter_transaction(&tx1.hash(), &tx1));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2));
assert!(filter.filter_transaction(&tx1.hash(), &tx1, 1000));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000));
}
#[test]
@ -549,13 +576,39 @@ pub mod tests {
let mut filter = ConnectionFilter::with_filterload(&default_filterload());
assert!(!filter.filter_transaction(&tx1.hash(), &tx1));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2));
assert!(!filter.filter_transaction(&tx1.hash(), &tx1, 1000));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000));
filter.add(&make_filteradd(&tx1_input_data));
assert!(filter.filter_transaction(&tx1.hash(), &tx1));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2));
assert!(filter.filter_transaction(&tx1.hash(), &tx1, 1000));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000));
}
#[test]
fn connection_filter_matches_transaction_by_fee_rate() {
let tx1: Transaction = test_data::TransactionBuilder::with_version(1).into();
let tx2: Transaction = test_data::TransactionBuilder::with_version(2).into();
let mut filter = ConnectionFilter::default();
assert!(filter.filter_transaction(&tx1.hash(), &tx1, 1000));
assert!(filter.filter_transaction(&tx2.hash(), &tx2, 2000));
filter.set_fee_rate(1500);
assert!(!filter.filter_transaction(&tx1.hash(), &tx1, 1000));
assert!(filter.filter_transaction(&tx2.hash(), &tx2, 2000));
filter.set_fee_rate(3000);
assert!(!filter.filter_transaction(&tx1.hash(), &tx1, 1000));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 2000));
filter.set_fee_rate(0);
assert!(filter.filter_transaction(&tx1.hash(), &tx1, 1000));
assert!(filter.filter_transaction(&tx2.hash(), &tx2, 2000));
}
#[test]

View File

@ -186,8 +186,9 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
self.client.lock().on_peer_sendheaders(peer_index);
}
pub fn on_peer_feefilter(&self, peer_index: usize, _message: types::FeeFilter) {
pub fn on_peer_feefilter(&self, peer_index: usize, message: types::FeeFilter) {
trace!(target: "sync", "Got `feefilter` message from peer#{}", peer_index);
self.client.lock().on_peer_feefilter(peer_index, &message);
}
pub fn on_peer_send_compact(&self, peer_index: usize, _message: types::SendCompact) {

View File

@ -6,6 +6,7 @@ use parking_lot::RwLock;
use chain::{Block, BlockHeader, Transaction};
use db;
use best_headers_chain::{BestHeadersChain, Information as BestHeadersInformation};
use primitives::bytes::Bytes;
use primitives::hash::H256;
use hash_queue::{HashQueueChain, HashPosition};
use miner::{MemoryPool, MemoryPoolOrderingStrategy, MemoryPoolInformation};
@ -661,6 +662,18 @@ impl Chain {
}
}
impl db::TransactionProvider for Chain {
fn transaction_bytes(&self, hash: &H256) -> Option<Bytes> {
self.memory_pool.transaction_bytes(hash)
.or_else(|| self.storage.transaction_bytes(hash))
}
fn transaction(&self, hash: &H256) -> Option<Transaction> {
self.memory_pool.transaction(hash)
.or_else(|| self.storage.transaction(hash))
}
}
impl fmt::Debug for Information {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "[sch:{} / bh:{} -> req:{} -> vfy:{} -> stored: {}]", self.scheduled, self.headers.best, self.requested, self.verifying, self.stored)

View File

@ -26,6 +26,7 @@ use synchronization_manager::{manage_synchronization_peers_blocks, manage_synchr
ManagePeersConfig, ManageUnknownBlocksConfig, ManageOrphanTransactionsConfig};
use synchronization_verifier::{Verifier, VerificationSink};
use hash_queue::HashPosition;
use miner::transaction_fee_rate;
use time;
use std::time::Duration;
@ -195,6 +196,7 @@ pub trait Client : Send + 'static {
fn on_peer_filteradd(&mut self, peer_index: usize, message: &types::FilterAdd);
fn on_peer_filterclear(&mut self, peer_index: usize);
fn on_peer_sendheaders(&mut self, peer_index: usize);
fn on_peer_feefilter(&mut self, peer_index: usize, message: &types::FeeFilter);
fn on_peer_disconnected(&mut self, peer_index: usize);
fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: BoxFuture<(), ()>);
}
@ -215,6 +217,7 @@ pub trait ClientCore : VerificationSink {
fn on_peer_filteradd(&mut self, peer_index: usize, message: &types::FilterAdd);
fn on_peer_filterclear(&mut self, peer_index: usize);
fn on_peer_sendheaders(&mut self, peer_index: usize);
fn on_peer_feefilter(&mut self, peer_index: usize, message: &types::FeeFilter);
fn on_peer_disconnected(&mut self, peer_index: usize);
fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: BoxFuture<(), ()>);
fn execute_synchronization_tasks(&mut self, forced_blocks_requests: Option<Vec<H256>>);
@ -401,6 +404,10 @@ impl<T, U> Client for SynchronizationClient<T, U> where T: TaskExecutor, U: Veri
self.core.lock().on_peer_sendheaders(peer_index);
}
fn on_peer_feefilter(&mut self, peer_index: usize, message: &types::FeeFilter) {
self.core.lock().on_peer_feefilter(peer_index, message);
}
fn on_peer_disconnected(&mut self, peer_index: usize) {
self.core.lock().on_peer_disconnected(peer_index);
}
@ -637,6 +644,13 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
}
}
/// Peer wants to limit transaction announcing by transaction fee
fn on_peer_feefilter(&mut self, peer_index: usize, message: &types::FeeFilter) {
if self.peers.is_known_peer(peer_index) {
self.peers.on_peer_feefilter(peer_index, message.fee_rate);
}
}
/// Peer disconnected.
fn on_peer_disconnected(&mut self, peer_index: usize) {
// when last peer is disconnected, reset, but let verifying blocks be verified
@ -823,7 +837,7 @@ impl<T> VerificationSink for SynchronizationClientCore<T> where T: TaskExecutor
fn on_transaction_verification_success(&mut self, transaction: Transaction) {
let hash = transaction.hash();
{
let transaction_fee_rate = {
// insert transaction to the memory pool
let mut chain = self.chain.write();
@ -835,10 +849,13 @@ impl<T> VerificationSink for SynchronizationClientCore<T> where T: TaskExecutor
// transaction was in verification queue => insert to memory pool
chain.insert_verified_transaction(transaction.clone());
}
// calculate transaction fee rate
transaction_fee_rate(&*chain, &transaction)
};
// relay transaction to peers
self.relay_new_transactions(vec![(hash, &transaction)]);
self.relay_new_transactions(vec![(hash, &transaction, transaction_fee_rate)]);
}
/// Process failed transaction verification
@ -971,12 +988,14 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
}
/// Relay new transactions
fn relay_new_transactions(&mut self, new_transactions: Vec<(H256, &Transaction)>) {
fn relay_new_transactions(&mut self, new_transactions: Vec<(H256, &Transaction, u64)>) {
let tasks: Vec<_> = self.peers.all_peers().into_iter()
.filter_map(|peer_index| {
let inventory: Vec<_> = new_transactions.iter()
.filter(|&&(ref h, tx)| self.peers.filter_mut(peer_index).filter_transaction(h, tx))
.map(|&(ref h, _)| InventoryVector {
.filter(|&&(ref h, tx, tx_fee_rate)| {
self.peers.filter_mut(peer_index).filter_transaction(h, tx, tx_fee_rate)
})
.map(|&(ref h, _, _)| InventoryVector {
inv_type: InventoryType::MessageTx,
hash: h.clone(),
})
@ -1283,6 +1302,7 @@ pub mod tests {
use tokio_core::reactor::{Core, Handle};
use chain::{Block, Transaction};
use message::common::{InventoryVector, InventoryType};
use message::types;
use super::{Client, Config, SynchronizationClient, SynchronizationClientCore};
use connection_filter::tests::*;
use synchronization_executor::Task;
@ -2142,4 +2162,54 @@ pub mod tests {
Task::SendInventory(3, inventory, ServerTaskIndex::None),
]);
}
#[test]
fn relay_new_transaction_with_feefilter() {
let (_, _, executor, chain, sync) = create_sync(None, None);
let b1 = test_data::block_builder().header().parent(test_data::genesis().hash()).build()
.transaction().output().value(1_000_000).build().build()
.build(); // genesis -> b1
let tx0 = b1.transactions[0].clone();
let tx1: Transaction = test_data::TransactionBuilder::with_output(800_000).add_input(&tx0, 0).into();
let tx1_hash = tx1.hash();
let mut sync = sync.lock();
sync.on_peer_connected(1);
sync.on_peer_connected(2);
sync.on_peer_connected(3);
sync.on_peer_connected(4);
sync.on_peer_block(1, b1);
{
use miner::transaction_fee_rate;
let chain = chain.read();
assert_eq!(transaction_fee_rate(&*chain, &tx1), 3333); // 200_000 / 60
}
sync.on_peer_feefilter(2, &types::FeeFilter { fee_rate: 3000, });
sync.on_peer_feefilter(3, &types::FeeFilter { fee_rate: 4000, });
// forget previous tasks
{ executor.lock().take_tasks(); }
sync.on_peer_transaction(1, tx1);
let tasks = executor.lock().take_tasks();
assert_eq!(tasks, vec![
Task::SendInventory(2, vec![
InventoryVector {
inv_type: InventoryType::MessageTx,
hash: tx1_hash.clone(),
}
], ServerTaskIndex::None),
Task::SendInventory(4, vec![
InventoryVector {
inv_type: InventoryType::MessageTx,
hash: tx1_hash.clone(),
}
], ServerTaskIndex::None),
]);
}
}

View File

@ -198,6 +198,11 @@ impl Peers {
self.send_headers.insert(peer_index);
}
/// Peer wants to limit transaction announcing by transaction fee
pub fn on_peer_feefilter(&mut self, peer_index: usize, fee_rate: u64) {
self.filter_mut(peer_index).set_fee_rate(fee_rate);
}
/// Peer has been disconnected
pub fn on_peer_disconnected(&mut self, peer_index: usize) -> Option<Vec<H256>> {
// forget this peer without any chances to reuse