diff --git a/Cargo.lock b/Cargo.lock index 3500c17a..ffddfab5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10,6 +10,7 @@ dependencies = [ "miner 0.1.0", "p2p 0.1.0", "script 0.1.0", + "sync 0.1.0", "verification 0.1.0", ] @@ -521,6 +522,18 @@ name = "strsim" version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "sync" +version = "0.1.0" +dependencies = [ + "chain 0.1.0", + "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", + "message 0.1.0", + "p2p 0.1.0", + "parking_lot 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", + "primitives 0.1.0", +] + [[package]] name = "term_size" version = "0.2.1" diff --git a/Cargo.toml b/Cargo.toml index b2cc74bd..3bd552f5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ p2p = { path = "p2p" } script = { path = "script" } db = { path = "db" } verification = { path = "verification" } +sync = { path = "sync" } [[bin]] path = "pbtc/main.rs" diff --git a/chain/src/block.rs b/chain/src/block.rs index 87a53bd9..e06a700b 100644 --- a/chain/src/block.rs +++ b/chain/src/block.rs @@ -1,17 +1,16 @@ use hex::FromHex; -use crypto::dhash256; use hash::H256; use ser::{ Deserializable, Reader, Error as ReaderError, deserialize, - Serializable, Stream, serialize + Serializable, Stream }; use merkle_root::merkle_root; use {BlockHeader, Transaction}; -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Clone)] pub struct Block { - block_header: BlockHeader, - transactions: Vec, + pub block_header: BlockHeader, + pub transactions: Vec, } impl Serializable for Block { @@ -45,7 +44,7 @@ impl Block { } pub fn hash(&self) -> H256 { - dhash256(&serialize(&self.block_header)) + self.block_header.hash() } /// Returns block's merkle root. diff --git a/chain/src/block_header.rs b/chain/src/block_header.rs index d368b07d..616da011 100644 --- a/chain/src/block_header.rs +++ b/chain/src/block_header.rs @@ -1,11 +1,12 @@ use std::fmt; use ser::{ Deserializable, Reader, Error as ReaderError, - Serializable, Stream + Serializable, Stream, serialize }; +use crypto::dhash256; use hash::H256; -#[derive(PartialEq)] +#[derive(PartialEq, Clone)] pub struct BlockHeader { pub version: u32, pub previous_header_hash: H256, @@ -15,6 +16,12 @@ pub struct BlockHeader { pub nonce: u32, } +impl BlockHeader { + pub fn hash(&self) -> H256 { + dhash256(&serialize(self)) + } +} + impl fmt::Debug for BlockHeader { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("BlockHeader") diff --git a/message/src/common/inventory.rs b/message/src/common/inventory.rs index 796546cd..a28b9d61 100644 --- a/message/src/common/inventory.rs +++ b/message/src/common/inventory.rs @@ -30,7 +30,7 @@ impl From for u32 { } } -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Clone)] pub struct InventoryVector { pub inv_type: u32, pub hash: H256, diff --git a/message/src/types/block.rs b/message/src/types/block.rs new file mode 100644 index 00000000..6c839abd --- /dev/null +++ b/message/src/types/block.rs @@ -0,0 +1,31 @@ +use ser::{Stream, Reader}; +use chain::Block as ChainBlock; +use {Payload, MessageResult}; + +#[derive(Debug, PartialEq)] +pub struct Block { + pub block: ChainBlock, +} + +impl Payload for Block { + fn version() -> u32 { + 0 + } + + fn command() -> &'static str { + "block" + } + + fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult where Self: Sized { + let tx = Block { + block: try!(reader.read()), + }; + + Ok(tx) + } + + fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> { + stream.append(&self.block); + Ok(()) + } +} diff --git a/message/src/types/mod.rs b/message/src/types/mod.rs index 1a9bb8ad..914c0818 100644 --- a/message/src/types/mod.rs +++ b/message/src/types/mod.rs @@ -1,4 +1,5 @@ pub mod addr; +mod block; mod blocktxn; mod compactblock; mod feefilter; @@ -20,10 +21,12 @@ mod pong; pub mod reject; mod sendcompact; mod sendheaders; +mod tx; mod verack; pub mod version; pub use self::addr::Addr; +pub use self::block::Block; pub use self::blocktxn::BlockTxn; pub use self::compactblock::CompactBlock; pub use self::feefilter::FeeFilter; @@ -45,5 +48,6 @@ pub use self::pong::Pong; pub use self::reject::Reject; pub use self::sendcompact::SendCompact; pub use self::sendheaders::SendHeaders; +pub use self::tx::Tx; pub use self::verack::Verack; pub use self::version::Version; diff --git a/message/src/types/tx.rs b/message/src/types/tx.rs new file mode 100644 index 00000000..6d113dc7 --- /dev/null +++ b/message/src/types/tx.rs @@ -0,0 +1,31 @@ +use ser::{Stream, Reader}; +use chain::Transaction; +use {Payload, MessageResult}; + +#[derive(Debug, PartialEq)] +pub struct Tx { + pub transaction: Transaction, +} + +impl Payload for Tx { + fn version() -> u32 { + 0 + } + + fn command() -> &'static str { + "tx" + } + + fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult where Self: Sized { + let tx = Tx { + transaction: try!(reader.read()), + }; + + Ok(tx) + } + + fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> { + stream.append(&self.transaction); + Ok(()) + } +} diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index ce2c6efe..7f9108bb 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -32,4 +32,4 @@ pub use net::Config as NetConfig; pub use p2p::P2P; pub use event_loop::{event_loop, forever}; pub use util::{PeerId, PeerInfo}; - +pub use protocol::{InboundSyncConnection, InboundSyncConnectionRef, OutboundSyncConnection, OutboundSyncConnectionRef, LocalSyncNode, LocalSyncNodeRef}; diff --git a/p2p/src/p2p.rs b/p2p/src/p2p.rs index e3285558..04311a63 100644 --- a/p2p/src/p2p.rs +++ b/p2p/src/p2p.rs @@ -11,6 +11,7 @@ use protocol::Direction; use net::{connect, listen, Connections, Channel, Config as NetConfig}; use util::{NodeTable, Node}; use {Config, PeerInfo, PeerId}; +use protocol::{LocalSyncNodeRef, InboundSyncConnectionRef, OutboundSyncConnectionRef}; pub type BoxedEmptyFuture = BoxFuture<(), ()>; @@ -24,15 +25,18 @@ pub struct Context { pool: CpuPool, /// Remote event loop handle. remote: Remote, + /// Local synchronization node. + local_sync_node: LocalSyncNodeRef, } impl Context { - pub fn new(pool_handle: CpuPool, remote: Remote) -> Self { + pub fn new(local_sync_node: LocalSyncNodeRef, pool_handle: CpuPool, remote: Remote) -> Self { Context { connections: Default::default(), node_table: Default::default(), pool: pool_handle, remote: remote, + local_sync_node: local_sync_node, } } @@ -192,6 +196,10 @@ impl Context { self.node_table.write().note_failure(&peer_info.address); } } + + pub fn create_sync_session(&self, start_height: i32, outbound_connection: OutboundSyncConnectionRef) -> InboundSyncConnectionRef { + self.local_sync_node.lock().create_sync_session(start_height, outbound_connection) + } } pub struct P2P { @@ -219,14 +227,14 @@ impl Drop for P2P { } impl P2P { - pub fn new(config: Config, handle: Handle) -> Self { + pub fn new(config: Config, local_sync_node: LocalSyncNodeRef, handle: Handle) -> Self { let pool = CpuPool::new(config.threads); P2P { event_loop_handle: handle.clone(), pool: pool.clone(), config: config, - context: Arc::new(Context::new(pool, handle.remote().clone())), + context: Arc::new(Context::new(local_sync_node, pool, handle.remote().clone())), } } diff --git a/p2p/src/protocol/mod.rs b/p2p/src/protocol/mod.rs index 3cc6d008..cba24e56 100644 --- a/p2p/src/protocol/mod.rs +++ b/p2p/src/protocol/mod.rs @@ -8,7 +8,7 @@ use message::common::Command; pub use self::addr::AddrProtocol; pub use self::ping::PingProtocol; -pub use self::sync::SyncProtocol; +pub use self::sync::{SyncProtocol, InboundSyncConnection, InboundSyncConnectionRef, OutboundSyncConnection, OutboundSyncConnectionRef, LocalSyncNode, LocalSyncNodeRef}; #[derive(PartialEq, Clone, Copy)] pub enum Direction { diff --git a/p2p/src/protocol/sync.rs b/p2p/src/protocol/sync.rs index 3a344274..82eda2f4 100644 --- a/p2p/src/protocol/sync.rs +++ b/p2p/src/protocol/sync.rs @@ -1,54 +1,50 @@ -//TODO: remove! -#![allow(dead_code)] -#![allow(unused_variables)] - use std::sync::Arc; use parking_lot::Mutex; -use chain::{Block, Transaction}; use bytes::Bytes; -use message::{Command, Error, Payload, types}; -use protocol::Protocol; +use message::{Command, Error, Payload, types, deserialize_payload}; +use protocol::{Protocol, Direction}; use p2p::Context; use PeerId; pub type InboundSyncConnectionRef = Arc>>; - pub type OutboundSyncConnectionRef = Arc>>; +pub type LocalSyncNodeRef = Arc>>; + // TODO: use this to respond to construct Version message (start_height field) -// TODO: use this to create new inbound sessions pub trait LocalSyncNode : Send + Sync { fn start_height(&self) -> i32; - fn start_sync_session(&mut self, outbound: OutboundSyncConnectionRef) -> InboundSyncConnectionRef; + fn create_sync_session(&mut self, height: i32, outbound: OutboundSyncConnectionRef) -> InboundSyncConnectionRef; } pub trait InboundSyncConnection : Send + Sync { - fn on_iventory(&mut self, message: &types::Inv); - fn on_getdata(&mut self, message: &types::GetData); - fn on_getblocks(&mut self, message: &types::GetBlocks); - fn on_getheaders(&mut self, message: &types::GetHeaders); - fn on_transaction(&mut self, message: &Transaction); - fn on_block(&mut self, message: &Block); - fn on_headers(&mut self, message: &types::Headers); - fn on_mempool(&mut self, message: &types::MemPool); - fn on_filterload(&mut self, message: &types::FilterLoad); - fn on_filteradd(&mut self, message: &types::FilterAdd); - fn on_filterclear(&mut self, message: &types::FilterClear); - fn on_merkleblock(&mut self, message: &types::MerkleBlock); - fn on_sendheaders(&mut self, message: &types::SendHeaders); - fn on_feefilter(&mut self, message: &types::FeeFilter); - fn on_send_compact(&mut self, message: &types::SendCompact); - fn on_compact_block(&mut self, message: &types::CompactBlock); - fn on_get_block_txn(&mut self, message: &types::GetBlockTxn); - fn on_block_txn(&mut self, message: &types::BlockTxn); + fn start_sync_session(&mut self, version: u32); + fn on_inventory(&mut self, message: types::Inv); + fn on_getdata(&mut self, message: types::GetData); + fn on_getblocks(&mut self, message: types::GetBlocks); + fn on_getheaders(&mut self, message: types::GetHeaders); + fn on_transaction(&mut self, message: types::Tx); + fn on_block(&mut self, message: types::Block); + fn on_headers(&mut self, message: types::Headers); + fn on_mempool(&mut self, message: types::MemPool); + fn on_filterload(&mut self, message: types::FilterLoad); + fn on_filteradd(&mut self, message: types::FilterAdd); + fn on_filterclear(&mut self, message: types::FilterClear); + fn on_merkleblock(&mut self, message: types::MerkleBlock); + fn on_sendheaders(&mut self, message: types::SendHeaders); + fn on_feefilter(&mut self, message: types::FeeFilter); + fn on_send_compact(&mut self, message: types::SendCompact); + fn on_compact_block(&mut self, message: types::CompactBlock); + fn on_get_block_txn(&mut self, message: types::GetBlockTxn); + fn on_block_txn(&mut self, message: types::BlockTxn); } pub trait OutboundSyncConnection : Send + Sync { - fn send_iventory(&mut self, message: &types::Inv); + fn send_inventory(&mut self, message: &types::Inv); fn send_getdata(&mut self, message: &types::GetData); fn send_getblocks(&mut self, message: &types::GetBlocks); fn send_getheaders(&mut self, message: &types::GetHeaders); - fn send_transaction(&mut self, message: &Transaction); - fn send_block(&mut self, message: &Block); + fn send_transaction(&mut self, message: &types::Tx); + fn send_block(&mut self, message: &types::Block); fn send_headers(&mut self, message: &types::Headers); fn send_mempool(&mut self, message: &types::MemPool); fn send_filterload(&mut self, message: &types::FilterLoad); @@ -87,7 +83,7 @@ impl OutboundSync { } impl OutboundSyncConnection for OutboundSync { - fn send_iventory(&mut self, message: &types::Inv) { + fn send_inventory(&mut self, message: &types::Inv) { self.send_message(message); } @@ -103,12 +99,12 @@ impl OutboundSyncConnection for OutboundSync { self.send_message(message); } - fn send_transaction(&mut self, message: &Transaction) { - unimplemented!(); + fn send_transaction(&mut self, message: &types::Tx) { + self.send_message(message); } - fn send_block(&mut self, message: &Block) { - unimplemented!(); + fn send_block(&mut self, message: &types::Block) { + self.send_message(message); } fn send_headers(&mut self, message: &types::Headers) { @@ -161,32 +157,98 @@ impl OutboundSyncConnection for OutboundSync { } pub struct SyncProtocol { - //inbound_connection: InboundSyncConnectionRef, - outbound_connection: OutboundSyncConnectionRef, + inbound_connection: InboundSyncConnectionRef, } impl SyncProtocol { pub fn new(context: Arc, peer: PeerId) -> Self { - let outbound_connection = Arc::new(Mutex::new(OutboundSync::new(context, peer).boxed())); - // let inbound_connection = local_sync_node.start_sync_session(outbound_connection); // TODO: create inbound connection using LocalSyncNode::start_sync_session + let outbound_connection = Arc::new(Mutex::new(OutboundSync::new(context.clone(), peer).boxed())); + let inbound_connection = context.create_sync_session(0, outbound_connection); SyncProtocol { - // inbound_connection: inbound_connection, - outbound_connection: outbound_connection, + inbound_connection: inbound_connection, } } } impl Protocol for SyncProtocol { + fn initialize(&mut self, _direction: Direction, version: u32) -> Result<(), Error> { + self.inbound_connection.lock().start_sync_session(version); + Ok(()) + } + fn on_message(&mut self, command: &Command, payload: &Bytes, version: u32) -> Result<(), Error> { - // TODO: pass message to inbound_connection + convert response to ProtocolAction/Error - /* - if command == &Inv::command().into() { - let inventory: Inv = try!(deserialize_payload(payload, version)); - self.inbound_connection.on_iventory(&inventory); - } else { - Ok(ProtocolAction::None) + if command == &types::Inv::command() { + let message: types::Inv = try!(deserialize_payload(payload, version)); + self.inbound_connection.lock().on_inventory(message); + } + else if command == &types::GetData::command() { + let message: types::GetData = try!(deserialize_payload(payload, version)); + self.inbound_connection.lock().on_getdata(message); + } + else if command == &types::GetBlocks::command() { + let message: types::GetBlocks = try!(deserialize_payload(payload, version)); + self.inbound_connection.lock().on_getblocks(message); + } + else if command == &types::GetHeaders::command() { + let message: types::GetHeaders = try!(deserialize_payload(payload, version)); + self.inbound_connection.lock().on_getheaders(message); + } + else if command == &types::Tx::command() { + let message: types::Tx = try!(deserialize_payload(payload, version)); + self.inbound_connection.lock().on_transaction(message); + } + else if command == &types::Block::command() { + let message: types::Block = try!(deserialize_payload(payload, version)); + self.inbound_connection.lock().on_block(message); + } + else if command == &types::MemPool::command() { + let message: types::MemPool = try!(deserialize_payload(payload, version)); + self.inbound_connection.lock().on_mempool(message); + } + else if command == &types::Headers::command() { + let message: types::Headers = try!(deserialize_payload(payload, version)); + self.inbound_connection.lock().on_headers(message); + } + else if command == &types::FilterLoad::command() { + let message: types::FilterLoad = try!(deserialize_payload(payload, version)); + self.inbound_connection.lock().on_filterload(message); + } + else if command == &types::FilterAdd::command() { + let message: types::FilterAdd = try!(deserialize_payload(payload, version)); + self.inbound_connection.lock().on_filteradd(message); + } + else if command == &types::FilterClear::command() { + let message: types::FilterClear = try!(deserialize_payload(payload, version)); + self.inbound_connection.lock().on_filterclear(message); + } + else if command == &types::MerkleBlock::command() { + let message: types::MerkleBlock = try!(deserialize_payload(payload, version)); + self.inbound_connection.lock().on_merkleblock(message); + } + else if command == &types::SendHeaders::command() { + let message: types::SendHeaders = try!(deserialize_payload(payload, version)); + self.inbound_connection.lock().on_sendheaders(message); + } + else if command == &types::FeeFilter::command() { + let message: types::FeeFilter = try!(deserialize_payload(payload, version)); + self.inbound_connection.lock().on_feefilter(message); + } + else if command == &types::SendCompact::command() { + let message: types::SendCompact = try!(deserialize_payload(payload, version)); + self.inbound_connection.lock().on_send_compact(message); + } + else if command == &types::CompactBlock::command() { + let message: types::CompactBlock = try!(deserialize_payload(payload, version)); + self.inbound_connection.lock().on_compact_block(message); + } + else if command == &types::GetBlockTxn::command() { + let message: types::GetBlockTxn = try!(deserialize_payload(payload, version)); + self.inbound_connection.lock().on_get_block_txn(message); + } + else if command == &types::BlockTxn::command() { + let message: types::BlockTxn = try!(deserialize_payload(payload, version)); + self.inbound_connection.lock().on_block_txn(message); } - */ Ok(()) } } diff --git a/pbtc/main.rs b/pbtc/main.rs index 2d2a7c07..9ee45be5 100644 --- a/pbtc/main.rs +++ b/pbtc/main.rs @@ -8,11 +8,14 @@ extern crate keys; extern crate script; extern crate message; extern crate p2p; +extern crate sync; mod config; use std::net::SocketAddr; use p2p::{P2P, event_loop, forever, NetConfig}; +use sync::local_node::LocalNode; +use sync::inbound_connection_factory::InboundConnectionFactory; fn main() { env_logger::init().unwrap(); @@ -47,7 +50,10 @@ fn run() -> Result<(), String> { seeds: cfg.seednode.map_or_else(|| vec![], |x| vec![x]), }; - let p2p = P2P::new(p2p_cfg, el.handle()); + let local_sync_node = LocalNode::new(); + let local_sync_factory = InboundConnectionFactory::with_local_node(local_sync_node.clone()); + + let p2p = P2P::new(p2p_cfg, local_sync_factory, el.handle()); try!(p2p.run().map_err(|_| "Failed to start p2p module")); el.run(forever()).unwrap(); Ok(()) diff --git a/sync/Cargo.toml b/sync/Cargo.toml new file mode 100644 index 00000000..9d666467 --- /dev/null +++ b/sync/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "sync" +version = "0.1.0" +authors = ["Ethcore "] + +[dependencies] +parking_lot = "0.3" +log = "0.3" + +chain = { path = "../chain" } +message = { path = "../message" } +p2p = { path = "../p2p" } +primitives = { path = "../primitives" } diff --git a/sync/src/best_block.rs b/sync/src/best_block.rs new file mode 100644 index 00000000..e3083db0 --- /dev/null +++ b/sync/src/best_block.rs @@ -0,0 +1,7 @@ +use primitives::hash::H256; + +#[derive(Debug, Clone)] +pub struct BestBlock { + pub height: u64, + pub hash: H256, +} diff --git a/sync/src/inbound_connection.rs b/sync/src/inbound_connection.rs new file mode 100644 index 00000000..486e9117 --- /dev/null +++ b/sync/src/inbound_connection.rs @@ -0,0 +1,96 @@ +use parking_lot::Mutex; +use message::types; +use p2p::{InboundSyncConnection, InboundSyncConnectionRef}; +use local_node::LocalNodeRef; + +pub struct InboundConnection { + local_node: LocalNodeRef, + peer_index: usize, +} + +impl InboundConnection { + pub fn new(local_node: LocalNodeRef, peer_index: usize) -> InboundSyncConnectionRef { + InboundSyncConnectionRef::new(Mutex::new(Box::new(InboundConnection { + local_node: local_node, + peer_index: peer_index, + }))) + } +} + +impl InboundSyncConnection for InboundConnection { + fn start_sync_session(&mut self, version: u32) { + self.local_node.lock().start_sync_session(self.peer_index, version); + } + + fn on_inventory(&mut self, message: types::Inv) { + self.local_node.lock().on_peer_inventory(self.peer_index, message); + } + + fn on_getdata(&mut self, message: types::GetData) { + self.local_node.lock().on_peer_getdata(self.peer_index, message); + } + + fn on_getblocks(&mut self, message: types::GetBlocks) { + self.local_node.lock().on_peer_getblocks(self.peer_index, message); + } + + fn on_getheaders(&mut self, message: types::GetHeaders) { + self.local_node.lock().on_peer_getheaders(self.peer_index, message); + } + + fn on_transaction(&mut self, message: types::Tx) { + self.local_node.lock().on_peer_transaction(self.peer_index, message); + } + + fn on_block(&mut self, message: types::Block) { + self.local_node.lock().on_peer_block(self.peer_index, message); + } + + fn on_headers(&mut self, message: types::Headers) { + self.local_node.lock().on_peer_headers(self.peer_index, message); + } + + fn on_mempool(&mut self, message: types::MemPool) { + self.local_node.lock().on_peer_mempool(self.peer_index, message); + } + + fn on_filterload(&mut self, message: types::FilterLoad) { + self.local_node.lock().on_peer_filterload(self.peer_index, message); + } + + fn on_filteradd(&mut self, message: types::FilterAdd) { + self.local_node.lock().on_peer_filteradd(self.peer_index, message); + } + + fn on_filterclear(&mut self, message: types::FilterClear) { + self.local_node.lock().on_peer_filterclear(self.peer_index, message); + } + + fn on_merkleblock(&mut self, message: types::MerkleBlock) { + self.local_node.lock().on_peer_merkleblock(self.peer_index, message); + } + + fn on_sendheaders(&mut self, message: types::SendHeaders) { + self.local_node.lock().on_peer_sendheaders(self.peer_index, message); + } + + fn on_feefilter(&mut self, message: types::FeeFilter) { + self.local_node.lock().on_peer_feefilter(self.peer_index, message); + } + + fn on_send_compact(&mut self, message: types::SendCompact) { + self.local_node.lock().on_peer_send_compact(self.peer_index, message); + } + + fn on_compact_block(&mut self, message: types::CompactBlock) { + self.local_node.lock().on_peer_compact_block(self.peer_index, message); + } + + fn on_get_block_txn(&mut self, message: types::GetBlockTxn) { + self.local_node.lock().on_peer_get_block_txn(self.peer_index, message); + } + + fn on_block_txn(&mut self, message: types::BlockTxn) { + self.local_node.lock().on_peer_block_txn(self.peer_index, message); + } +} diff --git a/sync/src/inbound_connection_factory.rs b/sync/src/inbound_connection_factory.rs new file mode 100644 index 00000000..8c5ce726 --- /dev/null +++ b/sync/src/inbound_connection_factory.rs @@ -0,0 +1,35 @@ +use std::sync::Arc; +use parking_lot::Mutex; +use p2p::{LocalSyncNode, OutboundSyncConnectionRef, InboundSyncConnectionRef}; +use local_node::LocalNodeRef; +use inbound_connection::InboundConnection; + +pub struct InboundConnectionFactory { + local_node: LocalNodeRef, +} + +impl InboundConnectionFactory { + pub fn with_local_node(local_node: LocalNodeRef) -> Arc>> { + Arc::new( + Mutex::new( + Box::new( + InboundConnectionFactory { + local_node: local_node, + } + ) + ) + ) + } +} + +impl LocalSyncNode for InboundConnectionFactory { + fn start_height(&self) -> i32 { + self.local_node.lock().best_block().height as i32 + } + + fn create_sync_session(&mut self, best_block_height: i32, outbound_connection: OutboundSyncConnectionRef) -> InboundSyncConnectionRef { + let peer_index = self.local_node.lock().create_sync_session(best_block_height, outbound_connection); + let inbound_connection = InboundConnection::new(self.local_node.clone(), peer_index); + inbound_connection + } +} \ No newline at end of file diff --git a/sync/src/lib.rs b/sync/src/lib.rs new file mode 100644 index 00000000..c869c142 --- /dev/null +++ b/sync/src/lib.rs @@ -0,0 +1,13 @@ +extern crate chain; +#[macro_use] +extern crate log; +extern crate message; +extern crate p2p; +extern crate parking_lot; +extern crate primitives; + +pub mod best_block; +mod inbound_connection; +pub mod inbound_connection_factory; +mod local_chain; +pub mod local_node; diff --git a/sync/src/local_chain.rs b/sync/src/local_chain.rs new file mode 100644 index 00000000..36b66c98 --- /dev/null +++ b/sync/src/local_chain.rs @@ -0,0 +1,119 @@ +use std::collections::HashMap; +use std::collections::hash_map::Entry; +use chain::Block; +use primitives::hash::H256; +use best_block::BestBlock; + +#[derive(Debug)] +pub struct Info { + pub chain_length: usize, + pub orphan_count: usize, +} + +// TODO: this is temp storage (to use during test stage) +// it must be replaced with db + verification queue + mempools (transaction, block, ...) +pub struct LocalChain { + blocks_order: Vec, + blocks_map: HashMap, + orphan_blocks: HashMap, +} + +impl LocalChain { + pub fn new() -> LocalChain { + let mut chain = LocalChain { + blocks_order: Vec::new(), + blocks_map: HashMap::new(), + orphan_blocks: HashMap::new(), + }; + + // TODO: move this to config + let genesis_block: Block = "0100000000000000000000000000000000000000000000000000000000000000000000003ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4a29ab5f49ffff001d1dac2b7c0101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000".into(); + let genesis_block_hash = genesis_block.hash(); + + chain.blocks_order.push(genesis_block_hash.clone()); + chain.blocks_map.insert(genesis_block_hash, genesis_block); + chain + } + + pub fn info(&self) -> Info { + Info { + chain_length: self.blocks_order.len(), + orphan_count: self.orphan_blocks.len(), + } + } + + pub fn best_block(&self) -> BestBlock { + let height = self.blocks_order.len() - 1; + let ref block = self.blocks_order[height]; + BestBlock { + height: height as u64, + hash: block.clone(), + } + } + + pub fn block_locator_hashes(&self) -> Vec { + let mut index = self.blocks_order.len() - 1; + let mut hashes: Vec = Vec::new(); + let mut step = 1; + loop { + let block_hash = self.blocks_order[index].clone(); + hashes.push(block_hash); + + if hashes.len() >= 10 { + step <<= 1; + } + if index < step { + break; + } + index -= step; + } + + hashes + } + + pub fn is_known_block(&self, hash: &H256) -> bool { + self.blocks_map.contains_key(hash) + } + + pub fn insert_block(&mut self, block: &Block) { + // check if already known block + let block_header_hash = block.block_header.hash(); + if self.blocks_map.contains_key(&block_header_hash) { + return; + } + + // check if parent block is in the storage + // if there is no parent block for this block, remember as orphaned + if !self.blocks_map.contains_key(&block.block_header.previous_header_hash) { + self.orphan_blocks.insert(block.block_header.previous_header_hash.clone(), block.clone()); + return; + } + + // insert block + for i in 0..self.blocks_order.len() { + if self.blocks_order[i] == block.block_header.previous_header_hash { + self.blocks_order.insert(i + 1, block_header_hash.clone()); + self.blocks_map.insert(block_header_hash.clone(), block.clone()); + + // TODO: forks + // check if any orphan blocks now can be moved to the blockchain + let mut position = i + 1; + let mut block_header_hash = block_header_hash; + while let Entry::Occupied(orphan_block_entry) = self.orphan_blocks.entry(block_header_hash.clone()) { + // remove from orphans + let (_, orphan_block) = orphan_block_entry.remove_entry(); + let orphan_block_hash = orphan_block.hash(); + + // insert to blockchain + self.blocks_map.insert(block_header_hash.clone(), orphan_block); + block_header_hash = orphan_block_hash; + + // insert to ordering + self.blocks_order.insert(position + 1, block_header_hash.clone()); + position += 1; + } + return; + } + } + } +} diff --git a/sync/src/local_node.rs b/sync/src/local_node.rs new file mode 100644 index 00000000..4526bf82 --- /dev/null +++ b/sync/src/local_node.rs @@ -0,0 +1,202 @@ +use std::sync::Arc; +use std::collections::HashMap; +use parking_lot::Mutex; +use p2p::OutboundSyncConnectionRef; +use primitives::hash::H256; +use message::Payload; +use message::common::InventoryType; +use message::types; +use local_chain::LocalChain; +use best_block::BestBlock; + +pub type LocalNodeRef = Arc>; + +pub struct LocalNode { + peer_counter: usize, + chain: LocalChain, + peers: HashMap, +} + +struct RemoteNode { + version: u32, + connection: OutboundSyncConnectionRef, + getdata_requests: usize, +} + +impl LocalNode { + pub fn new() -> LocalNodeRef { + Arc::new(Mutex::new(LocalNode { + peer_counter: 0, + chain: LocalChain::new(), + peers: HashMap::new(), + })) + } + + pub fn best_block(&self) -> BestBlock { + self.chain.best_block() + } + + pub fn create_sync_session(&mut self, _best_block_height: i32, outbound_connection: OutboundSyncConnectionRef) -> usize { + trace!(target: "sync", "Creating new sync session with peer#{}", self.peer_counter + 1); + + // save connection for future + self.peer_counter += 1; + self.peers.insert(self.peer_counter, RemoteNode { + version: 0, + connection: outbound_connection.clone(), + getdata_requests: 0, + }); + self.peer_counter + } + + pub fn start_sync_session(&mut self, peer_index: usize, version: u32) { + trace!(target: "sync", "Starting new sync session with peer#{}", peer_index); + + let peer = self.peers.get_mut(&peer_index).unwrap(); + let mut connection = peer.connection.lock(); + let connection = &mut *connection; + peer.version = version; + + // start headers sync + if peer.version >= types::SendHeaders::version() { + // send `sendheaders` message to receive `headers` message instead of `inv` message + trace!(target: "sync", "Sending `sendheaders` to peer#{}", peer_index); + let sendheaders = types::SendHeaders {}; + connection.send_sendheaders(&sendheaders); + // TODO: why we do not support `sendheaders`? + // TODO: why latest bitcoind doesn't responds to the `getheaders` message? + // TODO: `getheaders` can be used only after `sendheaders`? + } + + // get peer' inventory with newest blocks + trace!(target: "sync", "Sending `getblocks` to peer#{}", peer_index); + let getblocks = types::GetBlocks { + version: 0, + block_locator_hashes: self.chain.block_locator_hashes(), + hash_stop: H256::default(), + }; + connection.send_getblocks(&getblocks); + } + + pub fn on_peer_inventory(&mut self, peer_index: usize, message: types::Inv) { + trace!(target: "sync", "Got `inventory` message from peer#{}. Inventory len: {}", peer_index, message.inventory.len()); + + // TODO: after each `getblocks` message bitcoind responds with two `inventory` messages: + // (1) with single entry + // (2) with 500 entries + // what if (1)? + + let mut getdata = types::GetData { + inventory: Vec::new(), + }; + for item in message.inventory.iter() { + match InventoryType::from_u32(item.inv_type) { + Some(InventoryType::MessageBlock) => { + if !self.chain.is_known_block(&item.hash) { + getdata.inventory.push(item.clone()); + } + }, + _ => (), + } + } + + // request unknown inventory data + if !getdata.inventory.is_empty() { + trace!(target: "sync", "Sending `getdata` message to peer#{}. Querying #{} unknown blocks", peer_index, getdata.inventory.len()); + let peer = self.peers.get_mut(&peer_index).unwrap(); + peer.getdata_requests += getdata.inventory.len(); + + let mut connection = peer.connection.lock(); + let connection = &mut *connection; + connection.send_getdata(&getdata); + } + } + + pub fn on_peer_getdata(&mut self, peer_index: usize, _message: types::GetData) { + trace!(target: "sync", "Got `getdata` message from peer#{}", peer_index); + } + + pub fn on_peer_getblocks(&mut self, peer_index: usize, _message: types::GetBlocks) { + trace!(target: "sync", "Got `getblocks` message from peer#{}", peer_index); + } + + pub fn on_peer_getheaders(&mut self, peer_index: usize, _message: types::GetHeaders) { + trace!(target: "sync", "Got `getheaders` message from peer#{}", peer_index); + } + + pub fn on_peer_transaction(&mut self, peer_index: usize, _message: types::Tx) { + trace!(target: "sync", "Got `tx` message from peer#{}", peer_index); + } + + pub fn on_peer_block(&mut self, peer_index: usize, message: types::Block) { + // insert block to the chain + self.chain.insert_block(&message.block); + + // decrease pending requests count + let peer = self.peers.get_mut(&peer_index).unwrap(); + peer.getdata_requests -= 1; + trace!(target: "sync", "Got `block` message from peer#{}. Pending #{} requests", peer_index, peer.getdata_requests); + + // if there are no pending requests, continue with next blocks chunk + if peer.getdata_requests == 0 { + trace!(target: "sync", "Sending `getblocks` to peer#{}. Local chain state: {:?}", peer_index, self.chain.info()); + let getblocks = types::GetBlocks { + version: 0, + block_locator_hashes: self.chain.block_locator_hashes(), + hash_stop: H256::default(), + }; + + let mut connection = peer.connection.lock(); + let connection = &mut *connection; + connection.send_getblocks(&getblocks); + } + } + + pub fn on_peer_headers(&mut self, peer_index: usize, _message: types::Headers) { + trace!(target: "sync", "Got `headers` message from peer#{}", peer_index); + } + + pub fn on_peer_mempool(&mut self, peer_index: usize, _message: types::MemPool) { + trace!(target: "sync", "Got `mempool` message from peer#{}", peer_index); + } + + pub fn on_peer_filterload(&mut self, peer_index: usize, _message: types::FilterLoad) { + trace!(target: "sync", "Got `filterload` message from peer#{}", peer_index); + } + + pub fn on_peer_filteradd(&mut self, peer_index: usize, _message: types::FilterAdd) { + trace!(target: "sync", "Got `filteradd` message from peer#{}", peer_index); + } + + pub fn on_peer_filterclear(&mut self, peer_index: usize, _message: types::FilterClear) { + trace!(target: "sync", "Got `filterclear` message from peer#{}", peer_index); + } + + pub fn on_peer_merkleblock(&mut self, peer_index: usize, _message: types::MerkleBlock) { + trace!(target: "sync", "Got `merkleblock` message from peer#{}", peer_index); + } + + pub fn on_peer_sendheaders(&mut self, peer_index: usize, _message: types::SendHeaders) { + trace!(target: "sync", "Got `sendheaders` message from peer#{}", peer_index); + } + + pub fn on_peer_feefilter(&mut self, peer_index: usize, _message: types::FeeFilter) { + trace!(target: "sync", "Got `feefilter` message from peer#{}", peer_index); + } + + pub fn on_peer_send_compact(&mut self, peer_index: usize, _message: types::SendCompact) { + trace!(target: "sync", "Got `sendcmpct` message from peer#{}", peer_index); + } + + pub fn on_peer_compact_block(&mut self, peer_index: usize, _message: types::CompactBlock) { + trace!(target: "sync", "Got `cmpctblock` message from peer#{}", peer_index); + } + + pub fn on_peer_get_block_txn(&mut self, peer_index: usize, _message: types::GetBlockTxn) { + trace!(target: "sync", "Got `getblocktxn` message from peer#{}", peer_index); + } + + pub fn on_peer_block_txn(&mut self, peer_index: usize, _message: types::BlockTxn) { + trace!(target: "sync", "Got `blocktxn` message from peer#{}", peer_index); + } +}