insert query batch

This commit is contained in:
aniketfuryrocks 2023-03-26 16:31:36 +05:30
parent f98c8f2e4c
commit ed8351e0b9
No known key found for this signature in database
GPG Key ID: FA6BFCFAA7D4B764
1 changed files with 57 additions and 110 deletions

View File

@ -26,6 +26,8 @@ lazy_static::lazy_static! {
pub static ref MESSAGES_IN_POSTGRES_CHANNEL: GenericGauge<prometheus::core::AtomicI64> = register_int_gauge!(opts!("literpc_messages_in_postgres", "Number of messages in postgres")).unwrap(); pub static ref MESSAGES_IN_POSTGRES_CHANNEL: GenericGauge<prometheus::core::AtomicI64> = register_int_gauge!(opts!("literpc_messages_in_postgres", "Number of messages in postgres")).unwrap();
} }
const MAX_BATCH_SIZE: usize = 15;
#[derive(Debug)] #[derive(Debug)]
pub struct PostgresTx { pub struct PostgresTx {
pub signature: String, pub signature: String,
@ -70,10 +72,6 @@ pub type PostgresMpscSend = UnboundedSender<PostgresMsg>;
pub struct PostgresSession { pub struct PostgresSession {
client: Client, client: Client,
insert_tx_statement: Statement,
insert_5_batch_tx_statement: Statement,
insert_block_statement: Statement,
insert_5_batch_block_statement: Statement,
update_tx_statement: Statement, update_tx_statement: Statement,
} }
@ -110,54 +108,6 @@ impl PostgresSession {
}; };
}); });
let insert_block_statement = client
.prepare(
r#"
INSERT INTO lite_rpc.Blocks
(slot, leader_id, parent_slot)
VALUES
($1, $2, $3)
"#,
)
.await?;
let insert_5_batch_block_statement = client
.prepare(
r#"
INSERT INTO lite_rpc.Blocks
(slot, leader_id, parent_slot)
VALUES
($1, $2, $3),
($4, $5, $6),
($7, $8, $9),
($10, $11, $12),
($13, $14, $15)
"#,
)
.await?;
let insert_tx_statement = client.prepare(
r#"
INSERT INTO lite_rpc.Txs
(signature, recent_slot, forwarded_slot, processed_slot, cu_consumed, cu_requested, quic_response)
VALUES
($1, $2, $3, $4, $5, $6, $7)
"#,
).await?;
let insert_5_batch_tx_statement = client.prepare(
r#"
INSERT INTO lite_rpc.Txs
(signature, recent_slot, forwarded_slot, processed_slot, cu_consumed, cu_requested, quic_response)
VALUES
($1, $2, $3, $4, $5, $6, $7),
($8, $9, $10, $11, $12, $13, $14),
($15, $16, $17, $18, $19, $20, $21),
($22 ,$23, $24, $25, $26, $27, $28),
($29 ,$30, $31, $32, $33, $34, $35)
"#,
).await?;
let update_tx_statement = client let update_tx_statement = client
.prepare( .prepare(
r#" r#"
@ -170,33 +120,31 @@ impl PostgresSession {
Ok(Self { Ok(Self {
client, client,
insert_tx_statement,
insert_5_batch_tx_statement,
insert_block_statement,
insert_5_batch_block_statement,
update_tx_statement, update_tx_statement,
}) })
} }
pub async fn send_block(&self, block: PostgresBlock) -> anyhow::Result<()> { pub fn multiline_query(query: &mut String, args: usize, rows: usize) {
let PostgresBlock { let mut arg_index = 1usize;
slot, for _ in 0..rows {
leader_id, query.push('(');
parent_slot,
} = block;
self.client for i in 0..args {
.execute( query.push_str(&format!("${arg_index}"));
&self.insert_block_statement, arg_index += 1;
&[&slot, &leader_id, &parent_slot], if i != (args - 1) {
) query.push(',');
.await?; }
Ok(())
} }
pub async fn send_tx_5_batch(&self, txs: &[PostgresTx]) -> anyhow::Result<()> { query.push(')');
let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(7 * 5); }
}
pub async fn send_txs(&self, txs: &[PostgresTx]) -> anyhow::Result<()> {
const NUMBER_OF_ARGS: usize = 7;
let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS * txs.len());
for tx in txs.iter() { for tx in txs.iter() {
let PostgresTx { let PostgresTx {
@ -218,15 +166,25 @@ impl PostgresSession {
args.push(quic_response); args.push(quic_response);
} }
self.client let mut query = String::from(
.execute(&self.insert_5_batch_tx_statement, &args) r#"
.await?; INSERT INTO lite_rpc.Txs
(signature, recent_slot, forwarded_slot, processed_slot, cu_consumed, cu_requested, quic_response)
VALUES
"#,
);
Self::multiline_query(&mut query, NUMBER_OF_ARGS, txs.len());
self.client.execute(&query, &args).await?;
Ok(()) Ok(())
} }
pub async fn send_block_5_batch(&self, blocks: &[PostgresBlock]) -> anyhow::Result<()> { pub async fn send_blocks(&self, blocks: &[PostgresBlock]) -> anyhow::Result<()> {
let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(7 * 5); const NUMBER_OF_ARGS: usize = 3;
let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS * blocks.len());
for block in blocks.iter() { for block in blocks.iter() {
let PostgresBlock { let PostgresBlock {
@ -240,38 +198,17 @@ impl PostgresSession {
args.push(parent_slot); args.push(parent_slot);
} }
self.client let mut query = String::from(
.execute(&self.insert_5_batch_block_statement, &args) r#"
.await?; INSERT INTO lite_rpc.Blocks
(slot, leader_id, parent_slot)
VALUES
"#,
);
Ok(()) Self::multiline_query(&mut query, NUMBER_OF_ARGS, blocks.len());
}
pub async fn send_tx(&self, tx: PostgresTx) -> anyhow::Result<()> { self.client.execute(&query, &args).await?;
let PostgresTx {
signature,
recent_slot,
forwarded_slot,
processed_slot,
cu_consumed,
cu_requested,
quic_response,
} = tx;
self.client
.execute(
&self.insert_tx_statement,
&[
&signature,
&recent_slot,
&forwarded_slot,
&processed_slot,
&cu_consumed,
&cu_requested,
&quic_response,
],
)
.await?;
Ok(()) Ok(())
} }
@ -356,20 +293,22 @@ impl Postgres {
} }
{ {
let mut batcher = Batcher::new(&mut tx_que, 5, BatcherStrategy::Start); let mut batcher =
Batcher::new(&mut tx_que, MAX_BATCH_SIZE, BatcherStrategy::Start);
while let Some(txs) = batcher.next_batch() { while let Some(txs) = batcher.next_batch() {
if let Err(err) = session.send_tx_5_batch(txs).await { if let Err(err) = session.send_txs(txs).await {
warn!("Error sending tx batch to postgres {err:?}"); warn!("Error sending tx batch to postgres {err:?}");
} }
} }
} }
{ {
let mut batcher = Batcher::new(&mut block_que, 5, BatcherStrategy::Start); let mut batcher =
Batcher::new(&mut block_que, MAX_BATCH_SIZE, BatcherStrategy::Start);
while let Some(txs) = batcher.next_batch() { while let Some(txs) = batcher.next_batch() {
if let Err(err) = session.send_block_5_batch(txs).await { if let Err(err) = session.send_blocks(txs).await {
warn!("Error sending block batch to postgres {err:?}"); warn!("Error sending block batch to postgres {err:?}");
} }
} }
@ -378,3 +317,11 @@ impl Postgres {
}) })
} }
} }
#[test]
fn multiline_query_test() {
let mut query = String::new();
PostgresSession::multiline_query(&mut query, 3, 2);
assert_eq!(query, "($1,$2,$3)($4,$5,$6)");
}