diff --git a/src/workers/postgres.rs b/src/workers/postgres.rs index 11ac01ea..e0bfcbdc 100644 --- a/src/workers/postgres.rs +++ b/src/workers/postgres.rs @@ -26,6 +26,8 @@ lazy_static::lazy_static! { pub static ref MESSAGES_IN_POSTGRES_CHANNEL: GenericGauge = register_int_gauge!(opts!("literpc_messages_in_postgres", "Number of messages in postgres")).unwrap(); } +const MAX_BATCH_SIZE: usize = 15; + #[derive(Debug)] pub struct PostgresTx { pub signature: String, @@ -70,10 +72,6 @@ pub type PostgresMpscSend = UnboundedSender; pub struct PostgresSession { client: Client, - insert_tx_statement: Statement, - insert_5_batch_tx_statement: Statement, - insert_block_statement: Statement, - insert_5_batch_block_statement: Statement, update_tx_statement: Statement, } @@ -110,54 +108,6 @@ impl PostgresSession { }; }); - let insert_block_statement = client - .prepare( - r#" - INSERT INTO lite_rpc.Blocks - (slot, leader_id, parent_slot) - VALUES - ($1, $2, $3) - "#, - ) - .await?; - - let insert_5_batch_block_statement = client - .prepare( - r#" - INSERT INTO lite_rpc.Blocks - (slot, leader_id, parent_slot) - VALUES - ($1, $2, $3), - ($4, $5, $6), - ($7, $8, $9), - ($10, $11, $12), - ($13, $14, $15) - "#, - ) - .await?; - - let insert_tx_statement = client.prepare( - r#" - INSERT INTO lite_rpc.Txs - (signature, recent_slot, forwarded_slot, processed_slot, cu_consumed, cu_requested, quic_response) - VALUES - ($1, $2, $3, $4, $5, $6, $7) - "#, - ).await?; - - let insert_5_batch_tx_statement = client.prepare( - r#" - INSERT INTO lite_rpc.Txs - (signature, recent_slot, forwarded_slot, processed_slot, cu_consumed, cu_requested, quic_response) - VALUES - ($1, $2, $3, $4, $5, $6, $7), - ($8, $9, $10, $11, $12, $13, $14), - ($15, $16, $17, $18, $19, $20, $21), - ($22 ,$23, $24, $25, $26, $27, $28), - ($29 ,$30, $31, $32, $33, $34, $35) - "#, - ).await?; - let update_tx_statement = client .prepare( r#" @@ -170,33 +120,31 @@ impl PostgresSession { Ok(Self { client, - insert_tx_statement, - insert_5_batch_tx_statement, - insert_block_statement, - insert_5_batch_block_statement, update_tx_statement, }) } - pub async fn send_block(&self, block: PostgresBlock) -> anyhow::Result<()> { - let PostgresBlock { - slot, - leader_id, - parent_slot, - } = block; + pub fn multiline_query(query: &mut String, args: usize, rows: usize) { + let mut arg_index = 1usize; + for _ in 0..rows { + query.push('('); - self.client - .execute( - &self.insert_block_statement, - &[&slot, &leader_id, &parent_slot], - ) - .await?; + for i in 0..args { + query.push_str(&format!("${arg_index}")); + arg_index += 1; + if i != (args - 1) { + query.push(','); + } + } - Ok(()) + query.push(')'); + } } - pub async fn send_tx_5_batch(&self, txs: &[PostgresTx]) -> anyhow::Result<()> { - let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(7 * 5); + pub async fn send_txs(&self, txs: &[PostgresTx]) -> anyhow::Result<()> { + const NUMBER_OF_ARGS: usize = 7; + + let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS * txs.len()); for tx in txs.iter() { let PostgresTx { @@ -218,15 +166,25 @@ impl PostgresSession { args.push(quic_response); } - self.client - .execute(&self.insert_5_batch_tx_statement, &args) - .await?; + let mut query = String::from( + r#" + INSERT INTO lite_rpc.Txs + (signature, recent_slot, forwarded_slot, processed_slot, cu_consumed, cu_requested, quic_response) + VALUES + "#, + ); + + Self::multiline_query(&mut query, NUMBER_OF_ARGS, txs.len()); + + self.client.execute(&query, &args).await?; Ok(()) } - pub async fn send_block_5_batch(&self, blocks: &[PostgresBlock]) -> anyhow::Result<()> { - let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(7 * 5); + pub async fn send_blocks(&self, blocks: &[PostgresBlock]) -> anyhow::Result<()> { + const NUMBER_OF_ARGS: usize = 3; + + let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS * blocks.len()); for block in blocks.iter() { let PostgresBlock { @@ -240,38 +198,17 @@ impl PostgresSession { args.push(parent_slot); } - self.client - .execute(&self.insert_5_batch_block_statement, &args) - .await?; + let mut query = String::from( + r#" + INSERT INTO lite_rpc.Blocks + (slot, leader_id, parent_slot) + VALUES + "#, + ); - Ok(()) - } + Self::multiline_query(&mut query, NUMBER_OF_ARGS, blocks.len()); - pub async fn send_tx(&self, tx: PostgresTx) -> anyhow::Result<()> { - let PostgresTx { - signature, - recent_slot, - forwarded_slot, - processed_slot, - cu_consumed, - cu_requested, - quic_response, - } = tx; - - self.client - .execute( - &self.insert_tx_statement, - &[ - &signature, - &recent_slot, - &forwarded_slot, - &processed_slot, - &cu_consumed, - &cu_requested, - &quic_response, - ], - ) - .await?; + self.client.execute(&query, &args).await?; Ok(()) } @@ -356,20 +293,22 @@ impl Postgres { } { - let mut batcher = Batcher::new(&mut tx_que, 5, BatcherStrategy::Start); + let mut batcher = + Batcher::new(&mut tx_que, MAX_BATCH_SIZE, BatcherStrategy::Start); while let Some(txs) = batcher.next_batch() { - if let Err(err) = session.send_tx_5_batch(txs).await { + if let Err(err) = session.send_txs(txs).await { warn!("Error sending tx batch to postgres {err:?}"); } } } { - let mut batcher = Batcher::new(&mut block_que, 5, BatcherStrategy::Start); + let mut batcher = + Batcher::new(&mut block_que, MAX_BATCH_SIZE, BatcherStrategy::Start); while let Some(txs) = batcher.next_batch() { - if let Err(err) = session.send_block_5_batch(txs).await { + if let Err(err) = session.send_blocks(txs).await { warn!("Error sending block batch to postgres {err:?}"); } } @@ -378,3 +317,11 @@ impl Postgres { }) } } + +#[test] +fn multiline_query_test() { + let mut query = String::new(); + + PostgresSession::multiline_query(&mut query, 3, 2); + assert_eq!(query, "($1,$2,$3)($4,$5,$6)"); +}