diff --git a/Cargo.lock b/Cargo.lock index 9d1c5d6b..9358f037 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2519,9 +2519,9 @@ dependencies = [ "quinn", "serde", "serde_json", + "solana-lite-rpc-blockstore", "solana-lite-rpc-cluster-endpoints", "solana-lite-rpc-core", - "solana-lite-rpc-history", "solana-lite-rpc-prioritization-fees", "solana-lite-rpc-services", "solana-rpc-client", @@ -4283,6 +4283,40 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "solana-lite-rpc-blockstore" +version = "0.2.4" +dependencies = [ + "anyhow", + "async-trait", + "base64 0.21.7", + "bincode", + "bytes", + "chrono", + "dashmap 5.5.3", + "futures", + "futures-util", + "itertools 0.10.5", + "jsonrpsee", + "log", + "native-tls", + "postgres-native-tls", + "rand 0.8.5", + "rangetools", + "serde", + "serde_json", + "solana-lite-rpc-cluster-endpoints", + "solana-lite-rpc-core", + "solana-rpc-client", + "solana-rpc-client-api", + "solana-sdk", + "solana-transaction-status", + "tokio", + "tokio-postgres", + "tokio-util", + "tracing-subscriber", +] + [[package]] name = "solana-lite-rpc-cluster-endpoints" version = "0.2.4" @@ -4356,40 +4390,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "solana-lite-rpc-history" -version = "0.2.4" -dependencies = [ - "anyhow", - "async-trait", - "base64 0.21.7", - "bincode", - "bytes", - "chrono", - "dashmap 5.5.3", - "futures", - "futures-util", - "itertools 0.10.5", - "jsonrpsee", - "log", - "native-tls", - "postgres-native-tls", - "rand 0.8.5", - "rangetools", - "serde", - "serde_json", - "solana-lite-rpc-cluster-endpoints", - "solana-lite-rpc-core", - "solana-rpc-client", - "solana-rpc-client-api", - "solana-sdk", - "solana-transaction-status", - "tokio", - "tokio-postgres", - "tokio-util", - "tracing-subscriber", -] - [[package]] name = "solana-lite-rpc-prioritization-fees" version = "0.2.4" diff --git a/Cargo.toml b/Cargo.toml index 6c8a4675..f0fa94cc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ "quic-forward-proxy", "quic-forward-proxy-integration-test", "cluster-endpoints", - "history", + "blockstore", "prioritization_fees", "bench" ] @@ -68,7 +68,7 @@ rustls = { version = "0.21.7", default-features = false, features = ["quic"] } solana-lite-rpc-services = {path = "services", version="0.2.4"} solana-lite-rpc-core = {path = "core", version="0.2.4"} solana-lite-rpc-cluster-endpoints = {path = "cluster-endpoints", version="0.2.4"} -solana-lite-rpc-history = {path = "history", version="0.2.4"} +solana-lite-rpc-blockstore = {path = "blockstore", version="0.2.4"} solana-lite-rpc-stakevote = {path = "stake_vote", version="0.2.4"} solana-lite-rpc-prioritization-fees = {path = "prioritization_fees", version="0.2.4"} diff --git a/history/Cargo.toml b/blockstore/Cargo.toml similarity index 91% rename from history/Cargo.toml rename to blockstore/Cargo.toml index bbad4e80..96e235b8 100644 --- a/history/Cargo.toml +++ b/blockstore/Cargo.toml @@ -1,8 +1,8 @@ [package] -name = "solana-lite-rpc-history" +name = "solana-lite-rpc-blockstore" version = "0.2.4" edition = "2021" -description = "History implementations used by solana lite rpc" +description = "Store and provide blocks in a PostgreSQL DB and via Yellowstone Faithful" rust-version = "1.70.0" repository = "https://github.com/blockworks-foundation/lite-rpc" license = "AGPL" diff --git a/blockstore/QUERIES.md b/blockstore/QUERIES.md new file mode 100644 index 00000000..af60f213 --- /dev/null +++ b/blockstore/QUERIES.md @@ -0,0 +1,5 @@ + + +```sql + +``` \ No newline at end of file diff --git 
a/history/examples/bench_postgres_simple_select.rs b/blockstore/examples/bench_postgres_simple_select.rs similarity index 94% rename from history/examples/bench_postgres_simple_select.rs rename to blockstore/examples/bench_postgres_simple_select.rs index 73f45e3d..268c0696 100644 --- a/history/examples/bench_postgres_simple_select.rs +++ b/blockstore/examples/bench_postgres_simple_select.rs @@ -3,8 +3,8 @@ use itertools::Itertools; /// test program to query postgres the simples possible way /// use log::info; -use solana_lite_rpc_history::postgres::postgres_config::PostgresSessionConfig; -use solana_lite_rpc_history::postgres::postgres_session::PostgresSession; +use solana_lite_rpc_blockstore::block_stores::postgres::PostgresSession; +use solana_lite_rpc_blockstore::block_stores::postgres::PostgresSessionConfig; #[tokio::main(flavor = "multi_thread", worker_threads = 16)] pub async fn main() -> anyhow::Result<()> { diff --git a/history/src/bin/blockstore-importer-service.rs b/blockstore/src/bin/blockstore-importer-service.rs similarity index 100% rename from history/src/bin/blockstore-importer-service.rs rename to blockstore/src/bin/blockstore-importer-service.rs diff --git a/history/src/block_stores/faithful_block_store.rs b/blockstore/src/block_stores/faithful_history/faithful_block_store.rs similarity index 90% rename from history/src/block_stores/faithful_block_store.rs rename to blockstore/src/block_stores/faithful_history/faithful_block_store.rs index 6d47e597..d4597cc0 100644 --- a/history/src/block_stores/faithful_block_store.rs +++ b/blockstore/src/block_stores/faithful_history/faithful_block_store.rs @@ -11,7 +11,7 @@ use std::ops::RangeInclusive; use std::sync::Arc; pub struct FaithfulBlockStore { - faithful_rpc_client: Arc, // to fetch legacy blocks from faithful + faithful_rpc_client: Arc, // to fetch legacy blocks from faithful_history } impl FaithfulBlockStore { @@ -48,7 +48,10 @@ impl FaithfulBlockStore { CommitmentConfig::finalized(), )), Err(err) => { - bail!(format!("Block {} not found in faithful: {}", slot, err)); + bail!(format!( + "Block {} not found in faithful_history: {}", + slot, err + )); } } } diff --git a/blockstore/src/block_stores/faithful_history/mod.rs b/blockstore/src/block_stores/faithful_history/mod.rs new file mode 100644 index 00000000..f51810a8 --- /dev/null +++ b/blockstore/src/block_stores/faithful_history/mod.rs @@ -0,0 +1 @@ +pub mod faithful_block_store; diff --git a/blockstore/src/block_stores/mod.rs b/blockstore/src/block_stores/mod.rs new file mode 100644 index 00000000..57c6d205 --- /dev/null +++ b/blockstore/src/block_stores/mod.rs @@ -0,0 +1,3 @@ +pub mod faithful_history; +pub mod multiple_strategy_block_store; +pub mod postgres; diff --git a/history/src/block_stores/multiple_strategy_block_store.rs b/blockstore/src/block_stores/multiple_strategy_block_store.rs similarity index 77% rename from history/src/block_stores/multiple_strategy_block_store.rs rename to blockstore/src/block_stores/multiple_strategy_block_store.rs index 5d2c6fb5..a9a93fb5 100644 --- a/history/src/block_stores/multiple_strategy_block_store.rs +++ b/blockstore/src/block_stores/multiple_strategy_block_store.rs @@ -1,5 +1,5 @@ -use crate::block_stores::faithful_block_store::FaithfulBlockStore; -use crate::block_stores::postgres_block_store::PostgresBlockStore; +use crate::block_stores::faithful_history::faithful_block_store::FaithfulBlockStore; +use crate::block_stores::postgres::postgres_block_store_query::PostgresQueryBlockStore; use anyhow::{bail, Context, Result}; 
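The crate split separates the read path (`PostgresQueryBlockStore`) from the write path (`PostgresBlockStore`), with `MultipleStrategyBlockStorage` fronting the query half. A minimal wiring sketch using only constructors that appear elsewhere in this diff; the `open_block_storage` helper itself is hypothetical:

```rust
use solana_lite_rpc_blockstore::block_stores::multiple_strategy_block_store::MultipleStrategyBlockStorage;
use solana_lite_rpc_blockstore::block_stores::postgres::postgres_block_store_query::PostgresQueryBlockStore;
use solana_lite_rpc_blockstore::block_stores::postgres::PostgresSessionConfig;
use solana_lite_rpc_core::structures::epoch::EpochCache;

// Hypothetical helper showing how the renamed pieces fit together.
async fn open_block_storage(epoch_cache: EpochCache) -> anyhow::Result<MultipleStrategyBlockStorage> {
    // new_from_env() returns Result<Option<_>>, as the integration test below uses it
    let pg_session_config = PostgresSessionConfig::new_from_env()?.expect("PG_CONFIG must be set");
    let query_store = PostgresQueryBlockStore::new(epoch_cache, pg_session_config).await;
    // the faithful_history backend is not wired up yet, hence None
    Ok(MultipleStrategyBlockStorage::new(query_store, None))
}
```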
use log::{debug, trace}; use solana_lite_rpc_core::structures::produced_block::ProducedBlock; @@ -12,7 +12,7 @@ use std::sync::Arc; pub enum BlockSource { // serve two epochs from postgres RecentEpochDatabase, - // serve epochs older than two from faithful service + // serve epochs older than two from faithful_history service FaithfulArchive, } @@ -33,20 +33,20 @@ impl Deref for BlockStorageData { // you might need to add a read-cache instead pub struct MultipleStrategyBlockStorage { - persistent_block_storage: PostgresBlockStore, // for persistent block storage + block_storage_query: PostgresQueryBlockStore, // not supported ATM - faithful_block_storage: Option<FaithfulBlockStore>, // to fetch legacy blocks from faithful + faithful_block_storage: Option<FaithfulBlockStore>, // to fetch legacy blocks from faithful_history // last_confirmed_slot: Arc, } impl MultipleStrategyBlockStorage { pub fn new( - persistent_block_storage: PostgresBlockStore, + block_storage_query: PostgresQueryBlockStore, _faithful_rpc_client: Option<Arc<RpcClient>>, ) -> Self { Self { - persistent_block_storage, - // faithful not used ATM + block_storage_query, + // faithful_history not used ATM faithful_block_storage: None, // faithful_block_storage: faithful_rpc_client.map(|rpc| FaithfulBlockStore::new(rpc)), } @@ -55,7 +55,7 @@ impl MultipleStrategyBlockStorage { // we need to build the slots from right to left pub async fn get_slot_range(&self) -> RangeInclusive<Slot> { // merge them - let persistent_storage_range = self.persistent_block_storage.get_slot_range().await; + let persistent_storage_range = self.block_storage_query.get_slot_range().await; trace!("Persistent storage range: {:?}", persistent_storage_range); let mut lower = *persistent_storage_range.start(); @@ -70,12 +70,15 @@ impl MultipleStrategyBlockStorage { } let merged = RangeInclusive::new(lower, *persistent_storage_range.end()); - trace!("Merged range from database + faithful: {:?}", merged); + trace!( + "Merged range from database + faithful_history: {:?}", + merged + ); merged } - // lookup confirmed or finalized block from either our blockstore or faithful + // lookup confirmed or finalized block from either our blockstore or faithful_history // TODO find better method name pub async fn query_block( &self, @@ -93,17 +96,17 @@ impl MultipleStrategyBlockStorage { // current strategy: // 1. check if requested slot is in min-max range served from Postgres // 2.1. if yes; fetch from Postgres - // 2.2. if not: try to fetch from faithful + // 2.2. 
if not: try to fetch from faithful_history - match self.persistent_block_storage.is_block_in_range(slot).await { + match self.block_storage_query.is_block_in_range(slot).await { true => { debug!( "Assume block {} to be available in persistent block-storage", slot, ); let lookup = self - .persistent_block_storage - .query(slot) + .block_storage_query + .query_block(slot) .await .context(format!("block {} not found although it was in range", slot)); @@ -124,7 +127,7 @@ impl MultipleStrategyBlockStorage { match faithful_block_storage.get_block(slot).await { Ok(block) => { debug!( - "Lookup for block {} successful in faithful block-storage", + "Lookup for block {} successful in faithful_history block-storage", slot ); @@ -134,12 +137,18 @@ impl MultipleStrategyBlockStorage { }) } Err(_) => { - debug!("Block {} not found in faithful storage - giving up", slot); - bail!(format!("Block {} not found in faithful", slot)); + debug!( + "Block {} not found in faithful_history storage - giving up", + slot + ); + bail!(format!("Block {} not found in faithful_history", slot)); } } } else { - bail!(format!("Block {} not found - faithful not available", slot)); + bail!(format!( + "Block {} not found - faithful_history not available", + slot + )); } } } diff --git a/blockstore/src/block_stores/postgres/mod.rs b/blockstore/src/block_stores/postgres/mod.rs new file mode 100644 index 00000000..1aed1d9f --- /dev/null +++ b/blockstore/src/block_stores/postgres/mod.rs @@ -0,0 +1,16 @@ +pub mod postgres_block_store_query; +pub mod postgres_block_store_writer; +pub use postgres_config::PostgresSessionConfig; +pub use postgres_session::PostgresSession; +pub use postgres_session::PostgresWriteSession; + +mod postgres_block; +mod postgres_config; +mod postgres_epoch; +mod postgres_session; +mod postgres_transaction; + +// role for block store component owner with full write access +pub const LITERPC_ROLE: &str = "r_literpc"; +// role for accessing data +pub const LITERPC_QUERY_ROLE: &str = "ro_literpc"; diff --git a/history/src/postgres/postgres_block.rs b/blockstore/src/block_stores/postgres/postgres_block.rs similarity index 94% rename from history/src/postgres/postgres_block.rs rename to blockstore/src/block_stores/postgres/postgres_block.rs index 18c07dd0..8b3285f1 100644 --- a/history/src/postgres/postgres_block.rs +++ b/blockstore/src/block_stores/postgres/postgres_block.rs @@ -1,4 +1,5 @@ -use crate::postgres::postgres_epoch::PostgresEpoch; +use super::postgres_epoch::PostgresEpoch; +use super::postgres_session::PostgresSession; use log::{debug, warn}; use solana_lite_rpc_core::structures::epoch::EpochRef; use solana_lite_rpc_core::structures::produced_block::TransactionInfo; @@ -9,8 +10,6 @@ use solana_transaction_status::Reward; use std::time::Instant; use tokio_postgres::types::ToSql; -use super::postgres_session::PostgresSession; - #[derive(Debug)] pub struct PostgresBlock { pub slot: i64, @@ -46,7 +45,7 @@ impl From<&ProducedBlock> for PostgresBlock { } impl PostgresBlock { - pub fn into_produced_block( + pub fn to_produced_block( &self, transaction_infos: Vec<TransactionInfo>, commitment_config: CommitmentConfig, @@ -158,12 +157,7 @@ impl PostgresBlock { // check if monotonic let prev_max_slot = row.get::<&str, Option<i64>>("prev_max_slot"); // None -> no previous rows - debug!( - "Inserted block {} with prev highest slot being {}, parent={}", - self.slot, - prev_max_slot.unwrap_or(-1), - self.parent_slot - ); + debug!("Inserted block {}", self.slot); if let Some(prev_max_slot) = prev_max_slot { if prev_max_slot > self.slot 
{ // note: unclear if this is desired behavior! @@ -213,7 +207,7 @@ mod tests { let transaction_infos = vec![create_tx_info(), create_tx_info()]; let produced_block = - block.into_produced_block(transaction_infos, CommitmentConfig::confirmed()); + block.to_produced_block(transaction_infos, CommitmentConfig::confirmed()); assert_eq!(produced_block.slot, 5050505); assert_eq!(produced_block.transactions.len(), 2); diff --git a/blockstore/src/block_stores/postgres/postgres_block_store_query.rs b/blockstore/src/block_stores/postgres/postgres_block_store_query.rs new file mode 100644 index 00000000..c92538b8 --- /dev/null +++ b/blockstore/src/block_stores/postgres/postgres_block_store_query.rs @@ -0,0 +1,285 @@ +use std::collections::HashMap; +use std::ops::RangeInclusive; +use std::time::Instant; + +use crate::block_stores::postgres::LITERPC_QUERY_ROLE; +use anyhow::{bail, Result}; +use itertools::Itertools; +use log::{debug, info, warn}; +use solana_lite_rpc_core::structures::epoch::EpochRef; +use solana_lite_rpc_core::structures::{epoch::EpochCache, produced_block::ProducedBlock}; +use solana_sdk::commitment_config::CommitmentConfig; +use solana_sdk::slot_history::Slot; + +use super::postgres_block::*; +use super::postgres_config::*; +use super::postgres_epoch::*; +use super::postgres_session::*; +use super::postgres_transaction::*; + +#[derive(Clone)] +pub struct PostgresQueryBlockStore { + session_cache: PostgresSessionCache, + epoch_schedule: EpochCache, } + +impl PostgresQueryBlockStore { + pub async fn new(epoch_schedule: EpochCache, pg_session_config: PostgresSessionConfig) -> Self { + let session_cache = PostgresSessionCache::new(pg_session_config.clone()) + .await + .unwrap(); + + Self::check_query_role(&session_cache).await; + + Self { + session_cache, + epoch_schedule, + } + } + + async fn get_session(&self) -> PostgresSession { + self.session_cache + .get_session() + .await + .expect("should get new postgres session") + } + + pub async fn is_block_in_range(&self, slot: Slot) -> bool { + let epoch = self.epoch_schedule.get_epoch_at_slot(slot); + let ranges = self.get_slot_range_by_epoch().await; + let matching_range: Option<&RangeInclusive<Slot>> = ranges.get(&epoch.into()); + + matching_range + .map(|slot_range| slot_range.contains(&slot)) + .unwrap_or(false) + } + + pub async fn query_block(&self, slot: Slot) -> Result<ProducedBlock> { + let started_at = Instant::now(); + let epoch: EpochRef = self.epoch_schedule.get_epoch_at_slot(slot).into(); + + let statement = PostgresBlock::build_query_statement(epoch, slot); + let block_row = self + .get_session() + .await + .query_opt(&statement, &[]) + .await + .unwrap(); + + if block_row.is_none() { + bail!("Block {} in epoch {} not found in postgres", slot, epoch); + } + + let statement = PostgresTransaction::build_query_statement(epoch, slot); + let transaction_rows = self + .get_session() + .await + .query_list(&statement, &[]) + .await + .unwrap(); + + warn!( + "transaction_rows: {} - print first 10", + transaction_rows.len() + ); + + let tx_infos = transaction_rows + .iter() + .map(|tx_row| { + let postgres_transaction = PostgresTransaction { + slot: slot as i64, + signature: tx_row.get("signature"), + err: tx_row.get("err"), + cu_requested: tx_row.get("cu_requested"), + prioritization_fees: tx_row.get("prioritization_fees"), + cu_consumed: tx_row.get("cu_consumed"), + recent_blockhash: tx_row.get("recent_blockhash"), + message: tx_row.get("message"), + }; + + postgres_transaction.to_transaction_info() + }) + .collect_vec(); + + let row = block_row.unwrap(); + 
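The conversion helper is renamed from `into_produced_block` to `to_produced_block` because it now takes `&self` instead of consuming the row. A crate-internal sketch of what that buys (the `rebuild` wrapper is hypothetical):

```rust
use solana_lite_rpc_core::structures::produced_block::{ProducedBlock, TransactionInfo};
use solana_sdk::commitment_config::CommitmentConfig;

use super::postgres_block::PostgresBlock;

// Because to_produced_block borrows, the PostgresBlock row stays usable
// afterwards, e.g. for logging or a second conversion.
fn rebuild(row: &PostgresBlock, txs: Vec<TransactionInfo>) -> ProducedBlock {
    let produced = row.to_produced_block(txs, CommitmentConfig::confirmed());
    log::debug!("rebuilt block {} from postgres row", row.slot);
    produced
}
```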
// meta data + let _epoch: i64 = row.get("_epoch"); + let epoch_schema: String = row.get("_epoch_schema"); + + let blockhash: String = row.get("blockhash"); + let block_height: i64 = row.get("block_height"); + let slot: i64 = row.get("slot"); + let parent_slot: i64 = row.get("parent_slot"); + let block_time: i64 = row.get("block_time"); + let previous_blockhash: String = row.get("previous_blockhash"); + let rewards: Option = row.get("rewards"); + let leader_id: Option = row.get("leader_id"); + + let postgres_block = PostgresBlock { + slot, + blockhash, + block_height, + parent_slot, + block_time, + previous_blockhash, + rewards, + leader_id, + }; + + let produced_block = postgres_block.to_produced_block( + tx_infos, + // FIXME + CommitmentConfig::confirmed(), + ); + + debug!( + "Querying produced block {} from postgres in epoch schema {} took {:.2}ms: {}/{}", + produced_block.slot, + epoch_schema, + started_at.elapsed().as_secs_f64() * 1000.0, + produced_block.blockhash, + produced_block.commitment_config.commitment + ); + + Ok(produced_block) + } + + async fn check_query_role(session_cache: &PostgresSessionCache) { + let role = LITERPC_QUERY_ROLE; + let statement = format!("SELECT 1 FROM pg_roles WHERE rolname='{role}'"); + let count = session_cache + .get_session() + .await + .expect("must get session") + .execute(&statement, &[]) + .await + .expect("must execute query to check for role"); + + if count == 0 { + panic!( + "Missing mandatory postgres query role '{}' for Lite RPC - see permissions.sql", + role + ); + } else { + info!("Self check - found postgres role '{}'", role); + } + } +} + +impl PostgresQueryBlockStore { + pub async fn get_slot_range(&self) -> RangeInclusive { + let map_epoch_to_slot_range = self.get_slot_range_by_epoch().await; + + let rows_minmax: Vec<&RangeInclusive> = + map_epoch_to_slot_range.values().collect_vec(); + + let slot_min = rows_minmax + .iter() + .map(|range| range.start()) + .min() + // TODO decide what todo + .expect("non-empty result - TODO"); + let slot_max = rows_minmax + .iter() + .map(|range| range.end()) + .max() + .expect("non-empty result - TODO"); + + RangeInclusive::new(*slot_min, *slot_max) + } + + pub async fn get_slot_range_by_epoch(&self) -> HashMap> { + let started = Instant::now(); + let session = self.get_session().await; + // e.g. 
"rpc2a_epoch_552" + let query = format!( + r#" + SELECT + schema_name + FROM information_schema.schemata + WHERE schema_name ~ '^{schema_prefix}[0-9]+$' + "#, + schema_prefix = EPOCH_SCHEMA_PREFIX + ); + let result = session.query_list(&query, &[]).await.unwrap(); + + let epoch_schemas = result + .iter() + .map(|row| row.get::<&str, &str>("schema_name")) + .map(|schema_name| { + ( + schema_name, + PostgresEpoch::parse_epoch_from_schema_name(schema_name), + ) + }) + .collect_vec(); + + if epoch_schemas.is_empty() { + return HashMap::new(); + } + + let inner = epoch_schemas + .iter() + .map(|(schema, epoch)| { + format!( + "SELECT slot,{epoch}::bigint as epoch FROM {schema}.blocks", + schema = schema, + epoch = epoch + ) + }) + .join(" UNION ALL "); + + let query = format!( + r#" + SELECT epoch, min(slot) as slot_min, max(slot) as slot_max FROM ( + {inner} + ) AS all_slots + GROUP BY epoch + "#, + inner = inner + ); + + let rows_minmax = session.query_list(&query, &[]).await.unwrap(); + + if rows_minmax.is_empty() { + return HashMap::new(); + } + + let mut map_epoch_to_slot_range = rows_minmax + .iter() + .map(|row| { + ( + row.get::<&str, i64>("epoch"), + RangeInclusive::new( + row.get::<&str, i64>("slot_min") as Slot, + row.get::<&str, i64>("slot_max") as Slot, + ), + ) + }) + .into_grouping_map() + .fold(None, |acc, _key, val| { + assert!(acc.is_none(), "epoch must be unique"); + Some(val) + }); + + let final_range: HashMap> = map_epoch_to_slot_range + .iter_mut() + .map(|(epoch, range)| { + let epoch = EpochRef::new(*epoch as u64); + ( + epoch, + range.clone().expect("range must be returned from SQL"), + ) + }) + .collect(); + + debug!( + "Slot range check in postgres found {} ranges, took {:2}sec: {:?}", + rows_minmax.len(), + started.elapsed().as_secs_f64(), + final_range + ); + + final_range + } +} diff --git a/history/src/block_stores/postgres_block_store.rs b/blockstore/src/block_stores/postgres/postgres_block_store_writer.rs similarity index 60% rename from history/src/block_stores/postgres_block_store.rs rename to blockstore/src/block_stores/postgres/postgres_block_store_writer.rs index c4dcd2ca..13bf647c 100644 --- a/history/src/block_stores/postgres_block_store.rs +++ b/blockstore/src/block_stores/postgres/postgres_block_store_writer.rs @@ -1,42 +1,30 @@ -use std::collections::HashMap; -use std::ops::RangeInclusive; use std::time::{Duration, Instant}; +use crate::block_stores::postgres::{LITERPC_QUERY_ROLE, LITERPC_ROLE}; use anyhow::{bail, Context, Result}; use itertools::Itertools; use log::{debug, info, trace, warn}; use solana_lite_rpc_core::structures::epoch::EpochRef; use solana_lite_rpc_core::structures::{epoch::EpochCache, produced_block::ProducedBlock}; -use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel}; +use solana_sdk::commitment_config::CommitmentLevel; use solana_sdk::slot_history::Slot; use tokio_postgres::error::SqlState; -use crate::postgres::postgres_config::PostgresSessionConfig; -use crate::postgres::postgres_epoch::{PostgresEpoch, EPOCH_SCHEMA_PREFIX}; -use crate::postgres::postgres_session::{PostgresSession, PostgresWriteSession}; -use crate::postgres::{ - postgres_block::PostgresBlock, postgres_session::PostgresSessionCache, - postgres_transaction::PostgresTransaction, -}; +use super::postgres_block::*; +use super::postgres_config::*; +use super::postgres_epoch::*; +use super::postgres_session::*; +use super::postgres_transaction::*; -const LITERPC_ROLE: &str = "r_literpc"; const PARALLEL_WRITE_SESSIONS: usize = 4; const 
MIN_WRITE_CHUNK_SIZE: usize = 500; -#[derive(Default, Clone, Copy)] -pub struct PostgresData { - // from_slot: Slot, - // to_slot: Slot, - // current_epoch: Epoch, -} - #[derive(Clone)] pub struct PostgresBlockStore { session_cache: PostgresSessionCache, // use this session only for the write path! write_sessions: Vec, epoch_schedule: EpochCache, - // postgres_data: Arc>, } impl PostgresBlockStore { @@ -57,17 +45,16 @@ impl PostgresBlockStore { "must have at least one write session" ); - Self::check_role(&session_cache).await; + Self::check_write_role(&session_cache).await; Self { session_cache, write_sessions, epoch_schedule, - // postgres_data, } } - async fn check_role(session_cache: &PostgresSessionCache) { + async fn check_write_role(session_cache: &PostgresSessionCache) { let role = LITERPC_ROLE; let statement = format!("SELECT 1 FROM pg_roles WHERE rolname='{role}'"); let count = session_cache @@ -80,11 +67,14 @@ impl PostgresBlockStore { if count == 0 { panic!( - "Missing mandatory postgres role '{}' for Lite RPC - see permissions.sql", + "Missing mandatory postgres write/ownership role '{}' for Lite RPC - see permissions.sql", role ); } else { - info!("Self check - found postgres role '{}'", role); + info!( + "Self check - found postgres write role/ownership '{}'", + role + ); } } @@ -96,7 +86,7 @@ impl PostgresBlockStore { let statement = PostgresEpoch::build_create_schema_statement(epoch); // note: requires GRANT CREATE ON DATABASE xyz - let result_create_schema = session.execute_simple(&statement).await; + let result_create_schema = session.execute_multiple(&statement).await; if let Err(err) = result_create_schema { if err .code() @@ -117,28 +107,28 @@ impl PostgresBlockStore { // set permissions for new schema let statement = build_assign_permissions_statements(epoch); session - .execute_simple(&statement) + .execute_multiple(&statement) .await .context("Set postgres permissions for new schema")?; // Create blocks table let statement = PostgresBlock::build_create_table_statement(epoch); session - .execute_simple(&statement) + .execute_multiple(&statement) .await .context("create blocks table for new epoch")?; // create transaction table let statement = PostgresTransaction::build_create_table_statement(epoch); session - .execute_simple(&statement) + .execute_multiple(&statement) .await .context("create transaction table for new epoch")?; // add foreign key constraint between transactions and blocks let statement = PostgresTransaction::build_foreign_key_statement(epoch); session - .execute_simple(&statement) + .execute_multiple(&statement) .await .context("create foreign key constraint between transactions and blocks")?; @@ -153,75 +143,6 @@ impl PostgresBlockStore { .expect("should get new postgres session") } - pub async fn is_block_in_range(&self, slot: Slot) -> bool { - let epoch = self.epoch_schedule.get_epoch_at_slot(slot); - let ranges = self.get_slot_range_by_epoch().await; - let matching_range: Option<&RangeInclusive> = ranges.get(&epoch.into()); - - matching_range - .map(|slot_range| slot_range.contains(&slot)) - .is_some() - } - - pub async fn query(&self, slot: Slot) -> Result { - let started = Instant::now(); - let epoch: EpochRef = self.epoch_schedule.get_epoch_at_slot(slot).into(); - - let query = PostgresBlock::build_query_statement(epoch, slot); - let block_row = self - .get_session() - .await - .query_opt(&query, &[]) - .await - .unwrap(); - - if block_row.is_none() { - bail!("Block {} in epoch {} not found in postgres", slot, epoch); - } - - let row = 
block_row.unwrap(); - // meta data - let _epoch: i64 = row.get("_epoch"); - let epoch_schema: String = row.get("_epoch_schema"); - - let blockhash: String = row.get("blockhash"); - let block_height: i64 = row.get("block_height"); - let slot: i64 = row.get("slot"); - let parent_slot: i64 = row.get("parent_slot"); - let block_time: i64 = row.get("block_time"); - let previous_blockhash: String = row.get("previous_blockhash"); - let rewards: Option<String> = row.get("rewards"); - let leader_id: Option<String> = row.get("leader_id"); - - let postgres_block = PostgresBlock { - slot, - blockhash, - block_height, - parent_slot, - block_time, - previous_blockhash, - rewards, - leader_id, - }; - - let produced_block = postgres_block.into_produced_block( - // TODO what to do - vec![], - CommitmentConfig::confirmed(), - ); - - debug!( - "Querying produced block {} from postgres in epoch schema {} took {:.2}ms: {}/{}", - produced_block.slot, - epoch_schema, - started.elapsed().as_secs_f64() * 1000.0, - produced_block.blockhash, - produced_block.commitment_config.commitment - ); - - Ok(produced_block) - } - // optimistically try to progress commitment level for a block that is already stored pub async fn progress_block_commitment_level(&self, block: &ProducedBlock) -> Result<()> { // ATM we only support updating confirmed block to finalized @@ -236,12 +157,16 @@ impl PostgresBlockStore { Ok(()) } - pub async fn write_block(&self, block: &ProducedBlock) -> Result<()> { + pub async fn save_block(&self, block: &ProducedBlock) -> Result<()> { self.progress_block_commitment_level(block).await?; // let PostgresData { current_epoch, .. } = { *self.postgres_data.read().await }; - trace!("Saving block {} to postgres storage...", block.slot); + trace!( + "Saving block {}@{} to postgres storage...", + block.slot, + block.commitment_config.commitment + ); let slot = block.slot; let transactions = block .transactions @@ -277,19 +202,20 @@ impl PostgresBlockStore { ); for (i, chunk) in chunks.iter().enumerate() { let session = self.write_sessions[i].get_write_session().await.clone(); - let future = PostgresTransaction::save_transaction_copyin(session, epoch.into(), chunk); + let future = + PostgresTransaction::save_transactions_from_block(session, epoch.into(), chunk); queries_fut.push(future); } - let all_results: Vec> = futures_util::future::join_all(queries_fut).await; + let all_results: Vec<Result<()>> = futures_util::future::join_all(queries_fut).await; for result in all_results { - result.unwrap(); + result.expect("Save query must succeed"); } let elapsed_txs_insert = started_txs.elapsed(); - debug!( - "Saving block {} to postgres took {:.2}ms for block and {:.2}ms for {} transactions ({}x{} chunks)", - slot, + info!( + "Saving block {}@{} to postgres took {:.2}ms for block and {:.2}ms for {} transactions ({}x{} chunks)", + slot, block.commitment_config.commitment, elapsed_block_insert.as_secs_f64() * 1000.0, elapsed_txs_insert.as_secs_f64() * 1000.0, transactions.len(), @@ -317,7 +243,7 @@ impl PostgresBlockStore { tokio::spawn(async move { write_session_single - .execute_simple(&statement) + .execute_multiple(&statement) .await .unwrap(); let elapsed = started.elapsed(); @@ -346,18 +272,40 @@ impl PostgresBlockStore { let created_next = self.start_new_epoch_if_necessary(next_epoch).await?; Ok(created_current || created_next) } + + // used for testing only ATM + pub async fn drop_epoch_schema(&self, epoch: EpochRef) -> anyhow::Result<()> { + // drop the schema of the given epoch + let schema_name = PostgresEpoch::build_schema_name(epoch); + 
let session = self.get_session().await; + + let statement = PostgresEpoch::build_drop_schema_statement(epoch); + let result_drop_schema = session.execute_multiple(&statement).await; + match result_drop_schema { + Ok(_) => { + warn!("Dropped schema {}", schema_name); + Ok(()) + } + Err(_err) => { + bail!("Error dropping schema {}", schema_name) + } + } + } } fn build_assign_permissions_statements(epoch: EpochRef) -> String { - let role = LITERPC_ROLE; let schema = PostgresEpoch::build_schema_name(epoch); - format!( r#" GRANT USAGE ON SCHEMA {schema} TO {role}; GRANT ALL ON ALL TABLES IN SCHEMA {schema} TO {role}; ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} GRANT ALL ON TABLES TO {role}; - "# + + GRANT USAGE ON SCHEMA {schema} TO {query_role}; + ALTER DEFAULT PRIVILEGES IN SCHEMA {schema} GRANT SELECT ON TABLES TO {query_role}; + "#, + role = LITERPC_ROLE, + query_role = LITERPC_QUERY_ROLE, ) } @@ -365,123 +313,6 @@ fn div_ceil(a: usize, b: usize) -> usize { (a.saturating_add(b).saturating_sub(1)).saturating_div(b) } -impl PostgresBlockStore { - pub async fn get_slot_range(&self) -> RangeInclusive { - let map_epoch_to_slot_range = self.get_slot_range_by_epoch().await; - - let rows_minmax: Vec<&RangeInclusive> = - map_epoch_to_slot_range.values().collect_vec(); - - let slot_min = rows_minmax - .iter() - .map(|range| range.start()) - .min() - .expect("non-empty result"); - let slot_max = rows_minmax - .iter() - .map(|range| range.end()) - .max() - .expect("non-empty result"); - - RangeInclusive::new(*slot_min, *slot_max) - } - - pub async fn get_slot_range_by_epoch(&self) -> HashMap> { - let started = Instant::now(); - let session = self.get_session().await; - // e.g. "rpc2a_epoch_552" - let query = format!( - r#" - SELECT - schema_name - FROM information_schema.schemata - WHERE schema_name ~ '^{schema_prefix}[0-9]+$' - "#, - schema_prefix = EPOCH_SCHEMA_PREFIX - ); - let result = session.query_list(&query, &[]).await.unwrap(); - - let epoch_schemas = result - .iter() - .map(|row| row.get::<&str, &str>("schema_name")) - .map(|schema_name| { - ( - schema_name, - PostgresEpoch::parse_epoch_from_schema_name(schema_name), - ) - }) - .collect_vec(); - - if epoch_schemas.is_empty() { - return HashMap::new(); - } - - let inner = epoch_schemas - .iter() - .map(|(schema, epoch)| { - format!( - "SELECT slot,{epoch}::bigint as epoch FROM {schema}.blocks", - schema = schema, - epoch = epoch - ) - }) - .join(" UNION ALL "); - - let query = format!( - r#" - SELECT epoch, min(slot) as slot_min, max(slot) as slot_max FROM ( - {inner} - ) AS all_slots - GROUP BY epoch - "#, - inner = inner - ); - - let rows_minmax = session.query_list(&query, &[]).await.unwrap(); - - if rows_minmax.is_empty() { - return HashMap::new(); - } - - let mut map_epoch_to_slot_range = rows_minmax - .iter() - .map(|row| { - ( - row.get::<&str, i64>("epoch"), - RangeInclusive::new( - row.get::<&str, i64>("slot_min") as Slot, - row.get::<&str, i64>("slot_max") as Slot, - ), - ) - }) - .into_grouping_map() - .fold(None, |acc, _key, val| { - assert!(acc.is_none(), "epoch must be unique"); - Some(val) - }); - - let final_range: HashMap> = map_epoch_to_slot_range - .iter_mut() - .map(|(epoch, range)| { - let epoch = EpochRef::new(*epoch as u64); - ( - epoch, - range.clone().expect("range must be returned from SQL"), - ) - }) - .collect(); - - debug!( - "Slot range check in postgres found {} ranges, took {:2}sec: {:?}", - rows_minmax.len(), - started.elapsed().as_secs_f64(), - final_range - ); - - final_range - } -} - #[cfg(test)] mod tests { 
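The write path fans transactions out over up to `PARALLEL_WRITE_SESSIONS` sessions in chunks of at least `MIN_WRITE_CHUNK_SIZE` rows. The exact chunking formula is not visible in this diff, but the `div_ceil` helper above suggests a shape like this (a sketch under that assumption):

```rust
const PARALLEL_WRITE_SESSIONS: usize = 4;
const MIN_WRITE_CHUNK_SIZE: usize = 500;

fn div_ceil(a: usize, b: usize) -> usize {
    (a.saturating_add(b).saturating_sub(1)).saturating_div(b)
}

// Assumed sizing rule: aim for one chunk per write session, but never go
// below MIN_WRITE_CHUNK_SIZE rows per chunk.
fn chunk_size(num_txs: usize) -> usize {
    std::cmp::max(MIN_WRITE_CHUNK_SIZE, div_ceil(num_txs, PARALLEL_WRITE_SESSIONS))
}

#[test]
fn chunk_sizing_examples() {
    assert_eq!(chunk_size(1_200), 500); // 3 chunks, 3 of 4 sessions busy
    assert_eq!(chunk_size(10_000), 2_500); // 4 chunks, one per session
}
```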
use super::*; @@ -524,7 +355,7 @@ mod tests { PostgresBlockStore::new(epoch_cache.clone(), pg_session_config.clone()).await; postgres_block_store - .write_block(&create_test_block()) + .save_block(&create_test_block()) .await .unwrap(); } diff --git a/history/src/postgres/postgres_config.rs b/blockstore/src/block_stores/postgres/postgres_config.rs similarity index 96% rename from history/src/postgres/postgres_config.rs rename to blockstore/src/block_stores/postgres/postgres_config.rs index 702f0867..fbe194a1 100644 --- a/history/src/postgres/postgres_config.rs +++ b/blockstore/src/block_stores/postgres/postgres_config.rs @@ -60,7 +60,7 @@ impl PostgresSessionConfig { pub fn new_for_tests() -> PostgresSessionConfig { assert!( env::var("PG_CONFIG").is_err(), - "note that ENV variables are ignored!" + "MUST NOT provide PG_CONFIG environment variables as they are ignored!" ); // see localdev_integrationtest.sql how to setup the database diff --git a/history/src/postgres/postgres_epoch.rs b/blockstore/src/block_stores/postgres/postgres_epoch.rs similarity index 83% rename from history/src/postgres/postgres_epoch.rs rename to blockstore/src/block_stores/postgres/postgres_epoch.rs index 8261f767..c3a61d23 100644 --- a/history/src/postgres/postgres_epoch.rs +++ b/blockstore/src/block_stores/postgres/postgres_epoch.rs @@ -20,6 +20,16 @@ impl PostgresEpoch { ) } + pub fn build_drop_schema_statement(epoch: EpochRef) -> String { + let schema = PostgresEpoch::build_schema_name(epoch); + format!( + " + DROP SCHEMA IF EXISTS {} CASCADE; + ", + schema + ) + } + pub fn parse_epoch_from_schema_name(schema_name: &str) -> EpochRef { let epoch_number_str = schema_name.trim_start_matches(EPOCH_SCHEMA_PREFIX); let epoch = epoch_number_str.parse::().unwrap(); diff --git a/history/src/postgres/postgres_session.rs b/blockstore/src/block_stores/postgres/postgres_session.rs similarity index 91% rename from history/src/postgres/postgres_session.rs rename to blockstore/src/block_stores/postgres/postgres_session.rs index 28d82983..d95c400b 100644 --- a/history/src/postgres/postgres_session.rs +++ b/blockstore/src/block_stores/postgres/postgres_session.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use anyhow::Context; +use log::debug; use native_tls::{Certificate, Identity, TlsConnector}; use postgres_native_tls::MakeTlsConnector; use solana_lite_rpc_core::encoding::BinaryEncoding; @@ -12,29 +13,6 @@ use tokio_postgres::{ use super::postgres_config::{PostgresSessionConfig, PostgresSessionSslConfig}; -const MAX_QUERY_SIZE: usize = 200_000; // 0.2 mb - -pub trait SchemaSize { - const DEFAULT_SIZE: usize = 0; - const MAX_SIZE: usize = 0; -} - -pub const fn get_max_safe_inserts() -> usize { - if T::DEFAULT_SIZE == 0 { - panic!("DEFAULT_SIZE can't be 0. SchemaSize impl should override the DEFAULT_SIZE const"); - } - - MAX_QUERY_SIZE / T::DEFAULT_SIZE -} - -pub const fn get_max_safe_updates() -> usize { - if T::MAX_SIZE == 0 { - panic!("MAX_SIZE can't be 0. 
SchemaSize impl should override the MAX_SIZE const"); - } - - MAX_QUERY_SIZE / T::MAX_SIZE -} - #[derive(Clone)] pub struct PostgresSession { pub client: Arc<Client>, @@ -152,6 +130,24 @@ impl PostgresSession { query } + pub async fn clear_session(&self) { + // see https://www.postgresql.org/docs/current/sql-discard.html + // CLOSE ALL -> drop potential cursors + // RESET ALL -> we do not want (would reset work_mem) + // DEALLOCATE -> would drop prepared statements which we do not use ATM + // DISCARD PLANS -> we want to keep the plans + // DISCARD SEQUENCES -> we want to keep the sequences + self.client + .batch_execute( + r#" + DISCARD TEMP; + CLOSE ALL;"#, + ) + .await + .unwrap(); + debug!("Clear postgres session"); + } + pub async fn execute( &self, statement: &str, @@ -160,7 +156,8 @@ impl PostgresSession { self.client.execute(statement, params).await } - pub async fn execute_simple(&self, statement: &str) -> Result<(), Error> { + // execute statements separated by semicolon + pub async fn execute_multiple(&self, statement: &str) -> Result<(), Error> { self.client.batch_execute(statement).await } @@ -178,9 +175,6 @@ impl PostgresSession { Ok(total_inserted) } - // TODO provide an optimized version using "COPY IN" instead of "INSERT INTO" (https://trello.com/c/69MlQU6u) - // pub async fn execute_copyin(...) - pub async fn execute_prepared( &self, statement: &str, @@ -280,7 +274,7 @@ impl PostgresWriteSession { SET SESSION maintenance_work_mem = '256MB'; "#; - session.execute_simple(statement).await.unwrap(); + session.execute_multiple(statement).await.unwrap(); Ok(Self { session: Arc::new(RwLock::new(session)), diff --git a/blockstore/src/block_stores/postgres/postgres_transaction.rs b/blockstore/src/block_stores/postgres/postgres_transaction.rs new file mode 100644 index 00000000..f3374906 --- /dev/null +++ b/blockstore/src/block_stores/postgres/postgres_transaction.rs @@ -0,0 +1,260 @@ +use futures_util::pin_mut; +use log::debug; +use solana_lite_rpc_core::structures::epoch::EpochRef; +use solana_lite_rpc_core::{encoding::BASE64, structures::produced_block::TransactionInfo}; +use solana_sdk::slot_history::Slot; +use solana_sdk::transaction::TransactionError; +use tokio::time::Instant; +use tokio_postgres::binary_copy::BinaryCopyInWriter; +use tokio_postgres::types::Type; +use tokio_postgres::CopyInSink; + +#[derive(Debug)] +pub struct PostgresTransaction { + pub signature: String, + // TODO clarify + pub slot: i64, + pub err: Option<String>, + pub cu_requested: Option<i64>, + pub prioritization_fees: Option<i64>, + pub cu_consumed: Option<i64>, + pub recent_blockhash: String, + pub message: String, +} + +impl PostgresTransaction { + pub fn new(value: &TransactionInfo, slot: Slot) -> Self { + Self { + signature: value.signature.clone(), + err: value + .err + .clone() + .map(|x| BASE64.serialize(&x).ok()) + .unwrap_or(None), + cu_requested: value.cu_requested.map(|x| x as i64), + prioritization_fees: value.prioritization_fees.map(|x| x as i64), + cu_consumed: value.cu_consumed.map(|x| x as i64), + recent_blockhash: value.recent_blockhash.clone(), + message: value.message.clone(), + slot: slot as i64, + } + } + + pub fn to_transaction_info(&self) -> TransactionInfo { + TransactionInfo { + signature: self.signature.clone(), + err: self + .err + .as_ref() + .and_then(|x| BASE64.deserialize::<TransactionError>(x).ok()), + cu_requested: self.cu_requested.map(|x| x as u32), + prioritization_fees: self.prioritization_fees.map(|x| x as u64), + cu_consumed: 
self.cu_consumed.map(|x| x as u64), + recent_blockhash: self.recent_blockhash.clone(), + message: self.message.clone(), + // TODO readable_accounts etc. + readable_accounts: vec![], + writable_accounts: vec![], + is_vote: false, + } + } + + pub fn build_create_table_statement(epoch: EpochRef) -> String { + let schema = PostgresEpoch::build_schema_name(epoch); + format!( + r#" + -- lookup table; maps signatures to generated int8 transaction ids + -- no updates or deletes, only INSERTs + CREATE TABLE {schema}.transaction_ids( + transaction_id bigserial PRIMARY KEY WITH (FILLFACTOR=90), + -- never put sig on TOAST + signature text STORAGE PLAIN NOT NULL, + UNIQUE(signature) + ) WITH (FILLFACTOR=100); + + -- parameter 'schema' is something like 'rpc2a_epoch_592' + CREATE TABLE IF NOT EXISTS {schema}.transaction_blockdata( + -- transaction_id must exist in the transaction_ids table + transaction_id bigint PRIMARY KEY WITH (FILLFACTOR=90), + slot bigint NOT NULL, + cu_requested bigint, + prioritization_fees bigint, + cu_consumed bigint, + recent_blockhash text NOT NULL, + err text, + message text NOT NULL + -- model_transaction_blockdata + ) WITH (FILLFACTOR=90,TOAST_TUPLE_TARGET=128); + CREATE INDEX idx_slot ON {schema}.transaction_blockdata USING btree (slot) WITH (FILLFACTOR=90); + "#, + schema = schema + ) + } + + // removed the foreign key as it slows down inserts + pub fn build_foreign_key_statement(epoch: EpochRef) -> String { + let schema = PostgresEpoch::build_schema_name(epoch); + format!( + r#" + ALTER TABLE {schema}.transaction_blockdata + ADD CONSTRAINT fk_transactions FOREIGN KEY (slot) REFERENCES {schema}.blocks (slot); + "#, + schema = schema + ) + } + + pub async fn save_transactions_from_block( + postgres_session: PostgresSession, + epoch: EpochRef, + transactions: &[Self], + ) -> anyhow::Result<()> { + let schema = PostgresEpoch::build_schema_name(epoch); + + let statement = r#" + CREATE TEMP TABLE IF NOT EXISTS transaction_raw_blockdata( + signature text, + slot bigint, + cu_requested bigint, + prioritization_fees bigint, + cu_consumed bigint, + recent_blockhash text STORAGE PLAIN, + err text STORAGE PLAIN, + message text STORAGE PLAIN + -- model_transaction_blockdata + ); + TRUNCATE transaction_raw_blockdata; + "#; + postgres_session.execute_multiple(statement).await?; + + let statement = r#" + COPY transaction_raw_blockdata( + signature, + slot, + cu_requested, + prioritization_fees, + cu_consumed, + recent_blockhash, + err, + message + -- model_transaction_blockdata + ) FROM STDIN BINARY + "#; + let started_at = Instant::now(); + let sink: CopyInSink<bytes::Bytes> = postgres_session.copy_in(statement).await?; + let writer = BinaryCopyInWriter::new( + sink, + &[ + Type::TEXT, + Type::INT8, + Type::INT8, + Type::INT8, + Type::INT8, + Type::TEXT, + Type::TEXT, + Type::TEXT, // model_transaction_blockdata + ], + ); + pin_mut!(writer); + + for tx in transactions { + let PostgresTransaction { + signature, + slot, + cu_requested, + prioritization_fees, + cu_consumed, + err, + recent_blockhash, + message, + // model_transaction_blockdata + } = tx; + + writer + .as_mut() + .write(&[ + &signature, + &slot, + &cu_requested, + &prioritization_fees, + &cu_consumed, + &err, + &recent_blockhash, + &message, + // model_transaction_blockdata + ]) + .await?; + } + + let num_rows = writer.finish().await?; + debug!( + "inserted {} raw transaction data rows into temp table in {}ms", + num_rows, + started_at.elapsed().as_millis() + ); + + let statement = format!( + r#" + INSERT INTO 
{schema}.transaction_ids(signature) + SELECT signature from transaction_raw_blockdata + ON CONFLICT DO NOTHING + "#, + ); + let started_at = Instant::now(); + let num_rows = postgres_session.execute(statement.as_str(), &[]).await?; + debug!( + "inserted {} signatures into transaction_ids table in {}ms", + num_rows, + started_at.elapsed().as_millis() + ); + + let statement = format!( + r#" + INSERT INTO {schema}.transaction_blockdata + SELECT + ( SELECT transaction_id FROM {schema}.transaction_ids tx_lkup WHERE tx_lkup.signature = transaction_raw_blockdata.signature ), + slot, + cu_requested, + prioritization_fees, + cu_consumed, + err, + recent_blockhash, + message + -- model_transaction_blockdata + FROM transaction_raw_blockdata + "#, + schema = schema, + ); + let started_at = Instant::now(); + let num_rows = postgres_session.execute(statement.as_str(), &[]).await?; + debug!( + "inserted {} rows into transaction block table in {}ms", + num_rows, + started_at.elapsed().as_millis() + ); + + Ok(()) + } + + pub fn build_query_statement(epoch: EpochRef, slot: Slot) -> String { + format!( + r#" + SELECT + (SELECT signature FROM {schema}.transaction_ids tx_ids WHERE tx_ids.transaction_id = transaction_blockdata.transaction_id), + cu_requested, + prioritization_fees, + cu_consumed, + err, + recent_blockhash, + message + -- model_transaction_blockdata + FROM {schema}.transaction_blockdata + WHERE slot = {} + "#, + slot, + schema = PostgresEpoch::build_schema_name(epoch), + ) + } +} diff --git a/history/src/history.rs b/blockstore/src/history.rs similarity index 100% rename from history/src/history.rs rename to blockstore/src/history.rs diff --git a/history/src/lib.rs b/blockstore/src/lib.rs similarity index 68% rename from history/src/lib.rs rename to blockstore/src/lib.rs index 16475cfa..164ffbdc 100644 --- a/history/src/lib.rs +++ b/blockstore/src/lib.rs @@ -1,3 +1,2 @@ pub mod block_stores; pub mod history; -pub mod postgres; diff --git a/history/tests/blockstore_integration_tests.rs b/blockstore/tests/blockstore_integration_tests.rs similarity index 59% rename from history/tests/blockstore_integration_tests.rs rename to blockstore/tests/blockstore_integration_tests.rs index a99df33a..d51b6f66 100644 --- a/history/tests/blockstore_integration_tests.rs +++ b/blockstore/tests/blockstore_integration_tests.rs @@ -1,56 +1,93 @@ -use log::{debug, error, info, trace, warn}; -use solana_lite_rpc_cluster_endpoints::endpoint_stremers::EndpointStreaming; -use solana_lite_rpc_cluster_endpoints::json_rpc_subscription::create_json_rpc_polling_subscription; -use solana_lite_rpc_core::structures::epoch::EpochCache; +use log::{debug, error, info, warn}; +use solana_lite_rpc_blockstore::block_stores::postgres::postgres_block_store_query::PostgresQueryBlockStore; +use solana_lite_rpc_blockstore::block_stores::postgres::postgres_block_store_writer::PostgresBlockStore; +use solana_lite_rpc_blockstore::block_stores::postgres::PostgresSessionConfig; +use solana_lite_rpc_cluster_endpoints::grpc_multiplex::{ + create_grpc_multiplex_blocks_subscription, create_grpc_multiplex_slots_subscription, +}; +use solana_lite_rpc_cluster_endpoints::grpc_subscription_autoreconnect::{ + GrpcConnectionTimeouts, GrpcSourceConfig, +}; +use solana_lite_rpc_core::structures::epoch::{EpochCache, EpochRef}; use solana_lite_rpc_core::structures::produced_block::ProducedBlock; use solana_lite_rpc_core::structures::slot_notification::SlotNotification; use solana_lite_rpc_core::types::{BlockStream, SlotStream}; -use 
solana_lite_rpc_history::block_stores::postgres_block_store::PostgresBlockStore; -use solana_lite_rpc_history::postgres::postgres_config::PostgresSessionConfig; use solana_rpc_client::nonblocking::rpc_client::RpcClient; -use solana_sdk::clock::Slot; use solana_sdk::commitment_config::CommitmentConfig; -use std::collections::{HashMap, HashSet}; -use std::process; use std::sync::Arc; use std::time::{Duration, Instant}; +use std::{env, process}; use tokio::sync::broadcast::error::RecvError; +use tokio::sync::broadcast::Receiver; use tokio::task::JoinHandle; use tokio::time::sleep; use tokio_util::sync::CancellationToken; use tracing_subscriber::EnvFilter; -// force ordered stream of blocks -const NUM_PARALLEL_TASKS: usize = 1; - const CHANNEL_SIZE_WARNING_THRESHOLD: usize = 5; #[ignore = "need to enable postgres"] #[tokio::test] async fn storage_test() { - // RUST_LOG=info,storage_integration_tests=debug,solana_lite_rpc_history=trace + // RUST_LOG=info,storage_integration_tests=debug,solana_lite_rpc_blockstore=trace tracing_subscriber::fmt() .with_env_filter(EnvFilter::from_default_env()) .init(); configure_panic_hook(); - let pg_session_config = PostgresSessionConfig::new_from_env().unwrap().unwrap(); + let no_pgconfig = env::var("PG_CONFIG").is_err(); + + let pg_session_config = if no_pgconfig { + info!("No PG_CONFIG env - use hardcoded defaults for integration test"); + PostgresSessionConfig::new_for_tests() + } else { + info!("PG_CONFIG env defined"); + PostgresSessionConfig::new_from_env().unwrap().unwrap() + }; let rpc_url = std::env::var("RPC_URL").expect("env var RPC_URL is mandatory"); let rpc_client = Arc::new(RpcClient::new(rpc_url)); - let (subscriptions, _cluster_endpoint_tasks) = - create_json_rpc_polling_subscription(rpc_client.clone(), NUM_PARALLEL_TASKS).unwrap(); + let grpc_addr = env::var("GRPC_ADDR").expect("need grpc url for green"); + let grpc_x_token = env::var("GRPC_X_TOKEN").ok(); - let EndpointStreaming { - blocks_notifier, - slot_notifier, - .. 
- } = subscriptions; + info!( + "Using grpc source on {} ({})", + grpc_addr, + grpc_x_token.is_some() + ); + + let timeouts = GrpcConnectionTimeouts { + connect_timeout: Duration::from_secs(5), + request_timeout: Duration::from_secs(5), + subscribe_timeout: Duration::from_secs(5), + }; + + let grpc_config = GrpcSourceConfig::new(grpc_addr, grpc_x_token, None, timeouts.clone()); + let grpc_sources = vec![grpc_config]; + + let (slot_notifier, _jh_multiplex_slotstream) = + create_grpc_multiplex_slots_subscription(grpc_sources.clone()); + + let (blocks_notifier, _jh_multiplex_blockstream) = + create_grpc_multiplex_blocks_subscription(grpc_sources); let (epoch_cache, _) = EpochCache::bootstrap_epoch(&rpc_client).await.unwrap(); + let block_storage_query = Arc::new( + PostgresQueryBlockStore::new(epoch_cache.clone(), pg_session_config.clone()).await, + ); + let block_storage = Arc::new(PostgresBlockStore::new(epoch_cache, pg_session_config).await); + let current_epoch = rpc_client.get_epoch_info().await.unwrap().epoch; + block_storage + .drop_epoch_schema(EpochRef::new(current_epoch)) + .await + .unwrap(); + block_storage + .drop_epoch_schema(EpochRef::new(current_epoch).get_next_epoch()) + .await + .unwrap(); let (jh1_1, first_init) = storage_prepare_epoch_schema(slot_notifier.resubscribe(), block_storage.clone()); @@ -59,11 +96,15 @@ async fn storage_test() { let jh1_2 = storage_listen(blocks_notifier.resubscribe(), block_storage.clone()); let jh2 = block_debug_listen(blocks_notifier.resubscribe()); - let jh3 = block_stream_assert_commitment_order(blocks_notifier.resubscribe()); + let jh3 = + spawn_client_to_blockstorage(block_storage_query.clone(), blocks_notifier.resubscribe()); drop(blocks_notifier); - info!("Run tests for some time ..."); - sleep(Duration::from_secs(20)).await; + let seconds_to_run = env::var("SECONDS_TO_RUN") + .map(|s| s.parse::().expect("a number")) + .unwrap_or(20); + info!("Run tests for some time ({} seconds) ...", seconds_to_run); + sleep(Duration::from_secs(seconds_to_run)).await; jh1_1.abort(); jh1_2.abort(); @@ -129,10 +170,18 @@ fn storage_listen( loop { match block_notifier.recv().await { Ok(block) => { + if block.commitment_config != CommitmentConfig::confirmed() { + debug!( + "Skip block {}@{} due to commitment level", + block.slot, block.commitment_config.commitment + ); + continue; + } let started = Instant::now(); debug!( - "Received block: {} with {} txs", + "Storage task received block: {}@{} with {} txs", block.slot, + block.commitment_config.commitment, block.transactions.len() ); @@ -147,12 +196,13 @@ fn storage_listen( // avoid backpressure here! 
- block_storage.write_block(&block).await.unwrap(); + block_storage.save_block(&block).await.unwrap(); // we should be faster than 150ms here let elapsed = started.elapsed(); debug!( - "Successfully stored block to postgres which took {:.2}ms - remaining {} queue elements", + "Successfully stored block {} to postgres which took {:.2}ms - remaining {} queue elements", + block.slot, elapsed.as_secs_f64() * 1000.0, block_notifier.len() ); if elapsed > Duration::from_millis(150) { @@ -192,12 +242,6 @@ fn storage_listen( }) } -#[derive(Debug, Clone)] -struct BlockDebugDetails { - pub blockhash: String, - pub block: ProducedBlock, -} - fn block_debug_listen(block_notifier: BlockStream) -> JoinHandle<()> { tokio::spawn(async move { let mut last_highest_slot_number = 0; @@ -207,7 +251,7 @@ fn block_debug_listen(block_notifier: BlockStream) -> JoinHandle<()> { match block_notifier.recv().await { Ok(block) => { debug!( - "Saw block: {} @ {} with {} txs", + "Saw block: {}@{} with {} txs", block.slot, block.commitment_config.commitment, block.transactions.len() @@ -243,129 +287,56 @@ fn block_debug_listen(block_notifier: BlockStream) -> JoinHandle<()> { }) } -/// inspect stream of blocks and check that the commitment transition from confirmed to finalized is correct -fn block_stream_assert_commitment_order(block_notifier: BlockStream) -> JoinHandle<()> { +fn spawn_client_to_blockstorage( + block_storage_query: Arc<PostgresQueryBlockStore>, + mut blocks_notifier: Receiver<ProducedBlock>, +) -> JoinHandle<()> { tokio::spawn(async move { - let mut block_notifier = block_notifier; - - let mut confirmed_blocks_by_slot = HashMap::<Slot, BlockDebugDetails>::new(); - let mut finalized_blocks = HashSet::<Slot>::new(); - - let mut warmup_cutoff: Slot = 0; - let mut warmup_first_confirmed: Slot = 0; - + // note: no startup delay loop { - match block_notifier.recv().await { - Ok(block) => { - if warmup_cutoff > 0 { - if block.slot < warmup_cutoff { - continue; + match blocks_notifier.recv().await { + Ok(ProducedBlock { + slot, + commitment_config, + .. + }) => { + if commitment_config != CommitmentConfig::confirmed() { + continue; + } + let confirmed_slot = slot; + // we cannot expect the most recent data + let query_slot = confirmed_slot - 3; + match block_storage_query.query_block(query_slot).await { + Ok(pb) => { + info!( + "Query result for slot {}: {}", + query_slot, + to_string_without_transactions(&pb) + ); + for tx in pb.transactions.iter().take(10) { + info!(" - tx: {}", tx.signature); + } + if pb.transactions.len() > 10 { + info!(" - ... and {} more", pb.transactions.len() - 10); + } } - - // check semantics and log/panic - inspect_this_block( - &mut confirmed_blocks_by_slot, - &mut finalized_blocks, - &block, - ); - } else { - trace!("Warming up {} ...", block.slot); - - if warmup_first_confirmed == 0 - && block.commitment_config == CommitmentConfig::confirmed() - { - warmup_first_confirmed = block.slot; - } - - if block.commitment_config == CommitmentConfig::finalized() - && block.slot >= warmup_first_confirmed - { - warmup_cutoff = block.slot + 32; - debug!("Warming done (slot {})", warmup_cutoff); + Err(err) => { + info!("Query did not return produced block: {}", err); } } - } // -- Ok - Err(RecvError::Lagged(missed_blocks)) => { - warn!( - "Could not keep up with producer - missed {} blocks", - missed_blocks - ); + // Query result for slot 245710738 + // Inserting block 245710741 to schema rpc2a_epoch_581 postgres took 1.52ms } - Err(other_err) => { - panic!("Error receiving block: {:?}", other_err); + Err(_err) => { + warn!("Aborting client"); + break; } } - - ... 
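All listener tasks in this test share the same broadcast-drain shape: warn when the queue backs up past `CHANNEL_SIZE_WARNING_THRESHOLD` and tolerate `RecvError::Lagged` rather than panicking. A condensed sketch of that pattern; the `drain` helper is illustrative, not part of this diff:

```rust
use tokio::sync::broadcast::{error::RecvError, Receiver};

// Keep consuming; a lagged receiver skips ahead instead of aborting the task.
async fn drain<T: Clone>(mut rx: Receiver<T>, warn_above: usize) {
    loop {
        match rx.recv().await {
            Ok(_item) => {
                if rx.len() > warn_above {
                    log::warn!("channel running full: {} queued elements", rx.len());
                }
            }
            Err(RecvError::Lagged(missed)) => {
                log::warn!("could not keep up with producer - missed {} items", missed);
            }
            Err(RecvError::Closed) => break,
        }
    }
}
```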
+ sleep(Duration::from_secs(1)).await; } }) } -fn inspect_this_block( - confirmed_blocks_by_slot: &mut HashMap, - finalized_blocks: &mut HashSet, - block: &ProducedBlock, -) { - if block.commitment_config == CommitmentConfig::confirmed() { - let prev_block = confirmed_blocks_by_slot.insert( - block.slot, - BlockDebugDetails { - blockhash: block.blockhash.clone(), - block: block.clone(), - }, - ); - // Assumption I: we never see the same confirmed block twice - assert!(prev_block.is_none(), "Must not see a confirmed block twice"); - } else if block.commitment_config == CommitmentConfig::finalized() { - let finalized_block = █ - let finalized_block_existed = finalized_blocks.insert(finalized_block.slot); - // Assumption II: we never see the same finalized block twice - assert!( - finalized_block_existed, - "Finalized block {} must NOT have been seen before", - finalized_block.slot - ); - let prev_block = confirmed_blocks_by_slot.get(&block.slot); - match prev_block { - Some(prev_block) => { - info!( - "Got finalized block {} with blockhash {} - prev confirmed was {}", - finalized_block.slot, finalized_block.blockhash, prev_block.blockhash - ); - // TODO is that correct? - // Assumption III: confirmed and finalized block can be matched by slot and have the same blockhash - assert_eq!( - finalized_block.blockhash, prev_block.blockhash, - "Must see the same blockhash for confirmed and finalized block" - ); - - debug!( - "confirmed: {:?}", - to_string_without_transactions(&prev_block.block) - ); - debug!( - "finalized: {:?}", - to_string_without_transactions(finalized_block) - ); - - // Assumption IV: block details do not change between confirmed and finalized - assert_eq!( - // normalized and compare - to_string_without_transactions(&prev_block.block) - .replace("commitment_config=confirmed", "commitment_config=IGNORE"), - to_string_without_transactions(finalized_block) - .replace("commitment_config=finalized", "commitment_config=IGNORE"), - "block tostring mismatch" - ) - } - None => { - // note at startup we might see some orphan finalized blocks before we see matching pairs of confirmed-finalized blocks - warn!("Must see a confirmed block before it is finalized (slot {}) - could be a warmup issue", finalized_block.slot); - } - } - } -} - fn to_string_without_transactions(produced_block: &ProducedBlock) -> String { format!( r#" diff --git a/history/tests/mod.rs b/blockstore/tests/mod.rs similarity index 100% rename from history/tests/mod.rs rename to blockstore/tests/mod.rs diff --git a/history/tests/multiple_strategy_block_store_tests.rs b/blockstore/tests/multiple_strategy_block_store_tests.rs similarity index 72% rename from history/tests/multiple_strategy_block_store_tests.rs rename to blockstore/tests/multiple_strategy_block_store_tests.rs index 790ca509..422180fc 100644 --- a/history/tests/multiple_strategy_block_store_tests.rs +++ b/blockstore/tests/multiple_strategy_block_store_tests.rs @@ -1,9 +1,10 @@ +use solana_lite_rpc_blockstore::block_stores::multiple_strategy_block_store::BlockStorageData; +use solana_lite_rpc_blockstore::block_stores::multiple_strategy_block_store::MultipleStrategyBlockStorage; +use solana_lite_rpc_blockstore::block_stores::postgres::postgres_block_store_query::PostgresQueryBlockStore; +use solana_lite_rpc_blockstore::block_stores::postgres::postgres_block_store_writer::PostgresBlockStore; +use solana_lite_rpc_blockstore::block_stores::postgres::PostgresSessionConfig; use solana_lite_rpc_core::structures::epoch::EpochCache; use 
diff --git a/history/tests/mod.rs b/blockstore/tests/mod.rs
similarity index 100%
rename from history/tests/mod.rs
rename to blockstore/tests/mod.rs
diff --git a/history/tests/multiple_strategy_block_store_tests.rs b/blockstore/tests/multiple_strategy_block_store_tests.rs
similarity index 72%
rename from history/tests/multiple_strategy_block_store_tests.rs
rename to blockstore/tests/multiple_strategy_block_store_tests.rs
index 790ca509..422180fc 100644
--- a/history/tests/multiple_strategy_block_store_tests.rs
+++ b/blockstore/tests/multiple_strategy_block_store_tests.rs
@@ -1,9 +1,10 @@
+use solana_lite_rpc_blockstore::block_stores::multiple_strategy_block_store::BlockStorageData;
+use solana_lite_rpc_blockstore::block_stores::multiple_strategy_block_store::MultipleStrategyBlockStorage;
+use solana_lite_rpc_blockstore::block_stores::postgres::postgres_block_store_query::PostgresQueryBlockStore;
+use solana_lite_rpc_blockstore::block_stores::postgres::postgres_block_store_writer::PostgresBlockStore;
+use solana_lite_rpc_blockstore::block_stores::postgres::PostgresSessionConfig;
 use solana_lite_rpc_core::structures::epoch::EpochCache;
 use solana_lite_rpc_core::structures::produced_block::ProducedBlock;
-use solana_lite_rpc_history::block_stores::multiple_strategy_block_store::BlockStorageData;
-use solana_lite_rpc_history::block_stores::multiple_strategy_block_store::MultipleStrategyBlockStorage;
-use solana_lite_rpc_history::block_stores::postgres_block_store::PostgresBlockStore;
-use solana_lite_rpc_history::postgres::postgres_config::PostgresSessionConfig;
 use solana_sdk::pubkey::Pubkey;
 use solana_sdk::reward_type::RewardType;
 use solana_sdk::{commitment_config::CommitmentConfig, hash::Hash};
@@ -37,21 +38,23 @@ async fn test_in_multiple_stategy_block_store() {
     let pg_session_config = PostgresSessionConfig::new_from_env().unwrap().unwrap();
     let epoch_cache = EpochCache::new_for_tests();
 
-    let persistent_store = PostgresBlockStore::new(epoch_cache.clone(), pg_session_config).await;
+    let persistent_store =
+        PostgresBlockStore::new(epoch_cache.clone(), pg_session_config.clone()).await;
+    let block_storage_query = PostgresQueryBlockStore::new(epoch_cache, pg_session_config).await;
     let multi_store = MultipleStrategyBlockStorage::new(
-        persistent_store.clone(),
+        block_storage_query.clone(),
        None, // not supported
     );
     persistent_store.prepare_epoch_schema(1200).await.unwrap();
 
     persistent_store
-        .write_block(&create_test_block(1200, CommitmentConfig::confirmed()))
+        .save_block(&create_test_block(1200, CommitmentConfig::confirmed()))
         .await
         .unwrap();
 
     // span range of slots between those two
 
     persistent_store
-        .write_block(&create_test_block(1289, CommitmentConfig::confirmed()))
+        .save_block(&create_test_block(1289, CommitmentConfig::confirmed()))
         .await
         .unwrap();
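The reworked test also documents the new split between the write path (`PostgresBlockStore`) and the read path (`PostgresQueryBlockStore`). A condensed round-trip sketch under the same assumptions as the test (PG_* environment variables set, schema creation permitted, and store methods returning `anyhow::Result` as the test's unwraps suggest):

```rust
use solana_lite_rpc_blockstore::block_stores::postgres::postgres_block_store_query::PostgresQueryBlockStore;
use solana_lite_rpc_blockstore::block_stores::postgres::postgres_block_store_writer::PostgresBlockStore;
use solana_lite_rpc_blockstore::block_stores::postgres::PostgresSessionConfig;
use solana_lite_rpc_core::structures::epoch::EpochCache;
use solana_lite_rpc_core::structures::produced_block::ProducedBlock;

// Write a block with the writer store, read it back with the query store.
async fn round_trip(block: &ProducedBlock) -> anyhow::Result<()> {
    let pg_session_config = PostgresSessionConfig::new_from_env()?.expect("PG_* env vars");
    let epoch_cache = EpochCache::new_for_tests();
    let writer = PostgresBlockStore::new(epoch_cache.clone(), pg_session_config.clone()).await;
    let reader = PostgresQueryBlockStore::new(epoch_cache, pg_session_config).await;

    writer.prepare_epoch_schema(block.slot).await?; // one schema per epoch, created on demand
    writer.save_block(block).await?;

    let stored = reader.query_block(block.slot).await?;
    assert_eq!(stored.transactions.len(), block.transactions.len());
    Ok(())
}
```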
diff --git a/history/examples/bench_postgres_lowlevel.rs b/history/examples/bench_postgres_lowlevel.rs
deleted file mode 100644
index 2325ad25..00000000
--- a/history/examples/bench_postgres_lowlevel.rs
+++ /dev/null
@@ -1,133 +0,0 @@
-use bytes::Bytes;
-use futures_util::pin_mut;
-use log::info;
-use solana_lite_rpc_history::postgres::postgres_config::PostgresSessionConfig;
-use solana_lite_rpc_history::postgres::postgres_session::PostgresWriteSession;
-use solana_sdk::blake3::Hash;
-use solana_sdk::signature::Signature;
-use std::sync::Arc;
-use tokio::time::Instant;
-use tokio_postgres::binary_copy::BinaryCopyInWriter;
-use tokio_postgres::types::Type;
-use tokio_postgres::CopyInSink;
-
-pub async fn copy_in(client: &tokio_postgres::Client) -> anyhow::Result<()> {
-    let statement = r#"
-        COPY transactions_copyin(
-            signature, slot, err, cu_requested, prioritization_fees, cu_consumed, recent_blockhash, message
-        ) FROM STDIN BINARY
-    "#;
-
-    // BinaryCopyInWriter
-    // https://github.com/sfackler/rust-postgres/blob/master/tokio-postgres/tests/test/binary_copy.rs
-    let sink: CopyInSink<Bytes> = client.copy_in(statement).await.unwrap();
-
-    let sig = Signature::new_unique().to_string();
-    let slot = 200_000_000_i64;
-    let err = None::<&str>;
-    let cu_requested = None::<i64>;
-    let prioritization_fees = None::<i64>;
-    let cu_consumed = None::<i64>;
-    let recent_blockhash = Hash::new(&[1u8; 32]).to_string();
-    let message = "";
-
-    let started = Instant::now();
-    let writer = BinaryCopyInWriter::new(
-        sink,
-        &[
-            Type::TEXT,
-            Type::INT8,
-            Type::TEXT,
-            Type::INT8,
-            Type::INT8,
-            Type::INT8,
-            Type::TEXT,
-            Type::TEXT,
-        ],
-    );
-    pin_mut!(writer);
-
-    const COUNT: usize = 100000;
-    for i in 0..COUNT {
-        let slot_x = slot + i as i64;
-        writer
-            .as_mut()
-            .write(&[
-                &sig,
-                &slot_x,
-                &err,
-                &cu_requested,
-                &prioritization_fees,
-                &cu_consumed,
-                &recent_blockhash,
-                &message,
-            ])
-            .await
-            .unwrap();
-    }
-
-    writer.finish().await.unwrap();
-
-    info!(
-        "wrote {} rows in {:.2}ms",
-        COUNT,
-        started.elapsed().as_secs_f64() * 1000.0
-    );
-
-    Ok(())
-}
-
-#[tokio::main]
-pub async fn main() {
-    tracing_subscriber::fmt::init();
-
-    let pg_session_config = PostgresSessionConfig::new_for_tests();
-    let session = PostgresWriteSession::new(pg_session_config)
-        .await
-        .unwrap()
-        .get_write_session()
-        .await;
-
-    let ddl_statement = r#"
-        CREATE TEMP TABLE transactions_copyin
-        (
-            signature text NOT NULL,
-            slot bigint NOT NULL,
-            err text,
-            cu_requested bigint,
-            prioritization_fees bigint,
-            cu_consumed bigint,
-            recent_blockhash text NOT NULL,
-            message text NOT NULL
-        )
-    "#;
-
-    session.execute(ddl_statement, &[]).await.unwrap();
-
-    let row_count_before = count_rows(session.client.clone()).await;
-
-    let started = Instant::now();
-
-    copy_in(session.client.as_ref()).await.unwrap();
-
-    info!(
-        "copyin write rows in {:.2}ms",
-        started.elapsed().as_secs_f64() * 1000.0
-    );
-
-    let row_count_after = count_rows(session.client.clone()).await;
-    info!("total: {}", row_count_after);
-    info!(
-        "inserted: {}",
-        row_count_after.saturating_sub(row_count_before)
-    );
-}
-
-async fn count_rows(client: Arc<tokio_postgres::Client>) -> i64 {
-    let row = client
-        .query_one("SELECT count(*) FROM transactions_copyin", &[])
-        .await
-        .unwrap();
-
-    row.get::<_, i64>(0)
-}
diff --git a/history/examples/bench_postgres_read_load.rs b/history/examples/bench_postgres_read_load.rs
deleted file mode 100644
index 08c8e1b1..00000000
--- a/history/examples/bench_postgres_read_load.rs
+++ /dev/null
@@ -1,119 +0,0 @@
-///
-/// test program to query postgres (get blocks) behind the tip of the slots
-///
-use anyhow::bail;
-use itertools::Itertools;
-use log::info;
-use solana_lite_rpc_cluster_endpoints::endpoint_stremers::EndpointStreaming;
-use solana_lite_rpc_cluster_endpoints::json_rpc_subscription::create_json_rpc_polling_subscription;
-use solana_lite_rpc_core::structures::epoch::{EpochCache, EpochRef};
-use solana_lite_rpc_core::types::SlotStream;
-use solana_lite_rpc_history::postgres::postgres_epoch::PostgresEpoch;
-use solana_lite_rpc_history::postgres::postgres_session::PostgresSession;
-use solana_rpc_client::nonblocking::rpc_client::RpcClient;
-use solana_sdk::clock::Slot;
-use std::sync::Arc;
-use tokio::sync::watch;
-use tokio::sync::watch::Sender;
-use tokio::task::JoinHandle;
-
-const NUM_PARALLEL_TASKS: usize = 1;
-
-#[tokio::main(flavor = "multi_thread", worker_threads = 16)]
-pub async fn main() -> anyhow::Result<()> {
-    tracing_subscriber::fmt::init();
-
-    let sessions = vec![
-        PostgresSession::new_from_env().await.unwrap(),
-        // PostgresSession::new_from_env().await.unwrap(),
-        // PostgresSession::new_from_env().await.unwrap(),
-        // PostgresSession::new_from_env().await.unwrap(),
-        // PostgresSession::new_from_env().await.unwrap(),
-        // PostgresSession::new_from_env().await.unwrap(),
-        // PostgresSession::new_from_env().await.unwrap(),
-        // PostgresSession::new_from_env().await.unwrap(),
-    ];
-
-    let rpc_url = std::env::var("RPC_URL").expect("env var RPC_URL is mandatory");
-    let rpc_client = Arc::new(RpcClient::new(rpc_url));
-    let epoch_data = EpochCache::bootstrap_epoch(&rpc_client).await.unwrap();
-    let (subscriptions, _cluster_endpoint_tasks) =
-        create_json_rpc_polling_subscription(rpc_client.clone(), NUM_PARALLEL_TASKS).unwrap();
-
-    let EndpointStreaming { slot_notifier, .. } = subscriptions;
-
-    let (tx, mut rx) = watch::channel(0);
-
-    let _jh2 = slot_listener(slot_notifier.resubscribe(), tx);
-
-    let mut ticker = tokio::time::interval(tokio::time::Duration::from_secs(1));
-    loop {
-        let slot = *rx.borrow_and_update();
-        if slot == 0 {
-            continue;
-        }
-
-        info!("processed slot: {}", slot);
-
-        // ATM we are 4000 slots behind ...
-        // TODO reduce 4000 to 0
-        let slot: u64 = 234332620; // literpc3 - local
-        // let slot = 231541684;
-        let delta = 50 + rand::random::<u64>() % 100;
-        let query_slot = slot.saturating_sub(delta);
-        info!("query slot (-{}): {}", delta, query_slot);
-
-        let (epoch_cache, _) = &epoch_data;
-        let epoch: EpochRef = epoch_cache.get_epoch_at_slot(query_slot).into();
-
-        let futures = (0..3)
-            .map(|i| {
-                let si = rand::random::<usize>() % sessions.len();
-                let session = sessions[si].clone();
-                query_database(session, epoch, query_slot + i)
-                // query_database_simple(session)
-            })
-            .collect_vec();
-
-        futures_util::future::join_all(futures).await;
-
-        ticker.tick().await;
-        let result = rx.changed().await;
-        if result.is_err() {
-            bail!("Watcher failed - sender was dropped!");
-        }
-    }
-}
-
-async fn query_database(postgres_session: PostgresSession, epoch: EpochRef, slot: Slot) {
-    let statement = format!(
-        r#"
-            SELECT min(slot),max(slot) FROM {schema}.transactions WHERE slot = {slot}
-        "#,
-        schema = PostgresEpoch::build_schema_name(epoch),
-        slot = slot,
-    );
-
-    let started = tokio::time::Instant::now();
-    let result = postgres_session.query_list(&statement, &[]).await.unwrap();
-    info!(
-        "num_rows: {} (took {:.2}ms)",
-        result.len(),
-        started.elapsed().as_secs_f64() * 1000.0
-    );
-}
-
-fn slot_listener(slot_notifier: SlotStream, watch: Sender<Slot>) -> JoinHandle<()> {
-    tokio::spawn(async move {
-        let mut slot_notifier = slot_notifier;
-        loop {
-            match slot_notifier.recv().await {
-                Ok(slot_update) => {
-                    // info!("slot -> {}", slot_update.processed_slot);
-                    watch.send(slot_update.processed_slot).unwrap();
-                }
-                Err(_err) => {}
-            }
-        }
-    })
-}
diff --git a/history/src/block_stores/mod.rs b/history/src/block_stores/mod.rs
deleted file mode 100644
index 610a07ff..00000000
--- a/history/src/block_stores/mod.rs
+++ /dev/null
@@ -1,3 +0,0 @@
-pub mod faithful_block_store;
-pub mod multiple_strategy_block_store;
-pub mod postgres_block_store;
diff --git a/history/src/postgres/mod.rs b/history/src/postgres/mod.rs
deleted file mode 100644
index 4c124712..00000000
--- a/history/src/postgres/mod.rs
+++ /dev/null
@@ -1,6 +0,0 @@
-pub mod postgres_config;
-pub mod postgres_session;
-
-pub mod postgres_block;
-pub mod postgres_epoch;
-pub mod postgres_transaction;
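For downstream crates the module shuffle is mostly a path rename: everything that lived under `solana_lite_rpc_history::postgres` resurfaces under `solana_lite_rpc_blockstore::block_stores::postgres`, while the logger's config/session pair moves into `lite-rpc` itself (see the `postgres_logger` diffs below). Before and after, matching the imports in the files above:

```rust
// before (crate solana-lite-rpc-history):
// use solana_lite_rpc_history::postgres::postgres_config::PostgresSessionConfig;
// use solana_lite_rpc_history::postgres::postgres_session::PostgresSession;
// use solana_lite_rpc_history::block_stores::postgres_block_store::PostgresBlockStore;

// after (crate solana-lite-rpc-blockstore):
use solana_lite_rpc_blockstore::block_stores::postgres::postgres_block_store_writer::PostgresBlockStore;
use solana_lite_rpc_blockstore::block_stores::postgres::{PostgresSession, PostgresSessionConfig};
```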
diff --git a/history/src/postgres/postgres_transaction.rs b/history/src/postgres/postgres_transaction.rs
deleted file mode 100644
index cef2d271..00000000
--- a/history/src/postgres/postgres_transaction.rs
+++ /dev/null
@@ -1,226 +0,0 @@
-use crate::postgres::postgres_epoch::PostgresEpoch;
-use bytes::Bytes;
-use futures_util::pin_mut;
-use log::{trace, warn};
-use solana_lite_rpc_core::structures::epoch::EpochRef;
-use solana_lite_rpc_core::{encoding::BASE64, structures::produced_block::TransactionInfo};
-use solana_sdk::slot_history::Slot;
-use tokio_postgres::binary_copy::BinaryCopyInWriter;
-use tokio_postgres::types::{ToSql, Type};
-use tokio_postgres::CopyInSink;
-
-use super::postgres_session::PostgresSession;
-
-#[derive(Debug)]
-pub struct PostgresTransaction {
-    pub signature: String,
-    pub slot: i64,
-    pub err: Option<String>,
-    pub cu_requested: Option<i64>,
-    pub prioritization_fees: Option<i64>,
-    pub cu_consumed: Option<i64>,
-    pub recent_blockhash: String,
-    pub message: String,
-}
-
-impl PostgresTransaction {
-    pub fn new(value: &TransactionInfo, slot: Slot) -> Self {
-        Self {
-            signature: value.signature.clone(),
-            err: value
-                .err
-                .clone()
-                .map(|x| BASE64.serialize(&x).ok())
-                .unwrap_or(None),
-            cu_requested: value.cu_requested.map(|x| x as i64),
-            prioritization_fees: value.prioritization_fees.map(|x| x as i64),
-            cu_consumed: value.cu_consumed.map(|x| x as i64),
-            recent_blockhash: value.recent_blockhash.clone(),
-            message: value.message.clone(),
-            slot: slot as i64,
-        }
-    }
-
-    pub fn build_create_table_statement(epoch: EpochRef) -> String {
-        let schema = PostgresEpoch::build_schema_name(epoch);
-        format!(
-            r#"
-                CREATE TABLE IF NOT EXISTS {schema}.transactions (
-                    signature VARCHAR(88) NOT NULL,
-                    slot BIGINT NOT NULL,
-                    err TEXT,
-                    cu_requested BIGINT,
-                    prioritization_fees BIGINT,
-                    cu_consumed BIGINT,
-                    recent_blockhash TEXT NOT NULL,
-                    message TEXT NOT NULL,
-                    CONSTRAINT pk_transaction_sig PRIMARY KEY(signature)
-                ) WITH (FILLFACTOR=90);
-                CREATE INDEX idx_slot ON {schema}.transactions USING btree (slot) WITH (FILLFACTOR=90);
-                CLUSTER {schema}.transactions USING idx_slot;
-            "#,
-            schema = schema
-        )
-    }
-
-    // removed the foreign key as it slows down inserts
-    pub fn build_foreign_key_statement(epoch: EpochRef) -> String {
-        let schema = PostgresEpoch::build_schema_name(epoch);
-        format!(
-            r#"
-                ALTER TABLE {schema}.transactions
-                ADD CONSTRAINT fk_transactions FOREIGN KEY (slot) REFERENCES {schema}.blocks (slot);
-            "#,
-            schema = schema
-        )
-    }
-
-    // this version uses INSERT statements
-    pub async fn save_transaction_insert(
-        postgres_session: PostgresSession,
-        epoch: EpochRef,
-        slot: Slot,
-        transactions: &[Self],
-    ) -> anyhow::Result<()> {
-        const NB_ARGUMENTS: usize = 8;
-        let tx_count = transactions.len();
-        let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NB_ARGUMENTS * tx_count);
-
-        for tx in transactions.iter() {
-            let PostgresTransaction {
-                signature,
-                slot,
-                err,
-                cu_requested,
-                prioritization_fees,
-                cu_consumed,
-                recent_blockhash,
-                message,
-            } = tx;
-
-            args.push(signature);
-            args.push(slot);
-            args.push(err);
-            args.push(cu_requested);
-            args.push(prioritization_fees);
-            args.push(cu_consumed);
-            args.push(recent_blockhash);
-            args.push(message);
-        }
-
-        let values = PostgresSession::values_vecvec(NB_ARGUMENTS, tx_count, &[]);
-        let schema = PostgresEpoch::build_schema_name(epoch);
-        let statement = format!(
-            r#"
-                INSERT INTO {schema}.transactions
-                (signature, slot, err, cu_requested, prioritization_fees, cu_consumed, recent_blockhash, message)
-                VALUES {}
-                ON CONFLICT DO NOTHING
-            "#,
-            values,
-            schema = schema,
-        );
-
-        let inserted = postgres_session.execute_prepared(&statement, &args).await? as usize;
-
-        if inserted < tx_count {
-            warn!("Some ({}) transactions already existed and were not updated of {} total in schema {schema}",
-                transactions.len().saturating_sub(inserted), transactions.len(), schema = schema);
-        }
-
-        trace!(
-            "Inserted {} transactions chunk into epoch schema {} for block {}",
-            inserted,
-            schema,
-            slot
-        );
-
-        Ok(())
-    }
-
-    // this version uses "COPY IN"
-    pub async fn save_transaction_copyin(
-        postgres_session: PostgresSession,
-        epoch: EpochRef,
-        transactions: &[Self],
-    ) -> anyhow::Result<bool> {
-        let schema = PostgresEpoch::build_schema_name(epoch);
-        let statement = format!(
-            r#"
-                COPY {schema}.transactions(
-                    signature, slot, err, cu_requested, prioritization_fees, cu_consumed, recent_blockhash, message
-                ) FROM STDIN BINARY
-            "#,
-            schema = schema,
-        );
-
-        // BinaryCopyInWriter
-        // https://github.com/sfackler/rust-postgres/blob/master/tokio-postgres/tests/test/binary_copy.rs
-        let sink: CopyInSink<Bytes> = postgres_session.copy_in(&statement).await.unwrap();
-
-        let writer = BinaryCopyInWriter::new(
-            sink,
-            &[
-                Type::TEXT,
-                Type::INT8,
-                Type::TEXT,
-                Type::INT8,
-                Type::INT8,
-                Type::INT8,
-                Type::TEXT,
-                Type::TEXT,
-            ],
-        );
-        pin_mut!(writer);
-
-        for tx in transactions {
-            let PostgresTransaction {
-                signature,
-                slot,
-                err,
-                cu_requested,
-                prioritization_fees,
-                cu_consumed,
-                recent_blockhash,
-                message,
-            } = tx;
-
-            writer
-                .as_mut()
-                .write(&[
-                    &signature,
-                    &slot,
-                    &err,
-                    &cu_requested,
-                    &prioritization_fees,
-                    &cu_consumed,
-                    &recent_blockhash,
-                    &message,
-                ])
-                .await
-                .unwrap();
-        }
-
-        writer.finish().await.unwrap();
-
-        Ok(true)
-    }
-
-    pub async fn get(
-        postgres_session: PostgresSession,
-        schema: &String,
-        slot: Slot,
-    ) -> Vec<PostgresTransaction> {
-        let statement = format!(
-            r#"
-                SELECT signature, err, cu_requested, prioritization_fees, cu_consumed, recent_blockhash, message
-                FROM {schema}.transactions
-                WHERE slot = {}
-            "#,
-            slot,
-            schema = schema
-        );
-        let _ = postgres_session.client.query(&statement, &[]).await;
-        todo!()
-    }
-}
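Both the deleted code and its replacement shard data into one Postgres schema per epoch via `PostgresEpoch::build_schema_name`. The exact prefix is not visible in this diff; going by the log line quoted earlier (`schema rpc2a_epoch_581`), the shape is roughly:

```rust
// Hypothetical mirror of PostgresEpoch::build_schema_name; the "rpc2a"
// prefix is taken from the sample log above, not from this diff.
fn build_schema_name(epoch: u64) -> String {
    format!("rpc2a_epoch_{}", epoch)
}

fn main() {
    // epoch 581 -> "rpc2a_epoch_581"; retiring an epoch means dropping one schema
    assert_eq!(build_schema_name(581), "rpc2a_epoch_581");
}
```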
diff --git a/lite-rpc/Cargo.toml b/lite-rpc/Cargo.toml
index 876e5cd1..f47165a0 100644
--- a/lite-rpc/Cargo.toml
+++ b/lite-rpc/Cargo.toml
@@ -47,7 +47,7 @@ itertools = { workspace = true }
 solana-lite-rpc-core = { workspace = true }
 solana-lite-rpc-services = { workspace = true }
 solana-lite-rpc-cluster-endpoints = { workspace = true }
-solana-lite-rpc-history = { workspace = true }
+solana-lite-rpc-blockstore = { workspace = true }
 solana-lite-rpc-prioritization-fees = { workspace = true }
 
 [dev-dependencies]
diff --git a/lite-rpc/src/bridge.rs b/lite-rpc/src/bridge.rs
index b15d7b99..00f167ab 100644
--- a/lite-rpc/src/bridge.rs
+++ b/lite-rpc/src/bridge.rs
@@ -33,12 +33,12 @@ use solana_transaction_status::{TransactionStatus, UiConfirmedBlock};
 use tokio::net::ToSocketAddrs;
 use tokio::sync::broadcast::error::RecvError::{Closed, Lagged};
 
+use solana_lite_rpc_blockstore::history::History;
 use solana_lite_rpc_core::{
     encoding,
     stores::{block_information_store::BlockInformation, data_cache::DataCache, tx_store::TxProps},
     AnyhowJoinHandle,
 };
-use solana_lite_rpc_history::history::History;
 use solana_lite_rpc_services::{
     transaction_service::TransactionService, tx_sender::TXS_IN_CHANNEL,
 };
@@ -154,7 +154,7 @@ impl LiteBridge {
 
 #[jsonrpsee::core::async_trait]
 impl LiteRpcServer for LiteBridge {
     async fn get_block(&self, _slot: u64) -> crate::rpc::Result<Option<UiConfirmedBlock>> {
-        // let block = self.history.block_storage.query_block(slot).await;
+        // let block = self.blockstore.block_storage.query_block(slot).await;
         // if block.is_ok() {
         //     // TO DO Convert to UIConfirmed Block
         //     Err(jsonrpsee::core::Error::HttpNotImplemented)
         // } else {
         //     Ok(None)
         // }
 
-        // TODO get_block might deserve different implementation based on whether we serve from "history module" vs. from "send tx module"
+        // TODO get_block might deserve different implementation based on whether we serve from "blockstore module" vs. from "send tx module"
         todo!("get_block: decide where to look")
     }
diff --git a/lite-rpc/src/cli.rs b/lite-rpc/src/cli.rs
index 9c418e05..28f8b8e3 100644
--- a/lite-rpc/src/cli.rs
+++ b/lite-rpc/src/cli.rs
@@ -1,5 +1,6 @@
 use std::env;
 
+use crate::postgres_logger;
 use crate::{
     DEFAULT_FANOUT_SIZE, DEFAULT_GRPC_ADDR, DEFAULT_RETRY_TIMEOUT, DEFAULT_RPC_ADDR,
     DEFAULT_WS_ADDR, MAX_RETRIES,
@@ -7,7 +8,6 @@
 use anyhow::Context;
 use clap::Parser;
 use dotenv::dotenv;
-use solana_lite_rpc_history::postgres::postgres_config::PostgresSessionConfig;
 
 #[derive(Parser, Debug, Clone)]
 #[command(author, version, about, long_about = None)]
@@ -66,7 +66,7 @@ pub struct Config {
     /// postgres config
     #[serde(default)]
-    pub postgres: Option<PostgresSessionConfig>,
+    pub postgres: Option<postgres_logger::PostgresSessionConfig>,
 
     pub max_number_of_connection: Option<usize>,
 }
@@ -179,7 +179,8 @@ impl Config {
             .map(|x| x.parse().ok())
             .unwrap_or(config.max_number_of_connection);
 
-        config.postgres = PostgresSessionConfig::new_from_env()?.or(config.postgres);
+        config.postgres =
+            postgres_logger::PostgresSessionConfig::new_from_env()?.or(config.postgres);
 
         Ok(config)
     }
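`PostgresSessionConfig::new_from_env()?.or(config.postgres)` makes the environment override the config file, because `Option::or` keeps the first `Some`. In miniature:

```rust
fn main() {
    // Option::or keeps the left-hand Some, so env settings shadow file settings.
    let from_env = Some("postgres://from-env");
    let from_file = Some("postgres://from-file");
    assert_eq!(from_env.or(from_file), Some("postgres://from-env"));

    // only when the env yields None does the file value survive
    let no_env: Option<&str> = None;
    assert_eq!(no_env.or(Some("postgres://from-file")), Some("postgres://from-file"));
}
```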
diff --git a/lite-rpc/src/main.rs b/lite-rpc/src/main.rs
index f8dbe294..7e4d4a91 100644
--- a/lite-rpc/src/main.rs
+++ b/lite-rpc/src/main.rs
@@ -9,6 +9,7 @@ use lite_rpc::postgres_logger::PostgresLogger;
 use lite_rpc::service_spawner::ServiceSpawner;
 use lite_rpc::{DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE, MAX_NB_OF_CONNECTIONS_WITH_LEADERS};
 use log::{debug, info};
+use solana_lite_rpc_blockstore::history::History;
 use solana_lite_rpc_cluster_endpoints::endpoint_stremers::EndpointStreaming;
 use solana_lite_rpc_cluster_endpoints::grpc_subscription::create_grpc_subscription;
 use solana_lite_rpc_cluster_endpoints::grpc_subscription_autoreconnect::{
@@ -32,9 +33,6 @@ use solana_lite_rpc_core::structures::{
 };
 use solana_lite_rpc_core::types::BlockStream;
 use solana_lite_rpc_core::AnyhowJoinHandle;
-use solana_lite_rpc_history::history::History;
-use solana_lite_rpc_history::postgres::postgres_config::PostgresSessionConfig;
-use solana_lite_rpc_history::postgres::postgres_session::PostgresSessionCache;
 use solana_lite_rpc_prioritization_fees::account_prio_service::AccountPrioService;
 use solana_lite_rpc_services::data_caching_service::DataCachingService;
 use solana_lite_rpc_services::quic_connection_utils::QuicConnectionParameters;
@@ -43,6 +41,7 @@ use solana_lite_rpc_services::tpu_utils::tpu_service::{TpuService, TpuServiceCon
 use solana_lite_rpc_services::transaction_replayer::TransactionReplayer;
 use solana_lite_rpc_services::tx_sender::TxSender;
 
+use lite_rpc::postgres_logger;
 use solana_lite_rpc_prioritization_fees::start_block_priofees_task;
 use solana_rpc_client::nonblocking::rpc_client::RpcClient;
 use solana_sdk::commitment_config::CommitmentConfig;
@@ -82,7 +81,7 @@ async fn get_latest_block(
 }
 
 pub async fn start_postgres(
-    config: Option<PostgresSessionConfig>,
+    config: Option<postgres_logger::PostgresSessionConfig>,
 ) -> anyhow::Result<(Option<NotificationSender>, AnyhowJoinHandle)> {
     let Some(config) = config else {
         return Ok((
@@ -96,7 +95,7 @@ pub async fn start_postgres(
 
     let (postgres_send, postgres_recv) = mpsc::unbounded_channel();
 
-    let postgres_session_cache = PostgresSessionCache::new(config).await?;
+    let postgres_session_cache = postgres_logger::PostgresSessionCache::new(config).await?;
     let postgres = PostgresLogger::start(postgres_session_cache, postgres_recv);
 
     Ok((Some(postgres_send), postgres))
diff --git a/lite-rpc/src/postgres_logger.rs b/lite-rpc/src/postgres_logger/mod.rs
similarity index 97%
rename from lite-rpc/src/postgres_logger.rs
rename to lite-rpc/src/postgres_logger/mod.rs
index f9f67ab5..16664eb7 100644
--- a/lite-rpc/src/postgres_logger.rs
+++ b/lite-rpc/src/postgres_logger/mod.rs
@@ -1,3 +1,9 @@
+mod postgres_config;
+mod postgres_session;
+
+pub use crate::postgres_logger::postgres_config::PostgresSessionConfig;
+pub use crate::postgres_logger::postgres_session::{PostgresSession, PostgresSessionCache};
+
 use anyhow::bail;
 use chrono::{DateTime, Utc};
 use futures::join;
@@ -10,7 +16,6 @@ use solana_lite_rpc_core::{
     },
     AnyhowJoinHandle,
 };
-use solana_lite_rpc_history::postgres::postgres_session::{PostgresSession, PostgresSessionCache};
 use std::time::Duration;
 use tokio_postgres::types::ToSql;
 
@@ -248,7 +253,7 @@ impl PostgresLogger {
                         tx_batch.append(&mut tx)
                     }
                     NotificationMsg::BlockNotificationMsg(_) => {
-                        // ignore block storage as it has been moved to persistant history.
+                        // ignore block storage as it has been moved to the persistent blockstore.
                         continue;
                     }
                     NotificationMsg::UpdateTransactionMsg(update) => {
diff --git a/lite-rpc/src/postgres_logger/postgres_config.rs b/lite-rpc/src/postgres_logger/postgres_config.rs
new file mode 100644
index 00000000..e424de87
--- /dev/null
+++ b/lite-rpc/src/postgres_logger/postgres_config.rs
@@ -0,0 +1,57 @@
+use anyhow::Context;
+use std::env;
+use tokio_postgres::config::SslMode;
+
+#[derive(serde::Deserialize, Debug, Clone)]
+pub struct PostgresSessionConfig {
+    pub pg_config: String,
+    pub ssl: Option<PostgresSessionSslConfig>,
+}
+
+#[derive(serde::Deserialize, Debug, Clone)]
+pub struct PostgresSessionSslConfig {
+    pub ca_pem_b64: String,
+    pub client_pks_b64: String,
+    pub client_pks_pass: String,
+}
+
+impl PostgresSessionConfig {
+    pub fn new_from_env() -> anyhow::Result<Option<Self>> {
+        // pg not enabled
+        if env::var("PG_ENABLED").is_err() {
+            return Ok(None);
+        }
+
+        let enable_pg = env::var("PG_ENABLED").context("PG_ENABLED")?;
+        if enable_pg != *"true" {
+            return Ok(None);
+        }
+
+        let env_pg_config = env::var("PG_CONFIG").context("PG_CONFIG not found")?;
+
+        let ssl_config = if env_pg_config
+            .parse::<tokio_postgres::Config>()?
+            .get_ssl_mode()
+            .eq(&SslMode::Disable)
+        {
+            None
+        } else {
+            let env_ca_pem_b64 = env::var("CA_PEM_B64").context("CA_PEM_B64 not found")?;
+            let env_client_pks_b64 =
+                env::var("CLIENT_PKS_B64").context("CLIENT_PKS_B64 not found")?;
+            let env_client_pks_pass =
+                env::var("CLIENT_PKS_PASS").context("CLIENT_PKS_PASS not found")?;
+
+            Some(PostgresSessionSslConfig {
+                ca_pem_b64: env_ca_pem_b64,
+                client_pks_b64: env_client_pks_b64,
+                client_pks_pass: env_client_pks_pass,
+            })
+        };
+
+        Ok(Some(Self {
+            pg_config: env_pg_config,
+            ssl: ssl_config,
+        }))
+    }
+}
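Spelled out, the environment contract this file implements (variable names are taken from the code; the connection string is only a placeholder):

```rust
use lite_rpc::postgres_logger::PostgresSessionConfig;

fn main() -> anyhow::Result<()> {
    // PG_ENABLED gates everything; with sslmode=disable no TLS material is needed.
    std::env::set_var("PG_ENABLED", "true");
    std::env::set_var("PG_CONFIG", "host=localhost user=literpc_app dbname=literpc sslmode=disable");
    // any other sslmode additionally requires CA_PEM_B64, CLIENT_PKS_B64 and CLIENT_PKS_PASS

    let config = PostgresSessionConfig::new_from_env()?.expect("enabled above");
    assert!(config.ssl.is_none());
    Ok(())
}
```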
diff --git a/lite-rpc/src/postgres_logger/postgres_session.rs b/lite-rpc/src/postgres_logger/postgres_session.rs
new file mode 100644
index 00000000..bd4781b7
--- /dev/null
+++ b/lite-rpc/src/postgres_logger/postgres_session.rs
@@ -0,0 +1,148 @@
+use std::sync::Arc;
+
+use anyhow::Context;
+use native_tls::{Certificate, Identity, TlsConnector};
+use postgres_native_tls::MakeTlsConnector;
+use solana_lite_rpc_core::encoding::BinaryEncoding;
+use tokio::sync::RwLock;
+use tokio_postgres::{config::SslMode, tls::MakeTlsConnect, types::ToSql, Client, NoTls, Socket};
+
+use super::postgres_config::{PostgresSessionConfig, PostgresSessionSslConfig};
+
+#[derive(Clone)]
+pub struct PostgresSession {
+    pub client: Arc<Client>,
+}
+
+impl PostgresSession {
+    pub async fn new(
+        PostgresSessionConfig { pg_config, ssl }: PostgresSessionConfig,
+    ) -> anyhow::Result<Self> {
+        let pg_config = pg_config.parse::<tokio_postgres::Config>()?;
+
+        let client = if let SslMode::Disable = pg_config.get_ssl_mode() {
+            Self::spawn_connection(pg_config, NoTls).await?
+        } else {
+            let PostgresSessionSslConfig {
+                ca_pem_b64,
+                client_pks_b64,
+                client_pks_pass,
+            } = ssl.as_ref().unwrap();
+
+            let ca_pem = BinaryEncoding::Base64
+                .decode(ca_pem_b64)
+                .context("ca pem decode")?;
+            let client_pks = BinaryEncoding::Base64
+                .decode(client_pks_b64)
+                .context("client pks decode")?;
+
+            let connector = TlsConnector::builder()
+                .add_root_certificate(Certificate::from_pem(&ca_pem)?)
+                .identity(Identity::from_pkcs12(&client_pks, client_pks_pass).context("Identity")?)
+                .danger_accept_invalid_hostnames(true)
+                .danger_accept_invalid_certs(true)
+                .build()?;
+
+            Self::spawn_connection(pg_config, MakeTlsConnector::new(connector)).await?
+        };
+
+        Ok(Self {
+            client: Arc::new(client),
+        })
+    }
+
+    async fn spawn_connection<T>(
+        pg_config: tokio_postgres::Config,
+        connector: T,
+    ) -> anyhow::Result<Client>
+    where
+        T: MakeTlsConnect<Socket> + Send + 'static,
+        <T as MakeTlsConnect<Socket>>::Stream: Send,
+    {
+        let (client, connection) = pg_config
+            .connect(connector)
+            .await
+            .context("Connecting to Postgres failed")?;
+
+        tokio::spawn(async move {
+            log::info!("Connecting to Postgres");
+
+            if let Err(err) = connection.await {
+                log::error!("Connection to Postgres broke {err:?}");
+                return;
+            }
+            log::debug!("Postgres thread shutting down");
+        });
+
+        Ok(client)
+    }
+
+    pub async fn execute(
+        &self,
+        statement: &str,
+        params: &[&(dyn ToSql + Sync)],
+    ) -> Result<u64, tokio_postgres::Error> {
+        self.client.execute(statement, params).await
+    }
+
+    pub fn values_vecvec(args: usize, rows: usize, types: &[&str]) -> String {
+        let mut query = String::new();
+
+        Self::multiline_query(&mut query, args, rows, types);
+
+        query
+    }
+
+    pub fn multiline_query(query: &mut String, args: usize, rows: usize, types: &[&str]) {
+        let mut arg_index = 1usize;
+        for row in 0..rows {
+            query.push('(');
+
+            for i in 0..args {
+                if row == 0 && !types.is_empty() {
+                    query.push_str(&format!("(${arg_index})::{}", types[i]));
+                } else {
+                    query.push_str(&format!("${arg_index}"));
+                }
+                arg_index += 1;
+                if i != (args - 1) {
+                    query.push(',');
+                }
+            }
+
+            query.push(')');
+
+            if row != (rows - 1) {
+                query.push(',');
+            }
+        }
+    }
+}
+
+#[derive(Clone)]
+pub struct PostgresSessionCache {
+    session: Arc<RwLock<PostgresSession>>,
+    config: PostgresSessionConfig,
+}
+
+impl PostgresSessionCache {
+    pub async fn new(config: PostgresSessionConfig) -> anyhow::Result<Self> {
+        let session = PostgresSession::new(config.clone()).await?;
+        Ok(Self {
+            session: Arc::new(RwLock::new(session)),
+            config,
+        })
+    }
+
+    pub async fn get_session(&self) -> anyhow::Result<PostgresSession> {
+        let session = self.session.read().await;
+        if session.client.is_closed() {
+            drop(session);
+            let session = PostgresSession::new(self.config.clone()).await?;
+            *self.session.write().await = session.clone();
+            Ok(session)
+        } else {
+            Ok(session.clone())
+        }
+    }
+}
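The placeholder builders at the end of the session module are easiest to read off their output; both expectations follow directly from the loops above:

```rust
use lite_rpc::postgres_logger::PostgresSession;

fn main() {
    // two rows of three untyped arguments
    let values = PostgresSession::values_vecvec(3, 2, &[]);
    assert_eq!(values, "($1,$2,$3),($4,$5,$6)");

    // explicit casts are emitted on the first row only
    let mut query = String::new();
    PostgresSession::multiline_query(&mut query, 2, 2, &["TEXT", "BIGINT"]);
    assert_eq!(query, "(($1)::TEXT,($2)::BIGINT),($3,$4)");
}
```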
diff --git a/migrations/localdev_integrationtest.sql b/migrations/localdev_integrationtest.sql
index 9d263a59..79419958 100644
--- a/migrations/localdev_integrationtest.sql
+++ b/migrations/localdev_integrationtest.sql
@@ -3,3 +3,5 @@ CREATE DATABASE literpc_integrationtest_localdev;
 CREATE USER literpc_integrationtest;
 ALTER DATABASE literpc_integrationtest_localdev OWNER TO literpc_integrationtest;
 ALTER USER literpc_integrationtest PASSWORD 'youknowme';
+
+-- now apply permissions.sql
diff --git a/migrations/permissions.sql b/migrations/permissions.sql
index 61346e17..09bdaff9 100644
--- a/migrations/permissions.sql
+++ b/migrations/permissions.sql
@@ -15,3 +15,9 @@ ALTER DEFAULT PRIVILEGES IN SCHEMA lite_rpc GRANT SELECT ON TABLES TO r_literpc;
 
 -- required for block persistence (dynamic schemata - one per epoch)
 GRANT CONNECT, CREATE ON DATABASE my_literpc_database TO r_literpc; -- TODO adjust database name
+
+-- query path
+CREATE ROLE ro_literpc;
+GRANT ro_literpc TO literpc_app;
+
+GRANT CONNECT ON DATABASE literpc_integrationtest TO ro_literpc; -- TODO adjust database name
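One closing usage note on `PostgresSessionCache` from the session module above: callers fetch the session on every use, and `get_session` transparently rebuilds it once the underlying client reports itself closed. A sketch, assuming only the re-exported types shown in this diff:

```rust
use lite_rpc::postgres_logger::{PostgresSessionCache, PostgresSessionConfig};

async fn keep_pinging(config: PostgresSessionConfig) -> anyhow::Result<()> {
    let cache = PostgresSessionCache::new(config).await?;
    loop {
        // re-established behind the scenes if the previous connection broke
        let session = cache.get_session().await?;
        session.execute("SELECT 1", &[]).await?;
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    }
}
```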