Plumb Bigtable ledger storage into the RPC subsystem

This commit is contained in:
Michael Vines 2020-07-23 09:54:57 -07:00
parent 0e02740565
commit dfae9a9864
8 changed files with 158 additions and 22 deletions

View File

@ -61,6 +61,7 @@ solana-runtime = { path = "../runtime", version = "1.4.0" }
solana-sdk = { path = "../sdk", version = "1.4.0" } solana-sdk = { path = "../sdk", version = "1.4.0" }
solana-sdk-macro-frozen-abi = { path = "../sdk/macro-frozen-abi", version = "1.4.0" } solana-sdk-macro-frozen-abi = { path = "../sdk/macro-frozen-abi", version = "1.4.0" }
solana-stake-program = { path = "../programs/stake", version = "1.4.0" } solana-stake-program = { path = "../programs/stake", version = "1.4.0" }
solana-storage-bigtable = { path = "../storage-bigtable", version = "1.4.0" }
solana-streamer = { path = "../streamer", version = "1.4.0" } solana-streamer = { path = "../streamer", version = "1.4.0" }
solana-sys-tuner = { path = "../sys-tuner", version = "1.4.0" } solana-sys-tuner = { path = "../sys-tuner", version = "1.4.0" }
solana-transaction-status = { path = "../transaction-status", version = "1.4.0" } solana-transaction-status = { path = "../transaction-status", version = "1.4.0" }

View File

