This commit is contained in:
GroovieGermanikus 2024-11-05 17:01:47 +01:00
parent 18729e8f2a
commit 8574783f51
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
4 changed files with 133 additions and 8 deletions

View File

@ -0,0 +1,116 @@
use log::{info, warn};
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
use std::collections::HashMap;
use std::env;
use std::str::FromStr;
use std::time::SystemTime;
use base64::Engine;
use csv::ReaderBuilder;
use itertools::Itertools;
use solana_sdk::borsh0_10::try_from_slice_unchecked;
/// This file mocks the core model of the RPC server.
use solana_sdk::compute_budget;
use solana_sdk::compute_budget::ComputeBudgetInstruction;
use solana_sdk::hash::Hash;
use solana_sdk::instruction::CompiledInstruction;
use solana_sdk::message::v0::MessageAddressTableLookup;
use solana_sdk::message::{v0, MessageHeader, VersionedMessage};
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Signature;
use solana_sdk::transaction::TransactionError;
use tokio::sync::broadcast;
use tokio::sync::mpsc::Receiver;
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterSlots, SubscribeRequestFilterTransactions, SubscribeUpdateSlot};
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task_with_mpsc;
use geyser_grpc_connector::grpcmultiplex_fastestwins::FromYellowstoneExtractor;
use geyser_grpc_connector::{
map_commitment_level, GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, Message,
};
use tokio::time::{sleep, Duration};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::SubscribeUpdate;
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterAccounts;
#[tokio::main]
pub async fn main() {
tracing_subscriber::fmt::init();
let grpc_addr_green = env::var("GRPC_ADDR").expect("need grpc url for green");
let grpc_x_token_green = env::var("GRPC_X_TOKEN").ok();
let (_foo, exit_notify) = broadcast::channel(1);
info!(
"Using gRPC source {} ({})",
grpc_addr_green,
grpc_x_token_green.is_some()
);
let timeouts = GrpcConnectionTimeouts {
connect_timeout: Duration::from_secs(5),
request_timeout: Duration::from_secs(5),
subscribe_timeout: Duration::from_secs(5),
receive_timeout: Duration::from_secs(5),
};
let green_config =
GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone());
let (autoconnect_tx, mut transactions_rx) = tokio::sync::mpsc::channel(10);
let _tx_source_ah = create_geyser_autoconnection_task_with_mpsc(
green_config.clone(),
jupyter_trades(),
autoconnect_tx.clone(),
exit_notify,
);
loop {
let message = transactions_rx.recv().await;
match message {
Some(Message::GeyserSubscribeUpdate(update)) =>
match update.update_oneof {
Some(UpdateOneof::Transaction(update)) => {
let tx = update.transaction.unwrap();
let sig = Signature::try_from(tx.signature.as_slice()).unwrap();
info!("tx {}", sig);
}
_ => {} // FIXME
},
_ => {} // FIXME
}
}
}
fn jupyter_trades() -> SubscribeRequest {
let mut transaction_subs = HashMap::new();
transaction_subs.insert("client".to_string(),
SubscribeRequestFilterTransactions {
vote: Some(false),
failed: Some(false),
signature: None,
account_include: vec![
"JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4".to_string(),
],
account_exclude: vec![],
account_required: vec![],
});
SubscribeRequest {
transactions: transaction_subs,
ping: None,
commitment: Some(map_commitment_level(CommitmentConfig {
commitment: CommitmentLevel::Confirmed,
}) as i32),
..Default::default()
}
}

View File

@ -2,7 +2,7 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap;
use futures::{Stream, StreamExt};
use log::{debug, info, trace};
use solana_sdk::clock::Slot;
use solana_sdk::clock::{Clock, Slot};
use solana_sdk::commitment_config::CommitmentConfig;
use std::env;
use std::pin::pin;
@ -13,6 +13,7 @@ use dashmap::DashMap;
use solana_account_decoder::parse_token::{parse_token, spl_token_ids, TokenAccountType, UiTokenAccount};
use solana_account_decoder::parse_token::UiAccountState::Initialized;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::sysvar::clock;
use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream;
use geyser_grpc_connector::grpcmultiplex_fastestwins::FromYellowstoneExtractor;
@ -100,6 +101,14 @@ pub async fn main() {
let account_pk = Pubkey::try_from(account.pubkey).unwrap();
let size = account.data.len() as u64;
info!("got account update: {} - {:?} - {} bytes",
update.slot, account_pk, account.data.len());
if clock::id() == account_pk {
let clock: Clock = bincode::deserialize(&account.data).unwrap();
info!("clock: {:#?}", clock);
}
info!("got account write: {}", account.write_version);
match account_write_first_timestamp.entry(account.write_version) {
Entry::Occupied(o) => {
@ -232,10 +241,10 @@ pub fn token_accounts() -> SubscribeRequest {
accounts_subs.insert(
"client".to_string(),
SubscribeRequestFilterAccounts {
account: vec![],
// vec!["4DoNfFBfF7UokCC2FQzriy7yHK6DY6NVdYpuekQ5pRgg".to_string()],
owner:
spl_token_ids().iter().map(|pubkey| pubkey.to_string()).collect(),
// account: vec!["4DoNfFBfF7UokCC2FQzriy7yHK6DY6NVdYpuekQ5pRgg".to_string()],
account: vec![clock::id().to_string()],
owner: vec![],
// spl_token_ids().iter().map(|pubkey| pubkey.to_string()).collect(),
filters: vec![],
},
);
@ -266,8 +275,8 @@ pub fn token_accounts_finalized() -> SubscribeRequest {
accounts_subs.insert(
"client".to_string(),
SubscribeRequestFilterAccounts {
account: vec![],
// vec!["4DoNfFBfF7UokCC2FQzriy7yHK6DY6NVdYpuekQ5pRgg".to_string()],
account:
vec!["4DoNfFBfF7UokCC2FQzriy7yHK6DY6NVdYpuekQ5pRgg".to_string()],
owner:
spl_token_ids().iter().map(|pubkey| pubkey.to_string()).collect(),
filters: vec![],

View File

@ -144,4 +144,3 @@ pub fn transaction_filter() -> SubscribeRequest {
ping: None,
}
}

View File

@ -231,6 +231,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
ConnectionState::FatalError(_attempt, reason) => match reason {
FatalErrorReason::DownstreamChannelClosed => {
warn!("downstream closed - aborting");
// TODO break 'main_loop instead of returning
return;
}
FatalErrorReason::ConfigurationError => {