Speedup bigtable block upload by factor of 8-10x (#24534)

Add multiple blockstore read threads.
Run the bigtable block upload in a tokio::spawn context.
Run the bigtable tx and tx-by-addr uploads in tokio::spawn contexts.
(A sketch of the combined pattern follows the commit metadata below.)
buffalu, 2022-05-17 01:21:05 -05:00 (committed by GitHub)
commit 6bcadc755e, parent e718c80a21
5 changed files with 144 additions and 50 deletions
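
The core change is a fan-out/fan-in pipeline: a pool of OS threads reads blocks out of the blockstore and feeds a bounded channel, while each upload runs as a spawned tokio task. Below is a minimal, self-contained sketch of that pattern, not the committed code: u64 slots and String blocks stand in for the real Blockstore and VersionedConfirmedBlock types, and it assumes the crossbeam-channel, tokio (multi-thread runtime with macros), and futures crates.

    use crossbeam_channel::{bounded, unbounded};

    #[tokio::main]
    async fn main() {
        let num_readers = 4; // the commit derives this from num_cpus::get() / 2
        let (slot_sender, slot_receiver) = unbounded::<u64>();
        let (block_sender, block_receiver) = bounded::<(u64, String)>(num_readers * 2);

        // Queue every slot up front, then drop the sender so the readers
        // stop once the queue drains (mirrors the unbounded slot channel).
        for slot in 0..100u64 {
            slot_sender.send(slot).unwrap();
        }
        drop(slot_sender);

        // Fan out: each reader thread pulls slots and pushes loaded blocks.
        let readers: Vec<_> = (0..num_readers)
            .map(|_| {
                let slot_receiver = slot_receiver.clone();
                let block_sender = block_sender.clone();
                std::thread::spawn(move || {
                    while let Ok(slot) = slot_receiver.recv() {
                        // Stand-in for blockstore.get_rooted_block(slot, true).
                        block_sender.send((slot, format!("block-{}", slot))).unwrap();
                    }
                })
            })
            .collect();
        drop(block_sender); // recv() below returns Err once all readers finish

        // Fan in: spawn one upload task per block, then await them all.
        let mut uploads = vec![];
        while let Ok((slot, block)) = block_receiver.recv() {
            uploads.push(tokio::spawn(async move {
                // Stand-in for bigtable.upload_confirmed_block(slot, block).await.
                println!("uploaded slot {} ({} bytes)", slot, block.len());
            }));
        }
        for result in futures::future::join_all(uploads).await {
            result.expect("upload task panicked");
        }
        readers.into_iter().for_each(|t| t.join().unwrap());
    }

The bounded block channel is what provides backpressure: readers stall on send() whenever they get too far ahead of the uploader, just as block_read_ahead_depth does in the real code.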

Cargo.lock (generated)

@@ -5875,6 +5875,7 @@ dependencies = [
  "bzip2",
  "enum-iterator",
  "flate2",
+ "futures 0.3.21",
  "goauth",
  "log",
  "openssl",
@@ -5888,6 +5889,7 @@ dependencies = [
  "solana-storage-proto",
  "solana-transaction-status",
  "thiserror",
+ "tokio",
  "tonic 0.7.2",
  "zstd",
 ]

ledger/src/bigtable_upload.rs
@@ -1,18 +1,18 @@
 use {
     crate::blockstore::Blockstore,
-    crossbeam_channel::bounded,
+    crossbeam_channel::{bounded, unbounded},
     log::*,
     solana_measure::measure::Measure,
     solana_sdk::clock::Slot,
     std::{
-        cmp::min,
+        cmp::{max, min},
         collections::HashSet,
         result::Result,
         sync::{
             atomic::{AtomicBool, Ordering},
             Arc,
         },
-        time::Duration,
+        time::{Duration, Instant},
     },
 };
@@ -26,16 +26,21 @@ pub struct ConfirmedBlockUploadConfig {
 impl Default for ConfirmedBlockUploadConfig {
     fn default() -> Self {
-        const NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL: usize = 32;
+        let num_blocks_to_upload_in_parallel = num_cpus::get() / 2;
         ConfirmedBlockUploadConfig {
             force_reupload: false,
-            max_num_slots_to_check: NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL * 4,
-            num_blocks_to_upload_in_parallel: NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL,
-            block_read_ahead_depth: NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL * 2,
+            max_num_slots_to_check: num_blocks_to_upload_in_parallel * 4,
+            num_blocks_to_upload_in_parallel,
+            block_read_ahead_depth: num_blocks_to_upload_in_parallel * 2,
         }
     }
 }
+
+struct BlockstoreLoadStats {
+    pub num_blocks_read: usize,
+    pub elapsed: Duration,
+}
+
 pub async fn upload_confirmed_blocks(
     blockstore: Arc<Blockstore>,
     bigtable: solana_storage_bigtable::LedgerStorage,
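
The parallelism now scales with the host instead of being fixed at 32, and the other knobs are derived from it. For example, on a 16-core machine (illustrative numbers, not taken from the commit):

    // num_blocks_to_upload_in_parallel = num_cpus::get() / 2  -> 16 / 2 = 8
    // max_num_slots_to_check           = 8 * 4                -> 32
    // block_read_ahead_depth           = 8 * 2                -> 16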
@@ -147,42 +152,56 @@ pub async fn upload_confirmed_blocks(
         last_slot
     );

-    // Load the blocks out of blockstore in a separate thread to allow for concurrent block uploading
-    let (_loader_thread, receiver) = {
+    // Distribute the blockstore reading across a few background threads to speed up the bigtable uploading
+    let (loader_threads, receiver): (Vec<_>, _) = {
         let exit = exit.clone();

         let (sender, receiver) = bounded(config.block_read_ahead_depth);
+
+        let (slot_sender, slot_receiver) = unbounded();
+        let _ = blocks_to_upload
+            .into_iter()
+            .for_each(|b| slot_sender.send(b).unwrap());
+        drop(slot_sender);
+
         (
-            std::thread::spawn(move || {
-                let mut measure = Measure::start("block loader thread");
-                for (i, slot) in blocks_to_upload.iter().enumerate() {
-                    if exit.load(Ordering::Relaxed) {
-                        break;
-                    }
-
-                    let _ = match blockstore.get_rooted_block(*slot, true) {
-                        Ok(confirmed_block) => sender.send((*slot, Some(confirmed_block))),
-                        Err(err) => {
-                            warn!(
-                                "Failed to get load confirmed block from slot {}: {:?}",
-                                slot, err
-                            );
-                            sender.send((*slot, None))
-                        }
-                    };
-
-                    if i > 0 && i % config.num_blocks_to_upload_in_parallel == 0 {
-                        info!(
-                            "{}% of blocks processed ({}/{})",
-                            i * 100 / blocks_to_upload.len(),
-                            i,
-                            blocks_to_upload.len()
-                        );
-                    }
-                }
-                measure.stop();
-                info!("{} to load {} blocks", measure, blocks_to_upload.len());
-            }),
+            (0..config.num_blocks_to_upload_in_parallel)
+                .map(|_| {
+                    let blockstore = blockstore.clone();
+                    let sender = sender.clone();
+                    let slot_receiver = slot_receiver.clone();
+                    let exit = exit.clone();
+
+                    std::thread::spawn(move || {
+                        let start = Instant::now();
+                        let mut num_blocks_read = 0;
+
+                        while let Ok(slot) = slot_receiver.recv() {
+                            if exit.load(Ordering::Relaxed) {
+                                break;
+                            }
+
+                            let _ = match blockstore.get_rooted_block(slot, true) {
+                                Ok(confirmed_block) => {
+                                    num_blocks_read += 1;
+                                    sender.send((slot, Some(confirmed_block)))
+                                }
+                                Err(err) => {
+                                    warn!(
+                                        "Failed to get load confirmed block from slot {}: {:?}",
+                                        slot, err
+                                    );
+                                    sender.send((slot, None))
+                                }
+                            };
+                        }
+                        BlockstoreLoadStats {
+                            num_blocks_read,
+                            elapsed: start.elapsed(),
+                        }
+                    })
+                })
+                .collect(),
             receiver,
         )
     };
@@ -207,12 +226,20 @@ pub async fn upload_confirmed_blocks(
                 num_blocks -= 1;
                 None
             }
-            Some(confirmed_block) => Some(bigtable.upload_confirmed_block(slot, confirmed_block)),
+            Some(confirmed_block) => {
+                let bt = bigtable.clone();
+                Some(tokio::spawn(async move {
+                    bt.upload_confirmed_block(slot, confirmed_block).await
+                }))
+            }
         });

         for result in futures::future::join_all(uploads).await {
-            if result.is_err() {
-                error!("upload_confirmed_block() failed: {:?}", result.err());
+            if let Err(err) = result {
+                error!("upload_confirmed_block() join failed: {:?}", err);
+                failures += 1;
+            } else if let Err(err) = result.unwrap() {
+                error!("upload_confirmed_block() upload failed: {:?}", err);
                 failures += 1;
             }
         }
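
Because tokio::spawn returns a JoinHandle, each element yielded by join_all is now a nested result: the outer layer reports whether the task itself completed (a JoinError means it panicked or was cancelled), and the inner layer is the upload's own Result. A minimal sketch of the two failure layers (String stands in for the real error type):

    #[tokio::main]
    async fn main() {
        let handle = tokio::spawn(async { Ok::<(), String>(()) });
        match handle.await {
            Err(join_err) => eprintln!("join failed (panic/cancel): {:?}", join_err),
            Ok(Err(upload_err)) => eprintln!("upload failed: {:?}", upload_err),
            Ok(Ok(())) => println!("upload succeeded"),
        }
    }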
@@ -223,6 +250,34 @@ pub async fn upload_confirmed_blocks(
     measure.stop();
     info!("{}", measure);

+    let blockstore_results = loader_threads.into_iter().map(|t| t.join());
+
+    let mut blockstore_num_blocks_read = 0;
+    let mut blockstore_load_wallclock = Duration::default();
+    let mut blockstore_errors = 0;
+
+    for r in blockstore_results {
+        match r {
+            Ok(stats) => {
+                blockstore_num_blocks_read += stats.num_blocks_read;
+                blockstore_load_wallclock = max(stats.elapsed, blockstore_load_wallclock);
+            }
+            Err(e) => {
+                error!("error joining blockstore thread: {:?}", e);
+                blockstore_errors += 1;
+            }
+        }
+    }
+
+    info!(
+        "blockstore upload took {:?} for {} blocks ({:.2} blocks/s) errors: {}",
+        blockstore_load_wallclock,
+        blockstore_num_blocks_read,
+        blockstore_num_blocks_read as f64 / blockstore_load_wallclock.as_secs_f64(),
+        blockstore_errors
+    );
+
     if failures > 0 {
         Err(format!("Incomplete upload, {} operations failed", failures).into())
     } else {

programs/bpf/Cargo.lock (generated)
@@ -5224,6 +5224,7 @@ dependencies = [
  "bzip2",
  "enum-iterator",
  "flate2",
+ "futures 0.3.21",
  "goauth",
  "log",
  "openssl",
@@ -5237,6 +5238,7 @@ dependencies = [
  "solana-storage-proto",
  "solana-transaction-status",
  "thiserror",
+ "tokio",
  "tonic 0.7.2",
  "zstd",
 ]

storage-bigtable/Cargo.toml
@@ -15,6 +15,7 @@ bincode = "1.3.3"
 bzip2 = "0.4.3"
 enum-iterator = "0.8.1"
 flate2 = "1.0.23"
+futures = "0.3.21"
 goauth = "0.12.0"
 log = "0.4.17"
 prost = "0.10.3"
@@ -27,6 +28,7 @@ solana-sdk = { path = "../sdk", version = "=1.11.0" }
 solana-storage-proto = { path = "../storage-proto", version = "=1.11.0" }
 solana-transaction-status = { path = "../transaction-status", version = "=1.11.0" }
 thiserror = "1.0"
+tokio = "~1.14.1"
 tonic = { version = "0.7.2", features = ["tls", "transport"] }
 zstd = "0.11.2"

storage-bigtable/src/ledger_storage.rs
@@ -1,4 +1,5 @@
 #![allow(clippy::integer_arithmetic)]
+
 use {
     crate::bigtable::RowKey,
     log::*,
@@ -25,6 +26,7 @@ use {
         convert::TryInto,
     },
     thiserror::Error,
+    tokio::task::JoinError,
 };

 #[macro_use]
@@ -54,6 +56,9 @@ pub enum Error {
     #[error("Signature not found")]
     SignatureNotFound,
+
+    #[error("tokio error")]
+    TokioJoinError(JoinError),
 }

 impl std::convert::From<bigtable::Error> for Error {
@@ -737,8 +742,6 @@ impl LedgerStorage {
         slot: Slot,
         confirmed_block: VersionedConfirmedBlock,
     ) -> Result<()> {
-        let mut bytes_written = 0;
-
         let mut by_addr: HashMap<&Pubkey, Vec<TransactionByAddrInfo>> = HashMap::new();

         let mut tx_cells = vec![];
@@ -790,21 +793,51 @@ impl LedgerStorage {
             })
             .collect();

+        let mut tasks = vec![];
+
         if !tx_cells.is_empty() {
-            bytes_written += self
-                .connection
-                .put_bincode_cells_with_retry::<TransactionInfo>("tx", &tx_cells)
-                .await?;
+            let conn = self.connection.clone();
+            tasks.push(tokio::spawn(async move {
+                conn.put_bincode_cells_with_retry::<TransactionInfo>("tx", &tx_cells)
+                    .await
+            }));
         }

         if !tx_by_addr_cells.is_empty() {
-            bytes_written += self
-                .connection
-                .put_protobuf_cells_with_retry::<tx_by_addr::TransactionByAddr>(
-                    "tx-by-addr",
-                    &tx_by_addr_cells,
-                )
-                .await?;
+            let conn = self.connection.clone();
+            tasks.push(tokio::spawn(async move {
+                conn.put_protobuf_cells_with_retry::<tx_by_addr::TransactionByAddr>(
+                    "tx-by-addr",
+                    &tx_by_addr_cells,
+                )
+                .await
+            }));
+        }
+
+        let mut bytes_written = 0;
+        let mut maybe_first_err: Option<Error> = None;
+
+        let results = futures::future::join_all(tasks).await;
+        for result in results {
+            match result {
+                Err(err) => {
+                    if maybe_first_err.is_none() {
+                        maybe_first_err = Some(Error::TokioJoinError(err));
+                    }
+                }
+                Ok(Err(err)) => {
+                    if maybe_first_err.is_none() {
+                        maybe_first_err = Some(Error::BigTableError(err));
+                    }
+                }
+                Ok(Ok(bytes)) => {
+                    bytes_written += bytes;
+                }
+            }
+        }
+        if let Some(err) = maybe_first_err {
+            return Err(err);
+        }

         let num_transactions = confirmed_block.transactions.len();