Add signature verification to gossip (#1937)

This commit is contained in:
Sagar Dhawan 2018-12-01 12:00:30 -08:00 committed by GitHub
parent 8ef73eee51
commit 34c3a0cc1f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 432 additions and 99 deletions

View File

@ -30,6 +30,22 @@ impl Signature {
} }
} }
pub trait Signable {
fn sign(&mut self, keypair: &Keypair) {
let data = self.signable_data();
self.set_signature(Signature::new(&keypair.sign(&data).as_ref()));
}
fn verify(&self) -> bool {
self.get_signature()
.verify(&self.pubkey().as_ref(), &self.signable_data())
}
fn pubkey(&self) -> Pubkey;
fn signable_data(&self) -> Vec<u8>;
fn get_signature(&self) -> Signature;
fn set_signature(&mut self, signature: Signature);
}
impl AsRef<[u8]> for Signature { impl AsRef<[u8]> for Signature {
fn as_ref(&self) -> &[u8] { fn as_ref(&self) -> &[u8] {
&self.0[..] &self.0[..]

View File

@ -27,7 +27,7 @@ use rand::{thread_rng, Rng};
use rayon::prelude::*; use rayon::prelude::*;
use result::Result; use result::Result;
use rpc::RPC_PORT; use rpc::RPC_PORT;
use signature::{Keypair, KeypairUtil}; use signature::{Keypair, KeypairUtil, Signable, Signature};
use solana_sdk::hash::Hash; use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::{duration_as_ms, timestamp}; use solana_sdk::timing::{duration_as_ms, timestamp};
@ -60,17 +60,64 @@ pub enum ClusterInfoError {
pub struct ClusterInfo { pub struct ClusterInfo {
/// The network /// The network
pub gossip: CrdsGossip, pub gossip: CrdsGossip,
/// set the keypair that will be used to sign crds values generated. It is unset only in tests.
keypair: Arc<Keypair>,
} }
// TODO These messages should be signed, and go through the gpu pipeline for spam filtering #[derive(Debug, Deserialize, Serialize)]
pub struct PruneData {
/// Pubkey of the node that sent this prune data
pub pubkey: Pubkey,
/// Pubkeys of nodes that should be pruned
pub prunes: Vec<Pubkey>,
/// Signature of this Prune Message
pub signature: Signature,
/// The Pubkey of the intended node/destination for this message
pub destination: Pubkey,
/// Wallclock of the node that generated this message
pub wallclock: u64,
}
impl Signable for PruneData {
fn pubkey(&self) -> Pubkey {
self.pubkey
}
fn signable_data(&self) -> Vec<u8> {
#[derive(Serialize)]
struct SignData {
pubkey: Pubkey,
prunes: Vec<Pubkey>,
destination: Pubkey,
wallclock: u64,
}
let data = SignData {
pubkey: self.pubkey,
prunes: self.prunes.clone(),
destination: self.destination,
wallclock: self.wallclock,
};
serialize(&data).expect("serialize PruneData")
}
fn get_signature(&self) -> Signature {
self.signature
}
fn set_signature(&mut self, signature: Signature) {
self.signature = signature
}
}
// TODO These messages should go through the gpu pipeline for spam filtering
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] #[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
enum Protocol { enum Protocol {
/// Gosisp protocol messages /// Gossip protocol messages
PullRequest(Bloom<Hash>, CrdsValue), PullRequest(Bloom<Hash>, CrdsValue),
PullResponse(Pubkey, Vec<CrdsValue>), PullResponse(Pubkey, Vec<CrdsValue>),
PushMessage(Pubkey, Vec<CrdsValue>), PushMessage(Pubkey, Vec<CrdsValue>),
PruneMessage(Pubkey, Vec<Pubkey>), PruneMessage(Pubkey, PruneData),
/// Window protocol messages /// Window protocol messages
/// TODO: move this message to a different module /// TODO: move this message to a different module
@ -79,8 +126,13 @@ enum Protocol {
impl ClusterInfo { impl ClusterInfo {
pub fn new(node_info: NodeInfo) -> Self { pub fn new(node_info: NodeInfo) -> Self {
//Without a keypair, gossip will not function. Only useful for tests.
ClusterInfo::new_with_keypair(node_info, Arc::new(Keypair::new()))
}
pub fn new_with_keypair(node_info: NodeInfo, keypair: Arc<Keypair>) -> Self {
let mut me = ClusterInfo { let mut me = ClusterInfo {
gossip: CrdsGossip::default(), gossip: CrdsGossip::default(),
keypair,
}; };
let id = node_info.id; let id = node_info.id;
me.gossip.set_self(id); me.gossip.set_self(id);
@ -92,12 +144,14 @@ impl ClusterInfo {
let mut my_data = self.my_data(); let mut my_data = self.my_data();
let now = timestamp(); let now = timestamp();
my_data.wallclock = now; my_data.wallclock = now;
let entry = CrdsValue::ContactInfo(my_data); let mut entry = CrdsValue::ContactInfo(my_data);
entry.sign(&self.keypair);
self.gossip.refresh_push_active_set(); self.gossip.refresh_push_active_set();
self.gossip.process_push_message(&[entry], now); self.gossip.process_push_message(&[entry], now);
} }
pub fn insert_info(&mut self, node_info: NodeInfo) { pub fn insert_info(&mut self, node_info: NodeInfo) {
let value = CrdsValue::ContactInfo(node_info); let mut value = CrdsValue::ContactInfo(node_info);
value.sign(&self.keypair);
let _ = self.gossip.crds.insert(value, timestamp()); let _ = self.gossip.crds.insert(value, timestamp());
} }
pub fn id(&self) -> Pubkey { pub fn id(&self) -> Pubkey {
@ -165,13 +219,10 @@ impl ClusterInfo {
let prev = self.leader_id(); let prev = self.leader_id();
let self_id = self.gossip.id; let self_id = self.gossip.id;
let now = timestamp(); let now = timestamp();
let leader = LeaderId { let leader = LeaderId::new(self_id, key, now);
id: self_id, let mut entry = CrdsValue::LeaderId(leader);
leader_id: key,
wallclock: now,
};
let entry = CrdsValue::LeaderId(leader);
warn!("{}: LEADER_UPDATE TO {} from {}", self_id, key, prev); warn!("{}: LEADER_UPDATE TO {} from {}", self_id, key, prev);
entry.sign(&self.keypair);
self.gossip.process_push_message(&[entry], now); self.gossip.process_push_message(&[entry], now);
} }
@ -743,15 +794,23 @@ impl ClusterInfo {
.gossip .gossip
.process_push_message(&data, timestamp()); .process_push_message(&data, timestamp());
if !prunes.is_empty() { if !prunes.is_empty() {
let mut wme = me.write().unwrap();
inc_new_counter_info!("cluster_info-push_message-prunes", prunes.len()); inc_new_counter_info!("cluster_info-push_message-prunes", prunes.len());
let rsp = Protocol::PruneMessage(self_id, prunes); let ci = me.read().unwrap().lookup(from).cloned();
let ci = wme.lookup(from).cloned(); let pushes: Vec<_> = me.write().unwrap().new_push_requests();
let pushes: Vec<_> = wme.new_push_requests();
inc_new_counter_info!("cluster_info-push_message-pushes", pushes.len()); inc_new_counter_info!("cluster_info-push_message-pushes", pushes.len());
let mut rsp: Vec<_> = ci let mut rsp: Vec<_> = ci
.and_then(|ci| to_blob(rsp, ci.ncp).ok()) .and_then(|ci| {
.into_iter() let mut prune_msg = PruneData {
pubkey: self_id,
prunes,
signature: Signature::default(),
destination: from,
wallclock: timestamp(),
};
prune_msg.sign(&me.read().unwrap().keypair);
let rsp = Protocol::PruneMessage(self_id, prune_msg);
to_blob(rsp, ci.ncp).ok()
}).into_iter()
.collect(); .collect();
let mut blobs: Vec<_> = pushes let mut blobs: Vec<_> = pushes
.into_iter() .into_iter()
@ -821,19 +880,35 @@ impl ClusterInfo {
ledger_window: &mut Option<&mut LedgerWindow>, ledger_window: &mut Option<&mut LedgerWindow>,
) -> Vec<SharedBlob> { ) -> Vec<SharedBlob> {
match request { match request {
// TODO sigverify these // TODO verify messages faster
Protocol::PullRequest(filter, caller) => { Protocol::PullRequest(filter, caller) => {
//Pulls don't need to be verified
Self::handle_pull_request(me, filter, caller, from_addr) Self::handle_pull_request(me, filter, caller, from_addr)
} }
Protocol::PullResponse(from, data) => { Protocol::PullResponse(from, mut data) => {
data.retain(|v| v.verify());
Self::handle_pull_response(me, from, data); Self::handle_pull_response(me, from, data);
vec![] vec![]
} }
Protocol::PushMessage(from, data) => Self::handle_push_message(me, from, &data), Protocol::PushMessage(from, mut data) => {
data.retain(|v| v.verify());
Self::handle_push_message(me, from, &data)
}
Protocol::PruneMessage(from, data) => { Protocol::PruneMessage(from, data) => {
inc_new_counter_info!("cluster_info-prune_message", 1); if data.verify() {
inc_new_counter_info!("cluster_info-prune_message-size", data.len()); inc_new_counter_info!("cluster_info-prune_message", 1);
me.write().unwrap().gossip.process_prune_msg(from, &data); inc_new_counter_info!("cluster_info-prune_message-size", data.prunes.len());
me.write()
.unwrap()
.gossip
.process_prune_msg(
from,
data.destination,
&data.prunes,
data.wallclock,
timestamp(),
).ok();
}
vec![] vec![]
} }
Protocol::RequestWindowIndex(from, ix) => { Protocol::RequestWindowIndex(from, ix) => {
@ -1343,4 +1418,32 @@ mod tests {
assert!(node.sockets.repair.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0); assert!(node.sockets.repair.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0);
assert!(node.sockets.repair.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1); assert!(node.sockets.repair.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1);
} }
//test that all cluster_info objects only generate signed messages
//when constructed with keypairs
#[test]
fn test_gossip_signature_verification() {
//create new cluster info, leader, and peer
let keypair = Keypair::new();
let peer_keypair = Keypair::new();
let leader_keypair = Keypair::new();
let node_info = NodeInfo::new_localhost(keypair.pubkey(), 0);
let leader = NodeInfo::new_localhost(leader_keypair.pubkey(), 0);
let peer = NodeInfo::new_localhost(peer_keypair.pubkey(), 0);
let mut cluster_info = ClusterInfo::new_with_keypair(node_info.clone(), Arc::new(keypair));
cluster_info.set_leader(leader.id);
cluster_info.insert_info(peer.clone());
//check that all types of gossip messages are signed correctly
let (_, _, vals) = cluster_info.gossip.new_push_messages(timestamp());
// there should be some pushes ready
assert!(vals.len() > 0);
vals.par_iter().for_each(|v| assert!(v.verify()));
let (_, _, val) = cluster_info
.gossip
.new_pull_request(timestamp())
.ok()
.unwrap();
assert!(val.verify());
}
} }

View File

@ -1,5 +1,6 @@
use bincode::serialize;
use rpc::RPC_PORT; use rpc::RPC_PORT;
use signature::{Keypair, KeypairUtil}; use signature::{Keypair, KeypairUtil, Signable, Signature};
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::timestamp; use solana_sdk::timing::timestamp;
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, SocketAddr};
@ -8,6 +9,8 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr};
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct ContactInfo { pub struct ContactInfo {
pub id: Pubkey, pub id: Pubkey,
/// signature of this ContactInfo
pub signature: Signature,
/// gossip address /// gossip address
pub ncp: SocketAddr, pub ncp: SocketAddr,
/// address to connect to for replication /// address to connect to for replication
@ -52,6 +55,7 @@ impl Default for ContactInfo {
rpc: socketaddr_any!(), rpc: socketaddr_any!(),
rpc_pubsub: socketaddr_any!(), rpc_pubsub: socketaddr_any!(),
wallclock: 0, wallclock: 0,
signature: Signature::default(),
} }
} }
} }
@ -69,6 +73,7 @@ impl ContactInfo {
) -> Self { ) -> Self {
ContactInfo { ContactInfo {
id, id,
signature: Signature::default(),
ncp, ncp,
tvu, tvu,
tpu, tpu,
@ -161,6 +166,47 @@ impl ContactInfo {
} }
} }
impl Signable for ContactInfo {
fn pubkey(&self) -> Pubkey {
self.id
}
fn signable_data(&self) -> Vec<u8> {
#[derive(Serialize)]
struct SignData {
id: Pubkey,
ncp: SocketAddr,
tvu: SocketAddr,
tpu: SocketAddr,
storage_addr: SocketAddr,
rpc: SocketAddr,
rpc_pubsub: SocketAddr,
wallclock: u64,
}
let me = self;
let data = SignData {
id: me.id,
ncp: me.ncp,
tvu: me.tvu,
tpu: me.tpu,
storage_addr: me.storage_addr,
rpc: me.rpc,
rpc_pubsub: me.rpc_pubsub,
wallclock: me.wallclock,
};
serialize(&data).expect("failed to serialize ContactInfo")
}
fn get_signature(&self) -> Signature {
self.signature
}
fn set_signature(&mut self, signature: Signature) {
self.signature = signature
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

View File

@ -41,7 +41,7 @@ pub enum CrdsError {
InsertFailed, InsertFailed,
} }
/// This structure stores some local metadata assosciated with the CrdsValue /// This structure stores some local metadata associated with the CrdsValue
/// The implementation of PartialOrd ensures that the "highest" version is always picked to be /// The implementation of PartialOrd ensures that the "highest" version is always picked to be
/// stored in the Crds /// stored in the Crds
#[derive(PartialEq, Debug)] #[derive(PartialEq, Debug)]
@ -188,11 +188,7 @@ mod test {
let mut crds = Crds::default(); let mut crds = Crds::default();
let original = CrdsValue::LeaderId(LeaderId::default()); let original = CrdsValue::LeaderId(LeaderId::default());
assert_matches!(crds.insert(original.clone(), 0), Ok(_)); assert_matches!(crds.insert(original.clone(), 0), Ok(_));
let val = CrdsValue::LeaderId(LeaderId { let val = CrdsValue::LeaderId(LeaderId::new(Pubkey::default(), Pubkey::default(), 1));
id: Pubkey::default(),
leader_id: Pubkey::default(),
wallclock: 1,
});
assert_eq!( assert_eq!(
crds.insert(val.clone(), 1).unwrap().unwrap().value, crds.insert(val.clone(), 1).unwrap().unwrap().value,
original original
@ -255,19 +251,11 @@ mod test {
let key = Keypair::new(); let key = Keypair::new();
let v1 = VersionedCrdsValue::new( let v1 = VersionedCrdsValue::new(
1, 1,
CrdsValue::LeaderId(LeaderId { CrdsValue::LeaderId(LeaderId::new(key.pubkey(), Pubkey::default(), 0)),
id: key.pubkey(),
leader_id: Pubkey::default(),
wallclock: 0,
}),
); );
let v2 = VersionedCrdsValue::new( let v2 = VersionedCrdsValue::new(
1, 1,
CrdsValue::LeaderId(LeaderId { CrdsValue::LeaderId(LeaderId::new(key.pubkey(), Pubkey::default(), 0)),
id: key.pubkey(),
leader_id: Pubkey::default(),
wallclock: 0,
}),
); );
assert!(!(v1 != v2)); assert!(!(v1 != v2));
assert!(v1 == v2); assert!(v1 == v2);
@ -277,19 +265,11 @@ mod test {
let key = Keypair::new(); let key = Keypair::new();
let v1 = VersionedCrdsValue::new( let v1 = VersionedCrdsValue::new(
1, 1,
CrdsValue::LeaderId(LeaderId { CrdsValue::LeaderId(LeaderId::new(key.pubkey(), Pubkey::default(), 0)),
id: key.pubkey(),
leader_id: Pubkey::default(),
wallclock: 0,
}),
); );
let v2 = VersionedCrdsValue::new( let v2 = VersionedCrdsValue::new(
1, 1,
CrdsValue::LeaderId(LeaderId { CrdsValue::LeaderId(LeaderId::new(key.pubkey(), key.pubkey(), 0)),
id: key.pubkey(),
leader_id: key.pubkey(),
wallclock: 0,
}),
); );
assert!(v1 != v2); assert!(v1 != v2);
assert!(!(v1 == v2)); assert!(!(v1 == v2));
@ -304,19 +284,11 @@ mod test {
let key = Keypair::new(); let key = Keypair::new();
let v1 = VersionedCrdsValue::new( let v1 = VersionedCrdsValue::new(
1, 1,
CrdsValue::LeaderId(LeaderId { CrdsValue::LeaderId(LeaderId::new(key.pubkey(), Pubkey::default(), 1)),
id: key.pubkey(),
leader_id: Pubkey::default(),
wallclock: 1,
}),
); );
let v2 = VersionedCrdsValue::new( let v2 = VersionedCrdsValue::new(
1, 1,
CrdsValue::LeaderId(LeaderId { CrdsValue::LeaderId(LeaderId::new(key.pubkey(), Pubkey::default(), 0)),
id: key.pubkey(),
leader_id: Pubkey::default(),
wallclock: 0,
}),
); );
assert!(v1 > v2); assert!(v1 > v2);
assert!(!(v1 < v2)); assert!(!(v1 < v2));
@ -327,19 +299,11 @@ mod test {
fn test_label_order() { fn test_label_order() {
let v1 = VersionedCrdsValue::new( let v1 = VersionedCrdsValue::new(
1, 1,
CrdsValue::LeaderId(LeaderId { CrdsValue::LeaderId(LeaderId::new(Keypair::new().pubkey(), Pubkey::default(), 0)),
id: Keypair::new().pubkey(),
leader_id: Pubkey::default(),
wallclock: 0,
}),
); );
let v2 = VersionedCrdsValue::new( let v2 = VersionedCrdsValue::new(
1, 1,
CrdsValue::LeaderId(LeaderId { CrdsValue::LeaderId(LeaderId::new(Keypair::new().pubkey(), Pubkey::default(), 0)),
id: Keypair::new().pubkey(),
leader_id: Pubkey::default(),
wallclock: 0,
}),
); );
assert!(v1 != v2); assert!(v1 != v2);
assert!(!(v1 == v2)); assert!(!(v1 == v2));

View File

@ -12,6 +12,9 @@ use crds_value::CrdsValue;
use solana_sdk::hash::Hash; use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
///The min size for bloom filters
pub const CRDS_GOSSIP_BLOOM_SIZE: usize = 1000;
pub struct CrdsGossip { pub struct CrdsGossip {
pub crds: Crds, pub crds: Crds,
pub id: Pubkey, pub id: Pubkey,
@ -64,8 +67,24 @@ impl CrdsGossip {
} }
/// add the `from` to the peer's filter of nodes /// add the `from` to the peer's filter of nodes
pub fn process_prune_msg(&mut self, peer: Pubkey, origin: &[Pubkey]) { pub fn process_prune_msg(
self.push.process_prune_msg(peer, origin) &mut self,
peer: Pubkey,
destination: Pubkey,
origin: &[Pubkey],
wallclock: u64,
now: u64,
) -> Result<(), CrdsGossipError> {
let expired = now > wallclock + self.push.prune_timeout;
if expired {
return Err(CrdsGossipError::PruneMessageTimeout);
}
if self.id == destination {
self.push.process_prune_msg(peer, origin);
Ok(())
} else {
Err(CrdsGossipError::BadPruneDestination)
}
} }
/// refresh the push active set /// refresh the push active set
@ -138,11 +157,14 @@ impl CrdsGossip {
mod test { mod test {
use super::*; use super::*;
use bincode::serialized_size; use bincode::serialized_size;
use cluster_info::NodeInfo;
use contact_info::ContactInfo; use contact_info::ContactInfo;
use crds_gossip_push::CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS; use crds_gossip_push::CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS;
use crds_value::CrdsValueLabel; use crds_value::CrdsValueLabel;
use rayon::prelude::*; use rayon::prelude::*;
use signature::{Keypair, KeypairUtil}; use signature::{Keypair, KeypairUtil};
use solana_sdk::hash::hash;
use solana_sdk::timing::timestamp;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
@ -317,8 +339,13 @@ mod test {
prunes += rsps.len(); prunes += rsps.len();
network network
.get(&from) .get(&from)
.map(|node| node.lock().unwrap().process_prune_msg(*to, &rsps)) .map(|node| {
.unwrap(); let mut node = node.lock().unwrap();
let destination = node.id;
let now = timestamp();
node.process_prune_msg(*to, destination, &rsps, now, now)
.unwrap()
}).unwrap();
delivered += rsps.is_empty() as usize; delivered += rsps.is_empty() as usize;
} }
(bytes, delivered, num_msgs, prunes) (bytes, delivered, num_msgs, prunes)
@ -483,4 +510,34 @@ mod test {
let mut network = star_network_create(4002); let mut network = star_network_create(4002);
network_simulator(&mut network); network_simulator(&mut network);
} }
#[test]
fn test_prune_errors() {
let mut crds_gossip = CrdsGossip::default();
crds_gossip.id = Pubkey::new(&[0; 32]);
let id = crds_gossip.id;
let ci = NodeInfo::new_localhost(Pubkey::new(&[1; 32]), 0);
let prune_pubkey = Pubkey::new(&[2; 32]);
crds_gossip
.crds
.insert(CrdsValue::ContactInfo(ci.clone()), 0)
.unwrap();
crds_gossip.refresh_push_active_set();
let now = timestamp();
//incorrect dest
let mut res = crds_gossip.process_prune_msg(
ci.id,
Pubkey::new(hash(&[1; 32]).as_ref()),
&[prune_pubkey],
now,
now,
);
assert_eq!(res.err(), Some(CrdsGossipError::BadPruneDestination));
//correct dest
res = crds_gossip.process_prune_msg(ci.id, id, &[prune_pubkey], now, now);
assert!(res.is_ok());
//test timeout
let timeout = now + crds_gossip.push.prune_timeout * 2;
res = crds_gossip.process_prune_msg(ci.id, id, &[prune_pubkey], now, timeout);
assert_eq!(res.err(), Some(CrdsGossipError::PruneMessageTimeout));
}
} }

View File

@ -4,4 +4,6 @@ pub enum CrdsGossipError {
PushMessageTimeout, PushMessageTimeout,
PushMessagePrune, PushMessagePrune,
PushMessageOldVersion, PushMessageOldVersion,
BadPruneDestination,
PruneMessageTimeout,
} }

View File

@ -12,6 +12,7 @@
use bincode::serialized_size; use bincode::serialized_size;
use bloom::Bloom; use bloom::Bloom;
use crds::Crds; use crds::Crds;
use crds_gossip::CRDS_GOSSIP_BLOOM_SIZE;
use crds_gossip_error::CrdsGossipError; use crds_gossip_error::CrdsGossipError;
use crds_value::{CrdsValue, CrdsValueLabel}; use crds_value::{CrdsValue, CrdsValueLabel};
use packet::BLOB_DATA_SIZE; use packet::BLOB_DATA_SIZE;
@ -135,7 +136,10 @@ impl CrdsGossipPull {
} }
/// build a filter of the current crds table /// build a filter of the current crds table
fn build_crds_filter(&self, crds: &Crds) -> Bloom<Hash> { fn build_crds_filter(&self, crds: &Crds) -> Bloom<Hash> {
let num = crds.table.values().count() + self.purged_values.len(); let num = cmp::max(
CRDS_GOSSIP_BLOOM_SIZE,
crds.table.values().count() + self.purged_values.len(),
);
let mut bloom = Bloom::random(num, 0.1, 4 * 1024 * 8 - 1); let mut bloom = Bloom::random(num, 0.1, 4 * 1024 * 8 - 1);
for v in crds.table.values() { for v in crds.table.values() {
bloom.add(&v.value_hash); bloom.add(&v.value_hash);
@ -292,11 +296,7 @@ mod test {
// node contains a key from the dest node, but at an older local timestamp // node contains a key from the dest node, but at an older local timestamp
let dest_id = new.label().pubkey(); let dest_id = new.label().pubkey();
let same_key = CrdsValue::LeaderId(LeaderId { let same_key = CrdsValue::LeaderId(LeaderId::new(dest_id, dest_id, 1));
id: dest_id,
leader_id: dest_id,
wallclock: 1,
});
node_crds.insert(same_key.clone(), 0).unwrap(); node_crds.insert(same_key.clone(), 0).unwrap();
assert_eq!( assert_eq!(
node_crds node_crds

View File

@ -12,6 +12,7 @@ use bincode::serialized_size;
use bloom::Bloom; use bloom::Bloom;
use contact_info::ContactInfo; use contact_info::ContactInfo;
use crds::{Crds, VersionedCrdsValue}; use crds::{Crds, VersionedCrdsValue};
use crds_gossip::CRDS_GOSSIP_BLOOM_SIZE;
use crds_gossip_error::CrdsGossipError; use crds_gossip_error::CrdsGossipError;
use crds_value::{CrdsValue, CrdsValueLabel}; use crds_value::{CrdsValue, CrdsValueLabel};
use indexmap::map::IndexMap; use indexmap::map::IndexMap;
@ -25,6 +26,7 @@ use std::collections::HashMap;
pub const CRDS_GOSSIP_NUM_ACTIVE: usize = 30; pub const CRDS_GOSSIP_NUM_ACTIVE: usize = 30;
pub const CRDS_GOSSIP_PUSH_FANOUT: usize = 6; pub const CRDS_GOSSIP_PUSH_FANOUT: usize = 6;
pub const CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS: u64 = 5000; pub const CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS: u64 = 5000;
pub const CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS: u64 = 500;
pub struct CrdsGossipPush { pub struct CrdsGossipPush {
/// max bytes per message /// max bytes per message
@ -37,6 +39,7 @@ pub struct CrdsGossipPush {
pub num_active: usize, pub num_active: usize,
pub push_fanout: usize, pub push_fanout: usize,
pub msg_timeout: u64, pub msg_timeout: u64,
pub prune_timeout: u64,
} }
impl Default for CrdsGossipPush { impl Default for CrdsGossipPush {
@ -49,6 +52,7 @@ impl Default for CrdsGossipPush {
num_active: CRDS_GOSSIP_NUM_ACTIVE, num_active: CRDS_GOSSIP_NUM_ACTIVE,
push_fanout: CRDS_GOSSIP_PUSH_FANOUT, push_fanout: CRDS_GOSSIP_PUSH_FANOUT,
msg_timeout: CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS, msg_timeout: CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS,
prune_timeout: CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS,
} }
} }
} }
@ -183,7 +187,8 @@ impl CrdsGossipPush {
continue; continue;
} }
} }
let bloom = Bloom::random(network_size, 0.1, 1024 * 8 * 4); let size = cmp::max(CRDS_GOSSIP_BLOOM_SIZE, network_size);
let mut bloom = Bloom::random(size, 0.1, 1024 * 8 * 4);
new_items.insert(val.0.pubkey(), bloom); new_items.insert(val.0.pubkey(), bloom);
if new_items.len() == need { if new_items.len() == need {
break; break;

View File

@ -1,4 +1,6 @@
use bincode::serialize;
use contact_info::ContactInfo; use contact_info::ContactInfo;
use signature::{Keypair, Signable, Signature};
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use solana_sdk::transaction::Transaction; use solana_sdk::transaction::Transaction;
use std::fmt; use std::fmt;
@ -18,6 +20,7 @@ pub enum CrdsValue {
#[derive(Serialize, Deserialize, Default, Clone, Debug, PartialEq)] #[derive(Serialize, Deserialize, Default, Clone, Debug, PartialEq)]
pub struct LeaderId { pub struct LeaderId {
pub id: Pubkey, pub id: Pubkey,
pub signature: Signature,
pub leader_id: Pubkey, pub leader_id: Pubkey,
pub wallclock: u64, pub wallclock: u64,
} }
@ -25,12 +28,71 @@ pub struct LeaderId {
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct Vote { pub struct Vote {
pub transaction: Transaction, pub transaction: Transaction,
pub signature: Signature,
pub height: u64, pub height: u64,
pub wallclock: u64, pub wallclock: u64,
} }
impl Signable for LeaderId {
fn pubkey(&self) -> Pubkey {
self.id
}
fn signable_data(&self) -> Vec<u8> {
#[derive(Serialize)]
struct SignData {
id: Pubkey,
leader_id: Pubkey,
wallclock: u64,
}
let data = SignData {
id: self.id,
leader_id: self.leader_id,
wallclock: self.wallclock,
};
serialize(&data).expect("unable to serialize LeaderId")
}
fn get_signature(&self) -> Signature {
self.signature
}
fn set_signature(&mut self, signature: Signature) {
self.signature = signature
}
}
impl Signable for Vote {
fn pubkey(&self) -> Pubkey {
self.transaction.account_keys[0]
}
fn signable_data(&self) -> Vec<u8> {
#[derive(Serialize)]
struct SignData {
transaction: Transaction,
height: u64,
wallclock: u64,
}
let data = SignData {
transaction: self.transaction.clone(),
height: self.height,
wallclock: self.wallclock,
};
serialize(&data).expect("unable to serialize Vote")
}
fn get_signature(&self) -> Signature {
self.signature
}
fn set_signature(&mut self, signature: Signature) {
self.signature = signature
}
}
/// Type of the replicated value /// Type of the replicated value
/// These are labels for values in a record that is assosciated with `Pubkey` /// These are labels for values in a record that is associated with `Pubkey`
#[derive(PartialEq, Hash, Eq, Clone, Debug)] #[derive(PartialEq, Hash, Eq, Clone, Debug)]
pub enum CrdsValueLabel { pub enum CrdsValueLabel {
ContactInfo(Pubkey), ContactInfo(Pubkey),
@ -58,8 +120,30 @@ impl CrdsValueLabel {
} }
} }
impl LeaderId {
pub fn new(id: Pubkey, leader_id: Pubkey, wallclock: u64) -> Self {
LeaderId {
id,
signature: Signature::default(),
leader_id,
wallclock,
}
}
}
impl Vote {
pub fn new(transaction: Transaction, height: u64, wallclock: u64) -> Self {
Vote {
transaction,
signature: Signature::default(),
height,
wallclock,
}
}
}
impl CrdsValue { impl CrdsValue {
/// Totally unsecure unverfiable wallclock of the node that generatd this message /// Totally unsecure unverfiable wallclock of the node that generated this message
/// Latest wallclock is always picked. /// Latest wallclock is always picked.
/// This is used to time out push messages. /// This is used to time out push messages.
pub fn wallclock(&self) -> u64 { pub fn wallclock(&self) -> u64 {
@ -71,9 +155,11 @@ impl CrdsValue {
} }
pub fn label(&self) -> CrdsValueLabel { pub fn label(&self) -> CrdsValueLabel {
match self { match self {
CrdsValue::ContactInfo(contact_info) => CrdsValueLabel::ContactInfo(contact_info.id), CrdsValue::ContactInfo(contact_info) => {
CrdsValue::Vote(vote) => CrdsValueLabel::Vote(vote.transaction.account_keys[0]), CrdsValueLabel::ContactInfo(contact_info.pubkey())
CrdsValue::LeaderId(leader_id) => CrdsValueLabel::LeaderId(leader_id.id), }
CrdsValue::Vote(vote) => CrdsValueLabel::Vote(vote.pubkey()),
CrdsValue::LeaderId(leader_id) => CrdsValueLabel::LeaderId(leader_id.pubkey()),
} }
} }
pub fn contact_info(&self) -> Option<&ContactInfo> { pub fn contact_info(&self) -> Option<&ContactInfo> {
@ -103,10 +189,50 @@ impl CrdsValue {
] ]
} }
} }
impl Signable for CrdsValue {
fn sign(&mut self, keypair: &Keypair) {
match self {
CrdsValue::ContactInfo(contact_info) => contact_info.sign(keypair),
CrdsValue::Vote(vote) => vote.sign(keypair),
CrdsValue::LeaderId(leader_id) => leader_id.sign(keypair),
};
}
fn verify(&self) -> bool {
match self {
CrdsValue::ContactInfo(contact_info) => contact_info.verify(),
CrdsValue::Vote(vote) => vote.verify(),
CrdsValue::LeaderId(leader_id) => leader_id.verify(),
}
}
fn pubkey(&self) -> Pubkey {
match self {
CrdsValue::ContactInfo(contact_info) => contact_info.pubkey(),
CrdsValue::Vote(vote) => vote.pubkey(),
CrdsValue::LeaderId(leader_id) => leader_id.pubkey(),
}
}
fn signable_data(&self) -> Vec<u8> {
unimplemented!()
}
fn get_signature(&self) -> Signature {
unimplemented!()
}
fn set_signature(&mut self, _: Signature) {
unimplemented!()
}
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*; use super::*;
use contact_info::ContactInfo; use contact_info::ContactInfo;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::timing::timestamp;
use system_transaction::test_tx; use system_transaction::test_tx;
#[test] #[test]
@ -134,14 +260,21 @@ mod test {
let key = v.clone().contact_info().unwrap().id; let key = v.clone().contact_info().unwrap().id;
assert_eq!(v.label(), CrdsValueLabel::ContactInfo(key)); assert_eq!(v.label(), CrdsValueLabel::ContactInfo(key));
let v = CrdsValue::Vote(Vote { let v = CrdsValue::Vote(Vote::new(test_tx(), 1, 0));
transaction: test_tx(),
height: 1,
wallclock: 0,
});
assert_eq!(v.wallclock(), 0); assert_eq!(v.wallclock(), 0);
let key = v.clone().vote().unwrap().transaction.account_keys[0]; let key = v.clone().vote().unwrap().transaction.account_keys[0];
assert_eq!(v.label(), CrdsValueLabel::Vote(key)); assert_eq!(v.label(), CrdsValueLabel::Vote(key));
} }
#[test]
fn test_signature() {
let keypair = Keypair::new();
let fake_keypair = Keypair::new();
let leader = LeaderId::new(keypair.pubkey(), Pubkey::default(), timestamp());
let mut v = CrdsValue::LeaderId(leader);
v.sign(&keypair);
assert!(v.verify());
v.sign(&fake_keypair);
assert!(!v.verify());
}
} }

View File

@ -232,7 +232,10 @@ impl Fullnode {
let window = new_window(32 * 1024); let window = new_window(32 * 1024);
let shared_window = Arc::new(RwLock::new(window)); let shared_window = Arc::new(RwLock::new(window));
node.info.wallclock = timestamp(); node.info.wallclock = timestamp();
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(node.info))); let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_keypair(
node.info,
keypair.clone(),
)));
let (rpc_service, rpc_pubsub_service) = let (rpc_service, rpc_pubsub_service) =
Self::startup_rpc_services(rpc_addr, rpc_pubsub_addr, &bank, &cluster_info); Self::startup_rpc_services(rpc_addr, rpc_pubsub_addr, &bank, &cluster_info);

View File

@ -11,6 +11,7 @@ use solana::ncp::Ncp;
use solana::packet::{Blob, SharedBlob}; use solana::packet::{Blob, SharedBlob};
use solana::result; use solana::result;
use solana::service::Service; use solana::service::Service;
use solana::signature::{Keypair, KeypairUtil};
use solana_sdk::timing::timestamp; use solana_sdk::timing::timestamp;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
@ -19,8 +20,9 @@ use std::thread::sleep;
use std::time::Duration; use std::time::Duration;
fn test_node(exit: Arc<AtomicBool>) -> (Arc<RwLock<ClusterInfo>>, Ncp, UdpSocket) { fn test_node(exit: Arc<AtomicBool>) -> (Arc<RwLock<ClusterInfo>>, Ncp, UdpSocket) {
let mut tn = Node::new_localhost(); let keypair = Keypair::new();
let cluster_info = ClusterInfo::new(tn.info.clone()); let mut tn = Node::new_localhost_with_pubkey(keypair.pubkey());
let cluster_info = ClusterInfo::new_with_keypair(tn.info.clone(), Arc::new(keypair));
let c = Arc::new(RwLock::new(cluster_info)); let c = Arc::new(RwLock::new(cluster_info));
let w = Arc::new(RwLock::new(vec![])); let w = Arc::new(RwLock::new(vec![]));
let d = Ncp::new(&c.clone(), w, None, tn.sockets.gossip, exit); let d = Ncp::new(&c.clone(), w, None, tn.sockets.gossip, exit);

View File

@ -42,12 +42,13 @@ use std::thread::{sleep, Builder, JoinHandle};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
fn make_spy_node(leader: &NodeInfo) -> (Ncp, Arc<RwLock<ClusterInfo>>, Pubkey) { fn make_spy_node(leader: &NodeInfo) -> (Ncp, Arc<RwLock<ClusterInfo>>, Pubkey) {
let keypair = Keypair::new();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let mut spy = Node::new_localhost(); let mut spy = Node::new_localhost_with_pubkey(keypair.pubkey());
let me = spy.info.id.clone(); let me = spy.info.id.clone();
let daddr = "0.0.0.0:0".parse().unwrap(); let daddr = "0.0.0.0:0".parse().unwrap();
spy.info.tvu = daddr; spy.info.tvu = daddr;
let mut spy_cluster_info = ClusterInfo::new(spy.info); let mut spy_cluster_info = ClusterInfo::new_with_keypair(spy.info, Arc::new(keypair));
spy_cluster_info.insert_info(leader.clone()); spy_cluster_info.insert_info(leader.clone());
spy_cluster_info.set_leader(leader.id); spy_cluster_info.set_leader(leader.id);
let spy_cluster_info_ref = Arc::new(RwLock::new(spy_cluster_info)); let spy_cluster_info_ref = Arc::new(RwLock::new(spy_cluster_info));
@ -64,11 +65,12 @@ fn make_spy_node(leader: &NodeInfo) -> (Ncp, Arc<RwLock<ClusterInfo>>, Pubkey) {
} }
fn make_listening_node(leader: &NodeInfo) -> (Ncp, Arc<RwLock<ClusterInfo>>, Node, Pubkey) { fn make_listening_node(leader: &NodeInfo) -> (Ncp, Arc<RwLock<ClusterInfo>>, Node, Pubkey) {
let keypair = Keypair::new();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let new_node = Node::new_localhost(); let new_node = Node::new_localhost_with_pubkey(keypair.pubkey());
let new_node_info = new_node.info.clone(); let new_node_info = new_node.info.clone();
let me = new_node.info.id.clone(); let me = new_node.info.id.clone();
let mut new_node_cluster_info = ClusterInfo::new(new_node_info); let mut new_node_cluster_info = ClusterInfo::new_with_keypair(new_node_info, Arc::new(keypair));
new_node_cluster_info.insert_info(leader.clone()); new_node_cluster_info.insert_info(leader.clone());
new_node_cluster_info.set_leader(leader.id); new_node_cluster_info.set_leader(leader.id);
let new_node_cluster_info_ref = Arc::new(RwLock::new(new_node_cluster_info)); let new_node_cluster_info_ref = Arc::new(RwLock::new(new_node_cluster_info));