Replicator rework

* Move more of the replicator logic into the replicator class
* Add support for the RPC interface to query the storage last_id value
  that the replicator would sign and use to pick a block.
* Fix replicator connecting to gossip and change test to exercise that
  scenario.
This commit is contained in:
Stephen Akridge 2018-11-02 08:40:29 -07:00 committed by sakridge
parent fa288ab197
commit 3441d3399b
8 changed files with 212 additions and 152 deletions

View File

@ -42,6 +42,7 @@ use std::result;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Instant;
use storage_stage::StorageState;
use tokio::prelude::Future;
/// The number of most recent `last_id` values that the bank will track the signatures
@ -301,6 +302,8 @@ pub struct Bank {
/// Tracks and updates the leader schedule based on the votes and account stakes
/// processed by the bank
pub leader_scheduler: Arc<RwLock<LeaderScheduler>>,
pub storage_state: StorageState,
}
impl Default for Bank {
@ -313,6 +316,7 @@ impl Default for Bank {
account_subscriptions: RwLock::new(HashMap::new()),
signature_subscriptions: RwLock::new(HashMap::new()),
leader_scheduler: Arc::new(RwLock::new(LeaderScheduler::default())),
storage_state: StorageState::new(),
}
}
}

View File

@ -8,25 +8,14 @@ extern crate solana_drone;
extern crate solana_sdk;
use clap::{App, Arg};
use solana::chacha::{chacha_cbc_encrypt_file, CHACHA_BLOCK_SIZE};
use solana::client::mk_client;
use solana::cluster_info::Node;
use solana::cluster_info::{Node, NodeInfo};
use solana::fullnode::Config;
use solana::ledger::LEDGER_DATA_FILE;
use solana::logger;
use solana::replicator::{sample_file, Replicator};
use solana_drone::drone::{request_airdrop_transaction, DRONE_PORT};
use solana::replicator::Replicator;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::storage_program::StorageTransaction;
use solana_sdk::transaction::Transaction;
use std::fs::File;
use std::net::{Ipv4Addr, SocketAddr};
use std::path::Path;
use std::process::exit;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;
fn main() {
logger::setup();
@ -85,70 +74,14 @@ fn main() {
gossip
);
let exit = Arc::new(AtomicBool::new(false));
let done = Arc::new(AtomicBool::new(false));
let network_addr = matches
.value_of("network")
.map(|network| network.parse().expect("failed to parse network address"));
.map(|network| network.parse().expect("failed to parse network address"))
.unwrap();
// TODO: ask network what slice we should store
let entry_height = 0;
let leader_info = NodeInfo::new_entry_point(&network_addr);
let (replicator, leader_info) = Replicator::new(
entry_height,
5,
&exit,
ledger_path,
node,
network_addr,
done.clone(),
);
while !done.load(Ordering::Relaxed) {
sleep(Duration::from_millis(100));
}
println!("Done downloading ledger");
let ledger_path = Path::new(ledger_path.unwrap());
let ledger_data_file = ledger_path.join(LEDGER_DATA_FILE);
let ledger_data_file_encrypted = ledger_path.join(format!("{}.enc", LEDGER_DATA_FILE));
let mut ivec = [0u8; CHACHA_BLOCK_SIZE];
ivec[0..4].copy_from_slice(&[2, 3, 4, 5]);
if let Err(e) =
chacha_cbc_encrypt_file(&ledger_data_file, &ledger_data_file_encrypted, &mut ivec)
{
println!("Error while encrypting ledger: {:?}", e);
return;
}
println!("Done encrypting the ledger");
let sampling_offsets = [0, 1, 2, 3];
let mut client = mk_client(&leader_info);
let mut drone_addr = leader_info.tpu;
drone_addr.set_port(DRONE_PORT);
let airdrop_amount = 5;
let last_id = client.get_last_id();
let transaction =
request_airdrop_transaction(&drone_addr, &keypair.pubkey(), airdrop_amount, last_id)
.unwrap();
let signature = client.transfer_signed(&transaction).unwrap();
client.poll_for_signature(&signature).unwrap();
match sample_file(&ledger_data_file_encrypted, &sampling_offsets) {
Ok(hash) => {
let last_id = client.get_last_id();
println!("sampled hash: {}", hash);
let tx = Transaction::storage_new_mining_proof(&keypair, hash, last_id);
client.transfer_signed(&tx).expect("transfer didn't work!");
}
Err(e) => println!("Error occurred while sampling: {:?}", e),
}
let replicator = Replicator::new(ledger_path, node, &leader_info, &keypair).unwrap();
replicator.join();
}

