Feature/block priofees (#274)

* boilerplate

* add dummy endpoint

* simple logic with highes block

* endpoint works

* remove cu calculus

* log get_latest_block startup timing

* websocket subscription basics

* WIP

* propagate fees via channel

* cleanup

* use broadcast sender

* proper handling of broadcast channel issues

* rename stuff

* move to dedicated crate

* cleanup deps

* move data definition

* hide private types

* percentile math

* code format

* integrated stats by cu

* add cu stats

* restart inline asserts

* warn about DashMap access

* switch to BTreeMap

* remove vote transactions from calculus

* return vec

* array

* flat format

* reformat

* split arrays

* rename keys

* enable experimental tag

* code format

* more logging

* reset log level

* rename websocket method to blockPrioritizationFeesSubscribe

* HACK: use processed blocks

* add is_vote_transaction

* do not fail if fees goes down

* udp message size example

* imrpove 100 handling

* simplify inital sort

* Revert "HACK: use processed blocks"

This reverts commit ff17d9475c.

* add percentile test case

* add per block cu_consumed+tx_count(nonvote)

* fix supid cleanup bug

* add TxAggregateStats

* add by_cu test

* add alternative cu percentlie impl

* cleanup cu tests

* add check for step

* saturating_sub

* remove useless file

* add is_vote to history crate

* fix fmt+clippy (nightly)

* clippy

* clippy

* clippy

* clippy
This commit is contained in:
Groovie | Mango 2024-01-25 10:29:14 +01:00 committed by GitHub
parent 705a08c2c6
commit 7171b524f0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 782 additions and 188 deletions

View File

@ -3,6 +3,7 @@ name: Deploy to Fly Staging
on:
push:
branches: [main]
tags: ['experimental/*']
env:
FLY_API_TOKEN: ${{ secrets.FLY_API_TOKEN }}

19
Cargo.lock generated
View File

@ -2518,6 +2518,7 @@ dependencies = [
"quinn",
"serde",
"serde_json",
"solana-lite-rpc-block-priofees",
"solana-lite-rpc-cluster-endpoints",
"solana-lite-rpc-core",
"solana-lite-rpc-history",
@ -4281,6 +4282,24 @@ dependencies = [
"syn 2.0.48",
]
[[package]]
name = "solana-lite-rpc-block-priofees"
version = "0.2.3"
dependencies = [
"dashmap 5.5.3",
"itertools 0.10.5",
"jsonrpsee",
"lazy_static",
"log",
"prometheus",
"serde",
"serde_json",
"solana-lite-rpc-core",
"solana-sdk",
"tokio",
"tracing-subscriber",
]
[[package]]
name = "solana-lite-rpc-cluster-endpoints"
version = "0.2.3"

View File

@ -9,6 +9,7 @@ members = [
"quic-forward-proxy-integration-test",
"cluster-endpoints",
"history",
"block_priofees",
"bench"
]
@ -69,6 +70,7 @@ solana-lite-rpc-core = {path = "core", version="0.2.3"}
solana-lite-rpc-cluster-endpoints = {path = "cluster-endpoints", version="0.2.3"}
solana-lite-rpc-history = {path = "history", version="0.2.3"}
solana-lite-rpc-stakevote = {path = "stake_vote", version="0.2.3"}
solana-lite-rpc-block-priofees = {path = "block_priofees", version="0.2.3"}
async-trait = "0.1.68"
yellowstone-grpc-client = { git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v1.12.0+solana.1.17.15" }

20
block_priofees/Cargo.toml Normal file
View File

@ -0,0 +1,20 @@
[package]
name = "solana-lite-rpc-block-priofees"
version = "0.2.3"
edition = "2021"
description = "Expose priority fees stats per block via RPC and WebSocket"
[dependencies]
solana-lite-rpc-core = {workspace = true}
solana-sdk = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
log = { workspace = true }
itertools = { workspace = true }
dashmap = { workspace = true }
jsonrpsee = { workspace = true }
tracing-subscriber = { workspace = true }
prometheus = { workspace = true }
lazy_static = { workspace = true }
tokio = { version = "1.28.2", features = ["full"]}

View File

@ -0,0 +1,157 @@
use crate::rpc_data::{PrioFeesStats, PrioFeesUpdateMessage, TxAggregateStats};
use crate::stats_calculation::calculate_supp_percentiles;
use log::{error, info, trace, warn};
use solana_lite_rpc_core::types::BlockStream;
use solana_sdk::clock::Slot;
use std::collections::BTreeMap;
use std::sync::Arc;
use tokio::sync::broadcast::error::RecvError::{Closed, Lagged};
use tokio::sync::broadcast::Sender;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
// note: ATM only the latest slot (highest key) is used
const SLOTS_TO_RETAIN: u64 = 100;
/// put everything required to serve sync data calls here
#[derive(Clone)]
pub struct PrioFeeStore {
// store priofees stats for recently processed blocks up to CLEANUP_SLOTS_AFTER
recent: Arc<RwLock<BTreeMap<Slot, PrioFeesStats>>>,
}
pub struct PrioFeesService {
pub block_fees_store: PrioFeeStore,
// use .subscribe() to get a receiver
pub block_fees_stream: Sender<PrioFeesUpdateMessage>,
}
impl PrioFeesService {
pub async fn get_latest_priofees(&self) -> Option<(Slot, PrioFeesStats)> {
let lock = self.block_fees_store.recent.read().await;
let latest_in_store = lock.last_key_value();
latest_in_store.map(|x| (*x.0, x.1.clone()))
}
}
pub async fn start_block_priofees_task(
mut block_stream: BlockStream,
) -> (JoinHandle<()>, PrioFeesService) {
let recent_data = Arc::new(RwLock::new(BTreeMap::new()));
let store = PrioFeeStore {
recent: recent_data.clone(),
};
let (priofees_update_sender, _priofees_update_receiver) = tokio::sync::broadcast::channel(64);
let sender_to_return = priofees_update_sender.clone();
let jh_priofees_task = tokio::spawn(async move {
let sender = priofees_update_sender.clone();
'recv_loop: loop {
let block = block_stream.recv().await;
match block {
Ok(block) => {
if !block.commitment_config.is_processed() {
continue;
}
let processed_slot = block.slot;
{
// first do some cleanup
let mut lock = recent_data.write().await;
lock.retain(|slot, _| {
*slot > processed_slot.saturating_sub(SLOTS_TO_RETAIN)
});
}
let block_priofees = block
.transactions
.iter()
.filter(|tx| !tx.is_vote)
.map(|tx| {
(
tx.prioritization_fees.unwrap_or_default(),
tx.cu_consumed.unwrap_or_default(),
)
})
.collect::<Vec<(u64, u64)>>();
let priofees_percentiles = calculate_supp_percentiles(&block_priofees);
let total_tx_count = block.transactions.len() as u64;
let nonvote_tx_count =
block.transactions.iter().filter(|tx| !tx.is_vote).count() as u64;
let total_cu_consumed = block
.transactions
.iter()
.map(|tx| tx.cu_consumed.unwrap_or(0))
.sum::<u64>();
let nonvote_cu_consumed = block
.transactions
.iter()
.filter(|tx| !tx.is_vote)
.map(|tx| tx.cu_consumed.unwrap_or(0))
.sum::<u64>();
trace!("Got prio fees stats for processed block {}", processed_slot);
let priofees_stats = PrioFeesStats {
by_tx: priofees_percentiles.by_tx,
by_tx_percentiles: priofees_percentiles.by_tx_percentiles,
by_cu: priofees_percentiles.by_cu,
by_cu_percentiles: priofees_percentiles.by_cu_percentiles,
tx_count: TxAggregateStats {
total: total_tx_count,
nonvote: nonvote_tx_count,
},
cu_consumed: TxAggregateStats {
total: total_cu_consumed,
nonvote: nonvote_cu_consumed,
},
};
{
// first do some cleanup
let mut lock = recent_data.write().await;
lock.insert(processed_slot, priofees_stats.clone());
}
let msg = PrioFeesUpdateMessage {
slot: processed_slot,
priofees_stats,
};
let send_result = sender.send(msg);
match send_result {
Ok(n_subscribers) => {
trace!(
"sent priofees update message to {} subscribers (buffer={})",
n_subscribers,
sender.len()
);
}
Err(_) => {
trace!("no subscribers for priofees update message");
}
}
}
Err(Lagged(_lagged)) => {
warn!("channel error receiving block for priofees calculation - continue");
continue 'recv_loop;
}
Err(Closed) => {
error!("failed to receive block, sender closed - aborting");
break 'recv_loop;
}
}
}
info!("priofees task shutting down");
});
(
jh_priofees_task,
PrioFeesService {
block_fees_store: store,
block_fees_stream: sender_to_return,
},
)
}

View File

@ -0,0 +1,6 @@
mod block_priofees;
pub mod rpc_data;
mod stats_calculation;
pub use block_priofees::{start_block_priofees_task, PrioFeesService};

View File

@ -0,0 +1,39 @@
use jsonrpsee::core::Serialize;
use solana_sdk::clock::Slot;
use std::fmt::Display;
#[derive(Clone, Serialize, Debug)]
pub struct TxAggregateStats {
pub total: u64,
pub nonvote: u64,
}
#[derive(Clone, Serialize, Debug)]
pub struct PrioFeesStats {
pub by_tx: Vec<u64>,
pub by_tx_percentiles: Vec<f32>,
pub by_cu: Vec<u64>,
pub by_cu_percentiles: Vec<f32>,
pub tx_count: TxAggregateStats,
pub cu_consumed: TxAggregateStats,
}
#[derive(Clone, Serialize, Debug, Eq, PartialEq, Hash)]
pub struct FeePoint {
// percentile
pub p: u32,
// value of fees in lamports
pub v: u64,
}
impl Display for FeePoint {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "(p{}, {})", self.p, self.v)
}
}
#[derive(Clone, Debug)]
pub struct PrioFeesUpdateMessage {
pub slot: Slot,
pub priofees_stats: PrioFeesStats,
}

