rebase unbounded

This commit is contained in:
aniketfuryrocks 2023-01-29 00:24:26 +05:30
parent d64d4194d0
commit b28cbff4cb
No known key found for this signature in database
GPG Key ID: FA6BFCFAA7D4B764
3 changed files with 70 additions and 63 deletions

View File

@ -4,7 +4,10 @@ use crate::{
postgres::Postgres,
rpc::LiteRpcServer,
tpu_manager::TpuManager,
workers::{BlockInformation, BlockListener, Cleaner, MetricsCapture, TxSender},
workers::{
BlockInformation, BlockListener, Cleaner, Metrics, MetricsCapture, TxSender,
WireTransaction,
},
};
use std::{ops::Deref, str::FromStr, sync::Arc, time::Duration};
@ -27,12 +30,18 @@ use solana_sdk::{
transaction::VersionedTransaction,
};
use solana_transaction_status::TransactionStatus;
use tokio::{net::ToSocketAddrs, task::JoinHandle};
use tokio::{
net::ToSocketAddrs,
sync::mpsc::{self, UnboundedSender},
task::JoinHandle,
};
/// A bridge between clients and tpu
pub struct LiteBridge {
pub rpc_client: Arc<RpcClient>,
pub tpu_manager: Arc<TpuManager>,
// None if LiteBridge is not executed
pub tx_send: Option<UnboundedSender<(String, WireTransaction, u64)>>,
pub tx_sender: TxSender,
pub finalized_block_listenser: BlockListener,
pub confirmed_block_listenser: BlockListener,
@ -68,6 +77,7 @@ impl LiteBridge {
Ok(Self {
rpc_client,
tpu_manager,
tx_send: None,
tx_sender,
finalized_block_listenser,
confirmed_block_listenser,
@ -84,7 +94,7 @@ impl LiteBridge {
/// List for `JsonRpc` requests
pub async fn start_services<T: ToSocketAddrs + std::fmt::Debug + 'static + Send + Clone>(
self,
mut self,
http_addr: T,
ws_addr: T,
tx_batch_size: usize,
@ -92,10 +102,13 @@ impl LiteBridge {
clean_interval: Duration,
postgres_config: &str,
) -> anyhow::Result<[JoinHandle<anyhow::Result<()>>; 8]> {
let (tx_send, tx_recv) = mpsc::unbounded_channel();
self.tx_send = Some(tx_send);
let tx_sender = self
.tx_sender
.clone()
.execute(tx_batch_size, tx_send_interval);
.execute(tx_recv, tx_batch_size, tx_send_interval);
let metrics_capture = MetricsCapture::new(self.tx_sender.clone());
let (postgres_connection, postgres) = Postgres::new(postgres_config).await?;
@ -193,9 +206,11 @@ impl LiteRpcServer for LiteBridge {
return Err(jsonrpsee::core::Error::Custom("Blockhash not found in confirmed block store".to_string()));
};
self.tx_sender
.enqnueue_tx(sig.to_string(), raw_tx, slot)
.await;
self.tx_send
.as_ref()
.expect("Lite Bridge Not Executed")
.send((sig.to_string(), raw_tx, slot))
.unwrap();
Ok(BinaryEncoding::Base58.encode(sig))
}

View File

@ -3,11 +3,15 @@ use std::{
time::{Duration, Instant},
};
use anyhow::bail;
use dashmap::DashMap;
use log::{info, warn};
use solana_transaction_status::TransactionStatus;
use tokio::{sync::RwLock, task::JoinHandle};
use tokio::{
sync::mpsc::{error::TryRecvError, UnboundedReceiver},
task::JoinHandle,
};
use crate::tpu_manager::TpuManager;
@ -18,8 +22,6 @@ pub type WireTransaction = Vec<u8>;
pub struct TxSender {
/// Tx(s) forwarded to tpu
pub txs_sent: Arc<DashMap<String, TxProps>>,
/// Transactions queue for retrying
enqueued_txs: Arc<RwLock<Vec<(String, WireTransaction, u64)>>>,
/// TpuClient to call the tpu port
tpu_manager: Arc<TpuManager>,
}
@ -49,51 +51,26 @@ impl Default for TxProps {
impl TxSender {
pub fn new(tpu_manager: Arc<TpuManager>) -> Self {
Self {
enqueued_txs: Default::default(),
tpu_manager,
txs_sent: Default::default(),
}
}
/// en-queue transaction if it doesn't already exist
pub async fn enqnueue_tx(&self, sig: String, raw_tx: WireTransaction, recent_slot: u64) {
self.enqueued_txs
.write()
.await
.push((sig, raw_tx, recent_slot));
}
/// retry enqued_tx(s)
pub async fn forward_txs(&self, tx_batch_size: usize) {
if self.enqueued_txs.read().await.is_empty() {
async fn forward_txs(&self, sigs: Vec<(String, u64)>, txs: Vec<WireTransaction>) {
assert_eq!(sigs.len(), txs.len());
if sigs.is_empty() {
return;
}
let mut enqueued_txs = Vec::new();
std::mem::swap(&mut enqueued_txs, &mut *self.enqueued_txs.write().await);
let mut tx_remaining = enqueued_txs.len();
let mut enqueued_txs = enqueued_txs.into_iter();
let tpu_client = self.tpu_manager.clone();
let txs_sent = self.txs_sent.clone();
tokio::spawn(async move {
while tx_remaining != 0 {
let mut batch = Vec::with_capacity(tx_batch_size);
let mut sigs_and_slots = Vec::with_capacity(tx_batch_size);
for (batched, (sig, tx, recent_slot)) in enqueued_txs.by_ref().enumerate() {
batch.push(tx);
sigs_and_slots.push((sig, recent_slot));
tx_remaining -= 1;
if batched == tx_batch_size {
break;
}
}
match tpu_client.try_send_wire_transaction_batch(batch).await {
match tpu_client.try_send_wire_transaction_batch(txs).await {
Ok(_) => {
for (sig, recent_slot) in sigs_and_slots {
for (sig, recent_slot) in sigs {
txs_sent.insert(
sig,
TxProps {
@ -107,19 +84,16 @@ impl TxSender {
warn!("{err}");
}
}
}
});
}
/// retry and confirm transactions every 2ms (avg time to confirm tx)
pub fn execute(
self,
mut recv: UnboundedReceiver<(String, WireTransaction, u64)>,
tx_batch_size: usize,
tx_send_interval: Duration,
) -> JoinHandle<anyhow::Result<()>> {
let mut interval = tokio::time::interval(tx_send_interval);
#[allow(unreachable_code)]
tokio::spawn(async move {
info!(
"Batching tx(s) with batch size of {tx_batch_size} every {}ms",
@ -127,12 +101,26 @@ impl TxSender {
);
loop {
interval.tick().await;
self.forward_txs(tx_batch_size).await;
let prev_inst = tokio::time::Instant::now();
let mut sigs_and_slots = Vec::with_capacity(tx_batch_size);
let mut txs = Vec::with_capacity(tx_batch_size);
while (prev_inst.elapsed() < tx_send_interval) || txs.len() == tx_batch_size {
match recv.try_recv() {
Ok((sig, tx, slot)) => {
sigs_and_slots.push((sig, slot));
txs.push(tx);
}
Err(TryRecvError::Disconnected) => {
bail!("Channel Disconnected");
}
_ => {}
}
}
// to give the correct type to JoinHandle
Ok(())
self.forward_txs(sigs_and_slots, txs).await;
}
})
}
}

View File

@ -13,6 +13,7 @@ use solana_client::nonblocking::{pubsub_client::PubsubClient, rpc_client::RpcCli
use solana_sdk::commitment_config::CommitmentConfig;
use solana_transaction_status::TransactionConfirmationStatus;
use tokio::sync::mpsc;
#[tokio::test]
async fn send_and_confirm_txs() {
@ -43,9 +44,12 @@ async fn send_and_confirm_txs() {
.await
.unwrap();
let (tx_send, tx_recv) = mpsc::unbounded_channel();
let services = try_join_all(vec![
block_listener.clone().listen(None),
tx_sender.clone().execute(
tx_recv,
DEFAULT_TX_BATCH_SIZE,
Duration::from_millis(DEFAULT_TX_BATCH_INTERVAL_MS),
),
@ -61,9 +65,9 @@ async fn send_and_confirm_txs() {
let tx = BinaryEncoding::Base58.encode(bincode::serialize(&tx).unwrap());
let sig = sig.to_string();
tx_sender
.enqnueue_tx(sig.clone(), tx.as_bytes().to_vec(), 0)
.await;
tx_send
.send((sig.clone(), tx.as_bytes().to_vec(), 0))
.unwrap();
for _ in 0..2 {
let tx_status = tx_sender.txs_sent.get(&sig).unwrap();