Merge pull request #202 from ethcore/sync_cmpctblk

Process cmpctblk message
This commit is contained in:
Nikolay Volf 2016-11-27 22:09:50 +03:00 committed by GitHub
commit e5afbf3877
12 changed files with 296 additions and 84 deletions

9
Cargo.lock generated
View File

@ -89,6 +89,7 @@ version = "0.1.0"
dependencies = [
"primitives 0.1.0",
"rust-crypto 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)",
"siphasher 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -657,6 +658,11 @@ dependencies = [
"winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "siphasher"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "slab"
version = "0.3.0"
@ -677,6 +683,8 @@ name = "sync"
version = "0.1.0"
dependencies = [
"bit-vec 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
"bitcrypto 0.1.0",
"byteorder 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
"chain 0.1.0",
"db 0.1.0",
"ethcore-devtools 1.3.0",
@ -870,6 +878,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum scoped-tls 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f417c22df063e9450888a7561788e9bd46d3bb3c1466435b4eccb903807f147d"
"checksum semver 0.1.20 (registry+https://github.com/rust-lang/crates.io-index)" = "d4f410fedcf71af0345d7607d246e7ad15faaadd49d240ee3b24e5dc21a820ac"
"checksum shell32-sys 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "72f20b8f3c060374edb8046591ba28f62448c369ccbdc7b02075103fb3a9e38d"
"checksum siphasher 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "b1c3c58c9ac43c530919fe6bd8ef11ae2612f64c2bf8eab9346f5b71ce0617f2"
"checksum slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b4fcaed89ab08ef143da37bc52adbcc04d4a69014f4c1208d6b51f0c47bc23"
"checksum smallvec 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "fcc8d19212aacecf95e4a7a2179b26f7aeb9732a915cf01f05b0d3e044865410"
"checksum strsim 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "50c069df92e4b01425a8bf3576d5d417943a6a7272fbabaf5bd80b1aaa76442e"

View File

@ -5,4 +5,5 @@ authors = ["debris <marek.kotewicz@gmail.com>"]
[dependencies]
rust-crypto = "0.2.36"
siphasher = "0.1.1"
primitives = { path = "../primitives" }

View File

@ -1,10 +1,13 @@
extern crate crypto as rcrypto;
extern crate primitives;
extern crate siphasher;
use std::hash::Hasher;
use rcrypto::sha1::Sha1;
use rcrypto::sha2::Sha256;
use rcrypto::ripemd160::Ripemd160;
use rcrypto::digest::Digest;
use siphasher::sip::SipHasher24;
use primitives::hash::{H32, H160, H256};
pub struct DHash160 {
@ -146,6 +149,14 @@ pub fn dhash256(input: &[u8]) -> H256 {
result
}
/// SipHash-2-4
#[inline]
pub fn siphash24(key0: u64, key1: u64, input: &[u8]) -> u64 {
let mut hasher = SipHasher24::new_with_keys(key0, key1);
hasher.write(input);
hasher.finish()
}
/// Data checksum
#[inline]
pub fn checksum(data: &[u8]) -> H32 {
@ -157,7 +168,7 @@ pub fn checksum(data: &[u8]) -> H32 {
#[cfg(test)]
mod tests {
use primitives::bytes::Bytes;
use super::{ripemd160, sha1, sha256, dhash160, dhash256, checksum};
use super::{ripemd160, sha1, sha256, dhash160, dhash256, siphash24, checksum};
#[test]
fn test_ripemd160() {
@ -192,12 +203,19 @@ mod tests {
assert_eq!(result, expected);
}
#[test]
fn test_dhash256() {
#[test]
fn test_dhash256() {
let expected = "9595c9df90075148eb06860365df33584b75bff782a510c6cd4883a419833d50".into();
let result = dhash256(b"hello");
assert_eq!(result, expected);
}
}
#[test]
fn test_siphash24() {
let expected = 0x74f839c593dc67fd_u64;
let result = siphash24(0x0706050403020100_u64, 0x0F0E0D0C0B0A0908_u64, &[0; 1]);
assert_eq!(result, expected);
}
#[test]
fn test_checksum() {

View File

@ -28,6 +28,10 @@ impl From<chain::Block> for IndexedBlock {
}
impl IndexedBlock {
pub fn transactions_len(&self) -> usize {
self.transactions.len()
}
pub fn transactions(&self) -> IndexedTransactions {
IndexedTransactions {
position: 0,

View File

@ -15,8 +15,10 @@ ethcore-devtools = { path = "../devtools" }
bit-vec = "0.4.3"
murmur3 = "0.3"
rand = "0.3"
byteorder = "0.5"
chain = { path = "../chain" }
bitcrypto = { path = "../crypto" }
db = { path = "../db" }
message = { path = "../message" }
miner = { path = "../miner" }

View File

@ -0,0 +1,76 @@
use std::collections::HashSet;
use rand::{thread_rng, Rng};
use bitcrypto::{sha256, siphash24};
use byteorder::{LittleEndian, ByteOrder};
use chain::{BlockHeader, ShortTransactionID};
use db::IndexedBlock;
use message::common::{BlockHeaderAndIDs, PrefilledTransaction};
use primitives::hash::H256;
use ser::{Stream, Serializable};
/// Maximum size of prefilled transactions in compact block
const MAX_COMPACT_BLOCK_PREFILLED_SIZE: usize = 10 * 1024;
pub fn build_compact_block(block: IndexedBlock, prefilled_transactions_indexes: HashSet<usize>) -> BlockHeaderAndIDs {
let nonce: u64 = thread_rng().gen();
let prefilled_transactions_len = prefilled_transactions_indexes.len();
let mut short_ids: Vec<ShortTransactionID> = Vec::with_capacity(block.transactions_len() - prefilled_transactions_len);
let mut prefilled_transactions: Vec<PrefilledTransaction> = Vec::with_capacity(prefilled_transactions_len);
let mut prefilled_transactions_size: usize = 0;
for (transaction_index, (transaction_hash, transaction)) in block.transactions().enumerate() {
let transaction_size = transaction.serialized_size();
if prefilled_transactions_size + transaction_size < MAX_COMPACT_BLOCK_PREFILLED_SIZE
&& prefilled_transactions_indexes.contains(&transaction_index) {
prefilled_transactions_size += transaction_size;
prefilled_transactions.push(PrefilledTransaction {
index: transaction_index,
transaction: transaction.clone(),
})
} else {
short_ids.push(short_transaction_id(nonce, block.header(), transaction_hash));
}
}
BlockHeaderAndIDs {
header: block.header().clone(),
nonce: nonce,
short_ids: short_ids,
prefilled_transactions: prefilled_transactions,
}
}
fn short_transaction_id(nonce: u64, block_header: &BlockHeader, transaction_hash: &H256) -> ShortTransactionID {
// Short transaction IDs are used to represent a transaction without sending a full 256-bit hash. They are calculated by:
// 1) single-SHA256 hashing the block header with the nonce appended (in little-endian)
let mut stream = Stream::new();
stream.append(block_header);
stream.append(&nonce);
let block_header_with_nonce_hash = sha256(&stream.out());
// 2) Running SipHash-2-4 with the input being the transaction ID and the keys (k0/k1) set to the first two little-endian
// 64-bit integers from the above hash, respectively.
let key0 = LittleEndian::read_u64(&block_header_with_nonce_hash[0..8]);
let key1 = LittleEndian::read_u64(&block_header_with_nonce_hash[8..16]);
let siphash_transaction_hash = siphash24(key0, key1, &**transaction_hash);
// 3) Dropping the 2 most significant bytes from the SipHash output to make it 6 bytes.
let mut siphash_transaction_hash_bytes = [0u8; 8];
LittleEndian::write_u64(&mut siphash_transaction_hash_bytes, siphash_transaction_hash);
siphash_transaction_hash_bytes[2..8].into()
}
#[cfg(test)]
mod tests {
#[test]
fn short_transaction_id_is_correct() {
// TODO
}
#[test]
fn compact_block_is_built_correctly() {
// TODO
}
}

View File

@ -123,7 +123,7 @@ impl ConnectionFilter {
}
/// Check if transaction should be sent to this connection && optionally update filter
pub fn filter_transaction(&mut self, transaction_hash: &H256, transaction: &Transaction, transaction_fee_rate: u64) -> bool {
pub fn filter_transaction(&mut self, transaction_hash: &H256, transaction: &Transaction, transaction_fee_rate: Option<u64>) -> bool {
// check if transaction is known
if self.last_transactions.contains_key(transaction_hash) {
return false;
@ -131,8 +131,10 @@ impl ConnectionFilter {
// check if transaction fee rate is high enough for this peer
if let Some(fee_rate) = self.fee_rate {
if transaction_fee_rate < fee_rate {
return false;
if let Some(transaction_fee_rate) = transaction_fee_rate {
if transaction_fee_rate < fee_rate {
return false;
}
}
}
@ -520,13 +522,13 @@ pub mod tests {
let mut filter = ConnectionFilter::with_filterload(&default_filterload());
assert!(!filter.filter_transaction(&tx1.hash(), &tx1, 1000));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000));
assert!(!filter.filter_transaction(&tx1.hash(), &tx1, None));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, None));
filter.add(&make_filteradd(&*tx1.hash()));
assert!(filter.filter_transaction(&tx1.hash(), &tx1, 1000));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000));
assert!(filter.filter_transaction(&tx1.hash(), &tx1, None));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, None));
}
#[test]
@ -539,13 +541,13 @@ pub mod tests {
let mut filter = ConnectionFilter::with_filterload(&default_filterload());
assert!(!filter.filter_transaction(&tx1.hash(), &tx1, 1000));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000));
assert!(!filter.filter_transaction(&tx1.hash(), &tx1, None));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, None));
filter.add(&make_filteradd(&tx1_out_data));
assert!(filter.filter_transaction(&tx1.hash(), &tx1, 1000));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000));
assert!(filter.filter_transaction(&tx1.hash(), &tx1, None));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, None));
}
#[test]
@ -557,13 +559,13 @@ pub mod tests {
let mut filter = ConnectionFilter::with_filterload(&default_filterload());
assert!(!filter.filter_transaction(&tx1.hash(), &tx1, 1000));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000));
assert!(!filter.filter_transaction(&tx1.hash(), &tx1, None));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, None));
filter.add(&make_filteradd(&tx1_previous_output));
assert!(filter.filter_transaction(&tx1.hash(), &tx1, 1000));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000));
assert!(filter.filter_transaction(&tx1.hash(), &tx1, None));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, None));
}
#[test]
@ -576,13 +578,13 @@ pub mod tests {
let mut filter = ConnectionFilter::with_filterload(&default_filterload());
assert!(!filter.filter_transaction(&tx1.hash(), &tx1, 1000));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000));
assert!(!filter.filter_transaction(&tx1.hash(), &tx1, None));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, None));
filter.add(&make_filteradd(&tx1_input_data));
assert!(filter.filter_transaction(&tx1.hash(), &tx1, 1000));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000));
assert!(filter.filter_transaction(&tx1.hash(), &tx1, None));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, None));
}
#[test]
@ -592,23 +594,23 @@ pub mod tests {
let mut filter = ConnectionFilter::default();
assert!(filter.filter_transaction(&tx1.hash(), &tx1, 1000));
assert!(filter.filter_transaction(&tx2.hash(), &tx2, 2000));
assert!(filter.filter_transaction(&tx1.hash(), &tx1, Some(1000)));
assert!(filter.filter_transaction(&tx2.hash(), &tx2, Some(2000)));
filter.set_fee_rate(1500);
assert!(!filter.filter_transaction(&tx1.hash(), &tx1, 1000));
assert!(filter.filter_transaction(&tx2.hash(), &tx2, 2000));
assert!(!filter.filter_transaction(&tx1.hash(), &tx1, Some(1000)));
assert!(filter.filter_transaction(&tx2.hash(), &tx2, Some(2000)));
filter.set_fee_rate(3000);
assert!(!filter.filter_transaction(&tx1.hash(), &tx1, 1000));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 2000));
assert!(!filter.filter_transaction(&tx1.hash(), &tx1, Some(1000)));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2, Some(2000)));
filter.set_fee_rate(0);
assert!(filter.filter_transaction(&tx1.hash(), &tx1, 1000));
assert!(filter.filter_transaction(&tx2.hash(), &tx2, 2000));
assert!(filter.filter_transaction(&tx1.hash(), &tx1, Some(1000)));
assert!(filter.filter_transaction(&tx2.hash(), &tx2, Some(2000)));
}
#[test]

