diff --git a/core/Cargo.toml b/core/Cargo.toml index 24f73dd016..de39b6376e 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -61,6 +61,7 @@ solana-runtime = { path = "../runtime", version = "1.4.0" } solana-sdk = { path = "../sdk", version = "1.4.0" } solana-sdk-macro-frozen-abi = { path = "../sdk/macro-frozen-abi", version = "1.4.0" } solana-stake-program = { path = "../programs/stake", version = "1.4.0" } +solana-storage-bigtable = { path = "../storage-bigtable", version = "1.4.0" } solana-streamer = { path = "../streamer", version = "1.4.0" } solana-sys-tuner = { path = "../sys-tuner", version = "1.4.0" } solana-transaction-status = { path = "../transaction-status", version = "1.4.0" } diff --git a/core/src/rpc.rs b/core/src/rpc.rs index ebe40538a4..e29c5ed221 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -91,6 +91,7 @@ pub struct JsonRpcConfig { pub identity_pubkey: Pubkey, pub faucet_addr: Option, pub health_check_slot_distance: u64, + pub enable_bigtable_ledger_storage: bool, } #[derive(Clone)] @@ -105,6 +106,7 @@ pub struct JsonRpcRequestProcessor { genesis_hash: Hash, transaction_sender: Arc>>, runtime_handle: runtime::Handle, + bigtable_ledger_storage: Option, } impl Metadata for JsonRpcRequestProcessor {} @@ -159,6 +161,7 @@ impl JsonRpcRequestProcessor { }) } + #[allow(clippy::too_many_arguments)] pub fn new( config: JsonRpcConfig, bank_forks: Arc>, @@ -169,6 +172,7 @@ impl JsonRpcRequestProcessor { cluster_info: Arc, genesis_hash: Hash, runtime: &runtime::Runtime, + bigtable_ledger_storage: Option, ) -> (Self, Receiver) { let (sender, receiver) = channel(); ( @@ -182,6 +186,8 @@ impl JsonRpcRequestProcessor { cluster_info, genesis_hash, transaction_sender: Arc::new(Mutex::new(sender)), + runtime_handle: runtime.handle().clone(), + bigtable_ledger_storage, }, receiver, ) @@ -220,7 +226,8 @@ impl JsonRpcRequestProcessor { cluster_info, genesis_hash, transaction_sender: Arc::new(Mutex::new(sender)), - runtime_handle: runtime.handle().clone(), + runtime_handle: runtime::Runtime::new().unwrap().handle().clone(), + bigtable_ledger_storage: None, } } @@ -560,6 +567,7 @@ impl JsonRpcRequestProcessor { slot: Slot, encoding: Option, ) -> Result> { + let encoding = encoding.unwrap_or(UiTransactionEncoding::Json); if self.config.enable_rpc_transaction_history && slot <= self @@ -568,7 +576,15 @@ impl JsonRpcRequestProcessor { .unwrap() .highest_confirmed_root() { - let result = self.blockstore.get_confirmed_block(slot, encoding); + let result = self.blockstore.get_confirmed_block(slot, Some(encoding)); + if result.is_err() { + if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage { + return Ok(self + .runtime_handle + .block_on(bigtable_ledger_storage.get_confirmed_block(slot, encoding)) + .ok()); + } + } self.check_slot_cleaned_up(&result, slot)?; Ok(result.ok()) } else { @@ -597,9 +613,25 @@ impl JsonRpcRequestProcessor { MAX_GET_CONFIRMED_BLOCKS_RANGE ))); } + + let lowest_slot = self.blockstore.lowest_slot(); + if start_slot < lowest_slot { + // If the starting slot is lower than what's available in blockstore assume the entire + // [start_slot..end_slot] can be fetched from BigTable. + if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage { + return Ok(self + .runtime_handle + .block_on( + bigtable_ledger_storage + .get_confirmed_blocks(start_slot, (end_slot - start_slot) as usize), + ) + .unwrap_or_else(|_| vec![])); + } + } + Ok(self .blockstore - .rooted_slot_iterator(max(start_slot, self.blockstore.lowest_slot())) + .rooted_slot_iterator(max(start_slot, lowest_slot)) .map_err(|_| Error::internal_error())? .filter(|&slot| slot <= end_slot) .collect()) @@ -693,6 +725,16 @@ impl JsonRpcRequestProcessor { err, } }) + .or_else(|| { + if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage { + self.runtime_handle + .block_on(bigtable_ledger_storage.get_signature_status(&signature)) + .map(Some) + .unwrap_or(None) + } else { + None + } + }) } else { None }; @@ -735,21 +777,38 @@ impl JsonRpcRequestProcessor { signature: Signature, encoding: Option, ) -> Option { + let encoding = encoding.unwrap_or(UiTransactionEncoding::Json); if self.config.enable_rpc_transaction_history { - self.blockstore - .get_confirmed_transaction(signature, encoding) + match self + .blockstore + .get_confirmed_transaction(signature, Some(encoding)) .unwrap_or(None) - .filter(|confirmed_transaction| { - confirmed_transaction.slot + { + Some(confirmed_transaction) => { + if confirmed_transaction.slot <= self .block_commitment_cache .read() .unwrap() .highest_confirmed_root() - }) - } else { - None + { + return Some(confirmed_transaction); + } + } + None => { + if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage { + return self + .runtime_handle + .block_on( + bigtable_ledger_storage + .get_confirmed_transaction(&signature, encoding), + ) + .unwrap_or(None); + } + } + } } + None } pub fn get_confirmed_signatures_for_address( @@ -759,6 +818,8 @@ impl JsonRpcRequestProcessor { end_slot: Slot, ) -> Vec { if self.config.enable_rpc_transaction_history { + // TODO: Add bigtable_ledger_storage support as a part of + // https://github.com/solana-labs/solana/pull/10928 let end_slot = min( end_slot, self.block_commitment_cache @@ -775,9 +836,23 @@ impl JsonRpcRequestProcessor { } pub fn get_first_available_block(&self) -> Slot { - self.blockstore + let slot = self + .blockstore .get_first_available_block() - .unwrap_or_default() + .unwrap_or_default(); + + if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage { + let bigtable_slot = self + .runtime_handle + .block_on(bigtable_ledger_storage.get_first_available_block()) + .unwrap_or(None) + .unwrap_or(slot); + + if bigtable_slot < slot { + return bigtable_slot; + } + } + slot } pub fn get_stake_activation( @@ -2348,6 +2423,8 @@ pub mod tests { RpcHealth::stub(), cluster_info.clone(), Hash::default(), + &runtime::Runtime::new().unwrap(), + None, ); SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver); @@ -3490,6 +3567,8 @@ pub mod tests { RpcHealth::stub(), cluster_info, Hash::default(), + &runtime::Runtime::new().unwrap(), + None, ); SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver); @@ -3529,6 +3608,8 @@ pub mod tests { health.clone(), cluster_info, Hash::default(), + &runtime::Runtime::new().unwrap(), + None, ); SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver); @@ -3709,6 +3790,8 @@ pub mod tests { RpcHealth::stub(), cluster_info, Hash::default(), + &runtime::Runtime::new().unwrap(), + None, ); SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver); assert_eq!(request_processor.validator_exit(), false); @@ -3736,6 +3819,8 @@ pub mod tests { RpcHealth::stub(), cluster_info, Hash::default(), + &runtime::Runtime::new().unwrap(), + None, ); SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver); assert_eq!(request_processor.validator_exit(), true); @@ -3825,6 +3910,8 @@ pub mod tests { RpcHealth::stub(), cluster_info, Hash::default(), + &runtime::Runtime::new().unwrap(), + None, ); SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver); assert_eq!( diff --git a/core/src/rpc_service.rs b/core/src/rpc_service.rs index be8d40efcb..7490685ac8 100644 --- a/core/src/rpc_service.rs +++ b/core/src/rpc_service.rs @@ -253,12 +253,28 @@ impl JsonRpcService { )); let tpu_address = cluster_info.my_contact_info().tpu; - let runtime = runtime::Builder::new() + let mut runtime = runtime::Builder::new() .threaded_scheduler() .thread_name("rpc-runtime") .enable_all() .build() .expect("Runtime"); + + let bigtable_ledger_storage = if config.enable_bigtable_ledger_storage { + runtime + .block_on(solana_storage_bigtable::LedgerStorage::new(false)) + .map(|x| { + info!("BigTable ledger storage initialized"); + Some(x) + }) + .unwrap_or_else(|err| { + error!("Failed to initialize BigTable ledger storage: {:?}", err); + None + }) + } else { + None + }; + let (request_processor, receiver) = JsonRpcRequestProcessor::new( config, bank_forks.clone(), @@ -269,6 +285,7 @@ impl JsonRpcService { cluster_info, genesis_hash, &runtime, + bigtable_ledger_storage, ); let exit_send_transaction_service = Arc::new(AtomicBool::new(false)); diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index 23f251cb86..afb3f65a8f 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -952,7 +952,7 @@ pub(crate) mod tests { system_transaction, }; use std::{fmt::Debug, sync::mpsc::channel, time::Instant}; - use tokio::{prelude::FutureExt, runtime::Runtime, timer::Delay}; + use tokio_01::{prelude::FutureExt, runtime::Runtime, timer::Delay}; pub(crate) fn robust_poll_or_panic( receiver: futures::sync::mpsc::Receiver, diff --git a/core/tests/rpc.rs b/core/tests/rpc.rs index 37bb955e42..9d11f1e16b 100644 --- a/core/tests/rpc.rs +++ b/core/tests/rpc.rs @@ -26,7 +26,7 @@ use std::{ thread::sleep, time::{Duration, Instant}, }; -use tokio::runtime::Runtime; +use tokio_01::runtime::Runtime; macro_rules! json_req { ($method: expr, $params: expr) => {{ @@ -189,7 +189,7 @@ fn test_rpc_subscriptions() { .and_then(move |client| { for sig in signature_set { let status_sender = status_sender.clone(); - tokio::spawn( + tokio_01::spawn( client .signature_subscribe(sig.clone(), None) .and_then(move |sig_stream| { @@ -203,7 +203,7 @@ fn test_rpc_subscriptions() { }), ); } - tokio::spawn( + tokio_01::spawn( client .slot_subscribe() .and_then(move |slot_stream| { @@ -218,7 +218,7 @@ fn test_rpc_subscriptions() { ); for pubkey in account_set { let account_sender = account_sender.clone(); - tokio::spawn( + tokio_01::spawn( client .account_subscribe(pubkey, None) .and_then(move |account_stream| { diff --git a/ledger-tool/src/bigtable.rs b/ledger-tool/src/bigtable.rs index aa611216b5..6fea79155b 100644 --- a/ledger-tool/src/bigtable.rs +++ b/ledger-tool/src/bigtable.rs @@ -10,7 +10,13 @@ use solana_ledger::{blockstore::Blockstore, blockstore_db::AccessType}; use solana_measure::measure::Measure; use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature}; use solana_transaction_status::UiTransactionEncoding; -use std::{collections::HashSet, path::Path, process::exit, result::Result, time::Duration}; +use std::{ + collections::HashSet, + path::Path, + process::exit, + result::Result, + time::{Duration, Instant}, +}; use tokio::time::delay_for; // Attempt to upload this many blocks in parallel @@ -131,7 +137,8 @@ async fn upload( ( std::thread::spawn(move || { let mut measure = Measure::start("block loader thread"); - for slot in &blocks_to_upload { + let mut last_status_update = Instant::now(); + for (i, slot) in blocks_to_upload.iter().enumerate() { let _ = match blockstore.get_confirmed_block( *slot, Some(solana_transaction_status::UiTransactionEncoding::Binary), @@ -145,6 +152,16 @@ async fn upload( sender.send((*slot, None)) } }; + + if Instant::now().duration_since(last_status_update).as_secs() >= 60 { + info!( + "{}% of blocks processed ({}/{})", + i * 100 / blocks_to_upload.len(), + i, + blocks_to_upload.len() + ); + last_status_update = Instant::now(); + } } measure.stop(); info!("{} to load {} blocks", measure, blocks_to_upload.len()); @@ -160,8 +177,9 @@ async fn upload( tokio::stream::iter(receiver.into_iter()).chunks(NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL); while let Some(blocks) = stream.next().await { - let mut measure_upload = Measure::start("upload"); + let mut measure_upload = Measure::start("Upload"); let mut num_blocks = blocks.len(); + info!("Preparing the next {} blocks for upload", num_blocks); let uploads = blocks.into_iter().filter_map(|(slot, block)| match block { None => { diff --git a/multinode-demo/bootstrap-validator.sh b/multinode-demo/bootstrap-validator.sh index e19639384c..b11c041892 100755 --- a/multinode-demo/bootstrap-validator.sh +++ b/multinode-demo/bootstrap-validator.sh @@ -48,6 +48,9 @@ while [[ -n $1 ]]; do elif [[ $1 = --enable-rpc-transaction-history ]]; then args+=("$1") shift + elif [[ $1 = --enable-rpc-bigtable-ledger-storage ]]; then + args+=("$1") + shift elif [[ $1 = --skip-poh-verify ]]; then args+=("$1") shift diff --git a/validator/src/main.rs b/validator/src/main.rs index aa1d9d33de..a401934b73 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -627,7 +627,15 @@ pub fn main() { .takes_value(false) .help("Enable historical transaction info over JSON RPC, \ including the 'getConfirmedBlock' API. \ - This will cause an increase in disk usage and IOPS"), + This will cause an increase in disk usage and IOPS"), + ) + .arg( + Arg::with_name("enable_rpc_bigtable_ledger_storage") + .long("enable-rpc-bigtable-ledger-storage") + .requires("enable_rpc_transaction_history") + .takes_value(false) + .help("Fetch historical transaction info from a BigTable instance \ + as a fallback to local ledger data"), ) .arg( Arg::with_name("health_check_slot_distance") @@ -938,6 +946,8 @@ pub fn main() { enable_validator_exit: matches.is_present("enable_rpc_exit"), enable_set_log_filter: matches.is_present("enable_rpc_set_log_filter"), enable_rpc_transaction_history: matches.is_present("enable_rpc_transaction_history"), + enable_bigtable_ledger_storage: matches + .is_present("enable_rpc_bigtable_ledger_storage"), identity_pubkey: identity_keypair.pubkey(), faucet_addr: matches.value_of("rpc_faucet_addr").map(|address| { solana_net_utils::parse_host_port(address).expect("failed to parse faucet address")