diff --git a/Cargo.lock b/Cargo.lock index 84ee8c26..e64fddc2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1220,8 +1220,22 @@ dependencies = [ name = "custom-tpu-send-transactions" version = "0.2.4" dependencies = [ + "anyhow", + "bincode", + "clap 4.5.4", + "dashmap 5.5.3", + "futures", + "itertools 0.10.5", + "log", + "rand 0.8.5", + "rand_chacha 0.3.1", + "solana-lite-rpc-cluster-endpoints", "solana-lite-rpc-core", "solana-lite-rpc-services", + "solana-rpc-client", + "solana-sdk", + "tokio", + "tracing-subscriber", ] [[package]] @@ -1779,7 +1793,7 @@ dependencies = [ [[package]] name = "geyser-grpc-connector" version = "0.10.1+yellowstone.1.12" -source = "git+https://github.com/blockworks-foundation/geyser-grpc-connector.git?tag=v0.10.3+yellowstone.1.12+solana.1.17.15-hacked-windowsize4#ce6ca26028c4466e0236657a76b9db2cccf4d535" +source = "git+https://github.com/blockworks-foundation/geyser-grpc-connector.git?tag=v0.10.3+yellowstone.1.12+solana.1.17.15-hacked-windowsize-with-broadcast-exit#688e4d241dd18d18f57345d592e803aa673fcd96" dependencies = [ "anyhow", "async-stream", diff --git a/cluster-endpoints/Cargo.toml b/cluster-endpoints/Cargo.toml index 1db2d1dc..f1c91319 100644 --- a/cluster-endpoints/Cargo.toml +++ b/cluster-endpoints/Cargo.toml @@ -9,7 +9,7 @@ license = "AGPL" [dependencies] #geyser-grpc-connector = { path = "../../geyser-grpc-connector" } -geyser-grpc-connector = { tag = "v0.10.3+yellowstone.1.12+solana.1.17.15-hacked-windowsize4", git = "https://github.com/blockworks-foundation/geyser-grpc-connector.git" } +geyser-grpc-connector = { tag = "v0.10.3+yellowstone.1.12+solana.1.17.15-hacked-windowsize-with-broadcast-exit", git = "https://github.com/blockworks-foundation/geyser-grpc-connector.git" } solana-sdk = { workspace = true } solana-rpc-client-api = { workspace = true } diff --git a/cluster-endpoints/src/grpc_multiplex.rs b/cluster-endpoints/src/grpc_multiplex.rs index 9a3ee98e..2d0f05f3 100644 --- a/cluster-endpoints/src/grpc_multiplex.rs +++ b/cluster-endpoints/src/grpc_multiplex.rs @@ -12,10 +12,8 @@ use solana_sdk::commitment_config::CommitmentConfig; use solana_lite_rpc_core::solana_utils::hash_from_str; use solana_lite_rpc_core::structures::block_info::BlockInfo; use std::collections::{BTreeSet, HashMap, HashSet}; -use std::sync::Arc; use std::time::Duration; -use tokio::sync::broadcast::Receiver; -use tokio::sync::Notify; +use tokio::sync::broadcast::{self, Receiver}; use tokio::task::JoinHandle; use tokio::time::{sleep, Instant}; use tracing::debug_span; @@ -31,7 +29,7 @@ use crate::grpc_subscription::from_grpc_block_update; fn create_grpc_multiplex_processed_block_task( grpc_sources: &Vec, block_sender: tokio::sync::mpsc::Sender, - exit_notify: Arc, + mut exit_notify: broadcast::Receiver<()>, ) -> Vec> { const COMMITMENT_CONFIG: CommitmentConfig = CommitmentConfig::processed(); @@ -43,7 +41,7 @@ fn create_grpc_multiplex_processed_block_task( grpc_source.clone(), GeyserFilter(COMMITMENT_CONFIG).blocks_and_txs(), autoconnect_tx.clone(), - exit_notify.clone(), + exit_notify.resubscribe(), ); tasks.push(task); } @@ -51,7 +49,7 @@ fn create_grpc_multiplex_processed_block_task( let jh_merging_streams = tokio::task::spawn(async move { let mut slots_processed = BTreeSet::::new(); let mut last_tick = Instant::now(); - loop { + 'recv_loop: loop { // recv loop if last_tick.elapsed() > Duration::from_millis(800) { warn!( @@ -66,22 +64,29 @@ fn create_grpc_multiplex_processed_block_task( res = blocks_rx.recv() => { res }, - _ = exit_notify.notified() => { + _ = 
exit_notify.recv() => { break; } }; match blocks_rx_result { Some(Message::GeyserSubscribeUpdate(subscribe_update)) => { - let mapfilter = - map_block_from_yellowstone_update(*subscribe_update, COMMITMENT_CONFIG); - if let Some((slot, produced_block)) = mapfilter { - assert_eq!(COMMITMENT_CONFIG, produced_block.commitment_config); + // note: avoid mapping of full block as long as possible + let extracted_slot = extract_slot_from_yellowstone_update(&subscribe_update); + if let Some(slot) = extracted_slot { // check if the slot is in the map, if not check if the container is half full and the slot in question is older than the lowest value // it means that the slot is too old to process - if !slots_processed.contains(&slot) - && (slots_processed.len() < MAX_SIZE / 2 - || slot > slots_processed.first().cloned().unwrap_or_default()) + if slots_processed.contains(&slot) { + continue 'recv_loop; + } + if slots_processed.len() >= MAX_SIZE / 2 + && slot <= slots_processed.first().cloned().unwrap_or_default() { + continue 'recv_loop; + } + + let mapfilter = + map_block_from_yellowstone_update(*subscribe_update, COMMITMENT_CONFIG); + if let Some((_slot, produced_block)) = mapfilter { let send_started_at = Instant::now(); let send_result = block_sender .send(produced_block) @@ -130,7 +135,7 @@ fn create_grpc_multiplex_block_info_task( grpc_sources: &Vec, block_info_sender: tokio::sync::mpsc::Sender, commitment_config: CommitmentConfig, - exit_notify: Arc, + mut exit_notify: broadcast::Receiver<()>, ) -> Vec> { let (autoconnect_tx, mut blocks_rx) = tokio::sync::mpsc::channel(10); let mut tasks = vec![]; @@ -139,7 +144,7 @@ fn create_grpc_multiplex_block_info_task( grpc_source.clone(), GeyserFilter(commitment_config).blocks_meta(), autoconnect_tx.clone(), - exit_notify.clone(), + exit_notify.resubscribe(), ); tasks.push(task); } @@ -151,7 +156,7 @@ fn create_grpc_multiplex_block_info_task( res = blocks_rx.recv() => { res }, - _ = exit_notify.notified() => { + _ = exit_notify.recv() => { break; } }; @@ -263,7 +268,7 @@ pub fn create_grpc_multiplex_blocks_subscription( tokio::sync::mpsc::channel::(500); let (block_info_sender_finalized, mut block_info_reciever_finalized) = tokio::sync::mpsc::channel::(500); - let exit_notify = Arc::new(Notify::new()); + let (exit_sender, exit_notify) = broadcast::channel(1); let processed_block_sender = processed_block_sender.clone(); reconnect_attempts += 1; @@ -280,7 +285,7 @@ pub fn create_grpc_multiplex_blocks_subscription( let processed_blocks_tasks = create_grpc_multiplex_processed_block_task( &grpc_sources, processed_block_sender.clone(), - exit_notify.clone(), + exit_notify.resubscribe(), ); task_list.extend(processed_blocks_tasks); @@ -290,21 +295,21 @@ pub fn create_grpc_multiplex_blocks_subscription( &grpc_sources, block_info_sender_processed.clone(), CommitmentConfig::processed(), - exit_notify.clone(), + exit_notify.resubscribe(), ); task_list.extend(jh_meta_task_processed); let jh_meta_task_confirmed = create_grpc_multiplex_block_info_task( &grpc_sources, block_info_sender_confirmed.clone(), CommitmentConfig::confirmed(), - exit_notify.clone(), + exit_notify.resubscribe(), ); task_list.extend(jh_meta_task_confirmed); let jh_meta_task_finalized = create_grpc_multiplex_block_info_task( &grpc_sources, block_info_sender_finalized.clone(), CommitmentConfig::finalized(), - exit_notify.clone(), + exit_notify, ); task_list.extend(jh_meta_task_finalized); @@ -442,8 +447,12 @@ pub fn create_grpc_multiplex_blocks_subscription( } } } // -- END receiver loop - 
exit_notify.notify_waiters(); - futures::future::join_all(task_list).await; + if exit_sender.send(()).is_ok() { + futures::future::join_all(task_list).await; + } else { + log::error!("Problem sending exit signal"); + task_list.iter().for_each(|x| x.abort()); + } } // -- END reconnect loop }); @@ -474,9 +483,9 @@ pub fn create_grpc_multiplex_processed_slots_subscription( let jh_multiplex_task = tokio::spawn(async move { loop { let (autoconnect_tx, mut slots_rx) = tokio::sync::mpsc::channel(10); - let exit_notify = Arc::new(Notify::new()); + let (exit_sender, exit_notify) = broadcast::channel(1); - let tasks = grpc_sources + let task_list = grpc_sources .clone() .iter() .map(|grpc_source| { @@ -484,7 +493,7 @@ pub fn create_grpc_multiplex_processed_slots_subscription( grpc_source.clone(), GeyserFilter(COMMITMENT_CONFIG).slots(), autoconnect_tx.clone(), - exit_notify.clone(), + exit_notify.resubscribe(), ) }) .collect_vec(); @@ -537,14 +546,29 @@ pub fn create_grpc_multiplex_processed_slots_subscription( } } } // -- END receiver loop - exit_notify.notify_waiters(); - futures::future::join_all(tasks).await; + + if exit_sender.send(()).is_ok() { + futures::future::join_all(task_list).await; + } else { + log::error!("Problem sending exit signal"); + task_list.iter().for_each(|x| x.abort()); + } } // -- END reconnect loop }); (multiplexed_messages_rx, jh_multiplex_task) } +fn extract_slot_from_yellowstone_update(update: &SubscribeUpdate) -> Option { + match &update.update_oneof { + // list is not exhaustive + Some(UpdateOneof::Slot(update_message)) => Some(update_message.slot), + Some(UpdateOneof::BlockMeta(update_message)) => Some(update_message.slot), + Some(UpdateOneof::Block(update_message)) => Some(update_message.slot), + _ => None, + } +} + fn map_slot_from_yellowstone_update(update: SubscribeUpdate) -> Option { match update.update_oneof { Some(UpdateOneof::Slot(update_slot_message)) => Some(update_slot_message.slot), diff --git a/cluster-endpoints/src/grpc_subscription.rs b/cluster-endpoints/src/grpc_subscription.rs index e57b806b..6e6a5228 100644 --- a/cluster-endpoints/src/grpc_subscription.rs +++ b/cluster-endpoints/src/grpc_subscription.rs @@ -36,7 +36,7 @@ use solana_transaction_status::{Reward, RewardType}; use std::cell::OnceCell; use std::collections::HashMap; use std::sync::Arc; -use tokio::sync::Notify; +use tokio::sync::{broadcast, Notify}; use tracing::trace_span; use yellowstone_grpc_client::GeyserGrpcClient; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; @@ -273,12 +273,12 @@ fn map_compute_budget_instructions(message: &VersionedMessage) -> (Option, pub fn create_block_processing_task( grpc_addr: String, grpc_x_token: Option, - block_sx: async_channel::Sender, + block_sx: tokio::sync::mpsc::Sender, commitment_level: CommitmentLevel, - exit_notfier: Arc, + mut exit_notify: broadcast::Receiver<()>, ) -> AnyhowJoinHandle { tokio::spawn(async move { - loop { + 'main_loop: loop { let mut blocks_subs = HashMap::new(); blocks_subs.insert( "block_client".to_string(), @@ -293,7 +293,8 @@ pub fn create_block_processing_task( // connect to grpc let mut client = connect_with_timeout_hacked(grpc_addr.clone(), grpc_x_token.clone()).await?; - let mut stream = client + let mut stream = tokio::select! { + res = client .subscribe_once( HashMap::new(), Default::default(), @@ -304,8 +305,13 @@ pub fn create_block_processing_task( Some(commitment_level), Default::default(), None, - ) - .await?; + ) => { + res? 
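The change recurring throughout this patch swaps the old Arc<Notify> exit signal for a tokio::sync::broadcast channel: one exit_sender, plus one receiver per task obtained with resubscribe() where the code previously cloned the Notify. A minimal sketch of the pattern, with illustrative names rather than code from this repository:

    use std::time::Duration;
    use tokio::sync::broadcast;

    #[tokio::main]
    async fn main() {
        // One sender, any number of receivers; each task takes its own
        // receiver via resubscribe() instead of cloning an Arc<Notify>.
        let (exit_sender, exit_notify) = broadcast::channel::<()>(1);

        let mut worker_rx = exit_notify.resubscribe();
        let worker = tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = tokio::time::sleep(Duration::from_millis(100)) => {
                        // stand-in for the real work (stream recv etc.)
                    }
                    _ = worker_rx.recv() => break, // shutdown seen even if sent earlier
                }
            }
        });

        // Unlike Notify::notify_waiters(), the message is buffered in the
        // channel, so a receiver that is not awaiting right now still sees it.
        exit_sender.send(()).expect("at least one receiver is alive");
        worker.await.unwrap();
    }

resubscribe() hands back a receiver that only observes messages sent after the call, which is why each spawned task takes its receiver before the work starts; and when send() fails because no receivers are left, the hunks in this patch fall back to abort()ing the tasks.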
+ }, + _ = exit_notify.recv() => { + break; + } + }; loop { tokio::select! { @@ -338,8 +344,8 @@ pub fn create_block_processing_task( } }; }, - _ = exit_notfier.notified() => { - break; + _ = exit_notify.recv() => { + break 'main_loop; } } } @@ -348,6 +354,7 @@ pub fn create_block_processing_task( log::error!("Grpc block subscription broken (resubscribing)"); tokio::time::sleep(std::time::Duration::from_secs(1)).await; } + Ok(()) }) } @@ -355,7 +362,7 @@ pub fn create_block_processing_task( pub fn create_slot_stream_task( grpc_addr: String, grpc_x_token: Option, - slot_sx: async_channel::Sender, + slot_sx: tokio::sync::mpsc::Sender, commitment_level: CommitmentLevel, ) -> AnyhowJoinHandle { tokio::spawn(async move { diff --git a/cluster-endpoints/src/rpc_polling/vote_accounts_and_cluster_info_polling.rs b/cluster-endpoints/src/rpc_polling/vote_accounts_and_cluster_info_polling.rs index 12b5c2d2..4d2794c9 100644 --- a/cluster-endpoints/src/rpc_polling/vote_accounts_and_cluster_info_polling.rs +++ b/cluster-endpoints/src/rpc_polling/vote_accounts_and_cluster_info_polling.rs @@ -14,7 +14,6 @@ pub fn poll_cluster_info( loop { match rpc_client.get_cluster_nodes().await { Ok(cluster_nodes) => { - debug!("get cluster_nodes from rpc: {:?}", cluster_nodes.len()); if let Err(e) = contact_info_sender.send(cluster_nodes) { warn!("rpc_cluster_info channel has no receivers {e:?}"); } @@ -23,7 +22,7 @@ pub fn poll_cluster_info( Err(error) => { warn!("rpc_cluster_info failed <{:?}> - retrying", error); // throttle - tokio::time::sleep(Duration::from_secs(2500)).await; + tokio::time::sleep(Duration::from_secs(10)).await; } } } @@ -51,7 +50,7 @@ pub fn poll_vote_accounts( Err(error) => { warn!("rpc_vote_accounts failed <{:?}> - retrying", error); // throttle - tokio::time::sleep(Duration::from_secs(2500)).await; + tokio::time::sleep(Duration::from_secs(10)).await; } } } diff --git a/config.example.json b/config.example.json index 320fbefd..7f67ed02 100644 --- a/config.example.json +++ b/config.example.json @@ -10,7 +10,7 @@ "transaction_retry_after_secs": 3, "quic_proxy_addr": null, "use_grpc": false, - "calculate_leader_schedule_form_geyser": false, + "calculate_leader_schedule_from_geyser": false, "grpc_addr": "http://127.0.0.0:10000", "grpc_x_token": null, "postgres": { diff --git a/core/src/keypair_loader.rs b/core/src/keypair_loader.rs index b37864cd..dcf0d138 100644 --- a/core/src/keypair_loader.rs +++ b/core/src/keypair_loader.rs @@ -2,22 +2,21 @@ use anyhow::Context; use solana_sdk::signature::Keypair; use std::env; -// note this is duplicated from lite-rpc module pub async fn load_identity_keypair( - identity_path: Option, + identity_keyfile_path: Option, ) -> anyhow::Result> { - let identity_str = if let Some(identity_from_cli) = identity_path { - tokio::fs::read_to_string(identity_from_cli) + let identity_jsonarray_str = if let Ok(identity_env_var) = env::var("IDENTITY") { + identity_env_var + } else if let Some(identity_path) = identity_keyfile_path { + tokio::fs::read_to_string(identity_path) .await .context("Cannot find the identity file provided")? 
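The reordered branches in load_identity_keypair above make the IDENTITY environment variable take precedence over the keyfile path (previously the keyfile was tried first). The resulting loader, reassembled as plain Rust for readability (a close paraphrase of the hunk, not new behavior):

    use anyhow::Context;
    use solana_sdk::signature::Keypair;
    use std::env;

    // Lookup order after this patch:
    //   1. IDENTITY env var holding the keypair as a JSON byte array
    //   2. the keyfile path passed in by the caller
    //   3. neither present -> Ok(None)
    pub async fn load_identity_keypair(
        identity_keyfile_path: Option<String>,
    ) -> anyhow::Result<Option<Keypair>> {
        let identity_jsonarray_str = if let Ok(identity_env_var) = env::var("IDENTITY") {
            identity_env_var
        } else if let Some(identity_path) = identity_keyfile_path {
            tokio::fs::read_to_string(identity_path)
                .await
                .context("Cannot find the identity file provided")?
        } else {
            return Ok(None);
        };

        let identity_bytes: Vec<u8> = serde_json::from_str(&identity_jsonarray_str)
            .context("Invalid identity format expected Vec<u8>")?;
        Ok(Some(
            Keypair::from_bytes(identity_bytes.as_slice()).context("Invalid identity")?,
        ))
    }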
- } else if let Ok(identity_env_var) = env::var("IDENTITY") { - identity_env_var } else { return Ok(None); }; - let identity_bytes: Vec = - serde_json::from_str(&identity_str).context("Invalid identity format expected Vec")?; + let identity_bytes: Vec = serde_json::from_str(&identity_jsonarray_str) + .context("Invalid identity format expected Vec")?; Ok(Some( Keypair::from_bytes(identity_bytes.as_slice()).context("Invalid identity")?, diff --git a/core/src/lib.rs b/core/src/lib.rs index 5f236fb4..a1269141 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -8,5 +8,6 @@ pub mod stores; pub mod structures; pub mod traits; pub mod types; +pub mod utils; pub type AnyhowJoinHandle = tokio::task::JoinHandle>; diff --git a/core/src/structures/prioritization_fee_heap.rs b/core/src/structures/prioritization_fee_heap.rs index dd47e1fd..5e24c2c2 100644 --- a/core/src/structures/prioritization_fee_heap.rs +++ b/core/src/structures/prioritization_fee_heap.rs @@ -127,7 +127,7 @@ impl PrioritizationFeesHeap { #[cfg(test)] mod tests { use solana_sdk::signature::Signature; - use std::time::Duration; + use std::{sync::Arc, time::Duration}; use crate::structures::{ prioritization_fee_heap::PrioritizationFeesHeap, transaction_sent_info::SentTransactionInfo, @@ -139,7 +139,7 @@ mod tests { let tx_creator = |signature, prioritization_fee| SentTransactionInfo { signature, slot: 0, - transaction: vec![], + transaction: Arc::new(vec![]), last_valid_block_height: 0, prioritization_fee, }; @@ -205,7 +205,7 @@ mod tests { let info = SentTransactionInfo { signature: Signature::new_unique(), slot: height + 1, - transaction: vec![], + transaction: Arc::new(vec![]), last_valid_block_height: height + 10, prioritization_fee, }; diff --git a/core/src/structures/transaction_sent_info.rs b/core/src/structures/transaction_sent_info.rs index aa62e80e..2f8f5097 100644 --- a/core/src/structures/transaction_sent_info.rs +++ b/core/src/structures/transaction_sent_info.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use solana_sdk::signature::Signature; use solana_sdk::slot_history::Slot; @@ -7,7 +9,7 @@ pub type WireTransaction = Vec; pub struct SentTransactionInfo { pub signature: Signature, pub slot: Slot, - pub transaction: WireTransaction, + pub transaction: Arc, pub last_valid_block_height: u64, pub prioritization_fee: u64, } diff --git a/core/src/utils.rs b/core/src/utils.rs new file mode 100644 index 00000000..1b34fde4 --- /dev/null +++ b/core/src/utils.rs @@ -0,0 +1,33 @@ +use std::time::Duration; + +use log::debug; +use solana_sdk::commitment_config::CommitmentConfig; +use tokio::time::{timeout, Instant}; + +use crate::{structures::block_info::BlockInfo, types::BlockInfoStream}; + +pub async fn wait_till_block_of_commitment_is_recieved( + mut blockinfo_stream: BlockInfoStream, + commitment_config: CommitmentConfig, +) -> BlockInfo { + let started = Instant::now(); + loop { + match timeout(Duration::from_millis(1000), blockinfo_stream.recv()).await { + Ok(Ok(block_info)) => { + if block_info.commitment_config == commitment_config { + return block_info; + } + } + Err(_elapsed) => { + debug!( + "waiting for latest block info ({}) ... 
{:.02}ms", + commitment_config.commitment, + started.elapsed().as_secs_f32() * 1000.0 + ); + } + Ok(Err(error)) => { + panic!("Did not recv block info : {error:?}"); + } + } + } +} diff --git a/examples/custom-tpu-send-transactions/Cargo.toml b/examples/custom-tpu-send-transactions/Cargo.toml index a5090145..9259b9e2 100644 --- a/examples/custom-tpu-send-transactions/Cargo.toml +++ b/examples/custom-tpu-send-transactions/Cargo.toml @@ -10,4 +10,20 @@ edition.workspace = true [dependencies] solana-lite-rpc-services = {workspace = true} -solana-lite-rpc-core = {workspace = true} \ No newline at end of file +solana-lite-rpc-core = {workspace = true} +solana-lite-rpc-cluster-endpoints = {workspace = true} + +solana-sdk = { workspace = true } +solana-rpc-client = { workspace = true } + +tokio = "1.28.2" +clap = { workspace = true } +anyhow = { workspace = true } +dashmap = { workspace = true } +rand = "0.8.5" +rand_chacha = "0.3.1" +log = { workspace = true } +itertools = { workspace = true } +bincode = { workspace = true } +futures = { workspace = true } +tracing-subscriber = { workspace = true } \ No newline at end of file diff --git a/examples/custom-tpu-send-transactions/src/cli.rs b/examples/custom-tpu-send-transactions/src/cli.rs new file mode 100644 index 00000000..dd8a248f --- /dev/null +++ b/examples/custom-tpu-send-transactions/src/cli.rs @@ -0,0 +1,39 @@ +use clap::Parser; + +#[derive(Parser, Debug, Clone)] +#[command(author, version, about, long_about = None)] +pub struct Args { + /// config.json + #[arg(short, long, default_value = "http://127.0.0.1:8899")] + pub rpc_url: String, + + #[arg(short, long)] + pub grpc_url: Option, + + #[arg(short, long)] + pub x_token: Option, + + #[arg(short, long)] + pub transaction_count: Option, + + #[arg(short, long, default_value_t = 1)] + pub number_of_seconds: usize, + + #[arg(short, long)] + pub fee_payer: String, + + #[arg(short, long)] + pub staked_identity: Option, + + #[arg(short, long)] + pub priority_fees: Option, + + #[arg(short = 'a', long, default_value_t = 256)] + pub additional_signers: usize, + + #[arg(short = 'b', long, default_value_t = 0.1)] + pub signers_transfer_balance: f64, + + #[arg(long)] + pub fanout_slots: Option, +} diff --git a/examples/custom-tpu-send-transactions/src/main.rs b/examples/custom-tpu-send-transactions/src/main.rs index 98ecdb01..6264d059 100644 --- a/examples/custom-tpu-send-transactions/src/main.rs +++ b/examples/custom-tpu-send-transactions/src/main.rs @@ -1,3 +1,519 @@ -fn main() { - todo!() +use std::{collections::HashSet, ops::Mul, str::FromStr, sync::Arc, time::Duration}; + +use clap::Parser; +use dashmap::{DashMap, DashSet}; +use itertools::Itertools; +use rand::{ + distributions::{Alphanumeric, Distribution}, + SeedableRng, +}; +use solana_lite_rpc_cluster_endpoints::{ + geyser_grpc_connector::{GrpcConnectionTimeouts, GrpcSourceConfig}, + grpc_subscription::create_grpc_subscription, + json_rpc_leaders_getter::JsonRpcLeaderGetter, + json_rpc_subscription::create_json_rpc_polling_subscription, +}; +use solana_lite_rpc_core::{ + keypair_loader::load_identity_keypair, + stores::{ + block_information_store::{BlockInformation, BlockInformationStore}, + cluster_info_store::ClusterInfo, + data_cache::{DataCache, SlotCache}, + subscription_store::SubscriptionStore, + tx_store::TxStore, + }, + structures::{ + epoch::EpochCache, identity_stakes::IdentityStakes, leaderschedule::CalculatedSchedule, + transaction_sent_info::SentTransactionInfo, + }, + utils::wait_till_block_of_commitment_is_recieved, +}; +use 
solana_lite_rpc_services::{ + data_caching_service::DataCachingService, + quic_connection_utils::QuicConnectionParameters, + tpu_utils::{ + tpu_connection_path::TpuConnectionPath, + tpu_service::{TpuService, TpuServiceConfig}, + }, + transaction_replayer::TransactionReplayer, + transaction_service::TransactionServiceBuilder, + tx_sender::TxSender, +}; +use solana_sdk::{ + commitment_config::CommitmentConfig, + compute_budget, + hash::Hash, + instruction::Instruction, + message::Message, + native_token::LAMPORTS_PER_SOL, + pubkey::Pubkey, + signature::{Keypair, Signature}, + signer::Signer, + system_instruction, + transaction::Transaction, +}; +use tokio::sync::{Mutex, RwLock}; + +use crate::cli::Args; +use solana_rpc_client::nonblocking::rpc_client::RpcClient; + +mod cli; + +const MEMO_PROGRAM_ID: &str = "MemoSq4gqABAXKb96qnH8TysNcWxMyWCqXgDLGmfcHr"; + +pub fn create_memo_tx(msg: &[u8], payer: &Keypair, blockhash: Hash, prio_fees: u64) -> Transaction { + let memo = Pubkey::from_str(MEMO_PROGRAM_ID).unwrap(); + + let cb_1 = compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(7000); + let cb_2 = compute_budget::ComputeBudgetInstruction::set_compute_unit_price(prio_fees); + let instruction = Instruction::new_with_bytes(memo, msg, vec![]); + let message = Message::new(&[cb_1, cb_2, instruction], Some(&payer.pubkey())); + Transaction::new(&[payer], message, blockhash) +} + +pub fn generate_random_strings( + num_of_txs: usize, + random_seed: Option, + n_chars: usize, +) -> Vec> { + let seed = random_seed.map_or(0, |x| x); + let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(seed); + (0..num_of_txs) + .map(|_| Alphanumeric.sample_iter(&mut rng).take(n_chars).collect()) + .collect() +} + +pub async fn create_signers_from_payer( + rpc_client: Arc, + payer: Arc, + nb_signers: usize, + signer_balance: u64, + prio_fees: u64, +) -> Vec> { + let signers = (0..nb_signers) + .map(|_| Arc::new(Keypair::new())) + .collect_vec(); + let mut signers_to_transfer: HashSet = signers.iter().map(|kp| kp.pubkey()).collect(); + + while !signers_to_transfer.is_empty() { + let Ok(blockhash) = rpc_client.get_latest_blockhash().await else { + tokio::time::sleep(Duration::from_secs(1)).await; + continue; + }; + + let cb_1 = compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(5000); + let cb_2 = compute_budget::ComputeBudgetInstruction::set_compute_unit_price(prio_fees); + + let transactions = signers_to_transfer + .iter() + .map(|signer| { + let instruction = + system_instruction::transfer(&payer.pubkey(), signer, signer_balance); + let message = Message::new( + &[cb_1.clone(), cb_2.clone(), instruction], + Some(&payer.pubkey()), + ); + (*signer, Transaction::new(&[&payer], message, blockhash)) + }) + .collect_vec(); + let tasks = transactions + .iter() + .map(|(signer, tx)| { + let rpc_client = rpc_client.clone(); + let tx = tx.clone(); + let signer = *signer; + tokio::spawn( + async move { (signer, rpc_client.send_and_confirm_transaction(&tx).await) }, + ) + }) + .collect_vec(); + + let results = futures::future::join_all(tasks).await; + for result in results { + match result { + Ok((signer, Ok(_signature))) => { + signers_to_transfer.remove(&signer); + } + Ok((signer, Err(e))) => { + log::error!("Error transfering to {signer:?}, {e:?}"); + } + _ => { + // retry + } + } + } + } + signers +} + +pub async fn transfer_back_to_payer( + rpc_client: Arc, + payer: Arc, + signers: Vec>, + prio_fees: u64, +) { + let mut signers_to_transfer: HashSet = signers.iter().map(|kp| kp.pubkey()).collect(); + 
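create_signers_from_payer above and transfer_back_to_payer (continuing below) share the same retry-set shape: hold the still-unconfirmed pubkeys in a HashSet, fire one send_and_confirm_transaction task per key, remove the keys that landed, and loop with a fresh blockhash until the set drains. Abstracted into a sketch, under the assumption that each attempt reports (key, success); this is illustrative, not repository code:

    use std::collections::HashSet;
    use std::future::Future;
    use std::hash::Hash;

    // Retry-set loop: every round attempts all pending keys concurrently
    // and keeps only the failures for the next round.
    pub async fn drain_with_retries<K, F, Fut>(mut pending: HashSet<K>, attempt: F)
    where
        K: Hash + Eq + Clone,
        F: Fn(K) -> Fut,
        Fut: Future<Output = (K, bool)>,
    {
        while !pending.is_empty() {
            let round = pending.iter().cloned().map(|key| attempt(key));
            for (key, confirmed) in futures::future::join_all(round).await {
                if confirmed {
                    pending.remove(&key);
                }
            }
        }
    }

A transient RPC failure simply leaves the key in the set for the next round, which is why the loops in this file refresh the blockhash on every pass.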
let payer_pubkey = payer.pubkey(); + + while !signers_to_transfer.is_empty() { + let Ok(blockhash) = rpc_client.get_latest_blockhash().await else { + tokio::time::sleep(Duration::from_secs(1)).await; + continue; + }; + + let transfers = signers + .iter() + .map(|signer| { + let rpc_client = rpc_client.clone(); + let signer = signer.clone(); + tokio::spawn(async move { + let balance = rpc_client.get_balance(&signer.pubkey()).await.unwrap(); + + let cb_1 = + compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(5000); + let cb_2 = + compute_budget::ComputeBudgetInstruction::set_compute_unit_price(prio_fees); + let balance_to_transfer = balance + .saturating_sub(5000) + .saturating_sub(5000 * prio_fees); + let instruction = system_instruction::transfer( + &signer.pubkey(), + &payer_pubkey, + balance_to_transfer, + ); + let message = Message::new( + &[cb_1.clone(), cb_2.clone(), instruction], + Some(&signer.pubkey()), + ); + ( + signer.pubkey(), + rpc_client + .send_and_confirm_transaction(&Transaction::new( + &[&signer], + message, + blockhash, + )) + .await, + ) + }) + }) + .collect_vec(); + + let results = futures::future::join_all(transfers).await; + for result in results { + match result { + Ok((signer, Ok(_signature))) => { + signers_to_transfer.remove(&signer); + } + Ok((signer, Err(e))) => { + log::error!("Error transfering to {signer:?}, {e:?}"); + } + _ => { + // retry + } + } + } + } +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + let args = Args::parse(); + + let rpc_url = args.rpc_url; + let rpc_client = Arc::new(RpcClient::new(rpc_url)); + + let leader_schedule = Arc::new(JsonRpcLeaderGetter::new(rpc_client.clone(), 1024, 128)); + + let fee_payer = Arc::new( + load_identity_keypair(Some(args.fee_payer)) + .await + .expect("Payer should be set or keypair file not found") + .unwrap(), + ); + + let priority_fee = args.priority_fees.unwrap_or_default(); + let nb_signers = args.additional_signers; + let signer_balance = args.signers_transfer_balance.mul(LAMPORTS_PER_SOL as f64) as u64; + let signers = create_signers_from_payer( + rpc_client.clone(), + fee_payer.clone(), + nb_signers, + signer_balance, + priority_fee, + ) + .await; + + println!( + "Creating {} users with {} SOL balance", + nb_signers, signer_balance + ); + + let validator_identity = Arc::new( + load_identity_keypair(args.staked_identity) + .await? + .unwrap_or_else(Keypair::new), + ); + + // START ALL SERVICES REQUIRED BY LITE_RPC + // setup endpoint, GRPC/RPC Polling + println!("Setting up lite-rpc tpu service"); + let (endpoints, _handles) = if let Some(grpc_addr) = args.grpc_url { + let timeouts = GrpcConnectionTimeouts { + connect_timeout: Duration::from_secs(10), + request_timeout: Duration::from_secs(10), + subscribe_timeout: Duration::from_secs(10), + receive_timeout: Duration::from_secs(10), + }; + create_grpc_subscription( + rpc_client.clone(), + vec![GrpcSourceConfig::new( + grpc_addr, + args.x_token.clone(), + None, + timeouts, + )], + vec![], + )? + } else { + create_json_rpc_polling_subscription(rpc_client.clone(), 100)? 
+ }; + + let finalized_block_information = wait_till_block_of_commitment_is_recieved( + endpoints.blockinfo_notifier.resubscribe(), + CommitmentConfig::finalized(), + ) + .await; + + let block_height = rpc_client + .get_block_height_with_commitment(CommitmentConfig::finalized()) + .await?; + let (blockhash, _) = rpc_client + .get_latest_blockhash_with_commitment(CommitmentConfig::finalized()) + .await?; + let finalize_slot = finalized_block_information.slot; + + println!( + "finalized blockheight : {:?}, slot: {}, hash: {}", + finalized_block_information.block_height, + finalized_block_information.slot, + finalized_block_information.blockhash + ); + println!( + "From RPC blockheight : {block_height:?}, hash: {}", + blockhash + ); + + let finalized_block_information = BlockInformation { + slot: finalized_block_information.slot, + block_height, + last_valid_blockheight: finalized_block_information.block_height + 300, + cleanup_slot: finalized_block_information.slot + 1000000, + blockhash: finalized_block_information.blockhash, + commitment_config: CommitmentConfig::finalized(), + block_time: 0, + }; + + let block_information_store = BlockInformationStore::new(finalized_block_information); + + let data_cache = DataCache { + block_information_store, + cluster_info: ClusterInfo::default(), + identity_stakes: IdentityStakes::new(validator_identity.pubkey()), + slot_cache: SlotCache::new(finalize_slot), + tx_subs: SubscriptionStore::default(), + txs: TxStore { + store: Arc::new(DashMap::new()), + }, + epoch_data: EpochCache::new_for_tests(), + leader_schedule: Arc::new(RwLock::new(CalculatedSchedule::default())), + }; + + let data_cache_service = DataCachingService { + data_cache: data_cache.clone(), + clean_duration: Duration::from_secs(120), + }; + + // start listning the cluster data and filling the cache + data_cache_service.listen( + endpoints.blocks_notifier.resubscribe(), + endpoints.blockinfo_notifier, + endpoints.slot_notifier.resubscribe(), + endpoints.cluster_info_notifier, + endpoints.vote_account_notifier, + ); + + let count = args.transaction_count.unwrap_or(10); + let prioritization_heap_size = Some(count * args.number_of_seconds); + + let tpu_config = TpuServiceConfig { + fanout_slots: args.fanout_slots.unwrap_or(16), + maximum_transaction_in_queue: 2000000, + quic_connection_params: QuicConnectionParameters { + connection_timeout: Duration::from_secs(60), + connection_retry_count: 10, + finalize_timeout: Duration::from_millis(10000), + max_number_of_connections: 4, + unistream_timeout: Duration::from_millis(1000), + write_timeout: Duration::from_secs(10), + number_of_transactions_per_unistream: 1, + unistreams_to_create_new_connection_in_percentage: 5, + prioritization_heap_size, + }, + tpu_connection_path: TpuConnectionPath::QuicDirectPath, + }; + + let tpu_service: TpuService = TpuService::new( + tpu_config, + validator_identity, + leader_schedule, + data_cache.clone(), + ) + .await?; + let transaction_service_builder = TransactionServiceBuilder::new( + TxSender::new(data_cache.clone(), tpu_service.clone()), + TransactionReplayer::new( + tpu_service.clone(), + data_cache.clone(), + Duration::from_secs(1), + ), + tpu_service, + 10000, + ); + let (transaction_service, _) = transaction_service_builder.start( + None, + data_cache.block_information_store.clone(), + 10, + endpoints.slot_notifier, + ); + + // CREATE TRANSACTIONS + log::info!("Creating memo transactions"); + + let memo_msgs = generate_random_strings(count * args.number_of_seconds, None, 5); + + let mut tx_to_confirm = 
vec![]; + let mut second = 1; + let mut signer_count = 0; + let map_of_signature = Arc::new(DashSet::::new()); + let transactions_in_blocks = Arc::new(Mutex::new(Vec::::new())); + let mut block_stream = endpoints.blocks_notifier; + + let block_tps_task = { + let map_of_signature = map_of_signature.clone(); + let transactions_in_blocks = transactions_in_blocks.clone(); + tokio::spawn(async move { + let mut start_tracking = false; + while let Ok(block) = block_stream.recv().await { + let mut count = 0; + for transaction in &block.transactions { + if map_of_signature.contains(&transaction.signature) { + count += 1; + map_of_signature.remove(&transaction.signature); + } + } + + // start tracking once we have first block with some transactions sent by us + if start_tracking || count > 0 { + start_tracking = true; + let mut lk = transactions_in_blocks.lock().await; + lk.push(count); + } + } + }) + }; + + for chunk in memo_msgs.chunks(count) { + let instant = tokio::time::Instant::now(); + let mut current_txs = vec![]; + println!("Sending memo transactions :{}", second); + second += 1; + let bh = data_cache + .block_information_store + .get_latest_blockhash(CommitmentConfig::finalized()) + .await; + let last_valid_block_height = + data_cache.block_information_store.get_last_blockheight() + 300; + let transactions = chunk + .iter() + .map(|x| { + signer_count += 1; + create_memo_tx(x, &signers[signer_count % nb_signers], bh, priority_fee) + }) + .collect_vec(); + + for transaction in transactions { + let signature = transaction.signatures[0]; + let raw_tx = bincode::serialize(&transaction).unwrap(); + let slot = data_cache.slot_cache.get_current_slot(); + map_of_signature.insert(signature); + + let transaction_info = SentTransactionInfo { + signature, + last_valid_block_height, + slot, + transaction: Arc::new(raw_tx), + prioritization_fee: priority_fee, + }; + let _ = transaction_service + .transaction_channel + .send(transaction_info.clone()) + .await; + current_txs.push(signature); + } + + tx_to_confirm.push(current_txs); + + let millis = instant.elapsed().as_millis() as u64; + if millis < 1000 { + tokio::time::sleep(Duration::from_millis(1000 - millis)).await; + } else { + println!("took {millis:?} millis to send {count:?} transactions"); + } + } + + println!( + "{} memo transactions sent, waiting for a minute to confirm them", + count * args.number_of_seconds + ); + + tokio::time::sleep(Duration::from_secs(120)).await; + + let mut second = 1; + for seconds_sigs in tx_to_confirm { + let mut tx_confirmed = 0; + for sig in seconds_sigs { + if data_cache.txs.is_transaction_confirmed(&sig) { + tx_confirmed += 1; + } + } + + println!( + "{} or {} transactions were confirmed for the {} second", + tx_confirmed, count, second + ); + second += 1; + } + + block_tps_task.abort(); + + let lk = transactions_in_blocks.lock().await; + // stop tracking by removing trailling 0s + let mut vec = lk.clone(); + vec.reverse(); + let mut transaction_blocks = vec.iter().skip_while(|x| **x == 0).cloned().collect_vec(); + transaction_blocks.reverse(); + println!( + "BLOCKS transactions : {}", + transaction_blocks.iter().map(|x| x.to_string()).join(", ") + ); + let sum = transaction_blocks.iter().sum::(); + let seconds = (transaction_blocks.len() * 400 / 1000) as u64; + let tps = sum / seconds; + println!("EFFECTIVE TPS: {tps:?}"); + + println!("Transfering remaining lamports to payer"); + transfer_back_to_payer(rpc_client, fee_payer, signers, priority_fee).await; + Ok(()) } diff --git a/lite-rpc/src/bridge.rs 
b/lite-rpc/src/bridge.rs index c77543c9..26fb5ce7 100644 --- a/lite-rpc/src/bridge.rs +++ b/lite-rpc/src/bridge.rs @@ -377,7 +377,7 @@ impl LiteRpcServer for LiteBridge { match self .transaction_service - .send_transaction(raw_tx, max_retries) + .send_wire_transaction(raw_tx, max_retries) .await { Ok(sig) => { diff --git a/lite-rpc/src/cli.rs b/lite-rpc/src/cli.rs index b8c334ca..ed20ea35 100644 --- a/lite-rpc/src/cli.rs +++ b/lite-rpc/src/cli.rs @@ -49,7 +49,7 @@ pub struct Config { #[serde(default)] pub use_grpc: bool, #[serde(default)] - pub calculate_leader_schedule_form_geyser: bool, + pub calculate_leader_schedule_from_geyser: bool, #[serde(default = "Config::default_grpc_addr")] pub grpc_addr: String, #[serde(default)] @@ -142,11 +142,8 @@ impl Config { .map(|size| size.parse().unwrap()) .unwrap_or(config.fanout_size); - // IDENTITY env sets value of identity_keypair - - // config.identity_keypair = env::var("IDENTITY") - // .map(Some) - // .unwrap_or(config.identity_keypair); + // note: identity config is handled in load_identity_keypair + // the behavior is different from the other config values as it does either take a file path or the keypair as json array config.prometheus_addr = env::var("PROMETHEUS_ADDR").unwrap_or(config.prometheus_addr); @@ -161,7 +158,7 @@ impl Config { config.quic_proxy_addr = env::var("QUIC_PROXY_ADDR").ok(); config.use_grpc = env::var("USE_GRPC") - .map(|_| true) + .map(|value| value.parse::().unwrap()) .unwrap_or(config.use_grpc); // source 1 diff --git a/lite-rpc/src/main.rs b/lite-rpc/src/main.rs index 36746091..4804c2ab 100644 --- a/lite-rpc/src/main.rs +++ b/lite-rpc/src/main.rs @@ -11,7 +11,7 @@ use lite_rpc::postgres_logger::PostgresLogger; use lite_rpc::service_spawner::ServiceSpawner; use lite_rpc::start_server::start_servers; use lite_rpc::DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE; -use log::{debug, info}; +use log::info; use solana_lite_rpc_accounts::account_service::AccountService; use solana_lite_rpc_accounts::account_store_interface::AccountStorageInterface; use solana_lite_rpc_accounts::inmemory_account_store::InmemoryAccountStore; @@ -44,7 +44,8 @@ use solana_lite_rpc_core::structures::{ epoch::EpochCache, identity_stakes::IdentityStakes, notifications::NotificationSender, }; use solana_lite_rpc_core::traits::address_lookup_table_interface::AddressLookupTableInterface; -use solana_lite_rpc_core::types::{BlockInfoStream, BlockStream}; +use solana_lite_rpc_core::types::BlockStream; +use solana_lite_rpc_core::utils::wait_till_block_of_commitment_is_recieved; use solana_lite_rpc_core::AnyhowJoinHandle; use solana_lite_rpc_prioritization_fees::account_prio_service::AccountPrioService; use solana_lite_rpc_services::data_caching_service::DataCachingService; @@ -54,7 +55,6 @@ use solana_lite_rpc_services::transaction_replayer::TransactionReplayer; use solana_lite_rpc_services::tx_sender::TxSender; use lite_rpc::postgres_logger; -use solana_lite_rpc_core::structures::block_info::BlockInfo; use solana_lite_rpc_prioritization_fees::start_block_priofees_task; use solana_lite_rpc_util::obfuscate_rpcurl; use solana_rpc_client::nonblocking::rpc_client::RpcClient; @@ -67,7 +67,6 @@ use std::time::Duration; use tokio::io::AsyncReadExt; use tokio::sync::mpsc; use tokio::sync::RwLock; -use tokio::time::{timeout, Instant}; use tracing_subscriber::fmt::format::FmtSpan; use tracing_subscriber::EnvFilter; @@ -76,38 +75,6 @@ use tracing_subscriber::EnvFilter; #[global_allocator] static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; -// export 
_RJEM_MALLOC_CONF=prof:true,lg_prof_interval:30,lg_prof_sample:21,prof_prefix:/tmp/jeprof - -use jemalloc_ctl::{epoch, stats}; -use std::{thread, time::{self}}; - - -async fn get_latest_block_info( - mut blockinfo_stream: BlockInfoStream, - commitment_config: CommitmentConfig, -) -> BlockInfo { - let started = Instant::now(); - loop { - match timeout(Duration::from_millis(500), blockinfo_stream.recv()).await { - Ok(Ok(block_info)) => { - if block_info.commitment_config == commitment_config { - return block_info; - } - } - Err(_elapsed) => { - debug!( - "waiting for latest block info ({}) ... {:.02}ms", - commitment_config.commitment, - started.elapsed().as_secs_f32() * 1000.0 - ); - } - Ok(Err(_error)) => { - panic!("Did not recv block info"); - } - } - } -} - pub async fn start_postgres( config: Option, ) -> anyhow::Result<(Option, AnyhowJoinHandle)> { @@ -261,7 +228,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc) -> anyhow: }; info!("Waiting for first finalized block info..."); - let finalized_block_info = get_latest_block_info( + let finalized_block_info = wait_till_block_of_commitment_is_recieved( blockinfo_notifier.resubscribe(), CommitmentConfig::finalized(), ) @@ -345,7 +312,6 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc) -> anyhow: }; let spawner = ServiceSpawner { - prometheus_addr, data_cache: data_cache.clone(), }; //init grpc leader schedule and vote account is configured. @@ -370,7 +336,8 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc) -> anyhow: slot_notifier.resubscribe(), ); - let support_service = tokio::spawn(async move { spawner.spawn_support_services().await }); + let support_service = + tokio::spawn(async move { spawner.spawn_support_services(prometheus_addr).await }); let history = History::new(); diff --git a/lite-rpc/src/service_spawner.rs b/lite-rpc/src/service_spawner.rs index 0be16a1d..ff2de631 100644 --- a/lite-rpc/src/service_spawner.rs +++ b/lite-rpc/src/service_spawner.rs @@ -17,15 +17,14 @@ use solana_lite_rpc_services::{ use std::time::Duration; pub struct ServiceSpawner { - pub prometheus_addr: String, pub data_cache: DataCache, } impl ServiceSpawner { /// spawn services that support the whole system - pub async fn spawn_support_services(&self) -> anyhow::Result<()> { + pub async fn spawn_support_services(&self, prometheus_addr: String) -> anyhow::Result<()> { // spawn prometheus - let prometheus = PrometheusSync::sync(self.prometheus_addr.clone()); + let prometheus = PrometheusSync::sync(prometheus_addr.clone()); // spawn metrics capture let metrics = MetricsCapture::new(self.data_cache.txs.clone()).capture(); diff --git a/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs b/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs index 2033dd95..b9f61914 100644 --- a/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs +++ b/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs @@ -61,6 +61,7 @@ const QUIC_CONNECTION_PARAMS: QuicConnectionParameters = QuicConnectionParameter write_timeout: Duration::from_secs(2), number_of_transactions_per_unistream: 10, unistreams_to_create_new_connection_in_percentage: 10, + prioritization_heap_size: None, }; #[test] @@ -743,7 +744,7 @@ pub fn build_raw_sample_tx(i: u32) -> SentTransactionInfo { let tx = build_sample_tx(&payer_keypair, i); let transaction = - bincode::serialize::(&tx).expect("failed to serialize tx"); + Arc::new(bincode::serialize::(&tx).expect("failed to serialize 
tx")); SentTransactionInfo { signature: *tx.get_signature(), diff --git a/quic-forward-proxy/src/main.rs b/quic-forward-proxy/src/main.rs index 022ed827..fc48a841 100644 --- a/quic-forward-proxy/src/main.rs +++ b/quic-forward-proxy/src/main.rs @@ -35,8 +35,10 @@ pub async fn main() -> anyhow::Result<()> { dotenv().ok(); let proxy_listener_addr = proxy_listen_addr.parse().unwrap(); - let validator_identity = - ValidatorIdentity::new(load_identity_keypair(Some(identity_keypair)).await?); + + let validator_identity = ValidatorIdentity::new( + load_identity_keypair(Some(identity_keypair).filter(|s| !s.is_empty())).await?, + ); let tls_config = Arc::new(SelfSignedTlsConfigProvider::new_singleton_self_signed_localhost()); let main_services = QuicForwardProxy::new(proxy_listener_addr, tls_config, validator_identity) diff --git a/services/src/quic_connection.rs b/services/src/quic_connection.rs index 3678307f..9111c52e 100644 --- a/services/src/quic_connection.rs +++ b/services/src/quic_connection.rs @@ -4,7 +4,7 @@ use crate::quic_connection_utils::{ use futures::FutureExt; use log::warn; use prometheus::{core::GenericGauge, opts, register_int_gauge}; -use quinn::{Connection, Endpoint}; +use quinn::{Connection, Endpoint, VarInt}; use solana_lite_rpc_core::structures::rotating_queue::RotatingQueue; use solana_sdk::pubkey::Pubkey; use std::{ @@ -14,7 +14,7 @@ use std::{ Arc, }, }; -use tokio::sync::{Notify, OwnedSemaphorePermit, RwLock, Semaphore}; +use tokio::sync::{broadcast, OwnedSemaphorePermit, RwLock, Semaphore}; pub type EndpointPool = RotatingQueue; @@ -40,7 +40,6 @@ pub struct QuicConnection { identity: Pubkey, socket_address: SocketAddr, connection_params: QuicConnectionParameters, - exit_notify: Arc, timeout_counters: Arc, has_connected_once: Arc, } @@ -51,7 +50,6 @@ impl QuicConnection { endpoint: Endpoint, socket_address: SocketAddr, connection_params: QuicConnectionParameters, - exit_notify: Arc, ) -> Self { Self { connection: Arc::new(RwLock::new(None)), @@ -60,13 +58,16 @@ impl QuicConnection { identity, socket_address, connection_params, - exit_notify, timeout_counters: Arc::new(AtomicU64::new(0)), has_connected_once: Arc::new(AtomicBool::new(false)), } } - async fn connect(&self, is_already_connected: bool) -> Option { + async fn connect( + &self, + is_already_connected: bool, + exit_notify: broadcast::Receiver<()>, + ) -> Option { QuicConnectionUtils::connect( self.identity, is_already_connected, @@ -74,12 +75,12 @@ impl QuicConnection { self.socket_address, self.connection_params.connection_timeout, self.connection_params.connection_retry_count, - self.exit_notify.clone(), + exit_notify, ) .await } - pub async fn get_connection(&self) -> Option { + pub async fn get_connection(&self, exit_notify: broadcast::Receiver<()>) -> Option { // get new connection reset if necessary let last_stable_id = self.last_stable_id.load(Ordering::Relaxed) as usize; let conn = self.connection.read().await.clone(); @@ -95,7 +96,7 @@ impl QuicConnection { Some(connection) } else { NB_QUIC_CONNECTION_RESET.inc(); - let new_conn = self.connect(true).await; + let new_conn = self.connect(true, exit_notify).await; if let Some(new_conn) = new_conn { *conn = Some(new_conn); conn.clone() @@ -116,7 +117,7 @@ impl QuicConnection { // connection has recently been established/ just use it return (*lk).clone(); } - let connection = self.connect(false).await; + let connection = self.connect(false, exit_notify).await; *lk = connection.clone(); self.has_connected_once.store(true, Ordering::Relaxed); connection @@ 
-124,17 +125,16 @@ impl QuicConnection { } } - pub async fn send_transaction(&self, tx: Vec) { + pub async fn send_transaction(&self, tx: &Vec, mut exit_notify: broadcast::Receiver<()>) { let connection_retry_count = self.connection_params.connection_retry_count; for _ in 0..connection_retry_count { let mut do_retry = false; - let exit_notify = self.exit_notify.clone(); let connection = tokio::select! { - conn = self.get_connection() => { + conn = self.get_connection(exit_notify.resubscribe()) => { conn }, - _ = exit_notify.notified() => { + _ = exit_notify.recv() => { break; } }; @@ -149,7 +149,7 @@ impl QuicConnection { ) => { res }, - _ = exit_notify.notified() => { + _ = exit_notify.recv() => { break; } }; @@ -158,13 +158,13 @@ impl QuicConnection { let write_add_result = tokio::select! { res = QuicConnectionUtils::write_all( send_stream, - &tx, + tx, self.identity, self.connection_params, ) => { res }, - _ = exit_notify.notified() => { + _ = exit_notify.recv() => { break; } }; @@ -225,6 +225,13 @@ impl QuicConnection { None => false, } } + + pub async fn close(&self) { + let lk = self.connection.read().await; + if let Some(connection) = lk.as_ref() { + connection.close(VarInt::from_u32(0), b"Not needed"); + } + } } #[derive(Clone)] @@ -247,7 +254,6 @@ impl QuicConnectionPool { endpoints: EndpointPool, socket_address: SocketAddr, connection_parameters: QuicConnectionParameters, - exit_notify: Arc, nb_connection: usize, max_number_of_unistream_connection: usize, ) -> Self { @@ -259,7 +265,6 @@ impl QuicConnectionPool { endpoints.get().expect("Should get and endpoint"), socket_address, connection_parameters, - exit_notify.clone(), )); } Self { @@ -321,4 +326,10 @@ impl QuicConnectionPool { pub fn is_empty(&self) -> bool { self.connections.is_empty() } + + pub async fn close_all(&self) { + for connection in &self.connections { + connection.close().await; + } + } } diff --git a/services/src/quic_connection_utils.rs b/services/src/quic_connection_utils.rs index cc3a1da0..766dbe4c 100644 --- a/services/src/quic_connection_utils.rs +++ b/services/src/quic_connection_utils.rs @@ -14,7 +14,7 @@ use std::{ sync::Arc, time::Duration, }; -use tokio::{sync::Notify, time::timeout}; +use tokio::{sync::broadcast, time::timeout}; lazy_static::lazy_static! { static ref NB_QUIC_0RTT_ATTEMPTED: GenericGauge = @@ -45,6 +45,31 @@ lazy_static::lazy_static! 
{ static ref NB_QUIC_FINISH_ERRORED: GenericGauge = register_int_gauge!(opts!("literpc_quic_finish_errored", "Number of times finish errored")).unwrap(); + + static ref NB_QUIC_CONNECTION_ERROR_VERSION_MISMATCH: GenericGauge = + register_int_gauge!(opts!("literpc_quic_connection_error_version_mismatch", "Number of times connection errored VersionMismatch")).unwrap(); + static ref NB_QUIC_CONNECTION_ERROR_TRANSPORT_ERROR: GenericGauge = + register_int_gauge!(opts!("literpc_quic_connection_error_transport_error", "Number of times connection errored TransportError")).unwrap(); + static ref NB_QUIC_CONNECTION_ERROR_CONNECTION_CLOSED: GenericGauge = + register_int_gauge!(opts!("literpc_quic_connection_error_connection_closed", "Number of times connection errored ConnectionClosed")).unwrap(); + static ref NB_QUIC_CONNECTION_ERROR_APPLICATION_CLOSED: GenericGauge = + register_int_gauge!(opts!("literpc_quic_connection_error_application_closed", "Number of times connection errored ApplicationClosed")).unwrap(); + static ref NB_QUIC_CONNECTION_ERROR_RESET: GenericGauge = + register_int_gauge!(opts!("literpc_quic_connection_error_reset", "Number of times connection errored Reset")).unwrap(); + static ref NB_QUIC_CONNECTION_ERROR_TIMEDOUT: GenericGauge = + register_int_gauge!(opts!("literpc_quic_connection_error_timed_out", "Number of times connection errored TimedOut")).unwrap(); + static ref NB_QUIC_CONNECTION_ERROR_LOCALLY_CLOSED: GenericGauge = + register_int_gauge!(opts!("literpc_quic_connection_error_locally_closed", "Number of times connection errored locally closed")).unwrap(); + + static ref NB_QUIC_WRITE_ERROR_STOPPED: GenericGauge = + register_int_gauge!(opts!("literpc_quic_write_error_stopped", "Number of times write_error Stopped")).unwrap(); + static ref NB_QUIC_WRITE_ERROR_CONNECTION_LOST: GenericGauge = + register_int_gauge!(opts!("literpc_quic_write_error_connection_lost", "Number of times write_error ConnectionLost")).unwrap(); + static ref NB_QUIC_WRITE_ERROR_UNKNOWN_STREAM: GenericGauge = + register_int_gauge!(opts!("literpc_quic_write_error_unknown_stream", "Number of times write_error UnknownStream")).unwrap(); + static ref NB_QUIC_WRITE_ERROR_0RTT_REJECT: GenericGauge = + register_int_gauge!(opts!("literpc_quic_write_error_0RTT_reject", "Number of times write_error ZeroRttRejected")).unwrap(); + static ref NB_QUIC_CONNECTIONS: GenericGauge = register_int_gauge!(opts!("literpc_nb_active_quic_connections", "Number of quic connections open")).unwrap(); @@ -83,19 +108,21 @@ pub struct QuicConnectionParameters { pub max_number_of_connections: usize, pub number_of_transactions_per_unistream: usize, pub unistreams_to_create_new_connection_in_percentage: u8, + pub prioritization_heap_size: Option, } impl Default for QuicConnectionParameters { fn default() -> Self { Self { - connection_timeout: Duration::from_millis(10000), + connection_timeout: Duration::from_millis(60000), unistream_timeout: Duration::from_millis(10000), write_timeout: Duration::from_millis(10000), - finalize_timeout: Duration::from_millis(10000), + finalize_timeout: Duration::from_millis(20000), connection_retry_count: 20, max_number_of_connections: 8, number_of_transactions_per_unistream: 1, unistreams_to_create_new_connection_in_percentage: 10, + prioritization_heap_size: None, } } } @@ -167,6 +194,25 @@ impl QuicConnectionUtils { } Err(e) => { NB_QUIC_CONNECTION_ERRORED.inc(); + match &e { + ConnectionError::VersionMismatch => { + NB_QUIC_CONNECTION_ERROR_VERSION_MISMATCH.inc() + } + 
ConnectionError::TransportError(_) => { + NB_QUIC_CONNECTION_ERROR_TRANSPORT_ERROR.inc() + } + ConnectionError::ConnectionClosed(_) => { + NB_QUIC_CONNECTION_ERROR_CONNECTION_CLOSED.inc() + } + ConnectionError::ApplicationClosed(_) => { + NB_QUIC_CONNECTION_ERROR_APPLICATION_CLOSED.inc() + } + ConnectionError::Reset => NB_QUIC_CONNECTION_ERROR_RESET.inc(), + ConnectionError::TimedOut => NB_QUIC_CONNECTION_ERROR_TIMEDOUT.inc(), + ConnectionError::LocallyClosed => { + NB_QUIC_CONNECTION_ERROR_LOCALLY_CLOSED.inc() + } + } Err(e.into()) } }, @@ -219,7 +265,7 @@ impl QuicConnectionUtils { addr: SocketAddr, connection_timeout: Duration, connection_retry_count: usize, - exit_notified: Arc, + mut exit_notified: broadcast::Receiver<()>, ) -> Option { for _ in 0..connection_retry_count { let conn = if already_connected { @@ -228,7 +274,7 @@ impl QuicConnectionUtils { res = Self::make_connection_0rtt(endpoint.clone(), addr, connection_timeout) => { res }, - _ = exit_notified.notified() => { + _ = exit_notified.recv() => { break; } } @@ -238,7 +284,7 @@ impl QuicConnectionUtils { res = Self::make_connection(endpoint.clone(), addr, connection_timeout) => { res }, - _ = exit_notified.notified() => { + _ = exit_notified.recv() => { break; } } @@ -271,6 +317,17 @@ impl QuicConnectionUtils { match write_timeout_res { Ok(write_res) => { if let Err(e) = write_res { + match &e { + quinn::WriteError::Stopped(_) => NB_QUIC_WRITE_ERROR_STOPPED.inc(), + quinn::WriteError::ConnectionLost(_) => { + NB_QUIC_WRITE_ERROR_CONNECTION_LOST.inc() + } + quinn::WriteError::UnknownStream => { + NB_QUIC_WRITE_ERROR_UNKNOWN_STREAM.inc() + } + quinn::WriteError::ZeroRttRejected => NB_QUIC_WRITE_ERROR_0RTT_REJECT.inc(), + }; + trace!( "Error while writing transaction for {}, error {}", identity, @@ -295,6 +352,16 @@ impl QuicConnectionUtils { match finish_timeout_res { Ok(finish_res) => { if let Err(e) = finish_res { + match &e { + quinn::WriteError::Stopped(_) => NB_QUIC_WRITE_ERROR_STOPPED.inc(), + quinn::WriteError::ConnectionLost(_) => { + NB_QUIC_WRITE_ERROR_CONNECTION_LOST.inc() + } + quinn::WriteError::UnknownStream => { + NB_QUIC_WRITE_ERROR_UNKNOWN_STREAM.inc() + } + quinn::WriteError::ZeroRttRejected => NB_QUIC_WRITE_ERROR_0RTT_REJECT.inc(), + }; trace!( "Error while finishing transaction for {}, error {}", identity, diff --git a/services/src/tpu_utils/quic_proxy_connection_manager.rs b/services/src/tpu_utils/quic_proxy_connection_manager.rs index 81982459..e62479fc 100644 --- a/services/src/tpu_utils/quic_proxy_connection_manager.rs +++ b/services/src/tpu_utils/quic_proxy_connection_manager.rs @@ -179,6 +179,7 @@ impl QuicProxyConnectionManager { transaction, .. }) => { + let transaction = transaction.as_ref().clone(); TxData::new(signature, transaction) }, Err(e) => { @@ -195,6 +196,7 @@ impl QuicProxyConnectionManager { transaction, .. 
}) => { + let transaction = transaction.as_ref().clone(); txs.push(TxData::new(signature, transaction)); }, Err(TryRecvError::Empty) => { diff --git a/services/src/tpu_utils/tpu_connection_manager.rs b/services/src/tpu_utils/tpu_connection_manager.rs index 44cae9c0..fa211a56 100644 --- a/services/src/tpu_utils/tpu_connection_manager.rs +++ b/services/src/tpu_utils/tpu_connection_manager.rs @@ -15,7 +15,7 @@ use solana_sdk::pubkey::Pubkey; use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams; use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration}; use tokio::sync::{ - broadcast::{Receiver, Sender}, + broadcast::{self, Receiver, Sender}, Notify, }; @@ -48,7 +48,7 @@ struct ActiveConnection { tpu_address: SocketAddr, data_cache: DataCache, connection_parameters: QuicConnectionParameters, - exit_notifier: Arc, + exit_notifier: broadcast::Sender<()>, } impl ActiveConnection { @@ -59,13 +59,14 @@ impl ActiveConnection { data_cache: DataCache, connection_parameters: QuicConnectionParameters, ) -> Self { + let (exit_notifier, _) = broadcast::channel(1); Self { endpoints, tpu_address, identity, data_cache, connection_parameters, - exit_notifier: Arc::new(Notify::new()), + exit_notifier, } } @@ -79,7 +80,6 @@ impl ActiveConnection { let fill_notify = Arc::new(Notify::new()); let identity = self.identity; - let exit_notifier = self.exit_notifier.clone(); NB_QUIC_ACTIVE_CONNECTIONS.inc(); @@ -95,18 +95,20 @@ impl ActiveConnection { self.endpoints.clone(), addr, self.connection_parameters, - exit_notifier.clone(), max_number_of_connections, max_uni_stream_connections, ); - - let priorization_heap = PrioritizationFeesHeap::new(2 * max_uni_stream_connections); + let prioritization_heap_size = self + .connection_parameters + .prioritization_heap_size + .unwrap_or(2 * max_uni_stream_connections); + let priorization_heap = PrioritizationFeesHeap::new(prioritization_heap_size); let heap_filler_task = { let priorization_heap = priorization_heap.clone(); let data_cache = self.data_cache.clone(); let fill_notify = fill_notify.clone(); - let exit_notifier = exit_notifier.clone(); + let mut exit_notifier = self.exit_notifier.subscribe(); tokio::spawn(async move { let mut current_blockheight = data_cache.block_information_store.get_last_blockheight(); @@ -115,7 +117,7 @@ impl ActiveConnection { tx = transaction_reciever.recv() => { tx }, - _ = exit_notifier.notified() => { + _ = exit_notifier.recv() => { break; } }; @@ -162,12 +164,14 @@ impl ActiveConnection { if let Ok(PooledConnection { connection, permit }) = connection_pool.get_pooled_connection().await { + let exit_notifier = self.exit_notifier.subscribe(); tokio::task::spawn(async move { let _permit = permit; - connection.get_connection().await; + connection.get_connection(exit_notifier).await; }); }; + let mut exit_notifier = self.exit_notifier.subscribe(); 'main_loop: loop { tokio::select! 
{ _ = fill_notify.notified() => { @@ -194,6 +198,7 @@ impl ActiveConnection { break; }, }; + let exit_notifier = self.exit_notifier.subscribe(); tokio::spawn(async move { // permit will be used to send all the transaction and then destroyed @@ -202,13 +207,13 @@ impl ActiveConnection { NB_QUIC_TASKS.inc(); - connection.send_transaction(tx.transaction).await; + connection.send_transaction(tx.transaction.as_ref(), exit_notifier).await; timer.observe_duration(); NB_QUIC_TASKS.dec(); }); } }, - _ = exit_notifier.notified() => { + _ = exit_notifier.recv() => { break 'main_loop; } } @@ -218,6 +223,7 @@ impl ActiveConnection { let elements_removed = priorization_heap.clear().await; TRANSACTIONS_IN_HEAP.sub(elements_removed as i64); NB_QUIC_ACTIVE_CONNECTIONS.dec(); + connection_pool.close_all().await; } pub fn start_listening( @@ -243,9 +249,9 @@ impl TpuConnectionManager { pub async fn new( certificate: rustls::Certificate, key: rustls::PrivateKey, - fanout: usize, + _fanout: usize, ) -> Self { - let number_of_clients = fanout * 4; + let number_of_clients = 1; // fanout * 4; Self { endpoints: RotatingQueue::new(number_of_clients, || { QuicConnectionUtils::create_endpoint(certificate.clone(), key.clone()) @@ -286,7 +292,7 @@ impl TpuConnectionManager { if !connections_to_keep.contains_key(key) { trace!("removing a connection for {}", key.to_string()); // ignore error for exit channel - value.exit_notifier.notify_waiters(); + let _ = value.exit_notifier.send(()); false } else { true diff --git a/services/src/tpu_utils/tpu_service.rs b/services/src/tpu_utils/tpu_service.rs index 596aecc4..9125e14c 100644 --- a/services/src/tpu_utils/tpu_service.rs +++ b/services/src/tpu_utils/tpu_service.rs @@ -17,6 +17,7 @@ use solana_lite_rpc_core::types::SlotStream; use solana_lite_rpc_core::AnyhowJoinHandle; use solana_sdk::{quic::QUIC_PORT_OFFSET, signature::Keypair, slot_history::Slot}; use solana_streamer::tls_certificates::new_self_signed_tls_certificate; +use std::collections::HashMap; use std::{ net::{IpAddr, Ipv4Addr}, sync::Arc, @@ -122,6 +123,7 @@ impl TpuService { ) -> anyhow::Result<()> { let fanout = self.config.fanout_slots; let last_slot = estimated_slot + fanout; + let current_slot = current_slot.saturating_sub(4); let cluster_nodes = self.data_cache.cluster_info.cluster_nodes.clone(); @@ -130,7 +132,7 @@ impl TpuService { .get_slot_leaders(current_slot, last_slot) .await?; // get next leader with its tpu port - let connections_to_keep = next_leaders + let connections_to_keep: HashMap<_, _> = next_leaders .iter() .map(|x| { let contact_info = cluster_nodes.get(&x.pubkey); diff --git a/services/src/transaction_service.rs b/services/src/transaction_service.rs index 2fb544fd..2eae1624 100644 --- a/services/src/transaction_service.rs +++ b/services/src/transaction_service.rs @@ -1,8 +1,7 @@ // This class will manage the lifecycle for a transaction // It will send, replay if necessary and confirm by listening to blocks -use log::trace; -use std::{num::IntErrorKind, time::Duration}; +use std::{sync::Arc, time::Duration}; use crate::{ tpu_utils::tpu_service::TpuService, @@ -23,7 +22,7 @@ use solana_lite_rpc_core::{ use solana_sdk::{ borsh0_10::try_from_slice_unchecked, compute_budget::{self, ComputeBudgetInstruction}, - transaction::VersionedTransaction, + transaction::{Transaction, VersionedTransaction}, }; use tokio::{ sync::mpsc::{self, Sender, UnboundedSender}, @@ -123,6 +122,15 @@ pub struct TransactionService { impl TransactionService { pub async fn send_transaction( + &self, + tx: Transaction, + 
max_retries: Option, + ) -> anyhow::Result { + let raw_tx = bincode::serialize(&tx)?; + self.send_wire_transaction(raw_tx, max_retries).await + } + + pub async fn send_wire_transaction( &self, raw_tx: Vec, max_retries: Option, @@ -182,7 +190,7 @@ impl TransactionService { signature, last_valid_block_height: last_valid_blockheight, slot, - transaction: raw_tx, + transaction: Arc::new(raw_tx), prioritization_fee, }; if let Err(e) = self
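The transaction: Arc::new(raw_tx) line above is the tail of a patch-wide change: SentTransactionInfo.transaction moves from WireTransaction (a Vec<u8>) to Arc<WireTransaction>, so the serialized bytes are shared by reference across the fanout and replay tasks instead of being cloned per consumer; only the quic proxy path copies them out with transaction.as_ref().clone(). A small sketch of what the Arc buys, with illustrative types:

    use std::sync::Arc;

    #[derive(Clone)]
    struct SentTx {
        // was Vec<u8>: every clone copied the whole wire payload
        bytes: Arc<Vec<u8>>,
    }

    fn main() {
        let tx = SentTx { bytes: Arc::new(vec![0u8; 1024]) };
        let handles: Vec<_> = (0..16)
            .map(|_| {
                // one clone per "leader connection": a refcount bump, no byte copy
                let per_lane = tx.clone();
                std::thread::spawn(move || per_lane.bytes.len())
            })
            .collect();
        for h in handles {
            h.join().unwrap();
        }
    }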