Merge production main 2024 03 27 (#374)

* Use jemalloc

* Solving issue of finalized meta after processed block (#365)

* Solving issue of finalized meta after processed block

* Fixing the broken test

* Removing unwanted logs

* Add logs on block queues and reception time

* reduce level of block reception log

* Revert "Solving issue of finalized meta after processed block (#365)"

This reverts commit c09700fd79.

Production runs show more stable memory use but worse performance on
landing transactions.

* use yellowstone grpc with hacked-windowsize

* upgraded geyser-grpc-connector + increased timeout

* Fix block stream throughput problems

By temporarily pasting in a function to connect to block streams via a
more generously configured endpoint.

* Enabling unstable tokio on fly

* Fixing prioritization heap metrics (#370)

* Fixing prioritization heap metrics

* Fixing transaction sizes tests

* Correctly ending the block streams instead of aborting them

* Replacing exit signal with exit notification (#372)

* Replacing exit signal with exit notification

* Deprecitating nightly version

* Increase connection size

* Fixing issue with multiple notify channels

* Joining heap task instead of aborting

* fix postgres ssl algo issue - RC2-40-CBC  (#373)

* fix 373: openssl-legacy.cnf

* Remove replace console subscriber with tracing subscriber

---------

Co-authored-by: Christian Kamm <mail@ckamm.de>
Co-authored-by: GroovieGermanikus <groovie@mango.markets>
Co-authored-by: Groovie | Mango <95291500+grooviegermanikus@users.noreply.github.com>
This commit is contained in:
galactus 2024-03-27 17:09:38 +01:00 committed by GitHub
parent bd9d15ab3b
commit 79748c5a58
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 165 additions and 160 deletions

View File

@ -28,7 +28,7 @@ jobs:
- uses: actions-rust-lang/setup-rust-toolchain@v1
with:
# use toolchain version from rust-toolchain.toml
toolchain: nightly-2024-01-05
toolchain: nightly-2023-10-05
components: rustfmt, clippy
cache: true
# avoid the default "-D warnings" which thrashes cache
@ -48,5 +48,5 @@ jobs:
- name: Run fmt+clippy
run: |
cargo +nightly-2024-01-05 fmt --all --check
cargo +nightly-2024-01-05 clippy --locked --workspace --all-targets -- -D warnings
cargo +nightly-2023-10-05 fmt --all --check
cargo +nightly-2023-10-05 clippy --locked --workspace --all-targets -- -D warnings

66
Cargo.lock generated
View File

@ -392,8 +392,8 @@ dependencies = [
"memchr",
"pin-project-lite",
"tokio",
"zstd 0.13.0",
"zstd-safe 7.0.0",
"zstd 0.13.1",
"zstd-safe 7.1.0",
]
[[package]]
@ -460,9 +460,9 @@ dependencies = [
[[package]]
name = "autocfg"
version = "1.1.0"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
checksum = "f1fdabc7756949593fe60f30ec81974b613357de856987752631dea1e3394c80"
[[package]]
name = "autotools"
@ -572,7 +572,7 @@ version = "0.2.4"
dependencies = [
"anyhow",
"bincode",
"clap 4.5.3",
"clap 4.5.4",
"csv",
"dashmap 5.5.3",
"dirs",
@ -868,9 +868,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
version = "0.4.35"
version = "0.4.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8eaf5903dcbc0a39312feb77df2ff4c76387d591b9fc7b04a238dcf8bb62639a"
checksum = "8a0d04d43504c61aa6c7531f1871dd0d418d91130162063b789da00fd7057a5e"
dependencies = [
"android-tzdata",
"iana-time-zone",
@ -923,9 +923,9 @@ dependencies = [
[[package]]
name = "clap"
version = "4.5.3"
version = "4.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "949626d00e063efc93b6dca932419ceb5432f99769911c0b995f7e884c778813"
checksum = "90bc066a67923782aa8515dbaea16946c5bcc5addbd668bb80af688e53e548a0"
dependencies = [
"clap_builder",
"clap_derive",
@ -945,9 +945,9 @@ dependencies = [
[[package]]
name = "clap_derive"
version = "4.5.3"
version = "4.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90239a040c80f5e14809ca132ddc4176ab33d5e17e49691793296e3fcb34d72f"
checksum = "528131438037fd55894f62d6e9f068b8f45ac57ffa77517819645d10aed04f64"
dependencies = [
"heck 0.5.0",
"proc-macro2",
@ -2236,9 +2236,9 @@ dependencies = [
[[package]]
name = "itoa"
version = "1.0.10"
version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c"
checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b"
[[package]]
name = "jemalloc-sys"
@ -2566,7 +2566,7 @@ dependencies = [
"bytes",
"cap",
"chrono",
"clap 4.5.3",
"clap 4.5.4",
"const_env",
"dashmap 5.5.3",
"dotenv",
@ -2675,9 +2675,9 @@ dependencies = [
[[package]]
name = "memoffset"
version = "0.9.0"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a634b1c61a95585bd15607c6ab0c4e5b226e695ff2800ba0cdccddf208c406c"
checksum = "488016bfae457b036d996092f6cb448677611ce4449e970ceaf42695203f218a"
dependencies = [
"autocfg",
]
@ -3686,7 +3686,7 @@ dependencies = [
"aho-corasick",
"memchr",
"regex-automata 0.4.6",
"regex-syntax 0.8.2",
"regex-syntax 0.8.3",
]
[[package]]
@ -3706,7 +3706,7 @@ checksum = "86b83b8b9847f9bf95ef68afb0b8e6cdb80f498442f5179a29fad448fcc1eaea"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax 0.8.2",
"regex-syntax 0.8.3",
]
[[package]]
@ -3717,9 +3717,9 @@ checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
[[package]]
name = "regex-syntax"
version = "0.8.2"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56"
[[package]]
name = "reqwest"
@ -4033,9 +4033,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.114"
version = "1.0.115"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c5f09b1bd632ef549eaa9f60a1f8de742bdbc698e6cee2095fc84dde5f549ae0"
checksum = "12dc5c46daa8e9fdf4f5e71b6cf9a53f2487da0e86e55808e2d35539666497dd"
dependencies = [
"itoa",
"ryu",
@ -4537,7 +4537,7 @@ dependencies = [
"async-trait",
"bench",
"chrono",
"clap 4.5.3",
"clap 4.5.4",
"futures",
"futures-util",
"itertools 0.10.5",
@ -4703,7 +4703,7 @@ dependencies = [
"bs58",
"bytes",
"chrono",
"clap 4.5.3",
"clap 4.5.4",
"dashmap 5.5.3",
"dotenv",
"futures",
@ -4742,7 +4742,7 @@ dependencies = [
"bs58",
"bytes",
"chrono",
"clap 4.5.3",
"clap 4.5.4",
"countmap",
"crossbeam-channel",
"dashmap 5.5.3",
@ -4960,7 +4960,7 @@ dependencies = [
"libsecp256k1",
"light-poseidon",
"log",
"memoffset 0.9.0",
"memoffset 0.9.1",
"num-bigint 0.4.4",
"num-derive 0.3.3",
"num-traits 0.2.18",
@ -6852,11 +6852,11 @@ dependencies = [
[[package]]
name = "zstd"
version = "0.13.0"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bffb3309596d527cfcba7dfc6ed6052f1d39dfbd7c867aa2e865e4a449c10110"
checksum = "2d789b1514203a1120ad2429eae43a7bd32b90976a7bb8a05f7ec02fa88cc23a"
dependencies = [
"zstd-safe 7.0.0",
"zstd-safe 7.1.0",
]
[[package]]
@ -6871,18 +6871,18 @@ dependencies = [
[[package]]
name = "zstd-safe"
version = "7.0.0"
version = "7.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43747c7422e2924c11144d5229878b98180ef8b06cca4ab5af37afc8a8d8ea3e"
checksum = "1cd99b45c6bc03a018c8b8a86025678c87e55526064e38f9df301989dce7ec0a"
dependencies = [
"zstd-sys",
]
[[package]]
name = "zstd-sys"
version = "2.0.9+zstd.1.5.5"
version = "2.0.10+zstd.1.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e16efa8a874a0481a574084d34cc26fdb3b99627480f785888deb6386506656"
checksum = "c253a4914af5bafc8fa8c86ee400827e83cf6ec01195ec1f1ed8441bf00d65aa"
dependencies = [
"cc",
"pkg-config",

View File

@ -270,12 +270,12 @@ fn map_compute_budget_instructions(message: &VersionedMessage) -> (Option<u32>,
(cu_requested, prioritization_fees)
}
// not called
pub fn create_block_processing_task(
grpc_addr: String,
grpc_x_token: Option<String>,
block_sx: async_channel::Sender<SubscribeUpdateBlock>,
commitment_level: CommitmentLevel,
exit_notfier: Arc<Notify>,
) -> AnyhowJoinHandle {
tokio::spawn(async move {
loop {
@ -307,33 +307,44 @@ pub fn create_block_processing_task(
)
.await?;
while let Some(message) = stream.next().await {
let message = message?;
loop {
tokio::select! {
message = stream.next() => {
let Some(Ok(message)) = message else {
break;
};
let Some(update) = message.update_oneof else {
continue;
};
let Some(update) = message.update_oneof else {
continue;
};
match update {
UpdateOneof::Block(block) => {
log::trace!(
"received block, hash: {} slot: {}",
block.blockhash,
block.slot
);
block_sx
.send(block)
.await
.context("Problem sending on block channel")?;
match update {
UpdateOneof::Block(block) => {
log::trace!(
"received block, hash: {} slot: {}",
block.blockhash,
block.slot
);
block_sx
.send(block)
.await
.context("Problem sending on block channel")?;
}
UpdateOneof::Ping(_) => {
log::trace!("GRPC Ping");
}
_ => {
log::trace!("unknown GRPC notification");
}
};
},
_ = exit_notfier.notified() => {
break;
}
UpdateOneof::Ping(_) => {
log::trace!("GRPC Ping");
}
_ => {
log::trace!("unknown GRPC notification");
}
};
}
}
drop(stream);
drop(client);
log::error!("Grpc block subscription broken (resubscribing)");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}

View File

@ -85,7 +85,7 @@ impl PrioritizationFeesHeap {
}
}
pub async fn remove_expired_transactions(&self, current_blockheight: u64) {
pub async fn remove_expired_transactions(&self, current_blockheight: u64) -> usize {
let mut write_lock = self.map.lock().await;
let mut cells_to_remove = vec![];
let mut signatures_to_remove = vec![];
@ -104,9 +104,11 @@ impl PrioritizationFeesHeap {
for p in cells_to_remove {
write_lock.map.remove(&p);
}
let signatures_len = signatures_to_remove.len();
for sig in signatures_to_remove {
write_lock.signatures.remove(&sig);
}
signatures_len
}
pub async fn size(&self) -> usize {

View File

@ -1,2 +1,2 @@
cargo +nightly-2024-01-05 fmt --all
cargo +nightly-2024-01-05 clippy --locked --workspace --all-targets -- -D warnings
cargo +nightly-2023-10-05 fmt --all
cargo +nightly-2023-10-05 clippy --locked --workspace --all-targets -- -D warnings

View File

@ -14,7 +14,7 @@ use std::{
Arc,
},
};
use tokio::sync::{OwnedSemaphorePermit, RwLock, Semaphore};
use tokio::sync::{Notify, OwnedSemaphorePermit, RwLock, Semaphore};
pub type EndpointPool = RotatingQueue<Endpoint>;
@ -40,7 +40,7 @@ pub struct QuicConnection {
identity: Pubkey,
socket_address: SocketAddr,
connection_params: QuicConnectionParameters,
exit_signal: Arc<AtomicBool>,
exit_notify: Arc<Notify>,
timeout_counters: Arc<AtomicU64>,
has_connected_once: Arc<AtomicBool>,
}
@ -51,7 +51,7 @@ impl QuicConnection {
endpoint: Endpoint,
socket_address: SocketAddr,
connection_params: QuicConnectionParameters,
exit_signal: Arc<AtomicBool>,
exit_notify: Arc<Notify>,
) -> Self {
Self {
connection: Arc::new(RwLock::new(None)),
@ -60,7 +60,7 @@ impl QuicConnection {
identity,
socket_address,
connection_params,
exit_signal,
exit_notify,
timeout_counters: Arc::new(AtomicU64::new(0)),
has_connected_once: Arc::new(AtomicBool::new(false)),
}
@ -74,7 +74,7 @@ impl QuicConnection {
self.socket_address,
self.connection_params.connection_timeout,
self.connection_params.connection_retry_count,
self.exit_signal.clone(),
self.exit_notify.clone(),
)
.await
}
@ -127,32 +127,48 @@ impl QuicConnection {
pub async fn send_transaction(&self, tx: Vec<u8>) {
let connection_retry_count = self.connection_params.connection_retry_count;
for _ in 0..connection_retry_count {
if self.exit_signal.load(Ordering::Relaxed) {
// return
return;
}
let mut do_retry = false;
let connection = self.get_connection().await;
let exit_notify = self.exit_notify.clone();
let connection = tokio::select! {
conn = self.get_connection() => {
conn
},
_ = exit_notify.notified() => {
break;
}
};
if let Some(connection) = connection {
TRIED_SEND_TRANSCTION_TRIED.inc();
let current_stable_id = connection.stable_id() as u64;
match QuicConnectionUtils::open_unistream(
connection,
self.connection_params.unistream_timeout,
)
.await
{
let open_uni_result = tokio::select! {
res = QuicConnectionUtils::open_unistream(
connection,
self.connection_params.unistream_timeout,
) => {
res
},
_ = exit_notify.notified() => {
break;
}
};
match open_uni_result {
Ok(send_stream) => {
match QuicConnectionUtils::write_all(
send_stream,
&tx,
self.identity,
self.connection_params,
)
.await
{
let write_add_result = tokio::select! {
res = QuicConnectionUtils::write_all(
send_stream,
&tx,
self.identity,
self.connection_params,
) => {
res
},
_ = exit_notify.notified() => {
break;
}
};
match write_add_result {
Ok(()) => {
SEND_TRANSCTION_SUCESSFUL.inc();
}
@ -231,7 +247,7 @@ impl QuicConnectionPool {
endpoints: EndpointPool,
socket_address: SocketAddr,
connection_parameters: QuicConnectionParameters,
exit_signal: Arc<AtomicBool>,
exit_notify: Arc<Notify>,
nb_connection: usize,
max_number_of_unistream_connection: usize,
) -> Self {
@ -243,7 +259,7 @@ impl QuicConnectionPool {
endpoints.get().expect("Should get and endpoint"),
socket_address,
connection_parameters,
exit_signal.clone(),
exit_notify.clone(),
));
}
Self {

View File

@ -11,13 +11,10 @@ use solana_lite_rpc_core::network_utils::apply_gso_workaround;
use solana_sdk::pubkey::Pubkey;
use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
sync::Arc,
time::Duration,
};
use tokio::time::timeout;
use tokio::{sync::Notify, time::timeout};
lazy_static::lazy_static! {
static ref NB_QUIC_0RTT_ATTEMPTED: GenericGauge<prometheus::core::AtomicI64> =
@ -222,15 +219,29 @@ impl QuicConnectionUtils {
addr: SocketAddr,
connection_timeout: Duration,
connection_retry_count: usize,
exit_signal: Arc<AtomicBool>,
exit_notified: Arc<Notify>,
) -> Option<Connection> {
for _ in 0..connection_retry_count {
let conn = if already_connected {
NB_QUIC_0RTT_ATTEMPTED.inc();
Self::make_connection_0rtt(endpoint.clone(), addr, connection_timeout).await
tokio::select! {
res = Self::make_connection_0rtt(endpoint.clone(), addr, connection_timeout) => {
res
},
_ = exit_notified.notified() => {
break;
}
}
} else {
NB_QUIC_CONN_ATTEMPTED.inc();
Self::make_connection(endpoint.clone(), addr, connection_timeout).await
tokio::select! {
res = Self::make_connection(endpoint.clone(), addr, connection_timeout) => {
res
},
_ = exit_notified.notified() => {
break;
}
}
};
match conn {
Ok(conn) => {
@ -239,9 +250,6 @@ impl QuicConnectionUtils {
}
Err(e) => {
trace!("Could not connect to {} because of error {}", identity, e);
if exit_signal.load(Ordering::Relaxed) {
break;
}
}
}
}

View File

@ -13,15 +13,7 @@ use solana_lite_rpc_core::{
};
use solana_sdk::pubkey::Pubkey;
use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams;
use std::{
collections::HashMap,
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration};
use tokio::sync::{
broadcast::{Receiver, Sender},
Notify,
@ -54,9 +46,9 @@ struct ActiveConnection {
endpoints: RotatingQueue<Endpoint>,
identity: Pubkey,
tpu_address: SocketAddr,
exit_signal: Arc<AtomicBool>,
data_cache: DataCache,
connection_parameters: QuicConnectionParameters,
exit_notifier: Arc<Notify>,
}
impl ActiveConnection {
@ -71,9 +63,9 @@ impl ActiveConnection {
endpoints,
tpu_address,
identity,
exit_signal: Arc::new(AtomicBool::new(false)),
data_cache,
connection_parameters,
exit_notifier: Arc::new(Notify::new()),
}
}
@ -81,13 +73,13 @@ impl ActiveConnection {
async fn listen(
&self,
mut transaction_reciever: Receiver<SentTransactionInfo>,
exit_notifier: Arc<Notify>,
addr: SocketAddr,
identity_stakes: IdentityStakesData,
) {
let fill_notify = Arc::new(Notify::new());
let identity = self.identity;
let exit_notifier = self.exit_notifier.clone();
NB_QUIC_ACTIVE_CONNECTIONS.inc();
@ -98,13 +90,12 @@ impl ActiveConnection {
identity_stakes.stakes,
identity_stakes.total_stakes,
);
let exit_signal = self.exit_signal.clone();
let connection_pool = QuicConnectionPool::new(
identity,
self.endpoints.clone(),
addr,
self.connection_parameters,
exit_signal.clone(),
exit_notifier.clone(),
max_number_of_connections,
max_uni_stream_connections,
);
@ -115,12 +106,19 @@ impl ActiveConnection {
let priorization_heap = priorization_heap.clone();
let data_cache = self.data_cache.clone();
let fill_notify = fill_notify.clone();
let exit_signal = exit_signal.clone();
let exit_notifier = exit_notifier.clone();
tokio::spawn(async move {
let mut current_blockheight =
data_cache.block_information_store.get_last_blockheight();
while !exit_signal.load(Ordering::Relaxed) {
let tx = transaction_reciever.recv().await;
loop {
let tx = tokio::select! {
tx = transaction_reciever.recv() => {
tx
},
_ = exit_notifier.notified() => {
break;
}
};
match tx {
Ok(transaction_sent_info) => {
if data_cache
@ -142,9 +140,10 @@ impl ActiveConnection {
// give more priority to transaction sender
tokio::time::sleep(Duration::from_micros(50)).await;
// remove all expired transactions from the queue
priorization_heap
let elements_removed = priorization_heap
.remove_expired_transactions(current_blockheight)
.await;
TRANSACTIONS_IN_HEAP.sub(elements_removed as i64);
}
}
Err(e) => {
@ -167,26 +166,16 @@ impl ActiveConnection {
let _permit = permit;
connection.get_connection().await;
});
}
};
'main_loop: loop {
// exit signal set
if exit_signal.load(Ordering::Relaxed) {
break;
}
tokio::select! {
_ = fill_notify.notified() => {
loop {
// exit signal set
if exit_signal.load(Ordering::Relaxed) {
break 'main_loop;
}
'process_heap: loop {
let Some(tx) = priorization_heap.pop().await else {
// wait to get notification from fill event
break;
break 'process_heap;
};
TRANSACTIONS_IN_HEAP.dec();
@ -225,7 +214,7 @@ impl ActiveConnection {
}
}
heap_filler_task.abort();
let _ = heap_filler_task.await;
let elements_removed = priorization_heap.clear().await;
TRANSACTIONS_IN_HEAP.sub(elements_removed as i64);
NB_QUIC_ACTIVE_CONNECTIONS.dec();
@ -234,26 +223,20 @@ impl ActiveConnection {
pub fn start_listening(
&self,
transaction_reciever: Receiver<SentTransactionInfo>,
exit_notifier: Arc<Notify>,
identity_stakes: IdentityStakesData,
) {
let addr = self.tpu_address;
let this = self.clone();
tokio::spawn(async move {
this.listen(transaction_reciever, exit_notifier, addr, identity_stakes)
this.listen(transaction_reciever, addr, identity_stakes)
.await;
});
}
}
struct ActiveConnectionWithExitNotifier {
pub active_connection: ActiveConnection,
pub exit_notifier: Arc<Notify>,
}
pub struct TpuConnectionManager {
endpoints: RotatingQueue<Endpoint>,
identity_to_active_connection: Arc<DashMap<Pubkey, Arc<ActiveConnectionWithExitNotifier>>>,
identity_to_active_connection: Arc<DashMap<Pubkey, ActiveConnection>>,
}
impl TpuConnectionManager {
@ -291,21 +274,10 @@ impl TpuConnectionManager {
connection_parameters,
);
// using mpsc as a oneshot channel/ because with one shot channel we cannot reuse the reciever
let exit_notifier = Arc::new(Notify::new());
let broadcast_receiver = broadcast_sender.subscribe();
active_connection.start_listening(
broadcast_receiver,
exit_notifier.clone(),
identity_stakes,
);
self.identity_to_active_connection.insert(
*identity,
Arc::new(ActiveConnectionWithExitNotifier {
active_connection,
exit_notifier,
}),
);
active_connection.start_listening(broadcast_receiver, identity_stakes);
self.identity_to_active_connection
.insert(*identity, active_connection);
}
}
@ -314,11 +286,7 @@ impl TpuConnectionManager {
if !connections_to_keep.contains_key(key) {
trace!("removing a connection for {}", key.to_string());
// ignore error for exit channel
value
.active_connection
.exit_signal
.store(true, Ordering::Relaxed);
value.exit_notifier.notify_one();
value.exit_notifier.notify_waiters();
false
} else {
true