From a17f1428753e6526bde37d213e186ea81d095ee1 Mon Sep 17 00:00:00 2001 From: Christian Kamm Date: Mon, 8 Nov 2021 12:39:56 +0100 Subject: [PATCH] Use enum for slot status --- README.md | 10 +++++----- connector-mango/scripts/create_schema.sql | 9 ++++++++- connector-mango/scripts/create_views.sql | 16 ++++++++-------- connector-mango/scripts/drop_views.sql | 4 ++-- connector-raw/scripts/create_schema.sql | 9 ++++++++- connector-raw/scripts/create_views.sql | 8 ++++---- connector-raw/scripts/drop_schema.sql | 1 + connector-raw/scripts/drop_views.sql | 2 +- lib/src/grpc_plugin_source.rs | 17 ++++++++--------- lib/src/lib.rs | 12 ++++++++++-- lib/src/postgres_target.rs | 8 ++++---- lib/src/websocket_source.rs | 8 ++++---- 12 files changed, 63 insertions(+), 41 deletions(-) diff --git a/README.md b/README.md index 7acc5b9..c41071b 100644 --- a/README.md +++ b/README.md @@ -75,22 +75,22 @@ See `scripts/` for SQL that creates the target schema. The Connector streams data into the `account_write` and `slot` tables. When slots become "rooted", older `account_write` data rooted slots is deleted. That -way the current account data for the latest rooted, committed or processed slot +way the current account data for the latest rooted, confirmed or processed slot can be queried, but older data is forgotten. When new slots arrive, the `uncle` column is updated for "processed" and -"committed" slots to allow easy filtering of slots that are no longer part of +"confirmed" slots to allow easy filtering of slots that are no longer part of the chain. -Example for querying committed data: +Example for querying confirmed data: ``` SELECT DISTINCT ON(pubkey) * FROM account_write INNER JOIN slot USING(slot) -WHERE status = "rooted" OR (uncle = FALSE AND status = "committed") +WHERE status = 'Rooted' OR (uncle = FALSE AND status = 'Confirmed') ORDER BY pubkey, slot DESC, write_version DESC; ``` For each pubkey, this gets the latest (most recent slot, most recent write_version) account data; limited to slots that are either rooted or -(committed and not an uncle). +(confirmed and not an uncle). diff --git a/connector-mango/scripts/create_schema.sql b/connector-mango/scripts/create_schema.sql index 73292aa..411f32e 100644 --- a/connector-mango/scripts/create_schema.sql +++ b/connector-mango/scripts/create_schema.sql @@ -2,6 +2,12 @@ * This plugin implementation for PostgreSQL requires the following tables */ +CREATE TYPE "SlotStatus" AS ENUM ( + 'Rooted', + 'Confirmed', + 'Processed' +); + -- The table storing account writes, keeping only the newest write_version per slot CREATE TABLE account_write ( pubkey VARCHAR(44) NOT NULL, @@ -19,9 +25,10 @@ CREATE TABLE account_write ( CREATE TABLE slot ( slot BIGINT PRIMARY KEY, parent BIGINT, - status VARCHAR(16) NOT NULL, + status "SlotStatus" NOT NULL, uncle BOOL NOT NULL ); +CREATE INDEX ON slot (parent); CREATE TYPE "PerpAccount" AS ( base_position INT8, diff --git a/connector-mango/scripts/create_views.sql b/connector-mango/scripts/create_views.sql index f8196ab..af6eef7 100644 --- a/connector-mango/scripts/create_views.sql +++ b/connector-mango/scripts/create_views.sql @@ -5,15 +5,15 @@ CREATE VIEW account_rooted AS * FROM account_write INNER JOIN slot USING(slot) - WHERE slot.status = 'rooted' + WHERE slot.status = 'Rooted' ORDER BY pubkey, slot DESC, write_version DESC; -CREATE VIEW account_committed AS +CREATE VIEW account_confirmed AS SELECT DISTINCT ON(pubkey) * FROM account_write INNER JOIN slot USING(slot) - WHERE (slot.status = 'committed' AND NOT slot.uncle) OR slot.status = 'rooted' + WHERE (slot.status = 'Confirmed' AND NOT slot.uncle) OR slot.status = 'Rooted' ORDER BY pubkey, slot DESC, write_version DESC; CREATE VIEW account_processed AS SELECT @@ -21,7 +21,7 @@ CREATE VIEW account_processed AS * FROM account_write INNER JOIN slot USING(slot) - WHERE ((slot.status = 'committed' OR slot.status = 'processed') AND NOT slot.uncle) OR slot.status = 'rooted' + WHERE ((slot.status = 'Confirmed' OR slot.status = 'Processed') AND NOT slot.uncle) OR slot.status = 'Rooted' ORDER BY pubkey, slot DESC, write_version DESC; CREATE VIEW mango_account_rooted AS @@ -30,15 +30,15 @@ CREATE VIEW mango_account_rooted AS * FROM mango_account_write INNER JOIN slot USING(slot) - WHERE slot.status = 'rooted' + WHERE slot.status = 'Rooted' ORDER BY pubkey, slot DESC, write_version DESC; -CREATE VIEW mango_account_committed AS +CREATE VIEW mango_account_confirmed AS SELECT DISTINCT ON(pubkey) * FROM mango_account_write INNER JOIN slot USING(slot) - WHERE (slot.status = 'committed' AND NOT slot.uncle) OR slot.status = 'rooted' + WHERE (slot.status = 'Confirmed' AND NOT slot.uncle) OR slot.status = 'Rooted' ORDER BY pubkey, slot DESC, write_version DESC; CREATE VIEW mango_account_processed AS SELECT @@ -46,7 +46,7 @@ CREATE VIEW mango_account_processed AS * FROM mango_account_write INNER JOIN slot USING(slot) - WHERE ((slot.status = 'committed' OR slot.status = 'processed') AND NOT slot.uncle) OR slot.status = 'rooted' + WHERE ((slot.status = 'Confirmed' OR slot.status = 'Processed') AND NOT slot.uncle) OR slot.status = 'Rooted' ORDER BY pubkey, slot DESC, write_version DESC; CREATE VIEW mango_account_processed_balance AS diff --git a/connector-mango/scripts/drop_views.sql b/connector-mango/scripts/drop_views.sql index ecd8e1f..ac5208b 100644 --- a/connector-mango/scripts/drop_views.sql +++ b/connector-mango/scripts/drop_views.sql @@ -1,9 +1,9 @@ DROP VIEW account_rooted; -DROP VIEW account_committed; +DROP VIEW account_confirmed; DROP VIEW account_processed; DROP VIEW mango_account_rooted; -DROP VIEW mango_account_committed; +DROP VIEW mango_account_confirmed; DROP VIEW mango_account_processed; DROP VIEW mango_account_processed_balance; diff --git a/connector-raw/scripts/create_schema.sql b/connector-raw/scripts/create_schema.sql index b04ae4b..af4ef1a 100644 --- a/connector-raw/scripts/create_schema.sql +++ b/connector-raw/scripts/create_schema.sql @@ -2,6 +2,12 @@ * This plugin implementation for PostgreSQL requires the following tables */ +CREATE TYPE "SlotStatus" AS ENUM ( + 'Rooted', + 'Confirmed', + 'Processed' +); + -- The table storing account writes, keeping only the newest write_version per slot CREATE TABLE account_write ( pubkey VARCHAR(44) NOT NULL, @@ -19,6 +25,7 @@ CREATE TABLE account_write ( CREATE TABLE slot ( slot BIGINT PRIMARY KEY, parent BIGINT, - status VARCHAR(16) NOT NULL, + status "SlotStatus" NOT NULL, uncle BOOL NOT NULL ); +CREATE INDEX ON slot (parent); \ No newline at end of file diff --git a/connector-raw/scripts/create_views.sql b/connector-raw/scripts/create_views.sql index 5d15cc0..463657a 100644 --- a/connector-raw/scripts/create_views.sql +++ b/connector-raw/scripts/create_views.sql @@ -5,15 +5,15 @@ CREATE VIEW account_rooted AS * FROM account_write INNER JOIN slot USING(slot) - WHERE slot.status = 'rooted' + WHERE slot.status = 'Rooted' ORDER BY pubkey, slot DESC, write_version DESC; -CREATE VIEW account_committed AS +CREATE VIEW account_confirmed AS SELECT DISTINCT ON(pubkey) * FROM account_write INNER JOIN slot USING(slot) - WHERE (slot.status = 'committed' AND NOT slot.uncle) OR slot.status = 'rooted' + WHERE (slot.status = 'Confirmed' AND NOT slot.uncle) OR slot.status = 'Rooted' ORDER BY pubkey, slot DESC, write_version DESC; CREATE VIEW account_processed AS SELECT @@ -21,5 +21,5 @@ CREATE VIEW account_processed AS * FROM account_write INNER JOIN slot USING(slot) - WHERE ((slot.status = 'committed' OR slot.status = 'processed') AND NOT slot.uncle) OR slot.status = 'rooted' + WHERE ((slot.status = 'Confirmed' OR slot.status = 'Processed') AND NOT slot.uncle) OR slot.status = 'Rooted' ORDER BY pubkey, slot DESC, write_version DESC; diff --git a/connector-raw/scripts/drop_schema.sql b/connector-raw/scripts/drop_schema.sql index 5e0a0b2..9a2ba81 100644 --- a/connector-raw/scripts/drop_schema.sql +++ b/connector-raw/scripts/drop_schema.sql @@ -4,3 +4,4 @@ DROP TABLE slot CASCADE; DROP TABLE account_write CASCADE; +DROP TYPE "SlotStatus"; \ No newline at end of file diff --git a/connector-raw/scripts/drop_views.sql b/connector-raw/scripts/drop_views.sql index 791ffb5..ed9953b 100644 --- a/connector-raw/scripts/drop_views.sql +++ b/connector-raw/scripts/drop_views.sql @@ -1,3 +1,3 @@ DROP VIEW account_rooted; -DROP VIEW account_committed; +DROP VIEW account_confirmed; DROP VIEW account_processed; diff --git a/lib/src/grpc_plugin_source.rs b/lib/src/grpc_plugin_source.rs index 7d924bd..58a0b72 100644 --- a/lib/src/grpc_plugin_source.rs +++ b/lib/src/grpc_plugin_source.rs @@ -18,7 +18,7 @@ pub mod accountsdb_proto { } use accountsdb_proto::accounts_db_client::AccountsDbClient; -use crate::{AccountWrite, AnyhowWrap, Config, SlotUpdate}; +use crate::{AccountWrite, AnyhowWrap, Config, SlotStatus, SlotUpdate}; type SnapshotData = Response>; @@ -181,13 +181,12 @@ pub async fn process_events( } accountsdb_proto::update::UpdateOneof::SlotUpdate(update) => { use accountsdb_proto::slot_update::Status; - let status_string = match Status::from_i32(update.status) { - Some(Status::Processed) => "processed", - Some(Status::Confirmed) => "confirmed", - Some(Status::Rooted) => "rooted", - None => "", - }; - if status_string == "" { + let status = Status::from_i32(update.status).map(|v| match v { + Status::Processed => SlotStatus::Processed, + Status::Confirmed => SlotStatus::Confirmed, + Status::Rooted => SlotStatus::Rooted, + }); + if status.is_none() { error!("unexpected slot status: {}", update.status); continue; } @@ -195,7 +194,7 @@ pub async fn process_events( .send(SlotUpdate { slot: update.slot as i64, // TODO: narrowing parent: update.parent.map(|v| v as i64), - status: status_string.into(), + status: status.expect("qed"), }) .await .expect("send success"); diff --git a/lib/src/lib.rs b/lib/src/lib.rs index 6d96a39..ec0018b 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -4,6 +4,7 @@ pub mod websocket_source; use { async_trait::async_trait, + postgres_types::ToSql, serde_derive::Deserialize, solana_sdk::{account::Account, pubkey::Pubkey}, std::sync::Arc, @@ -48,11 +49,18 @@ impl AccountWrite { } } -#[derive(Clone, PartialEq, Debug)] +#[derive(Clone, Debug, PartialEq, ToSql)] +pub enum SlotStatus { + Rooted, + Confirmed, + Processed, +} + +#[derive(Clone, Debug)] pub struct SlotUpdate { pub slot: i64, pub parent: Option, - pub status: String, + pub status: SlotStatus, } #[derive(Clone, Debug, Deserialize)] diff --git a/lib/src/postgres_target.rs b/lib/src/postgres_target.rs index a63b324..a8186ac 100644 --- a/lib/src/postgres_target.rs +++ b/lib/src/postgres_target.rs @@ -3,7 +3,7 @@ use log::*; use postgres_query::{query, query_dyn}; use std::{collections::HashMap, time::Duration}; -use crate::{AccountTables, AccountWrite, PostgresConfig, SlotUpdate}; +use crate::{AccountTables, AccountWrite, PostgresConfig, SlotStatus, SlotUpdate}; async fn postgres_connection( config: &PostgresConfig, @@ -98,7 +98,7 @@ impl SlotsProcessing { SELECT DISTINCT ON(pubkey) pubkey, slot, write_version FROM {table} INNER JOIN slot USING(slot) - WHERE slot <= $newest_final_slot AND status = 'rooted' + WHERE slot <= $newest_final_slot AND status = 'Rooted' ORDER BY pubkey, slot DESC, write_version DESC ) latest_write WHERE data.pubkey = latest_write.pubkey @@ -148,7 +148,7 @@ impl SlotsProcessing { let _ = query.execute(client).await.context("updating slot row")?; } - if update.status == "rooted" { + if update.status == SlotStatus::Rooted { self.slots.remove(&update.slot); // TODO: should also convert all parents to rooted, just in case we missed an update? @@ -196,7 +196,7 @@ impl SlotsProcessing { UNION ALL SELECT s.*, depth + 1 FROM slot s INNER JOIN liveslots l ON s.slot = l.parent - WHERE l.status != 'rooted' AND depth < 1000 + WHERE l.status != 'Rooted' AND depth < 1000 ), min_slot AS (SELECT min(slot) AS min_slot FROM liveslots) UPDATE slot SET diff --git a/lib/src/websocket_source.rs b/lib/src/websocket_source.rs index 5800cb8..9b7abc3 100644 --- a/lib/src/websocket_source.rs +++ b/lib/src/websocket_source.rs @@ -17,7 +17,7 @@ use std::{ time::{Duration, Instant}, }; -use crate::{AccountWrite, AnyhowWrap, Config, SlotUpdate}; +use crate::{AccountWrite, AnyhowWrap, Config, SlotStatus, SlotUpdate}; enum WebsocketMessage { SingleUpdate(Response), @@ -167,7 +167,7 @@ pub async fn process_events( } => Some(SlotUpdate { slot: slot as i64, // TODO: narrowing parent: Some(parent as i64), - status: "processed".into(), + status: SlotStatus::Processed, }), solana_client::rpc_response::SlotUpdate::OptimisticConfirmation { slot, @@ -175,13 +175,13 @@ pub async fn process_events( } => Some(SlotUpdate { slot: slot as i64, // TODO: narrowing parent: None, - status: "confirmed".into(), + status: SlotStatus::Confirmed, }), solana_client::rpc_response::SlotUpdate::Root { slot, .. } => { Some(SlotUpdate { slot: slot as i64, // TODO: narrowing parent: None, - status: "rooted".into(), + status: SlotStatus::Rooted, }) } _ => None,