ws sysvar clock

This commit is contained in:
GroovieGermanikus 2024-06-03 17:20:53 +02:00
parent 0f6ffc6edc
commit 6d1882e7d7
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
1 changed files with 43 additions and 1 deletions

View File

@ -39,13 +39,16 @@ async fn main() {
rpc_get_signatures_for_address(&rpc_client).await; rpc_get_signatures_for_address(&rpc_client).await;
websocket_account_subscribe(Url::parse(ws_url2.as_str()).unwrap()).await;
let (slots_tx, mut slots_rx) = tokio::sync::mpsc::channel(100); let (slots_tx, mut slots_rx) = tokio::sync::mpsc::channel(100);
tokio::spawn(websocket_source(Url::parse(ws_url1.as_str()).unwrap(), slots_tx.clone())); tokio::spawn(websocket_source(Url::parse(ws_url1.as_str()).unwrap(), slots_tx.clone()));
tokio::spawn(websocket_source(Url::parse(ws_url2.as_str()).unwrap(), slots_tx.clone())); tokio::spawn(websocket_source(Url::parse(ws_url2.as_str()).unwrap(), slots_tx.clone()));
tokio::spawn(rpc_getslot_source(rpc_url, slots_tx.clone())); tokio::spawn(rpc_getslot_source(rpc_url, slots_tx.clone()));
let started_at = Instant::now(); let started_at = Instant::now();
while let Some(slot) = slots_rx.recv().await { while let Some(slot) = slots_rx.recv().await {
println!("Slot: {}", slot); println!("Slot: {}", slot);
@ -110,7 +113,7 @@ async fn rpc_get_signatures_for_address(rpc_client: &RpcClient) {
let config = GetConfirmedSignaturesForAddress2Config { let config = GetConfirmedSignaturesForAddress2Config {
before: None, before: None,
until: None, until: None,
limit: Some(333), limit: Some(42),
commitment: Some(CommitmentConfig::confirmed()), commitment: Some(CommitmentConfig::confirmed()),
}; };
@ -119,10 +122,49 @@ async fn rpc_get_signatures_for_address(rpc_client: &RpcClient) {
.await .await
.unwrap(); .unwrap();
// 42
info!("Signatures: {:?}", signatures.len()); info!("Signatures: {:?}", signatures.len());
} }
async fn websocket_account_subscribe(
rpc_url: Url
) {
let sysvar_subscribe =
json!({
"jsonrpc": "2.0",
"id": 1,
"method": "accountSubscribe",
"params": [
"SysvarC1ock11111111111111111111111111111111"
]
});
let mut ws1 = StableWebSocket::new_with_timeout(
rpc_url,
sysvar_subscribe.clone(),
Duration::from_secs(3),
)
.await
.unwrap();
let mut channel = ws1.subscribe_message_channel();
let mut count = 0;
while let Ok(msg) = channel.recv().await {
if let WsMessage::Text(payload) = msg {
info!("SysvarC1ock: {:?}", payload);
count += 1;
}
if count > 3 {
break;
}
}
}
async fn rpc_getslot_source( async fn rpc_getslot_source(
rpc_url: Url, rpc_url: Url,
mpsc_downstream: tokio::sync::mpsc::Sender<Slot>, mpsc_downstream: tokio::sync::mpsc::Sender<Slot>,