solving current slot update issues
This commit is contained in:
parent
0038041eb3
commit
0403d80676
|
@ -83,7 +83,7 @@ impl LiteBridge {
|
|||
let current_slot = rpc_client.get_slot().await?;
|
||||
|
||||
let tpu_service = TpuService::new(
|
||||
Arc::new(std::sync::atomic::AtomicU64::new(current_slot)),
|
||||
current_slot,
|
||||
fanout_slots,
|
||||
Arc::new(identity),
|
||||
rpc_client.clone(),
|
||||
|
|
|
@ -94,13 +94,12 @@ impl Default for IdentityStakes {
|
|||
|
||||
impl TpuService {
|
||||
pub async fn new(
|
||||
current_slot: Arc<AtomicU64>,
|
||||
current_slot: Slot,
|
||||
fanout_slots: u64,
|
||||
identity: Arc<Keypair>,
|
||||
rpc_client: Arc<RpcClient>,
|
||||
rpc_ws_address: String,
|
||||
) -> anyhow::Result<Self> {
|
||||
let slot = current_slot.load(Ordering::Relaxed);
|
||||
let pubsub_client = PubsubClient::new(&rpc_ws_address).await?;
|
||||
let (sender, _) = tokio::sync::broadcast::channel(MAXIMUM_TRANSACTIONS_IN_QUEUE);
|
||||
let (certificate, key) = new_self_signed_tls_certificate(
|
||||
|
@ -114,8 +113,8 @@ impl TpuService {
|
|||
|
||||
Ok(Self {
|
||||
cluster_nodes: Arc::new(DashMap::new()),
|
||||
current_slot,
|
||||
estimated_slot: Arc::new(AtomicU64::new(slot)),
|
||||
current_slot: Arc::new(AtomicU64::new(current_slot)),
|
||||
estimated_slot: Arc::new(AtomicU64::new(current_slot)),
|
||||
leader_schedule: Arc::new(RwLock::new(VecDeque::new())),
|
||||
fanout_slots,
|
||||
rpc_client,
|
||||
|
@ -182,6 +181,7 @@ impl TpuService {
|
|||
|
||||
pub async fn update_leader_schedule(&self) -> Result<()> {
|
||||
let current_slot = self.current_slot.load(Ordering::Relaxed);
|
||||
let estimated_slot = self.estimated_slot.load(Ordering::Relaxed);
|
||||
|
||||
let (queue_begin_slot, queue_end_slot) = {
|
||||
let mut leader_queue = self.leader_schedule.write().await;
|
||||
|
@ -190,8 +190,10 @@ impl TpuService {
|
|||
leader_queue.pop_front();
|
||||
}
|
||||
|
||||
let last_element = leader_queue.back().map_or(current_slot, |x| x.leader_slot);
|
||||
(current_slot, last_element)
|
||||
let last_element = leader_queue
|
||||
.back()
|
||||
.map_or(estimated_slot, |x| x.leader_slot);
|
||||
(estimated_slot, last_element)
|
||||
};
|
||||
|
||||
let last_slot_needed = queue_begin_slot + CACHE_NEXT_SLOT_LEADERS_PUBKEY_SIZE as u64;
|
||||
|
@ -230,7 +232,7 @@ impl TpuService {
|
|||
let current_slot = self.current_slot.load(Ordering::Relaxed);
|
||||
let load_slot = if estimated_slot <= current_slot {
|
||||
current_slot
|
||||
} else if estimated_slot - current_slot > 8 {
|
||||
} else if estimated_slot.saturating_sub(current_slot) > 8 {
|
||||
estimated_slot - 8
|
||||
} else {
|
||||
current_slot
|
||||
|
@ -274,6 +276,14 @@ impl TpuService {
|
|||
}
|
||||
|
||||
async fn update_current_slot(&self, update_notifier: tokio::sync::mpsc::UnboundedSender<u64>) {
|
||||
let update_slot = |slot: u64| {
|
||||
if slot > self.current_slot.load(Ordering::Relaxed) {
|
||||
self.current_slot.store(slot, Ordering::Relaxed);
|
||||
CURRENT_SLOT.set(slot as i64);
|
||||
let _ = update_notifier.send(slot);
|
||||
}
|
||||
};
|
||||
|
||||
loop {
|
||||
let slot = self
|
||||
.rpc_client
|
||||
|
@ -283,7 +293,7 @@ impl TpuService {
|
|||
.await;
|
||||
match slot {
|
||||
Ok(slot) => {
|
||||
self.current_slot.store(slot, Ordering::Relaxed);
|
||||
update_slot(slot);
|
||||
}
|
||||
Err(e) => {
|
||||
// error getting slot
|
||||
|
@ -294,7 +304,7 @@ impl TpuService {
|
|||
}
|
||||
|
||||
let res = tokio::time::timeout(
|
||||
Duration::from_millis(2000),
|
||||
Duration::from_millis(1000),
|
||||
self.pubsub_client.slot_subscribe(),
|
||||
)
|
||||
.await;
|
||||
|
@ -311,14 +321,7 @@ impl TpuService {
|
|||
match next {
|
||||
Ok(slot_info) => {
|
||||
if let Some(slot_info) = slot_info {
|
||||
if slot_info.slot
|
||||
> self.current_slot.load(Ordering::Relaxed)
|
||||
{
|
||||
self.current_slot
|
||||
.store(slot_info.slot, Ordering::Relaxed);
|
||||
CURRENT_SLOT.set(slot_info.slot as i64);
|
||||
let _ = update_notifier.send(slot_info.slot);
|
||||
}
|
||||
update_slot(slot_info.slot);
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
|
@ -408,9 +411,17 @@ impl TpuService {
|
|||
}
|
||||
}
|
||||
Err(_) => {
|
||||
let slot = estimated_slot.fetch_add(1, Ordering::Relaxed);
|
||||
ESTIMATED_SLOT.set((slot + 1) as i64);
|
||||
let es = estimated_slot.load(Ordering::Relaxed);
|
||||
let cs = estimated_slot.load(Ordering::Relaxed);
|
||||
// estimated slot should not go ahead more than 32 slots
|
||||
// this is because it may be a slot block
|
||||
if es < cs + 32 {
|
||||
estimated_slot.fetch_add(1, Ordering::Relaxed);
|
||||
ESTIMATED_SLOT.set((es + 1) as i64);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -1,7 +1,4 @@
|
|||
use std::{
|
||||
sync::{atomic::AtomicU64, Arc},
|
||||
time::Duration,
|
||||
};
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use bench::helpers::BenchHelper;
|
||||
use futures::future::try_join_all;
|
||||
|
@ -23,7 +20,7 @@ async fn send_and_confirm_txs() {
|
|||
let current_slot = rpc_client.get_slot().await.unwrap();
|
||||
|
||||
let tpu_service = TpuService::new(
|
||||
Arc::new(AtomicU64::new(current_slot)),
|
||||
current_slot,
|
||||
32,
|
||||
Arc::new(Keypair::new()),
|
||||
rpc_client.clone(),
|
||||
|
|
Loading…
Reference in New Issue