From 1fd7bd7ede472ee0f5581623db96487ee13a1c60 Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Wed, 2 Jan 2019 11:02:15 -0800 Subject: [PATCH] Storage fixes * replicators generate their sample values * fixes to replicator block height logic --- programs/native/storage/src/lib.rs | 1 + replicator/src/main.rs | 1 + sdk/src/storage_program.rs | 11 +- src/chacha.rs | 20 ++- src/fullnode.rs | 36 ++++ src/replicator.rs | 268 ++++++++++++++++++----------- src/rpc.rs | 2 + src/storage_stage.rs | 20 ++- src/thin_client.rs | 6 + src/tvu.rs | 4 + tests/replicator.rs | 62 +++++-- wallet/tests/pay.rs | 4 + wallet/tests/request_airdrop.rs | 2 + 13 files changed, 302 insertions(+), 135 deletions(-) diff --git a/programs/native/storage/src/lib.rs b/programs/native/storage/src/lib.rs index a449a2d17..674822786 100644 --- a/programs/native/storage/src/lib.rs +++ b/programs/native/storage/src/lib.rs @@ -29,6 +29,7 @@ fn entrypoint( StorageProgram::SubmitMiningProof { sha_state, entry_height, + .. } => { info!( "Mining proof submitted with state {:?} entry_height: {}", diff --git a/replicator/src/main.rs b/replicator/src/main.rs index 778f06073..0d1324411 100644 --- a/replicator/src/main.rs +++ b/replicator/src/main.rs @@ -27,6 +27,7 @@ fn main() { .long("network") .value_name("HOST:PORT") .takes_value(true) + .required(true) .help("Rendezvous with the network at this gossip entry point"), ) .arg( diff --git a/sdk/src/storage_program.rs b/sdk/src/storage_program.rs index 69a09af1e..4f21cd886 100644 --- a/sdk/src/storage_program.rs +++ b/sdk/src/storage_program.rs @@ -1,11 +1,15 @@ use crate::hash::Hash; use crate::pubkey::Pubkey; -use crate::signature::{Keypair, KeypairUtil}; +use crate::signature::{Keypair, KeypairUtil, Signature}; use crate::transaction::Transaction; #[derive(Serialize, Deserialize, Debug, Clone)] pub enum StorageProgram { - SubmitMiningProof { sha_state: Hash, entry_height: u64 }, + SubmitMiningProof { + sha_state: Hash, + entry_height: u64, + signature: Signature, + }, } pub const STORAGE_PROGRAM_ID: [u8; 32] = [ @@ -27,6 +31,7 @@ pub trait StorageTransaction { sha_state: Hash, last_id: Hash, entry_height: u64, + signature: Signature, ) -> Self; } @@ -36,10 +41,12 @@ impl StorageTransaction for Transaction { sha_state: Hash, last_id: Hash, entry_height: u64, + signature: Signature, ) -> Self { let program = StorageProgram::SubmitMiningProof { sha_state, entry_height, + signature, }; Transaction::new( from_keypair, diff --git a/src/chacha.rs b/src/chacha.rs index 4620315e0..cecc8f3fc 100644 --- a/src/chacha.rs +++ b/src/chacha.rs @@ -38,13 +38,15 @@ pub fn chacha_cbc_encrypt_ledger( slice: u64, out_path: &Path, ivec: &mut [u8; CHACHA_BLOCK_SIZE], -) -> io::Result<()> { +) -> io::Result { let mut out_file = BufWriter::new(File::create(out_path).expect("Can't open ledger encrypted data file")); - let mut buffer = [0; 8 * 1024]; - let mut encrypted_buffer = [0; 8 * 1024]; + const BUFFER_SIZE: usize = 8 * 1024; + let mut buffer = [0; BUFFER_SIZE]; + let mut encrypted_buffer = [0; BUFFER_SIZE]; let key = [0; CHACHA_KEY_SIZE]; let mut total_entries = 0; + let mut total_size = 0; let mut entry = slice; loop { @@ -60,10 +62,18 @@ pub fn chacha_cbc_encrypt_ledger( slice, num_entries, entry_len ); debug!("read {} bytes", entry_len); - let size = entry_len as usize; + let mut size = entry_len as usize; if size == 0 { break; } + + if size < BUFFER_SIZE { + // We are on the last block, round to the nearest key_size + // boundary + size = (size + CHACHA_KEY_SIZE - 1) & !(CHACHA_KEY_SIZE - 1); + } + total_size += size; + chacha_cbc_encrypt(&buffer[..size], &mut encrypted_buffer[..size], &key, ivec); if let Err(res) = out_file.write(&encrypted_buffer[..size]) { println!("Error writing file! {:?}", res); @@ -79,7 +89,7 @@ pub fn chacha_cbc_encrypt_ledger( } } } - Ok(()) + Ok(total_size) } #[cfg(test)] diff --git a/src/fullnode.rs b/src/fullnode.rs index 90efc91e0..c36a8a75e 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -10,6 +10,7 @@ use crate::leader_scheduler::LeaderScheduler; use crate::rpc::JsonRpcService; use crate::rpc_pubsub::PubSubService; use crate::service::Service; +use crate::storage_stage::STORAGE_ROTATE_TEST_COUNT; use crate::tpu::{Tpu, TpuReturnType}; use crate::tpu_forwarder::TpuForwarder; use crate::tvu::{Sockets, Tvu, TvuReturnType}; @@ -118,6 +119,34 @@ impl Fullnode { sigverify_disabled: bool, leader_scheduler: LeaderScheduler, rpc_port: Option, + ) -> Self { + // TODO: remove this, temporary parameter to configure + // storage amount differently for test configurations + // so tests don't take forever to run. + const NUM_HASHES_FOR_STORAGE_ROTATE: u64 = 1024; + Self::new_with_storage_rotate( + node, + ledger_path, + keypair, + vote_signer, + leader_addr, + sigverify_disabled, + leader_scheduler, + rpc_port, + NUM_HASHES_FOR_STORAGE_ROTATE, + ) + } + + pub fn new_with_storage_rotate( + node: Node, + ledger_path: &str, + keypair: Arc, + vote_signer: Arc, + leader_addr: Option, + sigverify_disabled: bool, + leader_scheduler: LeaderScheduler, + rpc_port: Option, + storage_rotate_count: u64, ) -> Self { let leader_scheduler = Arc::new(RwLock::new(leader_scheduler)); @@ -152,6 +181,7 @@ impl Fullnode { ledger_path, sigverify_disabled, rpc_port, + storage_rotate_count, ); match leader_addr { @@ -183,6 +213,7 @@ impl Fullnode { ledger_path: &str, sigverify_disabled: bool, rpc_port: Option, + storage_rotate_count: u64, ) -> Self { let mut rpc_addr = node.info.rpc; let mut rpc_pubsub_addr = node.info.rpc_pubsub; @@ -283,6 +314,7 @@ impl Fullnode { &cluster_info, sockets, db_ledger.clone(), + storage_rotate_count, ); let tpu_forwarder = TpuForwarder::new( node.sockets @@ -444,6 +476,7 @@ impl Fullnode { &self.cluster_info, sockets, self.db_ledger.clone(), + STORAGE_ROTATE_TEST_COUNT, ); let tpu_forwarder = TpuForwarder::new( self.tpu_sockets @@ -638,6 +671,7 @@ mod tests { make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig, }; use crate::service::Service; + use crate::storage_stage::STORAGE_ROTATE_TEST_COUNT; use crate::streamer::responder; use crate::vote_signer_proxy::VoteSignerProxy; use solana_sdk::signature::{Keypair, KeypairUtil}; @@ -679,6 +713,7 @@ mod tests { &validator_ledger_path, false, None, + STORAGE_ROTATE_TEST_COUNT, ); v.close().unwrap(); remove_dir_all(validator_ledger_path).unwrap(); @@ -722,6 +757,7 @@ mod tests { &validator_ledger_path, false, None, + STORAGE_ROTATE_TEST_COUNT, ) }) .collect(); diff --git a/src/replicator.rs b/src/replicator.rs index 0d6569e8a..a431c97d2 100644 --- a/src/replicator.rs +++ b/src/replicator.rs @@ -9,15 +9,15 @@ use crate::leader_scheduler::LeaderScheduler; use crate::result::Result; use crate::rpc_request::{RpcClient, RpcRequest, RpcRequestHandler}; use crate::service::Service; -use crate::storage_stage::ENTRIES_PER_SEGMENT; +use crate::storage_stage::{get_segment_from_entry, ENTRIES_PER_SEGMENT}; use crate::streamer::BlobReceiver; -use crate::thin_client::retry_get_balance; +use crate::thin_client::{retry_get_balance, ThinClient}; use crate::window_service::window_service; use rand::thread_rng; use rand::Rng; use solana_drone::drone::{request_airdrop_transaction, DRONE_PORT}; use solana_sdk::hash::{Hash, Hasher}; -use solana_sdk::signature::{Keypair, KeypairUtil}; +use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; use solana_sdk::storage_program::StorageTransaction; use solana_sdk::transaction::Transaction; use std::fs::File; @@ -43,6 +43,7 @@ pub struct Replicator { t_window: JoinHandle<()>, pub retransmit_receiver: BlobReceiver, exit: Arc, + entry_height: u64, } pub fn sample_file(in_path: &Path, sample_offsets: &[u64]) -> io::Result { @@ -80,6 +81,23 @@ pub fn sample_file(in_path: &Path, sample_offsets: &[u64]) -> io::Result { Ok(hasher.result()) } +fn get_entry_heights_from_last_id( + signature: &ring::signature::Signature, + storage_entry_height: u64, +) -> (u64, u64) { + let signature_vec = signature.as_ref(); + let mut segment_index = u64::from(signature_vec[0]) + | (u64::from(signature_vec[1]) << 8) + | (u64::from(signature_vec[1]) << 16) + | (u64::from(signature_vec[2]) << 24); + let max_segment_index = get_segment_from_entry(storage_entry_height); + segment_index %= max_segment_index as u64; + let entry_height = segment_index * ENTRIES_PER_SEGMENT; + let max_entry_height = entry_height + ENTRIES_PER_SEGMENT; + + (entry_height, max_entry_height) +} + impl Replicator { #[allow(clippy::new_ret_no_self)] pub fn new( @@ -120,53 +138,18 @@ impl Replicator { ); info!("polling for leader"); - let leader; - loop { - if let Some(l) = cluster_info.read().unwrap().get_gossip_top_leader() { - leader = l.clone(); - break; - } - - sleep(Duration::from_millis(900)); - info!("{}", cluster_info.read().unwrap().node_info_trace()); - } + let leader = Self::poll_for_leader(&cluster_info)?; info!("Got leader: {:?}", leader); - let mut storage_last_id; - let mut storage_entry_height; - loop { - let rpc_client = { - let cluster_info = cluster_info.read().unwrap(); - let rpc_peers = cluster_info.rpc_peers(); - info!("rpc peers: {:?}", rpc_peers); - let node_idx = thread_rng().gen_range(0, rpc_peers.len()); - RpcClient::new_from_socket(rpc_peers[node_idx].rpc) - }; - - storage_last_id = rpc_client - .make_rpc_request(2, RpcRequest::GetStorageMiningLastId, None) - .expect("rpc request") - .to_string(); - storage_entry_height = rpc_client - .make_rpc_request(2, RpcRequest::GetStorageMiningEntryHeight, None) - .expect("rpc request") - .as_u64() - .unwrap(); - if storage_entry_height != 0 { - break; - } - } + let (storage_last_id, storage_entry_height) = + Self::poll_for_last_id_and_entry_height(&cluster_info)?; let signature = keypair.sign(storage_last_id.as_ref()); - let signature = signature.as_ref(); - let block_index = u64::from(signature[0]) - | (u64::from(signature[1]) << 8) - | (u64::from(signature[1]) << 16) - | (u64::from(signature[2]) << 24); - let mut entry_height = block_index * ENTRIES_PER_SEGMENT; - entry_height %= storage_entry_height; - let max_entry_height = entry_height + ENTRIES_PER_SEGMENT; + let (entry_height, max_entry_height) = + get_entry_heights_from_last_id(&signature, storage_entry_height); + + info!("replicating entry_height: {}", entry_height); let repair_socket = Arc::new(node.sockets.repair); let mut blob_sockets: Vec> = @@ -208,7 +191,141 @@ impl Replicator { let mut client = mk_client(&leader); - if retry_get_balance(&mut client, &keypair.pubkey(), None).is_none() { + Self::get_airdrop_tokens(&mut client, keypair, &leader_info); + info!("Done downloading ledger at {}", ledger_path.unwrap()); + + let ledger_path = Path::new(ledger_path.unwrap()); + let ledger_data_file_encrypted = ledger_path.join("ledger.enc"); + let mut sampling_offsets = Vec::new(); + + #[cfg(not(feature = "chacha"))] + sampling_offsets.push(0); + + #[cfg(feature = "chacha")] + { + use crate::storage_stage::NUM_STORAGE_SAMPLES; + use rand::{Rng, SeedableRng}; + use rand_chacha::ChaChaRng; + + let mut ivec = [0u8; 64]; + ivec.copy_from_slice(signature.as_ref()); + + let num_encrypted_bytes = chacha_cbc_encrypt_ledger( + &db_ledger, + entry_height, + &ledger_data_file_encrypted, + &mut ivec, + )?; + + let num_chacha_blocks = num_encrypted_bytes / CHACHA_BLOCK_SIZE; + let mut rng_seed = [0u8; 32]; + rng_seed.copy_from_slice(&signature.as_ref()[0..32]); + let mut rng = ChaChaRng::from_seed(rng_seed); + for _ in 0..NUM_STORAGE_SAMPLES { + sampling_offsets.push(rng.gen_range(0, num_chacha_blocks) as u64); + } + } + + info!("Done encrypting the ledger"); + + match sample_file(&ledger_data_file_encrypted, &sampling_offsets) { + Ok(hash) => { + let last_id = client.get_last_id(); + info!("sampled hash: {}", hash); + let mut tx = Transaction::storage_new_mining_proof( + &keypair, + hash, + last_id, + entry_height, + Signature::new(signature.as_ref()), + ); + client + .retry_transfer(&keypair, &mut tx, 10) + .expect("transfer didn't work!"); + } + Err(e) => info!("Error occurred while sampling: {:?}", e), + } + + Ok(Self { + gossip_service, + fetch_stage, + t_window, + retransmit_receiver, + exit, + entry_height, + }) + } + + pub fn close(self) { + self.exit.store(true, Ordering::Relaxed); + self.join() + } + + pub fn join(self) { + self.gossip_service.join().unwrap(); + self.fetch_stage.join().unwrap(); + self.t_window.join().unwrap(); + + // Drain the queue here to prevent self.retransmit_receiver from being dropped + // before the window_service thread is joined + let mut retransmit_queue_count = 0; + while let Ok(_blob) = self.retransmit_receiver.recv_timeout(Duration::new(1, 0)) { + retransmit_queue_count += 1; + } + debug!("retransmit channel count: {}", retransmit_queue_count); + } + + pub fn entry_height(&self) -> u64 { + self.entry_height + } + + fn poll_for_leader(cluster_info: &Arc>) -> Result { + for _ in 0..30 { + if let Some(l) = cluster_info.read().unwrap().get_gossip_top_leader() { + return Ok(l.clone()); + } + + sleep(Duration::from_millis(900)); + info!("{}", cluster_info.read().unwrap().node_info_trace()); + } + Err(Error::new(ErrorKind::Other, "Couldn't find leader"))? + } + + fn poll_for_last_id_and_entry_height( + cluster_info: &Arc>, + ) -> Result<(String, u64)> { + for _ in 0..10 { + let rpc_client = { + let cluster_info = cluster_info.read().unwrap(); + let rpc_peers = cluster_info.rpc_peers(); + debug!("rpc peers: {:?}", rpc_peers); + let node_idx = thread_rng().gen_range(0, rpc_peers.len()); + RpcClient::new_from_socket(rpc_peers[node_idx].rpc) + }; + + let storage_last_id = rpc_client + .make_rpc_request(2, RpcRequest::GetStorageMiningLastId, None) + .expect("rpc request") + .to_string(); + let storage_entry_height = rpc_client + .make_rpc_request(2, RpcRequest::GetStorageMiningEntryHeight, None) + .expect("rpc request") + .as_u64() + .unwrap(); + if get_segment_from_entry(storage_entry_height) != 0 { + return Ok((storage_last_id, storage_entry_height)); + } + info!("max entry_height: {}", storage_entry_height); + sleep(Duration::from_secs(3)); + } + Err(Error::new( + ErrorKind::Other, + "Couldn't get last_id or entry_height", + ))? + } + + fn get_airdrop_tokens(client: &mut ThinClient, keypair: &Keypair, leader_info: &NodeInfo) { + if retry_get_balance(client, &keypair.pubkey(), None).is_none() { let mut drone_addr = leader_info.tpu; drone_addr.set_port(DRONE_PORT); @@ -233,65 +350,6 @@ impl Replicator { } }; } - - info!("Done downloading ledger at {}", ledger_path.unwrap()); - - let ledger_path = Path::new(ledger_path.unwrap()); - let ledger_data_file_encrypted = ledger_path.join("ledger.enc"); - #[cfg(feature = "chacha")] - { - let mut ivec = [0u8; CHACHA_BLOCK_SIZE]; - ivec[0..4].copy_from_slice(&[2, 3, 4, 5]); - - chacha_cbc_encrypt_ledger( - &db_ledger, - entry_height, - &ledger_data_file_encrypted, - &mut ivec, - )?; - } - - info!("Done encrypting the ledger"); - - let sampling_offsets = [0, 1, 2, 3]; - - match sample_file(&ledger_data_file_encrypted, &sampling_offsets) { - Ok(hash) => { - let last_id = client.get_last_id(); - info!("sampled hash: {}", hash); - let tx = - Transaction::storage_new_mining_proof(&keypair, hash, last_id, entry_height); - client.transfer_signed(&tx).expect("transfer didn't work!"); - } - Err(e) => info!("Error occurred while sampling: {:?}", e), - } - - Ok(Self { - gossip_service, - fetch_stage, - t_window, - retransmit_receiver, - exit, - }) - } - - pub fn close(self) { - self.exit.store(true, Ordering::Relaxed); - self.join() - } - - pub fn join(self) { - self.gossip_service.join().unwrap(); - self.fetch_stage.join().unwrap(); - self.t_window.join().unwrap(); - - // Drain the queue here to prevent self.retransmit_receiver from being dropped - // before the window_service thread is joined - let mut retransmit_queue_count = 0; - while let Ok(_blob) = self.retransmit_receiver.recv_timeout(Duration::new(1, 0)) { - retransmit_queue_count += 1; - } - debug!("retransmit channel count: {}", retransmit_queue_count); } } diff --git a/src/rpc.rs b/src/rpc.rs index de04da94a..3bca86127 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -454,6 +454,7 @@ mod tests { use crate::leader_scheduler::LeaderScheduler; use crate::mint::Mint; use crate::rpc_request::get_rpc_request_str; + use crate::storage_stage::STORAGE_ROTATE_TEST_COUNT; use crate::vote_signer_proxy::VoteSignerProxy; use bincode::serialize; use reqwest; @@ -749,6 +750,7 @@ mod tests { &ledger_path, false, None, + STORAGE_ROTATE_TEST_COUNT, ); sleep(Duration::from_millis(900)); diff --git a/src/storage_stage.rs b/src/storage_stage.rs index 4a0a0d552..cd777985f 100644 --- a/src/storage_stage.rs +++ b/src/storage_stage.rs @@ -56,10 +56,10 @@ macro_rules! cross_boundary { }; } -const NUM_HASHES_FOR_STORAGE_ROTATE: u64 = 1024; +pub const STORAGE_ROTATE_TEST_COUNT: u64 = 128; // TODO: some way to dynamically size NUM_IDENTITIES const NUM_IDENTITIES: usize = 1024; -const NUM_SAMPLES: usize = 4; +pub const NUM_STORAGE_SAMPLES: usize = 4; pub const ENTRIES_PER_SEGMENT: u64 = 16; const KEY_SIZE: usize = 64; @@ -139,6 +139,7 @@ impl StorageStage { keypair: Arc, exit: Arc, entry_height: u64, + storage_rotate_count: u64, ) -> Self { debug!("storage_stage::new: entry_height: {}", entry_height); storage_state.state.write().unwrap().entry_height = entry_height; @@ -160,6 +161,7 @@ impl StorageStage { &mut poh_height, &mut entry_height, &mut current_key, + storage_rotate_count, ) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, @@ -211,7 +213,7 @@ impl StorageStage { ); let mut samples = vec![]; - for _ in 0..NUM_SAMPLES { + for _ in 0..NUM_STORAGE_SAMPLES { samples.push(rng.gen_range(0, 10)); } debug!("generated samples: {:?}", samples); @@ -256,6 +258,7 @@ impl StorageStage { poh_height: &mut u64, entry_height: &mut u64, current_key_idx: &mut usize, + storage_rotate_count: u64, ) -> Result<()> { let timeout = Duration::new(1, 0); let entries = entry_receiver.recv_timeout(timeout)?; @@ -306,7 +309,7 @@ impl StorageStage { } } } - if cross_boundary!(*poh_height, entry.num_hashes, NUM_HASHES_FOR_STORAGE_ROTATE) { + if cross_boundary!(*poh_height, entry.num_hashes, storage_rotate_count) { info!( "crosses sending at poh_height: {} entry_height: {}! hashes: {}", *poh_height, entry_height, entry.num_hashes @@ -343,7 +346,9 @@ mod tests { use crate::service::Service; use crate::storage_stage::StorageState; use crate::storage_stage::NUM_IDENTITIES; - use crate::storage_stage::{get_identity_index_from_signature, StorageStage}; + use crate::storage_stage::{ + get_identity_index_from_signature, StorageStage, STORAGE_ROTATE_TEST_COUNT, + }; use rayon::prelude::*; use solana_sdk::hash::Hash; use solana_sdk::hash::Hasher; @@ -373,6 +378,7 @@ mod tests { keypair, exit.clone(), 0, + STORAGE_ROTATE_TEST_COUNT, ); exit.store(true, Ordering::Relaxed); storage_stage.join().unwrap(); @@ -392,7 +398,7 @@ mod tests { 1, ); - let entries = make_tiny_test_entries(128); + let entries = make_tiny_test_entries(64); let db_ledger = DbLedger::open(&ledger_path).unwrap(); db_ledger .write_entries(DEFAULT_SLOT_HEIGHT, genesis_entries.len() as u64, &entries) @@ -407,6 +413,7 @@ mod tests { keypair, exit.clone(), 0, + STORAGE_ROTATE_TEST_COUNT, ); storage_entry_sender.send(entries.clone()).unwrap(); @@ -471,6 +478,7 @@ mod tests { keypair, exit.clone(), 0, + STORAGE_ROTATE_TEST_COUNT, ); storage_entry_sender.send(entries.clone()).unwrap(); diff --git a/src/thin_client.rs b/src/thin_client.rs index f6ea9ead7..e09b05a9c 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -441,6 +441,7 @@ mod tests { use crate::fullnode::Fullnode; use crate::leader_scheduler::LeaderScheduler; use crate::mint::Mint; + use crate::storage_stage::STORAGE_ROTATE_TEST_COUNT; use crate::vote_signer_proxy::VoteSignerProxy; use bincode::deserialize; use solana_sdk::signature::{Keypair, KeypairUtil}; @@ -483,6 +484,7 @@ mod tests { &ledger_path, false, None, + STORAGE_ROTATE_TEST_COUNT, ); sleep(Duration::from_millis(900)); @@ -539,6 +541,7 @@ mod tests { &ledger_path, false, None, + STORAGE_ROTATE_TEST_COUNT, ); //TODO: remove this sleep, or add a retry so CI is stable sleep(Duration::from_millis(300)); @@ -600,6 +603,7 @@ mod tests { &ledger_path, false, None, + STORAGE_ROTATE_TEST_COUNT, ); sleep(Duration::from_millis(300)); @@ -650,6 +654,7 @@ mod tests { &ledger_path, false, None, + STORAGE_ROTATE_TEST_COUNT, ); sleep(Duration::from_millis(300)); @@ -746,6 +751,7 @@ mod tests { &ledger_path, false, None, + STORAGE_ROTATE_TEST_COUNT, ); sleep(Duration::from_millis(900)); diff --git a/src/tvu.rs b/src/tvu.rs index 7e0297475..0e3ddf303 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -65,6 +65,7 @@ impl Tvu { cluster_info: &Arc>, sockets: Sockets, db_ledger: Arc, + storage_rotate_count: u64, ) -> Self { let exit = Arc::new(AtomicBool::new(false)); let keypair: Arc = cluster_info @@ -119,6 +120,7 @@ impl Tvu { keypair, exit.clone(), entry_height, + storage_rotate_count, ); Tvu { @@ -179,6 +181,7 @@ pub mod tests { use crate::mint::Mint; use crate::packet::SharedBlob; use crate::service::Service; + use crate::storage_stage::STORAGE_ROTATE_TEST_COUNT; use crate::streamer; use crate::tvu::{Sockets, Tvu}; use crate::vote_signer_proxy::VoteSignerProxy; @@ -284,6 +287,7 @@ pub mod tests { } }, Arc::new(db_ledger), + STORAGE_ROTATE_TEST_COUNT, ); let mut alice_ref_balance = starting_balance; diff --git a/tests/replicator.rs b/tests/replicator.rs index 4915f9a90..6b335da0d 100644 --- a/tests/replicator.rs +++ b/tests/replicator.rs @@ -14,8 +14,10 @@ use solana::entry::Entry; use solana::fullnode::Fullnode; use solana::leader_scheduler::LeaderScheduler; use solana::replicator::Replicator; +use solana::storage_stage::STORAGE_ROTATE_TEST_COUNT; use solana::streamer::blob_receiver; use solana::vote_signer_proxy::VoteSignerProxy; +use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_transaction::SystemTransaction; use solana_sdk::transaction::Transaction; @@ -24,6 +26,7 @@ use std::fs::remove_dir_all; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::Arc; +use std::thread::sleep; use std::time::Duration; #[test] @@ -38,7 +41,8 @@ fn test_replicator_startup() { let leader_info = leader_node.info.clone(); let leader_ledger_path = "replicator_test_leader_ledger"; - let (mint, leader_ledger_path) = create_tmp_genesis(leader_ledger_path, 100, leader_info.id, 1); + let (mint, leader_ledger_path) = + create_tmp_genesis(leader_ledger_path, 1_000_000_000, leader_info.id, 1); let validator_ledger_path = tmp_copy_ledger(&leader_ledger_path, "replicator_test_validator_ledger"); @@ -47,7 +51,7 @@ fn test_replicator_startup() { let signer_proxy = VoteSignerProxy::new(&leader_keypair, Box::new(LocalVoteSigner::default())); - let leader = Fullnode::new( + let leader = Fullnode::new_with_storage_rotate( leader_node, &leader_ledger_path, leader_keypair, @@ -56,16 +60,27 @@ fn test_replicator_startup() { false, LeaderScheduler::from_bootstrap_leader(leader_info.id.clone()), None, + STORAGE_ROTATE_TEST_COUNT, ); let validator_keypair = Arc::new(Keypair::new()); let signer_proxy = VoteSignerProxy::new(&validator_keypair, Box::new(LocalVoteSigner::default())); + + let mut leader_client = mk_client(&leader_info); + + let last_id = leader_client.get_last_id(); + let mut leader_client = mk_client(&leader_info); + + leader_client + .transfer(10, &mint.keypair(), validator_keypair.pubkey(), &last_id) + .unwrap(); + let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey()); #[cfg(feature = "chacha")] let validator_node_info = validator_node.info.clone(); - let validator = Fullnode::new( + let validator = Fullnode::new_with_storage_rotate( validator_node, &validator_ledger_path, validator_keypair, @@ -74,19 +89,26 @@ fn test_replicator_startup() { false, LeaderScheduler::from_bootstrap_leader(leader_info.id), None, + STORAGE_ROTATE_TEST_COUNT, ); - let mut leader_client = mk_client(&leader_info); - let bob = Keypair::new(); - let last_id = leader_client.get_last_id(); - leader_client - .transfer(1, &mint.keypair(), bob.pubkey(), &last_id) - .unwrap(); + info!("starting transfers.."); + + for _ in 0..64 { + let last_id = leader_client.get_last_id(); + leader_client + .transfer(1, &mint.keypair(), bob.pubkey(), &last_id) + .unwrap(); + sleep(Duration::from_millis(200)); + } let replicator_keypair = Keypair::new(); + info!("giving replicator tokens.."); + + let last_id = leader_client.get_last_id(); // Give the replicator some tokens let amount = 1; let mut tx = Transaction::system_new( @@ -113,11 +135,13 @@ fn test_replicator_startup() { ) .unwrap(); + info!("started replicator.."); + // Create a client which downloads from the replicator and see that it // can respond with blobs. let tn = Node::new_localhost(); let cluster_info = ClusterInfo::new(tn.info.clone()); - let repair_index = 1; + let repair_index = replicator.entry_height(); let req = cluster_info .window_index_request_bytes(repair_index) .unwrap(); @@ -132,7 +156,7 @@ fn test_replicator_startup() { tn.info.id, replicator_info.gossip ); - let mut num_txs = 0; + let mut received_blob = false; for _ in 0..5 { repair_socket.send_to(&req, replicator_info.gossip).unwrap(); @@ -144,7 +168,8 @@ fn test_replicator_startup() { assert!(br.index().unwrap() == repair_index); let entry: Entry = deserialize(&br.data()[..br.meta.size]).unwrap(); info!("entry: {:?}", entry); - num_txs = entry.transactions.len(); + assert_ne!(entry.id, Hash::default()); + received_blob = true; } break; } @@ -152,6 +177,8 @@ fn test_replicator_startup() { exit.store(true, Ordering::Relaxed); t_receiver.join().unwrap(); + assert!(received_blob); + // The replicator will not submit storage proofs if // chacha is not enabled #[cfg(feature = "chacha")] @@ -159,10 +186,14 @@ fn test_replicator_startup() { use solana::rpc_request::{RpcClient, RpcRequest, RpcRequestHandler}; use std::thread::sleep; + info!( + "looking for pubkeys for entry: {}", + replicator.entry_height() + ); let rpc_client = RpcClient::new_from_socket(validator_node_info.rpc); let mut non_zero_pubkeys = false; - for _ in 0..30 { - let params = json!([0]); + for _ in 0..60 { + let params = json!([replicator.entry_height()]); let pubkeys = rpc_client .make_rpc_request(1, RpcRequest::GetStoragePubkeysForEntryHeight, Some(params)) .unwrap(); @@ -176,9 +207,6 @@ fn test_replicator_startup() { assert!(non_zero_pubkeys); } - // Check that some ledger was downloaded - assert!(num_txs != 0); - replicator.close(); validator.exit(); leader.close().expect("Expected successful node closure"); diff --git a/wallet/tests/pay.rs b/wallet/tests/pay.rs index 534a0c34f..1bac57f2c 100644 --- a/wallet/tests/pay.rs +++ b/wallet/tests/pay.rs @@ -7,6 +7,7 @@ use solana::fullnode::Fullnode; use solana::leader_scheduler::LeaderScheduler; use solana::mint::Mint; use solana::rpc_request::{RpcClient, RpcRequest, RpcRequestHandler}; +use solana::storage_stage::STORAGE_ROTATE_TEST_COUNT; use solana::vote_signer_proxy::VoteSignerProxy; use solana_drone::drone::run_local_drone; use solana_sdk::pubkey::Pubkey; @@ -62,6 +63,7 @@ fn test_wallet_timestamp_tx() { &ledger_path, false, None, + STORAGE_ROTATE_TEST_COUNT, ); sleep(Duration::from_millis(900)); @@ -161,6 +163,7 @@ fn test_wallet_witness_tx() { &ledger_path, false, None, + STORAGE_ROTATE_TEST_COUNT, ); sleep(Duration::from_millis(900)); @@ -256,6 +259,7 @@ fn test_wallet_cancel_tx() { &ledger_path, false, None, + STORAGE_ROTATE_TEST_COUNT, ); sleep(Duration::from_millis(900)); diff --git a/wallet/tests/request_airdrop.rs b/wallet/tests/request_airdrop.rs index feb91aad7..a90977a5c 100644 --- a/wallet/tests/request_airdrop.rs +++ b/wallet/tests/request_airdrop.rs @@ -6,6 +6,7 @@ use solana::fullnode::Fullnode; use solana::leader_scheduler::LeaderScheduler; use solana::mint::Mint; use solana::rpc_request::{RpcClient, RpcRequest, RpcRequestHandler}; +use solana::storage_stage::STORAGE_ROTATE_TEST_COUNT; use solana::vote_signer_proxy::VoteSignerProxy; use solana_drone::drone::run_local_drone; use solana_sdk::signature::{Keypair, KeypairUtil}; @@ -48,6 +49,7 @@ fn test_wallet_request_airdrop() { &ledger_path, false, None, + STORAGE_ROTATE_TEST_COUNT, ); sleep(Duration::from_millis(900));