diff --git a/Cargo.lock b/Cargo.lock index fc9fd279..d44a7755 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -89,6 +89,7 @@ version = "0.1.0" dependencies = [ "primitives 0.1.0", "rust-crypto 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)", + "siphasher 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -657,6 +658,11 @@ dependencies = [ "winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "siphasher" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "slab" version = "0.3.0" @@ -677,6 +683,8 @@ name = "sync" version = "0.1.0" dependencies = [ "bit-vec 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", + "bitcrypto 0.1.0", + "byteorder 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)", "chain 0.1.0", "db 0.1.0", "ethcore-devtools 1.3.0", @@ -870,6 +878,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum scoped-tls 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f417c22df063e9450888a7561788e9bd46d3bb3c1466435b4eccb903807f147d" "checksum semver 0.1.20 (registry+https://github.com/rust-lang/crates.io-index)" = "d4f410fedcf71af0345d7607d246e7ad15faaadd49d240ee3b24e5dc21a820ac" "checksum shell32-sys 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "72f20b8f3c060374edb8046591ba28f62448c369ccbdc7b02075103fb3a9e38d" +"checksum siphasher 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "b1c3c58c9ac43c530919fe6bd8ef11ae2612f64c2bf8eab9346f5b71ce0617f2" "checksum slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b4fcaed89ab08ef143da37bc52adbcc04d4a69014f4c1208d6b51f0c47bc23" "checksum smallvec 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "fcc8d19212aacecf95e4a7a2179b26f7aeb9732a915cf01f05b0d3e044865410" "checksum strsim 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "50c069df92e4b01425a8bf3576d5d417943a6a7272fbabaf5bd80b1aaa76442e" diff --git a/crypto/Cargo.toml b/crypto/Cargo.toml index 75a7d0a1..8e898d67 100644 --- a/crypto/Cargo.toml +++ b/crypto/Cargo.toml @@ -5,4 +5,5 @@ authors = ["debris "] [dependencies] rust-crypto = "0.2.36" +siphasher = "0.1.1" primitives = { path = "../primitives" } diff --git a/crypto/src/lib.rs b/crypto/src/lib.rs index f3eb994b..5fed6d43 100644 --- a/crypto/src/lib.rs +++ b/crypto/src/lib.rs @@ -1,10 +1,13 @@ extern crate crypto as rcrypto; extern crate primitives; +extern crate siphasher; +use std::hash::Hasher; use rcrypto::sha1::Sha1; use rcrypto::sha2::Sha256; use rcrypto::ripemd160::Ripemd160; use rcrypto::digest::Digest; +use siphasher::sip::SipHasher24; use primitives::hash::{H32, H160, H256}; pub struct DHash160 { @@ -146,6 +149,14 @@ pub fn dhash256(input: &[u8]) -> H256 { result } +/// SipHash-2-4 +#[inline] +pub fn siphash24(key0: u64, key1: u64, input: &[u8]) -> u64 { + let mut hasher = SipHasher24::new_with_keys(key0, key1); + hasher.write(input); + hasher.finish() +} + /// Data checksum #[inline] pub fn checksum(data: &[u8]) -> H32 { @@ -157,7 +168,7 @@ pub fn checksum(data: &[u8]) -> H32 { #[cfg(test)] mod tests { use primitives::bytes::Bytes; - use super::{ripemd160, sha1, sha256, dhash160, dhash256, checksum}; + use super::{ripemd160, sha1, sha256, dhash160, dhash256, siphash24, checksum}; #[test] fn test_ripemd160() { @@ -192,12 +203,19 @@ mod tests { assert_eq!(result, expected); } - #[test] - fn test_dhash256() { + #[test] + fn test_dhash256() { let expected = "9595c9df90075148eb06860365df33584b75bff782a510c6cd4883a419833d50".into(); let result = dhash256(b"hello"); assert_eq!(result, expected); - } + } + + #[test] + fn test_siphash24() { + let expected = 0x74f839c593dc67fd_u64; + let result = siphash24(0x0706050403020100_u64, 0x0F0E0D0C0B0A0908_u64, &[0; 1]); + assert_eq!(result, expected); + } #[test] fn test_checksum() { diff --git a/db/src/indexed_block.rs b/db/src/indexed_block.rs index 6eed113f..108215c8 100644 --- a/db/src/indexed_block.rs +++ b/db/src/indexed_block.rs @@ -28,6 +28,10 @@ impl From for IndexedBlock { } impl IndexedBlock { + pub fn transactions_len(&self) -> usize { + self.transactions.len() + } + pub fn transactions(&self) -> IndexedTransactions { IndexedTransactions { position: 0, diff --git a/sync/Cargo.toml b/sync/Cargo.toml index 6412467e..460b7de5 100644 --- a/sync/Cargo.toml +++ b/sync/Cargo.toml @@ -15,8 +15,10 @@ ethcore-devtools = { path = "../devtools" } bit-vec = "0.4.3" murmur3 = "0.3" rand = "0.3" +byteorder = "0.5" chain = { path = "../chain" } +bitcrypto = { path = "../crypto" } db = { path = "../db" } message = { path = "../message" } miner = { path = "../miner" } diff --git a/sync/src/compact_block_builder.rs b/sync/src/compact_block_builder.rs new file mode 100644 index 00000000..70ec0b49 --- /dev/null +++ b/sync/src/compact_block_builder.rs @@ -0,0 +1,76 @@ +use std::collections::HashSet; +use rand::{thread_rng, Rng}; +use bitcrypto::{sha256, siphash24}; +use byteorder::{LittleEndian, ByteOrder}; +use chain::{BlockHeader, ShortTransactionID}; +use db::IndexedBlock; +use message::common::{BlockHeaderAndIDs, PrefilledTransaction}; +use primitives::hash::H256; +use ser::{Stream, Serializable}; + +/// Maximum size of prefilled transactions in compact block +const MAX_COMPACT_BLOCK_PREFILLED_SIZE: usize = 10 * 1024; + +pub fn build_compact_block(block: IndexedBlock, prefilled_transactions_indexes: HashSet) -> BlockHeaderAndIDs { + let nonce: u64 = thread_rng().gen(); + + let prefilled_transactions_len = prefilled_transactions_indexes.len(); + let mut short_ids: Vec = Vec::with_capacity(block.transactions_len() - prefilled_transactions_len); + let mut prefilled_transactions: Vec = Vec::with_capacity(prefilled_transactions_len); + let mut prefilled_transactions_size: usize = 0; + + for (transaction_index, (transaction_hash, transaction)) in block.transactions().enumerate() { + let transaction_size = transaction.serialized_size(); + if prefilled_transactions_size + transaction_size < MAX_COMPACT_BLOCK_PREFILLED_SIZE + && prefilled_transactions_indexes.contains(&transaction_index) { + prefilled_transactions_size += transaction_size; + prefilled_transactions.push(PrefilledTransaction { + index: transaction_index, + transaction: transaction.clone(), + }) + } else { + short_ids.push(short_transaction_id(nonce, block.header(), transaction_hash)); + } + } + + BlockHeaderAndIDs { + header: block.header().clone(), + nonce: nonce, + short_ids: short_ids, + prefilled_transactions: prefilled_transactions, + } +} + +fn short_transaction_id(nonce: u64, block_header: &BlockHeader, transaction_hash: &H256) -> ShortTransactionID { + // Short transaction IDs are used to represent a transaction without sending a full 256-bit hash. They are calculated by: + // 1) single-SHA256 hashing the block header with the nonce appended (in little-endian) + let mut stream = Stream::new(); + stream.append(block_header); + stream.append(&nonce); + let block_header_with_nonce_hash = sha256(&stream.out()); + + // 2) Running SipHash-2-4 with the input being the transaction ID and the keys (k0/k1) set to the first two little-endian + // 64-bit integers from the above hash, respectively. + let key0 = LittleEndian::read_u64(&block_header_with_nonce_hash[0..8]); + let key1 = LittleEndian::read_u64(&block_header_with_nonce_hash[8..16]); + let siphash_transaction_hash = siphash24(key0, key1, &**transaction_hash); + + // 3) Dropping the 2 most significant bytes from the SipHash output to make it 6 bytes. + let mut siphash_transaction_hash_bytes = [0u8; 8]; + LittleEndian::write_u64(&mut siphash_transaction_hash_bytes, siphash_transaction_hash); + + siphash_transaction_hash_bytes[2..8].into() +} + +#[cfg(test)] +mod tests { + #[test] + fn short_transaction_id_is_correct() { + // TODO + } + + #[test] + fn compact_block_is_built_correctly() { + // TODO + } +} diff --git a/sync/src/connection_filter.rs b/sync/src/connection_filter.rs index 5a1e68aa..5062785f 100644 --- a/sync/src/connection_filter.rs +++ b/sync/src/connection_filter.rs @@ -123,7 +123,7 @@ impl ConnectionFilter { } /// Check if transaction should be sent to this connection && optionally update filter - pub fn filter_transaction(&mut self, transaction_hash: &H256, transaction: &Transaction, transaction_fee_rate: u64) -> bool { + pub fn filter_transaction(&mut self, transaction_hash: &H256, transaction: &Transaction, transaction_fee_rate: Option) -> bool { // check if transaction is known if self.last_transactions.contains_key(transaction_hash) { return false; @@ -131,8 +131,10 @@ impl ConnectionFilter { // check if transaction fee rate is high enough for this peer if let Some(fee_rate) = self.fee_rate { - if transaction_fee_rate < fee_rate { - return false; + if let Some(transaction_fee_rate) = transaction_fee_rate { + if transaction_fee_rate < fee_rate { + return false; + } } } @@ -520,13 +522,13 @@ pub mod tests { let mut filter = ConnectionFilter::with_filterload(&default_filterload()); - assert!(!filter.filter_transaction(&tx1.hash(), &tx1, 1000)); - assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000)); + assert!(!filter.filter_transaction(&tx1.hash(), &tx1, None)); + assert!(!filter.filter_transaction(&tx2.hash(), &tx2, None)); filter.add(&make_filteradd(&*tx1.hash())); - assert!(filter.filter_transaction(&tx1.hash(), &tx1, 1000)); - assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000)); + assert!(filter.filter_transaction(&tx1.hash(), &tx1, None)); + assert!(!filter.filter_transaction(&tx2.hash(), &tx2, None)); } #[test] @@ -539,13 +541,13 @@ pub mod tests { let mut filter = ConnectionFilter::with_filterload(&default_filterload()); - assert!(!filter.filter_transaction(&tx1.hash(), &tx1, 1000)); - assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000)); + assert!(!filter.filter_transaction(&tx1.hash(), &tx1, None)); + assert!(!filter.filter_transaction(&tx2.hash(), &tx2, None)); filter.add(&make_filteradd(&tx1_out_data)); - assert!(filter.filter_transaction(&tx1.hash(), &tx1, 1000)); - assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000)); + assert!(filter.filter_transaction(&tx1.hash(), &tx1, None)); + assert!(!filter.filter_transaction(&tx2.hash(), &tx2, None)); } #[test] @@ -557,13 +559,13 @@ pub mod tests { let mut filter = ConnectionFilter::with_filterload(&default_filterload()); - assert!(!filter.filter_transaction(&tx1.hash(), &tx1, 1000)); - assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000)); + assert!(!filter.filter_transaction(&tx1.hash(), &tx1, None)); + assert!(!filter.filter_transaction(&tx2.hash(), &tx2, None)); filter.add(&make_filteradd(&tx1_previous_output)); - assert!(filter.filter_transaction(&tx1.hash(), &tx1, 1000)); - assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000)); + assert!(filter.filter_transaction(&tx1.hash(), &tx1, None)); + assert!(!filter.filter_transaction(&tx2.hash(), &tx2, None)); } #[test] @@ -576,13 +578,13 @@ pub mod tests { let mut filter = ConnectionFilter::with_filterload(&default_filterload()); - assert!(!filter.filter_transaction(&tx1.hash(), &tx1, 1000)); - assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000)); + assert!(!filter.filter_transaction(&tx1.hash(), &tx1, None)); + assert!(!filter.filter_transaction(&tx2.hash(), &tx2, None)); filter.add(&make_filteradd(&tx1_input_data)); - assert!(filter.filter_transaction(&tx1.hash(), &tx1, 1000)); - assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 1000)); + assert!(filter.filter_transaction(&tx1.hash(), &tx1, None)); + assert!(!filter.filter_transaction(&tx2.hash(), &tx2, None)); } #[test] @@ -592,23 +594,23 @@ pub mod tests { let mut filter = ConnectionFilter::default(); - assert!(filter.filter_transaction(&tx1.hash(), &tx1, 1000)); - assert!(filter.filter_transaction(&tx2.hash(), &tx2, 2000)); + assert!(filter.filter_transaction(&tx1.hash(), &tx1, Some(1000))); + assert!(filter.filter_transaction(&tx2.hash(), &tx2, Some(2000))); filter.set_fee_rate(1500); - assert!(!filter.filter_transaction(&tx1.hash(), &tx1, 1000)); - assert!(filter.filter_transaction(&tx2.hash(), &tx2, 2000)); + assert!(!filter.filter_transaction(&tx1.hash(), &tx1, Some(1000))); + assert!(filter.filter_transaction(&tx2.hash(), &tx2, Some(2000))); filter.set_fee_rate(3000); - assert!(!filter.filter_transaction(&tx1.hash(), &tx1, 1000)); - assert!(!filter.filter_transaction(&tx2.hash(), &tx2, 2000)); + assert!(!filter.filter_transaction(&tx1.hash(), &tx1, Some(1000))); + assert!(!filter.filter_transaction(&tx2.hash(), &tx2, Some(2000))); filter.set_fee_rate(0); - assert!(filter.filter_transaction(&tx1.hash(), &tx1, 1000)); - assert!(filter.filter_transaction(&tx2.hash(), &tx2, 2000)); + assert!(filter.filter_transaction(&tx1.hash(), &tx1, Some(1000))); + assert!(filter.filter_transaction(&tx2.hash(), &tx2, Some(2000))); } #[test] diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 854512a0..af2bb6df 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -1,3 +1,5 @@ +extern crate bitcrypto; +extern crate byteorder; extern crate chain; extern crate db; #[macro_use] @@ -25,6 +27,7 @@ extern crate network; mod best_headers_chain; mod blocks_writer; +mod compact_block_builder; mod connection_filter; mod hash_queue; mod inbound_connection; diff --git a/sync/src/local_node.rs b/sync/src/local_node.rs index f80017fd..c64636b8 100644 --- a/sync/src/local_node.rs +++ b/sync/src/local_node.rs @@ -5,7 +5,7 @@ use db; use p2p::OutboundSyncConnectionRef; use message::common::{InventoryType, InventoryVector}; use message::types; -use synchronization_client::{Client, SynchronizationClient}; +use synchronization_client::{Client, SynchronizationClient, BlockAnnouncementType}; use synchronization_executor::{Task as SynchronizationTask, TaskExecutor as SynchronizationTaskExecutor, LocalSynchronizationTaskExecutor}; use synchronization_server::{Server, SynchronizationServer}; use synchronization_verifier::AsyncVerifier; @@ -183,7 +183,7 @@ impl LocalNode where T: SynchronizationTaskExecutor + PeersCon pub fn on_peer_sendheaders(&self, peer_index: usize, _message: types::SendHeaders) { trace!(target: "sync", "Got `sendheaders` message from peer#{}", peer_index); - self.client.lock().on_peer_sendheaders(peer_index); + self.client.lock().on_peer_block_announcement_type(peer_index, BlockAnnouncementType::SendHeader); } pub fn on_peer_feefilter(&self, peer_index: usize, message: types::FeeFilter) { @@ -191,8 +191,23 @@ impl LocalNode where T: SynchronizationTaskExecutor + PeersCon self.client.lock().on_peer_feefilter(peer_index, &message); } - pub fn on_peer_send_compact(&self, peer_index: usize, _message: types::SendCompact) { + pub fn on_peer_send_compact(&self, peer_index: usize, message: types::SendCompact) { trace!(target: "sync", "Got `sendcmpct` message from peer#{}", peer_index); + + // The second integer SHALL be interpreted as a little-endian version number. Nodes sending a sendcmpct message MUST currently set this value to 1. + // TODO: version 2 supports segregated witness transactions + if message.second != 1 { + return; + } + + // Upon receipt of a "sendcmpct" message with the first and second integers set to 1, the node SHOULD announce new blocks by sending a cmpctblock message. + if message.first { + self.client.lock().on_peer_block_announcement_type(peer_index, BlockAnnouncementType::SendCompactBlock); + } + // else: + // Upon receipt of a "sendcmpct" message with the first integer set to 0, the node SHOULD NOT announce new blocks by sending a cmpctblock message, + // but SHOULD announce new blocks by sending invs or headers, as defined by BIP130. + // => work as before } pub fn on_peer_compact_block(&self, peer_index: usize, _message: types::CompactBlock) { diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs index f34200c6..244e6e32 100644 --- a/sync/src/synchronization_client.rs +++ b/sync/src/synchronization_client.rs @@ -25,6 +25,7 @@ use synchronization_manager::{manage_synchronization_peers_blocks, manage_synchr manage_unknown_orphaned_blocks, manage_orphaned_transactions, MANAGEMENT_INTERVAL_MS, ManagePeersConfig, ManageUnknownBlocksConfig, ManageOrphanTransactionsConfig}; use synchronization_verifier::{Verifier, VerificationSink}; +use compact_block_builder::build_compact_block; use hash_queue::HashPosition; use miner::transaction_fee_rate; use time; @@ -195,7 +196,7 @@ pub trait Client : Send + 'static { fn on_peer_filterload(&mut self, peer_index: usize, message: &types::FilterLoad); fn on_peer_filteradd(&mut self, peer_index: usize, message: &types::FilterAdd); fn on_peer_filterclear(&mut self, peer_index: usize); - fn on_peer_sendheaders(&mut self, peer_index: usize); + fn on_peer_block_announcement_type(&mut self, peer_index: usize, announcement_type: BlockAnnouncementType); fn on_peer_feefilter(&mut self, peer_index: usize, message: &types::FeeFilter); fn on_peer_disconnected(&mut self, peer_index: usize); fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: BoxFuture<(), ()>); @@ -216,7 +217,7 @@ pub trait ClientCore : VerificationSink { fn on_peer_filterload(&mut self, peer_index: usize, message: &types::FilterLoad); fn on_peer_filteradd(&mut self, peer_index: usize, message: &types::FilterAdd); fn on_peer_filterclear(&mut self, peer_index: usize); - fn on_peer_sendheaders(&mut self, peer_index: usize); + fn on_peer_block_announcement_type(&mut self, peer_index: usize, announcement_type: BlockAnnouncementType); fn on_peer_feefilter(&mut self, peer_index: usize, message: &types::FeeFilter); fn on_peer_disconnected(&mut self, peer_index: usize); fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: BoxFuture<(), ()>); @@ -243,6 +244,17 @@ pub struct FilteredInventory { pub notfound: Vec, } +#[derive(Debug, Clone, Copy)] +/// New block announcement type +pub enum BlockAnnouncementType { + /// Send inventory with block hash + SendInventory, + /// Send block header + SendHeader, + /// Send compact block + SendCompactBlock, +} + /// Synchronization client facade pub struct SynchronizationClient { /// Client core @@ -400,8 +412,8 @@ impl Client for SynchronizationClient where T: TaskExecutor, U: Veri self.core.lock().on_peer_filterclear(peer_index); } - fn on_peer_sendheaders(&mut self, peer_index: usize) { - self.core.lock().on_peer_sendheaders(peer_index); + fn on_peer_block_announcement_type(&mut self, peer_index: usize, announcement_type: BlockAnnouncementType) { + self.core.lock().on_peer_block_announcement_type(peer_index, announcement_type); } fn on_peer_feefilter(&mut self, peer_index: usize, message: &types::FeeFilter) { @@ -637,10 +649,10 @@ impl ClientCore for SynchronizationClientCore where T: TaskExecutor { } } - /// Peer wants to get blocks headers instead of blocks hashes when announcing new blocks - fn on_peer_sendheaders(&mut self, peer_index: usize) { + /// Change the way peer is informed about new blocks + fn on_peer_block_announcement_type(&mut self, peer_index: usize, announcement_type: BlockAnnouncementType) { if self.peers.is_known_peer(peer_index) { - self.peers.on_peer_sendheaders(peer_index); + self.peers.set_block_announcement_type(peer_index, announcement_type); } } @@ -947,35 +959,62 @@ impl SynchronizationClientCore where T: TaskExecutor { let tasks: Vec<_> = { self.peers.all_peers().into_iter() .filter_map(|peer_index| { - let send_headers = self.peers.send_headers(peer_index); + let block_announcement_type = self.peers.block_announcement_type(peer_index); - if send_headers { - let filtered_blocks_hashes: Vec<_> = new_blocks_hashes.iter() - .filter(|h| self.peers.filter(peer_index).filter_block(h)) - .collect(); - let chain = self.chain.read(); - let headers: Vec<_> = filtered_blocks_hashes.into_iter() - .filter_map(|h| chain.block_header_by_hash(&h)) - .collect(); - if !headers.is_empty() { - Some(Task::SendHeaders(peer_index, headers, ServerTaskIndex::None)) - } - else { - None - } - } else { - let inventory: Vec<_> = new_blocks_hashes.iter() - .filter(|h| self.peers.filter(peer_index).filter_block(h)) - .map(|h| InventoryVector { - inv_type: InventoryType::MessageBlock, - hash: h.clone(), - }) - .collect(); - if !inventory.is_empty() { - Some(Task::SendInventory(peer_index, inventory, ServerTaskIndex::None)) - } else { - None - } + match block_announcement_type { + BlockAnnouncementType::SendHeader => { + let filtered_blocks_hashes: Vec<_> = new_blocks_hashes.iter() + .filter(|h| self.peers.filter(peer_index).filter_block(h)) + .collect(); + let chain = self.chain.read(); + let headers: Vec<_> = filtered_blocks_hashes.into_iter() + .filter_map(|h| chain.block_header_by_hash(&h)) + .collect(); + if !headers.is_empty() { + Some(Task::SendHeaders(peer_index, headers, ServerTaskIndex::None)) + } + else { + None + } + }, + BlockAnnouncementType::SendCompactBlock => { + let indexed_blocks: Vec = { + let chain = self.chain.read(); + new_blocks_hashes.iter() + .filter_map(|h| chain.storage().block(db::BlockRef::Hash(h.clone()))) + .map(|b| b.into()) + .collect() + }; + + let block_header_and_ids: Vec<_> = indexed_blocks.into_iter() + .filter_map(|b| if self.peers.filter(peer_index).filter_block(&b.hash()) { + let prefilled_transactions_indexes = b.transactions().enumerate() + // we do not filter by fee rate here, because it only reasonable for non-mined transactions + .filter(|&(_, (h, t))| self.peers.filter_mut(peer_index).filter_transaction(h, t, None)) + .map(|(idx, _)| idx) + .collect(); + Some(build_compact_block(b, prefilled_transactions_indexes)) + } else { + None + }) + .collect(); + + Some(Task::SendCompactBlocks(peer_index, block_header_and_ids, ServerTaskIndex::None)) + }, + BlockAnnouncementType::SendInventory => { + let inventory: Vec<_> = new_blocks_hashes.iter() + .filter(|h| self.peers.filter(peer_index).filter_block(h)) + .map(|h| InventoryVector { + inv_type: InventoryType::MessageBlock, + hash: h.clone(), + }) + .collect(); + if !inventory.is_empty() { + Some(Task::SendInventory(peer_index, inventory, ServerTaskIndex::None)) + } else { + None + } + }, } }) .collect() @@ -993,7 +1032,7 @@ impl SynchronizationClientCore where T: TaskExecutor { .filter_map(|peer_index| { let inventory: Vec<_> = new_transactions.iter() .filter(|&&(ref h, tx, tx_fee_rate)| { - self.peers.filter_mut(peer_index).filter_transaction(h, tx, tx_fee_rate) + self.peers.filter_mut(peer_index).filter_transaction(h, tx, Some(tx_fee_rate)) }) .map(|&(ref h, _, _)| InventoryVector { inv_type: InventoryType::MessageTx, @@ -1305,7 +1344,7 @@ pub mod tests { use chain::{Block, Transaction}; use message::common::{InventoryVector, InventoryType}; use message::types; - use super::{Client, Config, SynchronizationClient, SynchronizationClientCore}; + use super::{Client, Config, SynchronizationClient, SynchronizationClientCore, BlockAnnouncementType}; use connection_filter::tests::*; use synchronization_executor::Task; use synchronization_chain::{Chain, ChainRef}; @@ -2148,7 +2187,7 @@ pub mod tests { let mut sync = sync.lock(); sync.on_peer_connected(1); sync.on_peer_connected(2); - sync.on_peer_sendheaders(2); + sync.on_peer_block_announcement_type(2, BlockAnnouncementType::SendHeader); sync.on_peer_connected(3); // igonore tasks @@ -2225,4 +2264,32 @@ pub mod tests { // should not panic here sync.on_peer_block(2, test_data::block_h2()); } + + #[test] + fn relay_new_block_after_sendcmpct() { + let (_, _, executor, _, sync) = create_sync(None, None); + let genesis = test_data::genesis(); + let b0 = test_data::block_builder().header().parent(genesis.hash()).build().build(); + + let mut sync = sync.lock(); + sync.on_peer_connected(1); + sync.on_peer_connected(2); + sync.on_peer_block_announcement_type(2, BlockAnnouncementType::SendCompactBlock); + sync.on_peer_connected(3); + + // igonore tasks + { executor.lock().take_tasks(); } + + sync.on_peer_block(1, b0.clone()); + + let tasks = executor.lock().take_tasks(); + let inventory = vec![InventoryVector { inv_type: InventoryType::MessageBlock, hash: b0.hash() }]; + assert_eq!(tasks.len(), 3); + assert_eq!(tasks[0], Task::RequestBlocksHeaders(1)); + match tasks[1] { + Task::SendCompactBlocks(2, _, _) => (), + _ => panic!("unexpected task"), + } + assert_eq!(tasks[2], Task::SendInventory(3, inventory, ServerTaskIndex::None)); + } } diff --git a/sync/src/synchronization_executor.rs b/sync/src/synchronization_executor.rs index c0c9867a..19896e8a 100644 --- a/sync/src/synchronization_executor.rs +++ b/sync/src/synchronization_executor.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use std::collections::HashMap; use parking_lot::Mutex; use chain::{Block, BlockHeader, Transaction}; -use message::common::{InventoryVector, InventoryType}; +use message::common::{InventoryVector, InventoryType, BlockHeaderAndIDs}; use message::types; use primitives::hash::H256; use p2p::OutboundSyncConnectionRef; @@ -17,6 +17,7 @@ pub trait TaskExecutor : Send + 'static { fn execute(&mut self, task: Task); } +// TODO: get rid of unneeded ServerTaskIndex-es /// Synchronization task for the peer. #[derive(Debug, PartialEq)] pub enum Task { @@ -40,6 +41,8 @@ pub enum Task { SendInventory(usize, Vec, ServerTaskIndex), /// Send headers SendHeaders(usize, Vec, ServerTaskIndex), + /// Send compact blocks + SendCompactBlocks(usize, Vec, ServerTaskIndex), /// Notify io about ignored request Ignore(usize, u32), } @@ -187,6 +190,17 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor { } } }, + Task::SendCompactBlocks(peer_index, compact_blocks, id) => { + if let Some(connection) = self.peers.get_mut(&peer_index) { + assert_eq!(id.raw(), None); + for compact_block in compact_blocks { + trace!(target: "sync", "Sending compact_block {:?} to peer#{}", compact_block.header.hash(), peer_index); + connection.send_compact_block(&types::CompactBlock { + header: compact_block, + }); + } + } + }, Task::Ignore(peer_index, id) => { if let Some(connection) = self.peers.get_mut(&peer_index) { trace!(target: "sync", "Ignoring request from peer#{} with id {}", peer_index, id); diff --git a/sync/src/synchronization_peers.rs b/sync/src/synchronization_peers.rs index 9c996b68..aa7bf7a8 100644 --- a/sync/src/synchronization_peers.rs +++ b/sync/src/synchronization_peers.rs @@ -4,6 +4,7 @@ use primitives::hash::H256; use linked_hash_map::LinkedHashMap; use time::precise_time_s; use connection_filter::ConnectionFilter; +use synchronization_client::BlockAnnouncementType; /// Max peer failures # before excluding from sync process const MAX_PEER_FAILURES: usize = 2; @@ -27,8 +28,8 @@ pub struct Peers { inventory_requests_order: LinkedHashMap, /// Peer connections filters. filters: HashMap, - /// Flags, informing that peer wants `headers` message instead of `inventory` when announcing new blocks - send_headers: HashSet, + /// The way peer is informed about new blocks + block_announcement_types: HashMap, } /// Information on synchronization peers @@ -54,7 +55,7 @@ impl Peers { inventory_requests: HashSet::new(), inventory_requests_order: LinkedHashMap::new(), filters: HashMap::new(), - send_headers: HashSet::new(), + block_announcement_types: HashMap::new(), } } @@ -160,10 +161,10 @@ impl Peers { self.filters.entry(peer_index).or_insert_with(ConnectionFilter::default) } - /// Does peer wants `headers` message instead of `inventory` when announcing new blocks - pub fn send_headers(&self, peer_index: usize) -> bool { - assert!(self.is_known_peer(peer_index)); - self.send_headers.contains(&peer_index) + /// Get the way peer is informed about new blocks + pub fn block_announcement_type(&self, peer_index: usize) -> BlockAnnouncementType { + self.block_announcement_types.get(&peer_index).cloned() + .unwrap_or(BlockAnnouncementType::SendInventory) } /// Mark peer as useful. @@ -193,9 +194,9 @@ impl Peers { self.inventory_requests_order.remove(&peer_index); } - /// Peer wants `headers` message instead of `inventory` when announcing new blocks - pub fn on_peer_sendheaders(&mut self, peer_index: usize) { - self.send_headers.insert(peer_index); + /// Change the way peer is informed about new blocks + pub fn set_block_announcement_type(&mut self, peer_index: usize, announcement_type: BlockAnnouncementType) { + self.block_announcement_types.insert(peer_index, announcement_type); } /// Peer wants to limit transaction announcing by transaction fee @@ -214,7 +215,7 @@ impl Peers { self.inventory_requests.remove(&peer_index); self.inventory_requests_order.remove(&peer_index); self.filters.remove(&peer_index); - self.send_headers.remove(&peer_index); + self.block_announcement_types.remove(&peer_index); peer_blocks_requests .map(|hs| hs.into_iter().collect()) }