Move channel code to write stage

This commit is contained in:
Greg Fitzgerald 2018-07-01 09:15:19 -07:00 committed by Greg Fitzgerald
parent 76fc5822c9
commit 09e9139855
2 changed files with 44 additions and 43 deletions

View File

@ -4,16 +4,9 @@
use bank::Bank;
use entry::Entry;
use ledger::Block;
use packet::BlobRecycler;
use result::Result;
use serde_json;
use std::collections::VecDeque;
use std::io::{self, sink, Write};
use std::sync::mpsc::Receiver;
use std::io::{self, Write};
use std::sync::Mutex;
use std::time::Duration;
use streamer::BlobSender;
pub struct EntryWriter<'a> {
bank: &'a Bank,
@ -49,7 +42,7 @@ impl<'a> EntryWriter<'a> {
Self::write_entry(&writer, entry)
}
fn write_and_register_entries<W: Write>(
pub fn write_and_register_entries<W: Write>(
&self,
writer: &Mutex<W>,
entries: &[Entry],
@ -59,35 +52,6 @@ impl<'a> EntryWriter<'a> {
}
Ok(())
}
/// Process any Entry items that have been published by the Historian.
/// continuosly broadcast blobs of entries out
pub fn write_and_send_entries<W: Write>(
&self,
blob_sender: &BlobSender,
blob_recycler: &BlobRecycler,
writer: &Mutex<W>,
entry_receiver: &Receiver<Vec<Entry>>,
) -> Result<()> {
let entries = entry_receiver.recv_timeout(Duration::new(1, 0))?;
self.write_and_register_entries(writer, &entries)?;
trace!("New blobs? {}", entries.len());
let mut blobs = VecDeque::new();
entries.to_blobs(blob_recycler, &mut blobs);
if !blobs.is_empty() {
trace!("broadcasting {}", blobs.len());
blob_sender.send(blobs)?;
}
Ok(())
}
/// Process any Entry items that have been published by the Historian.
/// continuosly broadcast blobs of entries out
pub fn drain_entries(&self, entry_receiver: &Receiver<Vec<Entry>>) -> Result<()> {
let entries = entry_receiver.recv_timeout(Duration::new(1, 0))?;
self.write_and_register_entries(&Mutex::new(sink()), &entries)?;
Ok(())
}
}
#[cfg(test)]
@ -120,7 +84,7 @@ mod tests {
// Verify that write_and_register_entry doesn't register the first entries after a split.
assert_eq!(bank.last_id(), mint.last_id());
let writer = Mutex::new(sink());
let writer = Mutex::new(io::sink());
entry_writer
.write_and_register_entry(&writer, &entries[0])
.unwrap();

View File

@ -5,13 +5,17 @@
use bank::Bank;
use entry::Entry;
use entry_writer::EntryWriter;
use ledger::Block;
use packet::BlobRecycler;
use std::io::Write;
use result::Result;
use std::collections::VecDeque;
use std::io::{self, Write};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex};
use std::thread::{Builder, JoinHandle};
use streamer::BlobReceiver;
use std::time::Duration;
use streamer::{BlobReceiver, BlobSender};
pub struct WriteStage {
pub thread_hdl: JoinHandle<()>,
@ -19,6 +23,27 @@ pub struct WriteStage {
}
impl WriteStage {
/// Process any Entry items that have been published by the Historian.
/// continuosly broadcast blobs of entries out
pub fn write_and_send_entries<W: Write>(
entry_writer: &EntryWriter,
blob_sender: &BlobSender,
blob_recycler: &BlobRecycler,
writer: &Mutex<W>,
entry_receiver: &Receiver<Vec<Entry>>,
) -> Result<()> {
let entries = entry_receiver.recv_timeout(Duration::new(1, 0))?;
entry_writer.write_and_register_entries(writer, &entries)?;
trace!("New blobs? {}", entries.len());
let mut blobs = VecDeque::new();
entries.to_blobs(blob_recycler, &mut blobs);
if !blobs.is_empty() {
trace!("broadcasting {}", blobs.len());
blob_sender.send(blobs)?;
}
Ok(())
}
/// Create a new Rpu that wraps the given Bank.
pub fn new<W: Write + Send + 'static>(
bank: Arc<Bank>,
@ -32,7 +57,8 @@ impl WriteStage {
.name("solana-writer".to_string())
.spawn(move || loop {
let entry_writer = EntryWriter::new(&bank);
let _ = entry_writer.write_and_send_entries(
let _ = Self::write_and_send_entries(
&entry_writer,
&blob_sender,
&blob_recycler,
&writer,
@ -51,6 +77,17 @@ impl WriteStage {
}
}
/// Process any Entry items that have been published by the Historian.
/// continuosly broadcast blobs of entries out
pub fn drain_entries(
entry_writer: &EntryWriter,
entry_receiver: &Receiver<Vec<Entry>>,
) -> Result<()> {
let entries = entry_receiver.recv_timeout(Duration::new(1, 0))?;
entry_writer.write_and_register_entries(&Mutex::new(io::sink()), &entries)?;
Ok(())
}
pub fn new_drain(
bank: Arc<Bank>,
exit: Arc<AtomicBool>,
@ -62,7 +99,7 @@ impl WriteStage {
.spawn(move || {
let entry_writer = EntryWriter::new(&bank);
loop {
let _ = entry_writer.drain_entries(&entry_receiver);
let _ = Self::drain_entries(&entry_writer, &entry_receiver);
if exit.load(Ordering::Relaxed) {
info!("drain_service exiting");
break;