Continue synchronization protocol impl (#30)

* p2p <-> sync interfaces proposal

* updated with example

* send errors will be handled in p2p module => no need to return to the sync

* poc of outbound sync connection

* simplified send_to_peer

* context has cpu pool and enent loop handles

* on_message won't return ProtocolAction anymore

* session initialized sync protocol, remove retain cycles on P2P::drop

* removed ProtocolAction

* uncommented ping protocol

* node_table sorts nodes also by recently used time

* started work on sync package

* send getaddr on connect

* fixed node_table insert, added insert_many

* addr protocol

* sync headers response

* continue sync

* continue sync protocol

* separated sync connection start from sync connection creation

* switched to getblocks (instead of getheaders)

* starting to receive blocks in sync

* deal with orphaned blocks in sync::local_chain

* continue sync

* fix warnings

* removed TODOs

* protocol::sync is back to private state

* fixed grumbes
This commit is contained in:
Svyatoslav Nikolsky 2016-10-24 17:38:33 +03:00 committed by Marek Kotewicz
parent 1c7bfc9a2e
commit b081ca3d70
20 changed files with 712 additions and 65 deletions

13
Cargo.lock generated
View File

@ -10,6 +10,7 @@ dependencies = [
"miner 0.1.0",
"p2p 0.1.0",
"script 0.1.0",
"sync 0.1.0",
"verification 0.1.0",
]
@ -521,6 +522,18 @@ name = "strsim"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "sync"
version = "0.1.0"
dependencies = [
"chain 0.1.0",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"message 0.1.0",
"p2p 0.1.0",
"parking_lot 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"primitives 0.1.0",
]
[[package]]
name = "term_size"
version = "0.2.1"

View File

@ -15,6 +15,7 @@ p2p = { path = "p2p" }
script = { path = "script" }
db = { path = "db" }
verification = { path = "verification" }
sync = { path = "sync" }
[[bin]]
path = "pbtc/main.rs"

View File

@ -1,17 +1,16 @@
use hex::FromHex;
use crypto::dhash256;
use hash::H256;
use ser::{
Deserializable, Reader, Error as ReaderError, deserialize,
Serializable, Stream, serialize
Serializable, Stream
};
use merkle_root::merkle_root;
use {BlockHeader, Transaction};
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub struct Block {
block_header: BlockHeader,
transactions: Vec<Transaction>,
pub block_header: BlockHeader,
pub transactions: Vec<Transaction>,
}
impl Serializable for Block {
@ -45,7 +44,7 @@ impl Block {
}
pub fn hash(&self) -> H256 {
dhash256(&serialize(&self.block_header))
self.block_header.hash()
}
/// Returns block's merkle root.

View File

@ -1,11 +1,12 @@
use std::fmt;
use ser::{
Deserializable, Reader, Error as ReaderError,
Serializable, Stream
Serializable, Stream, serialize
};
use crypto::dhash256;
use hash::H256;
#[derive(PartialEq)]
#[derive(PartialEq, Clone)]
pub struct BlockHeader {
pub version: u32,
pub previous_header_hash: H256,
@ -15,6 +16,12 @@ pub struct BlockHeader {
pub nonce: u32,
}
impl BlockHeader {
pub fn hash(&self) -> H256 {
dhash256(&serialize(self))
}
}
impl fmt::Debug for BlockHeader {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("BlockHeader")

View File

@ -30,7 +30,7 @@ impl From<InventoryType> for u32 {
}
}
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub struct InventoryVector {
pub inv_type: u32,
pub hash: H256,

View File

@ -0,0 +1,31 @@
use ser::{Stream, Reader};
use chain::Block as ChainBlock;
use {Payload, MessageResult};
#[derive(Debug, PartialEq)]
pub struct Block {
pub block: ChainBlock,
}
impl Payload for Block {
fn version() -> u32 {
0
}
fn command() -> &'static str {
"block"
}
fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult<Self> where Self: Sized {
let tx = Block {
block: try!(reader.read()),
};
Ok(tx)
}
fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> {
stream.append(&self.block);
Ok(())
}
}

View File

@ -1,4 +1,5 @@
pub mod addr;
mod block;
mod blocktxn;
mod compactblock;
mod feefilter;
@ -20,10 +21,12 @@ mod pong;
pub mod reject;
mod sendcompact;
mod sendheaders;
mod tx;
mod verack;
pub mod version;
pub use self::addr::Addr;
pub use self::block::Block;
pub use self::blocktxn::BlockTxn;
pub use self::compactblock::CompactBlock;
pub use self::feefilter::FeeFilter;
@ -45,5 +48,6 @@ pub use self::pong::Pong;
pub use self::reject::Reject;
pub use self::sendcompact::SendCompact;
pub use self::sendheaders::SendHeaders;
pub use self::tx::Tx;
pub use self::verack::Verack;
pub use self::version::Version;

31
message/src/types/tx.rs Normal file
View File

@ -0,0 +1,31 @@
use ser::{Stream, Reader};
use chain::Transaction;
use {Payload, MessageResult};
#[derive(Debug, PartialEq)]
pub struct Tx {
pub transaction: Transaction,
}
impl Payload for Tx {
fn version() -> u32 {
0
}
fn command() -> &'static str {
"tx"
}
fn deserialize_payload(reader: &mut Reader, _version: u32) -> MessageResult<Self> where Self: Sized {
let tx = Tx {
transaction: try!(reader.read()),
};
Ok(tx)
}
fn serialize_payload(&self, stream: &mut Stream, _version: u32) -> MessageResult<()> {
stream.append(&self.transaction);
Ok(())
}
}

View File

@ -32,4 +32,4 @@ pub use net::Config as NetConfig;
pub use p2p::P2P;
pub use event_loop::{event_loop, forever};
pub use util::{PeerId, PeerInfo};
pub use protocol::{InboundSyncConnection, InboundSyncConnectionRef, OutboundSyncConnection, OutboundSyncConnectionRef, LocalSyncNode, LocalSyncNodeRef};

View File

@ -11,6 +11,7 @@ use protocol::Direction;
use net::{connect, listen, Connections, Channel, Config as NetConfig};
use util::{NodeTable, Node};
use {Config, PeerInfo, PeerId};
use protocol::{LocalSyncNodeRef, InboundSyncConnectionRef, OutboundSyncConnectionRef};
pub type BoxedEmptyFuture = BoxFuture<(), ()>;
@ -24,15 +25,18 @@ pub struct Context {
pool: CpuPool,
/// Remote event loop handle.
remote: Remote,
/// Local synchronization node.
local_sync_node: LocalSyncNodeRef,
}
impl Context {
pub fn new(pool_handle: CpuPool, remote: Remote) -> Self {
pub fn new(local_sync_node: LocalSyncNodeRef, pool_handle: CpuPool, remote: Remote) -> Self {
Context {
connections: Default::default(),
node_table: Default::default(),
pool: pool_handle,
remote: remote,
local_sync_node: local_sync_node,
}
}
@ -192,6 +196,10 @@ impl Context {
self.node_table.write().note_failure(&peer_info.address);
}
}
pub fn create_sync_session(&self, start_height: i32, outbound_connection: OutboundSyncConnectionRef) -> InboundSyncConnectionRef {
self.local_sync_node.lock().create_sync_session(start_height, outbound_connection)
}
}
pub struct P2P {
@ -219,14 +227,14 @@ impl Drop for P2P {
}
impl P2P {
pub fn new(config: Config, handle: Handle) -> Self {
pub fn new(config: Config, local_sync_node: LocalSyncNodeRef, handle: Handle) -> Self {
let pool = CpuPool::new(config.threads);
P2P {
event_loop_handle: handle.clone(),
pool: pool.clone(),
config: config,
context: Arc::new(Context::new(pool, handle.remote().clone())),
context: Arc::new(Context::new(local_sync_node, pool, handle.remote().clone())),
}
}

View File

@ -8,7 +8,7 @@ use message::common::Command;
pub use self::addr::AddrProtocol;
pub use self::ping::PingProtocol;
pub use self::sync::SyncProtocol;
pub use self::sync::{SyncProtocol, InboundSyncConnection, InboundSyncConnectionRef, OutboundSyncConnection, OutboundSyncConnectionRef, LocalSyncNode, LocalSyncNodeRef};
#[derive(PartialEq, Clone, Copy)]
pub enum Direction {

View File

@ -1,54 +1,50 @@
//TODO: remove!
#![allow(dead_code)]
#![allow(unused_variables)]
use std::sync::Arc;
use parking_lot::Mutex;
use chain::{Block, Transaction};
use bytes::Bytes;
use message::{Command, Error, Payload, types};
use protocol::Protocol;
use message::{Command, Error, Payload, types, deserialize_payload};
use protocol::{Protocol, Direction};
use p2p::Context;
use PeerId;
pub type InboundSyncConnectionRef = Arc<Mutex<Box<InboundSyncConnection>>>;
pub type OutboundSyncConnectionRef = Arc<Mutex<Box<OutboundSyncConnection>>>;
pub type LocalSyncNodeRef = Arc<Mutex<Box<LocalSyncNode>>>;
// TODO: use this to respond to construct Version message (start_height field)
// TODO: use this to create new inbound sessions
pub trait LocalSyncNode : Send + Sync {
fn start_height(&self) -> i32;
fn start_sync_session(&mut self, outbound: OutboundSyncConnectionRef) -> InboundSyncConnectionRef;
fn create_sync_session(&mut self, height: i32, outbound: OutboundSyncConnectionRef) -> InboundSyncConnectionRef;
}
pub trait InboundSyncConnection : Send + Sync {
fn on_iventory(&mut self, message: &types::Inv);
fn on_getdata(&mut self, message: &types::GetData);
fn on_getblocks(&mut self, message: &types::GetBlocks);
fn on_getheaders(&mut self, message: &types::GetHeaders);
fn on_transaction(&mut self, message: &Transaction);
fn on_block(&mut self, message: &Block);
fn on_headers(&mut self, message: &types::Headers);
fn on_mempool(&mut self, message: &types::MemPool);
fn on_filterload(&mut self, message: &types::FilterLoad);
fn on_filteradd(&mut self, message: &types::FilterAdd);
fn on_filterclear(&mut self, message: &types::FilterClear);
fn on_merkleblock(&mut self, message: &types::MerkleBlock);
fn on_sendheaders(&mut self, message: &types::SendHeaders);
fn on_feefilter(&mut self, message: &types::FeeFilter);
fn on_send_compact(&mut self, message: &types::SendCompact);
fn on_compact_block(&mut self, message: &types::CompactBlock);
fn on_get_block_txn(&mut self, message: &types::GetBlockTxn);
fn on_block_txn(&mut self, message: &types::BlockTxn);
fn start_sync_session(&mut self, version: u32);
fn on_inventory(&mut self, message: types::Inv);
fn on_getdata(&mut self, message: types::GetData);
fn on_getblocks(&mut self, message: types::GetBlocks);
fn on_getheaders(&mut self, message: types::GetHeaders);
fn on_transaction(&mut self, message: types::Tx);
fn on_block(&mut self, message: types::Block);
fn on_headers(&mut self, message: types::Headers);
fn on_mempool(&mut self, message: types::MemPool);
fn on_filterload(&mut self, message: types::FilterLoad);
fn on_filteradd(&mut self, message: types::FilterAdd);
fn on_filterclear(&mut self, message: types::FilterClear);
fn on_merkleblock(&mut self, message: types::MerkleBlock);
fn on_sendheaders(&mut self, message: types::SendHeaders);
fn on_feefilter(&mut self, message: types::FeeFilter);
fn on_send_compact(&mut self, message: types::SendCompact);
fn on_compact_block(&mut self, message: types::CompactBlock);
fn on_get_block_txn(&mut self, message: types::GetBlockTxn);
fn on_block_txn(&mut self, message: types::BlockTxn);
}
pub trait OutboundSyncConnection : Send + Sync {
fn send_iventory(&mut self, message: &types::Inv);
fn send_inventory(&mut self, message: &types::Inv);
fn send_getdata(&mut self, message: &types::GetData);
fn send_getblocks(&mut self, message: &types::GetBlocks);
fn send_getheaders(&mut self, message: &types::GetHeaders);
fn send_transaction(&mut self, message: &Transaction);
fn send_block(&mut self, message: &Block);
fn send_transaction(&mut self, message: &types::Tx);
fn send_block(&mut self, message: &types::Block);
fn send_headers(&mut self, message: &types::Headers);
fn send_mempool(&mut self, message: &types::MemPool);
fn send_filterload(&mut self, message: &types::FilterLoad);
@ -87,7 +83,7 @@ impl OutboundSync {
}
impl OutboundSyncConnection for OutboundSync {
fn send_iventory(&mut self, message: &types::Inv) {
fn send_inventory(&mut self, message: &types::Inv) {
self.send_message(message);
}
@ -103,12 +99,12 @@ impl OutboundSyncConnection for OutboundSync {
self.send_message(message);
}
fn send_transaction(&mut self, message: &Transaction) {
unimplemented!();
fn send_transaction(&mut self, message: &types::Tx) {
self.send_message(message);
}
fn send_block(&mut self, message: &Block) {
unimplemented!();
fn send_block(&mut self, message: &types::Block) {
self.send_message(message);
}
fn send_headers(&mut self, message: &types::Headers) {
@ -161,32 +157,98 @@ impl OutboundSyncConnection for OutboundSync {
}
pub struct SyncProtocol {
//inbound_connection: InboundSyncConnectionRef,
outbound_connection: OutboundSyncConnectionRef,
inbound_connection: InboundSyncConnectionRef,
}
impl SyncProtocol {
pub fn new(context: Arc<Context>, peer: PeerId) -> Self {
let outbound_connection = Arc::new(Mutex::new(OutboundSync::new(context, peer).boxed()));
// let inbound_connection = local_sync_node.start_sync_session(outbound_connection); // TODO: create inbound connection using LocalSyncNode::start_sync_session
let outbound_connection = Arc::new(Mutex::new(OutboundSync::new(context.clone(), peer).boxed()));
let inbound_connection = context.create_sync_session(0, outbound_connection);
SyncProtocol {
// inbound_connection: inbound_connection,
outbound_connection: outbound_connection,
inbound_connection: inbound_connection,
}
}
}
impl Protocol for SyncProtocol {
fn initialize(&mut self, _direction: Direction, version: u32) -> Result<(), Error> {
self.inbound_connection.lock().start_sync_session(version);
Ok(())
}
fn on_message(&mut self, command: &Command, payload: &Bytes, version: u32) -> Result<(), Error> {
// TODO: pass message to inbound_connection + convert response to ProtocolAction/Error
/*
if command == &Inv::command().into() {
let inventory: Inv = try!(deserialize_payload(payload, version));
self.inbound_connection.on_iventory(&inventory);
} else {
Ok(ProtocolAction::None)
if command == &types::Inv::command() {
let message: types::Inv = try!(deserialize_payload(payload, version));
self.inbound_connection.lock().on_inventory(message);
}
else if command == &types::GetData::command() {
let message: types::GetData = try!(deserialize_payload(payload, version));
self.inbound_connection.lock().on_getdata(message);
}
else if command == &types::GetBlocks::command() {
let message: types::GetBlocks = try!(deserialize_payload(payload, version));
self.inbound_connection.lock().on_getblocks(message);
}
else if command == &types::GetHeaders::command() {
let message: types::GetHeaders = try!(deserialize_payload(payload, version));
self.inbound_connection.lock().on_getheaders(message);
}
else if command == &types::Tx::command() {
let message: types::Tx = try!(deserialize_payload(payload, version));
self.inbound_connection.lock().on_transaction(message);
}
else if command == &types::Block::command() {
let message: types::Block = try!(deserialize_payload(payload, version));
self.inbound_connection.lock().on_block(message);
}
else if command == &types::MemPool::command() {
let message: types::MemPool = try!(deserialize_payload(payload, version));
self.inbound_connection.lock().on_mempool(message);
}
else if command == &types::Headers::command() {
let message: types::Headers = try!(deserialize_payload(payload, version));
self.inbound_connection.lock().on_headers(message);
}
else if command == &types::FilterLoad::command() {
let message: types::FilterLoad = try!(deserialize_payload(payload, version));
self.inbound_connection.lock().on_filterload(message);
}
else if command == &types::FilterAdd::command() {
let message: types::FilterAdd = try!(deserialize_payload(payload, version));
self.inbound_connection.lock().on_filteradd(message);
}
else if command == &types::FilterClear::command() {
let message: types::FilterClear = try!(deserialize_payload(payload, version));
self.inbound_connection.lock().on_filterclear(message);
}
else if command == &types::MerkleBlock::command() {
let message: types::MerkleBlock = try!(deserialize_payload(payload, version));
self.inbound_connection.lock().on_merkleblock(message);
}
else if command == &types::SendHeaders::command() {
let message: types::SendHeaders = try!(deserialize_payload(payload, version));
self.inbound_connection.lock().on_sendheaders(message);
}
else if command == &types::FeeFilter::command() {
let message: types::FeeFilter = try!(deserialize_payload(payload, version));
self.inbound_connection.lock().on_feefilter(message);
}
else if command == &types::SendCompact::command() {
let message: types::SendCompact = try!(deserialize_payload(payload, version));
self.inbound_connection.lock().on_send_compact(message);
}
else if command == &types::CompactBlock::command() {
let message: types::CompactBlock = try!(deserialize_payload(payload, version));
self.inbound_connection.lock().on_compact_block(message);
}
else if command == &types::GetBlockTxn::command() {
let message: types::GetBlockTxn = try!(deserialize_payload(payload, version));
self.inbound_connection.lock().on_get_block_txn(message);
}
else if command == &types::BlockTxn::command() {
let message: types::BlockTxn = try!(deserialize_payload(payload, version));
self.inbound_connection.lock().on_block_txn(message);
}
*/
Ok(())
}
}

View File

@ -8,11 +8,14 @@ extern crate keys;
extern crate script;
extern crate message;
extern crate p2p;
extern crate sync;
mod config;
use std::net::SocketAddr;
use p2p::{P2P, event_loop, forever, NetConfig};
use sync::local_node::LocalNode;
use sync::inbound_connection_factory::InboundConnectionFactory;
fn main() {
env_logger::init().unwrap();
@ -47,7 +50,10 @@ fn run() -> Result<(), String> {
seeds: cfg.seednode.map_or_else(|| vec![], |x| vec![x]),
};
let p2p = P2P::new(p2p_cfg, el.handle());
let local_sync_node = LocalNode::new();
let local_sync_factory = InboundConnectionFactory::with_local_node(local_sync_node.clone());
let p2p = P2P::new(p2p_cfg, local_sync_factory, el.handle());
try!(p2p.run().map_err(|_| "Failed to start p2p module"));
el.run(forever()).unwrap();
Ok(())

13
sync/Cargo.toml Normal file
View File

@ -0,0 +1,13 @@
[package]
name = "sync"
version = "0.1.0"
authors = ["Ethcore <admin@ethcore.io>"]
[dependencies]
parking_lot = "0.3"
log = "0.3"
chain = { path = "../chain" }
message = { path = "../message" }
p2p = { path = "../p2p" }
primitives = { path = "../primitives" }

7
sync/src/best_block.rs Normal file
View File

@ -0,0 +1,7 @@
use primitives::hash::H256;
#[derive(Debug, Clone)]
pub struct BestBlock {
pub height: u64,
pub hash: H256,
}

View File

@ -0,0 +1,96 @@
use parking_lot::Mutex;
use message::types;
use p2p::{InboundSyncConnection, InboundSyncConnectionRef};
use local_node::LocalNodeRef;
pub struct InboundConnection {
local_node: LocalNodeRef,
peer_index: usize,
}
impl InboundConnection {
pub fn new(local_node: LocalNodeRef, peer_index: usize) -> InboundSyncConnectionRef {
InboundSyncConnectionRef::new(Mutex::new(Box::new(InboundConnection {
local_node: local_node,
peer_index: peer_index,
})))
}
}
impl InboundSyncConnection for InboundConnection {
fn start_sync_session(&mut self, version: u32) {
self.local_node.lock().start_sync_session(self.peer_index, version);
}
fn on_inventory(&mut self, message: types::Inv) {
self.local_node.lock().on_peer_inventory(self.peer_index, message);
}
fn on_getdata(&mut self, message: types::GetData) {
self.local_node.lock().on_peer_getdata(self.peer_index, message);
}
fn on_getblocks(&mut self, message: types::GetBlocks) {
self.local_node.lock().on_peer_getblocks(self.peer_index, message);
}
fn on_getheaders(&mut self, message: types::GetHeaders) {
self.local_node.lock().on_peer_getheaders(self.peer_index, message);
}
fn on_transaction(&mut self, message: types::Tx) {
self.local_node.lock().on_peer_transaction(self.peer_index, message);
}
fn on_block(&mut self, message: types::Block) {
self.local_node.lock().on_peer_block(self.peer_index, message);
}
fn on_headers(&mut self, message: types::Headers) {
self.local_node.lock().on_peer_headers(self.peer_index, message);
}
fn on_mempool(&mut self, message: types::MemPool) {
self.local_node.lock().on_peer_mempool(self.peer_index, message);
}
fn on_filterload(&mut self, message: types::FilterLoad) {
self.local_node.lock().on_peer_filterload(self.peer_index, message);
}
fn on_filteradd(&mut self, message: types::FilterAdd) {
self.local_node.lock().on_peer_filteradd(self.peer_index, message);
}
fn on_filterclear(&mut self, message: types::FilterClear) {
self.local_node.lock().on_peer_filterclear(self.peer_index, message);
}
fn on_merkleblock(&mut self, message: types::MerkleBlock) {
self.local_node.lock().on_peer_merkleblock(self.peer_index, message);
}
fn on_sendheaders(&mut self, message: types::SendHeaders) {
self.local_node.lock().on_peer_sendheaders(self.peer_index, message);
}
fn on_feefilter(&mut self, message: types::FeeFilter) {
self.local_node.lock().on_peer_feefilter(self.peer_index, message);
}
fn on_send_compact(&mut self, message: types::SendCompact) {
self.local_node.lock().on_peer_send_compact(self.peer_index, message);
}
fn on_compact_block(&mut self, message: types::CompactBlock) {
self.local_node.lock().on_peer_compact_block(self.peer_index, message);
}
fn on_get_block_txn(&mut self, message: types::GetBlockTxn) {
self.local_node.lock().on_peer_get_block_txn(self.peer_index, message);
}
fn on_block_txn(&mut self, message: types::BlockTxn) {
self.local_node.lock().on_peer_block_txn(self.peer_index, message);
}
}

View File

@ -0,0 +1,35 @@
use std::sync::Arc;
use parking_lot::Mutex;
use p2p::{LocalSyncNode, OutboundSyncConnectionRef, InboundSyncConnectionRef};
use local_node::LocalNodeRef;
use inbound_connection::InboundConnection;
pub struct InboundConnectionFactory {
local_node: LocalNodeRef,
}
impl InboundConnectionFactory {
pub fn with_local_node(local_node: LocalNodeRef) -> Arc<Mutex<Box<LocalSyncNode>>> {
Arc::new(
Mutex::new(
Box::new(
InboundConnectionFactory {
local_node: local_node,
}
)
)
)
}
}
impl LocalSyncNode for InboundConnectionFactory {
fn start_height(&self) -> i32 {
self.local_node.lock().best_block().height as i32
}
fn create_sync_session(&mut self, best_block_height: i32, outbound_connection: OutboundSyncConnectionRef) -> InboundSyncConnectionRef {
let peer_index = self.local_node.lock().create_sync_session(best_block_height, outbound_connection);
let inbound_connection = InboundConnection::new(self.local_node.clone(), peer_index);
inbound_connection
}
}

13
sync/src/lib.rs Normal file
View File

@ -0,0 +1,13 @@
extern crate chain;
#[macro_use]
extern crate log;
extern crate message;
extern crate p2p;
extern crate parking_lot;
extern crate primitives;
pub mod best_block;
mod inbound_connection;
pub mod inbound_connection_factory;
mod local_chain;
pub mod local_node;

119
sync/src/local_chain.rs Normal file
View File

@ -0,0 +1,119 @@
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use chain::Block;
use primitives::hash::H256;
use best_block::BestBlock;
#[derive(Debug)]
pub struct Info {
pub chain_length: usize,
pub orphan_count: usize,
}
// TODO: this is temp storage (to use during test stage)
// it must be replaced with db + verification queue + mempools (transaction, block, ...)
pub struct LocalChain {
blocks_order: Vec<H256>,
blocks_map: HashMap<H256, Block>,
orphan_blocks: HashMap<H256, Block>,
}
impl LocalChain {
pub fn new() -> LocalChain {
let mut chain = LocalChain {
blocks_order: Vec::new(),
blocks_map: HashMap::new(),
orphan_blocks: HashMap::new(),
};
// TODO: move this to config
let genesis_block: Block = "0100000000000000000000000000000000000000000000000000000000000000000000003ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4a29ab5f49ffff001d1dac2b7c0101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000".into();
let genesis_block_hash = genesis_block.hash();
chain.blocks_order.push(genesis_block_hash.clone());
chain.blocks_map.insert(genesis_block_hash, genesis_block);
chain
}
pub fn info(&self) -> Info {
Info {
chain_length: self.blocks_order.len(),
orphan_count: self.orphan_blocks.len(),
}
}
pub fn best_block(&self) -> BestBlock {
let height = self.blocks_order.len() - 1;
let ref block = self.blocks_order[height];
BestBlock {
height: height as u64,
hash: block.clone(),
}
}
pub fn block_locator_hashes(&self) -> Vec<H256> {
let mut index = self.blocks_order.len() - 1;
let mut hashes: Vec<H256> = Vec::new();
let mut step = 1;
loop {
let block_hash = self.blocks_order[index].clone();
hashes.push(block_hash);
if hashes.len() >= 10 {
step <<= 1;
}
if index < step {
break;
}
index -= step;
}
hashes
}
pub fn is_known_block(&self, hash: &H256) -> bool {
self.blocks_map.contains_key(hash)
}
pub fn insert_block(&mut self, block: &Block) {
// check if already known block
let block_header_hash = block.block_header.hash();
if self.blocks_map.contains_key(&block_header_hash) {
return;
}
// check if parent block is in the storage
// if there is no parent block for this block, remember as orphaned
if !self.blocks_map.contains_key(&block.block_header.previous_header_hash) {
self.orphan_blocks.insert(block.block_header.previous_header_hash.clone(), block.clone());
return;
}
// insert block
for i in 0..self.blocks_order.len() {
if self.blocks_order[i] == block.block_header.previous_header_hash {
self.blocks_order.insert(i + 1, block_header_hash.clone());
self.blocks_map.insert(block_header_hash.clone(), block.clone());
// TODO: forks
// check if any orphan blocks now can be moved to the blockchain
let mut position = i + 1;
let mut block_header_hash = block_header_hash;
while let Entry::Occupied(orphan_block_entry) = self.orphan_blocks.entry(block_header_hash.clone()) {
// remove from orphans
let (_, orphan_block) = orphan_block_entry.remove_entry();
let orphan_block_hash = orphan_block.hash();
// insert to blockchain
self.blocks_map.insert(block_header_hash.clone(), orphan_block);
block_header_hash = orphan_block_hash;
// insert to ordering
self.blocks_order.insert(position + 1, block_header_hash.clone());
position += 1;
}
return;
}
}
}
}

202
sync/src/local_node.rs Normal file
View File

@ -0,0 +1,202 @@
use std::sync::Arc;
use std::collections::HashMap;
use parking_lot::Mutex;
use p2p::OutboundSyncConnectionRef;
use primitives::hash::H256;
use message::Payload;
use message::common::InventoryType;
use message::types;
use local_chain::LocalChain;
use best_block::BestBlock;
pub type LocalNodeRef = Arc<Mutex<LocalNode>>;
pub struct LocalNode {
peer_counter: usize,
chain: LocalChain,
peers: HashMap<usize, RemoteNode>,
}
struct RemoteNode {
version: u32,
connection: OutboundSyncConnectionRef,
getdata_requests: usize,
}
impl LocalNode {
pub fn new() -> LocalNodeRef {
Arc::new(Mutex::new(LocalNode {
peer_counter: 0,
chain: LocalChain::new(),
peers: HashMap::new(),
}))
}
pub fn best_block(&self) -> BestBlock {
self.chain.best_block()
}
pub fn create_sync_session(&mut self, _best_block_height: i32, outbound_connection: OutboundSyncConnectionRef) -> usize {
trace!(target: "sync", "Creating new sync session with peer#{}", self.peer_counter + 1);
// save connection for future
self.peer_counter += 1;
self.peers.insert(self.peer_counter, RemoteNode {
version: 0,
connection: outbound_connection.clone(),
getdata_requests: 0,
});
self.peer_counter
}
pub fn start_sync_session(&mut self, peer_index: usize, version: u32) {
trace!(target: "sync", "Starting new sync session with peer#{}", peer_index);
let peer = self.peers.get_mut(&peer_index).unwrap();
let mut connection = peer.connection.lock();
let connection = &mut *connection;
peer.version = version;
// start headers sync
if peer.version >= types::SendHeaders::version() {
// send `sendheaders` message to receive `headers` message instead of `inv` message
trace!(target: "sync", "Sending `sendheaders` to peer#{}", peer_index);
let sendheaders = types::SendHeaders {};
connection.send_sendheaders(&sendheaders);
// TODO: why we do not support `sendheaders`?
// TODO: why latest bitcoind doesn't responds to the `getheaders` message?
// TODO: `getheaders` can be used only after `sendheaders`?
}
// get peer' inventory with newest blocks
trace!(target: "sync", "Sending `getblocks` to peer#{}", peer_index);
let getblocks = types::GetBlocks {
version: 0,
block_locator_hashes: self.chain.block_locator_hashes(),
hash_stop: H256::default(),
};
connection.send_getblocks(&getblocks);
}
pub fn on_peer_inventory(&mut self, peer_index: usize, message: types::Inv) {
trace!(target: "sync", "Got `inventory` message from peer#{}. Inventory len: {}", peer_index, message.inventory.len());
// TODO: after each `getblocks` message bitcoind responds with two `inventory` messages:
// (1) with single entry
// (2) with 500 entries
// what if (1)?
let mut getdata = types::GetData {
inventory: Vec::new(),
};
for item in message.inventory.iter() {
match InventoryType::from_u32(item.inv_type) {
Some(InventoryType::MessageBlock) => {
if !self.chain.is_known_block(&item.hash) {
getdata.inventory.push(item.clone());
}
},
_ => (),
}
}
// request unknown inventory data
if !getdata.inventory.is_empty() {
trace!(target: "sync", "Sending `getdata` message to peer#{}. Querying #{} unknown blocks", peer_index, getdata.inventory.len());
let peer = self.peers.get_mut(&peer_index).unwrap();
peer.getdata_requests += getdata.inventory.len();
let mut connection = peer.connection.lock();
let connection = &mut *connection;
connection.send_getdata(&getdata);
}
}
pub fn on_peer_getdata(&mut self, peer_index: usize, _message: types::GetData) {
trace!(target: "sync", "Got `getdata` message from peer#{}", peer_index);
}
pub fn on_peer_getblocks(&mut self, peer_index: usize, _message: types::GetBlocks) {
trace!(target: "sync", "Got `getblocks` message from peer#{}", peer_index);
}
pub fn on_peer_getheaders(&mut self, peer_index: usize, _message: types::GetHeaders) {
trace!(target: "sync", "Got `getheaders` message from peer#{}", peer_index);
}
pub fn on_peer_transaction(&mut self, peer_index: usize, _message: types::Tx) {
trace!(target: "sync", "Got `tx` message from peer#{}", peer_index);
}
pub fn on_peer_block(&mut self, peer_index: usize, message: types::Block) {
// insert block to the chain
self.chain.insert_block(&message.block);
// decrease pending requests count
let peer = self.peers.get_mut(&peer_index).unwrap();
peer.getdata_requests -= 1;
trace!(target: "sync", "Got `block` message from peer#{}. Pending #{} requests", peer_index, peer.getdata_requests);
// if there are no pending requests, continue with next blocks chunk
if peer.getdata_requests == 0 {
trace!(target: "sync", "Sending `getblocks` to peer#{}. Local chain state: {:?}", peer_index, self.chain.info());
let getblocks = types::GetBlocks {
version: 0,
block_locator_hashes: self.chain.block_locator_hashes(),
hash_stop: H256::default(),
};
let mut connection = peer.connection.lock();
let connection = &mut *connection;
connection.send_getblocks(&getblocks);
}
}
pub fn on_peer_headers(&mut self, peer_index: usize, _message: types::Headers) {
trace!(target: "sync", "Got `headers` message from peer#{}", peer_index);
}
pub fn on_peer_mempool(&mut self, peer_index: usize, _message: types::MemPool) {
trace!(target: "sync", "Got `mempool` message from peer#{}", peer_index);
}
pub fn on_peer_filterload(&mut self, peer_index: usize, _message: types::FilterLoad) {
trace!(target: "sync", "Got `filterload` message from peer#{}", peer_index);
}
pub fn on_peer_filteradd(&mut self, peer_index: usize, _message: types::FilterAdd) {
trace!(target: "sync", "Got `filteradd` message from peer#{}", peer_index);
}
pub fn on_peer_filterclear(&mut self, peer_index: usize, _message: types::FilterClear) {
trace!(target: "sync", "Got `filterclear` message from peer#{}", peer_index);
}
pub fn on_peer_merkleblock(&mut self, peer_index: usize, _message: types::MerkleBlock) {
trace!(target: "sync", "Got `merkleblock` message from peer#{}", peer_index);
}
pub fn on_peer_sendheaders(&mut self, peer_index: usize, _message: types::SendHeaders) {
trace!(target: "sync", "Got `sendheaders` message from peer#{}", peer_index);
}
pub fn on_peer_feefilter(&mut self, peer_index: usize, _message: types::FeeFilter) {
trace!(target: "sync", "Got `feefilter` message from peer#{}", peer_index);
}
pub fn on_peer_send_compact(&mut self, peer_index: usize, _message: types::SendCompact) {
trace!(target: "sync", "Got `sendcmpct` message from peer#{}", peer_index);
}
pub fn on_peer_compact_block(&mut self, peer_index: usize, _message: types::CompactBlock) {
trace!(target: "sync", "Got `cmpctblock` message from peer#{}", peer_index);
}
pub fn on_peer_get_block_txn(&mut self, peer_index: usize, _message: types::GetBlockTxn) {
trace!(target: "sync", "Got `getblocktxn` message from peer#{}", peer_index);
}
pub fn on_peer_block_txn(&mut self, peer_index: usize, _message: types::BlockTxn) {
trace!(target: "sync", "Got `blocktxn` message from peer#{}", peer_index);
}
}