fix stake setup

This commit is contained in:
GroovieGermanikus 2023-06-30 16:14:20 +02:00
parent 400b45e1e7
commit 0d4f83a6fc
1 changed files with 94 additions and 39 deletions

View File

@ -5,10 +5,13 @@ use std::path::Path;
use std::str::FromStr;
use std::sync::{Arc, RwLock};
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::{Duration, Instant};
use crossbeam_channel::Receiver;
use anyhow::bail;
use crossbeam_channel::{Receiver, RecvError, Sender};
use futures::future::join_all;
use log::{debug, info};
use itertools::join;
use log::{debug, error, info};
use quinn::TokioRuntime;
use solana_rpc_client::rpc_client::SerializableTransaction;
use solana_sdk::hash::Hash;
@ -34,6 +37,9 @@ use solana_lite_rpc_core::structures::identity_stakes::IdentityStakes;
use solana_lite_rpc_core::tx_store::empty_tx_store;
use solana_lite_rpc_services::tpu_utils::tpu_connection_manager::TpuConnectionManager;
const SAMPLE_TX_COUNT: u32 = 2;
const MAXIMUM_TRANSACTIONS_IN_QUEUE: usize = 200_000;
#[test]
@ -44,7 +50,7 @@ pub fn wireup_and_send_txs_via_channel() -> anyhow::Result<()> {
// solana quic streamer - see quic.rs -> rt()
const NUM_QUIC_STREAMER_WORKER_THREADS: usize = 1;
let runtime1 = Builder::new_multi_thread()
.worker_threads(NUM_QUIC_STREAMER_WORKER_THREADS)
.worker_threads(1)
.thread_name("quic-server")
.enable_all()
.build()
@ -59,32 +65,94 @@ pub fn wireup_and_send_txs_via_channel() -> anyhow::Result<()> {
.expect("failed to build tokio runtime for lite-rpc");
let literpc_validator_identity = Arc::new(Keypair::new());
let udp_listen_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
let listen_addr = udp_listen_socket.local_addr()?;
let literpc_validator_identity = Arc::new(Keypair::new());
let (inbound_packets_sender, inbound_packets_receiver) = crossbeam_channel::unbounded();
runtime1.block_on(async {
/// setup solana Quic streamer
// see log "Start quic server on UdpSocket { addr: 127.0.0.1:xxxxx, fd: 10 }"
let mut solana_quic_streamer = SolanaQuicStreamer::new_start_listening(udp_listen_socket);
let staked_nodes = StakedNodes {
total_stake: 100,
max_stake: 40,
min_stake: 0,
ip_stake_map: Default::default(),
pubkey_stake_map:
// literpc_validator_identity.as_ref()
if STAKE_CONNECTION {
solana_quic_streamer.add_stake_for_identity(
literpc_validator_identity.as_ref(),
30,
);
}
let mut map = HashMap::new();
map.insert(literpc_validator_identity.pubkey(),30);
map
} else { HashMap::default() }
};
let mut solana_quic_streamer = SolanaQuicStreamer::new_start_listening(udp_listen_socket, inbound_packets_sender, staked_nodes);
info!("end of quic streamer runtime");
});
runtime2.block_on(start_literpc_client(listen_addr, literpc_validator_identity))?;
let packet_consumer_jh = thread::spawn(move || {
info!("start pulling packets...");
let mut packet_count = 0;
while packet_count != SAMPLE_TX_COUNT {
match inbound_packets_receiver.recv() {
Ok(packet_batch) => {
info!("received packets: {}", packet_batch.len());
packet_count = packet_count + packet_batch.len() as u32;
for packet in packet_batch.iter() {
info!("packet: {:?}", packet);
let tx = packet.deserialize_slice::<VersionedTransaction, _>(..).unwrap();
debug!("read transaction from quic streaming server: {:?}", tx.get_signature());
// for ix in tx.message.instructions() {
// info!("instruction: {:?}", ix.data);
// }
}
if packet_count == 10 {
return;
}
}
Err(err) => {
error!("Error polling quic streaming channel: {}", err);
}
}
}
while let Ok(packet_batch) = inbound_packets_receiver.recv() {
info!("received packets: {}", packet_batch.len());
packet_count = packet_count + packet_batch.len() as u32;
for packet in packet_batch.iter() {
info!("packet: {:?}", packet);
let tx = packet.deserialize_slice::<VersionedTransaction, _>(..).unwrap();
debug!("read transaction from quic streaming server: {:?}", tx.get_signature());
// for ix in tx.message.instructions() {
// info!("instruction: {:?}", ix.data);
// }
}
if packet_count == 10 {
return;
}
}
});
// shutdown streamer
// solana_quic_streamer.shutdown().await;
packet_consumer_jh.join().unwrap();
Ok(())
}
@ -146,23 +214,22 @@ async fn start_literpc_client(listen_addrs: SocketAddr, literpc_validator_identi
)
.await;
let raw_sample_tx = build_raw_sample_tx();
// TODO this is a rase
sleep(Duration::from_millis(1500)).await;
for i in 0..SAMPLE_TX_COUNT {
let raw_sample_tx = build_raw_sample_tx(i);
debug!("broadcast transaction {}: {}", raw_sample_tx.0, format!("hi {}", i));
broadcast_sender.send(raw_sample_tx)?;
}
// TODO improve
sleep(Duration::from_millis(100)).await;
debug!("remaining: {}", broadcast_sender.len());
debug!("end of lite-rpc client runtime: {}", broadcast_sender.len());
Ok(())
}
#[tokio::test]
// taken from solana -> test_nonblocking_quic_client_multiple_writes
async fn solana_quic_streamer_start() {
tracing_subscriber::fmt::fmt()
.with_max_level(tracing::Level::TRACE).init();
let (sender, _receiver) = crossbeam_channel::unbounded();
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
// will create random free port
@ -205,18 +272,12 @@ struct SolanaQuicStreamer {
exit: Arc<AtomicBool>,
join_handler: JoinHandle<()>,
stats: Arc<StreamStats>,
staked_nodes: Arc<RwLock<StakedNodes>>,
}
impl SolanaQuicStreamer {
pub fn get_socket_addr(&self) -> SocketAddr {
self.sock.local_addr().unwrap()
}
pub fn add_stake_for_identity(&self, identity: &Keypair, stake: u64) {
let mut lock = self.staked_nodes.write().unwrap();
let prev = lock.pubkey_stake_map.insert(identity.pubkey(), stake);
assert!(prev.is_none(), "identity {} already staked", identity.pubkey());
}
}
impl SolanaQuicStreamer {
@ -228,9 +289,8 @@ impl SolanaQuicStreamer {
}
impl SolanaQuicStreamer {
fn new_start_listening(udp_socket: UdpSocket) -> Self {
let (sender, _receiver) = crossbeam_channel::unbounded();
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
fn new_start_listening(udp_socket: UdpSocket, sender: Sender<PacketBatch>, staked_nodes: StakedNodes) -> Self {
let staked_nodes = Arc::new(RwLock::new(staked_nodes));
let exit = Arc::new(AtomicBool::new(false));
// keypair to derive the server tls certificate
let keypair = Keypair::new();
@ -261,7 +321,6 @@ impl SolanaQuicStreamer {
exit,
join_handler: jh,
stats,
staked_nodes,
}
}
}
@ -269,24 +328,20 @@ impl SolanaQuicStreamer {
const MEMO_PROGRAM_ID: &str = "MemoSq4gqABAXKb96qnH8TysNcWxMyWCqXgDLGmfcHr";
pub fn build_raw_sample_tx() -> (String, Vec<u8>) {
pub fn build_raw_sample_tx(i: u32) -> (String, Vec<u8>) {
// FIXME
let payer_keypair = keypair::read_keypair_file(
Path::new("/Users/stefan/mango/solana-wallet/solana-testnet-stefantest.json")
).unwrap();
let payer_keypair = Keypair::from_base58_string("rKiJ7H5UUp3JR18kNyTF1XPuwPKHEM7gMLWHZPWP5djrW1vSjfwjhvJrevxF9MPmUmN9gJMLHZdLMgc9ao78eKr");
let tx = build_sample_tx(&payer_keypair);
let tx = build_sample_tx(&payer_keypair, i);
let raw_tx = bincode::serialize::<VersionedTransaction>(&tx).expect("failed to serialize tx");
(tx.get_signature().to_string(), raw_tx)
}
fn build_sample_tx(payer_keypair: &Keypair) -> VersionedTransaction {
fn build_sample_tx(payer_keypair: &Keypair, i: u32) -> VersionedTransaction {
let blockhash = Hash::default();
create_memo_tx(b"hi", payer_keypair, blockhash).into()
create_memo_tx(format!("hi {}", i).as_bytes(), payer_keypair, blockhash).into()
}
fn create_memo_tx(msg: &[u8], payer: &Keypair, blockhash: Hash) -> Transaction {