diff --git a/migrations/create.sql b/migrations/create.sql index 5463135f..58df0085 100644 --- a/migrations/create.sql +++ b/migrations/create.sql @@ -1,3 +1,5 @@ +CREATE SCHEMA lite_rpc; + CREATE TABLE lite_rpc.Txs ( id SERIAL NOT NULL PRIMARY KEY, signature CHAR(88) NOT NULL, diff --git a/src/workers/block_listenser.rs b/src/workers/block_listenser.rs index bcdf7ce6..5a83bf53 100644 --- a/src/workers/block_listenser.rs +++ b/src/workers/block_listenser.rs @@ -318,11 +318,13 @@ impl BlockListener { parent_slot: parent_slot as i64, })) .expect("Error sending block to postgres service"); + MESSAGES_IN_POSTGRES_CHANNEL.inc(); } Ok(()) } + pub fn listen( self, commitment_config: CommitmentConfig, @@ -353,9 +355,10 @@ impl BlockListener { } }; - if let Err(_) = this + if this .index_slot(slot, commitment_config, postgres.clone()) .await + .is_err() { // usually as we index all the slots even if they are not been processed we get some errors for slot // as they are not in long term storage of the rpc // we check 5 times before ignoring the slot diff --git a/src/workers/postgres.rs b/src/workers/postgres.rs index 4df3d8ad..4af9869b 100644 --- a/src/workers/postgres.rs +++ b/src/workers/postgres.rs @@ -1,8 +1,8 @@ -use std::{sync::Arc, time::Duration}; - use anyhow::{bail, Context}; +use futures::{future::join_all, join}; use log::{info, warn}; use postgres_native_tls::MakeTlsConnector; +use std::{sync::Arc, time::Duration}; use prometheus::{core::GenericGauge, opts, register_int_gauge}; use tokio::{ @@ -12,7 +12,9 @@ use tokio::{ }, task::JoinHandle, }; -use tokio_postgres::{Client, Statement}; +use tokio_postgres::{ + config::SslMode, tls::MakeTlsConnect, types::ToSql, Client, NoTls, Socket, Statement, +}; use native_tls::{Certificate, Identity, TlsConnector}; @@ -55,7 +57,7 @@ pub struct PostgreAccountAddr { #[derive(Debug)] pub enum PostgresMsg { - PostgresTx(PostgresTx), + PostgresTx(Vec), PostgresBlock(PostgresBlock), PostgreAccountAddr(PostgreAccountAddr), PostgresUpdateTx(PostgresUpdateTx, String), @@ -66,63 +68,42 @@ pub type PostgresMpscSend = UnboundedSender; pub struct PostgresSession { client: Client, - insert_tx_statement: Statement, update_tx_statement: Statement, - insert_block_statement: Statement, } impl PostgresSession { pub async fn new() -> anyhow::Result { - let ca_pem_b64 = std::env::var("CA_PEM_B64").context("env CA_PEM_B64 not found")?; - let client_pks_b64 = - std::env::var("CLIENT_PKS_B64").context("env CLIENT_PKS_B64 not found")?; - let client_pks_password = - std::env::var("CLIENT_PKS_PASS").context("env CLIENT_PKS_PASS not found")?; let pg_config = std::env::var("PG_CONFIG").context("env PG_CONFIG not found")?; + let pg_config = pg_config.parse::()?; - let ca_pem = BinaryEncoding::Base64 - .decode(ca_pem_b64) - .context("ca pem decode")?; + let client = if let SslMode::Disable = pg_config.get_ssl_mode() { + Self::spawn_connection(pg_config, NoTls).await? + } else { + let ca_pem_b64 = std::env::var("CA_PEM_B64").context("env CA_PEM_B64 not found")?; + let client_pks_b64 = + std::env::var("CLIENT_PKS_B64").context("env CLIENT_PKS_B64 not found")?; + let client_pks_password = + std::env::var("CLIENT_PKS_PASS").context("env CLIENT_PKS_PASS not found")?; - let client_pks = BinaryEncoding::Base64 - .decode(client_pks_b64) - .context("client pks decode")?; + let ca_pem = BinaryEncoding::Base64 + .decode(ca_pem_b64) + .context("ca pem decode")?; - let connector = TlsConnector::builder() - .add_root_certificate(Certificate::from_pem(&ca_pem)?) - .identity(Identity::from_pkcs12(&client_pks, &client_pks_password).context("Identity")?) - .danger_accept_invalid_hostnames(true) - .danger_accept_invalid_certs(true) - .build()?; + let client_pks = BinaryEncoding::Base64 + .decode(client_pks_b64) + .context("client pks decode")?; - let connector = MakeTlsConnector::new(connector); - let (client, connection) = tokio_postgres::connect(&pg_config, connector.clone()).await?; + let connector = TlsConnector::builder() + .add_root_certificate(Certificate::from_pem(&ca_pem)?) + .identity( + Identity::from_pkcs12(&client_pks, &client_pks_password).context("Identity")?, + ) + .danger_accept_invalid_hostnames(true) + .danger_accept_invalid_certs(true) + .build()?; - tokio::spawn(async move { - if let Err(err) = connection.await { - log::error!("Connection to Postgres broke {err:?}"); - }; - }); - - let insert_block_statement = client - .prepare( - r#" - INSERT INTO lite_rpc.Blocks - (slot, leader_id, parent_slot) - VALUES - ($1, $2, $3) - "#, - ) - .await?; - - let insert_tx_statement = client.prepare( - r#" - INSERT INTO lite_rpc.Txs - (signature, recent_slot, forwarded_slot, processed_slot, cu_consumed, cu_requested, quic_response) - VALUES - ($1, $2, $3, $4, $5, $6, $7) - "#, - ).await?; + Self::spawn_connection(pg_config, MakeTlsConnector::new(connector)).await? + }; let update_tx_statement = client .prepare( @@ -136,59 +117,134 @@ impl PostgresSession { Ok(Self { client, - insert_tx_statement, - insert_block_statement, update_tx_statement, }) } - pub async fn send_block(&self, block: PostgresBlock) -> anyhow::Result<()> { - let PostgresBlock { - slot, - leader_id, - parent_slot, - } = block; + async fn spawn_connection( + pg_config: tokio_postgres::Config, + connector: T, + ) -> anyhow::Result + where + T: MakeTlsConnect + Send + 'static, + >::Stream: Send, + { + let (client, connection) = pg_config + .connect(connector) + .await + .context("Connecting to Postgres failed")?; - self.client - .execute( - &self.insert_block_statement, - &[&slot, &leader_id, &parent_slot], - ) - .await?; + tokio::spawn(async move { + if let Err(err) = connection.await { + log::error!("Connection to Postgres broke {err:?}"); + } + }); + + Ok(client) + } + + pub fn multiline_query(query: &mut String, args: usize, rows: usize) { + let mut arg_index = 1usize; + for row in 0..rows { + query.push('('); + + for i in 0..args { + query.push_str(&format!("${arg_index}")); + arg_index += 1; + if i != (args - 1) { + query.push(','); + } + } + + query.push(')'); + + if row != (rows - 1) { + query.push(','); + } + } + } + + pub async fn send_txs(&self, txs: &[PostgresTx]) -> anyhow::Result<()> { + const NUMBER_OF_ARGS: usize = 7; + + if txs.is_empty() { + return Ok(()); + } + + let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS * txs.len()); + + for tx in txs.iter() { + let PostgresTx { + signature, + recent_slot, + forwarded_slot, + processed_slot, + cu_consumed, + cu_requested, + quic_response, + } = tx; + + args.push(signature); + args.push(recent_slot); + args.push(forwarded_slot); + args.push(processed_slot); + args.push(cu_consumed); + args.push(cu_requested); + args.push(quic_response); + } + + let mut query = String::from( + r#" + INSERT INTO lite_rpc.Txs + (signature, recent_slot, forwarded_slot, processed_slot, cu_consumed, cu_requested, quic_response) + VALUES + "#, + ); + + Self::multiline_query(&mut query, NUMBER_OF_ARGS, txs.len()); + + self.client.execute(&query, &args).await?; Ok(()) } - pub async fn send_tx(&self, tx: PostgresTx) -> anyhow::Result<()> { - let PostgresTx { - signature, - recent_slot, - forwarded_slot, - processed_slot, - cu_consumed, - cu_requested, - quic_response, - } = tx; + pub async fn send_blocks(&self, blocks: &[PostgresBlock]) -> anyhow::Result<()> { + const NUMBER_OF_ARGS: usize = 3; - self.client - .execute( - &self.insert_tx_statement, - &[ - &signature, - &recent_slot, - &forwarded_slot, - &processed_slot, - &cu_consumed, - &cu_requested, - &quic_response, - ], - ) - .await?; + if blocks.is_empty() { + return Ok(()); + } + + let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS * blocks.len()); + + for block in blocks.iter() { + let PostgresBlock { + slot, + leader_id, + parent_slot, + } = block; + + args.push(slot); + args.push(leader_id); + args.push(parent_slot); + } + + let mut query = String::from( + r#" + INSERT INTO lite_rpc.Blocks + (slot, leader_id, parent_slot) + VALUES + "#, + ); + + Self::multiline_query(&mut query, NUMBER_OF_ARGS, blocks.len()); + + self.client.execute(&query, &args).await?; Ok(()) } - pub async fn update_tx(&self, tx: PostgresUpdateTx, signature: &str) -> anyhow::Result<()> { + pub async fn update_tx(&self, tx: &PostgresUpdateTx, signature: &str) -> anyhow::Result<()> { let PostgresUpdateTx { processed_slot, cu_consumed, @@ -198,7 +254,7 @@ impl PostgresSession { self.client .execute( &self.update_tx_statement, - &[&processed_slot, &cu_consumed, &cu_requested, &signature], + &[processed_slot, cu_consumed, cu_requested, &signature], ) .await?; @@ -234,7 +290,34 @@ impl Postgres { tokio::spawn(async move { info!("Writing to postgres"); - while let Some(msg) = recv.recv().await { + let mut tx_que = Vec::::new(); + let mut block_que = Vec::new(); + let mut update_que = Vec::new(); + + loop { + loop { + let msg = recv.try_recv(); + + match msg { + Ok(msg) => { + MESSAGES_IN_POSTGRES_CHANNEL.dec(); + + match msg { + PostgresMsg::PostgresTx(mut tx) => tx_que.append(&mut tx), + PostgresMsg::PostgresBlock(block) => block_que.push(block), + PostgresMsg::PostgresUpdateTx(tx, sig) => { + update_que.push((tx, sig)) + } + PostgresMsg::PostgreAccountAddr(_) => todo!(), + } + } + Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break, + Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => { + bail!("Postgres channel broke") + } + } + } + let Ok(session) = self.get_session().await else { const TIME_OUT:Duration = Duration::from_millis(1000); warn!("Unable to get postgres session. Retrying in {TIME_OUT:?}"); @@ -242,22 +325,71 @@ impl Postgres { continue; }; - MESSAGES_IN_POSTGRES_CHANNEL.dec(); + let tx_update_fut = update_que + .iter() + .map(|(tx, sig)| session.update_tx(tx, sig)); - let Err(err) = ( - match msg { - PostgresMsg::PostgresTx(tx) => session.send_tx(tx).await, - PostgresMsg::PostgresUpdateTx(tx, sig) => session.update_tx(tx, &sig).await, - PostgresMsg::PostgresBlock(block) => session.send_block(block).await, - PostgresMsg::PostgreAccountAddr(_) => todo!(), - } ) else { - continue; - }; + let (res_txs, res_block, res_tx_update) = join!( + session.send_txs(&tx_que), + session.send_blocks(&block_que), + join_all(tx_update_fut) + ); - warn!("Error writing to postgres {err}"); + if let Err(err) = res_txs { + warn!("Error sending tx batch to postgres {err:?}"); + } else { + tx_que.clear(); + } + + if let Err(err) = res_block { + warn!("Error sending block batch to postgres {err:?}"); + } else { + block_que.clear(); + } + + let mut update_que_iter = update_que.into_iter(); + update_que = res_tx_update + .iter() + .filter_map(|res| { + let item = update_que_iter.next(); + if let Err(err) = res { + warn!("Error updating tx to postgres {err:?}"); + return item; + } + None + }) + .collect(); + + //{ + // let mut batcher = + // Batcher::new(&mut tx_que, MAX_BATCH_SIZE, BatcherStrategy::Start); + + // while let Some(txs) = batcher.next_batch() { + // if let Err(err) = session.send_txs(txs).await { + // warn!("Error sending tx batch to postgres {err:?}"); + // } + // } + //} + + //{ + // let mut batcher = + // Batcher::new(&mut block_que, MAX_BATCH_SIZE, BatcherStrategy::Start); + + // while let Some(txs) = batcher.next_batch() { + // if let Err(err) = session.send_blocks(txs).await { + // warn!("Error sending block batch to postgres {err:?}"); + // } + // } + //} } - - bail!("Postgres channel closed") }) } } + +#[test] +fn multiline_query_test() { + let mut query = String::new(); + + PostgresSession::multiline_query(&mut query, 3, 2); + assert_eq!(query, "($1,$2,$3),($4,$5,$6)"); +} diff --git a/src/workers/tx_sender.rs b/src/workers/tx_sender.rs index 00db0a70..a3024472 100644 --- a/src/workers/tx_sender.rs +++ b/src/workers/tx_sender.rs @@ -130,22 +130,28 @@ impl TxSender { tasks_counter.fetch_sub(1, Ordering::Relaxed); if let Some(postgres) = postgres { - for (sig, recent_slot) in &sigs_and_slots { - MESSAGES_IN_POSTGRES_CHANNEL.inc(); - postgres - .send(PostgresMsg::PostgresTx(PostgresTx { - signature: sig.clone(), - recent_slot: *recent_slot as i64, - forwarded_slot: forwarded_slot as i64, - processed_slot: None, - cu_consumed: None, - cu_requested: None, - quic_response, - })) - .expect("Error writing to postgres service"); - } + let postgres_msgs = sigs_and_slots + .iter() + .map(|(sig, recent_slot)| PostgresTx { + signature: sig.clone(), + recent_slot: *recent_slot as i64, + forwarded_slot: forwarded_slot as i64, + processed_slot: None, + cu_consumed: None, + cu_requested: None, + quic_response, + }) + .collect(); + + postgres + .send(PostgresMsg::PostgresTx(postgres_msgs)) + .expect("Error writing to postgres service"); + + MESSAGES_IN_POSTGRES_CHANNEL.inc(); } + histo_timer.observe_duration(); + info!( "It took {} ms to send a batch of {} transaction(s)", start.elapsed().as_millis(), @@ -177,6 +183,7 @@ impl TxSender { "Batching tx(s) with batch size of {tx_batch_size} every {}ms", tx_send_interval.as_millis() ); + let semaphore = Arc::new(Semaphore::new(NUMBER_OF_TX_SENDERS)); // To limit the maximum number of background tasks sending transactions to MAX_NUMBER_OF_TOKIO_TASKS_SENDING_TXS @@ -217,6 +224,7 @@ impl TxSender { } } } + assert_eq!(sigs_and_slots.len(), txs.len()); if sigs_and_slots.is_empty() { @@ -231,7 +239,7 @@ impl TxSender { } }; - if txs.len() > 0 { + if !txs.is_empty() { TX_BATCH_SIZES.set(txs.len() as i64); let postgres = postgres_send.clone(); let tx_sender = self.clone(); @@ -255,10 +263,8 @@ impl TxSender { let length_before = self.txs_sent_store.len(); self.txs_sent_store.retain(|_k, v| { let retain = v.sent_at.elapsed() < ttl_duration; - if !retain { - if v.status.is_none() { - TX_TIMED_OUT.inc(); - } + if !retain && v.status.is_none() { + TX_TIMED_OUT.inc(); } retain }); diff --git a/tests/postgres.bash b/tests/postgres.bash new file mode 100755 index 00000000..bd2f1067 --- /dev/null +++ b/tests/postgres.bash @@ -0,0 +1,35 @@ +#!/bin/sh + +# env variables +export PGPASSWORD="password" +export PG_CONFIG="host=localhost dbname=postgres user=postgres password=password sslmode=disable" + +# functions +pg_run() { + psql -h localhost -U postgres -d postgres -a "$@" +} + +# create and start docker +docker create --name test-postgres -e POSTGRES_PASSWORD=password -p 5432:5432 postgres:latest || true +docker start test-postgres + +echo "Clearing database" +pg_run -f ../migrations/rm.sql +pg_run -f ../migrations/create.sql + +echo "Starting lite-rpc" +cargo run --release -- -p & + +echo "Waiting 5 seconds for lite-rpc to start" +sleep 5 + +echo "Sending 10 txs" +cd ../bench && cargo run --release -- -t 10 + +echo "Killing lite-rpc" +kill "$(jobs -p)" + +echo "Fetching database values" +pg_run -c "SELECT * FROM lite_rpc.txs;" +pg_run -c "SELECT * FROM lite_rpc.blocks;" +