use { crate::{ bigtable_upload::{self, ConfirmedBlockUploadConfig}, blockstore::Blockstore, }, solana_runtime::commitment::BlockCommitmentCache, std::{ cmp::min, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, Arc, RwLock, }, thread::{self, Builder, JoinHandle}, }, tokio::runtime::Runtime, }; pub struct BigTableUploadService { thread: JoinHandle<()>, } impl BigTableUploadService { pub fn new( runtime: Arc, bigtable_ledger_storage: solana_storage_bigtable::LedgerStorage, blockstore: Arc, block_commitment_cache: Arc>, max_complete_transaction_status_slot: Arc, exit: Arc, ) -> Self { Self::new_with_config( runtime, bigtable_ledger_storage, blockstore, block_commitment_cache, max_complete_transaction_status_slot, ConfirmedBlockUploadConfig::default(), exit, ) } pub fn new_with_config( runtime: Arc, bigtable_ledger_storage: solana_storage_bigtable::LedgerStorage, blockstore: Arc, block_commitment_cache: Arc>, max_complete_transaction_status_slot: Arc, config: ConfirmedBlockUploadConfig, exit: Arc, ) -> Self { info!("Starting BigTable upload service"); let thread = Builder::new() .name("solBigTUpload".to_string()) .spawn(move || { Self::run( runtime, bigtable_ledger_storage, blockstore, block_commitment_cache, max_complete_transaction_status_slot, config, exit, ) }) .unwrap(); Self { thread } } fn run( runtime: Arc, bigtable_ledger_storage: solana_storage_bigtable::LedgerStorage, blockstore: Arc, block_commitment_cache: Arc>, max_complete_transaction_status_slot: Arc, config: ConfirmedBlockUploadConfig, exit: Arc, ) { let mut start_slot = blockstore.get_first_available_block().unwrap_or_default(); loop { if exit.load(Ordering::Relaxed) { break; } // The highest slot eligible for upload is the highest root that has complete // transaction-status metadata let highest_complete_root = min( max_complete_transaction_status_slot.load(Ordering::SeqCst), block_commitment_cache.read().unwrap().root(), ); let end_slot = min( highest_complete_root, start_slot.saturating_add(config.max_num_slots_to_check as u64 * 2), ); if end_slot <= start_slot { std::thread::sleep(std::time::Duration::from_secs(1)); continue; } let result = runtime.block_on(bigtable_upload::upload_confirmed_blocks( blockstore.clone(), bigtable_ledger_storage.clone(), start_slot, end_slot, config.clone(), exit.clone(), )); match result { Ok(last_slot_uploaded) => start_slot = last_slot_uploaded, Err(err) => { warn!("bigtable: upload_confirmed_blocks: {}", err); std::thread::sleep(std::time::Duration::from_secs(2)); if start_slot == 0 { start_slot = blockstore.get_first_available_block().unwrap_or_default(); } } } } } pub fn join(self) -> thread::Result<()> { self.thread.join() } }