Implement sync component for start subcommand (#506)

This commit is contained in:
Jane Lusby 2020-06-22 19:24:53 -07:00 committed by GitHub
parent 03b8453b8a
commit 1c42b66a4f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 421 additions and 152 deletions

1
.gitignore vendored
View File

@ -8,3 +8,4 @@
*\#
# Vscode detritus
.vscode/
.zebra-state/

View File

@ -12,6 +12,7 @@ use std::{
task::{Context, Poll},
};
use tower::{buffer::Buffer, Service};
use zebra_chain::block::BlockHeaderHash;
mod block_index;
@ -20,7 +21,11 @@ struct InMemoryState {
index: block_index::BlockIndex,
}
type Error = Box<dyn error::Error + Send + Sync + 'static>;
impl InMemoryState {
/// Look up `_hash` on behalf of a `Request::GetDepth` call.
///
/// Not implemented yet: the body is `todo!()`, so calling this panics.
/// The `Option<u32>` in the return type is the value the `GetDepth` handler
/// wraps in `Response::Depth`.
fn contains(&mut self, _hash: BlockHeaderHash) -> Result<Option<u32>, Error> {
todo!()
}
}
impl Service<Request> for InMemoryState {
type Response = Response;
@ -60,6 +65,16 @@ impl Service<Request> for InMemoryState {
async move { result }.boxed()
}
Request::GetDepth { hash } => {
let res = self.contains(hash);
async move {
let depth = res?;
Ok(Response::Depth(depth))
}
.boxed()
}
}
}
}
@ -76,3 +91,5 @@ pub fn init() -> impl Service<
+ 'static {
Buffer::new(InMemoryState::default(), 1)
}
type Error = Box<dyn error::Error + Send + Sync + 'static>;

View File

@ -58,12 +58,17 @@ pub enum Request {
},
/// Get the block that is the tip of the current chain
GetTip,
/// Ask the state if the given hash is part of the current best chain
GetDepth {
/// The hash to check against the current chain
hash: BlockHeaderHash,
},
}
#[derive(Debug, PartialEq)]
/// A state response
pub enum Response {
/// A response to a `AddBlock` request indicating a block was successfully
/// The response to a `AddBlock` request indicating a block was successfully
/// added to the state
Added {
/// The hash of the block that was added
@ -79,6 +84,12 @@ pub enum Response {
/// The hash of the block at the tip of the current chain
hash: BlockHeaderHash,
},
/// The response to a `GetDepth` request, reporting whether the given hash is
/// in the current best chain
Depth(
/// The number of blocks above the given block in the current best chain
Option<u32>,
),
}
#[cfg(test)]

View File

@ -75,19 +75,23 @@ impl SledState {
}
}
pub(super) fn get_tip(&self) -> Result<Option<BlockHeaderHash>, Error> {
pub(super) fn get_tip(&self) -> Result<Option<Arc<Block>>, Error> {
let tree = self.storage.open_tree(b"by_height")?;
let last_entry = tree.iter().values().next_back();
match last_entry {
Some(Ok(bytes)) => {
let block = Arc::<Block>::zcash_deserialize(bytes.as_ref())?;
Ok(Some(block.as_ref().into()))
}
Some(Ok(bytes)) => Ok(Some(ZcashDeserialize::zcash_deserialize(bytes.as_ref())?)),
Some(Err(e)) => Err(e)?,
None => Ok(None),
}
}
/// Returns `true` when the `by_hash` tree has an entry keyed by the raw bytes
/// of `hash`.
///
/// Errors from opening the tree or from the lookup itself are propagated.
fn contains(&self, hash: &BlockHeaderHash) -> Result<bool, Error> {
    let hash_index = self.storage.open_tree(b"by_hash")?;
    let is_present = hash_index.contains_key(&hash.0)?;
    Ok(is_present)
}
}
impl Default for SledState {
@ -129,15 +133,55 @@ impl Service<Request> for SledState {
async move {
storage
.get_tip()?
.map(|block| block.as_ref().into())
.map(|hash| Response::Tip { hash })
.ok_or_else(|| "zebra-state contains no blocks".into())
}
.boxed()
}
Request::GetDepth { hash } => {
let storage = self.clone();
async move {
if !storage.contains(&hash)? {
return Ok(Response::Depth(None));
}
let block = storage
.get(hash)?
.expect("block must be present if contains returned true");
let tip = storage
.get_tip()?
.expect("storage must have a tip if it contains the previous block");
let depth =
tip.coinbase_height().unwrap().0 - block.coinbase_height().unwrap().0;
Ok(Response::Depth(Some(depth)))
}
.boxed()
}
}
}
}
/// A `BlockHeight` stored together with its big-endian byte encoding, so that
/// it implements `AsRef<[u8]>` for usage with sled
struct BytesHeight(u32, [u8; 4]);

impl From<BlockHeight> for BytesHeight {
    /// Keeps the numeric height and pre-computes its big-endian bytes.
    fn from(height: BlockHeight) -> Self {
        let value = height.0;
        Self(value, value.to_be_bytes())
    }
}

impl AsRef<[u8]> for BytesHeight {
    /// Borrows the pre-computed big-endian bytes.
    fn as_ref(&self) -> &[u8] {
        let Self(_, encoded) = self;
        encoded
    }
}
pub(super) enum BlockQuery {
ByHash(BlockHeaderHash),
ByHeight(BlockHeight),

View File

@ -22,14 +22,12 @@ use crate::config::ZebradConfig;
use crate::{components::tokio::TokioComponent, prelude::*};
use abscissa_core::{config, Command, FrameworkError, Options, Runnable};
use color_eyre::eyre::Report;
use color_eyre::eyre::{eyre, WrapErr};
use futures::{
prelude::*,
stream::{FuturesUnordered, StreamExt},
};
use std::collections::BTreeSet;
use tower::{buffer::Buffer, service_fn, Service, ServiceExt};
use zebra_chain::{block::BlockHeaderHash, types::BlockHeight};
use futures::stream::FuturesUnordered;
use std::collections::HashSet;
use tower::{buffer::Buffer, service_fn};
use zebra_chain::block::BlockHeaderHash;
mod sync;
// genesis
static GENESIS: BlockHeaderHash = BlockHeaderHash([
@ -57,26 +55,21 @@ impl StartCmd {
}),
1,
);
let config = app_config().network.clone();
let state = zebra_state::on_disk::init(zebra_state::Config::default());
let (peer_set, _address_book) = zebra_network::init(config, node).await;
let retry_peer_set = tower::retry::Retry::new(zebra_network::RetryErrors, peer_set.clone());
let mut downloaded_block_heights = BTreeSet::<BlockHeight>::new();
downloaded_block_heights.insert(BlockHeight(0));
let mut connect = Core {
retry_peer_set,
let mut syncer = sync::Syncer {
peer_set,
state,
tip: GENESIS,
block_requests: FuturesUnordered::new(),
requested_block_heights: 0,
downloaded_block_heights,
downloading: HashSet::new(),
downloaded: HashSet::new(),
fanout: 4,
prospective_tips: HashSet::new(),
};
connect.run().await
syncer.run().await
}
}
@ -117,129 +110,3 @@ impl config::Override<ZebradConfig> for StartCmd {
Ok(config)
}
}
/// State for the earlier, linear block download loop (`Core::run`).
///
/// `ZN` is the network service and `ZS` the state service.
struct Core<ZN, ZS>
where
ZN: Service<zebra_network::Request>,
{
/// `peer_set` wrapped in a retry layer; used for the `FindBlocks` requests
retry_peer_set: tower::retry::Retry<zebra_network::RetryErrors, ZN>,
/// Network service used directly for the `BlocksByHash` requests
peer_set: ZN,
/// State service that receives every downloaded block via `AddBlock`
state: ZS,
/// Last hash of the most recent `FindBlocks` response; sent as the only
/// known block of the next `FindBlocks` request
tip: BlockHeaderHash,
/// `BlocksByHash` requests that have been sent but not yet awaited
block_requests: FuturesUnordered<ZN::Future>,
/// Running total of hashes returned by `FindBlocks`
requested_block_heights: usize,
/// Coinbase heights of every block received so far
downloaded_block_heights: BTreeSet<BlockHeight>,
}
impl<ZN, ZS> Core<ZN, ZS>
where
ZN: Service<zebra_network::Request, Response = zebra_network::Response, Error = Error>
+ Send
+ Clone
+ 'static,
ZN::Future: Send,
ZS: Service<zebra_state::Request, Response = zebra_state::Response, Error = Error>
+ Send
+ Clone
+ 'static,
ZS::Future: Send,
{
/// Fetch hashes and blocks until 700 000 hashes have been requested, wait for
/// the remaining downloads, then park forever.
///
/// Panics if a `FindBlocks` response contains no hashes.
async fn run(&mut self) -> Result<(), Report> {
    // TODO(jlusby): Replace with real state service
    loop {
        if self.requested_block_heights >= 700_000 {
            break;
        }

        let new_hashes = self.next_hashes().await?;
        self.tip = new_hashes.last().copied().unwrap();

        // Queue the matching block downloads, in chunks.
        self.request_blocks(new_hashes).await?;

        // Keep no more than 300 block requests in flight.
        self.drain_requests(300).await?;
    }

    self.drain_requests(0).await?;

    future::pending::<()>().await;

    Ok(())
}
/// Ask the retrying peer set for the hashes that follow `self.tip`, and add
/// their count to `requested_block_heights`.
///
/// Panics if the network answers `FindBlocks` with anything other than
/// `BlockHeaderHashes`.
async fn next_hashes(&mut self) -> Result<Vec<BlockHeaderHash>, Report> {
    let find_blocks = zebra_network::Request::FindBlocks {
        known_blocks: vec![self.tip],
        stop: None,
    };

    let response = self
        .retry_peer_set
        .ready_and()
        .await
        .map_err(|e| eyre!(e))?
        .call(find_blocks)
        .await
        .map_err(|e| eyre!(e))
        .wrap_err("request failed, TODO implement retry")?;

    let hashes = match response {
        zebra_network::Response::BlockHeaderHashes(hashes) => hashes,
        _ => unreachable!("FindBlocks always gets a BlockHeaderHashes response"),
    };

    info!(
        new_hashes = hashes.len(),
        requested = self.requested_block_heights,
        in_flight = self.block_requests.len(),
        downloaded = self.downloaded_block_heights.len(),
        highest = self.downloaded_block_heights.iter().next_back().unwrap().0,
        "requested more hashes"
    );
    self.requested_block_heights += hashes.len();

    Ok(hashes)
}
/// Send one `BlocksByHash` request per group of up to ten hashes and keep the
/// response futures in `block_requests`.
async fn request_blocks(&mut self, hashes: Vec<BlockHeaderHash>) -> Result<(), Report> {
    for batch in hashes.chunks(10usize) {
        let ready_peers = self.peer_set.ready_and().await.map_err(|e| eyre!(e))?;
        let download =
            ready_peers.call(zebra_network::Request::BlocksByHash(batch.iter().copied().collect()));
        self.block_requests.push(download);
    }

    Ok(())
}
/// Await in-flight block requests until at most `request_goal` remain,
/// recording each received block's height and sending the block to the state.
///
/// Failed requests are logged and skipped. Panics if a received block has no
/// coinbase height.
async fn drain_requests(&mut self, request_goal: usize) -> Result<(), Report> {
    while self.block_requests.len() > request_goal {
        let outcome = self
            .block_requests
            .next()
            .await
            .expect("expected: block_requests is never empty")
            .map_err::<Report, _>(|e| eyre!(e));

        let blocks = match outcome {
            Ok(zebra_network::Response::Blocks(blocks)) => blocks,
            Ok(_) => continue,
            Err(e) => {
                error!("{:?}", e);
                continue;
            }
        };

        for block in blocks {
            let height = block.coinbase_height().unwrap();
            self.downloaded_block_heights.insert(height);

            let ready_state = self.state.ready_and().await.map_err(|e| eyre!(e))?;
            ready_state
                .call(zebra_state::Request::AddBlock { block })
                .await
                .map_err(|e| eyre!(e))?;
        }
    }

    Ok(())
}
}
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

View File

@ -0,0 +1,329 @@
use color_eyre::eyre::{eyre, Report};
use futures::stream::{FuturesUnordered, StreamExt};
use std::{collections::HashSet, iter, time::Duration};
use tokio::time::delay_for;
use tower::{Service, ServiceExt};
use tracing_futures::Instrument;
use zebra_chain::{block::BlockHeaderHash, types::BlockHeight};
/// State for the block sync loop driven by `Syncer::run`.
///
/// `ZN` is the network service and `ZS` the state service.
pub struct Syncer<ZN, ZS>
where
ZN: Service<zebra_network::Request>,
{
/// Network service that `FindBlocks` and `BlocksByHash` requests are sent to
pub peer_set: ZN,
// TODO(jlusby): add validator
/// State service queried with `GetDepth` and handed downloaded blocks via
/// `AddBlock`
pub state: ZS,
/// Hashes that the next `extend_tips` call will send `FindBlocks` requests for
pub prospective_tips: HashSet<BlockHeaderHash>,
/// `BlocksByHash` requests that have been sent but not yet awaited
pub block_requests: FuturesUnordered<ZN::Future>,
/// Hashes that have been requested from the network but not yet received
pub downloading: HashSet<BlockHeaderHash>,
/// Hashes of blocks that have been received from the network
pub downloaded: HashSet<BlockHeaderHash>,
/// How many `FindBlocks` requests are sent per block locator / prospective tip
pub fanout: NumReq,
}
impl<ZN, ZS> Syncer<ZN, ZS>
where
ZN: Service<zebra_network::Request, Response = zebra_network::Response, Error = Error>
+ Send
+ Clone
+ 'static,
ZN::Future: Send,
ZS: Service<zebra_state::Request, Response = zebra_state::Response, Error = Error>
+ Send
+ Clone
+ 'static,
ZS::Future: Send,
{
/// Run the sync loop forever: obtain prospective tips, keep extending them
/// (processing downloaded blocks after each extension) until none remain,
/// then sleep 15 seconds and start over.
///
/// Only returns if one of the steps fails.
pub async fn run(&mut self) -> Result<(), Report> {
    let pause_between_rounds = Duration::from_secs(15);

    loop {
        info!("populating prospective tips list");
        self.obtain_tips().await?;

        // ObtainTips Step 6
        //
        // As long as prospective tips remain, call ExtendTips; stop once the
        // set has been emptied.
        loop {
            if self.prospective_tips.is_empty() {
                break;
            }

            info!("extending prospective tips");
            self.extend_tips().await?;

            // TODO(jlusby): move this to a background task and check it for errors after each step.
            self.process_blocks().await?;
        }

        delay_for(pause_between_rounds).await;
    }
}
/// Given a block_locator list fan out request for subsequent hashes to
/// multiple peers
///
/// Sends `fanout` `FindBlocks` requests built from the block locator. For
/// every `BlockHeaderHashes` response it:
///
/// * skips the leading hashes the state already knows (`GetDepth` returns a
///   depth) and schedules everything from the first unknown hash onward for
///   download, and
/// * records the **last** hash of the response as a prospective tip.
///
/// Failed network requests are logged and ignored; a failing state query or a
/// peer set that cannot become ready aborts with an error.
async fn obtain_tips(&mut self) -> Result<(), Report> {
    // ObtainTips Step 1
    //
    // Query the current state to construct the sequence of hashes: handled by
    // the caller
    //
    // TODO(jlusby): get the block_locator from the state
    let block_locator = vec![super::GENESIS];
    let mut tip_futs = FuturesUnordered::new();

    // ObtainTips Step 2
    //
    // Make a FindBlocks request to the network F times, where F is a
    // fanout parameter, to get resp1, ..., respF
    for _ in 0..self.fanout {
        let req = self.peer_set.ready_and().await.map_err(|e| eyre!(e))?.call(
            zebra_network::Request::FindBlocks {
                known_blocks: block_locator.clone(),
                stop: None,
            },
        );
        tip_futs.push(req);
    }

    let mut download_set = HashSet::new();
    while let Some(res) = tip_futs.next().await {
        match res.map_err::<Report, _>(|e| eyre!(e)) {
            Ok(zebra_network::Response::BlockHeaderHashes(hashes)) => {
                info!(
                    new_hashes = hashes.len(),
                    in_flight = self.block_requests.len(),
                    downloaded = self.downloaded.len(),
                    "requested more hashes"
                );

                // TODO(jlusby): reject both main and test net genesis blocks
                // NOTE(review): ExtendTips documents this check against the
                // *first* hash; confirm which end is intended here.
                if hashes.last() == Some(&super::GENESIS) {
                    continue;
                }

                // The prospective tip is the LAST hash of the response
                // (Step 4). Taking the first hash would advance the chain by
                // a single block per round. An empty response has no tip.
                let new_tip = match hashes.last() {
                    Some(&last) => last,
                    None => continue,
                };

                // ObtainTips Step 3
                //
                // For each response, starting from the beginning of the
                // list, prune any block hashes already included in the
                // state, stopping at the first unknown hash to get resp1',
                // ..., respF'. (These lists may be empty).
                //
                // The scan starts at the very first hash so that it is also
                // checked against the state and downloaded when unknown.
                let mut hashes = hashes.into_iter().peekable();
                while let Some(&next) = hashes.peek() {
                    let resp = self
                        .state
                        .ready_and()
                        .await
                        .map_err(|e| eyre!(e))?
                        .call(zebra_state::Request::GetDepth { hash: next })
                        .await
                        .map_err(|e| eyre!(e))?;

                    let should_download = matches!(resp, zebra_state::Response::Depth(None));

                    if should_download {
                        // `peek` did not consume `next`, so it is included.
                        download_set.extend(hashes);
                        break;
                    } else {
                        let _ = hashes.next();
                    }
                }

                // ObtainTips Step 4
                //
                // Combine the last elements of each list into a set; this
                // is the set of prospective tips.
                //
                // The tip is recorded even when every hash was pruned, so
                // that ExtendTips can continue past blocks the state already
                // has while the locator is still hard-coded to genesis.
                let _ = self.prospective_tips.insert(new_tip);
            }
            Ok(_) => {}
            Err(e) => {
                error!("{:?}", e);
            }
        }
    }

    // ObtainTips Step 5
    //
    // Combine all elements of each list into a set, and queue
    // download and verification of those blocks.
    self.request_blocks(download_set.into_iter().collect())
        .await?;

    Ok(())
}
/// Try to extend every prospective tip by asking the network what follows it.
///
/// Takes the current set of prospective tips (leaving it empty) and, for each
/// tip, sends `fanout` `FindBlocks` requests one after another. Every usable
/// response contributes its last hash as a new prospective tip and all of its
/// hashes to the download set, which is queued via `request_blocks` at the
/// end.
///
/// Responses that are empty or that start with the genesis hash are
/// discarded. Failed network requests are logged and ignored; a peer set that
/// cannot become ready aborts with an error.
async fn extend_tips(&mut self) -> Result<(), Report> {
    // Extend Tips 1
    //
    // remove all prospective tips and iterate over them individually
    let tips = std::mem::take(&mut self.prospective_tips);

    let mut download_set = HashSet::new();
    for tip in tips {
        // ExtendTips Step 2
        //
        // Create a FindBlocks request consisting of just the
        // prospective tip. Send this request to the network F times
        for _ in 0..self.fanout {
            let res = self
                .peer_set
                .ready_and()
                .await
                .map_err(|e| eyre!(e))?
                .call(zebra_network::Request::FindBlocks {
                    known_blocks: vec![tip],
                    stop: None,
                })
                .await;

            match res.map_err::<Report, _>(|e| eyre!(e)) {
                Ok(zebra_network::Response::BlockHeaderHashes(hashes)) => {
                    info!(
                        new_hashes = hashes.len(),
                        in_flight = self.block_requests.len(),
                        downloaded = self.downloaded.len(),
                        "requested more hashes"
                    );

                    // ExtendTips Step 3
                    //
                    // For each response, check whether the first hash in the
                    // response is the genesis block; if so, discard the response.
                    // It indicates that the remote peer does not have any blocks
                    // following the prospective tip.
                    if hashes.first() == Some(&super::GENESIS) {
                        continue;
                    }

                    // ExtendTips Step 4
                    //
                    // Combine the last elements of the remaining responses into
                    // a set, and add this set to the set of prospective tips.
                    // An empty response has no last element and is skipped.
                    let new_tip = match hashes.last() {
                        Some(&last) => last,
                        None => continue,
                    };
                    let _ = self.prospective_tips.insert(new_tip);

                    // ExtendTips Step 5
                    //
                    // Combine all elements of the remaining responses into a
                    // set, and queue download and verification of those blocks
                    download_set.extend(hashes);
                }
                Ok(_) => {}
                Err(e) => {
                    error!("{:?}", e);
                }
            }
        }
    }

    self.request_blocks(download_set.into_iter().collect())
        .await?;

    Ok(())
}
/// Queue downloads, in batches of at most ten hashes, for every hash in
/// `hashes` that is neither being downloaded nor already downloaded
async fn request_blocks(&mut self, mut hashes: Vec<BlockHeaderHash>) -> Result<(), Report> {
    let wanted: Vec<BlockHeaderHash> = hashes
        .drain(..)
        .filter(|candidate| !self.known_block(candidate))
        .collect();

    for batch in wanted.chunks(10usize) {
        self.queue_download(batch).await?;
    }

    Ok(())
}
/// Drive block downloading futures to completion and dispatch downloaded
/// blocks to the validator
async fn process_blocks(&mut self) -> Result<(), Report> {
info!(in_flight = self.block_requests.len(), "processing blocks");
while let Some(res) = self.block_requests.next().await {
match res.map_err::<Report, _>(|e| eyre!(e)) {
Ok(zebra_network::Response::Blocks(blocks)) => {
info!(count = blocks.len(), "received blocks");
for block in blocks {
let hash = block.as_ref().into();
assert!(
self.downloading.remove(&hash),
"all received blocks should be explicitly requested and received once"
);
let _ = self.downloaded.insert(hash);
self.validate_block(block).await?;
}
}
Ok(_) => continue,
Err(e) => {
error!("{:?}", e);
}
}
}
Ok(())
}
/// Validate a downloaded block using the validator service, inserting the
/// block into the state if successful
#[tracing::instrument(skip(self))]
async fn validate_block(
&mut self,
block: std::sync::Arc<zebra_chain::block::Block>,
) -> Result<(), Report> {
let fut = self
.state
.ready_and()
.await
.map_err(|e| eyre!(e))?
.call(zebra_state::Request::AddBlock { block });
let _handle = tokio::spawn(
async move {
match fut.await.map_err::<Report, _>(|e| eyre!(e)) {
Ok(_) => {}
Err(report) => error!("{:?}", report),
}
}
.in_current_span(),
);
Ok(())
}
/// Whether `hash` is in either the `downloading` or the `downloaded` set
fn known_block(&self, hash: &BlockHeaderHash) -> bool {
    if self.downloading.contains(hash) {
        return true;
    }
    self.downloaded.contains(hash)
}
/// Send a single `BlocksByHash` request for `chunk`, mark those hashes as
/// downloading, and store the response future in `block_requests`
async fn queue_download(&mut self, chunk: &[BlockHeaderHash]) -> Result<(), Report> {
    let wanted = chunk.iter().copied().collect();

    let ready_peers = self.peer_set.ready_and().await.map_err(|e| eyre!(e))?;
    let download = ready_peers.call(zebra_network::Request::BlocksByHash(wanted));

    self.downloading.extend(chunk.iter().copied());
    self.block_requests.push(download);

    Ok(())
}
}
/// Get the heights of the blocks for constructing a block_locator list
///
/// Yields `tip_height - 1`, `tip_height - 2`, `tip_height - 4`, ... for every
/// power-of-two step smaller than `tip_height`, followed by exactly one
/// `BlockHeight(0)`. The heights are strictly decreasing, so the genesis
/// height is never repeated, even when `tip_height` is a power of two.
#[allow(dead_code)]
pub fn block_locator_heights(tip_height: BlockHeight) -> impl Iterator<Item = BlockHeight> {
    let tip = tip_height.0;

    iter::successors(Some(1u32), |step| step.checked_mul(2))
        // Stop at the first step that would reach height 0 or go below it;
        // height 0 is appended once by the `chain` below.
        .take_while(move |&step| step < tip)
        .map(move |step| BlockHeight(tip - step))
        .chain(iter::once(BlockHeight(0)))
}
/// Boxed error type that `Syncer` requires of both the network service and
/// the state service
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
/// Integer type of `Syncer::fanout`, the number of requests sent per fan-out
type NumReq = u32;