Compare commits

...

5 Commits

Author SHA1 Message Date
GroovieGermanikus fd8fb50fb2
remove .clones and use fd_bs58 2024-04-25 12:45:11 +02:00
GroovieGermanikus a1946f59ba
CAUTION: temp disabled accounts_map_transaction_latest 2024-04-24 12:17:12 +02:00
GroovieGermanikus 1e5598eb5b
cleanupdb: disable postgres parallel workers
trying to fix "too many connection" issue
2024-03-26 22:55:47 +01:00
GroovieGermanikus 368f37cace
use connect_with_timeout_with_buffers 2024-03-26 09:47:02 +01:00
Groovie | Mango 78344dc8b4
cleanup: do not count rows unless enabled by argument "--count-rows" (#63)
cleanup: do not count rows unless enabled by argument "--count-rows"
2024-03-11 20:40:49 +01:00
11 changed files with 299 additions and 148 deletions

51
.github/workflows/rust_check.yml vendored Normal file
View File

@ -0,0 +1,51 @@
name: Cargo Build & Test
on:
push:
branches:
- main
pull_request:
env:
CARGO_TERM_COLOR: always
SCCACHE_GHA_ENABLED: true
RUSTC_WRAPPER: sccache
SCCACHE_CACHE_SIZE: "1G"
jobs:
build_and_test:
name: bankingstage-sidecar check and test
runs-on: ubuntu-22.04
steps:
- name: Install Linux Packages
run: |
sudo apt-get update -y
sudo apt-get install libssl-dev openssl -y
- uses: actions/checkout@v4
# The toolchain action should definitely be run before the cache action
- uses: actions-rust-lang/setup-rust-toolchain@v1
with:
cache: true
# avoid the default "-D warnings" which thrashes cache
rustflags: ""
- name: Run sccache-cache
uses: mozilla-actions/sccache-action@v0.0.3
# https://github.com/actions/cache/blob/main/examples.md#rust---cargo
# https://blog.arriven.wtf/posts/rust-ci-cache/
- uses: Swatinem/rust-cache@v2
with:
# will be covered by sscache
cache-targets: false
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}
- name: Early Build
run: |
cargo check --locked --workspace --all-targets
- name: Run Tests
run: RUST_LOG=info cargo test

52
Cargo.lock generated
View File

