Compare commits

...

14 Commits

Author SHA1 Message Date
GroovieGermanikus f18f5a67bc
move release info to wiki 2024-08-16 14:27:54 +02:00
GroovieGermanikus d46aba1946
fmt+clippy 2024-08-13 23:08:17 +02:00
GroovieGermanikus e1307e2c5e
comment 2024-08-13 23:06:21 +02:00
GroovieGermanikus cc481539e3
compile 2024-08-13 23:04:10 +02:00
GroovieGermanikus 89c05f535c
subscribe to accounts with respective level 2024-08-13 23:03:18 +02:00
GroovieGermanikus 977c1c85c8
dump processed accounts 2024-08-13 22:54:10 +02:00
GroovieGermanikus e23c1aeadc
run dump for 5 hours 2024-08-13 16:49:36 +02:00
GroovieGermanikus 2fb68e6716
parse example 2024-08-13 16:21:21 +02:00
GroovieGermanikus 29b68f42e9
add tool to dump slot stream 2024-08-13 15:42:21 +02:00
GroovieGermanikus 0ef4317a1d
wire up slot stream 2024-08-13 15:21:55 +02:00
GroovieGermanikus 288111e674
make SubscribeRequest forward-compatible 2024-05-08 11:21:32 +02:00
GroovieGermanikus 0a5aaf4846
log size 2024-04-18 17:39:59 +02:00
GroovieGermanikus 458f82725a
log account key + block size 2024-04-18 17:30:23 +02:00
GroovieGermanikus 793de099d3
add version info 2024-04-16 15:59:52 +02:00
4 changed files with 334 additions and 43 deletions

View File

@ -15,6 +15,9 @@ The implementation is based on _Rust Futures_.
Please open an issue if you have any questions or suggestions -> [New Issue](https://github.com/blockworks-foundation/geyser-grpc-connector/issues/new).
## Versions
These are the currently maintained versions of the library: [see Wiki](https://github.com/blockworks-foundation/geyser-grpc-connector/wiki)
## Installation and Usage
```cargo add geyser-grpc-connector ```

View File

@ -0,0 +1,309 @@
///
/// subsribe to grpc in multiple ways:
/// - all slots and processed accounts in one subscription
/// - only processed accounts
/// - only confirmed accounts
/// - only finalized accounts
///
/// we want to see if there is a difference in timing of "processed accounts" in the mix with slot vs "only processed accounts"
use log::{info, warn};
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
use std::collections::HashMap;
use std::env;
use std::str::FromStr;
use std::time::SystemTime;
use base64::Engine;
use csv::ReaderBuilder;
use itertools::Itertools;
use solana_sdk::borsh0_10::try_from_slice_unchecked;
/// This file mocks the core model of the RPC server.
use solana_sdk::compute_budget;
use solana_sdk::compute_budget::ComputeBudgetInstruction;
use solana_sdk::hash::Hash;
use solana_sdk::instruction::CompiledInstruction;
use solana_sdk::message::v0::MessageAddressTableLookup;
use solana_sdk::message::{v0, MessageHeader, VersionedMessage};
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Signature;
use solana_sdk::transaction::TransactionError;
use tokio::sync::mpsc::Receiver;
use yellowstone_grpc_proto::geyser::{
SubscribeRequest, SubscribeRequestFilterSlots, SubscribeUpdateSlot,
};
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task_with_mpsc;
use geyser_grpc_connector::grpcmultiplex_fastestwins::FromYellowstoneExtractor;
use geyser_grpc_connector::{
map_commitment_level, GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, Message,
};
use tokio::time::{sleep, Duration};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::SubscribeUpdate;
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterAccounts;
fn start_all_slots_and_processed_accounts_consumer(mut slots_channel: Receiver<Message>) {
tokio::spawn(async move {
loop {
match slots_channel.recv().await {
Some(Message::GeyserSubscribeUpdate(update)) => match update.update_oneof {
Some(UpdateOneof::Slot(update_slot)) => {
let since_epoch_ms = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis();
let short_status = match map_slot_status(&update_slot) {
CommitmentLevel::Processed => "P",
CommitmentLevel::Confirmed => "C",
CommitmentLevel::Finalized => "F",
_ => {
panic!("unexpected commitment level")
}
};
// DUMPSLOT 283356662,283356661,F,1723556492340
info!(
"MIXSLOT {},{:09},{},{}",
update_slot.slot,
update_slot.parent.unwrap_or(0),
short_status,
since_epoch_ms
);
}
Some(UpdateOneof::Account(update_account)) => {
let since_epoch_ms = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis();
let account_info = update_account.account.unwrap();
let slot = update_account.slot;
let account_pk =
Pubkey::new_from_array(account_info.pubkey.try_into().unwrap());
let write_version = account_info.write_version;
let data_len = account_info.data.len();
// DUMPACCOUNT 283417593,HTQeo4GNbZfGY5G4fAkDr1S5xnz5qWXFgueRwgw53aU1,1332997857270,752,1723582355872
info!(
"MIXACCOUNT {},{},{},{},{}",
slot, account_pk, write_version, data_len, since_epoch_ms
);
}
None => {}
_ => {}
},
None => {
warn!("multiplexer channel closed - aborting");
return;
}
Some(Message::Connecting(_)) => {}
}
}
});
}
// need to provide the commitment level used to filter the accounts
fn start_account_same_level(
mut slots_channel: Receiver<Message>,
commitment_level: CommitmentLevel,
) {
tokio::spawn(async move {
loop {
match slots_channel.recv().await {
Some(Message::GeyserSubscribeUpdate(update)) => match update.update_oneof {
Some(UpdateOneof::Account(update_account)) => {
let since_epoch_ms = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis();
let account_info = update_account.account.unwrap();
let slot = update_account.slot;
let account_pk =
Pubkey::new_from_array(account_info.pubkey.try_into().unwrap());
let write_version = account_info.write_version;
let data_len = account_info.data.len();
let short_status = match commitment_level {
CommitmentLevel::Processed => "P",
CommitmentLevel::Confirmed => "C",
CommitmentLevel::Finalized => "F",
_ => {
panic!("unexpected commitment level")
}
};
// DUMPACCOUNT 283417593,HTQeo4GNbZfGY5G4fAkDr1S5xnz5qWXFgueRwgw53aU1,1332997857270,752,1723582355872
info!(
"DUMPACCOUNT {},{},{},{},{},{}",
slot, short_status, account_pk, write_version, data_len, since_epoch_ms
);
}
None => {}
_ => {}
},
None => {
warn!("multiplexer channel closed - aborting");
return;
}
Some(Message::Connecting(_)) => {}
}
}
});
}
fn map_slot_status(
slot_update: &SubscribeUpdateSlot,
) -> solana_sdk::commitment_config::CommitmentLevel {
use solana_sdk::commitment_config::CommitmentLevel as solanaCL;
use yellowstone_grpc_proto::geyser::CommitmentLevel as yCL;
yellowstone_grpc_proto::geyser::CommitmentLevel::try_from(slot_update.status)
.map(|v| match v {
yCL::Processed => solanaCL::Processed,
yCL::Confirmed => solanaCL::Confirmed,
yCL::Finalized => solanaCL::Finalized,
})
.expect("valid commitment level")
}
#[tokio::main]
pub async fn main() {
tracing_subscriber::fmt::init();
let grpc_addr_green = env::var("GRPC_ADDR").expect("need grpc url for green");
let grpc_x_token_green = env::var("GRPC_X_TOKEN").ok();
info!(
"Using gRPC source {} ({})",
grpc_addr_green,
grpc_x_token_green.is_some()
);
let timeouts = GrpcConnectionTimeouts {
connect_timeout: Duration::from_secs(5),
request_timeout: Duration::from_secs(5),
subscribe_timeout: Duration::from_secs(5),
receive_timeout: Duration::from_secs(5),
};
let green_config =
GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone());
// mix of (all) slots and processed accounts
let (autoconnect_tx, slots_accounts_rx) = tokio::sync::mpsc::channel(10);
let _green_stream_ah = create_geyser_autoconnection_task_with_mpsc(
green_config.clone(),
all_slots_and_processed_accounts_together(),
autoconnect_tx.clone(),
);
let (only_processed_accounts_tx, only_processed_accounts_rx) = tokio::sync::mpsc::channel(10);
let _accounts_processed_stream_ah = create_geyser_autoconnection_task_with_mpsc(
green_config.clone(),
accounts_at_level(CommitmentLevel::Processed),
only_processed_accounts_tx.clone(),
);
let (only_confirmed_accounts_tx, only_confirmed_accounts_rx) = tokio::sync::mpsc::channel(10);
let _accounts_confirmed_stream_ah = create_geyser_autoconnection_task_with_mpsc(
green_config.clone(),
accounts_at_level(CommitmentLevel::Confirmed),
only_confirmed_accounts_tx.clone(),
);
let (only_finalized_accounts_tx, only_finalized_accounts_rx) = tokio::sync::mpsc::channel(10);
let _accounts_finalized_stream_ah = create_geyser_autoconnection_task_with_mpsc(
green_config.clone(),
accounts_at_level(CommitmentLevel::Finalized),
only_finalized_accounts_tx.clone(),
);
start_all_slots_and_processed_accounts_consumer(slots_accounts_rx);
start_account_same_level(only_processed_accounts_rx, CommitmentLevel::Processed);
start_account_same_level(only_confirmed_accounts_rx, CommitmentLevel::Confirmed);
start_account_same_level(only_finalized_accounts_rx, CommitmentLevel::Finalized);
// "infinite" sleep
sleep(Duration::from_secs(3600 * 5)).await;
}
const RAYDIUM_AMM_PUBKEY: &'static str = "675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8";
fn all_slots_and_processed_accounts_together() -> SubscribeRequest {
let mut slot_subs = HashMap::new();
slot_subs.insert(
"client".to_string(),
SubscribeRequestFilterSlots {
// implies all slots
filter_by_commitment: None,
},
);
let mut account_subs = HashMap::new();
account_subs.insert(
"client".to_string(),
SubscribeRequestFilterAccounts {
account: vec![],
owner: vec![RAYDIUM_AMM_PUBKEY.to_string()],
filters: vec![],
},
);
SubscribeRequest {
slots: slot_subs,
accounts: account_subs,
ping: None,
// implies "processed"
commitment: None,
..Default::default()
}
}
fn accounts_at_level(commitment_level: CommitmentLevel) -> SubscribeRequest {
let mut account_subs = HashMap::new();
account_subs.insert(
"client".to_string(),
SubscribeRequestFilterAccounts {
account: vec![],
owner: vec![RAYDIUM_AMM_PUBKEY.to_string()],
filters: vec![],
},
);
SubscribeRequest {
accounts: account_subs,
ping: None,
commitment: Some(map_commitment_level(CommitmentConfig {
commitment: commitment_level,
}) as i32),
..Default::default()
}
}
#[test]
fn parse_output() {
let data = "283360248,000000000,C,1723558000558";
let mut rdr = ReaderBuilder::new()
.has_headers(false)
.from_reader(data.as_bytes());
let all_records = rdr.records().collect_vec();
assert_eq!(1, all_records.len());
let record = all_records[0].as_ref().unwrap();
let slot: u64 = record[0].parse().unwrap();
let parent: Option<u64> = record[1]
.parse()
.ok()
.and_then(|v| if v == 0 { None } else { Some(v) });
let status = match record[2].to_string().as_str() {
"P" => CommitmentLevel::Processed,
"C" => CommitmentLevel::Confirmed,
"F" => CommitmentLevel::Finalized,
_ => panic!("invalid commitment level"),
};
assert_eq!(283360248, slot);
assert_eq!(None, parent);
assert_eq!(CommitmentLevel::Confirmed, status);
}

View File

@ -74,6 +74,7 @@ pub async fn main() {
tracing_subscriber::fmt::init();
// console_subscriber::init();
let COMMITMENT_LEVEL = CommitmentConfig::processed();
let grpc_addr_green = env::var("GRPC_ADDR").expect("need grpc url for green");
let grpc_x_token_green = env::var("GRPC_X_TOKEN").ok();
@ -96,30 +97,27 @@ pub async fn main() {
let green_stream = create_geyser_reconnecting_stream(
config.clone(),
GeyserFilter(CommitmentConfig::processed()).accounts(),
GeyserFilter(COMMITMENT_LEVEL).accounts(),
);
let blue_stream = create_geyser_reconnecting_stream(
config.clone(),
GeyserFilter(CommitmentConfig::processed()).blocks_and_txs(),
GeyserFilter(COMMITMENT_LEVEL).blocks_and_txs(),
);
tokio::spawn(async move {
let mut wtr = csv::Writer::from_path("accounts-mainnet.csv").unwrap();
let mut green_stream = pin!(green_stream);
while let Some(message) = green_stream.next().await {
match message {
Message::GeyserSubscribeUpdate(subscriber_update) => {
match subscriber_update.update_oneof {
Some(UpdateOneof::Account(update)) => {
info!("got update (green)!!! slot: {}", update.slot);
let key = update.account.unwrap().pubkey;
let account_info = update.account.unwrap();
let account_pk = Pubkey::try_from(account_info.pubkey).unwrap();
info!("got account update (green)!!! {} - {:?} - {} bytes",
update.slot, account_pk, account_info.data.len());
let bytes: [u8; 32] =
key.try_into().unwrap_or(Pubkey::default().to_bytes());
let pubkey = Pubkey::new_from_array(bytes);
wtr.write_record(&[pubkey.to_string()]).unwrap();
wtr.flush().unwrap();
account_pk.to_bytes();
}
_ => {}
}
@ -134,12 +132,13 @@ pub async fn main() {
tokio::spawn(async move {
let mut blue_stream = pin!(blue_stream);
let extractor = BlockMiniExtractor(COMMITMENT_LEVEL);
while let Some(message) = blue_stream.next().await {
match message {
Message::GeyserSubscribeUpdate(subscriber_update) => {
let mapped = map_block_update(*subscriber_update);
if let Some(slot) = mapped {
info!("got update (blue)!!! slot: {}", slot);
let mapped = extractor.map_yellowstone_update(*subscriber_update);
if let Some((slot, block_mini)) = mapped {
info!("got update (blue)!!! block: {} - {} bytes", slot, block_mini.blocksize);
}
}
Message::Connecting(attempt) => {

View File

@ -2,7 +2,11 @@ use solana_sdk::commitment_config::CommitmentConfig;
use std::collections::HashMap;
use std::fmt::{Debug, Display};
use std::time::Duration;
use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots, SubscribeUpdate};
use yellowstone_grpc_proto::geyser::{
CommitmentLevel, SubscribeRequest, SubscribeRequestFilterAccounts,
SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots,
SubscribeUpdate,
};
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;
pub mod channel_plugger;
@ -97,15 +101,9 @@ impl GeyserFilter {
);
SubscribeRequest {
slots: HashMap::new(),
accounts: Default::default(),
transactions: HashMap::new(),
entry: Default::default(),
blocks: blocks_subs,
blocks_meta: HashMap::new(),
commitment: Some(map_commitment_level(self.0) as i32),
accounts_data_slice: Default::default(),
ping: None,
..Default::default()
}
}
@ -114,15 +112,9 @@ impl GeyserFilter {
blocksmeta_subs.insert("client".to_string(), SubscribeRequestFilterBlocksMeta {});
SubscribeRequest {
slots: HashMap::new(),
accounts: Default::default(),
transactions: HashMap::new(),
entry: Default::default(),
blocks: HashMap::new(),
blocks_meta: blocksmeta_subs,
commitment: Some(map_commitment_level(self.0) as i32),
accounts_data_slice: Default::default(),
ping: None,
..Default::default()
}
}
@ -137,14 +129,8 @@ impl GeyserFilter {
SubscribeRequest {
slots: slots_subs,
accounts: Default::default(),
transactions: HashMap::new(),
entry: Default::default(),
blocks: HashMap::new(),
blocks_meta: HashMap::new(),
commitment: Some(map_commitment_level(self.0) as i32),
accounts_data_slice: Default::default(),
ping: None,
..Default::default()
}
}
@ -160,20 +146,14 @@ impl GeyserFilter {
);
SubscribeRequest {
slots: HashMap::new(),
accounts: accounts_subs,
transactions: HashMap::new(),
entry: Default::default(),
blocks: Default::default(),
blocks_meta: HashMap::new(),
commitment: Some(map_commitment_level(self.0) as i32),
accounts_data_slice: Default::default(),
ping: None,
..Default::default()
}
}
}
fn map_commitment_level(commitment_config: CommitmentConfig) -> CommitmentLevel {
pub fn map_commitment_level(commitment_config: CommitmentConfig) -> CommitmentLevel {
// solana_sdk -> yellowstone
match commitment_config.commitment {
solana_sdk::commitment_config::CommitmentLevel::Processed => {