moving lite-rpc libraries to version v0.2.2

This commit is contained in:
Godmode Galactus 2023-06-23 15:30:02 +02:00
parent 33850b1e5f
commit 7853684abf
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
3 changed files with 85 additions and 41 deletions

View File

@ -161,7 +161,8 @@ pub fn confirmation_by_lite_rpc_notification_stream(
tx_block_data: tokio::sync::broadcast::Sender<BlockData>, tx_block_data: tokio::sync::broadcast::Sender<BlockData>,
exit_signal: Arc<AtomicBool>, exit_signal: Arc<AtomicBool>,
) -> Vec<JoinHandle<()>> { ) -> Vec<JoinHandle<()>> {
let transaction_map: Arc<DashMap<String, (TransactionSendRecord, Instant)>> = Arc::new(DashMap::new()); let transaction_map: Arc<DashMap<String, (TransactionSendRecord, Instant)>> =
Arc::new(DashMap::new());
let confirming_task = { let confirming_task = {
let transaction_map = transaction_map.clone(); let transaction_map = transaction_map.clone();
@ -170,7 +171,7 @@ pub fn confirmation_by_lite_rpc_notification_stream(
tokio::spawn(async move { tokio::spawn(async move {
let mut tx_record_rx = tx_record_rx; let mut tx_record_rx = tx_record_rx;
let mut notification_stream = notification_stream; let mut notification_stream = notification_stream;
while !transaction_map.is_empty() || !exit_signal.load(Ordering::Relaxed) { while !transaction_map.is_empty() || !exit_signal.load(Ordering::Relaxed) {
tokio::select! { tokio::select! {
transaction_record = tx_record_rx.recv() => { transaction_record = tx_record_rx.recv() => {
@ -182,7 +183,7 @@ pub fn confirmation_by_lite_rpc_notification_stream(
}, },
notification = notification_stream.recv() => { notification = notification_stream.recv() => {
if let Some(notification) = notification { if let Some(notification) = notification {
match notification { match notification {
NotificationMsg::BlockNotificationMsg(block_notification) => { NotificationMsg::BlockNotificationMsg(block_notification) => {
let _ = tx_block_data.send(BlockData { let _ = tx_block_data.send(BlockData {

View File

@ -2,6 +2,7 @@ use {
log::info, log::info,
mango_simulation::{ mango_simulation::{
cli, cli,
confirmation_strategies::confirmation_by_lite_rpc_notification_stream,
crank::{self, KeeperConfig}, crank::{self, KeeperConfig},
helpers::{ helpers::{
get_latest_blockhash, get_mango_market_perps_cache, start_blockhash_polling_service, get_latest_blockhash, get_mango_market_perps_cache, start_blockhash_polling_service,
@ -14,10 +15,22 @@ use {
states::PerpMarketCache, states::PerpMarketCache,
stats::MangoSimulationStats, stats::MangoSimulationStats,
tpu_manager::TpuManager, tpu_manager::TpuManager,
confirmation_strategies::confirmation_by_lite_rpc_notification_stream,
}, },
serde_json, serde_json,
solana_client::nonblocking::rpc_client::RpcClient as NbRpcClient, solana_client::nonblocking::rpc_client::RpcClient as NbRpcClient,
solana_lite_rpc_core::{
block_store::BlockStore,
notifications::NotificationMsg,
quic_connection_utils::QuicConnectionParameters,
tx_store::{empty_tx_store, TxStore},
},
solana_lite_rpc_services::{
block_listenser::BlockListener,
tpu_utils::tpu_service::{TpuService, TpuServiceConfig},
transaction_replayer::TransactionReplayer,
transaction_service::{TransactionService, TransactionServiceBuilder},
tx_sender::TxSender,
},
solana_program::pubkey::Pubkey, solana_program::pubkey::Pubkey,
solana_sdk::{commitment_config::CommitmentConfig, signer::keypair::Keypair}, solana_sdk::{commitment_config::CommitmentConfig, signer::keypair::Keypair},
std::{ std::{
@ -27,34 +40,38 @@ use {
sync::Arc, sync::Arc,
time::Duration, time::Duration,
}, },
tokio::sync::mpsc::{unbounded_channel, UnboundedSender},
tokio::{sync::RwLock, task::JoinHandle}, tokio::{sync::RwLock, task::JoinHandle},
solana_lite_rpc_core::{block_store::BlockStore, tx_store::{TxStore, empty_tx_store}, notifications::NotificationMsg, quic_connection_utils::QuicConnectionParameters},
solana_lite_rpc_services::{transaction_service::{TransactionService, TransactionServiceBuilder}, tx_sender::TxSender, tpu_utils::tpu_service::{TpuService, TpuServiceConfig}, block_listenser::{BlockListener}, transaction_replayer::TransactionReplayer},
tokio::sync::mpsc::{UnboundedSender, unbounded_channel},
}; };
const METRICS_NAME: &str = "mango-bencher"; const METRICS_NAME: &str = "mango-bencher";
async fn configure_transaction_service(rpc_client: Arc<NbRpcClient>, identity: Keypair, block_store: BlockStore, tx_store: TxStore, notifier: UnboundedSender<NotificationMsg> ) -> (TransactionService, JoinHandle<anyhow::Result<()>>) { async fn configure_transaction_service(
rpc_client: Arc<NbRpcClient>,
identity: Keypair,
block_store: BlockStore,
tx_store: TxStore,
notifier: UnboundedSender<NotificationMsg>,
) -> (TransactionService, JoinHandle<anyhow::Result<()>>) {
let slot = rpc_client.get_slot().await.expect("GetSlot should work"); let slot = rpc_client.get_slot().await.expect("GetSlot should work");
let tpu_config = TpuServiceConfig { let tpu_config = TpuServiceConfig {
fanout_slots: 12, fanout_slots: 12,
number_of_leaders_to_cache: 1024, number_of_leaders_to_cache: 1024,
clusterinfo_refresh_time: Duration::from_secs(60*60), clusterinfo_refresh_time: Duration::from_secs(60 * 60),
leader_schedule_update_frequency: Duration::from_secs(10), leader_schedule_update_frequency: Duration::from_secs(10),
maximum_transaction_in_queue: 200_000, maximum_transaction_in_queue: 200_000,
maximum_number_of_errors: 10, maximum_number_of_errors: 10,
quic_connection_params : QuicConnectionParameters { quic_connection_params: QuicConnectionParameters {
connection_timeout: Duration::from_secs(1), connection_timeout: Duration::from_secs(1),
connection_retry_count: 10, connection_retry_count: 10,
finalize_timeout: Duration::from_millis(200), finalize_timeout: Duration::from_millis(200),
max_number_of_connections: 10, max_number_of_connections: 10,
unistream_timeout: Duration::from_millis(500), unistream_timeout: Duration::from_millis(500),
write_timeout: Duration::from_secs(1), write_timeout: Duration::from_secs(1),
number_of_transactions_per_unistream: 10, number_of_transactions_per_unistream: 10,
} },
}; };
let tpu_service = TpuService::new( let tpu_service = TpuService::new(
tpu_config, tpu_config,
Arc::new(identity), Arc::new(identity),
@ -62,12 +79,24 @@ async fn configure_transaction_service(rpc_client: Arc<NbRpcClient>, identity: K
rpc_client.clone(), rpc_client.clone(),
tx_store.clone(), tx_store.clone(),
) )
.await.expect("Should be able to create TPU"); .await
.expect("Should be able to create TPU");
let tx_sender = TxSender::new(tx_store.clone(), tpu_service.clone()); let tx_sender = TxSender::new(tx_store.clone(), tpu_service.clone());
let block_listenser = BlockListener::new(rpc_client.clone(), tx_store.clone(), block_store.clone()); let block_listenser =
let replayer = TransactionReplayer::new(tpu_service.clone(), tx_store.clone(), Duration::from_secs(2)); BlockListener::new(rpc_client.clone(), tx_store.clone(), block_store.clone());
let builder = TransactionServiceBuilder::new(tx_sender, replayer, block_listenser, tpu_service, 1_000_000); let replayer = TransactionReplayer::new(
tpu_service.clone(),
tx_store.clone(),
Duration::from_secs(2),
);
let builder = TransactionServiceBuilder::new(
tx_sender,
replayer,
block_listenser,
tpu_service,
1_000_000,
);
builder.start(Some(notifier), block_store, 10, Duration::from_secs(90)) builder.start(Some(notifier), block_store, 10, Duration::from_secs(90))
} }
@ -128,9 +157,18 @@ pub async fn main() -> anyhow::Result<()> {
)); ));
let tx_store = empty_tx_store(); let tx_store = empty_tx_store();
let block_store = BlockStore::new(&nb_rpc_client).await.expect("Blockstore should be created"); let block_store = BlockStore::new(&nb_rpc_client)
.await
.expect("Blockstore should be created");
let (notif_sx, notif_rx) = unbounded_channel(); let (notif_sx, notif_rx) = unbounded_channel();
let (transaction_service, tx_service_jh) = configure_transaction_service(nb_rpc_client.clone(), Keypair::from_bytes(identity.to_bytes().as_slice()).unwrap(), block_store, tx_store, notif_sx).await; let (transaction_service, tx_service_jh) = configure_transaction_service(
nb_rpc_client.clone(),
Keypair::from_bytes(identity.to_bytes().as_slice()).unwrap(),
block_store,
tx_store,
notif_sx,
)
.await;
let nb_users = account_keys_parsed.len(); let nb_users = account_keys_parsed.len();
@ -173,20 +211,25 @@ pub async fn main() -> anyhow::Result<()> {
duration duration
); );
let mango_program_pk = Pubkey::from_str(mango_group_config.mango_program_id.as_str()).expect("Mango program should be able to convert into pubkey"); let mango_program_pk = Pubkey::from_str(mango_group_config.mango_program_id.as_str())
.expect("Mango program should be able to convert into pubkey");
let perp_market_caches: Vec<PerpMarketCache> = let perp_market_caches: Vec<PerpMarketCache> =
get_mango_market_perps_cache(nb_rpc_client.clone(), mango_group_config, &mango_program_pk) get_mango_market_perps_cache(nb_rpc_client.clone(), mango_group_config, &mango_program_pk)
.await; .await;
let quote_root_bank = let quote_root_bank =
Pubkey::from_str(mango_group_config.tokens.last().unwrap().root_key.as_str()).expect("Quote root bank should be able to convert into pubkey");(); Pubkey::from_str(mango_group_config.tokens.last().unwrap().root_key.as_str())
.expect("Quote root bank should be able to convert into pubkey");
();
let quote_node_banks = mango_group_config let quote_node_banks = mango_group_config
.tokens .tokens
.last() .last()
.unwrap() .unwrap()
.node_keys .node_keys
.iter() .iter()
.map(|x| Pubkey::from_str(x.as_str()).expect("Token mint should be able to convert into pubkey")) .map(|x| {
Pubkey::from_str(x.as_str()).expect("Token mint should be able to convert into pubkey")
})
.collect(); .collect();
clean_market_makers( clean_market_makers(
@ -272,7 +315,7 @@ pub async fn main() -> anyhow::Result<()> {
notif_rx, notif_rx,
tx_status_sx, tx_status_sx,
block_status_sx, block_status_sx,
exit_signal.clone() exit_signal.clone(),
); );
tasks.append(&mut confirmation_threads); tasks.append(&mut confirmation_threads);
@ -315,11 +358,11 @@ pub async fn main() -> anyhow::Result<()> {
info!("Transaction service joined"); info!("Transaction service joined");
}); });
tokio::select!{ tokio::select! {
_ = futures::future::join_all(tasks) => {}, _ = futures::future::join_all(tasks) => {},
_ = transaction_service => {}, _ = transaction_service => {},
}; };
mango_sim_stats.report(true, METRICS_NAME).await; mango_sim_stats.report(true, METRICS_NAME).await;
Ok(()) Ok(())
} }

View File

@ -1,6 +1,6 @@
use log::warn; use log::warn;
use solana_client::connection_cache::ConnectionCache; use solana_client::connection_cache::ConnectionCache;
use solana_lite_rpc_services::{transaction_service::{TransactionService}}; use solana_lite_rpc_services::transaction_service::TransactionService;
use solana_sdk::transaction::Transaction; use solana_sdk::transaction::Transaction;
use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::UnboundedSender;
@ -22,7 +22,6 @@ impl TpuManager {
stats: MangoSimulationStats, stats: MangoSimulationStats,
tx_send_record: UnboundedSender<TransactionSendRecord>, tx_send_record: UnboundedSender<TransactionSendRecord>,
) -> anyhow::Result<Self> { ) -> anyhow::Result<Self> {
Ok(Self { Ok(Self {
transaction_service, transaction_service,
stats, stats,
@ -49,7 +48,8 @@ impl TpuManager {
let transaction = bincode::serialize(transaction).unwrap(); let transaction = bincode::serialize(transaction).unwrap();
self.transaction_service self.transaction_service
.send_transaction(transaction, None).await .send_transaction(transaction, None)
.await
.is_ok() .is_ok()
} }