Fix incorrectly signed CrdsValues (#6696)

This commit is contained in:
Sagar Dhawan 2019-11-03 10:07:51 -08:00 committed by GitHub
parent 9ea398416e
commit 568475e2db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 308 additions and 266 deletions

View File

@ -12,12 +12,13 @@
//! * layer 2 - Everyone else, if layer 1 is `2^10`, layer 2 should be able to fit `2^20` number of nodes.
//!
//! Bank needs to provide an interface for us to query the stake weight
use crate::crds_value::CrdsValue;
use crate::{
contact_info::ContactInfo,
crds_gossip::CrdsGossip,
crds_gossip_error::CrdsGossipError,
crds_gossip_pull::{CrdsFilter, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS},
crds_value::{CrdsValue, CrdsValueLabel, EpochSlots, Vote},
crds_value::{CrdsData, CrdsValueLabel, EpochSlots, Vote},
packet::{to_shared_blob, Blob, Packet, SharedBlob},
repair_service::RepairType,
result::{Error, Result},
@ -198,8 +199,8 @@ impl ClusterInfo {
pub fn insert_self(&mut self, contact_info: ContactInfo) {
if self.id() == contact_info.id {
let mut value = CrdsValue::ContactInfo(contact_info.clone());
value.sign(&self.keypair);
let value =
CrdsValue::new_signed(CrdsData::ContactInfo(contact_info.clone()), &self.keypair);
let _ = self.gossip.crds.insert(value, timestamp());
}
}
@ -208,8 +209,7 @@ impl ClusterInfo {
let mut my_data = self.my_data();
let now = timestamp();
my_data.wallclock = now;
let mut entry = CrdsValue::ContactInfo(my_data);
entry.sign(&self.keypair);
let entry = CrdsValue::new_signed(CrdsData::ContactInfo(my_data), &self.keypair);
self.gossip.refresh_push_active_set(stakes);
self.gossip
.process_push_message(&self.id(), vec![entry], now);
@ -217,8 +217,7 @@ impl ClusterInfo {
// TODO kill insert_info, only used by tests
pub fn insert_info(&mut self, contact_info: ContactInfo) {
let mut value = CrdsValue::ContactInfo(contact_info);
value.sign(&self.keypair);
let value = CrdsValue::new_signed(CrdsData::ContactInfo(contact_info), &self.keypair);
let _ = self.gossip.crds.insert(value, timestamp());
}
@ -300,8 +299,10 @@ impl ClusterInfo {
pub fn push_epoch_slots(&mut self, id: Pubkey, root: u64, slots: BTreeSet<u64>) {
let now = timestamp();
let mut entry = CrdsValue::EpochSlots(EpochSlots::new(id, root, slots, now));
entry.sign(&self.keypair);
let entry = CrdsValue::new_signed(
CrdsData::EpochSlots(EpochSlots::new(id, root, slots, now)),
&self.keypair,
);
self.gossip
.process_push_message(&self.id(), vec![entry], now);
}
@ -309,8 +310,7 @@ impl ClusterInfo {
pub fn push_vote(&mut self, vote: Transaction) {
let now = timestamp();
let vote = Vote::new(&self.id(), vote, now);
let mut entry = CrdsValue::Vote(vote);
entry.sign(&self.keypair);
let entry = CrdsValue::new_signed(CrdsData::Vote(vote), &self.keypair);
self.gossip
.process_push_message(&self.id(), vec![entry], now);
}
@ -918,7 +918,7 @@ impl ClusterInfo {
.expect("unable to serialize default filter") as usize;
let protocol = Protocol::PullRequest(
CrdsFilter::default(),
CrdsValue::ContactInfo(ContactInfo::default()),
CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())),
);
let protocol_size =
serialized_size(&protocol).expect("unable to serialize gossip protocol") as usize;
@ -1164,9 +1164,7 @@ impl ClusterInfo {
1
);
} else if caller.contact_info().is_some() {
if caller.contact_info().unwrap().pubkey()
== me.read().unwrap().gossip.id
{
if caller.contact_info().unwrap().id == me.read().unwrap().gossip.id {
warn!("PullRequest ignored, I'm talking to myself");
inc_new_counter_debug!("cluster_info-window-request-loopback", 1);
} else {
@ -2387,7 +2385,8 @@ mod tests {
}
// now add this message back to the table and make sure after the next pull, the entrypoint is unset
let entrypoint_crdsvalue = CrdsValue::ContactInfo(entrypoint.clone());
let entrypoint_crdsvalue =
CrdsValue::new_unsigned(CrdsData::ContactInfo(entrypoint.clone()));
let cluster_info = Arc::new(RwLock::new(cluster_info));
ClusterInfo::handle_pull_response(
&cluster_info,
@ -2404,7 +2403,7 @@ mod tests {
#[test]
fn test_split_messages_small() {
let value = CrdsValue::ContactInfo(ContactInfo::default());
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
test_split_messages(value);
}
@ -2414,13 +2413,12 @@ mod tests {
for i in 0..128 {
btree_slots.insert(i);
}
let value = CrdsValue::EpochSlots(EpochSlots {
let value = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots {
from: Pubkey::default(),
root: 0,
slots: btree_slots,
signature: Signature::default(),
wallclock: 0,
});
}));
test_split_messages(value);
}
@ -2444,7 +2442,7 @@ mod tests {
}
fn check_pull_request_size(filter: CrdsFilter) {
let value = CrdsValue::ContactInfo(ContactInfo::default());
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
let protocol = Protocol::PullRequest(filter, value.clone());
assert!(serialized_size(&protocol).unwrap() <= PACKET_DATA_SIZE as u64);
}

View File

@ -1,12 +1,9 @@
use bincode::serialize;
use solana_sdk::pubkey::Pubkey;
#[cfg(test)]
use solana_sdk::rpc_port;
#[cfg(test)]
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::signature::{Signable, Signature};
use solana_sdk::timing::timestamp;
use std::borrow::Cow;
use std::cmp::{Ord, Ordering, PartialEq, PartialOrd};
use std::net::{IpAddr, SocketAddr};
@ -14,8 +11,6 @@ use std::net::{IpAddr, SocketAddr};
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct ContactInfo {
pub id: Pubkey,
/// signature of this ContactInfo
pub signature: Signature,
/// gossip address
pub gossip: SocketAddr,
/// address to connect to for replication
@ -89,7 +84,6 @@ impl Default for ContactInfo {
rpc: socketaddr_any!(),
rpc_pubsub: socketaddr_any!(),
wallclock: 0,
signature: Signature::default(),
}
}
}
@ -111,7 +105,6 @@ impl ContactInfo {
) -> Self {
Self {
id: *id,
signature: Signature::default(),
gossip,
tvu,
tvu_forwards,
@ -242,51 +235,6 @@ impl ContactInfo {
}
}
impl Signable for ContactInfo {
fn pubkey(&self) -> Pubkey {
self.id
}
fn signable_data(&self) -> Cow<[u8]> {
#[derive(Serialize)]
struct SignData {
id: Pubkey,
gossip: SocketAddr,
tvu: SocketAddr,
tpu: SocketAddr,
tpu_forwards: SocketAddr,
repair: SocketAddr,
storage_addr: SocketAddr,
rpc: SocketAddr,
rpc_pubsub: SocketAddr,
wallclock: u64,
}
let me = self;
let data = SignData {
id: me.id,
gossip: me.gossip,
tvu: me.tvu,
tpu: me.tpu,
storage_addr: me.storage_addr,
tpu_forwards: me.tpu_forwards,
repair: me.repair,
rpc: me.rpc,
rpc_pubsub: me.rpc_pubsub,
wallclock: me.wallclock,
};
Cow::Owned(serialize(&data).expect("failed to serialize ContactInfo"))
}
fn get_signature(&self) -> Signature {
self.signature
}
fn set_signature(&mut self, signature: Signature) {
self.signature = signature
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -165,11 +165,12 @@ impl Crds {
mod test {
use super::*;
use crate::contact_info::ContactInfo;
use crate::crds_value::CrdsData;
#[test]
fn test_insert() {
let mut crds = Crds::default();
let val = CrdsValue::ContactInfo(ContactInfo::default());
let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
assert_eq!(crds.insert(val.clone(), 0).ok(), Some(None));
assert_eq!(crds.table.len(), 1);
assert!(crds.table.contains_key(&val.label()));
@ -178,7 +179,7 @@ mod test {
#[test]
fn test_update_old() {
let mut crds = Crds::default();
let val = CrdsValue::ContactInfo(ContactInfo::default());
let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
assert_eq!(crds.insert(val.clone(), 0), Ok(None));
assert_eq!(crds.insert(val.clone(), 1), Err(CrdsError::InsertFailed));
assert_eq!(crds.table[&val.label()].local_timestamp, 0);
@ -186,9 +187,15 @@ mod test {
#[test]
fn test_update_new() {
let mut crds = Crds::default();
let original = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::default(), 0));
let original = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::default(),
0,
)));
assert_matches!(crds.insert(original.clone(), 0), Ok(_));
let val = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::default(), 1));
let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::default(),
1,
)));
assert_eq!(
crds.insert(val.clone(), 1).unwrap().unwrap().value,
original
@ -198,14 +205,17 @@ mod test {
#[test]
fn test_update_timestamp() {
let mut crds = Crds::default();
let val = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::default(), 0));
let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::default(),
0,
)));
assert_eq!(crds.insert(val.clone(), 0), Ok(None));
crds.update_label_timestamp(&val.label(), 1);
assert_eq!(crds.table[&val.label()].local_timestamp, 1);
assert_eq!(crds.table[&val.label()].insert_timestamp, 0);
let val2 = CrdsValue::ContactInfo(ContactInfo::default());
let val2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
assert_eq!(val2.label().pubkey(), val.label().pubkey());
assert_matches!(crds.insert(val2.clone(), 0), Ok(Some(_)));
@ -221,7 +231,7 @@ mod test {
let mut ci = ContactInfo::default();
ci.wallclock += 1;
let val3 = CrdsValue::ContactInfo(ci);
let val3 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci));
assert_matches!(crds.insert(val3.clone(), 3), Ok(Some(_)));
assert_eq!(crds.table[&val2.label()].local_timestamp, 3);
assert_eq!(crds.table[&val2.label()].insert_timestamp, 3);
@ -229,7 +239,7 @@ mod test {
#[test]
fn test_find_old_records() {
let mut crds = Crds::default();
let val = CrdsValue::ContactInfo(ContactInfo::default());
let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
assert_eq!(crds.insert(val.clone(), 1), Ok(None));
assert!(crds.find_old_labels(0).is_empty());
@ -239,7 +249,7 @@ mod test {
#[test]
fn test_remove() {
let mut crds = Crds::default();
let val = CrdsValue::ContactInfo(ContactInfo::default());
let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
assert_matches!(crds.insert(val.clone(), 1), Ok(_));
assert_eq!(crds.find_old_labels(1), vec![val.label()]);
@ -248,7 +258,7 @@ mod test {
}
#[test]
fn test_equal() {
let val = CrdsValue::ContactInfo(ContactInfo::default());
let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
let v1 = VersionedCrdsValue::new(1, val.clone());
let v2 = VersionedCrdsValue::new(1, val);
assert_eq!(v1, v2);
@ -258,12 +268,15 @@ mod test {
fn test_hash_order() {
let v1 = VersionedCrdsValue::new(
1,
CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::default(), 0)),
CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::default(),
0,
))),
);
let v2 = VersionedCrdsValue::new(1, {
let mut contact_info = ContactInfo::new_localhost(&Pubkey::default(), 0);
contact_info.rpc = socketaddr!("0.0.0.0:0");
CrdsValue::ContactInfo(contact_info)
CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info))
});
assert_eq!(v1.value.label(), v2.value.label());
@ -285,11 +298,17 @@ mod test {
fn test_wallclock_order() {
let v1 = VersionedCrdsValue::new(
1,
CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::default(), 1)),
CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::default(),
1,
))),
);
let v2 = VersionedCrdsValue::new(
1,
CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::default(), 0)),
CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::default(),
0,
))),
);
assert_eq!(v1.value.label(), v2.value.label());
assert!(v1 > v2);
@ -301,11 +320,17 @@ mod test {
fn test_label_order() {
let v1 = VersionedCrdsValue::new(
1,
CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0)),
CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
))),
);
let v2 = VersionedCrdsValue::new(
1,
CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0)),
CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
))),
);
assert_ne!(v1, v2);
assert!(!(v1 == v2));

