diff --git a/lib/src/grpc_plugin_source.rs b/lib/src/grpc_plugin_source.rs
index 936d093..9b23c11 100644
--- a/lib/src/grpc_plugin_source.rs
+++ b/lib/src/grpc_plugin_source.rs
@@ -33,15 +33,14 @@ enum Message {
 async fn get_snapshot(
     rpc_http_url: String,
     program_id: Pubkey,
-    min_slot: u64,
-) -> anyhow::Result<Response<Vec<RpcKeyedAccount>>> {
+) -> anyhow::Result<OptionalContext<Vec<RpcKeyedAccount>>> {
     let rpc_client = http::connect_with_options::<AccountsDataClient>(&rpc_http_url, true)
         .await
         .map_err_anyhow()?;
 
     let account_info_config = RpcAccountInfoConfig {
         encoding: Some(UiAccountEncoding::Base64),
-        commitment: Some(CommitmentConfig::processed()),
+        commitment: Some(CommitmentConfig::finalized()),
         data_slice: None,
     };
     let program_accounts_config = RpcProgramAccountsConfig {
@@ -58,19 +57,8 @@ async fn get_snapshot(
         )
         .await
         .map_err_anyhow()?;
-    info!("snapshot done");
-    if let OptionalContext::Context(account_snapshot_response) = account_snapshot {
-        if account_snapshot_response.context.slot < min_slot {
-            anyhow::bail!(
-                "snapshot has slot {}, expected {} minimum",
-                account_snapshot_response.context.slot,
-                min_slot
-            );
-        }
-        return Ok(account_snapshot_response);
-    }
-
-    anyhow::bail!("bad snapshot format");
+    info!("snapshot received");
+    Ok(account_snapshot)
 }
 
 async fn feed_data_accountsdb(
@@ -96,17 +84,26 @@ async fn feed_data_accountsdb(
         .await?
         .into_inner();
 
-    // We can't get a snapshot immediately since the snapshot data has no write_version.
-    // If we did, there could be missing account writes between the snapshot and
-    // the first streamed data.
-    // So instead, get a snapshot once we got notified about a new slot. Then we can
-    // be confident that the snapshot will be for a slot >= that slot and that we'll have
-    // all data for it.
-    // We can't do it immediately for the first processed slot we get, because the
-    // info about the new slot is sent before it's completed and the snapshot will be
-    // for the preceding slot then. Thus wait for some slots before asking for a snapshot.
-    let trigger_snapshot_after_slots = 10;
-    let mut trigger_snapshot_slot_counter = trigger_snapshot_after_slots;
+    // We can't get a snapshot immediately since the finalized snapshot would be for a
+    // slot in the past and we'd be missing intermediate updates.
+    //
+    // Delay the request until the first processed slot we heard about becomes rooted
+    // to avoid that problem - partially. The rooted slot will still be larger than the
+    // finalized slot, so add a number of slots as a buffer.
+    //
+    // If that buffer isn't sufficient, there'll be a retry.
+
+    // If a snapshot should be performed when ready.
+    let mut snapshot_needed = true;
+
+    // Lowest slot that an account write was received for.
+    // The slot one after that will have received all write events.
+    let mut lowest_write_slot = u64::MAX;
+
+    // Number of slots that we expect "finalized" commitment to lag
+    // behind "rooted".
+    let mut rooted_to_finalized_slots = 30;
+
     let mut snapshot_future = future::Fuse::terminated();
 
     // The plugin sends a ping every 5s or so
@@ -115,32 +112,47 @@ async fn feed_data_accountsdb(
     loop {
         tokio::select! {
             update = update_stream.next() => {
-                match update {
-                    Some(update) => {
-                        use accountsdb_proto::{update::UpdateOneof, slot_update::Status};
-                        let update = update?;
-                        if let UpdateOneof::SlotUpdate(slot_update) = update.update_oneof.as_ref().expect("invalid grpc") {
-                            if slot_update.status == Status::Processed as i32 {
-                                if trigger_snapshot_slot_counter > 1 {
-                                    trigger_snapshot_slot_counter -= 1;
-                                } else if trigger_snapshot_slot_counter == 1 {
-                                    snapshot_future = tokio::spawn(get_snapshot(snapshot_config.rpc_http_url.clone(), program_id, slot_update.slot - trigger_snapshot_after_slots + 1)).fuse();
-                                    trigger_snapshot_slot_counter = 0;
-                                }
-                            }
-                        }
-                        sender.send(Message::GrpcUpdate(update)).await.expect("send success");
-                    },
-                    None => {
-                        anyhow::bail!("accountsdb plugin has closed the stream");
-                    },
-                }
+                use accountsdb_proto::{update::UpdateOneof, slot_update::Status};
+                let update = update.ok_or(anyhow::anyhow!("accountsdb plugin has closed the stream"))??;
+                match update.update_oneof.as_ref().expect("invalid grpc") {
+                    UpdateOneof::SlotUpdate(slot_update) => {
+                        let status = slot_update.status;
+                        if snapshot_needed && status == Status::Rooted as i32 && slot_update.slot - rooted_to_finalized_slots > lowest_write_slot {
+                            snapshot_needed = false;
+                            snapshot_future = tokio::spawn(get_snapshot(snapshot_config.rpc_http_url.clone(), program_id)).fuse();
+                        }
+                    },
+                    UpdateOneof::AccountWrite(write) => {
+                        if lowest_write_slot > write.slot {
+                            lowest_write_slot = write.slot;
+                        }
+                    },
+                    UpdateOneof::Ping(_) => {},
+                }
+                sender.send(Message::GrpcUpdate(update)).await.expect("send success");
             },
             snapshot = &mut snapshot_future => {
-                sender
-                    .send(Message::Snapshot(snapshot??))
-                    .await
-                    .expect("send success");
+                let snapshot = snapshot??;
+                if let OptionalContext::Context(snapshot_data) = snapshot {
+                    info!("snapshot is for slot {}, min write slot was {}", snapshot_data.context.slot, lowest_write_slot);
+                    if snapshot_data.context.slot >= lowest_write_slot + 1 {
+                        sender
+                            .send(Message::Snapshot(snapshot_data))
+                            .await
+                            .expect("send success");
+                    } else {
+                        info!(
+                            "snapshot is too old: has slot {}, expected {} minimum",
+                            snapshot_data.context.slot,
+                            lowest_write_slot + 1
+                        );
+                        // try again in another 10 slots
+                        snapshot_needed = true;
+                        rooted_to_finalized_slots += 10;
+                    }
+                } else {
+                    anyhow::bail!("bad snapshot format");
+                }
             },
             _ = tokio::time::sleep(fatal_idle_timeout) => {
                 anyhow::bail!("accountsdb plugin hasn't sent a message in too long");
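Editor's note: the core pattern in this patch is a `tokio::select!` arm driven by `future::Fuse::terminated()`. The arm stays dormant (polls as `Pending`) until a snapshot task is spawned into it, and after the task completes the spent fuse again returns `Pending`, so the loop can keep selecting on it without special-casing; resetting `snapshot_needed` re-arms it for a retry. Below is a minimal, self-contained sketch of that pattern, not the connector's code: it assumes the `tokio` (features = "full") and `futures` crates, and `fetch_snapshot`, the tick interval, and `min_slot` are hypothetical stand-ins for the plugin stream, slot updates, and the write-slot check.

```rust
use futures::{future, FutureExt};
use std::time::Duration;

// Hypothetical stand-in for the RPC snapshot request.
async fn fetch_snapshot() -> u64 {
    tokio::time::sleep(Duration::from_millis(50)).await;
    7 // pretend this is the snapshot's slot
}

#[tokio::main]
async fn main() {
    // A terminated fuse polls as Pending, so its select! arm stays
    // dormant until a real future is assigned to it.
    let mut snapshot_future = future::Fuse::terminated();

    let mut ticks = tokio::time::interval(Duration::from_millis(20));
    let mut slot = 0u64;
    let mut snapshot_needed = true;
    let mut min_slot = 5u64; // hypothetical readiness threshold

    loop {
        tokio::select! {
            _ = ticks.tick() => {
                slot += 1;
                // Arm the dormant arm once the readiness condition holds.
                if snapshot_needed && slot >= min_slot {
                    snapshot_needed = false;
                    snapshot_future = tokio::spawn(fetch_snapshot()).fuse();
                }
            },
            snapshot = &mut snapshot_future => {
                let snapshot_slot = snapshot.expect("task panicked");
                if snapshot_slot >= min_slot {
                    println!("snapshot accepted at slot {}", snapshot_slot);
                    break;
                }
                // Snapshot too old: widen the buffer and re-arm, like the
                // patch bumping rooted_to_finalized_slots by 10 on retry.
                min_slot += 2;
                snapshot_needed = true;
            },
        }
    }
}
```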