Add support for specifying security attributes
This commit is contained in:
parent
7c9bbe3bc4
commit
1b82b8da1c
158
src/lib.rs
158
src/lib.rs
|
@ -30,6 +30,12 @@ use bytes::{BufMut, Buf};
|
|||
#[cfg(windows)]
|
||||
use tokio_named_pipes::NamedPipe;
|
||||
|
||||
#[cfg(windows)]
|
||||
use winapi::um::minwinbase::SECURITY_ATTRIBUTES;
|
||||
#[cfg(windows)]
|
||||
use std::ptr;
|
||||
|
||||
|
||||
/// For testing/examples
|
||||
pub fn dummy_endpoint() -> String {
|
||||
extern crate rand;
|
||||
|
@ -57,58 +63,95 @@ pub fn dummy_endpoint() -> String {
|
|||
///
|
||||
/// fn main() {
|
||||
/// let core = Core::new().unwrap();
|
||||
/// let endpoint = Endpoint::new(dummy_endpoint(), &core.handle()).unwrap();
|
||||
/// endpoint.incoming().for_each(|(stream, _)| {
|
||||
/// println!("Connection received");
|
||||
/// future::ok(())
|
||||
/// let endpoint = Endpoint::new(dummy_endpoint());
|
||||
/// endpoint.incoming(core.handle())
|
||||
/// .expect("failed to open a pipe")
|
||||
/// .for_each(|(stream, _)| {
|
||||
/// println!("Connection received");
|
||||
/// future::ok(())
|
||||
/// });
|
||||
/// }
|
||||
/// ```
|
||||
pub struct Endpoint {
|
||||
_path: String,
|
||||
_handle: Handle,
|
||||
#[cfg(not(windows))]
|
||||
inner: tokio_uds::UnixListener,
|
||||
path: String,
|
||||
#[cfg(windows)]
|
||||
inner: NamedPipe,
|
||||
security_attributes: SecurityAttrWrap,
|
||||
}
|
||||
|
||||
impl Endpoint {
|
||||
/// Stream of incoming connections
|
||||
#[cfg(not(windows))]
|
||||
pub fn incoming(self) -> Incoming {
|
||||
Incoming { inner: self.inner.incoming() }
|
||||
pub fn incoming(self, handle: Handle) -> io::Result<Incoming> {
|
||||
Ok(
|
||||
Incoming { inner: self.inner(&handle)?.incoming() }
|
||||
)
|
||||
}
|
||||
|
||||
/// Stream of incoming connections
|
||||
#[cfg(windows)]
|
||||
pub fn incoming(self) -> Incoming {
|
||||
Incoming { inner: NamedPipeSupport { path: self._path, handle: self._handle.remote().clone(), pipe: self.inner } }
|
||||
pub fn incoming(self, handle: Handle) -> io::Result<Incoming> {
|
||||
let pipe = self.inner(&handle)?;
|
||||
Ok(
|
||||
Incoming { inner: NamedPipeSupport { path: self.path, handle: handle.remote().clone(), pipe: pipe, security_attributes: self.security_attributes.0} }
|
||||
)
|
||||
}
|
||||
|
||||
/// Inner platform-dependant state of the endpoint
|
||||
#[cfg(windows)]
|
||||
fn inner(p: &str, handle: &Handle) -> io::Result<NamedPipe> {
|
||||
NamedPipe::new(p, handle)
|
||||
fn inner(&self, handle: &Handle) -> io::Result<NamedPipe> {
|
||||
extern crate mio_named_pipes;
|
||||
use std::os::windows::io::*;
|
||||
use miow::pipe::NamedPipeBuilder;
|
||||
|
||||
let raw_handle = unsafe { NamedPipeBuilder::new(&self.path)
|
||||
.first(true)
|
||||
.inbound(true)
|
||||
.outbound(true)
|
||||
.out_buffer_size(65536)
|
||||
.in_buffer_size(65536)
|
||||
.with_security_attributes(self.security_attributes.0)?
|
||||
.into_raw_handle()};
|
||||
|
||||
let mio_pipe = unsafe { mio_named_pipes::NamedPipe::from_raw_handle(raw_handle) };
|
||||
NamedPipe::from_pipe(mio_pipe, handle)
|
||||
}
|
||||
|
||||
/// Inner platform-dependant state of the endpoint
|
||||
#[cfg(not(windows))]
|
||||
fn inner(p: &str, handle: &Handle) -> io::Result<tokio_uds::UnixListener> {
|
||||
tokio_uds::UnixListener::bind(p, handle)
|
||||
fn inner(&self, handle: &Handle) -> io::Result<tokio_uds::UnixListener> {
|
||||
tokio_uds::UnixListener::bind(&self.path, handle)
|
||||
}
|
||||
|
||||
/// Sets the security attributes of the underlying named pipes. Note that if the pointer to the
|
||||
/// security attributes is not null, it should be valid for the lifetime of this struct and the
|
||||
/// lifetime of the returned `Incoming` struct.
|
||||
#[cfg(windows)]
|
||||
pub unsafe fn set_security_attributes(&mut self, security_attributes: *mut SECURITY_ATTRIBUTES) {
|
||||
self.security_attributes = SecurityAttrWrap(security_attributes);
|
||||
}
|
||||
|
||||
/// Returns the path of the endpoint.
|
||||
pub fn path(&self) -> &str {
|
||||
&self.path
|
||||
}
|
||||
|
||||
|
||||
/// New IPC endpoint at the given path
|
||||
/// Endpoint ready to accept connections immediately
|
||||
pub fn new(path: String, handle: &Handle) -> io::Result<Self> {
|
||||
Ok(Endpoint {
|
||||
inner: Self::inner(&path, handle)?,
|
||||
_path: path,
|
||||
_handle: handle.clone(),
|
||||
})
|
||||
pub fn new(path: String) -> Self {
|
||||
Endpoint {
|
||||
path: path,
|
||||
#[cfg(windows)]
|
||||
security_attributes: SecurityAttrWrap(ptr::null_mut()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// This is required as otherwise the pointer to the SECURITY_ATTRIBUTES does not implement Send
|
||||
#[cfg(windows)]
|
||||
struct SecurityAttrWrap ( *mut SECURITY_ATTRIBUTES );
|
||||
#[cfg(windows)]
|
||||
unsafe impl Send for SecurityAttrWrap{}
|
||||
|
||||
/// Remote connection data, if any available
|
||||
pub struct RemoteId;
|
||||
|
||||
|
@ -116,7 +159,35 @@ pub struct RemoteId;
|
|||
struct NamedPipeSupport {
|
||||
path: String,
|
||||
handle: tokio_core::reactor::Remote,
|
||||
pipe: NamedPipe,
|
||||
pipe: NamedPipe,
|
||||
security_attributes: *mut SECURITY_ATTRIBUTES,
|
||||
}
|
||||
|
||||
#[cfg(windows)]
|
||||
impl NamedPipeSupport {
|
||||
fn replacement_pipe(&mut self) -> io::Result<NamedPipe> {
|
||||
extern crate mio_named_pipes;
|
||||
|
||||
use std::os::windows::io::*;
|
||||
use miow::pipe::NamedPipeBuilder;
|
||||
|
||||
let ev_handle = &self.handle.handle().ok_or(
|
||||
io::Error::new(io::ErrorKind::Other, "Cannot spawn event loop handle")
|
||||
)?;
|
||||
|
||||
|
||||
let raw_handle = unsafe { NamedPipeBuilder::new(&self.path)
|
||||
.first(false)
|
||||
.inbound(true)
|
||||
.outbound(true)
|
||||
.out_buffer_size(65536)
|
||||
.in_buffer_size(65536)
|
||||
.with_security_attributes(self.security_attributes)?
|
||||
.into_raw_handle()};
|
||||
|
||||
let mio_pipe = unsafe { mio_named_pipes::NamedPipe::from_raw_handle(raw_handle) };
|
||||
NamedPipe::from_pipe(mio_pipe, ev_handle)
|
||||
}
|
||||
}
|
||||
|
||||
/// Stream of incoming connections
|
||||
|
@ -128,27 +199,6 @@ pub struct Incoming {
|
|||
inner: NamedPipeSupport,
|
||||
}
|
||||
|
||||
#[cfg(windows)]
|
||||
fn replacement_pipe(path: &str, handle: &Handle) -> io::Result<NamedPipe> {
|
||||
extern crate mio_named_pipes;
|
||||
|
||||
use std::os::windows::io::*;
|
||||
use miow::pipe::NamedPipeBuilder;
|
||||
|
||||
let raw_handle = NamedPipeBuilder::new(path)
|
||||
.first(false)
|
||||
.inbound(true)
|
||||
.outbound(true)
|
||||
.out_buffer_size(65536)
|
||||
.in_buffer_size(65536)
|
||||
.create()?
|
||||
.into_raw_handle();
|
||||
|
||||
let mio_pipe = unsafe { mio_named_pipes::NamedPipe::from_raw_handle(raw_handle) };
|
||||
|
||||
NamedPipe::from_pipe(mio_pipe, handle)
|
||||
}
|
||||
|
||||
impl Stream for Incoming {
|
||||
type Item = (IpcConnection, RemoteId);
|
||||
type Error = io::Error;
|
||||
|
@ -160,21 +210,19 @@ impl Stream for Incoming {
|
|||
Async::Ready(None) => Async::Ready(None),
|
||||
Async::NotReady => Async::NotReady,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(windows)]
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
|
||||
match self.inner.pipe.connect() {
|
||||
Ok(()) => {
|
||||
trace!("Incoming connection polled successfully");
|
||||
let handle = &self.inner.handle.handle().ok_or(
|
||||
io::Error::new(io::ErrorKind::Other, "Cannot spawn event loop handle")
|
||||
)?;
|
||||
let new_listener = self.inner.replacement_pipe()?;
|
||||
Ok(Async::Ready(Some((
|
||||
IpcConnection {
|
||||
inner: ::std::mem::replace(
|
||||
&mut self.inner.pipe,
|
||||
replacement_pipe(&self.inner.path, &handle)?,
|
||||
new_listener,
|
||||
)
|
||||
},
|
||||
RemoteId,
|
||||
|
@ -311,10 +359,10 @@ mod tests {
|
|||
let (ok_signal, ok_rx) = oneshot::channel();
|
||||
thread::spawn(|| {
|
||||
let mut core = Core::new().expect("failed to spawn an event loop");
|
||||
let endpoint = Endpoint::new(path, &core.handle()).expect("failed to open endpoint");
|
||||
let endpoint = Endpoint::new(path);
|
||||
let connections = endpoint.incoming(core.handle()).expect("failed to open up a new pipe/socket");
|
||||
ok_signal.send(()).expect("failed to send ok");
|
||||
let srv = endpoint.incoming()
|
||||
.for_each(|(stream, _)| {
|
||||
let srv = connections.for_each(|(stream, _)| {
|
||||
let (reader, writer) = stream.split();
|
||||
let buf = [0u8; 5];
|
||||
io::read_exact(reader,buf).and_then(move |(_reader, buf)| {
|
||||
|
@ -339,7 +387,7 @@ mod tests {
|
|||
let handle = core.handle();
|
||||
|
||||
let client = IpcConnection::connect(&path, &handle).expect("failed to open a client");
|
||||
let other_client = IpcConnection::connect(&path, &handle).expect("failed to open a client");
|
||||
let other_client = IpcConnection::connect(&path, &handle).expect("failed to open a client again");
|
||||
let msg = b"hello";
|
||||
|
||||
let mut rx_buf = vec![0u8; msg.len()];
|
||||
|
|
Loading…
Reference in New Issue