Refactor such that genesis can use entry_writer

This commit is contained in:
Greg Fitzgerald 2018-07-01 08:34:56 -07:00 committed by Greg Fitzgerald
parent 1c35d59f26
commit b60802ddff
2 changed files with 32 additions and 22 deletions

View File

@ -5,10 +5,12 @@ extern crate serde_json;
extern crate solana;
use atty::{is, Stream};
use solana::entry_writer::EntryWriter;
use solana::mint::Mint;
use std::error;
use std::io::{stdin, stdout, Read, Write};
use std::io::{stdin, stdout, Read};
use std::process::exit;
use std::sync::Mutex;
fn main() -> Result<(), Box<error::Error>> {
if is(Stream::Stdin) {
@ -24,9 +26,7 @@ fn main() -> Result<(), Box<error::Error>> {
}
let mint: Mint = serde_json::from_str(&buffer)?;
let mut writer = stdout();
for x in mint.create_entries() {
writeln!(writer, "{}", serde_json::to_string(&x)?)?;
}
let writer = Mutex::new(stdout());
EntryWriter::write_entries(&writer, &mint.create_entries())?;
Ok(())
}

View File

@ -25,6 +25,18 @@ impl<'a> EntryWriter<'a> {
EntryWriter { bank }
}
fn write_entry<W: Write>(writer: &Mutex<W>, entry: &Entry) -> io::Result<()> {
let serialized = serde_json::to_string(&entry).unwrap();
writeln!(writer.lock().unwrap(), "{}", serialized)
}
pub fn write_entries<W: Write>(writer: &Mutex<W>, entries: &[Entry]) -> io::Result<()> {
for entry in entries {
Self::write_entry(writer, entry)?;
}
Ok(())
}
fn write_and_register_entry<W: Write>(
&self,
writer: &Mutex<W>,
@ -34,30 +46,26 @@ impl<'a> EntryWriter<'a> {
if !entry.has_more {
self.bank.register_entry_id(&entry.id);
}
writeln!(
writer
.lock()
.expect("'writer' lock in fn fn write_and_register_entry"),
"{}",
serde_json::to_string(&entry)
.expect("'entry' to_strong in fn write_and_register_entry")
)
Self::write_entry(&writer, entry)
}
fn write_entries<W: Write>(
fn write_and_register_entries<W: Write>(
&self,
writer: &Mutex<W>,
entry_receiver: &Receiver<Entry>,
) -> Result<Vec<Entry>> {
//TODO implement a serialize for channel that does this without allocations
entries: &[Entry],
) -> io::Result<()> {
for entry in entries {
self.write_and_register_entry(writer, &entry)?;
}
Ok(())
}
fn recv_entries(entry_receiver: &Receiver<Entry>) -> Result<Vec<Entry>> {
let entry = entry_receiver.recv_timeout(Duration::new(1, 0))?;
let mut entries = vec![entry];
while let Ok(entry) = entry_receiver.try_recv() {
entries.push(entry);
}
for entry in &entries {
self.write_and_register_entry(writer, &entry)?;
}
Ok(entries)
}
@ -70,7 +78,8 @@ impl<'a> EntryWriter<'a> {
writer: &Mutex<W>,
entry_receiver: &Receiver<Entry>,
) -> Result<()> {
let entries = self.write_entries(writer, entry_receiver)?;
let entries = Self::recv_entries(entry_receiver)?;
self.write_and_register_entries(writer, &entries)?;
trace!("New blobs? {}", entries.len());
let mut blobs = VecDeque::new();
entries.to_blobs(blob_recycler, &mut blobs);
@ -84,7 +93,8 @@ impl<'a> EntryWriter<'a> {
/// Process any Entry items that have been published by the Historian.
/// continuosly broadcast blobs of entries out
pub fn drain_entries(&self, entry_receiver: &Receiver<Entry>) -> Result<()> {
self.write_entries(&Arc::new(Mutex::new(sink())), entry_receiver)?;
let entries = Self::recv_entries(entry_receiver)?;
self.write_and_register_entries(&Arc::new(Mutex::new(sink())), &entries)?;
Ok(())
}
}