View File

@ -0,0 +1,195 @@
use crate::rpc_data::FeePoint;
use itertools::Itertools;
use std::collections::HashMap;
use std::iter::zip;
/// `quantile` function is the same as the median if q=50, the same as the minimum if q=0 and the same as the maximum if q=100.
pub fn calculate_supp_percentiles(
// Vec(prioritization_fees, cu_consumed)
prio_fees_in_block: &[(u64, u64)],
) -> Percentiles {
let prio_fees_in_block = if prio_fees_in_block.is_empty() {
// note: percentile for empty array is undefined
vec![(0, 0)]
} else {
// sort by prioritization fees
prio_fees_in_block
.iter()
.sorted_by_key(|(prio, _cu)| prio)
.cloned()
.collect_vec()
};
// get stats by transaction
let dist_fee_by_index: Vec<FeePoint> = (0..=100)
.step_by(5)
.map(|p| {
let prio_fee = {
let index = prio_fees_in_block.len() * p / 100;
let cap_index = index.min(prio_fees_in_block.len() - 1);
prio_fees_in_block[cap_index].0
};
FeePoint {
p: p as u32,
v: prio_fee,
}
})
.collect_vec();
// get stats by CU
let cu_sum: u64 = prio_fees_in_block.iter().map(|x| x.1).sum();
let mut dist_fee_by_cu: HashMap<i32, u64> = HashMap::new();
let mut agg: u64 = 0;
let mut p = 0;
let p_step = 5;
for (prio, cu) in &prio_fees_in_block {
agg += cu;
// write p's as long as agg beats the aggregated cu
while agg >= (cu_sum as f64 * p as f64 / 100.0) as u64 && p <= 100 {
dist_fee_by_cu.insert(p, *prio);
assert_ne!(p_step, 0, "zero steps might cause infinite loop");
p += p_step;
}
}
let dist_fee_by_cu: Vec<FeePoint> = dist_fee_by_cu
.into_iter()
.sorted_by_key(|(p, _)| *p)
.map(|(p, fees)| FeePoint {
p: p as u32,
v: fees,
})
.collect_vec();
Percentiles {
by_tx: dist_fee_by_index
.iter()
.map(|fee_point| fee_point.v)
.collect_vec(),
by_tx_percentiles: dist_fee_by_index
.iter()
.map(|fee_point| fee_point.p as f32 / 100.0)
.collect_vec(),
by_cu: dist_fee_by_cu
.iter()
.map(|fee_point| fee_point.v)
.collect_vec(),
by_cu_percentiles: dist_fee_by_cu
.iter()
.map(|fee_point| fee_point.p as f32 / 100.0)
.collect_vec(),
}
}
pub struct Percentiles {
pub by_tx: Vec<u64>,
pub by_tx_percentiles: Vec<f32>,
pub by_cu: Vec<u64>,
pub by_cu_percentiles: Vec<f32>,
}
#[allow(dead_code)]
impl Percentiles {
fn get_fees_by_tx(&self, percentile: f32) -> Option<u64> {
zip(&self.by_tx_percentiles, &self.by_tx)
.find(|(&p, _cu)| p == percentile)
.map(|(_p, &cu)| cu)
}
fn get_fees_cu(&self, percentile: f32) -> Option<u64> {
zip(&self.by_cu_percentiles, &self.by_cu)
.find(|(&p, _cu)| p == percentile)
.map(|(_p, &cu)| cu)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_calculate_supp_info() {
let prio_fees_in_block = vec![(2, 2), (4, 4), (5, 5), (3, 3), (1, 1)];
let supp_info = calculate_supp_percentiles(&prio_fees_in_block).by_tx;
assert_eq!(supp_info[0], 1);
assert_eq!(supp_info[10], 3);
assert_eq!(supp_info[15], 4);
assert_eq!(supp_info[18], 5);
assert_eq!(supp_info[20], 5);
}
#[test]
fn test_calculate_supp_info_by_cu() {
// total of 20000 CU where consumed
let prio_fees_in_block = vec![(100, 10000), (200, 10000)];
let Percentiles {
by_cu,
by_cu_percentiles,
..
} = calculate_supp_percentiles(&prio_fees_in_block);
assert_eq!(by_cu_percentiles[10], 0.5);
assert_eq!(by_cu[10], 100); // need more than 100 to beat 50% of the CU
assert_eq!(by_cu[11], 200); // need more than 200 to beat 55% of the CU
assert_eq!(by_cu[20], 200); // need more than 200 to beat 100% of the CU
}
#[test]
fn test_empty_array() {
let prio_fees_in_block = vec![];
let supp_info = calculate_supp_percentiles(&prio_fees_in_block).by_tx;
assert_eq!(supp_info[0], 0);
}
#[test]
fn test_zeros() {
let prio_fees_in_block = vec![(0, 0), (0, 0)];
let supp_info = calculate_supp_percentiles(&prio_fees_in_block).by_cu;
assert_eq!(supp_info[0], 0);
}
#[test]
fn test_statisticshowto() {
let prio_fees_in_block = vec![
(30, 1),
(33, 2),
(43, 3),
(53, 4),
(56, 5),
(67, 6),
(68, 7),
(72, 8),
];
let supp_info = calculate_supp_percentiles(&prio_fees_in_block);
assert_eq!(supp_info.by_tx[5], 43);
assert_eq!(supp_info.by_tx_percentiles[5], 0.25);
}
#[test]
fn test_simple_non_integer_index() {
// Messwerte: 3 5 5 6 7 7 8 10 10
// In diesem Fall lautet es also 5.
let values = vec![
(3, 1),
(5, 2),
(5, 3),
(6, 4),
(7, 5),
(7, 6),
(8, 7),
(10, 8),
(10, 9),
];
let supp_info = calculate_supp_percentiles(&values);
assert_eq!(supp_info.by_tx_percentiles[4], 0.20);
assert_eq!(supp_info.by_tx[5], 5);
}
#[test]
fn test_large_list() {
let prio_fees_in_block: Vec<(u64, u64)> = (0..1000).map(|x| (x, x)).collect();
let supp_info = calculate_supp_percentiles(&prio_fees_in_block);
assert_eq!(supp_info.by_tx[19], 950);
assert_eq!(supp_info.by_tx_percentiles[19], 0.95);
}
}

View File

@ -1,5 +1,5 @@
use crate::grpc_subscription::{
create_block_processing_task, create_slot_stream_task, map_block_update,
create_block_processing_task, create_slot_stream_task, from_grpc_block_update,
};
use anyhow::Context;
use futures::{Stream, StreamExt};
@ -29,7 +29,7 @@ impl FromYellowstoneExtractor for BlockExtractor {
fn map_yellowstone_update(&self, update: SubscribeUpdate) -> Option<(Slot, Self::Target)> {
match update.update_oneof {
Some(UpdateOneof::Block(update_block_message)) => {
let block = map_block_update(update_block_message, self.0);
let block = from_grpc_block_update(update_block_message, self.0);
Some((block.slot, block))
}
_ => None,
@ -53,7 +53,7 @@ impl FromYellowstoneExtractor for BlockMetaHashExtractor {
fn create_grpc_multiplex_processed_block_stream(
grpc_sources: &Vec<GrpcSourceConfig>,
confirmed_block_sender: UnboundedSender<ProducedBlock>,
processed_block_sender: UnboundedSender<ProducedBlock>,
) -> Vec<AnyhowJoinHandle> {
let commitment_config = CommitmentConfig::processed();
@ -82,8 +82,8 @@ fn create_grpc_multiplex_processed_block_stream(
&& (slots_processed.len() < MAX_SIZE / 2
|| slot > slots_processed.first().cloned().unwrap_or_default())
{
confirmed_block_sender
.send(map_block_update(block, commitment_config))
processed_block_sender
.send(from_grpc_block_update(block, commitment_config))
.context("Issue to send confirmed block")?;
slots_processed.insert(slot);
if slots_processed.len() > MAX_SIZE {

View File

@ -15,6 +15,8 @@ use solana_lite_rpc_core::{
structures::produced_block::{ProducedBlock, TransactionInfo},
AnyhowJoinHandle,
};
use solana_sdk::program_utils::limited_deserialize;
use solana_sdk::vote::instruction::VoteInstruction;
use solana_sdk::{
borsh0_10::try_from_slice_unchecked,
commitment_config::CommitmentConfig,
@ -39,7 +41,8 @@ use yellowstone_grpc_proto::prelude::{
SubscribeUpdateBlock,
};
pub fn map_block_update(
/// grpc version of ProducedBlock mapping
pub fn from_grpc_block_update(
block: SubscribeUpdateBlock,
commitment_config: CommitmentConfig,
) -> ProducedBlock {
@ -181,8 +184,17 @@ pub fn map_block_update(
})
.or(legacy_prioritization_fees);
let is_vote_transaction = message.instructions().iter().any(|i| {
i.program_id(message.static_account_keys())
.eq(&solana_sdk::vote::program::id())
&& limited_deserialize::<VoteInstruction>(&i.data)
.map(|vi| vi.is_simple_vote())
.unwrap_or(false)
});
Some(TransactionInfo {
signature: signature.to_string(),
is_vote: is_vote_transaction,
err,
cu_requested,
prioritization_fees,

View File

@ -1,5 +1,7 @@
use anyhow::{bail, Context};
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_lite_rpc_core::encoding::BinaryEncoding;
use solana_lite_rpc_core::structures::produced_block::TransactionInfo;
use solana_lite_rpc_core::{
structures::{
produced_block::ProducedBlock,
@ -8,11 +10,20 @@ use solana_lite_rpc_core::{
AnyhowJoinHandle,
};
use solana_rpc_client_api::config::RpcBlockConfig;
use solana_sdk::borsh0_10::try_from_slice_unchecked;
use solana_sdk::compute_budget::ComputeBudgetInstruction;
use solana_sdk::program_utils::limited_deserialize;
use solana_sdk::reward_type::RewardType;
use solana_sdk::vote::instruction::VoteInstruction;
use solana_sdk::{
commitment_config::{CommitmentConfig, CommitmentLevel},
compute_budget,
slot_history::Slot,
};
use solana_transaction_status::{TransactionDetails, UiTransactionEncoding};
use solana_transaction_status::option_serializer::OptionSerializer;
use solana_transaction_status::{
TransactionDetails, UiConfirmedBlock, UiTransactionEncoding, UiTransactionStatusMeta,
};
use std::{sync::Arc, time::Duration};
use tokio::sync::broadcast::{Receiver, Sender};
@ -37,7 +48,7 @@ pub async fn process_block(
.await;
block
.ok()
.map(|block| ProducedBlock::from_ui_block(block, slot, commitment_config))
.map(|block| from_ui_block(block, slot, commitment_config))
}
pub fn poll_block(
@ -163,3 +174,156 @@ pub fn poll_block(
tasks
}
/// rpc version of ProducedBlock mapping
pub fn from_ui_block(
block: UiConfirmedBlock,
slot: Slot,
commitment_config: CommitmentConfig,
) -> ProducedBlock {
let block_height = block.block_height.unwrap_or_default();
let txs = block.transactions.unwrap_or_default();
let blockhash = block.blockhash;
let previous_blockhash = block.previous_blockhash;
let parent_slot = block.parent_slot;
let rewards = block.rewards.clone();
let txs = txs
.into_iter()
.filter_map(|tx| {
let Some(UiTransactionStatusMeta {
err,
compute_units_consumed,
..
}) = tx.meta
else {
// ignoring transaction
log::info!("Tx with no meta");
return None;
};
let Some(tx) = tx.transaction.decode() else {
// ignoring transaction
log::info!("Tx could not be decoded");
return None;
};
let signature = tx.signatures[0].to_string();
let cu_consumed = match compute_units_consumed {
OptionSerializer::Some(cu_consumed) => Some(cu_consumed),
_ => None,
};
let legacy_compute_budget = tx.message.instructions().iter().find_map(|i| {
if i.program_id(tx.message.static_account_keys())
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::RequestUnitsDeprecated {
units,
additional_fee,
}) = try_from_slice_unchecked(i.data.as_slice())
{
return Some((units, additional_fee));
}
}
None
});
let mut cu_requested = tx.message.instructions().iter().find_map(|i| {
if i.program_id(tx.message.static_account_keys())
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::SetComputeUnitLimit(limit)) =
try_from_slice_unchecked(i.data.as_slice())
{
return Some(limit);
}
}
None
});
let mut prioritization_fees = tx.message.instructions().iter().find_map(|i| {
if i.program_id(tx.message.static_account_keys())
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::SetComputeUnitPrice(price)) =
try_from_slice_unchecked(i.data.as_slice())
{
return Some(price);
}
}
None
});
if let Some((units, additional_fee)) = legacy_compute_budget {
cu_requested = Some(units);
if additional_fee > 0 {
prioritization_fees = Some(calc_prioritization_fees(units, additional_fee))
}
};
let blockhash = tx.message.recent_blockhash().to_string();
let message = BinaryEncoding::Base64.encode(tx.message.serialize());
let is_vote_transaction = tx.message.instructions().iter().any(|i| {
i.program_id(tx.message.static_account_keys())
.eq(&solana_sdk::vote::program::id())
&& limited_deserialize::<VoteInstruction>(&i.data)
.map(|vi| vi.is_simple_vote())
.unwrap_or(false)
});
Some(TransactionInfo {
signature,
is_vote: is_vote_transaction,
err,
cu_requested,
prioritization_fees,
cu_consumed,
recent_blockhash: blockhash,
message,
})
})
.collect();
let leader_id = if let Some(rewards) = block.rewards {
rewards
.iter()
.find(|reward| Some(RewardType::Fee) == reward.reward_type)
.map(|leader_reward| leader_reward.pubkey.clone())
} else {
None
};
let block_time = block.block_time.unwrap_or(0) as u64;
ProducedBlock {
transactions: txs,
block_height,
leader_id,
blockhash,
previous_blockhash,
parent_slot,
block_time,
slot,
commitment_config,
rewards,
}
}
#[inline]
fn calc_prioritization_fees(units: u32, additional_fee: u32) -> u64 {
(units as u64 * 1000) / additional_fee as u64
}
#[test]
fn overflow_u32() {
// value high enough to overflow u32 if multiplied by 1000
let units: u32 = 4_000_000_000;
let additional_fee: u32 = 100;
let prioritization_fees: u64 = calc_prioritization_fees(units, additional_fee);
assert_eq!(40_000_000_000, prioritization_fees);
}

View File

@ -1,20 +1,11 @@
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::{
borsh0_10::try_from_slice_unchecked,
compute_budget::{self, ComputeBudgetInstruction},
slot_history::Slot,
transaction::TransactionError,
};
use solana_transaction_status::{
option_serializer::OptionSerializer, Reward, RewardType, UiConfirmedBlock,
UiTransactionStatusMeta,
};
use crate::encoding::BinaryEncoding;
use solana_sdk::{slot_history::Slot, transaction::TransactionError};
use solana_transaction_status::Reward;
#[derive(Debug, Clone)]
pub struct TransactionInfo {
pub signature: String,
pub is_vote: bool,
pub err: Option<TransactionError>,
pub cu_requested: Option<u32>,
pub prioritization_fees: Option<u64>,
@ -39,134 +30,6 @@ pub struct ProducedBlock {
}
impl ProducedBlock {
pub fn from_ui_block(
block: UiConfirmedBlock,
slot: Slot,
commitment_config: CommitmentConfig,
) -> Self {
let block_height = block.block_height.unwrap_or_default();
let txs = block.transactions.unwrap_or_default();
let blockhash = block.blockhash;
let previous_blockhash = block.previous_blockhash;
let parent_slot = block.parent_slot;
let rewards = block.rewards.clone();
let txs = txs
.into_iter()
.filter_map(|tx| {
let Some(UiTransactionStatusMeta {
err,
compute_units_consumed,
..
}) = tx.meta
else {
// ignoring transaction
log::info!("Tx with no meta");
return None;
};
let Some(tx) = tx.transaction.decode() else {
// ignoring transaction
log::info!("Tx could not be decoded");
return None;
};
let signature = tx.signatures[0].to_string();
let cu_consumed = match compute_units_consumed {
OptionSerializer::Some(cu_consumed) => Some(cu_consumed),
_ => None,
};
let legacy_compute_budget = tx.message.instructions().iter().find_map(|i| {
if i.program_id(tx.message.static_account_keys())
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::RequestUnitsDeprecated {
units,
additional_fee,
}) = try_from_slice_unchecked(i.data.as_slice())
{
return Some((units, additional_fee));
}
}
None
});
let mut cu_requested = tx.message.instructions().iter().find_map(|i| {
if i.program_id(tx.message.static_account_keys())
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::SetComputeUnitLimit(limit)) =
try_from_slice_unchecked(i.data.as_slice())
{
return Some(limit);
}
}
None
});
let mut prioritization_fees = tx.message.instructions().iter().find_map(|i| {
if i.program_id(tx.message.static_account_keys())
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::SetComputeUnitPrice(price)) =
try_from_slice_unchecked(i.data.as_slice())
{
return Some(price);
}
}
None
});
if let Some((units, additional_fee)) = legacy_compute_budget {
cu_requested = Some(units);
if additional_fee > 0 {
prioritization_fees = Some(calc_prioritization_fees(units, additional_fee))
}
};
let blockhash = tx.message.recent_blockhash().to_string();
let message = BinaryEncoding::Base64.encode(tx.message.serialize());
Some(TransactionInfo {
signature,
err,
cu_requested,
prioritization_fees,
cu_consumed,
recent_blockhash: blockhash,
message,
})
})
.collect();
let leader_id = if let Some(rewards) = block.rewards {
rewards
.iter()
.find(|reward| Some(RewardType::Fee) == reward.reward_type)
.map(|leader_reward| leader_reward.pubkey.clone())
} else {
None
};
let block_time = block.block_time.unwrap_or(0) as u64;
ProducedBlock {
transactions: txs,
block_height,
leader_id,
blockhash,
previous_blockhash,
parent_slot,
block_time,
slot,
commitment_config,
rewards,
}
}
/// moving commitment level to finalized
pub fn to_finalized_block(&self) -> Self {
ProducedBlock {
@ -183,18 +46,3 @@ impl ProducedBlock {
}
}
}
#[inline]
fn calc_prioritization_fees(units: u32, additional_fee: u32) -> u64 {
(units as u64 * 1000) / additional_fee as u64
}
#[test]
fn overflow_u32() {
// value high enough to overflow u32 if multiplied by 1000
let units: u32 = 4_000_000_000;
let additional_fee: u32 = 100;
let prioritization_fees: u64 = calc_prioritization_fees(units, additional_fee);
assert_eq!(40_000_000_000, prioritization_fees);
}

View File

@ -1,5 +1,6 @@
use anyhow::bail;
use log::warn;
use solana_lite_rpc_cluster_endpoints::rpc_polling;
use solana_lite_rpc_core::structures::produced_block::ProducedBlock;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_rpc_client_api::config::RpcBlockConfig;
@ -41,7 +42,7 @@ impl FaithfulBlockStore {
.get_block_with_config(slot, faithful_config)
.await
{
Ok(block) => Ok(ProducedBlock::from_ui_block(
Ok(block) => Ok(rpc_polling::poll_blocks::from_ui_block(
block,
slot,
CommitmentConfig::finalized(),

View File

@ -26,7 +26,6 @@ pub struct BlockStorageData {
impl Deref for BlockStorageData {
type Target = ProducedBlock;
fn deref(&self) -> &Self::Target {
&self.block
}

View File

@ -551,6 +551,7 @@ mod tests {
fn create_test_tx(signature: Signature) -> TransactionInfo {
TransactionInfo {
signature: signature.to_string(),
is_vote: false,
err: None,
cu_requested: Some(40000),
prioritization_fees: Some(5000),

View File

@ -222,6 +222,7 @@ mod tests {
fn create_tx_info() -> TransactionInfo {
TransactionInfo {
signature: "signature".to_string(),
is_vote: false,
err: None,
cu_requested: None,
prioritization_fees: None,

View File

@ -47,6 +47,7 @@ solana-lite-rpc-core = { workspace = true }
solana-lite-rpc-services = { workspace = true }
solana-lite-rpc-cluster-endpoints = { workspace = true }
solana-lite-rpc-history = { workspace = true }
solana-lite-rpc-block-priofees = { workspace = true }
[dev-dependencies]
bench = { path = "../bench" }

View File

@ -1,24 +1,12 @@
use crate::{
configs::{IsBlockHashValidConfig, SendTransactionConfig},
jsonrpsee_subscrption_handler_sink::JsonRpseeSubscriptionHandlerSink,
rpc::LiteRpcServer,
};
use solana_sdk::epoch_info::EpochInfo;
use std::collections::HashMap;
use solana_lite_rpc_services::{
transaction_service::TransactionService, tx_sender::TXS_IN_CHANNEL,
};
use std::{str::FromStr, sync::Arc};
use anyhow::Context;
use jsonrpsee::{core::SubscriptionResult, server::ServerBuilder, PendingSubscriptionSink};
use prometheus::{opts, register_int_counter, IntCounter};
use solana_lite_rpc_core::{
encoding,
stores::{block_information_store::BlockInformation, data_cache::DataCache, tx_store::TxProps},
AnyhowJoinHandle,
use jsonrpsee::{
core::SubscriptionResult, server::ServerBuilder, DisconnectError, PendingSubscriptionSink,
};
use solana_lite_rpc_history::history::History;
use log::{debug, error, warn};
use prometheus::{opts, register_int_counter, IntCounter};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_rpc_client_api::{
config::{
@ -33,10 +21,29 @@ use solana_rpc_client_api::{
RpcVoteAccountStatus,
},
};
use solana_sdk::epoch_info::EpochInfo;
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, slot_history::Slot};
use solana_transaction_status::{TransactionStatus, UiConfirmedBlock};
use std::{str::FromStr, sync::Arc};
use tokio::net::ToSocketAddrs;
use tokio::sync::broadcast::error::RecvError::{Closed, Lagged};
use solana_lite_rpc_core::{
encoding,
stores::{block_information_store::BlockInformation, data_cache::DataCache, tx_store::TxProps},
AnyhowJoinHandle,
};
use solana_lite_rpc_history::history::History;
use solana_lite_rpc_services::{
transaction_service::TransactionService, tx_sender::TXS_IN_CHANNEL,
};
use crate::{
configs::{IsBlockHashValidConfig, SendTransactionConfig},
jsonrpsee_subscrption_handler_sink::JsonRpseeSubscriptionHandlerSink,
rpc::LiteRpcServer,
};
use solana_lite_rpc_block_priofees::rpc_data::{PrioFeesStats, PrioFeesUpdateMessage};
use solana_lite_rpc_block_priofees::PrioFeesService;
lazy_static::lazy_static! {
static ref RPC_SEND_TX: IntCounter =
@ -53,6 +60,8 @@ lazy_static::lazy_static! {
register_int_counter!(opts!("literpc_rpc_airdrop", "RPC call to request airdrop")).unwrap();
static ref RPC_SIGNATURE_SUBSCRIBE: IntCounter =
register_int_counter!(opts!("literpc_rpc_signature_subscribe", "RPC call to subscribe to signature")).unwrap();
static ref RPC_BLOCK_PRIOFEES_SUBSCRIBE: IntCounter =
register_int_counter!(opts!("literpc_rpc_block_priofees_subscribe", "RPC call to subscribe to block prio fees")).unwrap();
}
/// A bridge between clients and tpu
@ -63,6 +72,7 @@ pub struct LiteBridge {
rpc_client: Arc<RpcClient>,
transaction_service: TransactionService,
history: History,
prio_fees_service: PrioFeesService,
}
impl LiteBridge {
@ -71,12 +81,14 @@ impl LiteBridge {
data_cache: DataCache,
transaction_service: TransactionService,
history: History,
prio_fees_service: PrioFeesService,
) -> Self {
Self {
rpc_client,
data_cache,
transaction_service,
history,
prio_fees_service,
}
}
@ -510,4 +522,79 @@ impl LiteRpcServer for LiteBridge {
) -> crate::rpc::Result<RpcVoteAccountStatus> {
todo!()
}
async fn get_latest_block_priofees(&self) -> crate::rpc::Result<RpcResponse<PrioFeesStats>> {
match self.prio_fees_service.get_latest_priofees().await {
Some((confirmation_slot, priofees)) => {
return Ok(RpcResponse {
context: RpcResponseContext {
slot: confirmation_slot,
api_version: None,
},
value: priofees,
});
}
None => {
return Err(jsonrpsee::core::Error::Custom(
"No latest priofees stats available found".to_string(),
));
}
}
}
// use websocket-tungstenite-retry->examples/consume_literpc_priofees.rs to test
async fn latest_block_priofees_subscribe(
&self,
pending: PendingSubscriptionSink,
) -> SubscriptionResult {
let sink = pending.accept().await?;
let mut block_fees_stream = self.prio_fees_service.block_fees_stream.subscribe();
tokio::spawn(async move {
RPC_BLOCK_PRIOFEES_SUBSCRIBE.inc();
'recv_loop: loop {
match block_fees_stream.recv().await {
Ok(PrioFeesUpdateMessage {
slot: confirmation_slot,
priofees_stats,
}) => {
let result_message =
jsonrpsee::SubscriptionMessage::from_json(&RpcResponse {
context: RpcResponseContext {
slot: confirmation_slot,
api_version: None,
},
value: priofees_stats,
});
match sink.send(result_message.unwrap()).await {
Ok(()) => {
// success
continue 'recv_loop;
}
Err(DisconnectError(_subscription_message)) => {
debug!("Stopping subscription task on disconnect");
return;
}
};
}
Err(Lagged(lagged)) => {
// this usually happens if there is one "slow receiver", see https://docs.rs/tokio/latest/tokio/sync/broadcast/index.html#lagging
warn!(
"subscriber laggs some({}) priofees update messages - continue",
lagged
);
continue 'recv_loop;
}
Err(Closed) => {
error!("failed to receive block, sender closed - aborting");
return;
}
}
}
});
Ok(())
}
}

View File

@ -8,7 +8,7 @@ use lite_rpc::cli::Config;
use lite_rpc::postgres_logger::PostgresLogger;
use lite_rpc::service_spawner::ServiceSpawner;
use lite_rpc::DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE;
use log::info;
use log::{debug, info};
use solana_lite_rpc_cluster_endpoints::endpoint_stremers::EndpointStreaming;
use solana_lite_rpc_cluster_endpoints::grpc_subscription::create_grpc_subscription;
use solana_lite_rpc_cluster_endpoints::grpc_subscription_autoreconnect::{
@ -41,6 +41,8 @@ use solana_lite_rpc_services::tpu_utils::tpu_connection_path::TpuConnectionPath;
use solana_lite_rpc_services::tpu_utils::tpu_service::{TpuService, TpuServiceConfig};
use solana_lite_rpc_services::transaction_replayer::TransactionReplayer;
use solana_lite_rpc_services::tx_sender::TxSender;
use solana_lite_rpc_block_priofees::start_block_priofees_task;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::signature::Keypair;
@ -50,17 +52,32 @@ use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::sync::RwLock;
use tokio::time::{timeout, Instant};
async fn get_latest_block(
mut block_stream: BlockStream,
commitment_config: CommitmentConfig,
) -> ProducedBlock {
while let Ok(block) = block_stream.recv().await {
if block.commitment_config == commitment_config {
return block;
let started = Instant::now();
loop {
match timeout(Duration::from_millis(500), block_stream.recv()).await {
Ok(Ok(block)) => {
if block.commitment_config == commitment_config {
return block;
}
}
Err(_elapsed) => {
debug!(
"waiting for latest block ({}) ... {:.02}ms",
commitment_config.commitment,
started.elapsed().as_secs_f32() * 1000.0
);
}
Ok(Err(_error)) => {
panic!("Did not recv blocks");
}
}
}
panic!("Did not recv blocks");
}
pub async fn start_postgres(
@ -140,12 +157,14 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
create_json_rpc_polling_subscription(rpc_client.clone(), NUM_PARALLEL_TASKS_DEFAULT)?
};
let EndpointStreaming {
// note: blocks_notifier will be dropped at some point
blocks_notifier,
cluster_info_notifier,
slot_notifier,
vote_account_notifier,
} = subscriptions;
info!("Waiting for first finalized block...");
let finalized_block =
get_latest_block(blocks_notifier.resubscribe(), CommitmentConfig::finalized()).await;
info!("Got finalized block: {:?}", finalized_block.slot);
@ -180,6 +199,10 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
cluster_info_notifier,
vote_account_notifier,
);
let (_block_priofees_task, block_priofees_service) =
start_block_priofees_task(blocks_notifier.resubscribe()).await;
drop(blocks_notifier);
let (notification_channel, postgres) = start_postgres(postgres).await?;
@ -237,6 +260,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
data_cache.clone(),
transaction_service,
history,
block_priofees_service,
)
.start(lite_rpc_http_addr, lite_rpc_ws_addr),
);
@ -250,6 +274,10 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
res = bridge_service => {
anyhow::bail!("Server {res:?}")
}
// allow it to fail
// res = block_priofees_task => {
// anyhow::bail!("Prio Fees Service {res:?}")
// }
res = postgres => {
anyhow::bail!("Postgres service {res:?}");
}

View File

@ -1,6 +1,7 @@
use crate::configs::{IsBlockHashValidConfig, SendTransactionConfig};
use jsonrpsee::core::SubscriptionResult;
use jsonrpsee::proc_macros::rpc;
use solana_lite_rpc_block_priofees::rpc_data::PrioFeesStats;
use solana_rpc_client_api::config::{
RpcBlockSubscribeConfig, RpcBlockSubscribeFilter, RpcBlocksConfigWrapper, RpcContextConfig,
RpcGetVoteAccountsConfig, RpcLeaderScheduleConfig, RpcProgramAccountsConfig,
@ -229,4 +230,16 @@ pub trait LiteRpc {
&self,
config: Option<RpcGetVoteAccountsConfig>,
) -> crate::rpc::Result<RpcVoteAccountStatus>;
// ***********************
// expose prio fees distribution per block
// (this is special method not available in solana rpc)
// ***********************
#[method(name = "getLatestBlockPrioFees")]
async fn get_latest_block_priofees(&self) -> crate::rpc::Result<RpcResponse<PrioFeesStats>>;
/// subscribe to prio fees distribution per block; uses confirmation level "confirmed"
#[subscription(name = "blockPrioritizationFeesSubscribe" => "blockPrioritizationFeesNotification", unsubscribe="blockPrioritizationFeesUnsubscribe", item=PrioFeesStats)]
async fn latest_block_priofees_subscribe(&self) -> SubscriptionResult;
}