Store program logs in blockstore / bigtable (TransactionWithStatusMeta) (#12678)

* introduce store program logs in blockstore / bigtable

* fix test, transaction logs created for successful transactions

* fix test for legacy bincode implementation around log_messages

* only api nodes should record logs

* truncate transaction logs to 100KB

* refactor log truncate for improved coverage
This commit is contained in:
Josh 2020-10-08 12:06:15 -07:00 committed by GitHub
parent 9629baa0c7
commit 8f5431551e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 196 additions and 23 deletions

View File

@ -534,13 +534,14 @@ impl BankingStage {
mut loaded_accounts,
results,
inner_instructions,
transaction_logs,
mut retryable_txs,
tx_count,
signature_count,
) = bank.load_and_execute_transactions(
batch,
MAX_PROCESSING_AGE,
None,
transaction_status_sender.is_some(),
transaction_status_sender.is_some(),
);
load_execute_time.stop();
@ -580,6 +581,7 @@ impl BankingStage {
tx_results.processing_results,
TransactionBalancesSet::new(pre_balances, post_balances),
inner_instructions,
transaction_logs,
sender,
);
}

View File

@ -56,6 +56,7 @@ impl TransactionStatusService {
statuses,
balances,
inner_instructions,
transaction_logs,
} = write_transaction_status_receiver.recv_timeout(Duration::from_secs(1))?;
let slot = bank.slot();
@ -65,12 +66,14 @@ impl TransactionStatusService {
pre_balances,
post_balances,
inner_instructions,
log_messages,
) in izip!(
OrderedIterator::new(&transactions, iteration_order.as_deref()),
statuses,
balances.pre_balances,
balances.post_balances,
inner_instructions
inner_instructions,
transaction_logs
) {
if Bank::can_commit(&status) && !transaction.signatures.is_empty() {
let fee_calculator = match hash_age_kind {
@ -96,6 +99,8 @@ impl TransactionStatusService {
.collect()
});
let log_messages = Some(log_messages);
blockstore
.write_transaction_status(
slot,
@ -108,6 +113,7 @@ impl TransactionStatusService {
pre_balances,
post_balances,
inner_instructions,
log_messages,
},
)
.expect("Expect database write to succeed");

View File

@ -5686,6 +5686,7 @@ pub mod tests {
pre_balances: pre_balances.clone(),
post_balances: post_balances.clone(),
inner_instructions: Some(vec![]),
log_messages: Some(vec![]),
},
)
.unwrap();
@ -5699,6 +5700,7 @@ pub mod tests {
pre_balances: pre_balances.clone(),
post_balances: post_balances.clone(),
inner_instructions: Some(vec![]),
log_messages: Some(vec![]),
},
)
.unwrap();
@ -5710,6 +5712,7 @@ pub mod tests {
pre_balances,
post_balances,
inner_instructions: Some(vec![]),
log_messages: Some(vec![]),
}),
}
})
@ -6006,6 +6009,7 @@ pub mod tests {
index: 0,
instructions: vec![CompiledInstruction::new(1, &(), vec![0])],
}];
let log_messages_vec = vec![String::from("Test message\n")];
// result not found
assert!(transaction_status_cf
@ -6025,6 +6029,7 @@ pub mod tests {
pre_balances: pre_balances_vec.clone(),
post_balances: post_balances_vec.clone(),
inner_instructions: Some(inner_instructions_vec.clone()),
log_messages: Some(log_messages_vec.clone()),
},
)
.is_ok());
@ -6036,6 +6041,7 @@ pub mod tests {
pre_balances,
post_balances,
inner_instructions,
log_messages,
} = transaction_status_cf
.get((0, Signature::default(), 0))
.unwrap()
@ -6045,6 +6051,7 @@ pub mod tests {
assert_eq!(pre_balances, pre_balances_vec);
assert_eq!(post_balances, post_balances_vec);
assert_eq!(inner_instructions.unwrap(), inner_instructions_vec);
assert_eq!(log_messages.unwrap(), log_messages_vec);
// insert value
assert!(transaction_status_cf
@ -6056,6 +6063,7 @@ pub mod tests {
pre_balances: pre_balances_vec.clone(),
post_balances: post_balances_vec.clone(),
inner_instructions: Some(inner_instructions_vec.clone()),
log_messages: Some(log_messages_vec.clone()),
},
)
.is_ok());
@ -6067,6 +6075,7 @@ pub mod tests {
pre_balances,
post_balances,
inner_instructions,
log_messages,
} = transaction_status_cf
.get((0, Signature::new(&[2u8; 64]), 9))
.unwrap()
@ -6078,6 +6087,7 @@ pub mod tests {
assert_eq!(pre_balances, pre_balances_vec);
assert_eq!(post_balances, post_balances_vec);
assert_eq!(inner_instructions.unwrap(), inner_instructions_vec);
assert_eq!(log_messages.unwrap(), log_messages_vec);
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
@ -6305,6 +6315,7 @@ pub mod tests {
pre_balances: pre_balances_vec,
post_balances: post_balances_vec,
inner_instructions: Some(vec![]),
log_messages: Some(vec![]),
};
let signature1 = Signature::new(&[1u8; 64]);
@ -6438,6 +6449,7 @@ pub mod tests {
index: 0,
instructions: vec![CompiledInstruction::new(1, &(), vec![0])],
}]);
let log_messages = Some(vec![String::from("Test message\n")]);
let signature = transaction.signatures[0];
blockstore
.transaction_status_cf
@ -6449,6 +6461,7 @@ pub mod tests {
pre_balances: pre_balances.clone(),
post_balances: post_balances.clone(),
inner_instructions: inner_instructions.clone(),
log_messages: log_messages.clone(),
},
)
.unwrap();
@ -6460,6 +6473,7 @@ pub mod tests {
pre_balances,
post_balances,
inner_instructions,
log_messages,
}),
}
})
@ -6900,6 +6914,7 @@ pub mod tests {
pre_balances: vec![],
post_balances: vec![],
inner_instructions: Some(vec![]),
log_messages: Some(vec![]),
},
)
.unwrap();

