From 730bc619c401a4a20313e3e335d689775f3658d9 Mon Sep 17 00:00:00 2001 From: Marek Kotewicz Date: Mon, 21 Nov 2016 18:06:14 +0100 Subject: [PATCH 1/9] Revert "Revert "Revert "revert "relay canonized blocks"""" --- sync/src/synchronization_chain.rs | 32 +++++++++++---- sync/src/synchronization_client.rs | 58 ++++++++++++++++++++++++++-- sync/src/synchronization_executor.rs | 16 ++++++++ 3 files changed, 95 insertions(+), 11 deletions(-) diff --git a/sync/src/synchronization_chain.rs b/sync/src/synchronization_chain.rs index 7dac3e2a..38bc8d7c 100644 --- a/sync/src/synchronization_chain.rs +++ b/sync/src/synchronization_chain.rs @@ -25,10 +25,22 @@ const NUMBER_OF_QUEUES: usize = 3; /// Block insertion result #[derive(Debug, Default, PartialEq)] pub struct BlockInsertionResult { - /// Transaction to 'reverify' + /// Hashes of blocks, which were canonized during this insertion procedure. Order matters + pub canonized_blocks_hashes: Vec, + /// Transaction to 'reverify'. Order matters pub transactions_to_reverify: Vec<(H256, Transaction)>, } +impl BlockInsertionResult { + #[cfg(test)] + pub fn with_canonized_blocks(canonized_blocks_hashes: Vec) -> Self { + BlockInsertionResult { + canonized_blocks_hashes: canonized_blocks_hashes, + transactions_to_reverify: Vec::new(), + } + } +} + /// Block synchronization state #[derive(Debug, Copy, Clone, Eq, PartialEq)] pub enum BlockState { @@ -323,6 +335,7 @@ impl Chain { // no transactions to reverify, because we have just appended new transactions to the blockchain Ok(BlockInsertionResult { + canonized_blocks_hashes: vec![hash], transactions_to_reverify: Vec::new(), }) } @@ -340,15 +353,18 @@ impl Chain { // + all transactions from previous blocks of this fork were accepted // => delete accepted transactions from verification queue and from the memory pool let this_block_transactions_hashes = block.transactions.iter().map(|tx| tx.hash()); + let mut canonized_blocks_hashes: Vec = Vec::new(); let mut new_main_blocks_transactions_hashes: Vec = Vec::new(); while let Some(canonized_block_hash) = reorganization.pop_canonized() { - let canonized_transactions_hashes = self.storage.block_transaction_hashes(db::BlockRef::Hash(canonized_block_hash)); + let canonized_transactions_hashes = self.storage.block_transaction_hashes(db::BlockRef::Hash(canonized_block_hash.clone())); new_main_blocks_transactions_hashes.extend(canonized_transactions_hashes); + canonized_blocks_hashes.push(canonized_block_hash); } for transaction_accepted in this_block_transactions_hashes.chain(new_main_blocks_transactions_hashes.into_iter()) { self.memory_pool.remove_by_hash(&transaction_accepted); self.verifying_transactions.remove(&transaction_accepted); } + canonized_blocks_hashes.reverse(); // reverify all transactions from old main branch' blocks let mut old_main_blocks_transactions_hashes: Vec = Vec::new(); @@ -378,6 +394,7 @@ impl Chain { self.verifying_transactions.clear(); Ok(BlockInsertionResult { + canonized_blocks_hashes: canonized_blocks_hashes, // order matters: db transactions, then ordered mempool transactions, then ordered verifying transactions transactions_to_reverify: old_main_blocks_transactions.into_iter() .chain(memory_pool_transactions.into_iter()) @@ -1085,24 +1102,25 @@ mod tests { chain.insert_verified_transaction(tx4); chain.insert_verified_transaction(tx5); - assert_eq!(chain.insert_best_block(b0.hash(), &b0).expect("block accepted"), BlockInsertionResult::default()); + assert_eq!(chain.insert_best_block(b0.hash(), &b0).expect("block accepted"), BlockInsertionResult::with_canonized_blocks(vec![b0.hash()])); assert_eq!(chain.information().transactions.transactions_count, 3); - assert_eq!(chain.insert_best_block(b1.hash(), &b1).expect("block accepted"), BlockInsertionResult::default()); + assert_eq!(chain.insert_best_block(b1.hash(), &b1).expect("block accepted"), BlockInsertionResult::with_canonized_blocks(vec![b1.hash()])); assert_eq!(chain.information().transactions.transactions_count, 3); - assert_eq!(chain.insert_best_block(b2.hash(), &b2).expect("block accepted"), BlockInsertionResult::default()); + assert_eq!(chain.insert_best_block(b2.hash(), &b2).expect("block accepted"), BlockInsertionResult::with_canonized_blocks(vec![b2.hash()])); assert_eq!(chain.information().transactions.transactions_count, 3); assert_eq!(chain.insert_best_block(b3.hash(), &b3).expect("block accepted"), BlockInsertionResult::default()); assert_eq!(chain.information().transactions.transactions_count, 3); assert_eq!(chain.insert_best_block(b4.hash(), &b4).expect("block accepted"), BlockInsertionResult::default()); assert_eq!(chain.information().transactions.transactions_count, 3); // order matters - let transactions_to_reverify_hashes: Vec<_> = chain.insert_best_block(b5.hash(), &b5) - .expect("block accepted") + let insert_result = chain.insert_best_block(b5.hash(), &b5).expect("block accepted"); + let transactions_to_reverify_hashes: Vec<_> = insert_result .transactions_to_reverify .into_iter() .map(|(h, _)| h) .collect(); assert_eq!(transactions_to_reverify_hashes, vec![tx1_hash, tx2_hash]); + assert_eq!(insert_result.canonized_blocks_hashes, vec![b3.hash(), b4.hash(), b5.hash()]); assert_eq!(chain.information().transactions.transactions_count, 0); // tx3, tx4, tx5 are added to the database } } diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs index 71ae4440..0d3988d0 100644 --- a/sync/src/synchronization_client.rs +++ b/sync/src/synchronization_client.rs @@ -629,8 +629,10 @@ impl VerificationSink for SynchronizationClientCore where T: TaskExecutor self.execute_synchronization_tasks(None); // relay block to our peers - if self.state.is_saturated() { - // TODO: Task::BroadcastBlock + if self.state.is_saturated() || self.state.is_nearly_saturated() { + // TODO: remember peer' last N blocks and send only if peer has no canonized blocks + // TODO: send `headers` if peer has not send `sendheaders` command + self.executor.lock().execute(Task::BroadcastBlocksHashes(insert_result.canonized_blocks_hashes)); } // deal with block transactions @@ -1243,14 +1245,15 @@ pub mod tests { sync.on_new_blocks_headers(1, vec![block.block_header.clone()]); sync.on_new_blocks_headers(2, vec![block.block_header.clone()]); executor.lock().take_tasks(); - sync.on_peer_block(2, block); + sync.on_peer_block(2, block.clone()); let tasks = executor.lock().take_tasks(); - assert_eq!(tasks.len(), 4); + assert_eq!(tasks.len(), 5); assert!(tasks.iter().any(|t| t == &Task::RequestBlocksHeaders(1))); assert!(tasks.iter().any(|t| t == &Task::RequestBlocksHeaders(2))); assert!(tasks.iter().any(|t| t == &Task::RequestMemoryPool(1))); assert!(tasks.iter().any(|t| t == &Task::RequestMemoryPool(2))); + assert!(tasks.iter().any(|t| t == &Task::BroadcastBlocksHashes(vec![block.hash()]))); } #[test] @@ -1764,4 +1767,51 @@ pub mod tests { sync.on_new_blocks_headers(2, vec![b10.block_header.clone(), b21.block_header.clone(), b22.block_header.clone(), b23.block_header.clone()]); } + + #[test] + fn relay_new_block_when_in_saturated_state() { + let (_, _, executor, _, sync) = create_sync(None, None); + let genesis = test_data::genesis(); + let b0 = test_data::block_builder().header().parent(genesis.hash()).build().build(); + let b1 = test_data::block_builder().header().parent(b0.hash()).build().build(); + let b2 = test_data::block_builder().header().parent(b1.hash()).build().build(); + let b3 = test_data::block_builder().header().parent(b2.hash()).build().build(); + + let mut sync = sync.lock(); + sync.on_new_blocks_headers(1, vec![b0.block_header.clone(), b1.block_header.clone()]); + sync.on_peer_block(1, b0.clone()); + sync.on_peer_block(1, b1.clone()); + + // we were in synchronization state => block is not relayed + { + let tasks = executor.lock().take_tasks(); + assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1), + Task::RequestBlocks(1, vec![b0.hash(), b1.hash()]), + Task::RequestBlocksHeaders(1), + Task::RequestMemoryPool(1) + ]); + } + + sync.on_peer_block(1, b2.clone()); + + // we were in saturated state => block is relayed + { + let tasks = executor.lock().take_tasks(); + assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1), Task::BroadcastBlocksHashes(vec![b2.hash()])]); + } + + sync.on_new_blocks_headers(1, vec![b3.block_header.clone()]); + sync.on_peer_block(1, b3.clone()); + + // we were in nearly saturated state => block is relayed + { + let tasks = executor.lock().take_tasks(); + assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1), + Task::RequestBlocks(1, vec![b3.hash()]), + Task::BroadcastBlocksHashes(vec![b3.hash()]), + Task::RequestBlocksHeaders(1), + Task::RequestMemoryPool(1) + ]); + } + } } diff --git a/sync/src/synchronization_executor.rs b/sync/src/synchronization_executor.rs index 8a8f57ca..286ccdf6 100644 --- a/sync/src/synchronization_executor.rs +++ b/sync/src/synchronization_executor.rs @@ -30,6 +30,8 @@ pub enum Task { RequestMemoryPool(usize), /// Send block. SendBlock(usize, Block, ServerTaskIndex), + /// Broadcast block. + BroadcastBlocksHashes(Vec), /// Send notfound SendNotFound(usize, Vec, ServerTaskIndex), /// Send inventory @@ -132,6 +134,20 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor { connection.send_block(&block_message); } }, + Task::BroadcastBlocksHashes(blocks_hashes) => { + let inventory = types::Inv { + inventory: blocks_hashes.into_iter().map(|h| InventoryVector { + inv_type: InventoryType::MessageBlock, + hash: h, + }) + .collect(), + }; + + for (peer_index, connection) in self.peers.iter() { + trace!(target: "sync", "Sending inventory with {} blocks hashes to peer#{}", inventory.inventory.len(), peer_index); + connection.send_inventory(&inventory); + } + }, Task::SendNotFound(peer_index, unknown_inventory, id) => { let notfound = types::NotFound { inventory: unknown_inventory, From b2798841de2337fbdcb479aab8ed869282d77615 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Tue, 22 Nov 2016 02:50:27 +0300 Subject: [PATCH 2/9] refactored sigop counter --- script/src/script.rs | 27 ++++++++++++++++ verification/src/chain_verifier.rs | 49 ++++++++++++++++++++++-------- verification/src/lib.rs | 4 +++ verification/src/utils.rs | 5 +++ 4 files changed, 72 insertions(+), 13 deletions(-) diff --git a/script/src/script.rs b/script/src/script.rs index 6a07bf89..292ce9ce 100644 --- a/script/src/script.rs +++ b/script/src/script.rs @@ -359,6 +359,33 @@ impl Script { Ok(result) } + + pub fn sigop_count_p2sh(&self, ref_output: &Script) -> Result { + if !self.is_pay_to_script_hash() { return self.sigop_count(true); } + + let mut script_data: Option<&[u8]> = None; + // we need last command + for next in ref_output.iter() { + let instruction = match next { + Err(_) => return Ok(0), + Ok(i) => i, + }; + + if instruction.opcode as u8 > Opcode::OP_16 as u8 { + return Ok(0); + } + + script_data = instruction.data; + } + + match script_data { + Some(slc) => { + let nested_script: Script = slc.to_vec().into(); + nested_script.sigop_count(true) + }, + None => Ok(0), + } + } } pub struct Instructions<'a> { diff --git a/verification/src/chain_verifier.rs b/verification/src/chain_verifier.rs index d9ebbaaf..ceb7b52d 100644 --- a/verification/src/chain_verifier.rs +++ b/verification/src/chain_verifier.rs @@ -10,6 +10,8 @@ const COINBASE_MATURITY: u32 = 100; // 2 hours const MAX_BLOCK_SIGOPS: usize = 20000; const MAX_BLOCK_SIZE: usize = 1000000; +const BIP16_TIME: u32 = 1333238400; + pub struct ChainVerifier { store: db::SharedStore, verify_p2sh: bool, @@ -110,7 +112,11 @@ impl ChainVerifier { Ok(()) } - fn verify_transaction(&self, block: &chain::Block, transaction: &chain::Transaction) -> Result<(), TransactionError> { + fn verify_transaction(&self, + block: &chain::Block, + transaction: &chain::Transaction, + sequence: usize, + ) -> Result { use script::{ TransactionInputSigner, TransactionSignatureChecker, @@ -119,6 +125,17 @@ impl ChainVerifier { verify_script, }; + let mut sigops = utils::transaction_sigops(transaction) + .map_err(|e| TransactionError::SignatureMallformed(e.to_string()))?; + + if sequence == 0 { return Ok(sigops); } + + if sigops >= MAX_BLOCK_SIGOPS { return Err(TransactionError::Sigops(sigops)); } + + // strict pay-to-script-hash signature operations count toward block + // signature operations limit is enforced with BIP16 + let is_strict_p2sh = block.header().time >= BIP16_TIME; + for (input_index, input) in transaction.inputs().iter().enumerate() { let store_parent_transaction = self.store.transaction(&input.previous_output.hash); let parent_transaction = store_parent_transaction @@ -140,6 +157,12 @@ impl ChainVerifier { let input: Script = input.script_sig().to_vec().into(); let output: Script = paired_output.script_pubkey.to_vec().into(); + if is_strict_p2sh && input.is_pay_to_script_hash() { + sigops += utils::p2sh_sigops(&input, &output); + + if sigops >= MAX_BLOCK_SIGOPS { return Err(TransactionError::SigopsP2SH(sigops)); } + } + let flags = VerificationFlags::default() .verify_p2sh(self.verify_p2sh) .verify_clocktimeverify(self.verify_clocktimeverify); @@ -156,7 +179,7 @@ impl ChainVerifier { } } - Ok(()) + Ok(sigops) } fn verify_block(&self, block: &chain::Block) -> VerificationResult { @@ -201,20 +224,20 @@ impl ChainVerifier { return Err(Error::CoinbaseSignatureLength(coinbase_script_len)); } - // verify transactions (except coinbase) - let mut block_sigops = utils::transaction_sigops(&block.transactions()[0]) - .map_err(|e| Error::Transaction(1, TransactionError::SignatureMallformed(e.to_string())))?; - - for (idx, transaction) in block.transactions().iter().enumerate().skip(1) { - - block_sigops += utils::transaction_sigops(transaction) - .map_err(|e| Error::Transaction(idx, TransactionError::SignatureMallformed(e.to_string())))?; + // transaction verification including number of signature operations checking + let mut block_sigops = 0; + for (idx, transaction) in block.transactions().iter().enumerate() { + block_sigops += try!( + self.verify_transaction( + block, + transaction, + idx, + ).map_err(|e| Error::Transaction(idx, e)) + ); if block_sigops > MAX_BLOCK_SIGOPS { return Err(Error::MaximumSigops); } - - try!(self.verify_transaction(block, transaction).map_err(|e| Error::Transaction(idx, e))); } // todo: pre-process projected block number once verification is parallel! @@ -253,7 +276,7 @@ impl ContinueVerify for ChainVerifier { fn continue_verify(&self, block: &chain::Block, state: usize) -> VerificationResult { // verify transactions (except coinbase) for (idx, transaction) in block.transactions().iter().enumerate().skip(state - 1) { - try!(self.verify_transaction(block, transaction).map_err(|e| Error::Transaction(idx, e))); + try!(self.verify_transaction(block, transaction, idx).map_err(|e| Error::Transaction(idx, e))); } let _parent = match self.store.block(BlockRef::Hash(block.header().previous_header_hash.clone())) { diff --git a/verification/src/lib.rs b/verification/src/lib.rs index 15340c75..1f74f9b3 100644 --- a/verification/src/lib.rs +++ b/verification/src/lib.rs @@ -73,6 +73,10 @@ pub enum TransactionError { Overspend, /// Signature script can't be properly parsed SignatureMallformed(String), + /// Too many signature operations + Sigops(usize), + /// Too many signature operations once p2sh operations included + SigopsP2SH(usize), } #[derive(PartialEq, Debug)] diff --git a/verification/src/utils.rs b/verification/src/utils.rs index 701e9bc8..c04f7bfb 100644 --- a/verification/src/utils.rs +++ b/verification/src/utils.rs @@ -73,6 +73,11 @@ pub fn transaction_sigops(transaction: &chain::Transaction) -> Result usize { + // todo: not always skip malformed output? + input.sigop_count_p2sh(ref_output).unwrap_or(0) +} + #[cfg(test)] mod tests { From fff3c29e6d86b15bcdd5a868ed4476a86a8b1d3e Mon Sep 17 00:00:00 2001 From: NikVolf Date: Tue, 22 Nov 2016 03:18:15 +0300 Subject: [PATCH 3/9] this is always wrong way first --- script/src/script.rs | 4 ++-- verification/src/chain_verifier.rs | 4 ++-- verification/src/utils.rs | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/script/src/script.rs b/script/src/script.rs index 292ce9ce..a08bf830 100644 --- a/script/src/script.rs +++ b/script/src/script.rs @@ -360,12 +360,12 @@ impl Script { Ok(result) } - pub fn sigop_count_p2sh(&self, ref_output: &Script) -> Result { + pub fn sigop_count_p2sh(&self, input_ref: &Script) -> Result { if !self.is_pay_to_script_hash() { return self.sigop_count(true); } let mut script_data: Option<&[u8]> = None; // we need last command - for next in ref_output.iter() { + for next in input_ref.iter() { let instruction = match next { Err(_) => return Ok(0), Ok(i) => i, diff --git a/verification/src/chain_verifier.rs b/verification/src/chain_verifier.rs index ceb7b52d..4d10f6a7 100644 --- a/verification/src/chain_verifier.rs +++ b/verification/src/chain_verifier.rs @@ -157,8 +157,8 @@ impl ChainVerifier { let input: Script = input.script_sig().to_vec().into(); let output: Script = paired_output.script_pubkey.to_vec().into(); - if is_strict_p2sh && input.is_pay_to_script_hash() { - sigops += utils::p2sh_sigops(&input, &output); + if is_strict_p2sh && output.is_pay_to_script_hash() { + sigops += utils::p2sh_sigops(&output, &input); if sigops >= MAX_BLOCK_SIGOPS { return Err(TransactionError::SigopsP2SH(sigops)); } } diff --git a/verification/src/utils.rs b/verification/src/utils.rs index c04f7bfb..c0b175c7 100644 --- a/verification/src/utils.rs +++ b/verification/src/utils.rs @@ -73,9 +73,9 @@ pub fn transaction_sigops(transaction: &chain::Transaction) -> Result usize { +pub fn p2sh_sigops(output: &Script, input_ref: &Script) -> usize { // todo: not always skip malformed output? - input.sigop_count_p2sh(ref_output).unwrap_or(0) + output.sigop_count_p2sh(input_ref).unwrap_or(0) } #[cfg(test)] From eb24c5aff6d4a82640a46bf322a42642aad159e8 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Tue, 22 Nov 2016 03:27:37 +0300 Subject: [PATCH 4/9] coinbase verification at index > 0 --- verification/src/chain_verifier.rs | 3 +++ verification/src/lib.rs | 2 ++ 2 files changed, 5 insertions(+) diff --git a/verification/src/chain_verifier.rs b/verification/src/chain_verifier.rs index 4d10f6a7..25fb7bc6 100644 --- a/verification/src/chain_verifier.rs +++ b/verification/src/chain_verifier.rs @@ -130,6 +130,9 @@ impl ChainVerifier { if sequence == 0 { return Ok(sigops); } + // must not be coinbase (sequence = 0 is returned above) + if transaction.is_coinbase() { return Err(TransactionError::MisplacedCoinbase(sequence)); } + if sigops >= MAX_BLOCK_SIGOPS { return Err(TransactionError::Sigops(sigops)); } // strict pay-to-script-hash signature operations count toward block diff --git a/verification/src/lib.rs b/verification/src/lib.rs index 1f74f9b3..5667ca58 100644 --- a/verification/src/lib.rs +++ b/verification/src/lib.rs @@ -77,6 +77,8 @@ pub enum TransactionError { Sigops(usize), /// Too many signature operations once p2sh operations included SigopsP2SH(usize), + /// Coinbase transaction is found at position that is not 0 + MisplacedCoinbase(usize), } #[derive(PartialEq, Debug)] From 5e80dd69bdc8bbbcd795925e74799306d1374488 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Tue, 22 Nov 2016 09:14:31 +0300 Subject: [PATCH 5/9] do not boomerang-relay new blocks --- sync/src/best_headers_chain.rs | 1 - sync/src/blocks_writer.rs | 1 - sync/src/local_node.rs | 2 -- sync/src/synchronization_chain.rs | 2 +- sync/src/synchronization_client.rs | 50 ++++++++++++++++++++++------ sync/src/synchronization_executor.rs | 18 +--------- sync/src/synchronization_manager.rs | 1 - sync/src/synchronization_peers.rs | 40 +++++++++++++++++++--- sync/src/synchronization_server.rs | 2 +- sync/src/synchronization_verifier.rs | 4 +-- 10 files changed, 80 insertions(+), 41 deletions(-) diff --git a/sync/src/best_headers_chain.rs b/sync/src/best_headers_chain.rs index d8c1aefc..3b03e636 100644 --- a/sync/src/best_headers_chain.rs +++ b/sync/src/best_headers_chain.rs @@ -123,7 +123,6 @@ impl BestHeadersChain { #[cfg(test)] mod tests { use super::BestHeadersChain; - use chain::RepresentH256; use primitives::hash::H256; use test_data; diff --git a/sync/src/blocks_writer.rs b/sync/src/blocks_writer.rs index 3f2f6816..8cb891a9 100644 --- a/sync/src/blocks_writer.rs +++ b/sync/src/blocks_writer.rs @@ -37,7 +37,6 @@ mod tests { use std::sync::Arc; use super::super::Error; use super::BlocksWriter; - use chain::RepresentH256; use test_data; use verification; diff --git a/sync/src/local_node.rs b/sync/src/local_node.rs index deaaf0fe..64306c1a 100644 --- a/sync/src/local_node.rs +++ b/sync/src/local_node.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; use parking_lot::Mutex; use db; -use chain::RepresentH256; use p2p::OutboundSyncConnectionRef; use message::common::{InventoryType, InventoryVector}; use message::types; @@ -222,7 +221,6 @@ impl LocalNode where T: SynchronizationTaskExecutor + PeersCon mod tests { use std::sync::Arc; use parking_lot::{Mutex, RwLock}; - use chain::RepresentH256; use synchronization_executor::Task; use synchronization_executor::tests::DummyTaskExecutor; use synchronization_client::{Config, SynchronizationClient, SynchronizationClientCore}; diff --git a/sync/src/synchronization_chain.rs b/sync/src/synchronization_chain.rs index 38bc8d7c..45c312a5 100644 --- a/sync/src/synchronization_chain.rs +++ b/sync/src/synchronization_chain.rs @@ -699,7 +699,7 @@ impl fmt::Debug for Chain { #[cfg(test)] mod tests { use std::sync::Arc; - use chain::{Transaction, RepresentH256}; + use chain::Transaction; use hash_queue::HashPosition; use super::{Chain, BlockState, TransactionState, HeadersIntersection, BlockInsertionResult}; use db::{self, Store, BestBlock}; diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs index 0d3988d0..44d09ff2 100644 --- a/sync/src/synchronization_client.rs +++ b/sync/src/synchronization_client.rs @@ -8,7 +8,8 @@ use futures::stream::Stream; use tokio_core::reactor::{Handle, Interval}; use futures_cpupool::CpuPool; use db; -use chain::{Block, BlockHeader, Transaction, RepresentH256}; +use chain::{Block, BlockHeader, Transaction}; +use message::common::{InventoryVector, InventoryType}; use primitives::hash::H256; use synchronization_peers::Peers; #[cfg(test)] use synchronization_peers::{Information as PeersInformation}; @@ -18,6 +19,7 @@ use synchronization_chain::{Information as ChainInformation}; use synchronization_executor::{Task, TaskExecutor}; use orphan_blocks_pool::OrphanBlocksPool; use orphan_transactions_pool::OrphanTransactionsPool; +use synchronization_server::ServerTaskIndex; use synchronization_manager::{manage_synchronization_peers_blocks, manage_synchronization_peers_inventory, manage_unknown_orphaned_blocks, manage_orphaned_transactions, MANAGEMENT_INTERVAL_MS, ManagePeersConfig, ManageUnknownBlocksConfig, ManageOrphanTransactionsConfig}; @@ -630,9 +632,7 @@ impl VerificationSink for SynchronizationClientCore where T: TaskExecutor // relay block to our peers if self.state.is_saturated() || self.state.is_nearly_saturated() { - // TODO: remember peer' last N blocks and send only if peer has no canonized blocks - // TODO: send `headers` if peer has not send `sendheaders` command - self.executor.lock().execute(Task::BroadcastBlocksHashes(insert_result.canonized_blocks_hashes)); + self.relay_new_blocks(insert_result.canonized_blocks_hashes); } // deal with block transactions @@ -770,6 +770,26 @@ impl SynchronizationClientCore where T: TaskExecutor { } } + /// Relay new blocks + fn relay_new_blocks(&self, new_blocks_hashes: Vec) { + let mut executor = self.executor.lock(); + // TODO: use all peers here (currently sync only) + // TODO: send `headers` if peer has not send `sendheaders` command + for peer_index in self.peers.all_peers() { + let inventory: Vec<_> = new_blocks_hashes.iter() + .filter(|h| !self.peers.has_block_with_hash(peer_index, h)) + .cloned() + .map(|h| InventoryVector { + inv_type: InventoryType::MessageBlock, + hash: h, + }) + .collect(); + if !inventory.is_empty() { + executor.execute(Task::SendInventory(peer_index, inventory, ServerTaskIndex::None)); + } + } + } + /// Process new blocks inventory fn process_new_blocks_headers(&mut self, peer_index: usize, mut hashes: Vec, mut headers: Vec) { assert_eq!(hashes.len(), headers.len()); @@ -1053,12 +1073,14 @@ pub mod tests { use std::sync::Arc; use parking_lot::{Mutex, RwLock}; use tokio_core::reactor::{Core, Handle}; - use chain::{Block, Transaction, RepresentH256}; + use chain::{Block, Transaction}; + use message::common::{InventoryVector, InventoryType}; use super::{Client, Config, SynchronizationClient, SynchronizationClientCore}; use synchronization_executor::Task; use synchronization_chain::{Chain, ChainRef}; use synchronization_executor::tests::DummyTaskExecutor; use synchronization_verifier::tests::DummyVerifier; + use synchronization_server::ServerTaskIndex; use primitives::hash::H256; use p2p::event_loop; use test_data; @@ -1253,7 +1275,9 @@ pub mod tests { assert!(tasks.iter().any(|t| t == &Task::RequestBlocksHeaders(2))); assert!(tasks.iter().any(|t| t == &Task::RequestMemoryPool(1))); assert!(tasks.iter().any(|t| t == &Task::RequestMemoryPool(2))); - assert!(tasks.iter().any(|t| t == &Task::BroadcastBlocksHashes(vec![block.hash()]))); + + let inventory = vec![InventoryVector { inv_type: InventoryType::MessageBlock, hash: block.hash() }]; + assert!(tasks.iter().any(|t| t == &Task::SendInventory(1, inventory.clone(), ServerTaskIndex::None))); } #[test] @@ -1792,12 +1816,13 @@ pub mod tests { ]); } - sync.on_peer_block(1, b2.clone()); + sync.on_peer_block(2, b2.clone()); // we were in saturated state => block is relayed { let tasks = executor.lock().take_tasks(); - assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1), Task::BroadcastBlocksHashes(vec![b2.hash()])]); + let inventory = vec![InventoryVector { inv_type: InventoryType::MessageBlock, hash: b2.hash() }]; + assert_eq!(tasks, vec![Task::RequestBlocksHeaders(2), Task::SendInventory(1, inventory, ServerTaskIndex::None)]); } sync.on_new_blocks_headers(1, vec![b3.block_header.clone()]); @@ -1806,11 +1831,14 @@ pub mod tests { // we were in nearly saturated state => block is relayed { let tasks = executor.lock().take_tasks(); + let inventory = vec![InventoryVector { inv_type: InventoryType::MessageBlock, hash: b3.hash() }]; assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1), - Task::RequestBlocks(1, vec![b3.hash()]), - Task::BroadcastBlocksHashes(vec![b3.hash()]), + Task::RequestBlocks(2, vec![b3.hash()]), + Task::SendInventory(2, inventory, ServerTaskIndex::None), Task::RequestBlocksHeaders(1), - Task::RequestMemoryPool(1) + Task::RequestMemoryPool(1), + Task::RequestBlocksHeaders(2), + Task::RequestMemoryPool(2), ]); } } diff --git a/sync/src/synchronization_executor.rs b/sync/src/synchronization_executor.rs index 286ccdf6..16f13a50 100644 --- a/sync/src/synchronization_executor.rs +++ b/sync/src/synchronization_executor.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use std::collections::HashMap; use parking_lot::Mutex; -use chain::{Block, BlockHeader, RepresentH256}; +use chain::{Block, BlockHeader}; use message::common::{InventoryVector, InventoryType}; use message::types; use primitives::hash::H256; @@ -30,8 +30,6 @@ pub enum Task { RequestMemoryPool(usize), /// Send block. SendBlock(usize, Block, ServerTaskIndex), - /// Broadcast block. - BroadcastBlocksHashes(Vec), /// Send notfound SendNotFound(usize, Vec, ServerTaskIndex), /// Send inventory @@ -134,20 +132,6 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor { connection.send_block(&block_message); } }, - Task::BroadcastBlocksHashes(blocks_hashes) => { - let inventory = types::Inv { - inventory: blocks_hashes.into_iter().map(|h| InventoryVector { - inv_type: InventoryType::MessageBlock, - hash: h, - }) - .collect(), - }; - - for (peer_index, connection) in self.peers.iter() { - trace!(target: "sync", "Sending inventory with {} blocks hashes to peer#{}", inventory.inventory.len(), peer_index); - connection.send_inventory(&inventory); - } - }, Task::SendNotFound(peer_index, unknown_inventory, id) => { let notfound = types::NotFound { inventory: unknown_inventory, diff --git a/sync/src/synchronization_manager.rs b/sync/src/synchronization_manager.rs index ab58e017..b051d464 100644 --- a/sync/src/synchronization_manager.rs +++ b/sync/src/synchronization_manager.rs @@ -185,7 +185,6 @@ mod tests { use std::collections::HashSet; use super::{ManagePeersConfig, ManageUnknownBlocksConfig, ManageOrphanTransactionsConfig, manage_synchronization_peers_blocks, manage_unknown_orphaned_blocks, manage_orphaned_transactions}; - use chain::RepresentH256; use synchronization_peers::Peers; use primitives::hash::H256; use test_data; diff --git a/sync/src/synchronization_peers.rs b/sync/src/synchronization_peers.rs index 627a8c99..85cd16b5 100644 --- a/sync/src/synchronization_peers.rs +++ b/sync/src/synchronization_peers.rs @@ -6,6 +6,8 @@ use time::precise_time_s; /// Max peer failures # before excluding from sync process const MAX_PEER_FAILURES: usize = 2; +/// Max last blocks to store for given peer +const MAX_LAST_BLOCKS_TO_STORE: usize = 64; /// Set of peers selected for synchronization. #[derive(Debug)] @@ -24,6 +26,8 @@ pub struct Peers { inventory_requests: HashSet, /// Last inventory message time from peer. inventory_requests_order: LinkedHashMap, + /// Last blocks from peer + last_block_responses: HashMap>, } /// Information on synchronization peers @@ -48,6 +52,7 @@ impl Peers { blocks_requests_order: LinkedHashMap::new(), inventory_requests: HashSet::new(), inventory_requests_order: LinkedHashMap::new(), + last_block_responses: HashMap::new(), } } @@ -73,19 +78,27 @@ impl Peers { /// Get all peers pub fn all_peers(&self) -> Vec { - self.idle.iter().cloned() + let mut unique: Vec<_> = self.idle.iter().cloned() .chain(self.unuseful.iter().cloned()) .chain(self.blocks_requests.keys().cloned()) .chain(self.inventory_requests.iter().cloned()) - .collect() + .collect(); + // need stable (for tests) && unique peers here, as blocks_requests can intersect with inventory_requests + unique.sort(); + unique.dedup(); + unique } /// Get useful peers pub fn useful_peers(&self) -> Vec { - self.idle.iter().cloned() + let mut unique: Vec<_> = self.idle.iter().cloned() .chain(self.blocks_requests.keys().cloned()) .chain(self.inventory_requests.iter().cloned()) - .collect() + .collect(); + // need stable (for tests) && unique peers here, as blocks_requests can intersect with inventory_requests + unique.sort(); + unique.dedup(); + unique } /// Get idle peers for inventory request. @@ -125,6 +138,14 @@ impl Peers { self.blocks_requests.get(&peer_index).cloned() } + /// True if peer already has block with this hash + pub fn has_block_with_hash(&self, peer_index: usize, hash: &H256) -> bool { + self.last_block_responses + .get(&peer_index) + .map(|h| h.contains_key(hash)) + .unwrap_or(false) + } + /// Mark peer as useful. pub fn useful_peer(&mut self, peer_index: usize) { // if peer is unknown => insert to idle queue @@ -162,6 +183,7 @@ impl Peers { self.blocks_requests_order.remove(&peer_index); self.inventory_requests.remove(&peer_index); self.inventory_requests_order.remove(&peer_index); + self.last_block_responses.remove(&peer_index); peer_blocks_requests .map(|hs| hs.into_iter().collect()) } @@ -188,6 +210,16 @@ impl Peers { if try_mark_as_idle { self.try_mark_idle(peer_index); } + + // TODO: add test for it + // remember that peer knows about this block + let last_block_responses_entry = self.last_block_responses.entry(peer_index).or_insert_with(LinkedHashMap::default); + if !last_block_responses_entry.contains_key(block_hash) { + if last_block_responses_entry.len() == MAX_LAST_BLOCKS_TO_STORE { + last_block_responses_entry.pop_front(); + } + last_block_responses_entry.insert(block_hash.clone(), ()); + } } /// Inventory received from peer. diff --git a/sync/src/synchronization_server.rs b/sync/src/synchronization_server.rs index 6ac83264..63dc6006 100644 --- a/sync/src/synchronization_server.rs +++ b/sync/src/synchronization_server.rs @@ -472,7 +472,7 @@ pub mod tests { use db; use test_data; use primitives::hash::H256; - use chain::{Transaction, RepresentH256}; + use chain::Transaction; use message::types; use message::common::{InventoryVector, InventoryType}; use synchronization_executor::Task; diff --git a/sync/src/synchronization_verifier.rs b/sync/src/synchronization_verifier.rs index d65b6670..67bb95e7 100644 --- a/sync/src/synchronization_verifier.rs +++ b/sync/src/synchronization_verifier.rs @@ -2,7 +2,7 @@ use std::thread; use std::sync::Arc; use std::sync::mpsc::{channel, Sender, Receiver}; use parking_lot::Mutex; -use chain::{Block, Transaction, RepresentH256}; +use chain::{Block, Transaction}; use message::common::ConsensusParams; use primitives::hash::H256; use verification::{ChainVerifier, Verify as VerificationVerify}; @@ -151,7 +151,7 @@ pub mod tests { use std::sync::Arc; use std::collections::HashMap; use parking_lot::Mutex; - use chain::{Block, Transaction, RepresentH256}; + use chain::{Block, Transaction}; use synchronization_client::SynchronizationClientCore; use synchronization_executor::tests::DummyTaskExecutor; use primitives::hash::H256; From e6c182a90434c703178f2c61b957469deaf3bad9 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Tue, 22 Nov 2016 12:00:38 +0300 Subject: [PATCH 6/9] nbits max check --- verification/src/utils.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/verification/src/utils.rs b/verification/src/utils.rs index c0b175c7..484673a6 100644 --- a/verification/src/utils.rs +++ b/verification/src/utils.rs @@ -4,7 +4,11 @@ use byteorder::{BigEndian, ByteOrder}; use chain; use script::{self, Script}; +const MAX_NBITS: u32 = 0x207fffff; + pub fn check_nbits(hash: &H256, n_bits: u32) -> bool { + if n_bits > MAX_NBITS { return false; } + let hash_bytes: &[u8] = &**hash; let mut nb = [0u8; 4]; From 63027abe27282d19c0182dac1c578eb3e59c45e9 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Tue, 22 Nov 2016 12:06:26 +0300 Subject: [PATCH 7/9] get rid of HashMap order dependency in test --- sync/src/synchronization_client.rs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs index 44d09ff2..873faa0a 100644 --- a/sync/src/synchronization_client.rs +++ b/sync/src/synchronization_client.rs @@ -1832,14 +1832,7 @@ pub mod tests { { let tasks = executor.lock().take_tasks(); let inventory = vec![InventoryVector { inv_type: InventoryType::MessageBlock, hash: b3.hash() }]; - assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1), - Task::RequestBlocks(2, vec![b3.hash()]), - Task::SendInventory(2, inventory, ServerTaskIndex::None), - Task::RequestBlocksHeaders(1), - Task::RequestMemoryPool(1), - Task::RequestBlocksHeaders(2), - Task::RequestMemoryPool(2), - ]); + assert!(tasks.iter().any(|t| t == &Task::SendInventory(2, inventory.clone(), ServerTaskIndex::None))); } } } From 2372234d9c445a3cac6d21e0db4a955885c5041f Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Tue, 22 Nov 2016 15:08:29 +0300 Subject: [PATCH 8/9] added P2WPKH script discovery --- script/src/script.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/script/src/script.rs b/script/src/script.rs index 6a07bf89..9406b27d 100644 --- a/script/src/script.rs +++ b/script/src/script.rs @@ -28,6 +28,8 @@ pub enum ScriptType { ScriptHash, Multisig, NullData, + WitnessScript, + WitnessKey, } /// Serialized script, used inside transaction inputs and outputs. @@ -99,6 +101,13 @@ impl Script { self.data[22] == Opcode::OP_EQUAL as u8 } + /// Extra-fast test for pay-to-witness-key-hash scripts. + pub fn is_pay_to_witness_key_hash(&self) -> bool { + self.data.len() == 22 && + self.data[0] == Opcode::OP_0 as u8 && + self.data[1] == Opcode::OP_PUSHBYTES_20 as u8 + } + /// Extra-fast test for pay-to-witness-script-hash scripts. pub fn is_pay_to_witness_script_hash(&self) -> bool { self.data.len() == 34 && @@ -470,6 +479,14 @@ mod tests { assert!(!script2.is_pay_to_script_hash()); } + #[test] + fn test_is_pay_to_witness_key_hash() { + let script: Script = "00140000000000000000000000000000000000000000".into(); + let script2: Script = "01140000000000000000000000000000000000000000".into(); + assert!(script.is_pay_to_witness_key_hash()); + assert!(!script2.is_pay_to_witness_key_hash()); + } + #[test] fn test_is_pay_to_witness_script_hash() { let script: Script = "00203b80842f4ea32806ce5e723a255ddd6490cfd28dac38c58bf9254c0577330693".into(); From 9f1fcec6e97600d2369cbde41647949b60008e71 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Tue, 22 Nov 2016 15:57:19 +0300 Subject: [PATCH 9/9] FilterLoad::flags is now enum --- message/src/types/filterload.rs | 47 +++++++++++++++++++++++++++++++-- 1 file changed, 45 insertions(+), 2 deletions(-) diff --git a/message/src/types/filterload.rs b/message/src/types/filterload.rs index 75188a3c..5d766288 100644 --- a/message/src/types/filterload.rs +++ b/message/src/types/filterload.rs @@ -1,15 +1,28 @@ use std::io; use bytes::Bytes; -use ser::{Stream, Reader}; +use ser::{Serializable, Deserializable, Stream, Reader, Error as ReaderError}; use {Payload, MessageResult}; +#[derive(Debug, PartialEq, Clone, Copy)] +#[repr(u8)] +/// Controls how the filter is updated after match is found. +pub enum FilterFlags { + /// Means the filter is not adjusted when a match is found. + None = 0, + /// Means if the filter matches any data element in a scriptPubKey the outpoint is serialized and inserted into the filter. + All = 1, + /// Means the outpoint is inserted into the filter only if a data element in the scriptPubKey is matched, and that script is + /// of the standard "pay to pubkey" or "pay to multisig" forms. + PubKeyOnly = 2, +} + #[derive(Debug, PartialEq)] pub struct FilterLoad { // TODO: check how this should be serialized pub filter: Bytes, pub hash_functions: u32, pub tweak: u32, - pub flags: u8, + pub flags: FilterFlags, } impl Payload for FilterLoad { @@ -41,3 +54,33 @@ impl Payload for FilterLoad { Ok(()) } } + +impl FilterFlags { + pub fn from_u8(v: u8) -> Option { + match v { + 0 => Some(FilterFlags::None), + 1 => Some(FilterFlags::All), + 2 => Some(FilterFlags::PubKeyOnly), + _ => None, + } + } +} + +impl From for u8 { + fn from(i: FilterFlags) -> Self { + i as u8 + } +} + +impl Serializable for FilterFlags { + fn serialize(&self, stream: &mut Stream) { + stream.append(&u8::from(*self)); + } +} + +impl Deserializable for FilterFlags { + fn deserialize(reader: &mut Reader) -> Result where T: io::Read { + let t: u8 = try!(reader.read()); + FilterFlags::from_u8(t).ok_or(ReaderError::MalformedData) + } +}