Merge pull request #240 from ethcore/sync_fixing_todos
Fixing sync TODOs
This commit is contained in:
commit
e1210efd22
|
@ -24,7 +24,7 @@ pub struct ConnectionFilter {
|
|||
/// Filter update type.
|
||||
filter_flags: types::FilterFlags,
|
||||
/// Last blocks from peer.
|
||||
last_blocks: LinkedHashMap<H256, ()>,
|
||||
last_blocks: LinkedHashMap<H256, bool>,
|
||||
/// Last transactions from peer.
|
||||
last_transactions: LinkedHashMap<H256, ()>,
|
||||
/// Minimal fee in satoshis per 1000 bytes
|
||||
|
@ -91,14 +91,14 @@ impl ConnectionFilter {
|
|||
}
|
||||
|
||||
/// We have a knowledge that block with given hash is known to this connection
|
||||
pub fn known_block(&mut self, block_hash: &H256) {
|
||||
pub fn known_block(&mut self, block_hash: &H256, is_sent_compact: bool) {
|
||||
// remember that peer knows about this block
|
||||
if !self.last_blocks.contains_key(block_hash) {
|
||||
if self.last_blocks.len() == MAX_LAST_BLOCKS_TO_STORE {
|
||||
self.last_blocks.pop_front();
|
||||
}
|
||||
|
||||
self.last_blocks.insert(block_hash.clone(), ());
|
||||
self.last_blocks.insert(block_hash.clone(), is_sent_compact);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -114,6 +114,11 @@ impl ConnectionFilter {
|
|||
}
|
||||
}
|
||||
|
||||
/// Is compact block with this hash has been sent recently
|
||||
pub fn is_known_compact_block(&self, block_hash: &H256) -> bool {
|
||||
self.last_blocks.get(block_hash).cloned().unwrap_or(false)
|
||||
}
|
||||
|
||||
/// Check if block should be sent to this connection
|
||||
pub fn filter_block(&self, block_hash: &H256) -> bool {
|
||||
// check if block is known
|
||||
|
@ -662,15 +667,15 @@ pub mod tests {
|
|||
let mut filter = ConnectionFilter::default();
|
||||
assert!(filter.filter_block(&blocks[0].hash()));
|
||||
|
||||
filter.known_block(&blocks[0].hash());
|
||||
filter.known_block(&blocks[0].hash(), false);
|
||||
assert!(!filter.filter_block(&blocks[0].hash()));
|
||||
|
||||
for block in blocks.iter().skip(1).take(MAX_LAST_BLOCKS_TO_STORE - 1) {
|
||||
filter.known_block(&block.hash());
|
||||
filter.known_block(&block.hash(), false);
|
||||
assert!(!filter.filter_block(&blocks[0].hash()));
|
||||
}
|
||||
|
||||
filter.known_block(&blocks[MAX_LAST_BLOCKS_TO_STORE].hash());
|
||||
filter.known_block(&blocks[MAX_LAST_BLOCKS_TO_STORE].hash(), false);
|
||||
assert!(filter.filter_block(&blocks[0].hash()));
|
||||
}
|
||||
|
||||
|
@ -694,4 +699,13 @@ pub mod tests {
|
|||
filter.known_transaction(&transactions[MAX_LAST_TRANSACTIONS_TO_STORE].hash());
|
||||
assert!(filter.filter_transaction(&transactions[0].hash(), &transactions[0], None));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn known_compact_block() {
|
||||
let mut filter = ConnectionFilter::default();
|
||||
filter.known_block(&test_data::block_h1().hash(), true);
|
||||
filter.known_block(&test_data::block_h2().hash(), false);
|
||||
assert!(filter.is_known_compact_block(&test_data::block_h1().hash()));
|
||||
assert!(!filter.is_known_compact_block(&test_data::block_h2().hash()));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -216,7 +216,15 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
|
|||
pub fn on_peer_get_block_txn(&self, peer_index: usize, message: types::GetBlockTxn) {
|
||||
trace!(target: "sync", "Got `getblocktxn` message from peer#{}", peer_index);
|
||||
|
||||
self.server.serve_get_block_txn(peer_index, message.request.blockhash, message.request.indexes).map(|t| self.server.add_task(peer_index, t));
|
||||
// Upon receipt of a properly-formatted getblocktxn message, nodes which *recently provided the sender
|
||||
// of such a message a cmpctblock for the block hash identified in this message* MUST respond ...
|
||||
// => we should check if we have send cmpctblock before
|
||||
if {
|
||||
let mut client = self.client.lock();
|
||||
client.is_compact_block_sent_recently(peer_index, &message.request.blockhash)
|
||||
} {
|
||||
self.server.serve_get_block_txn(peer_index, message.request.blockhash, message.request.indexes).map(|t| self.server.add_task(peer_index, t));
|
||||
}
|
||||
}
|
||||
|
||||
pub fn on_peer_block_txn(&self, peer_index: usize, _message: types::BlockTxn) {
|
||||
|
@ -256,7 +264,7 @@ mod tests {
|
|||
use synchronization_chain::Chain;
|
||||
use p2p::{event_loop, OutboundSyncConnection, OutboundSyncConnectionRef};
|
||||
use message::types;
|
||||
use message::common::{InventoryVector, InventoryType};
|
||||
use message::common::{InventoryVector, InventoryType, BlockTransactionsRequest};
|
||||
use db;
|
||||
use super::LocalNode;
|
||||
use test_data;
|
||||
|
@ -475,4 +483,67 @@ mod tests {
|
|||
_ => panic!("unexpected"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn local_node_serves_get_block_txn_when_recently_sent_compact_block() {
|
||||
let (_, _, _, server, local_node) = create_local_node();
|
||||
|
||||
let genesis = test_data::genesis();
|
||||
let b1 = test_data::block_builder().header().parent(genesis.hash()).build()
|
||||
.transaction().output().value(10).build().build()
|
||||
.build(); // genesis -> b1
|
||||
let b1_hash = b1.hash();
|
||||
|
||||
// Append block
|
||||
let peer_index1 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
|
||||
local_node.on_peer_block(peer_index1, types::Block { block: b1.clone() });
|
||||
|
||||
// Request compact block
|
||||
let peer_index2 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
|
||||
local_node.on_peer_getdata(peer_index2, types::GetData {inventory: vec![
|
||||
InventoryVector { inv_type: InventoryType::MessageCompactBlock, hash: b1_hash.clone() },
|
||||
]});
|
||||
|
||||
// forget tasks
|
||||
server.take_tasks();
|
||||
|
||||
// Request compact transaction from this block
|
||||
local_node.on_peer_get_block_txn(peer_index2, types::GetBlockTxn {
|
||||
request: BlockTransactionsRequest {
|
||||
blockhash: b1_hash.clone(),
|
||||
indexes: vec![0],
|
||||
}
|
||||
});
|
||||
|
||||
let tasks = server.take_tasks();
|
||||
assert_eq!(tasks, vec![(2, ServerTask::ServeGetBlockTxn(b1_hash, vec![0]))]);
|
||||
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn local_node_not_serves_get_block_txn_when_compact_block_was_not_sent() {
|
||||
let (_, _, _, server, local_node) = create_local_node();
|
||||
|
||||
let genesis = test_data::genesis();
|
||||
let b1 = test_data::block_builder().header().parent(genesis.hash()).build()
|
||||
.transaction().output().value(10).build().build()
|
||||
.build(); // genesis -> b1
|
||||
let b1_hash = b1.hash();
|
||||
|
||||
// Append block
|
||||
let peer_index1 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
|
||||
local_node.on_peer_block(peer_index1, types::Block { block: b1.clone() });
|
||||
|
||||
// Request compact transaction from this block
|
||||
let peer_index2 = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
|
||||
local_node.on_peer_get_block_txn(peer_index2, types::GetBlockTxn {
|
||||
request: BlockTransactionsRequest {
|
||||
blockhash: b1_hash,
|
||||
indexes: vec![0],
|
||||
}
|
||||
});
|
||||
|
||||
let tasks = server.take_tasks();
|
||||
assert_eq!(tasks, vec![]);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -185,6 +185,7 @@ pub struct Information {
|
|||
pub trait Client : Send + 'static {
|
||||
fn best_block(&self) -> db::BestBlock;
|
||||
fn state(&self) -> State;
|
||||
fn is_compact_block_sent_recently(&mut self, peer_index: usize, hash: &H256) -> bool;
|
||||
fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec<InventoryVector>) -> FilteredInventory;
|
||||
fn on_peer_connected(&mut self, peer_index: usize);
|
||||
fn on_new_blocks_inventory(&mut self, peer_index: usize, blocks_hashes: Vec<H256>);
|
||||
|
@ -206,6 +207,7 @@ pub trait Client : Send + 'static {
|
|||
pub trait ClientCore : VerificationSink {
|
||||
fn best_block(&self) -> db::BestBlock;
|
||||
fn state(&self) -> State;
|
||||
fn is_compact_block_sent_recently(&mut self, peer_index: usize, hash: &H256) -> bool;
|
||||
fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec<InventoryVector>) -> FilteredInventory;
|
||||
fn on_peer_connected(&mut self, peer_index: usize);
|
||||
fn on_new_blocks_inventory(&mut self, peer_index: usize, blocks_hashes: Vec<H256>);
|
||||
|
@ -353,6 +355,10 @@ impl<T, U> Client for SynchronizationClient<T, U> where T: TaskExecutor, U: Veri
|
|||
self.core.lock().state()
|
||||
}
|
||||
|
||||
fn is_compact_block_sent_recently(&mut self, peer_index: usize, hash: &H256) -> bool {
|
||||
self.core.lock().is_compact_block_sent_recently(peer_index, hash)
|
||||
}
|
||||
|
||||
fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec<InventoryVector>) -> FilteredInventory {
|
||||
self.core.lock().filter_getdata_inventory(peer_index, inventory)
|
||||
}
|
||||
|
@ -469,6 +475,11 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
|
|||
self.state
|
||||
}
|
||||
|
||||
/// Returns true if compactblock with this hash has been sent to this peer recently
|
||||
fn is_compact_block_sent_recently(&mut self, peer_index: usize, hash: &H256) -> bool {
|
||||
self.peers.filter(peer_index).is_known_compact_block(hash)
|
||||
}
|
||||
|
||||
/// Filter inventory from `getdata` message for given peer
|
||||
fn filter_getdata_inventory(&mut self, peer_index: usize, inventory: Vec<InventoryVector>) -> FilteredInventory {
|
||||
let storage = { self.chain.read().storage() };
|
||||
|
@ -489,7 +500,10 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
|
|||
None => notfound.push(item),
|
||||
Some(block) => match filter.build_merkle_block(block) {
|
||||
None => notfound.push(item),
|
||||
Some(merkleblock) => filtered.push((merkleblock.merkleblock, merkleblock.matching_transactions)),
|
||||
Some(merkleblock) => {
|
||||
filtered.push((merkleblock.merkleblock, merkleblock.matching_transactions));
|
||||
filter.known_block(&item.hash, false);
|
||||
},
|
||||
},
|
||||
}
|
||||
},
|
||||
|
@ -508,6 +522,7 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
|
|||
header: build_compact_block(indexed_block, prefilled_transactions_indexes),
|
||||
};
|
||||
compacted.push(compact_block);
|
||||
filter.known_block(&item.hash, true);
|
||||
},
|
||||
}
|
||||
},
|
||||
|
|
|
@ -77,8 +77,6 @@ impl PeersConnections for LocalSynchronizationTaskExecutor {
|
|||
|
||||
impl TaskExecutor for LocalSynchronizationTaskExecutor {
|
||||
fn execute(&mut self, task: Task) {
|
||||
// TODO: what is types::GetBlocks::version here? (@ PR#37)
|
||||
|
||||
match task {
|
||||
Task::RequestBlocks(peer_index, blocks_hashes) => {
|
||||
let getdata = types::GetData {
|
||||
|
@ -97,7 +95,7 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
|
|||
Task::RequestBlocksHeaders(peer_index) => {
|
||||
let block_locator_hashes = self.chain.read().block_locator_hashes();
|
||||
let getheaders = types::GetHeaders {
|
||||
version: 0,
|
||||
version: 0, // this field is ignored by clients
|
||||
block_locator_hashes: block_locator_hashes,
|
||||
hash_stop: H256::default(),
|
||||
};
|
||||
|
|
|
@ -244,7 +244,7 @@ impl Peers {
|
|||
}
|
||||
|
||||
// remember that peer knows about this block
|
||||
self.filters.entry(peer_index).or_insert_with(ConnectionFilter::default).known_block(block_hash);
|
||||
self.filters.entry(peer_index).or_insert_with(ConnectionFilter::default).known_block(block_hash, false);
|
||||
}
|
||||
|
||||
/// Transaction is received from peer.
|
||||
|
|
|
@ -123,14 +123,27 @@ impl SynchronizationServer {
|
|||
server
|
||||
}
|
||||
|
||||
fn locate_known_block_hash(chain: &Chain, block_locator_hashes: &Vec<H256>) -> Option<db::BestBlock> {
|
||||
fn locate_known_block_hash(chain: &Chain, block_locator_hashes: &Vec<H256>, stop_hash: &H256) -> Option<db::BestBlock> {
|
||||
block_locator_hashes.into_iter()
|
||||
.filter_map(|hash| SynchronizationServer::locate_best_known_block_hash(&chain, hash))
|
||||
.nth(0)
|
||||
.or_else(|| if stop_hash != &H256::default() {
|
||||
if let Some(stop_hash_number) = chain.storage().block_number(stop_hash) {
|
||||
Some(db::BestBlock {
|
||||
number: stop_hash_number,
|
||||
hash: stop_hash.clone(),
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
fn locate_known_block_header(chain: &Chain, block_locator_hashes: &Vec<H256>) -> Option<db::BestBlock> {
|
||||
SynchronizationServer::locate_known_block_hash(chain, block_locator_hashes)
|
||||
fn locate_known_block_header(chain: &Chain, block_locator_hashes: &Vec<H256>, stop_hash: &H256) -> Option<db::BestBlock> {
|
||||
SynchronizationServer::locate_known_block_hash(chain, block_locator_hashes, stop_hash)
|
||||
}
|
||||
|
||||
fn server_worker<T: TaskExecutor>(queue_ready: Arc<Condvar>, queue: Arc<Mutex<ServerQueue>>, chain: ChainRef, executor: Arc<Mutex<T>>) {
|
||||
|
@ -223,22 +236,23 @@ impl SynchronizationServer {
|
|||
assert_eq!(indexed_task.id, ServerTaskIndex::None);
|
||||
|
||||
let chain = chain.read();
|
||||
if let Some(best_common_block) = SynchronizationServer::locate_known_block_hash(&chain, &block_locator_hashes) {
|
||||
trace!(target: "sync", "Best common block with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash);
|
||||
let blocks_hashes = match SynchronizationServer::locate_known_block_hash(&chain, &block_locator_hashes, &hash_stop) {
|
||||
Some(best_common_block) => {
|
||||
trace!(target: "sync", "Best common block with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash);
|
||||
SynchronizationServer::blocks_hashes_after(&chain, &best_common_block, &hash_stop, 500)
|
||||
},
|
||||
None => {
|
||||
trace!(target: "sync", "No common blocks with peer#{}", peer_index);
|
||||
Vec::new()
|
||||
},
|
||||
};
|
||||
|
||||
let blocks_hashes = SynchronizationServer::blocks_hashes_after(&chain, &best_common_block, &hash_stop, 500);
|
||||
if !blocks_hashes.is_empty() {
|
||||
trace!(target: "sync", "Going to respond with inventory with {} items to peer#{}", blocks_hashes.len(), peer_index);
|
||||
let inventory = blocks_hashes.into_iter().map(|hash| InventoryVector {
|
||||
inv_type: InventoryType::MessageBlock,
|
||||
hash: hash,
|
||||
}).collect();
|
||||
executor.lock().execute(Task::SendInventory(peer_index, inventory));
|
||||
}
|
||||
}
|
||||
else {
|
||||
trace!(target: "sync", "No common blocks with peer#{}", peer_index);
|
||||
}
|
||||
trace!(target: "sync", "Going to respond with inventory with {} items to peer#{}", blocks_hashes.len(), peer_index);
|
||||
let inventory = blocks_hashes.into_iter().map(|hash| InventoryVector {
|
||||
inv_type: InventoryType::MessageBlock,
|
||||
hash: hash,
|
||||
}).collect();
|
||||
executor.lock().execute(Task::SendInventory(peer_index, inventory));
|
||||
// inform that we have processed task for peer
|
||||
queue.lock().task_processed(peer_index);
|
||||
},
|
||||
|
@ -246,19 +260,15 @@ impl SynchronizationServer {
|
|||
ServerTask::ServeGetHeaders(block_locator_hashes, hash_stop) => {
|
||||
let chain = chain.read();
|
||||
|
||||
// TODO: if block_locator_hashes is empty => return hash_stop
|
||||
let blocks_headers = match SynchronizationServer::locate_known_block_header(&chain, &block_locator_hashes) {
|
||||
let blocks_headers = match SynchronizationServer::locate_known_block_header(&chain, &block_locator_hashes, &hash_stop) {
|
||||
Some(best_common_block) => {
|
||||
trace!(target: "sync", "Best common block header with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash.to_reversed_str());
|
||||
|
||||
// TODO: add test for this case
|
||||
// we must respond with empty headers message even if we have no common blocks with this peer
|
||||
SynchronizationServer::blocks_headers_after(&chain, &best_common_block, &hash_stop, 2000)
|
||||
},
|
||||
None => {
|
||||
trace!(target: "sync", "No common blocks headers with peer#{}", peer_index);
|
||||
Vec::new()
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
trace!(target: "sync", "Going to respond with blocks headers with {} items to peer#{}", blocks_headers.len(), peer_index);
|
||||
|
@ -447,8 +457,6 @@ impl Server for SynchronizationServer {
|
|||
}
|
||||
|
||||
fn serve_get_block_txn(&self, _peer_index: usize, block_hash: H256, indexes: Vec<usize>) -> Option<IndexedServerTask> {
|
||||
// TODO: Upon receipt of a properly-formatted getblocktxn message, nodes which *recently provided the sender
|
||||
// of such a message a cmpctblock for the block hash identified in this message* MUST respond ...
|
||||
let task = IndexedServerTask::new(ServerTask::ServeGetBlockTxn(block_hash, indexes), ServerTaskIndex::None);
|
||||
Some(task)
|
||||
}
|
||||
|
@ -650,9 +658,9 @@ pub mod tests {
|
|||
block_locator_hashes: vec![genesis_block_hash.clone()],
|
||||
hash_stop: H256::default(),
|
||||
}).map(|t| server.add_task(0, t));
|
||||
// => no response
|
||||
// => empty response
|
||||
let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout
|
||||
assert_eq!(tasks, vec![]);
|
||||
assert_eq!(tasks, vec![Task::SendInventory(0, vec![])]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -815,4 +823,48 @@ pub mod tests {
|
|||
Task::SendTransaction(0, tx_verified),
|
||||
]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn server_responds_with_nonempty_inventory_when_getdata_stop_hash_filled() {
|
||||
let (chain, executor, server) = create_synchronization_server();
|
||||
{
|
||||
let mut chain = chain.write();
|
||||
chain.insert_best_block(test_data::block_h1().hash(), &test_data::block_h1().into()).expect("no error");
|
||||
}
|
||||
// when asking with stop_hash
|
||||
server.serve_getblocks(0, types::GetBlocks {
|
||||
version: 0,
|
||||
block_locator_hashes: vec![],
|
||||
hash_stop: test_data::genesis().hash(),
|
||||
}).map(|t| server.add_task(0, t));
|
||||
// => respond with next block
|
||||
let inventory = vec![InventoryVector {
|
||||
inv_type: InventoryType::MessageBlock,
|
||||
hash: test_data::block_h1().hash(),
|
||||
}];
|
||||
let tasks = DummyTaskExecutor::wait_tasks(executor);
|
||||
assert_eq!(tasks, vec![Task::SendInventory(0, inventory)]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn server_responds_with_nonempty_headers_when_getdata_stop_hash_filled() {
|
||||
let (chain, executor, server) = create_synchronization_server();
|
||||
{
|
||||
let mut chain = chain.write();
|
||||
chain.insert_best_block(test_data::block_h1().hash(), &test_data::block_h1().into()).expect("no error");
|
||||
}
|
||||
// when asking with stop_hash
|
||||
let dummy_id = 6;
|
||||
server.serve_getheaders(0, types::GetHeaders {
|
||||
version: 0,
|
||||
block_locator_hashes: vec![],
|
||||
hash_stop: test_data::genesis().hash(),
|
||||
}, Some(dummy_id)).map(|t| server.add_task(0, t));
|
||||
// => respond with next block
|
||||
let headers = vec![
|
||||
test_data::block_h1().block_header,
|
||||
];
|
||||
let tasks = DummyTaskExecutor::wait_tasks(executor);
|
||||
assert_eq!(tasks, vec![Task::SendHeaders(0, headers, ServerTaskIndex::Final(dummy_id))]);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@ use primitives::hash::H256;
|
|||
use synchronization_chain::ChainRef;
|
||||
use verification::{ChainVerifier, Verify as VerificationVerify, Chain};
|
||||
use db::{SharedStore, IndexedBlock, PreviousTransactionOutputProvider};
|
||||
use time::get_time;
|
||||
|
||||
/// Verification events sink
|
||||
pub trait VerificationSink : Send + 'static {
|
||||
|
@ -169,11 +170,8 @@ fn execute_verification_task<T: VerificationSink, U: PreviousTransactionOutputPr
|
|||
}
|
||||
},
|
||||
VerificationTask::VerifyTransaction(height, transaction) => {
|
||||
// bitcoin: AcceptToMemoryPoolWorker
|
||||
|
||||
let time: u32 = 0; // TODO
|
||||
let sequence: usize = 1; // TODO: change to bool
|
||||
match verifier.verify_transaction(tx_output_provider, height, time, &transaction, sequence) {
|
||||
let time: u32 = get_time().sec as u32;
|
||||
match verifier.verify_transaction(tx_output_provider, height, time, &transaction, 1) {
|
||||
Ok(_) => sink.lock().on_transaction_verification_success(transaction),
|
||||
Err(e) => sink.lock().on_transaction_verification_error(&format!("{:?}", e), &transaction.hash()),
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue