Implement retry policy for syncer (#551)

This commit is contained in:
Jane Lusby 2020-07-01 13:35:01 -07:00 committed by GitHub
parent f999ec75e6
commit 51f6ce86ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 24 additions and 15 deletions

View File

@ -22,8 +22,6 @@ use crate::config::ZebradConfig;
use crate::{components::tokio::TokioComponent, prelude::*};
use abscissa_core::{config, Command, FrameworkError, Options, Runnable};
use color_eyre::eyre::Report;
use futures::stream::FuturesUnordered;
use std::collections::HashSet;
use tower::{buffer::Buffer, service_fn};
use zebra_chain::block::BlockHeaderHash;
@ -60,14 +58,7 @@ impl StartCmd {
let (peer_set, _address_book) = zebra_network::init(config, node).await;
let verifier = zebra_consensus::verify::init(state.clone());
let mut syncer = sync::Syncer {
peer_set,
state,
verifier,
block_requests: FuturesUnordered::new(),
fanout: 4,
prospective_tips: HashSet::new(),
};
let mut syncer = sync::Syncer::new(peer_set, state, verifier);
syncer.run().await
}

View File

@ -2,14 +2,13 @@ use color_eyre::eyre::{eyre, Report};
use futures::stream::{FuturesUnordered, StreamExt};
use std::{collections::HashSet, iter, sync::Arc, time::Duration};
use tokio::time::delay_for;
use tower::{Service, ServiceExt};
use tower::{retry::Retry, Service, ServiceExt};
use zebra_chain::{
block::{Block, BlockHeaderHash},
types::BlockHeight,
};
use zebra_network as zn;
use zebra_state as zs;
use zebra_network::{self as zn, RetryLimit};
use zebra_state::{self as zs};
pub struct Syncer<ZN, ZS, ZV>
where
@ -18,11 +17,30 @@ where
pub peer_set: ZN,
pub state: ZS,
pub verifier: ZV,
pub retry_peer_set: Retry<RetryLimit, ZN>,
pub prospective_tips: HashSet<BlockHeaderHash>,
pub block_requests: FuturesUnordered<ZN::Future>,
pub fanout: NumReq,
}
impl<ZN, ZS, ZC> Syncer<ZN, ZS, ZC>
where
ZN: Service<zn::Request> + Clone,
{
pub fn new(peer_set: ZN, state: ZS, verifier: ZC) -> Self {
let retry_peer_set = Retry::new(RetryLimit::new(3), peer_set.clone());
Self {
peer_set,
state,
verifier,
retry_peer_set,
block_requests: FuturesUnordered::new(),
fanout: 4,
prospective_tips: HashSet::new(),
}
}
}
impl<ZN, ZS, ZV> Syncer<ZN, ZS, ZV>
where
ZN: Service<zn::Request, Response = zn::Response, Error = Error> + Send + Clone + 'static,
@ -224,7 +242,7 @@ where
let set = chunk.iter().cloned().collect();
let request = self
.peer_set
.retry_peer_set
.ready_and()
.await
.map_err(|e| eyre!(e))?