Reorganize

This commit is contained in:
Greg Fitzgerald 2018-05-11 20:50:50 -06:00
parent 765d901530
commit b781fdbd04
1 changed files with 64 additions and 65 deletions

View File

@ -80,7 +80,7 @@ impl Tpu {
/// Process any Entry items that have been published by the Historian. /// Process any Entry items that have been published by the Historian.
/// continuosly broadcast blobs of entries out /// continuosly broadcast blobs of entries out
fn broadcast_entries<W: Write>( fn write_and_send_entries<W: Write>(
&self, &self,
broadcast: &streamer::BlobSender, broadcast: &streamer::BlobSender,
blob_recycler: &packet::BlobRecycler, blob_recycler: &packet::BlobRecycler,
@ -96,7 +96,7 @@ impl Tpu {
Ok(()) Ok(())
} }
pub fn broadcast_service<W: Write + Send + 'static>( pub fn write_service<W: Write + Send + 'static>(
obj: SharedTpu, obj: SharedTpu,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
broadcast: streamer::BlobSender, broadcast: streamer::BlobSender,
@ -104,7 +104,7 @@ impl Tpu {
writer: Mutex<W>, writer: Mutex<W>,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
spawn(move || loop { spawn(move || loop {
let _ = obj.broadcast_entries(&broadcast, &blob_recycler, &writer); let _ = obj.write_and_send_entries(&broadcast, &blob_recycler, &writer);
if exit.load(Ordering::Relaxed) { if exit.load(Ordering::Relaxed) {
info!("broadcat_service exiting"); info!("broadcat_service exiting");
break; break;
@ -119,11 +119,11 @@ impl Tpu {
Ok(()) Ok(())
} }
pub fn write_service(obj: SharedTpu, exit: Arc<AtomicBool>) -> JoinHandle<()> { pub fn drain_service(obj: SharedTpu, exit: Arc<AtomicBool>) -> JoinHandle<()> {
spawn(move || loop { spawn(move || loop {
let _ = obj.drain_entries(); let _ = obj.drain_entries();
if exit.load(Ordering::Relaxed) { if exit.load(Ordering::Relaxed) {
info!("write_service exiting"); info!("drain_service exiting");
break; break;
} }
}) })
@ -176,24 +176,29 @@ impl Tpu {
Ok(()) Ok(())
} }
/// Process verified blobs, already in order fn verifier_service(
/// Respond with a signed hash of the state exit: Arc<AtomicBool>,
fn replicate_state( packets_receiver: Arc<Mutex<streamer::PacketReceiver>>,
obj: &Tpu, verified_sender: Arc<Mutex<Sender<Vec<(SharedPackets, Vec<u8>)>>>>,
verified_receiver: &streamer::BlobReceiver, ) -> JoinHandle<()> {
blob_recycler: &packet::BlobRecycler, spawn(move || loop {
) -> Result<()> { let e = Self::verifier(&packets_receiver.clone(), &verified_sender.clone());
let timer = Duration::new(1, 0); if e.is_err() && exit.load(Ordering::Relaxed) {
let blobs = verified_receiver.recv_timeout(timer)?; break;
trace!("replicating blobs {}", blobs.len());
let entries = ledger::reconstruct_entries_from_blobs(&blobs);
obj.accounting_stage
.accountant
.process_verified_entries(entries)?;
for blob in blobs {
blob_recycler.recycle(blob);
} }
Ok(()) })
}
fn verifier_services(
exit: Arc<AtomicBool>,
packets_receiver: streamer::PacketReceiver,
verified_sender: Sender<Vec<(SharedPackets, Vec<u8>)>>,
) -> Vec<JoinHandle<()>> {
let sender = Arc::new(Mutex::new(verified_sender));
let receiver = Arc::new(Mutex::new(packets_receiver));
(0..4)
.map(|_| Self::verifier_service(exit.clone(), receiver.clone(), sender.clone()))
.collect()
} }
fn thin_client_service( fn thin_client_service(
@ -220,31 +225,6 @@ impl Tpu {
}) })
} }
fn verifier_service(
exit: Arc<AtomicBool>,
packets_receiver: Arc<Mutex<streamer::PacketReceiver>>,
verified_sender: Arc<Mutex<Sender<Vec<(SharedPackets, Vec<u8>)>>>>,
) -> JoinHandle<()> {
spawn(move || loop {
let e = Self::verifier(&packets_receiver.clone(), &verified_sender.clone());
if e.is_err() && exit.load(Ordering::Relaxed) {
break;
}
})
}
fn verifier_services(
exit: Arc<AtomicBool>,
packets_receiver: streamer::PacketReceiver,
verified_sender: Sender<Vec<(SharedPackets, Vec<u8>)>>,
) -> Vec<JoinHandle<()>> {
let sender = Arc::new(Mutex::new(verified_sender));
let receiver = Arc::new(Mutex::new(packets_receiver));
(0..4)
.map(|_| Self::verifier_service(exit.clone(), receiver.clone(), sender.clone()))
.collect()
}
/// Create a UDP microservice that forwards messages the given Tpu. /// Create a UDP microservice that forwards messages the given Tpu.
/// This service is the network leader /// This service is the network leader
/// Set `exit` to shutdown its threads. /// Set `exit` to shutdown its threads.
@ -264,11 +244,8 @@ impl Tpu {
// make sure we are on the same interface // make sure we are on the same interface
let mut local = requests_socket.local_addr()?; let mut local = requests_socket.local_addr()?;
local.set_port(0); local.set_port(0);
let respond_socket = UdpSocket::bind(local.clone())?;
let packet_recycler = packet::PacketRecycler::default(); let packet_recycler = packet::PacketRecycler::default();
let blob_recycler = packet::BlobRecycler::default();
let (packet_sender, packet_receiver) = channel(); let (packet_sender, packet_receiver) = channel();
let t_receiver = streamer::receiver( let t_receiver = streamer::receiver(
requests_socket, requests_socket,
@ -281,8 +258,19 @@ impl Tpu {
let verify_threads: Vec<_> = let verify_threads: Vec<_> =
Self::verifier_services(exit.clone(), packet_receiver, verified_sender); Self::verifier_services(exit.clone(), packet_receiver, verified_sender);
let blob_recycler = packet::BlobRecycler::default();
let (responder_sender, responder_receiver) = channel();
let t_thin_client = Self::thin_client_service(
obj.clone(),
exit.clone(),
verified_receiver,
responder_sender,
packet_recycler.clone(),
blob_recycler.clone(),
);
let (broadcast_sender, broadcast_receiver) = channel(); let (broadcast_sender, broadcast_receiver) = channel();
let t_sync = Self::broadcast_service( let t_write = Self::write_service(
obj.clone(), obj.clone(),
exit.clone(), exit.clone(),
broadcast_sender, broadcast_sender,
@ -299,7 +287,7 @@ impl Tpu {
broadcast_receiver, broadcast_receiver,
); );
let (responder_sender, responder_receiver) = channel(); let respond_socket = UdpSocket::bind(local.clone())?;
let t_responder = streamer::responder( let t_responder = streamer::responder(
respond_socket, respond_socket,
exit.clone(), exit.clone(),
@ -307,20 +295,11 @@ impl Tpu {
responder_receiver, responder_receiver,
); );
let t_thin_client = Self::thin_client_service(
obj.clone(),
exit.clone(),
verified_receiver,
responder_sender,
packet_recycler.clone(),
blob_recycler.clone(),
);
let mut threads = vec![ let mut threads = vec![
t_receiver, t_receiver,
t_responder, t_responder,
t_thin_client, t_thin_client,
t_sync, t_write,
t_gossip, t_gossip,
t_listen, t_listen,
t_broadcast, t_broadcast,
@ -329,6 +308,26 @@ impl Tpu {
Ok(threads) Ok(threads)
} }
/// Process verified blobs, already in order
/// Respond with a signed hash of the state
fn replicate_state(
obj: &Tpu,
verified_receiver: &streamer::BlobReceiver,
blob_recycler: &packet::BlobRecycler,
) -> Result<()> {
let timer = Duration::new(1, 0);
let blobs = verified_receiver.recv_timeout(timer)?;
trace!("replicating blobs {}", blobs.len());
let entries = ledger::reconstruct_entries_from_blobs(&blobs);
obj.accounting_stage
.accountant
.process_verified_entries(entries)?;
for blob in blobs {
blob_recycler.recycle(blob);
}
Ok(())
}
/// This service receives messages from a leader in the network and processes the transactions /// This service receives messages from a leader in the network and processes the transactions
/// on the accountant state. /// on the accountant state.
/// # Arguments /// # Arguments
@ -438,7 +437,7 @@ impl Tpu {
let verify_threads: Vec<_> = let verify_threads: Vec<_> =
Self::verifier_services(exit.clone(), packet_receiver, verified_sender); Self::verifier_services(exit.clone(), packet_receiver, verified_sender);
let t_sync = Self::write_service(obj.clone(), exit.clone()); let t_write = Self::drain_service(obj.clone(), exit.clone());
let t_thin_client = Self::thin_client_service( let t_thin_client = Self::thin_client_service(
obj.clone(), obj.clone(),
@ -461,7 +460,7 @@ impl Tpu {
t_packet_receiver, t_packet_receiver,
t_responder, t_responder,
t_thin_client, t_thin_client,
t_sync, t_write,
]; ];
threads.extend(verify_threads.into_iter()); threads.extend(verify_threads.into_iter());
Ok(threads) Ok(threads)