Merge remote-tracking branch 'origin/main' into improve/simplify-insertinto-queries
This commit is contained in:
commit
d1bc9935bc
18
src/main.rs
18
src/main.rs
|
@ -20,6 +20,7 @@ use dashmap::DashMap;
|
|||
use futures::StreamExt;
|
||||
use log::{debug, error, info};
|
||||
use prometheus::{opts, register_int_counter, register_int_gauge, IntCounter, IntGauge};
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use transaction_info::TransactionInfo;
|
||||
|
||||
mod alt_store;
|
||||
|
@ -146,7 +147,7 @@ async fn start_tracking_blocks(
|
|||
rpc_client: Arc<RpcClient>,
|
||||
grpc_block_addr: String,
|
||||
grpc_x_token: Option<String>,
|
||||
postgres: postgres::Postgres,
|
||||
block_sender_postgres: Sender<BlockInfo>,
|
||||
slot: Arc<AtomicU64>,
|
||||
alts_list: Vec<Pubkey>,
|
||||
) {
|
||||
|
@ -248,7 +249,7 @@ async fn start_tracking_blocks(
|
|||
BLOCK_TXS.set(block.transactions.len() as i64);
|
||||
BANKING_STAGE_BLOCKS_COUNTER.inc();
|
||||
BANKING_STAGE_BLOCKS_TASK.inc();
|
||||
let postgres = postgres.clone();
|
||||
let block_sender_postgres = block_sender_postgres.clone();
|
||||
let slot = slot.clone();
|
||||
let atl_store = atl_store.clone();
|
||||
tokio::spawn(async move {
|
||||
|
@ -258,7 +259,7 @@ async fn start_tracking_blocks(
|
|||
TXERROR_COUNT.add(
|
||||
block_info.processed_transactions - block_info.successful_transactions,
|
||||
);
|
||||
if let Err(e) = postgres.save_block_info(block_info).await {
|
||||
if let Err(e) = postgres::send_block_info_to_buffer(block_sender_postgres, block_info).await {
|
||||
panic!("Error saving block {}", e);
|
||||
}
|
||||
slot.store(block.slot, std::sync::atomic::Ordering::Relaxed);
|
||||
|
@ -295,7 +296,8 @@ async fn main() -> anyhow::Result<()> {
|
|||
let grpc_block_addr = args.grpc_address_to_fetch_blocks;
|
||||
let map_of_infos = Arc::new(DashMap::<(String, u64), TransactionInfo>::new());
|
||||
|
||||
let postgres = postgres::Postgres::new().await;
|
||||
let postgres1 = postgres::Postgres::new_with_workmem().await;
|
||||
let postgres2 = postgres::Postgres::new_with_workmem().await;
|
||||
let slot = Arc::new(AtomicU64::new(0));
|
||||
let no_block_subscription = grpc_block_addr.is_none();
|
||||
let alts = args.alts;
|
||||
|
@ -311,7 +313,11 @@ async fn main() -> anyhow::Result<()> {
|
|||
.map(|x| Pubkey::from_str(&x).unwrap())
|
||||
.collect_vec();
|
||||
|
||||
postgres.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone());
|
||||
|
||||
let block_sender = postgres1.spawn_block_saver();
|
||||
|
||||
postgres2.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone());
|
||||
|
||||
let jhs = args
|
||||
.banking_grpc_addresses
|
||||
.iter()
|
||||
|
@ -335,7 +341,7 @@ async fn main() -> anyhow::Result<()> {
|
|||
rpc_client,
|
||||
gprc_block_addr,
|
||||
args.grpc_x_token,
|
||||
postgres,
|
||||
block_sender,
|
||||
slot,
|
||||
alts_list,
|
||||
)
|
||||
|
|
214
src/postgres.rs
214
src/postgres.rs
|
@ -1,4 +1,3 @@
|
|||
use std::time::Instant;
|
||||
use std::{
|
||||
sync::{atomic::AtomicU64, Arc},
|
||||
time::Duration,
|
||||
|
@ -14,6 +13,9 @@ use native_tls::{Certificate, Identity, TlsConnector};
|
|||
use postgres_native_tls::MakeTlsConnector;
|
||||
use serde::Serialize;
|
||||
use solana_sdk::transaction::TransactionError;
|
||||
use tokio::sync::mpsc::error::SendTimeoutError;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::time::Instant;
|
||||
use tokio_postgres::{
|
||||
binary_copy::BinaryCopyInWriter,
|
||||
config::SslMode,
|
||||
|
@ -27,6 +29,7 @@ use crate::{
|
|||
transaction_info::TransactionInfo,
|
||||
};
|
||||
|
||||
const BLOCK_WRITE_BUFFER_SIZE: usize = 5;
|
||||
const LIMIT_LATEST_TXS_PER_ACCOUNT: i64 = 1000;
|
||||
|
||||
pub struct TempTableTracker {
|
||||
|
@ -120,6 +123,20 @@ impl PostgresSession {
|
|||
Ok(client)
|
||||
}
|
||||
|
||||
pub async fn configure_work_mem(&self) {
|
||||
self.client
|
||||
.execute("SET work_mem TO '256MB'", &[])
|
||||
.await
|
||||
.unwrap();
|
||||
let work_mem: String = self
|
||||
.client
|
||||
.query_one("show work_mem", &[])
|
||||
.await
|
||||
.unwrap()
|
||||
.get("work_mem");
|
||||
info!("Configured work_mem={}", work_mem);
|
||||
}
|
||||
|
||||
pub async fn drop_temp_table(&self, table: String) -> anyhow::Result<()> {
|
||||
self.client
|
||||
.execute(format!("drop table if exists {};", table).as_str(), &[])
|
||||
|
@ -154,13 +171,19 @@ impl PostgresSession {
|
|||
"#,
|
||||
temp_table
|
||||
);
|
||||
let started_at = Instant::now();
|
||||
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
|
||||
let writer = BinaryCopyInWriter::new(sink, &[Type::TEXT]);
|
||||
pin_mut!(writer);
|
||||
for signature in signatures {
|
||||
writer.as_mut().write(&[&signature]).await?;
|
||||
}
|
||||
writer.finish().await?;
|
||||
let num_rows = writer.finish().await?;
|
||||
debug!(
|
||||
"inserted {} signatures into temp table in {}ms",
|
||||
num_rows,
|
||||
started_at.elapsed().as_millis()
|
||||
);
|
||||
|
||||
let statement = format!(
|
||||
r#"
|
||||
|
@ -169,7 +192,14 @@ impl PostgresSession {
|
|||
"#,
|
||||
temp_table
|
||||
);
|
||||
self.client.execute(statement.as_str(), &[]).await?;
|
||||
let started_at = Instant::now();
|
||||
let num_rows = self.client.execute(statement.as_str(), &[]).await?;
|
||||
debug!(
|
||||
"inserted {} signatures in transactions table in {}ms",
|
||||
num_rows,
|
||||
started_at.elapsed().as_millis()
|
||||
);
|
||||
|
||||
|
||||
self.drop_temp_table(temp_table).await?;
|
||||
Ok(())
|
||||
|
@ -203,13 +233,19 @@ impl PostgresSession {
|
|||
"#,
|
||||
temp_table
|
||||
);
|
||||
let started_at = Instant::now();
|
||||
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
|
||||
let writer = BinaryCopyInWriter::new(sink, &[Type::TEXT]);
|
||||
pin_mut!(writer);
|
||||
for account in accounts {
|
||||
writer.as_mut().write(&[&account]).await?;
|
||||
}
|
||||
writer.finish().await?;
|
||||
let num_rows = writer.finish().await?;
|
||||
debug!(
|
||||
"inserted {} account keys into temp table in {}ms",
|
||||
num_rows,
|
||||
started_at.elapsed().as_millis()
|
||||
);
|
||||
|
||||
let statement = format!(
|
||||
r#"
|
||||
|
@ -218,7 +254,14 @@ impl PostgresSession {
|
|||
"#,
|
||||
temp_table
|
||||
);
|
||||
let started_at = Instant::now();
|
||||
self.client.execute(statement.as_str(), &[]).await?;
|
||||
debug!(
|
||||
"inserted {} account keys into accounts table in {}ms",
|
||||
num_rows,
|
||||
started_at.elapsed().as_millis()
|
||||
);
|
||||
|
||||
self.drop_temp_table(temp_table).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
@ -254,6 +297,7 @@ impl PostgresSession {
|
|||
"#,
|
||||
temp_table
|
||||
);
|
||||
let started_at = Instant::now();
|
||||
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
|
||||
let writer = BinaryCopyInWriter::new(
|
||||
sink,
|
||||
|
@ -282,7 +326,12 @@ impl PostgresSession {
|
|||
writer.as_mut().write(&args).await?;
|
||||
}
|
||||
}
|
||||
writer.finish().await?;
|
||||
let num_rows = writer.finish().await?;
|
||||
debug!(
|
||||
"inserted {} txs for tx_slot into temp table in {}ms",
|
||||
num_rows,
|
||||
started_at.elapsed().as_millis()
|
||||
);
|
||||
|
||||
let statement = format!(
|
||||
r#"
|
||||
|
@ -295,7 +344,15 @@ impl PostgresSession {
|
|||
"#,
|
||||
temp_table
|
||||
);
|
||||
let started_at = Instant::now();
|
||||
self.client.execute(statement.as_str(), &[]).await?;
|
||||
debug!(
|
||||
"inserted {} txs into transaction_slot table in {}ms",
|
||||
num_rows,
|
||||
started_at.elapsed().as_millis()
|
||||
);
|
||||
|
||||
|
||||
self.drop_temp_table(temp_table).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
@ -331,7 +388,7 @@ impl PostgresSession {
|
|||
"#,
|
||||
temp_table
|
||||
);
|
||||
|
||||
let started_at = Instant::now();
|
||||
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
|
||||
let writer = BinaryCopyInWriter::new(
|
||||
sink,
|
||||
|
@ -349,7 +406,12 @@ impl PostgresSession {
|
|||
writer.as_mut().write(&args).await?;
|
||||
}
|
||||
}
|
||||
writer.finish().await?;
|
||||
let num_rows = writer.finish().await?;
|
||||
debug!(
|
||||
"inserted {} accounts for transaction into temp table in {}ms",
|
||||
num_rows,
|
||||
started_at.elapsed().as_millis()
|
||||
);
|
||||
|
||||
// merge data from temp table into accounts_map_transaction
|
||||
let statement = format!(
|
||||
|
@ -366,8 +428,13 @@ impl PostgresSession {
|
|||
"#,
|
||||
temp_table
|
||||
);
|
||||
let started_at = Instant::now();
|
||||
let rows = self.client.execute(statement.as_str(), &[]).await?;
|
||||
debug!("inserted into accounts_map_transaction: {}", rows);
|
||||
debug!(
|
||||
"inserted {} accounts into accounts_map_transaction in {}ms",
|
||||
rows,
|
||||
started_at.elapsed().as_millis()
|
||||
);
|
||||
|
||||
// merge data from temp table into accounts_map_transaction_latest
|
||||
// note: query uses the array_dedup_append postgres function to deduplicate and limit the array size
|
||||
|
@ -396,8 +463,13 @@ impl PostgresSession {
|
|||
temp_table_name = temp_table_latest_agged,
|
||||
limit = LIMIT_LATEST_TXS_PER_ACCOUNT
|
||||
);
|
||||
let rows = self.client.execute(statement.as_str(), &[]).await?;
|
||||
info!("inserted into {}: {}", temp_table_latest_agged, rows);
|
||||
let started_at = Instant::now();
|
||||
let num_rows = self.client.execute(statement.as_str(), &[]).await?;
|
||||
debug!(
|
||||
"merged new transactions into accounts_map_transaction_latest for {} accounts in {}ms",
|
||||
num_rows,
|
||||
started_at.elapsed().as_millis()
|
||||
);
|
||||
|
||||
let statement = format!(
|
||||
r#"
|
||||
|
@ -407,8 +479,13 @@ impl PostgresSession {
|
|||
"#,
|
||||
temp_table_name = temp_table_latest_agged
|
||||
);
|
||||
let rows = self.client.execute(statement.as_str(), &[]).await?;
|
||||
info!("upserted in accounts_map_transaction_latest: {}", rows);
|
||||
let started_at = Instant::now();
|
||||
let num_rows = self.client.execute(statement.as_str(), &[]).await?;
|
||||
debug!(
|
||||
"upserted {} merged transaction arrays into accounts_map_transaction_latest in {}ms",
|
||||
num_rows,
|
||||
started_at.elapsed().as_millis()
|
||||
);
|
||||
|
||||
self.drop_temp_table(temp_table_latest_agged).await?;
|
||||
self.drop_temp_table(temp_table).await?;
|
||||
|
@ -454,7 +531,7 @@ impl PostgresSession {
|
|||
"#,
|
||||
temp_table
|
||||
);
|
||||
|
||||
let started_at = Instant::now();
|
||||
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
|
||||
let writer = BinaryCopyInWriter::new(
|
||||
sink,
|
||||
|
@ -480,7 +557,12 @@ impl PostgresSession {
|
|||
args.push(&transaction.supp_infos);
|
||||
writer.as_mut().write(&args).await?;
|
||||
}
|
||||
writer.finish().await?;
|
||||
let num_rows = writer.finish().await?;
|
||||
debug!(
|
||||
"inserted {} transactions for block into temp table in {}ms",
|
||||
num_rows,
|
||||
started_at.elapsed().as_millis()
|
||||
);
|
||||
|
||||
let statement = format!(
|
||||
r#"
|
||||
|
@ -499,7 +581,13 @@ impl PostgresSession {
|
|||
"#,
|
||||
temp_table
|
||||
);
|
||||
self.client.execute(statement.as_str(), &[]).await?;
|
||||
let started_at = Instant::now();
|
||||
let num_rows = self.client.execute(statement.as_str(), &[]).await?;
|
||||
debug!(
|
||||
"inserted {} transactions for block into transaction_infos table in {}ms",
|
||||
num_rows,
|
||||
started_at.elapsed().as_millis()
|
||||
);
|
||||
|
||||
self.drop_temp_table(temp_table).await?;
|
||||
Ok(())
|
||||
|
@ -538,7 +626,7 @@ impl PostgresSession {
|
|||
"#,
|
||||
temp_table
|
||||
);
|
||||
|
||||
let started_at = Instant::now();
|
||||
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
|
||||
let writer = BinaryCopyInWriter::new(
|
||||
sink,
|
||||
|
@ -564,7 +652,12 @@ impl PostgresSession {
|
|||
args.push(&pf_json);
|
||||
writer.as_mut().write(&args).await?;
|
||||
}
|
||||
writer.finish().await?;
|
||||
let num_rows = writer.finish().await?;
|
||||
debug!(
|
||||
"inserted {} heavily_locked_accounts into temp table in {}ms",
|
||||
num_rows,
|
||||
started_at.elapsed().as_millis()
|
||||
);
|
||||
|
||||
let statement = format!(
|
||||
r#"
|
||||
|
@ -602,7 +695,13 @@ impl PostgresSession {
|
|||
"#,
|
||||
temp_table
|
||||
);
|
||||
self.client.execute(statement.as_str(), &[]).await?;
|
||||
let started_at = Instant::now();
|
||||
let num_rows = self.client.execute(statement.as_str(), &[]).await?;
|
||||
debug!(
|
||||
"inserted {} heavily_locked_accounts into accounts_map_blocks table in {}ms",
|
||||
num_rows,
|
||||
started_at.elapsed().as_millis()
|
||||
);
|
||||
|
||||
self.drop_temp_table(temp_table).await?;
|
||||
Ok(())
|
||||
|
@ -622,6 +721,7 @@ impl PostgresSession {
|
|||
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||
ON CONFLICT DO NOTHING
|
||||
"#;
|
||||
let started_at = Instant::now();
|
||||
let num_rows = self.client
|
||||
.execute(
|
||||
statement,
|
||||
|
@ -637,6 +737,12 @@ impl PostgresSession {
|
|||
],
|
||||
)
|
||||
.await?;
|
||||
debug!(
|
||||
"inserted {} block info into blocks table in {}ms",
|
||||
num_rows,
|
||||
started_at.elapsed().as_millis()
|
||||
);
|
||||
|
||||
if num_rows == 0 {
|
||||
warn!("block_info already exists in blocks table - skipping insert");
|
||||
}
|
||||
|
@ -739,7 +845,7 @@ impl PostgresSession {
|
|||
// save account usage in blocks
|
||||
self.save_account_usage_in_block(&block_info).await?;
|
||||
self.save_block_info(&block_info).await?;
|
||||
info!("block saved");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
@ -761,17 +867,7 @@ impl PostgresSession {
|
|||
slots_to_keep
|
||||
);
|
||||
|
||||
self.client
|
||||
.execute("SET work_mem TO '256MB'", &[])
|
||||
.await
|
||||
.unwrap();
|
||||
let work_mem: String = self
|
||||
.client
|
||||
.query_one("show work_mem", &[])
|
||||
.await
|
||||
.unwrap()
|
||||
.get("work_mem");
|
||||
info!("Configured work_mem={}", work_mem);
|
||||
self.configure_work_mem().await;
|
||||
|
||||
{
|
||||
info!("Rows before cleanup:");
|
||||
|
@ -1069,13 +1165,47 @@ pub struct Postgres {
|
|||
}
|
||||
|
||||
impl Postgres {
|
||||
pub async fn new() -> Self {
|
||||
pub async fn new_with_workmem() -> Self {
|
||||
let session = PostgresSession::new().await.unwrap();
|
||||
let session = Arc::new(session);
|
||||
session.configure_work_mem().await;
|
||||
Self {
|
||||
session: Arc::new(session),
|
||||
session
|
||||
}
|
||||
}
|
||||
|
||||
pub fn spawn_block_saver(&self) -> Sender<BlockInfo> {
|
||||
let (block_sender, mut block_receiver) = tokio::sync::mpsc::channel::<BlockInfo>(BLOCK_WRITE_BUFFER_SIZE);
|
||||
|
||||
let session = self.session.clone();
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
match block_receiver.recv().await {
|
||||
None => {
|
||||
warn!("block_receiver closed - stopping thread");
|
||||
return;
|
||||
}
|
||||
|
||||
Some(block) => {
|
||||
let slot = block.slot;
|
||||
info!("saving block {} ..", slot);
|
||||
match session.save_block(block).await {
|
||||
Ok(_) => {
|
||||
info!("saving block {} done", slot);
|
||||
}
|
||||
Err(err) => {
|
||||
error!("saving block failed {}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
|
||||
return block_sender;
|
||||
}
|
||||
|
||||
|
||||
pub fn spawn_transaction_infos_saver(
|
||||
&self,
|
||||
map_of_transaction: Arc<DashMap<(String, u64), TransactionInfo>>,
|
||||
|
@ -1108,9 +1238,6 @@ impl Postgres {
|
|||
});
|
||||
}
|
||||
|
||||
pub async fn save_block_info(&self, block: BlockInfo) -> anyhow::Result<()> {
|
||||
self.session.save_block(block).await
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Clone)]
|
||||
|
@ -1130,3 +1257,20 @@ pub struct AccountsForTransaction {
|
|||
pub signature: String,
|
||||
pub accounts: Vec<AccountUsed>,
|
||||
}
|
||||
|
||||
|
||||
pub async fn send_block_info_to_buffer(block_sender_postgres: Sender<BlockInfo>, block_info: BlockInfo) -> anyhow::Result<()> {
|
||||
debug!("block buffer capacity: {}", block_sender_postgres.capacity());
|
||||
|
||||
const WARNING_THRESHOLD: Duration = Duration::from_millis(3000);
|
||||
|
||||
let started_at = Instant::now();
|
||||
if let Err(SendTimeoutError::Timeout(block)) = block_sender_postgres.send_timeout(block_info, WARNING_THRESHOLD).await {
|
||||
let slot = block.slot;
|
||||
warn!("Block {} was not buffered for {:.3}s - continue waiting", slot, WARNING_THRESHOLD.as_secs_f32());
|
||||
block_sender_postgres.send(block).await?;
|
||||
info!("Block {} was finally buffered after {:.3}s", slot, started_at.elapsed().as_secs_f32());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue