solana-tokens: optimize PickleDb dumps (#13879)

* Dump PickleDb after transaction sends/confirmation

* Dump PickleDb on ctrlc

* Don't exit during tests

* Add build_messages helper and test db dump

* Add send_messages helper and test db dump

* Add combined test

* Add log_transaction_confirmations helper and test db dump

* Add update_finalized_transactions test

* Return error instead of process::exit

* Close TestValidator
This commit is contained in:
Tyera Eulberg 2020-12-01 19:22:27 -07:00 committed by GitHub
parent 0a8bc347a1
commit 8c40dd34b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 566 additions and 34 deletions

1
Cargo.lock generated
View File

@ -4985,6 +4985,7 @@ dependencies = [
"clap",
"console",
"csv",
"ctrlc",
"dirs-next",
"indexmap",
"indicatif",

View File

@ -13,6 +13,7 @@ chrono = { version = "0.4", features = ["serde"] }
clap = "2.33.0"
console = "0.11.3"
csv = "1.1.3"
ctrlc = { version = "3.1.5", features = ["termination"] }
dirs-next = "2.0.0"
indexmap = "1.5.1"
indicatif = "0.15.0"

View File

@ -12,9 +12,7 @@ use solana_clap_utils::{
use solana_cli_config::CONFIG_FILE;
use solana_remote_wallet::remote_wallet::maybe_wallet_manager;
use solana_sdk::native_token::sol_to_lamports;
use std::error::Error;
use std::ffi::OsString;
use std::process::exit;
use std::{error::Error, ffi::OsString, process::exit};
fn get_matches<'a, I, T>(args: I) -> ArgMatches<'a>
where

View File

@ -21,6 +21,7 @@ use solana_client::{
rpc_request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS,
};
use solana_sdk::{
clock::Slot,
commitment_config::CommitmentConfig,
instruction::Instruction,
message::Message,
@ -33,11 +34,16 @@ use solana_stake_program::{
stake_instruction::{self, LockupArgs},
stake_state::{Authorized, Lockup, StakeAuthorize},
};
use solana_transaction_status::TransactionStatus;
use spl_associated_token_account_v1_0::get_associated_token_address;
use spl_token_v2_0::solana_program::program_error::ProgramError;
use std::{
cmp::{self},
io,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::sleep,
time::Duration,
};
@ -83,6 +89,8 @@ impl From<Vec<FundingSource>> for FundingSources {
}
}
type StakeExtras = Vec<(Keypair, Option<DateTime<Utc>>)>;
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("I/O error")]
@ -99,6 +107,8 @@ pub enum Error {
InsufficientFunds(FundingSources, f64),
#[error("Program error")]
ProgramError(#[from] ProgramError),
#[error("Exit signal received")]
ExitSignal,
}
fn merge_allocations(allocations: &[Allocation]) -> Vec<Allocation> {
@ -239,17 +249,21 @@ fn distribution_instructions(
instructions
}
fn distribute_allocations(
fn build_messages(
client: &RpcClient,
db: &mut PickleDb,
allocations: &[Allocation],
args: &DistributeTokensArgs,
exit: Arc<AtomicBool>,
messages: &mut Vec<Message>,
stake_extras: &mut StakeExtras,
created_accounts: &mut u64,
) -> Result<(), Error> {
type StakeExtras = Vec<(Keypair, Option<DateTime<Utc>>)>;
let mut messages: Vec<Message> = vec![];
let mut stake_extras: StakeExtras = vec![];
let mut created_accounts = 0;
for allocation in allocations.iter() {
if exit.load(Ordering::SeqCst) {
db.dump()?;
return Err(Error::ExitSignal);
}
let new_stake_account_keypair = Keypair::new();
let lockup_date = if allocation.lockup_date == "" {
None
@ -270,7 +284,7 @@ fn distribute_allocations(
)])?[0]
.is_none();
if do_create_associated_token_account {
created_accounts += 1;
*created_accounts += 1;
}
(
token_amount_to_ui_amount(allocation.amount, spl_token_args.decimals).ui_amount,
@ -296,20 +310,25 @@ fn distribute_allocations(
messages.push(message);
stake_extras.push((new_stake_account_keypair, lockup_date));
}
Ok(())
}
let num_signatures = messages
.iter()
.map(|message| message.header.num_required_signatures as usize)
.sum();
if args.spl_token_args.is_some() {
check_spl_token_balances(num_signatures, allocations, client, args, created_accounts)?;
} else {
check_payer_balances(num_signatures, allocations, client, args)?;
}
fn send_messages(
client: &RpcClient,
db: &mut PickleDb,
allocations: &[Allocation],
args: &DistributeTokensArgs,
exit: Arc<AtomicBool>,
messages: Vec<Message>,
stake_extras: StakeExtras,
) -> Result<(), Error> {
for ((allocation, message), (new_stake_account_keypair, lockup_date)) in
allocations.iter().zip(messages).zip(stake_extras)
{
if exit.load(Ordering::SeqCst) {
db.dump()?;
return Err(Error::ExitSignal);
}
let new_stake_account_address = new_stake_account_keypair.pubkey();
let mut signers = vec![&*args.fee_payer, &*args.sender_keypair];
@ -363,6 +382,44 @@ fn distribute_allocations(
Ok(())
}
fn distribute_allocations(
client: &RpcClient,
db: &mut PickleDb,
allocations: &[Allocation],
args: &DistributeTokensArgs,
exit: Arc<AtomicBool>,
) -> Result<(), Error> {
let mut messages: Vec<Message> = vec![];
let mut stake_extras: StakeExtras = vec![];
let mut created_accounts = 0;
build_messages(
client,
db,
allocations,
args,
exit.clone(),
&mut messages,
&mut stake_extras,
&mut created_accounts,
)?;
let num_signatures = messages
.iter()
.map(|message| message.header.num_required_signatures as usize)
.sum();
if args.spl_token_args.is_some() {
check_spl_token_balances(num_signatures, allocations, client, args, created_accounts)?;
} else {
check_payer_balances(num_signatures, allocations, client, args)?;
}
send_messages(client, db, allocations, args, exit, messages, stake_extras)?;
db.dump()?;
Ok(())
}
fn read_allocations(
input_csv: &str,
transfer_amount: Option<u64>,
@ -437,6 +494,7 @@ fn new_spinner_progress_bar() -> ProgressBar {
pub fn process_allocations(
client: &RpcClient,
args: &DistributeTokensArgs,
exit: Arc<AtomicBool>,
) -> Result<Option<usize>, Error> {
let require_lockup_heading = args.stake_args.is_some();
let mut allocations: Vec<Allocation> = read_allocations(
@ -461,7 +519,7 @@ pub fn process_allocations(
let mut db = db::open_db(&args.transaction_db, args.dry_run)?;
// Start by finalizing any transactions from the previous run.
let confirmations = finalize_transactions(client, &mut db, args.dry_run)?;
let confirmations = finalize_transactions(client, &mut db, args.dry_run, exit.clone())?;
let transaction_infos = db::read_transaction_infos(&db);
apply_previous_transactions(&mut allocations, &transaction_infos);
@ -502,9 +560,9 @@ pub fn process_allocations(
style(format!("{:<44} {:>24}", "Recipient", "Expected Balance",)).bold()
);
distribute_allocations(client, &mut db, &allocations, args)?;
distribute_allocations(client, &mut db, &allocations, args, exit.clone())?;
let opt_confirmations = finalize_transactions(client, &mut db, args.dry_run)?;
let opt_confirmations = finalize_transactions(client, &mut db, args.dry_run, exit)?;
if !args.dry_run {
if let Some(output_path) = &args.output_path {
@ -519,12 +577,13 @@ fn finalize_transactions(
client: &RpcClient,
db: &mut PickleDb,
dry_run: bool,
exit: Arc<AtomicBool>,
) -> Result<Option<usize>, Error> {
if dry_run {
return Ok(None);
}
let mut opt_confirmations = update_finalized_transactions(client, db)?;
let mut opt_confirmations = update_finalized_transactions(client, db, exit.clone())?;
let progress_bar = new_spinner_progress_bar();
@ -538,7 +597,7 @@ fn finalize_transactions(
// Sleep for about 1 slot
sleep(Duration::from_millis(500));
let opt_conf = update_finalized_transactions(client, db)?;
let opt_conf = update_finalized_transactions(client, db, exit.clone())?;
opt_confirmations = opt_conf;
}
@ -550,6 +609,7 @@ fn finalize_transactions(
fn update_finalized_transactions(
client: &RpcClient,
db: &mut PickleDb,
exit: Arc<AtomicBool>,
) -> Result<Option<usize>, Error> {
let transaction_infos = db::read_transaction_infos(db);
let unconfirmed_transactions: Vec<_> = transaction_infos
@ -578,9 +638,29 @@ fn update_finalized_transactions(
.into_iter(),
);
}
let root_slot = client.get_slot()?;
let mut confirmations = None;
log_transaction_confirmations(
client,
db,
exit,
unconfirmed_transactions,
statuses,
&mut confirmations,
)?;
db.dump()?;
Ok(confirmations)
}
fn log_transaction_confirmations(
client: &RpcClient,
db: &mut PickleDb,
exit: Arc<AtomicBool>,
unconfirmed_transactions: Vec<(&Transaction, Slot)>,
statuses: Vec<Option<TransactionStatus>>,
confirmations: &mut Option<usize>,
) -> Result<(), Error> {
let root_slot = client.get_slot()?;
for ((transaction, last_valid_slot), opt_transaction_status) in unconfirmed_transactions
.into_iter()
.zip(statuses.into_iter())
@ -593,14 +673,18 @@ fn update_finalized_transactions(
root_slot,
) {
Ok(Some(confs)) => {
confirmations = Some(cmp::min(confs, confirmations.unwrap_or(usize::MAX)));
*confirmations = Some(cmp::min(confs, confirmations.unwrap_or(usize::MAX)));
}
result => {
result?;
}
}
if exit.load(Ordering::SeqCst) {
db.dump()?;
return Err(Error::ExitSignal);
}
Ok(confirmations)
}
Ok(())
}
fn check_payer_balances(
@ -742,6 +826,7 @@ pub fn test_process_distribute_tokens_with_client(
sender_keypair: Keypair,
transfer_amount: Option<u64>,
) {
let exit = Arc::new(AtomicBool::default());
let fee_payer = Keypair::new();
let transaction = transfer(
client,
@ -797,7 +882,7 @@ pub fn test_process_distribute_tokens_with_client(
spl_token_args: None,
transfer_amount,
};
let confirmations = process_allocations(client, &args).unwrap();
let confirmations = process_allocations(client, &args, exit.clone()).unwrap();
assert_eq!(confirmations, None);
let transaction_infos =
@ -811,7 +896,7 @@ pub fn test_process_distribute_tokens_with_client(
check_output_file(&output_path, &db::open_db(&transaction_db, true).unwrap());
// Now, run it again, and check there's no double-spend.
process_allocations(client, &args).unwrap();
process_allocations(client, &args, exit).unwrap();
let transaction_infos =
db::read_transaction_infos(&db::open_db(&transaction_db, true).unwrap());
assert_eq!(transaction_infos.len(), 1);
@ -824,6 +909,7 @@ pub fn test_process_distribute_tokens_with_client(
}
pub fn test_process_distribute_stake_with_client(client: &RpcClient, sender_keypair: Keypair) {
let exit = Arc::new(AtomicBool::default());
let fee_payer = Keypair::new();
let transaction = transfer(
client,
@ -905,7 +991,7 @@ pub fn test_process_distribute_stake_with_client(client: &RpcClient, sender_keyp
sender_keypair: Box::new(sender_keypair),
transfer_amount: None,
};
let confirmations = process_allocations(client, &args).unwrap();
let confirmations = process_allocations(client, &args, exit.clone()).unwrap();
assert_eq!(confirmations, None);
let transaction_infos =
@ -927,7 +1013,7 @@ pub fn test_process_distribute_stake_with_client(client: &RpcClient, sender_keyp
check_output_file(&output_path, &db::open_db(&transaction_db, true).unwrap());
// Now, run it again, and check there's no double-spend.
process_allocations(client, &args).unwrap();
process_allocations(client, &args, exit).unwrap();
let transaction_infos =
db::read_transaction_infos(&db::open_db(&transaction_db, true).unwrap());
assert_eq!(transaction_infos.len(), 1);
@ -1696,4 +1782,433 @@ mod tests {
test_validator.close();
}
#[test]
fn test_build_messages_dump_db() {
let client = RpcClient::new_mock("mock_client".to_string());
let dir = tempdir().unwrap();
let db_file = dir
.path()
.join("build_messages.db")
.to_str()
.unwrap()
.to_string();
let mut db = db::open_db(&db_file, false).unwrap();
let sender = Keypair::new();
let recipient = Pubkey::new_unique();
let amount = sol_to_lamports(1.0);
let last_valid_slot = 222;
let transaction = transfer(&client, amount, &sender, &recipient).unwrap();
// Queue db data
db::set_transaction_info(
&mut db,
&recipient,
amount,
&transaction,
None,
false,
last_valid_slot,
None,
)
.unwrap();
// Check that data has not been dumped
let read_db = db::open_db(&db_file, true).unwrap();
assert!(db::read_transaction_infos(&read_db).is_empty());
// This is just dummy data; Args will not affect messages built
let args = DistributeTokensArgs {
sender_keypair: Box::new(Keypair::new()),
fee_payer: Box::new(Keypair::new()),
dry_run: true,
input_csv: "".to_string(),
transaction_db: "".to_string(),
output_path: None,
stake_args: None,
spl_token_args: None,
transfer_amount: None,
};
let allocation = Allocation {
recipient: recipient.to_string(),
amount: sol_to_lamports(1.0),
lockup_date: "".to_string(),
};
let mut messages: Vec<Message> = vec![];
let mut stake_extras: StakeExtras = vec![];
let mut created_accounts = 0;
// Exit false will not dump data
build_messages(
&client,
&mut db,
&[allocation.clone()],
&args,
Arc::new(AtomicBool::new(false)),
&mut messages,
&mut stake_extras,
&mut created_accounts,
)
.unwrap();
let read_db = db::open_db(&db_file, true).unwrap();
assert!(db::read_transaction_infos(&read_db).is_empty());
assert_eq!(messages.len(), 1);
// Empty allocations will not dump data
let mut messages: Vec<Message> = vec![];
let exit = Arc::new(AtomicBool::new(true));
build_messages(
&client,
&mut db,
&[],
&args,
exit.clone(),
&mut messages,
&mut stake_extras,
&mut created_accounts,
)
.unwrap();
let read_db = db::open_db(&db_file, true).unwrap();
assert!(db::read_transaction_infos(&read_db).is_empty());
assert!(messages.is_empty());
// Any allocation should prompt data dump
let mut messages: Vec<Message> = vec![];
build_messages(
&client,
&mut db,
&[allocation],
&args,
exit,
&mut messages,
&mut stake_extras,
&mut created_accounts,
)
.unwrap_err();
let read_db = db::open_db(&db_file, true).unwrap();
let transaction_info = db::read_transaction_infos(&read_db);
assert_eq!(transaction_info.len(), 1);
assert_eq!(
transaction_info[0],
TransactionInfo {
recipient,
amount,
new_stake_account_address: None,
finalized_date: None,
transaction,
last_valid_slot,
lockup_date: None,
}
);
assert_eq!(messages.len(), 0);
}
#[test]
fn test_send_messages_dump_db() {
let client = RpcClient::new_mock("mock_client".to_string());
let dir = tempdir().unwrap();
let db_file = dir
.path()
.join("send_messages.db")
.to_str()
.unwrap()
.to_string();
let mut db = db::open_db(&db_file, false).unwrap();
let sender = Keypair::new();
let recipient = Pubkey::new_unique();
let amount = sol_to_lamports(1.0);
let last_valid_slot = 222;
let transaction = transfer(&client, amount, &sender, &recipient).unwrap();
// Queue db data
db::set_transaction_info(
&mut db,
&recipient,
amount,
&transaction,
None,
false,
last_valid_slot,
None,
)
.unwrap();
// Check that data has not been dumped
let read_db = db::open_db(&db_file, true).unwrap();
assert!(db::read_transaction_infos(&read_db).is_empty());
// This is just dummy data; Args will not affect messages
let args = DistributeTokensArgs {
sender_keypair: Box::new(Keypair::new()),
fee_payer: Box::new(Keypair::new()),
dry_run: true,
input_csv: "".to_string(),
transaction_db: "".to_string(),
output_path: None,
stake_args: None,
spl_token_args: None,
transfer_amount: None,
};
let allocation = Allocation {
recipient: recipient.to_string(),
amount: sol_to_lamports(1.0),
lockup_date: "".to_string(),
};
let message = transaction.message.clone();
// Exit false will not dump data
send_messages(
&client,
&mut db,
&[allocation.clone()],
&args,
Arc::new(AtomicBool::new(false)),
vec![message.clone()],
vec![(Keypair::new(), None)],
)
.unwrap();
let read_db = db::open_db(&db_file, true).unwrap();
assert!(db::read_transaction_infos(&read_db).is_empty());
// The method above will, however, write a record to the in-memory db
// Grab that expected value to test successful dump
let num_records = db::read_transaction_infos(&db).len();
// Empty messages/allocations will not dump data
let exit = Arc::new(AtomicBool::new(true));
send_messages(&client, &mut db, &[], &args, exit.clone(), vec![], vec![]).unwrap();
let read_db = db::open_db(&db_file, true).unwrap();
assert!(db::read_transaction_infos(&read_db).is_empty());
// Message/allocation should prompt data dump at start of loop
send_messages(
&client,
&mut db,
&[allocation],
&args,
exit,
vec![message.clone()],
vec![(Keypair::new(), None)],
)
.unwrap_err();
let read_db = db::open_db(&db_file, true).unwrap();
let transaction_info = db::read_transaction_infos(&read_db);
assert_eq!(transaction_info.len(), num_records);
assert!(transaction_info.contains(&TransactionInfo {
recipient,
amount,
new_stake_account_address: None,
finalized_date: None,
transaction,
last_valid_slot,
lockup_date: None,
}));
assert!(transaction_info.contains(&TransactionInfo {
recipient,
amount,
new_stake_account_address: None,
finalized_date: None,
transaction: Transaction::new_unsigned(message),
last_valid_slot: std::u64::MAX,
lockup_date: None,
}));
// Next dump should write record written in last send_messages call
let num_records = db::read_transaction_infos(&db).len();
db.dump().unwrap();
let read_db = db::open_db(&db_file, true).unwrap();
let transaction_info = db::read_transaction_infos(&read_db);
assert_eq!(transaction_info.len(), num_records);
}
#[test]
fn test_distribute_allocations_dump_db() {
let test_validator = TestValidator::with_no_fees();
let sender_keypair = test_validator.mint_keypair();
let url = test_validator.rpc_url();
let client = RpcClient::new_with_commitment(url, CommitmentConfig::recent());
let fee_payer = Keypair::new();
let transaction = transfer(
&client,
sol_to_lamports(1.0),
&sender_keypair,
&fee_payer.pubkey(),
)
.unwrap();
client
.send_and_confirm_transaction_with_spinner(&transaction)
.unwrap();
let dir = tempdir().unwrap();
let db_file = dir
.path()
.join("dist_allocations.db")
.to_str()
.unwrap()
.to_string();
let mut db = db::open_db(&db_file, false).unwrap();
let recipient = Pubkey::new_unique();
let allocation = Allocation {
recipient: recipient.to_string(),
amount: sol_to_lamports(1.0),
lockup_date: "".to_string(),
};
// This is just dummy data; Args will not affect messages
let args = DistributeTokensArgs {
sender_keypair: Box::new(sender_keypair),
fee_payer: Box::new(fee_payer),
dry_run: true,
input_csv: "".to_string(),
transaction_db: "".to_string(),
output_path: None,
stake_args: None,
spl_token_args: None,
transfer_amount: None,
};
let exit = Arc::new(AtomicBool::new(false));
// Ensure data is always dumped after distribute_allocations
distribute_allocations(&client, &mut db, &[allocation], &args, exit).unwrap();
let read_db = db::open_db(&db_file, true).unwrap();
let transaction_info = db::read_transaction_infos(&read_db);
assert_eq!(transaction_info.len(), 1);
test_validator.close();
}
#[test]
fn test_log_transaction_confirmations_dump_db() {
let client = RpcClient::new_mock("mock_client".to_string());
let dir = tempdir().unwrap();
let db_file = dir
.path()
.join("log_transaction_confirmations.db")
.to_str()
.unwrap()
.to_string();
let mut db = db::open_db(&db_file, false).unwrap();
let sender = Keypair::new();
let recipient = Pubkey::new_unique();
let amount = sol_to_lamports(1.0);
let last_valid_slot = 222;
let transaction = transfer(&client, amount, &sender, &recipient).unwrap();
// Queue unconfirmed transaction into db
db::set_transaction_info(
&mut db,
&recipient,
amount,
&transaction,
None,
false,
last_valid_slot,
None,
)
.unwrap();
// Check that data has not been dumped
let read_db = db::open_db(&db_file, true).unwrap();
assert!(db::read_transaction_infos(&read_db).is_empty());
// Empty unconfirmed_transactions will not dump data
let mut confirmations = None;
let exit = Arc::new(AtomicBool::new(true));
log_transaction_confirmations(
&client,
&mut db,
exit.clone(),
vec![],
vec![],
&mut confirmations,
)
.unwrap();
let read_db = db::open_db(&db_file, true).unwrap();
assert!(db::read_transaction_infos(&read_db).is_empty());
assert_eq!(confirmations, None);
// Exit false will not dump data
log_transaction_confirmations(
&client,
&mut db,
Arc::new(AtomicBool::new(false)),
vec![(&transaction, 111)],
vec![Some(TransactionStatus {
slot: 40,
confirmations: Some(15),
status: Ok(()),
err: None,
})],
&mut confirmations,
)
.unwrap();
let read_db = db::open_db(&db_file, true).unwrap();
assert!(db::read_transaction_infos(&read_db).is_empty());
assert_eq!(confirmations, Some(15));
// Exit true should dump data
log_transaction_confirmations(
&client,
&mut db,
exit,
vec![(&transaction, 111)],
vec![Some(TransactionStatus {
slot: 55,
confirmations: None,
status: Ok(()),
err: None,
})],
&mut confirmations,
)
.unwrap_err();
let read_db = db::open_db(&db_file, true).unwrap();
let transaction_info = db::read_transaction_infos(&read_db);
assert_eq!(transaction_info.len(), 1);
assert!(transaction_info[0].finalized_date.is_some());
}
#[test]
fn test_update_finalized_transactions_dump_db() {
let client = RpcClient::new_mock("mock_client".to_string());
let dir = tempdir().unwrap();
let db_file = dir
.path()
.join("update_finalized_transactions.db")
.to_str()
.unwrap()
.to_string();
let mut db = db::open_db(&db_file, false).unwrap();
let sender = Keypair::new();
let recipient = Pubkey::new_unique();
let amount = sol_to_lamports(1.0);
let last_valid_slot = 222;
let transaction = transfer(&client, amount, &sender, &recipient).unwrap();
// Queue unconfirmed transaction into db
db::set_transaction_info(
&mut db,
&recipient,
amount,
&transaction,
None,
false,
last_valid_slot,
None,
)
.unwrap();
// Ensure data is always dumped after update_finalized_transactions
let confs =
update_finalized_transactions(&client, &mut db, Arc::new(AtomicBool::new(false)))
.unwrap();
let read_db = db::open_db(&db_file, true).unwrap();
let transaction_info = db::read_transaction_infos(&read_db);
assert_eq!(transaction_info.len(), 1);
assert_eq!(confs, None);
}
}

View File

@ -48,7 +48,7 @@ pub fn open_db(path: &str, dry_run: bool) -> Result<PickleDb, Error> {
let policy = if dry_run {
PickleDbDumpPolicy::NeverDump
} else {
PickleDbDumpPolicy::AutoDump
PickleDbDumpPolicy::DumpUponRequest
};
let path = Path::new(path);
let db = if path.exists() {

View File

@ -1,7 +1,16 @@
use solana_cli_config::{Config, CONFIG_FILE};
use solana_client::rpc_client::RpcClient;
use solana_tokens::{arg_parser::parse_args, args::Command, commands, spl_token};
use std::{env, error::Error, path::Path, process};
use std::{
env,
error::Error,
path::Path,
process,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
fn main() -> Result<(), Box<dyn Error>> {
let command_args = parse_args(env::args_os())?;
@ -18,10 +27,18 @@ fn main() -> Result<(), Box<dyn Error>> {
let json_rpc_url = command_args.url.unwrap_or(config.json_rpc_url);
let client = RpcClient::new(json_rpc_url);
let exit = Arc::new(AtomicBool::default());
let _exit = exit.clone();
// Initialize CTRL-C handler to ensure db changes are written before exit.
ctrlc::set_handler(move || {
_exit.store(true, Ordering::SeqCst);
})
.expect("Error setting Ctrl-C handler");
match command_args.command {
Command::DistributeTokens(mut args) => {
spl_token::update_token_args(&client, &mut args.spl_token_args)?;
commands::process_allocations(&client, &args)?;
commands::process_allocations(&client, &args, exit)?;
}
Command::Balances(mut args) => {
spl_token::update_decimals(&client, &mut args.spl_token_args)?;