This commit is contained in:
aniketfuryrocks 2023-03-31 23:35:17 +05:30
parent a25c2cdfea
commit 8aaaf34c59
No known key found for this signature in database
GPG Key ID: FA6BFCFAA7D4B764
3 changed files with 100 additions and 29 deletions

View File

@ -1,3 +1,5 @@
CREATE SCHEMA lite_rpc;
CREATE TABLE lite_rpc.Txs (
id SERIAL NOT NULL PRIMARY KEY,
signature CHAR(88) NOT NULL,

View File

@ -1,5 +1,5 @@
use anyhow::{bail, Context};
use futures::join;
use futures::{future::join_all, join};
use log::{info, warn};
use postgres_native_tls::MakeTlsConnector;
use std::{sync::Arc, time::Duration};
@ -145,7 +145,7 @@ impl PostgresSession {
pub fn multiline_query(query: &mut String, args: usize, rows: usize) {
let mut arg_index = 1usize;
for _ in 0..rows {
for row in 0..rows {
query.push('(');
for i in 0..args {
@ -157,12 +157,20 @@ impl PostgresSession {
}
query.push(')');
if row != (rows - 1) {
query.push(',');
}
}
}
pub async fn send_txs(&self, txs: &[PostgresTx]) -> anyhow::Result<()> {
const NUMBER_OF_ARGS: usize = 7;
if txs.is_empty() {
return Ok(());
}
let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS * txs.len());
for tx in txs.iter() {
@ -203,6 +211,10 @@ impl PostgresSession {
pub async fn send_blocks(&self, blocks: &[PostgresBlock]) -> anyhow::Result<()> {
const NUMBER_OF_ARGS: usize = 3;
if blocks.is_empty() {
return Ok(());
}
let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS * blocks.len());
for block in blocks.iter() {
@ -232,7 +244,7 @@ impl PostgresSession {
Ok(())
}
pub async fn update_tx(&self, tx: PostgresUpdateTx, signature: &str) -> anyhow::Result<()> {
pub async fn update_tx(&self, tx: &PostgresUpdateTx, signature: &str) -> anyhow::Result<()> {
let PostgresUpdateTx {
processed_slot,
cu_consumed,
@ -242,7 +254,7 @@ impl PostgresSession {
self.client
.execute(
&self.update_tx_statement,
&[&processed_slot, &cu_consumed, &cu_requested, &signature],
&[processed_slot, cu_consumed, cu_requested, &signature],
)
.await?;
@ -280,9 +292,32 @@ impl Postgres {
let mut tx_que = Vec::<PostgresTx>::new();
let mut block_que = Vec::new();
let mut update_que = Vec::new();
loop {
let msg = recv.try_recv();
loop {
let msg = recv.try_recv();
match msg {
Ok(msg) => {
MESSAGES_IN_POSTGRES_CHANNEL.dec();
match msg {
PostgresMsg::PostgresTx(mut tx) => tx_que.append(&mut tx),
PostgresMsg::PostgresBlock(block) => block_que.push(block),
PostgresMsg::PostgresUpdateTx(tx, sig) => {
update_que.push((tx, sig))
}
PostgresMsg::PostgreAccountAddr(_) => todo!(),
}
}
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
bail!("Postgres channel broke")
}
}
}
let Ok(session) = self.get_session().await else {
const TIME_OUT:Duration = Duration::from_millis(1000);
warn!("Unable to get postgres session. Retrying in {TIME_OUT:?}");
@ -290,29 +325,15 @@ impl Postgres {
continue;
};
match msg {
Ok(msg) => {
MESSAGES_IN_POSTGRES_CHANNEL.dec();
let tx_update_fut = update_que
.iter()
.map(|(tx, sig)| session.update_tx(tx, sig));
match msg {
PostgresMsg::PostgresTx(mut tx) => tx_que.append(&mut tx),
PostgresMsg::PostgresBlock(block) => block_que.push(block),
PostgresMsg::PostgresUpdateTx(tx, sig) => {
if let Err(err) = session.update_tx(tx, &sig).await {
warn!("Error updating tx in postgres {err:?}");
}
}
PostgresMsg::PostgreAccountAddr(_) => todo!(),
}
}
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
bail!("Postgres channel broke")
}
}
let (res_txs, res_block) =
join!(session.send_txs(&tx_que), session.send_blocks(&block_que));
let (res_txs, res_block, res_tx_update) = join!(
session.send_txs(&tx_que),
session.send_blocks(&block_que),
join_all(tx_update_fut)
);
if let Err(err) = res_txs {
warn!("Error sending tx batch to postgres {err:?}");
@ -323,9 +344,22 @@ impl Postgres {
if let Err(err) = res_block {
warn!("Error sending block batch to postgres {err:?}");
} else {
tx_que.clear();
block_que.clear();
}
let mut update_que_iter = update_que.into_iter();
update_que = res_tx_update
.iter()
.filter_map(|res| {
let item = update_que_iter.next();
if let Err(err) = res {
warn!("Error updating tx to postgres {err:?}");
return item;
}
None
})
.collect();
//{
// let mut batcher =
// Batcher::new(&mut tx_que, MAX_BATCH_SIZE, BatcherStrategy::Start);
@ -357,5 +391,5 @@ fn multiline_query_test() {
let mut query = String::new();
PostgresSession::multiline_query(&mut query, 3, 2);
assert_eq!(query, "($1,$2,$3)($4,$5,$6)");
assert_eq!(query, "($1,$2,$3),($4,$5,$6)");
}

35
tests/postgres.bash Executable file
View File

@ -0,0 +1,35 @@
#!/bin/sh
# env variables
export PGPASSWORD="password"
export PG_CONFIG="host=localhost dbname=postgres user=postgres password=password sslmode=disable"
# functions
pg_run() {
psql -h localhost -U postgres -d postgres -a "$@"
}
# create and start docker
docker create --name test-postgres -e POSTGRES_PASSWORD=password -p 5432:5432 postgres:latest || true
docker start test-postgres
echo "Clearing database"
pg_run -f ../migrations/rm.sql
pg_run -f ../migrations/create.sql
echo "Starting lite-rpc"
cargo run --release -- -p &
echo "Waiting 5 seconds for lite-rpc to start"
sleep 5
echo "Sending 10 txs"
cd ../bench && cargo run --release -- -t 10
echo "Killing lite-rpc"
kill "$(jobs -p)"
echo "Fetching database values"
pg_run -c "SELECT * FROM lite_rpc.txs;"
pg_run -c "SELECT * FROM lite_rpc.blocks;"