View File

@ -9,7 +9,6 @@ use crate::crds_gossip_pull::{CrdsFilter, CrdsGossipPull};
use crate::crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE};
use crate::crds_value::{CrdsValue, CrdsValueLabel};
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Signable;
use std::collections::{HashMap, HashSet};
///The min size for bloom filters
@ -204,6 +203,7 @@ pub fn get_weight(max_weight: f32, time_since_last_selected: u32, stake: f32) ->
mod test {
use super::*;
use crate::contact_info::ContactInfo;
use crate::crds_value::CrdsData;
use solana_sdk::hash::hash;
use solana_sdk::timing::timestamp;
@ -216,7 +216,10 @@ mod test {
let prune_pubkey = Pubkey::new(&[2; 32]);
crds_gossip
.crds
.insert(CrdsValue::ContactInfo(ci.clone()), 0)
.insert(
CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone())),
0,
)
.unwrap();
crds_gossip.refresh_push_active_set(&HashMap::new());
let now = timestamp();

View File

@ -294,6 +294,7 @@ impl CrdsGossipPull {
mod test {
use super::*;
use crate::contact_info::ContactInfo;
use crate::crds_value::CrdsData;
use itertools::Itertools;
use solana_sdk::hash::hash;
use solana_sdk::packet::PACKET_DATA_SIZE;
@ -303,10 +304,16 @@ mod test {
let mut crds = Crds::default();
let mut stakes = HashMap::new();
let node = CrdsGossipPull::default();
let me = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let me = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
crds.insert(me.clone(), 0).unwrap();
for i in 1..=30 {
let entry = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
let id = entry.label().pubkey();
crds.insert(entry.clone(), 0).unwrap();
stakes.insert(id, i * 100);
@ -325,7 +332,10 @@ mod test {
#[test]
fn test_new_pull_request() {
let mut crds = Crds::default();
let entry = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
let id = entry.label().pubkey();
let node = CrdsGossipPull::default();
assert_eq!(
@ -339,7 +349,10 @@ mod test {
Err(CrdsGossipError::NoPeers)
);
let new = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
crds.insert(new.clone(), 0).unwrap();
let req = node.new_pull_request(&crds, &id, 0, &HashMap::new(), PACKET_DATA_SIZE);
let (to, _, self_info) = req.unwrap();
@ -350,13 +363,22 @@ mod test {
#[test]
fn test_new_mark_creation_time() {
let mut crds = Crds::default();
let entry = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
let node_pubkey = entry.label().pubkey();
let mut node = CrdsGossipPull::default();
crds.insert(entry.clone(), 0).unwrap();
let old = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let old = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
crds.insert(old.clone(), 0).unwrap();
let new = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
crds.insert(new.clone(), 0).unwrap();
// set request creation time to max_value
@ -380,11 +402,17 @@ mod test {
#[test]
fn test_process_pull_request() {
let mut node_crds = Crds::default();
let entry = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
let node_pubkey = entry.label().pubkey();
let node = CrdsGossipPull::default();
node_crds.insert(entry.clone(), 0).unwrap();
let new = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
node_crds.insert(new.clone(), 0).unwrap();
let req = node.new_pull_request(
&node_crds,
@ -419,22 +447,32 @@ mod test {
#[test]
fn test_process_pull_request_response() {
let mut node_crds = Crds::default();
let entry = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
let node_pubkey = entry.label().pubkey();
let mut node = CrdsGossipPull::default();
node_crds.insert(entry.clone(), 0).unwrap();
let new = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
node_crds.insert(new.clone(), 0).unwrap();
let mut dest = CrdsGossipPull::default();
let mut dest_crds = Crds::default();
let new_id = Pubkey::new_rand();
let new = CrdsValue::ContactInfo(ContactInfo::new_localhost(&new_id, 1));
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&new_id, 1,
)));
dest_crds.insert(new.clone(), 0).unwrap();
// node contains a key from the dest node, but at an older local timestamp
let same_key = CrdsValue::ContactInfo(ContactInfo::new_localhost(&new_id, 0));
let same_key = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&new_id, 0,
)));
assert_eq!(same_key.label(), new.label());
assert!(same_key.wallclock() < new.wallclock());
node_crds.insert(same_key.clone(), 0).unwrap();
@ -494,12 +532,18 @@ mod test {
#[test]
fn test_gossip_purge() {
let mut node_crds = Crds::default();
let entry = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
let node_label = entry.label();
let node_pubkey = node_label.pubkey();
let mut node = CrdsGossipPull::default();
node_crds.insert(entry.clone(), 0).unwrap();
let old = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let old = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
node_crds.insert(old.clone(), 0).unwrap();
let value_hash = node_crds.lookup_versioned(&old.label()).unwrap().value_hash;

View File

@ -339,7 +339,7 @@ impl CrdsGossipPush {
mod test {
use super::*;
use crate::contact_info::ContactInfo;
use solana_sdk::signature::Signable;
use crate::crds_value::CrdsData;
#[test]
fn test_prune() {
@ -352,7 +352,9 @@ mod test {
stakes.insert(self_id, 100);
stakes.insert(origin, 100);
let value = CrdsValue::ContactInfo(ContactInfo::new_localhost(&origin, 0));
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&origin, 0,
)));
let label = value.label();
let low_staked_peers = (0..10).map(|_| Pubkey::new_rand());
let mut low_staked_set = HashSet::new();
@ -394,7 +396,10 @@ mod test {
fn test_process_push() {
let mut crds = Crds::default();
let mut push = CrdsGossipPush::default();
let value = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
let label = value.label();
// push a new message
assert_eq!(
@ -415,7 +420,7 @@ mod test {
let mut push = CrdsGossipPush::default();
let mut ci = ContactInfo::new_localhost(&Pubkey::new_rand(), 0);
ci.wallclock = 1;
let value = CrdsValue::ContactInfo(ci.clone());
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone()));
// push a new message
assert_eq!(
@ -425,7 +430,7 @@ mod test {
// push an old version
ci.wallclock = 0;
let value = CrdsValue::ContactInfo(ci.clone());
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone()));
assert_eq!(
push.process_push_message(&mut crds, &Pubkey::default(), value, 0),
Err(CrdsGossipError::PushMessageOldVersion)
@ -440,7 +445,7 @@ mod test {
// push a version to far in the future
ci.wallclock = timeout + 1;
let value = CrdsValue::ContactInfo(ci.clone());
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone()));
assert_eq!(
push.process_push_message(&mut crds, &Pubkey::default(), value, 0),
Err(CrdsGossipError::PushMessageTimeout)
@ -448,7 +453,7 @@ mod test {
// push a version to far in the past
ci.wallclock = 0;
let value = CrdsValue::ContactInfo(ci.clone());
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone()));
assert_eq!(
push.process_push_message(&mut crds, &Pubkey::default(), value, timeout + 1),
Err(CrdsGossipError::PushMessageTimeout)
@ -460,7 +465,7 @@ mod test {
let mut push = CrdsGossipPush::default();
let mut ci = ContactInfo::new_localhost(&Pubkey::new_rand(), 0);
ci.wallclock = 0;
let value_old = CrdsValue::ContactInfo(ci.clone());
let value_old = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone()));
// push a new message
assert_eq!(
@ -470,7 +475,7 @@ mod test {
// push an old version
ci.wallclock = 1;
let value = CrdsValue::ContactInfo(ci.clone());
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone()));
assert_eq!(
push.process_push_message(&mut crds, &Pubkey::default(), value, 0)
.unwrap()
@ -491,13 +496,19 @@ mod test {
solana_logger::setup();
let mut crds = Crds::default();
let mut push = CrdsGossipPush::default();
let value1 = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let value1 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
assert_eq!(crds.insert(value1.clone(), 0), Ok(None));
push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 1, 1);
assert!(push.active_set.get(&value1.label().pubkey()).is_some());
let value2 = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let value2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
assert!(push.active_set.get(&value2.label().pubkey()).is_none());
assert_eq!(crds.insert(value2.clone(), 0), Ok(None));
for _ in 0..30 {
@ -509,7 +520,9 @@ mod test {
assert!(push.active_set.get(&value2.label().pubkey()).is_some());
for _ in 0..push.num_active {
let value2 = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let value2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(
ContactInfo::new_localhost(&Pubkey::new_rand(), 0),
));
assert_eq!(crds.insert(value2.clone(), 0), Ok(None));
}
push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 1, 1);
@ -522,8 +535,10 @@ mod test {
let push = CrdsGossipPush::default();
let mut stakes = HashMap::new();
for i in 1..=100 {
let peer =
CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), time));
let peer = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
time,
)));
let id = peer.label().pubkey();
crds.insert(peer.clone(), time).unwrap();
stakes.insert(id, i * 100);
@ -541,11 +556,17 @@ mod test {
fn test_new_push_messages() {
let mut crds = Crds::default();
let mut push = CrdsGossipPush::default();
let peer = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let peer = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
assert_eq!(crds.insert(peer.clone(), 0), Ok(None));
push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 1, 1);
let new_msg = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
let mut expected = HashMap::new();
expected.insert(peer.label().pubkey(), vec![new_msg.clone()]);
assert_eq!(
@ -559,11 +580,20 @@ mod test {
fn test_personalized_push_messages() {
let mut crds = Crds::default();
let mut push = CrdsGossipPush::default();
let peer_1 = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let peer_1 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
assert_eq!(crds.insert(peer_1.clone(), 0), Ok(None));
let peer_2 = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let peer_2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
assert_eq!(crds.insert(peer_2.clone(), 0), Ok(None));
let peer_3 = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let peer_3 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
assert_eq!(
push.process_push_message(&mut crds, &Pubkey::default(), peer_3.clone(), 0),
Ok(None)
@ -571,7 +601,10 @@ mod test {
push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 1, 1);
// push 3's contact info to 1 and 2 and 3
let new_msg = CrdsValue::ContactInfo(ContactInfo::new_localhost(&peer_3.pubkey(), 0));
let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&peer_3.pubkey(),
0,
)));
let mut expected = HashMap::new();
expected.insert(peer_1.pubkey(), vec![new_msg.clone()]);
expected.insert(peer_2.pubkey(), vec![new_msg.clone()]);
@ -582,11 +615,17 @@ mod test {
fn test_process_prune() {
let mut crds = Crds::default();
let mut push = CrdsGossipPush::default();
let peer = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let peer = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
assert_eq!(crds.insert(peer.clone(), 0), Ok(None));
push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 1, 1);
let new_msg = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
let expected = HashMap::new();
assert_eq!(
push.process_push_message(&mut crds, &Pubkey::default(), new_msg.clone(), 0),
@ -599,13 +638,16 @@ mod test {
fn test_purge_old_pending_push_messages() {
let mut crds = Crds::default();
let mut push = CrdsGossipPush::default();
let peer = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let peer = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
assert_eq!(crds.insert(peer.clone(), 0), Ok(None));
push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 1, 1);
let mut ci = ContactInfo::new_localhost(&Pubkey::new_rand(), 0);
ci.wallclock = 1;
let new_msg = CrdsValue::ContactInfo(ci.clone());
let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone()));
let expected = HashMap::new();
assert_eq!(
push.process_push_message(&mut crds, &Pubkey::default(), new_msg.clone(), 1),
@ -621,7 +663,7 @@ mod test {
let mut push = CrdsGossipPush::default();
let mut ci = ContactInfo::new_localhost(&Pubkey::new_rand(), 0);
ci.wallclock = 0;
let value = CrdsValue::ContactInfo(ci.clone());
let value = CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone()));
let label = value.label();
// push a new message
assert_eq!(

View File

@ -8,9 +8,34 @@ use std::collections::BTreeSet;
use std::fmt;
/// CrdsValue that is replicated across the cluster
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct CrdsValue {
pub signature: Signature,
pub data: CrdsData,
}
impl Signable for CrdsValue {
fn pubkey(&self) -> Pubkey {
self.pubkey()
}
fn signable_data(&self) -> Cow<[u8]> {
Cow::Owned(serialize(&self.data).expect("failed to serialize CrdsData"))
}
fn get_signature(&self) -> Signature {
self.signature
}
fn set_signature(&mut self, signature: Signature) {
self.signature = signature
}
}
/// CrdsData that defines the different types of items CrdsValues can hold
#[allow(clippy::large_enum_variant)]
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum CrdsValue {
pub enum CrdsData {
/// * Merge Strategy - Latest wallclock is picked
ContactInfo(ContactInfo),
/// * Merge Strategy - Latest wallclock is picked
@ -24,7 +49,6 @@ pub struct EpochSlots {
pub from: Pubkey,
pub root: u64,
pub slots: BTreeSet<u64>,
pub signature: Signature,
pub wallclock: u64,
}
@ -34,46 +58,15 @@ impl EpochSlots {
from,
root,
slots,
signature: Signature::default(),
wallclock,
}
}
}
impl Signable for EpochSlots {
fn pubkey(&self) -> Pubkey {
self.from
}
fn signable_data(&self) -> Cow<[u8]> {
#[derive(Serialize)]
struct SignData<'a> {
root: u64,
slots: &'a BTreeSet<u64>,
wallclock: u64,
}
let data = SignData {
root: self.root,
slots: &self.slots,
wallclock: self.wallclock,
};
Cow::Owned(serialize(&data).expect("unable to serialize EpochSlots"))
}
fn get_signature(&self) -> Signature {
self.signature
}
fn set_signature(&mut self, signature: Signature) {
self.signature = signature;
}
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct Vote {
pub from: Pubkey,
pub transaction: Transaction,
pub signature: Signature,
pub wallclock: u64,
}
@ -82,39 +75,11 @@ impl Vote {
Self {
from: *from,
transaction,
signature: Signature::default(),
wallclock,
}
}
}
impl Signable for Vote {
fn pubkey(&self) -> Pubkey {
self.from
}
fn signable_data(&self) -> Cow<[u8]> {
#[derive(Serialize)]
struct SignData<'a> {
transaction: &'a Transaction,
wallclock: u64,
}
let data = SignData {
transaction: &self.transaction,
wallclock: self.wallclock,
};
Cow::Owned(serialize(&data).expect("unable to serialize Vote"))
}
fn get_signature(&self) -> Signature {
self.signature
}
fn set_signature(&mut self, signature: Signature) {
self.signature = signature
}
}
/// Type of the replicated value
/// These are labels for values in a record that is associated with `Pubkey`
#[derive(PartialEq, Hash, Eq, Clone, Debug)]
@ -145,40 +110,57 @@ impl CrdsValueLabel {
}
impl CrdsValue {
pub fn new_unsigned(data: CrdsData) -> Self {
Self {
signature: Signature::default(),
data,
}
}
pub fn new_signed(data: CrdsData, keypair: &Keypair) -> Self {
let mut value = Self::new_unsigned(data);
value.sign(keypair);
value
}
/// Totally unsecure unverfiable wallclock of the node that generated this message
/// Latest wallclock is always picked.
/// This is used to time out push messages.
pub fn wallclock(&self) -> u64 {
match self {
CrdsValue::ContactInfo(contact_info) => contact_info.wallclock,
CrdsValue::Vote(vote) => vote.wallclock,
CrdsValue::EpochSlots(vote) => vote.wallclock,
match &self.data {
CrdsData::ContactInfo(contact_info) => contact_info.wallclock,
CrdsData::Vote(vote) => vote.wallclock,
CrdsData::EpochSlots(vote) => vote.wallclock,
}
}
pub fn pubkey(&self) -> Pubkey {
match &self.data {
CrdsData::ContactInfo(contact_info) => contact_info.id,
CrdsData::Vote(vote) => vote.from,
CrdsData::EpochSlots(slots) => slots.from,
}
}
pub fn label(&self) -> CrdsValueLabel {
match self {
CrdsValue::ContactInfo(contact_info) => {
CrdsValueLabel::ContactInfo(contact_info.pubkey())
}
CrdsValue::Vote(vote) => CrdsValueLabel::Vote(vote.pubkey()),
CrdsValue::EpochSlots(slots) => CrdsValueLabel::EpochSlots(slots.pubkey()),
match &self.data {
CrdsData::ContactInfo(_) => CrdsValueLabel::ContactInfo(self.pubkey()),
CrdsData::Vote(_) => CrdsValueLabel::Vote(self.pubkey()),
CrdsData::EpochSlots(_) => CrdsValueLabel::EpochSlots(self.pubkey()),
}
}
pub fn contact_info(&self) -> Option<&ContactInfo> {
match self {
CrdsValue::ContactInfo(contact_info) => Some(contact_info),
match &self.data {
CrdsData::ContactInfo(contact_info) => Some(contact_info),
_ => None,
}
}
pub fn vote(&self) -> Option<&Vote> {
match self {
CrdsValue::Vote(vote) => Some(vote),
match &self.data {
CrdsData::Vote(vote) => Some(vote),
_ => None,
}
}
pub fn epoch_slots(&self) -> Option<&EpochSlots> {
match self {
CrdsValue::EpochSlots(slots) => Some(slots),
match &self.data {
CrdsData::EpochSlots(slots) => Some(slots),
_ => None,
}
}
@ -197,48 +179,6 @@ impl CrdsValue {
}
}
impl Signable for CrdsValue {
fn sign(&mut self, keypair: &Keypair) {
match self {
CrdsValue::ContactInfo(contact_info) => contact_info.sign(keypair),
CrdsValue::Vote(vote) => vote.sign(keypair),
CrdsValue::EpochSlots(epoch_slots) => epoch_slots.sign(keypair),
};
}
fn verify(&self) -> bool {
match self {
CrdsValue::ContactInfo(contact_info) => contact_info.verify(),
CrdsValue::Vote(vote) => vote.verify(),
CrdsValue::EpochSlots(epoch_slots) => epoch_slots.verify(),
}
}
fn pubkey(&self) -> Pubkey {
match self {
CrdsValue::ContactInfo(contact_info) => contact_info.pubkey(),
CrdsValue::Vote(vote) => vote.pubkey(),
CrdsValue::EpochSlots(epoch_slots) => epoch_slots.pubkey(),
}
}
fn signable_data(&self) -> Cow<[u8]> {
unimplemented!()
}
fn get_signature(&self) -> Signature {
match self {
CrdsValue::ContactInfo(contact_info) => contact_info.get_signature(),
CrdsValue::Vote(vote) => vote.get_signature(),
CrdsValue::EpochSlots(epoch_slots) => epoch_slots.get_signature(),
}
}
fn set_signature(&mut self, _: Signature) {
unimplemented!()
}
}
#[cfg(test)]
mod test {
use super::*;
@ -263,17 +203,23 @@ mod test {
}
#[test]
fn test_keys_and_values() {
let v = CrdsValue::ContactInfo(ContactInfo::default());
let v = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
assert_eq!(v.wallclock(), 0);
let key = v.clone().contact_info().unwrap().id;
assert_eq!(v.label(), CrdsValueLabel::ContactInfo(key));
let v = CrdsValue::Vote(Vote::new(&Pubkey::default(), test_tx(), 0));
let v =
CrdsValue::new_unsigned(CrdsData::Vote(Vote::new(&Pubkey::default(), test_tx(), 0)));
assert_eq!(v.wallclock(), 0);
let key = v.clone().vote().unwrap().from;
assert_eq!(v.label(), CrdsValueLabel::Vote(key));
let v = CrdsValue::EpochSlots(EpochSlots::new(Pubkey::default(), 0, BTreeSet::new(), 0));
let v = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots::new(
Pubkey::default(),
0,
BTreeSet::new(),
0,
)));
assert_eq!(v.wallclock(), 0);
let key = v.clone().epoch_slots().unwrap().from;
assert_eq!(v.label(), CrdsValueLabel::EpochSlots(key));
@ -282,13 +228,24 @@ mod test {
fn test_signature() {
let keypair = Keypair::new();
let wrong_keypair = Keypair::new();
let mut v =
CrdsValue::ContactInfo(ContactInfo::new_localhost(&keypair.pubkey(), timestamp()));
let mut v = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&keypair.pubkey(),
timestamp(),
)));
verify_signatures(&mut v, &keypair, &wrong_keypair);
v = CrdsValue::Vote(Vote::new(&keypair.pubkey(), test_tx(), timestamp()));
v = CrdsValue::new_unsigned(CrdsData::Vote(Vote::new(
&keypair.pubkey(),
test_tx(),
timestamp(),
)));
verify_signatures(&mut v, &keypair, &wrong_keypair);
let btreeset: BTreeSet<u64> = vec![1, 2, 3, 6, 8].into_iter().collect();
v = CrdsValue::EpochSlots(EpochSlots::new(keypair.pubkey(), 0, btreeset, timestamp()));
v = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots::new(
keypair.pubkey(),
0,
btreeset,
timestamp(),
)));
verify_signatures(&mut v, &keypair, &wrong_keypair);
}

View File

@ -6,8 +6,8 @@ use solana_core::contact_info::ContactInfo;
use solana_core::crds_gossip::*;
use solana_core::crds_gossip_error::CrdsGossipError;
use solana_core::crds_gossip_push::CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS;
use solana_core::crds_value::CrdsValue;
use solana_core::crds_value::CrdsValueLabel;
use solana_core::crds_value::{CrdsData, CrdsValue};
use solana_sdk::hash::hash;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::timestamp;
@ -72,10 +72,16 @@ fn stakes(network: &Network) -> HashMap<Pubkey, u64> {
}
fn star_network_create(num: usize) -> Network {
let entry = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
let mut network: HashMap<_, _> = (1..num)
.map(|_| {
let new = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
let id = new.label().pubkey();
let mut node = CrdsGossip::default();
node.crds.insert(new.clone(), 0).unwrap();
@ -93,14 +99,20 @@ fn star_network_create(num: usize) -> Network {
}
fn rstar_network_create(num: usize) -> Network {
let entry = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
let mut origin = CrdsGossip::default();
let id = entry.label().pubkey();
origin.crds.insert(entry.clone(), 0).unwrap();
origin.set_self(&id);
let mut network: HashMap<_, _> = (1..num)
.map(|_| {
let new = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
let id = new.label().pubkey();
let mut node = CrdsGossip::default();
node.crds.insert(new.clone(), 0).unwrap();
@ -116,7 +128,10 @@ fn rstar_network_create(num: usize) -> Network {
fn ring_network_create(num: usize) -> Network {
let mut network: HashMap<_, _> = (0..num)
.map(|_| {
let new = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
let id = new.label().pubkey();
let mut node = CrdsGossip::default();
node.crds.insert(new.clone(), 0).unwrap();
@ -147,7 +162,10 @@ fn connected_staked_network_create(stakes: &[u64]) -> Network {
let num = stakes.len();
let mut network: HashMap<_, _> = (0..num)
.map(|n| {
let new = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
let id = new.label().pubkey();
let mut node = CrdsGossip::default();
node.crds.insert(new.clone(), 0).unwrap();
@ -219,7 +237,11 @@ fn network_simulator(network: &mut Network, max_convergance: f64) {
.and_then(|v| v.contact_info().cloned())
.unwrap();
m.wallclock = now;
node.process_push_message(&Pubkey::default(), vec![CrdsValue::ContactInfo(m)], now);
node.process_push_message(
&Pubkey::default(),
vec![CrdsValue::new_unsigned(CrdsData::ContactInfo(m))],
now,
);
});
// push for a bit
let (queue_size, bytes_tx) = network_run_push(network, start, end);
@ -547,7 +569,10 @@ fn test_prune_errors() {
let prune_pubkey = Pubkey::new(&[2; 32]);
crds_gossip
.crds
.insert(CrdsValue::ContactInfo(ci.clone()), 0)
.insert(
CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone())),
0,
)
.unwrap();
crds_gossip.refresh_push_active_set(&HashMap::new());
let now = timestamp();