Pipeline SQL inserts for better performance

Previously, insert speed was primarily limited by the round-trip time to the
database, because inserts were issued strictly sequentially on each
connection. Now a full batch of inserts is issued in parallel on each
connection.

There is still plenty of room for improvement: currently each connection
waits for every insert in its current batch to finish before starting the
next batch.
Christian Kamm 2022-01-04 10:15:22 +01:00
parent 3f242aa3b6
commit 5b5eaba4ff
5 changed files with 47 additions and 15 deletions
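
The core idea, as a minimal standalone sketch assuming tokio_postgres (which pipelines queries that are awaited concurrently on a single connection). The insert_batch helper and the table/column names are illustrative only, not part of this commit:

use futures::future::try_join_all;
use tokio_postgres::types::ToSql;

async fn insert_batch(
    client: &tokio_postgres::Client,
    rows: &[(i64, String)],
) -> Result<(), tokio_postgres::Error> {
    let stmt = client
        .prepare("INSERT INTO account_write (slot, pubkey) VALUES ($1, $2)")
        .await?;
    // Issue every INSERT before awaiting completion: queries that are in
    // flight at the same time get pipelined on the connection, so the batch
    // costs roughly one round trip instead of one per row.
    try_join_all(rows.iter().map(|(slot, pubkey)| {
        let stmt = stmt.clone();
        async move {
            client
                .execute(&stmt, &[slot as &(dyn ToSql + Sync), pubkey])
                .await
        }
    }))
    .await?;
    Ok(())
}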

View File

@ -18,6 +18,7 @@ program_id = "mv3ekLzLbnVPNxjSKvqBpU3ZeZXPQdEC3bp5MDEBG68"
[postgres_target]
connection_string = "host=/var/run/postgresql"
account_write_connection_count = 4
account_write_max_batch_size = 10
slot_update_connection_count = 4
retry_query_max_count = 3
retry_query_sleep_secs = 5

View File

@ -18,6 +18,7 @@ program_id = ""
[postgres_target]
connection_string = "host=/var/run/postgresql"
account_write_connection_count = 4
account_write_max_batch_size = 10
slot_update_connection_count = 2
retry_query_max_count = 3
retry_query_sleep_secs = 5

View File

@@ -70,6 +70,8 @@ pub struct PostgresConfig {
     pub connection_string: String,
     /// Number of parallel postgres connections used for account write insertions
     pub account_write_connection_count: u64,
+    /// Maximum batch size for account write inserts over one connection
+    pub account_write_max_batch_size: usize,
     /// Number of parallel postgres connections used for slot insertions
     pub slot_update_connection_count: u64,
     /// Number of queries retries before fatal error
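
For context, a hedged sketch of how the new key travels from the TOML files above into this struct, assuming the usual serde + toml loading; the Root wrapper, the parse helper, and the field types not shown in this hunk are assumptions:

use serde::Deserialize;

#[derive(Clone, Debug, Deserialize)]
pub struct PostgresConfig {
    pub connection_string: String,
    pub account_write_connection_count: u64,
    pub account_write_max_batch_size: usize,
    pub slot_update_connection_count: u64,
    pub retry_query_max_count: u64,
    pub retry_query_sleep_secs: u64,
}

// Illustrative loader: pulls the [postgres_target] section shown above.
fn parse_postgres_config(toml_str: &str) -> anyhow::Result<PostgresConfig> {
    #[derive(Deserialize)]
    struct Root {
        postgres_target: PostgresConfig,
    }
    let root: Root = toml::from_str(toml_str)?;
    Ok(root.postgres_target)
}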

View File

@@ -28,6 +28,10 @@ impl MetricU64 {
         self.value.store(value, atomic::Ordering::Release);
     }

+    pub fn add(&mut self, value: u64) {
+        self.value.fetch_add(value, atomic::Ordering::AcqRel);
+    }
+
     pub fn increment(&mut self) {
         self.value.fetch_add(1, atomic::Ordering::AcqRel);
     }
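
A self-contained sketch of the pattern behind MetricU64, with set and increment mirroring the diff and the new add alongside them. The Arc<AtomicU64> layout is an assumption, since the struct definition is not part of this hunk:

use std::sync::{atomic, Arc};

#[derive(Clone)]
pub struct MetricU64 {
    // Shared atomic counter: clones of the metric update the same value
    // without locking.
    value: Arc<atomic::AtomicU64>,
}

impl MetricU64 {
    pub fn set(&mut self, value: u64) {
        self.value.store(value, atomic::Ordering::Release);
    }
    pub fn add(&mut self, value: u64) {
        self.value.fetch_add(value, atomic::Ordering::AcqRel);
    }
    pub fn increment(&mut self) {
        self.value.fetch_add(1, atomic::Ordering::AcqRel);
    }
}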

View File

@@ -87,11 +87,12 @@ async fn process_account_write(
     write: &AccountWrite,
     account_tables: &AccountTables,
 ) -> anyhow::Result<()> {
-    for account_table in account_tables {
-        // TODO: Could run all these in parallel instead of sequentially
-        let _ = account_table.insert_account_write(client, write).await?;
-    }
+    futures::future::try_join_all(
+        account_tables
+            .iter()
+            .map(|table| table.insert_account_write(client, write)),
+    )
+    .await?;
     Ok(())
 }
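
Replacing the sequential loop with futures::future::try_join_all runs the per-table inserts concurrently, and the first error short-circuits the remaining futures. A small self-contained illustration of those semantics (the check function and its failure rule are made up):

use futures::future::try_join_all;

async fn check(n: u32) -> anyhow::Result<u32> {
    if n == 3 {
        anyhow::bail!("item {} failed", n);
    }
    Ok(n * 2)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // All futures succeed: results come back in input order.
    let doubled = try_join_all((0..3).map(check)).await?;
    assert_eq!(doubled, vec![0, 2, 4]);

    // One future fails: the whole call resolves to that first error.
    assert!(try_join_all((0..5).map(check)).await.is_err());
    Ok(())
}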
@@ -343,13 +344,28 @@ pub async fn init(
     tokio::spawn(async move {
         let mut client_opt = None;
         loop {
-            let write = account_write_queue_receiver_c
-                .recv()
-                .await
-                .expect("sender must stay alive");
+            // Retrieve up to batch_size account writes
+            let mut write_batch = Vec::new();
+            write_batch.push(
+                account_write_queue_receiver_c
+                    .recv()
+                    .await
+                    .expect("sender must stay alive"),
+            );
+            while write_batch.len() < config.account_write_max_batch_size {
+                match account_write_queue_receiver_c.try_recv() {
+                    Ok(write) => write_batch.push(write),
+                    Err(async_channel::TryRecvError::Empty) => break,
+                    Err(async_channel::TryRecvError::Closed) => {
+                        panic!("sender must stay alive")
+                    }
+                };
+            }
+
             trace!(
-                "account write, channel size {}",
-                account_write_queue_receiver_c.len()
+                "account write, batch {}, channel size {}",
+                write_batch.len(),
+                account_write_queue_receiver_c.len(),
             );

             let mut error_count = 0;
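
The batch assembly above, isolated into a sketch: block on recv so an empty queue parks the task, then drain whatever is already buffered via try_recv, up to the configured maximum. The recv_batch helper name is mine, not the commit's:

async fn recv_batch<T>(
    receiver: &async_channel::Receiver<T>,
    max_batch_size: usize,
) -> Vec<T> {
    let mut batch = Vec::new();
    // Await the first item so the task sleeps while the queue is empty.
    batch.push(receiver.recv().await.expect("sender must stay alive"));
    // Then take only what is already queued; never wait for a full batch.
    while batch.len() < max_batch_size {
        match receiver.try_recv() {
            Ok(item) => batch.push(item),
            Err(async_channel::TryRecvError::Empty) => break,
            Err(async_channel::TryRecvError::Closed) => {
                panic!("sender must stay alive")
            }
        }
    }
    batch
}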
@@ -357,12 +373,20 @@ pub async fn init(
                 let client =
                     update_postgres_client(&mut client_opt, &postgres_account_writes, &config)
                         .await;
-                if let Err(err) = process_account_write(client, &write, &account_tables_c).await
-                {
-                    metric_retries.increment();
+                let mut results = futures::future::join_all(
+                    write_batch
+                        .iter()
+                        .map(|write| process_account_write(client, &write, &account_tables_c)),
+                )
+                .await;
+                let mut iter = results.iter();
+                write_batch.retain(|_| iter.next().unwrap().is_err());
+                if write_batch.len() > 0 {
+                    metric_retries.add(write_batch.len() as u64);
                     error_count += 1;
                     if error_count - 1 < config.retry_query_max_count {
-                        warn!("failed to process account write, retrying: {:?}", err);
+                        results.retain(|r| r.is_err());
+                        warn!("failed to process account write, retrying: {:?}", results);
                         tokio::time::sleep(Duration::from_secs(config.retry_query_sleep_secs))
                             .await;
                         continue;
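
The retry bookkeeping above hinges on join_all returning results in input order and Vec::retain visiting elements in order, so the nth result can be paired with the nth write. A self-contained sketch of that pairing (the process function and its failure rule are made up):

use futures::future::join_all;

async fn process(item: &u32) -> anyhow::Result<()> {
    if *item % 2 == 1 {
        anyhow::bail!("odd item {} failed", item);
    }
    Ok(())
}

#[tokio::main]
async fn main() {
    let mut batch: Vec<u32> = (0..6).collect();
    let mut attempts = 0;
    while !batch.is_empty() && attempts < 3 {
        // One attempt per item; join_all preserves input order.
        let results = join_all(batch.iter().map(process)).await;
        // Walk the results in lockstep with the batch: retain() visits items
        // in order, so the nth result belongs to the nth item, and only the
        // failed items survive into the next attempt.
        let mut iter = results.iter();
        batch.retain(|_| iter.next().unwrap().is_err());
        attempts += 1;
    }
    println!("still failing after {} attempts: {:?}", attempts, batch);
}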