From 1c35d59f263121f2f8624367d59d8368bb087342 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Sun, 1 Jul 2018 08:16:41 -0700 Subject: [PATCH] Receive entries first, then write --- src/entry_writer.rs | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/src/entry_writer.rs b/src/entry_writer.rs index be7839d4f9..0ca30d772a 100644 --- a/src/entry_writer.rs +++ b/src/entry_writer.rs @@ -51,12 +51,13 @@ impl<'a> EntryWriter<'a> { ) -> Result> { //TODO implement a serialize for channel that does this without allocations let entry = entry_receiver.recv_timeout(Duration::new(1, 0))?; - self.write_and_register_entry(writer, &entry)?; let mut entries = vec![entry]; while let Ok(entry) = entry_receiver.try_recv() { - self.write_and_register_entry(writer, &entry)?; entries.push(entry); } + for entry in &entries { + self.write_and_register_entry(writer, &entry)?; + } Ok(entries) } @@ -64,18 +65,18 @@ impl<'a> EntryWriter<'a> { /// continuosly broadcast blobs of entries out pub fn write_and_send_entries( &self, - broadcast: &BlobSender, + blob_sender: &BlobSender, blob_recycler: &BlobRecycler, writer: &Mutex, entry_receiver: &Receiver, ) -> Result<()> { - let mut q = VecDeque::new(); - let list = self.write_entries(writer, entry_receiver)?; - trace!("New blobs? {}", list.len()); - list.to_blobs(blob_recycler, &mut q); - if !q.is_empty() { - trace!("broadcasting {}", q.len()); - broadcast.send(q)?; + let entries = self.write_entries(writer, entry_receiver)?; + trace!("New blobs? {}", entries.len()); + let mut blobs = VecDeque::new(); + entries.to_blobs(blob_recycler, &mut blobs); + if !blobs.is_empty() { + trace!("broadcasting {}", blobs.len()); + blob_sender.send(blobs)?; } Ok(()) }