segwit: sync changes

This commit is contained in:
Svyatoslav Nikolsky 2017-08-18 15:13:39 +03:00
parent 1e28ec4ed5
commit 062a68204c
11 changed files with 140 additions and 60 deletions

View File

@ -395,8 +395,8 @@ impl Context {
}
}
pub fn create_sync_session(&self, start_height: i32, outbound_connection: OutboundSyncConnectionRef) -> InboundSyncConnectionRef {
self.local_sync_node.create_sync_session(start_height, outbound_connection)
pub fn create_sync_session(&self, start_height: i32, services: Services, outbound_connection: OutboundSyncConnectionRef) -> InboundSyncConnectionRef {
self.local_sync_node.create_sync_session(start_height, services, outbound_connection)
}
pub fn connections(&self) -> &Connections {

View File

@ -1,6 +1,6 @@
use std::sync::Arc;
use bytes::Bytes;
use message::{Command, Error, Payload, types, deserialize_payload};
use message::{Command, Error, Payload, Services, types, deserialize_payload};
use protocol::Protocol;
use net::PeerContext;
use ser::SERIALIZE_TRANSACTION_WITNESS;
@ -10,7 +10,7 @@ pub type OutboundSyncConnectionRef = Arc<OutboundSyncConnection>;
pub type LocalSyncNodeRef = Box<LocalSyncNode>;
pub trait LocalSyncNode : Send + Sync {
fn create_sync_session(&self, height: i32, outbound: OutboundSyncConnectionRef) -> InboundSyncConnectionRef;
fn create_sync_session(&self, height: i32, services: Services, outbound: OutboundSyncConnectionRef) -> InboundSyncConnectionRef;
}
pub trait InboundSyncConnection : Send + Sync {
@ -183,7 +183,7 @@ pub struct SyncProtocol {
impl SyncProtocol {
pub fn new(context: Arc<PeerContext>) -> Self {
let outbound_connection = Arc::new(OutboundSync::new(context.clone()));
let inbound_connection = context.global().create_sync_session(0, outbound_connection);
let inbound_connection = context.global().create_sync_session(0, context.info().version_message.services(), outbound_connection);
SyncProtocol {
inbound_connection: inbound_connection,
context: context,

View File

@ -1,5 +1,6 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use p2p::{LocalSyncNode, LocalSyncNodeRef, OutboundSyncConnectionRef, InboundSyncConnectionRef};
use message::Services;
use inbound_connection::InboundConnection;
use types::{PeersRef, LocalNodeRef};
@ -30,11 +31,11 @@ impl InboundConnectionFactory {
}
impl LocalSyncNode for InboundConnectionFactory {
fn create_sync_session(&self, _best_block_height: i32, outbound_connection: OutboundSyncConnectionRef) -> InboundSyncConnectionRef {
fn create_sync_session(&self, _best_block_height: i32, services: Services, outbound_connection: OutboundSyncConnectionRef) -> InboundSyncConnectionRef {
let peer_index = self.counter.fetch_add(1, Ordering::SeqCst) + 1;
trace!(target: "sync", "Creating new sync session with peer#{}", peer_index);
// remember outbound connection
self.peers.insert(peer_index, outbound_connection);
self.peers.insert(peer_index, services, outbound_connection);
// create new inbound connection
InboundConnection::new(peer_index, self.peers.clone(), self.node.clone()).boxed()
}

View File

@ -41,6 +41,7 @@ pub use types::PeersRef;
use std::sync::Arc;
use parking_lot::RwLock;
use message::Services;
use network::{Magic, ConsensusParams};
use primitives::hash::H256;
use verification::BackwardsCompatibleChainVerifier as ChainVerifier;
@ -107,7 +108,11 @@ pub fn create_local_sync_node(consensus: ConsensusParams, db: db::SharedStore, p
let memory_pool = Arc::new(RwLock::new(MemoryPool::new()));
let sync_state = SynchronizationStateRef::new(SynchronizationState::with_storage(db.clone()));
let sync_chain = SyncChain::new(db.clone(), memory_pool.clone());
let sync_chain = SyncChain::new(db.clone(), consensus.clone(), memory_pool.clone());
if sync_chain.is_segwit_active() {
peers.require_peer_services(Services::default().with_witness(true));
}
let chain_verifier = Arc::new(ChainVerifier::new(db.clone(), consensus.clone()));
let sync_executor = SyncExecutor::new(peers.clone());
let sync_server = Arc::new(ServerImpl::new(peers.clone(), db.clone(), memory_pool.clone(), sync_executor.clone()));

View File

@ -374,7 +374,7 @@ pub mod tests {
let memory_pool = Arc::new(RwLock::new(MemoryPool::new()));
let storage = Arc::new(BlockChainDatabase::init_test_chain(vec![test_data::genesis().into()]));
let sync_state = SynchronizationStateRef::new(SynchronizationState::with_storage(storage.clone()));
let chain = Chain::new(storage.clone(), memory_pool.clone());
let chain = Chain::new(storage.clone(), ConsensusParams::new(Magic::Unitest, ConsensusFork::NoFork), memory_pool.clone());
let sync_peers = Arc::new(PeersImpl::default());
let executor = DummyTaskExecutor::new();
let server = Arc::new(DummyServer::new());

View File

@ -4,10 +4,12 @@ use linked_hash_map::LinkedHashMap;
use chain::{BlockHeader, Transaction, IndexedBlockHeader, IndexedBlock, IndexedTransaction};
use db;
use miner::{MemoryPoolOrderingStrategy, MemoryPoolInformation};
use network::ConsensusParams;
use primitives::bytes::Bytes;
use primitives::hash::H256;
use utils::{BestHeadersChain, BestHeadersChainInformation, HashQueueChain, HashPosition};
use types::{BlockHeight, StorageRef, MemoryPoolRef};
use verification::Deployments;
/// Index of 'verifying' queue
const VERIFYING_QUEUE: usize = 0;
@ -104,6 +106,8 @@ pub struct Chain {
best_storage_block: db::BestBlock,
/// Local blocks storage
storage: StorageRef,
/// Consensus params.
consensus: ConsensusParams,
/// In-memory queue of blocks hashes
hash_chain: HashQueueChain,
/// In-memory queue of blocks headers
@ -114,6 +118,8 @@ pub struct Chain {
memory_pool: MemoryPoolRef,
/// Blocks that have been marked as dead-ends
dead_end_blocks: HashSet<H256>,
/// Is SegWit active?
is_segwit_active: bool,
}
impl BlockState {
@ -138,22 +144,25 @@ impl BlockState {
impl Chain {
/// Create new `Chain` with given storage
pub fn new(storage: StorageRef, memory_pool: MemoryPoolRef) -> Self {
pub fn new(storage: StorageRef, consensus: ConsensusParams, memory_pool: MemoryPoolRef) -> Self {
// we only work with storages with genesis block
let genesis_block_hash = storage.block_hash(0)
.expect("storage with genesis block is required");
let best_storage_block = storage.best_block();
let best_storage_block_hash = best_storage_block.hash.clone();
let is_segwit_active = Deployments::new().segwit(best_storage_block.number, storage.as_block_header_provider(), &consensus);
Chain {
genesis_block_hash: genesis_block_hash,
best_storage_block: best_storage_block,
storage: storage,
consensus: consensus,
hash_chain: HashQueueChain::with_number_of_queues(NUMBER_OF_QUEUES),
headers_chain: BestHeadersChain::new(best_storage_block_hash),
verifying_transactions: LinkedHashMap::new(),
memory_pool: memory_pool,
dead_end_blocks: HashSet::new(),
is_segwit_active: is_segwit_active,
}
}
@ -179,6 +188,11 @@ impl Chain {
self.memory_pool.clone()
}
/// Is segwit active
pub fn is_segwit_active(&self) -> bool {
self.is_segwit_active
}
/// Get number of blocks in given state
pub fn length_of_blocks_state(&self, state: BlockState) -> BlockHeight {
match state {
@ -348,7 +362,8 @@ impl Chain {
self.storage.canonize(block.hash())?;
// remember new best block hash
self.best_storage_block = self.storage.best_block();
self.best_storage_block = self.storage.as_store().best_block();
self.is_segwit_active = Deployments::new().segwit(self.best_storage_block.number, self.storage.as_block_header_provider(), &self.consensus);
// remove inserted block + handle possible reorganization in headers chain
// TODO: mk, not sure if we need both of those params
@ -384,6 +399,7 @@ impl Chain {
// remember new best block hash
self.best_storage_block = self.storage.best_block();
self.is_segwit_active = Deployments::new().segwit(self.best_storage_block.number, self.storage.as_block_header_provider(), &self.consensus);
// remove inserted block + handle possible reorganization in headers chain
// TODO: mk, not sure if we need both of those params
@ -723,6 +739,7 @@ mod tests {
use chain::{Transaction, IndexedBlockHeader};
use db::BlockChainDatabase;
use miner::MemoryPool;
use network::{Magic, ConsensusParams, ConsensusFork};
use primitives::hash::H256;
use super::{Chain, BlockState, TransactionState, BlockInsertionResult};
use utils::HashPosition;
@ -731,7 +748,7 @@ mod tests {
fn chain_empty() {
let db = Arc::new(BlockChainDatabase::init_test_chain(vec![test_data::genesis().into()]));
let db_best_block = db.best_block();
let chain = Chain::new(db.clone(), Arc::new(RwLock::new(MemoryPool::new())));
let chain = Chain::new(db.clone(), ConsensusParams::new(Magic::Unitest, ConsensusFork::NoFork), Arc::new(RwLock::new(MemoryPool::new())));
assert_eq!(chain.information().scheduled, 0);
assert_eq!(chain.information().requested, 0);
assert_eq!(chain.information().verifying, 0);
@ -748,7 +765,7 @@ mod tests {
#[test]
fn chain_block_path() {
let db = Arc::new(BlockChainDatabase::init_test_chain(vec![test_data::genesis().into()]));
let mut chain = Chain::new(db.clone(), Arc::new(RwLock::new(MemoryPool::new())));
let mut chain = Chain::new(db.clone(), ConsensusParams::new(Magic::Unitest, ConsensusFork::NoFork), Arc::new(RwLock::new(MemoryPool::new())));
// add 6 blocks to scheduled queue
let blocks = test_data::build_n_empty_blocks_from_genesis(6, 0);
@ -800,7 +817,7 @@ mod tests {
#[test]
fn chain_block_locator_hashes() {
let db = Arc::new(BlockChainDatabase::init_test_chain(vec![test_data::genesis().into()]));
let mut chain = Chain::new(db, Arc::new(RwLock::new(MemoryPool::new())));
let mut chain = Chain::new(db, ConsensusParams::new(Magic::Unitest, ConsensusFork::NoFork), Arc::new(RwLock::new(MemoryPool::new())));
let genesis_hash = chain.best_block().hash;
assert_eq!(chain.block_locator_hashes(), vec![genesis_hash.clone()]);
@ -885,7 +902,7 @@ mod tests {
#[test]
fn chain_transaction_state() {
let db = Arc::new(BlockChainDatabase::init_test_chain(vec![test_data::genesis().into()]));
let mut chain = Chain::new(db, Arc::new(RwLock::new(MemoryPool::new())));
let mut chain = Chain::new(db, ConsensusParams::new(Magic::Unitest, ConsensusFork::NoFork), Arc::new(RwLock::new(MemoryPool::new())));
let genesis_block = test_data::genesis();
let block1 = test_data::block_h1();
let tx1: Transaction = test_data::TransactionBuilder::with_version(1).into();
@ -922,7 +939,7 @@ mod tests {
let tx2_hash = tx2.hash();
let db = Arc::new(BlockChainDatabase::init_test_chain(vec![b0.into()]));
let mut chain = Chain::new(db, Arc::new(RwLock::new(MemoryPool::new())));
let mut chain = Chain::new(db, ConsensusParams::new(Magic::Unitest, ConsensusFork::NoFork), Arc::new(RwLock::new(MemoryPool::new())));
chain.verify_transaction(tx1.into());
chain.insert_verified_transaction(tx2.into());
@ -946,7 +963,7 @@ mod tests {
.set_default_input(0).set_output(400).store(test_chain); // t4
let db = Arc::new(BlockChainDatabase::init_test_chain(vec![test_data::genesis().into()]));
let mut chain = Chain::new(db, Arc::new(RwLock::new(MemoryPool::new())));
let mut chain = Chain::new(db, ConsensusParams::new(Magic::Unitest, ConsensusFork::NoFork), Arc::new(RwLock::new(MemoryPool::new())));
chain.verify_transaction(test_chain.at(0).into());
chain.verify_transaction(test_chain.at(1).into());
chain.verify_transaction(test_chain.at(2).into());
@ -968,7 +985,7 @@ mod tests {
.set_default_input(0).set_output(400).store(test_chain); // t4
let db = Arc::new(BlockChainDatabase::init_test_chain(vec![test_data::genesis().into()]));
let mut chain = Chain::new(db, Arc::new(RwLock::new(MemoryPool::new())));
let mut chain = Chain::new(db, ConsensusParams::new(Magic::Unitest, ConsensusFork::NoFork), Arc::new(RwLock::new(MemoryPool::new())));
chain.insert_verified_transaction(test_chain.at(0).into());
chain.insert_verified_transaction(test_chain.at(1).into());
chain.insert_verified_transaction(test_chain.at(2).into());
@ -994,7 +1011,7 @@ mod tests {
let tx2_hash = tx2.hash();
let db = Arc::new(BlockChainDatabase::init_test_chain(vec![b0.into()]));
let mut chain = Chain::new(db, Arc::new(RwLock::new(MemoryPool::new())));
let mut chain = Chain::new(db, ConsensusParams::new(Magic::Unitest, ConsensusFork::NoFork), Arc::new(RwLock::new(MemoryPool::new())));
chain.verify_transaction(tx1.into());
chain.insert_verified_transaction(tx2.into());
@ -1042,7 +1059,7 @@ mod tests {
let tx5 = b5.transactions[0].clone();
let db = Arc::new(BlockChainDatabase::init_test_chain(vec![genesis.into()]));
let mut chain = Chain::new(db, Arc::new(RwLock::new(MemoryPool::new())));
let mut chain = Chain::new(db, ConsensusParams::new(Magic::Unitest, ConsensusFork::NoFork), Arc::new(RwLock::new(MemoryPool::new())));
chain.insert_verified_transaction(tx3.into());
chain.insert_verified_transaction(tx4.into());
@ -1086,7 +1103,7 @@ mod tests {
// insert tx2 to memory pool
let db = Arc::new(BlockChainDatabase::init_test_chain(vec![test_data::genesis().into()]));
let mut chain = Chain::new(db, Arc::new(RwLock::new(MemoryPool::new())));
let mut chain = Chain::new(db, ConsensusParams::new(Magic::Unitest, ConsensusFork::NoFork), Arc::new(RwLock::new(MemoryPool::new())));
chain.insert_verified_transaction(tx2.clone().into());
chain.insert_verified_transaction(tx3.clone().into());
// insert verified block with tx1
@ -1105,7 +1122,7 @@ mod tests {
.reset().set_input(&data_chain.at(0), 0).add_output(30).store(data_chain); // transaction0 -> transaction2
let db = Arc::new(BlockChainDatabase::init_test_chain(vec![test_data::genesis().into()]));
let mut chain = Chain::new(db, Arc::new(RwLock::new(MemoryPool::new())));
let mut chain = Chain::new(db, ConsensusParams::new(Magic::Unitest, ConsensusFork::NoFork), Arc::new(RwLock::new(MemoryPool::new())));
chain.insert_verified_transaction(data_chain.at(1).into());
assert_eq!(chain.information().transactions.transactions_count, 1);
chain.insert_verified_transaction(data_chain.at(2).into());

View File

@ -6,7 +6,7 @@ use futures::Future;
use parking_lot::Mutex;
use time::precise_time_s;
use chain::{IndexedBlockHeader, IndexedTransaction, Transaction, IndexedBlock};
use message::types;
use message::{types, Services};
use message::common::{InventoryType, InventoryVector};
use miner::transaction_fee_rate;
use primitives::hash::H256;
@ -226,6 +226,8 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
}
// else ask for all unknown transactions and blocks
let is_segwit_active = self.chain.is_segwit_active();
let ask_for_witness = is_segwit_active && self.peers.is_segwit_enabled(peer_index);
let unknown_inventory: Vec<_> = message.inventory.into_iter()
.filter(|item| {
match item.inv_type {
@ -253,6 +255,24 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
}
}
})
// we are not synchronizing =>
// 1) either segwit is active and we are connected to segwit-enabled nodes => we could ask for witness
// 2) or segwit is inactive => we shall not ask for witness
.map(|item| if !ask_for_witness {
item
} else {
match item.inv_type {
InventoryType::MessageTx => InventoryVector {
inv_type: InventoryType::MessageWitnessTx,
hash: item.hash,
},
InventoryType::MessageBlock => InventoryVector {
inv_type: InventoryType::MessageWitnessBlock,
hash: item.hash,
},
_ => item,
}
})
.collect();
// if everything is known => ignore this message
@ -262,7 +282,6 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
}
// ask for unknown items
// TODO: if segwit is active, ask with witness data
let message = types::GetData::with_inventory(unknown_inventory);
self.executor.execute(Task::GetData(peer_index, message));
}
@ -964,10 +983,13 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
// remember that peer is asked for these blocks
self.peers_tasks.on_blocks_requested(peer, &chunk_hashes);
// TODO: if block is believed to have witness, ask with witness data
// request blocks
// request blocks. If block is believed to have witness - ask for witness
let is_segwit_active = self.chain.is_segwit_active();
let getdata = types::GetData {
inventory: chunk_hashes.into_iter().map(InventoryVector::block).collect(),
inventory: chunk_hashes.into_iter().map(|h| InventoryVector {
inv_type: if is_segwit_active { InventoryType::MessageWitnessBlock } else { InventoryType::MessageBlock },
hash: h,
}).collect(),
};
tasks.push(Task::GetData(peer, getdata));
}
@ -1047,8 +1069,8 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
// update block processing speed
self.block_speed_meter.checkpoint();
// TODO: if segwit activates after this block, disconnect from all nodes without NODE_WITNESS support
// TODO: no more connections to !NODE_WITNESS nodes
// remember if SegWit was active before this block
let segwit_was_active = self.chain.is_segwit_active();
// remove flags
let needs_relay = !self.do_not_relay.remove(block.hash());
@ -1070,6 +1092,13 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
// update shared state
self.shared_state.update_best_storage_block_height(self.chain.best_storage_block().number);
// if SegWit activated after this block insertion:
// 1) no more connections to !NODE_WITNESS nodes
// 2) disconnect from all nodes without NODE_WITNESS support
if !segwit_was_active && self.chain.is_segwit_active() {
self.peers.require_peer_services(Services::default().with_witness(true));
}
// notify listener
if let Some(best_block_hash) = insert_result.canonized_blocks_hashes.last() {
if let Some(ref listener) = self.listener {
@ -1233,7 +1262,7 @@ pub mod tests {
use chain::{Block, Transaction};
use db::BlockChainDatabase;
use message::common::InventoryVector;
use message::types;
use message::{Services, types};
use miner::MemoryPool;
use network::{ConsensusParams, ConsensusFork, Magic};
use primitives::hash::H256;
@ -1286,7 +1315,7 @@ pub mod tests {
};
let sync_state = SynchronizationStateRef::new(SynchronizationState::with_storage(storage.clone()));
let memory_pool = Arc::new(RwLock::new(MemoryPool::new()));
let chain = Chain::new(storage.clone(), memory_pool.clone());
let chain = Chain::new(storage.clone(), ConsensusParams::new(Magic::Unitest, ConsensusFork::NoFork), memory_pool.clone());
let executor = DummyTaskExecutor::new();
let config = Config { close_connection_on_bad_block: true };
@ -2087,7 +2116,7 @@ pub mod tests {
let (_, core, sync) = create_sync(None, Some(dummy_verifier));
core.lock().peers.insert(0, DummyOutboundSyncConnection::new());
core.lock().peers.insert(0, Services::default(), DummyOutboundSyncConnection::new());
assert!(core.lock().peers.enumerate().contains(&0));
sync.on_block(0, b0.into());
@ -2108,7 +2137,7 @@ pub mod tests {
chain.mark_dead_end_block(&b0.hash());
}
core.lock().peers.insert(0, DummyOutboundSyncConnection::new());
core.lock().peers.insert(0, Services::default(), DummyOutboundSyncConnection::new());
assert!(core.lock().peers.enumerate().contains(&0));
sync.on_headers(0, types::Headers::with_headers(vec![b0.block_header.clone(), b1.block_header.clone(), b2.block_header.clone()]));
@ -2130,7 +2159,7 @@ pub mod tests {
}
core.lock().set_verify_headers(true);
core.lock().peers.insert(0, DummyOutboundSyncConnection::new());
core.lock().peers.insert(0, Services::default(), DummyOutboundSyncConnection::new());
assert!(core.lock().peers.enumerate().contains(&0));
sync.on_headers(0, types::Headers::with_headers(vec![b0.block_header.clone(), b1.block_header.clone(), b2.block_header.clone()]));
@ -2149,7 +2178,7 @@ pub mod tests {
chain.mark_dead_end_block(&b0.hash());
}
core.lock().peers.insert(0, DummyOutboundSyncConnection::new());
core.lock().peers.insert(0, Services::default(), DummyOutboundSyncConnection::new());
assert!(core.lock().peers.enumerate().contains(&0));
sync.on_block(0, b0.into());
@ -2169,7 +2198,7 @@ pub mod tests {
chain.mark_dead_end_block(&b0.hash());
}
core.lock().peers.insert(0, DummyOutboundSyncConnection::new());
core.lock().peers.insert(0, Services::default(), DummyOutboundSyncConnection::new());
assert!(core.lock().peers.enumerate().contains(&0));
sync.on_block(0, b1.into());

View File

@ -250,7 +250,7 @@ pub mod tests {
use std::time;
use parking_lot::{Mutex, Condvar};
use chain::Transaction;
use message::types;
use message::{Services, types};
use inbound_connection::tests::DummyOutboundSyncConnection;
use local_node::tests::{default_filterload, make_filteradd};
use synchronization_peers::{PeersImpl, PeersContainer, PeersFilters, PeersOptions, BlockAnnouncementType};
@ -303,9 +303,9 @@ pub mod tests {
let executor = LocalSynchronizationTaskExecutor::new(peers.clone());
let c1 = DummyOutboundSyncConnection::new();
peers.insert(1, c1.clone());
peers.insert(1, Services::default(), c1.clone());
let c2 = DummyOutboundSyncConnection::new();
peers.insert(2, c2.clone());
peers.insert(2, Services::default(), c2.clone());
peers.set_block_announcement_type(2, BlockAnnouncementType::SendCompactBlock);
executor.execute(Task::RelayNewBlock(test_data::genesis().into()));
@ -319,9 +319,9 @@ pub mod tests {
let executor = LocalSynchronizationTaskExecutor::new(peers.clone());
let c1 = DummyOutboundSyncConnection::new();
peers.insert(1, c1.clone());
peers.insert(1, Services::default(), c1.clone());
let c2 = DummyOutboundSyncConnection::new();
peers.insert(2, c2.clone());
peers.insert(2, Services::default(), c2.clone());
peers.set_block_announcement_type(2, BlockAnnouncementType::SendHeaders);
executor.execute(Task::RelayNewBlock(test_data::genesis().into()));
@ -343,26 +343,26 @@ pub mod tests {
// peer#1 wants tx1
let c1 = DummyOutboundSyncConnection::new();
peers.insert(1, c1.clone());
peers.insert(1, Services::default(), c1.clone());
peers.set_bloom_filter(1, default_filterload());
peers.update_bloom_filter(1, make_filteradd(&*tx1_hash));
// peer#2 wants tx2
let c2 = DummyOutboundSyncConnection::new();
peers.insert(2, c2.clone());
peers.insert(2, Services::default(), c2.clone());
peers.set_bloom_filter(2, default_filterload());
peers.update_bloom_filter(2, make_filteradd(&*tx2_hash));
// peer#3 wants tx1 + tx2 transactions
let c3 = DummyOutboundSyncConnection::new();
peers.insert(3, c3.clone());
peers.insert(3, Services::default(), c3.clone());
peers.set_bloom_filter(3, default_filterload());
peers.update_bloom_filter(3, make_filteradd(&*tx1_hash));
peers.update_bloom_filter(3, make_filteradd(&*tx2_hash));
// peer#4 has default behaviour (no filter)
let c4 = DummyOutboundSyncConnection::new();
peers.insert(4, c4.clone());
peers.insert(4, Services::default(), c4.clone());
// peer#5 wants some other transactions
let c5 = DummyOutboundSyncConnection::new();
peers.insert(5, c5.clone());
peers.insert(5, Services::default(), c5.clone());
peers.set_bloom_filter(5, default_filterload());
peers.update_bloom_filter(5, make_filteradd(&*tx3_hash));
@ -389,13 +389,13 @@ pub mod tests {
let executor = LocalSynchronizationTaskExecutor::new(peers.clone());
let c2 = DummyOutboundSyncConnection::new();
peers.insert(2, c2.clone());
peers.insert(2, Services::default(), c2.clone());
peers.set_fee_filter(2, types::FeeFilter::with_fee_rate(3000));
let c3 = DummyOutboundSyncConnection::new();
peers.insert(3, c3.clone());
peers.insert(3, Services::default(), c3.clone());
peers.set_fee_filter(3, types::FeeFilter::with_fee_rate(4000));
let c4 = DummyOutboundSyncConnection::new();
peers.insert(4, c4.clone());
peers.insert(4, Services::default(), c4.clone());
executor.execute(Task::RelayNewTransaction(test_data::genesis().transactions[0].clone().into(), 3500));

View File

@ -1,7 +1,7 @@
use std::collections::HashMap;
use parking_lot::RwLock;
use chain::{IndexedBlock, IndexedTransaction};
use message::types;
use message::{types, Services};
use p2p::OutboundSyncConnectionRef;
use primitives::hash::H256;
use types::PeerIndex;
@ -40,6 +40,8 @@ pub struct MerkleBlockArtefacts {
/// Connected peers
pub trait Peers : Send + Sync + PeersContainer + PeersFilters + PeersOptions {
/// Require peers services.
fn require_peer_services(&self, services: Services);
/// Get peer connection
fn connection(&self, peer_index: PeerIndex) -> Option<OutboundSyncConnectionRef>;
}
@ -49,7 +51,7 @@ pub trait PeersContainer {
/// Enumerate all known peers (TODO: iterator + separate entity 'Peer')
fn enumerate(&self) -> Vec<PeerIndex>;
/// Insert new peer connection
fn insert(&self, peer_index: PeerIndex, connection: OutboundSyncConnectionRef);
fn insert(&self, peer_index: PeerIndex, services: Services, connection: OutboundSyncConnectionRef);
/// Remove peer connection
fn remove(&self, peer_index: PeerIndex);
/// Close and remove peer connection due to misbehaving
@ -84,6 +86,8 @@ pub trait PeersFilters {
/// Options for peers connections
pub trait PeersOptions {
/// Is node supporting SegWit?
fn is_segwit_enabled(&self, peer_index: PeerIndex) -> bool;
/// Set up new block announcement type for the connection
fn set_block_announcement_type(&self, peer_index: PeerIndex, announcement_type: BlockAnnouncementType);
/// Set up new transaction announcement type for the connection
@ -94,6 +98,8 @@ pub trait PeersOptions {
struct Peer {
/// Connection to this peer
pub connection: OutboundSyncConnectionRef,
/// Peer services
pub services: Services,
/// Connection filter
pub filter: ConnectionFilter,
/// Block announcement type
@ -111,9 +117,10 @@ pub struct PeersImpl {
}
impl Peer {
pub fn with_connection(connection: OutboundSyncConnectionRef) -> Self {
pub fn new(services: Services, connection: OutboundSyncConnectionRef) -> Self {
Peer {
connection: connection,
services: services,
filter: ConnectionFilter::default(),
block_announcement_type: BlockAnnouncementType::SendInventory,
transaction_announcement_type: TransactionAnnouncementType::SendInventory,
@ -122,6 +129,19 @@ impl Peer {
}
impl Peers for PeersImpl {
fn require_peer_services(&self, services: Services) {
// possible optimization: force p2p level to establish connections to SegWit-nodes only
// without it, all other nodes will be eventually banned (this could take some time, though)
let mut peers = self.peers.write();
for peer_index in peers.iter().filter(|&(_, p)| p.services.includes(&services)).map(|(p, _)| *p).collect::<Vec<_>>() {
let peer = peers.remove(&peer_index).expect("iterating peers keys; qed");
let expected_services: u64 = services.into();
let actual_services: u64 = peer.services.into();
warn!(target: "sync", "Disconnecting from peer#{} because of insufficient services. Expected {:x}, actual: {:x}", peer_index, expected_services, actual_services);
peer.connection.close();
}
}
fn connection(&self, peer_index: PeerIndex) -> Option<OutboundSyncConnectionRef> {
self.peers.read().get(&peer_index).map(|peer| peer.connection.clone())
}
@ -132,9 +152,9 @@ impl PeersContainer for PeersImpl {
self.peers.read().keys().cloned().collect()
}
fn insert(&self, peer_index: PeerIndex, connection: OutboundSyncConnectionRef) {
fn insert(&self, peer_index: PeerIndex, services: Services, connection: OutboundSyncConnectionRef) {
trace!(target: "sync", "Connected to peer#{}", peer_index);
assert!(self.peers.write().insert(peer_index, Peer::with_connection(connection)).is_none());
assert!(self.peers.write().insert(peer_index, Peer::new(services, connection)).is_none());
}
fn remove(&self, peer_index: PeerIndex) {
@ -227,6 +247,13 @@ impl PeersFilters for PeersImpl {
}
impl PeersOptions for PeersImpl {
fn is_segwit_enabled(&self, peer_index: PeerIndex) -> bool {
self.peers.read()
.get(&peer_index)
.map(|peer| peer.services.witness())
.unwrap_or_default()
}
fn set_block_announcement_type(&self, peer_index: PeerIndex, announcement_type: BlockAnnouncementType) {
if let Some(peer) = self.peers.write().get_mut(&peer_index) {
peer.block_announcement_type = announcement_type;

View File

@ -482,7 +482,7 @@ pub mod tests {
use parking_lot::{Mutex, RwLock};
use db::{BlockChainDatabase};
use message::types;
use message::common::{self, InventoryVector, InventoryType};
use message::common::{self, Services, InventoryVector, InventoryType};
use primitives::hash::H256;
use chain::Transaction;
use inbound_connection::tests::DummyOutboundSyncConnection;
@ -664,7 +664,7 @@ pub mod tests {
fn server_get_block_txn_responds_when_good_request() {
let (_, _, executor, peers, server) = create_synchronization_server();
peers.insert(0, DummyOutboundSyncConnection::new());
peers.insert(0, Services::default(), DummyOutboundSyncConnection::new());
peers.hash_known_as(0, test_data::genesis().hash(), KnownHashType::CompactBlock);
// when asking for block_txns
@ -689,7 +689,7 @@ pub mod tests {
fn server_get_block_txn_do_not_responds_when_bad_request() {
let (_, _, _, peers, server) = create_synchronization_server();
peers.insert(0, DummyOutboundSyncConnection::new());
peers.insert(0, Services::default(), DummyOutboundSyncConnection::new());
assert!(peers.enumerate().contains(&0));
// when asking for block_txns
@ -828,7 +828,7 @@ pub mod tests {
storage.canonize(&b2.hash()).unwrap();
// This peer won't get any blocks, because it has not set filter for the connection
let peer_index2 = 1; peers.insert(peer_index2, DummyOutboundSyncConnection::new());
let peer_index2 = 1; peers.insert(peer_index2, Services::default(), DummyOutboundSyncConnection::new());
let mut loop_task = ServerTask::GetData(peer_index2, types::GetData {inventory: vec![
InventoryVector { inv_type: InventoryType::MessageFilteredBlock, hash: b1_hash.clone() },
@ -851,7 +851,7 @@ pub mod tests {
let mut counter = 2;
for (get_tx1, get_tx2) in peers_config {
let peer_index = counter; peers.insert(peer_index, DummyOutboundSyncConnection::new());
let peer_index = counter; peers.insert(peer_index, Services::default(), DummyOutboundSyncConnection::new());
counter += 1;
// setup filter
peers.set_bloom_filter(peer_index, default_filterload());
@ -922,7 +922,7 @@ pub mod tests {
storage.canonize(&b1.hash()).unwrap();
// This peer will receive compact block
let peer_index2 = 1; peers.insert(peer_index2, DummyOutboundSyncConnection::new());
let peer_index2 = 1; peers.insert(peer_index2, Services::default(), DummyOutboundSyncConnection::new());
// ask for data
let mut loop_task = ServerTask::GetData(peer_index2, types::GetData {inventory: vec![

View File

@ -109,6 +109,7 @@ pub use error::{Error, TransactionError};
pub use sigops::transaction_sigops;
pub use timestamp::median_timestamp;
pub use work::{work_required, is_valid_proof_of_work, is_valid_proof_of_work_hash, block_reward_satoshi};
pub use deployments::Deployments;
#[derive(Debug, Clone, Copy, PartialEq)]
/// Blocks verification level.