get rid of custom cpu pool in sync

This commit is contained in:
Svyatoslav Nikolsky 2017-01-11 16:36:33 +03:00
parent da07809721
commit d005307db9
6 changed files with 211 additions and 147 deletions

View File

@ -110,9 +110,8 @@ pub fn start(cfg: config::Config) -> Result<(), String> {
internet_protocol: cfg.internet_protocol,
};
let sync_handle = el.handle();
let sync_peers = create_sync_peers();
let local_sync_node = create_local_sync_node(&sync_handle, cfg.magic, db.clone(), sync_peers.clone());
let local_sync_node = create_local_sync_node(cfg.magic, db.clone(), sync_peers.clone());
let sync_connection_factory = create_sync_connection_factory(sync_peers.clone(), local_sync_node.clone());
if let Some(block_notify_command) = cfg.block_notify_command {

View File

@ -46,7 +46,6 @@ pub use types::PeersRef;
use std::sync::Arc;
use parking_lot::RwLock;
use tokio_core::reactor::Handle;
use network::Magic;
use primitives::hash::H256;
use verification::BackwardsCompatibleChainVerifier as ChainVerifier;
@ -83,7 +82,7 @@ pub fn create_sync_peers() -> PeersRef {
}
/// Creates local sync node for given `db`
pub fn create_local_sync_node(handle: &Handle, network: Magic, db: db::SharedStore, peers: PeersRef) -> LocalNodeRef {
pub fn create_local_sync_node(network: Magic, db: db::SharedStore, peers: PeersRef) -> LocalNodeRef {
use miner::MemoryPool;
use synchronization_chain::Chain as SyncChain;
use synchronization_executor::LocalSynchronizationTaskExecutor as SyncExecutor;
@ -109,7 +108,7 @@ pub fn create_local_sync_node(handle: &Handle, network: Magic, db: db::SharedSto
let chain_verifier = Arc::new(ChainVerifier::new(db.clone(), network));
let sync_executor = SyncExecutor::new(peers.clone());
let sync_server = Arc::new(ServerImpl::new(peers.clone(), db.clone(), memory_pool.clone(), sync_executor.clone()));
let sync_client_core = SynchronizationClientCore::new(sync_client_config, handle, sync_state.clone(), peers.clone(), sync_executor.clone(), sync_chain, chain_verifier.clone());
let sync_client_core = SynchronizationClientCore::new(sync_client_config, sync_state.clone(), peers.clone(), sync_executor.clone(), sync_chain, chain_verifier.clone());
let verifier_sink = Arc::new(CoreVerificationSink::new(sync_client_core.clone()));
let verifier = AsyncVerifier::new(chain_verifier, db.clone(), memory_pool.clone(), verifier_sink);
let sync_client = SynchronizationClient::new(sync_state.clone(), sync_client_core, verifier);

View File

@ -331,7 +331,6 @@ pub mod tests {
use synchronization_client::SynchronizationClient;
use synchronization_client_core::{Config, SynchronizationClientCore, CoreVerificationSink};
use synchronization_chain::Chain;
use p2p::event_loop;
use message::types;
use message::common::{InventoryVector, InventoryType};
use network::Magic;
@ -343,7 +342,6 @@ pub mod tests {
use synchronization_server::ServerTask;
use synchronization_server::tests::DummyServer;
use synchronization_verifier::tests::DummyVerifier;
use tokio_core::reactor::{Core, Handle};
use primitives::bytes::Bytes;
use verification::BackwardsCompatibleChainVerifier as ChainVerifier;
use std::iter::repeat;
@ -366,9 +364,7 @@ pub mod tests {
}
}
fn create_local_node(verifier: Option<DummyVerifier>) -> (Core, Handle, Arc<DummyTaskExecutor>, Arc<DummyServer>, LocalNode<DummyTaskExecutor, DummyServer, SynchronizationClient<DummyTaskExecutor, DummyVerifier>>) {
let event_loop = event_loop();
let handle = event_loop.handle();
fn create_local_node(verifier: Option<DummyVerifier>) -> (Arc<DummyTaskExecutor>, Arc<DummyServer>, LocalNode<DummyTaskExecutor, DummyServer, SynchronizationClient<DummyTaskExecutor, DummyVerifier>>) {
let memory_pool = Arc::new(RwLock::new(MemoryPool::new()));
let storage = Arc::new(db::TestStorage::with_genesis_block());
let sync_state = SynchronizationStateRef::new(SynchronizationState::with_storage(storage.clone()));
@ -378,7 +374,7 @@ pub mod tests {
let server = Arc::new(DummyServer::new());
let config = Config { network: Magic::Mainnet, threads_num: 1, close_connection_on_bad_block: true };
let chain_verifier = Arc::new(ChainVerifier::new(storage.clone(), Magic::Mainnet));
let client_core = SynchronizationClientCore::new(config, &handle, sync_state.clone(), sync_peers.clone(), executor.clone(), chain, chain_verifier);
let client_core = SynchronizationClientCore::new(config, sync_state.clone(), sync_peers.clone(), executor.clone(), chain, chain_verifier);
let mut verifier = match verifier {
Some(verifier) => verifier,
None => DummyVerifier::default(),
@ -386,12 +382,12 @@ pub mod tests {
verifier.set_sink(Arc::new(CoreVerificationSink::new(client_core.clone())));
let client = SynchronizationClient::new(sync_state.clone(), client_core, verifier);
let local_node = LocalNode::new(Magic::Mainnet, storage, memory_pool, sync_peers, sync_state, executor.clone(), client, server.clone());
(event_loop, handle, executor, server, local_node)
(executor, server, local_node)
}
#[test]
fn local_node_serves_block() {
let (_, _, _, server, local_node) = create_local_node(None);
let (_, server, local_node) = create_local_node(None);
let peer_index = 0; local_node.on_connect(peer_index, types::Version::default());
// peer requests genesis block
let genesis_block_hash = test_data::genesis().hash();
@ -411,7 +407,7 @@ pub mod tests {
#[test]
fn local_node_accepts_local_transaction() {
let (_, _, executor, _, local_node) = create_local_node(None);
let (executor, _, local_node) = create_local_node(None);
// transaction will be relayed to this peer
let peer_index1 = 0; local_node.on_connect(peer_index1, types::Version::default());
@ -437,7 +433,7 @@ pub mod tests {
let mut verifier = DummyVerifier::default();
verifier.error_when_verifying(transaction_hash.clone(), "simulated");
let (_, _, executor, _, local_node) = create_local_node(Some(verifier));
let (executor, _, local_node) = create_local_node(Some(verifier));
let peer_index1 = 0; local_node.on_connect(peer_index1, types::Version::default());
executor.take_tasks();

View File

@ -1,14 +1,10 @@
use std::cmp::{min, max};
use std::collections::{HashMap, HashSet, VecDeque};
use std::collections::hash_map::Entry;
use std::time::Duration;
use std::sync::Arc;
use futures::{Future, finished};
use futures::stream::Stream;
use futures_cpupool::CpuPool;
use futures::Future;
use parking_lot::Mutex;
use time;
use tokio_core::reactor::{Handle, Interval};
use chain::{IndexedBlockHeader, IndexedTransaction, Transaction, IndexedBlock};
use db;
use message::types;
@ -19,9 +15,7 @@ use primitives::hash::H256;
use verification::BackwardsCompatibleChainVerifier as ChainVerifier;
use synchronization_chain::{Chain, BlockState, TransactionState, BlockInsertionResult};
use synchronization_executor::{Task, TaskExecutor};
use synchronization_manager::{manage_synchronization_peers_blocks, manage_synchronization_peers_headers,
manage_unknown_orphaned_blocks, manage_orphaned_transactions, MANAGEMENT_INTERVAL_MS,
ManagePeersConfig, ManageUnknownBlocksConfig, ManageOrphanTransactionsConfig};
use synchronization_manager::ManagementWorker;
use synchronization_peers_tasks::PeersTasks;
use synchronization_verifier::{VerificationSink, BlockVerificationSink, TransactionVerificationSink, VerificationTask};
use types::{BlockHeight, ClientCoreRef, PeersRef, PeerIndex, SynchronizationStateRef, EmptyBoxFuture, SyncListenerRef};
@ -98,10 +92,8 @@ pub struct SynchronizationClientCore<T: TaskExecutor> {
shared_state: SynchronizationStateRef,
/// Synchronization state.
state: State,
/// Cpu pool.
pool: CpuPool,
/// Sync management worker.
management_worker: Option<EmptyBoxFuture>,
management_worker: Option<ManagementWorker>,
/// Synchronization peers
peers: PeersRef,
/// Synchronization peers tasks.
@ -713,14 +705,13 @@ impl<T> TransactionVerificationSink for CoreVerificationSink<T> where T: TaskExe
impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
/// Create new synchronization client core
pub fn new(config: Config, handle: &Handle, shared_state: SynchronizationStateRef, peers: PeersRef, executor: Arc<T>, chain: Chain, chain_verifier: Arc<ChainVerifier>) -> ClientCoreRef<Self> {
pub fn new(config: Config, shared_state: SynchronizationStateRef, peers: PeersRef, executor: Arc<T>, chain: Chain, chain_verifier: Arc<ChainVerifier>) -> ClientCoreRef<Self> {
let sync = Arc::new(Mutex::new(
SynchronizationClientCore {
shared_state: shared_state,
state: State::Saturated,
peers: peers,
peers_tasks: PeersTasks::default(),
pool: CpuPool::new(config.threads_num),
management_worker: None,
executor: executor,
chain: chain,
@ -740,42 +731,9 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
));
{
let peers_config = ManagePeersConfig::default();
let unknown_config = ManageUnknownBlocksConfig::default();
let orphan_config = ManageOrphanTransactionsConfig::default();
let csync = Arc::downgrade(&sync);
let mut sync = sync.lock();
let management_worker = Interval::new(Duration::from_millis(MANAGEMENT_INTERVAL_MS), handle)
.expect("Failed to create interval")
.and_then(move |_| {
let client = match csync.upgrade() {
Some(client) => client,
None => return Ok(()),
};
let mut client = client.lock();
client.print_synchronization_information();
if client.state.is_synchronizing() || client.state.is_nearly_saturated() {
let (blocks_to_request, blocks_to_forget) = manage_synchronization_peers_blocks(&peers_config, &mut client.peers_tasks);
client.forget_failed_blocks(&blocks_to_forget); // TODO: children of blocks_to_forget can be in blocks_to_request => these have to be removed
client.execute_synchronization_tasks(
if blocks_to_request.is_empty() { None } else { Some(blocks_to_request) },
if blocks_to_forget.is_empty() { None } else { Some(blocks_to_forget) },
);
manage_synchronization_peers_headers(&peers_config, &mut client.peers_tasks);
manage_orphaned_transactions(&orphan_config, &mut client.orphaned_transactions_pool);
if let Some(orphans_to_remove) = manage_unknown_orphaned_blocks(&unknown_config, &mut client.orphaned_blocks_pool) {
for orphan_to_remove in orphans_to_remove {
client.chain.forget_block(&orphan_to_remove);
}
}
}
Ok(())
})
.for_each(|_| Ok(()))
.then(|_| finished::<(), ()>(()))
.boxed();
sync.management_worker = Some(sync.pool.spawn(management_worker).boxed());
let mut lsync = sync.lock();
lsync.management_worker = Some(ManagementWorker::new(csync));
}
sync
@ -793,24 +751,73 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
}
}
/// Get synchronization state
pub fn state(&self) -> State {
self.state
}
/// Return chain reference
pub fn chain(&mut self) -> &mut Chain {
&mut self.chain
}
/// Return peers tasks reference
pub fn peers_tasks(&mut self) -> &mut PeersTasks {
&mut self.peers_tasks
}
/// Get orphaned blocks pool reference
pub fn orphaned_blocks_pool(&mut self) -> &mut OrphanBlocksPool {
&mut self.orphaned_blocks_pool
}
/// Get orphaned transactions pool reference
pub fn orphaned_transactions_pool(&mut self) -> &mut OrphanTransactionsPool {
&mut self.orphaned_transactions_pool
}
/// Verify block headers or not?
#[cfg(test)]
pub fn set_verify_headers(&mut self, verify: bool) {
self.verify_headers = verify;
}
/// Return chain reference
#[cfg(test)]
pub fn chain(&mut self) -> &mut Chain {
&mut self.chain
}
/// Return peers reference
#[cfg(test)]
pub fn peers(&mut self) -> &Peers {
&*self.peers
}
/// Print synchronization information
pub fn print_synchronization_information(&mut self) {
if let State::Synchronizing(timestamp, num_of_blocks) = self.state {
let new_timestamp = time::precise_time_s();
let timestamp_diff = new_timestamp - timestamp;
let new_num_of_blocks = self.chain.best_storage_block().number;
let blocks_diff = if new_num_of_blocks > num_of_blocks { new_num_of_blocks - num_of_blocks } else { 0 };
if timestamp_diff >= 60.0 || blocks_diff > 1000 {
self.state = State::Synchronizing(time::precise_time_s(), new_num_of_blocks);
use time;
info!(target: "sync", "{:?} @ Processed {} blocks in {} seconds. Chain information: {:?}"
, time::strftime("%H:%M:%S", &time::now()).unwrap()
, blocks_diff, timestamp_diff
, self.chain.information());
}
}
}
/// Forget blocks, which have been requested several times, but no one has responded
pub fn forget_failed_blocks(&mut self, blocks_to_forget: &[H256]) {
if blocks_to_forget.is_empty() {
return;
}
for block_to_forget in blocks_to_forget {
self.chain.forget_block_with_children(block_to_forget);
}
}
/// Verify and select unknown headers for scheduling
fn verify_headers(&mut self, peer_index: PeerIndex, last_known_hash: H256, headers: &[IndexedBlockHeader]) -> BlocksHeadersVerificationResult {
// validate blocks headers before scheduling
@ -900,16 +907,6 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
Ok(transactions)
}
fn forget_failed_blocks(&mut self, blocks_to_forget: &[H256]) {
if blocks_to_forget.is_empty() {
return;
}
for block_to_forget in blocks_to_forget {
self.chain.forget_block_with_children(block_to_forget);
}
}
fn prepare_blocks_requests_tasks(&mut self, mut peers: Vec<PeerIndex>, mut hashes: Vec<H256>) -> Vec<Task> {
use std::mem::swap;
@ -1180,32 +1177,12 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
block_entry.remove_entry();
}
}
/// Print synchronization information
fn print_synchronization_information(&mut self) {
if let State::Synchronizing(timestamp, num_of_blocks) = self.state {
let new_timestamp = time::precise_time_s();
let timestamp_diff = new_timestamp - timestamp;
let new_num_of_blocks = self.chain.best_storage_block().number;
let blocks_diff = if new_num_of_blocks > num_of_blocks { new_num_of_blocks - num_of_blocks } else { 0 };
if timestamp_diff >= 60.0 || blocks_diff > 1000 {
self.state = State::Synchronizing(time::precise_time_s(), new_num_of_blocks);
use time;
info!(target: "sync", "{:?} @ Processed {} blocks in {} seconds. Chain information: {:?}"
, time::strftime("%H:%M:%S", &time::now()).unwrap()
, blocks_diff, timestamp_diff
, self.chain.information());
}
}
}
}
#[cfg(test)]
pub mod tests {
use std::sync::Arc;
use parking_lot::{Mutex, RwLock};
use tokio_core::reactor::{Core, Handle};
use chain::{Block, Transaction};
use db;
use devtools::RandomTempPath;
@ -1213,7 +1190,6 @@ pub mod tests {
use message::types;
use miner::MemoryPool;
use network::Magic;
use p2p::event_loop;
use primitives::hash::H256;
use test_data;
use verification::BackwardsCompatibleChainVerifier as ChainVerifier;
@ -1262,10 +1238,8 @@ pub mod tests {
Arc::new(db::Storage::new(path.as_path()).unwrap())
}
fn create_sync(storage: Option<StorageRef>, verifier: Option<DummyVerifier>) -> (Core, Handle, Arc<DummyTaskExecutor>, ClientCoreRef<SynchronizationClientCore<DummyTaskExecutor>>, Arc<SynchronizationClient<DummyTaskExecutor, DummyVerifier>>) {
fn create_sync(storage: Option<StorageRef>, verifier: Option<DummyVerifier>) -> (Arc<DummyTaskExecutor>, ClientCoreRef<SynchronizationClientCore<DummyTaskExecutor>>, Arc<SynchronizationClient<DummyTaskExecutor, DummyVerifier>>) {
let sync_peers = Arc::new(PeersImpl::default());
let event_loop = event_loop();
let handle = event_loop.handle();
let storage = match storage {
Some(storage) => storage,
None => Arc::new(db::TestStorage::with_genesis_block()),
@ -1277,7 +1251,7 @@ pub mod tests {
let config = Config { network: Magic::Mainnet, threads_num: 1, close_connection_on_bad_block: true };
let chain_verifier = Arc::new(ChainVerifier::new(storage.clone(), Magic::Unitest));
let client_core = SynchronizationClientCore::new(config, &handle, sync_state.clone(), sync_peers.clone(), executor.clone(), chain, chain_verifier.clone());
let client_core = SynchronizationClientCore::new(config, sync_state.clone(), sync_peers.clone(), executor.clone(), chain, chain_verifier.clone());
{
client_core.lock().set_verify_headers(false);
}
@ -1288,7 +1262,7 @@ pub mod tests {
verifier.set_verifier(chain_verifier);
let client = SynchronizationClient::new(sync_state, client_core.clone(), verifier);
(event_loop, handle, executor, client_core, client)
(executor, client_core, client)
}
fn request_block_headers_genesis(peer_index: PeerIndex) -> Task {
@ -1308,7 +1282,7 @@ pub mod tests {
#[test]
fn synchronization_request_inventory_on_sync_start() {
let (_, _, executor, _, sync) = create_sync(None, None);
let (executor, _, sync) = create_sync(None, None);
// start sync session
sync.on_connect(0);
// => ask for inventory
@ -1318,7 +1292,7 @@ pub mod tests {
#[test]
fn synchronization_saturated_on_start() {
let (_, _, _, core, _) = create_sync(None, None);
let (_, core, _) = create_sync(None, None);
let info = core.lock().information();
assert!(!info.state.is_synchronizing());
assert_eq!(info.orphaned_blocks, 0);
@ -1327,7 +1301,7 @@ pub mod tests {
#[test]
fn synchronization_in_order_block_path_nearly_saturated() {
let (_, _, executor, core, sync) = create_sync(None, None);
let (executor, core, sync) = create_sync(None, None);
let block1: Block = test_data::block_h1();
let block2: Block = test_data::block_h2();
@ -1367,7 +1341,7 @@ pub mod tests {
#[test]
fn synchronization_out_of_order_block_path() {
let (_, _, _, core, sync) = create_sync(None, None);
let (_, core, sync) = create_sync(None, None);
sync.on_headers(5, types::Headers::with_headers(vec![test_data::block_h1().block_header.clone(), test_data::block_h2().block_header.clone()]));
sync.on_block(5, test_data::block_h169().into());
@ -1386,7 +1360,7 @@ pub mod tests {
#[test]
fn synchronization_parallel_peers() {
let (_, _, executor, core, sync) = create_sync(None, None);
let (executor, core, sync) = create_sync(None, None);
let block1: Block = test_data::block_h1();
let block2: Block = test_data::block_h2();
@ -1430,7 +1404,7 @@ pub mod tests {
#[test]
fn synchronization_reset_when_peer_is_disconnected() {
let (_, _, _, core, sync) = create_sync(None, None);
let (_, core, sync) = create_sync(None, None);
// request new blocks
{
@ -1447,7 +1421,7 @@ pub mod tests {
#[test]
fn synchronization_not_starting_when_receiving_known_blocks() {
let (_, _, executor, core, sync) = create_sync(None, None);
let (executor, core, sync) = create_sync(None, None);
// saturated => receive inventory with known blocks only
sync.on_headers(1, types::Headers::with_headers(vec![test_data::genesis().block_header]));
// => no need to start synchronization
@ -1459,7 +1433,7 @@ pub mod tests {
#[test]
fn synchronization_asks_for_inventory_after_saturating() {
let (_, _, executor, _, sync) = create_sync(None, None);
let (executor, _, sync) = create_sync(None, None);
let block = test_data::block_h1();
sync.on_headers(1, types::Headers::with_headers(vec![block.block_header.clone()]));
sync.on_headers(2, types::Headers::with_headers(vec![block.block_header.clone()]));
@ -1480,7 +1454,7 @@ pub mod tests {
#[test]
fn synchronization_remembers_correct_block_headers_in_order() {
let (_, _, executor, core, sync) = create_sync(None, None);
let (executor, core, sync) = create_sync(None, None);
let b1 = test_data::block_h1();
let b2 = test_data::block_h2();
@ -1522,7 +1496,7 @@ pub mod tests {
#[test]
fn synchronization_remembers_correct_block_headers_out_of_order() {
let (_, _, executor, core, sync) = create_sync(None, None);
let (executor, core, sync) = create_sync(None, None);
let b1 = test_data::block_h1();
let b2 = test_data::block_h2();
@ -1564,7 +1538,7 @@ pub mod tests {
#[test]
fn synchronization_ignores_unknown_block_headers() {
let (_, _, executor, core, sync) = create_sync(None, None);
let (executor, core, sync) = create_sync(None, None);
let b169 = test_data::block_h169();
sync.on_headers(1, types::Headers::with_headers(vec![b169.block_header]));
@ -1583,7 +1557,7 @@ pub mod tests {
let genesis = test_data::genesis();
storage.insert_block(&genesis).expect("no db error");
let (_, _, executor, core, sync) = create_sync(Some(storage), None);
let (executor, core, sync) = create_sync(Some(storage), None);
let genesis_header = &genesis.block_header;
let fork1 = test_data::build_n_empty_blocks_from(2, 100, &genesis_header);
let fork2 = test_data::build_n_empty_blocks_from(3, 200, &genesis_header);
@ -1640,7 +1614,7 @@ pub mod tests {
let genesis = test_data::genesis();
storage.insert_block(&genesis).expect("no db error");
let (_, _, executor, core, sync) = create_sync(Some(storage), None);
let (executor, core, sync) = create_sync(Some(storage), None);
let common_block = test_data::block_builder().header().parent(genesis.hash()).build().build();
let fork1 = test_data::build_n_empty_blocks_from(2, 100, &common_block.block_header);
let fork2 = test_data::build_n_empty_blocks_from(3, 200, &common_block.block_header);
@ -1679,7 +1653,7 @@ pub mod tests {
#[test]
fn accept_out_of_order_blocks_when_saturated() {
let (_, _, _, core, sync) = create_sync(None, None);
let (_, core, sync) = create_sync(None, None);
sync.on_block(1, test_data::block_h2().into());
assert_eq!(core.lock().information().orphaned_blocks, 1);
@ -1700,7 +1674,7 @@ pub mod tests {
#[test]
fn do_not_rerequest_unknown_block_in_inventory() {
let (_, _, executor, _, sync) = create_sync(None, None);
let (executor, _, sync) = create_sync(None, None);
sync.on_block(1, test_data::block_h2().into());
sync.on_inventory(1, types::Inv::with_inventory(vec![
@ -1716,7 +1690,7 @@ pub mod tests {
#[test]
fn blocks_rerequested_on_peer_disconnect() {
let (_, _, executor, _, sync) = create_sync(None, None);
let (executor, _, sync) = create_sync(None, None);
let block1: Block = test_data::block_h1();
let block2: Block = test_data::block_h2();
@ -1761,7 +1735,7 @@ pub mod tests {
storage.insert_error(block.hash(), db::Error::Consistency(db::ConsistencyError::NoBestBlock));
let best_genesis = storage.best_block().unwrap();
let (_, _, _, core, sync) = create_sync(Some(Arc::new(storage)), None);
let (_, core, sync) = create_sync(Some(Arc::new(storage)), None);
sync.on_block(1, block.into());
@ -1771,7 +1745,7 @@ pub mod tests {
#[test]
fn peer_removed_from_sync_after_responding_with_requested_block_notfound() {
let (_, _, executor, core, sync) = create_sync(None, None);
let (executor, core, sync) = create_sync(None, None);
let b1 = test_data::block_h1();
let b2 = test_data::block_h2();
@ -1796,7 +1770,7 @@ pub mod tests {
#[test]
fn peer_not_removed_from_sync_after_responding_with_non_requested_block_notfound() {
let (_, _, executor, core, sync) = create_sync(None, None);
let (executor, core, sync) = create_sync(None, None);
let b1 = test_data::block_h1();
let b2 = test_data::block_h2();
@ -1821,7 +1795,7 @@ pub mod tests {
#[test]
fn transaction_is_not_requested_when_synchronizing() {
let (_, _, executor, core, sync) = create_sync(None, None);
let (executor, core, sync) = create_sync(None, None);
let b1 = test_data::block_h1();
let b2 = test_data::block_h2();
@ -1838,7 +1812,7 @@ pub mod tests {
#[test]
fn transaction_is_requested_when_not_synchronizing() {
let (_, _, executor, core, sync) = create_sync(None, None);
let (executor, core, sync) = create_sync(None, None);
sync.on_inventory(0, types::Inv::with_inventory(vec![InventoryVector::tx(H256::from(0))]));
@ -1861,7 +1835,7 @@ pub mod tests {
#[test]
fn same_transaction_can_be_requested_twice() {
let (_, _, executor, _, sync) = create_sync(None, None);
let (executor, _, sync) = create_sync(None, None);
sync.on_inventory(0, types::Inv::with_inventory(vec![InventoryVector::tx(H256::from(0))]));
@ -1880,7 +1854,7 @@ pub mod tests {
#[test]
fn known_transaction_is_not_requested() {
let (_, _, executor, _, sync) = create_sync(None, None);
let (executor, _, sync) = create_sync(None, None);
sync.on_inventory(0, types::Inv::with_inventory(vec![
InventoryVector::tx(test_data::genesis().transactions[0].hash()),
@ -1893,7 +1867,7 @@ pub mod tests {
#[test]
fn transaction_is_not_accepted_when_synchronizing() {
let (_, _, _, core, sync) = create_sync(None, None);
let (_, core, sync) = create_sync(None, None);
let b1 = test_data::block_h1();
let b2 = test_data::block_h2();
@ -1908,7 +1882,7 @@ pub mod tests {
#[test]
fn transaction_is_accepted_when_not_synchronizing() {
let (_, _, _, core, sync) = create_sync(None, None);
let (_, core, sync) = create_sync(None, None);
sync.on_transaction(1, test_data::TransactionBuilder::with_version(1).into());
assert_eq!(core.lock().information().chain.transactions.transactions_count, 1);
@ -1924,7 +1898,7 @@ pub mod tests {
#[test]
fn transaction_is_orphaned_when_input_is_unknown() {
let (_, _, _, core, sync) = create_sync(None, None);
let (_, core, sync) = create_sync(None, None);
sync.on_transaction(1, test_data::TransactionBuilder::with_default_input(0).into());
assert_eq!(core.lock().information().chain.transactions.transactions_count, 0);
@ -1937,7 +1911,7 @@ pub mod tests {
test_data::TransactionBuilder::with_output(10).store(chain) // t0
.set_input(&chain.at(0), 0).set_output(20).store(chain); // t0 -> t1
let (_, _, _, core, sync) = create_sync(None, None);
let (_, core, sync) = create_sync(None, None);
sync.on_transaction(1, chain.at(1).into());
assert_eq!(core.lock().information().chain.transactions.transactions_count, 0);
@ -1980,7 +1954,7 @@ pub mod tests {
let mut dummy_verifier = DummyVerifier::default();
dummy_verifier.error_when_verifying(b21.hash(), "simulated");
let (_, _, _, _, sync) = create_sync(None, Some(dummy_verifier));
let (_, _, sync) = create_sync(None, Some(dummy_verifier));
sync.on_headers(1, types::Headers::with_headers(vec![b10.block_header.clone(), b11.block_header.clone(), b12.block_header.clone()]));
sync.on_headers(2, types::Headers::with_headers(vec![b10.block_header.clone(), b21.block_header.clone(), b22.block_header.clone()]));
@ -1998,7 +1972,7 @@ pub mod tests {
#[test]
fn relay_new_block_when_in_saturated_state() {
let (_, _, executor, _, sync) = create_sync(None, None);
let (executor, _, sync) = create_sync(None, None);
let genesis = test_data::genesis();
let b0 = test_data::block_builder().header().parent(genesis.hash()).build().build();
let b1 = test_data::block_builder().header().parent(b0.hash()).build().build();
@ -2044,7 +2018,7 @@ pub mod tests {
#[test]
fn relay_new_transaction_when_in_saturated_state() {
let (_, _, executor, _, sync) = create_sync(None, None);
let (executor, _, sync) = create_sync(None, None);
let tx: Transaction = test_data::TransactionBuilder::with_output(20).into();
@ -2059,7 +2033,7 @@ pub mod tests {
#[test]
fn receive_same_unknown_block_twice() {
let (_, _, _, _, sync) = create_sync(None, None);
let (_, _, sync) = create_sync(None, None);
sync.on_block(1, test_data::block_h2().into());
// should not panic here
@ -2075,7 +2049,7 @@ pub mod tests {
let mut dummy_verifier = DummyVerifier::default();
dummy_verifier.error_when_verifying(b0.hash(), "simulated");
let (_, _, _, core, sync) = create_sync(None, Some(dummy_verifier));
let (_, core, sync) = create_sync(None, Some(dummy_verifier));
core.lock().peers.insert(0, DummyOutboundSyncConnection::new());
assert!(core.lock().peers.enumerate().contains(&0));
@ -2092,7 +2066,7 @@ pub mod tests {
let b1 = test_data::block_builder().header().parent(b0.hash()).build().build();
let b2 = test_data::block_builder().header().parent(b1.hash()).build().build();
let (_, _, _, core, sync) = create_sync(None, None);
let (_, core, sync) = create_sync(None, None);
{
let mut core = core.lock(); let mut chain = core.chain();
chain.mark_dead_end_block(&b0.hash());
@ -2113,7 +2087,7 @@ pub mod tests {
let b1 = test_data::block_builder().header().parent(b0.hash()).build().build();
let b2 = test_data::block_builder().header().parent(b1.hash()).build().build();
let (_, _, _, core, sync) = create_sync(None, None);
let (_, core, sync) = create_sync(None, None);
{
let mut core = core.lock(); let mut chain = core.chain();
chain.mark_dead_end_block(&b1.hash());
@ -2133,7 +2107,7 @@ pub mod tests {
let genesis = test_data::genesis();
let b0 = test_data::block_builder().header().parent(genesis.hash()).build().build();
let (_, _, _, core, sync) = create_sync(None, None);
let (_, core, sync) = create_sync(None, None);
{
let mut core = core.lock(); let mut chain = core.chain();
chain.mark_dead_end_block(&b0.hash());
@ -2153,7 +2127,7 @@ pub mod tests {
let b0 = test_data::block_builder().header().parent(genesis.hash()).build().build();
let b1 = test_data::block_builder().header().parent(b0.hash()).build().build();
let (_, _, _, core, sync) = create_sync(None, None);
let (_, core, sync) = create_sync(None, None);
{
let mut core = core.lock(); let mut chain = core.chain();
chain.mark_dead_end_block(&b0.hash());
@ -2174,7 +2148,7 @@ pub mod tests {
let b1 = test_data::block_builder().header().parent(genesis.hash()).build().build(); // another branch
let b2 = test_data::block_builder().header().parent(b1.hash()).build().build();
let (_, _, executor, core, sync) = create_sync(None, None);
let (executor, core, sync) = create_sync(None, None);
// when peer1 announces 'false' b0
sync.on_headers(1, types::Headers::with_headers(vec![b0.block_header.clone()]));
@ -2203,7 +2177,7 @@ pub mod tests {
#[test]
fn when_got_same_orphan_transaction_twice() {
let (_, _, _, core, sync) = create_sync(None, None);
let (_, core, sync) = create_sync(None, None);
sync.on_transaction(1, test_data::TransactionBuilder::with_default_input(0).into());
assert_eq!(core.lock().information().chain.transactions.transactions_count, 0);
@ -2296,7 +2270,7 @@ pub mod tests {
let storage = create_disk_storage();
storage.insert_block(&b0).expect("no db error");
let (_, _, _, core, sync) = create_sync(Some(storage), Some(dummy_verifier));
let (_, core, sync) = create_sync(Some(storage), Some(dummy_verifier));
sync.on_block(0, b1.clone().into());
sync.on_transaction(0, tx2.clone().into());
sync.on_transaction(0, tx3.clone().into());
@ -2323,7 +2297,7 @@ pub mod tests {
#[test]
fn sync_listener_calls() {
let (_, _, _, _, sync) = create_sync(None, None);
let (_, _, sync) = create_sync(None, None);
// install sync listener
let data = Arc::new(Mutex::new(DummySyncListenerData::default()));

View File

@ -1,11 +1,17 @@
use std::collections::HashSet;
use std::sync::{Arc, Weak};
use std::thread;
use std::time::Duration;
use parking_lot::{Mutex, Condvar};
use time::precise_time_s;
use primitives::hash::H256;
use synchronization_client_core::{ClientCore, SynchronizationClientCore};
use synchronization_executor::TaskExecutor;
use synchronization_peers_tasks::PeersTasks;
use utils::{OrphanBlocksPool, OrphanTransactionsPool};
/// Management interval (in ms)
pub const MANAGEMENT_INTERVAL_MS: u64 = 10 * 1000;
const MANAGEMENT_INTERVAL_MS: u64 = 10 * 1000;
/// Response time before getting block to decrease peer score
const DEFAULT_PEER_BLOCK_FAILURE_INTERVAL_MS: u32 = 60 * 1000;
/// Response time before getting headers to decrease peer score
@ -19,6 +25,94 @@ const DEFAULT_ORPHAN_TRANSACTION_REMOVAL_TIME_MS: u32 = 10 * 60 * 1000;
/// Maximal number of orphaned transactions
const DEFAULT_ORPHAN_TRANSACTIONS_MAX_LEN: usize = 10000;
/// Synchronization management worker
pub struct ManagementWorker {
/// Stop flag.
is_stopping: Arc<Mutex<bool>>,
/// Stop event.
stopping_event: Arc<Condvar>,
/// Management thread handle (joined on drop).
thread: Option<thread::JoinHandle<()>>,
}
impl ManagementWorker {
/// Spawns the background sync-management thread. Only a `Weak` reference to
/// the client core is captured, so the worker cannot keep the core alive;
/// when the core is dropped the thread exits on its next wakeup.
pub fn new<T: TaskExecutor>(core: Weak<Mutex<SynchronizationClientCore<T>>>) -> Self {
let is_stopping = Arc::new(Mutex::new(false));
let stopping_event = Arc::new(Condvar::new());
ManagementWorker {
is_stopping: is_stopping.clone(),
stopping_event: stopping_event.clone(),
thread: Some(thread::Builder::new()
.name("Sync management thread".to_string())
.spawn(move || ManagementWorker::worker_proc(is_stopping, stopping_event, core))
.expect("Error creating management thread"))
}
}
/// Thread body: wakes every `MANAGEMENT_INTERVAL_MS` to run peer/orphan
/// management tasks; exits when the stop flag is set or the core is gone.
fn worker_proc<T: TaskExecutor>(is_stopping: Arc<Mutex<bool>>, stopping_event: Arc<Condvar>, core: Weak<Mutex<SynchronizationClientCore<T>>>) {
let peers_config = ManagePeersConfig::default();
let unknown_config = ManageUnknownBlocksConfig::default();
let orphan_config = ManageOrphanTransactionsConfig::default();
loop {
let mut lock = is_stopping.lock();
if *lock {
break;
}
// A timeout on the condvar means it is time to run the management
// tasks below. A notification (non-timeout) means either stop was
// requested (flag set) or the wakeup was spurious — re-check and loop.
if !stopping_event.wait_for(&mut lock, Duration::from_millis(MANAGEMENT_INTERVAL_MS)).timed_out() {
if *lock {
break;
}
// spurious wakeup?
continue;
}
// release the stop-flag lock before taking the core lock below
drop(lock);
// if core is dropped => stop thread
let core = match core.upgrade() {
None => break,
Some(core) => core,
};
let mut core = core.lock();
// trace synchronization state
core.print_synchronization_information();
// execute management tasks if not saturated
if core.state().is_synchronizing() || core.state().is_nearly_saturated() {
let (blocks_to_request, blocks_to_forget) = manage_synchronization_peers_blocks(&peers_config, core.peers_tasks());
// NOTE(review): children of blocks_to_forget could also appear in
// blocks_to_request — presumably handled downstream; TODO confirm
core.forget_failed_blocks(&blocks_to_forget);
core.execute_synchronization_tasks(
if blocks_to_request.is_empty() { None } else { Some(blocks_to_request) },
if blocks_to_forget.is_empty() { None } else { Some(blocks_to_forget) },
);
manage_synchronization_peers_headers(&peers_config, core.peers_tasks());
manage_orphaned_transactions(&orphan_config, core.orphaned_transactions_pool());
if let Some(orphans_to_remove) = manage_unknown_orphaned_blocks(&unknown_config, core.orphaned_blocks_pool()) {
for orphan_to_remove in orphans_to_remove {
core.chain().forget_block(&orphan_to_remove);
}
}
}
}
trace!(target: "sync", "Stopping sync management thread");
}
}
impl Drop for ManagementWorker {
/// Signals the worker thread to stop, wakes it, and blocks until it exits.
fn drop(&mut self) {
if let Some(join_handle) = self.thread.take() {
*self.is_stopping.lock() = true;
self.stopping_event.notify_all();
join_handle.join().expect("Clean shutdown.");
}
}
}
/// Peers management configuration
pub struct ManagePeersConfig {
/// Time interval (in milliseconds) to wait block from the peer before penalizing && reexecuting tasks

View File

@ -79,7 +79,7 @@ impl AsyncVerifier {
.spawn(move || {
AsyncVerifier::verification_worker_proc(sink, storage, memory_pool, verifier, verification_work_receiver)
})
.expect("Error creating verification thread"))
.expect("Error creating sync verification thread"))
}
}
@ -90,6 +90,8 @@ impl AsyncVerifier {
break;
}
}
trace!(target: "sync", "Stopping sync verification thread");
}
/// Execute single verification task