Factor out entry processing and fix replicate test to call global setup fn

This commit is contained in:
Stephen Akridge 2018-05-03 15:55:59 -07:00
parent 38af0f436d
commit 2d5313639a
1 changed files with 66 additions and 43 deletions

View File

@ -11,7 +11,7 @@ use event::Event;
use hash::Hash; use hash::Hash;
use historian::Historian; use historian::Historian;
use packet; use packet;
use packet::{SharedPackets, BLOB_SIZE}; use packet::{SharedBlob, SharedPackets, BLOB_SIZE};
use rayon::prelude::*; use rayon::prelude::*;
use recorder::Signal; use recorder::Signal;
use result::Result; use result::Result;
@ -135,18 +135,11 @@ impl AccountantSkel {
Ok(l) Ok(l)
} }
/// Process any Entry items that have been published by the Historian. fn process_entry_list_into_blobs(
/// continuosly broadcast blobs of entries out list: &Vec<Entry>,
fn run_sync<W: Write>(
obj: SharedSkel,
broadcast: &streamer::BlobSender,
blob_recycler: &packet::BlobRecycler, blob_recycler: &packet::BlobRecycler,
writer: &Arc<Mutex<W>>, q: &mut VecDeque<SharedBlob>,
exit: Arc<AtomicBool>, ) {
) -> Result<()> {
let mut q = VecDeque::new();
while let Ok(list) = Self::receive_all(&obj, writer) {
trace!("New blobs? {}", list.len());
let mut start = 0; let mut start = 0;
let mut end = 0; let mut end = 0;
while start < list.len() { while start < list.len() {
@ -161,13 +154,18 @@ impl AccountantSkel {
} }
// See that we made progress and a single // See that we made progress and a single
// vec of Events wasn't too big for a single packet // vec of Events wasn't too big for a single packet
assert!(end > start); if end <= start {
eprintln!("Event too big for the blob!");
start += 1;
end = start;
continue;
}
let b = blob_recycler.allocate(); let b = blob_recycler.allocate();
let pos = { let pos = {
let mut bd = b.write().unwrap(); let mut bd = b.write().unwrap();
let mut out = Cursor::new(bd.data_mut()); let mut out = Cursor::new(bd.data_mut());
serialize_into(&mut out, &list[start..end]) serialize_into(&mut out, &list[start..end]).expect("failed to serialize output");
.expect("failed to serialize output");
out.position() as usize out.position() as usize
}; };
assert!(pos < BLOB_SIZE); assert!(pos < BLOB_SIZE);
@ -175,6 +173,21 @@ impl AccountantSkel {
q.push_back(b); q.push_back(b);
start = end; start = end;
} }
}
/// Process any Entry items that have been published by the Historian.
/// continuosly broadcast blobs of entries out
fn run_sync<W: Write>(
obj: SharedSkel,
broadcast: &streamer::BlobSender,
blob_recycler: &packet::BlobRecycler,
writer: &Arc<Mutex<W>>,
exit: Arc<AtomicBool>,
) -> Result<()> {
let mut q = VecDeque::new();
while let Ok(list) = Self::receive_all(&obj, writer) {
trace!("New blobs? {}", list.len());
Self::process_entry_list_into_blobs(&list, blob_recycler, &mut q);
if exit.load(Ordering::Relaxed) { if exit.load(Ordering::Relaxed) {
break; break;
} }
@ -617,7 +630,7 @@ mod tests {
use accountant_skel::{to_packets, Request}; use accountant_skel::{to_packets, Request};
use bincode::serialize; use bincode::serialize;
use ecdsa; use ecdsa;
use packet::{BlobRecycler, PacketRecycler, NUM_PACKETS}; use packet::{BlobRecycler, PacketRecycler, BLOB_SIZE, NUM_PACKETS};
use transaction::{memfind, test_tx}; use transaction::{memfind, test_tx};
use accountant::Accountant; use accountant::Accountant;
@ -632,6 +645,7 @@ mod tests {
use futures::Future; use futures::Future;
use hash::{hash, Hash}; use hash::{hash, Hash};
use historian::Historian; use historian::Historian;
use logger;
use mint::Mint; use mint::Mint;
use plan::Plan; use plan::Plan;
use recorder::Signal; use recorder::Signal;
@ -764,18 +778,6 @@ mod tests {
} }
} }
use std::sync::{Once, ONCE_INIT};
extern crate env_logger;
static INIT: Once = ONCE_INIT;
/// Setup function that is only run once, even if called multiple times.
fn setup() {
INIT.call_once(|| {
env_logger::init().unwrap();
});
}
fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket) { fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket) {
let gossip = UdpSocket::bind("127.0.0.1:0").unwrap(); let gossip = UdpSocket::bind("127.0.0.1:0").unwrap();
let replicate = UdpSocket::bind("127.0.0.1:0").unwrap(); let replicate = UdpSocket::bind("127.0.0.1:0").unwrap();
@ -793,7 +795,7 @@ mod tests {
/// Test that mesasge sent from leader to target1 and repliated to target2 /// Test that mesasge sent from leader to target1 and repliated to target2
#[test] #[test]
fn test_replicate() { fn test_replicate() {
setup(); logger::setup();
let (leader_data, leader_gossip, _, leader_serve) = test_node(); let (leader_data, leader_gossip, _, leader_serve) = test_node();
let (target1_data, target1_gossip, target1_replicate, _) = test_node(); let (target1_data, target1_gossip, target1_replicate, _) = test_node();
let (target2_data, target2_gossip, target2_replicate, _) = test_node(); let (target2_data, target2_gossip, target2_replicate, _) = test_node();
@ -932,6 +934,27 @@ mod tests {
t_l_gossip.join().expect("join"); t_l_gossip.join().expect("join");
t_l_listen.join().expect("join"); t_l_listen.join().expect("join");
} }
#[test]
fn test_entry_to_blobs() {
let zero = Hash::default();
let keypair = KeyPair::new();
let tr0 = Event::Transaction(Transaction::new(&keypair, keypair.pubkey(), 0, zero));
let tr1 = Event::Transaction(Transaction::new(&keypair, keypair.pubkey(), 1, zero));
let e0 = entry::create_entry(&zero, 0, vec![tr0.clone(), tr1.clone()]);
let entry_list = vec![e0; 1000];
let blob_recycler = BlobRecycler::default();
let mut blob_q = VecDeque::new();
AccountantSkel::process_entry_list_into_blobs(&entry_list, &blob_recycler, &mut blob_q);
let serialized_entry_list = serialize(&entry_list).unwrap();
let mut num_blobs_ref = serialized_entry_list.len() / BLOB_SIZE;
if serialized_entry_list.len() % BLOB_SIZE != 0 {
num_blobs_ref += 1
}
trace!("len: {} ref_len: {}", blob_q.len(), num_blobs_ref);
assert!(blob_q.len() > num_blobs_ref);
}
} }
#[cfg(all(feature = "unstable", test))] #[cfg(all(feature = "unstable", test))]