after musitdev review

This commit is contained in:
godmodegalactus 2023-10-06 10:44:39 +02:00
parent 14f26534a7
commit fe3509c7ee
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
9 changed files with 92 additions and 70 deletions

View File

@ -1,4 +1,5 @@
use crate::structures::produced_block::ProducedBlock;
use anyhow::Result;
use async_trait::async_trait;
use solana_rpc_client_api::config::RpcBlockConfig;
use solana_sdk::slot_history::Slot;
@ -7,11 +8,12 @@ use std::{ops::Range, sync::Arc};
#[async_trait]
pub trait BlockStorageInterface: Send + Sync {
// will save a block
async fn save(&self, block: ProducedBlock);
async fn save(&self, block: ProducedBlock) -> Result<()>;
// will get a block
async fn get(&self, slot: Slot, config: RpcBlockConfig) -> Option<ProducedBlock>;
async fn get(&self, slot: Slot, config: RpcBlockConfig) -> Result<ProducedBlock>;
// will get range of slots that are stored in the storage
async fn get_slot_range(&self) -> Range<Slot>;
}
pub type BlockStorageImpl = Arc<dyn BlockStorageInterface>;
pub const BLOCK_NOT_FOUND: &str = "Block not found";

View File

@ -1,7 +1,8 @@
use async_trait::async_trait;
use solana_lite_rpc_core::{
commitment_utils::Commitment, structures::produced_block::ProducedBlock,
traits::block_storage_interface::BlockStorageInterface,
commitment_utils::Commitment,
structures::produced_block::ProducedBlock,
traits::block_storage_interface::{BlockStorageInterface, BLOCK_NOT_FOUND},
};
use solana_rpc_client_api::config::RpcBlockConfig;
use solana_sdk::slot_history::Slot;
@ -52,12 +53,18 @@ impl InmemoryBlockStore {
#[async_trait]
impl BlockStorageInterface for InmemoryBlockStore {
async fn save(&self, block: ProducedBlock) {
async fn save(&self, block: ProducedBlock) -> anyhow::Result<()> {
self.store(block).await;
Ok(())
}
async fn get(&self, slot: Slot, _: RpcBlockConfig) -> Option<ProducedBlock> {
self.block_storage.read().await.get(&slot).cloned()
async fn get(&self, slot: Slot, _: RpcBlockConfig) -> anyhow::Result<ProducedBlock> {
self.block_storage
.read()
.await
.get(&slot)
.cloned()
.ok_or(anyhow::Error::msg(BLOCK_NOT_FOUND))
}
async fn get_slot_range(&self) -> Range<Slot> {

View File

@ -4,11 +4,12 @@
// Fetches legacy blocks from faithful
use crate::block_stores::inmemory_block_store::InmemoryBlockStore;
use anyhow::{bail, Result};
use async_trait::async_trait;
use solana_lite_rpc_core::{
commitment_utils::Commitment,
structures::produced_block::ProducedBlock,
traits::block_storage_interface::{BlockStorageImpl, BlockStorageInterface},
traits::block_storage_interface::{BlockStorageImpl, BlockStorageInterface, BLOCK_NOT_FOUND},
};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_rpc_client_api::config::RpcBlockConfig;
@ -42,7 +43,7 @@ impl MultipleStrategyBlockStorage {
}
}
pub async fn get_in_memory_block(&self, slot: Slot) -> Option<ProducedBlock> {
pub async fn get_in_memory_block(&self, slot: Slot) -> anyhow::Result<ProducedBlock> {
self.inmemory_for_storage
.get(
slot,
@ -60,46 +61,47 @@ impl MultipleStrategyBlockStorage {
#[async_trait]
impl BlockStorageInterface for MultipleStrategyBlockStorage {
async fn save(&self, block: ProducedBlock) {
async fn save(&self, block: ProducedBlock) -> Result<()> {
let slot = block.slot;
let commitment = Commitment::from(block.commitment_config);
match commitment {
Commitment::Confirmed | Commitment::Processed => {
self.inmemory_for_storage.save(block).await;
self.inmemory_for_storage.save(block).await?;
}
Commitment::Finalized => {
let block_in_mem = self.get_in_memory_block(block.slot).await;
match block_in_mem {
Some(block_in_mem) => {
Ok(block_in_mem) => {
// check if inmemory blockhash is same as finalized, update it if they are not
// we can have two machines with same identity publishing two different blocks on same slot
if block_in_mem.blockhash != block.blockhash {
self.inmemory_for_storage.save(block.clone()).await;
self.inmemory_for_storage.save(block.clone()).await?;
}
}
None => self.inmemory_for_storage.save(block.clone()).await,
Err(_) => self.inmemory_for_storage.save(block.clone()).await?,
}
self.persistent_block_storage.save(block).await;
self.persistent_block_storage.save(block).await?;
}
};
if slot > self.last_confirmed_slot.load(Ordering::Relaxed) {
self.last_confirmed_slot.store(slot, Ordering::Relaxed);
}
Ok(())
}
async fn get(
&self,
slot: solana_sdk::slot_history::Slot,
config: RpcBlockConfig,
) -> Option<ProducedBlock> {
) -> Result<ProducedBlock> {
let last_confirmed_slot = self.last_confirmed_slot.load(Ordering::Relaxed);
if slot > last_confirmed_slot {
None
bail!(BLOCK_NOT_FOUND);
} else {
let range = self.inmemory_for_storage.get_slot_range().await;
if range.contains(&slot) {
let block = self.inmemory_for_storage.get(slot, config).await;
if block.is_some() {
if block.is_ok() {
return block;
}
}
@ -113,15 +115,15 @@ impl BlockStorageInterface for MultipleStrategyBlockStorage {
.get_block_with_config(slot, config)
.await
{
Ok(block) => Some(ProducedBlock::from_ui_block(
Ok(block) => Ok(ProducedBlock::from_ui_block(
block,
slot,
CommitmentConfig::finalized(),
)),
Err(_) => None,
Err(_) => bail!(BLOCK_NOT_FOUND),
}
} else {
None
bail!(BLOCK_NOT_FOUND);
}
}
}

View File

@ -1,5 +1,6 @@
use std::sync::Arc;
use anyhow::Result;
use async_trait::async_trait;
use itertools::Itertools;
use solana_lite_rpc_core::{
@ -29,7 +30,7 @@ pub struct PostgresBlockStore {
}
impl PostgresBlockStore {
pub async fn start_new_epoch(&self, schema: &String) {
pub async fn start_new_epoch(&self, schema: &String) -> Result<()> {
// create schema for new epoch
let session = self
.session_cache
@ -38,29 +39,22 @@ impl PostgresBlockStore {
.expect("should get new postgres session");
let statement = format!("CREATE SCHEMA {};", schema);
if let Err(e) = session.execute(&statement, &[]).await {
log::error!("Error creating a schema for {} error {}", schema, e);
return;
}
session.execute(&statement, &[]).await?;
// Create blocks table
let statement = PostgresBlock::create_statement(schema);
if let Err(e) = session.execute(&statement, &[]).await {
log::error!("Error creating a blocks table for {} error {}", schema, e);
return;
}
session.execute(&statement, &[]).await?;
// create transaction table
let statement = PostgresTransaction::create_statement(schema);
if let Err(e) = session.execute(&statement, &[]).await {
log::error!("Error creating a blocks table for {} error {}", schema, e);
}
session.execute(&statement, &[]).await?;
Ok(())
}
}
#[async_trait]
impl BlockStorageInterface for PostgresBlockStore {
async fn save(&self, block: ProducedBlock) {
async fn save(&self, block: ProducedBlock) -> Result<()> {
let PostgresData { current_epoch, .. } = { *self.postgres_data.read().await };
let slot = block.slot;
@ -75,7 +69,7 @@ impl BlockStorageInterface for PostgresBlockStore {
let schema = format!("EPOCH_{}", epoch.epoch);
if current_epoch == 0 || current_epoch < epoch.epoch {
self.postgres_data.write().await.current_epoch = epoch.epoch;
self.start_new_epoch(&schema).await;
self.start_new_epoch(&schema).await?;
}
const NUMBER_OF_TRANSACTION: usize = 20;
@ -88,15 +82,16 @@ impl BlockStorageInterface for PostgresBlockStore {
.await
.expect("should get new postgres session");
for chunk in chunks {
PostgresTransaction::save_transactions(&session, &schema, chunk).await;
PostgresTransaction::save_transactions(&session, &schema, chunk).await?;
}
postgres_block.save(&session, &schema).await;
postgres_block.save(&session, &schema).await?;
Ok(())
}
async fn get(&self, slot: Slot, _config: RpcBlockConfig) -> Option<ProducedBlock> {
async fn get(&self, slot: Slot, _config: RpcBlockConfig) -> Result<ProducedBlock> {
let range = self.get_slot_range().await;
if range.contains(&slot) {}
None
todo!()
}
async fn get_slot_range(&self) -> std::ops::Range<Slot> {

View File

@ -55,7 +55,11 @@ impl PostgresBlock {
)
}
pub async fn save(&self, postgres_session: &PostgresSession, schema: &String) {
pub async fn save(
&self,
postgres_session: &PostgresSession,
schema: &String,
) -> anyhow::Result<()> {
let mut query = format!(
r#"
INSERT INTO {}.BLOCKS (slot, blockhash, block_height, parent_slot, block_time, previous_blockhash, rewards) VALUES
@ -73,13 +77,7 @@ impl PostgresBlock {
args.push(&self.rewards);
PostgresSession::multiline_query(&mut query, NB_ARUMENTS, 1, &[]);
if let Err(e) = postgres_session.execute(&query, &args).await {
log::error!(
"Could not save block {} slot, schema {}, error {}",
self.slot,
schema,
e
);
}
postgres_session.execute(&query, &args).await?;
Ok(())
}
}

View File

@ -1,4 +1,3 @@
use log::error;
use solana_lite_rpc_core::{encoding::BASE64, structures::produced_block::TransactionInfo};
use solana_sdk::slot_history::Slot;
use tokio_postgres::types::ToSql;
@ -61,7 +60,7 @@ impl PostgresTransaction {
postgres_session: &PostgresSession,
schema: &String,
transactions: &[Self],
) {
) -> anyhow::Result<()> {
let mut args: Vec<&(dyn ToSql + Sync)> =
Vec::with_capacity(NB_ARUMENTS * transactions.len());
@ -97,12 +96,8 @@ impl PostgresTransaction {
);
PostgresSession::multiline_query(&mut query, NB_ARUMENTS, transactions.len(), &[]);
if let Err(e) = postgres_session.execute(&query, &args).await {
error!(
"Could not save transaction batch schema {}, error {}",
schema, e
);
}
postgres_session.execute(&query, &args).await?;
Ok(())
}
pub async fn get(

View File

@ -31,26 +31,33 @@ async fn inmemory_block_store_tests() {
for i in 1..11 {
store
.save(create_test_block(i, CommitmentConfig::finalized()))
.await;
.await
.unwrap();
}
// check if 10 blocks are added
for i in 1..11 {
assert!(store.get(i, RpcBlockConfig::default()).await.is_some());
assert!(store.get(i, RpcBlockConfig::default()).await.ok().is_some());
}
// add 11th block
store
.save(create_test_block(11, CommitmentConfig::finalized()))
.await;
.await
.unwrap();
// can get 11th block
assert!(store.get(11, RpcBlockConfig::default()).await.is_some());
assert!(store
.get(11, RpcBlockConfig::default())
.await
.ok()
.is_some());
// first block is removed
assert!(store.get(1, RpcBlockConfig::default()).await.is_none());
assert!(store.get(1, RpcBlockConfig::default()).await.ok().is_none());
// cannot add old blocks
store
.save(create_test_block(1, CommitmentConfig::finalized()))
.await;
assert!(store.get(1, RpcBlockConfig::default()).await.is_none());
.await
.unwrap();
assert!(store.get(1, RpcBlockConfig::default()).await.ok().is_none());
}

View File

@ -37,63 +37,78 @@ async fn test_in_multiple_stategy_block_store() {
block_storage
.save(create_test_block(1235, CommitmentConfig::confirmed()))
.await;
.await
.unwrap();
block_storage
.save(create_test_block(1236, CommitmentConfig::confirmed()))
.await;
.await
.unwrap();
assert!(block_storage
.get(1235, RpcBlockConfig::default())
.await
.ok()
.is_some());
assert!(block_storage
.get(1236, RpcBlockConfig::default())
.await
.ok()
.is_some());
assert!(persistent_store
.get(1235, RpcBlockConfig::default())
.await
.ok()
.is_none());
assert!(persistent_store
.get(1236, RpcBlockConfig::default())
.await
.ok()
.is_none());
block_storage
.save(create_test_block(1235, CommitmentConfig::finalized()))
.await;
.await
.unwrap();
block_storage
.save(create_test_block(1236, CommitmentConfig::finalized()))
.await;
.await
.unwrap();
block_storage
.save(create_test_block(1237, CommitmentConfig::finalized()))
.await;
.await
.unwrap();
assert!(block_storage
.get(1235, RpcBlockConfig::default())
.await
.ok()
.is_some());
assert!(block_storage
.get(1236, RpcBlockConfig::default())
.await
.ok()
.is_some());
assert!(block_storage
.get(1237, RpcBlockConfig::default())
.await
.ok()
.is_some());
assert!(persistent_store
.get(1235, RpcBlockConfig::default())
.await
.ok()
.is_some());
assert!(persistent_store
.get(1236, RpcBlockConfig::default())
.await
.ok()
.is_some());
assert!(persistent_store
.get(1237, RpcBlockConfig::default())
.await
.ok()
.is_some());
assert!(block_storage.get_in_memory_block(1237).await.is_some());
assert!(block_storage.get_in_memory_block(1237).await.ok().is_some());
// blocks are replaced by finalized blocks
assert_eq!(
@ -137,5 +152,6 @@ async fn test_in_multiple_stategy_block_store() {
assert!(block_storage
.get(1238, RpcBlockConfig::default())
.await
.ok()
.is_none());
}

View File

@ -329,7 +329,7 @@ impl LiteRpcServer for LiteBridge {
) -> crate::rpc::Result<Option<UiConfirmedBlock>> {
let config = config.map_or(RpcBlockConfig::default(), |x| x.convert_to_current());
let block = self.history.block_storage.get(slot, config).await;
if block.is_some() {
if block.is_ok() {
// TO DO Convert to UIConfirmed Block
Err(jsonrpsee::core::Error::HttpNotImplemented)
} else {