Make more postgres settings configurable

This commit is contained in:
Christian Kamm 2021-11-08 11:26:01 +01:00
parent d9cb30c2b3
commit d366581826
11 changed files with 78 additions and 46 deletions

4
Cargo.lock generated
View File

@ -4330,11 +4330,11 @@ dependencies = [
"mango-common",
"postgres-types",
"postgres_query",
"serde_json",
"solana-accountsdb-connector-lib",
"solana-logger",
"tokio",
"tokio-postgres",
"toml",
]
[[package]]
@ -4367,10 +4367,10 @@ version = "0.1.0"
dependencies = [
"anyhow",
"log 0.4.14",
"serde_json",
"solana-accountsdb-connector-lib",
"solana-logger",
"tokio",
"toml",
]
[[package]]

View File

@ -9,7 +9,8 @@ solana-accountsdb-connector-lib = { path = "../lib" }
solana-logger = "=1.8.2"
log = "0.4"
anyhow = "1.0"
serde_json = "1.0.68"
toml = "0.5"
async-trait = "0.1"
fixed = { version = "=1.9.0", features = ["serde"] }
bs58 = "0.3.1"

View File

@ -1,8 +0,0 @@
{
"postgres_connection_string": "host=/var/run/postgresql",
"postgres_account_write_connections": 4,
"grpc_connection_string": "http://[::1]:10000",
"rpc_http_url": "",
"rpc_ws_url": "",
"program_id": "mv3ekLzLbnVPNxjSKvqBpU3ZeZXPQdEC3bp5MDEBG68"
}

View File

@ -0,0 +1,12 @@
grpc_connection_string = "http://[::1]:10000"
rpc_http_url = ""
rpc_ws_url = ""
program_id = "mv3ekLzLbnVPNxjSKvqBpU3ZeZXPQdEC3bp5MDEBG68"
[postgres_target]
connection_string = "host=/var/run/postgresql"
account_write_connection_count = 4
retry_query_max_count = 3
retry_query_sleep_secs = 5
retry_connection_sleep_secs = 30
fatal_connection_timeout_secs = 600

View File

@ -18,7 +18,7 @@ async fn main() -> Result<(), anyhow::Error> {
let mut file = File::open(&args[1])?;
let mut contents = String::new();
file.read_to_string(&mut contents)?;
serde_json::from_str(&contents).unwrap()
toml::from_str(&contents).unwrap()
};
solana_logger::setup_with_default("info");
@ -32,7 +32,7 @@ async fn main() -> Result<(), anyhow::Error> {
];
let (account_write_queue_sender, slot_queue_sender) =
postgres_target::init(&config, account_tables).await?;
postgres_target::init(&config.postgres_target, account_tables).await?;
info!("postgres done");
let use_accountsdb = true;

View File

@ -10,4 +10,4 @@ solana-logger = "=1.8.2"
log = "0.4"
tokio = { version = "1", features = ["full"] }
anyhow = "1.0"
serde_json = "1.0.68"
toml = "0.5"

View File

@ -1,8 +0,0 @@
{
"postgres_connection_string": "host=/var/run/postgresql",
"postgres_account_write_connections": 4,
"grpc_connection_string": "http://[::1]:10000",
"rpc_http_url": "",
"rpc_ws_url": "",
"program_id": ""
}

View File

@ -0,0 +1,12 @@
grpc_connection_string = "http://[::1]:10000"
rpc_http_url = ""
rpc_ws_url = ""
program_id = ""
[postgres_target]
connection_string = "host=/var/run/postgresql"
account_write_connection_count = 4
retry_query_max_count = 3
retry_query_sleep_secs = 5
retry_connection_sleep_secs = 30
fatal_connection_timeout_secs = 600

View File

