diff --git a/src/db_ledger.rs b/src/db_ledger.rs
index 701288050..3d0c7ab43 100644
--- a/src/db_ledger.rs
+++ b/src/db_ledger.rs
@@ -5,13 +5,15 @@
 use bincode::{deserialize, serialize};
 use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
 use entry::Entry;
-use ledger::Block;
 use packet::{Blob, SharedBlob, BLOB_HEADER_SIZE};
 use result::{Error, Result};
 use rocksdb::{ColumnFamily, Options, WriteBatch, DB};
 use serde::de::DeserializeOwned;
 use serde::Serialize;
+use solana_sdk::pubkey::Pubkey;
+use std::borrow::Borrow;
 use std::io;
+use std::net::{IpAddr, Ipv4Addr, SocketAddr};
 
 pub const DB_LEDGER_DIRECTORY: &str = "db_ledger";
 
@@ -232,6 +234,8 @@ pub const ERASURE_CF: &str = "erasure";
 impl DbLedger {
     // Opens a Ledger in directory, provides "infinite" window of blobs
     pub fn open(ledger_path: &str) -> Result<DbLedger> {
+        let ledger_path = format!("{}/{}", ledger_path, DB_LEDGER_DIRECTORY);
+
         // Use default database options
         let mut options = Options::default();
         options.create_if_missing(true);
@@ -260,10 +264,25 @@ impl DbLedger {
         })
     }
 
-    pub fn write_shared_blobs(&mut self, slot: u64, shared_blobs: &[SharedBlob]) -> Result<()> {
-        let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
-        let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
-        self.write_blobs(slot, &blobs)
+    pub fn destroy(ledger_path: &str) -> Result<()> {
+        let ledger_path = format!("{}/{}", ledger_path, DB_LEDGER_DIRECTORY);
+        DB::destroy(&Options::default(), &ledger_path)?;
+        Ok(())
+    }
+
+    pub fn write_shared_blobs<I>(&mut self, slot: u64, shared_blobs: I) -> Result<()>
+    where
+        I: IntoIterator,
+        I::Item: Borrow<SharedBlob>,
+    {
+        for b in shared_blobs {
+            let bl = b.borrow().read().unwrap();
+            let index = bl.index()?;
+            let key = DataCf::key(slot, index);
+            self.insert_data_blob(&key, &*bl)?;
+        }
+
+        Ok(())
     }
 
     pub fn write_blobs<'a, I>(&mut self, slot: u64, blobs: I) -> Result<()>
     where
@@ -278,12 +297,20 @@ impl DbLedger {
         Ok(())
     }
 
-    pub fn write_entries(&mut self, slot: u64, entries: &[Entry]) -> Result<()> {
-        let shared_blobs = entries.to_blobs();
-        let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
-        let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
-        self.write_blobs(slot, &blobs)?;
-        Ok(())
+    pub fn write_entries<I>(&mut self, slot: u64, entries: I) -> Result<()>
+    where
+        I: IntoIterator,
+        I::Item: Borrow<Entry>,
+    {
+        let default_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
+        let shared_blobs = entries.into_iter().enumerate().map(|(idx, entry)| {
+            entry.borrow().to_blob(
+                Some(idx as u64),
+                Some(Pubkey::default()),
+                Some(&default_addr),
+            )
+        });
+        self.write_shared_blobs(slot, shared_blobs)
     }
 
     pub fn insert_data_blob(&self, key: &[u8], new_blob: &Blob) -> Result<Vec<Entry>> {
@@ -421,12 +448,17 @@ impl DbLedger {
     }
 }
 
-pub fn write_entries_to_ledger(ledger_paths: &[String], entries: &[Entry]) {
+pub fn write_entries_to_ledger<I>(ledger_paths: &[&str], entries: I)
+where
+    I: IntoIterator,
+    I::Item: Borrow<Entry>,
+{
+    let mut entries = entries.into_iter();
     for ledger_path in ledger_paths {
         let mut db_ledger =
             DbLedger::open(ledger_path).expect("Expected to be able to open database ledger");
         db_ledger
-            .write_entries(DEFAULT_SLOT_HEIGHT, &entries)
+            .write_entries(DEFAULT_SLOT_HEIGHT, entries.by_ref())
             .expect("Expected successful write of genesis entries");
     }
 }
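Reviewer note on the hunks above: `write_shared_blobs` and `write_entries` are now generic over `I: IntoIterator` with `I::Item: Borrow<SharedBlob>` / `Borrow<Entry>`, so callers can pass a slice, a `Vec`, or a lazy iterator without collecting blobs up front. A minimal, self-contained sketch of that bound, using a stand-in `Entry` type rather than the real one:

```rust
use std::borrow::Borrow;

// Stand-in for the real `Entry` type; purely illustrative.
struct Entry {
    id: u64,
}

// Mirrors the bound on `DbLedger::write_entries`: accepts a slice, a Vec,
// or any iterator whose items can be borrowed as an `Entry`.
fn write_entries<I>(entries: I)
where
    I: IntoIterator,
    I::Item: Borrow<Entry>,
{
    for entry in entries {
        let entry: &Entry = entry.borrow();
        println!("writing entry {}", entry.id);
    }
}

fn main() {
    let owned = vec![Entry { id: 0 }, Entry { id: 1 }];
    write_entries(&owned); // borrows the Vec, items are &Entry
    write_entries((2..4u64).map(|id| Entry { id })); // lazy iterator, no Vec built
    write_entries(owned); // consumes the Vec, items are Entry
}
```

The same bound is what lets `write_entries_to_ledger` hand `entries.by_ref()` to `write_entries` and keep draining a single iterator across ledger paths.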
@@ -435,7 +467,6 @@ pub fn write_entries_to_ledger(ledger_paths: &[String], entries: &[Entry]) {
 mod tests {
     use super::*;
     use ledger::{get_tmp_ledger_path, make_tiny_test_entries, Block};
-    use rocksdb::{Options, DB};
 
     #[test]
     fn test_put_get_simple() {
@@ -485,8 +516,7 @@ mod tests {
         // Destroying database without closing it first is undefined behavior
         drop(ledger);
-        DB::destroy(&Options::default(), &ledger_path)
-            .expect("Expected successful database destruction");
+        DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
     }
 
     #[test]
@@ -548,8 +578,7 @@ mod tests {
         // Destroying database without closing it first is undefined behavior
         drop(ledger);
-        DB::destroy(&Options::default(), &ledger_path)
-            .expect("Expected successful database destruction");
+        DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
     }
 
     #[test]
@@ -591,8 +620,7 @@ mod tests {
         // Destroying database without closing it first is undefined behavior
         drop(ledger);
-        DB::destroy(&Options::default(), &ledger_path)
-            .expect("Expected successful database destruction");
+        DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
     }
 
     #[test]
@@ -628,8 +656,7 @@ mod tests {
         // Destroying database without closing it first is undefined behavior
         drop(ledger);
-        DB::destroy(&Options::default(), &ledger_path)
-            .expect("Expected successful database destruction");
+        DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
     }
 
     #[test]
@@ -644,7 +671,7 @@ mod tests {
         let num_entries = 8;
         let shared_blobs = make_tiny_test_entries(num_entries).to_blobs();
 
-        for (b, i) in shared_blobs.iter().zip(0..num_entries) {
+        for (i, b) in shared_blobs.iter().enumerate() {
             b.write().unwrap().set_index(1 << (i * 8)).unwrap();
         }
@@ -668,7 +695,6 @@ mod tests {
                 db_iterator.next();
             }
         }
-        DB::destroy(&Options::default(), &db_ledger_path)
-            .expect("Expected successful database destruction");
+        DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction");
     }
 }
diff --git a/src/db_window.rs b/src/db_window.rs
index 298a0a499..413fe67f0 100644
--- a/src/db_window.rs
+++ b/src/db_window.rs
@@ -218,9 +218,10 @@ pub fn retransmit_all_leader_blocks(
     for b in dq {
         // Check if the blob is from the scheduled leader for its slot. If so,
         // add to the retransmit_queue
-        let slot = b.read().unwrap().slot()?;
-        if let Some(leader_id) = leader_scheduler.get_leader_for_slot(slot) {
-            add_blob_to_retransmit_queue(b, leader_id, &mut retransmit_queue);
+        if let Ok(slot) = b.read().unwrap().slot() {
+            if let Some(leader_id) = leader_scheduler.get_leader_for_slot(slot) {
+                add_blob_to_retransmit_queue(b, leader_id, &mut retransmit_queue);
+            }
         }
     }
 
@@ -273,6 +274,9 @@ pub fn process_blob(
     let is_coding = blob.read().unwrap().is_coding();
 
     // Check if the blob is in the range of our known leaders. If not, we return.
+    // TODO: Need to update slot in broadcast, otherwise this check will fail with
+    // leader rotation enabled
+    // Github issue: https://github.com/solana-labs/solana/issues/1899.
     let slot = blob.read().unwrap().slot()?;
     let leader = leader_scheduler.get_leader_for_slot(slot);
 
@@ -292,12 +296,11 @@ pub fn process_blob(
         )?;
         vec![]
     } else {
-        let data_key = ErasureCf::key(slot, pix);
+        let data_key = DataCf::key(slot, pix);
         db_ledger.insert_data_blob(&data_key, &blob.read().unwrap())?
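Note on the `retransmit_all_leader_blocks` change above: swapping `?` on `slot()` for `if let Ok(slot)` means a blob with an unreadable slot is skipped instead of aborting the whole batch. A small, self-contained sketch of the two shapes, with a hypothetical `parse` helper standing in for `Blob::slot()`:

```rust
// Hypothetical per-item operation that can fail, standing in for `Blob::slot()`.
fn parse(raw: &str) -> Result<u64, std::num::ParseIntError> {
    raw.parse()
}

// Old shape: `?` propagates the first error and abandons the rest of the batch.
fn process_strict(batch: &[&str]) -> Result<Vec<u64>, std::num::ParseIntError> {
    let mut out = Vec::new();
    for &raw in batch {
        out.push(parse(raw)?);
    }
    Ok(out)
}

// New shape: a bad item is skipped and the remaining items are still handled.
fn process_lenient(batch: &[&str]) -> Vec<u64> {
    let mut out = Vec::new();
    for &raw in batch {
        if let Ok(slot) = parse(raw) {
            out.push(slot);
        }
    }
    out
}

fn main() {
    let batch = ["1", "not-a-slot", "3"];
    assert!(process_strict(&batch).is_err());
    assert_eq!(process_lenient(&batch), vec![1, 3]);
}
```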
     };
 
     // TODO: Once erasure is fixed, readd that logic here
-
     for entry in &consumed_entries {
         *tick_height += entry.is_tick() as u64;
     }
@@ -529,8 +532,8 @@ mod test {
         assert!(gap > 3);
         let num_entries = 10;
         let shared_blobs = make_tiny_test_entries(num_entries).to_blobs();
-        for (b, i) in shared_blobs.iter().zip(0..shared_blobs.len() as u64) {
-            b.write().unwrap().set_index(i * gap).unwrap();
+        for (i, b) in shared_blobs.iter().enumerate() {
+            b.write().unwrap().set_index(i as u64 * gap).unwrap();
         }
         let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
         let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
diff --git a/src/fullnode.rs b/src/fullnode.rs
index 4feddd3b2..9f61681d3 100644
--- a/src/fullnode.rs
+++ b/src/fullnode.rs
@@ -3,6 +3,7 @@
 use bank::Bank;
 use broadcast_stage::BroadcastStage;
 use cluster_info::{ClusterInfo, Node, NodeInfo};
+use db_ledger::{write_entries_to_ledger, DbLedger};
 use leader_scheduler::LeaderScheduler;
 use ledger::read_ledger;
 use ncp::Ncp;
@@ -106,6 +107,7 @@ pub struct Fullnode {
     broadcast_socket: UdpSocket,
     rpc_addr: SocketAddr,
     rpc_pubsub_addr: SocketAddr,
+    db_ledger: Arc<RwLock<DbLedger>>,
 }
 
 #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
@@ -258,6 +260,10 @@ impl Fullnode {
             .expect("Leader not known after processing bank");
 
         cluster_info.write().unwrap().set_leader(scheduled_leader);
+
+        // Create the RocksDb ledger
+        let db_ledger = Self::make_db_ledger(ledger_path);
+
         let node_role = if scheduled_leader != keypair.pubkey() {
             // Start in validator mode.
             let tvu = Tvu::new(
@@ -267,7 +273,6 @@ impl Fullnode {
                 entry_height,
                 *last_entry_id,
                 cluster_info.clone(),
-                shared_window.clone(),
                 node.sockets
                     .replicate
                     .iter()
@@ -282,6 +287,7 @@ impl Fullnode {
                     .try_clone()
                     .expect("Failed to clone retransmit socket"),
                 Some(ledger_path),
+                db_ledger.clone(),
             );
             let tpu_forwarder = TpuForwarder::new(
                 node.sockets
@@ -352,6 +358,7 @@ impl Fullnode {
             broadcast_socket: node.sockets.broadcast,
             rpc_addr,
             rpc_pubsub_addr,
+            db_ledger,
         }
     }
@@ -423,7 +430,6 @@ impl Fullnode {
             entry_height,
             last_entry_id,
             self.cluster_info.clone(),
-            self.shared_window.clone(),
             self.replicate_socket
                 .iter()
                 .map(|s| s.try_clone().expect("Failed to clone replicate sockets"))
@@ -435,6 +441,7 @@ impl Fullnode {
                 .try_clone()
                 .expect("Failed to clone retransmit socket"),
             Some(&self.ledger_path),
+            self.db_ledger.clone(),
         );
         let tpu_forwarder = TpuForwarder::new(
             self.transaction_sockets
@@ -589,6 +596,19 @@ impl Fullnode {
             ),
         )
     }
+
+    fn make_db_ledger(ledger_path: &str) -> Arc<RwLock<DbLedger>> {
+        // Destroy any existing instances of the RocksDb ledger
+        DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
+        let ledger_entries = read_ledger(ledger_path, true)
+            .expect("opening ledger")
+            .map(|entry| entry.unwrap());
+
+        write_entries_to_ledger(&[ledger_path], ledger_entries);
+        let db =
+            DbLedger::open(ledger_path).expect("Expected to successfully open database ledger");
+        Arc::new(RwLock::new(db))
+    }
 }
 
 impl Service for Fullnode {
@@ -626,9 +646,10 @@ mod tests {
     use bank::Bank;
     use cluster_info::Node;
+    use db_ledger::*;
     use fullnode::{Fullnode, FullnodeReturnType, NodeRole, TvuReturnType};
     use leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig};
-    use ledger::{create_tmp_genesis, create_tmp_sample_ledger, LedgerWriter};
+    use ledger::{create_tmp_genesis, create_tmp_sample_ledger, tmp_copy_ledger, LedgerWriter};
     use packet::make_consecutive_blobs;
     use service::Service;
     use
signature::{Keypair, KeypairUtil}; @@ -839,6 +860,13 @@ mod tests { + num_ending_ticks as u64; ledger_writer.write_entries(&active_set_entries).unwrap(); + let validator_ledger_path = + tmp_copy_ledger(&bootstrap_leader_ledger_path, "test_wrong_role_transition"); + let ledger_paths = vec![ + bootstrap_leader_ledger_path.clone(), + validator_ledger_path.clone(), + ]; + // Create the common leader scheduling configuration let num_slots_per_epoch = 3; let leader_rotation_interval = 5; @@ -855,45 +883,53 @@ mod tests { Some(genesis_tick_height), ); - // Test that a node knows to transition to a validator based on parsing the ledger - let leader_vote_account_keypair = Arc::new(Keypair::new()); - let bootstrap_leader = Fullnode::new( - bootstrap_leader_node, - &bootstrap_leader_ledger_path, - bootstrap_leader_keypair, - leader_vote_account_keypair, - Some(bootstrap_leader_info.ncp), - false, - LeaderScheduler::new(&leader_scheduler_config), - None, - ); + { + // Test that a node knows to transition to a validator based on parsing the ledger + let leader_vote_account_keypair = Arc::new(Keypair::new()); + let bootstrap_leader = Fullnode::new( + bootstrap_leader_node, + &bootstrap_leader_ledger_path, + bootstrap_leader_keypair, + leader_vote_account_keypair, + Some(bootstrap_leader_info.ncp), + false, + LeaderScheduler::new(&leader_scheduler_config), + None, + ); - match bootstrap_leader.node_role { - Some(NodeRole::Validator(_)) => (), - _ => { - panic!("Expected bootstrap leader to be a validator"); + match bootstrap_leader.node_role { + Some(NodeRole::Validator(_)) => (), + _ => { + panic!("Expected bootstrap leader to be a validator"); + } } - } - // Test that a node knows to transition to a leader based on parsing the ledger - let validator = Fullnode::new( - validator_node, - &bootstrap_leader_ledger_path, - Arc::new(validator_keypair), - Arc::new(validator_vote_account_keypair), - Some(bootstrap_leader_info.ncp), - false, - LeaderScheduler::new(&leader_scheduler_config), - None, - ); + // Test that a node knows to transition to a leader based on parsing the ledger + let validator = Fullnode::new( + validator_node, + &validator_ledger_path, + Arc::new(validator_keypair), + Arc::new(validator_vote_account_keypair), + Some(bootstrap_leader_info.ncp), + false, + LeaderScheduler::new(&leader_scheduler_config), + None, + ); - match validator.node_role { - Some(NodeRole::Leader(_)) => (), - _ => { - panic!("Expected node to be the leader"); + match validator.node_role { + Some(NodeRole::Leader(_)) => (), + _ => { + panic!("Expected node to be the leader"); + } } + + validator.close().expect("Expected node to close"); + bootstrap_leader.close().expect("Expected node to close"); + } + for path in ledger_paths { + DbLedger::destroy(&path).expect("Expected successful database destruction"); + let _ignored = remove_dir_all(&path); } - let _ignored = remove_dir_all(&bootstrap_leader_ledger_path); } #[test] @@ -1035,6 +1071,8 @@ mod tests { // Shut down t_responder.join().expect("responder thread join"); validator.close().unwrap(); - remove_dir_all(&validator_ledger_path).unwrap(); + DbLedger::destroy(&validator_ledger_path) + .expect("Expected successful database destruction"); + let _ignored = remove_dir_all(&validator_ledger_path).unwrap(); } } diff --git a/src/ledger.rs b/src/ledger.rs index e4f4f0e37..578fb6297 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -13,7 +13,7 @@ use rayon::prelude::*; use signature::{Keypair, KeypairUtil}; use solana_sdk::hash::{hash, Hash}; use 
solana_sdk::pubkey::Pubkey; -use std::fs::{create_dir_all, remove_dir_all, File, OpenOptions}; +use std::fs::{copy, create_dir_all, remove_dir_all, File, OpenOptions}; use std::io::prelude::*; use std::io::{self, BufReader, BufWriter, Seek, SeekFrom}; use std::mem::size_of; @@ -638,6 +638,22 @@ pub fn create_tmp_sample_ledger( (mint, path, genesis) } +pub fn tmp_copy_ledger(from: &str, name: &str) -> String { + let tostr = get_tmp_ledger_path(name); + + { + let to = Path::new(&tostr); + let from = Path::new(&from); + + create_dir_all(to).unwrap(); + + copy(from.join("data"), to.join("data")).unwrap(); + copy(from.join("index"), to.join("index")).unwrap(); + } + + tostr +} + pub fn make_tiny_test_entries(num: usize) -> Vec { let zero = Hash::default(); let one = hash(&zero.as_ref()); diff --git a/src/replicator.rs b/src/replicator.rs index c6a8563fb..e8c525532 100644 --- a/src/replicator.rs +++ b/src/replicator.rs @@ -1,5 +1,6 @@ use blob_fetch_stage::BlobFetchStage; use cluster_info::{ClusterInfo, Node, NodeInfo}; +use db_ledger::DbLedger; use leader_scheduler::LeaderScheduler; use ncp::Ncp; use service::Service; @@ -104,9 +105,20 @@ impl Replicator { let (entry_window_sender, entry_window_receiver) = channel(); // todo: pull blobs off the retransmit_receiver and recycle them? let (retransmit_sender, retransmit_receiver) = channel(); + + // Create the RocksDb ledger, eventually will simply repurpose the input + // ledger path as the RocksDb ledger path once we replace the ledger with + // RocksDb. Note for now, this ledger will not contain any of the existing entries + // in the ledger located at ledger_path, and will only append on newly received + // entries after being passed to window_service + let db_ledger = Arc::new(RwLock::new( + DbLedger::open(&ledger_path.unwrap()) + .expect("Expected to be able to open database ledger"), + )); + let t_window = window_service( + db_ledger, cluster_info.clone(), - shared_window.clone(), 0, entry_height, max_entry_height, @@ -165,6 +177,7 @@ impl Replicator { mod tests { use client::mk_client; use cluster_info::Node; + use db_ledger::DbLedger; use fullnode::Fullnode; use leader_scheduler::LeaderScheduler; use ledger::{create_tmp_genesis, get_tmp_ledger_path, read_ledger}; @@ -204,67 +217,73 @@ mod tests { let (mint, leader_ledger_path) = create_tmp_genesis(leader_ledger_path, 100, leader_info.id, 1); - let leader = Fullnode::new( - leader_node, - &leader_ledger_path, - leader_keypair, - vote_account_keypair, - None, - false, - LeaderScheduler::from_bootstrap_leader(leader_info.id), - None, - ); + { + let leader = Fullnode::new( + leader_node, + &leader_ledger_path, + leader_keypair, + vote_account_keypair, + None, + false, + LeaderScheduler::from_bootstrap_leader(leader_info.id), + None, + ); - let mut leader_client = mk_client(&leader_info); + let mut leader_client = mk_client(&leader_info); - let bob = Keypair::new(); + let bob = Keypair::new(); - let last_id = leader_client.get_last_id(); - leader_client - .transfer(1, &mint.keypair(), bob.pubkey(), &last_id) - .unwrap(); - - let replicator_keypair = Keypair::new(); - - info!("starting replicator node"); - let replicator_node = Node::new_localhost_with_pubkey(replicator_keypair.pubkey()); - let (replicator, _leader_info) = Replicator::new( - entry_height, - 1, - &exit, - Some(replicator_ledger_path), - replicator_node, - Some(network_addr), - done.clone(), - ); - - let mut num_entries = 0; - for _ in 0..60 { - match read_ledger(replicator_ledger_path, true) { - Ok(entries) => { - for _ in 
entries { - num_entries += 1; - } - info!("{} entries", num_entries); - if num_entries > 0 { - break; - } - } - Err(e) => { - info!("error reading ledger: {:?}", e); - } - } - sleep(Duration::from_millis(300)); let last_id = leader_client.get_last_id(); leader_client .transfer(1, &mint.keypair(), bob.pubkey(), &last_id) .unwrap(); + + let replicator_keypair = Keypair::new(); + + info!("starting replicator node"); + let replicator_node = Node::new_localhost_with_pubkey(replicator_keypair.pubkey()); + let (replicator, _leader_info) = Replicator::new( + entry_height, + 1, + &exit, + Some(replicator_ledger_path), + replicator_node, + Some(network_addr), + done.clone(), + ); + + let mut num_entries = 0; + for _ in 0..60 { + match read_ledger(replicator_ledger_path, true) { + Ok(entries) => { + for _ in entries { + num_entries += 1; + } + info!("{} entries", num_entries); + if num_entries > 0 { + break; + } + } + Err(e) => { + info!("error reading ledger: {:?}", e); + } + } + sleep(Duration::from_millis(300)); + let last_id = leader_client.get_last_id(); + leader_client + .transfer(1, &mint.keypair(), bob.pubkey(), &last_id) + .unwrap(); + } + assert_eq!(done.load(Ordering::Relaxed), true); + assert!(num_entries > 0); + exit.store(true, Ordering::Relaxed); + replicator.join(); + leader.exit(); } - assert_eq!(done.load(Ordering::Relaxed), true); - assert!(num_entries > 0); - exit.store(true, Ordering::Relaxed); - replicator.join(); - leader.exit(); + + DbLedger::destroy(&leader_ledger_path).expect("Expected successful database destuction"); + DbLedger::destroy(&replicator_ledger_path) + .expect("Expected successful database destuction"); let _ignored = remove_dir_all(&leader_ledger_path); let _ignored = remove_dir_all(&replicator_ledger_path); } diff --git a/src/retransmit_stage.rs b/src/retransmit_stage.rs index 9f462336e..5f5aacd7f 100644 --- a/src/retransmit_stage.rs +++ b/src/retransmit_stage.rs @@ -2,6 +2,7 @@ use cluster_info::ClusterInfo; use counter::Counter; +use db_ledger::DbLedger; use entry::Entry; use leader_scheduler::LeaderScheduler; @@ -17,7 +18,6 @@ use std::sync::{Arc, RwLock}; use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; use streamer::BlobReceiver; -use window::SharedWindow; use window_service::window_service; fn retransmit( @@ -81,8 +81,8 @@ pub struct RetransmitStage { impl RetransmitStage { pub fn new( + db_ledger: Arc>, cluster_info: &Arc>, - window: SharedWindow, tick_height: u64, entry_height: u64, retransmit_socket: Arc, @@ -97,8 +97,8 @@ impl RetransmitStage { let (entry_sender, entry_receiver) = channel(); let done = Arc::new(AtomicBool::new(false)); let t_window = window_service( + db_ledger, cluster_info.clone(), - window, tick_height, entry_height, 0, diff --git a/src/tvu.rs b/src/tvu.rs index 45a9da86f..e5e69c37a 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -13,6 +13,7 @@ use bank::Bank; use blob_fetch_stage::BlobFetchStage; use cluster_info::ClusterInfo; +use db_ledger::DbLedger; use ledger_write_stage::LedgerWriteStage; use replicate_stage::{ReplicateStage, ReplicateStageReturnType}; use retransmit_stage::RetransmitStage; @@ -24,7 +25,6 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; use std::thread; use storage_stage::{StorageStage, StorageState}; -use window::SharedWindow; #[derive(Debug, PartialEq, Eq, Clone)] pub enum TvuReturnType { @@ -62,11 +62,11 @@ impl Tvu { entry_height: u64, last_entry_id: Hash, cluster_info: Arc>, - window: SharedWindow, replicate_sockets: Vec, repair_socket: UdpSocket, 
retransmit_socket: UdpSocket, ledger_path: Option<&str>, + db_ledger: Arc>, ) -> Self { let exit = Arc::new(AtomicBool::new(false)); @@ -76,12 +76,13 @@ impl Tvu { blob_sockets.push(repair_socket.clone()); let (fetch_stage, blob_fetch_receiver) = BlobFetchStage::new_multi_socket(blob_sockets, exit.clone()); + //TODO //the packets coming out of blob_receiver need to be sent to the GPU and verified //then sent to the window, which does the erasure coding reconstruction let (retransmit_stage, blob_window_receiver) = RetransmitStage::new( + db_ledger, &cluster_info, - window, bank.tick_height(), entry_height, Arc::new(retransmit_socket), @@ -166,15 +167,19 @@ pub mod tests { use bank::Bank; use bincode::serialize; use cluster_info::{ClusterInfo, Node}; + use db_ledger::DbLedger; use entry::Entry; use leader_scheduler::LeaderScheduler; + use ledger::get_tmp_ledger_path; use logger; use mint::Mint; use ncp::Ncp; use packet::SharedBlob; + use rocksdb::{Options, DB}; use service::Service; use signature::{Keypair, KeypairUtil}; use solana_sdk::hash::Hash; + use std::fs::remove_dir_all; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; @@ -262,6 +267,9 @@ pub mod tests { let vote_account_keypair = Arc::new(Keypair::new()); let mut cur_hash = Hash::default(); + let db_ledger_path = get_tmp_ledger_path("test_replicate"); + let db_ledger = + DbLedger::open(&db_ledger_path).expect("Expected to successfully open ledger"); let tvu = Tvu::new( Arc::new(target1_keypair), vote_account_keypair, @@ -269,11 +277,11 @@ pub mod tests { 0, cur_hash, cref1, - dr_1.1, target1.sockets.replicate, target1.sockets.repair, target1.sockets.retransmit, None, + Arc::new(RwLock::new(db_ledger)), ); let mut alice_ref_balance = starting_balance; @@ -346,5 +354,8 @@ pub mod tests { dr_1.0.join().expect("join"); t_receiver.join().expect("join"); t_responder.join().expect("join"); + DB::destroy(&Options::default(), &db_ledger_path) + .expect("Expected successful database destuction"); + let _ignored = remove_dir_all(&db_ledger_path); } } diff --git a/src/window_service.rs b/src/window_service.rs index abfc9293b..d4ec1379d 100644 --- a/src/window_service.rs +++ b/src/window_service.rs @@ -1,25 +1,26 @@ //! The `window_service` provides a thread for maintaining a window (tail of the ledger). //! 
-use cluster_info::{ClusterInfo, NodeInfo}; +use cluster_info::ClusterInfo; use counter::Counter; +use db_ledger::{DbLedger, LedgerColumnFamily, MetaCf, DEFAULT_SLOT_HEIGHT}; +use db_window::*; use entry::EntrySender; use leader_scheduler::LeaderScheduler; use log::Level; -use packet::SharedBlob; use rand::{thread_rng, Rng}; use result::{Error, Result}; use solana_metrics::{influxdb, submit}; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::duration_as_ms; +use std::borrow::{Borrow, BorrowMut}; use std::net::UdpSocket; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize}; use std::sync::mpsc::RecvTimeoutError; use std::sync::{Arc, RwLock}; use std::thread::{Builder, JoinHandle}; use std::time::{Duration, Instant}; use streamer::{BlobReceiver, BlobSender}; -use window::{SharedWindow, WindowUtil}; pub const MAX_REPAIR_BACKOFF: usize = 128; @@ -49,119 +50,21 @@ fn repair_backoff(last: &mut u64, times: &mut usize, consumed: u64) -> bool { thread_rng().gen_range(0, *times as u64) == 0 } -fn add_block_to_retransmit_queue( - b: &SharedBlob, - leader_id: Pubkey, - retransmit_queue: &mut Vec, -) { - let p = b.read().unwrap(); - //TODO this check isn't safe against adverserial packets - //we need to maintain a sequence window - trace!( - "idx: {} addr: {:?} id: {:?} leader: {:?}", - p.index() - .expect("get_index in fn add_block_to_retransmit_queue"), - p.id() - .expect("get_id in trace! fn add_block_to_retransmit_queue"), - p.meta.addr(), - leader_id - ); - if p.id().expect("get_id in fn add_block_to_retransmit_queue") == leader_id { - //TODO - //need to copy the retransmitted blob - //otherwise we get into races with which thread - //should do the recycling - // - let nv = SharedBlob::default(); - { - let mut mnv = nv.write().unwrap(); - let sz = p.meta.size; - mnv.meta.size = sz; - mnv.data[..sz].copy_from_slice(&p.data[..sz]); - } - retransmit_queue.push(nv); - } -} - -fn retransmit_all_leader_blocks( - window: &SharedWindow, - maybe_leader: Option, - dq: &[SharedBlob], - id: &Pubkey, - consumed: u64, - received: u64, - retransmit: &BlobSender, - pending_retransmits: &mut bool, -) -> Result<()> { - let mut retransmit_queue: Vec = Vec::new(); - if let Some(leader) = maybe_leader { - let leader_id = leader.id; - for b in dq { - add_block_to_retransmit_queue(b, leader_id, &mut retransmit_queue); - } - - if *pending_retransmits { - for w in window - .write() - .expect("Window write failed in retransmit_all_leader_blocks") - .iter_mut() - { - *pending_retransmits = false; - if w.leader_unknown { - if let Some(ref b) = w.data { - add_block_to_retransmit_queue(b, leader_id, &mut retransmit_queue); - w.leader_unknown = false; - } - } - } - } - submit( - influxdb::Point::new("retransmit-queue") - .add_field( - "count", - influxdb::Value::Integer(retransmit_queue.len() as i64), - ).to_owned(), - ); - } else { - warn!("{}: no leader to retransmit from", id); - } - if !retransmit_queue.is_empty() { - trace!( - "{}: RECV_WINDOW {} {}: retransmit {}", - id, - consumed, - received, - retransmit_queue.len(), - ); - inc_new_counter_info!("streamer-recv_window-retransmit", retransmit_queue.len()); - retransmit.send(retransmit_queue)?; - } - Ok(()) -} - #[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))] fn recv_window( - window: &SharedWindow, + db_ledger: &mut DbLedger, id: &Pubkey, - cluster_info: &Arc>, - consumed: &mut u64, - received: &mut u64, + leader_scheduler: &LeaderScheduler, tick_height: &mut u64, max_ix: u64, r: 
&BlobReceiver, s: &EntrySender, retransmit: &BlobSender, - pending_retransmits: &mut bool, done: &Arc, ) -> Result<()> { let timer = Duration::from_millis(200); let mut dq = r.recv_timeout(timer)?; - let maybe_leader: Option = cluster_info - .read() - .expect("'cluster_info' read lock in fn recv_window") - .leader_data() - .cloned(); - let leader_unknown = maybe_leader.is_none(); + while let Ok(mut nq) = r.try_recv() { dq.append(&mut nq) } @@ -174,80 +77,41 @@ fn recv_window( .to_owned(), ); - trace!( - "{}: RECV_WINDOW {} {}: got packets {}", - id, - *consumed, - *received, - dq.len(), - ); - - retransmit_all_leader_blocks( - window, - maybe_leader, - &dq, - id, - *consumed, - *received, - retransmit, - pending_retransmits, - )?; + retransmit_all_leader_blocks(&dq, leader_scheduler, retransmit)?; let mut pixs = Vec::new(); //send a contiguous set of blocks let mut consume_queue = Vec::new(); + + trace!("{} num blobs received: {}", id, dq.len()); + for b in dq { let (pix, meta_size) = { let p = b.read().unwrap(); (p.index()?, p.meta.size) }; + pixs.push(pix); - if !window - .read() - .unwrap() - .blob_idx_in_window(&id, pix, *consumed, received) - { - continue; - } - - // For downloading storage blobs, - // we only want up to a certain index - // then stop - if max_ix != 0 && pix > max_ix { - continue; - } - trace!("{} window pix: {} size: {}", id, pix, meta_size); - window.write().unwrap().process_blob( - id, - b, + let _ = process_blob( + leader_scheduler, + db_ledger, + &b, + max_ix, pix, &mut consume_queue, - consumed, tick_height, - leader_unknown, - pending_retransmits, + done, ); + } + + trace!( + "Elapsed processing time in recv_window(): {}", + duration_as_ms(&now.elapsed()) + ); - // Send a signal when we hit the max entry_height - if max_ix != 0 && *consumed == (max_ix + 1) { - done.store(true, Ordering::Relaxed); - } - } - if log_enabled!(Level::Trace) { - trace!("{}", window.read().unwrap().print(id, *consumed)); - trace!( - "{}: consumed: {} received: {} sending consume.len: {} pixs: {:?} took {} ms", - id, - *consumed, - *received, - consume_queue.len(), - pixs, - duration_as_ms(&now.elapsed()) - ); - } if !consume_queue.is_empty() { inc_new_counter_info!("streamer-recv_window-consume", consume_queue.len()); s.send(consume_queue)?; @@ -257,8 +121,8 @@ fn recv_window( #[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))] pub fn window_service( + db_ledger: Arc>, cluster_info: Arc>, - window: SharedWindow, tick_height: u64, entry_height: u64, max_entry_height: u64, @@ -273,27 +137,20 @@ pub fn window_service( .name("solana-window".to_string()) .spawn(move || { let mut tick_height_ = tick_height; - let mut consumed = entry_height; - let mut received = entry_height; let mut last = entry_height; let mut times = 0; - let id = cluster_info.read().unwrap().my_data().id; - let mut pending_retransmits = false; + let id = cluster_info.read().unwrap().id(); trace!("{}: RECV_WINDOW started", id); loop { - // Check if leader rotation was configured if let Err(e) = recv_window( - &window, + db_ledger.write().unwrap().borrow_mut(), &id, - &cluster_info, - &mut consumed, - &mut received, + leader_scheduler.read().unwrap().borrow(), &mut tick_height_, max_entry_height, &r, &s, &retransmit, - &mut pending_retransmits, &done, ) { match e { @@ -306,45 +163,62 @@ pub fn window_service( } } - submit( - influxdb::Point::new("window-stage") - .add_field("consumed", influxdb::Value::Integer(consumed as i64)) - .to_owned(), - ); + let meta = { + let rlock = db_ledger.read().unwrap(); - if 
received <= consumed { - trace!( - "{} we have everything received:{} consumed:{}", - id, - received, - consumed + rlock + .meta_cf + .get(&rlock.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT)) + }; + + if let Ok(Some(meta)) = meta { + let received = meta.received; + let consumed = meta.consumed; + + submit( + influxdb::Point::new("window-stage") + .add_field("consumed", influxdb::Value::Integer(consumed as i64)) + .to_owned(), ); - continue; - } - //exponential backoff - if !repair_backoff(&mut last, &mut times, consumed) { - trace!("{} !repair_backoff() times = {}", id, times); - continue; - } - trace!("{} let's repair! times = {}", id, times); + // Consumed should never be bigger than received + assert!(consumed <= received); + if received == consumed { + trace!( + "{} we have everything received: {} consumed: {}", + id, + received, + consumed + ); + continue; + } - let mut window = window.write().unwrap(); - let reqs = window.repair( - &cluster_info, - &id, - times, - consumed, - received, - tick_height_, - max_entry_height, - &leader_scheduler, - ); - for (to, req) in reqs { - repair_socket.send_to(&req, to).unwrap_or_else(|e| { - info!("{} repair req send_to({}) error {:?}", id, to, e); - 0 - }); + //exponential backoff + if !repair_backoff(&mut last, &mut times, consumed) { + trace!("{} !repair_backoff() times = {}", id, times); + continue; + } + trace!("{} let's repair! times = {}", id, times); + + let reqs = repair( + DEFAULT_SLOT_HEIGHT, + db_ledger.read().unwrap().borrow(), + &cluster_info, + &id, + times, + tick_height_, + max_entry_height, + &leader_scheduler, + ); + + if let Ok(reqs) = reqs { + for (to, req) in reqs { + repair_socket.send_to(&req, to).unwrap_or_else(|e| { + info!("{} repair req send_to({}) error {:?}", id, to, e); + 0 + }); + } + } } } }).unwrap() @@ -353,18 +227,21 @@ pub fn window_service( #[cfg(test)] mod test { use cluster_info::{ClusterInfo, Node}; + use db_ledger::DbLedger; use entry::Entry; use leader_scheduler::LeaderScheduler; + use ledger::get_tmp_ledger_path; use logger; use packet::{make_consecutive_blobs, SharedBlob, PACKET_DATA_SIZE}; + use rocksdb::{Options, DB}; use solana_sdk::hash::Hash; + use std::fs::remove_dir_all; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver}; use std::sync::{Arc, RwLock}; use std::time::Duration; use streamer::{blob_receiver, responder}; - use window::default_window; use window_service::{repair_backoff, window_service}; fn get_entries(r: Receiver>, num: &mut usize) { @@ -396,11 +273,14 @@ mod test { let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader); let (s_window, r_window) = channel(); let (s_retransmit, r_retransmit) = channel(); - let win = Arc::new(RwLock::new(default_window())); let done = Arc::new(AtomicBool::new(false)); + let db_ledger_path = get_tmp_ledger_path("window_send_test"); + let db_ledger = Arc::new(RwLock::new( + DbLedger::open(&db_ledger_path).expect("Expected to be able to open database ledger"), + )); let t_window = window_service( + db_ledger, subs, - win, 0, 0, 0, @@ -444,10 +324,13 @@ mod test { t_receiver.join().expect("join"); t_responder.join().expect("join"); t_window.join().expect("join"); + DB::destroy(&Options::default(), &db_ledger_path) + .expect("Expected successful database destuction"); + let _ignored = remove_dir_all(&db_ledger_path); } #[test] - pub fn window_send_no_leader_test() { + pub fn window_send_leader_test2() { logger::setup(); let tn = Node::new_localhost(); let exit = 
Arc::new(AtomicBool::new(false)); @@ -459,11 +342,14 @@ mod test { let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader); let (s_window, _r_window) = channel(); let (s_retransmit, r_retransmit) = channel(); - let win = Arc::new(RwLock::new(default_window())); let done = Arc::new(AtomicBool::new(false)); + let db_ledger_path = get_tmp_ledger_path("window_send_late_leader_test"); + let db_ledger = Arc::new(RwLock::new( + DbLedger::open(&db_ledger_path).expect("Expected to be able to open database ledger"), + )); let t_window = window_service( + db_ledger, subs.clone(), - win, 0, 0, 0, @@ -471,13 +357,7 @@ mod test { s_window, s_retransmit, Arc::new(tn.sockets.repair), - // TODO: For now, the window still checks the ClusterInfo for the current leader - // to determine whether to retransmit a block. In the future when we rely on - // the LeaderScheduler for retransmits, this test will need to be rewritten - // because a leader should only be unknown in the window when the write stage - // hasn't yet calculated the leaders for slots in the next epoch (on entries - // at heights that are multiples of seed_rotation_interval in LeaderScheduler) - Arc::new(RwLock::new(LeaderScheduler::default())), + Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(me_id))), done, ); let t_responder = { @@ -500,75 +380,8 @@ mod test { msgs.push(b); } s_responder.send(msgs).expect("send"); - t_responder - }; - - assert!(r_retransmit.recv_timeout(Duration::new(3, 0)).is_err()); - exit.store(true, Ordering::Relaxed); - t_receiver.join().expect("join"); - t_responder.join().expect("join"); - t_window.join().expect("join"); - } - - #[test] - pub fn window_send_late_leader_test() { - logger::setup(); - let tn = Node::new_localhost(); - let exit = Arc::new(AtomicBool::new(false)); - let cluster_info_me = ClusterInfo::new(tn.info.clone()); - let me_id = cluster_info_me.my_data().id; - let subs = Arc::new(RwLock::new(cluster_info_me)); - - let (s_reader, r_reader) = channel(); - let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader); - let (s_window, _r_window) = channel(); - let (s_retransmit, r_retransmit) = channel(); - let win = Arc::new(RwLock::new(default_window())); - let done = Arc::new(AtomicBool::new(false)); - let t_window = window_service( - subs.clone(), - win, - 0, - 0, - 0, - r_reader, - s_window, - s_retransmit, - Arc::new(tn.sockets.repair), - // TODO: For now, the window still checks the ClusterInfo for the current leader - // to determine whether to retransmit a block. 
In the future when we rely on - // the LeaderScheduler for retransmits, this test will need to be rewritten - // becasue a leader should only be unknown in the window when the write stage - // hasn't yet calculated the leaders for slots in the next epoch (on entries - // at heights that are multiples of seed_rotation_interval in LeaderScheduler) - Arc::new(RwLock::new(LeaderScheduler::default())), - done, - ); - let t_responder = { - let (s_responder, r_responder) = channel(); - let blob_sockets: Vec> = - tn.sockets.replicate.into_iter().map(Arc::new).collect(); - let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder); - let mut msgs = Vec::new(); - for v in 0..10 { - let i = 9 - v; - let b = SharedBlob::default(); - { - let mut w = b.write().unwrap(); - w.set_index(i).unwrap(); - w.set_id(&me_id).unwrap(); - assert_eq!(i, w.index().unwrap()); - w.meta.size = PACKET_DATA_SIZE; - w.meta.set_addr(&tn.info.ncp); - } - msgs.push(b); - } - s_responder.send(msgs).expect("send"); - - assert!(r_retransmit.recv_timeout(Duration::new(3, 0)).is_err()); subs.write().unwrap().set_leader(me_id); - let mut msgs1 = Vec::new(); for v in 1..5 { let i = 9 + v; @@ -595,6 +408,9 @@ mod test { t_receiver.join().expect("join"); t_responder.join().expect("join"); t_window.join().expect("join"); + DB::destroy(&Options::default(), &db_ledger_path) + .expect("Expected successful database destuction"); + let _ignored = remove_dir_all(&db_ledger_path); } #[test] diff --git a/tests/multinode.rs b/tests/multinode.rs index 9a68508c1..9caf30e54 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -9,11 +9,12 @@ extern crate solana_sdk; use solana::blob_fetch_stage::BlobFetchStage; use solana::cluster_info::{ClusterInfo, Node, NodeInfo}; use solana::contact_info::ContactInfo; +use solana::db_ledger::DbLedger; use solana::entry::{reconstruct_entries_from_blobs, Entry}; use solana::fullnode::{Fullnode, FullnodeReturnType}; use solana::leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig}; use solana::ledger::{ - create_tmp_genesis, create_tmp_sample_ledger, get_tmp_ledger_path, read_ledger, LedgerWindow, + create_tmp_genesis, create_tmp_sample_ledger, read_ledger, tmp_copy_ledger, LedgerWindow, LedgerWriter, }; use solana::logger; @@ -33,9 +34,8 @@ use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::{duration_as_ms, duration_as_s}; use std::collections::{HashSet, VecDeque}; use std::env; -use std::fs::{copy, create_dir_all, remove_dir_all}; +use std::fs::remove_dir_all; use std::net::UdpSocket; -use std::path::Path; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; use std::thread::{sleep, Builder, JoinHandle}; @@ -110,22 +110,6 @@ fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec { rv } -fn tmp_copy_ledger(from: &str, name: &str) -> String { - let tostr = get_tmp_ledger_path(name); - - { - let to = Path::new(&tostr); - let from = Path::new(&from); - - create_dir_all(to).unwrap(); - - copy(from.join("data"), to.join("data")).unwrap(); - copy(from.join("index"), to.join("index")).unwrap(); - } - - tostr -} - fn make_tiny_test_entries(start_hash: Hash, num: usize) -> Vec { let mut id = start_hash; let mut num_hashes = 0; @@ -1087,6 +1071,7 @@ fn test_leader_validator_basic() { assert!(min_len >= bootstrap_height); for path in ledger_paths { + DbLedger::destroy(&path).expect("Expected successful database destruction"); remove_dir_all(path).unwrap(); } } @@ -1346,28 +1331,20 @@ fn test_full_leader_validator_network() { 
Some(bootstrap_height), Some(leader_rotation_interval), Some(seed_rotation_interval), - Some(leader_rotation_interval), + Some(100), ); let exit = Arc::new(AtomicBool::new(false)); - // Start the bootstrap leader fullnode - let bootstrap_leader = Arc::new(RwLock::new(Fullnode::new( - bootstrap_leader_node, - &bootstrap_leader_ledger_path, - Arc::new(node_keypairs.pop_front().unwrap()), - Arc::new(vote_account_keypairs.pop_front().unwrap()), - Some(bootstrap_leader_info.ncp), - false, - LeaderScheduler::new(&leader_scheduler_config), - None, - ))); - let mut nodes: Vec>> = vec![bootstrap_leader.clone()]; - let mut t_nodes = vec![run_node( - bootstrap_leader_info.id, - bootstrap_leader, - exit.clone(), - )]; + // Postpone starting the leader until after the validators are up and running + // to avoid + // 1) Scenario where leader rotates before validators can start up + // 2) Modifying the leader ledger which validators are going to be copying + // during startup + let leader_keypair = node_keypairs.pop_front().unwrap(); + let leader_vote_keypair = vote_account_keypairs.pop_front().unwrap(); + let mut nodes: Vec>> = vec![]; + let mut t_nodes = vec![]; // Start up the validators for kp in node_keypairs.into_iter() { @@ -1375,7 +1352,9 @@ fn test_full_leader_validator_network() { &bootstrap_leader_ledger_path, "test_full_leader_validator_network", ); + ledger_paths.push(validator_ledger_path.clone()); + let validator_id = kp.pubkey(); let validator_node = Node::new_localhost_with_pubkey(validator_id); let validator = Arc::new(RwLock::new(Fullnode::new( @@ -1393,6 +1372,25 @@ fn test_full_leader_validator_network() { t_nodes.push(run_node(validator_id, validator, exit.clone())); } + // Start up the bootstrap leader + let bootstrap_leader = Arc::new(RwLock::new(Fullnode::new( + bootstrap_leader_node, + &bootstrap_leader_ledger_path, + Arc::new(leader_keypair), + Arc::new(leader_vote_keypair), + Some(bootstrap_leader_info.ncp), + false, + LeaderScheduler::new(&leader_scheduler_config), + None, + ))); + + nodes.push(bootstrap_leader.clone()); + t_nodes.push(run_node( + bootstrap_leader_info.id, + bootstrap_leader, + exit.clone(), + )); + // Wait for convergence let num_converged = converge(&bootstrap_leader_info, N + 1).len(); assert_eq!(num_converged, N + 1); @@ -1495,7 +1493,9 @@ fn test_full_leader_validator_network() { } assert!(shortest.unwrap() >= target_height); + for path in ledger_paths { + DbLedger::destroy(&path).expect("Expected successful database destruction"); remove_dir_all(path).unwrap(); } }
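For reference, the open/write/destroy lifecycle the tests in this patch converge on, sketched under the assumption that the crate's own helpers (`get_tmp_ledger_path`, `make_tiny_test_entries`, `DEFAULT_SLOT_HEIGHT`) are in scope; this illustrates the intended call order rather than a standalone program:

```rust
use std::fs::remove_dir_all;

fn open_write_destroy() {
    let ledger_path = get_tmp_ledger_path("db_ledger_lifecycle");

    {
        // `DbLedger::open` now nests its RocksDB files under `<path>/db_ledger`.
        let mut db_ledger =
            DbLedger::open(&ledger_path).expect("Expected to be able to open database ledger");

        // `write_entries` takes any iterator of (borrowed) entries.
        let entries = make_tiny_test_entries(8);
        db_ledger
            .write_entries(DEFAULT_SLOT_HEIGHT, &entries)
            .expect("Expected successful write of entries");

        // `db_ledger` is dropped here, closing the database before it is destroyed.
    }

    DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
    let _ignored = remove_dir_all(&ledger_path);
}
```

Dropping the handle before `DbLedger::destroy` mirrors the comment repeated in the tests that destroying a database without closing it first is undefined behavior.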