Fix potential missing data with initial snapshotting

This commit is contained in:
Christian Kamm 2021-11-07 10:52:38 +01:00
parent 9f7f72c3cb
commit 387d34e511
1 changed files with 71 additions and 42 deletions

View File

@ -7,6 +7,7 @@ use solana_client::rpc_response::{Response, RpcKeyedAccount};
use solana_rpc::{rpc::rpc_full::FullClient, rpc::OptionalContext}; use solana_rpc::{rpc::rpc_full::FullClient, rpc::OptionalContext};
use solana_sdk::{account::Account, commitment_config::CommitmentConfig, pubkey::Pubkey}; use solana_sdk::{account::Account, commitment_config::CommitmentConfig, pubkey::Pubkey};
use futures::{future, future::FutureExt};
use tonic::transport::Endpoint; use tonic::transport::Endpoint;
use log::*; use log::*;
@ -19,9 +20,52 @@ use accountsdb_proto::accounts_db_client::AccountsDbClient;
use crate::{AccountWrite, AnyhowWrap, Config, SlotUpdate}; use crate::{AccountWrite, AnyhowWrap, Config, SlotUpdate};
type SnapshotData = Response<Vec<RpcKeyedAccount>>;
enum Message { enum Message {
GrpcUpdate(accountsdb_proto::Update), GrpcUpdate(accountsdb_proto::Update),
Snapshot(Response<Vec<RpcKeyedAccount>>), Snapshot(SnapshotData),
}
async fn get_snapshot(rpc_http_url: String, min_slot: u64) -> Result<SnapshotData, anyhow::Error> {
let rpc_client = http::connect_with_options::<FullClient>(&rpc_http_url, true)
.await
.map_err_anyhow()?;
// TODO: Make addresses filters configurable
let program_id = Pubkey::from_str("mv3ekLzLbnVPNxjSKvqBpU3ZeZXPQdEC3bp5MDEBG68")?;
let account_info_config = RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64),
commitment: Some(CommitmentConfig::processed()),
data_slice: None,
};
let program_accounts_config = RpcProgramAccountsConfig {
filters: None,
with_context: Some(true),
account_config: account_info_config.clone(),
};
info!("requesting snapshot");
let account_snapshot = rpc_client
.get_program_accounts(
program_id.to_string(),
Some(program_accounts_config.clone()),
)
.await
.map_err_anyhow()?;
info!("snapshot done");
if let OptionalContext::Context(account_snapshot_response) = account_snapshot {
if account_snapshot_response.context.slot < min_slot {
anyhow::bail!(
"snapshot has slot {}, expected {} minimum",
account_snapshot_response.context.slot,
min_slot
);
}
return Ok(account_snapshot_response);
}
anyhow::bail!("bad snapshot format");
} }
async fn feed_data_accountsdb( async fn feed_data_accountsdb(
@ -36,57 +80,41 @@ async fn feed_data_accountsdb(
.await? .await?
.into_inner(); .into_inner();
let rpc_client = http::connect_with_options::<FullClient>(&config.rpc_http_url, true) // We can't get a snapshot immediately since the snapshot data has no write_version.
.await // If we did, there could be missing account writes between the snapshot and
.map_err_anyhow()?; // the first streamed data.
// So instead, get a snapshot once we got notified about a new slot. Then we can
let program_id = Pubkey::from_str("mv3ekLzLbnVPNxjSKvqBpU3ZeZXPQdEC3bp5MDEBG68")?; // be confident that the snapshot will be for a slot >= that slot and that we'll have
let account_info_config = RpcAccountInfoConfig { // all data for it.
encoding: Some(UiAccountEncoding::Base64), let mut trigger_snapshot_on_slot = true;
commitment: Some(CommitmentConfig::processed()), let mut snapshot_future = future::Fuse::terminated();
data_slice: None,
};
// TODO: Make addresses filters configurable
let program_accounts_config = RpcProgramAccountsConfig {
filters: None, /*Some(vec![RpcFilterType::DataSize(
size_of::<MangoAccount>() as u64
)]),*/
with_context: Some(true),
account_config: account_info_config.clone(),
};
// Get an account snapshot on start
// TODO: Should only do that once we know we have all account write events for that slot
info!("requesting snapshot");
let account_snapshot = rpc_client
.get_program_accounts(
program_id.to_string(),
Some(program_accounts_config.clone()),
)
.await
.map_err_anyhow()?;
if let OptionalContext::Context(account_snapshot_response) = account_snapshot {
sender
.send(Message::Snapshot(account_snapshot_response))
.await
.expect("send success");
} else {
anyhow::bail!("bad snapshot format");
}
info!("snapshot done");
loop { loop {
tokio::select! { tokio::select! {
update = update_stream.next() => { update = update_stream.next() => {
match update { match update {
Some(update) => { Some(update) => {
sender.send(Message::GrpcUpdate(update?)).await.expect("send success"); use accountsdb_proto::{update::UpdateOneof, slot_update::Status};
let update = update?;
if let UpdateOneof::SlotUpdate(slot_update) = update.update_oneof.as_ref().expect("invalid grpc") {
if trigger_snapshot_on_slot && slot_update.status == Status::Processed as i32 {
snapshot_future = tokio::spawn(get_snapshot(config.rpc_http_url.clone(), slot_update.slot)).fuse();
trigger_snapshot_on_slot = false;
}
}
sender.send(Message::GrpcUpdate(update)).await.expect("send success");
}, },
None => { None => {
anyhow::bail!("accountsdb plugin has closed the stream"); anyhow::bail!("accountsdb plugin has closed the stream");
}, },
} }
}, },
snapshot = &mut snapshot_future => {
sender
.send(Message::Snapshot(snapshot??))
.await
.expect("send success");
},
_ = tokio::time::sleep(Duration::from_secs(60)) => { _ = tokio::time::sleep(Duration::from_secs(60)) => {
anyhow::bail!("accountsdb plugin hasn't sent a message in too long"); anyhow::bail!("accountsdb plugin hasn't sent a message in too long");
} }
@ -118,11 +146,11 @@ pub async fn process_events(
}); });
loop { loop {
let msg = msg_receiver.recv().await.unwrap(); let msg = msg_receiver.recv().await.expect("sender must not close");
match msg { match msg {
Message::GrpcUpdate(update) => { Message::GrpcUpdate(update) => {
match update.update_oneof.unwrap() { match update.update_oneof.expect("invalid grpc") {
accountsdb_proto::update::UpdateOneof::AccountWrite(update) => { accountsdb_proto::update::UpdateOneof::AccountWrite(update) => {
assert!(update.pubkey.len() == 32); assert!(update.pubkey.len() == 32);
assert!(update.owner.len() == 32); assert!(update.owner.len() == 32);
@ -167,6 +195,7 @@ pub async fn process_events(
Message::Snapshot(update) => { Message::Snapshot(update) => {
info!("processing snapshot..."); info!("processing snapshot...");
for keyed_account in update.value { for keyed_account in update.value {
// TODO: Resnapshot on invalid data?
let account: Account = keyed_account.account.decode().unwrap(); let account: Account = keyed_account.account.decode().unwrap();
let pubkey = Pubkey::from_str(&keyed_account.pubkey).unwrap(); let pubkey = Pubkey::from_str(&keyed_account.pubkey).unwrap();
account_write_queue_sender account_write_queue_sender