@ -1241,6 +1241,12 @@ version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5"
[[package]]
name = "fd_bs58"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fdaac1b30ff95de4562a79ae990f91ac7bf2062d7313cc25c58987762c8e8160"
[[package]] [[package]]
name = "feature-probe" name = "feature-probe"
version = "0.1.1" version = "0.1.1"
@ -1466,6 +1472,7 @@ dependencies = [
"clap", "clap",
"const_env", "const_env",
"dashmap", "dashmap",
"fd_bs58",
"futures", "futures",
"itertools 0.10.5", "itertools 0.10.5",
"lazy_static", "lazy_static",
@ -1485,6 +1492,7 @@ dependencies = [
"thiserror", "thiserror",
"tokio", "tokio",
"tokio-postgres", "tokio-postgres",
"tracing",
"tracing-subscriber", "tracing-subscriber",
"yellowstone-grpc-client 1.11.1+solana.1.16.17", "yellowstone-grpc-client 1.11.1+solana.1.16.17",
"yellowstone-grpc-client 1.12.0+solana.1.16.17", "yellowstone-grpc-client 1.12.0+solana.1.16.17",
@ -1951,6 +1959,15 @@ version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
[[package]]
name = "matchers"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
dependencies = [
"regex-automata 0.1.10",
]
[[package]] [[package]]
name = "matchit" name = "matchit"
version = "0.7.3" version = "0.7.3"
@ -2775,8 +2792,17 @@ checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343"
dependencies = [ dependencies = [
"aho-corasick", "aho-corasick",
"memchr", "memchr",
"regex-automata", "regex-automata 0.4.3",
"regex-syntax", "regex-syntax 0.8.2",
]
[[package]]
name = "regex-automata"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
dependencies = [
"regex-syntax 0.6.29",
] ]
[[package]] [[package]]
@ -2787,9 +2813,15 @@ checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f"
dependencies = [ dependencies = [
"aho-corasick", "aho-corasick",
"memchr", "memchr",
"regex-syntax", "regex-syntax 0.8.2",
] ]
[[package]]
name = "regex-syntax"
version = "0.6.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
[[package]] [[package]]
name = "regex-syntax" name = "regex-syntax"
version = "0.8.2" version = "0.8.2"
@ -4428,10 +4460,14 @@ version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b"
dependencies = [ dependencies = [
"matchers",
"nu-ansi-term", "nu-ansi-term",
"once_cell",
"regex",
"sharded-slab", "sharded-slab",
"smallvec", "smallvec",
"thread_local", "thread_local",
"tracing",
"tracing-core", "tracing-core",
"tracing-log", "tracing-log",
] ]
@ -4886,7 +4922,7 @@ dependencies = [
[[package]] [[package]]
name = "yellowstone-grpc-client" name = "yellowstone-grpc-client"
version = "1.11.1+solana.1.16.17" version = "1.11.1+solana.1.16.17"
source = "git+https://github.com/blockworks-foundation/yellowstone-grpc.git?branch=tag-v1.16-mango#620c49a01a6de30790b2afcc71c6c709b3061880" source = "git+https://github.com/blockworks-foundation/yellowstone-grpc.git?branch=tag-v1.16-mango#ada20b8e3946d87bcb7ebfe97f6a2153366094e7"
dependencies = [ dependencies = [
"bytes", "bytes",
"futures", "futures",
@ -4900,8 +4936,7 @@ dependencies = [
[[package]] [[package]]
name = "yellowstone-grpc-client" name = "yellowstone-grpc-client"
version = "1.12.0+solana.1.16.17" version = "1.12.0+solana.1.16.17"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "git+https://github.com/blockworks-foundation/yellowstone-grpc.git?tag=v1.11.0+solana.1.16.17-with-buffers2#751d756cb00c8a8c56a75a23e584f19bee5695cf"
checksum = "e58204f372a7e82d15d72bdf99334029c4e9cdc15bd2e9a5c33b598d9f1eb8b6"
dependencies = [ dependencies = [
"bytes", "bytes",
"futures", "futures",
@ -4915,7 +4950,7 @@ dependencies = [
[[package]] [[package]]
name = "yellowstone-grpc-proto" name = "yellowstone-grpc-proto"
version = "1.10.0+solana.1.16.17" version = "1.10.0+solana.1.16.17"
source = "git+https://github.com/blockworks-foundation/yellowstone-grpc.git?branch=tag-v1.16-mango#620c49a01a6de30790b2afcc71c6c709b3061880" source = "git+https://github.com/blockworks-foundation/yellowstone-grpc.git?branch=tag-v1.16-mango#ada20b8e3946d87bcb7ebfe97f6a2153366094e7"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"bincode", "bincode",
@ -4931,8 +4966,7 @@ dependencies = [
[[package]] [[package]]
name = "yellowstone-grpc-proto" name = "yellowstone-grpc-proto"
version = "1.11.0+solana.1.16.17" version = "1.11.0+solana.1.16.17"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "git+https://github.com/blockworks-foundation/yellowstone-grpc.git?tag=v1.11.0+solana.1.16.17-with-buffers2#751d756cb00c8a8c56a75a23e584f19bee5695cf"
checksum = "00d751c6ef3093ec90ab1e16c6a504b5bea99aca6c688c429fed4cc56782f57e"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"bincode", "bincode",

View File

@ -17,13 +17,15 @@ serde = { version = "1.0.160", features = ["derive"] }
serde_json = "1.0.96" serde_json = "1.0.96"
bincode = "1.3.3" bincode = "1.3.3"
bs58 = "0.4.0" bs58 = "0.4.0"
fd_bs58 = "0.1.0"
base64 = "0.21.0" base64 = "0.21.0"
thiserror = "1.0.40" thiserror = "1.0.40"
futures = "0.3.28" futures = "0.3.28"
bytes = "1.4.0" bytes = "1.4.0"
anyhow = "1.0.70" anyhow = "1.0.70"
log = "0.4.17" log = "0.4.17"
tracing-subscriber = "0.3.18" tracing = "0.1.37"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
clap = { version = "4.2.4", features = ["derive", "env"] } clap = { version = "4.2.4", features = ["derive", "env"] }
dashmap = "5.4.0" dashmap = "5.4.0"
const_env = "0.1.2" const_env = "0.1.2"
@ -40,8 +42,8 @@ tokio = { version = "1.32.0", features = ["rt-multi-thread", "macros", "time"] }
yellowstone-grpc-client = {git = "https://github.com/blockworks-foundation/yellowstone-grpc.git", branch = "tag-v1.16-mango"} yellowstone-grpc-client = {git = "https://github.com/blockworks-foundation/yellowstone-grpc.git", branch = "tag-v1.16-mango"}
yellowstone-grpc-proto = {git = "https://github.com/blockworks-foundation/yellowstone-grpc.git", branch = "tag-v1.16-mango"} yellowstone-grpc-proto = {git = "https://github.com/blockworks-foundation/yellowstone-grpc.git", branch = "tag-v1.16-mango"}
yellowstone-grpc-client_original = { package = "yellowstone-grpc-client", version = "1.12.0+solana.1.16.17" } yellowstone-grpc-client_original = { package = "yellowstone-grpc-client", git = "https://github.com/blockworks-foundation/yellowstone-grpc.git", tag = "v1.11.0+solana.1.16.17-with-buffers2" }
yellowstone-grpc-proto_original = { package = "yellowstone-grpc-proto", version = "1.11.0+solana.1.16.17" } yellowstone-grpc-proto_original = { package = "yellowstone-grpc-proto", git = "https://github.com/blockworks-foundation/yellowstone-grpc.git", tag = "v1.11.0+solana.1.16.17-with-buffers2" }
[patch.crates-io] [patch.crates-io]

View File

@ -206,6 +206,7 @@ ALTER TABLE banking_stage_results_2.accounts_map_transaction
autovacuum_analyze_threshold=10000 autovacuum_analyze_threshold=10000
); );
-- not written ATM
ALTER TABLE banking_stage_results_2.accounts_map_transaction_latest ALTER TABLE banking_stage_results_2.accounts_map_transaction_latest
SET ( SET (
autovacuum_vacuum_scale_factor=0, autovacuum_vacuum_scale_factor=0,

View File

@ -11,6 +11,7 @@ while true; do
# startup delay # startup delay
sleep 300; sleep 300;
# should retain about 1 week of data # should retain about 1 week of data
# no "--count-rows"
RUST_LOG=info /usr/local/bin/cleanupdb --num-slots-to-keep $SLOTS_TO_KEEP; RUST_LOG=info /usr/local/bin/cleanupdb --num-slots-to-keep $SLOTS_TO_KEEP;
# every 5 hours # every 5 hours
sleep 18000; sleep 18000;

View File

@ -26,14 +26,14 @@ pub struct ALTStore {
impl ALTStore { impl ALTStore {
pub fn new(rpc_client: Arc<RpcClient>) -> Self { pub fn new(rpc_client: Arc<RpcClient>) -> Self {
let (sx, rx) = async_channel::unbounded(); let (sx, rx) = async_channel::unbounded();
let instant = Self { let alt_store = Self {
rpc_client, rpc_client,
map: Arc::new(DashMap::new()), map: Arc::new(DashMap::new()),
loading_queue: Arc::new(sx), loading_queue: Arc::new(sx),
}; };
{ {
let instant = instant.clone(); let instant = alt_store.clone();
tokio::task::spawn(async move { tokio::task::spawn(async move {
loop { loop {
if let Ok(pk) = rx.recv().await { if let Ok(pk) = rx.recv().await {
@ -51,15 +51,15 @@ impl ALTStore {
}); });
} }
instant alt_store
} }
pub async fn load_alts_list(&self, alts_list: &Vec<Pubkey>) { pub async fn load_alts_list(&self, alts_list: Vec<&Pubkey>) {
log::info!("Preloading {} ALTs", alts_list.len()); log::info!("Preloading {} ALTs", alts_list.len());
for batches in alts_list.chunks(1000).map(|x| x.to_vec()) { for batches in alts_list.chunks(1000) {
let tasks = batches.chunks(100).map(|batch| { let tasks = batches.chunks(100).map(|batch| {
let batch = batch.to_vec(); let batch: Vec<Pubkey> = batch.into_iter().map(|pk| (*pk).clone()).collect_vec();
let rpc_client = self.rpc_client.clone(); let rpc_client = self.rpc_client.clone();
let this = self.clone(); let this = self.clone();
tokio::spawn(async move { tokio::spawn(async move {
@ -74,7 +74,7 @@ impl ALTStore {
{ {
for (index, acc) in multiple_accounts.value.iter().enumerate() { for (index, acc) in multiple_accounts.value.iter().enumerate() {
if let Some(acc) = acc { if let Some(acc) = acc {
this.save_account(&batch[index], &acc.data); this.save_account(batch[index], &acc.data);
} }
} }
} }
@ -94,26 +94,25 @@ impl ALTStore {
let alts_list = alts_list let alts_list = alts_list
.iter() .iter()
.filter(|x| !self.map.contains_key(x)) .filter(|x| !self.map.contains_key(x))
.cloned()
.collect_vec(); .collect_vec();
if alts_list.is_empty() { if alts_list.is_empty() {
return; return;
} }
self.load_alts_list(&alts_list).await; self.load_alts_list(alts_list).await;
} }
pub async fn start_loading_missing_alts(&self, alts_list: &Vec<Pubkey>) { pub async fn start_loading_missing_alts(&self, alts_list: &Vec<&Pubkey>) {
for key in alts_list.iter().filter(|x| !self.map.contains_key(x)) { for key in alts_list.iter().filter(|x| !self.map.contains_key(x)) {
ALTS_IN_LOADING_QUEUE.inc(); ALTS_IN_LOADING_QUEUE.inc();
let _ = self.loading_queue.send(*key).await; let _ = self.loading_queue.send(*key.clone()).await;
} }
} }
pub fn save_account(&self, address: &Pubkey, data: &[u8]) { pub fn save_account(&self, address: Pubkey, data: &[u8]) {
let lookup_table = AddressLookupTable::deserialize(&data).unwrap(); let lookup_table = AddressLookupTable::deserialize(&data).unwrap();
if self if self
.map .map
.insert(address.clone(), lookup_table.addresses.to_vec()) .insert(address, lookup_table.addresses.to_vec())
.is_none() .is_none()
{ {
ALTS_IN_STORE.inc(); ALTS_IN_STORE.inc();
@ -121,7 +120,7 @@ impl ALTStore {
drop(lookup_table); drop(lookup_table);
} }
async fn load_accounts( async fn load_accounts<'a>(
&self, &self,
alt: &Pubkey, alt: &Pubkey,
write_accounts: &Vec<u8>, write_accounts: &Vec<u8>,
@ -188,13 +187,13 @@ impl ALTStore {
} }
pub fn serialize(&self) -> Vec<u8> { pub fn serialize(&self) -> Vec<u8> {
bincode::serialize::<BinaryALTData>(&BinaryALTData::new(&self.map)).unwrap() bincode::serialize::<BinaryALTData>(&BinaryALTData::new(self.map.clone())).unwrap()
} }
pub fn load_binary(&self, binary_data: Vec<u8>) { pub fn load_binary(&self, binary_data: Vec<u8>) {
let binary_alt_data = bincode::deserialize::<BinaryALTData>(&binary_data).unwrap(); let binary_alt_data = bincode::deserialize::<BinaryALTData>(&binary_data).unwrap();
for (alt, accounts) in binary_alt_data.data.iter() { for (pubkey, accounts) in binary_alt_data.data {
self.map.insert(alt.clone(), accounts.clone()); self.map.insert(pubkey, accounts);
} }
} }
} }
@ -205,7 +204,7 @@ pub struct BinaryALTData {
} }
impl BinaryALTData { impl BinaryALTData {
pub fn new(map: &Arc<DashMap<Pubkey, Vec<Pubkey>>>) -> Self { pub fn new(map: Arc<DashMap<Pubkey, Vec<Pubkey>>>) -> Self {
let data = map let data = map
.iter() .iter()
.map(|x| (x.key().clone(), x.value().clone())) .map(|x| (x.key().clone(), x.value().clone()))

View File

@ -13,6 +13,9 @@ pub struct Args {
#[arg(short, long, default_value_t = false)] #[arg(short, long, default_value_t = false)]
pub dry_run: bool, pub dry_run: bool,
#[arg(short, long, default_value_t = false)]
pub count_rows: bool,
} }
#[tokio::main()] #[tokio::main()]
@ -22,9 +25,10 @@ async fn main() {
let Args { let Args {
num_slots_to_keep, num_slots_to_keep,
dry_run, dry_run,
count_rows,
} = Args::parse(); } = Args::parse();
let session = PostgresSession::new(0).await.unwrap(); let session = PostgresSession::new(0).await.unwrap();
session.cleanup_old_data(num_slots_to_keep, dry_run).await; session.cleanup_old_data(num_slots_to_keep, dry_run, count_rows).await;
} }

View File

@ -17,6 +17,7 @@ use std::{
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
sync::Arc, sync::Arc,
}; };
use std::rc::Rc;
#[derive(Serialize, Debug, Clone)] #[derive(Serialize, Debug, Clone)]
pub struct PrioFeeData { pub struct PrioFeeData {
@ -121,7 +122,7 @@ pub struct BlockInfo {
impl BlockInfo { impl BlockInfo {
pub async fn process_versioned_message( pub async fn process_versioned_message(
atl_store: &Arc<ALTStore>, atl_store: &Arc<ALTStore>,
signature: &String, signature: String,
slot: Slot, slot: Slot,
message: &VersionedMessage, message: &VersionedMessage,
prio_fees_in_block: &mut Vec<(u64, u64)>, prio_fees_in_block: &mut Vec<(u64, u64)>,
@ -183,10 +184,10 @@ impl BlockInfo {
if !is_vote { if !is_vote {
let mut accounts = message let mut accounts = message
.static_account_keys() .static_account_keys()
.iter() .iter().cloned()
.enumerate() .enumerate()
.map(|(index, account)| TransactionAccount { .map(|(index, account_pk)| TransactionAccount {
key: account.clone(), key: account_pk,
is_writable: message.is_maybe_writable(index), is_writable: message.is_maybe_writable(index),
is_signer: message.is_signer(index), is_signer: message.is_signer(index),
is_alt: false, is_alt: false,
@ -209,7 +210,7 @@ impl BlockInfo {
for writable_account in accounts for writable_account in accounts
.iter() .iter()
.filter(|x| x.is_writable) .filter(|x| x.is_writable)
.map(|x| x.key.clone()) .map(|x| x.key)
{ {
match writelocked_accounts.get_mut(&writable_account) { match writelocked_accounts.get_mut(&writable_account) {
Some(x) => { Some(x) => {
@ -219,9 +220,9 @@ impl BlockInfo {
} }
None => { None => {
writelocked_accounts.insert( writelocked_accounts.insert(
writable_account.clone(), writable_account,
AccountData { AccountData {
key: writable_account.to_string(), key: fd_bs58::encode_32(writable_account),
cu_consumed, cu_consumed,
cu_requested, cu_requested,
vec_pf: vec![prioritization_fees], vec_pf: vec![prioritization_fees],
@ -234,7 +235,7 @@ impl BlockInfo {
for readable_account in accounts for readable_account in accounts
.iter() .iter()
.filter(|x| !x.is_writable) .filter(|x| !x.is_writable)
.map(|x| x.key.clone()) .map(|x| x.key)
{ {
match readlocked_accounts.get_mut(&readable_account) { match readlocked_accounts.get_mut(&readable_account) {
Some(x) => { Some(x) => {
@ -244,9 +245,9 @@ impl BlockInfo {
} }
None => { None => {
readlocked_accounts.insert( readlocked_accounts.insert(
readable_account.clone(), readable_account,
AccountData { AccountData {
key: readable_account.to_string(), key: fd_bs58::encode_32(readable_account),
cu_consumed, cu_consumed,
cu_requested, cu_requested,
vec_pf: vec![prioritization_fees], vec_pf: vec![prioritization_fees],
@ -257,14 +258,14 @@ impl BlockInfo {
} }
Some(BlockTransactionInfo { Some(BlockTransactionInfo {
signature: signature.to_string(), signature,
processed_slot: slot as i64, processed_slot: slot as i64,
is_successful, is_successful,
cu_requested: cu_requested as i64, cu_requested: cu_requested as i64,
cu_consumed: cu_consumed as i64, cu_consumed: cu_consumed as i64,
prioritization_fees: prioritization_fees as i64, prioritization_fees: prioritization_fees as i64,
supp_infos: String::new(), supp_infos: String::new(),
accounts: accounts, accounts,
}) })
} else { } else {
None None
@ -348,6 +349,7 @@ impl BlockInfo {
block: &yellowstone_grpc_proto_original::prelude::SubscribeUpdateBlock, block: &yellowstone_grpc_proto_original::prelude::SubscribeUpdateBlock,
) -> BlockInfo { ) -> BlockInfo {
let block_hash = block.blockhash.clone(); let block_hash = block.blockhash.clone();
let _span = tracing::debug_span!("map_block_info", block_hash = block_hash);
let slot = block.slot; let slot = block.slot;
let leader_identity = block let leader_identity = block
.rewards .rewards
@ -401,9 +403,8 @@ impl BlockInfo {
let Some(meta) = &transaction.meta else { let Some(meta) = &transaction.meta else {
return None; return None;
}; };
let signature = Signature::try_from(&tx.signatures[0][0..64])
.unwrap() let signature = fd_bs58::encode_64(&tx.signatures[0]);
.to_string();
let message = VersionedMessage::V0(v0::Message { let message = VersionedMessage::V0(v0::Message {
header: MessageHeader { header: MessageHeader {
@ -442,7 +443,7 @@ impl BlockInfo {
.try_into() .try_into()
.unwrap_or(Pubkey::default().to_bytes()); .unwrap_or(Pubkey::default().to_bytes());
let account_key = Pubkey::new_from_array(bytes); let account_key = Pubkey::new_from_array(bytes);
lookup_tables.insert(account_key.clone()); lookup_tables.insert(account_key);
MessageAddressTableLookup { MessageAddressTableLookup {
account_key, account_key,
writable_indexes: table.writable_indexes, writable_indexes: table.writable_indexes,
@ -456,14 +457,14 @@ impl BlockInfo {
.collect_vec(); .collect_vec();
atl_store atl_store
.start_loading_missing_alts(&lookup_tables.iter().cloned().collect_vec()) .start_loading_missing_alts(&lookup_tables.iter().collect_vec())
.await; .await;
let mut block_transactions = vec![]; let mut block_transactions = vec![];
for (signature, message, meta, is_vote) in sigs_and_messages { for (signature, message, meta, is_vote) in sigs_and_messages {
let tx = Self::process_versioned_message( let tx = Self::process_versioned_message(
&atl_store, &atl_store,
&signature, signature,
slot, slot,
&message, &message,
&mut prio_fees_in_block, &mut prio_fees_in_block,

View File

@ -26,6 +26,9 @@ use futures::StreamExt;
use log::{debug, error, info}; use log::{debug, error, info};
use prometheus::{opts, register_int_counter, register_int_gauge, IntCounter, IntGauge}; use prometheus::{opts, register_int_counter, register_int_gauge, IntCounter, IntGauge};
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::fmt::format::FmtSpan;
use yellowstone_grpc_client_original::GeyserGrpcClientBufferConfig;
use transaction_info::TransactionInfo; use transaction_info::TransactionInfo;
mod alt_store; mod alt_store;
@ -122,7 +125,7 @@ pub async fn start_tracking_banking_stage_errors(
BANKING_STAGE_ERROR_EVENT_COUNT.inc(); BANKING_STAGE_ERROR_EVENT_COUNT.inc();
instance = Instant::now(); instance = Instant::now();
let sig = transaction.signature.to_string(); let sig = &transaction.signature;
match map_of_infos.get_mut(&(sig.clone(), transaction.slot)) { match map_of_infos.get_mut(&(sig.clone(), transaction.slot)) {
Some(mut x) => { Some(mut x) => {
let tx_info = x.value_mut(); let tx_info = x.value_mut();
@ -132,7 +135,7 @@ pub async fn start_tracking_banking_stage_errors(
// map_of_infos might get populated by parallel writers if multiple geyser sources are configured // map_of_infos might get populated by parallel writers if multiple geyser sources are configured
let write_version = error_plugin_write_version.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let write_version = error_plugin_write_version.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let tx_info = TransactionInfo::new(&transaction, write_version); let tx_info = TransactionInfo::new(&transaction, write_version);
map_of_infos.insert((sig, transaction.slot), tx_info); map_of_infos.insert((sig.clone(), transaction.slot), tx_info);
} }
} }
}, },
@ -157,7 +160,7 @@ async fn start_tracking_blocks(
grpc_x_token: Option<String>, grpc_x_token: Option<String>,
block_sender_postgres: Vec<Sender<BlockInfo>>, block_sender_postgres: Vec<Sender<BlockInfo>>,
slot: Arc<AtomicU64>, slot: Arc<AtomicU64>,
alts_list: Vec<Pubkey>, alts_list: Vec<&Pubkey>,
) { ) {
let block_counter = Arc::new(AtomicU64::new(0)); let block_counter = Arc::new(AtomicU64::new(0));
let restart_block_subscription = Arc::new(AtomicBool::new(false)); let restart_block_subscription = Arc::new(AtomicBool::new(false));
@ -181,10 +184,19 @@ async fn start_tracking_blocks(
}) })
}; };
let mut client = yellowstone_grpc_client_original::GeyserGrpcClient::connect( let buffer_config_large = GeyserGrpcClientBufferConfig {
buffer_size: Some(65536), // 64kb (default: 1k)
conn_window: Some(5242880), // 5mb (=default)
stream_window: Some(4194304), // 4mb (default: 2m)
};
// see https://github.com/blockworks-foundation/geyser-grpc-connector/issues/10
let mut client = yellowstone_grpc_client_original::GeyserGrpcClient::connect_with_timeout_with_buffers(
grpc_block_addr, grpc_block_addr,
grpc_x_token, grpc_x_token,
None, None,
None,
None,
buffer_config_large
) )
.unwrap(); .unwrap();
@ -223,7 +235,7 @@ async fn start_tracking_blocks(
// }; // };
let atl_store = Arc::new(alt_store::ALTStore::new(rpc_client)); let atl_store = Arc::new(alt_store::ALTStore::new(rpc_client));
atl_store.load_alts_list(&alts_list).await; atl_store.load_alts_list(alts_list).await;
// let data = atl_store.serialize(); // let data = atl_store.serialize();
// let mut alts_file = tokio::fs::File::create("alt_binary.bin").await.unwrap(); // let mut alts_file = tokio::fs::File::create("alt_binary.bin").await.unwrap();
@ -321,7 +333,7 @@ async fn start_tracking_blocks(
if let Some(account) = account_update.account { if let Some(account) = account_update.account {
let bytes: [u8; 32] = account.pubkey.try_into().unwrap_or(Pubkey::default().to_bytes()); let bytes: [u8; 32] = account.pubkey.try_into().unwrap_or(Pubkey::default().to_bytes());
let pubkey = Pubkey::new_from_array(bytes); let pubkey = Pubkey::new_from_array(bytes);
atl_store.save_account(&pubkey, &account.data); atl_store.save_account(pubkey, &account.data);
} }
}, },
_ => {} _ => {}
@ -335,7 +347,12 @@ async fn start_tracking_blocks(
#[tokio::main()] #[tokio::main()]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init(); tracing_subscriber::fmt::fmt()
.with_env_filter(EnvFilter::from_default_env())
// not sure if "CLOSE" is exactly what we want
// ex. "close time.busy=14.7ms time.idle=14.0µs"
.with_span_events(FmtSpan::CLOSE)
.init();
let args = Args::parse(); let args = Args::parse();
let rpc_client = Arc::new(rpc_client::RpcClient::new(args.rpc_url)); let rpc_client = Arc::new(rpc_client::RpcClient::new(args.rpc_url));
@ -360,7 +377,7 @@ async fn main() -> anyhow::Result<()> {
.split("\r\n") .split("\r\n")
.map(|x| x.trim().to_string()) .map(|x| x.trim().to_string())
.filter(|x| x.len() > 0) .filter(|x| x.len() > 0)
.map(|x| Pubkey::from_str(&x).unwrap()) .map(|x| Pubkey::new_from_array(fd_bs58::decode_32(x).unwrap()))
.collect_vec(); .collect_vec();
let mut block_senders = vec![]; let mut block_senders = vec![];
@ -400,7 +417,7 @@ async fn main() -> anyhow::Result<()> {
args.grpc_x_token, args.grpc_x_token,
block_senders, block_senders,
slot, slot,
alts_list, alts_list.iter().map(|x| x).collect_vec(),
) )
.await; .await;
} }

View File

@ -3,6 +3,7 @@ use std::{
sync::{atomic::AtomicU64, Arc}, sync::{atomic::AtomicU64, Arc},
time::Duration, time::Duration,
}; };
use std::rc::Rc;
use anyhow::Context; use anyhow::Context;
use base64::Engine; use base64::Engine;
@ -14,6 +15,7 @@ use native_tls::{Certificate, Identity, TlsConnector};
use postgres_native_tls::MakeTlsConnector; use postgres_native_tls::MakeTlsConnector;
use prometheus::{opts, register_int_gauge, IntGauge}; use prometheus::{opts, register_int_gauge, IntGauge};
use serde::Serialize; use serde::Serialize;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::transaction::TransactionError; use solana_sdk::transaction::TransactionError;
use tokio::sync::mpsc::error::SendTimeoutError; use tokio::sync::mpsc::error::SendTimeoutError;
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
@ -65,7 +67,7 @@ lazy_static::lazy_static! {
static ref TIME_TO_STORE_TX_ACCOUNT_OLD: IntGauge = static ref TIME_TO_STORE_TX_ACCOUNT_OLD: IntGauge =
register_int_gauge!(opts!("banking_stage_sidecar_tx_account_old", "Account in tx account old")).unwrap(); register_int_gauge!(opts!("banking_stage_sidecar_tx_account_old", "Account in tx account old")).unwrap();
static ref TIME_TO_STORE_TX_ACCOUNT_NEW: IntGauge = static ref TIME_TO_STORE_TX_ACCOUNT_NEW: IntGauge =
register_int_gauge!(opts!("banking_stage_sidecar_tx_account_new", "Account in tx account new")).unwrap(); register_int_gauge!(opts!("banking_stage_sidecar_tx_account_new", "Account in tx account new")).unwrap();
} }
@ -184,6 +186,14 @@ impl PostgresSession {
info!("Configured work_mem={}", work_mem); info!("Configured work_mem={}", work_mem);
} }
pub async fn disable_postgres_workers(&self) {
self.client
.execute("SET max_parallel_workers_per_gather = 0", &[])
.await
.unwrap();
info!("Disable parallel postgres workers");
}
pub async fn drop_temp_table(&self, table: String) -> anyhow::Result<()> { pub async fn drop_temp_table(&self, table: String) -> anyhow::Result<()> {
self.client self.client
.execute(format!("drop table if exists {};", table).as_str(), &[]) .execute(format!("drop table if exists {};", table).as_str(), &[])
@ -191,7 +201,7 @@ impl PostgresSession {
Ok(()) Ok(())
} }
pub async fn create_transaction_ids(&self, signatures: HashSet<String>) -> anyhow::Result<()> { pub async fn create_transaction_ids(&self, signatures: Vec<String>) -> anyhow::Result<()> {
// create temp table // create temp table
let temp_table = self.get_new_temp_table(); let temp_table = self.get_new_temp_table();
@ -413,7 +423,6 @@ impl PostgresSession {
&self, &self,
accounts_for_transaction: Vec<AccountsForTransaction>, accounts_for_transaction: Vec<AccountsForTransaction>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let instant = Instant::now(); let instant = Instant::now();
let temp_table = self.get_new_temp_table(); let temp_table = self.get_new_temp_table();
self.client self.client
@ -449,9 +458,10 @@ impl PostgresSession {
); );
pin_mut!(writer); pin_mut!(writer);
for acc_tx in accounts_for_transaction { for acc_tx in accounts_for_transaction {
for acc in &acc_tx.accounts { for acc in acc_tx.accounts {
let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(4); let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(4);
args.push(&acc.key); let pubkey_str = &acc.key;
args.push(&pubkey_str);
args.push(&acc_tx.signature); args.push(&acc_tx.signature);
args.push(&acc.writable); args.push(&acc.writable);
args.push(&acc.is_signer); args.push(&acc.is_signer);
@ -491,60 +501,65 @@ impl PostgresSession {
); );
TIME_TO_STORE_TX_ACCOUNT_OLD.set(instant.elapsed().as_millis() as i64); TIME_TO_STORE_TX_ACCOUNT_OLD.set(instant.elapsed().as_millis() as i64);
let instant = Instant::now(); // DISABLED FOR NOW to see if the rest of the system works better without it
// merge data from temp table into accounts_map_transaction_latest warn!("DISABLED writing of table accounts_map_transaction_latest");
// note: query uses the array_dedup_append postgres function to deduplicate and limit the array size if false {
// example: array_dedup_append('{8,3,2,1}', '{5,3}', 4) -> {2,1,5,3} let instant = Instant::now();
let temp_table_latest_agged = self.get_new_temp_table(); // merge data from temp table into accounts_map_transaction_latest
let statement = format!( // note: query uses the array_dedup_append postgres function to deduplicate and limit the array size
r#" // example: array_dedup_append('{8,3,2,1}', '{5,3}', 4) -> {2,1,5,3}
CREATE TEMP TABLE {temp_table_name} AS let temp_table_latest_agged = self.get_new_temp_table();
WITH amt_new AS ( let statement = format!(
r#"
CREATE TEMP TABLE {temp_table_name} AS
WITH amt_new AS (
SELECT
acc_id, array_agg(transactions.transaction_id) AS tx_agged
FROM {temp_table_newdata} AS newdata
inner join banking_stage_results_2.accounts on accounts.account_key=newdata.account_key
inner join banking_stage_results_2.transactions on transactions.signature=newdata.signature
GROUP BY acc_id
)
SELECT SELECT
acc_id, array_agg(transactions.transaction_id) AS tx_agged acc_id,
FROM {temp_table_newdata} AS newdata array_dedup_append(
inner join banking_stage_results_2.accounts on accounts.account_key=newdata.account_key (SELECT tx_ids FROM banking_stage_results_2.accounts_map_transaction_latest WHERE acc_id=amt_new.acc_id),
inner join banking_stage_results_2.transactions on transactions.signature=newdata.signature amt_new.tx_agged,
GROUP BY acc_id {limit}) AS tx_ids_agg
) FROM amt_new
SELECT "#,
acc_id, temp_table_newdata = temp_table,
array_dedup_append( temp_table_name = temp_table_latest_agged,
(SELECT tx_ids FROM banking_stage_results_2.accounts_map_transaction_latest WHERE acc_id=amt_new.acc_id), limit = LIMIT_LATEST_TXS_PER_ACCOUNT
amt_new.tx_agged, );
{limit}) AS tx_ids_agg let started_at = Instant::now();
FROM amt_new let num_rows = self.client.execute(statement.as_str(), &[]).await?;
"#, debug!(
temp_table_newdata = temp_table, "merged new transactions into accounts_map_transaction_latest temp table for {} accounts in {}ms",
temp_table_name = temp_table_latest_agged, num_rows,
limit = LIMIT_LATEST_TXS_PER_ACCOUNT started_at.elapsed().as_millis()
); );
let started_at = Instant::now();
let num_rows = self.client.execute(statement.as_str(), &[]).await?; let statement = format!(
debug!( r#"
"merged new transactions into accounts_map_transaction_latest temp table for {} accounts in {}ms", INSERT INTO banking_stage_results_2.accounts_map_transaction_latest(acc_id, tx_ids)
num_rows, SELECT acc_id, tx_ids_agg FROM {temp_table_name}
started_at.elapsed().as_millis() ORDER BY acc_id
); ON CONFLICT (acc_id) DO UPDATE SET tx_ids = EXCLUDED.tx_ids
"#,
temp_table_name = temp_table_latest_agged
);
let started_at = Instant::now();
let num_rows = self.client.execute(statement.as_str(), &[]).await?;
debug!(
"upserted {} merged transaction arrays into accounts_map_transaction_latest in {}ms",
num_rows,
started_at.elapsed().as_millis()
);
TIME_TO_STORE_TX_ACCOUNT_NEW.set(instant.elapsed().as_millis() as i64);
self.drop_temp_table(temp_table_latest_agged).await?;
}
let statement = format!(
r#"
INSERT INTO banking_stage_results_2.accounts_map_transaction_latest(acc_id, tx_ids)
SELECT acc_id, tx_ids_agg FROM {temp_table_name}
ORDER BY acc_id
ON CONFLICT (acc_id) DO UPDATE SET tx_ids = EXCLUDED.tx_ids
"#,
temp_table_name = temp_table_latest_agged
);
let started_at = Instant::now();
let num_rows = self.client.execute(statement.as_str(), &[]).await?;
debug!(
"upserted {} merged transaction arrays into accounts_map_transaction_latest in {}ms",
num_rows,
started_at.elapsed().as_millis()
);
TIME_TO_STORE_TX_ACCOUNT_NEW.set(instant.elapsed().as_millis() as i64);
self.drop_temp_table(temp_table_latest_agged).await?;
self.drop_temp_table(temp_table).await?; self.drop_temp_table(temp_table).await?;
Ok(()) Ok(())
} }
@ -835,9 +850,10 @@ impl PostgresSession {
return Ok(()); return Ok(());
} }
// create transaction ids // create transaction ids
let signatures = txs let signatures: Vec<String> = txs
.iter() .iter()
.map(|transaction| transaction.signature.clone()) .map(|transaction| transaction.signature.clone())
.unique()
.collect(); .collect();
self.create_transaction_ids(signatures).await?; self.create_transaction_ids(signatures).await?;
// create account ids // create account ids
@ -858,7 +874,7 @@ impl PostgresSession {
.account_used .account_used
.iter() .iter()
.map(|(key, is_writable)| AccountUsed { .map(|(key, is_writable)| AccountUsed {
key: key.clone(), key: fd_bs58::encode_32(key),
writable: *is_writable, writable: *is_writable,
is_signer: false, is_signer: false,
is_atl: false, is_atl: false,
@ -887,44 +903,59 @@ impl PostgresSession {
} }
pub async fn save_block(&self, block_info: BlockInfo) -> anyhow::Result<()> { pub async fn save_block(&self, block_info: BlockInfo) -> anyhow::Result<()> {
// 750ms
let _span = tracing::info_span!("save_block", slot = block_info.slot);
let instant = Instant::now(); let instant = Instant::now();
// create transaction ids // create transaction ids
let int_sig = Instant::now(); let int_sig = Instant::now();
let signatures = block_info let signatures = {
.transactions // .3ms
.iter() let _span = tracing::debug_span!("map_signatures", slot = block_info.slot);
.map(|transaction| transaction.signature.clone()) block_info
.collect(); .transactions
.iter()
.map(|transaction| transaction.signature.clone())
.collect()
};
self.create_transaction_ids(signatures).await?; self.create_transaction_ids(signatures).await?;
TIME_TO_SAVE_TRANSACTION.set(int_sig.elapsed().as_millis() as i64); TIME_TO_SAVE_TRANSACTION.set(int_sig.elapsed().as_millis() as i64);
// create account ids // create account ids
let ins_acc = Instant::now(); let ins_acc = Instant::now();
let accounts = block_info let accounts = {
.heavily_locked_accounts // .6ms
.iter() let _span = tracing::debug_span!("map_accounts", slot = block_info.slot);
.map(|acc| acc.key.clone()) block_info
.collect(); .heavily_locked_accounts
.iter()
.map(|acc| acc.key.clone())
.collect()
};
self.create_accounts_for_transaction(accounts).await?; self.create_accounts_for_transaction(accounts).await?;
ACCOUNT_SAVE_TIME.set(ins_acc.elapsed().as_millis() as i64); ACCOUNT_SAVE_TIME.set(ins_acc.elapsed().as_millis() as i64);
let instant_acc_tx: Instant = Instant::now(); let instant_acc_tx: Instant = Instant::now();
let txs_accounts = block_info let txs_accounts = {
.transactions // 90ms
.iter() let _span = tracing::debug_span!("map_txs_accounts", slot = block_info.slot);
.map(|tx| AccountsForTransaction { block_info
signature: tx.signature.clone(), .transactions
accounts: tx .iter()
.accounts .map(|tx| AccountsForTransaction {
.iter() signature: tx.signature.clone(),
.map(|acc| AccountUsed { accounts: tx
key: acc.key.to_string(), .accounts
writable: acc.is_writable, .iter()
is_signer: acc.is_signer, .map(|acc| AccountUsed {
is_atl: acc.is_alt, key: fd_bs58::encode_32(&acc.key),
}) writable: acc.is_writable,
.collect(), is_signer: acc.is_signer,
}) is_atl: acc.is_alt,
.collect_vec(); })
.collect(),
})
.collect_vec()
};
if let Err(e) = self.insert_accounts_for_transaction(txs_accounts).await { if let Err(e) = self.insert_accounts_for_transaction(txs_accounts).await {
error!("Error inserting accounts for transactions : {e:?}"); error!("Error inserting accounts for transactions : {e:?}");
} }
@ -959,7 +990,9 @@ impl PostgresSession {
// "accounts_map_transaction" -> delete rows with transaction_id before X // "accounts_map_transaction" -> delete rows with transaction_id before X
// "transaction_infos" -> delete rows processed_slot before X // "transaction_infos" -> delete rows processed_slot before X
// "transaction_slot" -> delete transaction with slot before X // "transaction_slot" -> delete transaction with slot before X
pub async fn cleanup_old_data(&self, slots_to_keep: i64, dry_run: bool) {
// count_rows=true might be very expensive
pub async fn cleanup_old_data(&self, slots_to_keep: i64, dry_run: bool, count_rows: bool) {
// keep 1mio slots (apprx 4 days) // keep 1mio slots (apprx 4 days)
info!( info!(
"{}Running cleanup job with slots_to_keep={}", "{}Running cleanup job with slots_to_keep={}",
@ -968,8 +1001,9 @@ impl PostgresSession {
); );
self.configure_work_mem().await; self.configure_work_mem().await;
self.disable_postgres_workers().await;
{ if count_rows {
info!("Rows before cleanup:"); info!("Rows before cleanup:");
self.log_rowcount(Level::Info, "blocks").await; self.log_rowcount(Level::Info, "blocks").await;
self.log_rowcount(Level::Info, "accounts").await; self.log_rowcount(Level::Info, "accounts").await;
@ -979,6 +1013,8 @@ impl PostgresSession {
.await; .await;
self.log_rowcount(Level::Info, "transaction_infos").await; self.log_rowcount(Level::Info, "transaction_infos").await;
self.log_rowcount(Level::Info, "transaction_slot").await; self.log_rowcount(Level::Info, "transaction_slot").await;
} else {
info!("Skipping row count before cleanup");
} }
// max slot from blocks table // max slot from blocks table
@ -1227,7 +1263,7 @@ impl PostgresSession {
); );
} }
{ if count_rows {
info!("Rows after cleanup:"); info!("Rows after cleanup:");
self.log_rowcount(Level::Info, "blocks").await; self.log_rowcount(Level::Info, "blocks").await;
self.log_rowcount(Level::Info, "accounts").await; self.log_rowcount(Level::Info, "accounts").await;
@ -1237,6 +1273,8 @@ impl PostgresSession {
.await; .await;
self.log_rowcount(Level::Info, "transaction_infos").await; self.log_rowcount(Level::Info, "transaction_infos").await;
self.log_rowcount(Level::Info, "transaction_slot").await; self.log_rowcount(Level::Info, "transaction_slot").await;
} else {
info!("Skipping row count after cleanup");
} }
info!("Cleanup job completed."); info!("Cleanup job completed.");

View File

@ -1,4 +1,6 @@
use std::{collections::HashMap, hash::Hash}; use std::{collections::HashMap, hash::Hash};
use std::rc::Rc;
use std::sync::Arc;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use itertools::Itertools; use itertools::Itertools;
@ -103,8 +105,9 @@ impl TransactionInfo {
.iter() .iter()
.map(|x| (x.account.clone(), x.is_writable)) .map(|x| (x.account.clone(), x.is_writable))
.collect_vec(); .collect_vec();
Self { Self {
signature: notification.signature.clone(), signature: notification.signature.clone().into(),
errors, errors,
slot: notification.slot, slot: notification.slot,
utc_timestamp, utc_timestamp,