Getting finalized meta before processed main (#366)

* Use jemalloc

* Solving issue of finalized meta after processed block (#365)

* Solving issue of finalized meta after processed block

* Fixing the broken test

* Minor changes

---------

Co-authored-by: Christian Kamm <mail@ckamm.de>
This commit is contained in:
galactus 2024-03-22 12:30:14 +01:00 committed by GitHub
parent ae495bed53
commit defdc20dd5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 66 additions and 1 deletions

21
Cargo.lock generated
View File

@ -2223,6 +2223,26 @@ version = "1.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c"
[[package]]
name = "jemalloc-sys"
version = "0.5.4+5.3.0-patched"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac6c1946e1cea1788cbfde01c993b52a10e2da07f4bac608228d1bed20bfebf2"
dependencies = [
"cc",
"libc",
]
[[package]]
name = "jemallocator"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0de374a9f8e63150e6f5e8a60cc14c668226d7a347d8aee1a45766e3c4dd3bc"
dependencies = [
"jemalloc-sys",
"libc",
]
[[package]]
name = "jobserver"
version = "0.1.28"
@ -2537,6 +2557,7 @@ dependencies = [
"futures-util",
"hyper",
"itertools 0.10.5",
"jemallocator",
"jsonrpsee",
"lazy_static",
"log",

View File

@ -68,6 +68,7 @@ lazy_static = "1.4.0"
dotenv = "0.15.0"
async-channel = "1.8.0"
merge-streams = "0.1.2"
jemallocator = "0.5"
quinn = "0.10.2"
quinn-proto = "0.10.5"

View File

@ -144,3 +144,33 @@ impl BenchHelper {
Transaction::new(&signers, message, blockhash)
}
}
#[test]
fn transaction_size_small() {
let blockhash = Hash::default();
let payer_keypair = Keypair::from_base58_string(
"rKiJ7H5UUp3JR18kNyTF1XPuwPKHEM7gMLWHZPWP5djrW1vSjfwjhvJrevxF9MPmUmN9gJMLHZdLMgc9ao78eKr",
);
let seed = 42;
let random_strings = BenchHelper::generate_random_strings(1, Some(seed), 10);
let rand_string = random_strings.first().unwrap();
let tx = BenchHelper::create_memo_tx_small(rand_string, &payer_keypair, blockhash);
assert_eq!(bincode::serialized_size(&tx).unwrap(), 231);
}
#[test]
fn transaction_size_large() {
let blockhash = Hash::default();
let payer_keypair = Keypair::from_base58_string(
"rKiJ7H5UUp3JR18kNyTF1XPuwPKHEM7gMLWHZPWP5djrW1vSjfwjhvJrevxF9MPmUmN9gJMLHZdLMgc9ao78eKr",
);
let seed = 42;
let random_strings = BenchHelper::generate_random_strings(1, Some(seed), 232);
let rand_string = random_strings.first().unwrap();
let tx = BenchHelper::create_memo_tx_large(rand_string, &payer_keypair, blockhash);
assert_eq!(bincode::serialized_size(&tx).unwrap(), 1230);
}

View File

@ -259,6 +259,7 @@ pub fn create_grpc_multiplex_blocks_subscription(
let mut cleanup_without_confirmed_recv_blocks_meta: u8 = 0;
let mut cleanup_without_finalized_recv_blocks_meta: u8 = 0;
let mut confirmed_block_not_yet_processed = HashSet::<solana_sdk::hash::Hash>::new();
let mut finalized_block_not_yet_processed = HashSet::<solana_sdk::hash::Hash>::new();
// start logging errors when we recieve first finalized block
let mut startup_completed = false;
@ -284,6 +285,11 @@ pub fn create_grpc_multiplex_blocks_subscription(
warn!("produced block channel has no receivers while trying to send confirmed block {e:?}");
}
}
if finalized_block_not_yet_processed.remove(&processed_block.blockhash) {
if let Err(e) = producedblock_sender.send(processed_block.to_finalized_block()) {
warn!("produced block channel has no receivers while trying to send confirmed block {e:?}");
}
}
recent_processed_blocks.insert(processed_block.blockhash, processed_block);
},
meta_confirmed = block_meta_reciever_confirmed.recv() => {
@ -320,6 +326,7 @@ pub fn create_grpc_multiplex_blocks_subscription(
} else if startup_completed {
// this warning is ok for first few blocks when we start lrpc
log::warn!("finalized block meta received for blockhash {} which was never seen or already emitted", blockhash);
finalized_block_not_yet_processed.insert(blockhash);
}
},
_ = cleanup_tick.tick() => {

View File

@ -123,7 +123,7 @@ impl BlockInformationStore {
}
}
match self.blocks.entry(block_info.blockhash.clone()) {
match self.blocks.entry(block_info.blockhash) {
dashmap::mapref::entry::Entry::Occupied(entry) => {
let should_update = match entry.get().commitment_config.commitment {
CommitmentLevel::Finalized => false, // should never update blocks of finalized commitment

View File

@ -50,6 +50,7 @@ cap = { version = "0.1.2", features = ["stats"] }
tower = "0.4.13"
hyper = { version = "0.14", features = ["server", "http1", "http2"] }
tower-http = { version = "0.4.0", features = ["full"] }
jemallocator = { workspace = true }
solana-lite-rpc-core = { workspace = true }
solana-lite-rpc-util = { workspace = true }

View File

@ -70,6 +70,11 @@ use tokio::time::{timeout, Instant};
use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::EnvFilter;
// jemalloc seems to be better at keeping the memory footprint reasonable over
// longer periods of time
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
async fn get_latest_block(
mut block_stream: BlockStream,
commitment_config: CommitmentConfig,