Add validator flag to opt in to cpi and logs storage (#14922)

* Add validator flag to opt in to cpi and logs storage

* Default TestValidator to opt-in; allow using in multinode-demo

* No clone

Co-authored-by: Carl Lin <carl@solana.com>
This commit is contained in:
Tyera Eulberg 2021-02-01 14:00:51 -07:00 committed by GitHub
parent 73d9186502
commit cbb8b79a60
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 81 additions and 24 deletions

View File

@ -593,7 +593,7 @@ impl BankingStage {
); );
bank_utils::find_and_send_votes(txs, &tx_results, Some(gossip_vote_sender)); bank_utils::find_and_send_votes(txs, &tx_results, Some(gossip_vote_sender));
if let Some(sender) = transaction_status_sender { if let Some(transaction_status_sender) = transaction_status_sender {
let post_balances = bank.collect_balances(batch); let post_balances = bank.collect_balances(batch);
let post_token_balances = collect_token_balances(&bank, &batch, &mut mint_decimals); let post_token_balances = collect_token_balances(&bank, &batch, &mut mint_decimals);
send_transaction_status_batch( send_transaction_status_batch(
@ -605,7 +605,7 @@ impl BankingStage {
TransactionTokenBalancesSet::new(pre_token_balances, post_token_balances), TransactionTokenBalancesSet::new(pre_token_balances, post_token_balances),
inner_instructions, inner_instructions,
transaction_logs, transaction_logs,
sender, transaction_status_sender,
); );
} }
} }
@ -2074,7 +2074,10 @@ mod tests {
&transactions, &transactions,
&poh_recorder, &poh_recorder,
0, 0,
Some(transaction_status_sender), Some(TransactionStatusSender {
sender: transaction_status_sender,
enable_cpi_and_log_storage: false,
}),
&gossip_vote_sender, &gossip_vote_sender,
); );

View File

@ -2746,7 +2746,10 @@ pub(crate) mod tests {
&bank, &bank,
&entries, &entries,
true, true,
Some(transaction_status_sender), Some(TransactionStatusSender {
sender: transaction_status_sender,
enable_cpi_and_log_storage: false,
}),
Some(&replay_vote_sender), Some(&replay_vote_sender),
); );

View File

