Merge pull request #5 from mullvad/add-client-support
Add client support
This commit is contained in:
commit
c0f80b4039
|
@ -14,3 +14,6 @@ mio-named-pipes = { git = "https://github.com/alexcrichton/mio-named-pipes" }
|
|||
miow = "0.2"
|
||||
log = "*"
|
||||
bytes = "0.4"
|
||||
|
||||
[target.'cfg(windows)'.dependencies]
|
||||
winapi = { version = "0.3", features = ["winbase"] }
|
||||
|
|
164
src/lib.rs
164
src/lib.rs
|
@ -5,14 +5,20 @@ extern crate futures;
|
|||
extern crate tokio_uds;
|
||||
extern crate tokio_named_pipes;
|
||||
extern crate tokio_core;
|
||||
|
||||
extern crate tokio_io;
|
||||
extern crate bytes;
|
||||
#[allow(unused_imports)] #[macro_use] extern crate log;
|
||||
|
||||
#[cfg(windows)]
|
||||
#[cfg(windows)]
|
||||
extern crate miow;
|
||||
#[cfg(windows)]
|
||||
extern crate mio_named_pipes;
|
||||
#[cfg(windows)]
|
||||
extern crate winapi;
|
||||
|
||||
use std::io::{self, Read, Write};
|
||||
use std::path::Path;
|
||||
|
||||
use futures::{Async, Poll};
|
||||
use futures::stream::Stream;
|
||||
|
@ -144,13 +150,13 @@ fn replacement_pipe(path: &str, handle: &Handle) -> io::Result<NamedPipe> {
|
|||
}
|
||||
|
||||
impl Stream for Incoming {
|
||||
type Item = (IpcStream, RemoteId);
|
||||
type Item = (IpcConnection, RemoteId);
|
||||
type Error = io::Error;
|
||||
|
||||
#[cfg(not(windows))]
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
|
||||
self.inner.poll().map(|poll| match poll {
|
||||
Async::Ready(Some(val)) => Async::Ready(Some((IpcStream { inner: val.0 }, RemoteId))),
|
||||
Async::Ready(Some(val)) => Async::Ready(Some((IpcConnection { inner: val.0 }, RemoteId))),
|
||||
Async::Ready(None) => Async::Ready(None),
|
||||
Async::NotReady => Async::NotReady,
|
||||
})
|
||||
|
@ -165,15 +171,13 @@ impl Stream for Incoming {
|
|||
io::Error::new(io::ErrorKind::Other, "Cannot spawn event loop handle")
|
||||
)?;
|
||||
Ok(Async::Ready(Some((
|
||||
(
|
||||
IpcStream {
|
||||
IpcConnection {
|
||||
inner: ::std::mem::replace(
|
||||
&mut self.inner.pipe,
|
||||
replacement_pipe(&self.inner.path, &handle)?,
|
||||
)
|
||||
},
|
||||
RemoteId,
|
||||
)
|
||||
))))
|
||||
},
|
||||
Err(e) => {
|
||||
|
@ -189,21 +193,55 @@ impl Stream for Incoming {
|
|||
}
|
||||
}
|
||||
|
||||
/// IPC stream of client connection
|
||||
pub struct IpcStream {
|
||||
/// IPC Connection
|
||||
pub struct IpcConnection {
|
||||
#[cfg(windows)]
|
||||
inner: tokio_named_pipes::NamedPipe,
|
||||
#[cfg(not(windows))]
|
||||
inner: tokio_uds::UnixStream,
|
||||
}
|
||||
|
||||
impl Read for IpcStream {
|
||||
#[deprecated(since="0.1.5", note = "Please use `IpcConnection` instead")]
|
||||
pub type IpcStream = IpcConnection;
|
||||
|
||||
|
||||
impl IpcConnection {
|
||||
pub fn connect<P: AsRef<Path>>(path: P, handle: &Handle) -> io::Result<IpcConnection> {
|
||||
Ok(IpcConnection{
|
||||
inner: Self::connect_inner(path.as_ref(), handle)?,
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
fn connect_inner(path: &Path, handle: &Handle) -> io::Result<tokio_uds::UnixStream> {
|
||||
tokio_uds::UnixStream::connect(&path, &handle)
|
||||
}
|
||||
|
||||
#[cfg(windows)]
|
||||
fn connect_inner(path: &Path, handle: &Handle) -> io::Result<NamedPipe> {
|
||||
use std::fs::OpenOptions;
|
||||
use std::os::windows::fs::OpenOptionsExt;
|
||||
use std::os::windows::io::{FromRawHandle, IntoRawHandle};
|
||||
use winapi::um::winbase::FILE_FLAG_OVERLAPPED;
|
||||
miow::pipe::NamedPipe::wait(path, None)?;
|
||||
let mut options = OpenOptions::new();
|
||||
options.read(true)
|
||||
.write(true)
|
||||
.custom_flags(FILE_FLAG_OVERLAPPED);
|
||||
let file = options.open(path)?;
|
||||
let mio_pipe = unsafe { mio_named_pipes::NamedPipe::from_raw_handle(file.into_raw_handle()) };
|
||||
let pipe = NamedPipe::from_pipe(mio_pipe, &handle)?;
|
||||
Ok(pipe)
|
||||
}
|
||||
}
|
||||
|
||||
impl Read for IpcConnection {
|
||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
self.inner.read(buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl Write for IpcStream {
|
||||
impl Write for IpcConnection {
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
self.inner.write(buf)
|
||||
}
|
||||
|
@ -213,7 +251,7 @@ impl Write for IpcStream {
|
|||
}
|
||||
|
||||
#[allow(deprecated)]
|
||||
impl Io for IpcStream {
|
||||
impl Io for IpcConnection {
|
||||
fn poll_read(&mut self) -> Async<()> {
|
||||
self.inner.poll_read()
|
||||
}
|
||||
|
@ -223,7 +261,7 @@ impl Io for IpcStream {
|
|||
}
|
||||
}
|
||||
|
||||
impl AsyncRead for IpcStream {
|
||||
impl AsyncRead for IpcConnection {
|
||||
unsafe fn prepare_uninitialized_buffer(&self, b: &mut [u8]) -> bool {
|
||||
self.inner.prepare_uninitialized_buffer(b)
|
||||
}
|
||||
|
@ -233,7 +271,7 @@ impl AsyncRead for IpcStream {
|
|||
}
|
||||
}
|
||||
|
||||
impl AsyncWrite for IpcStream {
|
||||
impl AsyncWrite for IpcConnection {
|
||||
fn shutdown(&mut self) -> Poll<(), io::Error> {
|
||||
AsyncWrite::shutdown(&mut self.inner)
|
||||
}
|
||||
|
@ -244,16 +282,23 @@ impl AsyncWrite for IpcStream {
|
|||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[cfg(windows)]
|
||||
mod tests {
|
||||
extern crate rand;
|
||||
|
||||
use std::thread;
|
||||
use tokio_core::reactor::Core;
|
||||
use tokio_core::io::{self, Io};
|
||||
use futures::{Stream, Future};
|
||||
use futures::sync::oneshot;
|
||||
use std::thread;
|
||||
|
||||
use super::Endpoint;
|
||||
use super::IpcConnection;
|
||||
|
||||
#[cfg(not(windows))]
|
||||
fn random_pipe_path() -> String {
|
||||
let num: u64 = self::rand::Rng::gen(&mut rand::thread_rng());
|
||||
format!(r"/tmp/parity-tokio-ipc-test-pipe-{}", num)
|
||||
}
|
||||
|
||||
#[cfg(windows)]
|
||||
fn random_pipe_path() -> String {
|
||||
|
@ -261,59 +306,54 @@ mod tests {
|
|||
format!(r"\\.\pipe\my-pipe-{}", num)
|
||||
}
|
||||
|
||||
pub fn dummy_request(addr: &str, buf: &[u8]) -> Vec<u8> {
|
||||
use miow;
|
||||
use std::io::{Read, Write};
|
||||
use std::fs::OpenOptions;
|
||||
|
||||
miow::pipe::NamedPipe::wait(addr, None).unwrap();
|
||||
let mut f = OpenOptions::new().read(true).write(true).open(addr).unwrap();
|
||||
trace!("Connected");
|
||||
f.write_all(buf).unwrap();
|
||||
f.flush().unwrap();
|
||||
trace!("Wrote");
|
||||
|
||||
let mut buf = vec![0u8; 65536];
|
||||
let sz = f.read(&mut buf).unwrap_or_else(|_| { 0 });
|
||||
(&buf[0..sz]).to_vec()
|
||||
fn run_server(path: &str) {
|
||||
let path = path.to_owned();
|
||||
let (ok_signal, ok_rx) = oneshot::channel();
|
||||
thread::spawn(|| {
|
||||
let mut core = Core::new().expect("failed to spawn an event loop");
|
||||
let endpoint = Endpoint::new(path, &core.handle()).expect("failed to open endpoint");
|
||||
ok_signal.send(()).expect("failed to send ok");
|
||||
let srv = endpoint.incoming()
|
||||
.for_each(|(stream, _)| {
|
||||
let (reader, writer) = stream.split();
|
||||
let buf = [0u8; 5];
|
||||
io::read_exact(reader,buf).and_then(move |(_reader, buf)| {
|
||||
let mut reply = vec![];
|
||||
reply.extend(&buf[..]);
|
||||
io::write_all(writer, reply)
|
||||
})
|
||||
.map_err(|e| {trace!("io error: {:?}", e); e })
|
||||
.map(|_| ())
|
||||
})
|
||||
.map_err(|_| ());
|
||||
core.run(srv).expect("server failed");
|
||||
});
|
||||
ok_rx.wait().expect("failed to receive handle")
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg(windows)]
|
||||
fn win_smoky() {
|
||||
let path = random_pipe_path(); let path2 = path.clone();
|
||||
fn smoke_test() {
|
||||
let path = random_pipe_path();
|
||||
run_server(&path);
|
||||
let mut core = Core::new().expect("failed to spawn an event loop");
|
||||
let handle = core.handle();
|
||||
|
||||
thread::spawn(move || {
|
||||
let mut core = Core::new().expect("Server event loop should start ok");
|
||||
let endpoint = Endpoint::new(path, &core.handle()).expect("Should be created");
|
||||
let srv = endpoint.incoming()
|
||||
.for_each(|(stream, _)| {
|
||||
trace!("Created connection");
|
||||
let (reader, writer) = stream.split();
|
||||
let buf = [0u8; 6];
|
||||
io::read_exact(reader, buf).and_then(move |(_reader, _buf)| {
|
||||
let mut reply = Vec::new();
|
||||
reply.extend(&_buf[..]);
|
||||
reply.extend(b" - Ok");
|
||||
io::write_all(writer, reply)
|
||||
})
|
||||
.map_err(|e| { trace!("io error: {:?}", e); e })
|
||||
.map(|_| ())
|
||||
})
|
||||
.map(|_| ())
|
||||
.map_err(|e|{ trace!("io error: {:?}", e); () })
|
||||
.boxed();
|
||||
let client = IpcConnection::connect(&path, &handle).expect("failed to open a client");
|
||||
let other_client = IpcConnection::connect(&path, &handle).expect("failed to open a client");
|
||||
let msg = b"hello";
|
||||
|
||||
core.run(srv).expect("Server event loop should finish ok");
|
||||
let mut rx_buf = vec![0u8; msg.len()];
|
||||
let client_fut = io::write_all(client, &msg).and_then(|(client, _)| {
|
||||
io::read_exact(client, &mut rx_buf).map(|(_, buf)| buf)
|
||||
});
|
||||
thread::sleep(::std::time::Duration::from_millis(50));
|
||||
|
||||
let res = dummy_request(&path2, b"Space1");
|
||||
assert_eq!(res, b"Space1 - Ok");
|
||||
let mut rx_buf2 = vec![0u8; msg.len()];
|
||||
let other_client_fut = io::write_all(other_client, &msg).and_then(|(client, _)| {
|
||||
io::read_exact(client, &mut rx_buf2).map(|(_, buf)| buf)
|
||||
});
|
||||
|
||||
let res = dummy_request(&path2, b"Space2");
|
||||
assert_eq!(res, b"Space2 - Ok");
|
||||
let (rx_msg, other_rx_msg) = core.run(client_fut.join(other_client_fut)).expect("failed to read from server");
|
||||
assert_eq!(rx_msg, msg);
|
||||
assert_eq!(other_rx_msg, msg);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue