diff --git a/storage-bigtable/src/lib.rs b/storage-bigtable/src/lib.rs index 88da8d5185..179677936f 100644 --- a/storage-bigtable/src/lib.rs +++ b/storage-bigtable/src/lib.rs @@ -70,6 +70,14 @@ fn slot_to_key(slot: Slot) -> String { format!("{:016x}", slot) } +fn slot_to_blocks_key(slot: Slot) -> String { + slot_to_key(slot) +} + +fn slot_to_tx_by_addr_key(slot: Slot) -> String { + slot_to_key(!slot) +} + // Reverse of `slot_to_key` fn key_to_slot(key: &str) -> Option { match Slot::from_str_radix(key, 16) { @@ -257,7 +265,7 @@ impl From for StoredConfirmedBlockReward { } // A serialized `TransactionInfo` is stored in the `tx` table -#[derive(Serialize, Deserialize, PartialEq)] +#[derive(Serialize, Deserialize, PartialEq, Debug)] struct TransactionInfo { slot: Slot, // The slot that contains the block with this transaction in it index: u32, // Where the transaction is located in the block @@ -265,6 +273,24 @@ struct TransactionInfo { memo: Option, // Transaction memo } +// Part of a serialized `TransactionInfo` which is stored in the `tx` table +#[derive(PartialEq, Debug)] +struct UploadedTransaction { + slot: Slot, // The slot that contains the block with this transaction in it + index: u32, // Where the transaction is located in the block + err: Option, // None if the transaction executed successfully +} + +impl From for UploadedTransaction { + fn from(transaction_info: TransactionInfo) -> Self { + Self { + slot: transaction_info.slot, + index: transaction_info.index, + err: transaction_info.err, + } + } +} + impl From for TransactionStatus { fn from(transaction_info: TransactionInfo) -> Self { let TransactionInfo { slot, err, .. } = transaction_info; @@ -339,7 +365,12 @@ impl LedgerStorage { pub async fn get_confirmed_blocks(&self, start_slot: Slot, limit: usize) -> Result> { let mut bigtable = self.connection.client(); let blocks = bigtable - .get_row_keys("blocks", Some(slot_to_key(start_slot)), None, limit as i64) + .get_row_keys( + "blocks", + Some(slot_to_blocks_key(start_slot)), + None, + limit as i64, + ) .await?; Ok(blocks.into_iter().filter_map(|s| key_to_slot(&s)).collect()) } @@ -350,7 +381,7 @@ impl LedgerStorage { let block_cell_data = bigtable .get_protobuf_or_bincode_cell::( "blocks", - slot_to_key(slot), + slot_to_blocks_key(slot), ) .await .map_err(|err| match err { @@ -360,7 +391,7 @@ impl LedgerStorage { Ok(match block_cell_data { bigtable::CellData::Bincode(block) => block.into(), bigtable::CellData::Protobuf(block) => block.try_into().map_err(|_err| { - bigtable::Error::ObjectCorrupt(format!("blocks/{}", slot_to_key(slot))) + bigtable::Error::ObjectCorrupt(format!("blocks/{}", slot_to_blocks_key(slot))) })?, }) } @@ -469,7 +500,7 @@ impl LedgerStorage { let starting_slot_tx_len = bigtable .get_protobuf_or_bincode_cell::, tx_by_addr::TransactionByAddr>( "tx-by-addr", - format!("{}{}", address_prefix, slot_to_key(!first_slot)), + format!("{}{}", address_prefix, slot_to_tx_by_addr_key(first_slot)), ) .await .map(|cell_data| { @@ -485,8 +516,16 @@ impl LedgerStorage { let tx_by_addr_data = bigtable .get_row_data( "tx-by-addr", - Some(format!("{}{}", address_prefix, slot_to_key(!first_slot))), - Some(format!("{}{}", address_prefix, slot_to_key(!last_slot))), + Some(format!( + "{}{}", + address_prefix, + slot_to_tx_by_addr_key(first_slot), + )), + Some(format!( + "{}{}", + address_prefix, + slot_to_tx_by_addr_key(last_slot), + )), limit as i64 + starting_slot_tx_len as i64, ) .await?; @@ -596,7 +635,7 @@ impl LedgerStorage { .into_iter() .map(|(address, transaction_info_by_addr)| { ( - format!("{}/{}", address, slot_to_key(!slot)), + format!("{}/{}", address, slot_to_tx_by_addr_key(slot)), tx_by_addr::TransactionByAddr { tx_by_addrs: transaction_info_by_addr .into_iter() @@ -629,7 +668,7 @@ impl LedgerStorage { // Store the block itself last, after all other metadata about the block has been // successfully stored. This avoids partial uploaded blocks from becoming visible to // `get_confirmed_block()` and `get_confirmed_blocks()` - let blocks_cells = [(slot_to_key(slot), confirmed_block.into())]; + let blocks_cells = [(slot_to_blocks_key(slot), confirmed_block.into())]; bytes_written += self .connection .put_protobuf_cells_with_retry::("blocks", &blocks_cells) @@ -645,14 +684,13 @@ impl LedgerStorage { // Delete a confirmed block and associated meta data. pub async fn delete_confirmed_block(&self, slot: Slot, dry_run: bool) -> Result<()> { let mut addresses: HashSet<&Pubkey> = HashSet::new(); - let mut expected_tx_infos: HashMap = HashMap::new(); + let mut expected_tx_infos: HashMap = HashMap::new(); let confirmed_block = self.get_confirmed_block(slot).await?; for (index, transaction_with_meta) in confirmed_block.transactions.iter().enumerate() { let TransactionWithStatusMeta { meta, transaction } = transaction_with_meta; let signature = transaction.signatures[0]; let index = index as u32; let err = meta.as_ref().and_then(|meta| meta.status.clone().err()); - let memo = extract_and_fmt_memos(&transaction.message); for address in &transaction.message.account_keys { if !is_sysvar_id(address) { @@ -662,18 +700,13 @@ impl LedgerStorage { expected_tx_infos.insert( signature.to_string(), - TransactionInfo { - slot, - index, - err, - memo, - }, + UploadedTransaction { slot, index, err }, ); } let address_slot_rows: Vec<_> = addresses .into_iter() - .map(|address| format!("{}/{}", address, slot_to_key(!slot))) + .map(|address| format!("{}/{}", address, slot_to_tx_by_addr_key(slot))) .collect(); let tx_deletion_rows = if !expected_tx_infos.is_empty() { @@ -682,12 +715,13 @@ impl LedgerStorage { .map(|(signature, _info)| signature) .cloned() .collect::>(); - let fetched_tx_infos = self - .connection - .get_bincode_cells_with_retry::("tx", &signatures) - .await? - .into_iter() - .collect::>(); + let fetched_tx_infos: HashMap> = + self.connection + .get_bincode_cells_with_retry::("tx", &signatures) + .await? + .into_iter() + .map(|(signature, tx_info_res)| (signature, tx_info_res.map(Into::into))) + .collect::>(); let mut deletion_rows = Vec::with_capacity(expected_tx_infos.len()); for (signature, expected_tx_info) in expected_tx_infos { @@ -695,10 +729,12 @@ impl LedgerStorage { Some(Ok(fetched_tx_info)) if fetched_tx_info == &expected_tx_info => { deletion_rows.push(signature); } - Some(Ok(_)) => { + Some(Ok(fetched_tx_info)) => { warn!( - "skipped tx row {} because the bigtable entry did not match", - signature + "skipped tx row {} because the bigtable entry ({:?}) did not match to {:?}", + signature, + fetched_tx_info, + &expected_tx_info, ); } Some(Err(err)) => { @@ -731,7 +767,7 @@ impl LedgerStorage { } self.connection - .delete_rows_with_retry("blocks", &[slot.to_string()]) + .delete_rows_with_retry("blocks", &[slot_to_blocks_key(slot)]) .await?; }