diff --git a/src/main.rs b/src/main.rs index 9c8c215..b470293 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,6 +14,7 @@ use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::pubkey::Pubkey; use tokio::select; use tokio::sync::mpsc::error::SendError; +use tokio::sync::mpsc::Sender; use tokio::time::Instant; use tokio_stream::{Stream, StreamExt}; use tokio_stream::wrappers::{BroadcastStream, ReceiverStream}; @@ -49,6 +50,8 @@ async fn main() { websocket_account_subscribe(Url::parse(ws_url2.as_str()).unwrap()).await; + let (slots_tx, mut slots_rx) = tokio::sync::mpsc::channel(100); + let timeouts = GrpcConnectionTimeouts { connect_timeout: Duration::from_secs(10), request_timeout: Duration::from_secs(10), @@ -59,13 +62,36 @@ async fn main() { let config = GrpcSourceConfig::new(grpc_addr.to_string(), None, None, timeouts.clone()); + start_geyser_all_accounts_task(config.clone()); + + start_geyser_orca_token_account_task(config.clone()); + + tokio::spawn(websocket_source(Url::parse(ws_url1.as_str()).unwrap(), slots_tx.clone())); + tokio::spawn(websocket_source(Url::parse(ws_url2.as_str()).unwrap(), slots_tx.clone())); + tokio::spawn(rpc_getslot_source(rpc_url, slots_tx.clone())); + + + let started_at = Instant::now(); + while let Some(slot) = slots_rx.recv().await { + println!("Slot: {}", slot); + + if Instant::now().duration_since(started_at) > Duration::from_secs(2) { + break; + } + } + + sleep(Duration::from_secs(15)); +} + + +// note: this might fail if the yellowstone plugin does not allow "any broadcast filter" +fn start_geyser_all_accounts_task(config: GrpcSourceConfig) { let green_stream = create_geyser_reconnecting_stream( config.clone(), - usdc_token_account(), + // orca_token_account(), + all_accounts(), ); - let (slots_tx, mut slots_rx) = tokio::sync::mpsc::channel(100); - tokio::spawn(async move { let mut green_stream = pin!(green_stream); while let Some(message) = green_stream.next().await { @@ -82,23 +108,30 @@ async fn main() { } } }); +} +fn start_geyser_orca_token_account_task(config: GrpcSourceConfig) { + let green_stream = create_geyser_reconnecting_stream( + config.clone(), + orca_token_account(), + ); - tokio::spawn(websocket_source(Url::parse(ws_url1.as_str()).unwrap(), slots_tx.clone())); - tokio::spawn(websocket_source(Url::parse(ws_url2.as_str()).unwrap(), slots_tx.clone())); - tokio::spawn(rpc_getslot_source(rpc_url, slots_tx.clone())); - - - let started_at = Instant::now(); - while let Some(slot) = slots_rx.recv().await { - println!("Slot: {}", slot); - - if Instant::now().duration_since(started_at) > Duration::from_secs(2) { - break; + tokio::spawn(async move { + let mut green_stream = pin!(green_stream); + while let Some(message) = green_stream.next().await { + match message { + Message::GeyserSubscribeUpdate(subscriber_update) => { + match subscriber_update.update_oneof { + Some(UpdateOneof::Account(update)) => { + info!("ORCA Account: {:?}", update.account.unwrap().pubkey); + } + _ => {} + } + } + _ => {} + } } - } - - sleep(Duration::from_secs(15)); + }); } async fn rpc_gpa(rpc_client: &RpcClient) { @@ -262,7 +295,34 @@ async fn websocket_source( } -pub fn usdc_token_account() -> SubscribeRequest { +pub fn all_accounts() -> SubscribeRequest { + let mut accounts_subs = HashMap::new(); + accounts_subs.insert( + "client".to_string(), + SubscribeRequestFilterAccounts { + account: vec![], + owner: vec![], + filters: vec![], + }, + ); + + + SubscribeRequest { + slots: Default::default(), + accounts: accounts_subs, + transactions: HashMap::new(), + entry: Default::default(), + blocks: Default::default(), + blocks_meta: HashMap::new(), + commitment: None, + accounts_data_slice: Default::default(), + ping: None, + } +} + + + +pub fn orca_token_account() -> SubscribeRequest { let mut accounts_subs = HashMap::new(); accounts_subs.insert( "client".to_string(),