diff --git a/ledger/src/bigtable_upload.rs b/ledger/src/bigtable_upload.rs index be28ee8a07..2a076d46be 100644 --- a/ledger/src/bigtable_upload.rs +++ b/ledger/src/bigtable_upload.rs @@ -178,10 +178,10 @@ pub async fn upload_confirmed_blocks( break; } - let _ = match blockstore.get_rooted_block(slot, true) { - Ok(confirmed_block) => { + let _ = match blockstore.get_rooted_block_with_entries(slot, true) { + Ok(confirmed_block_with_entries) => { num_blocks_read += 1; - sender.send((slot, Some(confirmed_block))) + sender.send((slot, Some(confirmed_block_with_entries))) } Err(err) => { warn!( @@ -227,7 +227,8 @@ pub async fn upload_confirmed_blocks( Some(confirmed_block) => { let bt = bigtable.clone(); Some(tokio::spawn(async move { - bt.upload_confirmed_block(slot, confirmed_block).await + bt.upload_confirmed_block_with_entries(slot, confirmed_block) + .await })) } }); diff --git a/storage-bigtable/init-bigtable.sh b/storage-bigtable/init-bigtable.sh index 3b988e2ef6..43ea293bb7 100755 --- a/storage-bigtable/init-bigtable.sh +++ b/storage-bigtable/init-bigtable.sh @@ -16,7 +16,7 @@ if [[ -n $BIGTABLE_EMULATOR_HOST ]]; then cbt+=(-project emulator) fi -for table in blocks tx tx-by-addr; do +for table in blocks entries tx tx-by-addr; do ( set -x "${cbt[@]}" createtable $table diff --git a/storage-bigtable/src/lib.rs b/storage-bigtable/src/lib.rs index 5b9652de94..b1f889e203 100644 --- a/storage-bigtable/src/lib.rs +++ b/storage-bigtable/src/lib.rs @@ -15,12 +15,13 @@ use { timing::AtomicInterval, transaction::{TransactionError, VersionedTransaction}, }, - solana_storage_proto::convert::{generated, tx_by_addr}, + solana_storage_proto::convert::{entries, generated, tx_by_addr}, solana_transaction_status::{ extract_and_fmt_memos, ConfirmedBlock, ConfirmedTransactionStatusWithSignature, ConfirmedTransactionWithStatusMeta, Reward, TransactionByAddrInfo, TransactionConfirmationStatus, TransactionStatus, TransactionStatusMeta, - TransactionWithStatusMeta, VersionedConfirmedBlock, VersionedTransactionWithStatusMeta, + TransactionWithStatusMeta, VersionedConfirmedBlock, VersionedConfirmedBlockWithEntries, + VersionedTransactionWithStatusMeta, }, std::{ collections::{HashMap, HashSet}, @@ -91,6 +92,10 @@ fn slot_to_blocks_key(slot: Slot) -> String { slot_to_key(slot) } +fn slot_to_entries_key(slot: Slot) -> String { + slot_to_key(slot) +} + fn slot_to_tx_by_addr_key(slot: Slot) -> String { slot_to_key(!slot) } @@ -883,7 +888,30 @@ impl LedgerStorage { "LedgerStorage::upload_confirmed_block request received: {:?}", slot ); + self.upload_confirmed_block_with_entries( + slot, + VersionedConfirmedBlockWithEntries { + block: confirmed_block, + entries: vec![], + }, + ) + .await + } + + pub async fn upload_confirmed_block_with_entries( + &self, + slot: Slot, + confirmed_block: VersionedConfirmedBlockWithEntries, + ) -> Result<()> { + trace!( + "LedgerStorage::upload_confirmed_block_with_entries request received: {:?}", + slot + ); let mut by_addr: HashMap<&Pubkey, Vec> = HashMap::new(); + let VersionedConfirmedBlockWithEntries { + block: confirmed_block, + entries, + } = confirmed_block; let mut tx_cells = Vec::with_capacity(confirmed_block.transactions.len()); for (index, transaction_with_meta) in confirmed_block.transactions.iter().enumerate() { @@ -934,6 +962,14 @@ impl LedgerStorage { }) .collect(); + let num_entries = entries.len(); + let entry_cell = ( + slot_to_entries_key(slot), + entries::Entries { + entries: entries.into_iter().enumerate().map(Into::into).collect(), + }, + ); + let mut tasks = vec![]; if !tx_cells.is_empty() { @@ -955,6 +991,14 @@ impl LedgerStorage { })); } + if num_entries > 0 { + let conn = self.connection.clone(); + tasks.push(tokio::spawn(async move { + conn.put_protobuf_cells_with_retry::("entries", &[entry_cell]) + .await + })); + } + let mut bytes_written = 0; let mut maybe_first_err: Option = None; @@ -995,6 +1039,7 @@ impl LedgerStorage { "storage-bigtable-upload-block", ("slot", slot, i64), ("transactions", num_transactions, i64), + ("entries", num_entries, i64), ("bytes", bytes_written, i64), ); Ok(()) diff --git a/storage-proto/build.rs b/storage-proto/build.rs index 947f562c1c..583a95650e 100644 --- a/storage-proto/build.rs +++ b/storage-proto/build.rs @@ -6,7 +6,11 @@ fn main() -> Result<(), std::io::Error> { } let proto_base_path = std::path::PathBuf::from("proto"); - let proto_files = ["confirmed_block.proto", "transaction_by_addr.proto"]; + let proto_files = [ + "confirmed_block.proto", + "entries.proto", + "transaction_by_addr.proto", + ]; let mut protos = Vec::new(); for proto_file in &proto_files { let proto = proto_base_path.join(proto_file); diff --git a/storage-proto/proto/entries.proto b/storage-proto/proto/entries.proto new file mode 100644 index 0000000000..64108925ad --- /dev/null +++ b/storage-proto/proto/entries.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +package solana.storage.Entries; + +message Entries { + repeated Entry entries = 1; +} + +message Entry { + uint32 index = 1; + uint64 num_hashes = 2; + bytes hash = 3; + uint64 num_transactions = 4; + uint32 starting_transaction_index = 5; +} diff --git a/storage-proto/src/convert.rs b/storage-proto/src/convert.rs index 7ca5728d39..abb1e84885 100644 --- a/storage-proto/src/convert.rs +++ b/storage-proto/src/convert.rs @@ -15,7 +15,7 @@ use { transaction_context::TransactionReturnData, }, solana_transaction_status::{ - ConfirmedBlock, InnerInstruction, InnerInstructions, Reward, RewardType, + ConfirmedBlock, EntrySummary, InnerInstruction, InnerInstructions, Reward, RewardType, TransactionByAddrInfo, TransactionStatusMeta, TransactionTokenBalance, TransactionWithStatusMeta, VersionedConfirmedBlock, VersionedTransactionWithStatusMeta, }, @@ -41,6 +41,11 @@ pub mod tx_by_addr { )); } +#[allow(clippy::derive_partial_eq_without_eq)] +pub mod entries { + include!(concat!(env!("OUT_DIR"), "/solana.storage.entries.rs")); +} + impl From> for generated::Rewards { fn from(rewards: Vec) -> Self { Self { @@ -1189,6 +1194,18 @@ impl TryFrom for Vec { } } +impl From<(usize, EntrySummary)> for entries::Entry { + fn from((index, entry_summary): (usize, EntrySummary)) -> Self { + entries::Entry { + index: index as u32, + num_hashes: entry_summary.num_hashes, + hash: entry_summary.hash.as_ref().into(), + num_transactions: entry_summary.num_transactions, + starting_transaction_index: entry_summary.starting_transaction_index as u32, + } + } +} + #[cfg(test)] mod test { use {super::*, enum_iterator::all};