diff --git a/src/entry_writer.rs b/src/entry_writer.rs index 8cd05d7449..d8d8d73b79 100644 --- a/src/entry_writer.rs +++ b/src/entry_writer.rs @@ -8,47 +8,40 @@ use serde_json; use std::io::{self, Write}; use std::sync::Mutex; -pub struct EntryWriter<'a> { +pub struct EntryWriter<'a, W> { bank: &'a Bank, + writer: Mutex, } -impl<'a> EntryWriter<'a> { +impl<'a, W: Write> EntryWriter<'a, W> { /// Create a new Tpu that wraps the given Bank. - pub fn new(bank: &'a Bank) -> Self { - EntryWriter { bank } + pub fn new(bank: &'a Bank, writer: Mutex) -> Self { + EntryWriter { bank, writer } } - fn write_entry(writer: &Mutex, entry: &Entry) -> io::Result<()> { + fn write_entry(writer: &Mutex, entry: &Entry) -> io::Result<()> { let serialized = serde_json::to_string(&entry).unwrap(); writeln!(writer.lock().unwrap(), "{}", serialized) } - pub fn write_entries(writer: &Mutex, entries: &[Entry]) -> io::Result<()> { + pub fn write_entries(writer: &Mutex, entries: &[Entry]) -> io::Result<()> { for entry in entries { Self::write_entry(writer, entry)?; } Ok(()) } - fn write_and_register_entry( - &self, - writer: &Mutex, - entry: &Entry, - ) -> io::Result<()> { + fn write_and_register_entry(&self, entry: &Entry) -> io::Result<()> { trace!("write_and_register_entry entry"); if !entry.has_more { self.bank.register_entry_id(&entry.id); } - Self::write_entry(&writer, entry) + Self::write_entry(&self.writer, entry) } - pub fn write_and_register_entries( - &self, - writer: &Mutex, - entries: &[Entry], - ) -> io::Result<()> { + pub fn write_and_register_entries(&self, entries: &[Entry]) -> io::Result<()> { for entry in entries { - self.write_and_register_entry(writer, &entry)?; + self.write_and_register_entry(&entry)?; } Ok(()) } @@ -68,7 +61,8 @@ mod tests { let mint = Mint::new(1); let bank = Bank::new(&mint); - let entry_writer = EntryWriter::new(&bank); + let writer = Mutex::new(io::sink()); + let entry_writer = EntryWriter::new(&bank, writer); let keypair = KeyPair::new(); let tx = Transaction::new(&mint.keypair(), keypair.pubkey(), 1, mint.last_id()); @@ -84,16 +78,11 @@ mod tests { // Verify that write_and_register_entry doesn't register the first entries after a split. assert_eq!(bank.last_id(), mint.last_id()); - let writer = Mutex::new(io::sink()); - entry_writer - .write_and_register_entry(&writer, &entries[0]) - .unwrap(); + entry_writer.write_and_register_entry(&entries[0]).unwrap(); assert_eq!(bank.last_id(), mint.last_id()); // Verify that write_and_register_entry registers the final entry after a split. - entry_writer - .write_and_register_entry(&writer, &entries[1]) - .unwrap(); + entry_writer.write_and_register_entry(&entries[1]).unwrap(); assert_eq!(bank.last_id(), entries[1].id); } } diff --git a/src/write_stage.rs b/src/write_stage.rs index c2810164ed..cdcd221c45 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -26,14 +26,13 @@ impl WriteStage { /// Process any Entry items that have been published by the Historian. /// continuosly broadcast blobs of entries out pub fn write_and_send_entries( - entry_writer: &EntryWriter, + entry_writer: &EntryWriter, blob_sender: &BlobSender, blob_recycler: &BlobRecycler, - writer: &Mutex, entry_receiver: &Receiver>, ) -> Result<()> { let entries = entry_receiver.recv_timeout(Duration::new(1, 0))?; - entry_writer.write_and_register_entries(writer, &entries)?; + entry_writer.write_and_register_entries(&entries)?; trace!("New blobs? {}", entries.len()); let mut blobs = VecDeque::new(); entries.to_blobs(blob_recycler, &mut blobs); @@ -56,14 +55,12 @@ impl WriteStage { let thread_hdl = Builder::new() .name("solana-writer".to_string()) .spawn(move || { - let entry_writer = EntryWriter::new(&bank); - let writer = Mutex::new(writer); + let entry_writer = EntryWriter::new(&bank, Mutex::new(writer)); loop { let _ = Self::write_and_send_entries( &entry_writer, &blob_sender, &blob_recycler, - &writer, &entry_receiver, ); if exit.load(Ordering::Relaxed) {