Add BigTableUploadService

This commit is contained in:
Michael Vines 2020-09-03 19:39:05 -07:00
parent 2b8a521562
commit bc7731b969
4 changed files with 109 additions and 9 deletions

View File

@ -0,0 +1,78 @@
use solana_ledger::blockstore::Blockstore;
use solana_runtime::commitment::BlockCommitmentCache;
use std::{
sync::atomic::{AtomicBool, Ordering},
sync::{Arc, RwLock},
thread::{self, Builder, JoinHandle},
};
use tokio::runtime;
pub struct BigTableUploadService {
thread: JoinHandle<()>,
}
impl BigTableUploadService {
pub fn new(
runtime_handle: runtime::Handle,
bigtable_ledger_storage: solana_storage_bigtable::LedgerStorage,
blockstore: Arc<Blockstore>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
exit: Arc<AtomicBool>,
) -> Self {
info!("Starting BigTable upload service");
let thread = Builder::new()
.name("bigtable-upload".to_string())
.spawn(move || {
Self::run(
runtime_handle,
bigtable_ledger_storage,
blockstore,
block_commitment_cache,
exit,
)
})
.unwrap();
Self { thread }
}
fn run(
runtime: runtime::Handle,
bigtable_ledger_storage: solana_storage_bigtable::LedgerStorage,
blockstore: Arc<Blockstore>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
exit: Arc<AtomicBool>,
) {
let mut starting_slot = 0;
loop {
if exit.load(Ordering::Relaxed) {
break;
}
let max_confirmed_root = block_commitment_cache
.read()
.unwrap()
.highest_confirmed_root();
let result = runtime.block_on(solana_ledger::bigtable_upload::upload_confirmed_blocks(
blockstore.clone(),
bigtable_ledger_storage.clone(),
starting_slot,
None,
true,
));
match result {
Ok(()) => starting_slot = max_confirmed_root,
Err(err) => {
warn!("bigtable: upload_confirmed_blocks: {}", err);
std::thread::sleep(std::time::Duration::from_secs(2));
}
}
}
}
pub fn join(self) -> thread::Result<()> {
self.thread.join()
}
}

View File

@ -9,6 +9,7 @@
pub mod accounts_background_service;
pub mod accounts_hash_verifier;
pub mod banking_stage;
pub mod bigtable_upload_service;
pub mod broadcast_stage;
pub mod cluster_info_vote_listener;
pub mod commitment_service;

View File

@ -1,6 +1,9 @@
//! The `rpc_service` module implements the Solana JSON RPC service.
use crate::{cluster_info::ClusterInfo, rpc::*, rpc_health::*, validator::ValidatorExit};
use crate::{
bigtable_upload_service::BigTableUploadService, cluster_info::ClusterInfo, rpc::*,
rpc_health::*, validator::ValidatorExit,
};
use jsonrpc_core::MetaIoHandler;
use jsonrpc_http_server::{
hyper, AccessControlAllowOrigin, CloseHandle, DomainsValidation, RequestMiddleware,
@ -260,22 +263,36 @@ impl JsonRpcService {
.build()
.expect("Runtime");
let bigtable_ledger_storage =
let exit_bigtable_ledger_upload_service = Arc::new(AtomicBool::new(false));
let (bigtable_ledger_storage, _bigtable_ledger_upload_service) =
if config.enable_bigtable_ledger_storage || config.enable_bigtable_ledger_upload {
runtime
.block_on(solana_storage_bigtable::LedgerStorage::new(
config.enable_bigtable_ledger_upload,
))
.map(|x| {
.map(|bigtable_ledger_storage| {
info!("BigTable ledger storage initialized");
Some(x)
let bigtable_ledger_upload_service = Arc::new(BigTableUploadService::new(
runtime.handle().clone(),
bigtable_ledger_storage.clone(),
blockstore.clone(),
block_commitment_cache.clone(),
exit_bigtable_ledger_upload_service.clone(),
));
(
Some(bigtable_ledger_storage),
Some(bigtable_ledger_upload_service),
)
})
.unwrap_or_else(|err| {
error!("Failed to initialize BigTable ledger storage: {:?}", err);
None
(None, None)
})
} else {
None
(None, None)
};
let (request_processor, receiver) = JsonRpcRequestProcessor::new(
@ -344,6 +361,7 @@ impl JsonRpcService {
close_handle_sender.send(server.close_handle()).unwrap();
server.wait();
exit_send_transaction_service.store(true, Ordering::Relaxed);
exit_bigtable_ledger_upload_service.store(true, Ordering::Relaxed);
})
.unwrap();

View File

@ -20,7 +20,7 @@ pub async fn upload_confirmed_blocks(
) -> Result<(), Box<dyn std::error::Error>> {
let mut measure = Measure::start("entire upload");
info!("Loading ledger slots...");
info!("Loading ledger slots starting at {}...", starting_slot);
let blockstore_slots: Vec<_> = blockstore
.slot_meta_iterator(starting_slot)
.map_err(|err| {
@ -40,8 +40,11 @@ pub async fn upload_confirmed_blocks(
.collect();
if blockstore_slots.is_empty() {
info!("Ledger has no slots in the specified range");
return Ok(());
return Err(format!(
"Ledger has no slots from {} to {:?}",
starting_slot, ending_slot
)
.into());
}
info!(