Add ledger-tool bigtable upload loop (#26030)
* Add ledger-tool bigtable upload loop * Limit range on caller side, switch to while loop, and remove now-obsolete option
This commit is contained in:
parent
62ff54d04e
commit
2866ca4b1c
|
@ -25,6 +25,7 @@ use {
|
||||||
UiTransactionEncoding,
|
UiTransactionEncoding,
|
||||||
},
|
},
|
||||||
std::{
|
std::{
|
||||||
|
cmp::min,
|
||||||
collections::HashSet,
|
collections::HashSet,
|
||||||
path::Path,
|
path::Path,
|
||||||
process::exit,
|
process::exit,
|
||||||
|
@ -36,7 +37,7 @@ use {
|
||||||
|
|
||||||
async fn upload(
|
async fn upload(
|
||||||
blockstore: Blockstore,
|
blockstore: Blockstore,
|
||||||
starting_slot: Slot,
|
mut starting_slot: Slot,
|
||||||
ending_slot: Option<Slot>,
|
ending_slot: Option<Slot>,
|
||||||
force_reupload: bool,
|
force_reupload: bool,
|
||||||
config: solana_storage_bigtable::LedgerStorageConfig,
|
config: solana_storage_bigtable::LedgerStorageConfig,
|
||||||
|
@ -49,19 +50,29 @@ async fn upload(
|
||||||
force_reupload,
|
force_reupload,
|
||||||
..ConfirmedBlockUploadConfig::default()
|
..ConfirmedBlockUploadConfig::default()
|
||||||
};
|
};
|
||||||
|
let blockstore = Arc::new(blockstore);
|
||||||
|
|
||||||
solana_ledger::bigtable_upload::upload_confirmed_blocks(
|
let ending_slot = ending_slot.unwrap_or_else(|| blockstore.last_root());
|
||||||
Arc::new(blockstore),
|
|
||||||
bigtable,
|
while starting_slot <= ending_slot {
|
||||||
starting_slot,
|
let current_ending_slot = min(
|
||||||
ending_slot,
|
ending_slot,
|
||||||
config,
|
starting_slot.saturating_add(config.max_num_slots_to_check as u64 * 2),
|
||||||
Arc::new(AtomicBool::new(false)),
|
);
|
||||||
)
|
let last_slot_uploaded = solana_ledger::bigtable_upload::upload_confirmed_blocks(
|
||||||
.await
|
blockstore.clone(),
|
||||||
.map(|last_slot_uploaded| {
|
bigtable.clone(),
|
||||||
|
starting_slot,
|
||||||
|
current_ending_slot,
|
||||||
|
config.clone(),
|
||||||
|
Arc::new(AtomicBool::new(false)),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
info!("last slot uploaded: {}", last_slot_uploaded);
|
info!("last slot uploaded: {}", last_slot_uploaded);
|
||||||
})
|
starting_slot = last_slot_uploaded.saturating_add(1);
|
||||||
|
}
|
||||||
|
info!("No more blocks to upload.");
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn delete_slots(
|
async fn delete_slots(
|
||||||
|
|
|
@ -45,7 +45,7 @@ pub async fn upload_confirmed_blocks(
|
||||||
blockstore: Arc<Blockstore>,
|
blockstore: Arc<Blockstore>,
|
||||||
bigtable: solana_storage_bigtable::LedgerStorage,
|
bigtable: solana_storage_bigtable::LedgerStorage,
|
||||||
starting_slot: Slot,
|
starting_slot: Slot,
|
||||||
ending_slot: Option<Slot>,
|
ending_slot: Slot,
|
||||||
config: ConfirmedBlockUploadConfig,
|
config: ConfirmedBlockUploadConfig,
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
) -> Result<Slot, Box<dyn std::error::Error>> {
|
) -> Result<Slot, Box<dyn std::error::Error>> {
|
||||||
|
@ -60,14 +60,7 @@ pub async fn upload_confirmed_blocks(
|
||||||
starting_slot, err
|
starting_slot, err
|
||||||
)
|
)
|
||||||
})?
|
})?
|
||||||
.map_while(|slot| {
|
.map_while(|slot| (slot <= ending_slot).then(|| slot))
|
||||||
if let Some(ending_slot) = &ending_slot {
|
|
||||||
if slot > *ending_slot {
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Some(slot)
|
|
||||||
})
|
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
if blockstore_slots.is_empty() {
|
if blockstore_slots.is_empty() {
|
||||||
|
|
|
@ -102,7 +102,7 @@ impl BigTableUploadService {
|
||||||
blockstore.clone(),
|
blockstore.clone(),
|
||||||
bigtable_ledger_storage.clone(),
|
bigtable_ledger_storage.clone(),
|
||||||
start_slot,
|
start_slot,
|
||||||
Some(end_slot),
|
end_slot,
|
||||||
config.clone(),
|
config.clone(),
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
));
|
));
|
||||||
|
|
Loading…
Reference in New Issue