Use enum for slot status

This commit is contained in:
Christian Kamm 2021-11-08 12:39:56 +01:00
parent 7f6e25e107
commit a17f142875
12 changed files with 63 additions and 41 deletions

View File

@ -75,22 +75,22 @@ See `scripts/` for SQL that creates the target schema.
The Connector streams data into the `account_write` and `slot` tables. When The Connector streams data into the `account_write` and `slot` tables. When
slots become "rooted", older `account_write` data rooted slots is deleted. That slots become "rooted", older `account_write` data rooted slots is deleted. That
way the current account data for the latest rooted, committed or processed slot way the current account data for the latest rooted, confirmed or processed slot
can be queried, but older data is forgotten. can be queried, but older data is forgotten.
When new slots arrive, the `uncle` column is updated for "processed" and When new slots arrive, the `uncle` column is updated for "processed" and
"committed" slots to allow easy filtering of slots that are no longer part of "confirmed" slots to allow easy filtering of slots that are no longer part of
the chain. the chain.
Example for querying committed data: Example for querying confirmed data:
``` ```
SELECT DISTINCT ON(pubkey) * SELECT DISTINCT ON(pubkey) *
FROM account_write FROM account_write
INNER JOIN slot USING(slot) INNER JOIN slot USING(slot)
WHERE status = "rooted" OR (uncle = FALSE AND status = "committed") WHERE status = 'Rooted' OR (uncle = FALSE AND status = 'Confirmed')
ORDER BY pubkey, slot DESC, write_version DESC; ORDER BY pubkey, slot DESC, write_version DESC;
``` ```
For each pubkey, this gets the latest (most recent slot, most recent For each pubkey, this gets the latest (most recent slot, most recent
write_version) account data; limited to slots that are either rooted or write_version) account data; limited to slots that are either rooted or
(committed and not an uncle). (confirmed and not an uncle).

View File

