fix block downloading to be parallelized and commited via the verifier (#540)
This commit is contained in:
parent
afd0e90a74
commit
7245d91fe9
|
@ -2449,6 +2449,7 @@ dependencies = [
|
|||
"tracing-log",
|
||||
"tracing-subscriber",
|
||||
"zebra-chain",
|
||||
"zebra-consensus",
|
||||
"zebra-network",
|
||||
"zebra-state",
|
||||
]
|
||||
|
|
|
@ -34,6 +34,7 @@ zebra-state = { path = "../zebra-state" }
|
|||
tracing-subscriber = { version = "0.2.6", features = ["tracing-log"] }
|
||||
tracing-error = "0.1.2"
|
||||
color-eyre = "0.5"
|
||||
zebra-consensus = { path = "../zebra-consensus/" }
|
||||
|
||||
[dev-dependencies]
|
||||
abscissa_core = { version = "0.5", features = ["testing"] }
|
||||
|
|
|
@ -30,7 +30,7 @@ use zebra_chain::block::BlockHeaderHash;
|
|||
mod sync;
|
||||
|
||||
// genesis
|
||||
static GENESIS: BlockHeaderHash = BlockHeaderHash([
|
||||
const GENESIS: BlockHeaderHash = BlockHeaderHash([
|
||||
8, 206, 61, 151, 49, 176, 0, 192, 131, 56, 69, 92, 138, 74, 107, 208, 93, 161, 110, 38, 177,
|
||||
29, 170, 27, 145, 113, 132, 236, 232, 15, 4, 0,
|
||||
]);
|
||||
|
@ -58,13 +58,13 @@ impl StartCmd {
|
|||
let config = app_config().network.clone();
|
||||
let state = zebra_state::on_disk::init(zebra_state::Config::default());
|
||||
let (peer_set, _address_book) = zebra_network::init(config, node).await;
|
||||
let verifier = zebra_consensus::verify::init(state.clone());
|
||||
|
||||
let mut syncer = sync::Syncer {
|
||||
peer_set,
|
||||
state,
|
||||
verifier,
|
||||
block_requests: FuturesUnordered::new(),
|
||||
downloading: HashSet::new(),
|
||||
downloaded: HashSet::new(),
|
||||
fanout: 4,
|
||||
prospective_tips: HashSet::new(),
|
||||
};
|
||||
|
|
|
@ -1,34 +1,36 @@
|
|||
use color_eyre::eyre::{eyre, Report};
|
||||
use futures::stream::{FuturesUnordered, StreamExt};
|
||||
use std::{collections::HashSet, iter, time::Duration};
|
||||
use std::{collections::HashSet, iter, sync::Arc, time::Duration};
|
||||
use tokio::time::delay_for;
|
||||
use tower::{Service, ServiceExt};
|
||||
use tracing_futures::Instrument;
|
||||
use zebra_chain::{block::BlockHeaderHash, types::BlockHeight};
|
||||
use zebra_chain::{
|
||||
block::{Block, BlockHeaderHash},
|
||||
types::BlockHeight,
|
||||
};
|
||||
|
||||
use zebra_network as zn;
|
||||
use zebra_state as zs;
|
||||
|
||||
pub struct Syncer<ZN, ZS>
|
||||
pub struct Syncer<ZN, ZS, ZV>
|
||||
where
|
||||
ZN: Service<zn::Request>,
|
||||
{
|
||||
pub peer_set: ZN,
|
||||
// TODO(jlusby): add validator
|
||||
pub state: ZS,
|
||||
pub verifier: ZV,
|
||||
pub prospective_tips: HashSet<BlockHeaderHash>,
|
||||
pub block_requests: FuturesUnordered<ZN::Future>,
|
||||
pub downloading: HashSet<BlockHeaderHash>,
|
||||
pub downloaded: HashSet<BlockHeaderHash>,
|
||||
pub fanout: NumReq,
|
||||
}
|
||||
|
||||
impl<ZN, ZS> Syncer<ZN, ZS>
|
||||
impl<ZN, ZS, ZV> Syncer<ZN, ZS, ZV>
|
||||
where
|
||||
ZN: Service<zn::Request, Response = zn::Response, Error = Error> + Send + Clone + 'static,
|
||||
ZN::Future: Send,
|
||||
ZS: Service<zs::Request, Response = zs::Response, Error = Error> + Send + Clone + 'static,
|
||||
ZS::Future: Send,
|
||||
ZV: Service<Arc<Block>, Response = BlockHeaderHash, Error = Error> + Send + Clone + 'static,
|
||||
ZV::Future: Send,
|
||||
{
|
||||
pub async fn run(&mut self) -> Result<(), Report> {
|
||||
loop {
|
||||
|
@ -41,9 +43,6 @@ where
|
|||
while !self.prospective_tips.is_empty() {
|
||||
info!("extending prospective tips");
|
||||
self.extend_tips().await?;
|
||||
|
||||
// TODO(jlusby): move this to a background task and check it for errors after each step.
|
||||
self.process_blocks().await?;
|
||||
}
|
||||
|
||||
delay_for(Duration::from_secs(15)).await;
|
||||
|
@ -161,12 +160,7 @@ where
|
|||
})
|
||||
.await;
|
||||
match res.map_err::<Report, _>(|e| eyre!(e)) {
|
||||
Ok(zn::Response::BlockHeaderHashes(hashes)) => {
|
||||
if hashes.is_empty() {
|
||||
tracing::debug!("skipping empty response");
|
||||
continue;
|
||||
}
|
||||
|
||||
Ok(zn::Response::BlockHeaderHashes(mut hashes)) => {
|
||||
// ExtendTips Step 3
|
||||
//
|
||||
// For each response, check whether the first hash in the
|
||||
|
@ -174,22 +168,26 @@ where
|
|||
// It indicates that the remote peer does not have any blocks
|
||||
// following the prospective tip.
|
||||
// TODO(jlusby): reject both main and test net genesis blocks
|
||||
if hashes[0] == super::GENESIS {
|
||||
tracing::debug!("skipping response that does not extend the tip");
|
||||
continue;
|
||||
match hashes.first() {
|
||||
Some(&super::GENESIS) => {
|
||||
tracing::debug!("skipping response that does not extend the tip");
|
||||
continue;
|
||||
}
|
||||
None => {
|
||||
tracing::debug!("skipping empty response");
|
||||
continue;
|
||||
}
|
||||
Some(_) => {}
|
||||
}
|
||||
|
||||
let new_tip = hashes.pop().expect("expected: hashes must have len > 0");
|
||||
|
||||
// ExtendTips Step 4
|
||||
//
|
||||
// Combine the last elements of the remaining responses into
|
||||
// a set, and add this set to the set of prospective tips.
|
||||
let new_tip = *hashes.last().expect("already checked is_empty");
|
||||
let _ = self.prospective_tips.insert(new_tip);
|
||||
|
||||
// ExtendTips Step 5
|
||||
//
|
||||
// Combine all elements of the remaining responses into a
|
||||
// set, and queue download and verification of those blocks
|
||||
download_set.extend(hashes);
|
||||
}
|
||||
Ok(r) => tracing::error!("unexpected response {:?}", r),
|
||||
|
@ -198,100 +196,77 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
self.request_blocks(download_set.into_iter().collect())
|
||||
.await?;
|
||||
// ExtendTips Step ??
|
||||
//
|
||||
// Remove tips that are already included behind one of the other
|
||||
// returned tips
|
||||
self.prospective_tips
|
||||
.retain(|tip| !download_set.contains(tip));
|
||||
|
||||
// ExtendTips Step 5
|
||||
//
|
||||
// Combine all elements of the remaining responses into a
|
||||
// set, and queue download and verification of those blocks
|
||||
self.request_blocks(
|
||||
download_set
|
||||
.into_iter()
|
||||
.chain(self.prospective_tips.iter().cloned())
|
||||
.collect(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Queue downloads for each block that isn't currently known to our node
|
||||
async fn request_blocks(&mut self, mut hashes: Vec<BlockHeaderHash>) -> Result<(), Report> {
|
||||
hashes.retain(|hash| !self.known_block(hash));
|
||||
|
||||
async fn request_blocks(&mut self, hashes: Vec<BlockHeaderHash>) -> Result<(), Report> {
|
||||
for chunk in hashes.chunks(10usize) {
|
||||
self.queue_download(chunk).await?;
|
||||
}
|
||||
let set = chunk.iter().cloned().collect();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
let request = self
|
||||
.peer_set
|
||||
.ready_and()
|
||||
.await
|
||||
.map_err(|e| eyre!(e))?
|
||||
.call(zn::Request::BlocksByHash(set));
|
||||
|
||||
/// Drive block downloading futures to completion and dispatch downloaded
|
||||
/// blocks to the validator
|
||||
async fn process_blocks(&mut self) -> Result<(), Report> {
|
||||
info!(in_flight = self.block_requests.len(), "processing blocks");
|
||||
let verifier = self.verifier.clone();
|
||||
|
||||
while let Some(res) = self.block_requests.next().await {
|
||||
match res.map_err::<Report, _>(|e| eyre!(e)) {
|
||||
Ok(zn::Response::Blocks(blocks)) => {
|
||||
info!(count = blocks.len(), "received blocks");
|
||||
for block in blocks {
|
||||
let hash = block.as_ref().into();
|
||||
assert!(
|
||||
self.downloading.remove(&hash),
|
||||
"all received blocks should be explicitly requested and received once"
|
||||
);
|
||||
let _ = self.downloaded.insert(hash);
|
||||
self.validate_block(block).await?;
|
||||
let _ = tokio::spawn(async move {
|
||||
let result_fut = async move {
|
||||
let mut handles = FuturesUnordered::new();
|
||||
let resp = request.await?;
|
||||
|
||||
if let zn::Response::Blocks(blocks) = resp {
|
||||
debug!(count = blocks.len(), "received blocks");
|
||||
|
||||
for block in blocks {
|
||||
let mut verifier = verifier.clone();
|
||||
let handle = tokio::spawn(async move {
|
||||
verifier.ready_and().await?.call(block).await
|
||||
});
|
||||
handles.push(handle);
|
||||
}
|
||||
} else {
|
||||
debug!(?resp, "unexpected response");
|
||||
}
|
||||
|
||||
while let Some(res) = handles.next().await {
|
||||
let _hash = res??;
|
||||
}
|
||||
|
||||
Ok::<_, Error>(())
|
||||
};
|
||||
|
||||
match result_fut.await {
|
||||
Ok(()) => {}
|
||||
Err(e) => error!("{:?}", e),
|
||||
}
|
||||
Ok(_) => continue,
|
||||
Err(e) => {
|
||||
error!("{:?}", e);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Validate a downloaded block using the validator service, inserting the
|
||||
/// block into the state if successful
|
||||
#[tracing::instrument(skip(self))]
|
||||
async fn validate_block(
|
||||
&mut self,
|
||||
block: std::sync::Arc<zebra_chain::block::Block>,
|
||||
) -> Result<(), Report> {
|
||||
let fut = self
|
||||
.state
|
||||
.ready_and()
|
||||
.await
|
||||
.map_err(|e| eyre!(e))?
|
||||
.call(zs::Request::AddBlock { block });
|
||||
|
||||
let _handle = tokio::spawn(
|
||||
async move {
|
||||
match fut.await.map_err::<Report, _>(|e| eyre!(e)) {
|
||||
Ok(_) => {}
|
||||
Err(report) => error!("{:?}", report),
|
||||
}
|
||||
}
|
||||
.in_current_span(),
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns true if the block is being downloaded or has been downloaded
|
||||
fn known_block(&self, hash: &BlockHeaderHash) -> bool {
|
||||
self.downloading.contains(hash) || self.downloaded.contains(hash)
|
||||
}
|
||||
|
||||
/// Queue a future to download a set of blocks from the network
|
||||
async fn queue_download(&mut self, chunk: &[BlockHeaderHash]) -> Result<(), Report> {
|
||||
let set = chunk.iter().cloned().collect();
|
||||
|
||||
let request = self
|
||||
.peer_set
|
||||
.ready_and()
|
||||
.await
|
||||
.map_err(|e| eyre!(e))?
|
||||
.call(zn::Request::BlocksByHash(set));
|
||||
|
||||
self.downloading.extend(chunk);
|
||||
self.block_requests.push(request);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the heights of the blocks for constructing a block_locator list
|
||||
|
|
Loading…
Reference in New Issue