Merge pull request #189 from blockworks-foundation/use_counting_semaphore_to_poll_blocks

Use counting semaphore to poll blocks
This commit is contained in:
galactus 2023-09-08 13:21:00 +02:00 committed by GitHub
commit bdb124cfab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 62 additions and 51 deletions

View File

@ -18,7 +18,10 @@ use solana_transaction_status::{
option_serializer::OptionSerializer, RewardType, TransactionDetails, UiTransactionEncoding, option_serializer::OptionSerializer, RewardType, TransactionDetails, UiTransactionEncoding,
UiTransactionStatusMeta, UiTransactionStatusMeta,
}; };
use std::{sync::Arc, time::Duration}; use std::{
sync::{atomic::AtomicU64, Arc},
time::Duration,
};
use tokio::sync::broadcast::{Receiver, Sender}; use tokio::sync::broadcast::{Receiver, Sender};
pub async fn process_block( pub async fn process_block(
@ -169,37 +172,51 @@ pub fn poll_block(
let task_spawner: AnyhowJoinHandle = tokio::spawn(async move { let task_spawner: AnyhowJoinHandle = tokio::spawn(async move {
let counting_semaphore = Arc::new(tokio::sync::Semaphore::new(1024)); let counting_semaphore = Arc::new(tokio::sync::Semaphore::new(1024));
let mut slot_notification = slot_notification; let mut slot_notification = slot_notification;
let mut last_processed_slot = 0; let current_slot = Arc::new(AtomicU64::new(0));
loop { loop {
let SlotNotification { processed_slot, .. } = slot_notification let SlotNotification { processed_slot, .. } = slot_notification
.recv() .recv()
.await .await
.context("Slot notification channel close")?; .context("Slot notification channel close")?;
let last_processed_slot = current_slot.load(std::sync::atomic::Ordering::Relaxed);
if processed_slot > last_processed_slot { if processed_slot > last_processed_slot {
last_processed_slot = processed_slot; current_slot.store(processed_slot, std::sync::atomic::Ordering::Relaxed);
for slot in last_processed_slot + 1..processed_slot + 1 {
let premit = counting_semaphore.clone().acquire_owned().await?; let premit = counting_semaphore.clone().acquire_owned().await?;
let rpc_client = rpc_client.clone(); let rpc_client = rpc_client.clone();
let block_notification_sender = block_notification_sender.clone(); let block_notification_sender = block_notification_sender.clone();
let current_slot = current_slot.clone();
tokio::spawn(async move { tokio::spawn(async move {
// try 500 times because slot gets let mut confirmed_slot_fetch = false;
for _ in 0..1024 { while current_slot
.load(std::sync::atomic::Ordering::Relaxed)
.saturating_sub(slot)
< 32
{
if let Some(processed_block) = process_block( if let Some(processed_block) = process_block(
rpc_client.as_ref(), rpc_client.as_ref(),
processed_slot, slot,
CommitmentConfig::confirmed(), CommitmentConfig::confirmed(),
) )
.await .await
{ {
let _ = block_notification_sender.send(processed_block); let _ = block_notification_sender.send(processed_block);
confirmed_slot_fetch = true;
break; break;
} }
tokio::time::sleep(Duration::from_millis(50)).await; tokio::time::sleep(Duration::from_millis(50)).await;
} }
for _ in 0..1024 { while confirmed_slot_fetch
&& current_slot
.load(std::sync::atomic::Ordering::Relaxed)
.saturating_sub(slot)
< 128
{
if let Some(processed_block) = process_block( if let Some(processed_block) = process_block(
rpc_client.as_ref(), rpc_client.as_ref(),
processed_slot, slot,
CommitmentConfig::finalized(), CommitmentConfig::finalized(),
) )
.await .await
@ -213,6 +230,7 @@ pub fn poll_block(
}); });
} }
} }
}
}); });
vec![task_spawner] vec![task_spawner]

View File

@ -23,7 +23,7 @@ pub const DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE: usize = 200_000;
/// 25 slots in 10s send to little more leaders /// 25 slots in 10s send to little more leaders
#[from_env] #[from_env]
pub const DEFAULT_FANOUT_SIZE: u64 = 32; pub const DEFAULT_FANOUT_SIZE: u64 = 10;
#[from_env] #[from_env]
pub const MAX_RETRIES: usize = 40; pub const MAX_RETRIES: usize = 40;

View File

@ -151,10 +151,10 @@ pub async fn start_lite_rpc(args: Args, rpc_client: Arc<RpcClient>) -> anyhow::R
connection_timeout: Duration::from_secs(1), connection_timeout: Duration::from_secs(1),
connection_retry_count: 10, connection_retry_count: 10,
finalize_timeout: Duration::from_millis(200), finalize_timeout: Duration::from_millis(200),
max_number_of_connections: 10, max_number_of_connections: 8,
unistream_timeout: Duration::from_millis(500), unistream_timeout: Duration::from_millis(500),
write_timeout: Duration::from_secs(1), write_timeout: Duration::from_secs(1),
number_of_transactions_per_unistream: 8, number_of_transactions_per_unistream: 1,
}, },
tpu_connection_path, tpu_connection_path,
}; };

View File

@ -68,16 +68,17 @@ impl ActiveConnection {
let mut exit_oneshot_channel = exit_oneshot_channel; let mut exit_oneshot_channel = exit_oneshot_channel;
let identity = self.identity; let identity = self.identity;
let max_uni_stream_connections: u64 = compute_max_allowed_uni_streams(
identity_stakes.peer_type,
identity_stakes.stakes,
identity_stakes.total_stakes,
) as u64;
let number_of_transactions_per_unistream = self let number_of_transactions_per_unistream = self
.connection_parameters .connection_parameters
.number_of_transactions_per_unistream; .number_of_transactions_per_unistream;
let max_number_of_connections = self.connection_parameters.max_number_of_connections; let max_number_of_connections = self.connection_parameters.max_number_of_connections;
let max_uni_stream_connections: u64 = (compute_max_allowed_uni_streams(
identity_stakes.peer_type,
identity_stakes.stakes,
identity_stakes.total_stakes,
) * max_number_of_connections) as u64;
let task_counter: Arc<AtomicU64> = Arc::new(AtomicU64::new(0)); let task_counter: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
let exit_signal = self.exit_signal.clone(); let exit_signal = self.exit_signal.clone();
let connection_pool = QuicConnectionPool::new( let connection_pool = QuicConnectionPool::new(

View File

@ -102,14 +102,6 @@ impl TpuService {
current_slot: Slot, current_slot: Slot,
estimated_slot: Slot, estimated_slot: Slot,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let load_slot = if estimated_slot <= current_slot {
current_slot
} else if estimated_slot.saturating_sub(current_slot) > 8 {
estimated_slot - 8
} else {
current_slot
};
let fanout = self.config.fanout_slots; let fanout = self.config.fanout_slots;
let last_slot = estimated_slot + fanout; let last_slot = estimated_slot + fanout;
@ -117,7 +109,7 @@ impl TpuService {
let next_leaders = self let next_leaders = self
.leader_schedule .leader_schedule
.get_slot_leaders(load_slot, last_slot) .get_slot_leaders(current_slot, last_slot)
.await?; .await?;
// get next leader with its tpu port // get next leader with its tpu port
let connections_to_keep = next_leaders let connections_to_keep = next_leaders