efficiently pack gossip responses and only respond up to max size. (#1493)

anatoly yakovenko 2018-10-14 06:45:02 -07:00 committed by GitHub
parent d3b4dfe104
commit 6aaa350145
2 changed files with 141 additions and 68 deletions
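
To see the budget math the diff introduces in isolation: the new code serializes one `(NodeInfo, u64)` entry and a `ReceiveUpdates` message that already holds a single entry, then divides the leftover byte budget by the per-entry size. Below is a minimal, self-contained sketch of that computation using simplified stand-in types; `Entry` and `Response` are illustrative only, not the real `NodeInfo` and `Protocol`.

use bincode::serialized_size;
use serde::Serialize;

// Stand-in for one (NodeInfo, u64) gossip entry.
#[derive(Serialize, Clone)]
struct Entry {
    id: [u8; 32],
    last_update: u64,
}

// Stand-in for Protocol::ReceiveUpdates.
#[derive(Serialize)]
struct Response {
    from: [u8; 32],
    max_update_index: u64,
    nodes: Vec<Entry>,
}

// How many entries fit in `max_bytes`: measure a message that already
// carries one entry, then see how many more entries the remaining
// budget pays for (hence the trailing + 1).
fn max_entries(max_bytes: u64) -> u64 {
    let unit = Entry { id: [0; 32], last_update: 0 };
    let unit_size = serialized_size(&unit).unwrap();
    let msg = Response {
        from: [0; 32],
        max_update_index: 0,
        nodes: vec![unit],
    };
    let msg_size = serialized_size(&msg).unwrap();
    (max_bytes - msg_size) / unit_size + 1
}

fn main() {
    // The same budget the change uses: a 64 KiB blob minus 512 bytes of headroom.
    println!("{} entries fit", max_entries(1024 * 64 - 512));
}

This assumes entries serialize to a constant size, which holds for the fixed-width `Entry` here; the real code measures a localhost `NodeInfo` as its unit.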

@@ -12,7 +12,7 @@
//! * layer 2 - Everyone else, if layer 1 is `2^10`, layer 2 should be able to fit `2^20` number of nodes.
//!
//! Bank needs to provide an interface for us to query the stake weight
use bincode::{deserialize, serialize};
use bincode::{deserialize, serialize, serialized_size};
use budget_instruction::Vote;
use choose_gossip_peer_strategy::{ChooseGossipPeerStrategy, ChooseWeightedPeerStrategy};
use counter::Counter;
@@ -228,6 +228,7 @@ pub struct ClusterInfo {
// TODO These messages should be signed, and go through the gpu pipeline for spam filtering
#[derive(Serialize, Deserialize, Debug)]
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
enum Protocol {
/// forward your own latest data structure when requesting an update
/// this doesn't update the `remote` update index, but it allows the
@@ -235,8 +236,14 @@ enum Protocol {
/// (last update index I saw from you, my replicated data)
RequestUpdates(u64, NodeInfo),
//TODO might need a since?
/// from id, from's last update index, NodeInfo
ReceiveUpdates(Pubkey, u64, Vec<NodeInfo>, Vec<(Pubkey, u64)>),
/// * from - from id,
/// * max_update_index - from's max update index in the response
/// * nodes - (NodeInfo, remote_last_update) vector
ReceiveUpdates {
from: Pubkey,
max_update_index: u64,
nodes: Vec<(NodeInfo, u64)>,
},
/// ask for a missing index
/// (my replicated data to keep alive, missing window index)
RequestWindowIndex(NodeInfo, u64),
@@ -702,17 +709,43 @@ impl ClusterInfo {
1.0
}
fn get_updates_since(&self, v: u64) -> (Pubkey, u64, Vec<NodeInfo>) {
//trace!("get updates since {}", v);
let data = self
fn max_updates(max_bytes: usize) -> usize {
let unit = (NodeInfo::new_localhost(Default::default()), 0);
let unit_size = serialized_size(&unit).unwrap();
let msg = Protocol::ReceiveUpdates {
from: Default::default(),
max_update_index: 0,
nodes: vec![unit],
};
let msg_size = serialized_size(&msg).unwrap();
((max_bytes - (msg_size as usize)) / (unit_size as usize)) + 1
}
// Get node updates since `v`, up to a maximum of `max` updates
fn get_updates_since(&self, v: u64, max: usize) -> (Pubkey, u64, Vec<(NodeInfo, u64)>) {
let nodes: Vec<_> = self
.table
.values()
.filter(|x| x.id != Pubkey::default() && self.local[&x.id] > v)
.cloned()
.collect();
let liveness: Vec<u64> = nodes
.iter()
.map(|d| *self.remote.get(&d.id).unwrap_or(&0))
.collect();
let updates: Vec<u64> = nodes.iter().map(|d| self.local[&d.id]).collect();
trace!("{:?}", updates);
let id = self.id;
let ups = self.update_index;
(id, ups, data)
let mut out: Vec<(u64, (NodeInfo, u64))> = updates
.into_iter()
.zip(nodes.into_iter().zip(liveness))
.collect();
out.sort_by_key(|k| k.0);
let last_node = std::cmp::max(1, std::cmp::min(out.len(), max)) - 1;
let max_updated_node = out.get(last_node).map(|x| x.0).unwrap_or(0);
let updated_data: Vec<(NodeInfo, u64)> = out.into_iter().take(max).map(|x| x.1).collect();
trace!("get updates since response {} {}", v, updated_data.len());
(id, max_updated_node, updated_data)
}
pub fn valid_last_ids(&self) -> Vec<Hash> {
@@ -852,24 +885,19 @@ impl ClusterInfo {
/// * `from` - identity of the sender of the updates
/// * `update_index` - the number of updates that `from` has completed and this set of `data` represents
/// * `data` - the update data, a vector of (NodeInfo, remote update index) pairs
fn apply_updates(
&mut self,
from: Pubkey,
update_index: u64,
data: &[NodeInfo],
external_liveness: &[(Pubkey, u64)],
) {
fn apply_updates(&mut self, from: Pubkey, update_index: u64, data: &[(NodeInfo, u64)]) {
trace!("got updates {}", data.len());
// TODO we need to punish/spam resist here
// sigverify the whole update and slash anyone who sends a bad update
let mut insert_total = 0;
for v in data {
insert_total += self.insert(&v);
insert_total += self.insert(&v.0);
}
inc_new_counter_info!("cluster_info-update-count", insert_total);
for (pubkey, external_remote_index) in external_liveness {
let remote_entry = if let Some(v) = self.remote.get(pubkey) {
for (node, external_remote_index) in data {
let pubkey = node.id;
let remote_entry = if let Some(v) = self.remote.get(&pubkey) {
*v
} else {
0
@@ -881,7 +909,7 @@
let liveness_entry = self
.external_liveness
.entry(*pubkey)
.entry(pubkey)
.or_insert_with(HashMap::new);
let peer_index = *liveness_entry.entry(from).or_insert(*external_remote_index);
if *external_remote_index > peer_index {
@@ -1055,20 +1083,8 @@ impl ClusterInfo {
inc_new_counter_info!("cluster_info-window-request-updates-unspec-ncp", 1);
from.contact_info.ncp = *from_addr;
}
let (from_id, ups, data, liveness) = {
let me = me.read().unwrap();
// only lock for these two calls, don't lock during IO `sock.send_to` or `sock.recv_from`
let (from_id, ups, data) = me.get_updates_since(version);
(
from_id,
ups,
data,
me.remote.iter().map(|(k, v)| (*k, *v)).collect(),
)
};
let max = Self::max_updates(1024 * 64 - 512);
let (from_id, ups, data) = me.read().unwrap().get_updates_since(version, max);
// update entry only after collecting liveness
{
@@ -1077,10 +1093,10 @@
me.update_liveness(from.id);
}
trace!("get updates since response {} {}", version, data.len());
let len = data.len();
trace!("get updates since response {} {}", version, len);
if len < 1 {
if data.is_empty() {
let me = me.read().unwrap();
trace!(
"no updates me {} ix {} since {}",
@@ -1090,7 +1106,11 @@
);
None
} else {
let rsp = Protocol::ReceiveUpdates(from_id, ups, data, liveness);
let rsp = Protocol::ReceiveUpdates {
from: from_id,
max_update_index: ups,
nodes: data,
};
if let Ok(r) = to_blob(rsp, from.contact_info.ncp) {
trace!(
@@ -1107,22 +1127,26 @@
}
}
}
Protocol::ReceiveUpdates(from, update_index, data, external_liveness) => {
Protocol::ReceiveUpdates {
from,
max_update_index,
nodes,
} => {
let now = Instant::now();
trace!(
"ReceivedUpdates from={} update_index={} len={}",
from,
update_index,
data.len()
max_update_index,
nodes.len()
);
me.write()
.expect("'me' write lock in ReceiveUpdates")
.apply_updates(from, update_index, &data, &external_liveness);
.apply_updates(from, max_update_index, &nodes);
report_time_spent(
"ReceiveUpdates",
&now.elapsed(),
&format!(" len: {}", data.len()),
&format!(" len: {}", nodes.len()),
);
None
}
@@ -1361,6 +1385,7 @@ fn report_time_spent(label: &str, time: &Duration, extra: &str) {
#[cfg(test)]
mod tests {
use bincode::serialize;
use budget_instruction::Vote;
use cluster_info::{
ClusterInfo, ClusterInfoError, Node, NodeInfo, Protocol, FULLNODE_PORT_RANGE,
@@ -1465,9 +1490,9 @@ mod tests {
//should be accepted, since the update is for the same address field as the one we know
assert_eq!(cluster_info.table[&d.id].version, 1);
}
fn sorted(ls: &Vec<NodeInfo>) -> Vec<NodeInfo> {
fn sorted(ls: &Vec<(NodeInfo, u64)>) -> Vec<(NodeInfo, u64)> {
let mut copy: Vec<_> = ls.iter().cloned().collect();
copy.sort_by(|x, y| x.id.cmp(&y.id));
copy.sort_by(|x, y| x.0.id.cmp(&y.0.id));
copy
}
#[test]
@@ -1484,42 +1509,92 @@
assert_eq!(d1.contact_info.tpu, socketaddr!("127.0.0.1:1234"));
}
#[test]
fn max_updates() {
let size = 1024 * 64 - 512;
let num = ClusterInfo::max_updates(size);
let msg = Protocol::ReceiveUpdates {
from: Default::default(),
max_update_index: 0,
nodes: vec![(NodeInfo::new_unspecified(), 0); num],
};
trace!("{} {} {}", serialize(&msg).unwrap().len(), size, num);
assert!(serialize(&msg).unwrap().len() <= size);
}
#[test]
fn update_test() {
let d1 = NodeInfo::new_localhost(Keypair::new().pubkey());
let d2 = NodeInfo::new_localhost(Keypair::new().pubkey());
let d3 = NodeInfo::new_localhost(Keypair::new().pubkey());
let mut cluster_info = ClusterInfo::new(d1.clone()).expect("ClusterInfo::new");
let (key, ix, ups) = cluster_info.get_updates_since(0);
let (key, ix, ups) = cluster_info.get_updates_since(0, 1);
assert_eq!(key, d1.id);
assert_eq!(ix, 1);
assert_eq!(ups.len(), 1);
assert_eq!(sorted(&ups), sorted(&vec![d1.clone()]));
assert_eq!(sorted(&ups), sorted(&vec![(d1.clone(), 0)]));
cluster_info.insert(&d2);
let (key, ix, ups) = cluster_info.get_updates_since(0);
let (key, ix, ups) = cluster_info.get_updates_since(0, 2);
assert_eq!(key, d1.id);
assert_eq!(ix, 2);
assert_eq!(ups.len(), 2);
assert_eq!(sorted(&ups), sorted(&vec![d1.clone(), d2.clone()]));
assert_eq!(
sorted(&ups),
sorted(&vec![(d1.clone(), 0), (d2.clone(), 0)])
);
cluster_info.insert(&d3);
let (key, ix, ups) = cluster_info.get_updates_since(0);
let (key, ix, ups) = cluster_info.get_updates_since(0, 3);
assert_eq!(key, d1.id);
assert_eq!(ix, 3);
assert_eq!(ups.len(), 3);
assert_eq!(
sorted(&ups),
sorted(&vec![d1.clone(), d2.clone(), d3.clone()])
sorted(&vec![(d1.clone(), 0), (d2.clone(), 0), (d3.clone(), 0)])
);
let mut cluster_info2 = ClusterInfo::new(d2.clone()).expect("ClusterInfo::new");
cluster_info2.apply_updates(key, ix, &ups, &vec![]);
cluster_info2.apply_updates(key, ix, &ups);
assert_eq!(cluster_info2.table.values().len(), 3);
assert_eq!(
sorted(&cluster_info2.table.values().map(|x| x.clone()).collect()),
sorted(&cluster_info.table.values().map(|x| x.clone()).collect())
sorted(
&cluster_info2
.table
.values()
.map(|x| (x.clone(), 0))
.collect()
),
sorted(
&cluster_info
.table
.values()
.map(|x| (x.clone(), 0))
.collect()
)
);
let d4 = NodeInfo::new_entry_point(&socketaddr!("127.0.0.4:1234"));
cluster_info.insert(&d4);
let (_key, _ix, ups) = cluster_info.get_updates_since(0);
assert_eq!(sorted(&ups), sorted(&vec![d2.clone(), d1, d3]));
let (_key, ix, ups) = cluster_info.get_updates_since(0, 3);
assert_eq!(
sorted(&ups),
sorted(&vec![(d2.clone(), 0), (d1.clone(), 0), (d3.clone(), 0)])
);
assert_eq!(ix, 3);
let (_key, ix, ups) = cluster_info.get_updates_since(0, 2);
assert_eq!(
sorted(&ups),
sorted(&vec![(d2.clone(), 0), (d1.clone(), 0)])
);
assert_eq!(ix, 2);
let (_key, ix, ups) = cluster_info.get_updates_since(0, 1);
assert_eq!(sorted(&ups), sorted(&vec![(d1.clone(), 0)]));
assert_eq!(ix, 1);
let (_key, ix, ups) = cluster_info.get_updates_since(1, 3);
assert_eq!(ups.len(), 2);
assert_eq!(ix, 3);
assert_eq!(sorted(&ups), sorted(&vec![(d2, 0), (d3, 0)]));
let (_key, ix, ups) = cluster_info.get_updates_since(3, 3);
assert_eq!(ups.len(), 0);
assert_eq!(ix, 0);
}
#[test]
fn window_index_request() {

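One consequence of the cap that `update_test` above exercises: `get_updates_since` reports the update index of the last entry it actually packed, so a requester whose response was truncated can issue its next `RequestUpdates` from that index and page through the remainder. A self-contained sketch of that select-and-report step, with string payloads standing in for `NodeInfo` (`take_updates` is a hypothetical helper, not part of this commit):

fn take_updates(mut updates: Vec<(u64, &'static str)>, max: usize) -> (u64, Vec<&'static str>) {
    // Oldest updates first, so a capped response never skips entries the
    // requester has yet to see.
    updates.sort_by_key(|u| u.0);
    let last = std::cmp::max(1, std::cmp::min(updates.len(), max)) - 1;
    let max_index = updates.get(last).map(|u| u.0).unwrap_or(0);
    let data: Vec<_> = updates.into_iter().take(max).map(|u| u.1).collect();
    (max_index, data)
}

fn main() {
    let updates = vec![(3, "c"), (1, "a"), (2, "b")];
    let (ix, data) = take_updates(updates, 2);
    assert_eq!(ix, 2); // highest index included in the capped response
    assert_eq!(data, vec!["a", "b"]);
    // A follow-up request with since = 2 picks up "c".
}
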
@@ -651,30 +651,28 @@ fn test_multi_node_dynamic_network() {
let t1: Vec<_> = (0..num_nodes)
.into_iter()
.map(|n| {
let leader_data = leader_data.clone();
let alice_clone = alice_arc.clone();
Builder::new()
.name("keypair-thread".to_string())
.spawn(move || {
info!("Spawned thread {}", n);
let keypair = Keypair::new();
//send some tokens to the new validators
let bal = retry_send_tx_and_retry_get_balance(
&leader_data,
&alice_clone.read().unwrap(),
&keypair.pubkey(),
Some(500),
);
assert_eq!(bal, Some(500));
info!("sent balance to[{}/{}] {}", n, num_nodes, keypair.pubkey());
keypair
Keypair::new()
}).unwrap()
}).collect();
info!("Waiting for keypairs to be created");
let keypairs: Vec<_> = t1.into_iter().map(|t| t.join().unwrap()).collect();
info!("keypairs created");
keypairs.iter().enumerate().for_each(|(n, keypair)| {
//send some tokens to the new validators
let bal = retry_send_tx_and_retry_get_balance(
&leader_data,
&alice_arc.read().unwrap(),
&keypair.pubkey(),
Some(500),
);
assert_eq!(bal, Some(500));
info!("sent balance to [{}/{}] {}", n, num_nodes, keypair.pubkey());
});
let t2: Vec<_> = keypairs
.into_iter()
.map(|keypair| {