block_processor and bridge

This commit is contained in:
aniketfuryrocks 2023-06-12 05:45:40 +05:30
parent 20dd700342
commit 2bc65f9ec6
No known key found for this signature in database
GPG Key ID: 1B75EA596D89FF06
4 changed files with 132 additions and 87 deletions

View File

@ -3,7 +3,7 @@ use crate::{
encoding::BinaryEncoding,
postgres::Postgres,
rpc::LiteRpcServer,
DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE,
AnyhowJoinHandle, DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE,
};
use solana_lite_rpc_services::{
@ -36,13 +36,12 @@ use solana_rpc_client_api::{
};
use solana_sdk::{
commitment_config::CommitmentConfig, hash::Hash, pubkey::Pubkey, signature::Keypair,
transaction::VersionedTransaction, slot_history::Slot,
slot_history::Slot, transaction::VersionedTransaction,
};
use solana_transaction_status::TransactionStatus;
use tokio::{
net::ToSocketAddrs,
sync::mpsc::{self, Sender, UnboundedSender},
task::JoinHandle,
time::Instant,
};
@ -134,7 +133,7 @@ impl LiteBridge {
clean_interval: Duration,
enable_postgres: bool,
prometheus_addr: T,
) -> anyhow::Result<Vec<JoinHandle<anyhow::Result<()>>>> {
) -> anyhow::Result<()> {
let (postgres, postgres_send) = if enable_postgres {
let (postgres_send, postgres_recv) = mpsc::unbounded_channel();
let postgres = Postgres::new().await?;
@ -145,7 +144,8 @@ impl LiteBridge {
(None, None)
};
let mut tpu_services = self.tpu_service.start().await?;
let tpu_service = self.tpu_service.clone();
let tpu_service = tpu_service.start();
let (tx_send, tx_recv) = mpsc::channel(DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE);
self.tx_send_channel = Some(tx_send);
@ -200,13 +200,13 @@ impl LiteBridge {
.await?
.start(rpc)?;
let ws_server = tokio::spawn(async move {
let ws_server: AnyhowJoinHandle = tokio::spawn(async move {
info!("Websocket Server started at {ws_addr:?}");
ws_server_handle.stopped().await;
bail!("Websocket server stopped");
});
let http_server = tokio::spawn(async move {
let http_server: AnyhowJoinHandle = tokio::spawn(async move {
info!("HTTP Server started at {http_addr:?}");
http_server_handle.stopped().await;
bail!("HTTP server stopped");
@ -215,26 +215,53 @@ impl LiteBridge {
(ws_server, http_server)
};
let mut services = vec![
ws_server,
http_server,
tx_sender,
finalized_block_listener,
confirmed_block_listener,
processed_block_listener,
metrics_capture,
prometheus_sync,
cleaner,
replay_service,
];
let postgres = tokio::spawn(async {
let Some(postgres) = postgres else {
std::future::pending::<()>().await;
unreachable!();
};
services.append(&mut tpu_services);
postgres.await
});
if let Some(postgres) = postgres {
services.push(postgres);
tokio::select! {
res = tpu_service => {
bail!("Tpu Services exited unexpectedly {res:?}");
},
res = ws_server => {
bail!("WebSocket server exited unexpectedly {res:?}");
},
res = http_server => {
bail!("HTTP server exited unexpectedly {res:?}");
},
res = tx_sender => {
bail!("Tx Sender exited unexpectedly {res:?}");
},
res = finalized_block_listener => {
bail!("Finalized Block Listener exited unexpectedly {res:?}");
},
res = confirmed_block_listener => {
bail!("Confirmed Block Listener exited unexpectedly {res:?}");
},
res = processed_block_listener => {
bail!("Processed Block Listener exited unexpectedly {res:?}");
},
res = metrics_capture => {
bail!("Metrics Capture exited unexpectedly {res:?}");
},
res = prometheus_sync => {
bail!("Prometheus Service exited unexpectedly {res:?}");
},
res = cleaner => {
bail!("Cleaner Service exited unexpectedly {res:?}");
},
res = replay_service => {
bail!("Tx replay service exited unexpectedly {res:?}");
},
res = postgres => {
bail!("Postgres service exited unexpectedly {res:?}");
},
}
Ok(services)
}
}
@ -458,19 +485,13 @@ impl LiteRpcServer for LiteBridge {
Ok(airdrop_sig)
}
async fn get_slot(
&self,
config: Option<RpcContextConfig>,
) -> crate::rpc::Result<Slot> {
async fn get_slot(&self, config: Option<RpcContextConfig>) -> crate::rpc::Result<Slot> {
let commitment_config = config
.map(|config| config.commitment.unwrap_or_default())
.unwrap_or_default();
let (_,
BlockInformation {
slot, ..
}
) = self.block_store.get_latest_block(commitment_config).await;
let (_, BlockInformation { slot, .. }) =
self.block_store.get_latest_block(commitment_config).await;
Ok(slot)
}

View File

@ -9,6 +9,8 @@ pub mod errors;
pub mod postgres;
pub mod rpc;
pub type AnyhowJoinHandle = tokio::task::JoinHandle<anyhow::Result<()>>;
#[from_env]
pub const DEFAULT_RPC_ADDR: &str = "http://0.0.0.0:8899";
#[from_env]

View File

@ -63,7 +63,8 @@ pub async fn main() -> anyhow::Result<()> {
};
let retry_after = Duration::from_secs(transaction_retry_after_secs);
let light_bridge = LiteBridge::new(
let services = LiteBridge::new(
rpc_addr,
ws_addr,
fanout_size,
@ -71,29 +72,25 @@ pub async fn main() -> anyhow::Result<()> {
retry_after,
maximum_retries_per_tx,
)
.await?;
let services = light_bridge
.start_services(
lite_rpc_http_addr,
lite_rpc_ws_addr,
clean_interval_ms,
enable_postgres,
prometheus_addr,
)
.await?;
let services = futures::future::try_join_all(services);
.await?
.start_services(
lite_rpc_http_addr,
lite_rpc_ws_addr,
clean_interval_ms,
enable_postgres,
prometheus_addr,
);
let ctrl_c_signal = tokio::signal::ctrl_c();
tokio::select! {
_ = services => {
bail!("Services quit unexpectedly");
res = services => {
bail!("Services quit unexpectedly {res:?}");
}
_ = ctrl_c_signal => {
info!("Received ctrl+c signal");
Ok(())
}
}
Ok(())
}

View File

@ -1,3 +1,4 @@
use std::{
sync::{
atomic::{AtomicU64, Ordering},
@ -6,9 +7,10 @@ use std::{
time::Duration,
};
use anyhow::bail;
use anyhow::{bail, Context};
use chrono::{TimeZone, Utc};
use jsonrpsee::SubscriptionSink;
use jsonrpsee::{SubscriptionSink};
use log::{error, info, trace};
use prometheus::{
core::GenericGauge, histogram_opts, opts, register_histogram, register_int_counter,
@ -17,6 +19,7 @@ use prometheus::{
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{
commitment_config::{CommitmentConfig, CommitmentLevel},
slot_history::Slot,
@ -250,32 +253,26 @@ impl BlockListener {
Ok(())
}
pub fn listen(
pub async fn listen(
self,
commitment_config: CommitmentConfig,
notifier: Option<NotificationSender>,
estimated_slot: Arc<AtomicU64>,
) -> JoinHandle<anyhow::Result<()>> {
) -> anyhow::Result<()> {
let (slot_retry_queue_sx, mut slot_retry_queue_rx) = tokio::sync::mpsc::unbounded_channel();
let (block_schedule_queue_sx, block_schedule_queue_rx) = async_channel::unbounded::<Slot>();
// task to fetch blocks
for _i in 0..8 {
let this = self.clone();
//
let this = self.clone();
let slot_indexer_tasks = (0..8).map(move |_| {
let this = this.clone();
let notifier = notifier.clone();
let slot_retry_queue_sx = slot_retry_queue_sx.clone();
let block_schedule_queue_rx = block_schedule_queue_rx.clone();
tokio::spawn(async move {
loop {
let slot = match block_schedule_queue_rx.recv().await {
Ok(v) => v,
Err(e) => {
error!("Recv error on block channel {}", e);
break;
}
};
let task: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
while let Ok(slot) = block_schedule_queue_rx.recv().await {
if commitment_config.is_finalized() {
BLOCKS_IN_FINALIZED_QUEUE.dec();
} else {
@ -291,19 +288,28 @@ impl BlockListener {
let retry_at = tokio::time::Instant::now()
.checked_add(Duration::from_millis(10))
.unwrap();
let _ = slot_retry_queue_sx.send((slot, retry_at));
slot_retry_queue_sx
.send((slot, retry_at))
.context("Error sending slot to retry queue from slot indexer task")?;
BLOCKS_IN_RETRY_QUEUE.inc();
};
}
bail!("Block Slot channel closed")
});
}
task
});
// a task that will queue back the slots to be retried after a certain delay
let recent_slot = Arc::new(AtomicU64::new(0));
{
let slot_retry_task: JoinHandle<anyhow::Result<()>> = {
let block_schedule_queue_sx = block_schedule_queue_sx.clone();
let recent_slot = recent_slot.clone();
tokio::spawn(async move {
while let Some((slot, instant)) = slot_retry_queue_rx.recv().await {
BLOCKS_IN_RETRY_QUEUE.dec();
@ -315,22 +321,26 @@ impl BlockListener {
continue;
}
let now = tokio::time::Instant::now();
if now < instant {
if tokio::time::Instant::now() < instant {
tokio::time::sleep_until(instant).await;
}
if block_schedule_queue_sx.send(slot).await.is_ok() {
if commitment_config.is_finalized() {
BLOCKS_IN_FINALIZED_QUEUE.inc();
} else {
BLOCKS_IN_CONFIRMED_QUEUE.inc();
}
block_schedule_queue_sx.send(slot).await.context(
"Slot retry que failed to send block to block_schedule_queue_sx",
)?;
if commitment_config.is_finalized() {
BLOCKS_IN_FINALIZED_QUEUE.inc();
} else {
BLOCKS_IN_CONFIRMED_QUEUE.inc();
}
}
});
}
tokio::spawn(async move {
bail!("Slot retry task exit")
})
};
let get_slot_task: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
info!("{commitment_config:?} block listner started");
let last_latest_slot = self
@ -355,9 +365,9 @@ impl BlockListener {
// context for lock
{
for slot in new_block_slots {
if let Err(e) = block_schedule_queue_sx.send(slot).await {
error!("error sending of block schedule queue {}", e);
} else if commitment_config.is_finalized() {
block_schedule_queue_sx.send(slot).await?;
if commitment_config.is_finalized() {
BLOCKS_IN_FINALIZED_QUEUE.inc();
} else {
BLOCKS_IN_CONFIRMED_QUEUE.inc();
@ -368,7 +378,19 @@ impl BlockListener {
last_latest_slot = new_slot;
recent_slot.store(last_latest_slot, std::sync::atomic::Ordering::Relaxed);
}
})
});
tokio::select! {
res = get_slot_task => {
anyhow::bail!("Get slot task exited unexpectedly {res:?}")
}
res = slot_retry_task => {
anyhow::bail!("Slot retry task exited unexpectedly {res:?}")
},
res = futures::future::try_join_all(slot_indexer_tasks) => {
anyhow::bail!("Slot indexer exited unexpectedly {res:?}")
},
}
}
// continuosly poll processed blocks and feed into blockstore
@ -379,10 +401,13 @@ impl BlockListener {
info!("processed block listner started");
loop {
// ignore errors from processed block polling
let _ = block_processor
if let Err(err) = block_processor
.poll_latest_block(CommitmentConfig::processed())
.await;
.await {
error!("Error fetching latest processed block {err:?}");
}
// sleep
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
}
})