fix: resolve connection timeouts

This commit is contained in:
dboures 2023-03-27 12:11:58 -05:00
parent ce4e9d1d39
commit bbf73b164b
No known key found for this signature in database
GPG Key ID: AB3790129D478852
8 changed files with 99 additions and 68 deletions

View File

@ -1,5 +1,5 @@
use chrono::{DateTime, Utc};
use sqlx::{Pool, Postgres};
use sqlx::{pool::PoolConnection, Postgres};
use crate::{
structs::{candle::Candle, openbook::PgOpenBookFill, resolution::Resolution, trader::PgTrader},
@ -7,7 +7,7 @@ use crate::{
};
pub async fn fetch_earliest_fill(
pool: &Pool<Postgres>,
conn: &mut PoolConnection<Postgres>,
market_address_string: &str,
) -> anyhow::Result<Option<PgOpenBookFill>> {
sqlx::query_as!(
@ -25,13 +25,13 @@ pub async fn fetch_earliest_fill(
ORDER BY time asc LIMIT 1"#,
market_address_string
)
.fetch_optional(pool)
.fetch_optional(conn)
.await
.map_err_anyhow()
}
pub async fn fetch_fills_from(
pool: &Pool<Postgres>,
conn: &mut PoolConnection<Postgres>,
market_address_string: &str,
start_time: DateTime<Utc>,
end_time: DateTime<Utc>,
@ -55,13 +55,13 @@ pub async fn fetch_fills_from(
start_time,
end_time
)
.fetch_all(pool)
.fetch_all(conn)
.await
.map_err_anyhow()
}
pub async fn fetch_latest_finished_candle(
pool: &Pool<Postgres>,
conn: &mut PoolConnection<Postgres>,
market_name: &str,
resolution: Resolution,
) -> anyhow::Result<Option<Candle>> {
@ -86,13 +86,13 @@ pub async fn fetch_latest_finished_candle(
market_name,
resolution.to_string()
)
.fetch_optional(pool)
.fetch_optional(conn)
.await
.map_err_anyhow()
}
pub async fn fetch_earliest_candle(
pool: &Pool<Postgres>,
conn: &mut PoolConnection<Postgres>,
market_name: &str,
resolution: Resolution,
) -> anyhow::Result<Option<Candle>> {
@ -116,13 +116,13 @@ pub async fn fetch_earliest_candle(
market_name,
resolution.to_string()
)
.fetch_optional(pool)
.fetch_optional(conn)
.await
.map_err_anyhow()
}
pub async fn fetch_candles_from(
pool: &Pool<Postgres>,
conn: &mut PoolConnection<Postgres>,
market_name: &str,
resolution: Resolution,
start_time: DateTime<Utc>,
@ -153,13 +153,13 @@ pub async fn fetch_candles_from(
start_time,
end_time
)
.fetch_all(pool)
.fetch_all(conn)
.await
.map_err_anyhow()
}
pub async fn fetch_tradingview_candles(
pool: &Pool<Postgres>,
conn: &mut PoolConnection<Postgres>,
market_name: &str,
resolution: Resolution,
start_time: DateTime<Utc>,
@ -189,13 +189,13 @@ pub async fn fetch_tradingview_candles(
start_time,
end_time
)
.fetch_all(pool)
.fetch_all(conn)
.await
.map_err_anyhow()
}
pub async fn fetch_top_traders_by_base_volume_from(
pool: &Pool<Postgres>,
conn: &mut PoolConnection<Postgres>,
market_address_string: &str,
start_time: DateTime<Utc>,
end_time: DateTime<Utc>,
@ -225,13 +225,13 @@ LIMIT 10000"#,
start_time,
end_time
)
.fetch_all(pool)
.fetch_all(conn)
.await
.map_err_anyhow()
}
pub async fn fetch_top_traders_by_quote_volume_from(
pool: &Pool<Postgres>,
conn: &mut PoolConnection<Postgres>,
market_address_string: &str,
start_time: DateTime<Utc>,
end_time: DateTime<Utc>,
@ -261,7 +261,7 @@ LIMIT 10000"#,
start_time,
end_time
)
.fetch_all(pool)
.fetch_all(conn)
.await
.map_err_anyhow()
}

View File

@ -1,10 +1,10 @@
use chrono::Utc;
use sqlx::{Pool, Postgres};
use chrono::{Utc};
use sqlx::{Pool, Postgres, Connection};
use std::{
collections::{hash_map::DefaultHasher, HashMap},
hash::{Hash, Hasher},
};
use tokio::sync::mpsc::{error::TryRecvError, Receiver};
use tokio::{sync::mpsc::{error::TryRecvError, Receiver}};
use crate::{
structs::{candle::Candle, openbook::OpenBookFillEventLog},
@ -13,8 +13,9 @@ use crate::{
pub async fn persist_fill_events(
pool: &Pool<Postgres>,
mut fill_receiver: Receiver<OpenBookFillEventLog>,
) {
fill_receiver: &mut Receiver<OpenBookFillEventLog>,
) -> anyhow::Result<()> {
let mut conn = pool.acquire().await.unwrap();
loop {
let mut write_batch = HashMap::new();
while write_batch.len() < 10 {
@ -38,38 +39,61 @@ pub async fn persist_fill_events(
}
if write_batch.len() > 0 {
print!("writing: {:?} events to DB\n", write_batch.len());
let upsert_statement = build_fills_upsert_statement(write_batch);
sqlx::query(&upsert_statement)
.execute(pool)
.await
.map_err_anyhow()
.unwrap();
// print!("writing: {:?} events to DB\n", write_batch.len());
match conn.ping().await {
Ok(_) => {
let upsert_statement = build_fills_upsert_statement(write_batch);
sqlx::query(&upsert_statement)
.execute(&mut conn)
.await
.map_err_anyhow()
.unwrap();
}
Err(_) => {
println!("Fills ping failed");
break;
}
}
}
}
Ok(())
}
pub async fn persist_candles(pool: Pool<Postgres>, mut candles_receiver: Receiver<Vec<Candle>>) {
pub async fn persist_candles(
pool: Pool<Postgres>,
candles_receiver: &mut Receiver<Vec<Candle>>,
) -> anyhow::Result<()> {
let mut conn = pool.acquire().await.unwrap();
loop {
match candles_receiver.try_recv() {
Ok(candles) => {
if candles.len() == 0 {
continue;
}
print!("writing: {:?} candles to DB\n", candles.len());
let upsert_statement = build_candes_upsert_statement(candles);
sqlx::query(&upsert_statement)
.execute(&pool)
.await
.map_err_anyhow()
.unwrap();
match conn.ping().await {
Ok(_) => {
match candles_receiver.try_recv() {
Ok(candles) => {
if candles.len() == 0 {
continue;
}
// print!("writing: {:?} candles to DB\n", candles.len());
let upsert_statement = build_candes_upsert_statement(candles);
sqlx::query(&upsert_statement)
.execute(&mut conn)
.await
.map_err_anyhow()
.unwrap();
}
Err(TryRecvError::Empty) => continue,
Err(TryRecvError::Disconnected) => {
panic!("Candles sender must stay alive")
}
};
}
Err(TryRecvError::Empty) => continue,
Err(TryRecvError::Disconnected) => {
panic!("Candles sender must stay alive")
Err(_) => {
println!("Candle ping failed");
break;
}
};
}
Ok(())
}
fn build_fills_upsert_statement(events: HashMap<OpenBookFillEventLog, u8>) -> String {

View File

@ -1,4 +1,4 @@
pub mod worker;
pub mod database;
pub mod structs;
pub mod utils;
pub mod worker;

View File

@ -34,8 +34,9 @@ pub async fn get_candles(
let from = to_timestampz(info.from);
let to = to_timestampz(info.to);
let mut conn = context.pool.acquire().await.unwrap();
let candles =
match fetch_tradingview_candles(&context.pool, &info.market_name, resolution, from, to)
match fetch_tradingview_candles(&mut conn, &info.market_name, resolution, from, to)
.await
{
Ok(c) => c,

View File

@ -31,8 +31,9 @@ pub async fn get_top_traders_by_base_volume(
let from = to_timestampz(info.from);
let to = to_timestampz(info.to);
let mut conn = context.pool.acquire().await.unwrap();
let raw_traders = match fetch_top_traders_by_base_volume_from(
&context.pool,
&mut conn,
&selected_market.address,
from,
to,
@ -70,8 +71,9 @@ pub async fn get_top_traders_by_quote_volume(
let from = to_timestampz(info.from);
let to = to_timestampz(info.to);
let mut conn = context.pool.acquire().await.unwrap();
let raw_traders = match fetch_top_traders_by_quote_volume_from(
&context.pool,
&mut conn,
&selected_market.address,
from,
to,

View File

@ -1,7 +1,7 @@
use std::cmp::{max, min};
use chrono::{DateTime, Duration, DurationRound, Utc};
use sqlx::{types::Decimal, Pool, Postgres};
use sqlx::{pool::PoolConnection, types::Decimal, Postgres};
use crate::{
database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle},
@ -14,12 +14,12 @@ use crate::{
};
pub async fn batch_1m_candles(
pool: &Pool<Postgres>,
conn: &mut PoolConnection<Postgres>,
market: &MarketInfo,
) -> anyhow::Result<Vec<Candle>> {
let market_name = &market.name;
let market_address = &market.address;
let latest_candle = fetch_latest_finished_candle(pool, market_name, Resolution::R1m).await?;
let latest_candle = fetch_latest_finished_candle(conn, market_name, Resolution::R1m).await?;
match latest_candle {
Some(candle) => {
@ -28,7 +28,7 @@ pub async fn batch_1m_candles(
start_time + day(),
Utc::now().duration_trunc(Duration::minutes(1))?,
);
let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?;
let mut fills = fetch_fills_from(conn, market_address, start_time, end_time).await?;
let candles = combine_fills_into_1m_candles(
&mut fills,
market,
@ -39,7 +39,7 @@ pub async fn batch_1m_candles(
Ok(candles)
}
None => {
let earliest_fill = fetch_earliest_fill(pool, market_address).await?;
let earliest_fill = fetch_earliest_fill(conn, market_address).await?;
if earliest_fill.is_none() {
println!("No fills found for: {:?}", market_name);
@ -54,7 +54,7 @@ pub async fn batch_1m_candles(
start_time + day(),
Utc::now().duration_trunc(Duration::minutes(1))?,
);
let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?;
let mut fills = fetch_fills_from(conn, market_address, start_time, end_time).await?;
let candles =
combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None);
Ok(candles)

View File

@ -2,7 +2,7 @@ pub mod higher_order_candles;
pub mod minute_candles;
use chrono::Duration;
use sqlx::{Pool, Postgres};
use sqlx::{pool::PoolConnection, Pool, Postgres};
use strum::IntoEnumIterator;
use tokio::{sync::mpsc::Sender, time::sleep};
@ -17,14 +17,14 @@ pub async fn batch_for_market(
pool: Pool<Postgres>,
candles_sender: &Sender<Vec<Candle>>,
market: &MarketInfo,
) {
) -> anyhow::Result<()> {
loop {
let sender = candles_sender.clone();
let pool_clone = pool.clone();
let market_clone = market.clone();
let mut conn = pool.acquire().await?;
loop {
sleep(Duration::milliseconds(2000).to_std().unwrap()).await;
match batch_inner(&pool_clone, &sender, &market_clone).await {
sleep(Duration::milliseconds(2000).to_std()?).await;
match batch_inner(&mut conn, &sender, &market_clone).await {
Ok(_) => {}
Err(e) => {
println!(
@ -41,19 +41,19 @@ pub async fn batch_for_market(
}
async fn batch_inner(
pool: &Pool<Postgres>,
conn: &mut PoolConnection<Postgres>,
candles_sender: &Sender<Vec<Candle>>,
market: &MarketInfo,
) -> anyhow::Result<()> {
let market_name = &market.name.clone();
let candles = batch_1m_candles(pool, market).await?;
let candles = batch_1m_candles(conn, market).await?;
send_candles(candles, candles_sender).await;
for resolution in Resolution::iter() {
if resolution == Resolution::R1m {
continue;
}
let candles = batch_higher_order_candles(pool, market_name, resolution).await?;
let candles = batch_higher_order_candles(conn, market_name, resolution).await?;
send_candles(candles, candles_sender).await;
}
Ok(())

View File

@ -46,7 +46,7 @@ async fn main() -> anyhow::Result<()> {
setup_database(&pool).await?;
let mut handles = vec![];
let (fill_sender, fill_receiver) = mpsc::channel::<OpenBookFillEventLog>(1000);
let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEventLog>(1000);
handles.push(tokio::spawn(async move {
scrape(&config, &fill_sender, &target_markets).await; //TODO: send the vec, it's okay
@ -54,23 +54,27 @@ async fn main() -> anyhow::Result<()> {
let fills_pool = pool.clone();
handles.push(tokio::spawn(async move {
persist_fill_events(&fills_pool, fill_receiver).await;
loop {
persist_fill_events(&fills_pool, &mut fill_receiver).await.unwrap();
}
}));
let (candle_sender, candle_receiver) = mpsc::channel::<Vec<Candle>>(1000);
let (candle_sender, mut candle_receiver) = mpsc::channel::<Vec<Candle>>(1000);
for market in market_infos.into_iter() {
let sender = candle_sender.clone();
let batch_pool = pool.clone();
handles.push(tokio::spawn(async move {
batch_for_market(batch_pool, &sender, &market).await;
batch_for_market(batch_pool, &sender, &market).await.unwrap();
println!("SOMETHING WENT WRONG");
}));
}
let persist_pool = pool.clone();
handles.push(tokio::spawn(async move {
persist_candles(persist_pool, candle_receiver).await;
loop {
persist_candles(persist_pool.clone(), &mut candle_receiver).await.unwrap();
}
}));
futures::future::join_all(handles).await;