This commit is contained in:
aniketfuryrocks 2023-03-20 11:47:11 +05:30
parent 8e5cf8a567
commit c56b1902c2
No known key found for this signature in database
GPG Key ID: FA6BFCFAA7D4B764
3 changed files with 96 additions and 43 deletions

View File

@ -320,6 +320,7 @@ impl BlockListener {
Ok(())
}
pub fn listen(
self,
commitment_config: CommitmentConfig,
@ -350,9 +351,10 @@ impl BlockListener {
}
};
if let Err(_) = this
if this
.index_slot(slot, commitment_config, postgres.clone())
.await
.is_err()
{
// Usually, because we index all slots even if they have not been processed, we get errors for some slots,
// since they are not in the RPC's long-term storage; we check 5 times before ignoring the slot.

View File

@ -1,6 +1,6 @@
use std::sync::Arc;
use std::{collections::VecDeque, sync::Arc, time::Duration};
use anyhow::{bail, Context, Ok};
use anyhow::{bail, Context};
use log::{info, warn};
use postgres_native_tls::MakeTlsConnector;
@ -55,7 +55,7 @@ pub struct PostgreAccountAddr {
#[derive(Debug)]
pub enum PostgresMsg {
PostgresTx(PostgresTx),
PostgresTx(Vec<PostgresTx>),
PostgresBlock(PostgresBlock),
PostgreAccountAddr(PostgreAccountAddr),
PostgresUpdateTx(PostgresUpdateTx, String),
@ -67,8 +67,10 @@ pub type PostgresMpscSend = UnboundedSender<PostgresMsg>;
pub struct PostgresSession {
client: Client,
insert_tx_statement: Statement,
update_tx_statement: Statement,
insert_5_batch_tx_statement: Statement,
insert_block_statement: Statement,
insert_5_batch_block_statement: Statement,
update_tx_statement: Statement,
}
impl PostgresSession {
@ -115,6 +117,21 @@ impl PostgresSession {
)
.await?;
let insert_5_batch_block_statement = client
.prepare(
r#"
INSERT INTO lite_rpc.Blocks
(slot, leader_id, parent_slot)
VALUES
($1, $2, $3),
($4, $5, $6),
($7, $8, $9),
($10, $11, $12),
($13, $14, $15)
"#,
)
.await?;
let insert_tx_statement = client.prepare(
r#"
INSERT INTO lite_rpc.Txs
@ -124,6 +141,19 @@ impl PostgresSession {
"#,
).await?;
let insert_5_batch_tx_statement = client.prepare(
r#"
INSERT INTO lite_rpc.Txs
(signature, recent_slot, forwarded_slot, processed_slot, cu_consumed, cu_requested, quic_response)
VALUES
($1, $2, $3, $4, $5, $6, $7),
($8, $9, $10, $11, $12, $13, $14),
($15, $16, $17, $18, $19, $20, $21),
($22, $23, $24, $25, $26, $27, $28),
($29, $30, $31, $32, $33, $34, $35)
"#,
).await?;
let update_tx_statement = client
.prepare(
r#"
@ -137,7 +167,9 @@ impl PostgresSession {
Ok(Self {
client,
insert_tx_statement,
insert_5_batch_tx_statement,
insert_block_statement,
insert_5_batch_block_statement,
update_tx_statement,
})
}
@ -234,24 +266,38 @@ impl Postgres {
tokio::spawn(async move {
info!("Writing to postgres");
while let Some(msg) = recv.recv().await {
MESSAGES_IN_POSTGRES_CHANNEL.dec();
let session = self.get_session().await?;
let mut tx_que = VecDeque::<PostgresTx>::new();
let mut block_que = VecDeque::new();
let Err(err) = (
match msg {
PostgresMsg::PostgresTx(tx) => session.send_tx(tx).await,
PostgresMsg::PostgresUpdateTx(tx, sig) => session.update_tx(tx, &sig).await,
PostgresMsg::PostgresBlock(block) => session.send_block(block).await,
PostgresMsg::PostgreAccountAddr(_) => todo!(),
} ) else {
continue;
};
loop {
let msg = recv.try_recv();
warn!("Error writing to postgres {err}");
match msg {
Ok(msg) => {
MESSAGES_IN_POSTGRES_CHANNEL.dec();
let session = self.get_session().await?;
match msg {
PostgresMsg::PostgresTx(mut tx) => tx_que.append(&mut tx),
PostgresMsg::PostgresBlock(block) => block_que.push_back(block),
PostgresMsg::PostgresUpdateTx(tx, sig) => {
if let Err(err) = session.update_tx(tx, &sig).await {
warn!("Error updating tx in postgres {err:?}");
}
}
PostgresMsg::PostgreAccountAddr(_) => todo!(),
}
}
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
bail!("Postgres channel broke")
}
}
while tx_que.len() >= 5 {
let txs = tx_que.drain(0..5).collect();
// NOTE(review): `txs` is collected but never sent — the prepared
// insert_5_batch_tx_statement is never executed, so these transactions
// are silently dropped; the batch must be written to postgres here.
}
}
bail!("Postgres channel closed")
})
}
}

View File

@ -21,7 +21,7 @@ use tokio::{
use crate::{
bridge::TXS_IN_CHANNEL,
tpu_manager::TpuManager,
workers::{PostgresMsg, PostgresTx, MESSAGES_IN_POSTGRES_CHANNEL},
workers::{PostgresMsg, PostgresTx},
};
use super::PostgresMpscSend;
@ -113,31 +113,35 @@ impl TxSender {
0
}
};
drop(permit);
if let Some(postgres) = postgres {
for (sig, recent_slot) in &sigs_and_slots {
MESSAGES_IN_POSTGRES_CHANNEL.inc();
postgres
.send(PostgresMsg::PostgresTx(PostgresTx {
signature: sig.clone(),
recent_slot: *recent_slot as i64,
forwarded_slot: forwarded_slot as i64,
processed_slot: None,
cu_consumed: None,
cu_requested: None,
quic_response,
}))
.expect("Error writing to postgres service");
}
}
histo_timer.observe_duration();
info!(
"It took {} ms to send a batch of {} transaction(s)",
start.elapsed().as_millis(),
sigs_and_slots.len()
);
if let Some(postgres) = postgres {
let txs = sigs_and_slots
.into_iter()
.map(|(signature, recent_slot)| PostgresTx {
signature,
recent_slot: recent_slot as i64,
forwarded_slot: forwarded_slot as i64,
processed_slot: None,
cu_consumed: None,
cu_requested: None,
quic_response,
})
.collect();
postgres
.send(PostgresMsg::PostgresTx(txs))
.expect("Error writing to postgres service");
}
}
/// retry and confirm transactions every 2ms (avg time to confirm tx)
@ -153,7 +157,9 @@ impl TxSender {
"Batching tx(s) with batch size of {tx_batch_size} every {}ms",
tx_send_interval.as_millis()
);
let semaphore = Arc::new(Semaphore::new(NUMBER_OF_TX_SENDERS));
loop {
let mut sigs_and_slots = Vec::with_capacity(tx_batch_size);
let mut txs = Vec::with_capacity(tx_batch_size);
@ -180,6 +186,7 @@ impl TxSender {
}
}
}
assert_eq!(sigs_and_slots.len(), txs.len());
if sigs_and_slots.is_empty() {
@ -194,7 +201,7 @@ impl TxSender {
}
};
if txs.len() > 0 {
if !txs.is_empty() {
TX_BATCH_SIZES.set(txs.len() as i64);
let postgres = postgres_send.clone();
let tx_sender = self.clone();
@ -212,10 +219,8 @@ impl TxSender {
let length_before = self.txs_sent_store.len();
self.txs_sent_store.retain(|_k, v| {
let retain = v.sent_at.elapsed() < ttl_duration;
if !retain {
if v.status.is_none() {
TX_TIMED_OUT.inc();
}
if !retain && v.status.is_none() {
TX_TIMED_OUT.inc();
}
retain
});