From c56b1902c22e49e91972a35955780d20498889c3 Mon Sep 17 00:00:00 2001 From: aniketfuryrocks Date: Mon, 20 Mar 2023 11:47:11 +0530 Subject: [PATCH] wip --- src/workers/block_listenser.rs | 4 +- src/workers/postgres.rs | 84 ++++++++++++++++++++++++++-------- src/workers/tx_sender.rs | 51 +++++++++++---------- 3 files changed, 96 insertions(+), 43 deletions(-) diff --git a/src/workers/block_listenser.rs b/src/workers/block_listenser.rs index 8b4e9f2f..0f0a5730 100644 --- a/src/workers/block_listenser.rs +++ b/src/workers/block_listenser.rs @@ -320,6 +320,7 @@ impl BlockListener { Ok(()) } + pub fn listen( self, commitment_config: CommitmentConfig, @@ -350,9 +351,10 @@ impl BlockListener { } }; - if let Err(_) = this + if this .index_slot(slot, commitment_config, postgres.clone()) .await + .is_err() { // usually as we index all the slots even if they are not been processed we get some errors for slot // as they are not in long term storage of the rpc // we check 5 times before ignoring the slot diff --git a/src/workers/postgres.rs b/src/workers/postgres.rs index 997e7beb..8b4d545b 100644 --- a/src/workers/postgres.rs +++ b/src/workers/postgres.rs @@ -1,6 +1,6 @@ -use std::sync::Arc; +use std::{collections::VecDeque, sync::Arc, time::Duration}; -use anyhow::{bail, Context, Ok}; +use anyhow::{bail, Context}; use log::{info, warn}; use postgres_native_tls::MakeTlsConnector; @@ -55,7 +55,7 @@ pub struct PostgreAccountAddr { #[derive(Debug)] pub enum PostgresMsg { - PostgresTx(PostgresTx), + PostgresTx(Vec), PostgresBlock(PostgresBlock), PostgreAccountAddr(PostgreAccountAddr), PostgresUpdateTx(PostgresUpdateTx, String), @@ -67,8 +67,10 @@ pub type PostgresMpscSend = UnboundedSender; pub struct PostgresSession { client: Client, insert_tx_statement: Statement, - update_tx_statement: Statement, + insert_5_batch_tx_statement: Statement, insert_block_statement: Statement, + insert_5_batch_block_statement: Statement, + update_tx_statement: Statement, } impl PostgresSession { @@ -115,6 +117,21 @@ impl PostgresSession { ) .await?; + let insert_5_batch_block_statement = client + .prepare( + r#" + INSERT INTO lite_rpc.Blocks + (slot, leader_id, parent_slot) + VALUES + ($1, $2, $3), + ($4, $5, $6), + ($7, $8, $9), + ($10, $11, $12), + ($13, $14, $15) + "#, + ) + .await?; + let insert_tx_statement = client.prepare( r#" INSERT INTO lite_rpc.Txs @@ -124,6 +141,19 @@ impl PostgresSession { "#, ).await?; + let insert_5_batch_tx_statement = client.prepare( + r#" + INSERT INTO lite_rpc.Txs + (signature, recent_slot, forwarded_slot, processed_slot, cu_consumed, cu_requested, quic_response) + VALUES + ($1, $2, $3, $4, $5, $6, $7), + ($8, $10, $11, $12, $13, $14, $15), + ($16, $17, $18, $19, $20, $21, $22), + ($23, $24, $25, $26, $27, $28, $29), + ($30, $31, $32, $33, $34, $36, $36) + "#, + ).await?; + let update_tx_statement = client .prepare( r#" @@ -137,7 +167,9 @@ impl PostgresSession { Ok(Self { client, insert_tx_statement, + insert_5_batch_tx_statement, insert_block_statement, + insert_5_batch_block_statement, update_tx_statement, }) } @@ -234,24 +266,38 @@ impl Postgres { tokio::spawn(async move { info!("Writing to postgres"); - while let Some(msg) = recv.recv().await { - MESSAGES_IN_POSTGRES_CHANNEL.dec(); - let session = self.get_session().await?; + let mut tx_que = VecDeque::::new(); + let mut block_que = VecDeque::new(); - let Err(err) = ( - match msg { - PostgresMsg::PostgresTx(tx) => session.send_tx(tx).await, - PostgresMsg::PostgresUpdateTx(tx, sig) => session.update_tx(tx, &sig).await, - PostgresMsg::PostgresBlock(block) => session.send_block(block).await, - PostgresMsg::PostgreAccountAddr(_) => todo!(), - } ) else { - continue; - }; + loop { + let msg = recv.try_recv(); - warn!("Error writing to postgres {err}"); + match msg { + Ok(msg) => { + MESSAGES_IN_POSTGRES_CHANNEL.dec(); + let session = self.get_session().await?; + + match msg { + PostgresMsg::PostgresTx(mut tx) => tx_que.append(&mut tx), + PostgresMsg::PostgresBlock(block) => block_que.push_back(block), + PostgresMsg::PostgresUpdateTx(tx, sig) => { + if let Err(err) = session.update_tx(tx, &sig).await { + warn!("Error updating tx in postgres {err:?}"); + } + } + PostgresMsg::PostgreAccountAddr(_) => todo!(), + } + } + Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (), + Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => { + bail!("Postgres channel broke") + } + } + + while tx_que.len() % 5 != 0 { + let txs = tx_que.drain(0..5).collect(); + } } - - bail!("Postgres channel closed") }) } } diff --git a/src/workers/tx_sender.rs b/src/workers/tx_sender.rs index f92dc152..05ada758 100644 --- a/src/workers/tx_sender.rs +++ b/src/workers/tx_sender.rs @@ -21,7 +21,7 @@ use tokio::{ use crate::{ bridge::TXS_IN_CHANNEL, tpu_manager::TpuManager, - workers::{PostgresMsg, PostgresTx, MESSAGES_IN_POSTGRES_CHANNEL}, + workers::{PostgresMsg, PostgresTx}, }; use super::PostgresMpscSend; @@ -113,31 +113,35 @@ impl TxSender { 0 } }; + drop(permit); - if let Some(postgres) = postgres { - for (sig, recent_slot) in &sigs_and_slots { - MESSAGES_IN_POSTGRES_CHANNEL.inc(); - postgres - .send(PostgresMsg::PostgresTx(PostgresTx { - signature: sig.clone(), - recent_slot: *recent_slot as i64, - forwarded_slot: forwarded_slot as i64, - processed_slot: None, - cu_consumed: None, - cu_requested: None, - quic_response, - })) - .expect("Error writing to postgres service"); - } - } - histo_timer.observe_duration(); + info!( "It took {} ms to send a batch of {} transaction(s)", start.elapsed().as_millis(), sigs_and_slots.len() ); + + if let Some(postgres) = postgres { + let txs = sigs_and_slots + .into_iter() + .map(|(signature, recent_slot)| PostgresTx { + signature, + recent_slot: recent_slot as i64, + forwarded_slot: forwarded_slot as i64, + processed_slot: None, + cu_consumed: None, + cu_requested: None, + quic_response, + }) + .collect(); + + postgres + .send(PostgresMsg::PostgresTx(txs)) + .expect("Error writing to postgres service"); + } } /// retry and confirm transactions every 2ms (avg time to confirm tx) @@ -153,7 +157,9 @@ impl TxSender { "Batching tx(s) with batch size of {tx_batch_size} every {}ms", tx_send_interval.as_millis() ); + let semaphore = Arc::new(Semaphore::new(NUMBER_OF_TX_SENDERS)); + loop { let mut sigs_and_slots = Vec::with_capacity(tx_batch_size); let mut txs = Vec::with_capacity(tx_batch_size); @@ -180,6 +186,7 @@ impl TxSender { } } } + assert_eq!(sigs_and_slots.len(), txs.len()); if sigs_and_slots.is_empty() { @@ -194,7 +201,7 @@ impl TxSender { } }; - if txs.len() > 0 { + if !txs.is_empty() { TX_BATCH_SIZES.set(txs.len() as i64); let postgres = postgres_send.clone(); let tx_sender = self.clone(); @@ -212,10 +219,8 @@ impl TxSender { let length_before = self.txs_sent_store.len(); self.txs_sent_store.retain(|_k, v| { let retain = v.sent_at.elapsed() < ttl_duration; - if !retain { - if v.status.is_none() { - TX_TIMED_OUT.inc(); - } + if !retain && v.status.is_none() { + TX_TIMED_OUT.inc(); } retain });