@ -16,7 +16,7 @@ async fn main() -> Result<(), anyhow::Error> {
let mut file = File::open(&args[1])?;
let mut contents = String::new();
file.read_to_string(&mut contents)?;
serde_json::from_str(&contents).unwrap()
toml::from_str(&contents).unwrap()
};
solana_logger::setup_with_default("info");
@ -25,7 +25,7 @@ async fn main() -> Result<(), anyhow::Error> {
let account_tables: AccountTables = vec![Arc::new(RawAccountTable {})];
let (account_write_queue_sender, slot_queue_sender) =
postgres_target::init(&config, account_tables).await?;
postgres_target::init(&config.postgres_target, account_tables).await?;
info!("postgres done");
let use_accountsdb = true;

View File

@ -55,10 +55,24 @@ pub struct SlotUpdate {
pub status: String,
}
#[derive(Clone, Debug, Deserialize)]
pub struct PostgresConfig {
pub connection_string: String,
/// Number of parallel postgres connections used for account write insertions
pub account_write_connection_count: u64,
/// Number of queries retries before fatal error
pub retry_query_max_count: u64,
/// Seconds to sleep between query retries
pub retry_query_sleep_secs: u64,
/// Seconds to sleep between connection attempts
pub retry_connection_sleep_secs: u64,
/// Fatal error when the connection can't be reestablished this long
pub fatal_connection_timeout_secs: u64,
}
#[derive(Clone, Debug, Deserialize)]
pub struct Config {
pub postgres_connection_string: String,
pub postgres_account_write_connections: i32,
pub postgres_target: PostgresConfig,
pub grpc_connection_string: String,
pub rpc_http_url: String,
pub rpc_ws_url: String,

View File

@ -3,28 +3,32 @@ use log::*;
use postgres_query::{query, query_dyn};
use std::{collections::HashMap, time::Duration};
use crate::{AccountTables, AccountWrite, Config, SlotUpdate};
use crate::{AccountTables, AccountWrite, PostgresConfig, SlotUpdate};
async fn postgres_connection(
connection_string: &str,
config: &PostgresConfig,
) -> Result<async_channel::Receiver<Option<tokio_postgres::Client>>, anyhow::Error> {
let (tx, rx) = async_channel::unbounded();
let connection_string_c = connection_string.to_string();
let config = config.clone();
let mut initial =
Some(tokio_postgres::connect(&connection_string, tokio_postgres::NoTls).await?);
Some(tokio_postgres::connect(&config.connection_string, tokio_postgres::NoTls).await?);
tokio::spawn(async move {
loop {
let (client, connection) = match initial.take() {
Some(v) => v,
None => {
let result =
tokio_postgres::connect(&connection_string_c, tokio_postgres::NoTls).await;
tokio_postgres::connect(&config.connection_string, tokio_postgres::NoTls)
.await;
match result {
Ok(v) => v,
Err(err) => {
warn!("could not connect to postgres: {:?}", err);
tokio::time::sleep(Duration::from_secs(5)).await;
tokio::time::sleep(Duration::from_secs(
config.retry_connection_sleep_secs,
))
.await;
continue;
}
}
@ -35,7 +39,7 @@ async fn postgres_connection(
let result = connection.await;
tx.send(None).await.expect("send success");
warn!("postgres connection error: {:?}", result);
tokio::time::sleep(Duration::from_secs(5)).await;
tokio::time::sleep(Duration::from_secs(config.retry_connection_sleep_secs)).await;
}
});
@ -45,15 +49,15 @@ async fn postgres_connection(
async fn update_postgres_client<'a>(
client: &'a mut Option<postgres_query::Caching<tokio_postgres::Client>>,
rx: &async_channel::Receiver<Option<tokio_postgres::Client>>,
config: &PostgresConfig,
) -> &'a postgres_query::Caching<tokio_postgres::Client> {
// get the most recent client, waiting if there's a disconnect
while !rx.is_empty() || client.is_none() {
// TODO: timeout configurable
tokio::select! {
client_raw_opt = rx.recv() => {
*client = client_raw_opt.expect("not closed").map(|client| postgres_query::Caching::new(client));
},
_ = tokio::time::sleep(Duration::from_secs(360)) => {
_ = tokio::time::sleep(Duration::from_secs(config.fatal_connection_timeout_secs)) => {
error!("waited too long for new postgres client");
std::process::exit(1);
},
@ -216,7 +220,7 @@ impl SlotsProcessing {
}
pub async fn init(
config: &Config,
config: &PostgresConfig,
account_tables: AccountTables,
) -> Result<
(
@ -232,14 +236,14 @@ pub async fn init(
// slot updates are not parallel because their order matters
let (slot_queue_sender, slot_queue_receiver) = async_channel::unbounded::<SlotUpdate>();
let postgres_slots = postgres_connection(&config.postgres_connection_string).await?;
let postgres_slots = postgres_connection(&config).await?;
// postgres account write sending worker threads
for _ in 0..config.postgres_account_write_connections {
let postgres_account_writes =
postgres_connection(&config.postgres_connection_string).await?;
for _ in 0..config.account_write_connection_count {
let postgres_account_writes = postgres_connection(&config).await?;
let account_write_queue_receiver_c = account_write_queue_receiver.clone();
let account_tables_c = account_tables.clone();
let config = config.clone();
tokio::spawn(async move {
let mut client_opt = None;
loop {
@ -256,13 +260,15 @@ pub async fn init(
let mut error_count = 0;
loop {
let client =
update_postgres_client(&mut client_opt, &postgres_account_writes).await;
update_postgres_client(&mut client_opt, &postgres_account_writes, &config)
.await;
if let Err(err) = process_account_write(client, &write, &account_tables_c).await
{
error_count += 1;
if error_count - 1 < 3 {
if error_count - 1 < config.retry_query_max_count {
warn!("failed to process account write, retrying: {:?}", err);
tokio::time::sleep(Duration::from_secs(1)).await;
tokio::time::sleep(Duration::from_secs(config.retry_query_sleep_secs))
.await;
continue;
} else {
error!("failed to process account write, exiting");
@ -280,6 +286,7 @@ pub async fn init(
.iter()
.map(|table| table.table_name().to_string())
.collect();
let config = config.clone();
tokio::spawn(async move {
let mut slots_processing = SlotsProcessing::default();
let mut client_opt = None;
@ -299,12 +306,14 @@ pub async fn init(
let mut error_count = 0;
loop {
let client = update_postgres_client(&mut client_opt, &postgres_slots).await;
let client =
update_postgres_client(&mut client_opt, &postgres_slots, &config).await;
if let Err(err) = slots_processing.process(client, &update).await {
error_count += 1;
if error_count - 1 < 3 {
if error_count - 1 < config.retry_query_max_count {
warn!("failed to process slot update, retrying: {:?}", err);
tokio::time::sleep(Duration::from_secs(1)).await;
tokio::time::sleep(Duration::from_secs(config.retry_query_sleep_secs))
.await;
continue;
} else {
error!("failed to process slot update, exiting");