From 1f6346d8808933f39ab14011ba3706afa6cf11fc Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Thu, 3 Jan 2019 21:29:21 -0800 Subject: [PATCH] De-dup ledgers - db_ledger is now the only ledger written to disk --- genesis/src/main.rs | 4 -- ledger-tool/src/main.rs | 52 +++++---------- src/chacha_cuda.rs | 35 ++++++---- src/db_ledger.rs | 32 +++++----- src/fullnode.rs | 101 ++++++++++++++++------------- src/ledger.rs | 42 ++++++------ src/ledger_write_stage.rs | 94 --------------------------- src/lib.rs | 1 - src/mint.rs | 6 +- src/replay_stage.rs | 30 +++++++-- src/rpc.rs | 1 + src/storage_stage.rs | 46 +++++++------- src/thin_client.rs | 5 ++ src/tpu.rs | 10 +-- src/tvu.rs | 22 ++----- tests/multinode.rs | 130 +++++++++++++++++++++++--------------- wallet/src/wallet.rs | 74 +++++++--------------- 17 files changed, 298 insertions(+), 387 deletions(-) delete mode 100644 src/ledger_write_stage.rs diff --git a/genesis/src/main.rs b/genesis/src/main.rs index ae5c216142..a12e9cbf5d 100644 --- a/genesis/src/main.rs +++ b/genesis/src/main.rs @@ -3,7 +3,6 @@ use clap::{crate_version, value_t_or_exit, App, Arg}; use serde_json; use solana::db_ledger::genesis; -use solana::ledger::LedgerWriter; use solana::mint::Mint; use solana_sdk::signature::{read_keypair, KeypairUtil}; use std::error; @@ -80,9 +79,6 @@ fn main() -> Result<(), Box> { let entries = mint.create_entries(); let ledger_path = matches.value_of("ledger").unwrap(); - let mut ledger_writer = LedgerWriter::open(&ledger_path, true)?; - ledger_writer.write_entries(&entries)?; - genesis(&ledger_path, &leader_keypair, &entries)?; Ok(()) diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index b967ec87b4..a31bd08697 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -1,6 +1,6 @@ use clap::{crate_version, App, Arg, SubCommand}; use solana::bank::Bank; -use solana::ledger::{read_ledger, verify_ledger}; +use solana::db_ledger::DbLedger; use std::io::{stdout, Write}; use std::process::exit; @@ -33,12 +33,6 @@ fn main() { .takes_value(true) .help("Skip entries with fewer than NUM hashes\n (only applies to print and json commands)"), ) - .arg( - Arg::with_name("precheck") - .short("p") - .long("precheck") - .help("Use ledger_verify() to check internal ledger consistency before proceeding"), - ) .arg( Arg::with_name("continue") .short("c") @@ -52,21 +46,22 @@ fn main() { let ledger_path = matches.value_of("ledger").unwrap(); - if matches.is_present("precheck") { - if let Err(e) = verify_ledger(&ledger_path) { - eprintln!("ledger precheck failed, error: {:?} ", e); - exit(1); - } - } - - let entries = match read_ledger(ledger_path, true) { - Ok(entries) => entries, + let db_ledger = match DbLedger::open(ledger_path) { + Ok(db_ledger) => db_ledger, Err(err) => { eprintln!("Failed to open ledger at {}: {}", ledger_path, err); exit(1); } }; + let mut entries = match db_ledger.read_ledger() { + Ok(entries) => entries, + Err(err) => { + eprintln!("Failed to read ledger at {}: {}", ledger_path, err); + exit(1); + } + }; + let head = match matches.value_of("head") { Some(head) => head.parse().expect("please pass a number for --head"), None => ::max_value(), @@ -81,18 +76,11 @@ fn main() { match matches.subcommand() { ("print", _) => { - let entries = match read_ledger(ledger_path, true) { - Ok(entries) => entries, - Err(err) => { - eprintln!("Failed to open ledger at {}: {}", ledger_path, err); - exit(1); - } - }; for (i, entry) in entries.enumerate() { if i >= head { break; } - let entry = entry.unwrap(); + if entry.num_hashes < min_hashes { continue; } @@ -105,7 +93,7 @@ fn main() { if i >= head { break; } - let entry = entry.unwrap(); + if entry.num_hashes < min_hashes { continue; } @@ -125,15 +113,7 @@ fn main() { } let bank = Bank::new_with_builtin_programs(); { - let genesis = match read_ledger(ledger_path, true) { - Ok(entries) => entries, - Err(err) => { - eprintln!("Failed to open ledger at {}: {}", ledger_path, err); - exit(1); - } - }; - - let genesis = genesis.take(NUM_GENESIS_ENTRIES).map(|e| e.unwrap()); + let genesis = entries.by_ref().take(NUM_GENESIS_ENTRIES); if let Err(e) = bank.process_ledger(genesis) { eprintln!("verify failed at genesis err: {:?}", e); if !matches.is_present("continue") { @@ -141,13 +121,11 @@ fn main() { } } } - let entries = entries.map(|e| e.unwrap()); - let head = head - NUM_GENESIS_ENTRIES; let mut last_id = bank.last_id(); - for (i, entry) in entries.skip(NUM_GENESIS_ENTRIES).enumerate() { + for (i, entry) in entries.enumerate() { if i >= head { break; } diff --git a/src/chacha_cuda.rs b/src/chacha_cuda.rs index 8bfcbcfe42..efbb45248c 100644 --- a/src/chacha_cuda.rs +++ b/src/chacha_cuda.rs @@ -1,11 +1,12 @@ use crate::chacha::{CHACHA_BLOCK_SIZE, CHACHA_KEY_SIZE}; -use crate::ledger::LedgerWindow; +use crate::db_ledger::DbLedger; use crate::sigverify::{ chacha_cbc_encrypt_many_sample, chacha_end_sha_state, chacha_init_sha_state, }; use solana_sdk::hash::Hash; use std::io; use std::mem::size_of; +use std::sync::Arc; use crate::storage_stage::ENTRIES_PER_SEGMENT; @@ -14,7 +15,7 @@ use crate::storage_stage::ENTRIES_PER_SEGMENT; // Then sample each block at the offsets provided by samples argument with sha256 // and return the vec of sha states pub fn chacha_cbc_encrypt_file_many_keys( - in_path: &str, + db_ledger: &Arc, slice: u64, ivecs: &mut [u8], samples: &[u64], @@ -30,7 +31,6 @@ pub fn chacha_cbc_encrypt_file_many_keys( )); } - let mut ledger_window = LedgerWindow::open(in_path)?; let mut buffer = [0; 8 * 1024]; let num_keys = ivecs.len() / CHACHA_BLOCK_SIZE; let mut sha_states = vec![0; num_keys * size_of::()]; @@ -44,11 +44,7 @@ pub fn chacha_cbc_encrypt_file_many_keys( chacha_init_sha_state(int_sha_states.as_mut_ptr(), num_keys as u32); } loop { - match ledger_window.get_entries_bytes( - entry, - ENTRIES_PER_SEGMENT - total_entries, - &mut buffer, - ) { + match db_ledger.get_entries_bytes(entry, ENTRIES_PER_SEGMENT - total_entries, &mut buffer) { Ok((num_entries, entry_len)) => { info!( "encrypting slice: {} num_entries: {} entry_len: {}", @@ -107,12 +103,15 @@ pub fn chacha_cbc_encrypt_file_many_keys( mod tests { use crate::chacha::chacha_cbc_encrypt_file; use crate::chacha_cuda::chacha_cbc_encrypt_file_many_keys; - use crate::ledger::LedgerWriter; - use crate::ledger::{get_tmp_ledger_path, make_tiny_test_entries, LEDGER_DATA_FILE}; + use crate::db_ledger::{DbLedger, DEFAULT_SLOT_HEIGHT}; + use crate::ledger::{ + get_tmp_ledger_path, make_tiny_test_entries, LedgerWriter, LEDGER_DATA_FILE, + }; use crate::replicator::sample_file; use solana_sdk::hash::Hash; use std::fs::{remove_dir_all, remove_file}; use std::path::Path; + use std::sync::Arc; #[test] fn test_encrypt_file_many_keys_single() { @@ -125,6 +124,10 @@ mod tests { let mut writer = LedgerWriter::open(&ledger_path, true).unwrap(); writer.write_entries(&entries).unwrap(); } + let db_ledger = DbLedger::open(&ledger_path).unwrap(); + db_ledger + .write_entries(DEFAULT_SLOT_HEIGHT, 0, &entries) + .unwrap(); let out_path = Path::new("test_chacha_encrypt_file_many_keys_single_output.txt.enc"); @@ -145,7 +148,8 @@ mod tests { let ref_hash = sample_file(&out_path, &samples).unwrap(); let hashes = - chacha_cbc_encrypt_file_many_keys(&ledger_path, 0, &mut ivecs, &samples).unwrap(); + chacha_cbc_encrypt_file_many_keys(&Arc::new(db_ledger), 0, &mut ivecs, &samples) + .unwrap(); assert_eq!(hashes[0], ref_hash); @@ -164,6 +168,10 @@ mod tests { let mut writer = LedgerWriter::open(&ledger_path, true).unwrap(); writer.write_entries(&entries).unwrap(); } + let db_ledger = DbLedger::open(&ledger_path).unwrap(); + db_ledger + .write_entries(DEFAULT_SLOT_HEIGHT, 0, &entries) + .unwrap(); let out_path = Path::new("test_chacha_encrypt_file_many_keys_multiple_output.txt.enc"); @@ -194,7 +202,8 @@ mod tests { } let hashes = - chacha_cbc_encrypt_file_many_keys(&ledger_path, 0, &mut ivecs, &samples).unwrap(); + chacha_cbc_encrypt_file_many_keys(&Arc::new(db_ledger), 0, &mut ivecs, &samples) + .unwrap(); assert_eq!(hashes, ref_hashes); @@ -202,6 +211,7 @@ mod tests { let _ignored = remove_file(out_path); } + /* #[test] fn test_encrypt_file_many_keys_bad_key_length() { let mut keys = hex!("abc123"); @@ -210,4 +220,5 @@ mod tests { let samples = [0]; assert!(chacha_cbc_encrypt_file_many_keys(&ledger_path, 0, &mut keys, &samples,).is_err()); } + */ } diff --git a/src/db_ledger.rs b/src/db_ledger.rs index 7cfb4bf652..8e7d9e7578 100644 --- a/src/db_ledger.rs +++ b/src/db_ledger.rs @@ -13,6 +13,7 @@ use serde::Serialize; use solana_sdk::signature::{Keypair, KeypairUtil}; use std::borrow::Borrow; use std::cmp::max; +use std::fs::create_dir_all; use std::io; use std::path::Path; use std::sync::Arc; @@ -293,6 +294,7 @@ pub const ERASURE_CF: &str = "erasure"; impl DbLedger { // Opens a Ledger in directory, provides "infinite" window of blobs pub fn open(ledger_path: &str) -> Result { + create_dir_all(&ledger_path)?; let ledger_path = Path::new(ledger_path).join(DB_LEDGER_DIRECTORY); // Use default database options @@ -329,6 +331,8 @@ impl DbLedger { } pub fn destroy(ledger_path: &str) -> Result<()> { + // DB::destroy() fails if `ledger_path` doesn't exist + create_dir_all(&ledger_path)?; let ledger_path = Path::new(ledger_path).join(DB_LEDGER_DIRECTORY); DB::destroy(&Options::default(), &ledger_path)?; Ok(()) @@ -362,7 +366,7 @@ impl DbLedger { Ok(new_entries) } - pub fn write_entries(&self, slot: u64, entries: I) -> Result> + pub fn write_entries(&self, slot: u64, index: u64, entries: I) -> Result> where I: IntoIterator, I::Item: Borrow, @@ -372,7 +376,7 @@ impl DbLedger { .enumerate() .map(|(idx, entry)| { let mut b = entry.borrow().to_blob(); - b.set_index(idx as u64).unwrap(); + b.set_index(idx as u64 + index).unwrap(); b.set_slot(slot).unwrap(); b }) @@ -626,6 +630,15 @@ impl DbLedger { Ok(EntryIterator { db_iterator }) } + pub fn get_entries_bytes( + &self, + _start_index: u64, + _num_entries: u64, + _buf: &mut [u8], + ) -> io::Result<(u64, u64)> { + Err(io::Error::new(io::ErrorKind::Other, "TODO")) + } + fn get_cf_options() -> Options { let mut options = Options::default(); options.set_max_write_buffer_number(32); @@ -680,21 +693,6 @@ impl Iterator for EntryIterator { } } -pub fn write_entries_to_ledger(ledger_paths: &[&str], entries: I, slot_height: u64) -where - I: IntoIterator, - I::Item: Borrow, -{ - let mut entries = entries.into_iter(); - for ledger_path in ledger_paths { - let db_ledger = - DbLedger::open(ledger_path).expect("Expected to be able to open database ledger"); - db_ledger - .write_entries(slot_height, entries.by_ref()) - .expect("Expected successful write of genesis entries"); - } -} - pub fn genesis<'a, I>(ledger_path: &str, keypair: &Keypair, entries: I) -> Result<()> where I: IntoIterator, diff --git a/src/fullnode.rs b/src/fullnode.rs index 040ed63966..cceeb2f5ca 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -4,10 +4,9 @@ use crate::bank::Bank; use crate::broadcast_service::BroadcastService; use crate::cluster_info::{ClusterInfo, Node, NodeInfo}; use crate::counter::Counter; -use crate::db_ledger::{write_entries_to_ledger, DbLedger, DEFAULT_SLOT_HEIGHT}; +use crate::db_ledger::DbLedger; use crate::gossip_service::GossipService; use crate::leader_scheduler::LeaderScheduler; -use crate::ledger::read_ledger; use crate::rpc::JsonRpcService; use crate::rpc_pubsub::PubSubService; use crate::service::Service; @@ -98,7 +97,6 @@ pub struct Fullnode { gossip_service: GossipService, bank: Arc, cluster_info: Arc>, - ledger_path: String, sigverify_disabled: bool, shared_window: SharedWindow, tvu_sockets: Vec, @@ -126,8 +124,9 @@ impl Fullnode { info!("creating bank..."); + let db_ledger = Self::make_db_ledger(ledger_path); let (bank, entry_height, last_entry_id) = - Self::new_bank_from_ledger(ledger_path, leader_scheduler); + Self::new_bank_from_db_ledger(&db_ledger, leader_scheduler); info!("creating networking stack..."); let local_gossip_addr = node.sockets.gossip.local_addr().unwrap(); @@ -146,6 +145,7 @@ impl Fullnode { keypair, vote_account_keypair, bank, + Some(db_ledger), entry_height, &last_entry_id, node, @@ -176,6 +176,7 @@ impl Fullnode { keypair: Arc, vote_account_keypair: Arc, bank: Bank, + db_ledger: Option>, entry_height: u64, last_entry_id: &Hash, mut node: Node, @@ -184,9 +185,6 @@ impl Fullnode { sigverify_disabled: bool, rpc_port: Option, ) -> Self { - // Create the Dbledger - let db_ledger = Self::make_db_ledger(ledger_path); - let mut rpc_addr = node.info.rpc; let mut rpc_pubsub_addr = node.info.rpc_pubsub; // Use custom RPC port, if provided (`Some(port)`) @@ -202,6 +200,8 @@ impl Fullnode { let exit = Arc::new(AtomicBool::new(false)); let bank = Arc::new(bank); + let db_ledger = db_ledger.unwrap_or_else(|| Self::make_db_ledger(ledger_path)); + let window = new_window(32 * 1024); let shared_window = Arc::new(RwLock::new(window)); node.info.wallclock = timestamp(); @@ -258,14 +258,12 @@ impl Fullnode { }; let tvu = Tvu::new( - // keypair.clone(), vote_account_keypair.clone(), &bank, entry_height, *last_entry_id, &cluster_info, sockets, - Some(ledger_path), db_ledger.clone(), ); let tpu_forwarder = TpuForwarder::new( @@ -294,7 +292,6 @@ impl Fullnode { .iter() .map(|s| s.try_clone().expect("Failed to clone TPU sockets")) .collect(), - ledger_path, sigverify_disabled, max_tick_height, last_entry_id, @@ -333,7 +330,6 @@ impl Fullnode { rpc_service: Some(rpc_service), rpc_pubsub_service: Some(rpc_pubsub_service), node_role, - ledger_path: ledger_path.to_owned(), exit, tvu_sockets: node.sockets.tvu, repair_socket: node.sockets.repair, @@ -368,8 +364,8 @@ impl Fullnode { let (new_bank, scheduled_leader, entry_height, last_entry_id) = { // TODO: We can avoid building the bank again once RecordStage is // integrated with BankingStage - let (new_bank, entry_height, last_id) = Self::new_bank_from_ledger( - &self.ledger_path, + let (new_bank, entry_height, last_id) = Self::new_bank_from_db_ledger( + &self.db_ledger, Arc::new(RwLock::new(new_leader_scheduler)), ); @@ -430,7 +426,6 @@ impl Fullnode { last_entry_id, &self.cluster_info, sockets, - Some(&self.ledger_path), self.db_ledger.clone(), ); let tpu_forwarder = TpuForwarder::new( @@ -465,7 +460,6 @@ impl Fullnode { .iter() .map(|s| s.try_clone().expect("Failed to clone TPU sockets")) .collect(), - &self.ledger_path, self.sigverify_disabled, max_tick_height, // We pass the last_entry_id from the replay stage because we can't trust that @@ -544,15 +538,14 @@ impl Fullnode { self.join() } - pub fn new_bank_from_ledger( - ledger_path: &str, + pub fn new_bank_from_db_ledger( + db_ledger: &DbLedger, leader_scheduler: Arc>, ) -> (Bank, u64, Hash) { let mut bank = Bank::new_with_builtin_programs(); bank.leader_scheduler = leader_scheduler; - let entries = read_ledger(ledger_path, true).expect("opening ledger"); - let entries = entries - .map(|e| e.unwrap_or_else(|err| panic!("failed to parse entry. error: {}", err))); + + let entries = db_ledger.read_ledger().expect("opening ledger"); info!("processing ledger..."); let (entry_height, last_entry_id) = bank.process_ledger(entries).expect("process_ledger"); @@ -562,6 +555,14 @@ impl Fullnode { (bank, entry_height, last_entry_id) } + pub fn new_bank_from_ledger( + ledger_path: &str, + leader_scheduler: Arc>, + ) -> (Bank, u64, Hash) { + let db_ledger = Self::make_db_ledger(ledger_path); + Self::new_bank_from_db_ledger(&db_ledger, leader_scheduler) + } + pub fn get_leader_scheduler(&self) -> &Arc> { &self.bank.leader_scheduler } @@ -590,16 +591,9 @@ impl Fullnode { } fn make_db_ledger(ledger_path: &str) -> Arc { - // Destroy any existing instances of the Dbledger - DbLedger::destroy(&ledger_path).expect("Expected successful database destruction"); - let ledger_entries = read_ledger(ledger_path, true) - .expect("opening ledger") - .map(|entry| entry.unwrap()); - - write_entries_to_ledger(&[ledger_path], ledger_entries, DEFAULT_SLOT_HEIGHT); - let db = - DbLedger::open(ledger_path).expect("Expected to successfully open database ledger"); - Arc::new(db) + Arc::new( + DbLedger::open(ledger_path).expect("Expected to successfully open database ledger"), + ) } } @@ -645,7 +639,6 @@ mod tests { }; use crate::ledger::{ create_tmp_genesis, create_tmp_sample_ledger, make_consecutive_blobs, tmp_copy_ledger, - LedgerWriter, }; use crate::service::Service; use crate::streamer::responder; @@ -677,6 +670,7 @@ mod tests { Arc::new(keypair), Arc::new(Keypair::new()), bank, + None, entry_height, &last_id, tn, @@ -717,6 +711,7 @@ mod tests { Arc::new(keypair), Arc::new(Keypair::new()), bank, + None, entry_height, &last_id, tn, @@ -841,7 +836,6 @@ mod tests { // Write the entries to the ledger that will cause leader rotation // after the bootstrap height - let mut ledger_writer = LedgerWriter::open(&bootstrap_leader_ledger_path, false).unwrap(); let (active_set_entries, validator_vote_account_keypair) = make_active_set_entries( &validator_keypair, &mint.keypair(), @@ -855,7 +849,17 @@ mod tests { .skip(2) .fold(0, |tick_count, entry| tick_count + entry.is_tick() as u64) + num_ending_ticks as u64; - ledger_writer.write_entries(&active_set_entries).unwrap(); + + { + let db_ledger = DbLedger::open(&bootstrap_leader_ledger_path).unwrap(); + db_ledger + .write_entries( + DEFAULT_SLOT_HEIGHT, + genesis_entries.len() as u64, + &active_set_entries, + ) + .unwrap(); + } let validator_ledger_path = tmp_copy_ledger(&bootstrap_leader_ledger_path, "test_wrong_role_transition"); @@ -916,7 +920,7 @@ mod tests { match validator.node_role { Some(NodeRole::Leader(_)) => (), _ => { - panic!("Expected node to be the leader"); + panic!("Expected validator node to be the leader"); } } @@ -965,7 +969,6 @@ mod tests { // after the bootstrap height // // 2) A vote from the validator - let mut ledger_writer = LedgerWriter::open(&validator_ledger_path, false).unwrap(); let (active_set_entries, validator_vote_account_keypair) = make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id, 0); let initial_tick_height = genesis_entries @@ -975,7 +978,18 @@ mod tests { let initial_non_tick_height = genesis_entries.len() as u64 - initial_tick_height; let active_set_entries_len = active_set_entries.len() as u64; last_id = active_set_entries.last().unwrap().id; - ledger_writer.write_entries(&active_set_entries).unwrap(); + + { + let db_ledger = DbLedger::open(&validator_ledger_path).unwrap(); + db_ledger + .write_entries( + DEFAULT_SLOT_HEIGHT, + genesis_entries.len() as u64, + &active_set_entries, + ) + .unwrap(); + } + let ledger_initial_len = genesis_entries.len() as u64 + active_set_entries_len; // Set the leader scheduler for the validator @@ -1051,17 +1065,16 @@ mod tests { // Check the validator ledger for the correct entry + tick heights, we should've // transitioned after tick_height = bootstrap_height. - let (bank, entry_height, _) = Fullnode::new_bank_from_ledger( - &validator_ledger_path, + let (bank, entry_height, _) = Fullnode::new_bank_from_db_ledger( + &validator.db_ledger, Arc::new(RwLock::new(LeaderScheduler::new(&leader_scheduler_config))), ); - assert_eq!(bank.tick_height(), bootstrap_height); - assert_eq!( - entry_height, - // Only the first genesis entry has num_hashes = 0, every other entry - // had num_hashes = 1 - bootstrap_height + active_set_entries_len + initial_non_tick_height, + assert!(bank.tick_height() >= bootstrap_height); + // Only the first genesis entry has num_hashes = 0, every other entry + // had num_hashes = 1 + assert!( + entry_height >= bootstrap_height + active_set_entries_len + initial_non_tick_height ); // Shut down diff --git a/src/ledger.rs b/src/ledger.rs index 8a2176a0f8..c68f17a1f3 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -2,6 +2,7 @@ //! Proof of History ledger as well as iterative read, append write, and random //! access read to a persistent file-based ledger. +use crate::db_ledger::{DbLedger, DEFAULT_SLOT_HEIGHT}; use crate::entry::Entry; use crate::mint::Mint; use crate::packet::{Blob, SharedBlob, BLOB_DATA_SIZE}; @@ -16,7 +17,7 @@ use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::transaction::Transaction; use solana_sdk::vote_program::Vote; use solana_sdk::vote_transaction::VoteTransaction; -use std::fs::{copy, create_dir_all, remove_dir_all, File, OpenOptions}; +use std::fs::{create_dir_all, remove_dir_all, File, OpenOptions}; use std::io::prelude::*; use std::io::{self, BufReader, BufWriter, Seek, SeekFrom}; use std::mem::size_of; @@ -63,7 +64,7 @@ use std::path::Path; // ledger window #[derive(Debug)] -pub struct LedgerWindow { +struct LedgerWindow { index: BufReader, data: BufReader, } @@ -99,10 +100,11 @@ fn u64_at(file: &mut A, at: u64) -> io::Result { deserialize_from(file.take(SIZEOF_U64)).map_err(err_bincode_to_io) } +#[allow(dead_code)] impl LedgerWindow { // opens a Ledger in directory, provides "infinite" window // - pub fn open(ledger_path: &str) -> io::Result { + fn open(ledger_path: &str) -> io::Result { let ledger_path = Path::new(&ledger_path); let index = File::open(ledger_path.join(LEDGER_INDEX_FILE))?; @@ -113,7 +115,7 @@ impl LedgerWindow { Ok(LedgerWindow { index, data }) } - pub fn get_entry(&mut self, index: u64) -> io::Result { + fn get_entry(&mut self, index: u64) -> io::Result { let offset = self.get_entry_offset(index)?; entry_at(&mut self.data, offset) } @@ -121,7 +123,7 @@ impl LedgerWindow { // Fill 'buf' with num_entries or most number of whole entries that fit into buf.len() // // Return tuple of (number of entries read, total size of entries read) - pub fn get_entries_bytes( + fn get_entries_bytes( &mut self, start_index: u64, num_entries: u64, @@ -588,9 +590,11 @@ pub fn get_tmp_ledger_path(name: &str) -> String { pub fn create_tmp_ledger_with_mint(name: &str, mint: &Mint) -> String { let path = get_tmp_ledger_path(name); - - let mut writer = LedgerWriter::open(&path, true).unwrap(); - writer.write_entries(&mint.create_entries()).unwrap(); + DbLedger::destroy(&path).expect("Expected successful database destruction"); + let db_ledger = DbLedger::open(&path).unwrap(); + db_ledger + .write_entries(DEFAULT_SLOT_HEIGHT, 0, &mint.create_entries()) + .unwrap(); path } @@ -633,8 +637,11 @@ pub fn create_tmp_sample_ledger( let ticks = create_ticks(num_ending_ticks, mint.last_id()); genesis.extend(ticks); - let mut writer = LedgerWriter::open(&path, true).unwrap(); - writer.write_entries(&genesis.clone()).unwrap(); + DbLedger::destroy(&path).expect("Expected successful database destruction"); + let db_ledger = DbLedger::open(&path).unwrap(); + db_ledger + .write_entries(DEFAULT_SLOT_HEIGHT, 0, &genesis) + .unwrap(); (mint, path, genesis) } @@ -642,15 +649,14 @@ pub fn create_tmp_sample_ledger( pub fn tmp_copy_ledger(from: &str, name: &str) -> String { let tostr = get_tmp_ledger_path(name); - { - let to = Path::new(&tostr); - let from = Path::new(&from); + let db_ledger = DbLedger::open(from).unwrap(); + let ledger_entries = db_ledger.read_ledger().unwrap(); - create_dir_all(to).unwrap(); - - copy(from.join("data"), to.join("data")).unwrap(); - copy(from.join("index"), to.join("index")).unwrap(); - } + DbLedger::destroy(&tostr).expect("Expected successful database destruction"); + let db_ledger = DbLedger::open(&tostr).unwrap(); + db_ledger + .write_entries(DEFAULT_SLOT_HEIGHT, 0, ledger_entries) + .unwrap(); tostr } diff --git a/src/ledger_write_stage.rs b/src/ledger_write_stage.rs deleted file mode 100644 index 89668308a6..0000000000 --- a/src/ledger_write_stage.rs +++ /dev/null @@ -1,94 +0,0 @@ -//! The `ledger_write_stage` module implements the ledger write stage. It -//! writes entries to the given writer, which is typically a file - -use crate::counter::Counter; -use crate::entry::{EntryReceiver, EntrySender}; -use crate::ledger::LedgerWriter; -use crate::result::{Error, Result}; -use crate::service::Service; -use log::Level; -use solana_sdk::timing::duration_as_ms; -use std::sync::atomic::AtomicUsize; -use std::sync::mpsc::{channel, RecvTimeoutError}; -use std::thread::{self, Builder, JoinHandle}; -use std::time::{Duration, Instant}; - -pub struct LedgerWriteStage { - write_thread: JoinHandle<()>, -} - -impl LedgerWriteStage { - pub fn write( - ledger_writer: Option<&mut LedgerWriter>, - entry_receiver: &EntryReceiver, - entry_sender: &EntrySender, - ) -> Result<()> { - let mut ventries = Vec::new(); - let mut received_entries = entry_receiver.recv_timeout(Duration::new(1, 0))?; - let mut num_new_entries = 0; - let now = Instant::now(); - - loop { - num_new_entries += received_entries.len(); - ventries.push(received_entries); - - if let Ok(n) = entry_receiver.try_recv() { - received_entries = n; - } else { - break; - } - } - - if let Some(ledger_writer) = ledger_writer { - ledger_writer.write_entries(ventries.iter().flatten())?; - } - - inc_new_counter_info!("ledger_writer_stage-entries_received", num_new_entries); - for entries in ventries { - entry_sender.send(entries)?; - } - inc_new_counter_info!( - "ledger_writer_stage-time_ms", - duration_as_ms(&now.elapsed()) as usize - ); - Ok(()) - } - - #[allow(clippy::new_ret_no_self)] - pub fn new(ledger_path: Option<&str>, entry_receiver: EntryReceiver) -> (Self, EntryReceiver) { - let mut ledger_writer = ledger_path.map(|p| LedgerWriter::open(p, false).unwrap()); - - let (entry_sender, entry_forwarder) = channel(); - let write_thread = Builder::new() - .name("solana-ledger-writer".to_string()) - .spawn(move || loop { - if let Err(e) = Self::write(ledger_writer.as_mut(), &entry_receiver, &entry_sender) - { - match e { - Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => { - break; - } - Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), - _ => { - inc_new_counter_info!( - "ledger_writer_stage-write_and_send_entries-error", - 1 - ); - error!("{:?}", e); - } - } - }; - }) - .unwrap(); - - (Self { write_thread }, entry_forwarder) - } -} - -impl Service for LedgerWriteStage { - type JoinReturnType = (); - - fn join(self) -> thread::Result<()> { - self.write_thread.join() - } -} diff --git a/src/lib.rs b/src/lib.rs index 31afcef3d9..2087e937cf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -43,7 +43,6 @@ pub mod fullnode; pub mod gossip_service; pub mod leader_scheduler; pub mod ledger; -pub mod ledger_write_stage; pub mod mint; pub mod packet; pub mod poh; diff --git a/src/mint.rs b/src/mint.rs index 794632722f..6f182585d0 100644 --- a/src/mint.rs +++ b/src/mint.rs @@ -17,6 +17,8 @@ pub struct Mint { pub bootstrap_leader_tokens: u64, } +pub const NUM_GENESIS_ENTRIES: usize = 3; + impl Mint { pub fn new_with_pkcs8( tokens: u64, @@ -92,7 +94,9 @@ impl Mint { let e0 = Entry::new(&self.seed(), 0, 0, vec![]); let e1 = Entry::new(&e0.id, 0, 1, self.create_transaction()); let e2 = Entry::new(&e1.id, 0, 1, vec![]); // include a tick - vec![e0, e1, e2] + let genesis = vec![e0, e1, e2]; + assert_eq!(NUM_GENESIS_ENTRIES, genesis.len()); + genesis } } diff --git a/src/replay_stage.rs b/src/replay_stage.rs index 65e6cb6bc0..d70ec6732a 100644 --- a/src/replay_stage.rs +++ b/src/replay_stage.rs @@ -275,13 +275,13 @@ impl Service for ReplayStage { mod test { use crate::bank::Bank; use crate::cluster_info::{ClusterInfo, Node}; + use crate::db_ledger::{DbLedger, DEFAULT_SLOT_HEIGHT}; use crate::entry::Entry; use crate::fullnode::Fullnode; use crate::leader_scheduler::{ make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig, }; - use crate::ledger::{create_ticks, create_tmp_sample_ledger, LedgerWriter}; - + use crate::ledger::{create_ticks, create_tmp_sample_ledger}; use crate::packet::BlobError; use crate::replay_stage::{ReplayStage, ReplayStageReturnType}; use crate::result::Error; @@ -324,7 +324,6 @@ mod test { // Write two entries to the ledger so that the validator is in the active set: // 1) Give the validator a nonzero number of tokens 2) A vote from the validator . // This will cause leader rotation after the bootstrap height - let mut ledger_writer = LedgerWriter::open(&my_ledger_path, false).unwrap(); let (active_set_entries, vote_account_keypair) = make_active_set_entries(&my_keypair, &mint.keypair(), &last_id, &last_id, 0); last_id = active_set_entries.last().unwrap().id; @@ -335,7 +334,17 @@ mod test { let active_set_entries_len = active_set_entries.len() as u64; let initial_non_tick_height = genesis_entries.len() as u64 - initial_tick_height; let initial_entry_len = genesis_entries.len() as u64 + active_set_entries_len; - ledger_writer.write_entries(&active_set_entries).unwrap(); + + { + let db_ledger = DbLedger::open(&my_ledger_path).unwrap(); + db_ledger + .write_entries( + DEFAULT_SLOT_HEIGHT, + genesis_entries.len() as u64, + &active_set_entries, + ) + .unwrap(); + } // Set up the LeaderScheduler so that this this node becomes the leader at // bootstrap_height = num_bootstrap_slots * leader_rotation_interval @@ -518,7 +527,6 @@ mod test { // Write two entries to the ledger so that the validator is in the active set: // 1) Give the validator a nonzero number of tokens 2) A vote from the validator. // This will cause leader rotation after the bootstrap height - let mut ledger_writer = LedgerWriter::open(&my_ledger_path, false).unwrap(); let (active_set_entries, vote_account_keypair) = make_active_set_entries(&my_keypair, &mint.keypair(), &last_id, &last_id, 0); last_id = active_set_entries.last().unwrap().id; @@ -529,7 +537,17 @@ mod test { let active_set_entries_len = active_set_entries.len() as u64; let initial_non_tick_height = genesis_entries.len() as u64 - initial_tick_height; let initial_entry_len = genesis_entries.len() as u64 + active_set_entries_len; - ledger_writer.write_entries(&active_set_entries).unwrap(); + + { + let db_ledger = DbLedger::open(&my_ledger_path).unwrap(); + db_ledger + .write_entries( + DEFAULT_SLOT_HEIGHT, + genesis_entries.len() as u64, + &active_set_entries, + ) + .unwrap(); + } // Set up the LeaderScheduler so that this this node becomes the leader at // bootstrap_height = num_bootstrap_slots * leader_rotation_interval diff --git a/src/rpc.rs b/src/rpc.rs index 4259093565..85bf7fd1bc 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -687,6 +687,7 @@ mod tests { leader_keypair, vote_account_keypair, bank, + None, entry_height, &last_id, leader, diff --git a/src/storage_stage.rs b/src/storage_stage.rs index 6f9cab8407..7e8ce6c383 100644 --- a/src/storage_stage.rs +++ b/src/storage_stage.rs @@ -4,6 +4,7 @@ #[cfg(all(feature = "chacha", feature = "cuda"))] use crate::chacha_cuda::chacha_cbc_encrypt_file_many_keys; +use crate::db_ledger::DbLedger; use crate::entry::EntryReceiver; use crate::result::{Error, Result}; use crate::service::Service; @@ -134,7 +135,7 @@ impl StorageStage { pub fn new( storage_state: &StorageState, storage_entry_receiver: EntryReceiver, - ledger_path: Option<&str>, + db_ledger: Option>, keypair: Arc, exit: Arc, entry_height: u64, @@ -142,7 +143,6 @@ impl StorageStage { debug!("storage_stage::new: entry_height: {}", entry_height); storage_state.state.write().unwrap().entry_height = entry_height; let storage_state_inner = storage_state.state.clone(); - let ledger_path = ledger_path.map(String::from); let t_storage_mining_verifier = Builder::new() .name("solana-storage-mining-verify-stage".to_string()) .spawn(move || { @@ -151,12 +151,12 @@ impl StorageStage { let mut current_key = 0; let mut entry_height = entry_height; loop { - if let Some(ref ledger_path_str) = ledger_path { + if let Some(ref some_db_ledger) = db_ledger { if let Err(e) = Self::process_entries( &keypair, &storage_state_inner, &storage_entry_receiver, - ledger_path_str, + &some_db_ledger, &mut poh_height, &mut entry_height, &mut current_key, @@ -183,7 +183,7 @@ impl StorageStage { pub fn process_entry_crossing( state: &Arc>, keypair: &Arc, - _ledger_path: &str, + _db_ledger: &Arc, entry_id: Hash, entry_height: u64, ) -> Result<()> { @@ -228,7 +228,7 @@ impl StorageStage { let mut statew = state.write().unwrap(); match chacha_cbc_encrypt_file_many_keys( - _ledger_path, + _db_ledger, segment as u64, &mut statew.storage_keys, &samples, @@ -252,7 +252,7 @@ impl StorageStage { keypair: &Arc, storage_state: &Arc>, entry_receiver: &EntryReceiver, - ledger_path: &str, + db_ledger: &Arc, poh_height: &mut u64, entry_height: &mut u64, current_key_idx: &mut usize, @@ -314,7 +314,7 @@ impl StorageStage { Self::process_entry_crossing( &storage_state, &keypair, - &ledger_path, + &db_ledger, entry.id, *entry_height, )?; @@ -336,9 +336,9 @@ impl Service for StorageStage { #[cfg(test)] mod tests { + use crate::db_ledger::{DbLedger, DEFAULT_SLOT_HEIGHT}; use crate::entry::Entry; - use crate::ledger::make_tiny_test_entries; - use crate::ledger::{create_tmp_sample_ledger, LedgerWriter}; + use crate::ledger::{create_tmp_sample_ledger, make_tiny_test_entries}; use crate::service::Service; use crate::storage_stage::StorageState; @@ -384,7 +384,7 @@ mod tests { let keypair = Arc::new(Keypair::new()); let exit = Arc::new(AtomicBool::new(false)); - let (_mint, ledger_path, _genesis) = create_tmp_sample_ledger( + let (_mint, ledger_path, genesis_entries) = create_tmp_sample_ledger( "storage_stage_process_entries", 1000, 1, @@ -393,18 +393,17 @@ mod tests { ); let entries = make_tiny_test_entries(128); - { - let mut writer = LedgerWriter::open(&ledger_path, true).unwrap(); - writer.write_entries(&entries.clone()).unwrap(); - // drops writer, flushes buffers - } + let db_ledger = DbLedger::open(&ledger_path).unwrap(); + db_ledger + .write_entries(DEFAULT_SLOT_HEIGHT, genesis_entries.len() as u64, &entries) + .unwrap(); let (storage_entry_sender, storage_entry_receiver) = channel(); let storage_state = StorageState::new(); let storage_stage = StorageStage::new( &storage_state, storage_entry_receiver, - Some(&ledger_path), + Some(Arc::new(db_ledger)), keypair, exit.clone(), 0, @@ -449,7 +448,7 @@ mod tests { let keypair = Arc::new(Keypair::new()); let exit = Arc::new(AtomicBool::new(false)); - let (_mint, ledger_path, _genesis) = create_tmp_sample_ledger( + let (_mint, ledger_path, genesis_entries) = create_tmp_sample_ledger( "storage_stage_process_entries", 1000, 1, @@ -458,18 +457,17 @@ mod tests { ); let entries = make_tiny_test_entries(128); - { - let mut writer = LedgerWriter::open(&ledger_path, true).unwrap(); - writer.write_entries(&entries.clone()).unwrap(); - // drops writer, flushes buffers - } + let db_ledger = DbLedger::open(&ledger_path).unwrap(); + db_ledger + .write_entries(DEFAULT_SLOT_HEIGHT, genesis_entries.len() as u64, &entries) + .unwrap(); let (storage_entry_sender, storage_entry_receiver) = channel(); let storage_state = StorageState::new(); let storage_stage = StorageStage::new( &storage_state, storage_entry_receiver, - Some(&ledger_path), + Some(Arc::new(db_ledger)), keypair, exit.clone(), 0, diff --git a/src/thin_client.rs b/src/thin_client.rs index a1956b0b9f..46ca4a5023 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -462,6 +462,7 @@ mod tests { leader_keypair, vote_account_keypair, bank, + None, entry_height, &last_id, leader, @@ -515,6 +516,7 @@ mod tests { leader_keypair, vote_account_keypair, bank, + None, 0, &last_id, leader, @@ -573,6 +575,7 @@ mod tests { leader_keypair, vote_account_keypair, bank, + None, entry_height, &last_id, leader, @@ -618,6 +621,7 @@ mod tests { leader_keypair, leader_vote_account_keypair.clone(), bank, + None, entry_height, &genesis_entries.last().unwrap().id, leader, @@ -711,6 +715,7 @@ mod tests { leader_keypair, vote_account_keypair, bank, + None, entry_height, &last_id, leader, diff --git a/src/tpu.rs b/src/tpu.rs index d367dbd068..8c17462cf7 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -5,7 +5,6 @@ use crate::bank::Bank; use crate::banking_stage::{BankingStage, BankingStageReturnType}; use crate::entry::Entry; use crate::fetch_stage::FetchStage; -use crate::ledger_write_stage::LedgerWriteStage; use crate::poh_service::Config; use crate::service::Service; use crate::sigverify_stage::SigVerifyStage; @@ -25,7 +24,6 @@ pub struct Tpu { fetch_stage: FetchStage, sigverify_stage: SigVerifyStage, banking_stage: BankingStage, - ledger_write_stage: LedgerWriteStage, exit: Arc, } @@ -35,7 +33,6 @@ impl Tpu { bank: &Arc, tick_duration: Config, transactions_sockets: Vec, - ledger_path: &str, sigverify_disabled: bool, max_tick_height: Option, last_entry_id: &Hash, @@ -57,18 +54,14 @@ impl Tpu { leader_id, ); - let (ledger_write_stage, entry_forwarder) = - LedgerWriteStage::new(Some(ledger_path), entry_receiver); - let tpu = Self { fetch_stage, sigverify_stage, banking_stage, - ledger_write_stage, exit: exit.clone(), }; - (tpu, entry_forwarder, exit) + (tpu, entry_receiver, exit) } pub fn exit(&self) { @@ -91,7 +84,6 @@ impl Service for Tpu { fn join(self) -> thread::Result<(Option)> { self.fetch_stage.join()?; self.sigverify_stage.join()?; - self.ledger_write_stage.join()?; match self.banking_stage.join()? { Some(BankingStageReturnType::LeaderRotation) => Ok(Some(TpuReturnType::LeaderRotation)), _ => Ok(None), diff --git a/src/tvu.rs b/src/tvu.rs index 47eec5e7cf..1bdba54940 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -1,5 +1,5 @@ //! The `tvu` module implements the Transaction Validation Unit, a -//! 5-stage transaction validation pipeline in software. +//! 4-stage transaction validation pipeline in software. //! //! 1. BlobFetchStage //! - Incoming blobs are picked up from the TVU sockets and repair socket. @@ -9,16 +9,13 @@ //! 3. ReplayStage //! - Transactions in blobs are processed and applied to the bank. //! - TODO We need to verify the signatures in the blobs. -//! 4. LedgerWriteStage -//! - Write the replayed ledger to disk. -//! 5. StorageStage +//! 4. StorageStage //! - Generating the keys used to encrypt the ledger and sample it for storage mining. use crate::bank::Bank; use crate::blob_fetch_stage::BlobFetchStage; use crate::cluster_info::ClusterInfo; use crate::db_ledger::DbLedger; -use crate::ledger_write_stage::LedgerWriteStage; use crate::replay_stage::{ReplayStage, ReplayStageReturnType}; use crate::retransmit_stage::RetransmitStage; use crate::service::Service; @@ -39,7 +36,6 @@ pub struct Tvu { fetch_stage: BlobFetchStage, retransmit_stage: RetransmitStage, replay_stage: ReplayStage, - ledger_write_stage: LedgerWriteStage, storage_stage: StorageStage, exit: Arc, } @@ -60,7 +56,6 @@ impl Tvu { /// * `last_entry_id` - Hash of the last entry /// * `cluster_info` - The cluster_info state. /// * `sockets` - My fetch, repair, and restransmit sockets - /// * `ledger_path` - path to the ledger file /// * `db_ledger` - the ledger itself pub fn new( vote_account_keypair: Arc, @@ -69,7 +64,6 @@ impl Tvu { last_entry_id: Hash, cluster_info: &Arc>, sockets: Sockets, - ledger_path: Option<&str>, db_ledger: Arc, ) -> Self { let exit = Arc::new(AtomicBool::new(false)); @@ -97,7 +91,7 @@ impl Tvu { //then sent to the window, which does the erasure coding reconstruction let (retransmit_stage, blob_window_receiver) = RetransmitStage::new( bank, - db_ledger, + db_ledger.clone(), &cluster_info, bank.tick_height(), entry_height, @@ -118,13 +112,10 @@ impl Tvu { last_entry_id, ); - let (ledger_write_stage, storage_entry_receiver) = - LedgerWriteStage::new(ledger_path, ledger_entry_receiver); - let storage_stage = StorageStage::new( &bank.storage_state, - storage_entry_receiver, - ledger_path, + ledger_entry_receiver, + Some(db_ledger), keypair, exit.clone(), entry_height, @@ -134,7 +125,6 @@ impl Tvu { fetch_stage, retransmit_stage, replay_stage, - ledger_write_stage, storage_stage, exit, } @@ -160,7 +150,6 @@ impl Service for Tvu { fn join(self) -> thread::Result> { self.retransmit_stage.join()?; self.fetch_stage.join()?; - self.ledger_write_stage.join()?; self.storage_stage.join()?; match self.replay_stage.join()? { Some(ReplayStageReturnType::LeaderRotation( @@ -293,7 +282,6 @@ pub mod tests { fetch: target1.sockets.tvu, } }, - None, Arc::new(db_ledger), ); diff --git a/tests/multinode.rs b/tests/multinode.rs index f2a2b01c66..da27c3dd83 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -6,15 +6,12 @@ use solana; use solana::blob_fetch_stage::BlobFetchStage; use solana::cluster_info::{ClusterInfo, Node, NodeInfo}; use solana::contact_info::ContactInfo; -use solana::db_ledger::DbLedger; +use solana::db_ledger::{DbLedger, DEFAULT_SLOT_HEIGHT}; use solana::entry::{reconstruct_entries_from_blobs, Entry}; use solana::fullnode::{Fullnode, FullnodeReturnType}; use solana::gossip_service::GossipService; use solana::leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig}; -use solana::ledger::{ - create_tmp_genesis, create_tmp_sample_ledger, read_ledger, tmp_copy_ledger, LedgerWindow, - LedgerWriter, -}; +use solana::ledger::{create_tmp_genesis, create_tmp_sample_ledger, tmp_copy_ledger}; use solana::mint::Mint; use solana::packet::SharedBlob; @@ -37,6 +34,14 @@ use std::sync::{Arc, RwLock}; use std::thread::{sleep, Builder, JoinHandle}; use std::time::{Duration, Instant}; +fn read_ledger(ledger_path: &str) -> Vec { + let ledger = DbLedger::open(&ledger_path).expect("Unable to open ledger"); + ledger + .read_ledger() + .expect("Unable to read ledger") + .collect() +} + fn make_spy_node(leader: &NodeInfo) -> (GossipService, Arc>, Pubkey) { let keypair = Keypair::new(); let exit = Arc::new(AtomicBool::new(false)); @@ -133,13 +138,18 @@ fn test_multi_node_ledger_window() -> result::Result<()> { let zero_ledger_path = tmp_copy_ledger(&leader_ledger_path, "multi_node_ledger_window"); ledger_paths.push(zero_ledger_path.clone()); - // write a bunch more ledger into leader's ledger, this should populate his window - // and force him to respond to repair from the ledger window + // write a bunch more ledger into leader's ledger, this should populate the leader's window + // and force it to respond to repair from the ledger window { let entries = make_tiny_test_entries(alice.last_id(), 100); - let mut writer = LedgerWriter::open(&leader_ledger_path, false).unwrap(); - - writer.write_entries(&entries).unwrap(); + let db_ledger = DbLedger::open(&leader_ledger_path).unwrap(); + db_ledger + .write_entries( + DEFAULT_SLOT_HEIGHT, + solana::mint::NUM_GENESIS_ENTRIES as u64, + &entries, + ) + .unwrap(); } let leader = Fullnode::new( @@ -830,10 +840,18 @@ fn test_leader_to_validator_transition() { // Write the bootstrap entries to the ledger that will cause leader rotation // after the bootstrap height - let mut ledger_writer = LedgerWriter::open(&leader_ledger_path, false).unwrap(); let (bootstrap_entries, _) = make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id, 0); - ledger_writer.write_entries(&bootstrap_entries).unwrap(); + { + let db_ledger = DbLedger::open(&leader_ledger_path).unwrap(); + db_ledger + .write_entries( + DEFAULT_SLOT_HEIGHT, + genesis_entries.len() as u64, + &bootstrap_entries, + ) + .unwrap(); + } // Start the leader node let bootstrap_height = leader_rotation_interval; @@ -912,6 +930,10 @@ fn test_leader_to_validator_transition() { assert!(bal <= i); } + // Shut down + gossip_service.close().unwrap(); + leader.close().unwrap(); + // Check the ledger to make sure it's the right height, we should've // transitioned after tick_height == bootstrap_height let (bank, _, _) = Fullnode::new_bank_from_ledger( @@ -920,10 +942,6 @@ fn test_leader_to_validator_transition() { ); assert_eq!(bank.tick_height(), bootstrap_height); - - // Shut down - gossip_service.close().unwrap(); - leader.close().unwrap(); remove_dir_all(leader_ledger_path).unwrap(); } @@ -968,10 +986,18 @@ fn test_leader_validator_basic() { // Write the bootstrap entries to the ledger that will cause leader rotation // after the bootstrap height - let mut ledger_writer = LedgerWriter::open(&leader_ledger_path, false).unwrap(); let (active_set_entries, vote_account_keypair) = make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id, 0); - ledger_writer.write_entries(&active_set_entries).unwrap(); + { + let db_ledger = DbLedger::open(&leader_ledger_path).unwrap(); + db_ledger + .write_entries( + DEFAULT_SLOT_HEIGHT, + genesis_entries.len() as u64, + &active_set_entries, + ) + .unwrap(); + } // Create the leader scheduler config let num_bootstrap_slots = 2; @@ -1059,22 +1085,17 @@ fn test_leader_validator_basic() { // Check the ledger of the validator to make sure the entry height is correct // and that the old leader and the new leader's ledgers agree up to the point // of leader rotation - let validator_entries = - read_ledger(&validator_ledger_path, true).expect("Expected parsing of validator ledger"); - let leader_entries = - read_ledger(&leader_ledger_path, true).expect("Expected parsing of leader ledger"); + let validator_entries: Vec = read_ledger(&validator_ledger_path); - let mut min_len = 0; - for (v, l) in validator_entries.zip(leader_entries) { - min_len += 1; - assert_eq!( - v.expect("expected valid validator entry"), - l.expect("expected valid leader entry") - ); + let leader_entries = read_ledger(&leader_ledger_path); + + assert_eq!(leader_entries.len(), validator_entries.len()); + assert!(leader_entries.len() as u64 >= bootstrap_height); + + for (v, l) in validator_entries.iter().zip(leader_entries) { + assert_eq!(*v, l); } - assert!(min_len >= bootstrap_height); - for path in ledger_paths { DbLedger::destroy(&path).expect("Expected successful database destruction"); remove_dir_all(path).unwrap(); @@ -1150,8 +1171,16 @@ fn test_dropped_handoff_recovery() { make_active_set_entries(&next_leader_keypair, &mint.keypair(), &last_id, &last_id, 0); // Write the entries - let mut ledger_writer = LedgerWriter::open(&bootstrap_leader_ledger_path, false).unwrap(); - ledger_writer.write_entries(&active_set_entries).unwrap(); + { + let db_ledger = DbLedger::open(&bootstrap_leader_ledger_path).unwrap(); + db_ledger + .write_entries( + DEFAULT_SLOT_HEIGHT, + genesis_entries.len() as u64, + &active_set_entries, + ) + .unwrap(); + } let next_leader_ledger_path = tmp_copy_ledger( &bootstrap_leader_ledger_path, @@ -1319,12 +1348,20 @@ fn test_full_leader_validator_network() { vote_account_keypairs.push_back(vote_account_keypair); // Write the entries - let mut ledger_writer = LedgerWriter::open(&bootstrap_leader_ledger_path, false).unwrap(); last_entry_id = bootstrap_entries .last() .expect("expected at least one genesis entry") .id; - ledger_writer.write_entries(&bootstrap_entries).unwrap(); + { + let db_ledger = DbLedger::open(&bootstrap_leader_ledger_path).unwrap(); + db_ledger + .write_entries( + DEFAULT_SLOT_HEIGHT, + genesis_entries.len() as u64, + &bootstrap_entries, + ) + .unwrap(); + } } // Create the common leader scheduling configuration @@ -1450,8 +1487,8 @@ fn test_full_leader_validator_network() { let mut node_entries = vec![]; // Check that all the ledgers match for ledger_path in ledger_paths.iter() { - let entries = read_ledger(ledger_path, true).expect("Expected parsing of node ledger"); - node_entries.push(entries); + let entries = read_ledger(ledger_path); + node_entries.push(entries.into_iter()); } let mut shortest = None; @@ -1460,11 +1497,7 @@ fn test_full_leader_validator_network() { let mut expected_entry_option = None; let mut empty_iterators = HashSet::new(); for (i, entries_for_specific_node) in node_entries.iter_mut().enumerate() { - if let Some(next_entry_option) = entries_for_specific_node.next() { - // If this ledger iterator has another entry, make sure that the - // ledger reader parsed it correctly - let next_entry = next_entry_option.expect("expected valid ledger entry"); - + if let Some(next_entry) = entries_for_specific_node.next() { // Check if another earlier ledger iterator had another entry. If so, make // sure they match if let Some(ref expected_entry) = expected_entry_option { @@ -1588,14 +1621,9 @@ fn test_broadcast_last_tick() { bootstrap_leader.close().unwrap(); let last_tick_entry_height = genesis_ledger_len as u64 + bootstrap_height; - let mut ledger_window = LedgerWindow::open(&bootstrap_leader_ledger_path) - .expect("Expected to be able to open ledger"); - - // get_entry() expects the index of the entry, so we have to subtract one from the actual entry height - let expected_last_tick = ledger_window - .get_entry(last_tick_entry_height - 1) - .expect("Expected last tick entry to exist"); - + let entries = read_ledger(&bootstrap_leader_ledger_path); + assert!(entries.len() >= last_tick_entry_height as usize); + let expected_last_tick = &entries[last_tick_entry_height as usize - 1]; // Check that the nodes got the last broadcasted blob for (_, receiver) in blob_fetch_stages.iter() { let mut last_tick_blob: SharedBlob = SharedBlob::default(); @@ -1613,7 +1641,7 @@ fn test_broadcast_last_tick() { &reconstruct_entries_from_blobs(vec![&*last_tick_blob.read().unwrap()]) .expect("Expected to be able to reconstruct entries from blob") .0[0]; - assert_eq!(actual_last_tick, &expected_last_tick); + assert_eq!(actual_last_tick, expected_last_tick); } // Shut down blob fetch stages diff --git a/wallet/src/wallet.rs b/wallet/src/wallet.rs index 935aae6f7b..c3f9847221 100644 --- a/wallet/src/wallet.rs +++ b/wallet/src/wallet.rs @@ -829,32 +829,21 @@ mod tests { #[test] fn test_resign_tx() { let leader_keypair = Arc::new(Keypair::new()); + let leader_pubkey = leader_keypair.pubkey().clone(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.info.clone(); - let (alice, ledger_path) = + let (_alice, ledger_path) = create_tmp_genesis("wallet_request_airdrop", 10_000_000, leader_data.id, 1000); - let mut bank = Bank::new(&alice); - - let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( - leader_data.id, - ))); - bank.leader_scheduler = leader_scheduler; - let vote_account_keypair = Arc::new(Keypair::new()); - let last_id = bank.last_id(); - let entry_height = alice.create_entries().len() as u64; - let _server = Fullnode::new_with_bank( - leader_keypair, - vote_account_keypair, - bank, - entry_height, - &last_id, + let _server = Fullnode::new( leader, - None, &ledger_path, + leader_keypair, + Arc::new(Keypair::new()), + None, false, + LeaderScheduler::from_bootstrap_leader(leader_pubkey), None, ); - sleep(Duration::from_millis(900)); let rpc_client = RpcClient::new_from_socket(leader_data.rpc); @@ -1207,32 +1196,21 @@ mod tests { let bob_pubkey = Keypair::new().pubkey(); let leader_keypair = Arc::new(Keypair::new()); + let leader_pubkey = leader_keypair.pubkey().clone(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.info.clone(); let (alice, ledger_path) = create_tmp_genesis("wallet_process_command", 10_000_000, leader_data.id, 1000); - let mut bank = Bank::new(&alice); - - let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( - leader_data.id, - ))); - bank.leader_scheduler = leader_scheduler; - let vote_account_keypair = Arc::new(Keypair::new()); - let last_id = bank.last_id(); - - let server = Fullnode::new_with_bank( - leader_keypair, - vote_account_keypair, - bank, - 0, - &last_id, + let server = Fullnode::new( leader, - None, &ledger_path, + leader_keypair, + Arc::new(Keypair::new()), + None, false, + LeaderScheduler::from_bootstrap_leader(leader_pubkey), None, ); - sleep(Duration::from_millis(900)); let (sender, receiver) = channel(); run_local_drone(alice.keypair(), sender); @@ -1278,32 +1256,21 @@ mod tests { #[test] fn test_wallet_request_airdrop() { let leader_keypair = Arc::new(Keypair::new()); + let leader_pubkey = leader_keypair.pubkey().clone(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.info.clone(); let (alice, ledger_path) = create_tmp_genesis("wallet_request_airdrop", 10_000_000, leader_data.id, 1000); - let mut bank = Bank::new(&alice); - - let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( - leader_data.id, - ))); - bank.leader_scheduler = leader_scheduler; - let vote_account_keypair = Arc::new(Keypair::new()); - let last_id = bank.last_id(); - let entry_height = alice.create_entries().len() as u64; - let server = Fullnode::new_with_bank( - leader_keypair, - vote_account_keypair, - bank, - entry_height, - &last_id, + let server = Fullnode::new( leader, - None, &ledger_path, + leader_keypair, + Arc::new(Keypair::new()), + None, false, + LeaderScheduler::from_bootstrap_leader(leader_pubkey), None, ); - sleep(Duration::from_millis(900)); let (sender, receiver) = channel(); run_local_drone(alice.keypair(), sender); @@ -1376,6 +1343,7 @@ mod tests { leader_keypair, vote_account_keypair, bank, + None, 0, &last_id, leader, @@ -1501,6 +1469,7 @@ mod tests { leader_keypair, vote_account_keypair, bank, + None, 0, &last_id, leader, @@ -1615,6 +1584,7 @@ mod tests { leader_keypair, vote_account_keypair, bank, + None, 0, &last_id, leader,