Blockstore address signatures: handle slots that cross primary indexes, and refactor get_confirmed_signatures_for_address2 (#11497)

* Freeze address-signature index in the middle of slot to show failure case

* Secondary filter on signature

* Use AddressSignatures iterator instead of manually decrementing slots

* Remove unused method

* Add metrics

* Add transaction-status-index doccumentation
This commit is contained in:
Tyera Eulberg 2020-08-10 10:27:38 -06:00 committed by GitHub
parent 685e456eff
commit de5fb3ba0e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 142 additions and 138 deletions

View File

@ -1671,6 +1671,10 @@ impl Blockstore {
.collect()
}
/// Initializes the TransactionStatusIndex column family with two records, `0` and `1`,
/// which are used as the primary index for entries in the TransactionStatus and
/// AddressSignatures columns. At any given time, one primary index is active (ie. new records
/// are stored under this index), the other is frozen.
fn initialize_transaction_status_index(&self) -> Result<()> {
self.transaction_status_index_cf
.put(0, &TransactionStatusIndexMeta::default())?;
@ -1687,6 +1691,8 @@ impl Blockstore {
)
}
/// Toggles the active primary index between `0` and `1`, and clears the stored max-slot of the
/// frozen index in preparation for pruning.
fn toggle_transaction_status_index(
&self,
batch: &mut WriteBatch,
@ -1902,34 +1908,10 @@ impl Blockstore {
}
}
}
signatures.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap());
signatures.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap().then(a.1.cmp(&b.1)));
Ok(signatures)
}
fn get_lowest_slot_for_address(&self, address: Pubkey) -> Result<Option<Slot>> {
let mut lowest_slot = None;
for transaction_status_cf_primary_index in 0..=1 {
let mut index_iterator = self.address_signatures_cf.iter(IteratorMode::From(
(
transaction_status_cf_primary_index,
address,
0,
Signature::default(),
),
IteratorDirection::Forward,
))?;
if let Some(((i, key_address, slot, _), _)) = index_iterator.next() {
if i == transaction_status_cf_primary_index
&& key_address == address
&& slot < lowest_slot.unwrap_or(Slot::MAX)
{
lowest_slot = Some(slot);
}
}
}
Ok(lowest_slot)
}
pub fn get_confirmed_signatures_for_address(
&self,
pubkey: Pubkey,
@ -1967,7 +1949,8 @@ impl Blockstore {
// Figure the `slot` to start listing signatures at, based on the ledger location of the
// `before` signature if present. Also generate a HashSet of signatures that should
// be excluded from the results.
let (mut slot, mut excluded_signatures) = match before {
let mut get_before_slot_timer = Measure::start("get_before_slot_timer");
let (slot, mut excluded_signatures) = match before {
None => (highest_confirmed_root, None),
Some(before) => {
let transaction_status = self.get_transaction_status(before)?;
@ -1999,9 +1982,10 @@ impl Blockstore {
.collect();
// Sort signatures as a way to entire a stable ordering within a slot, as
// `self.find_address_signatures()` is ordered by signatures ordered and
// the AddressSignatures column is ordered by signatures within a slot,
// not by block ordering
slot_signatures.sort();
slot_signatures.reverse();
if let Some(pos) = slot_signatures.iter().position(|&x| x == before) {
slot_signatures.truncate(pos + 1);
@ -2015,41 +1999,96 @@ impl Blockstore {
}
}
};
get_before_slot_timer.stop();
// Fetch the list of signatures that affect the given address
let first_available_block = self.get_first_available_block()?;
let first_address_slot = self.get_lowest_slot_for_address(address)?;
if first_address_slot.is_none() {
return Ok(vec![]);
}
let lower_bound = cmp::max(first_available_block, first_address_slot.unwrap());
let mut address_signatures = vec![];
loop {
if address_signatures.len() >= limit {
address_signatures.truncate(limit);
break;
}
let mut signatures = self.find_address_signatures(address, slot, slot)?;
if let Some(excluded_signatures) = excluded_signatures.take() {
address_signatures.extend(
signatures
.into_iter()
.filter(|(_, signature)| !excluded_signatures.contains(&signature)),
)
} else {
address_signatures.append(&mut signatures);
}
excluded_signatures = None;
if slot == lower_bound {
break;
}
slot -= 1;
// Get signatures in `slot`
let mut get_initial_slot_timer = Measure::start("get_initial_slot_timer");
let mut signatures = self.find_address_signatures(address, slot, slot)?;
signatures.reverse();
if let Some(excluded_signatures) = excluded_signatures.take() {
address_signatures.extend(
signatures
.into_iter()
.filter(|(_, signature)| !excluded_signatures.contains(&signature)),
)
} else {
address_signatures.append(&mut signatures);
}
get_initial_slot_timer.stop();
// Check the active_transaction_status_index to see if it contains slot. If so, start with
// that index, as it will contain higher slots
let starting_primary_index = *self.active_transaction_status_index.read().unwrap();
let next_primary_index = if starting_primary_index == 0 { 1 } else { 0 };
let next_max_slot = self
.transaction_status_index_cf
.get(next_primary_index)?
.unwrap()
.max_slot;
let mut starting_primary_index_iter_timer = Measure::start("starting_primary_index_iter");
if slot > next_max_slot {
let mut starting_iterator = self.address_signatures_cf.iter(IteratorMode::From(
(starting_primary_index, address, slot, Signature::default()),
IteratorDirection::Reverse,
))?;
// Iterate through starting_iterator until limit is reached
while address_signatures.len() < limit {
if let Some(((i, key_address, slot, signature), _)) = starting_iterator.next() {
if slot == next_max_slot {
break;
}
if i == starting_primary_index
&& key_address == address
&& slot >= first_available_block
{
address_signatures.push((slot, signature));
continue;
}
}
break;
}
// Handle slots that cross primary indexes
let mut signatures =
self.find_address_signatures(address, next_max_slot, next_max_slot)?;
signatures.reverse();
address_signatures.append(&mut signatures);
}
starting_primary_index_iter_timer.stop();
// Iterate through next_iterator until limit is reached
let mut next_primary_index_iter_timer = Measure::start("next_primary_index_iter_timer");
let mut next_iterator = self.address_signatures_cf.iter(IteratorMode::From(
(next_primary_index, address, slot, Signature::default()),
IteratorDirection::Reverse,
))?;
while address_signatures.len() < limit {
if let Some(((i, key_address, slot, signature), _)) = next_iterator.next() {
// Skip next_max_slot, which is already included
if slot == next_max_slot {
continue;
}
if i == next_primary_index
&& key_address == address
&& slot >= first_available_block
{
address_signatures.push((slot, signature));
continue;
}
}
break;
}
next_primary_index_iter_timer.stop();
address_signatures.truncate(limit);
// Fill in the status information for each found transaction
let mut get_status_info_timer = Measure::start("get_status_info_timer");
let mut infos = vec![];
for (slot, signature) in address_signatures.into_iter() {
let transaction_status = self.get_transaction_status(signature)?;
@ -2064,6 +2103,36 @@ impl Blockstore {
memo: None,
});
}
get_status_info_timer.stop();
datapoint_info!(
"blockstore-get-conf-sigs-for-addr-2",
(
"get_before_slot_us",
get_before_slot_timer.as_us() as i64,
i64
),
(
"get_initial_slot_us",
get_initial_slot_timer.as_us() as i64,
i64
),
(
"starting_primary_index_iter_us",
starting_primary_index_iter_timer.as_us() as i64,
i64
),
(
"next_primary_index_iter_us",
next_primary_index_iter_timer.as_us() as i64,
i64
),
(
"get_status_info_us",
get_status_info_timer.as_us() as i64,
i64
)
);
Ok(infos)
}
@ -6242,82 +6311,6 @@ pub mod tests {
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test]
fn test_get_lowest_slot_for_address() {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let address = Pubkey::new_rand();
let address2 = Pubkey::new_rand();
let slot = 5;
// Add an additional to record to ensure that existent or lower slots in entries for
// other addresses do not affect return
blockstore
.address_signatures_cf
.put(
(0, address2, slot, Signature::default()),
&AddressSignatureMeta { writeable: false },
)
.unwrap();
assert_eq!(
blockstore.get_lowest_slot_for_address(address).unwrap(),
None
);
let slot = 200;
blockstore
.address_signatures_cf
.put(
(0, address, slot, Signature::default()),
&AddressSignatureMeta { writeable: false },
)
.unwrap();
assert_eq!(
blockstore.get_lowest_slot_for_address(address).unwrap(),
Some(200)
);
blockstore
.address_signatures_cf
.put(
(1, address, slot, Signature::default()),
&AddressSignatureMeta { writeable: false },
)
.unwrap();
assert_eq!(
blockstore.get_lowest_slot_for_address(address).unwrap(),
Some(200)
);
let slot = 300;
blockstore
.address_signatures_cf
.put(
(1, address, slot, Signature::default()),
&AddressSignatureMeta { writeable: false },
)
.unwrap();
assert_eq!(
blockstore.get_lowest_slot_for_address(address).unwrap(),
Some(200)
);
let slot = 100;
blockstore
.address_signatures_cf
.put(
(1, address, slot, Signature::default()),
&AddressSignatureMeta { writeable: false },
)
.unwrap();
assert_eq!(
blockstore.get_lowest_slot_for_address(address).unwrap(),
Some(100)
);
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test]
fn test_get_confirmed_signatures_for_address2() {
let blockstore_path = get_tmp_ledger_path!();
@ -6344,14 +6337,18 @@ pub mod tests {
let address0 = Pubkey::new_rand();
let address1 = Pubkey::new_rand();
for slot in 2..=4 {
for slot in 2..=7 {
let entries = make_slot_entries_with_transaction_addresses(&[
address0, address1, address0, address1,
]);
let shreds = entries_to_test_shreds(entries.clone(), slot, slot - 1, true, 0);
blockstore.insert_shreds(shreds, None, false).unwrap();
for entry in &entries {
for (i, entry) in entries.iter().enumerate() {
if slot == 4 && i == 2 {
// Purge to freeze index 0 and write address-signatures in new primary index
blockstore.run_purge(0, 1, PurgeType::PrimaryIndex).unwrap();
}
for transaction in &entry.transactions {
assert_eq!(transaction.signatures.len(), 1);
blockstore
@ -6366,8 +6363,8 @@ pub mod tests {
}
}
}
blockstore.set_roots(&[1, 2, 3, 4]).unwrap();
let highest_confirmed_root = 4;
blockstore.set_roots(&[1, 2, 3, 4, 5, 6, 7]).unwrap();
let highest_confirmed_root = 7;
// Fetch all signatures for address 0 at once...
let all0 = blockstore
@ -6378,7 +6375,7 @@ pub mod tests {
usize::MAX,
)
.unwrap();
assert_eq!(all0.len(), 6);
assert_eq!(all0.len(), 12);
// Fetch all signatures for address 1 at once...
let all1 = blockstore
@ -6389,7 +6386,7 @@ pub mod tests {
usize::MAX,
)
.unwrap();
assert_eq!(all1.len(), 6);
assert_eq!(all1.len(), 12);
assert!(all0 != all1);
@ -6442,7 +6439,7 @@ pub mod tests {
assert_eq!(results[2], all0[i + 2]);
}
// Ensure that the signatures within a slot are ordered by signature
// Ensure that the signatures within a slot are reverse ordered by signature
// (current limitation of the .get_confirmed_signatures_for_address2())
for i in (0..all1.len()).step_by(2) {
let results = blockstore
@ -6459,7 +6456,7 @@ pub mod tests {
.unwrap();
assert_eq!(results.len(), 2);
assert_eq!(results[0].slot, results[1].slot);
assert!(results[0].signature <= results[1].signature);
assert!(results[0].signature >= results[1].signature);
assert_eq!(results[0], all1[i]);
assert_eq!(results[1], all1[i + 1]);
}

View File

@ -235,6 +235,9 @@ impl Blockstore {
Ok(result)
}
/// Purges special columns (using a non-Slot primary-index) exactly, by deserializing each slot
/// being purged and iterating through all transactions to determine the keys of individual
/// records. **This method is very slow.**
fn purge_special_columns_exact(
&self,
batch: &mut WriteBatch,
@ -279,6 +282,8 @@ impl Blockstore {
Ok(())
}
/// Purges special columns (using a non-Slot primary-index) by range. Purge occurs if frozen
/// primary index has a max-slot less than the highest slot being purged.
fn purge_special_columns_with_primary_index(
&self,
write_batch: &mut WriteBatch,

View File

@ -40,7 +40,9 @@ const CODE_SHRED_CF: &str = "code_shred";
const TRANSACTION_STATUS_CF: &str = "transaction_status";
/// Column family for Address Signatures
const ADDRESS_SIGNATURES_CF: &str = "address_signatures";
/// Column family for Transaction Status Index
/// Column family for the Transaction Status Index.
/// This column family is used for tracking the active primary index for columns that for
/// query performance reasons should not be indexed by Slot.
const TRANSACTION_STATUS_INDEX_CF: &str = "transaction_status_index";
/// Column family for Rewards
const REWARDS_CF: &str = "rewards";