diff --git a/Cargo.toml b/Cargo.toml index 71a1ffa..d56d98f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tokio-named-pipes" -version = "0.1.0" +version = "0.2.0" authors = ["Alex Crichton "] license = "MIT/Apache-2.0" readme = "README.md" @@ -12,8 +12,8 @@ Windows named pipe bindings for tokio. """ [dependencies] -tokio-core = "0.1" -tokio-io = "0.1" +mio = "0.6" +tokio = "0.1" mio-named-pipes = { git = "https://github.com/alexcrichton/mio-named-pipes" } futures = "0.1" bytes = "0.4" \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index fb6b215..52cdb8f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,138 +1,110 @@ #![cfg(windows)] -extern crate tokio_core; -extern crate tokio_io; +extern crate tokio; extern crate bytes; +extern crate mio; extern crate mio_named_pipes; extern crate futures; use std::ffi::OsStr; use std::fmt; -use std::io::{self, Read, Write}; +use std::io::{Read, Write}; use std::os::windows::io::*; use futures::{Async, Poll}; - -#[allow(deprecated)] -use tokio_core::io::Io; use bytes::{BufMut, Buf}; - -use tokio_core::reactor::{PollEvented, Handle}; - -use tokio_io::{AsyncRead, AsyncWrite}; +use mio::Ready; +use tokio::reactor::{PollEvented2}; +use tokio::io::{AsyncRead, AsyncWrite}; pub struct NamedPipe { - io: PollEvented, + io: PollEvented2, } impl NamedPipe { - pub fn new>(p: P, handle: &Handle) -> io::Result { - NamedPipe::_new(p.as_ref(), handle) + pub fn new>(p: P) -> std::io::Result { + NamedPipe::_new(p.as_ref()) } - fn _new(p: &OsStr, handle: &Handle) -> io::Result { + fn _new(p: &OsStr) -> std::io::Result { let inner = try!(mio_named_pipes::NamedPipe::new(p)); - NamedPipe::from_pipe(inner, handle) + NamedPipe::from_pipe(inner) } - pub fn from_pipe(pipe: mio_named_pipes::NamedPipe, - handle: &Handle) - -> io::Result { + pub fn from_pipe(pipe: mio_named_pipes::NamedPipe) + -> std::io::Result { Ok(NamedPipe { - io: try!(PollEvented::new(pipe, handle)), + io: PollEvented2::new(pipe), }) } - pub fn connect(&self) -> io::Result<()> { + pub fn connect(&self) -> std::io::Result<()> { self.io.get_ref().connect() } - pub fn disconnect(&self) -> io::Result<()> { + pub fn disconnect(&self) -> std::io::Result<()> { self.io.get_ref().disconnect() } - pub fn poll_read(&self) -> Async<()> { - self.io.poll_read() + pub fn poll_read_ready_readable(&mut self) -> tokio::io::Result> { + self.io.poll_read_ready(Ready::readable()) } - pub fn poll_write(&self) -> Async<()> { - self.io.poll_write() + pub fn poll_write_ready(&mut self) -> tokio::io::Result> { + self.io.poll_write_ready() } - fn io_mut(&mut self) -> &mut PollEvented { + fn io_mut(&mut self) -> &mut PollEvented2 { &mut self.io } } impl Read for NamedPipe { - fn read(&mut self, buf: &mut [u8]) -> io::Result { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { self.io.read(buf) } } impl Write for NamedPipe { - fn write(&mut self, buf: &[u8]) -> io::Result { + fn write(&mut self, buf: &[u8]) -> std::io::Result { self.io.write(buf) } - fn flush(&mut self) -> io::Result<()> { + fn flush(&mut self) -> std::io::Result<()> { self.io.flush() } } -#[allow(deprecated)] -impl Io for NamedPipe { - fn poll_read(&mut self) -> Async<()> { - ::poll_read(self) - } - - fn poll_write(&mut self) -> Async<()> { - ::poll_write(self) - } -} - impl<'a> Read for &'a NamedPipe { - fn read(&mut self, buf: &mut [u8]) -> io::Result { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { (&self.io).read(buf) } } impl<'a> Write for &'a NamedPipe { - fn write(&mut self, buf: &[u8]) -> io::Result { + fn write(&mut self, buf: &[u8]) -> std::io::Result { (&self.io).write(buf) } - fn flush(&mut self) -> io::Result<()> { + fn flush(&mut self) -> std::io::Result<()> { (&self.io).flush() } } -#[allow(deprecated)] -impl<'a> Io for &'a NamedPipe { - fn poll_read(&mut self) -> Async<()> { - ::poll_read(self) - } - - fn poll_write(&mut self) -> Async<()> { - ::poll_write(self) - } -} - - impl AsyncRead for NamedPipe { unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { false } - fn read_buf(&mut self, buf: &mut B) -> Poll { - if let Async::NotReady = ::poll_read(self) { + fn read_buf(&mut self, buf: &mut B) -> Poll { + if let Async::NotReady = self.io.poll_read_ready(Ready::readable())? { return Ok(Async::NotReady) } let mut stack_buf = [0u8; 1024]; let bytes_read = self.io_mut().read(&mut stack_buf); match bytes_read { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io_mut().need_read(); + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + self.io_mut().clear_read_ready(Ready::readable())?; return Ok(Async::NotReady); }, Err(e) => Err(e), @@ -145,19 +117,19 @@ impl AsyncRead for NamedPipe { } impl AsyncWrite for NamedPipe { - fn shutdown(&mut self) -> Poll<(), io::Error> { + fn shutdown(&mut self) -> Poll<(), std::io::Error> { Ok(().into()) } - fn write_buf(&mut self, buf: &mut B) -> Poll { - if let Async::NotReady = ::poll_write(self) { + fn write_buf(&mut self, buf: &mut B) -> Poll { + if let Async::NotReady = self.io.poll_write_ready()? { return Ok(Async::NotReady) } let bytes_wrt = self.io_mut().write(buf.bytes()); match bytes_wrt { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io_mut().need_write(); + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + self.io_mut().clear_write_ready()?; return Ok(Async::NotReady); }, Err(e) => Err(e),