Compare commits
3 Commits
06373c0844
...
b84e880961
Author | SHA1 | Date |
---|---|---|
Groovie | Mango | b84e880961 | |
godmodegalactus | 90dbcaa9d8 | |
Groovie | Mango | 69d7dbb123 |
|
@ -49,7 +49,7 @@ fn create_grpc_multiplex_processed_block_task(
|
||||||
let jh_merging_streams = tokio::task::spawn(async move {
|
let jh_merging_streams = tokio::task::spawn(async move {
|
||||||
let mut slots_processed = BTreeSet::<u64>::new();
|
let mut slots_processed = BTreeSet::<u64>::new();
|
||||||
let mut last_tick = Instant::now();
|
let mut last_tick = Instant::now();
|
||||||
loop {
|
'recv_loop: loop {
|
||||||
// recv loop
|
// recv loop
|
||||||
if last_tick.elapsed() > Duration::from_millis(800) {
|
if last_tick.elapsed() > Duration::from_millis(800) {
|
||||||
warn!(
|
warn!(
|
||||||
|
@ -70,16 +70,23 @@ fn create_grpc_multiplex_processed_block_task(
|
||||||
};
|
};
|
||||||
match blocks_rx_result {
|
match blocks_rx_result {
|
||||||
Some(Message::GeyserSubscribeUpdate(subscribe_update)) => {
|
Some(Message::GeyserSubscribeUpdate(subscribe_update)) => {
|
||||||
let mapfilter =
|
// note: avoid mapping of full block as long as possible
|
||||||
map_block_from_yellowstone_update(*subscribe_update, COMMITMENT_CONFIG);
|
let extracted_slot = extract_slot_from_yellowstone_update(&subscribe_update);
|
||||||
if let Some((slot, produced_block)) = mapfilter {
|
if let Some(slot) = extracted_slot {
|
||||||
assert_eq!(COMMITMENT_CONFIG, produced_block.commitment_config);
|
|
||||||
// check if the slot is in the map, if not check if the container is half full and the slot in question is older than the lowest value
|
// check if the slot is in the map, if not check if the container is half full and the slot in question is older than the lowest value
|
||||||
// it means that the slot is too old to process
|
// it means that the slot is too old to process
|
||||||
if !slots_processed.contains(&slot)
|
if slots_processed.contains(&slot) {
|
||||||
&& (slots_processed.len() < MAX_SIZE / 2
|
continue 'recv_loop;
|
||||||
|| slot > slots_processed.first().cloned().unwrap_or_default())
|
}
|
||||||
|
if slots_processed.len() >= MAX_SIZE / 2
|
||||||
|
&& slot <= slots_processed.first().cloned().unwrap_or_default()
|
||||||
{
|
{
|
||||||
|
continue 'recv_loop;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mapfilter =
|
||||||
|
map_block_from_yellowstone_update(*subscribe_update, COMMITMENT_CONFIG);
|
||||||
|
if let Some((_slot, produced_block)) = mapfilter {
|
||||||
let send_started_at = Instant::now();
|
let send_started_at = Instant::now();
|
||||||
let send_result = block_sender
|
let send_result = block_sender
|
||||||
.send(produced_block)
|
.send(produced_block)
|
||||||
|
@ -552,6 +559,16 @@ pub fn create_grpc_multiplex_processed_slots_subscription(
|
||||||
(multiplexed_messages_rx, jh_multiplex_task)
|
(multiplexed_messages_rx, jh_multiplex_task)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn extract_slot_from_yellowstone_update(update: &SubscribeUpdate) -> Option<Slot> {
|
||||||
|
match &update.update_oneof {
|
||||||
|
// list is not exhaustive
|
||||||
|
Some(UpdateOneof::Slot(update_message)) => Some(update_message.slot),
|
||||||
|
Some(UpdateOneof::BlockMeta(update_message)) => Some(update_message.slot),
|
||||||
|
Some(UpdateOneof::Block(update_message)) => Some(update_message.slot),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn map_slot_from_yellowstone_update(update: SubscribeUpdate) -> Option<Slot> {
|
fn map_slot_from_yellowstone_update(update: SubscribeUpdate) -> Option<Slot> {
|
||||||
match update.update_oneof {
|
match update.update_oneof {
|
||||||
Some(UpdateOneof::Slot(update_slot_message)) => Some(update_slot_message.slot),
|
Some(UpdateOneof::Slot(update_slot_message)) => Some(update_slot_message.slot),
|
||||||
|
|
|
@ -273,7 +273,7 @@ fn map_compute_budget_instructions(message: &VersionedMessage) -> (Option<u32>,
|
||||||
pub fn create_block_processing_task(
|
pub fn create_block_processing_task(
|
||||||
grpc_addr: String,
|
grpc_addr: String,
|
||||||
grpc_x_token: Option<String>,
|
grpc_x_token: Option<String>,
|
||||||
block_sx: async_channel::Sender<SubscribeUpdateBlock>,
|
block_sx: tokio::sync::mpsc::Sender<SubscribeUpdateBlock>,
|
||||||
commitment_level: CommitmentLevel,
|
commitment_level: CommitmentLevel,
|
||||||
mut exit_notify: broadcast::Receiver<()>,
|
mut exit_notify: broadcast::Receiver<()>,
|
||||||
) -> AnyhowJoinHandle {
|
) -> AnyhowJoinHandle {
|
||||||
|
@ -362,7 +362,7 @@ pub fn create_block_processing_task(
|
||||||
pub fn create_slot_stream_task(
|
pub fn create_slot_stream_task(
|
||||||
grpc_addr: String,
|
grpc_addr: String,
|
||||||
grpc_x_token: Option<String>,
|
grpc_x_token: Option<String>,
|
||||||
slot_sx: async_channel::Sender<SubscribeUpdateSlot>,
|
slot_sx: tokio::sync::mpsc::Sender<SubscribeUpdateSlot>,
|
||||||
commitment_level: CommitmentLevel,
|
commitment_level: CommitmentLevel,
|
||||||
) -> AnyhowJoinHandle {
|
) -> AnyhowJoinHandle {
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
|
|
@ -45,6 +45,31 @@ lazy_static::lazy_static! {
|
||||||
static ref NB_QUIC_FINISH_ERRORED: GenericGauge<prometheus::core::AtomicI64> =
|
static ref NB_QUIC_FINISH_ERRORED: GenericGauge<prometheus::core::AtomicI64> =
|
||||||
register_int_gauge!(opts!("literpc_quic_finish_errored", "Number of times finish errored")).unwrap();
|
register_int_gauge!(opts!("literpc_quic_finish_errored", "Number of times finish errored")).unwrap();
|
||||||
|
|
||||||
|
|
||||||
|
static ref NB_QUIC_CONNECTION_ERROR_VERSION_MISMATCH: GenericGauge<prometheus::core::AtomicI64> =
|
||||||
|
register_int_gauge!(opts!("literpc_quic_connection_error_version_mismatch", "Number of times connection errored VersionMismatch")).unwrap();
|
||||||
|
static ref NB_QUIC_CONNECTION_ERROR_TRANSPORT_ERROR: GenericGauge<prometheus::core::AtomicI64> =
|
||||||
|
register_int_gauge!(opts!("literpc_quic_connection_error_transport_error", "Number of times connection errored TransportError")).unwrap();
|
||||||
|
static ref NB_QUIC_CONNECTION_ERROR_CONNECTION_CLOSED: GenericGauge<prometheus::core::AtomicI64> =
|
||||||
|
register_int_gauge!(opts!("literpc_quic_connection_error_connection_closed", "Number of times connection errored ConnectionClosed")).unwrap();
|
||||||
|
static ref NB_QUIC_CONNECTION_ERROR_APPLICATION_CLOSED: GenericGauge<prometheus::core::AtomicI64> =
|
||||||
|
register_int_gauge!(opts!("literpc_quic_connection_error_application_closed", "Number of times connection errored ApplicationClosed")).unwrap();
|
||||||
|
static ref NB_QUIC_CONNECTION_ERROR_RESET: GenericGauge<prometheus::core::AtomicI64> =
|
||||||
|
register_int_gauge!(opts!("literpc_quic_connection_error_reset", "Number of times connection errored Reset")).unwrap();
|
||||||
|
static ref NB_QUIC_CONNECTION_ERROR_TIMEDOUT: GenericGauge<prometheus::core::AtomicI64> =
|
||||||
|
register_int_gauge!(opts!("literpc_quic_connection_error_timed_out", "Number of times connection errored TimedOut")).unwrap();
|
||||||
|
static ref NB_QUIC_CONNECTION_ERROR_LOCALLY_CLOSED: GenericGauge<prometheus::core::AtomicI64> =
|
||||||
|
register_int_gauge!(opts!("literpc_quic_connection_error_locally_closed", "Number of times connection errored locally closed")).unwrap();
|
||||||
|
|
||||||
|
static ref NB_QUIC_WRITE_ERROR_STOPPED: GenericGauge<prometheus::core::AtomicI64> =
|
||||||
|
register_int_gauge!(opts!("literpc_quic_write_error_stopped", "Number of times write_error Stopped")).unwrap();
|
||||||
|
static ref NB_QUIC_WRITE_ERROR_CONNECTION_LOST: GenericGauge<prometheus::core::AtomicI64> =
|
||||||
|
register_int_gauge!(opts!("literpc_quic_write_error_connection_lost", "Number of times write_error ConnectionLost")).unwrap();
|
||||||
|
static ref NB_QUIC_WRITE_ERROR_UNKNOWN_STREAM: GenericGauge<prometheus::core::AtomicI64> =
|
||||||
|
register_int_gauge!(opts!("literpc_quic_write_error_unknown_stream", "Number of times write_error UnknownStream")).unwrap();
|
||||||
|
static ref NB_QUIC_WRITE_ERROR_0RTT_REJECT: GenericGauge<prometheus::core::AtomicI64> =
|
||||||
|
register_int_gauge!(opts!("literpc_quic_write_error_0RTT_reject", "Number of times write_error ZeroRttRejected")).unwrap();
|
||||||
|
|
||||||
static ref NB_QUIC_CONNECTIONS: GenericGauge<prometheus::core::AtomicI64> =
|
static ref NB_QUIC_CONNECTIONS: GenericGauge<prometheus::core::AtomicI64> =
|
||||||
register_int_gauge!(opts!("literpc_nb_active_quic_connections", "Number of quic connections open")).unwrap();
|
register_int_gauge!(opts!("literpc_nb_active_quic_connections", "Number of quic connections open")).unwrap();
|
||||||
|
|
||||||
|
@ -169,6 +194,25 @@ impl QuicConnectionUtils {
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
NB_QUIC_CONNECTION_ERRORED.inc();
|
NB_QUIC_CONNECTION_ERRORED.inc();
|
||||||
|
match &e {
|
||||||
|
ConnectionError::VersionMismatch => {
|
||||||
|
NB_QUIC_CONNECTION_ERROR_VERSION_MISMATCH.inc()
|
||||||
|
}
|
||||||
|
ConnectionError::TransportError(_) => {
|
||||||
|
NB_QUIC_CONNECTION_ERROR_TRANSPORT_ERROR.inc()
|
||||||
|
}
|
||||||
|
ConnectionError::ConnectionClosed(_) => {
|
||||||
|
NB_QUIC_CONNECTION_ERROR_CONNECTION_CLOSED.inc()
|
||||||
|
}
|
||||||
|
ConnectionError::ApplicationClosed(_) => {
|
||||||
|
NB_QUIC_CONNECTION_ERROR_APPLICATION_CLOSED.inc()
|
||||||
|
}
|
||||||
|
ConnectionError::Reset => NB_QUIC_CONNECTION_ERROR_RESET.inc(),
|
||||||
|
ConnectionError::TimedOut => NB_QUIC_CONNECTION_ERROR_TIMEDOUT.inc(),
|
||||||
|
ConnectionError::LocallyClosed => {
|
||||||
|
NB_QUIC_CONNECTION_ERROR_LOCALLY_CLOSED.inc()
|
||||||
|
}
|
||||||
|
}
|
||||||
Err(e.into())
|
Err(e.into())
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
@ -273,6 +317,17 @@ impl QuicConnectionUtils {
|
||||||
match write_timeout_res {
|
match write_timeout_res {
|
||||||
Ok(write_res) => {
|
Ok(write_res) => {
|
||||||
if let Err(e) = write_res {
|
if let Err(e) = write_res {
|
||||||
|
match &e {
|
||||||
|
quinn::WriteError::Stopped(_) => NB_QUIC_WRITE_ERROR_STOPPED.inc(),
|
||||||
|
quinn::WriteError::ConnectionLost(_) => {
|
||||||
|
NB_QUIC_WRITE_ERROR_CONNECTION_LOST.inc()
|
||||||
|
}
|
||||||
|
quinn::WriteError::UnknownStream => {
|
||||||
|
NB_QUIC_WRITE_ERROR_UNKNOWN_STREAM.inc()
|
||||||
|
}
|
||||||
|
quinn::WriteError::ZeroRttRejected => NB_QUIC_WRITE_ERROR_0RTT_REJECT.inc(),
|
||||||
|
};
|
||||||
|
|
||||||
trace!(
|
trace!(
|
||||||
"Error while writing transaction for {}, error {}",
|
"Error while writing transaction for {}, error {}",
|
||||||
identity,
|
identity,
|
||||||
|
@ -297,6 +352,16 @@ impl QuicConnectionUtils {
|
||||||
match finish_timeout_res {
|
match finish_timeout_res {
|
||||||
Ok(finish_res) => {
|
Ok(finish_res) => {
|
||||||
if let Err(e) = finish_res {
|
if let Err(e) = finish_res {
|
||||||
|
match &e {
|
||||||
|
quinn::WriteError::Stopped(_) => NB_QUIC_WRITE_ERROR_STOPPED.inc(),
|
||||||
|
quinn::WriteError::ConnectionLost(_) => {
|
||||||
|
NB_QUIC_WRITE_ERROR_CONNECTION_LOST.inc()
|
||||||
|
}
|
||||||
|
quinn::WriteError::UnknownStream => {
|
||||||
|
NB_QUIC_WRITE_ERROR_UNKNOWN_STREAM.inc()
|
||||||
|
}
|
||||||
|
quinn::WriteError::ZeroRttRejected => NB_QUIC_WRITE_ERROR_0RTT_REJECT.inc(),
|
||||||
|
};
|
||||||
trace!(
|
trace!(
|
||||||
"Error while finishing transaction for {}, error {}",
|
"Error while finishing transaction for {}, error {}",
|
||||||
identity,
|
identity,
|
||||||
|
|
Loading…
Reference in New Issue