diff --git a/migrations/create.sql b/migrations/create.sql index 5463135f..58df0085 100644 --- a/migrations/create.sql +++ b/migrations/create.sql @@ -1,3 +1,5 @@ +CREATE SCHEMA lite_rpc; + CREATE TABLE lite_rpc.Txs ( id SERIAL NOT NULL PRIMARY KEY, signature CHAR(88) NOT NULL, diff --git a/src/workers/postgres.rs b/src/workers/postgres.rs index 353f95d2..4af9869b 100644 --- a/src/workers/postgres.rs +++ b/src/workers/postgres.rs @@ -1,5 +1,5 @@ use anyhow::{bail, Context}; -use futures::join; +use futures::{future::join_all, join}; use log::{info, warn}; use postgres_native_tls::MakeTlsConnector; use std::{sync::Arc, time::Duration}; @@ -145,7 +145,7 @@ impl PostgresSession { pub fn multiline_query(query: &mut String, args: usize, rows: usize) { let mut arg_index = 1usize; - for _ in 0..rows { + for row in 0..rows { query.push('('); for i in 0..args { @@ -157,12 +157,20 @@ impl PostgresSession { } query.push(')'); + + if row != (rows - 1) { + query.push(','); + } } } pub async fn send_txs(&self, txs: &[PostgresTx]) -> anyhow::Result<()> { const NUMBER_OF_ARGS: usize = 7; + if txs.is_empty() { + return Ok(()); + } + let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS * txs.len()); for tx in txs.iter() { @@ -203,6 +211,10 @@ impl PostgresSession { pub async fn send_blocks(&self, blocks: &[PostgresBlock]) -> anyhow::Result<()> { const NUMBER_OF_ARGS: usize = 3; + if blocks.is_empty() { + return Ok(()); + } + let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS * blocks.len()); for block in blocks.iter() { @@ -232,7 +244,7 @@ impl PostgresSession { Ok(()) } - pub async fn update_tx(&self, tx: PostgresUpdateTx, signature: &str) -> anyhow::Result<()> { + pub async fn update_tx(&self, tx: &PostgresUpdateTx, signature: &str) -> anyhow::Result<()> { let PostgresUpdateTx { processed_slot, cu_consumed, @@ -242,7 +254,7 @@ impl PostgresSession { self.client .execute( &self.update_tx_statement, - &[&processed_slot, &cu_consumed, &cu_requested, &signature], + &[processed_slot, cu_consumed, cu_requested, &signature], ) .await?; @@ -280,9 +292,32 @@ impl Postgres { let mut tx_que = Vec::::new(); let mut block_que = Vec::new(); + let mut update_que = Vec::new(); loop { - let msg = recv.try_recv(); + loop { + let msg = recv.try_recv(); + + match msg { + Ok(msg) => { + MESSAGES_IN_POSTGRES_CHANNEL.dec(); + + match msg { + PostgresMsg::PostgresTx(mut tx) => tx_que.append(&mut tx), + PostgresMsg::PostgresBlock(block) => block_que.push(block), + PostgresMsg::PostgresUpdateTx(tx, sig) => { + update_que.push((tx, sig)) + } + PostgresMsg::PostgreAccountAddr(_) => todo!(), + } + } + Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break, + Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => { + bail!("Postgres channel broke") + } + } + } + let Ok(session) = self.get_session().await else { const TIME_OUT:Duration = Duration::from_millis(1000); warn!("Unable to get postgres session. Retrying in {TIME_OUT:?}"); @@ -290,29 +325,15 @@ impl Postgres { continue; }; - match msg { - Ok(msg) => { - MESSAGES_IN_POSTGRES_CHANNEL.dec(); + let tx_update_fut = update_que + .iter() + .map(|(tx, sig)| session.update_tx(tx, sig)); - match msg { - PostgresMsg::PostgresTx(mut tx) => tx_que.append(&mut tx), - PostgresMsg::PostgresBlock(block) => block_que.push(block), - PostgresMsg::PostgresUpdateTx(tx, sig) => { - if let Err(err) = session.update_tx(tx, &sig).await { - warn!("Error updating tx in postgres {err:?}"); - } - } - PostgresMsg::PostgreAccountAddr(_) => todo!(), - } - } - Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (), - Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => { - bail!("Postgres channel broke") - } - } - - let (res_txs, res_block) = - join!(session.send_txs(&tx_que), session.send_blocks(&block_que)); + let (res_txs, res_block, res_tx_update) = join!( + session.send_txs(&tx_que), + session.send_blocks(&block_que), + join_all(tx_update_fut) + ); if let Err(err) = res_txs { warn!("Error sending tx batch to postgres {err:?}"); @@ -323,9 +344,22 @@ impl Postgres { if let Err(err) = res_block { warn!("Error sending block batch to postgres {err:?}"); } else { - tx_que.clear(); + block_que.clear(); } + let mut update_que_iter = update_que.into_iter(); + update_que = res_tx_update + .iter() + .filter_map(|res| { + let item = update_que_iter.next(); + if let Err(err) = res { + warn!("Error updating tx to postgres {err:?}"); + return item; + } + None + }) + .collect(); + //{ // let mut batcher = // Batcher::new(&mut tx_que, MAX_BATCH_SIZE, BatcherStrategy::Start); @@ -357,5 +391,5 @@ fn multiline_query_test() { let mut query = String::new(); PostgresSession::multiline_query(&mut query, 3, 2); - assert_eq!(query, "($1,$2,$3)($4,$5,$6)"); + assert_eq!(query, "($1,$2,$3),($4,$5,$6)"); } diff --git a/tests/postgres.bash b/tests/postgres.bash new file mode 100755 index 00000000..bd2f1067 --- /dev/null +++ b/tests/postgres.bash @@ -0,0 +1,35 @@ +#!/bin/sh + +# env variables +export PGPASSWORD="password" +export PG_CONFIG="host=localhost dbname=postgres user=postgres password=password sslmode=disable" + +# functions +pg_run() { + psql -h localhost -U postgres -d postgres -a "$@" +} + +# create and start docker +docker create --name test-postgres -e POSTGRES_PASSWORD=password -p 5432:5432 postgres:latest || true +docker start test-postgres + +echo "Clearing database" +pg_run -f ../migrations/rm.sql +pg_run -f ../migrations/create.sql + +echo "Starting lite-rpc" +cargo run --release -- -p & + +echo "Waiting 5 seconds for lite-rpc to start" +sleep 5 + +echo "Sending 10 txs" +cd ../bench && cargo run --release -- -t 10 + +echo "Killing lite-rpc" +kill "$(jobs -p)" + +echo "Fetching database values" +pg_run -c "SELECT * FROM lite_rpc.txs;" +pg_run -c "SELECT * FROM lite_rpc.blocks;" +