Receive entries first, then write

This commit is contained in:
Greg Fitzgerald 2018-07-01 08:16:41 -07:00 committed by Greg Fitzgerald
parent adcaf715c6
commit 1c35d59f26
1 changed files with 11 additions and 10 deletions

View File

@ -51,12 +51,13 @@ impl<'a> EntryWriter<'a> {
) -> Result<Vec<Entry>> {
//TODO implement a serialize for channel that does this without allocations
let entry = entry_receiver.recv_timeout(Duration::new(1, 0))?;
self.write_and_register_entry(writer, &entry)?;
let mut entries = vec![entry];
while let Ok(entry) = entry_receiver.try_recv() {
self.write_and_register_entry(writer, &entry)?;
entries.push(entry);
}
for entry in &entries {
self.write_and_register_entry(writer, &entry)?;
}
Ok(entries)
}
@ -64,18 +65,18 @@ impl<'a> EntryWriter<'a> {
/// continuosly broadcast blobs of entries out
pub fn write_and_send_entries<W: Write>(
&self,
broadcast: &BlobSender,
blob_sender: &BlobSender,
blob_recycler: &BlobRecycler,
writer: &Mutex<W>,
entry_receiver: &Receiver<Entry>,
) -> Result<()> {
let mut q = VecDeque::new();
let list = self.write_entries(writer, entry_receiver)?;
trace!("New blobs? {}", list.len());
list.to_blobs(blob_recycler, &mut q);
if !q.is_empty() {
trace!("broadcasting {}", q.len());
broadcast.send(q)?;
let entries = self.write_entries(writer, entry_receiver)?;
trace!("New blobs? {}", entries.len());
let mut blobs = VecDeque::new();
entries.to_blobs(blob_recycler, &mut blobs);
if !blobs.is_empty() {
trace!("broadcasting {}", blobs.len());
blob_sender.send(blobs)?;
}
Ok(())
}