Use counting semaphore to improve block polling

Godmode Galactus 2023-09-08 11:11:11 +02:00
parent d16bd5bd0d
commit f5b6c09bbd
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
2 changed files with 45 additions and 111 deletions
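
In outline, the change drops the fixed pool of 16 block-fetch workers and the retry queue in favor of one short-lived task per slot, with a counting semaphore (tokio::sync::Semaphore, 1024 permits) bounding how many block fetches can be in flight at once. A minimal sketch of that bounded-concurrency pattern, using a hypothetical fetch_block in place of the crate's process_block:

use std::{sync::Arc, time::Duration};
use tokio::sync::Semaphore;

// Hypothetical stand-in for process_block(rpc_client.as_ref(), slot, commitment).
async fn fetch_block(slot: u64) {
    tokio::time::sleep(Duration::from_millis(50)).await;
    println!("fetched block for slot {slot}");
}

#[tokio::main]
async fn main() {
    // At most 1024 fetches in flight, matching Semaphore::new(1024) in the diff.
    let semaphore = Arc::new(Semaphore::new(1024));
    for slot in 0..10_000u64 {
        // acquire_owned() yields a permit that can move into the spawned task;
        // awaiting here applies backpressure once all permits are taken.
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        tokio::spawn(async move {
            fetch_block(slot).await;
            drop(permit); // permit returns to the semaphore when the fetch is done
        });
    }
}

The owned permit is what makes the bound work across tasks: dropping it inside the spawned task, rather than in the scheduler loop, keeps a slot counted against the limit until its fetch actually completes.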


@@ -1,16 +1,16 @@
-use anyhow::{bail, Context};
+use anyhow::Context;
 use solana_client::nonblocking::rpc_client::RpcClient;
 use solana_lite_rpc_core::{
     structures::{
         processed_block::{ProcessedBlock, TransactionInfo},
-        slot_notification::{AtomicSlot, SlotNotification},
+        slot_notification::SlotNotification,
     },
     AnyhowJoinHandle,
 };
 use solana_rpc_client_api::config::RpcBlockConfig;
 use solana_sdk::{
     borsh0_10::try_from_slice_unchecked,
-    commitment_config::{CommitmentConfig, CommitmentLevel},
+    commitment_config::CommitmentConfig,
     compute_budget::{self, ComputeBudgetInstruction},
     slot_history::Slot,
 };
@@ -166,120 +166,54 @@ pub fn poll_block(
     block_notification_sender: Sender<ProcessedBlock>,
     slot_notification: Receiver<SlotNotification>,
 ) -> Vec<AnyhowJoinHandle> {
-    let mut tasks: Vec<AnyhowJoinHandle> = vec![];
-    let recent_slot = AtomicSlot::default();
-    let (slot_retry_queue_sx, mut slot_retry_queue_rx) = tokio::sync::mpsc::unbounded_channel();
-    let (block_schedule_queue_sx, block_schedule_queue_rx) =
-        async_channel::unbounded::<(Slot, CommitmentConfig)>();
-
-    for _i in 0..16 {
-        let block_notification_sender = block_notification_sender.clone();
-        let rpc_client = rpc_client.clone();
-        let block_schedule_queue_rx = block_schedule_queue_rx.clone();
-        let slot_retry_queue_sx = slot_retry_queue_sx.clone();
-        let task: AnyhowJoinHandle = tokio::spawn(async move {
-            loop {
-                let (slot, commitment_config) = block_schedule_queue_rx
-                    .recv()
-                    .await
-                    .context("Recv error on block channel")?;
-                let processed_block =
-                    process_block(rpc_client.as_ref(), slot, commitment_config).await;
-                match processed_block {
-                    Some(processed_block) => {
-                        block_notification_sender
-                            .send(processed_block)
-                            .context("Processed block should be sent")?;
-                        // schedule to get finalized commitment
-                        if commitment_config.commitment != CommitmentLevel::Finalized {
-                            let retry_at = tokio::time::Instant::now()
-                                .checked_add(Duration::from_secs(2))
-                                .unwrap();
-                            slot_retry_queue_sx
-                                .send(((slot, CommitmentConfig::finalized()), retry_at))
-                                .context("Failed to reschedule fetch of finalized block")?;
-                        }
-                    }
-                    None => {
-                        let retry_at = tokio::time::Instant::now()
-                            .checked_add(Duration::from_millis(10))
-                            .unwrap();
-                        slot_retry_queue_sx
-                            .send(((slot, commitment_config), retry_at))
-                            .context("Should be able to reschedule for replay")?;
-                    }
-                }
-            }
-        });
-        tasks.push(task);
-    }
-
-    // replay task
-    {
-        let recent_slot = recent_slot.clone();
-        let block_schedule_queue_sx = block_schedule_queue_sx.clone();
-        let replay_task: AnyhowJoinHandle = tokio::spawn(async move {
-            while let Some(((slot, commitment_config), instant)) = slot_retry_queue_rx.recv().await
-            {
-                let recent_slot = recent_slot.load(std::sync::atomic::Ordering::Relaxed);
-                // if the slot is too old, ignore it
-                if recent_slot.saturating_sub(slot) > 128 {
-                    // slot is too old to retry; most probably an empty slot
-                    continue;
-                }
-                let now = tokio::time::Instant::now();
-                if now < instant {
-                    tokio::time::sleep_until(instant).await;
-                }
-                if block_schedule_queue_sx
-                    .send((slot, commitment_config))
-                    .await
-                    .is_err()
-                {
-                    bail!("could not schedule replay for a slot")
-                }
-            }
-            unreachable!();
-        });
-        tasks.push(replay_task)
-    }
-
-    // slot poller
-    let slot_poller = tokio::spawn(async move {
-        log::info!("block listener started");
-        let current_slot = rpc_client
-            .get_slot()
-            .await
-            .context("Should get current slot")?;
-        recent_slot.store(current_slot, std::sync::atomic::Ordering::Relaxed);
+    let task_spawner: AnyhowJoinHandle = tokio::spawn(async move {
+        let counting_semaphore = Arc::new(tokio::sync::Semaphore::new(1024));
         let mut slot_notification = slot_notification;
+        let mut last_processed_slot = 0;
         loop {
-            let SlotNotification {
-                estimated_processed_slot,
-                ..
-            } = slot_notification
+            let SlotNotification { processed_slot, .. } = slot_notification
                 .recv()
                 .await
                 .context("Should get slot notification")?;
-            let last_slot = recent_slot.load(std::sync::atomic::Ordering::Relaxed);
-            if last_slot < estimated_processed_slot {
-                recent_slot.store(
-                    estimated_processed_slot,
-                    std::sync::atomic::Ordering::Relaxed,
-                );
-                for slot in last_slot + 1..estimated_processed_slot + 1 {
-                    block_schedule_queue_sx
-                        .send((slot, CommitmentConfig::confirmed()))
-                        .await
-                        .context("Should be able to schedule message")?;
-                }
+            if processed_slot > last_processed_slot {
+                last_processed_slot = processed_slot;
+                let permit = counting_semaphore.clone().acquire_owned().await?;
+                let rpc_client = rpc_client.clone();
+                let block_notification_sender = block_notification_sender.clone();
+                tokio::spawn(async move {
+                    // retry up to 1024 times; the confirmed block may not be available yet
+                    for _ in 0..1024 {
+                        if let Some(processed_block) = process_block(
+                            rpc_client.as_ref(),
+                            processed_slot,
+                            CommitmentConfig::confirmed(),
+                        )
+                        .await
+                        {
+                            let _ = block_notification_sender.send(processed_block);
+                            break;
+                        }
+                        tokio::time::sleep(Duration::from_millis(50)).await;
+                    }
+                    // then poll until the finalized block is available
+                    for _ in 0..1024 {
+                        if let Some(processed_block) = process_block(
+                            rpc_client.as_ref(),
+                            processed_slot,
+                            CommitmentConfig::finalized(),
+                        )
+                        .await
+                        {
+                            let _ = block_notification_sender.send(processed_block);
+                            break;
+                        }
+                        tokio::time::sleep(Duration::from_millis(50)).await;
+                    }
+                    drop(permit);
+                });
             }
         }
     });
-    tasks.push(slot_poller);
-    tasks
+
+    vec![task_spawner]
 }
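
Each per-slot task in the new code polls process_block in two phases, confirmed then finalized, sleeping 50 ms between attempts; at 1024 attempts per phase, a slot is given up after roughly 51 s of polling. The two loops could be factored into a helper like the following sketch (hypothetical, not part of the commit):

use std::{future::Future, time::Duration};

// Hypothetical helper mirroring the retry loops above: call `fetch` until it
// yields Some(value), sleeping 50 ms between attempts, up to `max_attempts`.
async fn poll_until_available<T, F, Fut>(mut fetch: F, max_attempts: usize) -> Option<T>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Option<T>>,
{
    for _ in 0..max_attempts {
        if let Some(value) = fetch().await {
            return Some(value);
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
    None
}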


@@ -23,7 +23,7 @@ pub const DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE: usize = 200_000;

 /// 25 slots in 10s; send to a few more leaders
 #[from_env]
-pub const DEFAULT_FANOUT_SIZE: u64 = 16;
+pub const DEFAULT_FANOUT_SIZE: u64 = 32;
 #[from_env]
 pub const MAX_RETRIES: usize = 40;