Revert "Block Listner Channel Fix (#137)"

This reverts commit 4e5506cc58.
This commit is contained in:
Godmode Galactus 2023-05-09 16:52:51 +02:00
parent 4e5506cc58
commit b3a5622b5f
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
8 changed files with 104 additions and 147 deletions

View File

@ -59,7 +59,7 @@ async fn main() {
Ok(bh) => {
let mut lock = block_hash.write().await;
*lock = bh;
}
},
Err(e) => println!("blockhash update error {}", e),
}
tokio::time::sleep(Duration::from_millis(500)).await;

View File

@ -8,7 +8,7 @@ use crate::{
PrometheusSync, TransactionReplay, TransactionReplayer, TxProps, TxSender, WireTransaction,
MESSAGES_IN_REPLAY_QUEUE,
},
AnyhowJoinHandle, DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE,
DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE,
};
use std::{ops::Deref, str::FromStr, sync::Arc, time::Duration};
@ -16,7 +16,6 @@ use std::{ops::Deref, str::FromStr, sync::Arc, time::Duration};
use anyhow::bail;
use dashmap::DashMap;
use log::{error, info};
use jsonrpsee::{core::SubscriptionResult, server::ServerBuilder, PendingSubscriptionSink};
@ -35,6 +34,7 @@ use solana_transaction_status::TransactionStatus;
use tokio::{
net::ToSocketAddrs,
sync::mpsc::{self, Sender, UnboundedSender},
task::JoinHandle,
time::Instant,
};
@ -63,10 +63,9 @@ pub struct LiteBridge {
// None if LiteBridge is not executed
pub tx_send_channel: Option<Sender<(String, WireTransaction, u64)>>,
pub tx_sender: TxSender,
// blocks
pub block_listner: BlockListener,
pub block_store: BlockStore,
// txs
pub tx_replayer: TransactionReplayer,
pub tx_replay_sender: Option<UnboundedSender<TransactionReplay>>,
pub max_retries: usize,
@ -128,7 +127,7 @@ impl LiteBridge {
clean_interval: Duration,
enable_postgres: bool,
prometheus_addr: T,
) -> anyhow::Result<()> {
) -> anyhow::Result<Vec<JoinHandle<anyhow::Result<()>>>> {
let (postgres, postgres_send) = if enable_postgres {
let (postgres_send, postgres_recv) = mpsc::unbounded_channel();
let postgres = Postgres::new().await?;
@ -139,8 +138,7 @@ impl LiteBridge {
(None, None)
};
let tpu_service = self.tpu_service.clone();
let tpu_service = tpu_service.start();
let mut tpu_services = self.tpu_service.start().await?;
let (tx_send, tx_recv) = mpsc::channel(DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE);
self.tx_send_channel = Some(tx_send);
@ -195,13 +193,13 @@ impl LiteBridge {
.await?
.start(rpc)?;
let ws_server: AnyhowJoinHandle = tokio::spawn(async move {
let ws_server = tokio::spawn(async move {
info!("Websocket Server started at {ws_addr:?}");
ws_server_handle.stopped().await;
bail!("Websocket server stopped");
});
let http_server: AnyhowJoinHandle = tokio::spawn(async move {
let http_server = tokio::spawn(async move {
info!("HTTP Server started at {http_addr:?}");
http_server_handle.stopped().await;
bail!("HTTP server stopped");
@ -210,53 +208,26 @@ impl LiteBridge {
(ws_server, http_server)
};
let postgres = async {
let Some(postgres) = postgres else {
std::future::pending::<()>().await;
unreachable!();
};
let mut services = vec![
ws_server,
http_server,
tx_sender,
finalized_block_listener,
confirmed_block_listener,
processed_block_listener,
metrics_capture,
prometheus_sync,
cleaner,
replay_service,
];
postgres.await
};
services.append(&mut tpu_services);
tokio::select! {
res = tpu_service => {
bail!("Tpu Services exited unexpectedly {res:?}");
},
res = ws_server => {
bail!("WebSocket server exited unexpectedly {res:?}");
},
res = http_server => {
bail!("HTTP server exited unexpectedly {res:?}");
},
res = tx_sender => {
bail!("Tx Sender exited unexpectedly {res:?}");
},
res = finalized_block_listener => {
bail!("Finalized Block Listener exited unexpectedly {res:?}");
},
res = confirmed_block_listener => {
bail!("Confirmed Block Listener exited unexpectedly {res:?}");
},
res = processed_block_listener => {
bail!("Processed Block Listener exited unexpectedly {res:?}");
},
res = metrics_capture => {
bail!("Metrics Capture exited unexpectedly {res:?}");
},
res = prometheus_sync => {
bail!("Prometheus Service exited unexpectedly {res:?}");
},
res = cleaner => {
bail!("Cleaner Service exited unexpectedly {res:?}");
},
res = replay_service => {
bail!("Tx replay service exited unexpectedly {res:?}");
},
res = postgres => {
bail!("Postgres service exited unexpectedly {res:?}");
},
if let Some(postgres) = postgres {
services.push(postgres);
}
Ok(services)
}
}

View File

@ -1,6 +1,5 @@
use const_env::from_env;
use solana_transaction_status::TransactionConfirmationStatus;
use tokio::task::JoinHandle;
pub mod block_store;
pub mod bridge;
@ -11,8 +10,6 @@ pub mod errors;
pub mod rpc;
pub mod workers;
pub type AnyhowJoinHandle = JoinHandle<anyhow::Result<()>>;
#[from_env]
pub const DEFAULT_RPC_ADDR: &str = "http://0.0.0.0:8899";
#[from_env]

View File

@ -52,10 +52,11 @@ pub async fn main() -> anyhow::Result<()> {
dotenv().ok();
let identity = get_identity_keypair(&identity_keypair).await;
let clean_interval_ms = Duration::from_millis(clean_interval_ms);
let retry_after = Duration::from_secs(transaction_retry_after_secs);
let services = LiteBridge::new(
let clean_interval_ms = Duration::from_millis(clean_interval_ms);
let retry_after = Duration::from_secs(transaction_retry_after_secs);
let light_bridge = LiteBridge::new(
rpc_addr,
ws_addr,
fanout_size,
@ -63,25 +64,29 @@ pub async fn main() -> anyhow::Result<()> {
retry_after,
maximum_retries_per_tx,
)
.await?
.start_services(
lite_rpc_http_addr,
lite_rpc_ws_addr,
clean_interval_ms,
enable_postgres,
prometheus_addr,
);
.await?;
let services = light_bridge
.start_services(
lite_rpc_http_addr,
lite_rpc_ws_addr,
clean_interval_ms,
enable_postgres,
prometheus_addr,
)
.await?;
let services = futures::future::try_join_all(services);
let ctrl_c_signal = tokio::signal::ctrl_c();
tokio::select! {
res = services => {
bail!("Services quit unexpectedly {res:?}");
_ = services => {
bail!("Services quit unexpectedly");
}
_ = ctrl_c_signal => {
info!("Received ctrl+c signal");
Ok(())
}
}
Ok(())
}

View File

@ -9,7 +9,7 @@ use std::{
use chrono::{TimeZone, Utc};
use dashmap::DashMap;
use jsonrpsee::{SubscriptionMessage, SubscriptionSink};
use log::{info, trace, warn};
use log::{error, info, trace, warn};
use prometheus::{
core::GenericGauge, histogram_opts, opts, register_histogram, register_int_counter,
register_int_gauge, Histogram, IntCounter,
@ -395,26 +395,33 @@ impl BlockListener {
Ok(())
}
pub async fn listen(
pub fn listen(
self,
commitment_config: CommitmentConfig,
postgres: Option<PostgresMpscSend>,
estimated_slot: Arc<AtomicU64>,
) -> anyhow::Result<()> {
) -> JoinHandle<anyhow::Result<()>> {
let (slot_retry_queue_sx, mut slot_retry_queue_rx) = tokio::sync::mpsc::unbounded_channel();
let (block_schedule_queue_sx, block_schedule_queue_rx) = async_channel::unbounded::<Slot>();
let (block_schedule_queue_sx, block_schedule_queue_rx) =
async_channel::unbounded::<Slot>();
// task to fetch blocks
//
let this = self.clone();
let slot_indexer_tasks = (0..8).map(move |_| {
let this = this.clone();
for _i in 0..8 {
let this = self.clone();
let postgres = postgres.clone();
let slot_retry_queue_sx = slot_retry_queue_sx.clone();
let block_schedule_queue_rx = block_schedule_queue_rx.clone();
tokio::spawn(async move {
while let Ok(slot) = block_schedule_queue_rx.recv().await {
loop {
let slot = match block_schedule_queue_rx.recv().await {
Ok(v) => v,
Err(e) => {
error!("Recv error on block channel {}", e);
continue;
}
};
if commitment_config.is_finalized() {
BLOCKS_IN_FINALIZED_QUEUE.dec();
} else {
@ -430,28 +437,19 @@ impl BlockListener {
let retry_at = tokio::time::Instant::now()
.checked_add(Duration::from_millis(10))
.unwrap();
slot_retry_queue_sx.send((slot, retry_at))?;
let _ = slot_retry_queue_sx.send((slot, retry_at));
BLOCKS_IN_RETRY_QUEUE.inc();
};
}
anyhow::bail!("Block Slot channel closed");
// for type
#[allow(unreachable_code)]
Ok(())
})
});
});
}
// a task that will queue back the slots to be retried after a certain delay
let recent_slot = Arc::new(AtomicU64::new(0));
let slot_retry_task = {
{
let block_schedule_queue_sx = block_schedule_queue_sx.clone();
let recent_slot = recent_slot.clone();
tokio::spawn(async move {
while let Some((slot, instant)) = slot_retry_queue_rx.recv().await {
BLOCKS_IN_RETRY_QUEUE.dec();
@ -463,11 +461,11 @@ impl BlockListener {
continue;
}
if tokio::time::Instant::now() < instant {
let now = tokio::time::Instant::now();
if now < instant {
tokio::time::sleep_until(instant).await;
}
if (block_schedule_queue_sx.send(slot).await).is_ok() {
if let Ok(_) = block_schedule_queue_sx.send(slot).await {
if commitment_config.is_finalized() {
BLOCKS_IN_FINALIZED_QUEUE.inc();
} else {
@ -475,10 +473,10 @@ impl BlockListener {
}
}
}
})
};
});
}
let get_slot_task: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
tokio::spawn(async move {
info!("{commitment_config:?} block listner started");
let last_latest_slot = self
@ -503,12 +501,14 @@ impl BlockListener {
// context for lock
{
for slot in new_block_slots {
block_schedule_queue_sx.send(slot).await?;
if commitment_config.is_finalized() {
BLOCKS_IN_FINALIZED_QUEUE.inc();
if let Err(e) = block_schedule_queue_sx.send(slot).await {
error!("error sending of block schedule queue {}", e);
} else {
BLOCKS_IN_CONFIRMED_QUEUE.inc();
if commitment_config.is_finalized() {
BLOCKS_IN_FINALIZED_QUEUE.inc();
} else {
BLOCKS_IN_CONFIRMED_QUEUE.inc();
}
}
}
}
@ -516,19 +516,7 @@ impl BlockListener {
last_latest_slot = new_slot;
recent_slot.store(last_latest_slot, std::sync::atomic::Ordering::Relaxed);
}
});
tokio::select! {
res = get_slot_task => {
anyhow::bail!("Get slot task exited unexpectedly {res:?}")
}
res = slot_retry_task => {
anyhow::bail!("Slot retry task exited unexpectedly {res:?}")
},
res = futures::future::try_join_all(slot_indexer_tasks) => {
anyhow::bail!("Slot indexer exited unexpectedly {res:?}")
},
}
})
}
// continuosly poll processed blocks and feed into blockstore

View File

@ -15,7 +15,9 @@ use quinn::{
ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, SendStream,
TokioRuntime, TransportConfig,
};
use solana_sdk::pubkey::Pubkey;
use solana_sdk::{
pubkey::Pubkey,
};
use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams;
use tokio::{
sync::{broadcast::Receiver, broadcast::Sender, RwLock},
@ -211,6 +213,7 @@ impl ActiveConnection {
exit_signal: Arc<AtomicBool>,
last_stable_id: Arc<AtomicU64>,
) {
let mut queue = VecDeque::new();
for tx in txs {
queue.push_back(tx);

View File

@ -1,4 +1,4 @@
use anyhow::{bail, Result};
use anyhow::Result;
use dashmap::DashMap;
use futures::StreamExt;
use log::{error, info, warn};
@ -25,10 +25,11 @@ use std::{
};
use tokio::{
sync::RwLock,
task::JoinHandle,
time::{Duration, Instant},
};
use crate::{workers::TxProps, AnyhowJoinHandle};
use crate::workers::TxProps;
use super::tpu_connection_manager::TpuConnectionManager;
@ -351,7 +352,7 @@ impl TpuService {
}
}
pub async fn start(&self) -> anyhow::Result<()> {
pub async fn start(&self) -> anyhow::Result<Vec<JoinHandle<anyhow::Result<()>>>> {
self.update_cluster_nodes().await?;
self.update_leader_schedule().await?;
self.update_quic_connections().await;
@ -381,7 +382,7 @@ impl TpuService {
let this = self.clone();
let (slot_sender, slot_reciever) = tokio::sync::mpsc::unbounded_channel::<Slot>();
let slot_sub_task: AnyhowJoinHandle = tokio::spawn(async move {
let slot_sub_task = tokio::spawn(async move {
this.update_current_slot(slot_sender).await;
Ok(())
});
@ -436,17 +437,11 @@ impl TpuService {
}
});
tokio::select! {
res = jh_update_leaders => {
bail!("Leader update service exited unexpectedly {res:?}");
},
res = slot_sub_task => {
bail!("Leader update service exited unexpectedly {res:?}");
},
res = estimated_slot_calculation => {
bail!("Estimated slot calculation service exited unexpectedly {res:?}");
},
}
Ok(vec![
jh_update_leaders,
slot_sub_task,
estimated_slot_calculation,
])
}
pub fn get_estimated_slot(&self) -> u64 {

View File

@ -2,7 +2,7 @@ use std::{sync::Arc, time::Duration};
use bench::helpers::BenchHelper;
use dashmap::DashMap;
use futures::future::try_join_all;
use lite_rpc::{
block_store::BlockStore,
encoding::BinaryEncoding,
@ -40,13 +40,14 @@ async fn send_and_confirm_txs() {
let (tx_send, tx_recv) = mpsc::channel(1024);
let block_listner_service = block_listener.clone().listen(
CommitmentConfig::confirmed(),
None,
tpu_service.get_estimated_slot_holder(),
);
let tx_sender_service = tx_sender.clone().execute(tx_recv, None);
let services = try_join_all(vec![
block_listener.clone().listen(
CommitmentConfig::confirmed(),
None,
tpu_service.get_estimated_slot_holder(),
),
tx_sender.clone().execute(tx_recv, None),
]);
let confirm = tokio::spawn(async move {
let funded_payer = BenchHelper::get_payer().await.unwrap();
@ -76,11 +77,8 @@ async fn send_and_confirm_txs() {
});
tokio::select! {
res = block_listner_service => {
panic!("BlockListener service stopped unexpectedly {res:?}")
},
res = tx_sender_service => {
panic!("TxSender service stopped unexpectedly {res:?}")
_ = services => {
panic!("Services stopped unexpectedly")
},
_ = confirm => {}
}