From 42f07d58a6ad764a009513d553f9cbd7c392743c Mon Sep 17 00:00:00 2001 From: Christian Kamm Date: Wed, 23 Mar 2022 08:18:02 +0100 Subject: [PATCH] Keep slot, write_version etc as u64, convert only for postgres This makes the conversion to i64 something that's local to the postgres target, and doesn't affect unrelated code. --- connector-mango/src/mango.rs | 18 ++++++++---- lib/src/grpc_plugin_source.rs | 12 ++++---- lib/src/lib.rs | 34 +++++++++++++---------- lib/src/postgres_target.rs | 52 ++++++++++++++++++++++++++--------- lib/src/websocket_source.rs | 8 +++--- 5 files changed, 80 insertions(+), 44 deletions(-) diff --git a/connector-mango/src/mango.rs b/connector-mango/src/mango.rs index 1548c0f..c69dbcf 100644 --- a/connector-mango/src/mango.rs +++ b/connector-mango/src/mango.rs @@ -45,6 +45,8 @@ impl AccountTable for MangoAccountTable { let pubkey = encode_address(&account_write.pubkey); let data = MangoAccount::load_from_bytes(&account_write.data)?; + let slot = account_write.slot as i64; + let write_version = account_write.write_version as i64; let owner = encode_address(&data.owner); let mango_group = encode_address(&data.mango_group); let version = data.meta_data.version as i16; @@ -128,8 +130,8 @@ impl AccountTable for MangoAccountTable { ) ON CONFLICT (pubkey_id, slot, write_version) DO NOTHING", pubkey, - slot = account_write.slot, - write_version = account_write.write_version, + slot, + write_version, version, is_initialized = data.meta_data.is_initialized, extra_info, @@ -210,6 +212,8 @@ impl AccountTable for MangoGroupTable { let pubkey = encode_address(&account_write.pubkey); let data = MangoGroup::load_from_bytes(&account_write.data)?; + let slot = account_write.slot as i64; + let write_version = account_write.write_version as i64; let version = data.meta_data.version as i16; let extra_info = &data.meta_data.extra_info as &[u8]; let num_oracles = data.num_oracles as i64; @@ -296,8 +300,8 @@ impl AccountTable for MangoGroupTable { $padding) ON CONFLICT (pubkey_id, slot, write_version) DO NOTHING", pubkey, - slot = account_write.slot, - write_version = account_write.write_version, + slot, + write_version, version, is_initialized = data.meta_data.is_initialized, extra_info, @@ -364,6 +368,8 @@ impl AccountTable for MangoCacheTable { let pubkey = encode_address(&account_write.pubkey); let data = MangoCache::load_from_bytes(&account_write.data)?; + let slot = account_write.slot as i64; + let write_version = account_write.write_version as i64; let version = data.meta_data.version as i16; let extra_info = &data.meta_data.extra_info as &[u8]; let price_cache = data @@ -405,8 +411,8 @@ impl AccountTable for MangoCacheTable { $price_cache, $root_bank_cache, $perp_market_cache) ON CONFLICT (pubkey_id, slot, write_version) DO NOTHING", pubkey, - slot = account_write.slot, - write_version = account_write.write_version, + slot, + write_version, version, is_initialized = data.meta_data.is_initialized, extra_info, diff --git a/lib/src/grpc_plugin_source.rs b/lib/src/grpc_plugin_source.rs index 76079b2..c7a9ad6 100644 --- a/lib/src/grpc_plugin_source.rs +++ b/lib/src/grpc_plugin_source.rs @@ -333,12 +333,12 @@ pub async fn process_events( account_write_queue_sender .send(AccountWrite { pubkey: Pubkey::new(&update.pubkey), - slot: update.slot as i64, // TODO: narrowing - write_version: update.write_version as i64, - lamports: update.lamports as i64, + slot: update.slot, + write_version: update.write_version, + lamports: update.lamports, owner: Pubkey::new(&update.owner), executable: update.executable, - rent_epoch: update.rent_epoch as i64, + rent_epoch: update.rent_epoch, data: update.data, is_selected: update.is_selected, }) @@ -360,8 +360,8 @@ pub async fn process_events( continue; } let slot_update = SlotUpdate { - slot: update.slot as i64, // TODO: narrowing - parent: update.parent.map(|v| v as i64), + slot: update.slot, + parent: update.parent, status: status.expect("qed"), }; diff --git a/lib/src/lib.rs b/lib/src/lib.rs index 7f11052..2f724a9 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -27,26 +27,26 @@ impl AnyhowWrap for Result { #[derive(Clone, PartialEq, Debug)] pub struct AccountWrite { pub pubkey: Pubkey, - pub slot: i64, - pub write_version: i64, - pub lamports: i64, + pub slot: u64, + pub write_version: u64, + pub lamports: u64, pub owner: Pubkey, pub executable: bool, - pub rent_epoch: i64, + pub rent_epoch: u64, pub data: Vec, pub is_selected: bool, } impl AccountWrite { - fn from(pubkey: Pubkey, slot: u64, write_version: i64, account: Account) -> AccountWrite { + fn from(pubkey: Pubkey, slot: u64, write_version: u64, account: Account) -> AccountWrite { AccountWrite { pubkey, - slot: slot as i64, // TODO: narrowing! + slot: slot, write_version, - lamports: account.lamports as i64, // TODO: narrowing! + lamports: account.lamports, owner: account.owner, executable: account.executable, - rent_epoch: account.rent_epoch as i64, // TODO: narrowing! + rent_epoch: account.rent_epoch, data: account.data, is_selected: true, } @@ -62,9 +62,9 @@ pub enum SlotStatus { #[derive(Clone, Debug)] pub struct SlotUpdate { - pub slot: i64, - pub parent: Option, - pub status: SlotStatus, + pub slot: u64, + pub parent: Option, + pub status: chain_data::SlotStatus, } #[derive(Clone, Debug, Deserialize)] @@ -157,6 +157,10 @@ impl AccountTable for RawAccountTable { ) -> anyhow::Result<()> { let pubkey = encode_address(&account_write.pubkey); let owner = encode_address(&account_write.owner); + let slot = account_write.slot as i64; + let write_version = account_write.write_version as i64; + let lamports = account_write.lamports as i64; + let rent_epoch = account_write.rent_epoch as i64; // TODO: should update for same write_version to work with websocket input let query = postgres_query::query!( @@ -168,13 +172,13 @@ impl AccountTable for RawAccountTable { map_pubkey($owner), $lamports, $executable, $rent_epoch, $data) ON CONFLICT (pubkey_id, slot, write_version) DO NOTHING", pubkey, - slot = account_write.slot, - write_version = account_write.write_version, + slot, + write_version, is_selected = account_write.is_selected, owner, - lamports = account_write.lamports, + lamports, executable = account_write.executable, - rent_epoch = account_write.rent_epoch, + rent_epoch, data = account_write.data, ); let _ = query.execute(client).await?; diff --git a/lib/src/postgres_target.rs b/lib/src/postgres_target.rs index bd0aabf..7a587e7 100644 --- a/lib/src/postgres_target.rs +++ b/lib/src/postgres_target.rs @@ -7,6 +7,25 @@ use std::{collections::HashMap, convert::TryFrom, time::Duration}; use crate::{metrics, AccountTables, AccountWrite, PostgresConfig, SlotStatus, SlotUpdate}; +mod pg { + #[derive(Clone, Copy, Debug, PartialEq, postgres_types::ToSql)] + pub enum SlotStatus { + Rooted, + Confirmed, + Processed, + } + + impl From for SlotStatus { + fn from(status: super::SlotStatus) -> SlotStatus { + match status { + super::SlotStatus::Rooted => SlotStatus::Rooted, + super::SlotStatus::Confirmed => SlotStatus::Confirmed, + super::SlotStatus::Processed => SlotStatus::Processed, + } + } + } +} + async fn postgres_connection( config: &PostgresConfig, metric_retries: metrics::MetricU64, @@ -98,9 +117,9 @@ async fn process_account_write( struct Slots { // non-rooted only - slots: HashMap, - newest_processed_slot: Option, - newest_rooted_slot: Option, + slots: HashMap, + newest_processed_slot: Option, + newest_rooted_slot: Option, } #[derive(Default)] @@ -134,14 +153,16 @@ impl Slots { previous.parent = update.parent; result.parent_update = true; } - } else if update.slot > self.newest_rooted_slot.unwrap_or(-1) { + } else if self.newest_rooted_slot.is_none() + || update.slot > self.newest_rooted_slot.unwrap() + { self.slots.insert(update.slot, update.clone()); } else { result.discard_old = true; } if update.status == SlotStatus::Rooted { - let old_slots: Vec = self + let old_slots: Vec = self .slots .keys() .filter(|s| **s <= update.slot) @@ -150,13 +171,14 @@ impl Slots { for old_slot in old_slots { self.slots.remove(&old_slot); } - if self.newest_rooted_slot.unwrap_or(-1) < update.slot { + if self.newest_rooted_slot.is_none() || self.newest_rooted_slot.unwrap() < update.slot { self.newest_rooted_slot = Some(update.slot); result.new_rooted_head = true; } } - if self.newest_processed_slot.unwrap_or(-1) < update.slot { + if self.newest_processed_slot.is_none() || self.newest_processed_slot.unwrap() < update.slot + { self.newest_processed_slot = Some(update.slot); result.new_processed_head = true; } @@ -246,7 +268,10 @@ impl SlotsProcessing { update: &SlotUpdate, meta: &SlotPreprocessing, ) -> anyhow::Result<()> { + let slot = update.slot as i64; + let status: pg::SlotStatus = update.status.into(); if let Some(parent) = update.parent { + let parent = parent as i64; let query = query!( "INSERT INTO slot (slot, parent, status, uncle) @@ -254,9 +279,9 @@ impl SlotsProcessing { ($slot, $parent, $status, FALSE) ON CONFLICT (slot) DO UPDATE SET parent=$parent, status=$status", - slot = update.slot, - parent = parent, - status = update.status, + slot, + parent, + status, ); let _ = query.execute(client).await.context("updating slot row")?; } else { @@ -267,20 +292,21 @@ impl SlotsProcessing { ($slot, NULL, $status, FALSE) ON CONFLICT (slot) DO UPDATE SET status=$status", - slot = update.slot, - status = update.status, + slot, + status, ); let _ = query.execute(client).await.context("updating slot row")?; } if meta.new_rooted_head { + let slot = update.slot as i64; // Mark preceeding non-uncle slots as rooted let query = query!( "UPDATE slot SET status = 'Rooted' WHERE slot < $newest_final_slot AND (NOT uncle) AND status != 'Rooted'", - newest_final_slot = update.slot + newest_final_slot = slot ); let _ = query .execute(client) diff --git a/lib/src/websocket_source.rs b/lib/src/websocket_source.rs index be93b39..fc26aa7 100644 --- a/lib/src/websocket_source.rs +++ b/lib/src/websocket_source.rs @@ -169,21 +169,21 @@ pub async fn process_events( solana_client::rpc_response::SlotUpdate::CreatedBank { slot, parent, .. } => Some(SlotUpdate { - slot: slot as i64, // TODO: narrowing - parent: Some(parent as i64), + slot, + parent: Some(parent), status: SlotStatus::Processed, }), solana_client::rpc_response::SlotUpdate::OptimisticConfirmation { slot, .. } => Some(SlotUpdate { - slot: slot as i64, // TODO: narrowing + slot, parent: None, status: SlotStatus::Confirmed, }), solana_client::rpc_response::SlotUpdate::Root { slot, .. } => { Some(SlotUpdate { - slot: slot as i64, // TODO: narrowing + slot, parent: None, status: SlotStatus::Rooted, })