@ -109,6 +109,7 @@ pub struct JsonRpcConfig {
pub enable_validator_exit: bool, pub enable_validator_exit: bool,
pub enable_set_log_filter: bool, pub enable_set_log_filter: bool,
pub enable_rpc_transaction_history: bool, pub enable_rpc_transaction_history: bool,
pub enable_cpi_and_log_storage: bool,
pub identity_pubkey: Pubkey, pub identity_pubkey: Pubkey,
pub faucet_addr: Option<SocketAddr>, pub faucet_addr: Option<SocketAddr>,
pub health_check_slot_distance: u64, pub health_check_slot_distance: u64,

View File

@ -2,7 +2,7 @@ use crossbeam_channel::{Receiver, RecvTimeoutError};
use itertools::izip; use itertools::izip;
use solana_ledger::{blockstore::Blockstore, blockstore_processor::TransactionStatusBatch}; use solana_ledger::{blockstore::Blockstore, blockstore_processor::TransactionStatusBatch};
use solana_runtime::{ use solana_runtime::{
bank::{Bank, NonceRollbackInfo}, bank::{Bank, InnerInstructionsList, NonceRollbackInfo, TransactionLogMessages},
transaction_utils::OrderedIterator, transaction_utils::OrderedIterator,
}; };
use solana_transaction_status::{InnerInstructions, TransactionStatusMeta}; use solana_transaction_status::{InnerInstructions, TransactionStatusMeta};
@ -60,6 +60,18 @@ impl TransactionStatusService {
} = write_transaction_status_receiver.recv_timeout(Duration::from_secs(1))?; } = write_transaction_status_receiver.recv_timeout(Duration::from_secs(1))?;
let slot = bank.slot(); let slot = bank.slot();
let inner_instructions_iter: Box<dyn Iterator<Item = Option<InnerInstructionsList>>> =
if let Some(inner_instructions) = inner_instructions {
Box::new(inner_instructions.into_iter())
} else {
Box::new(std::iter::repeat_with(|| None))
};
let transaction_logs_iter: Box<dyn Iterator<Item = TransactionLogMessages>> =
if let Some(transaction_logs) = transaction_logs {
Box::new(transaction_logs.into_iter())
} else {
Box::new(std::iter::repeat_with(Vec::new))
};
for ( for (
(_, transaction), (_, transaction),
(status, nonce_rollback), (status, nonce_rollback),
@ -76,8 +88,8 @@ impl TransactionStatusService {
balances.post_balances, balances.post_balances,
token_balances.pre_token_balances, token_balances.pre_token_balances,
token_balances.post_token_balances, token_balances.post_token_balances,
inner_instructions, inner_instructions_iter,
transaction_logs transaction_logs_iter
) { ) {
if Bank::can_commit(&status) && !transaction.signatures.is_empty() { if Bank::can_commit(&status) && !transaction.signatures.is_empty() {
let fee_calculator = nonce_rollback let fee_calculator = nonce_rollback

View File

@ -984,7 +984,11 @@ fn new_banks_from_ledger(
let blockstore = Arc::new(blockstore); let blockstore = Arc::new(blockstore);
let transaction_history_services = let transaction_history_services =
if config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history { if config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history {
initialize_rpc_transaction_history_services(blockstore.clone(), exit) initialize_rpc_transaction_history_services(
blockstore.clone(),
exit,
config.rpc_config.enable_cpi_and_log_storage,
)
} else { } else {
TransactionHistoryServices::default() TransactionHistoryServices::default()
}; };
@ -1153,9 +1157,13 @@ fn backup_and_clear_blockstore(ledger_path: &Path, start_slot: Slot, shred_versi
fn initialize_rpc_transaction_history_services( fn initialize_rpc_transaction_history_services(
blockstore: Arc<Blockstore>, blockstore: Arc<Blockstore>,
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
enable_cpi_and_log_storage: bool,
) -> TransactionHistoryServices { ) -> TransactionHistoryServices {
let (transaction_status_sender, transaction_status_receiver) = unbounded(); let (transaction_status_sender, transaction_status_receiver) = unbounded();
let transaction_status_sender = Some(transaction_status_sender); let transaction_status_sender = Some(TransactionStatusSender {
sender: transaction_status_sender,
enable_cpi_and_log_storage,
});
let transaction_status_service = Some(TransactionStatusService::new( let transaction_status_service = Some(TransactionStatusService::new(
transaction_status_receiver, transaction_status_receiver,
blockstore.clone(), blockstore.clone(),

View File

@ -136,7 +136,7 @@ fn execute_batch(
.. ..
} = tx_results; } = tx_results;
if let Some(sender) = transaction_status_sender { if let Some(transaction_status_sender) = transaction_status_sender {
let post_token_balances = if record_token_balances { let post_token_balances = if record_token_balances {
collect_token_balances(&bank, &batch, &mut mint_decimals) collect_token_balances(&bank, &batch, &mut mint_decimals)
} else { } else {
@ -155,7 +155,7 @@ fn execute_batch(
token_balances, token_balances,
inner_instructions, inner_instructions,
transaction_logs, transaction_logs,
sender, transaction_status_sender,
); );
} }
@ -1093,11 +1093,15 @@ pub struct TransactionStatusBatch {
pub statuses: Vec<TransactionExecutionResult>, pub statuses: Vec<TransactionExecutionResult>,
pub balances: TransactionBalancesSet, pub balances: TransactionBalancesSet,
pub token_balances: TransactionTokenBalancesSet, pub token_balances: TransactionTokenBalancesSet,
pub inner_instructions: Vec<Option<InnerInstructionsList>>, pub inner_instructions: Option<Vec<Option<InnerInstructionsList>>>,
pub transaction_logs: Vec<TransactionLogMessages>, pub transaction_logs: Option<Vec<TransactionLogMessages>>,
} }
pub type TransactionStatusSender = Sender<TransactionStatusBatch>; #[derive(Clone)]
pub struct TransactionStatusSender {
pub sender: Sender<TransactionStatusBatch>,
pub enable_cpi_and_log_storage: bool,
}
pub fn send_transaction_status_batch( pub fn send_transaction_status_batch(
bank: Arc<Bank>, bank: Arc<Bank>,
@ -1111,16 +1115,25 @@ pub fn send_transaction_status_batch(
transaction_status_sender: TransactionStatusSender, transaction_status_sender: TransactionStatusSender,
) { ) {
let slot = bank.slot(); let slot = bank.slot();
if let Err(e) = transaction_status_sender.send(TransactionStatusBatch { let (inner_instructions, transaction_logs) =
bank, if !transaction_status_sender.enable_cpi_and_log_storage {
transactions: transactions.to_vec(), (None, None)
iteration_order, } else {
statuses, (Some(inner_instructions), Some(transaction_logs))
balances, };
token_balances, if let Err(e) = transaction_status_sender
inner_instructions, .sender
transaction_logs, .send(TransactionStatusBatch {
}) { bank,
transactions: transactions.to_vec(),
iteration_order,
statuses,
balances,
token_balances,
inner_instructions,
transaction_logs,
})
{
trace!( trace!(
"Slot {} transaction_status send batch failed: {:?}", "Slot {} transaction_status send batch failed: {:?}",
slot, slot,

View File

@ -48,6 +48,9 @@ while [[ -n $1 ]]; do
elif [[ $1 = --enable-rpc-transaction-history ]]; then elif [[ $1 = --enable-rpc-transaction-history ]]; then
args+=("$1") args+=("$1")
shift shift
elif [[ $1 = --enable-cpi-and-log-storage ]]; then
args+=("$1")
shift
elif [[ $1 = --enable-rpc-bigtable-ledger-storage ]]; then elif [[ $1 = --enable-rpc-bigtable-ledger-storage ]]; then
args+=("$1") args+=("$1")
shift shift

View File

@ -130,6 +130,9 @@ while [[ -n $1 ]]; do
elif [[ $1 = --enable-rpc-transaction-history ]]; then elif [[ $1 = --enable-rpc-transaction-history ]]; then
args+=("$1") args+=("$1")
shift shift
elif [[ $1 = --enable-cpi-and-log-storage ]]; then
args+=("$1")
shift
elif [[ $1 = --skip-poh-verify ]]; then elif [[ $1 = --skip-poh-verify ]]; then
args+=("$1") args+=("$1")
shift shift

1
run.sh
View File

@ -102,6 +102,7 @@ args=(
--log - --log -
--enable-rpc-exit --enable-rpc-exit
--enable-rpc-transaction-history --enable-rpc-transaction-history
--enable-cpi-and-log-storage
--init-complete-file "$dataDir"/init-completed --init-complete-file "$dataDir"/init-completed
--snapshot-compression none --snapshot-compression none
--require-tower --require-tower

View File

@ -376,6 +376,7 @@ fn main() {
.rpc_config(JsonRpcConfig { .rpc_config(JsonRpcConfig {
enable_validator_exit: true, enable_validator_exit: true,
enable_rpc_transaction_history: true, enable_rpc_transaction_history: true,
enable_cpi_and_log_storage: false,
faucet_addr, faucet_addr,
..JsonRpcConfig::default() ..JsonRpcConfig::default()
}) })

View File

@ -957,6 +957,14 @@ pub fn main() {
.takes_value(false) .takes_value(false)
.help("Upload new confirmed blocks into a BigTable instance"), .help("Upload new confirmed blocks into a BigTable instance"),
) )
.arg(
Arg::with_name("enable_cpi_and_log_storage")
.long("enable-cpi-and-log-storage")
.requires("enable_rpc_transaction_history")
.takes_value(false)
.help("Include CPI inner instructions and logs in the \
historical transaction info stored"),
)
.arg( .arg(
Arg::with_name("rpc_max_multiple_accounts") Arg::with_name("rpc_max_multiple_accounts")
.long("rpc-max-multiple-accounts") .long("rpc-max-multiple-accounts")
@ -1533,6 +1541,7 @@ pub fn main() {
enable_validator_exit: matches.is_present("enable_rpc_exit"), enable_validator_exit: matches.is_present("enable_rpc_exit"),
enable_set_log_filter: matches.is_present("enable_rpc_set_log_filter"), enable_set_log_filter: matches.is_present("enable_rpc_set_log_filter"),
enable_rpc_transaction_history: matches.is_present("enable_rpc_transaction_history"), enable_rpc_transaction_history: matches.is_present("enable_rpc_transaction_history"),
enable_cpi_and_log_storage: matches.is_present("enable_cpi_and_log_storage"),
enable_bigtable_ledger_storage: matches enable_bigtable_ledger_storage: matches
.is_present("enable_rpc_bigtable_ledger_storage"), .is_present("enable_rpc_bigtable_ledger_storage"),
enable_bigtable_ledger_upload: matches.is_present("enable_bigtable_ledger_upload"), enable_bigtable_ledger_upload: matches.is_present("enable_bigtable_ledger_upload"),