fixed TODOs

This commit is contained in:
Svyatoslav Nikolsky 2016-12-27 09:33:08 +03:00
parent c23b0f55a4
commit 59378066b8
5 changed files with 39 additions and 22 deletions

View File

@ -13,7 +13,7 @@ pub trait LocalSyncNode : Send + Sync {
}
pub trait InboundSyncConnection : Send + Sync {
fn start_sync_session(&self, version: u32);
fn start_sync_session(&self, version: types::Version);
fn close_session(&self);
fn on_inventory(&self, message: types::Inv);
fn on_getdata(&self, message: types::GetData);
@ -181,7 +181,20 @@ impl SyncProtocol {
impl Protocol for SyncProtocol {
fn initialize(&mut self) {
self.inbound_connection.start_sync_session(self.context.info().version);
// TODO
use message::common;
let version = types::Version::V0(types::version::V0 {
version: self.context.info().version,
services: common::Services::default(),
timestamp: 0,
receiver: common::NetAddress {
services: common::Services::default(),
address: common::IpAddress::from("127.0.0.1"),
port: common::Port::from(0),
},
});
self.inbound_connection.start_sync_session(version);
}
fn on_message(&mut self, command: &Command, payload: &Bytes) -> Result<(), Error> {

View File

@ -1,5 +1,4 @@
use chain::{IndexedTransaction, IndexedBlock};
use message::common;
use message::types;
use p2p::{InboundSyncConnection, InboundSyncConnectionRef};
use types::{PeersRef, LocalNodeRef, PeerIndex, RequestId};
@ -32,18 +31,7 @@ impl InboundConnection {
}
impl InboundSyncConnection for InboundConnection {
fn start_sync_session(&self, version: u32) {
// TODO
let version = types::Version::V0(types::version::V0 {
version: version,
services: common::Services::default(),
timestamp: 0,
receiver: common::NetAddress {
services: common::Services::default(),
address: common::IpAddress::from("127.0.0.1"),
port: common::Port::from(0),
},
});
fn start_sync_session(&self, version: types::Version) {
self.node.on_connect(self.peer_index, version);
}

View File

@ -104,7 +104,7 @@ pub fn create_local_sync_node(handle: &Handle, network: Magic, db: db::SharedSto
let verifier_sink = Arc::new(CoreVerificationSink::new(sync_client_core.clone()));
let verifier = AsyncVerifier::new(chain_verifier, db.clone(), memory_pool.clone(), verifier_sink);
let sync_client = SynchronizationClient::new(sync_state.clone(), sync_client_core, verifier);
Arc::new(SyncNode::new(peers, sync_state, sync_executor, sync_client, sync_server))
Arc::new(SyncNode::new(network, db, memory_pool, peers, sync_state, sync_executor, sync_client, sync_server))
}
/// Create inbound synchronization connections factory for given local sync node.

View File

@ -1,8 +1,11 @@
use std::sync::Arc;
use parking_lot::{Mutex, Condvar};
use time;
use futures::{Future, lazy, finished};
use chain::{Transaction, IndexedTransaction, IndexedBlock};
use message::types;
use miner::BlockAssembler;
use network::Magic;
use synchronization_client::{Client};
use synchronization_executor::{Task as SynchronizationTask, TaskExecutor};
use synchronization_server::{Server, ServerTask};
@ -10,10 +13,17 @@ use synchronization_verifier::{TransactionVerificationSink};
use primitives::hash::H256;
use miner::BlockTemplate;
use synchronization_peers::{TransactionAnnouncementType, BlockAnnouncementType};
use types::{PeerIndex, RequestId, PeersRef, ExecutorRef, ClientRef, ServerRef, SynchronizationStateRef};
use types::{PeerIndex, RequestId, StorageRef, MemoryPoolRef, PeersRef, ExecutorRef,
ClientRef, ServerRef, SynchronizationStateRef};
/// Local synchronization node
pub struct LocalNode<T: TaskExecutor, U: Server, V: Client> {
/// Network we are working on
network: Magic,
/// Storage reference
storage: StorageRef,
/// Memory pool reference
memory_pool: MemoryPoolRef,
/// Synchronization peers
peers: PeersRef,
/// Shared synchronization state
@ -39,8 +49,12 @@ struct TransactionAcceptSinkData {
impl<T, U, V> LocalNode<T, U, V> where T: TaskExecutor, U: Server, V: Client {
/// Create new synchronization node
pub fn new(peers: PeersRef, state: SynchronizationStateRef, executor: ExecutorRef<T>, client: ClientRef<V>, server: ServerRef<U>) -> Self {
pub fn new(network: Magic, storage: StorageRef, memory_pool: MemoryPoolRef, peers: PeersRef,
state: SynchronizationStateRef, executor: ExecutorRef<T>, client: ClientRef<V>, server: ServerRef<U>) -> Self {
LocalNode {
network: network,
storage: storage,
memory_pool: memory_pool,
peers: peers,
state: state,
executor: executor,
@ -258,7 +272,9 @@ impl<T, U, V> LocalNode<T, U, V> where T: TaskExecutor, U: Server, V: Client {
}
pub fn get_block_template(&self) -> BlockTemplate {
unimplemented!() // TODO
let block_assembler = BlockAssembler::default();
let memory_pool = &*self.memory_pool.read();
block_assembler.create_new_block(&self.storage, memory_pool, time::get_time().sec as u32, self.network)
}
}
@ -348,7 +364,7 @@ pub mod tests {
let memory_pool = Arc::new(RwLock::new(MemoryPool::new()));
let storage = Arc::new(db::TestStorage::with_genesis_block());
let sync_state = SynchronizationStateRef::new(SynchronizationState::with_storage(storage.clone()));
let chain = Chain::new(storage.clone(), memory_pool);
let chain = Chain::new(storage.clone(), memory_pool.clone());
let sync_peers = Arc::new(PeersImpl::default());
let executor = DummyTaskExecutor::new();
let server = Arc::new(DummyServer::new());
@ -361,7 +377,7 @@ pub mod tests {
};
verifier.set_sink(Arc::new(CoreVerificationSink::new(client_core.clone())));
let client = SynchronizationClient::new(sync_state.clone(), client_core, verifier);
let local_node = LocalNode::new(sync_peers, sync_state, executor.clone(), client, server.clone());
let local_node = LocalNode::new(Magic::Mainnet, storage, memory_pool, sync_peers, sync_state, executor.clone(), client, server.clone());
(event_loop, handle, executor, server, local_node)
}

View File

@ -98,7 +98,7 @@ use types::{PeerIndex, ClientCoreRef, SynchronizationStateRef, EmptyBoxFuture};
///!
///! manage_synchronization_peers: When management thread awakes:
///! 1) for peer in active_peers.where(p => now() - p.last_request_time() > failure_interval):
///! 1.1) return all peer' tasks to the tasks pool + TODO: filter tasks (if we have requested some hash several times from several peers && they haven't responded => drop this hash + reset sync???)
///! 1.1) return all peer' tasks to the tasks pool
///! 1.2) increase # of failures for this peer
///! 1.3) if # of failures > max_failures: ===> super-bad peer
///! 1.3.1) forget peer