diff --git a/src/main.rs b/src/main.rs index 3c36abe..2363ce2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -39,13 +39,16 @@ async fn main() { rpc_get_signatures_for_address(&rpc_client).await; + websocket_account_subscribe(Url::parse(ws_url2.as_str()).unwrap()).await; let (slots_tx, mut slots_rx) = tokio::sync::mpsc::channel(100); + tokio::spawn(websocket_source(Url::parse(ws_url1.as_str()).unwrap(), slots_tx.clone())); tokio::spawn(websocket_source(Url::parse(ws_url2.as_str()).unwrap(), slots_tx.clone())); tokio::spawn(rpc_getslot_source(rpc_url, slots_tx.clone())); + let started_at = Instant::now(); while let Some(slot) = slots_rx.recv().await { println!("Slot: {}", slot); @@ -110,7 +113,7 @@ async fn rpc_get_signatures_for_address(rpc_client: &RpcClient) { let config = GetConfirmedSignaturesForAddress2Config { before: None, until: None, - limit: Some(333), + limit: Some(42), commitment: Some(CommitmentConfig::confirmed()), }; @@ -119,10 +122,49 @@ async fn rpc_get_signatures_for_address(rpc_client: &RpcClient) { .await .unwrap(); + // 42 info!("Signatures: {:?}", signatures.len()); } +async fn websocket_account_subscribe( + rpc_url: Url +) { + + let sysvar_subscribe = + json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "accountSubscribe", + "params": [ + "SysvarC1ock11111111111111111111111111111111" + ] + }); + + let mut ws1 = StableWebSocket::new_with_timeout( + rpc_url, + sysvar_subscribe.clone(), + Duration::from_secs(3), + ) + .await + .unwrap(); + + let mut channel = ws1.subscribe_message_channel(); + + let mut count = 0; + while let Ok(msg) = channel.recv().await { + if let WsMessage::Text(payload) = msg { + info!("SysvarC1ock: {:?}", payload); + count += 1; + } + if count > 3 { + break; + } + } +} + + + async fn rpc_getslot_source( rpc_url: Url, mpsc_downstream: tokio::sync::mpsc::Sender,