Fix write_version from multiple servers and startup

Each node has an internal write_version counter. To deduplicate events
and produce a consistent database snapshot independent of the internal
count, we map it a per-slot-and-pubkey write version.

To do this correctly, we can only process account writes for slots where
all account writes will be received. This is a problem during startup,
where it's unclear what slots we have missed data for.

To make startup easier, let the plugin keep track of the highest slot
number that account writes have happened for, and send it when a new
consumer connects.
This commit is contained in:
Christian Kamm 2022-01-03 09:13:32 +01:00
parent 4236805b29
commit f0c26bb8fb
3 changed files with 116 additions and 49 deletions

View File

@ -2,7 +2,7 @@ use {
crate::accounts_selector::AccountsSelector, crate::accounts_selector::AccountsSelector,
accountsdb_proto::{ accountsdb_proto::{
slot_update::Status as SlotUpdateStatus, update::UpdateOneof, AccountWrite, Ping, slot_update::Status as SlotUpdateStatus, update::UpdateOneof, AccountWrite, Ping,
SlotUpdate, SubscribeRequest, Update, SlotUpdate, SubscribeRequest, SubscribeResponse, Update,
}, },
bs58, bs58,
log::*, log::*,
@ -12,7 +12,8 @@ use {
AccountsDbPlugin, AccountsDbPluginError, ReplicaAccountInfoVersions, AccountsDbPlugin, AccountsDbPluginError, ReplicaAccountInfoVersions,
Result as PluginResult, SlotStatus, Result as PluginResult, SlotStatus,
}, },
std::{fs::File, io::Read}, std::sync::atomic::{AtomicU64, Ordering},
std::{fs::File, io::Read, sync::Arc},
tokio::sync::{broadcast, mpsc}, tokio::sync::{broadcast, mpsc},
tonic::transport::Server, tonic::transport::Server,
}; };
@ -39,12 +40,17 @@ pub mod accountsdb_service {
pub struct Service { pub struct Service {
pub sender: broadcast::Sender<Update>, pub sender: broadcast::Sender<Update>,
pub config: ServiceConfig, pub config: ServiceConfig,
pub highest_write_slot: Arc<AtomicU64>,
} }
impl Service { impl Service {
pub fn new(config: ServiceConfig) -> Self { pub fn new(config: ServiceConfig, highest_write_slot: Arc<AtomicU64>) -> Self {
let (tx, _) = broadcast::channel(config.broadcast_buffer_size); let (tx, _) = broadcast::channel(config.broadcast_buffer_size);
Self { sender: tx, config } Self {
sender: tx,
config,
highest_write_slot,
}
} }
} }
@ -59,6 +65,15 @@ pub mod accountsdb_service {
info!("new subscriber"); info!("new subscriber");
let (tx, rx) = mpsc::channel(self.config.subscriber_buffer_size); let (tx, rx) = mpsc::channel(self.config.subscriber_buffer_size);
let mut broadcast_rx = self.sender.subscribe(); let mut broadcast_rx = self.sender.subscribe();
tx.send(Ok(Update {
update_oneof: Some(UpdateOneof::SubscribeResponse(SubscribeResponse {
highest_write_slot: self.highest_write_slot.load(Ordering::SeqCst),
})),
}))
.await
.unwrap();
tokio::spawn(async move { tokio::spawn(async move {
let mut exit = false; let mut exit = false;
while !exit { while !exit {
@ -86,6 +101,9 @@ pub struct PluginData {
server_broadcast: broadcast::Sender<Update>, server_broadcast: broadcast::Sender<Update>,
server_exit_sender: Option<broadcast::Sender<()>>, server_exit_sender: Option<broadcast::Sender<()>>,
accounts_selector: AccountsSelector, accounts_selector: AccountsSelector,
/// Largest slot that an account write was processed for
highest_write_slot: Arc<AtomicU64>,
} }
#[derive(Default)] #[derive(Default)]
@ -150,7 +168,9 @@ impl AccountsDbPlugin for Plugin {
} }
})?; })?;
let service = accountsdb_service::Service::new(config.service_config); let highest_write_slot = Arc::new(AtomicU64::new(0));
let service =
accountsdb_service::Service::new(config.service_config, highest_write_slot.clone());
let (server_exit_sender, mut server_exit_receiver) = broadcast::channel::<()>(1); let (server_exit_sender, mut server_exit_receiver) = broadcast::channel::<()>(1);
let server_broadcast = service.sender.clone(); let server_broadcast = service.sender.clone();
@ -183,6 +203,7 @@ impl AccountsDbPlugin for Plugin {
server_broadcast, server_broadcast,
server_exit_sender: Some(server_exit_sender), server_exit_sender: Some(server_exit_sender),
accounts_selector, accounts_selector,
highest_write_slot,
}); });
Ok(()) Ok(())
@ -220,6 +241,8 @@ impl AccountsDbPlugin for Plugin {
return Ok(()); return Ok(());
} }
data.highest_write_slot.fetch_max(slot, Ordering::SeqCst);
debug!( debug!(
"Updating account {:?} with owner {:?} at slot {:?}", "Updating account {:?} with owner {:?} at slot {:?}",
bs58::encode(account.pubkey).into_string(), bs58::encode(account.pubkey).into_string(),

View File

@ -87,21 +87,32 @@ async fn feed_data_accountsdb(
// We can't get a snapshot immediately since the finalized snapshot would be for a // We can't get a snapshot immediately since the finalized snapshot would be for a
// slot in the past and we'd be missing intermediate updates. // slot in the past and we'd be missing intermediate updates.
// //
// Delay the request until the first processed slot we heard about becomes rooted // Delay the request until the first slot we received all writes for becomes rooted
// to avoid that problem - partially. The rooted slot will still be larger than the // to avoid that problem - partially. The rooted slot will still be larger than the
// finalized slot, so add a number of slots as a buffer. // finalized slot, so add a number of slots as a buffer.
// //
// If that buffer isn't sufficient, there'll be a retry. // If that buffer isn't sufficient, there'll be a retry.
// The first slot that we will receive _all_ account writes for
let mut first_full_slot: u64 = u64::MAX;
// If a snapshot should be performed when ready. // If a snapshot should be performed when ready.
let mut snapshot_needed = true; let mut snapshot_needed = true;
// Lowest slot that an account write was received for. // The highest "rooted" slot that has been seen.
// The slot one after that will have received all write events. let mut max_rooted_slot = 0;
let mut lowest_write_slot = u64::MAX;
// Data for slots will arrive out of order. This value defines how many
// slots after a slot was marked "rooted" we assume it'll not receive
// any more account write information.
//
// This is important for the write_version mapping (to know when slots can
// be dropped).
let max_out_of_order_slots = 40;
// Number of slots that we expect "finalized" commitment to lag // Number of slots that we expect "finalized" commitment to lag
// behind "rooted". // behind "rooted". This matters for getProgramAccounts based snapshots,
// which will have "finalized" commitment.
let mut rooted_to_finalized_slots = 30; let mut rooted_to_finalized_slots = 30;
let mut snapshot_future = future::Fuse::terminated(); let mut snapshot_future = future::Fuse::terminated();
@ -109,14 +120,20 @@ async fn feed_data_accountsdb(
// The plugin sends a ping every 5s or so // The plugin sends a ping every 5s or so
let fatal_idle_timeout = Duration::from_secs(60); let fatal_idle_timeout = Duration::from_secs(60);
// Current slot that account writes come in for. // Highest slot that an account write came in for.
let mut current_write_slot: u64 = 0; let mut newest_write_slot: u64 = 0;
// Unfortunately the write_version the plugin provides is local to
// each RPC node. Here we fix it up by giving each pubkey a write_version
// based on the count of writes it gets each slot.
let mut slot_pubkey_writes = HashMap::<Vec<u8>, u64>::new();
// Keep track of write version from RPC node, to check assumptions // map slot -> (pubkey -> write count)
//
// Since the write_version is a private indentifier per node it can't be used
// to deduplicate events from multiple nodes. Here we rewrite it such that each
// pubkey and each slot has a consecutive numbering of writes starting at 1.
//
// That number will be consistent for each node.
let mut slot_pubkey_writes = HashMap::<u64, HashMap<[u8; 32], u32>>::new();
// Keep track of write version from RPC node, to check the assumption that it
// increases monotonically
let mut last_write_version: u64 = 0; let mut last_write_version: u64 = 0;
loop { loop {
@ -125,36 +142,51 @@ async fn feed_data_accountsdb(
use accountsdb_proto::{update::UpdateOneof, slot_update::Status}; use accountsdb_proto::{update::UpdateOneof, slot_update::Status};
let mut update = update.ok_or(anyhow::anyhow!("accountsdb plugin has closed the stream"))??; let mut update = update.ok_or(anyhow::anyhow!("accountsdb plugin has closed the stream"))??;
match update.update_oneof.as_mut().expect("invalid grpc") { match update.update_oneof.as_mut().expect("invalid grpc") {
UpdateOneof::SubscribeResponse(subscribe_response) => {
first_full_slot = subscribe_response.highest_write_slot + 1;
},
UpdateOneof::SlotUpdate(slot_update) => { UpdateOneof::SlotUpdate(slot_update) => {
let status = slot_update.status; let status = slot_update.status;
if snapshot_needed && status == Status::Rooted as i32 && slot_update.slot - rooted_to_finalized_slots > lowest_write_slot { if status == Status::Rooted as i32 {
snapshot_needed = false; if slot_update.slot > max_rooted_slot {
snapshot_future = tokio::spawn(get_snapshot(snapshot_config.rpc_http_url.clone(), program_id)).fuse(); max_rooted_slot = slot_update.slot;
// drop data for slots that are well beyond rooted
slot_pubkey_writes.retain(|&k, _| k >= max_rooted_slot - max_out_of_order_slots);
}
if snapshot_needed && max_rooted_slot - rooted_to_finalized_slots > first_full_slot {
snapshot_needed = false;
snapshot_future = tokio::spawn(get_snapshot(snapshot_config.rpc_http_url.clone(), program_id)).fuse();
}
} }
}, },
UpdateOneof::AccountWrite(write) => { UpdateOneof::AccountWrite(write) => {
if write.slot > current_write_slot { if write.slot < first_full_slot {
current_write_slot = write.slot; // Don't try to process data for slots where we may have missed writes:
slot_pubkey_writes.clear(); // We could not map the write_version correctly for them.
if lowest_write_slot > write.slot {
lowest_write_slot = write.slot;
}
}
if lowest_write_slot == write.slot {
// don't send out account writes for the first slot
// since we don't know their write_version
continue; continue;
} }
if write.slot > newest_write_slot {
newest_write_slot = write.slot;
} else if write.slot < max_rooted_slot - max_out_of_order_slots {
anyhow::bail!("received write {} slots back from max rooted slot {}", max_rooted_slot - write.slot, max_rooted_slot);
}
// We assume we will receive write versions in sequence. // We assume we will receive write versions in sequence.
// If this is not the case, logic here does not work correctly because // If this is not the case, logic here does not work correctly because
// a later write could arrive first. // a later write could arrive first.
assert!(write.write_version > last_write_version); if write.write_version <= last_write_version {
anyhow::bail!("unexpected write version: {} expected > {}", write.write_version, last_write_version);
}
last_write_version = write.write_version; last_write_version = write.write_version;
let version = slot_pubkey_writes.entry(write.pubkey.clone()).or_insert(0); let pubkey_writes = slot_pubkey_writes.entry(write.slot).or_default();
write.write_version = *version;
*version += 1; let pubkey_bytes = Pubkey::new(&write.pubkey).to_bytes();
let writes = pubkey_writes.entry(pubkey_bytes).or_insert(1); // write version 0 is reserved for snapshots
write.write_version = *writes as u64;
*writes += 1;
}, },
accountsdb_proto::update::UpdateOneof::Ping(_) => {}, accountsdb_proto::update::UpdateOneof::Ping(_) => {},
} }
@ -163,8 +195,8 @@ async fn feed_data_accountsdb(
snapshot = &mut snapshot_future => { snapshot = &mut snapshot_future => {
let snapshot = snapshot??; let snapshot = snapshot??;
if let OptionalContext::Context(snapshot_data) = snapshot { if let OptionalContext::Context(snapshot_data) = snapshot {
info!("snapshot is for slot {}, min write slot was {}", snapshot_data.context.slot, lowest_write_slot); info!("snapshot is for slot {}, first full slot was {}", snapshot_data.context.slot, first_full_slot);
if snapshot_data.context.slot >= lowest_write_slot + 1 { if snapshot_data.context.slot >= first_full_slot {
sender sender
.send(Message::Snapshot(snapshot_data)) .send(Message::Snapshot(snapshot_data))
.await .await
@ -173,7 +205,7 @@ async fn feed_data_accountsdb(
info!( info!(
"snapshot is too old: has slot {}, expected {} minimum", "snapshot is too old: has slot {}, expected {} minimum",
snapshot_data.context.slot, snapshot_data.context.slot,
lowest_write_slot + 1 first_full_slot
); );
// try again in another 10 slots // try again in another 10 slots
snapshot_needed = true; snapshot_needed = true;
@ -256,7 +288,16 @@ pub async fn process_events(
}); });
} }
let mut latest_write = HashMap::<Vec<u8>, (u64, u64)>::new(); // slot -> (pubkey -> write_version)
//
// To avoid unnecessarily sending requests to SQL, we track the latest write_version
// for each (slot, pubkey). If an already-seen write_version comes in, it can be safely
// discarded.
let mut latest_write = HashMap::<u64, HashMap<[u8; 32], u64>>::new();
// Number of slots to retain in latest_write
let latest_write_retention = 50;
let mut metric_account_writes = metrics_sender.register_u64("grpc_account_writes".into()); let mut metric_account_writes = metrics_sender.register_u64("grpc_account_writes".into());
let mut metric_account_queue = metrics_sender.register_u64("account_write_queue".into()); let mut metric_account_queue = metrics_sender.register_u64("account_write_queue".into());
let mut metric_slot_queue = metrics_sender.register_u64("slot_update_queue".into()); let mut metric_slot_queue = metrics_sender.register_u64("slot_update_queue".into());
@ -278,18 +319,15 @@ pub async fn process_events(
metric_account_writes.increment(); metric_account_writes.increment();
metric_account_queue.set(account_write_queue_sender.len() as u64); metric_account_queue.set(account_write_queue_sender.len() as u64);
// Each validator produces writes in strictly monotonous order. // Skip writes that a different server has already sent
// This early-out allows skipping postgres queries for the node let pubkey_writes = latest_write.entry(update.slot).or_default();
// that is behind. let pubkey_bytes = Pubkey::new(&update.pubkey).to_bytes();
if let Some((slot, write_version)) = latest_write.get(&update.pubkey) { let writes = pubkey_writes.entry(pubkey_bytes).or_insert(0);
if *slot > update.slot if update.write_version <= *writes {
|| (*slot == update.slot && *write_version > update.write_version) continue;
{
continue;
}
} }
latest_write *writes = update.write_version;
.insert(update.pubkey.clone(), (update.slot, update.write_version)); latest_write.retain(|&k, _| k >= update.slot - latest_write_retention);
account_write_queue_sender account_write_queue_sender
.send(AccountWrite { .send(AccountWrite {
@ -331,6 +369,7 @@ pub async fn process_events(
.expect("send success"); .expect("send success");
} }
accountsdb_proto::update::UpdateOneof::Ping(_) => {} accountsdb_proto::update::UpdateOneof::Ping(_) => {}
accountsdb_proto::update::UpdateOneof::SubscribeResponse(_) => {}
} }
} }
Message::Snapshot(update) => { Message::Snapshot(update) => {

View File

@ -18,6 +18,7 @@ message Update {
AccountWrite account_write = 1; AccountWrite account_write = 1;
SlotUpdate slot_update = 2; SlotUpdate slot_update = 2;
Ping ping = 3; Ping ping = 3;
SubscribeResponse subscribe_response = 4;
} }
} }
@ -46,3 +47,7 @@ message SlotUpdate {
message Ping { message Ping {
} }
message SubscribeResponse {
uint64 highest_write_slot = 1;
}