diff --git a/Cargo.lock b/Cargo.lock index ce196a5f..ff5e1b52 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -584,6 +584,7 @@ dependencies = [ "p2p 0.1.0", "parking_lot 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "primitives 0.1.0", + "test-data 0.1.0", "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)", "verification 0.1.0", ] diff --git a/db/src/storage.rs b/db/src/storage.rs index 51a3df9e..b46250f5 100644 --- a/db/src/storage.rs +++ b/db/src/storage.rs @@ -20,7 +20,7 @@ const COL_BLOCK_HEADERS: u32 = 2; const COL_BLOCK_TRANSACTIONS: u32 = 3; const COL_TRANSACTIONS: u32 = 4; const COL_TRANSACTIONS_META: u32 = 5; -const _COL_RESERVED2: u32 = 6; +const COL_BLOCK_NUMBERS: u32 = 6; const _COL_RESERVED3: u32 = 7; const _COL_RESERVED4: u32 = 8; const _COL_RESERVED5: u32 = 9; @@ -266,8 +266,9 @@ impl Store for Storage { self.best_block.read().clone() } - fn block_number(&self, _hash: &H256) -> Option { - unimplemented!() + fn block_number(&self, hash: &H256) -> Option { + self.get(COL_BLOCK_NUMBERS, &**hash) + .map(|val| LittleEndian::read_u32(&val)) } fn block_hash(&self, number: u32) -> Option { @@ -356,7 +357,8 @@ impl Store for Storage { transaction.write_u32(Some(COL_META), KEY_BEST_BLOCK_NUMBER, new_best_number); // updating main chain height reference - transaction.put(Some(COL_BLOCK_HASHES), &u32_key(new_best_number), std::ops::Deref::deref(&block_hash)) + transaction.put(Some(COL_BLOCK_HASHES), &u32_key(new_best_number), std::ops::Deref::deref(&block_hash)); + transaction.write_u32(Some(COL_BLOCK_NUMBERS), std::ops::Deref::deref(&block_hash), new_best_number); } transaction.put(Some(COL_META), KEY_BEST_BLOCK_HASH, std::ops::Deref::deref(&new_best_hash)); @@ -470,6 +472,18 @@ mod tests { assert_eq!(loaded_transaction.hash(), block.transactions()[0].hash()); } + #[test] + fn stores_block_number() { + let path = RandomTempPath::create_dir(); + let store = Storage::new(path.as_path()).unwrap(); + + let block: Block = test_data::block_h9(); + store.insert_block(&block).unwrap(); + + let number = store.block_number(&block.hash()).unwrap(); + assert_eq!(0, number); + } + #[test] fn transaction_meta_update() { let path = RandomTempPath::create_dir(); diff --git a/p2p/src/protocol/sync.rs b/p2p/src/protocol/sync.rs index 01eb89d6..f3257e0f 100644 --- a/p2p/src/protocol/sync.rs +++ b/p2p/src/protocol/sync.rs @@ -37,6 +37,7 @@ pub trait InboundSyncConnection : Send + Sync { fn on_compact_block(&self, message: types::CompactBlock); fn on_get_block_txn(&self, message: types::GetBlockTxn); fn on_block_txn(&self, message: types::BlockTxn); + fn on_notfound(&self, message: types::NotFound); } pub trait OutboundSyncConnection : Send + Sync { @@ -58,6 +59,7 @@ pub trait OutboundSyncConnection : Send + Sync { fn send_compact_block(&self, message: &types::CompactBlock); fn send_get_block_txn(&self, message: &types::GetBlockTxn); fn send_block_txn(&self, message: &types::BlockTxn); + fn send_notfound(&self, message: &types::NotFound); } struct OutboundSync { @@ -155,6 +157,10 @@ impl OutboundSyncConnection for OutboundSync { fn send_block_txn(&self, message: &types::BlockTxn) { self.send_message(message); } + + fn send_notfound(&self, message: &types::NotFound) { + self.send_message(message); + } } pub struct SyncProtocol { @@ -249,6 +255,10 @@ impl Protocol for SyncProtocol { let message: types::BlockTxn = try!(deserialize_payload(payload, version)); self.inbound_connection.on_block_txn(message); } + else if command == &types::NotFound::command() { + let message: types::NotFound = try!(deserialize_payload(payload, version)); + self.inbound_connection.on_notfound(message); + } Ok(()) } diff --git a/p2p/src/util/node_table.rs b/p2p/src/util/node_table.rs index 8ace79fe..783127d2 100644 --- a/p2p/src/util/node_table.rs +++ b/p2p/src/util/node_table.rs @@ -60,7 +60,12 @@ impl From for NodeByScore { impl PartialOrd for NodeByScore { fn partial_cmp(&self, other: &Self) -> Option { if self.0.failures == other.0.failures { - other.0.time.partial_cmp(&self.0.time) + if other.0.time == self.0.time { + other.0.partial_cmp(&self.0) + } + else { + other.0.time.partial_cmp(&self.0.time) + } } else { self.0.failures.partial_cmp(&other.0.failures) } @@ -70,7 +75,12 @@ impl PartialOrd for NodeByScore { impl Ord for NodeByScore { fn cmp(&self, other: &Self) -> Ordering { if self.0.failures == other.0.failures { - other.0.time.cmp(&self.0.time) + if other.0.time == self.0.time { + other.0.cmp(&self.0) + } + else { + other.0.time.cmp(&self.0.time) + } } else { self.0.failures.cmp(&other.0.failures) } @@ -88,13 +98,63 @@ impl From for NodeByTime { impl PartialOrd for NodeByTime { fn partial_cmp(&self, other: &Self) -> Option { - other.0.time.partial_cmp(&self.0.time) + if other.0.time == self.0.time { + other.0.partial_cmp(&self.0) + } + else { + other.0.time.partial_cmp(&self.0.time) + } } } impl Ord for NodeByTime { fn cmp(&self, other: &Self) -> Ordering { - other.0.time.cmp(&self.0.time) + if other.0.time == self.0.time { + other.0.cmp(&self.0) + } + else { + other.0.time.cmp(&self.0.time) + } + } +} + +impl Ord for Node { + fn cmp(&self, other: &Self) -> Ordering { + // some ordering using address as unique key + match self.addr { + SocketAddr::V4(self_addr) => match other.addr { + SocketAddr::V4(other_addr) => { + let self_port = self_addr.port(); + let other_port = other_addr.port(); + if self_port == other_port { + self_addr.ip().cmp(&other_addr.ip()) + } + else { + self_port.cmp(&other_port) + } + }, + SocketAddr::V6(_) => Ordering::Less, + }, + SocketAddr::V6(self_addr) => match other.addr { + SocketAddr::V4(_) => Ordering::Greater, + SocketAddr::V6(other_addr) => { + let self_port = self_addr.port(); + let other_port = other_addr.port(); + if self_port == other_port { + self_addr.ip().cmp(&other_addr.ip()) + } + else { + self_port.cmp(&other_port) + } + }, + }, + } + } +} + +impl PartialOrd for Node { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) } } @@ -224,7 +284,7 @@ impl NodeTable where T: Time { mod tests { use std::net::SocketAddr; use message::common::Services; - use util::time::IncrementalTime; + use util::time::{IncrementalTime, ZeroTime}; use super::NodeTable; #[test] @@ -311,4 +371,14 @@ mod tests { assert_eq!(nodes[4].failures, 0); } + #[test] + fn test_node_table_duplicates() { + let s0: SocketAddr = "127.0.0.1:8000".parse().unwrap(); + let s1: SocketAddr = "127.0.0.1:8001".parse().unwrap(); + let mut table = NodeTable::::default(); + table.insert(s0, Services::default()); + table.insert(s1, Services::default()); + table.note_failure(&s0); + table.note_failure(&s1); + } } diff --git a/p2p/src/util/time.rs b/p2p/src/util/time.rs index 09fb71a1..94ab6e63 100644 --- a/p2p/src/util/time.rs +++ b/p2p/src/util/time.rs @@ -27,3 +27,13 @@ impl Time for IncrementalTime { result } } + +#[derive(Default)] +pub struct ZeroTime { +} + +impl Time for ZeroTime { + fn get(&self) -> time::Timespec { + time::Timespec::new(0, 0) + } +} diff --git a/sync/Cargo.toml b/sync/Cargo.toml index fb0b5dd8..98711daa 100644 --- a/sync/Cargo.toml +++ b/sync/Cargo.toml @@ -13,4 +13,5 @@ db = { path = "../db" } message = { path = "../message" } p2p = { path = "../p2p" } primitives = { path = "../primitives" } +test-data = { path = "../test-data" } verification = { path = "../verification" } diff --git a/sync/src/inbound_connection.rs b/sync/src/inbound_connection.rs index f77956ff..02b89e4d 100644 --- a/sync/src/inbound_connection.rs +++ b/sync/src/inbound_connection.rs @@ -1,15 +1,14 @@ use message::types; use p2p::{InboundSyncConnection, InboundSyncConnectionRef}; use local_node::LocalNodeRef; -use synchronization_executor::LocalSynchronizationTaskExecutor; pub struct InboundConnection { - local_node: LocalNodeRef, + local_node: LocalNodeRef, peer_index: usize, } impl InboundConnection { - pub fn new(local_node: LocalNodeRef, peer_index: usize) -> InboundSyncConnectionRef { + pub fn new(local_node: LocalNodeRef, peer_index: usize) -> InboundSyncConnectionRef { Box::new(InboundConnection { local_node: local_node, peer_index: peer_index, @@ -97,4 +96,8 @@ impl InboundSyncConnection for InboundConnection { fn on_block_txn(&self, message: types::BlockTxn) { self.local_node.on_peer_block_txn(self.peer_index, message); } + + fn on_notfound(&self, message: types::NotFound) { + self.local_node.on_peer_notfound(self.peer_index, message); + } } diff --git a/sync/src/inbound_connection_factory.rs b/sync/src/inbound_connection_factory.rs index 578e3ec6..5b529d71 100644 --- a/sync/src/inbound_connection_factory.rs +++ b/sync/src/inbound_connection_factory.rs @@ -1,14 +1,13 @@ use p2p::{LocalSyncNode, LocalSyncNodeRef, OutboundSyncConnectionRef, InboundSyncConnectionRef}; use local_node::LocalNodeRef; use inbound_connection::InboundConnection; -use synchronization_executor::LocalSynchronizationTaskExecutor; pub struct InboundConnectionFactory { - local_node: LocalNodeRef, + local_node: LocalNodeRef, } impl InboundConnectionFactory { - pub fn with_local_node(local_node: LocalNodeRef) -> LocalSyncNodeRef { + pub fn with_local_node(local_node: LocalNodeRef) -> LocalSyncNodeRef { Box::new( InboundConnectionFactory { local_node: local_node, diff --git a/sync/src/lib.rs b/sync/src/lib.rs index a5f35167..999bccc9 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -6,6 +6,7 @@ extern crate message; extern crate p2p; extern crate parking_lot; extern crate primitives; +extern crate test_data; extern crate time; extern crate verification; @@ -13,13 +14,14 @@ mod hash_queue; mod inbound_connection; mod inbound_connection_factory; mod local_node; -mod synchronization; mod synchronization_chain; +mod synchronization_client; mod synchronization_executor; mod synchronization_peers; +mod synchronization_server; use std::sync::Arc; -use parking_lot::RwLock; +use parking_lot::{Mutex, RwLock}; /// Create inbound synchronization connections factory for given `db`. pub fn create_sync_connection_factory(db: Arc) -> p2p::LocalSyncNodeRef { @@ -27,9 +29,13 @@ pub fn create_sync_connection_factory(db: Arc) -> p2p::LocalSyncNodeR use synchronization_executor::LocalSynchronizationTaskExecutor as SyncExecutor; use local_node::LocalNode as SyncNode; use inbound_connection_factory::InboundConnectionFactory as SyncConnectionFactory; + use synchronization_server::SynchronizationServer; + use synchronization_client::{SynchronizationClient, Config as SynchronizationConfig}; let sync_chain = Arc::new(RwLock::new(SyncChain::new(db))); let sync_executor = SyncExecutor::new(sync_chain.clone()); - let sync_node = SyncNode::new(sync_chain, sync_executor); + let sync_server = Arc::new(Mutex::new(SynchronizationServer::new(sync_chain.clone(), sync_executor.clone()))); + let sync_client = SynchronizationClient::new(SynchronizationConfig::default(), sync_executor.clone(), sync_chain); + let sync_node = Arc::new(SyncNode::new(sync_server, sync_client, sync_executor)); SyncConnectionFactory::with_local_node(sync_node) } diff --git a/sync/src/local_node.rs b/sync/src/local_node.rs index 8fa7258a..1bf9d0b1 100644 --- a/sync/src/local_node.rs +++ b/sync/src/local_node.rs @@ -6,26 +6,24 @@ use chain::RepresentH256; use p2p::OutboundSyncConnectionRef; use message::common::InventoryType; use message::types; -use synchronization::{Synchronization, SynchronizationRef, Config as SynchronizationConfig, Task as SynchronizationTask, TaskExecutor as SynchronizationTaskExecutor}; -use synchronization_chain::ChainRef; +use synchronization_client::{Client, SynchronizationClient}; +use synchronization_executor::{Task as SynchronizationTask, TaskExecutor as SynchronizationTaskExecutor, LocalSynchronizationTaskExecutor}; +use synchronization_server::{Server, SynchronizationServer}; -/// Thread-safe reference to the `LocalNode`. -/// Locks order: -/// 1) sync Mutex -/// 2) executor Mutex -/// 2) chain RwLock -pub type LocalNodeRef = Arc>; +pub type LocalNodeRef = Arc>>; /// Local synchronization node -pub struct LocalNode { +pub struct LocalNode { /// Throughout counter of synchronization peers peer_counter: AtomicUsize, - /// Synchronization chain - chain: ChainRef, /// Synchronization executor executor: Arc>, /// Synchronization process - sync: SynchronizationRef, + client: Arc>, + /// Synchronization server + server: Arc>, } /// Peers list @@ -34,21 +32,22 @@ pub trait PeersConnections { fn remove_peer_connection(&mut self, peer_index: usize); } -impl LocalNode where T: SynchronizationTaskExecutor + PeersConnections + Send + 'static { +impl LocalNode where T: SynchronizationTaskExecutor + PeersConnections, + U: Server, + V: Client { /// New synchronization node with given storage - pub fn new(chain: ChainRef, executor: Arc>) -> LocalNodeRef { - let sync = Synchronization::new(SynchronizationConfig::default(), executor.clone(), chain.clone()); - Arc::new(LocalNode { + pub fn new(server: Arc>, client: Arc>, executor: Arc>) -> Self { + LocalNode { peer_counter: AtomicUsize::new(0), - chain: chain, executor: executor, - sync: sync, - }) + client: client, + server: server, + } } /// Best block hash (including non-verified, requested && non-requested blocks) pub fn best_block(&self) -> db::BestBlock { - self.chain.read().best_block() + self.client.lock().best_block() } pub fn create_sync_session(&self, _best_block_height: i32, outbound_connection: OutboundSyncConnectionRef) -> usize { @@ -71,7 +70,7 @@ impl LocalNode where T: SynchronizationTaskExecutor + PeersConnections + S trace!(target: "sync", "Stopping sync session with peer#{}", peer_index); self.executor.lock().remove_peer_connection(peer_index); - self.sync.lock().on_peer_disconnected(peer_index); + self.client.lock().on_peer_disconnected(peer_index); } pub fn on_peer_inventory(&self, peer_index: usize, message: types::Inv) { @@ -90,18 +89,22 @@ impl LocalNode where T: SynchronizationTaskExecutor + PeersConnections + S // if there are unknown blocks => start synchronizing with peer if !blocks_inventory.is_empty() { - self.sync.lock().on_new_blocks_inventory(peer_index, blocks_inventory); + self.client.lock().on_new_blocks_inventory(peer_index, blocks_inventory); } // TODO: process unknown transactions, etc... } - pub fn on_peer_getdata(&self, peer_index: usize, _message: types::GetData) { + pub fn on_peer_getdata(&self, peer_index: usize, message: types::GetData) { trace!(target: "sync", "Got `getdata` message from peer#{}", peer_index); + + self.server.lock().serve_getdata(peer_index, message); } - pub fn on_peer_getblocks(&self, peer_index: usize, _message: types::GetBlocks) { + pub fn on_peer_getblocks(&self, peer_index: usize, message: types::GetBlocks) { trace!(target: "sync", "Got `getblocks` message from peer#{}", peer_index); + + self.server.lock().serve_getblocks(peer_index, message); } pub fn on_peer_getheaders(&self, peer_index: usize, _message: types::GetHeaders) { @@ -116,7 +119,7 @@ impl LocalNode where T: SynchronizationTaskExecutor + PeersConnections + S trace!(target: "sync", "Got `block` message from peer#{}. Block hash: {}", peer_index, message.block.hash()); // try to process new block - self.sync.lock().on_peer_block(peer_index, message.block); + self.client.lock().on_peer_block(peer_index, message.block); } pub fn on_peer_headers(&self, peer_index: usize, _message: types::Headers) { @@ -166,19 +169,29 @@ impl LocalNode where T: SynchronizationTaskExecutor + PeersConnections + S pub fn on_peer_block_txn(&self, peer_index: usize, _message: types::BlockTxn) { trace!(target: "sync", "Got `blocktxn` message from peer#{}", peer_index); } + + pub fn on_peer_notfound(&self, peer_index: usize, _message: types::NotFound) { + trace!(target: "sync", "Got `notfound` message from peer#{}", peer_index); + } } #[cfg(test)] mod tests { use std::sync::Arc; use parking_lot::{Mutex, RwLock}; - use synchronization::Task; - use synchronization::tests::DummyTaskExecutor; + use chain::RepresentH256; + use synchronization_executor::Task; + use synchronization_client::tests::DummyTaskExecutor; + use synchronization_client::{Config, SynchronizationClient}; use synchronization_chain::Chain; use p2p::{OutboundSyncConnection, OutboundSyncConnectionRef}; use message::types; + use message::common::{InventoryVector, InventoryType}; use db; use super::LocalNode; + use test_data; + use synchronization_server::ServerTask; + use synchronization_server::tests::DummyServer; struct DummyOutboundSyncConnection; @@ -207,18 +220,47 @@ mod tests { fn send_compact_block(&self, _message: &types::CompactBlock) {} fn send_get_block_txn(&self, _message: &types::GetBlockTxn) {} fn send_block_txn(&self, _message: &types::BlockTxn) {} + fn send_notfound(&self, _message: &types::NotFound) {} + } + + fn create_local_node() -> (Arc>, Arc>, LocalNode>) { + let chain = Arc::new(RwLock::new(Chain::new(Arc::new(db::TestStorage::with_genesis_block())))); + let executor = Arc::new(Mutex::new(DummyTaskExecutor::default())); + let server = Arc::new(Mutex::new(DummyServer::new())); + let config = Config { skip_verification: true }; + let client = SynchronizationClient::new(config, executor.clone(), chain); + let local_node = LocalNode::new(server.clone(), client, executor.clone()); + (executor, server, local_node) } #[test] fn local_node_request_inventory_on_sync_start() { - let chain = Arc::new(RwLock::new(Chain::new(Arc::new(db::TestStorage::with_genesis_block())))); - let executor = Arc::new(Mutex::new(DummyTaskExecutor::default())); - let local_node = LocalNode::new(chain, executor.clone()); + let (executor, _, local_node) = create_local_node(); let peer_index = local_node.create_sync_session(0, DummyOutboundSyncConnection::new()); + // start sync session local_node.start_sync_session(peer_index, 0); - + // => ask for inventory let tasks = executor.lock().take_tasks(); - assert_eq!(tasks.len(), 1); assert_eq!(tasks, vec![Task::RequestInventory(peer_index)]); } -} \ No newline at end of file + + #[test] + fn local_node_serves_block() { + let (_, server, local_node) = create_local_node(); + let peer_index = local_node.create_sync_session(0, DummyOutboundSyncConnection::new()); + // peer requests genesis block + let genesis_block_hash = test_data::genesis().hash(); + let inventory = vec![ + InventoryVector { + inv_type: InventoryType::MessageBlock, + hash: genesis_block_hash.clone(), + } + ]; + local_node.on_peer_getdata(peer_index, types::GetData { + inventory: inventory.clone() + }); + // => `getdata` is served + let tasks = server.lock().take_tasks(); + assert_eq!(tasks, vec![(peer_index, ServerTask::ServeGetData(inventory))]); + } +} diff --git a/sync/src/synchronization_chain.rs b/sync/src/synchronization_chain.rs index e14684f1..48a87887 100644 --- a/sync/src/synchronization_chain.rs +++ b/sync/src/synchronization_chain.rs @@ -199,6 +199,21 @@ impl Chain { } } + /// Get block number from storage + pub fn storage_block_number(&self, hash: &H256) -> Option { + self.storage.block_number(hash) + } + + /// Get block hash from storage + pub fn storage_block_hash(&self, number: u32) -> Option { + self.storage.block_hash(number) + } + + /// Get block from the storage + pub fn storage_block(&self, hash: &H256) -> Option { + self.storage.block(db::BlockRef::Hash(hash.clone())) + } + /// Prepare best block locator hashes pub fn best_block_locator_hashes(&self) -> Vec { let mut result: Vec = Vec::with_capacity(4); diff --git a/sync/src/synchronization.rs b/sync/src/synchronization_client.rs similarity index 83% rename from sync/src/synchronization.rs rename to sync/src/synchronization_client.rs index aa1bee3b..274accd4 100644 --- a/sync/src/synchronization.rs +++ b/sync/src/synchronization_client.rs @@ -15,6 +15,7 @@ use synchronization_chain::{ChainRef, BlockState}; #[cfg(test)] use synchronization_chain::{Information as ChainInformation}; use verification::{ChainVerifier, Error as VerificationError, Verify}; +use synchronization_executor::{Task, TaskExecutor}; use time; ///! Blocks synchronization process: @@ -77,20 +78,7 @@ const MIN_BLOCKS_IN_REQUEST: u32 = 32; /// Maximum number of blocks to request from peer const MAX_BLOCKS_IN_REQUEST: u32 = 512; -/// Thread-safe reference to the `Synchronization` -pub type SynchronizationRef = Arc>>; - -/// Synchronization task for the peer. -#[derive(Debug, Eq, PartialEq)] -pub enum Task { - /// Request given blocks. - RequestBlocks(usize, Vec), - /// Request full inventory using block_locator_hashes. - RequestInventory(usize), - /// Request inventory using best block locator only. - RequestBestInventory(usize), -} - +/// Synchronization state #[derive(Debug, Clone, Copy)] pub enum State { Synchronizing(f64, u32), @@ -111,11 +99,6 @@ pub struct Information { pub orphaned: usize, } -/// Synchronization task executor -pub trait TaskExecutor { - fn execute(&mut self, task: Task); -} - /// Verification thread tasks enum VerificationTask { /// Verify single block @@ -124,15 +107,26 @@ enum VerificationTask { Stop, } -/// Synchronization config +/// Synchronization client trait +pub trait Client : Send + 'static { + fn best_block(&self) -> db::BestBlock; + fn on_new_blocks_inventory(&mut self, peer_index: usize, peer_hashes: Vec); + fn on_peer_block(&mut self, peer_index: usize, block: Block); + fn on_peer_disconnected(&mut self, peer_index: usize); + fn reset(&mut self, is_hard: bool); + fn on_block_verification_success(&mut self, block: Block); + fn on_block_verification_error(&mut self, err: &VerificationError, hash: &H256); +} + +/// Synchronization client configuration options. #[derive(Default)] pub struct Config { - /// Skip blocks verification - pub skip_block_verification: bool, -} + /// Do not verify incoming blocks before inserting to db. + pub skip_verification: bool, +} -/// New blocks synchronization process. -pub struct Synchronization { +/// Synchronization client. +pub struct SynchronizationClient { /// Synchronization state. state: State, /// Synchronization peers. @@ -158,7 +152,7 @@ impl State { } } -impl Drop for Synchronization where T: TaskExecutor + Send + 'static { +impl Drop for SynchronizationClient where T: TaskExecutor { fn drop(&mut self) { if let Some(join_handle) = self.verification_worker_thread.take() { self.verification_work_sender @@ -170,11 +164,91 @@ impl Drop for Synchronization where T: TaskExecutor + Send + 'static { } } -impl Synchronization where T: TaskExecutor + Send + 'static { +impl Client for SynchronizationClient where T: TaskExecutor { + /// Get best known block + fn best_block(&self) -> db::BestBlock { + self.chain.read().best_block() + } + + /// Try to queue synchronization of unknown blocks when new inventory is received. + fn on_new_blocks_inventory(&mut self, peer_index: usize, peer_hashes: Vec) { + self.process_new_blocks_inventory(peer_index, peer_hashes); + self.execute_synchronization_tasks(); + } + + /// Process new block. + fn on_peer_block(&mut self, peer_index: usize, block: Block) { + let block_hash = block.hash(); + + // update peers to select next tasks + self.peers.on_block_received(peer_index, &block_hash); + + self.process_peer_block(block_hash, block); + self.execute_synchronization_tasks(); + } + + /// Peer disconnected. + fn on_peer_disconnected(&mut self, peer_index: usize) { + self.peers.on_peer_disconnected(peer_index); + + // when last peer is disconnected, reset, but let verifying blocks be verified + self.reset(false); + } + + /// Reset synchronization process + fn reset(&mut self, is_hard: bool) { + self.peers.reset(); + self.orphaned_blocks.clear(); + // TODO: reset verification queue + + let mut chain = self.chain.write(); + chain.remove_blocks_with_state(BlockState::Requested); + chain.remove_blocks_with_state(BlockState::Scheduled); + if is_hard { + self.state = State::Synchronizing(time::precise_time_s(), chain.best_block().number); + chain.remove_blocks_with_state(BlockState::Verifying); + warn!(target: "sync", "Synchronization process restarting from block {:?}", chain.best_block()); + } + else { + self.state = State::Saturated; + } + } + + /// Process successful block verification + fn on_block_verification_success(&mut self, block: Block) { + { + let hash = block.hash(); + let mut chain = self.chain.write(); + + // remove from verifying queue + assert_eq!(chain.remove_block_with_state(&hash, BlockState::Verifying), HashPosition::Front); + + // insert to storage + chain.insert_best_block(block) + .expect("Error inserting to db."); + } + + // continue with synchronization + self.execute_synchronization_tasks(); + } + + /// Process failed block verification + fn on_block_verification_error(&mut self, err: &VerificationError, hash: &H256) { + warn!(target: "sync", "Block {:?} verification failed with error {:?}", hash, err); + + // reset synchronization process + self.reset(true); + + // start new tasks + self.execute_synchronization_tasks(); + } +} + +impl SynchronizationClient where T: TaskExecutor { /// Create new synchronization window - pub fn new(config: Config, executor: Arc>, chain: ChainRef) -> SynchronizationRef { - let sync = SynchronizationRef::new(Mutex::new( - Synchronization { + pub fn new(config: Config, executor: Arc>, chain: ChainRef) -> Arc> { + let sync = Arc::new(Mutex::new( + SynchronizationClient { state: State::Saturated, peers: Peers::new(), executor: executor, @@ -185,7 +259,7 @@ impl Synchronization where T: TaskExecutor + Send + 'static { } )); - if !config.skip_block_verification { + if !config.skip_verification { let (verification_work_sender, verification_work_receiver) = channel(); let csync = sync.clone(); let mut lsync = sync.lock(); @@ -194,7 +268,7 @@ impl Synchronization where T: TaskExecutor + Send + 'static { lsync.verification_worker_thread = Some(thread::Builder::new() .name("Sync verification thread".to_string()) .spawn(move || { - Synchronization::verification_worker_proc(csync, storage, verification_work_receiver) + SynchronizationClient::verification_worker_proc(csync, storage, verification_work_receiver) }) .expect("Error creating verification thread")); } @@ -213,50 +287,6 @@ impl Synchronization where T: TaskExecutor + Send + 'static { } } - /// Try to queue synchronization of unknown blocks when new inventory is received. - pub fn on_new_blocks_inventory(&mut self, peer_index: usize, peer_hashes: Vec) { - self.process_new_blocks_inventory(peer_index, peer_hashes); - self.execute_synchronization_tasks(); - } - - /// Process new block. - pub fn on_peer_block(&mut self, peer_index: usize, block: Block) { - let block_hash = block.hash(); - - // update peers to select next tasks - self.peers.on_block_received(peer_index, &block_hash); - - self.process_peer_block(block_hash, block); - self.execute_synchronization_tasks(); - } - - /// Peer disconnected. - pub fn on_peer_disconnected(&mut self, peer_index: usize) { - self.peers.on_peer_disconnected(peer_index); - - // when last peer is disconnected, reset, but let verifying blocks be verified - self.reset(false); - } - - /// Reset synchronization process - pub fn reset(&mut self, is_hard: bool) { - self.peers.reset(); - self.orphaned_blocks.clear(); - // TODO: reset verification queue - - let mut chain = self.chain.write(); - chain.remove_blocks_with_state(BlockState::Requested); - chain.remove_blocks_with_state(BlockState::Scheduled); - if is_hard { - self.state = State::Synchronizing(time::precise_time_s(), chain.best_block().number); - chain.remove_blocks_with_state(BlockState::Verifying); - warn!(target: "sync", "Synchronization process restarting from block {:?}", chain.best_block()); - } - else { - self.state = State::Saturated; - } - } - /// Process new blocks inventory fn process_new_blocks_inventory(&mut self, peer_index: usize, mut peer_hashes: Vec) { // | requested | QUEUED | @@ -273,54 +303,62 @@ impl Synchronization where T: TaskExecutor + Send + 'static { let mut chain = self.chain.write(); - // new block is scheduled => move to synchronizing state - if !self.state.is_synchronizing() { - self.state = State::Synchronizing(time::precise_time_s(), chain.best_block().number); - } - - // when synchronization is idling - // => request full inventory - if !chain.has_blocks_of_state(BlockState::Scheduled) - && !chain.has_blocks_of_state(BlockState::Requested) { - let unknown_blocks = peer_hashes.into_iter() - .filter(|hash| chain.block_has_state(&hash, BlockState::Unknown)) - .collect(); - chain.schedule_blocks_hashes(unknown_blocks); - self.peers.insert(peer_index); - return; - } - - // cases: [2], [5], [6], [8] - // if last block from peer_hashes is in window { requested_hashes + queued_hashes } - // => no new blocks for synchronization, but we will use this peer in synchronization - let peer_hashes_len = peer_hashes.len(); - if chain.block_has_state(&peer_hashes[peer_hashes_len - 1], BlockState::Scheduled) - || chain.block_has_state(&peer_hashes[peer_hashes_len - 1], BlockState::Requested) { - self.peers.insert(peer_index); - return; - } - - // cases: [1], [3], [4], [7], [9], [10] - // try to find new blocks for synchronization from inventory - let mut last_known_peer_hash_index = peer_hashes_len - 1; loop { - if chain.block_state(&peer_hashes[last_known_peer_hash_index]) != BlockState::Unknown { - // we have found first block which is known to us - // => blocks in range [(last_known_peer_hash_index + 1)..peer_hashes_len] are unknown - // && must be scheduled for request - let unknown_peer_hashes = peer_hashes.split_off(last_known_peer_hash_index + 1); + // when synchronization is idling + // => request full inventory + if !chain.has_blocks_of_state(BlockState::Scheduled) + && !chain.has_blocks_of_state(BlockState::Requested) { + let unknown_blocks: Vec<_> = peer_hashes.into_iter() + .filter(|hash| chain.block_has_state(&hash, BlockState::Unknown)) + .collect(); - chain.schedule_blocks_hashes(unknown_peer_hashes); + // no new blocks => no need to switch to the synchronizing state + if unknown_blocks.is_empty() { + return; + } + + chain.schedule_blocks_hashes(unknown_blocks); + self.peers.insert(peer_index); + break; + } + + // cases: [2], [5], [6], [8] + // if last block from peer_hashes is in window { requested_hashes + queued_hashes } + // => no new blocks for synchronization, but we will use this peer in synchronization + let peer_hashes_len = peer_hashes.len(); + if chain.block_has_state(&peer_hashes[peer_hashes_len - 1], BlockState::Scheduled) + || chain.block_has_state(&peer_hashes[peer_hashes_len - 1], BlockState::Requested) { self.peers.insert(peer_index); return; } - if last_known_peer_hash_index == 0 { - // either these are blocks from the future or blocks from the past - // => TODO: ignore this peer during synchronization - return; + // cases: [1], [3], [4], [7], [9], [10] + // try to find new blocks for synchronization from inventory + let mut last_known_peer_hash_index = peer_hashes_len - 1; + loop { + if chain.block_state(&peer_hashes[last_known_peer_hash_index]) != BlockState::Unknown { + // we have found first block which is known to us + // => blocks in range [(last_known_peer_hash_index + 1)..peer_hashes_len] are unknown + // && must be scheduled for request + let unknown_peer_hashes = peer_hashes.split_off(last_known_peer_hash_index + 1); + + chain.schedule_blocks_hashes(unknown_peer_hashes); + self.peers.insert(peer_index); + break; + } + + if last_known_peer_hash_index == 0 { + // either these are blocks from the future or blocks from the past + // => TODO: ignore this peer during synchronization + return; + } + last_known_peer_hash_index -= 1; } - last_known_peer_hash_index -= 1; + } + + // move to synchronizing state + if !self.state.is_synchronizing() { + self.state = State::Synchronizing(time::precise_time_s(), chain.best_block().number); } } @@ -374,9 +412,9 @@ impl Synchronization where T: TaskExecutor + Send + 'static { // proceed to the next orphaned block if let Entry::Occupied(orphaned_block_entry) = self.orphaned_blocks.entry(current_block_hash) { - let (orphaned_parent_hash, orphaned_block) = orphaned_block_entry.remove_entry(); - current_block_hash = orphaned_parent_hash; + let (_, orphaned_block) = orphaned_block_entry.remove_entry(); current_block = orphaned_block; + current_block_hash = current_block.hash(); } else { break; @@ -453,7 +491,7 @@ impl Synchronization where T: TaskExecutor + Send + 'static { } /// Thread procedure for handling verification tasks - fn verification_worker_proc(sync: SynchronizationRef, storage: Arc, work_receiver: Receiver) { + fn verification_worker_proc(sync: Arc>, storage: Arc, work_receiver: Receiver) { let verifier = ChainVerifier::new(storage); while let Ok(task) = work_receiver.recv() { match task { @@ -471,35 +509,6 @@ impl Synchronization where T: TaskExecutor + Send + 'static { } } } - - /// Process successful block verification - fn on_block_verification_success(&mut self, block: Block) { - { - let hash = block.hash(); - let mut chain = self.chain.write(); - - // remove from verifying queue - assert_eq!(chain.remove_block_with_state(&hash, BlockState::Verifying), HashPosition::Front); - - // insert to storage - chain.insert_best_block(block) - .expect("Error inserting to db."); - } - - // continue with synchronization - self.execute_synchronization_tasks(); - } - - /// Process failed block verification - fn on_block_verification_error(&mut self, err: &VerificationError, hash: &H256) { - warn!(target: "sync", "Block {:?} verification failed with error {:?}", hash, err); - - // reset synchronization process - self.reset(true); - - // start new tasks - self.execute_synchronization_tasks(); - } } #[cfg(test)] @@ -508,9 +517,11 @@ pub mod tests { use std::mem::replace; use parking_lot::{Mutex, RwLock}; use chain::{Block, RepresentH256}; - use super::{Synchronization, SynchronizationRef, Config, Task, TaskExecutor}; + use super::{Client, Config, SynchronizationClient}; + use synchronization_executor::{Task, TaskExecutor}; use local_node::PeersConnections; use synchronization_chain::{Chain, ChainRef}; + use test_data; use p2p::OutboundSyncConnectionRef; use db; @@ -536,13 +547,12 @@ pub mod tests { } } - fn create_sync() -> (Arc>, SynchronizationRef) { + fn create_sync() -> (Arc>, Arc>>) { let storage = Arc::new(db::TestStorage::with_genesis_block()); let chain = ChainRef::new(RwLock::new(Chain::new(storage.clone()))); let executor = Arc::new(Mutex::new(DummyTaskExecutor::default())); - (executor.clone(), Synchronization::new(Config { - skip_block_verification: true, - }, executor, chain)) + let config = Config { skip_verification: true }; + (executor.clone(), SynchronizationClient::new(config, executor, chain)) } #[test] @@ -658,7 +668,6 @@ pub mod tests { && sync.information().orphaned == 1); // receive block from peer#1 sync.on_peer_block(1, block1); - println!("=== {:?}", sync.information().chain); assert!(sync.information().chain.requested == 0 && sync.information().orphaned == 0 && sync.information().chain.stored == 3); @@ -683,4 +692,17 @@ pub mod tests { assert!(!sync.information().state.is_synchronizing()); } } + + #[test] + fn synchronization_not_starting_when_receiving_known_blocks() { + let (executor, sync) = create_sync(); + let mut sync = sync.lock(); + // saturated => receive inventory with known blocks only + sync.on_new_blocks_inventory(1, vec![test_data::genesis().hash()]); + // => no need to start synchronization + assert!(!sync.information().state.is_synchronizing()); + // => no synchronization tasks are scheduled + let tasks = executor.lock().take_tasks(); + assert_eq!(tasks, vec![]); + } } diff --git a/sync/src/synchronization_executor.rs b/sync/src/synchronization_executor.rs index b91a5622..e2afa3e5 100644 --- a/sync/src/synchronization_executor.rs +++ b/sync/src/synchronization_executor.rs @@ -1,16 +1,38 @@ use std::sync::Arc; use std::collections::HashMap; use parking_lot::Mutex; +use chain::{Block, RepresentH256}; use message::common::{InventoryVector, InventoryType}; use message::types; use primitives::hash::H256; use p2p::OutboundSyncConnectionRef; use synchronization_chain::ChainRef; -use synchronization::{Task as SynchronizationTask, TaskExecutor as SynchronizationTaskExecutor}; use local_node::PeersConnections; pub type LocalSynchronizationTaskExecutorRef = Arc>; +/// Synchronization task executor +pub trait TaskExecutor : Send + 'static { + fn execute(&mut self, task: Task); +} + +/// Synchronization task for the peer. +#[derive(Debug, PartialEq)] +pub enum Task { + /// Request given blocks. + RequestBlocks(usize, Vec), + /// Request full inventory using block_locator_hashes. + RequestInventory(usize), + /// Request inventory using best block locator only. + RequestBestInventory(usize), + /// Send block. + SendBlock(usize, Block), + /// Send notfound + SendNotFound(usize, Vec), + /// Send inventory + SendInventory(usize, Vec), +} + /// Synchronization tasks executor pub struct LocalSynchronizationTaskExecutor { /// Active synchronization peers @@ -38,12 +60,12 @@ impl PeersConnections for LocalSynchronizationTaskExecutor { } } -impl SynchronizationTaskExecutor for LocalSynchronizationTaskExecutor { - fn execute(&mut self, task: SynchronizationTask) { +impl TaskExecutor for LocalSynchronizationTaskExecutor { + fn execute(&mut self, task: Task) { // TODO: what is types::GetBlocks::version here? (@ PR#37) match task { - SynchronizationTask::RequestBlocks(peer_index, blocks_hashes) => { + Task::RequestBlocks(peer_index, blocks_hashes) => { let getdata = types::GetData { inventory: blocks_hashes.into_iter() .map(|hash| InventoryVector { @@ -58,7 +80,7 @@ impl SynchronizationTaskExecutor for LocalSynchronizationTaskExecutor { connection.send_getdata(&getdata); } } - SynchronizationTask::RequestInventory(peer_index) => { + Task::RequestInventory(peer_index) => { let block_locator_hashes = self.chain.read().block_locator_hashes(); let getblocks = types::GetBlocks { version: 0, @@ -72,7 +94,7 @@ impl SynchronizationTaskExecutor for LocalSynchronizationTaskExecutor { connection.send_getblocks(&getblocks); } }, - SynchronizationTask::RequestBestInventory(peer_index) => { + Task::RequestBestInventory(peer_index) => { let block_locator_hashes = self.chain.read().best_block_locator_hashes(); let getblocks = types::GetBlocks { version: 0, @@ -86,6 +108,39 @@ impl SynchronizationTaskExecutor for LocalSynchronizationTaskExecutor { connection.send_getblocks(&getblocks); } }, + Task::SendBlock(peer_index, block) => { + let block_message = types::Block { + block: block, + }; + + if let Some(connection) = self.peers.get_mut(&peer_index) { + let connection = &mut *connection; + trace!(target: "sync", "Sending block {:?} to peer#{}", block_message.block.hash(), peer_index); + connection.send_block(&block_message); + } + }, + Task::SendNotFound(peer_index, unknown_inventory) => { + let notfound = types::NotFound { + inventory: unknown_inventory, + }; + + if let Some(connection) = self.peers.get_mut(&peer_index) { + let connection = &mut *connection; + trace!(target: "sync", "Sending notfound to peer#{} with {} items", peer_index, notfound.inventory.len()); + connection.send_notfound(¬found); + } + }, + Task::SendInventory(peer_index, inventory) => { + let inventory = types::Inv { + inventory: inventory, + }; + + if let Some(connection) = self.peers.get_mut(&peer_index) { + let connection = &mut *connection; + trace!(target: "sync", "Sending inventory to peer#{} with {} items", peer_index, inventory.inventory.len()); + connection.send_inventory(&inventory); + } + }, } } } diff --git a/sync/src/synchronization_server.rs b/sync/src/synchronization_server.rs new file mode 100644 index 00000000..5bfd1217 --- /dev/null +++ b/sync/src/synchronization_server.rs @@ -0,0 +1,284 @@ +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::thread; +use std::collections::{VecDeque, HashMap}; +use std::collections::hash_map::Entry; +use parking_lot::{Mutex, Condvar}; +use message::common::{InventoryVector, InventoryType}; +use db; +use primitives::hash::H256; +use synchronization_chain::ChainRef; +use synchronization_executor::{Task, TaskExecutor}; +use message::types; + +/// Synchronization requests server trait +pub trait Server : Send + 'static { + fn serve_getdata(&mut self, peer_index: usize, message: types::GetData); + fn serve_getblocks(&mut self, peer_index: usize, message: types::GetBlocks); +} + +/// Synchronization requests server +pub struct SynchronizationServer { + chain: ChainRef, + queue_ready: Arc, + queue: Arc>, + worker_thread: Option>, +} + +struct ServerQueue { + is_stopping: AtomicBool, + queue_ready: Arc, + peers_queue: VecDeque, + tasks_queue: HashMap>, +} + +#[derive(Debug, PartialEq)] +pub enum ServerTask { + ServeGetData(Vec), + ServeGetBlocks(db::BestBlock, H256), + ReturnNotFound(Vec), + ReturnBlock(H256), +} + +impl SynchronizationServer { + pub fn new(chain: ChainRef, executor: Arc>) -> Self { + let queue_ready = Arc::new(Condvar::new()); + let queue = Arc::new(Mutex::new(ServerQueue::new(queue_ready.clone()))); + let mut server = SynchronizationServer { + chain: chain.clone(), + queue_ready: queue_ready.clone(), + queue: queue.clone(), + worker_thread: None, + }; + server.worker_thread = Some(thread::spawn(move || { + SynchronizationServer::server_worker(queue_ready, queue, chain, executor); + })); + server + } + + fn locate_known_block(&self, block_locator_hashes: Vec) -> Option { + let chain = self.chain.read(); + block_locator_hashes.into_iter() + .filter_map(|hash| chain + .storage_block_number(&hash) + .map(|number| db::BestBlock { + number: number, + hash: hash, + })) + .nth(0) + } + + fn server_worker(queue_ready: Arc, queue: Arc>, chain: ChainRef, executor: Arc>) { + loop { + let server_task = { + let mut queue = queue.lock(); + if queue.is_stopping.load(Ordering::SeqCst) { + break + } + queue.next_task() + .map_or_else(|| { + queue_ready.wait(&mut queue); + queue.next_task() + }, |next_task| Some(next_task)) + }; + + match server_task { + // has new task + Some(server_task) => match server_task { + // `getdata` => `notfound` + `block` + ... + (peer_index, ServerTask::ServeGetData(inventory)) => { + let mut unknown_items: Vec = Vec::new(); + let mut new_tasks: Vec = Vec::new(); + for item in inventory { + match item.inv_type { + InventoryType::MessageBlock => { + match chain.read().storage_block_number(&item.hash) { + Some(_) => new_tasks.push(ServerTask::ReturnBlock(item.hash.clone())), + None => unknown_items.push(item), + } + }, + _ => (), // TODO: process other inventory types + } + } + // respond with `notfound` message for unknown data + if !unknown_items.is_empty() { + trace!(target: "sync", "Going to respond with notfound with {} items to peer#{}", unknown_items.len(), peer_index); + new_tasks.push(ServerTask::ReturnNotFound(unknown_items)); + } + // schedule data responses + if !new_tasks.is_empty() { + trace!(target: "sync", "Going to respond with data with {} items to peer#{}", new_tasks.len(), peer_index); + queue.lock().add_tasks(peer_index, new_tasks); + } + }, + // `inventory` + (peer_index, ServerTask::ServeGetBlocks(best_block, hash_stop)) => { + let storage_block_hash = chain.read().storage_block_hash(best_block.number); + if let Some(hash) = storage_block_hash { + // check that chain has not reorganized since task was queued + if hash == best_block.hash { + let mut inventory: Vec = Vec::new(); + let first_block_number = best_block.number + 1; + let last_block_number = best_block.number + 500; + // 500 hashes after best_block.number OR hash_stop OR blockchain end + for block_number in first_block_number..last_block_number { + match chain.read().storage_block_hash(block_number) { + Some(ref block_hash) if block_hash == &hash_stop => break, + None => break, + Some(block_hash) => { + inventory.push(InventoryVector { + inv_type: InventoryType::MessageBlock, + hash: block_hash, + }); + }, + } + } + if !inventory.is_empty() { + trace!(target: "sync", "Going to respond with inventory with {} items to peer#{}", inventory.len(), peer_index); + executor.lock().execute(Task::SendInventory(peer_index, inventory)); + } + } + } + }, + // `notfound` + (peer_index, ServerTask::ReturnNotFound(inventory)) => { + executor.lock().execute(Task::SendNotFound(peer_index, inventory)); + }, + // `block` + (peer_index, ServerTask::ReturnBlock(block_hash)) => { + let storage_block = chain.read().storage_block(&block_hash); + if let Some(storage_block) = storage_block { + executor.lock().execute(Task::SendBlock(peer_index, storage_block)); + } + }, + }, + // no tasks after wake-up => stopping + None => break, + } + } + } +} + +impl Drop for SynchronizationServer { + fn drop(&mut self) { + if let Some(join_handle) = self.worker_thread.take() { + self.queue.lock().is_stopping.store(true, Ordering::SeqCst); + self.queue_ready.notify_one(); + join_handle.join().expect("Clean shutdown."); + } + } +} + +impl Server for SynchronizationServer { + fn serve_getdata(&mut self, peer_index: usize, message: types::GetData) { + self.queue.lock().add_task(peer_index, ServerTask::ServeGetData(message.inventory)); + } + + fn serve_getblocks(&mut self, peer_index: usize, message: types::GetBlocks) { + if let Some(best_common_block) = self.locate_known_block(message.block_locator_hashes) { + trace!(target: "sync", "Best common block with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash); + self.queue.lock().add_task(peer_index, ServerTask::ServeGetBlocks(best_common_block, message.hash_stop)); + } + else { + trace!(target: "sync", "No common blocks with peer#{}", peer_index); + } + } +} + +impl ServerQueue { + pub fn new(queue_ready: Arc) -> Self { + ServerQueue { + is_stopping: AtomicBool::new(false), + queue_ready: queue_ready, + peers_queue: VecDeque::new(), + tasks_queue: HashMap::new(), + } + } + + pub fn next_task(&mut self) -> Option<(usize, ServerTask)> { + self.peers_queue.pop_front() + .map(|peer| { + let (peer_task, no_tasks_left) = { + let peer_tasks = self.tasks_queue.get_mut(&peer).expect("for each peer there is non-empty tasks queue"); + let peer_task = peer_tasks.pop_front().expect("for each peer there is non-empty tasks queue"); + (peer_task, peer_tasks.is_empty()) + }; + + // remove if no tasks left || schedule otherwise + if no_tasks_left { + self.tasks_queue.remove(&peer); + } + else { + self.peers_queue.push_back(peer); + } + (peer, peer_task) + }) + } + + pub fn add_task(&mut self, peer_index: usize, task: ServerTask) { + match self.tasks_queue.entry(peer_index) { + Entry::Occupied(mut entry) => { + entry.get_mut().push_back(task); + }, + Entry::Vacant(entry) => { + let mut new_tasks = VecDeque::new(); + new_tasks.push_back(task); + entry.insert(new_tasks); + self.peers_queue.push_back(peer_index); + } + } + self.queue_ready.notify_one(); + } + + pub fn add_tasks(&mut self, peer_index: usize, tasks: Vec) { + match self.tasks_queue.entry(peer_index) { + Entry::Occupied(mut entry) => { + entry.get_mut().extend(tasks); + }, + Entry::Vacant(entry) => { + let mut new_tasks = VecDeque::new(); + new_tasks.extend(tasks); + entry.insert(new_tasks); + self.peers_queue.push_back(peer_index); + } + } + self.queue_ready.notify_one(); + } +} + +#[cfg(test)] +pub mod tests { + use super::{Server, ServerTask}; + use message::types; + use db; + use std::mem::replace; + + pub struct DummyServer { + tasks: Vec<(usize, ServerTask)>, + } + + impl DummyServer { + pub fn new() -> Self { + DummyServer { + tasks: Vec::new(), + } + } + + pub fn take_tasks(&mut self) -> Vec<(usize, ServerTask)> { + replace(&mut self.tasks, Vec::new()) + } + } + + impl Server for DummyServer { + fn serve_getdata(&mut self, peer_index: usize, message: types::GetData) { + self.tasks.push((peer_index, ServerTask::ServeGetData(message.inventory))); + } + + fn serve_getblocks(&mut self, peer_index: usize, message: types::GetBlocks) { + self.tasks.push((peer_index, ServerTask::ServeGetBlocks(db::BestBlock { + number: 0, + hash: message.block_locator_hashes[0].clone(), + }, message.hash_stop))); + } + } +}