Add configurable limit to number of blocks to check before Bigtable upload (#24716)

* Add ConfirmedBlockUploadConfig, no behavior changes

* Add comment

* A little DRY cleanup

* Add configurable limit to number of blocks to check in Blockstore and Bigtable before uploading (see the call-site sketch after this list)

* Limit blockstore and bigtable look-ahead

* Exit iterator early when reaching ending_slot

* Use rooted_slot_iterator instead of slot_meta_iterator

* Only check blocks in the ledger
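
For reference, the new entry point takes a ConfirmedBlockUploadConfig and now returns the last slot uploaded rather than (). A minimal call-site sketch, mirroring the ledger-tool change in the diff below; the blockstore, bigtable connection, and slot bounds are assumed to be set up by the caller:

    // Build the new config; only force_reupload is overridden here, the limits
    // come from ConfirmedBlockUploadConfig::default().
    let config = ConfirmedBlockUploadConfig {
        force_reupload,
        ..ConfirmedBlockUploadConfig::default()
    };
    let last_slot_uploaded = solana_ledger::bigtable_upload::upload_confirmed_blocks(
        Arc::new(blockstore),
        bigtable,
        starting_slot,
        ending_slot, // Option<Slot>; iteration stops once this slot is passed
        config,
        Arc::new(AtomicBool::new(false)),
    )
    .await?;
    info!("last slot uploaded: {}", last_slot_uploaded);
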
Tyera Eulberg 2022-05-13 01:34:02 -06:00 committed by GitHub
parent 71dd95e842
commit bc005e3408
4 changed files with 96 additions and 31 deletions

View File

@@ -14,7 +14,10 @@ use {
         display::println_transaction, CliBlock, CliTransaction, CliTransactionConfirmation,
         OutputFormat,
     },
-    solana_ledger::{blockstore::Blockstore, blockstore_db::AccessType},
+    solana_ledger::{
+        bigtable_upload::ConfirmedBlockUploadConfig, blockstore::Blockstore,
+        blockstore_db::AccessType,
+    },
     solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature},
     solana_storage_bigtable::CredentialType,
     solana_transaction_status::{
@@ -41,15 +44,23 @@ async fn upload(
         .await
         .map_err(|err| format!("Failed to connect to storage: {:?}", err))?;
 
+    let config = ConfirmedBlockUploadConfig {
+        force_reupload,
+        ..ConfirmedBlockUploadConfig::default()
+    };
+
     solana_ledger::bigtable_upload::upload_confirmed_blocks(
         Arc::new(blockstore),
         bigtable,
         starting_slot,
         ending_slot,
-        force_reupload,
+        config,
         Arc::new(AtomicBool::new(false)),
     )
     .await
+    .map(|last_slot_uploaded| {
+        info!("last slot uploaded: {}", last_slot_uploaded);
+    })
 }
 
 async fn delete_slots(

View File

@@ -5,6 +5,7 @@ use {
     solana_measure::measure::Measure,
     solana_sdk::clock::Slot,
     std::{
+        cmp::min,
         collections::HashSet,
         result::Result,
         sync::{
@@ -15,32 +16,46 @@ use {
     },
 };
 
-// Attempt to upload this many blocks in parallel
-const NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL: usize = 32;
+#[derive(Clone)]
+pub struct ConfirmedBlockUploadConfig {
+    pub force_reupload: bool,
+    pub max_num_slots_to_check: usize,
+    pub num_blocks_to_upload_in_parallel: usize,
+    pub block_read_ahead_depth: usize, // should always be >= `num_blocks_to_upload_in_parallel`
+}
 
-// Read up to this many blocks from blockstore before blocking on the upload process
-const BLOCK_READ_AHEAD_DEPTH: usize = NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL * 2;
+impl Default for ConfirmedBlockUploadConfig {
+    fn default() -> Self {
+        const NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL: usize = 32;
+        ConfirmedBlockUploadConfig {
+            force_reupload: false,
+            max_num_slots_to_check: NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL * 4,
+            num_blocks_to_upload_in_parallel: NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL,
+            block_read_ahead_depth: NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL * 2,
+        }
+    }
+}
 
 pub async fn upload_confirmed_blocks(
     blockstore: Arc<Blockstore>,
     bigtable: solana_storage_bigtable::LedgerStorage,
     starting_slot: Slot,
     ending_slot: Option<Slot>,
-    force_reupload: bool,
+    config: ConfirmedBlockUploadConfig,
     exit: Arc<AtomicBool>,
-) -> Result<(), Box<dyn std::error::Error>> {
+) -> Result<Slot, Box<dyn std::error::Error>> {
     let mut measure = Measure::start("entire upload");
     info!("Loading ledger slots starting at {}...", starting_slot);
 
     let blockstore_slots: Vec<_> = blockstore
-        .slot_meta_iterator(starting_slot)
+        .rooted_slot_iterator(starting_slot)
         .map_err(|err| {
             format!(
                 "Failed to load entries starting from slot {}: {:?}",
                 starting_slot, err
             )
         })?
-        .filter_map(|(slot, _slot_meta)| {
+        .map_while(|slot| {
             if let Some(ending_slot) = &ending_slot {
                 if slot > *ending_slot {
                     return None;
@@ -58,27 +73,31 @@ pub async fn upload_confirmed_blocks(
         .into());
     }
 
+    let first_blockstore_slot = blockstore_slots.first().unwrap();
+    let last_blockstore_slot = blockstore_slots.last().unwrap();
     info!(
         "Found {} slots in the range ({}, {})",
         blockstore_slots.len(),
-        blockstore_slots.first().unwrap(),
-        blockstore_slots.last().unwrap()
+        first_blockstore_slot,
+        last_blockstore_slot,
     );
 
     // Gather the blocks that are already present in bigtable, by slot
-    let bigtable_slots = if !force_reupload {
+    let bigtable_slots = if !config.force_reupload {
         let mut bigtable_slots = vec![];
-        let first_blockstore_slot = *blockstore_slots.first().unwrap();
-        let last_blockstore_slot = *blockstore_slots.last().unwrap();
         info!(
             "Loading list of bigtable blocks between slots {} and {}...",
             first_blockstore_slot, last_blockstore_slot
         );
 
-        let mut start_slot = *blockstore_slots.first().unwrap();
-        while start_slot <= last_blockstore_slot {
+        let mut start_slot = *first_blockstore_slot;
+        while start_slot <= *last_blockstore_slot {
             let mut next_bigtable_slots = loop {
-                match bigtable.get_confirmed_blocks(start_slot, 1000).await {
+                let num_bigtable_blocks = min(1000, config.max_num_slots_to_check * 2);
+                match bigtable
+                    .get_confirmed_blocks(start_slot, num_bigtable_blocks)
+                    .await
+                {
                     Ok(slots) => break slots,
                     Err(err) => {
                         error!("get_confirmed_blocks for {} failed: {:?}", start_slot, err);
@@ -95,7 +114,7 @@ pub async fn upload_confirmed_blocks(
         }
         bigtable_slots
             .into_iter()
-            .filter(|slot| *slot <= last_blockstore_slot)
+            .filter(|slot| slot <= last_blockstore_slot)
             .collect::<Vec<_>>()
     } else {
         Vec::new()
@@ -112,25 +131,27 @@ pub async fn upload_confirmed_blocks(
             .cloned()
             .collect::<Vec<_>>();
         blocks_to_upload.sort_unstable();
+        blocks_to_upload.truncate(config.max_num_slots_to_check);
         blocks_to_upload
     };
 
     if blocks_to_upload.is_empty() {
         info!("No blocks need to be uploaded to bigtable");
-        return Ok(());
+        return Ok(*last_blockstore_slot);
     }
+    let last_slot = *blocks_to_upload.last().unwrap();
     info!(
         "{} blocks to be uploaded to the bucket in the range ({}, {})",
         blocks_to_upload.len(),
         blocks_to_upload.first().unwrap(),
-        blocks_to_upload.last().unwrap()
+        last_slot
     );
 
     // Load the blocks out of blockstore in a separate thread to allow for concurrent block uploading
     let (_loader_thread, receiver) = {
         let exit = exit.clone();
 
-        let (sender, receiver) = bounded(BLOCK_READ_AHEAD_DEPTH);
+        let (sender, receiver) = bounded(config.block_read_ahead_depth);
         (
             std::thread::spawn(move || {
                 let mut measure = Measure::start("block loader thread");
@@ -150,7 +171,7 @@ pub async fn upload_confirmed_blocks(
                         }
                     };
 
-                    if i > 0 && i % NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL == 0 {
+                    if i > 0 && i % config.num_blocks_to_upload_in_parallel == 0 {
                         info!(
                             "{}% of blocks processed ({}/{})",
                             i * 100 / blocks_to_upload.len(),
@@ -170,7 +191,7 @@ pub async fn upload_confirmed_blocks(
     use futures::stream::StreamExt;
     let mut stream =
-        tokio_stream::iter(receiver.into_iter()).chunks(NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL);
+        tokio_stream::iter(receiver.into_iter()).chunks(config.num_blocks_to_upload_in_parallel);
 
     while let Some(blocks) = stream.next().await {
         if exit.load(Ordering::Relaxed) {
@@ -205,6 +226,6 @@ pub async fn upload_confirmed_blocks(
     if failures > 0 {
         Err(format!("Incomplete upload, {} operations failed", failures).into())
     } else {
-        Ok(())
+        Ok(last_slot)
     }
 }

View File

@@ -1,5 +1,8 @@
 use {
-    crate::{bigtable_upload, blockstore::Blockstore},
+    crate::{
+        bigtable_upload::{self, ConfirmedBlockUploadConfig},
+        blockstore::Blockstore,
+    },
     solana_runtime::commitment::BlockCommitmentCache,
     std::{
         cmp::min,
@@ -24,6 +27,26 @@ impl BigTableUploadService {
         block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
         max_complete_transaction_status_slot: Arc<AtomicU64>,
         exit: Arc<AtomicBool>,
+    ) -> Self {
+        Self::new_with_config(
+            runtime,
+            bigtable_ledger_storage,
+            blockstore,
+            block_commitment_cache,
+            max_complete_transaction_status_slot,
+            ConfirmedBlockUploadConfig::default(),
+            exit,
+        )
+    }
+
+    pub fn new_with_config(
+        runtime: Arc<Runtime>,
+        bigtable_ledger_storage: solana_storage_bigtable::LedgerStorage,
+        blockstore: Arc<Blockstore>,
+        block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
+        max_complete_transaction_status_slot: Arc<AtomicU64>,
+        config: ConfirmedBlockUploadConfig,
+        exit: Arc<AtomicBool>,
     ) -> Self {
         info!("Starting BigTable upload service");
         let thread = Builder::new()
@@ -35,6 +58,7 @@ impl BigTableUploadService {
                     blockstore,
                     block_commitment_cache,
                     max_complete_transaction_status_slot,
+                    config,
                     exit,
                 )
             })
@@ -49,18 +73,25 @@ impl BigTableUploadService {
         blockstore: Arc<Blockstore>,
         block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
         max_complete_transaction_status_slot: Arc<AtomicU64>,
+        config: ConfirmedBlockUploadConfig,
         exit: Arc<AtomicBool>,
     ) {
-        let mut start_slot = 0;
+        let mut start_slot = blockstore.get_first_available_block().unwrap_or_default();
         loop {
             if exit.load(Ordering::Relaxed) {
                 break;
             }
 
-            let end_slot = min(
+            // The highest slot eligible for upload is the highest root that has complete
+            // transaction-status metadata
+            let highest_complete_root = min(
                 max_complete_transaction_status_slot.load(Ordering::SeqCst),
                 block_commitment_cache.read().unwrap().root(),
             );
+            let end_slot = min(
+                highest_complete_root,
+                start_slot.saturating_add(config.max_num_slots_to_check as u64 * 2),
+            );
 
             if end_slot <= start_slot {
                 std::thread::sleep(std::time::Duration::from_secs(1));
@@ -72,12 +103,12 @@ impl BigTableUploadService {
                 bigtable_ledger_storage.clone(),
                 start_slot,
                 Some(end_slot),
-                false,
+                config.clone(),
                 exit.clone(),
             ));
 
             match result {
-                Ok(()) => start_slot = end_slot,
+                Ok(last_slot_uploaded) => start_slot = last_slot_uploaded,
                 Err(err) => {
                     warn!("bigtable: upload_confirmed_blocks: {}", err);
                     std::thread::sleep(std::time::Duration::from_secs(2));

View File

@@ -21,6 +21,7 @@ use {
     solana_client::rpc_cache::LargestAccountsCache,
     solana_gossip::cluster_info::ClusterInfo,
     solana_ledger::{
+        bigtable_upload::ConfirmedBlockUploadConfig,
         bigtable_upload_service::BigTableUploadService, blockstore::Blockstore,
         leader_schedule_cache::LeaderScheduleCache,
     },
@@ -410,12 +411,13 @@ impl JsonRpcService {
                     info!("BigTable ledger storage initialized");
 
                     let bigtable_ledger_upload_service = if enable_bigtable_ledger_upload {
-                        Some(Arc::new(BigTableUploadService::new(
+                        Some(Arc::new(BigTableUploadService::new_with_config(
                             runtime.clone(),
                             bigtable_ledger_storage.clone(),
                             blockstore.clone(),
                             block_commitment_cache.clone(),
                             current_transaction_status_slot.clone(),
+                            ConfirmedBlockUploadConfig::default(),
                             exit_bigtable_ledger_upload_service.clone(),
                         )))
                     } else {
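
For callers that want tighter limits than the defaults, the new new_with_config constructor accepts a caller-supplied config. A sketch with hypothetical limit values (the argument list mirrors the rpc_service.rs call above; 64/16/32 are illustrative choices, not the defaults):

    // Hypothetical limits: bound the per-pass look-ahead more aggressively than
    // ConfirmedBlockUploadConfig::default(); block_read_ahead_depth should stay
    // >= num_blocks_to_upload_in_parallel.
    let config = ConfirmedBlockUploadConfig {
        max_num_slots_to_check: 64,
        num_blocks_to_upload_in_parallel: 16,
        block_read_ahead_depth: 32,
        ..ConfirmedBlockUploadConfig::default()
    };
    let bigtable_ledger_upload_service = Arc::new(BigTableUploadService::new_with_config(
        runtime.clone(),
        bigtable_ledger_storage.clone(),
        blockstore.clone(),
        block_commitment_cache.clone(),
        current_transaction_status_slot.clone(),
        config,
        exit_bigtable_ledger_upload_service.clone(),
    ));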