Merge pull request #130 from blockworks-foundation/solving_current_slot_update_issue

solving current slot update issues
This commit is contained in:
galactus 2023-04-26 11:26:11 +02:00 committed by GitHub
commit 93c9ec4066
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 34 additions and 26 deletions

View File

@ -83,7 +83,7 @@ impl LiteBridge {
let current_slot = rpc_client.get_slot().await?;
let tpu_service = TpuService::new(
Arc::new(std::sync::atomic::AtomicU64::new(current_slot)),
current_slot,
fanout_slots,
Arc::new(identity),
rpc_client.clone(),

View File

@ -94,13 +94,12 @@ impl Default for IdentityStakes {
impl TpuService {
pub async fn new(
current_slot: Arc<AtomicU64>,
current_slot: Slot,
fanout_slots: u64,
identity: Arc<Keypair>,
rpc_client: Arc<RpcClient>,
rpc_ws_address: String,
) -> anyhow::Result<Self> {
let slot = current_slot.load(Ordering::Relaxed);
let pubsub_client = PubsubClient::new(&rpc_ws_address).await?;
let (sender, _) = tokio::sync::broadcast::channel(MAXIMUM_TRANSACTIONS_IN_QUEUE);
let (certificate, key) = new_self_signed_tls_certificate(
@ -114,8 +113,8 @@ impl TpuService {
Ok(Self {
cluster_nodes: Arc::new(DashMap::new()),
current_slot,
estimated_slot: Arc::new(AtomicU64::new(slot)),
current_slot: Arc::new(AtomicU64::new(current_slot)),
estimated_slot: Arc::new(AtomicU64::new(current_slot)),
leader_schedule: Arc::new(RwLock::new(VecDeque::new())),
fanout_slots,
rpc_client,
@ -182,6 +181,7 @@ impl TpuService {
pub async fn update_leader_schedule(&self) -> Result<()> {
let current_slot = self.current_slot.load(Ordering::Relaxed);
let estimated_slot = self.estimated_slot.load(Ordering::Relaxed);
let (queue_begin_slot, queue_end_slot) = {
let mut leader_queue = self.leader_schedule.write().await;
@ -190,8 +190,10 @@ impl TpuService {
leader_queue.pop_front();
}
let last_element = leader_queue.back().map_or(current_slot, |x| x.leader_slot);
(current_slot, last_element)
let last_element = leader_queue
.back()
.map_or(estimated_slot, |x| x.leader_slot);
(estimated_slot, last_element)
};
let last_slot_needed = queue_begin_slot + CACHE_NEXT_SLOT_LEADERS_PUBKEY_SIZE as u64;
@ -230,7 +232,7 @@ impl TpuService {
let current_slot = self.current_slot.load(Ordering::Relaxed);
let load_slot = if estimated_slot <= current_slot {
current_slot
} else if estimated_slot - current_slot > 8 {
} else if estimated_slot.saturating_sub(current_slot) > 8 {
estimated_slot - 8
} else {
current_slot
@ -274,6 +276,14 @@ impl TpuService {
}
async fn update_current_slot(&self, update_notifier: tokio::sync::mpsc::UnboundedSender<u64>) {
let update_slot = |slot: u64| {
if slot > self.current_slot.load(Ordering::Relaxed) {
self.current_slot.store(slot, Ordering::Relaxed);
CURRENT_SLOT.set(slot as i64);
let _ = update_notifier.send(slot);
}
};
loop {
let slot = self
.rpc_client
@ -283,7 +293,7 @@ impl TpuService {
.await;
match slot {
Ok(slot) => {
self.current_slot.store(slot, Ordering::Relaxed);
update_slot(slot);
}
Err(e) => {
// error getting slot
@ -294,7 +304,7 @@ impl TpuService {
}
let res = tokio::time::timeout(
Duration::from_millis(2000),
Duration::from_millis(1000),
self.pubsub_client.slot_subscribe(),
)
.await;
@ -311,14 +321,7 @@ impl TpuService {
match next {
Ok(slot_info) => {
if let Some(slot_info) = slot_info {
if slot_info.slot
> self.current_slot.load(Ordering::Relaxed)
{
self.current_slot
.store(slot_info.slot, Ordering::Relaxed);
CURRENT_SLOT.set(slot_info.slot as i64);
let _ = update_notifier.send(slot_info.slot);
}
update_slot(slot_info.slot);
}
}
Err(_) => {
@ -408,9 +411,17 @@ impl TpuService {
}
}
Err(_) => {
let slot = estimated_slot.fetch_add(1, Ordering::Relaxed);
ESTIMATED_SLOT.set((slot + 1) as i64);
let es = estimated_slot.load(Ordering::Relaxed);
let cs = current_slot.load(Ordering::Relaxed);
// estimated slot should not go ahead more than 32 slots
// this is because it may be a slot block
if es < cs + 32 {
estimated_slot.fetch_add(1, Ordering::Relaxed);
ESTIMATED_SLOT.set((es + 1) as i64);
true
} else {
false
}
}
};

View File

@ -1,7 +1,4 @@
use std::{
sync::{atomic::AtomicU64, Arc},
time::Duration,
};
use std::{sync::Arc, time::Duration};
use bench::helpers::BenchHelper;
use futures::future::try_join_all;
@ -23,7 +20,7 @@ async fn send_and_confirm_txs() {
let current_slot = rpc_client.get_slot().await.unwrap();
let tpu_service = TpuService::new(
Arc::new(AtomicU64::new(current_slot)),
current_slot,
32,
Arc::new(Keypair::new()),
rpc_client.clone(),