From 80423fdac9eb1a37c4dc5aed0151e61883429d54 Mon Sep 17 00:00:00 2001 From: Christian Kamm Date: Wed, 26 Jan 2022 10:06:03 +0100 Subject: [PATCH] Track "is_selected" to denote closed/reused accounts --- connector-mango/scripts/create_schema.sql | 1 + connector-mango/scripts/create_views.sql | 130 ++++++++++++++-------- connector-raw/scripts/create_schema.sql | 1 + connector-raw/scripts/create_views.sql | 30 +++-- lib/src/grpc_plugin_source.rs | 1 + lib/src/lib.rs | 7 +- lib/src/postgres_target.rs | 33 +++++- 7 files changed, 134 insertions(+), 69 deletions(-) diff --git a/connector-mango/scripts/create_schema.sql b/connector-mango/scripts/create_schema.sql index f3c05ab..0350c38 100644 --- a/connector-mango/scripts/create_schema.sql +++ b/connector-mango/scripts/create_schema.sql @@ -65,6 +65,7 @@ CREATE TABLE account_write ( pubkey_id BIGINT NOT NULL REFERENCES pubkey, slot BIGINT NOT NULL, write_version BIGINT NOT NULL, + is_selected BOOL NOT NULL, owner_id BIGINT REFERENCES pubkey, lamports BIGINT NOT NULL, executable BOOL NOT NULL, diff --git a/connector-mango/scripts/create_views.sql b/connector-mango/scripts/create_views.sql index 0dd1d09..2d77082 100644 --- a/connector-mango/scripts/create_views.sql +++ b/connector-mango/scripts/create_views.sql @@ -1,72 +1,86 @@ -- Views for raw accounts CREATE VIEW account_rooted AS - SELECT + SELECT pubkey, latest_writes.* FROM + (SELECT DISTINCT ON(pubkey_id) - pubkey, account_write.* + account_write.* FROM account_write LEFT JOIN slot USING(slot) - INNER JOIN pubkey USING(pubkey_id) CROSS JOIN (SELECT max(slot) FROM slot) ms WHERE slot <= ms.max AND (slot.status = 'Rooted' OR slot.status is NULL) - ORDER BY pubkey_id, slot DESC, write_version DESC; + ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes + LEFT JOIN pubkey USING(pubkey_id) + WHERE is_selected; CREATE VIEW account_confirmed AS - SELECT + SELECT pubkey, latest_writes.* FROM + (SELECT DISTINCT ON(pubkey_id) - pubkey, account_write.* + account_write.* FROM account_write LEFT JOIN slot USING(slot) - INNER JOIN pubkey USING(pubkey_id) CROSS JOIN (SELECT max(slot) FROM slot) ms WHERE slot <= ms.max AND ((slot.status = 'Confirmed' AND NOT slot.uncle) OR slot.status = 'Rooted' OR slot.status is NULL) - ORDER BY pubkey_id, slot DESC, write_version DESC; + ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes + LEFT JOIN pubkey USING(pubkey_id) + WHERE is_selected; CREATE VIEW account_processed AS - SELECT + SELECT pubkey, latest_writes.* FROM + (SELECT DISTINCT ON(pubkey_id) - pubkey, account_write.* + account_write.* FROM account_write LEFT JOIN slot USING(slot) - INNER JOIN pubkey USING(pubkey_id) CROSS JOIN (SELECT max(slot) FROM slot) ms WHERE slot <= ms.max AND (((slot.status = 'Confirmed' OR slot.status = 'Processed') AND NOT slot.uncle) OR slot.status = 'Rooted' OR slot.status is NULL) - ORDER BY pubkey_id, slot DESC, write_version DESC; - + ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes + LEFT JOIN pubkey USING(pubkey_id) + WHERE is_selected; CREATE VIEW mango_account_rooted AS - SELECT + SELECT pubkey, latest_writes.* FROM + (SELECT DISTINCT ON(pubkey_id) - pubkey, mango_account_write.* + mango_account_write.* FROM mango_account_write LEFT JOIN slot USING(slot) - INNER JOIN pubkey using(pubkey_id) CROSS JOIN (SELECT max(slot) FROM slot) ms WHERE slot <= ms.max AND (slot.status = 'Rooted' OR slot.status is NULL) - ORDER BY pubkey_id, slot DESC, write_version DESC; + ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes + LEFT JOIN account_write USING(pubkey_id, slot, write_version) + LEFT JOIN pubkey USING(pubkey_id) + WHERE is_selected; CREATE VIEW mango_account_confirmed AS - SELECT + SELECT pubkey, latest_writes.* FROM + (SELECT DISTINCT ON(pubkey_id) - pubkey, mango_account_write.* + mango_account_write.* FROM mango_account_write LEFT JOIN slot USING(slot) - INNER JOIN pubkey using(pubkey_id) CROSS JOIN (SELECT max(slot) FROM slot) ms WHERE slot <= ms.max AND ((slot.status = 'Confirmed' AND NOT slot.uncle) OR slot.status = 'Rooted' OR slot.status is NULL) - ORDER BY pubkey_id, slot DESC, write_version DESC; + ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes + LEFT JOIN account_write USING(pubkey_id, slot, write_version) + LEFT JOIN pubkey USING(pubkey_id) + WHERE is_selected; CREATE VIEW mango_account_processed AS - SELECT + SELECT pubkey, latest_writes.* FROM + (SELECT DISTINCT ON(pubkey_id) - pubkey, mango_account_write.* + mango_account_write.* FROM mango_account_write LEFT JOIN slot USING(slot) - INNER JOIN pubkey using(pubkey_id) CROSS JOIN (SELECT max(slot) FROM slot) ms WHERE slot <= ms.max AND (((slot.status = 'Confirmed' OR slot.status = 'Processed') AND NOT slot.uncle) OR slot.status = 'Rooted' OR slot.status is NULL) - ORDER BY pubkey_id, slot DESC, write_version DESC; + ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes + LEFT JOIN account_write USING(pubkey_id, slot, write_version) + LEFT JOIN pubkey USING(pubkey_id) + WHERE is_selected; CREATE VIEW mango_account_processed_balance AS SELECT @@ -90,69 +104,87 @@ CREATE VIEW mango_account_processed_perp AS ) q; CREATE VIEW mango_group_rooted AS - SELECT + SELECT pubkey, latest_writes.* FROM + (SELECT DISTINCT ON(pubkey_id) - pubkey, mango_group_write.* + mango_group_write.* FROM mango_group_write LEFT JOIN slot USING(slot) - INNER JOIN pubkey using(pubkey_id) CROSS JOIN (SELECT max(slot) FROM slot) ms WHERE slot <= ms.max AND (slot.status = 'Rooted' OR slot.status is NULL) - ORDER BY pubkey_id, slot DESC, write_version DESC; + ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes + LEFT JOIN account_write USING(pubkey_id, slot, write_version) + LEFT JOIN pubkey USING(pubkey_id) + WHERE is_selected; CREATE VIEW mango_group_confirmed AS - SELECT + SELECT pubkey, latest_writes.* FROM + (SELECT DISTINCT ON(pubkey_id) - pubkey, mango_group_write.* + mango_group_write.* FROM mango_group_write LEFT JOIN slot USING(slot) - INNER JOIN pubkey using(pubkey_id) CROSS JOIN (SELECT max(slot) FROM slot) ms WHERE slot <= ms.max AND ((slot.status = 'Confirmed' AND NOT slot.uncle) OR slot.status = 'Rooted' OR slot.status is NULL) - ORDER BY pubkey_id, slot DESC, write_version DESC; + ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes + LEFT JOIN account_write USING(pubkey_id, slot, write_version) + LEFT JOIN pubkey USING(pubkey_id) + WHERE is_selected; CREATE VIEW mango_group_processed AS - SELECT + SELECT pubkey, latest_writes.* FROM + (SELECT DISTINCT ON(pubkey_id) - pubkey, mango_group_write.* + mango_group_write.* FROM mango_group_write LEFT JOIN slot USING(slot) - INNER JOIN pubkey using(pubkey_id) CROSS JOIN (SELECT max(slot) FROM slot) ms WHERE slot <= ms.max AND (((slot.status = 'Confirmed' OR slot.status = 'Processed') AND NOT slot.uncle) OR slot.status = 'Rooted' OR slot.status is NULL) - ORDER BY pubkey_id, slot DESC, write_version DESC; + ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes + LEFT JOIN account_write USING(pubkey_id, slot, write_version) + LEFT JOIN pubkey USING(pubkey_id) + WHERE is_selected; CREATE VIEW mango_cache_rooted AS - SELECT + SELECT pubkey, latest_writes.* FROM + (SELECT DISTINCT ON(pubkey_id) - pubkey, mango_cache_write.* + mango_cache_write.* FROM mango_cache_write LEFT JOIN slot USING(slot) - INNER JOIN pubkey using(pubkey_id) CROSS JOIN (SELECT max(slot) FROM slot) ms WHERE slot <= ms.max AND (slot.status = 'Rooted' OR slot.status is NULL) - ORDER BY pubkey_id, slot DESC, write_version DESC; + ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes + LEFT JOIN account_write USING(pubkey_id, slot, write_version) + LEFT JOIN pubkey USING(pubkey_id) + WHERE is_selected; CREATE VIEW mango_cache_confirmed AS - SELECT + SELECT pubkey, latest_writes.* FROM + (SELECT DISTINCT ON(pubkey_id) - pubkey, mango_cache_write.* + mango_cache_write.* FROM mango_cache_write LEFT JOIN slot USING(slot) - INNER JOIN pubkey using(pubkey_id) CROSS JOIN (SELECT max(slot) FROM slot) ms WHERE slot <= ms.max AND ((slot.status = 'Confirmed' AND NOT slot.uncle) OR slot.status = 'Rooted' OR slot.status is NULL) - ORDER BY pubkey_id, slot DESC, write_version DESC; + ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes + LEFT JOIN account_write USING(pubkey_id, slot, write_version) + LEFT JOIN pubkey USING(pubkey_id) + WHERE is_selected; CREATE VIEW mango_cache_processed AS - SELECT + SELECT pubkey, latest_writes.* FROM + (SELECT DISTINCT ON(pubkey_id) - pubkey, mango_cache_write.* + mango_cache_write.* FROM mango_cache_write LEFT JOIN slot USING(slot) - INNER JOIN pubkey using(pubkey_id) CROSS JOIN (SELECT max(slot) FROM slot) ms WHERE slot <= ms.max AND (((slot.status = 'Confirmed' OR slot.status = 'Processed') AND NOT slot.uncle) OR slot.status = 'Rooted' OR slot.status is NULL) - ORDER BY pubkey_id, slot DESC, write_version DESC; + ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes + LEFT JOIN account_write USING(pubkey_id, slot, write_version) + LEFT JOIN pubkey USING(pubkey_id) + WHERE is_selected; diff --git a/connector-raw/scripts/create_schema.sql b/connector-raw/scripts/create_schema.sql index 372906c..b2778ed 100644 --- a/connector-raw/scripts/create_schema.sql +++ b/connector-raw/scripts/create_schema.sql @@ -54,6 +54,7 @@ CREATE TABLE account_write ( pubkey_id BIGINT NOT NULL REFERENCES pubkey, slot BIGINT NOT NULL, write_version BIGINT NOT NULL, + is_selected BOOL NOT NULL, owner_id BIGINT REFERENCES pubkey, lamports BIGINT NOT NULL, executable BOOL NOT NULL, diff --git a/connector-raw/scripts/create_views.sql b/connector-raw/scripts/create_views.sql index 2657ba9..f29a7da 100644 --- a/connector-raw/scripts/create_views.sql +++ b/connector-raw/scripts/create_views.sql @@ -1,34 +1,40 @@ -- Views for raw accounts CREATE VIEW account_rooted AS - SELECT + SELECT pubkey, latest_writes.* FROM + (SELECT DISTINCT ON(pubkey_id) - pubkey, account_write.* + account_write.* FROM account_write LEFT JOIN slot USING(slot) - INNER JOIN pubkey USING(pubkey_id) CROSS JOIN (SELECT max(slot) FROM slot) ms WHERE slot <= ms.max AND (slot.status = 'Rooted' OR slot.status is NULL) - ORDER BY pubkey_id, slot DESC, write_version DESC; + ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes + LEFT JOIN pubkey USING(pubkey_id) + WHERE is_selected; CREATE VIEW account_confirmed AS - SELECT + SELECT pubkey, latest_writes.* FROM + (SELECT DISTINCT ON(pubkey_id) - pubkey, account_write.* + account_write.* FROM account_write LEFT JOIN slot USING(slot) - INNER JOIN pubkey USING(pubkey_id) CROSS JOIN (SELECT max(slot) FROM slot) ms WHERE slot <= ms.max AND ((slot.status = 'Confirmed' AND NOT slot.uncle) OR slot.status = 'Rooted' OR slot.status is NULL) - ORDER BY pubkey_id, slot DESC, write_version DESC; + ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes + LEFT JOIN pubkey USING(pubkey_id) + WHERE is_selected; CREATE VIEW account_processed AS - SELECT + SELECT pubkey, latest_writes.* FROM + (SELECT DISTINCT ON(pubkey_id) - pubkey, account_write.* + account_write.* FROM account_write LEFT JOIN slot USING(slot) - INNER JOIN pubkey USING(pubkey_id) CROSS JOIN (SELECT max(slot) FROM slot) ms WHERE slot <= ms.max AND (((slot.status = 'Confirmed' OR slot.status = 'Processed') AND NOT slot.uncle) OR slot.status = 'Rooted' OR slot.status is NULL) - ORDER BY pubkey_id, slot DESC, write_version DESC; + ORDER BY pubkey_id, slot DESC, write_version DESC) latest_writes + LEFT JOIN pubkey USING(pubkey_id) + WHERE is_selected; diff --git a/lib/src/grpc_plugin_source.rs b/lib/src/grpc_plugin_source.rs index 47b8456..958f8d5 100644 --- a/lib/src/grpc_plugin_source.rs +++ b/lib/src/grpc_plugin_source.rs @@ -339,6 +339,7 @@ pub async fn process_events( executable: update.executable, rent_epoch: update.rent_epoch as i64, data: update.data, + is_selected: update.is_selected, }) .await .expect("send success"); diff --git a/lib/src/lib.rs b/lib/src/lib.rs index c1ae9dd..d349de4 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -34,6 +34,7 @@ pub struct AccountWrite { pub executable: bool, pub rent_epoch: i64, pub data: Vec, + pub is_selected: bool, } impl AccountWrite { @@ -47,6 +48,7 @@ impl AccountWrite { executable: account.executable, rent_epoch: account.rent_epoch as i64, // TODO: narrowing! data: account.data, + is_selected: true, } } } @@ -157,15 +159,16 @@ impl AccountTable for RawAccountTable { // TODO: should update for same write_version to work with websocket input let query = postgres_query::query!( "INSERT INTO account_write - (pubkey_id, slot, write_version, + (pubkey_id, slot, write_version, is_selected, owner_id, lamports, executable, rent_epoch, data) VALUES - (map_pubkey($pubkey), $slot, $write_version, + (map_pubkey($pubkey), $slot, $write_version, $is_selected, map_pubkey($owner), $lamports, $executable, $rent_epoch, $data) ON CONFLICT (pubkey_id, slot, write_version) DO NOTHING", pubkey, slot = account_write.slot, write_version = account_write.write_version, + is_selected = account_write.is_selected, owner, lamports = account_write.lamports, executable = account_write.executable, diff --git a/lib/src/postgres_target.rs b/lib/src/postgres_target.rs index ce302ae..4b8c77b 100644 --- a/lib/src/postgres_target.rs +++ b/lib/src/postgres_target.rs @@ -170,18 +170,34 @@ fn make_cleanup_steps(tables: &Vec) -> HashMap { // Delete all account writes that came before the newest rooted slot except // for the newest rooted write for each pubkey. - // // This could be older rooted writes or writes in uncled slots that came // before the newest rooted slot. + // + // Also delete _all_ writes from before the newest snapshot, because these may + // be for deleted accounts where the deletion event was missed. Snapshots + // provide a new state for all live accounts, but don't tell us about deleted + // accounts. + // + // The way this is done, by taking the newest snapshot that's at least + // min_snapshot_age behind the newest rooted slot is a workaround: we don't know + // how long it'll take to insert snapshot data, but assume it'll be done by that + // time. + let min_snapshot_age = 300; steps.extend( tables .iter() .map(|table_name| { let sql = format!( - "WITH newest_rooted AS (SELECT max(slot) AS newest_rooted_slot FROM slot WHERE status = 'Rooted') + "WITH + newest_rooted AS ( + SELECT max(slot) AS newest_rooted_slot FROM slot WHERE status = 'Rooted'), + newest_snapshot AS ( + SELECT max(slot) AS newest_snapshot_slot FROM account_write, newest_rooted + WHERE write_version = 0 AND slot + {min_snapshot_age} < newest_rooted_slot) DELETE FROM {table} AS data USING newest_rooted, + newest_snapshot, (SELECT DISTINCT ON(pubkey_id) pubkey_id, slot, write_version FROM {table} LEFT JOIN slot USING(slot) @@ -189,10 +205,15 @@ fn make_cleanup_steps(tables: &Vec) -> HashMap { WHERE slot <= newest_rooted_slot AND (status = 'Rooted' OR status is NULL) ORDER BY pubkey_id, slot DESC, write_version DESC ) newest_rooted_write - WHERE data.pubkey_id = newest_rooted_write.pubkey_id - AND data.slot <= newest_rooted_slot - AND (data.slot != newest_rooted_write.slot OR data.write_version != newest_rooted_write.write_version)", - table = table_name + WHERE + data.pubkey_id = newest_rooted_write.pubkey_id AND ( + data.slot < newest_snapshot_slot OR ( + data.slot <= newest_rooted_slot + AND (data.slot != newest_rooted_write.slot OR data.write_version != newest_rooted_write.write_version) + ) + )", + table = table_name, + min_snapshot_age = min_snapshot_age, ); (format!("delete old writes in {}", table_name), sql) })