View File

@ -1,3 +1,5 @@
extern crate bitcrypto;
extern crate byteorder;
extern crate chain;
extern crate db;
#[macro_use]
@ -25,6 +27,7 @@ extern crate network;
mod best_headers_chain;
mod blocks_writer;
mod compact_block_builder;
mod connection_filter;
mod hash_queue;
mod inbound_connection;

View File

@ -5,7 +5,7 @@ use db;
use p2p::OutboundSyncConnectionRef;
use message::common::{InventoryType, InventoryVector};
use message::types;
use synchronization_client::{Client, SynchronizationClient};
use synchronization_client::{Client, SynchronizationClient, BlockAnnouncementType};
use synchronization_executor::{Task as SynchronizationTask, TaskExecutor as SynchronizationTaskExecutor, LocalSynchronizationTaskExecutor};
use synchronization_server::{Server, SynchronizationServer};
use synchronization_verifier::AsyncVerifier;
@ -183,7 +183,7 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
pub fn on_peer_sendheaders(&self, peer_index: usize, _message: types::SendHeaders) {
trace!(target: "sync", "Got `sendheaders` message from peer#{}", peer_index);
self.client.lock().on_peer_sendheaders(peer_index);
self.client.lock().on_peer_block_announcement_type(peer_index, BlockAnnouncementType::SendHeader);
}
pub fn on_peer_feefilter(&self, peer_index: usize, message: types::FeeFilter) {
@ -191,8 +191,23 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
self.client.lock().on_peer_feefilter(peer_index, &message);
}
pub fn on_peer_send_compact(&self, peer_index: usize, _message: types::SendCompact) {
pub fn on_peer_send_compact(&self, peer_index: usize, message: types::SendCompact) {
trace!(target: "sync", "Got `sendcmpct` message from peer#{}", peer_index);
// The second integer SHALL be interpreted as a little-endian version number. Nodes sending a sendcmpct message MUST currently set this value to 1.
// TODO: version 2 supports segregated witness transactions
if message.second != 1 {
return;
}
// Upon receipt of a "sendcmpct" message with the first and second integers set to 1, the node SHOULD announce new blocks by sending a cmpctblock message.
if message.first {
self.client.lock().on_peer_block_announcement_type(peer_index, BlockAnnouncementType::SendCompactBlock);
}
// else:
// Upon receipt of a "sendcmpct" message with the first integer set to 0, the node SHOULD NOT announce new blocks by sending a cmpctblock message,
// but SHOULD announce new blocks by sending invs or headers, as defined by BIP130.
// => work as before
}
pub fn on_peer_compact_block(&self, peer_index: usize, _message: types::CompactBlock) {

View File

@ -25,6 +25,7 @@ use synchronization_manager::{manage_synchronization_peers_blocks, manage_synchr
manage_unknown_orphaned_blocks, manage_orphaned_transactions, MANAGEMENT_INTERVAL_MS,
ManagePeersConfig, ManageUnknownBlocksConfig, ManageOrphanTransactionsConfig};
use synchronization_verifier::{Verifier, VerificationSink};
use compact_block_builder::build_compact_block;
use hash_queue::HashPosition;
use miner::transaction_fee_rate;
use time;
@ -195,7 +196,7 @@ pub trait Client : Send + 'static {
fn on_peer_filterload(&mut self, peer_index: usize, message: &types::FilterLoad);
fn on_peer_filteradd(&mut self, peer_index: usize, message: &types::FilterAdd);
fn on_peer_filterclear(&mut self, peer_index: usize);
fn on_peer_sendheaders(&mut self, peer_index: usize);
fn on_peer_block_announcement_type(&mut self, peer_index: usize, announcement_type: BlockAnnouncementType);
fn on_peer_feefilter(&mut self, peer_index: usize, message: &types::FeeFilter);
fn on_peer_disconnected(&mut self, peer_index: usize);
fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: BoxFuture<(), ()>);
@ -216,7 +217,7 @@ pub trait ClientCore : VerificationSink {
fn on_peer_filterload(&mut self, peer_index: usize, message: &types::FilterLoad);
fn on_peer_filteradd(&mut self, peer_index: usize, message: &types::FilterAdd);
fn on_peer_filterclear(&mut self, peer_index: usize);
fn on_peer_sendheaders(&mut self, peer_index: usize);
fn on_peer_block_announcement_type(&mut self, peer_index: usize, announcement_type: BlockAnnouncementType);
fn on_peer_feefilter(&mut self, peer_index: usize, message: &types::FeeFilter);
fn on_peer_disconnected(&mut self, peer_index: usize);
fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: BoxFuture<(), ()>);
@ -243,6 +244,17 @@ pub struct FilteredInventory {
pub notfound: Vec<InventoryVector>,
}
#[derive(Debug, Clone, Copy)]
/// New block announcement type
pub enum BlockAnnouncementType {
/// Send inventory with block hash
SendInventory,
/// Send block header
SendHeader,
/// Send compact block
SendCompactBlock,
}
/// Synchronization client facade
pub struct SynchronizationClient<T: TaskExecutor, U: Verifier> {
/// Client core
@ -400,8 +412,8 @@ impl<T, U> Client for SynchronizationClient<T, U> where T: TaskExecutor, U: Veri
self.core.lock().on_peer_filterclear(peer_index);
}
fn on_peer_sendheaders(&mut self, peer_index: usize) {
self.core.lock().on_peer_sendheaders(peer_index);
fn on_peer_block_announcement_type(&mut self, peer_index: usize, announcement_type: BlockAnnouncementType) {
self.core.lock().on_peer_block_announcement_type(peer_index, announcement_type);
}
fn on_peer_feefilter(&mut self, peer_index: usize, message: &types::FeeFilter) {
@ -637,10 +649,10 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
}
}
/// Peer wants to get blocks headers instead of blocks hashes when announcing new blocks
fn on_peer_sendheaders(&mut self, peer_index: usize) {
/// Change the way peer is informed about new blocks
fn on_peer_block_announcement_type(&mut self, peer_index: usize, announcement_type: BlockAnnouncementType) {
if self.peers.is_known_peer(peer_index) {
self.peers.on_peer_sendheaders(peer_index);
self.peers.set_block_announcement_type(peer_index, announcement_type);
}
}
@ -947,35 +959,62 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
let tasks: Vec<_> = {
self.peers.all_peers().into_iter()
.filter_map(|peer_index| {
let send_headers = self.peers.send_headers(peer_index);
let block_announcement_type = self.peers.block_announcement_type(peer_index);
if send_headers {
let filtered_blocks_hashes: Vec<_> = new_blocks_hashes.iter()
.filter(|h| self.peers.filter(peer_index).filter_block(h))
.collect();
let chain = self.chain.read();
let headers: Vec<_> = filtered_blocks_hashes.into_iter()
.filter_map(|h| chain.block_header_by_hash(&h))
.collect();
if !headers.is_empty() {
Some(Task::SendHeaders(peer_index, headers, ServerTaskIndex::None))
}
else {
None
}
} else {
let inventory: Vec<_> = new_blocks_hashes.iter()
.filter(|h| self.peers.filter(peer_index).filter_block(h))
.map(|h| InventoryVector {
inv_type: InventoryType::MessageBlock,
hash: h.clone(),
})
.collect();
if !inventory.is_empty() {
Some(Task::SendInventory(peer_index, inventory, ServerTaskIndex::None))
} else {
None
}
match block_announcement_type {
BlockAnnouncementType::SendHeader => {
let filtered_blocks_hashes: Vec<_> = new_blocks_hashes.iter()
.filter(|h| self.peers.filter(peer_index).filter_block(h))
.collect();
let chain = self.chain.read();
let headers: Vec<_> = filtered_blocks_hashes.into_iter()
.filter_map(|h| chain.block_header_by_hash(&h))
.collect();
if !headers.is_empty() {
Some(Task::SendHeaders(peer_index, headers, ServerTaskIndex::None))
}
else {
None
}
},
BlockAnnouncementType::SendCompactBlock => {
let indexed_blocks: Vec<db::IndexedBlock> = {
let chain = self.chain.read();
new_blocks_hashes.iter()
.filter_map(|h| chain.storage().block(db::BlockRef::Hash(h.clone())))
.map(|b| b.into())
.collect()
};
let block_header_and_ids: Vec<_> = indexed_blocks.into_iter()
.filter_map(|b| if self.peers.filter(peer_index).filter_block(&b.hash()) {
let prefilled_transactions_indexes = b.transactions().enumerate()
// we do not filter by fee rate here, because it only reasonable for non-mined transactions
.filter(|&(_, (h, t))| self.peers.filter_mut(peer_index).filter_transaction(h, t, None))
.map(|(idx, _)| idx)
.collect();
Some(build_compact_block(b, prefilled_transactions_indexes))
} else {
None
})
.collect();
Some(Task::SendCompactBlocks(peer_index, block_header_and_ids, ServerTaskIndex::None))
},
BlockAnnouncementType::SendInventory => {
let inventory: Vec<_> = new_blocks_hashes.iter()
.filter(|h| self.peers.filter(peer_index).filter_block(h))
.map(|h| InventoryVector {
inv_type: InventoryType::MessageBlock,
hash: h.clone(),
})
.collect();
if !inventory.is_empty() {
Some(Task::SendInventory(peer_index, inventory, ServerTaskIndex::None))
} else {
None
}
},
}
})
.collect()
@ -993,7 +1032,7 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
.filter_map(|peer_index| {
let inventory: Vec<_> = new_transactions.iter()
.filter(|&&(ref h, tx, tx_fee_rate)| {
self.peers.filter_mut(peer_index).filter_transaction(h, tx, tx_fee_rate)
self.peers.filter_mut(peer_index).filter_transaction(h, tx, Some(tx_fee_rate))
})
.map(|&(ref h, _, _)| InventoryVector {
inv_type: InventoryType::MessageTx,
@ -1305,7 +1344,7 @@ pub mod tests {
use chain::{Block, Transaction};
use message::common::{InventoryVector, InventoryType};
use message::types;
use super::{Client, Config, SynchronizationClient, SynchronizationClientCore};
use super::{Client, Config, SynchronizationClient, SynchronizationClientCore, BlockAnnouncementType};
use connection_filter::tests::*;
use synchronization_executor::Task;
use synchronization_chain::{Chain, ChainRef};
@ -2148,7 +2187,7 @@ pub mod tests {
let mut sync = sync.lock();
sync.on_peer_connected(1);
sync.on_peer_connected(2);
sync.on_peer_sendheaders(2);
sync.on_peer_block_announcement_type(2, BlockAnnouncementType::SendHeader);
sync.on_peer_connected(3);
// igonore tasks
@ -2225,4 +2264,32 @@ pub mod tests {
// should not panic here
sync.on_peer_block(2, test_data::block_h2());
}
#[test]
fn relay_new_block_after_sendcmpct() {
let (_, _, executor, _, sync) = create_sync(None, None);
let genesis = test_data::genesis();
let b0 = test_data::block_builder().header().parent(genesis.hash()).build().build();
let mut sync = sync.lock();
sync.on_peer_connected(1);
sync.on_peer_connected(2);
sync.on_peer_block_announcement_type(2, BlockAnnouncementType::SendCompactBlock);
sync.on_peer_connected(3);
// igonore tasks
{ executor.lock().take_tasks(); }
sync.on_peer_block(1, b0.clone());
let tasks = executor.lock().take_tasks();
let inventory = vec![InventoryVector { inv_type: InventoryType::MessageBlock, hash: b0.hash() }];
assert_eq!(tasks.len(), 3);
assert_eq!(tasks[0], Task::RequestBlocksHeaders(1));
match tasks[1] {
Task::SendCompactBlocks(2, _, _) => (),
_ => panic!("unexpected task"),
}
assert_eq!(tasks[2], Task::SendInventory(3, inventory, ServerTaskIndex::None));
}
}

View File

@ -2,7 +2,7 @@ use std::sync::Arc;
use std::collections::HashMap;
use parking_lot::Mutex;
use chain::{Block, BlockHeader, Transaction};
use message::common::{InventoryVector, InventoryType};
use message::common::{InventoryVector, InventoryType, BlockHeaderAndIDs};
use message::types;
use primitives::hash::H256;
use p2p::OutboundSyncConnectionRef;
@ -17,6 +17,7 @@ pub trait TaskExecutor : Send + 'static {
fn execute(&mut self, task: Task);
}
// TODO: get rid of unneeded ServerTaskIndex-es
/// Synchronization task for the peer.
#[derive(Debug, PartialEq)]
pub enum Task {
@ -40,6 +41,8 @@ pub enum Task {
SendInventory(usize, Vec<InventoryVector>, ServerTaskIndex),
/// Send headers
SendHeaders(usize, Vec<BlockHeader>, ServerTaskIndex),
/// Send compact blocks
SendCompactBlocks(usize, Vec<BlockHeaderAndIDs>, ServerTaskIndex),
/// Notify io about ignored request
Ignore(usize, u32),
}
@ -187,6 +190,17 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
}
}
},
Task::SendCompactBlocks(peer_index, compact_blocks, id) => {
if let Some(connection) = self.peers.get_mut(&peer_index) {
assert_eq!(id.raw(), None);
for compact_block in compact_blocks {
trace!(target: "sync", "Sending compact_block {:?} to peer#{}", compact_block.header.hash(), peer_index);
connection.send_compact_block(&types::CompactBlock {
header: compact_block,
});
}
}
},
Task::Ignore(peer_index, id) => {
if let Some(connection) = self.peers.get_mut(&peer_index) {
trace!(target: "sync", "Ignoring request from peer#{} with id {}", peer_index, id);

View File

@ -4,6 +4,7 @@ use primitives::hash::H256;
use linked_hash_map::LinkedHashMap;
use time::precise_time_s;
use connection_filter::ConnectionFilter;
use synchronization_client::BlockAnnouncementType;
/// Max peer failures # before excluding from sync process
const MAX_PEER_FAILURES: usize = 2;
@ -27,8 +28,8 @@ pub struct Peers {
inventory_requests_order: LinkedHashMap<usize, f64>,
/// Peer connections filters.
filters: HashMap<usize, ConnectionFilter>,
/// Flags, informing that peer wants `headers` message instead of `inventory` when announcing new blocks
send_headers: HashSet<usize>,
/// The way peer is informed about new blocks
block_announcement_types: HashMap<usize, BlockAnnouncementType>,
}
/// Information on synchronization peers
@ -54,7 +55,7 @@ impl Peers {
inventory_requests: HashSet::new(),
inventory_requests_order: LinkedHashMap::new(),
filters: HashMap::new(),
send_headers: HashSet::new(),
block_announcement_types: HashMap::new(),
}
}
@ -160,10 +161,10 @@ impl Peers {
self.filters.entry(peer_index).or_insert_with(ConnectionFilter::default)
}
/// Does peer wants `headers` message instead of `inventory` when announcing new blocks
pub fn send_headers(&self, peer_index: usize) -> bool {
assert!(self.is_known_peer(peer_index));
self.send_headers.contains(&peer_index)
/// Get the way peer is informed about new blocks
pub fn block_announcement_type(&self, peer_index: usize) -> BlockAnnouncementType {
self.block_announcement_types.get(&peer_index).cloned()
.unwrap_or(BlockAnnouncementType::SendInventory)
}
/// Mark peer as useful.
@ -193,9 +194,9 @@ impl Peers {
self.inventory_requests_order.remove(&peer_index);
}
/// Peer wants `headers` message instead of `inventory` when announcing new blocks
pub fn on_peer_sendheaders(&mut self, peer_index: usize) {
self.send_headers.insert(peer_index);
/// Change the way peer is informed about new blocks
pub fn set_block_announcement_type(&mut self, peer_index: usize, announcement_type: BlockAnnouncementType) {
self.block_announcement_types.insert(peer_index, announcement_type);
}
/// Peer wants to limit transaction announcing by transaction fee
@ -214,7 +215,7 @@ impl Peers {
self.inventory_requests.remove(&peer_index);
self.inventory_requests_order.remove(&peer_index);
self.filters.remove(&peer_index);
self.send_headers.remove(&peer_index);
self.block_announcement_types.remove(&peer_index);
peer_blocks_requests
.map(|hs| hs.into_iter().collect())
}