Merge pull request #227 from ethcore/serve_compact_blocks_requests
Serve compact blocks requests in getdata
This commit is contained in:
commit
2c4759f979
|
@ -443,4 +443,36 @@ mod tests {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn local_node_serves_compactblock() {
|
||||||
|
let (_, _, _, server, local_node) = create_local_node();
|
||||||
|
|
||||||
|
let genesis = test_data::genesis();
|
||||||
|
let b1 = test_data::block_builder().header().parent(genesis.hash()).build()
|
||||||
|
.transaction().output().value(10).build().build()
|
||||||
|
.build(); // genesis -> b1
|
||||||
|
let b1_hash = b1.hash();
|
||||||
|
|
||||||
|
// This peer will provide blocks
|
||||||
|
let peer_index1 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
|
||||||
|
local_node.on_peer_block(peer_index1, types::Block { block: b1.clone() });
|
||||||
|
|
||||||
|
// This peer will receive compact block
|
||||||
|
let peer_index2 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
|
||||||
|
local_node.on_peer_getdata(peer_index2, types::GetData {inventory: vec![
|
||||||
|
InventoryVector { inv_type: InventoryType::MessageCompactBlock, hash: b1_hash.clone() },
|
||||||
|
]});
|
||||||
|
let tasks = server.take_tasks();
|
||||||
|
assert_eq!(tasks.len(), 1);
|
||||||
|
match tasks[0] {
|
||||||
|
(_, ServerTask::ServeGetData(ref gd)) => {
|
||||||
|
assert_eq!(gd.filtered.len(), 0);
|
||||||
|
assert_eq!(gd.unfiltered.len(), 0);
|
||||||
|
assert_eq!(gd.notfound.len(), 0);
|
||||||
|
assert_eq!(gd.compacted.len(), 1);
|
||||||
|
},
|
||||||
|
_ => panic!("unexpected"),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -238,7 +238,9 @@ pub struct Config {
|
||||||
pub struct FilteredInventory {
|
pub struct FilteredInventory {
|
||||||
/// Merkleblock messages + transactions to send after
|
/// Merkleblock messages + transactions to send after
|
||||||
pub filtered: Vec<(types::MerkleBlock, Vec<(H256, Transaction)>)>,
|
pub filtered: Vec<(types::MerkleBlock, Vec<(H256, Transaction)>)>,
|
||||||
/// Rest of inventory with MessageTx, MessageBlock, MessageCompactBlock inventory types
|
/// Compactblock messages
|
||||||
|
pub compacted: Vec<types::CompactBlock>,
|
||||||
|
/// Rest of inventory with MessageTx, MessageBlock inventory types
|
||||||
pub unfiltered: Vec<InventoryVector>,
|
pub unfiltered: Vec<InventoryVector>,
|
||||||
/// Items that were supposed to be filtered, but we know nothing about these
|
/// Items that were supposed to be filtered, but we know nothing about these
|
||||||
pub notfound: Vec<InventoryVector>,
|
pub notfound: Vec<InventoryVector>,
|
||||||
|
@ -300,6 +302,7 @@ impl FilteredInventory {
|
||||||
pub fn with_unfiltered(unfiltered: Vec<InventoryVector>) -> Self {
|
pub fn with_unfiltered(unfiltered: Vec<InventoryVector>) -> Self {
|
||||||
FilteredInventory {
|
FilteredInventory {
|
||||||
filtered: Vec::new(),
|
filtered: Vec::new(),
|
||||||
|
compacted: Vec::new(),
|
||||||
unfiltered: unfiltered,
|
unfiltered: unfiltered,
|
||||||
notfound: Vec::new(),
|
notfound: Vec::new(),
|
||||||
}
|
}
|
||||||
|
@ -309,6 +312,7 @@ impl FilteredInventory {
|
||||||
pub fn with_notfound(notfound: Vec<InventoryVector>) -> Self {
|
pub fn with_notfound(notfound: Vec<InventoryVector>) -> Self {
|
||||||
FilteredInventory {
|
FilteredInventory {
|
||||||
filtered: Vec::new(),
|
filtered: Vec::new(),
|
||||||
|
compacted: Vec::new(),
|
||||||
unfiltered: Vec::new(),
|
unfiltered: Vec::new(),
|
||||||
notfound: notfound,
|
notfound: notfound,
|
||||||
}
|
}
|
||||||
|
@ -460,9 +464,10 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
|
||||||
|
|
||||||
/// Filter inventory from `getdata` message for given peer
|
/// Filter inventory from `getdata` message for given peer
|
||||||
fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec<InventoryVector>) -> FilteredInventory {
|
fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec<InventoryVector>) -> FilteredInventory {
|
||||||
let chain = self.chain.read();
|
let storage = { self.chain.read().storage() };
|
||||||
let mut filter = self.peers.filter_mut(peer_index);
|
let mut filter = self.peers.filter_mut(peer_index);
|
||||||
let mut filtered: Vec<(types::MerkleBlock, Vec<(H256, Transaction)>)> = Vec::new();
|
let mut filtered: Vec<(types::MerkleBlock, Vec<(H256, Transaction)>)> = Vec::new();
|
||||||
|
let mut compacted: Vec<types::CompactBlock> = Vec::new();
|
||||||
let mut unfiltered: Vec<InventoryVector> = Vec::new();
|
let mut unfiltered: Vec<InventoryVector> = Vec::new();
|
||||||
let mut notfound: Vec<InventoryVector> = Vec::new();
|
let mut notfound: Vec<InventoryVector> = Vec::new();
|
||||||
|
|
||||||
|
@ -473,12 +478,30 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
|
||||||
// 2) build && send `merkleblock` message for this block
|
// 2) build && send `merkleblock` message for this block
|
||||||
// 3) send all matching transactions after this block
|
// 3) send all matching transactions after this block
|
||||||
InventoryType::MessageFilteredBlock => {
|
InventoryType::MessageFilteredBlock => {
|
||||||
match chain.storage().block(db::BlockRef::Hash(item.hash.clone())) {
|
match storage.block(db::BlockRef::Hash(item.hash.clone())) {
|
||||||
None => notfound.push(item),
|
None => notfound.push(item),
|
||||||
Some(block) => match filter.build_merkle_block(block) {
|
Some(block) => match filter.build_merkle_block(block) {
|
||||||
None => notfound.push(item),
|
None => notfound.push(item),
|
||||||
Some(merkleblock) => filtered.push((merkleblock.merkleblock, merkleblock.matching_transactions)),
|
Some(merkleblock) => filtered.push((merkleblock.merkleblock, merkleblock.matching_transactions)),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
},
|
||||||
|
// peer asks for compact block:
|
||||||
|
InventoryType::MessageCompactBlock => {
|
||||||
|
match storage.block(db::BlockRef::Hash(item.hash.clone())) {
|
||||||
|
None => notfound.push(item),
|
||||||
|
Some(block) => {
|
||||||
|
let indexed_block: IndexedBlock = block.into();
|
||||||
|
let prefilled_transactions_indexes = indexed_block.transactions().enumerate()
|
||||||
|
// we do not filter by fee rate here, because it only reasonable for non-mined transactions
|
||||||
|
.filter(|&(_, (h, t))| filter.filter_transaction(h, t, None))
|
||||||
|
.map(|(idx, _)| idx)
|
||||||
|
.collect();
|
||||||
|
let compact_block = types::CompactBlock {
|
||||||
|
header: build_compact_block(indexed_block, prefilled_transactions_indexes),
|
||||||
|
};
|
||||||
|
compacted.push(compact_block);
|
||||||
|
},
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
// these will be filtered (found/not found) in sync server
|
// these will be filtered (found/not found) in sync server
|
||||||
|
@ -488,6 +511,7 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
|
||||||
|
|
||||||
FilteredInventory {
|
FilteredInventory {
|
||||||
filtered: filtered,
|
filtered: filtered,
|
||||||
|
compacted: compacted,
|
||||||
unfiltered: unfiltered,
|
unfiltered: unfiltered,
|
||||||
notfound: notfound,
|
notfound: notfound,
|
||||||
}
|
}
|
||||||
|
|
|
@ -104,6 +104,7 @@ pub enum ServerTask {
|
||||||
ReturnNotFound(Vec<InventoryVector>),
|
ReturnNotFound(Vec<InventoryVector>),
|
||||||
ReturnBlock(H256),
|
ReturnBlock(H256),
|
||||||
ReturnMerkleBlock(types::MerkleBlock),
|
ReturnMerkleBlock(types::MerkleBlock),
|
||||||
|
ReturnCompactBlock(types::CompactBlock),
|
||||||
ReturnTransaction(Transaction),
|
ReturnTransaction(Transaction),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -168,6 +169,10 @@ impl SynchronizationServer {
|
||||||
new_tasks.extend(transactions.into_iter().map(|(_, t)|
|
new_tasks.extend(transactions.into_iter().map(|(_, t)|
|
||||||
IndexedServerTask::new(ServerTask::ReturnTransaction(t), ServerTaskIndex::None)));
|
IndexedServerTask::new(ServerTask::ReturnTransaction(t), ServerTaskIndex::None)));
|
||||||
}
|
}
|
||||||
|
// process compactblock items
|
||||||
|
for compactblock in inventory.compacted {
|
||||||
|
new_tasks.push(IndexedServerTask::new(ServerTask::ReturnCompactBlock(compactblock), ServerTaskIndex::None));
|
||||||
|
}
|
||||||
// extend with unknown merkleitems
|
// extend with unknown merkleitems
|
||||||
unknown_items.extend(inventory.notfound);
|
unknown_items.extend(inventory.notfound);
|
||||||
// process unfiltered items
|
// process unfiltered items
|
||||||
|
@ -191,7 +196,11 @@ impl SynchronizationServer {
|
||||||
None => unknown_items.push(item),
|
None => unknown_items.push(item),
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
_ => (), // TODO: process other inventory types
|
// we have no enough information here => it must be filtered by caller
|
||||||
|
InventoryType::MessageCompactBlock => unreachable!(),
|
||||||
|
// we have no enough information here => it must be filtered by caller
|
||||||
|
InventoryType::MessageFilteredBlock => unreachable!(),
|
||||||
|
_ => (),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -334,6 +343,10 @@ impl SynchronizationServer {
|
||||||
ServerTask::ReturnMerkleBlock(merkleblock) => {
|
ServerTask::ReturnMerkleBlock(merkleblock) => {
|
||||||
executor.lock().execute(Task::SendMerkleBlock(peer_index, merkleblock));
|
executor.lock().execute(Task::SendMerkleBlock(peer_index, merkleblock));
|
||||||
},
|
},
|
||||||
|
// `cmpctblock`
|
||||||
|
ServerTask::ReturnCompactBlock(compactblock) => {
|
||||||
|
executor.lock().execute(Task::SendCompactBlocks(peer_index, vec![compactblock.header]))
|
||||||
|
}
|
||||||
// `tx`
|
// `tx`
|
||||||
ServerTask::ReturnTransaction(transaction) => {
|
ServerTask::ReturnTransaction(transaction) => {
|
||||||
executor.lock().execute(Task::SendTransaction(peer_index, transaction));
|
executor.lock().execute(Task::SendTransaction(peer_index, transaction));
|
||||||
|
|
Loading…
Reference in New Issue