From f0c26bb8fb61164a5ef57453ae66851087404cfe Mon Sep 17 00:00:00 2001
From: Christian Kamm
Date: Mon, 3 Jan 2022 09:13:32 +0100
Subject: [PATCH] Fix write_version from multiple servers and startup

Each node has an internal write_version counter. To deduplicate events
and produce a consistent database snapshot independent of the internal
count, we map it to a per-slot-and-pubkey write version.

To do this correctly, we can only process account writes for slots where
all account writes will be received. This is a problem during startup,
where it's unclear which slots we have missed data for.

To make startup easier, let the plugin keep track of the highest slot
number that account writes have happened for, and send it when a new
consumer connects.
---
 .../src/accountsdb_plugin_grpc.rs |  33 ++++-
 lib/src/grpc_plugin_source.rs     | 127 ++++++++++++------
 proto/accountsdb.proto            |   5 +
 3 files changed, 116 insertions(+), 49 deletions(-)

diff --git a/accountsdb-plugin-grpc/src/accountsdb_plugin_grpc.rs b/accountsdb-plugin-grpc/src/accountsdb_plugin_grpc.rs
index 7311d4c..2a5679b 100644
--- a/accountsdb-plugin-grpc/src/accountsdb_plugin_grpc.rs
+++ b/accountsdb-plugin-grpc/src/accountsdb_plugin_grpc.rs
@@ -2,7 +2,7 @@ use {
     crate::accounts_selector::AccountsSelector,
     accountsdb_proto::{
         slot_update::Status as SlotUpdateStatus, update::UpdateOneof, AccountWrite, Ping,
-        SlotUpdate, SubscribeRequest, Update,
+        SlotUpdate, SubscribeRequest, SubscribeResponse, Update,
     },
     bs58,
     log::*,
@@ -12,7 +12,8 @@ use {
         AccountsDbPlugin, AccountsDbPluginError, ReplicaAccountInfoVersions,
         Result as PluginResult, SlotStatus,
     },
-    std::{fs::File, io::Read},
+    std::sync::atomic::{AtomicU64, Ordering},
+    std::{fs::File, io::Read, sync::Arc},
     tokio::sync::{broadcast, mpsc},
     tonic::transport::Server,
 };
@@ -39,12 +40,17 @@ pub mod accountsdb_service {
     pub struct Service {
         pub sender: broadcast::Sender<Update>,
         pub config: ServiceConfig,
+        pub highest_write_slot: Arc<AtomicU64>,
     }

     impl Service {
-        pub fn new(config: ServiceConfig) -> Self {
+        pub fn new(config: ServiceConfig, highest_write_slot: Arc<AtomicU64>) -> Self {
             let (tx, _) = broadcast::channel(config.broadcast_buffer_size);
-            Self { sender: tx, config }
+            Self {
+                sender: tx,
+                config,
+                highest_write_slot,
+            }
         }
     }

@@ -59,6 +65,15 @@ pub mod accountsdb_service {
             info!("new subscriber");
             let (tx, rx) = mpsc::channel(self.config.subscriber_buffer_size);
             let mut broadcast_rx = self.sender.subscribe();
+
+            tx.send(Ok(Update {
+                update_oneof: Some(UpdateOneof::SubscribeResponse(SubscribeResponse {
+                    highest_write_slot: self.highest_write_slot.load(Ordering::SeqCst),
+                })),
+            }))
+            .await
+            .unwrap();
+
             tokio::spawn(async move {
                 let mut exit = false;
                 while !exit {
@@ -86,6 +101,9 @@ pub struct PluginData {
     server_broadcast: broadcast::Sender<Update>,
     server_exit_sender: Option<broadcast::Sender<()>>,
     accounts_selector: AccountsSelector,
+
+    /// Largest slot that an account write was processed for
+    highest_write_slot: Arc<AtomicU64>,
 }

 #[derive(Default)]
@@ -150,7 +168,9 @@ impl AccountsDbPlugin for Plugin {
             }
         })?;

-        let service = accountsdb_service::Service::new(config.service_config);
+        let highest_write_slot = Arc::new(AtomicU64::new(0));
+        let service =
+            accountsdb_service::Service::new(config.service_config, highest_write_slot.clone());
         let (server_exit_sender, mut server_exit_receiver) = broadcast::channel::<()>(1);
         let server_broadcast = service.sender.clone();

@@ -183,6 +203,7 @@ impl AccountsDbPlugin for Plugin {
             server_broadcast,
             server_exit_sender: Some(server_exit_sender),
             accounts_selector,
+            highest_write_slot,
         });

         Ok(())
@@ -220,6 +241,8 @@ impl AccountsDbPlugin for Plugin {
             return Ok(());
         }

+        data.highest_write_slot.fetch_max(slot, Ordering::SeqCst);
+
         debug!(
             "Updating account {:?} with owner {:?} at slot {:?}",
             bs58::encode(account.pubkey).into_string(),
diff --git a/lib/src/grpc_plugin_source.rs b/lib/src/grpc_plugin_source.rs
index 2b6d879..bb085a8 100644
--- a/lib/src/grpc_plugin_source.rs
+++ b/lib/src/grpc_plugin_source.rs
@@ -87,21 +87,32 @@ async fn feed_data_accountsdb(
     // We can't get a snapshot immediately since the finalized snapshot would be for a
     // slot in the past and we'd be missing intermediate updates.
     //
-    // Delay the request until the first processed slot we heard about becomes rooted
+    // Delay the request until the first slot we received all writes for becomes rooted
     // to avoid that problem - partially. The rooted slot will still be larger than the
     // finalized slot, so add a number of slots as a buffer.
     //
     // If that buffer isn't sufficient, there'll be a retry.

+    // The first slot that we will receive _all_ account writes for
+    let mut first_full_slot: u64 = u64::MAX;
+
     // If a snapshot should be performed when ready.
     let mut snapshot_needed = true;

-    // Lowest slot that an account write was received for.
-    // The slot one after that will have received all write events.
-    let mut lowest_write_slot = u64::MAX;
+    // The highest "rooted" slot that has been seen.
+    let mut max_rooted_slot = 0;
+
+    // Data for slots will arrive out of order. This value defines how many
+    // slots after a slot was marked "rooted" we assume it will not receive
+    // any more account write information.
+    //
+    // This is important for the write_version mapping (to know when slots can
+    // be dropped).
+    let max_out_of_order_slots = 40;

     // Number of slots that we expect "finalized" commitment to lag
-    // behind "rooted".
+    // behind "rooted". This matters for getProgramAccounts based snapshots,
+    // which will have "finalized" commitment.
     let mut rooted_to_finalized_slots = 30;

     let mut snapshot_future = future::Fuse::terminated();
@@ -109,14 +120,20 @@ async fn feed_data_accountsdb(
     // The plugin sends a ping every 5s or so
     let fatal_idle_timeout = Duration::from_secs(60);

-    // Current slot that account writes come in for.
-    let mut current_write_slot: u64 = 0;
-    // Unfortunately the write_version the plugin provides is local to
-    // each RPC node. Here we fix it up by giving each pubkey a write_version
-    // based on the count of writes it gets each slot.
-    let mut slot_pubkey_writes = HashMap::<Vec<u8>, u64>::new();
+    // Highest slot that an account write came in for.
+    let mut newest_write_slot: u64 = 0;

-    // Keep track of write version from RPC node, to check assumptions
+    // map slot -> (pubkey -> write count)
+    //
+    // Since the write_version is a private identifier per node, it can't be used
+    // to deduplicate events from multiple nodes. Here we rewrite it so that, for
+    // each slot and pubkey, writes are numbered consecutively starting at 1.
+    //
+    // That numbering will then be the same on every node.
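+    //
+    // For example: if one node reports three writes for a pubkey in a slot
+    // with node-local write_versions (17, 19, 23) and another node reports
+    // the same writes as (5, 6, 9), both streams remap them to (1, 2, 3),
+    // so a consumer can deduplicate them. (Numbers are illustrative only.)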
+    let mut slot_pubkey_writes = HashMap::<u64, HashMap<[u8; 32], u32>>::new();
+
+    // Keep track of write version from RPC node, to check the assumption that it
+    // increases monotonically.
     let mut last_write_version: u64 = 0;

     loop {
@@ -125,36 +142,51 @@
                 use accountsdb_proto::{update::UpdateOneof, slot_update::Status};
                 let mut update = update.ok_or(anyhow::anyhow!("accountsdb plugin has closed the stream"))??;
                 match update.update_oneof.as_mut().expect("invalid grpc") {
+                    UpdateOneof::SubscribeResponse(subscribe_response) => {
+                        first_full_slot = subscribe_response.highest_write_slot + 1;
+                    },
                     UpdateOneof::SlotUpdate(slot_update) => {
                         let status = slot_update.status;
-                        if snapshot_needed && status == Status::Rooted as i32 && slot_update.slot - rooted_to_finalized_slots > lowest_write_slot {
-                            snapshot_needed = false;
-                            snapshot_future = tokio::spawn(get_snapshot(snapshot_config.rpc_http_url.clone(), program_id)).fuse();
+                        if status == Status::Rooted as i32 {
+                            if slot_update.slot > max_rooted_slot {
+                                max_rooted_slot = slot_update.slot;
+
+                                // drop data for slots that are well beyond rooted
+                                slot_pubkey_writes.retain(|&k, _| k >= max_rooted_slot.saturating_sub(max_out_of_order_slots));
+                            }
+                            if snapshot_needed && max_rooted_slot.saturating_sub(rooted_to_finalized_slots) > first_full_slot {
+                                snapshot_needed = false;
+                                snapshot_future = tokio::spawn(get_snapshot(snapshot_config.rpc_http_url.clone(), program_id)).fuse();
+                            }
                         }
                     },
                     UpdateOneof::AccountWrite(write) => {
-                        if write.slot > current_write_slot {
-                            current_write_slot = write.slot;
-                            slot_pubkey_writes.clear();
-                            if lowest_write_slot > write.slot {
-                                lowest_write_slot = write.slot;
-                            }
-                        }
-                        if lowest_write_slot == write.slot {
-                            // don't send out account writes for the first slot
-                            // since we don't know their write_version
+                        if write.slot < first_full_slot {
+                            // Don't try to process data for slots where we may have missed writes:
+                            // we could not map the write_version correctly for them.
                             continue;
                         }

+                        if write.slot > newest_write_slot {
+                            newest_write_slot = write.slot;
+                        } else if write.slot < max_rooted_slot.saturating_sub(max_out_of_order_slots) {
+                            anyhow::bail!("received write {} slots back from max rooted slot {}", max_rooted_slot - write.slot, max_rooted_slot);
+                        }
+
+                        // We assume we will receive write versions in sequence.
                         // If this is not the case, logic here does not work correctly because
                         // a later write could arrive first.
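+                        //
+                        // If the assumption is violated, bail out with an error
+                        // instead of panicking; the stream then ends like any
+                        // other connection error would.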
-                        assert!(write.write_version > last_write_version);
+                        if write.write_version <= last_write_version {
+                            anyhow::bail!("unexpected write version: {} expected > {}", write.write_version, last_write_version);
+                        }
                         last_write_version = write.write_version;

-                        let version = slot_pubkey_writes.entry(write.pubkey.clone()).or_insert(0);
-                        write.write_version = *version;
-                        *version += 1;
+                        let pubkey_writes = slot_pubkey_writes.entry(write.slot).or_default();
+
+                        let pubkey_bytes = Pubkey::new(&write.pubkey).to_bytes();
+                        let writes = pubkey_writes.entry(pubkey_bytes).or_insert(1); // write version 0 is reserved for snapshots
+                        write.write_version = *writes as u64;
+                        *writes += 1;
                     },
                     accountsdb_proto::update::UpdateOneof::Ping(_) => {},
                 }
@@ -163,8 +195,8 @@
         snapshot = &mut snapshot_future => {
             let snapshot = snapshot??;
             if let OptionalContext::Context(snapshot_data) = snapshot {
-                info!("snapshot is for slot {}, min write slot was {}", snapshot_data.context.slot, lowest_write_slot);
-                if snapshot_data.context.slot >= lowest_write_slot + 1 {
+                info!("snapshot is for slot {}, first full slot was {}", snapshot_data.context.slot, first_full_slot);
+                if snapshot_data.context.slot >= first_full_slot {
                     sender
                         .send(Message::Snapshot(snapshot_data))
                         .await
@@ -173,7 +205,7 @@
                     info!(
                         "snapshot is too old: has slot {}, expected {} minimum",
                         snapshot_data.context.slot,
-                        lowest_write_slot + 1
+                        first_full_slot
                     );
                     // try again in another 10 slots
                     snapshot_needed = true;
@@ -256,7 +288,16 @@ pub async fn process_events(
         });
     }

-    let mut latest_write = HashMap::<Vec<u8>, (u64, u64)>::new();
+    // slot -> (pubkey -> write_version)
+    //
+    // To avoid unnecessarily sending requests to SQL, we track the latest write_version
+    // for each (slot, pubkey). If an already-seen write_version comes in, it can be safely
+    // discarded.
+    let mut latest_write = HashMap::<u64, HashMap<[u8; 32], u64>>::new();
+
+    // Number of slots to retain in latest_write
+    let latest_write_retention = 50;
+
     let mut metric_account_writes = metrics_sender.register_u64("grpc_account_writes".into());
     let mut metric_account_queue = metrics_sender.register_u64("account_write_queue".into());
     let mut metric_slot_queue = metrics_sender.register_u64("slot_update_queue".into());
@@ -278,18 +319,15 @@
                 metric_account_writes.increment();
                 metric_account_queue.set(account_write_queue_sender.len() as u64);

-                // Each validator produces writes in strictly monotonous order.
-                // This early-out allows skipping postgres queries for the node
-                // that is behind.
-                if let Some((slot, write_version)) = latest_write.get(&update.pubkey) {
-                    if *slot > update.slot
-                        || (*slot == update.slot && *write_version > update.write_version)
-                    {
-                        continue;
-                    }
+                // Skip writes that a different server has already sent
+                let pubkey_writes = latest_write.entry(update.slot).or_default();
+                let pubkey_bytes = Pubkey::new(&update.pubkey).to_bytes();
+                let writes = pubkey_writes.entry(pubkey_bytes).or_insert(0);
+                if update.write_version <= *writes {
+                    continue;
                 }
-                latest_write
-                    .insert(update.pubkey.clone(), (update.slot, update.write_version));
+                *writes = update.write_version;
+                latest_write.retain(|&k, _| k >= update.slot.saturating_sub(latest_write_retention));

                 account_write_queue_sender
                     .send(AccountWrite {
@@ -331,6 +369,7 @@
                         .expect("send success");
                 }
                 accountsdb_proto::update::UpdateOneof::Ping(_) => {}
+                accountsdb_proto::update::UpdateOneof::SubscribeResponse(_) => {}
             }
         }
         Message::Snapshot(update) => {
diff --git a/proto/accountsdb.proto b/proto/accountsdb.proto
index 15e9d07..2c2546e 100644
--- a/proto/accountsdb.proto
+++ b/proto/accountsdb.proto
@@ -18,6 +18,7 @@ message Update {
   oneof update_oneof {
     AccountWrite account_write = 1;
     SlotUpdate slot_update = 2;
     Ping ping = 3;
+    SubscribeResponse subscribe_response = 4;
   }
 }
@@ -46,3 +47,7 @@ message SlotUpdate {

 message Ping {
 }
+
+message SubscribeResponse {
+  uint64 highest_write_slot = 1;
+}
\ No newline at end of file