working and logging

This commit is contained in:
aniketfuryrocks 2023-01-29 02:47:23 +05:30
parent 04deb4d22a
commit fa655bef33
No known key found for this signature in database
GPG Key ID: FA6BFCFAA7D4B764
7 changed files with 79 additions and 35 deletions

View File

@ -1,11 +1,11 @@
use crate::{
configs::{IsBlockHashValidConfig, SendTransactionConfig},
encoding::BinaryEncoding,
postgres::Postgres,
rpc::LiteRpcServer,
tpu_manager::TpuManager,
workers::{
BlockInformation, BlockListener, Cleaner, MetricsCapture, TxSender, WireTransaction,
BlockInformation, BlockListener, Cleaner, MetricsCapture, Postgres, TxSender,
WireTransaction,
},
};
@ -100,9 +100,12 @@ impl LiteBridge {
tx_send_interval: Duration,
clean_interval: Duration,
postgres_config: &str,
) -> anyhow::Result<[JoinHandle<anyhow::Result<()>>; 8]> {
) -> anyhow::Result<[JoinHandle<anyhow::Result<()>>; 9]> {
let (postgres_send, postgres_recv) = mpsc::unbounded_channel();
let (postgres_connection, postgres) = Postgres::new(postgres_config).await?;
let postgres = postgres.start(postgres_recv);
let (tx_send, tx_recv) = mpsc::unbounded_channel();
self.tx_send = Some(tx_send);
@ -110,7 +113,7 @@ impl LiteBridge {
tx_recv,
tx_batch_size,
tx_send_interval,
Some(postgres.clone()),
Some(postgres_send.clone()),
);
let metrics_capture = MetricsCapture::new(self.tx_sender.clone());
@ -118,7 +121,7 @@ impl LiteBridge {
let finalized_block_listenser = self
.finalized_block_listenser
.clone()
.listen(Some(postgres.clone()));
.listen(Some(postgres_send.clone()));
let confirmed_block_listenser = self.confirmed_block_listenser.clone().listen(None);
@ -168,7 +171,8 @@ impl LiteBridge {
finalized_block_listenser,
confirmed_block_listenser,
postgres_connection,
metrics_capture.capture(Some(postgres)),
postgres,
metrics_capture.capture(Some(postgres_send)),
cleaner,
])
}
@ -205,6 +209,7 @@ impl LiteRpcServer for LiteBridge {
.confirmed_block_listenser
.get_block_info(&tx.get_recent_blockhash().to_string())
.await else {
log::warn!("block");
return Err(jsonrpsee::core::Error::Custom("Blockhash not found in confirmed block store".to_string()));
};

View File

@ -6,7 +6,6 @@ pub mod cli;
pub mod configs;
pub mod encoding;
pub mod errors;
pub mod postgres;
pub mod rpc;
pub mod tpu_manager;
pub mod workers;

View File

@ -21,9 +21,9 @@ use tokio::{
task::JoinHandle,
};
use crate::postgres::{Postgres, PostgresBlock};
use crate::workers::{PostgresBlock, PostgresMsg};
use super::{TxProps, TxSender};
use super::{PostgresMpscSend, TxProps, TxSender};
/// Background worker which listen's to new blocks
/// and keeps a track of confirmed txs
@ -121,7 +121,7 @@ impl BlockListener {
self.signature_subscribers.remove(&signature);
}
pub fn listen(self, postgres: Option<Postgres>) -> JoinHandle<anyhow::Result<()>> {
pub fn listen(self, postgres: Option<PostgresMpscSend>) -> JoinHandle<anyhow::Result<()>> {
tokio::spawn(async move {
info!("Subscribing to blocks");
@ -176,13 +176,12 @@ impl BlockListener {
if let Some(postgres) = &postgres {
postgres
.send_block(PostgresBlock {
.send(PostgresMsg::PostgresBlock(PostgresBlock {
slot: slot as i64,
leader_id: 0, //FIX:
parent_slot: parent_slot as i64,
})
.await
.unwrap();
}))
.expect("Error sending block to postgres service");
}
for tx in transactions {

View File

@ -2,9 +2,7 @@ use log::{info, warn};
use solana_transaction_status::TransactionConfirmationStatus;
use tokio::{sync::RwLock, task::JoinHandle};
use crate::postgres::Postgres;
use super::TxSender;
use super::{PostgresMpscSend, TxSender};
use serde::{Deserialize, Serialize};
/// Background worker which captures metrics
@ -36,7 +34,7 @@ impl MetricsCapture {
self.metrics.read().await.to_owned()
}
pub fn capture(self, postgres: Option<Postgres>) -> JoinHandle<anyhow::Result<()>> {
pub fn capture(self, postgres: Option<PostgresMpscSend>) -> JoinHandle<anyhow::Result<()>> {
let mut one_second = tokio::time::interval(std::time::Duration::from_secs(1));
tokio::spawn(async move {
@ -85,7 +83,7 @@ impl MetricsCapture {
};
if let Some(_postgres) = &postgres {
// postgres.send_metrics(metrics.clone()).await?;
// postgres.send_metrics(metrics.clone()).await?;
}
}
})

View File

@ -1,9 +1,11 @@
mod block_listenser;
mod cleaner;
mod metrics_capture;
mod postgres;
mod tx_sender;
pub use block_listenser::*;
pub use cleaner::*;
pub use metrics_capture::*;
pub use postgres::*;
pub use tx_sender::*;

View File

@ -1,10 +1,14 @@
use std::sync::Arc;
use anyhow::Context;
use anyhow::{bail, Context, Ok};
use log::{info, warn};
use postgres_native_tls::MakeTlsConnector;
use tokio::fs;
use tokio::task::JoinHandle;
use tokio::{
fs,
sync::mpsc::{UnboundedReceiver, UnboundedSender},
task::JoinHandle,
};
use tokio_postgres::Client;
use native_tls::{Certificate, Identity, TlsConnector};
@ -14,6 +18,7 @@ pub struct Postgres {
client: Arc<Client>,
}
#[derive(Debug)]
pub struct PostgresTx {
pub signature: String,
pub recent_slot: i64,
@ -24,17 +29,29 @@ pub struct PostgresTx {
pub quic_response: i16,
}
#[derive(Debug)]
pub struct PostgresBlock {
pub slot: i64,
pub leader_id: i64,
pub parent_slot: i64,
}
#[derive(Debug)]
pub struct PostgreAccountAddr {
pub id: u32,
pub addr: String,
}
#[derive(Debug)]
pub enum PostgresMsg {
PostgresTx(PostgresTx),
PostgresBlock(PostgresBlock),
PostgreAccountAddr(PostgreAccountAddr),
}
pub type PostgresMpscRecv = UnboundedReceiver<PostgresMsg>;
pub type PostgresMpscSend = UnboundedSender<PostgresMsg>;
impl Postgres {
/// # Return
/// (connection join handle, Self)
@ -111,4 +128,25 @@ impl Postgres {
Ok(())
}
pub fn start(self, mut recv: PostgresMpscRecv) -> JoinHandle<anyhow::Result<()>> {
tokio::spawn(async move {
info!("Writing to postgres");
while let Some(msg) = recv.recv().await {
let Err(err) = (
match msg {
PostgresMsg::PostgresTx(tx) => self.send_tx(tx).await,
PostgresMsg::PostgresBlock(block) => self.send_block(block).await,
PostgresMsg::PostgreAccountAddr(_) => todo!(),
} ) else {
continue;
};
warn!("Error writing to postgres {err}");
}
bail!("Postgres channel closed")
})
}
}

View File

@ -14,10 +14,12 @@ use tokio::{
};
use crate::{
postgres::{Postgres, PostgresTx},
tpu_manager::TpuManager,
workers::{PostgresMsg, PostgresTx},
};
use super::PostgresMpscSend;
pub type WireTransaction = Vec<u8>;
/// Retry transactions to a maximum of `u16` times, keep a track of confirmed transactions
@ -56,13 +58,13 @@ impl TxSender {
/// retry enqued_tx(s)
async fn forward_txs(
&self,
sigs: Vec<(String, u64)>,
sigs_and_slots: Vec<(String, u64)>,
txs: Vec<WireTransaction>,
postgres: Option<Postgres>,
postgres: Option<PostgresMpscSend>,
) {
assert_eq!(sigs.len(), txs.len());
assert_eq!(sigs_and_slots.len(), txs.len());
if sigs.is_empty() {
if sigs_and_slots.is_empty() {
return;
}
@ -70,9 +72,10 @@ impl TxSender {
let txs_sent = self.txs_sent.clone();
tokio::spawn(async move {
warn!("sending");
let quic_response = match tpu_client.try_send_wire_transaction_batch(txs).await {
Ok(_) => {
for (sig, _) in &sigs {
for (sig, _) in &sigs_and_slots {
txs_sent.insert(sig.to_owned(), TxProps::default());
}
1
@ -84,9 +87,9 @@ impl TxSender {
};
if let Some(postgres) = postgres {
for (sig, recent_slot) in sigs {
for (sig, recent_slot) in sigs_and_slots {
postgres
.send_tx(PostgresTx {
.send(PostgresMsg::PostgresTx(PostgresTx {
signature: sig.clone(),
recent_slot: recent_slot as i64,
forwarded_slot: 0, // FIX: figure this out
@ -94,9 +97,8 @@ impl TxSender {
cu_consumed: None, // FIX: figure this out
cu_requested: None, // FIX: figure this out
quic_response,
})
.await
.unwrap();
}))
.expect("Error writing to postgres service");
}
}
});
@ -108,7 +110,7 @@ impl TxSender {
mut recv: UnboundedReceiver<(String, WireTransaction, u64)>,
tx_batch_size: usize,
tx_send_interval: Duration,
postgres: Option<Postgres>,
postgres_send: Option<PostgresMpscSend>,
) -> JoinHandle<anyhow::Result<()>> {
tokio::spawn(async move {
info!(
@ -125,6 +127,7 @@ impl TxSender {
while (prev_inst.elapsed() < tx_send_interval) || txs.len() == tx_batch_size {
match recv.try_recv() {
Ok((sig, tx, slot)) => {
log::warn!("recv");
sigs_and_slots.push((sig, slot));
txs.push(tx);
}
@ -135,7 +138,7 @@ impl TxSender {
}
}
self.forward_txs(sigs_and_slots, txs, postgres.clone())
self.forward_txs(sigs_and_slots, txs, postgres_send.clone())
.await;
}
})