diff --git a/src/bridge.rs b/src/bridge.rs index fe548fcf..97c0d776 100644 --- a/src/bridge.rs +++ b/src/bridge.rs @@ -1,11 +1,11 @@ use crate::{ configs::{IsBlockHashValidConfig, SendTransactionConfig}, encoding::BinaryEncoding, - postgres::Postgres, rpc::LiteRpcServer, tpu_manager::TpuManager, workers::{ - BlockInformation, BlockListener, Cleaner, MetricsCapture, TxSender, WireTransaction, + BlockInformation, BlockListener, Cleaner, MetricsCapture, Postgres, TxSender, + WireTransaction, }, }; @@ -100,9 +100,12 @@ impl LiteBridge { tx_send_interval: Duration, clean_interval: Duration, postgres_config: &str, - ) -> anyhow::Result<[JoinHandle>; 8]> { + ) -> anyhow::Result<[JoinHandle>; 9]> { + let (postgres_send, postgres_recv) = mpsc::unbounded_channel(); let (postgres_connection, postgres) = Postgres::new(postgres_config).await?; + let postgres = postgres.start(postgres_recv); + let (tx_send, tx_recv) = mpsc::unbounded_channel(); self.tx_send = Some(tx_send); @@ -110,7 +113,7 @@ impl LiteBridge { tx_recv, tx_batch_size, tx_send_interval, - Some(postgres.clone()), + Some(postgres_send.clone()), ); let metrics_capture = MetricsCapture::new(self.tx_sender.clone()); @@ -118,7 +121,7 @@ impl LiteBridge { let finalized_block_listenser = self .finalized_block_listenser .clone() - .listen(Some(postgres.clone())); + .listen(Some(postgres_send.clone())); let confirmed_block_listenser = self.confirmed_block_listenser.clone().listen(None); @@ -168,7 +171,8 @@ impl LiteBridge { finalized_block_listenser, confirmed_block_listenser, postgres_connection, - metrics_capture.capture(Some(postgres)), + postgres, + metrics_capture.capture(Some(postgres_send)), cleaner, ]) } @@ -205,6 +209,7 @@ impl LiteRpcServer for LiteBridge { .confirmed_block_listenser .get_block_info(&tx.get_recent_blockhash().to_string()) .await else { + log::warn!("block"); return Err(jsonrpsee::core::Error::Custom("Blockhash not found in confirmed block store".to_string())); }; diff --git a/src/lib.rs b/src/lib.rs index 71cbb6ee..4bb5e821 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,7 +6,6 @@ pub mod cli; pub mod configs; pub mod encoding; pub mod errors; -pub mod postgres; pub mod rpc; pub mod tpu_manager; pub mod workers; diff --git a/src/workers/block_listenser.rs b/src/workers/block_listenser.rs index 361b705a..deb44150 100644 --- a/src/workers/block_listenser.rs +++ b/src/workers/block_listenser.rs @@ -21,9 +21,9 @@ use tokio::{ task::JoinHandle, }; -use crate::postgres::{Postgres, PostgresBlock}; +use crate::workers::{PostgresBlock, PostgresMsg}; -use super::{TxProps, TxSender}; +use super::{PostgresMpscSend, TxProps, TxSender}; /// Background worker which listen's to new blocks /// and keeps a track of confirmed txs @@ -121,7 +121,7 @@ impl BlockListener { self.signature_subscribers.remove(&signature); } - pub fn listen(self, postgres: Option) -> JoinHandle> { + pub fn listen(self, postgres: Option) -> JoinHandle> { tokio::spawn(async move { info!("Subscribing to blocks"); @@ -176,13 +176,12 @@ impl BlockListener { if let Some(postgres) = &postgres { postgres - .send_block(PostgresBlock { + .send(PostgresMsg::PostgresBlock(PostgresBlock { slot: slot as i64, leader_id: 0, //FIX: parent_slot: parent_slot as i64, - }) - .await - .unwrap(); + })) + .expect("Error sending block to postgres service"); } for tx in transactions { diff --git a/src/workers/metrics_capture.rs b/src/workers/metrics_capture.rs index 403da3f5..46f3f103 100644 --- a/src/workers/metrics_capture.rs +++ b/src/workers/metrics_capture.rs @@ -2,9 +2,7 @@ use log::{info, warn}; use solana_transaction_status::TransactionConfirmationStatus; use tokio::{sync::RwLock, task::JoinHandle}; -use crate::postgres::Postgres; - -use super::TxSender; +use super::{PostgresMpscSend, TxSender}; use serde::{Deserialize, Serialize}; /// Background worker which captures metrics @@ -36,7 +34,7 @@ impl MetricsCapture { self.metrics.read().await.to_owned() } - pub fn capture(self, postgres: Option) -> JoinHandle> { + pub fn capture(self, postgres: Option) -> JoinHandle> { let mut one_second = tokio::time::interval(std::time::Duration::from_secs(1)); tokio::spawn(async move { @@ -85,7 +83,7 @@ impl MetricsCapture { }; if let Some(_postgres) = &postgres { - // postgres.send_metrics(metrics.clone()).await?; + // postgres.send_metrics(metrics.clone()).await?; } } }) diff --git a/src/workers/mod.rs b/src/workers/mod.rs index 6b6d9535..c65c8290 100644 --- a/src/workers/mod.rs +++ b/src/workers/mod.rs @@ -1,9 +1,11 @@ mod block_listenser; mod cleaner; mod metrics_capture; +mod postgres; mod tx_sender; pub use block_listenser::*; pub use cleaner::*; pub use metrics_capture::*; +pub use postgres::*; pub use tx_sender::*; diff --git a/src/postgres.rs b/src/workers/postgres.rs similarity index 71% rename from src/postgres.rs rename to src/workers/postgres.rs index 60110d16..8db95faa 100644 --- a/src/postgres.rs +++ b/src/workers/postgres.rs @@ -1,10 +1,14 @@ use std::sync::Arc; -use anyhow::Context; +use anyhow::{bail, Context, Ok}; use log::{info, warn}; use postgres_native_tls::MakeTlsConnector; -use tokio::fs; -use tokio::task::JoinHandle; + +use tokio::{ + fs, + sync::mpsc::{UnboundedReceiver, UnboundedSender}, + task::JoinHandle, +}; use tokio_postgres::Client; use native_tls::{Certificate, Identity, TlsConnector}; @@ -14,6 +18,7 @@ pub struct Postgres { client: Arc, } +#[derive(Debug)] pub struct PostgresTx { pub signature: String, pub recent_slot: i64, @@ -24,17 +29,29 @@ pub struct PostgresTx { pub quic_response: i16, } +#[derive(Debug)] pub struct PostgresBlock { pub slot: i64, pub leader_id: i64, pub parent_slot: i64, } +#[derive(Debug)] pub struct PostgreAccountAddr { pub id: u32, pub addr: String, } +#[derive(Debug)] +pub enum PostgresMsg { + PostgresTx(PostgresTx), + PostgresBlock(PostgresBlock), + PostgreAccountAddr(PostgreAccountAddr), +} + +pub type PostgresMpscRecv = UnboundedReceiver; +pub type PostgresMpscSend = UnboundedSender; + impl Postgres { /// # Return /// (connection join handle, Self) @@ -111,4 +128,25 @@ impl Postgres { Ok(()) } + + pub fn start(self, mut recv: PostgresMpscRecv) -> JoinHandle> { + tokio::spawn(async move { + info!("Writing to postgres"); + + while let Some(msg) = recv.recv().await { + let Err(err) = ( + match msg { + PostgresMsg::PostgresTx(tx) => self.send_tx(tx).await, + PostgresMsg::PostgresBlock(block) => self.send_block(block).await, + PostgresMsg::PostgreAccountAddr(_) => todo!(), + } ) else { + continue; + }; + + warn!("Error writing to postgres {err}"); + } + + bail!("Postgres channel closed") + }) + } } diff --git a/src/workers/tx_sender.rs b/src/workers/tx_sender.rs index feda125d..05950b79 100644 --- a/src/workers/tx_sender.rs +++ b/src/workers/tx_sender.rs @@ -14,10 +14,12 @@ use tokio::{ }; use crate::{ - postgres::{Postgres, PostgresTx}, tpu_manager::TpuManager, + workers::{PostgresMsg, PostgresTx}, }; +use super::PostgresMpscSend; + pub type WireTransaction = Vec; /// Retry transactions to a maximum of `u16` times, keep a track of confirmed transactions @@ -56,13 +58,13 @@ impl TxSender { /// retry enqued_tx(s) async fn forward_txs( &self, - sigs: Vec<(String, u64)>, + sigs_and_slots: Vec<(String, u64)>, txs: Vec, - postgres: Option, + postgres: Option, ) { - assert_eq!(sigs.len(), txs.len()); + assert_eq!(sigs_and_slots.len(), txs.len()); - if sigs.is_empty() { + if sigs_and_slots.is_empty() { return; } @@ -70,9 +72,10 @@ impl TxSender { let txs_sent = self.txs_sent.clone(); tokio::spawn(async move { + warn!("sending"); let quic_response = match tpu_client.try_send_wire_transaction_batch(txs).await { Ok(_) => { - for (sig, _) in &sigs { + for (sig, _) in &sigs_and_slots { txs_sent.insert(sig.to_owned(), TxProps::default()); } 1 @@ -84,9 +87,9 @@ impl TxSender { }; if let Some(postgres) = postgres { - for (sig, recent_slot) in sigs { + for (sig, recent_slot) in sigs_and_slots { postgres - .send_tx(PostgresTx { + .send(PostgresMsg::PostgresTx(PostgresTx { signature: sig.clone(), recent_slot: recent_slot as i64, forwarded_slot: 0, // FIX: figure this out @@ -94,9 +97,8 @@ impl TxSender { cu_consumed: None, // FIX: figure this out cu_requested: None, // FIX: figure this out quic_response, - }) - .await - .unwrap(); + })) + .expect("Error writing to postgres service"); } } }); @@ -108,7 +110,7 @@ impl TxSender { mut recv: UnboundedReceiver<(String, WireTransaction, u64)>, tx_batch_size: usize, tx_send_interval: Duration, - postgres: Option, + postgres_send: Option, ) -> JoinHandle> { tokio::spawn(async move { info!( @@ -125,6 +127,7 @@ impl TxSender { while (prev_inst.elapsed() < tx_send_interval) || txs.len() == tx_batch_size { match recv.try_recv() { Ok((sig, tx, slot)) => { + log::warn!("recv"); sigs_and_slots.push((sig, slot)); txs.push(tx); } @@ -135,7 +138,7 @@ impl TxSender { } } - self.forward_txs(sigs_and_slots, txs, postgres.clone()) + self.forward_txs(sigs_and_slots, txs, postgres_send.clone()) .await; } })