Relocate BigTable uploader to ledger/ crate

This commit is contained in:
Michael Vines 2020-09-03 19:39:16 -07:00
parent d8e2038dda
commit 91a56caed2
4 changed files with 220 additions and 194 deletions

View File

@ -1,23 +1,14 @@
/// The `bigtable` subcommand
use clap::{value_t, value_t_or_exit, App, AppSettings, Arg, ArgMatches, SubCommand};
use log::*;
use solana_clap_utils::{
input_parsers::pubkey_of,
input_validators::{is_slot, is_valid_pubkey},
};
use solana_cli::display::println_transaction;
use solana_ledger::{blockstore::Blockstore, blockstore_db::AccessType};
use solana_measure::measure::Measure;
use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature};
use solana_transaction_status::UiTransactionEncoding;
use std::{collections::HashSet, path::Path, process::exit, result::Result, time::Duration};
use tokio::time::delay_for;
// Attempt to upload this many blocks in parallel
const NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL: usize = 32;
// Read up to this many blocks from blockstore before blocking on the upload process
const BLOCK_READ_AHEAD_DEPTH: usize = NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL * 2;
use std::{path::Path, process::exit, result::Result, sync::Arc};
async fn upload(
blockstore: Blockstore,
@ -25,194 +16,18 @@ async fn upload(
ending_slot: Option<Slot>,
allow_missing_metadata: bool,
) -> Result<(), Box<dyn std::error::Error>> {
let mut measure = Measure::start("entire upload");
let bigtable = solana_storage_bigtable::LedgerStorage::new(false)
.await
.map_err(|err| format!("Failed to connect to storage: {:?}", err))?;
info!("Loading ledger slots...");
let blockstore_slots: Vec<_> = blockstore
.slot_meta_iterator(starting_slot)
.map_err(|err| {
format!(
"Failed to load entries starting from slot {}: {:?}",
starting_slot, err
)
})?
.filter_map(|(slot, _slot_meta)| {
if let Some(ending_slot) = &ending_slot {
if slot > *ending_slot {
return None;
}
}
Some(slot)
})
.collect();
if blockstore_slots.is_empty() {
info!("Ledger has no slots in the specified range");
return Ok(());
}
info!(
"Found {} slots in the range ({}, {})",
blockstore_slots.len(),
blockstore_slots.first().unwrap(),
blockstore_slots.last().unwrap()
);
let mut blockstore_slots_with_no_confirmed_block = HashSet::new();
// Gather the blocks that are already present in bigtable, by slot
let bigtable_slots = {
let mut bigtable_slots = vec![];
let first_blockstore_slot = *blockstore_slots.first().unwrap();
let last_blockstore_slot = *blockstore_slots.last().unwrap();
info!(
"Loading list of bigtable blocks between slots {} and {}...",
first_blockstore_slot, last_blockstore_slot
);
let mut start_slot = *blockstore_slots.first().unwrap();
while start_slot <= last_blockstore_slot {
let mut next_bigtable_slots = loop {
match bigtable.get_confirmed_blocks(start_slot, 1000).await {
Ok(slots) => break slots,
Err(err) => {
error!("get_confirmed_blocks for {} failed: {:?}", start_slot, err);
// Consider exponential backoff...
delay_for(Duration::from_secs(2)).await;
}
}
};
if next_bigtable_slots.is_empty() {
break;
}
bigtable_slots.append(&mut next_bigtable_slots);
start_slot = bigtable_slots.last().unwrap() + 1;
}
bigtable_slots
.into_iter()
.filter(|slot| *slot <= last_blockstore_slot)
.collect::<Vec<_>>()
};
// The blocks that still need to be uploaded is the difference between what's already in the
// bigtable and what's in blockstore...
let blocks_to_upload = {
let blockstore_slots = blockstore_slots.iter().cloned().collect::<HashSet<_>>();
let bigtable_slots = bigtable_slots.into_iter().collect::<HashSet<_>>();
let mut blocks_to_upload = blockstore_slots
.difference(&blockstore_slots_with_no_confirmed_block)
.cloned()
.collect::<HashSet<_>>()
.difference(&bigtable_slots)
.cloned()
.collect::<Vec<_>>();
blocks_to_upload.sort();
blocks_to_upload
};
if blocks_to_upload.is_empty() {
info!("No blocks need to be uploaded to bigtable");
return Ok(());
}
info!(
"{} blocks to be uploaded to the bucket in the range ({}, {})",
blocks_to_upload.len(),
blocks_to_upload.first().unwrap(),
blocks_to_upload.last().unwrap()
);
// Load the blocks out of blockstore in a separate thread to allow for concurrent block uploading
let (_loader_thread, receiver) = {
let (sender, receiver) = std::sync::mpsc::sync_channel(BLOCK_READ_AHEAD_DEPTH);
(
std::thread::spawn(move || {
let mut measure = Measure::start("block loader thread");
for (i, slot) in blocks_to_upload.iter().enumerate() {
let _ = match blockstore.get_confirmed_block(
*slot,
Some(solana_transaction_status::UiTransactionEncoding::Base64),
) {
Ok(confirmed_block) => sender.send((*slot, Some(confirmed_block))),
Err(err) => {
warn!(
"Failed to get load confirmed block from slot {}: {:?}",
slot, err
);
sender.send((*slot, None))
}
};
if i % NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL == 0 {
info!(
"{}% of blocks processed ({}/{})",
i * 100 / blocks_to_upload.len(),
i,
blocks_to_upload.len()
);
}
}
measure.stop();
info!("{} to load {} blocks", measure, blocks_to_upload.len());
}),
receiver,
)
};
let mut failures = 0;
use futures::stream::StreamExt;
let mut stream =
tokio::stream::iter(receiver.into_iter()).chunks(NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL);
while let Some(blocks) = stream.next().await {
let mut measure_upload = Measure::start("Upload");
let mut num_blocks = blocks.len();
info!("Preparing the next {} blocks for upload", num_blocks);
let uploads = blocks.into_iter().filter_map(|(slot, block)| match block {
None => {
blockstore_slots_with_no_confirmed_block.insert(slot);
num_blocks -= 1;
None
}
Some(confirmed_block) => {
if confirmed_block
.transactions
.iter()
.any(|transaction| transaction.meta.is_none())
{
if allow_missing_metadata {
info!("Transaction metadata missing from slot {}", slot);
} else {
panic!("Transaction metadata missing from slot {}", slot);
}
}
Some(bigtable.upload_confirmed_block(slot, confirmed_block))
}
});
for result in futures::future::join_all(uploads).await {
if result.is_err() {
error!("upload_confirmed_block() failed: {:?}", result.err());
failures += 1;
}
}
measure_upload.stop();
info!("{} for {} blocks", measure_upload, num_blocks);
}
measure.stop();
info!("{}", measure);
if failures > 0 {
Err(format!("Incomplete upload, {} operations failed", failures).into())
} else {
Ok(())
}
solana_ledger::bigtable_upload::upload_confirmed_blocks(
Arc::new(blockstore),
bigtable,
starting_slot,
ending_slot,
allow_missing_metadata,
)
.await
}
async fn first_available_block() -> Result<(), Box<dyn std::error::Error>> {

View File

@ -17,6 +17,8 @@ dlopen_derive = "0.1.4"
dlopen = "0.1.8"
ed25519-dalek = "1.0.0-pre.4"
fs_extra = "1.1.0"
futures = "0.3.5"
futures-util = "0.3.5"
itertools = "0.9.0"
lazy_static = "1.4.0"
libc = "0.2.72"
@ -40,9 +42,11 @@ solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "1.4.0" }
solana-runtime = { path = "../runtime", version = "1.4.0" }
solana-sdk = { path = "../sdk", version = "1.4.0" }
solana-stake-program = { path = "../programs/stake", version = "1.4.0" }
solana-storage-bigtable = { path = "../storage-bigtable", version = "1.4.0" }
solana-vote-program = { path = "../programs/vote", version = "1.4.0" }
tempfile = "3.1.0"
thiserror = "1.0"
tokio = { version = "0.2.22", features = ["full"] }
trees = "0.2.1"
[dependencies.rocksdb]

View File

@ -0,0 +1,206 @@
use crate::blockstore::Blockstore;
use log::*;
use solana_measure::measure::Measure;
use solana_sdk::clock::Slot;
use std::{collections::HashSet, result::Result, sync::Arc, time::Duration};
use tokio::time::delay_for;
// Attempt to upload this many blocks in parallel
const NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL: usize = 32;
// Read up to this many blocks from blockstore before blocking on the upload process
const BLOCK_READ_AHEAD_DEPTH: usize = NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL * 2;
pub async fn upload_confirmed_blocks(
blockstore: Arc<Blockstore>,
bigtable: solana_storage_bigtable::LedgerStorage,
starting_slot: Slot,
ending_slot: Option<Slot>,
allow_missing_metadata: bool,
) -> Result<(), Box<dyn std::error::Error>> {
let mut measure = Measure::start("entire upload");
info!("Loading ledger slots...");
let blockstore_slots: Vec<_> = blockstore
.slot_meta_iterator(starting_slot)
.map_err(|err| {
format!(
"Failed to load entries starting from slot {}: {:?}",
starting_slot, err
)
})?
.filter_map(|(slot, _slot_meta)| {
if let Some(ending_slot) = &ending_slot {
if slot > *ending_slot {
return None;
}
}
Some(slot)
})
.collect();
if blockstore_slots.is_empty() {
info!("Ledger has no slots in the specified range");
return Ok(());
}
info!(
"Found {} slots in the range ({}, {})",
blockstore_slots.len(),
blockstore_slots.first().unwrap(),
blockstore_slots.last().unwrap()
);
let mut blockstore_slots_with_no_confirmed_block = HashSet::new();
// Gather the blocks that are already present in bigtable, by slot
let bigtable_slots = {
let mut bigtable_slots = vec![];
let first_blockstore_slot = *blockstore_slots.first().unwrap();
let last_blockstore_slot = *blockstore_slots.last().unwrap();
info!(
"Loading list of bigtable blocks between slots {} and {}...",
first_blockstore_slot, last_blockstore_slot
);
let mut start_slot = *blockstore_slots.first().unwrap();
while start_slot <= last_blockstore_slot {
let mut next_bigtable_slots = loop {
match bigtable.get_confirmed_blocks(start_slot, 1000).await {
Ok(slots) => break slots,
Err(err) => {
error!("get_confirmed_blocks for {} failed: {:?}", start_slot, err);
// Consider exponential backoff...
delay_for(Duration::from_secs(2)).await;
}
}
};
if next_bigtable_slots.is_empty() {
break;
}
bigtable_slots.append(&mut next_bigtable_slots);
start_slot = bigtable_slots.last().unwrap() + 1;
}
bigtable_slots
.into_iter()
.filter(|slot| *slot <= last_blockstore_slot)
.collect::<Vec<_>>()
};
// The blocks that still need to be uploaded is the difference between what's already in the
// bigtable and what's in blockstore...
let blocks_to_upload = {
let blockstore_slots = blockstore_slots.iter().cloned().collect::<HashSet<_>>();
let bigtable_slots = bigtable_slots.into_iter().collect::<HashSet<_>>();
let mut blocks_to_upload = blockstore_slots
.difference(&blockstore_slots_with_no_confirmed_block)
.cloned()
.collect::<HashSet<_>>()
.difference(&bigtable_slots)
.cloned()
.collect::<Vec<_>>();
blocks_to_upload.sort();
blocks_to_upload
};
if blocks_to_upload.is_empty() {
info!("No blocks need to be uploaded to bigtable");
return Ok(());
}
info!(
"{} blocks to be uploaded to the bucket in the range ({}, {})",
blocks_to_upload.len(),
blocks_to_upload.first().unwrap(),
blocks_to_upload.last().unwrap()
);
// Load the blocks out of blockstore in a separate thread to allow for concurrent block uploading
let (_loader_thread, receiver) = {
let (sender, receiver) = std::sync::mpsc::sync_channel(BLOCK_READ_AHEAD_DEPTH);
(
std::thread::spawn(move || {
let mut measure = Measure::start("block loader thread");
for (i, slot) in blocks_to_upload.iter().enumerate() {
let _ = match blockstore.get_confirmed_block(
*slot,
Some(solana_transaction_status::UiTransactionEncoding::Base64),
) {
Ok(confirmed_block) => sender.send((*slot, Some(confirmed_block))),
Err(err) => {
warn!(
"Failed to get load confirmed block from slot {}: {:?}",
slot, err
);
sender.send((*slot, None))
}
};
if i % NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL == 0 {
info!(
"{}% of blocks processed ({}/{})",
i * 100 / blocks_to_upload.len(),
i,
blocks_to_upload.len()
);
}
}
measure.stop();
info!("{} to load {} blocks", measure, blocks_to_upload.len());
}),
receiver,
)
};
let mut failures = 0;
use futures::stream::StreamExt;
let mut stream =
tokio::stream::iter(receiver.into_iter()).chunks(NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL);
while let Some(blocks) = stream.next().await {
let mut measure_upload = Measure::start("Upload");
let mut num_blocks = blocks.len();
info!("Preparing the next {} blocks for upload", num_blocks);
let uploads = blocks.into_iter().filter_map(|(slot, block)| match block {
None => {
blockstore_slots_with_no_confirmed_block.insert(slot);
num_blocks -= 1;
None
}
Some(confirmed_block) => {
if confirmed_block
.transactions
.iter()
.any(|transaction| transaction.meta.is_none())
{
if allow_missing_metadata {
info!("Transaction metadata missing from slot {}", slot);
} else {
panic!("Transaction metadata missing from slot {}", slot);
}
}
Some(bigtable.upload_confirmed_block(slot, confirmed_block))
}
});
for result in futures::future::join_all(uploads).await {
if result.is_err() {
error!("upload_confirmed_block() failed: {:?}", result.err());
failures += 1;
}
}
measure_upload.stop();
info!("{} for {} blocks", measure_upload, num_blocks);
}
measure.stop();
info!("{}", measure);
if failures > 0 {
Err(format!("Incomplete upload, {} operations failed", failures).into())
} else {
Ok(())
}
}

View File

@ -1,4 +1,5 @@
pub mod bank_forks_utils;
pub mod bigtable_upload;
pub mod block_error;
#[macro_use]
pub mod blockstore;