Merge pull request #148 from blockworks-foundation/mango_simulation_test

Mango simulation test
galactus 2023-06-16 14:09:22 +02:00 committed by GitHub
commit fc0881f5a5
25 changed files with 705 additions and 362 deletions

Cargo.lock

@ -422,7 +422,7 @@ dependencies = [
[[package]]
name = "bench"
version = "0.1.0"
version = "0.2.0"
dependencies = [
"anyhow",
"clap 4.2.4",
@ -2299,11 +2299,13 @@ version = "0.2.0"
dependencies = [
"anyhow",
"async-channel",
"async-trait",
"base64 0.21.0",
"bench",
"bincode",
"bs58",
"bytes",
"chrono",
"clap 4.2.4",
"const_env",
"dashmap",
@ -3962,6 +3964,7 @@ name = "solana-lite-rpc-core"
version = "0.2.0"
dependencies = [
"anyhow",
"async-trait",
"base64 0.21.0",
"bincode",
"bs58",
@ -3969,7 +3972,6 @@ dependencies = [
"chrono",
"dashmap",
"futures",
"jsonrpsee",
"log",
"quinn",
"rustls 0.20.8",
@ -4001,7 +4003,6 @@ dependencies = [
"chrono",
"dashmap",
"futures",
"jsonrpsee",
"lazy_static",
"log",
"prometheus",


@ -25,7 +25,6 @@ solana-pubsub-client = "1.15.2"
solana-streamer = "1.14.2"
serde = { version = "1.0.160", features = ["derive"] }
serde_json = "1.0.96"
tokio = { version = "1.28.2", features = ["full", "fs"]}
bincode = "1.3.3"
bs58 = "0.4.0"
base64 = "0.21.0"
@ -49,4 +48,5 @@ async-channel = "1.8.0"
quinn = "0.9.3"
rustls = { version = "=0.20.8", default-features = false }
solana-lite-rpc-services = {path = "services", version="0.2.0"}
solana-lite-rpc-core = {path = "core", version="0.2.0"}
solana-lite-rpc-core = {path = "core", version="0.2.0"}
async-trait = "0.1.68"


@ -1,6 +1,6 @@
[package]
name = "bench"
version = "0.1.0"
version = "0.2.0"
edition = "2021"
[dependencies]
@ -11,11 +11,12 @@ anyhow = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
clap = { workspace = true }
tokio = { workspace = true }
tokio = { version = "1.28.2", features = ["full", "fs"]}
tracing-subscriber = { workspace = true }
csv = "1.2.1"
dirs = "5.0.0"
rand = "0.8.5"
rand_chacha = "0.3.1"
futures = { workspace = true }
dashmap = {workspace = true }
dashmap = { workspace = true }


@ -19,7 +19,7 @@ solana-pubsub-client = { workspace = true }
solana-streamer = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
tokio = "1.*"
bincode = { workspace = true }
bs58 = { workspace = true }
base64 = { workspace = true }
@ -29,7 +29,7 @@ bytes = { workspace = true }
anyhow = { workspace = true }
log = { workspace = true }
dashmap = { workspace = true }
jsonrpsee = { workspace = true }
quinn = { workspace = true }
chrono = { workspace = true }
rustls = { workspace = true }
rustls = { workspace = true }
async-trait = { workspace = true }


@ -24,22 +24,21 @@ pub struct BlockProcessor {
block_store: Option<BlockStore>,
}
#[derive(Default)]
pub struct BlockProcessorResult {
pub invalid_block: bool,
pub transaction_infos: Vec<TransactionInfo>,
pub leader_id: Option<String>,
pub blockhash: String,
pub parent_slot: Slot,
pub block_time: u64,
}
impl BlockProcessorResult {
pub fn invalid() -> Self {
Self {
invalid_block: true,
transaction_infos: vec![],
leader_id: None,
blockhash: String::new(),
parent_slot: 0,
..Default::default()
}
}
}
@ -48,9 +47,9 @@ pub struct TransactionInfo {
pub signature: String,
pub err: Option<TransactionError>,
pub status: Result<(), TransactionError>,
pub cu_requested: Option<i64>,
pub prioritization_fees: Option<i64>,
pub cu_consumed: Option<i64>,
pub cu_requested: Option<u32>,
pub prioritization_fees: Option<u64>,
pub cu_consumed: Option<u64>,
}
impl BlockProcessor {
@ -123,7 +122,7 @@ impl BlockProcessor {
};
let signature = tx.signatures[0].to_string();
let cu_consumed = match compute_units_consumed {
OptionSerializer::Some(cu_consumed) => Some(cu_consumed as i64),
OptionSerializer::Some(cu_consumed) => Some(cu_consumed),
_ => None,
};
@ -149,7 +148,7 @@ impl BlockProcessor {
if let Ok(ComputeBudgetInstruction::SetComputeUnitLimit(limit)) =
try_from_slice_unchecked(i.data.as_slice())
{
return Some(limit as i64);
return Some(limit);
}
}
None
@ -162,14 +161,14 @@ impl BlockProcessor {
if let Ok(ComputeBudgetInstruction::SetComputeUnitPrice(price)) =
try_from_slice_unchecked(i.data.as_slice())
{
return Some(price as i64);
return Some(price);
}
}
None
});
if let Some((units, additional_fee)) = legacy_compute_budget {
cu_requested = Some(units as i64);
cu_requested = Some(units);
if additional_fee > 0 {
prioritization_fees = Some(((units * 1000) / additional_fee).into())
}
@ -194,12 +193,15 @@ impl BlockProcessor {
None
};
let block_time = block.block_time.unwrap_or(0) as u64;
Ok(BlockProcessorResult {
invalid_block: false,
transaction_infos,
leader_id,
blockhash,
parent_slot,
block_time,
})
}


@ -7,5 +7,7 @@ pub mod rotating_queue;
pub mod solana_utils;
pub mod structures;
pub mod subscription_handler;
pub mod subscription_sink;
pub mod tx_store;
pub type AnyhowJoinHandle = tokio::task::JoinHandle<anyhow::Result<()>>;
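Since the core crate now exposes the `AnyhowJoinHandle` alias used throughout the services, here is a minimal sketch of the intended usage; the worker below is hypothetical and not part of this change:

```rust
use solana_lite_rpc_core::AnyhowJoinHandle;

// Hypothetical worker, shown only to illustrate the alias: any spawned task
// whose future returns anyhow::Result<()> fits AnyhowJoinHandle.
fn spawn_worker() -> AnyhowJoinHandle {
    tokio::spawn(async move {
        // fallible work goes here; errors propagate with `?`
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        Ok(())
    })
}
```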


@ -1,54 +1,46 @@
use chrono::{DateTime, Utc};
use solana_sdk::{commitment_config::CommitmentLevel, transaction::TransactionError};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
pub trait SchemaSize {
const DEFAULT_SIZE: usize = 0;
const MAX_SIZE: usize = 0;
}
#[derive(Debug)]
pub struct TransactionNotification {
pub signature: String, // 88 bytes
pub recent_slot: i64, // 8 bytes
pub forwarded_slot: i64, // 8 bytes
pub recent_slot: u64, // 8 bytes
pub forwarded_slot: u64, // 8 bytes
pub forwarded_local_time: DateTime<Utc>, // 8 bytes
pub processed_slot: Option<i64>,
pub cu_consumed: Option<i64>,
pub cu_requested: Option<i64>,
pub processed_slot: Option<u64>,
pub cu_consumed: Option<u64>,
pub cu_requested: Option<u64>,
pub quic_response: i16, // 8 bytes
}
impl SchemaSize for TransactionNotification {
const DEFAULT_SIZE: usize = 88 + (4 * 8);
const MAX_SIZE: usize = Self::DEFAULT_SIZE + (3 * 8);
}
#[derive(Debug)]
pub struct TransactionUpdateNotification {
pub signature: String, // 88 bytes
pub processed_slot: i64, // 8 bytes
pub cu_consumed: Option<i64>,
pub cu_requested: Option<i64>,
pub cu_price: Option<i64>,
}
impl SchemaSize for TransactionUpdateNotification {
const DEFAULT_SIZE: usize = 88 + 8;
const MAX_SIZE: usize = Self::DEFAULT_SIZE + (3 * 8);
pub signature: String, // 88 bytes
pub slot: u64,
pub cu_consumed: Option<u64>,
pub cu_requested: Option<u32>,
pub cu_price: Option<u64>,
pub transaction_status: Result<(), TransactionError>,
pub blockhash: String,
pub leader: String,
pub commitment: CommitmentLevel,
}
#[derive(Debug)]
pub struct BlockNotification {
pub slot: i64, // 8 bytes
pub leader_id: i64, // 8 bytes
pub parent_slot: i64, // 8 bytes
pub slot: u64, // 8 bytes
pub block_leader: String, // 8 bytes
pub parent_slot: u64, // 8 bytes
pub cluster_time: DateTime<Utc>, // 8 bytes
pub local_time: Option<DateTime<Utc>>,
}
impl SchemaSize for BlockNotification {
const DEFAULT_SIZE: usize = 4 * 8;
const MAX_SIZE: usize = Self::DEFAULT_SIZE + 8;
pub blockhash: String,
pub total_transactions: u64,
pub block_time: u64,
pub total_cu_consumed: u64,
pub commitment: CommitmentLevel,
pub transaction_found: u64,
pub cu_consumed_by_txs: u64,
}
#[derive(Debug)]


@ -1,20 +1,20 @@
use std::{sync::Arc, time::Duration};
use dashmap::DashMap;
use jsonrpsee::{SubscriptionMessage, SubscriptionSink};
use solana_rpc_client_api::response::{Response as RpcResponse, RpcResponseContext};
use solana_sdk::{
commitment_config::{CommitmentConfig, CommitmentLevel},
slot_history::Slot,
};
use tokio::time::Instant;
use crate::block_processor::TransactionInfo;
use crate::{block_processor::TransactionInfo, subscription_sink::SubscriptionSink};
pub type SubscptionHanderSink = Arc<dyn SubscriptionSink>;
#[derive(Clone, Default)]
pub struct SubscriptionHandler {
pub signature_subscribers:
Arc<DashMap<(String, CommitmentConfig), (SubscriptionSink, Instant)>>,
Arc<DashMap<(String, CommitmentConfig), (SubscptionHanderSink, Instant)>>,
}
impl SubscriptionHandler {
@ -38,7 +38,7 @@ impl SubscriptionHandler {
&self,
signature: String,
commitment_config: CommitmentConfig,
sink: SubscriptionSink,
sink: SubscptionHanderSink,
) {
let commitment_config = Self::get_supported_commitment_config(commitment_config);
self.signature_subscribers
@ -62,17 +62,7 @@ impl SubscriptionHandler {
.remove(&(transaction_info.signature.clone(), commitment_config))
{
// none if transaction succeeded
let _res = sink
.send(
SubscriptionMessage::from_json(&RpcResponse {
context: RpcResponseContext {
slot,
api_version: None,
},
value: serde_json::json!({ "err": transaction_info.err }),
})
.unwrap(),
)
sink.send(slot, serde_json::json!({ "err": transaction_info.err }))
.await;
}
}


@ -0,0 +1,8 @@
use async_trait::async_trait;
use solana_sdk::slot_history::Slot;
#[async_trait]
pub trait SubscriptionSink: Send + Sync {
async fn send(&self, slot: Slot, message: serde_json::Value);
fn is_closed(&self) -> bool;
}
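This trait is what lets core drop its jsonrpsee dependency: any `Send + Sync` sink can now receive signature notifications. A minimal sketch of an alternative implementation, assuming a hypothetical logging sink that is not part of this PR:

```rust
use async_trait::async_trait;
use solana_sdk::slot_history::Slot;

// Hypothetical sink, shown only to illustrate the trait surface.
struct LoggingSink;

#[async_trait]
impl SubscriptionSink for LoggingSink {
    async fn send(&self, slot: Slot, message: serde_json::Value) {
        log::info!("subscription update at slot {slot}: {message}");
    }

    fn is_closed(&self) -> bool {
        // a logging sink never goes away
        false
    }
}
```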

core/src/tx_store.rs (new file)

@ -0,0 +1,27 @@
use std::sync::Arc;
use dashmap::DashMap;
use solana_transaction_status::TransactionStatus;
use tokio::time::Instant;
/// Transaction Properties
pub struct TxProps {
pub status: Option<TransactionStatus>,
/// Time at which transaction was forwarded
pub sent_at: Instant,
}
impl Default for TxProps {
fn default() -> Self {
Self {
status: Default::default(),
sent_at: Instant::now(),
}
}
}
pub type TxStore = Arc<DashMap<String, TxProps>>;
pub fn empty_tx_store() -> TxStore {
Arc::new(DashMap::new())
}
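A short usage sketch for the relocated store, assuming it is shared by cloning the `Arc` the way the services below do; the signature string is a placeholder:

```rust
use solana_lite_rpc_core::tx_store::{empty_tx_store, TxProps};

fn track_new_transaction() {
    let tx_store = empty_tx_store();

    // every clone points at the same underlying DashMap
    let store_for_sender = tx_store.clone();
    store_for_sender.insert("placeholder-signature".to_string(), TxProps::default());

    // status stays None until a block listener fills it in
    assert!(tx_store.get("placeholder-signature").unwrap().status.is_none());
}
```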


@ -17,7 +17,6 @@ solana-transaction-status = { workspace = true }
solana-version = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
bincode = { workspace = true }
bs58 = { workspace = true }
base64 = { workspace = true }
@ -40,7 +39,10 @@ async-channel = { workspace = true }
quinn = { workspace = true }
solana-lite-rpc-core = { workspace = true }
solana-lite-rpc-services = { workspace = true }
async-trait = { workspace = true }
tokio = { version = "1.28.2", features = ["full", "fs"]}
tokio-postgres = { version = "0.7.8", features = ["with-chrono-0_4"] }
chrono = { workspace = true }
[dev-dependencies]
bench = { path = "../bench" }


@ -1,6 +1,6 @@
use crate::{
configs::{IsBlockHashValidConfig, SendTransactionConfig},
encoding::BinaryEncoding,
jsonrpsee_subscrption_handler_sink::JsonRpseeSubscriptionHandlerSink,
postgres::Postgres,
rpc::LiteRpcServer,
DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE,
@ -8,44 +8,38 @@ use crate::{
use solana_lite_rpc_services::{
block_listenser::BlockListener,
cleaner::Cleaner,
metrics_capture::MetricsCapture,
prometheus_sync::PrometheusSync,
tpu_utils::tpu_service::TpuService,
transaction_replayer::{TransactionReplay, TransactionReplayer, MESSAGES_IN_REPLAY_QUEUE},
transaction_replayer::TransactionReplayer,
transaction_service::{TransactionService, TransactionServiceBuilder},
tx_sender::WireTransaction,
tx_sender::{TxProps, TxSender, TXS_IN_CHANNEL},
tx_sender::{TxSender, TXS_IN_CHANNEL},
};
use solana_lite_rpc_core::{
block_store::{BlockInformation, BlockStore},
AnyhowJoinHandle,
};
use std::{ops::Deref, str::FromStr, sync::Arc, time::Duration};
use anyhow::bail;
use dashmap::DashMap;
use log::{error, info};
use jsonrpsee::{core::SubscriptionResult, server::ServerBuilder, PendingSubscriptionSink};
use log::info;
use prometheus::{opts, register_int_counter, IntCounter};
use solana_rpc_client::{nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction};
use solana_lite_rpc_core::{
block_store::{BlockInformation, BlockStore},
tx_store::{empty_tx_store, TxStore},
AnyhowJoinHandle,
};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_rpc_client_api::{
config::{RpcContextConfig, RpcRequestAirdropConfig, RpcSignatureStatusConfig},
response::{Response as RpcResponse, RpcBlockhash, RpcResponseContext, RpcVersionInfo},
};
use solana_sdk::{
commitment_config::CommitmentConfig, hash::Hash, pubkey::Pubkey, signature::Keypair,
slot_history::Slot, transaction::VersionedTransaction,
slot_history::Slot,
};
use solana_transaction_status::TransactionStatus;
use std::{ops::Deref, str::FromStr, sync::Arc, time::Duration};
use tokio::{
net::ToSocketAddrs,
sync::mpsc::{self, Sender, UnboundedSender},
time::Instant,
sync::mpsc::{self, Sender},
};
lazy_static::lazy_static! {
@ -68,16 +62,14 @@ lazy_static::lazy_static! {
/// A bridge between clients and tpu
pub struct LiteBridge {
pub rpc_client: Arc<RpcClient>,
pub tpu_service: Arc<TpuService>,
pub tx_store: TxStore,
// None if LiteBridge is not executed
pub tx_send_channel: Option<Sender<(String, WireTransaction, u64)>>,
pub tx_sender: TxSender,
pub block_listner: BlockListener,
pub block_store: BlockStore,
pub tx_replayer: TransactionReplayer,
pub tx_replay_sender: Option<UnboundedSender<TransactionReplay>>,
pub max_retries: usize,
pub transaction_service_builder: TransactionServiceBuilder,
pub transaction_service: Option<TransactionService>,
pub block_listner: BlockListener,
}
impl LiteBridge {
@ -92,7 +84,7 @@ impl LiteBridge {
let rpc_client = Arc::new(RpcClient::new(rpc_url.clone()));
let current_slot = rpc_client.get_slot().await?;
let tx_store: Arc<DashMap<String, TxProps>> = Default::default();
let tx_store = empty_tx_store();
let tpu_service = TpuService::new(
current_slot,
@ -104,26 +96,33 @@ impl LiteBridge {
)
.await?;
let tpu_service = Arc::new(tpu_service);
let tx_sender = TxSender::new(tx_store, tpu_service.clone());
let tx_sender = TxSender::new(tx_store.clone(), tpu_service.clone());
let block_store = BlockStore::new(&rpc_client).await?;
let block_listner =
BlockListener::new(rpc_client.clone(), tx_sender.clone(), block_store.clone());
BlockListener::new(rpc_client.clone(), tx_store.clone(), block_store.clone());
let tx_replayer =
TransactionReplayer::new(tpu_service.clone(), tx_store.clone(), retry_after);
let transaction_manager = TransactionServiceBuilder::new(
tx_sender,
tx_replayer,
block_listner.clone(),
tpu_service,
DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE,
);
let tx_replayer = TransactionReplayer::new(tx_sender.clone(), retry_after);
Ok(Self {
rpc_client,
tpu_service,
tx_store,
tx_send_channel: None,
tx_sender,
block_listner,
block_store,
tx_replayer,
tx_replay_sender: None,
max_retries,
transaction_service_builder: transaction_manager,
transaction_service: None,
block_listner,
})
}
@ -147,47 +146,20 @@ impl LiteBridge {
(None, None)
};
let tpu_service = self.tpu_service.clone();
let tpu_service = tpu_service.start();
let (tx_send, tx_recv) = mpsc::channel(DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE);
self.tx_send_channel = Some(tx_send);
let tx_sender = self
.tx_sender
.clone()
.execute(tx_recv, postgres_send.clone());
let (replay_sender, replay_reciever) = tokio::sync::mpsc::unbounded_channel();
let replay_service = self
.tx_replayer
.start_service(replay_sender.clone(), replay_reciever);
self.tx_replay_sender = Some(replay_sender);
let metrics_capture = MetricsCapture::new(self.tx_sender.clone()).capture();
let metrics_capture = MetricsCapture::new(self.tx_store.clone()).capture();
let prometheus_sync = PrometheusSync.sync(prometheus_addr);
let finalized_block_listener = self.block_listner.clone().listen(
CommitmentConfig::finalized(),
postgres_send.clone(),
self.tpu_service.get_estimated_slot_holder(),
);
let confirmed_block_listener = self.block_listner.clone().listen(
CommitmentConfig::confirmed(),
None,
self.tpu_service.get_estimated_slot_holder(),
);
let processed_block_listener = self.block_listner.clone().listen_processed();
let cleaner = Cleaner::new(
self.tx_sender.clone(),
self.block_listner.clone(),
self.block_store.clone(),
)
.start(clean_interval);
let max_retries = self.max_retries;
let (transaction_service, jh_transaction_services) = self
.transaction_service_builder
.start(
postgres_send,
self.block_store.clone(),
max_retries,
clean_interval,
)
.await;
self.transaction_service = Some(transaction_service);
let rpc = self.into_rpc();
let (ws_server, http_server) = {
@ -228,42 +200,24 @@ impl LiteBridge {
});
tokio::select! {
res = tpu_service => {
bail!("Tpu Services exited unexpectedly {res:?}");
},
res = ws_server => {
bail!("WebSocket server exited unexpectedly {res:?}");
},
res = http_server => {
bail!("HTTP server exited unexpectedly {res:?}");
},
res = tx_sender => {
bail!("Tx Sender exited unexpectedly {res:?}");
},
res = finalized_block_listener => {
bail!("Finalized Block Listener exited unexpectedly {res:?}");
},
res = confirmed_block_listener => {
bail!("Confirmed Block Listener exited unexpectedly {res:?}");
},
res = processed_block_listener => {
bail!("Processed Block Listener exited unexpectedly {res:?}");
},
res = metrics_capture => {
bail!("Metrics Capture exited unexpectedly {res:?}");
},
res = prometheus_sync => {
bail!("Prometheus Service exited unexpectedly {res:?}");
},
res = cleaner => {
bail!("Cleaner Service exited unexpectedly {res:?}");
},
res = replay_service => {
bail!("Tx replay service exited unexpectedly {res:?}");
},
res = postgres => {
bail!("Postgres service exited unexpectedly {res:?}");
},
res = jh_transaction_services => {
bail!("Transaction service exited unexpectedly {res:?}");
}
}
}
}
@ -289,56 +243,22 @@ impl LiteRpcServer for LiteBridge {
}
};
let tx = match bincode::deserialize::<VersionedTransaction>(&raw_tx) {
Ok(tx) => tx,
Err(err) => {
return Err(jsonrpsee::core::Error::Custom(err.to_string()));
}
};
let transaction_service = self
.transaction_service
.clone()
.expect("Transaction Service should have been initialized");
let sig = tx.get_signature();
let Some(BlockInformation { slot, .. }) = self
.block_store
.get_block_info(&tx.get_recent_blockhash().to_string())
else {
log::warn!("block");
return Err(jsonrpsee::core::Error::Custom("Blockhash not found in block store".to_string()));
};
let raw_tx_clone = raw_tx.clone();
if let Err(e) = self
.tx_send_channel
.as_ref()
.expect("Lite Bridge Not Executed")
.send((sig.to_string(), raw_tx, slot))
match transaction_service
.send_transaction(raw_tx, max_retries)
.await
{
error!(
"Internal error sending transaction on send channel error {}",
e
);
}
Ok(sig) => {
TXS_IN_CHANNEL.inc();
if let Some(tx_replay_sender) = &self.tx_replay_sender {
let max_replay = max_retries.map_or(self.max_retries, |x| x as usize);
let replay_at = Instant::now() + self.tx_replayer.retry_after;
// ignore error for replay service
if tx_replay_sender
.send(TransactionReplay {
signature: sig.to_string(),
tx: raw_tx_clone,
replay_count: 0,
max_replay,
replay_at,
})
.is_ok()
{
MESSAGES_IN_REPLAY_QUEUE.inc();
Ok(sig)
}
Err(e) => Err(jsonrpsee::core::Error::Custom(e.to_string())),
}
TXS_IN_CHANNEL.inc();
Ok(BinaryEncoding::Base58.encode(sig))
}
async fn get_latest_blockhash(
@ -424,12 +344,7 @@ impl LiteRpcServer for LiteBridge {
let sig_statuses = sigs
.iter()
.map(|sig| {
self.tx_sender
.txs_sent_store
.get(sig)
.and_then(|v| v.status.clone())
})
.map(|sig| self.tx_store.get(sig).and_then(|v| v.status.clone()))
.collect();
Ok(RpcResponse {
@ -481,8 +396,7 @@ impl LiteRpcServer for LiteBridge {
}
};
self.tx_sender
.txs_sent_store
self.tx_store
.insert(airdrop_sig.clone(), Default::default());
Ok(airdrop_sig)
@ -507,8 +421,12 @@ impl LiteRpcServer for LiteBridge {
RPC_SIGNATURE_SUBSCRIBE.inc();
let sink = pending.accept().await?;
self.block_listner
.signature_subscribe(signature, commitment_config, sink);
let jsonrpsee_sink = JsonRpseeSubscriptionHandlerSink::new(sink);
self.block_listner.signature_subscribe(
signature,
commitment_config,
Arc::new(jsonrpsee_sink),
);
Ok(())
}


@ -0,0 +1,38 @@
use async_trait::async_trait;
use jsonrpsee::{SubscriptionMessage, SubscriptionSink};
use solana_rpc_client_api::response::{Response as RpcResponse, RpcResponseContext};
pub struct JsonRpseeSubscriptionHandlerSink {
jsonrpsee_sink: SubscriptionSink,
}
impl JsonRpseeSubscriptionHandlerSink {
pub fn new(jsonrpsee_sink: SubscriptionSink) -> Self {
Self { jsonrpsee_sink }
}
}
#[async_trait]
impl solana_lite_rpc_core::subscription_sink::SubscriptionSink
for JsonRpseeSubscriptionHandlerSink
{
async fn send(&self, slot: solana_sdk::slot_history::Slot, message: serde_json::Value) {
let _ = self
.jsonrpsee_sink
.send(
SubscriptionMessage::from_json(&RpcResponse {
context: RpcResponseContext {
slot,
api_version: None,
},
value: message,
})
.unwrap(),
)
.await;
}
fn is_closed(&self) -> bool {
self.jsonrpsee_sink.is_closed()
}
}


@ -6,6 +6,7 @@ pub mod cli;
pub mod configs;
pub mod encoding;
pub mod errors;
pub mod jsonrpsee_subscrption_handler_sink;
pub mod postgres;
pub mod rpc;


@ -1,4 +1,5 @@
use anyhow::{bail, Context};
use chrono::{DateTime, Utc};
use futures::join;
use log::{info, warn};
use postgres_native_tls::MakeTlsConnector;
@ -15,7 +16,7 @@ use native_tls::{Certificate, Identity, TlsConnector};
use crate::encoding::BinaryEncoding;
use solana_lite_rpc_core::notifications::{
BlockNotification, NotificationMsg, NotificationReciever, SchemaSize, TransactionNotification,
BlockNotification, NotificationMsg, NotificationReciever, TransactionNotification,
TransactionUpdateNotification,
};
@ -24,8 +25,105 @@ lazy_static::lazy_static! {
pub static ref POSTGRES_SESSION_ERRORS: GenericGauge<prometheus::core::AtomicI64> = register_int_gauge!(opts!("literpc_session_errors", "Number of failures while establishing postgres session")).unwrap();
}
use std::convert::From;
const MAX_QUERY_SIZE: usize = 200_000; // 0.2 mb
pub trait SchemaSize {
const DEFAULT_SIZE: usize = 0;
const MAX_SIZE: usize = 0;
}
#[derive(Debug)]
pub struct PostgresTx {
pub signature: String, // 88 bytes
pub recent_slot: i64, // 8 bytes
pub forwarded_slot: i64, // 8 bytes
pub forwarded_local_time: DateTime<Utc>, // 8 bytes
pub processed_slot: Option<i64>,
pub cu_consumed: Option<i64>,
pub cu_requested: Option<i64>,
pub quic_response: i16, // 2 bytes
}
impl SchemaSize for PostgresTx {
const DEFAULT_SIZE: usize = 88 + (4 * 8);
const MAX_SIZE: usize = Self::DEFAULT_SIZE + (3 * 8);
}
impl From<&TransactionNotification> for PostgresTx {
fn from(value: &TransactionNotification) -> Self {
Self {
signature: value.signature.clone(),
recent_slot: value.recent_slot as i64,
forwarded_slot: value.forwarded_slot as i64,
forwarded_local_time: value.forwarded_local_time,
processed_slot: value.processed_slot.map(|x| x as i64),
cu_consumed: value.cu_consumed.map(|x| x as i64),
cu_requested: value.cu_requested.map(|x| x as i64),
quic_response: value.quic_response,
}
}
}
#[derive(Debug)]
pub struct PostgresTxUpdate {
pub signature: String, // 88 bytes
pub processed_slot: i64, // 8 bytes
pub cu_consumed: Option<i64>,
pub cu_requested: Option<i64>,
pub cu_price: Option<i64>,
}
impl SchemaSize for PostgresTxUpdate {
const DEFAULT_SIZE: usize = 88 + 8;
const MAX_SIZE: usize = Self::DEFAULT_SIZE + (3 * 8);
}
impl From<&TransactionUpdateNotification> for PostgresTxUpdate {
fn from(value: &TransactionUpdateNotification) -> Self {
Self {
signature: value.signature.clone(),
processed_slot: value.slot as i64,
cu_consumed: value.cu_consumed.map(|x| x as i64),
cu_requested: value.cu_requested.map(|x| x as i64),
cu_price: value.cu_price.map(|x| x as i64),
}
}
}
#[derive(Debug)]
pub struct PostgresBlock {
pub slot: i64, // 8 bytes
pub leader_id: i64, // 8 bytes
pub parent_slot: i64, // 8 bytes
pub cluster_time: DateTime<Utc>, // 8 bytes
pub local_time: Option<DateTime<Utc>>,
}
impl SchemaSize for PostgresBlock {
const DEFAULT_SIZE: usize = 4 * 8;
const MAX_SIZE: usize = Self::DEFAULT_SIZE + 8;
}
impl From<BlockNotification> for PostgresBlock {
fn from(value: BlockNotification) -> Self {
Self {
slot: value.slot as i64,
leader_id: 0, // TODO
cluster_time: value.cluster_time,
local_time: value.local_time,
parent_slot: value.parent_slot as i64,
}
}
}
#[derive(Debug)]
pub struct AccountAddr {
pub id: u32,
pub addr: String,
}
const fn get_max_safe_inserts<T: SchemaSize>() -> usize {
if T::DEFAULT_SIZE == 0 {
panic!("DEFAULT_SIZE can't be 0. SchemaSize impl should override the DEFAULT_SIZE const");
@ -134,7 +232,7 @@ impl PostgresSession {
}
}
pub async fn send_txs(&self, txs: &[TransactionNotification]) -> anyhow::Result<()> {
pub async fn send_txs(&self, txs: &[PostgresTx]) -> anyhow::Result<()> {
const NUMBER_OF_ARGS: usize = 8;
if txs.is_empty() {
@ -144,7 +242,7 @@ impl PostgresSession {
let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS * txs.len());
for tx in txs.iter() {
let TransactionNotification {
let PostgresTx {
signature,
recent_slot,
forwarded_slot,
@ -183,7 +281,7 @@ impl PostgresSession {
Ok(())
}
pub async fn send_blocks(&self, blocks: &[BlockNotification]) -> anyhow::Result<()> {
pub async fn send_blocks(&self, blocks: &[PostgresBlock]) -> anyhow::Result<()> {
const NUMBER_OF_ARGS: usize = 5;
if blocks.is_empty() {
@ -193,7 +291,7 @@ impl PostgresSession {
let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS * blocks.len());
for block in blocks.iter() {
let BlockNotification {
let PostgresBlock {
slot,
leader_id,
parent_slot,
@ -233,7 +331,7 @@ impl PostgresSession {
Ok(())
}
pub async fn update_txs(&self, txs: &[TransactionUpdateNotification]) -> anyhow::Result<()> {
pub async fn update_txs(&self, txs: &[PostgresTxUpdate]) -> anyhow::Result<()> {
const NUMBER_OF_ARGS: usize = 5;
if txs.is_empty() {
@ -243,7 +341,7 @@ impl PostgresSession {
let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS * txs.len());
for tx in txs.iter() {
let TransactionUpdateNotification {
let PostgresTxUpdate {
signature,
processed_slot,
cu_consumed,
@ -317,15 +415,13 @@ impl Postgres {
tokio::spawn(async move {
info!("start postgres worker");
const TX_MAX_CAPACITY: usize = get_max_safe_inserts::<TransactionNotification>();
const BLOCK_MAX_CAPACITY: usize = get_max_safe_inserts::<BlockNotification>();
const UPDATE_MAX_CAPACITY: usize =
get_max_safe_updates::<TransactionUpdateNotification>();
const TX_MAX_CAPACITY: usize = get_max_safe_inserts::<PostgresTx>();
const BLOCK_MAX_CAPACITY: usize = get_max_safe_inserts::<PostgresBlock>();
const UPDATE_MAX_CAPACITY: usize = get_max_safe_updates::<PostgresTxUpdate>();
let mut tx_batch: Vec<TransactionNotification> = Vec::with_capacity(TX_MAX_CAPACITY);
let mut block_batch: Vec<BlockNotification> = Vec::with_capacity(BLOCK_MAX_CAPACITY);
let mut update_batch =
Vec::<TransactionUpdateNotification>::with_capacity(UPDATE_MAX_CAPACITY);
let mut tx_batch: Vec<PostgresTx> = Vec::with_capacity(TX_MAX_CAPACITY);
let mut block_batch: Vec<PostgresBlock> = Vec::with_capacity(BLOCK_MAX_CAPACITY);
let mut update_batch = Vec::<PostgresTxUpdate>::with_capacity(UPDATE_MAX_CAPACITY);
let mut session_establish_error = false;
@ -349,13 +445,15 @@ impl Postgres {
MESSAGES_IN_POSTGRES_CHANNEL.dec();
match msg {
NotificationMsg::TxNotificationMsg(mut tx) => {
NotificationMsg::TxNotificationMsg(tx) => {
let mut tx = tx.iter().map(|x| x.into()).collect::<Vec<_>>();
tx_batch.append(&mut tx)
}
NotificationMsg::BlockNotificationMsg(block) => {
block_batch.push(block)
block_batch.push(block.into())
}
NotificationMsg::UpdateTransactionMsg(mut update) => {
NotificationMsg::UpdateTransactionMsg(update) => {
let mut update = update.iter().map(|x| x.into()).collect();
update_batch.append(&mut update)
}


@ -19,7 +19,7 @@ solana-pubsub-client = { workspace = true }
solana-streamer = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
tokio = "1.*"
bincode = { workspace = true }
bs58 = { workspace = true }
base64 = { workspace = true }
@ -29,7 +29,6 @@ bytes = { workspace = true }
anyhow = { workspace = true }
log = { workspace = true }
dashmap = { workspace = true }
jsonrpsee = { workspace = true }
prometheus = { workspace = true }
lazy_static = { workspace = true }
async-channel = { workspace = true }


@ -1,6 +1,6 @@
use std::{
sync::{
atomic::{AtomicU64, Ordering},
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
time::Duration,
@ -8,8 +8,6 @@ use std::{
use anyhow::{bail, Context};
use chrono::{TimeZone, Utc};
use jsonrpsee::SubscriptionSink;
use log::{error, info, trace};
use prometheus::{
core::GenericGauge, histogram_opts, opts, register_histogram, register_int_counter,
@ -34,11 +32,10 @@ use solana_lite_rpc_core::{
notifications::{
BlockNotification, NotificationMsg, NotificationSender, TransactionUpdateNotification,
},
subscription_handler::SubscriptionHandler,
subscription_handler::{SubscptionHanderSink, SubscriptionHandler},
tx_store::{TxProps, TxStore},
};
use crate::tx_sender::{TxProps, TxSender};
lazy_static::lazy_static! {
static ref TT_RECV_CON_BLOCK: Histogram = register_histogram!(histogram_opts!(
"literpc_tt_recv_con_block",
@ -72,7 +69,7 @@ lazy_static::lazy_static! {
/// and keeps a track of confirmed txs
#[derive(Clone)]
pub struct BlockListener {
tx_sender: TxSender,
tx_store: TxStore,
rpc_client: Arc<RpcClient>,
block_processor: BlockProcessor,
subscription_handler: SubscriptionHandler,
@ -85,11 +82,11 @@ pub struct BlockListnerNotificatons {
}
impl BlockListener {
pub fn new(rpc_client: Arc<RpcClient>, tx_sender: TxSender, block_store: BlockStore) -> Self {
pub fn new(rpc_client: Arc<RpcClient>, tx_store: TxStore, block_store: BlockStore) -> Self {
Self {
block_processor: BlockProcessor::new(rpc_client.clone(), Some(block_store.clone())),
rpc_client,
tx_sender,
tx_store,
subscription_handler: SubscriptionHandler::default(),
block_store,
}
@ -98,7 +95,7 @@ impl BlockListener {
pub async fn num_of_sigs_commited(&self, sigs: &[String]) -> usize {
let mut num_of_sigs_commited = 0;
for sig in sigs {
if self.tx_sender.txs_sent_store.contains_key(sig) {
if self.tx_store.contains_key(sig) {
num_of_sigs_commited += 1;
}
}
@ -109,7 +106,7 @@ impl BlockListener {
&self,
signature: String,
commitment_config: CommitmentConfig,
sink: SubscriptionSink,
sink: SubscptionHanderSink,
) {
self.subscription_handler
.signature_subscribe(signature, commitment_config, sink);
@ -134,7 +131,7 @@ impl BlockListener {
&self,
slot: Slot,
commitment_config: CommitmentConfig,
postgres: Option<NotificationSender>,
notification_channel: Option<NotificationSender>,
) -> anyhow::Result<()> {
//info!("indexing slot {} commitment {}", slot, commitment_config.commitment);
let comfirmation_status = match commitment_config.commitment {
@ -163,10 +160,14 @@ impl BlockListener {
let mut transactions_processed = 0;
let mut transactions_to_update = vec![];
let mut total_cu_consumed: u64 = 0;
let mut cu_consumed_by_txs: u64 = 0;
let total_txs = block_processor_result.transaction_infos.len();
let mut tx_count: usize = 0;
for tx_info in block_processor_result.transaction_infos {
transactions_processed += 1;
if let Some(mut tx_status) = self.tx_sender.txs_sent_store.get_mut(&tx_info.signature) {
total_cu_consumed = total_cu_consumed.saturating_add(tx_info.cu_consumed.unwrap_or(0));
if let Some(mut tx_status) = self.tx_store.get_mut(&tx_info.signature) {
//
// Metrics
//
@ -175,6 +176,9 @@ impl BlockListener {
} else {
TXS_CONFIRMED.inc();
}
cu_consumed_by_txs =
cu_consumed_by_txs.saturating_add(tx_info.cu_consumed.unwrap_or(0));
tx_count = tx_count.saturating_add(1);
trace!(
"got transaction {} confrimation level {}",
@ -191,13 +195,17 @@ impl BlockListener {
});
// prepare writing to postgres
if let Some(_postgres) = &postgres {
if let Some(_notification_channel) = &notification_channel {
transactions_to_update.push(TransactionUpdateNotification {
signature: tx_info.signature.clone(),
processed_slot: slot as i64,
slot,
cu_consumed: tx_info.cu_consumed,
cu_requested: tx_info.cu_requested,
cu_price: tx_info.prioritization_fees,
transaction_status: tx_info.status.clone(),
blockhash: block_processor_result.blockhash.clone(),
leader: block_processor_result.leader_id.clone().unwrap_or_default(),
commitment: commitment_config.commitment,
});
}
};
@ -210,12 +218,11 @@ impl BlockListener {
//
// Notify
//
if let Some(postgres) = &postgres {
postgres
.send(NotificationMsg::UpdateTransactionMsg(
transactions_to_update,
))
.unwrap();
if let Some(notification_channel) = &notification_channel {
// ignore the error because the channel may already be closed
let _ = notification_channel.send(NotificationMsg::UpdateTransactionMsg(
transactions_to_update,
));
}
trace!(
@ -226,7 +233,7 @@ impl BlockListener {
start.elapsed().as_millis()
);
if let Some(postgres) = &postgres {
if let Some(notification_channel) = &notification_channel {
// TODO insert if not exists leader_id into accountaddrs
// fetch cluster time from rpc
@ -237,15 +244,23 @@ impl BlockListener {
.block_store
.get_block_info(&block_processor_result.blockhash);
postgres
.send(NotificationMsg::BlockNotificationMsg(BlockNotification {
slot: slot as i64,
leader_id: 0, // TODO: lookup leader
parent_slot: block_processor_result.parent_slot as i64,
// ignore the error because the channel may already be closed
let _ = notification_channel.send(NotificationMsg::BlockNotificationMsg(
BlockNotification {
slot,
block_leader: block_processor_result.leader_id.unwrap_or_default(), // TODO: lookup leader
parent_slot: block_processor_result.parent_slot,
cluster_time: Utc.timestamp_millis_opt(block_time * 1000).unwrap(),
local_time: block_info.and_then(|b| b.processed_local_time),
}))
.expect("Error sending block to postgres service");
blockhash: block_processor_result.blockhash,
commitment: commitment_config.commitment,
block_time: block_processor_result.block_time,
total_transactions: total_txs as u64,
transaction_found: tx_count as u64,
cu_consumed_by_txs,
total_cu_consumed,
},
));
}
Ok(())
@ -256,6 +271,7 @@ impl BlockListener {
commitment_config: CommitmentConfig,
notifier: Option<NotificationSender>,
estimated_slot: Arc<AtomicU64>,
exit_signal: Arc<AtomicBool>,
) -> anyhow::Result<()> {
let (slot_retry_queue_sx, mut slot_retry_queue_rx) = tokio::sync::mpsc::unbounded_channel();
let (block_schedule_queue_sx, block_schedule_queue_rx) = async_channel::unbounded::<Slot>();
@ -263,14 +279,18 @@ impl BlockListener {
// task to fetch blocks
//
let this = self.clone();
let exit_signal_l = exit_signal.clone();
let slot_indexer_tasks = (0..8).map(move |_| {
let this = this.clone();
let notifier = notifier.clone();
let slot_retry_queue_sx = slot_retry_queue_sx.clone();
let block_schedule_queue_rx = block_schedule_queue_rx.clone();
let exit_signal_l = exit_signal_l.clone();
let task: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
loop {
if exit_signal_l.load(Ordering::Relaxed) {
break;
}
match block_schedule_queue_rx.recv().await {
Ok(slot) => {
if commitment_config.is_finalized() {
@ -289,20 +309,20 @@ impl BlockListener {
.checked_add(Duration::from_millis(10))
.unwrap();
slot_retry_queue_sx
.send((slot, retry_at))
.context("Error sending slot to retry queue from slot indexer task")?;
slot_retry_queue_sx.send((slot, retry_at)).context(
"Error sending slot to retry queue from slot indexer task",
)?;
BLOCKS_IN_RETRY_QUEUE.inc();
};
},
}
Err(_) => {
// We got an error because the channel is empty; sleep briefly and retry recv
tokio::time::sleep(Duration::from_millis(1)).await;
}
}
}
//bail!("Block Slot channel closed")
bail!("Block Slot channel closed")
});
task
@ -314,9 +334,13 @@ impl BlockListener {
let slot_retry_task: JoinHandle<anyhow::Result<()>> = {
let block_schedule_queue_sx = block_schedule_queue_sx.clone();
let recent_slot = recent_slot.clone();
let exit_signal_l = exit_signal.clone();
tokio::spawn(async move {
while let Some((slot, instant)) = slot_retry_queue_rx.recv().await {
if exit_signal_l.load(Ordering::Relaxed) {
break;
}
BLOCKS_IN_RETRY_QUEUE.dec();
let recent_slot = recent_slot.load(std::sync::atomic::Ordering::Relaxed);
// if slot is too old ignore
@ -366,6 +390,9 @@ impl BlockListener {
continue;
}
if exit_signal.load(Ordering::Relaxed) {
break;
}
// filter already processed slots
let new_block_slots: Vec<u64> = (last_latest_slot..new_slot).collect();
// context for lock
@ -387,6 +414,7 @@ impl BlockListener {
last_latest_slot = new_slot;
recent_slot.store(last_latest_slot, std::sync::atomic::Ordering::Relaxed);
}
Ok(())
});
tokio::select! {
@ -403,13 +431,17 @@ impl BlockListener {
}
// continuously poll processed blocks and feed them into the blockstore
pub fn listen_processed(self) -> JoinHandle<anyhow::Result<()>> {
pub fn listen_processed(self, exit_signal: Arc<AtomicBool>) -> JoinHandle<anyhow::Result<()>> {
let block_processor = self.block_processor;
tokio::spawn(async move {
info!("processed block listner started");
loop {
if exit_signal.load(Ordering::Relaxed) {
break;
}
if let Err(err) = block_processor
.poll_latest_block(CommitmentConfig::processed())
.await
@ -420,6 +452,7 @@ impl BlockListener {
// sleep
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
}
Ok(())
})
}


@ -1,3 +1,5 @@
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::Duration;
use crate::block_listenser::BlockListener;
@ -47,19 +49,27 @@ impl Cleaner {
BLOCKS_IN_BLOCKSTORE.set(self.block_store.number_of_blocks_in_store() as i64);
}
pub fn start(self, ttl_duration: Duration) -> JoinHandle<anyhow::Result<()>> {
pub fn start(
self,
ttl_duration: Duration,
exit_signal: Arc<AtomicBool>,
) -> JoinHandle<anyhow::Result<()>> {
let mut ttl = tokio::time::interval(ttl_duration);
tokio::spawn(async move {
info!("Cleaning memory");
loop {
if exit_signal.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
ttl.tick().await;
self.clean_tx_sender(ttl_duration);
self.clean_block_listeners(ttl_duration);
self.clean_block_store(ttl_duration).await;
}
Ok(())
})
}
}
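The same shutdown pattern repeats across the listener, cleaner, replayer, TPU and sender loops in this PR: each long-running task checks a shared `Arc<AtomicBool>` and returns once it flips. A minimal standalone sketch of that pattern (the work inside the loop is a placeholder):

```rust
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use tokio::task::JoinHandle;

fn spawn_loop(exit_signal: Arc<AtomicBool>) -> JoinHandle<anyhow::Result<()>> {
    tokio::spawn(async move {
        loop {
            if exit_signal.load(Ordering::Relaxed) {
                break;
            }
            // one unit of work per iteration goes here
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        }
        Ok(())
    })
}

// elsewhere: exit_signal.store(true, Ordering::Relaxed) asks every such loop to stop
```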


@ -4,4 +4,5 @@ pub mod metrics_capture;
pub mod prometheus_sync;
pub mod tpu_utils;
pub mod transaction_replayer;
pub mod transaction_service;
pub mod tx_sender;


@ -1,9 +1,9 @@
use std::sync::Arc;
use crate::tx_sender::TxSender;
use log::info;
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use serde::{Deserialize, Serialize};
use solana_lite_rpc_core::tx_store::TxStore;
use solana_transaction_status::TransactionConfirmationStatus;
use tokio::{sync::RwLock, task::JoinHandle};
@ -26,7 +26,7 @@ lazy_static::lazy_static! {
/// Background worker which captures metrics
#[derive(Clone)]
pub struct MetricsCapture {
tx_sender: TxSender,
txs_store: TxStore,
metrics: Arc<RwLock<Metrics>>,
}
@ -41,9 +41,9 @@ pub struct Metrics {
}
impl MetricsCapture {
pub fn new(tx_sender: TxSender) -> Self {
pub fn new(txs_store: TxStore) -> Self {
Self {
tx_sender,
txs_store,
metrics: Default::default(),
}
}
@ -65,11 +65,11 @@ impl MetricsCapture {
loop {
one_second.tick().await;
let txs_sent = self.tx_sender.txs_sent_store.len();
let txs_sent = self.txs_store.len();
let mut txs_confirmed: usize = 0;
let mut txs_finalized: usize = 0;
for tx in self.tx_sender.txs_sent_store.iter() {
for tx in self.txs_store.iter() {
if let Some(tx) = &tx.value().status {
match tx.confirmation_status() {
TransactionConfirmationStatus::Confirmed => txs_confirmed += 1,


@ -1,11 +1,10 @@
use crate::tx_sender::TxProps;
use dashmap::DashMap;
use log::{error, trace};
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use quinn::{Connection, Endpoint};
use solana_lite_rpc_core::{
quic_connection_utils::QuicConnectionUtils, rotating_queue::RotatingQueue,
structures::identity_stakes::IdentityStakes,
structures::identity_stakes::IdentityStakes, tx_store::TxStore,
};
use solana_sdk::pubkey::Pubkey;
use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams;
@ -39,7 +38,7 @@ struct ActiveConnection {
identity: Pubkey,
tpu_address: SocketAddr,
exit_signal: Arc<AtomicBool>,
txs_sent_store: Arc<DashMap<String, TxProps>>,
txs_sent_store: TxStore,
}
impl ActiveConnection {
@ -47,7 +46,7 @@ impl ActiveConnection {
endpoint: Endpoint,
tpu_address: SocketAddr,
identity: Pubkey,
txs_sent_store: Arc<DashMap<String, TxProps>>,
txs_sent_store: TxStore,
) -> Self {
Self {
endpoint,
@ -62,10 +61,7 @@ impl ActiveConnection {
NB_QUIC_CONNECTIONS.inc();
}
fn check_for_confirmation(
txs_sent_store: &Arc<DashMap<String, TxProps>>,
signature: String,
) -> bool {
fn check_for_confirmation(txs_sent_store: &TxStore, signature: String) -> bool {
match txs_sent_store.get(&signature) {
Some(props) => props.status.is_some(),
None => false,
@ -81,7 +77,7 @@ impl ActiveConnection {
exit_signal: Arc<AtomicBool>,
identity: Pubkey,
identity_stakes: IdentityStakes,
txs_sent_store: Arc<DashMap<String, TxProps>>,
txs_sent_store: TxStore,
) {
NB_QUIC_ACTIVE_CONNECTIONS.inc();
let mut transaction_reciever = transaction_reciever;
@ -255,7 +251,7 @@ impl TpuConnectionManager {
transaction_sender: Arc<Sender<(String, Vec<u8>)>>,
connections_to_keep: HashMap<Pubkey, SocketAddr>,
identity_stakes: IdentityStakes,
txs_sent_store: Arc<DashMap<String, TxProps>>,
txs_sent_store: TxStore,
) {
NB_CONNECTIONS_TO_KEEP.set(connections_to_keep.len() as i64);
for (identity, socket_addr) in &connections_to_keep {


@ -1,14 +1,14 @@
use anyhow::bail;
use dashmap::DashMap;
use log::{error, info};
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_lite_rpc_core::{
leader_schedule::LeaderSchedule, solana_utils::SolanaUtils,
structures::identity_stakes::IdentityStakes, AnyhowJoinHandle,
structures::identity_stakes::IdentityStakes, tx_store::TxStore, AnyhowJoinHandle,
};
use super::tpu_connection_manager::TpuConnectionManager;
use solana_sdk::{
pubkey::Pubkey, quic::QUIC_PORT_OFFSET, signature::Keypair, signer::Signer, slot_history::Slot,
};
@ -17,7 +17,7 @@ use std::{
net::{IpAddr, Ipv4Addr},
str::FromStr,
sync::{
atomic::{AtomicU64, Ordering},
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
};
@ -26,9 +26,6 @@ use tokio::{
time::{Duration, Instant},
};
use super::tpu_connection_manager::TpuConnectionManager;
use crate::tx_sender::TxProps;
const CACHE_NEXT_SLOT_LEADERS_PUBKEY_SIZE: usize = 1024; // Save pubkey and contact info of next 1024 leaders in the queue
const CLUSTERINFO_REFRESH_TIME: u64 = 60 * 60; // stakes every 1hrs
const LEADER_SCHEDULE_UPDATE_INTERVAL: u64 = 10; // update leader schedule every 10s
@ -59,7 +56,7 @@ pub struct TpuService {
tpu_connection_manager: Arc<TpuConnectionManager>,
identity: Arc<Keypair>,
identity_stakes: Arc<RwLock<IdentityStakes>>,
txs_sent_store: Arc<DashMap<String, TxProps>>,
txs_sent_store: TxStore,
leader_schedule: Arc<LeaderSchedule>,
}
@ -70,7 +67,7 @@ impl TpuService {
identity: Arc<Keypair>,
rpc_client: Arc<RpcClient>,
rpc_ws_address: String,
txs_sent_store: Arc<DashMap<String, TxProps>>,
txs_sent_store: TxStore,
) -> anyhow::Result<Self> {
let (sender, _) = tokio::sync::broadcast::channel(MAXIMUM_TRANSACTIONS_IN_QUEUE);
let (certificate, key) = new_self_signed_tls_certificate(
@ -165,10 +162,15 @@ impl TpuService {
.await;
}
fn check_exit_signal(exit_signal: &Arc<AtomicBool>) -> bool {
exit_signal.load(Ordering::Relaxed)
}
async fn update_current_slot(
&self,
update_notifier: tokio::sync::mpsc::UnboundedSender<u64>,
) -> anyhow::Result<()> {
exit_signal: Arc<AtomicBool>,
) {
let current_slot = self.current_slot.clone();
let update_slot = |slot: u64| {
if slot > current_slot.load(Ordering::Relaxed) {
@ -179,17 +181,17 @@ impl TpuService {
};
loop {
if Self::check_exit_signal(&exit_signal) {
break;
}
// always loop update the current slots as it is central to working of TPU
let _ = SolanaUtils::poll_slots(
self.rpc_client.clone(),
&self.rpc_ws_address,
update_slot,
)
.await;
let _ =
SolanaUtils::poll_slots(self.rpc_client.clone(), &self.rpc_ws_address, update_slot)
.await;
}
}
pub async fn start(&self) -> anyhow::Result<()> {
pub async fn start(&self, exit_signal: Arc<AtomicBool>) -> anyhow::Result<()> {
self.leader_schedule
.load_cluster_info(self.rpc_client.clone())
.await?;
@ -198,13 +200,21 @@ impl TpuService {
self.update_quic_connections().await;
let this = self.clone();
let exit_signal_l = exit_signal.clone();
let jh_update_leaders = tokio::spawn(async move {
let mut last_cluster_info_update = Instant::now();
let leader_schedule_update_interval =
Duration::from_secs(LEADER_SCHEDULE_UPDATE_INTERVAL);
let cluster_info_update_interval = Duration::from_secs(CLUSTERINFO_REFRESH_TIME);
loop {
if Self::check_exit_signal(&exit_signal_l) {
break;
}
tokio::time::sleep(leader_schedule_update_interval).await;
if Self::check_exit_signal(&exit_signal_l) {
break;
}
info!("update leader schedule and cluster nodes");
if this.update_leader_schedule().await.is_err() {
error!("unable to update leader shedule");
@ -221,18 +231,23 @@ impl TpuService {
let this = self.clone();
let (slot_sender, slot_reciever) = tokio::sync::mpsc::unbounded_channel::<Slot>();
let exit_signal_l = exit_signal.clone();
let slot_sub_task: AnyhowJoinHandle = tokio::spawn(async move {
this.update_current_slot(slot_sender).await?;
this.update_current_slot(slot_sender, exit_signal_l).await;
Ok(())
});
let estimated_slot = self.estimated_slot.clone();
let current_slot = self.current_slot.clone();
let this = self.clone();
let exit_signal_l = exit_signal.clone();
let estimated_slot_calculation = tokio::spawn(async move {
let mut slot_update_notifier = slot_reciever;
loop {
if Self::check_exit_signal(&exit_signal_l) {
break;
}
if SolanaUtils::slot_estimator(
&mut slot_update_notifier,
current_slot.clone(),


@ -1,7 +1,11 @@
use crate::tx_sender::TxSender;
use crate::tpu_utils::tpu_service::TpuService;
use log::error;
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use std::time::Duration;
use solana_lite_rpc_core::tx_store::TxStore;
use std::{
sync::{atomic::AtomicBool, Arc},
time::Duration,
};
use tokio::{
sync::mpsc::{UnboundedReceiver, UnboundedSender},
task::JoinHandle,
@ -24,14 +28,16 @@ pub struct TransactionReplay {
#[derive(Clone)]
pub struct TransactionReplayer {
pub tx_sender: TxSender,
pub tpu_service: TpuService,
pub tx_store: TxStore,
pub retry_after: Duration,
}
impl TransactionReplayer {
pub fn new(tx_sender: TxSender, retry_after: Duration) -> Self {
pub fn new(tpu_service: TpuService, tx_store: TxStore, retry_after: Duration) -> Self {
Self {
tx_sender,
tpu_service,
tx_store,
retry_after,
}
}
@ -40,12 +46,17 @@ impl TransactionReplayer {
&self,
sender: UnboundedSender<TransactionReplay>,
reciever: UnboundedReceiver<TransactionReplay>,
exit_signal: Arc<AtomicBool>,
) -> JoinHandle<anyhow::Result<()>> {
let tx_sender = self.tx_sender.clone();
let tpu_service = self.tpu_service.clone();
let tx_store = self.tx_store.clone();
let retry_after = self.retry_after;
tokio::spawn(async move {
let mut reciever = reciever;
loop {
if exit_signal.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
let tx = reciever.recv().await;
match tx {
Some(mut tx_replay) => {
@ -53,7 +64,7 @@ impl TransactionReplayer {
if Instant::now() < tx_replay.replay_at {
tokio::time::sleep_until(tx_replay.replay_at).await;
}
if let Some(tx) = tx_sender.txs_sent_store.get(&tx_replay.signature) {
if let Some(tx) = tx_store.get(&tx_replay.signature) {
if tx.status.is_some() {
// transaction has been confirmed / no retry needed
continue;
@ -63,8 +74,7 @@ impl TransactionReplayer {
continue;
}
// ignore reset error
let _ = tx_sender
.tpu_service
let _ = tpu_service
.send_transaction(tx_replay.signature.clone(), tx_replay.tx.clone());
if tx_replay.replay_count < tx_replay.max_replay {


@ -0,0 +1,215 @@
// This service manages the lifecycle of a transaction:
// it sends, replays if necessary, and confirms by listening to blocks
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use crate::{
block_listenser::BlockListener,
cleaner::Cleaner,
tpu_utils::tpu_service::TpuService,
transaction_replayer::{TransactionReplay, TransactionReplayer, MESSAGES_IN_REPLAY_QUEUE},
tx_sender::{TxSender, WireTransaction},
};
use anyhow::bail;
use solana_lite_rpc_core::{
block_store::{BlockInformation, BlockStore},
notifications::NotificationSender,
};
use solana_rpc_client::rpc_client::SerializableTransaction;
use solana_sdk::{commitment_config::CommitmentConfig, transaction::VersionedTransaction};
use tokio::{
sync::mpsc::{self, Sender, UnboundedSender},
task::JoinHandle,
time::Instant,
};
#[derive(Clone)]
pub struct TransactionServiceBuilder {
tx_sender: TxSender,
tx_replayer: TransactionReplayer,
block_listner: BlockListener,
tpu_service: TpuService,
max_nb_txs_in_queue: usize,
}
impl TransactionServiceBuilder {
pub fn new(
tx_sender: TxSender,
tx_replayer: TransactionReplayer,
block_listner: BlockListener,
tpu_service: TpuService,
max_nb_txs_in_queue: usize,
) -> Self {
Self {
tx_sender,
tx_replayer,
tpu_service,
block_listner,
max_nb_txs_in_queue,
}
}
pub async fn start(
&self,
notifier: Option<NotificationSender>,
block_store: BlockStore,
max_retries: usize,
clean_interval: Duration,
) -> (TransactionService, JoinHandle<String>) {
let (transaction_channel, tx_recv) = mpsc::channel(self.max_nb_txs_in_queue);
let (replay_channel, replay_reciever) = tokio::sync::mpsc::unbounded_channel();
let tx_sender = self.tx_sender.clone();
let block_listner = self.block_listner.clone();
let tx_replayer = self.tx_replayer.clone();
let tpu_service = self.tpu_service.clone();
let replay_channel_task = replay_channel.clone();
let exit_signal = Arc::new(AtomicBool::new(false));
let exit_signal_t = exit_signal.clone();
let block_store_t = block_store.clone();
let jh_services: JoinHandle<String> = tokio::spawn(async move {
let tpu_service_fx = tpu_service.start(exit_signal_t.clone());
let tx_sender_jh =
tx_sender
.clone()
.execute(tx_recv, notifier.clone(), exit_signal_t.clone());
let replay_service = tx_replayer.start_service(
replay_channel_task,
replay_reciever,
exit_signal_t.clone(),
);
let finalized_block_listener = block_listner.clone().listen(
CommitmentConfig::finalized(),
notifier.clone(),
tpu_service.get_estimated_slot_holder(),
exit_signal_t.clone(),
);
let confirmed_block_listener = block_listner.clone().listen(
CommitmentConfig::confirmed(),
None,
tpu_service.get_estimated_slot_holder(),
exit_signal_t.clone(),
);
let processed_block_listener = block_listner
.clone()
.listen_processed(exit_signal_t.clone());
let cleaner = Cleaner::new(tx_sender.clone(), block_listner.clone(), block_store_t)
.start(clean_interval, exit_signal_t);
tokio::select! {
res = tpu_service_fx => {
format!("{res:?}")
},
res = tx_sender_jh => {
format!("{res:?}")
},
res = finalized_block_listener => {
format!("{res:?}")
},
res = confirmed_block_listener => {
format!("{res:?}")
},
res = processed_block_listener => {
format!("{res:?}")
},
res = replay_service => {
format!("{res:?}")
},
res = cleaner => {
format!("{res:?}")
},
}
});
(
TransactionService {
transaction_channel,
replay_channel,
exit_signal,
block_store,
max_retries,
replay_after: self.tx_replayer.retry_after,
},
jh_services,
)
}
}
#[derive(Clone)]
pub struct TransactionService {
pub transaction_channel: Sender<(String, WireTransaction, u64)>,
pub replay_channel: UnboundedSender<TransactionReplay>,
pub exit_signal: Arc<AtomicBool>,
pub block_store: BlockStore,
pub max_retries: usize,
pub replay_after: Duration,
}
impl TransactionService {
pub async fn send_transaction(
&self,
raw_tx: Vec<u8>,
max_retries: Option<u16>,
) -> anyhow::Result<String> {
let tx = match bincode::deserialize::<VersionedTransaction>(&raw_tx) {
Ok(tx) => tx,
Err(err) => {
bail!(err.to_string());
}
};
let signature = tx.signatures[0];
let Some(BlockInformation { slot, .. }) = self
.block_store
.get_block_info(&tx.get_recent_blockhash().to_string())
else {
bail!("Blockhash not found in block store".to_string());
};
let raw_tx_clone = raw_tx.clone();
let max_replay = max_retries.map_or(self.max_retries, |x| x as usize);
if let Err(e) = self
.transaction_channel
.send((signature.to_string(), raw_tx, slot))
.await
{
bail!(
"Internal error sending transaction on send channel error {}",
e
);
}
let replay_at = Instant::now() + self.replay_after;
// ignore error for replay service
if self
.replay_channel
.send(TransactionReplay {
signature: signature.to_string(),
tx: raw_tx_clone,
replay_count: 0,
max_replay,
replay_at,
})
.is_ok()
{
MESSAGES_IN_REPLAY_QUEUE.inc();
}
Ok(signature.to_string())
}
pub fn stop(&self) {
self.exit_signal.store(true, Ordering::Relaxed)
}
}
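A hedged end-to-end sketch of the new builder, mirroring how LiteBridge wires it up in bridge.rs; the queue size, retry count and clean interval are illustrative assumptions, and construction of the individual services is omitted:

```rust
use std::time::Duration;

use solana_lite_rpc_core::block_store::BlockStore;
use solana_lite_rpc_services::{
    block_listenser::BlockListener,
    tpu_utils::tpu_service::TpuService,
    transaction_replayer::TransactionReplayer,
    transaction_service::TransactionServiceBuilder,
    tx_sender::TxSender,
};

async fn run_transaction_service(
    tx_sender: TxSender,
    tx_replayer: TransactionReplayer,
    block_listner: BlockListener,
    tpu_service: TpuService,
    block_store: BlockStore,
    raw_tx: Vec<u8>,
) -> anyhow::Result<()> {
    let builder = TransactionServiceBuilder::new(
        tx_sender,
        tx_replayer,
        block_listner,
        tpu_service,
        10_000, // assumed queue size between the RPC front end and the sender
    );

    // no notification channel, up to 3 retries, clean every 60s
    let (service, service_handle) = builder
        .start(None, block_store, 3, Duration::from_secs(60))
        .await;

    let signature = service.send_transaction(raw_tx, None).await?;
    log::info!("forwarded transaction {signature}");

    // flips the shared exit signal so every background loop winds down
    service.stop();
    let _ = service_handle.await;
    Ok(())
}
```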


@ -1,23 +1,22 @@
use std::{
sync::Arc,
sync::{atomic::AtomicBool, Arc},
time::{Duration, Instant},
};
use anyhow::bail;
use chrono::Utc;
use dashmap::DashMap;
use log::{info, trace, warn};
use prometheus::{
core::GenericGauge, histogram_opts, opts, register_histogram, register_int_counter,
register_int_gauge, Histogram, IntCounter,
};
use solana_transaction_status::TransactionStatus;
use tokio::{sync::mpsc::Receiver, task::JoinHandle};
use crate::tpu_utils::tpu_service::TpuService;
use solana_lite_rpc_core::notifications::{
NotificationMsg, NotificationSender, TransactionNotification,
use solana_lite_rpc_core::{
notifications::{NotificationMsg, NotificationSender, TransactionNotification},
tx_store::{TxProps, TxStore},
};
lazy_static::lazy_static! {
@ -46,32 +45,13 @@ const MAX_BATCH_SIZE_IN_PER_INTERVAL: usize = 2000;
#[derive(Clone)]
pub struct TxSender {
/// Tx(s) forwarded to tpu
pub txs_sent_store: Arc<DashMap<String, TxProps>>,
txs_sent_store: TxStore,
/// TpuClient to call the tpu port
pub tpu_service: Arc<TpuService>,
}
/// Transaction Properties
pub struct TxProps {
pub status: Option<TransactionStatus>,
/// Time at which transaction was forwarded
pub sent_at: Instant,
}
impl Default for TxProps {
fn default() -> Self {
Self {
status: Default::default(),
sent_at: Instant::now(),
}
}
tpu_service: TpuService,
}
impl TxSender {
pub fn new(
txs_sent_store: Arc<DashMap<String, TxProps>>,
tpu_service: Arc<TpuService>,
) -> Self {
pub fn new(txs_sent_store: TxStore, tpu_service: TpuService) -> Self {
Self {
tpu_service,
txs_sent_store,
@ -83,7 +63,7 @@ impl TxSender {
&self,
sigs_and_slots: Vec<(String, u64)>,
txs: Vec<WireTransaction>,
postgres: Option<NotificationSender>,
notifier: Option<NotificationSender>,
) {
assert_eq!(sigs_and_slots.len(), txs.len());
@ -121,14 +101,14 @@ impl TxSender {
};
quic_responses.push(quic_response);
}
if let Some(postgres) = &postgres {
let postgres_msgs = sigs_and_slots
if let Some(notifier) = &notifier {
let notification_msgs = sigs_and_slots
.iter()
.enumerate()
.map(|(index, (sig, recent_slot))| TransactionNotification {
signature: sig.clone(),
recent_slot: *recent_slot as i64,
forwarded_slot: forwarded_slot as i64,
recent_slot: *recent_slot,
forwarded_slot,
forwarded_local_time,
processed_slot: None,
cu_consumed: None,
@ -136,9 +116,8 @@ impl TxSender {
quic_response: quic_responses[index],
})
.collect();
postgres
.send(NotificationMsg::TxNotificationMsg(postgres_msgs))
.expect("Error writing to postgres service");
// ignore send errors because the channel may already be closed
let _ = notifier.send(NotificationMsg::TxNotificationMsg(notification_msgs));
}
histo_timer.observe_duration();
trace!(
@ -152,7 +131,8 @@ impl TxSender {
pub fn execute(
self,
mut recv: Receiver<(String, WireTransaction, u64)>,
postgres_send: Option<NotificationSender>,
notifier: Option<NotificationSender>,
exit_signal: Arc<AtomicBool>,
) -> JoinHandle<anyhow::Result<()>> {
tokio::spawn(async move {
let tx_sender = self.clone();
@ -160,6 +140,9 @@ impl TxSender {
let mut sigs_and_slots = Vec::with_capacity(MAX_BATCH_SIZE_IN_PER_INTERVAL);
let mut txs = Vec::with_capacity(MAX_BATCH_SIZE_IN_PER_INTERVAL);
let mut timeout_interval = INTERVAL_PER_BATCH_IN_MS;
if exit_signal.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
// In Solana, the sig-verify stage is rate limited to 2000 txs per 50ms
// taking this as reference
@ -200,9 +183,10 @@ impl TxSender {
TX_BATCH_SIZES.set(txs.len() as i64);
tx_sender
.forward_txs(sigs_and_slots, txs, postgres_send.clone())
.forward_txs(sigs_and_slots, txs, notifier.clone())
.await;
}
Ok(())
})
}