use async channels

aniketfuryrocks 2023-02-22 19:28:14 +05:30
parent c9fbf2cdeb
commit 1fb4ad981e
No known key found for this signature in database
GPG Key ID: FA6BFCFAA7D4B764
5 changed files with 392 additions and 404 deletions
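
In short, the change below replaces the per-batch semaphore and spawned task in TxSender::execute with a fixed pool of worker tasks fed through an async-channel queue. As a quick illustration, here is a minimal, self-contained sketch of that multi-consumer pattern, not the project's code: the worker count of 5 mirrors the diff, while forward_batch, the Vec<u64> payload, and the timings are hypothetical stand-ins.

// Hedged sketch only: assumes `tokio` (features = ["full"]) and
// `async-channel = "1.8"` as dependencies, matching the Cargo.toml change below.
use std::time::Duration;

// Hypothetical stand-in for the real batch-forwarding work (TxSender::forward_txs).
async fn forward_batch(worker: usize, batch: Vec<u64>) {
    println!("worker {worker} forwarding {} item(s)", batch.len());
}

#[tokio::main]
async fn main() {
    // async_channel is an MPMC queue: the Receiver can be cloned once per worker.
    let (batch_send, batch_recv) = async_channel::unbounded::<Vec<u64>>();

    // Fixed pool of consumers; 5 mirrors the worker count in this commit.
    for worker in 0..5 {
        let batch_recv = batch_recv.clone();
        tokio::spawn(async move {
            // recv() returns Err once every Sender is dropped, so the task ends cleanly.
            while let Ok(batch) = batch_recv.recv().await {
                forward_batch(worker, batch).await;
            }
        });
    }

    // Producer: build a small batch on an interval and hand it to the pool.
    for i in 0..10u64 {
        batch_send.send(vec![i, i + 1]).await.expect("channel closed");
        tokio::time::sleep(Duration::from_millis(10)).await;
    }

    // Dropping the last Sender closes the channel and lets the workers exit.
    drop(batch_send);
    tokio::time::sleep(Duration::from_millis(50)).await;
}

The relevant property is that async_channel's Receiver implements Clone, so several consumers can drain one queue directly, unlike tokio::sync::mpsc, whose receiver cannot be cloned.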

Cargo.lock generated

File diff suppressed because it is too large.


@@ -23,7 +23,7 @@ solana-transaction-status = { git = "https://github.com/blockworks-foundation/so
 solana-version= { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
 solana-client= { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
 serde = { version = "1.0.152", features = ["derive"] }
-serde_json = "1.0.92"
+serde_json = "1.0.93"
 tokio = { version = "1.25.0", features = ["full"]}
 bincode = "1.3.3"
 bs58 = "0.4.0"
@@ -33,7 +33,7 @@ futures = "0.3.26"
 bytes = "1.4.0"
 anyhow = "1.0.69"
 log = "0.4.17"
-clap = { version = "4.1.4", features = ["derive"] }
+clap = { version = "4.1.6", features = ["derive"] }
 dashmap = "5.4.0"
 const_env = "0.1.2"
 jsonrpsee = { version = "0.16.2", features = ["macros", "full"] }
@@ -44,3 +44,4 @@ postgres-native-tls = "0.5.0"
 prometheus = "0.13.3"
 lazy_static = "1.4.0"
 dotenv = "0.15.0"
+async-channel = "1.8.0"


@@ -9,9 +9,9 @@ solana-rpc-client = { git = "https://github.com/blockworks-foundation/solana", b
 log = "0.4.17"
 anyhow = "1.0.69"
 serde = "1.0.152"
-serde_json = "1.0.92"
-csv = "1.1.6"
-clap = { version = "4.1.4", features = ["derive"] }
+serde_json = "1.0.93"
+csv = "1.2.0"
+clap = { version = "4.1.6", features = ["derive"] }
 tokio = { version = "1.25.0", features = ["full", "fs"]}
 tracing-subscriber = "0.3.16"
 dirs = "4.0.0"


@@ -63,7 +63,7 @@ impl TpuManager {
             ws_addr,
             fanout_slots,
             error_count: Default::default(),
-            connection_cache: connection_cache,
+            connection_cache,
         })
     }
@@ -101,7 +101,7 @@ impl TpuManager {
         Ok(())
     }

-    async fn get_tpu_client(&self) -> Arc<QuicTpuClient> {
+    pub async fn get_tpu_client(&self) -> Arc<QuicTpuClient> {
         self.tpu_client.read().await.clone()
     }


@@ -10,7 +10,7 @@ use log::{info, warn};
 use prometheus::{register_counter, Counter};
 use solana_transaction_status::TransactionStatus;
 use tokio::{
-    sync::{mpsc::UnboundedReceiver, TryAcquireError},
+    sync::mpsc::{error::TryRecvError, UnboundedReceiver},
     task::JoinHandle,
 };
@@ -35,8 +35,6 @@ pub struct TxSender {
     pub txs_sent: Arc<DashMap<String, TxProps>>,
     /// TpuClient to call the tpu port
     pub tpu_manager: Arc<TpuManager>,
-
-    counting_semaphore: Arc<tokio::sync::Semaphore>,
 }

 /// Transaction Properties
@@ -60,7 +58,57 @@ impl TxSender {
         Self {
             tpu_manager,
             txs_sent: Default::default(),
-            counting_semaphore: Arc::new(tokio::sync::Semaphore::new(5)),
+        }
+    }
+
+    /// retry enqued_tx(s)
+    async fn forward_txs(
+        &self,
+        sigs_and_slots: Vec<(String, u64)>,
+        txs: Vec<WireTransaction>,
+        postgres: Option<PostgresMpscSend>,
+    ) {
+        assert_eq!(sigs_and_slots.len(), txs.len());
+
+        if sigs_and_slots.is_empty() {
+            return;
+        }
+
+        let tpu_client = self.tpu_manager.clone();
+        let txs_sent = self.txs_sent.clone();
+
+        let quic_response = match tpu_client.try_send_wire_transaction_batch(txs).await {
+            Ok(_) => {
+                for (sig, _) in &sigs_and_slots {
+                    txs_sent.insert(sig.to_owned(), TxProps::default());
+                }
+                // metrics
+                TXS_SENT.inc_by(sigs_and_slots.len() as f64);
+                1
+            }
+            Err(err) => {
+                warn!("{err}");
+                0
+            }
+        };
+
+        if let Some(postgres) = postgres {
+            let forwarded_slot = tpu_client.get_tpu_client().await.estimated_current_slot();
+
+            for (sig, recent_slot) in sigs_and_slots {
+                postgres
+                    .send(PostgresMsg::PostgresTx(PostgresTx {
+                        signature: sig.clone(),
+                        recent_slot: recent_slot as i64,
+                        forwarded_slot: forwarded_slot as i64,
+                        processed_slot: None,
+                        cu_consumed: None,
+                        cu_requested: None,
+                        quic_response,
+                    }))
+                    .expect("Error writing to postgres service");
+            }
         }
     }
@@ -72,107 +120,49 @@ impl TxSender {
         tx_send_interval: Duration,
         postgres_send: Option<PostgresMpscSend>,
     ) -> JoinHandle<anyhow::Result<()>> {
+        let (batch_send, batch_recv) = async_channel::unbounded();
+
+        for _i in 0..5 {
+            let this = self.clone();
+            let batch_recv = batch_recv.clone();
+            let postgres_send = postgres_send.clone();
+
+            tokio::spawn(async move {
+                while let Ok((sigs_and_slots, txs)) = batch_recv.recv().await {
+                    this.forward_txs(sigs_and_slots, txs, postgres_send.clone())
+                        .await;
+                }
+            });
+        }
+
         tokio::spawn(async move {
             info!(
-                "Batching tx(s) with batch size of {tx_batch_size} every {} ms",
+                "Batching tx(s) with batch size of {tx_batch_size} every {}ms",
                 tx_send_interval.as_millis()
             );

             loop {
                 let mut sigs_and_slots = Vec::with_capacity(tx_batch_size);
                 let mut txs = Vec::with_capacity(tx_batch_size);
-                let mut maybe_permit = None;
-                let counting_semaphore = self.counting_semaphore.clone();
                 while txs.len() <= tx_batch_size {
-                    let res = tokio::time::timeout(tx_send_interval, recv.recv()).await;
-                    match res {
-                        Ok(value) => match value {
-                            Some((sig, tx, slot)) => {
-                                sigs_and_slots.push((sig, slot));
-                                txs.push(tx);
-                            }
-                            None => {
-                                bail!("Channel Disconnected");
-                            }
-                        },
+                    match recv.try_recv() {
+                        Ok((sig, tx, slot)) => {
+                            sigs_and_slots.push((sig, slot));
+                            txs.push(tx);
+                        }
+                        Err(TryRecvError::Disconnected) => {
+                            bail!("Channel Disconnected");
+                        }
                         _ => {
-                            let res = counting_semaphore.clone().try_acquire_owned();
-                            match res {
-                                Ok(permit) => {
-                                    maybe_permit = Some(permit);
-                                    break;
-                                }
-                                Err(TryAcquireError::Closed) => {
-                                    bail!("Semaphone permit error");
-                                }
-                                Err(TryAcquireError::NoPermits) => {
-                                    // No permits continue to fetch transactions and batch them
-                                }
-                            }
+                            break;
                         }
                     }
                 }

-                assert_eq!(sigs_and_slots.len(), txs.len());
-
-                if sigs_and_slots.is_empty() {
-                    continue;
-                }
-
-                let permit = match maybe_permit {
-                    Some(permit) => permit,
-                    None => {
-                        // get the permit
-                        counting_semaphore.acquire_owned().await.unwrap()
-                    }
-                };
-
-                let postgres_send = postgres_send.clone();
-                let tpu_client = self.tpu_manager.clone();
-                let txs_sent = self.txs_sent.clone();
-                tokio::spawn(async move {
-                    let semaphore_permit = permit;
-
-                    for (sig, _) in &sigs_and_slots {
-                        txs_sent.insert(sig.to_owned(), TxProps::default());
-                    }
-
-                    info!(
-                        "sending {} transactions by tpu size {}",
-                        txs.len(),
-                        txs_sent.len()
-                    );
-
-                    let quic_response = {
-                        let _semaphore_permit = semaphore_permit;
-                        match tpu_client.try_send_wire_transaction_batch(txs).await {
-                            Ok(_) => {
-                                // metrics
-                                TXS_SENT.inc_by(sigs_and_slots.len() as f64);
-                                1
-                            }
-                            Err(err) => {
-                                warn!("{err}");
-                                0
-                            }
-                        }
-                    };
-
-                    if let Some(postgres) = postgres_send {
-                        let forwarded_slot: u64 = tpu_client.estimated_current_slot().await;
-
-                        for (sig, recent_slot) in sigs_and_slots {
-                            postgres
-                                .send(PostgresMsg::PostgresTx(PostgresTx {
-                                    signature: sig.clone(),
-                                    recent_slot: recent_slot as i64,
-                                    forwarded_slot: forwarded_slot as i64,
-                                    processed_slot: None,
-                                    cu_consumed: None,
-                                    cu_requested: None,
-                                    quic_response,
-                                }))
-                                .expect("Error writing to postgres service");
-                        }
-                    }
-                });
+                batch_send.send((sigs_and_slots, txs)).await?;
+
+                tokio::time::sleep(tx_send_interval).await;
             }
         })
     }