Coalesce gossip pull requests and serve them in batches (#5501)
* Coalesce gossip pull requests and serve them in batches
* batch all filters and immediately respond to messages in gossip
* Fix tests
* make download_from_replicator perform a greedy recv
This commit is contained in:
parent d5fb493aa4
commit 4ee212ae4c
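For orientation before the diff: the change turns per-message pull handling into a two-phase loop, answering push/prune/repair traffic immediately while setting pull requests aside, then serving all coalesced pulls in one batch. A minimal sketch of that control flow, using illustrative stand-in types rather than the crate's real `Protocol`, `CrdsValue`, and `CrdsFilter`:

// Stand-ins for the crate's gossip messages and PullData; illustrative only.
enum Message {
    PullRequest { from_addr: String, filter: u64 },
    Push(String),
}

struct PullData {
    from_addr: String,
    filter: u64,
}

// Phase 1: answer non-pull traffic immediately, set pulls aside.
// Phase 2: serve every coalesced pull request in one batch.
fn handle_messages(messages: Vec<Message>) -> Vec<String> {
    let mut responses = vec![];
    let mut pulls: Vec<PullData> = vec![];
    for msg in messages {
        match msg {
            Message::PullRequest { from_addr, filter } => {
                pulls.push(PullData { from_addr, filter })
            }
            Message::Push(data) => responses.push(format!("ack for {}", data)),
        }
    }
    // One batched pass serves all pulls (see process_pull_requests below).
    for p in &pulls {
        responses.push(format!("pull response to {} for filter {:#x}", p.from_addr, p.filter));
    }
    responses
}

fn main() {
    let msgs = vec![
        Message::PullRequest { from_addr: "10.0.0.1:8000".into(), filter: 0xff },
        Message::Push("vote".into()),
        Message::PullRequest { from_addr: "10.0.0.2:8000".into(), filter: 0x0f },
    ];
    handle_messages(msgs).iter().for_each(|r| println!("{}", r));
}

Coalescing matters because each pull response requires a scan of the CRDS table; batching lets one scan answer every outstanding filter, which is what the new `process_pull_requests` and `filter_crds_values` in the diff below do.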
@@ -19,7 +19,7 @@ use crate::crds_gossip::CrdsGossip;
 use crate::crds_gossip_error::CrdsGossipError;
 use crate::crds_gossip_pull::{CrdsFilter, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS};
 use crate::crds_value::{CrdsValue, CrdsValueLabel, EpochSlots, Vote};
-use crate::packet::{to_shared_blob, Blob, SharedBlob, BLOB_SIZE};
+use crate::packet::{to_shared_blob, SharedBlob, BLOB_SIZE};
 use crate::repair_service::RepairType;
 use crate::result::Result;
 use crate::staking_utils;
@@ -151,6 +151,12 @@ impl Signable for PruneData {
     }
 }
 
+struct PullData {
+    pub from_addr: SocketAddr,
+    pub caller: CrdsValue,
+    pub filter: CrdsFilter,
+}
+
 // TODO These messages should go through the gpu pipeline for spam filtering
 #[derive(Serialize, Deserialize, Debug)]
 #[allow(clippy::large_enum_variant)]
@@ -1098,60 +1104,138 @@ impl ClusterInfo {
         res
     }
 
-    //TODO we should first coalesce all the requests
-    fn handle_blob(
-        obj: &Arc<RwLock<Self>>,
+    fn handle_blobs(
+        me: &Arc<RwLock<Self>>,
         blocktree: Option<&Arc<Blocktree>>,
         stakes: &HashMap<Pubkey, u64>,
-        blob: &Blob,
-    ) -> Vec<SharedBlob> {
-        deserialize(&blob.data[..blob.meta.size])
-            .into_iter()
-            .flat_map(|request| {
-                ClusterInfo::handle_protocol(obj, &blob.meta.addr(), blocktree, stakes, request)
-            })
-            .collect()
+        blobs: &[SharedBlob],
+        response_sender: &BlobSender,
+    ) {
+        // iter over the blobs, collect pulls separately and process everything else
+        let mut gossip_pull_data: Vec<PullData> = vec![];
+        blobs.iter().for_each(|blob| {
+            let blob = blob.read().unwrap();
+            let from_addr = blob.meta.addr();
+            deserialize(&blob.data[..blob.meta.size])
+                .into_iter()
+                .for_each(|request| match request {
+                    Protocol::PullRequest(filter, caller) => {
+                        if !caller.verify() {
+                            inc_new_counter_error!(
+                                "cluster_info-gossip_pull_request_verify_fail",
+                                1
+                            );
+                        } else if caller.contact_info().is_some() {
+                            if caller.contact_info().unwrap().pubkey()
+                                == me.read().unwrap().gossip.id
+                            {
+                                warn!("PullRequest ignored, I'm talking to myself");
+                                inc_new_counter_debug!("cluster_info-window-request-loopback", 1);
+                            } else {
+                                gossip_pull_data.push(PullData {
+                                    from_addr,
+                                    caller,
+                                    filter,
+                                });
+                            }
+                        }
+                    }
+                    Protocol::PullResponse(from, mut data) => {
+                        data.retain(|v| {
+                            let ret = v.verify();
+                            if !ret {
+                                inc_new_counter_error!(
+                                    "cluster_info-gossip_pull_response_verify_fail",
+                                    1
+                                );
+                            }
+                            ret
+                        });
+                        Self::handle_pull_response(me, &from, data);
+                    }
+                    Protocol::PushMessage(from, mut data) => {
+                        data.retain(|v| {
+                            let ret = v.verify();
+                            if !ret {
+                                inc_new_counter_error!(
+                                    "cluster_info-gossip_push_msg_verify_fail",
+                                    1
+                                );
+                            }
+                            ret
+                        });
+                        let _ignore_disconnect = response_sender
+                            .send(Self::handle_push_message(me, &from, data, stakes));
+                    }
+                    Protocol::PruneMessage(from, data) => {
+                        if data.verify() {
+                            inc_new_counter_debug!("cluster_info-prune_message", 1);
+                            inc_new_counter_debug!(
+                                "cluster_info-prune_message-size",
+                                data.prunes.len()
+                            );
+                            match me.write().unwrap().gossip.process_prune_msg(
+                                &from,
+                                &data.destination,
+                                &data.prunes,
+                                data.wallclock,
+                                timestamp(),
+                            ) {
+                                Err(CrdsGossipError::PruneMessageTimeout) => {
+                                    inc_new_counter_debug!("cluster_info-prune_message_timeout", 1)
+                                }
+                                Err(CrdsGossipError::BadPruneDestination) => {
+                                    inc_new_counter_debug!("cluster_info-bad_prune_destination", 1)
+                                }
+                                _ => (),
+                            }
+                        } else {
+                            inc_new_counter_debug!("cluster_info-gossip_prune_msg_verify_fail", 1);
+                        }
+                    }
+                    _ => {
+                        let _ignore_disconnect = response_sender
+                            .send(Self::handle_repair(me, &from_addr, blocktree, request));
+                    }
+                })
+        });
+        // process the collected pulls together
+        let _ignore_disconnect =
+            response_sender.send(Self::handle_pull_requests(me, gossip_pull_data));
     }
 
-    fn handle_pull_request(
-        me: &Arc<RwLock<Self>>,
-        filter: CrdsFilter,
-        caller: CrdsValue,
-        from_addr: &SocketAddr,
-    ) -> Vec<SharedBlob> {
-        let self_id = me.read().unwrap().gossip.id;
-        inc_new_counter_debug!("cluster_info-pull_request", 1);
-        if caller.contact_info().is_none() {
-            return vec![];
-        }
-        let from = caller.contact_info().unwrap();
-        if from.id == self_id {
-            warn!(
-                "PullRequest ignored, I'm talking to myself: me={} remoteme={}",
-                self_id, from.id
-            );
-            inc_new_counter_debug!("cluster_info-window-request-loopback", 1);
-            return vec![];
+    fn handle_pull_requests(me: &Arc<RwLock<Self>>, requests: Vec<PullData>) -> Vec<SharedBlob> {
+        // split the requests into addrs and filters
+        let mut caller_and_filters = vec![];
+        let mut addrs = vec![];
+        for pull_data in requests {
+            caller_and_filters.push((pull_data.caller, pull_data.filter));
+            addrs.push(pull_data.from_addr);
         }
         let now = timestamp();
-        let data = me
+        let self_id = me.read().unwrap().id();
+        let pull_responses = me
             .write()
             .unwrap()
             .gossip
-            .process_pull_request(caller, filter, now);
-        let len = data.len();
-        trace!("get updates since response {}", len);
-        let responses: Vec<_> = Self::split_gossip_messages(data)
+            .process_pull_requests(caller_and_filters, now);
+        pull_responses
             .into_iter()
-            .map(move |payload| Protocol::PullResponse(self_id, payload))
-            .collect();
-        // The remote node may not know its public IP:PORT. Instead of responding to the caller's
-        // gossip addr, respond to the origin addr.
-        inc_new_counter_debug!("cluster_info-pull_request-rsp", len);
-        responses
-            .into_iter()
-            .map(|rsp| to_shared_blob(rsp, *from_addr).ok().into_iter())
-            .flatten()
+            .zip(addrs.into_iter())
+            .flat_map(|(response, from_addr)| {
+                let len = response.len();
+                trace!("get updates since response {}", len);
+                inc_new_counter_debug!("cluster_info-pull_request-rsp", len);
+                Self::split_gossip_messages(response)
+                    .into_iter()
+                    .filter_map(move |payload| {
+                        let protocol = Protocol::PullResponse(self_id, payload);
+                        // The remote node may not know its public IP:PORT. Instead of responding to the caller's
+                        // gossip addr, respond to the origin addr. The last origin addr is picked from the list of
+                        // addrs.
+                        to_shared_blob(protocol, from_addr).ok()
+                    })
+            })
            .collect()
     }
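The new `handle_pull_requests` keeps the per-filter response vectors index-aligned with the origin addresses, zips them back together, and splits each response into size-bounded chunks before sending. A hedged sketch of that zip-then-chunk shape; `MAX_CHUNK` and `split_messages` are stand-ins for the crate's `BLOB_SIZE` budget and `split_gossip_messages`:

// Sketch of the zip-then-chunk response path; names are illustrative,
// not the crate's actual API.
use std::net::SocketAddr;

const MAX_CHUNK: usize = 3; // stand-in for a byte budget like BLOB_SIZE

// Stand-in for split_gossip_messages: break one response into bounded chunks.
fn split_messages(payload: Vec<u64>) -> Vec<Vec<u64>> {
    payload.chunks(MAX_CHUNK).map(|c| c.to_vec()).collect()
}

fn to_outbound(responses: Vec<Vec<u64>>, addrs: Vec<SocketAddr>) -> Vec<(Vec<u64>, SocketAddr)> {
    responses
        .into_iter()
        .zip(addrs.into_iter())
        .flat_map(|(response, from_addr)| {
            // every chunk of this response goes back to the same origin addr
            split_messages(response)
                .into_iter()
                .map(move |chunk| (chunk, from_addr))
        })
        .collect()
}

fn main() {
    let responses = vec![vec![1, 2, 3, 4], vec![5]];
    let addrs: Vec<SocketAddr> = vec![
        "10.0.0.1:8000".parse().unwrap(),
        "10.0.0.2:8000".parse().unwrap(),
    ];
    for (chunk, addr) in to_outbound(responses, addrs) {
        println!("{:?} -> {}", chunk, addr);
    }
}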
@@ -1312,73 +1396,6 @@ impl ClusterInfo {
         res
     }
 
-    fn handle_protocol(
-        me: &Arc<RwLock<Self>>,
-        from_addr: &SocketAddr,
-        blocktree: Option<&Arc<Blocktree>>,
-        stakes: &HashMap<Pubkey, u64>,
-        request: Protocol,
-    ) -> Vec<SharedBlob> {
-        match request {
-            // TODO verify messages faster
-            Protocol::PullRequest(filter, caller) => {
-                if !caller.verify() {
-                    inc_new_counter_error!("cluster_info-gossip_pull_request_verify_fail", 1);
-                    vec![]
-                } else {
-                    Self::handle_pull_request(me, filter, caller, from_addr)
-                }
-            }
-            Protocol::PullResponse(from, mut data) => {
-                data.retain(|v| {
-                    let ret = v.verify();
-                    if !ret {
-                        inc_new_counter_error!("cluster_info-gossip_pull_response_verify_fail", 1);
-                    }
-                    ret
-                });
-                Self::handle_pull_response(me, &from, data);
-                vec![]
-            }
-            Protocol::PushMessage(from, mut data) => {
-                data.retain(|v| {
-                    let ret = v.verify();
-                    if !ret {
-                        inc_new_counter_error!("cluster_info-gossip_push_msg_verify_fail", 1);
-                    }
-                    ret
-                });
-                Self::handle_push_message(me, &from, data, stakes)
-            }
-            Protocol::PruneMessage(from, data) => {
-                if data.verify() {
-                    inc_new_counter_debug!("cluster_info-prune_message", 1);
-                    inc_new_counter_debug!("cluster_info-prune_message-size", data.prunes.len());
-                    match me.write().unwrap().gossip.process_prune_msg(
-                        &from,
-                        &data.destination,
-                        &data.prunes,
-                        data.wallclock,
-                        timestamp(),
-                    ) {
-                        Err(CrdsGossipError::PruneMessageTimeout) => {
-                            inc_new_counter_debug!("cluster_info-prune_message_timeout", 1)
-                        }
-                        Err(CrdsGossipError::BadPruneDestination) => {
-                            inc_new_counter_debug!("cluster_info-bad_prune_destination", 1)
-                        }
-                        Err(_) => (),
-                        Ok(_) => (),
-                    }
-                } else {
-                    inc_new_counter_debug!("cluster_info-gossip_prune_msg_verify_fail", 1);
-                }
-                vec![]
-            }
-            _ => Self::handle_repair(me, from_addr, blocktree, request),
-        }
-    }
-
     /// Process messages from the network
     fn run_listen(
         obj: &Arc<RwLock<Self>>,
@@ -1393,7 +1410,6 @@ impl ClusterInfo {
         while let Ok(mut more) = requests_receiver.try_recv() {
             reqs.append(&mut more);
         }
-        let mut resps = Vec::new();
 
         let stakes: HashMap<_, _> = match bank_forks {
             Some(ref bank_forks) => {
@@ -1402,11 +1418,7 @@ impl ClusterInfo {
             None => HashMap::new(),
         };
 
-        for req in reqs {
-            let mut resp = Self::handle_blob(obj, blocktree, &stakes, &req.read().unwrap());
-            resps.append(&mut resp);
-        }
-        response_sender.send(resps)?;
+        Self::handle_blobs(obj, blocktree, &stakes, &reqs, response_sender);
         Ok(())
     }
     pub fn listen(
@@ -1712,7 +1724,7 @@ mod tests {
     use crate::blocktree::Blocktree;
     use crate::crds_value::CrdsValueLabel;
     use crate::erasure::ErasureConfig;
-    use crate::packet::BLOB_HEADER_SIZE;
+    use crate::packet::{Blob, BLOB_HEADER_SIZE};
     use crate::repair_service::RepairType;
     use crate::result::Error;
     use crate::test_tx::test_tx;
@@ -145,14 +145,13 @@ impl CrdsGossip {
         self.pull.mark_pull_request_creation_time(from, now)
     }
     /// process a pull request and create a response
-    pub fn process_pull_request(
+    pub fn process_pull_requests(
         &mut self,
-        caller: CrdsValue,
-        filter: CrdsFilter,
+        filters: Vec<(CrdsValue, CrdsFilter)>,
         now: u64,
-    ) -> Vec<CrdsValue> {
+    ) -> Vec<Vec<CrdsValue>> {
         self.pull
-            .process_pull_request(&mut self.crds, caller, filter, now)
+            .process_pull_requests(&mut self.crds, filters, now)
     }
     /// process a pull response
     pub fn process_pull_response(
@@ -191,21 +191,22 @@ impl CrdsGossipPull {
     }
 
     /// process a pull request and create a response
-    pub fn process_pull_request(
+    pub fn process_pull_requests(
         &mut self,
         crds: &mut Crds,
-        caller: CrdsValue,
-        filter: CrdsFilter,
+        requests: Vec<(CrdsValue, CrdsFilter)>,
         now: u64,
-    ) -> Vec<CrdsValue> {
-        let rv = self.filter_crds_values(crds, &filter);
-        let key = caller.label().pubkey();
-        let old = crds.insert(caller, now);
-        if let Some(val) = old.ok().and_then(|opt| opt) {
-            self.purged_values
-                .push_back((val.value_hash, val.local_timestamp));
-        }
-        crds.update_record_timestamp(&key, now);
+    ) -> Vec<Vec<CrdsValue>> {
+        let rv = self.filter_crds_values(crds, &requests);
+        requests.into_iter().for_each(|(caller, _)| {
+            let key = caller.label().pubkey();
+            let old = crds.insert(caller, now);
+            if let Some(val) = old.ok().and_then(|opt| opt) {
+                self.purged_values
+                    .push_back((val.value_hash, val.local_timestamp));
+            }
+            crds.update_record_timestamp(&key, now);
+        });
         rv
     }
     /// process a pull response
@@ -251,13 +252,18 @@ impl CrdsGossipPull {
         filters
     }
     /// filter values that fail the bloom filter up to max_bytes
-    fn filter_crds_values(&self, crds: &Crds, filter: &CrdsFilter) -> Vec<CrdsValue> {
-        let mut ret = vec![];
+    fn filter_crds_values(
+        &self,
+        crds: &Crds,
+        filters: &[(CrdsValue, CrdsFilter)],
+    ) -> Vec<Vec<CrdsValue>> {
+        let mut ret = vec![vec![]; filters.len()];
         for v in crds.table.values() {
-            if filter.contains(&v.value_hash) {
-                continue;
-            }
-            ret.push(v.value.clone());
+            filters.iter().enumerate().for_each(|(i, (_, filter))| {
+                if !filter.contains(&v.value_hash) {
+                    ret[i].push(v.value.clone());
+                }
+            });
         }
         ret
     }
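The reworked `filter_crds_values` above walks the CRDS table once and tests every request's filter against each value, rather than scanning the table once per request. A minimal sketch of that one-pass fan-out, with a `HashSet` of hashes standing in for the bloom-filter-based `CrdsFilter`:

use std::collections::HashSet;

// HashSet<u64> stands in for CrdsFilter's bloom filter of value hashes.
fn filter_values(
    table: &[(u64, &str)], // (value_hash, value)
    filters: &[HashSet<u64>],
) -> Vec<Vec<String>> {
    let mut ret = vec![vec![]; filters.len()];
    // One pass over the table serves every filter.
    for (hash, value) in table {
        for (i, filter) in filters.iter().enumerate() {
            if !filter.contains(hash) {
                ret[i].push(value.to_string());
            }
        }
    }
    ret
}

fn main() {
    let table = [(1u64, "a"), (2, "b"), (3, "c")];
    let filters = vec![
        [1u64, 2].into_iter().collect::<HashSet<_>>(), // already has a, b
        HashSet::new(),                                // has nothing
    ];
    assert_eq!(filter_values(&table, &filters), vec![vec!["c"], vec!["a", "b", "c"]]);
}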
@@ -395,10 +401,9 @@ mod test {
         let mut dest_crds = Crds::default();
         let mut dest = CrdsGossipPull::default();
         let (_, filters, caller) = req.unwrap();
-        for filter in filters.into_iter() {
-            let rsp = dest.process_pull_request(&mut dest_crds, caller.clone(), filter, 1);
-            assert!(rsp.is_empty());
-        }
+        let filters = filters.into_iter().map(|f| (caller.clone(), f)).collect();
+        let rsp = dest.process_pull_requests(&mut dest_crds, filters, 1);
+        assert!(rsp.iter().all(|rsp| rsp.is_empty()));
         assert!(dest_crds.lookup(&caller.label()).is_some());
         assert_eq!(
             dest_crds
@@ -455,21 +460,20 @@ mod test {
             PACKET_DATA_SIZE,
         );
         let (_, filters, caller) = req.unwrap();
-        let mut rsp = vec![];
-        for filter in filters {
-            rsp = dest.process_pull_request(&mut dest_crds, caller.clone(), filter, 0);
-            // if there is a false positive this is empty
-            // prob should be around 0.1 per iteration
-            if rsp.is_empty() {
-                continue;
-            }
-        }
+        let filters = filters.into_iter().map(|f| (caller.clone(), f)).collect();
+        let mut rsp = dest.process_pull_requests(&mut dest_crds, filters, 0);
+        // if there is a false positive this is empty
+        // prob should be around 0.1 per iteration
+        if rsp.is_empty() {
+            continue;
+        }
 
         if rsp.is_empty() {
             continue;
         }
         assert_eq!(rsp.len(), 1);
-        let failed = node.process_pull_response(&mut node_crds, &node_pubkey, rsp, 1);
+        let failed =
+            node.process_pull_response(&mut node_crds, &node_pubkey, rsp.pop().unwrap(), 1);
         assert_eq!(failed, 0);
         assert_eq!(
             node_crds
@@ -846,7 +846,10 @@ impl Replicator {
             }
         }
         let res = r_reader.recv_timeout(Duration::new(1, 0));
-        if let Ok(blobs) = res {
+        if let Ok(mut blobs) = res {
+            while let Ok(mut more) = r_reader.try_recv() {
+                blobs.append(&mut more);
+            }
             window_service::process_blobs(&blobs, blocktree)?;
         }
         // check if all the slots in the segment are complete
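The replicator hunk above is the "greedy recv" from the commit message: block for the first batch, then drain whatever is already queued before doing one processing pass. The same pattern appears in `run_listen`'s `try_recv` loop. A self-contained sketch with `std::sync::mpsc` (the crate's actual channel type may differ):

use std::sync::mpsc::channel;
use std::time::Duration;

fn main() {
    let (sender, receiver) = channel::<Vec<u8>>();
    sender.send(vec![1]).unwrap();
    sender.send(vec![2, 3]).unwrap();
    sender.send(vec![4]).unwrap();

    // Block (with a timeout) for the first batch of blobs...
    if let Ok(mut blobs) = receiver.recv_timeout(Duration::new(1, 0)) {
        // ...then greedily append whatever else is already queued, so one
        // call to the (expensive) processing step handles the whole backlog.
        while let Ok(mut more) = receiver.try_recv() {
            blobs.append(&mut more);
        }
        assert_eq!(blobs, vec![1, 2, 3, 4]);
        println!("processing {} blobs in one pass", blobs.len());
    }
}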
@@ -399,17 +399,23 @@ fn network_run_pull(
                    .map(|f| f.filter.bits.len() as usize / 8)
                    .sum::<usize>();
                 bytes += serialized_size(&caller_info).unwrap() as usize;
+                let filters = filters
+                    .into_iter()
+                    .map(|f| (caller_info.clone(), f))
+                    .collect();
                 let rsp = network
                     .get(&to)
                     .map(|node| {
                         let mut rsp = vec![];
-                        for filter in filters {
-                            rsp.append(&mut node.lock().unwrap().process_pull_request(
-                                caller_info.clone(),
-                                filter,
-                                now,
-                            ));
-                        }
+                        rsp.append(
+                            &mut node
+                                .lock()
+                                .unwrap()
+                                .process_pull_requests(filters, now)
+                                .into_iter()
+                                .flatten()
+                                .collect(),
+                        );
                         rsp
                     })
                     .unwrap();