Add IpcStream::connect
This commit is contained in:
parent
2af3e5b6b7
commit
f483184917
87
src/lib.rs
87
src/lib.rs
|
@ -9,10 +9,11 @@ extern crate tokio_io;
|
|||
extern crate bytes;
|
||||
#[allow(unused_imports)] #[macro_use] extern crate log;
|
||||
|
||||
#[cfg(windows)]
|
||||
#[cfg(windows)]
|
||||
extern crate miow;
|
||||
|
||||
use std::io::{self, Read, Write};
|
||||
use std::path::Path;
|
||||
|
||||
use futures::{Async, Poll};
|
||||
use futures::stream::Stream;
|
||||
|
@ -197,6 +198,26 @@ pub struct IpcStream {
|
|||
inner: tokio_uds::UnixStream,
|
||||
}
|
||||
|
||||
|
||||
impl IpcStream {
|
||||
pub fn connect<P: AsRef<Path>>(path: P, handle: &Handle) -> io::Result<IpcStream> {
|
||||
|
||||
Ok(IpcStream{
|
||||
inner: Self::connect_inner(path.as_ref(), handle)?,
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
fn connect_inner(path: &Path, handle: &Handle) -> io::Result<tokio_uds::UnixStream> {
|
||||
tokio_uds::UnixStream::connect(&path, &handle)
|
||||
}
|
||||
|
||||
#[cfg(windows)]
|
||||
fn connect_inner(path: &Path, handle: &Handle) -> io::Result<NamedPipe> {
|
||||
NamedPipe::new(&path, &handle)
|
||||
}
|
||||
}
|
||||
|
||||
impl Read for IpcStream {
|
||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
self.inner.read(buf)
|
||||
|
@ -255,7 +276,6 @@ mod tests {
|
|||
|
||||
use super::Endpoint;
|
||||
|
||||
#[cfg(windows)]
|
||||
fn random_pipe_path() -> String {
|
||||
let num: u64 = self::rand::Rng::gen(&mut rand::thread_rng());
|
||||
format!(r"\\.\pipe\my-pipe-{}", num)
|
||||
|
@ -279,7 +299,6 @@ mod tests {
|
|||
}
|
||||
|
||||
#[test]
|
||||
#[cfg(windows)]
|
||||
fn win_smoky() {
|
||||
let path = random_pipe_path(); let path2 = path.clone();
|
||||
|
||||
|
@ -299,7 +318,7 @@ mod tests {
|
|||
})
|
||||
.map_err(|e| { trace!("io error: {:?}", e); e })
|
||||
.map(|_| ())
|
||||
})
|
||||
; })
|
||||
.map(|_| ())
|
||||
.map_err(|e|{ trace!("io error: {:?}", e); () })
|
||||
.boxed();
|
||||
|
@ -316,4 +335,62 @@ mod tests {
|
|||
}
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
extern crate rand;
|
||||
|
||||
use tokio_core::reactor::Core;
|
||||
use tokio_core::io::{self, Io};
|
||||
use futures::{Stream, Future};
|
||||
|
||||
use super::Endpoint;
|
||||
use super::IpcStream;
|
||||
|
||||
#[cfg(not(windows))]
|
||||
fn random_pipe_path() -> String {
|
||||
let num: u64 = self::rand::Rng::gen(&mut rand::thread_rng());
|
||||
format!(r"/tmp/parity-tokio-ipc-test-pipe-{}", num)
|
||||
}
|
||||
|
||||
#[cfg(windows)]
|
||||
fn random_pipe_path() -> String {
|
||||
let num: u64 = self::rand::Rng::gen(&mut rand::thread_rng());
|
||||
format!(r"\\.\pipe\my-pipe-{}", num)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn smoke_test() {
|
||||
let path = random_pipe_path();
|
||||
let mut core = Core::new().expect("failed to start event loop");
|
||||
let handle = core.handle();
|
||||
let endpoint = Endpoint::new(path.to_owned(), &core.handle()).expect("failed to open a server socket");
|
||||
let srv = endpoint.incoming()
|
||||
.for_each(|(stream, _)| {
|
||||
let (reader, writer) = stream.split();
|
||||
let buf = [0u8; 5];
|
||||
io::read_exact(reader,buf).and_then(move |(_reader, buf)| {
|
||||
let mut reply = vec![];
|
||||
reply.extend(&buf[..]);
|
||||
io::write_all(writer, reply)
|
||||
})
|
||||
.map_err(|e| {trace!("io error: {:?}", e); e })
|
||||
.map(|_| ())
|
||||
})
|
||||
.map_err(|_| ())
|
||||
;
|
||||
handle.spawn(srv);
|
||||
let client = IpcStream::connect(&path, &handle).expect("failed to open a client");
|
||||
let msg = b"hello";
|
||||
let mut rx_buf = vec![0u8; msg.len()];
|
||||
let client_fut = io::write_all(client, &msg).and_then(|(client, _)| {
|
||||
io::read_exact(client, &mut rx_buf).map(|(_, buf)| {
|
||||
buf
|
||||
})
|
||||
});
|
||||
|
||||
let received_message = core.run(client_fut).expect("failed to read from server");
|
||||
assert_eq!(received_message, msg);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue