all_account+orca

This commit is contained in:
GroovieGermanikus 2024-06-03 18:45:29 +02:00
parent fac8d2550b
commit c28797da9e
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
1 changed files with 78 additions and 18 deletions

View File

@ -14,6 +14,7 @@ use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use tokio::select; use tokio::select;
use tokio::sync::mpsc::error::SendError; use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::Sender;
use tokio::time::Instant; use tokio::time::Instant;
use tokio_stream::{Stream, StreamExt}; use tokio_stream::{Stream, StreamExt};
use tokio_stream::wrappers::{BroadcastStream, ReceiverStream}; use tokio_stream::wrappers::{BroadcastStream, ReceiverStream};
@ -49,6 +50,8 @@ async fn main() {
websocket_account_subscribe(Url::parse(ws_url2.as_str()).unwrap()).await; websocket_account_subscribe(Url::parse(ws_url2.as_str()).unwrap()).await;
let (slots_tx, mut slots_rx) = tokio::sync::mpsc::channel(100);
let timeouts = GrpcConnectionTimeouts { let timeouts = GrpcConnectionTimeouts {
connect_timeout: Duration::from_secs(10), connect_timeout: Duration::from_secs(10),
request_timeout: Duration::from_secs(10), request_timeout: Duration::from_secs(10),
@ -59,13 +62,36 @@ async fn main() {
let config = GrpcSourceConfig::new(grpc_addr.to_string(), None, None, timeouts.clone()); let config = GrpcSourceConfig::new(grpc_addr.to_string(), None, None, timeouts.clone());
start_geyser_all_accounts_task(config.clone());
start_geyser_orca_token_account_task(config.clone());
tokio::spawn(websocket_source(Url::parse(ws_url1.as_str()).unwrap(), slots_tx.clone()));
tokio::spawn(websocket_source(Url::parse(ws_url2.as_str()).unwrap(), slots_tx.clone()));
tokio::spawn(rpc_getslot_source(rpc_url, slots_tx.clone()));
let started_at = Instant::now();
while let Some(slot) = slots_rx.recv().await {
println!("Slot: {}", slot);
if Instant::now().duration_since(started_at) > Duration::from_secs(2) {
break;
}
}
sleep(Duration::from_secs(15));
}
// note: this might fail if the yellowstone plugin does not allow "any broadcast filter"
fn start_geyser_all_accounts_task(config: GrpcSourceConfig) {
let green_stream = create_geyser_reconnecting_stream( let green_stream = create_geyser_reconnecting_stream(
config.clone(), config.clone(),
usdc_token_account(), // orca_token_account(),
all_accounts(),
); );
let (slots_tx, mut slots_rx) = tokio::sync::mpsc::channel(100);
tokio::spawn(async move { tokio::spawn(async move {
let mut green_stream = pin!(green_stream); let mut green_stream = pin!(green_stream);
while let Some(message) = green_stream.next().await { while let Some(message) = green_stream.next().await {
@ -82,23 +108,30 @@ async fn main() {
} }
} }
}); });
}
fn start_geyser_orca_token_account_task(config: GrpcSourceConfig) {
let green_stream = create_geyser_reconnecting_stream(
config.clone(),
orca_token_account(),
);
tokio::spawn(websocket_source(Url::parse(ws_url1.as_str()).unwrap(), slots_tx.clone())); tokio::spawn(async move {
tokio::spawn(websocket_source(Url::parse(ws_url2.as_str()).unwrap(), slots_tx.clone())); let mut green_stream = pin!(green_stream);
tokio::spawn(rpc_getslot_source(rpc_url, slots_tx.clone())); while let Some(message) = green_stream.next().await {
match message {
Message::GeyserSubscribeUpdate(subscriber_update) => {
let started_at = Instant::now(); match subscriber_update.update_oneof {
while let Some(slot) = slots_rx.recv().await { Some(UpdateOneof::Account(update)) => {
println!("Slot: {}", slot); info!("ORCA Account: {:?}", update.account.unwrap().pubkey);
}
if Instant::now().duration_since(started_at) > Duration::from_secs(2) { _ => {}
break; }
}
_ => {}
}
} }
} });
sleep(Duration::from_secs(15));
} }
async fn rpc_gpa(rpc_client: &RpcClient) { async fn rpc_gpa(rpc_client: &RpcClient) {
@ -262,7 +295,34 @@ async fn websocket_source(
} }
pub fn usdc_token_account() -> SubscribeRequest { pub fn all_accounts() -> SubscribeRequest {
let mut accounts_subs = HashMap::new();
accounts_subs.insert(
"client".to_string(),
SubscribeRequestFilterAccounts {
account: vec![],
owner: vec![],
filters: vec![],
},
);
SubscribeRequest {
slots: Default::default(),
accounts: accounts_subs,
transactions: HashMap::new(),
entry: Default::default(),
blocks: Default::default(),
blocks_meta: HashMap::new(),
commitment: None,
accounts_data_slice: Default::default(),
ping: None,
}
}
pub fn orca_token_account() -> SubscribeRequest {
let mut accounts_subs = HashMap::new(); let mut accounts_subs = HashMap::new();
accounts_subs.insert( accounts_subs.insert(
"client".to_string(), "client".to_string(),