Remove primary index from Blockstore special-column keys (#33419)

* Add helper trait for column key deprecation

* Add WriteBatch::delete_raw

* Add ProtobufColumn::get_raw_protobuf_or_bincode

* Add ColumnIndexDeprecation iterator methods

* Impl ColumnIndexDeprecation for TransactionStatus (doesn't build)

* Update TransactionStatus put

* Update TransactionStatus purge_exact

* Fix read_transaction_status

* Fix get_transaction_status_with_counter

* Fix test_all_empty_or_min (builds except tests)

* Fix test_get_rooted_block

* Fix test_persist_transaction_status

* Fix test_get_transaction_status

* Fix test_get_rooted_transaction

* Fix test_get_complete_transaction

* Fix test_lowest_cleanup_slot_and_special_cfs

* Fix test_map_transactions_to_statuses

* Fix test_transaction_status_protobuf_backward_compatability

* Fix test_special_columns_empty

* Delete test_transaction_status_index

* Delete test_purge_transaction_status

* Ignore some tests until both special columns are dealt with (all build)

* Impl ColumnIndexDeprecation for AddressSignatures (doesn't build)

* Add BlockstoreError variant

* Update AddressSignatures put

* Remove unneeded active_transaction_status_index column lock

* Update AddressSignatures purge_exact

* Fix find_address_signatures_for_slot methods

* Fix get_block_signatures methods

* Fix get_confirmed_signatures_for_address2

* Remove unused method

* Fix test_all_empty_or_min moar (builds except tests)

* Fix tests (all build)

* Fix test_get_confirmed_signatures_for_address

* Fix test_lowest_cleanup_slot_and_special_cfs moar

* Unignore tests (builds except tests)

* Fix test_purge_transaction_status_exact

* Fix test_purge_front_of_ledger

* Fix test_purge_special_columns_compaction_filter (all build)

* Move some test-harness stuff around

* Add test cases for purge_special_columns_with_old_data

* Add test_read_transaction_status_with_old_data

* Add test_get_transaction_status_with_old_data

* Review comments

* Move rev of block-signatures into helper

* Improve deprecated_key impls

* iter_filtered -> iter_current_index_filtered

* Add comment to explain why use the smallest (index, Signature) to seed the iterator

* Impl ColumnIndexDeprecation for TransactionMemos (doesn't build)

* Update TransactionMemos put

* Add LedgerColumn::get_raw

* Fix read_transaction_memos

* Add TransactionMemos to purge_special_columns_exact

* Add TransactionMemos to compaction filter

* Take find_address_signatures out of service

* Remove faulty delete_new_column_key logic

* Simplify comments
This commit is contained in:
Tyera 2023-10-10 10:40:36 -06:00 committed by GitHub
parent cb695c7b32
commit 509d6acd2b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 987 additions and 1298 deletions

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -146,6 +146,8 @@ pub enum BlockstoreError {
UnsupportedTransactionVersion,
#[error("missing transaction metadata")]
MissingTransactionMetadata,
#[error("transaction-index overflow")]
TransactionIndexOverflow,
}
pub type Result<T> = std::result::Result<T, BlockstoreError>;
@ -270,14 +272,14 @@ pub mod columns {
#[derive(Debug)]
/// The transaction status column
///
/// * index type: `(u64, `[`Signature`]`, `[`Slot`])`
/// * index type: `(`[`Signature`]`, `[`Slot`])`
/// * value type: [`generated::TransactionStatusMeta`]
pub struct TransactionStatus;
#[derive(Debug)]
/// The address signatures column
///
/// * index type: `(u64, `[`Pubkey`]`, `[`Slot`]`, `[`Signature`]`)`
/// * index type: `(`[`Pubkey`]`, `[`Slot`]`, u32, `[`Signature`]`)`
/// * value type: [`blockstore_meta::AddressSignatureMeta`]
pub struct AddressSignatures;
@ -614,6 +616,23 @@ impl Rocks {
self.db.iterator_cf(cf, iterator_mode)
}
fn iterator_cf_raw_key(
&self,
cf: &ColumnFamily,
iterator_mode: IteratorMode<Vec<u8>>,
) -> DBIterator {
let start_key;
let iterator_mode = match iterator_mode {
IteratorMode::From(start_from, direction) => {
start_key = start_from;
RocksIteratorMode::From(&start_key, direction)
}
IteratorMode::Start => RocksIteratorMode::Start,
IteratorMode::End => RocksIteratorMode::End,
};
self.db.iterator_cf(cf, iterator_mode)
}
fn raw_iterator_cf(&self, cf: &ColumnFamily) -> DBRawIterator {
self.db.raw_iterator_cf(cf)
}
@ -677,6 +696,9 @@ pub trait Column {
fn key(index: Self::Index) -> Vec<u8>;
fn index(key: &[u8]) -> Self::Index;
// This trait method is primarily used by `Database::delete_range_cf()`, and is therefore only
// relevant for columns keyed by Slot: ie. SlotColumns and columns that feature a Slot as the
// first item in the key.
fn as_index(slot: Slot) -> Self::Index;
fn slot(index: Self::Index) -> Slot;
}
@ -739,34 +761,58 @@ impl<T: SlotColumn> Column for T {
}
}
impl Column for columns::TransactionStatus {
type Index = (u64, Signature, Slot);
pub enum IndexError {
UnpackError,
}
fn key((index, signature, slot): (u64, Signature, Slot)) -> Vec<u8> {
let mut key = vec![0; 8 + 64 + 8]; // size_of u64 + size_of Signature + size_of Slot
BigEndian::write_u64(&mut key[0..8], index);
key[8..72].copy_from_slice(&signature.as_ref()[0..64]);
BigEndian::write_u64(&mut key[72..80], slot);
/// Helper trait to transition primary indexes out from the columns that are using them.
pub trait ColumnIndexDeprecation: Column {
const DEPRECATED_INDEX_LEN: usize;
const CURRENT_INDEX_LEN: usize;
type DeprecatedIndex;
fn deprecated_key(index: Self::DeprecatedIndex) -> Vec<u8>;
fn try_deprecated_index(key: &[u8]) -> std::result::Result<Self::DeprecatedIndex, IndexError>;
fn try_current_index(key: &[u8]) -> std::result::Result<Self::Index, IndexError>;
fn convert_index(deprecated_index: Self::DeprecatedIndex) -> Self::Index;
fn index(key: &[u8]) -> Self::Index {
if let Ok(index) = Self::try_current_index(key) {
index
} else if let Ok(index) = Self::try_deprecated_index(key) {
Self::convert_index(index)
} else {
// Way back in the day, we broke the TransactionStatus column key. This fallback
// preserves the existing logic for ancient keys, but realistically should never be
// executed.
Self::as_index(0)
}
}
}
impl Column for columns::TransactionStatus {
type Index = (Signature, Slot);
fn key((signature, slot): Self::Index) -> Vec<u8> {
let mut key = vec![0; Self::CURRENT_INDEX_LEN];
key[0..64].copy_from_slice(&signature.as_ref()[0..64]);
BigEndian::write_u64(&mut key[64..72], slot);
key
}
fn index(key: &[u8]) -> (u64, Signature, Slot) {
if key.len() != 80 {
Self::as_index(0)
} else {
let index = BigEndian::read_u64(&key[0..8]);
let signature = Signature::try_from(&key[8..72]).unwrap();
let slot = BigEndian::read_u64(&key[72..80]);
(index, signature, slot)
}
fn index(key: &[u8]) -> (Signature, Slot) {
<columns::TransactionStatus as ColumnIndexDeprecation>::index(key)
}
fn slot(index: Self::Index) -> Slot {
index.2
index.1
}
fn as_index(index: u64) -> Self::Index {
(index, Signature::default(), 0)
// The TransactionStatus column is not keyed by slot so this method is meaningless
// See Column::as_index() declaration for more details
fn as_index(_index: u64) -> Self::Index {
(Signature::default(), 0)
}
}
impl ColumnName for columns::TransactionStatus {
@ -776,63 +822,171 @@ impl ProtobufColumn for columns::TransactionStatus {
type Type = generated::TransactionStatusMeta;
}
impl Column for columns::AddressSignatures {
type Index = (u64, Pubkey, Slot, Signature);
impl ColumnIndexDeprecation for columns::TransactionStatus {
const DEPRECATED_INDEX_LEN: usize = 80;
const CURRENT_INDEX_LEN: usize = 72;
type DeprecatedIndex = (u64, Signature, Slot);
fn key((index, pubkey, slot, signature): (u64, Pubkey, Slot, Signature)) -> Vec<u8> {
let mut key = vec![0; 8 + 32 + 8 + 64]; // size_of u64 + size_of Pubkey + size_of Slot + size_of Signature
fn deprecated_key((index, signature, slot): Self::DeprecatedIndex) -> Vec<u8> {
let mut key = vec![0; Self::DEPRECATED_INDEX_LEN];
BigEndian::write_u64(&mut key[0..8], index);
key[8..40].copy_from_slice(&pubkey.as_ref()[0..32]);
BigEndian::write_u64(&mut key[40..48], slot);
key[48..112].copy_from_slice(&signature.as_ref()[0..64]);
key[8..72].copy_from_slice(&signature.as_ref()[0..64]);
BigEndian::write_u64(&mut key[72..80], slot);
key
}
fn index(key: &[u8]) -> (u64, Pubkey, Slot, Signature) {
let index = BigEndian::read_u64(&key[0..8]);
let pubkey = Pubkey::try_from(&key[8..40]).unwrap();
let slot = BigEndian::read_u64(&key[40..48]);
let signature = Signature::try_from(&key[48..112]).unwrap();
(index, pubkey, slot, signature)
fn try_deprecated_index(key: &[u8]) -> std::result::Result<Self::DeprecatedIndex, IndexError> {
if key.len() != Self::DEPRECATED_INDEX_LEN {
return Err(IndexError::UnpackError);
}
let primary_index = BigEndian::read_u64(&key[0..8]);
let signature = Signature::try_from(&key[8..72]).unwrap();
let slot = BigEndian::read_u64(&key[72..80]);
Ok((primary_index, signature, slot))
}
fn try_current_index(key: &[u8]) -> std::result::Result<Self::Index, IndexError> {
if key.len() != Self::CURRENT_INDEX_LEN {
return Err(IndexError::UnpackError);
}
let signature = Signature::try_from(&key[0..64]).unwrap();
let slot = BigEndian::read_u64(&key[64..72]);
Ok((signature, slot))
}
fn convert_index(deprecated_index: Self::DeprecatedIndex) -> Self::Index {
let (_primary_index, signature, slot) = deprecated_index;
(signature, slot)
}
}
impl Column for columns::AddressSignatures {
type Index = (Pubkey, Slot, u32, Signature);
fn key((pubkey, slot, transaction_index, signature): Self::Index) -> Vec<u8> {
let mut key = vec![0; Self::CURRENT_INDEX_LEN];
key[0..32].copy_from_slice(&pubkey.as_ref()[0..32]);
BigEndian::write_u64(&mut key[32..40], slot);
BigEndian::write_u32(&mut key[40..44], transaction_index);
key[44..108].copy_from_slice(&signature.as_ref()[0..64]);
key
}
fn index(key: &[u8]) -> Self::Index {
<columns::AddressSignatures as ColumnIndexDeprecation>::index(key)
}
fn slot(index: Self::Index) -> Slot {
index.2
index.1
}
fn as_index(index: u64) -> Self::Index {
(index, Pubkey::default(), 0, Signature::default())
// The AddressSignatures column is not keyed by slot so this method is meaningless
// See Column::as_index() declaration for more details
fn as_index(_index: u64) -> Self::Index {
(Pubkey::default(), 0, 0, Signature::default())
}
}
impl ColumnName for columns::AddressSignatures {
const NAME: &'static str = ADDRESS_SIGNATURES_CF;
}
impl Column for columns::TransactionMemos {
type Index = Signature;
impl ColumnIndexDeprecation for columns::AddressSignatures {
const DEPRECATED_INDEX_LEN: usize = 112;
const CURRENT_INDEX_LEN: usize = 108;
type DeprecatedIndex = (u64, Pubkey, Slot, Signature);
fn key(signature: Signature) -> Vec<u8> {
let mut key = vec![0; 64]; // size_of Signature
key[0..64].copy_from_slice(&signature.as_ref()[0..64]);
fn deprecated_key((primary_index, pubkey, slot, signature): Self::DeprecatedIndex) -> Vec<u8> {
let mut key = vec![0; Self::DEPRECATED_INDEX_LEN];
BigEndian::write_u64(&mut key[0..8], primary_index);
key[8..40].clone_from_slice(&pubkey.as_ref()[0..32]);
BigEndian::write_u64(&mut key[40..48], slot);
key[48..112].clone_from_slice(&signature.as_ref()[0..64]);
key
}
fn index(key: &[u8]) -> Signature {
Signature::try_from(&key[..64]).unwrap()
fn try_deprecated_index(key: &[u8]) -> std::result::Result<Self::DeprecatedIndex, IndexError> {
if key.len() != Self::DEPRECATED_INDEX_LEN {
return Err(IndexError::UnpackError);
}
let primary_index = BigEndian::read_u64(&key[0..8]);
let pubkey = Pubkey::try_from(&key[8..40]).unwrap();
let slot = BigEndian::read_u64(&key[40..48]);
let signature = Signature::try_from(&key[48..112]).unwrap();
Ok((primary_index, pubkey, slot, signature))
}
fn slot(_index: Self::Index) -> Slot {
unimplemented!()
fn try_current_index(key: &[u8]) -> std::result::Result<Self::Index, IndexError> {
if key.len() != Self::CURRENT_INDEX_LEN {
return Err(IndexError::UnpackError);
}
let pubkey = Pubkey::try_from(&key[0..32]).unwrap();
let slot = BigEndian::read_u64(&key[32..40]);
let transaction_index = BigEndian::read_u32(&key[40..44]);
let signature = Signature::try_from(&key[44..108]).unwrap();
Ok((pubkey, slot, transaction_index, signature))
}
fn as_index(_index: u64) -> Self::Index {
Signature::default()
fn convert_index(deprecated_index: Self::DeprecatedIndex) -> Self::Index {
let (_primary_index, pubkey, slot, signature) = deprecated_index;
(pubkey, slot, 0, signature)
}
}
impl Column for columns::TransactionMemos {
type Index = (Signature, Slot);
fn key((signature, slot): Self::Index) -> Vec<u8> {
let mut key = vec![0; Self::CURRENT_INDEX_LEN];
key[0..64].copy_from_slice(&signature.as_ref()[0..64]);
BigEndian::write_u64(&mut key[64..72], slot);
key
}
fn index(key: &[u8]) -> Self::Index {
<columns::TransactionMemos as ColumnIndexDeprecation>::index(key)
}
fn slot(index: Self::Index) -> Slot {
index.1
}
fn as_index(index: u64) -> Self::Index {
(Signature::default(), index)
}
}
impl ColumnName for columns::TransactionMemos {
const NAME: &'static str = TRANSACTION_MEMOS_CF;
}
impl ColumnIndexDeprecation for columns::TransactionMemos {
const DEPRECATED_INDEX_LEN: usize = 64;
const CURRENT_INDEX_LEN: usize = 72;
type DeprecatedIndex = Signature;
fn deprecated_key(signature: Self::DeprecatedIndex) -> Vec<u8> {
let mut key = vec![0; Self::DEPRECATED_INDEX_LEN];
key[0..64].copy_from_slice(&signature.as_ref()[0..64]);
key
}
fn try_deprecated_index(key: &[u8]) -> std::result::Result<Self::DeprecatedIndex, IndexError> {
Signature::try_from(&key[..64]).map_err(|_| IndexError::UnpackError)
}
fn try_current_index(key: &[u8]) -> std::result::Result<Self::Index, IndexError> {
if key.len() != Self::CURRENT_INDEX_LEN {
return Err(IndexError::UnpackError);
}
let signature = Signature::try_from(&key[0..64]).unwrap();
let slot = BigEndian::read_u64(&key[64..72]);
Ok((signature, slot))
}
fn convert_index(deprecated_index: Self::DeprecatedIndex) -> Self::Index {
(deprecated_index, 0)
}
}
impl Column for columns::TransactionStatusIndex {
type Index = u64;
@ -1456,12 +1610,16 @@ where
}
pub fn get(&self, key: C::Index) -> Result<Option<C::Type>> {
self.get_raw(&C::key(key))
}
pub fn get_raw(&self, key: &[u8]) -> Result<Option<C::Type>> {
let mut result = Ok(None);
let is_perf_enabled = maybe_enable_rocksdb_perf(
self.column_options.rocks_perf_sample_interval,
&self.read_perf_status,
);
if let Some(pinnable_slice) = self.backend.get_pinned_cf(self.handle(), &C::key(key))? {
if let Some(pinnable_slice) = self.backend.get_pinned_cf(self.handle(), key)? {
let value = deserialize(pinnable_slice.as_ref())?;
result = Ok(Some(value))
}
@ -1507,12 +1665,19 @@ where
pub fn get_protobuf_or_bincode<T: DeserializeOwned + Into<C::Type>>(
&self,
key: C::Index,
) -> Result<Option<C::Type>> {
self.get_raw_protobuf_or_bincode::<T>(&C::key(key))
}
pub(crate) fn get_raw_protobuf_or_bincode<T: DeserializeOwned + Into<C::Type>>(
&self,
key: &[u8],
) -> Result<Option<C::Type>> {
let is_perf_enabled = maybe_enable_rocksdb_perf(
self.column_options.rocks_perf_sample_interval,
&self.read_perf_status,
);
let result = self.backend.get_pinned_cf(self.handle(), &C::key(key));
let result = self.backend.get_pinned_cf(self.handle(), key);
if let Some(op_start_instant) = is_perf_enabled {
report_rocksdb_read_perf(
C::NAME,
@ -1577,6 +1742,45 @@ where
}
}
impl<C> LedgerColumn<C>
where
C: ColumnIndexDeprecation + ColumnName,
{
pub(crate) fn iter_current_index_filtered(
&self,
iterator_mode: IteratorMode<C::Index>,
) -> Result<impl Iterator<Item = (C::Index, Box<[u8]>)> + '_> {
let cf = self.handle();
let iter = self.backend.iterator_cf::<C>(cf, iterator_mode);
Ok(iter.filter_map(|pair| {
let (key, value) = pair.unwrap();
C::try_current_index(&key).ok().map(|index| (index, value))
}))
}
pub(crate) fn iter_deprecated_index_filtered(
&self,
iterator_mode: IteratorMode<C::DeprecatedIndex>,
) -> Result<impl Iterator<Item = (C::DeprecatedIndex, Box<[u8]>)> + '_> {
let cf = self.handle();
let iterator_mode_raw_key = match iterator_mode {
IteratorMode::Start => IteratorMode::Start,
IteratorMode::End => IteratorMode::End,
IteratorMode::From(start_from, direction) => {
let raw_key = C::deprecated_key(start_from);
IteratorMode::From(raw_key, direction)
}
};
let iter = self.backend.iterator_cf_raw_key(cf, iterator_mode_raw_key);
Ok(iter.filter_map(|pair| {
let (key, value) = pair.unwrap();
C::try_deprecated_index(&key)
.ok()
.map(|index| (index, value))
}))
}
}
impl<'a> WriteBatch<'a> {
pub fn put_bytes<C: Column + ColumnName>(&mut self, key: C::Index, bytes: &[u8]) -> Result<()> {
self.write_batch
@ -1585,7 +1789,11 @@ impl<'a> WriteBatch<'a> {
}
pub fn delete<C: Column + ColumnName>(&mut self, key: C::Index) -> Result<()> {
self.write_batch.delete_cf(self.get_cf::<C>(), C::key(key));
self.delete_raw::<C>(&C::key(key))
}
pub(crate) fn delete_raw<C: Column + ColumnName>(&mut self, key: &[u8]) -> Result<()> {
self.write_batch.delete_cf(self.get_cf::<C>(), key);
Ok(())
}
@ -1882,7 +2090,9 @@ fn should_enable_cf_compaction(cf_name: &str) -> bool {
// completed on a given range or file.
matches!(
cf_name,
columns::TransactionStatus::NAME | columns::AddressSignatures::NAME
columns::TransactionStatus::NAME
| columns::TransactionMemos::NAME
| columns::AddressSignatures::NAME
)
}
@ -1976,4 +2186,44 @@ pub mod tests {
});
assert!(!should_enable_cf_compaction("something else"));
}
impl<C> LedgerColumn<C>
where
C: ColumnIndexDeprecation + ProtobufColumn + ColumnName,
{
pub fn put_deprecated_protobuf(
&self,
key: C::DeprecatedIndex,
value: &C::Type,
) -> Result<()> {
let mut buf = Vec::with_capacity(value.encoded_len());
value.encode(&mut buf)?;
self.backend
.put_cf(self.handle(), &C::deprecated_key(key), &buf)
}
}
impl<C> LedgerColumn<C>
where
C: ColumnIndexDeprecation + TypedColumn + ColumnName,
{
pub fn put_deprecated(&self, key: C::DeprecatedIndex, value: &C::Type) -> Result<()> {
let serialized_value = serialize(value)?;
self.backend
.put_cf(self.handle(), &C::deprecated_key(key), &serialized_value)
}
}
impl<C> LedgerColumn<C>
where
C: ColumnIndexDeprecation + ColumnName,
{
pub(crate) fn iterator_cf_raw_key(
&self,
iterator_mode: IteratorMode<Vec<u8>>,
) -> DBIterator {
let cf = self.handle();
self.backend.iterator_cf_raw_key(cf, iterator_mode)
}
}
}

View File

@ -188,7 +188,7 @@ impl TransactionStatusService {
if enable_rpc_transaction_history {
if let Some(memos) = extract_and_fmt_memos(transaction.message()) {
blockstore
.write_transaction_memos(transaction.signature(), memos)
.write_transaction_memos(transaction.signature(), slot, memos)
.expect("Expect database write to succeed: TransactionMemos");
}
@ -199,6 +199,7 @@ impl TransactionStatusService {
tx_account_locks.writable,
tx_account_locks.readonly,
transaction_status_meta,
transaction_index,
)
.expect("Expect database write to succeed: TransactionStatus");
}