rename snapshot struct fields
This commit is contained in:
parent
a1f7ff670c
commit
f33192b6e8
|
@ -13,14 +13,15 @@ use crate::{AnyhowWrap, EntityFilter, FilterConfig};
|
|||
|
||||
/// gPA snapshot struct
|
||||
pub struct SnapshotProgramAccounts {
|
||||
pub snapshot_slot: Slot,
|
||||
pub snapshot_accounts: Vec<RpcKeyedAccount>,
|
||||
pub slot: Slot,
|
||||
pub accounts: Vec<RpcKeyedAccount>,
|
||||
}
|
||||
|
||||
/// gMA snapshot struct
|
||||
pub struct SnapshotMultipleAccounts {
|
||||
pub snapshot_slot: Slot,
|
||||
pub snapshot_accounts: Vec<(String, Option<UiAccount>)>,
|
||||
pub slot: Slot,
|
||||
// (account pubkey, snapshot account)
|
||||
pub accounts: Vec<(String, Option<UiAccount>)>,
|
||||
}
|
||||
|
||||
|
||||
|
@ -55,8 +56,8 @@ pub async fn get_snapshot_gpa(
|
|||
OptionalContext::Context(snapshot) => {
|
||||
let snapshot_slot = snapshot.context.slot;
|
||||
Ok(SnapshotProgramAccounts {
|
||||
snapshot_slot,
|
||||
snapshot_accounts: snapshot.value,
|
||||
slot: snapshot_slot,
|
||||
accounts: snapshot.value,
|
||||
})
|
||||
}
|
||||
OptionalContext::NoContext(_) => anyhow::bail!("bad snapshot format")
|
||||
|
@ -89,7 +90,7 @@ pub async fn get_snapshot_gma(
|
|||
|
||||
let acc: Vec<(String, Option<UiAccount>)> = ids.iter().zip(account_snapshot_response.value).map(|x| (x.0.clone(), x.1)).collect();
|
||||
Ok(SnapshotMultipleAccounts {
|
||||
snapshot_slot: first_full_shot,
|
||||
snapshot_accounts: acc,
|
||||
slot: first_full_shot,
|
||||
accounts: acc,
|
||||
})
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ use crate::snapshot::{get_snapshot_gma, get_snapshot_gpa, SnapshotMultipleAccoun
|
|||
const SNAPSHOT_REFRESH_INTERVAL: Duration = Duration::from_secs(300);
|
||||
const WS_CONNECT_TIMEOUT: Duration = Duration::from_millis(5000);
|
||||
const DEBOUNCE_RETRY_LOOP: Duration = Duration::from_millis(500);
|
||||
const FATAL_IDLE_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
|
||||
|
||||
enum WebsocketMessage {
|
||||
|
@ -85,7 +86,7 @@ async fn feed_data_by_accounts(
|
|||
|
||||
let mut slot_sub = client.slots_updates_subscribe().map_err_anyhow()?;
|
||||
|
||||
// note: the snapshot refresh schedule is local to the feed_data method and will be reset in outer loop
|
||||
// note: the snapshot refresh schedule is local to the feed_data method and will be reset as soon as we iterate the outer loop
|
||||
let mut last_snapshot = Instant::now().sub(SNAPSHOT_REFRESH_INTERVAL);
|
||||
|
||||
// consume from channels until an error happens
|
||||
|
@ -99,7 +100,7 @@ async fn feed_data_by_accounts(
|
|||
get_snapshot_gma(snapshot_rpc_http_url.clone(), account_ids.clone()).await;
|
||||
let snapshot = response.context("gma snapshot response").map_err_anyhow();
|
||||
match snapshot {
|
||||
Ok(SnapshotMultipleAccounts { snapshot_slot, snapshot_accounts }) => {
|
||||
Ok(SnapshotMultipleAccounts { slot: snapshot_slot, accounts: snapshot_accounts }) => {
|
||||
debug!(
|
||||
"fetched new gma snapshot slot={} len={:?} time={:?}",
|
||||
snapshot_slot,
|
||||
|
@ -136,10 +137,6 @@ async fn feed_data_by_accounts(
|
|||
None => {
|
||||
// note: this might loop if the filter did not return anything
|
||||
warn!("account stream closed");
|
||||
// that is weired - assume that it is okey to cancel in all cases
|
||||
// if filter_config.program_ids.is_empty() {
|
||||
// return Ok(());
|
||||
// }
|
||||
return Ok(());
|
||||
},
|
||||
}
|
||||
|
@ -155,8 +152,8 @@ async fn feed_data_by_accounts(
|
|||
},
|
||||
}
|
||||
},
|
||||
_ = tokio::time::sleep(Duration::from_secs(60)) => {
|
||||
warn!("websocket timeout");
|
||||
_ = tokio::time::sleep(FATAL_IDLE_TIMEOUT) => {
|
||||
warn!("websocket hasn't received a message in too long");
|
||||
return Ok(())
|
||||
}
|
||||
}
|
||||
|
@ -199,9 +196,10 @@ async fn feed_data_by_program(
|
|||
|
||||
let mut slot_sub = client.slots_updates_subscribe().map_err_anyhow()?;
|
||||
|
||||
// note: the snapshot refresh schedule is local to the feed_data method and will be reset as soon as we iterate the outer loop
|
||||
let mut last_snapshot = Instant::now().sub(SNAPSHOT_REFRESH_INTERVAL);
|
||||
|
||||
// consume from channels unitl an error happens
|
||||
// consume from channels until an error happens
|
||||
loop {
|
||||
|
||||
// occasionally cause a new snapshot to be produced
|
||||
|
@ -212,7 +210,7 @@ async fn feed_data_by_program(
|
|||
get_snapshot_gpa(snapshot_rpc_http_url.clone(), program_id.clone()).await;
|
||||
let snapshot = response.context("gpa snapshot response").map_err_anyhow();
|
||||
match snapshot {
|
||||
Ok(SnapshotProgramAccounts { snapshot_slot, snapshot_accounts } ) => {
|
||||
Ok(SnapshotProgramAccounts { slot: snapshot_slot, accounts: snapshot_accounts } ) => {
|
||||
let accounts: Vec<(String, Option<UiAccount>)> = snapshot_accounts
|
||||
.iter()
|
||||
.map(|x| {
|
||||
|
@ -261,8 +259,8 @@ async fn feed_data_by_program(
|
|||
},
|
||||
}
|
||||
},
|
||||
_ = tokio::time::sleep(Duration::from_secs(60)) => {
|
||||
warn!("websocket timeout");
|
||||
_ = tokio::time::sleep(FATAL_IDLE_TIMEOUT) => {
|
||||
warn!("websocket hasn't received a message in too long");
|
||||
return Ok(())
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue