2019-10-07 15:36:16 -07:00
|
|
|
use std::{
|
2019-10-15 16:10:43 -07:00
|
|
|
collections::HashSet,
|
2019-10-07 15:36:16 -07:00
|
|
|
net::SocketAddr,
|
|
|
|
pin::Pin,
|
2019-10-15 16:10:43 -07:00
|
|
|
sync::{Arc, Mutex},
|
2019-10-07 15:36:16 -07:00
|
|
|
task::{Context, Poll},
|
|
|
|
};
|
|
|
|
|
|
|
|
use chrono::Utc;
|
|
|
|
use futures::channel::mpsc;
|
2019-10-21 01:51:37 -07:00
|
|
|
use tokio::{codec::Framed, net::TcpStream, prelude::*, timer::Interval};
|
2019-10-22 12:44:08 -07:00
|
|
|
use tower::Service;
|
2019-10-07 15:36:16 -07:00
|
|
|
use tracing::{span, Level};
|
|
|
|
use tracing_futures::Instrument;
|
|
|
|
|
|
|
|
use zebra_chain::types::BlockHeight;
|
|
|
|
|
|
|
|
use crate::{
|
|
|
|
constants,
|
2019-11-26 22:42:42 -08:00
|
|
|
protocol::{
|
|
|
|
external::{types::*, Codec, Message},
|
|
|
|
internal::{Request, Response},
|
|
|
|
},
|
2019-10-21 01:51:37 -07:00
|
|
|
types::MetaAddr,
|
2019-10-17 14:33:45 -07:00
|
|
|
BoxedStdError, Config,
|
2019-10-07 15:36:16 -07:00
|
|
|
};
|
|
|
|
|
2019-11-27 11:48:41 -08:00
|
|
|
use super::{Client, ErrorSlot, HandshakeError, Server};
|
2019-10-07 15:36:16 -07:00
|
|
|
|
2019-10-22 11:30:24 -07:00
|
|
|
/// A [`Service`] that handshakes with a remote peer and constructs a
|
|
|
|
/// client/server pair.
|
2019-11-27 11:42:59 -08:00
|
|
|
pub struct Handshake<S> {
|
2019-10-08 13:57:24 -07:00
|
|
|
config: Config,
|
2019-10-07 15:36:16 -07:00
|
|
|
internal_service: S,
|
2019-10-17 16:38:44 -07:00
|
|
|
timestamp_collector: mpsc::Sender<MetaAddr>,
|
2019-10-15 16:10:43 -07:00
|
|
|
nonces: Arc<Mutex<HashSet<Nonce>>>,
|
2019-10-07 15:36:16 -07:00
|
|
|
}
|
|
|
|
|
2019-11-27 11:42:59 -08:00
|
|
|
impl<S: Clone> Clone for Handshake<S> {
|
2019-10-22 12:44:08 -07:00
|
|
|
fn clone(&self) -> Self {
|
2019-11-27 11:42:59 -08:00
|
|
|
Handshake {
|
2019-10-22 12:44:08 -07:00
|
|
|
config: self.config.clone(),
|
|
|
|
internal_service: self.internal_service.clone(),
|
|
|
|
timestamp_collector: self.timestamp_collector.clone(),
|
|
|
|
nonces: self.nonces.clone(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-11-27 11:42:59 -08:00
|
|
|
impl<S> Handshake<S>
|
2019-10-07 15:36:16 -07:00
|
|
|
where
|
2019-10-15 16:10:43 -07:00
|
|
|
S: Service<Request, Response = Response, Error = BoxedStdError> + Clone + Send + 'static,
|
2019-10-07 15:36:16 -07:00
|
|
|
S::Future: Send,
|
|
|
|
{
|
2019-10-08 13:57:24 -07:00
|
|
|
/// Construct a new `PeerConnector`.
|
2019-10-17 16:38:44 -07:00
|
|
|
pub fn new(
|
|
|
|
config: Config,
|
|
|
|
internal_service: S,
|
|
|
|
timestamp_collector: mpsc::Sender<MetaAddr>,
|
|
|
|
) -> Self {
|
2019-10-08 13:57:24 -07:00
|
|
|
// XXX this function has too many parameters, but it's not clear how to
|
|
|
|
// do a nice builder as all fields are mandatory. Could have Builder1,
|
|
|
|
// Builder2, ..., with Builder1::with_config() -> Builder2;
|
|
|
|
// Builder2::with_internal_service() -> ... or use Options in a single
|
|
|
|
// Builder type or use the derive_builder crate.
|
2019-11-27 11:42:59 -08:00
|
|
|
Handshake {
|
2019-10-08 13:57:24 -07:00
|
|
|
config,
|
2019-10-07 15:36:16 -07:00
|
|
|
internal_service,
|
2019-10-17 16:38:44 -07:00
|
|
|
timestamp_collector,
|
2019-10-15 16:10:43 -07:00
|
|
|
nonces: Arc::new(Mutex::new(HashSet::new())),
|
2019-10-07 15:36:16 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-11-27 11:42:59 -08:00
|
|
|
impl<S> Service<(TcpStream, SocketAddr)> for Handshake<S>
|
2019-10-07 15:36:16 -07:00
|
|
|
where
|
2019-10-15 20:38:26 -07:00
|
|
|
S: Service<Request, Response = Response, Error = BoxedStdError> + Clone + Send + 'static,
|
2019-10-07 15:36:16 -07:00
|
|
|
S::Future: Send,
|
|
|
|
{
|
2019-11-27 11:27:17 -08:00
|
|
|
type Response = Client;
|
2019-10-15 16:10:43 -07:00
|
|
|
type Error = HandshakeError;
|
2019-10-15 16:46:12 -07:00
|
|
|
type Future =
|
|
|
|
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
|
2019-10-07 15:36:16 -07:00
|
|
|
|
|
|
|
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
|
|
|
Poll::Ready(Ok(()))
|
|
|
|
}
|
|
|
|
|
2019-10-15 14:51:01 -07:00
|
|
|
fn call(&mut self, req: (TcpStream, SocketAddr)) -> Self::Future {
|
|
|
|
let (tcp_stream, addr) = req;
|
|
|
|
|
2019-10-07 15:36:16 -07:00
|
|
|
let connector_span = span!(Level::INFO, "connector", addr = ?addr);
|
2019-10-22 10:23:35 -07:00
|
|
|
// set parent: None for the peer connection span, as it should exist
|
|
|
|
// independently of its creation source (inbound connection, crawler,
|
|
|
|
// initial peer, ...)
|
|
|
|
let connection_span = span!(parent: None, Level::INFO, "peer", addr = ?addr);
|
2019-10-07 15:36:16 -07:00
|
|
|
|
|
|
|
// Clone these upfront, so they can be moved into the future.
|
2019-10-16 15:16:29 -07:00
|
|
|
let nonces = self.nonces.clone();
|
2019-10-07 15:36:16 -07:00
|
|
|
let internal_service = self.internal_service.clone();
|
2019-10-17 16:38:44 -07:00
|
|
|
let timestamp_collector = self.timestamp_collector.clone();
|
2019-10-08 13:57:24 -07:00
|
|
|
let user_agent = self.config.user_agent.clone();
|
2019-10-16 15:16:29 -07:00
|
|
|
let network = self.config.network.clone();
|
2019-10-07 15:36:16 -07:00
|
|
|
|
|
|
|
let fut = async move {
|
2019-10-08 13:58:05 -07:00
|
|
|
info!("connecting to remote peer");
|
|
|
|
|
2019-10-15 14:51:01 -07:00
|
|
|
let mut stream =
|
|
|
|
Framed::new(tcp_stream, Codec::builder().for_network(network).finish());
|
2019-10-07 15:36:16 -07:00
|
|
|
|
2019-10-15 16:10:43 -07:00
|
|
|
let local_nonce = Nonce::default();
|
|
|
|
nonces
|
|
|
|
.lock()
|
|
|
|
.expect("mutex should be unpoisoned")
|
|
|
|
.insert(local_nonce);
|
|
|
|
|
2019-10-07 15:36:16 -07:00
|
|
|
let version = Message::Version {
|
|
|
|
version: constants::CURRENT_VERSION,
|
|
|
|
services: PeerServices::NODE_NETWORK,
|
|
|
|
timestamp: Utc::now(),
|
|
|
|
address_recv: (PeerServices::NODE_NETWORK, addr),
|
|
|
|
address_from: (
|
|
|
|
PeerServices::NODE_NETWORK,
|
|
|
|
"127.0.0.1:9000".parse().unwrap(),
|
|
|
|
),
|
2019-10-15 16:10:43 -07:00
|
|
|
nonce: local_nonce,
|
2019-10-08 13:57:24 -07:00
|
|
|
user_agent,
|
|
|
|
// XXX eventually the `PeerConnector` will need to have a handle
|
|
|
|
// for a service that gets the current block height.
|
2019-10-07 15:36:16 -07:00
|
|
|
start_height: BlockHeight(0),
|
|
|
|
relay: false,
|
|
|
|
};
|
|
|
|
|
2019-10-08 13:58:05 -07:00
|
|
|
debug!(?version, "sending initial version message");
|
2019-10-07 15:36:16 -07:00
|
|
|
stream.send(version).await?;
|
|
|
|
|
2019-10-15 16:10:43 -07:00
|
|
|
let remote_msg = stream
|
2019-10-07 15:36:16 -07:00
|
|
|
.next()
|
|
|
|
.await
|
2019-10-15 16:10:43 -07:00
|
|
|
.ok_or_else(|| HandshakeError::ConnectionClosed)??;
|
|
|
|
|
|
|
|
// Check that we got a Version and destructure its fields into the local scope.
|
|
|
|
debug!(?remote_msg, "got message from remote peer");
|
2019-10-17 14:33:45 -07:00
|
|
|
let (remote_nonce, remote_services) = if let Message::Version {
|
|
|
|
nonce, services, ..
|
|
|
|
} = remote_msg
|
|
|
|
{
|
|
|
|
(nonce, services)
|
2019-10-15 16:10:43 -07:00
|
|
|
} else {
|
|
|
|
return Err(HandshakeError::UnexpectedMessage(remote_msg));
|
|
|
|
};
|
2019-10-07 15:36:16 -07:00
|
|
|
|
2019-10-15 16:10:43 -07:00
|
|
|
// Check for nonce reuse, indicating self-connection.
|
|
|
|
if {
|
|
|
|
let mut locked_nonces = nonces.lock().expect("mutex should be unpoisoned");
|
|
|
|
let nonce_reuse = locked_nonces.contains(&remote_nonce);
|
|
|
|
// Regardless of whether we observed nonce reuse, clean up the nonce set.
|
|
|
|
locked_nonces.remove(&local_nonce);
|
|
|
|
nonce_reuse
|
|
|
|
} {
|
|
|
|
return Err(HandshakeError::NonceReuse);
|
|
|
|
}
|
2019-10-08 13:58:05 -07:00
|
|
|
|
2019-10-07 15:36:16 -07:00
|
|
|
stream.send(Message::Verack).await?;
|
2019-10-15 16:10:43 -07:00
|
|
|
|
|
|
|
let remote_msg = stream
|
2019-10-07 15:36:16 -07:00
|
|
|
.next()
|
|
|
|
.await
|
2019-10-15 16:10:43 -07:00
|
|
|
.ok_or_else(|| HandshakeError::ConnectionClosed)??;
|
|
|
|
if let Message::Verack = remote_msg {
|
|
|
|
debug!("got verack from remote peer");
|
|
|
|
} else {
|
|
|
|
return Err(HandshakeError::UnexpectedMessage(remote_msg));
|
|
|
|
}
|
2019-10-08 13:58:05 -07:00
|
|
|
|
2019-10-07 15:36:16 -07:00
|
|
|
// XXX here is where we would set the version to the minimum of the
|
|
|
|
// two versions, etc. -- actually is it possible to edit the `Codec`
|
|
|
|
// after using it to make a framed adapter?
|
|
|
|
|
2019-11-27 11:27:17 -08:00
|
|
|
debug!("constructing client, spawning server");
|
2019-10-07 15:36:16 -07:00
|
|
|
|
2019-10-21 11:31:47 -07:00
|
|
|
// These channels should not be cloned more than they are
|
|
|
|
// in this block, see constants.rs for more.
|
2019-10-21 11:05:09 -07:00
|
|
|
let (server_tx, server_rx) = mpsc::channel(0);
|
2019-10-07 15:36:16 -07:00
|
|
|
let slot = ErrorSlot::default();
|
|
|
|
|
2019-11-27 11:27:17 -08:00
|
|
|
let client = Client {
|
2019-10-07 15:36:16 -07:00
|
|
|
span: connection_span.clone(),
|
2019-10-21 11:05:09 -07:00
|
|
|
server_tx: server_tx.clone(),
|
2019-10-07 15:36:16 -07:00
|
|
|
error_slot: slot.clone(),
|
|
|
|
};
|
|
|
|
|
|
|
|
let (peer_tx, peer_rx) = stream.split();
|
|
|
|
|
2019-11-27 11:34:25 -08:00
|
|
|
use super::server;
|
2019-11-27 11:31:35 -08:00
|
|
|
let server = Server {
|
2019-11-27 11:34:25 -08:00
|
|
|
state: server::State::AwaitingRequest,
|
2019-10-07 15:36:16 -07:00
|
|
|
svc: internal_service,
|
2019-10-21 11:05:09 -07:00
|
|
|
client_rx: server_rx,
|
2019-10-07 15:36:16 -07:00
|
|
|
error_slot: slot,
|
|
|
|
peer_tx,
|
2019-10-10 17:54:15 -07:00
|
|
|
request_timer: None,
|
2019-10-07 15:36:16 -07:00
|
|
|
};
|
|
|
|
|
|
|
|
let hooked_peer_rx = peer_rx
|
|
|
|
.then(move |msg| {
|
2019-10-17 16:38:44 -07:00
|
|
|
let mut timestamp_collector = timestamp_collector.clone();
|
2019-10-07 15:36:16 -07:00
|
|
|
async move {
|
|
|
|
if let Ok(_) = msg {
|
|
|
|
use futures::sink::SinkExt;
|
2019-10-17 16:38:44 -07:00
|
|
|
let _ = timestamp_collector
|
2019-10-17 14:33:45 -07:00
|
|
|
.send(MetaAddr {
|
|
|
|
addr,
|
|
|
|
services: remote_services,
|
|
|
|
last_seen: Utc::now(),
|
|
|
|
})
|
|
|
|
.await;
|
2019-10-07 15:36:16 -07:00
|
|
|
}
|
|
|
|
msg
|
|
|
|
}
|
|
|
|
})
|
|
|
|
.boxed();
|
|
|
|
|
|
|
|
tokio::spawn(
|
|
|
|
server
|
|
|
|
.run(hooked_peer_rx)
|
|
|
|
.instrument(connection_span)
|
|
|
|
.boxed(),
|
|
|
|
);
|
|
|
|
|
2019-10-21 11:05:09 -07:00
|
|
|
tokio::spawn(async move {
|
|
|
|
use futures::channel::oneshot;
|
2019-10-21 01:51:37 -07:00
|
|
|
|
2019-10-21 11:05:09 -07:00
|
|
|
use super::client::ClientRequest;
|
|
|
|
|
|
|
|
let mut server_tx = server_tx;
|
|
|
|
|
2019-10-21 11:59:47 -07:00
|
|
|
let mut interval_stream = Interval::new_interval(constants::HEARTBEAT_INTERVAL);
|
2019-10-21 11:05:09 -07:00
|
|
|
|
|
|
|
loop {
|
|
|
|
interval_stream.next().await;
|
|
|
|
|
|
|
|
// We discard the server handle because our
|
|
|
|
// heartbeat `Ping`s are a special case, and we
|
|
|
|
// don't actually care about the response here.
|
|
|
|
let (request_tx, _) = oneshot::channel();
|
|
|
|
let msg = ClientRequest(Request::Ping(Nonce::default()), request_tx);
|
|
|
|
|
|
|
|
if server_tx.send(msg).await.is_err() {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
});
|
2019-10-21 01:51:37 -07:00
|
|
|
|
2019-10-07 15:36:16 -07:00
|
|
|
Ok(client)
|
|
|
|
};
|
|
|
|
fut.instrument(connector_span).boxed()
|
|
|
|
}
|
|
|
|
}
|