diff --git a/migrations/create.sql b/migrations/create.sql index f355dfd0..5463135f 100644 --- a/migrations/create.sql +++ b/migrations/create.sql @@ -1,12 +1,12 @@ CREATE TABLE lite_rpc.Txs ( id SERIAL NOT NULL PRIMARY KEY, - signature BINARY(64) NOT NULL, + signature CHAR(88) NOT NULL, recent_slot BIGINT NOT NULL, forwarded_slot BIGINT NOT NULL, processed_slot BIGINT, cu_consumed BIGINT, cu_requested BIGINT, - quic_response CHAR + quic_response SMALLINT ); diff --git a/src/bridge.rs b/src/bridge.rs index d2f3942e..2cb47299 100644 --- a/src/bridge.rs +++ b/src/bridge.rs @@ -108,7 +108,7 @@ impl LiteBridge { let confirmed_block_listenser = self .confirmed_block_listenser .clone() - .listen(Some(postgres.clone())); + .listen(None); let cleaner = Cleaner::new( self.tx_sender.clone(), diff --git a/src/postgres.rs b/src/postgres.rs index e17045bc..60110d16 100644 --- a/src/postgres.rs +++ b/src/postgres.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use anyhow::Context; -use log::info; +use log::{info, warn}; use postgres_native_tls::MakeTlsConnector; use tokio::fs; use tokio::task::JoinHandle; @@ -14,14 +14,14 @@ pub struct Postgres { client: Arc, } -pub struct PostgresTx<'a> { - pub signature: &'a [u8], +pub struct PostgresTx { + pub signature: String, pub recent_slot: i64, pub forwarded_slot: i64, pub processed_slot: Option, pub cu_consumed: Option, pub cu_requested: Option, - pub quic_response: u32, + pub quic_response: i16, } pub struct PostgresBlock { @@ -86,7 +86,7 @@ impl Postgres { Ok(()) } - pub async fn send_tx<'a>(&self, tx: PostgresTx<'a>) -> anyhow::Result<()> { + pub async fn send_tx(&self, tx: PostgresTx) -> anyhow::Result<()> { let PostgresTx { signature, recent_slot, @@ -97,12 +97,14 @@ impl Postgres { quic_response, } = tx; + warn!("{}", signature.len()); + self.client.execute( r#" INSERT INTO lite_rpc.Txs (signature, recent_slot, forwarded_slot, processed_slot, cu_consumed, cu_requested, quic_response) VALUES - ($1, $2, $3, $4, $5, $6) + ($1, $2, $3, $4, $5, $6, $7) "#, &[&signature, &recent_slot, &forwarded_slot, &processed_slot, &cu_consumed, &cu_requested, &quic_response], ).await?; diff --git a/src/workers/block_listenser.rs b/src/workers/block_listenser.rs index 8471f5d9..7a0374e5 100644 --- a/src/workers/block_listenser.rs +++ b/src/workers/block_listenser.rs @@ -153,11 +153,14 @@ impl BlockListener { }; if let Some(postgres) = &postgres { - postgres.send_block(PostgresBlock { - slot: slot as i64, - leader_id: 0, //FIX: - parent_slot: 0, //FIX: - }).await?; + postgres + .send_block(PostgresBlock { + slot: slot as i64, + leader_id: 0, //FIX: + parent_slot: 0, //FIX: + }) + .await + .unwrap(); } for tx in transactions { @@ -175,15 +178,18 @@ impl BlockListener { if let Some(mut tx_status) = self.tx_sender.txs_sent.get_mut(&sig) { if let Some(postgres) = &postgres { - postgres.send_tx(PostgresTx { - signature: tx.get_signature().as_ref(), - recent_slot: slot as i64, - forwarded_slot: 0, - processed_slot: None, - cu_consumed: None, - cu_requested: None, - quic_response: 0, - }).await?; + postgres + .send_tx(PostgresTx { + signature: sig.clone(), + recent_slot: slot as i64, + forwarded_slot: 0, + processed_slot: None, + cu_consumed: None, + cu_requested: None, + quic_response: 0, + }) + .await + .unwrap(); } tx_status.value_mut().status = Some(TransactionStatus {