// Listing metadata: 93 lines, 2.7 KiB, Rust
use {
|
|
crate::{bigtable_upload, blockstore::Blockstore},
|
|
solana_runtime::commitment::BlockCommitmentCache,
|
|
std::{
|
|
cmp::min,
|
|
sync::{
|
|
atomic::{AtomicBool, AtomicU64, Ordering},
|
|
Arc, RwLock,
|
|
},
|
|
thread::{self, Builder, JoinHandle},
|
|
},
|
|
tokio::runtime::Runtime,
|
|
};
|
|
|
|
/// Handle to the background thread that uploads confirmed blocks to BigTable.
///
/// The value owns the thread's join handle; consume it with `join` to wait for
/// the thread to finish.
pub struct BigTableUploadService {
    /// Join handle of the spawned upload thread.
    thread: JoinHandle<()>,
}
|
|
|
|
impl BigTableUploadService {
|
|
pub fn new(
|
|
runtime: Arc<Runtime>,
|
|
bigtable_ledger_storage: solana_storage_bigtable::LedgerStorage,
|
|
blockstore: Arc<Blockstore>,
|
|
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
|
|
max_complete_transaction_status_slot: Arc<AtomicU64>,
|
|
exit: Arc<AtomicBool>,
|
|
) -> Self {
|
|
info!("Starting BigTable upload service");
|
|
let thread = Builder::new()
|
|
.name("bigtable-upload".to_string())
|
|
.spawn(move || {
|
|
Self::run(
|
|
runtime,
|
|
bigtable_ledger_storage,
|
|
blockstore,
|
|
block_commitment_cache,
|
|
max_complete_transaction_status_slot,
|
|
exit,
|
|
)
|
|
})
|
|
.unwrap();
|
|
|
|
Self { thread }
|
|
}
|
|
|
|
fn run(
|
|
runtime: Arc<Runtime>,
|
|
bigtable_ledger_storage: solana_storage_bigtable::LedgerStorage,
|
|
blockstore: Arc<Blockstore>,
|
|
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
|
|
max_complete_transaction_status_slot: Arc<AtomicU64>,
|
|
exit: Arc<AtomicBool>,
|
|
) {
|
|
let mut start_slot = 0;
|
|
loop {
|
|
if exit.load(Ordering::Relaxed) {
|
|
break;
|
|
}
|
|
|
|
let end_slot = min(
|
|
max_complete_transaction_status_slot.load(Ordering::SeqCst),
|
|
block_commitment_cache.read().unwrap().root(),
|
|
);
|
|
|
|
if end_slot <= start_slot {
|
|
std::thread::sleep(std::time::Duration::from_secs(1));
|
|
continue;
|
|
}
|
|
|
|
let result = runtime.block_on(bigtable_upload::upload_confirmed_blocks(
|
|
blockstore.clone(),
|
|
bigtable_ledger_storage.clone(),
|
|
start_slot,
|
|
Some(end_slot),
|
|
false,
|
|
exit.clone(),
|
|
));
|
|
|
|
match result {
|
|
Ok(()) => start_slot = end_slot,
|
|
Err(err) => {
|
|
warn!("bigtable: upload_confirmed_blocks: {}", err);
|
|
std::thread::sleep(std::time::Duration::from_secs(2));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
pub fn join(self) -> thread::Result<()> {
|
|
self.thread.join()
|
|
}
|
|
}
|