Add fn to push IncrementalSnapshotHashes to cluster via gossip (#20395)
This commit is contained in:
parent
4e65487d2f
commit
1fcfbfccbb
|
@ -115,6 +115,10 @@ const DUPLICATE_SHRED_MAX_PAYLOAD_SIZE: usize = PACKET_DATA_SIZE - 115;
|
||||||
/// PACKET_DATA_SIZE.
|
/// PACKET_DATA_SIZE.
|
||||||
// TODO: Update this to 26 once payload sizes are upgraded across fleet.
|
// TODO: Update this to 26 once payload sizes are upgraded across fleet.
|
||||||
pub const MAX_SNAPSHOT_HASHES: usize = 16;
|
pub const MAX_SNAPSHOT_HASHES: usize = 16;
|
||||||
|
/// Maximum number of hashes in IncrementalSnapshotHashes a node publishes
|
||||||
|
/// such that the serialized size of the push/pull message stays below
|
||||||
|
/// PACKET_DATA_SIZE.
|
||||||
|
pub const MAX_INCREMENTAL_SNAPSHOT_HASHES: usize = 25;
|
||||||
/// Maximum number of origin nodes that a PruneData may contain, such that the
|
/// Maximum number of origin nodes that a PruneData may contain, such that the
|
||||||
/// serialized size of the PruneMessage stays below PACKET_DATA_SIZE.
|
/// serialized size of the PruneMessage stays below PACKET_DATA_SIZE.
|
||||||
const MAX_PRUNE_DATA_NODES: usize = 32;
|
const MAX_PRUNE_DATA_NODES: usize = 32;
|
||||||
|
@ -140,6 +144,7 @@ pub enum ClusterInfoError {
|
||||||
NoLeader,
|
NoLeader,
|
||||||
BadContactInfo,
|
BadContactInfo,
|
||||||
BadGossipAddress,
|
BadGossipAddress,
|
||||||
|
TooManyIncrementalSnapshotHashes,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct ClusterInfo {
|
pub struct ClusterInfo {
|
||||||
|
@ -950,6 +955,26 @@ impl ClusterInfo {
|
||||||
self.push_message(CrdsValue::new_signed(message, &self.keypair()));
|
self.push_message(CrdsValue::new_signed(message, &self.keypair()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn push_incremental_snapshot_hashes(
|
||||||
|
&self,
|
||||||
|
base: (Slot, Hash),
|
||||||
|
hashes: Vec<(Slot, Hash)>,
|
||||||
|
) -> Result<(), ClusterInfoError> {
|
||||||
|
if hashes.len() > MAX_INCREMENTAL_SNAPSHOT_HASHES {
|
||||||
|
return Err(ClusterInfoError::TooManyIncrementalSnapshotHashes);
|
||||||
|
}
|
||||||
|
|
||||||
|
let message = CrdsData::IncrementalSnapshotHashes(IncrementalSnapshotHashes {
|
||||||
|
from: self.id(),
|
||||||
|
base,
|
||||||
|
hashes,
|
||||||
|
wallclock: timestamp(),
|
||||||
|
});
|
||||||
|
self.push_message(CrdsValue::new_signed(message, &self.keypair()));
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub fn push_vote_at_index(&self, vote: Transaction, vote_index: u8) {
|
pub fn push_vote_at_index(&self, vote: Transaction, vote_index: u8) {
|
||||||
assert!((vote_index as usize) < MAX_LOCKOUT_HISTORY);
|
assert!((vote_index as usize) < MAX_LOCKOUT_HISTORY);
|
||||||
let self_pubkey = self.id();
|
let self_pubkey = self.id();
|
||||||
|
@ -3214,6 +3239,42 @@ mod tests {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_max_incremental_snapshot_hashes_with_push_messages() {
|
||||||
|
let mut rng = rand::thread_rng();
|
||||||
|
let incremental_snapshot_hashes = IncrementalSnapshotHashes {
|
||||||
|
from: Pubkey::new_unique(),
|
||||||
|
base: (Slot::default(), Hash::default()),
|
||||||
|
hashes: vec![(Slot::default(), Hash::default()); MAX_INCREMENTAL_SNAPSHOT_HASHES],
|
||||||
|
wallclock: timestamp(),
|
||||||
|
};
|
||||||
|
let crds_value = CrdsValue::new_signed(
|
||||||
|
CrdsData::IncrementalSnapshotHashes(incremental_snapshot_hashes),
|
||||||
|
&Keypair::new(),
|
||||||
|
);
|
||||||
|
let message = Protocol::PushMessage(Pubkey::new_unique(), vec![crds_value]);
|
||||||
|
let socket = new_rand_socket_addr(&mut rng);
|
||||||
|
assert!(Packet::from_data(Some(&socket), message).is_ok());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_max_incremental_snapshot_hashes_with_pull_responses() {
|
||||||
|
let mut rng = rand::thread_rng();
|
||||||
|
let incremental_snapshot_hashes = IncrementalSnapshotHashes {
|
||||||
|
from: Pubkey::new_unique(),
|
||||||
|
base: (Slot::default(), Hash::default()),
|
||||||
|
hashes: vec![(Slot::default(), Hash::default()); MAX_INCREMENTAL_SNAPSHOT_HASHES],
|
||||||
|
wallclock: timestamp(),
|
||||||
|
};
|
||||||
|
let crds_value = CrdsValue::new_signed(
|
||||||
|
CrdsData::IncrementalSnapshotHashes(incremental_snapshot_hashes),
|
||||||
|
&Keypair::new(),
|
||||||
|
);
|
||||||
|
let response = Protocol::PullResponse(Pubkey::new_unique(), vec![crds_value]);
|
||||||
|
let socket = new_rand_socket_addr(&mut rng);
|
||||||
|
assert!(Packet::from_data(Some(&socket), response).is_ok());
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_max_prune_data_pubkeys() {
|
fn test_max_prune_data_pubkeys() {
|
||||||
let mut rng = rand::thread_rng();
|
let mut rng = rand::thread_rng();
|
||||||
|
|
Loading…
Reference in New Issue