diff --git a/lib/src/grpc_plugin_source.rs b/lib/src/grpc_plugin_source.rs index 58a0b72..d7ec795 100644 --- a/lib/src/grpc_plugin_source.rs +++ b/lib/src/grpc_plugin_source.rs @@ -91,7 +91,11 @@ async fn feed_data_accountsdb( // So instead, get a snapshot once we got notified about a new slot. Then we can // be confident that the snapshot will be for a slot >= that slot and that we'll have // all data for it. - let mut trigger_snapshot_on_slot = true; + // We can't do it immediately for the first processed slot we get, because the + // info about the new slot is sent before it's completed and the snapshot will be + // for the preceding slot then. Thus wait for two slots, before asking for a snapshot. + let trigger_snapshot_after_slots = 2; + let mut trigger_snapshot_slot_counter = trigger_snapshot_after_slots; let mut snapshot_future = future::Fuse::terminated(); // The plugin sends a ping every 5s or so @@ -105,9 +109,13 @@ async fn feed_data_accountsdb( use accountsdb_proto::{update::UpdateOneof, slot_update::Status}; let update = update?; if let UpdateOneof::SlotUpdate(slot_update) = update.update_oneof.as_ref().expect("invalid grpc") { - if trigger_snapshot_on_slot && slot_update.status == Status::Processed as i32 { - snapshot_future = tokio::spawn(get_snapshot(config.snapshot_source.rpc_http_url.clone(), program_id, slot_update.slot)).fuse(); - trigger_snapshot_on_slot = false; + if slot_update.status == Status::Processed as i32 { + if trigger_snapshot_slot_counter > 1 { + trigger_snapshot_slot_counter -= 1; + } else if trigger_snapshot_slot_counter == 1 { + snapshot_future = tokio::spawn(get_snapshot(config.snapshot_source.rpc_http_url.clone(), program_id, slot_update.slot - trigger_snapshot_after_slots + 1)).fuse(); + trigger_snapshot_slot_counter = 0; + } } } sender.send(Message::GrpcUpdate(update)).await.expect("send success");