Move PeerSet setup logic into a peer_set::init()

This commit is contained in:
Henry de Valence 2019-10-16 17:06:21 -07:00
parent 63cf340ab4
commit 5847b490da
4 changed files with 115 additions and 63 deletions

View File

@ -9,3 +9,76 @@ mod unready_service;
pub use discover::PeerDiscover;
pub use set::PeerSet;
use std::pin::Pin;
use futures::future::Future;
use tower::Service;
use crate::config::Config;
use crate::protocol::internal::{Request, Response};
use crate::BoxedStdError;
use crate::{peer::PeerConnector, timestamp_collector::TimestampCollector};
type BoxedZebraService = Box<
dyn Service<
Request,
Response = Response,
Error = BoxedStdError,
Future = Pin<Box<dyn Future<Output = Result<Response, BoxedStdError>> + Send>>,
> + Send
+ 'static,
>;
/// Initialize a peer set with the given `config`, forwarding peer requests to the `inbound_service`.
pub fn init<S>(config: Config, inbound_service: S) -> (BoxedZebraService, TimestampCollector)
where
S: Service<Request, Response = Response, Error = BoxedStdError> + Clone + Send + 'static,
S::Future: Send,
{
use futures::{
future,
stream::{FuturesUnordered, StreamExt},
};
use tokio::net::TcpStream;
use tower::{
buffer::Buffer,
discover::{Change, ServiceStream},
ServiceExt,
};
use tower_load::{peak_ewma::PeakEwmaDiscover, NoInstrument};
let tc = TimestampCollector::new();
let pc = Buffer::new(PeerConnector::new(config.clone(), inbound_service, &tc), 1);
// construct a stream of services XXX currently the stream is based on a
// static stream from config.initial_peers; this should be replaced with a
// channel that starts with initial_peers but also accetps incoming, dials
// new, etc.
let client_stream = PeakEwmaDiscover::new(
ServiceStream::new(
config
.initial_peers
.into_iter()
.map(|addr| {
let mut pc = pc.clone();
async move {
let stream = TcpStream::connect(addr).await?;
pc.ready().await?;
let client = pc.call((stream, addr)).await?;
Ok::<_, BoxedStdError>(Change::Insert(addr, client))
}
})
.collect::<FuturesUnordered<_>>()
// Discard any errored connections...
.filter(|result| future::ready(result.is_ok())),
),
config.ewma_default_rtt,
config.ewma_decay_time,
NoInstrument,
);
let peer_set = PeerSet::new(client_stream);
(Box::new(peer_set), tc)
}

View File

@ -216,7 +216,8 @@ where
{
type Response = Response;
type Error = BoxedStdError;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// Process peer discovery updates.

View File

@ -73,76 +73,57 @@ impl ConnectCmd {
1,
);
use tokio::net::TcpStream;
let mut config = app_config().network.clone();
let config = app_config().network.clone();
let collector = TimestampCollector::new();
let mut pc = Buffer::new(
PeerConnector::new(config, Network::Mainnet, node, &collector),
1,
);
// Until we finish fleshing out the peerset -- particularly
// pulling more peers -- we don't want to start with a single
// initial peer. So make a throwaway connection to the first,
// extract a list of addresses, and discard everything else.
// All the setup is kept in a sub-scope so we know we're not reusing it.
//
// Later, this should turn into initial_peers = vec![self.addr];
config.initial_peers = {
use tokio::net::TcpStream;
let tcp_stream = TcpStream::connect(self.addr).await?;
pc.ready()
.await
.map_err(failure::Error::from_boxed_compat)?;
let mut client = pc
.call((tcp_stream, self.addr))
.await
.map_err(failure::Error::from_boxed_compat)?;
let collector = TimestampCollector::new();
let mut pc = Buffer::new(
PeerConnector::new(config.clone(), node.clone(), &collector),
1,
);
client.ready().await?;
let tcp_stream = TcpStream::connect(self.addr).await?;
pc.ready()
.await
.map_err(failure::Error::from_boxed_compat)?;
let mut client = pc
.call((tcp_stream, self.addr))
.await
.map_err(failure::Error::from_boxed_compat)?;
let addrs = match client.call(Request::GetPeers).await? {
Response::Peers(addrs) => addrs,
_ => bail!("Got wrong response type"),
client.ready().await?;
let addrs = match client.call(Request::GetPeers).await? {
Response::Peers(addrs) => addrs,
_ => bail!("Got wrong response type"),
};
info!(
addrs.len = addrs.len(),
"got addresses from first connected peer"
);
addrs.into_iter().map(|meta| meta.addr).collect::<Vec<_>>()
};
info!(
addrs.len = addrs.len(),
"got addresses from first connected peer"
);
use failure::Error;
use futures::{
future,
stream::{FuturesUnordered, StreamExt},
};
use std::time::Duration;
use tower::discover::{Change, ServiceStream};
use tower_load::{peak_ewma::PeakEwmaDiscover, NoInstrument};
// construct a stream of services
let client_stream = PeakEwmaDiscover::new(
ServiceStream::new(
addrs
.into_iter()
.map(|meta| {
let mut pc = pc.clone();
async move {
let stream = TcpStream::connect(meta.addr).await?;
pc.ready().await?;
let client = pc.call((stream, meta.addr)).await?;
Ok::<_, BoxedStdError>(Change::Insert(meta.addr, client))
}
})
.collect::<FuturesUnordered<_>>()
// Discard any errored connections...
.filter(|result| future::ready(result.is_ok())),
),
Duration::from_secs(1), // default rtt estimate
Duration::from_secs(60), // decay time
NoInstrument,
);
info!("finished constructing discover");
let mut peer_set = PeerSet::new(client_stream);
let (mut peer_set, _tc) = zebra_network::peer_set::init(config, node);
info!("waiting for peer_set ready");
peer_set.ready().await.map_err(Error::from_boxed_compat)?;
info!("peer_set became ready, constructing addr requests");
use failure::Error;
use futures::stream::{FuturesUnordered, StreamExt};
let mut addr_reqs = FuturesUnordered::new();
for i in 0..10usize {
info!(i, "awaiting peer_set ready");

View File

@ -75,10 +75,7 @@ impl ListenCmd {
let config = app_config().network.clone();
let collector = TimestampCollector::new();
let mut pc = Buffer::new(
PeerConnector::new(config, Network::Mainnet, node, &collector),
1,
);
let mut pc = Buffer::new(PeerConnector::new(config, node, &collector), 1);
let mut listener = TcpListener::bind(self.addr).await?;