Legitimately map transactions to statuses in blocktree (#7011)

* Refactor rocksdb TransactionStatus to store/return struct; hook up map_transactions_to_statuses

* Cleanup use statements
This commit is contained in:
Tyera Eulberg 2019-11-18 09:12:42 -07:00 committed by GitHub
parent 3bc8d78801
commit e0a2bb9d86
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 111 additions and 28 deletions

View File

@ -26,7 +26,13 @@ pub struct Response<T> {
pub struct RpcConfirmedBlock {
pub previous_blockhash: Hash,
pub blockhash: Hash,
pub transactions: Vec<(Transaction, Result<()>)>,
pub transactions: Vec<(Transaction, Option<RpcTransactionStatus>)>,
}
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct RpcTransactionStatus {
pub status: Result<()>,
pub fee: u64,
}
#[derive(Serialize, Deserialize, Clone, Debug)]

View File

@ -304,8 +304,8 @@ impl JsonRpcRequestProcessor {
// The `get_confirmed_block` method is not fully implemented. It currenlty returns a partially
// complete RpcConfirmedBlock. The `blockhash` and `previous_blockhash` fields are legitimate
// data, while the `transactions` field contains transaction tuples (Transaction,
// transaction::Result), where the Transaction is a legitimate transaction, but the Result is
// always `Ok()`.
// transaction::Result), where the Transaction is a legitimate transaction, but the
// Option<RpcTransactionStatus> is always None.
pub fn get_confirmed_block(&self, slot: Slot) -> Result<Option<RpcConfirmedBlock>> {
Ok(self.blocktree.get_confirmed_block(slot).ok())
}

View File

@ -22,7 +22,7 @@ use rayon::{
ThreadPool,
};
use rocksdb::DBRawIterator;
use solana_client::rpc_request::RpcConfirmedBlock;
use solana_client::rpc_request::{RpcConfirmedBlock, RpcTransactionStatus};
use solana_measure::measure::Measure;
use solana_metrics::{datapoint_debug, datapoint_error};
use solana_rayon_threadlimit::get_thread_count;
@ -32,7 +32,7 @@ use solana_sdk::{
hash::Hash,
signature::{Keypair, KeypairUtil},
timing::timestamp,
transaction::{self, Transaction},
transaction::Transaction,
};
use std::{
cell::RefCell,
@ -1143,7 +1143,7 @@ impl Blocktree {
.expect("Rooted parent slot must have blockhash"),
blockhash: get_last_hash(slot_entries.iter())
.expect("Rooted slot must have blockhash"),
transactions: self.map_transactions_to_statuses(slot_transaction_iterator),
transactions: self.map_transactions_to_statuses(slot, slot_transaction_iterator),
};
Ok(block)
} else {
@ -1151,13 +1151,22 @@ impl Blocktree {
}
}
// The `map_transactions_to_statuses` method is not fully implemented (depends on persistent
// status store). It currently returns status Ok(()) for all transactions.
fn map_transactions_to_statuses<'a>(
&self,
slot: Slot,
iterator: impl Iterator<Item = Transaction> + 'a,
) -> Vec<(Transaction, transaction::Result<()>)> {
iterator.map(|transaction| (transaction, Ok(()))).collect()
) -> Vec<(Transaction, Option<RpcTransactionStatus>)> {
iterator
.map(|transaction| {
let signature = transaction.signatures[0];
(
transaction,
self.transaction_status_cf
.get((slot, signature))
.expect("Expect database get to succeed"),
)
})
.collect()
}
/// Returns the entry vector for the slot starting with `shred_start_index`
@ -4095,17 +4104,36 @@ pub mod tests {
ledger.insert_shreds(shreds, None, false).unwrap();
ledger.set_roots(&[0]).unwrap();
let confirmed_block = ledger.get_confirmed_block(0).unwrap();
assert_eq!(confirmed_block.transactions.len(), 100);
let expected_transactions: Vec<(Transaction, transaction::Result<()>)> = entries
let expected_transactions: Vec<(Transaction, Option<RpcTransactionStatus>)> = entries
.iter()
.cloned()
.filter(|entry| !entry.is_tick())
.flat_map(|entry| entry.transactions)
.map(|transaction| (transaction, Ok(())))
.map(|transaction| {
let signature = transaction.signatures[0];
ledger
.transaction_status_cf
.put(
(slot, signature),
&RpcTransactionStatus {
status: Ok(()),
fee: 42,
},
)
.unwrap();
(
transaction,
Some(RpcTransactionStatus {
status: Ok(()),
fee: 42,
}),
)
})
.collect();
let confirmed_block = ledger.get_confirmed_block(0).unwrap();
assert_eq!(confirmed_block.transactions.len(), 100);
let mut expected_block = RpcConfirmedBlock::default();
expected_block.transactions = expected_transactions;
// The blockhash and previous_blockhash of `expected_block` are default only because
@ -4136,17 +4164,17 @@ pub mod tests {
assert!(transaction_status_cf
.put(
(0, Signature::default()),
&(
solana_sdk::transaction::Result::<()>::Err(
&RpcTransactionStatus {
status: solana_sdk::transaction::Result::<()>::Err(
TransactionError::AccountNotFound
),
5u64
)
fee: 5u64
},
)
.is_ok());
// result found
let (status, fee) = transaction_status_cf
let RpcTransactionStatus { status, fee } = transaction_status_cf
.get((0, Signature::default()))
.unwrap()
.unwrap();
@ -4157,12 +4185,15 @@ pub mod tests {
assert!(transaction_status_cf
.put(
(9, Signature::default()),
&(solana_sdk::transaction::Result::<()>::Ok(()), 9u64)
&RpcTransactionStatus {
status: solana_sdk::transaction::Result::<()>::Ok(()),
fee: 9u64
},
)
.is_ok());
// result found
let (status, fee) = transaction_status_cf
let RpcTransactionStatus { status, fee } = transaction_status_cf
.get((9, Signature::default()))
.unwrap()
.unwrap();
@ -4189,4 +4220,53 @@ pub mod tests {
let entries_iterator = entries.iter();
assert_eq!(get_last_hash(entries_iterator).unwrap(), entries[9].hash);
}
#[test]
fn test_map_transactions_to_statuses() {
let blocktree_path = get_tmp_ledger_path!();
{
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let transaction_status_cf = blocktree.db.column::<cf::TransactionStatus>();
let slot = 0;
let mut transactions: Vec<Transaction> = vec![];
for x in 0..4 {
let transaction = Transaction::new_with_compiled_instructions(
&[&Keypair::new()],
&[Pubkey::new_rand()],
Hash::default(),
vec![Pubkey::new_rand()],
vec![CompiledInstruction::new(1, &(), vec![0])],
);
transaction_status_cf
.put(
(slot, transaction.signatures[0]),
&RpcTransactionStatus {
status: solana_sdk::transaction::Result::<()>::Err(
TransactionError::AccountNotFound,
),
fee: x,
},
)
.unwrap();
transactions.push(transaction);
}
// Push transaction that will not have matching status, as a test case
transactions.push(Transaction::new_with_compiled_instructions(
&[&Keypair::new()],
&[Pubkey::new_rand()],
Hash::default(),
vec![Pubkey::new_rand()],
vec![CompiledInstruction::new(1, &(), vec![0])],
));
let map = blocktree.map_transactions_to_statuses(slot, transactions.into_iter());
assert_eq!(map.len(), 5);
for x in 0..4 {
assert_eq!(map[x].1.as_ref().unwrap().fee, x as u64);
}
assert_eq!(map[4].1.as_ref(), None);
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
}

View File

@ -9,12 +9,9 @@ use rocksdb::{
};
use serde::de::DeserializeOwned;
use serde::Serialize;
use solana_client::rpc_request::RpcTransactionStatus;
use solana_sdk::{clock::Slot, signature::Signature};
use std::collections::HashMap;
use std::fs;
use std::marker::PhantomData;
use std::path::Path;
use std::sync::Arc;
use std::{collections::HashMap, fs, marker::PhantomData, path::Path, sync::Arc};
// A good value for this is the number of cores on the machine
const TOTAL_THREADS: i32 = 8;
@ -261,7 +258,7 @@ pub trait TypedColumn: Column {
}
impl TypedColumn for columns::TransactionStatus {
type Type = (solana_sdk::transaction::Result<()>, u64);
type Type = RpcTransactionStatus;
}
impl Column for columns::TransactionStatus {