From 4ee212ae4c11c7c7ec2b4333c419e8e0de233aae Mon Sep 17 00:00:00 2001
From: Sagar Dhawan
Date: Thu, 15 Aug 2019 17:04:45 -0700
Subject: [PATCH] Coalesce gossip pull requests and serve them in batches (#5501)

* Coalesce gossip pull requests and serve them in batches

* batch all filters and immediately respond to messages in gossip

* Fix tests

* make download_from_replicator perform a greedy recv
---
 core/src/cluster_info.rs     | 250 ++++++++++++++++++-----------------
 core/src/crds_gossip.rs      |   9 +-
 core/src/crds_gossip_pull.rs |  66 ++++-----
 core/src/replicator.rs       |   5 +-
 core/tests/crds_gossip.rs    |  20 ++-
 5 files changed, 187 insertions(+), 163 deletions(-)

diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs
index 91de03c797..86b9df8a7c 100644
--- a/core/src/cluster_info.rs
+++ b/core/src/cluster_info.rs
@@ -19,7 +19,7 @@ use crate::crds_gossip::CrdsGossip;
 use crate::crds_gossip_error::CrdsGossipError;
 use crate::crds_gossip_pull::{CrdsFilter, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS};
 use crate::crds_value::{CrdsValue, CrdsValueLabel, EpochSlots, Vote};
-use crate::packet::{to_shared_blob, Blob, SharedBlob, BLOB_SIZE};
+use crate::packet::{to_shared_blob, SharedBlob, BLOB_SIZE};
 use crate::repair_service::RepairType;
 use crate::result::Result;
 use crate::staking_utils;
@@ -151,6 +151,12 @@ impl Signable for PruneData {
     }
 }
 
+struct PullData {
+    pub from_addr: SocketAddr,
+    pub caller: CrdsValue,
+    pub filter: CrdsFilter,
+}
+
 // TODO These messages should go through the gpu pipeline for spam filtering
 #[derive(Serialize, Deserialize, Debug)]
 #[allow(clippy::large_enum_variant)]
@@ -1098,60 +1104,138 @@ impl ClusterInfo {
         res
     }
 
-    //TODO we should first coalesce all the requests
-    fn handle_blob(
-        obj: &Arc<RwLock<Self>>,
+    fn handle_blobs(
+        me: &Arc<RwLock<Self>>,
         blocktree: Option<&Arc<Blocktree>>,
         stakes: &HashMap<Pubkey, u64>,
-        blob: &Blob,
-    ) -> Vec<SharedBlob> {
-        deserialize(&blob.data[..blob.meta.size])
-            .into_iter()
-            .flat_map(|request| {
-                ClusterInfo::handle_protocol(obj, &blob.meta.addr(), blocktree, stakes, request)
-            })
-            .collect()
+        blobs: &[SharedBlob],
+        response_sender: &BlobSender,
+    ) {
+        // iter over the blobs, collect pulls separately and process everything else
+        let mut gossip_pull_data: Vec<PullData> = vec![];
+        blobs.iter().for_each(|blob| {
+            let blob = blob.read().unwrap();
+            let from_addr = blob.meta.addr();
+            deserialize(&blob.data[..blob.meta.size])
+                .into_iter()
+                .for_each(|request| match request {
+                    Protocol::PullRequest(filter, caller) => {
+                        if !caller.verify() {
+                            inc_new_counter_error!(
+                                "cluster_info-gossip_pull_request_verify_fail",
+                                1
+                            );
+                        } else if caller.contact_info().is_some() {
+                            if caller.contact_info().unwrap().pubkey()
+                                == me.read().unwrap().gossip.id
+                            {
+                                warn!("PullRequest ignored, I'm talking to myself");
+                                inc_new_counter_debug!("cluster_info-window-request-loopback", 1);
+                            } else {
+                                gossip_pull_data.push(PullData {
+                                    from_addr,
+                                    caller,
+                                    filter,
+                                });
+                            }
+                        }
+                    }
+                    Protocol::PullResponse(from, mut data) => {
+                        data.retain(|v| {
+                            let ret = v.verify();
+                            if !ret {
+                                inc_new_counter_error!(
+                                    "cluster_info-gossip_pull_response_verify_fail",
+                                    1
+                                );
+                            }
+                            ret
+                        });
+                        Self::handle_pull_response(me, &from, data);
+                    }
+                    Protocol::PushMessage(from, mut data) => {
+                        data.retain(|v| {
+                            let ret = v.verify();
+                            if !ret {
+                                inc_new_counter_error!(
+                                    "cluster_info-gossip_push_msg_verify_fail",
+                                    1
+                                );
+                            }
+                            ret
+                        });
+                        let _ignore_disconnect = response_sender
+                            .send(Self::handle_push_message(me, &from, data, stakes));
+                    }
+                    Protocol::PruneMessage(from, data) => {
+                        if data.verify() {
+                            inc_new_counter_debug!("cluster_info-prune_message", 1);
+                            inc_new_counter_debug!(
+                                "cluster_info-prune_message-size",
+                                data.prunes.len()
+                            );
+                            match me.write().unwrap().gossip.process_prune_msg(
+                                &from,
+                                &data.destination,
+                                &data.prunes,
+                                data.wallclock,
+                                timestamp(),
+                            ) {
+                                Err(CrdsGossipError::PruneMessageTimeout) => {
+                                    inc_new_counter_debug!("cluster_info-prune_message_timeout", 1)
+                                }
+                                Err(CrdsGossipError::BadPruneDestination) => {
+                                    inc_new_counter_debug!("cluster_info-bad_prune_destination", 1)
+                                }
+                                _ => (),
+                            }
+                        } else {
+                            inc_new_counter_debug!("cluster_info-gossip_prune_msg_verify_fail", 1);
+                        }
+                    }
+                    _ => {
+                        let _ignore_disconnect = response_sender
+                            .send(Self::handle_repair(me, &from_addr, blocktree, request));
+                    }
+                })
+        });
+        // process the collected pulls together
+        let _ignore_disconnect =
+            response_sender.send(Self::handle_pull_requests(me, gossip_pull_data));
     }
 
-    fn handle_pull_request(
-        me: &Arc<RwLock<Self>>,
-        filter: CrdsFilter,
-        caller: CrdsValue,
-        from_addr: &SocketAddr,
-    ) -> Vec<SharedBlob> {
-        let self_id = me.read().unwrap().gossip.id;
-        inc_new_counter_debug!("cluster_info-pull_request", 1);
-        if caller.contact_info().is_none() {
-            return vec![];
-        }
-        let from = caller.contact_info().unwrap();
-        if from.id == self_id {
-            warn!(
-                "PullRequest ignored, I'm talking to myself: me={} remoteme={}",
-                self_id, from.id
-            );
-            inc_new_counter_debug!("cluster_info-window-request-loopback", 1);
-            return vec![];
+    fn handle_pull_requests(me: &Arc<RwLock<Self>>, requests: Vec<PullData>) -> Vec<SharedBlob> {
+        // split the requests into addrs and filters
+        let mut caller_and_filters = vec![];
+        let mut addrs = vec![];
+        for pull_data in requests {
+            caller_and_filters.push((pull_data.caller, pull_data.filter));
+            addrs.push(pull_data.from_addr);
         }
         let now = timestamp();
-        let data = me
+        let self_id = me.read().unwrap().id();
+        let pull_responses = me
             .write()
             .unwrap()
             .gossip
-            .process_pull_request(caller, filter, now);
-        let len = data.len();
-        trace!("get updates since response {}", len);
-        let responses: Vec<_> = Self::split_gossip_messages(data)
+            .process_pull_requests(caller_and_filters, now);
+        pull_responses
             .into_iter()
-            .map(move |payload| Protocol::PullResponse(self_id, payload))
-            .collect();
-        // The remote node may not know its public IP:PORT. Instead of responding to the caller's
-        // gossip addr, respond to the origin addr.
-        inc_new_counter_debug!("cluster_info-pull_request-rsp", len);
-        responses
-            .into_iter()
-            .map(|rsp| to_shared_blob(rsp, *from_addr).ok().into_iter())
-            .flatten()
+            .zip(addrs.into_iter())
+            .flat_map(|(response, from_addr)| {
+                let len = response.len();
+                trace!("get updates since response {}", len);
+                inc_new_counter_debug!("cluster_info-pull_request-rsp", len);
+                Self::split_gossip_messages(response)
+                    .into_iter()
+                    .filter_map(move |payload| {
+                        let protocol = Protocol::PullResponse(self_id, payload);
+                        // The remote node may not know its public IP:PORT. Instead of responding to the caller's
+                        // gossip addr, respond to the origin addr. The last origin addr is picked from the list of
+                        // addrs.
+                        to_shared_blob(protocol, from_addr).ok()
+                    })
+            })
             .collect()
     }
 
@@ -1312,73 +1396,6 @@ impl ClusterInfo {
         res
     }
 
-    fn handle_protocol(
-        me: &Arc<RwLock<Self>>,
-        from_addr: &SocketAddr,
-        blocktree: Option<&Arc<Blocktree>>,
-        stakes: &HashMap<Pubkey, u64>,
-        request: Protocol,
-    ) -> Vec<SharedBlob> {
-        match request {
-            // TODO verify messages faster
-            Protocol::PullRequest(filter, caller) => {
-                if !caller.verify() {
-                    inc_new_counter_error!("cluster_info-gossip_pull_request_verify_fail", 1);
-                    vec![]
-                } else {
-                    Self::handle_pull_request(me, filter, caller, from_addr)
-                }
-            }
-            Protocol::PullResponse(from, mut data) => {
-                data.retain(|v| {
-                    let ret = v.verify();
-                    if !ret {
-                        inc_new_counter_error!("cluster_info-gossip_pull_response_verify_fail", 1);
-                    }
-                    ret
-                });
-                Self::handle_pull_response(me, &from, data);
-                vec![]
-            }
-            Protocol::PushMessage(from, mut data) => {
-                data.retain(|v| {
-                    let ret = v.verify();
-                    if !ret {
-                        inc_new_counter_error!("cluster_info-gossip_push_msg_verify_fail", 1);
-                    }
-                    ret
-                });
-                Self::handle_push_message(me, &from, data, stakes)
-            }
-            Protocol::PruneMessage(from, data) => {
-                if data.verify() {
-                    inc_new_counter_debug!("cluster_info-prune_message", 1);
-                    inc_new_counter_debug!("cluster_info-prune_message-size", data.prunes.len());
-                    match me.write().unwrap().gossip.process_prune_msg(
-                        &from,
-                        &data.destination,
-                        &data.prunes,
-                        data.wallclock,
-                        timestamp(),
-                    ) {
-                        Err(CrdsGossipError::PruneMessageTimeout) => {
-                            inc_new_counter_debug!("cluster_info-prune_message_timeout", 1)
-                        }
-                        Err(CrdsGossipError::BadPruneDestination) => {
-                            inc_new_counter_debug!("cluster_info-bad_prune_destination", 1)
-                        }
-                        Err(_) => (),
-                        Ok(_) => (),
-                    }
-                } else {
-                    inc_new_counter_debug!("cluster_info-gossip_prune_msg_verify_fail", 1);
-                }
-                vec![]
-            }
-            _ => Self::handle_repair(me, from_addr, blocktree, request),
-        }
-    }
-
     /// Process messages from the network
     fn run_listen(
         obj: &Arc<RwLock<Self>>,
@@ -1393,7 +1410,6 @@ impl ClusterInfo {
         while let Ok(mut more) = requests_receiver.try_recv() {
             reqs.append(&mut more);
         }
-        let mut resps = Vec::new();
 
         let stakes: HashMap<_, _> = match bank_forks {
             Some(ref bank_forks) => {
@@ -1402,11 +1418,7 @@ impl ClusterInfo {
             None => HashMap::new(),
         };
 
-        for req in reqs {
-            let mut resp = Self::handle_blob(obj, blocktree, &stakes, &req.read().unwrap());
-            resps.append(&mut resp);
-        }
-        response_sender.send(resps)?;
+        Self::handle_blobs(obj, blocktree, &stakes, &reqs, response_sender);
         Ok(())
     }
     pub fn listen(
@@ -1712,7 +1724,7 @@ mod tests {
     use crate::blocktree::Blocktree;
     use crate::crds_value::CrdsValueLabel;
     use crate::erasure::ErasureConfig;
-    use crate::packet::BLOB_HEADER_SIZE;
+    use crate::packet::{Blob, BLOB_HEADER_SIZE};
     use crate::repair_service::RepairType;
     use crate::result::Error;
     use crate::test_tx::test_tx;
diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs
index 88feab7af7..c013c817e7 100644
--- a/core/src/crds_gossip.rs
+++ b/core/src/crds_gossip.rs
@@ -145,14 +145,13 @@ impl CrdsGossip {
         self.pull.mark_pull_request_creation_time(from, now)
     }
     /// process a pull request and create a response
-    pub fn process_pull_request(
+    pub fn process_pull_requests(
         &mut self,
-        caller: CrdsValue,
-        filter: CrdsFilter,
+        filters: Vec<(CrdsValue, CrdsFilter)>,
         now: u64,
-    ) -> Vec<CrdsValue> {
+    ) -> Vec<Vec<CrdsValue>> {
         self.pull
-            .process_pull_request(&mut self.crds, caller, filter, now)
+            .process_pull_requests(&mut self.crds, filters, now)
     }
     /// process a pull response
     pub fn process_pull_response(
diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs
index 65073cce2c..dcc092a1cd 100644
--- a/core/src/crds_gossip_pull.rs
+++ b/core/src/crds_gossip_pull.rs
@@ -191,21 +191,22 @@
     }
 
     /// process a pull request and create a response
-    pub fn process_pull_request(
+    pub fn process_pull_requests(
         &mut self,
         crds: &mut Crds,
-        caller: CrdsValue,
-        filter: CrdsFilter,
+        requests: Vec<(CrdsValue, CrdsFilter)>,
         now: u64,
-    ) -> Vec<CrdsValue> {
-        let rv = self.filter_crds_values(crds, &filter);
-        let key = caller.label().pubkey();
-        let old = crds.insert(caller, now);
-        if let Some(val) = old.ok().and_then(|opt| opt) {
-            self.purged_values
-                .push_back((val.value_hash, val.local_timestamp));
-        }
-        crds.update_record_timestamp(&key, now);
+    ) -> Vec<Vec<CrdsValue>> {
+        let rv = self.filter_crds_values(crds, &requests);
+        requests.into_iter().for_each(|(caller, _)| {
+            let key = caller.label().pubkey();
+            let old = crds.insert(caller, now);
+            if let Some(val) = old.ok().and_then(|opt| opt) {
+                self.purged_values
+                    .push_back((val.value_hash, val.local_timestamp));
+            }
+            crds.update_record_timestamp(&key, now);
+        });
         rv
     }
     /// process a pull response
@@ -251,13 +252,18 @@
         filters
     }
     /// filter values that fail the bloom filter up to max_bytes
-    fn filter_crds_values(&self, crds: &Crds, filter: &CrdsFilter) -> Vec<CrdsValue> {
-        let mut ret = vec![];
+    fn filter_crds_values(
+        &self,
+        crds: &Crds,
+        filters: &[(CrdsValue, CrdsFilter)],
+    ) -> Vec<Vec<CrdsValue>> {
+        let mut ret = vec![vec![]; filters.len()];
         for v in crds.table.values() {
-            if filter.contains(&v.value_hash) {
-                continue;
-            }
-            ret.push(v.value.clone());
+            filters.iter().enumerate().for_each(|(i, (_, filter))| {
+                if !filter.contains(&v.value_hash) {
+                    ret[i].push(v.value.clone());
+                }
+            });
         }
         ret
     }
@@ -395,10 +401,9 @@ mod test {
         let mut dest_crds = Crds::default();
         let mut dest = CrdsGossipPull::default();
         let (_, filters, caller) = req.unwrap();
-        for filter in filters.into_iter() {
-            let rsp = dest.process_pull_request(&mut dest_crds, caller.clone(), filter, 1);
-            assert!(rsp.is_empty());
-        }
+        let filters = filters.into_iter().map(|f| (caller.clone(), f)).collect();
+        let rsp = dest.process_pull_requests(&mut dest_crds, filters, 1);
+        assert!(rsp.iter().all(|rsp| rsp.is_empty()));
         assert!(dest_crds.lookup(&caller.label()).is_some());
         assert_eq!(
             dest_crds
@@ -455,21 +460,20 @@ mod test {
                 PACKET_DATA_SIZE,
             );
             let (_, filters, caller) = req.unwrap();
-            let mut rsp = vec![];
-            for filter in filters {
-                rsp = dest.process_pull_request(&mut dest_crds, caller.clone(), filter, 0);
-                // if there is a false positive this is empty
-                // prob should be around 0.1 per iteration
-                if rsp.is_empty() {
-                    continue;
-                }
+            let filters = filters.into_iter().map(|f| (caller.clone(), f)).collect();
+            let mut rsp = dest.process_pull_requests(&mut dest_crds, filters, 0);
+            // if there is a false positive this is empty
+            // prob should be around 0.1 per iteration
+            if rsp.is_empty() {
+                continue;
             }
             if rsp.is_empty() {
                 continue;
             }
             assert_eq!(rsp.len(), 1);
-            let failed = node.process_pull_response(&mut node_crds, &node_pubkey, rsp, 1);
+            let failed =
+                node.process_pull_response(&mut node_crds, &node_pubkey, rsp.pop().unwrap(), 1);
             assert_eq!(failed, 0);
             assert_eq!(
                 node_crds
diff --git a/core/src/replicator.rs b/core/src/replicator.rs
index 3f76841d11..249d935d42 100644
--- a/core/src/replicator.rs
+++ b/core/src/replicator.rs
@@ -846,7 +846,10 @@ impl Replicator {
             }
         }
         let res = r_reader.recv_timeout(Duration::new(1, 0));
-        if let Ok(blobs) = res {
+        if let Ok(mut blobs) = res {
+            while let Ok(mut more) = r_reader.try_recv() {
+                blobs.append(&mut more);
+            }
             window_service::process_blobs(&blobs, blocktree)?;
         }
         // check if all the slots in the segment are complete
diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs
index 55da346e22..bb1a158c9c 100644
--- a/core/tests/crds_gossip.rs
+++ b/core/tests/crds_gossip.rs
@@ -399,17 +399,23 @@ fn network_run_pull(
             .map(|f| f.filter.bits.len() as usize / 8)
             .sum::<usize>();
         bytes += serialized_size(&caller_info).unwrap() as usize;
+        let filters = filters
+            .into_iter()
+            .map(|f| (caller_info.clone(), f))
+            .collect();
         let rsp = network
             .get(&to)
             .map(|node| {
                 let mut rsp = vec![];
-                for filter in filters {
-                    rsp.append(&mut node.lock().unwrap().process_pull_request(
-                        caller_info.clone(),
-                        filter,
-                        now,
-                    ));
-                }
+                rsp.append(
+                    &mut node
+                        .lock()
+                        .unwrap()
+                        .process_pull_requests(filters, now)
+                        .into_iter()
+                        .flatten()
+                        .collect(),
+                );
                 rsp
             })
            .unwrap();
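
The heart of this patch is the batched `filter_crds_values` above: the CRDS table is scanned once, and every stored value is tested against each coalesced request's bloom filter, so N pull requests cost one table scan instead of N. Below is a minimal standalone sketch of that access pattern; a toy `Filter` over a `HashSet` and `String` values stand in for `CrdsFilter` and `CrdsValue`, and all names are illustrative, not part of the patch:

    use std::collections::HashSet;

    // Toy stand-in for CrdsFilter: the exact set of value hashes the
    // requester already holds. A real bloom filter answers the same
    // question probabilistically, with occasional false positives.
    struct Filter(HashSet<u64>);

    impl Filter {
        fn contains(&self, hash: &u64) -> bool {
            self.0.contains(hash)
        }
    }

    // One pass over the table answers every request: responses[i] collects
    // the values request i is missing, mirroring the batched
    // filter_crds_values in this patch.
    fn filter_values(table: &[(u64, String)], filters: &[Filter]) -> Vec<Vec<String>> {
        let mut responses = vec![vec![]; filters.len()];
        for (hash, value) in table {
            for (i, filter) in filters.iter().enumerate() {
                if !filter.contains(hash) {
                    responses[i].push(value.clone());
                }
            }
        }
        responses
    }

    fn main() {
        let table = vec![(1, "a".to_string()), (2, "b".to_string())];
        let requests = vec![
            Filter([1].iter().cloned().collect()), // already has hash 1
            Filter(HashSet::new()),                // has nothing yet
        ];
        let responses = filter_values(&table, &requests);
        assert_eq!(responses[0], vec!["b".to_string()]);
        assert_eq!(responses[1].len(), 2);
    }

The trade-off is that responses for the whole batch are materialized at once; in exchange, the per-value filter checks are the only work that grows with the number of coalesced requests, rather than the table scan itself.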