pack list of target tpu nodes in the wire format

This commit is contained in:
GroovieGermanikus 2023-08-09 17:00:17 +02:00
parent 9a6542c7b7
commit 677801f5be
9 changed files with 161 additions and 124 deletions

View File

@ -12,16 +12,21 @@ use std::net::SocketAddr;
/// lite-rpc to proxy wire format
/// compat info: non-public format ATM
/// initial version
const FORMAT_VERSION1: u16 = 2400;
const FORMAT_VERSION1: u16 = 2500;
/// One transaction entry of the wire format: a signature paired with raw bytes.
///
/// `TpuForwardingRequest::new` fills it with the transaction's first signature
/// and the bincode-serialized transaction.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TxData(Signature, Vec<u8>);
/// A single TPU target carried inside a `TpuForwardingRequest`.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TpuNode {
/// Socket address of the TPU node the transactions are forwarded to.
pub tpu_socket_addr: SocketAddr,
/// Identity public key that accompanies the address.
pub identity_tpunode: Pubkey, // note: this is only used for debugging
}
/// Forwarding request in the lite-rpc to proxy wire format.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TpuForwardingRequest {
// set to FORMAT_VERSION1 by `new`
format_version: u16,
tpu_socket_addr: SocketAddr,
identity_tpunode: Pubkey, // note: this is only used for debugging
// TPU targets for this batch of transactions
tpu_nodes: Vec<TpuNode>,
// one `TxData` entry per transaction passed to `new`
transactions: Vec<TxData>,
}
@ -29,50 +34,38 @@ impl Display for TpuForwardingRequest {
/// Writes a one-line, human-readable summary of this request.
///
/// The message reports how many TPU nodes the request targets.
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"TpuForwardingRequest for tpu target {} with identity {}",
&self.get_tpu_socket_addr(),
&self.get_identity_tpunode(),
"TpuForwardingRequest to {} tpu nodes",
&self.tpu_nodes.len(),
)
}
}
impl TpuForwardingRequest {
pub fn new(
tpu_socket_addr: SocketAddr,
identity_tpunode: Pubkey,
tpu_fanout_nodes: &Vec<(SocketAddr, Pubkey)>,
transactions: Vec<VersionedTransaction>,
) -> Self {
TpuForwardingRequest {
format_version: FORMAT_VERSION1,
tpu_socket_addr,
identity_tpunode,
transactions: transactions.iter().map(Self::serialize).collect_vec(),
tpu_nodes: tpu_fanout_nodes.iter().map(|(tpu_addr, identity)| TpuNode {
tpu_socket_addr: *tpu_addr,
identity_tpunode: *identity,
}).collect_vec(),
transactions: transactions.iter()
.map(|tx| TxData(tx.signatures[0],
bincode::serialize(tx).unwrap()))
.collect_vec(),
}
}
fn serialize(tx: &VersionedTransaction) -> TxData {
TxData(tx.signatures[0], bincode::serialize(&tx).unwrap())
/// Encodes this request into its bincode wire representation.
///
/// # Errors
///
/// Returns the bincode failure wrapped with the context
/// "serialize proxy request".
pub fn try_serialize_wire_format(&self) -> anyhow::Result<Vec<u8>> {
    let wire_bytes = bincode::serialize(self).context("serialize proxy request")?;
    Ok(wire_bytes)
}
pub fn serialize_wire_format(&self) -> Vec<u8> {
bincode::serialize(&self).expect("Expect to serialize transactions")
/// Returns a reference to the TPU nodes stored in this request.
pub fn get_tpu_nodes(&self) -> &Vec<TpuNode> {
&self.tpu_nodes
}
pub fn deserialize_from_raw_request(raw_proxy_request: &[u8]) -> TpuForwardingRequest {
let request = bincode::deserialize::<TpuForwardingRequest>(raw_proxy_request)
.context("deserialize proxy request")
.unwrap();
assert_eq!(request.format_version, 2301);
request
}
/// Returns a copy of the `tpu_socket_addr` field of this request.
pub fn get_tpu_socket_addr(&self) -> SocketAddr {
self.tpu_socket_addr
}
/// Returns a copy of the `identity_tpunode` field of this request.
pub fn get_identity_tpunode(&self) -> Pubkey {
self.identity_tpunode
}
}

View File

@ -137,14 +137,12 @@ impl ProxyListener {
.unwrap();
trace!("proxy request details: {}", proxy_request);
let _tpu_identity = proxy_request.get_identity_tpunode();
let tpu_address = proxy_request.get_tpu_socket_addr();
let txs = proxy_request.get_transaction_bytes();
debug!(
"enqueue transaction batch of size {} to address {}",
"enqueue transaction batch of size {} to {} tpu nodes",
txs.len(),
tpu_address
proxy_request.get_tpu_nodes().len(),
);
if forwarder_channel_copy.capacity() < forwarder_channel_copy.max_capacity()
{
@ -154,14 +152,22 @@ impl ProxyListener {
forwarder_channel_copy.max_capacity()
);
}
forwarder_channel_copy
.send_timeout(
ForwardPacket::new(txs, tpu_address, proxy_request.get_hash()),
FALLBACK_TIMEOUT,
)
.await
.context("sending internal packet from proxy to forwarder")
.unwrap();
for tpu_node in proxy_request.get_tpu_nodes() {
let tpu_address = tpu_node.tpu_socket_addr;
forwarder_channel_copy
.send_timeout(
ForwardPacket::new(
txs.clone(),
tpu_address,
proxy_request.get_hash(),
),
FALLBACK_TIMEOUT,
)
.await
.context("sending internal packet from proxy to forwarder")
.unwrap();
}
});
debug!(

View File

@ -90,6 +90,9 @@ pub async fn tx_forwarder(
let _exit_signal_copy = global_exit_signal.clone();
'tx_channel_loop: loop {
let timeout_result = timeout_fallback(per_connection_receiver.recv()).await;
if let Err(_elapsed) = timeout_result {
continue 'tx_channel_loop;
}
if global_exit_signal.load(Ordering::Relaxed) {
warn!("Caught global exit signal - stopping agent thread");
@ -100,9 +103,6 @@ pub async fn tx_forwarder(
return;
}
if let Err(_elapsed) = timeout_result {
continue 'tx_channel_loop;
}
let maybe_packet = timeout_result.unwrap();
if let Err(_recv_error) = maybe_packet {
@ -118,6 +118,14 @@ pub async fn tx_forwarder(
continue 'tx_channel_loop;
}
if auto_connection.is_permanent_dead().await {
warn!("Connection is considered permanently dead");
// while let Ok(more) = per_connection_receiver.try_recv() {
// // drain
// }
// continue 'tx_channel_loop;
}
let mut transactions_batch: Vec<Vec<u8>> = packet.transactions.clone();
'more: while let Ok(more) = per_connection_receiver.try_recv() {
@ -276,7 +284,7 @@ fn create_tpu_client_endpoint(
transport_config.max_concurrent_bidi_streams(VarInt::from_u32(0));
let timeout = IdleTimeout::try_from(Duration::from_millis(QUIC_MAX_TIMEOUT_MS as u64)).unwrap();
transport_config.max_idle_timeout(Some(timeout));
transport_config.keep_alive_interval(None);
transport_config.keep_alive_interval(Some(Duration::from_millis(500)));
config.transport_config(Arc::new(transport_config));

View File

@ -36,7 +36,7 @@ impl QuicForwardProxy {
pub async fn start_services(self) -> anyhow::Result<()> {
let exit_signal = Arc::new(AtomicBool::new(false));
let (forwarder_channel, forward_receiver) = tokio::sync::mpsc::channel(100_000);
let (forwarder_channel, forward_receiver) = tokio::sync::mpsc::channel(1000);
let proxy_listener =
proxy_listener::ProxyListener::new(self.proxy_listener_addr, self.tls_config);

View File

@ -14,16 +14,21 @@ use std::net::SocketAddr;
/// lite-rpc to proxy wire format
/// compat info: non-public format ATM
/// initial version
pub const FORMAT_VERSION1: u16 = 2400;
pub const FORMAT_VERSION1: u16 = 2500;
/// One transaction entry of the wire format: a signature paired with raw bytes.
///
/// `TpuForwardingRequest::new` fills it with the transaction's first signature
/// and the bincode-serialized transaction.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TxData(Signature, Vec<u8>);
/// A single TPU target carried inside a `TpuForwardingRequest`.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TpuNode {
/// Socket address of the TPU node.
pub tpu_socket_addr: SocketAddr,
/// Identity public key that accompanies the address.
pub identity_tpunode: Pubkey, // note: this is only used for debugging
}
/// Forwarding request in the lite-rpc to proxy wire format.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TpuForwardingRequest {
// set to FORMAT_VERSION1 by `new`
format_version: u16,
tpu_socket_addr: SocketAddr,
identity_tpunode: Pubkey, // note: this is only used for debugging
// TPU targets for this batch of transactions
tpu_nodes: Vec<TpuNode>,
// one `TxData` entry per transaction passed to `new`
transactions: Vec<TxData>,
}
@ -31,32 +36,34 @@ impl Display for TpuForwardingRequest {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"TpuForwardingRequest for tpu target {} with identity {}: payload {} tx",
&self.get_tpu_socket_addr(),
&self.get_identity_tpunode(),
&self.get_transaction_bytes().len()
"TpuForwardingRequest t9 {} tpu nodes",
&self.tpu_nodes.len(),
)
}
}
impl TpuForwardingRequest {
pub fn new(
tpu_socket_addr: SocketAddr,
identity_tpunode: Pubkey,
tpu_fanout_nodes: Vec<(SocketAddr, Pubkey)>,
transactions: Vec<VersionedTransaction>,
) -> Self {
TpuForwardingRequest {
format_version: FORMAT_VERSION1,
tpu_socket_addr,
identity_tpunode,
transactions: transactions.into_iter().map(Self::serialize).collect_vec(),
tpu_nodes: tpu_fanout_nodes
.iter()
.map(|(tpu_addr, identity)| TpuNode {
tpu_socket_addr: *tpu_addr,
identity_tpunode: *identity,
})
.collect_vec(),
transactions: transactions
.iter()
.map(|tx| TxData(tx.signatures[0], bincode::serialize(tx).unwrap()))
.collect_vec(),
}
}
/// Builds the wire-format entry for one transaction: its first signature
/// plus the bincode-serialized transaction bytes.
///
/// Panics via `unwrap` if bincode serialization fails; the `signatures[0]`
/// index presumably also panics when the transaction carries no signature
/// (TODO confirm against `VersionedTransaction`).
fn serialize(tx: VersionedTransaction) -> TxData {
TxData(tx.signatures[0], bincode::serialize(&tx).unwrap())
}
// test only
pub fn try_serialize_wire_format(&self) -> anyhow::Result<Vec<u8>> {
bincode::serialize(&self)
.context("serialize proxy request")
@ -77,12 +84,8 @@ impl TpuForwardingRequest {
.map_err(anyhow::Error::from)
}
pub fn get_tpu_socket_addr(&self) -> SocketAddr {
self.tpu_socket_addr
}
pub fn get_identity_tpunode(&self) -> Pubkey {
self.identity_tpunode
/// Returns a reference to the TPU nodes stored in this request.
pub fn get_tpu_nodes(&self) -> &Vec<TpuNode> {
&self.tpu_nodes
}
pub fn get_transaction_bytes(&self) -> Vec<Vec<u8>> {

View File

@ -3,7 +3,6 @@ use log::{info, warn};
use quinn::{Connection, ConnectionError, Endpoint};
use std::fmt;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;
use tokio::sync::RwLock;
use tokio::time::timeout;
@ -19,6 +18,7 @@ use tracing::debug;
/// * the ActiveConnection instance gets renewed on leader schedule change
const SEND_TIMEOUT: Duration = Duration::from_secs(5);
const MAX_RETRY_ATTEMPTS: u32 = 10;
enum ConnectionState {
NotConnected,
@ -34,7 +34,6 @@ pub struct AutoReconnect {
pub target_address: SocketAddr,
}
impl AutoReconnect {
pub fn new(endpoint: Endpoint, target_address: SocketAddr) -> Self {
Self {
@ -101,11 +100,11 @@ impl AutoReconnect {
Some(new_connection) => {
*lock = ConnectionState::Connection(new_connection.clone());
info!(
"Restored closed connection {} with {} to target {}",
old_stable_id,
new_connection.stable_id(),
self.target_address,
);
"Restored closed connection {} with {} to target {}",
old_stable_id,
new_connection.stable_id(),
self.target_address,
);
}
None => {
warn!(
@ -135,8 +134,7 @@ impl AutoReconnect {
);
}
None => {
warn!("Failed connect initially to target {}",
self.target_address);
warn!("Failed connect initially to target {}", self.target_address);
*lock = ConnectionState::FailedAttempt(1);
}
};
@ -151,12 +149,22 @@ impl AutoReconnect {
ConnectionState::FailedAttempt(attempts) => {
match self.create_connection().await {
Some(new_connection) => {
*lock = ConnectionState::Connection(new_connection.clone());
*lock = ConnectionState::Connection(new_connection);
}
None => {
warn!("Reconnect to {} failed",
self.target_address);
*lock = ConnectionState::FailedAttempt(attempts + 1);
if *attempts < MAX_RETRY_ATTEMPTS {
warn!(
"Reconnect to {} failed (attempt {})",
self.target_address, attempts
);
*lock = ConnectionState::FailedAttempt(attempts + 1);
} else {
warn!(
"Reconnect to {} failed permanently (attempt {})",
self.target_address, attempts
);
*lock = ConnectionState::PermanentError;
}
}
};
}

View File

@ -16,8 +16,10 @@ fn roundtrip() {
let tx = Transaction::new_with_payer(&[memo_ix], Some(&payer_pubkey));
let wire_data = TpuForwardingRequest::new(
"127.0.0.1:5454".parse().unwrap(),
Pubkey::from_str("Bm8rtweCQ19ksNebrLY92H7x4bCaeDJSSmEeWqkdCeop").unwrap(),
vec![(
"127.0.0.1:5454".parse().unwrap(),
Pubkey::from_str("Bm8rtweCQ19ksNebrLY92H7x4bCaeDJSSmEeWqkdCeop").unwrap(),
)],
vec![tx.into()],
)
.try_serialize_wire_format()
@ -27,7 +29,8 @@ fn roundtrip() {
let request = TpuForwardingRequest::try_deserialize_from_wire_format(&wire_data).unwrap();
assert!(request.get_tpu_socket_addr().is_ipv4());
assert_eq!(request.get_tpu_nodes().len(), 1);
assert!(request.get_tpu_nodes()[0].tpu_socket_addr.is_ipv4());
assert_eq!(request.get_transaction_bytes().len(), 1);
}

View File

@ -8,7 +8,7 @@ use anyhow::bail;
use std::time::Duration;
use itertools::Itertools;
use log::{debug, error, info, trace};
use log::{debug, error, info, trace, warn};
use quinn::{
ClientConfig, Endpoint, EndpointConfig, IdleTimeout, TokioRuntime, TransportConfig, VarInt,
};
@ -132,7 +132,6 @@ impl QuicProxyConnectionManager {
// note: this config must be aligned with quic-proxy's server config
let mut transport_config = TransportConfig::default();
let _timeout = IdleTimeout::try_from(Duration::from_secs(1)).unwrap();
// no remotely-initiated streams required
transport_config.max_concurrent_uni_streams(VarInt::from_u32(0));
transport_config.max_concurrent_bidi_streams(VarInt::from_u32(0));
@ -163,7 +162,6 @@ impl QuicProxyConnectionManager {
}
tokio::select! {
// TODO add timeout
tx = transaction_receiver.recv() => {
let first_tx: Vec<u8> = match tx {
@ -189,26 +187,26 @@ impl QuicProxyConnectionManager {
trace!("Sending copy of transaction batch of {} txs to {} tpu nodes via quic proxy",
txs.len(), tpu_fanout_nodes.len());
for target_tpu_node in tpu_fanout_nodes {
let send_result =
Self::send_copy_of_txs_to_quicproxy(
&txs, &auto_connection,
proxy_addr,
target_tpu_node.tpu_address,
target_tpu_node.tpu_identity)
.await.unwrap();
tpu_fanout_nodes)
.await;
if let Err(err) = send_result {
warn!("Failed to send copy of txs to quic proxy - skip (error {})", err);
}
},
};
}
} // -- loop
}
async fn send_copy_of_txs_to_quicproxy(
raw_tx_batch: &[Vec<u8>],
auto_connection: &AutoReconnect,
_proxy_address: SocketAddr,
tpu_target_address: SocketAddr,
target_tpu_identity: Pubkey,
tpu_fanout_nodes: Vec<TpuNode>,
) -> anyhow::Result<()> {
let mut txs = vec![];
@ -222,9 +220,13 @@ impl QuicProxyConnectionManager {
txs.push(tx);
}
let tpu_data = tpu_fanout_nodes.iter()
.map(|tpu| (tpu.tpu_address, tpu.tpu_identity))
.collect_vec();
for chunk in txs.chunks(CHUNK_SIZE_PER_STREAM) {
let forwarding_request =
TpuForwardingRequest::new(tpu_target_address, target_tpu_identity, chunk.into());
TpuForwardingRequest::new(&tpu_data, chunk.into());
debug!("forwarding_request: {}", forwarding_request);
let proxy_request_raw =

View File

@ -11,12 +11,15 @@ use tracing::debug;
/// copy of quic-proxy AutoReconnect - used that for reference
const SEND_TIMEOUT: Duration = Duration::from_secs(5);
const MAX_RETRY_ATTEMPTS: u32 = 10;
/// Connection lifecycle tracked by `AutoReconnect`.
enum ConnectionState {
/// Initial state set by `AutoReconnect::new`.
NotConnected,
/// Holds the most recently created connection (it may since have been closed).
Connection(Connection),
/// Reconnecting was given up after the failed-attempt count reached `MAX_RETRY_ATTEMPTS`.
PermanentError,
/// Connecting failed; carries the number of failed attempts so far.
FailedAttempt(u32),
}
pub struct AutoReconnect {
@ -24,19 +27,23 @@ pub struct AutoReconnect {
endpoint: Endpoint,
current: RwLock<ConnectionState>,
pub target_address: SocketAddr,
reconnect_count: AtomicU32,
}
impl AutoReconnect {
/// Creates a handle for `target_address` that starts in
/// `ConnectionState::NotConnected`; no connection is opened here.
pub fn new(endpoint: Endpoint, target_address: SocketAddr) -> Self {
Self {
endpoint,
current: RwLock::new(ConnectionState::NotConnected),
target_address,
reconnect_count: AtomicU32::new(0),
}
}
/// Reports whether the connection has been given up on, i.e. whether the
/// current state is `ConnectionState::PermanentError`.
///
/// Takes the read lock on the state only for the duration of the check.
pub async fn is_permanent_dead(&self) -> bool {
    matches!(*self.current.read().await, ConnectionState::PermanentError)
}
pub async fn send_uni(&self, payload: &Vec<u8>) -> anyhow::Result<()> {
let mut send_stream = timeout(SEND_TIMEOUT, self.refresh_and_get().await?.open_uni())
.await
@ -54,6 +61,7 @@ impl AutoReconnect {
ConnectionState::NotConnected => bail!("not connected"),
ConnectionState::Connection(conn) => Ok(conn.clone()),
ConnectionState::PermanentError => bail!("permanent error"),
ConnectionState::FailedAttempt(_) => bail!("failed connection attempt"),
}
}
@ -78,7 +86,7 @@ impl AutoReconnect {
if current.close_reason().is_some() {
let old_stable_id = current.stable_id();
warn!(
"Connection {} to {} is closed for reason: {:?}",
"Connection {} to {} is closed for reason: {:?} - reconnecting",
old_stable_id,
self.target_address,
current.close_reason()
@ -87,33 +95,19 @@ impl AutoReconnect {
match self.create_connection().await {
Some(new_connection) => {
*lock = ConnectionState::Connection(new_connection.clone());
let reconnect_count =
self.reconnect_count.fetch_add(1, Ordering::SeqCst);
if reconnect_count < 10 {
info!(
"Replace closed connection {} with {} to target {} (retry {})",
info!(
"Restored closed connection {} with {} to target {}",
old_stable_id,
new_connection.stable_id(),
self.target_address,
reconnect_count
);
} else {
*lock = ConnectionState::PermanentError;
warn!(
"Too many reconnect attempts to {}, last one with {} (retry {})",
self.target_address,
new_connection.stable_id(),
reconnect_count
);
}
}
None => {
warn!(
"Reconnect to {} failed for connection {}",
self.target_address, old_stable_id
);
*lock = ConnectionState::PermanentError;
*lock = ConnectionState::FailedAttempt(1);
}
};
} else {
@ -128,7 +122,6 @@ impl AutoReconnect {
match self.create_connection().await {
Some(new_connection) => {
*lock = ConnectionState::Connection(new_connection.clone());
self.reconnect_count.fetch_add(1, Ordering::SeqCst);
info!(
"Create initial connection {} to {}",
@ -137,11 +130,9 @@ impl AutoReconnect {
);
}
None => {
warn!(
"Initial connection to {} failed permanently",
self.target_address
);
*lock = ConnectionState::PermanentError;
warn!("Failed connect initially to target {}",
self.target_address);
*lock = ConnectionState::FailedAttempt(1);
}
};
}
@ -152,6 +143,28 @@ impl AutoReconnect {
self.target_address
);
}
ConnectionState::FailedAttempt(attempts) => {
match self.create_connection().await {
Some(new_connection) => {
*lock = ConnectionState::Connection(new_connection);
}
None => {
if *attempts < MAX_RETRY_ATTEMPTS {
warn!(
"Reconnect to {} failed (attempt {})",
self.target_address, attempts
);
*lock = ConnectionState::FailedAttempt(attempts + 1);
} else {
warn!(
"Reconnect to {} failed permanently (attempt {})",
self.target_address, attempts
);
*lock = ConnectionState::PermanentError;
}
}
};
}
}
}
@ -192,6 +205,7 @@ impl AutoReconnect {
),
ConnectionState::NotConnected => "n/c".to_string(),
ConnectionState::PermanentError => "n/a (permanent)".to_string(),
ConnectionState::FailedAttempt(_) => "fail".to_string(),
}
}
}