diff --git a/pbtc/main.rs b/pbtc/main.rs index 20a07a0d..bb46e607 100644 --- a/pbtc/main.rs +++ b/pbtc/main.rs @@ -19,8 +19,7 @@ use std::env; use std::sync::Arc; use std::net::SocketAddr; use p2p::{P2P, event_loop, forever, NetConfig}; -use sync::local_node::LocalNode; -use sync::inbound_connection_factory::InboundConnectionFactory; +use sync::create_sync_connection_factory; use chain::Block; fn main() { @@ -94,12 +93,10 @@ fn run() -> Result<(), String> { let db = open_db(cfg.use_disk_database); init_db(&db); - let local_sync_node = LocalNode::new(db); - let local_sync_factory = InboundConnectionFactory::with_local_node(local_sync_node.clone()); + let sync_connection_factory = create_sync_connection_factory(db); - let p2p = P2P::new(p2p_cfg, local_sync_factory, el.handle()); + let p2p = P2P::new(p2p_cfg, sync_connection_factory, el.handle()); try!(p2p.run().map_err(|_| "Failed to start p2p module")); el.run(forever()).unwrap(); Ok(()) } - diff --git a/sync/src/inbound_connection.rs b/sync/src/inbound_connection.rs index db193a9c..ca2d8b4f 100644 --- a/sync/src/inbound_connection.rs +++ b/sync/src/inbound_connection.rs @@ -1,14 +1,15 @@ use message::types; use p2p::{InboundSyncConnection, InboundSyncConnectionRef}; use local_node::LocalNodeRef; +use synchronization_executor::LocalSynchronizationTaskExecutor; pub struct InboundConnection { - local_node: LocalNodeRef, + local_node: LocalNodeRef, peer_index: usize, } impl InboundConnection { - pub fn new(local_node: LocalNodeRef, peer_index: usize) -> InboundSyncConnectionRef { + pub fn new(local_node: LocalNodeRef, peer_index: usize) -> InboundSyncConnectionRef { Box::new(InboundConnection { local_node: local_node, peer_index: peer_index, diff --git a/sync/src/inbound_connection_factory.rs b/sync/src/inbound_connection_factory.rs index 5b529d71..578e3ec6 100644 --- a/sync/src/inbound_connection_factory.rs +++ b/sync/src/inbound_connection_factory.rs @@ -1,13 +1,14 @@ use p2p::{LocalSyncNode, LocalSyncNodeRef, OutboundSyncConnectionRef, InboundSyncConnectionRef}; use local_node::LocalNodeRef; use inbound_connection::InboundConnection; +use synchronization_executor::LocalSynchronizationTaskExecutor; pub struct InboundConnectionFactory { - local_node: LocalNodeRef, + local_node: LocalNodeRef, } impl InboundConnectionFactory { - pub fn with_local_node(local_node: LocalNodeRef) -> LocalSyncNodeRef { + pub fn with_local_node(local_node: LocalNodeRef) -> LocalSyncNodeRef { Box::new( InboundConnectionFactory { local_node: local_node, diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 506f540e..a5f35167 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -11,9 +11,25 @@ extern crate verification; mod hash_queue; mod inbound_connection; -pub mod inbound_connection_factory; -pub mod local_node; +mod inbound_connection_factory; +mod local_node; mod synchronization; mod synchronization_chain; mod synchronization_executor; mod synchronization_peers; + +use std::sync::Arc; +use parking_lot::RwLock; + +/// Create inbound synchronization connections factory for given `db`. +pub fn create_sync_connection_factory(db: Arc) -> p2p::LocalSyncNodeRef { + use synchronization_chain::Chain as SyncChain; + use synchronization_executor::LocalSynchronizationTaskExecutor as SyncExecutor; + use local_node::LocalNode as SyncNode; + use inbound_connection_factory::InboundConnectionFactory as SyncConnectionFactory; + + let sync_chain = Arc::new(RwLock::new(SyncChain::new(db))); + let sync_executor = SyncExecutor::new(sync_chain.clone()); + let sync_node = SyncNode::new(sync_chain, sync_executor); + SyncConnectionFactory::with_local_node(sync_node) +} diff --git a/sync/src/local_node.rs b/sync/src/local_node.rs index a742a284..d61c24d5 100644 --- a/sync/src/local_node.rs +++ b/sync/src/local_node.rs @@ -2,39 +2,40 @@ use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; use parking_lot::Mutex; use db; -use parking_lot::RwLock; use chain::RepresentH256; use p2p::OutboundSyncConnectionRef; use message::common::InventoryType; use message::types; use synchronization::{Synchronization, SynchronizationRef, Config as SynchronizationConfig, Task as SynchronizationTask, TaskExecutor as SynchronizationTaskExecutor}; -use synchronization_chain::{Chain, ChainRef}; -use synchronization_executor::LocalSynchronizationTaskExecutor; +use synchronization_chain::ChainRef; /// Thread-safe reference to the `LocalNode`. /// Locks order: /// 1) sync Mutex /// 2) executor Mutex /// 2) chain RwLock -pub type LocalNodeRef = Arc; +pub type LocalNodeRef = Arc>; /// Local synchronization node -pub struct LocalNode { +pub struct LocalNode { /// Throughout counter of synchronization peers peer_counter: AtomicUsize, /// Synchronization chain chain: ChainRef, /// Synchronization executor - executor: Arc>, + executor: Arc>, /// Synchronization process - sync: SynchronizationRef, + sync: SynchronizationRef, } -impl LocalNode { +/// Peers list +pub trait PeersConnections { + fn add_peer_connection(&mut self, peer_index: usize, outbound_connection: OutboundSyncConnectionRef); +} + +impl LocalNode where T: SynchronizationTaskExecutor + PeersConnections + Send + 'static { /// New synchronization node with given storage - pub fn new(storage: Arc) -> LocalNodeRef { - let chain = ChainRef::new(RwLock::new(Chain::new(storage.clone()))); - let executor = LocalSynchronizationTaskExecutor::new(chain.clone()); + pub fn new(chain: ChainRef, executor: Arc>) -> LocalNodeRef { let sync = Synchronization::new(SynchronizationConfig::default(), executor.clone(), chain.clone()); Arc::new(LocalNode { peer_counter: AtomicUsize::new(0), @@ -158,3 +159,58 @@ impl LocalNode { trace!(target: "sync", "Got `blocktxn` message from peer#{}", peer_index); } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + use parking_lot::{Mutex, RwLock}; + use synchronization::Task; + use synchronization::tests::DummyTaskExecutor; + use synchronization_chain::Chain; + use p2p::{OutboundSyncConnection, OutboundSyncConnectionRef}; + use message::types; + use db; + use super::LocalNode; + + struct DummyOutboundSyncConnection; + + impl DummyOutboundSyncConnection { + pub fn new() -> OutboundSyncConnectionRef { + Box::new(DummyOutboundSyncConnection {}) + } + } + + impl OutboundSyncConnection for DummyOutboundSyncConnection { + fn send_inventory(&self, _message: &types::Inv) {} + fn send_getdata(&self, _message: &types::GetData) {} + fn send_getblocks(&self, _message: &types::GetBlocks) {} + fn send_getheaders(&self, _message: &types::GetHeaders) {} + fn send_transaction(&self, _message: &types::Tx) {} + fn send_block(&self, _message: &types::Block) {} + fn send_headers(&self, _message: &types::Headers) {} + fn send_mempool(&self, _message: &types::MemPool) {} + fn send_filterload(&self, _message: &types::FilterLoad) {} + fn send_filteradd(&self, _message: &types::FilterAdd) {} + fn send_filterclear(&self, _message: &types::FilterClear) {} + fn send_merkleblock(&self, _message: &types::MerkleBlock) {} + fn send_sendheaders(&self, _message: &types::SendHeaders) {} + fn send_feefilter(&self, _message: &types::FeeFilter) {} + fn send_send_compact(&self, _message: &types::SendCompact) {} + fn send_compact_block(&self, _message: &types::CompactBlock) {} + fn send_get_block_txn(&self, _message: &types::GetBlockTxn) {} + fn send_block_txn(&self, _message: &types::BlockTxn) {} + } + + #[test] + fn local_node_request_inventory_on_sync_start() { + let chain = Arc::new(RwLock::new(Chain::new(Arc::new(db::TestStorage::with_genesis_block())))); + let executor = Arc::new(Mutex::new(DummyTaskExecutor::default())); + let local_node = LocalNode::new(chain, executor.clone()); + let peer_index = local_node.create_sync_session(0, DummyOutboundSyncConnection::new()); + local_node.start_sync_session(peer_index, 0); + + let tasks = executor.lock().take_tasks(); + assert_eq!(tasks.len(), 1); + assert_eq!(tasks, vec![Task::RequestInventory(peer_index)]); + } +} \ No newline at end of file diff --git a/sync/src/synchronization.rs b/sync/src/synchronization.rs index f7c6ad2d..51946191 100644 --- a/sync/src/synchronization.rs +++ b/sync/src/synchronization.rs @@ -487,17 +487,19 @@ impl Synchronization where T: TaskExecutor + Send + 'static { } #[cfg(test)] -mod tests { +pub mod tests { use std::sync::Arc; use std::mem::replace; use parking_lot::{Mutex, RwLock}; use chain::{Block, RepresentH256}; use super::{Synchronization, SynchronizationRef, Config, Task, TaskExecutor}; + use local_node::PeersConnections; use synchronization_chain::{Chain, ChainRef}; + use p2p::OutboundSyncConnectionRef; use db; #[derive(Default)] - struct DummyTaskExecutor { + pub struct DummyTaskExecutor { pub tasks: Vec, } @@ -507,6 +509,10 @@ mod tests { } } + impl PeersConnections for DummyTaskExecutor { + fn add_peer_connection(&mut self, _: usize, _: OutboundSyncConnectionRef) {} + } + impl TaskExecutor for DummyTaskExecutor { fn execute(&mut self, task: Task) { self.tasks.push(task); diff --git a/sync/src/synchronization_executor.rs b/sync/src/synchronization_executor.rs index a8871f61..ec258e25 100644 --- a/sync/src/synchronization_executor.rs +++ b/sync/src/synchronization_executor.rs @@ -7,6 +7,7 @@ use primitives::hash::H256; use p2p::OutboundSyncConnectionRef; use synchronization_chain::ChainRef; use synchronization::{Task as SynchronizationTask, TaskExecutor as SynchronizationTaskExecutor}; +use local_node::PeersConnections; pub type LocalSynchronizationTaskExecutorRef = Arc>; @@ -25,8 +26,10 @@ impl LocalSynchronizationTaskExecutor { chain: chain, })) } +} - pub fn add_peer_connection(&mut self, index: usize, connection: OutboundSyncConnectionRef) { +impl PeersConnections for LocalSynchronizationTaskExecutor { + fn add_peer_connection(&mut self, index: usize, connection: OutboundSyncConnectionRef) { self.peers.insert(index, connection); } }