Merge pull request #227 from blockworks-foundation/postgres_saving_blocks

Saving blocks and transactions in Postgres tables
galactus 2023-10-09 10:33:23 +02:00 committed by GitHub
commit 53a84b17dd
12 changed files with 315 additions and 156 deletions

Cargo.lock generated
View File

@@ -4307,6 +4307,7 @@ dependencies = [
  "bincode",
  "chrono",
  "dashmap",
+ "itertools",
  "log",
  "native-tls",
  "postgres-native-tls",

View File

@@ -1,4 +1,5 @@
 use crate::structures::produced_block::ProducedBlock;
+use anyhow::Result;
 use async_trait::async_trait;
 use solana_rpc_client_api::config::RpcBlockConfig;
 use solana_sdk::slot_history::Slot;
@@ -7,11 +8,12 @@ use std::{ops::Range, sync::Arc};
 #[async_trait]
 pub trait BlockStorageInterface: Send + Sync {
     // will save a block
-    async fn save(&self, block: ProducedBlock);
+    async fn save(&self, block: ProducedBlock) -> Result<()>;
     // will get a block
-    async fn get(&self, slot: Slot, config: RpcBlockConfig) -> Option<ProducedBlock>;
+    async fn get(&self, slot: Slot, config: RpcBlockConfig) -> Result<ProducedBlock>;
     // will get range of slots that are stored in the storage
     async fn get_slot_range(&self) -> Range<Slot>;
 }

 pub type BlockStorageImpl = Arc<dyn BlockStorageInterface>;
+pub const BLOCK_NOT_FOUND: &str = "Block not found";
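For orientation, here is a minimal sketch of a call site under the new trait signature. This is not code from the PR: the trait path, `RpcBlockConfig`, `Slot`, and the "Block not found" error text come from the diffs on this page; everything else is illustrative.

use solana_lite_rpc_core::traits::block_storage_interface::BlockStorageInterface;
use solana_rpc_client_api::config::RpcBlockConfig;
use solana_sdk::slot_history::Slot;

// Hypothetical caller: `get` now returns anyhow::Result<ProducedBlock> instead of
// Option<ProducedBlock>, so a missing block surfaces as an error (e.g. "Block not found").
async fn print_block(storage: &dyn BlockStorageInterface, slot: Slot) {
    match storage.get(slot, RpcBlockConfig::default()).await {
        Ok(block) => println!("slot {} -> blockhash {}", slot, block.blockhash),
        Err(err) => println!("slot {} -> {}", slot, err),
    }
}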

View File

@@ -25,4 +25,5 @@ log = {workspace = true}
 chrono = {workspace = true}
 bincode = {workspace = true}
 base64 = {workspace = true}
+itertools = {workspace = true}
 tokio-postgres = { version = "0.7.8", features = ["with-chrono-0_4"] }

View File

@@ -1,7 +1,8 @@
 use async_trait::async_trait;
 use solana_lite_rpc_core::{
-    commitment_utils::Commitment, structures::produced_block::ProducedBlock,
-    traits::block_storage_interface::BlockStorageInterface,
+    commitment_utils::Commitment,
+    structures::produced_block::ProducedBlock,
+    traits::block_storage_interface::{BlockStorageInterface, BLOCK_NOT_FOUND},
 };
 use solana_rpc_client_api::config::RpcBlockConfig;
 use solana_sdk::slot_history::Slot;
@@ -52,12 +53,18 @@ impl InmemoryBlockStore {
 #[async_trait]
 impl BlockStorageInterface for InmemoryBlockStore {
-    async fn save(&self, block: ProducedBlock) {
+    async fn save(&self, block: ProducedBlock) -> anyhow::Result<()> {
         self.store(block).await;
+        Ok(())
     }

-    async fn get(&self, slot: Slot, _: RpcBlockConfig) -> Option<ProducedBlock> {
-        self.block_storage.read().await.get(&slot).cloned()
+    async fn get(&self, slot: Slot, _: RpcBlockConfig) -> anyhow::Result<ProducedBlock> {
+        self.block_storage
+            .read()
+            .await
+            .get(&slot)
+            .cloned()
+            .ok_or(anyhow::Error::msg(BLOCK_NOT_FOUND))
     }

     async fn get_slot_range(&self) -> Range<Slot> {

View File

@@ -4,11 +4,12 @@
 // Fetches legacy blocks from faithful
 use crate::block_stores::inmemory_block_store::InmemoryBlockStore;
+use anyhow::{bail, Result};
 use async_trait::async_trait;
 use solana_lite_rpc_core::{
     commitment_utils::Commitment,
     structures::produced_block::ProducedBlock,
-    traits::block_storage_interface::{BlockStorageImpl, BlockStorageInterface},
+    traits::block_storage_interface::{BlockStorageImpl, BlockStorageInterface, BLOCK_NOT_FOUND},
 };
 use solana_rpc_client::nonblocking::rpc_client::RpcClient;
 use solana_rpc_client_api::config::RpcBlockConfig;
@@ -42,7 +43,7 @@ impl MultipleStrategyBlockStorage {
         }
     }

-    pub async fn get_in_memory_block(&self, slot: Slot) -> Option<ProducedBlock> {
+    pub async fn get_in_memory_block(&self, slot: Slot) -> anyhow::Result<ProducedBlock> {
         self.inmemory_for_storage
             .get(
                 slot,
@@ -60,46 +61,47 @@ impl MultipleStrategyBlockStorage {
 #[async_trait]
 impl BlockStorageInterface for MultipleStrategyBlockStorage {
-    async fn save(&self, block: ProducedBlock) {
+    async fn save(&self, block: ProducedBlock) -> Result<()> {
         let slot = block.slot;
         let commitment = Commitment::from(block.commitment_config);
         match commitment {
             Commitment::Confirmed | Commitment::Processed => {
-                self.inmemory_for_storage.save(block).await;
+                self.inmemory_for_storage.save(block).await?;
             }
             Commitment::Finalized => {
                 let block_in_mem = self.get_in_memory_block(block.slot).await;
                 match block_in_mem {
-                    Some(block_in_mem) => {
+                    Ok(block_in_mem) => {
                         // check if inmemory blockhash is same as finalized, update it if they are not
                         // we can have two machines with same identity publishing two different blocks on same slot
                         if block_in_mem.blockhash != block.blockhash {
-                            self.inmemory_for_storage.save(block.clone()).await;
+                            self.inmemory_for_storage.save(block.clone()).await?;
                         }
                     }
-                    None => self.inmemory_for_storage.save(block.clone()).await,
+                    Err(_) => self.inmemory_for_storage.save(block.clone()).await?,
                 }
-                self.persistent_block_storage.save(block).await;
+                self.persistent_block_storage.save(block).await?;
             }
         };
         if slot > self.last_confirmed_slot.load(Ordering::Relaxed) {
             self.last_confirmed_slot.store(slot, Ordering::Relaxed);
         }
+        Ok(())
     }

     async fn get(
         &self,
         slot: solana_sdk::slot_history::Slot,
         config: RpcBlockConfig,
-    ) -> Option<ProducedBlock> {
+    ) -> Result<ProducedBlock> {
         let last_confirmed_slot = self.last_confirmed_slot.load(Ordering::Relaxed);
         if slot > last_confirmed_slot {
-            None
+            bail!(BLOCK_NOT_FOUND);
         } else {
             let range = self.inmemory_for_storage.get_slot_range().await;
             if range.contains(&slot) {
                 let block = self.inmemory_for_storage.get(slot, config).await;
-                if block.is_some() {
+                if block.is_ok() {
                     return block;
                 }
             }
@@ -113,15 +115,15 @@ impl BlockStorageInterface for MultipleStrategyBlockStorage {
                 .get_block_with_config(slot, config)
                 .await
             {
-                Ok(block) => Some(ProducedBlock::from_ui_block(
+                Ok(block) => Ok(ProducedBlock::from_ui_block(
                     block,
                     slot,
                     CommitmentConfig::finalized(),
                 )),
-                Err(_) => None,
+                Err(_) => bail!(BLOCK_NOT_FOUND),
             }
         } else {
-            None
+            bail!(BLOCK_NOT_FOUND);
         }
     }
 }

View File

@@ -1 +1,101 @@
+use std::sync::Arc;
+
+use anyhow::Result;
+use async_trait::async_trait;
+use itertools::Itertools;
+use solana_lite_rpc_core::{
+    structures::{epoch::EpochCache, produced_block::ProducedBlock},
+    traits::block_storage_interface::BlockStorageInterface,
+};
+use solana_rpc_client_api::config::RpcBlockConfig;
+use solana_sdk::{slot_history::Slot, stake_history::Epoch};
+use tokio::sync::RwLock;
+
+use crate::postgres::{
+    postgres_block::PostgresBlock, postgres_session::PostgresSessionCache,
+    postgres_transaction::PostgresTransaction,
+};
+
+#[derive(Default, Clone, Copy)]
+pub struct PostgresData {
+    from_slot: Slot,
+    to_slot: Slot,
+    current_epoch: Epoch,
+}
+
+pub struct PostgresBlockStore {
+    session_cache: PostgresSessionCache,
+    epoch_cache: EpochCache,
+    postgres_data: Arc<RwLock<PostgresData>>,
+}
+
+impl PostgresBlockStore {
+    pub async fn start_new_epoch(&self, schema: &String) -> Result<()> {
+        // create schema for new epoch
+        let session = self
+            .session_cache
+            .get_session()
+            .await
+            .expect("should get new postgres session");
+
+        let statement = format!("CREATE SCHEMA {};", schema);
+        session.execute(&statement, &[]).await?;
+
+        // Create blocks table
+        let statement = PostgresBlock::create_statement(schema);
+        session.execute(&statement, &[]).await?;
+
+        // create transaction table
+        let statement = PostgresTransaction::create_statement(schema);
+        session.execute(&statement, &[]).await?;
+        Ok(())
+    }
+}
+
+#[async_trait]
+impl BlockStorageInterface for PostgresBlockStore {
+    async fn save(&self, block: ProducedBlock) -> Result<()> {
+        let PostgresData { current_epoch, .. } = { *self.postgres_data.read().await };
+
+        let slot = block.slot;
+        let transactions = block
+            .transactions
+            .iter()
+            .map(|x| PostgresTransaction::new(x, slot))
+            .collect_vec();
+        let postgres_block = PostgresBlock::from(&block);
+
+        let epoch = self.epoch_cache.get_epoch_at_slot(slot);
+        let schema = format!("EPOCH_{}", epoch.epoch);
+        if current_epoch == 0 || current_epoch < epoch.epoch {
+            self.postgres_data.write().await.current_epoch = epoch.epoch;
+            self.start_new_epoch(&schema).await?;
+        }
+
+        const NUMBER_OF_TRANSACTION: usize = 20;
+
+        // save transaction
+        let chunks = transactions.chunks(NUMBER_OF_TRANSACTION);
+        let session = self
+            .session_cache
+            .get_session()
+            .await
+            .expect("should get new postgres session");
+        for chunk in chunks {
+            PostgresTransaction::save_transactions(&session, &schema, chunk).await?;
+        }
+        postgres_block.save(&session, &schema).await?;
+        Ok(())
+    }
+
+    async fn get(&self, slot: Slot, _config: RpcBlockConfig) -> Result<ProducedBlock> {
+        let range = self.get_slot_range().await;
+        if range.contains(&slot) {}
+        todo!()
+    }
+
+    async fn get_slot_range(&self) -> std::ops::Range<Slot> {
+        let lk = self.postgres_data.read().await;
+        lk.from_slot..lk.to_slot + 1
+    }
+}
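A rough trace of what `save` above does for one finalized block. The epoch number 512 is made up for illustration; the `EPOCH_{n}` schema naming, the chunk size of 20, and the order of operations come from the code in this diff.

fn main() {
    // Schema name derived from the block's epoch, as in PostgresBlockStore::save.
    let schema = format!("EPOCH_{}", 512u64); // -> "EPOCH_512"
    // On the first block of a new epoch, start_new_epoch creates the schema plus
    // its BLOCKS and TRANSACTIONS tables (see the create_statement functions below).
    // Transactions are then written in multi-row INSERTs of at most 20 rows each.
    let txs: Vec<u64> = (0..45).collect(); // stand-in for the block's transactions
    for chunk in txs.chunks(20) {
        println!("would insert {} rows into {}.TRANSACTIONS", chunk.len(), schema);
    }
    // Finally the block row itself is written to EPOCH_512.BLOCKS.
}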

View File

@@ -1,29 +1,23 @@
-use super::postgres_session::SchemaSize;
-use solana_lite_rpc_core::{
-    commitment_utils::Commitment, encoding::BASE64, structures::produced_block::ProducedBlock,
-};
+use solana_lite_rpc_core::{encoding::BASE64, structures::produced_block::ProducedBlock};
+use tokio_postgres::types::ToSql;
+
+use super::postgres_session::PostgresSession;

 #[derive(Debug)]
 pub struct PostgresBlock {
-    pub leader_id: Option<String>,
+    pub slot: i64,
     pub blockhash: String,
     pub block_height: i64,
-    pub slot: i64,
     pub parent_slot: i64,
     pub block_time: i64,
-    pub commitment_config: i8,
     pub previous_blockhash: String,
     pub rewards: Option<String>,
 }

-impl SchemaSize for PostgresBlock {
-    const DEFAULT_SIZE: usize = 4 * 8;
-    const MAX_SIZE: usize = Self::DEFAULT_SIZE + 8;
-}
+const NB_ARUMENTS: usize = 7;

-impl From<ProducedBlock> for PostgresBlock {
-    fn from(value: ProducedBlock) -> Self {
-        let commitment = Commitment::from(&value.commitment_config);
+impl From<&ProducedBlock> for PostgresBlock {
+    fn from(value: &ProducedBlock) -> Self {
         let rewards = value
             .rewards
             .as_ref()
@@ -31,15 +25,59 @@ impl From<ProducedBlock> for PostgresBlock {
             .unwrap_or(None);

         Self {
-            leader_id: value.leader_id,
-            blockhash: value.blockhash,
+            blockhash: value.blockhash.clone(),
             block_height: value.block_height as i64,
             slot: value.slot as i64,
             parent_slot: value.parent_slot as i64,
             block_time: value.block_time as i64,
-            commitment_config: commitment as i8,
-            previous_blockhash: value.previous_blockhash,
+            previous_blockhash: value.previous_blockhash.clone(),
             rewards,
         }
     }
 }
+
+impl PostgresBlock {
+    pub fn create_statement(schema: &String) -> String {
+        format!(
+            "
+            CREATE TABLE {}.BLOCKS (
+                slot BIGINT PRIMARY KEY,
+                blockhash STRING NOT NULL,
+                leader_id STRING,
+                block_height BIGINT NOT NULL,
+                parent_slot BIGINT NOT NULL,
+                block_time BIGINT NOT NULL,
+                previous_blockhash STRING NOT NULL,
+                rewards STRING,
+            );
+            ",
+            schema
+        )
+    }
+
+    pub async fn save(
+        &self,
+        postgres_session: &PostgresSession,
+        schema: &String,
+    ) -> anyhow::Result<()> {
+        let mut query = format!(
+            r#"
+            INSERT INTO {}.BLOCKS (slot, blockhash, block_height, parent_slot, block_time, previous_blockhash, rewards) VALUES
+            "#,
+            schema
+        );
+
+        let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NB_ARUMENTS);
+        args.push(&self.slot);
+        args.push(&self.blockhash);
+        args.push(&self.block_height);
+        args.push(&self.parent_slot);
+        args.push(&self.block_time);
+        args.push(&self.previous_blockhash);
+        args.push(&self.rewards);
+
+        PostgresSession::multiline_query(&mut query, NB_ARUMENTS, 1, &[]);
+        postgres_session.execute(&query, &args).await?;
+
+        Ok(())
+    }
+}
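`PostgresSession::multiline_query` is defined elsewhere in the repository and does not appear in this diff. Judging from how it is called here (query prefix, arguments per row, row count), it presumably appends numbered placeholder tuples to the VALUES clause. The helper below is a hypothetical stand-in used only to show the shape of the statement `save` would execute for a single block row; it is not the project's actual helper, and the `EPOCH_512` schema name is an example.

/// Hypothetical stand-in for PostgresSession::multiline_query (assumption, not
/// the real helper): append `count` tuples of `args_per_row` placeholders.
fn append_value_tuples(query: &mut String, args_per_row: usize, count: usize) {
    for row in 0..count {
        if row > 0 {
            query.push(',');
        }
        let placeholders: Vec<String> = (0..args_per_row)
            .map(|col| format!("${}", row * args_per_row + col + 1))
            .collect();
        query.push_str(&format!("({})", placeholders.join(",")));
    }
}

fn main() {
    let mut query = String::from(
        "INSERT INTO EPOCH_512.BLOCKS (slot, blockhash, block_height, parent_slot, block_time, previous_blockhash, rewards) VALUES ",
    );
    append_value_tuples(&mut query, 7, 1); // NB_ARUMENTS = 7, one row
    println!("{query}");
    // -> ... VALUES ($1,$2,$3,$4,$5,$6,$7), bound to the seven args pushed in save()
}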

View File

@@ -1,10 +1,13 @@
 use solana_lite_rpc_core::{encoding::BASE64, structures::produced_block::TransactionInfo};
+use solana_sdk::slot_history::Slot;
+use tokio_postgres::types::ToSql;

-use super::postgres_session::SchemaSize;
+use super::postgres_session::PostgresSession;

 #[derive(Debug)]
 pub struct PostgresTransaction {
     pub signature: String,
+    pub slot: i64,
     pub err: Option<String>,
     pub cu_requested: Option<i32>,
     pub prioritization_fees: Option<i64>,
@@ -13,13 +16,10 @@ pub struct PostgresTransaction {
     pub message: String,
 }

-impl SchemaSize for PostgresTransaction {
-    const DEFAULT_SIZE: usize = 88 + (3 * 8) + 2;
-    const MAX_SIZE: usize = Self::DEFAULT_SIZE + (3 * 8);
-}
+const NB_ARUMENTS: usize = 8;

-impl From<&TransactionInfo> for PostgresTransaction {
-    fn from(value: &TransactionInfo) -> Self {
+impl PostgresTransaction {
+    pub fn new(value: &TransactionInfo, slot: Slot) -> Self {
         Self {
             signature: value.signature.clone(),
             err: value
@@ -32,6 +32,81 @@ impl From<&TransactionInfo> for PostgresTransaction {
             cu_consumed: value.cu_consumed.map(|x| x as i64),
             recent_blockhash: value.recent_blockhash.clone(),
             message: value.message.clone(),
+            slot: slot as i64,
         }
     }
+
+    pub fn create_statement(schema: &String) -> String {
+        format!(
+            "\
+            CREATE TABLE {}.TRANSACTIONS (
+                signature CHAR(88) NOT NULL,
+                slot BIGINT,
+                err STRING,
+                cu_requested BIGINT,
+                prioritization_fees BIGINT,
+                cu_consumed BIGINT,
+                recent_blockhash STRING NOT NULL,
+                message STRING NOT NULL,
+                PRIMARY KEY (signature)
+                CONSTRAINT fk_transactions FOREIGN KEY (slot) REFERENCES {}.BLOCKS(slot);
+            );
+            ",
+            schema, schema
+        )
+    }
+
+    pub async fn save_transactions(
+        postgres_session: &PostgresSession,
+        schema: &String,
+        transactions: &[Self],
+    ) -> anyhow::Result<()> {
+        let mut args: Vec<&(dyn ToSql + Sync)> =
+            Vec::with_capacity(NB_ARUMENTS * transactions.len());
+
+        for tx in transactions.iter() {
+            let PostgresTransaction {
+                signature,
+                slot,
+                err,
+                cu_requested,
+                prioritization_fees,
+                cu_consumed,
+                recent_blockhash,
+                message,
+            } = tx;
+
+            args.push(signature);
+            args.push(slot);
+            args.push(err);
+            args.push(cu_requested);
+            args.push(prioritization_fees);
+            args.push(cu_consumed);
+            args.push(recent_blockhash);
+            args.push(message);
+        }
+
+        let mut query = format!(
+            r#"
+            INSERT INTO {}.TRANSACTIONS
+            (signature, slot, err, cu_requested, prioritization_fees, cu_consumed, recent_blockhash, message)
+            VALUES
+            "#,
+            schema
+        );
+
+        PostgresSession::multiline_query(&mut query, NB_ARUMENTS, transactions.len(), &[]);
+        postgres_session.execute(&query, &args).await?;
+
+        Ok(())
+    }
+
+    pub async fn get(
+        postgres_session: PostgresSession,
+        schema: &String,
+        slot: Slot,
+    ) -> Vec<TransactionInfo> {
+        let statement = format!("SELECT signature, err, cu_requested, prioritization_fees, cu_consumed, recent_blockhash, message FROM {}.TRANSACTIONS WHERE SLOT = {}", schema, slot);
+        let _ = postgres_session.client.query(&statement, &[]).await;
+        todo!()
+    }
 }

View File

@@ -31,26 +31,33 @@ async fn inmemory_block_store_tests() {
     for i in 1..11 {
         store
             .save(create_test_block(i, CommitmentConfig::finalized()))
-            .await;
+            .await
+            .unwrap();
     }

     // check if 10 blocks are added
     for i in 1..11 {
-        assert!(store.get(i, RpcBlockConfig::default()).await.is_some());
+        assert!(store.get(i, RpcBlockConfig::default()).await.ok().is_some());
     }
     // add 11th block
     store
         .save(create_test_block(11, CommitmentConfig::finalized()))
-        .await;
+        .await
+        .unwrap();

     // can get 11th block
-    assert!(store.get(11, RpcBlockConfig::default()).await.is_some());
+    assert!(store
+        .get(11, RpcBlockConfig::default())
+        .await
+        .ok()
+        .is_some());
     // first block is removed
-    assert!(store.get(1, RpcBlockConfig::default()).await.is_none());
+    assert!(store.get(1, RpcBlockConfig::default()).await.ok().is_none());

     // cannot add old blocks
     store
         .save(create_test_block(1, CommitmentConfig::finalized()))
-        .await;
-    assert!(store.get(1, RpcBlockConfig::default()).await.is_none());
+        .await
+        .unwrap();
+    assert!(store.get(1, RpcBlockConfig::default()).await.ok().is_none());
 }

View File

@@ -37,63 +37,78 @@ async fn test_in_multiple_stategy_block_store() {
     block_storage
         .save(create_test_block(1235, CommitmentConfig::confirmed()))
-        .await;
+        .await
+        .unwrap();
     block_storage
         .save(create_test_block(1236, CommitmentConfig::confirmed()))
-        .await;
+        .await
+        .unwrap();

     assert!(block_storage
         .get(1235, RpcBlockConfig::default())
         .await
+        .ok()
         .is_some());
     assert!(block_storage
         .get(1236, RpcBlockConfig::default())
         .await
+        .ok()
         .is_some());
     assert!(persistent_store
         .get(1235, RpcBlockConfig::default())
         .await
+        .ok()
         .is_none());
     assert!(persistent_store
         .get(1236, RpcBlockConfig::default())
         .await
+        .ok()
         .is_none());

     block_storage
         .save(create_test_block(1235, CommitmentConfig::finalized()))
-        .await;
+        .await
+        .unwrap();
     block_storage
         .save(create_test_block(1236, CommitmentConfig::finalized()))
-        .await;
+        .await
+        .unwrap();
     block_storage
         .save(create_test_block(1237, CommitmentConfig::finalized()))
-        .await;
+        .await
+        .unwrap();

     assert!(block_storage
         .get(1235, RpcBlockConfig::default())
         .await
+        .ok()
         .is_some());
     assert!(block_storage
         .get(1236, RpcBlockConfig::default())
         .await
+        .ok()
         .is_some());
     assert!(block_storage
         .get(1237, RpcBlockConfig::default())
         .await
+        .ok()
         .is_some());
     assert!(persistent_store
         .get(1235, RpcBlockConfig::default())
         .await
+        .ok()
         .is_some());
     assert!(persistent_store
         .get(1236, RpcBlockConfig::default())
         .await
+        .ok()
         .is_some());
     assert!(persistent_store
         .get(1237, RpcBlockConfig::default())
         .await
+        .ok()
         .is_some());
-    assert!(block_storage.get_in_memory_block(1237).await.is_some());
+    assert!(block_storage.get_in_memory_block(1237).await.ok().is_some());

     // blocks are replaced by finalized blocks
     assert_eq!(
@@ -137,5 +152,6 @@ async fn test_in_multiple_stategy_block_store() {
     assert!(block_storage
         .get(1238, RpcBlockConfig::default())
         .await
+        .ok()
         .is_none());
 }

View File

@@ -329,7 +329,7 @@ impl LiteRpcServer for LiteBridge {
     ) -> crate::rpc::Result<Option<UiConfirmedBlock>> {
         let config = config.map_or(RpcBlockConfig::default(), |x| x.convert_to_current());
         let block = self.history.block_storage.get(slot, config).await;
-        if block.is_some() {
+        if block.is_ok() {
             // TO DO Convert to UIConfirmed Block
             Err(jsonrpsee::core::Error::HttpNotImplemented)
         } else {

View File

@@ -5,7 +5,7 @@ use log::{info, warn};
 use prometheus::{core::GenericGauge, opts, register_int_gauge};
 use solana_lite_rpc_core::{
     structures::notifications::{
-        BlockNotification, NotificationMsg, NotificationReciever, TransactionNotification,
+        NotificationMsg, NotificationReciever, TransactionNotification,
         TransactionUpdateNotification,
     },
     AnyhowJoinHandle,
@@ -86,32 +86,6 @@ impl From<&TransactionUpdateNotification> for PostgresTxUpdate {
     }
 }

-#[derive(Debug)]
-pub struct PostgresBlock {
-    pub slot: i64,                   // 8 bytes
-    pub leader_id: i64,              // 8 bytes
-    pub parent_slot: i64,            // 8 bytes
-    pub cluster_time: DateTime<Utc>, // 8 bytes
-    pub local_time: Option<DateTime<Utc>>,
-}
-
-impl SchemaSize for PostgresBlock {
-    const DEFAULT_SIZE: usize = 4 * 8;
-    const MAX_SIZE: usize = Self::DEFAULT_SIZE + 8;
-}
-
-impl From<BlockNotification> for PostgresBlock {
-    fn from(value: BlockNotification) -> Self {
-        Self {
-            slot: value.slot as i64,
-            leader_id: 0, // TODO
-            cluster_time: value.cluster_time,
-            local_time: value.local_time,
-            parent_slot: value.parent_slot as i64,
-        }
-    }
-}
-
 #[derive(Debug)]
 pub struct AccountAddr {
     pub id: u32,
@@ -180,59 +154,6 @@ async fn send_txs(postgres_session: &PostgresSession, txs: &[PostgresTx]) -> any
     Ok(())
 }

-async fn send_blocks(
-    postgres_session: &PostgresSession,
-    blocks: &[PostgresBlock],
-) -> anyhow::Result<()> {
-    const NUMBER_OF_ARGS: usize = 5;
-
-    if blocks.is_empty() {
-        return Ok(());
-    }
-
-    let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS * blocks.len());
-
-    for block in blocks.iter() {
-        let PostgresBlock {
-            slot,
-            leader_id,
-            parent_slot,
-            cluster_time,
-            local_time,
-        } = block;
-
-        args.push(slot);
-        args.push(leader_id);
-        args.push(parent_slot);
-        args.push(cluster_time);
-        args.push(local_time);
-    }
-
-    let mut query = String::from(
-        r#"
-        INSERT INTO lite_rpc.Blocks
-        (slot, leader_id, parent_slot, cluster_time, local_time)
-        VALUES
-        "#,
-    );
-
-    PostgresSession::multiline_query(&mut query, NUMBER_OF_ARGS, blocks.len(), &[]);
-
-    query.push_str(
-        r#"
-        ON CONFLICT (slot) DO UPDATE SET
-        leader_id = EXCLUDED.leader_id,
-        parent_slot = EXCLUDED.parent_slot,
-        cluster_time = EXCLUDED.cluster_time,
-        local_time = EXCLUDED.local_time
-        "#,
-    );
-
-    postgres_session.execute(&query, &args).await?;
-
-    Ok(())
-}
-
 async fn update_txs(
     postgres_session: &PostgresSession,
     txs: &[PostgresTxUpdate],
@@ -302,11 +223,9 @@ impl PostgresLogger {
         info!("start postgres worker");

         const TX_MAX_CAPACITY: usize = get_max_safe_inserts::<PostgresTx>();
-        const BLOCK_MAX_CAPACITY: usize = get_max_safe_inserts::<PostgresBlock>();
         const UPDATE_MAX_CAPACITY: usize = get_max_safe_updates::<PostgresTxUpdate>();

         let mut tx_batch: Vec<PostgresTx> = Vec::with_capacity(TX_MAX_CAPACITY);
-        let mut block_batch: Vec<PostgresBlock> = Vec::with_capacity(BLOCK_MAX_CAPACITY);
         let mut update_batch = Vec::<PostgresTxUpdate>::with_capacity(UPDATE_MAX_CAPACITY);

         let mut session_establish_error = false;
@@ -320,7 +239,6 @@ impl PostgresLogger {
                     // check for capacity
                     if tx_batch.len() >= TX_MAX_CAPACITY
-                        || block_batch.len() >= BLOCK_MAX_CAPACITY
                         || update_batch.len() >= UPDATE_MAX_CAPACITY
                     {
                         break;
@@ -335,8 +253,9 @@ impl PostgresLogger {
                             let mut tx = tx.iter().map(|x| x.into()).collect::<Vec<_>>();
                             tx_batch.append(&mut tx)
                         }
-                        NotificationMsg::BlockNotificationMsg(block) => {
-                            block_batch.push(block.into())
+                        NotificationMsg::BlockNotificationMsg(_) => {
+                            // ignore block storage as it has been moved to persistant history.
+                            continue;
                         }
                         NotificationMsg::UpdateTransactionMsg(update) => {
                             let mut update = update.iter().map(|x| x.into()).collect();
@@ -355,7 +274,7 @@ impl PostgresLogger {
             }

             // if there's nothing to do, yield for a brief time
-            if tx_batch.is_empty() && block_batch.is_empty() && update_batch.is_empty() {
+            if tx_batch.is_empty() && update_batch.is_empty() {
                 tokio::time::sleep(Duration::from_millis(10)).await;
                 continue;
             }
@@ -377,9 +296,8 @@ impl PostgresLogger {
             POSTGRES_SESSION_ERRORS.set(0);

             // write to database when a successful connection is made
-            let (res_txs, res_blocks, res_update) = join!(
+            let (res_txs, res_update) = join!(
                 send_txs(&session, &tx_batch),
-                send_blocks(&session, &block_batch),
                 update_txs(&session, &update_batch)
             );
@@ -392,14 +310,6 @@ impl PostgresLogger {
             } else {
                 tx_batch.clear();
             }
-            if let Err(err) = res_blocks {
-                warn!(
-                    "Error sending block batch ({:?}) to postgres {err:?}",
-                    block_batch.len()
-                );
-            } else {
-                block_batch.clear();
-            }
             if let Err(err) = res_update {
                 warn!(
                     "Error sending update batch ({:?}) to postgres {err:?}",