serve compact blocks requests in getdata
This commit is contained in:
parent
bd07583fb9
commit
1b7850e31e
|
@ -443,4 +443,36 @@ mod tests {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn local_node_serves_compactblock() {
|
||||
let (_, _, _, server, local_node) = create_local_node();
|
||||
|
||||
let genesis = test_data::genesis();
|
||||
let b1 = test_data::block_builder().header().parent(genesis.hash()).build()
|
||||
.transaction().output().value(10).build().build()
|
||||
.build(); // genesis -> b1
|
||||
let b1_hash = b1.hash();
|
||||
|
||||
// This peer will provide blocks
|
||||
let peer_index1 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
|
||||
local_node.on_peer_block(peer_index1, types::Block { block: b1.clone() });
|
||||
|
||||
// This peer will receive compact block
|
||||
let peer_index2 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
|
||||
local_node.on_peer_getdata(peer_index2, types::GetData {inventory: vec![
|
||||
InventoryVector { inv_type: InventoryType::MessageCompactBlock, hash: b1_hash.clone() },
|
||||
]});
|
||||
let tasks = server.take_tasks();
|
||||
assert_eq!(tasks.len(), 1);
|
||||
match tasks[0] {
|
||||
(_, ServerTask::ServeGetData(ref gd)) => {
|
||||
assert_eq!(gd.filtered.len(), 0);
|
||||
assert_eq!(gd.unfiltered.len(), 0);
|
||||
assert_eq!(gd.notfound.len(), 0);
|
||||
assert_eq!(gd.compacted.len(), 1);
|
||||
},
|
||||
_ => panic!("unexpected"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -238,7 +238,9 @@ pub struct Config {
|
|||
pub struct FilteredInventory {
|
||||
/// Merkleblock messages + transactions to send after
|
||||
pub filtered: Vec<(types::MerkleBlock, Vec<(H256, Transaction)>)>,
|
||||
/// Rest of inventory with MessageTx, MessageBlock, MessageCompactBlock inventory types
|
||||
/// Compactblock messages
|
||||
pub compacted: Vec<types::CompactBlock>,
|
||||
/// Rest of inventory with MessageTx, MessageBlock inventory types
|
||||
pub unfiltered: Vec<InventoryVector>,
|
||||
/// Items that were supposed to be filtered, but we know nothing about these
|
||||
pub notfound: Vec<InventoryVector>,
|
||||
|
@ -300,6 +302,7 @@ impl FilteredInventory {
|
|||
pub fn with_unfiltered(unfiltered: Vec<InventoryVector>) -> Self {
|
||||
FilteredInventory {
|
||||
filtered: Vec::new(),
|
||||
compacted: Vec::new(),
|
||||
unfiltered: unfiltered,
|
||||
notfound: Vec::new(),
|
||||
}
|
||||
|
@ -309,6 +312,7 @@ impl FilteredInventory {
|
|||
pub fn with_notfound(notfound: Vec<InventoryVector>) -> Self {
|
||||
FilteredInventory {
|
||||
filtered: Vec::new(),
|
||||
compacted: Vec::new(),
|
||||
unfiltered: Vec::new(),
|
||||
notfound: notfound,
|
||||
}
|
||||
|
@ -460,9 +464,10 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
|
|||
|
||||
/// Filter inventory from `getdata` message for given peer
|
||||
fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec<InventoryVector>) -> FilteredInventory {
|
||||
let chain = self.chain.read();
|
||||
let storage = { self.chain.read().storage() };
|
||||
let mut filter = self.peers.filter_mut(peer_index);
|
||||
let mut filtered: Vec<(types::MerkleBlock, Vec<(H256, Transaction)>)> = Vec::new();
|
||||
let mut compacted: Vec<types::CompactBlock> = Vec::new();
|
||||
let mut unfiltered: Vec<InventoryVector> = Vec::new();
|
||||
let mut notfound: Vec<InventoryVector> = Vec::new();
|
||||
|
||||
|
@ -473,12 +478,30 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
|
|||
// 2) build && send `merkleblock` message for this block
|
||||
// 3) send all matching transactions after this block
|
||||
InventoryType::MessageFilteredBlock => {
|
||||
match chain.storage().block(db::BlockRef::Hash(item.hash.clone())) {
|
||||
match storage.block(db::BlockRef::Hash(item.hash.clone())) {
|
||||
None => notfound.push(item),
|
||||
Some(block) => match filter.build_merkle_block(block) {
|
||||
None => notfound.push(item),
|
||||
Some(merkleblock) => filtered.push((merkleblock.merkleblock, merkleblock.matching_transactions)),
|
||||
}
|
||||
},
|
||||
}
|
||||
},
|
||||
// peer asks for compact block:
|
||||
InventoryType::MessageCompactBlock => {
|
||||
match storage.block(db::BlockRef::Hash(item.hash.clone())) {
|
||||
None => notfound.push(item),
|
||||
Some(block) => {
|
||||
let indexed_block: IndexedBlock = block.into();
|
||||
let prefilled_transactions_indexes = indexed_block.transactions().enumerate()
|
||||
// we do not filter by fee rate here, because it only reasonable for non-mined transactions
|
||||
.filter(|&(_, (h, t))| filter.filter_transaction(h, t, None))
|
||||
.map(|(idx, _)| idx)
|
||||
.collect();
|
||||
let compact_block = types::CompactBlock {
|
||||
header: build_compact_block(indexed_block, prefilled_transactions_indexes),
|
||||
};
|
||||
compacted.push(compact_block);
|
||||
},
|
||||
}
|
||||
},
|
||||
// these will be filtered (found/not found) in sync server
|
||||
|
@ -488,6 +511,7 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
|
|||
|
||||
FilteredInventory {
|
||||
filtered: filtered,
|
||||
compacted: compacted,
|
||||
unfiltered: unfiltered,
|
||||
notfound: notfound,
|
||||
}
|
||||
|
|
|
@ -104,6 +104,7 @@ pub enum ServerTask {
|
|||
ReturnNotFound(Vec<InventoryVector>),
|
||||
ReturnBlock(H256),
|
||||
ReturnMerkleBlock(types::MerkleBlock),
|
||||
ReturnCompactBlock(types::CompactBlock),
|
||||
ReturnTransaction(Transaction),
|
||||
}
|
||||
|
||||
|
@ -168,6 +169,10 @@ impl SynchronizationServer {
|
|||
new_tasks.extend(transactions.into_iter().map(|(_, t)|
|
||||
IndexedServerTask::new(ServerTask::ReturnTransaction(t), ServerTaskIndex::None)));
|
||||
}
|
||||
// process compactblock items
|
||||
for compactblock in inventory.compacted {
|
||||
new_tasks.push(IndexedServerTask::new(ServerTask::ReturnCompactBlock(compactblock), ServerTaskIndex::None));
|
||||
}
|
||||
// extend with unknown merkleitems
|
||||
unknown_items.extend(inventory.notfound);
|
||||
// process unfiltered items
|
||||
|
@ -191,7 +196,11 @@ impl SynchronizationServer {
|
|||
None => unknown_items.push(item),
|
||||
}
|
||||
},
|
||||
_ => (), // TODO: process other inventory types
|
||||
// we have no enough information here => it must be filtered by caller
|
||||
InventoryType::MessageCompactBlock => unreachable!(),
|
||||
// we have no enough information here => it must be filtered by caller
|
||||
InventoryType::MessageFilteredBlock => unreachable!(),
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -334,6 +343,10 @@ impl SynchronizationServer {
|
|||
ServerTask::ReturnMerkleBlock(merkleblock) => {
|
||||
executor.lock().execute(Task::SendMerkleBlock(peer_index, merkleblock));
|
||||
},
|
||||
// `cmpctblock`
|
||||
ServerTask::ReturnCompactBlock(compactblock) => {
|
||||
executor.lock().execute(Task::SendCompactBlocks(peer_index, vec![compactblock.header]))
|
||||
}
|
||||
// `tx`
|
||||
ServerTask::ReturnTransaction(transaction) => {
|
||||
executor.lock().execute(Task::SendTransaction(peer_index, transaction));
|
||||
|
|
Loading…
Reference in New Issue