Compare commits

...

15 Commits

Author SHA1 Message Date
Lou-Kamades 7530c4d77c
Merge branch 'main' into lou/tc-4 2024-04-06 16:32:40 +02:00
Christian Kamm 2d614365e7
experiment: reduce number of source endpoints to 1 (#386)
Prediction is that this has no negative effect but reduces memory use a
lot.
2024-04-03 21:06:29 +02:00
Groovie | Mango b84e880961
remove async_channel and bring back tokio mpsc (#384) 2024-04-03 11:22:36 +02:00
godmodegalactus 90dbcaa9d8
Adding more prometheus metrics to track QUIC error details 2024-04-03 10:51:50 +02:00
Groovie | Mango 69d7dbb123
optimize mapping of produced block (#382)
2024-04-03 10:40:53 +02:00
godmodegalactus 06373c0844
Closing QUIC connection correctly, changing some QUIC params 2024-04-03 10:25:30 +02:00
godmodegalactus 6b8fe682ae
Revert "Close connection while dropping"
This reverts commit 5ef1391b9a.
2024-04-03 10:02:45 +02:00
godmodegalactus 5ef1391b9a
Close connection while dropping 2024-04-02 23:06:52 +02:00
godmodegalactus d0a2fe200b
Using default solana timeout for quic connection 2024-04-02 21:45:35 +02:00
Groovie | Mango 944a93114c
Bugfix/379 identity keypair loading must be optional (#381)
fix identity keypair loading logic

reverting to this logic
1. check if env variable IDENTITY is present
2. check if cli arg --identity-keypair is present
3. assume no identity
2024-04-02 15:52:06 +02:00
godmodegalactus 12a6832c56
Swapping notify channel with broadcast channel 2024-04-02 15:07:01 +02:00
godmodegalactus ca3fa46139
Using broadcast channels instead of notify 2024-04-02 14:48:54 +02:00
godmodegalactus 91cf06436a
Changing exit signal from notify to broadcast channel in grpc multiplexer 2024-04-02 14:45:41 +02:00
galactus 681334197f
Adding custom tpu send transaction example (#380)
* Adding an example for custom tpu send transaction

* Fixing the custom tpu example

* Optimizing SentTransactionInfo, and calculating TPS

* Reverting unwanted changes

* After groovies review
2024-04-02 14:26:54 +02:00
Lou-Kamades 4d7700145e
fix parsing of use_grpc env var (#378) 2024-04-02 10:21:46 +02:00
26 changed files with 870 additions and 158 deletions
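The dominant change across this comparison is swapping Arc<Notify> exit signaling for a tokio::sync::broadcast channel (the three "notify to broadcast" commits above): every task gets its own receiver via resubscribe()/subscribe(), and the sender learns from send() whether any receiver is still alive. Below is a minimal, self-contained sketch of that pattern, assuming the tokio and futures crates; it shows the general shape, not the lite-rpc code itself.

use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    // Capacity 1 is enough: only a single () exit signal is ever sent.
    let (exit_sender, exit_notify) = broadcast::channel::<()>(1);

    let mut task_list = vec![];
    for id in 0..3 {
        // Each task needs its own receiver; resubscribe() attaches a fresh
        // receiver to the same channel, as the diffs below do.
        let mut exit = exit_notify.resubscribe();
        task_list.push(tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
                        // simulated work tick
                    }
                    _ = exit.recv() => {
                        println!("task {id} exiting");
                        break;
                    }
                }
            }
        }));
    }

    tokio::time::sleep(std::time::Duration::from_millis(250)).await;
    // Unlike Notify::notify_waiters(), send() reports whether any receiver
    // is still alive, letting the caller choose between a graceful join and
    // an abort, which is the branch the new reconnect loops take.
    if exit_sender.send(()).is_ok() {
        futures::future::join_all(task_list).await;
    } else {
        task_list.iter().for_each(|t| t.abort());
    }
}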

Cargo.lock generated

@ -1220,8 +1220,22 @@ dependencies = [
name = "custom-tpu-send-transactions"
version = "0.2.4"
dependencies = [
"anyhow",
"bincode",
"clap 4.5.4",
"dashmap 5.5.3",
"futures",
"itertools 0.10.5",
"log",
"rand 0.8.5",
"rand_chacha 0.3.1",
"solana-lite-rpc-cluster-endpoints",
"solana-lite-rpc-core",
"solana-lite-rpc-services",
"solana-rpc-client",
"solana-sdk",
"tokio",
"tracing-subscriber",
]
[[package]]
@ -1779,7 +1793,7 @@ dependencies = [
[[package]]
name = "geyser-grpc-connector"
version = "0.10.1+yellowstone.1.12"
source = "git+https://github.com/blockworks-foundation/geyser-grpc-connector.git?tag=v0.10.3+yellowstone.1.12+solana.1.17.15-hacked-windowsize4#ce6ca26028c4466e0236657a76b9db2cccf4d535"
source = "git+https://github.com/blockworks-foundation/geyser-grpc-connector.git?tag=v0.10.3+yellowstone.1.12+solana.1.17.15-hacked-windowsize-with-broadcast-exit#688e4d241dd18d18f57345d592e803aa673fcd96"
dependencies = [
"anyhow",
"async-stream",


@ -9,7 +9,7 @@ license = "AGPL"
[dependencies]
#geyser-grpc-connector = { path = "../../geyser-grpc-connector" }
geyser-grpc-connector = { tag = "v0.10.3+yellowstone.1.12+solana.1.17.15-hacked-windowsize4", git = "https://github.com/blockworks-foundation/geyser-grpc-connector.git" }
geyser-grpc-connector = { tag = "v0.10.3+yellowstone.1.12+solana.1.17.15-hacked-windowsize-with-broadcast-exit", git = "https://github.com/blockworks-foundation/geyser-grpc-connector.git" }
solana-sdk = { workspace = true }
solana-rpc-client-api = { workspace = true }


@ -12,10 +12,8 @@ use solana_sdk::commitment_config::CommitmentConfig;
use solana_lite_rpc_core::solana_utils::hash_from_str;
use solana_lite_rpc_core::structures::block_info::BlockInfo;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast::Receiver;
use tokio::sync::Notify;
use tokio::sync::broadcast::{self, Receiver};
use tokio::task::JoinHandle;
use tokio::time::{sleep, Instant};
use tracing::debug_span;
@ -31,7 +29,7 @@ use crate::grpc_subscription::from_grpc_block_update;
fn create_grpc_multiplex_processed_block_task(
grpc_sources: &Vec<GrpcSourceConfig>,
block_sender: tokio::sync::mpsc::Sender<ProducedBlock>,
exit_notify: Arc<Notify>,
mut exit_notify: broadcast::Receiver<()>,
) -> Vec<JoinHandle<()>> {
const COMMITMENT_CONFIG: CommitmentConfig = CommitmentConfig::processed();
@ -43,7 +41,7 @@ fn create_grpc_multiplex_processed_block_task(
grpc_source.clone(),
GeyserFilter(COMMITMENT_CONFIG).blocks_and_txs(),
autoconnect_tx.clone(),
exit_notify.clone(),
exit_notify.resubscribe(),
);
tasks.push(task);
}
@ -51,7 +49,7 @@ fn create_grpc_multiplex_processed_block_task(
let jh_merging_streams = tokio::task::spawn(async move {
let mut slots_processed = BTreeSet::<u64>::new();
let mut last_tick = Instant::now();
loop {
'recv_loop: loop {
// recv loop
if last_tick.elapsed() > Duration::from_millis(800) {
warn!(
@ -66,22 +64,29 @@ fn create_grpc_multiplex_processed_block_task(
res = blocks_rx.recv() => {
res
},
_ = exit_notify.notified() => {
_ = exit_notify.recv() => {
break;
}
};
match blocks_rx_result {
Some(Message::GeyserSubscribeUpdate(subscribe_update)) => {
let mapfilter =
map_block_from_yellowstone_update(*subscribe_update, COMMITMENT_CONFIG);
if let Some((slot, produced_block)) = mapfilter {
assert_eq!(COMMITMENT_CONFIG, produced_block.commitment_config);
// note: avoid mapping of full block as long as possible
let extracted_slot = extract_slot_from_yellowstone_update(&subscribe_update);
if let Some(slot) = extracted_slot {
// check if the slot is in the map, if not check if the container is half full and the slot in question is older than the lowest value
// it means that the slot is too old to process
if !slots_processed.contains(&slot)
&& (slots_processed.len() < MAX_SIZE / 2
|| slot > slots_processed.first().cloned().unwrap_or_default())
if slots_processed.contains(&slot) {
continue 'recv_loop;
}
if slots_processed.len() >= MAX_SIZE / 2
&& slot <= slots_processed.first().cloned().unwrap_or_default()
{
continue 'recv_loop;
}
let mapfilter =
map_block_from_yellowstone_update(*subscribe_update, COMMITMENT_CONFIG);
if let Some((_slot, produced_block)) = mapfilter {
let send_started_at = Instant::now();
let send_result = block_sender
.send(produced_block)
@ -130,7 +135,7 @@ fn create_grpc_multiplex_block_info_task(
grpc_sources: &Vec<GrpcSourceConfig>,
block_info_sender: tokio::sync::mpsc::Sender<BlockInfo>,
commitment_config: CommitmentConfig,
exit_notify: Arc<Notify>,
mut exit_notify: broadcast::Receiver<()>,
) -> Vec<JoinHandle<()>> {
let (autoconnect_tx, mut blocks_rx) = tokio::sync::mpsc::channel(10);
let mut tasks = vec![];
@ -139,7 +144,7 @@ fn create_grpc_multiplex_block_info_task(
grpc_source.clone(),
GeyserFilter(commitment_config).blocks_meta(),
autoconnect_tx.clone(),
exit_notify.clone(),
exit_notify.resubscribe(),
);
tasks.push(task);
}
@ -151,7 +156,7 @@ fn create_grpc_multiplex_block_info_task(
res = blocks_rx.recv() => {
res
},
_ = exit_notify.notified() => {
_ = exit_notify.recv() => {
break;
}
};
@ -263,7 +268,7 @@ pub fn create_grpc_multiplex_blocks_subscription(
tokio::sync::mpsc::channel::<BlockInfo>(500);
let (block_info_sender_finalized, mut block_info_reciever_finalized) =
tokio::sync::mpsc::channel::<BlockInfo>(500);
let exit_notify = Arc::new(Notify::new());
let (exit_sender, exit_notify) = broadcast::channel(1);
let processed_block_sender = processed_block_sender.clone();
reconnect_attempts += 1;
@ -280,7 +285,7 @@ pub fn create_grpc_multiplex_blocks_subscription(
let processed_blocks_tasks = create_grpc_multiplex_processed_block_task(
&grpc_sources,
processed_block_sender.clone(),
exit_notify.clone(),
exit_notify.resubscribe(),
);
task_list.extend(processed_blocks_tasks);
@ -290,21 +295,21 @@ pub fn create_grpc_multiplex_blocks_subscription(
&grpc_sources,
block_info_sender_processed.clone(),
CommitmentConfig::processed(),
exit_notify.clone(),
exit_notify.resubscribe(),
);
task_list.extend(jh_meta_task_processed);
let jh_meta_task_confirmed = create_grpc_multiplex_block_info_task(
&grpc_sources,
block_info_sender_confirmed.clone(),
CommitmentConfig::confirmed(),
exit_notify.clone(),
exit_notify.resubscribe(),
);
task_list.extend(jh_meta_task_confirmed);
let jh_meta_task_finalized = create_grpc_multiplex_block_info_task(
&grpc_sources,
block_info_sender_finalized.clone(),
CommitmentConfig::finalized(),
exit_notify.clone(),
exit_notify,
);
task_list.extend(jh_meta_task_finalized);
@ -442,8 +447,12 @@ pub fn create_grpc_multiplex_blocks_subscription(
}
}
} // -- END receiver loop
exit_notify.notify_waiters();
futures::future::join_all(task_list).await;
if exit_sender.send(()).is_ok() {
futures::future::join_all(task_list).await;
} else {
log::error!("Problem sending exit signal");
task_list.iter().for_each(|x| x.abort());
}
} // -- END reconnect loop
});
@ -474,9 +483,9 @@ pub fn create_grpc_multiplex_processed_slots_subscription(
let jh_multiplex_task = tokio::spawn(async move {
loop {
let (autoconnect_tx, mut slots_rx) = tokio::sync::mpsc::channel(10);
let exit_notify = Arc::new(Notify::new());
let (exit_sender, exit_notify) = broadcast::channel(1);
let tasks = grpc_sources
let task_list = grpc_sources
.clone()
.iter()
.map(|grpc_source| {
@ -484,7 +493,7 @@ pub fn create_grpc_multiplex_processed_slots_subscription(
grpc_source.clone(),
GeyserFilter(COMMITMENT_CONFIG).slots(),
autoconnect_tx.clone(),
exit_notify.clone(),
exit_notify.resubscribe(),
)
})
.collect_vec();
@ -537,14 +546,29 @@ pub fn create_grpc_multiplex_processed_slots_subscription(
}
}
} // -- END receiver loop
exit_notify.notify_waiters();
futures::future::join_all(tasks).await;
if exit_sender.send(()).is_ok() {
futures::future::join_all(task_list).await;
} else {
log::error!("Problem sending exit signal");
task_list.iter().for_each(|x| x.abort());
}
} // -- END reconnect loop
});
(multiplexed_messages_rx, jh_multiplex_task)
}
fn extract_slot_from_yellowstone_update(update: &SubscribeUpdate) -> Option<Slot> {
match &update.update_oneof {
// list is not exhaustive
Some(UpdateOneof::Slot(update_message)) => Some(update_message.slot),
Some(UpdateOneof::BlockMeta(update_message)) => Some(update_message.slot),
Some(UpdateOneof::Block(update_message)) => Some(update_message.slot),
_ => None,
}
}
fn map_slot_from_yellowstone_update(update: SubscribeUpdate) -> Option<Slot> {
match update.update_oneof {
Some(UpdateOneof::Slot(update_slot_message)) => Some(update_slot_message.slot),


@ -36,7 +36,7 @@ use solana_transaction_status::{Reward, RewardType};
use std::cell::OnceCell;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Notify;
use tokio::sync::{broadcast, Notify};
use tracing::trace_span;
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
@ -273,12 +273,12 @@ fn map_compute_budget_instructions(message: &VersionedMessage) -> (Option<u32>,
pub fn create_block_processing_task(
grpc_addr: String,
grpc_x_token: Option<String>,
block_sx: async_channel::Sender<SubscribeUpdateBlock>,
block_sx: tokio::sync::mpsc::Sender<SubscribeUpdateBlock>,
commitment_level: CommitmentLevel,
exit_notfier: Arc<Notify>,
mut exit_notify: broadcast::Receiver<()>,
) -> AnyhowJoinHandle {
tokio::spawn(async move {
loop {
'main_loop: loop {
let mut blocks_subs = HashMap::new();
blocks_subs.insert(
"block_client".to_string(),
@ -293,7 +293,8 @@ pub fn create_block_processing_task(
// connect to grpc
let mut client =
connect_with_timeout_hacked(grpc_addr.clone(), grpc_x_token.clone()).await?;
let mut stream = client
let mut stream = tokio::select! {
res = client
.subscribe_once(
HashMap::new(),
Default::default(),
@ -304,8 +305,13 @@ pub fn create_block_processing_task(
Some(commitment_level),
Default::default(),
None,
)
.await?;
) => {
res?
},
_ = exit_notify.recv() => {
break;
}
};
loop {
tokio::select! {
@ -338,8 +344,8 @@ pub fn create_block_processing_task(
}
};
},
_ = exit_notfier.notified() => {
break;
_ = exit_notify.recv() => {
break 'main_loop;
}
}
}
@ -348,6 +354,7 @@ pub fn create_block_processing_task(
log::error!("Grpc block subscription broken (resubscribing)");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
Ok(())
})
}
@ -355,7 +362,7 @@ pub fn create_block_processing_task(
pub fn create_slot_stream_task(
grpc_addr: String,
grpc_x_token: Option<String>,
slot_sx: async_channel::Sender<SubscribeUpdateSlot>,
slot_sx: tokio::sync::mpsc::Sender<SubscribeUpdateSlot>,
commitment_level: CommitmentLevel,
) -> AnyhowJoinHandle {
tokio::spawn(async move {


@ -14,7 +14,6 @@ pub fn poll_cluster_info(
loop {
match rpc_client.get_cluster_nodes().await {
Ok(cluster_nodes) => {
debug!("get cluster_nodes from rpc: {:?}", cluster_nodes.len());
if let Err(e) = contact_info_sender.send(cluster_nodes) {
warn!("rpc_cluster_info channel has no receivers {e:?}");
}
@ -23,7 +22,7 @@ pub fn poll_cluster_info(
Err(error) => {
warn!("rpc_cluster_info failed <{:?}> - retrying", error);
// throttle
tokio::time::sleep(Duration::from_secs(2500)).await;
tokio::time::sleep(Duration::from_secs(10)).await;
}
}
}
@ -51,7 +50,7 @@ pub fn poll_vote_accounts(
Err(error) => {
warn!("rpc_vote_accounts failed <{:?}> - retrying", error);
// throttle
tokio::time::sleep(Duration::from_secs(2500)).await;
tokio::time::sleep(Duration::from_secs(10)).await;
}
}
}


@ -10,7 +10,7 @@
"transaction_retry_after_secs": 3,
"quic_proxy_addr": null,
"use_grpc": false,
"calculate_leader_schedule_form_geyser": false,
"calculate_leader_schedule_from_geyser": false,
"grpc_addr": "http://127.0.0.0:10000",
"grpc_x_token": null,
"postgres": {


@ -2,22 +2,21 @@ use anyhow::Context;
use solana_sdk::signature::Keypair;
use std::env;
// note this is duplicated from lite-rpc module
pub async fn load_identity_keypair(
identity_path: Option<String>,
identity_keyfile_path: Option<String>,
) -> anyhow::Result<Option<Keypair>> {
let identity_str = if let Some(identity_from_cli) = identity_path {
tokio::fs::read_to_string(identity_from_cli)
let identity_jsonarray_str = if let Ok(identity_env_var) = env::var("IDENTITY") {
identity_env_var
} else if let Some(identity_path) = identity_keyfile_path {
tokio::fs::read_to_string(identity_path)
.await
.context("Cannot find the identity file provided")?
} else if let Ok(identity_env_var) = env::var("IDENTITY") {
identity_env_var
} else {
return Ok(None);
};
let identity_bytes: Vec<u8> =
serde_json::from_str(&identity_str).context("Invalid identity format expected Vec<u8>")?;
let identity_bytes: Vec<u8> = serde_json::from_str(&identity_jsonarray_str)
.context("Invalid identity format expected Vec<u8>")?;
Ok(Some(
Keypair::from_bytes(identity_bytes.as_slice()).context("Invalid identity")?,
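A minimal sketch of just the restored selection order, with keypair parsing and file IO left out; the function and variable names here are hypothetical, not part of the diff:

use std::env;

// Sketch of the precedence only: env var first, then the keyfile path
// passed on the CLI, otherwise no identity at all.
fn identity_source(identity_keyfile_path: Option<String>) -> Option<String> {
    if let Ok(identity_env_var) = env::var("IDENTITY") {
        Some(identity_env_var) // 1. IDENTITY env variable wins
    } else if let Some(path) = identity_keyfile_path {
        Some(path) // 2. --identity-keypair path; file contents read later
    } else {
        None // 3. assume no identity
    }
}

fn main() {
    println!("{:?}", identity_source(Some("validator-keypair.json".to_string())));
}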


@ -8,5 +8,6 @@ pub mod stores;
pub mod structures;
pub mod traits;
pub mod types;
pub mod utils;
pub type AnyhowJoinHandle = tokio::task::JoinHandle<anyhow::Result<()>>;


@ -127,7 +127,7 @@ impl PrioritizationFeesHeap {
#[cfg(test)]
mod tests {
use solana_sdk::signature::Signature;
use std::time::Duration;
use std::{sync::Arc, time::Duration};
use crate::structures::{
prioritization_fee_heap::PrioritizationFeesHeap, transaction_sent_info::SentTransactionInfo,
@ -139,7 +139,7 @@ mod tests {
let tx_creator = |signature, prioritization_fee| SentTransactionInfo {
signature,
slot: 0,
transaction: vec![],
transaction: Arc::new(vec![]),
last_valid_block_height: 0,
prioritization_fee,
};
@ -205,7 +205,7 @@ mod tests {
let info = SentTransactionInfo {
signature: Signature::new_unique(),
slot: height + 1,
transaction: vec![],
transaction: Arc::new(vec![]),
last_valid_block_height: height + 10,
prioritization_fee,
};


@ -1,3 +1,5 @@
use std::sync::Arc;
use solana_sdk::signature::Signature;
use solana_sdk::slot_history::Slot;
@ -7,7 +9,7 @@ pub type WireTransaction = Vec<u8>;
pub struct SentTransactionInfo {
pub signature: Signature,
pub slot: Slot,
pub transaction: WireTransaction,
pub transaction: Arc<WireTransaction>,
pub last_valid_block_height: u64,
pub prioritization_fee: u64,
}
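Why the Arc: the same serialized transaction is fanned out to several sender tasks, and cloning an Arc<Vec<u8>> copies a pointer rather than the payload. A small illustrative sketch (the thread count is arbitrary; 1232 bytes is Solana's maximum packet size):

use std::sync::Arc;

fn main() {
    let wire_tx: Arc<Vec<u8>> = Arc::new(vec![0u8; 1232]);
    let handles: Vec<_> = (0..4)
        .map(|i| {
            // Arc::clone copies a pointer and bumps a refcount;
            // the payload itself is shared across all sender tasks.
            let tx = Arc::clone(&wire_tx);
            std::thread::spawn(move || println!("sender {i}: {} bytes", tx.len()))
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
}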

core/src/utils.rs Normal file

@ -0,0 +1,33 @@
use std::time::Duration;
use log::debug;
use solana_sdk::commitment_config::CommitmentConfig;
use tokio::time::{timeout, Instant};
use crate::{structures::block_info::BlockInfo, types::BlockInfoStream};
pub async fn wait_till_block_of_commitment_is_recieved(
mut blockinfo_stream: BlockInfoStream,
commitment_config: CommitmentConfig,
) -> BlockInfo {
let started = Instant::now();
loop {
match timeout(Duration::from_millis(1000), blockinfo_stream.recv()).await {
Ok(Ok(block_info)) => {
if block_info.commitment_config == commitment_config {
return block_info;
}
}
Err(_elapsed) => {
debug!(
"waiting for latest block info ({}) ... {:.02}ms",
commitment_config.commitment,
started.elapsed().as_secs_f32() * 1000.0
);
}
Ok(Err(error)) => {
panic!("Did not recv block info : {error:?}");
}
}
}
}


@ -10,4 +10,20 @@ edition.workspace = true
[dependencies]
solana-lite-rpc-services = {workspace = true}
solana-lite-rpc-core = {workspace = true}
solana-lite-rpc-core = {workspace = true}
solana-lite-rpc-cluster-endpoints = {workspace = true}
solana-sdk = { workspace = true }
solana-rpc-client = { workspace = true }
tokio = "1.28.2"
clap = { workspace = true }
anyhow = { workspace = true }
dashmap = { workspace = true }
rand = "0.8.5"
rand_chacha = "0.3.1"
log = { workspace = true }
itertools = { workspace = true }
bincode = { workspace = true }
futures = { workspace = true }
tracing-subscriber = { workspace = true }


@ -0,0 +1,39 @@
use clap::Parser;
#[derive(Parser, Debug, Clone)]
#[command(author, version, about, long_about = None)]
pub struct Args {
/// RPC url
#[arg(short, long, default_value = "http://127.0.0.1:8899")]
pub rpc_url: String,
#[arg(short, long)]
pub grpc_url: Option<String>,
#[arg(short, long)]
pub x_token: Option<String>,
#[arg(short, long)]
pub transaction_count: Option<usize>,
#[arg(short, long, default_value_t = 1)]
pub number_of_seconds: usize,
#[arg(short, long)]
pub fee_payer: String,
#[arg(short, long)]
pub staked_identity: Option<String>,
#[arg(short, long)]
pub priority_fees: Option<u64>,
#[arg(short = 'a', long, default_value_t = 256)]
pub additional_signers: usize,
#[arg(short = 'b', long, default_value_t = 0.1)]
pub signers_transfer_balance: f64,
#[arg(long)]
pub fanout_slots: Option<u64>,
}


@ -1,3 +1,519 @@
fn main() {
todo!()
use std::{collections::HashSet, ops::Mul, str::FromStr, sync::Arc, time::Duration};
use clap::Parser;
use dashmap::{DashMap, DashSet};
use itertools::Itertools;
use rand::{
distributions::{Alphanumeric, Distribution},
SeedableRng,
};
use solana_lite_rpc_cluster_endpoints::{
geyser_grpc_connector::{GrpcConnectionTimeouts, GrpcSourceConfig},
grpc_subscription::create_grpc_subscription,
json_rpc_leaders_getter::JsonRpcLeaderGetter,
json_rpc_subscription::create_json_rpc_polling_subscription,
};
use solana_lite_rpc_core::{
keypair_loader::load_identity_keypair,
stores::{
block_information_store::{BlockInformation, BlockInformationStore},
cluster_info_store::ClusterInfo,
data_cache::{DataCache, SlotCache},
subscription_store::SubscriptionStore,
tx_store::TxStore,
},
structures::{
epoch::EpochCache, identity_stakes::IdentityStakes, leaderschedule::CalculatedSchedule,
transaction_sent_info::SentTransactionInfo,
},
utils::wait_till_block_of_commitment_is_recieved,
};
use solana_lite_rpc_services::{
data_caching_service::DataCachingService,
quic_connection_utils::QuicConnectionParameters,
tpu_utils::{
tpu_connection_path::TpuConnectionPath,
tpu_service::{TpuService, TpuServiceConfig},
},
transaction_replayer::TransactionReplayer,
transaction_service::TransactionServiceBuilder,
tx_sender::TxSender,
};
use solana_sdk::{
commitment_config::CommitmentConfig,
compute_budget,
hash::Hash,
instruction::Instruction,
message::Message,
native_token::LAMPORTS_PER_SOL,
pubkey::Pubkey,
signature::{Keypair, Signature},
signer::Signer,
system_instruction,
transaction::Transaction,
};
use tokio::sync::{Mutex, RwLock};
use crate::cli::Args;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
mod cli;
const MEMO_PROGRAM_ID: &str = "MemoSq4gqABAXKb96qnH8TysNcWxMyWCqXgDLGmfcHr";
pub fn create_memo_tx(msg: &[u8], payer: &Keypair, blockhash: Hash, prio_fees: u64) -> Transaction {
let memo = Pubkey::from_str(MEMO_PROGRAM_ID).unwrap();
let cb_1 = compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(7000);
let cb_2 = compute_budget::ComputeBudgetInstruction::set_compute_unit_price(prio_fees);
let instruction = Instruction::new_with_bytes(memo, msg, vec![]);
let message = Message::new(&[cb_1, cb_2, instruction], Some(&payer.pubkey()));
Transaction::new(&[payer], message, blockhash)
}
pub fn generate_random_strings(
num_of_txs: usize,
random_seed: Option<u64>,
n_chars: usize,
) -> Vec<Vec<u8>> {
let seed = random_seed.map_or(0, |x| x);
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(seed);
(0..num_of_txs)
.map(|_| Alphanumeric.sample_iter(&mut rng).take(n_chars).collect())
.collect()
}
pub async fn create_signers_from_payer(
rpc_client: Arc<RpcClient>,
payer: Arc<Keypair>,
nb_signers: usize,
signer_balance: u64,
prio_fees: u64,
) -> Vec<Arc<Keypair>> {
let signers = (0..nb_signers)
.map(|_| Arc::new(Keypair::new()))
.collect_vec();
let mut signers_to_transfer: HashSet<Pubkey> = signers.iter().map(|kp| kp.pubkey()).collect();
while !signers_to_transfer.is_empty() {
let Ok(blockhash) = rpc_client.get_latest_blockhash().await else {
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
};
let cb_1 = compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(5000);
let cb_2 = compute_budget::ComputeBudgetInstruction::set_compute_unit_price(prio_fees);
let transactions = signers_to_transfer
.iter()
.map(|signer| {
let instruction =
system_instruction::transfer(&payer.pubkey(), signer, signer_balance);
let message = Message::new(
&[cb_1.clone(), cb_2.clone(), instruction],
Some(&payer.pubkey()),
);
(*signer, Transaction::new(&[&payer], message, blockhash))
})
.collect_vec();
let tasks = transactions
.iter()
.map(|(signer, tx)| {
let rpc_client = rpc_client.clone();
let tx = tx.clone();
let signer = *signer;
tokio::spawn(
async move { (signer, rpc_client.send_and_confirm_transaction(&tx).await) },
)
})
.collect_vec();
let results = futures::future::join_all(tasks).await;
for result in results {
match result {
Ok((signer, Ok(_signature))) => {
signers_to_transfer.remove(&signer);
}
Ok((signer, Err(e))) => {
log::error!("Error transfering to {signer:?}, {e:?}");
}
_ => {
// retry
}
}
}
}
signers
}
pub async fn transfer_back_to_payer(
rpc_client: Arc<RpcClient>,
payer: Arc<Keypair>,
signers: Vec<Arc<Keypair>>,
prio_fees: u64,
) {
let mut signers_to_transfer: HashSet<Pubkey> = signers.iter().map(|kp| kp.pubkey()).collect();
let payer_pubkey = payer.pubkey();
while !signers_to_transfer.is_empty() {
let Ok(blockhash) = rpc_client.get_latest_blockhash().await else {
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
};
let transfers = signers
.iter()
.map(|signer| {
let rpc_client = rpc_client.clone();
let signer = signer.clone();
tokio::spawn(async move {
let balance = rpc_client.get_balance(&signer.pubkey()).await.unwrap();
let cb_1 =
compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(5000);
let cb_2 =
compute_budget::ComputeBudgetInstruction::set_compute_unit_price(prio_fees);
let balance_to_transfer = balance
.saturating_sub(5000)
.saturating_sub(5000 * prio_fees);
let instruction = system_instruction::transfer(
&signer.pubkey(),
&payer_pubkey,
balance_to_transfer,
);
let message = Message::new(
&[cb_1.clone(), cb_2.clone(), instruction],
Some(&signer.pubkey()),
);
(
signer.pubkey(),
rpc_client
.send_and_confirm_transaction(&Transaction::new(
&[&signer],
message,
blockhash,
))
.await,
)
})
})
.collect_vec();
let results = futures::future::join_all(transfers).await;
for result in results {
match result {
Ok((signer, Ok(_signature))) => {
signers_to_transfer.remove(&signer);
}
Ok((signer, Err(e))) => {
log::error!("Error transfering to {signer:?}, {e:?}");
}
_ => {
// retry
}
}
}
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let args = Args::parse();
let rpc_url = args.rpc_url;
let rpc_client = Arc::new(RpcClient::new(rpc_url));
let leader_schedule = Arc::new(JsonRpcLeaderGetter::new(rpc_client.clone(), 1024, 128));
let fee_payer = Arc::new(
load_identity_keypair(Some(args.fee_payer))
.await
.expect("Payer should be set or keypair file not found")
.unwrap(),
);
let priority_fee = args.priority_fees.unwrap_or_default();
let nb_signers = args.additional_signers;
let signer_balance = args.signers_transfer_balance.mul(LAMPORTS_PER_SOL as f64) as u64;
let signers = create_signers_from_payer(
rpc_client.clone(),
fee_payer.clone(),
nb_signers,
signer_balance,
priority_fee,
)
.await;
println!(
"Creating {} users with {} SOL balance",
nb_signers, signer_balance
);
let validator_identity = Arc::new(
load_identity_keypair(args.staked_identity)
.await?
.unwrap_or_else(Keypair::new),
);
// START ALL SERVICES REQUIRED BY LITE_RPC
// setup endpoint, GRPC/RPC Polling
println!("Setting up lite-rpc tpu service");
let (endpoints, _handles) = if let Some(grpc_addr) = args.grpc_url {
let timeouts = GrpcConnectionTimeouts {
connect_timeout: Duration::from_secs(10),
request_timeout: Duration::from_secs(10),
subscribe_timeout: Duration::from_secs(10),
receive_timeout: Duration::from_secs(10),
};
create_grpc_subscription(
rpc_client.clone(),
vec![GrpcSourceConfig::new(
grpc_addr,
args.x_token.clone(),
None,
timeouts,
)],
vec![],
)?
} else {
create_json_rpc_polling_subscription(rpc_client.clone(), 100)?
};
let finalized_block_information = wait_till_block_of_commitment_is_recieved(
endpoints.blockinfo_notifier.resubscribe(),
CommitmentConfig::finalized(),
)
.await;
let block_height = rpc_client
.get_block_height_with_commitment(CommitmentConfig::finalized())
.await?;
let (blockhash, _) = rpc_client
.get_latest_blockhash_with_commitment(CommitmentConfig::finalized())
.await?;
let finalize_slot = finalized_block_information.slot;
println!(
"finalized blockheight : {:?}, slot: {}, hash: {}",
finalized_block_information.block_height,
finalized_block_information.slot,
finalized_block_information.blockhash
);
println!(
"From RPC blockheight : {block_height:?}, hash: {}",
blockhash
);
let finalized_block_information = BlockInformation {
slot: finalized_block_information.slot,
block_height,
last_valid_blockheight: finalized_block_information.block_height + 300,
cleanup_slot: finalized_block_information.slot + 1000000,
blockhash: finalized_block_information.blockhash,
commitment_config: CommitmentConfig::finalized(),
block_time: 0,
};
let block_information_store = BlockInformationStore::new(finalized_block_information);
let data_cache = DataCache {
block_information_store,
cluster_info: ClusterInfo::default(),
identity_stakes: IdentityStakes::new(validator_identity.pubkey()),
slot_cache: SlotCache::new(finalize_slot),
tx_subs: SubscriptionStore::default(),
txs: TxStore {
store: Arc::new(DashMap::new()),
},
epoch_data: EpochCache::new_for_tests(),
leader_schedule: Arc::new(RwLock::new(CalculatedSchedule::default())),
};
let data_cache_service = DataCachingService {
data_cache: data_cache.clone(),
clean_duration: Duration::from_secs(120),
};
// start listening to the cluster data and filling the cache
data_cache_service.listen(
endpoints.blocks_notifier.resubscribe(),
endpoints.blockinfo_notifier,
endpoints.slot_notifier.resubscribe(),
endpoints.cluster_info_notifier,
endpoints.vote_account_notifier,
);
let count = args.transaction_count.unwrap_or(10);
let prioritization_heap_size = Some(count * args.number_of_seconds);
let tpu_config = TpuServiceConfig {
fanout_slots: args.fanout_slots.unwrap_or(16),
maximum_transaction_in_queue: 2000000,
quic_connection_params: QuicConnectionParameters {
connection_timeout: Duration::from_secs(60),
connection_retry_count: 10,
finalize_timeout: Duration::from_millis(10000),
max_number_of_connections: 4,
unistream_timeout: Duration::from_millis(1000),
write_timeout: Duration::from_secs(10),
number_of_transactions_per_unistream: 1,
unistreams_to_create_new_connection_in_percentage: 5,
prioritization_heap_size,
},
tpu_connection_path: TpuConnectionPath::QuicDirectPath,
};
let tpu_service: TpuService = TpuService::new(
tpu_config,
validator_identity,
leader_schedule,
data_cache.clone(),
)
.await?;
let transaction_service_builder = TransactionServiceBuilder::new(
TxSender::new(data_cache.clone(), tpu_service.clone()),
TransactionReplayer::new(
tpu_service.clone(),
data_cache.clone(),
Duration::from_secs(1),
),
tpu_service,
10000,
);
let (transaction_service, _) = transaction_service_builder.start(
None,
data_cache.block_information_store.clone(),
10,
endpoints.slot_notifier,
);
// CREATE TRANSACTIONS
log::info!("Creating memo transactions");
let memo_msgs = generate_random_strings(count * args.number_of_seconds, None, 5);
let mut tx_to_confirm = vec![];
let mut second = 1;
let mut signer_count = 0;
let map_of_signature = Arc::new(DashSet::<Signature>::new());
let transactions_in_blocks = Arc::new(Mutex::new(Vec::<u64>::new()));
let mut block_stream = endpoints.blocks_notifier;
let block_tps_task = {
let map_of_signature = map_of_signature.clone();
let transactions_in_blocks = transactions_in_blocks.clone();
tokio::spawn(async move {
let mut start_tracking = false;
while let Ok(block) = block_stream.recv().await {
let mut count = 0;
for transaction in &block.transactions {
if map_of_signature.contains(&transaction.signature) {
count += 1;
map_of_signature.remove(&transaction.signature);
}
}
// start tracking once we have the first block containing transactions sent by us
if start_tracking || count > 0 {
start_tracking = true;
let mut lk = transactions_in_blocks.lock().await;
lk.push(count);
}
}
})
};
for chunk in memo_msgs.chunks(count) {
let instant = tokio::time::Instant::now();
let mut current_txs = vec![];
println!("Sending memo transactions :{}", second);
second += 1;
let bh = data_cache
.block_information_store
.get_latest_blockhash(CommitmentConfig::finalized())
.await;
let last_valid_block_height =
data_cache.block_information_store.get_last_blockheight() + 300;
let transactions = chunk
.iter()
.map(|x| {
signer_count += 1;
create_memo_tx(x, &signers[signer_count % nb_signers], bh, priority_fee)
})
.collect_vec();
for transaction in transactions {
let signature = transaction.signatures[0];
let raw_tx = bincode::serialize(&transaction).unwrap();
let slot = data_cache.slot_cache.get_current_slot();
map_of_signature.insert(signature);
let transaction_info = SentTransactionInfo {
signature,
last_valid_block_height,
slot,
transaction: Arc::new(raw_tx),
prioritization_fee: priority_fee,
};
let _ = transaction_service
.transaction_channel
.send(transaction_info.clone())
.await;
current_txs.push(signature);
}
tx_to_confirm.push(current_txs);
let millis = instant.elapsed().as_millis() as u64;
if millis < 1000 {
tokio::time::sleep(Duration::from_millis(1000 - millis)).await;
} else {
println!("took {millis:?} millis to send {count:?} transactions");
}
}
println!(
"{} memo transactions sent, waiting for a minute to confirm them",
count * args.number_of_seconds
);
tokio::time::sleep(Duration::from_secs(120)).await;
let mut second = 1;
for seconds_sigs in tx_to_confirm {
let mut tx_confirmed = 0;
for sig in seconds_sigs {
if data_cache.txs.is_transaction_confirmed(&sig) {
tx_confirmed += 1;
}
}
println!(
"{} or {} transactions were confirmed for the {} second",
tx_confirmed, count, second
);
second += 1;
}
block_tps_task.abort();
let lk = transactions_in_blocks.lock().await;
// stop tracking by removing trailing 0s
let mut vec = lk.clone();
vec.reverse();
let mut transaction_blocks = vec.iter().skip_while(|x| **x == 0).cloned().collect_vec();
transaction_blocks.reverse();
println!(
"BLOCKS transactions : {}",
transaction_blocks.iter().map(|x| x.to_string()).join(", ")
);
let sum = transaction_blocks.iter().sum::<u64>();
let seconds = (transaction_blocks.len() * 400 / 1000) as u64;
let tps = sum / seconds;
println!("EFFECTIVE TPS: {tps:?}");
println!("Transfering remaining lamports to payer");
transfer_back_to_payer(rpc_client, fee_payer, signers, priority_fee).await;
Ok(())
}
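For context on the EFFECTIVE TPS figure: the code assumes the roughly 400 ms Solana slot time (transaction_blocks.len() * 400 / 1000). With illustrative numbers, 4000 counted transactions spread over 20 blocks cover about 20 * 0.4 s = 8 s, so the printed rate would be 4000 / 8 = 500 TPS.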


@ -377,7 +377,7 @@ impl LiteRpcServer for LiteBridge {
match self
.transaction_service
.send_transaction(raw_tx, max_retries)
.send_wire_transaction(raw_tx, max_retries)
.await
{
Ok(sig) => {


@ -49,7 +49,7 @@ pub struct Config {
#[serde(default)]
pub use_grpc: bool,
#[serde(default)]
pub calculate_leader_schedule_form_geyser: bool,
pub calculate_leader_schedule_from_geyser: bool,
#[serde(default = "Config::default_grpc_addr")]
pub grpc_addr: String,
#[serde(default)]
@ -142,11 +142,8 @@ impl Config {
.map(|size| size.parse().unwrap())
.unwrap_or(config.fanout_size);
// IDENTITY env sets value of identity_keypair
// config.identity_keypair = env::var("IDENTITY")
// .map(Some)
// .unwrap_or(config.identity_keypair);
// note: identity config is handled in load_identity_keypair
// the behavior is different from the other config values as it does either take a file path or the keypair as json array
config.prometheus_addr = env::var("PROMETHEUS_ADDR").unwrap_or(config.prometheus_addr);
@ -161,7 +158,7 @@ impl Config {
config.quic_proxy_addr = env::var("QUIC_PROXY_ADDR").ok();
config.use_grpc = env::var("USE_GRPC")
.map(|_| true)
.map(|value| value.parse::<bool>().unwrap())
.unwrap_or(config.use_grpc);
// source 1
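This hunk is the #378 fix from the commit list: with .map(|_| true), merely setting USE_GRPC, even to "false", enabled gRPC. A runnable sketch of the difference; it uses unwrap_or(false) where the diff calls unwrap(), to keep the illustration panic-free:

use std::env;

fn main() {
    env::set_var("USE_GRPC", "false");

    // Old behavior: any value, even "false", switched gRPC on.
    let broken = env::var("USE_GRPC").map(|_| true).unwrap_or(false);

    // Fixed behavior: the value itself is parsed as a bool.
    let fixed = env::var("USE_GRPC")
        .map(|value| value.parse::<bool>().unwrap_or(false))
        .unwrap_or(false);

    assert!(broken && !fixed);
    println!("broken={broken} fixed={fixed}");
}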


@ -11,7 +11,7 @@ use lite_rpc::postgres_logger::PostgresLogger;
use lite_rpc::service_spawner::ServiceSpawner;
use lite_rpc::start_server::start_servers;
use lite_rpc::DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE;
use log::{debug, info};
use log::info;
use solana_lite_rpc_accounts::account_service::AccountService;
use solana_lite_rpc_accounts::account_store_interface::AccountStorageInterface;
use solana_lite_rpc_accounts::inmemory_account_store::InmemoryAccountStore;
@ -44,7 +44,8 @@ use solana_lite_rpc_core::structures::{
epoch::EpochCache, identity_stakes::IdentityStakes, notifications::NotificationSender,
};
use solana_lite_rpc_core::traits::address_lookup_table_interface::AddressLookupTableInterface;
use solana_lite_rpc_core::types::{BlockInfoStream, BlockStream};
use solana_lite_rpc_core::types::BlockStream;
use solana_lite_rpc_core::utils::wait_till_block_of_commitment_is_recieved;
use solana_lite_rpc_core::AnyhowJoinHandle;
use solana_lite_rpc_prioritization_fees::account_prio_service::AccountPrioService;
use solana_lite_rpc_services::data_caching_service::DataCachingService;
@ -54,7 +55,6 @@ use solana_lite_rpc_services::transaction_replayer::TransactionReplayer;
use solana_lite_rpc_services::tx_sender::TxSender;
use lite_rpc::postgres_logger;
use solana_lite_rpc_core::structures::block_info::BlockInfo;
use solana_lite_rpc_prioritization_fees::start_block_priofees_task;
use solana_lite_rpc_util::obfuscate_rpcurl;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
@ -67,7 +67,6 @@ use std::time::Duration;
use tokio::io::AsyncReadExt;
use tokio::sync::mpsc;
use tokio::sync::RwLock;
use tokio::time::{timeout, Instant};
use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::EnvFilter;
@ -76,38 +75,6 @@ use tracing_subscriber::EnvFilter;
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
// export _RJEM_MALLOC_CONF=prof:true,lg_prof_interval:30,lg_prof_sample:21,prof_prefix:/tmp/jeprof
use jemalloc_ctl::{epoch, stats};
use std::{thread, time::{self}};
async fn get_latest_block_info(
mut blockinfo_stream: BlockInfoStream,
commitment_config: CommitmentConfig,
) -> BlockInfo {
let started = Instant::now();
loop {
match timeout(Duration::from_millis(500), blockinfo_stream.recv()).await {
Ok(Ok(block_info)) => {
if block_info.commitment_config == commitment_config {
return block_info;
}
}
Err(_elapsed) => {
debug!(
"waiting for latest block info ({}) ... {:.02}ms",
commitment_config.commitment,
started.elapsed().as_secs_f32() * 1000.0
);
}
Ok(Err(_error)) => {
panic!("Did not recv block info");
}
}
}
}
pub async fn start_postgres(
config: Option<postgres_logger::PostgresSessionConfig>,
) -> anyhow::Result<(Option<NotificationSender>, AnyhowJoinHandle)> {
@ -261,7 +228,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
};
info!("Waiting for first finalized block info...");
let finalized_block_info = get_latest_block_info(
let finalized_block_info = wait_till_block_of_commitment_is_recieved(
blockinfo_notifier.resubscribe(),
CommitmentConfig::finalized(),
)
@ -345,7 +312,6 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
};
let spawner = ServiceSpawner {
prometheus_addr,
data_cache: data_cache.clone(),
};
//init grpc leader schedule and vote account is configured.
@ -370,7 +336,8 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
slot_notifier.resubscribe(),
);
let support_service = tokio::spawn(async move { spawner.spawn_support_services().await });
let support_service =
tokio::spawn(async move { spawner.spawn_support_services(prometheus_addr).await });
let history = History::new();


@ -17,15 +17,14 @@ use solana_lite_rpc_services::{
use std::time::Duration;
pub struct ServiceSpawner {
pub prometheus_addr: String,
pub data_cache: DataCache,
}
impl ServiceSpawner {
/// spawn services that support the whole system
pub async fn spawn_support_services(&self) -> anyhow::Result<()> {
pub async fn spawn_support_services(&self, prometheus_addr: String) -> anyhow::Result<()> {
// spawn prometheus
let prometheus = PrometheusSync::sync(self.prometheus_addr.clone());
let prometheus = PrometheusSync::sync(prometheus_addr.clone());
// spawn metrics capture
let metrics = MetricsCapture::new(self.data_cache.txs.clone()).capture();


@ -61,6 +61,7 @@ const QUIC_CONNECTION_PARAMS: QuicConnectionParameters = QuicConnectionParameter
write_timeout: Duration::from_secs(2),
number_of_transactions_per_unistream: 10,
unistreams_to_create_new_connection_in_percentage: 10,
prioritization_heap_size: None,
};
#[test]
@ -743,7 +744,7 @@ pub fn build_raw_sample_tx(i: u32) -> SentTransactionInfo {
let tx = build_sample_tx(&payer_keypair, i);
let transaction =
bincode::serialize::<VersionedTransaction>(&tx).expect("failed to serialize tx");
Arc::new(bincode::serialize::<VersionedTransaction>(&tx).expect("failed to serialize tx"));
SentTransactionInfo {
signature: *tx.get_signature(),


@ -35,8 +35,10 @@ pub async fn main() -> anyhow::Result<()> {
dotenv().ok();
let proxy_listener_addr = proxy_listen_addr.parse().unwrap();
let validator_identity =
ValidatorIdentity::new(load_identity_keypair(Some(identity_keypair)).await?);
let validator_identity = ValidatorIdentity::new(
load_identity_keypair(Some(identity_keypair).filter(|s| !s.is_empty())).await?,
);
let tls_config = Arc::new(SelfSignedTlsConfigProvider::new_singleton_self_signed_localhost());
let main_services = QuicForwardProxy::new(proxy_listener_addr, tls_config, validator_identity)


@ -4,7 +4,7 @@ use crate::quic_connection_utils::{
use futures::FutureExt;
use log::warn;
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use quinn::{Connection, Endpoint};
use quinn::{Connection, Endpoint, VarInt};
use solana_lite_rpc_core::structures::rotating_queue::RotatingQueue;
use solana_sdk::pubkey::Pubkey;
use std::{
@ -14,7 +14,7 @@ use std::{
Arc,
},
};
use tokio::sync::{Notify, OwnedSemaphorePermit, RwLock, Semaphore};
use tokio::sync::{broadcast, OwnedSemaphorePermit, RwLock, Semaphore};
pub type EndpointPool = RotatingQueue<Endpoint>;
@ -40,7 +40,6 @@ pub struct QuicConnection {
identity: Pubkey,
socket_address: SocketAddr,
connection_params: QuicConnectionParameters,
exit_notify: Arc<Notify>,
timeout_counters: Arc<AtomicU64>,
has_connected_once: Arc<AtomicBool>,
}
@ -51,7 +50,6 @@ impl QuicConnection {
endpoint: Endpoint,
socket_address: SocketAddr,
connection_params: QuicConnectionParameters,
exit_notify: Arc<Notify>,
) -> Self {
Self {
connection: Arc::new(RwLock::new(None)),
@ -60,13 +58,16 @@ impl QuicConnection {
identity,
socket_address,
connection_params,
exit_notify,
timeout_counters: Arc::new(AtomicU64::new(0)),
has_connected_once: Arc::new(AtomicBool::new(false)),
}
}
async fn connect(&self, is_already_connected: bool) -> Option<Connection> {
async fn connect(
&self,
is_already_connected: bool,
exit_notify: broadcast::Receiver<()>,
) -> Option<Connection> {
QuicConnectionUtils::connect(
self.identity,
is_already_connected,
@ -74,12 +75,12 @@ impl QuicConnection {
self.socket_address,
self.connection_params.connection_timeout,
self.connection_params.connection_retry_count,
self.exit_notify.clone(),
exit_notify,
)
.await
}
pub async fn get_connection(&self) -> Option<Connection> {
pub async fn get_connection(&self, exit_notify: broadcast::Receiver<()>) -> Option<Connection> {
// get new connection reset if necessary
let last_stable_id = self.last_stable_id.load(Ordering::Relaxed) as usize;
let conn = self.connection.read().await.clone();
@ -95,7 +96,7 @@ impl QuicConnection {
Some(connection)
} else {
NB_QUIC_CONNECTION_RESET.inc();
let new_conn = self.connect(true).await;
let new_conn = self.connect(true, exit_notify).await;
if let Some(new_conn) = new_conn {
*conn = Some(new_conn);
conn.clone()
@ -116,7 +117,7 @@ impl QuicConnection {
// connection has recently been established/ just use it
return (*lk).clone();
}
let connection = self.connect(false).await;
let connection = self.connect(false, exit_notify).await;
*lk = connection.clone();
self.has_connected_once.store(true, Ordering::Relaxed);
connection
@ -124,17 +125,16 @@ impl QuicConnection {
}
}
pub async fn send_transaction(&self, tx: Vec<u8>) {
pub async fn send_transaction(&self, tx: &Vec<u8>, mut exit_notify: broadcast::Receiver<()>) {
let connection_retry_count = self.connection_params.connection_retry_count;
for _ in 0..connection_retry_count {
let mut do_retry = false;
let exit_notify = self.exit_notify.clone();
let connection = tokio::select! {
conn = self.get_connection() => {
conn = self.get_connection(exit_notify.resubscribe()) => {
conn
},
_ = exit_notify.notified() => {
_ = exit_notify.recv() => {
break;
}
};
@ -149,7 +149,7 @@ impl QuicConnection {
) => {
res
},
_ = exit_notify.notified() => {
_ = exit_notify.recv() => {
break;
}
};
@ -158,13 +158,13 @@ impl QuicConnection {
let write_add_result = tokio::select! {
res = QuicConnectionUtils::write_all(
send_stream,
&tx,
tx,
self.identity,
self.connection_params,
) => {
res
},
_ = exit_notify.notified() => {
_ = exit_notify.recv() => {
break;
}
};
@ -225,6 +225,13 @@ impl QuicConnection {
None => false,
}
}
pub async fn close(&self) {
let lk = self.connection.read().await;
if let Some(connection) = lk.as_ref() {
connection.close(VarInt::from_u32(0), b"Not needed");
}
}
}
#[derive(Clone)]
@ -247,7 +254,6 @@ impl QuicConnectionPool {
endpoints: EndpointPool,
socket_address: SocketAddr,
connection_parameters: QuicConnectionParameters,
exit_notify: Arc<Notify>,
nb_connection: usize,
max_number_of_unistream_connection: usize,
) -> Self {
@ -259,7 +265,6 @@ impl QuicConnectionPool {
endpoints.get().expect("Should get an endpoint"),
socket_address,
connection_parameters,
exit_notify.clone(),
));
}
Self {
@ -321,4 +326,10 @@ impl QuicConnectionPool {
pub fn is_empty(&self) -> bool {
self.connections.is_empty()
}
pub async fn close_all(&self) {
for connection in &self.connections {
connection.close().await;
}
}
}


@ -14,7 +14,7 @@ use std::{
sync::Arc,
time::Duration,
};
use tokio::{sync::Notify, time::timeout};
use tokio::{sync::broadcast, time::timeout};
lazy_static::lazy_static! {
static ref NB_QUIC_0RTT_ATTEMPTED: GenericGauge<prometheus::core::AtomicI64> =
@ -45,6 +45,31 @@ lazy_static::lazy_static! {
static ref NB_QUIC_FINISH_ERRORED: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_finish_errored", "Number of times finish errored")).unwrap();
static ref NB_QUIC_CONNECTION_ERROR_VERSION_MISMATCH: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_connection_error_version_mismatch", "Number of times connection errored VersionMismatch")).unwrap();
static ref NB_QUIC_CONNECTION_ERROR_TRANSPORT_ERROR: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_connection_error_transport_error", "Number of times connection errored TransportError")).unwrap();
static ref NB_QUIC_CONNECTION_ERROR_CONNECTION_CLOSED: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_connection_error_connection_closed", "Number of times connection errored ConnectionClosed")).unwrap();
static ref NB_QUIC_CONNECTION_ERROR_APPLICATION_CLOSED: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_connection_error_application_closed", "Number of times connection errored ApplicationClosed")).unwrap();
static ref NB_QUIC_CONNECTION_ERROR_RESET: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_connection_error_reset", "Number of times connection errored Reset")).unwrap();
static ref NB_QUIC_CONNECTION_ERROR_TIMEDOUT: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_connection_error_timed_out", "Number of times connection errored TimedOut")).unwrap();
static ref NB_QUIC_CONNECTION_ERROR_LOCALLY_CLOSED: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_connection_error_locally_closed", "Number of times connection errored locally closed")).unwrap();
static ref NB_QUIC_WRITE_ERROR_STOPPED: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_write_error_stopped", "Number of times write_error Stopped")).unwrap();
static ref NB_QUIC_WRITE_ERROR_CONNECTION_LOST: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_write_error_connection_lost", "Number of times write_error ConnectionLost")).unwrap();
static ref NB_QUIC_WRITE_ERROR_UNKNOWN_STREAM: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_write_error_unknown_stream", "Number of times write_error UnknownStream")).unwrap();
static ref NB_QUIC_WRITE_ERROR_0RTT_REJECT: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_write_error_0RTT_reject", "Number of times write_error ZeroRttRejected")).unwrap();
static ref NB_QUIC_CONNECTIONS: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_nb_active_quic_connections", "Number of quic connections open")).unwrap();
@ -83,19 +108,21 @@ pub struct QuicConnectionParameters {
pub max_number_of_connections: usize,
pub number_of_transactions_per_unistream: usize,
pub unistreams_to_create_new_connection_in_percentage: u8,
pub prioritization_heap_size: Option<usize>,
}
impl Default for QuicConnectionParameters {
fn default() -> Self {
Self {
connection_timeout: Duration::from_millis(10000),
connection_timeout: Duration::from_millis(60000),
unistream_timeout: Duration::from_millis(10000),
write_timeout: Duration::from_millis(10000),
finalize_timeout: Duration::from_millis(10000),
finalize_timeout: Duration::from_millis(20000),
connection_retry_count: 20,
max_number_of_connections: 8,
number_of_transactions_per_unistream: 1,
unistreams_to_create_new_connection_in_percentage: 10,
prioritization_heap_size: None,
}
}
}
@ -167,6 +194,25 @@ impl QuicConnectionUtils {
}
Err(e) => {
NB_QUIC_CONNECTION_ERRORED.inc();
match &e {
ConnectionError::VersionMismatch => {
NB_QUIC_CONNECTION_ERROR_VERSION_MISMATCH.inc()
}
ConnectionError::TransportError(_) => {
NB_QUIC_CONNECTION_ERROR_TRANSPORT_ERROR.inc()
}
ConnectionError::ConnectionClosed(_) => {
NB_QUIC_CONNECTION_ERROR_CONNECTION_CLOSED.inc()
}
ConnectionError::ApplicationClosed(_) => {
NB_QUIC_CONNECTION_ERROR_APPLICATION_CLOSED.inc()
}
ConnectionError::Reset => NB_QUIC_CONNECTION_ERROR_RESET.inc(),
ConnectionError::TimedOut => NB_QUIC_CONNECTION_ERROR_TIMEDOUT.inc(),
ConnectionError::LocallyClosed => {
NB_QUIC_CONNECTION_ERROR_LOCALLY_CLOSED.inc()
}
}
Err(e.into())
}
},
@ -219,7 +265,7 @@ impl QuicConnectionUtils {
addr: SocketAddr,
connection_timeout: Duration,
connection_retry_count: usize,
exit_notified: Arc<Notify>,
mut exit_notified: broadcast::Receiver<()>,
) -> Option<Connection> {
for _ in 0..connection_retry_count {
let conn = if already_connected {
@ -228,7 +274,7 @@ impl QuicConnectionUtils {
res = Self::make_connection_0rtt(endpoint.clone(), addr, connection_timeout) => {
res
},
_ = exit_notified.notified() => {
_ = exit_notified.recv() => {
break;
}
}
@ -238,7 +284,7 @@ impl QuicConnectionUtils {
res = Self::make_connection(endpoint.clone(), addr, connection_timeout) => {
res
},
_ = exit_notified.notified() => {
_ = exit_notified.recv() => {
break;
}
}
@ -271,6 +317,17 @@ impl QuicConnectionUtils {
match write_timeout_res {
Ok(write_res) => {
if let Err(e) = write_res {
match &e {
quinn::WriteError::Stopped(_) => NB_QUIC_WRITE_ERROR_STOPPED.inc(),
quinn::WriteError::ConnectionLost(_) => {
NB_QUIC_WRITE_ERROR_CONNECTION_LOST.inc()
}
quinn::WriteError::UnknownStream => {
NB_QUIC_WRITE_ERROR_UNKNOWN_STREAM.inc()
}
quinn::WriteError::ZeroRttRejected => NB_QUIC_WRITE_ERROR_0RTT_REJECT.inc(),
};
trace!(
"Error while writing transaction for {}, error {}",
identity,
@ -295,6 +352,16 @@ impl QuicConnectionUtils {
match finish_timeout_res {
Ok(finish_res) => {
if let Err(e) = finish_res {
match &e {
quinn::WriteError::Stopped(_) => NB_QUIC_WRITE_ERROR_STOPPED.inc(),
quinn::WriteError::ConnectionLost(_) => {
NB_QUIC_WRITE_ERROR_CONNECTION_LOST.inc()
}
quinn::WriteError::UnknownStream => {
NB_QUIC_WRITE_ERROR_UNKNOWN_STREAM.inc()
}
quinn::WriteError::ZeroRttRejected => NB_QUIC_WRITE_ERROR_0RTT_REJECT.inc(),
};
trace!(
"Error while finishing transaction for {}, error {}",
identity,


@ -179,6 +179,7 @@ impl QuicProxyConnectionManager {
transaction,
..
}) => {
let transaction = transaction.as_ref().clone();
TxData::new(signature, transaction)
},
Err(e) => {
@ -195,6 +196,7 @@ impl QuicProxyConnectionManager {
transaction,
..
}) => {
let transaction = transaction.as_ref().clone();
txs.push(TxData::new(signature, transaction));
},
Err(TryRecvError::Empty) => {


@ -15,7 +15,7 @@ use solana_sdk::pubkey::Pubkey;
use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams;
use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration};
use tokio::sync::{
broadcast::{Receiver, Sender},
broadcast::{self, Receiver, Sender},
Notify,
};
@ -48,7 +48,7 @@ struct ActiveConnection {
tpu_address: SocketAddr,
data_cache: DataCache,
connection_parameters: QuicConnectionParameters,
exit_notifier: Arc<Notify>,
exit_notifier: broadcast::Sender<()>,
}
impl ActiveConnection {
@ -59,13 +59,14 @@ impl ActiveConnection {
data_cache: DataCache,
connection_parameters: QuicConnectionParameters,
) -> Self {
let (exit_notifier, _) = broadcast::channel(1);
Self {
endpoints,
tpu_address,
identity,
data_cache,
connection_parameters,
exit_notifier: Arc::new(Notify::new()),
exit_notifier,
}
}
@ -79,7 +80,6 @@ impl ActiveConnection {
let fill_notify = Arc::new(Notify::new());
let identity = self.identity;
let exit_notifier = self.exit_notifier.clone();
NB_QUIC_ACTIVE_CONNECTIONS.inc();
@ -95,18 +95,20 @@ impl ActiveConnection {
self.endpoints.clone(),
addr,
self.connection_parameters,
exit_notifier.clone(),
max_number_of_connections,
max_uni_stream_connections,
);
let priorization_heap = PrioritizationFeesHeap::new(2 * max_uni_stream_connections);
let prioritization_heap_size = self
.connection_parameters
.prioritization_heap_size
.unwrap_or(2 * max_uni_stream_connections);
let priorization_heap = PrioritizationFeesHeap::new(prioritization_heap_size);
let heap_filler_task = {
let priorization_heap = priorization_heap.clone();
let data_cache = self.data_cache.clone();
let fill_notify = fill_notify.clone();
let exit_notifier = exit_notifier.clone();
let mut exit_notifier = self.exit_notifier.subscribe();
tokio::spawn(async move {
let mut current_blockheight =
data_cache.block_information_store.get_last_blockheight();
@ -115,7 +117,7 @@ impl ActiveConnection {
tx = transaction_reciever.recv() => {
tx
},
_ = exit_notifier.notified() => {
_ = exit_notifier.recv() => {
break;
}
};
@ -162,12 +164,14 @@ impl ActiveConnection {
if let Ok(PooledConnection { connection, permit }) =
connection_pool.get_pooled_connection().await
{
let exit_notifier = self.exit_notifier.subscribe();
tokio::task::spawn(async move {
let _permit = permit;
connection.get_connection().await;
connection.get_connection(exit_notifier).await;
});
};
let mut exit_notifier = self.exit_notifier.subscribe();
'main_loop: loop {
tokio::select! {
_ = fill_notify.notified() => {
@ -194,6 +198,7 @@ impl ActiveConnection {
break;
},
};
let exit_notifier = self.exit_notifier.subscribe();
tokio::spawn(async move {
// permit will be used to send all the transaction and then destroyed
@ -202,13 +207,13 @@ impl ActiveConnection {
NB_QUIC_TASKS.inc();
connection.send_transaction(tx.transaction).await;
connection.send_transaction(tx.transaction.as_ref(), exit_notifier).await;
timer.observe_duration();
NB_QUIC_TASKS.dec();
});
}
},
_ = exit_notifier.notified() => {
_ = exit_notifier.recv() => {
break 'main_loop;
}
}
@ -218,6 +223,7 @@ impl ActiveConnection {
let elements_removed = priorization_heap.clear().await;
TRANSACTIONS_IN_HEAP.sub(elements_removed as i64);
NB_QUIC_ACTIVE_CONNECTIONS.dec();
connection_pool.close_all().await;
}
pub fn start_listening(
@ -243,9 +249,9 @@ impl TpuConnectionManager {
pub async fn new(
certificate: rustls::Certificate,
key: rustls::PrivateKey,
fanout: usize,
_fanout: usize,
) -> Self {
let number_of_clients = fanout * 4;
let number_of_clients = 1; // fanout * 4;
Self {
endpoints: RotatingQueue::new(number_of_clients, || {
QuicConnectionUtils::create_endpoint(certificate.clone(), key.clone())
@ -286,7 +292,7 @@ impl TpuConnectionManager {
if !connections_to_keep.contains_key(key) {
trace!("removing a connection for {}", key.to_string());
// ignore error for exit channel
value.exit_notifier.notify_waiters();
let _ = value.exit_notifier.send(());
false
} else {
true


@ -17,6 +17,7 @@ use solana_lite_rpc_core::types::SlotStream;
use solana_lite_rpc_core::AnyhowJoinHandle;
use solana_sdk::{quic::QUIC_PORT_OFFSET, signature::Keypair, slot_history::Slot};
use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
use std::collections::HashMap;
use std::{
net::{IpAddr, Ipv4Addr},
sync::Arc,
@ -122,6 +123,7 @@ impl TpuService {
) -> anyhow::Result<()> {
let fanout = self.config.fanout_slots;
let last_slot = estimated_slot + fanout;
let current_slot = current_slot.saturating_sub(4);
let cluster_nodes = self.data_cache.cluster_info.cluster_nodes.clone();
@ -130,7 +132,7 @@ impl TpuService {
.get_slot_leaders(current_slot, last_slot)
.await?;
// get next leader with its tpu port
let connections_to_keep = next_leaders
let connections_to_keep: HashMap<_, _> = next_leaders
.iter()
.map(|x| {
let contact_info = cluster_nodes.get(&x.pubkey);


@ -1,8 +1,7 @@
// This class will manage the lifecycle for a transaction
// It will send, replay if necessary and confirm by listening to blocks
use log::trace;
use std::{num::IntErrorKind, time::Duration};
use std::{sync::Arc, time::Duration};
use crate::{
tpu_utils::tpu_service::TpuService,
@ -23,7 +22,7 @@ use solana_lite_rpc_core::{
use solana_sdk::{
borsh0_10::try_from_slice_unchecked,
compute_budget::{self, ComputeBudgetInstruction},
transaction::VersionedTransaction,
transaction::{Transaction, VersionedTransaction},
};
use tokio::{
sync::mpsc::{self, Sender, UnboundedSender},
@ -123,6 +122,15 @@ pub struct TransactionService {
impl TransactionService {
pub async fn send_transaction(
&self,
tx: Transaction,
max_retries: Option<u16>,
) -> anyhow::Result<String> {
let raw_tx = bincode::serialize(&tx)?;
self.send_wire_transaction(raw_tx, max_retries).await
}
pub async fn send_wire_transaction(
&self,
raw_tx: Vec<u8>,
max_retries: Option<u16>,
@ -182,7 +190,7 @@ impl TransactionService {
signature,
last_valid_block_height: last_valid_blockheight,
slot,
transaction: raw_tx,
transaction: Arc::new(raw_tx),
prioritization_fee,
};
if let Err(e) = self