View File

@ -1,10 +1,22 @@
use blob_fetch_stage::BlobFetchStage;
#[cfg(feature = "chacha")]
use chacha::{chacha_cbc_encrypt_file, CHACHA_BLOCK_SIZE};
use client::mk_client;
use cluster_info::{ClusterInfo, Node, NodeInfo};
use db_ledger::DbLedger;
use gossip_service::GossipService;
use leader_scheduler::LeaderScheduler;
use ledger::LEDGER_DATA_FILE;
use rand::thread_rng;
use rand::Rng;
use result::Result;
use rpc_request::{RpcClient, RpcRequest};
use service::Service;
use solana_drone::drone::{request_airdrop_transaction, DRONE_PORT};
use solana_sdk::hash::{Hash, Hasher};
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::storage_program::StorageTransaction;
use solana_sdk::transaction::Transaction;
use std::fs::File;
use std::io;
use std::io::BufReader;
@ -13,17 +25,16 @@ use std::io::Seek;
use std::io::SeekFrom;
use std::io::{Error, ErrorKind};
use std::mem::size_of;
use std::net::SocketAddr;
use std::net::UdpSocket;
use std::path::Path;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::thread::JoinHandle;
use std::time::Duration;
use store_ledger_stage::StoreLedgerStage;
use streamer::BlobReceiver;
use thin_client::poll_gossip_for_leader;
use window;
use window_service::window_service;
@ -33,6 +44,7 @@ pub struct Replicator {
store_ledger_stage: StoreLedgerStage,
t_window: JoinHandle<()>,
pub retransmit_receiver: BlobReceiver,
exit: Arc<AtomicBool>,
}
pub fn sample_file(in_path: &Path, sample_offsets: &[u64]) -> io::Result<Hash> {
@ -72,39 +84,34 @@ pub fn sample_file(in_path: &Path, sample_offsets: &[u64]) -> io::Result<Hash> {
impl Replicator {
pub fn new(
entry_height: u64,
max_entry_height: u64,
exit: &Arc<AtomicBool>,
ledger_path: Option<&str>,
node: Node,
network_addr: Option<SocketAddr>,
done: Arc<AtomicBool>,
) -> (Replicator, NodeInfo) {
leader_info: &NodeInfo,
keypair: &Keypair,
) -> Result<Self> {
let exit = Arc::new(AtomicBool::new(false));
let done = Arc::new(AtomicBool::new(false));
let entry_height = 0;
let max_entry_height = 1;
const REPLICATOR_WINDOW_SIZE: usize = 32 * 1024;
let window = window::new_window(REPLICATOR_WINDOW_SIZE);
let shared_window = Arc::new(RwLock::new(window));
info!("Replicator: id: {}", keypair.pubkey());
info!("Creating cluster info....");
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(node.info)));
let leader_info = network_addr.map(|i| NodeInfo::new_entry_point(&i));
let leader_pubkey;
if let Some(leader_info) = leader_info {
leader_pubkey = leader_info.id;
cluster_info.write().unwrap().insert_info(leader_info);
} else {
panic!("No leader info!");
let leader_pubkey = leader_info.id;
{
let mut cluster_info_w = cluster_info.write().unwrap();
cluster_info_w.insert_info(leader_info.clone());
cluster_info_w.set_leader(leader_info.id);
}
let repair_socket = Arc::new(node.sockets.repair);
let mut blob_sockets: Vec<Arc<UdpSocket>> =
node.sockets.replicate.into_iter().map(Arc::new).collect();
blob_sockets.push(repair_socket.clone());
let (fetch_stage, blob_fetch_receiver) =
BlobFetchStage::new_multi_socket(blob_sockets, exit.clone());
let (entry_window_sender, entry_window_receiver) = channel();
// todo: pull blobs off the retransmit_receiver and recycle them?
let (retransmit_sender, retransmit_receiver) = channel();
let store_ledger_stage = StoreLedgerStage::new(entry_window_receiver, ledger_path);
// Create the RocksDb ledger, eventually will simply repurpose the input
// ledger path as the RocksDb ledger path once we replace the ledger with
@ -116,6 +123,53 @@ impl Replicator {
.expect("Expected to be able to open database ledger"),
));
let gossip_service = GossipService::new(
&cluster_info,
shared_window.clone(),
ledger_path,
node.sockets.gossip,
exit.clone(),
);
info!("polling for leader");
let leader;
loop {
if let Some(l) = cluster_info.read().unwrap().get_gossip_top_leader() {
leader = l.clone();
break;
}
sleep(Duration::from_millis(900));
info!("{}", cluster_info.read().unwrap().node_info_trace());
}
info!("Got leader: {:?}", leader);
let rpc_client = {
let cluster_info = cluster_info.read().unwrap();
let rpc_peers = cluster_info.rpc_peers();
info!("rpc peers: {:?}", rpc_peers);
let node_idx = thread_rng().gen_range(0, rpc_peers.len());
RpcClient::new_from_socket(rpc_peers[node_idx].rpc)
};
let storage_last_id = RpcRequest::GetStorageMiningLastId
.make_rpc_request(&rpc_client, 2, None)
.expect("rpc request")
.to_string();
let _signature = keypair.sign(storage_last_id.as_ref());
// TODO: use this signature to pick the key and block
let repair_socket = Arc::new(node.sockets.repair);
let mut blob_sockets: Vec<Arc<UdpSocket>> =
node.sockets.replicate.into_iter().map(Arc::new).collect();
blob_sockets.push(repair_socket.clone());
let (fetch_stage, blob_fetch_receiver) =
BlobFetchStage::new_multi_socket(blob_sockets, exit.clone());
// todo: pull blobs off the retransmit_receiver and recycle them?
let (retransmit_sender, retransmit_receiver) = channel();
let t_window = window_service(
db_ledger,
cluster_info.clone(),
@ -129,32 +183,82 @@ impl Replicator {
Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(
leader_pubkey,
))),
done,
done.clone(),
);
let store_ledger_stage = StoreLedgerStage::new(entry_window_receiver, ledger_path);
info!("window created, waiting for ledger download done");
while !done.load(Ordering::Relaxed) {
sleep(Duration::from_millis(100));
}
let gossip_service = GossipService::new(
&cluster_info,
shared_window.clone(),
ledger_path,
node.sockets.gossip,
exit.clone(),
);
let mut client = mk_client(&leader);
let leader =
poll_gossip_for_leader(network_addr.unwrap(), Some(10)).expect("couldn't reach leader");
if client.get_balance(&keypair.pubkey()).is_err() {
let mut drone_addr = leader_info.tpu;
drone_addr.set_port(DRONE_PORT);
(
Replicator {
gossip_service,
fetch_stage,
store_ledger_stage,
t_window,
retransmit_receiver,
},
leader,
)
let airdrop_amount = 1;
let last_id = client.get_last_id();
match request_airdrop_transaction(
&drone_addr,
&keypair.pubkey(),
airdrop_amount,
last_id,
) {
Ok(transaction) => {
let signature = client.transfer_signed(&transaction).unwrap();
client.poll_for_signature(&signature).unwrap();
}
Err(err) => {
panic!(
"Error requesting airdrop: {:?} to addr: {:?} amount: {}",
err, drone_addr, airdrop_amount
);
}
};
}
info!("Done downloading ledger at {}", ledger_path.unwrap());
let ledger_path = Path::new(ledger_path.unwrap());
let ledger_data_file_encrypted = ledger_path.join(format!("{}.enc", LEDGER_DATA_FILE));
#[cfg(feature = "chacha")]
{
let ledger_data_file = ledger_path.join(LEDGER_DATA_FILE);
let mut ivec = [0u8; CHACHA_BLOCK_SIZE];
ivec[0..4].copy_from_slice(&[2, 3, 4, 5]);
chacha_cbc_encrypt_file(&ledger_data_file, &ledger_data_file_encrypted, &mut ivec)?;
}
info!("Done encrypting the ledger");
let sampling_offsets = [0, 1, 2, 3];
match sample_file(&ledger_data_file_encrypted, &sampling_offsets) {
Ok(hash) => {
let last_id = client.get_last_id();
info!("sampled hash: {}", hash);
let tx = Transaction::storage_new_mining_proof(&keypair, hash, last_id);
client.transfer_signed(&tx).expect("transfer didn't work!");
}
Err(e) => info!("Error occurred while sampling: {:?}", e),
}
Ok(Replicator {
gossip_service,
fetch_stage,
store_ledger_stage,
t_window,
retransmit_receiver,
exit,
})
}
pub fn close(self) {
self.exit.store(true, Ordering::Relaxed);
self.join()
}
pub fn join(self) {

View File

@ -153,6 +153,9 @@ build_rpc_trait! {
#[rpc(meta, name = "sendTransaction")]
fn send_transaction(&self, Self::Metadata, Vec<u8>) -> Result<String>;
#[rpc(meta, name = "getStorageMiningLastId")]
fn get_storage_mining_last_id(&self, Self::Metadata) -> Result<String>;
}
}
@ -279,6 +282,9 @@ impl RpcSol for RpcSolImpl {
);
Ok(signature)
}
fn get_storage_mining_last_id(&self, meta: Self::Metadata) -> Result<String> {
meta.request_processor.get_storage_mining_last_id()
}
}
#[derive(Clone)]
pub struct JsonRpcRequestProcessor {
@ -313,6 +319,10 @@ impl JsonRpcRequestProcessor {
fn get_transaction_count(&self) -> Result<u64> {
Ok(self.bank.transaction_count() as u64)
}
fn get_storage_mining_last_id(&self) -> Result<String> {
let id = self.bank.storage_state.get_last_id();
Ok(bs58::encode(id).into_string())
}
}
fn get_leader_addr(cluster_info: &Arc<RwLock<ClusterInfo>>) -> Result<SocketAddr> {
@ -391,6 +401,7 @@ mod tests {
let request_processor = JsonRpcRequestProcessor::new(Arc::new(bank));
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(NodeInfo::default())));
let leader = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
cluster_info.write().unwrap().insert_info(leader.clone());
cluster_info.write().unwrap().set_leader(leader.id);
let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);

