Keep slot, write_version etc as u64, convert only for postgres

This makes the conversion to i64 something that's local to the postgres
target, and doesn't affect unrelated code.
This commit is contained in:
Christian Kamm 2022-03-23 08:18:02 +01:00
parent 83e9c54fc0
commit 42f07d58a6
5 changed files with 80 additions and 44 deletions

View File

@ -45,6 +45,8 @@ impl AccountTable for MangoAccountTable {
let pubkey = encode_address(&account_write.pubkey); let pubkey = encode_address(&account_write.pubkey);
let data = MangoAccount::load_from_bytes(&account_write.data)?; let data = MangoAccount::load_from_bytes(&account_write.data)?;
let slot = account_write.slot as i64;
let write_version = account_write.write_version as i64;
let owner = encode_address(&data.owner); let owner = encode_address(&data.owner);
let mango_group = encode_address(&data.mango_group); let mango_group = encode_address(&data.mango_group);
let version = data.meta_data.version as i16; let version = data.meta_data.version as i16;
@ -128,8 +130,8 @@ impl AccountTable for MangoAccountTable {
) )
ON CONFLICT (pubkey_id, slot, write_version) DO NOTHING", ON CONFLICT (pubkey_id, slot, write_version) DO NOTHING",
pubkey, pubkey,
slot = account_write.slot, slot,
write_version = account_write.write_version, write_version,
version, version,
is_initialized = data.meta_data.is_initialized, is_initialized = data.meta_data.is_initialized,
extra_info, extra_info,
@ -210,6 +212,8 @@ impl AccountTable for MangoGroupTable {
let pubkey = encode_address(&account_write.pubkey); let pubkey = encode_address(&account_write.pubkey);
let data = MangoGroup::load_from_bytes(&account_write.data)?; let data = MangoGroup::load_from_bytes(&account_write.data)?;
let slot = account_write.slot as i64;
let write_version = account_write.write_version as i64;
let version = data.meta_data.version as i16; let version = data.meta_data.version as i16;
let extra_info = &data.meta_data.extra_info as &[u8]; let extra_info = &data.meta_data.extra_info as &[u8];
let num_oracles = data.num_oracles as i64; let num_oracles = data.num_oracles as i64;
@ -296,8 +300,8 @@ impl AccountTable for MangoGroupTable {
$padding) $padding)
ON CONFLICT (pubkey_id, slot, write_version) DO NOTHING", ON CONFLICT (pubkey_id, slot, write_version) DO NOTHING",
pubkey, pubkey,
slot = account_write.slot, slot,
write_version = account_write.write_version, write_version,
version, version,
is_initialized = data.meta_data.is_initialized, is_initialized = data.meta_data.is_initialized,
extra_info, extra_info,
@ -364,6 +368,8 @@ impl AccountTable for MangoCacheTable {
let pubkey = encode_address(&account_write.pubkey); let pubkey = encode_address(&account_write.pubkey);
let data = MangoCache::load_from_bytes(&account_write.data)?; let data = MangoCache::load_from_bytes(&account_write.data)?;
let slot = account_write.slot as i64;
let write_version = account_write.write_version as i64;
let version = data.meta_data.version as i16; let version = data.meta_data.version as i16;
let extra_info = &data.meta_data.extra_info as &[u8]; let extra_info = &data.meta_data.extra_info as &[u8];
let price_cache = data let price_cache = data
@ -405,8 +411,8 @@ impl AccountTable for MangoCacheTable {
$price_cache, $root_bank_cache, $perp_market_cache) $price_cache, $root_bank_cache, $perp_market_cache)
ON CONFLICT (pubkey_id, slot, write_version) DO NOTHING", ON CONFLICT (pubkey_id, slot, write_version) DO NOTHING",
pubkey, pubkey,
slot = account_write.slot, slot,
write_version = account_write.write_version, write_version,
version, version,
is_initialized = data.meta_data.is_initialized, is_initialized = data.meta_data.is_initialized,
extra_info, extra_info,

View File

@ -333,12 +333,12 @@ pub async fn process_events(
account_write_queue_sender account_write_queue_sender
.send(AccountWrite { .send(AccountWrite {
pubkey: Pubkey::new(&update.pubkey), pubkey: Pubkey::new(&update.pubkey),
slot: update.slot as i64, // TODO: narrowing slot: update.slot,
write_version: update.write_version as i64, write_version: update.write_version,
lamports: update.lamports as i64, lamports: update.lamports,
owner: Pubkey::new(&update.owner), owner: Pubkey::new(&update.owner),
executable: update.executable, executable: update.executable,
rent_epoch: update.rent_epoch as i64, rent_epoch: update.rent_epoch,
data: update.data, data: update.data,
is_selected: update.is_selected, is_selected: update.is_selected,
}) })
@ -360,8 +360,8 @@ pub async fn process_events(
continue; continue;
} }
let slot_update = SlotUpdate { let slot_update = SlotUpdate {
slot: update.slot as i64, // TODO: narrowing slot: update.slot,
parent: update.parent.map(|v| v as i64), parent: update.parent,
status: status.expect("qed"), status: status.expect("qed"),
}; };

