Using block height to expire transactions instead of time

Godmode Galactus 2023-07-17 17:27:57 +02:00
parent f27a3a6c88
commit 9d31f6c756
10 changed files with 119 additions and 109 deletions
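In short: instead of expiring cached blocks and in-flight transactions after a wall-clock TTL (Instant / Duration), every tracked entry now records the block height at which its blockhash stops being valid, and the cleaners drop an entry once the finalized block height passes that bound. A minimal sketch of the idea, with simplified names rather than the literal code from this commit:

```rust
use dashmap::DashMap;

// Each tracked transaction remembers the last block height at which it can land.
struct TxEntry {
    last_valid_blockheight: u64,
}

// Expiry is now a pure block-height comparison: once the finalized height has
// passed an entry's last valid height, the transaction can never be confirmed.
fn clean(store: &DashMap<String, TxEntry>, finalized_block_height: u64) {
    store.retain(|_sig, entry| entry.last_valid_blockheight >= finalized_block_height);
}
```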

View File

@@ -1,4 +1,5 @@
use anyhow::Context;
use lazy_static::lazy_static;
use rand::{distributions::Alphanumeric, prelude::Distribution, SeedableRng};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{
@@ -12,9 +13,8 @@ use solana_sdk::{
system_instruction,
transaction::Transaction,
};
use std::{str::FromStr, time::Duration};
use std::path::PathBuf;
use lazy_static::lazy_static;
use std::{str::FromStr, time::Duration};
use tokio::time::Instant;
const MEMO_PROGRAM_ID: &str = "MemoSq4gqABAXKb96qnH8TysNcWxMyWCqXgDLGmfcHr";
@@ -22,20 +22,18 @@ const WAIT_LIMIT_IN_SECONDS: u64 = 60;
lazy_static! {
static ref USER_KEYPAIR: PathBuf = {
dirs::home_dir().unwrap()
dirs::home_dir()
.unwrap()
.join(".config")
.join("solana")
.join("id.json")
};
}
pub struct BenchHelper;
impl BenchHelper {
pub async fn get_payer() -> anyhow::Result<Keypair> {
let payer = tokio::fs::read_to_string(USER_KEYPAIR.as_path())
.await
.context("Error reading payer file")?;

View File

@@ -1,10 +1,9 @@
use std::sync::Arc;
use log::{info, warn};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_rpc_client_api::config::RpcBlockConfig;
use solana_sdk::{
borsh::try_from_slice_unchecked,
clock::MAX_RECENT_BLOCKHASHES,
commitment_config::CommitmentConfig,
compute_budget::{self, ComputeBudgetInstruction},
slot_history::Slot,
@@ -14,7 +13,7 @@ use solana_transaction_status::{
option_serializer::OptionSerializer, RewardType, TransactionDetails, UiTransactionEncoding,
UiTransactionStatusMeta,
};
use tokio::time::Instant;
use std::sync::Arc;
use crate::block_store::{BlockInformation, BlockStore};
@@ -97,7 +96,8 @@ impl BlockProcessor {
BlockInformation {
slot,
block_height,
instant: Instant::now(),
last_valid_blockheight: block_height + MAX_RECENT_BLOCKHASHES as u64,
cleanup_slot: block_height + 1000,
processed_local_time: None,
},
commitment_config,

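The `+ MAX_RECENT_BLOCKHASHES` above derives the expiry height for blocks seen on the block stream, where only the block height is known. In current solana-sdk, MAX_RECENT_BLOCKHASHES is 300 (roughly 120 seconds at 400ms slots); it is a conservative bound, since a transaction is only actually processable for 150 blocks, as the retry comment later in this commit notes. A sketch of the arithmetic, assuming those constant values:

```rust
use solana_sdk::clock::MAX_RECENT_BLOCKHASHES; // 300 in current solana-sdk

fn expiry_bounds(block_height: u64) -> (u64, u64) {
    // The block's hash can be referenced for at most MAX_RECENT_BLOCKHASHES blocks...
    let last_valid_blockheight = block_height + MAX_RECENT_BLOCKHASHES as u64;
    // ...and the cached record itself is evicted 1000 blocks after the block.
    let cleanup_slot = block_height + 1000;
    (last_valid_blockheight, cleanup_slot)
}
```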
View File

@@ -1,9 +1,6 @@
use std::{sync::Arc, time::Duration};
use anyhow::Context;
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use log::info;
use serde_json::json;
use solana_client::{
@@ -12,15 +9,19 @@ use solana_client::{
rpc_request::RpcRequest,
rpc_response::{Response, RpcBlockhash},
};
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::{
clock::MAX_RECENT_BLOCKHASHES, commitment_config::CommitmentConfig, slot_history::Slot,
};
use solana_transaction_status::TransactionDetails;
use std::sync::Arc;
use tokio::{sync::RwLock, time::Instant};
#[derive(Clone, Copy, Debug)]
pub struct BlockInformation {
pub slot: u64,
pub block_height: u64,
pub instant: Instant,
pub last_valid_blockheight: u64,
pub cleanup_slot: Slot,
pub processed_local_time: Option<DateTime<Utc>>,
}
@@ -72,9 +73,13 @@ impl BlockStore {
let processed_blockhash = response.value.blockhash;
let processed_block = BlockInformation {
slot: response.context.slot,
block_height: response.value.last_valid_block_height,
last_valid_blockheight: response.value.last_valid_block_height,
block_height: response
.value
.last_valid_block_height
.saturating_sub(MAX_RECENT_BLOCKHASHES as u64),
processed_local_time: Some(Utc::now()),
instant: Instant::now(),
cleanup_slot: response.value.last_valid_block_height + 700, // cleanup 1000 slots after the block itself (last_valid_block_height is treated as block_height + 300)
};
Ok((processed_blockhash, processed_block))
@@ -111,7 +116,8 @@ impl BlockStore {
BlockInformation {
slot,
block_height,
instant: Instant::now(),
last_valid_blockheight: block_height + MAX_RECENT_BLOCKHASHES as u64,
cleanup_slot: block_height + 1000,
processed_local_time: None,
},
))
@@ -195,24 +201,13 @@ impl BlockStore {
}
}
pub async fn clean(&self, cleanup_duration: Duration) {
let latest_processed = self
.get_latest_blockhash(CommitmentConfig::processed())
pub async fn clean(&self) {
let finalized_block_information = self
.get_latest_block_info(CommitmentConfig::finalized())
.await;
let latest_confirmed = self
.get_latest_blockhash(CommitmentConfig::confirmed())
.await;
let latest_finalized = self
.get_latest_blockhash(CommitmentConfig::finalized())
.await;
let before_length = self.blocks.len();
self.blocks.retain(|k, v| {
v.instant.elapsed() < cleanup_duration
|| k.eq(&latest_processed)
|| k.eq(&latest_confirmed)
|| k.eq(&latest_finalized)
});
self.blocks
.retain(|_, v| v.last_valid_blockheight >= finalized_block_information.block_height);
info!(
"Cleaned {} block info",

View File

@@ -2,20 +2,18 @@ use std::sync::Arc;
use dashmap::DashMap;
use solana_transaction_status::TransactionStatus;
use tokio::time::Instant;
/// Transaction Properties
pub struct TxProps {
pub status: Option<TransactionStatus>,
/// Time at which transaction was forwarded
pub sent_at: Instant,
pub last_valid_blockheight: u64,
}
impl Default for TxProps {
fn default() -> Self {
impl TxProps {
pub fn new(last_valid_blockheight: u64) -> Self {
Self {
status: Default::default(),
sent_at: Instant::now(),
last_valid_blockheight,
}
}
}

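Dropping the Default impl is deliberate: a default TxProps carried no expiry information, so every caller must now state the height at which the transaction's blockhash dies. A small illustration using the TxProps above (the `track` helper is hypothetical):

```rust
use dashmap::DashMap;

// Old code inserted TxProps::default() and relied on sent_at plus a TTL;
// new code must pass the height at which the blockhash expires.
fn track(tx_store: &DashMap<String, TxProps>, sig: &str, last_valid_blockheight: u64) {
    tx_store.insert(sig.to_string(), TxProps::new(last_valid_blockheight));
}
```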
View File

@@ -150,7 +150,6 @@ impl LiteBridge {
mut self,
http_addr: T,
ws_addr: T,
clean_interval: Duration,
enable_postgres: bool,
prometheus_addr: T,
) -> anyhow::Result<()> {
@@ -168,13 +167,10 @@ impl LiteBridge {
let prometheus_sync = PrometheusSync::sync(prometheus_addr);
// transaction services
let (transaction_service, jh_transaction_services) =
self.transaction_service_builder.clone().start(
postgres_send,
self.block_store.clone(),
self.max_retries,
clean_interval,
);
let (transaction_service, jh_transaction_services) = self
.transaction_service_builder
.clone()
.start(postgres_send, self.block_store.clone(), self.max_retries);
self.transaction_service = Some(transaction_service);
@@ -417,10 +413,19 @@ impl LiteRpcServer for LiteBridge {
return Err(jsonrpsee::core::Error::Custom(err.to_string()));
}
};
self.tx_store
.insert(airdrop_sig.clone(), Default::default());
if let Ok((_, block_height)) = self
.rpc_client
.get_latest_blockhash_with_commitment(CommitmentConfig::finalized())
.await
{
self.tx_store.insert(
airdrop_sig.clone(),
solana_lite_rpc_core::tx_store::TxProps {
status: None,
last_valid_blockheight: block_height,
},
);
}
Ok(airdrop_sig)
}

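On the airdrop path there is no block-store entry to consult, so the handler asks the RPC node directly: get_latest_blockhash_with_commitment returns the pair (blockhash, last_valid_block_height), and that height seeds the new TxProps. A sketch of the call in isolation (the helper name is an assumption for illustration):

```rust
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::commitment_config::CommitmentConfig;

// Hypothetical helper mirroring the request_airdrop change above.
async fn finalized_last_valid_height(rpc: &RpcClient) -> anyhow::Result<u64> {
    let (_blockhash, last_valid_block_height) = rpc
        .get_latest_blockhash_with_commitment(CommitmentConfig::finalized())
        .await?;
    Ok(last_valid_block_height)
}
```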
View File

@@ -1,6 +1,5 @@
use crate::{
DEFAULT_CLEAN_INTERVAL_MS, DEFAULT_FANOUT_SIZE, DEFAULT_RETRY_TIMEOUT, DEFAULT_RPC_ADDR,
DEFAULT_WS_ADDR, MAX_RETRIES,
DEFAULT_FANOUT_SIZE, DEFAULT_RETRY_TIMEOUT, DEFAULT_RPC_ADDR, DEFAULT_WS_ADDR, MAX_RETRIES,
};
use clap::Parser;
@@ -18,9 +17,6 @@ pub struct Args {
/// tpu fanout
#[arg(short = 'f', long, default_value_t = DEFAULT_FANOUT_SIZE)]
pub fanout_size: u64,
/// interval between clean
#[arg(short = 'c', long, default_value_t = DEFAULT_CLEAN_INTERVAL_MS)]
pub clean_interval_ms: u64,
/// enable logging to postgres
#[arg(short = 'p', long)]
pub enable_postgres: bool,

View File

@@ -41,7 +41,6 @@ pub async fn start_lite_rpc(args: Args) -> anyhow::Result<()> {
ws_addr,
lite_rpc_ws_addr,
lite_rpc_http_addr,
clean_interval_ms,
fanout_size,
enable_postgres,
prometheus_addr,
@@ -53,7 +52,6 @@
let identity = get_identity_keypair(&identity_keypair).await;
let retry_after = Duration::from_secs(transaction_retry_after_secs);
let clean_interval_ms = Duration::from_millis(clean_interval_ms);
LiteBridge::new(
rpc_addr,
@@ -68,7 +66,6 @@
.start_services(
lite_rpc_http_addr,
lite_rpc_ws_addr,
clean_interval_ms,
enable_postgres,
prometheus_addr,
)

View File

@@ -4,6 +4,7 @@ use crate::{block_listenser::BlockListener, tx_sender::TxSender};
use log::info;
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use solana_lite_rpc_core::block_store::BlockStore;
use solana_sdk::commitment_config::CommitmentConfig;
use tokio::task::JoinHandle;
lazy_static::lazy_static! {
@@ -31,8 +32,12 @@ impl Cleaner {
}
}
pub fn clean_tx_sender(&self, ttl_duration: Duration) {
self.tx_sender.cleanup(ttl_duration);
pub async fn clean_tx_sender(&self) {
let (_, finalized_block_info) = self
.block_store
.get_latest_block(CommitmentConfig::finalized())
.await;
self.tx_sender.cleanup(finalized_block_info.block_height);
}
/// Clean Signature Subscribers from Block Listeners
@@ -40,16 +45,12 @@
self.block_listenser.clean(ttl_duration);
}
pub async fn clean_block_store(&self, ttl_duration: Duration) {
self.block_store.clean(ttl_duration).await;
pub async fn clean_block_store(&self) {
self.block_store.clean().await;
BLOCKS_IN_BLOCKSTORE.set(self.block_store.number_of_blocks_in_store() as i64);
}
pub fn start(
self,
ttl_transactions: Duration,
ttl_duration: Duration,
) -> JoinHandle<anyhow::Result<()>> {
pub fn start(self, ttl_duration: Duration) -> JoinHandle<anyhow::Result<()>> {
let mut ttl = tokio::time::interval(ttl_duration);
tokio::spawn(async move {
@@ -58,9 +59,9 @@ impl Cleaner {
loop {
ttl.tick().await;
self.clean_tx_sender(ttl_transactions);
self.clean_tx_sender().await;
self.clean_block_listeners(ttl_duration);
self.clean_block_store(ttl_duration).await;
self.clean_block_store().await;
}
})
}

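With the TTLs gone, the interval passed to start() only controls how often the block-height checks run, not when entries expire; note that the signature-subscriber cleanup above still uses a time TTL. A minimal sketch of the loop shape, using the Cleaner from this file (the run_cleaner wrapper is hypothetical):

```rust
use std::time::Duration;

// The interval sets how often we *check* for expiry; the expiry condition
// itself is the block-height comparison inside the clean_* methods.
async fn run_cleaner(cleaner: Cleaner, tick: Duration) -> anyhow::Result<()> {
    let mut interval = tokio::time::interval(tick);
    loop {
        interval.tick().await;
        cleaner.clean_tx_sender().await; // drop txs past their last_valid_blockheight
        cleaner.clean_block_store().await; // drop blocks no live tx can reference
    }
}
```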
View File

@@ -8,7 +8,7 @@ use crate::{
cleaner::Cleaner,
tpu_utils::tpu_service::TpuService,
transaction_replayer::{TransactionReplay, TransactionReplayer, MESSAGES_IN_REPLAY_QUEUE},
tx_sender::{TxSender, WireTransaction},
tx_sender::{TransactionInfo, TxSender},
};
use anyhow::bail;
use solana_lite_rpc_core::{
@@ -54,7 +54,6 @@ impl TransactionServiceBuilder {
notifier: Option<NotificationSender>,
block_store: BlockStore,
max_retries: usize,
clean_interval: Duration,
) -> (TransactionService, AnyhowJoinHandle) {
let (transaction_channel, tx_recv) = mpsc::channel(self.max_nb_txs_in_queue);
let (replay_channel, replay_reciever) = tokio::sync::mpsc::unbounded_channel();
@@ -92,7 +91,7 @@ impl TransactionServiceBuilder {
// Transactions become invalid in around 1 minute, because a blockhash expires after 150 blocks: 150 * 400ms = 60s.
// Set it to two minutes to give some margin of error, as not all blocks are full.
let cleaner = Cleaner::new(tx_sender.clone(), block_listner.clone(), block_store_t)
.start(Duration::from_secs(120), clean_interval);
.start(Duration::from_secs(120));
tokio::select! {
res = tpu_service_fx => {
@@ -135,7 +134,7 @@
#[derive(Clone)]
pub struct TransactionService {
pub transaction_channel: Sender<(String, WireTransaction, u64)>,
pub transaction_channel: Sender<TransactionInfo>,
pub replay_channel: UnboundedSender<TransactionReplay>,
pub block_store: BlockStore,
pub max_retries: usize,
@@ -156,7 +155,7 @@ impl TransactionService {
};
let signature = tx.signatures[0];
let Some(BlockInformation { slot, .. }) = self
let Some(BlockInformation { slot, last_valid_blockheight, .. }) = self
.block_store
.get_block_info(&tx.get_recent_blockhash().to_string())
else {
@@ -167,7 +166,12 @@
let max_replay = max_retries.map_or(self.max_retries, |x| x as usize);
if let Err(e) = self
.transaction_channel
.send((signature.to_string(), raw_tx, slot))
.send(TransactionInfo {
signature: signature.to_string(),
last_valid_block_height: last_valid_blockheight,
slot,
transaction: raw_tx,
})
.await
{
bail!(

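The send path derives the expiry from the transaction itself: its recent_blockhash names a block the store has already seen, and that block's last_valid_blockheight bounds how long the transaction can live. A simplified sketch of the lookup (a plain HashMap stands in for the actual BlockStore API):

```rust
use std::collections::HashMap;

// Simplified stand-in for the BlockStore::get_block_info lookup used above.
fn last_valid_height_for_tx(
    blocks: &HashMap<String, BlockInformation>,
    recent_blockhash: &str,
) -> Option<u64> {
    // If the store never saw this blockhash, the send path bails out
    // in the else branch above instead of forwarding the transaction.
    blocks.get(recent_blockhash).map(|b| b.last_valid_blockheight)
}
```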
View File

@@ -8,6 +8,7 @@ use prometheus::{
core::GenericGauge, histogram_opts, opts, register_histogram, register_int_counter,
register_int_gauge, Histogram, IntCounter,
};
use solana_sdk::slot_history::Slot;
use tokio::{sync::mpsc::Receiver, task::JoinHandle};
use crate::tpu_utils::tpu_service::TpuService;
@@ -33,6 +34,14 @@ lazy_static::lazy_static! {
}
pub type WireTransaction = Vec<u8>;
#[derive(Clone, Debug)]
pub struct TransactionInfo {
pub signature: String,
pub slot: Slot,
pub transaction: WireTransaction,
pub last_valid_block_height: u64,
}
// With a sleep time of 250ms, lite-rpc can effectively send
// (1000 / 250) * 5 * 512 = 10240 tps
const INTERVAL_PER_BATCH_IN_MS: u64 = 50;
@@ -58,13 +67,10 @@ impl TxSender {
/// retry enqueued tx(s)
async fn forward_txs(
&self,
sigs_and_slots: Vec<(String, u64)>,
txs: Vec<WireTransaction>,
transaction_infos: Vec<TransactionInfo>,
notifier: Option<NotificationSender>,
) {
assert_eq!(sigs_and_slots.len(), txs.len());
if sigs_and_slots.is_empty() {
if transaction_infos.is_empty() {
return;
}
@@ -74,18 +80,30 @@
let tpu_client = self.tpu_service.clone();
let txs_sent = self.txs_sent_store.clone();
for (sig, _) in &sigs_and_slots {
trace!("sending transaction {}", sig);
txs_sent.insert(sig.to_owned(), TxProps::default());
for transaction_info in &transaction_infos {
trace!("sending transaction {}", transaction_info.signature);
txs_sent.insert(
transaction_info.signature.clone(),
TxProps {
status: None,
last_valid_blockheight: transaction_info.last_valid_block_height,
},
);
}
let forwarded_slot = tpu_client.get_estimated_slot();
let forwarded_local_time = Utc::now();
let mut quic_responses = vec![];
for (tx, (signature, _)) in txs.iter().zip(sigs_and_slots.clone()) {
txs_sent.insert(signature.to_owned(), TxProps::default());
let quic_response = match tpu_client.send_transaction(signature.clone(), tx.clone()) {
for transaction_info in transaction_infos.iter() {
txs_sent.insert(
transaction_info.signature.clone(),
TxProps::new(transaction_info.last_valid_block_height),
);
let quic_response = match tpu_client.send_transaction(
transaction_info.signature.clone(),
transaction_info.transaction.clone(),
) {
Ok(_) => {
TXS_SENT.inc_by(1);
1
@@ -99,12 +117,12 @@
quic_responses.push(quic_response);
}
if let Some(notifier) = &notifier {
let notification_msgs = sigs_and_slots
let notification_msgs = transaction_infos
.iter()
.enumerate()
.map(|(index, (sig, recent_slot))| TransactionNotification {
signature: sig.clone(),
recent_slot: *recent_slot,
.map(|(index, transaction_info)| TransactionNotification {
signature: transaction_info.signature.clone(),
recent_slot: transaction_info.slot,
forwarded_slot,
forwarded_local_time,
processed_slot: None,
@@ -120,39 +138,40 @@
trace!(
"It took {} ms to send a batch of {} transaction(s)",
start.elapsed().as_millis(),
sigs_and_slots.len()
transaction_infos.len()
);
}
/// retry and confirm transactions every 2ms (avg time to confirm tx)
pub fn execute(
self,
mut recv: Receiver<(String, WireTransaction, u64)>,
mut recv: Receiver<TransactionInfo>,
notifier: Option<NotificationSender>,
) -> JoinHandle<anyhow::Result<()>> {
tokio::spawn(async move {
loop {
let mut sigs_and_slots = Vec::with_capacity(MAX_BATCH_SIZE_IN_PER_INTERVAL);
let mut txs = Vec::with_capacity(MAX_BATCH_SIZE_IN_PER_INTERVAL);
let mut transaction_infos = Vec::with_capacity(MAX_BATCH_SIZE_IN_PER_INTERVAL);
let mut timeout_interval = INTERVAL_PER_BATCH_IN_MS;
// In Solana, the sig-verify stage is rate limited to 2000 txs per 50ms;
// we take this as a reference
while txs.len() <= MAX_BATCH_SIZE_IN_PER_INTERVAL {
while transaction_infos.len() <= MAX_BATCH_SIZE_IN_PER_INTERVAL {
let instance = tokio::time::Instant::now();
match tokio::time::timeout(Duration::from_millis(timeout_interval), recv.recv())
.await
{
Ok(value) => match value {
Some((sig, tx, slot)) => {
Some(transaction_info) => {
TXS_IN_CHANNEL.dec();
// duplicate transaction
if self.txs_sent_store.contains_key(&sig) {
if self
.txs_sent_store
.contains_key(&transaction_info.signature)
{
continue;
}
sigs_and_slots.push((sig, slot));
txs.push(tx);
transaction_infos.push(transaction_info);
// update the timeout interval
timeout_interval = timeout_interval
.saturating_sub(instance.elapsed().as_millis() as u64)
@@ -169,24 +188,21 @@ impl TxSender {
}
}
assert_eq!(sigs_and_slots.len(), txs.len());
if sigs_and_slots.is_empty() {
if transaction_infos.is_empty() {
continue;
}
TX_BATCH_SIZES.set(txs.len() as i64);
TX_BATCH_SIZES.set(transaction_infos.len() as i64);
self.forward_txs(sigs_and_slots, txs, notifier.clone())
.await;
self.forward_txs(transaction_infos, notifier.clone()).await;
}
})
}
pub fn cleanup(&self, ttl_duration: Duration) {
pub fn cleanup(&self, current_finalized_blockheight: u64) {
let length_before = self.txs_sent_store.len();
self.txs_sent_store.retain(|_k, v| {
let retain = v.sent_at.elapsed() < ttl_duration;
let retain = v.last_valid_blockheight >= current_finalized_blockheight;
if !retain && v.status.is_none() {
TX_TIMED_OUT.inc();
}