diff --git a/ledger-tool/src/bigtable.rs b/ledger-tool/src/bigtable.rs
index 3f137f1d3e..725789922e 100644
--- a/ledger-tool/src/bigtable.rs
+++ b/ledger-tool/src/bigtable.rs
@@ -1,23 +1,14 @@
 /// The `bigtable` subcommand
 use clap::{value_t, value_t_or_exit, App, AppSettings, Arg, ArgMatches, SubCommand};
-use log::*;
 use solana_clap_utils::{
     input_parsers::pubkey_of,
     input_validators::{is_slot, is_valid_pubkey},
 };
 use solana_cli::display::println_transaction;
 use solana_ledger::{blockstore::Blockstore, blockstore_db::AccessType};
-use solana_measure::measure::Measure;
 use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature};
 use solana_transaction_status::UiTransactionEncoding;
-use std::{collections::HashSet, path::Path, process::exit, result::Result, time::Duration};
-use tokio::time::delay_for;
-
-// Attempt to upload this many blocks in parallel
-const NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL: usize = 32;
-
-// Read up to this many blocks from blockstore before blocking on the upload process
-const BLOCK_READ_AHEAD_DEPTH: usize = NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL * 2;
+use std::{path::Path, process::exit, result::Result, sync::Arc};
 
 async fn upload(
     blockstore: Blockstore,
@@ -25,194 +16,18 @@ async fn upload(
     ending_slot: Option<Slot>,
     allow_missing_metadata: bool,
 ) -> Result<(), Box<dyn std::error::Error>> {
-    let mut measure = Measure::start("entire upload");
-
     let bigtable = solana_storage_bigtable::LedgerStorage::new(false)
         .await
         .map_err(|err| format!("Failed to connect to storage: {:?}", err))?;
 
-    info!("Loading ledger slots...");
-    let blockstore_slots: Vec<_> = blockstore
-        .slot_meta_iterator(starting_slot)
-        .map_err(|err| {
-            format!(
-                "Failed to load entries starting from slot {}: {:?}",
-                starting_slot, err
-            )
-        })?
-        .filter_map(|(slot, _slot_meta)| {
-            if let Some(ending_slot) = &ending_slot {
-                if slot > *ending_slot {
-                    return None;
-                }
-            }
-            Some(slot)
-        })
-        .collect();
-
-    if blockstore_slots.is_empty() {
-        info!("Ledger has no slots in the specified range");
-        return Ok(());
-    }
-    info!(
-        "Found {} slots in the range ({}, {})",
-        blockstore_slots.len(),
-        blockstore_slots.first().unwrap(),
-        blockstore_slots.last().unwrap()
-    );
-
-    let mut blockstore_slots_with_no_confirmed_block = HashSet::new();
-
-    // Gather the blocks that are already present in bigtable, by slot
-    let bigtable_slots = {
-        let mut bigtable_slots = vec![];
-        let first_blockstore_slot = *blockstore_slots.first().unwrap();
-        let last_blockstore_slot = *blockstore_slots.last().unwrap();
-        info!(
-            "Loading list of bigtable blocks between slots {} and {}...",
-            first_blockstore_slot, last_blockstore_slot
-        );
-
-        let mut start_slot = *blockstore_slots.first().unwrap();
-        while start_slot <= last_blockstore_slot {
-            let mut next_bigtable_slots = loop {
-                match bigtable.get_confirmed_blocks(start_slot, 1000).await {
-                    Ok(slots) => break slots,
-                    Err(err) => {
-                        error!("get_confirmed_blocks for {} failed: {:?}", start_slot, err);
-                        // Consider exponential backoff...
-                        delay_for(Duration::from_secs(2)).await;
-                    }
-                }
-            };
-            if next_bigtable_slots.is_empty() {
-                break;
-            }
-            bigtable_slots.append(&mut next_bigtable_slots);
-            start_slot = bigtable_slots.last().unwrap() + 1;
-        }
-        bigtable_slots
-            .into_iter()
-            .filter(|slot| *slot <= last_blockstore_slot)
-            .collect::<Vec<_>>()
-    };
-
-    // The blocks that still need to be uploaded is the difference between what's already in the
-    // bigtable and what's in blockstore...
-    let blocks_to_upload = {
-        let blockstore_slots = blockstore_slots.iter().cloned().collect::<HashSet<_>>();
-        let bigtable_slots = bigtable_slots.into_iter().collect::<HashSet<_>>();
-
-        let mut blocks_to_upload = blockstore_slots
-            .difference(&blockstore_slots_with_no_confirmed_block)
-            .cloned()
-            .collect::<HashSet<_>>()
-            .difference(&bigtable_slots)
-            .cloned()
-            .collect::<Vec<_>>();
-        blocks_to_upload.sort();
-        blocks_to_upload
-    };
-
-    if blocks_to_upload.is_empty() {
-        info!("No blocks need to be uploaded to bigtable");
-        return Ok(());
-    }
-    info!(
-        "{} blocks to be uploaded to the bucket in the range ({}, {})",
-        blocks_to_upload.len(),
-        blocks_to_upload.first().unwrap(),
-        blocks_to_upload.last().unwrap()
-    );
-
-    // Load the blocks out of blockstore in a separate thread to allow for concurrent block uploading
-    let (_loader_thread, receiver) = {
-        let (sender, receiver) = std::sync::mpsc::sync_channel(BLOCK_READ_AHEAD_DEPTH);
-        (
-            std::thread::spawn(move || {
-                let mut measure = Measure::start("block loader thread");
-                for (i, slot) in blocks_to_upload.iter().enumerate() {
-                    let _ = match blockstore.get_confirmed_block(
-                        *slot,
-                        Some(solana_transaction_status::UiTransactionEncoding::Base64),
-                    ) {
-                        Ok(confirmed_block) => sender.send((*slot, Some(confirmed_block))),
-                        Err(err) => {
-                            warn!(
-                                "Failed to get load confirmed block from slot {}: {:?}",
-                                slot, err
-                            );
-                            sender.send((*slot, None))
-                        }
-                    };
-
-                    if i % NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL == 0 {
-                        info!(
-                            "{}% of blocks processed ({}/{})",
-                            i * 100 / blocks_to_upload.len(),
-                            i,
-                            blocks_to_upload.len()
-                        );
-                    }
-                }
-                measure.stop();
-                info!("{} to load {} blocks", measure, blocks_to_upload.len());
-            }),
-            receiver,
-        )
-    };
-
-    let mut failures = 0;
-    use futures::stream::StreamExt;
-
-    let mut stream =
-        tokio::stream::iter(receiver.into_iter()).chunks(NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL);
-
-    while let Some(blocks) = stream.next().await {
-        let mut measure_upload = Measure::start("Upload");
-        let mut num_blocks = blocks.len();
-        info!("Preparing the next {} blocks for upload", num_blocks);
-
-        let uploads = blocks.into_iter().filter_map(|(slot, block)| match block {
-            None => {
-                blockstore_slots_with_no_confirmed_block.insert(slot);
-                num_blocks -= 1;
-                None
-            }
-            Some(confirmed_block) => {
-                if confirmed_block
-                    .transactions
-                    .iter()
-                    .any(|transaction| transaction.meta.is_none())
-                {
-                    if allow_missing_metadata {
-                        info!("Transaction metadata missing from slot {}", slot);
-                    } else {
-                        panic!("Transaction metadata missing from slot {}", slot);
-                    }
-                }
-                Some(bigtable.upload_confirmed_block(slot, confirmed_block))
-            }
-        });
-
-        for result in futures::future::join_all(uploads).await {
-            if result.is_err() {
-                error!("upload_confirmed_block() failed: {:?}", result.err());
-                failures += 1;
-            }
-        }
-
-        measure_upload.stop();
-        info!("{} for {} blocks", measure_upload, num_blocks);
-    }
-
-    measure.stop();
-    info!("{}", measure);
-    if failures > 0 {
-        Err(format!("Incomplete upload, {} operations failed", failures).into())
-    } else {
-        Ok(())
-    }
+    solana_ledger::bigtable_upload::upload_confirmed_blocks(
+        Arc::new(blockstore),
+        bigtable,
+        starting_slot,
+        ending_slot,
+        allow_missing_metadata,
+    )
+    .await
 }
 
 async fn first_available_block() -> Result<(), Box<dyn std::error::Error>> {
diff --git a/ledger/Cargo.toml b/ledger/Cargo.toml
index e7528fe24f..8ffc905f3e 100644
--- a/ledger/Cargo.toml
+++ b/ledger/Cargo.toml
@@ -17,6 +17,8 @@ dlopen_derive = "0.1.4"
 dlopen = "0.1.8"
 ed25519-dalek = "1.0.0-pre.4"
 fs_extra = "1.1.0"
+futures = "0.3.5"
+futures-util = "0.3.5"
 itertools = "0.9.0"
 lazy_static = "1.4.0"
 libc = "0.2.72"
@@ -40,9 +42,11 @@ solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "1.4.0" }
 solana-runtime = { path = "../runtime", version = "1.4.0" }
 solana-sdk = { path = "../sdk", version = "1.4.0" }
 solana-stake-program = { path = "../programs/stake", version = "1.4.0" }
+solana-storage-bigtable = { path = "../storage-bigtable", version = "1.4.0" }
 solana-vote-program = { path = "../programs/vote", version = "1.4.0" }
 tempfile = "3.1.0"
 thiserror = "1.0"
+tokio = { version = "0.2.22", features = ["full"] }
 trees = "0.2.1"
 
 [dependencies.rocksdb]
diff --git a/ledger/src/bigtable_upload.rs b/ledger/src/bigtable_upload.rs
new file mode 100644
index 0000000000..fe2356ca49
--- /dev/null
+++ b/ledger/src/bigtable_upload.rs
@@ -0,0 +1,206 @@
+use crate::blockstore::Blockstore;
+use log::*;
+use solana_measure::measure::Measure;
+use solana_sdk::clock::Slot;
+use std::{collections::HashSet, result::Result, sync::Arc, time::Duration};
+use tokio::time::delay_for;
+
+// Attempt to upload this many blocks in parallel
+const NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL: usize = 32;
+
+// Read up to this many blocks from blockstore before blocking on the upload process
+const BLOCK_READ_AHEAD_DEPTH: usize = NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL * 2;
+
+pub async fn upload_confirmed_blocks(
+    blockstore: Arc<Blockstore>,
+    bigtable: solana_storage_bigtable::LedgerStorage,
+    starting_slot: Slot,
+    ending_slot: Option<Slot>,
+    allow_missing_metadata: bool,
+) -> Result<(), Box<dyn std::error::Error>> {
+    let mut measure = Measure::start("entire upload");
+
+    info!("Loading ledger slots...");
+    let blockstore_slots: Vec<_> = blockstore
+        .slot_meta_iterator(starting_slot)
+        .map_err(|err| {
+            format!(
+                "Failed to load entries starting from slot {}: {:?}",
+                starting_slot, err
+            )
+        })?
+        .filter_map(|(slot, _slot_meta)| {
+            if let Some(ending_slot) = &ending_slot {
+                if slot > *ending_slot {
+                    return None;
+                }
+            }
+            Some(slot)
+        })
+        .collect();
+
+    if blockstore_slots.is_empty() {
+        info!("Ledger has no slots in the specified range");
+        return Ok(());
+    }
+
+    info!(
+        "Found {} slots in the range ({}, {})",
+        blockstore_slots.len(),
+        blockstore_slots.first().unwrap(),
+        blockstore_slots.last().unwrap()
+    );
+
+    let mut blockstore_slots_with_no_confirmed_block = HashSet::new();
+
+    // Gather the blocks that are already present in bigtable, by slot
+    let bigtable_slots = {
+        let mut bigtable_slots = vec![];
+        let first_blockstore_slot = *blockstore_slots.first().unwrap();
+        let last_blockstore_slot = *blockstore_slots.last().unwrap();
+        info!(
+            "Loading list of bigtable blocks between slots {} and {}...",
+            first_blockstore_slot, last_blockstore_slot
+        );
+
+        let mut start_slot = *blockstore_slots.first().unwrap();
+        while start_slot <= last_blockstore_slot {
+            let mut next_bigtable_slots = loop {
+                match bigtable.get_confirmed_blocks(start_slot, 1000).await {
+                    Ok(slots) => break slots,
+                    Err(err) => {
+                        error!("get_confirmed_blocks for {} failed: {:?}", start_slot, err);
+                        // Consider exponential backoff...
+                        delay_for(Duration::from_secs(2)).await;
+                    }
+                }
+            };
+            if next_bigtable_slots.is_empty() {
+                break;
+            }
+            bigtable_slots.append(&mut next_bigtable_slots);
+            start_slot = bigtable_slots.last().unwrap() + 1;
+        }
+        bigtable_slots
+            .into_iter()
+            .filter(|slot| *slot <= last_blockstore_slot)
+            .collect::<Vec<_>>()
+    };
+
+    // The blocks that still need to be uploaded is the difference between what's already in the
+    // bigtable and what's in blockstore...
+    let blocks_to_upload = {
+        let blockstore_slots = blockstore_slots.iter().cloned().collect::<HashSet<_>>();
+        let bigtable_slots = bigtable_slots.into_iter().collect::<HashSet<_>>();
+
+        let mut blocks_to_upload = blockstore_slots
+            .difference(&blockstore_slots_with_no_confirmed_block)
+            .cloned()
+            .collect::<HashSet<_>>()
+            .difference(&bigtable_slots)
+            .cloned()
+            .collect::<Vec<_>>();
+        blocks_to_upload.sort();
+        blocks_to_upload
+    };
+
+    if blocks_to_upload.is_empty() {
+        info!("No blocks need to be uploaded to bigtable");
+        return Ok(());
+    }
+    info!(
+        "{} blocks to be uploaded to the bucket in the range ({}, {})",
+        blocks_to_upload.len(),
+        blocks_to_upload.first().unwrap(),
+        blocks_to_upload.last().unwrap()
+    );
+
+    // Load the blocks out of blockstore in a separate thread to allow for concurrent block uploading
+    let (_loader_thread, receiver) = {
+        let (sender, receiver) = std::sync::mpsc::sync_channel(BLOCK_READ_AHEAD_DEPTH);
+        (
+            std::thread::spawn(move || {
+                let mut measure = Measure::start("block loader thread");
+                for (i, slot) in blocks_to_upload.iter().enumerate() {
+                    let _ = match blockstore.get_confirmed_block(
+                        *slot,
+                        Some(solana_transaction_status::UiTransactionEncoding::Base64),
+                    ) {
+                        Ok(confirmed_block) => sender.send((*slot, Some(confirmed_block))),
+                        Err(err) => {
+                            warn!(
+                                "Failed to get load confirmed block from slot {}: {:?}",
+                                slot, err
+                            );
+                            sender.send((*slot, None))
+                        }
+                    };
+
+                    if i % NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL == 0 {
+                        info!(
+                            "{}% of blocks processed ({}/{})",
+                            i * 100 / blocks_to_upload.len(),
+                            i,
+                            blocks_to_upload.len()
+                        );
+                    }
+                }
+                measure.stop();
+                info!("{} to load {} blocks", measure, blocks_to_upload.len());
+            }),
+            receiver,
+        )
+    };
+
+    let mut failures = 0;
+    use futures::stream::StreamExt;
+
+    let mut stream =
+        tokio::stream::iter(receiver.into_iter()).chunks(NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL);
+
+    while let Some(blocks) = stream.next().await {
+        let mut measure_upload = Measure::start("Upload");
+        let mut num_blocks = blocks.len();
+        info!("Preparing the next {} blocks for upload", num_blocks);
+
+        let uploads = blocks.into_iter().filter_map(|(slot, block)| match block {
+            None => {
+                blockstore_slots_with_no_confirmed_block.insert(slot);
+                num_blocks -= 1;
+                None
+            }
+            Some(confirmed_block) => {
+                if confirmed_block
+                    .transactions
+                    .iter()
+                    .any(|transaction| transaction.meta.is_none())
+                {
+                    if allow_missing_metadata {
+                        info!("Transaction metadata missing from slot {}", slot);
+                    } else {
+                        panic!("Transaction metadata missing from slot {}", slot);
+                    }
+                }
+                Some(bigtable.upload_confirmed_block(slot, confirmed_block))
+            }
+        });
+
+        for result in futures::future::join_all(uploads).await {
+            if result.is_err() {
+                error!("upload_confirmed_block() failed: {:?}", result.err());
+                failures += 1;
+            }
+        }
+
+        measure_upload.stop();
+        info!("{} for {} blocks", measure_upload, num_blocks);
+    }
+
+    measure.stop();
+    info!("{}", measure);
+    if failures > 0 {
+        Err(format!("Incomplete upload, {} operations failed", failures).into())
+    } else {
+        Ok(())
+    }
+}
diff --git a/ledger/src/lib.rs b/ledger/src/lib.rs
index b034f8d3c3..0178e9b6f4 100644
--- a/ledger/src/lib.rs
+++ b/ledger/src/lib.rs
@@ -1,4 +1,5 @@
 pub mod bank_forks_utils;
+pub mod bigtable_upload;
 pub mod block_error;
 #[macro_use]
 pub mod blockstore;