Add `handle` argument to named pipe creation-related methods.
This commit is contained in:
parent
80d9794c09
commit
d42979982e
|
@ -5,7 +5,7 @@ authors = ["NikVolf <nikvolf@gmail.com>"]
|
|||
|
||||
[dependencies]
|
||||
tokio-uds = "0.2"
|
||||
tokio-named-pipes = { version = "0.2", git = "https://github.com/poanetwork/tokio-named-pipes" }
|
||||
tokio-named-pipes = { version = "0.2", git = "https://github.com/nikvolf/tokio-named-pipes" }
|
||||
futures = "0.1"
|
||||
tokio = "0.1"
|
||||
rand = "0.3"
|
||||
|
|
110
src/lib.rs
110
src/lib.rs
|
@ -19,9 +19,9 @@ extern crate winapi;
|
|||
use std::io::{self, Read, Write};
|
||||
use std::path::Path;
|
||||
|
||||
#[allow(unused_imports)]
|
||||
use futures::{stream::Stream, Async, Poll, Future};
|
||||
use futures::{stream::Stream, Async, Poll};
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio::reactor::Handle;
|
||||
use bytes::{BufMut, Buf};
|
||||
|
||||
#[cfg(windows)]
|
||||
|
@ -38,6 +38,9 @@ mod unix_permissions;
|
|||
#[cfg(unix)]
|
||||
pub use unix_permissions::SecurityAttributes;
|
||||
|
||||
#[cfg(windows)]
|
||||
const PIPE_AVAILABILITY_TIMEOUT: u64 = 5000;
|
||||
|
||||
/// For testing/examples
|
||||
pub fn dummy_endpoint() -> String {
|
||||
extern crate rand;
|
||||
|
@ -54,7 +57,7 @@ pub fn dummy_endpoint() -> String {
|
|||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```rust
|
||||
/// ```
|
||||
/// extern crate tokio;
|
||||
/// extern crate futures;
|
||||
/// extern crate parity_tokio_ipc;
|
||||
|
@ -63,8 +66,11 @@ pub fn dummy_endpoint() -> String {
|
|||
/// use futures::{future, Future, Stream};
|
||||
///
|
||||
/// fn main() {
|
||||
/// let runtime = tokio::runtime::Runtime::new()
|
||||
/// .expect("Error creating tokio runtime");
|
||||
/// let handle = runtime.reactor();
|
||||
/// let endpoint = Endpoint::new(dummy_endpoint());
|
||||
/// let server = endpoint.incoming()
|
||||
/// let server = endpoint.incoming(handle)
|
||||
/// .expect("failed to open up a new pipe/socket")
|
||||
/// .for_each(|(_stream, _remote_id)| {
|
||||
/// println!("Connection received");
|
||||
|
@ -82,24 +88,25 @@ pub struct Endpoint {
|
|||
impl Endpoint {
|
||||
/// Stream of incoming connections
|
||||
#[cfg(not(windows))]
|
||||
pub fn incoming(self) -> io::Result<Incoming> {
|
||||
pub fn incoming(self, handle: &Handle) -> io::Result<Incoming> {
|
||||
Ok(
|
||||
Incoming { inner: self.inner()?.incoming() }
|
||||
Incoming { inner: self.inner(handle)?.incoming() }
|
||||
)
|
||||
}
|
||||
|
||||
/// Stream of incoming connections
|
||||
#[cfg(windows)]
|
||||
pub fn incoming(mut self) -> io::Result<Incoming> {
|
||||
let pipe = self.inner()?;
|
||||
pub fn incoming(mut self, handle: &Handle) -> io::Result<Incoming> {
|
||||
let pipe = self.inner(handle)?;
|
||||
Ok(
|
||||
Incoming { inner: NamedPipeSupport { path: self.path, pipe: pipe, security_attributes: self.security_attributes} }
|
||||
Incoming { inner: NamedPipeSupport { path: self.path, handle: handle.clone(),
|
||||
pipe: pipe, security_attributes: self.security_attributes} }
|
||||
)
|
||||
}
|
||||
|
||||
/// Inner platform-dependant state of the endpoint
|
||||
#[cfg(windows)]
|
||||
fn inner(&mut self) -> io::Result<NamedPipe> {
|
||||
fn inner(&mut self, handle: &Handle) -> io::Result<NamedPipe> {
|
||||
extern crate mio_named_pipes;
|
||||
use std::os::windows::io::*;
|
||||
use miow::pipe::NamedPipeBuilder;
|
||||
|
@ -114,12 +121,12 @@ impl Endpoint {
|
|||
.into_raw_handle()};
|
||||
|
||||
let mio_pipe = unsafe { mio_named_pipes::NamedPipe::from_raw_handle(raw_handle) };
|
||||
NamedPipe::from_pipe(mio_pipe)
|
||||
NamedPipe::from_pipe(mio_pipe, handle)
|
||||
}
|
||||
|
||||
/// Inner platform-dependant state of the endpoint
|
||||
#[cfg(not(windows))]
|
||||
fn inner(&self) -> io::Result<tokio_uds::UnixListener> {
|
||||
fn inner(&self, _handle: &Handle) -> io::Result<tokio_uds::UnixListener> {
|
||||
tokio_uds::UnixListener::bind(&self.path)
|
||||
}
|
||||
|
||||
|
@ -132,7 +139,6 @@ impl Endpoint {
|
|||
&self.path
|
||||
}
|
||||
|
||||
|
||||
/// New IPC endpoint at the given path
|
||||
pub fn new(path: String) -> Self {
|
||||
Endpoint {
|
||||
|
@ -148,6 +154,7 @@ pub struct RemoteId;
|
|||
#[cfg(windows)]
|
||||
struct NamedPipeSupport {
|
||||
path: String,
|
||||
handle: Handle,
|
||||
pipe: NamedPipe,
|
||||
security_attributes: SecurityAttributes,
|
||||
}
|
||||
|
@ -170,7 +177,7 @@ impl NamedPipeSupport {
|
|||
.into_raw_handle()};
|
||||
|
||||
let mio_pipe = unsafe { mio_named_pipes::NamedPipe::from_raw_handle(raw_handle) };
|
||||
NamedPipe::from_pipe(mio_pipe)
|
||||
NamedPipe::from_pipe(mio_pipe, &self.handle)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -233,33 +240,34 @@ pub struct IpcConnection {
|
|||
}
|
||||
|
||||
impl IpcConnection {
|
||||
pub fn connect<P: AsRef<Path>>(path: P) -> io::Result<IpcConnection> {
|
||||
pub fn connect<P: AsRef<Path>>(path: P, handle: &Handle) -> io::Result<IpcConnection> {
|
||||
Ok(IpcConnection{
|
||||
inner: Self::connect_inner(path.as_ref())?,
|
||||
inner: Self::connect_inner(path.as_ref(), handle)?,
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
fn connect_inner(path: &Path) -> io::Result<tokio_uds::UnixStream> {
|
||||
fn connect_inner(path: &Path, _handle: &Handle) -> io::Result<tokio_uds::UnixStream> {
|
||||
use futures::Future;
|
||||
tokio_uds::UnixStream::connect(&path).wait()
|
||||
}
|
||||
|
||||
#[cfg(windows)]
|
||||
fn connect_inner(path: &Path) -> io::Result<NamedPipe> {
|
||||
fn connect_inner(path: &Path, handle: &Handle) -> io::Result<NamedPipe> {
|
||||
use std::fs::OpenOptions;
|
||||
use std::os::windows::fs::OpenOptionsExt;
|
||||
use std::os::windows::io::{FromRawHandle, IntoRawHandle};
|
||||
use winapi::um::winbase::FILE_FLAG_OVERLAPPED;
|
||||
|
||||
// Wait for the pipe to become available and fail after 3 seconds.
|
||||
miow::pipe::NamedPipe::wait(path, Some(std::time::Duration::from_millis(3000)))?;
|
||||
// Wait for the pipe to become available or fail after 5 seconds.
|
||||
miow::pipe::NamedPipe::wait(path, Some(std::time::Duration::from_millis(PIPE_AVAILABILITY_TIMEOUT))).unwrap();
|
||||
let mut options = OpenOptions::new();
|
||||
options.read(true)
|
||||
.write(true)
|
||||
.custom_flags(FILE_FLAG_OVERLAPPED);
|
||||
let file = options.open(path)?;
|
||||
let mio_pipe = unsafe { mio_named_pipes::NamedPipe::from_raw_handle(file.into_raw_handle()) };
|
||||
let pipe = NamedPipe::from_pipe(mio_pipe)?;
|
||||
let mio_pipe = unsafe { mio_named_pipes::NamedPipe::from_raw_handle(file.into_raw_handle()) };
|
||||
let pipe = NamedPipe::from_pipe(mio_pipe, handle)?;
|
||||
Ok(pipe)
|
||||
}
|
||||
}
|
||||
|
@ -303,9 +311,8 @@ impl AsyncWrite for IpcConnection {
|
|||
mod tests {
|
||||
extern crate rand;
|
||||
|
||||
use tokio::{self, io::{self, AsyncRead}};
|
||||
use futures::{Stream, Future};
|
||||
use futures::sync::oneshot;
|
||||
use tokio::{self, io::{self, AsyncRead}, runtime::TaskExecutor, reactor::Handle};
|
||||
use futures::{sync::oneshot, Stream, Future};
|
||||
use std::thread;
|
||||
|
||||
use super::Endpoint;
|
||||
|
@ -325,13 +332,12 @@ mod tests {
|
|||
format!(r"\\.\pipe\my-pipe-{}", num)
|
||||
}
|
||||
|
||||
fn run_server(path: &str) {
|
||||
fn run_server(path: &str, exec: TaskExecutor, handle: Handle) {
|
||||
let path = path.to_owned();
|
||||
let (ok_signal, ok_rx) = oneshot::channel();
|
||||
thread::spawn(|| {
|
||||
thread::spawn(move || {
|
||||
let endpoint = Endpoint::new(path);
|
||||
let connections = endpoint.incoming().expect("failed to open up a new pipe/socket");
|
||||
ok_signal.send(()).expect("failed to send ok");
|
||||
let connections = endpoint.incoming(&handle).expect("failed to open up a new pipe/socket");
|
||||
let srv = connections.for_each(|(stream, _)| {
|
||||
let (reader, writer) = stream.split();
|
||||
let buf = [0u8; 5];
|
||||
|
@ -344,7 +350,9 @@ mod tests {
|
|||
.map(|_| ())
|
||||
})
|
||||
.map_err(|_| ());
|
||||
tokio::run(srv);
|
||||
exec.spawn(srv);
|
||||
ok_signal.send(()).expect("failed to send ok");
|
||||
println!("Server running.");
|
||||
});
|
||||
ok_rx.wait().expect("failed to receive handle")
|
||||
}
|
||||
|
@ -352,39 +360,55 @@ mod tests {
|
|||
// NOTE: Intermittently fails or stalls on windows.
|
||||
#[test]
|
||||
fn smoke_test() {
|
||||
let path = random_pipe_path();
|
||||
run_server(&path);
|
||||
let mut runtime = tokio::runtime::Runtime::new().expect("Error creating tokio runtime");
|
||||
let exec = runtime.executor();
|
||||
let handle = runtime.reactor().clone();
|
||||
|
||||
let client = IpcConnection::connect(&path).expect("failed to open a client");
|
||||
let other_client = IpcConnection::connect(&path).expect("failed to open a client again");
|
||||
let path = random_pipe_path();
|
||||
|
||||
run_server(&path, exec, handle.clone());
|
||||
|
||||
println!("Connecting to client 0...");
|
||||
let client_0 = IpcConnection::connect(&path, &handle).expect("failed to open client_0");
|
||||
println!("Connecting to client 1...");
|
||||
let client_1 = IpcConnection::connect(&path, &handle).expect("failed to open client_1");
|
||||
let msg = b"hello";
|
||||
|
||||
let rx_buf = vec![0u8; msg.len()];
|
||||
let client_fut = io::write_all(client, msg).and_then(move |(client, _)| {
|
||||
io::read_exact(client, rx_buf).map(|(_, buf)| buf)
|
||||
});
|
||||
let client_0_fut = io::write_all(client_0, msg)
|
||||
.map_err(|err| panic!("Client 0 write error: {:?}", err))
|
||||
.and_then(move |(client, _)| {
|
||||
io::read_exact(client, rx_buf).map(|(_, buf)| buf)
|
||||
.map_err(|err| panic!("Client 0 read error: {:?}", err))
|
||||
});
|
||||
|
||||
let rx_buf2 = vec![0u8; msg.len()];
|
||||
let other_client_fut = io::write_all(other_client, msg).and_then(move |(client, _)| {
|
||||
io::read_exact(client, rx_buf2).map(|(_, buf)| buf)
|
||||
});
|
||||
let client_1_fut = io::write_all(client_1, msg)
|
||||
.map_err(|err| panic!("Client 1 write error: {:?}", err))
|
||||
.and_then(move |(client, _)| {
|
||||
io::read_exact(client, rx_buf2).map(|(_, buf)| buf)
|
||||
.map_err(|err| panic!("Client 1 read error: {:?}", err))
|
||||
});
|
||||
|
||||
let test_fut = client_fut.join(other_client_fut).and_then(move |(rx_msg, other_rx_msg)| {
|
||||
let fut = client_0_fut.join(client_1_fut).and_then(move |(rx_msg, other_rx_msg)| {
|
||||
assert_eq!(rx_msg, msg);
|
||||
assert_eq!(other_rx_msg, msg);
|
||||
Ok(())
|
||||
}).map_err(|err| panic!("Smoke test error: {:?}", err));
|
||||
|
||||
tokio::run(test_fut);
|
||||
runtime.block_on(fut).expect("Runtime error")
|
||||
}
|
||||
|
||||
#[cfg(windows)]
|
||||
fn create_pipe_with_permissions(attr: SecurityAttributes) -> ::std::io::Result<()> {
|
||||
let runtime = tokio::runtime::Runtime::new().expect("Error creating tokio runtime");
|
||||
let handle = runtime.reactor();
|
||||
|
||||
let path = random_pipe_path();
|
||||
|
||||
let mut endpoint = Endpoint::new(path);
|
||||
endpoint.set_security_attributes(attr);
|
||||
endpoint.incoming().map(|_| ())
|
||||
endpoint.incoming(handle).map(|_| ())
|
||||
}
|
||||
|
||||
#[cfg(windows)]
|
||||
|
|
Loading…
Reference in New Issue