moved
This commit is contained in:
parent
91cd396d9f
commit
481e6f949e
|
@ -1,5 +1 @@
|
|||
/target
|
||||
/node_modules
|
||||
/test-ledger
|
||||
/config
|
||||
**/validator.log
|
||||
target
|
||||
|
|
File diff suppressed because it is too large
Load Diff
76
Cargo.toml
76
Cargo.toml
|
@ -2,82 +2,40 @@
|
|||
name = "lite-rpc"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
description = "A lite version of solana rpc to send and confirm transactions"
|
||||
|
||||
[workspace]
|
||||
members = [
|
||||
"lite-client",
|
||||
"lite-bench-utils"
|
||||
"bench-utils"
|
||||
]
|
||||
|
||||
[[bench]]
|
||||
name="tps"
|
||||
harness=false
|
||||
|
||||
[package.metadata.docs.rs]
|
||||
targets = ["x86_64-unknown-linux-gnu"]
|
||||
|
||||
[dev-dependencies]
|
||||
lite-client = { path = "./lite-client" }
|
||||
bench-utils = { path = "./bench-utils" }
|
||||
csv = "1.1.6"
|
||||
serde = { version = "1", features = ["derive"]}
|
||||
lite-client = { path ="./lite-client" }
|
||||
lite-bench-utils = { path = "./lite-bench-utils" }
|
||||
log = "0.4.17"
|
||||
simplelog = "0.12.0"
|
||||
|
||||
[dependencies]
|
||||
solana-client = { git = "https://github.com/solana-labs/solana.git" }
|
||||
solana-sdk = { git = "https://github.com/solana-labs/solana.git" }
|
||||
solana-clap-utils = { git = "https://github.com/solana-labs/solana.git" }
|
||||
solana-cli-config = { git = "https://github.com/solana-labs/solana.git" }
|
||||
solana-pubsub-client = { git = "https://github.com/solana-labs/solana.git" }
|
||||
solana-account-decoder = { git = "https://github.com/solana-labs/solana.git" }
|
||||
solana-entry = { git = "https://github.com/solana-labs/solana.git" }
|
||||
solana-faucet = { git = "https://github.com/solana-labs/solana.git" }
|
||||
solana-gossip = { git = "https://github.com/solana-labs/solana.git" }
|
||||
solana-ledger = { git = "https://github.com/solana-labs/solana.git" }
|
||||
solana-measure = { git = "https://github.com/solana-labs/solana.git" }
|
||||
solana-metrics = { git = "https://github.com/solana-labs/solana.git" }
|
||||
solana-perf = { git = "https://github.com/solana-labs/solana.git" }
|
||||
solana-poh = { git = "https://github.com/solana-labs/solana.git" }
|
||||
solana-rayon-threadlimit = { git = "https://github.com/solana-labs/solana.git" }
|
||||
solana-rpc-client-api = { git = "https://github.com/solana-labs/solana.git" }
|
||||
solana-runtime = { git = "https://github.com/solana-labs/solana.git" }
|
||||
solana-send-transaction-service = { git = "https://github.com/solana-labs/solana.git" }
|
||||
solana-stake-program = { git = "https://github.com/solana-labs/solana.git" }
|
||||
solana-storage-bigtable = { git = "https://github.com/solana-labs/solana.git" }
|
||||
solana-streamer = { git = "https://github.com/solana-labs/solana.git" }
|
||||
solana-tpu-client = { git = "https://github.com/solana-labs/solana.git" }
|
||||
solana-transaction-status = { git = "https://github.com/solana-labs/solana.git" }
|
||||
solana-version = { git = "https://github.com/solana-labs/solana.git" }
|
||||
solana-vote-program = { git = "https://github.com/solana-labs/solana.git" }
|
||||
solana-rpc = { git = "https://github.com/solana-labs/solana.git" }
|
||||
|
||||
tokio = { version = "1.14.1", features = ["full"]}
|
||||
tokio-util = { version = "0.6", features = ["codec", "compat"] }
|
||||
futures = "0.3.25"
|
||||
jsonrpc-core = "18.0.0"
|
||||
jsonrpc-core-client = { version = "18.0.0" }
|
||||
jsonrpc-derive = "18.0.0"
|
||||
jsonrpc-http-server = "18.0.0"
|
||||
jsonrpc-pubsub = "18.0.0"
|
||||
clap = { version = "4.0.29", features = ["derive"] }
|
||||
procinfo = "0.4.2"
|
||||
|
||||
base64 = "0.13.1"
|
||||
solana-transaction-status = { git = "https://github.com/solana-labs/solana.git" }
|
||||
actix-web = "4.2.1"
|
||||
serde = { version = "1.0.150", features = ["derive"] }
|
||||
serde_json = "1.0.89"
|
||||
tokio = { version = "1.23.0", features = ["full"]}
|
||||
bincode = "1.3.3"
|
||||
bs58 = "0.4.0"
|
||||
crossbeam-channel = "0.5.6"
|
||||
dashmap = "5.4.0"
|
||||
itertools = "0.10.5"
|
||||
libc = "0.2.138"
|
||||
log = "0.4.17"
|
||||
rayon = "1.6.1"
|
||||
regex = "1.7.0"
|
||||
serde = "1.0.149"
|
||||
serde_derive = "1.0.149"
|
||||
serde_json = "1.0.89"
|
||||
soketto = "0.7.1"
|
||||
spl-token = { version = "=3.5.0", features = ["no-entrypoint"] }
|
||||
spl-token-2022 = { version = "0.5.0", features = ["no-entrypoint"] }
|
||||
stream-cancel = "0.8.1"
|
||||
base64 = "0.20.0"
|
||||
thiserror = "1.0.37"
|
||||
futures = "0.3.25"
|
||||
bytes = "1.3.0"
|
||||
reqwest = "0.11.13"
|
||||
anyhow = "1.0.66"
|
||||
log = "0.4.17"
|
||||
simplelog = "0.12.0"
|
||||
clap = { version = "4.0.29", features = ["derive"] }
|
||||
|
|
21
LICENSE
21
LICENSE
|
@ -1,21 +0,0 @@
|
|||
MIT License
|
||||
|
||||
Copyright (c) 2022 Blockworks Foundation
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
59
README.md
59
README.md
|
@ -1,32 +1,39 @@
|
|||
# Solana Lite RPC
|
||||
# Light RPC
|
||||
|
||||
This project aims to create a lite rpc server which is responsible only for sending and confirming the transactions.
|
||||
The lite-rpc server will not have any ledger or banks.
|
||||
While sending transaction the lite rpc server will send the transaction to next few leader (FANOUT) and then use different strategies to confirm the transaction.
|
||||
The rpc server will also have a websocket port which is reponsible for just subscribing to slots and signatures.
|
||||
The lite rpc server will be optimized to confirm transactions which are forwarded to leader using the same server.
|
||||
This project is currently based on an unstable feature of block subscription of solana which can be enabled using `--rpc-pubsub-enable-block-subscription` while running rpc node.
|
||||
Submitting a [transaction](https://docs.solana.com/terminology#transaction) to be executed on the solana blockchain,
|
||||
requires the client to identify the next few leaders based on the
|
||||
[leader schedule](https://docs.solana.com/terminology#leader-schedule), look up their peering information in gossip and
|
||||
connect to them via the [quic protocol](https://en.wikipedia.org/wiki/QUIC). In order to simplify the
|
||||
process so it can be triggered from a web browser, most applications
|
||||
run full [validators](https://docs.solana.com/terminology#validator) that forward the transactions according to the
|
||||
protocol on behalf of the web browser. Running full solana [validators](https://docs.solana.com/terminology#validator)
|
||||
is incredibly resource intensive `(>256GB RAM)`, the goal of this
|
||||
project would be to create a specialized micro-service that allows
|
||||
to deploy this logic quickly and allows [horizontal scalability](https://en.wikipedia.org/wiki/Scalability) with
|
||||
commodity vms.
|
||||
|
||||
### Confirmation strategies
|
||||
1) Subscribing to blocks changes and checking the confirmations. (Under development)
|
||||
2) Subscribing to signatures with pool of rpc servers. (Under development)
|
||||
3) Listining to gossip protocol. (Future roadmap)
|
||||
## Test
|
||||
|
||||
## Build
|
||||
`cargo build`
|
||||
|
||||
## Run
|
||||
* For RPC node : `http://localhost:8899`,
|
||||
* Websocket : `http://localhost:8900` (Optional),
|
||||
* Port : `9000` Listening port for LiteRpc server,
|
||||
* Subscription Port : `9001` Listening port of websocket subscriptions for LiteRpc server,
|
||||
|
||||
|
||||
```
|
||||
cargo run --bin lite-rpc -- run --port 9000 --subscription-port 9001 --rpc-url http://localhost:8899
|
||||
*make sure `solana-test-validator` is running in the background*
|
||||
```bash
|
||||
$ cd ~ && solana-test-validator
|
||||
```
|
||||
|
||||
## Tests
|
||||
*run `light-rpc` test*
|
||||
```bash
|
||||
$ cargo test
|
||||
```
|
||||
cargo run --bin lite-rpc -- test
|
||||
```
|
||||
|
||||
## Bench
|
||||
|
||||
*make sure `solana-test-validator` is running in the background*
|
||||
```bash
|
||||
$ cd ~ && solana-test-validator
|
||||
```
|
||||
|
||||
*run `light-rpc` bench*
|
||||
```bash
|
||||
$ cargo bench
|
||||
```
|
||||
|
||||
Find a new file named `metrics.csv` in the project root.
|
||||
|
|
|
@ -1,14 +1,12 @@
|
|||
[package]
|
||||
name = "lite-bench-utils"
|
||||
name = "bench-utils"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
lite-client = { path ="../lite-client" }
|
||||
serde = { version = "1.0.149", features = ["derive"] }
|
||||
serde_json = "1.0.89"
|
||||
solana-client = { git = "https://github.com/solana-labs/solana.git" }
|
||||
solana-sdk = { git = "https://github.com/solana-labs/solana.git" }
|
||||
tokio = "1.14.1"
|
||||
lite-client = { path = "../lite-client" }
|
||||
log = "0.4.17"
|
||||
anyhow = "1.0.66"
|
||||
serde = "1.0.150"
|
|
@ -1,11 +1,8 @@
|
|||
pub mod metrics;
|
||||
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::bail;
|
||||
use lite_client::LiteClient;
|
||||
use log::info;
|
||||
use solana_client::nonblocking::rpc_client::RpcClient;
|
||||
use solana_sdk::commitment_config::CommitmentConfig;
|
||||
|
||||
use solana_sdk::hash::Hash;
|
||||
use solana_sdk::signature::Signature;
|
||||
|
@ -14,28 +11,26 @@ use solana_sdk::{
|
|||
transaction::Transaction,
|
||||
};
|
||||
|
||||
pub async fn new_funded_payer(lite_client: &LiteClient, amount: u64) -> anyhow::Result<Keypair> {
|
||||
pub async fn new_funded_payer(rpc_client: &RpcClient, amount: u64) -> anyhow::Result<Keypair> {
|
||||
let payer = Keypair::new();
|
||||
let payer_pubkey = payer.pubkey().to_string();
|
||||
|
||||
// request airdrop to payer
|
||||
let airdrop_sig = lite_client.request_airdrop(&payer.pubkey(), amount).await?;
|
||||
let airdrop_sig = rpc_client.request_airdrop(&payer.pubkey(), amount).await?;
|
||||
|
||||
info!("Air Dropping {payer_pubkey} with {amount}L");
|
||||
|
||||
thread::sleep(Duration::from_secs(12));
|
||||
|
||||
//loop {
|
||||
// if let Some(res) = lite_client
|
||||
// .get_signature_status_with_commitment(&airdrop_sig, CommitmentConfig::finalized())
|
||||
// .await?
|
||||
// {
|
||||
// match res {
|
||||
// Ok(_) => break,
|
||||
// Err(_) => bail!("Error air dropping {payer_pubkey}"),
|
||||
// }
|
||||
// }
|
||||
//}
|
||||
loop {
|
||||
if let Some(res) = rpc_client
|
||||
.get_signature_status_with_commitment(&airdrop_sig, CommitmentConfig::finalized())
|
||||
.await?
|
||||
{
|
||||
match res {
|
||||
Ok(_) => break,
|
||||
Err(_) => bail!("Error air dropping {payer_pubkey}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("Air Drop Successful: {airdrop_sig}");
|
||||
|
||||
|
@ -43,7 +38,7 @@ pub async fn new_funded_payer(lite_client: &LiteClient, amount: u64) -> anyhow::
|
|||
}
|
||||
|
||||
pub async fn wait_till_confirmed(lite_client: &LiteClient, sig: &Signature) {
|
||||
while lite_client.confirm_transaction(sig.to_string()).await.value {}
|
||||
while lite_client.confirm_transaction(sig.to_string()).await {}
|
||||
}
|
||||
|
||||
pub fn create_transaction(funded_payer: &Keypair, blockhash: Hash) -> Transaction {
|
|
@ -0,0 +1,2 @@
|
|||
pub mod helpers;
|
||||
pub mod metrics;
|
|
@ -1,101 +0,0 @@
|
|||
import { Connection, Keypair, LAMPORTS_PER_SOL, PublicKey, TransactionSignature } from '@solana/web3.js';
|
||||
import * as fs from 'fs';
|
||||
import * as splToken from "@solana/spl-token";
|
||||
import * as os from 'os';
|
||||
|
||||
// number of users
|
||||
const tps : number = +process.argv[2];
|
||||
const forSeconds : number = +process.argv[3];
|
||||
// url
|
||||
const url = process.argv.length > 4 ? process.argv[4] : "http://localhost:8899";
|
||||
const skip_confirmations = process.argv.length > 5 ? process.argv[5] === "true": false;
|
||||
import * as InFile from "./out.json";
|
||||
|
||||
function sleep(ms: number) {
|
||||
return new Promise( resolve => setTimeout(resolve, ms) );
|
||||
}
|
||||
|
||||
console.log("benching " + tps + " transactions per second on " + url + " for " + forSeconds + " seconds");
|
||||
|
||||
export async function main() {
|
||||
|
||||
const connection = new Connection(url, 'confirmed');
|
||||
const authority = Keypair.fromSecretKey(
|
||||
Uint8Array.from(
|
||||
JSON.parse(
|
||||
process.env.KEYPAIR ||
|
||||
fs.readFileSync(os.homedir() + '/.config/solana/id.json', 'utf-8'),
|
||||
),
|
||||
),
|
||||
);
|
||||
|
||||
const users = InFile.users.map(x => Keypair.fromSecretKey(Uint8Array.from(x.secretKey)));
|
||||
const userAccounts = InFile.tokenAccounts.map(x => new PublicKey(x));
|
||||
let promises_to_unpack : Promise<TransactionSignature>[][] = [];
|
||||
|
||||
for (let i = 0; i<forSeconds; ++i)
|
||||
{
|
||||
const start = performance.now();
|
||||
let promises : Promise<TransactionSignature>[] = [];
|
||||
for (let j=0; j<tps; ++j)
|
||||
{
|
||||
const toIndex = Math.floor(Math.random() * users.length);
|
||||
let fromIndex = toIndex;
|
||||
while (fromIndex === toIndex)
|
||||
{
|
||||
fromIndex = Math.floor(Math.random() * users.length);
|
||||
}
|
||||
const userFrom = userAccounts[fromIndex];
|
||||
const userTo = userAccounts[toIndex];
|
||||
if(skip_confirmations === false) {
|
||||
promises.push(
|
||||
splToken.transfer(
|
||||
connection,
|
||||
authority,
|
||||
userFrom,
|
||||
userTo,
|
||||
users[fromIndex],
|
||||
100,
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
if (skip_confirmations === false)
|
||||
{
|
||||
promises_to_unpack.push(promises)
|
||||
}
|
||||
const end = performance.now();
|
||||
const diff = (end - start);
|
||||
if (diff > 0) {
|
||||
await sleep(1000 - diff)
|
||||
}
|
||||
}
|
||||
|
||||
console.log('checking for confirmations');
|
||||
if(skip_confirmations === false) {
|
||||
const size = promises_to_unpack.length
|
||||
let successes : Uint32Array = new Uint32Array(size).fill(0);
|
||||
let failures : Uint32Array = new Uint32Array(size).fill(0);
|
||||
for (let i=0; i< size; ++i)
|
||||
{
|
||||
const promises = promises_to_unpack[i];
|
||||
|
||||
await Promise.all( promises.map( promise => {
|
||||
promise.then((_fullfil)=>{
|
||||
Atomics.add(successes, i, 1);
|
||||
},
|
||||
(_reject)=>{
|
||||
Atomics.add(failures, i, 1);
|
||||
})
|
||||
}))
|
||||
}
|
||||
console.log("sucesses " + successes)
|
||||
console.log("failures " + failures)
|
||||
}
|
||||
}
|
||||
|
||||
main().then(x => {
|
||||
console.log('finished sucessfully')
|
||||
}).catch(e => {
|
||||
console.log('caught an error : ' + e)
|
||||
})
|
|
@ -1,11 +1,12 @@
|
|||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use lite_bench_utils::{
|
||||
generate_txs,
|
||||
use bench_utils::{
|
||||
helpers::{generate_txs, new_funded_payer},
|
||||
metrics::{AvgMetric, Metric},
|
||||
new_funded_payer,
|
||||
};
|
||||
use log::info;
|
||||
use solana_client::{nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction};
|
||||
|
@ -15,8 +16,8 @@ use lite_client::{LiteClient, LOCAL_LIGHT_RPC_ADDR};
|
|||
use simplelog::*;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
const NUM_OF_TXS: usize = 10_000;
|
||||
const NUM_OF_RUNS: usize = 3;
|
||||
const NUM_OF_TXS: usize = 20_000;
|
||||
const NUM_OF_RUNS: usize = 5;
|
||||
const CSV_FILE_NAME: &str = "metrics.csv";
|
||||
|
||||
#[tokio::main]
|
||||
|
@ -92,7 +93,7 @@ async fn foo(lite_client: Arc<LiteClient>) -> Metric {
|
|||
*time_elapsed_since_last_confirmed = Some(Instant::now())
|
||||
}
|
||||
|
||||
if lite_client.confirm_transaction(sig.clone()).await.value {
|
||||
if lite_client.confirm_transaction(sig.clone()).await {
|
||||
metrics.txs_confirmed += 1;
|
||||
to_remove_txs.push(sig.clone());
|
||||
} else if time_elapsed_since_last_confirmed.unwrap().elapsed()
|
||||
|
|
|
@ -1,76 +0,0 @@
|
|||
import { Connection, Keypair, LAMPORTS_PER_SOL, PublicKey } from '@solana/web3.js';
|
||||
import * as fs from 'fs';
|
||||
import * as splToken from "@solana/spl-token";
|
||||
import * as os from 'os';
|
||||
|
||||
// number of users
|
||||
const nbUsers = +process.argv[2];
|
||||
// url
|
||||
const url = process.argv.length > 3 ? process.argv[3] : "http://localhost:8899";
|
||||
// outfile
|
||||
const outFile = process.argv.length > 4 ? process.argv[4] : "out.json";
|
||||
|
||||
console.log("creating " + nbUsers + " Users on " + url + " out file " + outFile);
|
||||
|
||||
export async function main() {
|
||||
const connection = new Connection(url, 'confirmed');
|
||||
let authority = Keypair.fromSecretKey(
|
||||
Uint8Array.from(
|
||||
JSON.parse(
|
||||
process.env.KEYPAIR ||
|
||||
fs.readFileSync(os.homedir() + '/.config/solana/id.json', 'utf-8'),
|
||||
),
|
||||
),
|
||||
);
|
||||
|
||||
let userKps = [...Array(nbUsers)].map(_x => Keypair.generate())
|
||||
let mint = await splToken.createMint(
|
||||
connection,
|
||||
authority,
|
||||
authority.publicKey,
|
||||
null,
|
||||
6,
|
||||
);
|
||||
let accounts = await Promise.all( userKps.map(x => {
|
||||
return splToken.createAccount(
|
||||
connection,
|
||||
authority,
|
||||
mint,
|
||||
x.publicKey,
|
||||
)
|
||||
}));
|
||||
|
||||
let res = await Promise.all( accounts.map(x=> {
|
||||
return splToken.mintTo(
|
||||
connection,
|
||||
authority,
|
||||
mint,
|
||||
x,
|
||||
authority,
|
||||
1_000_000_000_000,
|
||||
)
|
||||
}));
|
||||
|
||||
const users = userKps.map(x => {
|
||||
const info = {};
|
||||
info['publicKey'] = x.publicKey.toBase58();
|
||||
info['secretKey'] = Array.from(x.secretKey);
|
||||
return info;
|
||||
});
|
||||
|
||||
const data = {
|
||||
'users' : users,
|
||||
'tokenAccounts' : accounts,
|
||||
'mint' : mint,
|
||||
'minted_amount' : 1_000_000_000_000
|
||||
};
|
||||
|
||||
console.log('created ' + nbUsers + ' Users and minted 10^12 tokens for mint ' + mint);
|
||||
fs.writeFileSync(outFile, JSON.stringify(data));
|
||||
}
|
||||
|
||||
main().then(x => {
|
||||
console.log('finished sucessfully')
|
||||
}).catch(e => {
|
||||
console.log('caught an error : ' + e)
|
||||
})
|
|
@ -1,8 +0,0 @@
|
|||
module.exports = {
|
||||
preset: 'ts-jest',
|
||||
testEnvironment: 'node',
|
||||
transform: {
|
||||
'^.+\\.ts?$': 'ts-jest',
|
||||
},
|
||||
transformIgnorePatterns: ['<rootDir>/node_modules/'],
|
||||
};
|
|
@ -1,8 +0,0 @@
|
|||
test:
|
||||
cargo test send_and_confirm_tx -- --nocapture
|
||||
|
||||
benchmark:
|
||||
cargo bench
|
||||
|
||||
clean:
|
||||
cargo clean
|
|
@ -1,9 +1,6 @@
|
|||
use std::ops::{Deref, DerefMut};
|
||||
|
||||
use solana_client::{
|
||||
nonblocking::rpc_client::RpcClient, rpc_request::RpcRequest,
|
||||
rpc_response::Response as RpcResponse,
|
||||
};
|
||||
use solana_client::{nonblocking::rpc_client::RpcClient, rpc_request::RpcRequest};
|
||||
|
||||
pub const LOCAL_LIGHT_RPC_ADDR: &str = "http://127.0.0.1:8890";
|
||||
|
||||
|
@ -24,7 +21,7 @@ impl DerefMut for LiteClient {
|
|||
}
|
||||
|
||||
impl LiteClient {
|
||||
pub async fn confirm_transaction(&self, signature: String) -> RpcResponse<bool> {
|
||||
pub async fn confirm_transaction(&self, signature: String) -> bool {
|
||||
self.send(
|
||||
RpcRequest::Custom {
|
||||
method: "confirmTransaction",
|
||||
|
@ -35,4 +32,3 @@ impl LiteClient {
|
|||
.unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
10
metrics.csv
10
metrics.csv
|
@ -1,5 +1,7 @@
|
|||
time_elapsed_sec,txs_sent,txs_confirmed,txs_un_confirmed,tps
|
||||
3.72111197,10000,9836,164,2643.2958963070387
|
||||
3.808483126,10000,9999,1,2625.4547202108306
|
||||
4.130764515,10000,9999,1,2420.617288565044
|
||||
3.8867865370000003,10000,9944,55,2563.122635027638
|
||||
12.032304419,20000,19171,829,1593.294129903114
|
||||
12.04903728,20000,19173,827,1591.2474627184488
|
||||
11.402919085,20000,19080,920,1673.2557565105267
|
||||
9.711972646,20000,19162,838,1973.0286213164034
|
||||
10.691780508,20000,19105,895,1786.8866636108837
|
||||
11.1776027876,20000,19138,861,1723.5425268118754
|
||||
|
|
|
File diff suppressed because it is too large
Load Diff
27
package.json
27
package.json
|
@ -1,27 +0,0 @@
|
|||
{
|
||||
"name": "lightrpc-test",
|
||||
"version": "1.0.0",
|
||||
"repository": "",
|
||||
"author": "",
|
||||
"license": "MIT",
|
||||
"private": true,
|
||||
"dependencies": {
|
||||
"@solana/web3.js": "^1.62.0",
|
||||
"@types/jest": "^29.2.3",
|
||||
"jest": "^29.3.1",
|
||||
"start-server-and-test": "^1.14.0",
|
||||
"ts-jest": "^29.0.3",
|
||||
"@solana/spl-token": "^0.3.5",
|
||||
"ts-node": "^10.9.1",
|
||||
"yarn": "^1.22.19"
|
||||
},
|
||||
"scripts": {
|
||||
"test": "jest --detectOpenHandles",
|
||||
"test:literpc": "start-server-and-test 'target/debug/lite-rpc --port 9000 --subscription-port 9001 --rpc-url http://localhost:8899 --websocket-url ws://localhost:8900/' http://localhost:8899/health test",
|
||||
"test:validator": "start-server-and-test ' ./scripts/run.sh & target/debug/lite-rpc --port 9000 --subscription-port 9001 --rpc-url http://localhost:8899 --websocket-url ws://localhost:8900/' http://localhost:8899/health test"
|
||||
},
|
||||
"devDependencies": {
|
||||
"typescript": "^4.8.4"
|
||||
}
|
||||
}
|
||||
|
123
scripts/run.sh
123
scripts/run.sh
|
@ -1,123 +0,0 @@
|
|||
#!/usr/bin/env bash
|
||||
#
|
||||
# Run a minimal Solana cluster. Ctrl-C to exit.
|
||||
#
|
||||
# Before running this script ensure standard Solana programs are available
|
||||
# in the PATH, or that `cargo build` ran successfully
|
||||
#
|
||||
set -e
|
||||
|
||||
# Prefer possible `cargo build` binaries over PATH binaries
|
||||
script_dir="$(readlink -f "$(dirname "$0")")"
|
||||
if [[ "$script_dir" =~ /scripts$ ]]; then
|
||||
cd "$script_dir/.."
|
||||
else
|
||||
cd "$script_dir"
|
||||
fi
|
||||
|
||||
|
||||
profile=debug
|
||||
if [[ -n $NDEBUG ]]; then
|
||||
profile=release
|
||||
fi
|
||||
PATH=$PWD/target/$profile:$PATH
|
||||
|
||||
ok=true
|
||||
for program in solana-{faucet,genesis,keygen,validator}; do
|
||||
$program -V || ok=false
|
||||
done
|
||||
$ok || {
|
||||
echo
|
||||
echo "Unable to locate required programs. Try building them first with:"
|
||||
echo
|
||||
echo " $ cargo build --all"
|
||||
echo
|
||||
exit 1
|
||||
}
|
||||
|
||||
export RUST_LOG=${RUST_LOG:-solana=info,solana_runtime::message_processor=debug} # if RUST_LOG is unset, default to info
|
||||
export RUST_BACKTRACE=1
|
||||
dataDir=$PWD/config/"$(basename "$0" .sh)"
|
||||
ledgerDir=$PWD/config/ledger
|
||||
|
||||
SOLANA_RUN_SH_CLUSTER_TYPE=${SOLANA_RUN_SH_CLUSTER_TYPE:-development}
|
||||
|
||||
set -x
|
||||
if ! solana address; then
|
||||
echo Generating default keypair
|
||||
solana-keygen new --no-passphrase
|
||||
fi
|
||||
validator_identity="$dataDir/validator-identity.json"
|
||||
if [[ -e $validator_identity ]]; then
|
||||
echo "Use existing validator keypair"
|
||||
else
|
||||
solana-keygen new --no-passphrase -so "$validator_identity"
|
||||
fi
|
||||
validator_vote_account="$dataDir/validator-vote-account.json"
|
||||
if [[ -e $validator_vote_account ]]; then
|
||||
echo "Use existing validator vote account keypair"
|
||||
else
|
||||
solana-keygen new --no-passphrase -so "$validator_vote_account"
|
||||
fi
|
||||
validator_stake_account="$dataDir/validator-stake-account.json"
|
||||
if [[ -e $validator_stake_account ]]; then
|
||||
echo "Use existing validator stake account keypair"
|
||||
else
|
||||
solana-keygen new --no-passphrase -so "$validator_stake_account"
|
||||
fi
|
||||
|
||||
if [[ -e "$ledgerDir"/genesis.bin || -e "$ledgerDir"/genesis.tar.bz2 ]]; then
|
||||
echo "Use existing genesis"
|
||||
else
|
||||
./fetch-spl.sh
|
||||
if [[ -r spl-genesis-args.sh ]]; then
|
||||
SPL_GENESIS_ARGS=$(cat spl-genesis-args.sh)
|
||||
fi
|
||||
|
||||
# shellcheck disable=SC2086
|
||||
solana-genesis \
|
||||
--hashes-per-tick sleep \
|
||||
--faucet-lamports 500000000000000000 \
|
||||
--bootstrap-validator \
|
||||
"$validator_identity" \
|
||||
"$validator_vote_account" \
|
||||
"$validator_stake_account" \
|
||||
--ledger "$ledgerDir" \
|
||||
--cluster-type "$SOLANA_RUN_SH_CLUSTER_TYPE" \
|
||||
$SPL_GENESIS_ARGS \
|
||||
$SOLANA_RUN_SH_GENESIS_ARGS
|
||||
fi
|
||||
|
||||
abort() {
|
||||
set +e
|
||||
kill "$faucet" "$validator"
|
||||
wait "$validator"
|
||||
}
|
||||
trap abort INT TERM EXIT
|
||||
|
||||
solana-faucet &
|
||||
faucet=$!
|
||||
|
||||
args=(
|
||||
--identity "$validator_identity"
|
||||
--vote-account "$validator_vote_account"
|
||||
--ledger "$ledgerDir"
|
||||
--gossip-port 8001
|
||||
--full-rpc-api
|
||||
--rpc-port 8899
|
||||
--log validator.log
|
||||
--rpc-faucet-address 127.0.0.1:9900
|
||||
--enable-rpc-transaction-history
|
||||
--enable-extended-tx-metadata-storage
|
||||
--init-complete-file "$dataDir"/init-completed
|
||||
--snapshot-compression none
|
||||
--require-tower
|
||||
--no-wait-for-vote-to-start-leader
|
||||
--no-os-network-limits-test
|
||||
--rpc-pubsub-enable-block-subscription
|
||||
)
|
||||
# shellcheck disable=SC2086
|
||||
solana-validator "${args[@]}" $SOLANA_RUN_SH_VALIDATOR_ARGS &
|
||||
validator=$!
|
||||
|
||||
wait "$validator"
|
|
@ -0,0 +1,172 @@
|
|||
use crate::{
|
||||
configs::SendTransactionConfig,
|
||||
encoding::BinaryEncoding,
|
||||
rpc::{
|
||||
ConfirmTransactionParams, JsonRpcError, JsonRpcReq, JsonRpcRes, RpcMethod,
|
||||
SendTransactionParams,
|
||||
},
|
||||
workers::{BlockListener, TxSender},
|
||||
DEFAULT_TX_MAX_RETRIES,
|
||||
};
|
||||
|
||||
use std::{net::ToSocketAddrs, str::FromStr, sync::Arc};
|
||||
|
||||
use actix_web::{web, App, HttpServer, Responder};
|
||||
use reqwest::Url;
|
||||
|
||||
use solana_client::{
|
||||
nonblocking::{rpc_client::RpcClient, tpu_client::TpuClient},
|
||||
rpc_response::RpcVersionInfo,
|
||||
};
|
||||
use solana_sdk::{signature::Signature, transaction::VersionedTransaction};
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
/// A bridge between clients and tpu
|
||||
pub struct LightBridge {
|
||||
pub tpu_client: Arc<TpuClient>,
|
||||
pub rpc_url: Url,
|
||||
pub tx_sender: TxSender,
|
||||
pub block_listner: BlockListener,
|
||||
}
|
||||
|
||||
impl LightBridge {
|
||||
pub async fn new(rpc_url: reqwest::Url, ws_addr: &str) -> anyhow::Result<Self> {
|
||||
let rpc_client = Arc::new(RpcClient::new(rpc_url.to_string()));
|
||||
|
||||
let tpu_client =
|
||||
Arc::new(TpuClient::new(rpc_client.clone(), ws_addr, Default::default()).await?);
|
||||
|
||||
let block_listner = BlockListener::new(rpc_client.clone(), ws_addr).await?;
|
||||
|
||||
Ok(Self {
|
||||
tx_sender: TxSender::new(tpu_client.clone(), block_listner.clone()),
|
||||
block_listner,
|
||||
rpc_url,
|
||||
tpu_client,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn send_transaction(
|
||||
&self,
|
||||
SendTransactionParams(
|
||||
tx,
|
||||
SendTransactionConfig {
|
||||
skip_preflight: _, //TODO:
|
||||
preflight_commitment: _, //TODO:
|
||||
encoding,
|
||||
max_retries,
|
||||
min_context_slot: _, //TODO:
|
||||
},
|
||||
): SendTransactionParams,
|
||||
) -> Result<String, JsonRpcError> {
|
||||
let raw_tx = encoding.decode(tx)?;
|
||||
|
||||
let sig = bincode::deserialize::<VersionedTransaction>(&raw_tx)?.signatures[0];
|
||||
|
||||
self.tpu_client.send_wire_transaction(raw_tx.clone()).await;
|
||||
|
||||
self.tx_sender
|
||||
.enqnueue_tx(sig, raw_tx, max_retries.unwrap_or(DEFAULT_TX_MAX_RETRIES))
|
||||
.await;
|
||||
|
||||
Ok(BinaryEncoding::Base58.encode(sig))
|
||||
}
|
||||
|
||||
pub async fn confirm_transaction(
|
||||
&self,
|
||||
ConfirmTransactionParams(sig, _): ConfirmTransactionParams,
|
||||
) -> Result<bool, JsonRpcError> {
|
||||
let sig = Signature::from_str(&sig)?;
|
||||
|
||||
Ok(self.block_listner.confirm_tx(sig).await.is_some())
|
||||
}
|
||||
|
||||
pub fn get_version(&self) -> RpcVersionInfo {
|
||||
let version = solana_version::Version::default();
|
||||
RpcVersionInfo {
|
||||
solana_core: version.to_string(),
|
||||
feature_set: Some(version.feature_set),
|
||||
}
|
||||
}
|
||||
|
||||
/// Serialize params and execute the specified method
|
||||
pub async fn execute_rpc_request(
|
||||
&self,
|
||||
JsonRpcReq { method, params }: JsonRpcReq,
|
||||
) -> Result<serde_json::Value, JsonRpcError> {
|
||||
match method {
|
||||
RpcMethod::SendTransaction => Ok(self
|
||||
.send_transaction(serde_json::from_value(params)?)
|
||||
.await?
|
||||
.into()),
|
||||
RpcMethod::ConfirmTransaction => Ok(self
|
||||
.confirm_transaction(serde_json::from_value(params)?)
|
||||
.await?
|
||||
.into()),
|
||||
RpcMethod::GetVersion => Ok(serde_json::to_value(self.get_version()).unwrap()),
|
||||
RpcMethod::Other => unreachable!("Other Rpc Methods should be handled externally"),
|
||||
}
|
||||
}
|
||||
|
||||
/// List for `JsonRpc` requests
|
||||
pub fn start_services(
|
||||
self,
|
||||
addr: impl ToSocketAddrs + Send + 'static,
|
||||
) -> Vec<JoinHandle<anyhow::Result<()>>> {
|
||||
let this = Arc::new(self);
|
||||
let tx_sender = this.tx_sender.clone().execute();
|
||||
let block_listenser = this.block_listner.clone().listen();
|
||||
|
||||
let json_cfg = web::JsonConfig::default().error_handler(|err, req| {
|
||||
let err = JsonRpcRes::Err(serde_json::Value::String(format!("{err}")))
|
||||
.respond_to(req)
|
||||
.into_body();
|
||||
actix_web::error::ErrorBadRequest(err)
|
||||
});
|
||||
|
||||
let server = tokio::spawn(async move {
|
||||
let server = HttpServer::new(move || {
|
||||
App::new()
|
||||
.app_data(web::Data::new(this.clone()))
|
||||
.app_data(json_cfg.clone())
|
||||
.route("/", web::post().to(Self::rpc_route))
|
||||
})
|
||||
.bind(addr)?
|
||||
.run();
|
||||
|
||||
server.await?;
|
||||
|
||||
Ok(())
|
||||
});
|
||||
|
||||
vec![server, block_listenser, tx_sender]
|
||||
}
|
||||
|
||||
async fn rpc_route(body: bytes::Bytes, state: web::Data<Arc<LightBridge>>) -> JsonRpcRes {
|
||||
let json_rpc_req = match serde_json::from_slice::<JsonRpcReq>(&body) {
|
||||
Ok(json_rpc_req) => json_rpc_req,
|
||||
Err(err) => return JsonRpcError::SerdeError(err).into(),
|
||||
};
|
||||
|
||||
if let RpcMethod::Other = json_rpc_req.method {
|
||||
let res = reqwest::Client::new()
|
||||
.post(state.rpc_url.clone())
|
||||
.body(body)
|
||||
.header("Content-Type", "application/json")
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
JsonRpcRes::Raw {
|
||||
status: res.status().as_u16(),
|
||||
body: res.text().await.unwrap(),
|
||||
}
|
||||
} else {
|
||||
state
|
||||
.execute_rpc_request(json_rpc_req)
|
||||
.await
|
||||
.try_into()
|
||||
.unwrap()
|
||||
}
|
||||
}
|
||||
}
|
70
src/cli.rs
70
src/cli.rs
|
@ -1,67 +1,13 @@
|
|||
use clap::Subcommand;
|
||||
|
||||
use crate::{DEFAULT_LITE_RPC_ADDR, DEFAULT_RPC_ADDR, DEFAULT_WS_ADDR};
|
||||
use clap::Parser;
|
||||
|
||||
/// Holds the configuration for a single run of the benchmark
|
||||
#[derive(Parser, Debug)]
|
||||
#[command(
|
||||
version,
|
||||
about = "A lite version of solana rpc to send and confirm transactions.",
|
||||
long_about = "Lite rpc is optimized to send and confirm transactions for solana blockchain. \
|
||||
When it recieves a transaction it will directly send it to next few leaders. It then adds the signature into internal map. It listen to block subscriptions for confirmed and finalized blocks. \
|
||||
It also has a websocket port for subscription to onSlotChange and onSignature subscriptions. \
|
||||
"
|
||||
)]
|
||||
#[command(author, version, about, long_about = None)]
|
||||
pub struct Args {
|
||||
#[clap(subcommand)]
|
||||
pub command: Command,
|
||||
/*
|
||||
#[arg(short, long, default_value_t = String::from("8899"))]
|
||||
pub port: String,
|
||||
#[arg(short, long, default_value_t = String::from("8900"))]
|
||||
pub subscription_port: String,
|
||||
#[arg(short, long, default_value_t = String::from("http://localhost:8899"))]
|
||||
pub rpc_url: String,
|
||||
#[arg(short, long, default_value_t = String::new())]
|
||||
pub websocket_url: String,
|
||||
*/
|
||||
}
|
||||
/*
|
||||
impl Args {
|
||||
|
||||
pub fn resolve_address(&mut self) {
|
||||
|
||||
if self.rpc_url.is_empty() {
|
||||
let (_, rpc_url) = ConfigInput::compute_json_rpc_url_setting(
|
||||
self.rpc_url.as_str(),
|
||||
&ConfigInput::default().json_rpc_url,
|
||||
);
|
||||
self.rpc_url = rpc_url;
|
||||
}
|
||||
if self.websocket_url.is_empty() {
|
||||
let (_, ws_url) = ConfigInput::compute_websocket_url_setting(
|
||||
&self.websocket_url.as_str(),
|
||||
"",
|
||||
self.rpc_url.as_str(),
|
||||
"",
|
||||
);
|
||||
self.websocket_url = ws_url;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
*/
|
||||
#[derive(Subcommand, Debug)]
|
||||
pub enum Command {
|
||||
Run {
|
||||
#[arg(short, long, default_value_t = String::from("8899"))]
|
||||
port: String,
|
||||
#[arg(short, long, default_value_t = String::from("8900"))]
|
||||
subscription_port: String,
|
||||
#[arg(short, long, default_value_t = String::from("http://localhost:8899"))]
|
||||
rpc_url: String,
|
||||
#[arg(short, long, default_value_t = String::new())]
|
||||
websocket_url: String,
|
||||
},
|
||||
Test,
|
||||
#[arg(short, long, default_value_t = String::from(DEFAULT_RPC_ADDR))]
|
||||
pub rpc_addr: String,
|
||||
#[arg(short, long, default_value_t = String::from(DEFAULT_WS_ADDR))]
|
||||
pub ws_addr: String,
|
||||
#[arg(short, long, default_value_t = String::from(DEFAULT_LITE_RPC_ADDR))]
|
||||
pub lite_rpc_addr: String,
|
||||
}
|
||||
|
|
|
@ -0,0 +1,17 @@
|
|||
use crate::encoding::BinaryEncoding;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use solana_sdk::commitment_config::CommitmentLevel;
|
||||
use solana_sdk::slot_history::Slot;
|
||||
|
||||
#[derive(Debug, Default, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct SendTransactionConfig {
|
||||
#[serde(default)]
|
||||
pub skip_preflight: bool,
|
||||
#[serde(default)]
|
||||
pub preflight_commitment: CommitmentLevel,
|
||||
#[serde(default)]
|
||||
pub encoding: BinaryEncoding,
|
||||
pub max_retries: Option<u16>,
|
||||
pub min_context_slot: Option<Slot>,
|
||||
}
|
338
src/context.rs
338
src/context.rs
|
@ -1,338 +0,0 @@
|
|||
use crossbeam_channel::Sender;
|
||||
use dashmap::DashMap;
|
||||
use serde::Serialize;
|
||||
use solana_client::{
|
||||
rpc_client::RpcClient,
|
||||
rpc_response::{ProcessedSignatureResult, RpcResponseContext, RpcSignatureResult, SlotInfo},
|
||||
};
|
||||
use solana_rpc::rpc_subscription_tracker::{
|
||||
SignatureSubscriptionParams, SubscriptionId, SubscriptionParams,
|
||||
};
|
||||
use solana_sdk::{
|
||||
commitment_config::{CommitmentConfig, CommitmentLevel},
|
||||
signature::Signature,
|
||||
};
|
||||
use std::{
|
||||
sync::{
|
||||
atomic::{AtomicU64, Ordering},
|
||||
Arc, RwLock,
|
||||
},
|
||||
thread::{self, Builder, JoinHandle},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
pub struct BlockInformation {
|
||||
pub block_hash: RwLock<String>,
|
||||
pub block_height: AtomicU64,
|
||||
pub slot: AtomicU64,
|
||||
pub confirmation_level: CommitmentLevel,
|
||||
}
|
||||
|
||||
impl BlockInformation {
|
||||
pub fn new(rpc_client: Arc<RpcClient>, commitment: CommitmentLevel) -> Self {
|
||||
let slot = rpc_client
|
||||
.get_slot_with_commitment(CommitmentConfig { commitment })
|
||||
.unwrap();
|
||||
|
||||
let (blockhash, blockheight) = rpc_client
|
||||
.get_latest_blockhash_with_commitment(CommitmentConfig { commitment })
|
||||
.unwrap();
|
||||
|
||||
BlockInformation {
|
||||
block_hash: RwLock::new(blockhash.to_string()),
|
||||
block_height: AtomicU64::new(blockheight),
|
||||
slot: AtomicU64::new(slot),
|
||||
confirmation_level: commitment,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct LiteRpcContext {
|
||||
pub signature_status: DashMap<String, Option<CommitmentLevel>>,
|
||||
pub finalized_block_info: BlockInformation,
|
||||
pub confirmed_block_info: BlockInformation,
|
||||
pub notification_sender: Sender<NotificationType>,
|
||||
}
|
||||
|
||||
impl LiteRpcContext {
|
||||
pub fn new(rpc_client: Arc<RpcClient>, notification_sender: Sender<NotificationType>) -> Self {
|
||||
LiteRpcContext {
|
||||
signature_status: DashMap::new(),
|
||||
confirmed_block_info: BlockInformation::new(
|
||||
rpc_client.clone(),
|
||||
CommitmentLevel::Confirmed,
|
||||
),
|
||||
finalized_block_info: BlockInformation::new(rpc_client, CommitmentLevel::Finalized),
|
||||
notification_sender,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct SignatureNotification {
|
||||
pub signature: Signature,
|
||||
pub commitment: CommitmentLevel,
|
||||
pub slot: u64,
|
||||
pub error: Option<String>,
|
||||
}
|
||||
|
||||
pub struct SlotNotification {
|
||||
pub slot: u64,
|
||||
pub commitment: CommitmentLevel,
|
||||
pub parent: u64,
|
||||
pub root: u64,
|
||||
}
|
||||
|
||||
pub enum NotificationType {
|
||||
Signature(SignatureNotification),
|
||||
Slot(SlotNotification),
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct NotificationParams<T> {
|
||||
result: T,
|
||||
subscription: SubscriptionId,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct Notification<T> {
|
||||
jsonrpc: Option<jsonrpc_core::Version>,
|
||||
method: &'static str,
|
||||
params: NotificationParams<T>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
|
||||
pub struct Response<T> {
|
||||
pub context: RpcResponseContext,
|
||||
pub value: T,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
struct RpcNotificationResponse<T> {
|
||||
context: RpcNotificationContext,
|
||||
value: T,
|
||||
}
|
||||
|
||||
impl<T> From<RpcNotificationResponse<T>> for Response<T> {
|
||||
fn from(notification: RpcNotificationResponse<T>) -> Self {
|
||||
let RpcNotificationResponse {
|
||||
context: RpcNotificationContext { slot },
|
||||
value,
|
||||
} = notification;
|
||||
Self {
|
||||
context: RpcResponseContext {
|
||||
slot,
|
||||
api_version: None,
|
||||
},
|
||||
value,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
struct RpcNotificationContext {
|
||||
slot: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct LiteRpcNotification {
|
||||
pub subscription_id: SubscriptionId,
|
||||
pub is_final: bool,
|
||||
pub json: String,
|
||||
pub created_at: Instant,
|
||||
}
|
||||
|
||||
pub struct LiteRpcSubsrciptionControl {
|
||||
pub broadcast_sender: broadcast::Sender<LiteRpcNotification>,
|
||||
notification_reciever: crossbeam_channel::Receiver<NotificationType>,
|
||||
pub subscriptions: DashMap<SubscriptionParams, SubscriptionId>,
|
||||
pub last_subscription_id: AtomicU64,
|
||||
}
|
||||
|
||||
impl LiteRpcSubsrciptionControl {
|
||||
pub fn new(
|
||||
broadcast_sender: broadcast::Sender<LiteRpcNotification>,
|
||||
notification_reciever: crossbeam_channel::Receiver<NotificationType>,
|
||||
) -> Self {
|
||||
Self {
|
||||
broadcast_sender,
|
||||
notification_reciever,
|
||||
subscriptions: DashMap::new(),
|
||||
last_subscription_id: AtomicU64::new(2),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn start_broadcasting(&self) {
|
||||
loop {
|
||||
let notification = self.notification_reciever.recv();
|
||||
match notification {
|
||||
Ok(notification_type) => {
|
||||
let rpc_notification = match notification_type {
|
||||
NotificationType::Signature(data) => {
|
||||
let signature_params = SignatureSubscriptionParams {
|
||||
commitment: CommitmentConfig {
|
||||
commitment: data.commitment,
|
||||
},
|
||||
signature: data.signature,
|
||||
enable_received_notification: false,
|
||||
};
|
||||
|
||||
let param = SubscriptionParams::Signature(signature_params);
|
||||
|
||||
match self.subscriptions.entry(param) {
|
||||
dashmap::mapref::entry::Entry::Occupied(x) => {
|
||||
let subscription_id = *x.get();
|
||||
let slot = data.slot;
|
||||
let value = Response::from(RpcNotificationResponse {
|
||||
context: RpcNotificationContext { slot },
|
||||
value: RpcSignatureResult::ProcessedSignature(
|
||||
ProcessedSignatureResult { err: None },
|
||||
),
|
||||
});
|
||||
|
||||
let notification = Notification {
|
||||
jsonrpc: Some(jsonrpc_core::Version::V2),
|
||||
method: &"signatureNotification",
|
||||
params: NotificationParams {
|
||||
result: value,
|
||||
subscription: subscription_id,
|
||||
},
|
||||
};
|
||||
let json = serde_json::to_string(¬ification).unwrap();
|
||||
Some(LiteRpcNotification {
|
||||
subscription_id: *x.get(),
|
||||
created_at: Instant::now(),
|
||||
is_final: false,
|
||||
json,
|
||||
})
|
||||
}
|
||||
dashmap::mapref::entry::Entry::Vacant(_x) => None,
|
||||
}
|
||||
}
|
||||
NotificationType::Slot(data) => {
|
||||
// SubscriptionId 0 will be used for slots
|
||||
let subscription_id = if data.commitment == CommitmentLevel::Confirmed {
|
||||
SubscriptionId::from(0)
|
||||
} else {
|
||||
SubscriptionId::from(1)
|
||||
};
|
||||
let value = SlotInfo {
|
||||
parent: data.parent,
|
||||
slot: data.slot,
|
||||
root: data.root,
|
||||
};
|
||||
|
||||
let notification = Notification {
|
||||
jsonrpc: Some(jsonrpc_core::Version::V2),
|
||||
method: &"slotNotification",
|
||||
params: NotificationParams {
|
||||
result: value,
|
||||
subscription: subscription_id,
|
||||
},
|
||||
};
|
||||
let json = serde_json::to_string(¬ification).unwrap();
|
||||
Some(LiteRpcNotification {
|
||||
subscription_id: subscription_id,
|
||||
created_at: Instant::now(),
|
||||
is_final: false,
|
||||
json,
|
||||
})
|
||||
}
|
||||
};
|
||||
if let Some(rpc_notification) = rpc_notification {
|
||||
self.broadcast_sender.send(rpc_notification).unwrap();
|
||||
}
|
||||
}
|
||||
Err(_e) => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct PerformanceCounter {
|
||||
pub total_confirmations: Arc<AtomicU64>,
|
||||
pub total_transactions_sent: Arc<AtomicU64>,
|
||||
|
||||
pub confirmations_per_seconds: Arc<AtomicU64>,
|
||||
pub transactions_per_seconds: Arc<AtomicU64>,
|
||||
|
||||
last_count_for_confirmations: Arc<AtomicU64>,
|
||||
last_count_for_transactions_sent: Arc<AtomicU64>,
|
||||
}
|
||||
|
||||
impl PerformanceCounter {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
total_confirmations: Arc::new(AtomicU64::new(0)),
|
||||
total_transactions_sent: Arc::new(AtomicU64::new(0)),
|
||||
confirmations_per_seconds: Arc::new(AtomicU64::new(0)),
|
||||
transactions_per_seconds: Arc::new(AtomicU64::new(0)),
|
||||
last_count_for_confirmations: Arc::new(AtomicU64::new(0)),
|
||||
last_count_for_transactions_sent: Arc::new(AtomicU64::new(0)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update_per_seconds_transactions(&self) {
|
||||
let total_confirmations: u64 = self.total_confirmations.load(Ordering::Relaxed);
|
||||
|
||||
let total_transactions: u64 = self.total_transactions_sent.load(Ordering::Relaxed);
|
||||
|
||||
self.confirmations_per_seconds.store(
|
||||
total_confirmations - self.last_count_for_confirmations.load(Ordering::Relaxed),
|
||||
Ordering::Release,
|
||||
);
|
||||
self.transactions_per_seconds.store(
|
||||
total_transactions
|
||||
- self
|
||||
.last_count_for_transactions_sent
|
||||
.load(Ordering::Relaxed),
|
||||
Ordering::Release,
|
||||
);
|
||||
|
||||
self.last_count_for_confirmations
|
||||
.store(total_confirmations, Ordering::Relaxed);
|
||||
self.last_count_for_transactions_sent
|
||||
.store(total_transactions, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn update_sent_transactions_counter(&self) {
|
||||
self.total_transactions_sent.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn update_confirm_transaction_counter(&self) {
|
||||
self.total_confirmations.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn launch_performance_updating_thread(
|
||||
performance_counter: PerformanceCounter,
|
||||
) -> JoinHandle<()> {
|
||||
Builder::new()
|
||||
.name("Performance Counter".to_string())
|
||||
.spawn(move || loop {
|
||||
let start = Instant::now();
|
||||
|
||||
let wait_time = Duration::from_millis(1000);
|
||||
let performance_counter = performance_counter.clone();
|
||||
performance_counter.update_per_seconds_transactions();
|
||||
let confirmations_per_seconds = performance_counter
|
||||
.confirmations_per_seconds
|
||||
.load(Ordering::Acquire);
|
||||
let total_transactions_per_seconds = performance_counter
|
||||
.transactions_per_seconds
|
||||
.load(Ordering::Acquire);
|
||||
|
||||
let runtime = start.elapsed();
|
||||
if let Some(remaining) = wait_time.checked_sub(runtime) {
|
||||
println!(
|
||||
"Sent {} transactions and confrimed {} transactions",
|
||||
total_transactions_per_seconds, confirmations_per_seconds
|
||||
);
|
||||
thread::sleep(remaining);
|
||||
}
|
||||
})
|
||||
.unwrap()
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub enum BinaryEncoding {
|
||||
#[default]
|
||||
Base58,
|
||||
Base64,
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum BinaryCodecError {
|
||||
#[error("Base58DecodeError {0}")]
|
||||
Base58DecodeError(#[from] bs58::decode::Error),
|
||||
#[error("Base58EncodeError {0}")]
|
||||
Base58EncodeError(#[from] bs58::encode::Error),
|
||||
#[error("Base64DecodeError {0}")]
|
||||
Base64DecodeError(#[from] base64::DecodeError),
|
||||
}
|
||||
|
||||
impl BinaryEncoding {
|
||||
pub fn decode<D: AsRef<[u8]>>(&self, to_decode: D) -> Result<Vec<u8>, BinaryCodecError> {
|
||||
match self {
|
||||
Self::Base58 => Ok(bs58::decode(to_decode).into_vec()?),
|
||||
Self::Base64 => Ok(base64::decode(to_decode)?),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn encode<E: AsRef<[u8]>>(&self, to_encode: E) -> String {
|
||||
match self {
|
||||
Self::Base58 => bs58::encode(to_encode).into_string(),
|
||||
Self::Base64 => base64::encode(to_encode),
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,14 @@
|
|||
pub mod bridge;
|
||||
pub mod cli;
|
||||
pub mod configs;
|
||||
pub mod encoding;
|
||||
pub mod rpc;
|
||||
pub mod workers;
|
||||
|
||||
pub type WireTransaction = Vec<u8>;
|
||||
|
||||
pub const DEFAULT_RPC_ADDR: &str = "http://127.0.0.1:8899";
|
||||
pub const DEFAULT_LITE_RPC_ADDR: &str = "127.0.0.1:8890";
|
||||
pub const DEFAULT_WS_ADDR: &str = "ws://127.0.0.1:8900";
|
||||
pub const DEFAULT_TX_MAX_RETRIES: u16 = 1;
|
||||
pub const DEFAULT_TX_RETRY_BATCH_SIZE: usize = 20;
|
182
src/main.rs
182
src/main.rs
|
@ -1,160 +1,42 @@
|
|||
mod cli;
|
||||
mod context;
|
||||
mod pubsub;
|
||||
mod rpc;
|
||||
|
||||
use std::{net::SocketAddr, sync::Arc};
|
||||
use std::str::FromStr;
|
||||
|
||||
use clap::Parser;
|
||||
use context::LiteRpcSubsrciptionControl;
|
||||
use jsonrpc_core::MetaIoHandler;
|
||||
use jsonrpc_http_server::{hyper, AccessControlAllowOrigin, DomainsValidation, ServerBuilder};
|
||||
use pubsub::LitePubSubService;
|
||||
use solana_cli_config::ConfigInput;
|
||||
use solana_perf::thread::renice_this_thread;
|
||||
use tokio::sync::broadcast;
|
||||
use lite_rpc::bridge::LightBridge;
|
||||
use lite_rpc::cli::Args;
|
||||
use reqwest::Url;
|
||||
use simplelog::*;
|
||||
|
||||
use crate::{
|
||||
context::{launch_performance_updating_thread, PerformanceCounter},
|
||||
rpc::{
|
||||
lite_rpc::{self, Lite},
|
||||
LightRpcRequestProcessor,
|
||||
},
|
||||
};
|
||||
use cli::Args;
|
||||
#[tokio::main]
|
||||
pub async fn main() -> anyhow::Result<()> {
|
||||
TermLogger::init(
|
||||
LevelFilter::Info,
|
||||
Config::default(),
|
||||
TerminalMode::Mixed,
|
||||
ColorChoice::Auto,
|
||||
)?;
|
||||
|
||||
fn run(port: String, subscription_port: String, rpc_url: String, websocket_url: String) {
|
||||
let rpc_url = if rpc_url.is_empty() {
|
||||
let (_, rpc_url) = ConfigInput::compute_json_rpc_url_setting(
|
||||
rpc_url.as_str(),
|
||||
&ConfigInput::default().json_rpc_url,
|
||||
);
|
||||
rpc_url
|
||||
} else {
|
||||
rpc_url
|
||||
};
|
||||
let websocket_url = if websocket_url.is_empty() {
|
||||
let (_, ws_url) = ConfigInput::compute_websocket_url_setting(
|
||||
&websocket_url.as_str(),
|
||||
"",
|
||||
rpc_url.as_str(),
|
||||
"",
|
||||
);
|
||||
ws_url
|
||||
} else {
|
||||
websocket_url
|
||||
};
|
||||
println!(
|
||||
"Using rpc server {} and ws server {}",
|
||||
rpc_url, websocket_url
|
||||
);
|
||||
let performance_counter = PerformanceCounter::new();
|
||||
launch_performance_updating_thread(performance_counter.clone());
|
||||
let Args {
|
||||
rpc_addr,
|
||||
ws_addr,
|
||||
lite_rpc_addr,
|
||||
} = Args::parse();
|
||||
|
||||
let (broadcast_sender, _broadcast_receiver) = broadcast::channel(128);
|
||||
let (notification_sender, notification_reciever) = crossbeam_channel::unbounded();
|
||||
let light_bridge = LightBridge::new(Url::from_str(&rpc_addr).unwrap(), &ws_addr).await?;
|
||||
|
||||
let pubsub_control = Arc::new(LiteRpcSubsrciptionControl::new(
|
||||
broadcast_sender,
|
||||
notification_reciever,
|
||||
));
|
||||
let services = light_bridge.start_services(lite_rpc_addr);
|
||||
let services = futures::future::join_all(services);
|
||||
|
||||
let subscription_port = format!("127.0.0.1:{}", subscription_port)
|
||||
.parse::<SocketAddr>()
|
||||
.expect("Invalid subscription port");
|
||||
let ctrl_c_signal = tokio::signal::ctrl_c();
|
||||
|
||||
// start websocket server
|
||||
let (_trigger, websocket_service) = LitePubSubService::new(
|
||||
pubsub_control.clone(),
|
||||
subscription_port,
|
||||
performance_counter.clone(),
|
||||
);
|
||||
{
|
||||
let pubsub_control = pubsub_control.clone();
|
||||
std::thread::Builder::new()
|
||||
.name("broadcasting thread".to_string())
|
||||
.spawn(move || {
|
||||
pubsub_control.start_broadcasting();
|
||||
})
|
||||
.unwrap();
|
||||
tokio::select! {
|
||||
services = services => {
|
||||
for res in services {
|
||||
res??;
|
||||
}
|
||||
anyhow::bail!("Some services exited unexpectedly")
|
||||
}
|
||||
_ = ctrl_c_signal => {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
let mut io = MetaIoHandler::default();
|
||||
let lite_rpc = lite_rpc::LightRpc;
|
||||
io.extend_with(lite_rpc.to_delegate());
|
||||
|
||||
let mut request_processor = LightRpcRequestProcessor::new(
|
||||
rpc_url.as_str(),
|
||||
&websocket_url,
|
||||
notification_sender,
|
||||
performance_counter.clone(),
|
||||
);
|
||||
let runtime = Arc::new(
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.worker_threads(1)
|
||||
.on_thread_start(move || renice_this_thread(0).unwrap())
|
||||
.thread_name("solLiteRpcProcessor")
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("Runtime"),
|
||||
);
|
||||
let max_request_body_size: usize = 50 * (1 << 10);
|
||||
|
||||
let socket_addr = format!("127.0.0.1:{}", port).parse::<SocketAddr>().unwrap();
|
||||
|
||||
{
|
||||
let request_processor = request_processor.clone();
|
||||
let server =
|
||||
ServerBuilder::with_meta_extractor(io, move |_req: &hyper::Request<hyper::Body>| {
|
||||
request_processor.clone()
|
||||
})
|
||||
.event_loop_executor(runtime.handle().clone())
|
||||
.threads(1)
|
||||
.cors(DomainsValidation::AllowOnly(vec![
|
||||
AccessControlAllowOrigin::Any,
|
||||
]))
|
||||
.cors_max_age(86400)
|
||||
.max_request_body_size(max_request_body_size)
|
||||
.start_http(&socket_addr);
|
||||
println!("Starting Lite RPC node");
|
||||
server.unwrap().wait();
|
||||
}
|
||||
request_processor.free();
|
||||
websocket_service.close().unwrap();
|
||||
}
|
||||
|
||||
fn ts_test() {
|
||||
let res = std::process::Command::new("yarn")
|
||||
.args(["run", "test:test-validator"])
|
||||
.output()
|
||||
.unwrap();
|
||||
println!("{}", String::from_utf8_lossy(&res.stdout));
|
||||
println!("{}", String::from_utf8_lossy(&res.stderr));
|
||||
}
|
||||
|
||||
pub fn main() {
|
||||
let cli_command = Args::parse();
|
||||
|
||||
match cli_command.command {
|
||||
cli::Command::Run {
|
||||
port,
|
||||
subscription_port,
|
||||
rpc_url,
|
||||
websocket_url,
|
||||
} => run(port, subscription_port, rpc_url, websocket_url),
|
||||
cli::Command::Test => ts_test(),
|
||||
}
|
||||
//cli_config.resolve_address();
|
||||
//println!(
|
||||
// "Using rpc server {} and ws server {}",
|
||||
// cli_config.rpc_url, cli_config.websocket_url
|
||||
//);
|
||||
//let Args {
|
||||
// rpc_url: json_rpc_url,
|
||||
// websocket_url,
|
||||
// port: rpc_addr,
|
||||
// subscription_port,
|
||||
// ..
|
||||
//} = &cli_config;
|
||||
|
||||
// start recieving notifications and broadcast them
|
||||
}
|
||||
|
|
263
src/pubsub.rs
263
src/pubsub.rs
|
@ -1,263 +0,0 @@
|
|||
use dashmap::DashMap;
|
||||
use jsonrpc_core::{ErrorCode, IoHandler};
|
||||
use soketto::handshake::{server, Server};
|
||||
use solana_rpc::rpc_subscription_tracker::{SignatureSubscriptionParams, SubscriptionParams};
|
||||
use std::{net::SocketAddr, str::FromStr, thread::JoinHandle};
|
||||
use stream_cancel::{Trigger, Tripwire};
|
||||
use tokio::{net::TcpStream, pin, select};
|
||||
use tokio_util::compat::TokioAsyncReadCompatExt;
|
||||
|
||||
use crate::context::{LiteRpcSubsrciptionControl, PerformanceCounter};
|
||||
use {
|
||||
jsonrpc_core::{Error, Result},
|
||||
jsonrpc_derive::rpc,
|
||||
solana_rpc::rpc_subscription_tracker::SubscriptionId,
|
||||
solana_rpc_client_api::config::*,
|
||||
solana_sdk::signature::Signature,
|
||||
std::sync::Arc,
|
||||
};
|
||||
|
||||
#[rpc]
|
||||
pub trait LiteRpcPubSub {
|
||||
// Get notification when signature is verified
|
||||
// Accepts signature parameter as base-58 encoded string
|
||||
#[rpc(name = "signatureSubscribe")]
|
||||
fn signature_subscribe(
|
||||
&self,
|
||||
signature_str: String,
|
||||
config: Option<RpcSignatureSubscribeConfig>,
|
||||
) -> Result<SubscriptionId>;
|
||||
|
||||
// Unsubscribe from signature notification subscription.
|
||||
#[rpc(name = "signatureUnsubscribe")]
|
||||
fn signature_unsubscribe(&self, id: SubscriptionId) -> Result<bool>;
|
||||
|
||||
// Get notification when slot is encountered
|
||||
#[rpc(name = "slotSubscribe")]
|
||||
fn slot_subscribe(&self) -> Result<SubscriptionId>;
|
||||
|
||||
// Unsubscribe from slot notification subscription.
|
||||
#[rpc(name = "slotUnsubscribe")]
|
||||
fn slot_unsubscribe(&self, id: SubscriptionId) -> Result<bool>;
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct LiteRpcPubSubImpl {
|
||||
subscription_control: Arc<LiteRpcSubsrciptionControl>,
|
||||
pub current_subscriptions: Arc<DashMap<SubscriptionId, SubscriptionParams>>,
|
||||
}
|
||||
|
||||
impl LiteRpcPubSubImpl {
|
||||
pub fn new(subscription_control: Arc<LiteRpcSubsrciptionControl>) -> Self {
|
||||
Self {
|
||||
current_subscriptions: Arc::new(DashMap::new()),
|
||||
subscription_control,
|
||||
}
|
||||
}
|
||||
|
||||
fn subscribe(&self, params: SubscriptionParams) -> Result<SubscriptionId> {
|
||||
match self
|
||||
.subscription_control
|
||||
.subscriptions
|
||||
.entry(params.clone())
|
||||
{
|
||||
dashmap::mapref::entry::Entry::Occupied(x) => Ok(*x.get()),
|
||||
dashmap::mapref::entry::Entry::Vacant(x) => {
|
||||
let new_subscription_id = self
|
||||
.subscription_control
|
||||
.last_subscription_id
|
||||
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
let new_subsription_id = SubscriptionId::from(new_subscription_id);
|
||||
x.insert(new_subsription_id);
|
||||
self.current_subscriptions
|
||||
.insert(new_subsription_id, params);
|
||||
Ok(new_subsription_id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn unsubscribe(&self, id: SubscriptionId) -> Result<bool> {
|
||||
match self.current_subscriptions.entry(id) {
|
||||
dashmap::mapref::entry::Entry::Occupied(x) => {
|
||||
x.remove();
|
||||
Ok(true)
|
||||
}
|
||||
dashmap::mapref::entry::Entry::Vacant(_) => Ok(false),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn param<T: FromStr>(param_str: &str, thing: &str) -> Result<T> {
|
||||
param_str.parse::<T>().map_err(|_e| Error {
|
||||
code: ErrorCode::InvalidParams,
|
||||
message: format!("Invalid Request: Invalid {} provided", thing),
|
||||
data: None,
|
||||
})
|
||||
}
|
||||
|
||||
impl LiteRpcPubSub for LiteRpcPubSubImpl {
|
||||
fn signature_subscribe(
|
||||
&self,
|
||||
signature_str: String,
|
||||
config: Option<RpcSignatureSubscribeConfig>,
|
||||
) -> Result<SubscriptionId> {
|
||||
let config = config.unwrap_or_default();
|
||||
let params = SignatureSubscriptionParams {
|
||||
signature: param::<Signature>(&signature_str, "signature")?,
|
||||
commitment: config.commitment.unwrap_or_default(),
|
||||
enable_received_notification: false,
|
||||
};
|
||||
self.subscribe(SubscriptionParams::Signature(params))
|
||||
}
|
||||
|
||||
fn signature_unsubscribe(&self, id: SubscriptionId) -> Result<bool> {
|
||||
self.unsubscribe(id)
|
||||
}
|
||||
|
||||
// Get notification when slot is encountered
|
||||
fn slot_subscribe(&self) -> Result<SubscriptionId> {
|
||||
self.current_subscriptions
|
||||
.insert(SubscriptionId::from(0), SubscriptionParams::Slot);
|
||||
Ok(SubscriptionId::from(0))
|
||||
}
|
||||
|
||||
// Unsubscribe from slot notification subscription.
|
||||
fn slot_unsubscribe(&self, _id: SubscriptionId) -> Result<bool> {
|
||||
self.current_subscriptions.remove(&SubscriptionId::from(0));
|
||||
Ok(true)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct LitePubSubService {
|
||||
thread_hdl: JoinHandle<()>,
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
enum HandleError {
|
||||
#[error("handshake error: {0}")]
|
||||
Handshake(#[from] soketto::handshake::Error),
|
||||
#[error("connection error: {0}")]
|
||||
Connection(#[from] soketto::connection::Error),
|
||||
#[error("broadcast queue error: {0}")]
|
||||
Broadcast(#[from] tokio::sync::broadcast::error::RecvError),
|
||||
}
|
||||
|
||||
async fn handle_connection(
|
||||
socket: TcpStream,
|
||||
subscription_control: Arc<LiteRpcSubsrciptionControl>,
|
||||
performance_counter: PerformanceCounter,
|
||||
) -> core::result::Result<(), HandleError> {
|
||||
let mut server = Server::new(socket.compat());
|
||||
let request = server.receive_request().await?;
|
||||
let accept = server::Response::Accept {
|
||||
key: request.key(),
|
||||
protocol: None,
|
||||
};
|
||||
server.send_response(&accept).await?;
|
||||
let (mut sender, mut receiver) = server.into_builder().finish();
|
||||
|
||||
let mut broadcast_receiver = subscription_control.broadcast_sender.subscribe();
|
||||
let mut json_rpc_handler = IoHandler::new();
|
||||
let rpc_impl = LiteRpcPubSubImpl::new(subscription_control);
|
||||
json_rpc_handler.extend_with(rpc_impl.clone().to_delegate());
|
||||
loop {
|
||||
let mut data = Vec::new();
|
||||
// Extra block for dropping `receive_future`.
|
||||
{
|
||||
// soketto is not cancel safe, so we have to introduce an inner loop to poll
|
||||
// `receive_data` to completion.
|
||||
let receive_future = receiver.receive_data(&mut data);
|
||||
pin!(receive_future);
|
||||
loop {
|
||||
select! {
|
||||
result = &mut receive_future => match result {
|
||||
Ok(_) => break,
|
||||
Err(soketto::connection::Error::Closed) => return Ok(()),
|
||||
Err(err) => return Err(err.into()),
|
||||
},
|
||||
result = broadcast_receiver.recv() => {
|
||||
if let Ok(x) = result {
|
||||
if rpc_impl.current_subscriptions.contains_key(&x.subscription_id) {
|
||||
performance_counter.update_confirm_transaction_counter();
|
||||
sender.send_text(&x.json).await?;
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
let data_str = String::from_utf8(data).unwrap();
|
||||
if let Some(response) = json_rpc_handler.handle_request(data_str.as_str()).await {
|
||||
sender.send_text(&response).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn listen(
|
||||
listen_address: SocketAddr,
|
||||
subscription_control: Arc<LiteRpcSubsrciptionControl>,
|
||||
mut tripwire: Tripwire,
|
||||
performance_counter: PerformanceCounter,
|
||||
) -> std::io::Result<()> {
|
||||
let listener = tokio::net::TcpListener::bind(&listen_address).await?;
|
||||
loop {
|
||||
select! {
|
||||
result = listener.accept() => match result {
|
||||
Ok((socket, addr)) => {
|
||||
let subscription_control = subscription_control.clone();
|
||||
let performance_counter = performance_counter.clone();
|
||||
tokio::spawn(async move {
|
||||
let handle = handle_connection(
|
||||
socket, subscription_control, performance_counter,
|
||||
);
|
||||
match handle.await {
|
||||
Ok(()) => println!("connection closed ({:?})", addr),
|
||||
Err(err) => println!("connection handler error ({:?}): {}", addr, err),
|
||||
}
|
||||
});
|
||||
},
|
||||
Err(e) => println!("couldn't accept connection: {:?}", e),
|
||||
},
|
||||
_ = &mut tripwire => return Ok(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl LitePubSubService {
|
||||
pub fn new(
|
||||
subscription_control: Arc<LiteRpcSubsrciptionControl>,
|
||||
pubsub_addr: SocketAddr,
|
||||
performance_counter: PerformanceCounter,
|
||||
) -> (Trigger, Self) {
|
||||
let (trigger, tripwire) = Tripwire::new();
|
||||
|
||||
let thread_hdl = std::thread::Builder::new()
|
||||
.name("solRpcPubSub".to_string())
|
||||
.spawn(move || {
|
||||
let runtime = tokio::runtime::Builder::new_multi_thread()
|
||||
.worker_threads(512)
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("runtime creation failed");
|
||||
if let Err(err) = runtime.block_on(listen(
|
||||
pubsub_addr,
|
||||
subscription_control,
|
||||
tripwire,
|
||||
performance_counter,
|
||||
)) {
|
||||
println!("pubsub service failed: {}", err);
|
||||
};
|
||||
})
|
||||
.expect("thread spawn failed");
|
||||
|
||||
(trigger, Self { thread_hdl })
|
||||
}
|
||||
|
||||
pub fn close(self) -> std::thread::Result<()> {
|
||||
self.join()
|
||||
}
|
||||
|
||||
pub fn join(self) -> std::thread::Result<()> {
|
||||
self.thread_hdl.join()
|
||||
}
|
||||
}
|
733
src/rpc.rs
733
src/rpc.rs
|
@ -1,672 +1,99 @@
|
|||
use dashmap::DashMap;
|
||||
use crate::{configs::SendTransactionConfig, encoding::BinaryCodecError};
|
||||
|
||||
use actix_web::error::JsonPayloadError;
|
||||
use actix_web::{http::StatusCode, HttpResponse, Responder};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use solana_client::{
|
||||
pubsub_client::{BlockSubscription, PubsubClientError},
|
||||
tpu_client::TpuClientConfig,
|
||||
};
|
||||
use solana_pubsub_client::pubsub_client::{PubsubBlockClientSubscription, PubsubClient};
|
||||
use std::{
|
||||
str::FromStr,
|
||||
sync::Mutex,
|
||||
thread::{Builder, JoinHandle},
|
||||
};
|
||||
use serde_json::json;
|
||||
use solana_sdk::signature::ParseSignatureError;
|
||||
use solana_sdk::transport::TransportError;
|
||||
|
||||
use crate::context::{
|
||||
BlockInformation, LiteRpcContext, NotificationType, PerformanceCounter, SignatureNotification,
|
||||
SlotNotification,
|
||||
};
|
||||
use crossbeam_channel::Sender;
|
||||
use {
|
||||
bincode::config::Options,
|
||||
crossbeam_channel::Receiver,
|
||||
jsonrpc_core::{Error, Metadata, Result},
|
||||
jsonrpc_derive::rpc,
|
||||
solana_client::connection_cache::ConnectionCache,
|
||||
solana_client::{rpc_client::RpcClient, tpu_client::TpuClient},
|
||||
solana_perf::packet::PACKET_DATA_SIZE,
|
||||
solana_rpc_client_api::{
|
||||
config::*,
|
||||
response::{Response as RpcResponse, *},
|
||||
},
|
||||
solana_sdk::{
|
||||
commitment_config::{CommitmentConfig, CommitmentLevel},
|
||||
signature::Signature,
|
||||
transaction::VersionedTransaction,
|
||||
},
|
||||
solana_transaction_status::{TransactionBinaryEncoding, UiTransactionEncoding},
|
||||
std::{
|
||||
any::type_name,
|
||||
sync::{atomic::Ordering, Arc},
|
||||
},
|
||||
};
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct SendTransactionParams(pub String, #[serde(default)] pub SendTransactionConfig);
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct LightRpcRequestProcessor {
|
||||
pub rpc_client: Arc<RpcClient>,
|
||||
pub tpu_client: Arc<TpuClient>,
|
||||
pub last_valid_block_height: u64,
|
||||
pub ws_url: String,
|
||||
pub context: Arc<LiteRpcContext>,
|
||||
_connection_cache: Arc<ConnectionCache>,
|
||||
joinables: Arc<Mutex<Vec<JoinHandle<()>>>>,
|
||||
subscribed_clients: Arc<Mutex<Vec<PubsubBlockClientSubscription>>>,
|
||||
performance_counter: PerformanceCounter,
|
||||
}
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct ConfirmTransactionParams(pub String, #[serde(default)] pub Option<()>);
|
||||
|
||||
impl LightRpcRequestProcessor {
|
||||
pub fn new(
|
||||
json_rpc_url: &str,
|
||||
websocket_url: &str,
|
||||
notification_sender: Sender<NotificationType>,
|
||||
performance_counter: PerformanceCounter,
|
||||
) -> LightRpcRequestProcessor {
|
||||
let rpc_client = Arc::new(RpcClient::new(json_rpc_url));
|
||||
let connection_cache = Arc::new(ConnectionCache::default());
|
||||
let tpu_client = Arc::new(
|
||||
TpuClient::new_with_connection_cache(
|
||||
rpc_client.clone(),
|
||||
websocket_url,
|
||||
TpuClientConfig::default(),
|
||||
connection_cache.clone(),
|
||||
)
|
||||
.unwrap(),
|
||||
);
|
||||
|
||||
let context = Arc::new(LiteRpcContext::new(rpc_client.clone(), notification_sender));
|
||||
|
||||
// subscribe for confirmed_blocks
|
||||
let (client_confirmed, receiver_confirmed) =
|
||||
Self::subscribe_block(websocket_url, CommitmentLevel::Confirmed).unwrap();
|
||||
|
||||
// subscribe for finalized blocks
|
||||
let (client_finalized, receiver_finalized) =
|
||||
Self::subscribe_block(websocket_url, CommitmentLevel::Finalized).unwrap();
|
||||
|
||||
// create threads to listen for finalized and confrimed blocks
|
||||
let joinables = vec![
|
||||
Self::build_thread_to_process_blocks(
|
||||
receiver_confirmed,
|
||||
&context,
|
||||
CommitmentLevel::Confirmed,
|
||||
),
|
||||
Self::build_thread_to_process_blocks(
|
||||
receiver_finalized,
|
||||
&context,
|
||||
CommitmentLevel::Finalized,
|
||||
),
|
||||
];
|
||||
|
||||
LightRpcRequestProcessor {
|
||||
rpc_client,
|
||||
tpu_client,
|
||||
last_valid_block_height: 0,
|
||||
ws_url: websocket_url.to_string(),
|
||||
context,
|
||||
_connection_cache: connection_cache,
|
||||
joinables: Arc::new(Mutex::new(joinables)),
|
||||
subscribed_clients: Arc::new(Mutex::new(vec![client_confirmed, client_finalized])),
|
||||
performance_counter,
|
||||
}
|
||||
}
|
||||
|
||||
fn subscribe_block(
|
||||
websocket_url: &str,
|
||||
commitment: CommitmentLevel,
|
||||
) -> std::result::Result<BlockSubscription, PubsubClientError> {
|
||||
PubsubClient::block_subscribe(
|
||||
websocket_url,
|
||||
RpcBlockSubscribeFilter::All,
|
||||
Some(RpcBlockSubscribeConfig {
|
||||
commitment: Some(CommitmentConfig { commitment }),
|
||||
encoding: None,
|
||||
transaction_details: Some(
|
||||
solana_transaction_status::TransactionDetails::Signatures,
|
||||
),
|
||||
show_rewards: None,
|
||||
max_supported_transaction_version: None,
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
fn build_thread_to_process_blocks(
|
||||
reciever: Receiver<RpcResponse<RpcBlockUpdate>>,
|
||||
context: &Arc<LiteRpcContext>,
|
||||
commitment: CommitmentLevel,
|
||||
) -> JoinHandle<()> {
|
||||
let context = context.clone();
|
||||
Builder::new()
|
||||
.name("thread working on confirmation block".to_string())
|
||||
.spawn(move || {
|
||||
let block_info = if commitment.eq(&CommitmentLevel::Finalized) {
|
||||
&context.confirmed_block_info
|
||||
} else {
|
||||
&context.finalized_block_info
|
||||
};
|
||||
Self::process_block(
|
||||
reciever,
|
||||
&context.signature_status,
|
||||
commitment,
|
||||
&context.notification_sender,
|
||||
block_info,
|
||||
);
|
||||
})
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
fn process_block(
|
||||
reciever: Receiver<RpcResponse<RpcBlockUpdate>>,
|
||||
signature_status: &DashMap<String, Option<CommitmentLevel>>,
|
||||
commitment: CommitmentLevel,
|
||||
notification_sender: &crossbeam_channel::Sender<NotificationType>,
|
||||
block_information: &BlockInformation,
|
||||
) {
|
||||
loop {
|
||||
let block_data = reciever.recv();
|
||||
|
||||
match block_data {
|
||||
Ok(data) => {
|
||||
let block_update = &data.value;
|
||||
block_information
|
||||
.slot
|
||||
.store(block_update.slot, Ordering::Relaxed);
|
||||
let slot_notification = SlotNotification {
|
||||
commitment: commitment,
|
||||
slot: block_update.slot,
|
||||
parent: 0,
|
||||
root: 0,
|
||||
};
|
||||
if let Err(e) =
|
||||
notification_sender.send(NotificationType::Slot(slot_notification))
|
||||
{
|
||||
println!("Error sending slot notification error : {}", e.to_string());
|
||||
}
|
||||
|
||||
if let Some(block) = &block_update.block {
|
||||
block_information
|
||||
.block_height
|
||||
.store(block.block_height.unwrap(), Ordering::Relaxed);
|
||||
// context to update blockhash
|
||||
{
|
||||
let mut lock = block_information.block_hash.write().unwrap();
|
||||
*lock = block.blockhash.clone();
|
||||
}
|
||||
|
||||
if let Some(signatures) = &block.signatures {
|
||||
for signature in signatures {
|
||||
match signature_status.entry(signature.clone()) {
|
||||
dashmap::mapref::entry::Entry::Occupied(mut x) => {
|
||||
let signature_notification = SignatureNotification {
|
||||
signature: Signature::from_str(signature.as_str())
|
||||
.unwrap(),
|
||||
commitment,
|
||||
slot: block_update.slot,
|
||||
error: None,
|
||||
};
|
||||
if let Err(e) = notification_sender.send(
|
||||
NotificationType::Signature(signature_notification),
|
||||
) {
|
||||
println!(
|
||||
"Error sending signature notification error : {}",
|
||||
e.to_string()
|
||||
);
|
||||
}
|
||||
x.insert(Some(commitment));
|
||||
}
|
||||
dashmap::mapref::entry::Entry::Vacant(_x) => {
|
||||
// do nothing transaction not sent by lite rpc
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
println!(
|
||||
"Cannot get signatures at slot {} block hash {}",
|
||||
block_update.slot, block.blockhash,
|
||||
);
|
||||
}
|
||||
} else {
|
||||
println!("Cannot get a block at slot {}", block_update.slot);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
println!("Got error when recieving the block ({})", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn free(&mut self) {
|
||||
let subscribed_clients = &mut self.subscribed_clients.lock().unwrap();
|
||||
let len_sc = subscribed_clients.len();
|
||||
for _i in 0..len_sc {
|
||||
let mut subscribed_client = subscribed_clients.pop().unwrap();
|
||||
subscribed_client.send_unsubscribe().unwrap();
|
||||
subscribed_client.shutdown().unwrap();
|
||||
}
|
||||
|
||||
let joinables = &mut self.joinables.lock().unwrap();
|
||||
let len = joinables.len();
|
||||
for _i in 0..len {
|
||||
joinables.pop().unwrap().join().unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Metadata for LightRpcRequestProcessor {}
|
||||
|
||||
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct RpcPerformanceCounterResults {
|
||||
pub transactions_per_seconds: u64,
|
||||
pub confirmations_per_seconds: u64,
|
||||
pub total_transactions_count: u64,
|
||||
pub total_confirmations_count: u64,
|
||||
pub memory_used: u64,
|
||||
pub nb_threads: u64,
|
||||
pub enum RpcMethod {
|
||||
SendTransaction,
|
||||
ConfirmTransaction,
|
||||
GetVersion,
|
||||
#[serde(other)]
|
||||
Other,
|
||||
}
|
||||
|
||||
pub mod lite_rpc {
|
||||
use std::str::FromStr;
|
||||
/// According to <https://www.jsonrpc.org/specification#overview>
|
||||
#[derive(Debug, Deserialize, Serialize)]
|
||||
pub struct JsonRpcReq {
|
||||
pub method: RpcMethod,
|
||||
#[serde(default)]
|
||||
pub params: serde_json::Value,
|
||||
}
|
||||
|
||||
use itertools::Itertools;
|
||||
use solana_sdk::{fee_calculator::FeeCalculator, pubkey::Pubkey};
|
||||
use solana_transaction_status::{TransactionConfirmationStatus, TransactionStatus};
|
||||
#[derive(Debug, Deserialize, Serialize)]
|
||||
pub enum JsonRpcRes {
|
||||
Raw { status: u16, body: String },
|
||||
Err(serde_json::Value),
|
||||
Ok(serde_json::Value),
|
||||
}
|
||||
|
||||
use super::*;
|
||||
#[rpc]
|
||||
pub trait Lite {
|
||||
type Metadata;
|
||||
impl Responder for JsonRpcRes {
|
||||
type Body = String;
|
||||
|
||||
#[rpc(meta, name = "sendTransaction")]
|
||||
fn send_transaction(
|
||||
&self,
|
||||
meta: Self::Metadata,
|
||||
data: String,
|
||||
config: Option<RpcSendTransactionConfig>,
|
||||
) -> Result<String>;
|
||||
|
||||
#[rpc(meta, name = "getRecentBlockhash")]
|
||||
fn get_recent_blockhash(
|
||||
&self,
|
||||
meta: Self::Metadata,
|
||||
commitment: Option<CommitmentConfig>,
|
||||
) -> Result<RpcResponse<RpcBlockhashFeeCalculator>>;
|
||||
|
||||
#[rpc(meta, name = "confirmTransaction")]
|
||||
fn confirm_transaction(
|
||||
&self,
|
||||
meta: Self::Metadata,
|
||||
signature_str: String,
|
||||
commitment: Option<CommitmentConfig>,
|
||||
) -> Result<RpcResponse<bool>>;
|
||||
|
||||
#[rpc(meta, name = "requestAirdrop")]
|
||||
fn request_airdrop(
|
||||
&self,
|
||||
meta: Self::Metadata,
|
||||
pubkey_str: String,
|
||||
lamports: u64,
|
||||
config: Option<RpcRequestAirdropConfig>,
|
||||
) -> Result<String>;
|
||||
|
||||
#[rpc(meta, name = "getPerformanceCounters")]
|
||||
fn get_performance_counters(
|
||||
&self,
|
||||
meta: Self::Metadata,
|
||||
) -> Result<RpcPerformanceCounterResults>;
|
||||
|
||||
#[rpc(meta, name = "getLatestBlockhash")]
|
||||
fn get_latest_blockhash(
|
||||
&self,
|
||||
meta: Self::Metadata,
|
||||
config: Option<RpcContextConfig>,
|
||||
) -> Result<RpcResponse<RpcBlockhash>>;
|
||||
|
||||
#[rpc(meta, name = "getSignatureStatuses")]
|
||||
fn get_signature_statuses(
|
||||
&self,
|
||||
meta: Self::Metadata,
|
||||
signature_strs: Vec<String>,
|
||||
config: Option<RpcSignatureStatusConfig>,
|
||||
) -> Result<RpcResponse<Vec<Option<TransactionStatus>>>>;
|
||||
|
||||
#[rpc(name = "getVersion")]
|
||||
fn get_version(&self) -> Result<RpcVersionInfo>;
|
||||
}
|
||||
|
||||
pub struct LightRpc;
|
||||
impl Lite for LightRpc {
|
||||
type Metadata = LightRpcRequestProcessor;
|
||||
|
||||
fn send_transaction(
|
||||
&self,
|
||||
meta: Self::Metadata,
|
||||
data: String,
|
||||
config: Option<RpcSendTransactionConfig>,
|
||||
) -> Result<String> {
|
||||
let config = config.unwrap_or_default();
|
||||
let encoding = config.encoding;
|
||||
let tx_encoding = encoding.unwrap_or(UiTransactionEncoding::Base58);
|
||||
let binary_encoding = tx_encoding.into_binary_encoding().ok_or_else(|| {
|
||||
Error::invalid_params(format!(
|
||||
"unsupported encoding: {}. Supported encodings: base58, base64",
|
||||
tx_encoding
|
||||
))
|
||||
})?;
|
||||
let (wire_transaction, transaction) =
|
||||
decode_and_deserialize::<VersionedTransaction>(data, binary_encoding)?;
|
||||
|
||||
meta.context
|
||||
.signature_status
|
||||
.insert(transaction.signatures[0].to_string(), None);
|
||||
meta.tpu_client.send_wire_transaction(wire_transaction);
|
||||
meta.performance_counter.update_sent_transactions_counter();
|
||||
Ok(transaction.signatures[0].to_string())
|
||||
fn respond_to(self, _: &actix_web::HttpRequest) -> HttpResponse<Self::Body> {
|
||||
if let Self::Raw { status, body } = self {
|
||||
return HttpResponse::new(StatusCode::from_u16(status).unwrap()).set_body(body);
|
||||
}
|
||||
let mut res = json!({
|
||||
"jsonrpc" : "2.0",
|
||||
// TODO: add id
|
||||
});
|
||||
|
||||
fn get_recent_blockhash(
|
||||
&self,
|
||||
meta: Self::Metadata,
|
||||
commitment: Option<CommitmentConfig>,
|
||||
) -> Result<RpcResponse<RpcBlockhashFeeCalculator>> {
|
||||
let commitment = match commitment {
|
||||
Some(x) => x.commitment,
|
||||
None => CommitmentLevel::Finalized,
|
||||
};
|
||||
let (block_hash, slot) = match commitment {
|
||||
CommitmentLevel::Finalized => {
|
||||
let slot = meta
|
||||
.context
|
||||
.finalized_block_info
|
||||
.slot
|
||||
.load(Ordering::Relaxed);
|
||||
let lock = meta.context.finalized_block_info.block_hash.read().unwrap();
|
||||
(lock.clone(), slot)
|
||||
}
|
||||
_ => {
|
||||
let slot = meta
|
||||
.context
|
||||
.confirmed_block_info
|
||||
.slot
|
||||
.load(Ordering::Relaxed);
|
||||
let lock = meta.context.confirmed_block_info.block_hash.read().unwrap();
|
||||
(lock.clone(), slot)
|
||||
}
|
||||
};
|
||||
|
||||
Ok(RpcResponse {
|
||||
context: RpcResponseContext::new(slot),
|
||||
value: RpcBlockhashFeeCalculator {
|
||||
blockhash: block_hash,
|
||||
fee_calculator: FeeCalculator::default(),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
fn get_latest_blockhash(
|
||||
&self,
|
||||
meta: Self::Metadata,
|
||||
config: Option<RpcContextConfig>,
|
||||
) -> Result<RpcResponse<RpcBlockhash>> {
|
||||
let commitment = match config {
|
||||
Some(x) => match x.commitment {
|
||||
Some(x) => x.commitment,
|
||||
None => CommitmentLevel::Finalized,
|
||||
},
|
||||
None => CommitmentLevel::Finalized,
|
||||
};
|
||||
|
||||
let block_info = match commitment {
|
||||
CommitmentLevel::Finalized => &meta.context.finalized_block_info,
|
||||
_ => &meta.context.confirmed_block_info,
|
||||
};
|
||||
|
||||
let slot = block_info.slot.load(Ordering::Relaxed);
|
||||
let last_valid_block_height = block_info.block_height.load(Ordering::Relaxed);
|
||||
let blockhash = block_info.block_hash.read().unwrap().clone();
|
||||
|
||||
Ok(RpcResponse {
|
||||
context: RpcResponseContext::new(slot),
|
||||
value: RpcBlockhash {
|
||||
blockhash,
|
||||
last_valid_block_height,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
fn confirm_transaction(
|
||||
&self,
|
||||
meta: Self::Metadata,
|
||||
signature_str: String,
|
||||
commitment_cfg: Option<CommitmentConfig>,
|
||||
) -> Result<RpcResponse<bool>> {
|
||||
let singature_status = meta.context.signature_status.get(&signature_str);
|
||||
let k_value = singature_status;
|
||||
|
||||
let commitment = match commitment_cfg {
|
||||
Some(x) => x.commitment,
|
||||
None => CommitmentLevel::Confirmed,
|
||||
};
|
||||
|
||||
let slot = if commitment.eq(&CommitmentLevel::Finalized) {
|
||||
meta.context
|
||||
.finalized_block_info
|
||||
.slot
|
||||
.load(Ordering::Relaxed)
|
||||
} else {
|
||||
meta.context
|
||||
.confirmed_block_info
|
||||
.slot
|
||||
.load(Ordering::Relaxed)
|
||||
};
|
||||
meta.performance_counter
|
||||
.update_confirm_transaction_counter();
|
||||
|
||||
match k_value {
|
||||
Some(value) => match *value {
|
||||
Some(commitment_for_signature) => Ok(RpcResponse {
|
||||
context: RpcResponseContext::new(slot),
|
||||
value: if commitment.eq(&CommitmentLevel::Finalized) {
|
||||
commitment_for_signature.eq(&CommitmentLevel::Finalized)
|
||||
} else {
|
||||
commitment_for_signature.eq(&CommitmentLevel::Finalized)
|
||||
|| commitment_for_signature.eq(&CommitmentLevel::Confirmed)
|
||||
},
|
||||
}),
|
||||
None => Ok(RpcResponse {
|
||||
context: RpcResponseContext::new(slot),
|
||||
value: false,
|
||||
}),
|
||||
},
|
||||
None => {
|
||||
let signature = Signature::from_str(signature_str.as_str()).unwrap();
|
||||
let ans = match commitment_cfg {
|
||||
None => meta.rpc_client.confirm_transaction(&signature).unwrap(),
|
||||
Some(cfg) => {
|
||||
meta.rpc_client
|
||||
.confirm_transaction_with_commitment(&signature, cfg)
|
||||
.unwrap()
|
||||
.value
|
||||
}
|
||||
};
|
||||
Ok(RpcResponse {
|
||||
context: RpcResponseContext::new(slot),
|
||||
value: ans,
|
||||
})
|
||||
}
|
||||
match self {
|
||||
Self::Err(error) => {
|
||||
res["error"] = error;
|
||||
HttpResponse::new(StatusCode::from_u16(500).unwrap()).set_body(res.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
fn get_signature_statuses(
|
||||
&self,
|
||||
meta: Self::Metadata,
|
||||
signature_strs: Vec<String>,
|
||||
_config: Option<RpcSignatureStatusConfig>,
|
||||
) -> Result<RpcResponse<Vec<Option<TransactionStatus>>>> {
|
||||
let confirmed_slot = meta
|
||||
.context
|
||||
.confirmed_block_info
|
||||
.slot
|
||||
.load(Ordering::Relaxed);
|
||||
let status = signature_strs
|
||||
.iter()
|
||||
.map(|x| {
|
||||
let singature_status = meta.context.signature_status.get(x);
|
||||
let k_value = singature_status;
|
||||
match k_value {
|
||||
Some(value) => match *value {
|
||||
Some(commitment_for_signature) => {
|
||||
let slot = meta
|
||||
.context
|
||||
.confirmed_block_info
|
||||
.slot
|
||||
.load(Ordering::Relaxed);
|
||||
meta.performance_counter
|
||||
.update_confirm_transaction_counter();
|
||||
|
||||
let status = match commitment_for_signature {
|
||||
CommitmentLevel::Finalized => {
|
||||
TransactionConfirmationStatus::Finalized
|
||||
}
|
||||
_ => TransactionConfirmationStatus::Confirmed,
|
||||
};
|
||||
Some(TransactionStatus {
|
||||
slot,
|
||||
confirmations: Some(1),
|
||||
status: Ok(()),
|
||||
err: None,
|
||||
confirmation_status: Some(status),
|
||||
})
|
||||
}
|
||||
None => None,
|
||||
},
|
||||
None => None,
|
||||
}
|
||||
})
|
||||
.collect_vec();
|
||||
Ok(RpcResponse {
|
||||
context: RpcResponseContext::new(confirmed_slot),
|
||||
value: status,
|
||||
})
|
||||
}
|
||||
|
||||
fn request_airdrop(
|
||||
&self,
|
||||
meta: Self::Metadata,
|
||||
pubkey_str: String,
|
||||
lamports: u64,
|
||||
config: Option<RpcRequestAirdropConfig>,
|
||||
) -> Result<String> {
|
||||
let pubkey = Pubkey::from_str(pubkey_str.as_str()).unwrap();
|
||||
let signature = match config {
|
||||
Some(c) => meta
|
||||
.rpc_client
|
||||
.request_airdrop_with_config(&pubkey, lamports, c),
|
||||
None => meta.rpc_client.request_airdrop(&pubkey, lamports),
|
||||
};
|
||||
Ok(signature.unwrap().to_string())
|
||||
}
|
||||
|
||||
fn get_version(&self) -> Result<RpcVersionInfo> {
|
||||
let version = solana_version::Version::default();
|
||||
Ok(RpcVersionInfo {
|
||||
solana_core: version.to_string(),
|
||||
feature_set: Some(version.feature_set),
|
||||
})
|
||||
}
|
||||
|
||||
fn get_performance_counters(
|
||||
&self,
|
||||
meta: Self::Metadata,
|
||||
) -> Result<RpcPerformanceCounterResults> {
|
||||
let total_transactions_count = meta
|
||||
.performance_counter
|
||||
.total_transactions_sent
|
||||
.load(Ordering::Relaxed);
|
||||
let total_confirmations_count = meta
|
||||
.performance_counter
|
||||
.total_confirmations
|
||||
.load(Ordering::Relaxed);
|
||||
let transactions_per_seconds = meta
|
||||
.performance_counter
|
||||
.transactions_per_seconds
|
||||
.load(Ordering::Acquire);
|
||||
let confirmations_per_seconds = meta
|
||||
.performance_counter
|
||||
.confirmations_per_seconds
|
||||
.load(Ordering::Acquire);
|
||||
|
||||
let procinfo::pid::Statm { size, .. } = procinfo::pid::statm_self().unwrap();
|
||||
let procinfo::pid::Stat { num_threads, .. } = procinfo::pid::stat_self().unwrap();
|
||||
|
||||
Ok(RpcPerformanceCounterResults {
|
||||
confirmations_per_seconds,
|
||||
transactions_per_seconds,
|
||||
total_confirmations_count,
|
||||
total_transactions_count,
|
||||
memory_used: size as u64,
|
||||
nb_threads: num_threads as u64,
|
||||
})
|
||||
Self::Ok(result) => {
|
||||
res["result"] = result;
|
||||
HttpResponse::new(StatusCode::OK).set_body(res.to_string())
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const MAX_BASE58_SIZE: usize = 1683; // Golden, bump if PACKET_DATA_SIZE changes
|
||||
const MAX_BASE64_SIZE: usize = 1644; // Golden, bump if PACKET_DATA_SIZE changes
|
||||
fn decode_and_deserialize<T>(
|
||||
encoded: String,
|
||||
encoding: TransactionBinaryEncoding,
|
||||
) -> Result<(Vec<u8>, T)>
|
||||
where
|
||||
T: serde::de::DeserializeOwned,
|
||||
{
|
||||
let wire_output = match encoding {
|
||||
TransactionBinaryEncoding::Base58 => {
|
||||
if encoded.len() > MAX_BASE58_SIZE {
|
||||
return Err(Error::invalid_params(format!(
|
||||
"base58 encoded {} too large: {} bytes (max: encoded/raw {}/{})",
|
||||
type_name::<T>(),
|
||||
encoded.len(),
|
||||
MAX_BASE58_SIZE,
|
||||
PACKET_DATA_SIZE,
|
||||
)));
|
||||
}
|
||||
bs58::decode(encoded)
|
||||
.into_vec()
|
||||
.map_err(|e| Error::invalid_params(format!("invalid base58 encoding: {:?}", e)))?
|
||||
}
|
||||
TransactionBinaryEncoding::Base64 => {
|
||||
if encoded.len() > MAX_BASE64_SIZE {
|
||||
return Err(Error::invalid_params(format!(
|
||||
"base64 encoded {} too large: {} bytes (max: encoded/raw {}/{})",
|
||||
type_name::<T>(),
|
||||
encoded.len(),
|
||||
MAX_BASE64_SIZE,
|
||||
PACKET_DATA_SIZE,
|
||||
)));
|
||||
}
|
||||
base64::decode(encoded)
|
||||
.map_err(|e| Error::invalid_params(format!("invalid base64 encoding: {:?}", e)))?
|
||||
}
|
||||
};
|
||||
if wire_output.len() > PACKET_DATA_SIZE {
|
||||
return Err(Error::invalid_params(format!(
|
||||
"decoded {} too large: {} bytes (max: {} bytes)",
|
||||
type_name::<T>(),
|
||||
wire_output.len(),
|
||||
PACKET_DATA_SIZE
|
||||
)));
|
||||
}
|
||||
bincode::options()
|
||||
.with_limit(PACKET_DATA_SIZE as u64)
|
||||
.with_fixint_encoding()
|
||||
.allow_trailing_bytes()
|
||||
.deserialize_from(&wire_output[..])
|
||||
.map_err(|err| {
|
||||
Error::invalid_params(format!(
|
||||
"failed to deserialize {}: {}",
|
||||
type_name::<T>(),
|
||||
&err.to_string()
|
||||
))
|
||||
impl<T: serde::Serialize> TryFrom<Result<T, JsonRpcError>> for JsonRpcRes {
|
||||
type Error = serde_json::Error;
|
||||
|
||||
fn try_from(result: Result<T, JsonRpcError>) -> Result<Self, Self::Error> {
|
||||
Ok(match result {
|
||||
Ok(value) => Self::Ok(serde_json::to_value(value)?),
|
||||
// TODO: add custom handle
|
||||
Err(error) => error.into(),
|
||||
})
|
||||
.map(|output| (wire_output, output))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<JsonRpcError> for JsonRpcRes {
|
||||
fn from(error: JsonRpcError) -> Self {
|
||||
Self::Err(serde_json::Value::String(format!("{error:?}")))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum JsonRpcError {
|
||||
#[error("TransportError {0}")]
|
||||
TransportError(#[from] TransportError),
|
||||
#[error("BinaryCodecError {0}")]
|
||||
BinaryCodecError(#[from] BinaryCodecError),
|
||||
#[error("BincodeDeserializeError {0}")]
|
||||
BincodeDeserializeError(#[from] bincode::Error),
|
||||
#[error("SerdeError {0}")]
|
||||
SerdeError(#[from] serde_json::Error),
|
||||
#[error("JsonPayloadError {0}")]
|
||||
JsonPayloadError(#[from] JsonPayloadError),
|
||||
#[error("ParseSignatureError {0}")]
|
||||
ParseSignatureError(#[from] ParseSignatureError),
|
||||
}
|
||||
|
|
|
@ -0,0 +1,101 @@
|
|||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Context;
|
||||
use futures::StreamExt;
|
||||
use log::info;
|
||||
use solana_client::nonblocking::pubsub_client::PubsubClient;
|
||||
use solana_client::nonblocking::rpc_client::RpcClient;
|
||||
use solana_client::rpc_config::{RpcBlockSubscribeConfig, RpcBlockSubscribeFilter};
|
||||
use solana_sdk::commitment_config::CommitmentConfig;
|
||||
use solana_sdk::signature::Signature;
|
||||
use solana_sdk::transaction;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
/// Background worker which listen's to new blocks
|
||||
/// and keeps a track of confirmed txs
|
||||
#[derive(Clone)]
|
||||
pub struct BlockListener {
|
||||
pub_sub_client: Arc<PubsubClient>,
|
||||
pub confirmed_txs: Arc<RwLock<HashSet<String>>>,
|
||||
rpc_client: Arc<RpcClient>,
|
||||
}
|
||||
|
||||
impl BlockListener {
|
||||
pub async fn new(rpc_client: Arc<RpcClient>, ws_url: &str) -> anyhow::Result<Self> {
|
||||
let pub_sub_client = Arc::new(PubsubClient::new(ws_url).await?);
|
||||
Ok(Self {
|
||||
pub_sub_client,
|
||||
rpc_client,
|
||||
confirmed_txs: Default::default(),
|
||||
})
|
||||
}
|
||||
|
||||
/// check if tx is in the confirmed cache
|
||||
///
|
||||
/// ## Return
|
||||
///
|
||||
/// None if transaction is un-confirmed
|
||||
/// Some(Err) in case of transaction failure
|
||||
/// Some(Ok(())) if tx is confirmed without failure
|
||||
pub async fn confirm_tx(&self, sig: Signature) -> Option<transaction::Result<()>> {
|
||||
let sig_string = sig.to_string();
|
||||
if self.confirmed_txs.read().await.contains(&sig_string) {
|
||||
info!("Confirmed {sig} from cache");
|
||||
Some(Ok(()))
|
||||
} else {
|
||||
let res = self.rpc_client.get_signature_status(&sig).await.unwrap();
|
||||
if res.is_some() {
|
||||
self.confirmed_txs.write().await.insert(sig_string);
|
||||
}
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
pub fn listen(self) -> JoinHandle<anyhow::Result<()>> {
|
||||
tokio::spawn(async move {
|
||||
info!("Subscribing to blocks");
|
||||
|
||||
let (mut recv, un_sub) = self
|
||||
.pub_sub_client
|
||||
.block_subscribe(
|
||||
RpcBlockSubscribeFilter::All,
|
||||
Some(RpcBlockSubscribeConfig {
|
||||
commitment: Some(CommitmentConfig::confirmed()),
|
||||
encoding: None,
|
||||
transaction_details: Some(
|
||||
solana_transaction_status::TransactionDetails::Signatures,
|
||||
),
|
||||
show_rewards: None,
|
||||
max_supported_transaction_version: None,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.context("Error calling block_subscribe")?;
|
||||
|
||||
info!("Listening to confirmed blocks");
|
||||
|
||||
while let Some(block) = recv.as_mut().next().await {
|
||||
let Some(block) = block.value.block else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let Some(signatures) = block.signatures else {
|
||||
continue;
|
||||
};
|
||||
|
||||
for sig in signatures {
|
||||
info!("Confirmed {sig}");
|
||||
self.confirmed_txs.write().await.insert(sig);
|
||||
}
|
||||
}
|
||||
|
||||
info!("Stopped Listening to confirmed blocks");
|
||||
|
||||
un_sub();
|
||||
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
}
|
|
@ -0,0 +1,5 @@
|
|||
mod block_listenser;
|
||||
mod tx_sender;
|
||||
|
||||
pub use block_listenser::*;
|
||||
pub use tx_sender::*;
|
|
@ -0,0 +1,136 @@
|
|||
use std::time::Duration;
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use log::{info, warn};
|
||||
|
||||
use solana_client::nonblocking::tpu_client::TpuClient;
|
||||
|
||||
use solana_sdk::signature::Signature;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
use crate::{WireTransaction, DEFAULT_TX_RETRY_BATCH_SIZE};
|
||||
|
||||
use super::block_listenser::BlockListener;
|
||||
|
||||
/// Retry transactions to a maximum of `u16` times, keep a track of confirmed transactions
|
||||
#[derive(Clone)]
|
||||
pub struct TxSender {
|
||||
/// Transactions queue for retrying
|
||||
enqueued_txs: Arc<RwLock<HashMap<Signature, (WireTransaction, u16)>>>,
|
||||
/// block_listner
|
||||
block_listner: BlockListener,
|
||||
/// TpuClient to call the tpu port
|
||||
tpu_client: Arc<TpuClient>,
|
||||
}
|
||||
|
||||
impl TxSender {
|
||||
pub fn new(tpu_client: Arc<TpuClient>, block_listner: BlockListener) -> Self {
|
||||
Self {
|
||||
enqueued_txs: Default::default(),
|
||||
block_listner,
|
||||
tpu_client,
|
||||
}
|
||||
}
|
||||
/// en-queue transaction if it doesn't already exist
|
||||
pub async fn enqnueue_tx(&self, sig: Signature, raw_tx: WireTransaction, max_retries: u16) {
|
||||
if max_retries == 0 {
|
||||
return;
|
||||
}
|
||||
|
||||
if !self
|
||||
.block_listner
|
||||
.confirmed_txs
|
||||
.read()
|
||||
.await
|
||||
.contains(&sig.to_string())
|
||||
{
|
||||
info!("en-queuing {sig} with max retries {max_retries}");
|
||||
self.enqueued_txs
|
||||
.write()
|
||||
.await
|
||||
.insert(sig, (raw_tx, max_retries));
|
||||
|
||||
println!("{:?}", self.enqueued_txs.read().await.len());
|
||||
}
|
||||
}
|
||||
|
||||
/// retry enqued_tx(s)
|
||||
pub async fn retry_txs(&self) {
|
||||
let len = self.enqueued_txs.read().await.len();
|
||||
|
||||
info!("retrying {len} tx(s)");
|
||||
|
||||
if len == 0 {
|
||||
return;
|
||||
}
|
||||
|
||||
let mut enqued_tx = self.enqueued_txs.write().await;
|
||||
|
||||
let mut tx_batch = Vec::with_capacity(enqued_tx.len() / DEFAULT_TX_RETRY_BATCH_SIZE);
|
||||
let mut stale_txs = vec![];
|
||||
|
||||
let mut batch_index = 0;
|
||||
|
||||
for (index, (sig, (tx, retries))) in enqued_tx.iter_mut().enumerate() {
|
||||
if self
|
||||
.block_listner
|
||||
.confirmed_txs
|
||||
.read()
|
||||
.await
|
||||
.contains(&sig.to_string())
|
||||
{
|
||||
stale_txs.push(sig.to_owned());
|
||||
continue;
|
||||
}
|
||||
|
||||
if index % DEFAULT_TX_RETRY_BATCH_SIZE == 0 {
|
||||
tx_batch.push(Vec::with_capacity(DEFAULT_TX_RETRY_BATCH_SIZE));
|
||||
batch_index += 1;
|
||||
}
|
||||
|
||||
tx_batch[batch_index - 1].push(tx.clone());
|
||||
|
||||
let Some(retries_left) = retries.checked_sub(1) else {
|
||||
stale_txs.push(sig.to_owned());
|
||||
continue;
|
||||
};
|
||||
|
||||
info!("retrying {sig} with {retries_left} retries left");
|
||||
|
||||
*retries = retries_left;
|
||||
}
|
||||
|
||||
// remove stale tx(s)
|
||||
for stale_tx in stale_txs {
|
||||
enqued_tx.remove(&stale_tx);
|
||||
}
|
||||
|
||||
for tx_batch in tx_batch {
|
||||
if let Err(err) = self
|
||||
.tpu_client
|
||||
.try_send_wire_transaction_batch(tx_batch)
|
||||
.await
|
||||
{
|
||||
warn!("{err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// retry and confirm transactions every 800ms (avg time to confirm tx)
|
||||
pub fn execute(self) -> JoinHandle<anyhow::Result<()>> {
|
||||
let mut interval = tokio::time::interval(Duration::from_secs(800));
|
||||
|
||||
#[allow(unreachable_code)]
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
info!("{} tx(s) en-queued", self.enqueued_txs.read().await.len());
|
||||
interval.tick().await;
|
||||
self.retry_txs().await;
|
||||
}
|
||||
|
||||
// to give the correct type to JoinHandle
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
}
|
|
@ -0,0 +1,91 @@
|
|||
use std::str::FromStr;
|
||||
use std::time::Duration;
|
||||
|
||||
use reqwest::Url;
|
||||
use solana_client::rpc_response::RpcVersionInfo;
|
||||
use solana_sdk::{
|
||||
message::Message, native_token::LAMPORTS_PER_SOL, pubkey::Pubkey, signature::Keypair,
|
||||
signer::Signer, system_instruction, transaction::Transaction,
|
||||
};
|
||||
|
||||
use lite_rpc::{bridge::LightBridge, encoding::BinaryEncoding, rpc::SendTransactionParams};
|
||||
|
||||
const RPC_ADDR: &str = "http://127.0.0.1:8899";
|
||||
const WS_ADDR: &str = "ws://127.0.0.1:8900";
|
||||
|
||||
#[tokio::test]
|
||||
async fn get_version() {
|
||||
let light_bridge = LightBridge::new(Url::from_str(RPC_ADDR).unwrap(), WS_ADDR)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let RpcVersionInfo {
|
||||
solana_core,
|
||||
feature_set,
|
||||
} = light_bridge.get_version();
|
||||
let version_crate = solana_version::Version::default();
|
||||
|
||||
assert_eq!(solana_core, version_crate.to_string());
|
||||
assert_eq!(feature_set.unwrap(), version_crate.feature_set);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_send_transaction() {
|
||||
let light_bridge = LightBridge::new(Url::from_str(RPC_ADDR).unwrap(), WS_ADDR)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let payer = Keypair::new();
|
||||
|
||||
light_bridge
|
||||
.tpu_client
|
||||
.rpc_client()
|
||||
.request_airdrop(&payer.pubkey(), LAMPORTS_PER_SOL * 2)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
std::thread::sleep(Duration::from_secs(2));
|
||||
|
||||
let to_pubkey = Pubkey::new_unique();
|
||||
let instruction = system_instruction::transfer(&payer.pubkey(), &to_pubkey, LAMPORTS_PER_SOL);
|
||||
|
||||
let message = Message::new(&[instruction], Some(&payer.pubkey()));
|
||||
|
||||
let blockhash = light_bridge
|
||||
.tpu_client
|
||||
.rpc_client()
|
||||
.get_latest_blockhash()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let tx = Transaction::new(&[&payer], message, blockhash);
|
||||
let signature = tx.signatures[0];
|
||||
let encoded_signature = BinaryEncoding::Base58.encode(signature);
|
||||
|
||||
let tx = BinaryEncoding::Base58.encode(bincode::serialize(&tx).unwrap());
|
||||
|
||||
assert_eq!(
|
||||
light_bridge
|
||||
.send_transaction(SendTransactionParams(tx, Default::default()))
|
||||
.await
|
||||
.unwrap(),
|
||||
encoded_signature
|
||||
);
|
||||
|
||||
std::thread::sleep(Duration::from_secs(5));
|
||||
|
||||
let mut passed = false;
|
||||
|
||||
for _ in 0..100 {
|
||||
passed = light_bridge
|
||||
.tpu_client
|
||||
.rpc_client()
|
||||
.confirm_transaction(&signature)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
std::thread::sleep(Duration::from_millis(100));
|
||||
}
|
||||
|
||||
passed.then_some(()).unwrap();
|
||||
}
|
|
@ -1,10 +1,9 @@
|
|||
use log::info;
|
||||
use solana_client::nonblocking::rpc_client::RpcClient;
|
||||
use solana_client::rpc_client::SerializableTransaction;
|
||||
use solana_client::{nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction};
|
||||
use solana_sdk::native_token::LAMPORTS_PER_SOL;
|
||||
|
||||
use bench_utils::helpers::{generate_txs, new_funded_payer, wait_till_confirmed};
|
||||
use lite_client::{LiteClient, LOCAL_LIGHT_RPC_ADDR};
|
||||
use lite_bench_utils::{generate_txs, new_funded_payer, wait_till_confirmed};
|
||||
use simplelog::*;
|
||||
|
||||
const AMOUNT: usize = 100;
|
||||
|
|
|
@ -1,31 +0,0 @@
|
|||
import { Connection, Keypair, LAMPORTS_PER_SOL, Message, VersionedTransaction } from "@solana/web3.js";
|
||||
import { url } from "./urls";
|
||||
jest.setTimeout(60000);
|
||||
test('send and confirm transaction', async () => {
|
||||
const connection = new Connection(url, 'confirmed');
|
||||
const payer = Keypair.generate();
|
||||
|
||||
await connection.requestAirdrop(payer.publicKey, LAMPORTS_PER_SOL);
|
||||
const recentBlockhash = (await connection.getLatestBlockhash('confirmed')).blockhash;
|
||||
const versionedTx = new VersionedTransaction(
|
||||
new Message({
|
||||
header: {
|
||||
numRequiredSignatures: 1,
|
||||
numReadonlySignedAccounts: 0,
|
||||
numReadonlyUnsignedAccounts: 0,
|
||||
},
|
||||
recentBlockhash,
|
||||
instructions: [],
|
||||
accountKeys: [payer.publicKey.toBase58()],
|
||||
}),
|
||||
);
|
||||
|
||||
versionedTx.sign([payer]);
|
||||
const signature = await connection.sendTransaction(versionedTx);
|
||||
const latestBlockHash = await connection.getLatestBlockhash();
|
||||
await connection.confirmTransaction({
|
||||
blockhash: latestBlockHash.blockhash,
|
||||
lastValidBlockHeight: latestBlockHash.lastValidBlockHeight,
|
||||
signature: signature,
|
||||
});
|
||||
});
|
|
@ -1 +0,0 @@
|
|||
export const url = "http://127.0.0.1:9000";
|
|
@ -0,0 +1,64 @@
|
|||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use bench_utils::helpers::{create_transaction, new_funded_payer};
|
||||
use futures::future::join;
|
||||
use lite_rpc::{
|
||||
encoding::BinaryEncoding,
|
||||
workers::{BlockListener, TxSender},
|
||||
DEFAULT_RPC_ADDR, DEFAULT_WS_ADDR,
|
||||
};
|
||||
use solana_client::nonblocking::{rpc_client::RpcClient, tpu_client::TpuClient};
|
||||
|
||||
use solana_sdk::native_token::LAMPORTS_PER_SOL;
|
||||
|
||||
#[tokio::test]
|
||||
async fn send_and_confirm_txs() {
|
||||
let rpc_client = Arc::new(RpcClient::new(DEFAULT_RPC_ADDR.to_string()));
|
||||
let tpu_client = Arc::new(
|
||||
TpuClient::new(rpc_client.clone(), DEFAULT_WS_ADDR, Default::default())
|
||||
.await
|
||||
.unwrap(),
|
||||
);
|
||||
|
||||
let block_listener = BlockListener::new(rpc_client.clone(), DEFAULT_WS_ADDR)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let tx_sender = TxSender::new(tpu_client, block_listener.clone());
|
||||
|
||||
let services = join(block_listener.clone().listen(), tx_sender.clone().execute());
|
||||
|
||||
let confirm = tokio::spawn(async move {
|
||||
let funded_payer = new_funded_payer(&rpc_client, LAMPORTS_PER_SOL * 2)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let blockhash = rpc_client.get_latest_blockhash().await.unwrap();
|
||||
|
||||
let tx = create_transaction(&funded_payer, blockhash);
|
||||
let sig = tx.signatures[0];
|
||||
let tx = BinaryEncoding::Base58.encode(bincode::serialize(&tx).unwrap());
|
||||
|
||||
tx_sender.enqnueue_tx(sig, tx.as_bytes().to_vec(), 2).await;
|
||||
|
||||
let sig = sig.to_string();
|
||||
|
||||
for _ in 0..2 {
|
||||
if block_listener.confirmed_txs.read().await.contains(&sig) {
|
||||
return;
|
||||
}
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(800)).await;
|
||||
}
|
||||
|
||||
panic!("Tx {sig} not confirmed in 1600ms");
|
||||
});
|
||||
|
||||
tokio::select! {
|
||||
_ = services => {
|
||||
panic!("Services stopped unexpectedly")
|
||||
},
|
||||
_ = confirm => {}
|
||||
}
|
||||
}
|
|
@ -1,82 +0,0 @@
|
|||
{
|
||||
"compilerOptions": {
|
||||
/* Visit https://aka.ms/tsconfig to read more about this file */
|
||||
|
||||
/* Projects */
|
||||
// "incremental": true, /* Save .tsbuildinfo files to allow for incremental compilation of projects. */
|
||||
// "composite": true, /* Enable constraints that allow a TypeScript project to be used with project references. */
|
||||
// "tsBuildInfoFile": "./.tsbuildinfo", /* Specify the path to .tsbuildinfo incremental compilation file. */
|
||||
// "disableSourceOfProjectReferenceRedirect": true, /* Disable preferring source files instead of declaration files when referencing composite projects. */
|
||||
// "disableSolutionSearching": true, /* Opt a project out of multi-project reference checking when editing. */
|
||||
// "disableReferencedProjectLoad": true, /* Reduce the number of projects loaded automatically by TypeScript. */
|
||||
|
||||
/* Language and Environment */
|
||||
"target": "es2022", /* Set the JavaScript language version for emitted JavaScript and include compatible library declarations. */
|
||||
// "lib": [], /* Specify a set of bundled library declaration files that describe the target runtime environment. */
|
||||
// "jsx": "preserve", /* Specify what JSX code is generated. */
|
||||
// "experimentalDecorators": true, /* Enable experimental support for TC39 stage 2 draft decorators. */
|
||||
// "emitDecoratorMetadata": true, /* Emit design-type metadata for decorated declarations in source files. */
|
||||
// "jsxFactory": "", /* Specify the JSX factory function used when targeting React JSX emit, e.g. 'React.createElement' or 'h'. */
|
||||
// "jsxFragmentFactory": "", /* Specify the JSX Fragment reference used for fragments when targeting React JSX emit e.g. 'React.Fragment' or 'Fragment'. */
|
||||
// "jsxImportSource": "", /* Specify module specifier used to import the JSX factory functions when using 'jsx: react-jsx*'. */
|
||||
// "reactNamespace": "", /* Specify the object invoked for 'createElement'. This only applies when targeting 'react' JSX emit. */
|
||||
// "noLib": true, /* Disable including any library files, including the default lib.d.ts. */
|
||||
// "useDefineForClassFields": true, /* Emit ECMAScript-standard-compliant class fields. */
|
||||
// "moduleDetection": "auto", /* Control what method is used to detect module-format JS files. */
|
||||
|
||||
/* Modules */
|
||||
"module": "commonjs", /* Specify what module code is generated. */
|
||||
// "rootDir": "./", /* Specify the root folder within your source files. */
|
||||
// "moduleResolution": "node", /* Specify how TypeScript looks up a file from a given module specifier. */
|
||||
// "baseUrl": "./", /* Specify the base directory to resolve non-relative module names. */
|
||||
// "paths": {}, /* Specify a set of entries that re-map imports to additional lookup locations. */
|
||||
// "rootDirs": [], /* Allow multiple folders to be treated as one when resolving modules. */
|
||||
// "typeRoots": [], /* Specify multiple folders that act like './node_modules/@types'. */
|
||||
// "types": [], /* Specify type package names to be included without being referenced in a source file. */
|
||||
// "allowUmdGlobalAccess": true, /* Allow accessing UMD globals from modules. */
|
||||
// "moduleSuffixes": [], /* List of file name suffixes to search when resolving a module. */
|
||||
"resolveJsonModule": true, /* Enable importing .json files. */
|
||||
// "noResolve": true, /* Disallow 'import's, 'require's or '<reference>'s from expanding the number of files TypeScript should add to a project. */
|
||||
|
||||
/* JavaScript Support */
|
||||
// "allowJs": true, /* Allow JavaScript files to be a part of your program. Use the 'checkJS' option to get errors from these files. */
|
||||
// "checkJs": true, /* Enable error reporting in type-checked JavaScript files. */
|
||||
// "maxNodeModuleJsDepth": 1, /* Specify the maximum folder depth used for checking JavaScript files from 'node_modules'. Only applicable with 'allowJs'. */
|
||||
|
||||
/* Emit */
|
||||
// "declaration": true, /* Generate .d.ts files from TypeScript and JavaScript files in your project. */
|
||||
// "declarationMap": true, /* Create sourcemaps for d.ts files. */
|
||||
// "emitDeclarationOnly": true, /* Only output d.ts files and not JavaScript files. */
|
||||
// "sourceMap": true, /* Create source map files for emitted JavaScript files. */
|
||||
// "outFile": "./", /* Specify a file that bundles all outputs into one JavaScript file. If 'declaration' is true, also designates a file that bundles all .d.ts output. */
|
||||
// "outDir": "./", /* Specify an output folder for all emitted files. */
|
||||
// "removeComments": true, /* Disable emitting comments. */
|
||||
// "noEmit": true, /* Disable emitting files from a compilation. */
|
||||
// "importHelpers": true, /* Allow importing helper functions from tslib once per project, instead of including them per-file. */
|
||||
// "importsNotUsedAsValues": "remove", /* Specify emit/checking behavior for imports that are only used for types. */
|
||||
// "downlevelIteration": true, /* Emit more compliant, but verbose and less performant JavaScript for iteration. */
|
||||
// "sourceRoot": "", /* Specify the root path for debuggers to find the reference source code. */
|
||||
// "mapRoot": "", /* Specify the location where debugger should locate map files instead of generated locations. */
|
||||
// "inlineSourceMap": true, /* Include sourcemap files inside the emitted JavaScript. */
|
||||
// "inlineSources": true, /* Include source code in the sourcemaps inside the emitted JavaScript. */
|
||||
// "emitBOM": true, /* Emit a UTF-8 Byte Order Mark (BOM) in the beginning of output files. */
|
||||
// "newLine": "crlf", /* Set the newline character for emitting files. */
|
||||
// "stripInternal": true, /* Disable emitting declarations that have '@internal' in their JSDoc comments. */
|
||||
// "noEmitHelpers": true, /* Disable generating custom helper functions like '__extends' in compiled output. */
|
||||
// "noEmitOnError": true, /* Disable emitting files if any type checking errors are reported. */
|
||||
// "preserveConstEnums": true, /* Disable erasing 'const enum' declarations in generated code. */
|
||||
// "declarationDir": "./", /* Specify the output directory for generated declaration files. */
|
||||
// "preserveValueImports": true, /* Preserve unused imported values in the JavaScript output that would otherwise be removed. */
|
||||
|
||||
/* Interop Constraints */
|
||||
// "isolatedModules": true, /* Ensure that each file can be safely transpiled without relying on other imports. */
|
||||
// "allowSyntheticDefaultImports": true, /* Allow 'import x from y' when a module doesn't have a default export. */
|
||||
"esModuleInterop": true, /* Emit additional JavaScript to ease support for importing CommonJS modules. This enables 'allowSyntheticDefaultImports' for type compatibility. */
|
||||
// "preserveSymlinks": true, /* Disable resolving symlinks to their realpath. This correlates to the same flag in node. */
|
||||
"forceConsistentCasingInFileNames": true, /* Ensure that casing is correct in imports. */
|
||||
|
||||
/* Type Checking */
|
||||
// "strict": true, /* Enable all strict type-checking options. */
|
||||
"skipLibCheck": true /* Skip type checking all .d.ts files. */
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue