Compare commits
19 Commits
experiment ... main
Author | SHA1 | Date |
---|---|---|
GroovieGermanikus | b4bf41c4cd | |
GroovieGermanikus | 39ebe53663 | |
GroovieGermanikus | 491c6b16fe | |
GroovieGermanikus | 3c8668ff31 | |
GroovieGermanikus | 0fdf9fb1b6 | |
GroovieGermanikus | 5c640c94fa | |
GroovieGermanikus | 7a1bc411b7 | |
GroovieGermanikus | 65ce71cc2d | |
GroovieGermanikus | 6bf8912599 | |
GroovieGermanikus | a48893004c | |
GroovieGermanikus | 3c10711732 | |
GroovieGermanikus | 7e121e9606 | |
GroovieGermanikus | 944bf7ea54 | |
GroovieGermanikus | d1d7a7b525 | |
GroovieGermanikus | fd8fb50fb2 | |
GroovieGermanikus | a1946f59ba | |
GroovieGermanikus | 1e5598eb5b | |
GroovieGermanikus | 368f37cace | |
Groovie Mango | 78344dc8b4 | |
@@ -0,0 +1,51 @@
+name: Cargo Build & Test
+
+on:
+  push:
+    branches:
+      - main
+  pull_request:
+
+env:
+  CARGO_TERM_COLOR: always
+  SCCACHE_GHA_ENABLED: true
+  RUSTC_WRAPPER: sccache
+  SCCACHE_CACHE_SIZE: "1G"
+
+jobs:
+  build_and_test:
+    name: bankingstage-sidecar check and test
+    runs-on: ubuntu-22.04
+    steps:
+      - name: Install Linux Packages
+        run: |
+          sudo apt-get update -y
+          sudo apt-get install libssl-dev openssl -y
+
+      - uses: actions/checkout@v4
+
+      # The toolchain action should definitely be run before the cache action
+      - uses: actions-rust-lang/setup-rust-toolchain@v1
+        with:
+          cache: true
+          # avoid the default "-D warnings" which thrashes cache
+          rustflags: ""
+
+      - name: Run sccache-cache
+        uses: mozilla-actions/sccache-action@v0.0.3
+
+      # https://github.com/actions/cache/blob/main/examples.md#rust---cargo
+      # https://blog.arriven.wtf/posts/rust-ci-cache/
+      - uses: Swatinem/rust-cache@v2
+        with:
+          # will be covered by sccache
+          cache-targets: false
+          key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}
+
+
+      - name: Early Build
+        run: |
+          cargo check --locked --workspace --all-targets
+
+      - name: Run Tests
+        run: RUST_LOG=info cargo test

@@ -1241,6 +1241,12 @@ version = "2.0.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5"
 
+[[package]]
+name = "fd_bs58"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fdaac1b30ff95de4562a79ae990f91ac7bf2062d7313cc25c58987762c8e8160"
+
 [[package]]
 name = "feature-probe"
 version = "0.1.1"

@@ -1466,6 +1472,7 @@ dependencies = [
  "clap",
  "const_env",
  "dashmap",
+ "fd_bs58",
  "futures",
  "itertools 0.10.5",
  "lazy_static",

@@ -1485,6 +1492,7 @@ dependencies = [
  "thiserror",
  "tokio",
  "tokio-postgres",
+ "tracing",
  "tracing-subscriber",
  "yellowstone-grpc-client 1.11.1+solana.1.16.17",
  "yellowstone-grpc-client 1.12.0+solana.1.16.17",

@@ -1951,6 +1959,15 @@ version = "0.4.20"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
 
+[[package]]
+name = "matchers"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
+dependencies = [
+ "regex-automata 0.1.10",
+]
+
 [[package]]
 name = "matchit"
 version = "0.7.3"

@@ -2775,8 +2792,17 @@ checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343"
 dependencies = [
  "aho-corasick",
  "memchr",
- "regex-automata",
- "regex-syntax",
+ "regex-automata 0.4.3",
+ "regex-syntax 0.8.2",
 ]
 
+[[package]]
+name = "regex-automata"
+version = "0.1.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
+dependencies = [
+ "regex-syntax 0.6.29",
+]
+
 [[package]]

@@ -2787,9 +2813,15 @@ checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f"
 dependencies = [
  "aho-corasick",
  "memchr",
- "regex-syntax",
+ "regex-syntax 0.8.2",
 ]
 
 [[package]]
 name = "regex-syntax"
+version = "0.6.29"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
+
+[[package]]
+name = "regex-syntax"
 version = "0.8.2"

@@ -4428,10 +4460,14 @@ version = "0.3.18"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b"
 dependencies = [
+ "matchers",
  "nu-ansi-term",
+ "once_cell",
+ "regex",
  "sharded-slab",
  "smallvec",
  "thread_local",
+ "tracing",
  "tracing-core",
  "tracing-log",
 ]

@@ -4886,7 +4922,7 @@ dependencies = [
 [[package]]
 name = "yellowstone-grpc-client"
 version = "1.11.1+solana.1.16.17"
-source = "git+https://github.com/blockworks-foundation/yellowstone-grpc.git?branch=tag-v1.16-mango#620c49a01a6de30790b2afcc71c6c709b3061880"
+source = "git+https://github.com/blockworks-foundation/yellowstone-grpc.git?branch=tag-v1.16-mango#ada20b8e3946d87bcb7ebfe97f6a2153366094e7"
 dependencies = [
  "bytes",
  "futures",

@@ -4900,8 +4936,7 @@ dependencies = [
 [[package]]
 name = "yellowstone-grpc-client"
 version = "1.12.0+solana.1.16.17"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e58204f372a7e82d15d72bdf99334029c4e9cdc15bd2e9a5c33b598d9f1eb8b6"
+source = "git+https://github.com/blockworks-foundation/yellowstone-grpc.git?tag=v1.11.0+solana.1.16.17-with-buffers2#751d756cb00c8a8c56a75a23e584f19bee5695cf"
 dependencies = [
  "bytes",
  "futures",

@@ -4915,7 +4950,7 @@ dependencies = [
 [[package]]
 name = "yellowstone-grpc-proto"
 version = "1.10.0+solana.1.16.17"
-source = "git+https://github.com/blockworks-foundation/yellowstone-grpc.git?branch=tag-v1.16-mango#620c49a01a6de30790b2afcc71c6c709b3061880"
+source = "git+https://github.com/blockworks-foundation/yellowstone-grpc.git?branch=tag-v1.16-mango#ada20b8e3946d87bcb7ebfe97f6a2153366094e7"
 dependencies = [
  "anyhow",
  "bincode",

@@ -4931,8 +4966,7 @@ dependencies = [
 [[package]]
 name = "yellowstone-grpc-proto"
 version = "1.11.0+solana.1.16.17"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "00d751c6ef3093ec90ab1e16c6a504b5bea99aca6c688c429fed4cc56782f57e"
+source = "git+https://github.com/blockworks-foundation/yellowstone-grpc.git?tag=v1.11.0+solana.1.16.17-with-buffers2#751d756cb00c8a8c56a75a23e584f19bee5695cf"
 dependencies = [
  "anyhow",
  "bincode",

@@ -17,13 +17,15 @@ serde = { version = "1.0.160", features = ["derive"] }
 serde_json = "1.0.96"
 bincode = "1.3.3"
 bs58 = "0.4.0"
+fd_bs58 = "0.1.0"
 base64 = "0.21.0"
 thiserror = "1.0.40"
 futures = "0.3.28"
 bytes = "1.4.0"
 anyhow = "1.0.70"
 log = "0.4.17"
-tracing-subscriber = "0.3.18"
+tracing = "0.1.37"
+tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
 clap = { version = "4.2.4", features = ["derive", "env"] }
 dashmap = "5.4.0"
 const_env = "0.1.2"

@@ -40,8 +42,8 @@ tokio = { version = "1.32.0", features = ["rt-multi-thread", "macros", "time"] }
 yellowstone-grpc-client = {git = "https://github.com/blockworks-foundation/yellowstone-grpc.git", branch = "tag-v1.16-mango"}
 yellowstone-grpc-proto = {git = "https://github.com/blockworks-foundation/yellowstone-grpc.git", branch = "tag-v1.16-mango"}
 
-yellowstone-grpc-client_original = { package = "yellowstone-grpc-client", version = "1.12.0+solana.1.16.17" }
-yellowstone-grpc-proto_original = { package = "yellowstone-grpc-proto", version = "1.11.0+solana.1.16.17" }
+yellowstone-grpc-client_original = { package = "yellowstone-grpc-client", git = "https://github.com/blockworks-foundation/yellowstone-grpc.git", tag = "v1.11.0+solana.1.16.17-with-buffers2" }
+yellowstone-grpc-proto_original = { package = "yellowstone-grpc-proto", git = "https://github.com/blockworks-foundation/yellowstone-grpc.git", tag = "v1.11.0+solana.1.16.17-with-buffers2" }
 
 
 [patch.crates-io]

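Note on the two `_original` entries above: Cargo's `package = "..."` key renames a dependency locally, which is what lets the `tag-v1.16-mango` fork and the pinned `...-with-buffers2` build of the same crates coexist in one binary. A minimal sketch of how the rename surfaces on the Rust side (both paths appear verbatim in src/main.rs further down in this diff):

// The local crate name follows the Cargo.toml key, not the upstream package name.
use yellowstone_grpc_client::GeyserGrpcClient;                      // fork, branch tag-v1.16-mango
use yellowstone_grpc_client_original::GeyserGrpcClientBufferConfig; // pinned tag with buffer config
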
@@ -6,14 +6,14 @@
 CREATE SCHEMA banking_stage_results_2;
 
 CREATE TABLE banking_stage_results_2.transactions(
-    transaction_id bigserial PRIMARY KEY,
-    signature varchar(88) NOT NULL,
-    UNIQUE(signature)
+    transaction_id bigserial,
+    signature varchar(88) NOT NULL,
+    PRIMARY KEY (transaction_id) INCLUDE(signature),
+    UNIQUE(signature) INCLUDE (transaction_id)
 );
 ALTER TABLE banking_stage_results_2.transactions ALTER COLUMN signature SET STORAGE MAIN;
 -- page layout: rows are small and must store in main; compression is okay
 -- ALTER TABLE banking_stage_results_2.transactions SET (toast_tuple_target=4080);
 
+ALTER INDEX banking_stage_results_2.transaction_pkey SET (FILLFACTOR=80);
+ALTER INDEX banking_stage_results_2.transaction_signature_transaction_id_key SET (FILLFACTOR=80);
+ALTER TABLE banking_stage_results_2.transactions SET (toast_tuple_target=128);
 
 CREATE TABLE banking_stage_results_2.transaction_infos (
     transaction_id BIGINT PRIMARY KEY,

@@ -66,15 +66,16 @@ ALTER TABLE banking_stage_results_2.blocks SET (toast_tuple_target=200);
 ALTER TABLE banking_stage_results_2.blocks ALTER COLUMN block_hash SET STORAGE main;
 CREATE INDEX idx_blocks_block_hash ON banking_stage_results_2.blocks(block_hash);
 
 
 CREATE TABLE banking_stage_results_2.accounts(
-    acc_id bigserial PRIMARY KEY,
-    account_key varchar(44) NOT NULL,
-    UNIQUE (account_key)
+    acc_id bigserial,
+    account_key varchar(44) NOT NULL,
+    PRIMARY KEY (acc_id) INCLUDE(account_key),
+    UNIQUE(account_key) INCLUDE (acc_id)
 );
 -- page layout: rows are small and must store in main; compression is okay
 ALTER TABLE banking_stage_results_2.accounts ALTER COLUMN account_key SET STORAGE main;
 -- ALTER TABLE banking_stage_results_2.transactions SET (toast_tuple_target=4080);
 ALTER TABLE banking_stage_results_2.accounts SET (toast_tuple_target=128);
+ALTER INDEX banking_stage_results_2.accounts_pkey SET (FILLFACTOR=80);
+ALTER INDEX banking_stage_results_2.accounts_account_key_acc_id_key SET (FILLFACTOR=80);
 -- table+index size (605mn rows): 146GB total = 48GB main table + 49 GB index by acc_id + 49 GB index by account_key
 
 
 CREATE TABLE banking_stage_results_2.accounts_map_transaction(

@@ -206,6 +207,7 @@ ALTER TABLE banking_stage_results_2.accounts_map_transaction
     autovacuum_analyze_threshold=10000
 );
 
+-- not written ATM
 ALTER TABLE banking_stage_results_2.accounts_map_transaction_latest
 SET (
     autovacuum_vacuum_scale_factor=0,

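The `PRIMARY KEY ... INCLUDE` / `UNIQUE ... INCLUDE` pairs above turn both B-tree indexes into covering indexes: each direction of the id-to-key mapping can be answered by an index-only scan without touching the heap, and FILLFACTOR=80 leaves slack on index pages for future inserts. A hypothetical lookup sketch from the Rust side, assuming a connected `tokio_postgres::Client`:

// Sketch only: with UNIQUE(signature) INCLUDE (transaction_id), this query can be
// served from the index alone, since both referenced columns are stored in it.
async fn transaction_id_by_signature(
    client: &tokio_postgres::Client,
    signature: &str,
) -> anyhow::Result<Option<i64>> {
    let row = client
        .query_opt(
            "SELECT transaction_id FROM banking_stage_results_2.transactions WHERE signature = $1",
            &[&signature],
        )
        .await?;
    Ok(row.map(|row| row.get(0)))
}
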
@@ -11,6 +11,7 @@ while true; do
     # startup delay
     sleep 300;
     # should retain about 1 week of data
+    # no "--count-rows"
     RUST_LOG=info /usr/local/bin/cleanupdb --num-slots-to-keep $SLOTS_TO_KEEP;
     # every 5 hours
     sleep 18000;

@@ -26,14 +26,14 @@ pub struct ALTStore {
 impl ALTStore {
     pub fn new(rpc_client: Arc<RpcClient>) -> Self {
         let (sx, rx) = async_channel::unbounded();
-        let instant = Self {
+        let alt_store = Self {
             rpc_client,
             map: Arc::new(DashMap::new()),
             loading_queue: Arc::new(sx),
         };
 
         {
-            let instant = instant.clone();
+            let instant = alt_store.clone();
             tokio::task::spawn(async move {
                 loop {
                     if let Ok(pk) = rx.recv().await {

@@ -51,15 +51,15 @@ impl ALTStore {
             });
         }
 
-        instant
+        alt_store
     }
 
-    pub async fn load_alts_list(&self, alts_list: &Vec<Pubkey>) {
+    pub async fn load_alts_list(&self, alts_list: Vec<&Pubkey>) {
         log::info!("Preloading {} ALTs", alts_list.len());
 
-        for batches in alts_list.chunks(1000).map(|x| x.to_vec()) {
+        for batches in alts_list.chunks(1000) {
             let tasks = batches.chunks(100).map(|batch| {
-                let batch = batch.to_vec();
+                let batch: Vec<Pubkey> = batch.into_iter().map(|pk| (*pk).clone()).collect_vec();
                 let rpc_client = self.rpc_client.clone();
                 let this = self.clone();
                 tokio::spawn(async move {

@@ -74,7 +74,7 @@ impl ALTStore {
                 {
                     for (index, acc) in multiple_accounts.value.iter().enumerate() {
                         if let Some(acc) = acc {
-                            this.save_account(&batch[index], &acc.data);
+                            this.save_account(batch[index], &acc.data);
                         }
                     }
                 }

@@ -94,26 +94,25 @@ impl ALTStore {
         let alts_list = alts_list
             .iter()
             .filter(|x| !self.map.contains_key(x))
             .cloned()
             .collect_vec();
         if alts_list.is_empty() {
             return;
         }
-        self.load_alts_list(&alts_list).await;
+        self.load_alts_list(alts_list).await;
     }
 
-    pub async fn start_loading_missing_alts(&self, alts_list: &Vec<Pubkey>) {
+    pub async fn start_loading_missing_alts(&self, alts_list: &Vec<&Pubkey>) {
         for key in alts_list.iter().filter(|x| !self.map.contains_key(x)) {
             ALTS_IN_LOADING_QUEUE.inc();
-            let _ = self.loading_queue.send(*key).await;
+            let _ = self.loading_queue.send(*key.clone()).await;
         }
     }
 
-    pub fn save_account(&self, address: &Pubkey, data: &[u8]) {
+    pub fn save_account(&self, address: Pubkey, data: &[u8]) {
         let lookup_table = AddressLookupTable::deserialize(&data).unwrap();
         if self
             .map
-            .insert(address.clone(), lookup_table.addresses.to_vec())
+            .insert(address, lookup_table.addresses.to_vec())
             .is_none()
         {
             ALTS_IN_STORE.inc();

@@ -121,7 +120,7 @@ impl ALTStore {
         drop(lookup_table);
     }
 
-    async fn load_accounts(
+    async fn load_accounts<'a>(
         &self,
         alt: &Pubkey,
         write_accounts: &Vec<u8>,

@@ -188,13 +187,13 @@ impl ALTStore {
     }
 
     pub fn serialize(&self) -> Vec<u8> {
-        bincode::serialize::<BinaryALTData>(&BinaryALTData::new(&self.map)).unwrap()
+        bincode::serialize::<BinaryALTData>(&BinaryALTData::new(self.map.clone())).unwrap()
     }
 
     pub fn load_binary(&self, binary_data: Vec<u8>) {
         let binary_alt_data = bincode::deserialize::<BinaryALTData>(&binary_data).unwrap();
-        for (alt, accounts) in binary_alt_data.data.iter() {
-            self.map.insert(alt.clone(), accounts.clone());
+        for (pubkey, accounts) in binary_alt_data.data {
+            self.map.insert(pubkey, accounts);
         }
     }
 }

@@ -205,7 +204,7 @@ pub struct BinaryALTData {
 }
 
 impl BinaryALTData {
-    pub fn new(map: &Arc<DashMap<Pubkey, Vec<Pubkey>>>) -> Self {
+    pub fn new(map: Arc<DashMap<Pubkey, Vec<Pubkey>>>) -> Self {
         let data = map
             .iter()
             .map(|x| (x.key().clone(), x.value().clone()))

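The `new()` change above is a pure rename (`instant` to `alt_store`) of the self-referential handle that the background loader task clones. A stripped-down sketch of that pattern, with a `String` key standing in for `Pubkey` so it stays self-contained:

use std::sync::Arc;
use dashmap::DashMap;

#[derive(Clone)]
struct Store {
    map: Arc<DashMap<String, Vec<u8>>>,
    queue: Arc<async_channel::Sender<String>>,
}

impl Store {
    fn new() -> Self {
        let (sx, rx) = async_channel::unbounded::<String>();
        let store = Self {
            map: Arc::new(DashMap::new()),
            queue: Arc::new(sx),
        };
        {
            // a clone of the handle moves into the worker task, mirroring ALTStore::new;
            // DashMap makes the concurrent inserts safe without explicit locking
            let store = store.clone();
            tokio::spawn(async move {
                while let Ok(key) = rx.recv().await {
                    // the real code fetches the lookup table via RPC here
                    store.map.insert(key, Vec::new());
                }
            });
        }
        store
    }

    /// Fire-and-forget: the background task fills the map eventually.
    fn request(&self, key: String) {
        let _ = self.queue.try_send(key);
    }
}
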
@@ -13,6 +13,9 @@ pub struct Args {
 
     #[arg(short, long, default_value_t = false)]
     pub dry_run: bool,
+
+    #[arg(short, long, default_value_t = false)]
+    pub count_rows: bool,
 }
 
 #[tokio::main()]

@@ -22,9 +25,10 @@ async fn main() {
     let Args {
         num_slots_to_keep,
        dry_run,
+        count_rows,
     } = Args::parse();
 
     let session = PostgresSession::new(0).await.unwrap();
 
-    session.cleanup_old_data(num_slots_to_keep, dry_run).await;
+    session.cleanup_old_data(num_slots_to_keep, dry_run, count_rows).await;
 }

@@ -17,6 +17,7 @@ use std::{
     collections::{HashMap, HashSet},
     sync::Arc,
 };
+use std::rc::Rc;
 
 #[derive(Serialize, Debug, Clone)]
 pub struct PrioFeeData {

@@ -121,7 +122,7 @@ pub struct BlockInfo {
 impl BlockInfo {
     pub async fn process_versioned_message(
         atl_store: &Arc<ALTStore>,
-        signature: &String,
+        signature: String,
         slot: Slot,
         message: &VersionedMessage,
         prio_fees_in_block: &mut Vec<(u64, u64)>,

@@ -183,10 +184,10 @@ impl BlockInfo {
         if !is_vote {
             let mut accounts = message
                 .static_account_keys()
-                .iter()
+                .iter().cloned()
                 .enumerate()
-                .map(|(index, account)| TransactionAccount {
-                    key: account.clone(),
+                .map(|(index, account_pk)| TransactionAccount {
+                    key: account_pk,
                     is_writable: message.is_maybe_writable(index),
                     is_signer: message.is_signer(index),
                     is_alt: false,

@@ -209,7 +210,7 @@ impl BlockInfo {
             for writable_account in accounts
                 .iter()
                 .filter(|x| x.is_writable)
-                .map(|x| x.key.clone())
+                .map(|x| x.key)
             {
                 match writelocked_accounts.get_mut(&writable_account) {
                     Some(x) => {

@@ -219,9 +220,9 @@ impl BlockInfo {
                     }
                     None => {
                         writelocked_accounts.insert(
-                            writable_account.clone(),
+                            writable_account,
                             AccountData {
-                                key: writable_account.to_string(),
+                                key: fd_bs58::encode_32(writable_account),
                                 cu_consumed,
                                 cu_requested,
                                 vec_pf: vec![prioritization_fees],

@@ -234,7 +235,7 @@ impl BlockInfo {
             for readable_account in accounts
                 .iter()
                 .filter(|x| !x.is_writable)
-                .map(|x| x.key.clone())
+                .map(|x| x.key)
             {
                 match readlocked_accounts.get_mut(&readable_account) {
                     Some(x) => {

@@ -244,9 +245,9 @@ impl BlockInfo {
                     }
                     None => {
                         readlocked_accounts.insert(
-                            readable_account.clone(),
+                            readable_account,
                             AccountData {
-                                key: readable_account.to_string(),
+                                key: fd_bs58::encode_32(readable_account),
                                 cu_consumed,
                                 cu_requested,
                                 vec_pf: vec![prioritization_fees],

@@ -257,14 +258,14 @@ impl BlockInfo {
         }
 
         Some(BlockTransactionInfo {
-            signature: signature.to_string(),
+            signature,
             processed_slot: slot as i64,
             is_successful,
             cu_requested: cu_requested as i64,
             cu_consumed: cu_consumed as i64,
             prioritization_fees: prioritization_fees as i64,
             supp_infos: String::new(),
-            accounts: accounts,
+            accounts,
         })
     } else {
         None

@@ -348,6 +349,7 @@ impl BlockInfo {
         block: &yellowstone_grpc_proto_original::prelude::SubscribeUpdateBlock,
     ) -> BlockInfo {
         let block_hash = block.blockhash.clone();
+        let _span = tracing::debug_span!("map_block_info", block_hash = block_hash);
         let slot = block.slot;
         let leader_identity = block
             .rewards

@@ -401,9 +403,8 @@ impl BlockInfo {
             let Some(meta) = &transaction.meta else {
                 return None;
             };
-            let signature = Signature::try_from(&tx.signatures[0][0..64])
-                .unwrap()
-                .to_string();
+
+            let signature = fd_bs58::encode_64(&tx.signatures[0]);
 
             let message = VersionedMessage::V0(v0::Message {
                 header: MessageHeader {

@@ -442,7 +443,7 @@ impl BlockInfo {
                     .try_into()
                     .unwrap_or(Pubkey::default().to_bytes());
                 let account_key = Pubkey::new_from_array(bytes);
-                lookup_tables.insert(account_key.clone());
+                lookup_tables.insert(account_key);
                 MessageAddressTableLookup {
                     account_key,
                     writable_indexes: table.writable_indexes,

@@ -456,14 +457,14 @@ impl BlockInfo {
             .collect_vec();
 
         atl_store
-            .start_loading_missing_alts(&lookup_tables.iter().cloned().collect_vec())
+            .start_loading_missing_alts(&lookup_tables.iter().collect_vec())
             .await;
 
         let mut block_transactions = vec![];
         for (signature, message, meta, is_vote) in sigs_and_messages {
             let tx = Self::process_versioned_message(
                 &atl_store,
-                &signature,
+                signature,
                 slot,
                 &message,
                 &mut prio_fees_in_block,

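The encoding changes in this file swap `bs58`-backed `to_string()` calls for `fd_bs58`, which is specialised for 32-byte pubkeys and 64-byte signatures. A hedged round-trip sketch of the API as the call sites in this diff imply it (`encode_32`/`encode_64` take byte-slice-like inputs, `decode_32` returns a fixed-size array):

use solana_sdk::pubkey::Pubkey;

// Assumption: fd_bs58 0.1.0 signatures match the usage shown in this diff.
fn pubkey_roundtrip(pubkey: &Pubkey) -> Pubkey {
    let encoded = fd_bs58::encode_32(pubkey); // replaces pubkey.to_string()
    Pubkey::new_from_array(fd_bs58::decode_32(encoded).unwrap())
}

fn signature_string(raw_signature: &[u8]) -> String {
    fd_bs58::encode_64(raw_signature) // replaces Signature::try_from(..).unwrap().to_string()
}
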
@@ -9,6 +9,7 @@ pub struct Args {
     #[arg(short, long)]
     pub grpc_address_to_fetch_blocks: Option<String>,
 
+    // note: this only applies to grpc_address_to_fetch_blocks
     #[arg(short = 'x', long)]
     pub grpc_x_token: Option<String>,
 

src/main.rs
@@ -26,6 +26,9 @@ use futures::StreamExt;
 use log::{debug, error, info};
 use prometheus::{opts, register_int_counter, register_int_gauge, IntCounter, IntGauge};
 use tokio::sync::mpsc::Sender;
+use tracing_subscriber::EnvFilter;
+use tracing_subscriber::fmt::format::FmtSpan;
+use yellowstone_grpc_client_original::GeyserGrpcClientBufferConfig;
 use transaction_info::TransactionInfo;
 
 mod alt_store;

@@ -52,6 +55,8 @@ lazy_static::lazy_static! {
     register_int_gauge!(opts!("bankingstage_blocks_in_rpc_queue", "Banking stage blocks in rpc queue")).unwrap();
 }
 
+const NUM_BLOCK_SENDERS: usize = 4;
+
 pub async fn start_tracking_banking_stage_errors(
     grpc_address: String,
     map_of_infos: Arc<DashMap<(String, u64), TransactionInfo>>,

@@ -122,7 +127,7 @@ pub async fn start_tracking_banking_stage_errors(
                     BANKING_STAGE_ERROR_EVENT_COUNT.inc();
                     instance = Instant::now();
 
-                    let sig = transaction.signature.to_string();
+                    let sig = &transaction.signature;
                     match map_of_infos.get_mut(&(sig.clone(), transaction.slot)) {
                         Some(mut x) => {
                             let tx_info = x.value_mut();

@@ -132,7 +137,7 @@ pub async fn start_tracking_banking_stage_errors(
                             // map_of_infos might get populated by parallel writers if multiple geyser sources are configured
                             let write_version = error_plugin_write_version.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                             let tx_info = TransactionInfo::new(&transaction, write_version);
-                            map_of_infos.insert((sig, transaction.slot), tx_info);
+                            map_of_infos.insert((sig.clone(), transaction.slot), tx_info);
                         }
                     }
                 },

@@ -157,7 +162,7 @@ async fn start_tracking_blocks(
     grpc_x_token: Option<String>,
     block_sender_postgres: Vec<Sender<BlockInfo>>,
     slot: Arc<AtomicU64>,
-    alts_list: Vec<Pubkey>,
+    alts_list: Vec<&Pubkey>,
 ) {
     let block_counter = Arc::new(AtomicU64::new(0));
     let restart_block_subscription = Arc::new(AtomicBool::new(false));

@@ -181,10 +186,19 @@ async fn start_tracking_blocks(
         })
     };
 
-    let mut client = yellowstone_grpc_client_original::GeyserGrpcClient::connect(
+    let buffer_config_large = GeyserGrpcClientBufferConfig {
+        buffer_size: Some(65536),     // 64kb (default: 1k)
+        conn_window: Some(5242880),   // 5mb (=default)
+        stream_window: Some(4194304), // 4mb (default: 2m)
+    };
+    // see https://github.com/blockworks-foundation/geyser-grpc-connector/issues/10
+    let mut client = yellowstone_grpc_client_original::GeyserGrpcClient::connect_with_timeout_with_buffers(
         grpc_block_addr,
         grpc_x_token,
         None,
+        None,
+        None,
+        buffer_config_large
     )
     .unwrap();
 

@@ -223,7 +237,7 @@ async fn start_tracking_blocks(
     // };
 
     let atl_store = Arc::new(alt_store::ALTStore::new(rpc_client));
-    atl_store.load_alts_list(&alts_list).await;
+    atl_store.load_alts_list(alts_list).await;
 
     // let data = atl_store.serialize();
     // let mut alts_file = tokio::fs::File::create("alt_binary.bin").await.unwrap();

@@ -321,7 +335,7 @@ async fn start_tracking_blocks(
                     if let Some(account) = account_update.account {
                         let bytes: [u8; 32] = account.pubkey.try_into().unwrap_or(Pubkey::default().to_bytes());
                         let pubkey = Pubkey::new_from_array(bytes);
-                        atl_store.save_account(&pubkey, &account.data);
+                        atl_store.save_account(pubkey, &account.data);
                     }
                 },
                 _ => {}

@@ -335,7 +349,12 @@ async fn start_tracking_blocks(
 
 #[tokio::main()]
 async fn main() -> anyhow::Result<()> {
-    tracing_subscriber::fmt::init();
+    tracing_subscriber::fmt::fmt()
+        .with_env_filter(EnvFilter::from_default_env())
+        // not sure if "CLOSE" is exactly what we want
+        // ex. "close time.busy=14.7ms time.idle=14.0µs"
+        .with_span_events(FmtSpan::CLOSE)
+        .init();
 
     let args = Args::parse();
     let rpc_client = Arc::new(rpc_client::RpcClient::new(args.rpc_url));

@@ -360,11 +379,12 @@ async fn main() -> anyhow::Result<()> {
         .split("\r\n")
         .map(|x| x.trim().to_string())
         .filter(|x| x.len() > 0)
-        .map(|x| Pubkey::from_str(&x).unwrap())
+        .map(|x| Pubkey::new_from_array(fd_bs58::decode_32(x).unwrap()))
         .collect_vec();
 
     let mut block_senders = vec![];
-    for i in 1..=4 {
+    info!("Starting {} block senders", NUM_BLOCK_SENDERS);
+    for i in 1..=NUM_BLOCK_SENDERS {
         let s = postgres::Postgres::new_with_workmem(i)
             .await
             .spawn_block_saver();

@@ -400,7 +420,7 @@ async fn main() -> anyhow::Result<()> {
             args.grpc_x_token,
             block_senders,
             slot,
-            alts_list,
+            alts_list.iter().map(|x| x).collect_vec(),
         )
         .await;
     }

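The new tracing setup above replaces the plain `tracing_subscriber::fmt::init()` so that span timings become visible: with `FmtSpan::CLOSE`, every instrumented span logs a line like `close time.busy=14.7ms time.idle=14.0µs` when it ends. A minimal, self-contained sketch of the same setup (assuming the `env-filter` feature enabled in Cargo.toml earlier in this diff):

use tracing_subscriber::{fmt::format::FmtSpan, EnvFilter};

fn main() {
    tracing_subscriber::fmt::fmt()
        .with_env_filter(EnvFilter::from_default_env()) // honours e.g. RUST_LOG=info
        .with_span_events(FmtSpan::CLOSE)               // emit one event per closed span
        .init();

    let span = tracing::info_span!("save_block", slot = 42u64);
    let _guard = span.enter();
    // ... work ...
    // on drop, prints something like: save_block{slot=42}: close time.busy=... time.idle=...
}
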
src/postgres.rs
@@ -3,17 +3,21 @@ use std::{
     sync::{atomic::AtomicU64, Arc},
     time::Duration,
 };
+use std::rc::Rc;
 
 use anyhow::Context;
 use base64::Engine;
 use dashmap::DashMap;
-use futures::pin_mut;
+use futures::{join, pin_mut, try_join};
 use itertools::Itertools;
 use log::{debug, error, info, log, warn, Level};
 use native_tls::{Certificate, Identity, TlsConnector};
 use postgres_native_tls::MakeTlsConnector;
 use prometheus::{opts, register_int_gauge, IntGauge};
 use serde::Serialize;
 use solana_sdk::clock::Slot;
+use solana_sdk::pubkey::Pubkey;
+use solana_sdk::signature::Signature;
 use solana_sdk::transaction::TransactionError;
 use tokio::sync::mpsc::error::SendTimeoutError;
 use tokio::sync::mpsc::Sender;

@@ -104,6 +108,8 @@ impl PostgresSession {
     pub async fn new(nb: usize) -> anyhow::Result<Self> {
         let pg_config = std::env::var("PG_CONFIG").context("env PG_CONFIG not found")?;
         let pg_config = pg_config.parse::<tokio_postgres::Config>()?;
+        info!("Using Postgres config to database {} with ssl_disabled={}",
+            pg_config.get_dbname().unwrap_or("n/a"), matches!(pg_config.get_ssl_mode(), SslMode::Disable));
 
         let client = if let SslMode::Disable = pg_config.get_ssl_mode() {
             Self::spawn_connection(pg_config, NoTls).await?

@@ -184,6 +190,23 @@ impl PostgresSession {
         info!("Configured work_mem={}", work_mem);
     }
 
+    pub async fn disable_postgres_workers(&self) {
+        self.client
+            .execute("SET max_parallel_workers_per_gather = 0", &[])
+            .await
+            .unwrap();
+        info!("Disable parallel postgres workers");
+    }
+
+    pub async fn relax_commit_settings(&self) {
+        self.client
+            .execute("SET synchronous_commit TO 'off'", &[])
+            .await
+            .unwrap();
+        // note: commit_delay can be changed but requires superuser
+        info!("Configured synchronous_commit");
+    }
+
     pub async fn drop_temp_table(&self, table: String) -> anyhow::Result<()> {
         self.client
             .execute(format!("drop table if exists {};", table).as_str(), &[])

@@ -191,7 +214,7 @@ impl PostgresSession {
         Ok(())
     }
 
-    pub async fn create_transaction_ids(&self, signatures: HashSet<String>) -> anyhow::Result<()> {
+    pub async fn create_transaction_ids(&self, signatures: Vec<String>, slot: Slot) -> anyhow::Result<()> {
         // create temp table
         let temp_table = self.get_new_temp_table();
 

@@ -227,9 +250,10 @@ impl PostgresSession {
         }
         let num_rows = writer.finish().await?;
         debug!(
-            "inserted {} signatures into temp table in {}ms",
+            "inserted {} signatures into temp table in {}ms (block {})",
             num_rows,
-            started_at.elapsed().as_millis()
+            started_at.elapsed().as_millis(),
+            slot
         );
 
         let statement = format!(

@@ -243,19 +267,22 @@ impl PostgresSession {
         );
         let started_at = Instant::now();
         let num_rows = self.client.execute(statement.as_str(), &[]).await?;
-        self.drop_temp_table(temp_table).await?;
 
         debug!(
-            "inserted {} signatures in transactions table in {}ms",
+            "inserted {} signatures in transactions table in {}ms (block {})",
             num_rows,
-            started_at.elapsed().as_millis()
+            started_at.elapsed().as_millis(),
+            slot
         );
 
+        self.drop_temp_table(temp_table).await?;
         Ok(())
     }
 
     pub async fn create_accounts_for_transaction(
         &self,
         accounts: HashSet<String>,
+        slot: Slot,
     ) -> anyhow::Result<()> {
         // create temp table
         let temp_table = self.get_new_temp_table();

@@ -306,13 +333,15 @@ impl PostgresSession {
         );
         let started_at = Instant::now();
         self.client.execute(statement.as_str(), &[]).await?;
-        self.drop_temp_table(temp_table).await?;
 
         debug!(
-            "inserted {} account keys into accounts table in {}ms",
+            "inserted {} account keys into accounts table in {}ms (block {})",
             num_rows,
-            started_at.elapsed().as_millis()
+            started_at.elapsed().as_millis(),
+            slot
        );
 
+        self.drop_temp_table(temp_table).await?;
         Ok(())
     }
 

@@ -412,8 +441,8 @@ impl PostgresSession {
     pub async fn insert_accounts_for_transaction(
         &self,
         accounts_for_transaction: Vec<AccountsForTransaction>,
+        slot: Slot,
     ) -> anyhow::Result<()> {
         let instant = Instant::now();
         let temp_table = self.get_new_temp_table();
         self.client

@@ -449,9 +478,10 @@ impl PostgresSession {
         );
         pin_mut!(writer);
         for acc_tx in accounts_for_transaction {
-            for acc in &acc_tx.accounts {
+            for acc in acc_tx.accounts {
                 let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(4);
-                args.push(&acc.key);
+                let pubkey_str = &acc.key;
+                args.push(&pubkey_str);
                 args.push(&acc_tx.signature);
                 args.push(&acc.writable);
                 args.push(&acc.is_signer);

@@ -461,9 +491,10 @@ impl PostgresSession {
         }
         let num_rows = writer.finish().await?;
         debug!(
-            "inserted {} accounts for transaction into temp table in {}ms",
+            "inserted {} accounts for transaction into temp table in {}ms (block {})",
             num_rows,
-            started_at.elapsed().as_millis()
+            started_at.elapsed().as_millis(),
+            slot
         );
 
         // note: no lock ordering here, as the accounts_map_transaction does not seem to cause deadlocks (issue 58)

@@ -485,66 +516,72 @@ impl PostgresSession {
         let started_at = Instant::now();
         let rows = self.client.execute(statement.as_str(), &[]).await?;
         debug!(
-            "inserted {} accounts into accounts_map_transaction in {}ms",
+            "inserted {} accounts into accounts_map_transaction in {}ms (block {})",
             rows,
-            started_at.elapsed().as_millis()
+            started_at.elapsed().as_millis(),
+            slot
         );
         TIME_TO_STORE_TX_ACCOUNT_OLD.set(instant.elapsed().as_millis() as i64);
 
-        let instant = Instant::now();
-        // merge data from temp table into accounts_map_transaction_latest
-        // note: query uses the array_dedup_append postgres function to deduplicate and limit the array size
-        // example: array_dedup_append('{8,3,2,1}', '{5,3}', 4) -> {2,1,5,3}
-        let temp_table_latest_agged = self.get_new_temp_table();
-        let statement = format!(
-            r#"
-                CREATE TEMP TABLE {temp_table_name} AS
-                WITH amt_new AS (
-                    SELECT
-                        acc_id, array_agg(transactions.transaction_id) AS tx_agged
-                    FROM {temp_table_newdata} AS newdata
-                    inner join banking_stage_results_2.accounts on accounts.account_key=newdata.account_key
-                    inner join banking_stage_results_2.transactions on transactions.signature=newdata.signature
-                    GROUP BY acc_id
-                )
-                SELECT
-                    acc_id,
-                    array_dedup_append(
-                        (SELECT tx_ids FROM banking_stage_results_2.accounts_map_transaction_latest WHERE acc_id=amt_new.acc_id),
-                        amt_new.tx_agged,
-                        {limit}) AS tx_ids_agg
-                FROM amt_new
-            "#,
-            temp_table_newdata = temp_table,
-            temp_table_name = temp_table_latest_agged,
-            limit = LIMIT_LATEST_TXS_PER_ACCOUNT
-        );
-        let started_at = Instant::now();
-        let num_rows = self.client.execute(statement.as_str(), &[]).await?;
-        debug!(
-            "merged new transactions into accounts_map_transaction_latest temp table for {} accounts in {}ms",
-            num_rows,
-            started_at.elapsed().as_millis()
-        );
-
-        let statement = format!(
-            r#"
-                INSERT INTO banking_stage_results_2.accounts_map_transaction_latest(acc_id, tx_ids)
-                SELECT acc_id, tx_ids_agg FROM {temp_table_name}
-                ORDER BY acc_id
-                ON CONFLICT (acc_id) DO UPDATE SET tx_ids = EXCLUDED.tx_ids
-            "#,
-            temp_table_name = temp_table_latest_agged
-        );
-        let started_at = Instant::now();
-        let num_rows = self.client.execute(statement.as_str(), &[]).await?;
-        debug!(
-            "upserted {} merged transaction arrays into accounts_map_transaction_latest in {}ms",
-            num_rows,
-            started_at.elapsed().as_millis()
-        );
-        TIME_TO_STORE_TX_ACCOUNT_NEW.set(instant.elapsed().as_millis() as i64);
-        self.drop_temp_table(temp_table_latest_agged).await?;
+        // DISABLED FOR NOW to see if the rest of the system works better without it
+        warn!("DISABLED writing of table accounts_map_transaction_latest");
+        if false {
+            let instant = Instant::now();
+            // merge data from temp table into accounts_map_transaction_latest
+            // note: query uses the array_dedup_append postgres function to deduplicate and limit the array size
+            // example: array_dedup_append('{8,3,2,1}', '{5,3}', 4) -> {2,1,5,3}
+            let temp_table_latest_agged = self.get_new_temp_table();
+            let statement = format!(
+                r#"
+                    CREATE TEMP TABLE {temp_table_name} AS
+                    WITH amt_new AS (
+                        SELECT
+                            acc_id, array_agg(transactions.transaction_id) AS tx_agged
+                        FROM {temp_table_newdata} AS newdata
+                        inner join banking_stage_results_2.accounts on accounts.account_key=newdata.account_key
+                        inner join banking_stage_results_2.transactions on transactions.signature=newdata.signature
+                        GROUP BY acc_id
+                    )
+                    SELECT
+                        acc_id,
+                        array_dedup_append(
+                            (SELECT tx_ids FROM banking_stage_results_2.accounts_map_transaction_latest WHERE acc_id=amt_new.acc_id),
+                            amt_new.tx_agged,
+                            {limit}) AS tx_ids_agg
+                    FROM amt_new
+                "#,
+                temp_table_newdata = temp_table,
+                temp_table_name = temp_table_latest_agged,
+                limit = LIMIT_LATEST_TXS_PER_ACCOUNT
+            );
+            let started_at = Instant::now();
+            let num_rows = self.client.execute(statement.as_str(), &[]).await?;
+            debug!(
+                "merged new transactions into accounts_map_transaction_latest temp table for {} accounts in {}ms",
+                num_rows,
+                started_at.elapsed().as_millis()
+            );
+
+            let statement = format!(
+                r#"
+                    INSERT INTO banking_stage_results_2.accounts_map_transaction_latest(acc_id, tx_ids)
+                    SELECT acc_id, tx_ids_agg FROM {temp_table_name}
+                    ORDER BY acc_id
+                    ON CONFLICT (acc_id) DO UPDATE SET tx_ids = EXCLUDED.tx_ids
+                "#,
+                temp_table_name = temp_table_latest_agged
+            );
+            let started_at = Instant::now();
+            let num_rows = self.client.execute(statement.as_str(), &[]).await?;
+            debug!(
+                "upserted {} merged transaction arrays into accounts_map_transaction_latest in {}ms",
+                num_rows,
+                started_at.elapsed().as_millis()
+            );
+            TIME_TO_STORE_TX_ACCOUNT_NEW.set(instant.elapsed().as_millis() as i64);
+            self.drop_temp_table(temp_table_latest_agged).await?;
+        }
 
+        self.drop_temp_table(temp_table).await?;
         Ok(())
     }

@@ -552,7 +589,7 @@ impl PostgresSession {
     pub async fn insert_transactions_for_block(
         &self,
         transactions: &Vec<BlockTransactionInfo>,
-        slot: i64,
+        slot: Slot,
     ) -> anyhow::Result<()> {
         let temp_table = self.get_new_temp_table();
         self.client

@@ -603,10 +640,11 @@ impl PostgresSession {
             ],
         );
         pin_mut!(writer);
+        let slot_db = slot as i64;
         for transaction in transactions {
             let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(7);
             args.push(&transaction.signature);
-            args.push(&slot);
+            args.push(&slot_db);
             args.push(&transaction.is_successful);
             args.push(&transaction.cu_requested);
             args.push(&transaction.cu_consumed);

@@ -616,9 +654,10 @@ impl PostgresSession {
         }
         let num_rows = writer.finish().await?;
         debug!(
-            "inserted {} transactions for block into temp table in {}ms",
+            "inserted {} transactions for block into temp table in {}ms (block {})",
             num_rows,
-            started_at.elapsed().as_millis()
+            started_at.elapsed().as_millis(),
+            slot
         );
 
         let statement = format!(

@@ -642,9 +681,10 @@ impl PostgresSession {
         let started_at = Instant::now();
         let num_rows = self.client.execute(statement.as_str(), &[]).await?;
         debug!(
-            "inserted {} transactions for block into transaction_infos table in {}ms",
+            "inserted {} transactions for block into transaction_infos table in {}ms (block {})",
             num_rows,
-            started_at.elapsed().as_millis()
+            started_at.elapsed().as_millis(),
+            slot
         );
 
         self.drop_temp_table(temp_table).await?;

@@ -652,6 +692,7 @@ impl PostgresSession {
     }
 
     pub async fn save_account_usage_in_block(&self, block_info: &BlockInfo) -> anyhow::Result<()> {
+        let slot = block_info.slot;
         let temp_table = self.get_new_temp_table();
         self.client
             .execute(

@@ -729,9 +770,10 @@ impl PostgresSession {
         }
         let num_rows = writer.finish().await?;
         debug!(
-            "inserted {} heavily_locked_accounts into temp table in {}ms",
+            "inserted {} heavily_locked_accounts into temp table in {}ms (block {})",
             num_rows,
-            started_at.elapsed().as_millis()
+            started_at.elapsed().as_millis(),
+            slot
         );
 
         let statement = format!(

@@ -773,13 +815,15 @@ impl PostgresSession {
         );
         let started_at = Instant::now();
         let num_rows = self.client.execute(statement.as_str(), &[]).await?;
-        self.drop_temp_table(temp_table).await?;
 
         debug!(
-            "inserted {} heavily_locked_accounts into accounts_map_blocks table in {}ms",
+            "inserted {} heavily_locked_accounts into accounts_map_blocks table in {}ms (block {})",
             num_rows,
-            started_at.elapsed().as_millis()
+            started_at.elapsed().as_millis(),
+            slot
         );
 
+        self.drop_temp_table(temp_table).await?;
         Ok(())
     }
 

@@ -815,9 +859,10 @@ impl PostgresSession {
         )
         .await?;
         debug!(
-            "inserted {} block info into blocks table in {}ms",
+            "inserted {} block info into blocks table in {}ms (block {})",
             num_rows,
-            started_at.elapsed().as_millis()
+            started_at.elapsed().as_millis(),
+            block_info.slot,
         );
 
         if num_rows == 0 {

@@ -830,23 +875,25 @@ impl PostgresSession {
     pub async fn save_banking_transaction_results(
         &self,
         txs: Vec<TransactionInfo>,
+        slot: Slot,
     ) -> anyhow::Result<()> {
         if txs.is_empty() {
             return Ok(());
         }
         // create transaction ids
-        let signatures = txs
+        let signatures: Vec<String> = txs
             .iter()
             .map(|transaction| transaction.signature.clone())
             .unique()
             .collect();
-        self.create_transaction_ids(signatures).await?;
+        self.create_transaction_ids(signatures, slot).await?;
         // create account ids
         let accounts = txs
             .iter()
             .flat_map(|transaction| transaction.account_used.clone())
             .map(|(acc, _)| acc)
             .collect();
-        self.create_accounts_for_transaction(accounts).await?;
+        self.create_accounts_for_transaction(accounts, slot).await?;
         // add transaction in tx slot table
         self.insert_transaction_in_txslot_table(txs.as_slice())
             .await?;

@@ -858,7 +905,7 @@ impl PostgresSession {
             .account_used
             .iter()
             .map(|(key, is_writable)| AccountUsed {
-                key: key.clone(),
+                key: fd_bs58::encode_32(key),
                 writable: *is_writable,
                 is_signer: false,
                 is_atl: false,

@@ -870,7 +917,7 @@ impl PostgresSession {
         ACCOUNTS_SAVING_QUEUE.inc();
         let instant: Instant = Instant::now();
         ACCOUNTS_SAVING_QUEUE.dec();
-        if let Err(e) = self.insert_accounts_for_transaction(txs_accounts).await {
+        if let Err(e) = self.insert_accounts_for_transaction(txs_accounts, slot).await {
             error!("Error inserting accounts for transactions : {e:?}");
         }
         TIME_TO_STORE_TX_ACCOUNT.set(instant.elapsed().as_millis() as i64);

@@ -887,65 +934,81 @@ impl PostgresSession {
     pub async fn save_block(&self, block_info: BlockInfo) -> anyhow::Result<()> {
-        let instant = Instant::now();
-        // create transaction ids
-        let int_sig = Instant::now();
-        let signatures = block_info
-            .transactions
-            .iter()
-            .map(|transaction| transaction.signature.clone())
-            .collect();
-        self.create_transaction_ids(signatures).await?;
-        TIME_TO_SAVE_TRANSACTION.set(int_sig.elapsed().as_millis() as i64);
-        // create account ids
-        let ins_acc = Instant::now();
-        let accounts = block_info
-            .heavily_locked_accounts
-            .iter()
-            .map(|acc| acc.key.clone())
-            .collect();
-        self.create_accounts_for_transaction(accounts).await?;
-        ACCOUNT_SAVE_TIME.set(ins_acc.elapsed().as_millis() as i64);
-        let instant_acc_tx: Instant = Instant::now();
-        let txs_accounts = block_info
-            .transactions
-            .iter()
-            .map(|tx| AccountsForTransaction {
-                signature: tx.signature.clone(),
-                accounts: tx
-                    .accounts
-                    .iter()
-                    .map(|acc| AccountUsed {
-                        key: acc.key.to_string(),
-                        writable: acc.is_writable,
-                        is_signer: acc.is_signer,
-                        is_atl: acc.is_alt,
-                    })
-                    .collect(),
-            })
-            .collect_vec();
-        if let Err(e) = self.insert_accounts_for_transaction(txs_accounts).await {
-            error!("Error inserting accounts for transactions : {e:?}");
-        }
-        TIME_TO_STORE_TX_ACCOUNT.set(instant_acc_tx.elapsed().as_millis() as i64);
-        // insert transactions
-        let instant_save_tx = Instant::now();
-        self.insert_transactions_for_block(&block_info.transactions, block_info.slot)
-            .await?;
-        TIME_TO_SAVE_TRANSACTION_DATA.set(instant_save_tx.elapsed().as_millis() as i64);
-        // save account usage in blocks
-        let ins = Instant::now();
-        self.save_account_usage_in_block(&block_info).await?;
-        TIME_TO_SAVE_BLOCK_ACCOUNTS.set(ins.elapsed().as_millis() as i64);
-        let inst_block_info = Instant::now();
-        self.save_block_info(&block_info).await?;
-        BLOCK_INFO_SAVE_TIME.set(inst_block_info.elapsed().as_millis() as i64);
-        TIME_TO_SAVE_BLOCK.set(instant.elapsed().as_millis() as i64);
+        let slot = block_info.slot as Slot;
+        debug!("Saving block {} ...", slot);
+        // 750ms
+        let _span = tracing::info_span!("save_block", slot = block_info.slot);
+        let save_block_started_at = Instant::now();
+        let fut_signatures = async {
+            // .3ms
+            let _span = tracing::debug_span!("map_signatures", slot = block_info.slot);
+            block_info
+                .transactions
+                .iter()
+                .map(|transaction| transaction.signature.clone())
+                .collect_vec()
+        };
+        let fut_accounts = async {
+            // .6ms
+            let _span = tracing::debug_span!("map_accounts", slot = block_info.slot);
+            block_info
+                .heavily_locked_accounts
+                .iter()
+                .map(|acc| acc.key.clone())
+                .collect::<HashSet<String>>()
+        };
+        let fut_txs_accounts = async {
+            // 90ms
+            let _span = tracing::debug_span!("map_txs_accounts", slot = block_info.slot);
+            block_info
+                .transactions
+                .iter()
+                .map(|tx| AccountsForTransaction {
+                    signature: tx.signature.clone(),
+                    accounts: tx
+                        .accounts
+                        .iter()
+                        .map(|acc| AccountUsed {
+                            key: fd_bs58::encode_32(&acc.key),
+                            writable: acc.is_writable,
+                            is_signer: acc.is_signer,
+                            is_atl: acc.is_alt,
+                        })
+                        .collect_vec(),
+                })
+                .collect()
+        };
+        let (signatures, accounts, txs_accounts) = join!(fut_signatures, fut_accounts, fut_txs_accounts);
+
+        let both_started_at = Instant::now();
+        let fut_create_tx_ids = self.create_transaction_ids(signatures, slot);
+        let fut_create_accs = self.create_accounts_for_transaction(accounts, slot);
+        try_join!(fut_create_tx_ids, fut_create_accs)?;
+        TIME_TO_SAVE_TRANSACTION.set(both_started_at.elapsed().as_millis() as i64);
+        ACCOUNT_SAVE_TIME.set(both_started_at.elapsed().as_millis() as i64);
+
+        let both_started_at = Instant::now();
+        // depends on transactions and accounts mapping tables
+        let fut_insert_accounts_for_transaction = self.insert_accounts_for_transaction(txs_accounts, slot);
+        // depends on transactions mapping table
+        let fut_insert_transactions_for_block = self.insert_transactions_for_block(&block_info.transactions, slot);
+        // depends on accounts mapping table
+        let fut_save_account_usage_in_block = self.save_account_usage_in_block(&block_info);
+        // no dependencies
+        let fut_save_block_info = self.save_block_info(&block_info);
+        let ((), (), (), ()) = try_join!(
+            fut_insert_accounts_for_transaction,
+            fut_insert_transactions_for_block,
+            fut_save_account_usage_in_block,
+            fut_save_block_info)?;
+        TIME_TO_STORE_TX_ACCOUNT.set(both_started_at.elapsed().as_millis() as i64);
+        TIME_TO_SAVE_TRANSACTION_DATA.set(both_started_at.elapsed().as_millis() as i64);
+        TIME_TO_SAVE_BLOCK_ACCOUNTS.set(both_started_at.elapsed().as_millis() as i64);
+        BLOCK_INFO_SAVE_TIME.set(both_started_at.elapsed().as_millis() as i64);
+
+        TIME_TO_SAVE_BLOCK.set(save_block_started_at.elapsed().as_millis() as i64);
         Ok(())
     }
 }

@@ -959,7 +1022,9 @@ impl PostgresSession {
     // "accounts_map_transaction" -> delete rows with transaction_id before X
     // "transaction_infos" -> delete rows processed_slot before X
     // "transaction_slot" -> delete transaction with slot before X
-    pub async fn cleanup_old_data(&self, slots_to_keep: i64, dry_run: bool) {
+
+    // count_rows=true might be very expensive
+    pub async fn cleanup_old_data(&self, slots_to_keep: i64, dry_run: bool, count_rows: bool) {
         // keep 1mio slots (apprx 4 days)
         info!(
             "{}Running cleanup job with slots_to_keep={}",

@@ -968,8 +1033,9 @@ impl PostgresSession {
         );
 
         self.configure_work_mem().await;
+        self.disable_postgres_workers().await;
 
-        {
+        if count_rows {
             info!("Rows before cleanup:");
             self.log_rowcount(Level::Info, "blocks").await;
             self.log_rowcount(Level::Info, "accounts").await;

@@ -979,6 +1045,8 @@ impl PostgresSession {
                 .await;
             self.log_rowcount(Level::Info, "transaction_infos").await;
             self.log_rowcount(Level::Info, "transaction_slot").await;
+        } else {
+            info!("Skipping row count before cleanup");
         }
 
         // max slot from blocks table

@@ -1227,7 +1295,7 @@ impl PostgresSession {
             );
         }
 
-        {
+        if count_rows {
             info!("Rows after cleanup:");
             self.log_rowcount(Level::Info, "blocks").await;
             self.log_rowcount(Level::Info, "accounts").await;

@@ -1237,6 +1305,8 @@ impl PostgresSession {
                 .await;
             self.log_rowcount(Level::Info, "transaction_infos").await;
             self.log_rowcount(Level::Info, "transaction_slot").await;
+        } else {
+            info!("Skipping row count after cleanup");
         }
 
         info!("Cleanup job completed.");

@@ -1269,6 +1339,7 @@ impl Postgres {
         let session = PostgresSession::new(nb).await.unwrap();
         let session = Arc::new(session);
         session.configure_work_mem().await;
+        session.relax_commit_settings().await;
         Self { session }
     }
 

@@ -1334,7 +1405,7 @@ impl Postgres {
             .filter_map(|key| map_of_transaction.remove(key))
             .map(|(_, trans)| trans)
             .collect_vec();
-        if let Err(err) = session.save_banking_transaction_results(batches).await {
+        if let Err(err) = session.save_banking_transaction_results(batches, slot).await {
             panic!("saving transaction infos failed {}", err);
         }
     }

@@ -1366,8 +1437,8 @@ pub async fn send_block_info_to_buffer(
     block_info: BlockInfo,
 ) -> anyhow::Result<()> {
     debug!(
-        "block buffer capacity: {}",
-        block_sender_postgres.capacity()
+        "block buffer remaining capacity: {}",
+        block_sender_postgres.max_capacity() - block_sender_postgres.capacity()
     );
 
     const WARNING_THRESHOLD: Duration = Duration::from_millis(3000);

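The reworked `save_block` above replaces a strictly sequential insert sequence with two concurrent stages: the id-mapping tables (transactions, accounts) are created first, because the later inserts join against them, and then the four remaining writes run concurrently via `try_join!`, which fails fast on the first error. A schematic sketch of that staging, with hypothetical step names standing in for the real inserts:

use anyhow::Result;
use futures::try_join;

async fn create_transaction_ids() -> Result<()> { Ok(()) }      // stage 1
async fn create_account_ids() -> Result<()> { Ok(()) }          // stage 1
async fn insert_tx_account_mappings() -> Result<()> { Ok(()) }  // needs both stage-1 tables
async fn insert_transaction_infos() -> Result<()> { Ok(()) }    // needs transaction ids
async fn save_block_info() -> Result<()> { Ok(()) }             // independent

async fn save_block_sketch() -> Result<()> {
    // stage 1: the mapping rows must exist before anything joins against them
    try_join!(create_transaction_ids(), create_account_ids())?;
    // stage 2: mutually independent writes run concurrently and fail fast together
    try_join!(
        insert_tx_account_mappings(),
        insert_transaction_infos(),
        save_block_info()
    )?;
    Ok(())
}
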
@@ -1,4 +1,6 @@
 use std::{collections::HashMap, hash::Hash};
+use std::rc::Rc;
+use std::sync::Arc;
 
 use chrono::{DateTime, Utc};
 use itertools::Itertools;

@@ -103,8 +105,9 @@ impl TransactionInfo {
             .iter()
             .map(|x| (x.account.clone(), x.is_writable))
             .collect_vec();
+
         Self {
-            signature: notification.signature.clone(),
+            signature: notification.signature.clone().into(),
             errors,
             slot: notification.slot,
             utc_timestamp,
