use next_entries() in recorder, recycle blobs in reconstruct_from_blobs

This commit is contained in:
Rob Walker 2018-06-22 12:20:29 -07:00 committed by Greg Fitzgerald
parent 17e8ad110f
commit 3dbbb398df
4 changed files with 48 additions and 21 deletions

View File

@@ -41,12 +41,26 @@ impl Block for [Entry] {
}
}
pub fn reconstruct_entries_from_blobs(blobs: &VecDeque<SharedBlob>) -> bincode::Result<Vec<Entry>> {
pub fn reconstruct_entries_from_blobs(
blobs: VecDeque<SharedBlob>,
blob_recycler: &packet::BlobRecycler,
) -> bincode::Result<Vec<Entry>> {
let mut entries: Vec<Entry> = Vec::with_capacity(blobs.len());
for msgs in blobs {
let blob = msgs.read().unwrap();
let entry: Entry = deserialize(&blob.data()[..blob.meta.size])?;
entries.push(entry);
for blob in blobs {
let entry = {
let msg = blob.read().unwrap();
deserialize(&msg.data()[..msg.meta.size])
};
blob_recycler.recycle(blob);
match entry {
Ok(entry) => entries.push(entry),
Err(err) => {
trace!("reconstruct_entry_from_blobs: {}", err);
return Err(err);
}
}
}
Ok(entries)
}
@@ -148,7 +162,10 @@ mod tests {
let mut blob_q = VecDeque::new();
entries.to_blobs(&blob_recycler, &mut blob_q);
assert_eq!(reconstruct_entries_from_blobs(&blob_q).unwrap(), entries);
assert_eq!(
reconstruct_entries_from_blobs(blob_q, &blob_recycler).unwrap(),
entries
);
}
#[test]
@@ -156,7 +173,7 @@ mod tests {
let blob_recycler = BlobRecycler::default();
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000);
let blobs_q = packet::to_blobs(vec![(0, addr)], &blob_recycler).unwrap(); // <-- attack!
assert!(reconstruct_entries_from_blobs(&blobs_q).is_err());
assert!(reconstruct_entries_from_blobs(blobs_q, &blob_recycler).is_err());
}
#[test]
@@ -206,10 +223,10 @@ mod bench {
bencher.iter(|| {
let mut blob_q = VecDeque::new();
entries.to_blobs(&blob_recycler, &mut blob_q);
assert_eq!(reconstruct_entries_from_blobs(&blob_q).unwrap(), entries);
for blob in blob_q {
blob_recycler.recycle(blob);
}
assert_eq!(
reconstruct_entries_from_blobs(blob_q, &blob_recycler).unwrap(),
entries
);
});
}

View File

@@ -90,8 +90,15 @@ impl RecordStage {
} else {
vec![]
};
let entry = recorder.record(txs);
sender.send(entry).map_err(|_| ())
let entries = recorder.record(txs);
let mut result = Ok(());
for entry in entries {
result = sender.send(entry).map_err(|_| ());
if result.is_err() {
break;
}
}
result
}
fn process_signals(

View File

@@ -3,6 +3,7 @@
use entry::Entry;
use hash::{hash, Hash};
use ledger::next_entries_mut;
use std::time::{Duration, Instant};
use transaction::Transaction;
@@ -26,15 +27,19 @@ impl Recorder {
self.num_hashes += 1;
}
pub fn record(&mut self, transactions: Vec<Transaction>) -> Entry {
Entry::new_mut(&mut self.last_hash, &mut self.num_hashes, transactions)
pub fn record(&mut self, transactions: Vec<Transaction>) -> Vec<Entry> {
next_entries_mut(&mut self.last_hash, &mut self.num_hashes, transactions)
}
pub fn tick(&mut self, start_time: Instant, tick_duration: Duration) -> Option<Entry> {
if start_time.elapsed() > tick_duration * (self.num_ticks + 1) {
// TODO: don't let this overflow u32
self.num_ticks += 1;
Some(self.record(vec![]))
Some(Entry::new_mut(
&mut self.last_hash,
&mut self.num_hashes,
vec![],
))
} else {
None
}

View File

@@ -23,15 +23,13 @@ impl ReplicateStage {
) -> Result<()> {
let timer = Duration::new(1, 0);
let blobs = blob_receiver.recv_timeout(timer)?;
let entries = ledger::reconstruct_entries_from_blobs(&blobs)?;
let blobs_len = blobs.len();
let entries = ledger::reconstruct_entries_from_blobs(blobs, &blob_recycler)?;
let res = bank.process_entries(entries);
if res.is_err() {
error!("process_entries {} {:?}", blobs.len(), res);
error!("process_entries {} {:?}", blobs_len, res);
}
res?;
for blob in blobs {
blob_recycler.recycle(blob);
}
Ok(())
}