Rework TransactionStatus index in blockstore (#9281)

automerge
This commit is contained in:
Tyera Eulberg 2020-04-04 21:24:06 -06:00 committed by GitHub
parent 36ab7e0600
commit 49e2cc6593
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 485 additions and 43 deletions

View File

@ -72,7 +72,7 @@ impl TransactionStatusService {
let fee = fee_calculator.calculate_fee(transaction.message()); let fee = fee_calculator.calculate_fee(transaction.message());
blockstore blockstore
.write_transaction_status( .write_transaction_status(
(slot, transaction.signatures[0]), (transaction.signatures[0], slot),
&TransactionStatusMeta { &TransactionStatusMeta {
status, status,
fee, fee,

View File

@ -146,7 +146,7 @@ fn output_slot(
println!(" Data: {:?}", instruction.data); println!(" Data: {:?}", instruction.data);
} }
} }
match blockstore.read_transaction_status((slot, transaction.signatures[0])) { match blockstore.read_transaction_status((transaction.signatures[0], slot)) {
Ok(transaction_status) => { Ok(transaction_status) => {
if let Some(transaction_status) = transaction_status { if let Some(transaction_status) = transaction_status {
println!( println!(

View File

@ -87,6 +87,8 @@ pub struct Blockstore {
data_shred_cf: LedgerColumn<cf::ShredData>, data_shred_cf: LedgerColumn<cf::ShredData>,
code_shred_cf: LedgerColumn<cf::ShredCode>, code_shred_cf: LedgerColumn<cf::ShredCode>,
transaction_status_cf: LedgerColumn<cf::TransactionStatus>, transaction_status_cf: LedgerColumn<cf::TransactionStatus>,
transaction_status_index_cf: LedgerColumn<cf::TransactionStatusIndex>,
active_transaction_status_index: RwLock<u64>,
rewards_cf: LedgerColumn<cf::Rewards>, rewards_cf: LedgerColumn<cf::Rewards>,
last_root: Arc<RwLock<Slot>>, last_root: Arc<RwLock<Slot>>,
insert_shreds_lock: Arc<Mutex<()>>, insert_shreds_lock: Arc<Mutex<()>>,
@ -200,6 +202,7 @@ impl Blockstore {
let data_shred_cf = db.column(); let data_shred_cf = db.column();
let code_shred_cf = db.column(); let code_shred_cf = db.column();
let transaction_status_cf = db.column(); let transaction_status_cf = db.column();
let transaction_status_index_cf = db.column();
let rewards_cf = db.column(); let rewards_cf = db.column();
let db = Arc::new(db); let db = Arc::new(db);
@ -212,6 +215,20 @@ impl Blockstore {
.unwrap_or(0); .unwrap_or(0);
let last_root = Arc::new(RwLock::new(max_root)); let last_root = Arc::new(RwLock::new(max_root));
// Get active transaction-status index or 0
let active_transaction_status_index = db
.iter::<cf::TransactionStatusIndex>(IteratorMode::Start)?
.next()
.and_then(|(_, data)| {
let index0: TransactionStatusIndexMeta = deserialize(&data).unwrap();
if index0.frozen {
Some(1)
} else {
None
}
})
.unwrap_or(0);
measure.stop(); measure.stop();
info!("{:?} {}", blockstore_path, measure); info!("{:?} {}", blockstore_path, measure);
let blockstore = Blockstore { let blockstore = Blockstore {
@ -225,6 +242,8 @@ impl Blockstore {
data_shred_cf, data_shred_cf,
code_shred_cf, code_shred_cf,
transaction_status_cf, transaction_status_cf,
transaction_status_index_cf,
active_transaction_status_index: RwLock::new(active_transaction_status_index),
rewards_cf, rewards_cf,
new_shreds_signals: vec![], new_shreds_signals: vec![],
completed_slots_senders: vec![], completed_slots_senders: vec![],
@ -321,7 +340,7 @@ impl Blockstore {
.expect("Database Error: Failed to get write batch"); .expect("Database Error: Failed to get write batch");
// delete range cf is not inclusive // delete range cf is not inclusive
let to_slot = to_slot.checked_add(1).unwrap_or_else(|| std::u64::MAX); let to_slot = to_slot.checked_add(1).unwrap_or_else(|| std::u64::MAX);
let columns_empty = self let mut columns_empty = self
.db .db
.delete_range_cf::<cf::SlotMeta>(&mut write_batch, from_slot, to_slot) .delete_range_cf::<cf::SlotMeta>(&mut write_batch, from_slot, to_slot)
.unwrap_or(false) .unwrap_or(false)
@ -357,14 +376,23 @@ impl Blockstore {
.db .db
.delete_range_cf::<cf::Index>(&mut write_batch, from_slot, to_slot) .delete_range_cf::<cf::Index>(&mut write_batch, from_slot, to_slot)
.unwrap_or(false) .unwrap_or(false)
& self
.db
.delete_range_cf::<cf::TransactionStatus>(&mut write_batch, from_slot, to_slot)
.unwrap_or(false)
& self & self
.db .db
.delete_range_cf::<cf::Rewards>(&mut write_batch, from_slot, to_slot) .delete_range_cf::<cf::Rewards>(&mut write_batch, from_slot, to_slot)
.unwrap_or(false); .unwrap_or(false);
let mut w_active_transaction_status_index =
self.active_transaction_status_index.write().unwrap();
if let Some(index) = self.toggle_transaction_status_index(
&mut write_batch,
&mut w_active_transaction_status_index,
to_slot,
)? {
columns_empty &= &self
.db
.delete_range_cf::<cf::TransactionStatus>(&mut write_batch, index, index + 1)
.unwrap_or(false);
}
let mut write_timer = Measure::start("write_batch");
if let Err(e) = self.db.write(write_batch) { if let Err(e) = self.db.write(write_batch) {
error!( error!(
"Error: {:?} while submitting write batch for slot {:?} retrying...", "Error: {:?} while submitting write batch for slot {:?} retrying...",
@ -372,6 +400,11 @@ impl Blockstore {
); );
return Err(e); return Err(e);
} }
write_timer.stop();
datapoint_info!(
"blockstore-purge",
("write_batch_ns", write_timer.as_us() as i64, i64)
);
Ok(columns_empty) Ok(columns_empty)
} }
@ -415,7 +448,7 @@ impl Blockstore {
.unwrap_or(false) .unwrap_or(false)
&& self && self
.transaction_status_cf .transaction_status_cf
.compact_range(from_slot, to_slot) .compact_range(0, 2)
.unwrap_or(false) .unwrap_or(false)
&& self && self
.rewards_cf .rewards_cf
@ -1498,8 +1531,7 @@ impl Blockstore {
TransactionWithStatusMeta { TransactionWithStatusMeta {
transaction: encoded_transaction, transaction: encoded_transaction,
meta: self meta: self
.transaction_status_cf .read_transaction_status((signature, slot))
.get((slot, signature))
.expect("Expect database get to succeed") .expect("Expect database get to succeed")
.map(RpcTransactionStatusMeta::from), .map(RpcTransactionStatusMeta::from),
} }
@ -1507,18 +1539,108 @@ impl Blockstore {
.collect() .collect()
} }
fn initialize_transaction_status_index(&self) -> Result<()> {
self.transaction_status_index_cf
.put(0, &TransactionStatusIndexMeta::default())?;
self.transaction_status_index_cf
.put(1, &TransactionStatusIndexMeta::default())?;
// This dummy status improves compaction performance
self.transaction_status_cf.put(
(2, Signature::default(), 0),
&TransactionStatusMeta::default(),
)
}
fn toggle_transaction_status_index(
&self,
batch: &mut WriteBatch,
w_active_transaction_status_index: &mut u64,
to_slot: Slot,
) -> Result<Option<u64>> {
let index0 = self.transaction_status_index_cf.get(0)?;
if index0.is_none() {
return Ok(None);
}
let mut index0 = index0.unwrap();
let mut index1 = self.transaction_status_index_cf.get(1)?.unwrap();
if !index0.frozen && !index1.frozen {
index0.frozen = true;
*w_active_transaction_status_index = 1;
batch.put::<cf::TransactionStatusIndex>(0, &index0)?;
Ok(None)
} else {
let result = if index0.frozen && to_slot > index0.max_slot {
debug!("Pruning transaction index 0 at slot {}", index0.max_slot);
Some(0)
} else if index1.frozen && to_slot > index1.max_slot {
debug!("Pruning transaction index 1 at slot {}", index1.max_slot);
Some(1)
} else {
None
};
if result.is_some() {
*w_active_transaction_status_index = if index0.frozen { 0 } else { 1 };
if index0.frozen {
index0.max_slot = 0
};
index0.frozen = !index0.frozen;
batch.put::<cf::TransactionStatusIndex>(0, &index0)?;
if index1.frozen {
index1.max_slot = 0
};
index1.frozen = !index1.frozen;
batch.put::<cf::TransactionStatusIndex>(1, &index1)?;
}
Ok(result)
}
}
fn make_transaction_status_index(
&self,
index: (Signature, Slot),
w_active_transaction_status_index: &mut u64,
) -> Result<(u64, Signature, Slot)> {
let (signature, slot) = index;
if self.transaction_status_index_cf.get(0)?.is_none() {
self.initialize_transaction_status_index()?;
}
let i = *w_active_transaction_status_index;
let mut index_meta = self.transaction_status_index_cf.get(i)?.unwrap();
if slot > index_meta.max_slot {
assert!(!index_meta.frozen);
index_meta.max_slot = slot;
self.transaction_status_index_cf.put(i, &index_meta)?;
}
Ok((i, signature, slot))
}
pub fn read_transaction_status( pub fn read_transaction_status(
&self, &self,
index: (Slot, Signature), index: (Signature, Slot),
) -> Result<Option<TransactionStatusMeta>> { ) -> Result<Option<TransactionStatusMeta>> {
self.transaction_status_cf.get(index) let (signature, slot) = index;
let result = self.transaction_status_cf.get((0, signature, slot))?;
if result.is_none() {
Ok(self.transaction_status_cf.get((1, signature, slot))?)
} else {
Ok(result)
}
} }
pub fn write_transaction_status( pub fn write_transaction_status(
&self, &self,
index: (Slot, Signature), index: (Signature, Slot),
status: &TransactionStatusMeta, status: &TransactionStatusMeta,
) -> Result<()> { ) -> Result<()> {
// This write lock prevents interleaving issues with the transactions_status_index_cf by
// gating writes to that column
let mut w_active_transaction_status_index =
self.active_transaction_status_index.write().unwrap();
let index =
self.make_transaction_status_index(index, &mut w_active_transaction_status_index)?;
self.transaction_status_cf.put(index, status) self.transaction_status_cf.put(index, status)
} }
@ -2648,7 +2770,7 @@ pub mod tests {
.iter::<cf::TransactionStatus>(IteratorMode::Start) .iter::<cf::TransactionStatus>(IteratorMode::Start)
.unwrap() .unwrap()
.next() .next()
.map(|((slot, _), _)| slot >= min_slot) .map(|((_, _, slot), _)| slot >= min_slot)
.unwrap_or(true) .unwrap_or(true)
& blockstore & blockstore
.db .db
@ -4865,7 +4987,7 @@ pub mod tests {
ledger ledger
.transaction_status_cf .transaction_status_cf
.put( .put(
(slot, signature), (0, signature, slot),
&TransactionStatusMeta { &TransactionStatusMeta {
status: Ok(()), status: Ok(()),
fee: 42, fee: 42,
@ -4877,7 +4999,7 @@ pub mod tests {
ledger ledger
.transaction_status_cf .transaction_status_cf
.put( .put(
(slot + 1, signature), (0, signature, slot + 1),
&TransactionStatusMeta { &TransactionStatusMeta {
status: Ok(()), status: Ok(()),
fee: 42, fee: 42,
@ -5146,14 +5268,14 @@ pub mod tests {
// result not found // result not found
assert!(transaction_status_cf assert!(transaction_status_cf
.get((0, Signature::default())) .get((0, Signature::default(), 0))
.unwrap() .unwrap()
.is_none()); .is_none());
// insert value // insert value
assert!(transaction_status_cf assert!(transaction_status_cf
.put( .put(
(0, Signature::default()), (0, Signature::default(), 0),
&TransactionStatusMeta { &TransactionStatusMeta {
status: solana_sdk::transaction::Result::<()>::Err( status: solana_sdk::transaction::Result::<()>::Err(
TransactionError::AccountNotFound TransactionError::AccountNotFound
@ -5172,7 +5294,7 @@ pub mod tests {
pre_balances, pre_balances,
post_balances, post_balances,
} = transaction_status_cf } = transaction_status_cf
.get((0, Signature::default())) .get((0, Signature::default(), 0))
.unwrap() .unwrap()
.unwrap(); .unwrap();
assert_eq!(status, Err(TransactionError::AccountNotFound)); assert_eq!(status, Err(TransactionError::AccountNotFound));
@ -5183,7 +5305,7 @@ pub mod tests {
// insert value // insert value
assert!(transaction_status_cf assert!(transaction_status_cf
.put( .put(
(9, Signature::default()), (0, Signature::new(&[2u8; 64]), 9),
&TransactionStatusMeta { &TransactionStatusMeta {
status: solana_sdk::transaction::Result::<()>::Ok(()), status: solana_sdk::transaction::Result::<()>::Ok(()),
fee: 9u64, fee: 9u64,
@ -5200,7 +5322,7 @@ pub mod tests {
pre_balances, pre_balances,
post_balances, post_balances,
} = transaction_status_cf } = transaction_status_cf
.get((9, Signature::default())) .get((0, Signature::new(&[2u8; 64]), 9))
.unwrap() .unwrap()
.unwrap(); .unwrap();
@ -5213,6 +5335,267 @@ pub mod tests {
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
} }
#[test]
fn test_transaction_status_index() {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let transaction_status_index_cf = blockstore.db.column::<cf::TransactionStatusIndex>();
let slot0 = 10;
assert!(transaction_status_index_cf.get(0).unwrap().is_none());
assert!(transaction_status_index_cf.get(1).unwrap().is_none());
for _ in 0..5 {
let random_bytes: Vec<u8> = (0..64).map(|_| rand::random::<u8>()).collect();
blockstore
.write_transaction_status(
(Signature::new(&random_bytes), slot0),
&TransactionStatusMeta::default(),
)
.unwrap();
}
// New statuses bump index 0 max_slot
assert_eq!(
transaction_status_index_cf.get(0).unwrap().unwrap(),
TransactionStatusIndexMeta {
max_slot: slot0,
frozen: false,
}
);
assert_eq!(
transaction_status_index_cf.get(1).unwrap().unwrap(),
TransactionStatusIndexMeta::default()
);
let first_status_entry = blockstore
.db
.iter::<cf::TransactionStatus>(IteratorMode::From(
(0, Signature::default(), 0),
IteratorDirection::Forward,
))
.unwrap()
.next()
.unwrap()
.0;
assert_eq!(first_status_entry.0, 0);
assert_eq!(first_status_entry.2, slot0);
blockstore.run_purge(0, 8).unwrap();
// First successful prune freezes index 0
assert_eq!(
transaction_status_index_cf.get(0).unwrap().unwrap(),
TransactionStatusIndexMeta {
max_slot: slot0,
frozen: true,
}
);
assert_eq!(
transaction_status_index_cf.get(1).unwrap().unwrap(),
TransactionStatusIndexMeta::default()
);
let slot1 = 20;
for _ in 0..5 {
let random_bytes: Vec<u8> = (0..64).map(|_| rand::random::<u8>()).collect();
blockstore
.write_transaction_status(
(Signature::new(&random_bytes), slot1),
&TransactionStatusMeta::default(),
)
.unwrap();
}
assert_eq!(
transaction_status_index_cf.get(0).unwrap().unwrap(),
TransactionStatusIndexMeta {
max_slot: slot0,
frozen: true,
}
);
// Index 0 is frozen, so new statuses bump index 1 max_slot
assert_eq!(
transaction_status_index_cf.get(1).unwrap().unwrap(),
TransactionStatusIndexMeta {
max_slot: slot1,
frozen: false,
}
);
// Index 0 statuses still exist
let first_status_entry = blockstore
.db
.iter::<cf::TransactionStatus>(IteratorMode::From(
(0, Signature::default(), 0),
IteratorDirection::Forward,
))
.unwrap()
.next()
.unwrap()
.0;
assert_eq!(first_status_entry.0, 0);
assert_eq!(first_status_entry.2, 10);
// New statuses are stored in index 1
let index1_first_status_entry = blockstore
.db
.iter::<cf::TransactionStatus>(IteratorMode::From(
(1, Signature::default(), 0),
IteratorDirection::Forward,
))
.unwrap()
.next()
.unwrap()
.0;
assert_eq!(index1_first_status_entry.0, 1);
assert_eq!(index1_first_status_entry.2, slot1);
blockstore.run_purge(0, 18).unwrap();
// Successful prune toggles TransactionStatusIndex
assert_eq!(
transaction_status_index_cf.get(0).unwrap().unwrap(),
TransactionStatusIndexMeta {
max_slot: 0,
frozen: false,
}
);
assert_eq!(
transaction_status_index_cf.get(1).unwrap().unwrap(),
TransactionStatusIndexMeta {
max_slot: slot1,
frozen: true,
}
);
// Index 0 has been pruned, so first status entry is now index 1
let first_status_entry = blockstore
.db
.iter::<cf::TransactionStatus>(IteratorMode::From(
(0, Signature::default(), 0),
IteratorDirection::Forward,
))
.unwrap()
.next()
.unwrap()
.0;
assert_eq!(first_status_entry.0, 1);
assert_eq!(first_status_entry.2, slot1);
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test]
fn test_purge_transaction_status() {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let transaction_status_index_cf = blockstore.db.column::<cf::TransactionStatusIndex>();
let slot = 10;
for _ in 0..5 {
let random_bytes: Vec<u8> = (0..64).map(|_| rand::random::<u8>()).collect();
blockstore
.write_transaction_status(
(Signature::new(&random_bytes), slot),
&TransactionStatusMeta::default(),
)
.unwrap();
}
// Purge to freeze index 0
blockstore.run_purge(0, 1).unwrap();
let mut status_entry_iterator = blockstore
.db
.iter::<cf::TransactionStatus>(IteratorMode::From(
(0, Signature::default(), 0),
IteratorDirection::Forward,
))
.unwrap();
for _ in 0..5 {
let entry = status_entry_iterator.next().unwrap().0;
assert_eq!(entry.0, 0);
assert_eq!(entry.2, slot);
}
assert_eq!(
transaction_status_index_cf.get(0).unwrap().unwrap(),
TransactionStatusIndexMeta {
max_slot: 10,
frozen: true,
}
);
// Low purge should not affect state
blockstore.run_purge(0, 5).unwrap();
let mut status_entry_iterator = blockstore
.db
.iter::<cf::TransactionStatus>(IteratorMode::From(
(0, Signature::default(), 0),
IteratorDirection::Forward,
))
.unwrap();
for _ in 0..5 {
let entry = status_entry_iterator.next().unwrap().0;
assert_eq!(entry.0, 0);
assert_eq!(entry.2, slot);
}
assert_eq!(
transaction_status_index_cf.get(0).unwrap().unwrap(),
TransactionStatusIndexMeta {
max_slot: 10,
frozen: true,
}
);
// Test boundary conditions: < slot should not purge statuses; <= slot should
blockstore.run_purge(0, 9).unwrap();
let mut status_entry_iterator = blockstore
.db
.iter::<cf::TransactionStatus>(IteratorMode::From(
(0, Signature::default(), 0),
IteratorDirection::Forward,
))
.unwrap();
for _ in 0..5 {
let entry = status_entry_iterator.next().unwrap().0;
assert_eq!(entry.0, 0);
assert_eq!(entry.2, slot);
}
assert_eq!(
transaction_status_index_cf.get(0).unwrap().unwrap(),
TransactionStatusIndexMeta {
max_slot: 10,
frozen: true,
}
);
blockstore.run_purge(0, 10).unwrap();
let mut status_entry_iterator = blockstore
.db
.iter::<cf::TransactionStatus>(IteratorMode::From(
(0, Signature::default(), 0),
IteratorDirection::Forward,
))
.unwrap();
let padding_entry = status_entry_iterator.next().unwrap().0;
assert_eq!(padding_entry.0, 2);
assert_eq!(padding_entry.2, 0);
assert!(status_entry_iterator.next().is_none());
assert_eq!(
transaction_status_index_cf.get(0).unwrap().unwrap(),
TransactionStatusIndexMeta {
max_slot: 0,
frozen: false,
}
);
assert_eq!(
transaction_status_index_cf.get(1).unwrap().unwrap(),
TransactionStatusIndexMeta {
max_slot: 0,
frozen: true,
}
);
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test] #[test]
fn test_get_last_hash() { fn test_get_last_hash() {
let mut entries: Vec<Entry> = vec![]; let mut entries: Vec<Entry> = vec![];
@ -5248,7 +5631,7 @@ pub mod tests {
); );
transaction_status_cf transaction_status_cf
.put( .put(
(slot, transaction.signatures[0]), (0, transaction.signatures[0], slot),
&TransactionStatusMeta { &TransactionStatusMeta {
status: solana_sdk::transaction::Result::<()>::Err( status: solana_sdk::transaction::Result::<()>::Err(
TransactionError::AccountNotFound, TransactionError::AccountNotFound,

View File

@ -38,6 +38,8 @@ const DATA_SHRED_CF: &str = "data_shred";
const CODE_SHRED_CF: &str = "code_shred"; const CODE_SHRED_CF: &str = "code_shred";
/// Column family for Transaction Status /// Column family for Transaction Status
const TRANSACTION_STATUS_CF: &str = "transaction_status"; const TRANSACTION_STATUS_CF: &str = "transaction_status";
/// Column family for Transaction Status
const TRANSACTION_STATUS_INDEX_CF: &str = "transaction_status_index";
/// Column family for Rewards /// Column family for Rewards
const REWARDS_CF: &str = "rewards"; const REWARDS_CF: &str = "rewards";
@ -108,6 +110,10 @@ pub mod columns {
/// The transaction status column /// The transaction status column
pub struct TransactionStatus; pub struct TransactionStatus;
#[derive(Debug)]
/// The transaction status index column
pub struct TransactionStatusIndex;
#[derive(Debug)] #[derive(Debug)]
/// The rewards column /// The rewards column
pub struct Rewards; pub struct Rewards;
@ -120,7 +126,7 @@ impl Rocks {
fn open(path: &Path) -> Result<Rocks> { fn open(path: &Path) -> Result<Rocks> {
use columns::{ use columns::{
DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans, Rewards, Root, ShredCode, DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans, Rewards, Root, ShredCode,
ShredData, SlotMeta, TransactionStatus, ShredData, SlotMeta, TransactionStatus, TransactionStatusIndex,
}; };
fs::create_dir_all(&path)?; fs::create_dir_all(&path)?;
@ -145,6 +151,8 @@ impl Rocks {
ColumnFamilyDescriptor::new(ShredCode::NAME, get_cf_options()); ColumnFamilyDescriptor::new(ShredCode::NAME, get_cf_options());
let transaction_status_cf_descriptor = let transaction_status_cf_descriptor =
ColumnFamilyDescriptor::new(TransactionStatus::NAME, get_cf_options()); ColumnFamilyDescriptor::new(TransactionStatus::NAME, get_cf_options());
let transaction_status_index_cf_descriptor =
ColumnFamilyDescriptor::new(TransactionStatusIndex::NAME, get_cf_options());
let rewards_cf_descriptor = ColumnFamilyDescriptor::new(Rewards::NAME, get_cf_options()); let rewards_cf_descriptor = ColumnFamilyDescriptor::new(Rewards::NAME, get_cf_options());
let cfs = vec![ let cfs = vec![
@ -158,6 +166,7 @@ impl Rocks {
shred_data_cf_descriptor, shred_data_cf_descriptor,
shred_code_cf_descriptor, shred_code_cf_descriptor,
transaction_status_cf_descriptor, transaction_status_cf_descriptor,
transaction_status_index_cf_descriptor,
rewards_cf_descriptor, rewards_cf_descriptor,
]; ];
@ -170,7 +179,7 @@ impl Rocks {
fn columns(&self) -> Vec<&'static str> { fn columns(&self) -> Vec<&'static str> {
use columns::{ use columns::{
DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans, Rewards, Root, ShredCode, DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans, Rewards, Root, ShredCode,
ShredData, SlotMeta, TransactionStatus, ShredData, SlotMeta, TransactionStatus, TransactionStatusIndex,
}; };
vec![ vec![
@ -184,6 +193,7 @@ impl Rocks {
ShredData::NAME, ShredData::NAME,
ShredCode::NAME, ShredCode::NAME,
TransactionStatus::NAME, TransactionStatus::NAME,
TransactionStatusIndex::NAME,
Rewards::NAME, Rewards::NAME,
] ]
} }
@ -256,7 +266,7 @@ pub trait Column {
fn key(index: Self::Index) -> Vec<u8>; fn key(index: Self::Index) -> Vec<u8>;
fn index(key: &[u8]) -> Self::Index; fn index(key: &[u8]) -> Self::Index;
fn slot(index: Self::Index) -> Slot; fn primary_index(index: Self::Index) -> Slot;
fn as_index(slot: Slot) -> Self::Index; fn as_index(slot: Slot) -> Self::Index;
} }
@ -272,6 +282,10 @@ impl TypedColumn for columns::TransactionStatus {
type Type = TransactionStatusMeta; type Type = TransactionStatusMeta;
} }
impl TypedColumn for columns::TransactionStatusIndex {
type Type = blockstore_meta::TransactionStatusIndexMeta;
}
pub trait SlotColumn<Index = u64> {} pub trait SlotColumn<Index = u64> {}
impl<T: SlotColumn> Column for T { impl<T: SlotColumn> Column for T {
@ -287,7 +301,7 @@ impl<T: SlotColumn> Column for T {
BigEndian::read_u64(&key[..8]) BigEndian::read_u64(&key[..8])
} }
fn slot(index: u64) -> Slot { fn primary_index(index: u64) -> Slot {
index index
} }
@ -297,27 +311,29 @@ impl<T: SlotColumn> Column for T {
} }
impl Column for columns::TransactionStatus { impl Column for columns::TransactionStatus {
type Index = (Slot, Signature); type Index = (u64, Signature, Slot);
fn key((slot, index): (Slot, Signature)) -> Vec<u8> { fn key((index, signature, slot): (u64, Signature, Slot)) -> Vec<u8> {
let mut key = vec![0; 8 + 64]; let mut key = vec![0; 8 + 8 + 64];
BigEndian::write_u64(&mut key[..8], slot); BigEndian::write_u64(&mut key[0..8], index);
key[8..72].clone_from_slice(&index.as_ref()[0..64]); key[8..72].clone_from_slice(&signature.as_ref()[0..64]);
BigEndian::write_u64(&mut key[72..80], slot);
key key
} }
fn index(key: &[u8]) -> (Slot, Signature) { fn index(key: &[u8]) -> (u64, Signature, Slot) {
let slot = BigEndian::read_u64(&key[..8]); let index = BigEndian::read_u64(&key[0..8]);
let index = Signature::new(&key[8..72]); let signature = Signature::new(&key[8..72]);
(slot, index) let slot = BigEndian::read_u64(&key[72..80]);
(index, signature, slot)
} }
fn slot(index: Self::Index) -> Slot { fn primary_index(index: Self::Index) -> u64 {
index.0 index.0
} }
fn as_index(slot: Slot) -> Self::Index { fn as_index(index: u64) -> Self::Index {
(slot, Signature::default()) (index, Signature::default(), 0)
} }
} }
@ -325,6 +341,32 @@ impl ColumnName for columns::TransactionStatus {
const NAME: &'static str = TRANSACTION_STATUS_CF; const NAME: &'static str = TRANSACTION_STATUS_CF;
} }
impl Column for columns::TransactionStatusIndex {
type Index = u64;
fn key(index: u64) -> Vec<u8> {
let mut key = vec![0; 8];
BigEndian::write_u64(&mut key[..], index);
key
}
fn index(key: &[u8]) -> u64 {
BigEndian::read_u64(&key[..8])
}
fn primary_index(index: u64) -> u64 {
index
}
fn as_index(slot: u64) -> u64 {
slot
}
}
impl ColumnName for columns::TransactionStatusIndex {
const NAME: &'static str = TRANSACTION_STATUS_INDEX_CF;
}
impl SlotColumn for columns::Rewards {} impl SlotColumn for columns::Rewards {}
impl ColumnName for columns::Rewards { impl ColumnName for columns::Rewards {
const NAME: &'static str = REWARDS_CF; const NAME: &'static str = REWARDS_CF;
@ -344,7 +386,7 @@ impl Column for columns::ShredCode {
columns::ShredData::index(key) columns::ShredData::index(key)
} }
fn slot(index: Self::Index) -> Slot { fn primary_index(index: Self::Index) -> Slot {
index.0 index.0
} }
@ -373,7 +415,7 @@ impl Column for columns::ShredData {
(slot, index) (slot, index)
} }
fn slot(index: Self::Index) -> Slot { fn primary_index(index: Self::Index) -> Slot {
index.0 index.0
} }
@ -451,7 +493,7 @@ impl Column for columns::ErasureMeta {
key key
} }
fn slot(index: Self::Index) -> Slot { fn primary_index(index: Self::Index) -> Slot {
index.0 index.0
} }
@ -583,7 +625,7 @@ impl Database {
let max_slot = self let max_slot = self
.iter::<C>(IteratorMode::End)? .iter::<C>(IteratorMode::End)?
.next() .next()
.map(|(i, _)| C::slot(i)) .map(|(i, _)| C::primary_index(i))
.unwrap_or(0); .unwrap_or(0);
let end = max_slot <= to; let end = max_slot <= to;
result.map(|_| end) result.map(|_| end)
@ -624,7 +666,7 @@ where
let iter = self.iter(iter_config)?; let iter = self.iter(iter_config)?;
for (index, _) in iter { for (index, _) in iter {
if let Some(to) = to { if let Some(to) = to {
if C::slot(index) > to { if C::primary_index(index) > to {
end = false; end = false;
break; break;
} }

View File

@ -222,6 +222,12 @@ impl DuplicateSlotProof {
} }
} }
#[derive(Debug, Default, Deserialize, Serialize, PartialEq)]
pub struct TransactionStatusIndexMeta {
pub max_slot: Slot,
pub frozen: bool,
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*; use super::*;

View File

@ -26,6 +26,17 @@ pub struct TransactionStatusMeta {
pub post_balances: Vec<u64>, pub post_balances: Vec<u64>,
} }
impl Default for TransactionStatusMeta {
fn default() -> Self {
Self {
status: Ok(()),
fee: 0,
pre_balances: vec![],
post_balances: vec![],
}
}
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct RpcTransactionStatusMeta { pub struct RpcTransactionStatusMeta {