diff --git a/cluster-endpoints/src/grpc_subscription.rs b/cluster-endpoints/src/grpc_subscription.rs index 235b3af5..4e012de1 100644 --- a/cluster-endpoints/src/grpc_subscription.rs +++ b/cluster-endpoints/src/grpc_subscription.rs @@ -7,6 +7,7 @@ use futures::StreamExt; use itertools::Itertools; use solana_client::nonblocking::rpc_client::RpcClient; use solana_lite_rpc_core::{ + encoding::BinaryEncoding, structures::{ produced_block::{ProducedBlock, TransactionInfo}, slot_notification::SlotNotification, @@ -192,6 +193,8 @@ fn process_block( cu_requested, prioritization_fees, cu_consumed: compute_units_consumed, + blockhash: message.recent_blockhash().to_string(), + message: BinaryEncoding::Base64.encode(message.serialize()), }) }) .collect(); diff --git a/lite-rpc/src/encoding.rs b/core/src/encoding.rs similarity index 100% rename from lite-rpc/src/encoding.rs rename to core/src/encoding.rs diff --git a/core/src/lib.rs b/core/src/lib.rs index 46073b61..12dc7df2 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -1,4 +1,5 @@ pub mod commitment_utils; +pub mod encoding; pub mod keypair_loader; pub mod quic_connection; pub mod quic_connection_utils; diff --git a/core/src/stores/data_cache.rs b/core/src/stores/data_cache.rs index 11b5bd5e..c75786cd 100644 --- a/core/src/stores/data_cache.rs +++ b/core/src/stores/data_cache.rs @@ -1,5 +1,6 @@ use std::sync::{atomic::AtomicU64, Arc}; +use dashmap::DashMap; use solana_sdk::hash::Hash; use solana_sdk::slot_history::Slot; use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey}; @@ -76,7 +77,10 @@ impl DataCache { identity_stakes: IdentityStakes::new(Pubkey::new_unique()), slot_cache: SlotCache::new(0), tx_subs: SubscriptionStore::default(), - txs: TxStore::default(), + txs: TxStore { + save_for_additional_slots: 0, + store: Arc::new(DashMap::new()), + }, } } } diff --git a/core/src/stores/tx_store.rs b/core/src/stores/tx_store.rs index 6bc1e92a..fcd69545 100644 --- a/core/src/stores/tx_store.rs +++ b/core/src/stores/tx_store.rs @@ -19,19 +19,31 @@ impl TxProps { } } -#[derive(Default, Clone, Debug)] +#[derive(Clone, Debug)] pub struct TxStore { pub store: Arc>, + pub save_for_additional_slots: u64, } impl TxStore { - pub fn update_status(&self, signature: &str, status: TransactionStatus) -> bool { + pub fn update_status( + &self, + signature: &str, + status: TransactionStatus, + last_valid_blockheight: u64, + ) -> bool { if let Some(mut meta) = self.store.get_mut(signature) { meta.status = Some(status); - true } else { - false + self.store.insert( + signature.to_string(), + TxProps { + status: Some(status), + last_valid_blockheight, + }, + ); } + true } pub fn insert(&self, signature: String, props: TxProps) -> Option { @@ -57,11 +69,7 @@ impl TxStore { pub fn clean(&self, current_finalized_blochash: u64) { let length_before = self.store.len(); self.store.retain(|_k, v| { - let retain = v.last_valid_blockheight >= current_finalized_blochash; - if !retain && v.status.is_none() { - // TODO: TX_TIMED_OUT.inc(); - } - retain + v.last_valid_blockheight >= current_finalized_blochash + self.save_for_additional_slots }); log::info!("Cleaned {} transactions", length_before - self.store.len()); } @@ -73,9 +81,3 @@ impl TxStore { } } } - -pub fn empty_tx_store() -> TxStore { - TxStore { - store: Arc::new(DashMap::new()), - } -} diff --git a/core/src/structures/produced_block.rs b/core/src/structures/produced_block.rs index aa206900..a2ce31da 100644 --- a/core/src/structures/produced_block.rs +++ b/core/src/structures/produced_block.rs @@ -9,6 +9,8 @@ use solana_transaction_status::{ option_serializer::OptionSerializer, RewardType, UiConfirmedBlock, UiTransactionStatusMeta, }; +use crate::encoding::BinaryEncoding; + #[derive(Debug, Clone)] pub struct TransactionInfo { pub signature: String, @@ -16,6 +18,8 @@ pub struct TransactionInfo { pub cu_requested: Option, pub prioritization_fees: Option, pub cu_consumed: Option, + pub blockhash: String, + pub message: String, } #[derive(Default, Debug, Clone)] @@ -119,12 +123,17 @@ impl ProducedBlock { } }; + let blockhash = tx.message.recent_blockhash().to_string(); + let message = BinaryEncoding::Base64.encode(tx.message.serialize()); + Some(TransactionInfo { signature, err, cu_requested, prioritization_fees, cu_consumed, + blockhash, + message, }) }) .collect(); diff --git a/history/src/block_stores/mod.rs b/history/src/block_stores/mod.rs index c03e85de..ab21ea3b 100644 --- a/history/src/block_stores/mod.rs +++ b/history/src/block_stores/mod.rs @@ -1,2 +1,3 @@ pub mod inmemory_block_store; pub mod multiple_strategy_block_store; +pub mod postgres_block_store; diff --git a/history/src/block_stores/postgres_block_store.rs b/history/src/block_stores/postgres_block_store.rs new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/history/src/block_stores/postgres_block_store.rs @@ -0,0 +1 @@ + diff --git a/lite-rpc/src/configs.rs b/lite-rpc/src/configs.rs index 395a2e73..f0c9e95b 100644 --- a/lite-rpc/src/configs.rs +++ b/lite-rpc/src/configs.rs @@ -1,5 +1,5 @@ -use crate::encoding::BinaryEncoding; use serde::{Deserialize, Serialize}; +use solana_lite_rpc_core::encoding::BinaryEncoding; use solana_sdk::commitment_config::CommitmentLevel; #[derive(Debug, Default, Serialize, Deserialize)] diff --git a/lite-rpc/src/errors.rs b/lite-rpc/src/errors.rs index a04d914d..d7414860 100644 --- a/lite-rpc/src/errors.rs +++ b/lite-rpc/src/errors.rs @@ -1,7 +1,6 @@ +use solana_lite_rpc_core::encoding::BinaryCodecError; use solana_sdk::{signature::ParseSignatureError, transport::TransportError}; -use crate::encoding::BinaryCodecError; - #[derive(thiserror::Error, Debug)] pub enum JsonRpcError { #[error("TransportError {0}")] diff --git a/lite-rpc/src/lib.rs b/lite-rpc/src/lib.rs index 43da1f66..54041527 100644 --- a/lite-rpc/src/lib.rs +++ b/lite-rpc/src/lib.rs @@ -4,7 +4,6 @@ use solana_transaction_status::TransactionConfirmationStatus; pub mod bridge; pub mod cli; pub mod configs; -pub mod encoding; pub mod errors; pub mod jsonrpsee_subscrption_handler_sink; pub mod postgres; @@ -40,3 +39,7 @@ pub const DEFAULT_GRPC_ADDR: &str = "http://127.0.0.0:10000"; #[from_env] pub const GRPC_VERSION: &str = "1.16.1"; + +// cache transactions of 1000 slots by default +#[from_env] +pub const NB_SLOTS_TRANSACTIONS_TO_CACHE: u64 = 1000; diff --git a/lite-rpc/src/main.rs b/lite-rpc/src/main.rs index 876882ef..f0f26a23 100644 --- a/lite-rpc/src/main.rs +++ b/lite-rpc/src/main.rs @@ -4,11 +4,12 @@ use std::time::Duration; use anyhow::bail; use clap::Parser; +use dashmap::DashMap; use dotenv::dotenv; use lite_rpc::postgres::Postgres; use lite_rpc::service_spawner::ServiceSpawner; use lite_rpc::{bridge::LiteBridge, cli::Args}; -use lite_rpc::{DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE, GRPC_VERSION}; +use lite_rpc::{DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE, GRPC_VERSION, NB_SLOTS_TRANSACTIONS_TO_CACHE}; use solana_lite_rpc_cluster_endpoints::endpoint_stremers::EndpointStreaming; use solana_lite_rpc_cluster_endpoints::grpc_subscription::create_grpc_subscription; @@ -121,13 +122,17 @@ pub async fn start_lite_rpc(args: Args, rpc_client: Arc) -> anyhow::R let block_information_store = BlockInformationStore::new(BlockInformation::from_block(&finalized_block)); + let data_cache = DataCache { block_information_store, cluster_info: ClusterInfo::default(), identity_stakes: IdentityStakes::new(validator_identity.pubkey()), slot_cache: SlotCache::new(finalized_block.slot), tx_subs: SubscriptionStore::default(), - txs: TxStore::default(), + txs: TxStore { + store: Arc::new(DashMap::new()), + save_for_additional_slots: NB_SLOTS_TRANSACTIONS_TO_CACHE, + }, }; let lata_cache_service = DataCachingService { diff --git a/lite-rpc/src/postgres.rs b/lite-rpc/src/postgres.rs index 25d786ed..4d4c0e41 100644 --- a/lite-rpc/src/postgres.rs +++ b/lite-rpc/src/postgres.rs @@ -2,23 +2,20 @@ use anyhow::{bail, Context}; use chrono::{DateTime, Utc}; use futures::join; use log::{info, warn}; +use native_tls::{Certificate, Identity, TlsConnector}; use postgres_native_tls::MakeTlsConnector; use prometheus::{core::GenericGauge, opts, register_int_gauge}; -use std::{sync::Arc, time::Duration}; - -use tokio::sync::{RwLock, RwLockReadGuard}; -use tokio_postgres::{config::SslMode, tls::MakeTlsConnect, types::ToSql, Client, NoTls, Socket}; - -use native_tls::{Certificate, Identity, TlsConnector}; - -use crate::encoding::BinaryEncoding; use solana_lite_rpc_core::{ + encoding::BinaryEncoding, structures::notifications::{ BlockNotification, NotificationMsg, NotificationReciever, TransactionNotification, TransactionUpdateNotification, }, AnyhowJoinHandle, }; +use std::{sync::Arc, time::Duration}; +use tokio::sync::{RwLock, RwLockReadGuard}; +use tokio_postgres::{config::SslMode, tls::MakeTlsConnect, types::ToSql, Client, NoTls, Socket}; lazy_static::lazy_static! { pub static ref MESSAGES_IN_POSTGRES_CHANNEL: GenericGauge = register_int_gauge!(opts!("literpc_messages_in_postgres", "Number of messages in postgres")).unwrap(); diff --git a/services/src/data_caching_service.rs b/services/src/data_caching_service.rs index a3f177c4..402aec27 100644 --- a/services/src/data_caching_service.rs +++ b/services/src/data_caching_service.rs @@ -8,6 +8,7 @@ use solana_lite_rpc_core::stores::{ }; use solana_lite_rpc_core::types::{BlockStream, ClusterInfoStream, SlotStream, VoteAccountStream}; use solana_lite_rpc_core::AnyhowJoinHandle; +use solana_sdk::clock::MAX_RECENT_BLOCKHASHES; use solana_sdk::commitment_config::CommitmentLevel; use solana_transaction_status::{TransactionConfirmationStatus, TransactionStatus}; @@ -64,6 +65,15 @@ impl DataCachingService { }; for tx in block.transactions { + let block_info = data_cache + .block_information_store + .get_block_info(&tx.blockhash); + let last_valid_blockheight = if let Some(block_info) = block_info { + block_info.last_valid_blockheight + } else { + block.slot + MAX_RECENT_BLOCKHASHES as u64 + }; + if data_cache.txs.update_status( &tx.signature, TransactionStatus { @@ -73,6 +83,7 @@ impl DataCachingService { err: tx.err.clone(), confirmation_status: Some(confirmation_status.clone()), }, + last_valid_blockheight, ) { // transaction updated match confirmation_status {