@ -2,6 +2,12 @@
* This plugin implementation for PostgreSQL requires the following tables * This plugin implementation for PostgreSQL requires the following tables
*/ */
CREATE TYPE "SlotStatus" AS ENUM (
'Rooted',
'Confirmed',
'Processed'
);
-- The table storing account writes, keeping only the newest write_version per slot -- The table storing account writes, keeping only the newest write_version per slot
CREATE TABLE account_write ( CREATE TABLE account_write (
pubkey VARCHAR(44) NOT NULL, pubkey VARCHAR(44) NOT NULL,
@ -19,9 +25,10 @@ CREATE TABLE account_write (
CREATE TABLE slot ( CREATE TABLE slot (
slot BIGINT PRIMARY KEY, slot BIGINT PRIMARY KEY,
parent BIGINT, parent BIGINT,
status VARCHAR(16) NOT NULL, status "SlotStatus" NOT NULL,
uncle BOOL NOT NULL uncle BOOL NOT NULL
); );
CREATE INDEX ON slot (parent);
CREATE TYPE "PerpAccount" AS ( CREATE TYPE "PerpAccount" AS (
base_position INT8, base_position INT8,

View File

@ -5,15 +5,15 @@ CREATE VIEW account_rooted AS
* *
FROM account_write FROM account_write
INNER JOIN slot USING(slot) INNER JOIN slot USING(slot)
WHERE slot.status = 'rooted' WHERE slot.status = 'Rooted'
ORDER BY pubkey, slot DESC, write_version DESC; ORDER BY pubkey, slot DESC, write_version DESC;
CREATE VIEW account_committed AS CREATE VIEW account_confirmed AS
SELECT SELECT
DISTINCT ON(pubkey) DISTINCT ON(pubkey)
* *
FROM account_write FROM account_write
INNER JOIN slot USING(slot) INNER JOIN slot USING(slot)
WHERE (slot.status = 'committed' AND NOT slot.uncle) OR slot.status = 'rooted' WHERE (slot.status = 'Confirmed' AND NOT slot.uncle) OR slot.status = 'Rooted'
ORDER BY pubkey, slot DESC, write_version DESC; ORDER BY pubkey, slot DESC, write_version DESC;
CREATE VIEW account_processed AS CREATE VIEW account_processed AS
SELECT SELECT
@ -21,7 +21,7 @@ CREATE VIEW account_processed AS
* *
FROM account_write FROM account_write
INNER JOIN slot USING(slot) INNER JOIN slot USING(slot)
WHERE ((slot.status = 'committed' OR slot.status = 'processed') AND NOT slot.uncle) OR slot.status = 'rooted' WHERE ((slot.status = 'Confirmed' OR slot.status = 'Processed') AND NOT slot.uncle) OR slot.status = 'Rooted'
ORDER BY pubkey, slot DESC, write_version DESC; ORDER BY pubkey, slot DESC, write_version DESC;
CREATE VIEW mango_account_rooted AS CREATE VIEW mango_account_rooted AS
@ -30,15 +30,15 @@ CREATE VIEW mango_account_rooted AS
* *
FROM mango_account_write FROM mango_account_write
INNER JOIN slot USING(slot) INNER JOIN slot USING(slot)
WHERE slot.status = 'rooted' WHERE slot.status = 'Rooted'
ORDER BY pubkey, slot DESC, write_version DESC; ORDER BY pubkey, slot DESC, write_version DESC;
CREATE VIEW mango_account_committed AS CREATE VIEW mango_account_confirmed AS
SELECT SELECT
DISTINCT ON(pubkey) DISTINCT ON(pubkey)
* *
FROM mango_account_write FROM mango_account_write
INNER JOIN slot USING(slot) INNER JOIN slot USING(slot)
WHERE (slot.status = 'committed' AND NOT slot.uncle) OR slot.status = 'rooted' WHERE (slot.status = 'Confirmed' AND NOT slot.uncle) OR slot.status = 'Rooted'
ORDER BY pubkey, slot DESC, write_version DESC; ORDER BY pubkey, slot DESC, write_version DESC;
CREATE VIEW mango_account_processed AS CREATE VIEW mango_account_processed AS
SELECT SELECT
@ -46,7 +46,7 @@ CREATE VIEW mango_account_processed AS
* *
FROM mango_account_write FROM mango_account_write
INNER JOIN slot USING(slot) INNER JOIN slot USING(slot)
WHERE ((slot.status = 'committed' OR slot.status = 'processed') AND NOT slot.uncle) OR slot.status = 'rooted' WHERE ((slot.status = 'Confirmed' OR slot.status = 'Processed') AND NOT slot.uncle) OR slot.status = 'Rooted'
ORDER BY pubkey, slot DESC, write_version DESC; ORDER BY pubkey, slot DESC, write_version DESC;
CREATE VIEW mango_account_processed_balance AS CREATE VIEW mango_account_processed_balance AS

View File

@ -1,9 +1,9 @@
DROP VIEW account_rooted; DROP VIEW account_rooted;
DROP VIEW account_committed; DROP VIEW account_confirmed;
DROP VIEW account_processed; DROP VIEW account_processed;
DROP VIEW mango_account_rooted; DROP VIEW mango_account_rooted;
DROP VIEW mango_account_committed; DROP VIEW mango_account_confirmed;
DROP VIEW mango_account_processed; DROP VIEW mango_account_processed;
DROP VIEW mango_account_processed_balance; DROP VIEW mango_account_processed_balance;

View File

@ -2,6 +2,12 @@
* This plugin implementation for PostgreSQL requires the following tables * This plugin implementation for PostgreSQL requires the following tables
*/ */
CREATE TYPE "SlotStatus" AS ENUM (
'Rooted',
'Confirmed',
'Processed'
);
-- The table storing account writes, keeping only the newest write_version per slot -- The table storing account writes, keeping only the newest write_version per slot
CREATE TABLE account_write ( CREATE TABLE account_write (
pubkey VARCHAR(44) NOT NULL, pubkey VARCHAR(44) NOT NULL,
@ -19,6 +25,7 @@ CREATE TABLE account_write (
CREATE TABLE slot ( CREATE TABLE slot (
slot BIGINT PRIMARY KEY, slot BIGINT PRIMARY KEY,
parent BIGINT, parent BIGINT,
status VARCHAR(16) NOT NULL, status "SlotStatus" NOT NULL,
uncle BOOL NOT NULL uncle BOOL NOT NULL
); );
CREATE INDEX ON slot (parent);

View File

@ -5,15 +5,15 @@ CREATE VIEW account_rooted AS
* *
FROM account_write FROM account_write
INNER JOIN slot USING(slot) INNER JOIN slot USING(slot)
WHERE slot.status = 'rooted' WHERE slot.status = 'Rooted'
ORDER BY pubkey, slot DESC, write_version DESC; ORDER BY pubkey, slot DESC, write_version DESC;
CREATE VIEW account_committed AS CREATE VIEW account_confirmed AS
SELECT SELECT
DISTINCT ON(pubkey) DISTINCT ON(pubkey)
* *
FROM account_write FROM account_write
INNER JOIN slot USING(slot) INNER JOIN slot USING(slot)
WHERE (slot.status = 'committed' AND NOT slot.uncle) OR slot.status = 'rooted' WHERE (slot.status = 'Confirmed' AND NOT slot.uncle) OR slot.status = 'Rooted'
ORDER BY pubkey, slot DESC, write_version DESC; ORDER BY pubkey, slot DESC, write_version DESC;
CREATE VIEW account_processed AS CREATE VIEW account_processed AS
SELECT SELECT
@ -21,5 +21,5 @@ CREATE VIEW account_processed AS
* *
FROM account_write FROM account_write
INNER JOIN slot USING(slot) INNER JOIN slot USING(slot)
WHERE ((slot.status = 'committed' OR slot.status = 'processed') AND NOT slot.uncle) OR slot.status = 'rooted' WHERE ((slot.status = 'Confirmed' OR slot.status = 'Processed') AND NOT slot.uncle) OR slot.status = 'Rooted'
ORDER BY pubkey, slot DESC, write_version DESC; ORDER BY pubkey, slot DESC, write_version DESC;

View File

@ -4,3 +4,4 @@
DROP TABLE slot CASCADE; DROP TABLE slot CASCADE;
DROP TABLE account_write CASCADE; DROP TABLE account_write CASCADE;
DROP TYPE "SlotStatus";

View File

@ -1,3 +1,3 @@
DROP VIEW account_rooted; DROP VIEW account_rooted;
DROP VIEW account_committed; DROP VIEW account_confirmed;
DROP VIEW account_processed; DROP VIEW account_processed;

View File

@ -18,7 +18,7 @@ pub mod accountsdb_proto {
} }
use accountsdb_proto::accounts_db_client::AccountsDbClient; use accountsdb_proto::accounts_db_client::AccountsDbClient;
use crate::{AccountWrite, AnyhowWrap, Config, SlotUpdate}; use crate::{AccountWrite, AnyhowWrap, Config, SlotStatus, SlotUpdate};
type SnapshotData = Response<Vec<RpcKeyedAccount>>; type SnapshotData = Response<Vec<RpcKeyedAccount>>;
@ -181,13 +181,12 @@ pub async fn process_events(
} }
accountsdb_proto::update::UpdateOneof::SlotUpdate(update) => { accountsdb_proto::update::UpdateOneof::SlotUpdate(update) => {
use accountsdb_proto::slot_update::Status; use accountsdb_proto::slot_update::Status;
let status_string = match Status::from_i32(update.status) { let status = Status::from_i32(update.status).map(|v| match v {
Some(Status::Processed) => "processed", Status::Processed => SlotStatus::Processed,
Some(Status::Confirmed) => "confirmed", Status::Confirmed => SlotStatus::Confirmed,
Some(Status::Rooted) => "rooted", Status::Rooted => SlotStatus::Rooted,
None => "", });
}; if status.is_none() {
if status_string == "" {
error!("unexpected slot status: {}", update.status); error!("unexpected slot status: {}", update.status);
continue; continue;
} }
@ -195,7 +194,7 @@ pub async fn process_events(
.send(SlotUpdate { .send(SlotUpdate {
slot: update.slot as i64, // TODO: narrowing slot: update.slot as i64, // TODO: narrowing
parent: update.parent.map(|v| v as i64), parent: update.parent.map(|v| v as i64),
status: status_string.into(), status: status.expect("qed"),
}) })
.await .await
.expect("send success"); .expect("send success");

View File

@ -4,6 +4,7 @@ pub mod websocket_source;
use { use {
async_trait::async_trait, async_trait::async_trait,
postgres_types::ToSql,
serde_derive::Deserialize, serde_derive::Deserialize,
solana_sdk::{account::Account, pubkey::Pubkey}, solana_sdk::{account::Account, pubkey::Pubkey},
std::sync::Arc, std::sync::Arc,
@ -48,11 +49,18 @@ impl AccountWrite {
} }
} }
#[derive(Clone, PartialEq, Debug)] #[derive(Clone, Debug, PartialEq, ToSql)]
pub enum SlotStatus {
Rooted,
Confirmed,
Processed,
}
#[derive(Clone, Debug)]
pub struct SlotUpdate { pub struct SlotUpdate {
pub slot: i64, pub slot: i64,
pub parent: Option<i64>, pub parent: Option<i64>,
pub status: String, pub status: SlotStatus,
} }
#[derive(Clone, Debug, Deserialize)] #[derive(Clone, Debug, Deserialize)]

View File

@ -3,7 +3,7 @@ use log::*;
use postgres_query::{query, query_dyn}; use postgres_query::{query, query_dyn};
use std::{collections::HashMap, time::Duration}; use std::{collections::HashMap, time::Duration};
use crate::{AccountTables, AccountWrite, PostgresConfig, SlotUpdate}; use crate::{AccountTables, AccountWrite, PostgresConfig, SlotStatus, SlotUpdate};
async fn postgres_connection( async fn postgres_connection(
config: &PostgresConfig, config: &PostgresConfig,
@ -98,7 +98,7 @@ impl SlotsProcessing {
SELECT DISTINCT ON(pubkey) pubkey, slot, write_version SELECT DISTINCT ON(pubkey) pubkey, slot, write_version
FROM {table} FROM {table}
INNER JOIN slot USING(slot) INNER JOIN slot USING(slot)
WHERE slot <= $newest_final_slot AND status = 'rooted' WHERE slot <= $newest_final_slot AND status = 'Rooted'
ORDER BY pubkey, slot DESC, write_version DESC ORDER BY pubkey, slot DESC, write_version DESC
) latest_write ) latest_write
WHERE data.pubkey = latest_write.pubkey WHERE data.pubkey = latest_write.pubkey
@ -148,7 +148,7 @@ impl SlotsProcessing {
let _ = query.execute(client).await.context("updating slot row")?; let _ = query.execute(client).await.context("updating slot row")?;
} }
if update.status == "rooted" { if update.status == SlotStatus::Rooted {
self.slots.remove(&update.slot); self.slots.remove(&update.slot);
// TODO: should also convert all parents to rooted, just in case we missed an update? // TODO: should also convert all parents to rooted, just in case we missed an update?
@ -196,7 +196,7 @@ impl SlotsProcessing {
UNION ALL UNION ALL
SELECT s.*, depth + 1 FROM slot s SELECT s.*, depth + 1 FROM slot s
INNER JOIN liveslots l ON s.slot = l.parent INNER JOIN liveslots l ON s.slot = l.parent
WHERE l.status != 'rooted' AND depth < 1000 WHERE l.status != 'Rooted' AND depth < 1000
), ),
min_slot AS (SELECT min(slot) AS min_slot FROM liveslots) min_slot AS (SELECT min(slot) AS min_slot FROM liveslots)
UPDATE slot SET UPDATE slot SET

View File

@ -17,7 +17,7 @@ use std::{
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use crate::{AccountWrite, AnyhowWrap, Config, SlotUpdate}; use crate::{AccountWrite, AnyhowWrap, Config, SlotStatus, SlotUpdate};
enum WebsocketMessage { enum WebsocketMessage {
SingleUpdate(Response<RpcKeyedAccount>), SingleUpdate(Response<RpcKeyedAccount>),
@ -167,7 +167,7 @@ pub async fn process_events(
} => Some(SlotUpdate { } => Some(SlotUpdate {
slot: slot as i64, // TODO: narrowing slot: slot as i64, // TODO: narrowing
parent: Some(parent as i64), parent: Some(parent as i64),
status: "processed".into(), status: SlotStatus::Processed,
}), }),
solana_client::rpc_response::SlotUpdate::OptimisticConfirmation { solana_client::rpc_response::SlotUpdate::OptimisticConfirmation {
slot, slot,
@ -175,13 +175,13 @@ pub async fn process_events(
} => Some(SlotUpdate { } => Some(SlotUpdate {
slot: slot as i64, // TODO: narrowing slot: slot as i64, // TODO: narrowing
parent: None, parent: None,
status: "confirmed".into(), status: SlotStatus::Confirmed,
}), }),
solana_client::rpc_response::SlotUpdate::Root { slot, .. } => { solana_client::rpc_response::SlotUpdate::Root { slot, .. } => {
Some(SlotUpdate { Some(SlotUpdate {
slot: slot as i64, // TODO: narrowing slot: slot as i64, // TODO: narrowing
parent: None, parent: None,
status: "rooted".into(), status: SlotStatus::Rooted,
}) })
} }
_ => None, _ => None,