Fix initial snapshot #1
Previously the initial snapshot could be returned for a discarded slot. Now a finalized-commitment snapshot is retrieved.
This commit is contained in:
parent
17d6787bf1
commit
76e14d0300
|
@ -33,15 +33,14 @@ enum Message {
|
||||||
async fn get_snapshot(
|
async fn get_snapshot(
|
||||||
rpc_http_url: String,
|
rpc_http_url: String,
|
||||||
program_id: Pubkey,
|
program_id: Pubkey,
|
||||||
min_slot: u64,
|
) -> anyhow::Result<OptionalContext<Vec<RpcKeyedAccount>>> {
|
||||||
) -> anyhow::Result<SnapshotData> {
|
|
||||||
let rpc_client = http::connect_with_options::<FullClient>(&rpc_http_url, true)
|
let rpc_client = http::connect_with_options::<FullClient>(&rpc_http_url, true)
|
||||||
.await
|
.await
|
||||||
.map_err_anyhow()?;
|
.map_err_anyhow()?;
|
||||||
|
|
||||||
let account_info_config = RpcAccountInfoConfig {
|
let account_info_config = RpcAccountInfoConfig {
|
||||||
encoding: Some(UiAccountEncoding::Base64),
|
encoding: Some(UiAccountEncoding::Base64),
|
||||||
commitment: Some(CommitmentConfig::processed()),
|
commitment: Some(CommitmentConfig::finalized()),
|
||||||
data_slice: None,
|
data_slice: None,
|
||||||
};
|
};
|
||||||
let program_accounts_config = RpcProgramAccountsConfig {
|
let program_accounts_config = RpcProgramAccountsConfig {
|
||||||
|
@ -58,19 +57,8 @@ async fn get_snapshot(
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.map_err_anyhow()?;
|
.map_err_anyhow()?;
|
||||||
info!("snapshot done");
|
info!("snapshot received");
|
||||||
if let OptionalContext::Context(account_snapshot_response) = account_snapshot {
|
Ok(account_snapshot)
|
||||||
if account_snapshot_response.context.slot < min_slot {
|
|
||||||
anyhow::bail!(
|
|
||||||
"snapshot has slot {}, expected {} minimum",
|
|
||||||
account_snapshot_response.context.slot,
|
|
||||||
min_slot
|
|
||||||
);
|
|
||||||
}
|
|
||||||
return Ok(account_snapshot_response);
|
|
||||||
}
|
|
||||||
|
|
||||||
anyhow::bail!("bad snapshot format");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn feed_data_accountsdb(
|
async fn feed_data_accountsdb(
|
||||||
|
@ -96,17 +84,26 @@ async fn feed_data_accountsdb(
|
||||||
.await?
|
.await?
|
||||||
.into_inner();
|
.into_inner();
|
||||||
|
|
||||||
// We can't get a snapshot immediately since the snapshot data has no write_version.
|
// We can't get a snapshot immediately since the finalized snapshot would be for a
|
||||||
// If we did, there could be missing account writes between the snapshot and
|
// slot in the past and we'd be missing intermediate updates.
|
||||||
// the first streamed data.
|
//
|
||||||
// So instead, get a snapshot once we got notified about a new slot. Then we can
|
// Delay the request until the first processed slot we heard about becomes rooted
|
||||||
// be confident that the snapshot will be for a slot >= that slot and that we'll have
|
// to avoid that problem - partially. The rooted slot will still be larger than the
|
||||||
// all data for it.
|
// finalized slot, so add a number of slots as a buffer.
|
||||||
// We can't do it immediately for the first processed slot we get, because the
|
//
|
||||||
// info about the new slot is sent before it's completed and the snapshot will be
|
// If that buffer isn't sufficient, there'll be a retry.
|
||||||
// for the preceding slot then. Thus wait for some slots before asking for a snapshot.
|
|
||||||
let trigger_snapshot_after_slots = 10;
|
// If a snapshot should be performed when ready.
|
||||||
let mut trigger_snapshot_slot_counter = trigger_snapshot_after_slots;
|
let mut snapshot_needed = true;
|
||||||
|
|
||||||
|
// Lowest slot that an account write was received for.
|
||||||
|
// The slot one after that will have received all write events.
|
||||||
|
let mut lowest_write_slot = u64::MAX;
|
||||||
|
|
||||||
|
// Number of slots that we expect "finalized" commitment to lag
|
||||||
|
// behind "rooted".
|
||||||
|
let mut rooted_to_finalized_slots = 30;
|
||||||
|
|
||||||
let mut snapshot_future = future::Fuse::terminated();
|
let mut snapshot_future = future::Fuse::terminated();
|
||||||
|
|
||||||
// The plugin sends a ping every 5s or so
|
// The plugin sends a ping every 5s or so
|
||||||
|
@ -115,32 +112,47 @@ async fn feed_data_accountsdb(
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
update = update_stream.next() => {
|
update = update_stream.next() => {
|
||||||
match update {
|
use accountsdb_proto::{update::UpdateOneof, slot_update::Status};
|
||||||
Some(update) => {
|
let update = update.ok_or(anyhow::anyhow!("accountsdb plugin has closed the stream"))??;
|
||||||
use accountsdb_proto::{update::UpdateOneof, slot_update::Status};
|
match update.update_oneof.as_ref().expect("invalid grpc") {
|
||||||
let update = update?;
|
UpdateOneof::SlotUpdate(slot_update) => {
|
||||||
if let UpdateOneof::SlotUpdate(slot_update) = update.update_oneof.as_ref().expect("invalid grpc") {
|
let status = slot_update.status;
|
||||||
if slot_update.status == Status::Processed as i32 {
|
if snapshot_needed && status == Status::Rooted as i32 && slot_update.slot - rooted_to_finalized_slots > lowest_write_slot {
|
||||||
if trigger_snapshot_slot_counter > 1 {
|
snapshot_needed = false;
|
||||||
trigger_snapshot_slot_counter -= 1;
|
snapshot_future = tokio::spawn(get_snapshot(snapshot_config.rpc_http_url.clone(), program_id)).fuse();
|
||||||
} else if trigger_snapshot_slot_counter == 1 {
|
|
||||||
snapshot_future = tokio::spawn(get_snapshot(snapshot_config.rpc_http_url.clone(), program_id, slot_update.slot - trigger_snapshot_after_slots + 1)).fuse();
|
|
||||||
trigger_snapshot_slot_counter = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
sender.send(Message::GrpcUpdate(update)).await.expect("send success");
|
|
||||||
},
|
},
|
||||||
None => {
|
UpdateOneof::AccountWrite(write) => {
|
||||||
anyhow::bail!("accountsdb plugin has closed the stream");
|
if lowest_write_slot > write.slot {
|
||||||
|
lowest_write_slot = write.slot;
|
||||||
|
}
|
||||||
},
|
},
|
||||||
|
accountsdb_proto::update::UpdateOneof::Ping(_) => {},
|
||||||
}
|
}
|
||||||
|
sender.send(Message::GrpcUpdate(update)).await.expect("send success");
|
||||||
},
|
},
|
||||||
snapshot = &mut snapshot_future => {
|
snapshot = &mut snapshot_future => {
|
||||||
sender
|
let snapshot = snapshot??;
|
||||||
.send(Message::Snapshot(snapshot??))
|
if let OptionalContext::Context(snapshot_data) = snapshot {
|
||||||
.await
|
info!("snapshot is for slot {}, min write slot was {}", snapshot_data.context.slot, lowest_write_slot);
|
||||||
.expect("send success");
|
if snapshot_data.context.slot >= lowest_write_slot + 1 {
|
||||||
|
sender
|
||||||
|
.send(Message::Snapshot(snapshot_data))
|
||||||
|
.await
|
||||||
|
.expect("send success");
|
||||||
|
} else {
|
||||||
|
info!(
|
||||||
|
"snapshot is too old: has slot {}, expected {} minimum",
|
||||||
|
snapshot_data.context.slot,
|
||||||
|
lowest_write_slot + 1
|
||||||
|
);
|
||||||
|
// try again in another 10 slots
|
||||||
|
snapshot_needed = true;
|
||||||
|
rooted_to_finalized_slots += 10;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
anyhow::bail!("bad snapshot format");
|
||||||
|
}
|
||||||
},
|
},
|
||||||
_ = tokio::time::sleep(fatal_idle_timeout) => {
|
_ = tokio::time::sleep(fatal_idle_timeout) => {
|
||||||
anyhow::bail!("accountsdb plugin hasn't sent a message in too long");
|
anyhow::bail!("accountsdb plugin hasn't sent a message in too long");
|
||||||
|
|
Loading…
Reference in New Issue