From bc7731b96985169b10076e85bb431bb3844e2209 Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Thu, 3 Sep 2020 19:39:05 -0700 Subject: [PATCH] Add BigTableUploadService --- core/src/bigtable_upload_service.rs | 78 +++++++++++++++++++++++++++++ core/src/lib.rs | 1 + core/src/rpc_service.rs | 30 ++++++++--- ledger/src/bigtable_upload.rs | 9 ++-- 4 files changed, 109 insertions(+), 9 deletions(-) create mode 100644 core/src/bigtable_upload_service.rs diff --git a/core/src/bigtable_upload_service.rs b/core/src/bigtable_upload_service.rs new file mode 100644 index 0000000000..693e8c23e7 --- /dev/null +++ b/core/src/bigtable_upload_service.rs @@ -0,0 +1,78 @@ +use solana_ledger::blockstore::Blockstore; +use solana_runtime::commitment::BlockCommitmentCache; +use std::{ + sync::atomic::{AtomicBool, Ordering}, + sync::{Arc, RwLock}, + thread::{self, Builder, JoinHandle}, +}; +use tokio::runtime; + +pub struct BigTableUploadService { + thread: JoinHandle<()>, +} + +impl BigTableUploadService { + pub fn new( + runtime_handle: runtime::Handle, + bigtable_ledger_storage: solana_storage_bigtable::LedgerStorage, + blockstore: Arc, + block_commitment_cache: Arc>, + exit: Arc, + ) -> Self { + info!("Starting BigTable upload service"); + let thread = Builder::new() + .name("bigtable-upload".to_string()) + .spawn(move || { + Self::run( + runtime_handle, + bigtable_ledger_storage, + blockstore, + block_commitment_cache, + exit, + ) + }) + .unwrap(); + + Self { thread } + } + + fn run( + runtime: runtime::Handle, + bigtable_ledger_storage: solana_storage_bigtable::LedgerStorage, + blockstore: Arc, + block_commitment_cache: Arc>, + exit: Arc, + ) { + let mut starting_slot = 0; + loop { + if exit.load(Ordering::Relaxed) { + break; + } + + let max_confirmed_root = block_commitment_cache + .read() + .unwrap() + .highest_confirmed_root(); + + let result = runtime.block_on(solana_ledger::bigtable_upload::upload_confirmed_blocks( + blockstore.clone(), + bigtable_ledger_storage.clone(), + starting_slot, + None, + true, + )); + + match result { + Ok(()) => starting_slot = max_confirmed_root, + Err(err) => { + warn!("bigtable: upload_confirmed_blocks: {}", err); + std::thread::sleep(std::time::Duration::from_secs(2)); + } + } + } + } + + pub fn join(self) -> thread::Result<()> { + self.thread.join() + } +} diff --git a/core/src/lib.rs b/core/src/lib.rs index 268db59c4c..5323da0dcd 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -9,6 +9,7 @@ pub mod accounts_background_service; pub mod accounts_hash_verifier; pub mod banking_stage; +pub mod bigtable_upload_service; pub mod broadcast_stage; pub mod cluster_info_vote_listener; pub mod commitment_service; diff --git a/core/src/rpc_service.rs b/core/src/rpc_service.rs index 178228af18..48a0d857ab 100644 --- a/core/src/rpc_service.rs +++ b/core/src/rpc_service.rs @@ -1,6 +1,9 @@ //! The `rpc_service` module implements the Solana JSON RPC service. -use crate::{cluster_info::ClusterInfo, rpc::*, rpc_health::*, validator::ValidatorExit}; +use crate::{ + bigtable_upload_service::BigTableUploadService, cluster_info::ClusterInfo, rpc::*, + rpc_health::*, validator::ValidatorExit, +}; use jsonrpc_core::MetaIoHandler; use jsonrpc_http_server::{ hyper, AccessControlAllowOrigin, CloseHandle, DomainsValidation, RequestMiddleware, @@ -260,22 +263,36 @@ impl JsonRpcService { .build() .expect("Runtime"); - let bigtable_ledger_storage = + let exit_bigtable_ledger_upload_service = Arc::new(AtomicBool::new(false)); + + let (bigtable_ledger_storage, _bigtable_ledger_upload_service) = if config.enable_bigtable_ledger_storage || config.enable_bigtable_ledger_upload { runtime .block_on(solana_storage_bigtable::LedgerStorage::new( config.enable_bigtable_ledger_upload, )) - .map(|x| { + .map(|bigtable_ledger_storage| { info!("BigTable ledger storage initialized"); - Some(x) + + let bigtable_ledger_upload_service = Arc::new(BigTableUploadService::new( + runtime.handle().clone(), + bigtable_ledger_storage.clone(), + blockstore.clone(), + block_commitment_cache.clone(), + exit_bigtable_ledger_upload_service.clone(), + )); + + ( + Some(bigtable_ledger_storage), + Some(bigtable_ledger_upload_service), + ) }) .unwrap_or_else(|err| { error!("Failed to initialize BigTable ledger storage: {:?}", err); - None + (None, None) }) } else { - None + (None, None) }; let (request_processor, receiver) = JsonRpcRequestProcessor::new( @@ -344,6 +361,7 @@ impl JsonRpcService { close_handle_sender.send(server.close_handle()).unwrap(); server.wait(); exit_send_transaction_service.store(true, Ordering::Relaxed); + exit_bigtable_ledger_upload_service.store(true, Ordering::Relaxed); }) .unwrap(); diff --git a/ledger/src/bigtable_upload.rs b/ledger/src/bigtable_upload.rs index fe2356ca49..a580a07976 100644 --- a/ledger/src/bigtable_upload.rs +++ b/ledger/src/bigtable_upload.rs @@ -20,7 +20,7 @@ pub async fn upload_confirmed_blocks( ) -> Result<(), Box> { let mut measure = Measure::start("entire upload"); - info!("Loading ledger slots..."); + info!("Loading ledger slots starting at {}...", starting_slot); let blockstore_slots: Vec<_> = blockstore .slot_meta_iterator(starting_slot) .map_err(|err| { @@ -40,8 +40,11 @@ pub async fn upload_confirmed_blocks( .collect(); if blockstore_slots.is_empty() { - info!("Ledger has no slots in the specified range"); - return Ok(()); + return Err(format!( + "Ledger has no slots from {} to {:?}", + starting_slot, ending_slot + ) + .into()); } info!(