Merge branch 'master' into previous_transaction_output_provider
This commit is contained in:
commit
b90d3eebcf
|
@ -1,6 +1,7 @@
|
|||
use std::{fmt, io};
|
||||
use hex::FromHex;
|
||||
use ser::{
|
||||
Deserializable, Reader, Error as ReaderError,
|
||||
Deserializable, Reader, Error as ReaderError, deserialize,
|
||||
Serializable, Stream, serialize
|
||||
};
|
||||
use crypto::dhash256;
|
||||
|
@ -62,6 +63,12 @@ impl Deserializable for BlockHeader {
|
|||
}
|
||||
}
|
||||
|
||||
impl From<&'static str> for BlockHeader {
|
||||
fn from(s: &'static str) -> Self {
|
||||
deserialize(&s.from_hex().unwrap() as &[u8]).unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use ser::{Reader, Error as ReaderError, Stream};
|
||||
|
|
|
@ -367,6 +367,34 @@ impl Storage {
|
|||
|
||||
Ok(Some(reorganization))
|
||||
}
|
||||
|
||||
/// Decanonizes main chain until best block is the one with `hash`
|
||||
pub fn rollback(&self, hash: &H256) -> Result<Vec<H256>, Error> {
|
||||
if self.block_number(hash).is_none() {
|
||||
return Err(Error::not_main(hash));
|
||||
}
|
||||
|
||||
// lock will be held until the end of the routine
|
||||
let mut best_block = self.best_block.write();
|
||||
|
||||
let mut context = UpdateContext::new(&self.database);
|
||||
let mut result = Vec::new();
|
||||
let mut best_number = try!(self.best_number().ok_or(Error::Consistency(ConsistencyError::NoBestBlock)));
|
||||
loop {
|
||||
let next = try!(self.block_hash(best_number).ok_or(Error::unknown_number(best_number)));
|
||||
if &next == hash {
|
||||
context.db_transaction.put(Some(COL_META), KEY_BEST_BLOCK_HASH, &**hash);
|
||||
context.db_transaction.write_u32(Some(COL_META), KEY_BEST_BLOCK_NUMBER, best_number);
|
||||
try!(context.apply(&self.database));
|
||||
*best_block = Some(BestBlock { hash: next, number: best_number });
|
||||
|
||||
return Ok(result);
|
||||
}
|
||||
try!(self.decanonize_block(&mut context, &next));
|
||||
best_number -= 1;
|
||||
result.push(next);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl BlockProvider for Storage {
|
||||
|
@ -1318,6 +1346,38 @@ mod tests {
|
|||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rollback() {
|
||||
let path = RandomTempPath::create_dir();
|
||||
let store = Storage::new(path.as_path()).unwrap();
|
||||
|
||||
let genesis = test_data::genesis();
|
||||
store.insert_block(&genesis).unwrap();
|
||||
|
||||
let (main_hash1, main_block1) = test_data::block_hash_builder()
|
||||
.block()
|
||||
.header().parent(genesis.hash())
|
||||
.nonce(1)
|
||||
.build()
|
||||
.build()
|
||||
.build();
|
||||
store.insert_block(&main_block1).expect("main block 1 should insert with no problems");
|
||||
|
||||
let (main_hash2, main_block2) = test_data::block_hash_builder()
|
||||
.block()
|
||||
.header().parent(main_hash1.clone())
|
||||
.nonce(2)
|
||||
.build()
|
||||
.build()
|
||||
.build();
|
||||
store.insert_block(&main_block2).expect("main block 2 should insert with no problems");
|
||||
assert_eq!(store.best_block().unwrap().hash, main_hash2);
|
||||
|
||||
store.rollback(&main_hash1).unwrap();
|
||||
|
||||
assert_eq!(store.best_block().unwrap().hash, main_hash1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fork_route() {
|
||||
let path = RandomTempPath::create_dir();
|
||||
|
|
|
@ -19,6 +19,7 @@ pub fn build_compact_block(block: IndexedBlock, prefilled_transactions_indexes:
|
|||
let mut prefilled_transactions: Vec<PrefilledTransaction> = Vec::with_capacity(prefilled_transactions_len);
|
||||
let mut prefilled_transactions_size: usize = 0;
|
||||
|
||||
let (key0, key1) = short_transaction_id_keys(nonce, block.header());
|
||||
for (transaction_index, (transaction_hash, transaction)) in block.transactions().enumerate() {
|
||||
let transaction_size = transaction.serialized_size();
|
||||
if prefilled_transactions_size + transaction_size < MAX_COMPACT_BLOCK_PREFILLED_SIZE
|
||||
|
@ -29,7 +30,7 @@ pub fn build_compact_block(block: IndexedBlock, prefilled_transactions_indexes:
|
|||
transaction: transaction.clone(),
|
||||
})
|
||||
} else {
|
||||
short_ids.push(short_transaction_id(nonce, block.header(), transaction_hash));
|
||||
short_ids.push(short_transaction_id(key0, key1, transaction_hash));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -41,7 +42,7 @@ pub fn build_compact_block(block: IndexedBlock, prefilled_transactions_indexes:
|
|||
}
|
||||
}
|
||||
|
||||
fn short_transaction_id(nonce: u64, block_header: &BlockHeader, transaction_hash: &H256) -> ShortTransactionID {
|
||||
pub fn short_transaction_id_keys(nonce: u64, block_header: &BlockHeader) -> (u64, u64) {
|
||||
// Short transaction IDs are used to represent a transaction without sending a full 256-bit hash. They are calculated by:
|
||||
// 1) single-SHA256 hashing the block header with the nonce appended (in little-endian)
|
||||
let mut stream = Stream::new();
|
||||
|
@ -53,24 +54,68 @@ fn short_transaction_id(nonce: u64, block_header: &BlockHeader, transaction_hash
|
|||
// 64-bit integers from the above hash, respectively.
|
||||
let key0 = LittleEndian::read_u64(&block_header_with_nonce_hash[0..8]);
|
||||
let key1 = LittleEndian::read_u64(&block_header_with_nonce_hash[8..16]);
|
||||
|
||||
(key0, key1)
|
||||
}
|
||||
|
||||
pub fn short_transaction_id(key0: u64, key1: u64, transaction_hash: &H256) -> ShortTransactionID {
|
||||
// 2) Running SipHash-2-4 with the input being the transaction ID and the keys (k0/k1) set to the first two little-endian
|
||||
// 64-bit integers from the above hash, respectively.
|
||||
let siphash_transaction_hash = siphash24(key0, key1, &**transaction_hash);
|
||||
|
||||
// 3) Dropping the 2 most significant bytes from the SipHash output to make it 6 bytes.
|
||||
let mut siphash_transaction_hash_bytes = [0u8; 8];
|
||||
LittleEndian::write_u64(&mut siphash_transaction_hash_bytes, siphash_transaction_hash);
|
||||
|
||||
siphash_transaction_hash_bytes[2..8].into()
|
||||
siphash_transaction_hash_bytes[0..6].into()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::HashSet;
|
||||
use chain::{BlockHeader, Transaction, ShortTransactionID};
|
||||
use message::common::{BlockHeaderAndIDs, PrefilledTransaction};
|
||||
use test_data;
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn short_transaction_id_is_correct() {
|
||||
// TODO
|
||||
// https://webbtc.com/tx/fa755807ab9f3ca8a9b25982570700f3f94bb0627f373893c3cfe79b5cf16def
|
||||
let transaction: Transaction = "01000000015fe01688dd8ae4428e21835c0e1b7af571c4223658d94da0c123e6fd7399862a010000006b483045022100f9e6d1bd3c9f54dcc72405994ec9ac2795878dd0b3cfbdc52bed28c2737fbecc02201fd68deab17bfaef1626e232cc4488dc273ba6fa5d807712b111d017cb96e0990121021fff64d1a21ede90d77cafa35fe7621db8aa433d947267980b395c35d23bd87fffffffff021ea56f72000000001976a9146fae1c8e7a648fff905dfdac9b019d3e887d7e8f88ac80f0fa02000000001976a9147f29b567c7dd9fc59cd3a7f716914966cc91ffa188ac00000000".into();
|
||||
let transaction_hash = transaction.hash();
|
||||
// https://webbtc.com/block/000000000000000001582cb2307ac43f3b4b268f2a75d3581d0babd48df1c300
|
||||
let block_header: BlockHeader = "000000205a54771c6a1a2bcc8f3412184f319dc02f7258b56fd5060100000000000000001de7a03cefe565d11cdfa369f6ffe59b9368a257203726c9cc363d31b4e3c2ebca4f3c58d4e6031830ccfd80".into();
|
||||
let nonce = 13450019974716797918_u64;
|
||||
let (key0, key1) = short_transaction_id_keys(nonce, &block_header);
|
||||
let actual_id = short_transaction_id(key0, key1, &transaction_hash);
|
||||
let expected_id: ShortTransactionID = "036e8b8b8f00".into();
|
||||
assert_eq!(expected_id, actual_id);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn compact_block_is_built_correctly() {
|
||||
// TODO
|
||||
let block = test_data::block_builder().header().parent(test_data::genesis().hash()).build()
|
||||
.transaction().output().value(10).build().build()
|
||||
.transaction().output().value(20).build().build()
|
||||
.transaction().output().value(30).build().build()
|
||||
.build(); // genesis -> block
|
||||
let prefilled: HashSet<_> = vec![1].into_iter().collect();
|
||||
let compact_block = build_compact_block(block.clone().into(), prefilled);
|
||||
let (key0, key1) = short_transaction_id_keys(compact_block.nonce, &block.block_header);
|
||||
let short_ids = vec![
|
||||
short_transaction_id(key0, key1, &block.transactions[0].hash()),
|
||||
short_transaction_id(key0, key1, &block.transactions[2].hash()),
|
||||
];
|
||||
assert_eq!(compact_block, BlockHeaderAndIDs {
|
||||
header: block.block_header.clone(),
|
||||
nonce: compact_block.nonce,
|
||||
short_ids: short_ids,
|
||||
prefilled_transactions: vec![
|
||||
PrefilledTransaction {
|
||||
index: 1,
|
||||
transaction: block.transactions[1].clone(),
|
||||
}
|
||||
],
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,9 +12,9 @@ use script::Script;
|
|||
/// Constant optimized to create large differences in the seed for different values of `hash_functions_num`.
|
||||
const SEED_OFFSET: u32 = 0xFBA4C795;
|
||||
/// Max last blocks to store for given peer
|
||||
const MAX_LAST_BLOCKS_TO_STORE: usize = 64;
|
||||
pub const MAX_LAST_BLOCKS_TO_STORE: usize = 64;
|
||||
/// Max last transactions to store for given peer
|
||||
const MAX_LAST_TRANSACTIONS_TO_STORE: usize = 64;
|
||||
pub const MAX_LAST_TRANSACTIONS_TO_STORE: usize = 64;
|
||||
|
||||
/// Filter, which controls data relayed over connection.
|
||||
#[derive(Debug)]
|
||||
|
@ -92,7 +92,6 @@ impl ConnectionFilter {
|
|||
|
||||
/// We have a knowledge that block with given hash is known to this connection
|
||||
pub fn known_block(&mut self, block_hash: &H256) {
|
||||
// TODO: add test for it
|
||||
// remember that peer knows about this block
|
||||
if !self.last_blocks.contains_key(block_hash) {
|
||||
if self.last_blocks.len() == MAX_LAST_BLOCKS_TO_STORE {
|
||||
|
@ -105,7 +104,6 @@ impl ConnectionFilter {
|
|||
|
||||
/// We have a knowledge that transaction with given hash is known to this connection
|
||||
pub fn known_transaction(&mut self, transaction_hash: &H256) {
|
||||
// TODO: add test for it
|
||||
// remember that peer knows about this block
|
||||
if !self.last_transactions.contains_key(transaction_hash) {
|
||||
if self.last_transactions.len() == MAX_LAST_TRANSACTIONS_TO_STORE {
|
||||
|
@ -488,7 +486,8 @@ pub mod tests {
|
|||
use primitives::hash::H256;
|
||||
use primitives::bytes::Bytes;
|
||||
use ser::serialize;
|
||||
use super::{ConnectionFilter, ConnectionBloom, PartialMerkleTree};
|
||||
use super::{ConnectionFilter, ConnectionBloom, PartialMerkleTree,
|
||||
MAX_LAST_BLOCKS_TO_STORE, MAX_LAST_TRANSACTIONS_TO_STORE};
|
||||
|
||||
pub fn default_filterload() -> types::FilterLoad {
|
||||
types::FilterLoad {
|
||||
|
@ -655,4 +654,44 @@ pub mod tests {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn block_is_filtered_out_when_it_is_received_from_peer() {
|
||||
let blocks = test_data::build_n_empty_blocks_from_genesis((MAX_LAST_BLOCKS_TO_STORE + 1) as u32, 1);
|
||||
|
||||
let mut filter = ConnectionFilter::default();
|
||||
assert!(filter.filter_block(&blocks[0].hash()));
|
||||
|
||||
filter.known_block(&blocks[0].hash());
|
||||
assert!(!filter.filter_block(&blocks[0].hash()));
|
||||
|
||||
for block in blocks.iter().skip(1).take(MAX_LAST_BLOCKS_TO_STORE - 1) {
|
||||
filter.known_block(&block.hash());
|
||||
assert!(!filter.filter_block(&blocks[0].hash()));
|
||||
}
|
||||
|
||||
filter.known_block(&blocks[MAX_LAST_BLOCKS_TO_STORE].hash());
|
||||
assert!(filter.filter_block(&blocks[0].hash()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn transaction_is_filtered_out_when_it_is_received_from_peer() {
|
||||
let transactions: Vec<Transaction> = (0..MAX_LAST_TRANSACTIONS_TO_STORE + 1)
|
||||
.map(|version| test_data::TransactionBuilder::with_version(version as i32).into())
|
||||
.collect();
|
||||
|
||||
let mut filter = ConnectionFilter::default();
|
||||
assert!(filter.filter_transaction(&transactions[0].hash(), &transactions[0], None));
|
||||
|
||||
filter.known_transaction(&transactions[0].hash());
|
||||
assert!(!filter.filter_transaction(&transactions[0].hash(), &transactions[0], None));
|
||||
|
||||
for transaction in transactions.iter().skip(1).take(MAX_LAST_TRANSACTIONS_TO_STORE - 1) {
|
||||
filter.known_transaction(&transaction.hash());
|
||||
assert!(!filter.filter_transaction(&transactions[0].hash(), &transactions[0], None));
|
||||
}
|
||||
|
||||
filter.known_transaction(&transactions[MAX_LAST_TRANSACTIONS_TO_STORE].hash());
|
||||
assert!(filter.filter_transaction(&transactions[0].hash(), &transactions[0], None));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -67,8 +67,6 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
|
|||
trace!(target: "sync", "Starting new sync session with peer#{}", peer_index);
|
||||
|
||||
// request inventory from peer
|
||||
// TODO: bitcoind doesn't respond to the `getheaders` request while it is synchronizing
|
||||
// but it answers to the `inventory` request
|
||||
self.executor.lock().execute(SynchronizationTask::RequestBlocksHeaders(peer_index));
|
||||
}
|
||||
|
||||
|
|
|
@ -1163,14 +1163,15 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
|
|||
let blocks_headers_to_verify: Vec<_> = blocks_to_verify.iter().map(|&(ref h, ref b)| (h.clone(), b.header().clone())).collect();
|
||||
chain.verify_blocks(blocks_headers_to_verify);
|
||||
// remember that we are verifying block from this peer
|
||||
self.verifying_blocks_by_peer.insert(block_hash.clone(), peer_index);
|
||||
for verifying_block_hash in blocks_to_verify.iter().map(|&(ref h, _)| h.clone()) {
|
||||
self.verifying_blocks_by_peer.insert(verifying_block_hash, peer_index);
|
||||
}
|
||||
match self.verifying_blocks_futures.entry(peer_index) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
entry.get_mut().0.insert(block_hash.clone());
|
||||
entry.get_mut().0.extend(blocks_to_verify.iter().map(|&(ref h, _)| h.clone()));
|
||||
},
|
||||
Entry::Vacant(entry) => {
|
||||
let mut block_hashes = HashSet::new();
|
||||
block_hashes.insert(block_hash.clone());
|
||||
let block_hashes: HashSet<_> = blocks_to_verify.iter().map(|&(ref h, _)| h.clone()).collect();
|
||||
entry.insert((block_hashes, Vec::new()));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,7 +9,7 @@ use message::common::{InventoryVector, InventoryType};
|
|||
use db;
|
||||
use chain::{BlockHeader, Transaction};
|
||||
use primitives::hash::H256;
|
||||
use synchronization_chain::{ChainRef, TransactionState};
|
||||
use synchronization_chain::{Chain, ChainRef, TransactionState};
|
||||
use synchronization_executor::{Task, TaskExecutor};
|
||||
use synchronization_client::FilteredInventory;
|
||||
use message::types;
|
||||
|
@ -26,7 +26,6 @@ pub trait Server : Send + Sync + 'static {
|
|||
|
||||
/// Synchronization requests server
|
||||
pub struct SynchronizationServer {
|
||||
chain: ChainRef,
|
||||
queue_ready: Arc<Condvar>,
|
||||
queue: Arc<Mutex<ServerQueue>>,
|
||||
worker_thread: Option<thread::JoinHandle<()>>,
|
||||
|
@ -87,10 +86,6 @@ impl IndexedServerTask {
|
|||
}
|
||||
|
||||
impl IndexedServerTask {
|
||||
fn ignore(id: u32) -> Self {
|
||||
IndexedServerTask::new(ServerTask::Ignore, ServerTaskIndex::Final(id))
|
||||
}
|
||||
|
||||
pub fn future<T: Server>(self, peer_index: usize, server: Weak<T>) -> BoxFuture<(), ()> {
|
||||
lazy(move || {
|
||||
server.upgrade().map(|s| s.add_task(peer_index, self));
|
||||
|
@ -102,15 +97,14 @@ impl IndexedServerTask {
|
|||
#[derive(Debug, PartialEq)]
|
||||
pub enum ServerTask {
|
||||
ServeGetData(FilteredInventory),
|
||||
ServeGetBlocks(db::BestBlock, H256),
|
||||
ServeGetHeaders(db::BestBlock, H256),
|
||||
ServeGetBlocks(Vec<H256>, H256),
|
||||
ServeGetHeaders(Vec<H256>, H256),
|
||||
ServeGetBlockTxn(H256, Vec<usize>),
|
||||
ServeMempool,
|
||||
ReturnNotFound(Vec<InventoryVector>),
|
||||
ReturnBlock(H256),
|
||||
ReturnMerkleBlock(types::MerkleBlock),
|
||||
ReturnTransaction(Transaction),
|
||||
Ignore,
|
||||
}
|
||||
|
||||
impl SynchronizationServer {
|
||||
|
@ -118,7 +112,6 @@ impl SynchronizationServer {
|
|||
let queue_ready = Arc::new(Condvar::new());
|
||||
let queue = Arc::new(Mutex::new(ServerQueue::new(queue_ready.clone())));
|
||||
let mut server = SynchronizationServer {
|
||||
chain: chain.clone(),
|
||||
queue_ready: queue_ready.clone(),
|
||||
queue: queue.clone(),
|
||||
worker_thread: None,
|
||||
|
@ -129,14 +122,14 @@ impl SynchronizationServer {
|
|||
server
|
||||
}
|
||||
|
||||
fn locate_known_block_hash(&self, block_locator_hashes: Vec<H256>) -> Option<db::BestBlock> {
|
||||
fn locate_known_block_hash(chain: &Chain, block_locator_hashes: &Vec<H256>) -> Option<db::BestBlock> {
|
||||
block_locator_hashes.into_iter()
|
||||
.filter_map(|hash| SynchronizationServer::locate_best_known_block_hash(&self.chain, &hash))
|
||||
.filter_map(|hash| SynchronizationServer::locate_best_known_block_hash(&chain, hash))
|
||||
.nth(0)
|
||||
}
|
||||
|
||||
fn locate_known_block_header(&self, block_locator_hashes: Vec<H256>) -> Option<db::BestBlock> {
|
||||
self.locate_known_block_hash(block_locator_hashes)
|
||||
fn locate_known_block_header(chain: &Chain, block_locator_hashes: &Vec<H256>) -> Option<db::BestBlock> {
|
||||
SynchronizationServer::locate_known_block_hash(chain, block_locator_hashes)
|
||||
}
|
||||
|
||||
fn server_worker<T: TaskExecutor>(queue_ready: Arc<Condvar>, queue: Arc<Mutex<ServerQueue>>, chain: ChainRef, executor: Arc<Mutex<T>>) {
|
||||
|
@ -217,31 +210,50 @@ impl SynchronizationServer {
|
|||
queue.lock().task_processed(peer_index);
|
||||
},
|
||||
// `getblocks` => `inventory`
|
||||
ServerTask::ServeGetBlocks(best_block, hash_stop) => {
|
||||
let blocks_hashes = SynchronizationServer::blocks_hashes_after(&chain, &best_block, &hash_stop, 500);
|
||||
if !blocks_hashes.is_empty() {
|
||||
trace!(target: "sync", "Going to respond with inventory with {} items to peer#{}", blocks_hashes.len(), peer_index);
|
||||
let inventory = blocks_hashes.into_iter().map(|hash| InventoryVector {
|
||||
inv_type: InventoryType::MessageBlock,
|
||||
hash: hash,
|
||||
}).collect();
|
||||
executor.lock().execute(Task::SendInventory(peer_index, inventory, indexed_task.id));
|
||||
} else {
|
||||
assert_eq!(indexed_task.id, ServerTaskIndex::None);
|
||||
ServerTask::ServeGetBlocks(block_locator_hashes, hash_stop) => {
|
||||
assert_eq!(indexed_task.id, ServerTaskIndex::None);
|
||||
|
||||
let chain = chain.read();
|
||||
if let Some(best_common_block) = SynchronizationServer::locate_known_block_hash(&chain, &block_locator_hashes) {
|
||||
trace!(target: "sync", "Best common block with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash);
|
||||
|
||||
let blocks_hashes = SynchronizationServer::blocks_hashes_after(&chain, &best_common_block, &hash_stop, 500);
|
||||
if !blocks_hashes.is_empty() {
|
||||
trace!(target: "sync", "Going to respond with inventory with {} items to peer#{}", blocks_hashes.len(), peer_index);
|
||||
let inventory = blocks_hashes.into_iter().map(|hash| InventoryVector {
|
||||
inv_type: InventoryType::MessageBlock,
|
||||
hash: hash,
|
||||
}).collect();
|
||||
executor.lock().execute(Task::SendInventory(peer_index, inventory, indexed_task.id));
|
||||
}
|
||||
}
|
||||
else {
|
||||
trace!(target: "sync", "No common blocks with peer#{}", peer_index);
|
||||
}
|
||||
// inform that we have processed task for peer
|
||||
queue.lock().task_processed(peer_index);
|
||||
},
|
||||
// `getheaders` => `headers`
|
||||
ServerTask::ServeGetHeaders(best_block, hash_stop) => {
|
||||
// What if we have no common blocks with peer at all? Maybe drop connection or penalize peer?
|
||||
// https://github.com/ethcore/parity-bitcoin/pull/91#discussion_r86734568
|
||||
let blocks_headers = SynchronizationServer::blocks_headers_after(&chain, &best_block, &hash_stop, 2000);
|
||||
if !blocks_headers.is_empty() {
|
||||
trace!(target: "sync", "Going to respond with blocks headers with {} items to peer#{}", blocks_headers.len(), peer_index);
|
||||
executor.lock().execute(Task::SendHeaders(peer_index, blocks_headers, indexed_task.id));
|
||||
} else if let Some(response_id) = indexed_task.id.raw() {
|
||||
executor.lock().execute(Task::Ignore(peer_index, response_id));
|
||||
ServerTask::ServeGetHeaders(block_locator_hashes, hash_stop) => {
|
||||
let chain = chain.read();
|
||||
if let Some(best_common_block) = SynchronizationServer::locate_known_block_header(&chain, &block_locator_hashes) {
|
||||
trace!(target: "sync", "Best common block header with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash.to_reversed_str());
|
||||
|
||||
// What if we have no common blocks with peer at all? Maybe drop connection or penalize peer?
|
||||
// https://github.com/ethcore/parity-bitcoin/pull/91#discussion_r86734568
|
||||
let blocks_headers = SynchronizationServer::blocks_headers_after(&chain, &best_common_block, &hash_stop, 2000);
|
||||
if !blocks_headers.is_empty() {
|
||||
trace!(target: "sync", "Going to respond with blocks headers with {} items to peer#{}", blocks_headers.len(), peer_index);
|
||||
executor.lock().execute(Task::SendHeaders(peer_index, blocks_headers, indexed_task.id));
|
||||
} else if let Some(response_id) = indexed_task.id.raw() {
|
||||
executor.lock().execute(Task::Ignore(peer_index, response_id));
|
||||
}
|
||||
}
|
||||
else {
|
||||
trace!(target: "sync", "No common blocks headers with peer#{}", peer_index);
|
||||
if let Some(response_id) = indexed_task.id.raw() {
|
||||
executor.lock().execute(Task::Ignore(peer_index, response_id));
|
||||
}
|
||||
}
|
||||
// inform that we have processed task for peer
|
||||
queue.lock().task_processed(peer_index);
|
||||
|
@ -326,18 +338,11 @@ impl SynchronizationServer {
|
|||
ServerTask::ReturnTransaction(transaction) => {
|
||||
executor.lock().execute(Task::SendTransaction(peer_index, transaction));
|
||||
}
|
||||
// ignore
|
||||
ServerTask::Ignore => {
|
||||
let response_id = indexed_task.id.raw().expect("do not schedule redundant ignore task");
|
||||
executor.lock().execute(Task::Ignore(peer_index, response_id));
|
||||
queue.lock().task_processed(peer_index);
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn blocks_hashes_after(chain: &ChainRef, best_block: &db::BestBlock, hash_stop: &H256, max_hashes: u32) -> Vec<H256> {
|
||||
let chain = chain.read();
|
||||
fn blocks_hashes_after(chain: &Chain, best_block: &db::BestBlock, hash_stop: &H256, max_hashes: u32) -> Vec<H256> {
|
||||
// check that chain has not reorganized since task was queued
|
||||
if chain.block_hash(best_block.number).map(|h| h != best_block.hash).unwrap_or(true) {
|
||||
return Vec::new();
|
||||
|
@ -354,8 +359,7 @@ impl SynchronizationServer {
|
|||
.collect()
|
||||
}
|
||||
|
||||
fn blocks_headers_after(chain: &ChainRef, best_block: &db::BestBlock, hash_stop: &H256, max_hashes: u32) -> Vec<BlockHeader> {
|
||||
let chain = chain.read();
|
||||
fn blocks_headers_after(chain: &Chain, best_block: &db::BestBlock, hash_stop: &H256, max_hashes: u32) -> Vec<BlockHeader> {
|
||||
// check that chain has not reorganized since task was queued
|
||||
if chain.block_hash(best_block.number).map(|h| h != best_block.hash).unwrap_or(true) {
|
||||
return Vec::new();
|
||||
|
@ -373,8 +377,7 @@ impl SynchronizationServer {
|
|||
}
|
||||
|
||||
|
||||
fn locate_best_known_block_hash(chain: &ChainRef, hash: &H256) -> Option<db::BestBlock> {
|
||||
let chain = chain.read();
|
||||
fn locate_best_known_block_hash(chain: &Chain, hash: &H256) -> Option<db::BestBlock> {
|
||||
match chain.block_number(hash) {
|
||||
Some(number) => Some(db::BestBlock {
|
||||
number: number,
|
||||
|
@ -420,33 +423,15 @@ impl Server for SynchronizationServer {
|
|||
Some(task)
|
||||
}
|
||||
|
||||
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) -> Option<IndexedServerTask> {
|
||||
if let Some(best_common_block) = self.locate_known_block_hash(message.block_locator_hashes) {
|
||||
trace!(target: "sync", "Best common block with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash);
|
||||
let task = IndexedServerTask::new(ServerTask::ServeGetBlocks(best_common_block, message.hash_stop), ServerTaskIndex::None);
|
||||
Some(task)
|
||||
}
|
||||
else {
|
||||
trace!(target: "sync", "No common blocks with peer#{}", peer_index);
|
||||
None
|
||||
}
|
||||
fn serve_getblocks(&self, _peer_index: usize, message: types::GetBlocks) -> Option<IndexedServerTask> {
|
||||
let task = IndexedServerTask::new(ServerTask::ServeGetBlocks(message.block_locator_hashes, message.hash_stop), ServerTaskIndex::None);
|
||||
Some(task)
|
||||
}
|
||||
|
||||
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: Option<u32>) -> Option<IndexedServerTask> {
|
||||
if let Some(best_common_block) = self.locate_known_block_header(message.block_locator_hashes) {
|
||||
trace!(target: "sync", "Best common block header with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash.to_reversed_str());
|
||||
let server_task_index = id.map_or_else(|| ServerTaskIndex::None, ServerTaskIndex::Final);
|
||||
let task = IndexedServerTask::new(ServerTask::ServeGetHeaders(best_common_block, message.hash_stop), server_task_index);
|
||||
Some(task)
|
||||
}
|
||||
else {
|
||||
trace!(target: "sync", "No common blocks headers with peer#{}", peer_index);
|
||||
if let Some(id) = id {
|
||||
Some(IndexedServerTask::ignore(id))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
fn serve_getheaders(&self, _peer_index: usize, message: types::GetHeaders, id: Option<u32>) -> Option<IndexedServerTask> {
|
||||
let server_task_index = id.map_or_else(|| ServerTaskIndex::None, ServerTaskIndex::Final);
|
||||
let task = IndexedServerTask::new(ServerTask::ServeGetHeaders(message.block_locator_hashes, message.hash_stop), server_task_index);
|
||||
Some(task)
|
||||
}
|
||||
|
||||
fn serve_get_block_txn(&self, _peer_index: usize, block_hash: H256, indexes: Vec<usize>) -> Option<IndexedServerTask> {
|
||||
|
@ -581,18 +566,12 @@ pub mod tests {
|
|||
}
|
||||
|
||||
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) -> Option<IndexedServerTask> {
|
||||
self.tasks.lock().push((peer_index, ServerTask::ServeGetBlocks(db::BestBlock {
|
||||
number: 0,
|
||||
hash: message.block_locator_hashes[0].clone(),
|
||||
}, message.hash_stop)));
|
||||
self.tasks.lock().push((peer_index, ServerTask::ServeGetBlocks(message.block_locator_hashes, message.hash_stop)));
|
||||
None
|
||||
}
|
||||
|
||||
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, _id: Option<u32>) -> Option<IndexedServerTask> {
|
||||
self.tasks.lock().push((peer_index, ServerTask::ServeGetHeaders(db::BestBlock {
|
||||
number: 0,
|
||||
hash: message.block_locator_hashes[0].clone(),
|
||||
}, message.hash_stop)));
|
||||
self.tasks.lock().push((peer_index, ServerTask::ServeGetHeaders(message.block_locator_hashes, message.hash_stop)));
|
||||
None
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue