diff --git a/programs/native/storage/src/lib.rs b/programs/native/storage/src/lib.rs index a449a2d17c..d633bc67a3 100644 --- a/programs/native/storage/src/lib.rs +++ b/programs/native/storage/src/lib.rs @@ -3,12 +3,27 @@ //! and give reward for good proofs. use log::*; +extern crate solana_sdk; + use solana_sdk::account::KeyedAccount; use solana_sdk::native_program::ProgramError; use solana_sdk::pubkey::Pubkey; use solana_sdk::solana_entrypoint; use solana_sdk::storage_program::*; +pub const TOTAL_VALIDATOR_REWARDS: u64 = 1000; +pub const TOTAL_REPLICATOR_REWARDS: u64 = 1000; + +fn count_valid_proofs(proofs: &[ProofStatus]) -> u64 { + let mut num = 0; + for proof in proofs { + if let ProofStatus::Valid = proof { + num += 1; + } + } + num +} + solana_entrypoint!(entrypoint); fn entrypoint( _program_id: &Pubkey, @@ -18,24 +33,150 @@ fn entrypoint( ) -> Result<(), ProgramError> { solana_logger::setup(); + if keyed_accounts.len() != 2 { + // keyed_accounts[1] should be the main storage key + // to access its userdata + Err(ProgramError::InvalidArgument)?; + } + // accounts_keys[0] must be signed if keyed_accounts[0].signer_key().is_none() { info!("account[0] is unsigned"); + Err(ProgramError::GenericError)?; + } + + if *keyed_accounts[1].unsigned_key() != system_id() { + info!( + "invalid account id owner: {:?} system_id: {:?}", + keyed_accounts[1].unsigned_key(), + system_id() + ); Err(ProgramError::InvalidArgument)?; } if let Ok(syscall) = bincode::deserialize(data) { + let mut storage_account_state = if let Ok(storage_account_state) = + bincode::deserialize(&keyed_accounts[1].account.userdata) + { + storage_account_state + } else { + StorageProgramState::default() + }; + + debug!( + "deserialized state height: {}", + storage_account_state.entry_height + ); match syscall { StorageProgram::SubmitMiningProof { sha_state, entry_height, + signature, } => { - info!( + let segment_index = get_segment_from_entry(entry_height); + let current_segment_index = + get_segment_from_entry(storage_account_state.entry_height); + if segment_index >= current_segment_index { + return Err(ProgramError::InvalidArgument); + } + + debug!( "Mining proof submitted with state {:?} entry_height: {}", sha_state, entry_height ); + + let proof_info = ProofInfo { + id: *keyed_accounts[0].signer_key().unwrap(), + sha_state, + signature, + }; + storage_account_state.proofs[segment_index].push(proof_info); + } + StorageProgram::AdvertiseStorageLastId { id, entry_height } => { + let original_segments = storage_account_state.entry_height / ENTRIES_PER_SEGMENT; + let segments = entry_height / ENTRIES_PER_SEGMENT; + debug!( + "advertise new last id segments: {} orig: {}", + segments, original_segments + ); + if segments <= original_segments { + return Err(ProgramError::InvalidArgument); + } + + storage_account_state.entry_height = entry_height; + storage_account_state.id = id; + + // move the proofs to previous_proofs + storage_account_state.previous_proofs = storage_account_state.proofs.clone(); + storage_account_state.proofs.clear(); + storage_account_state + .proofs + .resize(segments as usize, Vec::new()); + + // move lockout_validations to reward_validations + storage_account_state.reward_validations = + storage_account_state.lockout_validations.clone(); + storage_account_state.lockout_validations.clear(); + storage_account_state + .lockout_validations + .resize(segments as usize, Vec::new()); + } + StorageProgram::ProofValidation { + entry_height, + proof_mask, + } => { + if entry_height >= storage_account_state.entry_height { + return Err(ProgramError::InvalidArgument); + } + + let segment_index = get_segment_from_entry(entry_height); + if storage_account_state.previous_proofs[segment_index].len() != proof_mask.len() { + return Err(ProgramError::InvalidArgument); + } + + // TODO: Check that each proof mask matches the signature + /*for (i, entry) in proof_mask.iter().enumerate() { + if storage_account_state.previous_proofs[segment_index][i] != signature.as_ref[0] { + return Err(ProgramError::InvalidArgument); + } + }*/ + + let info = ValidationInfo { + id: *keyed_accounts[0].signer_key().unwrap(), + proof_mask, + }; + storage_account_state.lockout_validations[segment_index].push(info); + } + StorageProgram::ClaimStorageReward { entry_height } => { + let claims_index = get_segment_from_entry(entry_height); + let account_key = keyed_accounts[0].signer_key().unwrap(); + let mut num_validations = 0; + let mut total_validations = 0; + for validation in &storage_account_state.reward_validations[claims_index] { + if *account_key == validation.id { + num_validations += count_valid_proofs(&validation.proof_mask); + } else { + total_validations += count_valid_proofs(&validation.proof_mask); + } + } + total_validations += num_validations; + if total_validations > 0 { + keyed_accounts[0].account.tokens += + (TOTAL_VALIDATOR_REWARDS * num_validations) / total_validations; + } } } + + keyed_accounts[1].account.userdata.clear(); + if bincode::serialize_into( + &mut keyed_accounts[1].account.userdata, + &storage_account_state, + ) + .is_err() + { + return Err(ProgramError::UserdataTooSmall); + } + Ok(()) } else { info!("Invalid instruction userdata: {:?}", data); @@ -46,8 +187,41 @@ fn entrypoint( #[cfg(test)] mod test { use super::*; - use solana_sdk::account::create_keyed_accounts; - use solana_sdk::signature::{Keypair, KeypairUtil}; + use solana_sdk::account::{create_keyed_accounts, Account}; + use solana_sdk::hash::Hash; + use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; + use solana_sdk::storage_program::ProofStatus; + use solana_sdk::storage_program::StorageTransaction; + use solana_sdk::transaction::{Instruction, Transaction}; + + fn test_transaction( + tx: &Transaction, + program_accounts: &mut [Account], + ) -> Result<(), ProgramError> { + assert_eq!(tx.instructions.len(), 1); + let Instruction { + ref accounts, + ref userdata, + .. + } = tx.instructions[0]; + + info!("accounts: {:?}", accounts); + + let mut keyed_accounts: Vec<_> = accounts + .iter() + .map(|&index| { + let index = index as usize; + let key = &tx.account_keys[index]; + (key, index < tx.signatures.len()) + }) + .zip(program_accounts.iter_mut()) + .map(|((key, is_signer), account)| KeyedAccount::new(key, is_signer, account)) + .collect(); + + let ret = entrypoint(&id(), &mut keyed_accounts, &userdata, 42); + info!("ret: {:?}", ret); + ret + } #[test] fn test_storage_tx() { @@ -56,4 +230,123 @@ mod test { let mut keyed_accounts = create_keyed_accounts(&mut accounts); assert!(entrypoint(&id(), &mut keyed_accounts, &[], 42).is_err()); } + + #[test] + fn test_invalid_accounts_len() { + let keypair = Keypair::new(); + let mut accounts = [Default::default()]; + + let tx = Transaction::storage_new_mining_proof( + &keypair, + Hash::default(), + Hash::default(), + 0, + Signature::default(), + ); + assert!(test_transaction(&tx, &mut accounts).is_err()); + + let mut accounts = [Default::default(), Default::default(), Default::default()]; + + assert!(test_transaction(&tx, &mut accounts).is_err()); + } + + #[test] + fn test_submit_mining_invalid_entry_height() { + solana_logger::setup(); + let keypair = Keypair::new(); + let mut accounts = [Default::default(), Default::default()]; + + let tx = Transaction::storage_new_mining_proof( + &keypair, + Hash::default(), + Hash::default(), + 0, + Signature::default(), + ); + + // Haven't seen a transaction to roll over the epoch, so this should fail + assert!(test_transaction(&tx, &mut accounts).is_err()); + } + + #[test] + fn test_submit_mining_ok() { + solana_logger::setup(); + let keypair = Keypair::new(); + let mut accounts = [Default::default(), Default::default()]; + + let tx = Transaction::storage_new_advertise_last_id( + &keypair, + Hash::default(), + Hash::default(), + ENTRIES_PER_SEGMENT, + ); + + assert!(test_transaction(&tx, &mut accounts).is_ok()); + + let tx = Transaction::storage_new_mining_proof( + &keypair, + Hash::default(), + Hash::default(), + 0, + Signature::default(), + ); + + assert!(test_transaction(&tx, &mut accounts).is_ok()); + } + + #[test] + fn test_validate_mining() { + solana_logger::setup(); + let keypair = Keypair::new(); + let mut accounts = [Default::default(), Default::default()]; + + let entry_height = 0; + + let tx = Transaction::storage_new_advertise_last_id( + &keypair, + Hash::default(), + Hash::default(), + ENTRIES_PER_SEGMENT, + ); + + assert!(test_transaction(&tx, &mut accounts).is_ok()); + + let tx = Transaction::storage_new_mining_proof( + &keypair, + Hash::default(), + Hash::default(), + entry_height, + Signature::default(), + ); + assert!(test_transaction(&tx, &mut accounts).is_ok()); + + let tx = Transaction::storage_new_advertise_last_id( + &keypair, + Hash::default(), + Hash::default(), + ENTRIES_PER_SEGMENT * 2, + ); + assert!(test_transaction(&tx, &mut accounts).is_ok()); + + let tx = Transaction::storage_new_proof_validation( + &keypair, + Hash::default(), + entry_height, + vec![ProofStatus::Valid], + ); + assert!(test_transaction(&tx, &mut accounts).is_ok()); + + let tx = Transaction::storage_new_advertise_last_id( + &keypair, + Hash::default(), + Hash::default(), + ENTRIES_PER_SEGMENT * 3, + ); + assert!(test_transaction(&tx, &mut accounts).is_ok()); + + let tx = Transaction::storage_new_reward_claim(&keypair, Hash::default(), entry_height); + assert!(test_transaction(&tx, &mut accounts).is_ok()); + + assert!(accounts[0].tokens == TOTAL_VALIDATOR_REWARDS); + } } diff --git a/sdk/src/storage_program.rs b/sdk/src/storage_program.rs index 69a09af1e3..fba4d86170 100644 --- a/sdk/src/storage_program.rs +++ b/sdk/src/storage_program.rs @@ -1,11 +1,65 @@ use crate::hash::Hash; use crate::pubkey::Pubkey; -use crate::signature::{Keypair, KeypairUtil}; +use crate::signature::Keypair; +use crate::signature::Signature; use crate::transaction::Transaction; +pub const ENTRIES_PER_SEGMENT: u64 = 16; + +pub fn get_segment_from_entry(entry_height: u64) -> usize { + (entry_height / ENTRIES_PER_SEGMENT) as usize +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub enum ProofStatus { + Valid, + NotValid, + Skipped, +} + +#[derive(Default, Debug, Serialize, Deserialize, Clone)] +pub struct ProofInfo { + pub id: Pubkey, + pub signature: Signature, + pub sha_state: Hash, +} + +#[derive(Default, Debug, Serialize, Deserialize, Clone)] +pub struct ValidationInfo { + pub id: Pubkey, + pub proof_mask: Vec, +} + +#[derive(Default, Debug, Serialize, Deserialize)] +pub struct StorageProgramState { + pub entry_height: u64, + pub id: Hash, + + pub proofs: Vec>, + pub previous_proofs: Vec>, + + pub lockout_validations: Vec>, + pub reward_validations: Vec>, +} + #[derive(Serialize, Deserialize, Debug, Clone)] pub enum StorageProgram { - SubmitMiningProof { sha_state: Hash, entry_height: u64 }, + SubmitMiningProof { + sha_state: Hash, + entry_height: u64, + signature: Signature, + }, + AdvertiseStorageLastId { + id: Hash, + entry_height: u64, + }, + ClaimStorageReward { + entry_height: u64, + }, + ProofValidation { + entry_height: u64, + proof_mask: Vec, + }, } pub const STORAGE_PROGRAM_ID: [u8; 32] = [ @@ -13,6 +67,12 @@ pub const STORAGE_PROGRAM_ID: [u8; 32] = [ 0, ]; +// TODO check this is available +pub const STORAGE_SYSTEM_ACCOUNT_ID: [u8; 32] = [ + 133, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, +]; + pub fn check_id(program_id: &Pubkey) -> bool { program_id.as_ref() == STORAGE_PROGRAM_ID } @@ -21,13 +81,34 @@ pub fn id() -> Pubkey { Pubkey::new(&STORAGE_PROGRAM_ID) } +pub fn system_id() -> Pubkey { + Pubkey::new(&STORAGE_SYSTEM_ACCOUNT_ID) +} + pub trait StorageTransaction { fn storage_new_mining_proof( from_keypair: &Keypair, sha_state: Hash, last_id: Hash, entry_height: u64, + signature: Signature, ) -> Self; + + fn storage_new_advertise_last_id( + from_keypair: &Keypair, + storage_last_id: Hash, + last_id: Hash, + entry_height: u64, + ) -> Self; + + fn storage_new_proof_validation( + from_keypair: &Keypair, + last_id: Hash, + entry_height: u64, + proof_mask: Vec, + ) -> Self; + + fn storage_new_reward_claim(from_keypair: &Keypair, last_id: Hash, entry_height: u64) -> Self; } impl StorageTransaction for Transaction { @@ -36,14 +117,68 @@ impl StorageTransaction for Transaction { sha_state: Hash, last_id: Hash, entry_height: u64, + signature: Signature, ) -> Self { let program = StorageProgram::SubmitMiningProof { sha_state, entry_height, + signature, }; Transaction::new( from_keypair, - &[from_keypair.pubkey()], + &[Pubkey::new(&STORAGE_SYSTEM_ACCOUNT_ID)], + id(), + &program, + last_id, + 0, + ) + } + + fn storage_new_advertise_last_id( + from_keypair: &Keypair, + storage_id: Hash, + last_id: Hash, + entry_height: u64, + ) -> Self { + let program = StorageProgram::AdvertiseStorageLastId { + id: storage_id, + entry_height, + }; + Transaction::new( + from_keypair, + &[Pubkey::new(&STORAGE_SYSTEM_ACCOUNT_ID)], + id(), + &program, + last_id, + 0, + ) + } + + fn storage_new_proof_validation( + from_keypair: &Keypair, + last_id: Hash, + entry_height: u64, + proof_mask: Vec, + ) -> Self { + let program = StorageProgram::ProofValidation { + entry_height, + proof_mask, + }; + Transaction::new( + from_keypair, + &[Pubkey::new(&STORAGE_SYSTEM_ACCOUNT_ID)], + id(), + &program, + last_id, + 0, + ) + } + + fn storage_new_reward_claim(from_keypair: &Keypair, last_id: Hash, entry_height: u64) -> Self { + let program = StorageProgram::ClaimStorageReward { entry_height }; + Transaction::new( + from_keypair, + &[Pubkey::new(&STORAGE_SYSTEM_ACCOUNT_ID)], id(), &program, last_id, diff --git a/src/bank.rs b/src/bank.rs index 783e7ade54..aa0a127f54 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -284,6 +284,33 @@ impl Bank { .get_pubkeys_for_entry_height(entry_height) } + pub fn get_storage_entry_height(&self) -> u64 { + match self.get_account(&storage_program::system_id()) { + Some(storage_system_account) => { + let state = deserialize(&storage_system_account.userdata); + if let Ok(state) = state { + let state: storage_program::StorageProgramState = state; + return state.entry_height; + } + } + None => { + info!("error in reading entry_height"); + } + } + 0 + } + + pub fn get_storage_last_id(&self) -> Hash { + if let Some(storage_system_account) = self.get_account(&storage_program::system_id()) { + let state = deserialize(&storage_system_account.userdata); + if let Ok(state) = state { + let state: storage_program::StorageProgramState = state; + return state.id; + } + } + Hash::default() + } + /// Forget all signatures. Useful for benchmarking. pub fn clear_signatures(&self) { self.last_ids.write().unwrap().clear_signatures(); @@ -1009,6 +1036,7 @@ mod tests { use solana_sdk::native_program::ProgramError; use solana_sdk::signature::Keypair; use solana_sdk::signature::KeypairUtil; + use solana_sdk::storage_program::{StorageTransaction, ENTRIES_PER_SEGMENT}; use solana_sdk::system_transaction::SystemTransaction; use solana_sdk::transaction::Instruction; use std; @@ -1734,6 +1762,10 @@ mod tests { 132, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ]); + let storage_system = Pubkey::new(&[ + 133, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, + ]); assert_eq!(system_program::id(), system); assert_eq!(solana_native_loader::id(), native); @@ -1742,6 +1774,7 @@ mod tests { assert_eq!(storage_program::id(), storage); assert_eq!(token_program::id(), token); assert_eq!(vote_program::id(), vote); + assert_eq!(storage_program::system_id(), storage_system); } #[test] @@ -1755,6 +1788,7 @@ mod tests { storage_program::id(), token_program::id(), vote_program::id(), + storage_program::system_id(), ]; assert!(ids.into_iter().all(move |id| unique.insert(id))); } @@ -1929,4 +1963,67 @@ mod tests { bank.rollback(); } + #[test] + fn test_bank_storage() { + solana_logger::setup(); + let alice = Mint::new(1000); + let bank = Bank::new(&alice); + + let bob = Keypair::new(); + let jack = Keypair::new(); + let jill = Keypair::new(); + + let x = 42; + let last_id = hash(&[x]); + let x2 = x * 2; + let storage_last_id = hash(&[x2]); + + bank.register_tick(&last_id); + + bank.transfer(10, &alice.keypair(), jill.pubkey(), last_id) + .unwrap(); + + let tx = Transaction::system_create( + &jill, + storage_program::system_id(), + bank.last_id(), + 1, + 4096, + storage_program::id(), + 1, + ); + + assert!(bank.process_transaction(&tx).is_ok()); + + bank.transfer(10, &alice.keypair(), bob.pubkey(), last_id) + .unwrap(); + bank.transfer(10, &alice.keypair(), jack.pubkey(), last_id) + .unwrap(); + + let tx = Transaction::storage_new_advertise_last_id( + &bob, + storage_last_id, + last_id, + ENTRIES_PER_SEGMENT, + ); + + assert!(bank.process_transaction(&tx).is_ok()); + + let entry_height = 0; + + let tx = Transaction::storage_new_mining_proof( + &jack, + Hash::default(), + last_id, + entry_height, + Signature::default(), + ); + + assert!(bank.process_transaction(&tx).is_ok()); + + assert_eq!(bank.get_storage_entry_height(), ENTRIES_PER_SEGMENT); + assert_eq!(bank.get_storage_last_id(), storage_last_id); + assert_eq!(bank.get_pubkeys_for_entry_height(0), vec![]); + } + } diff --git a/src/chacha.rs b/src/chacha.rs index 0aa18a0e35..0e8f29ca62 100644 --- a/src/chacha.rs +++ b/src/chacha.rs @@ -35,26 +35,34 @@ pub fn chacha_cbc_encrypt_file( in_path: &Path, out_path: &Path, ivec: &mut [u8; CHACHA_BLOCK_SIZE], -) -> io::Result<()> { +) -> io::Result { let mut in_file = BufReader::new(File::open(in_path).expect("Can't open ledger data file")); let mut out_file = BufWriter::new(File::create(out_path).expect("Can't open ledger encrypted data file")); - let mut buffer = [0; 4 * 1024]; - let mut encrypted_buffer = [0; 4 * 1024]; + const BUFFER_SIZE: usize = 4 * 1024; + let mut buffer = [0; BUFFER_SIZE]; + let mut encrypted_buffer = [0; BUFFER_SIZE]; let key = [0; CHACHA_KEY_SIZE]; + let mut total_size = 0; - while let Ok(size) = in_file.read(&mut buffer) { + while let Ok(mut size) = in_file.read(&mut buffer) { debug!("read {} bytes", size); if size == 0 { break; } + if size < BUFFER_SIZE { + // We are on the last block, round to the nearest key_size + // boundary + size = (size + CHACHA_KEY_SIZE - 1) & !(CHACHA_KEY_SIZE - 1); + } + total_size += size; chacha_cbc_encrypt(&buffer[..size], &mut encrypted_buffer[..size], &key, ivec); if let Err(res) = out_file.write(&encrypted_buffer[..size]) { println!("Error writing file! {:?}", res); return Err(res); } } - Ok(()) + Ok(total_size) } #[cfg(test)] @@ -84,7 +92,10 @@ mod tests { let size = out_file.read_to_end(&mut buf).unwrap(); assert_eq!( buf[..size], - [66, 54, 56, 212, 142, 110, 105, 158, 116, 82, 120, 53] + [ + 66, 54, 56, 212, 142, 110, 105, 158, 116, 82, 120, 53, 199, 78, 76, 95, 204, 148, + 226, 94, 150, 182, 82, 197, 248, 146, 26, 24, 247, 117, 120, 197 + ] ); remove_file(in_path).unwrap(); remove_file(out_path).unwrap(); diff --git a/src/chacha_cuda.rs b/src/chacha_cuda.rs index 8bfcbcfe42..2ac65ad298 100644 --- a/src/chacha_cuda.rs +++ b/src/chacha_cuda.rs @@ -7,7 +7,7 @@ use solana_sdk::hash::Hash; use std::io; use std::mem::size_of; -use crate::storage_stage::ENTRIES_PER_SEGMENT; +use solana_sdk::storage_program::ENTRIES_PER_SEGMENT; // Encrypt a file with multiple starting IV states, determined by ivecs.len() // @@ -15,7 +15,7 @@ use crate::storage_stage::ENTRIES_PER_SEGMENT; // and return the vec of sha states pub fn chacha_cbc_encrypt_file_many_keys( in_path: &str, - slice: u64, + segment: u64, ivecs: &mut [u8], samples: &[u64], ) -> io::Result> { @@ -36,7 +36,7 @@ pub fn chacha_cbc_encrypt_file_many_keys( let mut sha_states = vec![0; num_keys * size_of::()]; let mut int_sha_states = vec![0; num_keys * 112]; let keys: Vec = vec![0; num_keys * CHACHA_KEY_SIZE]; // keys not used ATM, uniqueness comes from IV - let mut entry = slice; + let mut entry = segment; let mut total_entries = 0; let mut total_entry_len = 0; let mut time: f32 = 0.0; @@ -51,8 +51,8 @@ pub fn chacha_cbc_encrypt_file_many_keys( ) { Ok((num_entries, entry_len)) => { info!( - "encrypting slice: {} num_entries: {} entry_len: {}", - slice, num_entries, entry_len + "encrypting segment: {} num_entries: {} entry_len: {}", + segment, num_entries, entry_len ); let entry_len_usz = entry_len as usize; unsafe { @@ -74,10 +74,10 @@ pub fn chacha_cbc_encrypt_file_many_keys( total_entries += num_entries; entry += num_entries; debug!( - "total entries: {} entry: {} slice: {} entries_per_slice: {}", - total_entries, entry, slice, ENTRIES_PER_SEGMENT + "total entries: {} entry: {} segment: {} entries_per_segment: {}", + total_entries, entry, segment, ENTRIES_PER_SEGMENT ); - if (entry - slice) >= ENTRIES_PER_SEGMENT { + if (entry - segment) >= ENTRIES_PER_SEGMENT { break; } } diff --git a/src/replicator.rs b/src/replicator.rs index afde66c8ad..87dfb74b51 100644 --- a/src/replicator.rs +++ b/src/replicator.rs @@ -10,17 +10,17 @@ use crate::ledger::LEDGER_DATA_FILE; use crate::result::Result; use crate::rpc_request::{RpcClient, RpcRequest}; use crate::service::Service; -use crate::storage_stage::ENTRIES_PER_SEGMENT; use crate::store_ledger_stage::StoreLedgerStage; use crate::streamer::BlobReceiver; -use crate::thin_client::retry_get_balance; +use crate::thin_client::{retry_get_balance, ThinClient}; use crate::window_service::window_service; use rand::thread_rng; use rand::Rng; use solana_drone::drone::{request_airdrop_transaction, DRONE_PORT}; use solana_sdk::hash::{Hash, Hasher}; -use solana_sdk::signature::{Keypair, KeypairUtil}; +use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; use solana_sdk::storage_program::StorageTransaction; +use solana_sdk::storage_program::{get_segment_from_entry, ENTRIES_PER_SEGMENT}; use solana_sdk::transaction::Transaction; use std::fs::File; use std::io; @@ -46,6 +46,7 @@ pub struct Replicator { t_window: JoinHandle<()>, pub retransmit_receiver: BlobReceiver, exit: Arc, + entry_height: u64, } pub fn sample_file(in_path: &Path, sample_offsets: &[u64]) -> io::Result { @@ -83,6 +84,23 @@ pub fn sample_file(in_path: &Path, sample_offsets: &[u64]) -> io::Result { Ok(hasher.result()) } +fn get_entry_heights_from_last_id( + signature: &ring::signature::Signature, + storage_entry_height: u64, +) -> (u64, u64) { + let signature_vec = signature.as_ref(); + let mut segment_index = u64::from(signature_vec[0]) + | (u64::from(signature_vec[1]) << 8) + | (u64::from(signature_vec[1]) << 16) + | (u64::from(signature_vec[2]) << 24); + let max_segment_index = get_segment_from_entry(storage_entry_height); + segment_index %= max_segment_index as u64; + let entry_height = segment_index * ENTRIES_PER_SEGMENT; + let max_entry_height = entry_height + ENTRIES_PER_SEGMENT; + + (entry_height, max_entry_height) +} + impl Replicator { #[allow(clippy::new_ret_no_self)] pub fn new( @@ -126,53 +144,18 @@ impl Replicator { ); info!("polling for leader"); - let leader; - loop { - if let Some(l) = cluster_info.read().unwrap().get_gossip_top_leader() { - leader = l.clone(); - break; - } - - sleep(Duration::from_millis(900)); - info!("{}", cluster_info.read().unwrap().node_info_trace()); - } + let leader = Self::poll_for_leader(&cluster_info); info!("Got leader: {:?}", leader); - let mut storage_last_id; - let mut storage_entry_height; - loop { - let rpc_client = { - let cluster_info = cluster_info.read().unwrap(); - let rpc_peers = cluster_info.rpc_peers(); - info!("rpc peers: {:?}", rpc_peers); - let node_idx = thread_rng().gen_range(0, rpc_peers.len()); - RpcClient::new_from_socket(rpc_peers[node_idx].rpc) - }; - - storage_last_id = RpcRequest::GetStorageMiningLastId - .make_rpc_request(&rpc_client, 2, None) - .expect("rpc request") - .to_string(); - storage_entry_height = RpcRequest::GetStorageMiningEntryHeight - .make_rpc_request(&rpc_client, 2, None) - .expect("rpc request") - .as_u64() - .unwrap(); - if storage_entry_height != 0 { - break; - } - } + let (storage_last_id, storage_entry_height) = + Self::poll_for_last_id_and_entry_height(&cluster_info); let signature = keypair.sign(storage_last_id.as_ref()); - let signature = signature.as_ref(); - let block_index = u64::from(signature[0]) - | (u64::from(signature[1]) << 8) - | (u64::from(signature[1]) << 16) - | (u64::from(signature[2]) << 24); - let mut entry_height = block_index * ENTRIES_PER_SEGMENT; - entry_height %= storage_entry_height; - let max_entry_height = entry_height + ENTRIES_PER_SEGMENT; + let (entry_height, max_entry_height) = + get_entry_heights_from_last_id(&signature, storage_entry_height); + + info!("replicating entry_height: {}", entry_height); let repair_socket = Arc::new(node.sockets.repair); let mut blob_sockets: Vec> = @@ -214,7 +197,133 @@ impl Replicator { let mut client = mk_client(&leader); - if retry_get_balance(&mut client, &keypair.pubkey(), None).is_none() { + Self::get_airdrop_tokens(&mut client, keypair, &leader_info); + info!("Done downloading ledger at {}", ledger_path.unwrap()); + + let ledger_path = Path::new(ledger_path.unwrap()); + let ledger_data_file_encrypted = ledger_path.join(format!("{}.enc", LEDGER_DATA_FILE)); + let mut sampling_offsets = Vec::new(); + + #[cfg(not(feature = "chacha"))] + sampling_offsets.push(0); + + #[cfg(feature = "chacha")] + { + use crate::storage_stage::NUM_STORAGE_SAMPLES; + use rand::{Rng, SeedableRng}; + use rand_chacha::ChaChaRng; + + let ledger_data_file = ledger_path.join(LEDGER_DATA_FILE); + let mut ivec = [0u8; 64]; + ivec.copy_from_slice(signature.as_ref()); + + let num_encrypted_bytes = + chacha_cbc_encrypt_file(&ledger_data_file, &ledger_data_file_encrypted, &mut ivec)?; + + let num_chacha_blocks = num_encrypted_bytes / CHACHA_BLOCK_SIZE; + let mut rng_seed = [0u8; 32]; + rng_seed.copy_from_slice(&signature.as_ref()[0..32]); + let mut rng = ChaChaRng::from_seed(rng_seed); + for _ in 0..NUM_STORAGE_SAMPLES { + sampling_offsets.push(rng.gen_range(0, num_chacha_blocks) as u64); + } + } + + info!("Done encrypting the ledger"); + + match sample_file(&ledger_data_file_encrypted, &sampling_offsets) { + Ok(hash) => { + let last_id = client.get_last_id(); + info!("sampled hash: {}", hash); + let mut tx = Transaction::storage_new_mining_proof( + &keypair, + hash, + last_id, + entry_height, + Signature::new(signature.as_ref()), + ); + client + .retry_transfer(&keypair, &mut tx, 10) + .expect("transfer didn't work!"); + } + Err(e) => info!("Error occurred while sampling: {:?}", e), + } + + Ok(Self { + gossip_service, + fetch_stage, + store_ledger_stage, + t_window, + retransmit_receiver, + exit, + entry_height, + }) + } + + pub fn close(self) { + self.exit.store(true, Ordering::Relaxed); + self.join() + } + + pub fn join(self) { + self.gossip_service.join().unwrap(); + self.fetch_stage.join().unwrap(); + self.t_window.join().unwrap(); + self.store_ledger_stage.join().unwrap(); + + // Drain the queue here to prevent self.retransmit_receiver from being dropped + // before the window_service thread is joined + let mut retransmit_queue_count = 0; + while let Ok(_blob) = self.retransmit_receiver.recv_timeout(Duration::new(1, 0)) { + retransmit_queue_count += 1; + } + debug!("retransmit channel count: {}", retransmit_queue_count); + } + + pub fn entry_height(&self) -> u64 { + self.entry_height + } + + fn poll_for_leader(cluster_info: &Arc>) -> NodeInfo { + loop { + if let Some(l) = cluster_info.read().unwrap().get_gossip_top_leader() { + return l.clone(); + } + + sleep(Duration::from_millis(900)); + info!("{}", cluster_info.read().unwrap().node_info_trace()); + } + } + + fn poll_for_last_id_and_entry_height(cluster_info: &Arc>) -> (String, u64) { + loop { + let rpc_client = { + let cluster_info = cluster_info.read().unwrap(); + let rpc_peers = cluster_info.rpc_peers(); + debug!("rpc peers: {:?}", rpc_peers); + let node_idx = thread_rng().gen_range(0, rpc_peers.len()); + RpcClient::new_from_socket(rpc_peers[node_idx].rpc) + }; + + let storage_last_id = RpcRequest::GetStorageMiningLastId + .make_rpc_request(&rpc_client, 2, None) + .expect("rpc request") + .to_string(); + let storage_entry_height = RpcRequest::GetStorageMiningEntryHeight + .make_rpc_request(&rpc_client, 2, None) + .expect("rpc request") + .as_u64() + .unwrap(); + if storage_entry_height != 0 { + return (storage_last_id, storage_entry_height); + } + info!("max entry_height: {}", storage_entry_height); + sleep(Duration::from_secs(3)); + } + } + + fn get_airdrop_tokens(client: &mut ThinClient, keypair: &Keypair, leader_info: &NodeInfo) { + if retry_get_balance(client, &keypair.pubkey(), None).is_none() { let mut drone_addr = leader_info.tpu; drone_addr.set_port(DRONE_PORT); @@ -239,63 +348,6 @@ impl Replicator { } }; } - - info!("Done downloading ledger at {}", ledger_path.unwrap()); - - let ledger_path = Path::new(ledger_path.unwrap()); - let ledger_data_file_encrypted = ledger_path.join(format!("{}.enc", LEDGER_DATA_FILE)); - #[cfg(feature = "chacha")] - { - let ledger_data_file = ledger_path.join(LEDGER_DATA_FILE); - let mut ivec = [0u8; CHACHA_BLOCK_SIZE]; - ivec[0..4].copy_from_slice(&[2, 3, 4, 5]); - - chacha_cbc_encrypt_file(&ledger_data_file, &ledger_data_file_encrypted, &mut ivec)?; - } - - info!("Done encrypting the ledger"); - - let sampling_offsets = [0, 1, 2, 3]; - - match sample_file(&ledger_data_file_encrypted, &sampling_offsets) { - Ok(hash) => { - let last_id = client.get_last_id(); - info!("sampled hash: {}", hash); - let tx = - Transaction::storage_new_mining_proof(&keypair, hash, last_id, entry_height); - client.transfer_signed(&tx).expect("transfer didn't work!"); - } - Err(e) => info!("Error occurred while sampling: {:?}", e), - } - - Ok(Self { - gossip_service, - fetch_stage, - store_ledger_stage, - t_window, - retransmit_receiver, - exit, - }) - } - - pub fn close(self) { - self.exit.store(true, Ordering::Relaxed); - self.join() - } - - pub fn join(self) { - self.gossip_service.join().unwrap(); - self.fetch_stage.join().unwrap(); - self.t_window.join().unwrap(); - self.store_ledger_stage.join().unwrap(); - - // Drain the queue here to prevent self.retransmit_receiver from being dropped - // before the window_service thread is joined - let mut retransmit_queue_count = 0; - while let Ok(_blob) = self.retransmit_receiver.recv_timeout(Duration::new(1, 0)) { - retransmit_queue_count += 1; - } - debug!("retransmit channel count: {}", retransmit_queue_count); } } diff --git a/src/rpc.rs b/src/rpc.rs index e00d7ef05c..0d059fdf7d 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -343,11 +343,11 @@ impl JsonRpcRequestProcessor { Ok(self.bank.transaction_count() as u64) } fn get_storage_mining_last_id(&self) -> Result { - let id = self.bank.storage_state.get_last_id(); + let id = self.bank.get_storage_last_id(); Ok(bs58::encode(id).into_string()) } fn get_storage_mining_entry_height(&self) -> Result { - let entry_height = self.bank.storage_state.get_entry_height(); + let entry_height = self.bank.get_storage_entry_height(); Ok(entry_height) } fn get_storage_pubkeys_for_entry_height(&self, entry_height: u64) -> Result> { diff --git a/src/storage_stage.rs b/src/storage_stage.rs index 6f9cab8407..7127dd224c 100644 --- a/src/storage_stage.rs +++ b/src/storage_stage.rs @@ -4,21 +4,24 @@ #[cfg(all(feature = "chacha", feature = "cuda"))] use crate::chacha_cuda::chacha_cbc_encrypt_file_many_keys; +use crate::client::mk_client; +use crate::cluster_info::ClusterInfo; use crate::entry::EntryReceiver; use crate::result::{Error, Result}; use crate::service::Service; +use crate::thin_client::ThinClient; use bincode::deserialize; use rand::{Rng, SeedableRng}; use rand_chacha::ChaChaRng; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; -use solana_sdk::signature::Keypair; use solana_sdk::signature::Signature; +use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::storage_program; -use solana_sdk::storage_program::StorageProgram; -use solana_sdk::vote_program; +use solana_sdk::storage_program::{get_segment_from_entry, StorageProgram, StorageTransaction}; +use solana_sdk::system_transaction::SystemTransaction; +use solana_sdk::transaction::Transaction; use std::collections::HashSet; -use std::mem::size_of; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::RecvTimeoutError; use std::sync::{Arc, RwLock}; @@ -34,10 +37,8 @@ type ReplicatorMap = Vec>; #[derive(Default)] pub struct StorageStateInner { storage_results: StorageResults, - storage_keys: StorageKeys, + pub storage_keys: StorageKeys, replicator_map: ReplicatorMap, - storage_last_id: Hash, - entry_height: u64, } #[derive(Default)] @@ -55,17 +56,12 @@ macro_rules! cross_boundary { }; } -const NUM_HASHES_FOR_STORAGE_ROTATE: u64 = 1024; +const NUM_HASHES_FOR_STORAGE_ROTATE: u64 = 32; // TODO: some way to dynamically size NUM_IDENTITIES const NUM_IDENTITIES: usize = 1024; -const NUM_SAMPLES: usize = 4; -pub const ENTRIES_PER_SEGMENT: u64 = 16; +pub const NUM_STORAGE_SAMPLES: usize = 4; const KEY_SIZE: usize = 64; -pub fn get_segment_from_entry(entry_height: u64) -> u64 { - entry_height / ENTRIES_PER_SEGMENT -} - fn get_identity_index_from_signature(key: &Signature) -> usize { let rkey = key.as_ref(); let mut res: usize = (rkey[0] as usize) @@ -86,8 +82,6 @@ impl StorageState { storage_keys, storage_results, replicator_map, - entry_height: 0, - storage_last_id: Hash::default(), }; StorageState { @@ -95,29 +89,17 @@ impl StorageState { } } - pub fn get_mining_key(&self, key: &Signature) -> Vec { - let idx = get_identity_index_from_signature(key); - self.state.read().unwrap().storage_keys[idx..idx + KEY_SIZE].to_vec() - } - pub fn get_mining_result(&self, key: &Signature) -> Hash { let idx = get_identity_index_from_signature(key); self.state.read().unwrap().storage_results[idx] } - pub fn get_last_id(&self) -> Hash { - self.state.read().unwrap().storage_last_id - } - - pub fn get_entry_height(&self) -> u64 { - self.state.read().unwrap().entry_height - } - pub fn get_pubkeys_for_entry_height(&self, entry_height: u64) -> Vec { // TODO: keep track of age? const MAX_PUBKEYS_TO_RETURN: usize = 5; - let index = (entry_height / ENTRIES_PER_SEGMENT) as usize; + let index = get_segment_from_entry(entry_height); let replicator_map = &self.state.read().unwrap().replicator_map; + info!("replicator_map: {:?}", replicator_map); if index < replicator_map.len() { replicator_map[index] .iter() @@ -138,17 +120,17 @@ impl StorageStage { keypair: Arc, exit: Arc, entry_height: u64, + cluster_info: &Arc>, ) -> Self { debug!("storage_stage::new: entry_height: {}", entry_height); - storage_state.state.write().unwrap().entry_height = entry_height; let storage_state_inner = storage_state.state.clone(); let ledger_path = ledger_path.map(String::from); + let cluster_info = cluster_info.clone(); let t_storage_mining_verifier = Builder::new() .name("solana-storage-mining-verify-stage".to_string()) .spawn(move || { let exit = exit.clone(); let mut poh_height = 0; - let mut current_key = 0; let mut entry_height = entry_height; loop { if let Some(ref ledger_path_str) = ledger_path { @@ -159,7 +141,7 @@ impl StorageStage { ledger_path_str, &mut poh_height, &mut entry_height, - &mut current_key, + &cluster_info, ) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, @@ -180,12 +162,53 @@ impl StorageStage { } } + fn create_storage_accounts(keypair: &Keypair, leader_client: &mut ThinClient) { + let last_id = leader_client.get_last_id(); + match leader_client.get_account_userdata(&storage_program::system_id()) { + Ok(_) => {} + Err(e) => { + debug!("error storage userdata: {:?}", e); + + let mut tx = Transaction::system_create( + keypair, + storage_program::system_id(), + last_id, + 1, + 16 * 1024, + storage_program::id(), + 1, + ); + if let Err(e) = leader_client.retry_transfer(keypair, &mut tx, 5) { + info!("Couldn't create storage account error: {}", e); + } + } + } + if leader_client + .get_account_userdata(&keypair.pubkey()) + .is_err() + { + let mut tx = Transaction::system_create( + keypair, + keypair.pubkey(), + last_id, + 1, + 0, + storage_program::id(), + 1, + ); + if let Err(e) = leader_client.retry_transfer(keypair, &mut tx, 5) { + info!("Couldn't create storage account error: {}", e); + } + } + } + pub fn process_entry_crossing( - state: &Arc>, + _state: &Arc>, keypair: &Arc, _ledger_path: &str, entry_id: Hash, entry_height: u64, + cluster_info: &Arc>, ) -> Result<()> { let mut seed = [0u8; 32]; let signature = keypair.sign(&entry_id.as_ref()); @@ -194,10 +217,33 @@ impl StorageStage { let mut rng = ChaChaRng::from_seed(seed); - state.write().unwrap().entry_height = entry_height; + if let Some(leader_data) = cluster_info.read().unwrap().leader_data() { + let mut leader_client = mk_client(leader_data); + + Self::create_storage_accounts(keypair, &mut leader_client); + + let last_id = leader_client.get_last_id(); + + debug!( + "advertising new storage last id entry_height: {}!", + entry_height + ); + let mut tx = Transaction::storage_new_advertise_last_id( + keypair, + entry_id, + last_id, + entry_height, + ); + if let Err(e) = leader_client.retry_transfer(keypair, &mut tx, 5) { + info!( + "Couldn't advertise my storage last id to the network! error: {}", + e + ); + } + } // Regenerate the answers - let num_segments = (entry_height / ENTRIES_PER_SEGMENT) as usize; + let num_segments = get_segment_from_entry(entry_height); if num_segments == 0 { info!("Ledger has 0 segments!"); return Ok(()); @@ -211,7 +257,7 @@ impl StorageStage { ); let mut samples = vec![]; - for _ in 0..NUM_SAMPLES { + for _ in 0..NUM_STORAGE_SAMPLES { samples.push(rng.gen_range(0, 10)); } debug!("generated samples: {:?}", samples); @@ -225,7 +271,7 @@ impl StorageStage { // Should be overwritten by the vote signatures which replace the // key values by the time it runs again. - let mut statew = state.write().unwrap(); + let mut statew = _state.write().unwrap(); match chacha_cbc_encrypt_file_many_keys( _ledger_path, @@ -248,6 +294,46 @@ impl StorageStage { Ok(()) } + fn process_storage_program( + instruction_idx: usize, + storage_state: &Arc>, + entry_height: u64, + tx: &Transaction, + ) { + match deserialize(&tx.instructions[instruction_idx].userdata) { + Ok(StorageProgram::SubmitMiningProof { + entry_height: proof_entry_height, + .. + }) => { + if proof_entry_height < entry_height { + let mut statew = storage_state.write().unwrap(); + let max_segment_index = get_segment_from_entry(entry_height); + if statew.replicator_map.len() < max_segment_index { + statew + .replicator_map + .resize(max_segment_index, HashSet::new()); + } + let proof_segment_index = get_segment_from_entry(proof_entry_height); + if proof_segment_index < statew.replicator_map.len() { + statew.replicator_map[proof_segment_index].insert(tx.account_keys[0]); + } + } + debug!( + "storage proof: max entry_height: {} proof height: {}", + entry_height, proof_entry_height + ); + } + Ok(StorageProgram::AdvertiseStorageLastId { id, .. }) => { + debug!("id: {}", id); + } + Ok(StorageProgram::ClaimStorageReward { .. }) => {} + Ok(StorageProgram::ProofValidation { .. }) => {} + Err(e) => { + info!("error: {:?}", e); + } + } + } + pub fn process_entries( keypair: &Arc, storage_state: &Arc>, @@ -255,54 +341,24 @@ impl StorageStage { ledger_path: &str, poh_height: &mut u64, entry_height: &mut u64, - current_key_idx: &mut usize, + cluster_info: &Arc>, ) -> Result<()> { let timeout = Duration::new(1, 0); let entries = entry_receiver.recv_timeout(timeout)?; + info!( + "processing: {} entries height: {} poh_height: {}", + entries.len(), + entry_height, + poh_height + ); for entry in entries { // Go through the transactions, find votes, and use them to update // the storage_keys with their signatures. for tx in entry.transactions { for (i, program_id) in tx.program_ids.iter().enumerate() { - if vote_program::check_id(&program_id) { - debug!( - "generating storage_keys from votes current_key_idx: {}", - *current_key_idx - ); - let storage_keys = &mut storage_state.write().unwrap().storage_keys; - storage_keys[*current_key_idx..*current_key_idx + size_of::()] - .copy_from_slice(tx.signatures[0].as_ref()); - *current_key_idx += size_of::(); - *current_key_idx %= storage_keys.len(); - } else if storage_program::check_id(&program_id) { - match deserialize(&tx.instructions[i].userdata) { - Ok(StorageProgram::SubmitMiningProof { - entry_height: proof_entry_height, - .. - }) => { - if proof_entry_height < *entry_height { - let mut statew = storage_state.write().unwrap(); - let max_segment_index = - (*entry_height / ENTRIES_PER_SEGMENT) as usize; - if statew.replicator_map.len() <= max_segment_index { - statew - .replicator_map - .resize(max_segment_index, HashSet::new()); - } - let proof_segment_index = - (proof_entry_height / ENTRIES_PER_SEGMENT) as usize; - if proof_segment_index < statew.replicator_map.len() { - statew.replicator_map[proof_segment_index] - .insert(tx.account_keys[0]); - } - } - debug!("storage proof: entry_height: {}", entry_height); - } - Err(e) => { - info!("error: {:?}", e); - } - } + if storage_program::check_id(&program_id) { + Self::process_storage_program(i, storage_state, *entry_height, &tx); } } } @@ -317,6 +373,7 @@ impl StorageStage { &ledger_path, entry.id, *entry_height, + cluster_info, )?; } *entry_height += 1; @@ -336,6 +393,8 @@ impl Service for StorageStage { #[cfg(test)] mod tests { + use crate::cluster_info::ClusterInfo; + use crate::cluster_info::NodeInfo; use crate::entry::Entry; use crate::ledger::make_tiny_test_entries; use crate::ledger::{create_tmp_sample_ledger, LedgerWriter}; @@ -347,18 +406,24 @@ mod tests { use rayon::prelude::*; use solana_sdk::hash::Hash; use solana_sdk::hash::Hasher; + use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; + use solana_sdk::storage_program::{StorageTransaction, ENTRIES_PER_SEGMENT}; use solana_sdk::transaction::Transaction; - use solana_sdk::vote_program::Vote; - use solana_sdk::vote_transaction::VoteTransaction; use std::cmp::{max, min}; use std::fs::remove_dir_all; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::mpsc::channel; - use std::sync::Arc; + use std::sync::{Arc, RwLock}; use std::thread::sleep; use std::time::Duration; + fn test_cluster_info(id: Pubkey) -> Arc> { + let node_info = NodeInfo::new_localhost(id, 0); + let cluster_info = ClusterInfo::new(node_info); + Arc::new(RwLock::new(cluster_info)) + } + #[test] fn test_storage_stage_none_ledger() { let keypair = Arc::new(Keypair::new()); @@ -366,6 +431,9 @@ mod tests { let (_storage_entry_sender, storage_entry_receiver) = channel(); let storage_state = StorageState::new(); + + let cluster_info = test_cluster_info(keypair.pubkey()); + let storage_stage = StorageStage::new( &storage_state, storage_entry_receiver, @@ -373,6 +441,7 @@ mod tests { keypair, exit.clone(), 0, + &cluster_info, ); exit.store(true, Ordering::Relaxed); storage_stage.join().unwrap(); @@ -401,6 +470,9 @@ mod tests { let (storage_entry_sender, storage_entry_receiver) = channel(); let storage_state = StorageState::new(); + + let cluster_info = test_cluster_info(keypair.pubkey()); + let storage_stage = StorageStage::new( &storage_state, storage_entry_receiver, @@ -408,6 +480,7 @@ mod tests { keypair, exit.clone(), 0, + &cluster_info, ); storage_entry_sender.send(entries.clone()).unwrap(); @@ -444,7 +517,7 @@ mod tests { } #[test] - fn test_storage_stage_process_vote_entries() { + fn test_storage_stage_process_mining_proof_entries() { solana_logger::setup(); let keypair = Arc::new(Keypair::new()); let exit = Arc::new(AtomicBool::new(false)); @@ -457,13 +530,15 @@ mod tests { 1, ); - let entries = make_tiny_test_entries(128); + let entries = make_tiny_test_entries(ENTRIES_PER_SEGMENT as usize + 2); { let mut writer = LedgerWriter::open(&ledger_path, true).unwrap(); writer.write_entries(&entries.clone()).unwrap(); // drops writer, flushes buffers } + let cluster_info = test_cluster_info(keypair.pubkey()); + let (storage_entry_sender, storage_entry_receiver) = channel(); let storage_state = StorageState::new(); let storage_stage = StorageStage::new( @@ -473,44 +548,41 @@ mod tests { keypair, exit.clone(), 0, + &cluster_info, ); storage_entry_sender.send(entries.clone()).unwrap(); - let mut reference_keys; - { - let keys = &storage_state.state.read().unwrap().storage_keys; - reference_keys = vec![0; keys.len()]; - reference_keys.copy_from_slice(keys); - } - let mut vote_txs: Vec = Vec::new(); - let vote = Vote { - tick_height: 123456, - }; let keypair = Keypair::new(); - let vote_tx = VoteTransaction::vote_new(&keypair, vote, Hash::default(), 1); - vote_txs.push(vote_tx); - let vote_entries = vec![Entry::new(&Hash::default(), 0, 1, vote_txs)]; - storage_entry_sender.send(vote_entries).unwrap(); + let last_id = Hash::default(); + let signature = Signature::new(keypair.sign(&last_id.as_ref()).as_ref()); + let storage_tx = Transaction::storage_new_mining_proof( + &keypair, + Hash::default(), + Hash::default(), + 0, + signature, + ); + let txs = vec![storage_tx]; + let storage_entries = vec![Entry::new(&Hash::default(), 0, 1, txs)]; + storage_entry_sender.send(storage_entries).unwrap(); for _ in 0..5 { - { - let keys = &storage_state.state.read().unwrap().storage_keys; - if keys[..] != *reference_keys.as_slice() { - break; - } + if storage_state.get_pubkeys_for_entry_height(0).len() != 0 { + break; } sleep(Duration::new(1, 0)); + info!("pubkeys are empty"); } - debug!("joining..?"); + info!("joining..?"); exit.store(true, Ordering::Relaxed); storage_stage.join().unwrap(); - { - let keys = &storage_state.state.read().unwrap().storage_keys; - assert_ne!(keys[..], *reference_keys); - } + assert_eq!( + storage_state.get_pubkeys_for_entry_height(0)[0], + keypair.pubkey() + ); remove_dir_all(ledger_path).unwrap(); } diff --git a/src/tvu.rs b/src/tvu.rs index 2391e105e4..d43085d6d7 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -127,6 +127,7 @@ impl Tvu { keypair, exit.clone(), entry_height, + &cluster_info, ); Tvu { diff --git a/tests/replicator.rs b/tests/replicator.rs index ed12feeb10..328959ea96 100644 --- a/tests/replicator.rs +++ b/tests/replicator.rs @@ -58,7 +58,7 @@ fn test_replicator_startup() { let validator = Fullnode::new( validator_node, &validator_ledger_path, - validator_keypair, + validator_keypair.clone(), vote_account_keypair, Some(leader_info.gossip), false, @@ -70,9 +70,17 @@ fn test_replicator_startup() { let bob = Keypair::new(); + for _ in 0..10 { + let last_id = leader_client.get_last_id(); + leader_client + .transfer(1, &mint.keypair(), bob.pubkey(), &last_id) + .unwrap(); + sleep(Duration::from_millis(200)); + } + let last_id = leader_client.get_last_id(); leader_client - .transfer(1, &mint.keypair(), bob.pubkey(), &last_id) + .transfer(10, &mint.keypair(), validator_keypair.pubkey(), &last_id) .unwrap(); let replicator_keypair = Keypair::new(); @@ -135,10 +143,14 @@ fn test_replicator_startup() { { use solana::rpc_request::{RpcClient, RpcRequest}; + debug!( + "looking for pubkeys for entry: {}", + replicator.entry_height() + ); let rpc_client = RpcClient::new_from_socket(validator_node_info.rpc); let mut non_zero_pubkeys = false; for _ in 0..30 { - let params = json!([0]); + let params = json!([replicator.entry_height()]); let pubkeys = RpcRequest::GetStoragePubkeysForEntryHeight .make_rpc_request(&rpc_client, 1, Some(params)) .unwrap();