From dcaa0258226ab67e33fd6d0b8cf41b2be4e276f4 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Tue, 19 Jan 2021 22:57:16 +0900 Subject: [PATCH] Configure Bigtable's timeout, enabling by default (#14657) * Configure bigtable's timeout when read-only * Review comments * Apply nits (thanks!) Co-authored-by: Michael Vines * Timeout in the streamed decoding as well Co-authored-by: Michael Vines --- core/src/rpc.rs | 9 ++++- core/src/rpc_service.rs | 1 + ledger-tool/src/bigtable.rs | 12 +++---- storage-bigtable/src/bigtable.rs | 59 ++++++++++++++++++++++++-------- storage-bigtable/src/lib.rs | 5 +-- validator/src/main.rs | 12 +++++++ 6 files changed, 74 insertions(+), 24 deletions(-) diff --git a/core/src/rpc.rs b/core/src/rpc.rs index 22950b86a..4cd784378 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -81,6 +81,7 @@ use std::{ mpsc::{channel, Receiver, Sender}, Arc, Mutex, RwLock, }, + time::Duration, }; use tokio::runtime; @@ -115,6 +116,7 @@ pub struct JsonRpcConfig { pub max_multiple_accounts: Option, pub account_indexes: HashSet, pub rpc_threads: usize, + pub rpc_bigtable_timeout: Option, } #[derive(Clone)] @@ -738,7 +740,12 @@ impl JsonRpcRequestProcessor { bigtable_blocks.retain(|&slot| slot <= end_slot); bigtable_blocks }) - .unwrap_or_else(|_| vec![])); + .map_err(|_| { + Error::invalid_params( + "BigTable query failed (maybe timeout due to too large range?)" + .to_string(), + ) + })?); } } diff --git a/core/src/rpc_service.rs b/core/src/rpc_service.rs index bde8e7367..406434716 100644 --- a/core/src/rpc_service.rs +++ b/core/src/rpc_service.rs @@ -297,6 +297,7 @@ impl JsonRpcService { runtime .block_on(solana_storage_bigtable::LedgerStorage::new( !config.enable_bigtable_ledger_upload, + config.rpc_bigtable_timeout, )) .map(|bigtable_ledger_storage| { info!("BigTable ledger storage initialized"); diff --git a/ledger-tool/src/bigtable.rs b/ledger-tool/src/bigtable.rs index ae177a9cf..0faa6a60d 100644 --- a/ledger-tool/src/bigtable.rs +++ b/ledger-tool/src/bigtable.rs @@ -21,7 +21,7 @@ async fn upload( ending_slot: Option, allow_missing_metadata: bool, ) -> Result<(), Box> { - let bigtable = solana_storage_bigtable::LedgerStorage::new(false) + let bigtable = solana_storage_bigtable::LedgerStorage::new(false, None) .await .map_err(|err| format!("Failed to connect to storage: {:?}", err))?; @@ -37,7 +37,7 @@ async fn upload( } async fn first_available_block() -> Result<(), Box> { - let bigtable = solana_storage_bigtable::LedgerStorage::new(true).await?; + let bigtable = solana_storage_bigtable::LedgerStorage::new(true, None).await?; match bigtable.get_first_available_block().await? { Some(block) => println!("{}", block), None => println!("No blocks available"), @@ -47,7 +47,7 @@ async fn first_available_block() -> Result<(), Box> { } async fn block(slot: Slot) -> Result<(), Box> { - let bigtable = solana_storage_bigtable::LedgerStorage::new(false) + let bigtable = solana_storage_bigtable::LedgerStorage::new(false, None) .await .map_err(|err| format!("Failed to connect to storage: {:?}", err))?; @@ -75,7 +75,7 @@ async fn block(slot: Slot) -> Result<(), Box> { } async fn blocks(starting_slot: Slot, limit: usize) -> Result<(), Box> { - let bigtable = solana_storage_bigtable::LedgerStorage::new(false) + let bigtable = solana_storage_bigtable::LedgerStorage::new(false, None) .await .map_err(|err| format!("Failed to connect to storage: {:?}", err))?; @@ -87,7 +87,7 @@ async fn blocks(starting_slot: Slot, limit: usize) -> Result<(), Box Result<(), Box> { - let bigtable = solana_storage_bigtable::LedgerStorage::new(false) + let bigtable = solana_storage_bigtable::LedgerStorage::new(false, None) .await .map_err(|err| format!("Failed to connect to storage: {:?}", err))?; @@ -127,7 +127,7 @@ pub async fn transaction_history( show_transactions: bool, query_chunk_size: usize, ) -> Result<(), Box> { - let bigtable = solana_storage_bigtable::LedgerStorage::new(true).await?; + let bigtable = solana_storage_bigtable::LedgerStorage::new(true, None).await?; let mut loaded_block: Option<(Slot, ConfirmedBlock)> = None; while limit > 0 { diff --git a/storage-bigtable/src/bigtable.rs b/storage-bigtable/src/bigtable.rs index dd40ca3e7..1cbf12102 100644 --- a/storage-bigtable/src/bigtable.rs +++ b/storage-bigtable/src/bigtable.rs @@ -6,6 +6,7 @@ use crate::{ root_ca_certificate, }; use log::*; +use std::time::{Duration, Instant}; use thiserror::Error; use tonic::{metadata::MetadataValue, transport::ClientTlsConfig, Request}; @@ -68,6 +69,9 @@ pub enum Error { #[error("RPC error: {0}")] RpcError(tonic::Status), + + #[error("Timeout error")] + TimeoutError, } impl std::convert::From for Error { @@ -95,6 +99,7 @@ pub struct BigTableConnection { access_token: Option, channel: tonic::transport::Channel, table_prefix: String, + timeout: Option, } impl BigTableConnection { @@ -106,7 +111,11 @@ impl BigTableConnection { /// /// The BIGTABLE_EMULATOR_HOST environment variable is also respected. /// - pub async fn new(instance_name: &str, read_only: bool) -> Result { + pub async fn new( + instance_name: &str, + read_only: bool, + timeout: Option, + ) -> Result { match std::env::var("BIGTABLE_EMULATOR_HOST") { Ok(endpoint) => { info!("Connecting to bigtable emulator at {}", endpoint); @@ -117,6 +126,7 @@ impl BigTableConnection { .map_err(|err| Error::InvalidUri(endpoint, err.to_string()))? .connect_lazy()?, table_prefix: format!("projects/emulator/instances/{}/tables/", instance_name), + timeout, }) } @@ -135,20 +145,29 @@ impl BigTableConnection { instance_name ); + let endpoint = { + let endpoint = + tonic::transport::Channel::from_static("https://bigtable.googleapis.com") + .tls_config( + ClientTlsConfig::new() + .ca_certificate( + root_ca_certificate::load().map_err(Error::CertificateError)?, + ) + .domain_name("bigtable.googleapis.com"), + )?; + + if let Some(timeout) = timeout { + endpoint.timeout(timeout) + } else { + endpoint + } + }; + Ok(Self { access_token: Some(access_token), - channel: tonic::transport::Channel::from_static( - "https://bigtable.googleapis.com", - ) - .tls_config( - ClientTlsConfig::new() - .ca_certificate( - root_ca_certificate::load().map_err(Error::CertificateError)?, - ) - .domain_name("bigtable.googleapis.com"), - )? - .connect_lazy()?, + channel: endpoint.connect_lazy()?, table_prefix, + timeout, }) } } @@ -183,6 +202,7 @@ impl BigTableConnection { access_token: self.access_token.clone(), client, table_prefix: self.table_prefix.clone(), + timeout: self.timeout, } } @@ -225,10 +245,12 @@ pub struct BigTable { access_token: Option, client: bigtable_client::BigtableClient, table_prefix: String, + timeout: Option, } impl BigTable { async fn decode_read_rows_response( + &self, mut rrr: tonic::codec::Streaming, ) -> Result> { let mut rows: Vec<(RowKey, RowData)> = vec![]; @@ -240,7 +262,14 @@ impl BigTable { let mut cell_timestamp = 0; let mut cell_value = vec![]; let mut cell_version_ok = true; + let started = Instant::now(); + while let Some(res) = rrr.message().await? { + if let Some(timeout) = self.timeout { + if Instant::now().duration_since(started) > timeout { + return Err(Error::TimeoutError); + } + } for (i, mut chunk) in res.chunks.into_iter().enumerate() { // The comments for `read_rows_response::CellChunk` provide essential details for // understanding how the below decoding works... @@ -361,7 +390,7 @@ impl BigTable { .await? .into_inner(); - let rows = Self::decode_read_rows_response(response).await?; + let rows = self.decode_read_rows_response(response).await?; Ok(rows.into_iter().map(|r| r.0).collect()) } @@ -410,7 +439,7 @@ impl BigTable { .await? .into_inner(); - Self::decode_read_rows_response(response).await + self.decode_read_rows_response(response).await } /// Get latest data from a single row of `table`, if that row exists. Returns an error if that @@ -443,7 +472,7 @@ impl BigTable { .await? .into_inner(); - let rows = Self::decode_read_rows_response(response).await?; + let rows = self.decode_read_rows_response(response).await?; rows.into_iter() .next() .map(|r| r.1) diff --git a/storage-bigtable/src/lib.rs b/storage-bigtable/src/lib.rs index cad9c1962..b5289e9d4 100644 --- a/storage-bigtable/src/lib.rs +++ b/storage-bigtable/src/lib.rs @@ -279,8 +279,9 @@ pub struct LedgerStorage { } impl LedgerStorage { - pub async fn new(read_only: bool) -> Result { - let connection = bigtable::BigTableConnection::new("solana-ledger", read_only).await?; + pub async fn new(read_only: bool, timeout: Option) -> Result { + let connection = + bigtable::BigTableConnection::new("solana-ledger", read_only, timeout).await?; Ok(Self { connection }) } diff --git a/validator/src/main.rs b/validator/src/main.rs index 6f466b44c..4d57e8bc0 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1287,6 +1287,15 @@ pub fn main() { .default_value(&default_rpc_threads) .help("Number of threads to use for servicing RPC requests"), ) + .arg( + Arg::with_name("rpc_bigtable_timeout") + .long("rpc-bigtable-timeout") + .value_name("SECONDS") + .validator(is_parsable::) + .takes_value(true) + .default_value("30") + .help("Number of seconds before timing out RPC requests backed by BigTable"), + ) .arg( Arg::with_name("rpc_pubsub_enable_vote_subscription") .long("rpc-pubsub-enable-vote-subscription") @@ -1561,6 +1570,9 @@ pub fn main() { u64 ), rpc_threads: value_t_or_exit!(matches, "rpc_threads", usize), + rpc_bigtable_timeout: value_t!(matches, "rpc_bigtable_timeout", u64) + .ok() + .map(Duration::from_secs), account_indexes: account_indexes.clone(), }, rpc_addrs: value_t!(matches, "rpc_port", u16).ok().map(|rpc_port| {