From f9049d6ee4e932a582b7825e236468ea37db8952 Mon Sep 17 00:00:00 2001
From: Michael Vines
Date: Mon, 20 Jul 2020 21:06:13 -0700
Subject: [PATCH] Add ledger-tool bigtable subcommands

---
 ledger-tool/Cargo.toml      |   7 +-
 ledger-tool/src/bigtable.rs | 538 ++++++++++++++++++++++++++++++++++++
 ledger-tool/src/main.rs     |   6 +-
 3 files changed, 549 insertions(+), 2 deletions(-)
 create mode 100644 ledger-tool/src/bigtable.rs

diff --git a/ledger-tool/Cargo.toml b/ledger-tool/Cargo.toml
index f809a185d..1a1c576be 100644
--- a/ledger-tool/Cargo.toml
+++ b/ledger-tool/Cargo.toml
@@ -12,22 +12,27 @@ homepage = "https://solana.com/"
 bs58 = "0.3.1"
 bytecount = "0.6.0"
 clap = "2.33.1"
+futures = "0.3.5"
+futures-util = "0.3.5"
 histogram = "*"
 log = { version = "0.4.8" }
+regex = "1"
 serde_json = "1.0.56"
 serde_yaml = "0.8.13"
 solana-clap-utils = { path = "../clap-utils", version = "1.4.0" }
 solana-cli = { path = "../cli", version = "1.4.0" }
 solana-ledger = { path = "../ledger", version = "1.4.0" }
 solana-logger = { path = "../logger", version = "1.4.0" }
+solana-measure = { path = "../measure", version = "1.4.0" }
 solana-runtime = { path = "../runtime", version = "1.4.0" }
 solana-sdk = { path = "../sdk", version = "1.4.0" }
 solana-stake-program = { path = "../programs/stake", version = "1.4.0" }
+solana-storage-bigtable = { path = "../storage-bigtable", version = "1.4.0" }
 solana-transaction-status = { path = "../transaction-status", version = "1.4.0" }
 solana-version = { path = "../version", version = "1.4.0" }
 solana-vote-program = { path = "../programs/vote", version = "1.4.0" }
 tempfile = "3.1.0"
-regex = "1"
+tokio = { version = "0.2.22", features = ["full"] }

 [dev-dependencies]
 assert_cmd = "1.0"
