added LocalNode tests

This commit is contained in:
Svyatoslav Nikolsky 2016-10-31 18:22:02 +03:00
parent 812a436d9b
commit 44c2faab59
7 changed files with 106 additions and 26 deletions

View File

@ -19,8 +19,7 @@ use std::env;
use std::sync::Arc; use std::sync::Arc;
use std::net::SocketAddr; use std::net::SocketAddr;
use p2p::{P2P, event_loop, forever, NetConfig}; use p2p::{P2P, event_loop, forever, NetConfig};
use sync::local_node::LocalNode; use sync::create_sync_connection_factory;
use sync::inbound_connection_factory::InboundConnectionFactory;
use chain::Block; use chain::Block;
fn main() { fn main() {
@ -94,12 +93,10 @@ fn run() -> Result<(), String> {
let db = open_db(cfg.use_disk_database); let db = open_db(cfg.use_disk_database);
init_db(&db); init_db(&db);
let local_sync_node = LocalNode::new(db); let sync_connection_factory = create_sync_connection_factory(db);
let local_sync_factory = InboundConnectionFactory::with_local_node(local_sync_node.clone());
let p2p = P2P::new(p2p_cfg, local_sync_factory, el.handle()); let p2p = P2P::new(p2p_cfg, sync_connection_factory, el.handle());
try!(p2p.run().map_err(|_| "Failed to start p2p module")); try!(p2p.run().map_err(|_| "Failed to start p2p module"));
el.run(forever()).unwrap(); el.run(forever()).unwrap();
Ok(()) Ok(())
} }

View File

@ -1,14 +1,15 @@
use message::types; use message::types;
use p2p::{InboundSyncConnection, InboundSyncConnectionRef}; use p2p::{InboundSyncConnection, InboundSyncConnectionRef};
use local_node::LocalNodeRef; use local_node::LocalNodeRef;
use synchronization_executor::LocalSynchronizationTaskExecutor;
pub struct InboundConnection { pub struct InboundConnection {
local_node: LocalNodeRef, local_node: LocalNodeRef<LocalSynchronizationTaskExecutor>,
peer_index: usize, peer_index: usize,
} }
impl InboundConnection { impl InboundConnection {
pub fn new(local_node: LocalNodeRef, peer_index: usize) -> InboundSyncConnectionRef { pub fn new(local_node: LocalNodeRef<LocalSynchronizationTaskExecutor>, peer_index: usize) -> InboundSyncConnectionRef {
Box::new(InboundConnection { Box::new(InboundConnection {
local_node: local_node, local_node: local_node,
peer_index: peer_index, peer_index: peer_index,

View File

@ -1,13 +1,14 @@
use p2p::{LocalSyncNode, LocalSyncNodeRef, OutboundSyncConnectionRef, InboundSyncConnectionRef}; use p2p::{LocalSyncNode, LocalSyncNodeRef, OutboundSyncConnectionRef, InboundSyncConnectionRef};
use local_node::LocalNodeRef; use local_node::LocalNodeRef;
use inbound_connection::InboundConnection; use inbound_connection::InboundConnection;
use synchronization_executor::LocalSynchronizationTaskExecutor;
pub struct InboundConnectionFactory { pub struct InboundConnectionFactory {
local_node: LocalNodeRef, local_node: LocalNodeRef<LocalSynchronizationTaskExecutor>,
} }
impl InboundConnectionFactory { impl InboundConnectionFactory {
pub fn with_local_node(local_node: LocalNodeRef) -> LocalSyncNodeRef { pub fn with_local_node(local_node: LocalNodeRef<LocalSynchronizationTaskExecutor>) -> LocalSyncNodeRef {
Box::new( Box::new(
InboundConnectionFactory { InboundConnectionFactory {
local_node: local_node, local_node: local_node,

View File

@ -11,9 +11,25 @@ extern crate verification;
mod hash_queue; mod hash_queue;
mod inbound_connection; mod inbound_connection;
pub mod inbound_connection_factory; mod inbound_connection_factory;
pub mod local_node; mod local_node;
mod synchronization; mod synchronization;
mod synchronization_chain; mod synchronization_chain;
mod synchronization_executor; mod synchronization_executor;
mod synchronization_peers; mod synchronization_peers;
use std::sync::Arc;
use parking_lot::RwLock;
/// Create inbound synchronization connections factory for given `db`.
pub fn create_sync_connection_factory(db: Arc<db::Store>) -> p2p::LocalSyncNodeRef {
use synchronization_chain::Chain as SyncChain;
use synchronization_executor::LocalSynchronizationTaskExecutor as SyncExecutor;
use local_node::LocalNode as SyncNode;
use inbound_connection_factory::InboundConnectionFactory as SyncConnectionFactory;
let sync_chain = Arc::new(RwLock::new(SyncChain::new(db)));
let sync_executor = SyncExecutor::new(sync_chain.clone());
let sync_node = SyncNode::new(sync_chain, sync_executor);
SyncConnectionFactory::with_local_node(sync_node)
}

View File

@ -2,39 +2,40 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use parking_lot::Mutex; use parking_lot::Mutex;
use db; use db;
use parking_lot::RwLock;
use chain::RepresentH256; use chain::RepresentH256;
use p2p::OutboundSyncConnectionRef; use p2p::OutboundSyncConnectionRef;
use message::common::InventoryType; use message::common::InventoryType;
use message::types; use message::types;
use synchronization::{Synchronization, SynchronizationRef, Config as SynchronizationConfig, Task as SynchronizationTask, TaskExecutor as SynchronizationTaskExecutor}; use synchronization::{Synchronization, SynchronizationRef, Config as SynchronizationConfig, Task as SynchronizationTask, TaskExecutor as SynchronizationTaskExecutor};
use synchronization_chain::{Chain, ChainRef}; use synchronization_chain::ChainRef;
use synchronization_executor::LocalSynchronizationTaskExecutor;
/// Thread-safe reference to the `LocalNode`. /// Thread-safe reference to the `LocalNode`.
/// Locks order: /// Locks order:
/// 1) sync Mutex /// 1) sync Mutex
/// 2) executor Mutex /// 2) executor Mutex
/// 2) chain RwLock /// 2) chain RwLock
pub type LocalNodeRef = Arc<LocalNode>; pub type LocalNodeRef<T> = Arc<LocalNode<T>>;
/// Local synchronization node /// Local synchronization node
pub struct LocalNode { pub struct LocalNode<T: SynchronizationTaskExecutor + PeersConnections + Send + 'static> {
/// Throughout counter of synchronization peers /// Throughout counter of synchronization peers
peer_counter: AtomicUsize, peer_counter: AtomicUsize,
/// Synchronization chain /// Synchronization chain
chain: ChainRef, chain: ChainRef,
/// Synchronization executor /// Synchronization executor
executor: Arc<Mutex<LocalSynchronizationTaskExecutor>>, executor: Arc<Mutex<T>>,
/// Synchronization process /// Synchronization process
sync: SynchronizationRef<LocalSynchronizationTaskExecutor>, sync: SynchronizationRef<T>,
} }
impl LocalNode { /// Peers list
pub trait PeersConnections {
fn add_peer_connection(&mut self, peer_index: usize, outbound_connection: OutboundSyncConnectionRef);
}
impl<T> LocalNode<T> where T: SynchronizationTaskExecutor + PeersConnections + Send + 'static {
/// New synchronization node with given storage /// New synchronization node with given storage
pub fn new(storage: Arc<db::Store>) -> LocalNodeRef { pub fn new(chain: ChainRef, executor: Arc<Mutex<T>>) -> LocalNodeRef<T> {
let chain = ChainRef::new(RwLock::new(Chain::new(storage.clone())));
let executor = LocalSynchronizationTaskExecutor::new(chain.clone());
let sync = Synchronization::new(SynchronizationConfig::default(), executor.clone(), chain.clone()); let sync = Synchronization::new(SynchronizationConfig::default(), executor.clone(), chain.clone());
Arc::new(LocalNode { Arc::new(LocalNode {
peer_counter: AtomicUsize::new(0), peer_counter: AtomicUsize::new(0),
@ -158,3 +159,58 @@ impl LocalNode {
trace!(target: "sync", "Got `blocktxn` message from peer#{}", peer_index); trace!(target: "sync", "Got `blocktxn` message from peer#{}", peer_index);
} }
} }
#[cfg(test)]
mod tests {
use std::sync::Arc;
use parking_lot::{Mutex, RwLock};
use synchronization::Task;
use synchronization::tests::DummyTaskExecutor;
use synchronization_chain::Chain;
use p2p::{OutboundSyncConnection, OutboundSyncConnectionRef};
use message::types;
use db;
use super::LocalNode;
struct DummyOutboundSyncConnection;
impl DummyOutboundSyncConnection {
pub fn new() -> OutboundSyncConnectionRef {
Box::new(DummyOutboundSyncConnection {})
}
}
impl OutboundSyncConnection for DummyOutboundSyncConnection {
fn send_inventory(&self, _message: &types::Inv) {}
fn send_getdata(&self, _message: &types::GetData) {}
fn send_getblocks(&self, _message: &types::GetBlocks) {}
fn send_getheaders(&self, _message: &types::GetHeaders) {}
fn send_transaction(&self, _message: &types::Tx) {}
fn send_block(&self, _message: &types::Block) {}
fn send_headers(&self, _message: &types::Headers) {}
fn send_mempool(&self, _message: &types::MemPool) {}
fn send_filterload(&self, _message: &types::FilterLoad) {}
fn send_filteradd(&self, _message: &types::FilterAdd) {}
fn send_filterclear(&self, _message: &types::FilterClear) {}
fn send_merkleblock(&self, _message: &types::MerkleBlock) {}
fn send_sendheaders(&self, _message: &types::SendHeaders) {}
fn send_feefilter(&self, _message: &types::FeeFilter) {}
fn send_send_compact(&self, _message: &types::SendCompact) {}
fn send_compact_block(&self, _message: &types::CompactBlock) {}
fn send_get_block_txn(&self, _message: &types::GetBlockTxn) {}
fn send_block_txn(&self, _message: &types::BlockTxn) {}
}
#[test]
fn local_node_request_inventory_on_sync_start() {
let chain = Arc::new(RwLock::new(Chain::new(Arc::new(db::TestStorage::with_genesis_block()))));
let executor = Arc::new(Mutex::new(DummyTaskExecutor::default()));
let local_node = LocalNode::new(chain, executor.clone());
let peer_index = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
local_node.start_sync_session(peer_index, 0);
let tasks = executor.lock().take_tasks();
assert_eq!(tasks.len(), 1);
assert_eq!(tasks, vec![Task::RequestInventory(peer_index)]);
}
}

View File

@ -487,17 +487,19 @@ impl<T> Synchronization<T> where T: TaskExecutor + Send + 'static {
} }
#[cfg(test)] #[cfg(test)]
mod tests { pub mod tests {
use std::sync::Arc; use std::sync::Arc;
use std::mem::replace; use std::mem::replace;
use parking_lot::{Mutex, RwLock}; use parking_lot::{Mutex, RwLock};
use chain::{Block, RepresentH256}; use chain::{Block, RepresentH256};
use super::{Synchronization, SynchronizationRef, Config, Task, TaskExecutor}; use super::{Synchronization, SynchronizationRef, Config, Task, TaskExecutor};
use local_node::PeersConnections;
use synchronization_chain::{Chain, ChainRef}; use synchronization_chain::{Chain, ChainRef};
use p2p::OutboundSyncConnectionRef;
use db; use db;
#[derive(Default)] #[derive(Default)]
struct DummyTaskExecutor { pub struct DummyTaskExecutor {
pub tasks: Vec<Task>, pub tasks: Vec<Task>,
} }
@ -507,6 +509,10 @@ mod tests {
} }
} }
impl PeersConnections for DummyTaskExecutor {
fn add_peer_connection(&mut self, _: usize, _: OutboundSyncConnectionRef) {}
}
impl TaskExecutor for DummyTaskExecutor { impl TaskExecutor for DummyTaskExecutor {
fn execute(&mut self, task: Task) { fn execute(&mut self, task: Task) {
self.tasks.push(task); self.tasks.push(task);

View File

@ -7,6 +7,7 @@ use primitives::hash::H256;
use p2p::OutboundSyncConnectionRef; use p2p::OutboundSyncConnectionRef;
use synchronization_chain::ChainRef; use synchronization_chain::ChainRef;
use synchronization::{Task as SynchronizationTask, TaskExecutor as SynchronizationTaskExecutor}; use synchronization::{Task as SynchronizationTask, TaskExecutor as SynchronizationTaskExecutor};
use local_node::PeersConnections;
pub type LocalSynchronizationTaskExecutorRef = Arc<Mutex<LocalSynchronizationTaskExecutor>>; pub type LocalSynchronizationTaskExecutorRef = Arc<Mutex<LocalSynchronizationTaskExecutor>>;
@ -25,8 +26,10 @@ impl LocalSynchronizationTaskExecutor {
chain: chain, chain: chain,
})) }))
} }
}
pub fn add_peer_connection(&mut self, index: usize, connection: OutboundSyncConnectionRef) { impl PeersConnections for LocalSynchronizationTaskExecutor {
fn add_peer_connection(&mut self, index: usize, connection: OutboundSyncConnectionRef) {
self.peers.insert(index, connection); self.peers.insert(index, connection);
} }
} }