Making the blocks parsing optional, so that triton can launch side car without blocks parsing

This commit is contained in:
godmodegalactus 2023-12-05 14:00:43 +01:00
parent bae8b36465
commit 8c4cb2fafe
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
2 changed files with 96 additions and 105 deletions

View File

@ -3,8 +3,8 @@ use clap::Parser;
#[derive(Parser, Debug, Clone)]
#[command(author, version, about, long_about = None)]
pub struct Args {
#[arg(short, long, default_value_t = String::from("http://127.0.0.1:10000"))]
pub grpc_address_to_fetch_blocks: String,
#[arg(short, long)]
pub grpc_address_to_fetch_blocks: Option<String>,
#[arg(short = 'x', long)]
pub grpc_x_token: Option<String>,

View File

@ -39,75 +39,12 @@ lazy_static::lazy_static! {
register_int_gauge!(opts!("bankingstage_blocks_in_rpc_queue", "Banking stage blocks in rpc queue")).unwrap();
}
// fn spawn_rpc_block_processor() -> tokio::task::JoinHandle<()> {
// let map_of_infos = map_of_infos.clone();
// let postgres = postgres.clone();
// let slot_by_errors = slot_by_errors.clone();
// tokio::spawn(async move {
// let mut rpc_blocks_reciever = rpc_blocks_reciever;
// let rpc_client = RpcClient::new(rpc_url);
// while let Some((wait_until, slot)) = rpc_blocks_reciever.recv().await {
// tokio::time::sleep_until(wait_until).await;
// let block = if let Ok(block) = rpc_client
// .get_block_with_config(
// slot,
// solana_rpc_client_api::config::RpcBlockConfig {
// encoding: Some(
// solana_transaction_status::UiTransactionEncoding::Base64,
// ),
// transaction_details: Some(
// solana_transaction_status::TransactionDetails::Full,
// ),
// rewards: Some(true),
// commitment: Some(CommitmentConfig::confirmed()),
// max_supported_transaction_version: Some(0),
// },
// )
// .await
// {
// block
// } else {
// continue;
// };
// let Some(transactions) = &block.transactions else {
// continue;
// };
// for transaction in transactions {
// let Some(transaction) = &transaction.transaction.decode() else {
// continue;
// };
// let signature = transaction.signatures[0].to_string();
// if let Some(mut info) = map_of_infos.get_mut(&signature) {
// info.add_rpc_transaction(slot, transaction);
// }
// }
// let banking_stage_error_count = slot_by_errors
// .get(&slot)
// .map(|x| *x.value() as i64)
// .unwrap_or_default();
// let block_info =
// BlockInfo::new_from_rpc_block(slot, &block, banking_stage_error_count);
// if let Some(block_info) = block_info {
// BANKING_STAGE_ERROR_COUNT.add(banking_stage_error_count);
// TXERROR_COUNT.add(
// block_info.processed_transactions - block_info.successful_transactions,
// );
// if let Err(e) = postgres.save_block_info(block_info).await {
// error!("Error saving block {}", e);
// }
// slot_by_errors.remove(&slot);
// }
// }
// })
// }
pub async fn start_tracking_banking_stage_errors(
grpc_address: String,
map_of_infos: Arc<DashMap<String, TransactionInfo>>,
slot_by_errors: Arc<DashMap<u64, u64>>,
slot: Arc<AtomicU64>,
subscribe_to_slots: bool,
) {
loop {
let token: Option<String> = None;
@ -118,9 +55,23 @@ pub async fn start_tracking_banking_stage_errors(
)
.unwrap();
let slot_subscription: HashMap<
String,
yellowstone_grpc_proto::geyser::SubscribeRequestFilterSlots,
> = if subscribe_to_slots {
let mut slot_sub = HashMap::new();
slot_sub.insert(
"slot_sub".to_string(),
yellowstone_grpc_proto::geyser::SubscribeRequestFilterSlots {},
);
slot_sub
} else {
HashMap::new()
};
let mut geyser_stream = client
.subscribe_once(
HashMap::new(),
slot_subscription,
Default::default(),
HashMap::new(),
Default::default(),
@ -142,7 +93,8 @@ pub async fn start_tracking_banking_stage_errors(
continue;
};
if let yellowstone_grpc_proto::prelude::subscribe_update::UpdateOneof::BankingTransactionErrors(transaction) = update {
match update{
yellowstone_grpc_proto::prelude::subscribe_update::UpdateOneof::BankingTransactionErrors(transaction) => {
if transaction.error.is_none() && transaction.accounts.is_empty() {
continue;
}
@ -166,47 +118,36 @@ pub async fn start_tracking_banking_stage_errors(
map_of_infos.insert(sig, x);
}
}
}
},
yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof::Slot(s) => {
let load_slot = slot.load(std::sync::atomic::Ordering::Relaxed);
if load_slot < s.slot {
// update slot to process updates
slot.store(s.slot, std::sync::atomic::Ordering::Relaxed);
}
},
_=>{}
}
}
error!("geyser banking stage connection failed {}", grpc_address);
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
#[tokio::main()]
async fn main() {
tracing_subscriber::fmt::init();
let args = Args::parse();
let _prometheus_jh = PrometheusSync::sync(args.prometheus_addr.clone());
let grpc_block_addr = args.grpc_address_to_fetch_blocks;
async fn start_tracking_blocks(
grpc_block_addr: String,
grpc_x_token: Option<String>,
postgres: postgres::Postgres,
slot: Arc<AtomicU64>,
slot_by_errors: Arc<DashMap<u64, u64>>,
map_of_infos: Arc<DashMap<String, TransactionInfo>>,
) {
let mut client = yellowstone_grpc_client_original::GeyserGrpcClient::connect(
grpc_block_addr,
args.grpc_x_token,
grpc_x_token,
None,
)
.unwrap();
let map_of_infos = Arc::new(DashMap::<String, TransactionInfo>::new());
let slot_by_errors = Arc::new(DashMap::<u64, u64>::new());
let postgres = postgres::Postgres::new().await;
let slot = Arc::new(AtomicU64::new(0));
postgres.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone());
let _jhs = args
.banking_grpc_addresses
.iter()
.map(|address| {
let address = address.clone();
let map_of_infos = map_of_infos.clone();
let slot_by_errors = slot_by_errors.clone();
tokio::spawn(async move {
start_tracking_banking_stage_errors(address, map_of_infos, slot_by_errors).await;
})
})
.collect_vec();
loop {
let mut blocks_subs = HashMap::new();
@ -244,12 +185,12 @@ async fn main() {
.unwrap();
while let Some(message) = geyser_stream.next().await {
let Ok(message) = message else {
continue;
};
continue;
};
let Some(update) = message.update_oneof else {
continue;
};
continue;
};
match update {
yellowstone_grpc_proto_original::prelude::subscribe_update::UpdateOneof::Block(
@ -267,8 +208,8 @@ async fn main() {
tokio::time::sleep(Duration::from_secs(30)).await;
for transaction in &block.transactions {
let Some(tx) = &transaction.transaction else {
continue;
};
continue;
};
let signature = Signature::try_from(tx.signatures[0].clone()).unwrap();
if let Some(mut info) = map_of_infos.get_mut(&signature.to_string()) {
info.add_transaction(transaction, block.slot);
@ -298,3 +239,53 @@ async fn main() {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
#[tokio::main()]
async fn main() {
tracing_subscriber::fmt::init();
let args = Args::parse();
let _prometheus_jh = PrometheusSync::sync(args.prometheus_addr.clone());
let grpc_block_addr = args.grpc_address_to_fetch_blocks;
let map_of_infos = Arc::new(DashMap::<String, TransactionInfo>::new());
let slot_by_errors = Arc::new(DashMap::<u64, u64>::new());
let postgres = postgres::Postgres::new().await;
let slot = Arc::new(AtomicU64::new(0));
let no_block_subscription = grpc_block_addr.is_none();
postgres.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone());
let jhs = args
.banking_grpc_addresses
.iter()
.map(|address| {
let address = address.clone();
let map_of_infos = map_of_infos.clone();
let slot_by_errors = slot_by_errors.clone();
let slot = slot.clone();
tokio::spawn(async move {
start_tracking_banking_stage_errors(
address,
map_of_infos,
slot_by_errors,
slot,
no_block_subscription,
)
.await;
})
})
.collect_vec();
if let Some(gprc_block_addr) = grpc_block_addr {
start_tracking_blocks(
gprc_block_addr,
args.grpc_x_token,
postgres,
slot,
slot_by_errors,
map_of_infos,
)
.await;
}
futures::future::join_all(jhs).await;
}