diff --git a/ledger-tool/src/bigtable.rs b/ledger-tool/src/bigtable.rs
new file mode 100644
index 000000000..aa611216b
--- /dev/null
+++ b/ledger-tool/src/bigtable.rs
@@ -0,0 +1,538 @@
+//! The `bigtable` subcommand
+use clap::{value_t, value_t_or_exit, App, AppSettings, Arg, ArgMatches, SubCommand};
+use log::*;
+use solana_clap_utils::{
+    input_parsers::pubkey_of,
+    input_validators::{is_slot, is_valid_pubkey},
+};
+use solana_cli::display::println_transaction;
+use solana_ledger::{blockstore::Blockstore, blockstore_db::AccessType};
+use solana_measure::measure::Measure;
+use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature};
+use solana_transaction_status::UiTransactionEncoding;
+use std::{collections::HashSet, path::Path, process::exit, result::Result, time::Duration};
+use tokio::time::delay_for;
+
+// Attempt to upload this many blocks in parallel
+const NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL: usize = 32;
+
+// Read up to this many blocks from blockstore before blocking on the upload process
+const BLOCK_READ_AHEAD_DEPTH: usize = NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL * 2;
+
+async fn upload(
+    blockstore: Blockstore,
+    starting_slot: Slot,
+    ending_slot: Option<Slot>,
+    allow_missing_metadata: bool,
+) -> Result<(), Box<dyn std::error::Error>> {
+    let mut measure = Measure::start("entire upload");
+
+    let bigtable = solana_storage_bigtable::LedgerStorage::new(false)
+        .await
+        .map_err(|err| format!("Failed to connect to storage: {:?}", err))?;
+
+    info!("Loading ledger slots...");
+    let blockstore_slots: Vec<_> = blockstore
+        .slot_meta_iterator(starting_slot)
+        .map_err(|err| {
+            format!(
+                "Failed to load entries starting from slot {}: {:?}",
+                starting_slot, err
+            )
+        })?
+        .filter_map(|(slot, _slot_meta)| {
+            if let Some(ending_slot) = &ending_slot {
+                if slot > *ending_slot {
+                    return None;
+                }
+            }
+            Some(slot)
+        })
+        .collect();
+
+    if blockstore_slots.is_empty() {
+        info!("Ledger has no slots in the specified range");
+        return Ok(());
+    }
+    info!(
+        "Found {} slots in the range ({}, {})",
+        blockstore_slots.len(),
+        blockstore_slots.first().unwrap(),
+        blockstore_slots.last().unwrap()
+    );
+
+    let mut blockstore_slots_with_no_confirmed_block = HashSet::new();
+
+    // Gather the blocks that are already present in bigtable, by slot
+    let bigtable_slots = {
+        let mut bigtable_slots = vec![];
+        let first_blockstore_slot = *blockstore_slots.first().unwrap();
+        let last_blockstore_slot = *blockstore_slots.last().unwrap();
+        info!(
+            "Loading list of bigtable blocks between slots {} and {}...",
+            first_blockstore_slot, last_blockstore_slot
+        );
+
+        let mut start_slot = *blockstore_slots.first().unwrap();
+        while start_slot <= last_blockstore_slot {
+            let mut next_bigtable_slots = loop {
+                match bigtable.get_confirmed_blocks(start_slot, 1000).await {
+                    Ok(slots) => break slots,
+                    Err(err) => {
+                        error!("get_confirmed_blocks for {} failed: {:?}", start_slot, err);
+                        // Consider exponential backoff...
+                        delay_for(Duration::from_secs(2)).await;
+                    }
+                }
+            };
+            if next_bigtable_slots.is_empty() {
+                break;
+            }
+            bigtable_slots.append(&mut next_bigtable_slots);
+            start_slot = bigtable_slots.last().unwrap() + 1;
+        }
+        bigtable_slots
+            .into_iter()
+            .filter(|slot| *slot <= last_blockstore_slot)
+            .collect::<Vec<_>>()
+    };
+
+    // The blocks that still need to be uploaded is the difference between what's already in the
+    // bigtable and what's in blockstore...
+    let blocks_to_upload = {
+        let blockstore_slots = blockstore_slots.iter().cloned().collect::<HashSet<_>>();
+        let bigtable_slots = bigtable_slots.into_iter().collect::<HashSet<_>>();
+
+        let mut blocks_to_upload = blockstore_slots
+            .difference(&blockstore_slots_with_no_confirmed_block)
+            .cloned()
+            .collect::<HashSet<_>>()
+            .difference(&bigtable_slots)
+            .cloned()
+            .collect::<Vec<_>>();
+        blocks_to_upload.sort();
+        blocks_to_upload
+    };
+
+    if blocks_to_upload.is_empty() {
+        info!("No blocks need to be uploaded to bigtable");
+        return Ok(());
+    }
+    info!(
+        "{} blocks to be uploaded to the bucket in the range ({}, {})",
+        blocks_to_upload.len(),
+        blocks_to_upload.first().unwrap(),
+        blocks_to_upload.last().unwrap()
+    );
+
+    // Load the blocks out of blockstore in a separate thread to allow for concurrent block uploading
+    let (_loader_thread, receiver) = {
+        let (sender, receiver) = std::sync::mpsc::sync_channel(BLOCK_READ_AHEAD_DEPTH);
+        (
+            std::thread::spawn(move || {
+                let mut measure = Measure::start("block loader thread");
+                for slot in &blocks_to_upload {
+                    let _ = match blockstore.get_confirmed_block(
+                        *slot,
+                        Some(solana_transaction_status::UiTransactionEncoding::Binary),
+                    ) {
+                        Ok(confirmed_block) => sender.send((*slot, Some(confirmed_block))),
+                        Err(err) => {
+                            warn!(
+                                "Failed to get load confirmed block from slot {}: {:?}",
+                                slot, err
+                            );
+                            sender.send((*slot, None))
+                        }
+                    };
+                }
+                measure.stop();
+                info!("{} to load {} blocks", measure, blocks_to_upload.len());
+            }),
+            receiver,
+        )
+    };
+
+    let mut failures = 0;
+    use futures::stream::StreamExt;
+
+    let mut stream =
+        tokio::stream::iter(receiver.into_iter()).chunks(NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL);
+
+    while let Some(blocks) = stream.next().await {
+        let mut measure_upload = Measure::start("upload");
+        let mut num_blocks = blocks.len();
+
+        let uploads = blocks.into_iter().filter_map(|(slot, block)| match block {
+            None => {
+                blockstore_slots_with_no_confirmed_block.insert(slot);
+                num_blocks -= 1;
+                None
+            }
+            Some(confirmed_block) => {
+                if confirmed_block
+                    .transactions
+                    .iter()
+                    .any(|transaction| transaction.meta.is_none())
+                {
+                    if allow_missing_metadata {
+                        info!("Transaction metadata missing from slot {}", slot);
+                    } else {
+                        panic!("Transaction metadata missing from slot {}", slot);
+                    }
+                }
+                Some(bigtable.upload_confirmed_block(slot, confirmed_block))
+            }
+        });
+
+        for result in futures::future::join_all(uploads).await {
+            if result.is_err() {
+                error!("upload_confirmed_block() failed: {:?}", result.err());
+                failures += 1;
+            }
+        }
+
+        measure_upload.stop();
+        info!("{} for {} blocks", measure_upload, num_blocks);
+    }
+
+    measure.stop();
+    info!("{}", measure);
+    if failures > 0 {
+        Err(format!("Incomplete upload, {} operations failed", failures).into())
+    } else {
+        Ok(())
+    }
+}
+
+async fn first_available_block() -> Result<(), Box<dyn std::error::Error>> {
+    let bigtable = solana_storage_bigtable::LedgerStorage::new(true).await?;
+    match bigtable.get_first_available_block().await? {
+        Some(block) => println!("{}", block),
+        None => println!("No blocks available"),
+    }
+
+    Ok(())
+}
+
+async fn block(slot: Slot) -> Result<(), Box<dyn std::error::Error>> {
+    let bigtable = solana_storage_bigtable::LedgerStorage::new(false)
+        .await
+        .map_err(|err| format!("Failed to connect to storage: {:?}", err))?;
+
+    let block = bigtable
+        .get_confirmed_block(slot, UiTransactionEncoding::Binary)
+        .await?;
+
+    println!("Slot: {}", slot);
+    println!("Parent Slot: {}", block.parent_slot);
+    println!("Blockhash: {}", block.blockhash);
+    println!("Previous Blockhash: {}", block.previous_blockhash);
+    if block.block_time.is_some() {
+        println!("Block Time: {:?}", block.block_time);
+    }
+    if !block.rewards.is_empty() {
+        println!("Rewards: {:?}", block.rewards);
+    }
+    for (index, transaction_with_meta) in block.transactions.iter().enumerate() {
+        println!("Transaction {}:", index);
+        println_transaction(
+            &transaction_with_meta.transaction.decode().unwrap(),
+            &transaction_with_meta.meta,
+            " ",
+        );
+    }
+    Ok(())
+}
+
+async fn blocks(starting_slot: Slot, limit: usize) -> Result<(), Box<dyn std::error::Error>> {
+    let bigtable = solana_storage_bigtable::LedgerStorage::new(false)
+        .await
+        .map_err(|err| format!("Failed to connect to storage: {:?}", err))?;
+
+    let slots = bigtable.get_confirmed_blocks(starting_slot, limit).await?;
+    println!("{:?}", slots);
+    println!("{} blocks found", slots.len());
+
+    Ok(())
+}
+
+async fn confirm(signature: &Signature, verbose: bool) -> Result<(), Box<dyn std::error::Error>> {
+    let bigtable = solana_storage_bigtable::LedgerStorage::new(false)
+        .await
+        .map_err(|err| format!("Failed to connect to storage: {:?}", err))?;
+
+    let transaction_status = bigtable.get_signature_status(signature).await?;
+
+    if verbose {
+        match bigtable
+            .get_confirmed_transaction(signature, UiTransactionEncoding::Binary)
+            .await
+        {
+            Ok(Some(confirmed_transaction)) => {
+                println!(
+                    "\nTransaction executed in slot {}:",
+                    confirmed_transaction.slot
+                );
+                println_transaction(
+                    &confirmed_transaction
+                        .transaction
+                        .transaction
+                        .decode()
+                        .expect("Successful decode"),
+                    &confirmed_transaction.transaction.meta,
+                    " ",
+                );
+            }
+            Ok(None) => println!("Confirmed transaction details not available"),
+            Err(err) => println!("Unable to get confirmed transaction details: {}", err),
+        }
+        println!();
+    }
+    match transaction_status.status {
+        Ok(_) => println!("Confirmed"),
+        Err(err) => println!("Transaction failed: {}", err),
+    }
+    Ok(())
+}
+
+pub async fn transaction_history(
+    address: &Pubkey,
+    limit: usize,
+    start_after: Option<&Signature>,
+    verbose: bool,
+) -> Result<(), Box<dyn std::error::Error>> {
+    let bigtable = solana_storage_bigtable::LedgerStorage::new(true).await?;
+
+    let results = bigtable
+        .get_confirmed_signatures_for_address(address, start_after, limit)
+        .await?;
+
+    for (signature, slot, memo, err) in results {
+        if verbose {
+            println!(
+                "{}, slot={}, memo=\"{}\", status={}",
+                signature,
+                slot,
+                memo.unwrap_or_else(|| "".to_string()),
+                match err {
+                    None => "Confirmed".to_string(),
+                    Some(err) => format!("Failed: {:?}", err),
+                }
+            );
+        } else {
+            println!("{}", signature);
+        }
+    }
+    Ok(())
+}
+
+pub trait BigTableSubCommand {
+    fn bigtable_subcommand(self) -> Self;
+}
+
+impl BigTableSubCommand for App<'_, '_> {
+    fn bigtable_subcommand(self) -> Self {
+        self.subcommand(
+            SubCommand::with_name("bigtable")
+                .about("Ledger data on a BigTable instance")
+                .setting(AppSettings::ArgRequiredElseHelp)
+                .subcommand(
+                    SubCommand::with_name("upload")
+                        .about("Upload the ledger to BigTable")
+                        .arg(
+                            Arg::with_name("starting_slot")
+                                .long("starting-slot")
+                                .validator(is_slot)
+                                .value_name("SLOT")
+                                .takes_value(true)
+                                .index(1)
+                                .help(
+                                    "Start uploading at this slot [default: first available slot]",
+                                ),
+                        )
+                        .arg(
+                            Arg::with_name("ending_slot")
+                                .long("ending-slot")
+                                .validator(is_slot)
+                                .value_name("SLOT")
+                                .takes_value(true)
+                                .index(2)
+                                .help("Stop uploading at this slot [default: last available slot]"),
+                        )
+                        .arg(
+                            Arg::with_name("allow_missing_metadata")
+                                .long("allow-missing-metadata")
+                                .takes_value(false)
+                                .help("Don't panic if transaction metadata is missing"),
+                        ),
+                )
+                .subcommand(
+                    SubCommand::with_name("first-available-block")
+                        .about("Get the first available block in the storage"),
+                )
+                .subcommand(
+                    SubCommand::with_name("blocks")
+                        .about("Get a list of slots with confirmed blocks for the given range")
+                        .arg(
+                            Arg::with_name("starting_slot")
+                                .long("starting-slot")
+                                .validator(is_slot)
+                                .value_name("SLOT")
+                                .takes_value(true)
+                                .index(1)
+                                .required(true)
+                                .default_value("0")
+                                .help("Start listing at this slot"),
+                        )
+                        .arg(
+                            Arg::with_name("limit")
+                                .long("limit")
+                                .validator(is_slot)
+                                .value_name("LIMIT")
+                                .takes_value(true)
+                                .index(2)
+                                .required(true)
+                                .default_value("1000")
+                                .help("Maximum number of slots to return"),
+                        ),
+                )
+                .subcommand(
+                    SubCommand::with_name("block")
+                        .about("Get a confirmed block")
+                        .arg(
+                            Arg::with_name("slot")
+                                .long("slot")
+                                .validator(is_slot)
+                                .value_name("SLOT")
+                                .takes_value(true)
+                                .index(1)
+                                .required(true),
+                        ),
+                )
+                .subcommand(
+                    SubCommand::with_name("confirm")
+                        .about("Confirm transaction by signature")
+                        .arg(
+                            Arg::with_name("signature")
+                                .long("signature")
+                                .value_name("TRANSACTION_SIGNATURE")
+                                .takes_value(true)
+                                .required(true)
+                                .index(1)
+                                .help("The transaction signature to confirm"),
+                        )
+                        .arg(
+                            Arg::with_name("verbose")
+                                .short("v")
+                                .long("verbose")
+                                .takes_value(false)
+                                .help("Show additional information"),
+                        ),
+                )
+                .subcommand(
+                    SubCommand::with_name("transaction-history")
+                        .about(
+                            "Show historical transactions affecting the given address, \
+                             ordered based on the slot in which they were confirmed in \
+                             from lowest to highest slot",
+                        )
+                        .arg(
+                            Arg::with_name("address")
+                                .index(1)
+                                .value_name("ADDRESS")
+                                .required(true)
+                                .validator(is_valid_pubkey)
+                                .help("Account address"),
+                        )
+                        .arg(
+                            Arg::with_name("limit")
+                                .long("limit")
+                                .takes_value(true)
+                                .value_name("LIMIT")
+                                .validator(is_slot)
+                                .index(2)
+                                .default_value("1000")
+                                .help("Maximum number of transaction signatures to return"),
+                        )
+                        .arg(
+                            Arg::with_name("after")
+                                .long("after")
+                                .value_name("TRANSACTION_SIGNATURE")
+                                .takes_value(true)
+                                .help("Start with the first signature older than this one"),
+                        )
+                        .arg(
+                            Arg::with_name("verbose")
+                                .short("v")
+                                .long("verbose")
+                                .takes_value(false)
+                                .help("Show additional information"),
+                        ),
+                ),
+        )
+    }
+}
+
+pub fn bigtable_process_command(ledger_path: &Path, matches: &ArgMatches<'_>) {
+    let mut runtime = tokio::runtime::Runtime::new().unwrap();
+
+    let future = match matches.subcommand() {
+        ("upload", Some(arg_matches)) => {
+            let starting_slot = value_t!(arg_matches, "starting_slot", Slot).unwrap_or(0);
+            let ending_slot = value_t!(arg_matches, "ending_slot", Slot).ok();
+            let allow_missing_metadata = arg_matches.is_present("allow_missing_metadata");
+            let blockstore =
+                crate::open_blockstore(&ledger_path, AccessType::TryPrimaryThenSecondary, None);
+
+            runtime.block_on(upload(
+                blockstore,
+                starting_slot,
+                ending_slot,
+                allow_missing_metadata,
+            ))
+        }
+        ("first-available-block", Some(_arg_matches)) => runtime.block_on(first_available_block()),
+        ("block", Some(arg_matches)) => {
+            let slot = value_t_or_exit!(arg_matches, "slot", Slot);
+            runtime.block_on(block(slot))
+        }
+        ("blocks", Some(arg_matches)) => {
+            let starting_slot = value_t_or_exit!(arg_matches, "starting_slot", Slot);
+            let limit = value_t_or_exit!(arg_matches, "limit", usize);
+
+            runtime.block_on(blocks(starting_slot, limit))
+        }
+        ("confirm", Some(arg_matches)) => {
+            let signature = arg_matches
+                .value_of("signature")
+                .unwrap()
+                .parse()
+                .expect("Invalid signature");
+            let verbose = arg_matches.is_present("verbose");
+
+            runtime.block_on(confirm(&signature, verbose))
+        }
+        ("transaction-history", Some(arg_matches)) => {
+            let address = pubkey_of(arg_matches, "address").unwrap();
+            let limit = value_t_or_exit!(arg_matches, "limit", usize);
+            let after = arg_matches
+                .value_of("after")
+                .map(|signature| signature.parse().expect("Invalid signature"));
+            let verbose = arg_matches.is_present("verbose");
+
+            runtime.block_on(transaction_history(
+                &address,
+                limit,
+                after.as_ref(),
+                verbose,
+            ))
+        }
+        _ => unreachable!(),
+    };
+
+    future.unwrap_or_else(|err| {
+        eprintln!("{:?}", err);
+        exit(1);
+    });
+}
diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs
index 83ca891e3..b781ee777 100644
--- a/ledger-tool/src/main.rs
+++ b/ledger-tool/src/main.rs
@@ -2,6 +2,7 @@ use clap::{
     crate_description, crate_name, value_t, value_t_or_exit, values_t_or_exit, App, Arg,
     ArgMatches, SubCommand,
 };
+use log::*;
 use regex::Regex;
 use serde_json::json;
 use solana_clap_utils::input_validators::{is_parsable, is_slot};
@@ -43,7 +44,8 @@ use std::{
     sync::Arc,
 };

-use log::*;
+mod bigtable;
+use bigtable::*;

 #[derive(PartialEq)]
 enum LedgerOutputMethod {
@@ -805,6 +807,7 @@ fn main() {
                 .global(true)
                 .help("Use DIR for ledger location"),
         )
+        .bigtable_subcommand()
         .subcommand(
             SubCommand::with_name("print")
             .about("Print the ledger")
@@ -1145,6 +1148,7 @@ fn main() {
         .map(BlockstoreRecoveryMode::from);

     match matches.subcommand() {
+        ("bigtable", Some(arg_matches)) => bigtable_process_command(&ledger_path, arg_matches),
         ("print", Some(arg_matches)) => {
             let starting_slot = value_t_or_exit!(arg_matches, "starting_slot", Slot);
             let num_slots = value_t!(arg_matches, "num_slots", Slot).ok();