Merge pull request #56 from blockworks-foundation/some_changes_in_block_listner

Some changes in block listner
This commit is contained in:
galactus 2023-02-14 13:16:52 +01:00 committed by GitHub
commit 2a9324157c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 45 additions and 51 deletions

View File

@ -31,7 +31,7 @@ async fn get_identity_keypair(identity_from_cli: &String) -> Keypair {
}
}
#[tokio::main]
#[tokio::main(flavor = "multi_thread", worker_threads = 16)]
pub async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();

View File

@ -1,7 +1,4 @@
use std::{
collections::{BTreeSet, VecDeque},
sync::Arc,
};
use std::{collections::VecDeque, sync::Arc, time::Instant};
use dashmap::DashMap;
use jsonrpsee::SubscriptionSink;
@ -144,6 +141,8 @@ impl BlockListener {
TT_RECV_CON_BLOCK.start_timer()
};
let start = Instant::now();
let block = self
.rpc_client
.get_block_with_config(
@ -152,8 +151,8 @@ impl BlockListener {
transaction_details: Some(TransactionDetails::Full),
commitment: Some(commitment_config),
max_supported_transaction_version: Some(0),
encoding: Some(UiTransactionEncoding::Binary),
rewards: Some(false),
encoding: Some(UiTransactionEncoding::Base64),
rewards: Some(true),
},
)
.await?;
@ -191,28 +190,7 @@ impl BlockListener {
)
.await;
if let Some(postgres) = &postgres {
let Some(rewards) = block.rewards else {
return Ok(());
};
let Some(leader_reward) = rewards
.iter()
.find(|reward| Some(RewardType::Fee) == reward.reward_type) else {
return Ok(());
};
let _leader_id = &leader_reward.pubkey;
postgres
.send(PostgresMsg::PostgresBlock(PostgresBlock {
slot: slot as i64,
leader_id: 0, //FIX:
parent_slot: parent_slot as i64,
}))
.expect("Error sending block to postgres service");
}
let mut transactions_processed = 0;
for tx in transactions {
let Some(UiTransactionStatusMeta { err, status, compute_units_consumed ,.. }) = tx.meta else {
info!("tx with no meta");
@ -226,6 +204,7 @@ impl BlockListener {
continue;
}
};
transactions_processed += 1;
let sig = tx.signatures[0].to_string();
if let Some(mut tx_status) = self.tx_sender.txs_sent.get_mut(&sig) {
@ -285,6 +264,36 @@ impl BlockListener {
}
}
info!(
"Number of transactions processed {} for slot {} for commitment {} time taken {} ms",
transactions_processed,
slot,
commitment_config.commitment,
start.elapsed().as_millis()
);
if let Some(postgres) = &postgres {
let Some(rewards) = block.rewards else {
return Ok(());
};
let Some(leader_reward) = rewards
.iter()
.find(|reward| Some(RewardType::Fee) == reward.reward_type) else {
return Ok(());
};
let _leader_id = &leader_reward.pubkey;
postgres
.send(PostgresMsg::PostgresBlock(PostgresBlock {
slot: slot as i64,
leader_id: 0, //FIX:
parent_slot: parent_slot as i64,
}))
.expect("Error sending block to postgres service");
}
Ok(())
}
pub fn listen(
@ -322,13 +331,12 @@ impl BlockListener {
continue;
}
if let Err(err) = this
if let Err(_) = this
.index_slot(slot, commitment_config, postgres.clone())
.await
{
warn!(
"Error while indexing {commitment_config:?} block with slot {slot} {err}"
);
// usually as we index all the slots even if they are not been processed we get some errors for slot
// as they are not in long term storage of the rpc // we check 10 times before ignoring the slot
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
{
let mut queue = slots_task_queue.lock().await;
@ -352,43 +360,29 @@ impl BlockListener {
let mut last_latest_slot = last_latest_slot - 5;
// storage for recent slots processed
const SLOT_PROCESSED_SIZE: usize = 128;
let mut slot_processed = BTreeSet::<u64>::new();
let rpc_client = rpc_client.clone();
loop {
let new_slot = rpc_client
.get_slot_with_commitment(commitment_config)
.await?;
// filter already processed slots
let new_block_slots: Vec<u64> = (last_latest_slot..new_slot)
.filter(|x| !slot_processed.contains(x))
.map(|x| x)
.collect();
if new_block_slots.is_empty() {
if last_latest_slot == new_slot {
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
println!("no slots");
continue;
}
//info!("Received new slots {commitment_config:?} {last_latest_slot}");
let latest_slot = *new_block_slots.last().unwrap();
// filter already processed slots
let new_block_slots: Vec<u64> = (last_latest_slot..new_slot).collect();
// context for lock
{
let mut lock = slots_task_queue.lock().await;
for slot in new_block_slots {
lock.push_back((slot, 0));
if slot_processed.insert(slot) && slot_processed.len() > SLOT_PROCESSED_SIZE
{
slot_processed.pop_first();
}
}
}
last_latest_slot = latest_slot;
last_latest_slot = new_slot;
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
}
})