win test working
This commit is contained in:
parent
e5670df3c5
commit
26cda79da7
|
@ -9,6 +9,7 @@ tokio-named-pipes = { git = "https://github.com/alexcrichton/tokio-named-pipes"
|
|||
futures = "0.1"
|
||||
tokio-core = "0.1"
|
||||
tokio-line = { git = "https://github.com/tokio-rs/tokio-line" }
|
||||
rand = "*"
|
||||
rand = "0.3"
|
||||
mio-named-pipes = { git = "https://github.com/alexcrichton/mio-named-pipes" }
|
||||
miow = "*"
|
||||
miow = "0.2"
|
||||
log = "*"
|
109
src/lib.rs
109
src/lib.rs
|
@ -5,6 +5,10 @@ extern crate futures;
|
|||
extern crate tokio_uds;
|
||||
extern crate tokio_named_pipes;
|
||||
#[macro_use] extern crate tokio_core;
|
||||
#[macro_use] extern crate log;
|
||||
|
||||
#[cfg(windows)]
|
||||
extern crate miow;
|
||||
|
||||
use std::io::{self, Read, Write};
|
||||
|
||||
|
@ -80,6 +84,27 @@ pub struct Incoming {
|
|||
inner: NamedPipeSupport,
|
||||
}
|
||||
|
||||
#[cfg(windows)]
|
||||
fn replacement_pipe(path: &str, handle: &Handle) -> io::Result<NamedPipe> {
|
||||
extern crate mio_named_pipes;
|
||||
|
||||
use std::os::windows::io::*;
|
||||
use miow::pipe::NamedPipeBuilder;
|
||||
|
||||
let raw_handle = NamedPipeBuilder::new(path)
|
||||
.first(false)
|
||||
.inbound(true)
|
||||
.outbound(true)
|
||||
.out_buffer_size(65536)
|
||||
.in_buffer_size(65536)
|
||||
.create()?
|
||||
.into_raw_handle();
|
||||
|
||||
let mio_pipe = unsafe { mio_named_pipes::NamedPipe::from_raw_handle(raw_handle) };
|
||||
|
||||
NamedPipe::from_pipe(mio_pipe, handle)
|
||||
}
|
||||
|
||||
impl Stream for Incoming {
|
||||
type Item = (IpcStream, RemoteId);
|
||||
type Error = io::Error;
|
||||
|
@ -95,25 +120,42 @@ impl Stream for Incoming {
|
|||
|
||||
#[cfg(windows)]
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
|
||||
use tokio_core::reactor::Timeout;
|
||||
use futures::{future, Future};
|
||||
|
||||
match self.inner.pipe.connect() {
|
||||
Ok(()) => Ok(Async::Ready(Some((
|
||||
(
|
||||
IpcStream {
|
||||
inner: ::std::mem::replace(
|
||||
&mut self.inner.pipe,
|
||||
NamedPipe::new(&self.inner.path,
|
||||
&self.inner.handle.handle().ok_or(
|
||||
io::Error::new(io::ErrorKind::Other, "Cannot spawn event loop handle")
|
||||
)?
|
||||
)?,
|
||||
)
|
||||
},
|
||||
RemoteId,
|
||||
)
|
||||
)))),
|
||||
Ok(()) => {
|
||||
trace!("Incoming connection polled successfully");
|
||||
let handle = &self.inner.handle.handle().ok_or(
|
||||
io::Error::new(io::ErrorKind::Other, "Cannot spawn event loop handle")
|
||||
)?;
|
||||
Ok(Async::Ready(Some((
|
||||
(
|
||||
IpcStream {
|
||||
inner: ::std::mem::replace(
|
||||
&mut self.inner.pipe,
|
||||
replacement_pipe(&self.inner.path, &handle)?,
|
||||
)
|
||||
},
|
||||
RemoteId,
|
||||
)
|
||||
))))
|
||||
},
|
||||
Err(e) => {
|
||||
if e.kind() == io::ErrorKind::WouldBlock {
|
||||
::futures::task::park();
|
||||
trace!("Incoming connection was to block, added timeout");
|
||||
let task = ::futures::task::park();
|
||||
let handle = &self.inner.handle.handle().ok_or(
|
||||
io::Error::new(io::ErrorKind::Other, "Cannot spawn event loop handle")
|
||||
)?;
|
||||
handle.spawn(
|
||||
Timeout::new(::std::time::Duration::from_millis(100), &handle)?
|
||||
.then(move |_| {
|
||||
task.unpark();
|
||||
trace!("Unparked connection task");
|
||||
future::ok(())
|
||||
})
|
||||
);
|
||||
Ok(Async::NotReady)
|
||||
} else {
|
||||
Err(e)
|
||||
|
@ -163,9 +205,9 @@ mod tests {
|
|||
extern crate tokio_line;
|
||||
|
||||
use std::thread;
|
||||
use tokio_core::reactor::{Core, Handle};
|
||||
use tokio_core::reactor::Core;
|
||||
use tokio_core::io::{self, Io};
|
||||
use futures::{future, Stream, Sink, Future};
|
||||
use futures::{Stream, Future};
|
||||
|
||||
use super::Endpoint;
|
||||
|
||||
|
@ -176,17 +218,16 @@ mod tests {
|
|||
}
|
||||
|
||||
pub fn dummy_request(addr: &str, buf: &[u8]) -> Vec<u8> {
|
||||
extern crate miow;
|
||||
|
||||
use miow;
|
||||
use std::io::{Read, Write};
|
||||
use std::fs::OpenOptions;
|
||||
|
||||
miow::pipe::NamedPipe::wait(addr, None).unwrap();
|
||||
let mut f = OpenOptions::new().read(true).write(true).open(addr).unwrap();
|
||||
println!("Connected");
|
||||
trace!("Connected");
|
||||
f.write_all(buf).unwrap();
|
||||
f.flush().unwrap();
|
||||
println!("Wrote");
|
||||
trace!("Wrote");
|
||||
|
||||
let mut buf = vec![0u8; 65536];
|
||||
let sz = f.read(&mut buf).unwrap_or_else(|_| { 0 });
|
||||
|
@ -199,25 +240,35 @@ mod tests {
|
|||
let path = random_pipe_path(); let path2 = path.clone();
|
||||
|
||||
thread::spawn(move || {
|
||||
let mut core = Core::new().unwrap();
|
||||
let mut core = Core::new().expect("Server event loop should start ok");
|
||||
let endpoint = Endpoint::new(path, &core.handle()).expect("Should be created");
|
||||
let srv = endpoint.incoming()
|
||||
.for_each(|(stream, _)| {
|
||||
println!("Created connection");
|
||||
trace!("Created connection");
|
||||
let (reader, writer) = stream.split();
|
||||
let mut buf = Vec::new();
|
||||
io::read_to_end(reader, buf).and_then(move |(reader, buf)| io::write_all(writer, "Ok"))
|
||||
let buf = [0u8; 6];
|
||||
io::read_exact(reader, buf).and_then(move |(_reader, _buf)| {
|
||||
let mut reply = Vec::new();
|
||||
reply.extend(&_buf[..]);
|
||||
reply.extend(b" - Ok");
|
||||
io::write_all(writer, reply)
|
||||
})
|
||||
.map_err(|e| { trace!("io error: {:?}", e); e })
|
||||
.map(|_| ())
|
||||
})
|
||||
.map(|_| ())
|
||||
.map_err(|_e| ())
|
||||
.map_err(|e|{ trace!("io error: {:?}", e); () })
|
||||
.boxed();
|
||||
|
||||
core.run(srv).unwrap();
|
||||
core.run(srv).expect("Server event loop should finish ok");
|
||||
});
|
||||
thread::sleep(::std::time::Duration::from_millis(50));
|
||||
|
||||
dummy_request(&path2, b"Space\n\r");
|
||||
let res = dummy_request(&path2, b"Space1");
|
||||
assert_eq!(res, b"Space1 - Ok");
|
||||
|
||||
let res = dummy_request(&path2, b"Space2");
|
||||
assert_eq!(res, b"Space2 - Ok");
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue