Storage Account setup for replicators and validators (#3516)

* Setup Storage Accounts for replicators

* Setup Storage Accounts for validators

* Add Replicator Info to Local Cluster and Add test
This commit is contained in:
Sagar Dhawan 2019-03-27 15:54:09 -07:00 committed by GitHub
parent e45f7afd85
commit e8cc566b2b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 204 additions and 41 deletions

View File

@ -36,6 +36,20 @@ impl FullnodeInfo {
}
}
pub struct ReplicatorInfo {
pub replicator_storage_id: Pubkey,
pub ledger_path: String,
}
impl ReplicatorInfo {
fn new(storage_id: Pubkey, ledger_path: String) -> Self {
Self {
replicator_storage_id: storage_id,
ledger_path,
}
}
}
pub struct LocalCluster {
/// Keypair with funding to particpiate in the network
pub funding_keypair: Keypair,
@ -47,7 +61,7 @@ pub struct LocalCluster {
genesis_ledger_path: String,
genesis_block: GenesisBlock,
replicators: Vec<Replicator>,
pub replicator_ledger_paths: Vec<String>,
pub replicator_infos: HashMap<Pubkey, ReplicatorInfo>,
}
impl LocalCluster {
@ -131,10 +145,10 @@ impl LocalCluster {
entry_point_info: leader_contact_info,
fullnodes,
replicators: vec![],
replicator_ledger_paths: vec![],
genesis_ledger_path,
genesis_block,
fullnode_infos,
replicator_infos: HashMap::new(),
fullnode_config: fullnode_config.clone(),
};
@ -148,6 +162,12 @@ impl LocalCluster {
cluster.add_replicator();
}
discover(
&cluster.entry_point_info.gossip,
node_stakes.len() + num_replicators,
)
.unwrap();
cluster
}
@ -163,8 +183,8 @@ impl LocalCluster {
node.join().unwrap();
}
while let Some(node) = self.replicators.pop() {
node.close();
while let Some(replicator) = self.replicators.pop() {
replicator.close();
}
}
@ -213,6 +233,9 @@ impl LocalCluster {
fn add_replicator(&mut self) {
let replicator_keypair = Arc::new(Keypair::new());
let replicator_id = replicator_keypair.pubkey();
let storage_keypair = Arc::new(Keypair::new());
let storage_id = storage_keypair.pubkey();
let client = create_client(
self.entry_point_info.client_facing_addr(),
FULLNODE_PORT_RANGE,
@ -224,7 +247,7 @@ impl LocalCluster {
&replicator_keypair.pubkey(),
1,
);
let replicator_node = Node::new_localhost_replicator(&replicator_keypair.pubkey());
let replicator_node = Node::new_localhost_replicator(&replicator_id);
let (replicator_ledger_path, _blockhash) = create_new_tmp_ledger!(&self.genesis_block);
let replicator = Replicator::new(
@ -232,12 +255,16 @@ impl LocalCluster {
replicator_node,
self.entry_point_info.clone(),
replicator_keypair,
storage_keypair,
None,
)
.unwrap();
self.replicator_ledger_paths.push(replicator_ledger_path);
self.replicators.push(replicator);
self.replicator_infos.insert(
replicator_id,
ReplicatorInfo::new(storage_id, replicator_ledger_path),
);
}
fn close(&mut self) {
@ -246,7 +273,7 @@ impl LocalCluster {
.fullnode_infos
.values()
.map(|f| &f.ledger_path)
.chain(self.replicator_ledger_paths.iter())
.chain(self.replicator_infos.values().map(|info| &info.ledger_path))
{
remove_dir_all(&ledger_path)
.unwrap_or_else(|_| panic!("Unable to remove {}", ledger_path));

View File

@ -22,6 +22,7 @@ use solana_client::thin_client::{create_client, ThinClient};
use solana_drone::drone::{request_airdrop_transaction, DRONE_PORT};
use solana_sdk::hash::{Hash, Hasher};
use solana_sdk::signature::{Keypair, KeypairUtil, Signature};
use solana_sdk::system_transaction::SystemTransaction;
use solana_sdk::transaction::Transaction;
use solana_storage_api::storage_instruction::StorageInstruction;
use std::fs::File;
@ -58,6 +59,7 @@ pub struct Replicator {
slot: u64,
ledger_path: String,
keypair: Arc<Keypair>,
storage_keypair: Arc<Keypair>,
signature: ring::signature::Signature,
cluster_entrypoint: ContactInfo,
ledger_data_file_encrypted: PathBuf,
@ -183,6 +185,7 @@ impl Replicator {
node: Node,
cluster_entrypoint: ContactInfo,
keypair: Arc<Keypair>,
storage_keypair: Arc<Keypair>,
_timeout: Option<Duration>,
) -> Result<Self> {
let exit = Arc::new(AtomicBool::new(false));
@ -218,7 +221,7 @@ impl Replicator {
Self::poll_for_blockhash_and_entry_height(&cluster_info)?;
let node_info = node.info.clone();
let signature = keypair.sign(storage_blockhash.as_ref());
let signature = storage_keypair.sign(storage_blockhash.as_ref());
let slot = get_entry_heights_from_blockhash(&signature, storage_entry_height);
info!("replicating slot: {}", slot);
@ -260,10 +263,27 @@ impl Replicator {
let exit3 = exit.clone();
let blocktree1 = blocktree.clone();
let t_replicate = spawn(move || loop {
Self::wait_for_ledger_download(slot, &blocktree1, &exit3, &node_info, &cluster_info);
if exit3.load(Ordering::Relaxed) {
break;
let keypair1 = keypair.clone();
let storage_keypair1 = storage_keypair.clone();
let cluster_entrypoint1 = cluster_entrypoint.clone();
let t_replicate = spawn(move || {
let client = create_client(
cluster_entrypoint1.client_facing_addr(),
FULLNODE_PORT_RANGE,
);
Self::setup_mining_account(&client, &keypair1, &storage_keypair1, &cluster_entrypoint1);
loop {
Self::wait_for_ledger_download(
slot,
&blocktree1,
&exit3,
&node_info,
&cluster_info,
);
if exit3.load(Ordering::Relaxed) {
break;
}
}
});
thread_handles.push(t_replicate);
@ -277,6 +297,7 @@ impl Replicator {
slot,
ledger_path: ledger_path.to_string(),
keypair: keypair.clone(),
storage_keypair,
signature,
cluster_entrypoint,
ledger_data_file_encrypted: PathBuf::default(),
@ -397,6 +418,36 @@ impl Replicator {
Ok(())
}
fn setup_mining_account(
client: &ThinClient,
keypair: &Keypair,
storage_keypair: &Keypair,
entrypoint: &ContactInfo,
) {
// make sure replicator has some balance
Self::get_airdrop_lamports(client, keypair, entrypoint);
// check if the account exists
if client
.wait_for_balance(&storage_keypair.pubkey(), None)
.is_none()
{
let blockhash = client.get_recent_blockhash().expect("blockhash");
//TODO the account space needs to be well defined somewhere
let tx = SystemTransaction::new_program_account(
keypair,
&storage_keypair.pubkey(),
blockhash,
1,
1024 * 4,
&solana_storage_api::id(),
0,
);
let signature = client.transfer_signed(&tx).unwrap();
client.poll_for_signature(&signature).unwrap();
}
}
fn submit_mining_proof(&self) {
let client = create_client(
self.cluster_entrypoint.client_facing_addr(),
@ -405,14 +456,14 @@ impl Replicator {
Self::get_airdrop_lamports(&client, &self.keypair, &self.cluster_entrypoint);
let ix = StorageInstruction::new_mining_proof(
&self.keypair.pubkey(),
&self.storage_keypair.pubkey(),
self.hash,
self.slot,
Signature::new(self.signature.as_ref()),
);
let mut tx = Transaction::new_unsigned_instructions(vec![ix]);
client
.retry_transfer(&self.keypair, &mut tx, 10)
.retry_transfer(&self.storage_keypair, &mut tx, 10)
.expect("transfer didn't work!");
}

View File

@ -12,10 +12,11 @@ use crate::service::Service;
use bincode::deserialize;
use rand::{Rng, SeedableRng};
use rand_chacha::ChaChaRng;
use solana_client::thin_client::create_client_with_timeout;
use solana_client::thin_client::{create_client_with_timeout, ThinClient};
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil, Signature};
use solana_sdk::system_transaction::SystemTransaction;
use solana_sdk::transaction::Transaction;
use solana_storage_api::storage_instruction::StorageInstruction;
use std::collections::HashSet;
@ -141,6 +142,7 @@ impl StorageStage {
storage_entry_receiver: EntryReceiver,
blocktree: Option<Arc<Blocktree>>,
keypair: &Arc<Keypair>,
storage_keypair: &Arc<Keypair>,
exit: &Arc<AtomicBool>,
entry_height: u64,
storage_rotate_count: u64,
@ -150,7 +152,7 @@ impl StorageStage {
storage_state.state.write().unwrap().entry_height = entry_height;
let storage_state_inner = storage_state.state.clone();
let exit0 = exit.clone();
let keypair0 = keypair.clone();
let keypair0 = storage_keypair.clone();
let (tx_sender, tx_receiver) = channel();
@ -190,13 +192,21 @@ impl StorageStage {
let cluster_info0 = cluster_info.clone();
let exit1 = exit.clone();
let keypair1 = keypair.clone();
let storage_keypair1 = storage_keypair.clone();
let t_storage_create_accounts = Builder::new()
.name("solana-storage-create-accounts".to_string())
.spawn(move || loop {
match tx_receiver.recv_timeout(Duration::from_secs(1)) {
Ok(mut tx) => {
if Self::send_transaction(&cluster_info0, &mut tx, &exit1, &keypair1, None)
.is_ok()
if Self::send_transaction(
&cluster_info0,
&mut tx,
&exit1,
&keypair1,
&storage_keypair1,
Some(storage_keypair1.pubkey()),
)
.is_ok()
{
debug!("sent transaction: {:?}", tx);
}
@ -220,11 +230,31 @@ impl StorageStage {
}
}
fn check_signature(
client: &ThinClient,
signature: &Signature,
exit: &Arc<AtomicBool>,
) -> io::Result<()> {
for _ in 0..10 {
if client.check_signature(&signature) {
return Ok(());
}
if exit.load(Ordering::Relaxed) {
Err(io::Error::new(io::ErrorKind::Other, "exit signaled"))?;
}
sleep(Duration::from_millis(200));
}
Err(io::Error::new(io::ErrorKind::Other, "other failure"))
}
fn send_transaction(
cluster_info: &Arc<RwLock<ClusterInfo>>,
transaction: &mut Transaction,
exit: &Arc<AtomicBool>,
keypair: &Arc<Keypair>,
storage_keypair: &Arc<Keypair>,
account_to_create: Option<Pubkey>,
) -> io::Result<()> {
let contact_info = cluster_info.read().unwrap().my_data();
@ -234,12 +264,6 @@ impl StorageStage {
Duration::from_secs(5),
);
if let Some(account) = account_to_create {
if client.get_account_data(&account).is_ok() {
return Ok(());
}
}
let mut blockhash = None;
for _ in 0..10 {
if let Ok(new_blockhash) = client.get_recent_blockhash() {
@ -251,26 +275,32 @@ impl StorageStage {
Err(io::Error::new(io::ErrorKind::Other, "exit signaled"))?;
}
}
if let Some(blockhash) = blockhash {
transaction.sign(&[keypair.as_ref()], blockhash);
if let Some(account) = account_to_create {
if client.get_account_data(&account).is_err() {
// TODO the account space needs to be well defined somewhere
let tx = SystemTransaction::new_program_account(
keypair,
&storage_keypair.pubkey(),
blockhash,
1,
1024 * 4,
&solana_storage_api::id(),
0,
);
let signature = client.transfer_signed(&tx).unwrap();
Self::check_signature(&client, &signature, &exit)?;
}
}
transaction.sign(&[storage_keypair.as_ref()], blockhash);
if exit.load(Ordering::Relaxed) {
Err(io::Error::new(io::ErrorKind::Other, "exit signaled"))?;
}
if let Ok(signature) = client.transfer_signed(&transaction) {
for _ in 0..10 {
if client.check_signature(&signature) {
return Ok(());
}
if exit.load(Ordering::Relaxed) {
Err(io::Error::new(io::ErrorKind::Other, "exit signaled"))?;
}
sleep(Duration::from_millis(200));
}
Self::check_signature(&client, &signature, &exit)?;
return Ok(());
}
}
@ -475,6 +505,7 @@ mod tests {
#[test]
fn test_storage_stage_none_ledger() {
let keypair = Arc::new(Keypair::new());
let storage_keypair = Arc::new(Keypair::new());
let exit = Arc::new(AtomicBool::new(false));
let cluster_info = test_cluster_info(&keypair.pubkey());
@ -486,6 +517,7 @@ mod tests {
storage_entry_receiver,
None,
&keypair,
&storage_keypair,
&exit.clone(),
0,
STORAGE_ROTATE_TEST_COUNT,
@ -505,6 +537,7 @@ mod tests {
fn test_storage_stage_process_entries() {
solana_logger::setup();
let keypair = Arc::new(Keypair::new());
let storage_keypair = Arc::new(Keypair::new());
let exit = Arc::new(AtomicBool::new(false));
let (genesis_block, _mint_keypair) = GenesisBlock::new(1000);
@ -526,6 +559,7 @@ mod tests {
storage_entry_receiver,
Some(Arc::new(blocktree)),
&keypair,
&storage_keypair,
&exit.clone(),
0,
STORAGE_ROTATE_TEST_COUNT,
@ -569,6 +603,7 @@ mod tests {
fn test_storage_stage_process_proof_entries() {
solana_logger::setup();
let keypair = Arc::new(Keypair::new());
let storage_keypair = Arc::new(Keypair::new());
let exit = Arc::new(AtomicBool::new(false));
let (genesis_block, _mint_keypair) = GenesisBlock::new(1000);
@ -590,6 +625,7 @@ mod tests {
storage_entry_receiver,
Some(Arc::new(blocktree)),
&keypair,
&storage_keypair,
&exit.clone(),
0,
STORAGE_ROTATE_TEST_COUNT,

View File

@ -135,11 +135,13 @@ impl Tvu {
None
};
let storage_keypair = Arc::new(Keypair::new());
let storage_stage = StorageStage::new(
storage_state,
storage_entry_receiver,
Some(blocktree),
&keypair,
&storage_keypair,
&exit,
bank_forks_info[0].entry_height, // TODO: StorageStage needs to deal with BankForks somehow still
storage_rotate_count,

View File

@ -6,7 +6,7 @@ extern crate solana;
use bincode::{deserialize, serialize};
use solana::blocktree::{create_new_tmp_ledger, tmp_copy_blocktree, Blocktree};
use solana::cluster_info::{ClusterInfo, Node};
use solana::cluster_info::{ClusterInfo, Node, FULLNODE_PORT_RANGE};
use solana::contact_info::ContactInfo;
use solana::fullnode::{Fullnode, FullnodeConfig};
use solana::gossip_service::discover;
@ -15,6 +15,7 @@ use solana::replicator::Replicator;
use solana::replicator::ReplicatorRequest;
use solana::storage_stage::STORAGE_ROTATE_TEST_COUNT;
use solana::streamer::blob_receiver;
use solana_client::thin_client::create_client;
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil};
@ -159,6 +160,7 @@ fn test_replicator_startup_leader_hang() {
{
let replicator_keypair = Arc::new(Keypair::new());
let storage_keypair = Arc::new(Keypair::new());
info!("starting replicator node");
let replicator_node = Node::new_localhost_with_pubkey(&replicator_keypair.pubkey());
@ -171,6 +173,7 @@ fn test_replicator_startup_leader_hang() {
replicator_node,
leader_info,
replicator_keypair,
storage_keypair,
Some(Duration::from_secs(3)),
);
@ -231,6 +234,7 @@ fn test_replicator_startup_ledger_hang() {
info!("starting replicator node");
let bad_keys = Arc::new(Keypair::new());
let storage_keypair = Arc::new(Keypair::new());
let mut replicator_node = Node::new_localhost_with_pubkey(&bad_keys.pubkey());
// Pass bad TVU sockets to prevent successful ledger download
@ -243,6 +247,7 @@ fn test_replicator_startup_ledger_hang() {
replicator_node,
leader_info,
bad_keys,
storage_keypair,
Some(Duration::from_secs(3)),
);
@ -254,3 +259,38 @@ fn test_replicator_startup_ledger_hang() {
let _ignored = remove_dir_all(&leader_ledger_path);
let _ignored = remove_dir_all(&replicator_ledger_path);
}
#[test]
fn test_account_setup() {
let num_nodes = 1;
let num_replicators = 1;
let mut fullnode_config = FullnodeConfig::default();
fullnode_config.storage_rotate_count = STORAGE_ROTATE_TEST_COUNT;
let cluster = LocalCluster::new_with_config_replicators(
&vec![100; num_nodes],
10_000,
&fullnode_config,
num_replicators,
DEFAULT_TICKS_PER_SLOT,
DEFAULT_SLOTS_PER_EPOCH,
);
let _ = discover(
&cluster.entry_point_info.gossip,
num_nodes + num_replicators,
)
.unwrap();
// now check that the cluster actually has accounts for the replicator.
let client = create_client(
cluster.entry_point_info.client_facing_addr(),
FULLNODE_PORT_RANGE,
);
cluster.replicator_infos.iter().for_each(|(_, value)| {
assert_eq!(
client
.poll_get_balance(&value.replicator_storage_id)
.unwrap(),
1
);
});
}

View File

@ -81,9 +81,16 @@ fn main() {
.unwrap();
let leader_info = ContactInfo::new_gossip_entry_point(&network_addr);
let mut replicator =
Replicator::new(ledger_path, node, leader_info, Arc::new(keypair), None).unwrap();
let storage_keypair = Arc::new(Keypair::new());
let mut replicator = Replicator::new(
ledger_path,
node,
leader_info,
Arc::new(keypair),
storage_keypair,
None,
)
.unwrap();
replicator.run();