From 6da7a784f2718938938c9c00fa25b2faf4e9d93f Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Tue, 29 Jan 2019 00:21:27 -0800 Subject: [PATCH] Stream entries (#2582) * Add entry streaming option * Fix tests * Remove obsolete comment * Move entry stream functionality to struct w/ trait in order to test without i/o --- fullnode/src/main.rs | 9 +++++ multinode-demo/fullnode.sh | 10 ++++- src/entry_stream.rs | 57 ++++++++++++++++++++++++++++ src/fullnode.rs | 15 ++++++++ src/lib.rs | 1 + src/replay_stage.rs | 76 ++++++++++++++++++++++++++++++++++++-- src/thin_client.rs | 1 + src/tvu.rs | 3 ++ tests/multinode.rs | 22 +++++++++++ tests/replicator.rs | 4 ++ 10 files changed, 192 insertions(+), 6 deletions(-) create mode 100644 src/entry_stream.rs diff --git a/fullnode/src/main.rs b/fullnode/src/main.rs index c081d5c7a..fe388c34e 100644 --- a/fullnode/src/main.rs +++ b/fullnode/src/main.rs @@ -130,6 +130,13 @@ fn main() { let matches = App::new("fullnode") .version(crate_version!()) + .arg( + Arg::with_name("entry_stream") + .long("entry-stream") + .takes_value(true) + .value_name("UNIX DOMAIN SOCKET") + .help("Open entry stream at this unix domain socket location") + ) .arg( Arg::with_name("identity") .short("i") @@ -227,6 +234,7 @@ fn main() { .expect("unable to allocate rpc port") }; let init_complete_file = matches.value_of("init_complete_file"); + let entry_stream = matches.value_of("entry_stream").map(|s| s.to_string()); let keypair = Arc::new(keypair); let node = Node::new_with_external_ip(keypair.pubkey(), &gossip); @@ -263,6 +271,7 @@ fn main() { .as_ref(), no_sigverify, Some(rpc_port), + entry_stream, ); if !no_signer { diff --git a/multinode-demo/fullnode.sh b/multinode-demo/fullnode.sh index dcda60a77..75bf6252b 100755 --- a/multinode-demo/fullnode.sh +++ b/multinode-demo/fullnode.sh @@ -21,13 +21,14 @@ usage() { echo fi cat < Result<()>; +} + +pub struct EntryStream { + pub socket: String, +} + +impl EntryStream { + pub fn new(socket: String) -> Self { + EntryStream { socket } + } +} + +impl EntryStreamHandler for EntryStream { + fn stream_entries(&mut self, entries: &[Entry]) -> Result<()> { + let mut socket = UnixStream::connect(Path::new(&self.socket))?; + for entry in entries { + let result = serde_json::to_string(&entry)?; + socket.write_all(result.as_bytes())?; + } + socket.shutdown(Shutdown::Write)?; + Ok(()) + } +} + +pub struct MockEntryStream { + pub socket: Vec, +} + +impl MockEntryStream { + #[allow(clippy::needless_pass_by_value)] + pub fn new(_socket: String) -> Self { + MockEntryStream { socket: Vec::new() } + } +} + +impl EntryStreamHandler for MockEntryStream { + fn stream_entries(&mut self, entries: &[Entry]) -> Result<()> { + for entry in entries { + let result = serde_json::to_string(&entry)?; + self.socket.push(result); + } + Ok(()) + } +} diff --git a/src/fullnode.rs b/src/fullnode.rs index 6ace8624a..8f87c1b47 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -95,6 +95,7 @@ impl Fullnode { entrypoint_info_option: Option<&NodeInfo>, sigverify_disabled: bool, rpc_port: Option, + entry_stream: Option, ) -> Self { Self::new_with_storage_rotate( node, @@ -106,9 +107,11 @@ impl Fullnode { leader_scheduler, rpc_port, NUM_HASHES_FOR_STORAGE_ROTATE, + entry_stream, ) } + #[allow(clippy::too_many_arguments)] pub fn new_with_storage_rotate( node: Node, keypair: Arc, @@ -119,6 +122,7 @@ impl Fullnode { leader_scheduler: Arc>, rpc_port: Option, storage_rotate_count: u64, + entry_stream: Option, ) -> Self { let (genesis_block, db_ledger) = Self::make_db_ledger(ledger_path); let (bank, entry_height, last_entry_id) = @@ -135,6 +139,7 @@ impl Fullnode { sigverify_disabled, rpc_port, storage_rotate_count, + entry_stream, ) } @@ -150,6 +155,7 @@ impl Fullnode { entrypoint_info_option: Option<&NodeInfo>, sigverify_disabled: bool, rpc_port: Option, + entry_stream: Option, ) -> Self { let (_genesis_block, db_ledger) = Self::make_db_ledger(ledger_path); Self::new_with_bank_and_db_ledger( @@ -164,6 +170,7 @@ impl Fullnode { sigverify_disabled, rpc_port, NUM_HASHES_FOR_STORAGE_ROTATE, + entry_stream, ) } @@ -180,6 +187,7 @@ impl Fullnode { sigverify_disabled: bool, rpc_port: Option, storage_rotate_count: u64, + entry_stream: Option, ) -> Self { let mut rpc_addr = node.info.rpc; let mut rpc_pubsub_addr = node.info.rpc_pubsub; @@ -299,6 +307,7 @@ impl Fullnode { storage_rotate_count, to_leader_sender, &storage_state, + entry_stream, ); let max_tick_height = { let ls_lock = bank.leader_scheduler.read().unwrap(); @@ -564,6 +573,7 @@ mod tests { Some(&entry), false, None, + None, ); v.close().unwrap(); remove_dir_all(validator_ledger_path).unwrap(); @@ -606,6 +616,7 @@ mod tests { Some(&entry), false, None, + None, ) }) .collect(); @@ -677,6 +688,7 @@ mod tests { Some(&bootstrap_leader_info), false, None, + None, ); // Wait for the leader to transition, ticks should cause the leader to @@ -781,6 +793,7 @@ mod tests { Some(&bootstrap_leader_info), false, None, + None, ); assert!(!bootstrap_leader.node_services.tpu.is_leader()); @@ -795,6 +808,7 @@ mod tests { Some(&bootstrap_leader_info), false, None, + None, ); assert!(validator.node_services.tpu.is_leader()); @@ -890,6 +904,7 @@ mod tests { Some(&leader_node.info), false, None, + None, ); // Send blobs to the validator from our mock leader diff --git a/src/lib.rs b/src/lib.rs index 3ecae922a..b268ff9b1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -34,6 +34,7 @@ pub mod compute_leader_confirmation_service; pub mod db_ledger; pub mod db_window; pub mod entry; +pub mod entry_stream; #[cfg(feature = "erasure")] pub mod erasure; pub mod fetch_stage; diff --git a/src/replay_stage.rs b/src/replay_stage.rs index 70f19bbbb..2a6638b56 100644 --- a/src/replay_stage.rs +++ b/src/replay_stage.rs @@ -3,8 +3,12 @@ use crate::bank::Bank; use crate::cluster_info::ClusterInfo; use crate::counter::Counter; -use crate::entry::EntrySlice; -use crate::entry::{EntryReceiver, EntrySender}; +use crate::entry::{EntryReceiver, EntrySender, EntrySlice}; +#[cfg(not(test))] +use crate::entry_stream::EntryStream; +use crate::entry_stream::EntryStreamHandler; +#[cfg(test)] +use crate::entry_stream::MockEntryStream as EntryStream; use crate::fullnode::TvuRotationSender; use crate::leader_scheduler::TICKS_PER_BLOCK; use crate::packet::BlobError; @@ -65,6 +69,7 @@ impl ReplayStage { ledger_entry_sender: &EntrySender, entry_height: &Arc>, last_entry_id: &Arc>, + entry_stream: Option<&mut EntryStream>, ) -> Result<()> { let timer = Duration::new(1, 0); //coalesce all the available entries into a single vote @@ -76,6 +81,12 @@ impl ReplayStage { } } + if let Some(stream) = entry_stream { + stream.stream_entries(&entries).unwrap_or_else(|e| { + error!("Entry Stream error: {:?}, {:?}", e, stream.socket); + }); + } + submit( influxdb::Point::new("replicate-stage") .add_field("count", influxdb::Value::Integer(entries.len() as i64)) @@ -192,7 +203,7 @@ impl ReplayStage { Ok(()) } - #[allow(clippy::new_ret_no_self)] + #[allow(clippy::new_ret_no_self, clippy::too_many_arguments)] pub fn new( keypair: Arc, vote_signer: Option>, @@ -203,6 +214,7 @@ impl ReplayStage { entry_height: Arc>, last_entry_id: Arc>, to_leader_sender: TvuRotationSender, + entry_stream: Option, ) -> (Self, EntryReceiver) { let (vote_blob_sender, vote_blob_receiver) = channel(); let (ledger_entry_sender, ledger_entry_receiver) = channel(); @@ -219,6 +231,7 @@ impl ReplayStage { let (mut last_leader_id, _) = bank .get_current_leader() .expect("Scheduled leader should be calculated by this point"); + let mut entry_stream = entry_stream.map(EntryStream::new); loop { let (leader_id, _) = bank .get_current_leader() @@ -244,6 +257,7 @@ impl ReplayStage { &ledger_entry_sender, &entry_height_.clone(), &last_entry_id.clone(), + entry_stream.as_mut(), ) { Err(Error::RecvTimeoutError(RecvTimeoutError::Disconnected)) => break, Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (), @@ -275,6 +289,7 @@ impl Service for ReplayStage { #[cfg(test)] mod test { + use super::*; use crate::bank::Bank; use crate::cluster_info::{ClusterInfo, Node}; use crate::db_ledger::create_tmp_sample_ledger; @@ -383,6 +398,7 @@ mod test { Arc::new(RwLock::new(initial_entry_len)), Arc::new(RwLock::new(last_entry_id)), rotation_sender, + None, ); // Send enough ticks to trigger leader rotation @@ -485,6 +501,7 @@ mod test { Arc::new(RwLock::new(initial_entry_len as u64)), Arc::new(RwLock::new(last_entry_id)), to_leader_sender, + None, ); // Vote sender should error because no leader contact info is found in the @@ -603,6 +620,7 @@ mod test { Arc::new(RwLock::new(initial_entry_len as u64)), Arc::new(RwLock::new(last_entry_id)), rotation_tx, + None, ); // Vote sender should error because no leader contact info is found in the @@ -673,7 +691,6 @@ mod test { let (entry_sender, entry_receiver) = channel(); let (ledger_entry_sender, _ledger_entry_receiver) = channel(); let last_entry_id = Hash::default(); - // Create keypair for the old leader let entry_height = 0; let mut last_id = Hash::default(); @@ -699,6 +716,7 @@ mod test { &ledger_entry_sender, &Arc::new(RwLock::new(entry_height)), &Arc::new(RwLock::new(last_entry_id)), + None, ); match res { @@ -725,6 +743,7 @@ mod test { &ledger_entry_sender, &Arc::new(RwLock::new(entry_height)), &Arc::new(RwLock::new(last_entry_id)), + None, ); match res { @@ -737,4 +756,53 @@ mod test { ), } } + + #[test] + fn test_replay_stage_stream_entries() { + // Set up entry stream + let mut entry_stream = EntryStream::new("test_stream".to_string()); + + // Set up dummy node to host a ReplayStage + let my_keypair = Keypair::new(); + let my_id = my_keypair.pubkey(); + let my_node = Node::new_localhost_with_pubkey(my_id); + // Set up the cluster info + let cluster_info_me = Arc::new(RwLock::new(ClusterInfo::new(my_node.info.clone()))); + let (entry_sender, entry_receiver) = channel(); + let (ledger_entry_sender, _ledger_entry_receiver) = channel(); + let last_entry_id = Hash::default(); + + let entry_height = 0; + let mut last_id = Hash::default(); + let mut entries = Vec::new(); + let mut expected_entries = Vec::new(); + for _ in 0..5 { + let entry = Entry::new(&mut last_id, 0, 1, vec![]); //just ticks + last_id = entry.id; + expected_entries.push(serde_json::to_string(&entry).unwrap()); + entries.push(entry); + } + entry_sender + .send(entries.clone()) + .expect("Expected to err out"); + + let my_keypair = Arc::new(my_keypair); + let vote_signer = Arc::new(VoteSignerProxy::new_local(&my_keypair)); + ReplayStage::process_entries( + &Arc::new(Bank::default()), + &cluster_info_me, + &entry_receiver, + &my_keypair, + Some(&vote_signer), + None, + &ledger_entry_sender, + &Arc::new(RwLock::new(entry_height)), + &Arc::new(RwLock::new(last_entry_id)), + Some(&mut entry_stream), + ) + .unwrap(); + + assert_eq!(entry_stream.socket.len(), 5); + assert_eq!(entry_stream.socket, expected_entries); + } } diff --git a/src/thin_client.rs b/src/thin_client.rs index db906db6e..35e3acb7f 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -474,6 +474,7 @@ pub fn new_fullnode( None, false, None, + None, ); (server, leader_data, genesis_block, alice, ledger_path) diff --git a/src/tvu.rs b/src/tvu.rs index 41f5f474e..3a1cca9be 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -72,6 +72,7 @@ impl Tvu { storage_rotate_count: u64, to_leader_sender: TvuRotationSender, storage_state: &StorageState, + entry_stream: Option, ) -> Self { let exit = Arc::new(AtomicBool::new(false)); let keypair: Arc = cluster_info @@ -121,6 +122,7 @@ impl Tvu { l_entry_height.clone(), l_last_entry_id.clone(), to_leader_sender, + entry_stream, ); let storage_stage = StorageStage::new( @@ -300,6 +302,7 @@ pub mod tests { STORAGE_ROTATE_TEST_COUNT, sender, &StorageState::default(), + None, ); let mut alice_ref_balance = starting_balance; diff --git a/tests/multinode.rs b/tests/multinode.rs index 12e267f6b..a2200d66c 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -165,6 +165,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> { None, false, None, + None, ); // start up another validator from zero, converge and then check @@ -185,6 +186,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> { Some(&leader_data), false, None, + None, ); // Send validator some tokens to vote @@ -269,6 +271,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> { None, false, None, + None, ); let mut nodes = vec![server]; @@ -303,6 +306,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> { Some(&leader_data), false, None, + None, ); nodes.push(val); } @@ -366,6 +370,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> { Some(&leader_data), false, None, + None, ); nodes.push(val); let servers = converge(&leader_data, N + 2); // contains the leader and new node @@ -456,6 +461,7 @@ fn test_multi_node_basic() { None, false, None, + None, ); let mut nodes = vec![server]; @@ -486,6 +492,7 @@ fn test_multi_node_basic() { Some(&leader_data), false, None, + None, ); nodes.push(val); } @@ -566,6 +573,7 @@ fn test_boot_validator_from_file() -> result::Result<()> { None, false, None, + None, ); let leader_balance = send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, 500, Some(500)).unwrap(); @@ -591,6 +599,7 @@ fn test_boot_validator_from_file() -> result::Result<()> { Some(&leader_data), false, None, + None, ); let mut client = mk_client(&validator_data); let getbal = retry_get_balance(&mut client, &bob_pubkey, Some(leader_balance)); @@ -624,6 +633,7 @@ fn create_leader( None, false, None, + None, ); (leader_data, leader_fullnode) } @@ -701,6 +711,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> { Some(&leader_data), false, None, + None, ); // trigger broadcast, validator should catch up from leader, whose window contains @@ -770,6 +781,7 @@ fn test_multi_node_dynamic_network() { None, true, None, + None, ); info!( "found leader: {:?}", @@ -845,6 +857,7 @@ fn test_multi_node_dynamic_network() { Some(&leader_data), true, None, + None, ); (rd, val) }) @@ -1026,6 +1039,7 @@ fn test_leader_to_validator_transition() { Some(&leader_info), false, None, + None, ); // Make an extra node for our leader to broadcast to, @@ -1181,6 +1195,7 @@ fn test_leader_validator_basic() { Some(&leader_info), false, None, + None, ); // Start the leader fullnode @@ -1194,6 +1209,7 @@ fn test_leader_validator_basic() { Some(&leader_info), false, None, + None, ); // Wait for convergence @@ -1389,6 +1405,7 @@ fn test_dropped_handoff_recovery() { Some(&bootstrap_leader_info), false, None, + None, ); let mut nodes = vec![bootstrap_leader]; @@ -1412,6 +1429,7 @@ fn test_dropped_handoff_recovery() { Some(&bootstrap_leader_info), false, None, + None, ); nodes.push(validator); @@ -1439,6 +1457,7 @@ fn test_dropped_handoff_recovery() { Some(&bootstrap_leader_info), false, None, + None, ); info!("Wait for 'next leader' to assume leader role"); @@ -1589,6 +1608,7 @@ fn test_full_leader_validator_network() { Some(&bootstrap_leader_info), false, None, + None, ); schedules.push(leader_scheduler); @@ -1607,6 +1627,7 @@ fn test_full_leader_validator_network() { Some(&bootstrap_leader_info), false, None, + None, ); schedules.push(leader_scheduler); @@ -1785,6 +1806,7 @@ fn test_broadcast_last_tick() { Some(&bootstrap_leader_info), false, None, + None, ); // Wait for convergence diff --git a/tests/replicator.rs b/tests/replicator.rs index 48536d8b4..48e0f40b5 100644 --- a/tests/replicator.rs +++ b/tests/replicator.rs @@ -61,6 +61,7 @@ fn test_replicator_startup() { ))), None, STORAGE_ROTATE_TEST_COUNT, + None, ); let validator_keypair = Arc::new(Keypair::new()); @@ -91,6 +92,7 @@ fn test_replicator_startup() { ))), None, STORAGE_ROTATE_TEST_COUNT, + None, ); let bob = Keypair::new(); @@ -287,6 +289,7 @@ fn test_replicator_startup_ledger_hang() { None, false, None, + None, ); let validator_keypair = Arc::new(Keypair::new()); @@ -304,6 +307,7 @@ fn test_replicator_startup_ledger_hang() { Some(&leader_info), false, None, + None, ); info!("starting replicator node");