diff --git a/core/src/traits/block_storage_interface.rs b/core/src/traits/block_storage_interface.rs index 1db772e2..5aacd184 100644 --- a/core/src/traits/block_storage_interface.rs +++ b/core/src/traits/block_storage_interface.rs @@ -1,4 +1,5 @@ use crate::structures::produced_block::ProducedBlock; +use anyhow::Result; use async_trait::async_trait; use solana_rpc_client_api::config::RpcBlockConfig; use solana_sdk::slot_history::Slot; @@ -7,11 +8,12 @@ use std::{ops::Range, sync::Arc}; #[async_trait] pub trait BlockStorageInterface: Send + Sync { // will save a block - async fn save(&self, block: ProducedBlock); + async fn save(&self, block: ProducedBlock) -> Result<()>; // will get a block - async fn get(&self, slot: Slot, config: RpcBlockConfig) -> Option; + async fn get(&self, slot: Slot, config: RpcBlockConfig) -> Result; // will get range of slots that are stored in the storage async fn get_slot_range(&self) -> Range; } pub type BlockStorageImpl = Arc; +pub const BLOCK_NOT_FOUND: &str = "Block not found"; diff --git a/history/src/block_stores/inmemory_block_store.rs b/history/src/block_stores/inmemory_block_store.rs index d97b28c0..bfacd419 100644 --- a/history/src/block_stores/inmemory_block_store.rs +++ b/history/src/block_stores/inmemory_block_store.rs @@ -1,7 +1,8 @@ use async_trait::async_trait; use solana_lite_rpc_core::{ - commitment_utils::Commitment, structures::produced_block::ProducedBlock, - traits::block_storage_interface::BlockStorageInterface, + commitment_utils::Commitment, + structures::produced_block::ProducedBlock, + traits::block_storage_interface::{BlockStorageInterface, BLOCK_NOT_FOUND}, }; use solana_rpc_client_api::config::RpcBlockConfig; use solana_sdk::slot_history::Slot; @@ -52,12 +53,18 @@ impl InmemoryBlockStore { #[async_trait] impl BlockStorageInterface for InmemoryBlockStore { - async fn save(&self, block: ProducedBlock) { + async fn save(&self, block: ProducedBlock) -> anyhow::Result<()> { self.store(block).await; + Ok(()) } - async fn get(&self, slot: Slot, _: RpcBlockConfig) -> Option { - self.block_storage.read().await.get(&slot).cloned() + async fn get(&self, slot: Slot, _: RpcBlockConfig) -> anyhow::Result { + self.block_storage + .read() + .await + .get(&slot) + .cloned() + .ok_or(anyhow::Error::msg(BLOCK_NOT_FOUND)) } async fn get_slot_range(&self) -> Range { diff --git a/history/src/block_stores/multiple_strategy_block_store.rs b/history/src/block_stores/multiple_strategy_block_store.rs index 2e80dc12..5f9b7fb9 100644 --- a/history/src/block_stores/multiple_strategy_block_store.rs +++ b/history/src/block_stores/multiple_strategy_block_store.rs @@ -4,11 +4,12 @@ // Fetches legacy blocks from faithful use crate::block_stores::inmemory_block_store::InmemoryBlockStore; +use anyhow::{bail, Result}; use async_trait::async_trait; use solana_lite_rpc_core::{ commitment_utils::Commitment, structures::produced_block::ProducedBlock, - traits::block_storage_interface::{BlockStorageImpl, BlockStorageInterface}, + traits::block_storage_interface::{BlockStorageImpl, BlockStorageInterface, BLOCK_NOT_FOUND}, }; use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_rpc_client_api::config::RpcBlockConfig; @@ -42,7 +43,7 @@ impl MultipleStrategyBlockStorage { } } - pub async fn get_in_memory_block(&self, slot: Slot) -> Option { + pub async fn get_in_memory_block(&self, slot: Slot) -> anyhow::Result { self.inmemory_for_storage .get( slot, @@ -60,46 +61,47 @@ impl MultipleStrategyBlockStorage { #[async_trait] impl BlockStorageInterface for MultipleStrategyBlockStorage { - async fn save(&self, block: ProducedBlock) { + async fn save(&self, block: ProducedBlock) -> Result<()> { let slot = block.slot; let commitment = Commitment::from(block.commitment_config); match commitment { Commitment::Confirmed | Commitment::Processed => { - self.inmemory_for_storage.save(block).await; + self.inmemory_for_storage.save(block).await?; } Commitment::Finalized => { let block_in_mem = self.get_in_memory_block(block.slot).await; match block_in_mem { - Some(block_in_mem) => { + Ok(block_in_mem) => { // check if inmemory blockhash is same as finalized, update it if they are not // we can have two machines with same identity publishing two different blocks on same slot if block_in_mem.blockhash != block.blockhash { - self.inmemory_for_storage.save(block.clone()).await; + self.inmemory_for_storage.save(block.clone()).await?; } } - None => self.inmemory_for_storage.save(block.clone()).await, + Err(_) => self.inmemory_for_storage.save(block.clone()).await?, } - self.persistent_block_storage.save(block).await; + self.persistent_block_storage.save(block).await?; } }; if slot > self.last_confirmed_slot.load(Ordering::Relaxed) { self.last_confirmed_slot.store(slot, Ordering::Relaxed); } + Ok(()) } async fn get( &self, slot: solana_sdk::slot_history::Slot, config: RpcBlockConfig, - ) -> Option { + ) -> Result { let last_confirmed_slot = self.last_confirmed_slot.load(Ordering::Relaxed); if slot > last_confirmed_slot { - None + bail!(BLOCK_NOT_FOUND); } else { let range = self.inmemory_for_storage.get_slot_range().await; if range.contains(&slot) { let block = self.inmemory_for_storage.get(slot, config).await; - if block.is_some() { + if block.is_ok() { return block; } } @@ -113,15 +115,15 @@ impl BlockStorageInterface for MultipleStrategyBlockStorage { .get_block_with_config(slot, config) .await { - Ok(block) => Some(ProducedBlock::from_ui_block( + Ok(block) => Ok(ProducedBlock::from_ui_block( block, slot, CommitmentConfig::finalized(), )), - Err(_) => None, + Err(_) => bail!(BLOCK_NOT_FOUND), } } else { - None + bail!(BLOCK_NOT_FOUND); } } } diff --git a/history/src/block_stores/postgres_block_store.rs b/history/src/block_stores/postgres_block_store.rs index 4300afbb..a45edc3b 100644 --- a/history/src/block_stores/postgres_block_store.rs +++ b/history/src/block_stores/postgres_block_store.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use anyhow::Result; use async_trait::async_trait; use itertools::Itertools; use solana_lite_rpc_core::{ @@ -29,7 +30,7 @@ pub struct PostgresBlockStore { } impl PostgresBlockStore { - pub async fn start_new_epoch(&self, schema: &String) { + pub async fn start_new_epoch(&self, schema: &String) -> Result<()> { // create schema for new epoch let session = self .session_cache @@ -38,29 +39,22 @@ impl PostgresBlockStore { .expect("should get new postgres session"); let statement = format!("CREATE SCHEMA {};", schema); - if let Err(e) = session.execute(&statement, &[]).await { - log::error!("Error creating a schema for {} error {}", schema, e); - return; - } + session.execute(&statement, &[]).await?; // Create blocks table let statement = PostgresBlock::create_statement(schema); - if let Err(e) = session.execute(&statement, &[]).await { - log::error!("Error creating a blocks table for {} error {}", schema, e); - return; - } + session.execute(&statement, &[]).await?; // create transaction table let statement = PostgresTransaction::create_statement(schema); - if let Err(e) = session.execute(&statement, &[]).await { - log::error!("Error creating a blocks table for {} error {}", schema, e); - } + session.execute(&statement, &[]).await?; + Ok(()) } } #[async_trait] impl BlockStorageInterface for PostgresBlockStore { - async fn save(&self, block: ProducedBlock) { + async fn save(&self, block: ProducedBlock) -> Result<()> { let PostgresData { current_epoch, .. } = { *self.postgres_data.read().await }; let slot = block.slot; @@ -75,7 +69,7 @@ impl BlockStorageInterface for PostgresBlockStore { let schema = format!("EPOCH_{}", epoch.epoch); if current_epoch == 0 || current_epoch < epoch.epoch { self.postgres_data.write().await.current_epoch = epoch.epoch; - self.start_new_epoch(&schema).await; + self.start_new_epoch(&schema).await?; } const NUMBER_OF_TRANSACTION: usize = 20; @@ -88,15 +82,16 @@ impl BlockStorageInterface for PostgresBlockStore { .await .expect("should get new postgres session"); for chunk in chunks { - PostgresTransaction::save_transactions(&session, &schema, chunk).await; + PostgresTransaction::save_transactions(&session, &schema, chunk).await?; } - postgres_block.save(&session, &schema).await; + postgres_block.save(&session, &schema).await?; + Ok(()) } - async fn get(&self, slot: Slot, _config: RpcBlockConfig) -> Option { + async fn get(&self, slot: Slot, _config: RpcBlockConfig) -> Result { let range = self.get_slot_range().await; if range.contains(&slot) {} - None + todo!() } async fn get_slot_range(&self) -> std::ops::Range { diff --git a/history/src/postgres/postgres_block.rs b/history/src/postgres/postgres_block.rs index 70a1c480..74bb3c47 100644 --- a/history/src/postgres/postgres_block.rs +++ b/history/src/postgres/postgres_block.rs @@ -55,7 +55,11 @@ impl PostgresBlock { ) } - pub async fn save(&self, postgres_session: &PostgresSession, schema: &String) { + pub async fn save( + &self, + postgres_session: &PostgresSession, + schema: &String, + ) -> anyhow::Result<()> { let mut query = format!( r#" INSERT INTO {}.BLOCKS (slot, blockhash, block_height, parent_slot, block_time, previous_blockhash, rewards) VALUES @@ -73,13 +77,7 @@ impl PostgresBlock { args.push(&self.rewards); PostgresSession::multiline_query(&mut query, NB_ARUMENTS, 1, &[]); - if let Err(e) = postgres_session.execute(&query, &args).await { - log::error!( - "Could not save block {} slot, schema {}, error {}", - self.slot, - schema, - e - ); - } + postgres_session.execute(&query, &args).await?; + Ok(()) } } diff --git a/history/src/postgres/postgres_transaction.rs b/history/src/postgres/postgres_transaction.rs index 56376175..b559c4a5 100644 --- a/history/src/postgres/postgres_transaction.rs +++ b/history/src/postgres/postgres_transaction.rs @@ -1,4 +1,3 @@ -use log::error; use solana_lite_rpc_core::{encoding::BASE64, structures::produced_block::TransactionInfo}; use solana_sdk::slot_history::Slot; use tokio_postgres::types::ToSql; @@ -61,7 +60,7 @@ impl PostgresTransaction { postgres_session: &PostgresSession, schema: &String, transactions: &[Self], - ) { + ) -> anyhow::Result<()> { let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NB_ARUMENTS * transactions.len()); @@ -97,12 +96,8 @@ impl PostgresTransaction { ); PostgresSession::multiline_query(&mut query, NB_ARUMENTS, transactions.len(), &[]); - if let Err(e) = postgres_session.execute(&query, &args).await { - error!( - "Could not save transaction batch schema {}, error {}", - schema, e - ); - } + postgres_session.execute(&query, &args).await?; + Ok(()) } pub async fn get( diff --git a/history/tests/inmemory_block_store_tests.rs b/history/tests/inmemory_block_store_tests.rs index 543d39a8..8e2dbeb5 100644 --- a/history/tests/inmemory_block_store_tests.rs +++ b/history/tests/inmemory_block_store_tests.rs @@ -31,26 +31,33 @@ async fn inmemory_block_store_tests() { for i in 1..11 { store .save(create_test_block(i, CommitmentConfig::finalized())) - .await; + .await + .unwrap(); } // check if 10 blocks are added for i in 1..11 { - assert!(store.get(i, RpcBlockConfig::default()).await.is_some()); + assert!(store.get(i, RpcBlockConfig::default()).await.ok().is_some()); } // add 11th block store .save(create_test_block(11, CommitmentConfig::finalized())) - .await; + .await + .unwrap(); // can get 11th block - assert!(store.get(11, RpcBlockConfig::default()).await.is_some()); + assert!(store + .get(11, RpcBlockConfig::default()) + .await + .ok() + .is_some()); // first block is removed - assert!(store.get(1, RpcBlockConfig::default()).await.is_none()); + assert!(store.get(1, RpcBlockConfig::default()).await.ok().is_none()); // cannot add old blocks store .save(create_test_block(1, CommitmentConfig::finalized())) - .await; - assert!(store.get(1, RpcBlockConfig::default()).await.is_none()); + .await + .unwrap(); + assert!(store.get(1, RpcBlockConfig::default()).await.ok().is_none()); } diff --git a/history/tests/multiple_strategy_block_store_tests.rs b/history/tests/multiple_strategy_block_store_tests.rs index 6a908501..3e0304eb 100644 --- a/history/tests/multiple_strategy_block_store_tests.rs +++ b/history/tests/multiple_strategy_block_store_tests.rs @@ -37,63 +37,78 @@ async fn test_in_multiple_stategy_block_store() { block_storage .save(create_test_block(1235, CommitmentConfig::confirmed())) - .await; + .await + .unwrap(); block_storage .save(create_test_block(1236, CommitmentConfig::confirmed())) - .await; + .await + .unwrap(); assert!(block_storage .get(1235, RpcBlockConfig::default()) .await + .ok() .is_some()); assert!(block_storage .get(1236, RpcBlockConfig::default()) .await + .ok() .is_some()); assert!(persistent_store .get(1235, RpcBlockConfig::default()) .await + .ok() .is_none()); assert!(persistent_store .get(1236, RpcBlockConfig::default()) .await + .ok() .is_none()); block_storage .save(create_test_block(1235, CommitmentConfig::finalized())) - .await; + .await + .unwrap(); block_storage .save(create_test_block(1236, CommitmentConfig::finalized())) - .await; + .await + .unwrap(); block_storage .save(create_test_block(1237, CommitmentConfig::finalized())) - .await; + .await + .unwrap(); assert!(block_storage .get(1235, RpcBlockConfig::default()) .await + .ok() .is_some()); assert!(block_storage .get(1236, RpcBlockConfig::default()) .await + .ok() .is_some()); assert!(block_storage .get(1237, RpcBlockConfig::default()) .await + .ok() .is_some()); assert!(persistent_store .get(1235, RpcBlockConfig::default()) .await + .ok() .is_some()); assert!(persistent_store .get(1236, RpcBlockConfig::default()) .await + .ok() .is_some()); assert!(persistent_store .get(1237, RpcBlockConfig::default()) .await + .ok() .is_some()); - assert!(block_storage.get_in_memory_block(1237).await.is_some()); + assert!(block_storage.get_in_memory_block(1237).await.ok().is_some()); // blocks are replaced by finalized blocks assert_eq!( @@ -137,5 +152,6 @@ async fn test_in_multiple_stategy_block_store() { assert!(block_storage .get(1238, RpcBlockConfig::default()) .await + .ok() .is_none()); } diff --git a/lite-rpc/src/bridge.rs b/lite-rpc/src/bridge.rs index a41d2cb0..6497d4ff 100644 --- a/lite-rpc/src/bridge.rs +++ b/lite-rpc/src/bridge.rs @@ -329,7 +329,7 @@ impl LiteRpcServer for LiteBridge { ) -> crate::rpc::Result> { let config = config.map_or(RpcBlockConfig::default(), |x| x.convert_to_current()); let block = self.history.block_storage.get(slot, config).await; - if block.is_some() { + if block.is_ok() { // TO DO Convert to UIConfirmed Block Err(jsonrpsee::core::Error::HttpNotImplemented) } else {