This commit is contained in:
NikVolf 2017-03-14 18:54:10 +03:00
commit 74604d5199
3 changed files with 163 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
target
Cargo.lock

10
Cargo.toml Normal file
View File

@ -0,0 +1,10 @@
[package]
name = "parity-tokio-ipc"
version = "0.1.0"
authors = ["NikVolf <nikvolf@gmail.com>"]
[dependencies]
tokio-uds = "0.1"
tokio-named-pipes = { git = "https://github.com/alexcrichton/tokio-named-pipes" }
futures = "0.1"
tokio-core = "0.1"

151
src/lib.rs Normal file
View File

@ -0,0 +1,151 @@
extern crate futures;
extern crate tokio_uds;
extern crate tokio_named_pipes;
#[macro_use] extern crate tokio_core;
use std::io::{self, Read, Write};
use futures::{Async, Poll};
use futures::stream::Stream;
use tokio_core::io::Io;
use tokio_core::reactor::Handle;
#[cfg(windows)]
use tokio_named_pipes::NamedPipe;
pub struct Endpoint {
path: String,
_handle: Handle,
#[cfg(not(windows))]
inner: tokio_uds::UnixListener,
#[cfg(windows)]
inner: NamedPipe,
}
impl Endpoint {
#[cfg(not(windows))]
pub fn incoming(self) -> Incoming {
Incoming { inner: self.inner.incoming() }
}
#[cfg(windows)]
pub fn incoming(self) -> Incoming {
Incoming { inner: NamedPipeSupport { path: self.path, handle: self._handle, pipe: self.inner } }
}
#[cfg(windows)]
fn inner(p: &str, handle: &Handle) -> io::Result<NamedPipe> {
NamedPipe::new(p, handle)
}
#[cfg(not(windows))]
fn inner(p: &str, handle: &Handle) -> io::Result<tokio_uds::UnixListener> {
tokio_uds::UnixListener::bind(p, handle)
}
pub fn new(path: String, handle: &Handle) -> io::Result<Self> {
Ok(Endpoint {
inner: Self::inner(&path, handle)?,
path: path,
_handle: handle.clone(),
})
}
pub fn path(&self) -> &str { &self.path }
}
pub struct RemoteId;
#[cfg(windows)]
struct NamedPipeSupport {
path: String,
handle: Handle,
pipe: NamedPipe,
}
pub struct Incoming {
#[cfg(not(windows))]
inner: ::tokio_core::io::IoStream<(tokio_uds::UnixStream, std::os::unix::net::SocketAddr)>,
#[cfg(windows)]
inner: NamedPipeSupport,
}
impl Stream for Incoming {
type Item = (IpcStream, RemoteId);
type Error = io::Error;
#[cfg(not(windows))]
fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
self.inner.poll().map(|poll| match poll {
Async::Ready(Some(val)) => Async::Ready(Some((IpcStream { inner: val.0 }, RemoteId))),
Async::Ready(None) => Async::Ready(None),
Async::NotReady => Async::NotReady,
})
}
#[cfg(windows)]
fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
match self.inner.pipe.connect() {
Ok(()) => Ok(Async::Ready(Some((
(
IpcStream {
inner: ::std::mem::replace(
&mut self.inner.pipe,
NamedPipe::new(&self.inner.path, &self.inner.handle)?,
)
},
RemoteId,
)
)))),
Err(e) => {
if e.kind() == io::ErrorKind::WouldBlock {
::futures::task::park();
Ok(Async::NotReady)
} else {
Err(e)
}
},
}
}
}
pub struct IpcStream {
#[cfg(windows)]
inner: tokio_named_pipes::NamedPipe,
#[cfg(not(windows))]
inner: tokio_uds::UnixStream,
}
impl Read for IpcStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.inner.read(buf)
}
}
impl Write for IpcStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.inner.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
self.inner.flush()
}
}
impl Io for IpcStream {
fn poll_read(&mut self) -> Async<()> {
self.inner.poll_read()
}
fn poll_write(&mut self) -> Async<()> {
self.inner.poll_write()
}
}
#[cfg(test)]
mod tests {
#[test]
fn create() {
}
}