solana-accountsdb-connector/lib/src/grpc_plugin_source.rs

use jsonrpc_core::futures::StreamExt;
use jsonrpc_core_client::transports::http;
use solana_account_decoder::UiAccountEncoding;
use solana_client::rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig};
use solana_client::rpc_response::{Response, RpcKeyedAccount};
use solana_rpc::{rpc::rpc_full::FullClient, rpc::OptionalContext};
use solana_sdk::{account::Account, commitment_config::CommitmentConfig, pubkey::Pubkey};

use futures::{future, future::FutureExt};
use tonic::transport::Endpoint;

use log::*;
use std::{collections::HashMap, str::FromStr, time::Duration};

pub mod accountsdb_proto {
    tonic::include_proto!("accountsdb");
}
use accountsdb_proto::accounts_db_client::AccountsDbClient;

use crate::{
    metrics, AccountWrite, AnyhowWrap, Config, GrpcSourceConfig, SlotStatus, SlotUpdate,
    SnapshotSourceConfig,
};

type SnapshotData = Response<Vec<RpcKeyedAccount>>;
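
/// Internal message type merging streamed gRPC updates with the one-shot
/// RPC account snapshot response.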
enum Message {
    GrpcUpdate(accountsdb_proto::Update),
    Snapshot(SnapshotData),
}
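
/// Request a snapshot of all accounts owned by `program_id` over RPC
/// (`getProgramAccounts` with base64 encoding and processed commitment) and
/// verify that the response context is for a slot of at least `min_slot`.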
async fn get_snapshot(
    rpc_http_url: String,
    program_id: Pubkey,
    min_slot: u64,
) -> anyhow::Result<SnapshotData> {
    let rpc_client = http::connect_with_options::<FullClient>(&rpc_http_url, true)
        .await
        .map_err_anyhow()?;

    let account_info_config = RpcAccountInfoConfig {
        encoding: Some(UiAccountEncoding::Base64),
        commitment: Some(CommitmentConfig::processed()),
        data_slice: None,
    };
    let program_accounts_config = RpcProgramAccountsConfig {
        filters: None,
        with_context: Some(true),
        account_config: account_info_config.clone(),
    };

    info!("requesting snapshot");
    let account_snapshot = rpc_client
        .get_program_accounts(
            program_id.to_string(),
            Some(program_accounts_config.clone()),
        )
        .await
        .map_err_anyhow()?;
    info!("snapshot done");

    if let OptionalContext::Context(account_snapshot_response) = account_snapshot {
        if account_snapshot_response.context.slot < min_slot {
            anyhow::bail!(
                "snapshot has slot {}, expected {} minimum",
                account_snapshot_response.context.slot,
                min_slot
            );
        }
        return Ok(account_snapshot_response);
    }

    anyhow::bail!("bad snapshot format");
}
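
/// Connect to a single accountsdb gRPC source, subscribe to its update stream,
/// request an RPC snapshot once enough processed slots have been observed, and
/// forward both kinds of messages to `sender`. Returns an error when the stream
/// ends, an update fails, or the plugin goes silent for too long.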
async fn feed_data_accountsdb(
    grpc_config: &GrpcSourceConfig,
    snapshot_config: &SnapshotSourceConfig,
    sender: async_channel::Sender<Message>,
) -> anyhow::Result<()> {
    let program_id = Pubkey::from_str(&snapshot_config.program_id)?;

    let mut client =
        AccountsDbClient::connect(Endpoint::from_str(&grpc_config.connection_string)?).await?;

    let mut update_stream = client
        .subscribe(accountsdb_proto::SubscribeRequest {})
        .await?
        .into_inner();

    // We can't request a snapshot immediately, since the snapshot data has no
    // write_version. If we did, account writes could be missing between the
    // snapshot and the first streamed data.
    //
    // Instead, request the snapshot once we are notified about a new slot: the
    // snapshot is then guaranteed to be for a slot >= that slot, and we'll have
    // all streamed data for it.
    //
    // We can't do that for the first processed slot we see, because the
    // notification about a new slot arrives before the slot is completed, so the
    // snapshot could still be for the preceding slot. Thus wait for two slots
    // before asking for a snapshot.
    let trigger_snapshot_after_slots = 2;
    let mut trigger_snapshot_slot_counter = trigger_snapshot_after_slots;
    let mut snapshot_future = future::Fuse::terminated();

    // The plugin sends a ping every 5s or so; treat a much longer silence as fatal.
    let fatal_idle_timeout = Duration::from_secs(60);

    loop {
        tokio::select! {
            update = update_stream.next() => {
                match update {
                    Some(update) => {
                        use accountsdb_proto::{update::UpdateOneof, slot_update::Status};
                        let update = update?;
                        if let UpdateOneof::SlotUpdate(slot_update) = update.update_oneof.as_ref().expect("invalid grpc") {
                            if slot_update.status == Status::Processed as i32 {
                                if trigger_snapshot_slot_counter > 1 {
                                    trigger_snapshot_slot_counter -= 1;
                                } else if trigger_snapshot_slot_counter == 1 {
                                    snapshot_future = tokio::spawn(get_snapshot(snapshot_config.rpc_http_url.clone(), program_id, slot_update.slot - trigger_snapshot_after_slots + 1)).fuse();
                                    trigger_snapshot_slot_counter = 0;
                                }
                            }
                        }
                        sender.send(Message::GrpcUpdate(update)).await.expect("send success");
                    },
                    None => {
                        anyhow::bail!("accountsdb plugin has closed the stream");
                    },
                }
            },
            snapshot = &mut snapshot_future => {
                sender
                    .send(Message::Snapshot(snapshot??))
                    .await
                    .expect("send success");
            },
            _ = tokio::time::sleep(fatal_idle_timeout) => {
                anyhow::bail!("accountsdb plugin hasn't sent a message in too long");
            }
        }
    }
}
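
/// Spawn one reconnecting feed task per configured gRPC source, then consume
/// the merged message stream: skip stale account writes, convert updates into
/// `AccountWrite` / `SlotUpdate` values, push them into the output queues, and
/// keep the connection and queue metrics up to date.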
pub async fn process_events(
    config: Config,
    account_write_queue_sender: async_channel::Sender<AccountWrite>,
    slot_queue_sender: async_channel::Sender<SlotUpdate>,
    metrics_sender: metrics::Metrics,
) {
    // Subscribe to accountsdb
    let (msg_sender, msg_receiver) = async_channel::unbounded::<Message>();
    for grpc_source in config.grpc_sources {
        let msg_sender = msg_sender.clone();
        let snapshot_source = config.snapshot_source.clone();
        let metrics_sender = metrics_sender.clone();

        tokio::spawn(async move {
            let mut metric_retries = metrics_sender.register_u64(format!(
                "grpc_source_{}_connection_retries",
                grpc_source.name
            ));
            let metric_status =
                metrics_sender.register_string(format!("grpc_source_{}_status", grpc_source.name));

            // Continuously reconnect on failure
            loop {
                metric_status.set("connected".into());
                let out = feed_data_accountsdb(&grpc_source, &snapshot_source, msg_sender.clone());
                let result = out.await;
                assert!(result.is_err());
                if let Err(err) = result {
                    warn!(
                        "error during communication with the accountsdb plugin. retrying. {:?}",
                        err
                    );
                }

                metric_status.set("disconnected".into());
                metric_retries.increment();

                tokio::time::sleep(std::time::Duration::from_secs(
                    grpc_source.retry_connection_sleep_secs,
                ))
                .await;
            }
        });
    }
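
    // Latest known (slot, write_version) per account pubkey; used to drop
    // writes that arrive out of date from a lagging source.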
    let mut latest_write = HashMap::<Vec<u8>, (u64, u64)>::new();

    let mut metric_account_writes = metrics_sender.register_u64("grpc_account_writes".into());
    let mut metric_account_queue = metrics_sender.register_u64("account_write_queue".into());
    let mut metric_slot_queue = metrics_sender.register_u64("slot_update_queue".into());
    let mut metric_slot_updates = metrics_sender.register_u64("grpc_slot_updates".into());
    let mut metric_snapshots = metrics_sender.register_u64("grpc_snapshots".into());
    let mut metric_snapshot_account_writes =
        metrics_sender.register_u64("grpc_snapshot_account_writes".into());

    loop {
        let msg = msg_receiver.recv().await.expect("sender must not close");

        match msg {
            Message::GrpcUpdate(update) => {
                match update.update_oneof.expect("invalid grpc") {
                    accountsdb_proto::update::UpdateOneof::AccountWrite(update) => {
                        assert!(update.pubkey.len() == 32);
                        assert!(update.owner.len() == 32);

                        metric_account_writes.increment();
                        metric_account_queue.set(account_write_queue_sender.len() as u64);

                        // Each validator produces writes in strictly monotonic order.
                        // This early-out allows skipping postgres queries for the node
                        // that is behind.
                        if let Some((slot, write_version)) = latest_write.get(&update.pubkey) {
                            if *slot > update.slot
                                || (*slot == update.slot && *write_version > update.write_version)
                            {
                                continue;
                            }
                        }
                        latest_write
                            .insert(update.pubkey.clone(), (update.slot, update.write_version));

                        account_write_queue_sender
                            .send(AccountWrite {
                                pubkey: Pubkey::new(&update.pubkey),
                                slot: update.slot as i64, // TODO: narrowing
                                write_version: update.write_version as i64,
                                lamports: update.lamports as i64,
                                owner: Pubkey::new(&update.owner),
                                executable: update.executable,
                                rent_epoch: update.rent_epoch as i64,
                                data: update.data,
                            })
                            .await
                            .expect("send success");
                    }
                    accountsdb_proto::update::UpdateOneof::SlotUpdate(update) => {
                        metric_slot_updates.increment();
                        metric_slot_queue.set(slot_queue_sender.len() as u64);

                        use accountsdb_proto::slot_update::Status;
                        let status = Status::from_i32(update.status).map(|v| match v {
                            Status::Processed => SlotStatus::Processed,
                            Status::Confirmed => SlotStatus::Confirmed,
                            Status::Rooted => SlotStatus::Rooted,
                        });
                        if status.is_none() {
                            error!("unexpected slot status: {}", update.status);
                            continue;
                        }

                        slot_queue_sender
                            .send(SlotUpdate {
                                slot: update.slot as i64, // TODO: narrowing
                                parent: update.parent.map(|v| v as i64),
                                status: status.expect("qed"),
                            })
                            .await
                            .expect("send success");
                    }
                    accountsdb_proto::update::UpdateOneof::Ping(_) => {}
                }
            }
            Message::Snapshot(update) => {
                metric_snapshots.increment();
                info!("processing snapshot...");
                for keyed_account in update.value {
                    metric_snapshot_account_writes.increment();
                    metric_account_queue.set(account_write_queue_sender.len() as u64);

                    // TODO: Resnapshot on invalid data?
                    let account: Account = keyed_account.account.decode().unwrap();
                    let pubkey = Pubkey::from_str(&keyed_account.pubkey).unwrap();
                    account_write_queue_sender
                        .send(AccountWrite::from(pubkey, update.context.slot, 0, account))
                        .await
                        .expect("send success");
                }
                info!("processing snapshot done");
            }
        }
    }
}