diff --git a/core/src/replicator.rs b/core/src/replicator.rs
index 3bc3e7309..a44c757e6 100644
--- a/core/src/replicator.rs
+++ b/core/src/replicator.rs
@@ -1,6 +1,5 @@
 use crate::blob_fetch_stage::BlobFetchStage;
 use crate::blocktree::Blocktree;
-use crate::blocktree_processor;
 #[cfg(feature = "chacha")]
 use crate::chacha::{chacha_cbc_encrypt_ledger, CHACHA_BLOCK_SIZE};
 use crate::cluster_info::{ClusterInfo, Node, FULLNODE_PORT_RANGE};
@@ -18,7 +17,6 @@ use solana_client::client::create_client;
 use solana_client::rpc_request::{RpcClient, RpcRequest, RpcRequestHandler};
 use solana_client::thin_client::{retry_get_balance, ThinClient};
 use solana_drone::drone::{request_airdrop_transaction, DRONE_PORT};
-use solana_sdk::genesis_block::GenesisBlock;
 use solana_sdk::hash::{Hash, Hasher};
 use solana_sdk::signature::{Keypair, KeypairUtil, Signature};
 use solana_storage_api::StorageTransaction;
@@ -32,6 +30,7 @@ use std::io::{Error, ErrorKind};
 use std::mem::size_of;
 use std::net::UdpSocket;
 use std::path::Path;
+use std::path::PathBuf;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::mpsc::channel;
 use std::sync::{Arc, RwLock};
