[ledger-tool]compare_blocks (#22229)
* 1.made load_credentials accept credential path as a parameter. 2.partial implement bigtable comparasion function * finding missing blocks in bigtables in a specified range * refactor compare-blocks,add unit test for missing_blocks and fmt * compare-block fix last block bug * refactor compare-block and improve wording * Update ledger-tool/src/bigtable.rs Co-authored-by: Tyera Eulberg <teulberg@gmail.com> * update compare-block command-line description * style:improve wording/naming/code style Co-authored-by: Tyera Eulberg <teulberg@gmail.com>
This commit is contained in:
parent
0673dc5375
commit
d9220652ad
|
@ -4,6 +4,8 @@ use {
|
||||||
clap::{
|
clap::{
|
||||||
value_t, value_t_or_exit, values_t_or_exit, App, AppSettings, Arg, ArgMatches, SubCommand,
|
value_t, value_t_or_exit, values_t_or_exit, App, AppSettings, Arg, ArgMatches, SubCommand,
|
||||||
},
|
},
|
||||||
|
log::info,
|
||||||
|
serde_json::json,
|
||||||
solana_clap_utils::{
|
solana_clap_utils::{
|
||||||
input_parsers::pubkey_of,
|
input_parsers::pubkey_of,
|
||||||
input_validators::{is_slot, is_valid_pubkey},
|
input_validators::{is_slot, is_valid_pubkey},
|
||||||
|
@ -16,6 +18,7 @@ use {
|
||||||
solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature},
|
solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature},
|
||||||
solana_transaction_status::{ConfirmedBlock, Encodable, UiTransactionEncoding},
|
solana_transaction_status::{ConfirmedBlock, Encodable, UiTransactionEncoding},
|
||||||
std::{
|
std::{
|
||||||
|
collections::HashSet,
|
||||||
path::Path,
|
path::Path,
|
||||||
process::exit,
|
process::exit,
|
||||||
result::Result,
|
result::Result,
|
||||||
|
@ -30,7 +33,7 @@ async fn upload(
|
||||||
allow_missing_metadata: bool,
|
allow_missing_metadata: bool,
|
||||||
force_reupload: bool,
|
force_reupload: bool,
|
||||||
) -> Result<(), Box<dyn std::error::Error>> {
|
) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
let bigtable = solana_storage_bigtable::LedgerStorage::new(false, None)
|
let bigtable = solana_storage_bigtable::LedgerStorage::new(false, None, None)
|
||||||
.await
|
.await
|
||||||
.map_err(|err| format!("Failed to connect to storage: {:?}", err))?;
|
.map_err(|err| format!("Failed to connect to storage: {:?}", err))?;
|
||||||
|
|
||||||
|
@ -48,7 +51,7 @@ async fn upload(
|
||||||
|
|
||||||
async fn delete_slots(slots: Vec<Slot>, dry_run: bool) -> Result<(), Box<dyn std::error::Error>> {
|
async fn delete_slots(slots: Vec<Slot>, dry_run: bool) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
let read_only = dry_run;
|
let read_only = dry_run;
|
||||||
let bigtable = solana_storage_bigtable::LedgerStorage::new(read_only, None)
|
let bigtable = solana_storage_bigtable::LedgerStorage::new(read_only, None, None)
|
||||||
.await
|
.await
|
||||||
.map_err(|err| format!("Failed to connect to storage: {:?}", err))?;
|
.map_err(|err| format!("Failed to connect to storage: {:?}", err))?;
|
||||||
|
|
||||||
|
@ -56,7 +59,7 @@ async fn delete_slots(slots: Vec<Slot>, dry_run: bool) -> Result<(), Box<dyn std
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn first_available_block() -> Result<(), Box<dyn std::error::Error>> {
|
async fn first_available_block() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
let bigtable = solana_storage_bigtable::LedgerStorage::new(true, None).await?;
|
let bigtable = solana_storage_bigtable::LedgerStorage::new(true, None, None).await?;
|
||||||
match bigtable.get_first_available_block().await? {
|
match bigtable.get_first_available_block().await? {
|
||||||
Some(block) => println!("{}", block),
|
Some(block) => println!("{}", block),
|
||||||
None => println!("No blocks available"),
|
None => println!("No blocks available"),
|
||||||
|
@ -66,7 +69,7 @@ async fn first_available_block() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn block(slot: Slot, output_format: OutputFormat) -> Result<(), Box<dyn std::error::Error>> {
|
async fn block(slot: Slot, output_format: OutputFormat) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
let bigtable = solana_storage_bigtable::LedgerStorage::new(false, None)
|
let bigtable = solana_storage_bigtable::LedgerStorage::new(false, None, None)
|
||||||
.await
|
.await
|
||||||
.map_err(|err| format!("Failed to connect to storage: {:?}", err))?;
|
.map_err(|err| format!("Failed to connect to storage: {:?}", err))?;
|
||||||
|
|
||||||
|
@ -81,7 +84,7 @@ async fn block(slot: Slot, output_format: OutputFormat) -> Result<(), Box<dyn st
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn blocks(starting_slot: Slot, limit: usize) -> Result<(), Box<dyn std::error::Error>> {
|
async fn blocks(starting_slot: Slot, limit: usize) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
let bigtable = solana_storage_bigtable::LedgerStorage::new(false, None)
|
let bigtable = solana_storage_bigtable::LedgerStorage::new(false, None, None)
|
||||||
.await
|
.await
|
||||||
.map_err(|err| format!("Failed to connect to storage: {:?}", err))?;
|
.map_err(|err| format!("Failed to connect to storage: {:?}", err))?;
|
||||||
|
|
||||||
|
@ -92,12 +95,55 @@ async fn blocks(starting_slot: Slot, limit: usize) -> Result<(), Box<dyn std::er
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn compare_blocks(
|
||||||
|
starting_slot: Slot,
|
||||||
|
limit: usize,
|
||||||
|
credential_path: String,
|
||||||
|
) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
assert!(!credential_path.is_empty());
|
||||||
|
|
||||||
|
let owned_bigtable = solana_storage_bigtable::LedgerStorage::new(false, None, None)
|
||||||
|
.await
|
||||||
|
.map_err(|err| format!("failed to connect to owned bigtable: {:?}", err))?;
|
||||||
|
let owned_bigtable_slots = owned_bigtable
|
||||||
|
.get_confirmed_blocks(starting_slot, limit)
|
||||||
|
.await?;
|
||||||
|
info!(
|
||||||
|
"owned bigtable {} blocks found ",
|
||||||
|
owned_bigtable_slots.len()
|
||||||
|
);
|
||||||
|
let reference_bigtable =
|
||||||
|
solana_storage_bigtable::LedgerStorage::new(false, None, Some(credential_path))
|
||||||
|
.await
|
||||||
|
.map_err(|err| format!("failed to connect to reference bigtable: {:?}", err))?;
|
||||||
|
|
||||||
|
let reference_bigtable_slots = reference_bigtable
|
||||||
|
.get_confirmed_blocks(starting_slot, limit)
|
||||||
|
.await?;
|
||||||
|
info!(
|
||||||
|
"reference bigtable {} blocks found ",
|
||||||
|
reference_bigtable_slots.len(),
|
||||||
|
);
|
||||||
|
|
||||||
|
println!(
|
||||||
|
"{}",
|
||||||
|
json!({
|
||||||
|
"num_reference_slots": json!(reference_bigtable_slots.len()),
|
||||||
|
"num_owned_slots": json!(owned_bigtable_slots.len()),
|
||||||
|
"reference_last_block": json!(reference_bigtable_slots.len().checked_sub(1).map(|i| reference_bigtable_slots[i])),
|
||||||
|
"missing_blocks": json!(missing_blocks(&reference_bigtable_slots, &owned_bigtable_slots)),
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
async fn confirm(
|
async fn confirm(
|
||||||
signature: &Signature,
|
signature: &Signature,
|
||||||
verbose: bool,
|
verbose: bool,
|
||||||
output_format: OutputFormat,
|
output_format: OutputFormat,
|
||||||
) -> Result<(), Box<dyn std::error::Error>> {
|
) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
let bigtable = solana_storage_bigtable::LedgerStorage::new(false, None)
|
let bigtable = solana_storage_bigtable::LedgerStorage::new(false, None, None)
|
||||||
.await
|
.await
|
||||||
.map_err(|err| format!("Failed to connect to storage: {:?}", err))?;
|
.map_err(|err| format!("Failed to connect to storage: {:?}", err))?;
|
||||||
|
|
||||||
|
@ -146,7 +192,7 @@ pub async fn transaction_history(
|
||||||
show_transactions: bool,
|
show_transactions: bool,
|
||||||
query_chunk_size: usize,
|
query_chunk_size: usize,
|
||||||
) -> Result<(), Box<dyn std::error::Error>> {
|
) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
let bigtable = solana_storage_bigtable::LedgerStorage::new(true, None).await?;
|
let bigtable = solana_storage_bigtable::LedgerStorage::new(true, None, None).await?;
|
||||||
|
|
||||||
let mut loaded_block: Option<(Slot, ConfirmedBlock)> = None;
|
let mut loaded_block: Option<(Slot, ConfirmedBlock)> = None;
|
||||||
while limit > 0 {
|
while limit > 0 {
|
||||||
|
@ -329,6 +375,39 @@ impl BigTableSubCommand for App<'_, '_> {
|
||||||
.help("Maximum number of slots to return"),
|
.help("Maximum number of slots to return"),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
.subcommand(
|
||||||
|
SubCommand::with_name("compare-blocks")
|
||||||
|
.about("Find the missing confirmed blocks of an owned bigtable for a given range \
|
||||||
|
by comparing to a reference bigtable")
|
||||||
|
.arg(
|
||||||
|
Arg::with_name("starting_slot")
|
||||||
|
.validator(is_slot)
|
||||||
|
.value_name("SLOT")
|
||||||
|
.takes_value(true)
|
||||||
|
.index(1)
|
||||||
|
.required(true)
|
||||||
|
.default_value("0")
|
||||||
|
.help("Start listing at this slot"),
|
||||||
|
)
|
||||||
|
.arg(
|
||||||
|
Arg::with_name("limit")
|
||||||
|
.validator(is_slot)
|
||||||
|
.value_name("LIMIT")
|
||||||
|
.takes_value(true)
|
||||||
|
.index(2)
|
||||||
|
.required(true)
|
||||||
|
.default_value("1000")
|
||||||
|
.help("Maximum number of slots to check"),
|
||||||
|
).arg(
|
||||||
|
Arg::with_name("reference_credential")
|
||||||
|
.long("reference-credential")
|
||||||
|
.short("c")
|
||||||
|
.value_name("REFERENCE_CREDENTIAL_FILEPATH")
|
||||||
|
.takes_value(true)
|
||||||
|
.required(true)
|
||||||
|
.help("File path for a credential to a reference bigtable"),
|
||||||
|
),
|
||||||
|
)
|
||||||
.subcommand(
|
.subcommand(
|
||||||
SubCommand::with_name("block")
|
SubCommand::with_name("block")
|
||||||
.about("Get a confirmed block")
|
.about("Get a confirmed block")
|
||||||
|
@ -459,6 +538,18 @@ pub fn bigtable_process_command(ledger_path: &Path, matches: &ArgMatches<'_>) {
|
||||||
|
|
||||||
runtime.block_on(blocks(starting_slot, limit))
|
runtime.block_on(blocks(starting_slot, limit))
|
||||||
}
|
}
|
||||||
|
("compare-blocks", Some(arg_matches)) => {
|
||||||
|
let starting_slot = value_t_or_exit!(arg_matches, "starting_slot", Slot);
|
||||||
|
let limit = value_t_or_exit!(arg_matches, "limit", usize);
|
||||||
|
let reference_credential_filepath =
|
||||||
|
value_t_or_exit!(arg_matches, "reference_credential", String);
|
||||||
|
|
||||||
|
runtime.block_on(compare_blocks(
|
||||||
|
starting_slot,
|
||||||
|
limit,
|
||||||
|
reference_credential_filepath,
|
||||||
|
))
|
||||||
|
}
|
||||||
("confirm", Some(arg_matches)) => {
|
("confirm", Some(arg_matches)) => {
|
||||||
let signature = arg_matches
|
let signature = arg_matches
|
||||||
.value_of("signature")
|
.value_of("signature")
|
||||||
|
@ -498,3 +589,54 @@ pub fn bigtable_process_command(ledger_path: &Path, matches: &ArgMatches<'_>) {
|
||||||
exit(1);
|
exit(1);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn missing_blocks(reference: &[Slot], owned: &[Slot]) -> Vec<Slot> {
|
||||||
|
if owned.is_empty() && !reference.is_empty() {
|
||||||
|
return reference.to_owned();
|
||||||
|
} else if owned.is_empty() {
|
||||||
|
return vec![];
|
||||||
|
}
|
||||||
|
|
||||||
|
let owned_hashset: HashSet<_> = owned.iter().collect();
|
||||||
|
let mut missing_slots = vec![];
|
||||||
|
for slot in reference {
|
||||||
|
if !owned_hashset.contains(slot) {
|
||||||
|
missing_slots.push(slot.to_owned());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
missing_slots
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_missing_blocks() {
|
||||||
|
let reference_slots = vec![0, 37, 38, 39, 40, 41, 42, 43, 44, 45];
|
||||||
|
let owned_slots = vec![0, 38, 39, 40, 43, 44, 45, 46, 47];
|
||||||
|
let owned_slots_leftshift = vec![0, 25, 26, 27, 28, 29, 30, 31, 32];
|
||||||
|
let owned_slots_rightshift = vec![0, 44, 46, 47, 48, 49, 50, 51, 52, 53, 54];
|
||||||
|
let missing_slots = vec![37, 41, 42];
|
||||||
|
let missing_slots_leftshift = vec![37, 38, 39, 40, 41, 42, 43, 44, 45];
|
||||||
|
let missing_slots_rightshift = vec![37, 38, 39, 40, 41, 42, 43, 45];
|
||||||
|
assert!(missing_blocks(&[], &[]).is_empty());
|
||||||
|
assert!(missing_blocks(&[], &owned_slots).is_empty());
|
||||||
|
assert_eq!(
|
||||||
|
missing_blocks(&reference_slots, &[]),
|
||||||
|
reference_slots.to_owned()
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
missing_blocks(&reference_slots, &owned_slots),
|
||||||
|
missing_slots
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
missing_blocks(&reference_slots, &owned_slots_leftshift),
|
||||||
|
missing_slots_leftshift
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
missing_blocks(&reference_slots, &owned_slots_rightshift),
|
||||||
|
missing_slots_rightshift
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -348,6 +348,7 @@ impl JsonRpcService {
|
||||||
.block_on(solana_storage_bigtable::LedgerStorage::new(
|
.block_on(solana_storage_bigtable::LedgerStorage::new(
|
||||||
!config.enable_bigtable_ledger_upload,
|
!config.enable_bigtable_ledger_upload,
|
||||||
config.rpc_bigtable_timeout,
|
config.rpc_bigtable_timeout,
|
||||||
|
None,
|
||||||
))
|
))
|
||||||
.map(|bigtable_ledger_storage| {
|
.map(|bigtable_ledger_storage| {
|
||||||
info!("BigTable ledger storage initialized");
|
info!("BigTable ledger storage initialized");
|
||||||
|
|
|
@ -16,17 +16,15 @@ use {
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
fn load_credentials() -> Result<Credentials, String> {
|
fn load_credentials(filepath: Option<String>) -> Result<Credentials, String> {
|
||||||
// Use standard GOOGLE_APPLICATION_CREDENTIALS environment variable
|
let path = match filepath {
|
||||||
let credentials_file = std::env::var("GOOGLE_APPLICATION_CREDENTIALS")
|
Some(f) => f,
|
||||||
.map_err(|_| "GOOGLE_APPLICATION_CREDENTIALS environment variable not found".to_string())?;
|
None => std::env::var("GOOGLE_APPLICATION_CREDENTIALS").map_err(|_| {
|
||||||
|
"GOOGLE_APPLICATION_CREDENTIALS environment variable not found".to_string()
|
||||||
Credentials::from_file(&credentials_file).map_err(|err| {
|
})?,
|
||||||
format!(
|
};
|
||||||
"Failed to read GCP credentials from {}: {}",
|
Credentials::from_file(&path)
|
||||||
credentials_file, err
|
.map_err(|err| format!("Failed to read GCP credentials from {}: {}", path, err))
|
||||||
)
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
|
@ -38,8 +36,8 @@ pub struct AccessToken {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AccessToken {
|
impl AccessToken {
|
||||||
pub async fn new(scope: Scope) -> Result<Self, String> {
|
pub async fn new(scope: Scope, credential_filepath: Option<String>) -> Result<Self, String> {
|
||||||
let credentials = load_credentials()?;
|
let credentials = load_credentials(credential_filepath)?;
|
||||||
if let Err(err) = credentials.rsa_key() {
|
if let Err(err) = credentials.rsa_key() {
|
||||||
Err(format!("Invalid rsa key: {}", err))
|
Err(format!("Invalid rsa key: {}", err))
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -125,6 +125,7 @@ impl BigTableConnection {
|
||||||
instance_name: &str,
|
instance_name: &str,
|
||||||
read_only: bool,
|
read_only: bool,
|
||||||
timeout: Option<Duration>,
|
timeout: Option<Duration>,
|
||||||
|
credential_path: Option<String>,
|
||||||
) -> Result<Self> {
|
) -> Result<Self> {
|
||||||
match std::env::var("BIGTABLE_EMULATOR_HOST") {
|
match std::env::var("BIGTABLE_EMULATOR_HOST") {
|
||||||
Ok(endpoint) => {
|
Ok(endpoint) => {
|
||||||
|
@ -141,11 +142,14 @@ impl BigTableConnection {
|
||||||
}
|
}
|
||||||
|
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
let access_token = AccessToken::new(if read_only {
|
let access_token = AccessToken::new(
|
||||||
|
if read_only {
|
||||||
Scope::BigTableDataReadOnly
|
Scope::BigTableDataReadOnly
|
||||||
} else {
|
} else {
|
||||||
Scope::BigTableData
|
Scope::BigTableData
|
||||||
})
|
},
|
||||||
|
credential_path,
|
||||||
|
)
|
||||||
.await
|
.await
|
||||||
.map_err(Error::AccessToken)?;
|
.map_err(Error::AccessToken)?;
|
||||||
|
|
||||||
|
|
|
@ -345,9 +345,14 @@ pub struct LedgerStorage {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl LedgerStorage {
|
impl LedgerStorage {
|
||||||
pub async fn new(read_only: bool, timeout: Option<std::time::Duration>) -> Result<Self> {
|
pub async fn new(
|
||||||
|
read_only: bool,
|
||||||
|
timeout: Option<std::time::Duration>,
|
||||||
|
credential_path: Option<String>,
|
||||||
|
) -> Result<Self> {
|
||||||
let connection =
|
let connection =
|
||||||
bigtable::BigTableConnection::new("solana-ledger", read_only, timeout).await?;
|
bigtable::BigTableConnection::new("solana-ledger", read_only, timeout, credential_path)
|
||||||
|
.await?;
|
||||||
Ok(Self { connection })
|
Ok(Self { connection })
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue