Merge branch 'send_transactions_in_batches' into processing_transaction_errors

This commit is contained in:
godmodegalactus 2023-01-04 19:01:25 +01:00
commit eab035b07d
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
6 changed files with 540 additions and 436 deletions

718
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -26,31 +26,18 @@ simplelog = "0.12.0"
[dependencies]
solana-client = { git = "https://github.com/solana-labs/solana.git" }
solana-sdk = { git = "https://github.com/solana-labs/solana.git" }
solana-clap-utils = { git = "https://github.com/solana-labs/solana.git" }
solana-cli-config = { git = "https://github.com/solana-labs/solana.git" }
solana-pubsub-client = { git = "https://github.com/solana-labs/solana.git" }
solana-account-decoder = { git = "https://github.com/solana-labs/solana.git" }
solana-entry = { git = "https://github.com/solana-labs/solana.git" }
solana-faucet = { git = "https://github.com/solana-labs/solana.git" }
solana-gossip = { git = "https://github.com/solana-labs/solana.git" }
solana-ledger = { git = "https://github.com/solana-labs/solana.git" }
solana-measure = { git = "https://github.com/solana-labs/solana.git" }
solana-metrics = { git = "https://github.com/solana-labs/solana.git" }
solana-perf = { git = "https://github.com/solana-labs/solana.git" }
solana-poh = { git = "https://github.com/solana-labs/solana.git" }
solana-rayon-threadlimit = { git = "https://github.com/solana-labs/solana.git" }
solana-rpc-client-api = { git = "https://github.com/solana-labs/solana.git" }
solana-runtime = { git = "https://github.com/solana-labs/solana.git" }
solana-send-transaction-service = { git = "https://github.com/solana-labs/solana.git" }
solana-stake-program = { git = "https://github.com/solana-labs/solana.git" }
solana-storage-bigtable = { git = "https://github.com/solana-labs/solana.git" }
solana-streamer = { git = "https://github.com/solana-labs/solana.git" }
solana-tpu-client = { git = "https://github.com/solana-labs/solana.git" }
solana-transaction-status = { git = "https://github.com/solana-labs/solana.git" }
solana-version = { git = "https://github.com/solana-labs/solana.git" }
solana-vote-program = { git = "https://github.com/solana-labs/solana.git" }
solana-rpc = { git = "https://github.com/solana-labs/solana.git" }
solana-sdk = { git = "https://github.com/solana-labs/solana.git" }
solana-clap-utils = { git = "https://github.com/solana-labs/solana.git" }
solana-cli-config = { git = "https://github.com/solana-labs/solana.git" }
solana-pubsub-client = { git = "https://github.com/solana-labs/solana.git" }
solana-rpc-client-api = { git = "https://github.com/solana-labs/solana.git" }
solana-runtime = { git = "https://github.com/solana-labs/solana.git" }
solana-send-transaction-service = { git = "https://github.com/solana-labs/solana.git" }
solana-tpu-client = { git = "https://github.com/solana-labs/solana.git" }
solana-transaction-status = { git = "https://github.com/solana-labs/solana.git" }
solana-version = { git = "https://github.com/solana-labs/solana.git" }
solana-rpc = { git = "https://github.com/solana-labs/solana.git" }
solana-perf = { git = "https://github.com/solana-labs/solana.git" }
tokio = { version = "1.14.1", features = ["full"]}
tokio-util = { version = "0.6", features = ["codec", "compat"] }
@ -81,3 +68,4 @@ spl-token = { version = "=3.5.0", features = ["no-entrypoint"] }
spl-token-2022 = { version = "0.5.0", features = ["no-entrypoint"] }
stream-cancel = "0.8.1"
thiserror = "1.0.37"
chrono = "0.4.23"

View File

@ -18,7 +18,7 @@ pub struct Args {
pub subscription_port: String,
#[arg(short, long, default_value_t = String::from("http://localhost:8899"))]
pub rpc_url: String,
#[arg(short, long, default_value_t = String::new())]
#[arg(short, long, default_value_t = String::from("http://localhost:8900"))]
pub websocket_url: String,
}

View File

