remove async_recursion and fix clippy bugs

This commit is contained in:
aniketfuryrocks 2023-04-08 16:06:43 +05:30
parent f11c7e4969
commit 713c19bf52
No known key found for this signature in database
GPG Key ID: 1B75EA596D89FF06
7 changed files with 56 additions and 97 deletions

12
Cargo.lock generated
View File

@ -310,17 +310,6 @@ dependencies = [
"event-listener",
]
[[package]]
name = "async-recursion"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e97ce7de6cf12de5d7226c73f5ba9811622f4db3a5b91b55c53e987e5f91cba"
dependencies = [
"proc-macro2 1.0.56",
"quote 1.0.26",
"syn 2.0.13",
]
[[package]]
name = "async-trait"
version = "0.1.64"
@ -2258,7 +2247,6 @@ version = "0.1.0"
dependencies = [
"anyhow",
"async-channel",
"async-recursion",
"base64 0.21.0",
"bench",
"bincode",

View File

@ -45,8 +45,6 @@ lazy_static = "1.4.0"
dotenv = "0.15.0"
async-channel = "1.8.0"
quinn = "0.9.3"
rustls = { version = "0.20.6", default-features = false }
async-recursion = "1.0.4"
[dev-dependencies]
bench = { path = "./bench" }
@ -85,6 +83,5 @@ lazy_static = { workspace = true }
dotenv = { workspace = true }
async-channel = { workspace = true }
quinn = { workspace = true }
rustls = { workspace = true }
rustls = { version = "0.20.6", default-features = false }
chrono = { workspace = true }
async-recursion = { workspace = true }

View File

@ -54,18 +54,9 @@ impl BlockStore {
blocks.insert(finalized_blockhash.clone(), finalized_block);
Ok(Self {
latest_processed_block: Arc::new(RwLock::new((
processed_blockhash.clone(),
processed_block,
))),
latest_confirmed_block: Arc::new(RwLock::new((
confirmed_blockhash.clone(),
confirmed_block,
))),
latest_finalized_block: Arc::new(RwLock::new((
finalized_blockhash.clone(),
finalized_block,
))),
latest_processed_block: Arc::new(RwLock::new((processed_blockhash, processed_block))),
latest_confirmed_block: Arc::new(RwLock::new((confirmed_blockhash, confirmed_block))),
latest_finalized_block: Arc::new(RwLock::new((finalized_blockhash, finalized_block))),
blocks,
last_add_block_metric: Arc::new(RwLock::new(Instant::now())),
})

View File

@ -9,7 +9,7 @@ use solana_sdk::signature::Keypair;
use std::env;
async fn get_identity_keypair(identity_from_cli: &String) -> Keypair {
if let Some(identity_env_var) = env::var("IDENTITY").ok() {
if let Ok(identity_env_var) = env::var("IDENTITY") {
if let Ok(identity_bytes) = serde_json::from_str::<Vec<u8>>(identity_env_var.as_str()) {
Keypair::from_bytes(identity_bytes.as_slice()).unwrap()
} else {

View File

@ -333,8 +333,8 @@ impl BlockListener {
slot: slot as i64,
leader_id: 0, // TODO: lookup leader
parent_slot: parent_slot as i64,
cluster_time: Utc.timestamp_millis_opt(block_time * 1000).unwrap(),
local_time: block_info.map(|b| b.processed_local_time).flatten(),
cluster_time: Utc.timestamp_millis_opt(block_time*1000).unwrap(),
local_time: block_info.and_then(|b| b.processed_local_time),
}))
.expect("Error sending block to postgres service");
@ -488,8 +488,8 @@ impl BlockListener {
// continuosly poll processed blocks and feed into blockstore
pub fn listen_processed(self) -> JoinHandle<anyhow::Result<()>> {
let rpc_client = self.rpc_client.clone();
let block_store = self.block_store.clone();
let rpc_client = self.rpc_client;
let block_store = self.block_store;
tokio::spawn(async move {
info!("processed block listner started");

View File

@ -8,7 +8,6 @@ use std::{
time::Duration,
};
use async_recursion::async_recursion;
use dashmap::DashMap;
use log::{error, info, trace, warn};
use prometheus::{core::GenericGauge, opts, register_int_gauge};
@ -67,7 +66,7 @@ impl ActiveConnection {
let connecting = endpoint.connect(addr, "connect")?;
let connection = match connecting.into_0rtt() {
Ok((connection, zero_rtt)) => {
if let Ok(_) = timeout(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC, zero_rtt).await {
if (timeout(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC, zero_rtt).await).is_ok() {
connection
} else {
return Err(ConnectionError::TimedOut.into());
@ -96,10 +95,10 @@ impl ActiveConnection {
for _i in 0..CONNECTION_RETRY_COUNT {
let conn = if already_connected {
info!("making make_connection_0rtt");
Self::make_connection_0rtt(endpoint.clone(), addr.clone()).await
Self::make_connection_0rtt(endpoint.clone(), addr).await
} else {
info!("making make_connection");
Self::make_connection(endpoint.clone(), addr.clone()).await
Self::make_connection(endpoint.clone(), addr).await
};
match conn {
Ok(conn) => {
@ -117,64 +116,50 @@ impl ActiveConnection {
None
}
#[async_recursion]
async fn open_unistream(
connection: &mut Option<Arc<Connection>>,
reconnect: bool,
mut reconnect: bool,
identity: Pubkey,
already_connected: bool,
endpoint: Endpoint,
addr: SocketAddr,
exit_signal: Arc<AtomicBool>,
) -> Option<SendStream> {
let (unistream, reconnect_and_try_again) = match connection {
Some(conn) => {
let unistream_maybe_timeout =
timeout(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC, conn.open_uni()).await;
match unistream_maybe_timeout {
Ok(unistream_res) => match unistream_res {
Ok(unistream) => (Some(unistream), false),
Err(_) => (None, reconnect),
},
Err(_) => {
// timed out
(None, false)
}
loop {
if let Some(connection) = connection {
match timeout(
QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC,
connection.open_uni(),
)
.await
{
Ok(Ok(unistream)) => return Some(unistream),
Ok(Err(_)) => (),
Err(_) => return None,
}
} else {
reconnect = true
}
None => (None, true),
};
if reconnect_and_try_again {
let conn = Self::connect(
if !reconnect {
return None;
}
// re connect
let Some(conn) = Self::connect(
identity,
already_connected,
endpoint.clone(),
addr.clone(),
addr,
exit_signal.clone(),
)
.await;
match conn {
Some(conn) => {
*connection = Some(conn);
Self::open_unistream(
connection,
false,
identity,
already_connected,
endpoint,
addr,
exit_signal,
)
.await
}
None => {
// connection with the peer is not possible
None
}
}
} else {
unistream
.await else {
return None;
};
// new connection don't reconnect now
*connection = Some(conn);
reconnect = false;
}
}
@ -220,10 +205,10 @@ impl ActiveConnection {
let unistream = Self::open_unistream(
&mut connection,
true,
identity.clone(),
identity,
already_connected,
endpoint.clone(),
addr.clone(),
addr,
exit_signal.clone(),
).await;
@ -280,7 +265,7 @@ impl ActiveConnection {
},
Err(_) => {
// timed out
if let Some(_) = &mut connection {
if connection.is_some() {
NB_QUIC_CONNECTIONS.dec();
connection = None;
}
@ -293,7 +278,7 @@ impl ActiveConnection {
};
}
if let Some(_) = &mut connection {
if connection.is_some() {
NB_QUIC_CONNECTIONS.dec();
}
NB_QUIC_TASKS.dec();
@ -305,9 +290,9 @@ impl ActiveConnection {
exit_oneshot_channel: tokio::sync::mpsc::Receiver<()>,
) {
let endpoint = self.endpoint.clone();
let addr = self.tpu_address.clone();
let addr = self.tpu_address;
let exit_signal = self.exit_signal.clone();
let identity = self.identity.clone();
let identity = self.identity;
tokio::spawn(async move {
Self::listen(
transaction_reciever,
@ -383,18 +368,17 @@ impl TpuConnectionManager {
) {
NB_CONNECTIONS_TO_KEEP.set(connections_to_keep.len() as i64);
for (identity, socket_addr) in &connections_to_keep {
if self.identity_to_active_connection.get(&identity).is_none() {
if self.identity_to_active_connection.get(identity).is_none() {
trace!("added a connection for {}, {}", identity, socket_addr);
let endpoint = self.endpoints.get();
let active_connection =
ActiveConnection::new(endpoint, socket_addr.clone(), identity.clone());
let active_connection = ActiveConnection::new(endpoint, *socket_addr, *identity);
// using mpsc as a oneshot channel/ because with one shot channel we cannot reuse the reciever
let (sx, rx) = tokio::sync::mpsc::channel(1);
let transaction_reciever = transaction_sender.subscribe();
active_connection.start_listening(transaction_reciever, rx);
self.identity_to_active_connection.insert(
identity.clone(),
*identity,
Arc::new(ActiveConnectionWithExitChannel {
active_connection,
exit_stream: sx,
@ -407,7 +391,7 @@ impl TpuConnectionManager {
let collect_current_active_connections = self
.identity_to_active_connection
.iter()
.map(|x| (x.key().clone(), x.value().clone()))
.map(|x| (*x.key(), x.value().clone()))
.collect::<Vec<_>>();
for (identity, value) in collect_current_active_connections.iter() {
if !connections_to_keep.contains_key(identity) {

View File

@ -87,7 +87,7 @@ impl TpuService {
Ok(Self {
cluster_nodes: Arc::new(DashMap::new()),
current_slot: current_slot,
current_slot,
estimated_slot: Arc::new(AtomicU64::new(slot)),
leader_schedule: Arc::new(RwLock::new(VecDeque::new())),
fanout_slots,
@ -164,13 +164,12 @@ impl TpuService {
let current_slot = self.current_slot.load(Ordering::Relaxed);
let load_slot = if estimated_slot <= current_slot {
current_slot
} else if estimated_slot - current_slot > 8 {
estimated_slot - 8
} else {
if estimated_slot - current_slot > 8 {
estimated_slot - 8
} else {
current_slot
}
current_slot
};
let fanout = self.fanout_slots;
let last_slot = estimated_slot + fanout;
@ -190,7 +189,7 @@ impl TpuService {
.iter()
.filter(|x| x.tpu.is_some())
.map(|x| {
let mut addr = x.tpu.unwrap().clone();
let mut addr = x.tpu.unwrap();
// add quic port offset
addr.set_port(addr.port() + QUIC_PORT_OFFSET);
(Pubkey::from_str(x.pubkey.as_str()).unwrap(), addr)