Adding custom tpu send transaction example (#380)

* Adding an example for custom tpu send transaction

* Fixing the custom tpu example

* Optimizing SentTransactionInfo, and calculating TPS

* Reverting unwanted changes

* After groovies review
This commit is contained in:
galactus 2024-04-02 14:26:54 +02:00 committed by GitHub
parent 4d7700145e
commit 681334197f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 667 additions and 57 deletions

14
Cargo.lock generated
View File

@ -1220,8 +1220,22 @@ dependencies = [
name = "custom-tpu-send-transactions"
version = "0.2.4"
dependencies = [
"anyhow",
"bincode",
"clap 4.5.4",
"dashmap 5.5.3",
"futures",
"itertools 0.10.5",
"log",
"rand 0.8.5",
"rand_chacha 0.3.1",
"solana-lite-rpc-cluster-endpoints",
"solana-lite-rpc-core",
"solana-lite-rpc-services",
"solana-rpc-client",
"solana-sdk",
"tokio",
"tracing-subscriber",
]
[[package]]

View File

@ -14,7 +14,6 @@ pub fn poll_cluster_info(
loop {
match rpc_client.get_cluster_nodes().await {
Ok(cluster_nodes) => {
debug!("get cluster_nodes from rpc: {:?}", cluster_nodes.len());
if let Err(e) = contact_info_sender.send(cluster_nodes) {
warn!("rpc_cluster_info channel has no receivers {e:?}");
}
@ -23,7 +22,7 @@ pub fn poll_cluster_info(
Err(error) => {
warn!("rpc_cluster_info failed <{:?}> - retrying", error);
// throttle
tokio::time::sleep(Duration::from_secs(2500)).await;
tokio::time::sleep(Duration::from_secs(10)).await;
}
}
}
@ -51,7 +50,7 @@ pub fn poll_vote_accounts(
Err(error) => {
warn!("rpc_vote_accounts failed <{:?}> - retrying", error);
// throttle
tokio::time::sleep(Duration::from_secs(2500)).await;
tokio::time::sleep(Duration::from_secs(10)).await;
}
}
}

View File

@ -8,5 +8,6 @@ pub mod stores;
pub mod structures;
pub mod traits;
pub mod types;
pub mod utils;
pub type AnyhowJoinHandle = tokio::task::JoinHandle<anyhow::Result<()>>;

View File

@ -127,7 +127,7 @@ impl PrioritizationFeesHeap {
#[cfg(test)]
mod tests {
use solana_sdk::signature::Signature;
use std::time::Duration;
use std::{sync::Arc, time::Duration};
use crate::structures::{
prioritization_fee_heap::PrioritizationFeesHeap, transaction_sent_info::SentTransactionInfo,
@ -139,7 +139,7 @@ mod tests {
let tx_creator = |signature, prioritization_fee| SentTransactionInfo {
signature,
slot: 0,
transaction: vec![],
transaction: Arc::new(vec![]),
last_valid_block_height: 0,
prioritization_fee,
};
@ -205,7 +205,7 @@ mod tests {
let info = SentTransactionInfo {
signature: Signature::new_unique(),
slot: height + 1,
transaction: vec![],
transaction: Arc::new(vec![]),
last_valid_block_height: height + 10,
prioritization_fee,
};

View File

@ -1,3 +1,5 @@
use std::sync::Arc;
use solana_sdk::signature::Signature;
use solana_sdk::slot_history::Slot;
@ -7,7 +9,7 @@ pub type WireTransaction = Vec<u8>;
pub struct SentTransactionInfo {
pub signature: Signature,
pub slot: Slot,
pub transaction: WireTransaction,
pub transaction: Arc<WireTransaction>,
pub last_valid_block_height: u64,
pub prioritization_fee: u64,
}

33
core/src/utils.rs Normal file
View File

@ -0,0 +1,33 @@
use std::time::Duration;
use log::debug;
use solana_sdk::commitment_config::CommitmentConfig;
use tokio::time::{timeout, Instant};
use crate::{structures::block_info::BlockInfo, types::BlockInfoStream};
pub async fn wait_till_block_of_commitment_is_recieved(
mut blockinfo_stream: BlockInfoStream,
commitment_config: CommitmentConfig,
) -> BlockInfo {
let started = Instant::now();
loop {
match timeout(Duration::from_millis(1000), blockinfo_stream.recv()).await {
Ok(Ok(block_info)) => {
if block_info.commitment_config == commitment_config {
return block_info;
}
}
Err(_elapsed) => {
debug!(
"waiting for latest block info ({}) ... {:.02}ms",
commitment_config.commitment,
started.elapsed().as_secs_f32() * 1000.0
);
}
Ok(Err(error)) => {
panic!("Did not recv block info : {error:?}");
}
}
}
}

View File

@ -10,4 +10,20 @@ edition.workspace = true
[dependencies]
solana-lite-rpc-services = {workspace = true}
solana-lite-rpc-core = {workspace = true}
solana-lite-rpc-core = {workspace = true}
solana-lite-rpc-cluster-endpoints = {workspace = true}
solana-sdk = { workspace = true }
solana-rpc-client = { workspace = true }
tokio = "1.28.2"
clap = { workspace = true }
anyhow = { workspace = true }
dashmap = { workspace = true }
rand = "0.8.5"
rand_chacha = "0.3.1"
log = { workspace = true }
itertools = { workspace = true }
bincode = { workspace = true }
futures = { workspace = true }
tracing-subscriber = { workspace = true }

View File

@ -0,0 +1,39 @@
use clap::Parser;
#[derive(Parser, Debug, Clone)]
#[command(author, version, about, long_about = None)]
pub struct Args {
/// config.json
#[arg(short, long, default_value = "http://127.0.0.1:8899")]
pub rpc_url: String,
#[arg(short, long)]
pub grpc_url: Option<String>,
#[arg(short, long)]
pub x_token: Option<String>,
#[arg(short, long)]
pub transaction_count: Option<usize>,
#[arg(short, long, default_value_t = 1)]
pub number_of_seconds: usize,
#[arg(short, long)]
pub fee_payer: String,
#[arg(short, long)]
pub staked_identity: Option<String>,
#[arg(short, long)]
pub priority_fees: Option<u64>,
#[arg(short = 'a', long, default_value_t = 256)]
pub additional_signers: usize,
#[arg(short = 'b', long, default_value_t = 0.1)]
pub signers_transfer_balance: f64,
#[arg(long)]
pub fanout_slots: Option<u64>,
}

View File

@ -1,3 +1,519 @@
fn main() {
todo!()
use std::{collections::HashSet, ops::Mul, str::FromStr, sync::Arc, time::Duration};
use clap::Parser;
use dashmap::{DashMap, DashSet};
use itertools::Itertools;
use rand::{
distributions::{Alphanumeric, Distribution},
SeedableRng,
};
use solana_lite_rpc_cluster_endpoints::{
geyser_grpc_connector::{GrpcConnectionTimeouts, GrpcSourceConfig},
grpc_subscription::create_grpc_subscription,
json_rpc_leaders_getter::JsonRpcLeaderGetter,
json_rpc_subscription::create_json_rpc_polling_subscription,
};
use solana_lite_rpc_core::{
keypair_loader::load_identity_keypair,
stores::{
block_information_store::{BlockInformation, BlockInformationStore},
cluster_info_store::ClusterInfo,
data_cache::{DataCache, SlotCache},
subscription_store::SubscriptionStore,
tx_store::TxStore,
},
structures::{
epoch::EpochCache, identity_stakes::IdentityStakes, leaderschedule::CalculatedSchedule,
transaction_sent_info::SentTransactionInfo,
},
utils::wait_till_block_of_commitment_is_recieved,
};
use solana_lite_rpc_services::{
data_caching_service::DataCachingService,
quic_connection_utils::QuicConnectionParameters,
tpu_utils::{
tpu_connection_path::TpuConnectionPath,
tpu_service::{TpuService, TpuServiceConfig},
},
transaction_replayer::TransactionReplayer,
transaction_service::TransactionServiceBuilder,
tx_sender::TxSender,
};
use solana_sdk::{
commitment_config::CommitmentConfig,
compute_budget,
hash::Hash,
instruction::Instruction,
message::Message,
native_token::LAMPORTS_PER_SOL,
pubkey::Pubkey,
signature::{Keypair, Signature},
signer::Signer,
system_instruction,
transaction::Transaction,
};
use tokio::sync::{Mutex, RwLock};
use crate::cli::Args;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
mod cli;
const MEMO_PROGRAM_ID: &str = "MemoSq4gqABAXKb96qnH8TysNcWxMyWCqXgDLGmfcHr";
pub fn create_memo_tx(msg: &[u8], payer: &Keypair, blockhash: Hash, prio_fees: u64) -> Transaction {
let memo = Pubkey::from_str(MEMO_PROGRAM_ID).unwrap();
let cb_1 = compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(7000);
let cb_2 = compute_budget::ComputeBudgetInstruction::set_compute_unit_price(prio_fees);
let instruction = Instruction::new_with_bytes(memo, msg, vec![]);
let message = Message::new(&[cb_1, cb_2, instruction], Some(&payer.pubkey()));
Transaction::new(&[payer], message, blockhash)
}
pub fn generate_random_strings(
num_of_txs: usize,
random_seed: Option<u64>,
n_chars: usize,
) -> Vec<Vec<u8>> {
let seed = random_seed.map_or(0, |x| x);
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(seed);
(0..num_of_txs)
.map(|_| Alphanumeric.sample_iter(&mut rng).take(n_chars).collect())
.collect()
}
pub async fn create_signers_from_payer(
rpc_client: Arc<RpcClient>,
payer: Arc<Keypair>,
nb_signers: usize,
signer_balance: u64,
prio_fees: u64,
) -> Vec<Arc<Keypair>> {
let signers = (0..nb_signers)
.map(|_| Arc::new(Keypair::new()))
.collect_vec();
let mut signers_to_transfer: HashSet<Pubkey> = signers.iter().map(|kp| kp.pubkey()).collect();
while !signers_to_transfer.is_empty() {
let Ok(blockhash) = rpc_client.get_latest_blockhash().await else {
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
};
let cb_1 = compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(5000);
let cb_2 = compute_budget::ComputeBudgetInstruction::set_compute_unit_price(prio_fees);
let transactions = signers_to_transfer
.iter()
.map(|signer| {
let instruction =
system_instruction::transfer(&payer.pubkey(), signer, signer_balance);
let message = Message::new(
&[cb_1.clone(), cb_2.clone(), instruction],
Some(&payer.pubkey()),
);
(*signer, Transaction::new(&[&payer], message, blockhash))
})
.collect_vec();
let tasks = transactions
.iter()
.map(|(signer, tx)| {
let rpc_client = rpc_client.clone();
let tx = tx.clone();
let signer = *signer;
tokio::spawn(
async move { (signer, rpc_client.send_and_confirm_transaction(&tx).await) },
)
})
.collect_vec();
let results = futures::future::join_all(tasks).await;
for result in results {
match result {
Ok((signer, Ok(_signature))) => {
signers_to_transfer.remove(&signer);
}
Ok((signer, Err(e))) => {
log::error!("Error transfering to {signer:?}, {e:?}");
}
_ => {
// retry
}
}
}
}
signers
}
pub async fn transfer_back_to_payer(
rpc_client: Arc<RpcClient>,
payer: Arc<Keypair>,
signers: Vec<Arc<Keypair>>,
prio_fees: u64,
) {
let mut signers_to_transfer: HashSet<Pubkey> = signers.iter().map(|kp| kp.pubkey()).collect();
let payer_pubkey = payer.pubkey();
while !signers_to_transfer.is_empty() {
let Ok(blockhash) = rpc_client.get_latest_blockhash().await else {
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
};
let transfers = signers
.iter()
.map(|signer| {
let rpc_client = rpc_client.clone();
let signer = signer.clone();
tokio::spawn(async move {
let balance = rpc_client.get_balance(&signer.pubkey()).await.unwrap();
let cb_1 =
compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(5000);
let cb_2 =
compute_budget::ComputeBudgetInstruction::set_compute_unit_price(prio_fees);
let balance_to_transfer = balance
.saturating_sub(5000)
.saturating_sub(5000 * prio_fees);
let instruction = system_instruction::transfer(
&signer.pubkey(),
&payer_pubkey,
balance_to_transfer,
);
let message = Message::new(
&[cb_1.clone(), cb_2.clone(), instruction],
Some(&signer.pubkey()),
);
(
signer.pubkey(),
rpc_client
.send_and_confirm_transaction(&Transaction::new(
&[&signer],
message,
blockhash,
))
.await,
)
})
})
.collect_vec();
let results = futures::future::join_all(transfers).await;
for result in results {
match result {
Ok((signer, Ok(_signature))) => {
signers_to_transfer.remove(&signer);
}
Ok((signer, Err(e))) => {
log::error!("Error transfering to {signer:?}, {e:?}");
}
_ => {
// retry
}
}
}
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let args = Args::parse();
let rpc_url = args.rpc_url;
let rpc_client = Arc::new(RpcClient::new(rpc_url));
let leader_schedule = Arc::new(JsonRpcLeaderGetter::new(rpc_client.clone(), 1024, 128));
let fee_payer = Arc::new(
load_identity_keypair(Some(args.fee_payer))
.await
.expect("Payer should be set or keypair file not found")
.unwrap(),
);
let priority_fee = args.priority_fees.unwrap_or_default();
let nb_signers = args.additional_signers;
let signer_balance = args.signers_transfer_balance.mul(LAMPORTS_PER_SOL as f64) as u64;
let signers = create_signers_from_payer(
rpc_client.clone(),
fee_payer.clone(),
nb_signers,
signer_balance,
priority_fee,
)
.await;
println!(
"Creating {} users with {} SOL balance",
nb_signers, signer_balance
);
let validator_identity = Arc::new(
load_identity_keypair(args.staked_identity)
.await?
.unwrap_or_else(Keypair::new),
);
// START ALL SERVICES REQUIRED BY LITE_RPC
// setup endpoint, GRPC/RPC Polling
println!("Setting up lite-rpc tpu service");
let (endpoints, _handles) = if let Some(grpc_addr) = args.grpc_url {
let timeouts = GrpcConnectionTimeouts {
connect_timeout: Duration::from_secs(10),
request_timeout: Duration::from_secs(10),
subscribe_timeout: Duration::from_secs(10),
receive_timeout: Duration::from_secs(10),
};
create_grpc_subscription(
rpc_client.clone(),
vec![GrpcSourceConfig::new(
grpc_addr,
args.x_token.clone(),
None,
timeouts,
)],
vec![],
)?
} else {
create_json_rpc_polling_subscription(rpc_client.clone(), 100)?
};
let finalized_block_information = wait_till_block_of_commitment_is_recieved(
endpoints.blockinfo_notifier.resubscribe(),
CommitmentConfig::finalized(),
)
.await;
let block_height = rpc_client
.get_block_height_with_commitment(CommitmentConfig::finalized())
.await?;
let (blockhash, _) = rpc_client
.get_latest_blockhash_with_commitment(CommitmentConfig::finalized())
.await?;
let finalize_slot = finalized_block_information.slot;
println!(
"finalized blockheight : {:?}, slot: {}, hash: {}",
finalized_block_information.block_height,
finalized_block_information.slot,
finalized_block_information.blockhash
);
println!(
"From RPC blockheight : {block_height:?}, hash: {}",
blockhash
);
let finalized_block_information = BlockInformation {
slot: finalized_block_information.slot,
block_height,
last_valid_blockheight: finalized_block_information.block_height + 300,
cleanup_slot: finalized_block_information.slot + 1000000,
blockhash: finalized_block_information.blockhash,
commitment_config: CommitmentConfig::finalized(),
block_time: 0,
};
let block_information_store = BlockInformationStore::new(finalized_block_information);
let data_cache = DataCache {
block_information_store,
cluster_info: ClusterInfo::default(),
identity_stakes: IdentityStakes::new(validator_identity.pubkey()),
slot_cache: SlotCache::new(finalize_slot),
tx_subs: SubscriptionStore::default(),
txs: TxStore {
store: Arc::new(DashMap::new()),
},
epoch_data: EpochCache::new_for_tests(),
leader_schedule: Arc::new(RwLock::new(CalculatedSchedule::default())),
};
let data_cache_service = DataCachingService {
data_cache: data_cache.clone(),
clean_duration: Duration::from_secs(120),
};
// start listning the cluster data and filling the cache
data_cache_service.listen(
endpoints.blocks_notifier.resubscribe(),
endpoints.blockinfo_notifier,
endpoints.slot_notifier.resubscribe(),
endpoints.cluster_info_notifier,
endpoints.vote_account_notifier,
);
let count = args.transaction_count.unwrap_or(10);
let prioritization_heap_size = Some(count * args.number_of_seconds);
let tpu_config = TpuServiceConfig {
fanout_slots: args.fanout_slots.unwrap_or(16),
maximum_transaction_in_queue: 2000000,
quic_connection_params: QuicConnectionParameters {
connection_timeout: Duration::from_secs(60),
connection_retry_count: 10,
finalize_timeout: Duration::from_millis(10000),
max_number_of_connections: 4,
unistream_timeout: Duration::from_millis(1000),
write_timeout: Duration::from_secs(10),
number_of_transactions_per_unistream: 1,
unistreams_to_create_new_connection_in_percentage: 5,
prioritization_heap_size,
},
tpu_connection_path: TpuConnectionPath::QuicDirectPath,
};
let tpu_service: TpuService = TpuService::new(
tpu_config,
validator_identity,
leader_schedule,
data_cache.clone(),
)
.await?;
let transaction_service_builder = TransactionServiceBuilder::new(
TxSender::new(data_cache.clone(), tpu_service.clone()),
TransactionReplayer::new(
tpu_service.clone(),
data_cache.clone(),
Duration::from_secs(1),
),
tpu_service,
10000,
);
let (transaction_service, _) = transaction_service_builder.start(
None,
data_cache.block_information_store.clone(),
10,
endpoints.slot_notifier,
);
// CREATE TRANSACTIONS
log::info!("Creating memo transactions");
let memo_msgs = generate_random_strings(count * args.number_of_seconds, None, 5);
let mut tx_to_confirm = vec![];
let mut second = 1;
let mut signer_count = 0;
let map_of_signature = Arc::new(DashSet::<Signature>::new());
let transactions_in_blocks = Arc::new(Mutex::new(Vec::<u64>::new()));
let mut block_stream = endpoints.blocks_notifier;
let block_tps_task = {
let map_of_signature = map_of_signature.clone();
let transactions_in_blocks = transactions_in_blocks.clone();
tokio::spawn(async move {
let mut start_tracking = false;
while let Ok(block) = block_stream.recv().await {
let mut count = 0;
for transaction in &block.transactions {
if map_of_signature.contains(&transaction.signature) {
count += 1;
map_of_signature.remove(&transaction.signature);
}
}
// start tracking once we have first block with some transactions sent by us
if start_tracking || count > 0 {
start_tracking = true;
let mut lk = transactions_in_blocks.lock().await;
lk.push(count);
}
}
})
};
for chunk in memo_msgs.chunks(count) {
let instant = tokio::time::Instant::now();
let mut current_txs = vec![];
println!("Sending memo transactions :{}", second);
second += 1;
let bh = data_cache
.block_information_store
.get_latest_blockhash(CommitmentConfig::finalized())
.await;
let last_valid_block_height =
data_cache.block_information_store.get_last_blockheight() + 300;
let transactions = chunk
.iter()
.map(|x| {
signer_count += 1;
create_memo_tx(x, &signers[signer_count % nb_signers], bh, priority_fee)
})
.collect_vec();
for transaction in transactions {
let signature = transaction.signatures[0];
let raw_tx = bincode::serialize(&transaction).unwrap();
let slot = data_cache.slot_cache.get_current_slot();
map_of_signature.insert(signature);
let transaction_info = SentTransactionInfo {
signature,
last_valid_block_height,
slot,
transaction: Arc::new(raw_tx),
prioritization_fee: priority_fee,
};
let _ = transaction_service
.transaction_channel
.send(transaction_info.clone())
.await;
current_txs.push(signature);
}
tx_to_confirm.push(current_txs);
let millis = instant.elapsed().as_millis() as u64;
if millis < 1000 {
tokio::time::sleep(Duration::from_millis(1000 - millis)).await;
} else {
println!("took {millis:?} millis to send {count:?} transactions");
}
}
println!(
"{} memo transactions sent, waiting for a minute to confirm them",
count * args.number_of_seconds
);
tokio::time::sleep(Duration::from_secs(120)).await;
let mut second = 1;
for seconds_sigs in tx_to_confirm {
let mut tx_confirmed = 0;
for sig in seconds_sigs {
if data_cache.txs.is_transaction_confirmed(&sig) {
tx_confirmed += 1;
}
}
println!(
"{} or {} transactions were confirmed for the {} second",
tx_confirmed, count, second
);
second += 1;
}
block_tps_task.abort();
let lk = transactions_in_blocks.lock().await;
// stop tracking by removing trailling 0s
let mut vec = lk.clone();
vec.reverse();
let mut transaction_blocks = vec.iter().skip_while(|x| **x == 0).cloned().collect_vec();
transaction_blocks.reverse();
println!(
"BLOCKS transactions : {}",
transaction_blocks.iter().map(|x| x.to_string()).join(", ")
);
let sum = transaction_blocks.iter().sum::<u64>();
let seconds = (transaction_blocks.len() * 400 / 1000) as u64;
let tps = sum / seconds;
println!("EFFECTIVE TPS: {tps:?}");
println!("Transfering remaining lamports to payer");
transfer_back_to_payer(rpc_client, fee_payer, signers, priority_fee).await;
Ok(())
}

View File

@ -377,7 +377,7 @@ impl LiteRpcServer for LiteBridge {
match self
.transaction_service
.send_transaction(raw_tx, max_retries)
.send_wire_transaction(raw_tx, max_retries)
.await
{
Ok(sig) => {

View File

@ -11,7 +11,7 @@ use lite_rpc::postgres_logger::PostgresLogger;
use lite_rpc::service_spawner::ServiceSpawner;
use lite_rpc::start_server::start_servers;
use lite_rpc::DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE;
use log::{debug, info};
use log::info;
use solana_lite_rpc_accounts::account_service::AccountService;
use solana_lite_rpc_accounts::account_store_interface::AccountStorageInterface;
use solana_lite_rpc_accounts::inmemory_account_store::InmemoryAccountStore;
@ -44,7 +44,8 @@ use solana_lite_rpc_core::structures::{
epoch::EpochCache, identity_stakes::IdentityStakes, notifications::NotificationSender,
};
use solana_lite_rpc_core::traits::address_lookup_table_interface::AddressLookupTableInterface;
use solana_lite_rpc_core::types::{BlockInfoStream, BlockStream};
use solana_lite_rpc_core::types::BlockStream;
use solana_lite_rpc_core::utils::wait_till_block_of_commitment_is_recieved;
use solana_lite_rpc_core::AnyhowJoinHandle;
use solana_lite_rpc_prioritization_fees::account_prio_service::AccountPrioService;
use solana_lite_rpc_services::data_caching_service::DataCachingService;
@ -54,7 +55,6 @@ use solana_lite_rpc_services::transaction_replayer::TransactionReplayer;
use solana_lite_rpc_services::tx_sender::TxSender;
use lite_rpc::postgres_logger;
use solana_lite_rpc_core::structures::block_info::BlockInfo;
use solana_lite_rpc_prioritization_fees::start_block_priofees_task;
use solana_lite_rpc_util::obfuscate_rpcurl;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
@ -67,7 +67,6 @@ use std::time::Duration;
use tokio::io::AsyncReadExt;
use tokio::sync::mpsc;
use tokio::sync::RwLock;
use tokio::time::{timeout, Instant};
use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::EnvFilter;
@ -76,32 +75,6 @@ use tracing_subscriber::EnvFilter;
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
async fn get_latest_block_info(
mut blockinfo_stream: BlockInfoStream,
commitment_config: CommitmentConfig,
) -> BlockInfo {
let started = Instant::now();
loop {
match timeout(Duration::from_millis(500), blockinfo_stream.recv()).await {
Ok(Ok(block_info)) => {
if block_info.commitment_config == commitment_config {
return block_info;
}
}
Err(_elapsed) => {
debug!(
"waiting for latest block info ({}) ... {:.02}ms",
commitment_config.commitment,
started.elapsed().as_secs_f32() * 1000.0
);
}
Ok(Err(_error)) => {
panic!("Did not recv block info");
}
}
}
}
pub async fn start_postgres(
config: Option<postgres_logger::PostgresSessionConfig>,
) -> anyhow::Result<(Option<NotificationSender>, AnyhowJoinHandle)> {
@ -255,7 +228,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
};
info!("Waiting for first finalized block info...");
let finalized_block_info = get_latest_block_info(
let finalized_block_info = wait_till_block_of_commitment_is_recieved(
blockinfo_notifier.resubscribe(),
CommitmentConfig::finalized(),
)
@ -339,7 +312,6 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
};
let spawner = ServiceSpawner {
prometheus_addr,
data_cache: data_cache.clone(),
};
//init grpc leader schedule and vote account is configured.
@ -364,7 +336,8 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
slot_notifier.resubscribe(),
);
let support_service = tokio::spawn(async move { spawner.spawn_support_services().await });
let support_service =
tokio::spawn(async move { spawner.spawn_support_services(prometheus_addr).await });
let history = History::new();

View File

@ -17,15 +17,14 @@ use solana_lite_rpc_services::{
use std::time::Duration;
pub struct ServiceSpawner {
pub prometheus_addr: String,
pub data_cache: DataCache,
}
impl ServiceSpawner {
/// spawn services that support the whole system
pub async fn spawn_support_services(&self) -> anyhow::Result<()> {
pub async fn spawn_support_services(&self, prometheus_addr: String) -> anyhow::Result<()> {
// spawn prometheus
let prometheus = PrometheusSync::sync(self.prometheus_addr.clone());
let prometheus = PrometheusSync::sync(prometheus_addr.clone());
// spawn metrics capture
let metrics = MetricsCapture::new(self.data_cache.txs.clone()).capture();

View File

@ -61,6 +61,7 @@ const QUIC_CONNECTION_PARAMS: QuicConnectionParameters = QuicConnectionParameter
write_timeout: Duration::from_secs(2),
number_of_transactions_per_unistream: 10,
unistreams_to_create_new_connection_in_percentage: 10,
prioritization_heap_size: None,
};
#[test]
@ -743,7 +744,7 @@ pub fn build_raw_sample_tx(i: u32) -> SentTransactionInfo {
let tx = build_sample_tx(&payer_keypair, i);
let transaction =
bincode::serialize::<VersionedTransaction>(&tx).expect("failed to serialize tx");
Arc::new(bincode::serialize::<VersionedTransaction>(&tx).expect("failed to serialize tx"));
SentTransactionInfo {
signature: *tx.get_signature(),

View File

@ -124,7 +124,7 @@ impl QuicConnection {
}
}
pub async fn send_transaction(&self, tx: Vec<u8>) {
pub async fn send_transaction(&self, tx: &Vec<u8>) {
let connection_retry_count = self.connection_params.connection_retry_count;
for _ in 0..connection_retry_count {
let mut do_retry = false;
@ -158,7 +158,7 @@ impl QuicConnection {
let write_add_result = tokio::select! {
res = QuicConnectionUtils::write_all(
send_stream,
&tx,
tx,
self.identity,
self.connection_params,
) => {

View File

@ -83,6 +83,7 @@ pub struct QuicConnectionParameters {
pub max_number_of_connections: usize,
pub number_of_transactions_per_unistream: usize,
pub unistreams_to_create_new_connection_in_percentage: u8,
pub prioritization_heap_size: Option<usize>,
}
impl Default for QuicConnectionParameters {
@ -96,6 +97,7 @@ impl Default for QuicConnectionParameters {
max_number_of_connections: 8,
number_of_transactions_per_unistream: 1,
unistreams_to_create_new_connection_in_percentage: 10,
prioritization_heap_size: None,
}
}
}

View File

@ -179,6 +179,7 @@ impl QuicProxyConnectionManager {
transaction,
..
}) => {
let transaction = transaction.as_ref().clone();
TxData::new(signature, transaction)
},
Err(e) => {
@ -195,6 +196,7 @@ impl QuicProxyConnectionManager {
transaction,
..
}) => {
let transaction = transaction.as_ref().clone();
txs.push(TxData::new(signature, transaction));
},
Err(TryRecvError::Empty) => {

View File

@ -99,8 +99,11 @@ impl ActiveConnection {
max_number_of_connections,
max_uni_stream_connections,
);
let priorization_heap = PrioritizationFeesHeap::new(2 * max_uni_stream_connections);
let prioritization_heap_size = self
.connection_parameters
.prioritization_heap_size
.unwrap_or(2 * max_uni_stream_connections);
let priorization_heap = PrioritizationFeesHeap::new(prioritization_heap_size);
let heap_filler_task = {
let priorization_heap = priorization_heap.clone();
@ -202,7 +205,7 @@ impl ActiveConnection {
NB_QUIC_TASKS.inc();
connection.send_transaction(tx.transaction).await;
connection.send_transaction(tx.transaction.as_ref()).await;
timer.observe_duration();
NB_QUIC_TASKS.dec();
});

View File

@ -15,6 +15,7 @@ use solana_lite_rpc_core::types::SlotStream;
use solana_lite_rpc_core::AnyhowJoinHandle;
use solana_sdk::{quic::QUIC_PORT_OFFSET, signature::Keypair, slot_history::Slot};
use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
use std::collections::HashMap;
use std::{
net::{IpAddr, Ipv4Addr},
sync::Arc,
@ -127,7 +128,7 @@ impl TpuService {
.get_slot_leaders(current_slot, last_slot)
.await?;
// get next leader with its tpu port
let connections_to_keep = next_leaders
let connections_to_keep: HashMap<_, _> = next_leaders
.iter()
.map(|x| {
let contact_info = cluster_nodes.get(&x.pubkey);

View File

@ -1,7 +1,7 @@
// This class will manage the lifecycle for a transaction
// It will send, replay if necessary and confirm by listening to blocks
use std::time::Duration;
use std::{sync::Arc, time::Duration};
use crate::{
tpu_utils::tpu_service::TpuService,
@ -22,7 +22,7 @@ use solana_lite_rpc_core::{
use solana_sdk::{
borsh0_10::try_from_slice_unchecked,
compute_budget::{self, ComputeBudgetInstruction},
transaction::VersionedTransaction,
transaction::{Transaction, VersionedTransaction},
};
use tokio::{
sync::mpsc::{self, Sender, UnboundedSender},
@ -122,6 +122,15 @@ pub struct TransactionService {
impl TransactionService {
pub async fn send_transaction(
&self,
tx: Transaction,
max_retries: Option<u16>,
) -> anyhow::Result<String> {
let raw_tx = bincode::serialize(&tx)?;
self.send_wire_transaction(raw_tx, max_retries).await
}
pub async fn send_wire_transaction(
&self,
raw_tx: Vec<u8>,
max_retries: Option<u16>,
@ -173,7 +182,7 @@ impl TransactionService {
signature,
last_valid_block_height: last_valid_blockheight,
slot,
transaction: raw_tx,
transaction: Arc::new(raw_tx),
prioritization_fee,
};
if let Err(e) = self