connection ping pong

This commit is contained in:
debris 2016-10-04 16:29:33 +02:00
parent ffcb4d5a20
commit 3f06ab14ae
4 changed files with 67 additions and 11 deletions

View File

@ -2,7 +2,7 @@ use ser::{Serializable, Stream, Deserializable, Reader, Error as ReaderError};
#[derive(Debug, PartialEq)]
pub struct Ping {
nonce: u64,
pub nonce: u64,
}
impl Serializable for Ping {

View File

@ -9,7 +9,7 @@ use Error;
pub fn handshake<A>(a: A, magic: Magic, version: Version) -> Handshake<A> where A: io::Write + io::Read {
Handshake {
version: version.version(),
state: HandshakeState::SendVersion(write_message(a, &version_message(magic, version))),
state: HandshakeState::SendVersion(write_message(a, version_message(magic, version))),
magic: magic,
}
}
@ -149,7 +149,7 @@ impl<A> Future for AcceptHandshake<A> where A: io::Read + io::Write {
let local_version = local_version.take().expect("local version must be set");
let next = AcceptHandshakeState::SendVersion {
version: Some(version),
future: write_message(stream, &version_message(self.magic, local_version)),
future: write_message(stream, version_message(self.magic, local_version)),
};
(next, Async::NotReady)
@ -158,7 +158,7 @@ impl<A> Future for AcceptHandshake<A> where A: io::Read + io::Write {
let (stream, _) = try_ready!(future.poll());
let next = AcceptHandshakeState::SendVerack {
version: version.take(),
future: write_message(stream, &verack_message(self.magic)),
future: write_message(stream, verack_message(self.magic)),
};
(next, Async::NotReady)

View File

@ -1,11 +1,30 @@
use std::io;
use futures::{Future, Poll};
use tokio_core::io::{WriteAll, write_all};
use bytes::Bytes;
use ser::{serialize};
use ser::serialize;
use message::Message;
use Error;
pub type WriteMessage<A> = WriteAll<A, Bytes>;
pub fn write_message<A>(a: A, message: &Message) -> WriteMessage<A> where A: io::Write {
write_all(a, serialize(message))
pub fn write_message<A>(a: A, message: Message) -> WriteMessage<A> where A: io::Write {
WriteMessage {
future: write_all(a, serialize(&message)),
message: Some(message),
}
}
pub struct WriteMessage<A> {
future: WriteAll<A, Bytes>,
message: Option<Message>,
}
impl<A> Future for WriteMessage<A> where A: io::Write {
type Item = (A, Message);
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let (stream, _) = try_ready!(self.future.poll());
let message = self.message.take().expect("write message must be initialized with message");
Ok((stream, message).into())
}
}

View File

@ -1,11 +1,48 @@
use std::{net, io};
use message::{Message, Payload};
use message::common::Magic;
use io::HandshakeResult;
use message::types::{Ping, Pong};
use io::{HandshakeResult, write_message, WriteMessage};
pub struct Connection<A> where A: io::Read + io::Write {
pub struct Connection<A> {
pub stream: A,
pub handshake_result: HandshakeResult,
pub magic: Magic,
pub address: net::SocketAddr,
}
impl<A> io::Read for Connection<A> where A: io::Read {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
self.stream.read(buf)
}
}
impl<A> io::Write for Connection<A> where A: io::Write {
fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
self.stream.write(buf)
}
fn flush(&mut self) -> Result<(), io::Error> {
self.stream.flush()
}
}
impl<A> Connection<A> where A: io::Read + io::Write {
pub fn ping(self) -> WriteMessage<Connection<A>> {
let payload = Payload::Ping(Ping {
nonce: 0,
});
let message = Message::new(self.magic, payload);
write_message(self, message)
}
pub fn pong(self) -> WriteMessage<Connection<A>> {
let payload = Payload::Pong(Pong {
nonce: 0,
});
let message = Message::new(self.magic, payload);
write_message(self, message)
}
}