Configure Bigtable's timeout, enabling by default (#14657)

* Configure Bigtable's timeout when read-only

* Review comments

* Apply nits (thanks!)

Co-authored-by: Michael Vines <mvines@gmail.com>

* Apply the timeout in the streamed decoding as well

Co-authored-by: Michael Vines <mvines@gmail.com>
This commit is contained in:
Ryo Onodera 2021-01-19 22:57:16 +09:00 committed by GitHub
parent 8a604de3c3
commit dcaa025822
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 74 additions and 24 deletions

View File

@ -81,6 +81,7 @@ use std::{
mpsc::{channel, Receiver, Sender}, mpsc::{channel, Receiver, Sender},
Arc, Mutex, RwLock, Arc, Mutex, RwLock,
}, },
time::Duration,
}; };
use tokio::runtime; use tokio::runtime;
@ -115,6 +116,7 @@ pub struct JsonRpcConfig {
pub max_multiple_accounts: Option<usize>, pub max_multiple_accounts: Option<usize>,
pub account_indexes: HashSet<AccountIndex>, pub account_indexes: HashSet<AccountIndex>,
pub rpc_threads: usize, pub rpc_threads: usize,
pub rpc_bigtable_timeout: Option<Duration>,
} }
#[derive(Clone)] #[derive(Clone)]
@ -738,7 +740,12 @@ impl JsonRpcRequestProcessor {
bigtable_blocks.retain(|&slot| slot <= end_slot); bigtable_blocks.retain(|&slot| slot <= end_slot);
bigtable_blocks bigtable_blocks
}) })
.unwrap_or_else(|_| vec![])); .map_err(|_| {
Error::invalid_params(
"BigTable query failed (maybe timeout due to too large range?)"
.to_string(),
)
})?);
} }
} }

View File

@ -297,6 +297,7 @@ impl JsonRpcService {
runtime runtime
.block_on(solana_storage_bigtable::LedgerStorage::new( .block_on(solana_storage_bigtable::LedgerStorage::new(
!config.enable_bigtable_ledger_upload, !config.enable_bigtable_ledger_upload,
config.rpc_bigtable_timeout,
)) ))
.map(|bigtable_ledger_storage| { .map(|bigtable_ledger_storage| {
info!("BigTable ledger storage initialized"); info!("BigTable ledger storage initialized");

View File

@ -21,7 +21,7 @@ async fn upload(
ending_slot: Option<Slot>, ending_slot: Option<Slot>,
allow_missing_metadata: bool, allow_missing_metadata: bool,
) -> Result<(), Box<dyn std::error::Error>> { ) -> Result<(), Box<dyn std::error::Error>> {
let bigtable = solana_storage_bigtable::LedgerStorage::new(false) let bigtable = solana_storage_bigtable::LedgerStorage::new(false, None)
.await .await
.map_err(|err| format!("Failed to connect to storage: {:?}", err))?; .map_err(|err| format!("Failed to connect to storage: {:?}", err))?;
@ -37,7 +37,7 @@ async fn upload(
} }
async fn first_available_block() -> Result<(), Box<dyn std::error::Error>> { async fn first_available_block() -> Result<(), Box<dyn std::error::Error>> {
let bigtable = solana_storage_bigtable::LedgerStorage::new(true).await?; let bigtable = solana_storage_bigtable::LedgerStorage::new(true, None).await?;
match bigtable.get_first_available_block().await? { match bigtable.get_first_available_block().await? {
Some(block) => println!("{}", block), Some(block) => println!("{}", block),
None => println!("No blocks available"), None => println!("No blocks available"),
@ -47,7 +47,7 @@ async fn first_available_block() -> Result<(), Box<dyn std::error::Error>> {
} }
async fn block(slot: Slot) -> Result<(), Box<dyn std::error::Error>> { async fn block(slot: Slot) -> Result<(), Box<dyn std::error::Error>> {
let bigtable = solana_storage_bigtable::LedgerStorage::new(false) let bigtable = solana_storage_bigtable::LedgerStorage::new(false, None)
.await .await
.map_err(|err| format!("Failed to connect to storage: {:?}", err))?; .map_err(|err| format!("Failed to connect to storage: {:?}", err))?;
@ -75,7 +75,7 @@ async fn block(slot: Slot) -> Result<(), Box<dyn std::error::Error>> {
} }
async fn blocks(starting_slot: Slot, limit: usize) -> Result<(), Box<dyn std::error::Error>> { async fn blocks(starting_slot: Slot, limit: usize) -> Result<(), Box<dyn std::error::Error>> {
let bigtable = solana_storage_bigtable::LedgerStorage::new(false) let bigtable = solana_storage_bigtable::LedgerStorage::new(false, None)
.await .await
.map_err(|err| format!("Failed to connect to storage: {:?}", err))?; .map_err(|err| format!("Failed to connect to storage: {:?}", err))?;
@ -87,7 +87,7 @@ async fn blocks(starting_slot: Slot, limit: usize) -> Result<(), Box<dyn std::er
} }
async fn confirm(signature: &Signature, verbose: bool) -> Result<(), Box<dyn std::error::Error>> { async fn confirm(signature: &Signature, verbose: bool) -> Result<(), Box<dyn std::error::Error>> {
let bigtable = solana_storage_bigtable::LedgerStorage::new(false) let bigtable = solana_storage_bigtable::LedgerStorage::new(false, None)
.await .await
.map_err(|err| format!("Failed to connect to storage: {:?}", err))?; .map_err(|err| format!("Failed to connect to storage: {:?}", err))?;
@ -127,7 +127,7 @@ pub async fn transaction_history(
show_transactions: bool, show_transactions: bool,
query_chunk_size: usize, query_chunk_size: usize,
) -> Result<(), Box<dyn std::error::Error>> { ) -> Result<(), Box<dyn std::error::Error>> {
let bigtable = solana_storage_bigtable::LedgerStorage::new(true).await?; let bigtable = solana_storage_bigtable::LedgerStorage::new(true, None).await?;
let mut loaded_block: Option<(Slot, ConfirmedBlock)> = None; let mut loaded_block: Option<(Slot, ConfirmedBlock)> = None;
while limit > 0 { while limit > 0 {

View File

@ -6,6 +6,7 @@ use crate::{
root_ca_certificate, root_ca_certificate,
}; };
use log::*; use log::*;
use std::time::{Duration, Instant};
use thiserror::Error; use thiserror::Error;
use tonic::{metadata::MetadataValue, transport::ClientTlsConfig, Request}; use tonic::{metadata::MetadataValue, transport::ClientTlsConfig, Request};
@ -68,6 +69,9 @@ pub enum Error {
#[error("RPC error: {0}")] #[error("RPC error: {0}")]
RpcError(tonic::Status), RpcError(tonic::Status),
#[error("Timeout error")]
TimeoutError,
} }
impl std::convert::From<std::io::Error> for Error { impl std::convert::From<std::io::Error> for Error {
@ -95,6 +99,7 @@ pub struct BigTableConnection {
access_token: Option<AccessToken>, access_token: Option<AccessToken>,
channel: tonic::transport::Channel, channel: tonic::transport::Channel,
table_prefix: String, table_prefix: String,
timeout: Option<Duration>,
} }
impl BigTableConnection { impl BigTableConnection {
@ -106,7 +111,11 @@ impl BigTableConnection {
/// ///
/// The BIGTABLE_EMULATOR_HOST environment variable is also respected. /// The BIGTABLE_EMULATOR_HOST environment variable is also respected.
/// ///
pub async fn new(instance_name: &str, read_only: bool) -> Result<Self> { pub async fn new(
instance_name: &str,
read_only: bool,
timeout: Option<Duration>,
) -> Result<Self> {
match std::env::var("BIGTABLE_EMULATOR_HOST") { match std::env::var("BIGTABLE_EMULATOR_HOST") {
Ok(endpoint) => { Ok(endpoint) => {
info!("Connecting to bigtable emulator at {}", endpoint); info!("Connecting to bigtable emulator at {}", endpoint);
@ -117,6 +126,7 @@ impl BigTableConnection {
.map_err(|err| Error::InvalidUri(endpoint, err.to_string()))? .map_err(|err| Error::InvalidUri(endpoint, err.to_string()))?
.connect_lazy()?, .connect_lazy()?,
table_prefix: format!("projects/emulator/instances/{}/tables/", instance_name), table_prefix: format!("projects/emulator/instances/{}/tables/", instance_name),
timeout,
}) })
} }
@ -135,20 +145,29 @@ impl BigTableConnection {
instance_name instance_name
); );
let endpoint = {
let endpoint =
tonic::transport::Channel::from_static("https://bigtable.googleapis.com")
.tls_config(
ClientTlsConfig::new()
.ca_certificate(
root_ca_certificate::load().map_err(Error::CertificateError)?,
)
.domain_name("bigtable.googleapis.com"),
)?;
if let Some(timeout) = timeout {
endpoint.timeout(timeout)
} else {
endpoint
}
};
Ok(Self { Ok(Self {
access_token: Some(access_token), access_token: Some(access_token),
channel: tonic::transport::Channel::from_static( channel: endpoint.connect_lazy()?,
"https://bigtable.googleapis.com",
)
.tls_config(
ClientTlsConfig::new()
.ca_certificate(
root_ca_certificate::load().map_err(Error::CertificateError)?,
)
.domain_name("bigtable.googleapis.com"),
)?
.connect_lazy()?,
table_prefix, table_prefix,
timeout,
}) })
} }
} }
@ -183,6 +202,7 @@ impl BigTableConnection {
access_token: self.access_token.clone(), access_token: self.access_token.clone(),
client, client,
table_prefix: self.table_prefix.clone(), table_prefix: self.table_prefix.clone(),
timeout: self.timeout,
} }
} }
@ -225,10 +245,12 @@ pub struct BigTable {
access_token: Option<AccessToken>, access_token: Option<AccessToken>,
client: bigtable_client::BigtableClient<tonic::transport::Channel>, client: bigtable_client::BigtableClient<tonic::transport::Channel>,
table_prefix: String, table_prefix: String,
timeout: Option<Duration>,
} }
impl BigTable { impl BigTable {
async fn decode_read_rows_response( async fn decode_read_rows_response(
&self,
mut rrr: tonic::codec::Streaming<ReadRowsResponse>, mut rrr: tonic::codec::Streaming<ReadRowsResponse>,
) -> Result<Vec<(RowKey, RowData)>> { ) -> Result<Vec<(RowKey, RowData)>> {
let mut rows: Vec<(RowKey, RowData)> = vec![]; let mut rows: Vec<(RowKey, RowData)> = vec![];
@ -240,7 +262,14 @@ impl BigTable {
let mut cell_timestamp = 0; let mut cell_timestamp = 0;
let mut cell_value = vec![]; let mut cell_value = vec![];
let mut cell_version_ok = true; let mut cell_version_ok = true;
let started = Instant::now();
while let Some(res) = rrr.message().await? { while let Some(res) = rrr.message().await? {
if let Some(timeout) = self.timeout {
if Instant::now().duration_since(started) > timeout {
return Err(Error::TimeoutError);
}
}
for (i, mut chunk) in res.chunks.into_iter().enumerate() { for (i, mut chunk) in res.chunks.into_iter().enumerate() {
// The comments for `read_rows_response::CellChunk` provide essential details for // The comments for `read_rows_response::CellChunk` provide essential details for
// understanding how the below decoding works... // understanding how the below decoding works...
@ -361,7 +390,7 @@ impl BigTable {
.await? .await?
.into_inner(); .into_inner();
let rows = Self::decode_read_rows_response(response).await?; let rows = self.decode_read_rows_response(response).await?;
Ok(rows.into_iter().map(|r| r.0).collect()) Ok(rows.into_iter().map(|r| r.0).collect())
} }
@ -410,7 +439,7 @@ impl BigTable {
.await? .await?
.into_inner(); .into_inner();
Self::decode_read_rows_response(response).await self.decode_read_rows_response(response).await
} }
/// Get latest data from a single row of `table`, if that row exists. Returns an error if that /// Get latest data from a single row of `table`, if that row exists. Returns an error if that
@ -443,7 +472,7 @@ impl BigTable {
.await? .await?
.into_inner(); .into_inner();
let rows = Self::decode_read_rows_response(response).await?; let rows = self.decode_read_rows_response(response).await?;
rows.into_iter() rows.into_iter()
.next() .next()
.map(|r| r.1) .map(|r| r.1)

View File

@ -279,8 +279,9 @@ pub struct LedgerStorage {
} }
impl LedgerStorage { impl LedgerStorage {
pub async fn new(read_only: bool) -> Result<Self> { pub async fn new(read_only: bool, timeout: Option<std::time::Duration>) -> Result<Self> {
let connection = bigtable::BigTableConnection::new("solana-ledger", read_only).await?; let connection =
bigtable::BigTableConnection::new("solana-ledger", read_only, timeout).await?;
Ok(Self { connection }) Ok(Self { connection })
} }

View File

@ -1287,6 +1287,15 @@ pub fn main() {
.default_value(&default_rpc_threads) .default_value(&default_rpc_threads)
.help("Number of threads to use for servicing RPC requests"), .help("Number of threads to use for servicing RPC requests"),
) )
.arg(
Arg::with_name("rpc_bigtable_timeout")
.long("rpc-bigtable-timeout")
.value_name("SECONDS")
.validator(is_parsable::<u64>)
.takes_value(true)
.default_value("30")
.help("Number of seconds before timing out RPC requests backed by BigTable"),
)
.arg( .arg(
Arg::with_name("rpc_pubsub_enable_vote_subscription") Arg::with_name("rpc_pubsub_enable_vote_subscription")
.long("rpc-pubsub-enable-vote-subscription") .long("rpc-pubsub-enable-vote-subscription")
@ -1561,6 +1570,9 @@ pub fn main() {
u64 u64
), ),
rpc_threads: value_t_or_exit!(matches, "rpc_threads", usize), rpc_threads: value_t_or_exit!(matches, "rpc_threads", usize),
rpc_bigtable_timeout: value_t!(matches, "rpc_bigtable_timeout", u64)
.ok()
.map(Duration::from_secs),
account_indexes: account_indexes.clone(), account_indexes: account_indexes.clone(),
}, },
rpc_addrs: value_t!(matches, "rpc_port", u16).ok().map(|rpc_port| { rpc_addrs: value_t!(matches, "rpc_port", u16).ok().map(|rpc_port| {