Storage stage updates

* Remove logging init from storage program: saw a crash in a test
  indicating the logger was being initialized twice.
* Add entry_height to the mining proof to indicate which segment the
  result is for
* Add an interface to get storage miner pubkeys for a given entry_height
* Add an interface to get the current storage mining entry_height
* Set the tvu socket to 0.0.0.0:0 in replicator to stop getting entries
  after the desired ledger segment is downloaded.
* Use signature of PoH height to determine which block to download for
  replicator.
This commit is contained in:
Stephen Akridge 2018-12-10 11:38:29 -08:00 committed by sakridge
parent 3ce3f1adc1
commit 7cdbbfa88e
12 changed files with 287 additions and 86 deletions

View File

@ -14,7 +14,6 @@ use solana_sdk::account::KeyedAccount;
use solana_sdk::native_program::ProgramError;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::storage_program::*;
use std::sync::{Once, ONCE_INIT};
solana_entrypoint!(entrypoint);
fn entrypoint(
@ -23,12 +22,6 @@ fn entrypoint(
data: &[u8],
_tick_height: u64,
) -> Result<(), ProgramError> {
static INIT: Once = ONCE_INIT;
INIT.call_once(|| {
// env_logger can only be initialized once
env_logger::init();
});
// accounts_keys[0] must be signed
if keyed_accounts[0].signer_key().is_none() {
info!("account[0] is unsigned");
@ -37,8 +30,14 @@ fn entrypoint(
if let Ok(syscall) = deserialize(data) {
match syscall {
StorageProgram::SubmitMiningProof { sha_state } => {
info!("Mining proof submitted with state {:?}", sha_state);
StorageProgram::SubmitMiningProof {
sha_state,
entry_height,
} => {
info!(
"Mining proof submitted with state {:?} entry_height: {}",
sha_state, entry_height
);
}
}
Ok(())

View File

@ -5,7 +5,7 @@ use transaction::Transaction;
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum StorageProgram {
SubmitMiningProof { sha_state: Hash },
SubmitMiningProof { sha_state: Hash, entry_height: u64 },
}
pub const STORAGE_PROGRAM_ID: [u8; 32] = [
@ -22,12 +22,25 @@ pub fn id() -> Pubkey {
}
pub trait StorageTransaction {
fn storage_new_mining_proof(from_keypair: &Keypair, sha_state: Hash, last_id: Hash) -> Self;
fn storage_new_mining_proof(
from_keypair: &Keypair,
sha_state: Hash,
last_id: Hash,
entry_height: u64,
) -> Self;
}
impl StorageTransaction for Transaction {
fn storage_new_mining_proof(from_keypair: &Keypair, sha_state: Hash, last_id: Hash) -> Self {
let program = StorageProgram::SubmitMiningProof { sha_state };
fn storage_new_mining_proof(
from_keypair: &Keypair,
sha_state: Hash,
last_id: Hash,
entry_height: u64,
) -> Self {
let program = StorageProgram::SubmitMiningProof {
sha_state,
entry_height,
};
Transaction::new(
from_keypair,
&[from_keypair.pubkey()],

View File

@ -486,6 +486,11 @@ impl Bank {
.expect("no last_id has been set")
}
/// Returns the storage miner pubkeys recorded for the segment containing
/// `entry_height`, delegating to the bank's storage state.
pub fn get_pubkeys_for_entry_height(&self, entry_height: u64) -> Vec<Pubkey> {
self.storage_state
.get_pubkeys_for_entry_height(entry_height)
}
/// Store the given signature. The bank will reject any transaction with the same signature.
fn reserve_signature(signatures: &mut SignatureStatusMap, signature: &Signature) -> Result<()> {
if let Some(_result) = signatures.get(signature) {

View File

@ -7,7 +7,7 @@ use solana_sdk::hash::Hash;
use std::io;
use std::mem::size_of;
use crate::storage_stage::ENTRIES_PER_SLICE;
use crate::storage_stage::ENTRIES_PER_SEGMENT;
// Encrypt a file with multiple starting IV states, determined by ivecs.len()
//
@ -44,8 +44,11 @@ pub fn chacha_cbc_encrypt_file_many_keys(
chacha_init_sha_state(int_sha_states.as_mut_ptr(), num_keys as u32);
}
loop {
match ledger_window.get_entries_bytes(entry, ENTRIES_PER_SLICE - total_entries, &mut buffer)
{
match ledger_window.get_entries_bytes(
entry,
ENTRIES_PER_SEGMENT - total_entries,
&mut buffer,
) {
Ok((num_entries, entry_len)) => {
info!(
"encrypting slice: {} num_entries: {} entry_len: {}",
@ -72,9 +75,9 @@ pub fn chacha_cbc_encrypt_file_many_keys(
entry += num_entries;
debug!(
"total entries: {} entry: {} slice: {} entries_per_slice: {}",
total_entries, entry, slice, ENTRIES_PER_SLICE
total_entries, entry, slice, ENTRIES_PER_SEGMENT
);
if (entry - slice) >= ENTRIES_PER_SLICE {
if (entry - slice) >= ENTRIES_PER_SEGMENT {
break;
}
}

View File

@ -350,9 +350,10 @@ pub fn process_blob(
// Check if we ran over the last wanted entry
if consumed > max_ix {
let extra_unwanted_entries_len = consumed - (max_ix + 1);
let consumed_entries_len = consumed_entries.len();
consumed_entries.truncate(consumed_entries_len - extra_unwanted_entries_len as usize);
let extra_unwanted_entries_len =
cmp::min(consumed_entries_len, (consumed - (max_ix + 1)) as usize);
consumed_entries.truncate(consumed_entries_len - extra_unwanted_entries_len);
done.store(true, Ordering::Relaxed);
}
}

View File

@ -295,7 +295,7 @@ impl Fullnode {
&bank,
entry_height,
*last_entry_id,
cluster_info.clone(),
&cluster_info,
sockets,
Some(ledger_path),
db_ledger.clone(),
@ -459,7 +459,7 @@ impl Fullnode {
&self.bank,
entry_height,
last_entry_id,
self.cluster_info.clone(),
&self.cluster_info,
sockets,
Some(&self.ledger_path),
self.db_ledger.clone(),

View File

@ -10,6 +10,7 @@ use crate::ledger::LEDGER_DATA_FILE;
use crate::result::Result;
use crate::rpc_request::{RpcClient, RpcRequest};
use crate::service::Service;
use crate::storage_stage::ENTRIES_PER_SEGMENT;
use crate::store_ledger_stage::StoreLedgerStage;
use crate::streamer::BlobReceiver;
use crate::thin_client::retry_get_balance;
@ -93,12 +94,9 @@ impl Replicator {
let exit = Arc::new(AtomicBool::new(false));
let done = Arc::new(AtomicBool::new(false));
let entry_height = 0;
let max_entry_height = 1;
info!("Replicator: id: {}", keypair.pubkey());
info!("Creating cluster info....");
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(node.info)));
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(node.info.clone())));
let leader_pubkey = leader_info.id;
{
@ -141,20 +139,40 @@ impl Replicator {
info!("Got leader: {:?}", leader);
let rpc_client = {
let cluster_info = cluster_info.read().unwrap();
let rpc_peers = cluster_info.rpc_peers();
info!("rpc peers: {:?}", rpc_peers);
let node_idx = thread_rng().gen_range(0, rpc_peers.len());
RpcClient::new_from_socket(rpc_peers[node_idx].rpc)
};
let mut storage_last_id;
let mut storage_entry_height;
loop {
let rpc_client = {
let cluster_info = cluster_info.read().unwrap();
let rpc_peers = cluster_info.rpc_peers();
info!("rpc peers: {:?}", rpc_peers);
let node_idx = thread_rng().gen_range(0, rpc_peers.len());
RpcClient::new_from_socket(rpc_peers[node_idx].rpc)
};
let storage_last_id = RpcRequest::GetStorageMiningLastId
.make_rpc_request(&rpc_client, 2, None)
.expect("rpc request")
.to_string();
let _signature = keypair.sign(storage_last_id.as_ref());
// TODO: use this signature to pick the key and block
storage_last_id = RpcRequest::GetStorageMiningLastId
.make_rpc_request(&rpc_client, 2, None)
.expect("rpc request")
.to_string();
storage_entry_height = RpcRequest::GetStorageMiningEntryHeight
.make_rpc_request(&rpc_client, 2, None)
.expect("rpc request")
.as_u64()
.unwrap();
if storage_entry_height != 0 {
break;
}
}
let signature = keypair.sign(storage_last_id.as_ref());
let signature = signature.as_ref();
let block_index = u64::from(signature[0])
| (u64::from(signature[1]) << 8)
| (u64::from(signature[1]) << 16)
| (u64::from(signature[2]) << 24);
let mut entry_height = block_index * ENTRIES_PER_SEGMENT;
entry_height %= storage_entry_height;
let max_entry_height = entry_height + ENTRIES_PER_SEGMENT;
let repair_socket = Arc::new(node.sockets.repair);
let mut blob_sockets: Vec<Arc<UdpSocket>> =
@ -187,6 +205,13 @@ impl Replicator {
sleep(Duration::from_millis(100));
}
let mut node_info = node.info.clone();
node_info.tvu = "0.0.0.0:0".parse().unwrap();
{
let mut cluster_info_w = cluster_info.write().unwrap();
cluster_info_w.insert_info(node_info);
}
let mut client = mk_client(&leader);
if retry_get_balance(&mut client, &keypair.pubkey(), None).is_none() {
@ -236,7 +261,8 @@ impl Replicator {
Ok(hash) => {
let last_id = client.get_last_id();
info!("sampled hash: {}", hash);
let tx = Transaction::storage_new_mining_proof(&keypair, hash, last_id);
let tx =
Transaction::storage_new_mining_proof(&keypair, hash, last_id, entry_height);
client.transfer_signed(&tx).expect("transfer didn't work!");
}
Err(e) => info!("Error occurred while sampling: {:?}", e),

View File

@ -155,6 +155,12 @@ build_rpc_trait! {
#[rpc(meta, name = "getStorageMiningLastId")]
fn get_storage_mining_last_id(&self, Self::Metadata) -> Result<String>;
#[rpc(meta, name = "getStorageMiningEntryHeight")]
fn get_storage_mining_entry_height(&self, Self::Metadata) -> Result<u64>;
#[rpc(meta, name = "getStoragePubkeysForEntryHeight")]
fn get_storage_pubkeys_for_entry_height(&self, Self::Metadata, u64) -> Result<Vec<Pubkey>>;
}
}
@ -284,6 +290,17 @@ impl RpcSol for RpcSolImpl {
fn get_storage_mining_last_id(&self, meta: Self::Metadata) -> Result<String> {
meta.request_processor.get_storage_mining_last_id()
}
/// RPC handler for `getStorageMiningEntryHeight`; forwards to the
/// request processor.
fn get_storage_mining_entry_height(&self, meta: Self::Metadata) -> Result<u64> {
meta.request_processor.get_storage_mining_entry_height()
}
/// RPC handler for `getStoragePubkeysForEntryHeight`; forwards the
/// requested `entry_height` to the request processor.
fn get_storage_pubkeys_for_entry_height(
&self,
meta: Self::Metadata,
entry_height: u64,
) -> Result<Vec<Pubkey>> {
meta.request_processor
.get_storage_pubkeys_for_entry_height(entry_height)
}
}
#[derive(Clone)]
pub struct JsonRpcRequestProcessor {
@ -322,6 +339,13 @@ impl JsonRpcRequestProcessor {
let id = self.bank.storage_state.get_last_id();
Ok(bs58::encode(id).into_string())
}
/// Current storage mining entry height, read from the bank's storage state.
fn get_storage_mining_entry_height(&self) -> Result<u64> {
Ok(self.bank.storage_state.get_entry_height())
}
/// Storage miner pubkeys for the segment containing `entry_height`,
/// looked up through the bank.
fn get_storage_pubkeys_for_entry_height(&self, entry_height: u64) -> Result<Vec<Pubkey>> {
Ok(self.bank.get_pubkeys_for_entry_height(entry_height))
}
}
fn get_leader_addr(cluster_info: &Arc<RwLock<ClusterInfo>>) -> Result<SocketAddr> {

View File

@ -54,6 +54,8 @@ pub enum RpcRequest {
SignVote,
DeregisterNode,
GetStorageMiningLastId,
GetStorageMiningEntryHeight,
GetStoragePubkeysForEntryHeight,
}
impl RpcRequest {
@ -97,6 +99,8 @@ impl RpcRequest {
RpcRequest::SignVote => "signVote",
RpcRequest::DeregisterNode => "deregisterNode",
RpcRequest::GetStorageMiningLastId => "getStorageMiningLastId",
RpcRequest::GetStorageMiningEntryHeight => "getStorageMiningEntryHeight",
RpcRequest::GetStoragePubkeysForEntryHeight => "getStoragePubkeysForEntryHeight",
};
let mut request = json!({
"jsonrpc": jsonrpc,

View File

@ -7,12 +7,17 @@ use crate::chacha_cuda::chacha_cbc_encrypt_file_many_keys;
use crate::entry::EntryReceiver;
use crate::result::{Error, Result};
use crate::service::Service;
use bincode::deserialize;
use rand::{Rng, SeedableRng};
use rand_chacha::ChaChaRng;
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Keypair;
use solana_sdk::signature::Signature;
use solana_sdk::storage_program;
use solana_sdk::storage_program::StorageProgram;
use solana_sdk::vote_program;
use std::collections::HashSet;
use std::mem::size_of;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::RecvTimeoutError;
@ -24,12 +29,20 @@ use std::time::Duration;
// Vec of [ledger blocks] x [keys]
type StorageResults = Vec<Hash>;
type StorageKeys = Vec<u8>;
type ReplicatorMap = Vec<HashSet<Pubkey>>;
#[derive(Default)]
pub struct StorageStateInner {
storage_results: StorageResults,
storage_keys: StorageKeys,
replicator_map: ReplicatorMap,
storage_last_id: Hash,
entry_height: u64,
}
#[derive(Default)]
pub struct StorageState {
storage_results: Arc<RwLock<StorageResults>>,
storage_keys: Arc<RwLock<StorageKeys>>,
last_id: Hash,
state: Arc<RwLock<StorageStateInner>>,
}
pub struct StorageStage {
@ -46,9 +59,13 @@ const NUM_HASHES_FOR_STORAGE_ROTATE: u64 = 1024;
// TODO: some way to dynamically size NUM_IDENTITIES
const NUM_IDENTITIES: usize = 1024;
const NUM_SAMPLES: usize = 4;
pub const ENTRIES_PER_SLICE: u64 = 16;
/// Number of ledger entries that make up one storage segment.
pub const ENTRIES_PER_SEGMENT: u64 = 16;

/// Size in bytes of one storage mining key slot.
const KEY_SIZE: usize = 64;

/// Maps an entry height to the index of the segment that contains it.
pub fn get_segment_from_entry(entry_height: u64) -> u64 {
entry_height / ENTRIES_PER_SEGMENT
}
fn get_identity_index_from_signature(key: &Signature) -> usize {
let rkey = key.as_ref();
let mut res: usize = (rkey[0] as usize)
@ -61,28 +78,55 @@ fn get_identity_index_from_signature(key: &Signature) -> usize {
impl StorageState {
pub fn new() -> Self {
let storage_keys = Arc::new(RwLock::new(vec![0u8; KEY_SIZE * NUM_IDENTITIES]));
let storage_results = Arc::new(RwLock::new(vec![Hash::default(); NUM_IDENTITIES]));
let storage_keys = vec![0u8; KEY_SIZE * NUM_IDENTITIES];
let storage_results = vec![Hash::default(); NUM_IDENTITIES];
let replicator_map = vec![];
StorageState {
let state = StorageStateInner {
storage_keys,
storage_results,
last_id: Hash::default(),
replicator_map,
entry_height: 0,
storage_last_id: Hash::default(),
};
StorageState {
state: Arc::new(RwLock::new(state)),
}
}
pub fn get_mining_key(&self, key: &Signature) -> Vec<u8> {
let idx = get_identity_index_from_signature(key);
self.storage_keys.read().unwrap()[idx..idx + KEY_SIZE].to_vec()
self.state.read().unwrap().storage_keys[idx..idx + KEY_SIZE].to_vec()
}
pub fn get_mining_result(&self, key: &Signature) -> Hash {
let idx = get_identity_index_from_signature(key);
self.storage_results.read().unwrap()[idx]
self.state.read().unwrap().storage_results[idx]
}
pub fn get_last_id(&self) -> Hash {
self.last_id
self.state.read().unwrap().storage_last_id
}
/// Current storage mining entry height tracked by the storage state.
pub fn get_entry_height(&self) -> u64 {
self.state.read().unwrap().entry_height
}
/// Returns up to five replicator pubkeys recorded for the segment
/// containing `entry_height`; empty when no proofs are recorded for it.
pub fn get_pubkeys_for_entry_height(&self, entry_height: u64) -> Vec<Pubkey> {
// TODO: keep track of age?
const MAX_PUBKEYS_TO_RETURN: usize = 5;
let segment_index = (entry_height / ENTRIES_PER_SEGMENT) as usize;
let state = self.state.read().unwrap();
match state.replicator_map.get(segment_index) {
Some(pubkeys) => pubkeys.iter().take(MAX_PUBKEYS_TO_RETURN).cloned().collect(),
None => vec![],
}
}
}
@ -95,8 +139,9 @@ impl StorageStage {
exit: Arc<AtomicBool>,
entry_height: u64,
) -> Self {
let storage_keys_ = storage_state.storage_keys.clone();
let storage_results_ = storage_state.storage_results.clone();
debug!("storage_stage::new: entry_height: {}", entry_height);
storage_state.state.write().unwrap().entry_height = entry_height;
let storage_state_inner = storage_state.state.clone();
let ledger_path = ledger_path.map(String::from);
let t_storage_mining_verifier = Builder::new()
.name("solana-storage-mining-verify-stage".to_string())
@ -109,8 +154,7 @@ impl StorageStage {
if let Some(ref ledger_path_str) = ledger_path {
if let Err(e) = Self::process_entries(
&keypair,
&storage_keys_,
&storage_results_,
&storage_state_inner,
&storage_entry_receiver,
ledger_path_str,
&mut poh_height,
@ -137,8 +181,7 @@ impl StorageStage {
}
pub fn process_entry_crossing(
_storage_results: &Arc<RwLock<StorageResults>>,
_storage_keys: &Arc<RwLock<StorageKeys>>,
state: &Arc<RwLock<StorageStateInner>>,
keypair: &Arc<Keypair>,
_ledger_path: &str,
entry_id: Hash,
@ -151,18 +194,20 @@ impl StorageStage {
let mut rng = ChaChaRng::from_seed(seed);
state.write().unwrap().entry_height = entry_height;
// Regenerate the answers
let num_slices = (entry_height / ENTRIES_PER_SLICE) as usize;
if num_slices == 0 {
info!("Ledger has 0 slices!");
let num_segments = (entry_height / ENTRIES_PER_SEGMENT) as usize;
if num_segments == 0 {
info!("Ledger has 0 segments!");
return Ok(());
}
// TODO: what if the validator does not have this slice
let slice = signature.as_ref()[0] as usize % num_slices;
// TODO: what if the validator does not have this segment
let segment = signature.as_ref()[0] as usize % num_segments;
debug!(
"storage verifying: slice: {} identities: {}",
slice, NUM_IDENTITIES,
"storage verifying: segment: {} identities: {}",
segment, NUM_IDENTITIES,
);
let mut samples = vec![];
@ -175,23 +220,22 @@ impl StorageStage {
// process storage mining results.
#[cfg(all(feature = "chacha", feature = "cuda"))]
{
let mut storage_results = _storage_results.write().unwrap();
// Lock the keys, since this is the IV memory,
// it will be updated in-place by the encryption.
// Should be overwritten by the vote signatures which replace the
// key values by the time it runs again.
let mut storage_keys = _storage_keys.write().unwrap();
let mut statew = state.write().unwrap();
match chacha_cbc_encrypt_file_many_keys(
_ledger_path,
slice as u64,
&mut storage_keys,
segment as u64,
&mut statew.storage_keys,
&samples,
) {
Ok(hashes) => {
debug!("Success! encrypted ledger slice: {}", slice);
storage_results.copy_from_slice(&hashes);
debug!("Success! encrypted ledger segment: {}", segment);
statew.storage_results.copy_from_slice(&hashes);
}
Err(e) => {
info!("error encrypting file: {:?}", e);
@ -206,8 +250,7 @@ impl StorageStage {
pub fn process_entries(
keypair: &Arc<Keypair>,
storage_keys: &Arc<RwLock<StorageKeys>>,
storage_results: &Arc<RwLock<StorageResults>>,
storage_state: &Arc<RwLock<StorageStateInner>>,
entry_receiver: &EntryReceiver,
ledger_path: &str,
poh_height: &mut u64,
@ -221,17 +264,45 @@ impl StorageStage {
// Go through the transactions, find votes, and use them to update
// the storage_keys with their signatures.
for tx in entry.transactions {
for program_id in tx.program_ids {
for (i, program_id) in tx.program_ids.iter().enumerate() {
if vote_program::check_id(&program_id) {
debug!(
"generating storage_keys from votes current_key_idx: {}",
*current_key_idx
);
let mut storage_keys = storage_keys.write().unwrap();
let storage_keys = &mut storage_state.write().unwrap().storage_keys;
storage_keys[*current_key_idx..*current_key_idx + size_of::<Signature>()]
.copy_from_slice(tx.signatures[0].as_ref());
*current_key_idx += size_of::<Signature>();
*current_key_idx %= storage_keys.len();
} else if storage_program::check_id(&program_id) {
match deserialize(&tx.instructions[i].userdata) {
Ok(StorageProgram::SubmitMiningProof {
entry_height: proof_entry_height,
..
}) => {
if proof_entry_height < *entry_height {
let mut statew = storage_state.write().unwrap();
let max_segment_index =
(*entry_height / ENTRIES_PER_SEGMENT) as usize;
if statew.replicator_map.len() <= max_segment_index {
statew
.replicator_map
.resize(max_segment_index, HashSet::new());
}
let proof_segment_index =
(proof_entry_height / ENTRIES_PER_SEGMENT) as usize;
if proof_segment_index < statew.replicator_map.len() {
statew.replicator_map[proof_segment_index]
.insert(tx.account_keys[0]);
}
}
debug!("storage proof: entry_height: {}", entry_height);
}
Err(e) => {
info!("error: {:?}", e);
}
}
}
}
}
@ -241,8 +312,7 @@ impl StorageStage {
*poh_height, entry_height, entry.num_hashes
);
Self::process_entry_crossing(
&storage_results,
&storage_keys,
&storage_state,
&keypair,
&ledger_path,
entry.id,
@ -408,9 +478,9 @@ mod tests {
let mut reference_keys;
{
let keys = storage_state.storage_keys.read().unwrap();
let keys = &storage_state.state.read().unwrap().storage_keys;
reference_keys = vec![0; keys.len()];
reference_keys.copy_from_slice(&keys);
reference_keys.copy_from_slice(keys);
}
let mut vote_txs: Vec<Transaction> = Vec::new();
let vote = Vote {
@ -424,7 +494,7 @@ mod tests {
for _ in 0..5 {
{
let keys = storage_state.storage_keys.read().unwrap();
let keys = &storage_state.state.read().unwrap().storage_keys;
if keys[..] != *reference_keys.as_slice() {
break;
}
@ -438,7 +508,7 @@ mod tests {
storage_stage.join().unwrap();
{
let keys = storage_state.storage_keys.read().unwrap();
let keys = &storage_state.state.read().unwrap().storage_keys;
assert_ne!(keys[..], *reference_keys);
}

View File

@ -67,7 +67,7 @@ impl Tvu {
bank: &Arc<Bank>,
entry_height: u64,
last_entry_id: Hash,
cluster_info: Arc<RwLock<ClusterInfo>>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
sockets: Sockets,
ledger_path: Option<&str>,
db_ledger: Arc<RwLock<DbLedger>>,
@ -110,7 +110,7 @@ impl Tvu {
keypair.clone(),
vote_account_keypair,
bank.clone(),
cluster_info,
cluster_info.clone(),
blob_window_receiver,
exit.clone(),
entry_height,
@ -285,7 +285,7 @@ pub mod tests {
&bank,
0,
cur_hash,
cref1,
&cref1,
{
Sockets {
repair: target1.sockets.repair,

View File

@ -1,12 +1,16 @@
#[macro_use]
extern crate log;
#[cfg(feature = "chacha")]
#[macro_use]
extern crate serde_json;
use solana::client::mk_client;
use solana::cluster_info::{Node, NodeInfo};
use solana::db_ledger::DbLedger;
use solana::fullnode::Fullnode;
use solana::leader_scheduler::LeaderScheduler;
use solana::ledger::{create_tmp_genesis, get_tmp_ledger_path, read_ledger};
use solana::ledger::{create_tmp_genesis, get_tmp_ledger_path, read_ledger, tmp_copy_ledger};
use solana::replicator::Replicator;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_transaction::SystemTransaction;
@ -30,14 +34,34 @@ fn test_replicator_startup() {
let leader_ledger_path = "replicator_test_leader_ledger";
let (mint, leader_ledger_path) = create_tmp_genesis(leader_ledger_path, 100, leader_info.id, 1);
let validator_ledger_path =
tmp_copy_ledger(&leader_ledger_path, "replicator_test_validator_ledger");
{
let leader = Fullnode::new(
leader_node,
&leader_ledger_path,
leader_keypair,
vote_account_keypair,
vote_account_keypair.clone(),
None,
false,
LeaderScheduler::from_bootstrap_leader(leader_info.id.clone()),
None,
);
let validator_keypair = Arc::new(Keypair::new());
let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey());
#[cfg(feature = "chacha")]
let validator_node_info = validator_node.info.clone();
let validator = Fullnode::new(
validator_node,
&validator_ledger_path,
validator_keypair,
vote_account_keypair,
Some(leader_info.gossip),
false,
LeaderScheduler::from_bootstrap_leader(leader_info.id),
None,
);
@ -53,6 +77,7 @@ fn test_replicator_startup() {
let replicator_keypair = Keypair::new();
// Give the replicator some tokens
let amount = 1;
let mut tx = Transaction::system_new(
&mint.keypair(),
@ -77,6 +102,7 @@ fn test_replicator_startup() {
)
.unwrap();
// Poll the ledger dir to see that some is downloaded
let mut num_entries = 0;
for _ in 0..60 {
match read_ledger(replicator_ledger_path, true) {
@ -94,13 +120,43 @@ fn test_replicator_startup() {
}
}
sleep(Duration::from_millis(300));
// Do a transfer to make sure new entries are created which
// stimulates the repair process
let last_id = leader_client.get_last_id();
leader_client
.transfer(1, &mint.keypair(), bob.pubkey(), &last_id)
.unwrap();
}
// The replicator will not submit storage proofs if
// chacha is not enabled
#[cfg(feature = "chacha")]
{
use solana::rpc_request::{RpcClient, RpcRequest};
let rpc_client = RpcClient::new_from_socket(validator_node_info.rpc);
let mut non_zero_pubkeys = false;
for _ in 0..30 {
let params = json!([0]);
let pubkeys = RpcRequest::GetStoragePubkeysForEntryHeight
.make_rpc_request(&rpc_client, 1, Some(params))
.unwrap();
info!("pubkeys: {:?}", pubkeys);
if pubkeys.as_array().unwrap().len() != 0 {
non_zero_pubkeys = true;
break;
}
sleep(Duration::from_secs(1));
}
assert!(non_zero_pubkeys);
}
// Check that some ledger was downloaded
assert!(num_entries > 0);
replicator.close();
validator.exit();
leader.close().expect("Expected successful node closure");
}