Save all transactions in the cache, save transaction messages too, Increase transaction store size

This commit is contained in:
godmodegalactus 2023-10-03 11:36:38 +02:00
parent 75edc36b86
commit 480631de72
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
14 changed files with 66 additions and 30 deletions

View File

@ -7,6 +7,7 @@ use futures::StreamExt;
use itertools::Itertools;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_lite_rpc_core::{
encoding::BinaryEncoding,
structures::{
produced_block::{ProducedBlock, TransactionInfo},
slot_notification::SlotNotification,
@ -192,6 +193,8 @@ fn process_block(
cu_requested,
prioritization_fees,
cu_consumed: compute_units_consumed,
blockhash: message.recent_blockhash().to_string(),
message: BinaryEncoding::Base64.encode(message.serialize()),
})
})
.collect();

View File

@ -1,4 +1,5 @@
pub mod commitment_utils;
pub mod encoding;
pub mod keypair_loader;
pub mod quic_connection;
pub mod quic_connection_utils;

View File

@ -1,5 +1,6 @@
use std::sync::{atomic::AtomicU64, Arc};
use dashmap::DashMap;
use solana_sdk::hash::Hash;
use solana_sdk::slot_history::Slot;
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey};
@ -76,7 +77,10 @@ impl DataCache {
identity_stakes: IdentityStakes::new(Pubkey::new_unique()),
slot_cache: SlotCache::new(0),
tx_subs: SubscriptionStore::default(),
txs: TxStore::default(),
txs: TxStore {
save_for_additional_slots: 0,
store: Arc::new(DashMap::new()),
},
}
}
}

View File

@ -19,19 +19,31 @@ impl TxProps {
}
}
#[derive(Default, Clone, Debug)]
#[derive(Clone, Debug)]
pub struct TxStore {
pub store: Arc<DashMap<String, TxProps>>,
pub save_for_additional_slots: u64,
}
impl TxStore {
pub fn update_status(&self, signature: &str, status: TransactionStatus) -> bool {
pub fn update_status(
&self,
signature: &str,
status: TransactionStatus,
last_valid_blockheight: u64,
) -> bool {
if let Some(mut meta) = self.store.get_mut(signature) {
meta.status = Some(status);
true
} else {
false
self.store.insert(
signature.to_string(),
TxProps {
status: Some(status),
last_valid_blockheight,
},
);
}
true
}
pub fn insert(&self, signature: String, props: TxProps) -> Option<TxProps> {
@ -57,11 +69,7 @@ impl TxStore {
pub fn clean(&self, current_finalized_blochash: u64) {
let length_before = self.store.len();
self.store.retain(|_k, v| {
let retain = v.last_valid_blockheight >= current_finalized_blochash;
if !retain && v.status.is_none() {
// TODO: TX_TIMED_OUT.inc();
}
retain
v.last_valid_blockheight >= current_finalized_blochash + self.save_for_additional_slots
});
log::info!("Cleaned {} transactions", length_before - self.store.len());
}
@ -73,9 +81,3 @@ impl TxStore {
}
}
}
pub fn empty_tx_store() -> TxStore {
TxStore {
store: Arc::new(DashMap::new()),
}
}

View File

@ -9,6 +9,8 @@ use solana_transaction_status::{
option_serializer::OptionSerializer, RewardType, UiConfirmedBlock, UiTransactionStatusMeta,
};
use crate::encoding::BinaryEncoding;
#[derive(Debug, Clone)]
pub struct TransactionInfo {
pub signature: String,
@ -16,6 +18,8 @@ pub struct TransactionInfo {
pub cu_requested: Option<u32>,
pub prioritization_fees: Option<u64>,
pub cu_consumed: Option<u64>,
pub blockhash: String,
pub message: String,
}
#[derive(Default, Debug, Clone)]
@ -119,12 +123,17 @@ impl ProducedBlock {
}
};
let blockhash = tx.message.recent_blockhash().to_string();
let message = BinaryEncoding::Base64.encode(tx.message.serialize());
Some(TransactionInfo {
signature,
err,
cu_requested,
prioritization_fees,
cu_consumed,
blockhash,
message,
})
})
.collect();

View File

@ -1,2 +1,3 @@
pub mod inmemory_block_store;
pub mod multiple_strategy_block_store;
pub mod postgres_block_store;

View File

@ -0,0 +1 @@

View File

@ -1,5 +1,5 @@
use crate::encoding::BinaryEncoding;
use serde::{Deserialize, Serialize};
use solana_lite_rpc_core::encoding::BinaryEncoding;
use solana_sdk::commitment_config::CommitmentLevel;
#[derive(Debug, Default, Serialize, Deserialize)]

View File

