Adding support to get transaction data

This commit is contained in:
Godmode Galactus 2024-05-16 11:33:34 +02:00
parent 8df262c459
commit adab055ae3
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
9 changed files with 163 additions and 19 deletions

View File

@ -1,7 +1,7 @@
use std::collections::HashSet;
use serde::{Deserialize, Serialize};
use solana_sdk::pubkey::Pubkey;
use solana_sdk::{pubkey::Pubkey, signature::Signature};
use crate::message::Message;
@ -10,6 +10,7 @@ pub enum Filter {
Account(AccountFilter),
Slot,
BlockMeta,
Transaction(Signature),
}
impl Filter {
@ -18,6 +19,20 @@ impl Filter {
Filter::Account(account) => account.allows(message),
Filter::Slot => matches!(message, Message::SlotMsg(_)),
Filter::BlockMeta => matches!(message, Message::BlockMetaMsg(_)),
Filter::Transaction(signature) => {
match message {
Message::TransactionMsg(transaction) => {
if signature == &Signature::default() {
// subscibe to all the signatures
true
} else {
// just check the first signature
transaction.signatures.iter().any(|x| x == signature)
}
}
_ => false,
}
}
}
}
}

View File

@ -5,6 +5,7 @@ use crate::{
types::{
account::Account,
block_meta::{BlockMeta, SlotMeta},
transaction::Transaction,
},
};
@ -13,5 +14,6 @@ pub enum Message {
AccountMsg(Account),
SlotMsg(SlotMeta),
BlockMetaMsg(BlockMeta),
TransactionMsg(Transaction),
Filters(Vec<Filter>), // sent from client to server
}

View File

@ -14,7 +14,11 @@ use crate::quic::{
configure_server::ALPN_GEYSER_PROTOCOL_ID, skip_verification::ClientSkipServerVerification,
};
pub const DEFAULT_MAX_STREAMS: u32 = 1024;
pub const DEFAULT_MAX_STREAMS: u32 = 16384;
pub const DEFAULT_MAX_SLOT_BLOCKMETA_STREAMS: u32 = 24;
pub const DEFAULT_MAX_TRANSACTION_STREAMS: u32 = 1000;
pub const DEFAULT_MAX_ACCOUNT_STREAMS: u32 =
DEFAULT_MAX_STREAMS - DEFAULT_MAX_SLOT_BLOCKMETA_STREAMS - DEFAULT_MAX_TRANSACTION_STREAMS;
pub fn create_client_endpoint(
certificate: rustls::Certificate,

View File

@ -144,20 +144,26 @@ impl ConnectionManager {
for connection_data in lk.iter() {
if connection_data.filters.iter().any(|x| x.allows(&message)) {
let connection = connection_data.connection.clone();
let permit_result = connection_data
.streams_under_use
.clone()
.try_acquire_owned();
let Ok(permit) = permit_result else {
log::error!("Stream {} seems to be lagging", connection_data.id);
continue;
};
let message = message.clone();
let stream_under_use = connection_data.streams_under_use.clone();
let id = connection_data.id;
tokio::spawn(async move {
let _permit = permit;
let permit_result = stream_under_use.clone().try_acquire_owned();
let _permit = match permit_result {
Ok(permit) => permit,
Err(_) => {
// all permits are taken wait log warning and wait for permit
log::warn!("Stream {} seems to be lagging", id);
stream_under_use
.acquire_owned()
.await
.expect("Should aquire the permit")
}
};
for _ in 0..retry_count {
let send_stream = connection.open_uni().await;
match send_stream {

View File

@ -1,3 +1,4 @@
pub mod account;
pub mod block_meta;
pub mod slot_identifier;
pub mod transaction;

View File

@ -0,0 +1,34 @@
use serde::{Deserialize, Serialize};
use solana_sdk::{
message::v0::{LoadedAddresses, Message},
signature::Signature,
transaction::TransactionError,
transaction_context::TransactionReturnData,
};
use solana_transaction_status::{InnerInstructions, Rewards};
use super::slot_identifier::SlotIdentifier;
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct TransactionMeta {
pub error: Option<TransactionError>,
pub fee: u64,
pub pre_balances: Vec<u64>,
pub post_balances: Vec<u64>,
pub inner_instructions: Option<Vec<InnerInstructions>>,
pub log_messages: Option<Vec<String>>,
pub rewards: Option<Rewards>,
pub loaded_addresses: LoadedAddresses,
pub return_data: Option<TransactionReturnData>,
pub compute_units_consumed: Option<u64>,
}
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct Transaction {
pub slot_identifier: SlotIdentifier,
pub signatures: Vec<Signature>,
pub message: Message,
pub is_vote: bool,
pub transasction_meta: TransactionMeta,
pub index: u64,
}

View File

@ -15,6 +15,7 @@ use quic_geyser_common::{
account::Account as GeyserAccount,
block_meta::{BlockMeta, SlotMeta},
slot_identifier::SlotIdentifier,
transaction::Transaction,
},
};
use quinn::{Endpoint, EndpointConfig, TokioRuntime};
@ -39,6 +40,7 @@ pub enum ChannelMessage {
Account(AccountData, Slot, bool),
Slot(u64, u64, CommitmentLevel),
BlockMeta(BlockMeta),
Transaction(Transaction),
}
#[derive(Debug)]
@ -123,6 +125,10 @@ impl QuicServer {
let message = Message::BlockMetaMsg(block_meta);
quic_connection_manager.dispach(message, retry_count).await;
}
ChannelMessage::Transaction(transaction) => {
let message = Message::TransactionMsg(transaction);
quic_connection_manager.dispach(message, retry_count).await;
}
}
}
log::error!("quic server dispatch task stopped");

View File

@ -2,10 +2,14 @@ use agave_geyser_plugin_interface::geyser_plugin_interface::{
GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, ReplicaBlockInfoVersions,
ReplicaEntryInfoVersions, ReplicaTransactionInfoVersions, Result as PluginResult, SlotStatus,
};
use quic_geyser_common::types::block_meta::BlockMeta;
use quic_geyser_common::types::{
block_meta::BlockMeta,
slot_identifier::SlotIdentifier,
transaction::{Transaction, TransactionMeta},
};
use solana_sdk::{
account::Account, clock::Slot, commitment_config::CommitmentLevel, pubkey::Pubkey,
signature::Keypair,
account::Account, clock::Slot, commitment_config::CommitmentLevel, message::v0::Message,
pubkey::Pubkey, signature::Keypair,
};
use crate::{
@ -102,10 +106,64 @@ impl GeyserPlugin for QuicGeyserPlugin {
fn notify_transaction(
&self,
_transaction: ReplicaTransactionInfoVersions,
_slot: Slot,
transaction: ReplicaTransactionInfoVersions,
slot: Slot,
) -> PluginResult<()> {
// Todo
let Some(quic_server) = &self.quic_server else {
return Ok(());
};
let ReplicaTransactionInfoVersions::V0_0_2(solana_transaction) = transaction else {
return Err(GeyserPluginError::TransactionUpdateError {
msg: "Unsupported transaction version".to_string(),
});
};
let message = solana_transaction.transaction.message();
let mut account_keys = vec![];
for index in 0.. {
let account = message.account_keys().get(index);
match account {
Some(account) => account_keys.push(*account),
None => break,
}
}
let v0_message = Message {
header: *message.header(),
account_keys,
recent_blockhash: *message.recent_blockhash(),
instructions: message.instructions().to_vec(),
address_table_lookups: message.message_address_table_lookups().to_vec(),
};
let status_meta = solana_transaction.transaction_status_meta;
let transaction = Transaction {
slot_identifier: SlotIdentifier { slot },
signatures: solana_transaction.transaction.signatures().to_vec(),
message: v0_message,
is_vote: solana_transaction.is_vote,
transasction_meta: TransactionMeta {
error: match &status_meta.status {
Ok(_) => None,
Err(e) => Some(e.clone()),
},
fee: status_meta.fee,
pre_balances: status_meta.pre_balances.clone(),
post_balances: status_meta.post_balances.clone(),
inner_instructions: status_meta.inner_instructions.clone(),
log_messages: status_meta.log_messages.clone(),
rewards: status_meta.rewards.clone(),
loaded_addresses: status_meta.loaded_addresses.clone(),
return_data: status_meta.return_data.clone(),
compute_units_consumed: status_meta.compute_units_consumed,
},
index: solana_transaction.index as u64,
};
let transaction_message = ChannelMessage::Transaction(transaction);
quic_server.send_message(transaction_message)?;
Ok(())
}

View File

@ -9,7 +9,11 @@ use futures::StreamExt;
use quic_geyser_client::{client::Client, DEFAULT_MAX_STREAM};
use quic_geyser_common::filters::{AccountFilter, Filter};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Keypair};
use solana_sdk::{
commitment_config::CommitmentConfig,
pubkey::Pubkey,
signature::{Keypair, Signature},
};
use tokio::pin;
pub mod cli;
@ -49,6 +53,7 @@ async fn main() {
let slot_notifications = Arc::new(AtomicU64::new(0));
let account_notification = Arc::new(AtomicU64::new(0));
let blockmeta_notifications = Arc::new(AtomicU64::new(0));
let transaction_notifications = Arc::new(AtomicU64::new(0));
let cluster_slot = Arc::new(AtomicU64::new(0));
let account_slot = Arc::new(AtomicU64::new(0));
@ -75,6 +80,7 @@ async fn main() {
let slot_notifications = slot_notifications.clone();
let account_notification = account_notification.clone();
let blockmeta_notifications = blockmeta_notifications.clone();
let transaction_notifications = transaction_notifications.clone();
let cluster_slot = cluster_slot.clone();
let account_slot = account_slot.clone();
@ -99,6 +105,10 @@ async fn main() {
" Blockmeta notified : {}",
blockmeta_notifications.swap(0, std::sync::atomic::Ordering::Relaxed)
);
println!(
" Transactions notified : {}",
transaction_notifications.swap(0, std::sync::atomic::Ordering::Relaxed)
);
println!(" Cluster Slots: {}, Account Slot: {}, Slot Notification slot: {}, BlockMeta slot: {} ", cluster_slot.load(std::sync::atomic::Ordering::Relaxed), account_slot.load(std::sync::atomic::Ordering::Relaxed), slot_slot.load(std::sync::atomic::Ordering::Relaxed), blockmeta_slot.load(std::sync::atomic::Ordering::Relaxed));
}
@ -114,6 +124,7 @@ async fn main() {
}),
Filter::Slot,
Filter::BlockMeta,
Filter::Transaction(Signature::default()),
])
.await
.unwrap();
@ -144,6 +155,13 @@ async fn main() {
blockmeta_notifications.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
blockmeta_slot.store(block_meta.slot, std::sync::atomic::Ordering::Relaxed);
}
quic_geyser_common::message::Message::TransactionMsg(tx) => {
log::debug!(
"got transaction notification: {}",
tx.signatures[0].to_string()
);
transaction_notifications.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
quic_geyser_common::message::Message::Filters(_) => todo!(),
}
}