implements ping-pong packets between nodes (#12794)

https://hackerone.com/reports/991106

> It’s possible to use UDP gossip protocol to amplify DDoS attacks. An attacker
> can spoof IP address in UDP packet when sending PullRequest to the node.
> There's no any validation if provided source IP address is not spoofed and
> the node can send much larger PullResponse to victim's IP. As I checked,
> PullRequest is about 290 bytes, while PullResponse is about 10 kB. It means
> that amplification is about 34x. This way an attacker can easily perform DDoS
> attack both on Solana node and third-party server.
>
> To prevent it, need for example to implement ping-pong mechanism similar as
> in Ethereum: Before accepting requests from remote client needs to validate
> his IP. Local node sends Ping packet to the remote node and it needs to reply
> with Pong packet that contains hash of matching Ping packet. Content of Ping
> packet is unpredictable. If hash from Pong packet matches, local node can
> remember IP where Ping packet was sent as correct and allow further
> communication.
>
> More info:
> https://github.com/ethereum/devp2p/blob/master/discv4.md#endpoint-proof
> https://github.com/ethereum/devp2p/blob/master/discv4.md#wire-protocol

This commit adds a PingCache, which maintains records of remote nodes
that have returned a valid response to a ping message, as well as
in-flight ping messages still pending a pong response from the remote node.

When handling pull-requests, those from addresses which have not passed
the ping-pong check are filtered out, and additionally ping packets are
added for addresses which need to be (re)verified.
This commit is contained in:
behzad nouri 2020-10-28 17:03:02 +00:00 committed by GitHub
parent 49e11e1f9c
commit ae91270961
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 697 additions and 50 deletions

11
Cargo.lock generated
View File

@ -1374,6 +1374,7 @@ version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e91b62f79061a0bc2e046024cb7ba44b08419ed238ecbd9adbd787434b9e8c25" checksum = "e91b62f79061a0bc2e046024cb7ba44b08419ed238ecbd9adbd787434b9e8c25"
dependencies = [ dependencies = [
"ahash",
"autocfg 1.0.0", "autocfg 1.0.0",
] ]
@ -1991,6 +1992,15 @@ dependencies = [
"cfg-if", "cfg-if",
] ]
[[package]]
name = "lru"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "111b945ac72ec09eb7bc62a0fbdc3cc6e80555a7245f52a69d3921a75b53b153"
dependencies = [
"hashbrown",
]
[[package]] [[package]]
name = "matches" name = "matches"
version = "0.1.8" version = "0.1.8"
@ -3805,6 +3815,7 @@ dependencies = [
"jsonrpc-pubsub", "jsonrpc-pubsub",
"jsonrpc-ws-server", "jsonrpc-ws-server",
"log 0.4.8", "log 0.4.8",
"lru",
"matches", "matches",
"num-traits", "num-traits",
"num_cpus", "num_cpus",

View File

@ -34,6 +34,7 @@ jsonrpc-http-server = "15.0.0"
jsonrpc-pubsub = "15.0.0" jsonrpc-pubsub = "15.0.0"
jsonrpc-ws-server = "15.0.0" jsonrpc-ws-server = "15.0.0"
log = "0.4.8" log = "0.4.8"
lru = "0.6.0"
num_cpus = "1.13.0" num_cpus = "1.13.0"
num-traits = "0.2" num-traits = "0.2"
rand = "0.7.0" rand = "0.7.0"

View File

@ -23,12 +23,13 @@ use crate::{
}, },
data_budget::DataBudget, data_budget::DataBudget,
epoch_slots::EpochSlots, epoch_slots::EpochSlots,
ping_pong::{self, PingCache, Pong},
result::{Error, Result}, result::{Error, Result},
weighted_shuffle::weighted_shuffle, weighted_shuffle::weighted_shuffle,
}; };
use rand::distributions::{Distribution, WeightedIndex}; use rand::distributions::{Distribution, WeightedIndex};
use rand::SeedableRng; use rand::{CryptoRng, Rng, SeedableRng};
use rand_chacha::ChaChaRng; use rand_chacha::ChaChaRng;
use solana_sdk::sanitize::{Sanitize, SanitizeError}; use solana_sdk::sanitize::{Sanitize, SanitizeError};
@ -97,6 +98,10 @@ const MAX_GOSSIP_TRAFFIC: usize = 128_000_000 / PACKET_DATA_SIZE;
/// Keep the number of snapshot hashes a node publishes under MAX_PROTOCOL_PAYLOAD_SIZE /// Keep the number of snapshot hashes a node publishes under MAX_PROTOCOL_PAYLOAD_SIZE
pub const MAX_SNAPSHOT_HASHES: usize = 16; pub const MAX_SNAPSHOT_HASHES: usize = 16;
/// Number of bytes in the randomly generated token sent with ping messages.
const GOSSIP_PING_TOKEN_SIZE: usize = 32;
/// Capacity passed to `PingCache::new`; the cache uses it to size each of its
/// internal LRU maps (pings sent, pongs received, pending ping tokens).
const GOSSIP_PING_CACHE_CAPACITY: usize = 16384;
/// Time-to-live of a verified pong in the ping cache. `PingCache` derives its
/// other intervals from this value: a node is re-pinged once its pong is older
/// than ttl / 8 (80s here), and consecutive pings to the same node are spaced
/// at least ttl / 64 (10s here) apart.
const GOSSIP_PING_CACHE_TTL: Duration = Duration::from_secs(640);
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
pub enum ClusterInfoError { pub enum ClusterInfoError {
@ -226,6 +231,7 @@ struct GossipStats {
prune_received_cache: Counter, prune_received_cache: Counter,
prune_message_count: Counter, prune_message_count: Counter,
prune_message_len: Counter, prune_message_len: Counter,
pull_request_ping_pong_check_failed_count: Counter,
purge: Counter, purge: Counter,
epoch_slots_lookup: Counter, epoch_slots_lookup: Counter,
epoch_slots_push: Counter, epoch_slots_push: Counter,
@ -251,6 +257,7 @@ pub struct ClusterInfo {
entrypoint: RwLock<Option<ContactInfo>>, entrypoint: RwLock<Option<ContactInfo>>,
outbound_budget: DataBudget, outbound_budget: DataBudget,
my_contact_info: RwLock<ContactInfo>, my_contact_info: RwLock<ContactInfo>,
ping_cache: RwLock<PingCache>,
id: Pubkey, id: Pubkey,
stats: GossipStats, stats: GossipStats,
socket: UdpSocket, socket: UdpSocket,
@ -355,8 +362,10 @@ pub fn make_accounts_hashes_message(
Some(CrdsValue::new_signed(message, keypair)) Some(CrdsValue::new_signed(message, keypair))
} }
type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;
// TODO These messages should go through the gpu pipeline for spam filtering // TODO These messages should go through the gpu pipeline for spam filtering
#[frozen_abi(digest = "21UweZ4WXK9RHypF97D1rEJQ4C8Bh4pw52SBSNAKxJvW")] #[frozen_abi(digest = "3jHXixLRv6fuCykW47hBZSwFuwDjbZShR73GVQB6TjGr")]
#[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)] #[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
#[allow(clippy::large_enum_variant)] #[allow(clippy::large_enum_variant)]
enum Protocol { enum Protocol {
@ -365,6 +374,8 @@ enum Protocol {
PullResponse(Pubkey, Vec<CrdsValue>), PullResponse(Pubkey, Vec<CrdsValue>),
PushMessage(Pubkey, Vec<CrdsValue>), PushMessage(Pubkey, Vec<CrdsValue>),
PruneMessage(Pubkey, PruneData), PruneMessage(Pubkey, PruneData),
PingMessage(Ping),
PongMessage(Pong),
} }
impl Protocol { impl Protocol {
@ -416,6 +427,22 @@ impl Protocol {
None None
} }
} }
Protocol::PingMessage(ref ping) => {
if ping.verify() {
Some(self)
} else {
inc_new_counter_info!("cluster_info-gossip_ping_msg_verify_fail", 1);
None
}
}
Protocol::PongMessage(ref pong) => {
if pong.verify() {
Some(self)
} else {
inc_new_counter_info!("cluster_info-gossip_pong_msg_verify_fail", 1);
None
}
}
} }
} }
} }
@ -430,6 +457,8 @@ impl Sanitize for Protocol {
Protocol::PullResponse(_, val) => val.sanitize(), Protocol::PullResponse(_, val) => val.sanitize(),
Protocol::PushMessage(_, val) => val.sanitize(), Protocol::PushMessage(_, val) => val.sanitize(),
Protocol::PruneMessage(_, val) => val.sanitize(), Protocol::PruneMessage(_, val) => val.sanitize(),
Protocol::PingMessage(ping) => ping.sanitize(),
Protocol::PongMessage(pong) => pong.sanitize(),
} }
} }
} }
@ -459,6 +488,10 @@ impl ClusterInfo {
entrypoint: RwLock::new(None), entrypoint: RwLock::new(None),
outbound_budget: DataBudget::default(), outbound_budget: DataBudget::default(),
my_contact_info: RwLock::new(contact_info), my_contact_info: RwLock::new(contact_info),
ping_cache: RwLock::new(PingCache::new(
GOSSIP_PING_CACHE_TTL,
GOSSIP_PING_CACHE_CAPACITY,
)),
id, id,
stats: GossipStats::default(), stats: GossipStats::default(),
socket: UdpSocket::bind("0.0.0.0:0").unwrap(), socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
@ -486,6 +519,7 @@ impl ClusterInfo {
entrypoint: RwLock::new(self.entrypoint.read().unwrap().clone()), entrypoint: RwLock::new(self.entrypoint.read().unwrap().clone()),
outbound_budget: self.outbound_budget.clone_non_atomic(), outbound_budget: self.outbound_budget.clone_non_atomic(),
my_contact_info: RwLock::new(my_contact_info), my_contact_info: RwLock::new(my_contact_info),
ping_cache: RwLock::new(self.ping_cache.read().unwrap().mock_clone()),
id: *new_id, id: *new_id,
stats: GossipStats::default(), stats: GossipStats::default(),
socket: UdpSocket::bind("0.0.0.0:0").unwrap(), socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
@ -1790,6 +1824,8 @@ impl ClusterInfo {
.read() .read()
.unwrap() .unwrap()
.make_timeouts(&stakes, epoch_time_ms); .make_timeouts(&stakes, epoch_time_ms);
let mut ping_messages = vec![];
let mut pong_messages = vec![];
let mut pull_responses = HashMap::new(); let mut pull_responses = HashMap::new();
for (from_addr, packet) in packets { for (from_addr, packet) in packets {
match packet { match packet {
@ -1865,9 +1901,15 @@ impl ClusterInfo {
("prune_message", (allocated.get() - start) as i64, i64), ("prune_message", (allocated.get() - start) as i64, i64),
); );
} }
Protocol::PingMessage(ping) => ping_messages.push((ping, from_addr)),
Protocol::PongMessage(pong) => pong_messages.push((pong, from_addr)),
} }
} }
if let Some(response) = self.handle_ping_messages(ping_messages, recycler) {
let _ = response_sender.send(response);
}
self.handle_pong_messages(pong_messages, Instant::now());
for (from, data) in pull_responses { for (from, data) in pull_responses {
self.handle_pull_response(&from, data, &timeouts); self.handle_pull_response(&from, data, &timeouts);
} }
@ -1878,7 +1920,7 @@ impl ClusterInfo {
.pull_requests_count .pull_requests_count
.add_relaxed(gossip_pull_data.len() as u64); .add_relaxed(gossip_pull_data.len() as u64);
let rsp = self.handle_pull_requests(recycler, gossip_pull_data, stakes, feature_set); let rsp = self.handle_pull_requests(recycler, gossip_pull_data, stakes, feature_set);
if let Some(rsp) = rsp { if !rsp.is_empty() {
let _ignore_disconnect = response_sender.send(rsp); let _ignore_disconnect = response_sender.send(rsp);
} }
} }
@ -1898,6 +1940,49 @@ impl ClusterInfo {
}); });
} }
// Returns a predicate checking if the pull request is from a valid
// address, and if the address has responded to a ping request. Also
// appends ping packets for the addresses which need to be (re)verified.
//
// * `now`: timestamp handed to the ping cache for every lookup.
// * `rng`: source of randomness for the tokens of newly generated pings.
// * `packets`: output buffer; generated ping packets are pushed onto it.
// * `feature_set`: the ping-pong verdict is only enforced when the
//   `pull_request_ping_pong_check` feature is active.
//
// The returned closure owns the write guard on `self.ping_cache`, so the
// lock stays held until the closure is dropped.
fn check_pull_request<'a, R>(
&'a self,
now: Instant,
mut rng: &'a mut R,
packets: &'a mut Packets,
feature_set: Option<&FeatureSet>,
) -> impl FnMut(&PullData) -> bool + 'a
where
R: Rng + CryptoRng,
{
// With no feature set, or with the feature inactive, requests are never
// rejected by the ping-pong check; pings are still generated and failures
// are still counted below.
let check_enabled = matches!(feature_set, Some(feature_set) if
feature_set.is_active(&feature_set::pull_request_ping_pong_check::id()));
// Memoizes the verdict per (pubkey, address) for the lifetime of the
// predicate, so each node hits the ping cache (and is pinged) at most once.
let mut cache = HashMap::<(Pubkey, SocketAddr), bool>::new();
// Generator of fresh pings signed with this node's keypair; yields None if
// the ping could not be constructed.
let mut pingf = move || Ping::new_rand(&mut rng, &self.keypair).ok();
let mut ping_cache = self.ping_cache.write().unwrap();
// Consults the ping cache, queues a ping packet if one was generated, and
// returns whether the request from `node` should be served.
let mut hard_check = move |node| {
let (check, ping) = ping_cache.check(now, node, &mut pingf);
if let Some(ping) = ping {
let ping = Protocol::PingMessage(ping);
let ping = Packet::from_data(&node.1, ping);
packets.packets.push(ping);
}
if !check {
self.stats
.pull_request_ping_pong_check_failed_count
.add_relaxed(1)
}
check || !check_enabled
};
// Because pull-responses are sent back to packet.meta.addr() of
// incoming pull-requests, pings are also sent to request.from_addr (as
// opposed to caller.gossip address).
move |request| {
// Invalid addresses are rejected outright, without touching the ping cache.
ContactInfo::is_valid_address(&request.from_addr) && {
let node = (request.caller.pubkey(), request.from_addr);
*cache.entry(node).or_insert_with(|| hard_check(node))
}
}
}
// Pull requests take an incoming bloom filter of contained entries from a node // Pull requests take an incoming bloom filter of contained entries from a node
// and tries to send back to them the values it detects are missing. // and tries to send back to them the values it detects are missing.
fn handle_pull_requests( fn handle_pull_requests(
@ -1906,21 +1991,22 @@ impl ClusterInfo {
requests: Vec<PullData>, requests: Vec<PullData>,
stakes: &HashMap<Pubkey, u64>, stakes: &HashMap<Pubkey, u64>,
feature_set: Option<&FeatureSet>, feature_set: Option<&FeatureSet>,
) -> Option<Packets> { ) -> Packets {
if matches!(feature_set, Some(feature_set) if
feature_set.is_active(&feature_set::pull_request_ping_pong_check::id()))
{
// TODO: add ping-pong check on pull-request addresses.
}
// split the requests into addrs and filters
let mut caller_and_filters = vec![];
let mut addrs = vec![];
let mut time = Measure::start("handle_pull_requests"); let mut time = Measure::start("handle_pull_requests");
self.time_gossip_write_lock("process_pull_reqs", &self.stats.process_pull_requests)
.process_pull_requests(requests.iter().map(|r| r.caller.clone()), timestamp());
self.update_data_budget(stakes.len()); self.update_data_budget(stakes.len());
for pull_data in requests { let mut packets = Packets::new_with_recycler(recycler.clone(), 64, "handle_pull_requests");
caller_and_filters.push((pull_data.caller, pull_data.filter)); let (caller_and_filters, addrs): (Vec<_>, Vec<_>) = {
addrs.push(pull_data.from_addr); let mut rng = rand::thread_rng();
} let check_pull_request =
self.check_pull_request(Instant::now(), &mut rng, &mut packets, feature_set);
requests
.into_iter()
.filter(check_pull_request)
.map(|r| ((r.caller, r.filter), r.from_addr))
.unzip()
};
let now = timestamp(); let now = timestamp();
let self_id = self.id(); let self_id = self.id();
@ -1931,27 +2017,14 @@ impl ClusterInfo {
) )
.generate_pull_responses(&caller_and_filters, now); .generate_pull_responses(&caller_and_filters, now);
self.time_gossip_write_lock("process_pull_reqs", &self.stats.process_pull_requests)
.process_pull_requests(caller_and_filters, now);
// Filter bad to addresses
let pull_responses: Vec<_> = pull_responses let pull_responses: Vec<_> = pull_responses
.into_iter() .into_iter()
.zip(addrs.into_iter()) .zip(addrs.into_iter())
.filter_map(|(responses, from_addr)| { .filter(|(response, _)| !response.is_empty())
if !from_addr.ip().is_unspecified()
&& from_addr.port() != 0
&& !responses.is_empty()
{
Some((responses, from_addr))
} else {
None
}
})
.collect(); .collect();
if pull_responses.is_empty() { if pull_responses.is_empty() {
return None; return packets;
} }
let mut stats: Vec<_> = pull_responses let mut stats: Vec<_> = pull_responses
@ -1983,7 +2056,6 @@ impl ClusterInfo {
let rng = &mut ChaChaRng::from_seed(seed); let rng = &mut ChaChaRng::from_seed(seed);
let weighted_index = WeightedIndex::new(weights).unwrap(); let weighted_index = WeightedIndex::new(weights).unwrap();
let mut packets = Packets::new_with_recycler(recycler.clone(), 64, "handle_pull_requests");
let mut total_bytes = 0; let mut total_bytes = 0;
let mut sent = HashSet::new(); let mut sent = HashSet::new();
while sent.len() < stats.len() { while sent.len() < stats.len() {
@ -2018,10 +2090,7 @@ impl ClusterInfo {
stats.len(), stats.len(),
total_bytes total_bytes
); );
if packets.is_empty() { packets
return None;
}
Some(packets)
} }
// Returns (failed, timeout, success) // Returns (failed, timeout, success)
@ -2108,6 +2177,41 @@ impl ClusterInfo {
} }
} }
/// Answers received pings with pongs signed by this node's keypair.
///
/// Each `(ping, addr)` pair yields one pong packet addressed to `addr`.
/// Pings for which no pong can be constructed are skipped. Returns `None`
/// when there is nothing to send, otherwise the replies wrapped in a
/// recycler-backed `Packets`.
fn handle_ping_messages<I>(&self, pings: I, recycler: &PacketsRecycler) -> Option<Packets>
where
    I: IntoIterator<Item = (Ping, SocketAddr)>,
{
    let mut replies = Vec::new();
    for (ping, from_addr) in pings {
        if let Ok(pong) = Pong::new(&ping, &self.keypair) {
            let message = Protocol::PongMessage(pong);
            replies.push(Packet::from_data(&from_addr, message));
        }
    }
    if replies.is_empty() {
        return None;
    }
    Some(Packets::new_with_recycler_data(
        recycler,
        "handle_ping_messages",
        replies,
    ))
}
/// Records received pongs in the ping cache, using `now` as their timestamp.
///
/// The write lock on the ping cache is only taken when there is at least
/// one pong to process.
fn handle_pong_messages<I>(&self, pongs: I, now: Instant)
where
    I: IntoIterator<Item = (Pong, SocketAddr)>,
{
    let mut remaining = pongs.into_iter();
    let first = match remaining.next() {
        Some(entry) => entry,
        // Nothing received: leave the cache (and its lock) untouched.
        None => return,
    };
    let mut ping_cache = self.ping_cache.write().unwrap();
    for (pong, from_addr) in std::iter::once(first).chain(remaining) {
        ping_cache.add(&pong, from_addr, now);
    }
}
fn handle_push_message( fn handle_push_message(
&self, &self,
recycler: &PacketsRecycler, recycler: &PacketsRecycler,
@ -2412,6 +2516,11 @@ impl ClusterInfo {
self.stats.process_pull_requests.clear(), self.stats.process_pull_requests.clear(),
i64 i64
), ),
(
"pull_request_ping_pong_check_failed_count",
self.stats.pull_request_ping_pong_check_failed_count.clear(),
i64
),
( (
"generate_pull_responses", "generate_pull_responses",
self.stats.generate_pull_responses.clear(), self.stats.generate_pull_responses.clear(),
@ -2796,12 +2905,14 @@ pub fn stake_weight_peers<S: std::hash::BuildHasher>(
mod tests { mod tests {
use super::*; use super::*;
use crate::crds_value::{CrdsValue, CrdsValueLabel, Vote as CrdsVote}; use crate::crds_value::{CrdsValue, CrdsValueLabel, Vote as CrdsVote};
use itertools::izip;
use rayon::prelude::*; use rayon::prelude::*;
use solana_perf::test_tx::test_tx; use solana_perf::test_tx::test_tx;
use solana_sdk::signature::{Keypair, Signer}; use solana_sdk::signature::{Keypair, Signer};
use solana_vote_program::{vote_instruction, vote_state::Vote}; use solana_vote_program::{vote_instruction, vote_state::Vote};
use std::collections::HashSet; use std::collections::HashSet;
use std::net::{IpAddr, Ipv4Addr}; use std::iter::repeat_with;
use std::net::{IpAddr, Ipv4Addr, SocketAddrV4};
use std::sync::Arc; use std::sync::Arc;
#[test] #[test]
@ -2843,6 +2954,117 @@ mod tests {
); );
} }
/// Test helper: a fresh keypair paired with a random IPv4 socket address
/// (four random octets followed by a random port, all drawn from `rng`).
fn new_rand_remote_node<R>(rng: &mut R) -> (Keypair, SocketAddr)
where
    R: Rng,
{
    let keypair = Keypair::new();
    // Octets are drawn first and the port last, left to right.
    let (a, b, c, d): (u8, u8, u8, u8) = (rng.gen(), rng.gen(), rng.gen(), rng.gen());
    let port: u16 = rng.gen();
    let addr = SocketAddrV4::new(Ipv4Addr::new(a, b, c, d), port);
    (keypair, SocketAddr::V4(addr))
}
// Verifies that pongs passed to handle_pong_messages mark the corresponding
// remote nodes as verified in the ping cache, and that unrelated nodes stay
// unverified.
#[test]
fn test_handle_pong_messages() {
let now = Instant::now();
let mut rng = rand::thread_rng();
let this_node = Arc::new(Keypair::new());
let cluster_info = ClusterInfo::new(
ContactInfo::new_localhost(&this_node.pubkey(), timestamp()),
this_node.clone(),
);
let remote_nodes: Vec<(Keypair, SocketAddr)> =
repeat_with(|| new_rand_remote_node(&mut rng))
.take(128)
.collect();
// Ask the cache about every remote node once; each first check generates
// a ping (signed by this node) that is recorded as pending.
let pings: Vec<_> = {
let mut ping_cache = cluster_info.ping_cache.write().unwrap();
let mut pingf = || Ping::new_rand(&mut rng, &this_node).ok();
remote_nodes
.iter()
.map(|(keypair, socket)| {
let node = (keypair.pubkey(), *socket);
let (check, ping) = ping_cache.check(now, node, &mut pingf);
// Assert that initially remote nodes will not pass the
// ping/pong check.
assert!(!check);
ping.unwrap()
})
.collect()
};
// Each remote node answers its own ping, signing with its own keypair.
let pongs: Vec<(Pong, SocketAddr)> = pings
.iter()
.zip(&remote_nodes)
.map(|(ping, (keypair, socket))| (Pong::new(ping, keypair).unwrap(), *socket))
.collect();
let now = now + Duration::from_millis(1);
cluster_info.handle_pong_messages(pongs, now);
// Assert that remote nodes now pass the ping/pong check.
{
let mut ping_cache = cluster_info.ping_cache.write().unwrap();
for (keypair, socket) in &remote_nodes {
let node = (keypair.pubkey(), *socket);
let (check, _) = ping_cache.check(now, node, || -> Option<Ping> { None });
assert!(check);
}
}
// Assert that a new random remote node still will not pass the check.
{
let mut ping_cache = cluster_info.ping_cache.write().unwrap();
let (keypair, socket) = new_rand_remote_node(&mut rng);
let node = (keypair.pubkey(), socket);
let (check, _) = ping_cache.check(now, node, || -> Option<Ping> { None });
assert!(!check);
}
}
// Verifies that handle_ping_messages answers every ping with one pong packet,
// addressed to the socket the ping came from and carrying the expected pong.
#[test]
fn test_handle_ping_messages() {
let mut rng = rand::thread_rng();
let this_node = Arc::new(Keypair::new());
let cluster_info = ClusterInfo::new(
ContactInfo::new_localhost(&this_node.pubkey(), timestamp()),
this_node.clone(),
);
let remote_nodes: Vec<(Keypair, SocketAddr)> =
repeat_with(|| new_rand_remote_node(&mut rng))
.take(128)
.collect();
// One ping per remote node, signed with that node's keypair.
let pings: Vec<_> = remote_nodes
.iter()
.map(|(keypair, _)| Ping::new_rand(&mut rng, keypair).unwrap())
.collect();
// Expected replies, built with the same keypair that was handed to
// ClusterInfo::new above.
let pongs: Vec<_> = pings
.iter()
.map(|ping| Pong::new(ping, &this_node).unwrap())
.collect();
let recycler = PacketsRecycler::default();
let packets = cluster_info
.handle_ping_messages(
pings
.into_iter()
.zip(remote_nodes.iter().map(|(_, socket)| *socket)),
&recycler,
)
.unwrap()
.packets;
assert_eq!(remote_nodes.len(), packets.len());
// Packets are expected in the same order as the pings that caused them.
for (packet, (_, socket), pong) in izip!(
packets.into_iter(),
remote_nodes.into_iter(),
pongs.into_iter()
) {
assert_eq!(packet.meta.addr(), socket);
// Pongs are compared through their serialized bytes.
let bytes = serialize(&pong).unwrap();
match limited_deserialize(&packet.data[..packet.meta.size]).unwrap() {
Protocol::PongMessage(pong) => assert_eq!(serialize(&pong).unwrap(), bytes),
_ => panic!("invalid packet!"),
}
}
}
fn test_crds_values(pubkey: Pubkey) -> Vec<CrdsValue> { fn test_crds_values(pubkey: Pubkey) -> Vec<CrdsValue> {
let entrypoint = ContactInfo::new_localhost(&pubkey, timestamp()); let entrypoint = ContactInfo::new_localhost(&pubkey, timestamp());
let entrypoint_crdsvalue = CrdsValue::new_unsigned(CrdsData::ContactInfo(entrypoint)); let entrypoint_crdsvalue = CrdsValue::new_unsigned(CrdsData::ContactInfo(entrypoint));

View File

@ -174,9 +174,12 @@ impl CrdsGossip {
self.pull.mark_pull_request_creation_time(from, now) self.pull.mark_pull_request_creation_time(from, now)
} }
/// process a pull request and create a response /// process a pull request and create a response
pub fn process_pull_requests(&mut self, filters: Vec<(CrdsValue, CrdsFilter)>, now: u64) { pub fn process_pull_requests<I>(&mut self, callers: I, now: u64)
where
I: IntoIterator<Item = CrdsValue>,
{
self.pull self.pull
.process_pull_requests(&mut self.crds, filters, now); .process_pull_requests(&mut self.crds, callers, now);
} }
pub fn generate_pull_responses( pub fn generate_pull_responses(

View File

@ -273,20 +273,18 @@ impl CrdsGossipPull {
} }
/// process a pull request /// process a pull request
pub fn process_pull_requests( pub fn process_pull_requests<I>(&mut self, crds: &mut Crds, callers: I, now: u64)
&mut self, where
crds: &mut Crds, I: IntoIterator<Item = CrdsValue>,
requests: Vec<(CrdsValue, CrdsFilter)>, {
now: u64, for caller in callers {
) {
requests.into_iter().for_each(|(caller, _)| {
let key = caller.label().pubkey(); let key = caller.label().pubkey();
if let Ok(Some(val)) = crds.insert(caller, now) { if let Ok(Some(val)) = crds.insert(caller, now) {
self.purged_values self.purged_values
.push_back((val.value_hash, val.local_timestamp)); .push_back((val.value_hash, val.local_timestamp));
} }
crds.update_record_timestamp(&key, now); crds.update_record_timestamp(&key, now);
}); }
} }
/// Create gossip responses to pull requests /// Create gossip responses to pull requests
@ -1087,7 +1085,11 @@ mod test {
let (_, filters, caller) = req.unwrap(); let (_, filters, caller) = req.unwrap();
let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect(); let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
let rsp = dest.generate_pull_responses(&dest_crds, &filters, 0); let rsp = dest.generate_pull_responses(&dest_crds, &filters, 0);
dest.process_pull_requests(&mut dest_crds, filters, 1); dest.process_pull_requests(
&mut dest_crds,
filters.into_iter().map(|(caller, _)| caller),
1,
);
assert!(rsp.iter().all(|rsp| rsp.is_empty())); assert!(rsp.iter().all(|rsp| rsp.is_empty()));
assert!(dest_crds.lookup(&caller.label()).is_some()); assert!(dest_crds.lookup(&caller.label()).is_some());
assert_eq!( assert_eq!(
@ -1161,7 +1163,11 @@ mod test {
let (_, filters, caller) = req.unwrap(); let (_, filters, caller) = req.unwrap();
let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect(); let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
let mut rsp = dest.generate_pull_responses(&dest_crds, &filters, 0); let mut rsp = dest.generate_pull_responses(&dest_crds, &filters, 0);
dest.process_pull_requests(&mut dest_crds, filters, 0); dest.process_pull_requests(
&mut dest_crds,
filters.into_iter().map(|(caller, _)| caller),
0,
);
// if there is a false positive this is empty // if there is a false positive this is empty
// prob should be around 0.1 per iteration // prob should be around 0.1 per iteration
if rsp.is_empty() { if rsp.is_empty() {

View File

@ -43,6 +43,7 @@ pub mod local_vote_signer_service;
pub mod non_circulating_supply; pub mod non_circulating_supply;
pub mod optimistic_confirmation_verifier; pub mod optimistic_confirmation_verifier;
pub mod optimistically_confirmed_bank_tracker; pub mod optimistically_confirmed_bank_tracker;
pub mod ping_pong;
pub mod poh_recorder; pub mod poh_recorder;
pub mod poh_service; pub mod poh_service;
pub mod progress_map; pub mod progress_map;

400
core/src/ping_pong.rs Normal file
View File

@ -0,0 +1,400 @@
use bincode::{serialize, Error};
use lru::LruCache;
use rand::{AsByteSliceMut, CryptoRng, Rng};
use serde::Serialize;
use solana_sdk::hash::{self, Hash};
use solana_sdk::pubkey::Pubkey;
use solana_sdk::sanitize::{Sanitize, SanitizeError};
use solana_sdk::signature::{Keypair, Signable, Signature, Signer};
use std::borrow::Cow;
use std::net::SocketAddr;
use std::time::{Duration, Instant};
/// Signed ping message carrying a token of type `T`.
///
/// Built with `Ping::new` (caller-supplied token) or `Ping::new_rand` (token
/// filled from a random number generator).
#[derive(AbiExample, Debug, Deserialize, Serialize)]
pub struct Ping<T> {
// Public key of the keypair that created and signed this ping.
from: Pubkey,
// Payload of the ping; the matching pong carries a hash of its
// bincode serialization.
token: T,
// Signature over the bincode-serialized token.
signature: Signature,
}
/// Signed response to a `Ping`, identifying the ping by the hash of its token.
#[derive(AbiExample, Debug, Deserialize, Serialize)]
pub struct Pong {
// Public key of the keypair that created and signed this pong.
from: Pubkey,
hash: Hash, // Hash of received ping token (of its bincode serialization).
// Signature over the bytes of `hash`.
signature: Signature,
}
/// Maintains records of remote nodes which have returned a valid response to a
/// ping message, and in-flight ping messages pending a pong response from the
/// remote node.
///
/// Remote nodes are identified by the pair (public key, socket address). All
/// three maps are LRU caches created with the same capacity.
pub struct PingCache {
// Time-to-live of received pong messages.
ttl: Duration,
// Timestamp of last ping message sent to a remote node.
// Used to rate limit pings to remote nodes.
pings: LruCache<(Pubkey, SocketAddr), Instant>,
// Verified pong responses from remote nodes.
pongs: LruCache<(Pubkey, SocketAddr), Instant>,
// Hash of ping tokens sent out to remote nodes,
// pending a pong response back.
pending_cache: LruCache<Hash, (Pubkey, SocketAddr)>,
}
impl<T: Serialize> Ping<T> {
    /// Creates a ping carrying `token`, signed by `keypair`.
    ///
    /// The signature covers the bincode serialization of the token.
    ///
    /// # Errors
    /// Returns the bincode error if the token cannot be serialized.
    pub fn new(token: T, keypair: &Keypair) -> Result<Self, Error> {
        let token_bytes = serialize(&token)?;
        Ok(Self {
            from: keypair.pubkey(),
            signature: keypair.sign_message(&token_bytes),
            token,
        })
    }
}
impl<T> Ping<T>
where
T: Serialize + AsByteSliceMut + Default,
{
pub fn new_rand<R>(rng: &mut R, keypair: &Keypair) -> Result<Self, Error>
where
R: Rng + CryptoRng,
{
let mut token = T::default();
rng.fill(&mut token);
Ping::new(token, keypair)
}
}
impl<T> Sanitize for Ping<T> {
    /// Sanitizes the sender public key, then the signature. The token itself
    /// is not sanitized.
    fn sanitize(&self) -> Result<(), SanitizeError> {
        // TODO Add self.token.sanitize()?; when rust's
        // specialization feature becomes stable.
        self.from
            .sanitize()
            .and_then(|()| self.signature.sanitize())
    }
}
impl<T: Serialize> Signable for Ping<T> {
    /// Public key the signature is attributed to.
    fn pubkey(&self) -> Pubkey {
        self.from
    }

    /// Bytes covered by the signature: the bincode serialization of the token.
    ///
    /// # Panics
    /// Panics if the token cannot be serialized.
    fn signable_data(&self) -> Cow<[u8]> {
        let token_bytes = serialize(&self.token).unwrap();
        Cow::from(token_bytes)
    }

    fn get_signature(&self) -> Signature {
        self.signature
    }

    fn set_signature(&mut self, signature: Signature) {
        self.signature = signature;
    }
}
impl Pong {
    /// Creates the pong answering `ping`, signed by `keypair`.
    ///
    /// The pong carries the hash of the bincode-serialized ping token; the
    /// signature covers the bytes of that hash.
    ///
    /// # Errors
    /// Returns the bincode error if the ping token cannot be serialized.
    pub fn new<T: Serialize>(ping: &Ping<T>, keypair: &Keypair) -> Result<Self, Error> {
        let token_bytes = serialize(&ping.token)?;
        let digest = hash::hash(&token_bytes);
        let signature = keypair.sign_message(digest.as_ref());
        Ok(Self {
            from: keypair.pubkey(),
            hash: digest,
            signature,
        })
    }
}
impl Sanitize for Pong {
    /// Sanitizes, in order, the sender public key, the token hash and the
    /// signature, stopping at the first failure.
    fn sanitize(&self) -> Result<(), SanitizeError> {
        self.from
            .sanitize()
            .and_then(|()| self.hash.sanitize())
            .and_then(|()| self.signature.sanitize())
    }
}
impl Signable for Pong {
    /// Public key the signature is attributed to.
    fn pubkey(&self) -> Pubkey {
        self.from
    }

    /// Bytes covered by the signature: the bytes of the ping-token hash.
    ///
    /// The hash is stored in `self`, so it is borrowed rather than copied
    /// into a new heap buffer on every call.
    fn signable_data(&self) -> Cow<[u8]> {
        Cow::Borrowed(self.hash.as_ref())
    }

    fn get_signature(&self) -> Signature {
        self.signature
    }

    fn set_signature(&mut self, signature: Signature) {
        self.signature = signature;
    }
}
impl PingCache {
    /// Creates an empty cache.
    ///
    /// * `ttl`: time-to-live of received pong messages.
    /// * `cap`: capacity of each of the three internal LRU caches.
    pub fn new(ttl: Duration, cap: usize) -> Self {
        Self {
            ttl,
            pings: LruCache::new(cap),
            pongs: LruCache::new(cap),
            pending_cache: LruCache::new(cap),
        }
    }

    /// Checks if the pong hash, pubkey and socket match a ping message sent
    /// out previously. If so records current timestamp for the remote node and
    /// returns true.
    /// Note: Does not verify the signature.
    pub fn add(&mut self, pong: &Pong, socket: SocketAddr, now: Instant) -> bool {
        let node = (pong.pubkey(), socket);
        match self.pending_cache.peek(&pong.hash) {
            Some(value) if *value == node => {
                // The node is verified: drop its ping rate-limit record, store
                // the pong timestamp and retire the pending token.
                self.pings.pop(&node);
                self.pongs.put(node, now);
                self.pending_cache.pop(&pong.hash);
                true
            }
            // Unknown hash, or a hash that was sent to a different node.
            _ => false,
        }
    }

    /// Checks if the remote node has been pinged recently. If not, calls the
    /// given function to generate a new ping message, records current
    /// timestamp and hash of ping token, and returns the ping message.
    ///
    /// Returns `None` when the node was pinged less than `ttl / 64` ago, when
    /// `pingf` yields no ping, or when the ping token cannot be serialized.
    fn maybe_ping<T, F>(
        &mut self,
        now: Instant,
        node: (Pubkey, SocketAddr),
        mut pingf: F,
    ) -> Option<Ping<T>>
    where
        T: Serialize,
        F: FnMut() -> Option<Ping<T>>,
    {
        // Rate limit consecutive pings sent to a remote node.
        let delay = self.ttl / 64;
        match self.pings.peek(&node) {
            Some(t) if now.saturating_duration_since(*t) < delay => None,
            _ => {
                let ping = pingf()?;
                let hash = hash::hash(&serialize(&ping.token).ok()?);
                self.pings.put(node, now);
                self.pending_cache.put(hash, node);
                Some(ping)
            }
        }
    }

    /// Returns true if the remote node has responded to a ping message.
    /// Removes expired pong messages. In order to extend verifications before
    /// expiration, if the pong message is not too recent, and the node has not
    /// been pinged recently, calls the given function to generate a new ping
    /// message, records current timestamp and hash of ping token, and returns
    /// the ping message.
    ///
    /// An expired pong is still reported as passing on the call that evicts
    /// it; subsequent calls fail until a new pong is added.
    ///
    /// Caller should verify if the socket address is valid. (e.g. by using
    /// ContactInfo::is_valid_address).
    pub fn check<T, F>(
        &mut self,
        now: Instant,
        node: (Pubkey, SocketAddr),
        pingf: F,
    ) -> (bool, Option<Ping<T>>)
    where
        T: Serialize,
        F: FnMut() -> Option<Ping<T>>,
    {
        let (check, should_ping) = match self.pongs.get(&node) {
            None => (false, true),
            Some(t) => {
                let age = now.saturating_duration_since(*t);
                // Pop if the pong message has expired.
                if age > self.ttl {
                    self.pongs.pop(&node);
                }
                // If the pong message is not too recent, generate a new ping
                // message to extend remote node verification.
                (true, age > self.ttl / 8)
            }
        };
        let ping = if should_ping {
            self.maybe_ping(now, node, pingf)
        } else {
            None
        };
        (check, ping)
    }

    /// Returns a copy of this cache with the same ttl, capacities and entries.
    ///
    /// Entries are re-inserted by walking each source cache in reverse, so
    /// that the last entry inserted into the copy is the first one the source
    /// iterator yields.
    // Only for tests and simulations.
    pub(crate) fn mock_clone(&self) -> Self {
        let mut clone = Self {
            ttl: self.ttl,
            pings: LruCache::new(self.pings.cap()),
            pongs: LruCache::new(self.pongs.cap()),
            pending_cache: LruCache::new(self.pending_cache.cap()),
        };
        // Copy the ping timestamps from `self.pings` (not from `self.pongs`),
        // so the clone keeps the same ping rate-limit state as the source.
        for (k, v) in self.pings.iter().rev() {
            clone.pings.put(*k, *v);
        }
        for (k, v) in self.pongs.iter().rev() {
            clone.pongs.put(*k, *v);
        }
        for (k, v) in self.pending_cache.iter().rev() {
            clone.pending_cache.put(*k, *v);
        }
        clone
    }
}
// Unit tests for Ping, Pong and PingCache.
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashSet;
use std::iter::repeat_with;
use std::net::{Ipv4Addr, SocketAddrV4};
// Token type used by the pings in these tests.
type Token = [u8; 32];
// A random ping and the pong built from it both verify and sanitize, and
// the pong carries the hash of the ping token.
#[test]
fn test_ping_pong() {
let mut rng = rand::thread_rng();
let keypair = Keypair::new();
let ping = Ping::<Token>::new_rand(&mut rng, &keypair).unwrap();
assert!(ping.verify());
assert!(ping.sanitize().is_ok());
let pong = Pong::new(&ping, &keypair).unwrap();
assert!(pong.verify());
assert!(pong.sanitize().is_ok());
// NOTE(review): Pong::new hashes the bincode serialization of the token,
// whereas this hashes the raw token bytes; the assertion relies on the two
// being identical for a [u8; 32] token.
assert_eq!(hash::hash(&ping.token), pong.hash);
}
// Walks a PingCache through its life cycle by advancing `now` step by step:
// unverified -> verified -> refresh ping -> expiry -> rate-limited re-ping.
#[test]
fn test_ping_cache() {
let now = Instant::now();
let mut rng = rand::thread_rng();
let ttl = Duration::from_millis(256);
let mut cache = PingCache::new(ttl, /*cap=*/ 1000);
let this_node = Keypair::new();
let keypairs: Vec<_> = repeat_with(Keypair::new).take(8).collect();
let sockets: Vec<_> = repeat_with(|| {
SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(rng.gen(), rng.gen(), rng.gen(), rng.gen()),
rng.gen(),
))
})
.take(8)
.collect();
// 128 (keypair, socket) pairs drawn from 8 keypairs and 8 sockets, so the
// same node can appear more than once in the list.
let remote_nodes: Vec<(&Keypair, SocketAddr)> = repeat_with(|| {
let keypair = &keypairs[rng.gen_range(0, keypairs.len())];
let socket = sockets[rng.gen_range(0, sockets.len())];
(keypair, socket)
})
.take(128)
.collect();
// Initially all checks should fail. The first observation of each node
// should create a ping packet.
let mut seen_nodes = HashSet::<(Pubkey, SocketAddr)>::new();
let pings: Vec<Option<Ping<Token>>> = remote_nodes
.iter()
.map(|(keypair, socket)| {
let node = (keypair.pubkey(), *socket);
let pingf = || Ping::<Token>::new_rand(&mut rng, &this_node).ok();
let (check, ping) = cache.check(now, node, pingf);
assert!(!check);
assert_eq!(seen_nodes.insert(node), ping.is_some());
ping
})
.collect();
let now = now + Duration::from_millis(1);
// Ping generator for the phases in which the cache must not request a ping.
let panic_ping = || -> Option<Ping<Token>> { panic!("this should not happen!") };
for ((keypair, socket), ping) in remote_nodes.iter().zip(&pings) {
match ping {
None => {
// Repeated occurrence of a node: its first occurrence came
// earlier in this loop and already recorded a pong, so the
// check passes and, the pong being fresh, no new ping packet
// will be generated.
let node = (keypair.pubkey(), *socket);
let (check, ping) = cache.check(now, node, panic_ping);
assert!(check);
assert!(ping.is_none());
}
Some(ping) => {
// First occurrence of a node: answer its ping.
let pong = Pong::new(ping, keypair).unwrap();
assert!(cache.add(&pong, *socket, now));
}
}
}
let now = now + Duration::from_millis(1);
// All nodes now have a recent pong packet.
for (keypair, socket) in &remote_nodes {
let node = (keypair.pubkey(), *socket);
let (check, ping) = cache.check(now, node, panic_ping);
assert!(check);
assert!(ping.is_none());
}
let now = now + ttl / 8;
// All nodes still have a valid pong packet, but the cache will create
// a new ping packet to extend verification.
seen_nodes.clear();
for (keypair, socket) in &remote_nodes {
let node = (keypair.pubkey(), *socket);
let pingf = || Ping::<Token>::new_rand(&mut rng, &this_node).ok();
let (check, ping) = cache.check(now, node, pingf);
assert!(check);
assert_eq!(seen_nodes.insert(node), ping.is_some());
}
let now = now + Duration::from_millis(1);
// All nodes still have a valid pong packet, and a very recent ping
// packet pending response. So no new ping packet will be created.
for (keypair, socket) in &remote_nodes {
let node = (keypair.pubkey(), *socket);
let (check, ping) = cache.check(now, node, panic_ping);
assert!(check);
assert!(ping.is_none());
}
let now = now + ttl;
// Pong packets have now expired. The first observation of each node
// still passes the check (the stale entry is reported one last time),
// removes the pong packet from cache and creates a new ping packet.
// Later observations of the same node fail the check.
seen_nodes.clear();
for (keypair, socket) in &remote_nodes {
let node = (keypair.pubkey(), *socket);
let pingf = || Ping::<Token>::new_rand(&mut rng, &this_node).ok();
let (check, ping) = cache.check(now, node, pingf);
if seen_nodes.insert(node) {
assert!(check);
assert!(ping.is_some());
} else {
assert!(!check);
assert!(ping.is_none());
}
}
let now = now + Duration::from_millis(1);
// No valid pong packet in the cache. A recent ping packet already
// created, so no new one will be created.
for (keypair, socket) in &remote_nodes {
let node = (keypair.pubkey(), *socket);
let (check, ping) = cache.check(now, node, panic_ping);
assert!(!check);
assert!(ping.is_none());
}
let now = now + ttl / 64;
// No valid pong packet in the cache. The ping rate-limit delay has
// passed, so another ping packet will be created for the first
// observation of each node.
seen_nodes.clear();
for (keypair, socket) in &remote_nodes {
let node = (keypair.pubkey(), *socket);
let pingf = || Ping::<Token>::new_rand(&mut rng, &this_node).ok();
let (check, ping) = cache.check(now, node, pingf);
assert!(!check);
assert_eq!(seen_nodes.insert(node), ping.is_some());
}
}
}

View File

@ -462,7 +462,10 @@ fn network_run_pull(
.into_iter() .into_iter()
.flatten() .flatten()
.collect(); .collect();
node.lock().unwrap().process_pull_requests(filters, now); node.lock().unwrap().process_pull_requests(
filters.into_iter().map(|(caller, _)| caller),
now,
);
rsp rsp
}) })
.unwrap(); .unwrap();