Making the blocks parsing optional, so that triton can launch side car without blocks parsing (#9)
This commit is contained in:
parent
bae8b36465
commit
80283c5d48
|
@ -3,8 +3,8 @@ use clap::Parser;
|
|||
#[derive(Parser, Debug, Clone)]
|
||||
#[command(author, version, about, long_about = None)]
|
||||
pub struct Args {
|
||||
#[arg(short, long, default_value_t = String::from("http://127.0.0.1:10000"))]
|
||||
pub grpc_address_to_fetch_blocks: String,
|
||||
#[arg(short, long)]
|
||||
pub grpc_address_to_fetch_blocks: Option<String>,
|
||||
|
||||
#[arg(short = 'x', long)]
|
||||
pub grpc_x_token: Option<String>,
|
||||
|
|
197
src/main.rs
197
src/main.rs
|
@ -39,75 +39,12 @@ lazy_static::lazy_static! {
|
|||
register_int_gauge!(opts!("bankingstage_blocks_in_rpc_queue", "Banking stage blocks in rpc queue")).unwrap();
|
||||
}
|
||||
|
||||
// fn spawn_rpc_block_processor() -> tokio::task::JoinHandle<()> {
|
||||
// let map_of_infos = map_of_infos.clone();
|
||||
// let postgres = postgres.clone();
|
||||
// let slot_by_errors = slot_by_errors.clone();
|
||||
// tokio::spawn(async move {
|
||||
// let mut rpc_blocks_reciever = rpc_blocks_reciever;
|
||||
// let rpc_client = RpcClient::new(rpc_url);
|
||||
// while let Some((wait_until, slot)) = rpc_blocks_reciever.recv().await {
|
||||
// tokio::time::sleep_until(wait_until).await;
|
||||
// let block = if let Ok(block) = rpc_client
|
||||
// .get_block_with_config(
|
||||
// slot,
|
||||
// solana_rpc_client_api::config::RpcBlockConfig {
|
||||
// encoding: Some(
|
||||
// solana_transaction_status::UiTransactionEncoding::Base64,
|
||||
// ),
|
||||
// transaction_details: Some(
|
||||
// solana_transaction_status::TransactionDetails::Full,
|
||||
// ),
|
||||
// rewards: Some(true),
|
||||
// commitment: Some(CommitmentConfig::confirmed()),
|
||||
// max_supported_transaction_version: Some(0),
|
||||
// },
|
||||
// )
|
||||
// .await
|
||||
// {
|
||||
// block
|
||||
// } else {
|
||||
// continue;
|
||||
// };
|
||||
|
||||
// let Some(transactions) = &block.transactions else {
|
||||
// continue;
|
||||
// };
|
||||
|
||||
// for transaction in transactions {
|
||||
// let Some(transaction) = &transaction.transaction.decode() else {
|
||||
// continue;
|
||||
// };
|
||||
// let signature = transaction.signatures[0].to_string();
|
||||
// if let Some(mut info) = map_of_infos.get_mut(&signature) {
|
||||
// info.add_rpc_transaction(slot, transaction);
|
||||
// }
|
||||
// }
|
||||
|
||||
// let banking_stage_error_count = slot_by_errors
|
||||
// .get(&slot)
|
||||
// .map(|x| *x.value() as i64)
|
||||
// .unwrap_or_default();
|
||||
// let block_info =
|
||||
// BlockInfo::new_from_rpc_block(slot, &block, banking_stage_error_count);
|
||||
// if let Some(block_info) = block_info {
|
||||
// BANKING_STAGE_ERROR_COUNT.add(banking_stage_error_count);
|
||||
// TXERROR_COUNT.add(
|
||||
// block_info.processed_transactions - block_info.successful_transactions,
|
||||
// );
|
||||
// if let Err(e) = postgres.save_block_info(block_info).await {
|
||||
// error!("Error saving block {}", e);
|
||||
// }
|
||||
// slot_by_errors.remove(&slot);
|
||||
// }
|
||||
// }
|
||||
// })
|
||||
// }
|
||||
|
||||
pub async fn start_tracking_banking_stage_errors(
|
||||
grpc_address: String,
|
||||
map_of_infos: Arc<DashMap<String, TransactionInfo>>,
|
||||
slot_by_errors: Arc<DashMap<u64, u64>>,
|
||||
slot: Arc<AtomicU64>,
|
||||
subscribe_to_slots: bool,
|
||||
) {
|
||||
loop {
|
||||
let token: Option<String> = None;
|
||||
|
@ -118,9 +55,23 @@ pub async fn start_tracking_banking_stage_errors(
|
|||
)
|
||||
.unwrap();
|
||||
|
||||
let slot_subscription: HashMap<
|
||||
String,
|
||||
yellowstone_grpc_proto::geyser::SubscribeRequestFilterSlots,
|
||||
> = if subscribe_to_slots {
|
||||
let mut slot_sub = HashMap::new();
|
||||
slot_sub.insert(
|
||||
"slot_sub".to_string(),
|
||||
yellowstone_grpc_proto::geyser::SubscribeRequestFilterSlots {},
|
||||
);
|
||||
slot_sub
|
||||
} else {
|
||||
HashMap::new()
|
||||
};
|
||||
|
||||
let mut geyser_stream = client
|
||||
.subscribe_once(
|
||||
HashMap::new(),
|
||||
slot_subscription,
|
||||
Default::default(),
|
||||
HashMap::new(),
|
||||
Default::default(),
|
||||
|
@ -142,7 +93,8 @@ pub async fn start_tracking_banking_stage_errors(
|
|||
continue;
|
||||
};
|
||||
|
||||
if let yellowstone_grpc_proto::prelude::subscribe_update::UpdateOneof::BankingTransactionErrors(transaction) = update {
|
||||
match update{
|
||||
yellowstone_grpc_proto::prelude::subscribe_update::UpdateOneof::BankingTransactionErrors(transaction) => {
|
||||
if transaction.error.is_none() && transaction.accounts.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
@ -166,47 +118,36 @@ pub async fn start_tracking_banking_stage_errors(
|
|||
map_of_infos.insert(sig, x);
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof::Slot(s) => {
|
||||
let load_slot = slot.load(std::sync::atomic::Ordering::Relaxed);
|
||||
if load_slot < s.slot {
|
||||
// update slot to process updates
|
||||
slot.store(s.slot, std::sync::atomic::Ordering::Relaxed);
|
||||
}
|
||||
},
|
||||
_=>{}
|
||||
}
|
||||
}
|
||||
error!("geyser banking stage connection failed {}", grpc_address);
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main()]
|
||||
async fn main() {
|
||||
tracing_subscriber::fmt::init();
|
||||
|
||||
let args = Args::parse();
|
||||
|
||||
let _prometheus_jh = PrometheusSync::sync(args.prometheus_addr.clone());
|
||||
|
||||
let grpc_block_addr = args.grpc_address_to_fetch_blocks;
|
||||
async fn start_tracking_blocks(
|
||||
grpc_block_addr: String,
|
||||
grpc_x_token: Option<String>,
|
||||
postgres: postgres::Postgres,
|
||||
slot: Arc<AtomicU64>,
|
||||
slot_by_errors: Arc<DashMap<u64, u64>>,
|
||||
map_of_infos: Arc<DashMap<String, TransactionInfo>>,
|
||||
) {
|
||||
let mut client = yellowstone_grpc_client_original::GeyserGrpcClient::connect(
|
||||
grpc_block_addr,
|
||||
args.grpc_x_token,
|
||||
grpc_x_token,
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
let map_of_infos = Arc::new(DashMap::<String, TransactionInfo>::new());
|
||||
let slot_by_errors = Arc::new(DashMap::<u64, u64>::new());
|
||||
|
||||
let postgres = postgres::Postgres::new().await;
|
||||
let slot = Arc::new(AtomicU64::new(0));
|
||||
|
||||
postgres.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone());
|
||||
let _jhs = args
|
||||
.banking_grpc_addresses
|
||||
.iter()
|
||||
.map(|address| {
|
||||
let address = address.clone();
|
||||
let map_of_infos = map_of_infos.clone();
|
||||
let slot_by_errors = slot_by_errors.clone();
|
||||
tokio::spawn(async move {
|
||||
start_tracking_banking_stage_errors(address, map_of_infos, slot_by_errors).await;
|
||||
})
|
||||
})
|
||||
.collect_vec();
|
||||
|
||||
loop {
|
||||
let mut blocks_subs = HashMap::new();
|
||||
|
@ -244,12 +185,12 @@ async fn main() {
|
|||
.unwrap();
|
||||
while let Some(message) = geyser_stream.next().await {
|
||||
let Ok(message) = message else {
|
||||
continue;
|
||||
};
|
||||
continue;
|
||||
};
|
||||
|
||||
let Some(update) = message.update_oneof else {
|
||||
continue;
|
||||
};
|
||||
continue;
|
||||
};
|
||||
|
||||
match update {
|
||||
yellowstone_grpc_proto_original::prelude::subscribe_update::UpdateOneof::Block(
|
||||
|
@ -267,8 +208,8 @@ async fn main() {
|
|||
tokio::time::sleep(Duration::from_secs(30)).await;
|
||||
for transaction in &block.transactions {
|
||||
let Some(tx) = &transaction.transaction else {
|
||||
continue;
|
||||
};
|
||||
continue;
|
||||
};
|
||||
let signature = Signature::try_from(tx.signatures[0].clone()).unwrap();
|
||||
if let Some(mut info) = map_of_infos.get_mut(&signature.to_string()) {
|
||||
info.add_transaction(transaction, block.slot);
|
||||
|
@ -298,3 +239,53 @@ async fn main() {
|
|||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main()]
|
||||
async fn main() {
|
||||
tracing_subscriber::fmt::init();
|
||||
|
||||
let args = Args::parse();
|
||||
|
||||
let _prometheus_jh = PrometheusSync::sync(args.prometheus_addr.clone());
|
||||
|
||||
let grpc_block_addr = args.grpc_address_to_fetch_blocks;
|
||||
let map_of_infos = Arc::new(DashMap::<String, TransactionInfo>::new());
|
||||
let slot_by_errors = Arc::new(DashMap::<u64, u64>::new());
|
||||
|
||||
let postgres = postgres::Postgres::new().await;
|
||||
let slot = Arc::new(AtomicU64::new(0));
|
||||
let no_block_subscription = grpc_block_addr.is_none();
|
||||
postgres.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone());
|
||||
let jhs = args
|
||||
.banking_grpc_addresses
|
||||
.iter()
|
||||
.map(|address| {
|
||||
let address = address.clone();
|
||||
let map_of_infos = map_of_infos.clone();
|
||||
let slot_by_errors = slot_by_errors.clone();
|
||||
let slot = slot.clone();
|
||||
tokio::spawn(async move {
|
||||
start_tracking_banking_stage_errors(
|
||||
address,
|
||||
map_of_infos,
|
||||
slot_by_errors,
|
||||
slot,
|
||||
no_block_subscription,
|
||||
)
|
||||
.await;
|
||||
})
|
||||
})
|
||||
.collect_vec();
|
||||
if let Some(gprc_block_addr) = grpc_block_addr {
|
||||
start_tracking_blocks(
|
||||
gprc_block_addr,
|
||||
args.grpc_x_token,
|
||||
postgres,
|
||||
slot,
|
||||
slot_by_errors,
|
||||
map_of_infos,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
futures::future::join_all(jhs).await;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue