diff --git a/src/tpu.rs b/src/tpu.rs index 111ee3428..b45087ad0 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -36,14 +36,15 @@ impl Tpu { } pub fn write_service( - obj: SharedTpu, + accounting_stage: Arc, + request_processor: Arc, exit: Arc, broadcast: streamer::BlobSender, blob_recycler: packet::BlobRecycler, writer: Mutex, ) -> JoinHandle<()> { spawn(move || loop { - let entry_writer = EntryWriter::new(&obj.accounting_stage, &obj.request_processor); + let entry_writer = EntryWriter::new(&accounting_stage, &request_processor); let _ = entry_writer.write_and_send_entries(&broadcast, &blob_recycler, &writer); if exit.load(Ordering::Relaxed) { info!("broadcat_service exiting"); @@ -52,9 +53,13 @@ impl Tpu { }) } - pub fn drain_service(obj: SharedTpu, exit: Arc) -> JoinHandle<()> { + pub fn drain_service( + accounting_stage: Arc, + request_processor: Arc, + exit: Arc, + ) -> JoinHandle<()> { spawn(move || { - let entry_writer = EntryWriter::new(&obj.accounting_stage, &obj.request_processor); + let entry_writer = EntryWriter::new(&accounting_stage, &request_processor); loop { let _ = entry_writer.drain_entries(); if exit.load(Ordering::Relaxed) { @@ -108,7 +113,8 @@ impl Tpu { let (broadcast_sender, broadcast_receiver) = channel(); let t_write = Self::write_service( - obj.clone(), + obj.accounting_stage.clone(), + obj.request_processor.clone(), exit.clone(), broadcast_sender, blob_recycler.clone(), @@ -275,7 +281,11 @@ impl Tpu { blob_recycler.clone(), ); - let t_write = Self::drain_service(obj.clone(), exit.clone()); + let t_write = Self::drain_service( + obj.accounting_stage.clone(), + obj.request_processor.clone(), + exit.clone(), + ); let t_responder = streamer::responder( respond_socket,