From ff9c0a5e65c90a96e5eb7c1e3495c5434adec35f Mon Sep 17 00:00:00 2001 From: Maximilian Schneider Date: Thu, 6 Apr 2023 17:28:10 +0200 Subject: [PATCH] store timestamps in sqlana (#106) * save timestamps in sql * move some timestamps to block * store processed_local_time in blockstore * continuosly poll processed blocks and feed into blockstore * fetch local processing time from blockstore * make Blocks.local_time optional in schema --- Cargo.lock | 6 ++- Cargo.toml | 4 +- migrations/create.sql | 3 ++ src/block_store.rs | 81 ++++++++++++++++++++++++++-------- src/bridge.rs | 9 ++-- src/workers/block_listenser.rs | 32 +++++++++++++- src/workers/postgres.rs | 18 ++++++-- src/workers/tx_sender.rs | 3 ++ 8 files changed, 127 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e6da24f6..9e48f771 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -621,9 +621,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.23" +version = "0.4.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16b0a3d9ed01224b22057780a37bb8c5dbfe1be8ba48678e7bf57ec4b385411f" +checksum = "4e3c5919066adf22df73762e50cffcde3a758f2a848b113b586d1f86728b673b" dependencies = [ "iana-time-zone", "js-sys", @@ -2264,6 +2264,7 @@ dependencies = [ "bincode", "bs58", "bytes", + "chrono", "clap 4.1.6", "const_env", "dashmap", @@ -2893,6 +2894,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f028f05971fe20f512bcc679e2c10227e57809a3af86a7606304435bc8896cd6" dependencies = [ "bytes", + "chrono", "fallible-iterator", "postgres-protocol", ] diff --git a/Cargo.toml b/Cargo.toml index 1ca888c7..89b3b3c5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,7 +36,8 @@ dashmap = "5.4.0" const_env = "0.1.2" jsonrpsee = { version = "0.16.2", features = ["macros", "full"] } tracing-subscriber = "0.3.16" -tokio-postgres = "0.7.7" +chrono = "0.4.24" +tokio-postgres = { version = "0.7.7", features = ["with-chrono-0_4"] } native-tls = "0.2.11" postgres-native-tls = "0.5.0" prometheus = "0.13.3" @@ -85,4 +86,5 @@ dotenv = { workspace = true } async-channel = { workspace = true } quinn = { workspace = true } rustls = { workspace = true } +chrono = { workspace = true } async-recursion = { workspace = true } diff --git a/migrations/create.sql b/migrations/create.sql index 58df0085..2e33f472 100644 --- a/migrations/create.sql +++ b/migrations/create.sql @@ -5,6 +5,7 @@ CREATE TABLE lite_rpc.Txs ( signature CHAR(88) NOT NULL, recent_slot BIGINT NOT NULL, forwarded_slot BIGINT NOT NULL, + forwarded_local_time TIMESTAMP WITH TIME ZONE NOT NULL, processed_slot BIGINT, cu_consumed BIGINT, cu_requested BIGINT, @@ -16,6 +17,8 @@ CREATE TABLE lite_rpc.Blocks ( slot BIGINT NOT NULL PRIMARY KEY, leader_id BIGINT NOT NULL, parent_slot BIGINT NOT NULL + cluster_time TIMESTAMP WITH TIME ZONE NOT NULL, + local_time TIMESTAMP WITH TIME ZONE, ); CREATE TABLE lite_rpc.AccountAddrs ( diff --git a/src/block_store.rs b/src/block_store.rs index 1e2355ea..822e2d3a 100644 --- a/src/block_store.rs +++ b/src/block_store.rs @@ -2,11 +2,15 @@ use std::sync::Arc; use std::time::Duration; use anyhow::Context; +use chrono::{DateTime, Utc}; use dashmap::DashMap; use log::info; use prometheus::core::GenericGauge; use prometheus::{opts, register_int_gauge}; +use serde_json::json; +use solana_client::rpc_request::RpcRequest; +use solana_client::rpc_response::{RpcBlockhash, Response}; use solana_client::{nonblocking::rpc_client::RpcClient, rpc_config::RpcBlockConfig}; use solana_sdk::commitment_config::CommitmentConfig; use solana_transaction_status::TransactionDetails; @@ -21,11 +25,13 @@ pub struct BlockInformation { pub slot: u64, pub block_height: u64, pub instant: Instant, + pub processed_local_time: Option>, } #[derive(Clone)] pub struct BlockStore { blocks: Arc>, + latest_processed_block: Arc>, latest_confirmed_block: Arc>, latest_finalized_block: Arc>, last_add_block_metric: Arc>, @@ -33,12 +39,26 @@ pub struct BlockStore { impl BlockStore { pub async fn new(rpc_client: &RpcClient) -> anyhow::Result { - let (confirmed_blockhash, confirmed_block) = - Self::fetch_latest(rpc_client, CommitmentConfig::confirmed()).await?; + + let blocks = Arc::new(DashMap::new()); + + // fetch in order of least recency so the blockstore is as up to date as it can be on boot let (finalized_blockhash, finalized_block) = Self::fetch_latest(rpc_client, CommitmentConfig::finalized()).await?; + let (confirmed_blockhash, confirmed_block) = + Self::fetch_latest(rpc_client, CommitmentConfig::confirmed()).await?; + let (processed_blockhash, processed_block) = + Self::fetch_latest_processed(rpc_client).await?; + + blocks.insert(processed_blockhash.clone(),processed_block); + blocks.insert(confirmed_blockhash.clone(), confirmed_block); + blocks.insert(finalized_blockhash.clone(), finalized_block); Ok(Self { + latest_processed_block: Arc::new(RwLock::new(( + processed_blockhash.clone(), + processed_block + ))), latest_confirmed_block: Arc::new(RwLock::new(( confirmed_blockhash.clone(), confirmed_block, @@ -47,16 +67,31 @@ impl BlockStore { finalized_blockhash.clone(), finalized_block, ))), - blocks: Arc::new({ - let map = DashMap::new(); - map.insert(confirmed_blockhash, confirmed_block); - map.insert(finalized_blockhash, finalized_block); - map - }), + blocks, last_add_block_metric: Arc::new(RwLock::new(Instant::now())), }) } + pub async fn fetch_latest_processed( + rpc_client: &RpcClient, + ) -> anyhow::Result<(String, BlockInformation)> { + let response = rpc_client.send::>( + RpcRequest::GetLatestBlockhash, + json!([CommitmentConfig::processed()]), + ) + .await?; + + let processed_blockhash = response.value.blockhash; + let processed_block = BlockInformation { + slot: response.context.slot, + block_height: response.value.last_valid_block_height, + processed_local_time: Some(Utc::now()), + instant: Instant::now(), + }; + + Ok((processed_blockhash, processed_block)) + } + pub async fn fetch_latest( rpc_client: &RpcClient, commitment_config: CommitmentConfig, @@ -89,11 +124,12 @@ impl BlockStore { slot, block_height, instant: Instant::now(), + processed_local_time: None, }, )) } - pub async fn get_block_info(&self, blockhash: &str) -> Option { + pub fn get_block_info(&self, blockhash: &str) -> Option { let Some(info) = self.blocks.get(blockhash) else { return None; }; @@ -107,8 +143,10 @@ impl BlockStore { ) -> Arc> { if commitment_config.is_finalized() { self.latest_finalized_block.clone() - } else { + } else if commitment_config.is_confirmed() { self.latest_confirmed_block.clone() + } else { + self.latest_processed_block.clone() } } @@ -140,7 +178,7 @@ impl BlockStore { pub async fn add_block( &self, blockhash: String, - block_info: BlockInformation, + mut block_info: BlockInformation, commitment_config: CommitmentConfig, ) { // create context for add block metric @@ -149,13 +187,22 @@ impl BlockStore { *last_add_block_metric = Instant::now(); } + // override timestamp from previous value, so we always keep the earliest (processed) timestamp around + if let Some(processed_block) = self.get_block_info(&blockhash.clone()) { + block_info.processed_local_time = processed_block.processed_local_time; + } + + // save slot copy to avoid borrow issues + let slot = block_info.slot; + + // Write to block store first in order to prevent // any race condition i.e prevent some one to // ask the map what it doesn't have rn - let slot = block_info.slot; self.blocks.insert(blockhash.clone(), block_info); BLOCKS_IN_BLOCKSTORE.inc(); + // update latest block let latest_block = self.get_latest_block_arc(commitment_config); if slot > latest_block.read().await.1.slot { *latest_block.write().await = (blockhash, block_info); @@ -163,20 +210,18 @@ impl BlockStore { } pub async fn clean(&self, cleanup_duration: Duration) { + let latest_processed = self.get_latest_blockhash(CommitmentConfig::processed()).await; let latest_confirmed = self - .get_latest_blockhash(CommitmentConfig { - commitment: solana_sdk::commitment_config::CommitmentLevel::Confirmed, - }) + .get_latest_blockhash(CommitmentConfig::confirmed()) .await; let latest_finalized = self - .get_latest_blockhash(CommitmentConfig { - commitment: solana_sdk::commitment_config::CommitmentLevel::Confirmed, - }) + .get_latest_blockhash(CommitmentConfig::finalized()) .await; let before_length = self.blocks.len(); self.blocks.retain(|k, v| { v.instant.elapsed() < cleanup_duration + || k.eq(&latest_processed) || k.eq(&latest_confirmed) || k.eq(&latest_finalized) }); diff --git a/src/bridge.rs b/src/bridge.rs index bc8f1402..9c56731b 100644 --- a/src/bridge.rs +++ b/src/bridge.rs @@ -144,6 +144,8 @@ impl LiteBridge { .clone() .listen(CommitmentConfig::confirmed(), None); + let processed_block_listener = self.block_listner.clone().listen_processed(); + let cleaner = Cleaner::new( self.tx_sender.clone(), self.block_listner.clone(), @@ -187,6 +189,7 @@ impl LiteBridge { tx_sender, finalized_block_listener, confirmed_block_listener, + processed_block_listener, metrics_capture, prometheus_sync, cleaner, @@ -234,9 +237,9 @@ impl LiteRpcServer for LiteBridge { let Some(BlockInformation { slot, .. }) = self .block_store .get_block_info(&tx.get_recent_blockhash().to_string()) - .await else { - log::warn!("block"); - return Err(jsonrpsee::core::Error::Custom("Blockhash not found in block store".to_string())); + else { + log::warn!("block"); + return Err(jsonrpsee::core::Error::Custom("Blockhash not found in block store".to_string())); }; if let Err(e) = self diff --git a/src/workers/block_listenser.rs b/src/workers/block_listenser.rs index 1b2d286a..289a3f2a 100644 --- a/src/workers/block_listenser.rs +++ b/src/workers/block_listenser.rs @@ -4,6 +4,7 @@ use std::{ time::Duration, }; +use chrono::{TimeZone, Utc}; use dashmap::DashMap; use jsonrpsee::SubscriptionSink; use log::{info, trace, warn}; @@ -161,6 +162,7 @@ impl BlockListener { CommitmentLevel::Finalized => TransactionConfirmationStatus::Finalized, _ => TransactionConfirmationStatus::Confirmed, }; + let timer = if commitment_config.is_finalized() { TT_RECV_FIN_BLOCK.start_timer() @@ -183,6 +185,7 @@ impl BlockListener { }, ) .await?; + timer.observe_duration(); if commitment_config.is_finalized() { @@ -211,6 +214,7 @@ impl BlockListener { slot, block_height, instant: Instant::now(), + processed_local_time: None, }, commitment_config, ) @@ -317,11 +321,21 @@ impl BlockListener { let _leader_id = &leader_reward.pubkey; + // TODO insert if not exists leader_id into accountaddrs + + // fetch cluster time from rpc + let block_time = self.rpc_client.get_block_time(slot).await?; + + // fetch local time from blockstore + let block_info = self.block_store.get_block_info(&blockhash); + postgres .send(PostgresMsg::PostgresBlock(PostgresBlock { slot: slot as i64, - leader_id: 0, //FIX: + leader_id: 0, // TODO: lookup leader parent_slot: parent_slot as i64, + cluster_time: Utc.timestamp_millis_opt(block_time).unwrap(), + local_time: block_info.map(|b| b.processed_local_time).flatten(), })) .expect("Error sending block to postgres service"); @@ -473,6 +487,22 @@ impl BlockListener { }) } + // continuosly poll processed blocks and feed into blockstore + pub fn listen_processed(self) -> JoinHandle> { + let rpc_client = self.rpc_client.clone(); + let block_store = self.block_store.clone(); + + tokio::spawn(async move { + info!("processed block listner started"); + + loop { + let (processed_blockhash, processed_block) = BlockStore::fetch_latest_processed(rpc_client.as_ref()).await?; + block_store.add_block(processed_blockhash, processed_block, CommitmentConfig::processed()).await; + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + } + }) + } + pub fn clean(&self, ttl_duration: Duration) { let length_before = self.signature_subscribers.len(); self.signature_subscribers diff --git a/src/workers/postgres.rs b/src/workers/postgres.rs index 4af9869b..555d1943 100644 --- a/src/workers/postgres.rs +++ b/src/workers/postgres.rs @@ -1,4 +1,5 @@ use anyhow::{bail, Context}; +use chrono::{DateTime, Utc}; use futures::{future::join_all, join}; use log::{info, warn}; use postgres_native_tls::MakeTlsConnector; @@ -29,6 +30,7 @@ pub struct PostgresTx { pub signature: String, pub recent_slot: i64, pub forwarded_slot: i64, + pub forwarded_local_time: DateTime, pub processed_slot: Option, pub cu_consumed: Option, pub cu_requested: Option, @@ -47,6 +49,8 @@ pub struct PostgresBlock { pub slot: i64, pub leader_id: i64, pub parent_slot: i64, + pub cluster_time: DateTime, + pub local_time: Option>, } #[derive(Debug)] @@ -109,8 +113,8 @@ impl PostgresSession { .prepare( r#" UPDATE lite_rpc.txs - SET processed_slot = $1, cu_consumed = $2, cu_requested = $3 - WHERE signature = $4 + SET processed_slot = $1, processed_cluster_time = $2, processed_local_time = $3, cu_consumed = $4, cu_requested = $5, + WHERE signature = $6 "#, ) .await?; @@ -178,6 +182,7 @@ impl PostgresSession { signature, recent_slot, forwarded_slot, + forwarded_local_time, processed_slot, cu_consumed, cu_requested, @@ -187,6 +192,7 @@ impl PostgresSession { args.push(signature); args.push(recent_slot); args.push(forwarded_slot); + args.push(forwarded_local_time); args.push(processed_slot); args.push(cu_consumed); args.push(cu_requested); @@ -196,7 +202,7 @@ impl PostgresSession { let mut query = String::from( r#" INSERT INTO lite_rpc.Txs - (signature, recent_slot, forwarded_slot, processed_slot, cu_consumed, cu_requested, quic_response) + (signature, recent_slot, forwarded_slot, forwarded_local_time, processed_slot, cu_consumed, cu_requested, quic_response) VALUES "#, ); @@ -222,17 +228,21 @@ impl PostgresSession { slot, leader_id, parent_slot, + cluster_time, + local_time, } = block; args.push(slot); args.push(leader_id); args.push(parent_slot); + args.push(cluster_time); + args.push(local_time); } let mut query = String::from( r#" INSERT INTO lite_rpc.Blocks - (slot, leader_id, parent_slot) + (slot, leader_id, parent_slot, cluster_time, local_time) VALUES "#, ); diff --git a/src/workers/tx_sender.rs b/src/workers/tx_sender.rs index 59ff8ea7..86e0ca8f 100644 --- a/src/workers/tx_sender.rs +++ b/src/workers/tx_sender.rs @@ -4,6 +4,7 @@ use std::{ }; use anyhow::bail; +use chrono::Utc; use dashmap::DashMap; use log::{info, trace, warn}; @@ -101,6 +102,7 @@ impl TxSender { } let forwarded_slot = tpu_client.get_estimated_slot(); + let forwarded_local_time = Utc::now(); let mut quic_responses = vec![]; for tx in txs { @@ -125,6 +127,7 @@ impl TxSender { signature: sig.clone(), recent_slot: *recent_slot as i64, forwarded_slot: forwarded_slot as i64, + forwarded_local_time, processed_slot: None, cu_consumed: None, cu_requested: None,