Merge pull request #218 from aeyakovenko/multitest-rebase
multinode test
This commit is contained in:
commit
f4971be236
|
@ -160,6 +160,7 @@ impl Bank {
|
|||
/// Deduct tokens from the 'from' address the account has sufficient
|
||||
/// funds and isn't a duplicate.
|
||||
pub fn process_verified_transaction_debits(&self, tr: &Transaction) -> Result<()> {
|
||||
info!("Transaction {}", tr.data.tokens);
|
||||
let bals = self.balances
|
||||
.read()
|
||||
.expect("'balances' read lock in process_verified_transaction_debits");
|
||||
|
|
187
src/crdt.rs
187
src/crdt.rs
|
@ -83,7 +83,7 @@ impl ReplicatedData {
|
|||
/// * `listen` - listen for requests and responses
|
||||
/// No attempt to keep track of timeouts or dropped requests is made, or should be.
|
||||
pub struct Crdt {
|
||||
table: HashMap<PublicKey, ReplicatedData>,
|
||||
pub table: HashMap<PublicKey, ReplicatedData>,
|
||||
/// Value of my update index when entry in table was updated.
|
||||
/// Nodes will ask for updates since `update_index`, and this node
|
||||
/// should respond with all the identities that are greater then the
|
||||
|
@ -93,7 +93,7 @@ pub struct Crdt {
|
|||
/// This Node will ask external nodes for updates since the value in this list
|
||||
pub remote: HashMap<PublicKey, u64>,
|
||||
pub update_index: u64,
|
||||
me: PublicKey,
|
||||
pub me: PublicKey,
|
||||
timeout: Duration,
|
||||
}
|
||||
// TODO These messages should be signed, and go through the gpu pipeline for spam filtering
|
||||
|
@ -106,6 +106,8 @@ enum Protocol {
|
|||
//TODO might need a since?
|
||||
/// from id, form's last update index, ReplicatedData
|
||||
ReceiveUpdates(PublicKey, u64, Vec<ReplicatedData>),
|
||||
/// ask for a missing index
|
||||
RequestWindowIndex(ReplicatedData, u64),
|
||||
}
|
||||
|
||||
impl Crdt {
|
||||
|
@ -117,7 +119,7 @@ impl Crdt {
|
|||
remote: HashMap::new(),
|
||||
me: me.id,
|
||||
update_index: 1,
|
||||
timeout: Duration::new(0, 100_000),
|
||||
timeout: Duration::from_millis(100),
|
||||
};
|
||||
g.local.insert(me.id, g.update_index);
|
||||
g.table.insert(me.id, me);
|
||||
|
@ -134,10 +136,10 @@ impl Crdt {
|
|||
let mut me = self.my_data().clone();
|
||||
me.current_leader_id = key;
|
||||
me.version += 1;
|
||||
self.insert(me);
|
||||
self.insert(&me);
|
||||
}
|
||||
|
||||
pub fn insert(&mut self, v: ReplicatedData) {
|
||||
pub fn insert(&mut self, v: &ReplicatedData) {
|
||||
// TODO check that last_verified types are always increasing
|
||||
if self.table.get(&v.id).is_none() || (v.version > self.table[&v.id].version) {
|
||||
//somehow we signed a message for our own identity with a higher version that
|
||||
|
@ -169,11 +171,12 @@ impl Crdt {
|
|||
let (me, table): (ReplicatedData, Vec<ReplicatedData>) = {
|
||||
// copy to avoid locking durring IO
|
||||
let robj = obj.read().expect("'obj' read lock in pub fn broadcast");
|
||||
info!("broadcast table {}", robj.table.len());
|
||||
let cloned_table: Vec<ReplicatedData> = robj.table.values().cloned().collect();
|
||||
(robj.table[&robj.me].clone(), cloned_table)
|
||||
};
|
||||
let daddr = "0.0.0.0:0".parse().unwrap();
|
||||
let items: Vec<(usize, &ReplicatedData)> = table
|
||||
let nodes: Vec<&ReplicatedData> = table
|
||||
.iter()
|
||||
.filter(|v| {
|
||||
if me.id == v.id {
|
||||
|
@ -183,15 +186,30 @@ impl Crdt {
|
|||
//filter nodes that are not listening
|
||||
false
|
||||
} else {
|
||||
info!("broadcast node {}", v.replicate_addr);
|
||||
true
|
||||
}
|
||||
})
|
||||
.enumerate()
|
||||
.collect();
|
||||
let orders: Vec<_> = items.into_iter().cycle().zip(blobs.iter()).collect();
|
||||
assert!(nodes.len() > 0);
|
||||
info!("nodes table {}", nodes.len());
|
||||
info!("blobs table {}", blobs.len());
|
||||
// enumerate all the blobs, those are the indecies
|
||||
// transmit them to nodes, starting from a different node
|
||||
let orders: Vec<_> = blobs
|
||||
.iter()
|
||||
.enumerate()
|
||||
.zip(
|
||||
nodes
|
||||
.iter()
|
||||
.cycle()
|
||||
.skip((*transmit_index as usize) % nodes.len()),
|
||||
)
|
||||
.collect();
|
||||
info!("orders table {}", orders.len());
|
||||
let errs: Vec<_> = orders
|
||||
.into_par_iter()
|
||||
.map(|((i, v), b)| {
|
||||
.into_iter()
|
||||
.map(|((i, b), v)| {
|
||||
// only leader should be broadcasting
|
||||
assert!(me.current_leader_id != v.id);
|
||||
let mut blob = b.write().expect("'b' write lock in pub fn broadcast");
|
||||
|
@ -199,13 +217,19 @@ impl Crdt {
|
|||
blob.set_index(*transmit_index + i as u64)
|
||||
.expect("set_index in pub fn broadcast");
|
||||
//TODO profile this, may need multiple sockets for par_iter
|
||||
s.send_to(&blob.data[..blob.meta.size], &v.replicate_addr)
|
||||
info!("broadcast {} to {}", blob.meta.size, v.replicate_addr);
|
||||
let e = s.send_to(&blob.data[..blob.meta.size], &v.replicate_addr);
|
||||
info!("done broadcast {} to {}", blob.meta.size, v.replicate_addr);
|
||||
e
|
||||
})
|
||||
.collect();
|
||||
info!("broadcast results {}", errs.len());
|
||||
for e in errs {
|
||||
trace!("retransmit result {:?}", e);
|
||||
match e {
|
||||
Err(e) => return Err(Error::IO(e)),
|
||||
Err(e) => {
|
||||
error!("broadcast result {:?}", e);
|
||||
return Err(Error::IO(e));
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
*transmit_index += 1;
|
||||
|
@ -222,7 +246,11 @@ impl Crdt {
|
|||
let s = obj.read().expect("'obj' read lock in pub fn retransmit");
|
||||
(s.table[&s.me].clone(), s.table.values().cloned().collect())
|
||||
};
|
||||
let rblob = blob.read().expect("'blob' read lock in pub fn retransmit");
|
||||
blob.write()
|
||||
.unwrap()
|
||||
.set_id(me.id)
|
||||
.expect("set_id in pub fn retransmit");
|
||||
let rblob = blob.read().unwrap();
|
||||
let daddr = "0.0.0.0:0".parse().unwrap();
|
||||
let orders: Vec<_> = table
|
||||
.iter()
|
||||
|
@ -243,15 +271,21 @@ impl Crdt {
|
|||
let errs: Vec<_> = orders
|
||||
.par_iter()
|
||||
.map(|v| {
|
||||
trace!("retransmit blob to {}", v.replicate_addr);
|
||||
info!(
|
||||
"retransmit blob {} to {}",
|
||||
rblob.get_index().unwrap(),
|
||||
v.replicate_addr
|
||||
);
|
||||
//TODO profile this, may need multiple sockets for par_iter
|
||||
s.send_to(&rblob.data[..rblob.meta.size], &v.replicate_addr)
|
||||
})
|
||||
.collect();
|
||||
for e in errs {
|
||||
trace!("retransmit result {:?}", e);
|
||||
match e {
|
||||
Err(e) => return Err(Error::IO(e)),
|
||||
Err(e) => {
|
||||
info!("retransmit error {:?}", e);
|
||||
return Err(Error::IO(e));
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
|
@ -278,29 +312,32 @@ impl Crdt {
|
|||
(id, ups, data)
|
||||
}
|
||||
|
||||
pub fn window_index_request(&self, ix: u64) -> Result<(SocketAddr, Vec<u8>)> {
|
||||
if self.table.len() <= 1 {
|
||||
return Err(Error::CrdtToSmall);
|
||||
}
|
||||
let mut n = (Self::random() as usize) % self.table.len();
|
||||
while self.table.values().nth(n).unwrap().id == self.me {
|
||||
n = (Self::random() as usize) % self.table.len();
|
||||
}
|
||||
let addr = self.table.values().nth(n).unwrap().gossip_addr.clone();
|
||||
let req = Protocol::RequestWindowIndex(self.table[&self.me].clone(), ix);
|
||||
let out = serialize(&req)?;
|
||||
Ok((addr, out))
|
||||
}
|
||||
|
||||
/// Create a random gossip request
|
||||
/// # Returns
|
||||
/// (A,B)
|
||||
/// * A - Address to send to
|
||||
/// * B - RequestUpdates protocol message
|
||||
fn gossip_request(&self) -> Result<(SocketAddr, Protocol)> {
|
||||
if self.table.len() <= 1 {
|
||||
return Err(Error::GeneralError);
|
||||
let options: Vec<_> = self.table.values().filter(|v| v.id != self.me).collect();
|
||||
if options.len() < 1 {
|
||||
return Err(Error::CrdtToSmall);
|
||||
}
|
||||
let mut n = (Self::random() as usize) % self.table.len();
|
||||
while self.table
|
||||
.values()
|
||||
.nth(n)
|
||||
.expect("'values().nth(n)' while loop in fn gossip_request")
|
||||
.id == self.me
|
||||
{
|
||||
n = (Self::random() as usize) % self.table.len();
|
||||
}
|
||||
let v = self.table
|
||||
.values()
|
||||
.nth(n)
|
||||
.expect("'values().nth(n)' in fn gossip_request")
|
||||
.clone();
|
||||
let n = (Self::random() as usize) % options.len();
|
||||
let v = options[n].clone();
|
||||
let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0);
|
||||
let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone());
|
||||
Ok((v.gossip_addr, req))
|
||||
|
@ -334,7 +371,7 @@ impl Crdt {
|
|||
// TODO we need to punish/spam resist here
|
||||
// sig verify the whole update and slash anyone who sends a bad update
|
||||
for v in data {
|
||||
self.insert(v.clone());
|
||||
self.insert(&v);
|
||||
}
|
||||
*self.remote.entry(from).or_insert(update_index) = update_index;
|
||||
}
|
||||
|
@ -354,9 +391,40 @@ impl Crdt {
|
|||
);
|
||||
})
|
||||
}
|
||||
|
||||
fn run_window_request(
|
||||
window: &Arc<RwLock<Vec<Option<SharedBlob>>>>,
|
||||
sock: &UdpSocket,
|
||||
from: &ReplicatedData,
|
||||
ix: u64,
|
||||
) -> Result<()> {
|
||||
let pos = (ix as usize) % window.read().unwrap().len();
|
||||
let mut outblob = vec![];
|
||||
if let &Some(ref blob) = &window.read().unwrap()[pos] {
|
||||
let rblob = blob.read().unwrap();
|
||||
let blob_ix = rblob.get_index().expect("run_window_request get_index");
|
||||
if blob_ix == ix {
|
||||
// copy to avoid doing IO inside the lock
|
||||
outblob.extend(&rblob.data[..rblob.meta.size]);
|
||||
}
|
||||
} else {
|
||||
assert!(window.read().unwrap()[pos].is_none());
|
||||
info!("failed RequestWindowIndex {} {}", ix, from.replicate_addr);
|
||||
}
|
||||
if outblob.len() > 0 {
|
||||
info!(
|
||||
"responding RequestWindowIndex {} {}",
|
||||
ix, from.replicate_addr
|
||||
);
|
||||
sock.send_to(&outblob, from.replicate_addr)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
/// Process messages from the network
|
||||
fn run_listen(obj: &Arc<RwLock<Self>>, sock: &UdpSocket) -> Result<()> {
|
||||
fn run_listen(
|
||||
obj: &Arc<RwLock<Self>>,
|
||||
window: &Arc<RwLock<Vec<Option<SharedBlob>>>>,
|
||||
sock: &UdpSocket,
|
||||
) -> Result<()> {
|
||||
//TODO cache connections
|
||||
let mut buf = vec![0u8; 1024 * 64];
|
||||
let (amt, src) = sock.recv_from(&mut buf)?;
|
||||
|
@ -378,7 +446,7 @@ impl Crdt {
|
|||
//TODO verify reqdata belongs to sender
|
||||
obj.write()
|
||||
.expect("'obj' write lock in RequestUpdates")
|
||||
.insert(reqdata);
|
||||
.insert(&reqdata);
|
||||
sock.send_to(&rsp, addr)
|
||||
.expect("'sock.send_to' in RequestUpdates");
|
||||
trace!("send_to done!");
|
||||
|
@ -389,18 +457,30 @@ impl Crdt {
|
|||
.expect("'obj' write lock in ReceiveUpdates")
|
||||
.apply_updates(from, ups, &data);
|
||||
}
|
||||
Protocol::RequestWindowIndex(from, ix) => {
|
||||
//TODO verify from is signed
|
||||
obj.write().unwrap().insert(&from);
|
||||
let me = obj.read().unwrap().my_data().clone();
|
||||
info!(
|
||||
"received RequestWindowIndex {} {} myaddr {}",
|
||||
ix, from.replicate_addr, me.replicate_addr
|
||||
);
|
||||
assert_ne!(from.replicate_addr, me.replicate_addr);
|
||||
let _ = Self::run_window_request(window, sock, &from, ix);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
pub fn listen(
|
||||
obj: Arc<RwLock<Self>>,
|
||||
window: Arc<RwLock<Vec<Option<SharedBlob>>>>,
|
||||
sock: UdpSocket,
|
||||
exit: Arc<AtomicBool>,
|
||||
) -> JoinHandle<()> {
|
||||
sock.set_read_timeout(Some(Duration::new(2, 0)))
|
||||
.expect("'sock.set_read_timeout' in crdt.rs");
|
||||
spawn(move || loop {
|
||||
let _ = Self::run_listen(&obj, &sock);
|
||||
let _ = Self::run_listen(&obj, &window, &sock);
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
return;
|
||||
}
|
||||
|
@ -458,7 +538,8 @@ mod test {
|
|||
.map(|_| {
|
||||
let (crdt, gossip, _, _) = test_node();
|
||||
let c = Arc::new(RwLock::new(crdt));
|
||||
let l = Crdt::listen(c.clone(), gossip, exit.clone());
|
||||
let w = Arc::new(RwLock::new(vec![]));
|
||||
let l = Crdt::listen(c.clone(), w, gossip, exit.clone());
|
||||
(c, l)
|
||||
})
|
||||
.collect();
|
||||
|
@ -468,7 +549,7 @@ mod test {
|
|||
.map(|&(ref c, _)| Crdt::gossip(c.clone(), exit.clone()))
|
||||
.collect();
|
||||
let mut done = true;
|
||||
for _ in 0..(num * 16) {
|
||||
for _ in 0..(num * 32) {
|
||||
done = true;
|
||||
for &(ref c, _) in listen.iter() {
|
||||
trace!(
|
||||
|
@ -504,6 +585,7 @@ mod test {
|
|||
}
|
||||
/// ring a -> b -> c -> d -> e -> a
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn gossip_ring_test() {
|
||||
run_gossip_topo(|listen| {
|
||||
let num = listen.len();
|
||||
|
@ -514,13 +596,14 @@ mod test {
|
|||
let yv = listen[y].0.read().unwrap();
|
||||
let mut d = yv.table[&yv.me].clone();
|
||||
d.version = 0;
|
||||
xv.insert(d);
|
||||
xv.insert(&d);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// star (b,c,d,e) -> a
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn gossip_star_test() {
|
||||
run_gossip_topo(|listen| {
|
||||
let num = listen.len();
|
||||
|
@ -531,7 +614,7 @@ mod test {
|
|||
let yv = listen[y].0.read().unwrap();
|
||||
let mut d = yv.table[&yv.me].clone();
|
||||
d.version = 0;
|
||||
xv.insert(d);
|
||||
xv.insert(&d);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -549,14 +632,15 @@ mod test {
|
|||
let mut crdt = Crdt::new(d.clone());
|
||||
assert_eq!(crdt.table[&d.id].version, 0);
|
||||
d.version = 2;
|
||||
crdt.insert(d.clone());
|
||||
crdt.insert(&d);
|
||||
assert_eq!(crdt.table[&d.id].version, 2);
|
||||
d.version = 1;
|
||||
crdt.insert(d.clone());
|
||||
crdt.insert(&d);
|
||||
assert_eq!(crdt.table[&d.id].version, 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
pub fn test_crdt_retransmit() {
|
||||
logger::setup();
|
||||
trace!("c1:");
|
||||
|
@ -568,8 +652,8 @@ mod test {
|
|||
let c1_id = c1.my_data().id;
|
||||
c1.set_leader(c1_id);
|
||||
|
||||
c2.insert(c1.my_data().clone());
|
||||
c3.insert(c1.my_data().clone());
|
||||
c2.insert(&c1.my_data());
|
||||
c3.insert(&c1.my_data());
|
||||
|
||||
c2.set_leader(c1.my_data().id);
|
||||
c3.set_leader(c1.my_data().id);
|
||||
|
@ -577,14 +661,17 @@ mod test {
|
|||
let exit = Arc::new(AtomicBool::new(false));
|
||||
|
||||
// Create listen threads
|
||||
let win1 = Arc::new(RwLock::new(vec![]));
|
||||
let a1 = Arc::new(RwLock::new(c1));
|
||||
let t1 = Crdt::listen(a1.clone(), s1, exit.clone());
|
||||
let t1 = Crdt::listen(a1.clone(), win1, s1, exit.clone());
|
||||
|
||||
let a2 = Arc::new(RwLock::new(c2));
|
||||
let t2 = Crdt::listen(a2.clone(), s2, exit.clone());
|
||||
let win2 = Arc::new(RwLock::new(vec![]));
|
||||
let t2 = Crdt::listen(a2.clone(), win2, s2, exit.clone());
|
||||
|
||||
let a3 = Arc::new(RwLock::new(c3));
|
||||
let t3 = Crdt::listen(a3.clone(), s3, exit.clone());
|
||||
let win3 = Arc::new(RwLock::new(vec![]));
|
||||
let t3 = Crdt::listen(a3.clone(), win3, s3, exit.clone());
|
||||
|
||||
// Create gossip threads
|
||||
let t1_gossip = Crdt::gossip(a1.clone(), exit.clone());
|
||||
|
@ -594,7 +681,7 @@ mod test {
|
|||
//wait to converge
|
||||
trace!("waitng to converge:");
|
||||
let mut done = false;
|
||||
for _ in 0..10 {
|
||||
for _ in 0..30 {
|
||||
done = a1.read().unwrap().table.len() == 3 && a2.read().unwrap().table.len() == 3
|
||||
&& a3.read().unwrap().table.len() == 3;
|
||||
if done {
|
||||
|
|
|
@ -62,9 +62,10 @@ impl<'a> EntryWriter<'a> {
|
|||
) -> Result<()> {
|
||||
let mut q = VecDeque::new();
|
||||
let list = self.write_entries(writer, entry_receiver)?;
|
||||
trace!("New blobs? {}", list.len());
|
||||
info!("New blobs? {}", list.len());
|
||||
ledger::process_entry_list_into_blobs(&list, blob_recycler, &mut q);
|
||||
if !q.is_empty() {
|
||||
info!("broadcasting {}", q.len());
|
||||
broadcast.send(q)?;
|
||||
}
|
||||
Ok(())
|
||||
|
|
|
@ -271,7 +271,9 @@ impl Blob {
|
|||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
info!("recv_from err {:?}", e);
|
||||
if e.kind() != io::ErrorKind::WouldBlock {
|
||||
info!("recv_from err {:?}", e);
|
||||
}
|
||||
return Err(Error::IO(e));
|
||||
}
|
||||
Ok((nrecv, from)) => {
|
||||
|
|
|
@ -18,7 +18,8 @@ pub enum Error {
|
|||
BankError(bank::BankError),
|
||||
SendError,
|
||||
Services,
|
||||
GeneralError,
|
||||
CrdtToSmall,
|
||||
GenericError,
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
|
|
@ -73,7 +73,8 @@ impl Rpu {
|
|||
) -> Result<Vec<JoinHandle<()>>> {
|
||||
let crdt = Arc::new(RwLock::new(Crdt::new(me)));
|
||||
let t_gossip = Crdt::gossip(crdt.clone(), exit.clone());
|
||||
let t_listen = Crdt::listen(crdt.clone(), gossip, exit.clone());
|
||||
let window = streamer::default_window();
|
||||
let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip, exit.clone());
|
||||
|
||||
// make sure we are on the same interface
|
||||
let mut local = requests_socket.local_addr()?;
|
||||
|
@ -121,6 +122,7 @@ impl Rpu {
|
|||
broadcast_socket,
|
||||
exit.clone(),
|
||||
crdt.clone(),
|
||||
window,
|
||||
blob_recycler.clone(),
|
||||
broadcast_receiver,
|
||||
);
|
||||
|
|
158
src/streamer.rs
158
src/streamer.rs
|
@ -2,16 +2,17 @@
|
|||
use crdt::Crdt;
|
||||
#[cfg(feature = "erasure")]
|
||||
use erasure;
|
||||
use packet::{Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedPackets, NUM_BLOBS};
|
||||
use result::Result;
|
||||
use packet::{Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedPackets};
|
||||
use result::{Error, Result};
|
||||
use std::collections::VecDeque;
|
||||
use std::net::UdpSocket;
|
||||
use std::net::{SocketAddr, UdpSocket};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::mpsc;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::thread::{spawn, JoinHandle};
|
||||
use std::time::Duration;
|
||||
|
||||
pub const WINDOW_SIZE: usize = 2 * 1024;
|
||||
pub type PacketReceiver = mpsc::Receiver<SharedPackets>;
|
||||
pub type PacketSender = mpsc::Sender<SharedPackets>;
|
||||
pub type BlobSender = mpsc::Sender<VecDeque<SharedBlob>>;
|
||||
|
@ -70,7 +71,7 @@ fn recv_send(sock: &UdpSocket, recycler: &BlobRecycler, r: &BlobReceiver) -> Res
|
|||
pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec<SharedPackets>, usize)> {
|
||||
let timer = Duration::new(1, 0);
|
||||
let msgs = recvr.recv_timeout(timer)?;
|
||||
debug!("got msgs");
|
||||
trace!("got msgs");
|
||||
let mut len = msgs.read().unwrap().packets.len();
|
||||
let mut batch = vec![msgs];
|
||||
while let Ok(more) = recvr.try_recv() {
|
||||
|
@ -128,16 +129,58 @@ pub fn blob_receiver(
|
|||
Ok(t)
|
||||
}
|
||||
|
||||
fn find_next_missing(
|
||||
locked_window: &Arc<RwLock<Vec<Option<SharedBlob>>>>,
|
||||
crdt: &Arc<RwLock<Crdt>>,
|
||||
consumed: &mut usize,
|
||||
received: &mut usize,
|
||||
) -> Result<Vec<(SocketAddr, Vec<u8>)>> {
|
||||
if *received <= *consumed {
|
||||
return Err(Error::GenericError);
|
||||
}
|
||||
let window = locked_window.read().unwrap();
|
||||
let reqs: Vec<_> = (*consumed..*received)
|
||||
.filter_map(|pix| {
|
||||
let i = pix % WINDOW_SIZE;
|
||||
if let &None = &window[i] {
|
||||
let val = crdt.read().unwrap().window_index_request(pix as u64);
|
||||
if let Ok((to, req)) = val {
|
||||
return Some((to, req));
|
||||
}
|
||||
}
|
||||
None
|
||||
})
|
||||
.collect();
|
||||
Ok(reqs)
|
||||
}
|
||||
|
||||
fn repair_window(
|
||||
locked_window: &Arc<RwLock<Vec<Option<SharedBlob>>>>,
|
||||
crdt: &Arc<RwLock<Crdt>>,
|
||||
consumed: &mut usize,
|
||||
received: &mut usize,
|
||||
) -> Result<()> {
|
||||
let reqs = find_next_missing(locked_window, crdt, consumed, received)?;
|
||||
info!("repair_window {} {}", *consumed, *received);
|
||||
let sock = UdpSocket::bind("0.0.0.0:0")?;
|
||||
for (to, req) in reqs {
|
||||
//todo cache socket
|
||||
sock.send_to(&req, to)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn recv_window(
|
||||
window: &mut Vec<Option<SharedBlob>>,
|
||||
locked_window: &Arc<RwLock<Vec<Option<SharedBlob>>>>,
|
||||
crdt: &Arc<RwLock<Crdt>>,
|
||||
recycler: &BlobRecycler,
|
||||
consumed: &mut usize,
|
||||
received: &mut usize,
|
||||
r: &BlobReceiver,
|
||||
s: &BlobSender,
|
||||
retransmit: &BlobSender,
|
||||
) -> Result<()> {
|
||||
let timer = Duration::new(1, 0);
|
||||
let timer = Duration::from_millis(200);
|
||||
let mut dq = r.recv_timeout(timer)?;
|
||||
let leader_id = crdt.read()
|
||||
.expect("'crdt' read lock in fn recv_window")
|
||||
|
@ -188,19 +231,27 @@ fn recv_window(
|
|||
let b_ = b.clone();
|
||||
let p = b.write().expect("'b' write lock in fn recv_window");
|
||||
let pix = p.get_index()? as usize;
|
||||
let w = pix % NUM_BLOBS;
|
||||
if pix > *received {
|
||||
*received = pix;
|
||||
}
|
||||
let w = pix % WINDOW_SIZE;
|
||||
//TODO, after the block are authenticated
|
||||
//if we get different blocks at the same index
|
||||
//that is a network failure/attack
|
||||
trace!("window w: {} size: {}", w, p.meta.size);
|
||||
{
|
||||
let mut window = locked_window.write().unwrap();
|
||||
if window[w].is_none() {
|
||||
window[w] = Some(b_);
|
||||
} else {
|
||||
debug!("duplicate blob at index {:}", w);
|
||||
} else if let &Some(ref cblob) = &window[w] {
|
||||
if cblob.read().unwrap().get_index().unwrap() != pix as u64 {
|
||||
warn!("overrun blob at index {:}", w);
|
||||
} else {
|
||||
debug!("duplicate blob at index {:}", w);
|
||||
}
|
||||
}
|
||||
loop {
|
||||
let k = *consumed % NUM_BLOBS;
|
||||
let k = *consumed % WINDOW_SIZE;
|
||||
trace!("k: {} consumed: {}", k, *consumed);
|
||||
if window[k].is_none() {
|
||||
break;
|
||||
|
@ -211,43 +262,71 @@ fn recv_window(
|
|||
}
|
||||
}
|
||||
}
|
||||
{
|
||||
let buf: Vec<_> = locked_window
|
||||
.read()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, v)| {
|
||||
if i == (*consumed % WINDOW_SIZE) {
|
||||
assert!(v.is_none());
|
||||
"_"
|
||||
} else if v.is_none() {
|
||||
"0"
|
||||
} else {
|
||||
"1"
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
trace!("WINDOW: {}", buf.join(""));
|
||||
}
|
||||
trace!("sending contq.len: {}", contq.len());
|
||||
if !contq.is_empty() {
|
||||
trace!("sending contq.len: {}", contq.len());
|
||||
s.send(contq)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn default_window() -> Arc<RwLock<Vec<Option<SharedBlob>>>> {
|
||||
Arc::new(RwLock::new(vec![None; WINDOW_SIZE]))
|
||||
}
|
||||
|
||||
pub fn window(
|
||||
exit: Arc<AtomicBool>,
|
||||
crdt: Arc<RwLock<Crdt>>,
|
||||
window: Arc<RwLock<Vec<Option<SharedBlob>>>>,
|
||||
recycler: BlobRecycler,
|
||||
r: BlobReceiver,
|
||||
s: BlobSender,
|
||||
retransmit: BlobSender,
|
||||
) -> JoinHandle<()> {
|
||||
spawn(move || {
|
||||
let mut window = vec![None; NUM_BLOBS];
|
||||
let mut consumed = 0;
|
||||
let mut received = 0;
|
||||
loop {
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
let _ = recv_window(
|
||||
&mut window,
|
||||
&window,
|
||||
&crdt,
|
||||
&recycler,
|
||||
&mut consumed,
|
||||
&mut received,
|
||||
&r,
|
||||
&s,
|
||||
&retransmit,
|
||||
);
|
||||
let _ = repair_window(&window, &crdt, &mut consumed, &mut received);
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn broadcast(
|
||||
crdt: &Arc<RwLock<Crdt>>,
|
||||
window: &Arc<RwLock<Vec<Option<SharedBlob>>>>,
|
||||
recycler: &BlobRecycler,
|
||||
r: &BlobReceiver,
|
||||
sock: &UdpSocket,
|
||||
|
@ -263,8 +342,31 @@ fn broadcast(
|
|||
#[cfg(feature = "erasure")]
|
||||
erasure::generate_codes(blobs);
|
||||
Crdt::broadcast(crdt, &blobs, &sock, transmit_index)?;
|
||||
while let Some(b) = blobs.pop() {
|
||||
recycler.recycle(b);
|
||||
// keep the cache of blobs that are broadcast
|
||||
{
|
||||
let mut win = window.write().unwrap();
|
||||
for b in &blobs {
|
||||
let ix = b.read().unwrap().get_index().expect("blob index");
|
||||
let pos = (ix as usize) % WINDOW_SIZE;
|
||||
if let Some(x) = &win[pos] {
|
||||
trace!(
|
||||
"popped {} at {}",
|
||||
x.read().unwrap().get_index().unwrap(),
|
||||
pos
|
||||
);
|
||||
recycler.recycle(x.clone());
|
||||
}
|
||||
trace!("null {}", pos);
|
||||
win[pos] = None;
|
||||
assert!(win[pos].is_none());
|
||||
}
|
||||
while let Some(b) = blobs.pop() {
|
||||
let ix = b.read().unwrap().get_index().expect("blob index");
|
||||
let pos = (ix as usize) % WINDOW_SIZE;
|
||||
trace!("caching {} at {}", ix, pos);
|
||||
assert!(win[pos].is_none());
|
||||
win[pos] = Some(b);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
@ -275,12 +377,14 @@ fn broadcast(
|
|||
/// * `sock` - Socket to send from.
|
||||
/// * `exit` - Boolean to signal system exit.
|
||||
/// * `crdt` - CRDT structure
|
||||
/// * `window` - Cache of blobs that we have broadcast
|
||||
/// * `recycler` - Blob recycler.
|
||||
/// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
|
||||
pub fn broadcaster(
|
||||
sock: UdpSocket,
|
||||
exit: Arc<AtomicBool>,
|
||||
crdt: Arc<RwLock<Crdt>>,
|
||||
window: Arc<RwLock<Vec<Option<SharedBlob>>>>,
|
||||
recycler: BlobRecycler,
|
||||
r: BlobReceiver,
|
||||
) -> JoinHandle<()> {
|
||||
|
@ -290,7 +394,7 @@ pub fn broadcaster(
|
|||
if exit.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
let _ = broadcast(&crdt, &recycler, &r, &sock, &mut transmit_index);
|
||||
let _ = broadcast(&crdt, &window, &recycler, &r, &sock, &mut transmit_index);
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -463,7 +567,7 @@ mod test {
|
|||
use std::sync::{Arc, RwLock};
|
||||
use std::thread::sleep;
|
||||
use std::time::Duration;
|
||||
use streamer::{BlobReceiver, PacketReceiver};
|
||||
use streamer::{default_window, BlobReceiver, PacketReceiver};
|
||||
use streamer::{blob_receiver, receiver, responder, retransmitter, window};
|
||||
|
||||
fn get_msgs(r: PacketReceiver, num: &mut usize) {
|
||||
|
@ -558,9 +662,11 @@ mod test {
|
|||
blob_receiver(exit.clone(), resp_recycler.clone(), read, s_reader).unwrap();
|
||||
let (s_window, r_window) = channel();
|
||||
let (s_retransmit, r_retransmit) = channel();
|
||||
let win = default_window();
|
||||
let t_window = window(
|
||||
exit.clone(),
|
||||
subs,
|
||||
win,
|
||||
resp_recycler.clone(),
|
||||
r_reader,
|
||||
s_window,
|
||||
|
@ -628,15 +734,27 @@ mod test {
|
|||
let (crdt_leader, sock_gossip_leader, _, sock_leader) = test_node();
|
||||
let (crdt_target, sock_gossip_target, sock_replicate_target, _) = test_node();
|
||||
let leader_data = crdt_leader.read().unwrap().my_data().clone();
|
||||
crdt_leader.write().unwrap().insert(leader_data.clone());
|
||||
crdt_leader.write().unwrap().insert(&leader_data);
|
||||
crdt_leader.write().unwrap().set_leader(leader_data.id);
|
||||
let t_crdt_leader_g = Crdt::gossip(crdt_leader.clone(), exit.clone());
|
||||
let t_crdt_leader_l = Crdt::listen(crdt_leader.clone(), sock_gossip_leader, exit.clone());
|
||||
let window_leader = Arc::new(RwLock::new(vec![]));
|
||||
let t_crdt_leader_l = Crdt::listen(
|
||||
crdt_leader.clone(),
|
||||
window_leader,
|
||||
sock_gossip_leader,
|
||||
exit.clone(),
|
||||
);
|
||||
|
||||
crdt_target.write().unwrap().insert(leader_data.clone());
|
||||
crdt_target.write().unwrap().insert(&leader_data);
|
||||
crdt_target.write().unwrap().set_leader(leader_data.id);
|
||||
let t_crdt_target_g = Crdt::gossip(crdt_target.clone(), exit.clone());
|
||||
let t_crdt_target_l = Crdt::listen(crdt_target.clone(), sock_gossip_target, exit.clone());
|
||||
let window_target = Arc::new(RwLock::new(vec![]));
|
||||
let t_crdt_target_l = Crdt::listen(
|
||||
crdt_target.clone(),
|
||||
window_target,
|
||||
sock_gossip_target,
|
||||
exit.clone(),
|
||||
);
|
||||
//leader retransmitter
|
||||
let (s_retransmit, r_retransmit) = channel();
|
||||
let blob_recycler = BlobRecycler::default();
|
||||
|
|
|
@ -40,9 +40,9 @@ impl ThinClient {
|
|||
|
||||
pub fn recv_response(&self) -> io::Result<Response> {
|
||||
let mut buf = vec![0u8; 1024];
|
||||
info!("start recv_from");
|
||||
trace!("start recv_from");
|
||||
self.requests_socket.recv_from(&mut buf)?;
|
||||
info!("end recv_from");
|
||||
trace!("end recv_from");
|
||||
let resp = deserialize(&buf).expect("deserialize balance in thin_client");
|
||||
Ok(resp)
|
||||
}
|
||||
|
@ -50,7 +50,7 @@ impl ThinClient {
|
|||
pub fn process_response(&mut self, resp: Response) {
|
||||
match resp {
|
||||
Response::Balance { key, val } => {
|
||||
info!("Response balance {:?} {:?}", key, val);
|
||||
trace!("Response balance {:?} {:?}", key, val);
|
||||
self.balances.insert(key, val);
|
||||
}
|
||||
Response::LastId { id } => {
|
||||
|
@ -89,7 +89,7 @@ impl ThinClient {
|
|||
/// until the server sends a response. If the response packet is dropped
|
||||
/// by the network, this method will hang indefinitely.
|
||||
pub fn get_balance(&mut self, pubkey: &PublicKey) -> io::Result<i64> {
|
||||
info!("get_balance");
|
||||
trace!("get_balance");
|
||||
let req = Request::GetBalance { key: *pubkey };
|
||||
let data = serialize(&req).expect("serialize GetBalance in pub fn get_balance");
|
||||
self.requests_socket
|
||||
|
@ -98,7 +98,7 @@ impl ThinClient {
|
|||
let mut done = false;
|
||||
while !done {
|
||||
let resp = self.recv_response()?;
|
||||
info!("recv_response {:?}", resp);
|
||||
trace!("recv_response {:?}", resp);
|
||||
if let &Response::Balance { ref key, .. } = &resp {
|
||||
done = key == pubkey;
|
||||
}
|
||||
|
@ -165,9 +165,11 @@ mod tests {
|
|||
use std::io::sink;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::thread::JoinHandle;
|
||||
use std::thread::sleep;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use streamer::default_window;
|
||||
use tvu::{self, Tvu};
|
||||
|
||||
#[test]
|
||||
|
@ -284,81 +286,104 @@ mod tests {
|
|||
(leader, gossip, serve, replicate, events_socket)
|
||||
}
|
||||
|
||||
fn replicant(
|
||||
leader: &ReplicatedData,
|
||||
exit: Arc<AtomicBool>,
|
||||
alice: &Mint,
|
||||
threads: &mut Vec<JoinHandle<()>>,
|
||||
) {
|
||||
let replicant = test_node();
|
||||
let replicant_bank = {
|
||||
let bank = Bank::new(&alice);
|
||||
Arc::new(Tvu::new(bank, alice.last_id(), None))
|
||||
};
|
||||
let mut ts = Tvu::serve(
|
||||
&replicant_bank,
|
||||
replicant.0.clone(),
|
||||
replicant.1,
|
||||
replicant.2,
|
||||
replicant.3,
|
||||
leader.clone(),
|
||||
exit.clone(),
|
||||
).unwrap();
|
||||
threads.append(&mut ts);
|
||||
}
|
||||
|
||||
fn converge(
|
||||
leader: &ReplicatedData,
|
||||
exit: Arc<AtomicBool>,
|
||||
num_nodes: usize,
|
||||
threads: &mut Vec<JoinHandle<()>>,
|
||||
) -> Vec<SocketAddr> {
|
||||
//lets spy on the network
|
||||
let (mut spy, spy_gossip, _, _, _) = test_node();
|
||||
let daddr = "0.0.0.0:0".parse().unwrap();
|
||||
let me = spy.id.clone();
|
||||
spy.replicate_addr = daddr;
|
||||
spy.serve_addr = daddr;
|
||||
let mut spy_crdt = Crdt::new(spy);
|
||||
spy_crdt.insert(&leader);
|
||||
spy_crdt.set_leader(leader.id);
|
||||
|
||||
let spy_ref = Arc::new(RwLock::new(spy_crdt));
|
||||
let spy_window = default_window();
|
||||
let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy_gossip, exit.clone());
|
||||
let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone());
|
||||
//wait for the network to converge
|
||||
for _ in 0..30 {
|
||||
let len = spy_ref.read().unwrap().table.values().len();
|
||||
let mut min = num_nodes as u64;
|
||||
for u in spy_ref.read().unwrap().remote.values() {
|
||||
if min > *u {
|
||||
min = *u;
|
||||
}
|
||||
}
|
||||
info!("length {} {}", len, min);
|
||||
if num_nodes == len && min >= (num_nodes as u64) {
|
||||
warn!("converged! {} {}", len, min);
|
||||
break;
|
||||
}
|
||||
sleep(Duration::new(1, 0));
|
||||
}
|
||||
threads.push(t_spy_listen);
|
||||
threads.push(t_spy_gossip);
|
||||
let v: Vec<SocketAddr> = spy_ref
|
||||
.read()
|
||||
.unwrap()
|
||||
.table
|
||||
.values()
|
||||
.into_iter()
|
||||
.filter(|x| x.id != me)
|
||||
.map(|x| x.serve_addr)
|
||||
.collect();
|
||||
v.clone()
|
||||
}
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_multi_node() {
|
||||
logger::setup();
|
||||
info!("test_multi_node");
|
||||
const N: usize = 5;
|
||||
trace!("test_multi_accountant_stub");
|
||||
let leader = test_node();
|
||||
let replicant = test_node();
|
||||
let alice = Mint::new(10_000);
|
||||
let bob_pubkey = KeyPair::new().pubkey();
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
|
||||
let leader_bank = {
|
||||
let bank = Bank::new(&alice);
|
||||
Rpu::new(bank, alice.last_id(), Some(Duration::from_millis(30)))
|
||||
Rpu::new(bank, alice.last_id(), None)
|
||||
};
|
||||
|
||||
let replicant_bank = {
|
||||
let bank = Bank::new(&alice);
|
||||
Arc::new(Tvu::new(
|
||||
bank,
|
||||
alice.last_id(),
|
||||
Some(Duration::from_millis(30)),
|
||||
))
|
||||
};
|
||||
|
||||
let leader_threads = leader_bank
|
||||
let mut threads = leader_bank
|
||||
.serve(leader.0.clone(), leader.2, leader.1, exit.clone(), sink())
|
||||
.unwrap();
|
||||
let replicant_threads = Tvu::serve(
|
||||
&replicant_bank,
|
||||
replicant.0.clone(),
|
||||
replicant.1,
|
||||
replicant.2,
|
||||
replicant.3,
|
||||
leader.0.clone(),
|
||||
exit.clone(),
|
||||
).unwrap();
|
||||
|
||||
//lets spy on the network
|
||||
let (mut spy, spy_gossip, _, _, _) = test_node();
|
||||
let daddr = "0.0.0.0:0".parse().unwrap();
|
||||
spy.replicate_addr = daddr;
|
||||
spy.serve_addr = daddr;
|
||||
let mut spy_crdt = Crdt::new(spy);
|
||||
spy_crdt.insert(leader.0.clone());
|
||||
spy_crdt.set_leader(leader.0.id);
|
||||
|
||||
let spy_ref = Arc::new(RwLock::new(spy_crdt));
|
||||
let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_gossip, exit.clone());
|
||||
let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone());
|
||||
//wait for the network to converge
|
||||
for _ in 0..20 {
|
||||
let ix = spy_ref.read().unwrap().update_index;
|
||||
info!("my update index is {}", ix);
|
||||
let len = spy_ref.read().unwrap().remote.values().len();
|
||||
let mut done = false;
|
||||
info!("remote len {}", len);
|
||||
if len > 1 && ix > 2 {
|
||||
done = true;
|
||||
//check if everyones remote index is greater or equal to ours
|
||||
let vs: Vec<u64> = spy_ref.read().unwrap().remote.values().cloned().collect();
|
||||
for t in vs.into_iter() {
|
||||
info!("remote update index is {} vs {}", t, ix);
|
||||
if t < 3 {
|
||||
done = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
if done == true {
|
||||
info!("converged!");
|
||||
break;
|
||||
}
|
||||
sleep(Duration::new(1, 0));
|
||||
for _ in 0..N {
|
||||
replicant(&leader.0, exit.clone(), &alice, &mut threads);
|
||||
}
|
||||
|
||||
let addrs = converge(&leader.0, exit.clone(), N + 2, &mut threads);
|
||||
//contains the leader addr as well
|
||||
assert_eq!(addrs.len(), N + 1);
|
||||
//verify leader can do transfer
|
||||
let leader_balance = {
|
||||
let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
|
@ -368,47 +393,41 @@ mod tests {
|
|||
let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
|
||||
let mut client = ThinClient::new(leader.0.serve_addr, requests_socket, events_socket);
|
||||
info!("getting leader last_id");
|
||||
trace!("getting leader last_id");
|
||||
let last_id = client.get_last_id().wait().unwrap();
|
||||
info!("executing leader transer");
|
||||
let _sig = client
|
||||
.transfer(500, &alice.keypair(), bob_pubkey, &last_id)
|
||||
.unwrap();
|
||||
info!("getting leader balance");
|
||||
trace!("getting leader balance");
|
||||
client.get_balance(&bob_pubkey).unwrap()
|
||||
};
|
||||
assert_eq!(leader_balance, 500);
|
||||
//verify replicant has the same balance
|
||||
let mut replicant_balance = 0;
|
||||
for _ in 0..10 {
|
||||
let mut success = 0usize;
|
||||
for serve_addr in addrs.iter() {
|
||||
let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
requests_socket
|
||||
.set_read_timeout(Some(Duration::new(1, 0)))
|
||||
.unwrap();
|
||||
let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
|
||||
let mut client =
|
||||
ThinClient::new(replicant.0.serve_addr, requests_socket, events_socket);
|
||||
info!("getting replicant balance");
|
||||
if let Ok(bal) = client.get_balance(&bob_pubkey) {
|
||||
replicant_balance = bal;
|
||||
let mut client = ThinClient::new(*serve_addr, requests_socket, events_socket);
|
||||
for i in 0..10 {
|
||||
trace!("getting replicant balance {} {}/10", *serve_addr, i);
|
||||
if let Ok(bal) = client.get_balance(&bob_pubkey) {
|
||||
trace!("replicant balance {}", bal);
|
||||
if bal == leader_balance {
|
||||
success += 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
sleep(Duration::new(1, 0));
|
||||
}
|
||||
info!("replicant balance {}", replicant_balance);
|
||||
if replicant_balance == leader_balance {
|
||||
break;
|
||||
}
|
||||
sleep(Duration::new(1, 0));
|
||||
}
|
||||
assert_eq!(replicant_balance, leader_balance);
|
||||
|
||||
assert_eq!(success, addrs.len());
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
for t in leader_threads {
|
||||
t.join().unwrap();
|
||||
}
|
||||
for t in replicant_threads {
|
||||
t.join().unwrap();
|
||||
}
|
||||
for t in vec![t_spy_listen, t_spy_gossip] {
|
||||
for t in threads {
|
||||
t.join().unwrap();
|
||||
}
|
||||
}
|
||||
|
|
21
src/tvu.rs
21
src/tvu.rs
|
@ -63,9 +63,12 @@ impl Tvu {
|
|||
) -> Result<()> {
|
||||
let timer = Duration::new(1, 0);
|
||||
let blobs = verified_receiver.recv_timeout(timer)?;
|
||||
trace!("replicating blobs {}", blobs.len());
|
||||
let entries = ledger::reconstruct_entries_from_blobs(&blobs);
|
||||
obj.bank.process_verified_entries(entries)?;
|
||||
let res = obj.bank.process_verified_entries(entries);
|
||||
if res.is_err() {
|
||||
error!("process_verified_entries {} {:?}", blobs.len(), res);
|
||||
}
|
||||
res?;
|
||||
for blob in blobs {
|
||||
blob_recycler.recycle(blob);
|
||||
}
|
||||
|
@ -106,9 +109,10 @@ impl Tvu {
|
|||
.set_leader(leader.id);
|
||||
crdt.write()
|
||||
.expect("'crdt' write lock before insert() in pub fn replicate")
|
||||
.insert(leader);
|
||||
.insert(&leader);
|
||||
let t_gossip = Crdt::gossip(crdt.clone(), exit.clone());
|
||||
let t_listen = Crdt::listen(crdt.clone(), gossip, exit.clone());
|
||||
let window = streamer::default_window();
|
||||
let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip, exit.clone());
|
||||
|
||||
// make sure we are on the same interface
|
||||
let mut local = replicate.local_addr()?;
|
||||
|
@ -140,6 +144,7 @@ impl Tvu {
|
|||
let t_window = streamer::window(
|
||||
exit.clone(),
|
||||
crdt.clone(),
|
||||
window,
|
||||
blob_recycler.clone(),
|
||||
blob_receiver,
|
||||
window_sender,
|
||||
|
@ -273,16 +278,18 @@ mod tests {
|
|||
|
||||
let cref_l = Arc::new(RwLock::new(crdt_l));
|
||||
let t_l_gossip = Crdt::gossip(cref_l.clone(), exit.clone());
|
||||
let t_l_listen = Crdt::listen(cref_l, leader_gossip, exit.clone());
|
||||
let window1 = streamer::default_window();
|
||||
let t_l_listen = Crdt::listen(cref_l, window1, leader_gossip, exit.clone());
|
||||
|
||||
//start crdt2
|
||||
let mut crdt2 = Crdt::new(target2_data.clone());
|
||||
crdt2.insert(leader_data.clone());
|
||||
crdt2.insert(&leader_data);
|
||||
crdt2.set_leader(leader_data.id);
|
||||
let leader_id = leader_data.id;
|
||||
let cref2 = Arc::new(RwLock::new(crdt2));
|
||||
let t2_gossip = Crdt::gossip(cref2.clone(), exit.clone());
|
||||
let t2_listen = Crdt::listen(cref2, target2_gossip, exit.clone());
|
||||
let window2 = streamer::default_window();
|
||||
let t2_listen = Crdt::listen(cref2, window2, target2_gossip, exit.clone());
|
||||
|
||||
// setup some blob services to send blobs into the socket
|
||||
// to simulate the source peer and get blobs out of the socket to
|
||||
|
|
Loading…
Reference in New Issue