View File

@ -27,26 +27,26 @@ impl<T, E: std::fmt::Debug> AnyhowWrap for Result<T, E> {
#[derive(Clone, PartialEq, Debug)] #[derive(Clone, PartialEq, Debug)]
pub struct AccountWrite { pub struct AccountWrite {
pub pubkey: Pubkey, pub pubkey: Pubkey,
pub slot: i64, pub slot: u64,
pub write_version: i64, pub write_version: u64,
pub lamports: i64, pub lamports: u64,
pub owner: Pubkey, pub owner: Pubkey,
pub executable: bool, pub executable: bool,
pub rent_epoch: i64, pub rent_epoch: u64,
pub data: Vec<u8>, pub data: Vec<u8>,
pub is_selected: bool, pub is_selected: bool,
} }
impl AccountWrite { impl AccountWrite {
fn from(pubkey: Pubkey, slot: u64, write_version: i64, account: Account) -> AccountWrite { fn from(pubkey: Pubkey, slot: u64, write_version: u64, account: Account) -> AccountWrite {
AccountWrite { AccountWrite {
pubkey, pubkey,
slot: slot as i64, // TODO: narrowing! slot: slot,
write_version, write_version,
lamports: account.lamports as i64, // TODO: narrowing! lamports: account.lamports,
owner: account.owner, owner: account.owner,
executable: account.executable, executable: account.executable,
rent_epoch: account.rent_epoch as i64, // TODO: narrowing! rent_epoch: account.rent_epoch,
data: account.data, data: account.data,
is_selected: true, is_selected: true,
} }
@ -62,9 +62,9 @@ pub enum SlotStatus {
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct SlotUpdate { pub struct SlotUpdate {
pub slot: i64, pub slot: u64,
pub parent: Option<i64>, pub parent: Option<u64>,
pub status: SlotStatus, pub status: chain_data::SlotStatus,
} }
#[derive(Clone, Debug, Deserialize)] #[derive(Clone, Debug, Deserialize)]
@ -157,6 +157,10 @@ impl AccountTable for RawAccountTable {
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let pubkey = encode_address(&account_write.pubkey); let pubkey = encode_address(&account_write.pubkey);
let owner = encode_address(&account_write.owner); let owner = encode_address(&account_write.owner);
let slot = account_write.slot as i64;
let write_version = account_write.write_version as i64;
let lamports = account_write.lamports as i64;
let rent_epoch = account_write.rent_epoch as i64;
// TODO: should update for same write_version to work with websocket input // TODO: should update for same write_version to work with websocket input
let query = postgres_query::query!( let query = postgres_query::query!(
@ -168,13 +172,13 @@ impl AccountTable for RawAccountTable {
map_pubkey($owner), $lamports, $executable, $rent_epoch, $data) map_pubkey($owner), $lamports, $executable, $rent_epoch, $data)
ON CONFLICT (pubkey_id, slot, write_version) DO NOTHING", ON CONFLICT (pubkey_id, slot, write_version) DO NOTHING",
pubkey, pubkey,
slot = account_write.slot, slot,
write_version = account_write.write_version, write_version,
is_selected = account_write.is_selected, is_selected = account_write.is_selected,
owner, owner,
lamports = account_write.lamports, lamports,
executable = account_write.executable, executable = account_write.executable,
rent_epoch = account_write.rent_epoch, rent_epoch,
data = account_write.data, data = account_write.data,
); );
let _ = query.execute(client).await?; let _ = query.execute(client).await?;

View File

@ -7,6 +7,25 @@ use std::{collections::HashMap, convert::TryFrom, time::Duration};
use crate::{metrics, AccountTables, AccountWrite, PostgresConfig, SlotStatus, SlotUpdate}; use crate::{metrics, AccountTables, AccountWrite, PostgresConfig, SlotStatus, SlotUpdate};
mod pg {
#[derive(Clone, Copy, Debug, PartialEq, postgres_types::ToSql)]
pub enum SlotStatus {
Rooted,
Confirmed,
Processed,
}
impl From<super::SlotStatus> for SlotStatus {
fn from(status: super::SlotStatus) -> SlotStatus {
match status {
super::SlotStatus::Rooted => SlotStatus::Rooted,
super::SlotStatus::Confirmed => SlotStatus::Confirmed,
super::SlotStatus::Processed => SlotStatus::Processed,
}
}
}
}
async fn postgres_connection( async fn postgres_connection(
config: &PostgresConfig, config: &PostgresConfig,
metric_retries: metrics::MetricU64, metric_retries: metrics::MetricU64,
@ -98,9 +117,9 @@ async fn process_account_write(
struct Slots { struct Slots {
// non-rooted only // non-rooted only
slots: HashMap<i64, SlotUpdate>, slots: HashMap<u64, SlotUpdate>,
newest_processed_slot: Option<i64>, newest_processed_slot: Option<u64>,
newest_rooted_slot: Option<i64>, newest_rooted_slot: Option<u64>,
} }
#[derive(Default)] #[derive(Default)]
@ -134,14 +153,16 @@ impl Slots {
previous.parent = update.parent; previous.parent = update.parent;
result.parent_update = true; result.parent_update = true;
} }
} else if update.slot > self.newest_rooted_slot.unwrap_or(-1) { } else if self.newest_rooted_slot.is_none()
|| update.slot > self.newest_rooted_slot.unwrap()
{
self.slots.insert(update.slot, update.clone()); self.slots.insert(update.slot, update.clone());
} else { } else {
result.discard_old = true; result.discard_old = true;
} }
if update.status == SlotStatus::Rooted { if update.status == SlotStatus::Rooted {
let old_slots: Vec<i64> = self let old_slots: Vec<u64> = self
.slots .slots
.keys() .keys()
.filter(|s| **s <= update.slot) .filter(|s| **s <= update.slot)
@ -150,13 +171,14 @@ impl Slots {
for old_slot in old_slots { for old_slot in old_slots {
self.slots.remove(&old_slot); self.slots.remove(&old_slot);
} }
if self.newest_rooted_slot.unwrap_or(-1) < update.slot { if self.newest_rooted_slot.is_none() || self.newest_rooted_slot.unwrap() < update.slot {
self.newest_rooted_slot = Some(update.slot); self.newest_rooted_slot = Some(update.slot);
result.new_rooted_head = true; result.new_rooted_head = true;
} }
} }
if self.newest_processed_slot.unwrap_or(-1) < update.slot { if self.newest_processed_slot.is_none() || self.newest_processed_slot.unwrap() < update.slot
{
self.newest_processed_slot = Some(update.slot); self.newest_processed_slot = Some(update.slot);
result.new_processed_head = true; result.new_processed_head = true;
} }
@ -246,7 +268,10 @@ impl SlotsProcessing {
update: &SlotUpdate, update: &SlotUpdate,
meta: &SlotPreprocessing, meta: &SlotPreprocessing,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let slot = update.slot as i64;
let status: pg::SlotStatus = update.status.into();
if let Some(parent) = update.parent { if let Some(parent) = update.parent {
let parent = parent as i64;
let query = query!( let query = query!(
"INSERT INTO slot "INSERT INTO slot
(slot, parent, status, uncle) (slot, parent, status, uncle)
@ -254,9 +279,9 @@ impl SlotsProcessing {
($slot, $parent, $status, FALSE) ($slot, $parent, $status, FALSE)
ON CONFLICT (slot) DO UPDATE SET ON CONFLICT (slot) DO UPDATE SET
parent=$parent, status=$status", parent=$parent, status=$status",
slot = update.slot, slot,
parent = parent, parent,
status = update.status, status,
); );
let _ = query.execute(client).await.context("updating slot row")?; let _ = query.execute(client).await.context("updating slot row")?;
} else { } else {
@ -267,20 +292,21 @@ impl SlotsProcessing {
($slot, NULL, $status, FALSE) ($slot, NULL, $status, FALSE)
ON CONFLICT (slot) DO UPDATE SET ON CONFLICT (slot) DO UPDATE SET
status=$status", status=$status",
slot = update.slot, slot,
status = update.status, status,
); );
let _ = query.execute(client).await.context("updating slot row")?; let _ = query.execute(client).await.context("updating slot row")?;
} }
if meta.new_rooted_head { if meta.new_rooted_head {
let slot = update.slot as i64;
// Mark preceeding non-uncle slots as rooted // Mark preceeding non-uncle slots as rooted
let query = query!( let query = query!(
"UPDATE slot SET status = 'Rooted' "UPDATE slot SET status = 'Rooted'
WHERE slot < $newest_final_slot WHERE slot < $newest_final_slot
AND (NOT uncle) AND (NOT uncle)
AND status != 'Rooted'", AND status != 'Rooted'",
newest_final_slot = update.slot newest_final_slot = slot
); );
let _ = query let _ = query
.execute(client) .execute(client)

View File

@ -169,21 +169,21 @@ pub async fn process_events(
solana_client::rpc_response::SlotUpdate::CreatedBank { solana_client::rpc_response::SlotUpdate::CreatedBank {
slot, parent, .. slot, parent, ..
} => Some(SlotUpdate { } => Some(SlotUpdate {
slot: slot as i64, // TODO: narrowing slot,
parent: Some(parent as i64), parent: Some(parent),
status: SlotStatus::Processed, status: SlotStatus::Processed,
}), }),
solana_client::rpc_response::SlotUpdate::OptimisticConfirmation { solana_client::rpc_response::SlotUpdate::OptimisticConfirmation {
slot, slot,
.. ..
} => Some(SlotUpdate { } => Some(SlotUpdate {
slot: slot as i64, // TODO: narrowing slot,
parent: None, parent: None,
status: SlotStatus::Confirmed, status: SlotStatus::Confirmed,
}), }),
solana_client::rpc_response::SlotUpdate::Root { slot, .. } => { solana_client::rpc_response::SlotUpdate::Root { slot, .. } => {
Some(SlotUpdate { Some(SlotUpdate {
slot: slot as i64, // TODO: narrowing slot,
parent: None, parent: None,
status: SlotStatus::Rooted, status: SlotStatus::Rooted,
}) })