adds hash domain to ping-pong protocol (#27193)

In order to maintain backward compatibility, for now the responding node
will hash the token both with and without domain so that the other node
will accept the response regardless of its upgrade status.
Once the cluster has upgraded to the new code, we will remove the legacy
domain = false case.
This commit is contained in:
behzad nouri 2022-08-18 22:39:31 +00:00 committed by GitHub
parent d8380e4d4a
commit 6928b2a5af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 72 additions and 28 deletions

View File

@ -425,16 +425,21 @@ impl AncestorHashesService {
stats.invalid_packets += 1; stats.invalid_packets += 1;
return None; return None;
} }
if ping.verify() { if !ping.verify() {
stats.ping_err_verify_count += 1;
return None;
}
stats.ping_count += 1; stats.ping_count += 1;
if let Ok(pong) = Pong::new(&ping, keypair) { // Respond both with and without domain so that the other node
// will accept the response regardless of its upgrade status.
// TODO: remove domain = false once cluster is upgraded.
for domain in [false, true] {
if let Ok(pong) = Pong::new(domain, &ping, keypair) {
let pong = RepairProtocol::Pong(pong); let pong = RepairProtocol::Pong(pong);
if let Ok(pong_bytes) = serialize(&pong) { if let Ok(pong_bytes) = serialize(&pong) {
let _ignore = ancestor_socket.send_to(&pong_bytes[..], from_addr); let _ignore = ancestor_socket.send_to(&pong_bytes[..], from_addr);
} }
} }
} else {
stats.ping_err_verify_count += 1;
} }
None None
} }

View File