View File

@ -53,6 +53,7 @@ pub enum RpcRequest {
RegisterNode,
SignVote,
DeregisterNode,
GetStorageMiningLastId,
}
impl RpcRequest {
@ -95,6 +96,7 @@ impl RpcRequest {
RpcRequest::RegisterNode => "registerNode",
RpcRequest::SignVote => "signVote",
RpcRequest::DeregisterNode => "deregisterNode",
RpcRequest::GetStorageMiningLastId => "getStorageMiningLastId",
};
let mut request = json!({
"jsonrpc": jsonrpc,

View File

@ -10,7 +10,6 @@ use rand_chacha::ChaChaRng;
use result::{Error, Result};
use service::Service;
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Keypair;
use solana_sdk::signature::Signature;
use solana_sdk::vote_program;
@ -30,6 +29,7 @@ type StorageKeys = Vec<u8>;
pub struct StorageState {
storage_results: Arc<RwLock<StorageResults>>,
storage_keys: Arc<RwLock<StorageKeys>>,
last_id: Hash,
}
pub struct StorageStage {
@ -49,7 +49,7 @@ const NUM_SAMPLES: usize = 4;
pub const ENTRIES_PER_SLICE: u64 = 16;
const KEY_SIZE: usize = 64;
fn get_identity_index_from_pubkey(key: &Pubkey) -> usize {
fn get_identity_index_from_signature(key: &Signature) -> usize {
let rkey = key.as_ref();
let mut res: usize = (rkey[0] as usize)
| ((rkey[1] as usize) << 8)
@ -67,18 +67,23 @@ impl StorageState {
StorageState {
storage_keys,
storage_results,
last_id: Hash::default(),
}
}
pub fn get_mining_key(&self, key: &Pubkey) -> Vec<u8> {
let idx = get_identity_index_from_pubkey(key);
pub fn get_mining_key(&self, key: &Signature) -> Vec<u8> {
let idx = get_identity_index_from_signature(key);
self.storage_keys.read().unwrap()[idx..idx + KEY_SIZE].to_vec()
}
pub fn get_mining_result(&self, key: &Pubkey) -> Hash {
let idx = get_identity_index_from_pubkey(key);
pub fn get_mining_result(&self, key: &Signature) -> Hash {
let idx = get_identity_index_from_signature(key);
self.storage_results.read().unwrap()[idx]
}
pub fn get_last_id(&self) -> Hash {
self.last_id
}
}
impl StorageStage {
@ -267,7 +272,8 @@ mod tests {
use rayon::prelude::*;
use service::Service;
use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::hash::Hasher;
use solana_sdk::signature::{Keypair, KeypairUtil, Signature};
use solana_sdk::transaction::Transaction;
use solana_sdk::vote_program::Vote;
use solana_sdk::vote_transaction::VoteTransaction;
@ -280,7 +286,7 @@ mod tests {
use std::time::Duration;
use storage_stage::StorageState;
use storage_stage::NUM_IDENTITIES;
use storage_stage::{get_identity_index_from_pubkey, StorageStage};
use storage_stage::{get_identity_index_from_signature, StorageStage};
#[test]
fn test_storage_stage_none_ledger() {
@ -335,14 +341,16 @@ mod tests {
storage_entry_sender.send(entries.clone()).unwrap();
let keypair = Keypair::new();
let mut result = storage_state.get_mining_result(&keypair.pubkey());
let hash = Hash::default();
let signature = Signature::new(keypair.sign(&hash.as_ref()).as_ref());
let mut result = storage_state.get_mining_result(&signature);
assert_eq!(result, Hash::default());
for _ in 0..9 {
storage_entry_sender.send(entries.clone()).unwrap();
}
for _ in 0..5 {
result = storage_state.get_mining_result(&keypair.pubkey());
result = storage_state.get_mining_result(&signature);
if result != Hash::default() {
info!("found result = {:?} sleeping..", result);
break;
@ -437,19 +445,22 @@ mod tests {
}
#[test]
fn test_pubkey_distribution() {
// See that pub keys have an even-ish distribution..
fn test_signature_distribution() {
// See that signatures have an even-ish distribution..
let mut hist = Arc::new(vec![]);
for _ in 0..NUM_IDENTITIES {
Arc::get_mut(&mut hist).unwrap().push(AtomicUsize::new(0));
}
let hasher = Hasher::default();
{
let hist = hist.clone();
(0..(32 * NUM_IDENTITIES))
.into_par_iter()
.for_each(move |_| {
let keypair = Keypair::new();
let ix = get_identity_index_from_pubkey(&keypair.pubkey());
let hash = hasher.clone().result();
let signature = Signature::new(keypair.sign(&hash.as_ref()).as_ref());
let ix = get_identity_index_from_signature(&signature);
hist[ix].fetch_add(1, Ordering::Relaxed);
});
}

View File

@ -24,7 +24,7 @@ use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread;
use storage_stage::{StorageStage, StorageState};
use storage_stage::StorageStage;
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum TvuReturnType {
@ -105,9 +105,8 @@ impl Tvu {
let (ledger_write_stage, storage_entry_receiver) =
LedgerWriteStage::new(ledger_path, ledger_entry_receiver);
let storage_state = StorageState::new();
let storage_stage = StorageStage::new(
&storage_state,
&bank.storage_state,
storage_entry_receiver,
ledger_path,
keypair,

View File

@ -4,7 +4,7 @@ extern crate solana;
extern crate solana_sdk;
use solana::client::mk_client;
use solana::cluster_info::Node;
use solana::cluster_info::{Node, NodeInfo};
use solana::db_ledger::DbLedger;
use solana::fullnode::Fullnode;
use solana::leader_scheduler::LeaderScheduler;
@ -13,7 +13,6 @@ use solana::logger;
use solana::replicator::Replicator;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::fs::remove_dir_all;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;
@ -22,16 +21,11 @@ use std::time::Duration;
fn test_replicator_startup() {
logger::setup();
info!("starting replicator test");
let entry_height = 0;
let replicator_ledger_path = &get_tmp_ledger_path("replicator_test_replicator_ledger");
let exit = Arc::new(AtomicBool::new(false));
let done = Arc::new(AtomicBool::new(false));
info!("starting leader node");
let leader_keypair = Arc::new(Keypair::new());
let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let network_addr = leader_node.sockets.gossip.local_addr().unwrap();
let leader_info = leader_node.info.clone();
let vote_account_keypair = Arc::new(Keypair::new());
@ -61,17 +55,21 @@ fn test_replicator_startup() {
let replicator_keypair = Keypair::new();
leader_client
.transfer(1, &mint.keypair(), replicator_keypair.pubkey(), &last_id)
.unwrap();
info!("starting replicator node");
let replicator_node = Node::new_localhost_with_pubkey(replicator_keypair.pubkey());
let (replicator, _leader_info) = Replicator::new(
entry_height,
1,
&exit,
let leader_info = NodeInfo::new_entry_point(&leader_info.gossip);
let replicator = Replicator::new(
Some(replicator_ledger_path),
replicator_node,
Some(network_addr),
done.clone(),
);
&leader_info,
&replicator_keypair,
).unwrap();
let mut num_entries = 0;
for _ in 0..60 {
@ -95,10 +93,8 @@ fn test_replicator_startup() {
.transfer(1, &mint.keypair(), bob.pubkey(), &last_id)
.unwrap();
}
assert_eq!(done.load(Ordering::Relaxed), true);
assert!(num_entries > 0);
exit.store(true, Ordering::Relaxed);
replicator.join();
replicator.close();
leader.exit();
}