Add storage stage which does storage mining verification for validators
This commit is contained in:
parent
47f1fa3f2e
commit
cf00354f42
|
@ -83,7 +83,6 @@ fn main() {
|
||||||
keypair.pubkey(),
|
keypair.pubkey(),
|
||||||
ncp
|
ncp
|
||||||
);
|
);
|
||||||
println!("my node: {:?}", node);
|
|
||||||
|
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
let done = Arc::new(AtomicBool::new(false));
|
let done = Arc::new(AtomicBool::new(false));
|
||||||
|
|
|
@ -5,7 +5,7 @@ use sigverify::{chacha_cbc_encrypt_many_sample, chacha_end_sha_state, chacha_ini
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::mem::size_of;
|
use std::mem::size_of;
|
||||||
|
|
||||||
const ENTRIES_PER_SLICE: u64 = 16;
|
use storage_stage::ENTRIES_PER_SLICE;
|
||||||
|
|
||||||
// Encrypt a file with multiple starting IV states, determined by ivecs.len()
|
// Encrypt a file with multiple starting IV states, determined by ivecs.len()
|
||||||
//
|
//
|
||||||
|
@ -89,7 +89,6 @@ pub fn chacha_cbc_encrypt_file_many_keys(
|
||||||
num_keys as u32,
|
num_keys as u32,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
info!("num_keys: {}", num_keys);
|
|
||||||
let mut res = Vec::new();
|
let mut res = Vec::new();
|
||||||
for x in 0..num_keys {
|
for x in 0..num_keys {
|
||||||
let start = x * size_of::<Hash>();
|
let start = x * size_of::<Hash>();
|
||||||
|
|
|
@ -66,6 +66,7 @@ pub mod signature;
|
||||||
pub mod sigverify;
|
pub mod sigverify;
|
||||||
pub mod sigverify_stage;
|
pub mod sigverify_stage;
|
||||||
pub mod storage_program;
|
pub mod storage_program;
|
||||||
|
pub mod storage_stage;
|
||||||
pub mod storage_transaction;
|
pub mod storage_transaction;
|
||||||
pub mod store_ledger_stage;
|
pub mod store_ledger_stage;
|
||||||
pub mod streamer;
|
pub mod streamer;
|
||||||
|
|
|
@ -0,0 +1,446 @@
|
||||||
|
// A stage that handles generating the keys used to encrypt the ledger and sample it
|
||||||
|
// for storage mining. Replicators submit storage proofs, validator then bundles them
|
||||||
|
// to submit it's proof for mining to be rewarded.
|
||||||
|
|
||||||
|
#[cfg(feature = "cuda")]
|
||||||
|
use chacha_cuda::chacha_cbc_encrypt_file_many_keys;
|
||||||
|
use entry::EntryReceiver;
|
||||||
|
use hash::Hash;
|
||||||
|
use rand::{ChaChaRng, Rng, SeedableRng};
|
||||||
|
use result::{Error, Result};
|
||||||
|
use service::Service;
|
||||||
|
use signature::Keypair;
|
||||||
|
use signature::Signature;
|
||||||
|
use solana_sdk::pubkey::Pubkey;
|
||||||
|
use std::mem::size_of;
|
||||||
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
|
use std::sync::mpsc::RecvTimeoutError;
|
||||||
|
use std::sync::{Arc, RwLock};
|
||||||
|
use std::thread::{self, Builder, JoinHandle};
|
||||||
|
use std::time::Duration;
|
||||||
|
use vote_program::VoteProgram;
|
||||||
|
|
||||||
|
// Block of hash answers to validate against
|
||||||
|
// Vec of [ledger blocks] x [keys]
|
||||||
|
type StorageResults = Vec<Hash>;
|
||||||
|
type StorageKeys = Vec<u8>;
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct StorageState {
|
||||||
|
storage_results: Arc<RwLock<StorageResults>>,
|
||||||
|
storage_keys: Arc<RwLock<StorageKeys>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct StorageStage {
|
||||||
|
t_storage_mining_verifier: JoinHandle<()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
macro_rules! cross_boundary {
|
||||||
|
($start:expr, $len:expr, $boundary:expr) => {
|
||||||
|
(($start + $len) & !($boundary - 1)) > $start & !($boundary - 1)
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
const NUM_HASHES_FOR_STORAGE_ROTATE: u64 = 1024;
|
||||||
|
// TODO: some way to dynamically size NUM_IDENTITIES
|
||||||
|
const NUM_IDENTITIES: usize = 1024;
|
||||||
|
const NUM_SAMPLES: usize = 4;
|
||||||
|
pub const ENTRIES_PER_SLICE: u64 = 16;
|
||||||
|
const KEY_SIZE: usize = 64;
|
||||||
|
|
||||||
|
fn get_identity_index_from_pubkey(key: &Pubkey) -> usize {
|
||||||
|
let rkey = key.as_ref();
|
||||||
|
let mut res: usize = (rkey[0] as usize)
|
||||||
|
| ((rkey[1] as usize) << 8)
|
||||||
|
| ((rkey[2] as usize) << 16)
|
||||||
|
| ((rkey[3] as usize) << 24);
|
||||||
|
res &= NUM_IDENTITIES - 1;
|
||||||
|
res
|
||||||
|
}
|
||||||
|
|
||||||
|
impl StorageState {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
let storage_keys = Arc::new(RwLock::new(vec![0u8; KEY_SIZE * NUM_IDENTITIES]));
|
||||||
|
let storage_results = Arc::new(RwLock::new(vec![Hash::default(); NUM_IDENTITIES]));
|
||||||
|
|
||||||
|
StorageState {
|
||||||
|
storage_keys,
|
||||||
|
storage_results,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_mining_key(&self, key: &Pubkey) -> Vec<u8> {
|
||||||
|
let idx = get_identity_index_from_pubkey(key);
|
||||||
|
self.storage_keys.read().unwrap()[idx..idx + KEY_SIZE].to_vec()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_mining_result(&self, key: &Pubkey) -> Hash {
|
||||||
|
let idx = get_identity_index_from_pubkey(key);
|
||||||
|
self.storage_results.read().unwrap()[idx]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl StorageStage {
|
||||||
|
pub fn new(
|
||||||
|
storage_state: &StorageState,
|
||||||
|
storage_entry_receiver: EntryReceiver,
|
||||||
|
ledger_path: Option<&str>,
|
||||||
|
keypair: Arc<Keypair>,
|
||||||
|
exit: Arc<AtomicBool>,
|
||||||
|
entry_height: u64,
|
||||||
|
) -> Self {
|
||||||
|
let storage_keys_ = storage_state.storage_keys.clone();
|
||||||
|
let storage_results_ = storage_state.storage_results.clone();
|
||||||
|
let ledger_path = ledger_path.map(String::from);
|
||||||
|
let t_storage_mining_verifier = Builder::new()
|
||||||
|
.name("solana-storage-mining-verify-stage".to_string())
|
||||||
|
.spawn(move || {
|
||||||
|
let exit = exit.clone();
|
||||||
|
let mut poh_height = 0;
|
||||||
|
let mut current_key = 0;
|
||||||
|
let mut entry_height = entry_height;
|
||||||
|
loop {
|
||||||
|
if let Some(ref ledger_path_str) = ledger_path {
|
||||||
|
if let Err(e) = Self::process_entries(
|
||||||
|
&keypair,
|
||||||
|
&storage_keys_,
|
||||||
|
&storage_results_,
|
||||||
|
&storage_entry_receiver,
|
||||||
|
ledger_path_str,
|
||||||
|
&mut poh_height,
|
||||||
|
&mut entry_height,
|
||||||
|
&mut current_key,
|
||||||
|
) {
|
||||||
|
match e {
|
||||||
|
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
||||||
|
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
||||||
|
_ => info!("Error from process_entries: {:?}", e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if exit.load(Ordering::Relaxed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}).unwrap();
|
||||||
|
|
||||||
|
StorageStage {
|
||||||
|
t_storage_mining_verifier,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn process_entry_crossing(
|
||||||
|
_storage_results: &Arc<RwLock<StorageResults>>,
|
||||||
|
_storage_keys: &Arc<RwLock<StorageKeys>>,
|
||||||
|
keypair: &Arc<Keypair>,
|
||||||
|
_ledger_path: &str,
|
||||||
|
entry_id: Hash,
|
||||||
|
entry_height: u64,
|
||||||
|
) -> Result<()> {
|
||||||
|
let mut seed = [0u8; 32];
|
||||||
|
let signature = keypair.sign(&entry_id.as_ref());
|
||||||
|
|
||||||
|
seed.copy_from_slice(&signature.as_ref()[..32]);
|
||||||
|
|
||||||
|
let mut rng = ChaChaRng::from_seed(seed);
|
||||||
|
|
||||||
|
// Regenerate the answers
|
||||||
|
let num_slices = (entry_height / ENTRIES_PER_SLICE) as usize;
|
||||||
|
if num_slices == 0 {
|
||||||
|
info!("Ledger has 0 slices!");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
// TODO: what if the validator does not have this slice
|
||||||
|
let slice = signature.as_ref()[0] as usize % num_slices;
|
||||||
|
|
||||||
|
debug!(
|
||||||
|
"storage verifying: slice: {} identities: {}",
|
||||||
|
slice, NUM_IDENTITIES,
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut samples = vec![];
|
||||||
|
for _ in 0..NUM_SAMPLES {
|
||||||
|
samples.push(rng.gen_range(0, 10));
|
||||||
|
}
|
||||||
|
debug!("generated samples: {:?}", samples);
|
||||||
|
// TODO: cuda required to generate the reference values
|
||||||
|
// but if it is missing, then we need to take care not to
|
||||||
|
// process storage mining results.
|
||||||
|
#[cfg(feature = "cuda")]
|
||||||
|
{
|
||||||
|
let mut storage_results = _storage_results.write().unwrap();
|
||||||
|
|
||||||
|
// Lock the keys, since this is the IV memory,
|
||||||
|
// it will be updated in-place by the encryption.
|
||||||
|
// Should be overwritten by the vote signatures which replace the
|
||||||
|
// key values by the time it runs again.
|
||||||
|
let mut storage_keys = _storage_keys.write().unwrap();
|
||||||
|
|
||||||
|
match chacha_cbc_encrypt_file_many_keys(
|
||||||
|
_ledger_path,
|
||||||
|
slice as u64,
|
||||||
|
&mut storage_keys,
|
||||||
|
&samples,
|
||||||
|
) {
|
||||||
|
Ok(hashes) => {
|
||||||
|
debug!("Success! encrypted ledger slice: {}", slice);
|
||||||
|
storage_results.copy_from_slice(&hashes);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
info!("error encrypting file: {:?}", e);
|
||||||
|
Err(e)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// TODO: bundle up mining submissions from replicators
|
||||||
|
// and submit them in a tx to the leader to get reward.
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn process_entries(
|
||||||
|
keypair: &Arc<Keypair>,
|
||||||
|
storage_keys: &Arc<RwLock<StorageKeys>>,
|
||||||
|
storage_results: &Arc<RwLock<StorageResults>>,
|
||||||
|
entry_receiver: &EntryReceiver,
|
||||||
|
ledger_path: &str,
|
||||||
|
poh_height: &mut u64,
|
||||||
|
entry_height: &mut u64,
|
||||||
|
current_key_idx: &mut usize,
|
||||||
|
) -> Result<()> {
|
||||||
|
let timeout = Duration::new(1, 0);
|
||||||
|
let entries = entry_receiver.recv_timeout(timeout)?;
|
||||||
|
|
||||||
|
for entry in entries {
|
||||||
|
// Go through the transactions, find votes, and use them to update
|
||||||
|
// the storage_keys with their signatures.
|
||||||
|
for tx in entry.transactions {
|
||||||
|
for program_id in tx.program_ids {
|
||||||
|
if VoteProgram::check_id(&program_id) {
|
||||||
|
debug!(
|
||||||
|
"generating storage_keys from votes current_key_idx: {}",
|
||||||
|
*current_key_idx
|
||||||
|
);
|
||||||
|
let mut storage_keys = storage_keys.write().unwrap();
|
||||||
|
storage_keys[*current_key_idx..*current_key_idx + size_of::<Signature>()]
|
||||||
|
.copy_from_slice(tx.signature.as_ref());
|
||||||
|
*current_key_idx += size_of::<Signature>();
|
||||||
|
*current_key_idx %= storage_keys.len();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if cross_boundary!(*poh_height, entry.num_hashes, NUM_HASHES_FOR_STORAGE_ROTATE) {
|
||||||
|
info!(
|
||||||
|
"crosses sending at poh_height: {} entry_height: {}! hashes: {}",
|
||||||
|
*poh_height, entry_height, entry.num_hashes
|
||||||
|
);
|
||||||
|
Self::process_entry_crossing(
|
||||||
|
&storage_results,
|
||||||
|
&storage_keys,
|
||||||
|
&keypair,
|
||||||
|
&ledger_path,
|
||||||
|
entry.id,
|
||||||
|
*entry_height,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
*entry_height += 1;
|
||||||
|
*poh_height += entry.num_hashes;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Service for StorageStage {
|
||||||
|
type JoinReturnType = ();
|
||||||
|
|
||||||
|
fn join(self) -> thread::Result<()> {
|
||||||
|
self.t_storage_mining_verifier.join()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use entry::Entry;
|
||||||
|
use hash::Hash;
|
||||||
|
use ledger::make_tiny_test_entries;
|
||||||
|
use ledger::{create_tmp_sample_ledger, LedgerWriter};
|
||||||
|
use logger;
|
||||||
|
use service::Service;
|
||||||
|
use signature::{Keypair, KeypairUtil};
|
||||||
|
use std::cmp::{max, min};
|
||||||
|
use std::fs::remove_dir_all;
|
||||||
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
|
use std::sync::mpsc::channel;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::thread::sleep;
|
||||||
|
use std::time::Duration;
|
||||||
|
use storage_stage::StorageState;
|
||||||
|
use storage_stage::NUM_IDENTITIES;
|
||||||
|
use storage_stage::{get_identity_index_from_pubkey, StorageStage};
|
||||||
|
use transaction::Transaction;
|
||||||
|
use vote_program::Vote;
|
||||||
|
use vote_transaction::VoteTransaction;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_storage_stage_none_ledger() {
|
||||||
|
let keypair = Arc::new(Keypair::new());
|
||||||
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
|
|
||||||
|
let (_storage_entry_sender, storage_entry_receiver) = channel();
|
||||||
|
let storage_state = StorageState::new();
|
||||||
|
let storage_stage = StorageStage::new(
|
||||||
|
&storage_state,
|
||||||
|
storage_entry_receiver,
|
||||||
|
None,
|
||||||
|
keypair,
|
||||||
|
exit.clone(),
|
||||||
|
0,
|
||||||
|
);
|
||||||
|
exit.store(true, Ordering::Relaxed);
|
||||||
|
storage_stage.join().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_storage_stage_process_entries() {
|
||||||
|
logger::setup();
|
||||||
|
let keypair = Arc::new(Keypair::new());
|
||||||
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
|
|
||||||
|
let (_mint, ledger_path, _genesis) =
|
||||||
|
create_tmp_sample_ledger("storage_stage_process_entries", 1000, 1);
|
||||||
|
|
||||||
|
let entries = make_tiny_test_entries(128);
|
||||||
|
{
|
||||||
|
let mut writer = LedgerWriter::open(&ledger_path, true).unwrap();
|
||||||
|
writer.write_entries(&entries.clone()).unwrap();
|
||||||
|
// drops writer, flushes buffers
|
||||||
|
}
|
||||||
|
|
||||||
|
let (storage_entry_sender, storage_entry_receiver) = channel();
|
||||||
|
let storage_state = StorageState::new();
|
||||||
|
let storage_stage = StorageStage::new(
|
||||||
|
&storage_state,
|
||||||
|
storage_entry_receiver,
|
||||||
|
Some(&ledger_path),
|
||||||
|
keypair,
|
||||||
|
exit.clone(),
|
||||||
|
0,
|
||||||
|
);
|
||||||
|
storage_entry_sender.send(entries.clone()).unwrap();
|
||||||
|
|
||||||
|
let keypair = Keypair::new();
|
||||||
|
let mut result = storage_state.get_mining_result(&keypair.pubkey());
|
||||||
|
assert_eq!(result, Hash::default());
|
||||||
|
|
||||||
|
for _ in 0..9 {
|
||||||
|
storage_entry_sender.send(entries.clone()).unwrap();
|
||||||
|
}
|
||||||
|
for _ in 0..5 {
|
||||||
|
result = storage_state.get_mining_result(&keypair.pubkey());
|
||||||
|
if result != Hash::default() {
|
||||||
|
info!("found result = {:?} sleeping..", result);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
info!("result = {:?} sleeping..", result);
|
||||||
|
sleep(Duration::new(1, 0));
|
||||||
|
}
|
||||||
|
|
||||||
|
info!("joining..?");
|
||||||
|
exit.store(true, Ordering::Relaxed);
|
||||||
|
storage_stage.join().unwrap();
|
||||||
|
|
||||||
|
#[cfg(not(feature = "cuda"))]
|
||||||
|
assert_eq!(result, Hash::default());
|
||||||
|
|
||||||
|
#[cfg(feature = "cuda")]
|
||||||
|
assert_ne!(result, Hash::default());
|
||||||
|
|
||||||
|
remove_dir_all(ledger_path).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_storage_stage_process_vote_entries() {
|
||||||
|
logger::setup();
|
||||||
|
let keypair = Arc::new(Keypair::new());
|
||||||
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
|
|
||||||
|
let (_mint, ledger_path, _genesis) =
|
||||||
|
create_tmp_sample_ledger("storage_stage_process_entries", 1000, 1);
|
||||||
|
|
||||||
|
let entries = make_tiny_test_entries(128);
|
||||||
|
{
|
||||||
|
let mut writer = LedgerWriter::open(&ledger_path, true).unwrap();
|
||||||
|
writer.write_entries(&entries.clone()).unwrap();
|
||||||
|
// drops writer, flushes buffers
|
||||||
|
}
|
||||||
|
|
||||||
|
let (storage_entry_sender, storage_entry_receiver) = channel();
|
||||||
|
let storage_state = StorageState::new();
|
||||||
|
let storage_stage = StorageStage::new(
|
||||||
|
&storage_state,
|
||||||
|
storage_entry_receiver,
|
||||||
|
Some(&ledger_path),
|
||||||
|
keypair,
|
||||||
|
exit.clone(),
|
||||||
|
0,
|
||||||
|
);
|
||||||
|
storage_entry_sender.send(entries.clone()).unwrap();
|
||||||
|
|
||||||
|
let mut reference_keys;
|
||||||
|
{
|
||||||
|
let keys = storage_state.storage_keys.read().unwrap();
|
||||||
|
reference_keys = vec![0; keys.len()];
|
||||||
|
reference_keys.copy_from_slice(&keys);
|
||||||
|
}
|
||||||
|
let mut vote_txs: Vec<Transaction> = Vec::new();
|
||||||
|
let vote = Vote {
|
||||||
|
tick_height: 123456,
|
||||||
|
};
|
||||||
|
let keypair = Keypair::new();
|
||||||
|
let vote_tx = VoteTransaction::vote_new(&keypair, vote, Hash::default(), 1);
|
||||||
|
vote_txs.push(vote_tx);
|
||||||
|
let vote_entries = vec![Entry::new(&Hash::default(), 1, vote_txs)];
|
||||||
|
storage_entry_sender.send(vote_entries).unwrap();
|
||||||
|
|
||||||
|
for _ in 0..5 {
|
||||||
|
{
|
||||||
|
let keys = storage_state.storage_keys.read().unwrap();
|
||||||
|
if keys[..] != *reference_keys.as_slice() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sleep(Duration::new(1, 0));
|
||||||
|
}
|
||||||
|
|
||||||
|
debug!("joining..?");
|
||||||
|
exit.store(true, Ordering::Relaxed);
|
||||||
|
storage_stage.join().unwrap();
|
||||||
|
|
||||||
|
{
|
||||||
|
let keys = storage_state.storage_keys.read().unwrap();
|
||||||
|
assert_ne!(keys[..], *reference_keys);
|
||||||
|
}
|
||||||
|
|
||||||
|
remove_dir_all(ledger_path).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_pubkey_distribution() {
|
||||||
|
logger::setup();
|
||||||
|
// See that pub keys have an even-ish distribution..
|
||||||
|
let mut hist = [0; NUM_IDENTITIES];
|
||||||
|
for _ in 0..(128 * 256) {
|
||||||
|
let keypair = Keypair::new();
|
||||||
|
let ix = get_identity_index_from_pubkey(&keypair.pubkey());
|
||||||
|
hist[ix] += 1;
|
||||||
|
}
|
||||||
|
let mut hist_max = 0;
|
||||||
|
let mut hist_min = NUM_IDENTITIES;
|
||||||
|
for x in hist.iter() {
|
||||||
|
hist_max = max(*x, hist_max);
|
||||||
|
hist_min = min(*x, hist_min);
|
||||||
|
}
|
||||||
|
info!("min: {} max: {}", hist_min, hist_max);
|
||||||
|
assert_ne!(hist_min, 0);
|
||||||
|
}
|
||||||
|
}
|
28
src/tvu.rs
28
src/tvu.rs
|
@ -47,8 +47,10 @@ use service::Service;
|
||||||
use signature::Keypair;
|
use signature::Keypair;
|
||||||
use std::net::UdpSocket;
|
use std::net::UdpSocket;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
|
use std::sync::mpsc::channel;
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
use std::thread;
|
use std::thread;
|
||||||
|
use storage_stage::{StorageStage, StorageState};
|
||||||
use window::SharedWindow;
|
use window::SharedWindow;
|
||||||
|
|
||||||
#[derive(Debug, PartialEq, Eq, Clone)]
|
#[derive(Debug, PartialEq, Eq, Clone)]
|
||||||
|
@ -61,6 +63,7 @@ pub struct Tvu {
|
||||||
fetch_stage: BlobFetchStage,
|
fetch_stage: BlobFetchStage,
|
||||||
retransmit_stage: RetransmitStage,
|
retransmit_stage: RetransmitStage,
|
||||||
ledger_write_stage: LedgerWriteStage,
|
ledger_write_stage: LedgerWriteStage,
|
||||||
|
storage_stage: StorageStage,
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -69,13 +72,15 @@ impl Tvu {
|
||||||
/// on the bank state.
|
/// on the bank state.
|
||||||
/// # Arguments
|
/// # Arguments
|
||||||
/// * `bank` - The bank state.
|
/// * `bank` - The bank state.
|
||||||
/// * `entry_height` - Initial ledger height, passed to replicate stage
|
/// * `keypair` - Node's key pair for signing
|
||||||
|
/// * `vote_account_keypair` - Vote key pair
|
||||||
|
/// * `entry_height` - Initial ledger height
|
||||||
/// * `cluster_info` - The cluster_info state.
|
/// * `cluster_info` - The cluster_info state.
|
||||||
/// * `window` - The window state.
|
/// * `window` - The window state.
|
||||||
/// * `replicate_socket` - my replicate socket
|
/// * `replicate_socket` - my replicate socket
|
||||||
/// * `repair_socket` - my repair socket
|
/// * `repair_socket` - my repair socket
|
||||||
/// * `retransmit_socket` - my retransmit socket
|
/// * `retransmit_socket` - my retransmit socket
|
||||||
/// * `exit` - The exit signal.
|
/// * `ledger_path` - path to the ledger file
|
||||||
#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
|
#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
|
||||||
pub fn new(
|
pub fn new(
|
||||||
keypair: Arc<Keypair>,
|
keypair: Arc<Keypair>,
|
||||||
|
@ -111,6 +116,17 @@ impl Tvu {
|
||||||
bank.leader_scheduler.clone(),
|
bank.leader_scheduler.clone(),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let (storage_entry_sender, storage_entry_receiver) = channel();
|
||||||
|
let storage_state = StorageState::new();
|
||||||
|
let storage_stage = StorageStage::new(
|
||||||
|
&storage_state,
|
||||||
|
storage_entry_receiver,
|
||||||
|
ledger_path,
|
||||||
|
keypair.clone(),
|
||||||
|
exit.clone(),
|
||||||
|
entry_height,
|
||||||
|
);
|
||||||
|
|
||||||
let (replicate_stage, ledger_entry_receiver) = ReplicateStage::new(
|
let (replicate_stage, ledger_entry_receiver) = ReplicateStage::new(
|
||||||
keypair,
|
keypair,
|
||||||
vote_account_keypair,
|
vote_account_keypair,
|
||||||
|
@ -121,13 +137,18 @@ impl Tvu {
|
||||||
entry_height,
|
entry_height,
|
||||||
);
|
);
|
||||||
|
|
||||||
let ledger_write_stage = LedgerWriteStage::new(ledger_path, ledger_entry_receiver, None);
|
let ledger_write_stage = LedgerWriteStage::new(
|
||||||
|
ledger_path,
|
||||||
|
ledger_entry_receiver,
|
||||||
|
Some(storage_entry_sender),
|
||||||
|
);
|
||||||
|
|
||||||
Tvu {
|
Tvu {
|
||||||
replicate_stage,
|
replicate_stage,
|
||||||
fetch_stage,
|
fetch_stage,
|
||||||
retransmit_stage,
|
retransmit_stage,
|
||||||
ledger_write_stage,
|
ledger_write_stage,
|
||||||
|
storage_stage,
|
||||||
exit,
|
exit,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -153,6 +174,7 @@ impl Service for Tvu {
|
||||||
self.retransmit_stage.join()?;
|
self.retransmit_stage.join()?;
|
||||||
self.fetch_stage.join()?;
|
self.fetch_stage.join()?;
|
||||||
self.ledger_write_stage.join()?;
|
self.ledger_write_stage.join()?;
|
||||||
|
self.storage_stage.join()?;
|
||||||
match self.replicate_stage.join()? {
|
match self.replicate_stage.join()? {
|
||||||
Some(ReplicateStageReturnType::LeaderRotation(
|
Some(ReplicateStageReturnType::LeaderRotation(
|
||||||
tick_height,
|
tick_height,
|
||||||
|
|
Loading…
Reference in New Issue