Merge branch 'master' into sync_connect_transaction_verifier

This commit is contained in:
Svyatoslav Nikolsky 2016-11-30 16:40:07 +03:00
commit b160f85e5f
12 changed files with 175 additions and 73 deletions

View File

@ -64,7 +64,7 @@ pub type SharedStore = std::sync::Arc<Store + Send + Sync>;
pub use best_block::BestBlock;
pub use storage::{Storage, Store};
pub use error::Error;
pub use error::{Error, ConsistencyError};
pub use kvdb::Database;
pub use transaction_provider::{TransactionProvider, AsTransactionProvider, PreviousTransactionOutputProvider};
pub use transaction_meta_provider::TransactionMetaProvider;

View File

@ -224,7 +224,7 @@ impl Storage {
/// all transaction meta is removed
/// DOES NOT update best block
fn decanonize_block(&self, context: &mut UpdateContext, hash: &H256) -> Result<(), Error> {
trace!(target: "reorg", "Decanonizing block {}", hash.to_reversed_str());
trace!(target: "db", "Decanonizing block {}", hash.to_reversed_str());
// ensure that block is of the main chain
try!(self.block_number(hash).ok_or(Error::not_main(hash)));
@ -299,6 +299,8 @@ impl Storage {
}
fn canonize_block(&self, context: &mut UpdateContext, at_height: u32, hash: &H256) -> Result<(), Error> {
trace!(target: "db", "Canonizing block {}", hash.to_reversed_str());
let block = try!(self.block_by_hash(hash).ok_or(Error::unknown_hash(hash)));
try!(self.update_transactions_meta(context, at_height, &mut block.transactions()));
@ -377,7 +379,7 @@ impl Storage {
// lock will be held until the end of the routine
let mut best_block = self.best_block.write();
let mut context = UpdateContext::new(&self.database);
let mut context = UpdateContext::new(&self.database, hash);
let mut result = Vec::new();
let mut best_number = try!(self.best_number().ok_or(Error::Consistency(ConsistencyError::NoBestBlock)));
loop {
@ -454,7 +456,7 @@ impl BlockStapler for Storage {
// ! lock will be held during the entire insert routine
let mut best_block = self.best_block.write();
let mut context = UpdateContext::new(&self.database);
let mut context = UpdateContext::new(&self.database, block.hash());
let block_hash = block.hash();
@ -577,6 +579,7 @@ impl BlockStapler for Storage {
// write accumulated transactions meta
try!(context.apply(&self.database));
trace!(target: "db", "Best block now ({}, {})", &new_best_hash.to_reversed_str(), &new_best_number);
// updating locked best block
*best_block = Some(BestBlock { hash: new_best_hash, number: new_best_number });
@ -1179,7 +1182,7 @@ mod tests {
.expect("Transaction meta for the genesis coinbase transaction should exist");
assert!(genesis_meta.is_spent(0), "Genesis coinbase should be recorded as spent because block#1 transaction spends it");
let mut update_context = UpdateContext::new(&store.database);
let mut update_context = UpdateContext::new(&store.database, &block_hash);
store.decanonize_block(&mut update_context, &block_hash)
.expect("Decanonizing block #1 which was just inserted should not fail");
update_context.apply(&store.database).unwrap();

View File

@ -26,6 +26,7 @@ struct TestData {
blocks: HashMap<H256, chain::Block>,
heights: HashMap<u32, H256>,
hashes: HashMap<H256, u32>,
insert_errors: HashMap<H256, Error>,
}
impl TestStorage {
@ -61,6 +62,10 @@ impl TestStorage {
let genesis_block: Block = "0100000000000000000000000000000000000000000000000000000000000000000000003ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4a29ab5f49ffff001d1dac2b7c0101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000".into();
TestStorage::with_blocks(&vec![genesis_block])
}
pub fn insert_error(&mut self, hash: H256, err: Error) {
self.data.write().insert_errors.insert(hash, err);
}
}
impl BlockProvider for TestStorage {
@ -120,6 +125,11 @@ impl BlockStapler for TestStorage {
fn insert_block(&self, block: &chain::Block) -> Result<BlockInsertedChain, Error> {
let hash = block.hash();
let mut data = self.data.write();
if let Some(err) = data.insert_errors.remove(&hash) {
return Err(err);
}
match data.blocks.entry(hash.clone()) {
Entry::Occupied(mut entry) => {
replace(entry.get_mut(), block.clone());

View File

@ -9,14 +9,16 @@ pub struct UpdateContext {
pub meta: HashMap<H256, TransactionMeta>,
pub db_transaction: DBTransaction,
meta_snapshot: Option<HashMap<H256, TransactionMeta>>,
target: H256,
}
impl UpdateContext {
pub fn new(db: &Database) -> Self {
pub fn new(db: &Database, target: &H256) -> Self {
UpdateContext {
meta: HashMap::new(),
db_transaction: db.transaction(),
meta_snapshot: None,
target: target.clone(),
}
}
@ -27,6 +29,8 @@ impl UpdateContext {
}
try!(db.write(self.db_transaction));
trace!("Applied transaction for block {:?}", &self.target.to_reversed_str());
Ok(())
}

View File

@ -1,4 +1,5 @@
use std::{io, fs, path};
use std::collections::BTreeSet;
use ser::{ReadIterator, deserialize_iterator, Error as ReaderError};
use block::Block;
use fs::read_blk_dir;
@ -25,9 +26,11 @@ impl Iterator for BlkFile {
}
pub fn open_blk_dir<P>(path: P) -> Result<BlkDir, io::Error> where P: AsRef<path::Path> {
let iter = try!(read_blk_dir(path))
let files = read_blk_dir(path)?.collect::<Result<BTreeSet<_>, _>>()?;
let iter = files.into_iter()
// flatten results...
.flat_map(|entry| entry.and_then(|file| open_blk_file(file.path)))
.flat_map(|file| open_blk_file(file.path))
// flat iterators over each block in each file
.flat_map(|file| file);

View File

@ -1,4 +1,4 @@
use std::{io, fs, path};
use std::{io, fs, path, cmp};
/// Creates an iterator over all blk .dat files
pub fn read_blk_dir<P>(path: P) -> Result<ReadBlkDir, io::Error> where P: AsRef<path::Path> {
@ -13,10 +13,23 @@ pub struct ReadBlkDir {
read_dir: fs::ReadDir,
}
#[derive(PartialEq, Eq)]
pub struct BlkEntry {
pub path: path::PathBuf,
}
impl cmp::PartialOrd for BlkEntry {
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
cmp::PartialOrd::partial_cmp(&self.path, &other.path)
}
}
impl cmp::Ord for BlkEntry {
fn cmp(&self, other: &Self) -> cmp::Ordering {
cmp::Ord::cmp(&self.path, &other.path)
}
}
fn is_blk_file_name(file_name: &str) -> bool {
if file_name.len() != 12 || !file_name.starts_with("blk") || !file_name.ends_with(".dat") {
return false;

View File

@ -31,8 +31,8 @@ impl ConsensusParams {
}
pub fn is_bip30_exception(&self, hash: &H256, height: u32) -> bool {
(height == 91842 && hash == &H256::from_reversed_str("0x00000000000a4d0a398161ffc163c503763b1f4360639393e0e4c8e300e0caec")) ||
(height == 91880 && hash == &H256::from_reversed_str("0x00000000000743f190a18c5577a3c2d2a1f610ae9601ac046a38084ccb7cd721"))
(height == 91842 && hash == &H256::from_reversed_str("00000000000a4d0a398161ffc163c503763b1f4360639393e0e4c8e300e0caec")) ||
(height == 91880 && hash == &H256::from_reversed_str("00000000000743f190a18c5577a3c2d2a1f610ae9601ac046a38084ccb7cd721"))
}
}

View File

@ -9,7 +9,7 @@ use synchronization_verifier::{Verifier, SyncVerifier, VerificationSink, Verific
use primitives::hash::H256;
use super::Error;
pub const MAX_ORPHANED_BLOCKS: usize = 64;
pub const MAX_ORPHANED_BLOCKS: usize = 1024;
pub struct BlocksWriter {
storage: db::SharedStore,

View File

@ -92,7 +92,8 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
self.client.lock().on_new_transactions_inventory(peer_index, transactions_inventory);
}
// TODO: process other inventory types
// currently we do not setup connection filter => skip InventoryType::MessageFilteredBlock
// currently we do not send sendcmpct message => skip InventoryType::MessageCompactBlock
}
pub fn on_peer_getdata(&self, peer_index: usize, message: types::GetData) {
@ -442,4 +443,36 @@ mod tests {
}
}
}
#[test]
fn local_node_serves_compactblock() {
let (_, _, _, server, local_node) = create_local_node();
let genesis = test_data::genesis();
let b1 = test_data::block_builder().header().parent(genesis.hash()).build()
.transaction().output().value(10).build().build()
.build(); // genesis -> b1
let b1_hash = b1.hash();
// This peer will provide blocks
let peer_index1 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
local_node.on_peer_block(peer_index1, types::Block { block: b1.clone() });
// This peer will receive compact block
let peer_index2 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
local_node.on_peer_getdata(peer_index2, types::GetData {inventory: vec![
InventoryVector { inv_type: InventoryType::MessageCompactBlock, hash: b1_hash.clone() },
]});
let tasks = server.take_tasks();
assert_eq!(tasks.len(), 1);
match tasks[0] {
(_, ServerTask::ServeGetData(ref gd)) => {
assert_eq!(gd.filtered.len(), 0);
assert_eq!(gd.unfiltered.len(), 0);
assert_eq!(gd.notfound.len(), 0);
assert_eq!(gd.compacted.len(), 1);
},
_ => panic!("unexpected"),
}
}
}

View File

@ -238,7 +238,9 @@ pub struct Config {
pub struct FilteredInventory {
/// Merkleblock messages + transactions to send after
pub filtered: Vec<(types::MerkleBlock, Vec<(H256, Transaction)>)>,
/// Rest of inventory with MessageTx, MessageBlock, MessageCompactBlock inventory types
/// Compactblock messages
pub compacted: Vec<types::CompactBlock>,
/// Rest of inventory with MessageTx, MessageBlock inventory types
pub unfiltered: Vec<InventoryVector>,
/// Items that were supposed to be filtered, but we know nothing about these
pub notfound: Vec<InventoryVector>,
@ -302,6 +304,7 @@ impl FilteredInventory {
pub fn with_unfiltered(unfiltered: Vec<InventoryVector>) -> Self {
FilteredInventory {
filtered: Vec::new(),
compacted: Vec::new(),
unfiltered: unfiltered,
notfound: Vec::new(),
}
@ -311,6 +314,7 @@ impl FilteredInventory {
pub fn with_notfound(notfound: Vec<InventoryVector>) -> Self {
FilteredInventory {
filtered: Vec::new(),
compacted: Vec::new(),
unfiltered: Vec::new(),
notfound: notfound,
}
@ -467,9 +471,10 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
/// Filter inventory from `getdata` message for given peer
fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec<InventoryVector>) -> FilteredInventory {
let chain = self.chain.read();
let storage = { self.chain.read().storage() };
let mut filter = self.peers.filter_mut(peer_index);
let mut filtered: Vec<(types::MerkleBlock, Vec<(H256, Transaction)>)> = Vec::new();
let mut compacted: Vec<types::CompactBlock> = Vec::new();
let mut unfiltered: Vec<InventoryVector> = Vec::new();
let mut notfound: Vec<InventoryVector> = Vec::new();
@ -480,12 +485,30 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
// 2) build && send `merkleblock` message for this block
// 3) send all matching transactions after this block
InventoryType::MessageFilteredBlock => {
match chain.storage().block(db::BlockRef::Hash(item.hash.clone())) {
match storage.block(db::BlockRef::Hash(item.hash.clone())) {
None => notfound.push(item),
Some(block) => match filter.build_merkle_block(block) {
None => notfound.push(item),
Some(merkleblock) => filtered.push((merkleblock.merkleblock, merkleblock.matching_transactions)),
}
},
}
},
// peer asks for compact block:
InventoryType::MessageCompactBlock => {
match storage.block(db::BlockRef::Hash(item.hash.clone())) {
None => notfound.push(item),
Some(block) => {
let indexed_block: IndexedBlock = block.into();
let prefilled_transactions_indexes = indexed_block.transactions().enumerate()
// we do not filter by fee rate here, because it only reasonable for non-mined transactions
.filter(|&(_, (h, t))| filter.filter_transaction(h, t, None))
.map(|(idx, _)| idx)
.collect();
let compact_block = types::CompactBlock {
header: build_compact_block(indexed_block, prefilled_transactions_indexes),
};
compacted.push(compact_block);
},
}
},
// these will be filtered (found/not found) in sync server
@ -495,6 +518,7 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
FilteredInventory {
filtered: filtered,
compacted: compacted,
unfiltered: unfiltered,
notfound: notfound,
}
@ -926,8 +950,6 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
}
));
// TODO: start management worker only when synchronization is started
// currently impossible because there is no way to call Interval::new with Remote && Handle is not-Send
{
let peers_config = ManagePeersConfig::default();
let unknown_config = ManageUnknownBlocksConfig::default();
@ -1024,7 +1046,7 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
})
.collect();
Some(Task::SendCompactBlocks(peer_index, block_header_and_ids, ServerTaskIndex::None))
Some(Task::SendCompactBlocks(peer_index, block_header_and_ids))
},
BlockAnnouncementType::SendInventory => {
let inventory: Vec<_> = new_blocks_hashes.iter()
@ -1035,7 +1057,7 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
})
.collect();
if !inventory.is_empty() {
Some(Task::SendInventory(peer_index, inventory, ServerTaskIndex::None))
Some(Task::SendInventory(peer_index, inventory))
} else {
None
}
@ -1065,7 +1087,7 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
})
.collect();
if !inventory.is_empty() {
Some(Task::SendInventory(peer_index, inventory, ServerTaskIndex::None))
Some(Task::SendInventory(peer_index, inventory))
} else {
None
}
@ -1216,17 +1238,12 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
}
/// Process new peer transaction
fn process_peer_transaction(&mut self, peer_index: Option<usize>, hash: H256, transaction: Transaction, relay: bool) -> Option<VecDeque<(H256, Transaction)>> {
fn process_peer_transaction(&mut self, _peer_index: Option<usize>, hash: H256, transaction: Transaction, relay: bool) -> Option<VecDeque<(H256, Transaction)>> {
// if we are in synchronization state, we will ignore this message
if self.state.is_synchronizing() {
return None;
}
// mark peer as useful (TODO: remove after self.all_peers() would be all peers, not sync one)
if let Some(peer_index) = peer_index {
self.peers.useful_peer(peer_index);
}
// else => verify transaction + it's orphans and then add to the memory pool
let mut chain = self.chain.write();
@ -1576,7 +1593,7 @@ pub mod tests {
assert!(tasks.iter().any(|t| t == &Task::RequestMemoryPool(2)));
let inventory = vec![InventoryVector { inv_type: InventoryType::MessageBlock, hash: block.hash() }];
assert!(tasks.iter().any(|t| t == &Task::SendInventory(1, inventory.clone(), ServerTaskIndex::None)));
assert!(tasks.iter().any(|t| t == &Task::SendInventory(1, inventory.clone())));
}
#[test]
@ -1854,7 +1871,19 @@ pub mod tests {
#[test]
fn sync_after_db_insert_nonfatal_fail() {
// TODO: implement me
use db::Store;
let mut storage = db::TestStorage::with_genesis_block();
let block = test_data::block_h1();
storage.insert_error(block.hash(), db::Error::Consistency(db::ConsistencyError::NoBestBlock));
let best_genesis = storage.best_block().unwrap();
let (_, _, _, chain, sync) = create_sync(Some(Arc::new(storage)), None);
let mut sync = sync.lock();
sync.on_peer_block(1, block.into());
assert_eq!(chain.read().best_block(), best_genesis);
}
#[test]
@ -2121,7 +2150,7 @@ pub mod tests {
{
let tasks = executor.lock().take_tasks();
let inventory = vec![InventoryVector { inv_type: InventoryType::MessageBlock, hash: b2.hash() }];
assert_eq!(tasks, vec![Task::RequestBlocksHeaders(2), Task::SendInventory(1, inventory, ServerTaskIndex::None)]);
assert_eq!(tasks, vec![Task::RequestBlocksHeaders(2), Task::SendInventory(1, inventory)]);
}
sync.on_new_blocks_headers(1, vec![b3.block_header.clone()]);
@ -2131,7 +2160,7 @@ pub mod tests {
{
let tasks = executor.lock().take_tasks();
let inventory = vec![InventoryVector { inv_type: InventoryType::MessageBlock, hash: b3.hash() }];
assert!(tasks.iter().any(|t| t == &Task::SendInventory(2, inventory.clone(), ServerTaskIndex::None)));
assert!(tasks.iter().any(|t| t == &Task::SendInventory(2, inventory.clone())));
}
}
@ -2139,17 +2168,16 @@ pub mod tests {
fn relay_new_transaction_when_in_saturated_state() {
let (_, _, executor, _, sync) = create_sync(None, None);
let tx1: Transaction = test_data::TransactionBuilder::with_output(10).into();
let tx2: Transaction = test_data::TransactionBuilder::with_output(20).into();
let tx2_hash = tx2.hash();
let tx: Transaction = test_data::TransactionBuilder::with_output(20).into();
let tx_hash = tx.hash();
let mut sync = sync.lock();
sync.on_peer_transaction(1, tx1);
sync.on_peer_transaction(2, tx2);
sync.on_peer_connected(1);
sync.on_peer_transaction(2, tx);
let tasks = { executor.lock().take_tasks() };
let inventory = vec![InventoryVector { inv_type: InventoryType::MessageTx, hash: tx2_hash }];
assert_eq!(tasks, vec![Task::SendInventory(1, inventory, ServerTaskIndex::None)]);
let inventory = vec![InventoryVector { inv_type: InventoryType::MessageTx, hash: tx_hash }];
assert_eq!(tasks, vec![Task::SendInventory(1, inventory)]);
}
#[test]
@ -2190,9 +2218,9 @@ pub mod tests {
let tasks = { executor.lock().take_tasks() };
let inventory = vec![InventoryVector { inv_type: InventoryType::MessageTx, hash: tx1_hash }];
assert_eq!(tasks, vec![
Task::SendInventory(1, inventory.clone(), ServerTaskIndex::None),
Task::SendInventory(3, inventory.clone(), ServerTaskIndex::None),
Task::SendInventory(4, inventory.clone(), ServerTaskIndex::None),
Task::SendInventory(1, inventory.clone()),
Task::SendInventory(3, inventory.clone()),
Task::SendInventory(4, inventory.clone()),
]);
// tx2 is relayed to peers: 2, 3, 4
@ -2201,9 +2229,9 @@ pub mod tests {
let tasks = { executor.lock().take_tasks() };
let inventory = vec![InventoryVector { inv_type: InventoryType::MessageTx, hash: tx2_hash }];
assert_eq!(tasks, vec![
Task::SendInventory(2, inventory.clone(), ServerTaskIndex::None),
Task::SendInventory(3, inventory.clone(), ServerTaskIndex::None),
Task::SendInventory(4, inventory.clone(), ServerTaskIndex::None),
Task::SendInventory(2, inventory.clone()),
Task::SendInventory(3, inventory.clone()),
Task::SendInventory(4, inventory.clone()),
]);
}
@ -2229,7 +2257,7 @@ pub mod tests {
let headers = vec![b0.block_header.clone()];
assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1),
Task::SendHeaders(2, headers, ServerTaskIndex::None),
Task::SendInventory(3, inventory, ServerTaskIndex::None),
Task::SendInventory(3, inventory),
]);
}
@ -2273,13 +2301,13 @@ pub mod tests {
inv_type: InventoryType::MessageTx,
hash: tx1_hash.clone(),
}
], ServerTaskIndex::None),
]),
Task::SendInventory(4, vec![
InventoryVector {
inv_type: InventoryType::MessageTx,
hash: tx1_hash.clone(),
}
], ServerTaskIndex::None),
]),
]);
}
@ -2316,9 +2344,9 @@ pub mod tests {
assert_eq!(tasks.len(), 3);
assert_eq!(tasks[0], Task::RequestBlocksHeaders(1));
match tasks[1] {
Task::SendCompactBlocks(2, _, _) => (),
Task::SendCompactBlocks(2, _) => (),
_ => panic!("unexpected task"),
}
assert_eq!(tasks[2], Task::SendInventory(3, inventory, ServerTaskIndex::None));
assert_eq!(tasks[2], Task::SendInventory(3, inventory));
}
}

View File

@ -17,7 +17,6 @@ pub trait TaskExecutor : Send + 'static {
fn execute(&mut self, task: Task);
}
// TODO: get rid of unneeded ServerTaskIndex-es
/// Synchronization task for the peer.
#[derive(Debug, PartialEq)]
pub enum Task {
@ -30,7 +29,7 @@ pub enum Task {
/// Request memory pool contents
RequestMemoryPool(usize),
/// Send block.
SendBlock(usize, Block, ServerTaskIndex),
SendBlock(usize, Block),
/// Send merkleblock
SendMerkleBlock(usize, types::MerkleBlock),
/// Send transaction
@ -38,13 +37,13 @@ pub enum Task {
/// Send block transactions
SendBlockTxn(usize, H256, Vec<Transaction>),
/// Send notfound
SendNotFound(usize, Vec<InventoryVector>, ServerTaskIndex),
SendNotFound(usize, Vec<InventoryVector>),
/// Send inventory
SendInventory(usize, Vec<InventoryVector>, ServerTaskIndex),
SendInventory(usize, Vec<InventoryVector>),
/// Send headers
SendHeaders(usize, Vec<BlockHeader>, ServerTaskIndex),
/// Send compact blocks
SendCompactBlocks(usize, Vec<BlockHeaderAndIDs>, ServerTaskIndex),
SendCompactBlocks(usize, Vec<BlockHeaderAndIDs>),
/// Notify io about ignored request
Ignore(usize, u32),
}
@ -130,13 +129,12 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
connection.send_getdata(&getdata);
}
},
Task::SendBlock(peer_index, block, id) => {
Task::SendBlock(peer_index, block) => {
let block_message = types::Block {
block: block,
};
if let Some(connection) = self.peers.get_mut(&peer_index) {
assert_eq!(id.raw(), None);
trace!(target: "sync", "Sending block {:?} to peer#{}", block_message.block.hash().to_reversed_str(), peer_index);
connection.send_block(&block_message);
}
@ -170,24 +168,22 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
connection.send_block_txn(&transactions_message);
}
},
Task::SendNotFound(peer_index, unknown_inventory, id) => {
Task::SendNotFound(peer_index, unknown_inventory) => {
let notfound = types::NotFound {
inventory: unknown_inventory,
};
if let Some(connection) = self.peers.get_mut(&peer_index) {
assert_eq!(id.raw(), None);
trace!(target: "sync", "Sending notfound to peer#{} with {} items", peer_index, notfound.inventory.len());
connection.send_notfound(&notfound);
}
},
Task::SendInventory(peer_index, inventory, id) => {
Task::SendInventory(peer_index, inventory) => {
let inventory = types::Inv {
inventory: inventory,
};
if let Some(connection) = self.peers.get_mut(&peer_index) {
assert_eq!(id.raw(), None);
trace!(target: "sync", "Sending inventory to peer#{} with {} items", peer_index, inventory.inventory.len());
connection.send_inventory(&inventory);
}
@ -205,9 +201,8 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
}
}
},
Task::SendCompactBlocks(peer_index, compact_blocks, id) => {
Task::SendCompactBlocks(peer_index, compact_blocks) => {
if let Some(connection) = self.peers.get_mut(&peer_index) {
assert_eq!(id.raw(), None);
for compact_block in compact_blocks {
trace!(target: "sync", "Sending compact_block {:?} to peer#{}", compact_block.header.hash(), peer_index);
connection.send_compact_block(&types::CompactBlock {

View File

@ -104,6 +104,7 @@ pub enum ServerTask {
ReturnNotFound(Vec<InventoryVector>),
ReturnBlock(H256),
ReturnMerkleBlock(types::MerkleBlock),
ReturnCompactBlock(types::CompactBlock),
ReturnTransaction(Transaction),
}
@ -168,6 +169,10 @@ impl SynchronizationServer {
new_tasks.extend(transactions.into_iter().map(|(_, t)|
IndexedServerTask::new(ServerTask::ReturnTransaction(t), ServerTaskIndex::None)));
}
// process compactblock items
for compactblock in inventory.compacted {
new_tasks.push(IndexedServerTask::new(ServerTask::ReturnCompactBlock(compactblock), ServerTaskIndex::None));
}
// extend with unknown merkleitems
unknown_items.extend(inventory.notfound);
// process unfiltered items
@ -191,7 +196,11 @@ impl SynchronizationServer {
None => unknown_items.push(item),
}
},
_ => (), // TODO: process other inventory types
// we have no enough information here => it must be filtered by caller
InventoryType::MessageCompactBlock => unreachable!(),
// we have no enough information here => it must be filtered by caller
InventoryType::MessageFilteredBlock => unreachable!(),
_ => (),
}
}
}
@ -224,7 +233,7 @@ impl SynchronizationServer {
inv_type: InventoryType::MessageBlock,
hash: hash,
}).collect();
executor.lock().execute(Task::SendInventory(peer_index, inventory, indexed_task.id));
executor.lock().execute(Task::SendInventory(peer_index, inventory));
}
}
else {
@ -309,7 +318,7 @@ impl SynchronizationServer {
.collect();
if !inventory.is_empty() {
trace!(target: "sync", "Going to respond with {} memory-pool transactions ids to peer#{}", inventory.len(), peer_index);
executor.lock().execute(Task::SendInventory(peer_index, inventory, indexed_task.id));
executor.lock().execute(Task::SendInventory(peer_index, inventory));
} else {
assert_eq!(indexed_task.id, ServerTaskIndex::None);
}
@ -318,7 +327,7 @@ impl SynchronizationServer {
},
// `notfound`
ServerTask::ReturnNotFound(inventory) => {
executor.lock().execute(Task::SendNotFound(peer_index, inventory, indexed_task.id));
executor.lock().execute(Task::SendNotFound(peer_index, inventory));
// inform that we have processed task for peer
queue.lock().task_processed(peer_index);
},
@ -326,7 +335,7 @@ impl SynchronizationServer {
ServerTask::ReturnBlock(block_hash) => {
let block = chain.read().storage().block(db::BlockRef::Hash(block_hash))
.expect("we have checked that block exists in ServeGetData; db is append-only; qed");
executor.lock().execute(Task::SendBlock(peer_index, block, indexed_task.id));
executor.lock().execute(Task::SendBlock(peer_index, block));
// inform that we have processed task for peer
queue.lock().task_processed(peer_index);
},
@ -334,6 +343,10 @@ impl SynchronizationServer {
ServerTask::ReturnMerkleBlock(merkleblock) => {
executor.lock().execute(Task::SendMerkleBlock(peer_index, merkleblock));
},
// `cmpctblock`
ServerTask::ReturnCompactBlock(compactblock) => {
executor.lock().execute(Task::SendCompactBlocks(peer_index, vec![compactblock.header]))
}
// `tx`
ServerTask::ReturnTransaction(transaction) => {
executor.lock().execute(Task::SendTransaction(peer_index, transaction));
@ -609,7 +622,7 @@ pub mod tests {
server.serve_getdata(0, FilteredInventory::with_unfiltered(inventory.clone())).map(|t| server.add_task(0, t));
// => respond with notfound
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendNotFound(0, inventory, ServerTaskIndex::None)]);
assert_eq!(tasks, vec![Task::SendNotFound(0, inventory)]);
}
#[test]
@ -625,7 +638,7 @@ pub mod tests {
server.serve_getdata(0, FilteredInventory::with_unfiltered(inventory)).map(|t| server.add_task(0, t));
// => respond with block
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendBlock(0, test_data::genesis(), ServerTaskIndex::None)]);
assert_eq!(tasks, vec![Task::SendBlock(0, test_data::genesis())]);
}
#[test]
@ -659,7 +672,7 @@ pub mod tests {
hash: test_data::block_h1().hash(),
}];
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendInventory(0, inventory, ServerTaskIndex::None)]);
assert_eq!(tasks, vec![Task::SendInventory(0, inventory)]);
}
#[test]
@ -722,7 +735,7 @@ pub mod tests {
hash: transaction_hash,
}];
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendInventory(0, inventory, ServerTaskIndex::None)]);
assert_eq!(tasks, vec![Task::SendInventory(0, inventory)]);
}
#[test]
@ -764,7 +777,7 @@ pub mod tests {
server.serve_getdata(0, FilteredInventory::with_unfiltered(inventory.clone())).map(|t| server.add_task(0, t));
// => respond with notfound
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendNotFound(0, inventory, ServerTaskIndex::None)]);
assert_eq!(tasks, vec![Task::SendNotFound(0, inventory)]);
}
#[test]