From ef7022d638b077b715933148cbc52837f77b54cc Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Mon, 8 Jul 2019 10:17:25 -0700 Subject: [PATCH] Refactor replicators to not block on startup (#4932) * Refactor replicators to not block on startup * Ignore setup failure --- core/src/replicator.rs | 476 +++++++++++++++++++++---------------- core/src/window_service.rs | 6 +- replicator/src/main.rs | 5 +- 3 files changed, 275 insertions(+), 212 deletions(-) diff --git a/core/src/replicator.rs b/core/src/replicator.rs index 4a1b0be69..d97701a1f 100644 --- a/core/src/replicator.rs +++ b/core/src/replicator.rs @@ -9,12 +9,15 @@ use crate::recycler::Recycler; use crate::repair_service::{RepairService, RepairSlotRange, RepairStrategy}; use crate::result::{Error, Result}; use crate::service::Service; -use crate::streamer::{blob_receiver, receiver, responder}; +use crate::storage_stage::NUM_STORAGE_SAMPLES; +use crate::streamer::{blob_receiver, receiver, responder, BlobReceiver}; use crate::window_service::WindowService; use crate::{repair_service, window_service}; use bincode::deserialize; use rand::thread_rng; use rand::Rng; +use rand::SeedableRng; +use rand_chacha::ChaChaRng; use solana_client::rpc_client::RpcClient; use solana_client::rpc_request::RpcRequest; use solana_client::thin_client::ThinClient; @@ -37,7 +40,7 @@ use std::net::{SocketAddr, UdpSocket}; use std::path::{Path, PathBuf}; use std::result; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::channel; +use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::{Arc, RwLock}; use std::thread::{sleep, spawn, JoinHandle}; use std::time::Duration; @@ -48,23 +51,21 @@ pub enum ReplicatorRequest { } pub struct Replicator { - gossip_service: GossipService, - fetch_stage: BlobFetchStage, - window_service: WindowService, thread_handles: Vec>, exit: Arc, +} + +// Shared Replicator Meta struct used internally +#[derive(Default)] +struct ReplicatorMeta { slot: u64, ledger_path: String, - keypair: Arc, - storage_keypair: Arc, - blockhash: Hash, - signature: ed25519_dalek::Signature, - cluster_info: Arc>, + signature: Signature, ledger_data_file_encrypted: PathBuf, sampling_offsets: Vec, + blockhash: Hash, sha_state: Hash, num_chacha_blocks: usize, - blocktree: Arc, } pub(crate) fn sample_file(in_path: &Path, sample_offsets: &[u64]) -> io::Result { @@ -116,7 +117,7 @@ fn get_slot_from_blockhash(signature: &ed25519_dalek::Signature, storage_slot: u fn create_request_processor( socket: UdpSocket, exit: &Arc, - slot: u64, + slot_receiver: Receiver, ) -> Vec> { let mut thread_handles = vec![]; let (s_reader, r_reader) = channel(); @@ -136,32 +137,50 @@ fn create_request_processor( thread_handles.push(t_responder); let exit = exit.clone(); - let t_processor = spawn(move || loop { - let packets = r_reader.recv_timeout(Duration::from_secs(1)); - if let Ok(packets) = packets { - for packet in &packets.packets { - let req: result::Result> = - deserialize(&packet.data[..packet.meta.size]); - match req { - Ok(ReplicatorRequest::GetSlotHeight(from)) => { - if let Ok(blob) = to_shared_blob(slot, from) { - let _ = s_responder.send(vec![blob]); + let t_processor = spawn(move || { + let slot = poll_for_slot(slot_receiver, &exit); + + loop { + if exit.load(Ordering::Relaxed) { + break; + } + + let packets = r_reader.recv_timeout(Duration::from_secs(1)); + + if let Ok(packets) = packets { + for packet in &packets.packets { + let req: result::Result> = + deserialize(&packet.data[..packet.meta.size]); + match req { + Ok(ReplicatorRequest::GetSlotHeight(from)) => { + if let Ok(blob) = to_shared_blob(slot, from) { + let _ = s_responder.send(vec![blob]); + } + } + Err(e) => { + info!("invalid request: {:?}", e); } - } - Err(e) => { - info!("invalid request: {:?}", e); } } } } - if exit.load(Ordering::Relaxed) { - break; - } }); thread_handles.push(t_processor); thread_handles } +fn poll_for_slot(receiver: Receiver, exit: &Arc) -> u64 { + loop { + let slot = receiver.recv_timeout(Duration::from_secs(1)); + if let Ok(slot) = slot { + return slot; + } + if exit.load(Ordering::Relaxed) { + return 0; + } + } +} + impl Replicator { /// Returns a Result that contains a replicator on success /// @@ -215,24 +234,12 @@ impl Replicator { }; let client = crate::gossip_service::get_client(&nodes); - let (storage_blockhash, storage_slot) = - match Self::poll_for_blockhash_and_slot(&cluster_info, &Hash::default()) { - Ok(blockhash_and_slot) => blockhash_and_slot, - Err(e) => { - //shutdown services before exiting - exit.store(true, Ordering::Relaxed); - gossip_service.join()?; - return Err(e); - } - }; - - let signature = storage_keypair.sign(storage_blockhash.as_ref()); - let slot = get_slot_from_blockhash(&signature, storage_slot); - info!("replicating slot: {}", slot); - - let mut repair_slot_range = RepairSlotRange::default(); - repair_slot_range.end = slot + SLOTS_PER_SEGMENT; - repair_slot_range.start = slot; + if let Err(e) = Self::setup_mining_account(&client, &keypair, &storage_keypair) { + //shutdown services before exiting + exit.store(true, Ordering::Relaxed); + gossip_service.join()?; + return Err(e); + }; let repair_socket = Arc::new(node.sockets.repair); let mut blob_sockets: Vec> = @@ -240,8 +247,175 @@ impl Replicator { blob_sockets.push(repair_socket.clone()); let (blob_fetch_sender, blob_fetch_receiver) = channel(); let fetch_stage = BlobFetchStage::new_multi_socket(blob_sockets, &blob_fetch_sender, &exit); + let (slot_sender, slot_receiver) = channel(); + let request_processor = + create_request_processor(node.sockets.storage.unwrap(), &exit, slot_receiver); - let (retransmit_sender, retransmit_receiver) = channel(); + let t_replicator = { + let exit = exit.clone(); + let node_info = node.info.clone(); + let mut meta = ReplicatorMeta { + ledger_path: ledger_path.to_string(), + ..ReplicatorMeta::default() + }; + spawn(move || { + // setup replicator + let window_service = Self::setup( + &mut meta, + cluster_info.clone(), + &blocktree, + &exit, + &node_info, + &storage_keypair, + repair_socket, + blob_fetch_receiver, + slot_sender, + ) + .ok(); + info!("setup complete"); + // run replicator + Self::run( + &mut meta, + &blocktree, + cluster_info, + &keypair, + &storage_keypair, + &exit, + ); + // wait until exit + request_processor + .into_iter() + .for_each(|t| t.join().unwrap()); + fetch_stage.join().unwrap(); + gossip_service.join().unwrap(); + if let Some(window) = window_service { + window.join().unwrap() + } + }) + }; + + Ok(Self { + thread_handles: vec![t_replicator], + exit, + }) + } + + fn run( + meta: &mut ReplicatorMeta, + blocktree: &Arc, + cluster_info: Arc>, + replicator_keypair: &Arc, + storage_keypair: &Arc, + exit: &Arc, + ) { + // encrypt segment + Self::encrypt_ledger(meta, blocktree).expect("ledger encrypt not successful"); + let enc_file_path = meta.ledger_data_file_encrypted.clone(); + // do replicate + loop { + if exit.load(Ordering::Relaxed) { + break; + } + + // TODO check if more segments are available - based on space constraints + Self::create_sampling_offsets(meta); + let sampling_offsets = &meta.sampling_offsets; + meta.sha_state = + match Self::sample_file_to_create_mining_hash(&enc_file_path, sampling_offsets) { + Ok(hash) => hash, + Err(err) => { + warn!("Error sampling file, exiting: {:?}", err); + break; + } + }; + + Self::submit_mining_proof(meta, &cluster_info, replicator_keypair, storage_keypair); + + // TODO make this a lot more frequent by picking a "new" blockhash instead of picking a storage blockhash + // prep the next proof + let (storage_blockhash, _) = + match Self::poll_for_blockhash_and_slot(&cluster_info, &meta.blockhash) { + Ok(blockhash_and_slot) => blockhash_and_slot, + Err(e) => { + warn!( + "Error couldn't get a newer blockhash than {:?}. {:?}", + meta.blockhash, e + ); + break; + } + }; + meta.blockhash = storage_blockhash; + Self::redeem_rewards(&cluster_info, replicator_keypair, storage_keypair); + } + } + + fn redeem_rewards( + cluster_info: &Arc>, + replicator_keypair: &Arc, + storage_keypair: &Arc, + ) { + let nodes = cluster_info.read().unwrap().tvu_peers(); + let client = crate::gossip_service::get_client(&nodes); + + if let Ok(Some(account)) = client.get_account(&storage_keypair.pubkey()) { + if let Ok(StorageContract::ReplicatorStorage { validations, .. }) = account.state() { + if !validations.is_empty() { + let ix = storage_instruction::claim_reward( + &replicator_keypair.pubkey(), + &storage_keypair.pubkey(), + ); + let message = + Message::new_with_payer(vec![ix], Some(&replicator_keypair.pubkey())); + if let Err(e) = client.send_message(&[&replicator_keypair], message) { + error!("unable to redeem reward, tx failed: {:?}", e); + } else { + info!( + "collected mining rewards: Account balance {:?}", + client.get_balance(&replicator_keypair.pubkey()) + ); + } + } + } + } else { + info!("Redeem mining reward: No account data found"); + } + } + + // Find a segment to replicate and download it. + fn setup( + meta: &mut ReplicatorMeta, + cluster_info: Arc>, + blocktree: &Arc, + exit: &Arc, + node_info: &ContactInfo, + storage_keypair: &Arc, + repair_socket: Arc, + blob_fetch_receiver: BlobReceiver, + slot_sender: Sender, + ) -> Result<(WindowService)> { + let (storage_blockhash, storage_slot) = + match Self::poll_for_blockhash_and_slot(&cluster_info, &Hash::default()) { + Ok(blockhash_and_slot) => blockhash_and_slot, + Err(e) => { + //shutdown services before exiting + exit.store(true, Ordering::Relaxed); + return Err(e); + } + }; + + let signature = storage_keypair.sign(storage_blockhash.as_ref()); + let slot = get_slot_from_blockhash(&signature, storage_slot); + info!("replicating slot: {}", slot); + slot_sender.send(slot)?; + meta.slot = slot; + meta.signature = Signature::new(&signature.to_bytes()); + meta.blockhash = storage_blockhash; + + let mut repair_slot_range = RepairSlotRange::default(); + repair_slot_range.end = slot + SLOTS_PER_SEGMENT; + repair_slot_range.start = slot; + + let (retransmit_sender, _) = channel(); let window_service = WindowService::new( blocktree.clone(), @@ -253,122 +427,12 @@ impl Replicator { RepairStrategy::RepairRange(repair_slot_range), |_, _, _| true, ); - - if let Err(e) = Self::setup_mining_account(&client, &keypair, &storage_keypair) { - //shutdown services before exiting - exit.store(true, Ordering::Relaxed); - gossip_service.join()?; - window_service.join()?; - fetch_stage.join()?; - return Err(e); - }; - let mut thread_handles = - create_request_processor(node.sockets.storage.unwrap(), &exit, slot); - - // receive blobs from retransmit and drop them. - let t_retransmit = { - let exit = exit.clone(); - spawn(move || loop { - let _ = retransmit_receiver.recv_timeout(Duration::from_secs(1)); - if exit.load(Ordering::Relaxed) { - break; - } - }) - }; - thread_handles.push(t_retransmit); - - let t_replicate = { - let exit = exit.clone(); - let blocktree = blocktree.clone(); - let cluster_info = cluster_info.clone(); - let node_info = node.info.clone(); - spawn(move || { - Self::wait_for_ledger_download(slot, &blocktree, &exit, &node_info, cluster_info) - }) - }; - //always push this last - thread_handles.push(t_replicate); - - Ok(Self { - gossip_service, - fetch_stage, - window_service, - thread_handles, - exit, - slot, - ledger_path: ledger_path.to_string(), - keypair, - storage_keypair, - blockhash: storage_blockhash, - signature, - cluster_info, - ledger_data_file_encrypted: PathBuf::default(), - sampling_offsets: vec![], - sha_state: Hash::default(), - num_chacha_blocks: 0, - blocktree, - }) - } - - pub fn run(&mut self) { info!("waiting for ledger download"); - self.thread_handles.pop().unwrap().join().unwrap(); - self.encrypt_ledger() - .expect("ledger encrypt not successful"); - loop { - self.create_sampling_offsets(); - if let Err(err) = self.sample_file_to_create_mining_hash() { - warn!("Error sampling file, exiting: {:?}", err); - break; - } - self.submit_mining_proof(); - - // Todo make this a lot more frequent by picking a "new" blockhash instead of picking a storage blockhash - // prep the next proof - let (storage_blockhash, _) = - match Self::poll_for_blockhash_and_slot(&self.cluster_info, &self.blockhash) { - Ok(blockhash_and_slot) => blockhash_and_slot, - Err(e) => { - warn!( - "Error couldn't get a newer blockhash than {:?}. {:?}", - self.blockhash, e - ); - break; - } - }; - self.blockhash = storage_blockhash; - self.redeem_rewards(); - } + Self::wait_for_segment_download(slot, &blocktree, &exit, &node_info, cluster_info); + Ok(window_service) } - fn redeem_rewards(&self) { - let nodes = self.cluster_info.read().unwrap().tvu_peers(); - let client = crate::gossip_service::get_client(&nodes); - - if let Ok(Some(account)) = client.get_account(&self.storage_keypair.pubkey()) { - if let Ok(StorageContract::ReplicatorStorage { validations, .. }) = account.state() { - if !validations.is_empty() { - let ix = storage_instruction::claim_reward( - &self.keypair.pubkey(), - &self.storage_keypair.pubkey(), - ); - let message = Message::new_with_payer(vec![ix], Some(&self.keypair.pubkey())); - if let Err(e) = client.send_message(&[&self.keypair], message) { - error!("unable to redeem reward, tx failed: {:?}", e); - } else { - info!( - "collected mining rewards: Account balance {:?}", - client.get_balance(&self.keypair.pubkey()) - ); - } - } - } - } else { - info!("Redeem mining reward: No account data found"); - } - } - - fn wait_for_ledger_download( + fn wait_for_segment_download( start_slot: u64, blocktree: &Arc, exit: &Arc, @@ -406,53 +470,49 @@ impl Replicator { } } - fn encrypt_ledger(&mut self) -> Result<()> { - let ledger_path = Path::new(&self.ledger_path); - self.ledger_data_file_encrypted = ledger_path.join("ledger.enc"); + fn encrypt_ledger(meta: &mut ReplicatorMeta, blocktree: &Arc) -> Result<()> { + let ledger_path = Path::new(&meta.ledger_path); + meta.ledger_data_file_encrypted = ledger_path.join("ledger.enc"); { let mut ivec = [0u8; 64]; - ivec.copy_from_slice(&self.signature.to_bytes()); + ivec.copy_from_slice(&meta.signature.as_ref()); let num_encrypted_bytes = chacha_cbc_encrypt_ledger( - &self.blocktree, - self.slot, - &self.ledger_data_file_encrypted, + blocktree, + meta.slot, + &meta.ledger_data_file_encrypted, &mut ivec, )?; - self.num_chacha_blocks = num_encrypted_bytes / CHACHA_BLOCK_SIZE; + meta.num_chacha_blocks = num_encrypted_bytes / CHACHA_BLOCK_SIZE; } info!( "Done encrypting the ledger: {:?}", - self.ledger_data_file_encrypted + meta.ledger_data_file_encrypted ); Ok(()) } - fn create_sampling_offsets(&mut self) { - self.sampling_offsets.clear(); - - { - use crate::storage_stage::NUM_STORAGE_SAMPLES; - use rand::SeedableRng; - use rand_chacha::ChaChaRng; - - let mut rng_seed = [0u8; 32]; - rng_seed.copy_from_slice(&self.blockhash.as_ref()); - let mut rng = ChaChaRng::from_seed(rng_seed); - for _ in 0..NUM_STORAGE_SAMPLES { - self.sampling_offsets - .push(rng.gen_range(0, self.num_chacha_blocks) as u64); - } + fn create_sampling_offsets(meta: &mut ReplicatorMeta) { + meta.sampling_offsets.clear(); + let mut rng_seed = [0u8; 32]; + rng_seed.copy_from_slice(&meta.blockhash.as_ref()); + let mut rng = ChaChaRng::from_seed(rng_seed); + for _ in 0..NUM_STORAGE_SAMPLES { + meta.sampling_offsets + .push(rng.gen_range(0, meta.num_chacha_blocks) as u64); } } - fn sample_file_to_create_mining_hash(&mut self) -> Result<()> { - self.sha_state = sample_file(&self.ledger_data_file_encrypted, &self.sampling_offsets)?; - info!("sampled sha_state: {}", self.sha_state); - Ok(()) + fn sample_file_to_create_mining_hash( + enc_file_path: &Path, + sampling_offsets: &[u64], + ) -> Result<(Hash)> { + let sha_state = sample_file(enc_file_path, sampling_offsets)?; + info!("sampled sha_state: {}", sha_state); + Ok(sha_state) } fn setup_mining_account( @@ -494,18 +554,20 @@ impl Replicator { Ok(()) } - fn submit_mining_proof(&self) { + fn submit_mining_proof( + meta: &ReplicatorMeta, + cluster_info: &Arc>, + replicator_keypair: &Arc, + storage_keypair: &Arc, + ) { // No point if we've got no storage account... - let nodes = self.cluster_info.read().unwrap().tvu_peers(); + let nodes = cluster_info.read().unwrap().tvu_peers(); let client = crate::gossip_service::get_client(&nodes); - assert!( - client - .poll_get_balance(&self.storage_keypair.pubkey()) - .unwrap() - > 0 - ); + assert!(client.poll_get_balance(&storage_keypair.pubkey()).unwrap() > 0); // ...or no lamports for fees - let balance = client.poll_get_balance(&self.keypair.pubkey()).unwrap(); + let balance = client + .poll_get_balance(&replicator_keypair.pubkey()) + .unwrap(); if balance == 0 { error!("Unable to submit mining proof, insufficient Replicator Account balance"); return; @@ -513,21 +575,22 @@ impl Replicator { let (blockhash, _) = client.get_recent_blockhash().expect("No recent blockhash"); let instruction = storage_instruction::mining_proof( - &self.storage_keypair.pubkey(), - self.sha_state, - get_segment_from_slot(self.slot), - Signature::new(&self.signature.to_bytes()), - self.blockhash, + &storage_keypair.pubkey(), + meta.sha_state, + get_segment_from_slot(meta.slot), + Signature::new(&meta.signature.as_ref()), + meta.blockhash, ); - let message = Message::new_with_payer(vec![instruction], Some(&self.keypair.pubkey())); + let message = + Message::new_with_payer(vec![instruction], Some(&replicator_keypair.pubkey())); let mut transaction = Transaction::new( - &[self.keypair.as_ref(), self.storage_keypair.as_ref()], + &[replicator_keypair.as_ref(), storage_keypair.as_ref()], message, blockhash, ); client .send_and_confirm_transaction( - &[&self.keypair, &self.storage_keypair], + &[&replicator_keypair, &storage_keypair], &mut transaction, 10, 0, @@ -541,9 +604,6 @@ impl Replicator { } pub fn join(self) { - self.gossip_service.join().unwrap(); - self.fetch_stage.join().unwrap(); - self.window_service.join().unwrap(); for handle in self.thread_handles { handle.join().unwrap(); } diff --git a/core/src/window_service.rs b/core/src/window_service.rs index c768fcc41..5e85fcc47 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -138,7 +138,11 @@ where .collect() }); - retransmit_blobs(&blobs, retransmit, my_pubkey)?; + match retransmit_blobs(&blobs, retransmit, my_pubkey) { + Ok(_) => Ok(()), + Err(Error::SendError) => Ok(()), + Err(e) => Err(e), + }?; trace!("{} num blobs received: {}", my_pubkey, blobs.len()); diff --git a/replicator/src/main.rs b/replicator/src/main.rs index a4ef18923..dcf9b915e 100644 --- a/replicator/src/main.rs +++ b/replicator/src/main.rs @@ -92,7 +92,7 @@ fn main() { ); let entrypoint_info = ContactInfo::new_gossip_entry_point(&entrypoint_addr); - let mut replicator = Replicator::new( + let replicator = Replicator::new( ledger_path, node, entrypoint_info, @@ -101,6 +101,5 @@ fn main() { ) .unwrap(); - replicator.run(); - replicator.close(); + replicator.join(); }