@@ -44,7 +43,19 @@ pub struct Replicator {
     window_service: WindowService,
     pub retransmit_receiver: BlobReceiver,
     exit: Arc<AtomicBool>,
-    entry_height: u64,
+    slot: u64,
+    ledger_path: String,
+    keypair: Arc<Keypair>,
+    signature: ring::signature::Signature,
+    cluster_entrypoint: ContactInfo,
+    node_info: ContactInfo,
+    cluster_info: Arc<RwLock<ClusterInfo>>,
+    ledger_data_file_encrypted: PathBuf,
+    sampling_offsets: Vec<u64>,
+    hash: Hash,
+    blocktree: Arc<Blocktree>,
+    #[cfg(feature = "chacha")]
+    num_chacha_blocks: usize,
 }
 
 pub fn sample_file(in_path: &Path, sample_offsets: &[u64]) -> io::Result<Hash> {
@@ -103,7 +114,7 @@ impl Replicator {
     /// * `ledger_path` - path to where the ledger will be stored.
     /// Causes panic if none
     /// * `node` - The replicator node
-    /// * `leader_info` - ContactInfo representing the leader
+    /// * `cluster_entrypoint` - ContactInfo representing an entry into the network
     /// * `keypair` - Keypair for this replicator
     /// * `timeout` - (optional) timeout for polling for leader/downloading the ledger. Defaults to
     /// 30 seconds
@@ -111,8 +122,8 @@ impl Replicator {
     pub fn new(
         ledger_path: &str,
         node: Node,
-        leader_info: &ContactInfo,
-        keypair: &Arc<Keypair>,
+        cluster_entrypoint: ContactInfo,
+        keypair: Arc<Keypair>,
         _timeout: Option<Duration>,
     ) -> Result<Self> {
         let exit = Arc::new(AtomicBool::new(false));
@@ -120,7 +131,7 @@ impl Replicator {
         info!("Replicator: id: {}", keypair.pubkey());
         info!("Creating cluster info....");
         let mut cluster_info = ClusterInfo::new(node.info.clone(), keypair.clone());
-        cluster_info.set_entrypoint(leader_info.clone());
+        cluster_info.set_entrypoint(cluster_entrypoint.clone());
         let cluster_info = Arc::new(RwLock::new(cluster_info));
 
         // Create Blocktree, eventually will simply repurpose the input
@@ -131,16 +142,8 @@ impl Replicator {
         let blocktree =
             Blocktree::open(ledger_path).expect("Expected to be able to open database ledger");
 
-        let genesis_block =
-            GenesisBlock::load(ledger_path).expect("Expected to successfully open genesis block");
-
-        let (_bank_forks, _bank_forks_info) =
-            blocktree_processor::process_blocktree(&genesis_block, &blocktree, None)
-                .expect("process_blocktree failed");
-
         let blocktree = Arc::new(blocktree);
-        //TODO(sagar) Does replicator need a bank also ?
 
         let gossip_service = GossipService::new(
             &cluster_info,
             Some(blocktree.clone()),
@@ -149,19 +152,20 @@ impl Replicator {
             &exit,
         );
 
-        info!("Looking for leader at {:?}", leader_info);
-        crate::gossip_service::discover(&leader_info.gossip, 1)?;
+        info!("Looking for leader at {:?}", cluster_entrypoint);
+        crate::gossip_service::discover(&cluster_entrypoint.gossip, 1)?;
 
         let (storage_blockhash, storage_entry_height) =
             Self::poll_for_blockhash_and_entry_height(&cluster_info)?;
 
+        let node_info = node.info.clone();
         let signature = keypair.sign(storage_blockhash.as_ref());
-        let entry_height = get_entry_heights_from_blockhash(&signature, storage_entry_height);
-        let mut repair_slot_range = RepairSlotRange::default();
-        repair_slot_range.end = entry_height;
-        repair_slot_range.start = entry_height - ENTRIES_PER_SEGMENT;
+        let slot = get_entry_heights_from_blockhash(&signature, storage_entry_height);
+        info!("replicating slot: {}", slot);
 
-        info!("replicating entry_height: {}", entry_height);
+        let mut repair_slot_range = RepairSlotRange::default();
+        repair_slot_range.end = slot + ENTRIES_PER_SEGMENT;
+        repair_slot_range.start = slot;
 
         let repair_socket = Arc::new(node.sockets.repair);
         let mut blob_sockets: Vec<Arc<UdpSocket>> =
@@ -183,44 +187,96 @@ impl Replicator {
             repair_slot_range,
         );
 
+        Ok(Self {
+            gossip_service,
+            fetch_stage,
+            window_service,
+            retransmit_receiver,
+            exit,
+            slot,
+            ledger_path: ledger_path.to_string(),
+            keypair: keypair.clone(),
+            signature,
+            cluster_entrypoint,
+            node_info,
+            cluster_info,
+            ledger_data_file_encrypted: PathBuf::default(),
+            sampling_offsets: vec![],
+            hash: Hash::default(),
+            blocktree,
+            #[cfg(feature = "chacha")]
+            num_chacha_blocks: 0,
+        })
+    }
+
+    pub fn run(&mut self) {
+        self.wait_for_ledger_download();
+        self.encrypt_ledger()
+            .expect("ledger encrypt not successful");
+        loop {
+            self.create_sampling_offsets();
+            if self.sample_file_to_create_mining_hash().is_err() {
+                info!("Error sampling file, exiting...");
+                break;
+            }
+            self.submit_mining_proof();
+        }
+    }
+
+    fn wait_for_ledger_download(&self) {
         info!("window created, waiting for ledger download done");
         let _start = Instant::now();
         let mut _received_so_far = 0;
 
-        /*while !done.load(Ordering::Relaxed) {
-            sleep(Duration::from_millis(100));
-
-            let elapsed = start.elapsed();
-            received_so_far += entry_receiver.try_recv().map(|v| v.len()).unwrap_or(0);
-
-            if received_so_far == 0 && elapsed > timeout {
-                return Err(result::Error::IO(io::Error::new(
-                    ErrorKind::TimedOut,
-                    "Timed out waiting to receive any blocks",
-                )));
+        loop {
+            if let Ok(entries) = self.blocktree.get_slot_entries(self.slot, 0, None) {
+                if !entries.is_empty() {
+                    break;
+                }
             }
-        }*/
+            sleep(Duration::from_secs(1));
+        }
 
         info!("Done receiving entries from window_service");
 
-        let mut contact_info = node.info.clone();
+        let mut contact_info = self.node_info.clone();
         contact_info.tvu = "0.0.0.0:0".parse().unwrap();
         {
-            let mut cluster_info_w = cluster_info.write().unwrap();
+            let mut cluster_info_w = self.cluster_info.write().unwrap();
             cluster_info_w.insert_self(contact_info);
         }
 
-        let mut client = create_client(leader_info.client_facing_addr(), FULLNODE_PORT_RANGE);
+        info!("Done downloading ledger at {}", self.ledger_path);
+    }
 
-        Self::get_airdrop_lamports(&mut client, &keypair, &leader_info);
-        info!("Done downloading ledger at {}", ledger_path);
+    fn encrypt_ledger(&mut self) -> Result<()> {
+        let ledger_path = Path::new(&self.ledger_path);
+        self.ledger_data_file_encrypted = ledger_path.join("ledger.enc");
 
-        let ledger_path = Path::new(ledger_path);
-        let ledger_data_file_encrypted = ledger_path.join("ledger.enc");
-        let mut sampling_offsets = Vec::new();
+        #[cfg(feature = "chacha")]
+        {
+            let mut ivec = [0u8; 64];
+            ivec.copy_from_slice(self.signature.as_ref());
+
+            let num_encrypted_bytes = chacha_cbc_encrypt_ledger(
+                &self.blocktree,
+                self.slot,
+                &self.ledger_data_file_encrypted,
+                &mut ivec,
+            )?;
+
+            self.num_chacha_blocks = num_encrypted_bytes / CHACHA_BLOCK_SIZE;
+        }
+
+        info!("Done encrypting the ledger");
+        Ok(())
+    }
+
+    fn create_sampling_offsets(&mut self) {
+        self.sampling_offsets.clear();
 
         #[cfg(not(feature = "chacha"))]
-        sampling_offsets.push(0);
+        self.sampling_offsets.push(0);
 
         #[cfg(feature = "chacha")]
         {
@@ -228,53 +284,40 @@ impl Replicator {
             use rand::{Rng, SeedableRng};
             use rand_chacha::ChaChaRng;
 
-            let mut ivec = [0u8; 64];
-            ivec.copy_from_slice(signature.as_ref());
-
-            let num_encrypted_bytes = chacha_cbc_encrypt_ledger(
-                &blocktree,
-                entry_height,
-                &ledger_data_file_encrypted,
-                &mut ivec,
-            )?;
-
-            let num_chacha_blocks = num_encrypted_bytes / CHACHA_BLOCK_SIZE;
             let mut rng_seed = [0u8; 32];
-            rng_seed.copy_from_slice(&signature.as_ref()[0..32]);
+            rng_seed.copy_from_slice(&self.signature.as_ref()[0..32]);
             let mut rng = ChaChaRng::from_seed(rng_seed);
             for _ in 0..NUM_STORAGE_SAMPLES {
-                sampling_offsets.push(rng.gen_range(0, num_chacha_blocks) as u64);
+                self.sampling_offsets
+                    .push(rng.gen_range(0, self.num_chacha_blocks) as u64);
             }
         }
+    }
 
-        info!("Done encrypting the ledger");
+    fn sample_file_to_create_mining_hash(&mut self) -> Result<()> {
+        self.hash = sample_file(&self.ledger_data_file_encrypted, &self.sampling_offsets)?;
+        info!("sampled hash: {}", self.hash);
+        Ok(())
+    }
 
-        match sample_file(&ledger_data_file_encrypted, &sampling_offsets) {
-            Ok(hash) => {
-                let blockhash = client.get_recent_blockhash();
-                info!("sampled hash: {}", hash);
-                let mut tx = StorageTransaction::new_mining_proof(
-                    &keypair,
-                    hash,
-                    blockhash,
-                    entry_height,
-                    Signature::new(signature.as_ref()),
-                );
-                client
-                    .retry_transfer(&keypair, &mut tx, 10)
-                    .expect("transfer didn't work!");
-            }
-            Err(e) => info!("Error occurred while sampling: {:?}", e),
-        }
+    fn submit_mining_proof(&self) {
+        let mut client = create_client(
+            self.cluster_entrypoint.client_facing_addr(),
+            FULLNODE_PORT_RANGE,
+        );
+        Self::get_airdrop_lamports(&mut client, &self.keypair, &self.cluster_entrypoint);
 
-        Ok(Self {
-            gossip_service,
-            fetch_stage,
-            window_service,
-            retransmit_receiver,
-            exit,
-            entry_height,
-        })
+        let blockhash = client.get_recent_blockhash();
+        let mut tx = StorageTransaction::new_mining_proof(
+            &self.keypair,
+            self.hash,
+            blockhash,
+            self.slot,
+            Signature::new(self.signature.as_ref()),
+        );
+        client
+            .retry_transfer(&self.keypair, &mut tx, 10)
+            .expect("transfer didn't work!");
     }
 
     pub fn close(self) {
@@ -297,7 +340,7 @@
     }
 
     pub fn entry_height(&self) -> u64 {
-        self.entry_height
+        self.slot
     }
 
     fn poll_for_blockhash_and_entry_height(
@@ -321,10 +364,10 @@ impl Replicator {
                 .expect("rpc request")
                 .as_u64()
                 .unwrap();
+            info!("max entry_height: {}", storage_entry_height);
             if get_segment_from_entry(storage_entry_height) != 0 {
                 return Ok((storage_blockhash, storage_entry_height));
             }
-            info!("max entry_height: {}", storage_entry_height);
             sleep(Duration::from_secs(3));
         }
         Err(Error::new(
@@ -333,9 +376,13 @@
         ))?
     }
 
-    fn get_airdrop_lamports(client: &mut ThinClient, keypair: &Keypair, leader_info: &ContactInfo) {
+    fn get_airdrop_lamports(
+        client: &mut ThinClient,
+        keypair: &Keypair,
+        cluster_entrypoint: &ContactInfo,
+    ) {
         if retry_get_balance(client, &keypair.pubkey(), None).is_none() {
-            let mut drone_addr = leader_info.tpu;
+            let mut drone_addr = cluster_entrypoint.tpu;
             drone_addr.set_port(DRONE_PORT);
 
             let airdrop_amount = 1;
diff --git a/replicator/src/main.rs b/replicator/src/main.rs
index da93265b5..a54d5005d 100644
--- a/replicator/src/main.rs
+++ b/replicator/src/main.rs
@@ -82,8 +82,10 @@ fn main() {
 
     let leader_info = ContactInfo::new_gossip_entry_point(&network_addr);
 
-    let replicator =
-        Replicator::new(ledger_path, node, &leader_info, &Arc::new(keypair), None).unwrap();
+    let mut replicator =
+        Replicator::new(ledger_path, node, leader_info, Arc::new(keypair), None).unwrap();
 
-    replicator.join();
+    replicator.run();
+
+    replicator.close();
 }
diff --git a/tests/replicator.rs b/tests/replicator.rs
index 1bcfca315..61c2ca843 100644
--- a/tests/replicator.rs
+++ b/tests/replicator.rs
@@ -1,37 +1,26 @@
 #[macro_use]
 extern crate log;
 
-#[cfg(feature = "chacha")]
-#[macro_use]
-extern crate serde_json;
-
 #[macro_use]
 extern crate solana;
 
-use bincode::deserialize;
 use solana::blocktree::{
     create_new_tmp_ledger, get_tmp_ledger_path, tmp_copy_blocktree, Blocktree,
 };
-use solana::cluster_info::{ClusterInfo, Node, FULLNODE_PORT_RANGE};
+use solana::cluster_info::{Node, FULLNODE_PORT_RANGE};
 use solana::contact_info::ContactInfo;
-use solana::entry::Entry;
 use solana::fullnode::{Fullnode, FullnodeConfig};
 use solana::replicator::Replicator;
 use solana::storage_stage::STORAGE_ROTATE_TEST_COUNT;
-use solana::streamer::blob_receiver;
 use solana_client::client::create_client;
 use solana_sdk::genesis_block::GenesisBlock;
-use solana_sdk::hash::Hash;
 use solana_sdk::signature::{Keypair, KeypairUtil};
 use solana_sdk::system_transaction::SystemTransaction;
 use std::fs::remove_dir_all;
-use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::mpsc::channel;
 use std::sync::Arc;
 use std::time::Duration;
 
 #[test]
-#[ignore]
 fn test_replicator_startup_basic() {
     solana_logger::setup();
     info!("starting replicator test");
@@ -79,8 +68,6 @@ fn test_replicator_startup_basic() {
         .unwrap();
 
     let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
-    #[cfg(feature = "chacha")]
-    let validator_contact_info = validator_node.info.clone();
 
     let validator = Fullnode::new(
         validator_node,
@@ -130,91 +117,21 @@ fn test_replicator_startup_basic() {
     info!("starting replicator node");
     let replicator_node = Node::new_localhost_with_pubkey(&replicator_keypair.pubkey());
-    let replicator_info = replicator_node.info.clone();
+    let _replicator_info = replicator_node.info.clone();
 
     let leader_info = ContactInfo::new_gossip_entry_point(&leader_info.gossip);
 
     let replicator = Replicator::new(
         replicator_ledger_path,
         replicator_node,
-        &leader_info,
-        &replicator_keypair,
+        leader_info,
+        replicator_keypair,
         None,
     )
     .unwrap();
 
     info!("started replicator..");
 
-    // Create a client which downloads from the replicator and see that it
-    // can respond with blobs.
-    let tn = Node::new_localhost();
-    let cluster_info = ClusterInfo::new_with_invalid_keypair(tn.info.clone());
-    let repair_index = replicator.entry_height();
-    let req = cluster_info
-        .window_index_request_bytes(0, repair_index)
-        .unwrap();
-
-    let exit = Arc::new(AtomicBool::new(false));
-    let (s_reader, r_reader) = channel();
-    let repair_socket = Arc::new(tn.sockets.repair);
-    let t_receiver = blob_receiver(repair_socket.clone(), &exit, s_reader);
-
-    info!(
-        "Sending repair requests from: {} to: {}",
-        tn.info.id, replicator_info.gossip
-    );
-
-    let mut received_blob = false;
-    for _ in 0..5 {
-        repair_socket.send_to(&req, replicator_info.gossip).unwrap();
-
-        let x = r_reader.recv_timeout(Duration::new(1, 0));
-
-        if let Ok(blobs) = x {
-            for b in blobs {
-                let br = b.read().unwrap();
-                assert!(br.index() == repair_index);
-                let entry: Entry = deserialize(&br.data()[..br.meta.size]).unwrap();
-                info!("entry: {:?}", entry);
-                assert_ne!(entry.hash, Hash::default());
-                received_blob = true;
-            }
-            break;
-        }
-    }
-    exit.store(true, Ordering::Relaxed);
-    t_receiver.join().unwrap();
-
-    assert!(received_blob);
-
-    // The replicator will not submit storage proofs if
-    // chacha is not enabled
-    #[cfg(feature = "chacha")]
-    {
-        use solana_client::rpc_request::{RpcClient, RpcRequest, RpcRequestHandler};
-        use std::thread::sleep;
-
-        info!(
-            "looking for pubkeys for entry: {}",
-            replicator.entry_height()
-        );
-        let rpc_client = RpcClient::new_from_socket(validator_contact_info.rpc);
-        let mut non_zero_pubkeys = false;
-        for _ in 0..60 {
-            let params = json!([replicator.entry_height()]);
-            let pubkeys = rpc_client
-                .make_rpc_request(1, RpcRequest::GetStoragePubkeysForEntryHeight, Some(params))
-                .unwrap();
-            info!("pubkeys: {:?}", pubkeys);
-            if pubkeys.as_array().unwrap().len() != 0 {
-                non_zero_pubkeys = true;
-                break;
-            }
-            sleep(Duration::from_secs(1));
-        }
-        assert!(non_zero_pubkeys);
-    }
-
     replicator.close();
     validator.close().unwrap();
     leader.close().unwrap();
@@ -251,8 +168,8 @@ fn test_replicator_startup_leader_hang() {
     let replicator_res = Replicator::new(
         &replicator_ledger_path,
         replicator_node,
-        &leader_info,
-        &replicator_keypair,
+        leader_info,
+        replicator_keypair,
         Some(Duration::from_secs(3)),
     );
 
@@ -323,8 +240,8 @@ fn test_replicator_startup_ledger_hang() {
     let replicator_res = Replicator::new(
         &replicator_ledger_path,
         replicator_node,
-        &leader_info,
-        &bad_keys,
+        leader_info,
+        bad_keys,
         Some(Duration::from_secs(3)),
     );