Migrate from `tokio_core` to `tokio`.

This commit is contained in:
Nick 2018-08-09 09:29:10 -07:00 committed by c0gent
parent b4a6cdf7d9
commit 1147824a0f
No known key found for this signature in database
GPG Key ID: 9CC25E71A743E892
2 changed files with 67 additions and 92 deletions

View File

@ -4,11 +4,10 @@ version = "0.1.5"
authors = ["NikVolf <nikvolf@gmail.com>"]
[dependencies]
tokio-uds = "0.1"
tokio-named-pipes = { git = "https://github.com/nikvolf/tokio-named-pipes", branch = "stable" }
tokio-uds = "0.2"
tokio-named-pipes = { version = "0.2", git = "https://github.com/poanetwork/tokio-named-pipes" }
futures = "0.1"
tokio-core = "0.1"
tokio-io = "0.1"
tokio = "0.1"
rand = "0.3"
mio-named-pipes = { git = "https://github.com/alexcrichton/mio-named-pipes" }
miow = "0.3.2"

View File

@ -1,12 +1,11 @@
//! Tokio IPC transport. Under the hood uses Unix Domain Sockets for Linux/Mac
//! and Named Pipes for Windows.
//! Tokio IPC transport. Under the hood uses Unix Domain Sockets for Linux/Mac
//! and Named Pipes for Windows.
extern crate futures;
extern crate tokio_uds;
extern crate tokio_named_pipes;
extern crate tokio_core;
extern crate tokio;
extern crate tokio_io;
extern crate bytes;
#[allow(unused_imports)] #[macro_use] extern crate log;
@ -20,11 +19,9 @@ extern crate winapi;
use std::io::{self, Read, Write};
use std::path::Path;
use futures::{Async, Poll};
use futures::stream::Stream;
#[allow(deprecated)] use tokio_core::io::Io;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_core::reactor::Handle;
#[allow(unused_imports)]
use futures::{stream::Stream, Async, Poll, Future};
use tokio::io::{AsyncRead, AsyncWrite};
use bytes::{BufMut, Buf};
#[cfg(windows)]
@ -57,24 +54,24 @@ pub fn dummy_endpoint() -> String {
///
/// # Examples
///
/// ```
/// extern crate tokio_core;
/// ```rust
/// extern crate tokio;
/// extern crate futures;
/// extern crate parity_tokio_ipc;
///
/// use parity_tokio_ipc::{Endpoint, dummy_endpoint};
/// use tokio_core::reactor::Core;
/// use futures::{future, Stream};
/// use futures::{future, Future, Stream};
///
/// fn main() {
/// let core = Core::new().unwrap();
/// let endpoint = Endpoint::new(dummy_endpoint());
/// endpoint.incoming(core.handle())
/// .expect("failed to open a pipe")
/// .for_each(|(stream, _)| {
/// let server = endpoint.incoming()
/// .expect("failed to open up a new pipe/socket")
/// .for_each(|(_stream, _remote_id)| {
/// println!("Connection received");
/// future::ok(())
/// });
/// })
/// .map_err(|err| panic!("Endpoint connection error: {:?}", err));
/// // ... run server etc.
/// }
/// ```
pub struct Endpoint {
@ -85,24 +82,24 @@ pub struct Endpoint {
impl Endpoint {
/// Stream of incoming connections
#[cfg(not(windows))]
pub fn incoming(self, handle: Handle) -> io::Result<Incoming> {
pub fn incoming(self) -> io::Result<Incoming> {
Ok(
Incoming { inner: self.inner(&handle)?.incoming() }
Incoming { inner: self.inner()?.incoming() }
)
}
/// Stream of incoming connections
/// Stream of incoming connections
#[cfg(windows)]
pub fn incoming(mut self, handle: Handle) -> io::Result<Incoming> {
let pipe = self.inner(&handle)?;
pub fn incoming(mut self) -> io::Result<Incoming> {
let pipe = self.inner()?;
Ok(
Incoming { inner: NamedPipeSupport { path: self.path, handle: handle.remote().clone(), pipe: pipe, security_attributes: self.security_attributes} }
Incoming { inner: NamedPipeSupport { path: self.path, pipe: pipe, security_attributes: self.security_attributes} }
)
}
/// Inner platform-dependant state of the endpoint
#[cfg(windows)]
fn inner(&mut self, handle: &Handle) -> io::Result<NamedPipe> {
fn inner(&mut self) -> io::Result<NamedPipe> {
extern crate mio_named_pipes;
use std::os::windows::io::*;
use miow::pipe::NamedPipeBuilder;
@ -117,13 +114,13 @@ impl Endpoint {
.into_raw_handle()};
let mio_pipe = unsafe { mio_named_pipes::NamedPipe::from_raw_handle(raw_handle) };
NamedPipe::from_pipe(mio_pipe, handle)
NamedPipe::from_pipe(mio_pipe)
}
/// Inner platform-dependant state of the endpoint
#[cfg(not(windows))]
fn inner(&self, handle: &Handle) -> io::Result<tokio_uds::UnixListener> {
tokio_uds::UnixListener::bind(&self.path, handle)
fn inner(&self) -> io::Result<tokio_uds::UnixListener> {
tokio_uds::UnixListener::bind(&self.path)
}
pub fn set_security_attributes(&mut self, security_attributes: SecurityAttributes) {
@ -151,7 +148,6 @@ pub struct RemoteId;
#[cfg(windows)]
struct NamedPipeSupport {
path: String,
handle: tokio_core::reactor::Remote,
pipe: NamedPipe,
security_attributes: SecurityAttributes,
}
@ -164,11 +160,6 @@ impl NamedPipeSupport {
use std::os::windows::io::*;
use miow::pipe::NamedPipeBuilder;
let ev_handle = &self.handle.handle().ok_or(
io::Error::new(io::ErrorKind::Other, "Cannot spawn event loop handle")
)?;
let raw_handle = unsafe { NamedPipeBuilder::new(&self.path)
.first(false)
.inbound(true)
@ -179,15 +170,14 @@ impl NamedPipeSupport {
.into_raw_handle()};
let mio_pipe = unsafe { mio_named_pipes::NamedPipe::from_raw_handle(raw_handle) };
NamedPipe::from_pipe(mio_pipe, ev_handle)
NamedPipe::from_pipe(mio_pipe)
}
}
/// Stream of incoming connections
pub struct Incoming {
#[cfg(not(windows))]
#[allow(deprecated)]
inner: ::tokio_core::io::IoStream<(tokio_uds::UnixStream, std::os::unix::net::SocketAddr)>,
inner: tokio_uds::Incoming,
#[cfg(windows)]
inner: NamedPipeSupport,
}
@ -199,7 +189,7 @@ impl Stream for Incoming {
#[cfg(not(windows))]
fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
self.inner.poll().map(|poll| match poll {
Async::Ready(Some(val)) => Async::Ready(Some((IpcConnection { inner: val.0 }, RemoteId))),
Async::Ready(Some(val)) => Async::Ready(Some((IpcConnection { inner: val }, RemoteId))),
Async::Ready(None) => Async::Ready(None),
Async::NotReady => Async::NotReady,
})
@ -212,58 +202,55 @@ impl Stream for Incoming {
trace!("Incoming connection polled successfully");
let new_listener = self.inner.replacement_pipe()?;
Ok(Async::Ready(Some((
IpcConnection {
IpcConnection {
inner: ::std::mem::replace(
&mut self.inner.pipe,
&mut self.inner.pipe,
new_listener,
)
},
)
},
RemoteId,
))))
},
Err(e) => {
if e.kind() == io::ErrorKind::WouldBlock {
trace!("Incoming connection was to block, waiting for connection to become writeable");
self.inner.pipe.poll_write();
self.inner.pipe.poll_write_ready()?;
Ok(Async::NotReady)
} else {
Err(e)
}
},
}
}
}
}
/// IPC Connection
pub struct IpcConnection {
#[cfg(windows)]
inner: tokio_named_pipes::NamedPipe,
#[cfg(not(windows))]
inner: tokio_uds::UnixStream,
#[cfg(windows)]
inner: tokio_named_pipes::NamedPipe,
}
#[deprecated(since="0.1.5", note = "Please use `IpcConnection` instead")]
pub type IpcStream = IpcConnection;
impl IpcConnection {
pub fn connect<P: AsRef<Path>>(path: P, handle: &Handle) -> io::Result<IpcConnection> {
pub fn connect<P: AsRef<Path>>(path: P) -> io::Result<IpcConnection> {
Ok(IpcConnection{
inner: Self::connect_inner(path.as_ref(), handle)?,
inner: Self::connect_inner(path.as_ref())?,
})
}
#[cfg(unix)]
fn connect_inner(path: &Path, handle: &Handle) -> io::Result<tokio_uds::UnixStream> {
tokio_uds::UnixStream::connect(&path, &handle)
fn connect_inner(path: &Path) -> io::Result<tokio_uds::UnixStream> {
tokio_uds::UnixStream::connect(&path).wait()
}
#[cfg(windows)]
fn connect_inner(path: &Path, handle: &Handle) -> io::Result<NamedPipe> {
fn connect_inner(path: &Path) -> io::Result<NamedPipe> {
use std::fs::OpenOptions;
use std::os::windows::fs::OpenOptionsExt;
use std::os::windows::io::{FromRawHandle, IntoRawHandle};
use winapi::um::winbase::FILE_FLAG_OVERLAPPED;
miow::pipe::NamedPipe::wait(path, None)?;
let mut options = OpenOptions::new();
options.read(true)
@ -271,7 +258,7 @@ impl IpcConnection {
.custom_flags(FILE_FLAG_OVERLAPPED);
let file = options.open(path)?;
let mio_pipe = unsafe { mio_named_pipes::NamedPipe::from_raw_handle(file.into_raw_handle()) };
let pipe = NamedPipe::from_pipe(mio_pipe, &handle)?;
let pipe = NamedPipe::from_pipe(mio_pipe)?;
Ok(pipe)
}
}
@ -279,7 +266,7 @@ impl IpcConnection {
impl Read for IpcConnection {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.inner.read(buf)
}
}
}
impl Write for IpcConnection {
@ -291,17 +278,6 @@ impl Write for IpcConnection {
}
}
#[allow(deprecated)]
impl Io for IpcConnection {
fn poll_read(&mut self) -> Async<()> {
self.inner.poll_read()
}
fn poll_write(&mut self) -> Async<()> {
self.inner.poll_write()
}
}
impl AsyncRead for IpcConnection {
unsafe fn prepare_uninitialized_buffer(&self, b: &mut [u8]) -> bool {
self.inner.prepare_uninitialized_buffer(b)
@ -326,8 +302,7 @@ impl AsyncWrite for IpcConnection {
mod tests {
extern crate rand;
use tokio_core::reactor::Core;
use tokio_core::io::{self, Io};
use tokio::{self, io::{self, AsyncRead}};
use futures::{Stream, Future};
use futures::sync::oneshot;
use std::thread;
@ -353,9 +328,8 @@ mod tests {
let path = path.to_owned();
let (ok_signal, ok_rx) = oneshot::channel();
thread::spawn(|| {
let mut core = Core::new().expect("failed to spawn an event loop");
let endpoint = Endpoint::new(path);
let connections = endpoint.incoming(core.handle()).expect("failed to open up a new pipe/socket");
let connections = endpoint.incoming().expect("failed to open up a new pipe/socket");
ok_signal.send(()).expect("failed to send ok");
let srv = connections.for_each(|(stream, _)| {
let (reader, writer) = stream.split();
@ -369,45 +343,47 @@ mod tests {
.map(|_| ())
})
.map_err(|_| ());
core.run(srv).expect("server failed");
tokio::run(srv);
});
ok_rx.wait().expect("failed to receive handle")
}
// NOTE: Intermittently fails or stalls on windows.
#[test]
fn smoke_test() {
let path = random_pipe_path();
run_server(&path);
let mut core = Core::new().expect("failed to spawn an event loop");
let handle = core.handle();
let client = IpcConnection::connect(&path, &handle).expect("failed to open a client");
let other_client = IpcConnection::connect(&path, &handle).expect("failed to open a client again");
let client = IpcConnection::connect(&path).expect("failed to open a client");
let other_client = IpcConnection::connect(&path).expect("failed to open a client again");
let msg = b"hello";
let mut rx_buf = vec![0u8; msg.len()];
let client_fut = io::write_all(client, &msg).and_then(|(client, _)| {
io::read_exact(client, &mut rx_buf).map(|(_, buf)| buf)
let rx_buf = vec![0u8; msg.len()];
let client_fut = io::write_all(client, msg).and_then(move |(client, _)| {
io::read_exact(client, rx_buf).map(|(_, buf)| buf)
});
let mut rx_buf2 = vec![0u8; msg.len()];
let other_client_fut = io::write_all(other_client, &msg).and_then(|(client, _)| {
io::read_exact(client, &mut rx_buf2).map(|(_, buf)| buf)
let rx_buf2 = vec![0u8; msg.len()];
let other_client_fut = io::write_all(other_client, msg).and_then(move |(client, _)| {
io::read_exact(client, rx_buf2).map(|(_, buf)| buf)
});
let (rx_msg, other_rx_msg) = core.run(client_fut.join(other_client_fut)).expect("failed to read from server");
assert_eq!(rx_msg, msg);
assert_eq!(other_rx_msg, msg);
let test_fut = client_fut.join(other_client_fut).and_then(move |(rx_msg, other_rx_msg)| {
assert_eq!(rx_msg, msg);
assert_eq!(other_rx_msg, msg);
Ok(())
}).map_err(|err| panic!("Smoke test error: {:?}", err));
tokio::run(test_fut);
}
#[cfg(windows)]
fn create_pipe_with_permissions(attr: SecurityAttributes) -> ::std::io::Result<()> {
let mut core = Core::new().expect("Failed to spawn an event loop");
let path = random_pipe_path();
let mut endpoint = Endpoint::new(path);
endpoint.set_security_attributes(attr);
endpoint.incoming(core.handle()).map(|_| ())
endpoint.incoming().map(|_| ())
}
#[cfg(windows)]