experimental implementation

This commit is contained in:
NikVolf 2017-04-20 23:58:45 +03:00
parent 3a22f8fc9a
commit 0b9b728eae
2 changed files with 71 additions and 7 deletions

View File

@ -12,10 +12,8 @@ Windows named pipe bindings for tokio.
"""
[dependencies]
tokio-core = "0.1.1"
tokio-core = "0.1"
tokio-io = "0.1"
mio-named-pipes = { git = "https://github.com/alexcrichton/mio-named-pipes" }
futures = '0.1.4'
[replace.'mio:0.6.1']
git = "https://github.com/alexcrichton/mio"
branch = 'custom-iocp'
futures = "0.1"
bytes = "0.4"

View File

@ -1,6 +1,8 @@
#![cfg(windows)]
extern crate tokio_core;
extern crate tokio_io;
extern crate bytes;
extern crate mio_named_pipes;
extern crate futures;
@ -9,10 +11,16 @@ use std::fmt;
use std::io::{self, Read, Write};
use std::os::windows::io::*;
use futures::Async;
use futures::{Async, Poll};
#[allow(deprecated)]
use tokio_core::io::Io;
use bytes::{BufMut, Buf};
use tokio_core::reactor::{PollEvented, Handle};
use tokio_io::{AsyncRead, AsyncWrite};
pub struct NamedPipe {
io: PollEvented<mio_named_pipes::NamedPipe>,
}
@ -50,6 +58,10 @@ impl NamedPipe {
pub fn poll_write(&self) -> Async<()> {
self.io.poll_write()
}
fn io_mut(&mut self) -> &mut PollEvented<mio_named_pipes::NamedPipe> {
&mut self.io
}
}
impl Read for NamedPipe {
@ -67,6 +79,7 @@ impl Write for NamedPipe {
}
}
#[allow(deprecated)]
impl Io for NamedPipe {
fn poll_read(&mut self) -> Async<()> {
<NamedPipe>::poll_read(self)
@ -93,6 +106,7 @@ impl<'a> Write for &'a NamedPipe {
}
}
#[allow(deprecated)]
impl<'a> Io for &'a NamedPipe {
fn poll_read(&mut self) -> Async<()> {
<NamedPipe>::poll_read(self)
@ -103,6 +117,58 @@ impl<'a> Io for &'a NamedPipe {
}
}
impl AsyncRead for NamedPipe {
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
false
}
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
if let Async::NotReady = <NamedPipe>::poll_read(self) {
return Ok(Async::NotReady)
}
let mut stack_buf = [0u8; 1024];
let bytes_read = self.io_mut().read(&mut stack_buf);
match bytes_read {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io_mut().need_read();
return Ok(Async::NotReady);
},
Err(e) => Err(e),
Ok(bytes_read) => {
buf.put_slice(&stack_buf[0..bytes_read]);
Ok(Async::Ready(bytes_read))
}
}
}
}
impl AsyncWrite for NamedPipe {
fn shutdown(&mut self) -> Poll<(), io::Error> {
Ok(().into())
}
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
if let Async::NotReady = <NamedPipe>::poll_write(self) {
return Ok(Async::NotReady)
}
let bytes_wrt = self.io_mut().write(buf.bytes());
match bytes_wrt {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io_mut().need_write();
return Ok(Async::NotReady);
},
Err(e) => Err(e),
Ok(bytes_wrt) => {
buf.advance(bytes_wrt);
Ok(Async::Ready(bytes_wrt))
}
}
}
}
impl fmt::Debug for NamedPipe {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.io.get_ref().fmt(f)