Postgres test migration fix (#122)

* metric devide fix

* postgres

* get max safe updates

* rm array vec

* >=

* make it 1

* continue

* postgres 50

* set default to 0 to trigger panic by default

* batch update statements

* add comment to code

---------

Co-authored-by: Maximilian Schneider <mail@maximilianschneider.net>
This commit is contained in:
Aniket Prajapati 2023-04-17 00:16:47 +05:30 committed by GitHub
parent 65c5d8365c
commit b49d18d826
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 197 additions and 118 deletions

View File

@ -34,7 +34,11 @@ impl AddAssign<&Self> for Metric {
}
impl DivAssign<u64> for Metric {
// used to avg metrics, if there were no runs then benchmark averages across 0 runs
fn div_assign(&mut self, rhs: u64) {
if rhs == 0 {
return;
}
self.total_time_elapsed_sec /= rhs as f64;
self.txs_sent /= rhs;
self.time_to_send_txs /= rhs as f64;

View File

@ -220,6 +220,9 @@ impl BlockListener {
.await;
let mut transactions_processed = 0;
let mut transactions_to_update = vec![];
transactions_to_update.reserve(transactions.len());
for tx in transactions {
let Some(UiTransactionStatusMeta { err, status, compute_units_consumed ,.. }) = tx.meta else {
info!("tx with no meta");
@ -260,26 +263,19 @@ impl BlockListener {
confirmation_status: Some(comfirmation_status.clone()),
});
//
// Write to postgres
//
if let Some(postgres) = &postgres {
// prepare writing to postgres
if let Some(_postgres) = &postgres {
let cu_consumed = match compute_units_consumed {
OptionSerializer::Some(cu_consumed) => Some(cu_consumed as i64),
_ => None,
};
postgres
.send(PostgresMsg::PostgresUpdateTx(
PostgresUpdateTx {
processed_slot: slot as i64,
cu_consumed,
cu_requested: None, //TODO: cu requested
},
sig.clone(),
))
.unwrap();
MESSAGES_IN_POSTGRES_CHANNEL.inc();
transactions_to_update.push(PostgresUpdateTx {
signature: sig.clone(),
processed_slot: slot as i64,
cu_consumed,
cu_requested: None, //TODO: cu requested
});
}
};
@ -299,6 +295,14 @@ impl BlockListener {
}
}
//
// Write to postgres
//
if let Some(postgres) = &postgres {
postgres.send(PostgresMsg::PostgresUpdateTx(transactions_to_update)).unwrap();
MESSAGES_IN_POSTGRES_CHANNEL.inc();
}
trace!(
"Number of transactions processed {} for slot {} for commitment {} time taken {} ms",
transactions_processed,

View File

@ -1,6 +1,6 @@
use anyhow::{bail, Context};
use chrono::{DateTime, Utc};
use futures::{future::join_all, join};
use futures::join;
use log::{info, warn};
use postgres_native_tls::MakeTlsConnector;
use std::{sync::Arc, time::Duration};
@ -13,9 +13,7 @@ use tokio::{
},
task::JoinHandle,
};
use tokio_postgres::{
config::SslMode, tls::MakeTlsConnect, types::ToSql, Client, NoTls, Socket, Statement,
};
use tokio_postgres::{config::SslMode, tls::MakeTlsConnect, types::ToSql, Client, NoTls, Socket};
use native_tls::{Certificate, Identity, TlsConnector};
@ -23,36 +21,76 @@ use crate::encoding::BinaryEncoding;
lazy_static::lazy_static! {
pub static ref MESSAGES_IN_POSTGRES_CHANNEL: GenericGauge<prometheus::core::AtomicI64> = register_int_gauge!(opts!("literpc_messages_in_postgres", "Number of messages in postgres")).unwrap();
pub static ref POSTGRES_SESSION_ERRORS: GenericGauge<prometheus::core::AtomicI64> = register_int_gauge!(opts!("literpc_session_errors", "Number of failures while establishing postgres session")).unwrap();
}
const MAX_QUERY_SIZE: usize = 200_000; // 0.2 mb
trait SchemaSize {
const DEFAULT_SIZE: usize = 0;
const MAX_SIZE: usize = 0;
}
const fn get_max_safe_inserts<T: SchemaSize>() -> usize {
if T::DEFAULT_SIZE == 0 {
panic!("DEFAULT_SIZE can't be 0. SchemaSize impl should override the DEFAULT_SIZE const");
}
MAX_QUERY_SIZE / T::DEFAULT_SIZE
}
const fn get_max_safe_updates<T: SchemaSize>() -> usize {
if T::MAX_SIZE == 0 {
panic!("MAX_SIZE can't be 0. SchemaSize impl should override the MAX_SIZE const");
}
MAX_QUERY_SIZE / T::MAX_SIZE
}
#[derive(Debug)]
pub struct PostgresTx {
pub signature: String,
pub recent_slot: i64,
pub forwarded_slot: i64,
pub forwarded_local_time: DateTime<Utc>,
pub signature: String, // 88 bytes
pub recent_slot: i64, // 8 bytes
pub forwarded_slot: i64, // 8 bytes
pub forwarded_local_time: DateTime<Utc>, // 8 bytes
pub processed_slot: Option<i64>,
pub cu_consumed: Option<i64>,
pub cu_requested: Option<i64>,
pub quic_response: i16,
pub quic_response: i16, // 8 bytes
}
impl SchemaSize for PostgresTx {
const DEFAULT_SIZE: usize = 88 + (4 * 8);
const MAX_SIZE: usize = Self::DEFAULT_SIZE + (3 * 8);
}
#[derive(Debug)]
pub struct PostgresUpdateTx {
pub processed_slot: i64,
pub signature: String, // 88 bytes
pub processed_slot: i64, // 8 bytes
pub cu_consumed: Option<i64>,
pub cu_requested: Option<i64>,
}
impl SchemaSize for PostgresUpdateTx {
const DEFAULT_SIZE: usize = 88 + 8;
const MAX_SIZE: usize = Self::DEFAULT_SIZE + (2 * 8);
}
#[derive(Debug)]
pub struct PostgresBlock {
pub slot: i64,
pub leader_id: i64,
pub parent_slot: i64,
pub cluster_time: DateTime<Utc>,
pub slot: i64, // 8 bytes
pub leader_id: i64, // 8 bytes
pub parent_slot: i64, // 8 bytes
pub cluster_time: DateTime<Utc>, // 8 bytes
pub local_time: Option<DateTime<Utc>>,
}
impl SchemaSize for PostgresBlock {
const DEFAULT_SIZE: usize = 4 * 8;
const MAX_SIZE: usize = Self::DEFAULT_SIZE + 8;
}
#[derive(Debug)]
pub struct PostgreAccountAddr {
pub id: u32,
@ -64,7 +102,7 @@ pub enum PostgresMsg {
PostgresTx(Vec<PostgresTx>),
PostgresBlock(PostgresBlock),
PostgreAccountAddr(PostgreAccountAddr),
PostgresUpdateTx(PostgresUpdateTx, String),
PostgresUpdateTx(Vec<PostgresUpdateTx>),
}
pub type PostgresMpscRecv = UnboundedReceiver<PostgresMsg>;
@ -72,7 +110,6 @@ pub type PostgresMpscSend = UnboundedSender<PostgresMsg>;
pub struct PostgresSession {
client: Client,
update_tx_statement: Statement,
}
impl PostgresSession {
@ -108,20 +145,7 @@ impl PostgresSession {
Self::spawn_connection(pg_config, MakeTlsConnector::new(connector)).await?
};
let update_tx_statement = client
.prepare(
r#"
UPDATE lite_rpc.txs
SET processed_slot = $1, cu_consumed = $2, cu_requested = $3
WHERE signature = $4
"#,
)
.await?;
Ok(Self {
client,
update_tx_statement,
})
Ok(Self { client })
}
async fn spawn_connection<T>(
@ -138,9 +162,14 @@ impl PostgresSession {
.context("Connecting to Postgres failed")?;
tokio::spawn(async move {
log::info!("Connecting to Postgres");
if let Err(err) = connection.await {
log::error!("Connection to Postgres broke {err:?}");
return;
}
unreachable!("Postgres thread returned")
});
Ok(client)
@ -253,19 +282,49 @@ impl PostgresSession {
Ok(())
}
pub async fn update_tx(&self, tx: &PostgresUpdateTx, signature: &str) -> anyhow::Result<()> {
let PostgresUpdateTx {
processed_slot,
cu_consumed,
cu_requested,
} = tx;
pub async fn update_txs(&self, txs: &[PostgresUpdateTx]) -> anyhow::Result<()> {
const NUMBER_OF_ARGS: usize = 4;
self.client
.execute(
&self.update_tx_statement,
&[processed_slot, cu_consumed, cu_requested, &signature],
)
.await?;
if txs.is_empty() {
return Ok(());
}
let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS * txs.len());
for tx in txs.iter() {
let PostgresUpdateTx {
signature,
processed_slot,
cu_consumed,
cu_requested,
} = tx;
args.push(signature);
args.push(processed_slot);
args.push(cu_consumed);
args.push(cu_requested);
}
let mut query = String::from(
r#"
UPDATE lite_rpc.Txs as t1 set
processed_slot = t2.processed_slot,
cu_consumed = t2.cu_consumed,
cu_requested = t2.cu_requested
FROM (VALUES
"#,
);
Self::multiline_query(&mut query, NUMBER_OF_ARGS, txs.len());
query.push_str(
r#"
) AS t2(signature, processed_slot, cu_consumed, cu_requested)
WHERE t1.signature = t2.signature
"#,
);
self.client.execute(&query, &args).await?;
Ok(())
}
@ -297,26 +356,44 @@ impl Postgres {
pub fn start(mut self, mut recv: PostgresMpscRecv) -> JoinHandle<anyhow::Result<()>> {
tokio::spawn(async move {
info!("Writing to postgres");
info!("start postgres worker");
let mut tx_que = Vec::<PostgresTx>::new();
let mut block_que = Vec::new();
let mut update_que = Vec::new();
const TX_MAX_CAPACITY: usize = get_max_safe_inserts::<PostgresTx>();
const BLOCK_MAX_CAPACITY: usize = get_max_safe_inserts::<PostgresBlock>();
const UPDATE_MAX_CAPACITY: usize = get_max_safe_updates::<PostgresUpdateTx>();
let mut tx_batch: Vec<PostgresTx> = Vec::with_capacity(TX_MAX_CAPACITY);
let mut block_batch: Vec<PostgresBlock> = Vec::with_capacity(BLOCK_MAX_CAPACITY);
let mut update_batch = Vec::<PostgresUpdateTx>::with_capacity(UPDATE_MAX_CAPACITY);
let mut session_establish_error = false;
loop {
// drain channel until we reach max capacity for any statement type
loop {
let msg = recv.try_recv();
if session_establish_error {
break;
}
match msg {
// check for capacity
if tx_batch.len() >= TX_MAX_CAPACITY
|| block_batch.len() >= BLOCK_MAX_CAPACITY
|| update_batch.len() >= UPDATE_MAX_CAPACITY
{
break;
}
match recv.try_recv() {
Ok(msg) => {
MESSAGES_IN_POSTGRES_CHANNEL.dec();
match msg {
PostgresMsg::PostgresTx(mut tx) => tx_que.append(&mut tx),
PostgresMsg::PostgresBlock(block) => block_que.push(block),
PostgresMsg::PostgresUpdateTx(tx, sig) => {
update_que.push((tx, sig))
PostgresMsg::PostgresTx(mut tx) => tx_batch.append(&mut tx),
PostgresMsg::PostgresBlock(block) => block_batch.push(block),
PostgresMsg::PostgresUpdateTx(mut update) => {
update_batch.append(&mut update)
}
PostgresMsg::PostgreAccountAddr(_) => todo!(),
}
}
@ -327,69 +404,60 @@ impl Postgres {
}
}
let Ok(session) = self.get_session().await else {
// if there's nothing to do, yield for a brief time
if tx_batch.is_empty() && block_batch.is_empty() && update_batch.is_empty() {
tokio::time::sleep(Duration::from_millis(10)).await;
continue;
}
// Establish session with postgres or get an existing one
let session = self.get_session().await;
session_establish_error = session.is_err();
let Ok(session) = session else {
POSTGRES_SESSION_ERRORS.inc();
const TIME_OUT:Duration = Duration::from_millis(1000);
warn!("Unable to get postgres session. Retrying in {TIME_OUT:?}");
tokio::time::sleep(TIME_OUT).await;
continue;
};
let tx_update_fut = update_que
.iter()
.map(|(tx, sig)| session.update_tx(tx, sig));
POSTGRES_SESSION_ERRORS.set(0);
let (res_txs, res_block, res_tx_update) = join!(
session.send_txs(&tx_que),
session.send_blocks(&block_que),
join_all(tx_update_fut)
// write to database when a successful connection is made
let (res_txs, res_blocks, res_update) = join!(
session.send_txs(&tx_batch),
session.send_blocks(&block_batch),
session.update_txs(&update_batch)
);
// clear batches only if results were successful
if let Err(err) = res_txs {
warn!("Error sending tx batch ({:?}) to postgres {err:?}", tx_que.len());
warn!(
"Error sending tx batch ({:?}) to postgres {err:?}",
tx_batch.len()
);
} else {
tx_que.clear();
tx_batch.clear();
}
if let Err(err) = res_block {
warn!("Error sending block batch ({:?}) to postgres {err:?}", block_que.len());
if let Err(err) = res_blocks {
warn!(
"Error sending block batch ({:?}) to postgres {err:?}",
block_batch.len()
);
} else {
block_que.clear();
block_batch.clear();
}
if let Err(err) = res_update {
warn!(
"Error sending update batch ({:?}) to postgres {err:?}",
update_batch.len()
);
} else {
update_batch.clear();
}
let mut update_que_iter = update_que.into_iter();
update_que = res_tx_update
.iter()
.filter_map(|res| {
let item = update_que_iter.next();
if let Err(err) = res {
warn!("Error updating tx to postgres {err:?}");
return item;
}
None
})
.collect();
//{
// let mut batcher =
// Batcher::new(&mut tx_que, MAX_BATCH_SIZE, BatcherStrategy::Start);
// while let Some(txs) = batcher.next_batch() {
// if let Err(err) = session.send_txs(txs).await {
// warn!("Error sending tx batch to postgres {err:?}");
// }
// }
//}
//{
// let mut batcher =
// Batcher::new(&mut block_que, MAX_BATCH_SIZE, BatcherStrategy::Start);
// while let Some(txs) = batcher.next_batch() {
// if let Err(err) = session.send_blocks(txs).await {
// warn!("Error sending block batch to postgres {err:?}");
// }
// }
//}
}
})
}

View File

@ -1,7 +1,7 @@
#!/bin/sh
# kill background jobs on exit/failure
trap 'kill $(jobs -pr)' SIGINT SIGTERM EXIT
trap 'kill $(jobs -pr) && docker kill test-postgres' SIGINT SIGTERM EXIT
# env variables
export PGPASSWORD="password"
@ -16,12 +16,15 @@ pg_run() {
docker create --name test-postgres -e POSTGRES_PASSWORD=password -p 5432:5432 postgres:latest || true
docker start test-postgres
echo "Waiting 10 seconds for postgres to start"
sleep 10
echo "Clearing database"
pg_run -f ../migrations/rm.sql
pg_run -f ../migrations/create.sql
echo "Starting the test validator"
solana-test-validator > /dev/null &
(cd $HOME; solana-test-validator > /dev/null &)
echo "Waiting 8 seconds for solana-test-validator to start"
sleep 8