View File

@ -16,8 +16,8 @@ use solana_metrics::{datapoint_error, inc_new_counter_debug};
use solana_rayon_threadlimit::get_thread_count;
use solana_runtime::{
bank::{
Bank, Builtins, InnerInstructionsList, TransactionBalancesSet, TransactionProcessResult,
TransactionResults,
Bank, Builtins, InnerInstructionsList, TransactionBalancesSet, TransactionLogMessages,
TransactionProcessResult, TransactionResults,
},
bank_forks::BankForks,
bank_utils,
@ -103,12 +103,13 @@ fn execute_batch(
transaction_status_sender: Option<TransactionStatusSender>,
replay_vote_sender: Option<&ReplayVoteSender>,
) -> Result<()> {
let (tx_results, balances, inner_instructions) =
let (tx_results, balances, inner_instructions, transaction_logs) =
batch.bank().load_execute_and_commit_transactions(
batch,
MAX_PROCESSING_AGE,
transaction_status_sender.is_some(),
transaction_status_sender.is_some(),
transaction_status_sender.is_some(),
);
bank_utils::find_and_send_votes(batch.transactions(), &tx_results, replay_vote_sender);
@ -127,6 +128,7 @@ fn execute_batch(
processing_results,
balances,
inner_instructions,
transaction_logs,
sender,
);
}
@ -1031,6 +1033,7 @@ pub struct TransactionStatusBatch {
pub statuses: Vec<TransactionProcessResult>,
pub balances: TransactionBalancesSet,
pub inner_instructions: Vec<Option<InnerInstructionsList>>,
pub transaction_logs: Vec<TransactionLogMessages>,
}
pub type TransactionStatusSender = Sender<TransactionStatusBatch>;
@ -1042,6 +1045,7 @@ pub fn send_transaction_status_batch(
statuses: Vec<TransactionProcessResult>,
balances: TransactionBalancesSet,
inner_instructions: Vec<Option<InnerInstructionsList>>,
transaction_logs: Vec<TransactionLogMessages>,
transaction_status_sender: TransactionStatusSender,
) {
let slot = bank.slot();
@ -1052,6 +1056,7 @@ pub fn send_transaction_status_batch(
statuses,
balances,
inner_instructions,
transaction_logs,
}) {
trace!(
"Slot {} transaction_status send batch failed: {:?}",
@ -2891,11 +2896,13 @@ pub mod tests {
},
_balances,
_inner_instructions,
_log_messages,
) = batch.bank().load_execute_and_commit_transactions(
&batch,
MAX_PROCESSING_AGE,
false,
false,
false,
);
let (err, signature) = get_first_error(&batch, fee_collection_results).unwrap();
// First error found should be for the 2nd transaction, due to iteration_order

View File

@ -105,8 +105,8 @@ fn process_transaction_and_record_inner(
let signature = tx.signatures.get(0).unwrap().clone();
let txs = vec![tx];
let tx_batch = bank.prepare_batch(&txs, None);
let (mut results, _, mut inner) =
bank.load_execute_and_commit_transactions(&tx_batch, MAX_PROCESSING_AGE, false, true);
let (mut results, _, mut inner, _transaction_logs) =
bank.load_execute_and_commit_transactions(&tx_batch, MAX_PROCESSING_AGE, false, true, false);
let inner_instructions = inner.swap_remove(0);
let result = results
.fee_collection_results

View File

@ -106,6 +106,8 @@ pub const SECONDS_PER_YEAR: f64 = 365.25 * 24.0 * 60.0 * 60.0;
pub const MAX_LEADER_SCHEDULE_STAKES: Epoch = 5;
pub const TRANSACTION_LOG_MESSAGES_BYTES_LIMIT: usize = 100 * 1000;
type BankStatusCache = StatusCache<Result<()>>;
#[frozen_abi(digest = "EEFPLdPhngiBojqEnDMkoEGjyYYHNWPHnenRf8b9diqd")]
pub type BankSlotDelta = SlotDelta<Result<()>>;
@ -389,6 +391,9 @@ pub type InnerInstructions = Vec<CompiledInstruction>;
/// A list of instructions that were invoked during each instruction of a transaction
pub type InnerInstructionsList = Vec<InnerInstructions>;
/// A list of log messages emitted during a transaction
pub type TransactionLogMessages = Vec<String>;
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum HashAgeKind {
Extant,
@ -1698,22 +1703,22 @@ impl Bank {
let txs = &[transaction];
let batch = self.prepare_simulation_batch(txs);
let log_collector = Rc::new(LogCollector::default());
let (
_loaded_accounts,
executed,
_inner_instructions,
transaction_logs,
_retryable_transactions,
_transaction_count,
_signature_count,
) = self.load_and_execute_transactions(
&batch,
MAX_PROCESSING_AGE,
Some(log_collector.clone()),
false,
);
) = self.load_and_execute_transactions(&batch, MAX_PROCESSING_AGE, false, true);
let transaction_result = executed[0].0.clone().map(|_| ());
let log_messages = Rc::try_unwrap(log_collector).unwrap_or_default().into();
let log_messages = transaction_logs
.get(0)
.map_or(vec![], |messages| messages.to_vec());
(transaction_result, log_messages)
}
@ -2112,17 +2117,34 @@ impl Bank {
cache.remove(pubkey);
}
pub fn truncate_log_messages(
log_messages: &mut TransactionLogMessages,
max_bytes: usize,
truncate_message: String,
) {
let mut size = 0;
for (i, line) in log_messages.iter().enumerate() {
size += line.len();
if size > max_bytes {
log_messages.truncate(i);
log_messages.push(truncate_message);
return;
}
}
}
#[allow(clippy::type_complexity)]
pub fn load_and_execute_transactions(
&self,
batch: &TransactionBatch,
max_age: usize,
log_collector: Option<Rc<LogCollector>>,
enable_cpi_recording: bool,
enable_log_recording: bool,
) -> (
Vec<(Result<TransactionLoadResult>, Option<HashAgeKind>)>,
Vec<TransactionProcessResult>,
Vec<Option<InnerInstructionsList>>,
Vec<TransactionLogMessages>,
Vec<usize>,
u64,
u64,
@ -2165,6 +2187,8 @@ impl Bank {
let mut signature_count: u64 = 0;
let mut inner_instructions: Vec<Option<InnerInstructionsList>> =
Vec::with_capacity(txs.len());
let mut transaction_logs: Vec<TransactionLogMessages> = Vec::with_capacity(txs.len());
let executed: Vec<TransactionProcessResult> = loaded_accounts
.iter_mut()
.zip(OrderedIterator::new(txs, batch.iteration_order()))
@ -2187,6 +2211,12 @@ impl Bank {
None
};
let log_collector = if enable_log_recording {
Some(Rc::new(LogCollector::default()))
} else {
None
};
let process_result = self.message_processor.process_message(
tx.message(),
&loader_refcells,
@ -2198,6 +2228,21 @@ impl Bank {
self.feature_set.clone(),
);
if enable_log_recording {
let mut log_messages: TransactionLogMessages =
Rc::try_unwrap(log_collector.unwrap_or_default())
.unwrap_or_default()
.into();
Self::truncate_log_messages(
&mut log_messages,
TRANSACTION_LOG_MESSAGES_BYTES_LIMIT,
String::from("<< Transaction log truncated to 100KB >>\n"),
);
transaction_logs.push(log_messages);
}
Self::compile_recorded_instructions(
&mut inner_instructions,
instruction_recorders,
@ -2262,6 +2307,7 @@ impl Bank {
loaded_accounts,
executed,
inner_instructions,
transaction_logs,
retryable_txs,
tx_count,
signature_count,
@ -2936,18 +2982,33 @@ impl Bank {
max_age: usize,
collect_balances: bool,
enable_cpi_recording: bool,
enable_log_recording: bool,
) -> (
TransactionResults,
TransactionBalancesSet,
Vec<Option<InnerInstructionsList>>,
Vec<TransactionLogMessages>,
) {
let pre_balances = if collect_balances {
self.collect_balances(batch)
} else {
vec![]
};
let (mut loaded_accounts, executed, inner_instructions, _, tx_count, signature_count) =
self.load_and_execute_transactions(batch, max_age, None, enable_cpi_recording);
let (
mut loaded_accounts,
executed,
inner_instructions,
transaction_logs,
_,
tx_count,
signature_count,
) = self.load_and_execute_transactions(
batch,
max_age,
enable_cpi_recording,
enable_log_recording,
);
let results = self.commit_transactions(
batch.transactions(),
@ -2966,13 +3027,14 @@ impl Bank {
results,
TransactionBalancesSet::new(pre_balances, post_balances),
inner_instructions,
transaction_logs,
)
}
#[must_use]
pub fn process_transactions(&self, txs: &[Transaction]) -> Vec<Result<()>> {
let batch = self.prepare_batch(txs, None);
self.load_execute_and_commit_transactions(&batch, MAX_PROCESSING_AGE, false, false)
self.load_execute_and_commit_transactions(&batch, MAX_PROCESSING_AGE, false, false, false)
.0
.fee_collection_results
}
@ -6350,7 +6412,13 @@ mod tests {
let lock_result = bank.prepare_batch(&pay_alice, None);
let results_alice = bank
.load_execute_and_commit_transactions(&lock_result, MAX_PROCESSING_AGE, false, false)
.load_execute_and_commit_transactions(
&lock_result,
MAX_PROCESSING_AGE,
false,
false,
false,
)
.0
.fee_collection_results;
assert_eq!(results_alice[0], Ok(()));
@ -8121,10 +8189,18 @@ mod tests {
let txs = vec![tx0, tx1, tx2];
let lock_result = bank0.prepare_batch(&txs, None);
let (transaction_results, transaction_balances_set, inner_instructions) = bank0
.load_execute_and_commit_transactions(&lock_result, MAX_PROCESSING_AGE, true, false);
let (transaction_results, transaction_balances_set, inner_instructions, transaction_logs) =
bank0.load_execute_and_commit_transactions(
&lock_result,
MAX_PROCESSING_AGE,
true,
false,
false,
);
assert!(inner_instructions[0].iter().all(|ix| ix.is_empty()));
assert_eq!(transaction_logs.len(), 0);
assert_eq!(transaction_balances_set.pre_balances.len(), 3);
assert_eq!(transaction_balances_set.post_balances.len(), 3);
@ -9396,4 +9472,53 @@ mod tests {
let new_bank1_hash = bank1.hash();
assert_eq!(old_hash, new_bank1_hash);
}
#[test]
fn test_truncate_log_messages() {
let mut messages = vec![
String::from("This is line one\n"),
String::from("This is line two\n"),
String::from("This is line three\n"),
];
// messages under limit
Bank::truncate_log_messages(
&mut messages,
10000,
String::from("<< Transaction log truncated to 10,000 bytes >>\n"),
);
assert_eq!(messages.len(), 3);
// messages truncated to two lines
let maxsize = messages.get(0).unwrap().len() + messages.get(1).unwrap().len();
Bank::truncate_log_messages(
&mut messages,
maxsize,
String::from("<< Transaction log truncated >>\n"),
);
assert_eq!(messages.len(), 3);
assert_eq!(
messages.get(2).unwrap(),
"<< Transaction log truncated >>\n"
);
// messages truncated to one line
let mut messages = vec![
String::from("Line 1\n"),
String::from("Line 2\n"),
String::from("Line 3\n"),
];
let maxsize = messages.get(0).unwrap().len() + 4;
Bank::truncate_log_messages(
&mut messages,
maxsize,
String::from("<< Transaction log truncated >>\n"),
);
assert_eq!(messages.len(), 2);
assert_eq!(
messages.get(1).unwrap(),
"<< Transaction log truncated >>\n"
);
}
}

View File

@ -10,6 +10,7 @@ impl LogCollector {
self.messages.borrow_mut().push(message.to_string())
}
}
impl Into<Vec<String>> for LogCollector {
fn into(self) -> Vec<String> {
self.messages.into_inner()

View File

@ -59,6 +59,8 @@ pub struct TransactionStatusMeta {
pub post_balances: ::std::vec::Vec<u64>,
#[prost(message, repeated, tag = "5")]
pub inner_instructions: ::std::vec::Vec<InnerInstructions>,
#[prost(message, repeated, tag = "6")]
pub log_messages: ::std::vec::Vec<String>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TransactionError {

View File

@ -654,6 +654,7 @@ mod tests {
pre_balances: vec![43, 0, 1],
post_balances: vec![0, 42, 1],
inner_instructions: Some(vec![]),
log_messages: Some(vec![]),
}),
};
let block = ConfirmedBlock {
@ -701,7 +702,8 @@ mod tests {
if let CellData::Bincode(bincode_block) = deserialized {
let mut block = block;
if let Some(meta) = &mut block.transactions[0].meta {
meta.inner_instructions = None; // Legacy bincode implementation does not suport inner_instructions
meta.inner_instructions = None; // Legacy bincode implementation does not support inner_instructions
meta.log_messages = None; // Legacy bincode implementation does not support log_messages
}
assert_eq!(block, bincode_block.into());
} else {

View File

@ -201,6 +201,7 @@ impl From<TransactionStatusMeta> for generated::TransactionStatusMeta {
pre_balances,
post_balances,
inner_instructions,
log_messages,
} = value;
let err = match status {
Ok(()) => None,
@ -213,12 +214,14 @@ impl From<TransactionStatusMeta> for generated::TransactionStatusMeta {
.into_iter()
.map(|ii| ii.into())
.collect();
let log_messages = log_messages.unwrap_or_default();
Self {
err,
fee,
pre_balances,
post_balances,
inner_instructions,
log_messages,
}
}
}
@ -233,6 +236,7 @@ impl TryFrom<generated::TransactionStatusMeta> for TransactionStatusMeta {
pre_balances,
post_balances,
inner_instructions,
log_messages,
} = value;
let status = match &err {
None => Ok(()),
@ -244,12 +248,14 @@ impl TryFrom<generated::TransactionStatusMeta> for TransactionStatusMeta {
.map(|inner| inner.into())
.collect(),
);
let log_messages = Some(log_messages);
Ok(Self {
status,
fee,
pre_balances,
post_balances,
inner_instructions,
log_messages,
})
}
}

View File

@ -184,6 +184,7 @@ impl From<StoredConfirmedBlockTransactionStatusMeta> for TransactionStatusMeta {
pre_balances,
post_balances,
inner_instructions: None,
log_messages: None,
}
}
}

View File

@ -145,6 +145,8 @@ pub struct TransactionStatusMeta {
pub post_balances: Vec<u64>,
#[serde(deserialize_with = "default_on_eof")]
pub inner_instructions: Option<Vec<InnerInstructions>>,
#[serde(deserialize_with = "default_on_eof")]
pub log_messages: Option<Vec<String>>,
}
impl Default for TransactionStatusMeta {
@ -155,6 +157,7 @@ impl Default for TransactionStatusMeta {
pre_balances: vec![],
post_balances: vec![],
inner_instructions: None,
log_messages: None,
}
}
}
@ -169,6 +172,7 @@ pub struct UiTransactionStatusMeta {
pub pre_balances: Vec<u64>,
pub post_balances: Vec<u64>,
pub inner_instructions: Option<Vec<UiInnerInstructions>>,
pub log_messages: Option<Vec<String>>,
}
impl UiTransactionStatusMeta {
@ -184,6 +188,7 @@ impl UiTransactionStatusMeta {
.map(|ix| UiInnerInstructions::parse(ix, message))
.collect()
}),
log_messages: meta.log_messages,
}
}
}
@ -199,6 +204,7 @@ impl From<TransactionStatusMeta> for UiTransactionStatusMeta {
inner_instructions: meta
.inner_instructions
.map(|ixs| ixs.into_iter().map(|ix| ix.into()).collect()),
log_messages: meta.log_messages,
}
}
}