From 3441d3399bd0006545e4d946fe3ef1a71d2ab2c2 Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Fri, 2 Nov 2018 08:40:29 -0700 Subject: [PATCH] Replicator rework * Move more of the replicator logic into the replicator class * Add support for the RPC interface to query the storage last_id value that the replicator would sign and use to pick a block. * Fix replicator connecting to gossip and change test to exercise that scenario. --- src/bank.rs | 4 + src/bin/replicator.rs | 79 ++--------------- src/replicator.rs | 196 ++++++++++++++++++++++++++++++++---------- src/rpc.rs | 11 +++ src/rpc_request.rs | 2 + src/storage_stage.rs | 37 +++++--- src/tvu.rs | 5 +- tests/replicator.rs | 30 +++---- 8 files changed, 212 insertions(+), 152 deletions(-) diff --git a/src/bank.rs b/src/bank.rs index 42d8373ed4..1da3006aab 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -42,6 +42,7 @@ use std::result; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, RwLock}; use std::time::Instant; +use storage_stage::StorageState; use tokio::prelude::Future; /// The number of most recent `last_id` values that the bank will track the signatures @@ -301,6 +302,8 @@ pub struct Bank { /// Tracks and updates the leader schedule based on the votes and account stakes /// processed by the bank pub leader_scheduler: Arc>, + + pub storage_state: StorageState, } impl Default for Bank { @@ -313,6 +316,7 @@ impl Default for Bank { account_subscriptions: RwLock::new(HashMap::new()), signature_subscriptions: RwLock::new(HashMap::new()), leader_scheduler: Arc::new(RwLock::new(LeaderScheduler::default())), + storage_state: StorageState::new(), } } } diff --git a/src/bin/replicator.rs b/src/bin/replicator.rs index c95601c6d0..6527cf6fa8 100644 --- a/src/bin/replicator.rs +++ b/src/bin/replicator.rs @@ -8,25 +8,14 @@ extern crate solana_drone; extern crate solana_sdk; use clap::{App, Arg}; -use solana::chacha::{chacha_cbc_encrypt_file, CHACHA_BLOCK_SIZE}; -use solana::client::mk_client; -use solana::cluster_info::Node; +use solana::cluster_info::{Node, NodeInfo}; use solana::fullnode::Config; -use solana::ledger::LEDGER_DATA_FILE; use solana::logger; -use solana::replicator::{sample_file, Replicator}; -use solana_drone::drone::{request_airdrop_transaction, DRONE_PORT}; +use solana::replicator::Replicator; use solana_sdk::signature::{Keypair, KeypairUtil}; -use solana_sdk::storage_program::StorageTransaction; -use solana_sdk::transaction::Transaction; use std::fs::File; use std::net::{Ipv4Addr, SocketAddr}; -use std::path::Path; use std::process::exit; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; -use std::thread::sleep; -use std::time::Duration; fn main() { logger::setup(); @@ -85,70 +74,14 @@ fn main() { gossip ); - let exit = Arc::new(AtomicBool::new(false)); - let done = Arc::new(AtomicBool::new(false)); - let network_addr = matches .value_of("network") - .map(|network| network.parse().expect("failed to parse network address")); + .map(|network| network.parse().expect("failed to parse network address")) + .unwrap(); - // TODO: ask network what slice we should store - let entry_height = 0; + let leader_info = NodeInfo::new_entry_point(&network_addr); - let (replicator, leader_info) = Replicator::new( - entry_height, - 5, - &exit, - ledger_path, - node, - network_addr, - done.clone(), - ); - - while !done.load(Ordering::Relaxed) { - sleep(Duration::from_millis(100)); - } - - println!("Done downloading ledger"); - - let ledger_path = Path::new(ledger_path.unwrap()); - let ledger_data_file = ledger_path.join(LEDGER_DATA_FILE); - let ledger_data_file_encrypted = ledger_path.join(format!("{}.enc", LEDGER_DATA_FILE)); - let mut ivec = [0u8; CHACHA_BLOCK_SIZE]; - ivec[0..4].copy_from_slice(&[2, 3, 4, 5]); - - if let Err(e) = - chacha_cbc_encrypt_file(&ledger_data_file, &ledger_data_file_encrypted, &mut ivec) - { - println!("Error while encrypting ledger: {:?}", e); - return; - } - - println!("Done encrypting the ledger"); - - let sampling_offsets = [0, 1, 2, 3]; - - let mut client = mk_client(&leader_info); - - let mut drone_addr = leader_info.tpu; - drone_addr.set_port(DRONE_PORT); - let airdrop_amount = 5; - let last_id = client.get_last_id(); - let transaction = - request_airdrop_transaction(&drone_addr, &keypair.pubkey(), airdrop_amount, last_id) - .unwrap(); - let signature = client.transfer_signed(&transaction).unwrap(); - client.poll_for_signature(&signature).unwrap(); - - match sample_file(&ledger_data_file_encrypted, &sampling_offsets) { - Ok(hash) => { - let last_id = client.get_last_id(); - println!("sampled hash: {}", hash); - let tx = Transaction::storage_new_mining_proof(&keypair, hash, last_id); - client.transfer_signed(&tx).expect("transfer didn't work!"); - } - Err(e) => println!("Error occurred while sampling: {:?}", e), - } + let replicator = Replicator::new(ledger_path, node, &leader_info, &keypair).unwrap(); replicator.join(); } diff --git a/src/replicator.rs b/src/replicator.rs index 73c5c64e26..15c070797d 100644 --- a/src/replicator.rs +++ b/src/replicator.rs @@ -1,10 +1,22 @@ use blob_fetch_stage::BlobFetchStage; +#[cfg(feature = "chacha")] +use chacha::{chacha_cbc_encrypt_file, CHACHA_BLOCK_SIZE}; +use client::mk_client; use cluster_info::{ClusterInfo, Node, NodeInfo}; use db_ledger::DbLedger; use gossip_service::GossipService; use leader_scheduler::LeaderScheduler; +use ledger::LEDGER_DATA_FILE; +use rand::thread_rng; +use rand::Rng; +use result::Result; +use rpc_request::{RpcClient, RpcRequest}; use service::Service; +use solana_drone::drone::{request_airdrop_transaction, DRONE_PORT}; use solana_sdk::hash::{Hash, Hasher}; +use solana_sdk::signature::{Keypair, KeypairUtil}; +use solana_sdk::storage_program::StorageTransaction; +use solana_sdk::transaction::Transaction; use std::fs::File; use std::io; use std::io::BufReader; @@ -13,17 +25,16 @@ use std::io::Seek; use std::io::SeekFrom; use std::io::{Error, ErrorKind}; use std::mem::size_of; -use std::net::SocketAddr; use std::net::UdpSocket; use std::path::Path; -use std::sync::atomic::AtomicBool; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; +use std::thread::sleep; use std::thread::JoinHandle; use std::time::Duration; use store_ledger_stage::StoreLedgerStage; use streamer::BlobReceiver; -use thin_client::poll_gossip_for_leader; use window; use window_service::window_service; @@ -33,6 +44,7 @@ pub struct Replicator { store_ledger_stage: StoreLedgerStage, t_window: JoinHandle<()>, pub retransmit_receiver: BlobReceiver, + exit: Arc, } pub fn sample_file(in_path: &Path, sample_offsets: &[u64]) -> io::Result { @@ -72,39 +84,34 @@ pub fn sample_file(in_path: &Path, sample_offsets: &[u64]) -> io::Result { impl Replicator { pub fn new( - entry_height: u64, - max_entry_height: u64, - exit: &Arc, ledger_path: Option<&str>, node: Node, - network_addr: Option, - done: Arc, - ) -> (Replicator, NodeInfo) { + leader_info: &NodeInfo, + keypair: &Keypair, + ) -> Result { + let exit = Arc::new(AtomicBool::new(false)); + let done = Arc::new(AtomicBool::new(false)); + + let entry_height = 0; + let max_entry_height = 1; + const REPLICATOR_WINDOW_SIZE: usize = 32 * 1024; let window = window::new_window(REPLICATOR_WINDOW_SIZE); let shared_window = Arc::new(RwLock::new(window)); + info!("Replicator: id: {}", keypair.pubkey()); + info!("Creating cluster info...."); let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(node.info))); - let leader_info = network_addr.map(|i| NodeInfo::new_entry_point(&i)); - let leader_pubkey; - if let Some(leader_info) = leader_info { - leader_pubkey = leader_info.id; - cluster_info.write().unwrap().insert_info(leader_info); - } else { - panic!("No leader info!"); + let leader_pubkey = leader_info.id; + { + let mut cluster_info_w = cluster_info.write().unwrap(); + cluster_info_w.insert_info(leader_info.clone()); + cluster_info_w.set_leader(leader_info.id); } - let repair_socket = Arc::new(node.sockets.repair); - let mut blob_sockets: Vec> = - node.sockets.replicate.into_iter().map(Arc::new).collect(); - blob_sockets.push(repair_socket.clone()); - let (fetch_stage, blob_fetch_receiver) = - BlobFetchStage::new_multi_socket(blob_sockets, exit.clone()); - let (entry_window_sender, entry_window_receiver) = channel(); - // todo: pull blobs off the retransmit_receiver and recycle them? - let (retransmit_sender, retransmit_receiver) = channel(); + let store_ledger_stage = StoreLedgerStage::new(entry_window_receiver, ledger_path); // Create the RocksDb ledger, eventually will simply repurpose the input // ledger path as the RocksDb ledger path once we replace the ledger with @@ -116,6 +123,53 @@ impl Replicator { .expect("Expected to be able to open database ledger"), )); + let gossip_service = GossipService::new( + &cluster_info, + shared_window.clone(), + ledger_path, + node.sockets.gossip, + exit.clone(), + ); + + info!("polling for leader"); + let leader; + loop { + if let Some(l) = cluster_info.read().unwrap().get_gossip_top_leader() { + leader = l.clone(); + break; + } + + sleep(Duration::from_millis(900)); + info!("{}", cluster_info.read().unwrap().node_info_trace()); + } + + info!("Got leader: {:?}", leader); + + let rpc_client = { + let cluster_info = cluster_info.read().unwrap(); + let rpc_peers = cluster_info.rpc_peers(); + info!("rpc peers: {:?}", rpc_peers); + let node_idx = thread_rng().gen_range(0, rpc_peers.len()); + RpcClient::new_from_socket(rpc_peers[node_idx].rpc) + }; + + let storage_last_id = RpcRequest::GetStorageMiningLastId + .make_rpc_request(&rpc_client, 2, None) + .expect("rpc request") + .to_string(); + let _signature = keypair.sign(storage_last_id.as_ref()); + // TODO: use this signature to pick the key and block + + let repair_socket = Arc::new(node.sockets.repair); + let mut blob_sockets: Vec> = + node.sockets.replicate.into_iter().map(Arc::new).collect(); + blob_sockets.push(repair_socket.clone()); + let (fetch_stage, blob_fetch_receiver) = + BlobFetchStage::new_multi_socket(blob_sockets, exit.clone()); + + // todo: pull blobs off the retransmit_receiver and recycle them? + let (retransmit_sender, retransmit_receiver) = channel(); + let t_window = window_service( db_ledger, cluster_info.clone(), @@ -129,32 +183,82 @@ impl Replicator { Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( leader_pubkey, ))), - done, + done.clone(), ); - let store_ledger_stage = StoreLedgerStage::new(entry_window_receiver, ledger_path); + info!("window created, waiting for ledger download done"); + while !done.load(Ordering::Relaxed) { + sleep(Duration::from_millis(100)); + } - let gossip_service = GossipService::new( - &cluster_info, - shared_window.clone(), - ledger_path, - node.sockets.gossip, - exit.clone(), - ); + let mut client = mk_client(&leader); - let leader = - poll_gossip_for_leader(network_addr.unwrap(), Some(10)).expect("couldn't reach leader"); + if client.get_balance(&keypair.pubkey()).is_err() { + let mut drone_addr = leader_info.tpu; + drone_addr.set_port(DRONE_PORT); - ( - Replicator { - gossip_service, - fetch_stage, - store_ledger_stage, - t_window, - retransmit_receiver, - }, - leader, - ) + let airdrop_amount = 1; + + let last_id = client.get_last_id(); + match request_airdrop_transaction( + &drone_addr, + &keypair.pubkey(), + airdrop_amount, + last_id, + ) { + Ok(transaction) => { + let signature = client.transfer_signed(&transaction).unwrap(); + client.poll_for_signature(&signature).unwrap(); + } + Err(err) => { + panic!( + "Error requesting airdrop: {:?} to addr: {:?} amount: {}", + err, drone_addr, airdrop_amount + ); + } + }; + } + + info!("Done downloading ledger at {}", ledger_path.unwrap()); + + let ledger_path = Path::new(ledger_path.unwrap()); + let ledger_data_file_encrypted = ledger_path.join(format!("{}.enc", LEDGER_DATA_FILE)); + #[cfg(feature = "chacha")] + { + let ledger_data_file = ledger_path.join(LEDGER_DATA_FILE); + let mut ivec = [0u8; CHACHA_BLOCK_SIZE]; + ivec[0..4].copy_from_slice(&[2, 3, 4, 5]); + + chacha_cbc_encrypt_file(&ledger_data_file, &ledger_data_file_encrypted, &mut ivec)?; + } + + info!("Done encrypting the ledger"); + + let sampling_offsets = [0, 1, 2, 3]; + + match sample_file(&ledger_data_file_encrypted, &sampling_offsets) { + Ok(hash) => { + let last_id = client.get_last_id(); + info!("sampled hash: {}", hash); + let tx = Transaction::storage_new_mining_proof(&keypair, hash, last_id); + client.transfer_signed(&tx).expect("transfer didn't work!"); + } + Err(e) => info!("Error occurred while sampling: {:?}", e), + } + + Ok(Replicator { + gossip_service, + fetch_stage, + store_ledger_stage, + t_window, + retransmit_receiver, + exit, + }) + } + + pub fn close(self) { + self.exit.store(true, Ordering::Relaxed); + self.join() } pub fn join(self) { diff --git a/src/rpc.rs b/src/rpc.rs index 6137c48db9..248ac6691b 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -153,6 +153,9 @@ build_rpc_trait! { #[rpc(meta, name = "sendTransaction")] fn send_transaction(&self, Self::Metadata, Vec) -> Result; + + #[rpc(meta, name = "getStorageMiningLastId")] + fn get_storage_mining_last_id(&self, Self::Metadata) -> Result; } } @@ -279,6 +282,9 @@ impl RpcSol for RpcSolImpl { ); Ok(signature) } + fn get_storage_mining_last_id(&self, meta: Self::Metadata) -> Result { + meta.request_processor.get_storage_mining_last_id() + } } #[derive(Clone)] pub struct JsonRpcRequestProcessor { @@ -313,6 +319,10 @@ impl JsonRpcRequestProcessor { fn get_transaction_count(&self) -> Result { Ok(self.bank.transaction_count() as u64) } + fn get_storage_mining_last_id(&self) -> Result { + let id = self.bank.storage_state.get_last_id(); + Ok(bs58::encode(id).into_string()) + } } fn get_leader_addr(cluster_info: &Arc>) -> Result { @@ -391,6 +401,7 @@ mod tests { let request_processor = JsonRpcRequestProcessor::new(Arc::new(bank)); let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default()))); let leader = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); + cluster_info.write().unwrap().insert_info(leader.clone()); cluster_info.write().unwrap().set_leader(leader.id); let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); diff --git a/src/rpc_request.rs b/src/rpc_request.rs index 9ac36b0bfb..42b2e9cc7e 100644 --- a/src/rpc_request.rs +++ b/src/rpc_request.rs @@ -53,6 +53,7 @@ pub enum RpcRequest { RegisterNode, SignVote, DeregisterNode, + GetStorageMiningLastId, } impl RpcRequest { @@ -95,6 +96,7 @@ impl RpcRequest { RpcRequest::RegisterNode => "registerNode", RpcRequest::SignVote => "signVote", RpcRequest::DeregisterNode => "deregisterNode", + RpcRequest::GetStorageMiningLastId => "getStorageMiningLastId", }; let mut request = json!({ "jsonrpc": jsonrpc, diff --git a/src/storage_stage.rs b/src/storage_stage.rs index 57ee1c1693..5a708e7256 100644 --- a/src/storage_stage.rs +++ b/src/storage_stage.rs @@ -10,7 +10,6 @@ use rand_chacha::ChaChaRng; use result::{Error, Result}; use service::Service; use solana_sdk::hash::Hash; -use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::Keypair; use solana_sdk::signature::Signature; use solana_sdk::vote_program; @@ -30,6 +29,7 @@ type StorageKeys = Vec; pub struct StorageState { storage_results: Arc>, storage_keys: Arc>, + last_id: Hash, } pub struct StorageStage { @@ -49,7 +49,7 @@ const NUM_SAMPLES: usize = 4; pub const ENTRIES_PER_SLICE: u64 = 16; const KEY_SIZE: usize = 64; -fn get_identity_index_from_pubkey(key: &Pubkey) -> usize { +fn get_identity_index_from_signature(key: &Signature) -> usize { let rkey = key.as_ref(); let mut res: usize = (rkey[0] as usize) | ((rkey[1] as usize) << 8) @@ -67,18 +67,23 @@ impl StorageState { StorageState { storage_keys, storage_results, + last_id: Hash::default(), } } - pub fn get_mining_key(&self, key: &Pubkey) -> Vec { - let idx = get_identity_index_from_pubkey(key); + pub fn get_mining_key(&self, key: &Signature) -> Vec { + let idx = get_identity_index_from_signature(key); self.storage_keys.read().unwrap()[idx..idx + KEY_SIZE].to_vec() } - pub fn get_mining_result(&self, key: &Pubkey) -> Hash { - let idx = get_identity_index_from_pubkey(key); + pub fn get_mining_result(&self, key: &Signature) -> Hash { + let idx = get_identity_index_from_signature(key); self.storage_results.read().unwrap()[idx] } + + pub fn get_last_id(&self) -> Hash { + self.last_id + } } impl StorageStage { @@ -267,7 +272,8 @@ mod tests { use rayon::prelude::*; use service::Service; use solana_sdk::hash::Hash; - use solana_sdk::signature::{Keypair, KeypairUtil}; + use solana_sdk::hash::Hasher; + use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; use solana_sdk::transaction::Transaction; use solana_sdk::vote_program::Vote; use solana_sdk::vote_transaction::VoteTransaction; @@ -280,7 +286,7 @@ mod tests { use std::time::Duration; use storage_stage::StorageState; use storage_stage::NUM_IDENTITIES; - use storage_stage::{get_identity_index_from_pubkey, StorageStage}; + use storage_stage::{get_identity_index_from_signature, StorageStage}; #[test] fn test_storage_stage_none_ledger() { @@ -335,14 +341,16 @@ mod tests { storage_entry_sender.send(entries.clone()).unwrap(); let keypair = Keypair::new(); - let mut result = storage_state.get_mining_result(&keypair.pubkey()); + let hash = Hash::default(); + let signature = Signature::new(keypair.sign(&hash.as_ref()).as_ref()); + let mut result = storage_state.get_mining_result(&signature); assert_eq!(result, Hash::default()); for _ in 0..9 { storage_entry_sender.send(entries.clone()).unwrap(); } for _ in 0..5 { - result = storage_state.get_mining_result(&keypair.pubkey()); + result = storage_state.get_mining_result(&signature); if result != Hash::default() { info!("found result = {:?} sleeping..", result); break; @@ -437,19 +445,22 @@ mod tests { } #[test] - fn test_pubkey_distribution() { - // See that pub keys have an even-ish distribution.. + fn test_signature_distribution() { + // See that signatures have an even-ish distribution.. let mut hist = Arc::new(vec![]); for _ in 0..NUM_IDENTITIES { Arc::get_mut(&mut hist).unwrap().push(AtomicUsize::new(0)); } + let hasher = Hasher::default(); { let hist = hist.clone(); (0..(32 * NUM_IDENTITIES)) .into_par_iter() .for_each(move |_| { let keypair = Keypair::new(); - let ix = get_identity_index_from_pubkey(&keypair.pubkey()); + let hash = hasher.clone().result(); + let signature = Signature::new(keypair.sign(&hash.as_ref()).as_ref()); + let ix = get_identity_index_from_signature(&signature); hist[ix].fetch_add(1, Ordering::Relaxed); }); } diff --git a/src/tvu.rs b/src/tvu.rs index 2602d7003f..2193f00526 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -24,7 +24,7 @@ use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; use std::thread; -use storage_stage::{StorageStage, StorageState}; +use storage_stage::StorageStage; #[derive(Debug, PartialEq, Eq, Clone)] pub enum TvuReturnType { @@ -105,9 +105,8 @@ impl Tvu { let (ledger_write_stage, storage_entry_receiver) = LedgerWriteStage::new(ledger_path, ledger_entry_receiver); - let storage_state = StorageState::new(); let storage_stage = StorageStage::new( - &storage_state, + &bank.storage_state, storage_entry_receiver, ledger_path, keypair, diff --git a/tests/replicator.rs b/tests/replicator.rs index 936221bcc2..1994539c39 100644 --- a/tests/replicator.rs +++ b/tests/replicator.rs @@ -4,7 +4,7 @@ extern crate solana; extern crate solana_sdk; use solana::client::mk_client; -use solana::cluster_info::Node; +use solana::cluster_info::{Node, NodeInfo}; use solana::db_ledger::DbLedger; use solana::fullnode::Fullnode; use solana::leader_scheduler::LeaderScheduler; @@ -13,7 +13,6 @@ use solana::logger; use solana::replicator::Replicator; use solana_sdk::signature::{Keypair, KeypairUtil}; use std::fs::remove_dir_all; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::thread::sleep; use std::time::Duration; @@ -22,16 +21,11 @@ use std::time::Duration; fn test_replicator_startup() { logger::setup(); info!("starting replicator test"); - let entry_height = 0; let replicator_ledger_path = &get_tmp_ledger_path("replicator_test_replicator_ledger"); - let exit = Arc::new(AtomicBool::new(false)); - let done = Arc::new(AtomicBool::new(false)); - info!("starting leader node"); let leader_keypair = Arc::new(Keypair::new()); let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); - let network_addr = leader_node.sockets.gossip.local_addr().unwrap(); let leader_info = leader_node.info.clone(); let vote_account_keypair = Arc::new(Keypair::new()); @@ -61,17 +55,21 @@ fn test_replicator_startup() { let replicator_keypair = Keypair::new(); + leader_client + .transfer(1, &mint.keypair(), replicator_keypair.pubkey(), &last_id) + .unwrap(); + info!("starting replicator node"); let replicator_node = Node::new_localhost_with_pubkey(replicator_keypair.pubkey()); - let (replicator, _leader_info) = Replicator::new( - entry_height, - 1, - &exit, + + let leader_info = NodeInfo::new_entry_point(&leader_info.gossip); + + let replicator = Replicator::new( Some(replicator_ledger_path), replicator_node, - Some(network_addr), - done.clone(), - ); + &leader_info, + &replicator_keypair, + ).unwrap(); let mut num_entries = 0; for _ in 0..60 { @@ -95,10 +93,8 @@ fn test_replicator_startup() { .transfer(1, &mint.keypair(), bob.pubkey(), &last_id) .unwrap(); } - assert_eq!(done.load(Ordering::Relaxed), true); assert!(num_entries > 0); - exit.store(true, Ordering::Relaxed); - replicator.join(); + replicator.close(); leader.exit(); }