Add ability to use a non-default app profile id in bigtable requests (#25968)
* Add ability to use a non-default app profile id in bigtable requests * Only run subcommand once when getting global configs * Remove unneded scoping on option type
This commit is contained in:
parent
54675b2dd6
commit
32a58dd9e0
|
@ -29,6 +29,7 @@ use {
|
|||
path::Path,
|
||||
process::exit,
|
||||
result::Result,
|
||||
str::FromStr,
|
||||
sync::{atomic::AtomicBool, Arc},
|
||||
},
|
||||
};
|
||||
|
@ -343,6 +344,15 @@ impl BigTableSubCommand for App<'_, '_> {
|
|||
.default_value(solana_storage_bigtable::DEFAULT_INSTANCE_NAME)
|
||||
.help("Name of the target Bigtable instance")
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("rpc_bigtable_app_profile_id")
|
||||
.global(true)
|
||||
.long("rpc-bigtable-app-profile-id")
|
||||
.takes_value(true)
|
||||
.value_name("APP_PROFILE_ID")
|
||||
.default_value(solana_storage_bigtable::DEFAULT_APP_PROFILE_ID)
|
||||
.help("Bigtable application profile id to use in requests")
|
||||
)
|
||||
.subcommand(
|
||||
SubCommand::with_name("upload")
|
||||
.about("Upload the ledger to BigTable")
|
||||
|
@ -469,6 +479,14 @@ impl BigTableSubCommand for App<'_, '_> {
|
|||
.value_name("INSTANCE_NAME")
|
||||
.default_value(solana_storage_bigtable::DEFAULT_INSTANCE_NAME)
|
||||
.help("Name of the reference Bigtable instance to compare to")
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("reference_app_profile_id")
|
||||
.long("reference-app-profile-id")
|
||||
.takes_value(true)
|
||||
.value_name("APP_PROFILE_ID")
|
||||
.default_value(solana_storage_bigtable::DEFAULT_APP_PROFILE_ID)
|
||||
.help("Reference Bigtable application profile id to use in requests")
|
||||
),
|
||||
)
|
||||
.subcommand(
|
||||
|
@ -559,12 +577,12 @@ impl BigTableSubCommand for App<'_, '_> {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn bigtable_process_command(ledger_path: &Path, matches: &ArgMatches<'_>) {
|
||||
let runtime = tokio::runtime::Runtime::new().unwrap();
|
||||
|
||||
let verbose = matches.is_present("verbose");
|
||||
let output_format = OutputFormat::from_matches(matches, "output_format", verbose);
|
||||
|
||||
fn get_global_subcommand_arg<T: FromStr>(
|
||||
matches: &ArgMatches<'_>,
|
||||
sub_matches: Option<&clap::ArgMatches>,
|
||||
name: &str,
|
||||
default: &str,
|
||||
) -> T {
|
||||
// this is kinda stupid, but there seems to be a bug in clap when a subcommand
|
||||
// arg is marked both `global(true)` and `default_value("default_value")`.
|
||||
// despite the "global", when the arg is specified on the subcommand, its value
|
||||
|
@ -574,17 +592,37 @@ pub fn bigtable_process_command(ledger_path: &Path, matches: &ArgMatches<'_>) {
|
|||
// again resulting in the default value. the arg having declared a
|
||||
// `default_value()` obviates `is_present(...)` tests since they will always
|
||||
// return true. so we consede and compare against the expected default. :/
|
||||
let (subcommand, sub_matches) = matches.subcommand();
|
||||
let on_command = matches
|
||||
.value_of("rpc_bigtable_instance_name")
|
||||
.map(|v| v != solana_storage_bigtable::DEFAULT_INSTANCE_NAME)
|
||||
.value_of(name)
|
||||
.map(|v| v != default)
|
||||
.unwrap_or(false);
|
||||
let instance_name = if on_command {
|
||||
value_t_or_exit!(matches, "rpc_bigtable_instance_name", String)
|
||||
if on_command {
|
||||
value_t_or_exit!(matches, name, T)
|
||||
} else {
|
||||
let sub_matches = sub_matches.as_ref().unwrap();
|
||||
value_t_or_exit!(sub_matches, "rpc_bigtable_instance_name", String)
|
||||
};
|
||||
value_t_or_exit!(sub_matches, name, T)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn bigtable_process_command(ledger_path: &Path, matches: &ArgMatches<'_>) {
|
||||
let runtime = tokio::runtime::Runtime::new().unwrap();
|
||||
|
||||
let verbose = matches.is_present("verbose");
|
||||
let output_format = OutputFormat::from_matches(matches, "output_format", verbose);
|
||||
|
||||
let (subcommand, sub_matches) = matches.subcommand();
|
||||
let instance_name = get_global_subcommand_arg(
|
||||
matches,
|
||||
sub_matches,
|
||||
"rpc_bigtable_instance_name",
|
||||
solana_storage_bigtable::DEFAULT_INSTANCE_NAME,
|
||||
);
|
||||
let app_profile_id = get_global_subcommand_arg(
|
||||
matches,
|
||||
sub_matches,
|
||||
"rpc_bigtable_app_profile_id",
|
||||
solana_storage_bigtable::DEFAULT_APP_PROFILE_ID,
|
||||
);
|
||||
|
||||
let future = match (subcommand, sub_matches) {
|
||||
("upload", Some(arg_matches)) => {
|
||||
|
@ -599,6 +637,7 @@ pub fn bigtable_process_command(ledger_path: &Path, matches: &ArgMatches<'_>) {
|
|||
let config = solana_storage_bigtable::LedgerStorageConfig {
|
||||
read_only: false,
|
||||
instance_name,
|
||||
app_profile_id,
|
||||
..solana_storage_bigtable::LedgerStorageConfig::default()
|
||||
};
|
||||
runtime.block_on(upload(
|
||||
|
@ -614,6 +653,7 @@ pub fn bigtable_process_command(ledger_path: &Path, matches: &ArgMatches<'_>) {
|
|||
let config = solana_storage_bigtable::LedgerStorageConfig {
|
||||
read_only: !arg_matches.is_present("force"),
|
||||
instance_name,
|
||||
app_profile_id,
|
||||
..solana_storage_bigtable::LedgerStorageConfig::default()
|
||||
};
|
||||
runtime.block_on(delete_slots(slots, config))
|
||||
|
@ -622,6 +662,7 @@ pub fn bigtable_process_command(ledger_path: &Path, matches: &ArgMatches<'_>) {
|
|||
let config = solana_storage_bigtable::LedgerStorageConfig {
|
||||
read_only: true,
|
||||
instance_name,
|
||||
app_profile_id,
|
||||
..solana_storage_bigtable::LedgerStorageConfig::default()
|
||||
};
|
||||
runtime.block_on(first_available_block(config))
|
||||
|
@ -631,6 +672,7 @@ pub fn bigtable_process_command(ledger_path: &Path, matches: &ArgMatches<'_>) {
|
|||
let config = solana_storage_bigtable::LedgerStorageConfig {
|
||||
read_only: false,
|
||||
instance_name,
|
||||
app_profile_id,
|
||||
..solana_storage_bigtable::LedgerStorageConfig::default()
|
||||
};
|
||||
runtime.block_on(block(slot, output_format, config))
|
||||
|
@ -641,6 +683,7 @@ pub fn bigtable_process_command(ledger_path: &Path, matches: &ArgMatches<'_>) {
|
|||
let config = solana_storage_bigtable::LedgerStorageConfig {
|
||||
read_only: false,
|
||||
instance_name,
|
||||
app_profile_id,
|
||||
..solana_storage_bigtable::LedgerStorageConfig::default()
|
||||
};
|
||||
|
||||
|
@ -652,6 +695,7 @@ pub fn bigtable_process_command(ledger_path: &Path, matches: &ArgMatches<'_>) {
|
|||
let config = solana_storage_bigtable::LedgerStorageConfig {
|
||||
read_only: false,
|
||||
instance_name,
|
||||
app_profile_id,
|
||||
..solana_storage_bigtable::LedgerStorageConfig::default()
|
||||
};
|
||||
|
||||
|
@ -663,10 +707,13 @@ pub fn bigtable_process_command(ledger_path: &Path, matches: &ArgMatches<'_>) {
|
|||
|
||||
let ref_instance_name =
|
||||
value_t_or_exit!(arg_matches, "reference_instance_name", String);
|
||||
let ref_app_profile_id =
|
||||
value_t_or_exit!(arg_matches, "reference_app_profile_id", String);
|
||||
let ref_config = solana_storage_bigtable::LedgerStorageConfig {
|
||||
read_only: false,
|
||||
credential_type: CredentialType::Filepath(credential_path),
|
||||
instance_name: ref_instance_name,
|
||||
app_profile_id: ref_app_profile_id,
|
||||
..solana_storage_bigtable::LedgerStorageConfig::default()
|
||||
};
|
||||
|
||||
|
@ -681,6 +728,7 @@ pub fn bigtable_process_command(ledger_path: &Path, matches: &ArgMatches<'_>) {
|
|||
let config = solana_storage_bigtable::LedgerStorageConfig {
|
||||
read_only: false,
|
||||
instance_name,
|
||||
app_profile_id,
|
||||
..solana_storage_bigtable::LedgerStorageConfig::default()
|
||||
};
|
||||
|
||||
|
@ -700,6 +748,7 @@ pub fn bigtable_process_command(ledger_path: &Path, matches: &ArgMatches<'_>) {
|
|||
let config = solana_storage_bigtable::LedgerStorageConfig {
|
||||
read_only: true,
|
||||
instance_name,
|
||||
app_profile_id,
|
||||
..solana_storage_bigtable::LedgerStorageConfig::default()
|
||||
};
|
||||
|
||||
|
|
|
@ -174,15 +174,18 @@ impl JsonRpcConfig {
|
|||
pub struct RpcBigtableConfig {
|
||||
pub enable_bigtable_ledger_upload: bool,
|
||||
pub bigtable_instance_name: String,
|
||||
pub bigtable_app_profile_id: String,
|
||||
pub timeout: Option<Duration>,
|
||||
}
|
||||
|
||||
impl Default for RpcBigtableConfig {
|
||||
fn default() -> Self {
|
||||
let bigtable_instance_name = solana_storage_bigtable::DEFAULT_INSTANCE_NAME.to_string();
|
||||
let bigtable_app_profile_id = solana_storage_bigtable::DEFAULT_APP_PROFILE_ID.to_string();
|
||||
Self {
|
||||
enable_bigtable_ledger_upload: false,
|
||||
bigtable_instance_name,
|
||||
bigtable_app_profile_id,
|
||||
timeout: None,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -395,6 +395,7 @@ impl JsonRpcService {
|
|||
if let Some(RpcBigtableConfig {
|
||||
enable_bigtable_ledger_upload,
|
||||
ref bigtable_instance_name,
|
||||
ref bigtable_app_profile_id,
|
||||
timeout,
|
||||
}) = config.rpc_bigtable_config
|
||||
{
|
||||
|
@ -403,6 +404,7 @@ impl JsonRpcService {
|
|||
timeout,
|
||||
credential_type: CredentialType::Filepath(None),
|
||||
instance_name: bigtable_instance_name.clone(),
|
||||
app_profile_id: bigtable_app_profile_id.clone(),
|
||||
};
|
||||
runtime
|
||||
.block_on(solana_storage_bigtable::LedgerStorage::new_with_config(
|
||||
|
|
|
@ -109,6 +109,7 @@ pub struct BigTableConnection {
|
|||
access_token: Option<AccessToken>,
|
||||
channel: tonic::transport::Channel,
|
||||
table_prefix: String,
|
||||
app_profile_id: String,
|
||||
timeout: Option<Duration>,
|
||||
}
|
||||
|
||||
|
@ -123,6 +124,7 @@ impl BigTableConnection {
|
|||
///
|
||||
pub async fn new(
|
||||
instance_name: &str,
|
||||
app_profile_id: &str,
|
||||
read_only: bool,
|
||||
timeout: Option<Duration>,
|
||||
credential_type: CredentialType,
|
||||
|
@ -137,6 +139,7 @@ impl BigTableConnection {
|
|||
.map_err(|err| Error::InvalidUri(endpoint, err.to_string()))?
|
||||
.connect_lazy(),
|
||||
table_prefix: format!("projects/emulator/instances/{}/tables/", instance_name),
|
||||
app_profile_id: app_profile_id.to_string(),
|
||||
timeout,
|
||||
})
|
||||
}
|
||||
|
@ -181,6 +184,7 @@ impl BigTableConnection {
|
|||
access_token: Some(access_token),
|
||||
channel: endpoint.connect_lazy(),
|
||||
table_prefix,
|
||||
app_profile_id: app_profile_id.to_string(),
|
||||
timeout,
|
||||
})
|
||||
}
|
||||
|
@ -214,6 +218,7 @@ impl BigTableConnection {
|
|||
access_token: self.access_token.clone(),
|
||||
client,
|
||||
table_prefix: self.table_prefix.clone(),
|
||||
app_profile_id: self.app_profile_id.clone(),
|
||||
timeout: self.timeout,
|
||||
}
|
||||
}
|
||||
|
@ -276,6 +281,7 @@ pub struct BigTable<F: FnMut(Request<()>) -> InterceptedRequestResult> {
|
|||
access_token: Option<AccessToken>,
|
||||
client: bigtable_client::BigtableClient<InterceptedService<tonic::transport::Channel, F>>,
|
||||
table_prefix: String,
|
||||
app_profile_id: String,
|
||||
timeout: Option<Duration>,
|
||||
}
|
||||
|
||||
|
@ -387,6 +393,7 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
|
|||
.client
|
||||
.read_rows(ReadRowsRequest {
|
||||
table_name: format!("{}{}", self.table_prefix, table_name),
|
||||
app_profile_id: self.app_profile_id.clone(),
|
||||
rows_limit,
|
||||
rows: Some(RowSet {
|
||||
row_keys: vec![],
|
||||
|
@ -416,7 +423,6 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
|
|||
],
|
||||
})),
|
||||
}),
|
||||
..ReadRowsRequest::default()
|
||||
})
|
||||
.await?
|
||||
.into_inner();
|
||||
|
@ -450,6 +456,7 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
|
|||
.client
|
||||
.read_rows(ReadRowsRequest {
|
||||
table_name: format!("{}{}", self.table_prefix, table_name),
|
||||
app_profile_id: self.app_profile_id.clone(),
|
||||
rows_limit,
|
||||
rows: Some(RowSet {
|
||||
row_keys: vec![],
|
||||
|
@ -465,7 +472,6 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
|
|||
// Only return the latest version of each cell
|
||||
filter: Some(row_filter::Filter::CellsPerColumnLimitFilter(1)),
|
||||
}),
|
||||
..ReadRowsRequest::default()
|
||||
})
|
||||
.await?
|
||||
.into_inner();
|
||||
|
@ -485,6 +491,7 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
|
|||
.client
|
||||
.read_rows(ReadRowsRequest {
|
||||
table_name: format!("{}{}", self.table_prefix, table_name),
|
||||
app_profile_id: self.app_profile_id.clone(),
|
||||
rows_limit: 0, // return all keys
|
||||
rows: Some(RowSet {
|
||||
row_keys: row_keys
|
||||
|
@ -497,7 +504,6 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
|
|||
// Only return the latest version of each cell
|
||||
filter: Some(row_filter::Filter::CellsPerColumnLimitFilter(1)),
|
||||
}),
|
||||
..ReadRowsRequest::default()
|
||||
})
|
||||
.await?
|
||||
.into_inner();
|
||||
|
@ -521,6 +527,7 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
|
|||
.client
|
||||
.read_rows(ReadRowsRequest {
|
||||
table_name: format!("{}{}", self.table_prefix, table_name),
|
||||
app_profile_id: self.app_profile_id.clone(),
|
||||
rows_limit: 1,
|
||||
rows: Some(RowSet {
|
||||
row_keys: vec![row_key.into_bytes()],
|
||||
|
@ -530,7 +537,6 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
|
|||
// Only return the latest version of each cell
|
||||
filter: Some(row_filter::Filter::CellsPerColumnLimitFilter(1)),
|
||||
}),
|
||||
..ReadRowsRequest::default()
|
||||
})
|
||||
.await?
|
||||
.into_inner();
|
||||
|
@ -562,8 +568,8 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
|
|||
.client
|
||||
.mutate_rows(MutateRowsRequest {
|
||||
table_name: format!("{}{}", self.table_prefix, table_name),
|
||||
app_profile_id: self.app_profile_id.clone(),
|
||||
entries,
|
||||
..MutateRowsRequest::default()
|
||||
})
|
||||
.await?
|
||||
.into_inner();
|
||||
|
@ -616,8 +622,8 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
|
|||
.client
|
||||
.mutate_rows(MutateRowsRequest {
|
||||
table_name: format!("{}{}", self.table_prefix, table_name),
|
||||
app_profile_id: self.app_profile_id.clone(),
|
||||
entries,
|
||||
..MutateRowsRequest::default()
|
||||
})
|
||||
.await?
|
||||
.into_inner();
|
||||
|
|
|
@ -368,6 +368,7 @@ impl From<LegacyTransactionByAddrInfo> for TransactionByAddrInfo {
|
|||
}
|
||||
|
||||
pub const DEFAULT_INSTANCE_NAME: &str = "solana-ledger";
|
||||
pub const DEFAULT_APP_PROFILE_ID: &str = "default";
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum CredentialType {
|
||||
|
@ -381,6 +382,7 @@ pub struct LedgerStorageConfig {
|
|||
pub timeout: Option<std::time::Duration>,
|
||||
pub credential_type: CredentialType,
|
||||
pub instance_name: String,
|
||||
pub app_profile_id: String,
|
||||
}
|
||||
|
||||
impl Default for LedgerStorageConfig {
|
||||
|
@ -390,6 +392,7 @@ impl Default for LedgerStorageConfig {
|
|||
timeout: None,
|
||||
credential_type: CredentialType::Filepath(None),
|
||||
instance_name: DEFAULT_INSTANCE_NAME.to_string(),
|
||||
app_profile_id: DEFAULT_APP_PROFILE_ID.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -419,10 +422,12 @@ impl LedgerStorage {
|
|||
read_only,
|
||||
timeout,
|
||||
instance_name,
|
||||
app_profile_id,
|
||||
credential_type,
|
||||
} = config;
|
||||
let connection = bigtable::BigTableConnection::new(
|
||||
instance_name.as_str(),
|
||||
app_profile_id.as_str(),
|
||||
read_only,
|
||||
timeout,
|
||||
credential_type,
|
||||
|
|
|
@ -175,6 +175,15 @@ fn main() {
|
|||
.default_value("solana-ledger")
|
||||
.help("Name of BigTable instance to target"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("rpc_bigtable_app_profile_id")
|
||||
.long("rpc-bigtable-app-profile-id")
|
||||
.value_name("APP_PROFILE_ID")
|
||||
.takes_value(true)
|
||||
.hidden(true)
|
||||
.default_value(solana_storage_bigtable::DEFAULT_APP_PROFILE_ID)
|
||||
.help("Application profile id to use in Bigtable requests")
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("rpc_pubsub_enable_vote_subscription")
|
||||
.long("rpc-pubsub-enable-vote-subscription")
|
||||
|
@ -670,6 +679,11 @@ fn main() {
|
|||
Some(RpcBigtableConfig {
|
||||
enable_bigtable_ledger_upload: false,
|
||||
bigtable_instance_name: value_t_or_exit!(matches, "rpc_bigtable_instance", String),
|
||||
bigtable_app_profile_id: value_t_or_exit!(
|
||||
matches,
|
||||
"rpc_bigtable_app_profile_id",
|
||||
String
|
||||
),
|
||||
timeout: None,
|
||||
})
|
||||
} else {
|
||||
|
|
|
@ -1278,6 +1278,14 @@ pub fn main() {
|
|||
.default_value(solana_storage_bigtable::DEFAULT_INSTANCE_NAME)
|
||||
.help("Name of the Bigtable instance to upload to")
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("rpc_bigtable_app_profile_id")
|
||||
.long("rpc-bigtable-app-profile-id")
|
||||
.takes_value(true)
|
||||
.value_name("APP_PROFILE_ID")
|
||||
.default_value(solana_storage_bigtable::DEFAULT_APP_PROFILE_ID)
|
||||
.help("Bigtable application profile id to use in requests")
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("rpc_pubsub_worker_threads")
|
||||
.long("rpc-pubsub-worker-threads")
|
||||
|
@ -2395,6 +2403,11 @@ pub fn main() {
|
|||
Some(RpcBigtableConfig {
|
||||
enable_bigtable_ledger_upload: matches.is_present("enable_bigtable_ledger_upload"),
|
||||
bigtable_instance_name: value_t_or_exit!(matches, "rpc_bigtable_instance_name", String),
|
||||
bigtable_app_profile_id: value_t_or_exit!(
|
||||
matches,
|
||||
"rpc_bigtable_app_profile_id",
|
||||
String
|
||||
),
|
||||
timeout: value_t!(matches, "rpc_bigtable_timeout", u64)
|
||||
.ok()
|
||||
.map(Duration::from_secs),
|
||||
|
|
Loading…
Reference in New Issue