add solana-ledger-tool bigtable copy (#28122)

* init copy cmd

* extract creating emulator connection logic

* extract copy args as struct

* add new_for_emulator

* add TryFrom&lt;ConfirmedBlock&gt; for VersionedConfirmedBlock

* implement bigtable copy command

* use 'force' flag to force upload

* use unwrap_or

* remove redundant imports

* fix nightly lint

* add an explicit transactions-missing error

* process ending_slot

* prevent start slot > end slot

* log skipped slots at debug level

* fix: destination Bigtable must not be read-only

* combine is-emulator and endpoint into emulated-source; conflicts with credential path

* wording

* log some error messages with error level

* nightly lint

* add dry-run

* extract create bigtable instances logic

* use a lighter way to check block existence

* use the futures version already used in the repo

* use futures = "0.3"

Co-authored-by: Tyera <teulberg@gmail.com>

* wording

Co-authored-by: Tyera <teulberg@gmail.com>

* wording

Co-authored-by: Tyera <teulberg@gmail.com>

Co-authored-by: Tyera <teulberg@gmail.com>
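For context, the new subcommand is invoked along the lines of `solana-ledger-tool bigtable copy --starting-slot 100 --ending-slot 200 --emulated-destination 127.0.0.1:8086`; the slot values and endpoint here are illustrative, while the flags are the ones defined in the diff below.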
Author: Yihau Chen, 2023-01-09 11:23:35 +08:00 (committed by GitHub)
parent 677b6d6458
commit 94cb88ffad
6 changed files with 446 additions and 13 deletions

Cargo.lock (generated, 2 changed lines)

@@ -5665,9 +5665,11 @@ dependencies = [
"crossbeam-channel",
"csv",
"dashmap 4.0.2",
"futures 0.3.24",
"histogram",
"itertools",
"log",
"num_cpus",
"regex",
"serde",
"serde_json",


@@ -16,9 +16,11 @@ clap = "2.33.1"
crossbeam-channel = "0.5"
csv = "1.1.6"
dashmap = "4.0.2"
futures = "0.3"
histogram = "0.6.9"
itertools = "0.10.5"
log = { version = "0.4.17" }
num_cpus = "1.13.1"
regex = "1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.83"


@@ -4,7 +4,9 @@ use {
clap::{
value_t, value_t_or_exit, values_t_or_exit, App, AppSettings, Arg, ArgMatches, SubCommand,
},
-log::info,
+crossbeam_channel::unbounded,
+futures::stream::FuturesUnordered,
+log::{debug, error, info},
serde_json::json,
solana_clap_utils::{
input_parsers::pubkey_of,
@@ -23,7 +25,7 @@ use {
solana_storage_bigtable::CredentialType,
solana_transaction_status::{
BlockEncodingOptions, ConfirmedBlock, EncodeError, TransactionDetails,
-UiTransactionEncoding,
+UiTransactionEncoding, VersionedConfirmedBlock,
},
std::{
cmp::min,
@@ -32,7 +34,7 @@ use {
process::exit,
result::Result,
str::FromStr,
-sync::{atomic::AtomicBool, Arc},
+sync::{atomic::AtomicBool, Arc, Mutex},
},
};
@@ -342,6 +344,254 @@ pub async fn transaction_history(
Ok(())
}
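/// Arguments for the `copy` subcommand, parsed from the clap matches.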
struct CopyArgs {
from_slot: Slot,
to_slot: Option<Slot>,
source_instance_name: String,
source_app_profile_id: String,
emulated_source: Option<String>,
source_credential_path: Option<String>,
destination_instance_name: String,
destination_app_profile_id: String,
emulated_destination: Option<String>,
destination_credential_path: Option<String>,
force: bool,
dry_run: bool,
}
impl CopyArgs {
pub fn process(arg_matches: &ArgMatches) -> Self {
CopyArgs {
from_slot: value_t!(arg_matches, "starting_slot", Slot).unwrap_or(0),
to_slot: value_t!(arg_matches, "ending_slot", Slot).ok(),
source_instance_name: value_t_or_exit!(arg_matches, "source_instance_name", String),
source_app_profile_id: value_t_or_exit!(arg_matches, "source_app_profile_id", String),
source_credential_path: value_t!(arg_matches, "source_credential_path", String).ok(),
emulated_source: value_t!(arg_matches, "emulated_source", String).ok(),
destination_instance_name: value_t_or_exit!(
arg_matches,
"destination_instance_name",
String
),
destination_app_profile_id: value_t_or_exit!(
arg_matches,
"destination_app_profile_id",
String
),
destination_credential_path: value_t!(
arg_matches,
"destination_credential_path",
String
)
.ok(),
emulated_destination: value_t!(arg_matches, "emulated_destination", String).ok(),
force: arg_matches.is_present("force"),
dry_run: arg_matches.is_present("dry_run"),
}
}
}
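// Copy flow: validate the slot range, open the source (read-only) and
// destination connections, then fan the slots out to a pool of async workers.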
async fn copy(args: CopyArgs) -> Result<(), Box<dyn std::error::Error>> {
let from_slot = args.from_slot;
let to_slot = args.to_slot.unwrap_or(from_slot);
debug!("from_slot: {}, to_slot: {}", from_slot, to_slot);
if from_slot > to_slot {
return Err("starting slot should be less than or equal to ending slot")?;
}
let source_bigtable = get_bigtable(GetBigtableArgs {
read_only: true,
instance_name: args.source_instance_name,
app_profile_id: args.source_app_profile_id,
timeout: None,
emulated_source: args.emulated_source,
credential_path: args.source_credential_path,
})
.await?;
let destination_bigtable = get_bigtable(GetBigtableArgs {
read_only: false,
instance_name: args.destination_instance_name,
app_profile_id: args.destination_app_profile_id,
timeout: None,
emulated_source: args.emulated_destination,
credential_path: args.destination_credential_path,
})
.await?;
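// Pre-fill an unbounded channel with every slot in the range; each worker
// drains it with try_recv() and exits once the queue is empty.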
let (s, r) = unbounded::<u64>();
for i in from_slot..=to_slot {
s.send(i).unwrap();
}
let workers = min(to_slot - from_slot + 1, num_cpus::get().try_into().unwrap());
debug!("worker num: {}", workers);
let success_slots = Arc::new(Mutex::new(vec![]));
let skip_slots = Arc::new(Mutex::new(vec![]));
let block_not_found_slots = Arc::new(Mutex::new(vec![]));
let failed_slots = Arc::new(Mutex::new(vec![]));
let tasks = (0..workers)
.map(|i| {
let r = r.clone();
let source_bigtable_clone = source_bigtable.clone();
let destination_bigtable_clone = destination_bigtable.clone();
let success_slots_clone = Arc::clone(&success_slots);
let skip_slots_clone = Arc::clone(&skip_slots);
let block_not_found_slots_clone = Arc::clone(&block_not_found_slots);
let failed_slots_clone = Arc::clone(&failed_slots);
tokio::spawn(async move {
while let Ok(slot) = r.try_recv() {
debug!("worker {}: received slot {}", i, slot);
if !args.force {
match destination_bigtable_clone.confirmed_block_exists(slot).await {
Ok(exist) => {
if exist {
skip_slots_clone.lock().unwrap().push(slot);
continue;
}
}
Err(err) => {
error!("confirmed_block_exists() failed from the destination Bigtable, slot: {}, err: {}", slot, err);
failed_slots_clone.lock().unwrap().push(slot);
continue;
}
};
}
if args.dry_run {
match source_bigtable_clone.confirmed_block_exists(slot).await {
Ok(exist) => {
if exist {
debug!("will write block: {}", slot);
success_slots_clone.lock().unwrap().push(slot);
} else {
debug!("block not found, slot: {}", slot);
block_not_found_slots_clone.lock().unwrap().push(slot);
continue;
}
}
Err(err) => {
error!("failed to get a confirmed block from the source Bigtable, slot: {}, err: {}", slot, err);
failed_slots_clone.lock().unwrap().push(slot);
continue;
}
};
} else {
let confirmed_block =
match source_bigtable_clone.get_confirmed_block(slot).await {
Ok(block) => match VersionedConfirmedBlock::try_from(block) {
Ok(block) => block,
Err(err) => {
error!("failed to convert confirmed block to versioned confirmed block, slot: {}, err: {}", slot, err);
failed_slots_clone.lock().unwrap().push(slot);
continue;
}
},
Err(solana_storage_bigtable::Error::BlockNotFound(slot)) => {
debug!("block not found, slot: {}", slot);
block_not_found_slots_clone.lock().unwrap().push(slot);
continue;
}
Err(err) => {
error!("failed to get confirmed block, slot: {}, err: {}", slot, err);
failed_slots_clone.lock().unwrap().push(slot);
continue;
}
};
match destination_bigtable_clone
.upload_confirmed_block(slot, confirmed_block)
.await
{
Ok(()) => {
debug!("wrote block: {}", slot);
success_slots_clone.lock().unwrap().push(slot);
}
Err(err) => {
error!("write failed, slot: {}, err: {}", slot, err);
failed_slots_clone.lock().unwrap().push(slot);
continue;
}
}
}
}
debug!("worker {}: exit", i);
})
})
.collect::<FuturesUnordered<_>>();
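// Wait for all worker tasks to complete before aggregating results.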
futures::future::join_all(tasks).await;
let mut success_slots = success_slots.lock().unwrap();
success_slots.sort();
let mut skip_slots = skip_slots.lock().unwrap();
skip_slots.sort();
let mut block_not_found_slots = block_not_found_slots.lock().unwrap();
block_not_found_slots.sort();
let mut failed_slots = failed_slots.lock().unwrap();
failed_slots.sort();
debug!("success slots: {:?}", success_slots);
debug!("skip slots: {:?}", skip_slots);
debug!("blocks not found slots: {:?}", block_not_found_slots);
debug!("failed slots: {:?}", failed_slots);
println!(
"success: {}, skip: {}, block not found: {}, failed: {}",
success_slots.len(),
skip_slots.len(),
block_not_found_slots.len(),
failed_slots.len(),
);
Ok(())
}
struct GetBigtableArgs {
read_only: bool,
instance_name: String,
app_profile_id: String,
timeout: Option<std::time::Duration>,
emulated_source: Option<String>,
credential_path: Option<String>,
}
async fn get_bigtable(
args: GetBigtableArgs,
) -> solana_storage_bigtable::Result<solana_storage_bigtable::LedgerStorage> {
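// An emulator endpoint takes precedence; otherwise authenticate against real
// Bigtable with the provided credential file.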
if let Some(endpoint) = args.emulated_source {
solana_storage_bigtable::LedgerStorage::new_for_emulator(
&args.instance_name,
&args.app_profile_id,
&endpoint,
args.timeout,
)
} else {
solana_storage_bigtable::LedgerStorage::new_with_config(
solana_storage_bigtable::LedgerStorageConfig {
read_only: args.read_only,
timeout: args.timeout,
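// Note: this unwrap() panics if neither an emulator endpoint nor a
// credential path was supplied.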
credential_type: CredentialType::Filepath(Some(args.credential_path.unwrap())),
instance_name: args.instance_name,
app_profile_id: args.app_profile_id,
},
)
.await
}
}
pub trait BigTableSubCommand {
fn bigtable_subcommand(self) -> Self;
}
@@ -590,6 +840,119 @@ impl BigTableSubCommand for App<'_, '_> {
.takes_value(false)
.help("Display the full transactions"),
),
)
.subcommand(
SubCommand::with_name("copy")
.about("Copy blocks from a Bigtable to another Bigtable")
.arg(
Arg::with_name("source_credential_path")
.long("source-credential-path")
.value_name("SOURCE_CREDENTIAL_PATH")
.takes_value(true)
.conflicts_with("emulated_source")
.help(
"Source Bigtable credential filepath (credential may be readonly)",
),
)
.arg(
Arg::with_name("emulated_source")
.long("emulated-source")
.value_name("EMULATED_SOURCE")
.takes_value(true)
.conflicts_with("source_credential_path")
.help(
"Source Bigtable emulated source",
),
)
.arg(
Arg::with_name("source_instance_name")
.long("source-instance-name")
.takes_value(true)
.value_name("SOURCE_INSTANCE_NAME")
.default_value(solana_storage_bigtable::DEFAULT_INSTANCE_NAME)
.help("Source Bigtable instance name")
)
.arg(
Arg::with_name("source_app_profile_id")
.long("source-app-profile-id")
.takes_value(true)
.value_name("SOURCE_APP_PROFILE_ID")
.default_value(solana_storage_bigtable::DEFAULT_APP_PROFILE_ID)
.help("Source Bigtable app profile id")
)
.arg(
Arg::with_name("destination_credential_path")
.long("destination-credential-path")
.value_name("DESTINATION_CREDENTIAL_PATH")
.takes_value(true)
.conflicts_with("emulated_destination")
.help(
"Destination Bigtable credential filepath (credential must have Bigtable write permissions)",
),
)
.arg(
Arg::with_name("emulated_destination")
.long("emulated-destination")
.value_name("EMULATED_DESTINATION")
.takes_value(true)
.conflicts_with("destination_credential_path")
.help(
"Destination Bigtable emulated destination",
),
)
.arg(
Arg::with_name("destination_instance_name")
.long("destination-instance-name")
.takes_value(true)
.value_name("DESTINATION_INSTANCE_NAME")
.default_value(solana_storage_bigtable::DEFAULT_INSTANCE_NAME)
.help("Destination Bigtable instance name")
)
.arg(
Arg::with_name("destination_app_profile_id")
.long("destination-app-profile-id")
.takes_value(true)
.value_name("DESTINATION_APP_PROFILE_ID")
.default_value(solana_storage_bigtable::DEFAULT_APP_PROFILE_ID)
.help("Destination Bigtable app profile id")
)
.arg(
Arg::with_name("starting_slot")
.long("starting-slot")
.validator(is_slot)
.value_name("START_SLOT")
.takes_value(true)
.required(true)
.help(
"Start copying at this slot",
),
)
.arg(
Arg::with_name("ending_slot")
.long("ending-slot")
.validator(is_slot)
.value_name("END_SLOT")
.takes_value(true)
.help("Stop copying at this slot (inclusive, START_SLOT ..= END_SLOT)"),
)
.arg(
Arg::with_name("force")
.long("force")
.value_name("FORCE")
.takes_value(false)
.help(
"Force copy of blocks already present in destination Bigtable instance",
),
)
.arg(
Arg::with_name("dry_run")
.long("dry-run")
.value_name("DRY_RUN")
.takes_value(false)
.help(
"Dry run. It won't upload any blocks",
),
)
),
)
}
@@ -788,6 +1151,7 @@ pub fn bigtable_process_command(
config,
))
}
("copy", Some(arg_matches)) => runtime.block_on(copy(CopyArgs::process(arg_matches))),
_ => unreachable!(),
};


@@ -136,16 +136,7 @@ impl BigTableConnection {
match std::env::var("BIGTABLE_EMULATOR_HOST") {
Ok(endpoint) => {
info!("Connecting to bigtable emulator at {}", endpoint);
-Ok(Self {
-    access_token: None,
-    channel: tonic::transport::Channel::from_shared(format!("http://{endpoint}"))
-        .map_err(|err| Error::InvalidUri(endpoint, err.to_string()))?
-        .connect_lazy(),
-    table_prefix: format!("projects/emulator/instances/{instance_name}/tables/"),
-    app_profile_id: app_profile_id.to_string(),
-    timeout,
-})
+Self::new_for_emulator(instance_name, app_profile_id, &endpoint, timeout)
}
Err(_) => {
@@ -215,6 +206,23 @@ impl BigTableConnection {
}
}
pub fn new_for_emulator(
instance_name: &str,
app_profile_id: &str,
endpoint: &str,
timeout: Option<Duration>,
) -> Result<Self> {
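// The emulator needs no credentials: connect lazily over plain HTTP and use
// the fixed "emulator" project in the table prefix.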
Ok(Self {
access_token: None,
channel: tonic::transport::Channel::from_shared(format!("http://{endpoint}"))
.map_err(|err| Error::InvalidUri(String::from(endpoint), err.to_string()))?
.connect_lazy(),
table_prefix: format!("projects/emulator/instances/{instance_name}/tables/"),
app_profile_id: app_profile_id.to_string(),
timeout,
})
}
/// Create a new BigTable client.
///
/// Clients require `&mut self`, due to `Tonic::transport::Channel` limitations, however


@@ -24,6 +24,7 @@ use {
std::{
collections::{HashMap, HashSet},
convert::TryInto,
time::Duration,
},
thiserror::Error,
tokio::task::JoinError,
@@ -418,6 +419,22 @@ impl LedgerStorage {
.await
}
pub fn new_for_emulator(
instance_name: &str,
app_profile_id: &str,
endpoint: &str,
timeout: Option<Duration>,
) -> Result<Self> {
Ok(Self {
connection: bigtable::BigTableConnection::new_for_emulator(
instance_name,
app_profile_id,
endpoint,
timeout,
)?,
})
}
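// A minimal sketch of exercising this constructor against a local emulator;
// the instance name, app profile id, endpoint, and slot are hypothetical, and
// confirmed_block_exists is the accessor the copy command uses:
//
//     let storage = LedgerStorage::new_for_emulator(
//         "solana-ledger",  // instance name (hypothetical)
//         "default",        // app profile id (hypothetical)
//         "127.0.0.1:8086", // emulator endpoint
//         None,             // no timeout
//     )?;
//     let exists = storage.confirmed_block_exists(slot).await?;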
pub async fn new_with_config(config: LedgerStorageConfig) -> Result<Self> {
let LedgerStorageConfig {
read_only,


@@ -604,6 +604,12 @@ pub struct Reward {
pub type Rewards = Vec<Reward>;
#[derive(Debug, Error)]
pub enum ConvertBlockError {
#[error("transactions missing after converted, before: {0}, after: {1}")]
TransactionsMissing(usize, usize),
}
#[derive(Clone, Debug, PartialEq)]
pub struct ConfirmedBlock {
pub previous_blockhash: String,
@@ -646,6 +652,40 @@ impl From<VersionedConfirmedBlock> for ConfirmedBlock {
}
}
impl TryFrom<ConfirmedBlock> for VersionedConfirmedBlock {
type Error = ConvertBlockError;
fn try_from(block: ConfirmedBlock) -> Result<Self, Self::Error> {
let expected_transaction_count = block.transactions.len();
let txs: Vec<_> = block
.transactions
.into_iter()
.filter_map(|tx| match tx {
TransactionWithStatusMeta::MissingMetadata(_) => None,
TransactionWithStatusMeta::Complete(tx) => Some(tx),
})
.collect();
if txs.len() != expected_transaction_count {
return Err(ConvertBlockError::TransactionsMissing(
expected_transaction_count,
txs.len(),
));
}
Ok(Self {
previous_blockhash: block.previous_blockhash,
blockhash: block.blockhash,
parent_slot: block.parent_slot,
transactions: txs,
rewards: block.rewards,
block_time: block.block_time,
block_height: block.block_height,
})
}
}
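// The conversion fails if any transaction lacks status metadata. A sketch of
// the call pattern the copy command uses (the slot and bigtable handle are
// illustrative):
//
//     let block = source_bigtable.get_confirmed_block(slot).await?;
//     let versioned = VersionedConfirmedBlock::try_from(block)?;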
impl ConfirmedBlock {
pub fn encode_with_options(
self,