commit
edf6272374
|
@ -22,12 +22,14 @@ use std::io::Write;
|
||||||
use std::net::{SocketAddr, UdpSocket};
|
use std::net::{SocketAddr, UdpSocket};
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::mpsc::{channel, Receiver, Sender};
|
use std::sync::mpsc::{channel, Receiver, Sender};
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex, RwLock};
|
||||||
use std::thread::{spawn, JoinHandle};
|
use std::thread::{spawn, JoinHandle};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use streamer;
|
use streamer;
|
||||||
use transaction::Transaction;
|
use transaction::Transaction;
|
||||||
|
|
||||||
|
use subscribers;
|
||||||
|
|
||||||
pub struct AccountantSkel<W: Write + Send + 'static> {
|
pub struct AccountantSkel<W: Write + Send + 'static> {
|
||||||
acc: Accountant,
|
acc: Accountant,
|
||||||
last_id: Hash,
|
last_id: Hash,
|
||||||
|
@ -288,8 +290,36 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
/// Process verified blobs, already in order
|
||||||
|
/// Respond with a signed hash of the state
|
||||||
|
fn replicate_state(
|
||||||
|
obj: &Arc<Mutex<AccountantSkel<W>>>,
|
||||||
|
verified_receiver: &streamer::BlobReceiver,
|
||||||
|
blob_recycler: &packet::BlobRecycler,
|
||||||
|
) -> Result<()> {
|
||||||
|
let timer = Duration::new(1, 0);
|
||||||
|
let blobs = verified_receiver.recv_timeout(timer)?;
|
||||||
|
for msgs in &blobs {
|
||||||
|
let blob = msgs.read().unwrap();
|
||||||
|
let entries: Vec<Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap();
|
||||||
|
for entry in entries {
|
||||||
|
obj.lock().unwrap().acc.register_entry_id(&entry.id);
|
||||||
|
|
||||||
|
obj.lock()
|
||||||
|
.unwrap()
|
||||||
|
.acc
|
||||||
|
.process_verified_events(entry.events)?;
|
||||||
|
}
|
||||||
|
//TODO respond back to leader with hash of the state
|
||||||
|
}
|
||||||
|
for blob in blobs {
|
||||||
|
blob_recycler.recycle(blob);
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Create a UDP microservice that forwards messages the given AccountantSkel.
|
/// Create a UDP microservice that forwards messages the given AccountantSkel.
|
||||||
|
/// This service is the network leader
|
||||||
/// Set `exit` to shutdown its threads.
|
/// Set `exit` to shutdown its threads.
|
||||||
pub fn serve(
|
pub fn serve(
|
||||||
obj: &Arc<Mutex<AccountantSkel<W>>>,
|
obj: &Arc<Mutex<AccountantSkel<W>>>,
|
||||||
|
@ -322,7 +352,7 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
|
||||||
|
|
||||||
let skel = obj.clone();
|
let skel = obj.clone();
|
||||||
let t_server = spawn(move || loop {
|
let t_server = spawn(move || loop {
|
||||||
let e = AccountantSkel::process(
|
let e = Self::process(
|
||||||
&skel,
|
&skel,
|
||||||
&verified_receiver,
|
&verified_receiver,
|
||||||
&blob_sender,
|
&blob_sender,
|
||||||
|
@ -340,6 +370,75 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
|
||||||
});
|
});
|
||||||
Ok(vec![t_receiver, t_responder, t_server, t_verifier])
|
Ok(vec![t_receiver, t_responder, t_server, t_verifier])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// This service receives messages from a leader in the network and processes the transactions
|
||||||
|
/// on the accountant state.
|
||||||
|
/// # Arguments
|
||||||
|
/// * `obj` - The accountant state.
|
||||||
|
/// * `rsubs` - The subscribers.
|
||||||
|
/// * `exit` - The exit signal.
|
||||||
|
/// # Remarks
|
||||||
|
/// The pipeline is constructed as follows:
|
||||||
|
/// 1. receive blobs from the network, these are out of order
|
||||||
|
/// 2. verify blobs, PoH, signatures (TODO)
|
||||||
|
/// 3. reconstruct contiguous window
|
||||||
|
/// a. order the blobs
|
||||||
|
/// b. use erasure coding to reconstruct missing blobs
|
||||||
|
/// c. ask the network for missing blobs, if erasure coding is insufficient
|
||||||
|
/// d. make sure that the blobs PoH sequences connect (TODO)
|
||||||
|
/// 4. process the transaction state machine
|
||||||
|
/// 5. respond with the hash of the state back to the leader
|
||||||
|
pub fn replicate(
|
||||||
|
obj: &Arc<Mutex<AccountantSkel<W>>>,
|
||||||
|
rsubs: subscribers::Subscribers,
|
||||||
|
exit: Arc<AtomicBool>,
|
||||||
|
) -> Result<Vec<JoinHandle<()>>> {
|
||||||
|
let read = UdpSocket::bind(rsubs.me.addr)?;
|
||||||
|
// make sure we are on the same interface
|
||||||
|
let mut local = read.local_addr()?;
|
||||||
|
local.set_port(0);
|
||||||
|
let write = UdpSocket::bind(local)?;
|
||||||
|
|
||||||
|
let blob_recycler = packet::BlobRecycler::default();
|
||||||
|
let (blob_sender, blob_receiver) = channel();
|
||||||
|
let t_blob_receiver = streamer::blob_receiver(
|
||||||
|
exit.clone(),
|
||||||
|
blob_recycler.clone(),
|
||||||
|
read,
|
||||||
|
blob_sender.clone(),
|
||||||
|
)?;
|
||||||
|
let (window_sender, window_receiver) = channel();
|
||||||
|
let (retransmit_sender, retransmit_receiver) = channel();
|
||||||
|
|
||||||
|
let subs = Arc::new(RwLock::new(rsubs));
|
||||||
|
let t_retransmit = streamer::retransmitter(
|
||||||
|
write,
|
||||||
|
exit.clone(),
|
||||||
|
subs.clone(),
|
||||||
|
blob_recycler.clone(),
|
||||||
|
retransmit_receiver,
|
||||||
|
);
|
||||||
|
//TODO
|
||||||
|
//the packets coming out of blob_receiver need to be sent to the GPU and verified
|
||||||
|
//then sent to the window, which does the erasure coding reconstruction
|
||||||
|
let t_window = streamer::window(
|
||||||
|
exit.clone(),
|
||||||
|
subs,
|
||||||
|
blob_recycler.clone(),
|
||||||
|
blob_receiver,
|
||||||
|
window_sender,
|
||||||
|
retransmit_sender,
|
||||||
|
);
|
||||||
|
|
||||||
|
let skel = obj.clone();
|
||||||
|
let t_server = spawn(move || loop {
|
||||||
|
let e = Self::replicate_state(&skel, &window_receiver, &blob_recycler);
|
||||||
|
if e.is_err() && exit.load(Ordering::Relaxed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Ok(vec![t_blob_receiver, t_retransmit, t_window, t_server])
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
@ -367,7 +466,7 @@ mod tests {
|
||||||
use accountant_skel::{to_packets, Request};
|
use accountant_skel::{to_packets, Request};
|
||||||
use bincode::serialize;
|
use bincode::serialize;
|
||||||
use ecdsa;
|
use ecdsa;
|
||||||
use packet::{PacketRecycler, NUM_PACKETS};
|
use packet::{BlobRecycler, PacketRecycler, NUM_PACKETS};
|
||||||
use transaction::{memfind, test_tx};
|
use transaction::{memfind, test_tx};
|
||||||
|
|
||||||
use accountant::Accountant;
|
use accountant::Accountant;
|
||||||
|
@ -388,6 +487,15 @@ mod tests {
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use transaction::Transaction;
|
use transaction::Transaction;
|
||||||
|
|
||||||
|
use subscribers::{Node, Subscribers};
|
||||||
|
use streamer;
|
||||||
|
use std::sync::mpsc::channel;
|
||||||
|
use std::collections::VecDeque;
|
||||||
|
use hash::{hash, Hash};
|
||||||
|
use event::Event;
|
||||||
|
use entry;
|
||||||
|
use chrono::prelude::*;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_layout() {
|
fn test_layout() {
|
||||||
let tr = test_tx();
|
let tr = test_tx();
|
||||||
|
@ -492,6 +600,138 @@ mod tests {
|
||||||
exit.store(true, Ordering::Relaxed);
|
exit.store(true, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
use std::sync::{Once, ONCE_INIT};
|
||||||
|
extern crate env_logger;
|
||||||
|
|
||||||
|
static INIT: Once = ONCE_INIT;
|
||||||
|
|
||||||
|
/// Setup function that is only run once, even if called multiple times.
|
||||||
|
fn setup() {
|
||||||
|
INIT.call_once(|| {
|
||||||
|
env_logger::init().unwrap();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_replicate() {
|
||||||
|
setup();
|
||||||
|
let leader_sock = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||||
|
let leader_addr = leader_sock.local_addr().unwrap();
|
||||||
|
let me_addr = "127.0.0.1:9010".parse().unwrap();
|
||||||
|
let target_peer_sock = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||||
|
let target_peer_addr = target_peer_sock.local_addr().unwrap();
|
||||||
|
let source_peer_sock = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||||
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
|
|
||||||
|
let node_me = Node::new([0, 0, 0, 0, 0, 0, 0, 1], 10, me_addr);
|
||||||
|
let node_subs = vec![Node::new([0, 0, 0, 0, 0, 0, 0, 2], 8, target_peer_addr); 1];
|
||||||
|
let node_leader = Node::new([0, 0, 0, 0, 0, 0, 0, 3], 20, leader_addr);
|
||||||
|
let subs = Subscribers::new(node_me, node_leader, &node_subs);
|
||||||
|
|
||||||
|
// setup some blob services to send blobs into the socket
|
||||||
|
// to simulate the source peer and get blobs out of the socket to
|
||||||
|
// simulate target peer
|
||||||
|
let recv_recycler = BlobRecycler::default();
|
||||||
|
let resp_recycler = BlobRecycler::default();
|
||||||
|
let (s_reader, r_reader) = channel();
|
||||||
|
let t_receiver = streamer::blob_receiver(
|
||||||
|
exit.clone(),
|
||||||
|
recv_recycler.clone(),
|
||||||
|
target_peer_sock,
|
||||||
|
s_reader,
|
||||||
|
).unwrap();
|
||||||
|
let (s_responder, r_responder) = channel();
|
||||||
|
let t_responder = streamer::responder(
|
||||||
|
source_peer_sock,
|
||||||
|
exit.clone(),
|
||||||
|
resp_recycler.clone(),
|
||||||
|
r_responder,
|
||||||
|
);
|
||||||
|
|
||||||
|
let starting_balance = 10_000;
|
||||||
|
let alice = Mint::new(starting_balance);
|
||||||
|
let acc = Accountant::new(&alice);
|
||||||
|
let historian = Historian::new(&alice.last_id(), Some(30));
|
||||||
|
let acc = Arc::new(Mutex::new(AccountantSkel::new(
|
||||||
|
acc,
|
||||||
|
alice.last_id(),
|
||||||
|
sink(),
|
||||||
|
historian,
|
||||||
|
)));
|
||||||
|
|
||||||
|
let _threads = AccountantSkel::replicate(&acc, subs, exit.clone()).unwrap();
|
||||||
|
|
||||||
|
let mut alice_ref_balance = starting_balance;
|
||||||
|
let mut msgs = VecDeque::new();
|
||||||
|
let mut cur_hash = Hash::default();
|
||||||
|
let num_blobs = 10;
|
||||||
|
let transfer_amount = 501;
|
||||||
|
let bob_keypair = KeyPair::new();
|
||||||
|
for i in 0..num_blobs {
|
||||||
|
let b = resp_recycler.allocate();
|
||||||
|
let b_ = b.clone();
|
||||||
|
let mut w = b.write().unwrap();
|
||||||
|
w.set_index(i).unwrap();
|
||||||
|
|
||||||
|
let tr0 = Event::new_timestamp(&bob_keypair, Utc::now());
|
||||||
|
let entry0 = entry::create_entry(&cur_hash, i, vec![tr0]);
|
||||||
|
acc.lock().unwrap().acc.register_entry_id(&cur_hash);
|
||||||
|
cur_hash = hash(&cur_hash);
|
||||||
|
|
||||||
|
let tr1 = Transaction::new(
|
||||||
|
&alice.keypair(),
|
||||||
|
bob_keypair.pubkey(),
|
||||||
|
transfer_amount,
|
||||||
|
cur_hash,
|
||||||
|
);
|
||||||
|
acc.lock().unwrap().acc.register_entry_id(&cur_hash);
|
||||||
|
cur_hash = hash(&cur_hash);
|
||||||
|
let entry1 =
|
||||||
|
entry::create_entry(&cur_hash, i + num_blobs, vec![Event::Transaction(tr1)]);
|
||||||
|
acc.lock().unwrap().acc.register_entry_id(&cur_hash);
|
||||||
|
cur_hash = hash(&cur_hash);
|
||||||
|
|
||||||
|
alice_ref_balance -= transfer_amount;
|
||||||
|
|
||||||
|
let serialized_entry = serialize(&vec![entry0, entry1]).unwrap();
|
||||||
|
|
||||||
|
w.data_mut()[..serialized_entry.len()].copy_from_slice(&serialized_entry);
|
||||||
|
w.set_size(serialized_entry.len());
|
||||||
|
w.meta.set_addr(&me_addr);
|
||||||
|
drop(w);
|
||||||
|
msgs.push_back(b_);
|
||||||
|
}
|
||||||
|
|
||||||
|
// send the blobs into the socket
|
||||||
|
s_responder.send(msgs).expect("send");
|
||||||
|
|
||||||
|
// receive retransmitted messages
|
||||||
|
let timer = Duration::new(1, 0);
|
||||||
|
let mut msgs: Vec<_> = Vec::new();
|
||||||
|
while let Ok(msg) = r_reader.recv_timeout(timer) {
|
||||||
|
trace!("msg: {:?}", msg);
|
||||||
|
msgs.push(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
let alice_balance = acc.lock()
|
||||||
|
.unwrap()
|
||||||
|
.acc
|
||||||
|
.get_balance(&alice.keypair().pubkey())
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(alice_balance, alice_ref_balance);
|
||||||
|
|
||||||
|
let bob_balance = acc.lock()
|
||||||
|
.unwrap()
|
||||||
|
.acc
|
||||||
|
.get_balance(&bob_keypair.pubkey())
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(bob_balance, starting_balance - alice_ref_balance);
|
||||||
|
|
||||||
|
exit.store(true, Ordering::Relaxed);
|
||||||
|
t_receiver.join().expect("join");
|
||||||
|
t_responder.join().expect("join");
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(all(feature = "unstable", test))]
|
#[cfg(all(feature = "unstable", test))]
|
||||||
|
|
|
@ -6,6 +6,7 @@ use std::fmt;
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket};
|
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket};
|
||||||
use std::sync::{Arc, Mutex, RwLock};
|
use std::sync::{Arc, Mutex, RwLock};
|
||||||
|
use std::mem::size_of;
|
||||||
|
|
||||||
pub type SharedPackets = Arc<RwLock<Packets>>;
|
pub type SharedPackets = Arc<RwLock<Packets>>;
|
||||||
pub type SharedBlob = Arc<RwLock<Blob>>;
|
pub type SharedBlob = Arc<RwLock<Blob>>;
|
||||||
|
@ -210,23 +211,28 @@ impl Packets {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const BLOB_INDEX_SIZE: usize = size_of::<u64>();
|
||||||
|
|
||||||
impl Blob {
|
impl Blob {
|
||||||
pub fn get_index(&self) -> Result<u64> {
|
pub fn get_index(&self) -> Result<u64> {
|
||||||
let mut rdr = io::Cursor::new(&self.data[0..8]);
|
let mut rdr = io::Cursor::new(&self.data[0..BLOB_INDEX_SIZE]);
|
||||||
let r = rdr.read_u64::<LittleEndian>()?;
|
let r = rdr.read_u64::<LittleEndian>()?;
|
||||||
Ok(r)
|
Ok(r)
|
||||||
}
|
}
|
||||||
pub fn set_index(&mut self, ix: u64) -> Result<()> {
|
pub fn set_index(&mut self, ix: u64) -> Result<()> {
|
||||||
let mut wtr = vec![];
|
let mut wtr = vec![];
|
||||||
wtr.write_u64::<LittleEndian>(ix)?;
|
wtr.write_u64::<LittleEndian>(ix)?;
|
||||||
self.data[..8].clone_from_slice(&wtr);
|
self.data[..BLOB_INDEX_SIZE].clone_from_slice(&wtr);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
pub fn data(&self) -> &[u8] {
|
pub fn data(&self) -> &[u8] {
|
||||||
&self.data[8..]
|
&self.data[BLOB_INDEX_SIZE..]
|
||||||
}
|
}
|
||||||
pub fn data_mut(&mut self) -> &mut [u8] {
|
pub fn data_mut(&mut self) -> &mut [u8] {
|
||||||
&mut self.data[8..]
|
&mut self.data[BLOB_INDEX_SIZE..]
|
||||||
|
}
|
||||||
|
pub fn set_size(&mut self, size: usize) {
|
||||||
|
self.meta.size = size + BLOB_INDEX_SIZE;
|
||||||
}
|
}
|
||||||
pub fn recv_from(re: &BlobRecycler, socket: &UdpSocket) -> Result<VecDeque<SharedBlob>> {
|
pub fn recv_from(re: &BlobRecycler, socket: &UdpSocket) -> Result<VecDeque<SharedBlob>> {
|
||||||
let mut v = VecDeque::new();
|
let mut v = VecDeque::new();
|
||||||
|
|
|
@ -4,6 +4,7 @@ use bincode;
|
||||||
use serde_json;
|
use serde_json;
|
||||||
use std;
|
use std;
|
||||||
use std::any::Any;
|
use std::any::Any;
|
||||||
|
use accountant;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
|
@ -14,6 +15,7 @@ pub enum Error {
|
||||||
RecvError(std::sync::mpsc::RecvError),
|
RecvError(std::sync::mpsc::RecvError),
|
||||||
RecvTimeoutError(std::sync::mpsc::RecvTimeoutError),
|
RecvTimeoutError(std::sync::mpsc::RecvTimeoutError),
|
||||||
Serialize(std::boxed::Box<bincode::ErrorKind>),
|
Serialize(std::boxed::Box<bincode::ErrorKind>),
|
||||||
|
AccountingError(accountant::AccountingError),
|
||||||
SendError,
|
SendError,
|
||||||
Services,
|
Services,
|
||||||
}
|
}
|
||||||
|
@ -30,6 +32,11 @@ impl std::convert::From<std::sync::mpsc::RecvTimeoutError> for Error {
|
||||||
Error::RecvTimeoutError(e)
|
Error::RecvTimeoutError(e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
impl std::convert::From<accountant::AccountingError> for Error {
|
||||||
|
fn from(e: accountant::AccountingError) -> Error {
|
||||||
|
Error::AccountingError(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
impl<T> std::convert::From<std::sync::mpsc::SendError<T>> for Error {
|
impl<T> std::convert::From<std::sync::mpsc::SendError<T>> for Error {
|
||||||
fn from(_e: std::sync::mpsc::SendError<T>) -> Error {
|
fn from(_e: std::sync::mpsc::SendError<T>) -> Error {
|
||||||
Error::SendError
|
Error::SendError
|
||||||
|
|
|
@ -8,7 +8,7 @@ use std::sync::mpsc;
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
use std::thread::{spawn, JoinHandle};
|
use std::thread::{spawn, JoinHandle};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use subscribers;
|
use subscribers::Subscribers;
|
||||||
|
|
||||||
pub type PacketReceiver = mpsc::Receiver<SharedPackets>;
|
pub type PacketReceiver = mpsc::Receiver<SharedPackets>;
|
||||||
pub type PacketSender = mpsc::Sender<SharedPackets>;
|
pub type PacketSender = mpsc::Sender<SharedPackets>;
|
||||||
|
@ -99,19 +99,22 @@ pub fn blob_receiver(
|
||||||
if exit.load(Ordering::Relaxed) {
|
if exit.load(Ordering::Relaxed) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
let _ = recv_blobs(&recycler, &sock, &s);
|
let ret = recv_blobs(&recycler, &sock, &s);
|
||||||
|
if ret.is_err() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
});
|
});
|
||||||
Ok(t)
|
Ok(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn recv_window(
|
fn recv_window(
|
||||||
window: &mut Vec<Option<SharedBlob>>,
|
window: &mut Vec<Option<SharedBlob>>,
|
||||||
subs: &Arc<RwLock<subscribers::Subscribers>>,
|
subs: &Arc<RwLock<Subscribers>>,
|
||||||
recycler: &BlobRecycler,
|
recycler: &BlobRecycler,
|
||||||
consumed: &mut usize,
|
consumed: &mut usize,
|
||||||
r: &BlobReceiver,
|
r: &BlobReceiver,
|
||||||
s: &BlobSender,
|
s: &BlobSender,
|
||||||
cast: &BlobSender,
|
retransmit: &BlobSender,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let timer = Duration::new(1, 0);
|
let timer = Duration::new(1, 0);
|
||||||
let mut dq = r.recv_timeout(timer)?;
|
let mut dq = r.recv_timeout(timer)?;
|
||||||
|
@ -120,12 +123,18 @@ fn recv_window(
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
//retransmit all leader blocks
|
//retransmit all leader blocks
|
||||||
let mut castq = VecDeque::new();
|
let mut retransmitq = VecDeque::new();
|
||||||
let rsubs = subs.read().unwrap();
|
let rsubs = subs.read().unwrap();
|
||||||
for b in &dq {
|
for b in &dq {
|
||||||
let p = b.read().unwrap();
|
let p = b.read().unwrap();
|
||||||
//TODO this check isn't safe against adverserial packets
|
//TODO this check isn't safe against adverserial packets
|
||||||
//we need to maintain a sequence window
|
//we need to maintain a sequence window
|
||||||
|
trace!(
|
||||||
|
"idx: {} addr: {:?} leader: {:?}",
|
||||||
|
p.get_index().unwrap(),
|
||||||
|
p.meta.addr(),
|
||||||
|
rsubs.leader.addr
|
||||||
|
);
|
||||||
if p.meta.addr() == rsubs.leader.addr {
|
if p.meta.addr() == rsubs.leader.addr {
|
||||||
//TODO
|
//TODO
|
||||||
//need to copy the retransmited blob
|
//need to copy the retransmited blob
|
||||||
|
@ -141,11 +150,11 @@ fn recv_window(
|
||||||
mnv.meta.size = sz;
|
mnv.meta.size = sz;
|
||||||
mnv.data[..sz].copy_from_slice(&p.data[..sz]);
|
mnv.data[..sz].copy_from_slice(&p.data[..sz]);
|
||||||
}
|
}
|
||||||
castq.push_back(nv);
|
retransmitq.push_back(nv);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !castq.is_empty() {
|
if !retransmitq.is_empty() {
|
||||||
cast.send(castq)?;
|
retransmit.send(retransmitq)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
//send a contiguous set of blocks
|
//send a contiguous set of blocks
|
||||||
|
@ -158,6 +167,7 @@ fn recv_window(
|
||||||
//TODO, after the block are authenticated
|
//TODO, after the block are authenticated
|
||||||
//if we get different blocks at the same index
|
//if we get different blocks at the same index
|
||||||
//that is a network failure/attack
|
//that is a network failure/attack
|
||||||
|
trace!("window w: {} size: {}", w, p.meta.size);
|
||||||
{
|
{
|
||||||
if window[w].is_none() {
|
if window[w].is_none() {
|
||||||
window[w] = Some(b_);
|
window[w] = Some(b_);
|
||||||
|
@ -166,6 +176,7 @@ fn recv_window(
|
||||||
}
|
}
|
||||||
loop {
|
loop {
|
||||||
let k = *consumed % NUM_BLOBS;
|
let k = *consumed % NUM_BLOBS;
|
||||||
|
trace!("k: {} consumed: {}", k, *consumed);
|
||||||
if window[k].is_none() {
|
if window[k].is_none() {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -175,6 +186,7 @@ fn recv_window(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
trace!("sending contq.len: {}", contq.len());
|
||||||
if !contq.is_empty() {
|
if !contq.is_empty() {
|
||||||
s.send(contq)?;
|
s.send(contq)?;
|
||||||
}
|
}
|
||||||
|
@ -183,11 +195,11 @@ fn recv_window(
|
||||||
|
|
||||||
pub fn window(
|
pub fn window(
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
subs: Arc<RwLock<subscribers::Subscribers>>,
|
subs: Arc<RwLock<Subscribers>>,
|
||||||
recycler: BlobRecycler,
|
recycler: BlobRecycler,
|
||||||
r: BlobReceiver,
|
r: BlobReceiver,
|
||||||
s: BlobSender,
|
s: BlobSender,
|
||||||
cast: BlobSender,
|
retransmit: BlobSender,
|
||||||
) -> JoinHandle<()> {
|
) -> JoinHandle<()> {
|
||||||
spawn(move || {
|
spawn(move || {
|
||||||
let mut window = vec![None; NUM_BLOBS];
|
let mut window = vec![None; NUM_BLOBS];
|
||||||
|
@ -196,13 +208,21 @@ pub fn window(
|
||||||
if exit.load(Ordering::Relaxed) {
|
if exit.load(Ordering::Relaxed) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
let _ = recv_window(&mut window, &subs, &recycler, &mut consumed, &r, &s, &cast);
|
let _ = recv_window(
|
||||||
|
&mut window,
|
||||||
|
&subs,
|
||||||
|
&recycler,
|
||||||
|
&mut consumed,
|
||||||
|
&r,
|
||||||
|
&s,
|
||||||
|
&retransmit,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn retransmit(
|
fn retransmit(
|
||||||
subs: &Arc<RwLock<subscribers::Subscribers>>,
|
subs: &Arc<RwLock<Subscribers>>,
|
||||||
recycler: &BlobRecycler,
|
recycler: &BlobRecycler,
|
||||||
r: &BlobReceiver,
|
r: &BlobReceiver,
|
||||||
sock: &UdpSocket,
|
sock: &UdpSocket,
|
||||||
|
@ -237,7 +257,7 @@ fn retransmit(
|
||||||
pub fn retransmitter(
|
pub fn retransmitter(
|
||||||
sock: UdpSocket,
|
sock: UdpSocket,
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
subs: Arc<RwLock<subscribers::Subscribers>>,
|
subs: Arc<RwLock<Subscribers>>,
|
||||||
recycler: BlobRecycler,
|
recycler: BlobRecycler,
|
||||||
r: BlobReceiver,
|
r: BlobReceiver,
|
||||||
) -> JoinHandle<()> {
|
) -> JoinHandle<()> {
|
||||||
|
@ -442,20 +462,21 @@ mod test {
|
||||||
let subs = Arc::new(RwLock::new(Subscribers::new(
|
let subs = Arc::new(RwLock::new(Subscribers::new(
|
||||||
Node::default(),
|
Node::default(),
|
||||||
Node::new([0; 8], 0, send.local_addr().unwrap()),
|
Node::new([0; 8], 0, send.local_addr().unwrap()),
|
||||||
|
&[],
|
||||||
)));
|
)));
|
||||||
let resp_recycler = BlobRecycler::default();
|
let resp_recycler = BlobRecycler::default();
|
||||||
let (s_reader, r_reader) = channel();
|
let (s_reader, r_reader) = channel();
|
||||||
let t_receiver =
|
let t_receiver =
|
||||||
blob_receiver(exit.clone(), resp_recycler.clone(), read, s_reader).unwrap();
|
blob_receiver(exit.clone(), resp_recycler.clone(), read, s_reader).unwrap();
|
||||||
let (s_window, r_window) = channel();
|
let (s_window, r_window) = channel();
|
||||||
let (s_cast, r_cast) = channel();
|
let (s_retransmit, r_retransmit) = channel();
|
||||||
let t_window = window(
|
let t_window = window(
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
subs,
|
subs,
|
||||||
resp_recycler.clone(),
|
resp_recycler.clone(),
|
||||||
r_reader,
|
r_reader,
|
||||||
s_window,
|
s_window,
|
||||||
s_cast,
|
s_retransmit,
|
||||||
);
|
);
|
||||||
let (s_responder, r_responder) = channel();
|
let (s_responder, r_responder) = channel();
|
||||||
let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder);
|
let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder);
|
||||||
|
@ -475,8 +496,8 @@ mod test {
|
||||||
let mut num = 0;
|
let mut num = 0;
|
||||||
get_blobs(r_window, &mut num);
|
get_blobs(r_window, &mut num);
|
||||||
assert_eq!(num, 10);
|
assert_eq!(num, 10);
|
||||||
let mut q = r_cast.recv().unwrap();
|
let mut q = r_retransmit.recv().unwrap();
|
||||||
while let Ok(mut nq) = r_cast.try_recv() {
|
while let Ok(mut nq) = r_retransmit.try_recv() {
|
||||||
q.append(&mut nq);
|
q.append(&mut nq);
|
||||||
}
|
}
|
||||||
assert_eq!(q.len(), 10);
|
assert_eq!(q.len(), 10);
|
||||||
|
@ -494,9 +515,8 @@ mod test {
|
||||||
let subs = Arc::new(RwLock::new(Subscribers::new(
|
let subs = Arc::new(RwLock::new(Subscribers::new(
|
||||||
Node::default(),
|
Node::default(),
|
||||||
Node::default(),
|
Node::default(),
|
||||||
|
&[Node::new([0; 8], 1, read.local_addr().unwrap())],
|
||||||
)));
|
)));
|
||||||
let n3 = Node::new([0; 8], 1, read.local_addr().unwrap());
|
|
||||||
subs.write().unwrap().insert(&[n3]);
|
|
||||||
let (s_retransmit, r_retransmit) = channel();
|
let (s_retransmit, r_retransmit) = channel();
|
||||||
let blob_recycler = BlobRecycler::default();
|
let blob_recycler = BlobRecycler::default();
|
||||||
let saddr = send.local_addr().unwrap();
|
let saddr = send.local_addr().unwrap();
|
||||||
|
|
|
@ -11,6 +11,8 @@ use rayon::prelude::*;
|
||||||
use result::{Error, Result};
|
use result::{Error, Result};
|
||||||
use std::net::{SocketAddr, UdpSocket};
|
use std::net::{SocketAddr, UdpSocket};
|
||||||
|
|
||||||
|
use std::fmt;
|
||||||
|
|
||||||
#[derive(Clone, PartialEq)]
|
#[derive(Clone, PartialEq)]
|
||||||
pub struct Node {
|
pub struct Node {
|
||||||
pub id: [u64; 8],
|
pub id: [u64; 8],
|
||||||
|
@ -38,20 +40,27 @@ impl Node {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl fmt::Debug for Node {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
write!(f, "Node {{ weight: {} addr: {} }}", self.weight, self.addr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub struct Subscribers {
|
pub struct Subscribers {
|
||||||
data: Vec<Node>,
|
data: Vec<Node>,
|
||||||
me: Node,
|
pub me: Node,
|
||||||
pub leader: Node,
|
pub leader: Node,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Subscribers {
|
impl Subscribers {
|
||||||
pub fn new(me: Node, leader: Node) -> Subscribers {
|
pub fn new(me: Node, leader: Node, network: &[Node]) -> Subscribers {
|
||||||
let mut h = Subscribers {
|
let mut h = Subscribers {
|
||||||
data: vec![],
|
data: vec![],
|
||||||
me: me.clone(),
|
me: me.clone(),
|
||||||
leader: leader.clone(),
|
leader: leader.clone(),
|
||||||
};
|
};
|
||||||
h.insert(&[me, leader]);
|
h.insert(&[me, leader]);
|
||||||
|
h.insert(network);
|
||||||
h
|
h
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -99,7 +108,7 @@ mod test {
|
||||||
me.weight = 10;
|
me.weight = 10;
|
||||||
let mut leader = Node::default();
|
let mut leader = Node::default();
|
||||||
leader.weight = 11;
|
leader.weight = 11;
|
||||||
let mut s = Subscribers::new(me, leader);
|
let mut s = Subscribers::new(me, leader, &[]);
|
||||||
assert_eq!(s.data.len(), 2);
|
assert_eq!(s.data.len(), 2);
|
||||||
assert_eq!(s.data[0].weight, 11);
|
assert_eq!(s.data[0].weight, 11);
|
||||||
assert_eq!(s.data[1].weight, 10);
|
assert_eq!(s.data[1].weight, 10);
|
||||||
|
@ -116,7 +125,7 @@ mod test {
|
||||||
let s3 = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
let s3 = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||||
let n1 = Node::new([0; 8], 0, s1.local_addr().unwrap());
|
let n1 = Node::new([0; 8], 0, s1.local_addr().unwrap());
|
||||||
let n2 = Node::new([0; 8], 0, s2.local_addr().unwrap());
|
let n2 = Node::new([0; 8], 0, s2.local_addr().unwrap());
|
||||||
let mut s = Subscribers::new(n1.clone(), n2.clone());
|
let mut s = Subscribers::new(n1.clone(), n2.clone(), &[]);
|
||||||
let n3 = Node::new([0; 8], 0, s3.local_addr().unwrap());
|
let n3 = Node::new([0; 8], 0, s3.local_addr().unwrap());
|
||||||
s.insert(&[n3]);
|
s.insert(&[n3]);
|
||||||
let mut b = Blob::default();
|
let mut b = Blob::default();
|
||||||
|
|
Loading…
Reference in New Issue