restart block subscription on geyser

This commit is contained in:
godmodegalactus 2023-11-30 16:17:47 +01:00
parent 204f032941
commit 024f341c77
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
2 changed files with 86 additions and 89 deletions

View File

@ -140,9 +140,7 @@ impl BlockInfo {
Some(x) => {
x.cu_requested += cu_requested;
x.cu_consumed += cu_consumed;
if prioritization_fees > 0 {
x.vec_pf.push(prioritization_fees);
}
x.vec_pf.push(prioritization_fees);
}
None => {
writelocked_accounts.insert(
@ -163,9 +161,7 @@ impl BlockInfo {
Some(x) => {
x.cu_requested += cu_requested;
x.cu_consumed += cu_consumed;
if prioritization_fees > 0 {
x.vec_pf.push(prioritization_fees);
}
x.vec_pf.push(prioritization_fees);
}
None => {
readlocked_accounts.insert(

View File

@ -186,42 +186,8 @@ async fn main() {
let postgres = postgres::Postgres::new().await;
let slot = Arc::new(AtomicU64::new(0));
let mut blocks_subs = HashMap::new();
blocks_subs.insert(
"sidecar_block_subscription".to_string(),
yellowstone_grpc_proto_original::prelude::SubscribeRequestFilterBlocks {
account_include: Default::default(),
include_transactions: Some(true),
include_accounts: Some(false),
include_entries: Some(false),
},
);
let mut slot_sub = HashMap::new();
slot_sub.insert(
"slot_sub".to_string(),
yellowstone_grpc_proto_original::prelude::SubscribeRequestFilterSlots {
filter_by_commitment: None,
},
);
let mut geyser_stream = client
.subscribe_once(
slot_sub,
Default::default(),
HashMap::new(),
Default::default(),
blocks_subs,
Default::default(),
None,
Default::default(),
None,
)
.await
.unwrap();
postgres.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone());
let jhs = args
let _jhs = args
.banking_grpc_addresses
.iter()
.map(|address| {
@ -234,58 +200,93 @@ async fn main() {
})
.collect_vec();
while let Some(message) = geyser_stream.next().await {
let Ok(message) = message else {
continue;
};
loop {
let mut blocks_subs = HashMap::new();
blocks_subs.insert(
"sidecar_block_subscription".to_string(),
yellowstone_grpc_proto_original::prelude::SubscribeRequestFilterBlocks {
account_include: Default::default(),
include_transactions: Some(true),
include_accounts: Some(false),
include_entries: Some(false),
},
);
let Some(update) = message.update_oneof else {
continue;
};
let mut slot_sub = HashMap::new();
slot_sub.insert(
"slot_sub".to_string(),
yellowstone_grpc_proto_original::prelude::SubscribeRequestFilterSlots {
filter_by_commitment: None,
},
);
match update {
yellowstone_grpc_proto_original::prelude::subscribe_update::UpdateOneof::Block(
block,
) => {
debug!("got block {}", block.slot);
BLOCK_TXS.set(block.transactions.len() as i64);
BANKING_STAGE_BLOCKS_COUNTER.inc();
BANKING_STAGE_BLOCKS_TASK.inc();
let postgres = postgres.clone();
let slot = slot.clone();
let map_of_infos = map_of_infos.clone();
let slot_by_errors = slot_by_errors.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(30)).await;
for transaction in &block.transactions {
let Some(tx) = &transaction.transaction else {
continue;
};
let signature = Signature::try_from(tx.signatures[0].clone()).unwrap();
if let Some(mut info) = map_of_infos.get_mut(&signature.to_string()) {
info.add_transaction(transaction, block.slot);
let mut geyser_stream = client
.subscribe_once(
slot_sub,
Default::default(),
HashMap::new(),
Default::default(),
blocks_subs,
Default::default(),
None,
Default::default(),
None,
)
.await
.unwrap();
while let Some(message) = geyser_stream.next().await {
let Ok(message) = message else {
continue;
};
let Some(update) = message.update_oneof else {
continue;
};
match update {
yellowstone_grpc_proto_original::prelude::subscribe_update::UpdateOneof::Block(
block,
) => {
debug!("got block {}", block.slot);
BLOCK_TXS.set(block.transactions.len() as i64);
BANKING_STAGE_BLOCKS_COUNTER.inc();
BANKING_STAGE_BLOCKS_TASK.inc();
let postgres = postgres.clone();
let slot = slot.clone();
let map_of_infos = map_of_infos.clone();
let slot_by_errors = slot_by_errors.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(30)).await;
for transaction in &block.transactions {
let Some(tx) = &transaction.transaction else {
continue;
};
let signature = Signature::try_from(tx.signatures[0].clone()).unwrap();
if let Some(mut info) = map_of_infos.get_mut(&signature.to_string()) {
info.add_transaction(transaction, block.slot);
}
}
}
let banking_stage_error_count =
slot_by_errors.get(&block.slot).map(|x| *x.value() as i64);
let block_info = BlockInfo::new(&block, banking_stage_error_count);
let banking_stage_error_count =
slot_by_errors.get(&block.slot).map(|x| *x.value() as i64);
let block_info = BlockInfo::new(&block, banking_stage_error_count);
TXERROR_COUNT.add(
block_info.processed_transactions - block_info.successful_transactions,
);
if let Err(e) = postgres.save_block_info(block_info).await {
error!("Error saving block {}", e);
}
slot.store(block.slot, std::sync::atomic::Ordering::Relaxed);
slot_by_errors.remove(&block.slot);
BANKING_STAGE_BLOCKS_TASK.dec();
});
// delay queue so that we get all the banking stage errors before processing block
}
_ => {}
};
TXERROR_COUNT.add(
block_info.processed_transactions - block_info.successful_transactions,
);
if let Err(e) = postgres.save_block_info(block_info).await {
error!("Error saving block {}", e);
}
slot.store(block.slot, std::sync::atomic::Ordering::Relaxed);
slot_by_errors.remove(&block.slot);
BANKING_STAGE_BLOCKS_TASK.dec();
});
// delay queue so that we get all the banking stage errors before processing block
}
_ => {}
};
}
log::error!("stopping the sidecar, geyser block stream is broken");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
log::error!("stopping the sidecar, geyser block stream is broken");
let _ = futures::future::select_all(jhs).await;
}