From bc005e3408d0ca19f1db7f935d4e0087884d56e8 Mon Sep 17 00:00:00 2001
From: Tyera Eulberg
Date: Fri, 13 May 2022 01:34:02 -0600
Subject: [PATCH] Add configurable limit to number of blocks to check before
 Bigtable upload (#24716)

* Add ConfirmedBlockUploadConfig, no behavior changes

* Add comment

* A little DRY cleanup

* Add configurable limit to number of blocks to check in Blockstore and Bigtable before uploading

* Limit blockstore and bigtable look-ahead

* Exit iterator early when reach ending_slot

* Use rooted_slot_iterator instead of slot_meta_iterator

* Only check blocks in the ledger
---
 ledger-tool/src/bigtable.rs           | 15 +++++-
 ledger/src/bigtable_upload.rs         | 67 ++++++++++++++++++---------
 ledger/src/bigtable_upload_service.rs | 41 ++++++++++++++--
 rpc/src/rpc_service.rs                |  4 +-
 4 files changed, 96 insertions(+), 31 deletions(-)

diff --git a/ledger-tool/src/bigtable.rs b/ledger-tool/src/bigtable.rs
index 7796b2a55..da2726d8f 100644
--- a/ledger-tool/src/bigtable.rs
+++ b/ledger-tool/src/bigtable.rs
@@ -14,7 +14,10 @@ use {
         display::println_transaction, CliBlock, CliTransaction, CliTransactionConfirmation,
         OutputFormat,
     },
-    solana_ledger::{blockstore::Blockstore, blockstore_db::AccessType},
+    solana_ledger::{
+        bigtable_upload::ConfirmedBlockUploadConfig, blockstore::Blockstore,
+        blockstore_db::AccessType,
+    },
     solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature},
     solana_storage_bigtable::CredentialType,
     solana_transaction_status::{
@@ -41,15 +44,23 @@ async fn upload(
         .await
         .map_err(|err| format!("Failed to connect to storage: {:?}", err))?;
 
+    let config = ConfirmedBlockUploadConfig {
+        force_reupload,
+        ..ConfirmedBlockUploadConfig::default()
+    };
+
     solana_ledger::bigtable_upload::upload_confirmed_blocks(
         Arc::new(blockstore),
         bigtable,
         starting_slot,
         ending_slot,
-        force_reupload,
+        config,
         Arc::new(AtomicBool::new(false)),
     )
     .await
+    .map(|last_slot_uploaded| {
+        info!("last slot uploaded: {}", last_slot_uploaded);
+    })
 }
 
 async fn delete_slots(
diff --git a/ledger/src/bigtable_upload.rs b/ledger/src/bigtable_upload.rs
index 8e4f0350f..f2481351f 100644
--- a/ledger/src/bigtable_upload.rs
+++ b/ledger/src/bigtable_upload.rs
@@ -5,6 +5,7 @@ use {
     solana_measure::measure::Measure,
     solana_sdk::clock::Slot,
     std::{
+        cmp::min,
         collections::HashSet,
         result::Result,
         sync::{
@@ -15,32 +16,46 @@ use {
     },
 };
 
-// Attempt to upload this many blocks in parallel
-const NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL: usize = 32;
+#[derive(Clone)]
+pub struct ConfirmedBlockUploadConfig {
+    pub force_reupload: bool,
+    pub max_num_slots_to_check: usize,
+    pub num_blocks_to_upload_in_parallel: usize,
+    pub block_read_ahead_depth: usize, // should always be >= `num_blocks_to_upload_in_parallel`
+}
 
-// Read up to this many blocks from blockstore before blocking on the upload process
-const BLOCK_READ_AHEAD_DEPTH: usize = NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL * 2;
+impl Default for ConfirmedBlockUploadConfig {
+    fn default() -> Self {
+        const NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL: usize = 32;
+        ConfirmedBlockUploadConfig {
+            force_reupload: false,
+            max_num_slots_to_check: NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL * 4,
+            num_blocks_to_upload_in_parallel: NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL,
+            block_read_ahead_depth: NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL * 2,
+        }
+    }
+}
 
 pub async fn upload_confirmed_blocks(
     blockstore: Arc<Blockstore>,
     bigtable: solana_storage_bigtable::LedgerStorage,
     starting_slot: Slot,
     ending_slot: Option<Slot>,
-    force_reupload: bool,
+    config: ConfirmedBlockUploadConfig,
     exit: Arc<AtomicBool>,
-) -> Result<(), Box<dyn std::error::Error>> {
+) -> Result<Slot, Box<dyn std::error::Error>> {
     let mut measure = Measure::start("entire upload");
 
     info!("Loading ledger slots starting at {}...", starting_slot);
     let blockstore_slots: Vec<_> = blockstore
-        .slot_meta_iterator(starting_slot)
+        .rooted_slot_iterator(starting_slot)
         .map_err(|err| {
             format!(
                 "Failed to load entries starting from slot {}: {:?}",
                 starting_slot, err
             )
         })?
-        .filter_map(|(slot, _slot_meta)| {
+        .map_while(|slot| {
             if let Some(ending_slot) = &ending_slot {
                 if slot > *ending_slot {
                     return None;
@@ -58,27 +73,31 @@ pub async fn upload_confirmed_blocks(
         .into());
     }
 
+    let first_blockstore_slot = blockstore_slots.first().unwrap();
+    let last_blockstore_slot = blockstore_slots.last().unwrap();
     info!(
         "Found {} slots in the range ({}, {})",
         blockstore_slots.len(),
-        blockstore_slots.first().unwrap(),
-        blockstore_slots.last().unwrap()
+        first_blockstore_slot,
+        last_blockstore_slot,
     );
 
     // Gather the blocks that are already present in bigtable, by slot
-    let bigtable_slots = if !force_reupload {
+    let bigtable_slots = if !config.force_reupload {
         let mut bigtable_slots = vec![];
-        let first_blockstore_slot = *blockstore_slots.first().unwrap();
-        let last_blockstore_slot = *blockstore_slots.last().unwrap();
         info!(
             "Loading list of bigtable blocks between slots {} and {}...",
             first_blockstore_slot, last_blockstore_slot
         );
 
-        let mut start_slot = *blockstore_slots.first().unwrap();
-        while start_slot <= last_blockstore_slot {
+        let mut start_slot = *first_blockstore_slot;
+        while start_slot <= *last_blockstore_slot {
             let mut next_bigtable_slots = loop {
-                match bigtable.get_confirmed_blocks(start_slot, 1000).await {
+                let num_bigtable_blocks = min(1000, config.max_num_slots_to_check * 2);
+                match bigtable
+                    .get_confirmed_blocks(start_slot, num_bigtable_blocks)
+                    .await
+                {
                     Ok(slots) => break slots,
                     Err(err) => {
                         error!("get_confirmed_blocks for {} failed: {:?}", start_slot, err);
@@ -95,7 +114,7 @@
         }
         bigtable_slots
             .into_iter()
-            .filter(|slot| *slot <= last_blockstore_slot)
+            .filter(|slot| slot <= last_blockstore_slot)
             .collect::<Vec<_>>()
     } else {
         Vec::new()
@@ -112,25 +131,27 @@ pub async fn upload_confirmed_blocks(
             .cloned()
             .collect::<Vec<_>>();
         blocks_to_upload.sort_unstable();
+        blocks_to_upload.truncate(config.max_num_slots_to_check);
         blocks_to_upload
     };
 
     if blocks_to_upload.is_empty() {
         info!("No blocks need to be uploaded to bigtable");
-        return Ok(());
+        return Ok(*last_blockstore_slot);
     }
 
+    let last_slot = *blocks_to_upload.last().unwrap();
     info!(
         "{} blocks to be uploaded to the bucket in the range ({}, {})",
         blocks_to_upload.len(),
         blocks_to_upload.first().unwrap(),
-        blocks_to_upload.last().unwrap()
+        last_slot
     );
 
     // Load the blocks out of blockstore in a separate thread to allow for concurrent block uploading
     let (_loader_thread, receiver) = {
         let exit = exit.clone();
-        let (sender, receiver) = bounded(BLOCK_READ_AHEAD_DEPTH);
+        let (sender, receiver) = bounded(config.block_read_ahead_depth);
         (
             std::thread::spawn(move || {
                 let mut measure = Measure::start("block loader thread");
@@ -150,7 +171,7 @@ pub async fn upload_confirmed_blocks(
                         }
                     };
 
-                    if i > 0 && i % NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL == 0 {
+                    if i > 0 && i % config.num_blocks_to_upload_in_parallel == 0 {
                         info!(
                             "{}% of blocks processed ({}/{})",
                             i * 100 / blocks_to_upload.len(),
@@ -170,7 +191,7 @@ pub async fn upload_confirmed_blocks(
     use futures::stream::StreamExt;
 
     let mut stream =
-        tokio_stream::iter(receiver.into_iter()).chunks(NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL);
+        tokio_stream::iter(receiver.into_iter()).chunks(config.num_blocks_to_upload_in_parallel);
 
     while let Some(blocks) = stream.next().await {
         if exit.load(Ordering::Relaxed) {
@@ -205,6 +226,6 @@ pub async fn upload_confirmed_blocks(
     if failures > 0 {
         Err(format!("Incomplete upload, {} operations failed", failures).into())
     } else {
-        Ok(())
+        Ok(last_slot)
     }
 }
diff --git a/ledger/src/bigtable_upload_service.rs b/ledger/src/bigtable_upload_service.rs
index bf9eb6085..192783c9b 100644
--- a/ledger/src/bigtable_upload_service.rs
+++ b/ledger/src/bigtable_upload_service.rs
@@ -1,5 +1,8 @@
 use {
-    crate::{bigtable_upload, blockstore::Blockstore},
+    crate::{
+        bigtable_upload::{self, ConfirmedBlockUploadConfig},
+        blockstore::Blockstore,
+    },
     solana_runtime::commitment::BlockCommitmentCache,
     std::{
         cmp::min,
@@ -24,6 +27,26 @@ impl BigTableUploadService {
         block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
         max_complete_transaction_status_slot: Arc<AtomicU64>,
         exit: Arc<AtomicBool>,
+    ) -> Self {
+        Self::new_with_config(
+            runtime,
+            bigtable_ledger_storage,
+            blockstore,
+            block_commitment_cache,
+            max_complete_transaction_status_slot,
+            ConfirmedBlockUploadConfig::default(),
+            exit,
+        )
+    }
+
+    pub fn new_with_config(
+        runtime: Arc<Runtime>,
+        bigtable_ledger_storage: solana_storage_bigtable::LedgerStorage,
+        blockstore: Arc<Blockstore>,
+        block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
+        max_complete_transaction_status_slot: Arc<AtomicU64>,
+        config: ConfirmedBlockUploadConfig,
+        exit: Arc<AtomicBool>,
     ) -> Self {
         info!("Starting BigTable upload service");
         let thread = Builder::new()
@@ -35,6 +58,7 @@ impl BigTableUploadService {
                     blockstore,
                     block_commitment_cache,
                     max_complete_transaction_status_slot,
+                    config,
                     exit,
                 )
             })
@@ -49,18 +73,25 @@ impl BigTableUploadService {
         blockstore: Arc<Blockstore>,
         block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
         max_complete_transaction_status_slot: Arc<AtomicU64>,
+        config: ConfirmedBlockUploadConfig,
         exit: Arc<AtomicBool>,
     ) {
-        let mut start_slot = 0;
+        let mut start_slot = blockstore.get_first_available_block().unwrap_or_default();
         loop {
             if exit.load(Ordering::Relaxed) {
                 break;
             }
 
-            let end_slot = min(
+            // The highest slot eligible for upload is the highest root that has complete
+            // transaction-status metadata
+            let highest_complete_root = min(
                 max_complete_transaction_status_slot.load(Ordering::SeqCst),
                 block_commitment_cache.read().unwrap().root(),
            );
+            let end_slot = min(
+                highest_complete_root,
+                start_slot.saturating_add(config.max_num_slots_to_check as u64 * 2),
+            );
 
             if end_slot <= start_slot {
                 std::thread::sleep(std::time::Duration::from_secs(1));
@@ -72,12 +103,12 @@ impl BigTableUploadService {
                 bigtable_ledger_storage.clone(),
                 start_slot,
                 Some(end_slot),
-                false,
+                config.clone(),
                 exit.clone(),
             ));
 
             match result {
-                Ok(()) => start_slot = end_slot,
+                Ok(last_slot_uploaded) => start_slot = last_slot_uploaded,
                 Err(err) => {
                     warn!("bigtable: upload_confirmed_blocks: {}", err);
                     std::thread::sleep(std::time::Duration::from_secs(2));
diff --git a/rpc/src/rpc_service.rs b/rpc/src/rpc_service.rs
index 97eb71b95..67165253a 100644
--- a/rpc/src/rpc_service.rs
+++ b/rpc/src/rpc_service.rs
@@ -21,6 +21,7 @@ use {
     solana_client::rpc_cache::LargestAccountsCache,
     solana_gossip::cluster_info::ClusterInfo,
     solana_ledger::{
+        bigtable_upload::ConfirmedBlockUploadConfig,
         bigtable_upload_service::BigTableUploadService, blockstore::Blockstore,
         leader_schedule_cache::LeaderScheduleCache,
     },
@@ -410,12 +411,13 @@ impl JsonRpcService {
                     info!("BigTable ledger storage initialized");
 
                     let bigtable_ledger_upload_service = if enable_bigtable_ledger_upload {
-                        Some(Arc::new(BigTableUploadService::new(
+                        Some(Arc::new(BigTableUploadService::new_with_config(
                             runtime.clone(),
                             bigtable_ledger_storage.clone(),
                             blockstore.clone(),
                             block_commitment_cache.clone(),
                             current_transaction_status_slot.clone(),
+                            ConfirmedBlockUploadConfig::default(),
                             exit_bigtable_ledger_upload_service.clone(),
                         )))
                     } else {
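
Illustrative usage sketch (not part of the patch): how a caller outside the service might tune the new ConfirmedBlockUploadConfig when invoking upload_confirmed_blocks directly. The struct and function names come from the diff above; the helper name upload_recent_blocks and the numeric values (256 slots per pass, 64-way parallelism, 128-block read-ahead) are assumptions chosen for the example only.

use {
    solana_ledger::{
        bigtable_upload::{upload_confirmed_blocks, ConfirmedBlockUploadConfig},
        blockstore::Blockstore,
    },
    solana_sdk::clock::Slot,
    std::sync::{atomic::AtomicBool, Arc},
};

// Hypothetical helper: scan forward from `starting_slot` and upload one bounded batch.
async fn upload_recent_blocks(
    blockstore: Arc<Blockstore>,
    bigtable: solana_storage_bigtable::LedgerStorage,
    starting_slot: Slot,
) -> Result<(), Box<dyn std::error::Error>> {
    // Check at most 256 slots per pass and upload 64 blocks concurrently; keep the
    // read-ahead depth >= the parallelism, as the struct comment in the patch requires.
    let config = ConfirmedBlockUploadConfig {
        force_reupload: false,
        max_num_slots_to_check: 256,
        num_blocks_to_upload_in_parallel: 64,
        block_read_ahead_depth: 128,
    };
    let last_slot_uploaded = upload_confirmed_blocks(
        blockstore,
        bigtable,
        starting_slot,
        None, // no ending slot: scan forward from starting_slot
        config,
        Arc::new(AtomicBool::new(false)), // no external exit signal
    )
    .await?;
    println!("uploaded through slot {}", last_slot_uploaded);
    Ok(())
}

Because blocks_to_upload is truncated to max_num_slots_to_check, each call bounds its own work and reports the last slot it uploaded, which a caller can feed back in as the next starting_slot, the same pattern BigTableUploadService::run follows above.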