handshake without redundant cloning data

This commit is contained in:
debris 2016-09-28 17:46:15 +02:00
parent 0140881c38
commit 4039a1b0e7
5 changed files with 46 additions and 39 deletions

View File

@ -5,7 +5,7 @@ use ser::{
};
use common::{Port, IpAddress, ServiceFlags};
#[derive(Debug, PartialEq, Clone)]
#[derive(Debug, PartialEq)]
pub struct NetAddress {
pub services: ServiceFlags,
pub address: IpAddress,

View File

@ -2,7 +2,7 @@ use std::{str, net};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError};
#[derive(Debug, PartialEq, Clone)]
#[derive(Debug, PartialEq)]
pub struct IpAddress(net::IpAddr);
impl From<net::IpAddr> for IpAddress {

View File

@ -1,7 +1,7 @@
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError};
#[derive(Debug, PartialEq, Clone)]
#[derive(Debug, PartialEq)]
pub struct Port(u16);
impl From<u16> for Port {

View File

@ -5,7 +5,7 @@ use ser::{
};
use common::{NetAddress, ServiceFlags};
#[derive(Debug, PartialEq, Clone)]
#[derive(Debug, PartialEq)]
pub enum Version {
Simple(Simple),
V106(Simple, V106),
@ -22,7 +22,7 @@ impl Version {
}
}
#[derive(Debug, PartialEq, Clone)]
#[derive(Debug, PartialEq)]
pub struct Simple {
pub version: u32,
pub services: ServiceFlags,
@ -30,7 +30,7 @@ pub struct Simple {
pub receiver: NetAddress,
}
#[derive(Debug, PartialEq, Clone)]
#[derive(Debug, PartialEq)]
pub struct V106 {
pub from: NetAddress,
pub nonce: u64,
@ -38,7 +38,7 @@ pub struct V106 {
pub start_height: i32,
}
#[derive(Debug, PartialEq, Clone)]
#[derive(Debug, PartialEq)]
pub struct V70001 {
pub relay: bool,
}

View File

@ -1,4 +1,4 @@
use std::{io, cmp};
use std::io;
use futures::{Future, Poll, Async};
use net::messages::{Version, Message, Payload};
use io::{write_message, read_message, ReadMessage, WriteMessage, Error};
@ -15,29 +15,29 @@ fn verack() -> Message {
pub struct HandshakeResult {
pub version: Version,
pub negotiated_version: u32,
}
enum HandshakeState<A> {
SendVersion(WriteMessage<A>),
ReceiveVersion(ReadMessage<A>),
ReceiveVerack {
version: Version,
version: Option<Version>,
future: ReadMessage<A>,
},
Finished,
}
enum AcceptHandshakeState<A> {
ReceiveVersion(ReadMessage<A>),
SendVersion {
version: Version,
version: Option<Version>,
future: WriteMessage<A>,
},
SendVerack {
version: Version,
version: Option<Version>,
future: WriteMessage<A>,
}
},
Finished,
}
pub fn handshake<A>(a: A) -> Handshake<A> where A: io::Write + io::Read {
@ -65,10 +65,10 @@ impl<A> Future for Handshake<A> where A: io::Read + io::Write {
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let next = match self.state {
let (next, result) = match self.state {
HandshakeState::SendVersion(ref mut future) => {
let (stream, _) = try_async!(future.poll());
HandshakeState::ReceiveVersion(read_message(stream, 0))
(HandshakeState::ReceiveVersion(read_message(stream, 0)), Async::NotReady)
},
HandshakeState::ReceiveVersion(ref mut future) => {
let (stream, message) = try_async!(future.poll());
@ -77,28 +77,30 @@ impl<A> Future for Handshake<A> where A: io::Read + io::Write {
_ => return Err(Error::HandshakeFailed),
};
HandshakeState::ReceiveVerack {
version: version,
let next = HandshakeState::ReceiveVerack {
version: Some(version),
future: read_message(stream, 0),
}
};
(next, Async::NotReady)
},
HandshakeState::ReceiveVerack { ref version, ref mut future } => {
HandshakeState::ReceiveVerack { ref mut version, ref mut future } => {
let (stream, message) = try_async!(future.poll());
if message.payload != Payload::Verack {
return Err(Error::HandshakeFailed);
}
let result = HandshakeResult {
version: version.clone(),
negotiated_version: cmp::min(VERSION, version.version()),
version: version.take().expect("verack must be preceded by version"),
};
return Ok(Async::Ready((stream, result)));
}
(HandshakeState::Finished, Async::Ready((stream, result)))
},
HandshakeState::Finished => panic!("poll Handshake after it's done"),
};
self.state = next;
Ok(Async::NotReady)
Ok(result)
}
}
@ -107,38 +109,43 @@ impl<A> Future for AcceptHandshake<A> where A: io::Read + io::Write {
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let next = match self.state {
let (next, result) = match self.state {
AcceptHandshakeState::ReceiveVersion(ref mut future) => {
let (stream, message) = try_async!(future.poll());
let version = match message.payload {
Payload::Version(version) => version,
_ => return Err(Error::HandshakeFailed),
};
AcceptHandshakeState::SendVersion {
version: version,
let next = AcceptHandshakeState::SendVersion {
version: Some(version),
future: write_message(stream, &local_version()),
}
};
(next, Async::NotReady)
},
AcceptHandshakeState::SendVersion { ref version, ref mut future } => {
AcceptHandshakeState::SendVersion { ref mut version, ref mut future } => {
let (stream, _) = try_async!(future.poll());
AcceptHandshakeState::SendVerack {
version: version.clone(),
let next = AcceptHandshakeState::SendVerack {
version: version.take(),
future: write_message(stream, &verack()),
}
};
(next, Async::NotReady)
},
AcceptHandshakeState::SendVerack { ref version, ref mut future } => {
AcceptHandshakeState::SendVerack { ref mut version, ref mut future } => {
let (stream, _) = try_async!(future.poll());
let result = HandshakeResult {
version: version.clone(),
negotiated_version: cmp::min(VERSION, version.version()),
version: version.take().expect("verack must be preceded by version"),
};
return Ok(Async::Ready((stream, result)));
}
(AcceptHandshakeState::Finished, Async::Ready((stream, result)))
},
AcceptHandshakeState::Finished => panic!("poll AcceptHandshake after it's done"),
};
self.state = next;
Ok(Async::NotReady)
Ok(result)
}
}