From b38bf90de7823d35a5a1d14a07be08ea8dd85e87 Mon Sep 17 00:00:00 2001
From: carllin
Date: Mon, 21 Oct 2019 16:15:10 -0700
Subject: [PATCH] Deshred blocks in parallel (#6461)

* Deshred in parallel

* Add tests for corrupt slots and parallel deshred

* Rename load_blocktree_entries to load_blocktree_entries_with_shred_count
---
 core/src/broadcast_stage.rs  |   2 +-
 core/src/replay_stage.rs     |  39 ++---
 ledger/src/blocktree.rs      | 323 ++++++++++++++++++++++++++---------
 ledger/src/blocktree/meta.rs |   3 +
 4 files changed, 266 insertions(+), 101 deletions(-)

diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs
index 6fa1b9fb3..cdfd535ad 100644
--- a/core/src/broadcast_stage.rs
+++ b/core/src/broadcast_stage.rs
@@ -298,7 +298,7 @@ mod test {
         );
 
         let blocktree = broadcast_service.blocktree;
-        let (entries, _, _, _) = blocktree
+        let (entries, _) = blocktree
             .get_slot_entries_with_shred_count(slot, 0)
             .expect("Expect entries to be present");
         assert_eq!(entries.len(), max_tick_height as usize);
diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs
index 2ef10a49a..d8910d0c5 100644
--- a/core/src/replay_stage.rs
+++ b/core/src/replay_stage.rs
@@ -415,25 +415,25 @@ impl ReplayStage {
             .entry(bank.slot())
             .or_insert_with(|| ForkProgress::new(bank.slot(), bank.last_blockhash()));
         let now = Instant::now();
-        let load_result = Self::load_blocktree_entries(bank, blocktree, bank_progress);
+        let load_result =
+            Self::load_blocktree_entries_with_shred_count(bank, blocktree, bank_progress);
         let fetch_entries_elapsed = now.elapsed().as_micros();
-
         if load_result.is_err() {
             bank_progress.stats.fetch_entries_fail_elapsed += fetch_entries_elapsed as u64;
+        } else {
+            bank_progress.stats.fetch_entries_elapsed += fetch_entries_elapsed as u64;
         }
-        let replay_result =
-            load_result.and_then(|(entries, num_shreds, useful_time, wasted_time)| {
-                trace!(
-                    "Fetch entries for slot {}, {:?} entries, num shreds {:?}",
-                    bank.slot(),
-                    entries.len(),
-                    num_shreds
-                );
-                tx_count += entries.iter().map(|e| e.transactions.len()).sum::<usize>();
-                bank_progress.stats.fetch_entries_elapsed += useful_time as u64;
-                bank_progress.stats.fetch_entries_fail_elapsed += wasted_time as u64;
-                Self::replay_entries_into_bank(bank, entries, bank_progress, num_shreds)
-            });
+
+        let replay_result = load_result.and_then(|(entries, num_shreds)| {
+            trace!(
+                "Fetch entries for slot {}, {:?} entries, num shreds {:?}",
+                bank.slot(),
+                entries.len(),
+                num_shreds
+            );
+            tx_count += entries.iter().map(|e| e.transactions.len()).sum::<usize>();
+            Self::replay_entries_into_bank(bank, entries, bank_progress, num_shreds)
+        });
 
         if Self::is_replay_result_fatal(&replay_result) {
             warn!(
@@ -726,15 +726,15 @@ impl ReplayStage {
         });
     }
 
-    fn load_blocktree_entries(
+    fn load_blocktree_entries_with_shred_count(
        bank: &Bank,
        blocktree: &Blocktree,
        bank_progress: &mut ForkProgress,
-    ) -> Result<(Vec<Entry>, usize, u64, u64)> {
+    ) -> Result<(Vec<Entry>, usize)> {
         let bank_slot = bank.slot();
-        let entries_and_count = blocktree
+        let entries_and_shred_count = blocktree
             .get_slot_entries_with_shred_count(bank_slot, bank_progress.num_shreds as u64)?;
-        Ok(entries_and_count)
+        Ok(entries_and_shred_count)
     }
 
     fn replay_entries_into_bank(
@@ -766,6 +766,7 @@ impl ReplayStage {
     ) -> Result<()> {
         let now = Instant::now();
         let last_entry = &bank_progress.last_entry;
+        datapoint_info!("verify-batch-size", ("size", entries.len() as i64, i64));
         let verify_result = entries.verify(last_entry);
         let verify_entries_elapsed = now.elapsed().as_micros();
         bank_progress.stats.entry_verification_elapsed += verify_entries_elapsed as u64;
diff --git a/ledger/src/blocktree.rs b/ledger/src/blocktree.rs
index a495cb44a..9701dd843 100644
--- a/ledger/src/blocktree.rs
+++ b/ledger/src/blocktree.rs
@@ -7,11 +7,14 @@ use crate::shred::{Shred, Shredder};
 use bincode::deserialize;
-use std::collections::HashMap;
-
 use log::*;
+use rayon::iter::IntoParallelRefIterator;
+use rayon::iter::ParallelIterator;
+use rayon::ThreadPool;
+use rocksdb;
 use solana_metrics::{datapoint_debug, datapoint_error};
+use solana_rayon_threadlimit::get_thread_count;
 use solana_sdk::genesis_block::GenesisBlock;
 use solana_sdk::hash::Hash;
@@ -19,13 +22,13 @@ use solana_sdk::signature::{Keypair, KeypairUtil};
 use std::cell::RefCell;
 use std::cmp;
+use std::collections::HashMap;
 use std::fs;
 use std::path::{Path, PathBuf};
 use std::rc::Rc;
 use std::result;
 use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};
 use std::sync::{Arc, RwLock};
-use std::time::Instant;
 
 pub use self::meta::*;
 use crate::leader_schedule_cache::LeaderScheduleCache;
@@ -45,6 +48,11 @@ type BatchProcessor = db::BatchProcessor;
 
 pub const BLOCKTREE_DIRECTORY: &str = "rocksdb";
 
+thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
+                    .num_threads(get_thread_count())
+                    .build()
+                    .unwrap()));
+
 pub const MAX_COMPLETED_SLOTS_IN_CHANNEL: usize = 100_000;
 
 pub type SlotMetaWorkingSetEntry = (Rc<RefCell<SlotMeta>>, Option<SlotMeta>);
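
The thread-local pool added above gives each calling OS thread its own lazily built rayon
pool, sized by get_thread_count(), instead of funneling all deshredding through one global
pool. A minimal standalone sketch of the same pattern, assuming only the rayon crate (the
POOL name, the hard-coded thread count, and sum_of_squares are illustrative stand-ins):

    use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
    use rayon::ThreadPool;
    use std::cell::RefCell;

    // Like PAR_THREAD_POOL above: each OS thread that calls in builds its
    // own rayon pool on first use (thread_local! initializes lazily).
    thread_local!(static POOL: RefCell<ThreadPool> = RefCell::new(
        rayon::ThreadPoolBuilder::new()
            .num_threads(4) // stand-in for get_thread_count()
            .build()
            .unwrap()
    ));

    fn sum_of_squares(items: &[u64]) -> u64 {
        // `install` runs the closure inside the pool; the par_iter work
        // submitted within it is split across that pool's threads.
        POOL.with(|pool| {
            pool.borrow()
                .install(|| items.par_iter().map(|x| x * x).sum())
        })
    }

    fn main() {
        assert_eq!(sum_of_squares(&[1, 2, 3]), 14);
    }
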
@@ -727,6 +735,13 @@ impl Blocktree {
             false
         };
 
+        let last_in_data = if shred.data_complete() {
+            debug!("got last in data");
+            true
+        } else {
+            false
+        };
+
         if is_orphan(slot_meta) {
             slot_meta.parent_slot = parent;
         }
@@ -754,7 +769,13 @@ impl Blocktree {
         // Commit step: commit all changes to the mutable structures at once, or none at all.
         // We don't want only a subset of these changes going through.
         write_batch.put_bytes::<cf::ShredData>((slot, index), &shred.payload)?;
-        update_slot_meta(last_in_slot, slot_meta, index, new_consumed);
+        update_slot_meta(
+            last_in_slot,
+            last_in_data,
+            slot_meta,
+            index as u32,
+            new_consumed,
+        );
         data_index.set_present(index, true);
         trace!("inserted shred into slot {:?} and index {:?}", slot, index);
         Ok(())
@@ -991,81 +1012,111 @@ impl Blocktree {
     pub fn get_slot_entries_with_shred_count(
         &self,
         slot: u64,
-        mut start_index: u64,
-    ) -> Result<(Vec<Entry>, usize, u64, u64)> {
-        let mut useful_time = 0;
-        let mut wasted_time = 0;
-
-        let mut all_entries = vec![];
-        let mut num_shreds = 0;
-        loop {
-            let now = Instant::now();
-            let mut res = self.get_entries_in_data_block(slot, &mut start_index);
-            let elapsed = now.elapsed().as_micros();
-
-            if let Ok((ref mut entries, new_num_shreds)) = res {
-                if !entries.is_empty() {
-                    all_entries.append(entries);
-                    num_shreds += new_num_shreds;
-                    useful_time += elapsed;
-                    continue;
-                }
-            }
-
-            // All unsuccessful cases (errors, incomplete data blocks) will count as wasted work
-            wasted_time += elapsed;
-            res?;
-            break;
-        }
-
-        trace!("Found {:?} entries", all_entries.len());
-        Ok((
-            all_entries,
-            num_shreds,
-            useful_time as u64,
-            wasted_time as u64,
-        ))
+        start_index: u64,
+    ) -> Result<(Vec<Entry>, usize)> {
+        let slot_meta_cf = self.db.column::<cf::SlotMeta>();
+        let slot_meta = slot_meta_cf.get(slot)?;
+        if slot_meta.is_none() {
+            return Ok((vec![], 0));
+        }
+
+        let slot_meta = slot_meta.unwrap();
+        // Find all the ranges for the completed data blocks
+        let completed_ranges = Self::get_completed_data_ranges(
+            start_index as u32,
+            &slot_meta.completed_data_indexes[..],
+            slot_meta.consumed as u32,
+        );
+        if completed_ranges.is_empty() {
+            return Ok((vec![], 0));
+        }
+        let num_shreds = completed_ranges
+            .last()
+            .map(|(_, end_index)| u64::from(*end_index) - start_index + 1);
+
+        let all_entries: Result<Vec<Vec<Entry>>> = PAR_THREAD_POOL.with(|thread_pool| {
+            thread_pool.borrow().install(|| {
+                completed_ranges
+                    .par_iter()
+                    .map(|(start_index, end_index)| {
+                        self.get_entries_in_data_block(slot, *start_index, *end_index)
+                    })
+                    .collect()
+            })
+        });
+
+        let all_entries: Vec<Entry> = all_entries?.into_iter().flatten().collect();
+        Ok((all_entries, num_shreds.unwrap_or(0) as usize))
     }
 
-    pub fn get_entries_in_data_block(
-        &self,
-        slot: u64,
-        start_index: &mut u64,
-    ) -> Result<(Vec<Entry>, usize)> {
-        let mut shred_chunk: Vec<Shred> = vec![];
-        let data_shred_cf = self.db.column::<cf::ShredData>();
-        while let Some(serialized_shred) = data_shred_cf.get_bytes((slot, *start_index))? {
-            *start_index += 1;
-            let new_shred = Shred::new_from_serialized_shred(serialized_shred).ok();
-            if let Some(shred) = new_shred {
-                let is_complete = shred.data_complete() || shred.last_in_slot();
-                shred_chunk.push(shred);
-                if is_complete {
-                    if let Ok(deshred_payload) = Shredder::deshred(&shred_chunk) {
-                        debug!("{:?} shreds in last FEC set", shred_chunk.len(),);
-                        let entries: Vec<Entry> =
-                            bincode::deserialize(&deshred_payload).map_err(|_| {
-                                BlocktreeError::InvalidShredData(Box::new(
-                                    bincode::ErrorKind::Custom(
-                                        "could not construct entries".to_string(),
-                                    ),
-                                ))
-                            })?;
-                        return Ok((entries, shred_chunk.len()));
-                    } else {
-                        debug!("Failed in deshredding shred payloads");
-                        break;
-                    }
-                }
-            } else {
-                // Didn't find a valid shred, this slot is dead.
-                // TODO: Mark as dead, but have to carefully handle last shred of interrupted
-                // slots.
-                break;
-            }
-        }
-        Ok((vec![], 0))
+    // Get the range of indexes [start_index, end_index] of every completed data block
+    fn get_completed_data_ranges(
+        mut start_index: u32,
+        completed_data_end_indexes: &[u32],
+        consumed: u32,
+    ) -> Vec<(u32, u32)> {
+        let mut completed_data_ranges = vec![];
+        let floor = completed_data_end_indexes
+            .iter()
+            .position(|i| *i >= start_index)
+            .unwrap_or_else(|| completed_data_end_indexes.len());
+
+        for i in &completed_data_end_indexes[floor..] {
+            // `consumed` is the next missing shred index, but shred `i` existing in
+            // completed_data_end_indexes implies it's not missing
+            assert!(*i != consumed);
+
+            if *i < consumed {
+                completed_data_ranges.push((start_index, *i));
+                start_index = *i + 1;
+            }
+        }
+        completed_data_ranges
+    }
+
+    fn get_entries_in_data_block(
+        &self,
+        slot: u64,
+        start_index: u32,
+        end_index: u32,
+    ) -> Result<Vec<Entry>> {
+        let data_shred_cf = self.db.column::<cf::ShredData>();
+
+        // Short circuit on first error
+        let data_shreds: Result<Vec<Shred>> = (start_index..=end_index)
+            .map(|i| {
+                data_shred_cf
+                    .get_bytes((slot, u64::from(i)))
+                    .and_then(|serialized_shred| {
+                        Shred::new_from_serialized_shred(
+                            serialized_shred
+                                .expect("Shred must exist if shred index was included in a range"),
+                        )
+                        .map_err(|_| {
+                            BlocktreeError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(
+                                "Could not reconstruct shred from shred payload".to_string(),
+                            )))
+                        })
+                    })
+            })
+            .collect();
+
+        let data_shreds = data_shreds?;
+        assert!(data_shreds.last().unwrap().data_complete());
+
+        let deshred_payload = Shredder::deshred(&data_shreds).map_err(|_| {
+            BlocktreeError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(
+                "Could not reconstruct data block from constituent shreds".to_string(),
+            )))
+        })?;
+
+        debug!("{:?} shreds in last FEC set", data_shreds.len(),);
+        bincode::deserialize::<Vec<Entry>>(&deshred_payload).map_err(|_| {
+            BlocktreeError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(
+                "could not reconstruct entries".to_string(),
+            )))
+        })
+    }
 
     // Returns slots connecting to any element of the list `slots`.
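
To make the range computation above concrete: with completed-data end indexes [2, 4, 9, 11]
and consumed == 10 (shreds 0 through 9 on hand), three blocks are complete, and the block
ending at 11 is excluded because its last shred is still missing. The sketch below extracts
the same logic into a free function so the example can run standalone (the assert and the
Blocktree plumbing are dropped; the function name is illustrative):

    // Standalone copy of get_completed_data_ranges: each returned pair is an
    // inclusive [start, end] range of shred indexes holding one completed
    // data block.
    fn completed_data_ranges(
        mut start_index: u32,
        completed_data_end_indexes: &[u32],
        consumed: u32,
    ) -> Vec<(u32, u32)> {
        let mut ranges = vec![];
        // Skip blocks that end before the requested start index.
        let floor = completed_data_end_indexes
            .iter()
            .position(|i| *i >= start_index)
            .unwrap_or_else(|| completed_data_end_indexes.len());

        for i in &completed_data_end_indexes[floor..] {
            // Only blocks whose last shred index is below `consumed` (the
            // first missing shred) are fully present and safe to deshred.
            if *i < consumed {
                ranges.push((start_index, *i));
                start_index = *i + 1;
            }
        }
        ranges
    }

    fn main() {
        // Blocks end at shreds 2, 4, 9, 11; shreds 0..=9 have been received.
        assert_eq!(
            completed_data_ranges(0, &[2, 4, 9, 11], 10),
            vec![(0, 2), (3, 4), (5, 9)]
        );
        // A start index inside the slot skips the earlier blocks.
        assert_eq!(
            completed_data_ranges(3, &[2, 4, 9, 11], 10),
            vec![(3, 4), (5, 9)]
        );
    }
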
@@ -1198,20 +1249,21 @@ impl Blocktree {
 
 fn update_slot_meta(
     is_last_in_slot: bool,
+    is_last_in_data: bool,
     slot_meta: &mut SlotMeta,
-    index: u64,
+    index: u32,
     new_consumed: u64,
 ) {
     // Index is zero-indexed, while the "received" height starts from 1,
     // so received = index + 1 for the same shred.
-    slot_meta.received = cmp::max(index + 1, slot_meta.received);
+    slot_meta.received = cmp::max(u64::from(index) + 1, slot_meta.received);
     slot_meta.consumed = new_consumed;
     slot_meta.last_index = {
         // If the last index in the slot hasn't been set before, then
         // set it to this shred index
         if slot_meta.last_index == std::u64::MAX {
             if is_last_in_slot {
-                index
+                u64::from(index)
             } else {
                 std::u64::MAX
             }
@@ -1219,6 +1271,16 @@ fn update_slot_meta(
             slot_meta.last_index
         }
     };
+
+    if is_last_in_slot || is_last_in_data {
+        let position = slot_meta
+            .completed_data_indexes
+            .iter()
+            .position(|completed_data_index| *completed_data_index > index)
+            .unwrap_or_else(|| slot_meta.completed_data_indexes.len());
+
+        slot_meta.completed_data_indexes.insert(position, index);
+    }
 }
 
 fn get_index_meta_entry<'a>(
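
update_slot_meta keeps completed_data_indexes sorted by splicing each new end index in at
its ordered position; the single forward scan in get_completed_data_ranges depends on that
ordering. A standalone sketch of the ordered insert under out-of-order shred arrival (the
helper name is illustrative; a binary_search-based insert would also work, but the linear
scan mirrors the patch):

    // Ordered insert as in update_slot_meta: find the first stored index
    // greater than the new one and insert just before it.
    fn insert_completed_index(completed_data_indexes: &mut Vec<u32>, index: u32) {
        let position = completed_data_indexes
            .iter()
            .position(|completed_data_index| *completed_data_index > index)
            .unwrap_or_else(|| completed_data_indexes.len());
        completed_data_indexes.insert(position, index);
    }

    fn main() {
        let mut indexes = vec![];
        // Data-complete shreds can arrive in any order; the list stays sorted.
        for i in [9u32, 2, 11, 4] {
            insert_completed_index(&mut indexes, i);
        }
        assert_eq!(indexes, vec![2, 4, 9, 11]);
    }
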
@@ -2045,7 +2107,8 @@ pub mod tests {
 
     #[test]
     fn test_insert_data_shreds_reverse() {
-        let num_entries = 10;
+        let num_shreds = 10;
+        let num_entries = max_ticks_per_n_shreds(num_shreds);
         let (mut shreds, entries) = make_slot_entries(0, 0, num_entries);
         let num_shreds = shreds.len() as u64;
@@ -2938,7 +3001,7 @@ pub mod tests {
             shred
                 .iter_mut()
                 .enumerate()
-                .for_each(|(i, shred)| shred.set_index(slot as u32 + i as u32));
+                .for_each(|(_, shred)| shred.set_index(0));
             shreds.extend(shred);
             entries.extend(entry);
         }
@@ -2956,16 +3019,16 @@ pub mod tests {
 
         for i in 0..num_entries - 1 {
             assert_eq!(
-                blocktree.get_slot_entries(i, i, None).unwrap()[0],
+                blocktree.get_slot_entries(i, 0, None).unwrap()[0],
                 entries[i as usize]
             );
 
             let meta = blocktree.meta(i).unwrap().unwrap();
-            assert_eq!(meta.received, i + num_shreds_per_slot);
-            assert_eq!(meta.last_index, i + num_shreds_per_slot - 1);
+            assert_eq!(meta.received, 1);
+            assert_eq!(meta.last_index, 0);
             if i != 0 {
                 assert_eq!(meta.parent_slot, i - 1);
-                assert_eq!(meta.consumed, 0);
+                assert_eq!(meta.consumed, 1);
             } else {
                 assert_eq!(meta.parent_slot, 0);
                 assert_eq!(meta.consumed, num_shreds_per_slot);
@@ -3567,4 +3630,102 @@ pub mod tests {
         drop(blocktree);
         Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
     }
+
+    #[test]
+    fn test_get_completed_data_ranges() {
+        let completed_data_end_indexes = vec![2, 4, 9, 11];
+
+        // Consumed is 1, which means we're missing shred with index 1, should return empty
+        let start_index = 0;
+        let consumed = 1;
+        assert_eq!(
+            Blocktree::get_completed_data_ranges(
+                start_index,
+                &completed_data_end_indexes[..],
+                consumed
+            ),
+            vec![]
+        );
+
+        let start_index = 0;
+        let consumed = 3;
+        assert_eq!(
+            Blocktree::get_completed_data_ranges(
+                start_index,
+                &completed_data_end_indexes[..],
+                consumed
+            ),
+            vec![(0, 2)]
+        );
+
+        // Test all possible ranges:
+        //
+        // `consumed == completed_data_end_indexes[j] + 1` means we have all the shreds up to
+        // index `completed_data_end_indexes[j] + 1`. Thus the completed data blocks are
+        // everything in the range:
+        // [start_index, completed_data_end_indexes[j]] ==
+        // [completed_data_end_indexes[i], completed_data_end_indexes[j]]
+        for i in 0..completed_data_end_indexes.len() {
+            for j in i..completed_data_end_indexes.len() {
+                let start_index = completed_data_end_indexes[i];
+                let consumed = completed_data_end_indexes[j] + 1;
+                // When start_index == completed_data_end_indexes[i], then that means
+                // the shred with index == start_index is a single-shred data block,
+                // so the start index is the end index for that data block.
+                let mut expected = vec![(start_index, start_index)];
+                expected.extend(
+                    completed_data_end_indexes[i..=j]
+                        .windows(2)
+                        .map(|end_indexes| (end_indexes[0] + 1, end_indexes[1])),
+                );
+
+                assert_eq!(
+                    Blocktree::get_completed_data_ranges(
+                        start_index,
+                        &completed_data_end_indexes[..],
+                        consumed
+                    ),
+                    expected
+                );
+            }
+        }
+    }
+
+    #[test]
+    fn test_get_slot_entries_with_shred_count_corruption() {
+        let blocktree_path =
+            get_tmp_ledger_path("test_get_slot_entries_with_shred_count_corruption");
+        {
+            let blocktree = Blocktree::open(&blocktree_path).unwrap();
+            let num_ticks = 8;
+            let entries = create_ticks(num_ticks, Hash::default());
+            let slot = 1;
+            let shreds = entries_to_test_shreds(entries, slot, 0, false);
+            let next_shred_index = shreds.len();
+            blocktree
+                .insert_shreds(shreds, None)
+                .expect("Expected successful write of shreds");
+            assert_eq!(
+                blocktree.get_slot_entries(slot, 0, None).unwrap().len() as u64,
+                num_ticks
+            );
+
+            // Insert an empty shred that won't deshred into entries
+            let shreds = vec![Shred::new_from_data(
+                slot,
+                next_shred_index as u32,
+                1,
+                None,
+                true,
+                true,
+            )];
+
+            // With the corruption, nothing should be returned, even though an
+            // earlier data block was valid
+            blocktree
+                .insert_shreds(shreds, None)
+                .expect("Expected successful write of shreds");
+            assert!(blocktree.get_slot_entries(slot, 0, None).is_err());
+        }
+    }
 }
diff --git a/ledger/src/blocktree/meta.rs b/ledger/src/blocktree/meta.rs
index f65e71ab0..683eb0835 100644
--- a/ledger/src/blocktree/meta.rs
+++ b/ledger/src/blocktree/meta.rs
@@ -27,6 +27,8 @@ pub struct SlotMeta {
     // True if this slot is full (consumed == last_index + 1) and if every
     // slot that is a parent of this slot is also connected.
     pub is_connected: bool,
+    // List of end indexes for completed data blocks within the slot
+    pub completed_data_indexes: Vec<u32>,
 }
 
 #[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
@@ -227,6 +229,7 @@ impl SlotMeta {
             next_slots: vec![],
             is_connected: slot == 0,
             last_index: std::u64::MAX,
+            completed_data_indexes: vec![],
         }
     }
 }
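
The corruption test above leans on how get_slot_entries_with_shred_count collects its
parallel results: collecting an iterator of Result into Result<Vec<_>> short-circuits to
the first Err, so one data block that fails to deshred poisons the whole slot read even
when earlier blocks were valid. A minimal sketch of that behavior, assuming only rayon
(deshred_range and the String error type are stand-ins, not APIs from this patch):

    use rayon::iter::{IntoParallelRefIterator, ParallelIterator};

    // Stand-in for get_entries_in_data_block: "range" 3 is corrupt.
    fn deshred_range(range: u32) -> Result<Vec<u32>, String> {
        if range == 3 {
            Err("could not reconstruct data block".to_string())
        } else {
            Ok(vec![range * 10, range * 10 + 1])
        }
    }

    fn main() {
        // Every range deshreds cleanly: entries from all blocks get flattened.
        let ranges = vec![0u32, 1, 2];
        let all: Result<Vec<Vec<u32>>, String> =
            ranges.par_iter().map(|r| deshred_range(*r)).collect();
        assert_eq!(all.unwrap().into_iter().flatten().count(), 6);

        // One corrupt block anywhere fails the whole read, matching
        // test_get_slot_entries_with_shred_count_corruption.
        let ranges = vec![0u32, 3, 2];
        let all: Result<Vec<Vec<u32>>, String> =
            ranges.par_iter().map(|r| deshred_range(*r)).collect();
        assert!(all.is_err());
    }
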