diff --git a/src/bin/genesis.rs b/src/bin/genesis.rs index 884fadae26..bc0698cd41 100644 --- a/src/bin/genesis.rs +++ b/src/bin/genesis.rs @@ -5,10 +5,12 @@ extern crate serde_json; extern crate solana; use atty::{is, Stream}; +use solana::entry_writer::EntryWriter; use solana::mint::Mint; use std::error; -use std::io::{stdin, stdout, Read, Write}; +use std::io::{stdin, stdout, Read}; use std::process::exit; +use std::sync::Mutex; fn main() -> Result<(), Box> { if is(Stream::Stdin) { @@ -24,9 +26,7 @@ fn main() -> Result<(), Box> { } let mint: Mint = serde_json::from_str(&buffer)?; - let mut writer = stdout(); - for x in mint.create_entries() { - writeln!(writer, "{}", serde_json::to_string(&x)?)?; - } + let writer = Mutex::new(stdout()); + EntryWriter::write_entries(&writer, &mint.create_entries())?; Ok(()) } diff --git a/src/entry_writer.rs b/src/entry_writer.rs index 0ca30d772a..d6f8303800 100644 --- a/src/entry_writer.rs +++ b/src/entry_writer.rs @@ -25,6 +25,18 @@ impl<'a> EntryWriter<'a> { EntryWriter { bank } } + fn write_entry(writer: &Mutex, entry: &Entry) -> io::Result<()> { + let serialized = serde_json::to_string(&entry).unwrap(); + writeln!(writer.lock().unwrap(), "{}", serialized) + } + + pub fn write_entries(writer: &Mutex, entries: &[Entry]) -> io::Result<()> { + for entry in entries { + Self::write_entry(writer, entry)?; + } + Ok(()) + } + fn write_and_register_entry( &self, writer: &Mutex, @@ -34,30 +46,26 @@ impl<'a> EntryWriter<'a> { if !entry.has_more { self.bank.register_entry_id(&entry.id); } - writeln!( - writer - .lock() - .expect("'writer' lock in fn fn write_and_register_entry"), - "{}", - serde_json::to_string(&entry) - .expect("'entry' to_strong in fn write_and_register_entry") - ) + Self::write_entry(&writer, entry) } - fn write_entries( + fn write_and_register_entries( &self, writer: &Mutex, - entry_receiver: &Receiver, - ) -> Result> { - //TODO implement a serialize for channel that does this without allocations + entries: &[Entry], + ) -> io::Result<()> { + for entry in entries { + self.write_and_register_entry(writer, &entry)?; + } + Ok(()) + } + + fn recv_entries(entry_receiver: &Receiver) -> Result> { let entry = entry_receiver.recv_timeout(Duration::new(1, 0))?; let mut entries = vec![entry]; while let Ok(entry) = entry_receiver.try_recv() { entries.push(entry); } - for entry in &entries { - self.write_and_register_entry(writer, &entry)?; - } Ok(entries) } @@ -70,7 +78,8 @@ impl<'a> EntryWriter<'a> { writer: &Mutex, entry_receiver: &Receiver, ) -> Result<()> { - let entries = self.write_entries(writer, entry_receiver)?; + let entries = Self::recv_entries(entry_receiver)?; + self.write_and_register_entries(writer, &entries)?; trace!("New blobs? {}", entries.len()); let mut blobs = VecDeque::new(); entries.to_blobs(blob_recycler, &mut blobs); @@ -84,7 +93,8 @@ impl<'a> EntryWriter<'a> { /// Process any Entry items that have been published by the Historian. /// continuosly broadcast blobs of entries out pub fn drain_entries(&self, entry_receiver: &Receiver) -> Result<()> { - self.write_entries(&Arc::new(Mutex::new(sink())), entry_receiver)?; + let entries = Self::recv_entries(entry_receiver)?; + self.write_and_register_entries(&Arc::new(Mutex::new(sink())), &entries)?; Ok(()) } }