Merge branch 'master' into indexed-blocks
This commit is contained in:
commit
6fee34ad66
|
@ -89,6 +89,7 @@ version = "0.1.0"
|
|||
dependencies = [
|
||||
"primitives 0.1.0",
|
||||
"rust-crypto 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"siphasher 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -657,6 +658,11 @@ dependencies = [
|
|||
"winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "siphasher"
|
||||
version = "0.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
|
||||
[[package]]
|
||||
name = "slab"
|
||||
version = "0.3.0"
|
||||
|
@ -677,6 +683,8 @@ name = "sync"
|
|||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"bit-vec 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"bitcrypto 0.1.0",
|
||||
"byteorder 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"chain 0.1.0",
|
||||
"db 0.1.0",
|
||||
"ethcore-devtools 1.3.0",
|
||||
|
@ -870,6 +878,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
"checksum scoped-tls 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f417c22df063e9450888a7561788e9bd46d3bb3c1466435b4eccb903807f147d"
|
||||
"checksum semver 0.1.20 (registry+https://github.com/rust-lang/crates.io-index)" = "d4f410fedcf71af0345d7607d246e7ad15faaadd49d240ee3b24e5dc21a820ac"
|
||||
"checksum shell32-sys 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "72f20b8f3c060374edb8046591ba28f62448c369ccbdc7b02075103fb3a9e38d"
|
||||
"checksum siphasher 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "b1c3c58c9ac43c530919fe6bd8ef11ae2612f64c2bf8eab9346f5b71ce0617f2"
|
||||
"checksum slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b4fcaed89ab08ef143da37bc52adbcc04d4a69014f4c1208d6b51f0c47bc23"
|
||||
"checksum smallvec 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "fcc8d19212aacecf95e4a7a2179b26f7aeb9732a915cf01f05b0d3e044865410"
|
||||
"checksum strsim 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "50c069df92e4b01425a8bf3576d5d417943a6a7272fbabaf5bd80b1aaa76442e"
|
||||
|
|
|
@ -5,4 +5,5 @@ authors = ["debris <marek.kotewicz@gmail.com>"]
|
|||
|
||||
[dependencies]
|
||||
rust-crypto = "0.2.36"
|
||||
siphasher = "0.1.1"
|
||||
primitives = { path = "../primitives" }
|
||||
|
|
|
@ -1,10 +1,13 @@
|
|||
extern crate crypto as rcrypto;
|
||||
extern crate primitives;
|
||||
extern crate siphasher;
|
||||
|
||||
use std::hash::Hasher;
|
||||
use rcrypto::sha1::Sha1;
|
||||
use rcrypto::sha2::Sha256;
|
||||
use rcrypto::ripemd160::Ripemd160;
|
||||
use rcrypto::digest::Digest;
|
||||
use siphasher::sip::SipHasher24;
|
||||
use primitives::hash::{H32, H160, H256};
|
||||
|
||||
pub struct DHash160 {
|
||||
|
@ -146,6 +149,14 @@ pub fn dhash256(input: &[u8]) -> H256 {
|
|||
result
|
||||
}
|
||||
|
||||
/// SipHash-2-4
|
||||
#[inline]
|
||||
pub fn siphash24(key0: u64, key1: u64, input: &[u8]) -> u64 {
|
||||
let mut hasher = SipHasher24::new_with_keys(key0, key1);
|
||||
hasher.write(input);
|
||||
hasher.finish()
|
||||
}
|
||||
|
||||
/// Data checksum
|
||||
#[inline]
|
||||
pub fn checksum(data: &[u8]) -> H32 {
|
||||
|
@ -157,7 +168,7 @@ pub fn checksum(data: &[u8]) -> H32 {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use primitives::bytes::Bytes;
|
||||
use super::{ripemd160, sha1, sha256, dhash160, dhash256, checksum};
|
||||
use super::{ripemd160, sha1, sha256, dhash160, dhash256, siphash24, checksum};
|
||||
|
||||
#[test]
|
||||
fn test_ripemd160() {
|
||||
|
@ -199,6 +210,13 @@ mod tests {
|
|||
assert_eq!(result, expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_siphash24() {
|
||||
let expected = 0x74f839c593dc67fd_u64;
|
||||
let result = siphash24(0x0706050403020100_u64, 0x0F0E0D0C0B0A0908_u64, &[0; 1]);
|
||||
assert_eq!(result, expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_checksum() {
|
||||
assert_eq!(checksum(b"hello"), "9595c9df".into());
|
||||
|
|
|
@ -15,8 +15,10 @@ ethcore-devtools = { path = "../devtools" }
|
|||
bit-vec = "0.4.3"
|
||||
murmur3 = "0.3"
|
||||
rand = "0.3"
|
||||
byteorder = "0.5"
|
||||
|
||||
chain = { path = "../chain" }
|
||||
bitcrypto = { path = "../crypto" }
|
||||
db = { path = "../db" }
|
||||
message = { path = "../message" }
|
||||
miner = { path = "../miner" }
|
||||
|
|
|
@ -0,0 +1,76 @@
|
|||
use std::collections::HashSet;
|
||||
use rand::{thread_rng, Rng};
|
||||
use bitcrypto::{sha256, siphash24};
|
||||
use byteorder::{LittleEndian, ByteOrder};
|
||||
use chain::{BlockHeader, ShortTransactionID};
|
||||
use db::IndexedBlock;
|
||||
use message::common::{BlockHeaderAndIDs, PrefilledTransaction};
|
||||
use primitives::hash::H256;
|
||||
use ser::{Stream, Serializable};
|
||||
|
||||
/// Maximum size of prefilled transactions in compact block
|
||||
const MAX_COMPACT_BLOCK_PREFILLED_SIZE: usize = 10 * 1024;
|
||||
|
||||
pub fn build_compact_block(block: IndexedBlock, prefilled_transactions_indexes: HashSet<usize>) -> BlockHeaderAndIDs {
|
||||
let nonce: u64 = thread_rng().gen();
|
||||
|
||||
let prefilled_transactions_len = prefilled_transactions_indexes.len();
|
||||
let mut short_ids: Vec<ShortTransactionID> = Vec::with_capacity(block.transaction_count() - prefilled_transactions_len);
|
||||
let mut prefilled_transactions: Vec<PrefilledTransaction> = Vec::with_capacity(prefilled_transactions_len);
|
||||
let mut prefilled_transactions_size: usize = 0;
|
||||
|
||||
for (transaction_index, (transaction_hash, transaction)) in block.transactions().enumerate() {
|
||||
let transaction_size = transaction.serialized_size();
|
||||
if prefilled_transactions_size + transaction_size < MAX_COMPACT_BLOCK_PREFILLED_SIZE
|
||||
&& prefilled_transactions_indexes.contains(&transaction_index) {
|
||||
prefilled_transactions_size += transaction_size;
|
||||
prefilled_transactions.push(PrefilledTransaction {
|
||||
index: transaction_index,
|
||||
transaction: transaction.clone(),
|
||||
})
|
||||
} else {
|
||||
short_ids.push(short_transaction_id(nonce, block.header(), transaction_hash));
|
||||
}
|
||||
}
|
||||
|
||||
BlockHeaderAndIDs {
|
||||
header: block.header().clone(),
|
||||
nonce: nonce,
|
||||
short_ids: short_ids,
|
||||
prefilled_transactions: prefilled_transactions,
|
||||
}
|
||||
}
|
||||
|
||||
fn short_transaction_id(nonce: u64, block_header: &BlockHeader, transaction_hash: &H256) -> ShortTransactionID {
|
||||
// Short transaction IDs are used to represent a transaction without sending a full 256-bit hash. They are calculated by:
|
||||
// 1) single-SHA256 hashing the block header with the nonce appended (in little-endian)
|
||||
let mut stream = Stream::new();
|
||||
stream.append(block_header);
|
||||
stream.append(&nonce);
|
||||
let block_header_with_nonce_hash = sha256(&stream.out());
|
||||
|
||||
// 2) Running SipHash-2-4 with the input being the transaction ID and the keys (k0/k1) set to the first two little-endian
|
||||
// 64-bit integers from the above hash, respectively.
|
||||
let key0 = LittleEndian::read_u64(&block_header_with_nonce_hash[0..8]);
|
||||
let key1 = LittleEndian::read_u64(&block_header_with_nonce_hash[8..16]);
|
||||
let siphash_transaction_hash = siphash24(key0, key1, &**transaction_hash);
|
||||
|
||||
// 3) Dropping the 2 most significant bytes from the SipHash output to make it 6 bytes.
|
||||
let mut siphash_transaction_hash_bytes = [0u8; 8];
|
||||
LittleEndian::write_u64(&mut siphash_transaction_hash_bytes, siphash_transaction_hash);
|
||||
|
||||
siphash_transaction_hash_bytes[2..8].into()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
#[test]
|
||||
fn short_transaction_id_is_correct() {
|
||||
// TODO
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn compact_block_is_built_correctly() {
|
||||
// TODO
|
||||
}
|
||||
}
|
|
@ -123,7 +123,7 @@ impl ConnectionFilter {
|
|||
}
|
||||
|
||||
/// Check if transaction should be sent to this connection && optionally update filter
|
||||
pub fn filter_transaction(&mut self, transaction_hash: &H256, transaction: &Transaction, transaction_fee_rate: u64) -> bool {
|
||||
pub fn filter_transaction(&mut self, transaction_hash: &H256, transaction: &Transaction, transaction_fee_rate: Option<u64>) -> bool {
|
||||
// check if transaction is known
|
||||
if self.last_transactions.contains_key(transaction_hash) {
|
||||
return false;
|
||||
|
@ -131,10 +131,12 @@ impl ConnectionFilter {
|
|||
|
||||
// check if transaction fee rate is high enough for this peer
|
||||
if let Some(fee_rate) = self.fee_rate {
|
||||
if let Some(transaction_fee_rate) = transaction_fee_rate {
|
||||
if transaction_fee_rate < fee_rate {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// check with bloom filter, if set
|
||||
self.filter_transaction_with_bloom(transaction_hash, transaction)
|
||||
|
@ -520,13 +522,13 @@ pub mod tests {
|
|||
|
||||
let mut filter = ConnectionFilter::with_filterload(&default_filterload());
|
||||
|
||||
assert!(!filter.filter_transaction(&tx1.hash(), &tx1, 1000));
|
||||
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000));
|
||||
assert!(!filter.filter_transaction(&tx1.hash(), &tx1, None));
|
||||
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, None));
|
||||
|
||||
filter.add(&make_filteradd(&*tx1.hash()));
|
||||
|
||||
assert!(filter.filter_transaction(&tx1.hash(), &tx1, 1000));
|
||||
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000));
|
||||
assert!(filter.filter_transaction(&tx1.hash(), &tx1, None));
|
||||
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, None));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -539,13 +541,13 @@ pub mod tests {
|
|||
|
||||
let mut filter = ConnectionFilter::with_filterload(&default_filterload());
|
||||
|
||||
assert!(!filter.filter_transaction(&tx1.hash(), &tx1, 1000));
|
||||
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000));
|
||||
assert!(!filter.filter_transaction(&tx1.hash(), &tx1, None));
|
||||
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, None));
|
||||
|
||||
filter.add(&make_filteradd(&tx1_out_data));
|
||||
|
||||
assert!(filter.filter_transaction(&tx1.hash(), &tx1, 1000));
|
||||
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000));
|
||||
assert!(filter.filter_transaction(&tx1.hash(), &tx1, None));
|
||||
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, None));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -557,13 +559,13 @@ pub mod tests {
|
|||
|
||||
let mut filter = ConnectionFilter::with_filterload(&default_filterload());
|
||||
|
||||
assert!(!filter.filter_transaction(&tx1.hash(), &tx1, 1000));
|
||||
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000));
|
||||
assert!(!filter.filter_transaction(&tx1.hash(), &tx1, None));
|
||||
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, None));
|
||||
|
||||
filter.add(&make_filteradd(&tx1_previous_output));
|
||||
|
||||
assert!(filter.filter_transaction(&tx1.hash(), &tx1, 1000));
|
||||
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000));
|
||||
assert!(filter.filter_transaction(&tx1.hash(), &tx1, None));
|
||||
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, None));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -576,13 +578,13 @@ pub mod tests {
|
|||
|
||||
let mut filter = ConnectionFilter::with_filterload(&default_filterload());
|
||||
|
||||
assert!(!filter.filter_transaction(&tx1.hash(), &tx1, 1000));
|
||||
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000));
|
||||
assert!(!filter.filter_transaction(&tx1.hash(), &tx1, None));
|
||||
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, None));
|
||||
|
||||
filter.add(&make_filteradd(&tx1_input_data));
|
||||
|
||||
assert!(filter.filter_transaction(&tx1.hash(), &tx1, 1000));
|
||||
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000));
|
||||
assert!(filter.filter_transaction(&tx1.hash(), &tx1, None));
|
||||
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, None));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -592,23 +594,23 @@ pub mod tests {
|
|||
|
||||
let mut filter = ConnectionFilter::default();
|
||||
|
||||
assert!(filter.filter_transaction(&tx1.hash(), &tx1, 1000));
|
||||
assert!(filter.filter_transaction(&tx2.hash(), &tx2, 2000));
|
||||
assert!(filter.filter_transaction(&tx1.hash(), &tx1, Some(1000)));
|
||||
assert!(filter.filter_transaction(&tx2.hash(), &tx2, Some(2000)));
|
||||
|
||||
filter.set_fee_rate(1500);
|
||||
|
||||
assert!(!filter.filter_transaction(&tx1.hash(), &tx1, 1000));
|
||||
assert!(filter.filter_transaction(&tx2.hash(), &tx2, 2000));
|
||||
assert!(!filter.filter_transaction(&tx1.hash(), &tx1, Some(1000)));
|
||||
assert!(filter.filter_transaction(&tx2.hash(), &tx2, Some(2000)));
|
||||
|
||||
filter.set_fee_rate(3000);
|
||||
|
||||
assert!(!filter.filter_transaction(&tx1.hash(), &tx1, 1000));
|
||||
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 2000));
|
||||
assert!(!filter.filter_transaction(&tx1.hash(), &tx1, Some(1000)));
|
||||
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, Some(2000)));
|
||||
|
||||
filter.set_fee_rate(0);
|
||||
|
||||
assert!(filter.filter_transaction(&tx1.hash(), &tx1, 1000));
|
||||
assert!(filter.filter_transaction(&tx2.hash(), &tx2, 2000));
|
||||
assert!(filter.filter_transaction(&tx1.hash(), &tx1, Some(1000)));
|
||||
assert!(filter.filter_transaction(&tx2.hash(), &tx2, Some(2000)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
extern crate bitcrypto;
|
||||
extern crate byteorder;
|
||||
extern crate chain;
|
||||
extern crate db;
|
||||
#[macro_use]
|
||||
|
@ -25,6 +27,7 @@ extern crate network;
|
|||
|
||||
mod best_headers_chain;
|
||||
mod blocks_writer;
|
||||
mod compact_block_builder;
|
||||
mod connection_filter;
|
||||
mod hash_queue;
|
||||
mod inbound_connection;
|
||||
|
|
|
@ -5,7 +5,7 @@ use db;
|
|||
use p2p::OutboundSyncConnectionRef;
|
||||
use message::common::{InventoryType, InventoryVector};
|
||||
use message::types;
|
||||
use synchronization_client::{Client, SynchronizationClient};
|
||||
use synchronization_client::{Client, SynchronizationClient, BlockAnnouncementType};
|
||||
use synchronization_executor::{Task as SynchronizationTask, TaskExecutor as SynchronizationTaskExecutor, LocalSynchronizationTaskExecutor};
|
||||
use synchronization_server::{Server, SynchronizationServer};
|
||||
use synchronization_verifier::AsyncVerifier;
|
||||
|
@ -183,7 +183,7 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
|
|||
|
||||
pub fn on_peer_sendheaders(&self, peer_index: usize, _message: types::SendHeaders) {
|
||||
trace!(target: "sync", "Got `sendheaders` message from peer#{}", peer_index);
|
||||
self.client.lock().on_peer_sendheaders(peer_index);
|
||||
self.client.lock().on_peer_block_announcement_type(peer_index, BlockAnnouncementType::SendHeader);
|
||||
}
|
||||
|
||||
pub fn on_peer_feefilter(&self, peer_index: usize, message: types::FeeFilter) {
|
||||
|
@ -191,8 +191,23 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
|
|||
self.client.lock().on_peer_feefilter(peer_index, &message);
|
||||
}
|
||||
|
||||
pub fn on_peer_send_compact(&self, peer_index: usize, _message: types::SendCompact) {
|
||||
pub fn on_peer_send_compact(&self, peer_index: usize, message: types::SendCompact) {
|
||||
trace!(target: "sync", "Got `sendcmpct` message from peer#{}", peer_index);
|
||||
|
||||
// The second integer SHALL be interpreted as a little-endian version number. Nodes sending a sendcmpct message MUST currently set this value to 1.
|
||||
// TODO: version 2 supports segregated witness transactions
|
||||
if message.second != 1 {
|
||||
return;
|
||||
}
|
||||
|
||||
// Upon receipt of a "sendcmpct" message with the first and second integers set to 1, the node SHOULD announce new blocks by sending a cmpctblock message.
|
||||
if message.first {
|
||||
self.client.lock().on_peer_block_announcement_type(peer_index, BlockAnnouncementType::SendCompactBlock);
|
||||
}
|
||||
// else:
|
||||
// Upon receipt of a "sendcmpct" message with the first integer set to 0, the node SHOULD NOT announce new blocks by sending a cmpctblock message,
|
||||
// but SHOULD announce new blocks by sending invs or headers, as defined by BIP130.
|
||||
// => work as before
|
||||
}
|
||||
|
||||
pub fn on_peer_compact_block(&self, peer_index: usize, _message: types::CompactBlock) {
|
||||
|
|
|
@ -25,6 +25,7 @@ use synchronization_manager::{manage_synchronization_peers_blocks, manage_synchr
|
|||
manage_unknown_orphaned_blocks, manage_orphaned_transactions, MANAGEMENT_INTERVAL_MS,
|
||||
ManagePeersConfig, ManageUnknownBlocksConfig, ManageOrphanTransactionsConfig};
|
||||
use synchronization_verifier::{Verifier, VerificationSink};
|
||||
use compact_block_builder::build_compact_block;
|
||||
use hash_queue::HashPosition;
|
||||
use miner::transaction_fee_rate;
|
||||
use time;
|
||||
|
@ -195,7 +196,7 @@ pub trait Client : Send + 'static {
|
|||
fn on_peer_filterload(&mut self, peer_index: usize, message: &types::FilterLoad);
|
||||
fn on_peer_filteradd(&mut self, peer_index: usize, message: &types::FilterAdd);
|
||||
fn on_peer_filterclear(&mut self, peer_index: usize);
|
||||
fn on_peer_sendheaders(&mut self, peer_index: usize);
|
||||
fn on_peer_block_announcement_type(&mut self, peer_index: usize, announcement_type: BlockAnnouncementType);
|
||||
fn on_peer_feefilter(&mut self, peer_index: usize, message: &types::FeeFilter);
|
||||
fn on_peer_disconnected(&mut self, peer_index: usize);
|
||||
fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: BoxFuture<(), ()>);
|
||||
|
@ -216,7 +217,7 @@ pub trait ClientCore : VerificationSink {
|
|||
fn on_peer_filterload(&mut self, peer_index: usize, message: &types::FilterLoad);
|
||||
fn on_peer_filteradd(&mut self, peer_index: usize, message: &types::FilterAdd);
|
||||
fn on_peer_filterclear(&mut self, peer_index: usize);
|
||||
fn on_peer_sendheaders(&mut self, peer_index: usize);
|
||||
fn on_peer_block_announcement_type(&mut self, peer_index: usize, announcement_type: BlockAnnouncementType);
|
||||
fn on_peer_feefilter(&mut self, peer_index: usize, message: &types::FeeFilter);
|
||||
fn on_peer_disconnected(&mut self, peer_index: usize);
|
||||
fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: BoxFuture<(), ()>);
|
||||
|
@ -243,6 +244,17 @@ pub struct FilteredInventory {
|
|||
pub notfound: Vec<InventoryVector>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
/// New block announcement type
|
||||
pub enum BlockAnnouncementType {
|
||||
/// Send inventory with block hash
|
||||
SendInventory,
|
||||
/// Send block header
|
||||
SendHeader,
|
||||
/// Send compact block
|
||||
SendCompactBlock,
|
||||
}
|
||||
|
||||
/// Synchronization client facade
|
||||
pub struct SynchronizationClient<T: TaskExecutor, U: Verifier> {
|
||||
/// Client core
|
||||
|
@ -400,8 +412,8 @@ impl<T, U> Client for SynchronizationClient<T, U> where T: TaskExecutor, U: Veri
|
|||
self.core.lock().on_peer_filterclear(peer_index);
|
||||
}
|
||||
|
||||
fn on_peer_sendheaders(&mut self, peer_index: usize) {
|
||||
self.core.lock().on_peer_sendheaders(peer_index);
|
||||
fn on_peer_block_announcement_type(&mut self, peer_index: usize, announcement_type: BlockAnnouncementType) {
|
||||
self.core.lock().on_peer_block_announcement_type(peer_index, announcement_type);
|
||||
}
|
||||
|
||||
fn on_peer_feefilter(&mut self, peer_index: usize, message: &types::FeeFilter) {
|
||||
|
@ -637,10 +649,10 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
|
|||
}
|
||||
}
|
||||
|
||||
/// Peer wants to get blocks headers instead of blocks hashes when announcing new blocks
|
||||
fn on_peer_sendheaders(&mut self, peer_index: usize) {
|
||||
/// Change the way peer is informed about new blocks
|
||||
fn on_peer_block_announcement_type(&mut self, peer_index: usize, announcement_type: BlockAnnouncementType) {
|
||||
if self.peers.is_known_peer(peer_index) {
|
||||
self.peers.on_peer_sendheaders(peer_index);
|
||||
self.peers.set_block_announcement_type(peer_index, announcement_type);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -947,9 +959,10 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
|
|||
let tasks: Vec<_> = {
|
||||
self.peers.all_peers().into_iter()
|
||||
.filter_map(|peer_index| {
|
||||
let send_headers = self.peers.send_headers(peer_index);
|
||||
let block_announcement_type = self.peers.block_announcement_type(peer_index);
|
||||
|
||||
if send_headers {
|
||||
match block_announcement_type {
|
||||
BlockAnnouncementType::SendHeader => {
|
||||
let filtered_blocks_hashes: Vec<_> = new_blocks_hashes.iter()
|
||||
.filter(|h| self.peers.filter(peer_index).filter_block(h))
|
||||
.collect();
|
||||
|
@ -963,7 +976,32 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
|
|||
else {
|
||||
None
|
||||
}
|
||||
},
|
||||
BlockAnnouncementType::SendCompactBlock => {
|
||||
let indexed_blocks: Vec<db::IndexedBlock> = {
|
||||
let chain = self.chain.read();
|
||||
new_blocks_hashes.iter()
|
||||
.filter_map(|h| chain.storage().block(db::BlockRef::Hash(h.clone())))
|
||||
.map(|b| b.into())
|
||||
.collect()
|
||||
};
|
||||
|
||||
let block_header_and_ids: Vec<_> = indexed_blocks.into_iter()
|
||||
.filter_map(|b| if self.peers.filter(peer_index).filter_block(&b.hash()) {
|
||||
let prefilled_transactions_indexes = b.transactions().enumerate()
|
||||
// we do not filter by fee rate here, because it only reasonable for non-mined transactions
|
||||
.filter(|&(_, (h, t))| self.peers.filter_mut(peer_index).filter_transaction(h, t, None))
|
||||
.map(|(idx, _)| idx)
|
||||
.collect();
|
||||
Some(build_compact_block(b, prefilled_transactions_indexes))
|
||||
} else {
|
||||
None
|
||||
})
|
||||
.collect();
|
||||
|
||||
Some(Task::SendCompactBlocks(peer_index, block_header_and_ids, ServerTaskIndex::None))
|
||||
},
|
||||
BlockAnnouncementType::SendInventory => {
|
||||
let inventory: Vec<_> = new_blocks_hashes.iter()
|
||||
.filter(|h| self.peers.filter(peer_index).filter_block(h))
|
||||
.map(|h| InventoryVector {
|
||||
|
@ -976,6 +1014,7 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
|
|||
} else {
|
||||
None
|
||||
}
|
||||
},
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
|
@ -993,7 +1032,7 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
|
|||
.filter_map(|peer_index| {
|
||||
let inventory: Vec<_> = new_transactions.iter()
|
||||
.filter(|&&(ref h, tx, tx_fee_rate)| {
|
||||
self.peers.filter_mut(peer_index).filter_transaction(h, tx, tx_fee_rate)
|
||||
self.peers.filter_mut(peer_index).filter_transaction(h, tx, Some(tx_fee_rate))
|
||||
})
|
||||
.map(|&(ref h, _, _)| InventoryVector {
|
||||
inv_type: InventoryType::MessageTx,
|
||||
|
@ -1305,7 +1344,7 @@ pub mod tests {
|
|||
use chain::{Block, Transaction};
|
||||
use message::common::{InventoryVector, InventoryType};
|
||||
use message::types;
|
||||
use super::{Client, Config, SynchronizationClient, SynchronizationClientCore};
|
||||
use super::{Client, Config, SynchronizationClient, SynchronizationClientCore, BlockAnnouncementType};
|
||||
use connection_filter::tests::*;
|
||||
use synchronization_executor::Task;
|
||||
use synchronization_chain::{Chain, ChainRef};
|
||||
|
@ -2148,7 +2187,7 @@ pub mod tests {
|
|||
let mut sync = sync.lock();
|
||||
sync.on_peer_connected(1);
|
||||
sync.on_peer_connected(2);
|
||||
sync.on_peer_sendheaders(2);
|
||||
sync.on_peer_block_announcement_type(2, BlockAnnouncementType::SendHeader);
|
||||
sync.on_peer_connected(3);
|
||||
|
||||
// igonore tasks
|
||||
|
@ -2225,4 +2264,32 @@ pub mod tests {
|
|||
// should not panic here
|
||||
sync.on_peer_block(2, test_data::block_h2());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn relay_new_block_after_sendcmpct() {
|
||||
let (_, _, executor, _, sync) = create_sync(None, None);
|
||||
let genesis = test_data::genesis();
|
||||
let b0 = test_data::block_builder().header().parent(genesis.hash()).build().build();
|
||||
|
||||
let mut sync = sync.lock();
|
||||
sync.on_peer_connected(1);
|
||||
sync.on_peer_connected(2);
|
||||
sync.on_peer_block_announcement_type(2, BlockAnnouncementType::SendCompactBlock);
|
||||
sync.on_peer_connected(3);
|
||||
|
||||
// igonore tasks
|
||||
{ executor.lock().take_tasks(); }
|
||||
|
||||
sync.on_peer_block(1, b0.clone());
|
||||
|
||||
let tasks = executor.lock().take_tasks();
|
||||
let inventory = vec![InventoryVector { inv_type: InventoryType::MessageBlock, hash: b0.hash() }];
|
||||
assert_eq!(tasks.len(), 3);
|
||||
assert_eq!(tasks[0], Task::RequestBlocksHeaders(1));
|
||||
match tasks[1] {
|
||||
Task::SendCompactBlocks(2, _, _) => (),
|
||||
_ => panic!("unexpected task"),
|
||||
}
|
||||
assert_eq!(tasks[2], Task::SendInventory(3, inventory, ServerTaskIndex::None));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,7 +2,7 @@ use std::sync::Arc;
|
|||
use std::collections::HashMap;
|
||||
use parking_lot::Mutex;
|
||||
use chain::{Block, BlockHeader, Transaction};
|
||||
use message::common::{InventoryVector, InventoryType};
|
||||
use message::common::{InventoryVector, InventoryType, BlockHeaderAndIDs};
|
||||
use message::types;
|
||||
use primitives::hash::H256;
|
||||
use p2p::OutboundSyncConnectionRef;
|
||||
|
@ -17,6 +17,7 @@ pub trait TaskExecutor : Send + 'static {
|
|||
fn execute(&mut self, task: Task);
|
||||
}
|
||||
|
||||
// TODO: get rid of unneeded ServerTaskIndex-es
|
||||
/// Synchronization task for the peer.
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum Task {
|
||||
|
@ -40,6 +41,8 @@ pub enum Task {
|
|||
SendInventory(usize, Vec<InventoryVector>, ServerTaskIndex),
|
||||
/// Send headers
|
||||
SendHeaders(usize, Vec<BlockHeader>, ServerTaskIndex),
|
||||
/// Send compact blocks
|
||||
SendCompactBlocks(usize, Vec<BlockHeaderAndIDs>, ServerTaskIndex),
|
||||
/// Notify io about ignored request
|
||||
Ignore(usize, u32),
|
||||
}
|
||||
|
@ -187,6 +190,17 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
|
|||
}
|
||||
}
|
||||
},
|
||||
Task::SendCompactBlocks(peer_index, compact_blocks, id) => {
|
||||
if let Some(connection) = self.peers.get_mut(&peer_index) {
|
||||
assert_eq!(id.raw(), None);
|
||||
for compact_block in compact_blocks {
|
||||
trace!(target: "sync", "Sending compact_block {:?} to peer#{}", compact_block.header.hash(), peer_index);
|
||||
connection.send_compact_block(&types::CompactBlock {
|
||||
header: compact_block,
|
||||
});
|
||||
}
|
||||
}
|
||||
},
|
||||
Task::Ignore(peer_index, id) => {
|
||||
if let Some(connection) = self.peers.get_mut(&peer_index) {
|
||||
trace!(target: "sync", "Ignoring request from peer#{} with id {}", peer_index, id);
|
||||
|
|
|
@ -4,6 +4,7 @@ use primitives::hash::H256;
|
|||
use linked_hash_map::LinkedHashMap;
|
||||
use time::precise_time_s;
|
||||
use connection_filter::ConnectionFilter;
|
||||
use synchronization_client::BlockAnnouncementType;
|
||||
|
||||
/// Max peer failures # before excluding from sync process
|
||||
const MAX_PEER_FAILURES: usize = 2;
|
||||
|
@ -27,8 +28,8 @@ pub struct Peers {
|
|||
inventory_requests_order: LinkedHashMap<usize, f64>,
|
||||
/// Peer connections filters.
|
||||
filters: HashMap<usize, ConnectionFilter>,
|
||||
/// Flags, informing that peer wants `headers` message instead of `inventory` when announcing new blocks
|
||||
send_headers: HashSet<usize>,
|
||||
/// The way peer is informed about new blocks
|
||||
block_announcement_types: HashMap<usize, BlockAnnouncementType>,
|
||||
}
|
||||
|
||||
/// Information on synchronization peers
|
||||
|
@ -54,7 +55,7 @@ impl Peers {
|
|||
inventory_requests: HashSet::new(),
|
||||
inventory_requests_order: LinkedHashMap::new(),
|
||||
filters: HashMap::new(),
|
||||
send_headers: HashSet::new(),
|
||||
block_announcement_types: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -160,10 +161,10 @@ impl Peers {
|
|||
self.filters.entry(peer_index).or_insert_with(ConnectionFilter::default)
|
||||
}
|
||||
|
||||
/// Does peer wants `headers` message instead of `inventory` when announcing new blocks
|
||||
pub fn send_headers(&self, peer_index: usize) -> bool {
|
||||
assert!(self.is_known_peer(peer_index));
|
||||
self.send_headers.contains(&peer_index)
|
||||
/// Get the way peer is informed about new blocks
|
||||
pub fn block_announcement_type(&self, peer_index: usize) -> BlockAnnouncementType {
|
||||
self.block_announcement_types.get(&peer_index).cloned()
|
||||
.unwrap_or(BlockAnnouncementType::SendInventory)
|
||||
}
|
||||
|
||||
/// Mark peer as useful.
|
||||
|
@ -193,9 +194,9 @@ impl Peers {
|
|||
self.inventory_requests_order.remove(&peer_index);
|
||||
}
|
||||
|
||||
/// Peer wants `headers` message instead of `inventory` when announcing new blocks
|
||||
pub fn on_peer_sendheaders(&mut self, peer_index: usize) {
|
||||
self.send_headers.insert(peer_index);
|
||||
/// Change the way peer is informed about new blocks
|
||||
pub fn set_block_announcement_type(&mut self, peer_index: usize, announcement_type: BlockAnnouncementType) {
|
||||
self.block_announcement_types.insert(peer_index, announcement_type);
|
||||
}
|
||||
|
||||
/// Peer wants to limit transaction announcing by transaction fee
|
||||
|
@ -214,7 +215,7 @@ impl Peers {
|
|||
self.inventory_requests.remove(&peer_index);
|
||||
self.inventory_requests_order.remove(&peer_index);
|
||||
self.filters.remove(&peer_index);
|
||||
self.send_headers.remove(&peer_index);
|
||||
self.block_announcement_types.remove(&peer_index);
|
||||
peer_blocks_requests
|
||||
.map(|hs| hs.into_iter().collect())
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue