Merge pull request #37 from blockworks-foundation/lite_main_merge

Lite merge with Pan's changes
Aniket Prajapati 2023-01-19 22:50:43 +05:30 committed by GitHub
commit f1b39479f1
52 changed files with 3040 additions and 12877 deletions

5
.dockerignore Normal file

@ -0,0 +1,5 @@
/target
/node_modules
/test-ledger
/config
**/validator.log

21
.github/workflows/fly-deploy.yml vendored Normal file

@ -0,0 +1,21 @@
name: Deploy to Fly
on:
push:
branches: [main]
env:
FLY_API_TOKEN: ${{ secrets.FLY_API_TOKEN }}
jobs:
deploy:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup Fly
uses: superfly/flyctl-actions/setup-flyctl@master
- name: Deploy
run: flyctl deploy

8
.gitignore vendored

@ -1,5 +1,3 @@
/target
/node_modules
/test-ledger
/config
**/validator.log
target
node_modules
bench/metrics.csv

3401
Cargo.lock generated

File diff suppressed because it is too large

Cargo.toml

@ -2,70 +2,35 @@
name = "lite-rpc"
version = "0.1.0"
edition = "2021"
description = "A lite version of solana rpc to send and confirm transactions"
[workspace]
members = [
"lite-client",
"lite-bench-utils"
"bench"
]
[[bench]]
name="tps"
harness=false
[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]
[dev-dependencies]
csv = "1.1.6"
serde = { version = "1", features = ["derive"]}
lite-client = { path ="./lite-client" }
lite-bench-utils = { path = "./lite-bench-utils" }
log = "0.4.17"
simplelog = "0.12.0"
bench = { path = "./bench" }
[dependencies]
solana-client = { git = "https://github.com/solana-labs/solana.git" }
solana-sdk = { git = "https://github.com/solana-labs/solana.git" }
solana-clap-utils = { git = "https://github.com/solana-labs/solana.git" }
solana-cli-config = { git = "https://github.com/solana-labs/solana.git" }
solana-pubsub-client = { git = "https://github.com/solana-labs/solana.git" }
solana-rpc-client-api = { git = "https://github.com/solana-labs/solana.git" }
solana-runtime = { git = "https://github.com/solana-labs/solana.git" }
solana-send-transaction-service = { git = "https://github.com/solana-labs/solana.git" }
solana-tpu-client = { git = "https://github.com/solana-labs/solana.git" }
solana-transaction-status = { git = "https://github.com/solana-labs/solana.git" }
solana-version = { git = "https://github.com/solana-labs/solana.git" }
solana-rpc = { git = "https://github.com/solana-labs/solana.git" }
solana-perf = { git = "https://github.com/solana-labs/solana.git" }
tokio = { version = "1.14.1", features = ["full"]}
tokio-util = { version = "0.6", features = ["codec", "compat"] }
futures = "0.3.25"
jsonrpc-core = "18.0.0"
jsonrpc-core-client = { version = "18.0.0" }
jsonrpc-derive = "18.0.0"
jsonrpc-http-server = "18.0.0"
jsonrpc-pubsub = "18.0.0"
clap = { version = "4.0.29", features = ["derive"] }
procinfo = "0.4.2"
base64 = "0.13.1"
solana-client = "1.14.12"
solana-sdk = "1.14.12"
solana-transaction-status = "1.14.12"
solana-version = "1.14.12"
serde = { version = "1.0.152", features = ["derive"] }
serde_json = "1.0.91"
tokio = { version = "1.24.2", features = ["full"]}
bincode = "1.3.3"
bs58 = "0.4.0"
crossbeam-channel = "0.5.6"
dashmap = "5.4.0"
itertools = "0.10.5"
libc = "0.2.138"
base64 = "0.21.0"
thiserror = "1.0.38"
futures = "0.3.25"
bytes = "1.3.0"
anyhow = "1.0.68"
log = "0.4.17"
rayon = "1.6.1"
regex = "1.7.0"
serde = "1.0.149"
serde_derive = "1.0.149"
serde_json = "1.0.89"
soketto = "0.7.1"
spl-token = { version = "=3.5.0", features = ["no-entrypoint"] }
spl-token-2022 = { version = "0.5.0", features = ["no-entrypoint"] }
stream-cancel = "0.8.1"
thiserror = "1.0.37"
chrono = "0.4.23"
clap = { version = "4.1.1", features = ["derive"] }
dashmap = "5.4.0"
const_env = "0.1.2"
jsonrpsee = { version = "0.16.2", features = ["macros", "full"] }
tracing-subscriber = "0.3.16"
procinfo = "0.4.2"

23
Dockerfile Normal file

@ -0,0 +1,23 @@
# syntax = docker/dockerfile:1.2
FROM rust:1.65.0 as base
RUN cargo install cargo-chef
RUN rustup component add rustfmt
RUN apt-get update && apt-get install -y clang cmake ssh
WORKDIR /app
FROM base AS plan
COPY . .
WORKDIR /app
RUN cargo chef prepare --recipe-path recipe.json
FROM base as build
COPY --from=plan /app/recipe.json recipe.json
RUN cargo chef cook --release --recipe-path recipe.json
COPY . .
RUN cargo build --release --bin lite-rpc
FROM debian:bullseye-slim as run
RUN apt-get update && apt-get -y install ca-certificates libc6
COPY --from=build /app/target/release/lite-rpc /usr/local/bin/
CMD lite-rpc --rpc-addr "$RPC_URL" --websocket-addr "$WS_URL"

README.md

@ -1,32 +1,56 @@
# Solana Lite RPC
# Lite RPC For Solana Blockchain
This project aims to create a lite rpc server which is responsible only for sending and confirming the transactions.
The lite-rpc server will not have any ledger or banks.
While sending transaction the lite rpc server will send the transaction to next few leader (FANOUT) and then use different strategies to confirm the transaction.
The rpc server will also have a websocket port which is reponsible for just subscribing to slots and signatures.
The lite rpc server will be optimized to confirm transactions which are forwarded to leader using the same server.
This project is currently based on an unstable feature of block subscription of solana which can be enabled using `--rpc-pubsub-enable-block-subscription` while running rpc node.
Submitting a [transaction](https://docs.solana.com/terminology#transaction) to be executed on the solana blockchain,
requires the client to identify the next few leaders based on the
[leader schedule](https://docs.solana.com/terminology#leader-schedule), look up their peering information in gossip and
connect to them via the [quic protocol](https://en.wikipedia.org/wiki/QUIC). In order to simplify the
process so it can be triggered from a web browser, most applications
run full [validators](https://docs.solana.com/terminology#validator) that forward the transactions according to the
protocol on behalf of the web browser. Running full solana [validators](https://docs.solana.com/terminology#validator)
is incredibly resource intensive `(>256GB RAM)`, the goal of this
project would be to create a specialized micro-service that allows
to deploy this logic quickly and allows [horizontal scalability](https://en.wikipedia.org/wiki/Scalability) with
commodity vms.
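For illustration only (not part of this PR): because lite-rpc exposes the standard `sendTransaction` and `getLatestBlockhash` JSON-RPC methods, a stock `RpcClient` can simply be pointed at it. A minimal sketch, assuming the default HTTP address `http://127.0.0.1:8890` and an already funded `payer`:
```rust
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{
    message::Message, pubkey::Pubkey, signature::Keypair, signer::Signer,
    system_instruction, transaction::Transaction,
};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Point a stock RPC client at the lite-rpc HTTP endpoint instead of a full node.
    let rpc = RpcClient::new("http://127.0.0.1:8890".to_string());

    let payer = Keypair::new(); // assumed to be funded in a real run
    let blockhash = rpc.get_latest_blockhash().await?;

    // Build a simple transfer; lite-rpc forwards it to the upcoming leaders on the caller's behalf.
    let ix = system_instruction::transfer(&payer.pubkey(), &Pubkey::new_unique(), 1_000);
    let msg = Message::new(&[ix], Some(&payer.pubkey()));
    let tx = Transaction::new(&[&payer], msg, blockhash);

    let sig = rpc.send_transaction(&tx).await?;
    println!("submitted {sig}");
    Ok(())
}
```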
### Confirmation strategies
1) Subscribing to blocks changes and checking the confirmations. (Under development)
1) Subscribe to new blocks using [blockSubscribe](https://docs.solana.com/developing/clients/jsonrpc-api#blocksubscribe---unstable-disabled-by-default) (see the sketch after this list)
2) Subscribing to signatures with pool of rpc servers. (Under development)
3) Listining to gossip protocol. (Future roadmap)
3) Listening to gossip protocol. (Future roadmap)
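A rough sketch of strategy (1), watching confirmed blocks over the websocket API and marking a signature as confirmed once it appears in a block. This assumes solana-client 1.14-style APIs; `ws_url` and `target_sig` are placeholders, and the actual implementation in this PR is the `BlockListener` worker:
```rust
use futures::StreamExt;
use solana_client::{
    nonblocking::pubsub_client::PubsubClient,
    rpc_config::{RpcBlockSubscribeConfig, RpcBlockSubscribeFilter},
};
use solana_sdk::commitment_config::CommitmentConfig;
use solana_transaction_status::TransactionDetails;

async fn wait_for_sig(ws_url: &str, target_sig: &str) -> anyhow::Result<()> {
    let client = PubsubClient::new(ws_url).await?;
    let (mut blocks, _unsub) = client
        .block_subscribe(
            RpcBlockSubscribeFilter::All,
            Some(RpcBlockSubscribeConfig {
                commitment: Some(CommitmentConfig::confirmed()),
                transaction_details: Some(TransactionDetails::Signatures),
                ..Default::default()
            }),
        )
        .await?;
    while let Some(update) = blocks.next().await {
        // Each notification carries the signatures of one confirmed block.
        if let Some(sigs) = update.value.block.and_then(|b| b.signatures) {
            if sigs.iter().any(|s| s == target_sig) {
                return Ok(());
            }
        }
    }
    anyhow::bail!("block stream ended before the signature was seen")
}
```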
## Build
`cargo build`
## Executing
## Run
* For RPC node : `http://localhost:8899`,
* Websocket : `http://localhost:8900` (Optional),
* Port : `9000` Listening port for LiteRpc server,
* Subscription Port : `9001` Listening port of websocket subscriptions for LiteRpc server,
*make sure `solana-validator` is running in the background with `--rpc-pubsub-enable-block-subscription`*
```
cargo run --bin lite-rpc -- run --port 9000 --subscription-port 9001 --rpc-url http://localhost:8899
*run using*
```bash
$ cargo run --release
```
## Tests
*to know about command line options*
```bash
$ cargo run --release -- --help
```
cargo run --bin lite-rpc -- test
```
## Test and Bench
*Make sure both `solana-validator` and `lite-rpc` is running*
*test*
```bash
$ cargo test
```
*bench*
```bash
$ cd bench && cargo run --release
```
Find a new file named `metrics.csv` in the project root.
## License & Copyright
Copyright (c) 2022 Blockworks Foundation
Licensed under the **[MIT LICENSE](LICENSE)**

17
bench/Cargo.toml Normal file

@ -0,0 +1,17 @@
[package]
name = "bench"
version = "0.1.0"
edition = "2021"
[dependencies]
solana-client = "1.14.12"
solana-sdk = "1.14.12"
log = "0.4.17"
anyhow = "1.0.68"
serde = "1.0.152"
serde_json = "1.0.91"
csv = "1.1.6"
clap = { version = "4.1.1", features = ["derive"] }
tokio = { version = "1.24.2", features = ["full", "fs"]}
tracing-subscriber = "0.3.16"
dirs = "4.0.0"

3
bench/metrics.csv Normal file

@ -0,0 +1,3 @@
total_time_elapsed_sec,txs_sent,time_to_send_txs,txs_confirmed,txs_un_confirmed,tps
8.96211512,20000,6.054988221,14321,5679,1597.9486770975554
8.96211512,20000,6.054988221,14321,5679,1597.9486770975554
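For reference, the `tps` column is simply `txs_confirmed / total_time_elapsed_sec` (see `calc_tps` in the metrics module further down); a quick check against the rows above:
```rust
fn main() {
    // tps = txs_confirmed / total_time_elapsed_sec
    let tps = 14_321f64 / 8.96211512;
    println!("{tps}"); // ≈ 1597.95, matching the tps column
}
```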

21
bench/src/cli.rs Normal file

@ -0,0 +1,21 @@
use clap::{command, Parser};
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
pub struct Args {
/// Number of tx(s) sent in each run
#[arg(short = 't', long, default_value_t = 20_000)]
pub tx_count: usize,
/// Number of bench runs
#[arg(short = 'r', long, default_value_t = 1)]
pub runs: usize,
/// Interval between each bench run (ms)
#[arg(short = 'i', long, default_value_t = 1000)]
pub run_interval_ms: u64,
/// Metrics output file name
#[arg(short = 'm', long, default_value_t = String::from("metrics.csv"))]
pub metrics_file_name: String,
/// Lite Rpc Address
#[arg(short = 'l', long, default_value_t = String::from("http://127.0.0.1:8890"))]
pub lite_rpc_addr: String,
}

93
bench/src/helpers.rs Normal file

@ -0,0 +1,93 @@
use std::{ops::Deref, sync::Arc};
use anyhow::Context;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{
commitment_config::CommitmentConfig,
hash::Hash,
message::Message,
pubkey::Pubkey,
signature::{Keypair, Signature},
signer::Signer,
system_instruction,
transaction::Transaction,
};
#[derive(Clone)]
pub struct BenchHelper {
pub rpc_client: Arc<RpcClient>,
}
impl Deref for BenchHelper {
type Target = RpcClient;
fn deref(&self) -> &Self::Target {
&self.rpc_client
}
}
impl BenchHelper {
pub fn new(rpc_client: Arc<RpcClient>) -> Self {
Self { rpc_client }
}
pub async fn get_payer(&self) -> anyhow::Result<Keypair> {
let mut config_dir = dirs::config_dir().context("Unable to get path to user config dir")?;
config_dir.push("solana");
config_dir.push("id.json");
let payer = tokio::fs::read_to_string(config_dir.to_str().unwrap())
.await
.context("Error reading payer file")?;
let payer: Vec<u8> = serde_json::from_str(&payer)?;
let payer = Keypair::from_bytes(&payer)?;
Ok(payer)
}
pub async fn wait_till_signature_status(
&self,
sig: &Signature,
commitment_config: CommitmentConfig,
) -> anyhow::Result<()> {
loop {
if let Some(err) = self
.rpc_client
.get_signature_status_with_commitment(sig, commitment_config)
.await?
{
err?;
return Ok(());
}
}
}
pub fn create_transaction(&self, funded_payer: &Keypair, blockhash: Hash) -> Transaction {
let to_pubkey = Pubkey::new_unique();
// transfer instruction
let instruction =
system_instruction::transfer(&funded_payer.pubkey(), &to_pubkey, 1_000_000);
let message = Message::new(&[instruction], Some(&funded_payer.pubkey()));
Transaction::new(&[funded_payer], message, blockhash)
}
pub async fn generate_txs(
&self,
num_of_txs: usize,
funded_payer: &Keypair,
) -> anyhow::Result<Vec<Transaction>> {
let mut txs = Vec::with_capacity(num_of_txs);
let blockhash = self.rpc_client.get_latest_blockhash().await?;
for _ in 0..num_of_txs {
txs.push(self.create_transaction(funded_payer, blockhash));
}
Ok(txs)
}
}

3
bench/src/lib.rs Normal file

@ -0,0 +1,3 @@
pub mod cli;
pub mod helpers;
pub mod metrics;

140
bench/src/main.rs Normal file

@ -0,0 +1,140 @@
use std::{
collections::HashMap,
sync::Arc,
time::{Duration, Instant},
};
use bench::{
cli::Args,
helpers::BenchHelper,
metrics::{AvgMetric, Metric},
};
use clap::Parser;
use log::info;
use solana_client::{nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction};
use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature};
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
let Args {
tx_count,
runs,
run_interval_ms,
metrics_file_name,
lite_rpc_addr,
} = Args::parse();
let mut run_interval_ms = tokio::time::interval(Duration::from_millis(run_interval_ms));
info!("Connecting to {lite_rpc_addr}");
let rpc_client = Arc::new(RpcClient::new_with_commitment(
lite_rpc_addr,
CommitmentConfig::confirmed(),
));
let bench_helper = BenchHelper { rpc_client };
let mut csv_writer = csv::Writer::from_path(metrics_file_name).unwrap();
let mut avg_metric = AvgMetric::default();
for run_num in 0..runs {
let metric = bench(&bench_helper, tx_count).await;
info!("Run {run_num}: Sent and Confirmed {tx_count} tx(s) in {metric:?} with",);
// update avg metric
avg_metric += &metric;
// write metric to file
csv_writer.serialize(metric).unwrap();
// wait for an interval
run_interval_ms.tick().await;
}
let avg_metric = Metric::from(avg_metric);
info!("Avg Metric {avg_metric:?}",);
csv_writer.serialize(avg_metric).unwrap();
csv_writer.flush().unwrap();
}
async fn bench(bench_helper: &BenchHelper, tx_count: usize) -> Metric {
let funded_payer = bench_helper.get_payer().await.unwrap();
let txs = bench_helper
.generate_txs(tx_count, &funded_payer)
.await
.unwrap();
let mut un_confirmed_txs: HashMap<Signature, Option<Instant>> =
HashMap::with_capacity(txs.len());
for tx in &txs {
un_confirmed_txs.insert(*tx.get_signature(), None);
}
let start_time = Instant::now();
info!("Sending and Confirming {tx_count} tx(s)",);
let lite_client = bench_helper.rpc_client.clone();
let send_fut = {
let lite_client = lite_client.clone();
tokio::spawn(async move {
for tx in txs {
lite_client.send_transaction(&tx).await.unwrap();
info!("Tx {}", &tx.signatures[0]);
}
info!("Sent {tx_count} tx(s)");
start_time.elapsed()
})
};
let confirm_fut = tokio::spawn(async move {
let mut metrics = Metric::default();
while !un_confirmed_txs.is_empty() {
let mut to_remove_txs = Vec::new();
for (sig, time_elapsed_since_last_confirmed) in un_confirmed_txs.iter_mut() {
let sig = *sig;
if time_elapsed_since_last_confirmed.is_none() {
*time_elapsed_since_last_confirmed = Some(Instant::now())
}
if lite_client.confirm_transaction(&sig).await.unwrap() {
metrics.txs_confirmed += 1;
to_remove_txs.push(sig);
} else if time_elapsed_since_last_confirmed.unwrap().elapsed()
> Duration::from_secs(3)
{
metrics.txs_un_confirmed += 1;
to_remove_txs.push(sig);
}
}
for to_remove_tx in to_remove_txs {
un_confirmed_txs.remove(&to_remove_tx);
}
}
metrics.total_time_elapsed_sec = start_time.elapsed().as_secs_f64();
metrics.txs_sent = tx_count as u64;
metrics
});
let (send_fut, confirm_fut) = tokio::join!(send_fut, confirm_fut);
let time_to_send_txs = send_fut.unwrap();
let mut metrics = confirm_fut.unwrap();
metrics.time_to_send_txs = time_to_send_txs.as_secs_f64();
metrics.calc_tps();
metrics
}


@ -2,8 +2,9 @@ use std::ops::{AddAssign, DivAssign};
#[derive(Debug, Default, serde::Serialize)]
pub struct Metric {
pub time_elapsed_sec: f64,
pub total_time_elapsed_sec: f64,
pub txs_sent: u64,
pub time_to_send_txs: f64,
pub txs_confirmed: u64,
pub txs_un_confirmed: u64,
pub tps: f64,
@ -17,14 +18,15 @@ pub struct AvgMetric {
impl Metric {
pub fn calc_tps(&mut self) {
self.tps = self.txs_confirmed as f64 / self.time_elapsed_sec
self.tps = self.txs_confirmed as f64 / self.total_time_elapsed_sec
}
}
impl AddAssign<&Self> for Metric {
fn add_assign(&mut self, rhs: &Self) {
self.time_elapsed_sec += rhs.time_elapsed_sec;
self.total_time_elapsed_sec += rhs.total_time_elapsed_sec;
self.txs_sent += rhs.txs_sent;
self.time_to_send_txs += rhs.time_to_send_txs;
self.txs_confirmed += rhs.txs_confirmed;
self.txs_un_confirmed += rhs.txs_un_confirmed;
self.tps += rhs.tps
@ -33,8 +35,9 @@ impl AddAssign<&Self> for Metric {
impl DivAssign<u64> for Metric {
fn div_assign(&mut self, rhs: u64) {
self.time_elapsed_sec /= rhs as f64;
self.total_time_elapsed_sec /= rhs as f64;
self.txs_sent /= rhs;
self.time_to_send_txs /= rhs as f64;
self.txs_confirmed /= rhs;
self.txs_un_confirmed /= rhs;
self.tps /= rhs as f64;


@ -4,21 +4,21 @@ import * as splToken from "@solana/spl-token";
import * as os from 'os';
// number of users
const tps : number = +process.argv[2];
const forSeconds : number = +process.argv[3];
const tps: number = +process.argv[2];
const forSeconds: number = +process.argv[3];
// url
const url = process.argv.length > 4 ? process.argv[4] : "http://localhost:8899";
const skip_confirmations = process.argv.length > 5 ? process.argv[5] === "true": false;
const skip_confirmations = process.argv.length > 5 ? process.argv[5] === "true" : false;
import * as InFile from "./out.json";
function sleep(ms: number) {
return new Promise( resolve => setTimeout(resolve, ms) );
return new Promise(resolve => setTimeout(resolve, ms));
}
console.log("benching " + tps + " transactions per second on " + url + " for " + forSeconds + " seconds");
function delay(ms: number) {
return new Promise( resolve => setTimeout(resolve, ms) );
return new Promise(resolve => setTimeout(resolve, ms));
}
export async function main() {
@ -26,45 +26,41 @@ export async function main() {
const connection = new Connection(url, 'confirmed');
const authority = Keypair.fromSecretKey(
Uint8Array.from(
JSON.parse(
process.env.KEYPAIR ||
JSON.parse(
process.env.KEYPAIR ||
fs.readFileSync(os.homedir() + '/.config/solana/id.json', 'utf-8'),
),
),
),
);
);
const users = InFile.users.map(x => Keypair.fromSecretKey(Uint8Array.from(x.secretKey)));
const userAccounts = InFile.tokenAccounts.map(x => new PublicKey(x));
let signatures_to_unpack : TransactionSignature[][] = [];
let signatures_to_unpack: TransactionSignature[][] = [];
let time_taken_to_send = [];
for (let i = 0; i<forSeconds; ++i)
{
for (let i = 0; i < forSeconds; ++i) {
const start = performance.now();
let signatures : TransactionSignature[] = [];
let signatures: TransactionSignature[] = [];
const blockhash = (await connection.getLatestBlockhash()).blockhash;
for (let j=0; j<tps; ++j)
{
for (let j = 0; j < tps; ++j) {
const toIndex = Math.floor(Math.random() * users.length);
let fromIndex = toIndex;
while (fromIndex === toIndex)
{
while (fromIndex === toIndex) {
fromIndex = Math.floor(Math.random() * users.length);
}
const userFrom = userAccounts[fromIndex];
const userTo = userAccounts[toIndex];
if(skip_confirmations === false) {
if (skip_confirmations === false) {
const transaction = new Transaction().add(
splToken.createTransferInstruction(userFrom, userTo, users[fromIndex].publicKey, Math.ceil(Math.random() * 100))
);
transaction.recentBlockhash = blockhash;
transaction.feePayer = authority.publicKey;
const p = connection.sendTransaction(transaction, [authority, users[fromIndex]], {skipPreflight: true});
const p = connection.sendTransaction(transaction, [authority, users[fromIndex]], { skipPreflight: true });
signatures.push(await p)
}
}
if (skip_confirmations === false)
{
if (skip_confirmations === false) {
signatures_to_unpack.push(signatures)
}
const end = performance.now();
@ -78,12 +74,11 @@ export async function main() {
console.log('finish sending transactions');
await delay(5000)
console.log('checking for confirmations');
if(skip_confirmations === false) {
if (skip_confirmations === false) {
const size = signatures_to_unpack.length
let successes : Uint32Array = new Uint32Array(size).fill(0);
let failures : Uint32Array = new Uint32Array(size).fill(0);
for (let i=0; i< size; ++i)
{
let successes: Uint32Array = new Uint32Array(size).fill(0);
let failures: Uint32Array = new Uint32Array(size).fill(0);
for (let i = 0; i < size; ++i) {
const signatures = signatures_to_unpack[i];
for (const signature of signatures) {
const confirmed = await connection.getSignatureStatus(signature);
@ -93,9 +88,9 @@ export async function main() {
failures[i]++;
}
}
}
console.log("sucesses : " + successes)
console.log("sucesses : " + successes)
console.log("failures : " + failures)
//console.log("time taken to send : " + time_taken_to_send)
}
@ -105,4 +100,4 @@ main().then(x => {
console.log('finished sucessfully')
}).catch(e => {
console.log('caught an error : ' + e)
})
})


@ -1,37 +1,43 @@
import { Connection, Keypair, LAMPORTS_PER_SOL, PublicKey } from '@solana/web3.js';
import { Connection, Keypair } from '@solana/web3.js';
import * as fs from 'fs';
import * as splToken from "@solana/spl-token";
import * as os from 'os';
// number of users
const nbUsers = +process.argv[2];
const nbUsers = process.argv[2];
// url
const url = process.argv.length > 3 ? process.argv[3] : "http://0.0.0.0:8899";
// outfile
const outFile = process.argv.length > 4 ? process.argv[4] : "out.json";
console.log("creating " + nbUsers + " Users on " + url + " out file " + outFile);
export async function main() {
(async () => {
const connection = new Connection(url, 'confirmed');
let authority = Keypair.fromSecretKey(
Uint8Array.from(
JSON.parse(
process.env.KEYPAIR ||
fs.readFileSync(os.homedir() + '/.config/solana/id.json', 'utf-8'),
),
),
);
let userKps = [...Array(nbUsers)].map(_x => Keypair.generate())
let mint = await splToken.createMint(
const authority = Keypair.fromSecretKey(
Uint8Array.from(
JSON.parse(
process.env.KEYPAIR ||
fs.readFileSync(os.homedir() + '/.config/solana/id.json', 'utf-8'),
),
),
);
// create n key pairs
const userKps = [...Array(nbUsers)].map(_x => Keypair.generate())
// create and initialize new mint
const mint = await splToken.createMint(
connection,
authority,
authority.publicKey,
null,
6,
);
let accounts = await Promise.all( userKps.map(x => {
// create accounts for each key pair created earlier
const accounts = await Promise.all(userKps.map(x => {
return splToken.createAccount(
connection,
authority,
@ -40,37 +46,34 @@ export async function main() {
)
}));
let res = await Promise.all( accounts.map(x=> {
// mint to accounts
await Promise.all(accounts.map(to => {
return splToken.mintTo(
connection,
authority,
mint,
x,
to,
authority,
1_000_000_000_000,
)
}));
const users = userKps.map(x => {
const info = {};
info['publicKey'] = x.publicKey.toBase58();
info['secretKey'] = Array.from(x.secretKey);
return info;
const users = userKps.map(user => {
return {
publicKey: user.publicKey.toBase58(),
secretKey: Array.from(user.secretKey)
}
});
const data = {
'users' : users,
'tokenAccounts' : accounts,
'mint' : mint,
'minted_amount' : 1_000_000_000_000
'users': users,
'tokenAccounts': accounts,
'mint': mint,
'minted_amount': 1_000_000_000_000
};
console.log('created ' + nbUsers + ' Users and minted 10^12 tokens for mint ' + mint);
fs.writeFileSync(outFile, JSON.stringify(data));
}
main().then(x => {
console.log('finished sucessfully')
}).catch(e => {
console.log('caught an error : ' + e)
})
fs.writeFileSync(outFile, JSON.stringify(data));
})()


@ -1,123 +0,0 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use lite_bench_utils::{
generate_txs,
metrics::{AvgMetric, Metric},
new_funded_payer,
};
use log::info;
use solana_client::{nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction};
use solana_sdk::native_token::LAMPORTS_PER_SOL;
use lite_client::{LiteClient, LOCAL_LIGHT_RPC_ADDR};
use simplelog::*;
use tokio::sync::mpsc;
const NUM_OF_TXS: usize = 10_000;
const NUM_OF_RUNS: usize = 3;
const CSV_FILE_NAME: &str = "metrics.csv";
#[tokio::main]
async fn main() {
TermLogger::init(
LevelFilter::Info,
Config::default(),
TerminalMode::Mixed,
ColorChoice::Auto,
)
.unwrap();
let lite_client = Arc::new(LiteClient(RpcClient::new(LOCAL_LIGHT_RPC_ADDR.to_string())));
let mut csv_writer = csv::Writer::from_path(CSV_FILE_NAME).unwrap();
let mut avg_metric = AvgMetric::default();
for run_num in 0..NUM_OF_RUNS {
let metric = foo(lite_client.clone()).await;
info!("Run {run_num}: Sent and Confirmed {NUM_OF_TXS} tx(s) in {metric:?}",);
avg_metric += &metric;
csv_writer.serialize(metric).unwrap();
}
let avg_metric = Metric::from(avg_metric);
info!("Avg Metric {avg_metric:?}",);
csv_writer.serialize(avg_metric).unwrap();
csv_writer.flush().unwrap();
}
async fn foo(lite_client: Arc<LiteClient>) -> Metric {
let funded_payer = new_funded_payer(&lite_client, LAMPORTS_PER_SOL * 2000)
.await
.unwrap();
let txs = generate_txs(NUM_OF_TXS, &lite_client.0, &funded_payer)
.await
.unwrap();
let mut un_confirmed_txs: HashMap<String, Option<Instant>> = HashMap::with_capacity(txs.len());
for tx in &txs {
un_confirmed_txs.insert(tx.get_signature().to_string(), None);
}
let start_time = Instant::now();
info!("Sending and Confirming {NUM_OF_TXS} tx(s)");
let send_fut = {
let lite_client = lite_client.clone();
tokio::spawn(async move {
for tx in txs {
lite_client.send_transaction(&tx).await.unwrap();
info!("Tx {}", &tx.signatures[0]);
}
info!("Sent {NUM_OF_TXS} tx(s)");
})
};
let (metrics_send, mut metrics_recv) = mpsc::channel(1);
let confirm_fut = tokio::spawn(async move {
let mut metrics = Metric::default();
while !un_confirmed_txs.is_empty() {
let mut to_remove_txs = Vec::new();
for (sig, time_elapsed_since_last_confirmed) in un_confirmed_txs.iter_mut() {
if time_elapsed_since_last_confirmed.is_none() {
*time_elapsed_since_last_confirmed = Some(Instant::now())
}
if lite_client.confirm_transaction(sig.clone()).await.value {
metrics.txs_confirmed += 1;
to_remove_txs.push(sig.clone());
} else if time_elapsed_since_last_confirmed.unwrap().elapsed()
> Duration::from_secs(3)
{
metrics.txs_un_confirmed += 1;
to_remove_txs.push(sig.clone());
}
}
for to_remove_tx in to_remove_txs {
un_confirmed_txs.remove(&to_remove_tx);
}
}
metrics.time_elapsed_sec = start_time.elapsed().as_secs_f64();
metrics.txs_sent = NUM_OF_TXS as u64;
metrics.calc_tps();
metrics_send.send(metrics).await.unwrap();
});
let (res1, res2) = tokio::join!(send_fut, confirm_fut);
res1.unwrap();
res2.unwrap();
metrics_recv.recv().await.unwrap()
}

31
fly.toml Normal file

@ -0,0 +1,31 @@
app = "solana-lite-rpc"
kill_signal = "SIGINT"
kill_timeout = 5
[env]
PORT_HTTP = "8890"
PORT_WS = "8891"
[[services]]
internal_port = 9000
processes = ["app"]
protocol = "tcp"
[services.concurrency]
hard_limit = 1024
soft_limit = 1024
type = "connections"
[[services]]
internal_port = 9001
processes = ["app"]
protocol = "tcp"
[services.concurrency]
hard_limit = 1024
soft_limit = 1024
type = "connections"
# [metrics]
# path = "/metrics"
# port = 9091


@ -1,8 +1,5 @@
/** @type {import('ts-jest').JestConfigWithTsJest} */
module.exports = {
preset: 'ts-jest',
testEnvironment: 'node',
transform: {
'^.+\\.ts?$': 'ts-jest',
},
transformIgnorePatterns: ['<rootDir>/node_modules/'],
preset: 'ts-jest',
testEnvironment: 'node',
};


@ -1,14 +0,0 @@
[package]
name = "lite-bench-utils"
version = "0.1.0"
edition = "2021"
[dependencies]
lite-client = { path ="../lite-client" }
serde = { version = "1.0.149", features = ["derive"] }
serde_json = "1.0.89"
solana-client = { git = "https://github.com/solana-labs/solana.git" }
solana-sdk = { git = "https://github.com/solana-labs/solana.git" }
tokio = "1.14.1"
log = "0.4.17"
anyhow = "1.0.66"


@ -1,8 +0,0 @@
test:
cargo test send_and_confirm_tx -- --nocapture
benchmark:
cargo bench
clean:
cargo clean


@ -1,74 +0,0 @@
pub mod metrics;
use std::thread;
use std::time::Duration;
use lite_client::LiteClient;
use log::info;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::hash::Hash;
use solana_sdk::signature::Signature;
use solana_sdk::{
message::Message, pubkey::Pubkey, signature::Keypair, signer::Signer, system_instruction,
transaction::Transaction,
};
pub async fn new_funded_payer(lite_client: &LiteClient, amount: u64) -> anyhow::Result<Keypair> {
let payer = Keypair::new();
let payer_pubkey = payer.pubkey().to_string();
// request airdrop to payer
let airdrop_sig = lite_client.request_airdrop(&payer.pubkey(), amount).await?;
info!("Air Dropping {payer_pubkey} with {amount}L");
thread::sleep(Duration::from_secs(12));
//loop {
// if let Some(res) = lite_client
// .get_signature_status_with_commitment(&airdrop_sig, CommitmentConfig::finalized())
// .await?
// {
// match res {
// Ok(_) => break,
// Err(_) => bail!("Error air dropping {payer_pubkey}"),
// }
// }
//}
info!("Air Drop Successful: {airdrop_sig}");
Ok(payer)
}
pub async fn wait_till_confirmed(lite_client: &LiteClient, sig: &Signature) {
while lite_client.confirm_transaction(sig.to_string()).await.value {}
}
pub fn create_transaction(funded_payer: &Keypair, blockhash: Hash) -> Transaction {
let to_pubkey = Pubkey::new_unique();
// transfer instruction
let instruction = system_instruction::transfer(&funded_payer.pubkey(), &to_pubkey, 1_000_000);
let message = Message::new(&[instruction], Some(&funded_payer.pubkey()));
Transaction::new(&[funded_payer], message, blockhash)
}
pub async fn generate_txs(
num_of_txs: usize,
rpc_client: &RpcClient,
funded_payer: &Keypair,
) -> anyhow::Result<Vec<Transaction>> {
let mut txs = Vec::with_capacity(num_of_txs);
let blockhash = rpc_client.get_latest_blockhash().await?;
for _ in 0..num_of_txs {
txs.push(create_transaction(funded_payer, blockhash));
}
Ok(txs)
}


@ -1,8 +0,0 @@
[package]
name = "lite-client"
version = "0.1.0"
edition = "2021"
[dependencies]
serde_json = "1.0.89"
solana-client = { git = "https://github.com/solana-labs/solana.git" }


@ -1,37 +0,0 @@
use std::ops::{Deref, DerefMut};
use solana_client::{
nonblocking::rpc_client::RpcClient, rpc_request::RpcRequest,
rpc_response::Response as RpcResponse,
};
pub const LOCAL_LIGHT_RPC_ADDR: &str = "http://0.0.0.0:8890";
pub struct LiteClient(pub RpcClient);
impl Deref for LiteClient {
type Target = RpcClient;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for LiteClient {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl LiteClient {
pub async fn confirm_transaction(&self, signature: String) -> RpcResponse<bool> {
self.send(
RpcRequest::Custom {
method: "confirmTransaction",
},
serde_json::json!([signature]),
)
.await
.unwrap()
}
}


@ -1,5 +0,0 @@
time_elapsed_sec,txs_sent,txs_confirmed,txs_un_confirmed,tps
3.72111197,10000,9836,164,2643.2958963070387
3.808483126,10000,9999,1,2625.4547202108306
4.130764515,10000,9999,1,2420.617288565044
3.8867865370000003,10000,9944,55,2563.122635027638

7241
package-lock.json generated

File diff suppressed because it is too large


@ -1,27 +1,17 @@
{
"name": "lightrpc-test",
"name": "lite-rpc",
"version": "1.0.0",
"repository": "",
"author": "",
"repository": "https://github.com/blockworks-foundation/lite-rpc.git",
"license": "MIT",
"private": true,
"dependencies": {
"@solana/web3.js": "^1.62.0",
"@types/jest": "^29.2.3",
"devDependencies": {
"@solana/spl-token": "^0.3.6",
"@solana/web3.js": "^1.73.0",
"@types/jest": "^29.2.5",
"jest": "^29.3.1",
"start-server-and-test": "^1.14.0",
"ts-jest": "^29.0.3",
"@solana/spl-token": "^0.3.5",
"ts-node": "^10.9.1",
"yarn": "^1.22.19"
"typescript": "^4.9.4"
},
"scripts": {
"test": "jest --detectOpenHandles",
"test:literpc": "start-server-and-test 'target/debug/lite-rpc --port 9000 --subscription-port 9001 --rpc-url http://localhost:8899 --websocket-url ws://localhost:8900/' http://localhost:8899/health test",
"test:validator": "start-server-and-test ' ./scripts/run.sh & target/debug/lite-rpc --port 9000 --subscription-port 9001 --rpc-url http://localhost:8899 --websocket-url ws://localhost:8900/' http://localhost:8899/health test"
},
"devDependencies": {
"typescript": "^4.8.4"
"test": "jest"
}
}


@ -1,123 +0,0 @@
#!/usr/bin/env bash
#
# Run a minimal Solana cluster. Ctrl-C to exit.
#
# Before running this script ensure standard Solana programs are available
# in the PATH, or that `cargo build` ran successfully
#
set -e
# Prefer possible `cargo build` binaries over PATH binaries
script_dir="$(readlink -f "$(dirname "$0")")"
if [[ "$script_dir" =~ /scripts$ ]]; then
cd "$script_dir/.."
else
cd "$script_dir"
fi
profile=debug
if [[ -n $NDEBUG ]]; then
profile=release
fi
PATH=$PWD/target/$profile:$PATH
ok=true
for program in solana-{faucet,genesis,keygen,validator}; do
$program -V || ok=false
done
$ok || {
echo
echo "Unable to locate required programs. Try building them first with:"
echo
echo " $ cargo build --all"
echo
exit 1
}
export RUST_LOG=${RUST_LOG:-solana=info,solana_runtime::message_processor=debug} # if RUST_LOG is unset, default to info
export RUST_BACKTRACE=1
dataDir=$PWD/config/"$(basename "$0" .sh)"
ledgerDir=$PWD/config/ledger
SOLANA_RUN_SH_CLUSTER_TYPE=${SOLANA_RUN_SH_CLUSTER_TYPE:-development}
set -x
if ! solana address; then
echo Generating default keypair
solana-keygen new --no-passphrase
fi
validator_identity="$dataDir/validator-identity.json"
if [[ -e $validator_identity ]]; then
echo "Use existing validator keypair"
else
solana-keygen new --no-passphrase -so "$validator_identity"
fi
validator_vote_account="$dataDir/validator-vote-account.json"
if [[ -e $validator_vote_account ]]; then
echo "Use existing validator vote account keypair"
else
solana-keygen new --no-passphrase -so "$validator_vote_account"
fi
validator_stake_account="$dataDir/validator-stake-account.json"
if [[ -e $validator_stake_account ]]; then
echo "Use existing validator stake account keypair"
else
solana-keygen new --no-passphrase -so "$validator_stake_account"
fi
if [[ -e "$ledgerDir"/genesis.bin || -e "$ledgerDir"/genesis.tar.bz2 ]]; then
echo "Use existing genesis"
else
./fetch-spl.sh
if [[ -r spl-genesis-args.sh ]]; then
SPL_GENESIS_ARGS=$(cat spl-genesis-args.sh)
fi
# shellcheck disable=SC2086
solana-genesis \
--hashes-per-tick sleep \
--faucet-lamports 500000000000000000 \
--bootstrap-validator \
"$validator_identity" \
"$validator_vote_account" \
"$validator_stake_account" \
--ledger "$ledgerDir" \
--cluster-type "$SOLANA_RUN_SH_CLUSTER_TYPE" \
$SPL_GENESIS_ARGS \
$SOLANA_RUN_SH_GENESIS_ARGS
fi
abort() {
set +e
kill "$faucet" "$validator"
wait "$validator"
}
trap abort INT TERM EXIT
solana-faucet &
faucet=$!
args=(
--identity "$validator_identity"
--vote-account "$validator_vote_account"
--ledger "$ledgerDir"
--gossip-port 8001
--full-rpc-api
--rpc-port 8899
--log validator.log
--rpc-faucet-address 127.0.0.1:9900
--enable-rpc-transaction-history
--enable-extended-tx-metadata-storage
--init-complete-file "$dataDir"/init-completed
--snapshot-compression none
--require-tower
--no-wait-for-vote-to-start-leader
--no-os-network-limits-test
--rpc-pubsub-enable-block-subscription
)
# shellcheck disable=SC2086
solana-validator "${args[@]}" $SOLANA_RUN_SH_VALIDATOR_ARGS &
validator=$!
wait "$validator"

337
src/bridge.rs Normal file

@ -0,0 +1,337 @@
use crate::{
configs::{IsBlockHashValidConfig, SendTransactionConfig},
encoding::BinaryEncoding,
rpc::LiteRpcServer,
tpu_manager::TpuManager,
workers::{BlockListener, Cleaner, Metrics, MetricsCapture, TxSender},
};
use std::{ops::Deref, str::FromStr, sync::Arc, time::Duration};
use anyhow::bail;
use log::info;
use jsonrpsee::{server::ServerBuilder, types::SubscriptionResult, SubscriptionSink};
use solana_client::{
nonblocking::{pubsub_client::PubsubClient, rpc_client::RpcClient},
rpc_client::SerializableTransaction,
rpc_config::{RpcContextConfig, RpcRequestAirdropConfig},
rpc_response::{Response as RpcResponse, RpcBlockhash, RpcResponseContext, RpcVersionInfo},
};
use solana_sdk::{
commitment_config::{CommitmentConfig, CommitmentLevel},
hash::Hash,
pubkey::Pubkey,
transaction::VersionedTransaction,
};
use solana_transaction_status::TransactionStatus;
use tokio::{net::ToSocketAddrs, task::JoinHandle};
/// A bridge between clients and tpu
#[derive(Clone)]
pub struct LiteBridge {
pub rpc_client: Arc<RpcClient>,
pub tpu_manager: Arc<TpuManager>,
pub tx_sender: TxSender,
pub finalized_block_listenser: BlockListener,
pub confirmed_block_listenser: BlockListener,
pub metrics_capture: MetricsCapture,
}
impl LiteBridge {
pub async fn new(rpc_url: String, ws_addr: String, fanout_slots: u64) -> anyhow::Result<Self> {
let rpc_client = Arc::new(RpcClient::new(rpc_url.clone()));
let pub_sub_client = Arc::new(PubsubClient::new(&ws_addr).await?);
let tpu_manager =
Arc::new(TpuManager::new(rpc_client.clone(), ws_addr, fanout_slots).await?);
let tx_sender = TxSender::new(tpu_manager.clone());
let finalized_block_listenser = BlockListener::new(
pub_sub_client.clone(),
rpc_client.clone(),
tx_sender.clone(),
CommitmentConfig::finalized(),
)
.await?;
let confirmed_block_listenser = BlockListener::new(
pub_sub_client,
rpc_client.clone(),
tx_sender.clone(),
CommitmentConfig::confirmed(),
)
.await?;
let metrics_capture = MetricsCapture::new(tx_sender.clone());
Ok(Self {
rpc_client,
tpu_manager,
tx_sender,
finalized_block_listenser,
confirmed_block_listenser,
metrics_capture,
})
}
pub fn get_block_listner(&self, commitment_config: CommitmentConfig) -> BlockListener {
if let CommitmentLevel::Finalized = commitment_config.commitment {
self.finalized_block_listenser.clone()
} else {
self.confirmed_block_listenser.clone()
}
}
/// Listen for `JsonRpc` requests
pub async fn start_services<T: ToSocketAddrs + std::fmt::Debug + 'static + Send + Clone>(
self,
http_addr: T,
ws_addr: T,
tx_batch_size: usize,
tx_send_interval: Duration,
clean_interval: Duration,
) -> anyhow::Result<[JoinHandle<anyhow::Result<()>>; 7]> {
let finalized_block_listenser = self.finalized_block_listenser.clone().listen();
let confirmed_block_listenser = self.confirmed_block_listenser.clone().listen();
let tx_sender = self
.tx_sender
.clone()
.execute(tx_batch_size, tx_send_interval);
let ws_server_handle = ServerBuilder::default()
.ws_only()
.build(ws_addr.clone())
.await?
.start(self.clone().into_rpc())?;
let http_server_handle = ServerBuilder::default()
.http_only()
.build(http_addr.clone())
.await?
.start(self.clone().into_rpc())?;
let ws_server = tokio::spawn(async move {
info!("Websocket Server started at {ws_addr:?}");
ws_server_handle.stopped().await;
bail!("Websocket server stopped");
});
let http_server = tokio::spawn(async move {
info!("HTTP Server started at {http_addr:?}");
http_server_handle.stopped().await;
bail!("HTTP server stopped");
});
let metrics_capture = self.metrics_capture.capture();
let cleaner = Cleaner::new(
self.tx_sender.clone(),
[
self.finalized_block_listenser.clone(),
self.confirmed_block_listenser.clone(),
],
)
.start(clean_interval);
Ok([
ws_server,
http_server,
tx_sender,
finalized_block_listenser,
confirmed_block_listenser,
metrics_capture,
cleaner,
])
}
}
#[jsonrpsee::core::async_trait]
impl LiteRpcServer for LiteBridge {
async fn send_transaction(
&self,
tx: String,
send_transaction_config: Option<SendTransactionConfig>,
) -> crate::rpc::Result<String> {
let SendTransactionConfig {
encoding,
max_retries: _,
} = send_transaction_config.unwrap_or_default();
let raw_tx = match encoding.decode(tx) {
Ok(raw_tx) => raw_tx,
Err(err) => {
return Err(jsonrpsee::core::Error::Custom(err.to_string()));
}
};
let tx = match bincode::deserialize::<VersionedTransaction>(&raw_tx) {
Ok(tx) => tx,
Err(err) => {
return Err(jsonrpsee::core::Error::Custom(err.to_string()));
}
};
let sig = tx.get_signature();
self.tx_sender.enqnueue_tx(sig.to_string(), raw_tx).await;
Ok(BinaryEncoding::Base58.encode(sig))
}
async fn get_latest_blockhash(
&self,
config: Option<solana_client::rpc_config::RpcContextConfig>,
) -> crate::rpc::Result<RpcResponse<solana_client::rpc_response::RpcBlockhash>> {
let commitment_config = if let Some(RpcContextConfig { commitment, .. }) = config {
commitment.unwrap_or_default()
} else {
CommitmentConfig::default()
};
let block_listner = self.get_block_listner(commitment_config);
let (blockhash, last_valid_block_height) = block_listner.get_latest_blockhash().await;
let slot = block_listner.get_slot().await;
Ok(RpcResponse {
context: RpcResponseContext {
slot,
api_version: None,
},
value: RpcBlockhash {
blockhash,
last_valid_block_height,
},
})
}
async fn is_blockhash_valid(
&self,
blockhash: String,
config: Option<IsBlockHashValidConfig>,
) -> crate::rpc::Result<RpcResponse<bool>> {
let commitment = config.unwrap_or_default().commitment.unwrap_or_default();
let commitment = CommitmentConfig { commitment };
let blockhash = match Hash::from_str(&blockhash) {
Ok(blockhash) => blockhash,
Err(err) => {
return Err(jsonrpsee::core::Error::Custom(err.to_string()));
}
};
let block_listner = self.get_block_listner(commitment);
let is_valid = match self
.rpc_client
.is_blockhash_valid(&blockhash, commitment)
.await
{
Ok(is_valid) => is_valid,
Err(err) => {
return Err(jsonrpsee::core::Error::Custom(err.to_string()));
}
};
let slot = block_listner.get_slot().await;
Ok(RpcResponse {
context: RpcResponseContext {
slot,
api_version: None,
},
value: is_valid,
})
}
async fn get_signature_statuses(
&self,
sigs: Vec<String>,
_config: Option<solana_client::rpc_config::RpcSignatureStatusConfig>,
) -> crate::rpc::Result<RpcResponse<Vec<Option<TransactionStatus>>>> {
let sig_statuses = sigs
.iter()
.map(|sig| {
self.tx_sender
.txs_sent
.get(sig)
.and_then(|v| v.status.clone())
})
.collect();
Ok(RpcResponse {
context: RpcResponseContext {
slot: self.finalized_block_listenser.get_slot().await,
api_version: None,
},
value: sig_statuses,
})
}
fn get_version(&self) -> crate::rpc::Result<RpcVersionInfo> {
let version = solana_version::Version::default();
Ok(RpcVersionInfo {
solana_core: version.to_string(),
feature_set: Some(version.feature_set),
})
}
async fn request_airdrop(
&self,
pubkey_str: String,
lamports: u64,
config: Option<RpcRequestAirdropConfig>,
) -> crate::rpc::Result<String> {
let pubkey = match Pubkey::from_str(&pubkey_str) {
Ok(pubkey) => pubkey,
Err(err) => {
return Err(jsonrpsee::core::Error::Custom(err.to_string()));
}
};
let airdrop_sig = match self
.rpc_client
.request_airdrop_with_config(&pubkey, lamports, config.unwrap_or_default())
.await
{
Ok(airdrop_sig) => airdrop_sig.to_string(),
Err(err) => {
return Err(jsonrpsee::core::Error::Custom(err.to_string()));
}
};
self.tx_sender
.txs_sent
.insert(airdrop_sig.clone(), Default::default());
Ok(airdrop_sig)
}
async fn get_metrics(&self) -> crate::rpc::Result<Metrics> {
return Ok(self.metrics_capture.get_metrics().await);
}
fn signature_subscribe(
&self,
mut sink: SubscriptionSink,
signature: String,
commitment_config: CommitmentConfig,
) -> SubscriptionResult {
sink.accept()?;
self.get_block_listner(commitment_config)
.signature_subscribe(signature, sink);
Ok(())
}
}
impl Deref for LiteBridge {
type Target = RpcClient;
fn deref(&self) -> &Self::Target {
&self.rpc_client
}
}


@ -1,44 +1,30 @@
use crate::{
DEFAULT_CLEAN_INTERVAL_MS, DEFAULT_FANOUT_SIZE, DEFAULT_RPC_ADDR, DEFAULT_TX_BATCH_INTERVAL_MS,
DEFAULT_TX_BATCH_SIZE, DEFAULT_WS_ADDR,
};
use clap::Parser;
use solana_cli_config::ConfigInput;
/// Holds the configuration for a single run of the benchmark
#[derive(Parser, Debug)]
#[command(
version,
about = "A lite version of solana rpc to send and confirm transactions.",
long_about = "Lite rpc is optimized to send and confirm transactions for solana blockchain. \
When it recieves a transaction it will directly send it to next few leaders. It then adds the signature into internal map. It listen to block subscriptions for confirmed and finalized blocks. \
It also has a websocket port for subscription to onSlotChange and onSignature subscriptions. \
"
)]
#[command(author, version, about, long_about = None)]
pub struct Args {
#[arg(short, long, default_value_t = 9000)]
pub port: u16,
#[arg(short, long, default_value_t = 9001)]
pub subscription_port: u16,
#[arg(short, long, default_value_t = String::from("http://localhost:8899"))]
pub rpc_url: String,
#[arg(short, long, default_value_t = String::from("ws://localhost:8900"))]
pub websocket_url: String,
}
impl Args {
pub fn resolve_address(&mut self) {
if self.rpc_url.is_empty() {
let (_, rpc_url) = ConfigInput::compute_json_rpc_url_setting(
self.rpc_url.as_str(),
&ConfigInput::default().json_rpc_url,
);
self.rpc_url = rpc_url;
}
if self.websocket_url.is_empty() {
let (_, ws_url) = ConfigInput::compute_websocket_url_setting(
&self.websocket_url.as_str(),
"",
self.rpc_url.as_str(),
"",
);
self.websocket_url = ws_url;
}
}
#[arg(short, long, default_value_t = String::from(DEFAULT_RPC_ADDR))]
pub rpc_addr: String,
#[arg(short, long, default_value_t = String::from(DEFAULT_WS_ADDR))]
pub ws_addr: String,
#[arg(short = 'l', long, default_value_t = String::from("[::]:8890"))]
pub lite_rpc_http_addr: String,
#[arg(short = 's', long, default_value_t = String::from("[::]:8891"))]
pub lite_rpc_ws_addr: String,
/// batch size of each batch forward
#[arg(short = 'b', long, default_value_t = DEFAULT_TX_BATCH_SIZE)]
pub tx_batch_size: usize,
/// tpu fanout
#[arg(short = 'f', long, default_value_t = DEFAULT_FANOUT_SIZE) ]
pub fanout_size: u64,
/// interval between each batch forward
#[arg(short = 'i', long, default_value_t = DEFAULT_TX_BATCH_INTERVAL_MS)]
pub tx_batch_interval_ms: u64,
/// interval between clean
#[arg(short = 'c', long, default_value_t = DEFAULT_CLEAN_INTERVAL_MS)]
pub clean_interval_ms: u64,
}

23
src/configs.rs Normal file

@ -0,0 +1,23 @@
use crate::encoding::BinaryEncoding;
use serde::{Deserialize, Serialize};
use solana_sdk::commitment_config::CommitmentLevel;
#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SendTransactionConfig {
// #[serde(default)]
// pub skip_preflight: bool,
// #[serde(default)]
// pub preflight_commitment: CommitmentLevel,
#[serde(default)]
pub encoding: BinaryEncoding,
pub max_retries: Option<u16>,
// pub min_context_slot: Option<Slot>,
}
#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct IsBlockHashValidConfig {
pub commitment: Option<CommitmentLevel>,
// pub minContextSlot: Option<u64>,
}
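As a quick illustration of the `camelCase` (de)serialization, a standalone round-trip sketch; the struct below is a simplified local copy for the example (with `encoding` as a plain `String` rather than the `BinaryEncoding` enum), not the real type:
```rust
use serde::Deserialize;

#[derive(Debug, Default, Deserialize)]
#[serde(rename_all = "camelCase")]
struct SendTransactionConfig {
    #[serde(default)]
    encoding: String, // the real type uses the BinaryEncoding enum
    max_retries: Option<u16>,
}

fn main() {
    // A client-side `sendTransaction` config as it would appear on the wire.
    let cfg: SendTransactionConfig =
        serde_json::from_str(r#"{"encoding":"base64","maxRetries":3}"#).unwrap();
    assert_eq!(cfg.encoding, "base64");
    assert_eq!(cfg.max_retries, Some(3));
}
```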


@ -1,380 +0,0 @@
use crossbeam_channel::Sender;
use dashmap::DashMap;
use serde::Serialize;
use solana_client::{
rpc_client::RpcClient,
rpc_response::{ProcessedSignatureResult, RpcResponseContext, RpcSignatureResult, SlotInfo},
};
use solana_rpc::rpc_subscription_tracker::{
SignatureSubscriptionParams, SubscriptionParams,
};
use solana_sdk::{
commitment_config::{CommitmentConfig, CommitmentLevel},
signature::Signature,
transaction::TransactionError,
};
use std::{
sync::{
atomic::{AtomicU64, Ordering},
Arc, RwLock,
},
thread::{self, Builder, JoinHandle},
time::{Duration, Instant},
};
use tokio::sync::broadcast;
pub struct BlockInformation {
pub block_hash: RwLock<String>,
pub block_height: AtomicU64,
pub slot: AtomicU64,
pub confirmation_level: CommitmentLevel,
}
impl BlockInformation {
pub fn new(rpc_client: Arc<RpcClient>, commitment: CommitmentLevel) -> Self {
let slot = rpc_client
.get_slot_with_commitment(CommitmentConfig { commitment })
.unwrap();
let (blockhash, blockheight) = rpc_client
.get_latest_blockhash_with_commitment(CommitmentConfig { commitment })
.unwrap();
BlockInformation {
block_hash: RwLock::new(blockhash.to_string()),
block_height: AtomicU64::new(blockheight),
slot: AtomicU64::new(slot),
confirmation_level: commitment,
}
}
}
pub struct SignatureStatus {
pub status: Option<CommitmentLevel>,
pub error: Option<TransactionError>,
pub created: Instant,
}
pub struct LiteRpcContext {
pub signature_status: DashMap<String, SignatureStatus>,
pub finalized_block_info: BlockInformation,
pub confirmed_block_info: BlockInformation,
pub notification_sender: Sender<NotificationType>,
}
impl LiteRpcContext {
pub fn new(rpc_client: Arc<RpcClient>, notification_sender: Sender<NotificationType>) -> Self {
LiteRpcContext {
signature_status: DashMap::new(),
confirmed_block_info: BlockInformation::new(
rpc_client.clone(),
CommitmentLevel::Confirmed,
),
finalized_block_info: BlockInformation::new(rpc_client, CommitmentLevel::Finalized),
notification_sender,
}
}
pub fn remove_stale_data(&self, purgetime_in_seconds: u64) {
self.signature_status
.retain(|_k, v| v.created.elapsed().as_secs() < purgetime_in_seconds);
}
}
pub struct SignatureNotification {
pub signature: Signature,
pub commitment: CommitmentLevel,
pub slot: u64,
pub error: Option<String>,
}
pub struct SlotNotification {
pub slot: u64,
pub commitment: CommitmentLevel,
pub parent: u64,
pub root: u64,
}
pub enum NotificationType {
Signature(SignatureNotification),
Slot(SlotNotification),
}
type SubscriptionId = u64;
#[derive(Debug, Serialize)]
struct NotificationParams<T> {
result: T,
subscription: SubscriptionId,
}
#[derive(Debug, Serialize)]
struct Notification<T> {
jsonrpc: Option<jsonrpc_core::Version>,
method: &'static str,
params: NotificationParams<T>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct Response<T> {
pub context: RpcResponseContext,
pub value: T,
}
#[derive(Debug, Clone, PartialEq)]
struct RpcNotificationResponse<T> {
context: RpcNotificationContext,
value: T,
}
impl<T> From<RpcNotificationResponse<T>> for Response<T> {
fn from(notification: RpcNotificationResponse<T>) -> Self {
let RpcNotificationResponse {
context: RpcNotificationContext { slot },
value,
} = notification;
Self {
context: RpcResponseContext {
slot,
api_version: None,
},
value,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct RpcNotificationContext {
slot: u64,
}
#[derive(Debug, Clone)]
pub struct LiteRpcNotification {
pub subscription_id: SubscriptionId,
pub is_final: bool,
pub json: String,
pub created_at: Instant,
}
pub struct LiteRpcSubsrciptionControl {
pub broadcast_sender: broadcast::Sender<LiteRpcNotification>,
notification_reciever: crossbeam_channel::Receiver<NotificationType>,
pub subscriptions: DashMap<SubscriptionParams, SubscriptionId>,
pub last_subscription_id: AtomicU64,
}
impl LiteRpcSubsrciptionControl {
pub fn new(
broadcast_sender: broadcast::Sender<LiteRpcNotification>,
notification_reciever: crossbeam_channel::Receiver<NotificationType>,
) -> Self {
Self {
broadcast_sender,
notification_reciever,
subscriptions: DashMap::new(),
last_subscription_id: AtomicU64::new(2),
}
}
pub fn start_broadcasting(&self) {
loop {
let notification = self.notification_reciever.recv();
match notification {
Ok(notification_type) => {
let rpc_notification = match notification_type {
NotificationType::Signature(data) => {
let signature_params = SignatureSubscriptionParams {
commitment: CommitmentConfig {
commitment: data.commitment,
},
signature: data.signature,
enable_received_notification: false,
};
let param = SubscriptionParams::Signature(signature_params);
match self.subscriptions.entry(param) {
dashmap::mapref::entry::Entry::Occupied(x) => {
let subscription_id = *x.get();
let slot = data.slot;
let value = Response::from(RpcNotificationResponse {
context: RpcNotificationContext { slot },
value: RpcSignatureResult::ProcessedSignature(
ProcessedSignatureResult { err: None },
),
});
let notification = Notification {
jsonrpc: Some(jsonrpc_core::Version::V2),
method: &"signatureNotification",
params: NotificationParams {
result: value,
subscription: subscription_id,
},
};
let json = serde_json::to_string(&notification).unwrap();
let subscription_id = *x.get();
// no more notification for this signature has been finalized
if data.commitment.eq(&CommitmentLevel::Finalized) {
x.remove();
}
Some(LiteRpcNotification {
subscription_id,
created_at: Instant::now(),
is_final: false,
json,
})
}
dashmap::mapref::entry::Entry::Vacant(_x) => None,
}
}
NotificationType::Slot(data) => {
// SubscriptionId 0 will be used for slots
let subscription_id = if data.commitment == CommitmentLevel::Confirmed {
0
} else {
1
};
let value = SlotInfo {
parent: data.parent,
slot: data.slot,
root: data.root,
};
let notification = Notification {
jsonrpc: Some(jsonrpc_core::Version::V2),
method: &"slotNotification",
params: NotificationParams {
result: value,
subscription: subscription_id,
},
};
let json = serde_json::to_string(&notification).unwrap();
Some(LiteRpcNotification {
subscription_id: subscription_id,
created_at: Instant::now(),
is_final: false,
json,
})
}
};
if let Some(rpc_notification) = rpc_notification {
self.broadcast_sender.send(rpc_notification).unwrap();
}
}
Err(_e) => {
break;
}
}
}
}
}
#[derive(Clone)]
pub struct PerformanceCounter {
pub total_finalized: Arc<AtomicU64>,
pub total_confirmations: Arc<AtomicU64>,
pub total_transactions_sent: Arc<AtomicU64>,
pub transaction_sent_error: Arc<AtomicU64>,
pub total_transactions_recieved: Arc<AtomicU64>,
last_count_for_finalized: Arc<AtomicU64>,
last_count_for_confirmations: Arc<AtomicU64>,
last_count_for_transactions_sent: Arc<AtomicU64>,
last_count_for_sent_errors: Arc<AtomicU64>,
last_count_for_transactions_recieved: Arc<AtomicU64>,
}
pub struct PerformancePerSec {
pub finalized_per_seconds: u64,
pub confirmations_per_seconds: u64,
pub transactions_per_seconds: u64,
pub send_transactions_errors_per_seconds: u64,
pub transaction_recieved_per_second: u64,
}
impl PerformanceCounter {
pub fn new() -> Self {
Self {
total_finalized: Arc::new(AtomicU64::new(0)),
total_confirmations: Arc::new(AtomicU64::new(0)),
total_transactions_sent: Arc::new(AtomicU64::new(0)),
total_transactions_recieved: Arc::new(AtomicU64::new(0)),
transaction_sent_error: Arc::new(AtomicU64::new(0)),
last_count_for_finalized: Arc::new(AtomicU64::new(0)),
last_count_for_confirmations: Arc::new(AtomicU64::new(0)),
last_count_for_transactions_sent: Arc::new(AtomicU64::new(0)),
last_count_for_transactions_recieved: Arc::new(AtomicU64::new(0)),
last_count_for_sent_errors: Arc::new(AtomicU64::new(0)),
}
}
pub fn update_per_seconds_transactions(&self) -> PerformancePerSec {
let total_finalized: u64 = self.total_finalized.load(Ordering::Relaxed);
let total_confirmations: u64 = self.total_confirmations.load(Ordering::Relaxed);
let total_transactions: u64 = self.total_transactions_sent.load(Ordering::Relaxed);
let total_errors: u64 = self.transaction_sent_error.load(Ordering::Relaxed);
let total_transactions_recieved: u64 =
self.total_transactions_recieved.load(Ordering::Relaxed);
let finalized_per_seconds = total_finalized
- self
.last_count_for_finalized
.swap(total_finalized, Ordering::Relaxed);
let confirmations_per_seconds = total_confirmations
- self
.last_count_for_confirmations
.swap(total_confirmations, Ordering::Relaxed);
let transactions_per_seconds = total_transactions
- self
.last_count_for_transactions_sent
.swap(total_transactions, Ordering::Relaxed);
let send_transactions_errors_per_seconds = total_errors
- self
.last_count_for_sent_errors
.swap(total_errors, Ordering::Relaxed);
let transaction_recieved_per_second = total_transactions_recieved
- self
.last_count_for_transactions_recieved
.swap(total_transactions_recieved, Ordering::Relaxed);
PerformancePerSec {
confirmations_per_seconds,
finalized_per_seconds,
send_transactions_errors_per_seconds,
transaction_recieved_per_second,
transactions_per_seconds,
}
}
}
const PRINT_COUNTERS : bool = true;
pub fn launch_performance_updating_thread(
performance_counter: PerformanceCounter,
) -> JoinHandle<()> {
Builder::new()
.name("Performance Counter".to_string())
.spawn(move || {
let mut nb_seconds: u64 = 0;
loop {
let start = Instant::now();
let wait_time = Duration::from_millis(1000);
let performance_counter = performance_counter.clone();
let data = performance_counter.update_per_seconds_transactions();
if PRINT_COUNTERS {
println!(
"At {} second, Recieved {}, Sent {} transactions, finalized {} and confirmed {} transactions",
nb_seconds, data.transaction_recieved_per_second, data.transactions_per_seconds, data.finalized_per_seconds, data.confirmations_per_seconds
);
}
let runtime = start.elapsed();
nb_seconds += 1;
if let Some(remaining) = wait_time.checked_sub(runtime) {
thread::sleep(remaining);
}
}
})
.unwrap()
}

36
src/encoding.rs Normal file

@ -0,0 +1,36 @@
use base64::Engine;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum BinaryEncoding {
#[default]
Base58,
Base64,
}
#[derive(thiserror::Error, Debug)]
pub enum BinaryCodecError {
#[error("Base58DecodeError {0}")]
Base58DecodeError(#[from] bs58::decode::Error),
#[error("Base58EncodeError {0}")]
Base58EncodeError(#[from] bs58::encode::Error),
#[error("Base64DecodeError {0}")]
Base64DecodeError(#[from] base64::DecodeError),
}
impl BinaryEncoding {
pub fn decode<D: AsRef<[u8]>>(&self, to_decode: D) -> Result<Vec<u8>, BinaryCodecError> {
match self {
Self::Base58 => Ok(bs58::decode(to_decode).into_vec()?),
Self::Base64 => Ok(base64::engine::general_purpose::STANDARD.decode(to_decode)?),
}
}
pub fn encode<E: AsRef<[u8]>>(&self, to_encode: E) -> String {
match self {
Self::Base58 => bs58::encode(to_encode).into_string(),
Self::Base64 => base64::engine::general_purpose::STANDARD.encode(to_encode),
}
}
}

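A minimal round trip through the new `BinaryEncoding` helper, as a sketch; it assumes the module is reachable as `lite_rpc::encoding`, matching the `pub mod encoding;` declared in lib.rs below:

```rust
use lite_rpc::encoding::BinaryEncoding;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let raw = b"hello lite-rpc";

    // Base58 is the default variant; Base64 goes through the same API.
    let b58 = BinaryEncoding::Base58.encode(raw);
    let b64 = BinaryEncoding::Base64.encode(raw);

    // Decode failures surface as BinaryCodecError and propagate with `?`.
    assert_eq!(BinaryEncoding::Base58.decode(&b58)?, raw.to_vec());
    assert_eq!(BinaryEncoding::Base64.decode(&b64)?, raw.to_vec());
    Ok(())
}
```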
17
src/errors.rs Normal file
View File

@ -0,0 +1,17 @@
use solana_sdk::{signature::ParseSignatureError, transport::TransportError};
use crate::encoding::BinaryCodecError;
#[derive(thiserror::Error, Debug)]
pub enum JsonRpcError {
#[error("TransportError {0}")]
TransportError(#[from] TransportError),
#[error("BinaryCodecError {0}")]
BinaryCodecError(#[from] BinaryCodecError),
#[error("BincodeDeserializeError {0}")]
BincodeDeserializeError(#[from] bincode::Error),
#[error("SerdeError {0}")]
SerdeError(#[from] serde_json::Error),
#[error("ParseSignatureError {0}")]
ParseSignatureError(#[from] ParseSignatureError),
}

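A sketch of how the `#[from]` conversions compose in practice: both base58 decoding and bincode deserialization bubble up with `?` into a single `JsonRpcError`. The `decode_tx` helper below is hypothetical, not part of the commit:

```rust
use lite_rpc::{encoding::BinaryEncoding, errors::JsonRpcError};
use solana_sdk::transaction::Transaction;

// Hypothetical helper: the #[from] impls turn both failure types
// into JsonRpcError without explicit map_err calls.
fn decode_tx(encoded: &str) -> Result<Transaction, JsonRpcError> {
    let bytes = BinaryEncoding::Base58.decode(encoded)?; // BinaryCodecError -> JsonRpcError
    let tx: Transaction = bincode::deserialize(&bytes)?; // bincode::Error -> JsonRpcError
    Ok(tx)
}
```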
32
src/lib.rs Normal file
View File

@ -0,0 +1,32 @@
use const_env::from_env;
use solana_transaction_status::TransactionConfirmationStatus;
pub mod bridge;
pub mod cli;
pub mod configs;
pub mod encoding;
pub mod errors;
pub mod rpc;
pub mod tpu_manager;
pub mod workers;
#[from_env]
pub const DEFAULT_RPC_ADDR: &str = "http://0.0.0.0:8899";
#[from_env]
pub const DEFAULT_LITE_RPC_ADDR: &str = "http://0.0.0.0:8890";
#[from_env]
pub const DEFAULT_WS_ADDR: &str = "ws://0.0.0.0:8900";
#[from_env]
pub const DEFAULT_TX_MAX_RETRIES: u16 = 1;
#[from_env]
pub const DEFAULT_TX_BATCH_SIZE: usize = 1 << 7;
#[from_env]
pub const DEFAULT_FANOUT_SIZE: u64 = 32;
#[from_env]
pub const DEFAULT_TX_BATCH_INTERVAL_MS: u64 = 1;
#[from_env]
pub const DEFAULT_CLEAN_INTERVAL_MS: u64 = 5 * 60 * 1000; // five minutes
#[from_env]
pub const DEFAULT_TX_SENT_TTL_S: u64 = 12;
pub const DEFAULT_TRANSACTION_CONFIRMATION_STATUS: TransactionConfirmationStatus =
TransactionConfirmationStatus::Finalized;

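If I read the `const_env` crate correctly, `#[from_env]` replaces each default with the value of an identically named environment variable at compile time, so these constants act as build-time knobs rather than runtime flags. A small consumption sketch; the override shown in the first comment is an assumption about how `const_env` is driven:

```rust
// Assumed build-time override: DEFAULT_TX_BATCH_SIZE=256 cargo build --release
use std::time::Duration;

use lite_rpc::{DEFAULT_CLEAN_INTERVAL_MS, DEFAULT_TX_BATCH_INTERVAL_MS, DEFAULT_TX_BATCH_SIZE};

fn main() {
    // Whatever values were baked in at build time are plain consts at runtime.
    let batch_interval = Duration::from_millis(DEFAULT_TX_BATCH_INTERVAL_MS);
    let clean_interval = Duration::from_millis(DEFAULT_CLEAN_INTERVAL_MS);
    println!("batching {DEFAULT_TX_BATCH_SIZE} txs every {batch_interval:?}, cleaning every {clean_interval:?}");
}
```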
View File

@ -1,150 +1,49 @@
mod cli;
mod context;
mod pubsub;
mod rpc;
use std::{net::SocketAddr, sync::Arc, thread::sleep};
use std::time::Duration;
use anyhow::Context;
use clap::Parser;
use context::LiteRpcSubsrciptionControl;
use jsonrpc_core::MetaIoHandler;
use jsonrpc_http_server::{hyper, AccessControlAllowOrigin, DomainsValidation, ServerBuilder};
use pubsub::LitePubSubService;
use solana_cli_config::ConfigInput;
use solana_perf::thread::renice_this_thread;
use tokio::sync::broadcast;
use lite_rpc::{bridge::LiteBridge, cli::Args};
use crate::{
context::{launch_performance_updating_thread, PerformanceCounter},
rpc::{
lite_rpc::{self, Lite},
LightRpcRequestProcessor,
},
};
use cli::Args;
fn run(port: u16, subscription_port: u16, rpc_url: String, websocket_url: String) {
let rpc_url = if rpc_url.is_empty() {
let (_, rpc_url) = ConfigInput::compute_json_rpc_url_setting(
rpc_url.as_str(),
&ConfigInput::default().json_rpc_url,
);
rpc_url
} else {
rpc_url
};
let websocket_url = if websocket_url.is_empty() {
let (_, ws_url) = ConfigInput::compute_websocket_url_setting(
&websocket_url.as_str(),
"",
rpc_url.as_str(),
"",
);
ws_url
} else {
websocket_url
};
println!(
"Using rpc server {} and ws server {}",
rpc_url, websocket_url
);
let performance_counter = PerformanceCounter::new();
launch_performance_updating_thread(performance_counter.clone());
let (broadcast_sender, _broadcast_receiver) = broadcast::channel(10000);
let (notification_sender, notification_reciever) = crossbeam_channel::unbounded();
let pubsub_control = Arc::new(LiteRpcSubsrciptionControl::new(
broadcast_sender,
notification_reciever,
));
let subscription_port = format!("0.0.0.0:{}", subscription_port)
.parse::<SocketAddr>()
.expect("Invalid subscription port");
// start websocket server
let (_trigger, websocket_service) = LitePubSubService::new(
pubsub_control.clone(),
subscription_port,
performance_counter.clone(),
);
let _broadcast_thread = {
// build broadcasting thread
let pubsub_control = pubsub_control.clone();
std::thread::Builder::new()
.name("broadcasting thread".to_string())
.spawn(move || {
pubsub_control.start_broadcasting();
})
.unwrap()
};
let mut io = MetaIoHandler::default();
let lite_rpc = lite_rpc::LightRpc;
io.extend_with(lite_rpc.to_delegate());
let mut request_processor = LightRpcRequestProcessor::new(
rpc_url.as_str(),
&websocket_url,
notification_sender,
performance_counter.clone(),
);
let _cleaning_thread = {
// build cleaning thread
let context = request_processor.context.clone();
std::thread::Builder::new()
.name("cleaning thread".to_string())
.spawn(move || {
context.remove_stale_data(60 * 10);
sleep(std::time::Duration::from_secs(60))
})
.unwrap()
};
let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread()
.worker_threads(64)
.on_thread_start(move || renice_this_thread(0).unwrap())
.thread_name("solLiteRpcProcessor")
.enable_all()
.build()
.expect("Runtime"),
);
let max_request_body_size: usize = 50 * (1 << 10);
let socket_addr = format!("0.0.0.0:{}", port).parse::<SocketAddr>().unwrap();
{
let request_processor = request_processor.clone();
let server =
ServerBuilder::with_meta_extractor(io, move |_req: &hyper::Request<hyper::Body>| {
request_processor.clone()
})
.event_loop_executor(runtime.handle().clone())
.threads(1)
.cors(DomainsValidation::AllowOnly(vec![
AccessControlAllowOrigin::Any,
]))
.cors_max_age(86400)
.max_request_body_size(max_request_body_size)
.start_http(&socket_addr);
println!("Starting Lite RPC node");
server.unwrap().wait();
}
request_processor.free();
websocket_service.close().unwrap();
}
pub fn main() {
let mut cli_command = Args::parse();
cli_command.resolve_address();
#[tokio::main]
pub async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let Args {
port,
subscription_port,
rpc_url,
websocket_url,
} = cli_command;
run(port, subscription_port, rpc_url, websocket_url)
rpc_addr,
ws_addr,
tx_batch_size,
lite_rpc_ws_addr,
lite_rpc_http_addr,
tx_batch_interval_ms,
clean_interval_ms,
fanout_size,
} = Args::parse();
let tx_batch_interval_ms = Duration::from_millis(tx_batch_interval_ms);
let clean_interval_ms = Duration::from_millis(clean_interval_ms);
let light_bridge = LiteBridge::new(rpc_addr, ws_addr, fanout_size).await?;
let services = light_bridge
.start_services(
lite_rpc_http_addr,
lite_rpc_ws_addr,
tx_batch_size,
tx_batch_interval_ms,
clean_interval_ms,
)
.await?;
let services = futures::future::try_join_all(services);
let ctrl_c_signal = tokio::signal::ctrl_c();
tokio::select! {
services = services => {
services.context("Some services exited unexpectedly")?;
}
_ = ctrl_c_signal => {}
}
Ok(())
}

View File

@ -1,297 +0,0 @@
use jsonrpc_core::{ErrorCode, IoHandler};
use soketto::handshake::{server, Server};
use solana_rpc::rpc_subscription_tracker::{SignatureSubscriptionParams, SubscriptionParams};
use std::{net::SocketAddr, str::FromStr, thread::JoinHandle, collections::{BTreeSet}, sync::{RwLock}};
use stream_cancel::{Trigger, Tripwire};
use tokio::{net::TcpStream, pin, select};
use tokio_util::compat::TokioAsyncReadCompatExt;
use crate::context::{LiteRpcSubsrciptionControl, PerformanceCounter};
use {
jsonrpc_core::{Error, Result},
jsonrpc_derive::rpc,
solana_rpc_client_api::config::*,
solana_sdk::signature::Signature,
std::sync::Arc,
};
type SubscriptionId = u64;
#[rpc]
pub trait LiteRpcPubSub {
// Get notification when signature is verified
// Accepts signature parameter as base-58 encoded string
#[rpc(name = "signatureSubscribe")]
fn signature_subscribe(
&self,
signature_str: String,
config: Option<RpcSignatureSubscribeConfig>,
) -> Result<SubscriptionId>;
// Unsubscribe from signature notification subscription.
#[rpc(name = "signatureUnsubscribe")]
fn signature_unsubscribe(&self, id: SubscriptionId) -> Result<bool>;
// Get notification when slot is encountered
#[rpc(name = "slotSubscribe")]
fn slot_subscribe(&self) -> Result<SubscriptionId>;
// Unsubscribe from slot notification subscription.
#[rpc(name = "slotUnsubscribe")]
fn slot_unsubscribe(&self, id: SubscriptionId) -> Result<bool>;
}
#[derive(Clone)]
pub struct LiteRpcPubSubImpl {
subscription_control: Arc<LiteRpcSubsrciptionControl>,
pub current_subscriptions: Arc<RwLock<BTreeSet<u64>>>,
}
impl LiteRpcPubSubImpl {
pub fn new(subscription_control: Arc<LiteRpcSubsrciptionControl>) -> Self {
Self {
current_subscriptions: Arc::new(RwLock::new(BTreeSet::new())),
subscription_control,
}
}
fn subscribe(&self, params: SubscriptionParams) -> Result<SubscriptionId> {
match self
.subscription_control
.subscriptions
.entry(params.clone())
{
dashmap::mapref::entry::Entry::Occupied(x) => Ok(*x.get()),
dashmap::mapref::entry::Entry::Vacant(x) => {
let new_subscription_id = self
.subscription_control
.last_subscription_id
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let new_subsription_id = SubscriptionId::from(new_subscription_id);
x.insert(new_subsription_id);
let mut lock = self.current_subscriptions.write();
match &mut lock {
Ok(set) => {
set.insert(new_subsription_id);
Ok(new_subsription_id)
},
Err(_) => {
Err(Error::new(jsonrpc_core::ErrorCode::InternalError))
}
}
}
}
}
fn unsubscribe(&self, id: SubscriptionId) -> Result<bool> {
let mut lock = self.current_subscriptions.write();
match &mut lock {
Ok(set) => {
if set.contains(&id) {
set.remove(&id);
return Ok(true)
}
return Ok(false)
},
Err(_) => {
Err(Error::new(jsonrpc_core::ErrorCode::InternalError))
}
}
}
}
fn param<T: FromStr>(param_str: &str, thing: &str) -> Result<T> {
param_str.parse::<T>().map_err(|_e| Error {
code: ErrorCode::InvalidParams,
message: format!("Invalid Request: Invalid {} provided", thing),
data: None,
})
}
impl LiteRpcPubSub for LiteRpcPubSubImpl {
fn signature_subscribe(
&self,
signature_str: String,
config: Option<RpcSignatureSubscribeConfig>,
) -> Result<SubscriptionId> {
let config = config.unwrap_or_default();
let params = SignatureSubscriptionParams {
signature: param::<Signature>(&signature_str, "signature")?,
commitment: config.commitment.unwrap_or_default(),
enable_received_notification: false,
};
let id = self.subscribe(SubscriptionParams::Signature(params));
id
}
fn signature_unsubscribe(&self, id: SubscriptionId) -> Result<bool> {
self.unsubscribe(id)
}
// Get notification when slot is encountered
fn slot_subscribe(&self) -> Result<SubscriptionId> {
let mut lock = self.current_subscriptions.write();
match &mut lock {
Ok(set) => {
set.insert(0);
Ok(0)
},
Err(_) => {
Err(Error::new(jsonrpc_core::ErrorCode::InternalError))
}
}
}
// Unsubscribe from slot notification subscription.
fn slot_unsubscribe(&self, _id: SubscriptionId) -> Result<bool> {
let mut lock = self.current_subscriptions.write();
match &mut lock {
Ok(set) => {
if set.contains(&0) {
set.remove(&0);
return Ok(true)
}
return Ok(false)
},
Err(_) => {
Err(Error::new(jsonrpc_core::ErrorCode::InternalError))
}
}
}
}
pub struct LitePubSubService {
thread_hdl: JoinHandle<()>,
}
#[derive(Debug, thiserror::Error)]
enum HandleError {
#[error("handshake error: {0}")]
Handshake(#[from] soketto::handshake::Error),
#[error("connection error: {0}")]
Connection(#[from] soketto::connection::Error),
#[error("broadcast queue error: {0}")]
Broadcast(#[from] tokio::sync::broadcast::error::RecvError),
}
async fn handle_connection(
socket: TcpStream,
subscription_control: Arc<LiteRpcSubsrciptionControl>,
_performance_counter: PerformanceCounter,
) -> core::result::Result<(), HandleError> {
let mut server = Server::new(socket.compat());
let request = server.receive_request().await?;
let accept = server::Response::Accept {
key: request.key(),
protocol: None,
};
server.send_response(&accept).await?;
let (mut sender, mut receiver) = server.into_builder().finish();
let mut broadcast_receiver = subscription_control.broadcast_sender.subscribe();
let mut json_rpc_handler = IoHandler::new();
let rpc_impl = LiteRpcPubSubImpl::new(subscription_control);
json_rpc_handler.extend_with(rpc_impl.clone().to_delegate());
loop {
let mut data = Vec::new();
// Extra block for dropping `receive_future`.
{
// soketto is not cancel safe, so we have to introduce an inner loop to poll
// `receive_data` to completion.
let receive_future = receiver.receive_data(&mut data);
pin!(receive_future);
loop {
select! {
result = &mut receive_future => match result {
Ok(_) => break,
Err(soketto::connection::Error::Closed) => return Ok(()),
Err(err) => return Err(err.into()),
},
result = broadcast_receiver.recv() => {
if let Ok(x) = result {
if rpc_impl.current_subscriptions.read().unwrap().contains(&x.subscription_id) {
sender.send_text(&x.json).await?;
}
}
},
}
}
}
let data_str = String::from_utf8(data).unwrap();
if let Some(response) = json_rpc_handler.handle_request(data_str.as_str()).await {
sender.send_text(&response).await?;
}
}
}
async fn listen(
listen_address: SocketAddr,
subscription_control: Arc<LiteRpcSubsrciptionControl>,
mut tripwire: Tripwire,
performance_counter: PerformanceCounter,
) -> std::io::Result<()> {
let listener = tokio::net::TcpListener::bind(&listen_address).await?;
loop {
select! {
result = listener.accept() => match result {
Ok((socket, addr)) => {
let subscription_control = subscription_control.clone();
let performance_counter = performance_counter.clone();
tokio::spawn(async move {
let handle = handle_connection(
socket, subscription_control, performance_counter,
);
match handle.await {
Ok(()) => println!("connection closed ({:?})", addr),
Err(err) => println!("connection handler error ({:?}): {}", addr, err),
}
});
},
Err(e) => println!("couldn't accept connection: {:?}", e),
},
_ = &mut tripwire => return Ok(()),
}
}
}
impl LitePubSubService {
pub fn new(
subscription_control: Arc<LiteRpcSubsrciptionControl>,
pubsub_addr: SocketAddr,
performance_counter: PerformanceCounter,
) -> (Trigger, Self) {
let (trigger, tripwire) = Tripwire::new();
let thread_hdl = std::thread::Builder::new()
.name("solRpcPubSub".to_string())
.spawn(move || {
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(128)
.enable_all()
.build()
.expect("runtime creation failed");
if let Err(err) = runtime.block_on(listen(
pubsub_addr,
subscription_control,
tripwire,
performance_counter,
)) {
println!("pubsub service failed: {}", err);
};
})
.expect("thread spawn failed");
(trigger, Self { thread_hdl })
}
pub fn close(self) -> std::thread::Result<()> {
self.join()
}
pub fn join(self) -> std::thread::Result<()> {
self.thread_hdl.join()
}
}

View File

@ -1,778 +1,61 @@
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use solana_client::{
pubsub_client::{BlockSubscription, PubsubClientError},
tpu_client::TpuClientConfig,
use jsonrpsee::proc_macros::rpc;
use solana_client::rpc_config::{
RpcContextConfig, RpcRequestAirdropConfig, RpcSignatureStatusConfig,
};
use solana_pubsub_client::pubsub_client::{PubsubBlockClientSubscription, PubsubClient};
use solana_sdk::transaction::Transaction;
use std::{
str::FromStr,
sync::Mutex,
thread::{Builder, JoinHandle},
time::{Duration, Instant},
use solana_client::rpc_response::{Response as RpcResponse, RpcBlockhash, RpcVersionInfo};
use solana_sdk::commitment_config::CommitmentConfig;
use solana_transaction_status::TransactionStatus;
use crate::{
configs::{IsBlockHashValidConfig, SendTransactionConfig},
workers::Metrics,
};
use crate::context::{
BlockInformation, LiteRpcContext, NotificationType, PerformanceCounter, SignatureNotification,
SignatureStatus, SlotNotification,
};
use crossbeam_channel::Sender;
use {
bincode::config::Options,
crossbeam_channel::Receiver,
jsonrpc_core::{Error, Metadata, Result},
jsonrpc_derive::rpc,
solana_client::connection_cache::ConnectionCache,
solana_client::rpc_client::RpcClient,
solana_client::tpu_client::TpuClient,
solana_perf::packet::PACKET_DATA_SIZE,
solana_rpc_client_api::{
config::*,
response::{Response as RpcResponse, *},
},
solana_sdk::{
commitment_config::{CommitmentConfig, CommitmentLevel},
signature::Signature,
},
solana_transaction_status::{TransactionBinaryEncoding, UiTransactionEncoding},
std::{
any::type_name,
sync::{atomic::Ordering, Arc},
},
};
pub type Result<T> = std::result::Result<T, jsonrpsee::core::Error>;
const TPU_BATCH_SIZE: usize = 8;
#[rpc(server)]
pub trait LiteRpc {
#[method(name = "sendTransaction")]
async fn send_transaction(
&self,
tx: String,
send_transaction_config: Option<SendTransactionConfig>,
) -> Result<String>;
#[derive(Clone)]
pub struct LightRpcRequestProcessor {
pub rpc_client: Arc<RpcClient>,
pub last_valid_block_height: u64,
pub ws_url: String,
pub context: Arc<LiteRpcContext>,
_connection_cache: Arc<ConnectionCache>,
joinables: Arc<Mutex<Vec<JoinHandle<()>>>>,
subscribed_clients: Arc<Mutex<Vec<PubsubBlockClientSubscription>>>,
performance_counter: PerformanceCounter,
tpu_producer_channel: Sender<Transaction>,
}
impl LightRpcRequestProcessor {
pub fn new(
json_rpc_url: &str,
websocket_url: &str,
notification_sender: Sender<NotificationType>,
performance_counter: PerformanceCounter,
) -> LightRpcRequestProcessor {
let rpc_client = Arc::new(RpcClient::new(json_rpc_url));
let context = Arc::new(LiteRpcContext::new(rpc_client.clone(), notification_sender));
let connection_cache = Arc::new(ConnectionCache::default());
println!("ws_url {}", websocket_url);
// subscribe for confirmed_blocks
let (client_confirmed, receiver_confirmed) =
Self::subscribe_block(websocket_url, CommitmentLevel::Confirmed).unwrap();
// subscribe for finalized blocks
let (client_finalized, receiver_finalized) =
Self::subscribe_block(websocket_url, CommitmentLevel::Finalized).unwrap();
let (tpu_producer, tpu_consumer) = crossbeam_channel::bounded(100000);
// create threads to listen for finalized and confirmed blocks
let joinables = vec![
Self::build_thread_to_process_blocks(
receiver_confirmed,
&context,
CommitmentLevel::Confirmed,
performance_counter.clone(),
),
Self::build_thread_to_process_blocks(
receiver_finalized,
&context,
CommitmentLevel::Finalized,
performance_counter.clone(),
),
Self::build_thread_to_process_transactions(
json_rpc_url.to_string(),
websocket_url.to_string(),
&context,
tpu_consumer.clone(),
performance_counter.clone(),
),
];
LightRpcRequestProcessor {
rpc_client,
last_valid_block_height: 0,
ws_url: websocket_url.to_string(),
context,
_connection_cache: connection_cache,
joinables: Arc::new(Mutex::new(joinables)),
subscribed_clients: Arc::new(Mutex::new(vec![client_confirmed, client_finalized])),
performance_counter,
tpu_producer_channel: tpu_producer,
}
}
fn subscribe_block(
websocket_url: &str,
commitment: CommitmentLevel,
) -> std::result::Result<BlockSubscription, PubsubClientError> {
PubsubClient::block_subscribe(
websocket_url,
RpcBlockSubscribeFilter::All,
Some(RpcBlockSubscribeConfig {
commitment: Some(CommitmentConfig { commitment }),
encoding: None,
transaction_details: Some(solana_transaction_status::TransactionDetails::Full),
show_rewards: None,
max_supported_transaction_version: None,
}),
)
}
fn build_thread_to_process_blocks(
reciever: Receiver<RpcResponse<RpcBlockUpdate>>,
context: &Arc<LiteRpcContext>,
commitment: CommitmentLevel,
performance_counters: PerformanceCounter,
) -> JoinHandle<()> {
let context = context.clone();
Builder::new()
.name("thread working on confirmation block".to_string())
.spawn(move || {
let block_info = if commitment.eq(&CommitmentLevel::Finalized) {
&context.confirmed_block_info
} else {
&context.finalized_block_info
};
Self::process_block(
reciever,
&context.signature_status,
commitment,
&context.notification_sender,
block_info,
performance_counters,
);
})
.unwrap()
}
fn build_thread_to_process_transactions(
json_rpc_url: String,
websocket_url: String,
context: &Arc<LiteRpcContext>,
receiver: Receiver<Transaction>,
performance_counters: PerformanceCounter,
) -> JoinHandle<()> {
let context = context.clone();
Builder::new()
.name("thread working on confirmation block".to_string())
.spawn(move || {
let rpc_client =
Arc::new(RpcClient::new(json_rpc_url.to_string()));
let mut connection_cache = Arc::new(ConnectionCache::default());
let tpu_client = TpuClient::new_with_connection_cache(
rpc_client.clone(),
websocket_url.as_str(),
TpuClientConfig::default(), // value for max fanout slots
connection_cache.clone(),
);
let mut tpu_client = Arc::new(tpu_client.unwrap());
let mut consecutive_errors: u8 = 0;
loop {
let recv_res = receiver.recv();
match recv_res {
Ok(transaction) => {
let (fut_res, count) = if TPU_BATCH_SIZE > 1 {
let mut transactions_vec = vec![transaction];
let mut time_remaining = Duration::from_micros(1000);
for _i in 1..TPU_BATCH_SIZE {
let start = std::time::Instant::now();
let another = receiver.recv_timeout(time_remaining);
match another {
Ok(x) => transactions_vec.push(x),
Err(_) => break,
}
match time_remaining.checked_sub(start.elapsed()) {
Some(x) => time_remaining = x,
None => break,
}
}
let count: u64 = transactions_vec.len() as u64;
let slice = transactions_vec.as_slice();
let fut_res = tpu_client.try_send_transaction_batch(slice);
// insert sent transactions into signature status map
transactions_vec.iter().for_each(|x| {
let signature = x.signatures[0].to_string();
context.signature_status.insert(
signature.clone(),
SignatureStatus {
status: None,
error: None,
created: Instant::now(),
},
);
});
(fut_res, count)
} else {
let fut_res = tpu_client.try_send_transaction(&transaction);
let signature = transaction.signatures[0].to_string();
context.signature_status.insert(
signature.clone(),
SignatureStatus {
status: None,
error: None,
created: Instant::now(),
},
);
(fut_res, 1)
};
match fut_res {
Ok(_) => {
consecutive_errors = 0;
performance_counters
.total_transactions_sent
.fetch_add(count, Ordering::Relaxed);
},
Err(e) => {
println!("Got error while sending transaction batch of size {}, error {}", count, e.to_string());
consecutive_errors += 1;
if consecutive_errors > 3 {
connection_cache = Arc::new(ConnectionCache::default());
let new_tpu_client = TpuClient::new_with_connection_cache(
rpc_client.clone(),
websocket_url.as_str(),
TpuClientConfig::default(), // value for max fanout slots
connection_cache.clone(),
);
// reset TPU connection
tpu_client = Arc::new(new_tpu_client.unwrap());
}
performance_counters
.transaction_sent_error
.fetch_add(count, Ordering::Relaxed);
}
};
}
Err(e) => {
println!("got error on tpu channel {}", e.to_string());
break;
}
};
}
}).unwrap()
}
fn process_block(
reciever: Receiver<RpcResponse<RpcBlockUpdate>>,
signature_status: &DashMap<String, SignatureStatus>,
commitment: CommitmentLevel,
notification_sender: &crossbeam_channel::Sender<NotificationType>,
block_information: &BlockInformation,
performance_counters: PerformanceCounter,
) {
loop {
let block_data = reciever.recv();
match block_data {
Ok(data) => {
let block_update = &data.value;
block_information
.slot
.store(block_update.slot, Ordering::Relaxed);
let slot_notification = SlotNotification {
commitment: commitment,
slot: block_update.slot,
parent: 0,
root: 0,
};
if let Err(e) =
notification_sender.send(NotificationType::Slot(slot_notification))
{
println!("Error sending slot notification error : {}", e.to_string());
}
if let Some(block) = &block_update.block {
block_information
.block_height
.store(block.block_height.unwrap(), Ordering::Relaxed);
// context to update blockhash
{
let mut lock = block_information.block_hash.write().unwrap();
*lock = block.blockhash.clone();
}
if let Some(transactions) = &block.transactions {
for transaction in transactions {
let decoded_transaction =
&transaction.transaction.decode().unwrap();
let signature = decoded_transaction.signatures[0].to_string();
match signature_status.entry(signature.clone()) {
dashmap::mapref::entry::Entry::Occupied(mut x) => {
// get signature status
let transaction_error = match &transaction.meta {
Some(x) => x.err.clone(),
None => {
println!("cannot decode transaction error");
None
}
};
let signature_notification = SignatureNotification {
signature: Signature::from_str(signature.as_str())
.unwrap(),
commitment,
slot: block_update.slot,
error: transaction_error.clone().map(|x| x.to_string()),
};
if let Err(e) = notification_sender.send(
NotificationType::Signature(signature_notification),
) {
println!(
"Error sending signature notification error : {}",
e.to_string()
);
}
if commitment.eq(&CommitmentLevel::Finalized) {
performance_counters
.total_finalized
.fetch_add(1, Ordering::Relaxed);
} else {
performance_counters
.total_confirmations
.fetch_add(1, Ordering::Relaxed);
}
x.insert(SignatureStatus {
status: Some(commitment),
error: transaction_error,
created: Instant::now(),
});
}
dashmap::mapref::entry::Entry::Vacant(_x) => {
// do nothing transaction not sent by lite rpc
}
}
}
} else {
println!(
"Cannot get signatures at slot {} block hash {}",
block_update.slot, block.blockhash,
);
}
} else {
println!("Cannot get a block at slot {}", block_update.slot);
}
}
Err(e) => {
println!("Got error when recieving the block ({})", e);
}
}
}
}
pub fn free(&mut self) {
let subscribed_clients = &mut self.subscribed_clients.lock().unwrap();
let len_sc = subscribed_clients.len();
for _i in 0..len_sc {
let mut subscribed_client = subscribed_clients.pop().unwrap();
subscribed_client.send_unsubscribe().unwrap();
subscribed_client.shutdown().unwrap();
}
let joinables = &mut self.joinables.lock().unwrap();
let len = joinables.len();
for _i in 0..len {
joinables.pop().unwrap().join().unwrap();
}
}
}
impl Metadata for LightRpcRequestProcessor {}
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RpcPerformanceCounterResults {
pub transactions_per_seconds: u64,
pub confirmations_per_seconds: u64,
pub total_transactions_count: u64,
pub total_confirmations_count: u64,
pub memory_used: u64,
pub nb_threads: u64,
}
pub mod lite_rpc {
use std::str::FromStr;
use itertools::Itertools;
use solana_sdk::{fee_calculator::FeeCalculator, pubkey::Pubkey, transaction::Transaction};
use solana_transaction_status::{TransactionConfirmationStatus, TransactionStatus};
use super::*;
#[rpc]
pub trait Lite {
type Metadata;
#[rpc(meta, name = "sendTransaction")]
fn send_transaction(
&self,
meta: Self::Metadata,
data: String,
config: Option<RpcSendTransactionConfig>,
) -> Result<String>;
#[rpc(meta, name = "getRecentBlockhash")]
fn get_recent_blockhash(
&self,
meta: Self::Metadata,
commitment: Option<CommitmentConfig>,
) -> Result<RpcResponse<RpcBlockhashFeeCalculator>>;
#[rpc(meta, name = "confirmTransaction")]
fn confirm_transaction(
&self,
meta: Self::Metadata,
signature_str: String,
commitment: Option<CommitmentConfig>,
) -> Result<RpcResponse<bool>>;
#[rpc(meta, name = "requestAirdrop")]
fn request_airdrop(
&self,
meta: Self::Metadata,
pubkey_str: String,
lamports: u64,
config: Option<RpcRequestAirdropConfig>,
) -> Result<String>;
#[rpc(meta, name = "getLatestBlockhash")]
fn get_latest_blockhash(
&self,
meta: Self::Metadata,
config: Option<RpcContextConfig>,
) -> Result<RpcResponse<RpcBlockhash>>;
#[rpc(meta, name = "getSignatureStatuses")]
fn get_signature_statuses(
&self,
meta: Self::Metadata,
signature_strs: Vec<String>,
config: Option<RpcSignatureStatusConfig>,
) -> Result<RpcResponse<Vec<Option<TransactionStatus>>>>;
#[rpc(name = "getVersion")]
fn get_version(&self) -> Result<RpcVersionInfo>;
}
pub struct LightRpc;
impl Lite for LightRpc {
type Metadata = LightRpcRequestProcessor;
fn send_transaction(
&self,
meta: Self::Metadata,
data: String,
config: Option<RpcSendTransactionConfig>,
) -> Result<String> {
let config = config.unwrap_or_default();
let encoding = config.encoding;
let tx_encoding = encoding.unwrap_or(UiTransactionEncoding::Base58);
let binary_encoding = tx_encoding.into_binary_encoding().ok_or_else(|| {
Error::invalid_params(format!(
"unsupported encoding: {}. Supported encodings: base58, base64",
tx_encoding
))
})?;
let transaction = decode_and_deserialize::<Transaction>(data, binary_encoding)?;
let signature = transaction.signatures[0].to_string();
meta.performance_counter
.total_transactions_recieved
.fetch_add(1, Ordering::Relaxed);
match meta.tpu_producer_channel.send(transaction) {
Ok(_) => Ok(signature),
Err(e) => {
println!("got error while sending on channel {}", e.to_string());
Err(jsonrpc_core::Error::new(
jsonrpc_core::ErrorCode::InternalError,
))
}
}
}
fn get_recent_blockhash(
&self,
meta: Self::Metadata,
commitment: Option<CommitmentConfig>,
) -> Result<RpcResponse<RpcBlockhashFeeCalculator>> {
let commitment = match commitment {
Some(x) => x.commitment,
None => CommitmentLevel::Finalized,
};
let (block_hash, slot) = match commitment {
CommitmentLevel::Finalized => {
let slot = meta
.context
.finalized_block_info
.slot
.load(Ordering::Relaxed);
let lock = meta.context.finalized_block_info.block_hash.read().unwrap();
(lock.clone(), slot)
}
_ => {
let slot = meta
.context
.confirmed_block_info
.slot
.load(Ordering::Relaxed);
let lock = meta.context.confirmed_block_info.block_hash.read().unwrap();
(lock.clone(), slot)
}
};
Ok(RpcResponse {
context: RpcResponseContext::new(slot),
value: RpcBlockhashFeeCalculator {
blockhash: block_hash,
fee_calculator: FeeCalculator::default(),
},
})
}
fn get_latest_blockhash(
&self,
meta: Self::Metadata,
config: Option<RpcContextConfig>,
) -> Result<RpcResponse<RpcBlockhash>> {
let commitment = match config {
Some(x) => match x.commitment {
Some(x) => x.commitment,
None => CommitmentLevel::Finalized,
},
None => CommitmentLevel::Finalized,
};
let block_info = match commitment {
CommitmentLevel::Finalized => &meta.context.finalized_block_info,
_ => &meta.context.confirmed_block_info,
};
let slot = block_info.slot.load(Ordering::Relaxed);
let last_valid_block_height = block_info.block_height.load(Ordering::Relaxed);
let blockhash = block_info.block_hash.read().unwrap().clone();
Ok(RpcResponse {
context: RpcResponseContext::new(slot),
value: RpcBlockhash {
blockhash,
last_valid_block_height,
},
})
}
fn confirm_transaction(
&self,
meta: Self::Metadata,
signature_str: String,
commitment_cfg: Option<CommitmentConfig>,
) -> Result<RpcResponse<bool>> {
let singature_status = meta.context.signature_status.get(&signature_str);
let k_value = singature_status;
let commitment = match commitment_cfg {
Some(x) => x.commitment,
None => CommitmentLevel::Confirmed,
};
let slot = if commitment.eq(&CommitmentLevel::Finalized) {
meta.context
.finalized_block_info
.slot
.load(Ordering::Relaxed)
} else {
meta.context
.confirmed_block_info
.slot
.load(Ordering::Relaxed)
};
match k_value {
Some(value) => match value.status {
Some(commitment) => {
let commitment_matches = if commitment.eq(&CommitmentLevel::Finalized) {
commitment.eq(&CommitmentLevel::Finalized)
} else {
commitment.eq(&CommitmentLevel::Finalized)
|| commitment.eq(&CommitmentLevel::Confirmed)
};
Ok(RpcResponse {
context: RpcResponseContext::new(slot),
value: commitment_matches && value.error.is_none(),
})
}
None => Ok(RpcResponse {
context: RpcResponseContext::new(slot),
value: false,
}),
},
None => {
let signature = Signature::from_str(signature_str.as_str()).unwrap();
let ans = match commitment_cfg {
None => meta.rpc_client.confirm_transaction(&signature).unwrap(),
Some(cfg) => {
meta.rpc_client
.confirm_transaction_with_commitment(&signature, cfg)
.unwrap()
.value
}
};
Ok(RpcResponse {
context: RpcResponseContext::new(slot),
value: ans,
})
}
}
}
fn get_signature_statuses(
&self,
meta: Self::Metadata,
signature_strs: Vec<String>,
_config: Option<RpcSignatureStatusConfig>,
) -> Result<RpcResponse<Vec<Option<TransactionStatus>>>> {
let confirmed_slot = meta
.context
.confirmed_block_info
.slot
.load(Ordering::Relaxed);
let status = signature_strs
.iter()
.map(|x| {
let singature_status = meta.context.signature_status.get(x);
let k_value = singature_status;
match k_value {
Some(value) => match value.status {
Some(commitment_level) => {
let slot = meta
.context
.confirmed_block_info
.slot
.load(Ordering::Relaxed);
let status = match commitment_level {
CommitmentLevel::Finalized => {
TransactionConfirmationStatus::Finalized
}
_ => TransactionConfirmationStatus::Confirmed,
};
Some(TransactionStatus {
slot,
confirmations: None,
status: Ok(()),
err: value.error.clone(),
confirmation_status: Some(status),
})
}
None => None,
},
None => None,
}
})
.collect_vec();
Ok(RpcResponse {
context: RpcResponseContext::new(confirmed_slot),
value: status,
})
}
fn request_airdrop(
&self,
meta: Self::Metadata,
pubkey_str: String,
lamports: u64,
config: Option<RpcRequestAirdropConfig>,
) -> Result<String> {
let pubkey = Pubkey::from_str(pubkey_str.as_str()).unwrap();
let signature = match config {
Some(c) => meta
.rpc_client
.request_airdrop_with_config(&pubkey, lamports, c),
None => meta.rpc_client.request_airdrop(&pubkey, lamports),
};
Ok(signature.unwrap().to_string())
}
fn get_version(&self) -> Result<RpcVersionInfo> {
let version = solana_version::Version::default();
Ok(RpcVersionInfo {
solana_core: version.to_string(),
feature_set: Some(version.feature_set),
})
}
}
}
const MAX_BASE58_SIZE: usize = 1683; // Golden, bump if PACKET_DATA_SIZE changes
const MAX_BASE64_SIZE: usize = 1644; // Golden, bump if PACKET_DATA_SIZE changes
fn decode_and_deserialize<T>(encoded: String, encoding: TransactionBinaryEncoding) -> Result<T>
where
T: serde::de::DeserializeOwned,
{
let wire_output = match encoding {
TransactionBinaryEncoding::Base58 => {
if encoded.len() > MAX_BASE58_SIZE {
return Err(Error::invalid_params(format!(
"base58 encoded {} too large: {} bytes (max: encoded/raw {}/{})",
type_name::<T>(),
encoded.len(),
MAX_BASE58_SIZE,
PACKET_DATA_SIZE,
)));
}
bs58::decode(encoded)
.into_vec()
.map_err(|e| Error::invalid_params(format!("invalid base58 encoding: {:?}", e)))?
}
TransactionBinaryEncoding::Base64 => {
if encoded.len() > MAX_BASE64_SIZE {
return Err(Error::invalid_params(format!(
"base64 encoded {} too large: {} bytes (max: encoded/raw {}/{})",
type_name::<T>(),
encoded.len(),
MAX_BASE64_SIZE,
PACKET_DATA_SIZE,
)));
}
base64::decode(encoded)
.map_err(|e| Error::invalid_params(format!("invalid base64 encoding: {:?}", e)))?
}
};
if wire_output.len() > PACKET_DATA_SIZE {
return Err(Error::invalid_params(format!(
"decoded {} too large: {} bytes (max: {} bytes)",
type_name::<T>(),
wire_output.len(),
PACKET_DATA_SIZE
)));
}
bincode::options()
.with_limit(PACKET_DATA_SIZE as u64)
.with_fixint_encoding()
.allow_trailing_bytes()
.deserialize_from(&wire_output[..])
.map_err(|err| {
Error::invalid_params(format!(
"failed to deserialize {}: {}",
type_name::<T>(),
&err.to_string()
))
})
#[method(name = "getLatestBlockhash")]
async fn get_latest_blockhash(
&self,
config: Option<RpcContextConfig>,
) -> Result<RpcResponse<RpcBlockhash>>;
#[method(name = "isBlockhashValid")]
async fn is_blockhash_valid(
&self,
blockhash: String,
config: Option<IsBlockHashValidConfig>,
) -> Result<RpcResponse<bool>>;
#[method(name = "getSignatureStatuses")]
async fn get_signature_statuses(
&self,
signature_strs: Vec<String>,
config: Option<RpcSignatureStatusConfig>,
) -> Result<RpcResponse<Vec<Option<TransactionStatus>>>>;
#[method(name = "getVersion")]
fn get_version(&self) -> Result<RpcVersionInfo>;
#[method(name = "requestAirdrop")]
async fn request_airdrop(
&self,
pubkey_str: String,
lamports: u64,
config: Option<RpcRequestAirdropConfig>,
) -> Result<String>;
#[method(name = "getMetrics")]
async fn get_metrics(&self) -> Result<Metrics>;
#[subscription(name = "signatureSubscribe" => "signatureNotification", unsubscribe="signatureUnsubscribe", item=RpcResponse<serde_json::Value>)]
fn signature_subscribe(&self, signature: String, commitment_config: CommitmentConfig);
}

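Since the trait is exposed through jsonrpsee's `#[rpc(server)]`, each `#[method(...)]` is reachable as a plain JSON-RPC 2.0 call. A hedged sketch of the request body a client might POST for `sendTransaction`, with the optional `SendTransactionConfig` parameter omitted and the payload string as a placeholder:

```rust
use serde_json::json;

fn main() {
    // Placeholder payload: a real client sends the encoded, bincode-serialized transaction.
    let request = json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "sendTransaction",
        "params": ["<encoded transaction>"]
    });
    println!("{}", serde_json::to_string_pretty(&request).unwrap());
}
```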
86
src/tpu_manager.rs Normal file
View File

@ -0,0 +1,86 @@
use std::sync::{
atomic::{AtomicU32, Ordering},
Arc,
};
use log::info;
use solana_client::{
nonblocking::{rpc_client::RpcClient, tpu_client::TpuClient},
tpu_client::TpuClientConfig,
};
use tokio::sync::RwLock;
#[derive(Clone)]
pub struct TpuManager {
error_count: Arc<AtomicU32>,
rpc_client: Arc<RpcClient>,
tpu_client: Arc<RwLock<TpuClient>>,
ws_addr: String,
fanout_slots: u64,
}
impl TpuManager {
pub async fn new(
rpc_client: Arc<RpcClient>,
ws_addr: String,
fanout_slots: u64,
) -> anyhow::Result<Self> {
let tpu_client = Self::new_tpu_client(rpc_client.clone(), &ws_addr, fanout_slots).await?;
let tpu_client = Arc::new(RwLock::new(tpu_client));
Ok(Self {
rpc_client,
tpu_client,
ws_addr,
fanout_slots,
error_count: Default::default(),
})
}
pub async fn new_tpu_client(
rpc_client: Arc<RpcClient>,
ws_addr: &str,
fanout_slots: u64,
) -> anyhow::Result<TpuClient> {
Ok(TpuClient::new(
rpc_client.clone(),
ws_addr,
TpuClientConfig { fanout_slots },
)
.await?)
}
pub async fn reset(&self) -> anyhow::Result<()> {
self.error_count.fetch_add(1, Ordering::Relaxed);
if self.error_count.load(Ordering::Relaxed) > 5 {
let tpu_client =
Self::new_tpu_client(self.rpc_client.clone(), &self.ws_addr, self.fanout_slots)
.await?;
self.error_count.store(0, Ordering::Relaxed);
*self.tpu_client.write().await = tpu_client;
info!("TPU Reset after 5 errors");
}
Ok(())
}
pub async fn try_send_wire_transaction_batch(
&self,
wire_transactions: Vec<Vec<u8>>,
) -> anyhow::Result<()> {
match self
.tpu_client
.read()
.await
.try_send_wire_transaction_batch(wire_transactions)
.await
{
Ok(_) => Ok(()),
Err(err) => {
self.reset().await?;
Err(err.into())
}
}
}
}

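A minimal usage sketch for `TpuManager`, assuming a validator is reachable at the crate's default RPC and websocket addresses; the empty batch only keeps the sketch self-contained, a real caller passes bincode-serialized transactions:

```rust
use std::sync::Arc;

use lite_rpc::{tpu_manager::TpuManager, DEFAULT_FANOUT_SIZE, DEFAULT_RPC_ADDR, DEFAULT_WS_ADDR};
use solana_client::nonblocking::rpc_client::RpcClient;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let rpc_client = Arc::new(RpcClient::new(DEFAULT_RPC_ADDR.to_string()));
    let tpu_manager = TpuManager::new(rpc_client, DEFAULT_WS_ADDR.to_string(), DEFAULT_FANOUT_SIZE).await?;

    // On repeated send errors the manager rebuilds its TpuClient internally (see reset()).
    let wire_txs: Vec<Vec<u8>> = vec![];
    tpu_manager.try_send_wire_transaction_batch(wire_txs).await?;
    Ok(())
}
```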
View File

@ -0,0 +1,186 @@
use std::sync::Arc;
use anyhow::{bail, Context};
use dashmap::DashMap;
use futures::StreamExt;
use jsonrpsee::SubscriptionSink;
use log::info;
use solana_client::{
nonblocking::{pubsub_client::PubsubClient, rpc_client::RpcClient},
rpc_client::SerializableTransaction,
rpc_config::{RpcBlockSubscribeConfig, RpcBlockSubscribeFilter},
rpc_response::{Response as RpcResponse, RpcResponseContext},
};
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
use solana_transaction_status::{
TransactionConfirmationStatus, TransactionStatus, UiTransactionStatusMeta,
};
use tokio::{sync::RwLock, task::JoinHandle};
use super::TxSender;
/// Background worker which listens to new blocks
/// and keeps track of confirmed txs
#[derive(Clone)]
pub struct BlockListener {
pub_sub_client: Arc<PubsubClient>,
commitment_config: CommitmentConfig,
tx_sender: TxSender,
latest_block_info: Arc<RwLock<BlockInformation>>,
pub signature_subscribers: Arc<DashMap<String, SubscriptionSink>>,
}
struct BlockInformation {
pub slot: u64,
pub blockhash: String,
pub block_height: u64,
}
impl BlockListener {
pub async fn new(
pub_sub_client: Arc<PubsubClient>,
rpc_client: Arc<RpcClient>,
tx_sender: TxSender,
commitment_config: CommitmentConfig,
) -> anyhow::Result<Self> {
let (latest_block_hash, block_height) = rpc_client
.get_latest_blockhash_with_commitment(commitment_config)
.await?;
Ok(Self {
pub_sub_client,
tx_sender,
latest_block_info: Arc::new(RwLock::new(BlockInformation {
slot: rpc_client.get_slot().await?,
blockhash: latest_block_hash.to_string(),
block_height,
})),
commitment_config,
signature_subscribers: Default::default(),
})
}
pub async fn num_of_sigs_commited(&self, sigs: &[String]) -> usize {
let mut num_of_sigs_commited = 0;
for sig in sigs {
if self.tx_sender.txs_sent.contains_key(sig) {
num_of_sigs_commited += 1;
}
}
num_of_sigs_commited
}
pub async fn get_slot(&self) -> u64 {
self.latest_block_info.read().await.slot
}
pub async fn get_latest_blockhash(&self) -> (String, u64) {
let block = self.latest_block_info.read().await;
(block.blockhash.clone(), block.block_height)
}
pub fn signature_subscribe(&self, signature: String, sink: SubscriptionSink) {
let _ = self.signature_subscribers.insert(signature, sink);
}
pub fn signature_un_subscribe(&self, signature: String) {
self.signature_subscribers.remove(&signature);
}
pub fn listen(self) -> JoinHandle<anyhow::Result<()>> {
tokio::spawn(async move {
info!("Subscribing to blocks");
let commitment = self.commitment_config.commitment;
let comfirmation_status = match commitment {
CommitmentLevel::Finalized => TransactionConfirmationStatus::Finalized,
_ => TransactionConfirmationStatus::Confirmed,
};
let (mut recv, _) = self
.pub_sub_client
.block_subscribe(
RpcBlockSubscribeFilter::All,
Some(RpcBlockSubscribeConfig {
commitment: Some(self.commitment_config),
encoding: None,
transaction_details: Some(
solana_transaction_status::TransactionDetails::Full,
),
show_rewards: None,
max_supported_transaction_version: None,
}),
)
.await
.context("Error calling block_subscribe")?;
info!("Listening to {commitment:?} blocks");
while let Some(block) = recv.as_mut().next().await {
let slot = block.value.slot;
let Some(block) = block.value.block else {
continue;
};
let Some(block_height) = block.block_height else {
continue;
};
let blockhash = block.blockhash;
let Some(transactions) = block.transactions else {
continue;
};
*self.latest_block_info.write().await = BlockInformation {
slot,
blockhash,
block_height,
};
for tx in transactions {
let Some(UiTransactionStatusMeta { err, status, .. }) = tx.meta else {
info!("tx with no meta");
continue;
};
let Some(tx) = tx.transaction.decode() else {
info!("unable to decode tx");
continue;
};
let sig = tx.get_signature().to_string();
if let Some(mut tx_status) = self.tx_sender.txs_sent.get_mut(&sig) {
tx_status.value_mut().status = Some(TransactionStatus {
slot,
confirmations: None, //TODO: talk about this
status,
err: err.clone(),
confirmation_status: Some(comfirmation_status.clone()),
});
};
// subscribers
if let Some((_sig, mut sink)) = self.signature_subscribers.remove(&sig) {
// info!("notification {}", sig);
// err is None if the transaction succeeded
sink.send(&RpcResponse {
context: RpcResponseContext {
slot,
api_version: None,
},
value: serde_json::json!({ "err": err }),
})?;
}
}
}
bail!("Stopped Listening to {commitment:?} blocks")
})
}
}

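The listener doubles as a cache for the chain tip, presumably what the bridge serves for `getLatestBlockhash` without hitting the upstream RPC on every call. A wiring sketch under the crate's defaults (a reachable validator is assumed):

```rust
use std::sync::Arc;

use lite_rpc::{
    tpu_manager::TpuManager,
    workers::{BlockListener, TxSender},
    DEFAULT_FANOUT_SIZE, DEFAULT_RPC_ADDR, DEFAULT_WS_ADDR,
};
use solana_client::nonblocking::{pubsub_client::PubsubClient, rpc_client::RpcClient};
use solana_sdk::commitment_config::CommitmentConfig;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let rpc_client = Arc::new(RpcClient::new(DEFAULT_RPC_ADDR.to_string()));
    let pub_sub_client = Arc::new(PubsubClient::new(DEFAULT_WS_ADDR).await?);
    let tpu_manager = Arc::new(
        TpuManager::new(rpc_client.clone(), DEFAULT_WS_ADDR.to_string(), DEFAULT_FANOUT_SIZE).await?,
    );
    let tx_sender = TxSender::new(tpu_manager);

    let block_listener = BlockListener::new(
        pub_sub_client,
        rpc_client,
        tx_sender,
        CommitmentConfig::confirmed(),
    )
    .await?;

    // Spawn the block subscription loop, then read from the cached block info.
    let _listen_handle = block_listener.clone().listen();
    let (blockhash, last_valid_block_height) = block_listener.get_latest_blockhash().await;
    println!("cached tip: {blockhash} (height {last_valid_block_height})");
    Ok(())
}
```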
72
src/workers/cleaner.rs Normal file
View File

@ -0,0 +1,72 @@
use std::time::Duration;
use log::info;
use tokio::task::JoinHandle;
use super::{BlockListener, TxSender};
/// Background worker which cleans up memory
#[derive(Clone)]
pub struct Cleaner<const N: usize> {
tx_sender: TxSender,
block_listeners: [BlockListener; N],
}
impl<const N: usize> Cleaner<N> {
pub fn new(tx_sender: TxSender, block_listeners: [BlockListener; N]) -> Self {
Self {
tx_sender,
block_listeners,
}
}
pub fn clean_tx_sender(&self, ttl_duration: Duration) {
let mut to_remove = vec![];
for tx in self.tx_sender.txs_sent.iter() {
if tx.sent_at.elapsed() >= ttl_duration {
to_remove.push(tx.key().to_owned());
}
}
for to_remove in &to_remove {
self.tx_sender.txs_sent.remove(to_remove);
}
info!("Cleaned {} txs", to_remove.len());
}
/// Clean Signature Subscribers from Block Listeners
pub fn clean_block_listeners(&self) {
for block_listenser in &self.block_listeners {
let mut to_remove = vec![];
for subscriber in block_listenser.signature_subscribers.iter() {
if subscriber.value().is_closed() {
to_remove.push(subscriber.key().to_owned());
}
}
for to_remove in &to_remove {
block_listenser.signature_subscribers.remove(to_remove);
}
info!("Cleaned {} Signature Subscribers", to_remove.len());
}
}
pub fn start(self, ttl_duration: Duration) -> JoinHandle<anyhow::Result<()>> {
let mut ttl = tokio::time::interval(ttl_duration);
tokio::spawn(async move {
info!("Cleaning memory");
loop {
ttl.tick().await;
self.clean_tx_sender(ttl_duration);
self.clean_block_listeners();
}
})
}
}

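A wiring sketch for the cleaner; the `spawn_cleaner` helper is hypothetical, `tx_sender` and the two listeners are assumed to be built as in the earlier sketches, and the TTL reuses the crate's `DEFAULT_TX_SENT_TTL_S`:

```rust
use std::time::Duration;

use lite_rpc::{
    workers::{BlockListener, Cleaner, TxSender},
    DEFAULT_TX_SENT_TTL_S,
};

// Assumes the caller already owns a TxSender plus confirmed/finalized listeners.
fn spawn_cleaner(
    tx_sender: TxSender,
    confirmed_listener: BlockListener,
    finalized_listener: BlockListener,
) -> tokio::task::JoinHandle<anyhow::Result<()>> {
    let cleaner = Cleaner::new(tx_sender, [confirmed_listener, finalized_listener]);
    // One interval drives both the tx TTL sweep and the closed-subscriber sweep.
    cleaner.start(Duration::from_secs(DEFAULT_TX_SENT_TTL_S))
}
```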
View File

@ -0,0 +1,92 @@
use std::sync::Arc;
use log::{info, warn};
use solana_transaction_status::TransactionConfirmationStatus;
use tokio::{sync::RwLock, task::JoinHandle};
use super::TxSender;
use serde::{Deserialize, Serialize};
/// Background worker which captures metrics
#[derive(Clone)]
pub struct MetricsCapture {
tx_sender: TxSender,
metrics: Arc<RwLock<Metrics>>,
}
#[derive(Clone, Default, Debug, Serialize, Deserialize)]
pub struct Metrics {
pub txs_sent: usize,
pub txs_confirmed: usize,
pub txs_finalized: usize,
pub txs_ps: usize,
pub txs_confirmed_ps: usize,
pub txs_finalized_ps: usize,
pub mem_used: Option<usize>,
}
impl MetricsCapture {
pub fn new(tx_sender: TxSender) -> Self {
Self {
tx_sender,
metrics: Default::default(),
}
}
pub async fn get_metrics(&self) -> Metrics {
self.metrics.read().await.to_owned()
}
pub fn capture(self) -> JoinHandle<anyhow::Result<()>> {
let mut one_second = tokio::time::interval(std::time::Duration::from_secs(1));
tokio::spawn(async move {
info!("Capturing Metrics");
loop {
one_second.tick().await;
let txs_sent = self.tx_sender.txs_sent.len();
let mut txs_confirmed: usize = 0;
let mut txs_finalized: usize = 0;
for tx in self.tx_sender.txs_sent.iter() {
if let Some(tx) = &tx.value().status {
match tx.confirmation_status() {
TransactionConfirmationStatus::Confirmed => txs_confirmed += 1,
TransactionConfirmationStatus::Finalized => {
txs_confirmed += 1;
txs_finalized += 1;
}
_ => (),
}
}
}
let mut metrics = self.metrics.write().await;
metrics.txs_ps = txs_sent.checked_sub(metrics.txs_sent).unwrap_or_default();
metrics.txs_confirmed_ps = txs_confirmed
.checked_sub(metrics.txs_confirmed)
.unwrap_or_default();
metrics.txs_finalized_ps = txs_finalized
.checked_sub(metrics.txs_finalized)
.unwrap_or_default();
metrics.txs_sent = txs_sent;
metrics.txs_confirmed = txs_confirmed;
metrics.txs_finalized = txs_finalized;
metrics.mem_used = match procinfo::pid::statm_self() {
Ok(statm) => Some(statm.size),
Err(err) => {
warn!("Error capturing memory consumption {err}");
None
}
};
log::info!("{metrics:?}");
}
})
}
}

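A usage sketch: the capture loop samples once per second, and `get_metrics` returns the latest snapshot (presumably what backs the `getMetrics` RPC method). The `report_once` helper is hypothetical and `tx_sender` is assumed from the earlier sketches:

```rust
use std::time::Duration;

use lite_rpc::workers::{MetricsCapture, TxSender};

async fn report_once(tx_sender: TxSender) {
    let metrics_capture = MetricsCapture::new(tx_sender);

    // Background task recomputes txs_sent/confirmed/finalized every second.
    let _capture_handle = metrics_capture.clone().capture();

    tokio::time::sleep(Duration::from_secs(2)).await;
    let snapshot = metrics_capture.get_metrics().await;
    println!("txs/s: {}, confirmed/s: {}", snapshot.txs_ps, snapshot.txs_confirmed_ps);
}
```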
9
src/workers/mod.rs Normal file
View File

@ -0,0 +1,9 @@
mod block_listenser;
mod cleaner;
mod metrics_capture;
mod tx_sender;
pub use block_listenser::*;
pub use cleaner::*;
pub use metrics_capture::*;
pub use tx_sender::*;

121
src/workers/tx_sender.rs Normal file
View File

@ -0,0 +1,121 @@
use std::{
sync::Arc,
time::{Duration, Instant},
};
use dashmap::DashMap;
use log::{info, warn};
use solana_transaction_status::TransactionStatus;
use tokio::{sync::RwLock, task::JoinHandle};
use crate::tpu_manager::TpuManager;
pub type WireTransaction = Vec<u8>;
/// Retry transactions up to a maximum of `u16` times and keep track of confirmed transactions
#[derive(Clone)]
pub struct TxSender {
/// Tx(s) forwarded to tpu
pub txs_sent: Arc<DashMap<String, TxProps>>,
/// Transactions queue for retrying
enqueued_txs: Arc<RwLock<Vec<(String, WireTransaction)>>>,
/// TpuClient to call the tpu port
tpu_manager: Arc<TpuManager>,
}
/// Transaction Properties
pub struct TxProps {
pub status: Option<TransactionStatus>,
pub sent_at: Instant,
}
impl Default for TxProps {
fn default() -> Self {
Self {
status: Default::default(),
sent_at: Instant::now(),
}
}
}
impl TxSender {
pub fn new(tpu_manager: Arc<TpuManager>) -> Self {
Self {
enqueued_txs: Default::default(),
tpu_manager,
txs_sent: Default::default(),
}
}
/// enqueue a transaction to be forwarded in the next batch
pub async fn enqnueue_tx(&self, sig: String, raw_tx: WireTransaction) {
self.enqueued_txs.write().await.push((sig, raw_tx));
}
/// forward enqueued tx(s) to the TPU in batches
pub async fn forward_txs(&self, tx_batch_size: usize) {
if self.enqueued_txs.read().await.is_empty() {
return;
}
let mut enqueued_txs = Vec::new();
std::mem::swap(&mut enqueued_txs, &mut *self.enqueued_txs.write().await);
let mut tx_remaining = enqueued_txs.len();
let mut enqueued_txs = enqueued_txs.into_iter();
let tpu_client = self.tpu_manager.clone();
let txs_sent = self.txs_sent.clone();
tokio::spawn(async move {
while tx_remaining != 0 {
let mut batch = Vec::with_capacity(tx_batch_size);
let mut sigs = Vec::with_capacity(tx_batch_size);
for (batched, (sig, tx)) in enqueued_txs.by_ref().enumerate() {
batch.push(tx);
sigs.push(sig);
tx_remaining -= 1;
if batched == tx_batch_size {
break;
}
}
match tpu_client.try_send_wire_transaction_batch(batch).await {
Ok(_) => {
for sig in sigs {
txs_sent.insert(sig, TxProps::default());
}
}
Err(err) => {
warn!("{err}");
}
}
}
});
}
/// retry and confirm transactions every 2ms (avg time to confirm tx)
pub fn execute(
self,
tx_batch_size: usize,
tx_send_interval: Duration,
) -> JoinHandle<anyhow::Result<()>> {
let mut interval = tokio::time::interval(tx_send_interval);
#[allow(unreachable_code)]
tokio::spawn(async move {
info!(
"Batching tx(s) with batch size of {tx_batch_size} every {}ms",
tx_send_interval.as_millis()
);
loop {
interval.tick().await;
self.forward_txs(tx_batch_size).await;
}
// to give the correct type to JoinHandle
Ok(())
})
}
}

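A sketch of feeding the sender. The wire format is assumed to be the raw bincode-serialized transaction bytes that `try_send_wire_transaction_batch` forwards to the TPU; the `enqueue_transfer` helper, the payer keypair, and the blockhash are supplied by the caller and are not part of the commit:

```rust
use lite_rpc::workers::TxSender;
use solana_sdk::{hash::Hash, signature::Keypair, signer::Signer, system_transaction};

// Hypothetical helper: build a 1-lamport self-transfer just to have something signable.
async fn enqueue_transfer(tx_sender: &TxSender, payer: &Keypair, blockhash: Hash) -> anyhow::Result<()> {
    let tx = system_transaction::transfer(payer, &payer.pubkey(), 1, blockhash);

    let sig = tx.signatures[0].to_string();
    let wire_tx = bincode::serialize(&tx)?;
    tx_sender.enqnueue_tx(sig, wire_tx).await;
    Ok(())
}
```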
View File

@ -1,45 +1,57 @@
use std::sync::Arc;
use bench::helpers::BenchHelper;
use lite_rpc::DEFAULT_LITE_RPC_ADDR;
use log::info;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_client::rpc_client::SerializableTransaction;
use solana_sdk::native_token::LAMPORTS_PER_SOL;
use solana_client::{nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction};
use solana_sdk::commitment_config::CommitmentConfig;
use lite_bench_utils::{generate_txs, new_funded_payer, wait_till_confirmed};
use lite_client::{LiteClient, LOCAL_LIGHT_RPC_ADDR};
use simplelog::*;
const AMOUNT: usize = 100;
const AMOUNT: usize = 5;
#[tokio::test]
async fn send_and_confirm_tx() {
TermLogger::init(
LevelFilter::Info,
Config::default(),
TerminalMode::Mixed,
ColorChoice::Auto,
)
.unwrap();
async fn send_and_confirm_txs_get_signature_statuses() {
tracing_subscriber::fmt::init();
let lite_client = LiteClient(RpcClient::new(LOCAL_LIGHT_RPC_ADDR.to_string()));
let funded_payer = new_funded_payer(&lite_client, LAMPORTS_PER_SOL * 2)
.await
.unwrap();
let rpc_client = Arc::new(RpcClient::new(DEFAULT_LITE_RPC_ADDR.to_string()));
let bench_helper = BenchHelper::new(rpc_client.clone());
let txs = generate_txs(AMOUNT, &lite_client.0, &funded_payer)
let funded_payer = bench_helper.get_payer().await.unwrap();
let txs = bench_helper
.generate_txs(AMOUNT, &funded_payer)
.await
.unwrap();
info!("Sending and Confirming {AMOUNT} tx(s)");
for tx in &txs {
lite_client.send_transaction(tx).await.unwrap();
info!("Tx {}", &tx.signatures[0]);
rpc_client.send_transaction(tx).await.unwrap();
info!("Tx {}", tx.get_signature());
}
for tx in &txs {
let sig = tx.get_signature();
info!("Confirming {sig}");
wait_till_confirmed(&lite_client, sig).await;
bench_helper
.wait_till_signature_status(sig, CommitmentConfig::confirmed())
.await
.unwrap();
}
info!("Sent and Confirmed {AMOUNT} tx(s)");
}
#[tokio::test]
async fn send_and_confirm_tx_rpc_client() {
let rpc_client = Arc::new(RpcClient::new(DEFAULT_LITE_RPC_ADDR.to_string()));
let bench_helper = BenchHelper::new(rpc_client.clone());
let funded_payer = bench_helper.get_payer().await.unwrap();
let tx = &bench_helper.generate_txs(1, &funded_payer).await.unwrap()[0];
let sig = tx.get_signature();
bench_helper.send_and_confirm_transaction(tx).await.unwrap();
info!("Sent and Confirmed {sig}");
}

27
tests/client.test.ts Normal file
View File

@ -0,0 +1,27 @@
import { Connection, Keypair, LAMPORTS_PER_SOL, SystemProgram, sendAndConfirmTransaction, Transaction, PublicKey } from "@solana/web3.js";
jest.setTimeout(60000);
test('send and confirm transaction', async () => {
const connection = new Connection('http://127.0.0.1:8890', 'confirmed');
const payer = Keypair.generate();
const toAccount = Keypair.generate().publicKey;
const airdropSignature = await connection.requestAirdrop(payer.publicKey, LAMPORTS_PER_SOL * 2);
console.log('airdrop signature ' + airdropSignature);
await connection.confirmTransaction(airdropSignature, 'finalized');
console.log('confirmed');
const transaction = new Transaction();
// Add an instruction to execute
transaction.add(
SystemProgram.transfer({
fromPubkey: payer.publicKey,
toPubkey: toAccount,
lamports: LAMPORTS_PER_SOL,
}),
);
await sendAndConfirmTransaction(connection, transaction, [payer]);
});

View File

@ -1,31 +0,0 @@
import { Connection, Keypair, LAMPORTS_PER_SOL, Message, VersionedTransaction } from "@solana/web3.js";
import { url } from "./urls";
jest.setTimeout(60000);
test('send and confirm transaction', async () => {
const connection = new Connection(url, 'confirmed');
const payer = Keypair.generate();
await connection.requestAirdrop(payer.publicKey, LAMPORTS_PER_SOL);
const recentBlockhash = (await connection.getLatestBlockhash('confirmed')).blockhash;
const versionedTx = new VersionedTransaction(
new Message({
header: {
numRequiredSignatures: 1,
numReadonlySignedAccounts: 0,
numReadonlyUnsignedAccounts: 0,
},
recentBlockhash,
instructions: [],
accountKeys: [payer.publicKey.toBase58()],
}),
);
versionedTx.sign([payer]);
const signature = await connection.sendTransaction(versionedTx);
const latestBlockHash = await connection.getLatestBlockhash();
await connection.confirmTransaction({
blockhash: latestBlockHash.blockhash,
lastValidBlockHeight: latestBlockHash.lastValidBlockHeight,
signature: signature,
});
});

View File

@ -1 +0,0 @@
export const url = "http://127.0.0.1:9000";

89
tests/workers.rs Normal file
View File

@ -0,0 +1,89 @@
use std::{sync::Arc, time::Duration};
use bench::helpers::BenchHelper;
use futures::future::try_join_all;
use lite_rpc::{
encoding::BinaryEncoding,
tpu_manager::TpuManager,
workers::{BlockListener, TxSender},
DEFAULT_LITE_RPC_ADDR, DEFAULT_RPC_ADDR, DEFAULT_TX_BATCH_INTERVAL_MS, DEFAULT_TX_BATCH_SIZE,
DEFAULT_WS_ADDR,
};
use solana_client::nonblocking::{pubsub_client::PubsubClient, rpc_client::RpcClient};
use solana_sdk::commitment_config::CommitmentConfig;
use solana_transaction_status::TransactionConfirmationStatus;
#[tokio::test]
async fn send_and_confirm_txs() {
let rpc_client = Arc::new(RpcClient::new(DEFAULT_RPC_ADDR.to_string()));
let lite_client = Arc::new(RpcClient::new(DEFAULT_LITE_RPC_ADDR.to_string()));
let bench_helper = BenchHelper::new(lite_client.clone());
let tpu_client = Arc::new(
TpuManager::new(
rpc_client.clone(),
DEFAULT_WS_ADDR.into(),
Default::default(),
)
.await
.unwrap(),
);
let pub_sub_client = Arc::new(PubsubClient::new(DEFAULT_WS_ADDR).await.unwrap());
let tx_sender = TxSender::new(tpu_client);
let block_listener = BlockListener::new(
pub_sub_client.clone(),
rpc_client.clone(),
tx_sender.clone(),
CommitmentConfig::confirmed(),
)
.await
.unwrap();
let services = try_join_all(vec![
block_listener.clone().listen(),
tx_sender.clone().execute(
DEFAULT_TX_BATCH_SIZE,
Duration::from_millis(DEFAULT_TX_BATCH_INTERVAL_MS),
),
]);
let confirm = tokio::spawn(async move {
let funded_payer = bench_helper.get_payer().await.unwrap();
let blockhash = rpc_client.get_latest_blockhash().await.unwrap();
let tx = bench_helper.create_transaction(&funded_payer, blockhash);
let sig = tx.signatures[0];
let tx = BinaryEncoding::Base58.encode(bincode::serialize(&tx).unwrap());
let sig = sig.to_string();
tx_sender
.enqnueue_tx(sig.clone(), tx.as_bytes().to_vec())
.await;
for _ in 0..2 {
let tx_status = tx_sender.txs_sent.get(&sig).unwrap();
if let Some(tx_status) = &tx_status.value().status {
if tx_status.confirmation_status() == TransactionConfirmationStatus::Confirmed {
return;
}
}
tokio::time::sleep(Duration::from_millis(800)).await;
}
panic!("Tx {sig} not confirmed in 1600ms");
});
tokio::select! {
_ = services => {
panic!("Services stopped unexpectedly")
},
_ = confirm => {}
}
}

View File

@ -11,7 +11,7 @@
// "disableReferencedProjectLoad": true, /* Reduce the number of projects loaded automatically by TypeScript. */
/* Language and Environment */
"target": "es2022", /* Set the JavaScript language version for emitted JavaScript and include compatible library declarations. */
"target": "es2016", /* Set the JavaScript language version for emitted JavaScript and include compatible library declarations. */
// "lib": [], /* Specify a set of bundled library declaration files that describe the target runtime environment. */
// "jsx": "preserve", /* Specify what JSX code is generated. */
// "experimentalDecorators": true, /* Enable experimental support for TC39 stage 2 draft decorators. */
@ -35,7 +35,7 @@
// "types": [], /* Specify type package names to be included without being referenced in a source file. */
// "allowUmdGlobalAccess": true, /* Allow accessing UMD globals from modules. */
// "moduleSuffixes": [], /* List of file name suffixes to search when resolving a module. */
"resolveJsonModule": true, /* Enable importing .json files. */
// "resolveJsonModule": true, /* Enable importing .json files. */
// "noResolve": true, /* Disallow 'import's, 'require's or '<reference>'s from expanding the number of files TypeScript should add to a project. */
/* JavaScript Support */
@ -76,7 +76,28 @@
"forceConsistentCasingInFileNames": true, /* Ensure that casing is correct in imports. */
/* Type Checking */
// "strict": true, /* Enable all strict type-checking options. */
"strict": true, /* Enable all strict type-checking options. */
// "noImplicitAny": true, /* Enable error reporting for expressions and declarations with an implied 'any' type. */
// "strictNullChecks": true, /* When type checking, take into account 'null' and 'undefined'. */
// "strictFunctionTypes": true, /* When assigning functions, check to ensure parameters and the return values are subtype-compatible. */
// "strictBindCallApply": true, /* Check that the arguments for 'bind', 'call', and 'apply' methods match the original function. */
// "strictPropertyInitialization": true, /* Check for class properties that are declared but not set in the constructor. */
// "noImplicitThis": true, /* Enable error reporting when 'this' is given the type 'any'. */
// "useUnknownInCatchVariables": true, /* Default catch clause variables as 'unknown' instead of 'any'. */
// "alwaysStrict": true, /* Ensure 'use strict' is always emitted. */
// "noUnusedLocals": true, /* Enable error reporting when local variables aren't read. */
// "noUnusedParameters": true, /* Raise an error when a function parameter isn't read. */
// "exactOptionalPropertyTypes": true, /* Interpret optional property types as written, rather than adding 'undefined'. */
// "noImplicitReturns": true, /* Enable error reporting for codepaths that do not explicitly return in a function. */
// "noFallthroughCasesInSwitch": true, /* Enable error reporting for fallthrough cases in switch statements. */
// "noUncheckedIndexedAccess": true, /* Add 'undefined' to a type when accessed using an index. */
// "noImplicitOverride": true, /* Ensure overriding members in derived classes are marked with an override modifier. */
// "noPropertyAccessFromIndexSignature": true, /* Enforces using indexed accessors for keys declared using an indexed type. */
// "allowUnusedLabels": true, /* Disable error reporting for unused labels. */
// "allowUnreachableCode": true, /* Disable error reporting for unreachable code. */
/* Completeness */
// "skipDefaultLibCheck": true, /* Skip type checking .d.ts files that are included with TypeScript. */
"skipLibCheck": true /* Skip type checking all .d.ts files. */
}
}

1200
yarn.lock

File diff suppressed because it is too large Load Diff