Revert "Validators make a transaction to advertise their storage last_id"

This reverts commit a1759aed19.
This commit is contained in:
Michael Vines 2018-12-23 21:40:52 +00:00 committed by Grimes
parent 3c835b692b
commit 58f2598d5d
10 changed files with 234 additions and 907 deletions

View File

@ -3,27 +3,12 @@
//! and give reward for good proofs.
use log::*;
extern crate solana_sdk;
use solana_sdk::account::KeyedAccount;
use solana_sdk::native_program::ProgramError;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::solana_entrypoint;
use solana_sdk::storage_program::*;
pub const TOTAL_VALIDATOR_REWARDS: u64 = 1000;
pub const TOTAL_REPLICATOR_REWARDS: u64 = 1000;
fn count_valid_proofs(proofs: &[ProofStatus]) -> u64 {
let mut num = 0;
for proof in proofs {
if let ProofStatus::Valid = proof {
num += 1;
}
}
num
}
solana_entrypoint!(entrypoint);
fn entrypoint(
_program_id: &Pubkey,
@ -33,150 +18,24 @@ fn entrypoint(
) -> Result<(), ProgramError> {
solana_logger::setup();
if keyed_accounts.len() != 2 {
// keyed_accounts[1] should be the main storage key
// to access its userdata
Err(ProgramError::InvalidArgument)?;
}
// accounts_keys[0] must be signed
if keyed_accounts[0].signer_key().is_none() {
info!("account[0] is unsigned");
Err(ProgramError::GenericError)?;
}
if *keyed_accounts[1].unsigned_key() != system_id() {
info!(
"invalid account id owner: {:?} system_id: {:?}",
keyed_accounts[1].unsigned_key(),
system_id()
);
Err(ProgramError::InvalidArgument)?;
}
if let Ok(syscall) = bincode::deserialize(data) {
let mut storage_account_state = if let Ok(storage_account_state) =
bincode::deserialize(&keyed_accounts[1].account.userdata)
{
storage_account_state
} else {
StorageProgramState::default()
};
debug!(
"deserialized state height: {}",
storage_account_state.entry_height
);
match syscall {
StorageProgram::SubmitMiningProof {
sha_state,
entry_height,
signature,
} => {
let segment_index = get_segment_from_entry(entry_height);
let current_segment_index =
get_segment_from_entry(storage_account_state.entry_height);
if segment_index >= current_segment_index {
return Err(ProgramError::InvalidArgument);
}
debug!(
info!(
"Mining proof submitted with state {:?} entry_height: {}",
sha_state, entry_height
);
let proof_info = ProofInfo {
id: *keyed_accounts[0].signer_key().unwrap(),
sha_state,
signature,
};
storage_account_state.proofs[segment_index].push(proof_info);
}
StorageProgram::AdvertiseStorageLastId { id, entry_height } => {
let original_segments = storage_account_state.entry_height / ENTRIES_PER_SEGMENT;
let segments = entry_height / ENTRIES_PER_SEGMENT;
debug!(
"advertise new last id segments: {} orig: {}",
segments, original_segments
);
if segments <= original_segments {
return Err(ProgramError::InvalidArgument);
}
storage_account_state.entry_height = entry_height;
storage_account_state.id = id;
// move the proofs to previous_proofs
storage_account_state.previous_proofs = storage_account_state.proofs.clone();
storage_account_state.proofs.clear();
storage_account_state
.proofs
.resize(segments as usize, Vec::new());
// move lockout_validations to reward_validations
storage_account_state.reward_validations =
storage_account_state.lockout_validations.clone();
storage_account_state.lockout_validations.clear();
storage_account_state
.lockout_validations
.resize(segments as usize, Vec::new());
}
StorageProgram::ProofValidation {
entry_height,
proof_mask,
} => {
if entry_height >= storage_account_state.entry_height {
return Err(ProgramError::InvalidArgument);
}
let segment_index = get_segment_from_entry(entry_height);
if storage_account_state.previous_proofs[segment_index].len() != proof_mask.len() {
return Err(ProgramError::InvalidArgument);
}
// TODO: Check that each proof mask matches the signature
/*for (i, entry) in proof_mask.iter().enumerate() {
if storage_account_state.previous_proofs[segment_index][i] != signature.as_ref[0] {
return Err(ProgramError::InvalidArgument);
}
}*/
let info = ValidationInfo {
id: *keyed_accounts[0].signer_key().unwrap(),
proof_mask,
};
storage_account_state.lockout_validations[segment_index].push(info);
}
StorageProgram::ClaimStorageReward { entry_height } => {
let claims_index = get_segment_from_entry(entry_height);
let account_key = keyed_accounts[0].signer_key().unwrap();
let mut num_validations = 0;
let mut total_validations = 0;
for validation in &storage_account_state.reward_validations[claims_index] {
if *account_key == validation.id {
num_validations += count_valid_proofs(&validation.proof_mask);
} else {
total_validations += count_valid_proofs(&validation.proof_mask);
}
}
total_validations += num_validations;
if total_validations > 0 {
keyed_accounts[0].account.tokens +=
(TOTAL_VALIDATOR_REWARDS * num_validations) / total_validations;
}
}
}
keyed_accounts[1].account.userdata.clear();
if bincode::serialize_into(
&mut keyed_accounts[1].account.userdata,
&storage_account_state,
)
.is_err()
{
return Err(ProgramError::UserdataTooSmall);
}
Ok(())
} else {
info!("Invalid instruction userdata: {:?}", data);
@ -187,41 +46,8 @@ fn entrypoint(
#[cfg(test)]
mod test {
use super::*;
use solana_sdk::account::{create_keyed_accounts, Account};
use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil, Signature};
use solana_sdk::storage_program::ProofStatus;
use solana_sdk::storage_program::StorageTransaction;
use solana_sdk::transaction::{Instruction, Transaction};
fn test_transaction(
tx: &Transaction,
program_accounts: &mut [Account],
) -> Result<(), ProgramError> {
assert_eq!(tx.instructions.len(), 1);
let Instruction {
ref accounts,
ref userdata,
..
} = tx.instructions[0];
info!("accounts: {:?}", accounts);
let mut keyed_accounts: Vec<_> = accounts
.iter()
.map(|&index| {
let index = index as usize;
let key = &tx.account_keys[index];
(key, index < tx.signatures.len())
})
.zip(program_accounts.iter_mut())
.map(|((key, is_signer), account)| KeyedAccount::new(key, is_signer, account))
.collect();
let ret = entrypoint(&id(), &mut keyed_accounts, &userdata, 42);
info!("ret: {:?}", ret);
ret
}
use solana_sdk::account::create_keyed_accounts;
use solana_sdk::signature::{Keypair, KeypairUtil};
#[test]
fn test_storage_tx() {
@ -230,123 +56,4 @@ mod test {
let mut keyed_accounts = create_keyed_accounts(&mut accounts);
assert!(entrypoint(&id(), &mut keyed_accounts, &[], 42).is_err());
}
#[test]
fn test_invalid_accounts_len() {
let keypair = Keypair::new();
let mut accounts = [Default::default()];
let tx = Transaction::storage_new_mining_proof(
&keypair,
Hash::default(),
Hash::default(),
0,
Signature::default(),
);
assert!(test_transaction(&tx, &mut accounts).is_err());
let mut accounts = [Default::default(), Default::default(), Default::default()];
assert!(test_transaction(&tx, &mut accounts).is_err());
}
#[test]
fn test_submit_mining_invalid_entry_height() {
solana_logger::setup();
let keypair = Keypair::new();
let mut accounts = [Default::default(), Default::default()];
let tx = Transaction::storage_new_mining_proof(
&keypair,
Hash::default(),
Hash::default(),
0,
Signature::default(),
);
// Haven't seen a transaction to roll over the epoch, so this should fail
assert!(test_transaction(&tx, &mut accounts).is_err());
}
#[test]
fn test_submit_mining_ok() {
solana_logger::setup();
let keypair = Keypair::new();
let mut accounts = [Default::default(), Default::default()];
let tx = Transaction::storage_new_advertise_last_id(
&keypair,
Hash::default(),
Hash::default(),
ENTRIES_PER_SEGMENT,
);
assert!(test_transaction(&tx, &mut accounts).is_ok());
let tx = Transaction::storage_new_mining_proof(
&keypair,
Hash::default(),
Hash::default(),
0,
Signature::default(),
);
assert!(test_transaction(&tx, &mut accounts).is_ok());
}
#[test]
fn test_validate_mining() {
solana_logger::setup();
let keypair = Keypair::new();
let mut accounts = [Default::default(), Default::default()];
let entry_height = 0;
let tx = Transaction::storage_new_advertise_last_id(
&keypair,
Hash::default(),
Hash::default(),
ENTRIES_PER_SEGMENT,
);
assert!(test_transaction(&tx, &mut accounts).is_ok());
let tx = Transaction::storage_new_mining_proof(
&keypair,
Hash::default(),
Hash::default(),
entry_height,
Signature::default(),
);
assert!(test_transaction(&tx, &mut accounts).is_ok());
let tx = Transaction::storage_new_advertise_last_id(
&keypair,
Hash::default(),
Hash::default(),
ENTRIES_PER_SEGMENT * 2,
);
assert!(test_transaction(&tx, &mut accounts).is_ok());
let tx = Transaction::storage_new_proof_validation(
&keypair,
Hash::default(),
entry_height,
vec![ProofStatus::Valid],
);
assert!(test_transaction(&tx, &mut accounts).is_ok());
let tx = Transaction::storage_new_advertise_last_id(
&keypair,
Hash::default(),
Hash::default(),
ENTRIES_PER_SEGMENT * 3,
);
assert!(test_transaction(&tx, &mut accounts).is_ok());
let tx = Transaction::storage_new_reward_claim(&keypair, Hash::default(), entry_height);
assert!(test_transaction(&tx, &mut accounts).is_ok());
assert!(accounts[0].tokens == TOTAL_VALIDATOR_REWARDS);
}
}

View File

@ -1,65 +1,11 @@
use crate::hash::Hash;
use crate::pubkey::Pubkey;
use crate::signature::Keypair;
use crate::signature::Signature;
use crate::signature::{Keypair, KeypairUtil};
use crate::transaction::Transaction;
pub const ENTRIES_PER_SEGMENT: u64 = 16;
pub fn get_segment_from_entry(entry_height: u64) -> usize {
(entry_height / ENTRIES_PER_SEGMENT) as usize
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum ProofStatus {
Valid,
NotValid,
Skipped,
}
#[derive(Default, Debug, Serialize, Deserialize, Clone)]
pub struct ProofInfo {
pub id: Pubkey,
pub signature: Signature,
pub sha_state: Hash,
}
#[derive(Default, Debug, Serialize, Deserialize, Clone)]
pub struct ValidationInfo {
pub id: Pubkey,
pub proof_mask: Vec<ProofStatus>,
}
#[derive(Default, Debug, Serialize, Deserialize)]
pub struct StorageProgramState {
pub entry_height: u64,
pub id: Hash,
pub proofs: Vec<Vec<ProofInfo>>,
pub previous_proofs: Vec<Vec<ProofInfo>>,
pub lockout_validations: Vec<Vec<ValidationInfo>>,
pub reward_validations: Vec<Vec<ValidationInfo>>,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum StorageProgram {
SubmitMiningProof {
sha_state: Hash,
entry_height: u64,
signature: Signature,
},
AdvertiseStorageLastId {
id: Hash,
entry_height: u64,
},
ClaimStorageReward {
entry_height: u64,
},
ProofValidation {
entry_height: u64,
proof_mask: Vec<ProofStatus>,
},
SubmitMiningProof { sha_state: Hash, entry_height: u64 },
}
pub const STORAGE_PROGRAM_ID: [u8; 32] = [
@ -67,12 +13,6 @@ pub const STORAGE_PROGRAM_ID: [u8; 32] = [
0,
];
// TODO check this is available
pub const STORAGE_SYSTEM_ACCOUNT_ID: [u8; 32] = [
133, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0,
];
pub fn check_id(program_id: &Pubkey) -> bool {
program_id.as_ref() == STORAGE_PROGRAM_ID
}
@ -81,34 +21,13 @@ pub fn id() -> Pubkey {
Pubkey::new(&STORAGE_PROGRAM_ID)
}
pub fn system_id() -> Pubkey {
Pubkey::new(&STORAGE_SYSTEM_ACCOUNT_ID)
}
pub trait StorageTransaction {
fn storage_new_mining_proof(
from_keypair: &Keypair,
sha_state: Hash,
last_id: Hash,
entry_height: u64,
signature: Signature,
) -> Self;
fn storage_new_advertise_last_id(
from_keypair: &Keypair,
storage_last_id: Hash,
last_id: Hash,
entry_height: u64,
) -> Self;
fn storage_new_proof_validation(
from_keypair: &Keypair,
last_id: Hash,
entry_height: u64,
proof_mask: Vec<ProofStatus>,
) -> Self;
fn storage_new_reward_claim(from_keypair: &Keypair, last_id: Hash, entry_height: u64) -> Self;
}
impl StorageTransaction for Transaction {
@ -117,68 +36,14 @@ impl StorageTransaction for Transaction {
sha_state: Hash,
last_id: Hash,
entry_height: u64,
signature: Signature,
) -> Self {
let program = StorageProgram::SubmitMiningProof {
sha_state,
entry_height,
signature,
};
Transaction::new(
from_keypair,
&[Pubkey::new(&STORAGE_SYSTEM_ACCOUNT_ID)],
id(),
&program,
last_id,
0,
)
}
fn storage_new_advertise_last_id(
from_keypair: &Keypair,
storage_id: Hash,
last_id: Hash,
entry_height: u64,
) -> Self {
let program = StorageProgram::AdvertiseStorageLastId {
id: storage_id,
entry_height,
};
Transaction::new(
from_keypair,
&[Pubkey::new(&STORAGE_SYSTEM_ACCOUNT_ID)],
id(),
&program,
last_id,
0,
)
}
fn storage_new_proof_validation(
from_keypair: &Keypair,
last_id: Hash,
entry_height: u64,
proof_mask: Vec<ProofStatus>,
) -> Self {
let program = StorageProgram::ProofValidation {
entry_height,
proof_mask,
};
Transaction::new(
from_keypair,
&[Pubkey::new(&STORAGE_SYSTEM_ACCOUNT_ID)],
id(),
&program,
last_id,
0,
)
}
fn storage_new_reward_claim(from_keypair: &Keypair, last_id: Hash, entry_height: u64) -> Self {
let program = StorageProgram::ClaimStorageReward { entry_height };
Transaction::new(
from_keypair,
&[Pubkey::new(&STORAGE_SYSTEM_ACCOUNT_ID)],
&[from_keypair.pubkey()],
id(),
&program,
last_id,

View File

@ -284,33 +284,6 @@ impl Bank {
.get_pubkeys_for_entry_height(entry_height)
}
pub fn get_storage_entry_height(&self) -> u64 {
match self.get_account(&storage_program::system_id()) {
Some(storage_system_account) => {
let state = deserialize(&storage_system_account.userdata);
if let Ok(state) = state {
let state: storage_program::StorageProgramState = state;
return state.entry_height;
}
}
None => {
info!("error in reading entry_height");
}
}
0
}
pub fn get_storage_last_id(&self) -> Hash {
if let Some(storage_system_account) = self.get_account(&storage_program::system_id()) {
let state = deserialize(&storage_system_account.userdata);
if let Ok(state) = state {
let state: storage_program::StorageProgramState = state;
return state.id;
}
}
Hash::default()
}
/// Forget all signatures. Useful for benchmarking.
pub fn clear_signatures(&self) {
self.last_ids.write().unwrap().clear_signatures();
@ -1036,7 +1009,6 @@ mod tests {
use solana_sdk::native_program::ProgramError;
use solana_sdk::signature::Keypair;
use solana_sdk::signature::KeypairUtil;
use solana_sdk::storage_program::{StorageTransaction, ENTRIES_PER_SEGMENT};
use solana_sdk::system_transaction::SystemTransaction;
use solana_sdk::transaction::Instruction;
use std;
@ -1762,10 +1734,6 @@ mod tests {
132, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0,
]);
let storage_system = Pubkey::new(&[
133, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0,
]);
assert_eq!(system_program::id(), system);
assert_eq!(solana_native_loader::id(), native);
@ -1774,7 +1742,6 @@ mod tests {
assert_eq!(storage_program::id(), storage);
assert_eq!(token_program::id(), token);
assert_eq!(vote_program::id(), vote);
assert_eq!(storage_program::system_id(), storage_system);
}
#[test]
@ -1788,7 +1755,6 @@ mod tests {
storage_program::id(),
token_program::id(),
vote_program::id(),
storage_program::system_id(),
];
assert!(ids.into_iter().all(move |id| unique.insert(id)));
}
@ -1963,67 +1929,4 @@ mod tests {
bank.rollback();
}
#[test]
fn test_bank_storage() {
solana_logger::setup();
let alice = Mint::new(1000);
let bank = Bank::new(&alice);
let bob = Keypair::new();
let jack = Keypair::new();
let jill = Keypair::new();
let x = 42;
let last_id = hash(&[x]);
let x2 = x * 2;
let storage_last_id = hash(&[x2]);
bank.register_tick(&last_id);
bank.transfer(10, &alice.keypair(), jill.pubkey(), last_id)
.unwrap();
let tx = Transaction::system_create(
&jill,
storage_program::system_id(),
bank.last_id(),
1,
4096,
storage_program::id(),
1,
);
assert!(bank.process_transaction(&tx).is_ok());
bank.transfer(10, &alice.keypair(), bob.pubkey(), last_id)
.unwrap();
bank.transfer(10, &alice.keypair(), jack.pubkey(), last_id)
.unwrap();
let tx = Transaction::storage_new_advertise_last_id(
&bob,
storage_last_id,
last_id,
ENTRIES_PER_SEGMENT,
);
assert!(bank.process_transaction(&tx).is_ok());
let entry_height = 0;
let tx = Transaction::storage_new_mining_proof(
&jack,
Hash::default(),
last_id,
entry_height,
Signature::default(),
);
assert!(bank.process_transaction(&tx).is_ok());
assert_eq!(bank.get_storage_entry_height(), ENTRIES_PER_SEGMENT);
assert_eq!(bank.get_storage_last_id(), storage_last_id);
assert_eq!(bank.get_pubkeys_for_entry_height(0), vec![]);
}
}

View File

@ -35,34 +35,26 @@ pub fn chacha_cbc_encrypt_file(
in_path: &Path,
out_path: &Path,
ivec: &mut [u8; CHACHA_BLOCK_SIZE],
) -> io::Result<usize> {
) -> io::Result<()> {
let mut in_file = BufReader::new(File::open(in_path).expect("Can't open ledger data file"));
let mut out_file =
BufWriter::new(File::create(out_path).expect("Can't open ledger encrypted data file"));
const BUFFER_SIZE: usize = 4 * 1024;
let mut buffer = [0; BUFFER_SIZE];
let mut encrypted_buffer = [0; BUFFER_SIZE];
let mut buffer = [0; 4 * 1024];
let mut encrypted_buffer = [0; 4 * 1024];
let key = [0; CHACHA_KEY_SIZE];
let mut total_size = 0;
while let Ok(mut size) = in_file.read(&mut buffer) {
while let Ok(size) = in_file.read(&mut buffer) {
debug!("read {} bytes", size);
if size == 0 {
break;
}
if size < BUFFER_SIZE {
// We are on the last block, round to the nearest key_size
// boundary
size = (size + CHACHA_KEY_SIZE - 1) & !(CHACHA_KEY_SIZE - 1);
}
total_size += size;
chacha_cbc_encrypt(&buffer[..size], &mut encrypted_buffer[..size], &key, ivec);
if let Err(res) = out_file.write(&encrypted_buffer[..size]) {
println!("Error writing file! {:?}", res);
return Err(res);
}
}
Ok(total_size)
Ok(())
}
#[cfg(test)]
@ -92,10 +84,7 @@ mod tests {
let size = out_file.read_to_end(&mut buf).unwrap();
assert_eq!(
buf[..size],
[
66, 54, 56, 212, 142, 110, 105, 158, 116, 82, 120, 53, 199, 78, 76, 95, 204, 148,
226, 94, 150, 182, 82, 197, 248, 146, 26, 24, 247, 117, 120, 197
]
[66, 54, 56, 212, 142, 110, 105, 158, 116, 82, 120, 53]
);
remove_file(in_path).unwrap();
remove_file(out_path).unwrap();

View File

@ -7,7 +7,7 @@ use solana_sdk::hash::Hash;
use std::io;
use std::mem::size_of;
use solana_sdk::storage_program::ENTRIES_PER_SEGMENT;
use crate::storage_stage::ENTRIES_PER_SEGMENT;
// Encrypt a file with multiple starting IV states, determined by ivecs.len()
//
@ -15,7 +15,7 @@ use solana_sdk::storage_program::ENTRIES_PER_SEGMENT;
// and return the vec of sha states
pub fn chacha_cbc_encrypt_file_many_keys(
in_path: &str,
segment: u64,
slice: u64,
ivecs: &mut [u8],
samples: &[u64],
) -> io::Result<Vec<Hash>> {
@ -36,7 +36,7 @@ pub fn chacha_cbc_encrypt_file_many_keys(
let mut sha_states = vec![0; num_keys * size_of::<Hash>()];
let mut int_sha_states = vec![0; num_keys * 112];
let keys: Vec<u8> = vec![0; num_keys * CHACHA_KEY_SIZE]; // keys not used ATM, uniqueness comes from IV
let mut entry = segment;
let mut entry = slice;
let mut total_entries = 0;
let mut total_entry_len = 0;
let mut time: f32 = 0.0;
@ -51,8 +51,8 @@ pub fn chacha_cbc_encrypt_file_many_keys(
) {
Ok((num_entries, entry_len)) => {
info!(
"encrypting segment: {} num_entries: {} entry_len: {}",
segment, num_entries, entry_len
"encrypting slice: {} num_entries: {} entry_len: {}",
slice, num_entries, entry_len
);
let entry_len_usz = entry_len as usize;
unsafe {
@ -74,10 +74,10 @@ pub fn chacha_cbc_encrypt_file_many_keys(
total_entries += num_entries;
entry += num_entries;
debug!(
"total entries: {} entry: {} segment: {} entries_per_segment: {}",
total_entries, entry, segment, ENTRIES_PER_SEGMENT
"total entries: {} entry: {} slice: {} entries_per_slice: {}",
total_entries, entry, slice, ENTRIES_PER_SEGMENT
);
if (entry - segment) >= ENTRIES_PER_SEGMENT {
if (entry - slice) >= ENTRIES_PER_SEGMENT {
break;
}
}

View File

@ -10,17 +10,17 @@ use crate::ledger::LEDGER_DATA_FILE;
use crate::result::Result;
use crate::rpc_request::{RpcClient, RpcRequest};
use crate::service::Service;
use crate::storage_stage::ENTRIES_PER_SEGMENT;
use crate::store_ledger_stage::StoreLedgerStage;
use crate::streamer::BlobReceiver;
use crate::thin_client::{retry_get_balance, ThinClient};
use crate::thin_client::retry_get_balance;
use crate::window_service::window_service;
use rand::thread_rng;
use rand::Rng;
use solana_drone::drone::{request_airdrop_transaction, DRONE_PORT};
use solana_sdk::hash::{Hash, Hasher};
use solana_sdk::signature::{Keypair, KeypairUtil, Signature};
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::storage_program::StorageTransaction;
use solana_sdk::storage_program::{get_segment_from_entry, ENTRIES_PER_SEGMENT};
use solana_sdk::transaction::Transaction;
use std::fs::File;
use std::io;
@ -46,7 +46,6 @@ pub struct Replicator {
t_window: JoinHandle<()>,
pub retransmit_receiver: BlobReceiver,
exit: Arc<AtomicBool>,
entry_height: u64,
}
pub fn sample_file(in_path: &Path, sample_offsets: &[u64]) -> io::Result<Hash> {
@ -84,23 +83,6 @@ pub fn sample_file(in_path: &Path, sample_offsets: &[u64]) -> io::Result<Hash> {
Ok(hasher.result())
}
fn get_entry_heights_from_last_id(
signature: &ring::signature::Signature,
storage_entry_height: u64,
) -> (u64, u64) {
let signature_vec = signature.as_ref();
let mut segment_index = u64::from(signature_vec[0])
| (u64::from(signature_vec[1]) << 8)
| (u64::from(signature_vec[1]) << 16)
| (u64::from(signature_vec[2]) << 24);
let max_segment_index = get_segment_from_entry(storage_entry_height);
segment_index %= max_segment_index as u64;
let entry_height = segment_index * ENTRIES_PER_SEGMENT;
let max_entry_height = entry_height + ENTRIES_PER_SEGMENT;
(entry_height, max_entry_height)
}
impl Replicator {
#[allow(clippy::new_ret_no_self)]
pub fn new(
@ -144,18 +126,53 @@ impl Replicator {
);
info!("polling for leader");
let leader = Self::poll_for_leader(&cluster_info);
let leader;
loop {
if let Some(l) = cluster_info.read().unwrap().get_gossip_top_leader() {
leader = l.clone();
break;
}
sleep(Duration::from_millis(900));
info!("{}", cluster_info.read().unwrap().node_info_trace());
}
info!("Got leader: {:?}", leader);
let (storage_last_id, storage_entry_height) =
Self::poll_for_last_id_and_entry_height(&cluster_info);
let mut storage_last_id;
let mut storage_entry_height;
loop {
let rpc_client = {
let cluster_info = cluster_info.read().unwrap();
let rpc_peers = cluster_info.rpc_peers();
info!("rpc peers: {:?}", rpc_peers);
let node_idx = thread_rng().gen_range(0, rpc_peers.len());
RpcClient::new_from_socket(rpc_peers[node_idx].rpc)
};
storage_last_id = RpcRequest::GetStorageMiningLastId
.make_rpc_request(&rpc_client, 2, None)
.expect("rpc request")
.to_string();
storage_entry_height = RpcRequest::GetStorageMiningEntryHeight
.make_rpc_request(&rpc_client, 2, None)
.expect("rpc request")
.as_u64()
.unwrap();
if storage_entry_height != 0 {
break;
}
}
let signature = keypair.sign(storage_last_id.as_ref());
let (entry_height, max_entry_height) =
get_entry_heights_from_last_id(&signature, storage_entry_height);
info!("replicating entry_height: {}", entry_height);
let signature = signature.as_ref();
let block_index = u64::from(signature[0])
| (u64::from(signature[1]) << 8)
| (u64::from(signature[1]) << 16)
| (u64::from(signature[2]) << 24);
let mut entry_height = block_index * ENTRIES_PER_SEGMENT;
entry_height %= storage_entry_height;
let max_entry_height = entry_height + ENTRIES_PER_SEGMENT;
let repair_socket = Arc::new(node.sockets.repair);
let mut blob_sockets: Vec<Arc<UdpSocket>> =
@ -197,133 +214,7 @@ impl Replicator {
let mut client = mk_client(&leader);
Self::get_airdrop_tokens(&mut client, keypair, &leader_info);
info!("Done downloading ledger at {}", ledger_path.unwrap());
let ledger_path = Path::new(ledger_path.unwrap());
let ledger_data_file_encrypted = ledger_path.join(format!("{}.enc", LEDGER_DATA_FILE));
let mut sampling_offsets = Vec::new();
#[cfg(not(feature = "chacha"))]
sampling_offsets.push(0);
#[cfg(feature = "chacha")]
{
use crate::storage_stage::NUM_STORAGE_SAMPLES;
use rand::{Rng, SeedableRng};
use rand_chacha::ChaChaRng;
let ledger_data_file = ledger_path.join(LEDGER_DATA_FILE);
let mut ivec = [0u8; 64];
ivec.copy_from_slice(signature.as_ref());
let num_encrypted_bytes =
chacha_cbc_encrypt_file(&ledger_data_file, &ledger_data_file_encrypted, &mut ivec)?;
let num_chacha_blocks = num_encrypted_bytes / CHACHA_BLOCK_SIZE;
let mut rng_seed = [0u8; 32];
rng_seed.copy_from_slice(&signature.as_ref()[0..32]);
let mut rng = ChaChaRng::from_seed(rng_seed);
for _ in 0..NUM_STORAGE_SAMPLES {
sampling_offsets.push(rng.gen_range(0, num_chacha_blocks) as u64);
}
}
info!("Done encrypting the ledger");
match sample_file(&ledger_data_file_encrypted, &sampling_offsets) {
Ok(hash) => {
let last_id = client.get_last_id();
info!("sampled hash: {}", hash);
let mut tx = Transaction::storage_new_mining_proof(
&keypair,
hash,
last_id,
entry_height,
Signature::new(signature.as_ref()),
);
client
.retry_transfer(&keypair, &mut tx, 10)
.expect("transfer didn't work!");
}
Err(e) => info!("Error occurred while sampling: {:?}", e),
}
Ok(Self {
gossip_service,
fetch_stage,
store_ledger_stage,
t_window,
retransmit_receiver,
exit,
entry_height,
})
}
pub fn close(self) {
self.exit.store(true, Ordering::Relaxed);
self.join()
}
pub fn join(self) {
self.gossip_service.join().unwrap();
self.fetch_stage.join().unwrap();
self.t_window.join().unwrap();
self.store_ledger_stage.join().unwrap();
// Drain the queue here to prevent self.retransmit_receiver from being dropped
// before the window_service thread is joined
let mut retransmit_queue_count = 0;
while let Ok(_blob) = self.retransmit_receiver.recv_timeout(Duration::new(1, 0)) {
retransmit_queue_count += 1;
}
debug!("retransmit channel count: {}", retransmit_queue_count);
}
pub fn entry_height(&self) -> u64 {
self.entry_height
}
fn poll_for_leader(cluster_info: &Arc<RwLock<ClusterInfo>>) -> NodeInfo {
loop {
if let Some(l) = cluster_info.read().unwrap().get_gossip_top_leader() {
return l.clone();
}
sleep(Duration::from_millis(900));
info!("{}", cluster_info.read().unwrap().node_info_trace());
}
}
fn poll_for_last_id_and_entry_height(cluster_info: &Arc<RwLock<ClusterInfo>>) -> (String, u64) {
loop {
let rpc_client = {
let cluster_info = cluster_info.read().unwrap();
let rpc_peers = cluster_info.rpc_peers();
debug!("rpc peers: {:?}", rpc_peers);
let node_idx = thread_rng().gen_range(0, rpc_peers.len());
RpcClient::new_from_socket(rpc_peers[node_idx].rpc)
};
let storage_last_id = RpcRequest::GetStorageMiningLastId
.make_rpc_request(&rpc_client, 2, None)
.expect("rpc request")
.to_string();
let storage_entry_height = RpcRequest::GetStorageMiningEntryHeight
.make_rpc_request(&rpc_client, 2, None)
.expect("rpc request")
.as_u64()
.unwrap();
if storage_entry_height != 0 {
return (storage_last_id, storage_entry_height);
}
info!("max entry_height: {}", storage_entry_height);
sleep(Duration::from_secs(3));
}
}
fn get_airdrop_tokens(client: &mut ThinClient, keypair: &Keypair, leader_info: &NodeInfo) {
if retry_get_balance(client, &keypair.pubkey(), None).is_none() {
if retry_get_balance(&mut client, &keypair.pubkey(), None).is_none() {
let mut drone_addr = leader_info.tpu;
drone_addr.set_port(DRONE_PORT);
@ -348,6 +239,63 @@ impl Replicator {
}
};
}
info!("Done downloading ledger at {}", ledger_path.unwrap());
let ledger_path = Path::new(ledger_path.unwrap());
let ledger_data_file_encrypted = ledger_path.join(format!("{}.enc", LEDGER_DATA_FILE));
#[cfg(feature = "chacha")]
{
let ledger_data_file = ledger_path.join(LEDGER_DATA_FILE);
let mut ivec = [0u8; CHACHA_BLOCK_SIZE];
ivec[0..4].copy_from_slice(&[2, 3, 4, 5]);
chacha_cbc_encrypt_file(&ledger_data_file, &ledger_data_file_encrypted, &mut ivec)?;
}
info!("Done encrypting the ledger");
let sampling_offsets = [0, 1, 2, 3];
match sample_file(&ledger_data_file_encrypted, &sampling_offsets) {
Ok(hash) => {
let last_id = client.get_last_id();
info!("sampled hash: {}", hash);
let tx =
Transaction::storage_new_mining_proof(&keypair, hash, last_id, entry_height);
client.transfer_signed(&tx).expect("transfer didn't work!");
}
Err(e) => info!("Error occurred while sampling: {:?}", e),
}
Ok(Self {
gossip_service,
fetch_stage,
store_ledger_stage,
t_window,
retransmit_receiver,
exit,
})
}
pub fn close(self) {
self.exit.store(true, Ordering::Relaxed);
self.join()
}
pub fn join(self) {
self.gossip_service.join().unwrap();
self.fetch_stage.join().unwrap();
self.t_window.join().unwrap();
self.store_ledger_stage.join().unwrap();
// Drain the queue here to prevent self.retransmit_receiver from being dropped
// before the window_service thread is joined
let mut retransmit_queue_count = 0;
while let Ok(_blob) = self.retransmit_receiver.recv_timeout(Duration::new(1, 0)) {
retransmit_queue_count += 1;
}
debug!("retransmit channel count: {}", retransmit_queue_count);
}
}

View File

@ -343,11 +343,11 @@ impl JsonRpcRequestProcessor {
Ok(self.bank.transaction_count() as u64)
}
fn get_storage_mining_last_id(&self) -> Result<String> {
let id = self.bank.get_storage_last_id();
let id = self.bank.storage_state.get_last_id();
Ok(bs58::encode(id).into_string())
}
fn get_storage_mining_entry_height(&self) -> Result<u64> {
let entry_height = self.bank.get_storage_entry_height();
let entry_height = self.bank.storage_state.get_entry_height();
Ok(entry_height)
}
fn get_storage_pubkeys_for_entry_height(&self, entry_height: u64) -> Result<Vec<Pubkey>> {

View File

@ -4,24 +4,21 @@
#[cfg(all(feature = "chacha", feature = "cuda"))]
use crate::chacha_cuda::chacha_cbc_encrypt_file_many_keys;
use crate::client::mk_client;
use crate::cluster_info::ClusterInfo;
use crate::entry::EntryReceiver;
use crate::result::{Error, Result};
use crate::service::Service;
use crate::thin_client::ThinClient;
use bincode::deserialize;
use rand::{Rng, SeedableRng};
use rand_chacha::ChaChaRng;
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Keypair;
use solana_sdk::signature::Signature;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::storage_program;
use solana_sdk::storage_program::{get_segment_from_entry, StorageProgram, StorageTransaction};
use solana_sdk::system_transaction::SystemTransaction;
use solana_sdk::transaction::Transaction;
use solana_sdk::storage_program::StorageProgram;
use solana_sdk::vote_program;
use std::collections::HashSet;
use std::mem::size_of;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::RecvTimeoutError;
use std::sync::{Arc, RwLock};
@ -37,8 +34,10 @@ type ReplicatorMap = Vec<HashSet<Pubkey>>;
#[derive(Default)]
pub struct StorageStateInner {
storage_results: StorageResults,
pub storage_keys: StorageKeys,
storage_keys: StorageKeys,
replicator_map: ReplicatorMap,
storage_last_id: Hash,
entry_height: u64,
}
#[derive(Default)]
@ -56,12 +55,17 @@ macro_rules! cross_boundary {
};
}
const NUM_HASHES_FOR_STORAGE_ROTATE: u64 = 32;
const NUM_HASHES_FOR_STORAGE_ROTATE: u64 = 1024;
// TODO: some way to dynamically size NUM_IDENTITIES
const NUM_IDENTITIES: usize = 1024;
pub const NUM_STORAGE_SAMPLES: usize = 4;
const NUM_SAMPLES: usize = 4;
pub const ENTRIES_PER_SEGMENT: u64 = 16;
const KEY_SIZE: usize = 64;
pub fn get_segment_from_entry(entry_height: u64) -> u64 {
entry_height / ENTRIES_PER_SEGMENT
}
fn get_identity_index_from_signature(key: &Signature) -> usize {
let rkey = key.as_ref();
let mut res: usize = (rkey[0] as usize)
@ -82,6 +86,8 @@ impl StorageState {
storage_keys,
storage_results,
replicator_map,
entry_height: 0,
storage_last_id: Hash::default(),
};
StorageState {
@ -89,17 +95,29 @@ impl StorageState {
}
}
pub fn get_mining_key(&self, key: &Signature) -> Vec<u8> {
let idx = get_identity_index_from_signature(key);
self.state.read().unwrap().storage_keys[idx..idx + KEY_SIZE].to_vec()
}
pub fn get_mining_result(&self, key: &Signature) -> Hash {
let idx = get_identity_index_from_signature(key);
self.state.read().unwrap().storage_results[idx]
}
pub fn get_last_id(&self) -> Hash {
self.state.read().unwrap().storage_last_id
}
pub fn get_entry_height(&self) -> u64 {
self.state.read().unwrap().entry_height
}
pub fn get_pubkeys_for_entry_height(&self, entry_height: u64) -> Vec<Pubkey> {
// TODO: keep track of age?
const MAX_PUBKEYS_TO_RETURN: usize = 5;
let index = get_segment_from_entry(entry_height);
let index = (entry_height / ENTRIES_PER_SEGMENT) as usize;
let replicator_map = &self.state.read().unwrap().replicator_map;
info!("replicator_map: {:?}", replicator_map);
if index < replicator_map.len() {
replicator_map[index]
.iter()
@ -120,17 +138,17 @@ impl StorageStage {
keypair: Arc<Keypair>,
exit: Arc<AtomicBool>,
entry_height: u64,
cluster_info: &Arc<RwLock<ClusterInfo>>,
) -> Self {
debug!("storage_stage::new: entry_height: {}", entry_height);
storage_state.state.write().unwrap().entry_height = entry_height;
let storage_state_inner = storage_state.state.clone();
let ledger_path = ledger_path.map(String::from);
let cluster_info = cluster_info.clone();
let t_storage_mining_verifier = Builder::new()
.name("solana-storage-mining-verify-stage".to_string())
.spawn(move || {
let exit = exit.clone();
let mut poh_height = 0;
let mut current_key = 0;
let mut entry_height = entry_height;
loop {
if let Some(ref ledger_path_str) = ledger_path {
@ -141,7 +159,7 @@ impl StorageStage {
ledger_path_str,
&mut poh_height,
&mut entry_height,
&cluster_info,
&mut current_key,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
@ -162,53 +180,12 @@ impl StorageStage {
}
}
fn create_storage_accounts(keypair: &Keypair, leader_client: &mut ThinClient) {
let last_id = leader_client.get_last_id();
match leader_client.get_account_userdata(&storage_program::system_id()) {
Ok(_) => {}
Err(e) => {
debug!("error storage userdata: {:?}", e);
let mut tx = Transaction::system_create(
keypair,
storage_program::system_id(),
last_id,
1,
16 * 1024,
storage_program::id(),
1,
);
if let Err(e) = leader_client.retry_transfer(keypair, &mut tx, 5) {
info!("Couldn't create storage account error: {}", e);
}
}
}
if leader_client
.get_account_userdata(&keypair.pubkey())
.is_err()
{
let mut tx = Transaction::system_create(
keypair,
keypair.pubkey(),
last_id,
1,
0,
storage_program::id(),
1,
);
if let Err(e) = leader_client.retry_transfer(keypair, &mut tx, 5) {
info!("Couldn't create storage account error: {}", e);
}
}
}
pub fn process_entry_crossing(
_state: &Arc<RwLock<StorageStateInner>>,
state: &Arc<RwLock<StorageStateInner>>,
keypair: &Arc<Keypair>,
_ledger_path: &str,
entry_id: Hash,
entry_height: u64,
cluster_info: &Arc<RwLock<ClusterInfo>>,
) -> Result<()> {
let mut seed = [0u8; 32];
let signature = keypair.sign(&entry_id.as_ref());
@ -217,33 +194,10 @@ impl StorageStage {
let mut rng = ChaChaRng::from_seed(seed);
if let Some(leader_data) = cluster_info.read().unwrap().leader_data() {
let mut leader_client = mk_client(leader_data);
Self::create_storage_accounts(keypair, &mut leader_client);
let last_id = leader_client.get_last_id();
debug!(
"advertising new storage last id entry_height: {}!",
entry_height
);
let mut tx = Transaction::storage_new_advertise_last_id(
keypair,
entry_id,
last_id,
entry_height,
);
if let Err(e) = leader_client.retry_transfer(keypair, &mut tx, 5) {
info!(
"Couldn't advertise my storage last id to the network! error: {}",
e
);
}
}
state.write().unwrap().entry_height = entry_height;
// Regenerate the answers
let num_segments = get_segment_from_entry(entry_height);
let num_segments = (entry_height / ENTRIES_PER_SEGMENT) as usize;
if num_segments == 0 {
info!("Ledger has 0 segments!");
return Ok(());
@ -257,7 +211,7 @@ impl StorageStage {
);
let mut samples = vec![];
for _ in 0..NUM_STORAGE_SAMPLES {
for _ in 0..NUM_SAMPLES {
samples.push(rng.gen_range(0, 10));
}
debug!("generated samples: {:?}", samples);
@ -271,7 +225,7 @@ impl StorageStage {
// Should be overwritten by the vote signatures which replace the
// key values by the time it runs again.
let mut statew = _state.write().unwrap();
let mut statew = state.write().unwrap();
match chacha_cbc_encrypt_file_many_keys(
_ledger_path,
@ -294,46 +248,6 @@ impl StorageStage {
Ok(())
}
fn process_storage_program(
instruction_idx: usize,
storage_state: &Arc<RwLock<StorageStateInner>>,
entry_height: u64,
tx: &Transaction,
) {
match deserialize(&tx.instructions[instruction_idx].userdata) {
Ok(StorageProgram::SubmitMiningProof {
entry_height: proof_entry_height,
..
}) => {
if proof_entry_height < entry_height {
let mut statew = storage_state.write().unwrap();
let max_segment_index = get_segment_from_entry(entry_height);
if statew.replicator_map.len() < max_segment_index {
statew
.replicator_map
.resize(max_segment_index, HashSet::new());
}
let proof_segment_index = get_segment_from_entry(proof_entry_height);
if proof_segment_index < statew.replicator_map.len() {
statew.replicator_map[proof_segment_index].insert(tx.account_keys[0]);
}
}
debug!(
"storage proof: max entry_height: {} proof height: {}",
entry_height, proof_entry_height
);
}
Ok(StorageProgram::AdvertiseStorageLastId { id, .. }) => {
debug!("id: {}", id);
}
Ok(StorageProgram::ClaimStorageReward { .. }) => {}
Ok(StorageProgram::ProofValidation { .. }) => {}
Err(e) => {
info!("error: {:?}", e);
}
}
}
pub fn process_entries(
keypair: &Arc<Keypair>,
storage_state: &Arc<RwLock<StorageStateInner>>,
@ -341,24 +255,54 @@ impl StorageStage {
ledger_path: &str,
poh_height: &mut u64,
entry_height: &mut u64,
cluster_info: &Arc<RwLock<ClusterInfo>>,
current_key_idx: &mut usize,
) -> Result<()> {
let timeout = Duration::new(1, 0);
let entries = entry_receiver.recv_timeout(timeout)?;
info!(
"processing: {} entries height: {} poh_height: {}",
entries.len(),
entry_height,
poh_height
);
for entry in entries {
// Go through the transactions, find votes, and use them to update
// the storage_keys with their signatures.
for tx in entry.transactions {
for (i, program_id) in tx.program_ids.iter().enumerate() {
if storage_program::check_id(&program_id) {
Self::process_storage_program(i, storage_state, *entry_height, &tx);
if vote_program::check_id(&program_id) {
debug!(
"generating storage_keys from votes current_key_idx: {}",
*current_key_idx
);
let storage_keys = &mut storage_state.write().unwrap().storage_keys;
storage_keys[*current_key_idx..*current_key_idx + size_of::<Signature>()]
.copy_from_slice(tx.signatures[0].as_ref());
*current_key_idx += size_of::<Signature>();
*current_key_idx %= storage_keys.len();
} else if storage_program::check_id(&program_id) {
match deserialize(&tx.instructions[i].userdata) {
Ok(StorageProgram::SubmitMiningProof {
entry_height: proof_entry_height,
..
}) => {
if proof_entry_height < *entry_height {
let mut statew = storage_state.write().unwrap();
let max_segment_index =
(*entry_height / ENTRIES_PER_SEGMENT) as usize;
if statew.replicator_map.len() <= max_segment_index {
statew
.replicator_map
.resize(max_segment_index, HashSet::new());
}
let proof_segment_index =
(proof_entry_height / ENTRIES_PER_SEGMENT) as usize;
if proof_segment_index < statew.replicator_map.len() {
statew.replicator_map[proof_segment_index]
.insert(tx.account_keys[0]);
}
}
debug!("storage proof: entry_height: {}", entry_height);
}
Err(e) => {
info!("error: {:?}", e);
}
}
}
}
}
@ -373,7 +317,6 @@ impl StorageStage {
&ledger_path,
entry.id,
*entry_height,
cluster_info,
)?;
}
*entry_height += 1;
@ -393,8 +336,6 @@ impl Service for StorageStage {
#[cfg(test)]
mod tests {
use crate::cluster_info::ClusterInfo;
use crate::cluster_info::NodeInfo;
use crate::entry::Entry;
use crate::ledger::make_tiny_test_entries;
use crate::ledger::{create_tmp_sample_ledger, LedgerWriter};
@ -406,24 +347,18 @@ mod tests {
use rayon::prelude::*;
use solana_sdk::hash::Hash;
use solana_sdk::hash::Hasher;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil, Signature};
use solana_sdk::storage_program::{StorageTransaction, ENTRIES_PER_SEGMENT};
use solana_sdk::transaction::Transaction;
use solana_sdk::vote_program::Vote;
use solana_sdk::vote_transaction::VoteTransaction;
use std::cmp::{max, min};
use std::fs::remove_dir_all;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;
fn test_cluster_info(id: Pubkey) -> Arc<RwLock<ClusterInfo>> {
let node_info = NodeInfo::new_localhost(id, 0);
let cluster_info = ClusterInfo::new(node_info);
Arc::new(RwLock::new(cluster_info))
}
#[test]
fn test_storage_stage_none_ledger() {
let keypair = Arc::new(Keypair::new());
@ -431,9 +366,6 @@ mod tests {
let (_storage_entry_sender, storage_entry_receiver) = channel();
let storage_state = StorageState::new();
let cluster_info = test_cluster_info(keypair.pubkey());
let storage_stage = StorageStage::new(
&storage_state,
storage_entry_receiver,
@ -441,7 +373,6 @@ mod tests {
keypair,
exit.clone(),
0,
&cluster_info,
);
exit.store(true, Ordering::Relaxed);
storage_stage.join().unwrap();
@ -470,9 +401,6 @@ mod tests {
let (storage_entry_sender, storage_entry_receiver) = channel();
let storage_state = StorageState::new();
let cluster_info = test_cluster_info(keypair.pubkey());
let storage_stage = StorageStage::new(
&storage_state,
storage_entry_receiver,
@ -480,7 +408,6 @@ mod tests {
keypair,
exit.clone(),
0,
&cluster_info,
);
storage_entry_sender.send(entries.clone()).unwrap();
@ -517,7 +444,7 @@ mod tests {
}
#[test]
fn test_storage_stage_process_mining_proof_entries() {
fn test_storage_stage_process_vote_entries() {
solana_logger::setup();
let keypair = Arc::new(Keypair::new());
let exit = Arc::new(AtomicBool::new(false));
@ -530,15 +457,13 @@ mod tests {
1,
);
let entries = make_tiny_test_entries(ENTRIES_PER_SEGMENT as usize + 2);
let entries = make_tiny_test_entries(128);
{
let mut writer = LedgerWriter::open(&ledger_path, true).unwrap();
writer.write_entries(&entries.clone()).unwrap();
// drops writer, flushes buffers
}
let cluster_info = test_cluster_info(keypair.pubkey());
let (storage_entry_sender, storage_entry_receiver) = channel();
let storage_state = StorageState::new();
let storage_stage = StorageStage::new(
@ -548,41 +473,44 @@ mod tests {
keypair,
exit.clone(),
0,
&cluster_info,
);
storage_entry_sender.send(entries.clone()).unwrap();
let mut reference_keys;
{
let keys = &storage_state.state.read().unwrap().storage_keys;
reference_keys = vec![0; keys.len()];
reference_keys.copy_from_slice(keys);
}
let mut vote_txs: Vec<Transaction> = Vec::new();
let vote = Vote {
tick_height: 123456,
};
let keypair = Keypair::new();
let last_id = Hash::default();
let signature = Signature::new(keypair.sign(&last_id.as_ref()).as_ref());
let storage_tx = Transaction::storage_new_mining_proof(
&keypair,
Hash::default(),
Hash::default(),
0,
signature,
);
let txs = vec![storage_tx];
let storage_entries = vec![Entry::new(&Hash::default(), 0, 1, txs)];
storage_entry_sender.send(storage_entries).unwrap();
let vote_tx = VoteTransaction::vote_new(&keypair, vote, Hash::default(), 1);
vote_txs.push(vote_tx);
let vote_entries = vec![Entry::new(&Hash::default(), 0, 1, vote_txs)];
storage_entry_sender.send(vote_entries).unwrap();
for _ in 0..5 {
if storage_state.get_pubkeys_for_entry_height(0).len() != 0 {
break;
{
let keys = &storage_state.state.read().unwrap().storage_keys;
if keys[..] != *reference_keys.as_slice() {
break;
}
}
sleep(Duration::new(1, 0));
info!("pubkeys are empty");
}
info!("joining..?");
debug!("joining..?");
exit.store(true, Ordering::Relaxed);
storage_stage.join().unwrap();
assert_eq!(
storage_state.get_pubkeys_for_entry_height(0)[0],
keypair.pubkey()
);
{
let keys = &storage_state.state.read().unwrap().storage_keys;
assert_ne!(keys[..], *reference_keys);
}
remove_dir_all(ledger_path).unwrap();
}

View File

@ -127,7 +127,6 @@ impl Tvu {
keypair,
exit.clone(),
entry_height,
&cluster_info,
);
Tvu {

View File

@ -58,7 +58,7 @@ fn test_replicator_startup() {
let validator = Fullnode::new(
validator_node,
&validator_ledger_path,
validator_keypair.clone(),
validator_keypair,
vote_account_keypair,
Some(leader_info.gossip),
false,
@ -70,17 +70,9 @@ fn test_replicator_startup() {
let bob = Keypair::new();
for _ in 0..10 {
let last_id = leader_client.get_last_id();
leader_client
.transfer(1, &mint.keypair(), bob.pubkey(), &last_id)
.unwrap();
sleep(Duration::from_millis(200));
}
let last_id = leader_client.get_last_id();
leader_client
.transfer(10, &mint.keypair(), validator_keypair.pubkey(), &last_id)
.transfer(1, &mint.keypair(), bob.pubkey(), &last_id)
.unwrap();
let replicator_keypair = Keypair::new();
@ -143,14 +135,10 @@ fn test_replicator_startup() {
{
use solana::rpc_request::{RpcClient, RpcRequest};
debug!(
"looking for pubkeys for entry: {}",
replicator.entry_height()
);
let rpc_client = RpcClient::new_from_socket(validator_node_info.rpc);
let mut non_zero_pubkeys = false;
for _ in 0..30 {
let params = json!([replicator.entry_height()]);
let params = json!([0]);
let pubkeys = RpcRequest::GetStoragePubkeysForEntryHeight
.make_rpc_request(&rpc_client, 1, Some(params))
.unwrap();