diff --git a/core/src/local_cluster.rs b/core/src/local_cluster.rs index cffa828e4..ca43782c3 100644 --- a/core/src/local_cluster.rs +++ b/core/src/local_cluster.rs @@ -36,6 +36,20 @@ impl FullnodeInfo { } } +pub struct ReplicatorInfo { + pub replicator_storage_id: Pubkey, + pub ledger_path: String, +} + +impl ReplicatorInfo { + fn new(storage_id: Pubkey, ledger_path: String) -> Self { + Self { + replicator_storage_id: storage_id, + ledger_path, + } + } +} + pub struct LocalCluster { /// Keypair with funding to particpiate in the network pub funding_keypair: Keypair, @@ -47,7 +61,7 @@ pub struct LocalCluster { genesis_ledger_path: String, genesis_block: GenesisBlock, replicators: Vec, - pub replicator_ledger_paths: Vec, + pub replicator_infos: HashMap, } impl LocalCluster { @@ -131,10 +145,10 @@ impl LocalCluster { entry_point_info: leader_contact_info, fullnodes, replicators: vec![], - replicator_ledger_paths: vec![], genesis_ledger_path, genesis_block, fullnode_infos, + replicator_infos: HashMap::new(), fullnode_config: fullnode_config.clone(), }; @@ -148,6 +162,12 @@ impl LocalCluster { cluster.add_replicator(); } + discover( + &cluster.entry_point_info.gossip, + node_stakes.len() + num_replicators, + ) + .unwrap(); + cluster } @@ -163,8 +183,8 @@ impl LocalCluster { node.join().unwrap(); } - while let Some(node) = self.replicators.pop() { - node.close(); + while let Some(replicator) = self.replicators.pop() { + replicator.close(); } } @@ -213,6 +233,9 @@ impl LocalCluster { fn add_replicator(&mut self) { let replicator_keypair = Arc::new(Keypair::new()); + let replicator_id = replicator_keypair.pubkey(); + let storage_keypair = Arc::new(Keypair::new()); + let storage_id = storage_keypair.pubkey(); let client = create_client( self.entry_point_info.client_facing_addr(), FULLNODE_PORT_RANGE, @@ -224,7 +247,7 @@ impl LocalCluster { &replicator_keypair.pubkey(), 1, ); - let replicator_node = Node::new_localhost_replicator(&replicator_keypair.pubkey()); + let replicator_node = Node::new_localhost_replicator(&replicator_id); let (replicator_ledger_path, _blockhash) = create_new_tmp_ledger!(&self.genesis_block); let replicator = Replicator::new( @@ -232,12 +255,16 @@ impl LocalCluster { replicator_node, self.entry_point_info.clone(), replicator_keypair, + storage_keypair, None, ) .unwrap(); - self.replicator_ledger_paths.push(replicator_ledger_path); self.replicators.push(replicator); + self.replicator_infos.insert( + replicator_id, + ReplicatorInfo::new(storage_id, replicator_ledger_path), + ); } fn close(&mut self) { @@ -246,7 +273,7 @@ impl LocalCluster { .fullnode_infos .values() .map(|f| &f.ledger_path) - .chain(self.replicator_ledger_paths.iter()) + .chain(self.replicator_infos.values().map(|info| &info.ledger_path)) { remove_dir_all(&ledger_path) .unwrap_or_else(|_| panic!("Unable to remove {}", ledger_path)); diff --git a/core/src/replicator.rs b/core/src/replicator.rs index c283d9fd8..ad653624c 100644 --- a/core/src/replicator.rs +++ b/core/src/replicator.rs @@ -22,6 +22,7 @@ use solana_client::thin_client::{create_client, ThinClient}; use solana_drone::drone::{request_airdrop_transaction, DRONE_PORT}; use solana_sdk::hash::{Hash, Hasher}; use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; +use solana_sdk::system_transaction::SystemTransaction; use solana_sdk::transaction::Transaction; use solana_storage_api::storage_instruction::StorageInstruction; use std::fs::File; @@ -58,6 +59,7 @@ pub struct Replicator { slot: u64, ledger_path: String, keypair: Arc, + storage_keypair: Arc, signature: ring::signature::Signature, cluster_entrypoint: ContactInfo, ledger_data_file_encrypted: PathBuf, @@ -183,6 +185,7 @@ impl Replicator { node: Node, cluster_entrypoint: ContactInfo, keypair: Arc, + storage_keypair: Arc, _timeout: Option, ) -> Result { let exit = Arc::new(AtomicBool::new(false)); @@ -218,7 +221,7 @@ impl Replicator { Self::poll_for_blockhash_and_entry_height(&cluster_info)?; let node_info = node.info.clone(); - let signature = keypair.sign(storage_blockhash.as_ref()); + let signature = storage_keypair.sign(storage_blockhash.as_ref()); let slot = get_entry_heights_from_blockhash(&signature, storage_entry_height); info!("replicating slot: {}", slot); @@ -260,10 +263,27 @@ impl Replicator { let exit3 = exit.clone(); let blocktree1 = blocktree.clone(); - let t_replicate = spawn(move || loop { - Self::wait_for_ledger_download(slot, &blocktree1, &exit3, &node_info, &cluster_info); - if exit3.load(Ordering::Relaxed) { - break; + let keypair1 = keypair.clone(); + let storage_keypair1 = storage_keypair.clone(); + let cluster_entrypoint1 = cluster_entrypoint.clone(); + let t_replicate = spawn(move || { + let client = create_client( + cluster_entrypoint1.client_facing_addr(), + FULLNODE_PORT_RANGE, + ); + Self::setup_mining_account(&client, &keypair1, &storage_keypair1, &cluster_entrypoint1); + + loop { + Self::wait_for_ledger_download( + slot, + &blocktree1, + &exit3, + &node_info, + &cluster_info, + ); + if exit3.load(Ordering::Relaxed) { + break; + } } }); thread_handles.push(t_replicate); @@ -277,6 +297,7 @@ impl Replicator { slot, ledger_path: ledger_path.to_string(), keypair: keypair.clone(), + storage_keypair, signature, cluster_entrypoint, ledger_data_file_encrypted: PathBuf::default(), @@ -397,6 +418,36 @@ impl Replicator { Ok(()) } + fn setup_mining_account( + client: &ThinClient, + keypair: &Keypair, + storage_keypair: &Keypair, + entrypoint: &ContactInfo, + ) { + // make sure replicator has some balance + Self::get_airdrop_lamports(client, keypair, entrypoint); + + // check if the account exists + if client + .wait_for_balance(&storage_keypair.pubkey(), None) + .is_none() + { + let blockhash = client.get_recent_blockhash().expect("blockhash"); + //TODO the account space needs to be well defined somewhere + let tx = SystemTransaction::new_program_account( + keypair, + &storage_keypair.pubkey(), + blockhash, + 1, + 1024 * 4, + &solana_storage_api::id(), + 0, + ); + let signature = client.transfer_signed(&tx).unwrap(); + client.poll_for_signature(&signature).unwrap(); + } + } + fn submit_mining_proof(&self) { let client = create_client( self.cluster_entrypoint.client_facing_addr(), @@ -405,14 +456,14 @@ impl Replicator { Self::get_airdrop_lamports(&client, &self.keypair, &self.cluster_entrypoint); let ix = StorageInstruction::new_mining_proof( - &self.keypair.pubkey(), + &self.storage_keypair.pubkey(), self.hash, self.slot, Signature::new(self.signature.as_ref()), ); let mut tx = Transaction::new_unsigned_instructions(vec![ix]); client - .retry_transfer(&self.keypair, &mut tx, 10) + .retry_transfer(&self.storage_keypair, &mut tx, 10) .expect("transfer didn't work!"); } diff --git a/core/src/storage_stage.rs b/core/src/storage_stage.rs index b27720867..791a44412 100644 --- a/core/src/storage_stage.rs +++ b/core/src/storage_stage.rs @@ -12,10 +12,11 @@ use crate::service::Service; use bincode::deserialize; use rand::{Rng, SeedableRng}; use rand_chacha::ChaChaRng; -use solana_client::thin_client::create_client_with_timeout; +use solana_client::thin_client::{create_client_with_timeout, ThinClient}; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; +use solana_sdk::system_transaction::SystemTransaction; use solana_sdk::transaction::Transaction; use solana_storage_api::storage_instruction::StorageInstruction; use std::collections::HashSet; @@ -141,6 +142,7 @@ impl StorageStage { storage_entry_receiver: EntryReceiver, blocktree: Option>, keypair: &Arc, + storage_keypair: &Arc, exit: &Arc, entry_height: u64, storage_rotate_count: u64, @@ -150,7 +152,7 @@ impl StorageStage { storage_state.state.write().unwrap().entry_height = entry_height; let storage_state_inner = storage_state.state.clone(); let exit0 = exit.clone(); - let keypair0 = keypair.clone(); + let keypair0 = storage_keypair.clone(); let (tx_sender, tx_receiver) = channel(); @@ -190,13 +192,21 @@ impl StorageStage { let cluster_info0 = cluster_info.clone(); let exit1 = exit.clone(); let keypair1 = keypair.clone(); + let storage_keypair1 = storage_keypair.clone(); let t_storage_create_accounts = Builder::new() .name("solana-storage-create-accounts".to_string()) .spawn(move || loop { match tx_receiver.recv_timeout(Duration::from_secs(1)) { Ok(mut tx) => { - if Self::send_transaction(&cluster_info0, &mut tx, &exit1, &keypair1, None) - .is_ok() + if Self::send_transaction( + &cluster_info0, + &mut tx, + &exit1, + &keypair1, + &storage_keypair1, + Some(storage_keypair1.pubkey()), + ) + .is_ok() { debug!("sent transaction: {:?}", tx); } @@ -220,11 +230,31 @@ impl StorageStage { } } + fn check_signature( + client: &ThinClient, + signature: &Signature, + exit: &Arc, + ) -> io::Result<()> { + for _ in 0..10 { + if client.check_signature(&signature) { + return Ok(()); + } + + if exit.load(Ordering::Relaxed) { + Err(io::Error::new(io::ErrorKind::Other, "exit signaled"))?; + } + + sleep(Duration::from_millis(200)); + } + Err(io::Error::new(io::ErrorKind::Other, "other failure")) + } + fn send_transaction( cluster_info: &Arc>, transaction: &mut Transaction, exit: &Arc, keypair: &Arc, + storage_keypair: &Arc, account_to_create: Option, ) -> io::Result<()> { let contact_info = cluster_info.read().unwrap().my_data(); @@ -234,12 +264,6 @@ impl StorageStage { Duration::from_secs(5), ); - if let Some(account) = account_to_create { - if client.get_account_data(&account).is_ok() { - return Ok(()); - } - } - let mut blockhash = None; for _ in 0..10 { if let Ok(new_blockhash) = client.get_recent_blockhash() { @@ -251,26 +275,32 @@ impl StorageStage { Err(io::Error::new(io::ErrorKind::Other, "exit signaled"))?; } } - if let Some(blockhash) = blockhash { - transaction.sign(&[keypair.as_ref()], blockhash); + if let Some(account) = account_to_create { + if client.get_account_data(&account).is_err() { + // TODO the account space needs to be well defined somewhere + let tx = SystemTransaction::new_program_account( + keypair, + &storage_keypair.pubkey(), + blockhash, + 1, + 1024 * 4, + &solana_storage_api::id(), + 0, + ); + let signature = client.transfer_signed(&tx).unwrap(); + Self::check_signature(&client, &signature, &exit)?; + } + } + transaction.sign(&[storage_keypair.as_ref()], blockhash); if exit.load(Ordering::Relaxed) { Err(io::Error::new(io::ErrorKind::Other, "exit signaled"))?; } if let Ok(signature) = client.transfer_signed(&transaction) { - for _ in 0..10 { - if client.check_signature(&signature) { - return Ok(()); - } - - if exit.load(Ordering::Relaxed) { - Err(io::Error::new(io::ErrorKind::Other, "exit signaled"))?; - } - - sleep(Duration::from_millis(200)); - } + Self::check_signature(&client, &signature, &exit)?; + return Ok(()); } } @@ -475,6 +505,7 @@ mod tests { #[test] fn test_storage_stage_none_ledger() { let keypair = Arc::new(Keypair::new()); + let storage_keypair = Arc::new(Keypair::new()); let exit = Arc::new(AtomicBool::new(false)); let cluster_info = test_cluster_info(&keypair.pubkey()); @@ -486,6 +517,7 @@ mod tests { storage_entry_receiver, None, &keypair, + &storage_keypair, &exit.clone(), 0, STORAGE_ROTATE_TEST_COUNT, @@ -505,6 +537,7 @@ mod tests { fn test_storage_stage_process_entries() { solana_logger::setup(); let keypair = Arc::new(Keypair::new()); + let storage_keypair = Arc::new(Keypair::new()); let exit = Arc::new(AtomicBool::new(false)); let (genesis_block, _mint_keypair) = GenesisBlock::new(1000); @@ -526,6 +559,7 @@ mod tests { storage_entry_receiver, Some(Arc::new(blocktree)), &keypair, + &storage_keypair, &exit.clone(), 0, STORAGE_ROTATE_TEST_COUNT, @@ -569,6 +603,7 @@ mod tests { fn test_storage_stage_process_proof_entries() { solana_logger::setup(); let keypair = Arc::new(Keypair::new()); + let storage_keypair = Arc::new(Keypair::new()); let exit = Arc::new(AtomicBool::new(false)); let (genesis_block, _mint_keypair) = GenesisBlock::new(1000); @@ -590,6 +625,7 @@ mod tests { storage_entry_receiver, Some(Arc::new(blocktree)), &keypair, + &storage_keypair, &exit.clone(), 0, STORAGE_ROTATE_TEST_COUNT, diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 599205c12..6ccb3d8f4 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -135,11 +135,13 @@ impl Tvu { None }; + let storage_keypair = Arc::new(Keypair::new()); let storage_stage = StorageStage::new( storage_state, storage_entry_receiver, Some(blocktree), &keypair, + &storage_keypair, &exit, bank_forks_info[0].entry_height, // TODO: StorageStage needs to deal with BankForks somehow still storage_rotate_count, diff --git a/core/tests/replicator.rs b/core/tests/replicator.rs index f09436a2e..534112f05 100644 --- a/core/tests/replicator.rs +++ b/core/tests/replicator.rs @@ -6,7 +6,7 @@ extern crate solana; use bincode::{deserialize, serialize}; use solana::blocktree::{create_new_tmp_ledger, tmp_copy_blocktree, Blocktree}; -use solana::cluster_info::{ClusterInfo, Node}; +use solana::cluster_info::{ClusterInfo, Node, FULLNODE_PORT_RANGE}; use solana::contact_info::ContactInfo; use solana::fullnode::{Fullnode, FullnodeConfig}; use solana::gossip_service::discover; @@ -15,6 +15,7 @@ use solana::replicator::Replicator; use solana::replicator::ReplicatorRequest; use solana::storage_stage::STORAGE_ROTATE_TEST_COUNT; use solana::streamer::blob_receiver; +use solana_client::thin_client::create_client; use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, KeypairUtil}; @@ -159,6 +160,7 @@ fn test_replicator_startup_leader_hang() { { let replicator_keypair = Arc::new(Keypair::new()); + let storage_keypair = Arc::new(Keypair::new()); info!("starting replicator node"); let replicator_node = Node::new_localhost_with_pubkey(&replicator_keypair.pubkey()); @@ -171,6 +173,7 @@ fn test_replicator_startup_leader_hang() { replicator_node, leader_info, replicator_keypair, + storage_keypair, Some(Duration::from_secs(3)), ); @@ -231,6 +234,7 @@ fn test_replicator_startup_ledger_hang() { info!("starting replicator node"); let bad_keys = Arc::new(Keypair::new()); + let storage_keypair = Arc::new(Keypair::new()); let mut replicator_node = Node::new_localhost_with_pubkey(&bad_keys.pubkey()); // Pass bad TVU sockets to prevent successful ledger download @@ -243,6 +247,7 @@ fn test_replicator_startup_ledger_hang() { replicator_node, leader_info, bad_keys, + storage_keypair, Some(Duration::from_secs(3)), ); @@ -254,3 +259,38 @@ fn test_replicator_startup_ledger_hang() { let _ignored = remove_dir_all(&leader_ledger_path); let _ignored = remove_dir_all(&replicator_ledger_path); } + +#[test] +fn test_account_setup() { + let num_nodes = 1; + let num_replicators = 1; + let mut fullnode_config = FullnodeConfig::default(); + fullnode_config.storage_rotate_count = STORAGE_ROTATE_TEST_COUNT; + let cluster = LocalCluster::new_with_config_replicators( + &vec![100; num_nodes], + 10_000, + &fullnode_config, + num_replicators, + DEFAULT_TICKS_PER_SLOT, + DEFAULT_SLOTS_PER_EPOCH, + ); + + let _ = discover( + &cluster.entry_point_info.gossip, + num_nodes + num_replicators, + ) + .unwrap(); + // now check that the cluster actually has accounts for the replicator. + let client = create_client( + cluster.entry_point_info.client_facing_addr(), + FULLNODE_PORT_RANGE, + ); + cluster.replicator_infos.iter().for_each(|(_, value)| { + assert_eq!( + client + .poll_get_balance(&value.replicator_storage_id) + .unwrap(), + 1 + ); + }); +} diff --git a/replicator/src/main.rs b/replicator/src/main.rs index ca1416b83..eaaa758ae 100644 --- a/replicator/src/main.rs +++ b/replicator/src/main.rs @@ -81,9 +81,16 @@ fn main() { .unwrap(); let leader_info = ContactInfo::new_gossip_entry_point(&network_addr); - - let mut replicator = - Replicator::new(ledger_path, node, leader_info, Arc::new(keypair), None).unwrap(); + let storage_keypair = Arc::new(Keypair::new()); + let mut replicator = Replicator::new( + ledger_path, + node, + leader_info, + Arc::new(keypair), + storage_keypair, + None, + ) + .unwrap(); replicator.run();