Merge pull request #92 from blockworks-foundation/postgres_batching

postgres batching
This commit is contained in:
Aniket Prajapati 2023-04-01 04:14:31 +05:30 committed by GitHub
commit 561edf49d2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 302 additions and 124 deletions

View File

@ -1,3 +1,5 @@
CREATE SCHEMA lite_rpc;
CREATE TABLE lite_rpc.Txs (
id SERIAL NOT NULL PRIMARY KEY,
signature CHAR(88) NOT NULL,

View File

@ -318,11 +318,13 @@ impl BlockListener {
parent_slot: parent_slot as i64,
}))
.expect("Error sending block to postgres service");
MESSAGES_IN_POSTGRES_CHANNEL.inc();
}
Ok(())
}
pub fn listen(
self,
commitment_config: CommitmentConfig,
@ -353,9 +355,10 @@ impl BlockListener {
}
};
if let Err(_) = this
if this
.index_slot(slot, commitment_config, postgres.clone())
.await
.is_err()
{
// usually as we index all the slots even if they are not been processed we get some errors for slot
// as they are not in long term storage of the rpc // we check 5 times before ignoring the slot

View File

@ -1,8 +1,8 @@
use std::{sync::Arc, time::Duration};
use anyhow::{bail, Context};
use futures::{future::join_all, join};
use log::{info, warn};
use postgres_native_tls::MakeTlsConnector;
use std::{sync::Arc, time::Duration};
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use tokio::{
@ -12,7 +12,9 @@ use tokio::{
},
task::JoinHandle,
};
use tokio_postgres::{Client, Statement};
use tokio_postgres::{
config::SslMode, tls::MakeTlsConnect, types::ToSql, Client, NoTls, Socket, Statement,
};
use native_tls::{Certificate, Identity, TlsConnector};
@ -55,7 +57,7 @@ pub struct PostgreAccountAddr {
#[derive(Debug)]
pub enum PostgresMsg {
PostgresTx(PostgresTx),
PostgresTx(Vec<PostgresTx>),
PostgresBlock(PostgresBlock),
PostgreAccountAddr(PostgreAccountAddr),
PostgresUpdateTx(PostgresUpdateTx, String),
@ -66,63 +68,42 @@ pub type PostgresMpscSend = UnboundedSender<PostgresMsg>;
pub struct PostgresSession {
client: Client,
insert_tx_statement: Statement,
update_tx_statement: Statement,
insert_block_statement: Statement,
}
impl PostgresSession {
pub async fn new() -> anyhow::Result<Self> {
let ca_pem_b64 = std::env::var("CA_PEM_B64").context("env CA_PEM_B64 not found")?;
let client_pks_b64 =
std::env::var("CLIENT_PKS_B64").context("env CLIENT_PKS_B64 not found")?;
let client_pks_password =
std::env::var("CLIENT_PKS_PASS").context("env CLIENT_PKS_PASS not found")?;
let pg_config = std::env::var("PG_CONFIG").context("env PG_CONFIG not found")?;
let pg_config = pg_config.parse::<tokio_postgres::Config>()?;
let ca_pem = BinaryEncoding::Base64
.decode(ca_pem_b64)
.context("ca pem decode")?;
let client = if let SslMode::Disable = pg_config.get_ssl_mode() {
Self::spawn_connection(pg_config, NoTls).await?
} else {
let ca_pem_b64 = std::env::var("CA_PEM_B64").context("env CA_PEM_B64 not found")?;
let client_pks_b64 =
std::env::var("CLIENT_PKS_B64").context("env CLIENT_PKS_B64 not found")?;
let client_pks_password =
std::env::var("CLIENT_PKS_PASS").context("env CLIENT_PKS_PASS not found")?;
let client_pks = BinaryEncoding::Base64
.decode(client_pks_b64)
.context("client pks decode")?;
let ca_pem = BinaryEncoding::Base64
.decode(ca_pem_b64)
.context("ca pem decode")?;
let connector = TlsConnector::builder()
.add_root_certificate(Certificate::from_pem(&ca_pem)?)
.identity(Identity::from_pkcs12(&client_pks, &client_pks_password).context("Identity")?)
.danger_accept_invalid_hostnames(true)
.danger_accept_invalid_certs(true)
.build()?;
let client_pks = BinaryEncoding::Base64
.decode(client_pks_b64)
.context("client pks decode")?;
let connector = MakeTlsConnector::new(connector);
let (client, connection) = tokio_postgres::connect(&pg_config, connector.clone()).await?;
let connector = TlsConnector::builder()
.add_root_certificate(Certificate::from_pem(&ca_pem)?)
.identity(
Identity::from_pkcs12(&client_pks, &client_pks_password).context("Identity")?,
)
.danger_accept_invalid_hostnames(true)
.danger_accept_invalid_certs(true)
.build()?;
tokio::spawn(async move {
if let Err(err) = connection.await {
log::error!("Connection to Postgres broke {err:?}");
};
});
let insert_block_statement = client
.prepare(
r#"
INSERT INTO lite_rpc.Blocks
(slot, leader_id, parent_slot)
VALUES
($1, $2, $3)
"#,
)
.await?;
let insert_tx_statement = client.prepare(
r#"
INSERT INTO lite_rpc.Txs
(signature, recent_slot, forwarded_slot, processed_slot, cu_consumed, cu_requested, quic_response)
VALUES
($1, $2, $3, $4, $5, $6, $7)
"#,
).await?;
Self::spawn_connection(pg_config, MakeTlsConnector::new(connector)).await?
};
let update_tx_statement = client
.prepare(
@ -136,59 +117,134 @@ impl PostgresSession {
Ok(Self {
client,
insert_tx_statement,
insert_block_statement,
update_tx_statement,
})
}
pub async fn send_block(&self, block: PostgresBlock) -> anyhow::Result<()> {
let PostgresBlock {
slot,
leader_id,
parent_slot,
} = block;
async fn spawn_connection<T>(
pg_config: tokio_postgres::Config,
connector: T,
) -> anyhow::Result<Client>
where
T: MakeTlsConnect<Socket> + Send + 'static,
<T as MakeTlsConnect<Socket>>::Stream: Send,
{
let (client, connection) = pg_config
.connect(connector)
.await
.context("Connecting to Postgres failed")?;
self.client
.execute(
&self.insert_block_statement,
&[&slot, &leader_id, &parent_slot],
)
.await?;
tokio::spawn(async move {
if let Err(err) = connection.await {
log::error!("Connection to Postgres broke {err:?}");
}
});
Ok(client)
}
pub fn multiline_query(query: &mut String, args: usize, rows: usize) {
let mut arg_index = 1usize;
for row in 0..rows {
query.push('(');
for i in 0..args {
query.push_str(&format!("${arg_index}"));
arg_index += 1;
if i != (args - 1) {
query.push(',');
}
}
query.push(')');
if row != (rows - 1) {
query.push(',');
}
}
}
pub async fn send_txs(&self, txs: &[PostgresTx]) -> anyhow::Result<()> {
const NUMBER_OF_ARGS: usize = 7;
if txs.is_empty() {
return Ok(());
}
let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS * txs.len());
for tx in txs.iter() {
let PostgresTx {
signature,
recent_slot,
forwarded_slot,
processed_slot,
cu_consumed,
cu_requested,
quic_response,
} = tx;
args.push(signature);
args.push(recent_slot);
args.push(forwarded_slot);
args.push(processed_slot);
args.push(cu_consumed);
args.push(cu_requested);
args.push(quic_response);
}
let mut query = String::from(
r#"
INSERT INTO lite_rpc.Txs
(signature, recent_slot, forwarded_slot, processed_slot, cu_consumed, cu_requested, quic_response)
VALUES
"#,
);
Self::multiline_query(&mut query, NUMBER_OF_ARGS, txs.len());
self.client.execute(&query, &args).await?;
Ok(())
}
pub async fn send_tx(&self, tx: PostgresTx) -> anyhow::Result<()> {
let PostgresTx {
signature,
recent_slot,
forwarded_slot,
processed_slot,
cu_consumed,
cu_requested,
quic_response,
} = tx;
pub async fn send_blocks(&self, blocks: &[PostgresBlock]) -> anyhow::Result<()> {
const NUMBER_OF_ARGS: usize = 3;
self.client
.execute(
&self.insert_tx_statement,
&[
&signature,
&recent_slot,
&forwarded_slot,
&processed_slot,
&cu_consumed,
&cu_requested,
&quic_response,
],
)
.await?;
if blocks.is_empty() {
return Ok(());
}
let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS * blocks.len());
for block in blocks.iter() {
let PostgresBlock {
slot,
leader_id,
parent_slot,
} = block;
args.push(slot);
args.push(leader_id);
args.push(parent_slot);
}
let mut query = String::from(
r#"
INSERT INTO lite_rpc.Blocks
(slot, leader_id, parent_slot)
VALUES
"#,
);
Self::multiline_query(&mut query, NUMBER_OF_ARGS, blocks.len());
self.client.execute(&query, &args).await?;
Ok(())
}
pub async fn update_tx(&self, tx: PostgresUpdateTx, signature: &str) -> anyhow::Result<()> {
pub async fn update_tx(&self, tx: &PostgresUpdateTx, signature: &str) -> anyhow::Result<()> {
let PostgresUpdateTx {
processed_slot,
cu_consumed,
@ -198,7 +254,7 @@ impl PostgresSession {
self.client
.execute(
&self.update_tx_statement,
&[&processed_slot, &cu_consumed, &cu_requested, &signature],
&[processed_slot, cu_consumed, cu_requested, &signature],
)
.await?;
@ -234,7 +290,34 @@ impl Postgres {
tokio::spawn(async move {
info!("Writing to postgres");
while let Some(msg) = recv.recv().await {
let mut tx_que = Vec::<PostgresTx>::new();
let mut block_que = Vec::new();
let mut update_que = Vec::new();
loop {
loop {
let msg = recv.try_recv();
match msg {
Ok(msg) => {
MESSAGES_IN_POSTGRES_CHANNEL.dec();
match msg {
PostgresMsg::PostgresTx(mut tx) => tx_que.append(&mut tx),
PostgresMsg::PostgresBlock(block) => block_que.push(block),
PostgresMsg::PostgresUpdateTx(tx, sig) => {
update_que.push((tx, sig))
}
PostgresMsg::PostgreAccountAddr(_) => todo!(),
}
}
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
bail!("Postgres channel broke")
}
}
}
let Ok(session) = self.get_session().await else {
const TIME_OUT:Duration = Duration::from_millis(1000);
warn!("Unable to get postgres session. Retrying in {TIME_OUT:?}");
@ -242,22 +325,71 @@ impl Postgres {
continue;
};
MESSAGES_IN_POSTGRES_CHANNEL.dec();
let tx_update_fut = update_que
.iter()
.map(|(tx, sig)| session.update_tx(tx, sig));
let Err(err) = (
match msg {
PostgresMsg::PostgresTx(tx) => session.send_tx(tx).await,
PostgresMsg::PostgresUpdateTx(tx, sig) => session.update_tx(tx, &sig).await,
PostgresMsg::PostgresBlock(block) => session.send_block(block).await,
PostgresMsg::PostgreAccountAddr(_) => todo!(),
} ) else {
continue;
};
let (res_txs, res_block, res_tx_update) = join!(
session.send_txs(&tx_que),
session.send_blocks(&block_que),
join_all(tx_update_fut)
);
warn!("Error writing to postgres {err}");
if let Err(err) = res_txs {
warn!("Error sending tx batch to postgres {err:?}");
} else {
tx_que.clear();
}
if let Err(err) = res_block {
warn!("Error sending block batch to postgres {err:?}");
} else {
block_que.clear();
}
let mut update_que_iter = update_que.into_iter();
update_que = res_tx_update
.iter()
.filter_map(|res| {
let item = update_que_iter.next();
if let Err(err) = res {
warn!("Error updating tx to postgres {err:?}");
return item;
}
None
})
.collect();
//{
// let mut batcher =
// Batcher::new(&mut tx_que, MAX_BATCH_SIZE, BatcherStrategy::Start);
// while let Some(txs) = batcher.next_batch() {
// if let Err(err) = session.send_txs(txs).await {
// warn!("Error sending tx batch to postgres {err:?}");
// }
// }
//}
//{
// let mut batcher =
// Batcher::new(&mut block_que, MAX_BATCH_SIZE, BatcherStrategy::Start);
// while let Some(txs) = batcher.next_batch() {
// if let Err(err) = session.send_blocks(txs).await {
// warn!("Error sending block batch to postgres {err:?}");
// }
// }
//}
}
bail!("Postgres channel closed")
})
}
}
#[test]
fn multiline_query_test() {
let mut query = String::new();
PostgresSession::multiline_query(&mut query, 3, 2);
assert_eq!(query, "($1,$2,$3),($4,$5,$6)");
}

View File

@ -130,22 +130,28 @@ impl TxSender {
tasks_counter.fetch_sub(1, Ordering::Relaxed);
if let Some(postgres) = postgres {
for (sig, recent_slot) in &sigs_and_slots {
MESSAGES_IN_POSTGRES_CHANNEL.inc();
postgres
.send(PostgresMsg::PostgresTx(PostgresTx {
signature: sig.clone(),
recent_slot: *recent_slot as i64,
forwarded_slot: forwarded_slot as i64,
processed_slot: None,
cu_consumed: None,
cu_requested: None,
quic_response,
}))
.expect("Error writing to postgres service");
}
let postgres_msgs = sigs_and_slots
.iter()
.map(|(sig, recent_slot)| PostgresTx {
signature: sig.clone(),
recent_slot: *recent_slot as i64,
forwarded_slot: forwarded_slot as i64,
processed_slot: None,
cu_consumed: None,
cu_requested: None,
quic_response,
})
.collect();
postgres
.send(PostgresMsg::PostgresTx(postgres_msgs))
.expect("Error writing to postgres service");
MESSAGES_IN_POSTGRES_CHANNEL.inc();
}
histo_timer.observe_duration();
info!(
"It took {} ms to send a batch of {} transaction(s)",
start.elapsed().as_millis(),
@ -177,6 +183,7 @@ impl TxSender {
"Batching tx(s) with batch size of {tx_batch_size} every {}ms",
tx_send_interval.as_millis()
);
let semaphore = Arc::new(Semaphore::new(NUMBER_OF_TX_SENDERS));
// To limit the maximum number of background tasks sending transactions to MAX_NUMBER_OF_TOKIO_TASKS_SENDING_TXS
@ -217,6 +224,7 @@ impl TxSender {
}
}
}
assert_eq!(sigs_and_slots.len(), txs.len());
if sigs_and_slots.is_empty() {
@ -231,7 +239,7 @@ impl TxSender {
}
};
if txs.len() > 0 {
if !txs.is_empty() {
TX_BATCH_SIZES.set(txs.len() as i64);
let postgres = postgres_send.clone();
let tx_sender = self.clone();
@ -255,10 +263,8 @@ impl TxSender {
let length_before = self.txs_sent_store.len();
self.txs_sent_store.retain(|_k, v| {
let retain = v.sent_at.elapsed() < ttl_duration;
if !retain {
if v.status.is_none() {
TX_TIMED_OUT.inc();
}
if !retain && v.status.is_none() {
TX_TIMED_OUT.inc();
}
retain
});

35
tests/postgres.bash Executable file
View File

@ -0,0 +1,35 @@
#!/bin/sh
# env variables
export PGPASSWORD="password"
export PG_CONFIG="host=localhost dbname=postgres user=postgres password=password sslmode=disable"
# functions
pg_run() {
psql -h localhost -U postgres -d postgres -a "$@"
}
# create and start docker
docker create --name test-postgres -e POSTGRES_PASSWORD=password -p 5432:5432 postgres:latest || true
docker start test-postgres
echo "Clearing database"
pg_run -f ../migrations/rm.sql
pg_run -f ../migrations/create.sql
echo "Starting lite-rpc"
cargo run --release -- -p &
echo "Waiting 5 seconds for lite-rpc to start"
sleep 5
echo "Sending 10 txs"
cd ../bench && cargo run --release -- -t 10
echo "Killing lite-rpc"
kill "$(jobs -p)"
echo "Fetching database values"
pg_run -c "SELECT * FROM lite_rpc.txs;"
pg_run -c "SELECT * FROM lite_rpc.blocks;"