Remove Thin Client from storage stage (#3976)

automerge
This commit is contained in:
Sagar Dhawan 2019-04-24 22:34:10 -07:00 committed by Grimes
parent e867ce0944
commit cf91ff8694
2 changed files with 83 additions and 104 deletions

View File

@ -2,27 +2,28 @@
// for storage mining. Replicators submit storage proofs, validator then bundles them // for storage mining. Replicators submit storage proofs, validator then bundles them
// to submit its proof for mining to be rewarded. // to submit its proof for mining to be rewarded.
use crate::bank_forks::BankForks;
use crate::blocktree::Blocktree; use crate::blocktree::Blocktree;
#[cfg(all(feature = "chacha", feature = "cuda"))] #[cfg(all(feature = "chacha", feature = "cuda"))]
use crate::chacha_cuda::chacha_cbc_encrypt_file_many_keys; use crate::chacha_cuda::chacha_cbc_encrypt_file_many_keys;
use crate::cluster_info::{ClusterInfo, FULLNODE_PORT_RANGE}; use crate::cluster_info::ClusterInfo;
use crate::entry::{Entry, EntryReceiver}; use crate::entry::{Entry, EntryReceiver};
use crate::result::{Error, Result}; use crate::result::{Error, Result};
use crate::service::Service; use crate::service::Service;
use bincode::deserialize; use bincode::deserialize;
use rand::{Rng, SeedableRng}; use rand::{Rng, SeedableRng};
use rand_chacha::ChaChaRng; use rand_chacha::ChaChaRng;
use solana_client::thin_client::{create_client_with_timeout, ThinClient};
use solana_sdk::client::{AsyncClient, SyncClient};
use solana_sdk::hash::Hash; use solana_sdk::hash::Hash;
use solana_sdk::instruction::Instruction;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; use solana_sdk::signature::{Keypair, KeypairUtil, Signature};
use solana_sdk::system_transaction; use solana_sdk::system_instruction;
use solana_sdk::transaction::Transaction; use solana_sdk::transaction::Transaction;
use solana_storage_api::storage_instruction::{self, StorageInstruction}; use solana_storage_api::storage_instruction::{self, StorageInstruction};
use std::collections::HashSet; use std::collections::HashSet;
use std::io; use std::io;
use std::mem::size_of; use std::mem::size_of;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, RecvTimeoutError, Sender}; use std::sync::mpsc::{channel, RecvTimeoutError, Sender};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
@ -67,7 +68,7 @@ pub const NUM_STORAGE_SAMPLES: usize = 4;
pub const ENTRIES_PER_SEGMENT: u64 = 16; pub const ENTRIES_PER_SEGMENT: u64 = 16;
const KEY_SIZE: usize = 64; const KEY_SIZE: usize = 64;
type TransactionSender = Sender<Transaction>; type InstructionSender = Sender<Instruction>;
pub fn get_segment_from_entry(entry_height: u64) -> u64 { pub fn get_segment_from_entry(entry_height: u64) -> u64 {
entry_height / ENTRIES_PER_SEGMENT entry_height / ENTRIES_PER_SEGMENT
@ -138,6 +139,7 @@ impl StorageState {
} }
impl StorageStage { impl StorageStage {
#[allow(clippy::too_many_arguments)]
pub fn new( pub fn new(
storage_state: &StorageState, storage_state: &StorageState,
storage_entry_receiver: EntryReceiver, storage_entry_receiver: EntryReceiver,
@ -146,6 +148,7 @@ impl StorageStage {
storage_keypair: &Arc<Keypair>, storage_keypair: &Arc<Keypair>,
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
entry_height: u64, entry_height: u64,
bank_forks: &Arc<RwLock<BankForks>>,
storage_rotate_count: u64, storage_rotate_count: u64,
cluster_info: &Arc<RwLock<ClusterInfo>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
) -> Self { ) -> Self {
@ -155,7 +158,7 @@ impl StorageStage {
let exit0 = exit.clone(); let exit0 = exit.clone();
let keypair0 = storage_keypair.clone(); let keypair0 = storage_keypair.clone();
let (tx_sender, tx_receiver) = channel(); let (instruction_sender, instruction_receiver) = channel();
let t_storage_mining_verifier = Builder::new() let t_storage_mining_verifier = Builder::new()
.name("solana-storage-mining-verify-stage".to_string()) .name("solana-storage-mining-verify-stage".to_string())
@ -174,7 +177,7 @@ impl StorageStage {
&mut entry_height, &mut entry_height,
&mut current_key, &mut current_key,
storage_rotate_count, storage_rotate_count,
&tx_sender, &instruction_sender,
) { ) {
match e { match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
@ -194,22 +197,26 @@ impl StorageStage {
let exit1 = exit.clone(); let exit1 = exit.clone();
let keypair1 = keypair.clone(); let keypair1 = keypair.clone();
let storage_keypair1 = storage_keypair.clone(); let storage_keypair1 = storage_keypair.clone();
let bank_forks1 = bank_forks.clone();
let t_storage_create_accounts = Builder::new() let t_storage_create_accounts = Builder::new()
.name("solana-storage-create-accounts".to_string()) .name("solana-storage-create-accounts".to_string())
.spawn(move || loop { .spawn(move || {
match tx_receiver.recv_timeout(Duration::from_secs(1)) { let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
Ok(mut tx) => { loop {
match instruction_receiver.recv_timeout(Duration::from_secs(1)) {
Ok(instruction) => {
if Self::send_transaction( if Self::send_transaction(
&bank_forks1,
&cluster_info0, &cluster_info0,
&mut tx, instruction,
&exit1,
&keypair1, &keypair1,
&storage_keypair1, &storage_keypair1,
Some(storage_keypair1.pubkey()), Some(storage_keypair1.pubkey()),
&transactions_socket,
) )
.is_ok() .is_err()
{ {
debug!("sent transaction: {:?}", tx); debug!("Failed to send storage transaction");
} }
} }
Err(e) => match e { Err(e) => match e {
@ -222,6 +229,7 @@ impl StorageStage {
break; break;
} }
sleep(Duration::from_millis(100)); sleep(Duration::from_millis(100));
}
}) })
.unwrap(); .unwrap();
@ -231,81 +239,43 @@ impl StorageStage {
} }
} }
fn check_signature(
client: &ThinClient,
signature: &Signature,
exit: &Arc<AtomicBool>,
) -> io::Result<()> {
for _ in 0..10 {
if client.check_signature(&signature) {
return Ok(());
}
if exit.load(Ordering::Relaxed) {
Err(io::Error::new(io::ErrorKind::Other, "exit signaled"))?;
}
sleep(Duration::from_millis(200));
}
Err(io::Error::new(io::ErrorKind::Other, "other failure"))
}
fn send_transaction( fn send_transaction(
bank_forks: &Arc<RwLock<BankForks>>,
cluster_info: &Arc<RwLock<ClusterInfo>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
transaction: &mut Transaction, instruction: Instruction,
exit: &Arc<AtomicBool>,
keypair: &Arc<Keypair>, keypair: &Arc<Keypair>,
storage_keypair: &Arc<Keypair>, storage_keypair: &Arc<Keypair>,
account_to_create: Option<Pubkey>, account_to_create: Option<Pubkey>,
transactions_socket: &UdpSocket,
) -> io::Result<()> { ) -> io::Result<()> {
let contact_info = cluster_info.read().unwrap().my_data(); let working_bank = bank_forks.read().unwrap().working_bank();
let client = create_client_with_timeout( let blockhash = working_bank.confirmed_last_blockhash();
contact_info.client_facing_addr(), let mut instructions = vec![];
FULLNODE_PORT_RANGE, let mut signing_keys = vec![];
Duration::from_secs(5),
);
let mut blockhash = None;
for _ in 0..10 {
if let Ok(new_blockhash) = client.get_recent_blockhash() {
blockhash = Some(new_blockhash);
break;
}
if exit.load(Ordering::Relaxed) {
Err(io::Error::new(io::ErrorKind::Other, "exit signaled"))?;
}
}
if let Some(blockhash) = blockhash {
if let Some(account) = account_to_create { if let Some(account) = account_to_create {
if client.get_account_data(&account).is_err() { if working_bank.get_account(&account).is_none() {
// TODO the account space needs to be well defined somewhere // TODO the account space needs to be well defined somewhere
let tx = system_transaction::create_account( let create_instruction = system_instruction::create_account(
keypair, &keypair.pubkey(),
&storage_keypair.pubkey(), &storage_keypair.pubkey(),
blockhash,
1, 1,
1024 * 4, 1024 * 4,
&solana_storage_api::id(), &solana_storage_api::id(),
0,
); );
let signature = client.async_send_transaction(tx).unwrap(); instructions.push(create_instruction);
Self::check_signature(&client, &signature, &exit)?; signing_keys.push(keypair.as_ref());
info!("storage account requested");
} }
} }
transaction.sign(&[storage_keypair.as_ref()], blockhash); instructions.push(instruction);
signing_keys.push(storage_keypair.as_ref());
if exit.load(Ordering::Relaxed) { let mut transaction = Transaction::new_unsigned_instructions(instructions);
Err(io::Error::new(io::ErrorKind::Other, "exit signaled"))?; transaction.sign(&signing_keys, blockhash);
} transactions_socket.send_to(
&bincode::serialize(&transaction).unwrap(),
if let Ok(signature) = client.async_send_transaction(transaction.clone()) { cluster_info.read().unwrap().my_data().tpu,
Self::check_signature(&client, &signature, &exit)?; )?;
return Ok(()); Ok(())
}
}
Err(io::Error::new(io::ErrorKind::Other, "other failure"))
} }
fn process_entry_crossing( fn process_entry_crossing(
@ -314,7 +284,7 @@ impl StorageStage {
_blocktree: &Arc<Blocktree>, _blocktree: &Arc<Blocktree>,
entry_id: Hash, entry_id: Hash,
entry_height: u64, entry_height: u64,
tx_sender: &TransactionSender, instruction_sender: &InstructionSender,
) -> Result<()> { ) -> Result<()> {
let mut seed = [0u8; 32]; let mut seed = [0u8; 32];
let signature = keypair.sign(&entry_id.as_ref()); let signature = keypair.sign(&entry_id.as_ref());
@ -324,8 +294,7 @@ impl StorageStage {
entry_id, entry_id,
entry_height, entry_height,
); );
let tx = Transaction::new_unsigned_instructions(vec![ix]); instruction_sender.send(ix)?;
tx_sender.send(tx)?;
seed.copy_from_slice(&signature.to_bytes()[..32]); seed.copy_from_slice(&signature.to_bytes()[..32]);
@ -394,7 +363,7 @@ impl StorageStage {
entry_height: &mut u64, entry_height: &mut u64,
current_key_idx: &mut usize, current_key_idx: &mut usize,
storage_rotate_count: u64, storage_rotate_count: u64,
tx_sender: &TransactionSender, instruction_sender: &InstructionSender,
) -> Result<()> { ) -> Result<()> {
let timeout = Duration::new(1, 0); let timeout = Duration::new(1, 0);
let entries: Vec<Entry> = entry_receiver.recv_timeout(timeout)?; let entries: Vec<Entry> = entry_receiver.recv_timeout(timeout)?;
@ -465,7 +434,7 @@ impl StorageStage {
&blocktree, &blocktree,
entry.hash, entry.hash,
*entry_height, *entry_height,
tx_sender, instruction_sender,
)?; )?;
} }
*entry_height += 1; *entry_height += 1;
@ -493,6 +462,7 @@ mod tests {
use crate::entry::{make_tiny_test_entries, Entry}; use crate::entry::{make_tiny_test_entries, Entry};
use crate::service::Service; use crate::service::Service;
use rayon::prelude::*; use rayon::prelude::*;
use solana_runtime::bank::Bank;
use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::hash::{Hash, Hasher}; use solana_sdk::hash::{Hash, Hasher};
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
@ -512,7 +482,9 @@ mod tests {
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let cluster_info = test_cluster_info(&keypair.pubkey()); let cluster_info = test_cluster_info(&keypair.pubkey());
let (genesis_block, _mint_keypair) = GenesisBlock::new(1000);
let bank = Arc::new(Bank::new(&genesis_block));
let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks(&[bank])));
let (_storage_entry_sender, storage_entry_receiver) = channel(); let (_storage_entry_sender, storage_entry_receiver) = channel();
let storage_state = StorageState::new(); let storage_state = StorageState::new();
let storage_stage = StorageStage::new( let storage_stage = StorageStage::new(
@ -523,6 +495,7 @@ mod tests {
&storage_keypair, &storage_keypair,
&exit.clone(), &exit.clone(),
0, 0,
&bank_forks,
STORAGE_ROTATE_TEST_COUNT, STORAGE_ROTATE_TEST_COUNT,
&cluster_info, &cluster_info,
); );
@ -549,6 +522,8 @@ mod tests {
let entries = make_tiny_test_entries(64); let entries = make_tiny_test_entries(64);
let blocktree = Blocktree::open(&ledger_path).unwrap(); let blocktree = Blocktree::open(&ledger_path).unwrap();
let bank = Arc::new(Bank::new(&genesis_block));
let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks(&[bank])));
blocktree blocktree
.write_entries(1, 0, 0, ticks_per_slot, &entries) .write_entries(1, 0, 0, ticks_per_slot, &entries)
.unwrap(); .unwrap();
@ -565,6 +540,7 @@ mod tests {
&storage_keypair, &storage_keypair,
&exit.clone(), &exit.clone(),
0, 0,
&bank_forks,
STORAGE_ROTATE_TEST_COUNT, STORAGE_ROTATE_TEST_COUNT,
&cluster_info, &cluster_info,
); );
@ -618,7 +594,8 @@ mod tests {
blocktree blocktree
.write_entries(1, 0, 0, ticks_per_slot, &entries) .write_entries(1, 0, 0, ticks_per_slot, &entries)
.unwrap(); .unwrap();
let bank = Arc::new(Bank::new(&genesis_block));
let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks(&[bank])));
let cluster_info = test_cluster_info(&keypair.pubkey()); let cluster_info = test_cluster_info(&keypair.pubkey());
let (storage_entry_sender, storage_entry_receiver) = channel(); let (storage_entry_sender, storage_entry_receiver) = channel();
@ -631,6 +608,7 @@ mod tests {
&storage_keypair, &storage_keypair,
&exit.clone(), &exit.clone(),
0, 0,
&bank_forks,
STORAGE_ROTATE_TEST_COUNT, STORAGE_ROTATE_TEST_COUNT,
&cluster_info, &cluster_info,
); );

View File

@ -151,6 +151,7 @@ impl Tvu {
&storage_keypair, &storage_keypair,
&exit, &exit,
bank_forks_info[0].entry_height, // TODO: StorageStage needs to deal with BankForks somehow still bank_forks_info[0].entry_height, // TODO: StorageStage needs to deal with BankForks somehow still
&bank_forks,
storage_rotate_count, storage_rotate_count,
&cluster_info, &cluster_info,
); );