diff --git a/programs/native/storage/src/lib.rs b/programs/native/storage/src/lib.rs
index 2615178ada..5caebcda0a 100644
--- a/programs/native/storage/src/lib.rs
+++ b/programs/native/storage/src/lib.rs
@@ -14,7 +14,6 @@ use solana_sdk::account::KeyedAccount;
 use solana_sdk::native_program::ProgramError;
 use solana_sdk::pubkey::Pubkey;
 use solana_sdk::storage_program::*;
-use std::sync::{Once, ONCE_INIT};
 
 solana_entrypoint!(entrypoint);
 fn entrypoint(
@@ -23,12 +22,6 @@ fn entrypoint(
     data: &[u8],
     _tick_height: u64,
 ) -> Result<(), ProgramError> {
-    static INIT: Once = ONCE_INIT;
-    INIT.call_once(|| {
-        // env_logger can only be initialized once
-        env_logger::init();
-    });
-
     // accounts_keys[0] must be signed
     if keyed_accounts[0].signer_key().is_none() {
         info!("account[0] is unsigned");
@@ -37,8 +30,14 @@ fn entrypoint(
 
     if let Ok(syscall) = deserialize(data) {
         match syscall {
-            StorageProgram::SubmitMiningProof { sha_state } => {
-                info!("Mining proof submitted with state {:?}", sha_state);
+            StorageProgram::SubmitMiningProof {
+                sha_state,
+                entry_height,
+            } => {
+                info!(
+                    "Mining proof submitted with state {:?} entry_height: {}",
+                    sha_state, entry_height
+                );
             }
         }
         Ok(())
diff --git a/sdk/src/storage_program.rs b/sdk/src/storage_program.rs
index 5759f1bda3..a0efde05d8 100644
--- a/sdk/src/storage_program.rs
+++ b/sdk/src/storage_program.rs
@@ -5,7 +5,7 @@ use transaction::Transaction;
 
 #[derive(Serialize, Deserialize, Debug, Clone)]
 pub enum StorageProgram {
-    SubmitMiningProof { sha_state: Hash },
+    SubmitMiningProof { sha_state: Hash, entry_height: u64 },
 }
 
 pub const STORAGE_PROGRAM_ID: [u8; 32] = [
@@ -22,12 +22,25 @@ pub fn id() -> Pubkey {
 }
 
 pub trait StorageTransaction {
-    fn storage_new_mining_proof(from_keypair: &Keypair, sha_state: Hash, last_id: Hash) -> Self;
+    fn storage_new_mining_proof(
+        from_keypair: &Keypair,
+        sha_state: Hash,
+        last_id: Hash,
+        entry_height: u64,
+    ) -> Self;
 }
 
 impl StorageTransaction for Transaction {
-    fn storage_new_mining_proof(from_keypair: &Keypair, sha_state: Hash, last_id: Hash) -> Self {
-        let program = StorageProgram::SubmitMiningProof { sha_state };
+    fn storage_new_mining_proof(
+        from_keypair: &Keypair,
+        sha_state: Hash,
+        last_id: Hash,
+        entry_height: u64,
+    ) -> Self {
+        let program = StorageProgram::SubmitMiningProof {
+            sha_state,
+            entry_height,
+        };
         Transaction::new(
             from_keypair,
             &[from_keypair.pubkey()],
diff --git a/src/bank.rs b/src/bank.rs
index 5d257a15ad..4323e402d1 100644
--- a/src/bank.rs
+++ b/src/bank.rs
@@ -486,6 +486,11 @@ impl Bank {
             .expect("no last_id has been set")
     }
 
+    pub fn get_pubkeys_for_entry_height(&self, entry_height: u64) -> Vec<Pubkey> {
+        self.storage_state
+            .get_pubkeys_for_entry_height(entry_height)
+    }
+
     /// Store the given signature. The bank will reject any transaction with the same signature.
     fn reserve_signature(signatures: &mut SignatureStatusMap, signature: &Signature) -> Result<()> {
         if let Some(_result) = signatures.get(signature) {
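The SDK half of this change threads `entry_height` through the proof-submission helper. A minimal caller sketch, assuming the external crate paths of this era (`solana_sdk::transaction::Transaction`); the helper name is hypothetical:

    use solana_sdk::hash::Hash;
    use solana_sdk::signature::Keypair;
    use solana_sdk::storage_program::StorageTransaction;
    use solana_sdk::transaction::Transaction;

    // Hypothetical helper: build the proof transaction for one segment.
    // `sha_state` would come from sampling the encrypted ledger segment;
    // `entry_height` anchors the proof to the segment being proven.
    fn build_proof_tx(
        keypair: &Keypair,
        sha_state: Hash,
        last_id: Hash,
        entry_height: u64,
    ) -> Transaction {
        Transaction::storage_new_mining_proof(keypair, sha_state, last_id, entry_height)
    }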
diff --git a/src/chacha_cuda.rs b/src/chacha_cuda.rs
index 32b6dde342..f9dffda091 100644
--- a/src/chacha_cuda.rs
+++ b/src/chacha_cuda.rs
@@ -7,7 +7,7 @@ use solana_sdk::hash::Hash;
 use std::io;
 use std::mem::size_of;
 
-use crate::storage_stage::ENTRIES_PER_SLICE;
+use crate::storage_stage::ENTRIES_PER_SEGMENT;
 
 // Encrypt a file with multiple starting IV states, determined by ivecs.len()
 //
@@ -44,8 +44,11 @@ pub fn chacha_cbc_encrypt_file_many_keys(
         chacha_init_sha_state(int_sha_states.as_mut_ptr(), num_keys as u32);
     }
     loop {
-        match ledger_window.get_entries_bytes(entry, ENTRIES_PER_SLICE - total_entries, &mut buffer)
-        {
+        match ledger_window.get_entries_bytes(
+            entry,
+            ENTRIES_PER_SEGMENT - total_entries,
+            &mut buffer,
+        ) {
             Ok((num_entries, entry_len)) => {
                 info!(
                     "encrypting slice: {} num_entries: {} entry_len: {}",
@@ -72,9 +75,9 @@ pub fn chacha_cbc_encrypt_file_many_keys(
             entry += num_entries;
             debug!(
                 "total entries: {} entry: {} slice: {} entries_per_slice: {}",
-                total_entries, entry, slice, ENTRIES_PER_SLICE
+                total_entries, entry, slice, ENTRIES_PER_SEGMENT
             );
-            if (entry - slice) >= ENTRIES_PER_SLICE {
+            if (entry - slice) >= ENTRIES_PER_SEGMENT {
                 break;
             }
         }
diff --git a/src/db_window.rs b/src/db_window.rs
index 19f6629065..fa1bbd36ad 100644
--- a/src/db_window.rs
+++ b/src/db_window.rs
@@ -350,9 +350,10 @@ pub fn process_blob(
 
         // Check if we ran over the last wanted entry
         if consumed > max_ix {
-            let extra_unwanted_entries_len = consumed - (max_ix + 1);
             let consumed_entries_len = consumed_entries.len();
-            consumed_entries.truncate(consumed_entries_len - extra_unwanted_entries_len as usize);
+            let extra_unwanted_entries_len =
+                cmp::min(consumed_entries_len, (consumed - (max_ix + 1)) as usize);
+            consumed_entries.truncate(consumed_entries_len - extra_unwanted_entries_len);
             done.store(true, Ordering::Relaxed);
         }
     }
diff --git a/src/fullnode.rs b/src/fullnode.rs
index b237e9c398..88e0bcfdf1 100644
--- a/src/fullnode.rs
+++ b/src/fullnode.rs
@@ -295,7 +295,7 @@ impl Fullnode {
             &bank,
             entry_height,
             *last_entry_id,
-            cluster_info.clone(),
+            &cluster_info,
             sockets,
             Some(ledger_path),
             db_ledger.clone(),
@@ -459,7 +459,7 @@ impl Fullnode {
             &self.bank,
             entry_height,
             last_entry_id,
-            self.cluster_info.clone(),
+            &self.cluster_info,
             sockets,
             Some(&self.ledger_path),
             self.db_ledger.clone(),
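The db_window.rs change clamps the truncation so it cannot underflow when a blob carries fewer entries than `consumed` implies. A standalone sketch of the clamped arithmetic, generic over the entry type:

    use std::cmp;

    // Dropping entries past max_ix must not underflow consumed_entries.len()
    // when fewer entries were actually consumed than `consumed` suggests.
    fn drop_unwanted<T>(consumed_entries: &mut Vec<T>, consumed: u64, max_ix: u64) {
        if consumed > max_ix {
            let len = consumed_entries.len();
            let extra = cmp::min(len, (consumed - (max_ix + 1)) as usize);
            consumed_entries.truncate(len - extra);
        }
    }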
diff --git a/src/replicator.rs b/src/replicator.rs
index 8a1920dce6..4f71fa5bb9 100644
--- a/src/replicator.rs
+++ b/src/replicator.rs
@@ -10,6 +10,7 @@ use crate::ledger::LEDGER_DATA_FILE;
 use crate::result::Result;
 use crate::rpc_request::{RpcClient, RpcRequest};
 use crate::service::Service;
+use crate::storage_stage::ENTRIES_PER_SEGMENT;
 use crate::store_ledger_stage::StoreLedgerStage;
 use crate::streamer::BlobReceiver;
 use crate::thin_client::retry_get_balance;
@@ -93,12 +94,9 @@ impl Replicator {
         let exit = Arc::new(AtomicBool::new(false));
         let done = Arc::new(AtomicBool::new(false));
 
-        let entry_height = 0;
-        let max_entry_height = 1;
-
         info!("Replicator: id: {}", keypair.pubkey());
         info!("Creating cluster info....");
-        let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(node.info)));
+        let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(node.info.clone())));
 
         let leader_pubkey = leader_info.id;
         {
@@ -141,20 +139,40 @@ impl Replicator {
 
         info!("Got leader: {:?}", leader);
 
-        let rpc_client = {
-            let cluster_info = cluster_info.read().unwrap();
-            let rpc_peers = cluster_info.rpc_peers();
-            info!("rpc peers: {:?}", rpc_peers);
-            let node_idx = thread_rng().gen_range(0, rpc_peers.len());
-            RpcClient::new_from_socket(rpc_peers[node_idx].rpc)
-        };
+        let mut storage_last_id;
+        let mut storage_entry_height;
+        loop {
+            let rpc_client = {
+                let cluster_info = cluster_info.read().unwrap();
+                let rpc_peers = cluster_info.rpc_peers();
+                info!("rpc peers: {:?}", rpc_peers);
+                let node_idx = thread_rng().gen_range(0, rpc_peers.len());
+                RpcClient::new_from_socket(rpc_peers[node_idx].rpc)
+            };
 
-        let storage_last_id = RpcRequest::GetStorageMiningLastId
-            .make_rpc_request(&rpc_client, 2, None)
-            .expect("rpc request")
-            .to_string();
-        let _signature = keypair.sign(storage_last_id.as_ref());
-        // TODO: use this signature to pick the key and block
+            storage_last_id = RpcRequest::GetStorageMiningLastId
+                .make_rpc_request(&rpc_client, 2, None)
+                .expect("rpc request")
+                .to_string();
+            storage_entry_height = RpcRequest::GetStorageMiningEntryHeight
+                .make_rpc_request(&rpc_client, 2, None)
+                .expect("rpc request")
+                .as_u64()
+                .unwrap();
+            if storage_entry_height != 0 {
+                break;
+            }
+        }
+
+        let signature = keypair.sign(storage_last_id.as_ref());
+        let signature = signature.as_ref();
+        let block_index = u64::from(signature[0])
+            | (u64::from(signature[1]) << 8)
+            | (u64::from(signature[2]) << 16)
+            | (u64::from(signature[3]) << 24);
+        let mut entry_height = block_index * ENTRIES_PER_SEGMENT;
+        entry_height %= storage_entry_height;
+        let max_entry_height = entry_height + ENTRIES_PER_SEGMENT;
 
         let repair_socket = Arc::new(node.sockets.repair);
         let mut blob_sockets: Vec<Arc<UdpSocket>> =
@@ -187,6 +205,13 @@ impl Replicator {
             sleep(Duration::from_millis(100));
         }
 
+        let mut node_info = node.info.clone();
+        node_info.tvu = "0.0.0.0:0".parse().unwrap();
+        {
+            let mut cluster_info_w = cluster_info.write().unwrap();
+            cluster_info_w.insert_info(node_info);
+        }
+
         let mut client = mk_client(&leader);
 
         if retry_get_balance(&mut client, &keypair.pubkey(), None).is_none() {
@@ -236,7 +261,8 @@ impl Replicator {
                 Ok(hash) => {
                     let last_id = client.get_last_id();
                     info!("sampled hash: {}", hash);
-                    let tx = Transaction::storage_new_mining_proof(&keypair, hash, last_id);
+                    let tx =
+                        Transaction::storage_new_mining_proof(&keypair, hash, last_id, entry_height);
                     client.transfer_signed(&tx).expect("transfer didn't work!");
                 }
                 Err(e) => info!("Error occurred while sampling: {:?}", e),
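The replicator now derives its download range from its own signature over the storage last id, so each replicator lands on a pseudo-random segment. A self-contained sketch of that math (assuming, as the polling loop above guarantees, a nonzero `storage_entry_height`):

    const ENTRIES_PER_SEGMENT: u64 = 16;

    fn entry_range(signature: &[u8], storage_entry_height: u64) -> (u64, u64) {
        // The low four signature bytes form a pseudo-random block index...
        let block_index = u64::from(signature[0])
            | (u64::from(signature[1]) << 8)
            | (u64::from(signature[2]) << 16)
            | (u64::from(signature[3]) << 24);
        // ...wrapped to the entry height the network reports, then widened
        // to one segment's worth of entries.
        let entry_height = (block_index * ENTRIES_PER_SEGMENT) % storage_entry_height;
        (entry_height, entry_height + ENTRIES_PER_SEGMENT)
    }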
diff --git a/src/rpc.rs b/src/rpc.rs
index 72de3963ec..74a966bd73 100644
--- a/src/rpc.rs
+++ b/src/rpc.rs
@@ -155,6 +155,12 @@ build_rpc_trait! {
 
         #[rpc(meta, name = "getStorageMiningLastId")]
         fn get_storage_mining_last_id(&self, Self::Metadata) -> Result<String>;
+
+        #[rpc(meta, name = "getStorageMiningEntryHeight")]
+        fn get_storage_mining_entry_height(&self, Self::Metadata) -> Result<u64>;
+
+        #[rpc(meta, name = "getStoragePubkeysForEntryHeight")]
+        fn get_storage_pubkeys_for_entry_height(&self, Self::Metadata, u64) -> Result<Vec<Pubkey>>;
     }
 }
 
@@ -284,6 +290,17 @@ impl RpcSol for RpcSolImpl {
     fn get_storage_mining_last_id(&self, meta: Self::Metadata) -> Result<String> {
         meta.request_processor.get_storage_mining_last_id()
     }
+    fn get_storage_mining_entry_height(&self, meta: Self::Metadata) -> Result<u64> {
+        meta.request_processor.get_storage_mining_entry_height()
+    }
+    fn get_storage_pubkeys_for_entry_height(
+        &self,
+        meta: Self::Metadata,
+        entry_height: u64,
+    ) -> Result<Vec<Pubkey>> {
+        meta.request_processor
+            .get_storage_pubkeys_for_entry_height(entry_height)
+    }
 }
 
 #[derive(Clone)]
 pub struct JsonRpcRequestProcessor {
@@ -322,6 +339,13 @@ impl JsonRpcRequestProcessor {
         let id = self.bank.storage_state.get_last_id();
         Ok(bs58::encode(id).into_string())
     }
+    fn get_storage_mining_entry_height(&self) -> Result<u64> {
+        let entry_height = self.bank.storage_state.get_entry_height();
+        Ok(entry_height)
+    }
+    fn get_storage_pubkeys_for_entry_height(&self, entry_height: u64) -> Result<Vec<Pubkey>> {
+        Ok(self.bank.get_pubkeys_for_entry_height(entry_height))
+    }
 }
 
 fn get_leader_addr(cluster_info: &Arc<RwLock<ClusterInfo>>) -> Result<SocketAddr> {
diff --git a/src/rpc_request.rs b/src/rpc_request.rs
index 0621c6200e..e5e7b375b2 100644
--- a/src/rpc_request.rs
+++ b/src/rpc_request.rs
@@ -54,6 +54,8 @@ pub enum RpcRequest {
     SignVote,
     DeregisterNode,
     GetStorageMiningLastId,
+    GetStorageMiningEntryHeight,
+    GetStoragePubkeysForEntryHeight,
 }
 
 impl RpcRequest {
@@ -97,6 +99,8 @@ impl RpcRequest {
             RpcRequest::SignVote => "signVote",
             RpcRequest::DeregisterNode => "deregisterNode",
             RpcRequest::GetStorageMiningLastId => "getStorageMiningLastId",
+            RpcRequest::GetStorageMiningEntryHeight => "getStorageMiningEntryHeight",
+            RpcRequest::GetStoragePubkeysForEntryHeight => "getStoragePubkeysForEntryHeight",
         };
         let mut request = json!({
            "jsonrpc": jsonrpc,
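A hypothetical client-side flow against the two new endpoints; the request ids are arbitrary and `rpc_addr` is assumed to point at a fullnode's RPC port:

    use solana::rpc_request::{RpcClient, RpcRequest};
    use std::net::SocketAddr;

    fn query_storage_pubkeys(rpc_addr: SocketAddr) -> Option<serde_json::Value> {
        let rpc_client = RpcClient::new_from_socket(rpc_addr);
        // Ask the fullnode how far storage mining has progressed...
        let entry_height = RpcRequest::GetStorageMiningEntryHeight
            .make_rpc_request(&rpc_client, 1, None)
            .ok()?
            .as_u64()?;
        // ...then ask which replicators have submitted proofs at that height.
        let params = serde_json::Value::Array(vec![entry_height.into()]);
        RpcRequest::GetStoragePubkeysForEntryHeight
            .make_rpc_request(&rpc_client, 2, Some(params))
            .ok()
    }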
diff --git a/src/storage_stage.rs b/src/storage_stage.rs
index cd1b2d8109..ceafed7fa9 100644
--- a/src/storage_stage.rs
+++ b/src/storage_stage.rs
@@ -7,12 +7,17 @@ use crate::chacha_cuda::chacha_cbc_encrypt_file_many_keys;
 use crate::entry::EntryReceiver;
 use crate::result::{Error, Result};
 use crate::service::Service;
+use bincode::deserialize;
 use rand::{Rng, SeedableRng};
 use rand_chacha::ChaChaRng;
 use solana_sdk::hash::Hash;
+use solana_sdk::pubkey::Pubkey;
 use solana_sdk::signature::Keypair;
 use solana_sdk::signature::Signature;
+use solana_sdk::storage_program;
+use solana_sdk::storage_program::StorageProgram;
 use solana_sdk::vote_program;
+use std::collections::HashSet;
 use std::mem::size_of;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::mpsc::RecvTimeoutError;
@@ -24,12 +29,20 @@ use std::time::Duration;
 
 // Vec of [ledger blocks] x [keys]
 type StorageResults = Vec<Hash>;
 type StorageKeys = Vec<u8>;
+type ReplicatorMap = Vec<HashSet<Pubkey>>;
+
+#[derive(Default)]
+pub struct StorageStateInner {
+    storage_results: StorageResults,
+    storage_keys: StorageKeys,
+    replicator_map: ReplicatorMap,
+    storage_last_id: Hash,
+    entry_height: u64,
+}
 
 #[derive(Default)]
 pub struct StorageState {
-    storage_results: Arc<RwLock<StorageResults>>,
-    storage_keys: Arc<RwLock<StorageKeys>>,
-    last_id: Hash,
+    state: Arc<RwLock<StorageStateInner>>,
 }
 
 pub struct StorageStage {
@@ -46,9 +59,13 @@ const NUM_HASHES_FOR_STORAGE_ROTATE: u64 = 1024;
 // TODO: some way to dynamically size NUM_IDENTITIES
 const NUM_IDENTITIES: usize = 1024;
 const NUM_SAMPLES: usize = 4;
-pub const ENTRIES_PER_SLICE: u64 = 16;
+pub const ENTRIES_PER_SEGMENT: u64 = 16;
 const KEY_SIZE: usize = 64;
 
+pub fn get_segment_from_entry(entry_height: u64) -> u64 {
+    entry_height / ENTRIES_PER_SEGMENT
+}
+
 fn get_identity_index_from_signature(key: &Signature) -> usize {
     let rkey = key.as_ref();
     let mut res: usize = (rkey[0] as usize)
@@ -61,28 +78,55 @@ fn get_identity_index_from_signature(key: &Signature) -> usize {
 
 impl StorageState {
     pub fn new() -> Self {
-        let storage_keys = Arc::new(RwLock::new(vec![0u8; KEY_SIZE * NUM_IDENTITIES]));
-        let storage_results = Arc::new(RwLock::new(vec![Hash::default(); NUM_IDENTITIES]));
+        let storage_keys = vec![0u8; KEY_SIZE * NUM_IDENTITIES];
+        let storage_results = vec![Hash::default(); NUM_IDENTITIES];
+        let replicator_map = vec![];
 
-        StorageState {
+        let state = StorageStateInner {
             storage_keys,
             storage_results,
-            last_id: Hash::default(),
+            replicator_map,
+            entry_height: 0,
+            storage_last_id: Hash::default(),
+        };
+
+        StorageState {
+            state: Arc::new(RwLock::new(state)),
         }
     }
 
     pub fn get_mining_key(&self, key: &Signature) -> Vec<u8> {
         let idx = get_identity_index_from_signature(key);
-        self.storage_keys.read().unwrap()[idx..idx + KEY_SIZE].to_vec()
+        self.state.read().unwrap().storage_keys[idx..idx + KEY_SIZE].to_vec()
     }
 
     pub fn get_mining_result(&self, key: &Signature) -> Hash {
         let idx = get_identity_index_from_signature(key);
-        self.storage_results.read().unwrap()[idx]
+        self.state.read().unwrap().storage_results[idx]
     }
 
     pub fn get_last_id(&self) -> Hash {
-        self.last_id
+        self.state.read().unwrap().storage_last_id
+    }
+
+    pub fn get_entry_height(&self) -> u64 {
+        self.state.read().unwrap().entry_height
+    }
+
+    pub fn get_pubkeys_for_entry_height(&self, entry_height: u64) -> Vec<Pubkey> {
+        // TODO: keep track of age?
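Verification in `process_entry_crossing` below is deterministic: every validator seeds a ChaCha RNG from the same signed entry id, so all of them sample the same segment. An illustrative sketch (seed width and sample count here are placeholders, not the exact values):

    use rand::{Rng, SeedableRng};
    use rand_chacha::ChaChaRng;

    // Assumes num_segments > 0 (process_entry_crossing returns early otherwise).
    fn pick_segment_and_samples(signature: &[u8; 64], num_segments: usize) -> (usize, Vec<u64>) {
        let mut seed = [0u8; 32];
        seed.copy_from_slice(&signature[..32]);
        let mut rng = ChaChaRng::from_seed(seed);
        let segment = signature[0] as usize % num_segments;
        // Placeholder: draw a handful of sample offsets from the seeded RNG.
        let samples = (0..4).map(|_| rng.gen_range(0, 16)).collect();
        (segment, samples)
    }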
+        const MAX_PUBKEYS_TO_RETURN: usize = 5;
+        let index = (entry_height / ENTRIES_PER_SEGMENT) as usize;
+        let replicator_map = &self.state.read().unwrap().replicator_map;
+        if index < replicator_map.len() {
+            replicator_map[index]
+                .iter()
+                .cloned()
+                .take(MAX_PUBKEYS_TO_RETURN)
+                .collect::<Vec<_>>()
+        } else {
+            vec![]
+        }
     }
 }
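For reference, the segment arithmetic that the slice-to-segment rename standardizes on, restated as a self-contained snippet:

    pub const ENTRIES_PER_SEGMENT: u64 = 16;

    pub fn get_segment_from_entry(entry_height: u64) -> u64 {
        entry_height / ENTRIES_PER_SEGMENT
    }

    fn main() {
        // Entries 0..=15 land in segment 0, 16..=31 in segment 1, and so on.
        assert_eq!(get_segment_from_entry(15), 0);
        assert_eq!(get_segment_from_entry(16), 1);
    }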
@@ -95,8 +139,9 @@ impl StorageStage {
         exit: Arc<AtomicBool>,
         entry_height: u64,
     ) -> Self {
-        let storage_keys_ = storage_state.storage_keys.clone();
-        let storage_results_ = storage_state.storage_results.clone();
+        debug!("storage_stage::new: entry_height: {}", entry_height);
+        storage_state.state.write().unwrap().entry_height = entry_height;
+        let storage_state_inner = storage_state.state.clone();
         let ledger_path = ledger_path.map(String::from);
         let t_storage_mining_verifier = Builder::new()
             .name("solana-storage-mining-verify-stage".to_string())
@@ -109,8 +154,7 @@ impl StorageStage {
                     if let Some(ref ledger_path_str) = ledger_path {
                         if let Err(e) = Self::process_entries(
                             &keypair,
-                            &storage_keys_,
-                            &storage_results_,
+                            &storage_state_inner,
                             &storage_entry_receiver,
                             ledger_path_str,
                             &mut poh_height,
@@ -137,8 +181,7 @@ impl StorageStage {
     }
 
     pub fn process_entry_crossing(
-        _storage_results: &Arc<RwLock<StorageResults>>,
-        _storage_keys: &Arc<RwLock<StorageKeys>>,
+        state: &Arc<RwLock<StorageStateInner>>,
         keypair: &Arc<Keypair>,
         _ledger_path: &str,
         entry_id: Hash,
@@ -151,18 +194,20 @@ impl StorageStage {
 
         let mut rng = ChaChaRng::from_seed(seed);
 
+        state.write().unwrap().entry_height = entry_height;
+
         // Regenerate the answers
-        let num_slices = (entry_height / ENTRIES_PER_SLICE) as usize;
-        if num_slices == 0 {
-            info!("Ledger has 0 slices!");
+        let num_segments = (entry_height / ENTRIES_PER_SEGMENT) as usize;
+        if num_segments == 0 {
+            info!("Ledger has 0 segments!");
             return Ok(());
         }
-        // TODO: what if the validator does not have this slice
-        let slice = signature.as_ref()[0] as usize % num_slices;
+        // TODO: what if the validator does not have this segment
+        let segment = signature.as_ref()[0] as usize % num_segments;
         debug!(
-            "storage verifying: slice: {} identities: {}",
-            slice, NUM_IDENTITIES,
+            "storage verifying: segment: {} identities: {}",
+            segment, NUM_IDENTITIES,
         );
 
         let mut samples = vec![];
@@ -175,23 +220,22 @@ impl StorageStage {
         // process storage mining results.
         #[cfg(all(feature = "chacha", feature = "cuda"))]
         {
-            let mut storage_results = _storage_results.write().unwrap();
-
             // Lock the keys, since this is the IV memory,
             // it will be updated in-place by the encryption.
             // Should be overwritten by the vote signatures which replace the
             // key values by the time it runs again.
-            let mut storage_keys = _storage_keys.write().unwrap();
+
+            let mut statew = state.write().unwrap();
             match chacha_cbc_encrypt_file_many_keys(
                 _ledger_path,
-                slice as u64,
-                &mut storage_keys,
+                segment as u64,
+                &mut statew.storage_keys,
                 &samples,
             ) {
                 Ok(hashes) => {
-                    debug!("Success! encrypted ledger slice: {}", slice);
-                    storage_results.copy_from_slice(&hashes);
+                    debug!("Success! encrypted ledger segment: {}", segment);
+                    statew.storage_results.copy_from_slice(&hashes);
                 }
                 Err(e) => {
                     info!("error encrypting file: {:?}", e);
@@ -206,8 +250,7 @@ impl StorageStage {
 
     pub fn process_entries(
         keypair: &Arc<Keypair>,
-        storage_keys: &Arc<RwLock<StorageKeys>>,
-        storage_results: &Arc<RwLock<StorageResults>>,
+        storage_state: &Arc<RwLock<StorageStateInner>>,
         entry_receiver: &EntryReceiver,
         ledger_path: &str,
         poh_height: &mut u64,
@@ -221,17 +264,45 @@ impl StorageStage {
             // Go through the transactions, find votes, and use them to update
             // the storage_keys with their signatures.
             for tx in entry.transactions {
-                for program_id in tx.program_ids {
+                for (i, program_id) in tx.program_ids.iter().enumerate() {
                     if vote_program::check_id(&program_id) {
                         debug!(
                             "generating storage_keys from votes current_key_idx: {}",
                             *current_key_idx
                         );
-                        let mut storage_keys = storage_keys.write().unwrap();
+                        let storage_keys = &mut storage_state.write().unwrap().storage_keys;
                         storage_keys[*current_key_idx..*current_key_idx + size_of::<Signature>()]
                             .copy_from_slice(tx.signatures[0].as_ref());
                         *current_key_idx += size_of::<Signature>();
                         *current_key_idx %= storage_keys.len();
+                    } else if storage_program::check_id(&program_id) {
+                        match deserialize(&tx.instructions[i].userdata) {
+                            Ok(StorageProgram::SubmitMiningProof {
+                                entry_height: proof_entry_height,
+                                ..
+                            }) => {
+                                if proof_entry_height < *entry_height {
+                                    let mut statew = storage_state.write().unwrap();
+                                    let max_segment_index =
+                                        (*entry_height / ENTRIES_PER_SEGMENT) as usize;
+                                    if statew.replicator_map.len() <= max_segment_index {
+                                        statew
+                                            .replicator_map
+                                            .resize(max_segment_index, HashSet::new());
+                                    }
+                                    let proof_segment_index =
+                                        (proof_entry_height / ENTRIES_PER_SEGMENT) as usize;
+                                    if proof_segment_index < statew.replicator_map.len() {
+                                        statew.replicator_map[proof_segment_index]
+                                            .insert(tx.account_keys[0]);
+                                    }
+                                }
+                                debug!("storage proof: entry_height: {}", entry_height);
+                            }
+                            Err(e) => {
+                                info!("error: {:?}", e);
+                            }
+                        }
                     }
                 }
             }
@@ -241,8 +312,7 @@ impl StorageStage {
                 *poh_height, entry_height, entry.num_hashes
             );
             Self::process_entry_crossing(
-                &storage_results,
-                &storage_keys,
+                &storage_state,
                 &keypair,
                 &ledger_path,
                 entry.id,
@@ -408,9 +478,9 @@ mod tests {
 
         let mut reference_keys;
         {
-            let keys = storage_state.storage_keys.read().unwrap();
+            let keys = &storage_state.state.read().unwrap().storage_keys;
             reference_keys = vec![0; keys.len()];
-            reference_keys.copy_from_slice(&keys);
+            reference_keys.copy_from_slice(keys);
         }
         let mut vote_txs: Vec<Transaction> = Vec::new();
         let vote = Vote {
@@ -424,7 +494,7 @@ mod tests {
 
         for _ in 0..5 {
             {
-                let keys = storage_state.storage_keys.read().unwrap();
+                let keys = &storage_state.state.read().unwrap().storage_keys;
                 if keys[..] != *reference_keys.as_slice() {
                     break;
                 }
@@ -438,7 +508,7 @@ mod tests {
         storage_stage.join().unwrap();
 
         {
-            let keys = storage_state.storage_keys.read().unwrap();
+            let keys = &storage_state.state.read().unwrap().storage_keys;
             assert_ne!(keys[..], *reference_keys);
         }
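A standalone restatement of the `replicator_map` bookkeeping above, with a byte-array stand-in for `Pubkey`; note that a proof aimed at the still-in-progress segment falls outside the resized map and is dropped by the bounds check:

    use std::collections::HashSet;

    type FakePubkey = [u8; 32]; // stand-in for solana_sdk::pubkey::Pubkey

    fn record_proof(
        replicator_map: &mut Vec<HashSet<FakePubkey>>,
        proof_entry_height: u64,
        current_entry_height: u64,
        entries_per_segment: u64,
        submitter: FakePubkey,
    ) {
        if proof_entry_height >= current_entry_height {
            return; // proof is ahead of the ledger; drop it
        }
        let max_segment_index = (current_entry_height / entries_per_segment) as usize;
        if replicator_map.len() <= max_segment_index {
            replicator_map.resize(max_segment_index, HashSet::new());
        }
        let proof_segment_index = (proof_entry_height / entries_per_segment) as usize;
        // Only record proofs for segments the map actually covers.
        if let Some(entry) = replicator_map.get_mut(proof_segment_index) {
            entry.insert(submitter);
        }
    }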
diff --git a/src/tvu.rs b/src/tvu.rs
index 05a8a324a5..8fadee47ed 100644
--- a/src/tvu.rs
+++ b/src/tvu.rs
@@ -67,7 +67,7 @@ impl Tvu {
         bank: &Arc<Bank>,
         entry_height: u64,
         last_entry_id: Hash,
-        cluster_info: Arc<RwLock<ClusterInfo>>,
+        cluster_info: &Arc<RwLock<ClusterInfo>>,
         sockets: Sockets,
         ledger_path: Option<&str>,
         db_ledger: Arc<RwLock<DbLedger>>,
@@ -110,7 +110,7 @@ impl Tvu {
             keypair.clone(),
             vote_account_keypair,
             bank.clone(),
-            cluster_info,
+            cluster_info.clone(),
             blob_window_receiver,
             exit.clone(),
             entry_height,
@@ -285,7 +285,7 @@ pub mod tests {
             &bank,
             0,
             cur_hash,
-            cref1,
+            &cref1,
             {
                 Sockets {
                     repair: target1.sockets.repair,
diff --git a/tests/replicator.rs b/tests/replicator.rs
index 51a79124e7..81b336f0f1 100644
--- a/tests/replicator.rs
+++ b/tests/replicator.rs
@@ -1,12 +1,16 @@
 #[macro_use]
 extern crate log;
 
+#[cfg(feature = "chacha")]
+#[macro_use]
+extern crate serde_json;
+
 use solana::client::mk_client;
 use solana::cluster_info::{Node, NodeInfo};
 use solana::db_ledger::DbLedger;
 use solana::fullnode::Fullnode;
 use solana::leader_scheduler::LeaderScheduler;
-use solana::ledger::{create_tmp_genesis, get_tmp_ledger_path, read_ledger};
+use solana::ledger::{create_tmp_genesis, get_tmp_ledger_path, read_ledger, tmp_copy_ledger};
 use solana::replicator::Replicator;
 use solana_sdk::signature::{Keypair, KeypairUtil};
 use solana_sdk::system_transaction::SystemTransaction;
@@ -30,14 +34,34 @@ fn test_replicator_startup() {
     let leader_ledger_path = "replicator_test_leader_ledger";
     let (mint, leader_ledger_path) = create_tmp_genesis(leader_ledger_path, 100, leader_info.id, 1);
 
+    let validator_ledger_path =
+        tmp_copy_ledger(&leader_ledger_path, "replicator_test_validator_ledger");
+
     {
         let leader = Fullnode::new(
             leader_node,
             &leader_ledger_path,
             leader_keypair,
-            vote_account_keypair,
+            vote_account_keypair.clone(),
             None,
             false,
+            LeaderScheduler::from_bootstrap_leader(leader_info.id.clone()),
+            None,
+        );
+
+        let validator_keypair = Arc::new(Keypair::new());
+        let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey());
+
+        #[cfg(feature = "chacha")]
+        let validator_node_info = validator_node.info.clone();
+
+        let validator = Fullnode::new(
+            validator_node,
+            &validator_ledger_path,
+            validator_keypair,
+            vote_account_keypair,
+            Some(leader_info.gossip),
+            false,
             LeaderScheduler::from_bootstrap_leader(leader_info.id),
             None,
         );
@@ -53,6 +77,7 @@ fn test_replicator_startup() {
 
         let replicator_keypair = Keypair::new();
 
+        // Give the replicator some tokens
         let amount = 1;
         let mut tx = Transaction::system_new(
             &mint.keypair(),
@@ -77,6 +102,7 @@ fn test_replicator_startup() {
         )
         .unwrap();
 
+        // Poll the ledger dir to see that some is downloaded
         let mut num_entries = 0;
         for _ in 0..60 {
             match read_ledger(replicator_ledger_path, true) {
@@ -94,13 +120,43 @@ fn test_replicator_startup() {
                 }
             }
             sleep(Duration::from_millis(300));
+
+            // Do a transfer to make sure new entries are created which
+            // stimulates the repair process
             let last_id = leader_client.get_last_id();
             leader_client
                 .transfer(1, &mint.keypair(), bob.pubkey(), &last_id)
                 .unwrap();
         }
+
+        // The replicator will not submit storage proofs if
+        // chacha is not enabled
+        #[cfg(feature = "chacha")]
+        {
+            use solana::rpc_request::{RpcClient, RpcRequest};
+
+            let rpc_client = RpcClient::new_from_socket(validator_node_info.rpc);
+            let mut non_zero_pubkeys = false;
+            for _ in 0..30 {
+                let params = json!([0]);
+                let pubkeys = RpcRequest::GetStoragePubkeysForEntryHeight
+                    .make_rpc_request(&rpc_client, 1, Some(params))
+                    .unwrap();
+                info!("pubkeys: {:?}", pubkeys);
+                if pubkeys.as_array().unwrap().len() != 0 {
+                    non_zero_pubkeys = true;
+                    break;
+                }
+                sleep(Duration::from_secs(1));
+            }
+            assert!(non_zero_pubkeys);
+        }
+
+        // Check that some ledger was downloaded
         assert!(num_entries > 0);
 
+        replicator.close();
+        validator.exit();
         leader.close().expect("Expected successful node closure");
     }