Compare commits
No commits in common. "7530c4d77c5cb7357f44caa83d5b70f8adac0fb7" and "122203d140254ff3d80fc5ca825366afd3eb9af7" have entirely different histories.
7530c4d77c
...
122203d140
|
@ -1220,22 +1220,8 @@ dependencies = [
|
|||
name = "custom-tpu-send-transactions"
|
||||
version = "0.2.4"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bincode",
|
||||
"clap 4.5.4",
|
||||
"dashmap 5.5.3",
|
||||
"futures",
|
||||
"itertools 0.10.5",
|
||||
"log",
|
||||
"rand 0.8.5",
|
||||
"rand_chacha 0.3.1",
|
||||
"solana-lite-rpc-cluster-endpoints",
|
||||
"solana-lite-rpc-core",
|
||||
"solana-lite-rpc-services",
|
||||
"solana-rpc-client",
|
||||
"solana-sdk",
|
||||
"tokio",
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -1793,7 +1779,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "geyser-grpc-connector"
|
||||
version = "0.10.1+yellowstone.1.12"
|
||||
source = "git+https://github.com/blockworks-foundation/geyser-grpc-connector.git?tag=v0.10.3+yellowstone.1.12+solana.1.17.15-hacked-windowsize-with-broadcast-exit#688e4d241dd18d18f57345d592e803aa673fcd96"
|
||||
source = "git+https://github.com/blockworks-foundation/geyser-grpc-connector.git?tag=v0.10.3+yellowstone.1.12+solana.1.17.15-hacked-windowsize4#ce6ca26028c4466e0236657a76b9db2cccf4d535"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-stream",
|
||||
|
|
|
@ -9,7 +9,7 @@ license = "AGPL"
|
|||
|
||||
[dependencies]
|
||||
#geyser-grpc-connector = { path = "../../geyser-grpc-connector" }
|
||||
geyser-grpc-connector = { tag = "v0.10.3+yellowstone.1.12+solana.1.17.15-hacked-windowsize-with-broadcast-exit", git = "https://github.com/blockworks-foundation/geyser-grpc-connector.git" }
|
||||
geyser-grpc-connector = { tag = "v0.10.3+yellowstone.1.12+solana.1.17.15-hacked-windowsize4", git = "https://github.com/blockworks-foundation/geyser-grpc-connector.git" }
|
||||
|
||||
solana-sdk = { workspace = true }
|
||||
solana-rpc-client-api = { workspace = true }
|
||||
|
|
|
@ -12,8 +12,10 @@ use solana_sdk::commitment_config::CommitmentConfig;
|
|||
use solana_lite_rpc_core::solana_utils::hash_from_str;
|
||||
use solana_lite_rpc_core::structures::block_info::BlockInfo;
|
||||
use std::collections::{BTreeSet, HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::broadcast::{self, Receiver};
|
||||
use tokio::sync::broadcast::Receiver;
|
||||
use tokio::sync::Notify;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::{sleep, Instant};
|
||||
use tracing::debug_span;
|
||||
|
@ -29,7 +31,7 @@ use crate::grpc_subscription::from_grpc_block_update;
|
|||
fn create_grpc_multiplex_processed_block_task(
|
||||
grpc_sources: &Vec<GrpcSourceConfig>,
|
||||
block_sender: tokio::sync::mpsc::Sender<ProducedBlock>,
|
||||
mut exit_notify: broadcast::Receiver<()>,
|
||||
exit_notify: Arc<Notify>,
|
||||
) -> Vec<JoinHandle<()>> {
|
||||
const COMMITMENT_CONFIG: CommitmentConfig = CommitmentConfig::processed();
|
||||
|
||||
|
@ -41,7 +43,7 @@ fn create_grpc_multiplex_processed_block_task(
|
|||
grpc_source.clone(),
|
||||
GeyserFilter(COMMITMENT_CONFIG).blocks_and_txs(),
|
||||
autoconnect_tx.clone(),
|
||||
exit_notify.resubscribe(),
|
||||
exit_notify.clone(),
|
||||
);
|
||||
tasks.push(task);
|
||||
}
|
||||
|
@ -49,7 +51,7 @@ fn create_grpc_multiplex_processed_block_task(
|
|||
let jh_merging_streams = tokio::task::spawn(async move {
|
||||
let mut slots_processed = BTreeSet::<u64>::new();
|
||||
let mut last_tick = Instant::now();
|
||||
'recv_loop: loop {
|
||||
loop {
|
||||
// recv loop
|
||||
if last_tick.elapsed() > Duration::from_millis(800) {
|
||||
warn!(
|
||||
|
@ -64,29 +66,22 @@ fn create_grpc_multiplex_processed_block_task(
|
|||
res = blocks_rx.recv() => {
|
||||
res
|
||||
},
|
||||
_ = exit_notify.recv() => {
|
||||
_ = exit_notify.notified() => {
|
||||
break;
|
||||
}
|
||||
};
|
||||
match blocks_rx_result {
|
||||
Some(Message::GeyserSubscribeUpdate(subscribe_update)) => {
|
||||
// note: avoid mapping of full block as long as possible
|
||||
let extracted_slot = extract_slot_from_yellowstone_update(&subscribe_update);
|
||||
if let Some(slot) = extracted_slot {
|
||||
let mapfilter =
|
||||
map_block_from_yellowstone_update(*subscribe_update, COMMITMENT_CONFIG);
|
||||
if let Some((slot, produced_block)) = mapfilter {
|
||||
assert_eq!(COMMITMENT_CONFIG, produced_block.commitment_config);
|
||||
// check if the slot is in the map, if not check if the container is half full and the slot in question is older than the lowest value
|
||||
// it means that the slot is too old to process
|
||||
if slots_processed.contains(&slot) {
|
||||
continue 'recv_loop;
|
||||
}
|
||||
if slots_processed.len() >= MAX_SIZE / 2
|
||||
&& slot <= slots_processed.first().cloned().unwrap_or_default()
|
||||
if !slots_processed.contains(&slot)
|
||||
&& (slots_processed.len() < MAX_SIZE / 2
|
||||
|| slot > slots_processed.first().cloned().unwrap_or_default())
|
||||
{
|
||||
continue 'recv_loop;
|
||||
}
|
||||
|
||||
let mapfilter =
|
||||
map_block_from_yellowstone_update(*subscribe_update, COMMITMENT_CONFIG);
|
||||
if let Some((_slot, produced_block)) = mapfilter {
|
||||
let send_started_at = Instant::now();
|
||||
let send_result = block_sender
|
||||
.send(produced_block)
|
||||
|
@ -135,7 +130,7 @@ fn create_grpc_multiplex_block_info_task(
|
|||
grpc_sources: &Vec<GrpcSourceConfig>,
|
||||
block_info_sender: tokio::sync::mpsc::Sender<BlockInfo>,
|
||||
commitment_config: CommitmentConfig,
|
||||
mut exit_notify: broadcast::Receiver<()>,
|
||||
exit_notify: Arc<Notify>,
|
||||
) -> Vec<JoinHandle<()>> {
|
||||
let (autoconnect_tx, mut blocks_rx) = tokio::sync::mpsc::channel(10);
|
||||
let mut tasks = vec![];
|
||||
|
@ -144,7 +139,7 @@ fn create_grpc_multiplex_block_info_task(
|
|||
grpc_source.clone(),
|
||||
GeyserFilter(commitment_config).blocks_meta(),
|
||||
autoconnect_tx.clone(),
|
||||
exit_notify.resubscribe(),
|
||||
exit_notify.clone(),
|
||||
);
|
||||
tasks.push(task);
|
||||
}
|
||||
|
@ -156,7 +151,7 @@ fn create_grpc_multiplex_block_info_task(
|
|||
res = blocks_rx.recv() => {
|
||||
res
|
||||
},
|
||||
_ = exit_notify.recv() => {
|
||||
_ = exit_notify.notified() => {
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
@ -268,7 +263,7 @@ pub fn create_grpc_multiplex_blocks_subscription(
|
|||
tokio::sync::mpsc::channel::<BlockInfo>(500);
|
||||
let (block_info_sender_finalized, mut block_info_reciever_finalized) =
|
||||
tokio::sync::mpsc::channel::<BlockInfo>(500);
|
||||
let (exit_sender, exit_notify) = broadcast::channel(1);
|
||||
let exit_notify = Arc::new(Notify::new());
|
||||
|
||||
let processed_block_sender = processed_block_sender.clone();
|
||||
reconnect_attempts += 1;
|
||||
|
@ -285,7 +280,7 @@ pub fn create_grpc_multiplex_blocks_subscription(
|
|||
let processed_blocks_tasks = create_grpc_multiplex_processed_block_task(
|
||||
&grpc_sources,
|
||||
processed_block_sender.clone(),
|
||||
exit_notify.resubscribe(),
|
||||
exit_notify.clone(),
|
||||
);
|
||||
task_list.extend(processed_blocks_tasks);
|
||||
|
||||
|
@ -295,21 +290,21 @@ pub fn create_grpc_multiplex_blocks_subscription(
|
|||
&grpc_sources,
|
||||
block_info_sender_processed.clone(),
|
||||
CommitmentConfig::processed(),
|
||||
exit_notify.resubscribe(),
|
||||
exit_notify.clone(),
|
||||
);
|
||||
task_list.extend(jh_meta_task_processed);
|
||||
let jh_meta_task_confirmed = create_grpc_multiplex_block_info_task(
|
||||
&grpc_sources,
|
||||
block_info_sender_confirmed.clone(),
|
||||
CommitmentConfig::confirmed(),
|
||||
exit_notify.resubscribe(),
|
||||
exit_notify.clone(),
|
||||
);
|
||||
task_list.extend(jh_meta_task_confirmed);
|
||||
let jh_meta_task_finalized = create_grpc_multiplex_block_info_task(
|
||||
&grpc_sources,
|
||||
block_info_sender_finalized.clone(),
|
||||
CommitmentConfig::finalized(),
|
||||
exit_notify,
|
||||
exit_notify.clone(),
|
||||
);
|
||||
task_list.extend(jh_meta_task_finalized);
|
||||
|
||||
|
@ -447,12 +442,8 @@ pub fn create_grpc_multiplex_blocks_subscription(
|
|||
}
|
||||
}
|
||||
} // -- END receiver loop
|
||||
if exit_sender.send(()).is_ok() {
|
||||
futures::future::join_all(task_list).await;
|
||||
} else {
|
||||
log::error!("Problem sending exit signal");
|
||||
task_list.iter().for_each(|x| x.abort());
|
||||
}
|
||||
exit_notify.notify_waiters();
|
||||
futures::future::join_all(task_list).await;
|
||||
} // -- END reconnect loop
|
||||
});
|
||||
|
||||
|
@ -483,9 +474,9 @@ pub fn create_grpc_multiplex_processed_slots_subscription(
|
|||
let jh_multiplex_task = tokio::spawn(async move {
|
||||
loop {
|
||||
let (autoconnect_tx, mut slots_rx) = tokio::sync::mpsc::channel(10);
|
||||
let (exit_sender, exit_notify) = broadcast::channel(1);
|
||||
let exit_notify = Arc::new(Notify::new());
|
||||
|
||||
let task_list = grpc_sources
|
||||
let tasks = grpc_sources
|
||||
.clone()
|
||||
.iter()
|
||||
.map(|grpc_source| {
|
||||
|
@ -493,7 +484,7 @@ pub fn create_grpc_multiplex_processed_slots_subscription(
|
|||
grpc_source.clone(),
|
||||
GeyserFilter(COMMITMENT_CONFIG).slots(),
|
||||
autoconnect_tx.clone(),
|
||||
exit_notify.resubscribe(),
|
||||
exit_notify.clone(),
|
||||
)
|
||||
})
|
||||
.collect_vec();
|
||||
|
@ -546,29 +537,14 @@ pub fn create_grpc_multiplex_processed_slots_subscription(
|
|||
}
|
||||
}
|
||||
} // -- END receiver loop
|
||||
|
||||
if exit_sender.send(()).is_ok() {
|
||||
futures::future::join_all(task_list).await;
|
||||
} else {
|
||||
log::error!("Problem sending exit signal");
|
||||
task_list.iter().for_each(|x| x.abort());
|
||||
}
|
||||
exit_notify.notify_waiters();
|
||||
futures::future::join_all(tasks).await;
|
||||
} // -- END reconnect loop
|
||||
});
|
||||
|
||||
(multiplexed_messages_rx, jh_multiplex_task)
|
||||
}
|
||||
|
||||
fn extract_slot_from_yellowstone_update(update: &SubscribeUpdate) -> Option<Slot> {
|
||||
match &update.update_oneof {
|
||||
// list is not exhaustive
|
||||
Some(UpdateOneof::Slot(update_message)) => Some(update_message.slot),
|
||||
Some(UpdateOneof::BlockMeta(update_message)) => Some(update_message.slot),
|
||||
Some(UpdateOneof::Block(update_message)) => Some(update_message.slot),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn map_slot_from_yellowstone_update(update: SubscribeUpdate) -> Option<Slot> {
|
||||
match update.update_oneof {
|
||||
Some(UpdateOneof::Slot(update_slot_message)) => Some(update_slot_message.slot),
|
||||
|
|
|
@ -36,7 +36,7 @@ use solana_transaction_status::{Reward, RewardType};
|
|||
use std::cell::OnceCell;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::{broadcast, Notify};
|
||||
use tokio::sync::Notify;
|
||||
use tracing::trace_span;
|
||||
use yellowstone_grpc_client::GeyserGrpcClient;
|
||||
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
|
||||
|
@ -273,12 +273,12 @@ fn map_compute_budget_instructions(message: &VersionedMessage) -> (Option<u32>,
|
|||
pub fn create_block_processing_task(
|
||||
grpc_addr: String,
|
||||
grpc_x_token: Option<String>,
|
||||
block_sx: tokio::sync::mpsc::Sender<SubscribeUpdateBlock>,
|
||||
block_sx: async_channel::Sender<SubscribeUpdateBlock>,
|
||||
commitment_level: CommitmentLevel,
|
||||
mut exit_notify: broadcast::Receiver<()>,
|
||||
exit_notfier: Arc<Notify>,
|
||||
) -> AnyhowJoinHandle {
|
||||
tokio::spawn(async move {
|
||||
'main_loop: loop {
|
||||
loop {
|
||||
let mut blocks_subs = HashMap::new();
|
||||
blocks_subs.insert(
|
||||
"block_client".to_string(),
|
||||
|
@ -293,8 +293,7 @@ pub fn create_block_processing_task(
|
|||
// connect to grpc
|
||||
let mut client =
|
||||
connect_with_timeout_hacked(grpc_addr.clone(), grpc_x_token.clone()).await?;
|
||||
let mut stream = tokio::select! {
|
||||
res = client
|
||||
let mut stream = client
|
||||
.subscribe_once(
|
||||
HashMap::new(),
|
||||
Default::default(),
|
||||
|
@ -305,13 +304,8 @@ pub fn create_block_processing_task(
|
|||
Some(commitment_level),
|
||||
Default::default(),
|
||||
None,
|
||||
) => {
|
||||
res?
|
||||
},
|
||||
_ = exit_notify.recv() => {
|
||||
break;
|
||||
}
|
||||
};
|
||||
)
|
||||
.await?;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
|
@ -344,8 +338,8 @@ pub fn create_block_processing_task(
|
|||
}
|
||||
};
|
||||
},
|
||||
_ = exit_notify.recv() => {
|
||||
break 'main_loop;
|
||||
_ = exit_notfier.notified() => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -354,7 +348,6 @@ pub fn create_block_processing_task(
|
|||
log::error!("Grpc block subscription broken (resubscribing)");
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -362,7 +355,7 @@ pub fn create_block_processing_task(
|
|||
pub fn create_slot_stream_task(
|
||||
grpc_addr: String,
|
||||
grpc_x_token: Option<String>,
|
||||
slot_sx: tokio::sync::mpsc::Sender<SubscribeUpdateSlot>,
|
||||
slot_sx: async_channel::Sender<SubscribeUpdateSlot>,
|
||||
commitment_level: CommitmentLevel,
|
||||
) -> AnyhowJoinHandle {
|
||||
tokio::spawn(async move {
|
||||
|
|
|
@ -14,6 +14,7 @@ pub fn poll_cluster_info(
|
|||
loop {
|
||||
match rpc_client.get_cluster_nodes().await {
|
||||
Ok(cluster_nodes) => {
|
||||
debug!("get cluster_nodes from rpc: {:?}", cluster_nodes.len());
|
||||
if let Err(e) = contact_info_sender.send(cluster_nodes) {
|
||||
warn!("rpc_cluster_info channel has no receivers {e:?}");
|
||||
}
|
||||
|
@ -22,7 +23,7 @@ pub fn poll_cluster_info(
|
|||
Err(error) => {
|
||||
warn!("rpc_cluster_info failed <{:?}> - retrying", error);
|
||||
// throttle
|
||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||
tokio::time::sleep(Duration::from_secs(2500)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -50,7 +51,7 @@ pub fn poll_vote_accounts(
|
|||
Err(error) => {
|
||||
warn!("rpc_vote_accounts failed <{:?}> - retrying", error);
|
||||
// throttle
|
||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||
tokio::time::sleep(Duration::from_secs(2500)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -10,7 +10,7 @@
|
|||
"transaction_retry_after_secs": 3,
|
||||
"quic_proxy_addr": null,
|
||||
"use_grpc": false,
|
||||
"calculate_leader_schedule_from_geyser": false,
|
||||
"calculate_leader_schedule_form_geyser": false,
|
||||
"grpc_addr": "http://127.0.0.0:10000",
|
||||
"grpc_x_token": null,
|
||||
"postgres": {
|
||||
|
|
|
@ -2,21 +2,22 @@ use anyhow::Context;
|
|||
use solana_sdk::signature::Keypair;
|
||||
use std::env;
|
||||
|
||||
// note this is duplicated from lite-rpc module
|
||||
pub async fn load_identity_keypair(
|
||||
identity_keyfile_path: Option<String>,
|
||||
identity_path: Option<String>,
|
||||
) -> anyhow::Result<Option<Keypair>> {
|
||||
let identity_jsonarray_str = if let Ok(identity_env_var) = env::var("IDENTITY") {
|
||||
identity_env_var
|
||||
} else if let Some(identity_path) = identity_keyfile_path {
|
||||
tokio::fs::read_to_string(identity_path)
|
||||
let identity_str = if let Some(identity_from_cli) = identity_path {
|
||||
tokio::fs::read_to_string(identity_from_cli)
|
||||
.await
|
||||
.context("Cannot find the identity file provided")?
|
||||
} else if let Ok(identity_env_var) = env::var("IDENTITY") {
|
||||
identity_env_var
|
||||
} else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let identity_bytes: Vec<u8> = serde_json::from_str(&identity_jsonarray_str)
|
||||
.context("Invalid identity format expected Vec<u8>")?;
|
||||
let identity_bytes: Vec<u8> =
|
||||
serde_json::from_str(&identity_str).context("Invalid identity format expected Vec<u8>")?;
|
||||
|
||||
Ok(Some(
|
||||
Keypair::from_bytes(identity_bytes.as_slice()).context("Invalid identity")?,
|
||||
|
|
|
@ -8,6 +8,5 @@ pub mod stores;
|
|||
pub mod structures;
|
||||
pub mod traits;
|
||||
pub mod types;
|
||||
pub mod utils;
|
||||
|
||||
pub type AnyhowJoinHandle = tokio::task::JoinHandle<anyhow::Result<()>>;
|
||||
|
|
|
@ -127,7 +127,7 @@ impl PrioritizationFeesHeap {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use solana_sdk::signature::Signature;
|
||||
use std::{sync::Arc, time::Duration};
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::structures::{
|
||||
prioritization_fee_heap::PrioritizationFeesHeap, transaction_sent_info::SentTransactionInfo,
|
||||
|
@ -139,7 +139,7 @@ mod tests {
|
|||
let tx_creator = |signature, prioritization_fee| SentTransactionInfo {
|
||||
signature,
|
||||
slot: 0,
|
||||
transaction: Arc::new(vec![]),
|
||||
transaction: vec![],
|
||||
last_valid_block_height: 0,
|
||||
prioritization_fee,
|
||||
};
|
||||
|
@ -205,7 +205,7 @@ mod tests {
|
|||
let info = SentTransactionInfo {
|
||||
signature: Signature::new_unique(),
|
||||
slot: height + 1,
|
||||
transaction: Arc::new(vec![]),
|
||||
transaction: vec![],
|
||||
last_valid_block_height: height + 10,
|
||||
prioritization_fee,
|
||||
};
|
||||
|
|
|
@ -1,5 +1,3 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use solana_sdk::signature::Signature;
|
||||
use solana_sdk::slot_history::Slot;
|
||||
|
||||
|
@ -9,7 +7,7 @@ pub type WireTransaction = Vec<u8>;
|
|||
pub struct SentTransactionInfo {
|
||||
pub signature: Signature,
|
||||
pub slot: Slot,
|
||||
pub transaction: Arc<WireTransaction>,
|
||||
pub transaction: WireTransaction,
|
||||
pub last_valid_block_height: u64,
|
||||
pub prioritization_fee: u64,
|
||||
}
|
||||
|
|
|
@ -1,33 +0,0 @@
|
|||
use std::time::Duration;
|
||||
|
||||
use log::debug;
|
||||
use solana_sdk::commitment_config::CommitmentConfig;
|
||||
use tokio::time::{timeout, Instant};
|
||||
|
||||
use crate::{structures::block_info::BlockInfo, types::BlockInfoStream};
|
||||
|
||||
pub async fn wait_till_block_of_commitment_is_recieved(
|
||||
mut blockinfo_stream: BlockInfoStream,
|
||||
commitment_config: CommitmentConfig,
|
||||
) -> BlockInfo {
|
||||
let started = Instant::now();
|
||||
loop {
|
||||
match timeout(Duration::from_millis(1000), blockinfo_stream.recv()).await {
|
||||
Ok(Ok(block_info)) => {
|
||||
if block_info.commitment_config == commitment_config {
|
||||
return block_info;
|
||||
}
|
||||
}
|
||||
Err(_elapsed) => {
|
||||
debug!(
|
||||
"waiting for latest block info ({}) ... {:.02}ms",
|
||||
commitment_config.commitment,
|
||||
started.elapsed().as_secs_f32() * 1000.0
|
||||
);
|
||||
}
|
||||
Ok(Err(error)) => {
|
||||
panic!("Did not recv block info : {error:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -11,19 +11,3 @@ edition.workspace = true
|
|||
[dependencies]
|
||||
solana-lite-rpc-services = {workspace = true}
|
||||
solana-lite-rpc-core = {workspace = true}
|
||||
solana-lite-rpc-cluster-endpoints = {workspace = true}
|
||||
|
||||
solana-sdk = { workspace = true }
|
||||
solana-rpc-client = { workspace = true }
|
||||
|
||||
tokio = "1.28.2"
|
||||
clap = { workspace = true }
|
||||
anyhow = { workspace = true }
|
||||
dashmap = { workspace = true }
|
||||
rand = "0.8.5"
|
||||
rand_chacha = "0.3.1"
|
||||
log = { workspace = true }
|
||||
itertools = { workspace = true }
|
||||
bincode = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
tracing-subscriber = { workspace = true }
|
|
@ -1,39 +0,0 @@
|
|||
use clap::Parser;
|
||||
|
||||
#[derive(Parser, Debug, Clone)]
|
||||
#[command(author, version, about, long_about = None)]
|
||||
pub struct Args {
|
||||
/// config.json
|
||||
#[arg(short, long, default_value = "http://127.0.0.1:8899")]
|
||||
pub rpc_url: String,
|
||||
|
||||
#[arg(short, long)]
|
||||
pub grpc_url: Option<String>,
|
||||
|
||||
#[arg(short, long)]
|
||||
pub x_token: Option<String>,
|
||||
|
||||
#[arg(short, long)]
|
||||
pub transaction_count: Option<usize>,
|
||||
|
||||
#[arg(short, long, default_value_t = 1)]
|
||||
pub number_of_seconds: usize,
|
||||
|
||||
#[arg(short, long)]
|
||||
pub fee_payer: String,
|
||||
|
||||
#[arg(short, long)]
|
||||
pub staked_identity: Option<String>,
|
||||
|
||||
#[arg(short, long)]
|
||||
pub priority_fees: Option<u64>,
|
||||
|
||||
#[arg(short = 'a', long, default_value_t = 256)]
|
||||
pub additional_signers: usize,
|
||||
|
||||
#[arg(short = 'b', long, default_value_t = 0.1)]
|
||||
pub signers_transfer_balance: f64,
|
||||
|
||||
#[arg(long)]
|
||||
pub fanout_slots: Option<u64>,
|
||||
}
|
|
@ -1,519 +1,3 @@
|
|||
use std::{collections::HashSet, ops::Mul, str::FromStr, sync::Arc, time::Duration};
|
||||
|
||||
use clap::Parser;
|
||||
use dashmap::{DashMap, DashSet};
|
||||
use itertools::Itertools;
|
||||
use rand::{
|
||||
distributions::{Alphanumeric, Distribution},
|
||||
SeedableRng,
|
||||
};
|
||||
use solana_lite_rpc_cluster_endpoints::{
|
||||
geyser_grpc_connector::{GrpcConnectionTimeouts, GrpcSourceConfig},
|
||||
grpc_subscription::create_grpc_subscription,
|
||||
json_rpc_leaders_getter::JsonRpcLeaderGetter,
|
||||
json_rpc_subscription::create_json_rpc_polling_subscription,
|
||||
};
|
||||
use solana_lite_rpc_core::{
|
||||
keypair_loader::load_identity_keypair,
|
||||
stores::{
|
||||
block_information_store::{BlockInformation, BlockInformationStore},
|
||||
cluster_info_store::ClusterInfo,
|
||||
data_cache::{DataCache, SlotCache},
|
||||
subscription_store::SubscriptionStore,
|
||||
tx_store::TxStore,
|
||||
},
|
||||
structures::{
|
||||
epoch::EpochCache, identity_stakes::IdentityStakes, leaderschedule::CalculatedSchedule,
|
||||
transaction_sent_info::SentTransactionInfo,
|
||||
},
|
||||
utils::wait_till_block_of_commitment_is_recieved,
|
||||
};
|
||||
use solana_lite_rpc_services::{
|
||||
data_caching_service::DataCachingService,
|
||||
quic_connection_utils::QuicConnectionParameters,
|
||||
tpu_utils::{
|
||||
tpu_connection_path::TpuConnectionPath,
|
||||
tpu_service::{TpuService, TpuServiceConfig},
|
||||
},
|
||||
transaction_replayer::TransactionReplayer,
|
||||
transaction_service::TransactionServiceBuilder,
|
||||
tx_sender::TxSender,
|
||||
};
|
||||
use solana_sdk::{
|
||||
commitment_config::CommitmentConfig,
|
||||
compute_budget,
|
||||
hash::Hash,
|
||||
instruction::Instruction,
|
||||
message::Message,
|
||||
native_token::LAMPORTS_PER_SOL,
|
||||
pubkey::Pubkey,
|
||||
signature::{Keypair, Signature},
|
||||
signer::Signer,
|
||||
system_instruction,
|
||||
transaction::Transaction,
|
||||
};
|
||||
use tokio::sync::{Mutex, RwLock};
|
||||
|
||||
use crate::cli::Args;
|
||||
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
|
||||
|
||||
mod cli;
|
||||
|
||||
const MEMO_PROGRAM_ID: &str = "MemoSq4gqABAXKb96qnH8TysNcWxMyWCqXgDLGmfcHr";
|
||||
|
||||
pub fn create_memo_tx(msg: &[u8], payer: &Keypair, blockhash: Hash, prio_fees: u64) -> Transaction {
|
||||
let memo = Pubkey::from_str(MEMO_PROGRAM_ID).unwrap();
|
||||
|
||||
let cb_1 = compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(7000);
|
||||
let cb_2 = compute_budget::ComputeBudgetInstruction::set_compute_unit_price(prio_fees);
|
||||
let instruction = Instruction::new_with_bytes(memo, msg, vec![]);
|
||||
let message = Message::new(&[cb_1, cb_2, instruction], Some(&payer.pubkey()));
|
||||
Transaction::new(&[payer], message, blockhash)
|
||||
}
|
||||
|
||||
pub fn generate_random_strings(
|
||||
num_of_txs: usize,
|
||||
random_seed: Option<u64>,
|
||||
n_chars: usize,
|
||||
) -> Vec<Vec<u8>> {
|
||||
let seed = random_seed.map_or(0, |x| x);
|
||||
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(seed);
|
||||
(0..num_of_txs)
|
||||
.map(|_| Alphanumeric.sample_iter(&mut rng).take(n_chars).collect())
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub async fn create_signers_from_payer(
|
||||
rpc_client: Arc<RpcClient>,
|
||||
payer: Arc<Keypair>,
|
||||
nb_signers: usize,
|
||||
signer_balance: u64,
|
||||
prio_fees: u64,
|
||||
) -> Vec<Arc<Keypair>> {
|
||||
let signers = (0..nb_signers)
|
||||
.map(|_| Arc::new(Keypair::new()))
|
||||
.collect_vec();
|
||||
let mut signers_to_transfer: HashSet<Pubkey> = signers.iter().map(|kp| kp.pubkey()).collect();
|
||||
|
||||
while !signers_to_transfer.is_empty() {
|
||||
let Ok(blockhash) = rpc_client.get_latest_blockhash().await else {
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
continue;
|
||||
};
|
||||
|
||||
let cb_1 = compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(5000);
|
||||
let cb_2 = compute_budget::ComputeBudgetInstruction::set_compute_unit_price(prio_fees);
|
||||
|
||||
let transactions = signers_to_transfer
|
||||
.iter()
|
||||
.map(|signer| {
|
||||
let instruction =
|
||||
system_instruction::transfer(&payer.pubkey(), signer, signer_balance);
|
||||
let message = Message::new(
|
||||
&[cb_1.clone(), cb_2.clone(), instruction],
|
||||
Some(&payer.pubkey()),
|
||||
);
|
||||
(*signer, Transaction::new(&[&payer], message, blockhash))
|
||||
})
|
||||
.collect_vec();
|
||||
let tasks = transactions
|
||||
.iter()
|
||||
.map(|(signer, tx)| {
|
||||
let rpc_client = rpc_client.clone();
|
||||
let tx = tx.clone();
|
||||
let signer = *signer;
|
||||
tokio::spawn(
|
||||
async move { (signer, rpc_client.send_and_confirm_transaction(&tx).await) },
|
||||
)
|
||||
})
|
||||
.collect_vec();
|
||||
|
||||
let results = futures::future::join_all(tasks).await;
|
||||
for result in results {
|
||||
match result {
|
||||
Ok((signer, Ok(_signature))) => {
|
||||
signers_to_transfer.remove(&signer);
|
||||
}
|
||||
Ok((signer, Err(e))) => {
|
||||
log::error!("Error transfering to {signer:?}, {e:?}");
|
||||
}
|
||||
_ => {
|
||||
// retry
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
signers
|
||||
}
|
||||
|
||||
pub async fn transfer_back_to_payer(
|
||||
rpc_client: Arc<RpcClient>,
|
||||
payer: Arc<Keypair>,
|
||||
signers: Vec<Arc<Keypair>>,
|
||||
prio_fees: u64,
|
||||
) {
|
||||
let mut signers_to_transfer: HashSet<Pubkey> = signers.iter().map(|kp| kp.pubkey()).collect();
|
||||
let payer_pubkey = payer.pubkey();
|
||||
|
||||
while !signers_to_transfer.is_empty() {
|
||||
let Ok(blockhash) = rpc_client.get_latest_blockhash().await else {
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
continue;
|
||||
};
|
||||
|
||||
let transfers = signers
|
||||
.iter()
|
||||
.map(|signer| {
|
||||
let rpc_client = rpc_client.clone();
|
||||
let signer = signer.clone();
|
||||
tokio::spawn(async move {
|
||||
let balance = rpc_client.get_balance(&signer.pubkey()).await.unwrap();
|
||||
|
||||
let cb_1 =
|
||||
compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(5000);
|
||||
let cb_2 =
|
||||
compute_budget::ComputeBudgetInstruction::set_compute_unit_price(prio_fees);
|
||||
let balance_to_transfer = balance
|
||||
.saturating_sub(5000)
|
||||
.saturating_sub(5000 * prio_fees);
|
||||
let instruction = system_instruction::transfer(
|
||||
&signer.pubkey(),
|
||||
&payer_pubkey,
|
||||
balance_to_transfer,
|
||||
);
|
||||
let message = Message::new(
|
||||
&[cb_1.clone(), cb_2.clone(), instruction],
|
||||
Some(&signer.pubkey()),
|
||||
);
|
||||
(
|
||||
signer.pubkey(),
|
||||
rpc_client
|
||||
.send_and_confirm_transaction(&Transaction::new(
|
||||
&[&signer],
|
||||
message,
|
||||
blockhash,
|
||||
))
|
||||
.await,
|
||||
)
|
||||
})
|
||||
})
|
||||
.collect_vec();
|
||||
|
||||
let results = futures::future::join_all(transfers).await;
|
||||
for result in results {
|
||||
match result {
|
||||
Ok((signer, Ok(_signature))) => {
|
||||
signers_to_transfer.remove(&signer);
|
||||
}
|
||||
Ok((signer, Err(e))) => {
|
||||
log::error!("Error transfering to {signer:?}, {e:?}");
|
||||
}
|
||||
_ => {
|
||||
// retry
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
tracing_subscriber::fmt::init();
|
||||
let args = Args::parse();
|
||||
|
||||
let rpc_url = args.rpc_url;
|
||||
let rpc_client = Arc::new(RpcClient::new(rpc_url));
|
||||
|
||||
let leader_schedule = Arc::new(JsonRpcLeaderGetter::new(rpc_client.clone(), 1024, 128));
|
||||
|
||||
let fee_payer = Arc::new(
|
||||
load_identity_keypair(Some(args.fee_payer))
|
||||
.await
|
||||
.expect("Payer should be set or keypair file not found")
|
||||
.unwrap(),
|
||||
);
|
||||
|
||||
let priority_fee = args.priority_fees.unwrap_or_default();
|
||||
let nb_signers = args.additional_signers;
|
||||
let signer_balance = args.signers_transfer_balance.mul(LAMPORTS_PER_SOL as f64) as u64;
|
||||
let signers = create_signers_from_payer(
|
||||
rpc_client.clone(),
|
||||
fee_payer.clone(),
|
||||
nb_signers,
|
||||
signer_balance,
|
||||
priority_fee,
|
||||
)
|
||||
.await;
|
||||
|
||||
println!(
|
||||
"Creating {} users with {} SOL balance",
|
||||
nb_signers, signer_balance
|
||||
);
|
||||
|
||||
let validator_identity = Arc::new(
|
||||
load_identity_keypair(args.staked_identity)
|
||||
.await?
|
||||
.unwrap_or_else(Keypair::new),
|
||||
);
|
||||
|
||||
// START ALL SERVICES REQUIRED BY LITE_RPC
|
||||
// setup endpoint, GRPC/RPC Polling
|
||||
println!("Setting up lite-rpc tpu service");
|
||||
let (endpoints, _handles) = if let Some(grpc_addr) = args.grpc_url {
|
||||
let timeouts = GrpcConnectionTimeouts {
|
||||
connect_timeout: Duration::from_secs(10),
|
||||
request_timeout: Duration::from_secs(10),
|
||||
subscribe_timeout: Duration::from_secs(10),
|
||||
receive_timeout: Duration::from_secs(10),
|
||||
};
|
||||
create_grpc_subscription(
|
||||
rpc_client.clone(),
|
||||
vec![GrpcSourceConfig::new(
|
||||
grpc_addr,
|
||||
args.x_token.clone(),
|
||||
None,
|
||||
timeouts,
|
||||
)],
|
||||
vec![],
|
||||
)?
|
||||
} else {
|
||||
create_json_rpc_polling_subscription(rpc_client.clone(), 100)?
|
||||
};
|
||||
|
||||
let finalized_block_information = wait_till_block_of_commitment_is_recieved(
|
||||
endpoints.blockinfo_notifier.resubscribe(),
|
||||
CommitmentConfig::finalized(),
|
||||
)
|
||||
.await;
|
||||
|
||||
let block_height = rpc_client
|
||||
.get_block_height_with_commitment(CommitmentConfig::finalized())
|
||||
.await?;
|
||||
let (blockhash, _) = rpc_client
|
||||
.get_latest_blockhash_with_commitment(CommitmentConfig::finalized())
|
||||
.await?;
|
||||
let finalize_slot = finalized_block_information.slot;
|
||||
|
||||
println!(
|
||||
"finalized blockheight : {:?}, slot: {}, hash: {}",
|
||||
finalized_block_information.block_height,
|
||||
finalized_block_information.slot,
|
||||
finalized_block_information.blockhash
|
||||
);
|
||||
println!(
|
||||
"From RPC blockheight : {block_height:?}, hash: {}",
|
||||
blockhash
|
||||
);
|
||||
|
||||
let finalized_block_information = BlockInformation {
|
||||
slot: finalized_block_information.slot,
|
||||
block_height,
|
||||
last_valid_blockheight: finalized_block_information.block_height + 300,
|
||||
cleanup_slot: finalized_block_information.slot + 1000000,
|
||||
blockhash: finalized_block_information.blockhash,
|
||||
commitment_config: CommitmentConfig::finalized(),
|
||||
block_time: 0,
|
||||
};
|
||||
|
||||
let block_information_store = BlockInformationStore::new(finalized_block_information);
|
||||
|
||||
let data_cache = DataCache {
|
||||
block_information_store,
|
||||
cluster_info: ClusterInfo::default(),
|
||||
identity_stakes: IdentityStakes::new(validator_identity.pubkey()),
|
||||
slot_cache: SlotCache::new(finalize_slot),
|
||||
tx_subs: SubscriptionStore::default(),
|
||||
txs: TxStore {
|
||||
store: Arc::new(DashMap::new()),
|
||||
},
|
||||
epoch_data: EpochCache::new_for_tests(),
|
||||
leader_schedule: Arc::new(RwLock::new(CalculatedSchedule::default())),
|
||||
};
|
||||
|
||||
let data_cache_service = DataCachingService {
|
||||
data_cache: data_cache.clone(),
|
||||
clean_duration: Duration::from_secs(120),
|
||||
};
|
||||
|
||||
// start listning the cluster data and filling the cache
|
||||
data_cache_service.listen(
|
||||
endpoints.blocks_notifier.resubscribe(),
|
||||
endpoints.blockinfo_notifier,
|
||||
endpoints.slot_notifier.resubscribe(),
|
||||
endpoints.cluster_info_notifier,
|
||||
endpoints.vote_account_notifier,
|
||||
);
|
||||
|
||||
let count = args.transaction_count.unwrap_or(10);
|
||||
let prioritization_heap_size = Some(count * args.number_of_seconds);
|
||||
|
||||
let tpu_config = TpuServiceConfig {
|
||||
fanout_slots: args.fanout_slots.unwrap_or(16),
|
||||
maximum_transaction_in_queue: 2000000,
|
||||
quic_connection_params: QuicConnectionParameters {
|
||||
connection_timeout: Duration::from_secs(60),
|
||||
connection_retry_count: 10,
|
||||
finalize_timeout: Duration::from_millis(10000),
|
||||
max_number_of_connections: 4,
|
||||
unistream_timeout: Duration::from_millis(1000),
|
||||
write_timeout: Duration::from_secs(10),
|
||||
number_of_transactions_per_unistream: 1,
|
||||
unistreams_to_create_new_connection_in_percentage: 5,
|
||||
prioritization_heap_size,
|
||||
},
|
||||
tpu_connection_path: TpuConnectionPath::QuicDirectPath,
|
||||
};
|
||||
|
||||
let tpu_service: TpuService = TpuService::new(
|
||||
tpu_config,
|
||||
validator_identity,
|
||||
leader_schedule,
|
||||
data_cache.clone(),
|
||||
)
|
||||
.await?;
|
||||
let transaction_service_builder = TransactionServiceBuilder::new(
|
||||
TxSender::new(data_cache.clone(), tpu_service.clone()),
|
||||
TransactionReplayer::new(
|
||||
tpu_service.clone(),
|
||||
data_cache.clone(),
|
||||
Duration::from_secs(1),
|
||||
),
|
||||
tpu_service,
|
||||
10000,
|
||||
);
|
||||
let (transaction_service, _) = transaction_service_builder.start(
|
||||
None,
|
||||
data_cache.block_information_store.clone(),
|
||||
10,
|
||||
endpoints.slot_notifier,
|
||||
);
|
||||
|
||||
// CREATE TRANSACTIONS
|
||||
log::info!("Creating memo transactions");
|
||||
|
||||
let memo_msgs = generate_random_strings(count * args.number_of_seconds, None, 5);
|
||||
|
||||
let mut tx_to_confirm = vec![];
|
||||
let mut second = 1;
|
||||
let mut signer_count = 0;
|
||||
let map_of_signature = Arc::new(DashSet::<Signature>::new());
|
||||
let transactions_in_blocks = Arc::new(Mutex::new(Vec::<u64>::new()));
|
||||
let mut block_stream = endpoints.blocks_notifier;
|
||||
|
||||
let block_tps_task = {
|
||||
let map_of_signature = map_of_signature.clone();
|
||||
let transactions_in_blocks = transactions_in_blocks.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut start_tracking = false;
|
||||
while let Ok(block) = block_stream.recv().await {
|
||||
let mut count = 0;
|
||||
for transaction in &block.transactions {
|
||||
if map_of_signature.contains(&transaction.signature) {
|
||||
count += 1;
|
||||
map_of_signature.remove(&transaction.signature);
|
||||
}
|
||||
}
|
||||
|
||||
// start tracking once we have first block with some transactions sent by us
|
||||
if start_tracking || count > 0 {
|
||||
start_tracking = true;
|
||||
let mut lk = transactions_in_blocks.lock().await;
|
||||
lk.push(count);
|
||||
}
|
||||
}
|
||||
})
|
||||
};
|
||||
|
||||
for chunk in memo_msgs.chunks(count) {
|
||||
let instant = tokio::time::Instant::now();
|
||||
let mut current_txs = vec![];
|
||||
println!("Sending memo transactions :{}", second);
|
||||
second += 1;
|
||||
let bh = data_cache
|
||||
.block_information_store
|
||||
.get_latest_blockhash(CommitmentConfig::finalized())
|
||||
.await;
|
||||
let last_valid_block_height =
|
||||
data_cache.block_information_store.get_last_blockheight() + 300;
|
||||
let transactions = chunk
|
||||
.iter()
|
||||
.map(|x| {
|
||||
signer_count += 1;
|
||||
create_memo_tx(x, &signers[signer_count % nb_signers], bh, priority_fee)
|
||||
})
|
||||
.collect_vec();
|
||||
|
||||
for transaction in transactions {
|
||||
let signature = transaction.signatures[0];
|
||||
let raw_tx = bincode::serialize(&transaction).unwrap();
|
||||
let slot = data_cache.slot_cache.get_current_slot();
|
||||
map_of_signature.insert(signature);
|
||||
|
||||
let transaction_info = SentTransactionInfo {
|
||||
signature,
|
||||
last_valid_block_height,
|
||||
slot,
|
||||
transaction: Arc::new(raw_tx),
|
||||
prioritization_fee: priority_fee,
|
||||
};
|
||||
let _ = transaction_service
|
||||
.transaction_channel
|
||||
.send(transaction_info.clone())
|
||||
.await;
|
||||
current_txs.push(signature);
|
||||
}
|
||||
|
||||
tx_to_confirm.push(current_txs);
|
||||
|
||||
let millis = instant.elapsed().as_millis() as u64;
|
||||
if millis < 1000 {
|
||||
tokio::time::sleep(Duration::from_millis(1000 - millis)).await;
|
||||
} else {
|
||||
println!("took {millis:?} millis to send {count:?} transactions");
|
||||
}
|
||||
}
|
||||
|
||||
println!(
|
||||
"{} memo transactions sent, waiting for a minute to confirm them",
|
||||
count * args.number_of_seconds
|
||||
);
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(120)).await;
|
||||
|
||||
let mut second = 1;
|
||||
for seconds_sigs in tx_to_confirm {
|
||||
let mut tx_confirmed = 0;
|
||||
for sig in seconds_sigs {
|
||||
if data_cache.txs.is_transaction_confirmed(&sig) {
|
||||
tx_confirmed += 1;
|
||||
}
|
||||
}
|
||||
|
||||
println!(
|
||||
"{} or {} transactions were confirmed for the {} second",
|
||||
tx_confirmed, count, second
|
||||
);
|
||||
second += 1;
|
||||
}
|
||||
|
||||
block_tps_task.abort();
|
||||
|
||||
let lk = transactions_in_blocks.lock().await;
|
||||
// stop tracking by removing trailling 0s
|
||||
let mut vec = lk.clone();
|
||||
vec.reverse();
|
||||
let mut transaction_blocks = vec.iter().skip_while(|x| **x == 0).cloned().collect_vec();
|
||||
transaction_blocks.reverse();
|
||||
println!(
|
||||
"BLOCKS transactions : {}",
|
||||
transaction_blocks.iter().map(|x| x.to_string()).join(", ")
|
||||
);
|
||||
let sum = transaction_blocks.iter().sum::<u64>();
|
||||
let seconds = (transaction_blocks.len() * 400 / 1000) as u64;
|
||||
let tps = sum / seconds;
|
||||
println!("EFFECTIVE TPS: {tps:?}");
|
||||
|
||||
println!("Transfering remaining lamports to payer");
|
||||
transfer_back_to_payer(rpc_client, fee_payer, signers, priority_fee).await;
|
||||
Ok(())
|
||||
fn main() {
|
||||
todo!()
|
||||
}
|
||||
|
|
|
@ -377,7 +377,7 @@ impl LiteRpcServer for LiteBridge {
|
|||
|
||||
match self
|
||||
.transaction_service
|
||||
.send_wire_transaction(raw_tx, max_retries)
|
||||
.send_transaction(raw_tx, max_retries)
|
||||
.await
|
||||
{
|
||||
Ok(sig) => {
|
||||
|
|
|
@ -49,7 +49,7 @@ pub struct Config {
|
|||
#[serde(default)]
|
||||
pub use_grpc: bool,
|
||||
#[serde(default)]
|
||||
pub calculate_leader_schedule_from_geyser: bool,
|
||||
pub calculate_leader_schedule_form_geyser: bool,
|
||||
#[serde(default = "Config::default_grpc_addr")]
|
||||
pub grpc_addr: String,
|
||||
#[serde(default)]
|
||||
|
@ -142,8 +142,11 @@ impl Config {
|
|||
.map(|size| size.parse().unwrap())
|
||||
.unwrap_or(config.fanout_size);
|
||||
|
||||
// note: identity config is handled in load_identity_keypair
|
||||
// the behavior is different from the other config values as it does either take a file path or the keypair as json array
|
||||
// IDENTITY env sets value of identity_keypair
|
||||
|
||||
// config.identity_keypair = env::var("IDENTITY")
|
||||
// .map(Some)
|
||||
// .unwrap_or(config.identity_keypair);
|
||||
|
||||
config.prometheus_addr = env::var("PROMETHEUS_ADDR").unwrap_or(config.prometheus_addr);
|
||||
|
||||
|
@ -158,7 +161,7 @@ impl Config {
|
|||
config.quic_proxy_addr = env::var("QUIC_PROXY_ADDR").ok();
|
||||
|
||||
config.use_grpc = env::var("USE_GRPC")
|
||||
.map(|value| value.parse::<bool>().unwrap())
|
||||
.map(|_| true)
|
||||
.unwrap_or(config.use_grpc);
|
||||
|
||||
// source 1
|
||||
|
|
|
@ -11,7 +11,7 @@ use lite_rpc::postgres_logger::PostgresLogger;
|
|||
use lite_rpc::service_spawner::ServiceSpawner;
|
||||
use lite_rpc::start_server::start_servers;
|
||||
use lite_rpc::DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE;
|
||||
use log::info;
|
||||
use log::{debug, info};
|
||||
use solana_lite_rpc_accounts::account_service::AccountService;
|
||||
use solana_lite_rpc_accounts::account_store_interface::AccountStorageInterface;
|
||||
use solana_lite_rpc_accounts::inmemory_account_store::InmemoryAccountStore;
|
||||
|
@ -44,8 +44,7 @@ use solana_lite_rpc_core::structures::{
|
|||
epoch::EpochCache, identity_stakes::IdentityStakes, notifications::NotificationSender,
|
||||
};
|
||||
use solana_lite_rpc_core::traits::address_lookup_table_interface::AddressLookupTableInterface;
|
||||
use solana_lite_rpc_core::types::BlockStream;
|
||||
use solana_lite_rpc_core::utils::wait_till_block_of_commitment_is_recieved;
|
||||
use solana_lite_rpc_core::types::{BlockInfoStream, BlockStream};
|
||||
use solana_lite_rpc_core::AnyhowJoinHandle;
|
||||
use solana_lite_rpc_prioritization_fees::account_prio_service::AccountPrioService;
|
||||
use solana_lite_rpc_services::data_caching_service::DataCachingService;
|
||||
|
@ -55,6 +54,7 @@ use solana_lite_rpc_services::transaction_replayer::TransactionReplayer;
|
|||
use solana_lite_rpc_services::tx_sender::TxSender;
|
||||
|
||||
use lite_rpc::postgres_logger;
|
||||
use solana_lite_rpc_core::structures::block_info::BlockInfo;
|
||||
use solana_lite_rpc_prioritization_fees::start_block_priofees_task;
|
||||
use solana_lite_rpc_util::obfuscate_rpcurl;
|
||||
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
|
||||
|
@ -67,6 +67,7 @@ use std::time::Duration;
|
|||
use tokio::io::AsyncReadExt;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::time::{timeout, Instant};
|
||||
use tracing_subscriber::fmt::format::FmtSpan;
|
||||
use tracing_subscriber::EnvFilter;
|
||||
|
||||
|
@ -75,6 +76,38 @@ use tracing_subscriber::EnvFilter;
|
|||
#[global_allocator]
|
||||
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
|
||||
|
||||
// export _RJEM_MALLOC_CONF=prof:true,lg_prof_interval:30,lg_prof_sample:21,prof_prefix:/tmp/jeprof
|
||||
|
||||
use jemalloc_ctl::{epoch, stats};
|
||||
use std::{thread, time::{self}};
|
||||
|
||||
|
||||
async fn get_latest_block_info(
|
||||
mut blockinfo_stream: BlockInfoStream,
|
||||
commitment_config: CommitmentConfig,
|
||||
) -> BlockInfo {
|
||||
let started = Instant::now();
|
||||
loop {
|
||||
match timeout(Duration::from_millis(500), blockinfo_stream.recv()).await {
|
||||
Ok(Ok(block_info)) => {
|
||||
if block_info.commitment_config == commitment_config {
|
||||
return block_info;
|
||||
}
|
||||
}
|
||||
Err(_elapsed) => {
|
||||
debug!(
|
||||
"waiting for latest block info ({}) ... {:.02}ms",
|
||||
commitment_config.commitment,
|
||||
started.elapsed().as_secs_f32() * 1000.0
|
||||
);
|
||||
}
|
||||
Ok(Err(_error)) => {
|
||||
panic!("Did not recv block info");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn start_postgres(
|
||||
config: Option<postgres_logger::PostgresSessionConfig>,
|
||||
) -> anyhow::Result<(Option<NotificationSender>, AnyhowJoinHandle)> {
|
||||
|
@ -228,7 +261,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
|
|||
};
|
||||
|
||||
info!("Waiting for first finalized block info...");
|
||||
let finalized_block_info = wait_till_block_of_commitment_is_recieved(
|
||||
let finalized_block_info = get_latest_block_info(
|
||||
blockinfo_notifier.resubscribe(),
|
||||
CommitmentConfig::finalized(),
|
||||
)
|
||||
|
@ -312,6 +345,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
|
|||
};
|
||||
|
||||
let spawner = ServiceSpawner {
|
||||
prometheus_addr,
|
||||
data_cache: data_cache.clone(),
|
||||
};
|
||||
//init grpc leader schedule and vote account is configured.
|
||||
|
@ -336,8 +370,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
|
|||
slot_notifier.resubscribe(),
|
||||
);
|
||||
|
||||
let support_service =
|
||||
tokio::spawn(async move { spawner.spawn_support_services(prometheus_addr).await });
|
||||
let support_service = tokio::spawn(async move { spawner.spawn_support_services().await });
|
||||
|
||||
let history = History::new();
|
||||
|
||||
|
|
|
@ -17,14 +17,15 @@ use solana_lite_rpc_services::{
|
|||
use std::time::Duration;
|
||||
|
||||
pub struct ServiceSpawner {
|
||||
pub prometheus_addr: String,
|
||||
pub data_cache: DataCache,
|
||||
}
|
||||
|
||||
impl ServiceSpawner {
|
||||
/// spawn services that support the whole system
|
||||
pub async fn spawn_support_services(&self, prometheus_addr: String) -> anyhow::Result<()> {
|
||||
pub async fn spawn_support_services(&self) -> anyhow::Result<()> {
|
||||
// spawn prometheus
|
||||
let prometheus = PrometheusSync::sync(prometheus_addr.clone());
|
||||
let prometheus = PrometheusSync::sync(self.prometheus_addr.clone());
|
||||
|
||||
// spawn metrics capture
|
||||
let metrics = MetricsCapture::new(self.data_cache.txs.clone()).capture();
|
||||
|
|
|
@ -61,7 +61,6 @@ const QUIC_CONNECTION_PARAMS: QuicConnectionParameters = QuicConnectionParameter
|
|||
write_timeout: Duration::from_secs(2),
|
||||
number_of_transactions_per_unistream: 10,
|
||||
unistreams_to_create_new_connection_in_percentage: 10,
|
||||
prioritization_heap_size: None,
|
||||
};
|
||||
|
||||
#[test]
|
||||
|
@ -744,7 +743,7 @@ pub fn build_raw_sample_tx(i: u32) -> SentTransactionInfo {
|
|||
let tx = build_sample_tx(&payer_keypair, i);
|
||||
|
||||
let transaction =
|
||||
Arc::new(bincode::serialize::<VersionedTransaction>(&tx).expect("failed to serialize tx"));
|
||||
bincode::serialize::<VersionedTransaction>(&tx).expect("failed to serialize tx");
|
||||
|
||||
SentTransactionInfo {
|
||||
signature: *tx.get_signature(),
|
||||
|
|
|
@ -35,10 +35,8 @@ pub async fn main() -> anyhow::Result<()> {
|
|||
dotenv().ok();
|
||||
|
||||
let proxy_listener_addr = proxy_listen_addr.parse().unwrap();
|
||||
|
||||
let validator_identity = ValidatorIdentity::new(
|
||||
load_identity_keypair(Some(identity_keypair).filter(|s| !s.is_empty())).await?,
|
||||
);
|
||||
let validator_identity =
|
||||
ValidatorIdentity::new(load_identity_keypair(Some(identity_keypair)).await?);
|
||||
|
||||
let tls_config = Arc::new(SelfSignedTlsConfigProvider::new_singleton_self_signed_localhost());
|
||||
let main_services = QuicForwardProxy::new(proxy_listener_addr, tls_config, validator_identity)
|
||||
|
|
|
@ -4,7 +4,7 @@ use crate::quic_connection_utils::{
|
|||
use futures::FutureExt;
|
||||
use log::warn;
|
||||
use prometheus::{core::GenericGauge, opts, register_int_gauge};
|
||||
use quinn::{Connection, Endpoint, VarInt};
|
||||
use quinn::{Connection, Endpoint};
|
||||
use solana_lite_rpc_core::structures::rotating_queue::RotatingQueue;
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
use std::{
|
||||
|
@ -14,7 +14,7 @@ use std::{
|
|||
Arc,
|
||||
},
|
||||
};
|
||||
use tokio::sync::{broadcast, OwnedSemaphorePermit, RwLock, Semaphore};
|
||||
use tokio::sync::{Notify, OwnedSemaphorePermit, RwLock, Semaphore};
|
||||
|
||||
pub type EndpointPool = RotatingQueue<Endpoint>;
|
||||
|
||||
|
@ -40,6 +40,7 @@ pub struct QuicConnection {
|
|||
identity: Pubkey,
|
||||
socket_address: SocketAddr,
|
||||
connection_params: QuicConnectionParameters,
|
||||
exit_notify: Arc<Notify>,
|
||||
timeout_counters: Arc<AtomicU64>,
|
||||
has_connected_once: Arc<AtomicBool>,
|
||||
}
|
||||
|
@ -50,6 +51,7 @@ impl QuicConnection {
|
|||
endpoint: Endpoint,
|
||||
socket_address: SocketAddr,
|
||||
connection_params: QuicConnectionParameters,
|
||||
exit_notify: Arc<Notify>,
|
||||
) -> Self {
|
||||
Self {
|
||||
connection: Arc::new(RwLock::new(None)),
|
||||
|
@ -58,16 +60,13 @@ impl QuicConnection {
|
|||
identity,
|
||||
socket_address,
|
||||
connection_params,
|
||||
exit_notify,
|
||||
timeout_counters: Arc::new(AtomicU64::new(0)),
|
||||
has_connected_once: Arc::new(AtomicBool::new(false)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn connect(
|
||||
&self,
|
||||
is_already_connected: bool,
|
||||
exit_notify: broadcast::Receiver<()>,
|
||||
) -> Option<Connection> {
|
||||
async fn connect(&self, is_already_connected: bool) -> Option<Connection> {
|
||||
QuicConnectionUtils::connect(
|
||||
self.identity,
|
||||
is_already_connected,
|
||||
|
@ -75,12 +74,12 @@ impl QuicConnection {
|
|||
self.socket_address,
|
||||
self.connection_params.connection_timeout,
|
||||
self.connection_params.connection_retry_count,
|
||||
exit_notify,
|
||||
self.exit_notify.clone(),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn get_connection(&self, exit_notify: broadcast::Receiver<()>) -> Option<Connection> {
|
||||
pub async fn get_connection(&self) -> Option<Connection> {
|
||||
// get new connection reset if necessary
|
||||
let last_stable_id = self.last_stable_id.load(Ordering::Relaxed) as usize;
|
||||
let conn = self.connection.read().await.clone();
|
||||
|
@ -96,7 +95,7 @@ impl QuicConnection {
|
|||
Some(connection)
|
||||
} else {
|
||||
NB_QUIC_CONNECTION_RESET.inc();
|
||||
let new_conn = self.connect(true, exit_notify).await;
|
||||
let new_conn = self.connect(true).await;
|
||||
if let Some(new_conn) = new_conn {
|
||||
*conn = Some(new_conn);
|
||||
conn.clone()
|
||||
|
@ -117,7 +116,7 @@ impl QuicConnection {
|
|||
// connection has recently been established/ just use it
|
||||
return (*lk).clone();
|
||||
}
|
||||
let connection = self.connect(false, exit_notify).await;
|
||||
let connection = self.connect(false).await;
|
||||
*lk = connection.clone();
|
||||
self.has_connected_once.store(true, Ordering::Relaxed);
|
||||
connection
|
||||
|
@ -125,16 +124,17 @@ impl QuicConnection {
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn send_transaction(&self, tx: &Vec<u8>, mut exit_notify: broadcast::Receiver<()>) {
|
||||
pub async fn send_transaction(&self, tx: Vec<u8>) {
|
||||
let connection_retry_count = self.connection_params.connection_retry_count;
|
||||
for _ in 0..connection_retry_count {
|
||||
let mut do_retry = false;
|
||||
let exit_notify = self.exit_notify.clone();
|
||||
|
||||
let connection = tokio::select! {
|
||||
conn = self.get_connection(exit_notify.resubscribe()) => {
|
||||
conn = self.get_connection() => {
|
||||
conn
|
||||
},
|
||||
_ = exit_notify.recv() => {
|
||||
_ = exit_notify.notified() => {
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
@ -149,7 +149,7 @@ impl QuicConnection {
|
|||
) => {
|
||||
res
|
||||
},
|
||||
_ = exit_notify.recv() => {
|
||||
_ = exit_notify.notified() => {
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
@ -158,13 +158,13 @@ impl QuicConnection {
|
|||
let write_add_result = tokio::select! {
|
||||
res = QuicConnectionUtils::write_all(
|
||||
send_stream,
|
||||
tx,
|
||||
&tx,
|
||||
self.identity,
|
||||
self.connection_params,
|
||||
) => {
|
||||
res
|
||||
},
|
||||
_ = exit_notify.recv() => {
|
||||
_ = exit_notify.notified() => {
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
@ -225,13 +225,6 @@ impl QuicConnection {
|
|||
None => false,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn close(&self) {
|
||||
let lk = self.connection.read().await;
|
||||
if let Some(connection) = lk.as_ref() {
|
||||
connection.close(VarInt::from_u32(0), b"Not needed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
|
@ -254,6 +247,7 @@ impl QuicConnectionPool {
|
|||
endpoints: EndpointPool,
|
||||
socket_address: SocketAddr,
|
||||
connection_parameters: QuicConnectionParameters,
|
||||
exit_notify: Arc<Notify>,
|
||||
nb_connection: usize,
|
||||
max_number_of_unistream_connection: usize,
|
||||
) -> Self {
|
||||
|
@ -265,6 +259,7 @@ impl QuicConnectionPool {
|
|||
endpoints.get().expect("Should get and endpoint"),
|
||||
socket_address,
|
||||
connection_parameters,
|
||||
exit_notify.clone(),
|
||||
));
|
||||
}
|
||||
Self {
|
||||
|
@ -326,10 +321,4 @@ impl QuicConnectionPool {
|
|||
pub fn is_empty(&self) -> bool {
|
||||
self.connections.is_empty()
|
||||
}
|
||||
|
||||
pub async fn close_all(&self) {
|
||||
for connection in &self.connections {
|
||||
connection.close().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,7 +14,7 @@ use std::{
|
|||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
use tokio::{sync::broadcast, time::timeout};
|
||||
use tokio::{sync::Notify, time::timeout};
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
static ref NB_QUIC_0RTT_ATTEMPTED: GenericGauge<prometheus::core::AtomicI64> =
|
||||
|
@ -45,31 +45,6 @@ lazy_static::lazy_static! {
|
|||
static ref NB_QUIC_FINISH_ERRORED: GenericGauge<prometheus::core::AtomicI64> =
|
||||
register_int_gauge!(opts!("literpc_quic_finish_errored", "Number of times finish errored")).unwrap();
|
||||
|
||||
|
||||
static ref NB_QUIC_CONNECTION_ERROR_VERSION_MISMATCH: GenericGauge<prometheus::core::AtomicI64> =
|
||||
register_int_gauge!(opts!("literpc_quic_connection_error_version_mismatch", "Number of times connection errored VersionMismatch")).unwrap();
|
||||
static ref NB_QUIC_CONNECTION_ERROR_TRANSPORT_ERROR: GenericGauge<prometheus::core::AtomicI64> =
|
||||
register_int_gauge!(opts!("literpc_quic_connection_error_transport_error", "Number of times connection errored TransportError")).unwrap();
|
||||
static ref NB_QUIC_CONNECTION_ERROR_CONNECTION_CLOSED: GenericGauge<prometheus::core::AtomicI64> =
|
||||
register_int_gauge!(opts!("literpc_quic_connection_error_connection_closed", "Number of times connection errored ConnectionClosed")).unwrap();
|
||||
static ref NB_QUIC_CONNECTION_ERROR_APPLICATION_CLOSED: GenericGauge<prometheus::core::AtomicI64> =
|
||||
register_int_gauge!(opts!("literpc_quic_connection_error_application_closed", "Number of times connection errored ApplicationClosed")).unwrap();
|
||||
static ref NB_QUIC_CONNECTION_ERROR_RESET: GenericGauge<prometheus::core::AtomicI64> =
|
||||
register_int_gauge!(opts!("literpc_quic_connection_error_reset", "Number of times connection errored Reset")).unwrap();
|
||||
static ref NB_QUIC_CONNECTION_ERROR_TIMEDOUT: GenericGauge<prometheus::core::AtomicI64> =
|
||||
register_int_gauge!(opts!("literpc_quic_connection_error_timed_out", "Number of times connection errored TimedOut")).unwrap();
|
||||
static ref NB_QUIC_CONNECTION_ERROR_LOCALLY_CLOSED: GenericGauge<prometheus::core::AtomicI64> =
|
||||
register_int_gauge!(opts!("literpc_quic_connection_error_locally_closed", "Number of times connection errored locally closed")).unwrap();
|
||||
|
||||
static ref NB_QUIC_WRITE_ERROR_STOPPED: GenericGauge<prometheus::core::AtomicI64> =
|
||||
register_int_gauge!(opts!("literpc_quic_write_error_stopped", "Number of times write_error Stopped")).unwrap();
|
||||
static ref NB_QUIC_WRITE_ERROR_CONNECTION_LOST: GenericGauge<prometheus::core::AtomicI64> =
|
||||
register_int_gauge!(opts!("literpc_quic_write_error_connection_lost", "Number of times write_error ConnectionLost")).unwrap();
|
||||
static ref NB_QUIC_WRITE_ERROR_UNKNOWN_STREAM: GenericGauge<prometheus::core::AtomicI64> =
|
||||
register_int_gauge!(opts!("literpc_quic_write_error_unknown_stream", "Number of times write_error UnknownStream")).unwrap();
|
||||
static ref NB_QUIC_WRITE_ERROR_0RTT_REJECT: GenericGauge<prometheus::core::AtomicI64> =
|
||||
register_int_gauge!(opts!("literpc_quic_write_error_0RTT_reject", "Number of times write_error ZeroRttRejected")).unwrap();
|
||||
|
||||
static ref NB_QUIC_CONNECTIONS: GenericGauge<prometheus::core::AtomicI64> =
|
||||
register_int_gauge!(opts!("literpc_nb_active_quic_connections", "Number of quic connections open")).unwrap();
|
||||
|
||||
|
@ -108,21 +83,19 @@ pub struct QuicConnectionParameters {
|
|||
pub max_number_of_connections: usize,
|
||||
pub number_of_transactions_per_unistream: usize,
|
||||
pub unistreams_to_create_new_connection_in_percentage: u8,
|
||||
pub prioritization_heap_size: Option<usize>,
|
||||
}
|
||||
|
||||
impl Default for QuicConnectionParameters {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
connection_timeout: Duration::from_millis(60000),
|
||||
connection_timeout: Duration::from_millis(10000),
|
||||
unistream_timeout: Duration::from_millis(10000),
|
||||
write_timeout: Duration::from_millis(10000),
|
||||
finalize_timeout: Duration::from_millis(20000),
|
||||
finalize_timeout: Duration::from_millis(10000),
|
||||
connection_retry_count: 20,
|
||||
max_number_of_connections: 8,
|
||||
number_of_transactions_per_unistream: 1,
|
||||
unistreams_to_create_new_connection_in_percentage: 10,
|
||||
prioritization_heap_size: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -194,25 +167,6 @@ impl QuicConnectionUtils {
|
|||
}
|
||||
Err(e) => {
|
||||
NB_QUIC_CONNECTION_ERRORED.inc();
|
||||
match &e {
|
||||
ConnectionError::VersionMismatch => {
|
||||
NB_QUIC_CONNECTION_ERROR_VERSION_MISMATCH.inc()
|
||||
}
|
||||
ConnectionError::TransportError(_) => {
|
||||
NB_QUIC_CONNECTION_ERROR_TRANSPORT_ERROR.inc()
|
||||
}
|
||||
ConnectionError::ConnectionClosed(_) => {
|
||||
NB_QUIC_CONNECTION_ERROR_CONNECTION_CLOSED.inc()
|
||||
}
|
||||
ConnectionError::ApplicationClosed(_) => {
|
||||
NB_QUIC_CONNECTION_ERROR_APPLICATION_CLOSED.inc()
|
||||
}
|
||||
ConnectionError::Reset => NB_QUIC_CONNECTION_ERROR_RESET.inc(),
|
||||
ConnectionError::TimedOut => NB_QUIC_CONNECTION_ERROR_TIMEDOUT.inc(),
|
||||
ConnectionError::LocallyClosed => {
|
||||
NB_QUIC_CONNECTION_ERROR_LOCALLY_CLOSED.inc()
|
||||
}
|
||||
}
|
||||
Err(e.into())
|
||||
}
|
||||
},
|
||||
|
@ -265,7 +219,7 @@ impl QuicConnectionUtils {
|
|||
addr: SocketAddr,
|
||||
connection_timeout: Duration,
|
||||
connection_retry_count: usize,
|
||||
mut exit_notified: broadcast::Receiver<()>,
|
||||
exit_notified: Arc<Notify>,
|
||||
) -> Option<Connection> {
|
||||
for _ in 0..connection_retry_count {
|
||||
let conn = if already_connected {
|
||||
|
@ -274,7 +228,7 @@ impl QuicConnectionUtils {
|
|||
res = Self::make_connection_0rtt(endpoint.clone(), addr, connection_timeout) => {
|
||||
res
|
||||
},
|
||||
_ = exit_notified.recv() => {
|
||||
_ = exit_notified.notified() => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -284,7 +238,7 @@ impl QuicConnectionUtils {
|
|||
res = Self::make_connection(endpoint.clone(), addr, connection_timeout) => {
|
||||
res
|
||||
},
|
||||
_ = exit_notified.recv() => {
|
||||
_ = exit_notified.notified() => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -317,17 +271,6 @@ impl QuicConnectionUtils {
|
|||
match write_timeout_res {
|
||||
Ok(write_res) => {
|
||||
if let Err(e) = write_res {
|
||||
match &e {
|
||||
quinn::WriteError::Stopped(_) => NB_QUIC_WRITE_ERROR_STOPPED.inc(),
|
||||
quinn::WriteError::ConnectionLost(_) => {
|
||||
NB_QUIC_WRITE_ERROR_CONNECTION_LOST.inc()
|
||||
}
|
||||
quinn::WriteError::UnknownStream => {
|
||||
NB_QUIC_WRITE_ERROR_UNKNOWN_STREAM.inc()
|
||||
}
|
||||
quinn::WriteError::ZeroRttRejected => NB_QUIC_WRITE_ERROR_0RTT_REJECT.inc(),
|
||||
};
|
||||
|
||||
trace!(
|
||||
"Error while writing transaction for {}, error {}",
|
||||
identity,
|
||||
|
@ -352,16 +295,6 @@ impl QuicConnectionUtils {
|
|||
match finish_timeout_res {
|
||||
Ok(finish_res) => {
|
||||
if let Err(e) = finish_res {
|
||||
match &e {
|
||||
quinn::WriteError::Stopped(_) => NB_QUIC_WRITE_ERROR_STOPPED.inc(),
|
||||
quinn::WriteError::ConnectionLost(_) => {
|
||||
NB_QUIC_WRITE_ERROR_CONNECTION_LOST.inc()
|
||||
}
|
||||
quinn::WriteError::UnknownStream => {
|
||||
NB_QUIC_WRITE_ERROR_UNKNOWN_STREAM.inc()
|
||||
}
|
||||
quinn::WriteError::ZeroRttRejected => NB_QUIC_WRITE_ERROR_0RTT_REJECT.inc(),
|
||||
};
|
||||
trace!(
|
||||
"Error while finishing transaction for {}, error {}",
|
||||
identity,
|
||||
|
|
|
@ -179,7 +179,6 @@ impl QuicProxyConnectionManager {
|
|||
transaction,
|
||||
..
|
||||
}) => {
|
||||
let transaction = transaction.as_ref().clone();
|
||||
TxData::new(signature, transaction)
|
||||
},
|
||||
Err(e) => {
|
||||
|
@ -196,7 +195,6 @@ impl QuicProxyConnectionManager {
|
|||
transaction,
|
||||
..
|
||||
}) => {
|
||||
let transaction = transaction.as_ref().clone();
|
||||
txs.push(TxData::new(signature, transaction));
|
||||
},
|
||||
Err(TryRecvError::Empty) => {
|
||||
|
|
|
@ -15,7 +15,7 @@ use solana_sdk::pubkey::Pubkey;
|
|||
use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams;
|
||||
use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration};
|
||||
use tokio::sync::{
|
||||
broadcast::{self, Receiver, Sender},
|
||||
broadcast::{Receiver, Sender},
|
||||
Notify,
|
||||
};
|
||||
|
||||
|
@ -48,7 +48,7 @@ struct ActiveConnection {
|
|||
tpu_address: SocketAddr,
|
||||
data_cache: DataCache,
|
||||
connection_parameters: QuicConnectionParameters,
|
||||
exit_notifier: broadcast::Sender<()>,
|
||||
exit_notifier: Arc<Notify>,
|
||||
}
|
||||
|
||||
impl ActiveConnection {
|
||||
|
@ -59,14 +59,13 @@ impl ActiveConnection {
|
|||
data_cache: DataCache,
|
||||
connection_parameters: QuicConnectionParameters,
|
||||
) -> Self {
|
||||
let (exit_notifier, _) = broadcast::channel(1);
|
||||
Self {
|
||||
endpoints,
|
||||
tpu_address,
|
||||
identity,
|
||||
data_cache,
|
||||
connection_parameters,
|
||||
exit_notifier,
|
||||
exit_notifier: Arc::new(Notify::new()),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -80,6 +79,7 @@ impl ActiveConnection {
|
|||
let fill_notify = Arc::new(Notify::new());
|
||||
|
||||
let identity = self.identity;
|
||||
let exit_notifier = self.exit_notifier.clone();
|
||||
|
||||
NB_QUIC_ACTIVE_CONNECTIONS.inc();
|
||||
|
||||
|
@ -95,20 +95,18 @@ impl ActiveConnection {
|
|||
self.endpoints.clone(),
|
||||
addr,
|
||||
self.connection_parameters,
|
||||
exit_notifier.clone(),
|
||||
max_number_of_connections,
|
||||
max_uni_stream_connections,
|
||||
);
|
||||
let prioritization_heap_size = self
|
||||
.connection_parameters
|
||||
.prioritization_heap_size
|
||||
.unwrap_or(2 * max_uni_stream_connections);
|
||||
let priorization_heap = PrioritizationFeesHeap::new(prioritization_heap_size);
|
||||
|
||||
let priorization_heap = PrioritizationFeesHeap::new(2 * max_uni_stream_connections);
|
||||
|
||||
let heap_filler_task = {
|
||||
let priorization_heap = priorization_heap.clone();
|
||||
let data_cache = self.data_cache.clone();
|
||||
let fill_notify = fill_notify.clone();
|
||||
let mut exit_notifier = self.exit_notifier.subscribe();
|
||||
let exit_notifier = exit_notifier.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut current_blockheight =
|
||||
data_cache.block_information_store.get_last_blockheight();
|
||||
|
@ -117,7 +115,7 @@ impl ActiveConnection {
|
|||
tx = transaction_reciever.recv() => {
|
||||
tx
|
||||
},
|
||||
_ = exit_notifier.recv() => {
|
||||
_ = exit_notifier.notified() => {
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
@ -164,14 +162,12 @@ impl ActiveConnection {
|
|||
if let Ok(PooledConnection { connection, permit }) =
|
||||
connection_pool.get_pooled_connection().await
|
||||
{
|
||||
let exit_notifier = self.exit_notifier.subscribe();
|
||||
tokio::task::spawn(async move {
|
||||
let _permit = permit;
|
||||
connection.get_connection(exit_notifier).await;
|
||||
connection.get_connection().await;
|
||||
});
|
||||
};
|
||||
|
||||
let mut exit_notifier = self.exit_notifier.subscribe();
|
||||
'main_loop: loop {
|
||||
tokio::select! {
|
||||
_ = fill_notify.notified() => {
|
||||
|
@ -198,7 +194,6 @@ impl ActiveConnection {
|
|||
break;
|
||||
},
|
||||
};
|
||||
let exit_notifier = self.exit_notifier.subscribe();
|
||||
|
||||
tokio::spawn(async move {
|
||||
// permit will be used to send all the transaction and then destroyed
|
||||
|
@ -207,13 +202,13 @@ impl ActiveConnection {
|
|||
|
||||
NB_QUIC_TASKS.inc();
|
||||
|
||||
connection.send_transaction(tx.transaction.as_ref(), exit_notifier).await;
|
||||
connection.send_transaction(tx.transaction).await;
|
||||
timer.observe_duration();
|
||||
NB_QUIC_TASKS.dec();
|
||||
});
|
||||
}
|
||||
},
|
||||
_ = exit_notifier.recv() => {
|
||||
_ = exit_notifier.notified() => {
|
||||
break 'main_loop;
|
||||
}
|
||||
}
|
||||
|
@ -223,7 +218,6 @@ impl ActiveConnection {
|
|||
let elements_removed = priorization_heap.clear().await;
|
||||
TRANSACTIONS_IN_HEAP.sub(elements_removed as i64);
|
||||
NB_QUIC_ACTIVE_CONNECTIONS.dec();
|
||||
connection_pool.close_all().await;
|
||||
}
|
||||
|
||||
pub fn start_listening(
|
||||
|
@ -249,9 +243,9 @@ impl TpuConnectionManager {
|
|||
pub async fn new(
|
||||
certificate: rustls::Certificate,
|
||||
key: rustls::PrivateKey,
|
||||
_fanout: usize,
|
||||
fanout: usize,
|
||||
) -> Self {
|
||||
let number_of_clients = 1; // fanout * 4;
|
||||
let number_of_clients = fanout * 4;
|
||||
Self {
|
||||
endpoints: RotatingQueue::new(number_of_clients, || {
|
||||
QuicConnectionUtils::create_endpoint(certificate.clone(), key.clone())
|
||||
|
@ -292,7 +286,7 @@ impl TpuConnectionManager {
|
|||
if !connections_to_keep.contains_key(key) {
|
||||
trace!("removing a connection for {}", key.to_string());
|
||||
// ignore error for exit channel
|
||||
let _ = value.exit_notifier.send(());
|
||||
value.exit_notifier.notify_waiters();
|
||||
false
|
||||
} else {
|
||||
true
|
||||
|
|
|
@ -17,7 +17,6 @@ use solana_lite_rpc_core::types::SlotStream;
|
|||
use solana_lite_rpc_core::AnyhowJoinHandle;
|
||||
use solana_sdk::{quic::QUIC_PORT_OFFSET, signature::Keypair, slot_history::Slot};
|
||||
use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
|
||||
use std::collections::HashMap;
|
||||
use std::{
|
||||
net::{IpAddr, Ipv4Addr},
|
||||
sync::Arc,
|
||||
|
@ -123,7 +122,6 @@ impl TpuService {
|
|||
) -> anyhow::Result<()> {
|
||||
let fanout = self.config.fanout_slots;
|
||||
let last_slot = estimated_slot + fanout;
|
||||
let current_slot = current_slot.saturating_sub(4);
|
||||
|
||||
let cluster_nodes = self.data_cache.cluster_info.cluster_nodes.clone();
|
||||
|
||||
|
@ -132,7 +130,7 @@ impl TpuService {
|
|||
.get_slot_leaders(current_slot, last_slot)
|
||||
.await?;
|
||||
// get next leader with its tpu port
|
||||
let connections_to_keep: HashMap<_, _> = next_leaders
|
||||
let connections_to_keep = next_leaders
|
||||
.iter()
|
||||
.map(|x| {
|
||||
let contact_info = cluster_nodes.get(&x.pubkey);
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
// This class will manage the lifecycle for a transaction
|
||||
// It will send, replay if necessary and confirm by listening to blocks
|
||||
|
||||
use std::{sync::Arc, time::Duration};
|
||||
use log::trace;
|
||||
use std::{num::IntErrorKind, time::Duration};
|
||||
|
||||
use crate::{
|
||||
tpu_utils::tpu_service::TpuService,
|
||||
|
@ -22,7 +23,7 @@ use solana_lite_rpc_core::{
|
|||
use solana_sdk::{
|
||||
borsh0_10::try_from_slice_unchecked,
|
||||
compute_budget::{self, ComputeBudgetInstruction},
|
||||
transaction::{Transaction, VersionedTransaction},
|
||||
transaction::VersionedTransaction,
|
||||
};
|
||||
use tokio::{
|
||||
sync::mpsc::{self, Sender, UnboundedSender},
|
||||
|
@ -122,15 +123,6 @@ pub struct TransactionService {
|
|||
|
||||
impl TransactionService {
|
||||
pub async fn send_transaction(
|
||||
&self,
|
||||
tx: Transaction,
|
||||
max_retries: Option<u16>,
|
||||
) -> anyhow::Result<String> {
|
||||
let raw_tx = bincode::serialize(&tx)?;
|
||||
self.send_wire_transaction(raw_tx, max_retries).await
|
||||
}
|
||||
|
||||
pub async fn send_wire_transaction(
|
||||
&self,
|
||||
raw_tx: Vec<u8>,
|
||||
max_retries: Option<u16>,
|
||||
|
@ -190,7 +182,7 @@ impl TransactionService {
|
|||
signature,
|
||||
last_valid_block_height: last_valid_blockheight,
|
||||
slot,
|
||||
transaction: Arc::new(raw_tx),
|
||||
transaction: raw_tx,
|
||||
prioritization_fee,
|
||||
};
|
||||
if let Err(e) = self
|
||||
|
|
Loading…
Reference in New Issue