diff --git a/sync/src/connection_filter.rs b/sync/src/connection_filter.rs index 8b6da7dc..5ba12068 100644 --- a/sync/src/connection_filter.rs +++ b/sync/src/connection_filter.rs @@ -24,7 +24,7 @@ pub struct ConnectionFilter { /// Filter update type. filter_flags: types::FilterFlags, /// Last blocks from peer. - last_blocks: LinkedHashMap, + last_blocks: LinkedHashMap, /// Last transactions from peer. last_transactions: LinkedHashMap, /// Minimal fee in satoshis per 1000 bytes @@ -91,14 +91,14 @@ impl ConnectionFilter { } /// We have a knowledge that block with given hash is known to this connection - pub fn known_block(&mut self, block_hash: &H256) { + pub fn known_block(&mut self, block_hash: &H256, is_sent_compact: bool) { // remember that peer knows about this block if !self.last_blocks.contains_key(block_hash) { if self.last_blocks.len() == MAX_LAST_BLOCKS_TO_STORE { self.last_blocks.pop_front(); } - self.last_blocks.insert(block_hash.clone(), ()); + self.last_blocks.insert(block_hash.clone(), is_sent_compact); } } @@ -114,6 +114,11 @@ impl ConnectionFilter { } } + /// Is compact block with this hash has been sent recently + pub fn is_known_compact_block(&self, block_hash: &H256) -> bool { + self.last_blocks.get(block_hash).cloned().unwrap_or(false) + } + /// Check if block should be sent to this connection pub fn filter_block(&self, block_hash: &H256) -> bool { // check if block is known @@ -662,15 +667,15 @@ pub mod tests { let mut filter = ConnectionFilter::default(); assert!(filter.filter_block(&blocks[0].hash())); - filter.known_block(&blocks[0].hash()); + filter.known_block(&blocks[0].hash(), false); assert!(!filter.filter_block(&blocks[0].hash())); for block in blocks.iter().skip(1).take(MAX_LAST_BLOCKS_TO_STORE - 1) { - filter.known_block(&block.hash()); + filter.known_block(&block.hash(), false); assert!(!filter.filter_block(&blocks[0].hash())); } - filter.known_block(&blocks[MAX_LAST_BLOCKS_TO_STORE].hash()); + filter.known_block(&blocks[MAX_LAST_BLOCKS_TO_STORE].hash(), false); assert!(filter.filter_block(&blocks[0].hash())); } @@ -694,4 +699,13 @@ pub mod tests { filter.known_transaction(&transactions[MAX_LAST_TRANSACTIONS_TO_STORE].hash()); assert!(filter.filter_transaction(&transactions[0].hash(), &transactions[0], None)); } + + #[test] + fn known_compact_block() { + let mut filter = ConnectionFilter::default(); + filter.known_block(&test_data::block_h1().hash(), true); + filter.known_block(&test_data::block_h2().hash(), false); + assert!(filter.is_known_compact_block(&test_data::block_h1().hash())); + assert!(!filter.is_known_compact_block(&test_data::block_h2().hash())); + } } diff --git a/sync/src/local_node.rs b/sync/src/local_node.rs index d46fe4be..6222c3cf 100644 --- a/sync/src/local_node.rs +++ b/sync/src/local_node.rs @@ -216,7 +216,15 @@ impl LocalNode where T: SynchronizationTaskExecutor + PeersCon pub fn on_peer_get_block_txn(&self, peer_index: usize, message: types::GetBlockTxn) { trace!(target: "sync", "Got `getblocktxn` message from peer#{}", peer_index); - self.server.serve_get_block_txn(peer_index, message.request.blockhash, message.request.indexes).map(|t| self.server.add_task(peer_index, t)); + // Upon receipt of a properly-formatted getblocktxn message, nodes which *recently provided the sender + // of such a message a cmpctblock for the block hash identified in this message* MUST respond ... + // => we should check if we have send cmpctblock before + if { + let mut client = self.client.lock(); + client.is_compact_block_sent_recently(peer_index, &message.request.blockhash) + } { + self.server.serve_get_block_txn(peer_index, message.request.blockhash, message.request.indexes).map(|t| self.server.add_task(peer_index, t)); + } } pub fn on_peer_block_txn(&self, peer_index: usize, _message: types::BlockTxn) { @@ -256,7 +264,7 @@ mod tests { use synchronization_chain::Chain; use p2p::{event_loop, OutboundSyncConnection, OutboundSyncConnectionRef}; use message::types; - use message::common::{InventoryVector, InventoryType}; + use message::common::{InventoryVector, InventoryType, BlockTransactionsRequest}; use db; use super::LocalNode; use test_data; @@ -475,4 +483,67 @@ mod tests { _ => panic!("unexpected"), } } + + #[test] + fn local_node_serves_get_block_txn_when_recently_sent_compact_block() { + let (_, _, _, server, local_node) = create_local_node(); + + let genesis = test_data::genesis(); + let b1 = test_data::block_builder().header().parent(genesis.hash()).build() + .transaction().output().value(10).build().build() + .build(); // genesis -> b1 + let b1_hash = b1.hash(); + + // Append block + let peer_index1 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new()); + local_node.on_peer_block(peer_index1, types::Block { block: b1.clone() }); + + // Request compact block + let peer_index2 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new()); + local_node.on_peer_getdata(peer_index2, types::GetData {inventory: vec![ + InventoryVector { inv_type: InventoryType::MessageCompactBlock, hash: b1_hash.clone() }, + ]}); + + // forget tasks + server.take_tasks(); + + // Request compact transaction from this block + local_node.on_peer_get_block_txn(peer_index2, types::GetBlockTxn { + request: BlockTransactionsRequest { + blockhash: b1_hash.clone(), + indexes: vec![0], + } + }); + + let tasks = server.take_tasks(); + assert_eq!(tasks, vec![(2, ServerTask::ServeGetBlockTxn(b1_hash, vec![0]))]); + + } + + #[test] + fn local_node_not_serves_get_block_txn_when_compact_block_was_not_sent() { + let (_, _, _, server, local_node) = create_local_node(); + + let genesis = test_data::genesis(); + let b1 = test_data::block_builder().header().parent(genesis.hash()).build() + .transaction().output().value(10).build().build() + .build(); // genesis -> b1 + let b1_hash = b1.hash(); + + // Append block + let peer_index1 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new()); + local_node.on_peer_block(peer_index1, types::Block { block: b1.clone() }); + + // Request compact transaction from this block + let peer_index2 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new()); + local_node.on_peer_get_block_txn(peer_index2, types::GetBlockTxn { + request: BlockTransactionsRequest { + blockhash: b1_hash, + indexes: vec![0], + } + }); + + let tasks = server.take_tasks(); + assert_eq!(tasks, vec![]); + } } diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs index 1bb07f48..9b54f738 100644 --- a/sync/src/synchronization_client.rs +++ b/sync/src/synchronization_client.rs @@ -185,6 +185,7 @@ pub struct Information { pub trait Client : Send + 'static { fn best_block(&self) -> db::BestBlock; fn state(&self) -> State; + fn is_compact_block_sent_recently(&mut self, peer_index: usize, hash: &H256) -> bool; fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec) -> FilteredInventory; fn on_peer_connected(&mut self, peer_index: usize); fn on_new_blocks_inventory(&mut self, peer_index: usize, blocks_hashes: Vec); @@ -206,6 +207,7 @@ pub trait Client : Send + 'static { pub trait ClientCore : VerificationSink { fn best_block(&self) -> db::BestBlock; fn state(&self) -> State; + fn is_compact_block_sent_recently(&mut self, peer_index: usize, hash: &H256) -> bool; fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec) -> FilteredInventory; fn on_peer_connected(&mut self, peer_index: usize); fn on_new_blocks_inventory(&mut self, peer_index: usize, blocks_hashes: Vec); @@ -353,6 +355,10 @@ impl Client for SynchronizationClient where T: TaskExecutor, U: Veri self.core.lock().state() } + fn is_compact_block_sent_recently(&mut self, peer_index: usize, hash: &H256) -> bool { + self.core.lock().is_compact_block_sent_recently(peer_index, hash) + } + fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec) -> FilteredInventory { self.core.lock().filter_getdata_inventory(peer_index, inventory) } @@ -469,6 +475,11 @@ impl ClientCore for SynchronizationClientCore where T: TaskExecutor { self.state } + /// Returns true if compactblock with this hash has been sent to this peer recently + fn is_compact_block_sent_recently(&mut self, peer_index: usize, hash: &H256) -> bool { + self.peers.filter(peer_index).is_known_compact_block(hash) + } + /// Filter inventory from `getdata` message for given peer fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec) -> FilteredInventory { let storage = { self.chain.read().storage() }; @@ -489,7 +500,10 @@ impl ClientCore for SynchronizationClientCore where T: TaskExecutor { None => notfound.push(item), Some(block) => match filter.build_merkle_block(block) { None => notfound.push(item), - Some(merkleblock) => filtered.push((merkleblock.merkleblock, merkleblock.matching_transactions)), + Some(merkleblock) => { + filtered.push((merkleblock.merkleblock, merkleblock.matching_transactions)); + filter.known_block(&item.hash, false); + }, }, } }, @@ -508,6 +522,7 @@ impl ClientCore for SynchronizationClientCore where T: TaskExecutor { header: build_compact_block(indexed_block, prefilled_transactions_indexes), }; compacted.push(compact_block); + filter.known_block(&item.hash, true); }, } }, diff --git a/sync/src/synchronization_executor.rs b/sync/src/synchronization_executor.rs index d26c21a8..08289c8c 100644 --- a/sync/src/synchronization_executor.rs +++ b/sync/src/synchronization_executor.rs @@ -77,8 +77,6 @@ impl PeersConnections for LocalSynchronizationTaskExecutor { impl TaskExecutor for LocalSynchronizationTaskExecutor { fn execute(&mut self, task: Task) { - // TODO: what is types::GetBlocks::version here? (@ PR#37) - match task { Task::RequestBlocks(peer_index, blocks_hashes) => { let getdata = types::GetData { @@ -97,7 +95,7 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor { Task::RequestBlocksHeaders(peer_index) => { let block_locator_hashes = self.chain.read().block_locator_hashes(); let getheaders = types::GetHeaders { - version: 0, + version: 0, // this field is ignored by clients block_locator_hashes: block_locator_hashes, hash_stop: H256::default(), }; diff --git a/sync/src/synchronization_peers.rs b/sync/src/synchronization_peers.rs index aa7bf7a8..a7540896 100644 --- a/sync/src/synchronization_peers.rs +++ b/sync/src/synchronization_peers.rs @@ -244,7 +244,7 @@ impl Peers { } // remember that peer knows about this block - self.filters.entry(peer_index).or_insert_with(ConnectionFilter::default).known_block(block_hash); + self.filters.entry(peer_index).or_insert_with(ConnectionFilter::default).known_block(block_hash, false); } /// Transaction is received from peer. diff --git a/sync/src/synchronization_server.rs b/sync/src/synchronization_server.rs index e775ba90..235b8491 100644 --- a/sync/src/synchronization_server.rs +++ b/sync/src/synchronization_server.rs @@ -123,14 +123,27 @@ impl SynchronizationServer { server } - fn locate_known_block_hash(chain: &Chain, block_locator_hashes: &Vec) -> Option { + fn locate_known_block_hash(chain: &Chain, block_locator_hashes: &Vec, stop_hash: &H256) -> Option { block_locator_hashes.into_iter() .filter_map(|hash| SynchronizationServer::locate_best_known_block_hash(&chain, hash)) .nth(0) + .or_else(|| if stop_hash != &H256::default() { + if let Some(stop_hash_number) = chain.storage().block_number(stop_hash) { + Some(db::BestBlock { + number: stop_hash_number, + hash: stop_hash.clone(), + }) + } else { + None + } + } else { + None + } + ) } - fn locate_known_block_header(chain: &Chain, block_locator_hashes: &Vec) -> Option { - SynchronizationServer::locate_known_block_hash(chain, block_locator_hashes) + fn locate_known_block_header(chain: &Chain, block_locator_hashes: &Vec, stop_hash: &H256) -> Option { + SynchronizationServer::locate_known_block_hash(chain, block_locator_hashes, stop_hash) } fn server_worker(queue_ready: Arc, queue: Arc>, chain: ChainRef, executor: Arc>) { @@ -223,22 +236,23 @@ impl SynchronizationServer { assert_eq!(indexed_task.id, ServerTaskIndex::None); let chain = chain.read(); - if let Some(best_common_block) = SynchronizationServer::locate_known_block_hash(&chain, &block_locator_hashes) { - trace!(target: "sync", "Best common block with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash); + let blocks_hashes = match SynchronizationServer::locate_known_block_hash(&chain, &block_locator_hashes, &hash_stop) { + Some(best_common_block) => { + trace!(target: "sync", "Best common block with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash); + SynchronizationServer::blocks_hashes_after(&chain, &best_common_block, &hash_stop, 500) + }, + None => { + trace!(target: "sync", "No common blocks with peer#{}", peer_index); + Vec::new() + }, + }; - let blocks_hashes = SynchronizationServer::blocks_hashes_after(&chain, &best_common_block, &hash_stop, 500); - if !blocks_hashes.is_empty() { - trace!(target: "sync", "Going to respond with inventory with {} items to peer#{}", blocks_hashes.len(), peer_index); - let inventory = blocks_hashes.into_iter().map(|hash| InventoryVector { - inv_type: InventoryType::MessageBlock, - hash: hash, - }).collect(); - executor.lock().execute(Task::SendInventory(peer_index, inventory)); - } - } - else { - trace!(target: "sync", "No common blocks with peer#{}", peer_index); - } + trace!(target: "sync", "Going to respond with inventory with {} items to peer#{}", blocks_hashes.len(), peer_index); + let inventory = blocks_hashes.into_iter().map(|hash| InventoryVector { + inv_type: InventoryType::MessageBlock, + hash: hash, + }).collect(); + executor.lock().execute(Task::SendInventory(peer_index, inventory)); // inform that we have processed task for peer queue.lock().task_processed(peer_index); }, @@ -246,19 +260,15 @@ impl SynchronizationServer { ServerTask::ServeGetHeaders(block_locator_hashes, hash_stop) => { let chain = chain.read(); - // TODO: if block_locator_hashes is empty => return hash_stop - let blocks_headers = match SynchronizationServer::locate_known_block_header(&chain, &block_locator_hashes) { + let blocks_headers = match SynchronizationServer::locate_known_block_header(&chain, &block_locator_hashes, &hash_stop) { Some(best_common_block) => { trace!(target: "sync", "Best common block header with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash.to_reversed_str()); - - // TODO: add test for this case - // we must respond with empty headers message even if we have no common blocks with this peer SynchronizationServer::blocks_headers_after(&chain, &best_common_block, &hash_stop, 2000) }, None => { trace!(target: "sync", "No common blocks headers with peer#{}", peer_index); Vec::new() - } + }, }; trace!(target: "sync", "Going to respond with blocks headers with {} items to peer#{}", blocks_headers.len(), peer_index); @@ -447,8 +457,6 @@ impl Server for SynchronizationServer { } fn serve_get_block_txn(&self, _peer_index: usize, block_hash: H256, indexes: Vec) -> Option { - // TODO: Upon receipt of a properly-formatted getblocktxn message, nodes which *recently provided the sender - // of such a message a cmpctblock for the block hash identified in this message* MUST respond ... let task = IndexedServerTask::new(ServerTask::ServeGetBlockTxn(block_hash, indexes), ServerTaskIndex::None); Some(task) } @@ -650,9 +658,9 @@ pub mod tests { block_locator_hashes: vec![genesis_block_hash.clone()], hash_stop: H256::default(), }).map(|t| server.add_task(0, t)); - // => no response + // => empty response let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout - assert_eq!(tasks, vec![]); + assert_eq!(tasks, vec![Task::SendInventory(0, vec![])]); } #[test] @@ -815,4 +823,48 @@ pub mod tests { Task::SendTransaction(0, tx_verified), ]); } + + #[test] + fn server_responds_with_nonempty_inventory_when_getdata_stop_hash_filled() { + let (chain, executor, server) = create_synchronization_server(); + { + let mut chain = chain.write(); + chain.insert_best_block(test_data::block_h1().hash(), &test_data::block_h1().into()).expect("no error"); + } + // when asking with stop_hash + server.serve_getblocks(0, types::GetBlocks { + version: 0, + block_locator_hashes: vec![], + hash_stop: test_data::genesis().hash(), + }).map(|t| server.add_task(0, t)); + // => respond with next block + let inventory = vec![InventoryVector { + inv_type: InventoryType::MessageBlock, + hash: test_data::block_h1().hash(), + }]; + let tasks = DummyTaskExecutor::wait_tasks(executor); + assert_eq!(tasks, vec![Task::SendInventory(0, inventory)]); + } + + #[test] + fn server_responds_with_nonempty_headers_when_getdata_stop_hash_filled() { + let (chain, executor, server) = create_synchronization_server(); + { + let mut chain = chain.write(); + chain.insert_best_block(test_data::block_h1().hash(), &test_data::block_h1().into()).expect("no error"); + } + // when asking with stop_hash + let dummy_id = 6; + server.serve_getheaders(0, types::GetHeaders { + version: 0, + block_locator_hashes: vec![], + hash_stop: test_data::genesis().hash(), + }, Some(dummy_id)).map(|t| server.add_task(0, t)); + // => respond with next block + let headers = vec![ + test_data::block_h1().block_header, + ]; + let tasks = DummyTaskExecutor::wait_tasks(executor); + assert_eq!(tasks, vec![Task::SendHeaders(0, headers, ServerTaskIndex::Final(dummy_id))]); + } } diff --git a/sync/src/synchronization_verifier.rs b/sync/src/synchronization_verifier.rs index e370633f..f7a0e87e 100644 --- a/sync/src/synchronization_verifier.rs +++ b/sync/src/synchronization_verifier.rs @@ -9,6 +9,7 @@ use primitives::hash::H256; use synchronization_chain::ChainRef; use verification::{ChainVerifier, Verify as VerificationVerify, Chain}; use db::{SharedStore, IndexedBlock, PreviousTransactionOutputProvider}; +use time::get_time; /// Verification events sink pub trait VerificationSink : Send + 'static { @@ -169,11 +170,8 @@ fn execute_verification_task { - // bitcoin: AcceptToMemoryPoolWorker - - let time: u32 = 0; // TODO - let sequence: usize = 1; // TODO: change to bool - match verifier.verify_transaction(tx_output_provider, height, time, &transaction, sequence) { + let time: u32 = get_time().sec as u32; + match verifier.verify_transaction(tx_output_provider, height, time, &transaction, 1) { Ok(_) => sink.lock().on_transaction_verification_success(transaction), Err(e) => sink.lock().on_transaction_verification_error(&format!("{:?}", e), &transaction.hash()), }