diff --git a/Cargo.lock b/Cargo.lock
index 3c592a991a..5fe3091a27 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5875,6 +5875,7 @@ dependencies = [
  "bzip2",
  "enum-iterator",
  "flate2",
+ "futures 0.3.21",
  "goauth",
  "log",
  "openssl",
@@ -5888,6 +5889,7 @@ dependencies = [
  "solana-storage-proto",
  "solana-transaction-status",
  "thiserror",
+ "tokio",
  "tonic 0.7.2",
  "zstd",
 ]
diff --git a/ledger/src/bigtable_upload.rs b/ledger/src/bigtable_upload.rs
index f2481351f0..f1145a4eef 100644
--- a/ledger/src/bigtable_upload.rs
+++ b/ledger/src/bigtable_upload.rs
@@ -1,18 +1,18 @@
 use {
     crate::blockstore::Blockstore,
-    crossbeam_channel::bounded,
+    crossbeam_channel::{bounded, unbounded},
     log::*,
     solana_measure::measure::Measure,
     solana_sdk::clock::Slot,
     std::{
-        cmp::min,
+        cmp::{max, min},
         collections::HashSet,
         result::Result,
         sync::{
             atomic::{AtomicBool, Ordering},
             Arc,
         },
-        time::Duration,
+        time::{Duration, Instant},
     },
 };
@@ -26,16 +26,21 @@
 
 impl Default for ConfirmedBlockUploadConfig {
     fn default() -> Self {
-        const NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL: usize = 32;
+        let num_blocks_to_upload_in_parallel = num_cpus::get() / 2;
         ConfirmedBlockUploadConfig {
             force_reupload: false,
-            max_num_slots_to_check: NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL * 4,
-            num_blocks_to_upload_in_parallel: NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL,
-            block_read_ahead_depth: NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL * 2,
+            max_num_slots_to_check: num_blocks_to_upload_in_parallel * 4,
+            num_blocks_to_upload_in_parallel,
+            block_read_ahead_depth: num_blocks_to_upload_in_parallel * 2,
         }
     }
 }
 
+struct BlockstoreLoadStats {
+    pub num_blocks_read: usize,
+    pub elapsed: Duration,
+}
+
 pub async fn upload_confirmed_blocks(
     blockstore: Arc<Blockstore>,
     bigtable: solana_storage_bigtable::LedgerStorage,
@@ -147,42 +152,56 @@ pub async fn upload_confirmed_blocks(
         last_slot
     );
 
-    // Load the blocks out of blockstore in a separate thread to allow for concurrent block uploading
-    let (_loader_thread, receiver) = {
+    // Distribute the blockstore reading across a few background threads to speed up the bigtable uploading
+    let (loader_threads, receiver): (Vec<_>, _) = {
         let exit = exit.clone();
 
         let (sender, receiver) = bounded(config.block_read_ahead_depth);
+
+        let (slot_sender, slot_receiver) = unbounded();
+        let _ = blocks_to_upload
+            .into_iter()
+            .for_each(|b| slot_sender.send(b).unwrap());
+        drop(slot_sender);
+
         (
-            std::thread::spawn(move || {
-                let mut measure = Measure::start("block loader thread");
-                for (i, slot) in blocks_to_upload.iter().enumerate() {
-                    if exit.load(Ordering::Relaxed) {
-                        break;
-                    }
+            (0..config.num_blocks_to_upload_in_parallel)
+                .map(|_| {
+                    let blockstore = blockstore.clone();
+                    let sender = sender.clone();
+                    let slot_receiver = slot_receiver.clone();
+                    let exit = exit.clone();
 
-                    let _ = match blockstore.get_rooted_block(*slot, true) {
-                        Ok(confirmed_block) => sender.send((*slot, Some(confirmed_block))),
-                        Err(err) => {
-                            warn!(
-                                "Failed to get load confirmed block from slot {}: {:?}",
-                                slot, err
-                            );
-                            sender.send((*slot, None))
-                        }
-                    };
-
-                    if i > 0 && i % config.num_blocks_to_upload_in_parallel == 0 {
-                        info!(
-                            "{}% of blocks processed ({}/{})",
-                            i * 100 / blocks_to_upload.len(),
-                            i,
-                            blocks_to_upload.len()
-                        );
-                    }
-                }
-                measure.stop();
-                info!("{} to load {} blocks", measure, blocks_to_upload.len());
-            }),
+                    std::thread::spawn(move || {
+                        let start = Instant::now();
+                        let mut num_blocks_read = 0;
+
+                        while let Ok(slot) = slot_receiver.recv() {
+                            if exit.load(Ordering::Relaxed) {
+                                break;
+                            }
+
+                            let _ = match blockstore.get_rooted_block(slot, true) {
+                                Ok(confirmed_block) => {
+                                    num_blocks_read += 1;
+                                    sender.send((slot, Some(confirmed_block)))
+                                }
+                                Err(err) => {
+                                    warn!(
+                                        "Failed to get load confirmed block from slot {}: {:?}",
+                                        slot, err
+                                    );
+                                    sender.send((slot, None))
+                                }
+                            };
+                        }
+                        BlockstoreLoadStats {
+                            num_blocks_read,
+                            elapsed: start.elapsed(),
+                        }
+                    })
+                })
+                .collect(),
             receiver,
         )
     };
@@ -207,12 +226,20 @@
                 num_blocks -= 1;
                 None
             }
-            Some(confirmed_block) => Some(bigtable.upload_confirmed_block(slot, confirmed_block)),
+            Some(confirmed_block) => {
+                let bt = bigtable.clone();
+                Some(tokio::spawn(async move {
+                    bt.upload_confirmed_block(slot, confirmed_block).await
+                }))
+            }
         });
 
         for result in futures::future::join_all(uploads).await {
-            if result.is_err() {
-                error!("upload_confirmed_block() failed: {:?}", result.err());
+            if let Err(err) = result {
+                error!("upload_confirmed_block() join failed: {:?}", err);
+                failures += 1;
+            } else if let Err(err) = result.unwrap() {
+                error!("upload_confirmed_block() upload failed: {:?}", err);
                 failures += 1;
             }
         }
@@ -223,6 +250,34 @@
     measure.stop();
     info!("{}", measure);
 
+
+    let blockstore_results = loader_threads.into_iter().map(|t| t.join());
+
+    let mut blockstore_num_blocks_read = 0;
+    let mut blockstore_load_wallclock = Duration::default();
+    let mut blockstore_errors = 0;
+
+    for r in blockstore_results {
+        match r {
+            Ok(stats) => {
+                blockstore_num_blocks_read += stats.num_blocks_read;
+                blockstore_load_wallclock = max(stats.elapsed, blockstore_load_wallclock);
+            }
+            Err(e) => {
+                error!("error joining blockstore thread: {:?}", e);
+                blockstore_errors += 1;
+            }
+        }
+    }
+
+    info!(
+        "blockstore upload took {:?} for {} blocks ({:.2} blocks/s) errors: {}",
+        blockstore_load_wallclock,
+        blockstore_num_blocks_read,
+        blockstore_num_blocks_read as f64 / blockstore_load_wallclock.as_secs_f64(),
+        blockstore_errors
+    );
+
     if failures > 0 {
         Err(format!("Incomplete upload, {} operations failed", failures).into())
     } else {
diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock
index 3935d955a2..5e0eb9d297 100644
--- a/programs/bpf/Cargo.lock
+++ b/programs/bpf/Cargo.lock
@@ -5224,6 +5224,7 @@ dependencies = [
  "bzip2",
  "enum-iterator",
  "flate2",
+ "futures 0.3.21",
  "goauth",
  "log",
  "openssl",
@@ -5237,6 +5238,7 @@ dependencies = [
  "solana-storage-proto",
  "solana-transaction-status",
  "thiserror",
+ "tokio",
  "tonic 0.7.2",
  "zstd",
 ]
diff --git a/storage-bigtable/Cargo.toml b/storage-bigtable/Cargo.toml
index d370da1801..5cdd0f8a6d 100644
--- a/storage-bigtable/Cargo.toml
+++ b/storage-bigtable/Cargo.toml
@@ -15,6 +15,7 @@ bincode = "1.3.3"
 bzip2 = "0.4.3"
 enum-iterator = "0.8.1"
 flate2 = "1.0.23"
+futures = "0.3.21"
 goauth = "0.12.0"
 log = "0.4.17"
 prost = "0.10.3"
@@ -27,6 +28,7 @@ solana-sdk = { path = "../sdk", version = "=1.11.0" }
 solana-storage-proto = { path = "../storage-proto", version = "=1.11.0" }
 solana-transaction-status = { path = "../transaction-status", version = "=1.11.0" }
 thiserror = "1.0"
+tokio = "~1.14.1"
 tonic = { version = "0.7.2", features = ["tls", "transport"] }
 zstd = "0.11.2"
diff --git a/storage-bigtable/src/lib.rs b/storage-bigtable/src/lib.rs
index 2a6e798808..4fe7a58e1a 100644
--- a/storage-bigtable/src/lib.rs
+++ b/storage-bigtable/src/lib.rs
@@ -1,4 +1,5 @@
 #![allow(clippy::integer_arithmetic)]
+
 use {
     crate::bigtable::RowKey,
     log::*,
@@ -25,6 +26,7 @@ use {
         convert::TryInto,
     },
     thiserror::Error,
+    tokio::task::JoinError,
 };
 
 #[macro_use]
@@ -54,6 +56,9 @@ pub enum Error {
 
     #[error("Signature not found")]
     SignatureNotFound,
+
+    #[error("tokio error")]
+    TokioJoinError(JoinError),
 }
 
 impl std::convert::From<bigtable::Error> for Error {
@@ -737,8 +742,6 @@
         slot: Slot,
         confirmed_block: VersionedConfirmedBlock,
     ) -> Result<()> {
-        let mut bytes_written = 0;
-
         let mut by_addr: HashMap<&Pubkey, Vec<TransactionByAddrInfo>> = HashMap::new();
 
         let mut tx_cells = vec![];
@@ -790,21 +793,51 @@
             })
             .collect();
 
+        let mut tasks = vec![];
+
         if !tx_cells.is_empty() {
-            bytes_written += self
-                .connection
-                .put_bincode_cells_with_retry::<StoredConfirmedBlock>("tx", &tx_cells)
-                .await?;
+            let conn = self.connection.clone();
+            tasks.push(tokio::spawn(async move {
+                conn.put_bincode_cells_with_retry::<StoredConfirmedBlock>("tx", &tx_cells)
+                    .await
+            }));
         }
 
         if !tx_by_addr_cells.is_empty() {
-            bytes_written += self
-                .connection
-                .put_protobuf_cells_with_retry::<generated::TransactionByAddr>(
-                    "tx-by-addr",
-                    &tx_by_addr_cells,
-                )
-                .await?;
+            let conn = self.connection.clone();
+            tasks.push(tokio::spawn(async move {
+                conn.put_protobuf_cells_with_retry::<generated::TransactionByAddr>(
+                    "tx-by-addr",
+                    &tx_by_addr_cells,
+                )
+                .await
+            }));
+        }
+
+        let mut bytes_written = 0;
+        let mut maybe_first_err: Option<Error> = None;
+
+        let results = futures::future::join_all(tasks).await;
+        for result in results {
+            match result {
+                Err(err) => {
+                    if maybe_first_err.is_none() {
+                        maybe_first_err = Some(Error::TokioJoinError(err));
+                    }
+                }
+                Ok(Err(err)) => {
+                    if maybe_first_err.is_none() {
+                        maybe_first_err = Some(Error::BigTableError(err));
+                    }
+                }
+                Ok(Ok(bytes)) => {
+                    bytes_written += bytes;
+                }
+            }
+        }
+
+        if let Some(err) = maybe_first_err {
+            return Err(err);
         }
 
         let num_transactions = confirmed_block.transactions.len();
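Note: the following is an illustrative, self-contained sketch (not part of the patch) of the concurrency and error-handling pattern this change introduces in both bigtable_upload.rs and lib.rs: work is fanned out with tokio::spawn, the handles are awaited with futures::future::join_all, a JoinError (the task panicked or was cancelled) is distinguished from the task's own error, only the first error is kept, and byte counts from successful tasks are still accumulated. The UploadError enum and fake_upload helper below are stand-ins invented for the example, not APIs from the Solana codebase; it assumes tokio (with the "macros" and "rt-multi-thread" features) and futures as dependencies.

use tokio::task::JoinError;

// Illustrative error type; the real patch instead adds a TokioJoinError variant
// to the crate's existing Error enum.
#[derive(Debug)]
enum UploadError {
    Join(JoinError),
    Upload(String),
}

// Stand-in for an upload call such as LedgerStorage::upload_confirmed_block;
// slot 3 fails on purpose so both error paths are exercised.
async fn fake_upload(slot: u64) -> Result<usize, String> {
    if slot == 3 {
        Err(format!("upload of slot {} failed", slot))
    } else {
        Ok(1024) // pretend 1024 bytes were written
    }
}

#[tokio::main]
async fn main() {
    // Fan the uploads out onto the tokio runtime so they run concurrently.
    let tasks: Vec<_> = (0..5u64)
        .map(|slot| tokio::spawn(async move { fake_upload(slot).await }))
        .collect();

    let mut bytes_written = 0usize;
    let mut first_err: Option<UploadError> = None;

    // join_all yields Result<Result<usize, String>, JoinError>: the outer error
    // means the spawned task panicked or was cancelled, the inner one comes
    // from the upload call itself.
    for result in futures::future::join_all(tasks).await {
        match result {
            Err(join_err) => {
                if first_err.is_none() {
                    first_err = Some(UploadError::Join(join_err));
                }
            }
            Ok(Err(upload_err)) => {
                if first_err.is_none() {
                    first_err = Some(UploadError::Upload(upload_err));
                }
            }
            Ok(Ok(bytes)) => bytes_written += bytes,
        }
    }

    println!("bytes written: {}, first error: {:?}", bytes_written, first_err);
}

Keeping only the first error while still draining every result mirrors the patch's behavior: all uploads are attempted, the total byte count stays meaningful, and the caller still gets a failure if anything went wrong.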