fixed client construction

This commit is contained in:
Svyatoslav Nikolsky 2016-11-18 14:39:34 +03:00
parent 70bb4ef5cc
commit b66fd70c6f
3 changed files with 37 additions and 65 deletions

View File

@ -61,19 +61,15 @@ pub fn create_sync_connection_factory(handle: &Handle, consensus_params: Consens
use local_node::LocalNode as SyncNode;
use inbound_connection_factory::InboundConnectionFactory as SyncConnectionFactory;
use synchronization_server::SynchronizationServer;
use synchronization_client::{SynchronizationClient, Config as SynchronizationConfig};
use synchronization_client::{SynchronizationClient, SynchronizationClientCore, Config as SynchronizationConfig};
use synchronization_verifier::AsyncVerifier;
let sync_chain = Arc::new(RwLock::new(SyncChain::new(db)));
let sync_executor = SyncExecutor::new(sync_chain.clone());
let sync_server = Arc::new(SynchronizationServer::new(sync_chain.clone(), sync_executor.clone()));
let sync_client = SynchronizationClient::new(SynchronizationConfig::new(), handle, sync_executor.clone(), sync_chain.clone());
{
let verifier_sink = sync_client.lock().core();
let verifier = AsyncVerifier::new(consensus_params, sync_chain, verifier_sink);
sync_client.lock().set_verifier(verifier);
}
let sync_client_core = SynchronizationClientCore::new(SynchronizationConfig::new(), handle, sync_executor.clone(), sync_chain.clone());
let verifier = AsyncVerifier::new(consensus_params, sync_chain, sync_client_core.clone());
let sync_client = SynchronizationClient::new(sync_client_core, verifier);
let sync_node = Arc::new(SyncNode::new(sync_server, sync_client, sync_executor));
SyncConnectionFactory::with_local_node(sync_node)
}

View File

@ -230,7 +230,7 @@ mod tests {
use chain::RepresentH256;
use synchronization_executor::Task;
use synchronization_executor::tests::DummyTaskExecutor;
use synchronization_client::{Config, SynchronizationClient};
use synchronization_client::{Config, SynchronizationClient, SynchronizationClientCore};
use synchronization_chain::Chain;
use p2p::{event_loop, OutboundSyncConnection, OutboundSyncConnectionRef};
use message::types;
@ -281,13 +281,10 @@ mod tests {
let executor = DummyTaskExecutor::new();
let server = Arc::new(DummyServer::new());
let config = Config { threads_num: 1 };
let client = SynchronizationClient::new(config, &handle, executor.clone(), chain.clone());
{
let verifier_sink = client.lock().core();
let client_core = SynchronizationClientCore::new(config, &handle, executor.clone(), chain.clone());
let mut verifier = DummyVerifier::new();
verifier.set_sink(verifier_sink);
client.lock().set_verifier(verifier);
}
verifier.set_sink(client_core.clone());
let client = SynchronizationClient::new(client_core, verifier);
let local_node = LocalNode::new(server.clone(), client, executor.clone());
(event_loop, handle, executor, server, local_node)
}

View File

@ -223,10 +223,10 @@ pub struct Config {
/// Synchronization client facade
pub struct SynchronizationClient<T: TaskExecutor, U: Verifier> {
/// Client
client: Arc<Mutex<SynchronizationClientCore<T>>>,
/// Client core
core: Arc<Mutex<SynchronizationClientCore<T>>>,
/// Verifier
verifier: Option<Box<U>>,
verifier: U,
}
/// Synchronization client.
@ -286,46 +286,42 @@ impl State {
impl<T, U> Client for SynchronizationClient<T, U> where T: TaskExecutor, U: Verifier {
fn best_block(&self) -> db::BestBlock {
self.client.lock().best_block()
self.core.lock().best_block()
}
fn state(&self) -> State {
self.client.lock().state()
self.core.lock().state()
}
fn on_new_blocks_inventory(&mut self, peer_index: usize, blocks_hashes: Vec<H256>) {
self.client.lock().on_new_blocks_inventory(peer_index, blocks_hashes)
self.core.lock().on_new_blocks_inventory(peer_index, blocks_hashes)
}
fn on_new_transactions_inventory(&mut self, peer_index: usize, transactions_hashes: Vec<H256>) {
self.client.lock().on_new_transactions_inventory(peer_index, transactions_hashes)
self.core.lock().on_new_transactions_inventory(peer_index, transactions_hashes)
}
fn on_new_blocks_headers(&mut self, peer_index: usize, blocks_headers: Vec<BlockHeader>) {
self.client.lock().on_new_blocks_headers(peer_index, blocks_headers);
self.core.lock().on_new_blocks_headers(peer_index, blocks_headers);
}
fn on_peer_blocks_notfound(&mut self, peer_index: usize, blocks_hashes: Vec<H256>) {
self.client.lock().on_peer_blocks_notfound(peer_index, blocks_hashes);
self.core.lock().on_peer_blocks_notfound(peer_index, blocks_hashes);
}
fn on_peer_block(&mut self, peer_index: usize, block: Block) {
let blocks_to_verify = { self.client.lock().on_peer_block(peer_index, block) };
let blocks_to_verify = { self.core.lock().on_peer_block(peer_index, block) };
// verify selected blocks
if let Some(mut blocks_to_verify) = blocks_to_verify {
while let Some((_, block)) = blocks_to_verify.pop_front() {
// schedule verification
match self.verifier {
Some(ref verifier) => verifier.verify_block(block),
None => panic!("call set_verifier after construction"),
}
self.verifier.verify_block(block);
}
}
// try to switch to saturated state OR execute sync tasks
{
let mut client = self.client.lock();
let mut client = self.core.lock();
if !client.try_switch_to_saturated_state() {
client.execute_synchronization_tasks(None);
}
@ -333,53 +329,39 @@ impl<T, U> Client for SynchronizationClient<T, U> where T: TaskExecutor, U: Veri
}
fn on_peer_transaction(&mut self, peer_index: usize, transaction: Transaction) {
let transactions_to_verify = { self.client.lock().on_peer_transaction(peer_index, transaction) };
let transactions_to_verify = { self.core.lock().on_peer_transaction(peer_index, transaction) };
if let Some(mut transactions_to_verify) = transactions_to_verify {
while let Some((_, tx)) = transactions_to_verify.pop_front() {
// schedule verification
match self.verifier {
Some(ref verifier) => verifier.verify_transaction(tx),
None => panic!("call set_verifier after construction"),
}
self.verifier.verify_transaction(tx);
}
}
}
fn on_peer_disconnected(&mut self, peer_index: usize) {
self.client.lock().on_peer_disconnected(peer_index);
self.core.lock().on_peer_disconnected(peer_index);
}
fn get_peers_nearly_blocks_waiter(&mut self, peer_index: usize) -> (bool, Option<Arc<PeersBlocksWaiter>>) {
self.client.lock().get_peers_nearly_blocks_waiter(peer_index)
self.core.lock().get_peers_nearly_blocks_waiter(peer_index)
}
}
impl<T, U> SynchronizationClient<T, U> where T: TaskExecutor, U: Verifier {
/// Create new synchronization client
pub fn new(config: Config, handle: &Handle, executor: Arc<Mutex<T>>, chain: ChainRef) -> Arc<Mutex<Self>> {
pub fn new(core: Arc<Mutex<SynchronizationClientCore<T>>>, verifier: U) -> Arc<Mutex<Self>> {
Arc::new(Mutex::new(
SynchronizationClient {
client: SynchronizationClientCore::new(config, handle, executor, chain),
verifier: None,
core: core,
verifier: verifier,
}
))
}
/// Get client core
pub fn core(&self) -> Arc<Mutex<SynchronizationClientCore<T>>> {
self.client.clone()
}
/// Set verifier (TODO: use builder && check in build instead)
pub fn set_verifier(&mut self, verifier: U) {
self.verifier = Some(Box::new(verifier));
}
/// Get information on current synchronization state.
#[cfg(test)]
pub fn information(&self) -> Information {
self.client.lock().information()
self.core.lock().information()
}
}
@ -1090,7 +1072,7 @@ pub mod tests {
use parking_lot::{Mutex, RwLock};
use tokio_core::reactor::{Core, Handle};
use chain::{Block, Transaction, RepresentH256};
use super::{Client, Config, SynchronizationClient};
use super::{Client, Config, SynchronizationClient, SynchronizationClientCore};
use synchronization_executor::Task;
use synchronization_chain::{Chain, ChainRef};
use synchronization_executor::tests::DummyTaskExecutor;
@ -1117,16 +1099,13 @@ pub mod tests {
let executor = DummyTaskExecutor::new();
let config = Config { threads_num: 1 };
let client = SynchronizationClient::new(config, &handle, executor.clone(), chain.clone());
{
let verifier_sink = client.lock().core();
let client_core = SynchronizationClientCore::new(config, &handle, executor.clone(), chain.clone());
let mut verifier = match verifier {
Some(verifier) => verifier,
None => DummyVerifier::new(),
};
verifier.set_sink(verifier_sink);
client.lock().set_verifier(verifier);
}
verifier.set_sink(client_core.clone());
let client = SynchronizationClient::new(client_core, verifier);
(event_loop, handle, executor, chain, client)
}