From 0140881c3851d78f8688215538fbf7466d6684a5 Mon Sep 17 00:00:00 2001 From: debris Date: Wed, 28 Sep 2016 16:41:15 +0200 Subject: [PATCH] AcceptHandshake --- p2p/src/io/handshake.rs | 67 +++++++++++++++++++++++++++++++++++++++++ p2p/src/io/mod.rs | 2 +- 2 files changed, 68 insertions(+), 1 deletion(-) diff --git a/p2p/src/io/handshake.rs b/p2p/src/io/handshake.rs index bc453cf7..79219d78 100644 --- a/p2p/src/io/handshake.rs +++ b/p2p/src/io/handshake.rs @@ -9,6 +9,10 @@ fn local_version() -> Message { unimplemented!(); } +fn verack() -> Message { + unimplemented!(); +} + pub struct HandshakeResult { pub version: Version, pub negotiated_version: u32, @@ -24,16 +28,38 @@ enum HandshakeState { }, } +enum AcceptHandshakeState { + ReceiveVersion(ReadMessage), + SendVersion { + version: Version, + future: WriteMessage, + }, + SendVerack { + version: Version, + future: WriteMessage, + } +} + pub fn handshake(a: A) -> Handshake where A: io::Write + io::Read { Handshake { state: HandshakeState::SendVersion(write_message(a, &local_version())), } } +pub fn accept_handshake(a: A) -> AcceptHandshake where A: io::Write + io::Read { + AcceptHandshake { + state: AcceptHandshakeState::ReceiveVersion(read_message(a, 0)), + } +} + pub struct Handshake { state: HandshakeState, } +pub struct AcceptHandshake { + state: AcceptHandshakeState, +} + impl Future for Handshake where A: io::Read + io::Write { type Item = (A, HandshakeResult); type Error = Error; @@ -75,3 +101,44 @@ impl Future for Handshake where A: io::Read + io::Write { Ok(Async::NotReady) } } + +impl Future for AcceptHandshake where A: io::Read + io::Write { + type Item = (A, HandshakeResult); + type Error = Error; + + fn poll(&mut self) -> Poll { + let next = match self.state { + AcceptHandshakeState::ReceiveVersion(ref mut future) => { + let (stream, message) = try_async!(future.poll()); + let version = match message.payload { + Payload::Version(version) => version, + _ => return Err(Error::HandshakeFailed), + }; + AcceptHandshakeState::SendVersion { + version: version, + future: write_message(stream, &local_version()), + } + }, + AcceptHandshakeState::SendVersion { ref version, ref mut future } => { + let (stream, _) = try_async!(future.poll()); + AcceptHandshakeState::SendVerack { + version: version.clone(), + future: write_message(stream, &verack()), + } + }, + AcceptHandshakeState::SendVerack { ref version, ref mut future } => { + let (stream, _) = try_async!(future.poll()); + + let result = HandshakeResult { + version: version.clone(), + negotiated_version: cmp::min(VERSION, version.version()), + }; + + return Ok(Async::Ready((stream, result))); + } + }; + + self.state = next; + Ok(Async::NotReady) + } +} diff --git a/p2p/src/io/mod.rs b/p2p/src/io/mod.rs index 2a0c4810..72c6dfa7 100644 --- a/p2p/src/io/mod.rs +++ b/p2p/src/io/mod.rs @@ -14,7 +14,7 @@ mod read_payload; mod write_message; pub use self::error::Error; -pub use self::handshake::{handshake, Handshake}; +pub use self::handshake::{handshake, accept_handshake, Handshake, AcceptHandshake}; pub use self::read_header::{read_header, ReadHeader}; pub use self::read_message::{read_message, ReadMessage}; pub use self::read_payload::{read_payload, ReadPayload};