Fix memo handling and "blocks" key formatting (#20044)
* Fix memo handling and "blocks" key formatting * Skip memo equality check * Define slot_to_tx_by_addr_key Co-authored-by: Justin Starry <justin@solana.com>
This commit is contained in:
parent
1c04b42839
commit
795dde109c
|
@ -70,6 +70,14 @@ fn slot_to_key(slot: Slot) -> String {
|
||||||
format!("{:016x}", slot)
|
format!("{:016x}", slot)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn slot_to_blocks_key(slot: Slot) -> String {
|
||||||
|
slot_to_key(slot)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn slot_to_tx_by_addr_key(slot: Slot) -> String {
|
||||||
|
slot_to_key(!slot)
|
||||||
|
}
|
||||||
|
|
||||||
// Reverse of `slot_to_key`
|
// Reverse of `slot_to_key`
|
||||||
fn key_to_slot(key: &str) -> Option<Slot> {
|
fn key_to_slot(key: &str) -> Option<Slot> {
|
||||||
match Slot::from_str_radix(key, 16) {
|
match Slot::from_str_radix(key, 16) {
|
||||||
|
@ -257,7 +265,7 @@ impl From<Reward> for StoredConfirmedBlockReward {
|
||||||
}
|
}
|
||||||
|
|
||||||
// A serialized `TransactionInfo` is stored in the `tx` table
|
// A serialized `TransactionInfo` is stored in the `tx` table
|
||||||
#[derive(Serialize, Deserialize, PartialEq)]
|
#[derive(Serialize, Deserialize, PartialEq, Debug)]
|
||||||
struct TransactionInfo {
|
struct TransactionInfo {
|
||||||
slot: Slot, // The slot that contains the block with this transaction in it
|
slot: Slot, // The slot that contains the block with this transaction in it
|
||||||
index: u32, // Where the transaction is located in the block
|
index: u32, // Where the transaction is located in the block
|
||||||
|
@ -265,6 +273,24 @@ struct TransactionInfo {
|
||||||
memo: Option<String>, // Transaction memo
|
memo: Option<String>, // Transaction memo
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Part of a serialized `TransactionInfo` which is stored in the `tx` table
|
||||||
|
#[derive(PartialEq, Debug)]
|
||||||
|
struct UploadedTransaction {
|
||||||
|
slot: Slot, // The slot that contains the block with this transaction in it
|
||||||
|
index: u32, // Where the transaction is located in the block
|
||||||
|
err: Option<TransactionError>, // None if the transaction executed successfully
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<TransactionInfo> for UploadedTransaction {
|
||||||
|
fn from(transaction_info: TransactionInfo) -> Self {
|
||||||
|
Self {
|
||||||
|
slot: transaction_info.slot,
|
||||||
|
index: transaction_info.index,
|
||||||
|
err: transaction_info.err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl From<TransactionInfo> for TransactionStatus {
|
impl From<TransactionInfo> for TransactionStatus {
|
||||||
fn from(transaction_info: TransactionInfo) -> Self {
|
fn from(transaction_info: TransactionInfo) -> Self {
|
||||||
let TransactionInfo { slot, err, .. } = transaction_info;
|
let TransactionInfo { slot, err, .. } = transaction_info;
|
||||||
|
@ -339,7 +365,12 @@ impl LedgerStorage {
|
||||||
pub async fn get_confirmed_blocks(&self, start_slot: Slot, limit: usize) -> Result<Vec<Slot>> {
|
pub async fn get_confirmed_blocks(&self, start_slot: Slot, limit: usize) -> Result<Vec<Slot>> {
|
||||||
let mut bigtable = self.connection.client();
|
let mut bigtable = self.connection.client();
|
||||||
let blocks = bigtable
|
let blocks = bigtable
|
||||||
.get_row_keys("blocks", Some(slot_to_key(start_slot)), None, limit as i64)
|
.get_row_keys(
|
||||||
|
"blocks",
|
||||||
|
Some(slot_to_blocks_key(start_slot)),
|
||||||
|
None,
|
||||||
|
limit as i64,
|
||||||
|
)
|
||||||
.await?;
|
.await?;
|
||||||
Ok(blocks.into_iter().filter_map(|s| key_to_slot(&s)).collect())
|
Ok(blocks.into_iter().filter_map(|s| key_to_slot(&s)).collect())
|
||||||
}
|
}
|
||||||
|
@ -350,7 +381,7 @@ impl LedgerStorage {
|
||||||
let block_cell_data = bigtable
|
let block_cell_data = bigtable
|
||||||
.get_protobuf_or_bincode_cell::<StoredConfirmedBlock, generated::ConfirmedBlock>(
|
.get_protobuf_or_bincode_cell::<StoredConfirmedBlock, generated::ConfirmedBlock>(
|
||||||
"blocks",
|
"blocks",
|
||||||
slot_to_key(slot),
|
slot_to_blocks_key(slot),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.map_err(|err| match err {
|
.map_err(|err| match err {
|
||||||
|
@ -360,7 +391,7 @@ impl LedgerStorage {
|
||||||
Ok(match block_cell_data {
|
Ok(match block_cell_data {
|
||||||
bigtable::CellData::Bincode(block) => block.into(),
|
bigtable::CellData::Bincode(block) => block.into(),
|
||||||
bigtable::CellData::Protobuf(block) => block.try_into().map_err(|_err| {
|
bigtable::CellData::Protobuf(block) => block.try_into().map_err(|_err| {
|
||||||
bigtable::Error::ObjectCorrupt(format!("blocks/{}", slot_to_key(slot)))
|
bigtable::Error::ObjectCorrupt(format!("blocks/{}", slot_to_blocks_key(slot)))
|
||||||
})?,
|
})?,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -469,7 +500,7 @@ impl LedgerStorage {
|
||||||
let starting_slot_tx_len = bigtable
|
let starting_slot_tx_len = bigtable
|
||||||
.get_protobuf_or_bincode_cell::<Vec<LegacyTransactionByAddrInfo>, tx_by_addr::TransactionByAddr>(
|
.get_protobuf_or_bincode_cell::<Vec<LegacyTransactionByAddrInfo>, tx_by_addr::TransactionByAddr>(
|
||||||
"tx-by-addr",
|
"tx-by-addr",
|
||||||
format!("{}{}", address_prefix, slot_to_key(!first_slot)),
|
format!("{}{}", address_prefix, slot_to_tx_by_addr_key(first_slot)),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.map(|cell_data| {
|
.map(|cell_data| {
|
||||||
|
@ -485,8 +516,16 @@ impl LedgerStorage {
|
||||||
let tx_by_addr_data = bigtable
|
let tx_by_addr_data = bigtable
|
||||||
.get_row_data(
|
.get_row_data(
|
||||||
"tx-by-addr",
|
"tx-by-addr",
|
||||||
Some(format!("{}{}", address_prefix, slot_to_key(!first_slot))),
|
Some(format!(
|
||||||
Some(format!("{}{}", address_prefix, slot_to_key(!last_slot))),
|
"{}{}",
|
||||||
|
address_prefix,
|
||||||
|
slot_to_tx_by_addr_key(first_slot),
|
||||||
|
)),
|
||||||
|
Some(format!(
|
||||||
|
"{}{}",
|
||||||
|
address_prefix,
|
||||||
|
slot_to_tx_by_addr_key(last_slot),
|
||||||
|
)),
|
||||||
limit as i64 + starting_slot_tx_len as i64,
|
limit as i64 + starting_slot_tx_len as i64,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
@ -596,7 +635,7 @@ impl LedgerStorage {
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(address, transaction_info_by_addr)| {
|
.map(|(address, transaction_info_by_addr)| {
|
||||||
(
|
(
|
||||||
format!("{}/{}", address, slot_to_key(!slot)),
|
format!("{}/{}", address, slot_to_tx_by_addr_key(slot)),
|
||||||
tx_by_addr::TransactionByAddr {
|
tx_by_addr::TransactionByAddr {
|
||||||
tx_by_addrs: transaction_info_by_addr
|
tx_by_addrs: transaction_info_by_addr
|
||||||
.into_iter()
|
.into_iter()
|
||||||
|
@ -629,7 +668,7 @@ impl LedgerStorage {
|
||||||
// Store the block itself last, after all other metadata about the block has been
|
// Store the block itself last, after all other metadata about the block has been
|
||||||
// successfully stored. This avoids partial uploaded blocks from becoming visible to
|
// successfully stored. This avoids partial uploaded blocks from becoming visible to
|
||||||
// `get_confirmed_block()` and `get_confirmed_blocks()`
|
// `get_confirmed_block()` and `get_confirmed_blocks()`
|
||||||
let blocks_cells = [(slot_to_key(slot), confirmed_block.into())];
|
let blocks_cells = [(slot_to_blocks_key(slot), confirmed_block.into())];
|
||||||
bytes_written += self
|
bytes_written += self
|
||||||
.connection
|
.connection
|
||||||
.put_protobuf_cells_with_retry::<generated::ConfirmedBlock>("blocks", &blocks_cells)
|
.put_protobuf_cells_with_retry::<generated::ConfirmedBlock>("blocks", &blocks_cells)
|
||||||
|
@ -645,14 +684,13 @@ impl LedgerStorage {
|
||||||
// Delete a confirmed block and associated meta data.
|
// Delete a confirmed block and associated meta data.
|
||||||
pub async fn delete_confirmed_block(&self, slot: Slot, dry_run: bool) -> Result<()> {
|
pub async fn delete_confirmed_block(&self, slot: Slot, dry_run: bool) -> Result<()> {
|
||||||
let mut addresses: HashSet<&Pubkey> = HashSet::new();
|
let mut addresses: HashSet<&Pubkey> = HashSet::new();
|
||||||
let mut expected_tx_infos: HashMap<String, TransactionInfo> = HashMap::new();
|
let mut expected_tx_infos: HashMap<String, UploadedTransaction> = HashMap::new();
|
||||||
let confirmed_block = self.get_confirmed_block(slot).await?;
|
let confirmed_block = self.get_confirmed_block(slot).await?;
|
||||||
for (index, transaction_with_meta) in confirmed_block.transactions.iter().enumerate() {
|
for (index, transaction_with_meta) in confirmed_block.transactions.iter().enumerate() {
|
||||||
let TransactionWithStatusMeta { meta, transaction } = transaction_with_meta;
|
let TransactionWithStatusMeta { meta, transaction } = transaction_with_meta;
|
||||||
let signature = transaction.signatures[0];
|
let signature = transaction.signatures[0];
|
||||||
let index = index as u32;
|
let index = index as u32;
|
||||||
let err = meta.as_ref().and_then(|meta| meta.status.clone().err());
|
let err = meta.as_ref().and_then(|meta| meta.status.clone().err());
|
||||||
let memo = extract_and_fmt_memos(&transaction.message);
|
|
||||||
|
|
||||||
for address in &transaction.message.account_keys {
|
for address in &transaction.message.account_keys {
|
||||||
if !is_sysvar_id(address) {
|
if !is_sysvar_id(address) {
|
||||||
|
@ -662,18 +700,13 @@ impl LedgerStorage {
|
||||||
|
|
||||||
expected_tx_infos.insert(
|
expected_tx_infos.insert(
|
||||||
signature.to_string(),
|
signature.to_string(),
|
||||||
TransactionInfo {
|
UploadedTransaction { slot, index, err },
|
||||||
slot,
|
|
||||||
index,
|
|
||||||
err,
|
|
||||||
memo,
|
|
||||||
},
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
let address_slot_rows: Vec<_> = addresses
|
let address_slot_rows: Vec<_> = addresses
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|address| format!("{}/{}", address, slot_to_key(!slot)))
|
.map(|address| format!("{}/{}", address, slot_to_tx_by_addr_key(slot)))
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let tx_deletion_rows = if !expected_tx_infos.is_empty() {
|
let tx_deletion_rows = if !expected_tx_infos.is_empty() {
|
||||||
|
@ -682,12 +715,13 @@ impl LedgerStorage {
|
||||||
.map(|(signature, _info)| signature)
|
.map(|(signature, _info)| signature)
|
||||||
.cloned()
|
.cloned()
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
let fetched_tx_infos = self
|
let fetched_tx_infos: HashMap<String, std::result::Result<UploadedTransaction, _>> =
|
||||||
.connection
|
self.connection
|
||||||
.get_bincode_cells_with_retry::<TransactionInfo>("tx", &signatures)
|
.get_bincode_cells_with_retry::<TransactionInfo>("tx", &signatures)
|
||||||
.await?
|
.await?
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.collect::<HashMap<_, _>>();
|
.map(|(signature, tx_info_res)| (signature, tx_info_res.map(Into::into)))
|
||||||
|
.collect::<HashMap<_, _>>();
|
||||||
|
|
||||||
let mut deletion_rows = Vec::with_capacity(expected_tx_infos.len());
|
let mut deletion_rows = Vec::with_capacity(expected_tx_infos.len());
|
||||||
for (signature, expected_tx_info) in expected_tx_infos {
|
for (signature, expected_tx_info) in expected_tx_infos {
|
||||||
|
@ -695,10 +729,12 @@ impl LedgerStorage {
|
||||||
Some(Ok(fetched_tx_info)) if fetched_tx_info == &expected_tx_info => {
|
Some(Ok(fetched_tx_info)) if fetched_tx_info == &expected_tx_info => {
|
||||||
deletion_rows.push(signature);
|
deletion_rows.push(signature);
|
||||||
}
|
}
|
||||||
Some(Ok(_)) => {
|
Some(Ok(fetched_tx_info)) => {
|
||||||
warn!(
|
warn!(
|
||||||
"skipped tx row {} because the bigtable entry did not match",
|
"skipped tx row {} because the bigtable entry ({:?}) did not match to {:?}",
|
||||||
signature
|
signature,
|
||||||
|
fetched_tx_info,
|
||||||
|
&expected_tx_info,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
Some(Err(err)) => {
|
Some(Err(err)) => {
|
||||||
|
@ -731,7 +767,7 @@ impl LedgerStorage {
|
||||||
}
|
}
|
||||||
|
|
||||||
self.connection
|
self.connection
|
||||||
.delete_rows_with_retry("blocks", &[slot.to_string()])
|
.delete_rows_with_retry("blocks", &[slot_to_blocks_key(slot)])
|
||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue