diff --git a/src/streamer.rs b/src/streamer.rs index 138488899..9eb2360c0 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -222,6 +222,11 @@ fn retransmit( Ok(()) } +//service to retransmit messages from the leader to layer 1 nodes +//see subscriber.rs for network layer definitions +//window receives blobs from the network +//for any blobs that originated from the leader, we broadcast +//to the rest of the network pub fn retransmitter( sock: UdpSocket, exit: Arc, @@ -485,18 +490,18 @@ mod test { ))); let n3 = Node::new([0; 8], 1, read.local_addr().unwrap()); subs.write().unwrap().insert(&[n3]); - let (s_cast, r_cast) = channel(); - let re = BlobRecycler::default(); + let (s_retransmit, r_retransmit) = channel(); + let blob_recycler = BlobRecycler::default(); let saddr = send.local_addr().unwrap(); - let t_retransmit = retransmitter(send, exit.clone(), subs, re.clone(), r_cast); + let t_retransmit = retransmitter(send, exit.clone(), subs, blob_recycler.clone(), r_retransmit); let mut bq = VecDeque::new(); - let b = re.allocate(); + let b = blob_recycler.allocate(); b.write().unwrap().meta.size = 10; bq.push_back(b); - s_cast.send(bq).unwrap(); - let (s_recv, r_recv) = channel(); - let t_receiver = blob_receiver(exit.clone(), re.clone(), read, s_recv).unwrap(); - let mut oq = r_recv.recv().unwrap(); + s_retransmit.send(bq).unwrap(); + let (s_blob_receiver, r_blob_receiver) = channel(); + let t_receiver = blob_receiver(exit.clone(), blob_recycler.clone(), read, s_blob_receiver).unwrap(); + let mut oq = r_blob_receiver.recv().unwrap(); assert_eq!(oq.len(), 1); let o = oq.pop_front().unwrap(); let ro = o.read().unwrap(); diff --git a/src/subscribers.rs b/src/subscribers.rs index 0a123540b..f719d2e2d 100644 --- a/src/subscribers.rs +++ b/src/subscribers.rs @@ -13,11 +13,7 @@ pub struct Node { //sockaddr doesn't implement default impl Default for Node { fn default() -> Node { - Node { - id: [0; 8], - weight: 0, - addr: "0.0.0.0:0".parse().unwrap(), - } + Node { id: [0; 8], weight: 0, addr: "0.0.0.0:0".parse().unwrap(), } } } @@ -34,9 +30,13 @@ impl Node { } } +//Subscriber data structure +//layer 0, leader +//layer 1, as many nodes as we can fit to quickly get reliable 2/3+1 finality +//layer 2, everyone else, if layer 1 is 2**10, layer 2 should be 2**20 number of nodes pub struct Subscribers { - pub data: Vec, - pub me: Node, + data: Vec, + me: Node, pub leader: Node, } @@ -50,6 +50,8 @@ impl Subscribers { h.insert(&[me, leader]); h } + + //retransmit messages from the leader to layer 1 nodes pub fn retransmit(&self, blob: &mut Blob, s: &UdpSocket) -> Result<()> { let errs: Vec<_> = self.data .par_iter()