tests and polling

This commit is contained in:
aniketfuryrocks 2023-02-09 17:35:18 +05:30 committed by Godmode Galactus
parent 247b8aa550
commit c2bcf6d126
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
10 changed files with 209 additions and 82 deletions

34
Cargo.lock generated
View File

@ -1345,6 +1345,19 @@ dependencies = [
"miniz_oxide",
]
[[package]]
name = "flume"
version = "0.10.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577"
dependencies = [
"futures-core",
"futures-sink",
"nanorand",
"pin-project",
"spin 0.9.5",
]
[[package]]
name = "fnv"
version = "1.0.7"
@ -2244,6 +2257,7 @@ dependencies = [
"clap 4.1.4",
"const_env",
"dashmap",
"flume",
"futures",
"jsonrpsee",
"lazy_static",
@ -2383,6 +2397,15 @@ dependencies = [
"windows-sys 0.42.0",
]
[[package]]
name = "nanorand"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3"
dependencies = [
"getrandom 0.2.8",
]
[[package]]
name = "native-tls"
version = "0.2.11"
@ -3259,7 +3282,7 @@ dependencies = [
"cc",
"libc",
"once_cell",
"spin",
"spin 0.5.2",
"untrusted",
"web-sys",
"winapi",
@ -4602,6 +4625,15 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "spin"
version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7dccf47db1b41fa1573ed27ccf5e08e3ca771cb994f776668c5ebda893b248fc"
dependencies = [
"lock_api",
]
[[package]]
name = "spki"
version = "0.5.4"

View File

@ -43,3 +43,4 @@ native-tls = "0.2.11"
postgres-native-tls = "0.5.0"
prometheus = "0.13.3"
lazy_static = "1.4.0"
flume = "0.10.14"

View File

@ -70,15 +70,11 @@ impl BenchHelper {
) -> Vec<Transaction> {
(0..num_of_txs)
.into_iter()
.map(|_| Self::create_memo_tx(b"hello" ,funded_payer, blockhash))
.map(|_| Self::create_memo_tx(b"hello", funded_payer, blockhash))
.collect()
}
}
pub fn create_memo_tx(
msg: &[u8],
payer: &Keypair,
blockhash: Hash,
) -> Transaction {
pub fn create_memo_tx(msg: &[u8], payer: &Keypair, blockhash: Hash) -> Transaction {
let memo = Pubkey::from_str(MEMO_PROGRAM_ID).unwrap();
let instruction = Instruction::new_with_bytes(memo, msg, vec![]);

View File

@ -97,7 +97,10 @@ impl BlockStore {
// Write to block store first in order to prevent
// any race condition i.e prevent some one to
// ask the map what it doesn't have rn
let slot = block_info.slot;
self.blocks.insert(blockhash.clone(), block_info);
*self.get_latest_blockhash(commitment_config).write().await = blockhash;
if slot > self.get_latest_block_info(commitment_config).await.1.slot {
*self.get_latest_blockhash(commitment_config).write().await = blockhash;
}
}
}

View File

@ -1,7 +1,9 @@
use std::sync::Arc;
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
use dashmap::DashMap;
use futures::future::try_join_all;
use jsonrpsee::SubscriptionSink;
use log::{error, info, warn};
use prometheus::{histogram_opts, opts, register_counter, register_histogram, Counter, Histogram};
@ -190,8 +192,7 @@ impl BlockListener {
.iter()
.find(|reward| Some(RewardType::Fee) == reward.reward_type) else {
return Ok(());
};
};
let _leader_id = &leader_reward.pubkey;
@ -277,51 +278,106 @@ impl BlockListener {
Ok(())
}
pub fn listen(
self,
commitment_config: CommitmentConfig,
postgres: Option<PostgresMpscSend>,
) -> JoinHandle<anyhow::Result<()>> {
tokio::spawn(async move {
let commitment = commitment_config.commitment;
let (send, recv) = flume::unbounded();
let get_block_errors = Arc::new(AtomicU64::new(0));
info!("Listening to {commitment:?} blocks");
for _i in 0..6 {
let this = self.clone();
let postgres = postgres.clone();
let recv = recv.clone();
let send = send.clone();
let get_block_errors = get_block_errors.clone();
tokio::spawn(async move {
while let Ok(slot) = recv.recv_async().await {
if get_block_errors.load(Ordering::Relaxed) > 6 {
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
get_block_errors.fetch_sub(1, Ordering::Relaxed);
}
// println!("{i} thread in slot {slot}");
if let Err(err) = this
.index_slot(slot, commitment_config, postgres.clone())
.await
{
warn!(
"Error while indexing {commitment_config:?} block with slot {slot} {err}"
);
get_block_errors.fetch_add(1, Ordering::Relaxed);
send.send_async(slot).await.unwrap();
};
// println!("{i} thread done slot {slot}");
}
});
}
let (latest_slot_send, mut latest_slot_recv) = tokio::sync::mpsc::channel(1);
{
let this = self.clone();
let latest_slot_send = latest_slot_send.clone();
tokio::spawn(async move {
while let Some(latest_slot) = latest_slot_recv.recv().await {
if let Err(err) = this
.index_slot(latest_slot, commitment_config, postgres.clone())
.await
{
warn!(
"Error while indexing latest {commitment_config:?} block with slot {latest_slot} {err}"
);
get_block_errors.fetch_add(1, Ordering::Relaxed);
latest_slot_send.send(latest_slot).await.unwrap();
};
}
});
}
tokio::spawn(async move {
let mut slot = self
.block_store
.get_latest_block_info(commitment_config)
.await
.1
.slot;
loop {
let (
_,
BlockInformation {
slot: latest_slot,
block_height: _,
},
) = self
.block_store
.get_latest_block_info(commitment_config)
.await;
info!("{commitment_config:?} {slot}");
let block_slots = self
let mut new_block_slots = self
.rpc_client
.get_blocks_with_commitment(latest_slot, None, commitment_config)
.get_blocks_with_commitment(slot, None, commitment_config)
.await?;
let block_future_handlers = block_slots.into_iter().map(|slot| {
let this = self.clone();
let postgres = postgres.clone();
if new_block_slots.is_empty() {
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
println!("no slots");
tokio::spawn(async move {
if let Err(err) = this
.index_slot(slot, commitment_config, postgres.clone())
.await
{
warn!(
"Error while indexing {commitment_config:?} block with slot {slot} {err}"
);
};
})
});
continue;
}
let _ = try_join_all(block_future_handlers).await;
info!("Received new slots");
let Some(latest_slot) = new_block_slots.pop() else {
warn!("Didn't receive any block slots for {slot}");
continue;
};
slot = latest_slot;
latest_slot_send.send(latest_slot).await?;
for slot in new_block_slots {
send.send_async(slot).await?;
}
}
})
}

24
tests/blockhash.rs Normal file
View File

@ -0,0 +1,24 @@
use lite_rpc::DEFAULT_LITE_RPC_ADDR;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
#[tokio::test]
async fn blockhash() -> anyhow::Result<()> {
let lite_rpc = RpcClient::new(DEFAULT_LITE_RPC_ADDR.to_string());
let mut prev_blockhash = lite_rpc.get_latest_blockhash().await.unwrap();
for _ in 0..5 {
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
let blockhash = lite_rpc.get_latest_blockhash().await.unwrap();
if prev_blockhash != blockhash {
prev_blockhash = blockhash;
} else {
panic!("Blockhash didn't change in appx 500ms");
}
}
Ok(())
}

View File

@ -1,50 +1,37 @@
use std::sync::Arc;
use bench::helpers::BenchHelper;
use lite_rpc::DEFAULT_LITE_RPC_ADDR;
use log::info;
use solana_rpc_client::{nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction};
use solana_sdk::commitment_config::CommitmentConfig;
const AMOUNT: usize = 5;
#[tokio::test]
async fn send_and_confirm_txs_get_signature_statuses() {
tracing_subscriber::fmt::init();
let rpc_client = Arc::new(RpcClient::new(DEFAULT_LITE_RPC_ADDR.to_string()));
let rpc_client = RpcClient::new(DEFAULT_LITE_RPC_ADDR.to_string());
let funded_payer = BenchHelper::get_payer().await.unwrap();
let blockhash = rpc_client.get_latest_blockhash().await.unwrap();
let txs = BenchHelper::generate_txs(AMOUNT, &funded_payer, blockhash);
let tx = &BenchHelper::generate_txs(1, &funded_payer, blockhash)[0];
let sig = tx.get_signature();
info!("Sending and Confirming {AMOUNT} tx(s)");
rpc_client.send_transaction(tx).await.unwrap();
info!("{sig}");
for tx in &txs {
rpc_client.send_transaction(tx).await.unwrap();
info!("Tx {}", tx.get_signature());
}
for tx in &txs {
let sig = tx.get_signature();
info!("Confirming {sig}");
BenchHelper::wait_till_signature_status(&rpc_client, sig, CommitmentConfig::confirmed())
.await
.unwrap();
}
info!("Sent and Confirmed {AMOUNT} tx(s)");
BenchHelper::wait_till_signature_status(&rpc_client, sig, CommitmentConfig::confirmed())
.await
.unwrap();
}
#[tokio::test]
async fn send_and_confirm_tx_rpc_client() {
let rpc_client = RpcClient::new(DEFAULT_LITE_RPC_ADDR.to_string());
let funded_payer = BenchHelper::get_payer().await.unwrap();
let blockhash = rpc_client.get_latest_blockhash().await.unwrap();
let tx = &BenchHelper::generate_txs(1, &funded_payer, blockhash)[0];
let sig = tx.get_signature();
rpc_client.send_and_confirm_transaction(tx).await.unwrap();

View File

@ -1,27 +1,31 @@
import { Connection, Keypair, LAMPORTS_PER_SOL, SystemProgram, sendAndConfirmTransaction, Transaction, PublicKey } from "@solana/web3.js";
import { Connection, Keypair, LAMPORTS_PER_SOL, SystemProgram, sendAndConfirmTransaction, Transaction, PublicKey, TransactionInstruction, Signer } from "@solana/web3.js";
import * as fs from "fs";
import * as os from "os";
jest.setTimeout(60000);
test('send and confirm transaction', async () => {
const connection = new Connection('http://127.0.0.1:8890', 'confirmed');
const payer = Keypair.generate();
const toAccount = Keypair.generate().publicKey;
const MEMO_PROGRAM_ID = new PublicKey("MemoSq4gqABAXKb96qnH8TysNcWxMyWCqXgDLGmfcHr");
const airdropSignature = await connection.requestAirdrop(payer.publicKey, LAMPORTS_PER_SOL * 2);
console.log('airdrop signature ' + airdropSignature);
await connection.confirmTransaction(airdropSignature, 'finalized');
console.log('confirmed');
test('send and confirm transaction', async () => {
const connection = new Connection('http://0.0.0.0:8890', 'confirmed');
const keypair_file = fs.readFileSync(`${os.homedir}/.config/solana/id.json`, 'utf-8');
const keypair_array = Uint8Array.from(JSON.parse(keypair_file));
const payer = Keypair.fromSecretKey(keypair_array);
const transaction = new Transaction();
// Add an instruction to execute
transaction.add(
SystemProgram.transfer({
fromPubkey: payer.publicKey,
toPubkey: toAccount,
lamports: LAMPORTS_PER_SOL,
}),
new TransactionInstruction({
programId: MEMO_PROGRAM_ID,
keys: [],
data: Buffer.from("Hello")
})
);
const sig = connection.sendTransaction(transaction, [payer]);
console.log(`https://explorer.solana.com/tx/${sig}`)
await sendAndConfirmTransaction(connection, transaction, [payer]);
});

20
tests/memo.rs Normal file
View File

@ -0,0 +1,20 @@
use bench::helpers::BenchHelper;
use lite_rpc::DEFAULT_LITE_RPC_ADDR;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
#[tokio::test]
async fn memo() -> anyhow::Result<()> {
let lite_rpc = RpcClient::new(DEFAULT_LITE_RPC_ADDR.to_string());
let payer = BenchHelper::get_payer().await.unwrap();
let blockhash = lite_rpc.get_latest_blockhash().await.unwrap();
let memo_tx = BenchHelper::create_memo_tx(b"hi", &payer, blockhash);
let memo_sig = lite_rpc.send_transaction(&memo_tx).await.unwrap();
lite_rpc.confirm_transaction(&memo_sig).await.unwrap();
println!("{memo_sig}");
Ok(())
}

View File

@ -4,10 +4,14 @@ use solana_rpc_client::nonblocking::rpc_client::RpcClient;
#[tokio::test]
async fn part_send_n_confirm() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
log::info!("rpc {DEFAULT_RPC_ADDR} lite {DEFAULT_LITE_RPC_ADDR}");
let rpc_client = RpcClient::new(DEFAULT_RPC_ADDR.to_string());
let lite_rpc_client = RpcClient::new(DEFAULT_LITE_RPC_ADDR.to_string());
send_and_confirm_memo(&rpc_client, &lite_rpc_client).await?;
send_and_confirm_memo(&lite_rpc_client, &lite_rpc_client).await?;
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
send_and_confirm_memo(&lite_rpc_client, &rpc_client).await?;
Ok(())