subscribe to accounts with respective level

This commit is contained in:
GroovieGermanikus 2024-08-13 23:03:18 +02:00
parent 977c1c85c8
commit 89c05f535c
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
1 changed files with 94 additions and 8 deletions

View File

@ -29,13 +29,13 @@ use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterSlo
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task_with_mpsc;
use geyser_grpc_connector::grpcmultiplex_fastestwins::FromYellowstoneExtractor;
use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, Message};
use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, map_commitment_level, Message};
use tokio::time::{sleep, Duration};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::SubscribeUpdate;
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterAccounts;
fn start_slot_multi_consumer(mut slots_channel: Receiver<Message>) {
fn start_all_slots_and_processed_accounts_consumer(mut slots_channel: Receiver<Message>) {
tokio::spawn(async move {
loop {
match slots_channel.recv().await {
@ -50,7 +50,7 @@ fn start_slot_multi_consumer(mut slots_channel: Receiver<Message>) {
_ => { panic!("unexpected commitment level") }
};
// DUMPSLOT 283356662,283356661,F,1723556492340
info!("DUMPSLOT {},{:09},{},{}", update_slot.slot, update_slot.parent.unwrap_or(0), short_status, since_epoch_ms);
info!("MIXSLOT {},{:09},{},{}", update_slot.slot, update_slot.parent.unwrap_or(0), short_status, since_epoch_ms);
}
Some(UpdateOneof::Account(update_account)) => {
let since_epoch_ms = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis();
@ -61,7 +61,46 @@ fn start_slot_multi_consumer(mut slots_channel: Receiver<Message>) {
let write_version = account_info.write_version;
let data_len = account_info.data.len();
// DUMPACCOUNT 283417593,HTQeo4GNbZfGY5G4fAkDr1S5xnz5qWXFgueRwgw53aU1,1332997857270,752,1723582355872
info!("DUMPACCOUNT {},{},{},{},{}", slot, account_pk, write_version, data_len, since_epoch_ms);
info!("MIXACCOUNT {},{},{},{},{}", slot, account_pk, write_version, data_len, since_epoch_ms);
}
None => {}
_ => {}
},
None => {
warn!("multiplexer channel closed - aborting");
return;
}
Some(Message::Connecting(_)) => {}
}
}
});
}
// need to provide the commitment level used to filter the accounts
fn start_account_same_level(mut slots_channel: Receiver<Message>, commitment_level: CommitmentLevel) {
tokio::spawn(async move {
loop {
match slots_channel.recv().await {
Some(Message::GeyserSubscribeUpdate(update)) => match update.update_oneof {
Some(UpdateOneof::Account(update_account)) => {
let since_epoch_ms = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis();
let account_info = update_account.account.unwrap();
let slot = update_account.slot;
let account_pk = Pubkey::new_from_array(account_info.pubkey.try_into().unwrap());
let write_version = account_info.write_version;
let data_len = account_info.data.len();
let short_status = match commitment_level {
CommitmentLevel::Processed => "P",
CommitmentLevel::Confirmed => "C",
CommitmentLevel::Finalized => "F",
_ => { panic!("unexpected commitment level") }
};
// DUMPACCOUNT 283417593,HTQeo4GNbZfGY5G4fAkDr1S5xnz5qWXFgueRwgw53aU1,1332997857270,752,1723582355872
info!("DUMPACCOUNT {},{},{},{},{},{}", slot, short_status, account_pk, write_version, data_len, since_epoch_ms);
}
None => {}
_ => {}
@ -110,14 +149,41 @@ pub async fn main() {
let green_config =
GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone());
let (autoconnect_tx, slots_rx) = tokio::sync::mpsc::channel(10);
// mix of (all) slots and processed accounts
let (autoconnect_tx, slots_accounts_rx) = tokio::sync::mpsc::channel(10);
let _green_stream_ah = create_geyser_autoconnection_task_with_mpsc(
green_config.clone(),
slots_all_confirmation_levels(),
all_slots_and_processed_accounts_together(),
autoconnect_tx.clone(),
);
start_slot_multi_consumer(slots_rx);
let (only_processed_accounts_tx, only_processed_accounts_rx) = tokio::sync::mpsc::channel(10);
let _accounts_processed_stream_ah = create_geyser_autoconnection_task_with_mpsc(
green_config.clone(),
accounts_at_level(CommitmentLevel::Processed),
only_processed_accounts_tx.clone(),
);
let (only_confirmed_accounts_tx, only_confirmed_accounts_rx) = tokio::sync::mpsc::channel(10);
let _accounts_confirmed_stream_ah = create_geyser_autoconnection_task_with_mpsc(
green_config.clone(),
accounts_at_level(CommitmentLevel::Confirmed),
only_confirmed_accounts_tx.clone(),
);
let (only_finalized_accounts_tx, only_finalized_accounts_rx) = tokio::sync::mpsc::channel(10);
let _accounts_finalized_stream_ah = create_geyser_autoconnection_task_with_mpsc(
green_config.clone(),
accounts_at_level(CommitmentLevel::Finalized),
only_finalized_accounts_tx.clone(),
);
start_all_slots_and_processed_accounts_consumer(slots_accounts_rx);
start_account_same_level(only_processed_accounts_rx, CommitmentLevel::Processed);
start_account_same_level(only_confirmed_accounts_rx, CommitmentLevel::Confirmed);
start_account_same_level(only_finalized_accounts_rx, CommitmentLevel::Finalized);
// "infinite" sleep
sleep(Duration::from_secs(3600 * 5)).await;
@ -125,7 +191,7 @@ pub async fn main() {
const RAYDIUM_AMM_PUBKEY: &'static str = "675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8";
fn slots_all_confirmation_levels() -> SubscribeRequest {
fn all_slots_and_processed_accounts_together() -> SubscribeRequest {
let mut slot_subs = HashMap::new();
slot_subs.insert(
"client".to_string(),
@ -154,6 +220,26 @@ fn slots_all_confirmation_levels() -> SubscribeRequest {
}
}
fn accounts_at_level(commitment_level: CommitmentLevel) -> SubscribeRequest {
let mut account_subs = HashMap::new();
account_subs.insert(
"client".to_string(),
SubscribeRequestFilterAccounts {
account: vec![],
owner: vec![RAYDIUM_AMM_PUBKEY.to_string()],
filters: vec![],
},
);
SubscribeRequest {
accounts: account_subs,
ping: None,
commitment: Some(map_commitment_level(CommitmentConfig { commitment: commitment_level }) as i32),
..Default::default()
}
}
#[test]
fn parse_output() {
let data = "283360248,000000000,C,1723558000558";