introduce proxy request

This commit is contained in:
GroovieGermanikus 2023-06-26 08:05:18 +02:00
parent 47fd8b15d3
commit eb33095778
11 changed files with 168 additions and 68 deletions

View File

@ -9,5 +9,6 @@ pub mod structures;
pub mod subscription_handler;
pub mod subscription_sink;
pub mod tx_store;
pub mod proxy_request_format;
pub type AnyhowJoinHandle = tokio::task::JoinHandle<anyhow::Result<()>>;

View File

@ -1,4 +1,4 @@
use std::net::SocketAddrV4;
use std::net::{SocketAddr, SocketAddrV4};
use anyhow::Context;
use serde::{Deserialize, Serialize};
use solana_sdk::pubkey::Pubkey;
@ -6,7 +6,7 @@ use solana_sdk::transaction::VersionedTransaction;
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TpuForwardingRequestV1 {
pub tpu_socket_addr: SocketAddrV4, // TODO is that correct
pub tpu_socket_addr: SocketAddr, // TODO is that correct
pub identity_tpunode: Pubkey,
pub transactions: Vec<VersionedTransaction>,
}
@ -17,7 +17,7 @@ pub enum TpuForwardingRequest {
}
impl TpuForwardingRequest {
pub fn new(tpu_socket_addr: SocketAddrV4, identity_tpunode: Pubkey,
pub fn new(tpu_socket_addr: SocketAddr, identity_tpunode: Pubkey,
transactions: Vec<VersionedTransaction>) -> Self {
TpuForwardingRequest::V1(
TpuForwardingRequestV1 {
@ -26,10 +26,28 @@ impl TpuForwardingRequest {
transactions,
})
}
pub fn get_tpu_socket_addr(&self) -> SocketAddr {
match self {
TpuForwardingRequest::V1(request) => request.tpu_socket_addr,
}
}
pub fn get_identity_tpunode(&self) -> Pubkey {
match self {
TpuForwardingRequest::V1(request) => request.identity_tpunode,
}
}
pub fn get_transactions(&self) -> Vec<VersionedTransaction> {
match self {
TpuForwardingRequest::V1(request) => request.transactions.clone(),
}
}
}
fn serialize_tpu_forwarding_request(
tpu_socket_addr: SocketAddrV4,
tpu_socket_addr: SocketAddr,
tpu_identity: Pubkey,
transactions: Vec<VersionedTransaction>) -> Vec<u8> {
@ -47,36 +65,3 @@ fn deserialize_tpu_forwarding_request(raw_proxy_request: &Vec<u8>) -> TpuForward
request
}
mod test {
use std::str::FromStr;
use log::info;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::transaction::Transaction;
use spl_memo::solana_program::message::VersionedMessage;
use crate::proxy_request_format::*;
#[test]
fn roundtrip() {
let payer_pubkey = Pubkey::new_unique();
let signer_pubkey = Pubkey::new_unique();
let memo_ix = spl_memo::build_memo("Hello world".as_bytes(), &[&signer_pubkey]);
let tx = Transaction::new_with_payer(&[memo_ix], Some(&payer_pubkey));
let wire_data = serialize_tpu_forwarding_request(
"127.0.0.1:5454".parse().unwrap(),
Pubkey::from_str("Bm8rtweCQ19ksNebrLY92H7x4bCaeDJSSmEeWqkdCeop").unwrap(),
vec![tx.into()]);
println!("wire_data: {:02X?}", wire_data);
let request = deserialize_tpu_forwarding_request(&wire_data);
let TpuForwardingRequest::V1(req1) = request;
assert_eq!(req1.transactions.len(), 1);
}
}

View File

@ -96,7 +96,7 @@ impl QuicConnectionUtils {
identity: Pubkey,
already_connected: bool,
endpoint: Endpoint,
addr: SocketAddr,
tpu_address: SocketAddr,
connection_timeout: Duration,
connection_retry_count: usize,
exit_signal: Arc<AtomicBool>,
@ -104,9 +104,9 @@ impl QuicConnectionUtils {
) -> Option<Connection> {
for _ in 0..connection_retry_count {
let conn = if already_connected {
Self::make_connection_0rtt(endpoint.clone(), addr, connection_timeout).await
Self::make_connection_0rtt(endpoint.clone(), tpu_address, connection_timeout).await
} else {
Self::make_connection(endpoint.clone(), addr, connection_timeout).await
Self::make_connection(endpoint.clone(), tpu_address, connection_timeout).await
};
match conn {
Ok(conn) => {

View File

@ -253,6 +253,7 @@ impl LiteRpcServer for LiteBridge {
.await
{
Ok(sig) => {
println!("sig: {}", sig);
TXS_IN_CHANNEL.inc();
Ok(sig)

View File

@ -37,6 +37,5 @@ async-trait = { workspace = true }
chrono = { workspace = true }
tokio = { version = "1.28.2", features = ["full", "fs"]}
rcgen = "0.9.3"
[dev-dependencies]
spl-memo = "3.0.1"

View File

@ -1,11 +1,15 @@
// DEPRECATED: use quic-proxy main.rs
use std::net::{IpAddr, Ipv4Addr};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use anyhow::anyhow;
use log::info;
use rcgen::IsCa::SelfSignedOnly;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::transaction::Transaction;
use spl_memo::build_memo;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::time::timeout;
use lite_rpc_quic_forward_proxy::quic_util::ALPN_TPU_FORWARDPROXY_PROTOCOL_ID;
@ -42,7 +46,8 @@ async fn main() -> anyhow::Result<()> {
endpoint.rebind(socket).expect("rebind failed");
}
let request = "FOO BAR";
// let request = "FOO BAR";
let request = build_memo_tx_raw();
send.write_all(request.as_bytes())
.await
@ -65,3 +70,21 @@ async fn main() -> anyhow::Result<()> {
Ok(())
}
fn build_memo_tx_raw() {
let payer_pubkey = Pubkey::new_unique();
let signer_pubkey = Pubkey::new_unique();
let memo_ix = spl_memo::build_memo("Hello world".as_bytes(), &[&signer_pubkey]);
let tx = Transaction::new_with_payer(&[memo_ix], Some(&payer_pubkey));
let wire_data = serialize_tpu_forwarding_request(
"127.0.0.1:5454".parse().unwrap(),
Pubkey::from_str("Bm8rtweCQ19ksNebrLY92H7x4bCaeDJSSmEeWqkdCeop").unwrap(),
vec![tx.into()]);
println!("wire_data: {:02X?}", wire_data);
wire_data
}

View File

@ -2,7 +2,6 @@ pub mod quic_util;
pub mod tls_config_provicer;
pub mod proxy;
pub mod test_client;
pub mod proxy_request_format;
pub use tls_config_provicer::SelfSignedTlsConfigProvider;
// pub mod tls_config;

View File

@ -15,6 +15,7 @@ use solana_sdk::transaction::VersionedTransaction;
use tokio::net::ToSocketAddrs;
use solana_lite_rpc_core::AnyhowJoinHandle;
use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
use solana_lite_rpc_core::proxy_request_format::TpuForwardingRequest;
use solana_lite_rpc_services::tpu_utils::tpu_connection_manager::ActiveConnection;
use crate::tls_config_provicer::{ProxyTlsConfigProvider, SelfSignedTlsConfigProvider};
@ -105,24 +106,26 @@ async fn handle_connection2(connecting: Connecting) -> anyhow::Result<()> {
Ok(s) => s,
};
tokio::spawn(async move {
let raw_tx = recv.read_to_end(100000).await
let raw_request = recv.read_to_end(100000).await
.unwrap();
// let str = std::str::from_utf8(&result).unwrap();
info!("read raw_tx {:02X?}", raw_tx);
info!("read proxy_request {:02X?}", raw_request);
let tx = match bincode::deserialize::<VersionedTransaction>(&raw_tx) {
Ok(tx) => tx,
let proxy_request = match bincode::deserialize::<TpuForwardingRequest>(&raw_request) {
Ok(raw_request) => raw_request,
Err(err) => {
bail!(err.to_string());
warn!("failed to deserialize proxy request: {:?}", err);
// bail!(err.to_string());
return;
}
};
info!("transaction details: {} sigs", tx.signatures.len());
info!("transaction details: {} sigs", proxy_request.get_transactions().len());
// ActiveConnection::new(e)new(tx).await;
// send_data(send).await;
Ok(())
// Ok(())
});
// info!("stream okey {:?}", stream);
// let fut = handle_request2(stream).await;
@ -133,7 +136,7 @@ async fn handle_connection2(connecting: Connecting) -> anyhow::Result<()> {
// }
// }
// );
}
} // -- loop
}
.await?;
Ok(())

View File

@ -1,4 +1,5 @@
use std::net::SocketAddr;
use std::net::{SocketAddr, SocketAddrV4};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use anyhow::bail;
@ -6,11 +7,14 @@ use bytes::BufMut;
use log::info;
use quinn::{Endpoint, VarInt};
use rustls::ClientConfig;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::transaction::{Transaction, VersionedTransaction};
use tokio::io::AsyncWriteExt;
use solana_lite_rpc_core::AnyhowJoinHandle;
use crate::quic_util::ALPN_TPU_FORWARDPROXY_PROTOCOL_ID;
use crate::tls_config_provicer::ProxyTlsConfigProvider;
use solana_lite_rpc_core::quic_connection_utils::SkipServerVerification;
use solana_lite_rpc_core::proxy_request_format::TpuForwardingRequest;
use crate::test_client::sample_data_factory::build_raw_sample_tx;
pub struct QuicTestClient {
@ -49,9 +53,9 @@ impl QuicTestClient {
for si in 0..5 {
let (mut send, mut recv) = connection.open_bi().await?;
let raw = build_raw_sample_tx();
let raw = build_memo_tx_raw();
info!("raw: {:02X?}", raw);
send.write_all(format!("SAMPLE DATA on stream {}", si).as_bytes()).await?;
// send.write_all(format!("SAMPLE DATA on stream {}", si).as_bytes()).await?;
// shutdown stream
send.finish().await?;
@ -91,3 +95,32 @@ fn build_tls_config() -> ClientConfig {
return client_crypto;
}
fn build_memo_tx_raw() -> Vec<u8> {
let payer_pubkey = Pubkey::new_unique();
let signer_pubkey = Pubkey::new_unique();
let memo_ix = spl_memo::build_memo("Hello world".as_bytes(), &[&signer_pubkey]);
let tx = Transaction::new_with_payer(&[memo_ix], Some(&payer_pubkey));
let wire_data = serialize_tpu_forwarding_request(
"127.0.0.1:5454".parse().unwrap(),
Pubkey::from_str("Bm8rtweCQ19ksNebrLY92H7x4bCaeDJSSmEeWqkdCeop").unwrap(),
vec![tx.into()]);
println!("wire_data: {:02X?}", wire_data);
wire_data
}
fn serialize_tpu_forwarding_request(
tpu_socket_addr: SocketAddr,
tpu_identity: Pubkey,
transactions: Vec<VersionedTransaction>) -> Vec<u8> {
let request = TpuForwardingRequest::new(tpu_socket_addr, tpu_identity, transactions);
bincode::serialize(&request).expect("Expect to serialize transactions")
}

View File

@ -0,0 +1,32 @@
use std::str::FromStr;
use log::info;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::transaction::Transaction;
use spl_memo::solana_program::message::VersionedMessage;
use crate::proxy_request_format::*;
#[test]
fn roundtrip() {
let payer_pubkey = Pubkey::new_unique();
let signer_pubkey = Pubkey::new_unique();
let memo_ix = spl_memo::build_memo("Hello world".as_bytes(), &[&signer_pubkey]);
let tx = Transaction::new_with_payer(&[memo_ix], Some(&payer_pubkey));
let wire_data = serialize_tpu_forwarding_request(
"127.0.0.1:5454".parse().unwrap(),
Pubkey::from_str("Bm8rtweCQ19ksNebrLY92H7x4bCaeDJSSmEeWqkdCeop").unwrap(),
vec![tx.into()]);
println!("wire_data: {:02X?}", wire_data);
let request = deserialize_tpu_forwarding_request(&wire_data);
let TpuForwardingRequest::V1(req1) = request;
assert_eq!(req1.transactions.len(), 1);
}

View File

@ -17,8 +17,11 @@ use std::{
},
time::Duration,
};
use anyhow::bail;
use solana_sdk::transaction::VersionedTransaction;
use tokio::sync::{broadcast::Receiver, broadcast::Sender, RwLock};
use tokio::time::timeout;
use solana_lite_rpc_core::proxy_request_format::TpuForwardingRequest;
pub const QUIC_CONNECTION_TIMEOUT: Duration = Duration::from_secs(1);
pub const CONNECTION_RETRY_COUNT: usize = 10;
@ -74,7 +77,7 @@ impl ActiveConnection {
transaction_reciever: Receiver<(String, Vec<u8>)>,
exit_oneshot_channel: tokio::sync::mpsc::Receiver<()>,
endpoint: Endpoint,
addr: SocketAddr,
tpu_address: SocketAddr,
exit_signal: Arc<AtomicBool>,
identity: Pubkey,
identity_stakes: IdentityStakes,
@ -146,7 +149,7 @@ impl ActiveConnection {
identity,
false,
endpoint.clone(),
addr,
tpu_address,
QUIC_CONNECTION_TIMEOUT,
CONNECTION_RETRY_COUNT,
exit_signal.clone(),
@ -177,7 +180,9 @@ impl ActiveConnection {
Self::send_copy_of_txs_to_quicproxy(
&txs, endpoint.clone(),
// proxy address
"127.0.0.1:11111".parse().unwrap()).await.unwrap();
"127.0.0.1:11111".parse().unwrap(),
tpu_address,
identity.clone()).await.unwrap();
QuicConnectionUtils::send_transaction_batch(
@ -185,7 +190,7 @@ impl ActiveConnection {
txs,
identity,
endpoint,
addr,
tpu_address,
exit_signal,
last_stable_id,
QUIC_CONNECTION_TIMEOUT,
@ -209,9 +214,28 @@ impl ActiveConnection {
NB_QUIC_ACTIVE_CONNECTIONS.dec();
}
async fn send_copy_of_txs_to_quicproxy(txs: &Vec<Vec<u8>>, endpoint: Endpoint, proxy_address: SocketAddr) -> anyhow::Result<()> {
let txs_copy = txs.clone();
let send_result = timeout(Duration::from_millis(500), Self::send_tx(endpoint, proxy_address, &txs_copy));
async fn send_copy_of_txs_to_quicproxy(raw_tx_batch: &Vec<Vec<u8>>, endpoint: Endpoint,
proxy_address: SocketAddr, tpu_target_address: SocketAddr,
identity: Pubkey) -> anyhow::Result<()> {
let raw_tx_batch_copy = raw_tx_batch.clone();
let mut txs = vec![];
for raw_tx in raw_tx_batch_copy {
let tx = match bincode::deserialize::<VersionedTransaction>(&raw_tx) {
Ok(tx) => tx,
Err(err) => {
bail!(err.to_string());
}
};
txs.push(tx);
}
let forwarding_request = TpuForwardingRequest::new(tpu_target_address, identity, txs);
let proxy_request_raw = bincode::serialize(&forwarding_request).expect("Expect to serialize transactions");
let send_result = timeout(Duration::from_millis(500), Self::send_proxy_request(endpoint, proxy_address, &proxy_request_raw));
match send_result.await {
Ok(..) => {
@ -224,13 +248,13 @@ impl ActiveConnection {
Ok(())
}
async fn send_tx(endpoint: Endpoint, proxy_address: SocketAddr, mut txs: &Vec<Vec<u8>>) -> anyhow::Result<()> {
async fn send_proxy_request(endpoint: Endpoint, proxy_address: SocketAddr, proxy_request_raw: &Vec<u8>) -> anyhow::Result<()> {
let mut connecting = endpoint.connect(proxy_address, "localhost")?;
let connection = timeout(Duration::from_millis(500), connecting).await??;
let (mut send, mut recv) = connection.open_bi().await?;
for tx in txs {
send.write_all(tx).await?;
}
send.write_all(proxy_request_raw).await?;
send.finish().await?;
Ok(())
@ -243,7 +267,7 @@ impl ActiveConnection {
identity_stakes: IdentityStakes,
) {
let endpoint = self.endpoint.clone();
let addr = self.tpu_address;
let tpu_address = self.tpu_address;
let exit_signal = self.exit_signal.clone();
let identity = self.identity;
let txs_sent_store = self.txs_sent_store.clone();
@ -252,7 +276,7 @@ impl ActiveConnection {
transaction_reciever,
exit_oneshot_channel,
endpoint,
addr,
tpu_address,
exit_signal,
identity,
identity_stakes,