Merge pull request #237 from blockworks-foundation/revert_block_polling_logic
reverting logic of block polling
This commit is contained in:
commit
91dbe2b546
|
@ -1,23 +1,21 @@
|
|||
use anyhow::Context;
|
||||
use prometheus::{core::GenericGauge, opts, register_int_gauge};
|
||||
use anyhow::{bail, Context};
|
||||
use solana_client::nonblocking::rpc_client::RpcClient;
|
||||
use solana_lite_rpc_core::{
|
||||
structures::{produced_block::ProducedBlock, slot_notification::SlotNotification},
|
||||
structures::{
|
||||
produced_block::ProducedBlock,
|
||||
slot_notification::{AtomicSlot, SlotNotification},
|
||||
},
|
||||
AnyhowJoinHandle,
|
||||
};
|
||||
use solana_rpc_client_api::config::RpcBlockConfig;
|
||||
use solana_sdk::{commitment_config::CommitmentConfig, slot_history::Slot};
|
||||
use solana_transaction_status::{TransactionDetails, UiTransactionEncoding};
|
||||
use std::{
|
||||
sync::{atomic::AtomicU64, Arc},
|
||||
time::Duration,
|
||||
use solana_sdk::{
|
||||
commitment_config::{CommitmentConfig, CommitmentLevel},
|
||||
slot_history::Slot,
|
||||
};
|
||||
use solana_transaction_status::{TransactionDetails, UiTransactionEncoding};
|
||||
use std::{sync::Arc, time::Duration};
|
||||
use tokio::sync::broadcast::{Receiver, Sender};
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
static ref NB_BLOCK_FETCHING_TASKS: GenericGauge<prometheus::core::AtomicI64> = register_int_gauge!(opts!("literpc_num_blockfetching_tasks", "Transactions in store")).unwrap();
|
||||
}
|
||||
|
||||
pub async fn process_block(
|
||||
rpc_client: &RpcClient,
|
||||
slot: Slot,
|
||||
|
@ -35,40 +33,9 @@ pub async fn process_block(
|
|||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
match block {
|
||||
Ok(block) => Some(ProducedBlock::from_ui_block(block, slot, commitment_config)),
|
||||
Err(_) => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn check_finalized(rpc_client: &RpcClient, slot: Slot, blockhash: &String) -> bool {
|
||||
let block = rpc_client
|
||||
.get_block_with_config(
|
||||
slot,
|
||||
RpcBlockConfig {
|
||||
transaction_details: None,
|
||||
commitment: Some(CommitmentConfig::finalized()),
|
||||
max_supported_transaction_version: Some(0),
|
||||
encoding: Some(UiTransactionEncoding::Base64),
|
||||
rewards: None,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
match block {
|
||||
Ok(block) => {
|
||||
if block.blockhash != *blockhash {
|
||||
log::error!(
|
||||
"blockhash mismatch confirmed : {} finalized : {} for slot: {}",
|
||||
blockhash,
|
||||
block.blockhash,
|
||||
slot
|
||||
);
|
||||
}
|
||||
true
|
||||
}
|
||||
Err(_) => false,
|
||||
}
|
||||
block
|
||||
.ok()
|
||||
.map(|block| ProducedBlock::from_ui_block(block, slot, commitment_config))
|
||||
}
|
||||
|
||||
pub fn poll_block(
|
||||
|
@ -76,82 +43,120 @@ pub fn poll_block(
|
|||
block_notification_sender: Sender<ProducedBlock>,
|
||||
slot_notification: Receiver<SlotNotification>,
|
||||
) -> Vec<AnyhowJoinHandle> {
|
||||
let task_spawner: AnyhowJoinHandle = tokio::spawn(async move {
|
||||
let counting_semaphore = Arc::new(tokio::sync::Semaphore::new(1024));
|
||||
let mut tasks: Vec<AnyhowJoinHandle> = vec![];
|
||||
|
||||
let recent_slot = AtomicSlot::default();
|
||||
let (slot_retry_queue_sx, mut slot_retry_queue_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (block_schedule_queue_sx, block_schedule_queue_rx) =
|
||||
async_channel::unbounded::<(Slot, CommitmentConfig)>();
|
||||
|
||||
for _i in 0..16 {
|
||||
let block_notification_sender = block_notification_sender.clone();
|
||||
let rpc_client = rpc_client.clone();
|
||||
let block_schedule_queue_rx = block_schedule_queue_rx.clone();
|
||||
let slot_retry_queue_sx = slot_retry_queue_sx.clone();
|
||||
let task: AnyhowJoinHandle = tokio::spawn(async move {
|
||||
loop {
|
||||
let (slot, commitment_config) = block_schedule_queue_rx
|
||||
.recv()
|
||||
.await
|
||||
.context("Recv error on block channel")?;
|
||||
let processed_block =
|
||||
process_block(rpc_client.as_ref(), slot, commitment_config).await;
|
||||
match processed_block {
|
||||
Some(processed_block) => {
|
||||
block_notification_sender
|
||||
.send(processed_block)
|
||||
.context("Processed block should be sent")?;
|
||||
// schedule to get finalized commitment
|
||||
if commitment_config.commitment != CommitmentLevel::Finalized {
|
||||
let retry_at = tokio::time::Instant::now()
|
||||
.checked_add(Duration::from_secs(2))
|
||||
.unwrap();
|
||||
slot_retry_queue_sx
|
||||
.send(((slot, CommitmentConfig::finalized()), retry_at))
|
||||
.context("Failed to reschedule fetch of finalized block")?;
|
||||
}
|
||||
}
|
||||
None => {
|
||||
let retry_at = tokio::time::Instant::now()
|
||||
.checked_add(Duration::from_millis(10))
|
||||
.unwrap();
|
||||
slot_retry_queue_sx
|
||||
.send(((slot, commitment_config), retry_at))
|
||||
.context("should be able to rescheduled for replay")?;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
tasks.push(task);
|
||||
}
|
||||
|
||||
//let replay task
|
||||
{
|
||||
let recent_slot = recent_slot.clone();
|
||||
let block_schedule_queue_sx = block_schedule_queue_sx.clone();
|
||||
let replay_task: AnyhowJoinHandle = tokio::spawn(async move {
|
||||
while let Some(((slot, commitment_config), instant)) = slot_retry_queue_rx.recv().await
|
||||
{
|
||||
let recent_slot = recent_slot.load(std::sync::atomic::Ordering::Relaxed);
|
||||
// if slot is too old ignore
|
||||
if recent_slot.saturating_sub(slot) > 128 {
|
||||
// slot too old to retry
|
||||
// most probably its an empty slot
|
||||
continue;
|
||||
}
|
||||
|
||||
let now = tokio::time::Instant::now();
|
||||
if now < instant {
|
||||
tokio::time::sleep_until(instant).await;
|
||||
}
|
||||
if block_schedule_queue_sx
|
||||
.send((slot, commitment_config))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
bail!("could not schedule replay for a slot")
|
||||
}
|
||||
}
|
||||
unreachable!();
|
||||
});
|
||||
tasks.push(replay_task)
|
||||
}
|
||||
|
||||
//slot poller
|
||||
let slot_poller = tokio::spawn(async move {
|
||||
log::info!("block listner started");
|
||||
let current_slot = rpc_client
|
||||
.get_slot()
|
||||
.await
|
||||
.context("Should get current slot")?;
|
||||
recent_slot.store(current_slot, std::sync::atomic::Ordering::Relaxed);
|
||||
let mut slot_notification = slot_notification;
|
||||
let current_slot = Arc::new(AtomicU64::new(0));
|
||||
loop {
|
||||
let SlotNotification { processed_slot, .. } = slot_notification
|
||||
let SlotNotification {
|
||||
estimated_processed_slot,
|
||||
..
|
||||
} = slot_notification
|
||||
.recv()
|
||||
.await
|
||||
.context("Slot notification channel close")?;
|
||||
let last_processed_slot = current_slot.load(std::sync::atomic::Ordering::Relaxed);
|
||||
let last_processed_slot = if last_processed_slot == 0 {
|
||||
processed_slot.saturating_sub(1)
|
||||
} else {
|
||||
last_processed_slot
|
||||
};
|
||||
if processed_slot > last_processed_slot {
|
||||
current_slot.store(processed_slot, std::sync::atomic::Ordering::Relaxed);
|
||||
|
||||
for slot in last_processed_slot + 1..processed_slot + 1 {
|
||||
let premit = counting_semaphore.clone().acquire_owned().await?;
|
||||
NB_BLOCK_FETCHING_TASKS.inc();
|
||||
let rpc_client = rpc_client.clone();
|
||||
let block_notification_sender = block_notification_sender.clone();
|
||||
let current_slot = current_slot.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut confirmed_block = None;
|
||||
while current_slot
|
||||
.load(std::sync::atomic::Ordering::Relaxed)
|
||||
.saturating_sub(slot)
|
||||
< 64
|
||||
{
|
||||
if let Some(produced_block) = process_block(
|
||||
rpc_client.as_ref(),
|
||||
slot,
|
||||
CommitmentConfig::confirmed(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
let _ = block_notification_sender.send(produced_block.clone());
|
||||
confirmed_block = Some(produced_block);
|
||||
tokio::time::sleep(Duration::from_secs(2)).await;
|
||||
break;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
}
|
||||
while current_slot
|
||||
.load(std::sync::atomic::Ordering::Relaxed)
|
||||
.saturating_sub(slot)
|
||||
< 256
|
||||
{
|
||||
if let Some(confirmed_block) = &confirmed_block {
|
||||
if check_finalized(
|
||||
rpc_client.as_ref(),
|
||||
slot,
|
||||
&confirmed_block.blockhash,
|
||||
)
|
||||
.await
|
||||
{
|
||||
let mut finalized_block = confirmed_block.clone();
|
||||
finalized_block.commitment_config =
|
||||
CommitmentConfig::finalized();
|
||||
let _ = block_notification_sender.send(finalized_block);
|
||||
break;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
NB_BLOCK_FETCHING_TASKS.dec();
|
||||
drop(premit)
|
||||
});
|
||||
.context("Should get slot notification")?;
|
||||
let last_slot = recent_slot.load(std::sync::atomic::Ordering::Relaxed);
|
||||
if last_slot < estimated_processed_slot {
|
||||
recent_slot.store(
|
||||
estimated_processed_slot,
|
||||
std::sync::atomic::Ordering::Relaxed,
|
||||
);
|
||||
for slot in last_slot + 1..estimated_processed_slot + 1 {
|
||||
block_schedule_queue_sx
|
||||
.send((slot, CommitmentConfig::confirmed()))
|
||||
.await
|
||||
.context("Should be able to schedule message")?;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
tasks.push(slot_poller);
|
||||
|
||||
vec![task_spawner]
|
||||
tasks
|
||||
}
|
||||
|
|
|
@ -22,12 +22,12 @@ pub const DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE: usize = 200_000;
|
|||
|
||||
/// 25 slots in 10s send to little more leaders
|
||||
#[from_env]
|
||||
pub const DEFAULT_FANOUT_SIZE: u64 = 10;
|
||||
pub const DEFAULT_FANOUT_SIZE: u64 = 18;
|
||||
|
||||
#[from_env]
|
||||
pub const MAX_RETRIES: usize = 40;
|
||||
|
||||
pub const DEFAULT_RETRY_TIMEOUT: u64 = 1;
|
||||
pub const DEFAULT_RETRY_TIMEOUT: u64 = 3;
|
||||
|
||||
#[from_env]
|
||||
pub const DEFAULT_CLEAN_INTERVAL_MS: u64 = 5 * 60 * 1000; // five minute
|
||||
|
|
Loading…
Reference in New Issue