No need for TPU dependency

This commit is contained in:
Greg Fitzgerald 2018-05-11 23:51:35 -06:00
parent 2376dfc139
commit 73abea088a
1 changed files with 16 additions and 6 deletions

View File

@ -36,14 +36,15 @@ impl Tpu {
} }
pub fn write_service<W: Write + Send + 'static>( pub fn write_service<W: Write + Send + 'static>(
obj: SharedTpu, accounting_stage: Arc<AccountingStage>,
request_processor: Arc<RequestProcessor>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
broadcast: streamer::BlobSender, broadcast: streamer::BlobSender,
blob_recycler: packet::BlobRecycler, blob_recycler: packet::BlobRecycler,
writer: Mutex<W>, writer: Mutex<W>,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
spawn(move || loop { spawn(move || loop {
let entry_writer = EntryWriter::new(&obj.accounting_stage, &obj.request_processor); let entry_writer = EntryWriter::new(&accounting_stage, &request_processor);
let _ = entry_writer.write_and_send_entries(&broadcast, &blob_recycler, &writer); let _ = entry_writer.write_and_send_entries(&broadcast, &blob_recycler, &writer);
if exit.load(Ordering::Relaxed) { if exit.load(Ordering::Relaxed) {
info!("broadcat_service exiting"); info!("broadcat_service exiting");
@ -52,9 +53,13 @@ impl Tpu {
}) })
} }
pub fn drain_service(obj: SharedTpu, exit: Arc<AtomicBool>) -> JoinHandle<()> { pub fn drain_service(
accounting_stage: Arc<AccountingStage>,
request_processor: Arc<RequestProcessor>,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
spawn(move || { spawn(move || {
let entry_writer = EntryWriter::new(&obj.accounting_stage, &obj.request_processor); let entry_writer = EntryWriter::new(&accounting_stage, &request_processor);
loop { loop {
let _ = entry_writer.drain_entries(); let _ = entry_writer.drain_entries();
if exit.load(Ordering::Relaxed) { if exit.load(Ordering::Relaxed) {
@ -108,7 +113,8 @@ impl Tpu {
let (broadcast_sender, broadcast_receiver) = channel(); let (broadcast_sender, broadcast_receiver) = channel();
let t_write = Self::write_service( let t_write = Self::write_service(
obj.clone(), obj.accounting_stage.clone(),
obj.request_processor.clone(),
exit.clone(), exit.clone(),
broadcast_sender, broadcast_sender,
blob_recycler.clone(), blob_recycler.clone(),
@ -275,7 +281,11 @@ impl Tpu {
blob_recycler.clone(), blob_recycler.clone(),
); );
let t_write = Self::drain_service(obj.clone(), exit.clone()); let t_write = Self::drain_service(
obj.accounting_stage.clone(),
obj.request_processor.clone(),
exit.clone(),
);
let t_responder = streamer::responder( let t_responder = streamer::responder(
respond_socket, respond_socket,