diff --git a/src/cluster_info.rs b/src/cluster_info.rs index cbbd335ad3..c208eb27c2 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -1014,8 +1014,8 @@ impl ClusterInfo { #[derive(Debug)] pub struct Sockets { pub gossip: UdpSocket, - pub replicate: Vec, - pub transaction: Vec, + pub tvu: Vec, + pub tpu: Vec, pub broadcast: UdpSocket, pub repair: UdpSocket, pub retransmit: UdpSocket, @@ -1033,9 +1033,9 @@ impl Node { Self::new_localhost_with_pubkey(pubkey) } pub fn new_localhost_with_pubkey(pubkey: Pubkey) -> Self { - let transaction = UdpSocket::bind("127.0.0.1:0").unwrap(); + let tpu = UdpSocket::bind("127.0.0.1:0").unwrap(); let gossip = UdpSocket::bind("127.0.0.1:0").unwrap(); - let replicate = UdpSocket::bind("127.0.0.1:0").unwrap(); + let tvu = UdpSocket::bind("127.0.0.1:0").unwrap(); let repair = UdpSocket::bind("127.0.0.1:0").unwrap(); let rpc_port = find_available_port_in_range((1024, 65535)).unwrap(); let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_port); @@ -1049,8 +1049,8 @@ impl Node { let info = NodeInfo::new( pubkey, gossip.local_addr().unwrap(), - replicate.local_addr().unwrap(), - transaction.local_addr().unwrap(), + tvu.local_addr().unwrap(), + tpu.local_addr().unwrap(), storage.local_addr().unwrap(), rpc_addr, rpc_pubsub_addr, @@ -1060,8 +1060,8 @@ impl Node { info, sockets: Sockets { gossip, - replicate: vec![replicate], - transaction: vec![transaction], + tvu: vec![tvu], + tpu: vec![tpu], broadcast, repair, retransmit, @@ -1082,10 +1082,10 @@ impl Node { bind() }; - let (replicate_port, replicate_sockets) = + let (tvu_port, tvu_sockets) = multi_bind_in_range(FULLNODE_PORT_RANGE, 8).expect("tvu multi_bind"); - let (transaction_port, transaction_sockets) = + let (tpu_port, tpu_sockets) = multi_bind_in_range(FULLNODE_PORT_RANGE, 32).expect("tpu multi_bind"); let (_, repair) = bind(); @@ -1096,8 +1096,8 @@ impl Node { let info = NodeInfo::new( pubkey, SocketAddr::new(gossip_addr.ip(), gossip_port), - SocketAddr::new(gossip_addr.ip(), replicate_port), - SocketAddr::new(gossip_addr.ip(), transaction_port), + SocketAddr::new(gossip_addr.ip(), tvu_port), + SocketAddr::new(gossip_addr.ip(), tpu_port), SocketAddr::new(gossip_addr.ip(), storage_port), SocketAddr::new(gossip_addr.ip(), RPC_PORT), SocketAddr::new(gossip_addr.ip(), RPC_PORT + 1), @@ -1109,8 +1109,8 @@ impl Node { info, sockets: Sockets { gossip, - replicate: replicate_sockets, - transaction: transaction_sockets, + tvu: tvu_sockets, + tpu: tpu_sockets, broadcast, repair, retransmit, @@ -1384,28 +1384,28 @@ mod tests { let ip = Ipv4Addr::from(0); let node = Node::new_with_external_ip(Keypair::new().pubkey(), &socketaddr!(ip, 0)); assert_eq!(node.sockets.gossip.local_addr().unwrap().ip(), ip); - assert!(node.sockets.replicate.len() > 1); - for tx_socket in node.sockets.replicate.iter() { + assert!(node.sockets.tvu.len() > 1); + for tx_socket in node.sockets.tvu.iter() { assert_eq!(tx_socket.local_addr().unwrap().ip(), ip); } - assert!(node.sockets.transaction.len() > 1); - for tx_socket in node.sockets.transaction.iter() { + assert!(node.sockets.tpu.len() > 1); + for tx_socket in node.sockets.tpu.iter() { assert_eq!(tx_socket.local_addr().unwrap().ip(), ip); } assert_eq!(node.sockets.repair.local_addr().unwrap().ip(), ip); assert!(node.sockets.gossip.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0); assert!(node.sockets.gossip.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1); - let tx_port = node.sockets.replicate[0].local_addr().unwrap().port(); + let tx_port = node.sockets.tvu[0].local_addr().unwrap().port(); assert!(tx_port >= FULLNODE_PORT_RANGE.0); assert!(tx_port < FULLNODE_PORT_RANGE.1); - for tx_socket in node.sockets.replicate.iter() { + for tx_socket in node.sockets.tvu.iter() { assert_eq!(tx_socket.local_addr().unwrap().port(), tx_port); } - let tx_port = node.sockets.transaction[0].local_addr().unwrap().port(); + let tx_port = node.sockets.tpu[0].local_addr().unwrap().port(); assert!(tx_port >= FULLNODE_PORT_RANGE.0); assert!(tx_port < FULLNODE_PORT_RANGE.1); - for tx_socket in node.sockets.transaction.iter() { + for tx_socket in node.sockets.tpu.iter() { assert_eq!(tx_socket.local_addr().unwrap().port(), tx_port); } assert!(node.sockets.repair.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0); @@ -1417,27 +1417,27 @@ mod tests { let ip = IpAddr::V4(Ipv4Addr::from(0)); let node = Node::new_with_external_ip(Keypair::new().pubkey(), &socketaddr!(0, 8050)); assert_eq!(node.sockets.gossip.local_addr().unwrap().ip(), ip); - assert!(node.sockets.replicate.len() > 1); - for tx_socket in node.sockets.replicate.iter() { + assert!(node.sockets.tvu.len() > 1); + for tx_socket in node.sockets.tvu.iter() { assert_eq!(tx_socket.local_addr().unwrap().ip(), ip); } - assert!(node.sockets.transaction.len() > 1); - for tx_socket in node.sockets.transaction.iter() { + assert!(node.sockets.tpu.len() > 1); + for tx_socket in node.sockets.tpu.iter() { assert_eq!(tx_socket.local_addr().unwrap().ip(), ip); } assert_eq!(node.sockets.repair.local_addr().unwrap().ip(), ip); assert_eq!(node.sockets.gossip.local_addr().unwrap().port(), 8050); - let tx_port = node.sockets.replicate[0].local_addr().unwrap().port(); + let tx_port = node.sockets.tvu[0].local_addr().unwrap().port(); assert!(tx_port >= FULLNODE_PORT_RANGE.0); assert!(tx_port < FULLNODE_PORT_RANGE.1); - for tx_socket in node.sockets.replicate.iter() { + for tx_socket in node.sockets.tvu.iter() { assert_eq!(tx_socket.local_addr().unwrap().port(), tx_port); } - let tx_port = node.sockets.transaction[0].local_addr().unwrap().port(); + let tx_port = node.sockets.tpu[0].local_addr().unwrap().port(); assert!(tx_port >= FULLNODE_PORT_RANGE.0); assert!(tx_port < FULLNODE_PORT_RANGE.1); - for tx_socket in node.sockets.transaction.iter() { + for tx_socket in node.sockets.tpu.iter() { assert_eq!(tx_socket.local_addr().unwrap().port(), tx_port); } assert!(node.sockets.repair.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0); diff --git a/src/contact_info.rs b/src/contact_info.rs index 5e12f04d5e..6eb3da4bae 100644 --- a/src/contact_info.rs +++ b/src/contact_info.rs @@ -119,16 +119,16 @@ impl ContactInfo { nxt_addr } pub fn new_with_pubkey_socketaddr(pubkey: Pubkey, bind_addr: &SocketAddr) -> Self { - let transactions_addr = *bind_addr; + let tpu_addr = *bind_addr; let gossip_addr = Self::next_port(&bind_addr, 1); - let replicate_addr = Self::next_port(&bind_addr, 2); + let tvu_addr = Self::next_port(&bind_addr, 2); let rpc_addr = SocketAddr::new(bind_addr.ip(), RPC_PORT); let rpc_pubsub_addr = SocketAddr::new(bind_addr.ip(), RPC_PORT + 1); ContactInfo::new( pubkey, gossip_addr, - replicate_addr, - transactions_addr, + tvu_addr, + tpu_addr, "0.0.0.0:0".parse().unwrap(), rpc_addr, rpc_pubsub_addr, @@ -267,7 +267,7 @@ mod tests { assert!(ci.storage_addr.ip().is_unspecified()); } #[test] - fn replicated_data_new_with_socketaddr_with_pubkey() { + fn replayed_data_new_with_socketaddr_with_pubkey() { let keypair = Keypair::new(); let d1 = ContactInfo::new_with_pubkey_socketaddr( keypair.pubkey().clone(), diff --git a/src/db_window.rs b/src/db_window.rs index 074f31f93b..c1c6859a31 100644 --- a/src/db_window.rs +++ b/src/db_window.rs @@ -54,12 +54,12 @@ pub fn repair( // In the case that we are not in the current scope of the leader schedule // window then either: // - // 1) The replicate stage hasn't caught up to the "consumed" entries we sent, + // 1) The replay stage hasn't caught up to the "consumed" entries we sent, // in which case it will eventually catch up // // 2) We are on the border between seed_rotation_intervals, so the // schedule won't be known until the entry on that cusp is received - // by the replicate stage (which comes after this stage). Hence, the next + // by the replay stage (which comes after this stage). Hence, the next // leader at the beginning of that next epoch will not know they are the // leader until they receive that last "cusp" entry. The leader also won't ask for repairs // for that entry because "is_next_leader" won't be set here. In this case, diff --git a/src/fullnode.rs b/src/fullnode.rs index ad8cbd99e9..c52286886d 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -100,10 +100,10 @@ pub struct Fullnode { ledger_path: String, sigverify_disabled: bool, shared_window: SharedWindow, - replicate_socket: Vec, + tvu_sockets: Vec, repair_socket: UdpSocket, retransmit_socket: UdpSocket, - transaction_sockets: Vec, + tpu_sockets: Vec, broadcast_socket: UdpSocket, rpc_addr: SocketAddr, rpc_pubsub_addr: SocketAddr, @@ -277,9 +277,9 @@ impl Fullnode { *last_entry_id, cluster_info.clone(), node.sockets - .replicate + .tvu .iter() - .map(|s| s.try_clone().expect("Failed to clone replicate sockets")) + .map(|s| s.try_clone().expect("Failed to clone TVU sockets")) .collect(), node.sockets .repair @@ -294,9 +294,9 @@ impl Fullnode { ); let tpu_forwarder = TpuForwarder::new( node.sockets - .transaction + .tpu .iter() - .map(|s| s.try_clone().expect("Failed to clone transaction sockets")) + .map(|s| s.try_clone().expect("Failed to clone TPU sockets")) .collect(), cluster_info.clone(), ); @@ -314,9 +314,9 @@ impl Fullnode { &bank, Default::default(), node.sockets - .transaction + .tpu .iter() - .map(|s| s.try_clone().expect("Failed to clone transaction sockets")) + .map(|s| s.try_clone().expect("Failed to clone TPU sockets")) .collect(), ledger_path, sigverify_disabled, @@ -356,10 +356,10 @@ impl Fullnode { node_role, ledger_path: ledger_path.to_owned(), exit, - replicate_socket: node.sockets.replicate, + tvu_sockets: node.sockets.tvu, repair_socket: node.sockets.repair, retransmit_socket: node.sockets.retransmit, - transaction_sockets: node.sockets.transaction, + tpu_sockets: node.sockets.tpu, broadcast_socket: node.sockets.broadcast, rpc_addr, rpc_pubsub_addr, @@ -435,9 +435,9 @@ impl Fullnode { entry_height, last_entry_id, self.cluster_info.clone(), - self.replicate_socket + self.tvu_sockets .iter() - .map(|s| s.try_clone().expect("Failed to clone replicate sockets")) + .map(|s| s.try_clone().expect("Failed to clone TVU sockets")) .collect(), self.repair_socket .try_clone() @@ -449,9 +449,9 @@ impl Fullnode { self.db_ledger.clone(), ); let tpu_forwarder = TpuForwarder::new( - self.transaction_sockets + self.tpu_sockets .iter() - .map(|s| s.try_clone().expect("Failed to clone transaction sockets")) + .map(|s| s.try_clone().expect("Failed to clone TPU sockets")) .collect(), self.cluster_info.clone(), ); @@ -476,15 +476,15 @@ impl Fullnode { let (tpu, blob_receiver, tpu_exit) = Tpu::new( &self.bank, Default::default(), - self.transaction_sockets + self.tpu_sockets .iter() - .map(|s| s.try_clone().expect("Failed to clone transaction sockets")) + .map(|s| s.try_clone().expect("Failed to clone TPU sockets")) .collect(), &self.ledger_path, self.sigverify_disabled, max_tick_height, - // We pass the last_entry_id from the replicate stage because we can't trust that - // the window didn't overwrite the slot at for the last entry that the replicate stage + // We pass the last_entry_id from the replay stage because we can't trust that + // the window didn't overwrite the slot at for the last entry that the replay stage // processed. We also want to avoid reading processing the ledger for the last id. &last_id, self.keypair.pubkey(), @@ -1017,12 +1017,8 @@ mod tests { // Send blobs to the validator from our mock leader let t_responder = { let (s_responder, r_responder) = channel(); - let blob_sockets: Vec> = leader_node - .sockets - .replicate - .into_iter() - .map(Arc::new) - .collect(); + let blob_sockets: Vec> = + leader_node.sockets.tvu.into_iter().map(Arc::new).collect(); let t_responder = responder( "test_validator_to_leader_transition", diff --git a/src/lib.rs b/src/lib.rs index 528703c06d..44072b4304 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -50,7 +50,7 @@ pub mod poh; pub mod poh_recorder; pub mod poh_service; pub mod recvmmsg; -pub mod replicate_stage; +pub mod replay_stage; pub mod replicator; pub mod result; pub mod retransmit_stage; diff --git a/src/replicate_stage.rs b/src/replay_stage.rs similarity index 89% rename from src/replicate_stage.rs rename to src/replay_stage.rs index c2b1111f23..f425768325 100644 --- a/src/replicate_stage.rs +++ b/src/replay_stage.rs @@ -1,4 +1,4 @@ -//! The `replicate_stage` replicates transactions broadcast by the leader. +//! The `replay_stage` replays transactions broadcast by the leader. use crate::bank::Bank; use crate::cluster_info::ClusterInfo; @@ -26,11 +26,11 @@ use std::time::Duration; use std::time::Instant; #[derive(Debug, PartialEq, Eq, Clone)] -pub enum ReplicateStageReturnType { +pub enum ReplayStageReturnType { LeaderRotation(u64, u64, Hash), } -// Implement a destructor for the ReplicateStage thread to signal it exited +// Implement a destructor for the ReplayStage thread to signal it exited // even on panics struct Finalizer { exit_sender: Arc, @@ -48,14 +48,14 @@ impl Drop for Finalizer { } } -pub struct ReplicateStage { +pub struct ReplayStage { t_responder: JoinHandle<()>, - t_replicate: JoinHandle>, + t_replay: JoinHandle>, } -impl ReplicateStage { +impl ReplayStage { /// Process entry blobs, already in order - fn replicate_requests( + fn process_entries( bank: &Arc, cluster_info: &Arc>, window_receiver: &EntryReceiver, @@ -74,7 +74,7 @@ impl ReplicateStage { } submit( - influxdb::Point::new("replicate-stage") + influxdb::Point::new("replay-stage") .add_field("count", influxdb::Value::Integer(entries.len() as i64)) .to_owned(), ); @@ -83,11 +83,11 @@ impl ReplicateStage { let mut num_entries_to_write = entries.len(); let now = Instant::now(); if !entries.as_slice().verify(last_entry_id) { - inc_new_counter_info!("replicate_stage-verify-fail", entries.len()); + inc_new_counter_info!("replay_stage-verify-fail", entries.len()); return Err(Error::BlobError(BlobError::VerificationFailed)); } inc_new_counter_info!( - "replicate_stage-verify-duration", + "replay_stage-verify-duration", duration_as_ms(&now.elapsed()) as usize ); let (current_leader, _) = bank @@ -128,7 +128,7 @@ impl ReplicateStage { .id; inc_new_counter_info!( - "replicate-transactions", + "replay-transactions", entries.iter().map(|x| x.transactions.len()).sum() ); @@ -164,12 +164,12 @@ impl ReplicateStage { let (vote_blob_sender, vote_blob_receiver) = channel(); let (ledger_entry_sender, ledger_entry_receiver) = channel(); let send = UdpSocket::bind("0.0.0.0:0").expect("bind"); - let t_responder = responder("replicate_stage", Arc::new(send), vote_blob_receiver); + let t_responder = responder("replay_stage", Arc::new(send), vote_blob_receiver); let keypair = Arc::new(keypair); - let t_replicate = Builder::new() - .name("solana-replicate-stage".to_string()) + let t_replay = Builder::new() + .name("solana-replay-stage".to_string()) .spawn(move || { let _exit = Finalizer::new(exit); let now = Instant::now(); @@ -182,7 +182,7 @@ impl ReplicateStage { .expect("Scheduled leader id should never be unknown at this point"); if leader_id == keypair.pubkey() { - return Some(ReplicateStageReturnType::LeaderRotation( + return Some(ReplayStageReturnType::LeaderRotation( bank.tick_height(), entry_height_, // We should never start the TPU / this stage on an exact entry that causes leader @@ -201,7 +201,7 @@ impl ReplicateStage { None }; - match Self::replicate_requests( + match Self::process_entries( &bank, &cluster_info, &window_receiver, @@ -226,19 +226,19 @@ impl ReplicateStage { ( Self { t_responder, - t_replicate, + t_replay, }, ledger_entry_receiver, ) } } -impl Service for ReplicateStage { - type JoinReturnType = Option; +impl Service for ReplayStage { + type JoinReturnType = Option; - fn join(self) -> thread::Result> { + fn join(self) -> thread::Result> { self.t_responder.join()?; - self.t_replicate.join() + self.t_replay.join() } } @@ -254,7 +254,7 @@ mod test { use crate::ledger::{create_ticks, create_tmp_sample_ledger, LedgerWriter}; use crate::logger; use crate::packet::BlobError; - use crate::replicate_stage::{ReplicateStage, ReplicateStageReturnType}; + use crate::replay_stage::{ReplayStage, ReplayStageReturnType}; use crate::result::Error; use crate::service::Service; use crate::vote_stage::{send_validator_vote, VoteError}; @@ -266,10 +266,10 @@ mod test { use std::sync::{Arc, RwLock}; #[test] - pub fn test_replicate_stage_leader_rotation_exit() { + pub fn test_replay_stage_leader_rotation_exit() { logger::setup(); - // Set up dummy node to host a ReplicateStage + // Set up dummy node to host a ReplayStage let my_keypair = Keypair::new(); let my_id = my_keypair.pubkey(); let my_node = Node::new_localhost_with_pubkey(my_id); @@ -281,7 +281,7 @@ mod test { // Create a ledger let num_ending_ticks = 1; let (mint, my_ledger_path, genesis_entries) = create_tmp_sample_ledger( - "test_replicate_stage_leader_rotation_exit", + "test_replay_stage_leader_rotation_exit", 10_000, num_ending_ticks, old_leader_id, @@ -327,10 +327,10 @@ mod test { let (bank, _, last_entry_id) = Fullnode::new_bank_from_ledger(&my_ledger_path, leader_scheduler); - // Set up the replicate stage + // Set up the replay stage let (entry_sender, entry_receiver) = channel(); let exit = Arc::new(AtomicBool::new(false)); - let (replicate_stage, ledger_writer_recv) = ReplicateStage::new( + let (replay_stage, ledger_writer_recv) = ReplayStage::new( Arc::new(my_keypair), Arc::new(vote_account_keypair), Arc::new(bank), @@ -363,14 +363,14 @@ mod test { let expected_last_id = entries_to_send[leader_rotation_index].id; entry_sender.send(entries_to_send.clone()).unwrap(); - // Wait for replicate_stage to exit and check return value is correct + // Wait for replay_stage to exit and check return value is correct assert_eq!( - Some(ReplicateStageReturnType::LeaderRotation( + Some(ReplayStageReturnType::LeaderRotation( bootstrap_height, expected_entry_height, expected_last_id, )), - replicate_stage.join().expect("replicate stage join") + replay_stage.join().expect("replay stage join") ); // Check that the entries on the ledger writer channel are correct @@ -389,8 +389,8 @@ mod test { } #[test] - fn test_vote_error_replicate_stage_correctness() { - // Set up dummy node to host a ReplicateStage + fn test_vote_error_replay_stage_correctness() { + // Set up dummy node to host a ReplayStage let my_keypair = Keypair::new(); let my_id = my_keypair.pubkey(); let my_node = Node::new_localhost_with_pubkey(my_id); @@ -401,7 +401,7 @@ mod test { let num_ending_ticks = 0; let (_, my_ledger_path, genesis_entries) = create_tmp_sample_ledger( - "test_vote_error_replicate_stage_correctness", + "test_vote_error_replay_stage_correctness", 10_000, num_ending_ticks, leader_id, @@ -417,12 +417,12 @@ mod test { // Set up the cluster info let cluster_info_me = Arc::new(RwLock::new(ClusterInfo::new(my_node.info.clone()))); - // Set up the replicate stage + // Set up the replay stage let vote_account_keypair = Arc::new(Keypair::new()); let bank = Arc::new(bank); let (entry_sender, entry_receiver) = channel(); let exit = Arc::new(AtomicBool::new(false)); - let (replicate_stage, ledger_writer_recv) = ReplicateStage::new( + let (replay_stage, ledger_writer_recv) = ReplayStage::new( Arc::new(my_keypair), vote_account_keypair.clone(), bank.clone(), @@ -444,7 +444,7 @@ mod test { panic!("Expected validator vote to fail with LeaderInfoNotFound"); } - // Send ReplicateStage an entry, should see it on the ledger writer receiver + // Send ReplayStage an entry, should see it on the ledger writer receiver let next_tick = create_ticks( 1, genesis_entries @@ -454,22 +454,22 @@ mod test { ); entry_sender .send(next_tick.clone()) - .expect("Error sending entry to ReplicateStage"); + .expect("Error sending entry to ReplayStage"); let received_tick = ledger_writer_recv .recv() .expect("Expected to recieve an entry on the ledger writer receiver"); assert_eq!(next_tick, received_tick); drop(entry_sender); - replicate_stage + replay_stage .join() - .expect("Expect successful ReplicateStage exit"); + .expect("Expect successful ReplayStage exit"); let _ignored = remove_dir_all(&my_ledger_path); } #[test] - fn test_vote_error_replicate_stage_leader_rotation() { - // Set up dummy node to host a ReplicateStage + fn test_vote_error_replay_stage_leader_rotation() { + // Set up dummy node to host a ReplayStage let my_keypair = Keypair::new(); let my_id = my_keypair.pubkey(); let my_node = Node::new_localhost_with_pubkey(my_id); @@ -479,7 +479,7 @@ mod test { // Create the ledger let (mint, my_ledger_path, genesis_entries) = create_tmp_sample_ledger( - "test_vote_error_replicate_stage_leader_rotation", + "test_vote_error_replay_stage_leader_rotation", 10_000, 0, leader_id, @@ -529,12 +529,12 @@ mod test { // Set up the cluster info let cluster_info_me = Arc::new(RwLock::new(ClusterInfo::new(my_node.info.clone()))); - // Set up the replicate stage + // Set up the replay stage let vote_account_keypair = Arc::new(vote_account_keypair); let bank = Arc::new(bank); let (entry_sender, entry_receiver) = channel(); let exit = Arc::new(AtomicBool::new(false)); - let (replicate_stage, ledger_writer_recv) = ReplicateStage::new( + let (replay_stage, ledger_writer_recv) = ReplayStage::new( Arc::new(my_keypair), vote_account_keypair.clone(), bank.clone(), @@ -571,7 +571,7 @@ mod test { last_id = entry.id; entry_sender .send(vec![entry.clone()]) - .expect("Expected to be able to send entry to ReplicateStage"); + .expect("Expected to be able to send entry to ReplayStage"); // Check that the entries on the ledger writer channel are correct let received_entry = ledger_writer_recv .recv() @@ -585,14 +585,14 @@ mod test { assert_ne!(expected_last_id, Hash::default()); - // Wait for replicate_stage to exit and check return value is correct + // Wait for replay_stage to exit and check return value is correct assert_eq!( - Some(ReplicateStageReturnType::LeaderRotation( + Some(ReplayStageReturnType::LeaderRotation( bootstrap_height, expected_entry_height, expected_last_id, )), - replicate_stage.join().expect("replicate stage join") + replay_stage.join().expect("replay stage join") ); assert_eq!(exit.load(Ordering::Relaxed), true); @@ -600,8 +600,8 @@ mod test { } #[test] - fn test_replicate_stage_poh_error_entry_receiver() { - // Set up dummy node to host a ReplicateStage + fn test_replay_stage_poh_error_entry_receiver() { + // Set up dummy node to host a ReplayStage let my_keypair = Keypair::new(); let my_id = my_keypair.pubkey(); let vote_keypair = Keypair::new(); @@ -615,7 +615,7 @@ mod test { let old_leader_id = Keypair::new().pubkey(); let (_, my_ledger_path, _) = create_tmp_sample_ledger( - "test_replicate_stage_leader_rotation_exit", + "test_replay_stage_leader_rotation_exit", 10_000, 0, old_leader_id, @@ -634,7 +634,7 @@ mod test { .send(entries.clone()) .expect("Expected to err out"); - let res = ReplicateStage::replicate_requests( + let res = ReplayStage::process_entries( &Arc::new(Bank::default()), &cluster_info_me, &entry_receiver, @@ -660,7 +660,7 @@ mod test { .send(entries.clone()) .expect("Expected to err out"); - let res = ReplicateStage::replicate_requests( + let res = ReplayStage::process_entries( &Arc::new(Bank::default()), &cluster_info_me, &entry_receiver, diff --git a/src/replicator.rs b/src/replicator.rs index b135b8ceb4..4252f9f099 100644 --- a/src/replicator.rs +++ b/src/replicator.rs @@ -164,7 +164,7 @@ impl Replicator { let repair_socket = Arc::new(node.sockets.repair); let mut blob_sockets: Vec> = - node.sockets.replicate.into_iter().map(Arc::new).collect(); + node.sockets.tvu.into_iter().map(Arc::new).collect(); blob_sockets.push(repair_socket.clone()); let (fetch_stage, blob_fetch_receiver) = BlobFetchStage::new_multi_socket(blob_sockets, exit.clone()); diff --git a/src/tvu.rs b/src/tvu.rs index 60d2671291..0ccd261e8c 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -1,21 +1,25 @@ //! The `tvu` module implements the Transaction Validation Unit, a -//! 3-stage transaction validation pipeline in software. +//! 5-stage transaction validation pipeline in software. //! -//! 1. Fetch Stage -//! - Incoming blobs are picked up from the replicate socket and repair socket. -//! 2. SharedWindow Stage +//! 1. BlobFetchStage +//! - Incoming blobs are picked up from the TVU sockets and repair socket. +//! 2. RetransmitStage //! - Blobs are windowed until a contiguous chunk is available. This stage also repairs and //! retransmits blobs that are in the queue. -//! 3. Replicate Stage +//! 3. ReplayStage //! - Transactions in blobs are processed and applied to the bank. //! - TODO We need to verify the signatures in the blobs. +//! 4. LedgerWriteStage +//! - Write the replayed ledger to disk. +//! 5. StorageStage +//! - Generating the keys used to encrypt the ledger and sample it for storage mining. use crate::bank::Bank; use crate::blob_fetch_stage::BlobFetchStage; use crate::cluster_info::ClusterInfo; use crate::db_ledger::DbLedger; use crate::ledger_write_stage::LedgerWriteStage; -use crate::replicate_stage::{ReplicateStage, ReplicateStageReturnType}; +use crate::replay_stage::{ReplayStage, ReplayStageReturnType}; use crate::retransmit_stage::RetransmitStage; use crate::service::Service; use crate::storage_stage::StorageStage; @@ -32,9 +36,9 @@ pub enum TvuReturnType { } pub struct Tvu { - replicate_stage: ReplicateStage, fetch_stage: BlobFetchStage, retransmit_stage: RetransmitStage, + replay_stage: ReplayStage, ledger_write_stage: LedgerWriteStage, storage_stage: StorageStage, exit: Arc, @@ -50,7 +54,7 @@ impl Tvu { /// * `entry_height` - Initial ledger height /// * `cluster_info` - The cluster_info state. /// * `window` - The window state. - /// * `replicate_socket` - my replicate socket + /// * `fetch_sockets` - my fetch sockets /// * `repair_socket` - my repair socket /// * `retransmit_socket` - my retransmit socket /// * `ledger_path` - path to the ledger file @@ -62,7 +66,7 @@ impl Tvu { entry_height: u64, last_entry_id: Hash, cluster_info: Arc>, - replicate_sockets: Vec, + fetch_sockets: Vec, repair_socket: UdpSocket, retransmit_socket: UdpSocket, ledger_path: Option<&str>, @@ -72,7 +76,7 @@ impl Tvu { let repair_socket = Arc::new(repair_socket); let mut blob_sockets: Vec> = - replicate_sockets.into_iter().map(Arc::new).collect(); + fetch_sockets.into_iter().map(Arc::new).collect(); blob_sockets.push(repair_socket.clone()); let (fetch_stage, blob_fetch_receiver) = BlobFetchStage::new_multi_socket(blob_sockets, exit.clone()); @@ -91,7 +95,7 @@ impl Tvu { bank.leader_scheduler.clone(), ); - let (replicate_stage, ledger_entry_receiver) = ReplicateStage::new( + let (replay_stage, ledger_entry_receiver) = ReplayStage::new( keypair.clone(), vote_account_keypair, bank.clone(), @@ -115,9 +119,9 @@ impl Tvu { ); Tvu { - replicate_stage, fetch_stage, retransmit_stage, + replay_stage, ledger_write_stage, storage_stage, exit, @@ -146,8 +150,8 @@ impl Service for Tvu { self.fetch_stage.join()?; self.ledger_write_stage.join()?; self.storage_stage.join()?; - match self.replicate_stage.join()? { - Some(ReplicateStageReturnType::LeaderRotation( + match self.replay_stage.join()? { + Some(ReplayStageReturnType::LeaderRotation( tick_height, entry_height, last_entry_id, @@ -200,10 +204,10 @@ pub mod tests { (gossip_service, window) } - /// Test that message sent from leader to target1 and replicated to target2 + /// Test that message sent from leader to target1 and replayed to target2 #[test] #[ignore] - fn test_replicate() { + fn test_replay() { logger::setup(); let leader = Node::new_localhost(); let target1_keypair = Keypair::new(); @@ -230,26 +234,22 @@ pub mod tests { // to simulate the source peer and get blobs out of the socket to // simulate target peer let (s_reader, r_reader) = channel(); - let blob_sockets: Vec> = target2 - .sockets - .replicate - .into_iter() - .map(Arc::new) - .collect(); + let blob_sockets: Vec> = + target2.sockets.tvu.into_iter().map(Arc::new).collect(); let t_receiver = streamer::blob_receiver(blob_sockets[0].clone(), exit.clone(), s_reader); // simulate leader sending messages let (s_responder, r_responder) = channel(); let t_responder = streamer::responder( - "test_replicate", + "test_replay", Arc::new(leader.sockets.retransmit), r_responder, ); let starting_balance = 10_000; let mint = Mint::new(starting_balance); - let replicate_addr = target1.info.tvu; + let tvu_addr = target1.info.tvu; let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( leader_id, ))); @@ -266,7 +266,7 @@ pub mod tests { let vote_account_keypair = Arc::new(Keypair::new()); let mut cur_hash = Hash::default(); - let db_ledger_path = get_tmp_ledger_path("test_replicate"); + let db_ledger_path = get_tmp_ledger_path("test_replay"); let db_ledger = DbLedger::open(&db_ledger_path).expect("Expected to successfully open ledger"); let tvu = Tvu::new( @@ -276,7 +276,7 @@ pub mod tests { 0, cur_hash, cref1, - target1.sockets.replicate, + target1.sockets.tvu, target1.sockets.repair, target1.sockets.retransmit, None, @@ -324,7 +324,7 @@ pub mod tests { w.data_mut()[..serialized_entry.len()].copy_from_slice(&serialized_entry); w.set_size(serialized_entry.len()); - w.meta.set_addr(&replicate_addr); + w.meta.set_addr(&tvu_addr); } msgs.push(b); } diff --git a/src/vote_stage.rs b/src/vote_stage.rs index eb18025ed4..2a1584bc20 100644 --- a/src/vote_stage.rs +++ b/src/vote_stage.rs @@ -75,7 +75,7 @@ pub fn send_validator_vote( let last_id = bank.last_id(); let shared_blob = create_new_signed_vote_blob(&last_id, vote_account, bank, cluster_info)?; - inc_new_counter_info!("replicate-vote_sent", 1); + inc_new_counter_info!("validator-vote_sent", 1); vote_blob_sender.send(vec![shared_blob])?; Ok(()) diff --git a/src/window.rs b/src/window.rs index 68f29d197f..448c20c57f 100644 --- a/src/window.rs +++ b/src/window.rs @@ -156,12 +156,12 @@ impl WindowUtil for Window { // In the case that we are not in the current scope of the leader schedule // window then either: // - // 1) The replicate stage hasn't caught up to the "consumed" entries we sent, + // 1) The replay stage hasn't caught up to the "consumed" entries we sent, // in which case it will eventually catch up // // 2) We are on the border between seed_rotation_intervals, so the // schedule won't be known until the entry on that cusp is received - // by the replicate stage (which comes after this stage). Hence, the next + // by the replay stage (which comes after this stage). Hence, the next // leader at the beginning of that next epoch will not know they are the // leader until they receive that last "cusp" entry. The leader also won't ask for repairs // for that entry because "is_next_leader" won't be set here. In this case, diff --git a/src/window_service.rs b/src/window_service.rs index ff2a678cb8..d7313ffdea 100644 --- a/src/window_service.rs +++ b/src/window_service.rs @@ -295,7 +295,7 @@ mod test { let t_responder = { let (s_responder, r_responder) = channel(); let blob_sockets: Vec> = - tn.sockets.replicate.into_iter().map(Arc::new).collect(); + tn.sockets.tvu.into_iter().map(Arc::new).collect(); let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder); let num_blobs_to_make = 10; @@ -365,7 +365,7 @@ mod test { let t_responder = { let (s_responder, r_responder) = channel(); let blob_sockets: Vec> = - tn.sockets.replicate.into_iter().map(Arc::new).collect(); + tn.sockets.tvu.into_iter().map(Arc::new).collect(); let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder); let mut msgs = Vec::new(); for v in 0..10 { diff --git a/tests/data_replicator.rs b/tests/data_replicator.rs index dd7c11a345..81d48b5c58 100644 --- a/tests/data_replicator.rs +++ b/tests/data_replicator.rs @@ -24,7 +24,7 @@ fn test_node(exit: Arc) -> (Arc>, GossipService, let w = Arc::new(RwLock::new(vec![])); let d = GossipService::new(&c.clone(), w, None, tn.sockets.gossip, exit); let _ = c.read().unwrap().my_data(); - (c, d, tn.sockets.replicate.pop().unwrap()) + (c, d, tn.sockets.tvu.pop().unwrap()) } /// Test that the network converges. diff --git a/tests/multinode.rs b/tests/multinode.rs index 8f232737fd..467cd02186 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -1548,7 +1548,7 @@ fn test_broadcast_last_tick() { .iter_mut() .map(|(_, _, node, _)| { BlobFetchStage::new( - Arc::new(node.sockets.replicate.pop().unwrap()), + Arc::new(node.sockets.tvu.pop().unwrap()), blob_receiver_exit.clone(), ) })