@ -1,7 +1,6 @@
use solana_lite_rpc_core::encoding::BinaryCodecError;
use solana_sdk::{signature::ParseSignatureError, transport::TransportError};
use crate::encoding::BinaryCodecError;
#[derive(thiserror::Error, Debug)]
pub enum JsonRpcError {
#[error("TransportError {0}")]

View File

@ -4,7 +4,6 @@ use solana_transaction_status::TransactionConfirmationStatus;
pub mod bridge;
pub mod cli;
pub mod configs;
pub mod encoding;
pub mod errors;
pub mod jsonrpsee_subscrption_handler_sink;
pub mod postgres;
@ -40,3 +39,7 @@ pub const DEFAULT_GRPC_ADDR: &str = "http://127.0.0.0:10000";
#[from_env]
pub const GRPC_VERSION: &str = "1.16.1";
// cache transactions of 1000 slots by default
#[from_env]
pub const NB_SLOTS_TRANSACTIONS_TO_CACHE: u64 = 1000;

View File

@ -4,11 +4,12 @@ use std::time::Duration;
use anyhow::bail;
use clap::Parser;
use dashmap::DashMap;
use dotenv::dotenv;
use lite_rpc::postgres::Postgres;
use lite_rpc::service_spawner::ServiceSpawner;
use lite_rpc::{bridge::LiteBridge, cli::Args};
use lite_rpc::{DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE, GRPC_VERSION};
use lite_rpc::{DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE, GRPC_VERSION, NB_SLOTS_TRANSACTIONS_TO_CACHE};
use solana_lite_rpc_cluster_endpoints::endpoint_stremers::EndpointStreaming;
use solana_lite_rpc_cluster_endpoints::grpc_subscription::create_grpc_subscription;
@ -121,13 +122,17 @@ pub async fn start_lite_rpc(args: Args, rpc_client: Arc<RpcClient>) -> anyhow::R
let block_information_store =
BlockInformationStore::new(BlockInformation::from_block(&finalized_block));
let data_cache = DataCache {
block_information_store,
cluster_info: ClusterInfo::default(),
identity_stakes: IdentityStakes::new(validator_identity.pubkey()),
slot_cache: SlotCache::new(finalized_block.slot),
tx_subs: SubscriptionStore::default(),
txs: TxStore::default(),
txs: TxStore {
store: Arc::new(DashMap::new()),
save_for_additional_slots: NB_SLOTS_TRANSACTIONS_TO_CACHE,
},
};
let lata_cache_service = DataCachingService {

View File

@ -2,23 +2,20 @@ use anyhow::{bail, Context};
use chrono::{DateTime, Utc};
use futures::join;
use log::{info, warn};
use native_tls::{Certificate, Identity, TlsConnector};
use postgres_native_tls::MakeTlsConnector;
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use std::{sync::Arc, time::Duration};
use tokio::sync::{RwLock, RwLockReadGuard};
use tokio_postgres::{config::SslMode, tls::MakeTlsConnect, types::ToSql, Client, NoTls, Socket};
use native_tls::{Certificate, Identity, TlsConnector};
use crate::encoding::BinaryEncoding;
use solana_lite_rpc_core::{
encoding::BinaryEncoding,
structures::notifications::{
BlockNotification, NotificationMsg, NotificationReciever, TransactionNotification,
TransactionUpdateNotification,
},
AnyhowJoinHandle,
};
use std::{sync::Arc, time::Duration};
use tokio::sync::{RwLock, RwLockReadGuard};
use tokio_postgres::{config::SslMode, tls::MakeTlsConnect, types::ToSql, Client, NoTls, Socket};
lazy_static::lazy_static! {
pub static ref MESSAGES_IN_POSTGRES_CHANNEL: GenericGauge<prometheus::core::AtomicI64> = register_int_gauge!(opts!("literpc_messages_in_postgres", "Number of messages in postgres")).unwrap();

View File

@ -8,6 +8,7 @@ use solana_lite_rpc_core::stores::{
};
use solana_lite_rpc_core::types::{BlockStream, ClusterInfoStream, SlotStream, VoteAccountStream};
use solana_lite_rpc_core::AnyhowJoinHandle;
use solana_sdk::clock::MAX_RECENT_BLOCKHASHES;
use solana_sdk::commitment_config::CommitmentLevel;
use solana_transaction_status::{TransactionConfirmationStatus, TransactionStatus};
@ -64,6 +65,15 @@ impl DataCachingService {
};
for tx in block.transactions {
let block_info = data_cache
.block_information_store
.get_block_info(&tx.blockhash);
let last_valid_blockheight = if let Some(block_info) = block_info {
block_info.last_valid_blockheight
} else {
block.slot + MAX_RECENT_BLOCKHASHES as u64
};
if data_cache.txs.update_status(
&tx.signature,
TransactionStatus {
@ -73,6 +83,7 @@ impl DataCachingService {
err: tx.err.clone(),
confirmation_status: Some(confirmation_status.clone()),
},
last_valid_blockheight,
) {
// transaction updated
match confirmation_status {