@ -1,3 +1,4 @@
use chrono::DateTime;
use crossbeam_channel::Sender;
use dashmap::DashMap;
use serde::Serialize;
@ -19,7 +20,7 @@ use std::{
Arc, RwLock,
},
thread::{self, Builder, JoinHandle},
time::{Duration, Instant},
time::{Duration, Instant, SystemTime},
};
use tokio::sync::broadcast;
@ -49,14 +50,32 @@ impl BlockInformation {
}
}
#[derive(Clone)]
pub struct SignatureStatus {
pub commitment_level: CommitmentLevel,
pub transaction_error: Option<TransactionError>,
pub status: Option<CommitmentLevel>,
pub error: Option<String>,
pub created: Instant,
}
impl SignatureStatus {
pub fn new() -> Self {
Self {
status: None,
error: None,
created: Instant::now(),
}
}
pub fn new_from_commitment(commitment: CommitmentLevel) -> Self {
Self {
status: Some(commitment),
error: None,
created: Instant::now(),
}
}
}
pub struct LiteRpcContext {
pub signature_status: DashMap<String, Option<SignatureStatus>>,
pub signature_status: DashMap<String, SignatureStatus>,
pub finalized_block_info: BlockInformation,
pub confirmed_block_info: BlockInformation,
pub notification_sender: Sender<NotificationType>,
@ -74,6 +93,11 @@ impl LiteRpcContext {
notification_sender,
}
}
pub fn remove_stale_data(&self, purgetime_in_seconds: u64) {
self.signature_status
.retain(|_k, v| v.created.elapsed().as_secs() < purgetime_in_seconds);
}
}
pub struct SignatureNotification {
@ -268,12 +292,15 @@ impl LiteRpcSubsrciptionControl {
pub struct PerformanceCounter {
pub total_confirmations: Arc<AtomicU64>,
pub total_transactions_sent: Arc<AtomicU64>,
pub transaction_sent_error: Arc<AtomicU64>,
pub confirmations_per_seconds: Arc<AtomicU64>,
pub transactions_per_seconds: Arc<AtomicU64>,
pub send_transactions_errors_per_seconds: Arc<AtomicU64>,
last_count_for_confirmations: Arc<AtomicU64>,
last_count_for_transactions_sent: Arc<AtomicU64>,
last_count_for_sent_errors: Arc<AtomicU64>,
}
impl PerformanceCounter {
@ -285,6 +312,9 @@ impl PerformanceCounter {
transactions_per_seconds: Arc::new(AtomicU64::new(0)),
last_count_for_confirmations: Arc::new(AtomicU64::new(0)),
last_count_for_transactions_sent: Arc::new(AtomicU64::new(0)),
transaction_sent_error: Arc::new(AtomicU64::new(0)),
last_count_for_sent_errors: Arc::new(AtomicU64::new(0)),
send_transactions_errors_per_seconds: Arc::new(AtomicU64::new(0)),
}
}
@ -293,6 +323,8 @@ impl PerformanceCounter {
let total_transactions: u64 = self.total_transactions_sent.load(Ordering::Relaxed);
let total_errors: u64 = self.transaction_sent_error.load(Ordering::Relaxed);
self.confirmations_per_seconds.store(
total_confirmations - self.last_count_for_confirmations.load(Ordering::Relaxed),
Ordering::Release,
@ -304,15 +336,17 @@ impl PerformanceCounter {
.load(Ordering::Relaxed),
Ordering::Release,
);
self.send_transactions_errors_per_seconds.store(
total_errors - self.last_count_for_sent_errors.load(Ordering::Relaxed),
Ordering::Release,
);
self.last_count_for_confirmations
.store(total_confirmations, Ordering::Relaxed);
self.last_count_for_transactions_sent
.store(total_transactions, Ordering::Relaxed);
}
pub fn update_sent_transactions_counter(&self) {
self.total_transactions_sent.fetch_add(1, Ordering::Relaxed);
self.last_count_for_sent_errors
.store(total_errors, Ordering::Relaxed);
}
pub fn update_confirm_transaction_counter(&self) {

View File

@ -23,7 +23,7 @@ use crate::{
};
use cli::Args;
fn run(port: String, subscription_port: String, rpc_url: String, websocket_url: String) {
fn run(port: u16, subscription_port: u16, rpc_url: String, websocket_url: String) {
let rpc_url = if rpc_url.is_empty() {
let (_, rpc_url) = ConfigInput::compute_json_rpc_url_setting(
rpc_url.as_str(),
@ -51,7 +51,7 @@ fn run(port: String, subscription_port: String, rpc_url: String, websocket_url:
let performance_counter = PerformanceCounter::new();
launch_performance_updating_thread(performance_counter.clone());
let (broadcast_sender, _broadcast_receiver) = broadcast::channel(128);
let (broadcast_sender, _broadcast_receiver) = broadcast::channel(10000);
let (notification_sender, notification_reciever) = crossbeam_channel::unbounded();
let pubsub_control = Arc::new(LiteRpcSubsrciptionControl::new(
@ -69,15 +69,16 @@ fn run(port: String, subscription_port: String, rpc_url: String, websocket_url:
subscription_port,
performance_counter.clone(),
);
{
let broadcast_thread = {
// build broadcasting thread
let pubsub_control = pubsub_control.clone();
std::thread::Builder::new()
.name("broadcasting thread".to_string())
.spawn(move || {
pubsub_control.start_broadcasting();
})
.unwrap();
}
.unwrap()
};
let mut io = MetaIoHandler::default();
let lite_rpc = lite_rpc::LightRpc;
io.extend_with(lite_rpc.to_delegate());
@ -88,6 +89,18 @@ fn run(port: String, subscription_port: String, rpc_url: String, websocket_url:
notification_sender,
performance_counter.clone(),
);
let cleaning_thread = {
// build cleaning thread
let context = request_processor.context.clone();
std::thread::Builder::new()
.name("cleaning thread".to_string())
.spawn(move || {
context.remove_stale_data(60 * 10);
sleep(std::time::Duration::from_secs(60 * 5))
})
.unwrap()
};
let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
@ -108,7 +121,7 @@ fn run(port: String, subscription_port: String, rpc_url: String, websocket_url:
request_processor.clone()
})
.event_loop_executor(runtime.handle().clone())
.threads(1)
.threads(4)
.cors(DomainsValidation::AllowOnly(vec![
AccessControlAllowOrigin::Any,
]))

View File

@ -5,10 +5,12 @@ use solana_client::{
tpu_client::TpuClientConfig,
};
use solana_pubsub_client::pubsub_client::{PubsubBlockClientSubscription, PubsubClient};
use solana_sdk::transaction::Transaction;
use std::{
str::FromStr,
sync::Mutex,
thread::{Builder, JoinHandle},
time::{Duration, Instant},
};
use crate::context::{
@ -22,7 +24,8 @@ use {
jsonrpc_core::{Error, Metadata, Result},
jsonrpc_derive::rpc,
solana_client::connection_cache::ConnectionCache,
solana_client::{rpc_client::RpcClient, tpu_client::TpuClient},
solana_client::rpc_client::RpcClient,
solana_client::tpu_client::TpuClient,
solana_perf::packet::PACKET_DATA_SIZE,
solana_rpc_client_api::{
config::*,
@ -31,7 +34,6 @@ use {
solana_sdk::{
commitment_config::{CommitmentConfig, CommitmentLevel},
signature::Signature,
transaction::VersionedTransaction,
},
solana_transaction_status::{TransactionBinaryEncoding, UiTransactionEncoding},
std::{
@ -40,10 +42,11 @@ use {
},
};
const TPU_BATCH_SIZE: usize = 64;
#[derive(Clone)]
pub struct LightRpcRequestProcessor {
pub rpc_client: Arc<RpcClient>,
pub tpu_client: Arc<TpuClient>,
pub last_valid_block_height: u64,
pub ws_url: String,
pub context: Arc<LiteRpcContext>,
@ -51,6 +54,7 @@ pub struct LightRpcRequestProcessor {
joinables: Arc<Mutex<Vec<JoinHandle<()>>>>,
subscribed_clients: Arc<Mutex<Vec<PubsubBlockClientSubscription>>>,
performance_counter: PerformanceCounter,
tpu_producer_channel: Sender<Transaction>,
}
impl LightRpcRequestProcessor {
@ -62,15 +66,6 @@ impl LightRpcRequestProcessor {
) -> LightRpcRequestProcessor {
let rpc_client = Arc::new(RpcClient::new(json_rpc_url));
let connection_cache = Arc::new(ConnectionCache::default());
let tpu_client = Arc::new(
TpuClient::new_with_connection_cache(
rpc_client.clone(),
websocket_url,
TpuClientConfig::default(),
connection_cache.clone(),
)
.unwrap(),
);
let context = Arc::new(LiteRpcContext::new(rpc_client.clone(), notification_sender));
@ -82,6 +77,8 @@ impl LightRpcRequestProcessor {
let (client_finalized, receiver_finalized) =
Self::subscribe_block(websocket_url, CommitmentLevel::Finalized).unwrap();
let (tpu_producer, tpu_consumer) = crossbeam_channel::bounded(100000);
// create threads to listen for finalized and confrimed blocks
let joinables = vec![
Self::build_thread_to_process_blocks(
@ -94,11 +91,31 @@ impl LightRpcRequestProcessor {
&context,
CommitmentLevel::Finalized,
),
Self::build_thread_to_process_transactions(
json_rpc_url.to_string(),
websocket_url.to_string(),
connection_cache.clone(),
tpu_consumer.clone(),
performance_counter.clone(),
),
Self::build_thread_to_process_transactions(
json_rpc_url.to_string(),
websocket_url.to_string(),
connection_cache.clone(),
tpu_consumer.clone(),
performance_counter.clone(),
),
Self::build_thread_to_process_transactions(
json_rpc_url.to_string(),
websocket_url.to_string(),
connection_cache.clone(),
tpu_consumer,
performance_counter.clone(),
),
];
LightRpcRequestProcessor {
rpc_client,
tpu_client,
last_valid_block_height: 0,
ws_url: websocket_url.to_string(),
context,
@ -106,6 +123,7 @@ impl LightRpcRequestProcessor {
joinables: Arc::new(Mutex::new(joinables)),
subscribed_clients: Arc::new(Mutex::new(vec![client_confirmed, client_finalized])),
performance_counter,
tpu_producer_channel: tpu_producer,
}
}
@ -151,9 +169,72 @@ impl LightRpcRequestProcessor {
.unwrap()
}
fn build_thread_to_process_transactions(
json_rpc_url: String,
websocket_url: String,
connection_cache: Arc<ConnectionCache>,
receiver: Receiver<Transaction>,
performance_counters: PerformanceCounter,
) -> JoinHandle<()> {
Builder::new()
.name("thread working on confirmation block".to_string())
.spawn(move || {
let rpc_client =
Arc::new(RpcClient::new(json_rpc_url.to_string()));
let tpu_client = TpuClient::new_with_connection_cache(
rpc_client,
websocket_url.as_str(),
TpuClientConfig::default(), // value for max fanout slots
connection_cache.clone(),
);
let tpu_client = Arc::new(tpu_client.unwrap());
loop {
let recv_res = receiver.recv();
match recv_res {
Ok(transaction) => {
let mut transactions_vec = vec![transaction];
let mut time_remaining = Duration::from_micros(5000);
for _i in 1..TPU_BATCH_SIZE {
let start = std::time::Instant::now();
let another = receiver.recv_timeout(time_remaining);
match another {
Ok(x) => transactions_vec.push(x),
Err(_) => break,
}
match time_remaining.checked_sub(start.elapsed()) {
Some(x) => time_remaining = x,
None => break,
}
}
let count: u64 = transactions_vec.len() as u64;
let slice = transactions_vec.as_slice();
let fut_res = tpu_client.try_send_transaction_batch(slice);
match fut_res {
Ok(_) => performance_counters
.total_transactions_sent
.fetch_add(count, Ordering::Relaxed),
Err(e) => {
println!("Got error while sending transaction batch of size {}, error {}", count, e.to_string());
performance_counters
.transaction_sent_error
.fetch_add(count, Ordering::Relaxed)
}
};
}
Err(e) => {
println!("got error on tpu channel {}", e.to_string());
break;
}
};
}
}).unwrap()
}
fn process_block(
reciever: Receiver<RpcResponse<RpcBlockUpdate>>,
signature_status: &DashMap<String, Option<SignatureStatus>>,
signature_status: &DashMap<String, SignatureStatus>,
commitment: CommitmentLevel,
notification_sender: &crossbeam_channel::Sender<NotificationType>,
block_information: &BlockInformation,
@ -220,10 +301,11 @@ impl LightRpcRequestProcessor {
);
}
x.insert(Some(SignatureStatus {
commitment_level: commitment,
transaction_error: transaction_error,
}));
x.insert(SignatureStatus {
status: Some(commitment),
error: transaction_error,
created: Instant::now(),
});
}
dashmap::mapref::entry::Entry::Vacant(_x) => {
// do nothing transaction not sent by lite rpc
@ -281,7 +363,7 @@ pub mod lite_rpc {
use std::str::FromStr;
use itertools::Itertools;
use solana_sdk::{fee_calculator::FeeCalculator, pubkey::Pubkey};
use solana_sdk::{fee_calculator::FeeCalculator, pubkey::Pubkey, transaction::Transaction};
use solana_transaction_status::{TransactionConfirmationStatus, TransactionStatus};
use super::*;
@ -365,15 +447,22 @@ pub mod lite_rpc {
tx_encoding
))
})?;
let (wire_transaction, transaction) =
decode_and_deserialize::<VersionedTransaction>(data, binary_encoding)?;
let (_wire_transaction, transaction) =
decode_and_deserialize::<Transaction>(data, binary_encoding)?;
let signature = transaction.signatures[0].to_string();
meta.context
.signature_status
.insert(transaction.signatures[0].to_string(), None);
meta.tpu_client.send_wire_transaction(wire_transaction);
meta.performance_counter.update_sent_transactions_counter();
Ok(transaction.signatures[0].to_string())
.insert(signature.clone(), SignatureStatus::new());
match meta.tpu_producer_channel.send(transaction) {
Ok(_) => Ok(signature),
Err(e) => {
println!("got error while sending on channel {}", e.to_string());
Err(jsonrpc_core::Error::new(
jsonrpc_core::ErrorCode::InternalError,
))
}
}
}
fn get_recent_blockhash(