implementing history, block storage method

changes after groovies review
This commit is contained in:
godmodegalactus 2023-09-21 09:56:51 +02:00
parent 8a9a6e2d67
commit 7ef29e1af6
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
24 changed files with 719 additions and 165 deletions

15
Cargo.lock generated
View File

@ -2458,6 +2458,7 @@ dependencies = [
"serde_json",
"solana-lite-rpc-cluster-endpoints",
"solana-lite-rpc-core",
"solana-lite-rpc-history",
"solana-lite-rpc-services",
"solana-rpc-client",
"solana-rpc-client-api",
@ -4291,6 +4292,20 @@ dependencies = [
"tokio",
]
[[package]]
name = "solana-lite-rpc-history"
version = "0.2.3"
dependencies = [
"async-trait",
"dashmap",
"solana-lite-rpc-core",
"solana-rpc-client",
"solana-rpc-client-api",
"solana-sdk",
"solana-transaction-status",
"tokio",
]
[[package]]
name = "solana-lite-rpc-quic-forward-proxy"
version = "0.1.0"

View File

@ -6,6 +6,7 @@ members = [
"quic-forward-proxy",
"quic-forward-proxy-integration-test",
"cluster-endpoints",
"history",
"bench"
]
@ -55,6 +56,7 @@ rustls = { version = "=0.20.8", default-features = false }
solana-lite-rpc-services = {path = "services", version="0.2.3"}
solana-lite-rpc-core = {path = "core", version="0.2.3"}
solana-lite-rpc-cluster-endpoints = {path = "cluster-endpoints", version="0.2.3"}
solana-lite-rpc-history = {path = "history", version="0.2.3"}
async-trait = "0.1.68"
yellowstone-grpc-client = "1.9.0"

View File

@ -1,23 +1,12 @@
use anyhow::Context;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_lite_rpc_core::{
structures::{
produced_block::{ProducedBlock, TransactionInfo},
slot_notification::SlotNotification,
},
structures::{produced_block::ProducedBlock, slot_notification::SlotNotification},
AnyhowJoinHandle,
};
use solana_rpc_client_api::config::RpcBlockConfig;
use solana_sdk::{
borsh0_10::try_from_slice_unchecked,
commitment_config::CommitmentConfig,
compute_budget::{self, ComputeBudgetInstruction},
slot_history::Slot,
};
use solana_transaction_status::{
option_serializer::OptionSerializer, RewardType, TransactionDetails, UiTransactionEncoding,
UiTransactionStatusMeta,
};
use solana_sdk::{commitment_config::CommitmentConfig, slot_history::Slot};
use solana_transaction_status::{TransactionDetails, UiTransactionEncoding};
use std::{
sync::{atomic::AtomicU64, Arc},
time::Duration,
@ -42,126 +31,10 @@ pub async fn process_block(
)
.await;
if block.is_err() {
return None;
match block {
Ok(block) => Some(ProducedBlock::from_ui_block(block, slot, commitment_config)),
Err(_) => None,
}
let block = block.unwrap();
let Some(block_height) = block.block_height else {
return None;
};
let Some(txs) = block.transactions else {
return None;
};
let blockhash = block.blockhash;
let parent_slot = block.parent_slot;
let txs = txs
.into_iter()
.filter_map(|tx| {
let Some(UiTransactionStatusMeta {
err,
compute_units_consumed,
..
}) = tx.meta
else {
log::info!("Tx with no meta");
return None;
};
let Some(tx) = tx.transaction.decode() else {
log::info!("Tx could not be decoded");
return None;
};
let signature = tx.signatures[0].to_string();
let cu_consumed = match compute_units_consumed {
OptionSerializer::Some(cu_consumed) => Some(cu_consumed),
_ => None,
};
let legacy_compute_budget = tx.message.instructions().iter().find_map(|i| {
if i.program_id(tx.message.static_account_keys())
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::RequestUnitsDeprecated {
units,
additional_fee,
}) = try_from_slice_unchecked(i.data.as_slice())
{
return Some((units, additional_fee));
}
}
None
});
let mut cu_requested = tx.message.instructions().iter().find_map(|i| {
if i.program_id(tx.message.static_account_keys())
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::SetComputeUnitLimit(limit)) =
try_from_slice_unchecked(i.data.as_slice())
{
return Some(limit);
}
}
None
});
let mut prioritization_fees = tx.message.instructions().iter().find_map(|i| {
if i.program_id(tx.message.static_account_keys())
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::SetComputeUnitPrice(price)) =
try_from_slice_unchecked(i.data.as_slice())
{
return Some(price);
}
}
None
});
if let Some((units, additional_fee)) = legacy_compute_budget {
cu_requested = Some(units);
if additional_fee > 0 {
prioritization_fees = Some(((units * 1000) / additional_fee).into())
}
};
Some(TransactionInfo {
signature,
err,
cu_requested,
prioritization_fees,
cu_consumed,
})
})
.collect();
let leader_id = if let Some(rewards) = block.rewards {
rewards
.iter()
.find(|reward| Some(RewardType::Fee) == reward.reward_type)
.map(|leader_reward| leader_reward.pubkey.clone())
} else {
None
};
let block_time = block.block_time.unwrap_or(0) as u64;
Some(ProducedBlock {
txs,
block_height,
leader_id,
blockhash,
parent_slot,
block_time,
slot,
commitment_config,
})
}
pub fn poll_block(

View File

@ -0,0 +1,46 @@
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
#[repr(C)]
pub enum Commitment {
Processed = 0,
Confirmed = 1,
Finalized = 2,
}
impl From<CommitmentLevel> for Commitment {
#[allow(deprecated)]
fn from(value: CommitmentLevel) -> Self {
match value {
CommitmentLevel::Finalized | CommitmentLevel::Root | CommitmentLevel::Max => {
Commitment::Finalized
}
CommitmentLevel::Confirmed
| CommitmentLevel::Single
| CommitmentLevel::SingleGossip => Commitment::Confirmed,
CommitmentLevel::Processed | CommitmentLevel::Recent => Commitment::Processed,
}
}
}
impl From<CommitmentConfig> for Commitment {
fn from(value: CommitmentConfig) -> Self {
value.commitment.into()
}
}
impl Commitment {
pub fn into_commitment_level(&self) -> CommitmentLevel {
match self {
Commitment::Confirmed => CommitmentLevel::Confirmed,
Commitment::Processed => CommitmentLevel::Processed,
Commitment::Finalized => CommitmentLevel::Finalized,
}
}
pub fn into_commiment_config(&self) -> CommitmentConfig {
CommitmentConfig {
commitment: self.into_commitment_level(),
}
}
}

View File

@ -1,3 +1,4 @@
pub mod commitment_utils;
pub mod keypair_loader;
pub mod quic_connection;
pub mod quic_connection_utils;

View File

@ -28,7 +28,7 @@ pub struct SlotCache {
/// The central data store for all data from the cluster.
#[derive(Clone)]
pub struct DataCache {
pub block_store: BlockInformationStore,
pub block_information_store: BlockInformationStore,
pub txs: TxStore,
pub tx_subs: SubscriptionStore,
pub slot_cache: SlotCache,
@ -39,10 +39,10 @@ pub struct DataCache {
impl DataCache {
pub async fn clean(&self, ttl_duration: std::time::Duration) {
let block_info = self
.block_store
.block_information_store
.get_latest_block_info(CommitmentConfig::finalized())
.await;
self.block_store.clean().await;
self.block_information_store.clean().await;
self.txs.clean(block_info.block_height);
self.tx_subs.clean(ttl_duration);
@ -55,7 +55,7 @@ impl DataCache {
self.txs
.is_transaction_confirmed(&sent_transaction_info.signature)
|| self
.block_store
.block_information_store
.get_latest_block(CommitmentConfig::processed())
.await
.block_height
@ -64,7 +64,7 @@ impl DataCache {
pub fn new_for_tests() -> Self {
Self {
block_store: BlockInformationStore::new(BlockInformation {
block_information_store: BlockInformationStore::new(BlockInformation {
block_height: 0,
blockhash: Hash::new_unique().to_string(),
cleanup_slot: 1000,

View File

@ -1,5 +1,12 @@
use solana_sdk::{
commitment_config::CommitmentConfig, slot_history::Slot, transaction::TransactionError,
borsh0_10::try_from_slice_unchecked,
commitment_config::CommitmentConfig,
compute_budget::{self, ComputeBudgetInstruction},
slot_history::Slot,
transaction::TransactionError,
};
use solana_transaction_status::{
option_serializer::OptionSerializer, RewardType, UiConfirmedBlock, UiTransactionStatusMeta,
};
#[derive(Debug, Clone)]
@ -22,3 +29,124 @@ pub struct ProducedBlock {
pub block_time: u64,
pub commitment_config: CommitmentConfig,
}
impl ProducedBlock {
pub fn from_ui_block(
block: UiConfirmedBlock,
slot: Slot,
commitment_config: CommitmentConfig,
) -> Self {
let block_height = block.block_height.unwrap_or_default();
let txs = block.transactions.unwrap_or_default();
let blockhash = block.blockhash;
let parent_slot = block.parent_slot;
let txs = txs
.into_iter()
.filter_map(|tx| {
let Some(UiTransactionStatusMeta {
err,
compute_units_consumed,
..
}) = tx.meta
else {
// ignoring transaction
log::info!("Tx with no meta");
return None;
};
let Some(tx) = tx.transaction.decode() else {
// ignoring transaction
log::info!("Tx could not be decoded");
return None;
};
let signature = tx.signatures[0].to_string();
let cu_consumed = match compute_units_consumed {
OptionSerializer::Some(cu_consumed) => Some(cu_consumed),
_ => None,
};
let legacy_compute_budget = tx.message.instructions().iter().find_map(|i| {
if i.program_id(tx.message.static_account_keys())
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::RequestUnitsDeprecated {
units,
additional_fee,
}) = try_from_slice_unchecked(i.data.as_slice())
{
return Some((units, additional_fee));
}
}
None
});
let mut cu_requested = tx.message.instructions().iter().find_map(|i| {
if i.program_id(tx.message.static_account_keys())
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::SetComputeUnitLimit(limit)) =
try_from_slice_unchecked(i.data.as_slice())
{
return Some(limit);
}
}
None
});
let mut prioritization_fees = tx.message.instructions().iter().find_map(|i| {
if i.program_id(tx.message.static_account_keys())
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::SetComputeUnitPrice(price)) =
try_from_slice_unchecked(i.data.as_slice())
{
return Some(price);
}
}
None
});
if let Some((units, additional_fee)) = legacy_compute_budget {
cu_requested = Some(units);
if additional_fee > 0 {
prioritization_fees = Some(((units * 1000) / additional_fee).into())
}
};
Some(TransactionInfo {
signature,
err,
cu_requested,
prioritization_fees,
cu_consumed,
})
})
.collect();
let leader_id = if let Some(rewards) = block.rewards {
rewards
.iter()
.find(|reward| Some(RewardType::Fee) == reward.reward_type)
.map(|leader_reward| leader_reward.pubkey.clone())
} else {
None
};
let block_time = block.block_time.unwrap_or(0) as u64;
ProducedBlock {
txs,
block_height,
leader_id,
blockhash,
parent_slot,
block_time,
slot,
commitment_config,
}
}
}

View File

@ -1,12 +1,17 @@
use crate::structures::produced_block::ProducedBlock;
use async_trait::async_trait;
use solana_sdk::{commitment_config::CommitmentLevel, slot_history::Slot};
use solana_transaction_status::UiConfirmedBlock;
use std::sync::Arc;
use solana_rpc_client_api::config::RpcBlockConfig;
use solana_sdk::slot_history::Slot;
use std::{ops::Range, sync::Arc};
#[async_trait]
pub trait BlockStorageInterface: Send + Sync {
async fn save(&self, slot: Slot, block: UiConfirmedBlock, commitment: CommitmentLevel);
async fn get(&self, slot: Slot) -> Option<UiConfirmedBlock>;
// will save a block
async fn save(&self, block: ProducedBlock);
// will get a block
async fn get(&self, slot: Slot, config: RpcBlockConfig) -> Option<ProducedBlock>;
// will get range of slots that are stored in the storage
async fn get_slot_range(&self) -> Range<Slot>;
}
pub type BlockStorageImpl = Arc<dyn BlockStorageInterface>;

20
history/Cargo.toml Normal file
View File

@ -0,0 +1,20 @@
[package]
name = "solana-lite-rpc-history"
version = "0.2.3"
edition = "2021"
description = "History implementations used by solana lite rpc"
rust-version = "1.70.0"
repository = "https://github.com/blockworks-foundation/lite-rpc"
license = "AGPL"
[dependencies]
solana-sdk = { workspace = true }
solana-transaction-status = { workspace = true }
solana-rpc-client = { workspace = true }
dashmap = {workspace = true}
async-trait = { workspace = true }
tokio = "1.*"
solana-lite-rpc-core = {workspace = true}
solana-rpc-client-api = {workspace = true}

View File

@ -0,0 +1,76 @@
use async_trait::async_trait;
use solana_lite_rpc_core::{
commitment_utils::Commitment, structures::produced_block::ProducedBlock,
traits::block_storage_interface::BlockStorageInterface,
};
use solana_rpc_client_api::config::RpcBlockConfig;
use solana_sdk::slot_history::Slot;
use std::{collections::BTreeMap, ops::Range};
use tokio::sync::RwLock;
pub struct InmemoryBlockStore {
block_storage: RwLock<BTreeMap<Slot, ProducedBlock>>,
number_of_blocks_to_store: usize,
}
impl InmemoryBlockStore {
pub fn new(number_of_blocks_to_store: usize) -> Self {
Self {
number_of_blocks_to_store,
block_storage: RwLock::new(BTreeMap::new()),
}
}
pub async fn store(&self, block: ProducedBlock) {
let slot = block.slot;
let mut block_storage = self.block_storage.write().await;
let min_slot = match block_storage.first_key_value() {
Some((slot, _)) => *slot,
None => 0,
};
if slot >= min_slot {
// overwrite block only if confirmation has changed
match block_storage.get_mut(&slot) {
Some(x) => {
let commitment_store = Commitment::from(x.commitment_config);
let commitment_block = Commitment::from(block.commitment_config);
let overwrite = commitment_block > commitment_store;
if overwrite {
*x = block;
}
}
None => {
block_storage.insert(slot, block);
}
}
if block_storage.len() > self.number_of_blocks_to_store {
block_storage.remove(&min_slot);
}
}
}
}
#[async_trait]
impl BlockStorageInterface for InmemoryBlockStore {
async fn save(&self, block: ProducedBlock) {
self.store(block).await;
}
async fn get(&self, slot: Slot, _: RpcBlockConfig) -> Option<ProducedBlock> {
self.block_storage.read().await.get(&slot).cloned()
}
async fn get_slot_range(&self) -> Range<Slot> {
let lk = self.block_storage.read().await;
let first = lk.first_key_value();
let last = lk.last_key_value();
if let Some((first_slot, _)) = first {
let Some((last_slot, _)) = last else {
return Range::default();
};
*first_slot..(*last_slot + 1)
} else {
Range::default()
}
}
}

View File

@ -0,0 +1,2 @@
pub mod inmemory_block_store;
pub mod multiple_strategy_block_store;

View File

@ -0,0 +1,139 @@
// A mixed block store,
// Stores confirmed blocks in memory
// Finalized blocks in long term storage of your choice
// Fetches legacy blocks from faithful
use crate::block_stores::inmemory_block_store::InmemoryBlockStore;
use async_trait::async_trait;
use solana_lite_rpc_core::{
commitment_utils::Commitment,
structures::produced_block::ProducedBlock,
traits::block_storage_interface::{BlockStorageImpl, BlockStorageInterface},
};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_rpc_client_api::config::RpcBlockConfig;
use solana_sdk::{commitment_config::CommitmentConfig, slot_history::Slot};
use std::{
ops::Range,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
pub struct MultipleStrategyBlockStorage {
inmemory_for_storage: InmemoryBlockStore, // for confirmed blocks
persistent_block_storage: BlockStorageImpl, // for persistent block storage
faithful_rpc_client: Option<Arc<RpcClient>>, // to fetch legacy blocks from faithful
last_confirmed_slot: Arc<AtomicU64>,
}
impl MultipleStrategyBlockStorage {
pub fn new(
persistent_block_storage: BlockStorageImpl,
faithful_rpc_client: Option<Arc<RpcClient>>,
number_of_slots_in_memory: usize,
) -> Self {
Self {
inmemory_for_storage: InmemoryBlockStore::new(number_of_slots_in_memory),
persistent_block_storage,
faithful_rpc_client,
last_confirmed_slot: Arc::new(AtomicU64::new(0)),
}
}
pub async fn get_in_memory_block(&self, slot: Slot) -> Option<ProducedBlock> {
self.inmemory_for_storage
.get(
slot,
RpcBlockConfig {
encoding: None,
transaction_details: None,
rewards: None,
commitment: None,
max_supported_transaction_version: None,
},
)
.await
}
}
#[async_trait]
impl BlockStorageInterface for MultipleStrategyBlockStorage {
async fn save(&self, block: ProducedBlock) {
let slot = block.slot;
let commitment = Commitment::from(block.commitment_config);
match commitment {
Commitment::Confirmed | Commitment::Processed => {
self.inmemory_for_storage.save(block).await;
}
Commitment::Finalized => {
let block_in_mem = self.get_in_memory_block(block.slot).await;
match block_in_mem {
Some(block_in_mem) => {
// check if inmemory blockhash is same as finalized, update it if they are not
// we can have two machines with same identity publishing two different blocks on same slot
if block_in_mem.blockhash != block.blockhash {
self.inmemory_for_storage.save(block.clone()).await;
}
}
None => self.inmemory_for_storage.save(block.clone()).await,
}
self.persistent_block_storage.save(block).await;
}
};
if slot > self.last_confirmed_slot.load(Ordering::Relaxed) {
self.last_confirmed_slot.store(slot, Ordering::Relaxed);
}
}
async fn get(
&self,
slot: solana_sdk::slot_history::Slot,
config: RpcBlockConfig,
) -> Option<ProducedBlock> {
let last_confirmed_slot = self.last_confirmed_slot.load(Ordering::Relaxed);
if slot > last_confirmed_slot {
None
} else {
let range = self.inmemory_for_storage.get_slot_range().await;
if range.contains(&slot) {
let block = self.inmemory_for_storage.get(slot, config).await;
if block.is_some() {
return block;
}
}
// TODO: Define what data is expected that is definetly not in persistant block storage like data after epoch - 1
// check persistant block
let persistent_block_range = self.persistent_block_storage.get_slot_range().await;
if persistent_block_range.contains(&slot) {
self.persistent_block_storage.get(slot, config).await
} else if let Some(faithful_rpc_client) = self.faithful_rpc_client.clone() {
match faithful_rpc_client
.get_block_with_config(slot, config)
.await
{
Ok(block) => Some(ProducedBlock::from_ui_block(
block,
slot,
CommitmentConfig::finalized(),
)),
Err(_) => None,
}
} else {
None
}
}
}
async fn get_slot_range(&self) -> Range<Slot> {
let in_memory = self.inmemory_for_storage.get_slot_range().await;
// if faithful is available we assume that we have all the blocks
if self.faithful_rpc_client.is_some() {
0..in_memory.end
} else {
let persistent_storage_range = self.persistent_block_storage.get_slot_range().await;
persistent_storage_range.start..in_memory.end
}
}
}

6
history/src/history.rs Normal file
View File

@ -0,0 +1,6 @@
use solana_lite_rpc_core::traits::block_storage_interface::BlockStorageInterface;
use std::sync::Arc;
pub struct History {
pub block_storage: Arc<dyn BlockStorageInterface>,
}

2
history/src/lib.rs Normal file
View File

@ -0,0 +1,2 @@
pub mod block_stores;
pub mod history;

View File

@ -0,0 +1,54 @@
use solana_lite_rpc_core::{
structures::produced_block::ProducedBlock,
traits::block_storage_interface::BlockStorageInterface,
};
use solana_lite_rpc_history::block_stores::inmemory_block_store::InmemoryBlockStore;
use solana_rpc_client_api::config::RpcBlockConfig;
use solana_sdk::{commitment_config::CommitmentConfig, hash::Hash};
use std::sync::Arc;
pub fn create_test_block(slot: u64, commitment_config: CommitmentConfig) -> ProducedBlock {
ProducedBlock {
block_height: slot,
blockhash: Hash::new_unique().to_string(),
parent_slot: slot - 1,
txs: vec![],
block_time: 0,
commitment_config,
leader_id: None,
slot,
}
}
#[tokio::test]
async fn inmemory_block_store_tests() {
// will store only 10 blocks
let store: Arc<dyn BlockStorageInterface> = Arc::new(InmemoryBlockStore::new(10));
// add 10 blocks
for i in 1..11 {
store
.save(create_test_block(i, CommitmentConfig::finalized()))
.await;
}
// check if 10 blocks are added
for i in 1..11 {
assert!(store.get(i, RpcBlockConfig::default()).await.is_some());
}
// add 11th block
store
.save(create_test_block(11, CommitmentConfig::finalized()))
.await;
// can get 11th block
assert!(store.get(11, RpcBlockConfig::default()).await.is_some());
// first block is removed
assert!(store.get(1, RpcBlockConfig::default()).await.is_none());
// cannot add old blocks
store
.save(create_test_block(1, CommitmentConfig::finalized()))
.await;
assert!(store.get(1, RpcBlockConfig::default()).await.is_none());
}

2
history/tests/mod.rs Normal file
View File

@ -0,0 +1,2 @@
mod inmemory_block_store_tests;
mod multiple_strategy_block_store_tests;

View File

@ -0,0 +1,139 @@
use solana_lite_rpc_core::{
structures::produced_block::ProducedBlock,
traits::block_storage_interface::BlockStorageInterface,
};
use solana_lite_rpc_history::{
block_stores::inmemory_block_store::InmemoryBlockStore,
block_stores::multiple_strategy_block_store::MultipleStrategyBlockStorage,
};
use solana_rpc_client_api::config::RpcBlockConfig;
use solana_sdk::{commitment_config::CommitmentConfig, hash::Hash};
use std::sync::Arc;
pub fn create_test_block(slot: u64, commitment_config: CommitmentConfig) -> ProducedBlock {
ProducedBlock {
block_height: slot,
blockhash: Hash::new_unique().to_string(),
parent_slot: slot - 1,
txs: vec![],
block_time: 0,
commitment_config,
leader_id: None,
slot,
}
}
#[tokio::test]
async fn test_in_multiple_stategy_block_store() {
let persistent_store: Arc<dyn BlockStorageInterface> = Arc::new(InmemoryBlockStore::new(10));
let number_of_slots_in_memory = 3;
let block_storage = MultipleStrategyBlockStorage::new(
persistent_store.clone(),
None,
number_of_slots_in_memory,
);
block_storage
.save(create_test_block(1235, CommitmentConfig::confirmed()))
.await;
block_storage
.save(create_test_block(1236, CommitmentConfig::confirmed()))
.await;
assert!(block_storage
.get(1235, RpcBlockConfig::default())
.await
.is_some());
assert!(block_storage
.get(1236, RpcBlockConfig::default())
.await
.is_some());
assert!(persistent_store
.get(1235, RpcBlockConfig::default())
.await
.is_none());
assert!(persistent_store
.get(1236, RpcBlockConfig::default())
.await
.is_none());
block_storage
.save(create_test_block(1235, CommitmentConfig::finalized()))
.await;
block_storage
.save(create_test_block(1236, CommitmentConfig::finalized()))
.await;
block_storage
.save(create_test_block(1237, CommitmentConfig::finalized()))
.await;
assert!(block_storage
.get(1235, RpcBlockConfig::default())
.await
.is_some());
assert!(block_storage
.get(1236, RpcBlockConfig::default())
.await
.is_some());
assert!(block_storage
.get(1237, RpcBlockConfig::default())
.await
.is_some());
assert!(persistent_store
.get(1235, RpcBlockConfig::default())
.await
.is_some());
assert!(persistent_store
.get(1236, RpcBlockConfig::default())
.await
.is_some());
assert!(persistent_store
.get(1237, RpcBlockConfig::default())
.await
.is_some());
assert!(block_storage.get_in_memory_block(1237).await.is_some());
// blocks are replaced by finalized blocks
assert_eq!(
persistent_store
.get(1235, RpcBlockConfig::default())
.await
.unwrap()
.blockhash,
block_storage
.get_in_memory_block(1235)
.await
.unwrap()
.blockhash
);
assert_eq!(
persistent_store
.get(1236, RpcBlockConfig::default())
.await
.unwrap()
.blockhash,
block_storage
.get_in_memory_block(1236)
.await
.unwrap()
.blockhash
);
assert_eq!(
persistent_store
.get(1237, RpcBlockConfig::default())
.await
.unwrap()
.blockhash,
block_storage
.get_in_memory_block(1237)
.await
.unwrap()
.blockhash
);
// no block yet added returns none
assert!(block_storage
.get(1238, RpcBlockConfig::default())
.await
.is_none());
}

View File

@ -37,13 +37,15 @@ lazy_static = { workspace = true }
dotenv = { workspace = true }
async-channel = { workspace = true }
quinn = { workspace = true }
solana-lite-rpc-core = { workspace = true }
solana-lite-rpc-services = { workspace = true }
solana-lite-rpc-cluster-endpoints = { workspace = true }
async-trait = { workspace = true }
tokio = { version = "1.28.2", features = ["full", "fs"]}
tokio-postgres = { version = "0.7.8", features = ["with-chrono-0_4"] }
chrono = { workspace = true }
solana-lite-rpc-core = { workspace = true }
solana-lite-rpc-services = { workspace = true }
solana-lite-rpc-cluster-endpoints = { workspace = true }
solana-lite-rpc-history = { workspace = true }
[dev-dependencies]
bench = { path = "../bench" }

View File

@ -16,13 +16,17 @@ use solana_lite_rpc_core::{
stores::{block_information_store::BlockInformation, data_cache::DataCache, tx_store::TxProps},
AnyhowJoinHandle,
};
use solana_lite_rpc_history::history::History;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_rpc_client_api::{
config::{RpcContextConfig, RpcRequestAirdropConfig, RpcSignatureStatusConfig},
config::{
RpcBlockConfig, RpcContextConfig, RpcEncodingConfigWrapper, RpcRequestAirdropConfig,
RpcSignatureStatusConfig,
},
response::{Response as RpcResponse, RpcBlockhash, RpcResponseContext, RpcVersionInfo},
};
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, slot_history::Slot};
use solana_transaction_status::TransactionStatus;
use solana_transaction_status::{TransactionStatus, UiConfirmedBlock};
use std::{str::FromStr, sync::Arc};
use tokio::net::ToSocketAddrs;
@ -49,6 +53,7 @@ pub struct LiteBridge {
// should be removed
rpc_client: Arc<RpcClient>,
transaction_service: TransactionService,
history: History,
}
impl LiteBridge {
@ -56,11 +61,13 @@ impl LiteBridge {
rpc_client: Arc<RpcClient>,
data_cache: DataCache,
transaction_service: TransactionService,
history: History,
) -> Self {
Self {
rpc_client,
data_cache,
transaction_service,
history,
}
}
@ -159,7 +166,7 @@ impl LiteRpcServer for LiteBridge {
..
} = self
.data_cache
.block_store
.block_information_store
.get_latest_block(commitment_config)
.await;
@ -189,7 +196,7 @@ impl LiteRpcServer for LiteBridge {
let (is_valid, slot) = self
.data_cache
.block_store
.block_information_store
.is_blockhash_valid(&blockhash, commitment)
.await;
@ -218,7 +225,7 @@ impl LiteRpcServer for LiteBridge {
context: RpcResponseContext {
slot: self
.data_cache
.block_store
.block_information_store
.get_latest_block_info(CommitmentConfig::finalized())
.await
.slot,
@ -288,7 +295,7 @@ impl LiteRpcServer for LiteBridge {
let BlockInformation { slot, .. } = self
.data_cache
.block_store
.block_information_store
.get_latest_block(commitment_config)
.await;
Ok(slot)
@ -312,4 +319,19 @@ impl LiteRpcServer for LiteBridge {
Ok(())
}
async fn get_block(
&self,
slot: u64,
config: Option<RpcEncodingConfigWrapper<RpcBlockConfig>>,
) -> crate::rpc::Result<Option<UiConfirmedBlock>> {
let config = config.map_or(RpcBlockConfig::default(), |x| x.convert_to_current());
let block = self.history.block_storage.get(slot, config).await;
if block.is_some() {
// TO DO Convert to UIConfirmed Block
Err(jsonrpsee::core::Error::HttpNotImplemented)
} else {
Ok(None)
}
}
}

View File

@ -29,6 +29,8 @@ use solana_lite_rpc_core::structures::{
};
use solana_lite_rpc_core::types::BlockStream;
use solana_lite_rpc_core::AnyhowJoinHandle;
use solana_lite_rpc_history::block_stores::inmemory_block_store::InmemoryBlockStore;
use solana_lite_rpc_history::history::History;
use solana_lite_rpc_services::data_caching_service::DataCachingService;
use solana_lite_rpc_services::tpu_utils::tpu_connection_path::TpuConnectionPath;
use solana_lite_rpc_services::tpu_utils::tpu_service::{TpuService, TpuServiceConfig};
@ -117,9 +119,10 @@ pub async fn start_lite_rpc(args: Args, rpc_client: Arc<RpcClient>) -> anyhow::R
let finalized_block =
get_latest_block(blocks_notifier.resubscribe(), CommitmentConfig::finalized()).await;
let block_store = BlockInformationStore::new(BlockInformation::from_block(&finalized_block));
let block_information_store =
BlockInformationStore::new(BlockInformation::from_block(&finalized_block));
let data_cache = DataCache {
block_store,
block_information_store,
cluster_info: ClusterInfo::default(),
identity_stakes: IdentityStakes::new(validator_identity.pubkey()),
slot_cache: SlotCache::new(finalized_block.slot),
@ -187,9 +190,18 @@ pub async fn start_lite_rpc(args: Args, rpc_client: Arc<RpcClient>) -> anyhow::R
let support_service = tokio::spawn(async move { spawner.spawn_support_services().await });
let history = History {
block_storage: Arc::new(InmemoryBlockStore::new(1024)),
};
let bridge_service = tokio::spawn(
LiteBridge::new(rpc_client.clone(), data_cache.clone(), transaction_service)
.start(lite_rpc_http_addr, lite_rpc_ws_addr),
LiteBridge::new(
rpc_client.clone(),
data_cache.clone(),
transaction_service,
history,
)
.start(lite_rpc_http_addr, lite_rpc_ws_addr),
);
tokio::select! {
res = tx_service_jh => {

View File

@ -1,12 +1,13 @@
use jsonrpsee::core::SubscriptionResult;
use jsonrpsee::proc_macros::rpc;
use solana_rpc_client_api::config::{
RpcContextConfig, RpcRequestAirdropConfig, RpcSignatureStatusConfig,
RpcBlockConfig, RpcContextConfig, RpcEncodingConfigWrapper, RpcRequestAirdropConfig,
RpcSignatureStatusConfig,
};
use solana_rpc_client_api::response::{Response as RpcResponse, RpcBlockhash, RpcVersionInfo};
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::slot_history::Slot;
use solana_transaction_status::TransactionStatus;
use solana_transaction_status::{TransactionStatus, UiConfirmedBlock};
use crate::configs::{IsBlockHashValidConfig, SendTransactionConfig};
@ -61,4 +62,11 @@ pub trait LiteRpc {
signature: String,
commitment_config: CommitmentConfig,
) -> SubscriptionResult;
#[method(name = "getBlock")]
async fn get_block(
&self,
slot: u64,
config: Option<RpcEncodingConfigWrapper<RpcBlockConfig>>,
) -> Result<Option<UiConfirmedBlock>>;
}

View File

@ -77,7 +77,7 @@ impl ServiceSpawner {
);
service_builder.start(
notifier,
self.data_cache.block_store.clone(),
self.data_cache.block_information_store.clone(),
max_retries,
slot_notifications,
)

View File

@ -53,7 +53,7 @@ impl DataCachingService {
let block = block_notifier.recv().await.expect("Should recv blocks");
data_cache
.block_store
.block_information_store
.add_block(BlockInformation::from_block(&block))
.await;

View File

@ -50,7 +50,7 @@ impl TransactionServiceBuilder {
pub fn start(
self,
notifier: Option<NotificationSender>,
block_store: BlockInformationStore,
block_information_store: BlockInformationStore,
max_retries: usize,
slot_notifications: SlotStream,
) -> (TransactionService, AnyhowJoinHandle) {
@ -89,7 +89,7 @@ impl TransactionServiceBuilder {
TransactionService {
transaction_channel,
replay_channel,
block_store,
block_information_store,
max_retries,
replay_offset: self.tx_replayer.retry_offset,
},
@ -102,7 +102,7 @@ impl TransactionServiceBuilder {
pub struct TransactionService {
pub transaction_channel: Sender<SentTransactionInfo>,
pub replay_channel: UnboundedSender<TransactionReplay>,
pub block_store: BlockInformationStore,
pub block_information_store: BlockInformationStore,
pub max_retries: usize,
pub replay_offset: Duration,
}
@ -126,7 +126,7 @@ impl TransactionService {
last_valid_blockheight,
..
}) = self
.block_store
.block_information_store
.get_block_info(&tx.get_recent_blockhash().to_string())
else {
bail!("Blockhash not found in block store".to_string());