@ -91,6 +91,7 @@ pub struct JsonRpcConfig {
pub identity_pubkey: Pubkey, pub identity_pubkey: Pubkey,
pub faucet_addr: Option<SocketAddr>, pub faucet_addr: Option<SocketAddr>,
pub health_check_slot_distance: u64, pub health_check_slot_distance: u64,
pub enable_bigtable_ledger_storage: bool,
} }
#[derive(Clone)] #[derive(Clone)]
@ -105,6 +106,7 @@ pub struct JsonRpcRequestProcessor {
genesis_hash: Hash, genesis_hash: Hash,
transaction_sender: Arc<Mutex<Sender<TransactionInfo>>>, transaction_sender: Arc<Mutex<Sender<TransactionInfo>>>,
runtime_handle: runtime::Handle, runtime_handle: runtime::Handle,
bigtable_ledger_storage: Option<solana_storage_bigtable::LedgerStorage>,
} }
impl Metadata for JsonRpcRequestProcessor {} impl Metadata for JsonRpcRequestProcessor {}
@ -159,6 +161,7 @@ impl JsonRpcRequestProcessor {
}) })
} }
#[allow(clippy::too_many_arguments)]
pub fn new( pub fn new(
config: JsonRpcConfig, config: JsonRpcConfig,
bank_forks: Arc<RwLock<BankForks>>, bank_forks: Arc<RwLock<BankForks>>,
@ -169,6 +172,7 @@ impl JsonRpcRequestProcessor {
cluster_info: Arc<ClusterInfo>, cluster_info: Arc<ClusterInfo>,
genesis_hash: Hash, genesis_hash: Hash,
runtime: &runtime::Runtime, runtime: &runtime::Runtime,
bigtable_ledger_storage: Option<solana_storage_bigtable::LedgerStorage>,
) -> (Self, Receiver<TransactionInfo>) { ) -> (Self, Receiver<TransactionInfo>) {
let (sender, receiver) = channel(); let (sender, receiver) = channel();
( (
@ -182,6 +186,8 @@ impl JsonRpcRequestProcessor {
cluster_info, cluster_info,
genesis_hash, genesis_hash,
transaction_sender: Arc::new(Mutex::new(sender)), transaction_sender: Arc::new(Mutex::new(sender)),
runtime_handle: runtime.handle().clone(),
bigtable_ledger_storage,
}, },
receiver, receiver,
) )
@ -220,7 +226,8 @@ impl JsonRpcRequestProcessor {
cluster_info, cluster_info,
genesis_hash, genesis_hash,
transaction_sender: Arc::new(Mutex::new(sender)), transaction_sender: Arc::new(Mutex::new(sender)),
runtime_handle: runtime.handle().clone(), runtime_handle: runtime::Runtime::new().unwrap().handle().clone(),
bigtable_ledger_storage: None,
} }
} }
@ -560,6 +567,7 @@ impl JsonRpcRequestProcessor {
slot: Slot, slot: Slot,
encoding: Option<UiTransactionEncoding>, encoding: Option<UiTransactionEncoding>,
) -> Result<Option<ConfirmedBlock>> { ) -> Result<Option<ConfirmedBlock>> {
let encoding = encoding.unwrap_or(UiTransactionEncoding::Json);
if self.config.enable_rpc_transaction_history if self.config.enable_rpc_transaction_history
&& slot && slot
<= self <= self
@ -568,7 +576,15 @@ impl JsonRpcRequestProcessor {
.unwrap() .unwrap()
.highest_confirmed_root() .highest_confirmed_root()
{ {
let result = self.blockstore.get_confirmed_block(slot, encoding); let result = self.blockstore.get_confirmed_block(slot, Some(encoding));
if result.is_err() {
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
return Ok(self
.runtime_handle
.block_on(bigtable_ledger_storage.get_confirmed_block(slot, encoding))
.ok());
}
}
self.check_slot_cleaned_up(&result, slot)?; self.check_slot_cleaned_up(&result, slot)?;
Ok(result.ok()) Ok(result.ok())
} else { } else {
@ -597,9 +613,25 @@ impl JsonRpcRequestProcessor {
MAX_GET_CONFIRMED_BLOCKS_RANGE MAX_GET_CONFIRMED_BLOCKS_RANGE
))); )));
} }
let lowest_slot = self.blockstore.lowest_slot();
if start_slot < lowest_slot {
// If the starting slot is lower than what's available in blockstore assume the entire
// [start_slot..end_slot] can be fetched from BigTable.
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
return Ok(self
.runtime_handle
.block_on(
bigtable_ledger_storage
.get_confirmed_blocks(start_slot, (end_slot - start_slot) as usize),
)
.unwrap_or_else(|_| vec![]));
}
}
Ok(self Ok(self
.blockstore .blockstore
.rooted_slot_iterator(max(start_slot, self.blockstore.lowest_slot())) .rooted_slot_iterator(max(start_slot, lowest_slot))
.map_err(|_| Error::internal_error())? .map_err(|_| Error::internal_error())?
.filter(|&slot| slot <= end_slot) .filter(|&slot| slot <= end_slot)
.collect()) .collect())
@ -693,6 +725,16 @@ impl JsonRpcRequestProcessor {
err, err,
} }
}) })
.or_else(|| {
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
self.runtime_handle
.block_on(bigtable_ledger_storage.get_signature_status(&signature))
.map(Some)
.unwrap_or(None)
} else {
None
}
})
} else { } else {
None None
}; };
@ -735,21 +777,38 @@ impl JsonRpcRequestProcessor {
signature: Signature, signature: Signature,
encoding: Option<UiTransactionEncoding>, encoding: Option<UiTransactionEncoding>,
) -> Option<ConfirmedTransaction> { ) -> Option<ConfirmedTransaction> {
let encoding = encoding.unwrap_or(UiTransactionEncoding::Json);
if self.config.enable_rpc_transaction_history { if self.config.enable_rpc_transaction_history {
self.blockstore match self
.get_confirmed_transaction(signature, encoding) .blockstore
.get_confirmed_transaction(signature, Some(encoding))
.unwrap_or(None) .unwrap_or(None)
.filter(|confirmed_transaction| { {
confirmed_transaction.slot Some(confirmed_transaction) => {
if confirmed_transaction.slot
<= self <= self
.block_commitment_cache .block_commitment_cache
.read() .read()
.unwrap() .unwrap()
.highest_confirmed_root() .highest_confirmed_root()
}) {
} else { return Some(confirmed_transaction);
None }
}
None => {
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
return self
.runtime_handle
.block_on(
bigtable_ledger_storage
.get_confirmed_transaction(&signature, encoding),
)
.unwrap_or(None);
}
}
}
} }
None
} }
pub fn get_confirmed_signatures_for_address( pub fn get_confirmed_signatures_for_address(
@ -759,6 +818,8 @@ impl JsonRpcRequestProcessor {
end_slot: Slot, end_slot: Slot,
) -> Vec<Signature> { ) -> Vec<Signature> {
if self.config.enable_rpc_transaction_history { if self.config.enable_rpc_transaction_history {
// TODO: Add bigtable_ledger_storage support as a part of
// https://github.com/solana-labs/solana/pull/10928
let end_slot = min( let end_slot = min(
end_slot, end_slot,
self.block_commitment_cache self.block_commitment_cache
@ -775,9 +836,23 @@ impl JsonRpcRequestProcessor {
} }
pub fn get_first_available_block(&self) -> Slot { pub fn get_first_available_block(&self) -> Slot {
self.blockstore let slot = self
.blockstore
.get_first_available_block() .get_first_available_block()
.unwrap_or_default() .unwrap_or_default();
if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage {
let bigtable_slot = self
.runtime_handle
.block_on(bigtable_ledger_storage.get_first_available_block())
.unwrap_or(None)
.unwrap_or(slot);
if bigtable_slot < slot {
return bigtable_slot;
}
}
slot
} }
pub fn get_stake_activation( pub fn get_stake_activation(
@ -2348,6 +2423,8 @@ pub mod tests {
RpcHealth::stub(), RpcHealth::stub(),
cluster_info.clone(), cluster_info.clone(),
Hash::default(), Hash::default(),
&runtime::Runtime::new().unwrap(),
None,
); );
SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver); SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver);
@ -3490,6 +3567,8 @@ pub mod tests {
RpcHealth::stub(), RpcHealth::stub(),
cluster_info, cluster_info,
Hash::default(), Hash::default(),
&runtime::Runtime::new().unwrap(),
None,
); );
SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver); SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver);
@ -3529,6 +3608,8 @@ pub mod tests {
health.clone(), health.clone(),
cluster_info, cluster_info,
Hash::default(), Hash::default(),
&runtime::Runtime::new().unwrap(),
None,
); );
SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver); SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver);
@ -3709,6 +3790,8 @@ pub mod tests {
RpcHealth::stub(), RpcHealth::stub(),
cluster_info, cluster_info,
Hash::default(), Hash::default(),
&runtime::Runtime::new().unwrap(),
None,
); );
SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver); SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver);
assert_eq!(request_processor.validator_exit(), false); assert_eq!(request_processor.validator_exit(), false);
@ -3736,6 +3819,8 @@ pub mod tests {
RpcHealth::stub(), RpcHealth::stub(),
cluster_info, cluster_info,
Hash::default(), Hash::default(),
&runtime::Runtime::new().unwrap(),
None,
); );
SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver); SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver);
assert_eq!(request_processor.validator_exit(), true); assert_eq!(request_processor.validator_exit(), true);
@ -3825,6 +3910,8 @@ pub mod tests {
RpcHealth::stub(), RpcHealth::stub(),
cluster_info, cluster_info,
Hash::default(), Hash::default(),
&runtime::Runtime::new().unwrap(),
None,
); );
SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver); SendTransactionService::new(tpu_address, &bank_forks, &exit, receiver);
assert_eq!( assert_eq!(

View File

@ -253,12 +253,28 @@ impl JsonRpcService {
)); ));
let tpu_address = cluster_info.my_contact_info().tpu; let tpu_address = cluster_info.my_contact_info().tpu;
let runtime = runtime::Builder::new() let mut runtime = runtime::Builder::new()
.threaded_scheduler() .threaded_scheduler()
.thread_name("rpc-runtime") .thread_name("rpc-runtime")
.enable_all() .enable_all()
.build() .build()
.expect("Runtime"); .expect("Runtime");
let bigtable_ledger_storage = if config.enable_bigtable_ledger_storage {
runtime
.block_on(solana_storage_bigtable::LedgerStorage::new(false))
.map(|x| {
info!("BigTable ledger storage initialized");
Some(x)
})
.unwrap_or_else(|err| {
error!("Failed to initialize BigTable ledger storage: {:?}", err);
None
})
} else {
None
};
let (request_processor, receiver) = JsonRpcRequestProcessor::new( let (request_processor, receiver) = JsonRpcRequestProcessor::new(
config, config,
bank_forks.clone(), bank_forks.clone(),
@ -269,6 +285,7 @@ impl JsonRpcService {
cluster_info, cluster_info,
genesis_hash, genesis_hash,
&runtime, &runtime,
bigtable_ledger_storage,
); );
let exit_send_transaction_service = Arc::new(AtomicBool::new(false)); let exit_send_transaction_service = Arc::new(AtomicBool::new(false));

View File

@ -952,7 +952,7 @@ pub(crate) mod tests {
system_transaction, system_transaction,
}; };
use std::{fmt::Debug, sync::mpsc::channel, time::Instant}; use std::{fmt::Debug, sync::mpsc::channel, time::Instant};
use tokio::{prelude::FutureExt, runtime::Runtime, timer::Delay}; use tokio_01::{prelude::FutureExt, runtime::Runtime, timer::Delay};
pub(crate) fn robust_poll_or_panic<T: Debug + Send + 'static>( pub(crate) fn robust_poll_or_panic<T: Debug + Send + 'static>(
receiver: futures::sync::mpsc::Receiver<T>, receiver: futures::sync::mpsc::Receiver<T>,

View File

@ -26,7 +26,7 @@ use std::{
thread::sleep, thread::sleep,
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use tokio::runtime::Runtime; use tokio_01::runtime::Runtime;
macro_rules! json_req { macro_rules! json_req {
($method: expr, $params: expr) => {{ ($method: expr, $params: expr) => {{
@ -189,7 +189,7 @@ fn test_rpc_subscriptions() {
.and_then(move |client| { .and_then(move |client| {
for sig in signature_set { for sig in signature_set {
let status_sender = status_sender.clone(); let status_sender = status_sender.clone();
tokio::spawn( tokio_01::spawn(
client client
.signature_subscribe(sig.clone(), None) .signature_subscribe(sig.clone(), None)
.and_then(move |sig_stream| { .and_then(move |sig_stream| {
@ -203,7 +203,7 @@ fn test_rpc_subscriptions() {
}), }),
); );
} }
tokio::spawn( tokio_01::spawn(
client client
.slot_subscribe() .slot_subscribe()
.and_then(move |slot_stream| { .and_then(move |slot_stream| {
@ -218,7 +218,7 @@ fn test_rpc_subscriptions() {
); );
for pubkey in account_set { for pubkey in account_set {
let account_sender = account_sender.clone(); let account_sender = account_sender.clone();
tokio::spawn( tokio_01::spawn(
client client
.account_subscribe(pubkey, None) .account_subscribe(pubkey, None)
.and_then(move |account_stream| { .and_then(move |account_stream| {

View File

@ -10,7 +10,13 @@ use solana_ledger::{blockstore::Blockstore, blockstore_db::AccessType};
use solana_measure::measure::Measure; use solana_measure::measure::Measure;
use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature}; use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature};
use solana_transaction_status::UiTransactionEncoding; use solana_transaction_status::UiTransactionEncoding;
use std::{collections::HashSet, path::Path, process::exit, result::Result, time::Duration}; use std::{
collections::HashSet,
path::Path,
process::exit,
result::Result,
time::{Duration, Instant},
};
use tokio::time::delay_for; use tokio::time::delay_for;
// Attempt to upload this many blocks in parallel // Attempt to upload this many blocks in parallel
@ -131,7 +137,8 @@ async fn upload(
( (
std::thread::spawn(move || { std::thread::spawn(move || {
let mut measure = Measure::start("block loader thread"); let mut measure = Measure::start("block loader thread");
for slot in &blocks_to_upload { let mut last_status_update = Instant::now();
for (i, slot) in blocks_to_upload.iter().enumerate() {
let _ = match blockstore.get_confirmed_block( let _ = match blockstore.get_confirmed_block(
*slot, *slot,
Some(solana_transaction_status::UiTransactionEncoding::Binary), Some(solana_transaction_status::UiTransactionEncoding::Binary),
@ -145,6 +152,16 @@ async fn upload(
sender.send((*slot, None)) sender.send((*slot, None))
} }
}; };
if Instant::now().duration_since(last_status_update).as_secs() >= 60 {
info!(
"{}% of blocks processed ({}/{})",
i * 100 / blocks_to_upload.len(),
i,
blocks_to_upload.len()
);
last_status_update = Instant::now();
}
} }
measure.stop(); measure.stop();
info!("{} to load {} blocks", measure, blocks_to_upload.len()); info!("{} to load {} blocks", measure, blocks_to_upload.len());
@ -160,8 +177,9 @@ async fn upload(
tokio::stream::iter(receiver.into_iter()).chunks(NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL); tokio::stream::iter(receiver.into_iter()).chunks(NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL);
while let Some(blocks) = stream.next().await { while let Some(blocks) = stream.next().await {
let mut measure_upload = Measure::start("upload"); let mut measure_upload = Measure::start("Upload");
let mut num_blocks = blocks.len(); let mut num_blocks = blocks.len();
info!("Preparing the next {} blocks for upload", num_blocks);
let uploads = blocks.into_iter().filter_map(|(slot, block)| match block { let uploads = blocks.into_iter().filter_map(|(slot, block)| match block {
None => { None => {

View File

@ -48,6 +48,9 @@ while [[ -n $1 ]]; do
elif [[ $1 = --enable-rpc-transaction-history ]]; then elif [[ $1 = --enable-rpc-transaction-history ]]; then
args+=("$1") args+=("$1")
shift shift
elif [[ $1 = --enable-rpc-bigtable-ledger-storage ]]; then
args+=("$1")
shift
elif [[ $1 = --skip-poh-verify ]]; then elif [[ $1 = --skip-poh-verify ]]; then
args+=("$1") args+=("$1")
shift shift

View File

@ -627,7 +627,15 @@ pub fn main() {
.takes_value(false) .takes_value(false)
.help("Enable historical transaction info over JSON RPC, \ .help("Enable historical transaction info over JSON RPC, \
including the 'getConfirmedBlock' API. \ including the 'getConfirmedBlock' API. \
This will cause an increase in disk usage and IOPS"), This will cause an increase in disk usage and IOPS"),
)
.arg(
Arg::with_name("enable_rpc_bigtable_ledger_storage")
.long("enable-rpc-bigtable-ledger-storage")
.requires("enable_rpc_transaction_history")
.takes_value(false)
.help("Fetch historical transaction info from a BigTable instance \
as a fallback to local ledger data"),
) )
.arg( .arg(
Arg::with_name("health_check_slot_distance") Arg::with_name("health_check_slot_distance")
@ -938,6 +946,8 @@ pub fn main() {
enable_validator_exit: matches.is_present("enable_rpc_exit"), enable_validator_exit: matches.is_present("enable_rpc_exit"),
enable_set_log_filter: matches.is_present("enable_rpc_set_log_filter"), enable_set_log_filter: matches.is_present("enable_rpc_set_log_filter"),
enable_rpc_transaction_history: matches.is_present("enable_rpc_transaction_history"), enable_rpc_transaction_history: matches.is_present("enable_rpc_transaction_history"),
enable_bigtable_ledger_storage: matches
.is_present("enable_rpc_bigtable_ledger_storage"),
identity_pubkey: identity_keypair.pubkey(), identity_pubkey: identity_keypair.pubkey(),
faucet_addr: matches.value_of("rpc_faucet_addr").map(|address| { faucet_addr: matches.value_of("rpc_faucet_addr").map(|address| {
solana_net_utils::parse_host_port(address).expect("failed to parse faucet address") solana_net_utils::parse_host_port(address).expect("failed to parse faucet address")