Fix delayed snapshots

The snapshot was requested too early before.
This commit is contained in:
Christian Kamm 2021-11-08 14:26:37 +01:00
parent 11f4d6bd67
commit 35c0c5da23
1 changed files with 12 additions and 4 deletions

View File

@ -91,7 +91,11 @@ async fn feed_data_accountsdb(
// So instead, get a snapshot once we got notified about a new slot. Then we can // So instead, get a snapshot once we got notified about a new slot. Then we can
// be confident that the snapshot will be for a slot >= that slot and that we'll have // be confident that the snapshot will be for a slot >= that slot and that we'll have
// all data for it. // all data for it.
let mut trigger_snapshot_on_slot = true; // We can't do it immediately for the first processed slot we get, because the
// info about the new slot is sent before it's completed and the snapshot will be
// for the preceding slot then. Thus wait for two slots, before asking for a snapshot.
let trigger_snapshot_after_slots = 2;
let mut trigger_snapshot_slot_counter = trigger_snapshot_after_slots;
let mut snapshot_future = future::Fuse::terminated(); let mut snapshot_future = future::Fuse::terminated();
// The plugin sends a ping every 5s or so // The plugin sends a ping every 5s or so
@ -105,9 +109,13 @@ async fn feed_data_accountsdb(
use accountsdb_proto::{update::UpdateOneof, slot_update::Status}; use accountsdb_proto::{update::UpdateOneof, slot_update::Status};
let update = update?; let update = update?;
if let UpdateOneof::SlotUpdate(slot_update) = update.update_oneof.as_ref().expect("invalid grpc") { if let UpdateOneof::SlotUpdate(slot_update) = update.update_oneof.as_ref().expect("invalid grpc") {
if trigger_snapshot_on_slot && slot_update.status == Status::Processed as i32 { if slot_update.status == Status::Processed as i32 {
snapshot_future = tokio::spawn(get_snapshot(config.snapshot_source.rpc_http_url.clone(), program_id, slot_update.slot)).fuse(); if trigger_snapshot_slot_counter > 1 {
trigger_snapshot_on_slot = false; trigger_snapshot_slot_counter -= 1;
} else if trigger_snapshot_slot_counter == 1 {
snapshot_future = tokio::spawn(get_snapshot(config.snapshot_source.rpc_http_url.clone(), program_id, slot_update.slot - trigger_snapshot_after_slots + 1)).fuse();
trigger_snapshot_slot_counter = 0;
}
} }
} }
sender.send(Message::GrpcUpdate(update)).await.expect("send success"); sender.send(Message::GrpcUpdate(update)).await.expect("send success");