diff --git a/src/tpu.rs b/src/tpu.rs index 3b8fa0fba0..d701e1590d 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -80,7 +80,7 @@ impl Tpu { /// Process any Entry items that have been published by the Historian. /// continuosly broadcast blobs of entries out - fn broadcast_entries( + fn write_and_send_entries( &self, broadcast: &streamer::BlobSender, blob_recycler: &packet::BlobRecycler, @@ -96,7 +96,7 @@ impl Tpu { Ok(()) } - pub fn broadcast_service( + pub fn write_service( obj: SharedTpu, exit: Arc, broadcast: streamer::BlobSender, @@ -104,7 +104,7 @@ impl Tpu { writer: Mutex, ) -> JoinHandle<()> { spawn(move || loop { - let _ = obj.broadcast_entries(&broadcast, &blob_recycler, &writer); + let _ = obj.write_and_send_entries(&broadcast, &blob_recycler, &writer); if exit.load(Ordering::Relaxed) { info!("broadcat_service exiting"); break; @@ -119,11 +119,11 @@ impl Tpu { Ok(()) } - pub fn write_service(obj: SharedTpu, exit: Arc) -> JoinHandle<()> { + pub fn drain_service(obj: SharedTpu, exit: Arc) -> JoinHandle<()> { spawn(move || loop { let _ = obj.drain_entries(); if exit.load(Ordering::Relaxed) { - info!("write_service exiting"); + info!("drain_service exiting"); break; } }) @@ -176,24 +176,29 @@ impl Tpu { Ok(()) } - /// Process verified blobs, already in order - /// Respond with a signed hash of the state - fn replicate_state( - obj: &Tpu, - verified_receiver: &streamer::BlobReceiver, - blob_recycler: &packet::BlobRecycler, - ) -> Result<()> { - let timer = Duration::new(1, 0); - let blobs = verified_receiver.recv_timeout(timer)?; - trace!("replicating blobs {}", blobs.len()); - let entries = ledger::reconstruct_entries_from_blobs(&blobs); - obj.accounting_stage - .accountant - .process_verified_entries(entries)?; - for blob in blobs { - blob_recycler.recycle(blob); - } - Ok(()) + fn verifier_service( + exit: Arc, + packets_receiver: Arc>, + verified_sender: Arc)>>>>, + ) -> JoinHandle<()> { + spawn(move || loop { + let e = Self::verifier(&packets_receiver.clone(), &verified_sender.clone()); + if e.is_err() && exit.load(Ordering::Relaxed) { + break; + } + }) + } + + fn verifier_services( + exit: Arc, + packets_receiver: streamer::PacketReceiver, + verified_sender: Sender)>>, + ) -> Vec> { + let sender = Arc::new(Mutex::new(verified_sender)); + let receiver = Arc::new(Mutex::new(packets_receiver)); + (0..4) + .map(|_| Self::verifier_service(exit.clone(), receiver.clone(), sender.clone())) + .collect() } fn thin_client_service( @@ -220,31 +225,6 @@ impl Tpu { }) } - fn verifier_service( - exit: Arc, - packets_receiver: Arc>, - verified_sender: Arc)>>>>, - ) -> JoinHandle<()> { - spawn(move || loop { - let e = Self::verifier(&packets_receiver.clone(), &verified_sender.clone()); - if e.is_err() && exit.load(Ordering::Relaxed) { - break; - } - }) - } - - fn verifier_services( - exit: Arc, - packets_receiver: streamer::PacketReceiver, - verified_sender: Sender)>>, - ) -> Vec> { - let sender = Arc::new(Mutex::new(verified_sender)); - let receiver = Arc::new(Mutex::new(packets_receiver)); - (0..4) - .map(|_| Self::verifier_service(exit.clone(), receiver.clone(), sender.clone())) - .collect() - } - /// Create a UDP microservice that forwards messages the given Tpu. /// This service is the network leader /// Set `exit` to shutdown its threads. @@ -264,11 +244,8 @@ impl Tpu { // make sure we are on the same interface let mut local = requests_socket.local_addr()?; local.set_port(0); - let respond_socket = UdpSocket::bind(local.clone())?; let packet_recycler = packet::PacketRecycler::default(); - let blob_recycler = packet::BlobRecycler::default(); - let (packet_sender, packet_receiver) = channel(); let t_receiver = streamer::receiver( requests_socket, @@ -281,8 +258,19 @@ impl Tpu { let verify_threads: Vec<_> = Self::verifier_services(exit.clone(), packet_receiver, verified_sender); + let blob_recycler = packet::BlobRecycler::default(); + let (responder_sender, responder_receiver) = channel(); + let t_thin_client = Self::thin_client_service( + obj.clone(), + exit.clone(), + verified_receiver, + responder_sender, + packet_recycler.clone(), + blob_recycler.clone(), + ); + let (broadcast_sender, broadcast_receiver) = channel(); - let t_sync = Self::broadcast_service( + let t_write = Self::write_service( obj.clone(), exit.clone(), broadcast_sender, @@ -299,7 +287,7 @@ impl Tpu { broadcast_receiver, ); - let (responder_sender, responder_receiver) = channel(); + let respond_socket = UdpSocket::bind(local.clone())?; let t_responder = streamer::responder( respond_socket, exit.clone(), @@ -307,20 +295,11 @@ impl Tpu { responder_receiver, ); - let t_thin_client = Self::thin_client_service( - obj.clone(), - exit.clone(), - verified_receiver, - responder_sender, - packet_recycler.clone(), - blob_recycler.clone(), - ); - let mut threads = vec![ t_receiver, t_responder, t_thin_client, - t_sync, + t_write, t_gossip, t_listen, t_broadcast, @@ -329,6 +308,26 @@ impl Tpu { Ok(threads) } + /// Process verified blobs, already in order + /// Respond with a signed hash of the state + fn replicate_state( + obj: &Tpu, + verified_receiver: &streamer::BlobReceiver, + blob_recycler: &packet::BlobRecycler, + ) -> Result<()> { + let timer = Duration::new(1, 0); + let blobs = verified_receiver.recv_timeout(timer)?; + trace!("replicating blobs {}", blobs.len()); + let entries = ledger::reconstruct_entries_from_blobs(&blobs); + obj.accounting_stage + .accountant + .process_verified_entries(entries)?; + for blob in blobs { + blob_recycler.recycle(blob); + } + Ok(()) + } + /// This service receives messages from a leader in the network and processes the transactions /// on the accountant state. /// # Arguments @@ -438,7 +437,7 @@ impl Tpu { let verify_threads: Vec<_> = Self::verifier_services(exit.clone(), packet_receiver, verified_sender); - let t_sync = Self::write_service(obj.clone(), exit.clone()); + let t_write = Self::drain_service(obj.clone(), exit.clone()); let t_thin_client = Self::thin_client_service( obj.clone(), @@ -461,7 +460,7 @@ impl Tpu { t_packet_receiver, t_responder, t_thin_client, - t_sync, + t_write, ]; threads.extend(verify_threads.into_iter()); Ok(threads)