Track "is_selected" to denote closed/reused accounts

This commit is contained in:
Christian Kamm 2022-01-26 10:06:03 +01:00
parent 66266a3469
commit 80423fdac9
7 changed files with 134 additions and 69 deletions

View File

@ -65,6 +65,7 @@ CREATE TABLE account_write (
pubkey_id BIGINT NOT NULL REFERENCES pubkey,
slot BIGINT NOT NULL,
write_version BIGINT NOT NULL,
is_selected BOOL NOT NULL,
owner_id BIGINT REFERENCES pubkey,
lamports BIGINT NOT NULL,
executable BOOL NOT NULL,

View File

@ -1,72 +1,86 @@
-- Views for raw accounts
CREATE VIEW account_rooted AS
SELECT
SELECT pubkey, latest_writes.* FROM
(SELECT
DISTINCT ON(pubkey_id)
pubkey, account_write.*
account_write.*
FROM account_write
LEFT JOIN slot USING(slot)
INNER JOIN pubkey USING(pubkey_id)
CROSS JOIN (SELECT max(slot) FROM slot) ms
WHERE slot <= ms.max
AND (slot.status = 'Rooted' OR slot.status is NULL)
ORDER BY pubkey_id, slot DESC, write_version DESC;
ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes
LEFT JOIN pubkey USING(pubkey_id)
WHERE is_selected;
CREATE VIEW account_confirmed AS
SELECT
SELECT pubkey, latest_writes.* FROM
(SELECT
DISTINCT ON(pubkey_id)
pubkey, account_write.*
account_write.*
FROM account_write
LEFT JOIN slot USING(slot)
INNER JOIN pubkey USING(pubkey_id)
CROSS JOIN (SELECT max(slot) FROM slot) ms
WHERE slot <= ms.max
AND ((slot.status = 'Confirmed' AND NOT slot.uncle) OR slot.status = 'Rooted' OR slot.status is NULL)
ORDER BY pubkey_id, slot DESC, write_version DESC;
ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes
LEFT JOIN pubkey USING(pubkey_id)
WHERE is_selected;
CREATE VIEW account_processed AS
SELECT
SELECT pubkey, latest_writes.* FROM
(SELECT
DISTINCT ON(pubkey_id)
pubkey, account_write.*
account_write.*
FROM account_write
LEFT JOIN slot USING(slot)
INNER JOIN pubkey USING(pubkey_id)
CROSS JOIN (SELECT max(slot) FROM slot) ms
WHERE slot <= ms.max
AND (((slot.status = 'Confirmed' OR slot.status = 'Processed') AND NOT slot.uncle) OR slot.status = 'Rooted' OR slot.status is NULL)
ORDER BY pubkey_id, slot DESC, write_version DESC;
ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes
LEFT JOIN pubkey USING(pubkey_id)
WHERE is_selected;
CREATE VIEW mango_account_rooted AS
SELECT
SELECT pubkey, latest_writes.* FROM
(SELECT
DISTINCT ON(pubkey_id)
pubkey, mango_account_write.*
mango_account_write.*
FROM mango_account_write
LEFT JOIN slot USING(slot)
INNER JOIN pubkey using(pubkey_id)
CROSS JOIN (SELECT max(slot) FROM slot) ms
WHERE slot <= ms.max
AND (slot.status = 'Rooted' OR slot.status is NULL)
ORDER BY pubkey_id, slot DESC, write_version DESC;
ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes
LEFT JOIN account_write USING(pubkey_id, slot, write_version)
LEFT JOIN pubkey USING(pubkey_id)
WHERE is_selected;
CREATE VIEW mango_account_confirmed AS
SELECT
SELECT pubkey, latest_writes.* FROM
(SELECT
DISTINCT ON(pubkey_id)
pubkey, mango_account_write.*
mango_account_write.*
FROM mango_account_write
LEFT JOIN slot USING(slot)
INNER JOIN pubkey using(pubkey_id)
CROSS JOIN (SELECT max(slot) FROM slot) ms
WHERE slot <= ms.max
AND ((slot.status = 'Confirmed' AND NOT slot.uncle) OR slot.status = 'Rooted' OR slot.status is NULL)
ORDER BY pubkey_id, slot DESC, write_version DESC;
ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes
LEFT JOIN account_write USING(pubkey_id, slot, write_version)
LEFT JOIN pubkey USING(pubkey_id)
WHERE is_selected;
CREATE VIEW mango_account_processed AS
SELECT
SELECT pubkey, latest_writes.* FROM
(SELECT
DISTINCT ON(pubkey_id)
pubkey, mango_account_write.*
mango_account_write.*
FROM mango_account_write
LEFT JOIN slot USING(slot)
INNER JOIN pubkey using(pubkey_id)
CROSS JOIN (SELECT max(slot) FROM slot) ms
WHERE slot <= ms.max
AND (((slot.status = 'Confirmed' OR slot.status = 'Processed') AND NOT slot.uncle) OR slot.status = 'Rooted' OR slot.status is NULL)
ORDER BY pubkey_id, slot DESC, write_version DESC;
ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes
LEFT JOIN account_write USING(pubkey_id, slot, write_version)
LEFT JOIN pubkey USING(pubkey_id)
WHERE is_selected;
CREATE VIEW mango_account_processed_balance AS
SELECT
@ -90,69 +104,87 @@ CREATE VIEW mango_account_processed_perp AS
) q;
CREATE VIEW mango_group_rooted AS
SELECT
SELECT pubkey, latest_writes.* FROM
(SELECT
DISTINCT ON(pubkey_id)
pubkey, mango_group_write.*
mango_group_write.*
FROM mango_group_write
LEFT JOIN slot USING(slot)
INNER JOIN pubkey using(pubkey_id)
CROSS JOIN (SELECT max(slot) FROM slot) ms
WHERE slot <= ms.max
AND (slot.status = 'Rooted' OR slot.status is NULL)
ORDER BY pubkey_id, slot DESC, write_version DESC;
ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes
LEFT JOIN account_write USING(pubkey_id, slot, write_version)
LEFT JOIN pubkey USING(pubkey_id)
WHERE is_selected;
CREATE VIEW mango_group_confirmed AS
SELECT
SELECT pubkey, latest_writes.* FROM
(SELECT
DISTINCT ON(pubkey_id)
pubkey, mango_group_write.*
mango_group_write.*
FROM mango_group_write
LEFT JOIN slot USING(slot)
INNER JOIN pubkey using(pubkey_id)
CROSS JOIN (SELECT max(slot) FROM slot) ms
WHERE slot <= ms.max
AND ((slot.status = 'Confirmed' AND NOT slot.uncle) OR slot.status = 'Rooted' OR slot.status is NULL)
ORDER BY pubkey_id, slot DESC, write_version DESC;
ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes
LEFT JOIN account_write USING(pubkey_id, slot, write_version)
LEFT JOIN pubkey USING(pubkey_id)
WHERE is_selected;
CREATE VIEW mango_group_processed AS
SELECT
SELECT pubkey, latest_writes.* FROM
(SELECT
DISTINCT ON(pubkey_id)
pubkey, mango_group_write.*
mango_group_write.*
FROM mango_group_write
LEFT JOIN slot USING(slot)
INNER JOIN pubkey using(pubkey_id)
CROSS JOIN (SELECT max(slot) FROM slot) ms
WHERE slot <= ms.max
AND (((slot.status = 'Confirmed' OR slot.status = 'Processed') AND NOT slot.uncle) OR slot.status = 'Rooted' OR slot.status is NULL)
ORDER BY pubkey_id, slot DESC, write_version DESC;
ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes
LEFT JOIN account_write USING(pubkey_id, slot, write_version)
LEFT JOIN pubkey USING(pubkey_id)
WHERE is_selected;
CREATE VIEW mango_cache_rooted AS
SELECT
SELECT pubkey, latest_writes.* FROM
(SELECT
DISTINCT ON(pubkey_id)
pubkey, mango_cache_write.*
mango_cache_write.*
FROM mango_cache_write
LEFT JOIN slot USING(slot)
INNER JOIN pubkey using(pubkey_id)
CROSS JOIN (SELECT max(slot) FROM slot) ms
WHERE slot <= ms.max
AND (slot.status = 'Rooted' OR slot.status is NULL)
ORDER BY pubkey_id, slot DESC, write_version DESC;
ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes
LEFT JOIN account_write USING(pubkey_id, slot, write_version)
LEFT JOIN pubkey USING(pubkey_id)
WHERE is_selected;
CREATE VIEW mango_cache_confirmed AS
SELECT
SELECT pubkey, latest_writes.* FROM
(SELECT
DISTINCT ON(pubkey_id)
pubkey, mango_cache_write.*
mango_cache_write.*
FROM mango_cache_write
LEFT JOIN slot USING(slot)
INNER JOIN pubkey using(pubkey_id)
CROSS JOIN (SELECT max(slot) FROM slot) ms
WHERE slot <= ms.max
AND ((slot.status = 'Confirmed' AND NOT slot.uncle) OR slot.status = 'Rooted' OR slot.status is NULL)
ORDER BY pubkey_id, slot DESC, write_version DESC;
ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes
LEFT JOIN account_write USING(pubkey_id, slot, write_version)
LEFT JOIN pubkey USING(pubkey_id)
WHERE is_selected;
CREATE VIEW mango_cache_processed AS
SELECT
SELECT pubkey, latest_writes.* FROM
(SELECT
DISTINCT ON(pubkey_id)
pubkey, mango_cache_write.*
mango_cache_write.*
FROM mango_cache_write
LEFT JOIN slot USING(slot)
INNER JOIN pubkey using(pubkey_id)
CROSS JOIN (SELECT max(slot) FROM slot) ms
WHERE slot <= ms.max
AND (((slot.status = 'Confirmed' OR slot.status = 'Processed') AND NOT slot.uncle) OR slot.status = 'Rooted' OR slot.status is NULL)
ORDER BY pubkey_id, slot DESC, write_version DESC;
ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes
LEFT JOIN account_write USING(pubkey_id, slot, write_version)
LEFT JOIN pubkey USING(pubkey_id)
WHERE is_selected;

View File

@ -54,6 +54,7 @@ CREATE TABLE account_write (
pubkey_id BIGINT NOT NULL REFERENCES pubkey,
slot BIGINT NOT NULL,
write_version BIGINT NOT NULL,
is_selected BOOL NOT NULL,
owner_id BIGINT REFERENCES pubkey,
lamports BIGINT NOT NULL,
executable BOOL NOT NULL,

View File

@ -1,34 +1,40 @@
-- Views for raw accounts
CREATE VIEW account_rooted AS
SELECT
SELECT pubkey, latest_writes.* FROM
(SELECT
DISTINCT ON(pubkey_id)
pubkey, account_write.*
account_write.*
FROM account_write
LEFT JOIN slot USING(slot)
INNER JOIN pubkey USING(pubkey_id)
CROSS JOIN (SELECT max(slot) FROM slot) ms
WHERE slot <= ms.max
AND (slot.status = 'Rooted' OR slot.status is NULL)
ORDER BY pubkey_id, slot DESC, write_version DESC;
ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes
LEFT JOIN pubkey USING(pubkey_id)
WHERE is_selected;
CREATE VIEW account_confirmed AS
SELECT
SELECT pubkey, latest_writes.* FROM
(SELECT
DISTINCT ON(pubkey_id)
pubkey, account_write.*
account_write.*
FROM account_write
LEFT JOIN slot USING(slot)
INNER JOIN pubkey USING(pubkey_id)
CROSS JOIN (SELECT max(slot) FROM slot) ms
WHERE slot <= ms.max
AND ((slot.status = 'Confirmed' AND NOT slot.uncle) OR slot.status = 'Rooted' OR slot.status is NULL)
ORDER BY pubkey_id, slot DESC, write_version DESC;
ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes
LEFT JOIN pubkey USING(pubkey_id)
WHERE is_selected;
CREATE VIEW account_processed AS
SELECT
SELECT pubkey, latest_writes.* FROM
(SELECT
DISTINCT ON(pubkey_id)
pubkey, account_write.*
account_write.*
FROM account_write
LEFT JOIN slot USING(slot)
INNER JOIN pubkey USING(pubkey_id)
CROSS JOIN (SELECT max(slot) FROM slot) ms
WHERE slot <= ms.max
AND (((slot.status = 'Confirmed' OR slot.status = 'Processed') AND NOT slot.uncle) OR slot.status = 'Rooted' OR slot.status is NULL)
ORDER BY pubkey_id, slot DESC, write_version DESC;
ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes
LEFT JOIN pubkey USING(pubkey_id)
WHERE is_selected;

View File

@ -339,6 +339,7 @@ pub async fn process_events(
executable: update.executable,
rent_epoch: update.rent_epoch as i64,
data: update.data,
is_selected: update.is_selected,
})
.await
.expect("send success");

View File

@ -34,6 +34,7 @@ pub struct AccountWrite {
pub executable: bool,
pub rent_epoch: i64,
pub data: Vec<u8>,
pub is_selected: bool,
}
impl AccountWrite {
@ -47,6 +48,7 @@ impl AccountWrite {
executable: account.executable,
rent_epoch: account.rent_epoch as i64, // TODO: narrowing!
data: account.data,
is_selected: true,
}
}
}
@ -157,15 +159,16 @@ impl AccountTable for RawAccountTable {
// TODO: should update for same write_version to work with websocket input
let query = postgres_query::query!(
"INSERT INTO account_write
(pubkey_id, slot, write_version,
(pubkey_id, slot, write_version, is_selected,
owner_id, lamports, executable, rent_epoch, data)
VALUES
(map_pubkey($pubkey), $slot, $write_version,
(map_pubkey($pubkey), $slot, $write_version, $is_selected,
map_pubkey($owner), $lamports, $executable, $rent_epoch, $data)
ON CONFLICT (pubkey_id, slot, write_version) DO NOTHING",
pubkey,
slot = account_write.slot,
write_version = account_write.write_version,
is_selected = account_write.is_selected,
owner,
lamports = account_write.lamports,
executable = account_write.executable,

View File

@ -170,18 +170,34 @@ fn make_cleanup_steps(tables: &Vec<String>) -> HashMap<String, String> {
// Delete all account writes that came before the newest rooted slot except
// for the newest rooted write for each pubkey.
//
// This could be older rooted writes or writes in uncled slots that came
// before the newest rooted slot.
//
// Also delete _all_ writes from before the newest snapshot, because these may
// be for deleted accounts where the deletion event was missed. Snapshots
// provide a new state for all live accounts, but don't tell us about deleted
// accounts.
//
// The way this is done, by taking the newest snapshot that's at least
// min_snapshot_age behind the newest rooted slot is a workaround: we don't know
// how long it'll take to insert snapshot data, but assume it'll be done by that
// time.
let min_snapshot_age = 300;
steps.extend(
tables
.iter()
.map(|table_name| {
let sql = format!(
"WITH newest_rooted AS (SELECT max(slot) AS newest_rooted_slot FROM slot WHERE status = 'Rooted')
"WITH
newest_rooted AS (
SELECT max(slot) AS newest_rooted_slot FROM slot WHERE status = 'Rooted'),
newest_snapshot AS (
SELECT max(slot) AS newest_snapshot_slot FROM account_write, newest_rooted
WHERE write_version = 0 AND slot + {min_snapshot_age} < newest_rooted_slot)
DELETE FROM {table} AS data
USING
newest_rooted,
newest_snapshot,
(SELECT DISTINCT ON(pubkey_id) pubkey_id, slot, write_version
FROM {table}
LEFT JOIN slot USING(slot)
@ -189,10 +205,15 @@ fn make_cleanup_steps(tables: &Vec<String>) -> HashMap<String, String> {
WHERE slot <= newest_rooted_slot AND (status = 'Rooted' OR status is NULL)
ORDER BY pubkey_id, slot DESC, write_version DESC
) newest_rooted_write
WHERE data.pubkey_id = newest_rooted_write.pubkey_id
AND data.slot <= newest_rooted_slot
AND (data.slot != newest_rooted_write.slot OR data.write_version != newest_rooted_write.write_version)",
table = table_name
WHERE
data.pubkey_id = newest_rooted_write.pubkey_id AND (
data.slot < newest_snapshot_slot OR (
data.slot <= newest_rooted_slot
AND (data.slot != newest_rooted_write.slot OR data.write_version != newest_rooted_write.write_version)
)
)",
table = table_name,
min_snapshot_age = min_snapshot_age,
);
(format!("delete old writes in {}", table_name), sql)
})