@ -1044,7 +1044,11 @@ impl ServeRepair {
} }
packet.meta.set_discard(true); packet.meta.set_discard(true);
stats.ping_count += 1; stats.ping_count += 1;
if let Ok(pong) = Pong::new(&ping, keypair) { // Respond both with and without domain so that the other node
// will accept the response regardless of its upgrade status.
// TODO: remove domain = false once cluster is upgraded.
for domain in [false, true] {
if let Ok(pong) = Pong::new(domain, &ping, keypair) {
let pong = RepairProtocol::Pong(pong); let pong = RepairProtocol::Pong(pong);
if let Ok(pong_bytes) = serialize(&pong) { if let Ok(pong_bytes) = serialize(&pong) {
let from_addr = packet.meta.socket_addr(); let from_addr = packet.meta.socket_addr();
@ -1053,6 +1057,7 @@ impl ServeRepair {
} }
} }
} }
}
if !pending_pongs.is_empty() { if !pending_pongs.is_empty() {
if let Err(SendPktsError::IoError(err, num_failed)) = if let Err(SendPktsError::IoError(err, num_failed)) =
batch_send(repair_socket, &pending_pongs) batch_send(repair_socket, &pending_pongs)

View File

@ -2170,14 +2170,18 @@ impl ClusterInfo {
I: IntoIterator<Item = (SocketAddr, Ping)>, I: IntoIterator<Item = (SocketAddr, Ping)>,
{ {
let keypair = self.keypair(); let keypair = self.keypair();
let pongs_and_dests: Vec<_> = pings let mut pongs_and_dests = Vec::new();
.into_iter() for (addr, ping) in pings {
.filter_map(|(addr, ping)| { // Respond both with and without domain so that the other node will
let pong = Pong::new(&ping, &keypair).ok()?; // accept the response regardless of its upgrade status.
// TODO: remove domain = false once cluster is upgraded.
for domain in [false, true] {
if let Ok(pong) = Pong::new(domain, &ping, &keypair) {
let pong = Protocol::PongMessage(pong); let pong = Protocol::PongMessage(pong);
Some((addr, pong)) pongs_and_dests.push((addr, pong));
}) }
.collect(); }
}
if pongs_and_dests.is_empty() { if pongs_and_dests.is_empty() {
None None
} else { } else {
@ -3287,7 +3291,9 @@ RPC Enabled Nodes: 1"#;
let pongs: Vec<(SocketAddr, Pong)> = pings let pongs: Vec<(SocketAddr, Pong)> = pings
.iter() .iter()
.zip(&remote_nodes) .zip(&remote_nodes)
.map(|(ping, (keypair, socket))| (*socket, Pong::new(ping, keypair).unwrap())) .map(|(ping, (keypair, socket))| {
(*socket, Pong::new(/*domain:*/ true, ping, keypair).unwrap())
})
.collect(); .collect();
let now = now + Duration::from_millis(1); let now = now + Duration::from_millis(1);
cluster_info.handle_batch_pong_messages(pongs, now); cluster_info.handle_batch_pong_messages(pongs, now);
@ -3330,7 +3336,7 @@ RPC Enabled Nodes: 1"#;
.collect(); .collect();
let pongs: Vec<_> = pings let pongs: Vec<_> = pings
.iter() .iter()
.map(|ping| Pong::new(ping, &this_node).unwrap()) .map(|ping| Pong::new(/*domain:*/ false, ping, &this_node).unwrap())
.collect(); .collect();
let recycler = PacketBatchRecycler::default(); let recycler = PacketBatchRecycler::default();
let packets = cluster_info let packets = cluster_info
@ -3342,9 +3348,9 @@ RPC Enabled Nodes: 1"#;
&recycler, &recycler,
) )
.unwrap(); .unwrap();
assert_eq!(remote_nodes.len(), packets.len()); assert_eq!(remote_nodes.len() * 2, packets.len());
for (packet, (_, socket), pong) in izip!( for (packet, (_, socket), pong) in izip!(
packets.into_iter(), packets.into_iter().step_by(2),
remote_nodes.into_iter(), remote_nodes.into_iter(),
pongs.into_iter() pongs.into_iter()
) { ) {

View File

@ -16,6 +16,8 @@ use {
}, },
}; };
const PING_PONG_HASH_PREFIX: &[u8] = "SOLANA_PING_PONG".as_bytes();
#[derive(AbiExample, Debug, Deserialize, Serialize)] #[derive(AbiExample, Debug, Deserialize, Serialize)]
pub struct Ping<T> { pub struct Ping<T> {
from: Pubkey, from: Pubkey,
@ -100,8 +102,17 @@ impl<T: Serialize> Signable for Ping<T> {
} }
impl Pong { impl Pong {
pub fn new<T: Serialize>(ping: &Ping<T>, keypair: &Keypair) -> Result<Self, Error> { pub fn new<T: Serialize>(
let hash = hash::hash(&serialize(&ping.token)?); domain: bool,
ping: &Ping<T>,
keypair: &Keypair,
) -> Result<Self, Error> {
let token = serialize(&ping.token)?;
let hash = if domain {
hash::hashv(&[PING_PONG_HASH_PREFIX, &token])
} else {
hash::hash(&token)
};
let pong = Pong { let pong = Pong {
from: keypair.pubkey(), from: keypair.pubkey(),
hash, hash,
@ -187,9 +198,15 @@ impl PingCache {
Some(t) if now.saturating_duration_since(*t) < delay => None, Some(t) if now.saturating_duration_since(*t) < delay => None,
_ => { _ => {
let ping = pingf()?; let ping = pingf()?;
let hash = hash::hash(&serialize(&ping.token).ok()?); let token = serialize(&ping.token).ok()?;
self.pings.put(node, now); // For backward compatibility, for now responses both with and
// without domain are accepted.
// TODO: remove no domain case once cluster is upgraded.
let hash = hash::hash(&token);
self.pending_cache.put(hash, node); self.pending_cache.put(hash, node);
let hash = hash::hashv(&[PING_PONG_HASH_PREFIX, &token]);
self.pending_cache.put(hash, node);
self.pings.put(node, now);
Some(ping) Some(ping)
} }
} }
@ -281,10 +298,18 @@ mod tests {
assert!(ping.verify()); assert!(ping.verify());
assert!(ping.sanitize().is_ok()); assert!(ping.sanitize().is_ok());
let pong = Pong::new(&ping, &keypair).unwrap(); let pong = Pong::new(/*domain:*/ false, &ping, &keypair).unwrap();
assert!(pong.verify()); assert!(pong.verify());
assert!(pong.sanitize().is_ok()); assert!(pong.sanitize().is_ok());
assert_eq!(hash::hash(&ping.token), pong.hash); assert_eq!(hash::hash(&ping.token), pong.hash);
let pong = Pong::new(/*domian:*/ true, &ping, &keypair).unwrap();
assert!(pong.verify());
assert!(pong.sanitize().is_ok());
assert_eq!(
hash::hashv(&[PING_PONG_HASH_PREFIX, &ping.token]),
pong.hash
);
} }
#[test] #[test]
@ -339,7 +364,10 @@ mod tests {
assert!(ping.is_none()); assert!(ping.is_none());
} }
Some(ping) => { Some(ping) => {
let pong = Pong::new(ping, keypair).unwrap(); let domain = rng.gen_ratio(1, 2);
let pong = Pong::new(domain, ping, keypair).unwrap();
assert!(cache.add(&pong, *socket, now));
let pong = Pong::new(!domain, ping, keypair).unwrap();
assert!(cache.add(&pong, *socket, now)); assert!(cache.add(&pong, *socket, now));
} }
} }