commit
d7670cd4ff
|
@ -15,6 +15,7 @@ pub mod recorder;
|
||||||
pub mod result;
|
pub mod result;
|
||||||
pub mod signature;
|
pub mod signature;
|
||||||
pub mod streamer;
|
pub mod streamer;
|
||||||
|
pub mod subscribers;
|
||||||
pub mod transaction;
|
pub mod transaction;
|
||||||
extern crate bincode;
|
extern crate bincode;
|
||||||
extern crate byteorder;
|
extern crate byteorder;
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
//! The `packet` module defines data structures and methods to pull data from the network.
|
||||||
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
|
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
|
||||||
use result::{Error, Result};
|
use result::{Error, Result};
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
|
|
214
src/streamer.rs
214
src/streamer.rs
|
@ -1,12 +1,14 @@
|
||||||
|
//! The `streamer` module defines a set of services for effecently pulling data from udp sockets.
|
||||||
use packet::{Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedPackets, NUM_BLOBS};
|
use packet::{Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedPackets, NUM_BLOBS};
|
||||||
use result::Result;
|
use result::Result;
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
use std::net::UdpSocket;
|
use std::net::UdpSocket;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::mpsc;
|
use std::sync::mpsc;
|
||||||
use std::sync::Arc;
|
use std::sync::{Arc, RwLock};
|
||||||
use std::thread::{spawn, JoinHandle};
|
use std::thread::{spawn, JoinHandle};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
use subscribers;
|
||||||
|
|
||||||
pub type PacketReceiver = mpsc::Receiver<SharedPackets>;
|
pub type PacketReceiver = mpsc::Receiver<SharedPackets>;
|
||||||
pub type PacketSender = mpsc::Sender<SharedPackets>;
|
pub type PacketSender = mpsc::Sender<SharedPackets>;
|
||||||
|
@ -75,14 +77,79 @@ pub fn responder(
|
||||||
|
|
||||||
//TODO, we would need to stick block authentication before we create the
|
//TODO, we would need to stick block authentication before we create the
|
||||||
//window.
|
//window.
|
||||||
|
fn recv_blobs(recycler: &BlobRecycler, sock: &UdpSocket, s: &BlobSender) -> Result<()> {
|
||||||
|
let dq = Blob::recv_from(recycler, sock)?;
|
||||||
|
if !dq.is_empty() {
|
||||||
|
s.send(dq)?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn blob_receiver(
|
||||||
|
exit: Arc<AtomicBool>,
|
||||||
|
recycler: BlobRecycler,
|
||||||
|
sock: UdpSocket,
|
||||||
|
s: BlobSender,
|
||||||
|
) -> Result<JoinHandle<()>> {
|
||||||
|
//DOCUMENTED SIDE-EFFECT
|
||||||
|
//1 second timeout on socket read
|
||||||
|
let timer = Duration::new(1, 0);
|
||||||
|
sock.set_read_timeout(Some(timer))?;
|
||||||
|
let t = spawn(move || loop {
|
||||||
|
if exit.load(Ordering::Relaxed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
let _ = recv_blobs(&recycler, &sock, &s);
|
||||||
|
});
|
||||||
|
Ok(t)
|
||||||
|
}
|
||||||
|
|
||||||
fn recv_window(
|
fn recv_window(
|
||||||
window: &mut Vec<Option<SharedBlob>>,
|
window: &mut Vec<Option<SharedBlob>>,
|
||||||
|
subs: &Arc<RwLock<subscribers::Subscribers>>,
|
||||||
recycler: &BlobRecycler,
|
recycler: &BlobRecycler,
|
||||||
consumed: &mut usize,
|
consumed: &mut usize,
|
||||||
socket: &UdpSocket,
|
r: &BlobReceiver,
|
||||||
s: &BlobSender,
|
s: &BlobSender,
|
||||||
|
cast: &BlobSender,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let mut dq = Blob::recv_from(recycler, socket)?;
|
let timer = Duration::new(1, 0);
|
||||||
|
let mut dq = r.recv_timeout(timer)?;
|
||||||
|
while let Ok(mut nq) = r.try_recv() {
|
||||||
|
dq.append(&mut nq)
|
||||||
|
}
|
||||||
|
{
|
||||||
|
//retransmit all leader blocks
|
||||||
|
let mut castq = VecDeque::new();
|
||||||
|
let rsubs = subs.read().unwrap();
|
||||||
|
for b in &dq {
|
||||||
|
let p = b.read().unwrap();
|
||||||
|
//TODO this check isn't safe against adverserial packets
|
||||||
|
//we need to maintain a sequence window
|
||||||
|
if p.meta.addr() == rsubs.leader.addr {
|
||||||
|
//TODO
|
||||||
|
//need to copy the retransmited blob
|
||||||
|
//otherwise we get into races with which thread
|
||||||
|
//should do the recycling
|
||||||
|
//
|
||||||
|
//a better absraction would be to recycle when the blob
|
||||||
|
//is dropped via a weakref to the recycler
|
||||||
|
let nv = recycler.allocate();
|
||||||
|
{
|
||||||
|
let mut mnv = nv.write().unwrap();
|
||||||
|
let sz = p.meta.size;
|
||||||
|
mnv.meta.size = sz;
|
||||||
|
mnv.data[..sz].copy_from_slice(&p.data[..sz]);
|
||||||
|
}
|
||||||
|
castq.push_back(nv);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !castq.is_empty() {
|
||||||
|
cast.send(castq)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//send a contiguous set of blocks
|
||||||
|
let mut contq = VecDeque::new();
|
||||||
while let Some(b) = dq.pop_front() {
|
while let Some(b) = dq.pop_front() {
|
||||||
let b_ = b.clone();
|
let b_ = b.clone();
|
||||||
let mut p = b.write().unwrap();
|
let mut p = b.write().unwrap();
|
||||||
|
@ -97,46 +164,91 @@ fn recv_window(
|
||||||
} else {
|
} else {
|
||||||
debug!("duplicate blob at index {:}", w);
|
debug!("duplicate blob at index {:}", w);
|
||||||
}
|
}
|
||||||
//send a contiguous set of blocks
|
|
||||||
let mut dq = VecDeque::new();
|
|
||||||
loop {
|
loop {
|
||||||
let k = *consumed % NUM_BLOBS;
|
let k = *consumed % NUM_BLOBS;
|
||||||
if window[k].is_none() {
|
if window[k].is_none() {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
dq.push_back(window[k].clone().unwrap());
|
contq.push_back(window[k].clone().unwrap());
|
||||||
window[k] = None;
|
window[k] = None;
|
||||||
*consumed += 1;
|
*consumed += 1;
|
||||||
}
|
}
|
||||||
if !dq.is_empty() {
|
|
||||||
s.send(dq)?;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if !contq.is_empty() {
|
||||||
|
s.send(contq)?;
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn window(
|
pub fn window(
|
||||||
sock: UdpSocket,
|
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
r: BlobRecycler,
|
subs: Arc<RwLock<subscribers::Subscribers>>,
|
||||||
|
recycler: BlobRecycler,
|
||||||
|
r: BlobReceiver,
|
||||||
s: BlobSender,
|
s: BlobSender,
|
||||||
|
cast: BlobSender,
|
||||||
) -> JoinHandle<()> {
|
) -> JoinHandle<()> {
|
||||||
spawn(move || {
|
spawn(move || {
|
||||||
let mut window = vec![None; NUM_BLOBS];
|
let mut window = vec![None; NUM_BLOBS];
|
||||||
let mut consumed = 0;
|
let mut consumed = 0;
|
||||||
let timer = Duration::new(1, 0);
|
|
||||||
sock.set_read_timeout(Some(timer)).unwrap();
|
|
||||||
loop {
|
loop {
|
||||||
if recv_window(&mut window, &r, &mut consumed, &sock, &s).is_err()
|
if exit.load(Ordering::Relaxed) {
|
||||||
|| exit.load(Ordering::Relaxed)
|
|
||||||
{
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
let _ = recv_window(&mut window, &subs, &recycler, &mut consumed, &r, &s, &cast);
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn retransmit(
|
||||||
|
subs: &Arc<RwLock<subscribers::Subscribers>>,
|
||||||
|
recycler: &BlobRecycler,
|
||||||
|
r: &BlobReceiver,
|
||||||
|
sock: &UdpSocket,
|
||||||
|
) -> Result<()> {
|
||||||
|
let timer = Duration::new(1, 0);
|
||||||
|
let mut dq = r.recv_timeout(timer)?;
|
||||||
|
while let Ok(mut nq) = r.try_recv() {
|
||||||
|
dq.append(&mut nq);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
let wsubs = subs.read().unwrap();
|
||||||
|
for b in &dq {
|
||||||
|
let mut mb = b.write().unwrap();
|
||||||
|
wsubs.retransmit(&mut mb, sock)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
while let Some(b) = dq.pop_front() {
|
||||||
|
recycler.recycle(b);
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Service to retransmit messages from the leader to layer 1 nodes.
|
||||||
|
/// See `subscribers` for network layer definitions.
|
||||||
|
/// # Arguments
|
||||||
|
/// * `sock` - Socket to read from. Read timeout is set to 1.
|
||||||
|
/// * `exit` - Boolean to signal system exit.
|
||||||
|
/// * `subs` - Shared Subscriber structure. This structure needs to be updated and popualted by
|
||||||
|
/// the accountant.
|
||||||
|
/// * `recycler` - Blob recycler.
|
||||||
|
/// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
|
||||||
|
pub fn retransmitter(
|
||||||
|
sock: UdpSocket,
|
||||||
|
exit: Arc<AtomicBool>,
|
||||||
|
subs: Arc<RwLock<subscribers::Subscribers>>,
|
||||||
|
recycler: BlobRecycler,
|
||||||
|
r: BlobReceiver,
|
||||||
|
) -> JoinHandle<()> {
|
||||||
|
spawn(move || loop {
|
||||||
|
if exit.load(Ordering::Relaxed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
let _ = retransmit(&subs, &recycler, &r, &sock);
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(all(feature = "unstable", test))]
|
#[cfg(all(feature = "unstable", test))]
|
||||||
mod bench {
|
mod bench {
|
||||||
extern crate test;
|
extern crate test;
|
||||||
|
@ -248,9 +360,11 @@ mod test {
|
||||||
use std::net::UdpSocket;
|
use std::net::UdpSocket;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::mpsc::channel;
|
use std::sync::mpsc::channel;
|
||||||
use std::sync::Arc;
|
use std::sync::{Arc, RwLock};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use streamer::{receiver, responder, window, BlobReceiver, PacketReceiver};
|
use streamer::{blob_receiver, receiver, responder, retransmitter, window, BlobReceiver,
|
||||||
|
PacketReceiver};
|
||||||
|
use subscribers::{Node, Subscribers};
|
||||||
|
|
||||||
fn get_msgs(r: PacketReceiver, num: &mut usize) {
|
fn get_msgs(r: PacketReceiver, num: &mut usize) {
|
||||||
for _t in 0..5 {
|
for _t in 0..5 {
|
||||||
|
@ -325,9 +439,24 @@ mod test {
|
||||||
let addr = read.local_addr().unwrap();
|
let addr = read.local_addr().unwrap();
|
||||||
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
|
let subs = Arc::new(RwLock::new(Subscribers::new(
|
||||||
|
Node::default(),
|
||||||
|
Node::new([0; 8], 0, send.local_addr().unwrap()),
|
||||||
|
)));
|
||||||
let resp_recycler = BlobRecycler::default();
|
let resp_recycler = BlobRecycler::default();
|
||||||
let (s_reader, r_reader) = channel();
|
let (s_reader, r_reader) = channel();
|
||||||
let t_receiver = window(read, exit.clone(), resp_recycler.clone(), s_reader);
|
let t_receiver =
|
||||||
|
blob_receiver(exit.clone(), resp_recycler.clone(), read, s_reader).unwrap();
|
||||||
|
let (s_window, r_window) = channel();
|
||||||
|
let (s_cast, r_cast) = channel();
|
||||||
|
let t_window = window(
|
||||||
|
exit.clone(),
|
||||||
|
subs,
|
||||||
|
resp_recycler.clone(),
|
||||||
|
r_reader,
|
||||||
|
s_window,
|
||||||
|
s_cast,
|
||||||
|
);
|
||||||
let (s_responder, r_responder) = channel();
|
let (s_responder, r_responder) = channel();
|
||||||
let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder);
|
let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder);
|
||||||
let mut msgs = VecDeque::new();
|
let mut msgs = VecDeque::new();
|
||||||
|
@ -344,10 +473,57 @@ mod test {
|
||||||
}
|
}
|
||||||
s_responder.send(msgs).expect("send");
|
s_responder.send(msgs).expect("send");
|
||||||
let mut num = 0;
|
let mut num = 0;
|
||||||
get_blobs(r_reader, &mut num);
|
get_blobs(r_window, &mut num);
|
||||||
assert_eq!(num, 10);
|
assert_eq!(num, 10);
|
||||||
|
let mut q = r_cast.recv().unwrap();
|
||||||
|
while let Ok(mut nq) = r_cast.try_recv() {
|
||||||
|
q.append(&mut nq);
|
||||||
|
}
|
||||||
|
assert_eq!(q.len(), 10);
|
||||||
exit.store(true, Ordering::Relaxed);
|
exit.store(true, Ordering::Relaxed);
|
||||||
t_receiver.join().expect("join");
|
t_receiver.join().expect("join");
|
||||||
t_responder.join().expect("join");
|
t_responder.join().expect("join");
|
||||||
|
t_window.join().expect("join");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
pub fn retransmit() {
|
||||||
|
let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||||
|
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||||
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
|
let subs = Arc::new(RwLock::new(Subscribers::new(
|
||||||
|
Node::default(),
|
||||||
|
Node::default(),
|
||||||
|
)));
|
||||||
|
let n3 = Node::new([0; 8], 1, read.local_addr().unwrap());
|
||||||
|
subs.write().unwrap().insert(&[n3]);
|
||||||
|
let (s_retransmit, r_retransmit) = channel();
|
||||||
|
let blob_recycler = BlobRecycler::default();
|
||||||
|
let saddr = send.local_addr().unwrap();
|
||||||
|
let t_retransmit = retransmitter(
|
||||||
|
send,
|
||||||
|
exit.clone(),
|
||||||
|
subs,
|
||||||
|
blob_recycler.clone(),
|
||||||
|
r_retransmit,
|
||||||
|
);
|
||||||
|
let mut bq = VecDeque::new();
|
||||||
|
let b = blob_recycler.allocate();
|
||||||
|
b.write().unwrap().meta.size = 10;
|
||||||
|
bq.push_back(b);
|
||||||
|
s_retransmit.send(bq).unwrap();
|
||||||
|
let (s_blob_receiver, r_blob_receiver) = channel();
|
||||||
|
let t_receiver =
|
||||||
|
blob_receiver(exit.clone(), blob_recycler.clone(), read, s_blob_receiver).unwrap();
|
||||||
|
let mut oq = r_blob_receiver.recv().unwrap();
|
||||||
|
assert_eq!(oq.len(), 1);
|
||||||
|
let o = oq.pop_front().unwrap();
|
||||||
|
let ro = o.read().unwrap();
|
||||||
|
assert_eq!(ro.meta.size, 10);
|
||||||
|
assert_eq!(ro.meta.addr(), saddr);
|
||||||
|
exit.store(true, Ordering::Relaxed);
|
||||||
|
t_receiver.join().expect("join");
|
||||||
|
t_retransmit.join().expect("join");
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,140 @@
|
||||||
|
//! The `subscribers` module defines data structures to keep track of nodes on the network.
|
||||||
|
//! The network is arranged in layers:
|
||||||
|
//!
|
||||||
|
//! * layer 0 - Leader.
|
||||||
|
//! * layer 1 - As many nodes as we can fit to quickly get reliable `2/3+1` finality
|
||||||
|
//! * layer 2 - Everyone else, if layer 1 is `2^10`, layer 2 should be able to fit `2^20` number of nodes.
|
||||||
|
//!
|
||||||
|
//! It's up to the external state machine to keep this updated.
|
||||||
|
use packet::Blob;
|
||||||
|
use rayon::prelude::*;
|
||||||
|
use result::{Error, Result};
|
||||||
|
use std::net::{SocketAddr, UdpSocket};
|
||||||
|
|
||||||
|
#[derive(Clone, PartialEq)]
|
||||||
|
pub struct Node {
|
||||||
|
pub id: [u64; 8],
|
||||||
|
pub weight: u64,
|
||||||
|
pub addr: SocketAddr,
|
||||||
|
}
|
||||||
|
|
||||||
|
//sockaddr doesn't implement default
|
||||||
|
impl Default for Node {
|
||||||
|
fn default() -> Node {
|
||||||
|
Node {
|
||||||
|
id: [0; 8],
|
||||||
|
weight: 0,
|
||||||
|
addr: "0.0.0.0:0".parse().unwrap(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Node {
|
||||||
|
pub fn new(id: [u64; 8], weight: u64, addr: SocketAddr) -> Node {
|
||||||
|
Node { id, weight, addr }
|
||||||
|
}
|
||||||
|
fn key(&self) -> i64 {
|
||||||
|
(self.weight as i64).checked_neg().unwrap()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Subscribers {
|
||||||
|
data: Vec<Node>,
|
||||||
|
me: Node,
|
||||||
|
pub leader: Node,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Subscribers {
|
||||||
|
pub fn new(me: Node, leader: Node) -> Subscribers {
|
||||||
|
let mut h = Subscribers {
|
||||||
|
data: vec![],
|
||||||
|
me: me.clone(),
|
||||||
|
leader: leader.clone(),
|
||||||
|
};
|
||||||
|
h.insert(&[me, leader]);
|
||||||
|
h
|
||||||
|
}
|
||||||
|
|
||||||
|
/// retransmit messages from the leader to layer 1 nodes
|
||||||
|
pub fn retransmit(&self, blob: &mut Blob, s: &UdpSocket) -> Result<()> {
|
||||||
|
let errs: Vec<_> = self.data
|
||||||
|
.par_iter()
|
||||||
|
.map(|i| {
|
||||||
|
if self.me == *i {
|
||||||
|
return Ok(0);
|
||||||
|
}
|
||||||
|
if self.leader == *i {
|
||||||
|
return Ok(0);
|
||||||
|
}
|
||||||
|
trace!("retransmit blob to {}", i.addr);
|
||||||
|
s.send_to(&blob.data[..blob.meta.size], &i.addr)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
for e in errs {
|
||||||
|
trace!("retransmit result {:?}", e);
|
||||||
|
match e {
|
||||||
|
Err(e) => return Err(Error::IO(e)),
|
||||||
|
_ => (),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
pub fn insert(&mut self, ns: &[Node]) {
|
||||||
|
self.data.extend_from_slice(ns);
|
||||||
|
self.data.sort_by_key(Node::key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod test {
|
||||||
|
use packet::Blob;
|
||||||
|
use rayon::prelude::*;
|
||||||
|
use std::net::UdpSocket;
|
||||||
|
use std::time::Duration;
|
||||||
|
use subscribers::{Node, Subscribers};
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
pub fn subscriber() {
|
||||||
|
let mut me = Node::default();
|
||||||
|
me.weight = 10;
|
||||||
|
let mut leader = Node::default();
|
||||||
|
leader.weight = 11;
|
||||||
|
let mut s = Subscribers::new(me, leader);
|
||||||
|
assert_eq!(s.data.len(), 2);
|
||||||
|
assert_eq!(s.data[0].weight, 11);
|
||||||
|
assert_eq!(s.data[1].weight, 10);
|
||||||
|
let mut n = Node::default();
|
||||||
|
n.weight = 12;
|
||||||
|
s.insert(&[n]);
|
||||||
|
assert_eq!(s.data.len(), 3);
|
||||||
|
assert_eq!(s.data[0].weight, 12);
|
||||||
|
}
|
||||||
|
#[test]
|
||||||
|
pub fn retransmit() {
|
||||||
|
let s1 = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||||
|
let s2 = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||||
|
let s3 = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||||
|
let n1 = Node::new([0; 8], 0, s1.local_addr().unwrap());
|
||||||
|
let n2 = Node::new([0; 8], 0, s2.local_addr().unwrap());
|
||||||
|
let mut s = Subscribers::new(n1.clone(), n2.clone());
|
||||||
|
let n3 = Node::new([0; 8], 0, s3.local_addr().unwrap());
|
||||||
|
s.insert(&[n3]);
|
||||||
|
let mut b = Blob::default();
|
||||||
|
b.meta.size = 10;
|
||||||
|
let s4 = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||||
|
s.retransmit(&mut b, &s4).unwrap();
|
||||||
|
let res: Vec<_> = [s1, s2, s3]
|
||||||
|
.into_par_iter()
|
||||||
|
.map(|s| {
|
||||||
|
let mut b = Blob::default();
|
||||||
|
s.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
|
||||||
|
s.recv_from(&mut b.data).is_err()
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
assert_eq!(res, [true, true, false]);
|
||||||
|
let mut n4 = Node::default();
|
||||||
|
n4.addr = "255.255.255.255:1".parse().unwrap();
|
||||||
|
s.insert(&[n4]);
|
||||||
|
assert!(s.retransmit(&mut b, &s4).is_err());
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue