specifies protocol in contact-info get-socket api (#31602)

This commit is contained in:
behzad nouri 2023-05-12 16:16:20 +00:00 committed by GitHub
parent 5c8b5a2a68
commit 4e34abbf3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 198 additions and 148 deletions

View File

@ -9,7 +9,6 @@ use {
spl_convert::FromOtherSolana,
},
solana_client::{
connection_cache::ConnectionCache,
thin_client::ThinClient,
tpu_client::{TpuClient, TpuClientConfig},
},
@ -82,10 +81,10 @@ fn test_bench_tps_local_cluster(config: Config) {
let client = Arc::new(ThinClient::new(
cluster.entry_point_info.rpc().unwrap(),
match *cluster.connection_cache {
ConnectionCache::Quic(_) => cluster.entry_point_info.tpu_quic().unwrap(),
ConnectionCache::Udp(_) => cluster.entry_point_info.tpu().unwrap(),
},
cluster
.entry_point_info
.tpu(cluster.connection_cache.protocol())
.unwrap(),
cluster.connection_cache.clone(),
));

View File

@ -8,7 +8,7 @@ use {
unprocessed_transaction_storage::UnprocessedTransactionStorage,
},
solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection},
solana_gossip::{cluster_info::ClusterInfo, contact_info::LegacyContactInfo as ContactInfo},
solana_gossip::cluster_info::ClusterInfo,
solana_measure::measure_us,
solana_perf::{data_budget::DataBudget, packet::Packet},
solana_poh::poh_recorder::PohRecorder,
@ -218,15 +218,11 @@ impl Forwarder {
fn get_leader_and_addr(&self, forward_option: &ForwardOption) -> Option<(Pubkey, SocketAddr)> {
match forward_option {
ForwardOption::NotForward => None,
ForwardOption::ForwardTransaction => next_leader(
&self.cluster_info,
&self.poh_recorder,
match *self.connection_cache {
ConnectionCache::Quic(_) => ContactInfo::tpu_forwards_quic,
ConnectionCache::Udp(_) => ContactInfo::tpu_forwards,
},
),
ForwardOption::ForwardTransaction => {
next_leader(&self.cluster_info, &self.poh_recorder, |node| {
node.tpu_forwards(self.connection_cache.protocol())
})
}
ForwardOption::ForwardTpuVote => {
next_leader_tpu_vote(&self.cluster_info, &self.poh_recorder)
}

View File

@ -17,7 +17,7 @@ use {
staked_nodes_updater_service::StakedNodesUpdaterService,
},
crossbeam_channel::{unbounded, Receiver},
solana_client::connection_cache::ConnectionCache,
solana_client::connection_cache::{ConnectionCache, Protocol},
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::{
blockstore::Blockstore, blockstore_processor::TransactionStatusSender,
@ -149,7 +149,7 @@ impl Tpu {
keypair,
cluster_info
.my_contact_info()
.tpu_quic()
.tpu(Protocol::QUIC)
.expect("Operator must spin up node with valid (QUIC) TPU address")
.ip(),
packet_sender,
@ -169,7 +169,7 @@ impl Tpu {
keypair,
cluster_info
.my_contact_info()
.tpu_forwards_quic()
.tpu_forwards(Protocol::QUIC)
.expect("Operator must spin up node with valid (QUIC) TPU-forwards address")
.ip(),
forwarded_packet_sender,

View File

@ -30,7 +30,7 @@ use {
crossbeam_channel::{bounded, unbounded, Receiver},
lazy_static::lazy_static,
rand::{thread_rng, Rng},
solana_client::connection_cache::ConnectionCache,
solana_client::connection_cache::{ConnectionCache, Protocol},
solana_entry::poh::compute_hash_time_ns,
solana_geyser_plugin_manager::{
geyser_plugin_service::GeyserPluginService, GeyserPluginManagerRequest,
@ -874,7 +874,7 @@ impl Validator {
Some((
&identity_keypair,
node.info
.tpu()
.tpu(Protocol::UDP)
.expect("Operator must spin up node with valid TPU address")
.ip(),
)),

View File

@ -3,8 +3,11 @@
use {
rand::{thread_rng, Rng},
solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection},
solana_gossip::{cluster_info::ClusterInfo, contact_info::LegacyContactInfo as ContactInfo},
solana_client::{
connection_cache::{ConnectionCache, Protocol},
tpu_connection::TpuConnection,
},
solana_gossip::cluster_info::ClusterInfo,
solana_poh::poh_recorder::PohRecorder,
std::{
sync::{
@ -48,7 +51,7 @@ impl WarmQuicCacheService {
{
maybe_last_leader = Some(leader_pubkey);
if let Some(Ok(addr)) = cluster_info
.lookup_contact_info(&leader_pubkey, ContactInfo::tpu_quic)
.lookup_contact_info(&leader_pubkey, |node| node.tpu(Protocol::QUIC))
{
let conn = connection_cache.get_connection(&addr);
if let Err(err) = conn.send_data(&[0u8]) {

View File

@ -49,6 +49,7 @@ use {
solana_core::serve_repair::{RepairProtocol, RepairRequestHeader, ServeRepair},
solana_dos::cli::*,
solana_gossip::{
contact_info::Protocol,
gossip_service::{discover, get_multi_client},
legacy_contact_info::LegacyContactInfo as ContactInfo,
},
@ -416,6 +417,11 @@ fn get_target(
entrypoint_addr: SocketAddr,
tpu_use_quic: bool,
) -> Option<(Pubkey, SocketAddr)> {
let protocol = if tpu_use_quic {
Protocol::QUIC
} else {
Protocol::UDP
};
let mut target = None;
if nodes.is_empty() {
// skip-gossip case
@ -434,24 +440,10 @@ fn get_target(
Mode::Gossip => Some((*node.pubkey(), node.gossip().unwrap())),
Mode::Tvu => Some((*node.pubkey(), node.tvu().unwrap())),
Mode::TvuForwards => Some((*node.pubkey(), node.tvu_forwards().unwrap())),
Mode::Tpu => Some((
*node.pubkey(),
if tpu_use_quic {
node.tpu_quic()
} else {
node.tpu()
}
.unwrap(),
)),
Mode::TpuForwards => Some((
*node.pubkey(),
if tpu_use_quic {
node.tpu_forwards_quic()
} else {
node.tpu_forwards()
}
.unwrap(),
)),
Mode::Tpu => Some((*node.pubkey(), node.tpu(protocol).unwrap())),
Mode::TpuForwards => {
Some((*node.pubkey(), node.tpu_forwards(protocol).unwrap()))
}
Mode::Repair => Some((*node.pubkey(), node.repair().unwrap())),
Mode::ServeRepair => Some((*node.pubkey(), node.serve_repair().unwrap())),
Mode::Rpc => None,
@ -964,7 +956,10 @@ pub mod test {
let client = Arc::new(ThinClient::new(
cluster.entry_point_info.rpc().unwrap(),
cluster.entry_point_info.tpu().unwrap(),
cluster
.entry_point_info
.tpu(cluster.connection_cache.protocol())
.unwrap(),
cluster.connection_cache.clone(),
));
@ -1100,10 +1095,10 @@ pub mod test {
let client = Arc::new(ThinClient::new(
cluster.entry_point_info.rpc().unwrap(),
match *cluster.connection_cache {
ConnectionCache::Quic(_) => cluster.entry_point_info.tpu_quic().unwrap(),
ConnectionCache::Udp(_) => cluster.entry_point_info.tpu().unwrap(),
},
cluster
.entry_point_info
.tpu(cluster.connection_cache.protocol())
.unwrap(),
cluster.connection_cache.clone(),
));

View File

@ -24,7 +24,7 @@ use {
cluster_info_metrics::{
submit_gossip_stats, Counter, GossipStats, ScopedTimer, TimedGuard,
},
contact_info::{ContactInfo, Error as ContactInfoError, LegacyContactInfo},
contact_info::{self, ContactInfo, Error as ContactInfoError, LegacyContactInfo},
crds::{Crds, Cursor, GossipRoute},
crds_gossip::CrdsGossip,
crds_gossip_error::CrdsGossipError,
@ -845,8 +845,8 @@ impl ClusterInfo {
},
self.addr_to_string(&ip_addr, &node.gossip().ok()),
self.addr_to_string(&ip_addr, &node.tpu_vote().ok()),
self.addr_to_string(&ip_addr, &node.tpu().ok()),
self.addr_to_string(&ip_addr, &node.tpu_forwards().ok()),
self.addr_to_string(&ip_addr, &node.tpu(contact_info::Protocol::UDP).ok()),
self.addr_to_string(&ip_addr, &node.tpu_forwards(contact_info::Protocol::UDP).ok()),
self.addr_to_string(&ip_addr, &node.tvu().ok()),
self.addr_to_string(&ip_addr, &node.tvu_forwards().ok()),
self.addr_to_string(&ip_addr, &node.repair().ok()),
@ -1144,7 +1144,7 @@ impl ClusterInfo {
) -> Result<(), GossipError> {
let tpu = tpu
.map(Ok)
.unwrap_or_else(|| self.my_contact_info().tpu())?;
.unwrap_or_else(|| self.my_contact_info().tpu(contact_info::Protocol::UDP))?;
let buf = serialize(transaction)?;
self.socket.send_to(&buf, tpu)?;
Ok(())
@ -1358,12 +1358,16 @@ impl ClusterInfo {
}
fn is_spy_node(node: &LegacyContactInfo, socket_addr_space: &SocketAddrSpace) -> bool {
![node.tpu(), node.gossip(), node.tvu()]
.into_iter()
.all(|addr| {
addr.map(|addr| socket_addr_space.check(&addr))
.unwrap_or_default()
})
![
node.tpu(contact_info::Protocol::UDP),
node.gossip(),
node.tvu(),
]
.into_iter()
.all(|addr| {
addr.map(|addr| socket_addr_space.check(&addr))
.unwrap_or_default()
})
}
/// compute broadcast table
@ -1373,7 +1377,8 @@ impl ClusterInfo {
gossip_crds
.get_nodes_contact_info()
.filter(|node| {
node.pubkey() != &self_pubkey && self.check_socket_addr_space(&node.tpu())
node.pubkey() != &self_pubkey
&& self.check_socket_addr_space(&node.tpu(contact_info::Protocol::UDP))
})
.cloned()
.collect()

View File

@ -1,4 +1,3 @@
pub use crate::legacy_contact_info::LegacyContactInfo;
use {
crate::crds_value::MAX_WALLCLOCK,
matches::{assert_matches, debug_assert_matches},
@ -19,6 +18,9 @@ use {
},
thiserror::Error,
};
pub use {
crate::legacy_contact_info::LegacyContactInfo, solana_client::connection_cache::Protocol,
};
const SOCKET_TAG_GOSSIP: u8 = 0;
const SOCKET_TAG_REPAIR: u8 = 1;
@ -115,6 +117,17 @@ macro_rules! get_socket {
Ok(socket)
}
};
($name:ident, $udp:ident, $quic:ident) => {
pub fn $name(&self, protocol: Protocol) -> Result<SocketAddr, Error> {
let key = match protocol {
Protocol::QUIC => $quic,
Protocol::UDP => $udp,
};
let socket = self.cache[usize::from(key)];
sanitize_socket(&socket)?;
Ok(socket)
}
};
}
macro_rules! set_socket {
@ -203,10 +216,12 @@ impl ContactInfo {
get_socket!(rpc, SOCKET_TAG_RPC);
get_socket!(rpc_pubsub, SOCKET_TAG_RPC_PUBSUB);
get_socket!(serve_repair, SOCKET_TAG_SERVE_REPAIR);
get_socket!(tpu, SOCKET_TAG_TPU);
get_socket!(tpu_forwards, SOCKET_TAG_TPU_FORWARDS);
get_socket!(tpu_forwards_quic, SOCKET_TAG_TPU_FORWARDS_QUIC);
get_socket!(tpu_quic, SOCKET_TAG_TPU_QUIC);
get_socket!(tpu, SOCKET_TAG_TPU, SOCKET_TAG_TPU_QUIC);
get_socket!(
tpu_forwards,
SOCKET_TAG_TPU_FORWARDS,
SOCKET_TAG_TPU_FORWARDS_QUIC
);
get_socket!(tpu_vote, SOCKET_TAG_TPU_VOTE);
get_socket!(tvu, SOCKET_TAG_TVU);
get_socket!(tvu_forwards, SOCKET_TAG_TVU_FORWARDS);
@ -705,19 +720,22 @@ mod tests {
node.serve_repair().ok().as_ref(),
sockets.get(&SOCKET_TAG_SERVE_REPAIR)
);
assert_eq!(node.tpu().ok().as_ref(), sockets.get(&SOCKET_TAG_TPU));
assert_eq!(
node.tpu_forwards().ok().as_ref(),
node.tpu(Protocol::UDP).ok().as_ref(),
sockets.get(&SOCKET_TAG_TPU)
);
assert_eq!(
node.tpu(Protocol::QUIC).ok().as_ref(),
sockets.get(&SOCKET_TAG_TPU_QUIC)
);
assert_eq!(
node.tpu_forwards(Protocol::UDP).ok().as_ref(),
sockets.get(&SOCKET_TAG_TPU_FORWARDS)
);
assert_eq!(
node.tpu_forwards_quic().ok().as_ref(),
node.tpu_forwards(Protocol::QUIC).ok().as_ref(),
sockets.get(&SOCKET_TAG_TPU_FORWARDS_QUIC)
);
assert_eq!(
node.tpu_quic().ok().as_ref(),
sockets.get(&SOCKET_TAG_TPU_QUIC)
);
assert_eq!(
node.tpu_vote().ok().as_ref(),
sockets.get(&SOCKET_TAG_TPU_VOTE)
@ -778,20 +796,34 @@ mod tests {
assert_eq!(old.rpc().unwrap(), node.rpc().unwrap());
assert_eq!(old.rpc_pubsub().unwrap(), node.rpc_pubsub().unwrap());
assert_eq!(old.serve_repair().unwrap(), node.serve_repair().unwrap());
assert_eq!(old.tpu().unwrap(), node.tpu().unwrap());
assert_eq!(old.tpu_forwards().unwrap(), node.tpu_forwards().unwrap());
assert_eq!(
node.tpu_forwards_quic().unwrap(),
old.tpu(Protocol::QUIC).unwrap(),
node.tpu(Protocol::QUIC).unwrap()
);
assert_eq!(
old.tpu(Protocol::UDP).unwrap(),
node.tpu(Protocol::UDP).unwrap()
);
assert_eq!(
old.tpu_forwards(Protocol::QUIC).unwrap(),
node.tpu_forwards(Protocol::QUIC).unwrap()
);
assert_eq!(
old.tpu_forwards(Protocol::UDP).unwrap(),
node.tpu_forwards(Protocol::UDP).unwrap()
);
assert_eq!(
node.tpu_forwards(Protocol::QUIC).unwrap(),
SocketAddr::new(
old.tpu_forwards().unwrap().ip(),
old.tpu_forwards().unwrap().port() + QUIC_PORT_OFFSET
old.tpu_forwards(Protocol::UDP).unwrap().ip(),
old.tpu_forwards(Protocol::UDP).unwrap().port() + QUIC_PORT_OFFSET
)
);
assert_eq!(
node.tpu_quic().unwrap(),
node.tpu(Protocol::QUIC).unwrap(),
SocketAddr::new(
old.tpu().unwrap().ip(),
old.tpu().unwrap().port() + QUIC_PORT_OFFSET
old.tpu(Protocol::UDP).unwrap().ip(),
old.tpu(Protocol::UDP).unwrap().port() + QUIC_PORT_OFFSET
)
);
assert_eq!(old.tpu_vote().unwrap(), node.tpu_vote().unwrap());
@ -860,23 +892,26 @@ mod tests {
.unwrap();
// TPU socket.
node.set_tpu(socket).unwrap();
assert_eq!(node.tpu().unwrap(), socket);
assert_eq!(node.tpu(Protocol::UDP).unwrap(), socket);
assert_eq!(
node.tpu_quic().unwrap(),
node.tpu(Protocol::QUIC).unwrap(),
SocketAddr::new(socket.ip(), socket.port() + QUIC_PORT_OFFSET)
);
node.remove_tpu();
assert_matches!(node.tpu(), Err(Error::InvalidPort(0)));
assert_matches!(node.tpu_quic(), Err(Error::InvalidPort(0)));
assert_matches!(node.tpu(Protocol::UDP), Err(Error::InvalidPort(0)));
assert_matches!(node.tpu(Protocol::QUIC), Err(Error::InvalidPort(0)));
// TPU forwards socket.
node.set_tpu_forwards(socket).unwrap();
assert_eq!(node.tpu_forwards().unwrap(), socket);
assert_eq!(node.tpu_forwards(Protocol::UDP).unwrap(), socket);
assert_eq!(
node.tpu_forwards_quic().unwrap(),
node.tpu_forwards(Protocol::QUIC).unwrap(),
SocketAddr::new(socket.ip(), socket.port() + QUIC_PORT_OFFSET)
);
node.remove_tpu_forwards();
assert_matches!(node.tpu_forwards(), Err(Error::InvalidPort(0)));
assert_matches!(node.tpu_forwards_quic(), Err(Error::InvalidPort(0)));
assert_matches!(node.tpu_forwards(Protocol::UDP), Err(Error::InvalidPort(0)));
assert_matches!(
node.tpu_forwards(Protocol::QUIC),
Err(Error::InvalidPort(0))
);
}
}

View File

@ -2,11 +2,10 @@ use {
crate::{
contact_info::{
get_quic_socket, sanitize_quic_offset, sanitize_socket, socket_addr_unspecified,
ContactInfo, Error,
ContactInfo, Error, Protocol,
},
crds_value::MAX_WALLCLOCK,
},
solana_client::connection_cache::Protocol,
solana_sdk::{
pubkey::Pubkey,
sanitize::{Sanitize, SanitizeError},
@ -65,6 +64,16 @@ macro_rules! get_socket {
Ok(socket).copied()
}
};
(@quic $name:ident) => {
pub fn $name(&self, protocol: Protocol) -> Result<SocketAddr, Error> {
let socket = &self.$name;
sanitize_socket(socket)?;
match protocol {
Protocol::QUIC => get_quic_socket(socket),
Protocol::UDP => Ok(socket).copied(),
}
}
};
}
macro_rules! set_socket {
@ -189,8 +198,8 @@ impl LegacyContactInfo {
get_socket!(tvu);
get_socket!(tvu_forwards);
get_socket!(repair);
get_socket!(tpu);
get_socket!(tpu_forwards);
get_socket!(@quic tpu);
get_socket!(@quic tpu_forwards);
get_socket!(tpu_vote);
get_socket!(rpc);
get_socket!(rpc_pubsub);
@ -199,14 +208,6 @@ impl LegacyContactInfo {
set_socket!(set_gossip, gossip);
set_socket!(set_rpc, rpc);
pub fn tpu_quic(&self) -> Result<SocketAddr, Error> {
self.tpu().and_then(|addr| get_quic_socket(&addr))
}
pub fn tpu_forwards_quic(&self) -> Result<SocketAddr, Error> {
self.tpu_forwards().and_then(|addr| get_quic_socket(&addr))
}
fn is_valid_ip(addr: IpAddr) -> bool {
!(addr.is_unspecified() || addr.is_multicast())
// || (addr.is_loopback() && !cfg_test))
@ -226,17 +227,14 @@ impl LegacyContactInfo {
protocol: Protocol,
socket_addr_space: &SocketAddrSpace,
) -> Option<(SocketAddr, SocketAddr)> {
let rpc = self
.rpc()
.ok()
.filter(|addr| socket_addr_space.check(addr))?;
let tpu = match protocol {
Protocol::QUIC => self.tpu_quic(),
Protocol::UDP => self.tpu(),
}
.ok()
.filter(|addr| socket_addr_space.check(addr))?;
Some((rpc, tpu))
Some((
self.rpc()
.ok()
.filter(|addr| socket_addr_space.check(addr))?,
self.tpu(protocol)
.ok()
.filter(|addr| socket_addr_space.check(addr))?,
))
}
}
@ -248,17 +246,28 @@ impl TryFrom<&ContactInfo> for LegacyContactInfo {
($name:ident) => {
node.$name().ok().unwrap_or_else(socket_addr_unspecified)
};
($name:ident, $protocol:expr) => {
node.$name($protocol)
.ok()
.unwrap_or_else(socket_addr_unspecified)
};
}
sanitize_quic_offset(&node.tpu().ok(), &node.tpu_quic().ok())?;
sanitize_quic_offset(&node.tpu_forwards().ok(), &node.tpu_forwards_quic().ok())?;
sanitize_quic_offset(
&node.tpu(Protocol::UDP).ok(),
&node.tpu(Protocol::QUIC).ok(),
)?;
sanitize_quic_offset(
&node.tpu_forwards(Protocol::UDP).ok(),
&node.tpu_forwards(Protocol::QUIC).ok(),
)?;
Ok(Self {
id: *node.pubkey(),
gossip: unwrap_socket!(gossip),
tvu: unwrap_socket!(tvu),
tvu_forwards: unwrap_socket!(tvu_forwards),
repair: unwrap_socket!(repair),
tpu: unwrap_socket!(tpu),
tpu_forwards: unwrap_socket!(tpu_forwards),
tpu: unwrap_socket!(tpu, Protocol::UDP),
tpu_forwards: unwrap_socket!(tpu_forwards, Protocol::UDP),
tpu_vote: unwrap_socket!(tpu_vote),
rpc: unwrap_socket!(rpc),
rpc_pubsub: unwrap_socket!(rpc_pubsub),

View File

@ -59,10 +59,7 @@ pub fn get_client_facing_addr<T: Borrow<LegacyContactInfo>>(
) -> (SocketAddr, SocketAddr) {
let contact_info = contact_info.borrow();
let rpc = contact_info.rpc().unwrap();
let mut tpu = match protocol {
Protocol::QUIC => contact_info.tpu_quic().unwrap(),
Protocol::UDP => contact_info.tpu().unwrap(),
};
let mut tpu = contact_info.tpu(protocol).unwrap();
// QUIC certificate authentication requires the IP Address to match. ContactInfo might have
// 0.0.0.0 as the IP instead of 127.0.0.1.
tpu.set_ip(IpAddr::V4(Ipv4Addr::LOCALHOST));

View File

@ -1,5 +1,5 @@
use {
solana_gossip::cluster_info::ClusterInfo,
solana_gossip::{cluster_info::ClusterInfo, contact_info::Protocol},
solana_poh::poh_recorder::PohRecorder,
solana_sdk::{clock::NUM_CONSECUTIVE_LEADER_SLOTS, pubkey::Pubkey},
solana_send_transaction_service::tpu_info::TpuInfo,
@ -33,7 +33,7 @@ impl TpuInfo for ClusterTpuInfo {
.cluster_info
.tpu_peers()
.into_iter()
.filter_map(|node| Some((*node.pubkey(), node.tpu().ok()?)))
.filter_map(|node| Some((*node.pubkey(), node.tpu(Protocol::UDP).ok()?)))
.collect();
}

View File

@ -14,7 +14,7 @@ use {
parse_token::{is_known_spl_token_id, token_amount_to_ui_amount, UiTokenAmount},
UiAccount, UiAccountEncoding, UiDataSliceConfig, MAX_BASE58_BYTES,
},
solana_client::connection_cache::ConnectionCache,
solana_client::connection_cache::{ConnectionCache, Protocol},
solana_entry::entry::Entry,
solana_faucet::faucet::request_airdrop_transaction,
solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
@ -356,11 +356,10 @@ impl JsonRpcRequestProcessor {
);
ClusterInfo::new(contact_info, keypair, socket_addr_space)
});
let tpu_address = match *connection_cache {
ConnectionCache::Quic(_) => ContactInfo::tpu_quic,
ConnectionCache::Udp(_) => ContactInfo::tpu,
}(&cluster_info.my_contact_info())
.unwrap();
let tpu_address = cluster_info
.my_contact_info()
.tpu(connection_cache.protocol())
.unwrap();
let (sender, receiver) = unbounded();
SendTransactionService::new::<NullTpuInfo>(
tpu_address,
@ -3474,11 +3473,11 @@ pub mod rpc_full {
pubkey: contact_info.pubkey().to_string(),
gossip: contact_info.gossip().ok(),
tpu: contact_info
.tpu()
.tpu(Protocol::UDP)
.ok()
.filter(|addr| socket_addr_space.check(addr)),
tpu_quic: contact_info
.tpu_quic()
.tpu(Protocol::QUIC)
.ok()
.filter(|addr| socket_addr_space.check(addr)),
rpc: contact_info
@ -6422,7 +6421,11 @@ pub mod tests {
);
ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified)
});
let tpu_address = cluster_info.my_contact_info().tpu().unwrap();
let connection_cache = Arc::<ConnectionCache>::default();
let tpu_address = cluster_info
.my_contact_info()
.tpu(connection_cache.protocol())
.unwrap();
let (meta, receiver) = JsonRpcRequestProcessor::new(
JsonRpcConfig::default(),
None,
@ -6442,7 +6445,6 @@ pub mod tests {
Arc::new(AtomicU64::default()),
Arc::new(PrioritizationFeeCache::default()),
);
let connection_cache = Arc::new(ConnectionCache::default());
SendTransactionService::new::<NullTpuInfo>(
tpu_address,
&bank_forks,
@ -6691,7 +6693,11 @@ pub mod tests {
)));
let cluster_info = Arc::new(new_test_cluster_info());
let tpu_address = cluster_info.my_contact_info().tpu().unwrap();
let connection_cache = Arc::<ConnectionCache>::default();
let tpu_address = cluster_info
.my_contact_info()
.tpu(connection_cache.protocol())
.unwrap();
let (request_processor, receiver) = JsonRpcRequestProcessor::new(
JsonRpcConfig::default(),
None,
@ -6711,7 +6717,6 @@ pub mod tests {
Arc::new(AtomicU64::default()),
Arc::new(PrioritizationFeeCache::default()),
);
let connection_cache = Arc::new(ConnectionCache::default());
SendTransactionService::new::<NullTpuInfo>(
tpu_address,
&bank_forks,

View File

@ -20,7 +20,7 @@ use {
},
regex::Regex,
solana_client::connection_cache::ConnectionCache,
solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::{
bigtable_upload::ConfirmedBlockUploadConfig,
bigtable_upload_service::BigTableUploadService, blockstore::Blockstore,
@ -378,11 +378,10 @@ impl JsonRpcService {
LARGEST_ACCOUNTS_CACHE_DURATION,
)));
let tpu_address = match *connection_cache {
ConnectionCache::Quic(_) => ContactInfo::tpu_quic,
ConnectionCache::Udp(_) => ContactInfo::tpu,
}(&cluster_info.my_contact_info())
.map_err(|err| format!("{err}"))?;
let tpu_address = cluster_info
.my_contact_info()
.tpu(connection_cache.protocol())
.map_err(|err| format!("{err}"))?;
// sadly, some parts of our current rpc implemention block the jsonrpc's
// _socket-listening_ event loop for too long, due to (blocking) long IO or intesive CPU,

View File

@ -15,6 +15,7 @@ use {
},
solana_gossip::{
cluster_info::{ClusterInfo, Node},
contact_info::Protocol,
gossip_service::discover_cluster,
socketaddr,
},
@ -883,7 +884,7 @@ impl TestValidator {
let vote_account_address = validator_vote_account.pubkey();
let rpc_url = format!("http://{}", node.info.rpc().unwrap());
let rpc_pubsub_url = format!("ws://{}/", node.info.rpc_pubsub().unwrap());
let tpu = node.info.tpu().unwrap();
let tpu = node.info.tpu(Protocol::UDP).unwrap();
let gossip = node.info.gossip().unwrap();
{

View File

@ -14,7 +14,7 @@ use {
tower_storage::TowerStorage, validator::ValidatorStartProgress,
},
solana_geyser_plugin_manager::GeyserPluginManagerRequest,
solana_gossip::contact_info::ContactInfo,
solana_gossip::contact_info::{ContactInfo, Protocol},
solana_rpc::rpc::verify_pubkey,
solana_rpc_client_api::{config::RpcAccountIndex, custom_error::RpcCustomError},
solana_runtime::accounts_index::AccountIndex,
@ -95,6 +95,11 @@ impl From<ContactInfo> for AdminRpcContactInfo {
SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), /*port:*/ 0u16)
})
};
($name:ident, $protocol:expr) => {
node.$name($protocol).unwrap_or_else(|_| {
SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), /*port:*/ 0u16)
})
};
}
Self {
id: node.pubkey().to_string(),
@ -103,8 +108,8 @@ impl From<ContactInfo> for AdminRpcContactInfo {
tvu: unwrap_socket!(tvu),
tvu_forwards: unwrap_socket!(tvu_forwards),
repair: unwrap_socket!(repair),
tpu: unwrap_socket!(tpu),
tpu_forwards: unwrap_socket!(tpu_forwards),
tpu: unwrap_socket!(tpu, Protocol::UDP),
tpu_forwards: unwrap_socket!(tpu_forwards, Protocol::UDP),
tpu_vote: unwrap_socket!(tpu_vote),
rpc: unwrap_socket!(rpc),
rpc_pubsub: unwrap_socket!(rpc_pubsub),
@ -614,7 +619,7 @@ impl AdminRpc for AdminRpcImpl {
post_init
.cluster_info
.my_contact_info()
.tpu()
.tpu(Protocol::UDP)
.map_err(|err| {
error!(
"The public TPU address isn't being published. \
@ -634,8 +639,8 @@ impl AdminRpc for AdminRpcImpl {
let my_contact_info = post_init.cluster_info.my_contact_info();
warn!(
"Public TPU addresses set to {:?} (udp) and {:?} (quic)",
my_contact_info.tpu(),
my_contact_info.tpu_quic(),
my_contact_info.tpu(Protocol::UDP),
my_contact_info.tpu(Protocol::QUIC),
);
Ok(())
})
@ -652,7 +657,7 @@ impl AdminRpc for AdminRpcImpl {
post_init
.cluster_info
.my_contact_info()
.tpu_forwards()
.tpu_forwards(Protocol::UDP)
.map_err(|err| {
error!(
"The public TPU Forwards address isn't being published. \
@ -672,8 +677,8 @@ impl AdminRpc for AdminRpcImpl {
let my_contact_info = post_init.cluster_info.my_contact_info();
warn!(
"Public TPU Forwards addresses set to {:?} (udp) and {:?} (quic)",
my_contact_info.tpu_forwards(),
my_contact_info.tpu_forwards_quic(),
my_contact_info.tpu_forwards(Protocol::UDP),
my_contact_info.tpu_forwards(Protocol::QUIC),
);
Ok(())
})

View File

@ -8,6 +8,7 @@ use {
solana_genesis_utils::download_then_check_genesis_hash,
solana_gossip::{
cluster_info::{ClusterInfo, Node},
contact_info::Protocol,
crds_value,
gossip_service::GossipService,
legacy_contact_info::LegacyContactInfo as ContactInfo,
@ -83,11 +84,11 @@ fn verify_reachable_ports(
if verify_address(&node.info.serve_repair().ok()) {
udp_sockets.push(&node.sockets.serve_repair);
}
if verify_address(&node.info.tpu().ok()) {
if verify_address(&node.info.tpu(Protocol::UDP).ok()) {
udp_sockets.extend(node.sockets.tpu.iter());
udp_sockets.push(&node.sockets.tpu_quic);
}
if verify_address(&node.info.tpu_forwards().ok()) {
if verify_address(&node.info.tpu_forwards(Protocol::UDP).ok()) {
udp_sockets.extend(node.sockets.tpu_forwards.iter());
udp_sockets.push(&node.sockets.tpu_forwards_quic);
}