diff --git a/Cargo.lock b/Cargo.lock index f0878583..bf02926b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2530,6 +2530,7 @@ dependencies = [ "thiserror", "tokio", "tokio-postgres", + "tokio-util", "tracing-subscriber", ] @@ -3502,6 +3503,12 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "rangetools" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6e97ca3dbabd81e6033cfe09f0cef37c89f34f2a9223cab7cf99305d46a9633" + [[package]] name = "rayon" version = "1.8.1" @@ -4355,13 +4362,21 @@ dependencies = [ "async-trait", "base64 0.21.7", "bincode", + "bytes", "chrono", "dashmap 5.5.3", + "futures", + "futures-util", "itertools 0.10.5", + "jsonrpsee", "log", "native-tls", "postgres-native-tls", + "rand 0.8.5", + "rangetools", "serde", + "serde_json", + "solana-lite-rpc-cluster-endpoints", "solana-lite-rpc-core", "solana-rpc-client", "solana-rpc-client-api", @@ -4369,6 +4384,8 @@ dependencies = [ "solana-transaction-status", "tokio", "tokio-postgres", + "tokio-util", + "tracing-subscriber", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 3788f672..fca66fc5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ solana-account-decoder = "~1.17.15" solana-ledger = "~1.17.15" solana-program = "~1.17.15" itertools = "0.10.5" +rangetools = "0.1.4" serde = { version = "1.0.160", features = ["derive"] } serde_json = "1.0.96" bincode = "1.3.3" @@ -41,6 +42,7 @@ base64 = "0.21.0" borsh = "0.10.3" thiserror = "1.0.40" futures = "0.3.28" +futures-util = "0.3.28" bytes = "1.4.0" anyhow = "1.0.70" log = "0.4.17" diff --git a/cluster-endpoints/src/json_rpc_subscription.rs b/cluster-endpoints/src/json_rpc_subscription.rs index ccf354be..4684c5d4 100644 --- a/cluster-endpoints/src/json_rpc_subscription.rs +++ b/cluster-endpoints/src/json_rpc_subscription.rs @@ -12,17 +12,22 @@ use std::sync::Arc; pub fn create_json_rpc_polling_subscription( rpc_client: Arc, + num_parallel_tasks: usize, ) -> 
anyhow::Result<(EndpointStreaming, Vec)> { - let (slot_sx, slot_notifier) = tokio::sync::broadcast::channel(10); - let (block_sx, blocks_notifier) = tokio::sync::broadcast::channel(10); - let (cluster_info_sx, cluster_info_notifier) = tokio::sync::broadcast::channel(10); - let (va_sx, vote_account_notifier) = tokio::sync::broadcast::channel(10); + let (slot_sx, slot_notifier) = tokio::sync::broadcast::channel(16); + let (block_sx, blocks_notifier) = tokio::sync::broadcast::channel(16); + let (cluster_info_sx, cluster_info_notifier) = tokio::sync::broadcast::channel(16); + let (va_sx, vote_account_notifier) = tokio::sync::broadcast::channel(16); let mut endpoint_tasks = poll_slots(rpc_client.clone(), CommitmentConfig::processed(), slot_sx)?; - let mut block_polling_tasks = - poll_block(rpc_client.clone(), block_sx, slot_notifier.resubscribe()); + let mut block_polling_tasks = poll_block( + rpc_client.clone(), + block_sx, + slot_notifier.resubscribe(), + num_parallel_tasks, + ); endpoint_tasks.append(&mut block_polling_tasks); let cluster_info_polling = diff --git a/cluster-endpoints/src/rpc_polling/poll_blocks.rs b/cluster-endpoints/src/rpc_polling/poll_blocks.rs index 0c4d60b8..42d73927 100644 --- a/cluster-endpoints/src/rpc_polling/poll_blocks.rs +++ b/cluster-endpoints/src/rpc_polling/poll_blocks.rs @@ -16,6 +16,8 @@ use solana_transaction_status::{TransactionDetails, UiTransactionEncoding}; use std::{sync::Arc, time::Duration}; use tokio::sync::broadcast::{Receiver, Sender}; +pub const NUM_PARALLEL_TASKS_DEFAULT: usize = 16; + pub async fn process_block( rpc_client: &RpcClient, slot: Slot, @@ -42,6 +44,7 @@ pub fn poll_block( rpc_client: Arc, block_notification_sender: Sender, slot_notification: Receiver, + num_parallel_tasks: usize, ) -> Vec { let mut tasks: Vec = vec![]; @@ -50,7 +53,7 @@ pub fn poll_block( let (block_schedule_queue_sx, block_schedule_queue_rx) = async_channel::unbounded::<(Slot, CommitmentConfig)>(); - for _i in 0..16 { + for _i in 
0..num_parallel_tasks { let block_notification_sender = block_notification_sender.clone(); let rpc_client = rpc_client.clone(); let block_schedule_queue_rx = block_schedule_queue_rx.clone(); diff --git a/cluster-endpoints/src/rpc_polling/poll_slots.rs b/cluster-endpoints/src/rpc_polling/poll_slots.rs index d8ffc84e..67c6aef2 100644 --- a/cluster-endpoints/src/rpc_polling/poll_slots.rs +++ b/cluster-endpoints/src/rpc_polling/poll_slots.rs @@ -78,7 +78,7 @@ pub fn poll_slots( } Ok(None) => log::error!("got nothing from slot update notifier"), Err(err) => { - log::warn!("failed to receive slot update: {err}"); + log::debug!("timeout on receive slot update: {err}"); // force update the slot // estimated slot should not go ahead more than 32 slots // this is because it may be a slot block diff --git a/core/src/iterutils.rs b/core/src/iterutils.rs new file mode 100644 index 00000000..011f68c4 --- /dev/null +++ b/core/src/iterutils.rs @@ -0,0 +1,17 @@ +pub enum Uniqueness { + ExactlyOne, + Multiple(usize), + Empty, +} + +impl Uniqueness { + pub fn inspect_len(len: usize) -> Uniqueness { + if len == 1 { + Uniqueness::ExactlyOne + } else if len == 0 { + Uniqueness::Empty + } else { + Uniqueness::Multiple(len) + } + } +} diff --git a/core/src/lib.rs b/core/src/lib.rs index 9f517722..5f236fb4 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -1,5 +1,6 @@ pub mod commitment_utils; pub mod encoding; +pub mod iterutils; pub mod keypair_loader; pub mod network_utils; pub mod solana_utils; diff --git a/core/src/stores/block_information_store.rs b/core/src/stores/block_information_store.rs index 610cbe08..52446c8e 100644 --- a/core/src/stores/block_information_store.rs +++ b/core/src/stores/block_information_store.rs @@ -1,11 +1,8 @@ use dashmap::DashMap; use log::info; -use solana_sdk::{ - clock::MAX_RECENT_BLOCKHASHES, - commitment_config::{CommitmentConfig, CommitmentLevel}, - slot_history::Slot, -}; +use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel}; 
+use solana_sdk::{clock::MAX_RECENT_BLOCKHASHES, slot_history::Slot}; use std::sync::Arc; use tokio::sync::RwLock; diff --git a/core/src/structures/epoch.rs b/core/src/structures/epoch.rs index c0ea1572..257b4767 100644 --- a/core/src/structures/epoch.rs +++ b/core/src/structures/epoch.rs @@ -4,6 +4,7 @@ use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_sdk::epoch_info::EpochInfo; use solana_sdk::slot_history::Slot; use solana_sdk::sysvar::epoch_schedule::EpochSchedule; +use std::fmt::Display; use std::sync::Arc; #[derive(Debug, Copy, Clone, PartialOrd, PartialEq, Eq, Ord)] @@ -14,8 +15,11 @@ pub struct Epoch { pub absolute_slot: Slot, } +#[derive(Debug, Copy, Clone, PartialOrd, PartialEq, Eq, Ord, Hash)] +pub struct EpochRef(u64); + impl Epoch { - pub fn into_epoch_info(&self, block_height: u64, transaction_count: Option) -> EpochInfo { + pub fn as_epoch_info(&self, block_height: u64, transaction_count: Option) -> EpochInfo { EpochInfo { epoch: self.epoch, slot_index: self.slot_index, @@ -27,6 +31,32 @@ impl Epoch { } } +impl Display for EpochRef { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.get_epoch()) + } +} + +impl From for EpochRef { + fn from(epoch: Epoch) -> Self { + Self(epoch.epoch) + } +} + +impl EpochRef { + pub fn new(epoch: u64) -> Self { + Self(epoch) + } + + pub fn get_epoch(&self) -> u64 { + self.0 + } + + pub fn get_next_epoch(&self) -> Self { + Self(self.0 + 1) + } +} + #[derive(Clone)] pub struct EpochCache { epoch_schedule: Arc, diff --git a/core/src/structures/produced_block.rs b/core/src/structures/produced_block.rs index 8dbda402..4bbde68d 100644 --- a/core/src/structures/produced_block.rs +++ b/core/src/structures/produced_block.rs @@ -1,6 +1,6 @@ +use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::{ borsh0_10::try_from_slice_unchecked, - commitment_config::CommitmentConfig, compute_budget::{self, ComputeBudgetInstruction}, slot_history::Slot, 
transaction::TransactionError, @@ -23,7 +23,8 @@ pub struct TransactionInfo { pub message: String, } -#[derive(Default, Debug, Clone)] +// TODO try to remove Clone +#[derive(Debug, Clone)] pub struct ProducedBlock { pub transactions: Vec, pub leader_id: Option, diff --git a/core/src/traits/block_storage_interface.rs b/core/src/traits/block_storage_interface.rs deleted file mode 100644 index 5aacd184..00000000 --- a/core/src/traits/block_storage_interface.rs +++ /dev/null @@ -1,19 +0,0 @@ -use crate::structures::produced_block::ProducedBlock; -use anyhow::Result; -use async_trait::async_trait; -use solana_rpc_client_api::config::RpcBlockConfig; -use solana_sdk::slot_history::Slot; -use std::{ops::Range, sync::Arc}; - -#[async_trait] -pub trait BlockStorageInterface: Send + Sync { - // will save a block - async fn save(&self, block: ProducedBlock) -> Result<()>; - // will get a block - async fn get(&self, slot: Slot, config: RpcBlockConfig) -> Result; - // will get range of slots that are stored in the storage - async fn get_slot_range(&self) -> Range; -} - -pub type BlockStorageImpl = Arc; -pub const BLOCK_NOT_FOUND: &str = "Block not found"; diff --git a/core/src/traits/mod.rs b/core/src/traits/mod.rs index c1746f35..fd724272 100644 --- a/core/src/traits/mod.rs +++ b/core/src/traits/mod.rs @@ -1,3 +1,2 @@ -pub mod block_storage_interface; pub mod leaders_fetcher_interface; pub mod subscription_sink; diff --git a/history/Cargo.toml b/history/Cargo.toml index 08758b88..e6cdf361 100644 --- a/history/Cargo.toml +++ b/history/Cargo.toml @@ -14,17 +14,30 @@ solana-rpc-client = { workspace = true } dashmap = {workspace = true} async-trait = { workspace = true } -tokio = "1.*" +tokio = { version = "1.28.2", features = ["full", "fs"]} +tokio-util = "0.7" solana-lite-rpc-core = {workspace = true} +solana-lite-rpc-cluster-endpoints = {workspace = true} solana-rpc-client-api = {workspace = true} native-tls = { workspace = true } postgres-native-tls = { workspace = true } 
anyhow = { workspace = true } log = {workspace = true} +tracing-subscriber = { workspace = true, features = ["std", "env-filter"] } chrono = {workspace = true} +serde = { workspace = true } +serde_json = { workspace = true } +jsonrpsee = { workspace = true } bincode = {workspace = true} base64 = {workspace = true} itertools = {workspace = true} +rangetools = {workspace = true} tokio-postgres = { version = "0.7.8", features = ["with-chrono-0_4"] } -serde = { workspace = true } +futures = {workspace = true} +futures-util = {workspace = true} +bytes = "1.5.0" +rand = "0.8.5" + +[dev-dependencies] +tracing-subscriber = { workspace = true } diff --git a/history/examples/bench_postgres_lowlevel.rs b/history/examples/bench_postgres_lowlevel.rs new file mode 100644 index 00000000..eb622e47 --- /dev/null +++ b/history/examples/bench_postgres_lowlevel.rs @@ -0,0 +1,130 @@ +use bytes::Bytes; +use futures_util::pin_mut; +use log::info; +use solana_lite_rpc_history::postgres::postgres_config::PostgresSessionConfig; +use solana_lite_rpc_history::postgres::postgres_session::PostgresWriteSession; +use solana_sdk::blake3::Hash; +use solana_sdk::signature::Signature; +use std::sync::Arc; +use tokio::time::Instant; +use tokio_postgres::binary_copy::BinaryCopyInWriter; +use tokio_postgres::types::Type; +use tokio_postgres::CopyInSink; + +pub async fn copy_in(client: &tokio_postgres::Client) -> anyhow::Result<()> { + let statement = r#" + COPY transactions_copyin( + signature, slot, err, cu_requested, prioritization_fees, cu_consumed, recent_blockhash, message + ) FROM STDIN BINARY + "#; + + // BinaryCopyInWriter + // https://github.com/sfackler/rust-postgres/blob/master/tokio-postgres/tests/test/binary_copy.rs + let sink: CopyInSink = client.copy_in(statement).await.unwrap(); + + let sig = Signature::new_unique().to_string(); + let slot = 200_000_000_i64; + let err = None::<&str>; + let cu_requested = None::; + let prioritization_fees = None::; + let cu_consumed = None::; + let 
recent_blockhash = Hash::new(&[1u8; 32]).to_string(); + let message = ""; + + let started = Instant::now(); + let writer = BinaryCopyInWriter::new( + sink, + &[ + Type::TEXT, + Type::INT8, + Type::TEXT, + Type::INT8, + Type::INT8, + Type::INT8, + Type::TEXT, + Type::TEXT, + ], + ); + pin_mut!(writer); + + const COUNT: usize = 100000; + for i in 0..COUNT { + let slot_x = slot + i as i64; + writer + .as_mut() + .write(&[ + &sig, + &slot_x, + &err, + &cu_requested, + &prioritization_fees, + &cu_consumed, + &recent_blockhash, + &message, + ]) + .await + .unwrap(); + } + + writer.finish().await.unwrap(); + + info!( + "wrote {} rows in {:.2}ms", + COUNT, + started.elapsed().as_secs_f64() * 1000.0 + ); + + Ok(()) +} + +#[tokio::main] +pub async fn main() { + tracing_subscriber::fmt::init(); + + let pg_session_config = PostgresSessionConfig::new_for_tests(); + let session = PostgresWriteSession::new(pg_session_config) + .await + .unwrap() + .get_write_session() + .await; + + let ddl_statement = r#" + CREATE TEMP TABLE transactions_copyin + ( + signature text NOT NULL, + slot bigint NOT NULL, + err text , + cu_requested bigint, + prioritization_fees bigint, + cu_consumed bigint, + recent_blockhash text NOT NULL, + message text NOT NULL + ) + "#; + + session.execute(ddl_statement, &[]).await.unwrap(); + + let row_count_before = count_rows(session.client.clone()).await; + + let started = Instant::now(); + + copy_in(session.client.as_ref()).await.unwrap(); + + info!( + "copyin write rows in {:.2}ms", + started.elapsed().as_secs_f64() * 1000.0 + ); + + let row_count_after = count_rows(session.client.clone()).await; + info!("total: {}", row_count_after); + info!("inserted: {}", row_count_after - row_count_before); +} + +async fn count_rows(client: Arc) -> i64 { + let row = client + .query_one("SELECT count(*) FROM transactions_copyin", &[]) + .await + .unwrap(); + + row.get::<_, i64>(0) +} diff --git a/history/examples/bench_postgres_read_load.rs 
b/history/examples/bench_postgres_read_load.rs new file mode 100644 index 00000000..bd8031c4 --- /dev/null +++ b/history/examples/bench_postgres_read_load.rs @@ -0,0 +1,119 @@ +/// +/// test program to query postgres (get blocks) behind the tip of the slots +/// +use anyhow::bail; +use itertools::Itertools; +use log::info; +use solana_lite_rpc_cluster_endpoints::endpoint_stremers::EndpointStreaming; +use solana_lite_rpc_cluster_endpoints::json_rpc_subscription::create_json_rpc_polling_subscription; +use solana_lite_rpc_core::structures::epoch::{EpochCache, EpochRef}; +use solana_lite_rpc_core::types::SlotStream; +use solana_lite_rpc_history::postgres::postgres_epoch::PostgresEpoch; +use solana_lite_rpc_history::postgres::postgres_session::PostgresSession; +use solana_rpc_client::nonblocking::rpc_client::RpcClient; +use solana_sdk::clock::Slot; +use std::sync::Arc; +use tokio::sync::watch; +use tokio::sync::watch::Sender; +use tokio::task::JoinHandle; + +const NUM_PARALLEL_TASKS: usize = 1; + +#[tokio::main(flavor = "multi_thread", worker_threads = 16)] +pub async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + + let sessions = vec![ + PostgresSession::new_from_env().await.unwrap(), + // PostgresSession::new_from_env().await.unwrap(), + // PostgresSession::new_from_env().await.unwrap(), + // PostgresSession::new_from_env().await.unwrap(), + // PostgresSession::new_from_env().await.unwrap(), + // PostgresSession::new_from_env().await.unwrap(), + // PostgresSession::new_from_env().await.unwrap(), + // PostgresSession::new_from_env().await.unwrap(), + ]; + + let rpc_url = std::env::var("RPC_URL").expect("env var RPC_URL is mandatory"); + let rpc_client = Arc::new(RpcClient::new(rpc_url)); + let epoch_data = EpochCache::bootstrap_epoch(&rpc_client).await.unwrap(); + let (subscriptions, _cluster_endpoint_tasks) = + create_json_rpc_polling_subscription(rpc_client.clone(), NUM_PARALLEL_TASKS).unwrap(); + + let EndpointStreaming { slot_notifier, .. 
} = subscriptions; + + let (tx, mut rx) = watch::channel(0); + + let _jh2 = slot_listener(slot_notifier.resubscribe(), tx); + + let mut ticker = tokio::time::interval(tokio::time::Duration::from_secs(1)); + loop { + let slot = *rx.borrow_and_update(); + if slot == 0 { + continue; + } + + info!("processed slot: {}", slot); + + // ATM we are 4000 slots behind ... + // TODO reduce 4000 to 0 + let slot = 234332620; // literpc3 - local + // let slot = 231541684; + let delta = 50 + rand::random::() % 100; + let query_slot = slot - delta; + info!("query slot (-{}): {}", delta, query_slot); + + let (epoch_cache, _) = &epoch_data; + let epoch: EpochRef = epoch_cache.get_epoch_at_slot(query_slot).into(); + + let futures = (0..3) + .map(|i| { + let si = rand::random::() % sessions.len(); + let session = sessions[si].clone(); + query_database(session, epoch, query_slot + i) + // query_database_simple(session) + }) + .collect_vec(); + + futures_util::future::join_all(futures).await; + + ticker.tick().await; + let result = rx.changed().await; + if result.is_err() { + bail!("Watcher failed - sender was dropped!"); + } + } +} + +async fn query_database(postgres_session: PostgresSession, epoch: EpochRef, slot: Slot) { + let statement = format!( + r#" + SELECT min(slot),max(slot) FROM {schema}.transactions WHERE slot = {slot} + "#, + schema = PostgresEpoch::build_schema_name(epoch), + slot = slot, + ); + + let started = tokio::time::Instant::now(); + let result = postgres_session.query_list(&statement, &[]).await.unwrap(); + info!( + "num_rows: {} (took {:.2}ms)", + result.len(), + started.elapsed().as_secs_f64() * 1000.0 + ); +} + +fn slot_listener(slot_notifier: SlotStream, watch: Sender) -> JoinHandle<()> { + tokio::spawn(async move { + let mut slot_notifier = slot_notifier; + loop { + match slot_notifier.recv().await { + Ok(slot_update) => { + // info!("slot -> {}", slot_update.processed_slot); + watch.send(slot_update.processed_slot).unwrap(); + } + Err(_err) => {} + } + } + }) 
+} diff --git a/history/examples/bench_postgres_simple_select.rs b/history/examples/bench_postgres_simple_select.rs new file mode 100644 index 00000000..73f45e3d --- /dev/null +++ b/history/examples/bench_postgres_simple_select.rs @@ -0,0 +1,78 @@ +use itertools::Itertools; +/// +/// test program to query postgres the simplest possible way +/// +use log::info; +use solana_lite_rpc_history::postgres::postgres_config::PostgresSessionConfig; +use solana_lite_rpc_history::postgres::postgres_session::PostgresSession; + +#[tokio::main(flavor = "multi_thread", worker_threads = 16)] +pub async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + + let pg_session_config = PostgresSessionConfig::new_for_tests(); + + let single_session = PostgresSession::new(pg_session_config.clone()) + .await + .unwrap(); + // run one query + query_database_simple(single_session).await; + info!("single query test ... done"); + + // run parallel queries + parallel_queries(pg_session_config).await; + info!("parallel queries test ... 
done"); + + Ok(()) +} + +async fn parallel_queries(pg_session_config: PostgresSessionConfig) { + let many_sessions = vec![ + PostgresSession::new(pg_session_config.clone()) + .await + .unwrap(), + PostgresSession::new(pg_session_config.clone()) + .await + .unwrap(), + PostgresSession::new(pg_session_config.clone()) + .await + .unwrap(), + PostgresSession::new(pg_session_config.clone()) + .await + .unwrap(), + PostgresSession::new(pg_session_config.clone()) + .await + .unwrap(), + PostgresSession::new(pg_session_config.clone()) + .await + .unwrap(), + PostgresSession::new(pg_session_config.clone()) + .await + .unwrap(), + PostgresSession::new(pg_session_config.clone()) + .await + .unwrap(), + ]; + + let futures = (0..many_sessions.len()) + .map(|si| { + let session = many_sessions[si].clone(); + query_database_simple(session) + }) + .collect_vec(); + + futures_util::future::join_all(futures).await; +} + +async fn query_database_simple(postgres_session: PostgresSession) { + let statement = "SELECT 1"; + + let started = tokio::time::Instant::now(); + let result = postgres_session.query_list(statement, &[]).await.unwrap(); + let elapsed = started.elapsed().as_secs_f64(); + info!( + "num_rows: {} (took {:.2}ms)", + result.len(), + elapsed * 1000.0 + ); +} diff --git a/history/src/bin/blockstore-importer-service.rs b/history/src/bin/blockstore-importer-service.rs new file mode 100644 index 00000000..0812f0dd --- /dev/null +++ b/history/src/bin/blockstore-importer-service.rs @@ -0,0 +1,4 @@ +#[tokio::main(flavor = "multi_thread", worker_threads = 16)] +async fn main() { + tracing_subscriber::fmt::init(); +} diff --git a/history/src/block_stores/faithful_block_store.rs b/history/src/block_stores/faithful_block_store.rs new file mode 100644 index 00000000..f3ec8931 --- /dev/null +++ b/history/src/block_stores/faithful_block_store.rs @@ -0,0 +1,54 @@ +use anyhow::bail; +use log::warn; +use solana_lite_rpc_core::structures::produced_block::ProducedBlock; +use 
solana_rpc_client::nonblocking::rpc_client::RpcClient; +use solana_rpc_client_api::config::RpcBlockConfig; +use solana_sdk::clock::Slot; +use solana_sdk::commitment_config::CommitmentConfig; +use solana_transaction_status::{TransactionDetails, UiTransactionEncoding}; +use std::ops::RangeInclusive; +use std::sync::Arc; + +pub struct FaithfulBlockStore { + faithful_rpc_client: Arc, // to fetch legacy blocks from faithful +} + +impl FaithfulBlockStore { + pub fn new(faithful_rpc_client: Arc) -> Self { + Self { + faithful_rpc_client, + } + } + + pub fn get_slot_range(&self) -> RangeInclusive { + // TODO + warn!("slot_range not implemented for FaithfulBlockStore"); + RangeInclusive::new(1, 0) // empty + } + + pub async fn get_block(&self, slot: Slot) -> anyhow::Result { + // TODO check what parameters we want + let faithful_config = RpcBlockConfig { + encoding: Some(UiTransactionEncoding::Base58), + transaction_details: Some(TransactionDetails::Full), + rewards: None, + commitment: None, + max_supported_transaction_version: None, + }; + + match self + .faithful_rpc_client + .get_block_with_config(slot, faithful_config) + .await + { + Ok(block) => Ok(ProducedBlock::from_ui_block( + block, + slot, + CommitmentConfig::finalized(), + )), + Err(err) => { + bail!(format!("Block {} not found in faithful: {}", slot, err)); + } + } + } +} diff --git a/history/src/block_stores/inmemory_block_store.rs b/history/src/block_stores/inmemory_block_store.rs deleted file mode 100644 index bfacd419..00000000 --- a/history/src/block_stores/inmemory_block_store.rs +++ /dev/null @@ -1,83 +0,0 @@ -use async_trait::async_trait; -use solana_lite_rpc_core::{ - commitment_utils::Commitment, - structures::produced_block::ProducedBlock, - traits::block_storage_interface::{BlockStorageInterface, BLOCK_NOT_FOUND}, -}; -use solana_rpc_client_api::config::RpcBlockConfig; -use solana_sdk::slot_history::Slot; -use std::{collections::BTreeMap, ops::Range}; -use tokio::sync::RwLock; - -pub struct 
InmemoryBlockStore { - block_storage: RwLock>, - number_of_blocks_to_store: usize, -} - -impl InmemoryBlockStore { - pub fn new(number_of_blocks_to_store: usize) -> Self { - Self { - number_of_blocks_to_store, - block_storage: RwLock::new(BTreeMap::new()), - } - } - - pub async fn store(&self, block: ProducedBlock) { - let slot = block.slot; - let mut block_storage = self.block_storage.write().await; - let min_slot = match block_storage.first_key_value() { - Some((slot, _)) => *slot, - None => 0, - }; - if slot >= min_slot { - // overwrite block only if confirmation has changed - match block_storage.get_mut(&slot) { - Some(x) => { - let commitment_store = Commitment::from(x.commitment_config); - let commitment_block = Commitment::from(block.commitment_config); - let overwrite = commitment_block > commitment_store; - if overwrite { - *x = block; - } - } - None => { - block_storage.insert(slot, block); - } - } - if block_storage.len() > self.number_of_blocks_to_store { - block_storage.remove(&min_slot); - } - } - } -} - -#[async_trait] -impl BlockStorageInterface for InmemoryBlockStore { - async fn save(&self, block: ProducedBlock) -> anyhow::Result<()> { - self.store(block).await; - Ok(()) - } - - async fn get(&self, slot: Slot, _: RpcBlockConfig) -> anyhow::Result { - self.block_storage - .read() - .await - .get(&slot) - .cloned() - .ok_or(anyhow::Error::msg(BLOCK_NOT_FOUND)) - } - - async fn get_slot_range(&self) -> Range { - let lk = self.block_storage.read().await; - let first = lk.first_key_value(); - let last = lk.last_key_value(); - if let Some((first_slot, _)) = first { - let Some((last_slot, _)) = last else { - return Range::default(); - }; - *first_slot..(*last_slot + 1) - } else { - Range::default() - } - } -} diff --git a/history/src/block_stores/mod.rs b/history/src/block_stores/mod.rs index ab21ea3b..610a07ff 100644 --- a/history/src/block_stores/mod.rs +++ b/history/src/block_stores/mod.rs @@ -1,3 +1,3 @@ -pub mod inmemory_block_store; +pub mod 
faithful_block_store; pub mod multiple_strategy_block_store; pub mod postgres_block_store; diff --git a/history/src/block_stores/multiple_strategy_block_store.rs b/history/src/block_stores/multiple_strategy_block_store.rs index 5f9b7fb9..31717de7 100644 --- a/history/src/block_stores/multiple_strategy_block_store.rs +++ b/history/src/block_stores/multiple_strategy_block_store.rs @@ -1,141 +1,146 @@ -// A mixed block store, -// Stores confirmed blocks in memory -// Finalized blocks in long term storage of your choice -// Fetches legacy blocks from faithful - -use crate::block_stores::inmemory_block_store::InmemoryBlockStore; -use anyhow::{bail, Result}; -use async_trait::async_trait; -use solana_lite_rpc_core::{ - commitment_utils::Commitment, - structures::produced_block::ProducedBlock, - traits::block_storage_interface::{BlockStorageImpl, BlockStorageInterface, BLOCK_NOT_FOUND}, -}; +use crate::block_stores::faithful_block_store::FaithfulBlockStore; +use crate::block_stores::postgres_block_store::PostgresBlockStore; +use anyhow::{bail, Context, Result}; +use log::{debug, trace}; +use solana_lite_rpc_core::structures::produced_block::ProducedBlock; use solana_rpc_client::nonblocking::rpc_client::RpcClient; -use solana_rpc_client_api::config::RpcBlockConfig; -use solana_sdk::{commitment_config::CommitmentConfig, slot_history::Slot}; -use std::{ - ops::Range, - sync::{ - atomic::{AtomicU64, Ordering}, - Arc, - }, -}; +use solana_sdk::slot_history::Slot; +use std::ops::{Deref, RangeInclusive}; +use std::sync::Arc; +#[derive(Debug, Clone)] +pub enum BlockSource { + // serve two epochs from postgres + RecentEpochDatabase, + // serve epochs older than two from faithful service + FaithfulArchive, +} + +#[derive(Debug, Clone)] +pub struct BlockStorageData { + // note: commitment_config is the actual commitment level + pub block: ProducedBlock, + // meta data + pub result_source: BlockSource, +} + +impl Deref for BlockStorageData { + type Target = ProducedBlock; + + fn 
deref(&self) -> &Self::Target { + &self.block + } +} + +// you might need to add a read-cache instead pub struct MultipleStrategyBlockStorage { - inmemory_for_storage: InmemoryBlockStore, // for confirmed blocks - persistent_block_storage: BlockStorageImpl, // for persistent block storage - faithful_rpc_client: Option>, // to fetch legacy blocks from faithful - last_confirmed_slot: Arc, + persistent_block_storage: PostgresBlockStore, // for persistent block storage + // not supported ATM + faithful_block_storage: Option, // to fetch legacy blocks from faithful + // last_confirmed_slot: Arc, } impl MultipleStrategyBlockStorage { pub fn new( - persistent_block_storage: BlockStorageImpl, - faithful_rpc_client: Option>, - number_of_slots_in_memory: usize, + persistent_block_storage: PostgresBlockStore, + _faithful_rpc_client: Option>, ) -> Self { Self { - inmemory_for_storage: InmemoryBlockStore::new(number_of_slots_in_memory), persistent_block_storage, - faithful_rpc_client, - last_confirmed_slot: Arc::new(AtomicU64::new(0)), + // faithful not used ATM + faithful_block_storage: None, + // faithful_block_storage: faithful_rpc_client.map(|rpc| FaithfulBlockStore::new(rpc)), } } - pub async fn get_in_memory_block(&self, slot: Slot) -> anyhow::Result { - self.inmemory_for_storage - .get( - slot, - RpcBlockConfig { - encoding: None, - transaction_details: None, - rewards: None, - commitment: None, - max_supported_transaction_version: None, - }, - ) - .await - } -} + // we need to build the slots from right to left + pub async fn get_slot_range(&self) -> RangeInclusive { + // merge them + let persistent_storage_range = self.persistent_block_storage.get_slot_range().await; + trace!("Persistent storage range: {:?}", persistent_storage_range); -#[async_trait] -impl BlockStorageInterface for MultipleStrategyBlockStorage { - async fn save(&self, block: ProducedBlock) -> Result<()> { - let slot = block.slot; - let commitment = Commitment::from(block.commitment_config); - match 
commitment { - Commitment::Confirmed | Commitment::Processed => { - self.inmemory_for_storage.save(block).await?; + let mut lower = *persistent_storage_range.start(); + + if let Some(faithful_block_storage) = &self.faithful_block_storage { + let faithful_storage_range = faithful_block_storage.get_slot_range(); + trace!("Faithful storage range: {:?}", faithful_storage_range); + if lower - faithful_storage_range.end() <= 1 { + // move the lower bound to the left + lower = lower.min(*faithful_storage_range.start()); } - Commitment::Finalized => { - let block_in_mem = self.get_in_memory_block(block.slot).await; - match block_in_mem { - Ok(block_in_mem) => { - // check if inmemory blockhash is same as finalized, update it if they are not - // we can have two machines with same identity publishing two different blocks on same slot - if block_in_mem.blockhash != block.blockhash { - self.inmemory_for_storage.save(block.clone()).await?; - } - } - Err(_) => self.inmemory_for_storage.save(block.clone()).await?, - } - self.persistent_block_storage.save(block).await?; - } - }; - if slot > self.last_confirmed_slot.load(Ordering::Relaxed) { - self.last_confirmed_slot.store(slot, Ordering::Relaxed); } - Ok(()) + + let merged = RangeInclusive::new(lower, *persistent_storage_range.end()); + trace!("Merged range from database + faithful: {:?}", merged); + + merged } - async fn get( + // lookup confirmed or finalized block from either our blockstore or faithful + // TODO find better method name + pub async fn query_block( &self, slot: solana_sdk::slot_history::Slot, - config: RpcBlockConfig, - ) -> Result { - let last_confirmed_slot = self.last_confirmed_slot.load(Ordering::Relaxed); - if slot > last_confirmed_slot { - bail!(BLOCK_NOT_FOUND); - } else { - let range = self.inmemory_for_storage.get_slot_range().await; - if range.contains(&slot) { - let block = self.inmemory_for_storage.get(slot, config).await; - if block.is_ok() { - return block; - } - } - // TODO: Define what data is 
expected that is definetly not in persistant block storage like data after epoch - 1 - // check persistant block - let persistent_block_range = self.persistent_block_storage.get_slot_range().await; - if persistent_block_range.contains(&slot) { - self.persistent_block_storage.get(slot, config).await - } else if let Some(faithful_rpc_client) = self.faithful_rpc_client.clone() { - match faithful_rpc_client - .get_block_with_config(slot, config) + ) -> Result { + // TODO this check is optional and might be moved to the caller + // if slot > last_confirmed_slot { + // bail!(format!( + // "Block {} not found (last_confirmed_slot={})", + // slot, last_confirmed_slot + // )); + // } + + // TODO: use a smarter strategy to decide about the cutoff + // current strategy: + // 1. check if requested slot is in min-max range served from Postgres + // 2.1. if yes; fetch from Postgres + // 2.2. if not: try to fetch from faithful + + match self.persistent_block_storage.is_block_in_range(slot).await { + true => { + debug!( + "Assume block {} to be available in persistent block-storage", + slot, + ); + let lookup = self + .persistent_block_storage + .query(slot) .await - { - Ok(block) => Ok(ProducedBlock::from_ui_block( - block, - slot, - CommitmentConfig::finalized(), - )), - Err(_) => bail!(BLOCK_NOT_FOUND), - } - } else { - bail!(BLOCK_NOT_FOUND); + .context(format!("block {} not found although it was in range", slot)); + + return lookup.map(|b| BlockStorageData { + block: b, + result_source: BlockSource::RecentEpochDatabase, + }); + } + false => { + debug!( + "Block {} not found in persistent block-storage - continue", + slot + ); } } - } - async fn get_slot_range(&self) -> Range { - let in_memory = self.inmemory_for_storage.get_slot_range().await; - // if faithful is available we assume that we have all the blocks - if self.faithful_rpc_client.is_some() { - 0..in_memory.end + if let Some(faithful_block_storage) = &self.faithful_block_storage { + match 
faithful_block_storage.get_block(slot).await { + Ok(block) => { + debug!( + "Lookup for block {} successful in faithful block-storage", + slot + ); + + Ok(BlockStorageData { + block, + result_source: BlockSource::FaithfulArchive, + }) + } + Err(_) => { + debug!("Block {} not found in faithful storage - giving up", slot); + bail!(format!("Block {} not found in faithful", slot)); + } + } } else { - let persistent_storage_range = self.persistent_block_storage.get_slot_range().await; - persistent_storage_range.start..in_memory.end + bail!(format!("Block {} not found - faithful not available", slot)); } } } diff --git a/history/src/block_stores/postgres_block_store.rs b/history/src/block_stores/postgres_block_store.rs index 003e49e4..4042e14f 100644 --- a/history/src/block_stores/postgres_block_store.rs +++ b/history/src/block_stores/postgres_block_store.rs @@ -1,101 +1,562 @@ -use std::sync::Arc; +use std::collections::HashMap; +use std::ops::RangeInclusive; +use std::time::{Duration, Instant}; -use anyhow::Result; -use async_trait::async_trait; +use anyhow::{bail, Context, Result}; use itertools::Itertools; -use solana_lite_rpc_core::{ - structures::{epoch::EpochCache, produced_block::ProducedBlock}, - traits::block_storage_interface::BlockStorageInterface, -}; -use solana_rpc_client_api::config::RpcBlockConfig; -use solana_sdk::{slot_history::Slot, stake_history::Epoch}; -use tokio::sync::RwLock; +use log::{debug, info, trace, warn}; +use solana_lite_rpc_core::structures::epoch::EpochRef; +use solana_lite_rpc_core::structures::{epoch::EpochCache, produced_block::ProducedBlock}; +use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel}; +use solana_sdk::slot_history::Slot; +use tokio_postgres::error::SqlState; +use crate::postgres::postgres_config::PostgresSessionConfig; +use crate::postgres::postgres_epoch::{PostgresEpoch, EPOCH_SCHEMA_PREFIX}; +use crate::postgres::postgres_session::{PostgresSession, PostgresWriteSession}; use crate::postgres::{ 
postgres_block::PostgresBlock, postgres_session::PostgresSessionCache, postgres_transaction::PostgresTransaction, }; +const LITERPC_ROLE: &str = "r_literpc"; +const PARALLEL_WRITE_SESSIONS: usize = 4; +const MIN_WRITE_CHUNK_SIZE: usize = 500; + #[derive(Default, Clone, Copy)] pub struct PostgresData { - from_slot: Slot, - to_slot: Slot, - current_epoch: Epoch, + // from_slot: Slot, + // to_slot: Slot, + // current_epoch: Epoch, } +#[derive(Clone)] pub struct PostgresBlockStore { session_cache: PostgresSessionCache, - epoch_cache: EpochCache, - postgres_data: Arc>, + // use this session only for the write path! + write_sessions: Vec, + epoch_schedule: EpochCache, + // postgres_data: Arc>, } impl PostgresBlockStore { - pub async fn start_new_epoch(&self, schema: &String) -> Result<()> { - // create schema for new epoch - let session = self - .session_cache + pub async fn new(epoch_schedule: EpochCache, pg_session_config: PostgresSessionConfig) -> Self { + let session_cache = PostgresSessionCache::new(pg_session_config.clone()) + .await + .unwrap(); + let mut write_sessions = Vec::new(); + for _i in 0..PARALLEL_WRITE_SESSIONS { + write_sessions.push( + PostgresWriteSession::new(pg_session_config.clone()) + .await + .unwrap(), + ); + } + assert!( + !write_sessions.is_empty(), + "must have at least one write session" + ); + + Self::check_role(&session_cache).await; + + Self { + session_cache, + write_sessions, + epoch_schedule, + // postgres_data, + } + } + + async fn check_role(session_cache: &PostgresSessionCache) { + let role = LITERPC_ROLE; + let statement = format!("SELECT 1 FROM pg_roles WHERE rolname='{role}'"); + let count = session_cache .get_session() .await - .expect("should get new postgres session"); + .expect("must get session") + .execute(&statement, &[]) + .await + .expect("must execute query to check for role"); - let statement = format!("CREATE SCHEMA {};", schema); - session.execute(&statement, &[]).await?; + if count == 0 { + panic!( + "Missing 
mandatory postgres role '{}' for Lite RPC - see permissions.sql", + role + ); + } else { + info!("Self check - found postgres role '{}'", role); + } + } + + // return true if schema was actually created + async fn start_new_epoch_if_necessary(&self, epoch: EpochRef) -> Result { + // create schema for new epoch + let schema_name = PostgresEpoch::build_schema_name(epoch); + let session = self.get_session().await; + + let statement = PostgresEpoch::build_create_schema_statement(epoch); + // note: requires GRANT CREATE ON DATABASE xyz + let result_create_schema = session.execute_simple(&statement).await; + if let Err(err) = result_create_schema { + if err + .code() + .map(|sqlstate| sqlstate == &SqlState::DUPLICATE_SCHEMA) + .unwrap_or(false) + { + // TODO: do we want to allow this; continuing with existing epoch schema might lead to inconsistent data in blocks and transactions table + info!( + "Schema {} for epoch {} already exists - data will be appended", + schema_name, epoch + ); + return Ok(false); + } else { + return Err(err).context("create schema for new epoch"); + } + } + + // set permissions for new schema + let statement = build_assign_permissions_statements(epoch); + session + .execute_simple(&statement) + .await + .context("Set postgres permissions for new schema")?; // Create blocks table - let statement = PostgresBlock::create_statement(schema); - session.execute(&statement, &[]).await?; + let statement = PostgresBlock::build_create_table_statement(epoch); + session + .execute_simple(&statement) + .await + .context("create blocks table for new epoch")?; // create transaction table - let statement = PostgresTransaction::create_statement(schema); - session.execute(&statement, &[]).await?; + let statement = PostgresTransaction::build_create_table_statement(epoch); + session + .execute_simple(&statement) + .await + .context("create transaction table for new epoch")?; + + // add foreign key constraint between transactions and blocks + let statement = 
PostgresTransaction::build_foreign_key_statement(epoch); + session + .execute_simple(&statement) + .await + .context("create foreign key constraint between transactions and blocks")?; + + info!("Start new epoch in postgres schema {}", schema_name); + Ok(true) + } + + async fn get_session(&self) -> PostgresSession { + self.session_cache + .get_session() + .await + .expect("should get new postgres session") + } + + pub async fn is_block_in_range(&self, slot: Slot) -> bool { + let epoch = self.epoch_schedule.get_epoch_at_slot(slot); + let ranges = self.get_slot_range_by_epoch().await; + let matching_range: Option<&RangeInclusive> = ranges.get(&epoch.into()); + + matching_range + .map(|slot_range| slot_range.contains(&slot)) + .is_some() + } + + pub async fn query(&self, slot: Slot) -> Result { + let started = Instant::now(); + let epoch: EpochRef = self.epoch_schedule.get_epoch_at_slot(slot).into(); + + let query = PostgresBlock::build_query_statement(epoch, slot); + let block_row = self + .get_session() + .await + .query_opt(&query, &[]) + .await + .unwrap(); + + if block_row.is_none() { + bail!("Block {} in epoch {} not found in postgres", slot, epoch); + } + + let row = block_row.unwrap(); + // meta data + let _epoch: i64 = row.get("_epoch"); + let epoch_schema: String = row.get("_epoch_schema"); + + let blockhash: String = row.get("blockhash"); + let block_height: i64 = row.get("block_height"); + let slot: i64 = row.get("slot"); + let parent_slot: i64 = row.get("parent_slot"); + let block_time: i64 = row.get("block_time"); + let previous_blockhash: String = row.get("previous_blockhash"); + let rewards: Option = row.get("rewards"); + let leader_id: Option = row.get("leader_id"); + + let postgres_block = PostgresBlock { + slot, + blockhash, + block_height, + parent_slot, + block_time, + previous_blockhash, + rewards, + leader_id, + }; + + let produced_block = postgres_block.into_produced_block( + // TODO what to do + vec![], + CommitmentConfig::confirmed(), + ); + + 
debug!( + "Querying produced block {} from postgres in epoch schema {} took {:.2}ms: {}/{}", + produced_block.slot, + epoch_schema, + started.elapsed().as_secs_f64() * 1000.0, + produced_block.blockhash, + produced_block.commitment_config.commitment + ); + + Ok(produced_block) + } + + // optimistically try to progress commitment level for a block that is already stored + pub async fn progress_block_commitment_level(&self, block: &ProducedBlock) -> Result<()> { + // ATM we only support updating confirmed block to finalized + if block.commitment_config.commitment == CommitmentLevel::Finalized { + debug!( + "Checking block {} if we can progress it to finalized ...", + block.slot + ); + + // TODO model commitment levels in new table + } Ok(()) } -} -#[async_trait] -impl BlockStorageInterface for PostgresBlockStore { - async fn save(&self, block: ProducedBlock) -> Result<()> { - let PostgresData { current_epoch, .. } = { *self.postgres_data.read().await }; + pub async fn write_block(&self, block: &ProducedBlock) -> Result<()> { + self.progress_block_commitment_level(block).await?; + // let PostgresData { current_epoch, .. 
} = { *self.postgres_data.read().await }; + + trace!("Saving block {} to postgres storage...", block.slot); let slot = block.slot; let transactions = block .transactions .iter() .map(|x| PostgresTransaction::new(x, slot)) .collect_vec(); - let postgres_block = PostgresBlock::from(&block); + let postgres_block = PostgresBlock::from(block); - let epoch = self.epoch_cache.get_epoch_at_slot(slot); - let schema = format!("EPOCH_{}", epoch.epoch); - if current_epoch == 0 || current_epoch < epoch.epoch { - self.postgres_data.write().await.current_epoch = epoch.epoch; - self.start_new_epoch(&schema).await?; + let epoch = self.epoch_schedule.get_epoch_at_slot(slot); + + let write_session_single = self.write_sessions[0].get_write_session().await; + + let started_block = Instant::now(); + let inserted = postgres_block + .save(&write_session_single, epoch.into()) + .await?; + + if !inserted { + debug!("Block {} already exists - skip update", slot); + return Ok(()); + } + let elapsed_block_insert = started_block.elapsed(); + + let started_txs = Instant::now(); + + let mut queries_fut = Vec::new(); + let chunk_size = + div_ceil(transactions.len(), self.write_sessions.len()).max(MIN_WRITE_CHUNK_SIZE); + let chunks = transactions.chunks(chunk_size).collect_vec(); + assert!( + chunks.len() <= self.write_sessions.len(), + "cannot have more chunks than session" + ); + for (i, chunk) in chunks.iter().enumerate() { + let session = self.write_sessions[i].get_write_session().await.clone(); + let future = PostgresTransaction::save_transaction_copyin(session, epoch.into(), chunk); + queries_fut.push(future); + } + let all_results: Vec> = futures_util::future::join_all(queries_fut).await; + for result in all_results { + result.unwrap(); } - const NUMBER_OF_TRANSACTION: usize = 20; + let elapsed_txs_insert = started_txs.elapsed(); + + debug!( + "Saving block {} to postgres took {:.2}ms for block and {:.2}ms for {} transactions ({}x{} chunks)", + slot, + elapsed_block_insert.as_secs_f64() * 
1000.0, + elapsed_txs_insert.as_secs_f64() * 1000.0, + transactions.len(), + chunks.len(), + chunk_size, + ); - // save transaction - let chunks = transactions.chunks(NUMBER_OF_TRANSACTION); - let session = self - .session_cache - .get_session() - .await - .expect("should get new postgres session"); - for chunk in chunks { - PostgresTransaction::save_transactions(&session, &schema, chunk).await?; - } - postgres_block.save(&session, &schema).await?; Ok(()) } - async fn get(&self, _slot: Slot, _config: RpcBlockConfig) -> Result { - //let _range = self.get_slot_range().await; - //if range.contains(&slot) {} - todo!() + // ATM we focus on blocks as this table gets INSERTS and does deduplication checks (i.e. heavy reads on index pk_block_slot) + pub async fn optimize_blocks_table(&self, slot: Slot) -> Result<()> { + let started = Instant::now(); + let epoch: EpochRef = self.epoch_schedule.get_epoch_at_slot(slot).into(); + let random_session = slot as usize % self.write_sessions.len(); + let write_session_single = self.write_sessions[random_session] + .get_write_session() + .await; + let statement = format!( + r#" + ANALYZE (SKIP_LOCKED) {schema}.blocks; + "#, + schema = PostgresEpoch::build_schema_name(epoch), + ); + + tokio::spawn(async move { + write_session_single + .execute_simple(&statement) + .await + .unwrap(); + let elapsed = started.elapsed(); + debug!( + "Postgres analyze of blocks table took {:.2}ms", + elapsed.as_secs_f64() * 1000.0 + ); + if elapsed > Duration::from_millis(500) { + warn!( + "Very slow postgres ANALYZE on slot {} - took {:.2}ms", + slot, + elapsed.as_secs_f64() * 1000.0 + ); + } + }); + Ok(()) } - async fn get_slot_range(&self) -> std::ops::Range { - let lk = self.postgres_data.read().await; - lk.from_slot..lk.to_slot + 1 + // create current + next epoch + // true if anything was created; false if a NOOP + pub async fn prepare_epoch_schema(&self, slot: Slot) -> anyhow::Result { + let epoch = self.epoch_schedule.get_epoch_at_slot(slot); + let 
/// Integer division of `a` by `b`, rounded up.
///
/// Computed as quotient plus a remainder check, so it cannot overflow the way
/// `(a + b - 1) / b` does for large `a`.
///
/// # Panics
/// Panics if `b` is zero.
fn div_ceil(a: usize, b: usize) -> usize {
    a / b + usize::from(a % b != 0)
}
"rpc2a_epoch_552" + let query = format!( + r#" + SELECT + schema_name + FROM information_schema.schemata + WHERE schema_name ~ '^{schema_prefix}[0-9]+$' + "#, + schema_prefix = EPOCH_SCHEMA_PREFIX + ); + let result = session.query_list(&query, &[]).await.unwrap(); + + let epoch_schemas = result + .iter() + .map(|row| row.get::<&str, &str>("schema_name")) + .map(|schema_name| { + ( + schema_name, + PostgresEpoch::parse_epoch_from_schema_name(schema_name), + ) + }) + .collect_vec(); + + if epoch_schemas.is_empty() { + return HashMap::new(); + } + + let inner = epoch_schemas + .iter() + .map(|(schema, epoch)| { + format!( + "SELECT slot,{epoch}::bigint as epoch FROM {schema}.blocks", + schema = schema, + epoch = epoch + ) + }) + .join(" UNION ALL "); + + let query = format!( + r#" + SELECT epoch, min(slot) as slot_min, max(slot) as slot_max FROM ( + {inner} + ) AS all_slots + GROUP BY epoch + "#, + inner = inner + ); + + let rows_minmax = session.query_list(&query, &[]).await.unwrap(); + + if rows_minmax.is_empty() { + return HashMap::new(); + } + + let mut map_epoch_to_slot_range = rows_minmax + .iter() + .map(|row| { + ( + row.get::<&str, i64>("epoch"), + RangeInclusive::new( + row.get::<&str, i64>("slot_min") as Slot, + row.get::<&str, i64>("slot_max") as Slot, + ), + ) + }) + .into_grouping_map() + .fold(None, |acc, _key, val| { + assert!(acc.is_none(), "epoch must be unique"); + Some(val) + }); + + let final_range: HashMap> = map_epoch_to_slot_range + .iter_mut() + .map(|(epoch, range)| { + let epoch = EpochRef::new(*epoch as u64); + ( + epoch, + range.clone().expect("range must be returned from SQL"), + ) + }) + .collect(); + + debug!( + "Slot range check in postgres found {} ranges, took {:2}sec: {:?}", + rows_minmax.len(), + started.elapsed().as_secs_f64(), + final_range + ); + + final_range + } +} + +#[cfg(test)] +mod tests { + use super::*; + use solana_lite_rpc_core::structures::produced_block::TransactionInfo; + use 
solana_sdk::commitment_config::CommitmentConfig; + use solana_sdk::signature::Signature; + use std::str::FromStr; + + #[tokio::test] + #[ignore] + async fn postgres_write_session() { + let write_session = PostgresWriteSession::new_from_env().await.unwrap(); + + let row_role = write_session + .get_write_session() + .await + .query_one("SELECT current_role", &[]) + .await + .unwrap(); + info!("row: {:?}", row_role); + } + + #[tokio::test] + #[ignore] + async fn test_save_block() { + tracing_subscriber::fmt::init(); + + let pg_session_config = PostgresSessionConfig { + pg_config: "host=localhost dbname=literpc3 user=literpc_app password=litelitesecret" + .to_string(), + ssl: None, + }; + + let _postgres_session_cache = PostgresSessionCache::new(pg_session_config.clone()) + .await + .unwrap(); + let epoch_cache = EpochCache::new_for_tests(); + + let postgres_block_store = + PostgresBlockStore::new(epoch_cache.clone(), pg_session_config.clone()).await; + + postgres_block_store + .write_block(&create_test_block()) + .await + .unwrap(); + } + + fn create_test_block() -> ProducedBlock { + let sig1 = Signature::from_str("5VBroA4MxsbZdZmaSEb618WRRwhWYW9weKhh3md1asGRx7nXDVFLua9c98voeiWdBE7A9isEoLL7buKyaVRSK1pV").unwrap(); + let sig2 = Signature::from_str("3d9x3rkVQEoza37MLJqXyadeTbEJGUB6unywK4pjeRLJc16wPsgw3dxPryRWw3UaLcRyuxEp1AXKGECvroYxAEf2").unwrap(); + + ProducedBlock { + block_height: 42, + blockhash: "blockhash".to_string(), + previous_blockhash: "previous_blockhash".to_string(), + parent_slot: 666, + slot: 223555999, + transactions: vec![create_test_tx(sig1), create_test_tx(sig2)], + // TODO double if this is unix millis or seconds + block_time: 1699260872000, + commitment_config: CommitmentConfig::finalized(), + leader_id: None, + rewards: None, + } + } + + fn create_test_tx(signature: Signature) -> TransactionInfo { + TransactionInfo { + signature: signature.to_string(), + err: None, + cu_requested: Some(40000), + prioritization_fees: Some(5000), + cu_consumed: 
/// Placeholder type; it currently carries no state.
#[derive(Default)]
pub struct History {}

impl History {
    /// Creates the (empty) placeholder value.
    pub fn new() -> Self {
        Self::default()
    }
}
- .map(|x| BASE64.serialize(x).ok()) + .map(|x| BASE64.serialize::>(x).ok()) .unwrap_or(None); Self { @@ -31,43 +38,107 @@ impl From<&ProducedBlock> for PostgresBlock { parent_slot: value.parent_slot as i64, block_time: value.block_time as i64, previous_blockhash: value.previous_blockhash.clone(), + // TODO add leader_id, etc. rewards, + leader_id: value.leader_id.clone(), } } } impl PostgresBlock { - pub fn create_statement(schema: &String) -> String { + pub fn into_produced_block( + &self, + transaction_infos: Vec, + commitment_config: CommitmentConfig, + ) -> ProducedBlock { + let rewards_vec: Option> = self + .rewards + .as_ref() + .map(|x| BASE64.deserialize::>(x).ok()) + .unwrap_or(None); + + ProducedBlock { + // TODO implement + transactions: transaction_infos, + leader_id: None, + blockhash: self.blockhash.clone(), + block_height: self.block_height as u64, + slot: self.slot as Slot, + parent_slot: self.parent_slot as Slot, + block_time: self.block_time as u64, + commitment_config, + previous_blockhash: self.previous_blockhash.clone(), + rewards: rewards_vec, + } + } +} + +impl PostgresBlock { + pub fn build_create_table_statement(epoch: EpochRef) -> String { + let schema = PostgresEpoch::build_schema_name(epoch); format!( - " - CREATE TABLE {}.BLOCKS ( - slot BIGINT PRIMARY KEY, - blockhash STRING NOT NULL, - leader_id STRING, + r#" + CREATE TABLE IF NOT EXISTS {schema}.blocks ( + slot BIGINT NOT NULL, + blockhash TEXT NOT NULL, + leader_id TEXT, block_height BIGINT NOT NULL, parent_slot BIGINT NOT NULL, block_time BIGINT NOT NULL, - previous_blockhash STRING NOT NULL, - rewards STRING, - ); - ", - schema + previous_blockhash TEXT NOT NULL, + rewards TEXT, + CONSTRAINT pk_block_slot PRIMARY KEY(slot) + ) WITH (FILLFACTOR=90); + CLUSTER {schema}.blocks USING pk_block_slot; + "#, + schema = schema ) } + pub fn build_query_statement(epoch: EpochRef, slot: Slot) -> String { + format!( + r#" + SELECT + slot, blockhash, block_height, parent_slot, block_time, 
previous_blockhash, rewards, leader_id, + {epoch}::bigint as _epoch, '{schema}'::text as _epoch_schema FROM {schema}.blocks + WHERE slot = {slot} + "#, + schema = PostgresEpoch::build_schema_name(epoch), + epoch = epoch, + slot = slot + ) + } + + // true is actually inserted; false if operation was noop pub async fn save( &self, postgres_session: &PostgresSession, - schema: &String, - ) -> anyhow::Result<()> { - let mut query = format!( + epoch: EpochRef, + ) -> anyhow::Result { + const NB_ARGUMENTS: usize = 8; + + let started = Instant::now(); + let schema = PostgresEpoch::build_schema_name(epoch); + let values = PostgresSession::values_vec(NB_ARGUMENTS, &[]); + + let statement = format!( r#" - INSERT INTO {}.BLOCKS (slot, blockhash, block_height, parent_slot, block_time, previous_blockhash, rewards) VALUES - "#, - schema + INSERT INTO {schema}.blocks (slot, blockhash, block_height, parent_slot, block_time, previous_blockhash, rewards, leader_id) + VALUES {} + -- prevent updates + ON CONFLICT DO NOTHING + RETURNING ( + -- get previous max slot + SELECT max(all_blocks.slot) as prev_max_slot + FROM {schema}.blocks AS all_blocks + WHERE all_blocks.slot!={schema}.blocks.slot + ) + "#, + values, + schema = schema, ); - let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NB_ARUMENTS); + let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NB_ARGUMENTS); args.push(&self.slot); args.push(&self.blockhash); args.push(&self.block_height); @@ -75,9 +146,88 @@ impl PostgresBlock { args.push(&self.block_time); args.push(&self.previous_blockhash); args.push(&self.rewards); + args.push(&self.leader_id); - PostgresSession::multiline_query(&mut query, NB_ARUMENTS, 1, &[]); - postgres_session.execute(&query, &args).await?; - Ok(()) + let returning = postgres_session + .execute_and_return(&statement, &args) + .await?; + + // TODO: decide what to do if block already exists + match returning { + Some(row) => { + // check if monotonic + let prev_max_slot = row.get::<&str, 
Option>("prev_max_slot"); + // None -> no previous rows + debug!( + "Inserted block {} with prev highest slot being {}, parent={}", + self.slot, + prev_max_slot.unwrap_or(-1), + self.parent_slot + ); + if let Some(prev_max_slot) = prev_max_slot { + if prev_max_slot > self.slot { + // note: unclear if this is desired behavior! + warn!( + "Block {} was inserted behind tip of highest slot number {} (epoch {})", + self.slot, prev_max_slot, epoch + ); + } + } + } + None => { + // database detected conflict + warn!("Block {} already exists - not updated", self.slot); + return Ok(false); + } + } + + debug!( + "Inserting block {} to schema {} postgres took {:.2}ms", + self.slot, + schema, + started.elapsed().as_secs_f64() * 1000.0 + ); + + Ok(true) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use solana_sdk::commitment_config::CommitmentConfig; + + #[test] + fn map_postgresblock_to_produced_block() { + let block = PostgresBlock { + slot: 5050505, + blockhash: "blockhash".to_string(), + block_height: 4040404, + parent_slot: 5050500, + block_time: 12121212, + previous_blockhash: "previous_blockhash".to_string(), + rewards: None, + leader_id: None, + }; + + let transaction_infos = vec![create_tx_info(), create_tx_info()]; + + let produced_block = + block.into_produced_block(transaction_infos, CommitmentConfig::confirmed()); + + assert_eq!(produced_block.slot, 5050505); + assert_eq!(produced_block.transactions.len(), 2); + } + + fn create_tx_info() -> TransactionInfo { + TransactionInfo { + signature: "signature".to_string(), + err: None, + cu_requested: None, + prioritization_fees: None, + cu_consumed: None, + recent_blockhash: "recent_blockhash".to_string(), + message: "message".to_string(), + } } } diff --git a/history/src/postgres/postgres_config.rs b/history/src/postgres/postgres_config.rs index cc5c6bdb..58d4d7db 100644 --- a/history/src/postgres/postgres_config.rs +++ b/history/src/postgres/postgres_config.rs @@ -50,3 +50,25 @@ impl PostgresSessionConfig { })) 
} } + +impl PostgresSessionConfig { + pub fn new_for_tests() -> PostgresSessionConfig { + assert!( + env::var("PG_CONFIG").is_err(), + "note that ENV variables are ignored!" + ); + + // see localdev_integrationtest.sql how to setup the database + PostgresSessionConfig { + pg_config: r#" + host=localhost + dbname=literpc_integrationtest_localdev + user=literpc_integrationtest + password=youknowme + sslmode=disable + "# + .to_string(), + ssl: None, + } + } +} diff --git a/history/src/postgres/postgres_epoch.rs b/history/src/postgres/postgres_epoch.rs new file mode 100644 index 00000000..8261f767 --- /dev/null +++ b/history/src/postgres/postgres_epoch.rs @@ -0,0 +1,47 @@ +use solana_lite_rpc_core::structures::epoch::EpochRef; + +pub struct PostgresEpoch {} + +pub const EPOCH_SCHEMA_PREFIX: &str = "rpc2a_epoch_"; + +impl PostgresEpoch { + // e.g. rpc2a_epoch_644 - rpc2a = RPCv2 alpha + pub fn build_schema_name(epoch: EpochRef) -> String { + format!("{}{}", EPOCH_SCHEMA_PREFIX, epoch.get_epoch()) + } + + pub fn build_create_schema_statement(epoch: EpochRef) -> String { + let schema = PostgresEpoch::build_schema_name(epoch); + format!( + " + CREATE SCHEMA {}; + ", + schema + ) + } + + pub fn parse_epoch_from_schema_name(schema_name: &str) -> EpochRef { + let epoch_number_str = schema_name.trim_start_matches(EPOCH_SCHEMA_PREFIX); + let epoch = epoch_number_str.parse::().unwrap(); + EpochRef::new(epoch) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_build_schema_name() { + let epoch = EpochRef::new(644); + let schema = PostgresEpoch::build_schema_name(epoch); + assert_eq!("rpc2a_epoch_644", schema); + } + + #[test] + fn test_parse_epoch_from_schema_name() { + let schema = "rpc2a_epoch_644"; + let epoch = PostgresEpoch::parse_epoch_from_schema_name(schema); + assert_eq!(644, epoch.get_epoch()); + } +} diff --git a/history/src/postgres/postgres_session.rs b/history/src/postgres/postgres_session.rs index 65dbc26a..28d82983 100644 --- 
a/history/src/postgres/postgres_session.rs +++ b/history/src/postgres/postgres_session.rs @@ -5,7 +5,10 @@ use native_tls::{Certificate, Identity, TlsConnector}; use postgres_native_tls::MakeTlsConnector; use solana_lite_rpc_core::encoding::BinaryEncoding; use tokio::sync::RwLock; -use tokio_postgres::{config::SslMode, tls::MakeTlsConnect, types::ToSql, Client, NoTls, Socket}; +use tokio_postgres::{ + config::SslMode, tls::MakeTlsConnect, types::ToSql, Client, CopyInSink, Error, NoTls, Row, + Socket, +}; use super::postgres_config::{PostgresSessionConfig, PostgresSessionSslConfig}; @@ -38,8 +41,15 @@ pub struct PostgresSession { } impl PostgresSession { + pub async fn new_from_env() -> anyhow::Result { + let pg_session_config = PostgresSessionConfig::new_from_env() + .expect("failed to start Postgres Client") + .expect("Postgres not enabled (use PG_ENABLED)"); + PostgresSession::new(pg_session_config).await + } + pub async fn new( - PostgresSessionConfig { pg_config, ssl }: &PostgresSessionConfig, + PostgresSessionConfig { pg_config, ssl }: PostgresSessionConfig, ) -> anyhow::Result { let pg_config = pg_config.parse::()?; @@ -94,7 +104,7 @@ impl PostgresSession { log::error!("Connection to Postgres broke {err:?}"); return; } - unreachable!("Postgres thread returned") + log::debug!("Postgres thread shutting down"); }); Ok(client) @@ -125,13 +135,98 @@ impl PostgresSession { } } + pub fn values_vecvec(args: usize, rows: usize, types: &[&str]) -> String { + let mut query = String::new(); + + Self::multiline_query(&mut query, args, rows, types); + + query + } + + // workaround: produces ((a,b,c)) while (a,b,c) would suffice + pub fn values_vec(args: usize, types: &[&str]) -> String { + let mut query = String::new(); + + Self::multiline_query(&mut query, args, 1, types); + + query + } + pub async fn execute( &self, - statement: &String, + statement: &str, params: &[&(dyn ToSql + Sync)], ) -> Result { self.client.execute(statement, params).await } + + pub async fn 
execute_simple(&self, statement: &str) -> Result<(), Error> { + self.client.batch_execute(statement).await + } + + pub async fn execute_prepared_batch( + &self, + statement: &str, + params: &Vec>, + ) -> Result { + let prepared_stmt = self.client.prepare(statement).await?; + let mut total_inserted = 0; + for row in params { + let result = self.client.execute(&prepared_stmt, row).await; + total_inserted += result?; + } + Ok(total_inserted) + } + + // TODO provide an optimized version using "COPY IN" instead of "INSERT INTO" (https://trello.com/c/69MlQU6u) + // pub async fn execute_copyin(...) + + pub async fn execute_prepared( + &self, + statement: &str, + params: &[&(dyn ToSql + Sync)], + ) -> Result { + let prepared_stmt = self.client.prepare(statement).await?; + self.client.execute(&prepared_stmt, params).await + } + + pub async fn execute_and_return( + &self, + statement: &str, + params: &[&(dyn ToSql + Sync)], + ) -> Result, Error> { + self.client.query_opt(statement, params).await + } + + pub async fn query_opt( + &self, + statement: &str, + params: &[&(dyn ToSql + Sync)], + ) -> Result, Error> { + self.client.query_opt(statement, params).await + } + + pub async fn query_one( + &self, + statement: &str, + params: &[&(dyn ToSql + Sync)], + ) -> Result { + self.client.query_one(statement, params).await + } + + pub async fn query_list( + &self, + statement: &str, + params: &[&(dyn ToSql + Sync)], + ) -> Result, Error> { + self.client.query(statement, params).await + } + + pub async fn copy_in(&self, statement: &str) -> Result, Error> { + // BinaryCopyInWriter + // https://github.com/sfackler/rust-postgres/blob/master/tokio-postgres/tests/test/binary_copy.rs + self.client.copy_in(statement).await + } } #[derive(Clone)] @@ -142,7 +237,7 @@ pub struct PostgresSessionCache { impl PostgresSessionCache { pub async fn new(config: PostgresSessionConfig) -> anyhow::Result { - let session = PostgresSession::new(&config).await?; + let session = 
PostgresSession::new(config.clone()).await?; Ok(Self { session: Arc::new(RwLock::new(session)), config, @@ -153,7 +248,7 @@ impl PostgresSessionCache { let session = self.session.read().await; if session.client.is_closed() { drop(session); - let session = PostgresSession::new(&self.config).await?; + let session = PostgresSession::new(self.config.clone()).await?; *self.session.write().await = session.clone(); Ok(session) } else { @@ -162,6 +257,53 @@ impl PostgresSessionCache { } } +#[derive(Clone)] +pub struct PostgresWriteSession { + session: Arc>, + pub pg_session_config: PostgresSessionConfig, +} + +impl PostgresWriteSession { + pub async fn new_from_env() -> anyhow::Result { + let pg_session_config = PostgresSessionConfig::new_from_env() + .expect("failed to start Postgres Client") + .expect("Postgres not enabled (use PG_ENABLED)"); + Self::new(pg_session_config).await + } + + pub async fn new(pg_session_config: PostgresSessionConfig) -> anyhow::Result { + let session = PostgresSession::new(pg_session_config.clone()).await?; + + let statement = r#" + SET SESSION application_name='postgres-blockstore-write-session'; + -- default: 64MB + SET SESSION maintenance_work_mem = '256MB'; + "#; + + session.execute_simple(statement).await.unwrap(); + + Ok(Self { + session: Arc::new(RwLock::new(session)), + pg_session_config, + }) + } + + pub async fn get_write_session(&self) -> PostgresSession { + let session = self.session.read().await; + + if session.client.is_closed() || session.client.execute(";", &[]).await.is_err() { + let session = PostgresSession::new(self.pg_session_config.clone()) + .await + .expect("should have created new postgres session"); + let mut lock = self.session.write().await; + *lock = session.clone(); + session + } else { + session.clone() + } + } +} + #[test] fn multiline_query_test() { let mut query = String::new(); @@ -170,6 +312,12 @@ fn multiline_query_test() { assert_eq!(query, "($1,$2,$3),($4,$5,$6)"); } +#[test] +fn value_query_test() { + 
let values = PostgresSession::values_vecvec(3, 2, &[]); + assert_eq!(values, "($1,$2,$3),($4,$5,$6)"); +} + #[test] fn multiline_query_test_types() { let mut query = String::new(); @@ -177,3 +325,15 @@ fn multiline_query_test_types() { PostgresSession::multiline_query(&mut query, 3, 2, &["text", "int", "int"]); assert_eq!(query, "(($1)::text,($2)::int,($3)::int),($4,$5,$6)"); } + +#[test] +fn value_vecvec_test_types() { + let values = PostgresSession::values_vecvec(3, 2, &["text", "int", "int"]); + assert_eq!(values, "(($1)::text,($2)::int,($3)::int),($4,$5,$6)"); +} + +#[test] +fn value_vec_test_types() { + let values = PostgresSession::values_vec(3, &["text", "int", "int"]); + assert_eq!(values, "(($1)::text,($2)::int,($3)::int)"); +} diff --git a/history/src/postgres/postgres_transaction.rs b/history/src/postgres/postgres_transaction.rs index b559c4a5..960f89be 100644 --- a/history/src/postgres/postgres_transaction.rs +++ b/history/src/postgres/postgres_transaction.rs @@ -1,6 +1,13 @@ +use crate::postgres::postgres_epoch::PostgresEpoch; +use bytes::Bytes; +use futures_util::pin_mut; +use log::{trace, warn}; +use solana_lite_rpc_core::structures::epoch::EpochRef; use solana_lite_rpc_core::{encoding::BASE64, structures::produced_block::TransactionInfo}; use solana_sdk::slot_history::Slot; -use tokio_postgres::types::ToSql; +use tokio_postgres::binary_copy::BinaryCopyInWriter; +use tokio_postgres::types::{ToSql, Type}; +use tokio_postgres::CopyInSink; use super::postgres_session::PostgresSession; @@ -9,15 +16,13 @@ pub struct PostgresTransaction { pub signature: String, pub slot: i64, pub err: Option, - pub cu_requested: Option, + pub cu_requested: Option, pub prioritization_fees: Option, pub cu_consumed: Option, pub recent_blockhash: String, pub message: String, } -const NB_ARUMENTS: usize = 8; - impl PostgresTransaction { pub fn new(value: &TransactionInfo, slot: Slot) -> Self { Self { @@ -27,7 +32,7 @@ impl PostgresTransaction { .clone() .map(|x| 
BASE64.serialize(&x).ok()) .unwrap_or(None), - cu_requested: value.cu_requested.map(|x| x as i32), + cu_requested: value.cu_requested.map(|x| x as i64), prioritization_fees: value.prioritization_fees.map(|x| x as i64), cu_consumed: value.cu_consumed.map(|x| x as i64), recent_blockhash: value.recent_blockhash.clone(), @@ -36,33 +41,50 @@ impl PostgresTransaction { } } - pub fn create_statement(schema: &String) -> String { + pub fn build_create_table_statement(epoch: EpochRef) -> String { + let schema = PostgresEpoch::build_schema_name(epoch); format!( - "\ - CREATE TABLE {}.TRANSACTIONS ( - signature CHAR(88) NOT NULL, - slot BIGINT, - err STRING, - cu_requested BIGINT, - prioritization_fees BIGINT, - cu_consumed BIGINT, - recent_blockhash STRING NOT NULL, - message STRING NOT NULL, - PRIMARY KEY (signature) - CONSTRAINT fk_transactions FOREIGN KEY (slot) REFERENCES {}.BLOCKS(slot); - ); - ", - schema, schema + r#" + CREATE TABLE IF NOT EXISTS {schema}.transactions ( + signature VARCHAR(88) NOT NULL, + slot BIGINT NOT NULL, + err TEXT, + cu_requested BIGINT, + prioritization_fees BIGINT, + cu_consumed BIGINT, + recent_blockhash TEXT NOT NULL, + message TEXT NOT NULL, + CONSTRAINT pk_transaction_sig PRIMARY KEY(signature) + ) WITH (FILLFACTOR=90); + CREATE INDEX idx_slot ON {schema}.transactions USING btree (slot) WITH (FILLFACTOR=90); + CLUSTER {schema}.transactions USING idx_slot; + "#, + schema = schema ) } - pub async fn save_transactions( - postgres_session: &PostgresSession, - schema: &String, + // removed the foreign key as it slows down inserts + pub fn build_foreign_key_statement(epoch: EpochRef) -> String { + let schema = PostgresEpoch::build_schema_name(epoch); + format!( + r#" + ALTER TABLE {schema}.transactions + ADD CONSTRAINT fk_transactions FOREIGN KEY (slot) REFERENCES {schema}.blocks (slot); + "#, + schema = schema + ) + } + + // this version uses INSERT statements + pub async fn save_transaction_insert( + postgres_session: PostgresSession, + epoch: 
EpochRef, + slot: Slot, transactions: &[Self], ) -> anyhow::Result<()> { - let mut args: Vec<&(dyn ToSql + Sync)> = - Vec::with_capacity(NB_ARUMENTS * transactions.len()); + const NB_ARGUMENTS: usize = 8; + let tx_count = transactions.len(); + let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NB_ARGUMENTS * tx_count); for tx in transactions.iter() { let PostgresTransaction { @@ -86,26 +108,118 @@ impl PostgresTransaction { args.push(message); } - let mut query = format!( + let values = PostgresSession::values_vecvec(NB_ARGUMENTS, tx_count, &[]); + let schema = PostgresEpoch::build_schema_name(epoch); + let statement = format!( r#" - INSERT INTO {}.TRANSACTIONS + INSERT INTO {schema}.transactions (signature, slot, err, cu_requested, prioritization_fees, cu_consumed, recent_blockhash, message) - VALUES + VALUES {} + ON CONFLICT DO NOTHING "#, - schema + values, + schema = schema, + ); + + let inserted = postgres_session.execute_prepared(&statement, &args).await? as usize; + + if inserted < tx_count { + warn!("Some ({}) transactions already existed and where not updated of {} total in schema {schema}", + transactions.len() - inserted, transactions.len(), schema = schema); + } + + trace!( + "Inserted {} transactions chunk into epoch schema {} for block {}", + inserted, + schema, + slot ); - PostgresSession::multiline_query(&mut query, NB_ARUMENTS, transactions.len(), &[]); - postgres_session.execute(&query, &args).await?; Ok(()) } + // this version uses "COPY IN" + pub async fn save_transaction_copyin( + postgres_session: PostgresSession, + epoch: EpochRef, + transactions: &[Self], + ) -> anyhow::Result { + let schema = PostgresEpoch::build_schema_name(epoch); + let statement = format!( + r#" + COPY {schema}.transactions( + signature, slot, err, cu_requested, prioritization_fees, cu_consumed, recent_blockhash, message + ) FROM STDIN BINARY + "#, + schema = schema, + ); + + // BinaryCopyInWriter + // 
https://github.com/sfackler/rust-postgres/blob/master/tokio-postgres/tests/test/binary_copy.rs + let sink: CopyInSink = postgres_session.copy_in(&statement).await.unwrap(); + + let writer = BinaryCopyInWriter::new( + sink, + &[ + Type::TEXT, + Type::INT8, + Type::TEXT, + Type::INT8, + Type::INT8, + Type::INT8, + Type::TEXT, + Type::TEXT, + ], + ); + pin_mut!(writer); + + for tx in transactions { + let PostgresTransaction { + signature, + slot, + err, + cu_requested, + prioritization_fees, + cu_consumed, + recent_blockhash, + message, + } = tx; + + writer + .as_mut() + .write(&[ + &signature, + &slot, + &err, + &cu_requested, + &prioritization_fees, + &cu_consumed, + &recent_blockhash, + &message, + ]) + .await + .unwrap(); + } + + writer.finish().await.unwrap(); + + Ok(true) + } + pub async fn get( postgres_session: PostgresSession, schema: &String, slot: Slot, ) -> Vec { - let statement = format!("SELECT signature, err, cu_requested, prioritization_fees, cu_consumed, recent_blockhash, message FROM {}.TRANSACTIONS WHERE SLOT = {}", schema, slot); + let statement = format!( + r#" + SELECT signature, err, cu_requested, prioritization_fees, cu_consumed, recent_blockhash, message + FROM {schema}.transactions + WHERE slot = {} + "#, + slot, + schema = schema + ); let _ = postgres_session.client.query(&statement, &[]).await; todo!() } diff --git a/history/tests/blockstore_integration_tests.rs b/history/tests/blockstore_integration_tests.rs new file mode 100644 index 00000000..a99df33a --- /dev/null +++ b/history/tests/blockstore_integration_tests.rs @@ -0,0 +1,405 @@ +use log::{debug, error, info, trace, warn}; +use solana_lite_rpc_cluster_endpoints::endpoint_stremers::EndpointStreaming; +use solana_lite_rpc_cluster_endpoints::json_rpc_subscription::create_json_rpc_polling_subscription; +use solana_lite_rpc_core::structures::epoch::EpochCache; +use solana_lite_rpc_core::structures::produced_block::ProducedBlock; +use 
solana_lite_rpc_core::structures::slot_notification::SlotNotification; +use solana_lite_rpc_core::types::{BlockStream, SlotStream}; +use solana_lite_rpc_history::block_stores::postgres_block_store::PostgresBlockStore; +use solana_lite_rpc_history::postgres::postgres_config::PostgresSessionConfig; +use solana_rpc_client::nonblocking::rpc_client::RpcClient; +use solana_sdk::clock::Slot; +use solana_sdk::commitment_config::CommitmentConfig; +use std::collections::{HashMap, HashSet}; +use std::process; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::sync::broadcast::error::RecvError; +use tokio::task::JoinHandle; +use tokio::time::sleep; +use tokio_util::sync::CancellationToken; +use tracing_subscriber::EnvFilter; + +// force ordered stream of blocks +const NUM_PARALLEL_TASKS: usize = 1; + +const CHANNEL_SIZE_WARNING_THRESHOLD: usize = 5; +#[ignore = "need to enable postgres"] +#[tokio::test] +async fn storage_test() { + // RUST_LOG=info,storage_integration_tests=debug,solana_lite_rpc_history=trace + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .init(); + configure_panic_hook(); + + let pg_session_config = PostgresSessionConfig::new_from_env().unwrap().unwrap(); + + let rpc_url = std::env::var("RPC_URL").expect("env var RPC_URL is mandatory"); + + let rpc_client = Arc::new(RpcClient::new(rpc_url)); + + let (subscriptions, _cluster_endpoint_tasks) = + create_json_rpc_polling_subscription(rpc_client.clone(), NUM_PARALLEL_TASKS).unwrap(); + + let EndpointStreaming { + blocks_notifier, + slot_notifier, + .. 
+ } = subscriptions; + + let (epoch_cache, _) = EpochCache::bootstrap_epoch(&rpc_client).await.unwrap(); + + let block_storage = Arc::new(PostgresBlockStore::new(epoch_cache, pg_session_config).await); + + let (jh1_1, first_init) = + storage_prepare_epoch_schema(slot_notifier.resubscribe(), block_storage.clone()); + // coordinate initial epoch schema creation + first_init.cancelled().await; + + let jh1_2 = storage_listen(blocks_notifier.resubscribe(), block_storage.clone()); + let jh2 = block_debug_listen(blocks_notifier.resubscribe()); + let jh3 = block_stream_assert_commitment_order(blocks_notifier.resubscribe()); + drop(blocks_notifier); + + info!("Run tests for some time ..."); + sleep(Duration::from_secs(20)).await; + + jh1_1.abort(); + jh1_2.abort(); + jh2.abort(); + jh3.abort(); + + info!("Tests aborted forcefully by design."); +} + +// TODO this is a race condition as the .save might get called before the schema was prepared +fn storage_prepare_epoch_schema( + slot_notifier: SlotStream, + postgres_storage: Arc, +) -> (JoinHandle<()>, CancellationToken) { + let mut debounce_slot = 0; + let building_epoch_schema = CancellationToken::new(); + let first_run_signal = building_epoch_schema.clone(); + let join_handle = tokio::spawn(async move { + let mut slot_notifier = slot_notifier; + loop { + match slot_notifier.recv().await { + Ok(SlotNotification { processed_slot, .. 
}) => { + if processed_slot >= debounce_slot { + let created = postgres_storage + .prepare_epoch_schema(processed_slot) + .await + .unwrap(); + first_run_signal.cancel(); + debounce_slot = processed_slot + 64; // wait a bit before hammering the DB again + if created { + debug!("Async job prepared schema at slot {}", processed_slot); + } else { + debug!( + "Async job for preparing schema at slot {} was a noop", + processed_slot + ); + } + } + } + _ => { + warn!("Error receiving slot - continue"); + } + } + } + }); + (join_handle, building_epoch_schema) +} + +/// run the optimizer at least every n slots +const OPTIMIZE_EVERY_N_SLOTS: u64 = 10; +/// wait at least n slots before running the optimizer again +const OPTIMIZE_DEBOUNCE_SLOTS: u64 = 4; + +// note: the consumer lags far behind the ingress of blocks and transactions +fn storage_listen( + block_notifier: BlockStream, + block_storage: Arc, +) -> JoinHandle<()> { + tokio::spawn(async move { + let mut last_optimizer_run = 0; + let mut block_notifier = block_notifier; + // this is the critical write loop + loop { + match block_notifier.recv().await { + Ok(block) => { + let started = Instant::now(); + debug!( + "Received block: {} with {} txs", + block.slot, + block.transactions.len() + ); + + if block_notifier.len() > CHANNEL_SIZE_WARNING_THRESHOLD { + warn!( + "(soft_realtime) Block queue is growing - {} elements", + block_notifier.len() + ); + } + + // TODO we should intercept finalized blocks and try to update only the status optimistically + + // avoid backpressure here! 
+ + block_storage.write_block(&block).await.unwrap(); + + // we should be faster than 150ms here + let elapsed = started.elapsed(); + debug!( + "Successfully stored block to postgres which took {:.2}ms - remaining {} queue elements", + elapsed.as_secs_f64() * 1000.0, block_notifier.len() + ); + if elapsed > Duration::from_millis(150) { + warn!("(soft_realtime) Write operation was slow!"); + } + + // debounce for 4 slots but run at least every 10 slots + if block.slot > last_optimizer_run + OPTIMIZE_EVERY_N_SLOTS + || block.slot > last_optimizer_run + OPTIMIZE_DEBOUNCE_SLOTS + && started.elapsed() < Duration::from_millis(200) + && block_notifier.is_empty() + { + debug!( + "Use extra time to do some optimization (slot {})", + block.slot + ); + block_storage + .optimize_blocks_table(block.slot) + .await + .unwrap(); + last_optimizer_run = block.slot; + } + } // -- Ok + Err(RecvError::Lagged(missed_blocks)) => { + warn!( + "Could not keep up with producer - missed {} blocks", + missed_blocks + ); + } + Err(other_err) => { + warn!("Error receiving block: {:?}", other_err); + } + } + + // ... 
+ } + }) +} + +#[derive(Debug, Clone)] +struct BlockDebugDetails { + pub blockhash: String, + pub block: ProducedBlock, +} + +fn block_debug_listen(block_notifier: BlockStream) -> JoinHandle<()> { + tokio::spawn(async move { + let mut last_highest_slot_number = 0; + let mut block_notifier = block_notifier; + + loop { + match block_notifier.recv().await { + Ok(block) => { + debug!( + "Saw block: {} @ {} with {} txs", + block.slot, + block.commitment_config.commitment, + block.transactions.len() + ); + + // check monotony + // note: this succeeds if poll_block parallelism is 1 (see NUM_PARALLEL_BLOCKS) + if block.commitment_config == CommitmentConfig::confirmed() { + if block.slot > last_highest_slot_number { + last_highest_slot_number = block.slot; + } else { + // note: ATM this fails very often (using the RPC poller) + warn!( + "Monotonic check failed - block {} is out of order, last highest was {}", + block.slot, last_highest_slot_number + ); + } + } + } // -- Ok + Err(RecvError::Lagged(missed_blocks)) => { + warn!( + "Could not keep up with producer - missed {} blocks", + missed_blocks + ); + } + Err(other_err) => { + panic!("Error receiving block: {:?}", other_err); + } + } + + // ... 
+ } + }) +} + +/// inspect stream of blocks and check that the commitment transition from confirmed to finalized is correct +fn block_stream_assert_commitment_order(block_notifier: BlockStream) -> JoinHandle<()> { + tokio::spawn(async move { + let mut block_notifier = block_notifier; + + let mut confirmed_blocks_by_slot = HashMap::::new(); + let mut finalized_blocks = HashSet::::new(); + + let mut warmup_cutoff: Slot = 0; + let mut warmup_first_confirmed: Slot = 0; + + loop { + match block_notifier.recv().await { + Ok(block) => { + if warmup_cutoff > 0 { + if block.slot < warmup_cutoff { + continue; + } + + // check semantics and log/panic + inspect_this_block( + &mut confirmed_blocks_by_slot, + &mut finalized_blocks, + &block, + ); + } else { + trace!("Warming up {} ...", block.slot); + + if warmup_first_confirmed == 0 + && block.commitment_config == CommitmentConfig::confirmed() + { + warmup_first_confirmed = block.slot; + } + + if block.commitment_config == CommitmentConfig::finalized() + && block.slot >= warmup_first_confirmed + { + warmup_cutoff = block.slot + 32; + debug!("Warming done (slot {})", warmup_cutoff); + } + } + } // -- Ok + Err(RecvError::Lagged(missed_blocks)) => { + warn!( + "Could not keep up with producer - missed {} blocks", + missed_blocks + ); + } + Err(other_err) => { + panic!("Error receiving block: {:?}", other_err); + } + } + + // ... 
+ } + }) +} + +fn inspect_this_block( + confirmed_blocks_by_slot: &mut HashMap, + finalized_blocks: &mut HashSet, + block: &ProducedBlock, +) { + if block.commitment_config == CommitmentConfig::confirmed() { + let prev_block = confirmed_blocks_by_slot.insert( + block.slot, + BlockDebugDetails { + blockhash: block.blockhash.clone(), + block: block.clone(), + }, + ); + // Assumption I: we never see the same confirmed block twice + assert!(prev_block.is_none(), "Must not see a confirmed block twice"); + } else if block.commitment_config == CommitmentConfig::finalized() { + let finalized_block = █ + let finalized_block_existed = finalized_blocks.insert(finalized_block.slot); + // Assumption II: we never see the same finalized block twice + assert!( + finalized_block_existed, + "Finalized block {} must NOT have been seen before", + finalized_block.slot + ); + let prev_block = confirmed_blocks_by_slot.get(&block.slot); + match prev_block { + Some(prev_block) => { + info!( + "Got finalized block {} with blockhash {} - prev confirmed was {}", + finalized_block.slot, finalized_block.blockhash, prev_block.blockhash + ); + // TODO is that correct? 
+ // Assumption III: confirmed and finalized block can be matched by slot and have the same blockhash + assert_eq!( + finalized_block.blockhash, prev_block.blockhash, + "Must see the same blockhash for confirmed and finalized block" + ); + + debug!( + "confirmed: {:?}", + to_string_without_transactions(&prev_block.block) + ); + debug!( + "finalized: {:?}", + to_string_without_transactions(finalized_block) + ); + + // Assumption IV: block details do not change between confirmed and finalized + assert_eq!( + // normalized and compare + to_string_without_transactions(&prev_block.block) + .replace("commitment_config=confirmed", "commitment_config=IGNORE"), + to_string_without_transactions(finalized_block) + .replace("commitment_config=finalized", "commitment_config=IGNORE"), + "block tostring mismatch" + ) + } + None => { + // note at startup we might see some orphan finalized blocks before we see matching pairs of confirmed-finalized blocks + warn!("Must see a confirmed block before it is finalized (slot {}) - could be a warmup issue", finalized_block.slot); + } + } + } +} + +fn to_string_without_transactions(produced_block: &ProducedBlock) -> String { + format!( + r#" + leader_id={:?} + blockhash={} + block_height={} + slot={} + parent_slot={} + block_time={} + commitment_config={} + previous_blockhash={} + num_transactions={} + "#, + produced_block.leader_id, + produced_block.blockhash, + produced_block.block_height, + produced_block.slot, + produced_block.parent_slot, + produced_block.block_time, + produced_block.commitment_config.commitment, + produced_block.previous_blockhash, + produced_block.transactions.len(), + // rewards + // transactions + ) +} + +fn configure_panic_hook() { + let default_panic = std::panic::take_hook(); + std::panic::set_hook(Box::new(move |panic_info| { + default_panic(panic_info); + // e.g. 
panicked at 'BANG', lite-rpc/tests/blockstore_integration_tests:260:25 + error!("{}", panic_info); + eprintln!("{}", panic_info); + process::exit(12); + })); +} diff --git a/history/tests/inmemory_block_store_tests.rs b/history/tests/inmemory_block_store_tests.rs deleted file mode 100644 index 8e2dbeb5..00000000 --- a/history/tests/inmemory_block_store_tests.rs +++ /dev/null @@ -1,63 +0,0 @@ -use solana_lite_rpc_core::{ - structures::produced_block::ProducedBlock, - traits::block_storage_interface::BlockStorageInterface, -}; -use solana_lite_rpc_history::block_stores::inmemory_block_store::InmemoryBlockStore; -use solana_rpc_client_api::config::RpcBlockConfig; -use solana_sdk::{commitment_config::CommitmentConfig, hash::Hash}; -use std::sync::Arc; - -pub fn create_test_block(slot: u64, commitment_config: CommitmentConfig) -> ProducedBlock { - ProducedBlock { - block_height: slot, - blockhash: Hash::new_unique().to_string(), - previous_blockhash: Hash::new_unique().to_string(), - parent_slot: slot - 1, - transactions: vec![], - block_time: 0, - commitment_config, - leader_id: None, - slot, - rewards: None, - } -} - -#[tokio::test] -async fn inmemory_block_store_tests() { - // will store only 10 blocks - let store: Arc = Arc::new(InmemoryBlockStore::new(10)); - - // add 10 blocks - for i in 1..11 { - store - .save(create_test_block(i, CommitmentConfig::finalized())) - .await - .unwrap(); - } - - // check if 10 blocks are added - for i in 1..11 { - assert!(store.get(i, RpcBlockConfig::default()).await.ok().is_some()); - } - // add 11th block - store - .save(create_test_block(11, CommitmentConfig::finalized())) - .await - .unwrap(); - - // can get 11th block - assert!(store - .get(11, RpcBlockConfig::default()) - .await - .ok() - .is_some()); - // first block is removed - assert!(store.get(1, RpcBlockConfig::default()).await.ok().is_none()); - - // cannot add old blocks - store - .save(create_test_block(1, CommitmentConfig::finalized())) - .await - .unwrap(); - 
assert!(store.get(1, RpcBlockConfig::default()).await.ok().is_none()); -} diff --git a/history/tests/mod.rs b/history/tests/mod.rs index 835c2e28..f7a71211 100644 --- a/history/tests/mod.rs +++ b/history/tests/mod.rs @@ -1,2 +1 @@ -mod inmemory_block_store_tests; mod multiple_strategy_block_store_tests; diff --git a/history/tests/multiple_strategy_block_store_tests.rs b/history/tests/multiple_strategy_block_store_tests.rs index 3e0304eb..790ca509 100644 --- a/history/tests/multiple_strategy_block_store_tests.rs +++ b/history/tests/multiple_strategy_block_store_tests.rs @@ -1,14 +1,13 @@ -use solana_lite_rpc_core::{ - structures::produced_block::ProducedBlock, - traits::block_storage_interface::BlockStorageInterface, -}; -use solana_lite_rpc_history::{ - block_stores::inmemory_block_store::InmemoryBlockStore, - block_stores::multiple_strategy_block_store::MultipleStrategyBlockStorage, -}; -use solana_rpc_client_api::config::RpcBlockConfig; +use solana_lite_rpc_core::structures::epoch::EpochCache; +use solana_lite_rpc_core::structures::produced_block::ProducedBlock; +use solana_lite_rpc_history::block_stores::multiple_strategy_block_store::BlockStorageData; +use solana_lite_rpc_history::block_stores::multiple_strategy_block_store::MultipleStrategyBlockStorage; +use solana_lite_rpc_history::block_stores::postgres_block_store::PostgresBlockStore; +use solana_lite_rpc_history::postgres::postgres_config::PostgresSessionConfig; +use solana_sdk::pubkey::Pubkey; +use solana_sdk::reward_type::RewardType; use solana_sdk::{commitment_config::CommitmentConfig, hash::Hash}; -use std::sync::Arc; +use solana_transaction_status::Reward; pub fn create_test_block(slot: u64, commitment_config: CommitmentConfig) -> ProducedBlock { ProducedBlock { @@ -21,137 +20,62 @@ pub fn create_test_block(slot: u64, commitment_config: CommitmentConfig) -> Prod commitment_config, leader_id: None, slot, - rewards: None, + rewards: Some(vec![Reward { + pubkey: Pubkey::new_unique().to_string(), + 
lamports: 5000, + post_balance: 1000000, + reward_type: Some(RewardType::Voting), + commission: None, + }]), } } +#[ignore = "need postgres database"] #[tokio::test] async fn test_in_multiple_stategy_block_store() { - let persistent_store: Arc = Arc::new(InmemoryBlockStore::new(10)); - let number_of_slots_in_memory = 3; - let block_storage = MultipleStrategyBlockStorage::new( + tracing_subscriber::fmt::init(); + + let pg_session_config = PostgresSessionConfig::new_from_env().unwrap().unwrap(); + let epoch_cache = EpochCache::new_for_tests(); + let persistent_store = PostgresBlockStore::new(epoch_cache.clone(), pg_session_config).await; + let multi_store = MultipleStrategyBlockStorage::new( persistent_store.clone(), - None, - number_of_slots_in_memory, + None, // not supported ); - block_storage - .save(create_test_block(1235, CommitmentConfig::confirmed())) + persistent_store.prepare_epoch_schema(1200).await.unwrap(); + + persistent_store + .write_block(&create_test_block(1200, CommitmentConfig::confirmed())) .await .unwrap(); - block_storage - .save(create_test_block(1236, CommitmentConfig::confirmed())) + // span range of slots between those two + persistent_store + .write_block(&create_test_block(1289, CommitmentConfig::confirmed())) .await .unwrap(); - assert!(block_storage - .get(1235, RpcBlockConfig::default()) - .await - .ok() - .is_some()); - assert!(block_storage - .get(1236, RpcBlockConfig::default()) - .await - .ok() - .is_some()); - assert!(persistent_store - .get(1235, RpcBlockConfig::default()) - .await - .ok() - .is_none()); - assert!(persistent_store - .get(1236, RpcBlockConfig::default()) - .await - .ok() - .is_none()); + assert!(multi_store.query_block(1200).await.ok().is_some()); - block_storage - .save(create_test_block(1235, CommitmentConfig::finalized())) - .await - .unwrap(); - block_storage - .save(create_test_block(1236, CommitmentConfig::finalized())) - .await - .unwrap(); - block_storage - .save(create_test_block(1237, 
CommitmentConfig::finalized())) - .await - .unwrap(); + assert!(multi_store.query_block(1289).await.ok().is_some()); - assert!(block_storage - .get(1235, RpcBlockConfig::default()) - .await - .ok() - .is_some()); - assert!(block_storage - .get(1236, RpcBlockConfig::default()) - .await - .ok() - .is_some()); - assert!(block_storage - .get(1237, RpcBlockConfig::default()) - .await - .ok() - .is_some()); - assert!(persistent_store - .get(1235, RpcBlockConfig::default()) - .await - .ok() - .is_some()); - assert!(persistent_store - .get(1236, RpcBlockConfig::default()) - .await - .ok() - .is_some()); - assert!(persistent_store - .get(1237, RpcBlockConfig::default()) - .await - .ok() - .is_some()); - assert!(block_storage.get_in_memory_block(1237).await.ok().is_some()); + // not in range + assert!(multi_store.query_block(1000).await.is_err()); + // the range check should give "true", yet no block is returned + assert!(multi_store.query_block(1250).await.is_err()); + // not in range + assert!(multi_store.query_block(9999).await.is_err()); - // blocks are replaced by finalized blocks + let block_1200: BlockStorageData = multi_store.query_block(1200).await.unwrap(); + assert_eq!(1, block_1200.rewards.as_ref().unwrap().len()); assert_eq!( - persistent_store - .get(1235, RpcBlockConfig::default()) - .await + 5000, + block_1200 + .rewards + .as_ref() .unwrap() - .blockhash, - block_storage - .get_in_memory_block(1235) - .await + .first() .unwrap() - .blockhash + .lamports ); - assert_eq!( - persistent_store - .get(1236, RpcBlockConfig::default()) - .await - .unwrap() - .blockhash, - block_storage - .get_in_memory_block(1236) - .await - .unwrap() - .blockhash - ); - assert_eq!( - persistent_store - .get(1237, RpcBlockConfig::default()) - .await - .unwrap() - .blockhash, - block_storage - .get_in_memory_block(1237) - .await - .unwrap() - .blockhash - ); - - // no block yet added returns none - assert!(block_storage - .get(1238, RpcBlockConfig::default()) - .await - .ok() - 
.is_none()); } diff --git a/lite-rpc/Cargo.toml b/lite-rpc/Cargo.toml index 192b6d9a..e90a6b34 100644 --- a/lite-rpc/Cargo.toml +++ b/lite-rpc/Cargo.toml @@ -39,6 +39,7 @@ async-channel = { workspace = true } quinn = { workspace = true } async-trait = { workspace = true } tokio = { version = "1.28.2", features = ["full", "fs"]} +tokio-util = "0.7" tokio-postgres = { version = "0.7.8", features = ["with-chrono-0_4"] } chrono = { workspace = true } diff --git a/lite-rpc/src/bridge.rs b/lite-rpc/src/bridge.rs index b7ec84a7..278e5f7a 100644 --- a/lite-rpc/src/bridge.rs +++ b/lite-rpc/src/bridge.rs @@ -22,11 +22,10 @@ use solana_lite_rpc_history::history::History; use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_rpc_client_api::{ config::{ - RpcBlockConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter, RpcBlocksConfigWrapper, - RpcContextConfig, RpcEncodingConfigWrapper, RpcGetVoteAccountsConfig, - RpcLeaderScheduleConfig, RpcProgramAccountsConfig, RpcRequestAirdropConfig, - RpcSignatureStatusConfig, RpcSignatureSubscribeConfig, RpcSignaturesForAddressConfig, - RpcTransactionLogsConfig, RpcTransactionLogsFilter, + RpcBlockSubscribeConfig, RpcBlockSubscribeFilter, RpcBlocksConfigWrapper, RpcContextConfig, + RpcGetVoteAccountsConfig, RpcLeaderScheduleConfig, RpcProgramAccountsConfig, + RpcRequestAirdropConfig, RpcSignatureStatusConfig, RpcSignatureSubscribeConfig, + RpcSignaturesForAddressConfig, RpcTransactionLogsConfig, RpcTransactionLogsFilter, }, response::{ Response as RpcResponse, RpcBlockhash, RpcConfirmedTransactionStatusWithSignature, @@ -57,6 +56,7 @@ lazy_static::lazy_static! 
{ } /// A bridge between clients and tpu +#[allow(dead_code)] pub struct LiteBridge { data_cache: DataCache, // should be removed @@ -125,19 +125,17 @@ impl LiteBridge { #[jsonrpsee::core::async_trait] impl LiteRpcServer for LiteBridge { - async fn get_block( - &self, - slot: u64, - config: Option>, - ) -> crate::rpc::Result> { - let config = config.map_or(RpcBlockConfig::default(), |x| x.convert_to_current()); - let block = self.history.block_storage.get(slot, config).await; - if block.is_ok() { - // TO DO Convert to UIConfirmed Block - Err(jsonrpsee::core::Error::HttpNotImplemented) - } else { - Ok(None) - } + async fn get_block(&self, _slot: u64) -> crate::rpc::Result> { + // let block = self.history.block_storage.query_block(slot).await; + // if block.is_ok() { + // // TO DO Convert to UIConfirmed Block + // Err(jsonrpsee::core::Error::HttpNotImplemented) + // } else { + // Ok(None) + // } + + // TODO get_block might deserve different implementation based on whether we serve from "history module" vs. 
from "send tx module" + todo!("get_block: decide where to look") } async fn get_blocks( @@ -264,7 +262,7 @@ impl LiteRpcServer for LiteBridge { .data_cache .get_current_epoch(commitment_config) .await - .into_epoch_info(block_info.block_height, None); + .as_epoch_info(block_info.block_height, None); Ok(epoch_info) } diff --git a/lite-rpc/src/main.rs b/lite-rpc/src/main.rs index aed3f80f..a935c8f2 100644 --- a/lite-rpc/src/main.rs +++ b/lite-rpc/src/main.rs @@ -16,6 +16,7 @@ use solana_lite_rpc_cluster_endpoints::grpc_subscription_autoreconnect::{ }; use solana_lite_rpc_cluster_endpoints::json_rpc_leaders_getter::JsonRpcLeaderGetter; use solana_lite_rpc_cluster_endpoints::json_rpc_subscription::create_json_rpc_polling_subscription; +use solana_lite_rpc_cluster_endpoints::rpc_polling::poll_blocks::NUM_PARALLEL_TASKS_DEFAULT; use solana_lite_rpc_core::keypair_loader::load_identity_keypair; use solana_lite_rpc_core::stores::{ block_information_store::{BlockInformation, BlockInformationStore}, @@ -31,7 +32,6 @@ use solana_lite_rpc_core::structures::{ }; use solana_lite_rpc_core::types::BlockStream; use solana_lite_rpc_core::AnyhowJoinHandle; -use solana_lite_rpc_history::block_stores::inmemory_block_store::InmemoryBlockStore; use solana_lite_rpc_history::history::History; use solana_lite_rpc_history::postgres::postgres_config::PostgresSessionConfig; use solana_lite_rpc_history::postgres::postgres_session::PostgresSessionCache; @@ -137,7 +137,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc) -> anyhow: // )? } else { info!("Creating RPC poll subscription..."); - create_json_rpc_polling_subscription(rpc_client.clone())? + create_json_rpc_polling_subscription(rpc_client.clone(), NUM_PARALLEL_TASKS_DEFAULT)? 
}; let EndpointStreaming { blocks_notifier, @@ -229,9 +229,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc) -> anyhow: let support_service = tokio::spawn(async move { spawner.spawn_support_services().await }); - let history = History { - block_storage: Arc::new(InmemoryBlockStore::new(1024)), - }; + let history = History::new(); let bridge_service = tokio::spawn( LiteBridge::new( diff --git a/lite-rpc/src/postgres_logger.rs b/lite-rpc/src/postgres_logger.rs index ae0bd733..f9f67ab5 100644 --- a/lite-rpc/src/postgres_logger.rs +++ b/lite-rpc/src/postgres_logger.rs @@ -109,13 +109,13 @@ const fn get_max_safe_updates() -> usize { } async fn send_txs(postgres_session: &PostgresSession, txs: &[PostgresTx]) -> anyhow::Result<()> { - const NUMBER_OF_ARGS: usize = 8; - if txs.is_empty() { return Ok(()); } - let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS * txs.len()); + const NB_ARGUMENTS: usize = 8; + + let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NB_ARGUMENTS * txs.len()); for tx in txs.iter() { let PostgresTx { @@ -139,17 +139,17 @@ async fn send_txs(postgres_session: &PostgresSession, txs: &[PostgresTx]) -> any args.push(quic_response); } - let mut query = String::from( + let values = PostgresSession::values_vecvec(NB_ARGUMENTS, txs.len(), &[]); + let statement = format!( r#" INSERT INTO lite_rpc.Txs (signature, recent_slot, forwarded_slot, forwarded_local_time, processed_slot, cu_consumed, cu_requested, quic_response) - VALUES + VALUES {} "#, + values ); - PostgresSession::multiline_query(&mut query, NUMBER_OF_ARGS, txs.len(), &[]); - - postgres_session.client.execute(&query, &args).await?; + postgres_session.client.execute(&statement, &args).await?; Ok(()) } @@ -158,13 +158,13 @@ async fn update_txs( postgres_session: &PostgresSession, txs: &[PostgresTxUpdate], ) -> anyhow::Result<()> { - const NUMBER_OF_ARGS: usize = 5; + const NB_ARGUMENTS: usize = 5; if txs.is_empty() { return Ok(()); } - let mut args: 
Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS * txs.len()); + let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NB_ARGUMENTS * txs.len()); for tx in txs.iter() { let PostgresTxUpdate { @@ -182,32 +182,26 @@ async fn update_txs( args.push(cu_price); } - let mut query = String::from( + let values = PostgresSession::values_vecvec( + NB_ARGUMENTS, + txs.len(), + &["text", "bigint", "bigint", "bigint", "bigint"], + ); + + let statement = format!( r#" UPDATE lite_rpc.Txs AS t1 SET processed_slot = t2.processed_slot, cu_consumed = t2.cu_consumed, cu_requested = t2.cu_requested, cu_price = t2.cu_price - FROM (VALUES - "#, - ); - - PostgresSession::multiline_query( - &mut query, - NUMBER_OF_ARGS, - txs.len(), - &["text", "bigint", "bigint", "bigint", "bigint"], - ); - - query.push_str( - r#" - ) AS t2(signature, processed_slot, cu_consumed, cu_requested, cu_price) + FROM (VALUES {}) AS t2(signature, processed_slot, cu_consumed, cu_requested, cu_price) WHERE t1.signature = t2.signature "#, + values ); - postgres_session.execute(&query, &args).await?; + postgres_session.execute(&statement, &args).await?; Ok(()) } diff --git a/lite-rpc/src/rpc.rs b/lite-rpc/src/rpc.rs index 15971781..5f754591 100644 --- a/lite-rpc/src/rpc.rs +++ b/lite-rpc/src/rpc.rs @@ -2,11 +2,10 @@ use crate::configs::{IsBlockHashValidConfig, SendTransactionConfig}; use jsonrpsee::core::SubscriptionResult; use jsonrpsee::proc_macros::rpc; use solana_rpc_client_api::config::{ - RpcBlockConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter, RpcBlocksConfigWrapper, - RpcContextConfig, RpcEncodingConfigWrapper, RpcGetVoteAccountsConfig, RpcLeaderScheduleConfig, - RpcProgramAccountsConfig, RpcRequestAirdropConfig, RpcSignatureStatusConfig, - RpcSignatureSubscribeConfig, RpcSignaturesForAddressConfig, RpcTransactionLogsConfig, - RpcTransactionLogsFilter, + RpcBlockSubscribeConfig, RpcBlockSubscribeFilter, RpcBlocksConfigWrapper, RpcContextConfig, + RpcGetVoteAccountsConfig, 
RpcLeaderScheduleConfig, RpcProgramAccountsConfig, + RpcRequestAirdropConfig, RpcSignatureStatusConfig, RpcSignatureSubscribeConfig, + RpcSignaturesForAddressConfig, RpcTransactionLogsConfig, RpcTransactionLogsFilter, }; use solana_rpc_client_api::response::{ Response as RpcResponse, RpcBlockhash, RpcConfirmedTransactionStatusWithSignature, @@ -28,11 +27,7 @@ pub trait LiteRpc { // *********************** #[method(name = "getBlock")] - async fn get_block( - &self, - slot: u64, - config: Option>, - ) -> Result>; + async fn get_block(&self, slot: u64) -> Result>; #[method(name = "getBlocks")] async fn get_blocks( diff --git a/migrations/create.sql b/migrations/create.sql index 608486eb..bafd91e5 100644 --- a/migrations/create.sql +++ b/migrations/create.sql @@ -1,8 +1,9 @@ +-- note: this schema is only used for postgres_logger CREATE SCHEMA lite_rpc; CREATE TABLE lite_rpc.Txs ( id SERIAL NOT NULL PRIMARY KEY, - signature CHAR(88) NOT NULL, + signature VARCHAR(88) NOT NULL, recent_slot BIGINT NOT NULL, forwarded_slot BIGINT NOT NULL, forwarded_local_time TIMESTAMP WITH TIME ZONE NOT NULL, diff --git a/migrations/localdev_integrationtest.sql b/migrations/localdev_integrationtest.sql new file mode 100644 index 00000000..9d263a59 --- /dev/null +++ b/migrations/localdev_integrationtest.sql @@ -0,0 +1,5 @@ + +CREATE DATABASE literpc_integrationtest_localdev; +CREATE USER literpc_integrationtest; +ALTER DATABASE literpc_integrationtest_localdev OWNER TO literpc_integrationtest; +ALTER USER literpc_integrationtest PASSWORD 'youknowme'; diff --git a/migrations/permissions.sql b/migrations/permissions.sql new file mode 100644 index 00000000..61346e17 --- /dev/null +++ b/migrations/permissions.sql @@ -0,0 +1,17 @@ +-- postgresql permission schema for Lite RPC persistence + +-- YOU NEED TO adjust the script + +-- create role and user; role is defined in code as LITERPC_ROLE constant +CREATE ROLE r_literpc; +CREATE USER literpc_app IN GROUP r_literpc; +-- ALTER USER literpc_app 
PASSWORD 'secret'; -- TODO provide your authentication + +-- required for postgres_logger +GRANT USAGE ON SCHEMA lite_rpc TO r_literpc; +GRANT ALL ON ALL TABLES IN SCHEMA lite_rpc TO r_literpc; +GRANT ALL ON ALL SEQUENCES IN SCHEMA lite_rpc TO r_literpc; +ALTER DEFAULT PRIVILEGES IN SCHEMA lite_rpc GRANT SELECT ON TABLES TO r_literpc; + +-- required for block persistence (dynamic schemata - one per epoch) +GRANT CONNECT, CREATE ON DATABASE my_literpc_database TO r_literpc; -- TODO adjust database name