add error context
This commit is contained in:
parent
62a2f76c93
commit
4844844e21
|
@ -24,7 +24,7 @@ RUST_LOG="error,solana_streamer::nonblocking::quic=debug" solana-test-validator
|
||||||
```
|
```
|
||||||
3. run quic proxy
|
3. run quic proxy
|
||||||
```bash
|
```bash
|
||||||
RUST_LOG=debug cargo run --bin solana-lite-rpc-quic-forward-proxy -- --proxy-rpc-addr 0.0.0.0:11111 --identity-keypair /pathto-test-ledger/validator-keypair.json
|
RUST_LOG=debug cargo run --bin solana-lite-rpc-quic-forward-proxy -- --proxy-listen-addr 0.0.0.0:11111 --identity-keypair /pathto-test-ledger/validator-keypair.json
|
||||||
```
|
```
|
||||||
2. run lite-rpc
|
2. run lite-rpc
|
||||||
```bash
|
```bash
|
||||||
|
|
|
@ -41,12 +41,14 @@ impl ProxyListener {
|
||||||
let forwarder_channel_copy = forwarder_channel.clone();
|
let forwarder_channel_copy = forwarder_channel.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let connection = connecting.await.context("handshake").unwrap();
|
let connection = connecting.await.context("handshake").unwrap();
|
||||||
match Self::accept_client_connection(connection, forwarder_channel_copy,
|
match Self::accept_client_connection(
|
||||||
exit_signal)
|
connection, forwarder_channel_copy, exit_signal)
|
||||||
.await {
|
.await {
|
||||||
Ok(()) => {}
|
Ok(()) => {
|
||||||
|
debug!("connection handles correctly");
|
||||||
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!("setup connection failed: {reason}", reason = err);
|
error!("failed to accect connection from client: {reason} - skip", reason = err);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -78,7 +78,7 @@ pub async fn tx_forwarder(validator_identity: ValidatorIdentity, mut transaction
|
||||||
debug!("forwarding transaction batch of size {} to address {}", transactions_batch.len(), packet.tpu_address);
|
debug!("forwarding transaction batch of size {} to address {}", transactions_batch.len(), packet.tpu_address);
|
||||||
|
|
||||||
let result = timeout_fallback(send_tx_batch_to_tpu(&auto_connection, &transactions_batch)).await
|
let result = timeout_fallback(send_tx_batch_to_tpu(&auto_connection, &transactions_batch)).await
|
||||||
.context("send txs to tpu");
|
.context(format!("send txs to tpu node {}", auto_connection.target_address));
|
||||||
|
|
||||||
if result.is_err() {
|
if result.is_err() {
|
||||||
warn!("got send_txs_to_tpu_static error {:?} - loop over errors", result);
|
warn!("got send_txs_to_tpu_static error {:?} - loop over errors", result);
|
||||||
|
@ -101,9 +101,10 @@ pub async fn tx_forwarder(validator_identity: ValidatorIdentity, mut transaction
|
||||||
|
|
||||||
let agent_channel = agents.get(&tpu_address).unwrap();
|
let agent_channel = agents.get(&tpu_address).unwrap();
|
||||||
|
|
||||||
agent_channel.send(forward_packet).await.unwrap();
|
timeout_fallback(agent_channel.send(forward_packet)).await
|
||||||
|
.context("send to agent channel")??;
|
||||||
|
|
||||||
} // -- loop over transactions from ustream channels
|
} // -- loop over transactions from upstream channels
|
||||||
|
|
||||||
// not reachable
|
// not reachable
|
||||||
}
|
}
|
||||||
|
@ -174,7 +175,7 @@ async fn send_tx_batch_to_tpu(
|
||||||
tx_raw
|
tx_raw
|
||||||
})
|
})
|
||||||
.map(|tx_raw| {
|
.map(|tx_raw| {
|
||||||
auto_connection.send(tx_raw) // ignores error
|
auto_connection.send_uni(tx_raw) // ignores error
|
||||||
});
|
});
|
||||||
|
|
||||||
join_all(all_send_fns).await;
|
join_all(all_send_fns).await;
|
||||||
|
|
|
@ -2,15 +2,18 @@
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::sync::atomic::{AtomicU32, Ordering};
|
use std::sync::atomic::{AtomicU32, Ordering};
|
||||||
|
use anyhow::Context;
|
||||||
|
use log::warn;
|
||||||
use tracing::{debug, info};
|
use tracing::{debug, info};
|
||||||
use quinn::{Connection, Endpoint};
|
use quinn::{Connection, Endpoint};
|
||||||
use tokio::sync::{RwLock};
|
use tokio::sync::{RwLock};
|
||||||
|
use crate::util::timeout_fallback;
|
||||||
|
|
||||||
pub struct AutoReconnect {
|
pub struct AutoReconnect {
|
||||||
|
// endpoint should be configured with keep-alive and idle timeout
|
||||||
endpoint: Endpoint,
|
endpoint: Endpoint,
|
||||||
// note: no read lock is used ATM
|
|
||||||
current: RwLock<Option<Connection>>,
|
current: RwLock<Option<Connection>>,
|
||||||
target_address: SocketAddr,
|
pub target_address: SocketAddr,
|
||||||
reconnect_count: AtomicU32,
|
reconnect_count: AtomicU32,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -24,25 +27,12 @@ impl AutoReconnect {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn roundtrip(&self, payload: Vec<u8>) -> anyhow::Result<Vec<u8>> {
|
pub async fn send_uni(&self, payload: Vec<u8>) -> anyhow::Result<()> {
|
||||||
// TODO do smart error handling + reconnect
|
// TODO do smart error handling + reconnect
|
||||||
// self.refresh().await.open_bi().await.unwrap()
|
let mut send_stream = timeout_fallback(self.refresh().await.open_uni()).await
|
||||||
let (mut send_stream, recv_stream) = self.refresh().await.open_bi().await?;
|
.context("open uni stream for sending")??;
|
||||||
send_stream.write_all(payload.as_slice()).await?;
|
send_stream.write_all(payload.as_slice()).await?;
|
||||||
send_stream.finish().await?;
|
send_stream.finish().await?;
|
||||||
|
|
||||||
let answer = recv_stream.reread_to_end(64 * 1024).await?;
|
|
||||||
|
|
||||||
Ok(answer)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn send(&self, payload: Vec<u8>) -> anyhow::Result<()> {
|
|
||||||
// TOOD do smart error handling + reconnect
|
|
||||||
let mut send_stream = self.refresh().await.open_uni().await?;
|
|
||||||
send_stream.write_all(payload.as_slice()).await?;
|
|
||||||
send_stream.finish().await?;
|
|
||||||
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -50,52 +40,46 @@ impl AutoReconnect {
|
||||||
pub async fn refresh(&self) -> Connection {
|
pub async fn refresh(&self) -> Connection {
|
||||||
{
|
{
|
||||||
let lock = self.current.read().await;
|
let lock = self.current.read().await;
|
||||||
let maybe_conn: &Option<Connection> = &*lock;
|
let maybe_conn = lock.as_ref();
|
||||||
if maybe_conn.as_ref().filter(|conn| conn.close_reason().is_none()).is_some() {
|
if maybe_conn.filter(|conn| conn.close_reason().is_none()).is_some() {
|
||||||
// let reuse = lock.unwrap().clone();
|
let reuse = maybe_conn.unwrap();
|
||||||
let reuse = maybe_conn.as_ref().unwrap();
|
|
||||||
debug!("Reuse connection {}", reuse.stable_id());
|
debug!("Reuse connection {}", reuse.stable_id());
|
||||||
return reuse.clone();
|
return reuse.clone();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let mut lock = self.current.write().await;
|
let mut lock = self.current.write().await;
|
||||||
match &*lock {
|
let maybe_conn = lock.as_ref();
|
||||||
|
return match maybe_conn {
|
||||||
Some(current) => {
|
Some(current) => {
|
||||||
|
|
||||||
if current.close_reason().is_some() {
|
if current.close_reason().is_some() {
|
||||||
info!("Connection is closed for reason: {:?}", current.close_reason());
|
let old_stable_id = current.stable_id();
|
||||||
// TODO log
|
warn!("Connection {} is closed for reason: {:?}", old_stable_id, current.close_reason());
|
||||||
|
|
||||||
let new_connection = self.create_connection().await;
|
let new_connection = self.create_connection().await;
|
||||||
*lock = Some(new_connection.clone());
|
*lock = Some(new_connection.clone());
|
||||||
// let old_conn = lock.replace(new_connection.clone());
|
// let old_conn = lock.replace(new_connection.clone());
|
||||||
self.reconnect_count.fetch_add(1, Ordering::SeqCst);
|
self.reconnect_count.fetch_add(1, Ordering::SeqCst);
|
||||||
|
|
||||||
// debug!("Replace closed connection {} with {} (retry {})",
|
debug!("Replace closed connection {} with {} (retry {})",
|
||||||
// old_conn.map(|c| c.stable_id().to_string()).unwrap_or("none".to_string()),
|
old_stable_id,
|
||||||
// new_connection.stable_id(),
|
new_connection.stable_id(),
|
||||||
// self.reconnect_count.load(Ordering::SeqCst));
|
self.reconnect_count.load(Ordering::SeqCst));
|
||||||
// TODO log old vs new stable_id
|
|
||||||
|
|
||||||
|
new_connection.clone()
|
||||||
return new_connection.clone();
|
|
||||||
} else {
|
} else {
|
||||||
debug!("Reuse connection {} with write-lock", current.stable_id());
|
debug!("Reuse connection {} with write-lock", current.stable_id());
|
||||||
return current.clone();
|
current.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
let new_connection = self.create_connection().await;
|
let new_connection = self.create_connection().await;
|
||||||
|
|
||||||
// let old_conn = lock.replace(new_connection.clone());
|
assert!(lock.is_none(), "old connection must be None");
|
||||||
// assert!(old_conn.is_none(), "old connection should be None");
|
|
||||||
*lock = Some(new_connection.clone());
|
*lock = Some(new_connection.clone());
|
||||||
// let old_conn = foo.replace(Some(new_connection.clone()));
|
// let old_conn = foo.replace(Some(new_connection.clone()));
|
||||||
// TODO log old vs new stable_id
|
|
||||||
debug!("Create initial connection {}", new_connection.stable_id());
|
debug!("Create initial connection {}", new_connection.stable_id());
|
||||||
|
|
||||||
return new_connection.clone();
|
new_connection.clone()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue