//! The `crdt` module defines a data structure that is shared by all the nodes in the network over
//! a gossip control plane. The goal is to share small bits of off-chain information and to detect
//! and repair partitions.
//!
//! This CRDT only supports a very limited set of types: a map of PublicKey -> versioned struct.
//! The last version is always picked during an update.
//!
//! The network is arranged in layers:
//!
//! * layer 0 - Leader.
//! * layer 1 - As many nodes as we can fit.
//! * layer 2 - Everyone else; if layer 1 can hold `2^10` nodes, layer 2 should be able to hold `2^20`.
//!
//! Bank needs to provide an interface for us to query the stake weight.

use bincode::{deserialize, serialize};
use byteorder::{LittleEndian, ReadBytesExt};
use choose_gossip_peer_strategy::{ChooseGossipPeerStrategy, ChooseWeightedPeerStrategy};
use counter::Counter;
use hash::Hash;
use ledger::LedgerWindow;
use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE};
use pnet_datalink as datalink;
use rand::{thread_rng, RngCore};
use rayon::prelude::*;
use result::{Error, Result};
use signature::{KeyPair, KeyPairUtil, PublicKey};
use std;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::io::Cursor;
use std::net::{IpAddr, SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{sleep, Builder, JoinHandle};
use std::time::Duration;
use streamer::{BlobReceiver, BlobSender, SharedWindow, WindowIndex};
use timing::timestamp;
use transaction::Vote;

/// milliseconds we sleep for between gossip requests
const GOSSIP_SLEEP_MILLIS: u64 = 100;
/// milliseconds of silence after which a node is considered dead and purged
const GOSSIP_PURGE_MILLIS: u64 = 15000;

/// minimum membership table size before we start purging dead nodes
const MIN_TABLE_SIZE: usize = 2;

#[derive(Debug, PartialEq, Eq)]
pub enum CrdtError {
    NoPeers,
    NoLeader,
    BadContactInfo,
    BadNodeInfo,
    BadGossipAddress,
}

pub fn parse_port_or_addr(optstr: Option<String>) -> SocketAddr {
    let daddr: SocketAddr = "0.0.0.0:8000".parse().expect("default socket address");
    if let Some(addrstr) = optstr {
        if let Ok(port) = addrstr.parse() {
            let mut addr = daddr;
            addr.set_port(port);
            addr
        } else if let Ok(addr) = addrstr.parse() {
            addr
        } else {
            daddr
        }
    } else {
        daddr
    }
}

pub fn get_ip_addr() -> Option<IpAddr> {
    for iface in datalink::interfaces() {
        for p in iface.ips {
            if !p.ip().is_loopback() && !p.ip().is_multicast() {
                match p.ip() {
                    IpAddr::V4(addr) => {
                        if !addr.is_link_local() {
                            return Some(p.ip());
                        }
                    }
                    IpAddr::V6(_addr) => {
                        // Select an ipv6 address if the config is selected
                        #[cfg(feature = "ipv6")]
                        {
                            return Some(p.ip());
                        }
                    }
                }
            }
        }
    }
    None
}
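// Illustrative sketch (not additional protocol machinery): the merge rule the
// module docs describe is last-version-wins per key. Given two `NodeInfo`
// entries for the same `PublicKey`, `Crdt::insert` keeps the one with the
// larger `version`:
//
//     let mut crdt = Crdt::new(node.clone()).unwrap();
//     node.version = 2;
//     crdt.insert(&node); // accepted, table now holds version 2
//     node.version = 1;
//     crdt.insert(&node); // ignored, since 1 <= 2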
/// Structure to be replicated by the network
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct ContactInfo {
    /// gossip address
    pub ncp: SocketAddr,
    /// address to connect to for replication
    pub tvu: SocketAddr,
    /// address to connect to when this node is leader
    pub rpu: SocketAddr,
    /// transactions address
    pub tpu: SocketAddr,
    /// repair address, we use this to jump ahead of the packets
    /// destined to the replicate_addr
    pub tvu_window: SocketAddr,
    /// if this structure changes update this value as well
    /// Always update `NodeInfo` version too
    /// This separate version for addresses allows us to use the `Vote`
    /// as means of updating the `NodeInfo` table without touching the
    /// addresses if they haven't changed.
    pub version: u64,
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct LedgerState {
    /// last verified hash that was submitted to the leader
    pub last_id: Hash,
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct NodeInfo {
    pub id: PublicKey,
    /// If any of the bits change, increment this value
    pub version: u64,
    /// network addresses
    pub contact_info: ContactInfo,
    /// current leader identity
    pub leader_id: PublicKey,
    /// information about the state of the ledger
    pub ledger_state: LedgerState,
}

fn make_debug_id(key: &PublicKey) -> u64 {
    let buf: &[u8] = &key.as_ref();
    let mut rdr = Cursor::new(&buf[..8]);
    rdr.read_u64::<LittleEndian>()
        .expect("rdr.read_u64 in fn debug_id")
}

impl NodeInfo {
    pub fn new(
        id: PublicKey,
        ncp: SocketAddr,
        tvu: SocketAddr,
        rpu: SocketAddr,
        tpu: SocketAddr,
        tvu_window: SocketAddr,
    ) -> Self {
        NodeInfo {
            id,
            version: 0,
            contact_info: ContactInfo {
                ncp,
                tvu,
                rpu,
                tpu,
                tvu_window,
                version: 0,
            },
            leader_id: PublicKey::default(),
            ledger_state: LedgerState {
                last_id: Hash::default(),
            },
        }
    }
    #[cfg(test)]
    /// NodeInfo with unspecified addresses for adversarial testing.
    pub fn new_unspecified() -> Self {
        let addr: SocketAddr = "0.0.0.0:0".parse().unwrap();
        assert!(addr.ip().is_unspecified());
        Self::new(
            KeyPair::new().pubkey(),
            addr.clone(),
            addr.clone(),
            addr.clone(),
            addr.clone(),
            addr.clone(),
        )
    }
    #[cfg(test)]
    /// NodeInfo with multicast addresses for adversarial testing.
    pub fn new_multicast() -> Self {
        let addr: SocketAddr = "224.0.1.255:1000".parse().unwrap();
        assert!(addr.ip().is_multicast());
        Self::new(
            KeyPair::new().pubkey(),
            addr.clone(),
            addr.clone(),
            addr.clone(),
            addr.clone(),
            addr.clone(),
        )
    }
    pub fn debug_id(&self) -> u64 {
        make_debug_id(&self.id)
    }
    fn next_port(addr: &SocketAddr, nxt: u16) -> SocketAddr {
        let mut nxt_addr = *addr;
        nxt_addr.set_port(addr.port() + nxt);
        nxt_addr
    }
    pub fn new_leader_with_pubkey(pubkey: PublicKey, bind_addr: &SocketAddr) -> Self {
        let transactions_addr = *bind_addr;
        let gossip_addr = Self::next_port(&bind_addr, 1);
        let replicate_addr = Self::next_port(&bind_addr, 2);
        let requests_addr = Self::next_port(&bind_addr, 3);
        let repair_addr = Self::next_port(&bind_addr, 4);
        NodeInfo::new(
            pubkey,
            gossip_addr,
            replicate_addr,
            requests_addr,
            transactions_addr,
            repair_addr,
        )
    }
    pub fn new_leader(bind_addr: &SocketAddr) -> Self {
        let keypair = KeyPair::new();
        Self::new_leader_with_pubkey(keypair.pubkey(), bind_addr)
    }
    pub fn new_entry_point(gossip_addr: SocketAddr) -> Self {
        let daddr: SocketAddr = "0.0.0.0:0".parse().unwrap();
        NodeInfo::new(
            PublicKey::default(),
            gossip_addr,
            daddr,
            daddr,
            daddr,
            daddr,
        )
    }
}
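// Gossip exchange at a glance (illustrative; see `Protocol`, `handle_protocol`
// and `apply_updates` below):
//
//   A -> B : RequestUpdates(remote[B], A's own NodeInfo)
//   B -> A : ReceiveUpdates(B's id, B's update_index,
//                           entries whose local index > remote[B],
//                           B's `remote` table)
//   A       inserts the entries, records remote[B] = B's update_index, files
//           B's `remote` table away as `external_liveness` rumors, and drops
//           any rumors it previously held about B itself.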
/// `Crdt` structure keeps a table of `NodeInfo` structs
/// # Properties
/// * `table` - map of public ids to versioned and signed `NodeInfo` structs
/// * `local` - map of public ids to the value of `self.update_index` when that entry in `self.table` was last updated
/// * `remote` - map of public ids to the last `update_index` they sent us
/// * `update_index` - my update index
/// # Remarks
/// This implements two services, `gossip` and `listen`.
/// * `gossip` - asynchronously ask nodes to send updates
/// * `listen` - listen for requests and responses
/// No attempt to keep track of timeouts or dropped requests is made, or should be.
pub struct Crdt {
    /// table of everyone in the network
    pub table: HashMap<PublicKey, NodeInfo>,
    /// Value of my update index when the entry in table was updated.
    /// Nodes will ask for updates since `update_index`, and this node
    /// should respond with all the identities that are greater than the
    /// request's `update_index` in this list
    local: HashMap<PublicKey, u64>,
    /// The value of the remote update index that I have last seen
    /// This node will ask external nodes for updates since the value in this list
    pub remote: HashMap<PublicKey, u64>,
    /// last time this public key sent us a message
    pub alive: HashMap<PublicKey, u64>,
    pub update_index: u64,
    pub me: PublicKey,
    /// last time we heard from anyone getting a message from this public key
    /// these are rumors and shouldn't be trusted directly
    external_liveness: HashMap<PublicKey, HashMap<PublicKey, u64>>,
}

// TODO These messages should be signed, and go through the gpu pipeline for spam filtering
#[derive(Serialize, Deserialize, Debug)]
enum Protocol {
    /// forward your own latest data structure when requesting an update
    /// this doesn't update the `remote` update index, but it allows the
    /// recipient of this request to add knowledge of this node to the network
    /// (last update index i saw from you, my replicated data)
    RequestUpdates(u64, NodeInfo),
    //TODO might need a since?
    /// (from id, the sender's last update index, NodeInfo entries, remote update indices the sender has seen)
    ReceiveUpdates(PublicKey, u64, Vec<NodeInfo>, Vec<(PublicKey, u64)>),
    /// ask for a missing index
    /// (my replicated data to keep alive, missing window index)
    RequestWindowIndex(NodeInfo, u64),
}

impl Crdt {
    pub fn new(me: NodeInfo) -> Result<Crdt> {
        if me.version != 0 {
            return Err(Error::CrdtError(CrdtError::BadNodeInfo));
        }
        if me.contact_info.ncp.ip().is_unspecified()
            || me.contact_info.ncp.port() == 0
            || me.contact_info.ncp.ip().is_multicast()
        {
            return Err(Error::CrdtError(CrdtError::BadGossipAddress));
        }
        for addr in &[
            me.contact_info.tvu,
            me.contact_info.rpu,
            me.contact_info.tpu,
            me.contact_info.tvu_window,
        ] {
            //dummy addresses are allowed, services will filter them
            if addr.ip().is_unspecified() && addr.port() == 0 {
                continue;
            }
            //if addr is not a dummy address, then it must be valid
            if addr.ip().is_unspecified() || addr.port() == 0 || addr.ip().is_multicast() {
                return Err(Error::CrdtError(CrdtError::BadContactInfo));
            }
        }
        let mut g = Crdt {
            table: HashMap::new(),
            local: HashMap::new(),
            remote: HashMap::new(),
            alive: HashMap::new(),
            external_liveness: HashMap::new(),
            me: me.id,
            update_index: 1,
        };
        g.local.insert(me.id, g.update_index);
        g.table.insert(me.id, me);
        Ok(g)
    }
    pub fn debug_id(&self) -> u64 {
        make_debug_id(&self.me)
    }
    pub fn my_data(&self) -> &NodeInfo {
        &self.table[&self.me]
    }
    pub fn leader_data(&self) -> Option<&NodeInfo> {
        let leader_id = self.table[&self.me].leader_id;
        // leader_id can be 0s from a network entry point
        if leader_id == PublicKey::default() {
            return None;
        }
        self.table.get(&leader_id)
    }
    pub fn set_leader(&mut self, key: PublicKey) -> () {
        let mut me = self.my_data().clone();
        warn!(
            "{:x}: LEADER_UPDATE TO {:x} from {:x}",
            me.debug_id(),
            make_debug_id(&key),
            make_debug_id(&me.leader_id),
        );
        me.leader_id = key;
        me.version += 1;
        self.insert(&me);
    }
    pub fn get_external_liveness_entry(&self, key: &PublicKey) -> Option<&HashMap<PublicKey, u64>> {
        self.external_liveness.get(key)
    }
    pub fn insert_vote(&mut self, pubkey: &PublicKey, v: &Vote, last_id: Hash) {
        if self.table.get(pubkey).is_none() {
            warn!(
                "{:x}: VOTE for unknown id: {:x}",
                self.debug_id(),
                make_debug_id(pubkey)
            );
            return;
        }
        if v.contact_info_version > self.table[pubkey].contact_info.version {
            warn!(
                "{:x}: VOTE for new address version from: {:x} ours: {} vote: {:?}",
                self.debug_id(),
                make_debug_id(pubkey),
                self.table[pubkey].contact_info.version,
                v,
            );
            return;
        }
        if *pubkey == self.my_data().leader_id {
info!( "{:x}: LEADER_VOTED! {:x}", self.debug_id(), make_debug_id(&pubkey) ); inc_new_counter!("crdt-insert_vote-leader_voted", 1); } if v.version <= self.table[pubkey].version { debug!( "{:x}: VOTE for old version: {:x}", self.debug_id(), make_debug_id(&pubkey) ); self.update_liveness(*pubkey); return; } else { let mut data = self.table[pubkey].clone(); data.version = v.version; data.ledger_state.last_id = last_id; debug!( "{:x}: INSERTING VOTE! for {:x}", self.debug_id(), data.debug_id() ); self.update_liveness(data.id); self.insert(&data); } } pub fn insert_votes(&mut self, votes: &[(PublicKey, Vote, Hash)]) { inc_new_counter!("crdt-vote-count", votes.len()); if !votes.is_empty() { info!("{:x}: INSERTING VOTES {}", self.debug_id(), votes.len()); } for v in votes { self.insert_vote(&v.0, &v.1, v.2); } } pub fn insert(&mut self, v: &NodeInfo) { // TODO check that last_verified types are always increasing //update the peer table if self.table.get(&v.id).is_none() || (v.version > self.table[&v.id].version) { //somehow we signed a message for our own identity with a higher version that // we have stored ourselves trace!( "{:x}: insert v.id: {:x} version: {}", self.debug_id(), v.debug_id(), v.version ); if self.table.get(&v.id).is_none() { inc_new_counter!("crdt-insert-new_entry", 1, 1); } self.update_index += 1; let _ = self.table.insert(v.id, v.clone()); let _ = self.local.insert(v.id, self.update_index); inc_new_counter!("crdt-update-count", 1); self.update_liveness(v.id); } else { trace!( "{:x}: INSERT FAILED data: {:x} new.version: {} me.version: {}", self.debug_id(), v.debug_id(), v.version, self.table[&v.id].version ); } } fn update_liveness(&mut self, id: PublicKey) { //update the liveness table let now = timestamp(); trace!( "{:x} updating liveness {:x} to {}", self.debug_id(), make_debug_id(&id), now ); *self.alive.entry(id).or_insert(now) = now; } /// purge old validators /// TODO: we need a robust membership protocol /// http://asc.di.fct.unl.pt/~jleitao/pdf/dsn07-leitao.pdf /// challenging part is that we are on a permissionless network pub fn purge(&mut self, now: u64) { if self.table.len() <= MIN_TABLE_SIZE { trace!("purge: skipped: table too small: {}", self.table.len()); return; } if self.leader_data().is_none() { trace!("purge: skipped: no leader_data"); return; } let leader_id = self.leader_data().unwrap().id; let limit = GOSSIP_PURGE_MILLIS; let dead_ids: Vec = self .alive .iter() .filter_map(|(&k, v)| { if k != self.me && (now - v) > limit { Some(k) } else { trace!( "{:x} purge skipped {:x} {} {}", self.debug_id(), make_debug_id(&k), now - v, limit ); None } }) .collect(); inc_new_counter!("crdt-purge-count", dead_ids.len()); for id in &dead_ids { self.alive.remove(id); self.table.remove(id); self.remote.remove(id); self.local.remove(id); self.external_liveness.remove(id); info!("{:x}: PURGE {:x}", self.debug_id(), make_debug_id(id)); for map in self.external_liveness.values_mut() { map.remove(id); } if *id == leader_id { info!( "{:x}: PURGE LEADER {:x}", self.debug_id(), make_debug_id(id), ); inc_new_counter!("crdt-purge-purged_leader", 1, 1); self.set_leader(PublicKey::default()); } } } /// compute broadcast table /// # Remarks pub fn compute_broadcast_table(&self) -> Vec { let live: Vec<_> = self.alive.iter().collect(); //thread_rng().shuffle(&mut live); let me = &self.table[&self.me]; let cloned_table: Vec = live .iter() .map(|x| &self.table[x.0]) .filter(|v| { if me.id == v.id { //filter myself false } else if !(Self::is_valid_address(v.contact_info.tvu)) { trace!( 
"{:x}:broadcast skip not listening {:x}", me.debug_id(), v.debug_id() ); false } else { trace!( "{:x}:broadcast node {:x} {}", me.debug_id(), v.debug_id(), v.contact_info.tvu ); true } }) .cloned() .collect(); cloned_table } /// broadcast messages from the leader to layer 1 nodes /// # Remarks /// We need to avoid having obj locked while doing any io, such as the `send_to` pub fn broadcast( me: &NodeInfo, broadcast_table: &[NodeInfo], window: &SharedWindow, s: &UdpSocket, transmit_index: &mut WindowIndex, received_index: u64, ) -> Result<()> { if broadcast_table.is_empty() { warn!("{:x}:not enough peers in crdt table", me.debug_id()); inc_new_counter!("crdt-broadcast-not_enough_peers_error", 1); Err(CrdtError::NoPeers)?; } trace!( "{:x} transmit_index: {:?} received_index: {} broadcast_len: {}", me.debug_id(), *transmit_index, received_index, broadcast_table.len() ); let old_transmit_index = transmit_index.data; // enumerate all the blobs in the window, those are the indices // transmit them to nodes, starting from a different node let mut orders = Vec::with_capacity((received_index - transmit_index.data) as usize); let window_l = window.write().unwrap(); let mut br_idx = transmit_index.data as usize % broadcast_table.len(); for idx in transmit_index.data..received_index { let w_idx = idx as usize % window_l.len(); trace!( "{:x} broadcast order data w_idx {} br_idx {}", me.debug_id(), w_idx, br_idx ); orders.push((window_l[w_idx].data.clone(), &broadcast_table[br_idx])); br_idx += 1; br_idx %= broadcast_table.len(); } for idx in transmit_index.coding..received_index { let w_idx = idx as usize % window_l.len(); // skip over empty slots if window_l[w_idx].coding.is_none() { continue; } trace!( "{:x} broadcast order coding w_idx: {} br_idx :{}", me.debug_id(), w_idx, br_idx, ); orders.push((window_l[w_idx].coding.clone(), &broadcast_table[br_idx])); br_idx += 1; br_idx %= broadcast_table.len(); } trace!("broadcast orders table {}", orders.len()); let errs: Vec<_> = orders .into_iter() .map(|(b, v)| { // only leader should be broadcasting assert!(me.leader_id != v.id); let bl = b.unwrap(); let blob = bl.read().expect("blob read lock in streamer::broadcast"); //TODO profile this, may need multiple sockets for par_iter trace!( "{:x}: BROADCAST idx: {} sz: {} to {:x},{} coding: {}", me.debug_id(), blob.get_index().unwrap(), blob.meta.size, v.debug_id(), v.contact_info.tvu, blob.is_coding() ); assert!(blob.meta.size <= BLOB_SIZE); let e = s.send_to(&blob.data[..blob.meta.size], &v.contact_info.tvu); trace!( "{:x}: done broadcast {} to {:x} {}", me.debug_id(), blob.meta.size, v.debug_id(), v.contact_info.tvu ); e }) .collect(); trace!("broadcast results {}", errs.len()); for e in errs { if let Err(e) = &e { eprintln!("broadcast result {:?}", e); } e?; if transmit_index.data < received_index { transmit_index.data += 1; } } inc_new_counter!( "crdt-broadcast-max_idx", (transmit_index.data - old_transmit_index) as usize ); transmit_index.coding = transmit_index.data; Ok(()) } /// retransmit messages from the leader to layer 1 nodes /// # Remarks /// We need to avoid having obj locked while doing any io, such as the `send_to` pub fn retransmit(obj: &Arc>, blob: &SharedBlob, s: &UdpSocket) -> Result<()> { let (me, table): (NodeInfo, Vec) = { // copy to avoid locking during IO let s = obj.read().expect("'obj' read lock in pub fn retransmit"); (s.table[&s.me].clone(), s.table.values().cloned().collect()) }; blob.write() .unwrap() .set_id(me.id) .expect("set_id in pub fn retransmit"); let rblob = 
blob.read().unwrap(); let orders: Vec<_> = table .iter() .filter(|v| { if me.id == v.id { false } else if me.leader_id == v.id { trace!("skip retransmit to leader {:?}", v.id); false } else if !(Self::is_valid_address(v.contact_info.tvu)) { trace!("skip nodes that are not listening {:?}", v.id); false } else { true } }) .collect(); trace!("retransmit orders {}", orders.len()); let errs: Vec<_> = orders .par_iter() .map(|v| { debug!( "{:x}: retransmit blob {} to {:x}", me.debug_id(), rblob.get_index().unwrap(), v.debug_id(), ); //TODO profile this, may need multiple sockets for par_iter assert!(rblob.meta.size <= BLOB_SIZE); s.send_to(&rblob.data[..rblob.meta.size], &v.contact_info.tvu) }) .collect(); for e in errs { if let Err(e) = &e { inc_new_counter!("crdt-retransmit-send_to_error", 1, 1); error!("retransmit result {:?}", e); } e?; } Ok(()) } // max number of nodes that we could be converged to pub fn convergence(&self) -> u64 { let max = self.remote.values().len() as u64 + 1; self.remote.values().fold(max, |a, b| std::cmp::min(a, *b)) } fn random() -> u64 { thread_rng().next_u64() } // TODO: fill in with real implmentation once staking is implemented fn get_stake(_id: PublicKey) -> f64 { 1.0 } fn get_updates_since(&self, v: u64) -> (PublicKey, u64, Vec) { //trace!("get updates since {}", v); let data = self .table .values() .filter(|x| x.id != PublicKey::default() && self.local[&x.id] > v) .cloned() .collect(); let id = self.me; let ups = self.update_index; (id, ups, data) } pub fn window_index_request(&self, ix: u64) -> Result<(SocketAddr, Vec)> { let valid: Vec<_> = self .table .values() .filter(|r| r.id != self.me && Self::is_valid_address(r.contact_info.tvu_window)) .collect(); if valid.is_empty() { Err(CrdtError::NoPeers)?; } let n = (Self::random() as usize) % valid.len(); let addr = valid[n].contact_info.ncp; let req = Protocol::RequestWindowIndex(self.table[&self.me].clone(), ix); let out = serialize(&req)?; Ok((addr, out)) } /// Create a random gossip request /// # Returns /// (A,B) /// * A - Address to send to /// * B - RequestUpdates protocol message fn gossip_request(&self) -> Result<(SocketAddr, Protocol)> { let options: Vec<_> = self .table .values() .filter(|v| { v.id != self.me && !v.contact_info.ncp.ip().is_unspecified() && !v.contact_info.ncp.ip().is_multicast() }) .collect(); let choose_peer_strategy = ChooseWeightedPeerStrategy::new( &self.remote, &self.external_liveness, &Self::get_stake, ); let choose_peer_result = choose_peer_strategy.choose_peer(options); if let Err(Error::CrdtError(CrdtError::NoPeers)) = &choose_peer_result { trace!( "crdt too small for gossip {:x} {}", self.debug_id(), self.table.len() ); }; let v = choose_peer_result?; let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0); let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone()); trace!( "created gossip request from {:x} to {:x} {}", self.debug_id(), v.debug_id(), v.contact_info.ncp ); Ok((v.contact_info.ncp, req)) } pub fn new_vote(&mut self, last_id: Hash) -> Result<(Vote, SocketAddr)> { let mut me = self.my_data().clone(); let leader = self.leader_data().ok_or(CrdtError::NoLeader)?.clone(); me.version += 1; me.ledger_state.last_id = last_id; let vote = Vote { version: me.version, contact_info_version: me.contact_info.version, }; self.insert(&me); Ok((vote, leader.contact_info.tpu)) } /// At random pick a node and try to get updated changes from them fn run_gossip( obj: &Arc>, blob_sender: &BlobSender, blob_recycler: &BlobRecycler, ) -> Result<()> { 
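// The request is built in three steps: pick a peer and produce a
// `RequestUpdates` message while holding a short-lived read lock, serialize it
// into a blob addressed to that peer's gossip (ncp) address, then hand the
// blob to `blob_sender` so no socket I/O happens while the lock is held.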
//TODO we need to keep track of stakes and weight the selection by stake size //TODO cache sockets // Lock the object only to do this operation and not for any longer // especially not when doing the `sock.send_to` let (remote_gossip_addr, req) = obj .read() .expect("'obj' read lock in fn run_gossip") .gossip_request()?; // TODO this will get chatty, so we need to first ask for number of updates since // then only ask for specific data that we dont have let blob = to_blob(req, remote_gossip_addr, blob_recycler)?; let mut q: VecDeque = VecDeque::new(); q.push_back(blob); blob_sender.send(q)?; Ok(()) } /// TODO: This is obviously the wrong way to do this. Need to implement leader selection fn top_leader(&self) -> Option { let mut table = HashMap::new(); let def = PublicKey::default(); let cur = self.table.values().filter(|x| x.leader_id != def); for v in cur { let cnt = table.entry(&v.leader_id).or_insert(0); *cnt += 1; trace!("leader {:x} {}", make_debug_id(&v.leader_id), *cnt); } let mut sorted: Vec<(&PublicKey, usize)> = table.into_iter().collect(); let my_id = self.debug_id(); for x in &sorted { trace!( "{:x}: sorted leaders {:x} votes: {}", my_id, make_debug_id(&x.0), x.1 ); } sorted.sort_by_key(|a| a.1); sorted.last().map(|a| *a.0) } /// TODO: This is obviously the wrong way to do this. Need to implement leader selection /// A t-shirt for the first person to actually use this bad behavior to attack the alpha testnet fn update_leader(&mut self) { if let Some(leader_id) = self.top_leader() { if self.my_data().leader_id != leader_id && self.table.get(&leader_id).is_some() { self.set_leader(leader_id); } } } /// Apply updates that we received from the identity `from` /// # Arguments /// * `from` - identity of the sender of the updates /// * `update_index` - the number of updates that `from` has completed and this set of `data` represents /// * `data` - the update data fn apply_updates( &mut self, from: PublicKey, update_index: u64, data: &[NodeInfo], external_liveness: &[(PublicKey, u64)], ) { trace!("got updates {}", data.len()); // TODO we need to punish/spam resist here // sig verify the whole update and slash anyone who sends a bad update for v in data { self.insert(&v); } for (pk, external_remote_index) in external_liveness { let remote_entry = if let Some(v) = self.remote.get(pk) { *v } else { 0 }; if remote_entry >= *external_remote_index { continue; } let liveness_entry = self .external_liveness .entry(*pk) .or_insert_with(HashMap::new); let peer_index = *liveness_entry.entry(from).or_insert(*external_remote_index); if *external_remote_index > peer_index { liveness_entry.insert(from, *external_remote_index); } } *self.remote.entry(from).or_insert(update_index) = update_index; // Clear the remote liveness table for this node, b/c we've heard directly from them // so we don't need to rely on rumors self.external_liveness.remove(&from); } /// randomly pick a node and ask them for updates asynchronously pub fn gossip( obj: Arc>, blob_recycler: BlobRecycler, blob_sender: BlobSender, exit: Arc, ) -> JoinHandle<()> { Builder::new() .name("solana-gossip".to_string()) .spawn(move || loop { let start = timestamp(); let _ = Self::run_gossip(&obj, &blob_sender, &blob_recycler); if exit.load(Ordering::Relaxed) { return; } obj.write().unwrap().purge(timestamp()); //TODO: possibly tune this parameter //we saw a deadlock passing an obj.read().unwrap().timeout into sleep obj.write().unwrap().update_leader(); let elapsed = timestamp() - start; if GOSSIP_SLEEP_MILLIS > elapsed { let time_left = 
GOSSIP_SLEEP_MILLIS - elapsed; sleep(Duration::from_millis(time_left)); } }) .unwrap() } fn run_window_request( window: &SharedWindow, ledger_window: &mut Option<&mut LedgerWindow>, me: &NodeInfo, from: &NodeInfo, ix: u64, blob_recycler: &BlobRecycler, ) -> Option { let pos = (ix as usize) % window.read().unwrap().len(); if let Some(blob) = &window.read().unwrap()[pos].data { let mut wblob = blob.write().unwrap(); let blob_ix = wblob.get_index().expect("run_window_request get_index"); if blob_ix == ix { let num_retransmits = wblob.meta.num_retransmits; wblob.meta.num_retransmits += 1; // Setting the sender id to the requester id // prevents the requester from retransmitting this response // to other peers let mut sender_id = from.id; // Allow retransmission of this response if the node // is the leader and the number of repair requests equals // a power of two if me.leader_id == me.id && (num_retransmits == 0 || num_retransmits.is_power_of_two()) { sender_id = me.id } let out = blob_recycler.allocate(); // copy to avoid doing IO inside the lock { let mut outblob = out.write().unwrap(); let sz = wblob.meta.size; outblob.meta.size = sz; outblob.data[..sz].copy_from_slice(&wblob.data[..sz]); outblob.meta.set_addr(&from.contact_info.tvu_window); outblob.set_id(sender_id).expect("blob set_id"); } inc_new_counter!("crdt-window-request-pass", 1); return Some(out); } else { inc_new_counter!("crdt-window-request-outside", 1); info!( "requested ix {} != blob_ix {}, outside window!", ix, blob_ix ); // falls through to checking window_ledger } } if let Some(ledger_window) = ledger_window { if let Ok(entry) = ledger_window.get_entry(ix) { inc_new_counter!("crdt-window-request-ledger", 1); let out = entry.to_blob( blob_recycler, Some(ix), Some(from.id), Some(&from.contact_info.tvu_window), ); return Some(out); } } inc_new_counter!("crdt-window-request-fail", 1); info!( "{:x}: failed RequestWindowIndex {:x} {} {}", me.debug_id(), from.debug_id(), ix, pos, ); None } //TODO we should first coalesce all the requests fn handle_blob( obj: &Arc>, window: &SharedWindow, ledger_window: &mut Option<&mut LedgerWindow>, blob_recycler: &BlobRecycler, blob: &Blob, ) -> Option { match deserialize(&blob.data[..blob.meta.size]) { Ok(request) => { Crdt::handle_protocol(request, obj, window, ledger_window, blob_recycler) } Err(_) => { warn!("deserialize crdt packet failed"); None } } } fn handle_protocol( request: Protocol, obj: &Arc>, window: &SharedWindow, ledger_window: &mut Option<&mut LedgerWindow>, blob_recycler: &BlobRecycler, ) -> Option { match request { // TODO sigverify these Protocol::RequestUpdates(v, from_rd) => { let addr = from_rd.contact_info.ncp; trace!("RequestUpdates {} from {}", v, addr); let me = obj.read().unwrap(); if addr == me.table[&me.me].contact_info.ncp { warn!( "RequestUpdates ignored, I'm talking to myself: me={:x} remoteme={:x}", me.debug_id(), make_debug_id(&from_rd.id) ); inc_new_counter!("crdt-window-request-loopback", 1); return None; } // only lock for these two calls, dont lock during IO `sock.send_to` or `sock.recv_from` let (from, ups, data) = me.get_updates_since(v); let external_liveness = me.remote.iter().map(|(k, v)| (*k, *v)).collect(); drop(me); trace!("get updates since response {} {}", v, data.len()); let len = data.len(); let rsp = Protocol::ReceiveUpdates(from, ups, data, external_liveness); { let mut me = obj.write().unwrap(); me.insert(&from_rd); me.update_liveness(from_rd.id); } if len < 1 { let me = obj.read().unwrap(); trace!( "no updates me {:x} ix {} since {}", 
me.debug_id(), me.update_index, v ); None } else if let Ok(r) = to_blob(rsp, addr, &blob_recycler) { trace!( "sending updates me {:x} len {} to {:x} {}", obj.read().unwrap().debug_id(), len, from_rd.debug_id(), addr, ); Some(r) } else { warn!("to_blob failed"); None } } Protocol::ReceiveUpdates(from, update_index, data, external_liveness) => { trace!( "ReceivedUpdates from={:x} update_index={} len={}", make_debug_id(&from), update_index, data.len() ); obj.write() .expect("'obj' write lock in ReceiveUpdates") .apply_updates(from, update_index, &data, &external_liveness); None } Protocol::RequestWindowIndex(from, ix) => { //TODO this doesn't depend on CRDT module, can be moved //but we are using the listen thread to service these request //TODO verify from is signed obj.write().unwrap().insert(&from); let me = obj.read().unwrap().my_data().clone(); inc_new_counter!("crdt-window-request-recv", 1); trace!( "{:x}:received RequestWindowIndex {:x} {} ", me.debug_id(), from.debug_id(), ix, ); if from.contact_info.tvu_window == me.contact_info.tvu_window { warn!( "Ignored {:x}:received RequestWindowIndex from ME {:x} {} ", me.debug_id(), from.debug_id(), ix, ); inc_new_counter!("crdt-window-request-address-eq", 1); return None; } Self::run_window_request(&window, ledger_window, &me, &from, ix, blob_recycler) } } } /// Process messages from the network fn run_listen( obj: &Arc>, window: &SharedWindow, ledger_window: &mut Option<&mut LedgerWindow>, blob_recycler: &BlobRecycler, requests_receiver: &BlobReceiver, response_sender: &BlobSender, ) -> Result<()> { //TODO cache connections let timeout = Duration::new(1, 0); let mut reqs = requests_receiver.recv_timeout(timeout)?; while let Ok(mut more) = requests_receiver.try_recv() { reqs.append(&mut more); } let mut resps = VecDeque::new(); while let Some(req) = reqs.pop_front() { if let Some(resp) = Self::handle_blob( obj, window, ledger_window, blob_recycler, &req.read().unwrap(), ) { resps.push_back(resp); } blob_recycler.recycle(req); } response_sender.send(resps)?; Ok(()) } pub fn listen( obj: Arc>, window: SharedWindow, ledger_path: Option<&str>, blob_recycler: BlobRecycler, requests_receiver: BlobReceiver, response_sender: BlobSender, exit: Arc, ) -> JoinHandle<()> { let debug_id = obj.read().unwrap().debug_id(); let mut ledger_window = ledger_path.map(|p| LedgerWindow::new(p).unwrap()); Builder::new() .name("solana-listen".to_string()) .spawn(move || loop { let e = Self::run_listen( &obj, &window, &mut ledger_window.as_mut(), &blob_recycler, &requests_receiver, &response_sender, ); if exit.load(Ordering::Relaxed) { return; } if e.is_err() { debug!( "{:x}: run_listen timeout, table size: {}", debug_id, obj.read().unwrap().table.len() ); } }) .unwrap() } pub fn is_valid_address(addr: SocketAddr) -> bool { (addr.port() != 0) && !(addr.ip().is_unspecified() || addr.ip().is_multicast()) } } pub struct Sockets { pub gossip: UdpSocket, pub gossip_send: UdpSocket, pub requests: UdpSocket, pub replicate: UdpSocket, pub transaction: UdpSocket, pub respond: UdpSocket, pub broadcast: UdpSocket, pub repair: UdpSocket, pub retransmit: UdpSocket, } pub struct TestNode { pub data: NodeInfo, pub sockets: Sockets, } impl TestNode { pub fn new_localhost() -> Self { let pubkey = KeyPair::new().pubkey(); Self::new_localhost_with_pubkey(pubkey) } pub fn new_localhost_with_pubkey(pubkey: PublicKey) -> Self { let transaction = UdpSocket::bind("127.0.0.1:0").unwrap(); let gossip = UdpSocket::bind("127.0.0.1:0").unwrap(); let replicate = 
UdpSocket::bind("127.0.0.1:0").unwrap(); let requests = UdpSocket::bind("127.0.0.1:0").unwrap(); let repair = UdpSocket::bind("127.0.0.1:0").unwrap(); let gossip_send = UdpSocket::bind("0.0.0.0:0").unwrap(); let respond = UdpSocket::bind("0.0.0.0:0").unwrap(); let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap(); let data = NodeInfo::new( pubkey, gossip.local_addr().unwrap(), replicate.local_addr().unwrap(), requests.local_addr().unwrap(), transaction.local_addr().unwrap(), repair.local_addr().unwrap(), ); TestNode { data, sockets: Sockets { gossip, gossip_send, requests, replicate, transaction, respond, broadcast, repair, retransmit, }, } } pub fn new_with_bind_addr(data: NodeInfo, bind_addr: SocketAddr) -> TestNode { let mut local_gossip_addr = bind_addr; local_gossip_addr.set_port(data.contact_info.ncp.port()); let mut local_replicate_addr = bind_addr; local_replicate_addr.set_port(data.contact_info.tvu.port()); let mut local_requests_addr = bind_addr; local_requests_addr.set_port(data.contact_info.rpu.port()); let mut local_transactions_addr = bind_addr; local_transactions_addr.set_port(data.contact_info.tpu.port()); let mut local_repair_addr = bind_addr; local_repair_addr.set_port(data.contact_info.tvu_window.port()); fn bind(addr: SocketAddr) -> UdpSocket { match UdpSocket::bind(addr) { Ok(socket) => socket, Err(err) => { panic!("Failed to bind to {:?}: {:?}", addr, err); } } }; let transaction = bind(local_transactions_addr); let gossip = bind(local_gossip_addr); let replicate = bind(local_replicate_addr); let repair = bind(local_repair_addr); let requests = bind(local_requests_addr); // Responses are sent from the same Udp port as requests are received // from, in hopes that a NAT sitting in the middle will route the // response Udp packet correctly back to the requester. 
let respond = requests.try_clone().unwrap(); let gossip_send = UdpSocket::bind("0.0.0.0:0").unwrap(); let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap(); TestNode { data, sockets: Sockets { gossip, gossip_send, requests, replicate, transaction, respond, broadcast, repair, retransmit, }, } } } #[cfg(test)] mod tests { use crdt::{ parse_port_or_addr, Crdt, CrdtError, NodeInfo, Protocol, GOSSIP_PURGE_MILLIS, GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE, }; use entry::Entry; use hash::{hash, Hash}; use ledger::{LedgerWindow, LedgerWriter}; use logger; use packet::BlobRecycler; use result::Error; use signature::{KeyPair, KeyPairUtil, PublicKey}; use std::fs::remove_dir_all; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; use std::thread::sleep; use std::time::Duration; use streamer::default_window; use transaction::Vote; #[test] fn test_parse_port_or_addr() { let p1 = parse_port_or_addr(Some("9000".to_string())); assert_eq!(p1.port(), 9000); let p2 = parse_port_or_addr(Some("127.0.0.1:7000".to_string())); assert_eq!(p2.port(), 7000); let p3 = parse_port_or_addr(None); assert_eq!(p3.port(), 8000); } #[test] fn test_bad_address() { let d1 = NodeInfo::new( KeyPair::new().pubkey(), "0.0.0.0:1234".parse().unwrap(), "0.0.0.0:1235".parse().unwrap(), "0.0.0.0:1236".parse().unwrap(), "0.0.0.0:1237".parse().unwrap(), "0.0.0.0:1238".parse().unwrap(), ); assert_matches!( Crdt::new(d1).err(), Some(Error::CrdtError(CrdtError::BadGossipAddress)) ); let d1_1 = NodeInfo::new( KeyPair::new().pubkey(), "0.0.0.1:1234".parse().unwrap(), "0.0.0.0:1235".parse().unwrap(), "0.0.0.0:1236".parse().unwrap(), "0.0.0.0:1237".parse().unwrap(), "0.0.0.0:1238".parse().unwrap(), ); assert_matches!( Crdt::new(d1_1).err(), Some(Error::CrdtError(CrdtError::BadContactInfo)) ); let d2 = NodeInfo::new( KeyPair::new().pubkey(), "0.0.0.1:0".parse().unwrap(), "0.0.0.1:0".parse().unwrap(), "0.0.0.1:0".parse().unwrap(), "0.0.0.1:0".parse().unwrap(), "0.0.0.1:0".parse().unwrap(), ); assert_matches!( Crdt::new(d2).err(), Some(Error::CrdtError(CrdtError::BadGossipAddress)) ); let d2_1 = NodeInfo::new( KeyPair::new().pubkey(), "0.0.0.1:1234".parse().unwrap(), "0.0.0.1:0".parse().unwrap(), "0.0.0.1:0".parse().unwrap(), "0.0.0.1:0".parse().unwrap(), "0.0.0.1:0".parse().unwrap(), ); assert_matches!( Crdt::new(d2_1).err(), Some(Error::CrdtError(CrdtError::BadContactInfo)) ); let d3 = NodeInfo::new_unspecified(); assert_matches!( Crdt::new(d3).err(), Some(Error::CrdtError(CrdtError::BadGossipAddress)) ); let d4 = NodeInfo::new_multicast(); assert_matches!( Crdt::new(d4).err(), Some(Error::CrdtError(CrdtError::BadGossipAddress)) ); let mut d5 = NodeInfo::new_multicast(); d5.version = 1; assert_matches!( Crdt::new(d5).err(), Some(Error::CrdtError(CrdtError::BadNodeInfo)) ); let d6 = NodeInfo::new( KeyPair::new().pubkey(), "0.0.0.0:1234".parse().unwrap(), "0.0.0.0:0".parse().unwrap(), "0.0.0.0:0".parse().unwrap(), "0.0.0.0:0".parse().unwrap(), "0.0.0.0:0".parse().unwrap(), ); assert_matches!( Crdt::new(d6).err(), Some(Error::CrdtError(CrdtError::BadGossipAddress)) ); let d7 = NodeInfo::new( KeyPair::new().pubkey(), "0.0.0.1:0".parse().unwrap(), "0.0.0.0:0".parse().unwrap(), "0.0.0.0:0".parse().unwrap(), "0.0.0.0:0".parse().unwrap(), "0.0.0.0:0".parse().unwrap(), ); assert_matches!( Crdt::new(d7).err(), Some(Error::CrdtError(CrdtError::BadGossipAddress)) ); let d8 = NodeInfo::new( KeyPair::new().pubkey(), "0.0.0.1:1234".parse().unwrap(), 
"0.0.0.0:0".parse().unwrap(), "0.0.0.0:0".parse().unwrap(), "0.0.0.0:0".parse().unwrap(), "0.0.0.0:0".parse().unwrap(), ); assert_eq!(Crdt::new(d8).is_ok(), true); } #[test] fn insert_test() { let mut d = NodeInfo::new( KeyPair::new().pubkey(), "127.0.0.1:1234".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(), "127.0.0.1:1236".parse().unwrap(), "127.0.0.1:1237".parse().unwrap(), "127.0.0.1:1238".parse().unwrap(), ); assert_eq!(d.version, 0); let mut crdt = Crdt::new(d.clone()).unwrap(); assert_eq!(crdt.table[&d.id].version, 0); assert!(!crdt.alive.contains_key(&d.id)); d.version = 2; crdt.insert(&d); let liveness = crdt.alive[&d.id]; assert_eq!(crdt.table[&d.id].version, 2); d.version = 1; crdt.insert(&d); assert_eq!(crdt.table[&d.id].version, 2); assert_eq!(liveness, crdt.alive[&d.id]); // Ensure liveness will be updated for version 3 sleep(Duration::from_millis(1)); d.version = 3; crdt.insert(&d); assert_eq!(crdt.table[&d.id].version, 3); assert!(liveness < crdt.alive[&d.id]); } #[test] fn test_new_vote() { let d = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); assert_eq!(d.version, 0); let mut crdt = Crdt::new(d.clone()).unwrap(); assert_eq!(crdt.table[&d.id].version, 0); let leader = NodeInfo::new_leader(&"127.0.0.2:1235".parse().unwrap()); assert_ne!(d.id, leader.id); assert_matches!( crdt.new_vote(Hash::default()).err(), Some(Error::CrdtError(CrdtError::NoLeader)) ); crdt.insert(&leader); assert_matches!( crdt.new_vote(Hash::default()).err(), Some(Error::CrdtError(CrdtError::NoLeader)) ); crdt.set_leader(leader.id); assert_eq!(crdt.table[&d.id].version, 1); let v = Vote { version: 2, //version should increase when we vote contact_info_version: 0, }; let expected = (v, crdt.table[&leader.id].contact_info.tpu); assert_eq!(crdt.new_vote(Hash::default()).unwrap(), expected); } #[test] fn test_insert_vote() { let d = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); assert_eq!(d.version, 0); let mut crdt = Crdt::new(d.clone()).unwrap(); assert_eq!(crdt.table[&d.id].version, 0); let vote_same_version = Vote { version: d.version, contact_info_version: 0, }; crdt.insert_vote(&d.id, &vote_same_version, Hash::default()); assert_eq!(crdt.table[&d.id].version, 0); let vote_new_version_new_addrs = Vote { version: d.version + 1, contact_info_version: 1, }; crdt.insert_vote(&d.id, &vote_new_version_new_addrs, Hash::default()); //should be dropped since the address is newer then we know assert_eq!(crdt.table[&d.id].version, 0); let vote_new_version_old_addrs = Vote { version: d.version + 1, contact_info_version: 0, }; crdt.insert_vote(&d.id, &vote_new_version_old_addrs, Hash::default()); //should be accepted, since the update is for the same address field as the one we know assert_eq!(crdt.table[&d.id].version, 1); } fn sorted(ls: &Vec) -> Vec { let mut copy: Vec<_> = ls.iter().cloned().collect(); copy.sort_by(|x, y| x.id.cmp(&y.id)); copy } #[test] fn replicated_data_new_leader_with_pubkey() { let kp = KeyPair::new(); let d1 = NodeInfo::new_leader_with_pubkey( kp.pubkey().clone(), &"127.0.0.1:1234".parse().unwrap(), ); assert_eq!(d1.id, kp.pubkey()); assert_eq!(d1.contact_info.ncp, "127.0.0.1:1235".parse().unwrap()); assert_eq!(d1.contact_info.tvu, "127.0.0.1:1236".parse().unwrap()); assert_eq!(d1.contact_info.rpu, "127.0.0.1:1237".parse().unwrap()); assert_eq!(d1.contact_info.tpu, "127.0.0.1:1234".parse().unwrap()); assert_eq!( d1.contact_info.tvu_window, "127.0.0.1:1238".parse().unwrap() ); } #[test] fn update_test() { let d1 = NodeInfo::new( KeyPair::new().pubkey(), 
"127.0.0.1:1234".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(), "127.0.0.1:1236".parse().unwrap(), "127.0.0.1:1237".parse().unwrap(), "127.0.0.1:1238".parse().unwrap(), ); let d2 = NodeInfo::new( KeyPair::new().pubkey(), "127.0.0.1:1234".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(), "127.0.0.1:1236".parse().unwrap(), "127.0.0.1:1237".parse().unwrap(), "127.0.0.1:1238".parse().unwrap(), ); let d3 = NodeInfo::new( KeyPair::new().pubkey(), "127.0.0.1:1234".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(), "127.0.0.1:1236".parse().unwrap(), "127.0.0.1:1237".parse().unwrap(), "127.0.0.1:1238".parse().unwrap(), ); let mut crdt = Crdt::new(d1.clone()).expect("Crdt::new"); let (key, ix, ups) = crdt.get_updates_since(0); assert_eq!(key, d1.id); assert_eq!(ix, 1); assert_eq!(ups.len(), 1); assert_eq!(sorted(&ups), sorted(&vec![d1.clone()])); crdt.insert(&d2); let (key, ix, ups) = crdt.get_updates_since(0); assert_eq!(key, d1.id); assert_eq!(ix, 2); assert_eq!(ups.len(), 2); assert_eq!(sorted(&ups), sorted(&vec![d1.clone(), d2.clone()])); crdt.insert(&d3); let (key, ix, ups) = crdt.get_updates_since(0); assert_eq!(key, d1.id); assert_eq!(ix, 3); assert_eq!(ups.len(), 3); assert_eq!( sorted(&ups), sorted(&vec![d1.clone(), d2.clone(), d3.clone()]) ); let mut crdt2 = Crdt::new(d2.clone()).expect("Crdt::new"); crdt2.apply_updates(key, ix, &ups, &vec![]); assert_eq!(crdt2.table.values().len(), 3); assert_eq!( sorted(&crdt2.table.values().map(|x| x.clone()).collect()), sorted(&crdt.table.values().map(|x| x.clone()).collect()) ); let d4 = NodeInfo::new_entry_point("127.0.0.4:1234".parse().unwrap()); crdt.insert(&d4); let (_key, _ix, ups) = crdt.get_updates_since(0); assert_eq!(sorted(&ups), sorted(&vec![d2.clone(), d1, d3])); } #[test] fn window_index_request() { let me = NodeInfo::new( KeyPair::new().pubkey(), "127.0.0.1:1234".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(), "127.0.0.1:1236".parse().unwrap(), "127.0.0.1:1237".parse().unwrap(), "127.0.0.1:1238".parse().unwrap(), ); let mut crdt = Crdt::new(me.clone()).expect("Crdt::new"); let rv = crdt.window_index_request(0); assert_matches!(rv, Err(Error::CrdtError(CrdtError::NoPeers))); let nxt = NodeInfo::new( KeyPair::new().pubkey(), "127.0.0.1:1234".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(), "127.0.0.1:1236".parse().unwrap(), "127.0.0.1:1237".parse().unwrap(), "0.0.0.0:0".parse().unwrap(), ); crdt.insert(&nxt); let rv = crdt.window_index_request(0); assert_matches!(rv, Err(Error::CrdtError(CrdtError::NoPeers))); let nxt = NodeInfo::new( KeyPair::new().pubkey(), "127.0.0.2:1234".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(), "127.0.0.1:1236".parse().unwrap(), "127.0.0.1:1237".parse().unwrap(), "127.0.0.1:1238".parse().unwrap(), ); crdt.insert(&nxt); let rv = crdt.window_index_request(0).unwrap(); assert_eq!(nxt.contact_info.ncp, "127.0.0.2:1234".parse().unwrap()); assert_eq!(rv.0, "127.0.0.2:1234".parse().unwrap()); let nxt = NodeInfo::new( KeyPair::new().pubkey(), "127.0.0.3:1234".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(), "127.0.0.1:1236".parse().unwrap(), "127.0.0.1:1237".parse().unwrap(), "127.0.0.1:1238".parse().unwrap(), ); crdt.insert(&nxt); let mut one = false; let mut two = false; while !one || !two { //this randomly picks an option, so eventually it should pick both let rv = crdt.window_index_request(0).unwrap(); if rv.0 == "127.0.0.2:1234".parse().unwrap() { one = true; } if rv.0 == "127.0.0.3:1234".parse().unwrap() { two = true; } } assert!(one && two); } #[test] fn gossip_request_bad_addr() { 
let me = NodeInfo::new( KeyPair::new().pubkey(), "127.0.0.1:127".parse().unwrap(), "127.0.0.1:127".parse().unwrap(), "127.0.0.1:127".parse().unwrap(), "127.0.0.1:127".parse().unwrap(), "127.0.0.1:127".parse().unwrap(), ); let mut crdt = Crdt::new(me).expect("Crdt::new"); let nxt1 = NodeInfo::new_unspecified(); // Filter out unspecified addresses crdt.insert(&nxt1); //<--- attack! let rv = crdt.gossip_request(); assert_matches!(rv, Err(Error::CrdtError(CrdtError::NoPeers))); let nxt2 = NodeInfo::new_multicast(); // Filter out multicast addresses crdt.insert(&nxt2); //<--- attack! let rv = crdt.gossip_request(); assert_matches!(rv, Err(Error::CrdtError(CrdtError::NoPeers))); } /// test that gossip requests are eventually generated for all nodes #[test] fn gossip_request() { let me = NodeInfo::new( KeyPair::new().pubkey(), "127.0.0.1:1234".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(), "127.0.0.1:1236".parse().unwrap(), "127.0.0.1:1237".parse().unwrap(), "127.0.0.1:1238".parse().unwrap(), ); let mut crdt = Crdt::new(me.clone()).expect("Crdt::new"); let rv = crdt.gossip_request(); assert_matches!(rv, Err(Error::CrdtError(CrdtError::NoPeers))); let nxt1 = NodeInfo::new( KeyPair::new().pubkey(), "127.0.0.2:1234".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(), "127.0.0.1:1236".parse().unwrap(), "127.0.0.1:1237".parse().unwrap(), "127.0.0.1:1238".parse().unwrap(), ); crdt.insert(&nxt1); let rv = crdt.gossip_request().unwrap(); assert_eq!(rv.0, nxt1.contact_info.ncp); let nxt2 = NodeInfo::new_entry_point("127.0.0.3:1234".parse().unwrap()); crdt.insert(&nxt2); // check that the service works // and that it eventually produces a request for both nodes let (sender, reader) = channel(); let recycler = BlobRecycler::default(); let exit = Arc::new(AtomicBool::new(false)); let obj = Arc::new(RwLock::new(crdt)); let thread = Crdt::gossip(obj, recycler, sender, exit.clone()); let mut one = false; let mut two = false; for _ in 0..30 { //50% chance each try that we get a repeat let mut rv = reader.recv_timeout(Duration::new(1, 0)).unwrap(); while let Ok(mut more) = reader.try_recv() { rv.append(&mut more); } assert!(rv.len() > 0); for i in rv.iter() { if i.read().unwrap().meta.addr() == nxt1.contact_info.ncp { one = true; } else if i.read().unwrap().meta.addr() == nxt2.contact_info.ncp { two = true; } else { //unexpected request assert!(false); } } if one && two { break; } } exit.store(true, Ordering::Relaxed); thread.join().unwrap(); //created requests to both assert!(one && two); } #[test] fn purge_test() { logger::setup(); let me = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); let mut crdt = Crdt::new(me.clone()).expect("Crdt::new"); let nxt = NodeInfo::new_leader(&"127.0.0.2:1234".parse().unwrap()); assert_ne!(me.id, nxt.id); crdt.set_leader(me.id); crdt.insert(&nxt); let rv = crdt.gossip_request().unwrap(); assert_eq!(rv.0, nxt.contact_info.ncp); let now = crdt.alive[&nxt.id]; crdt.purge(now); let rv = crdt.gossip_request().unwrap(); assert_eq!(rv.0, nxt.contact_info.ncp); crdt.purge(now + GOSSIP_PURGE_MILLIS); let rv = crdt.gossip_request().unwrap(); assert_eq!(rv.0, nxt.contact_info.ncp); crdt.purge(now + GOSSIP_PURGE_MILLIS + 1); let rv = crdt.gossip_request().unwrap(); assert_eq!(rv.0, nxt.contact_info.ncp); let nxt2 = NodeInfo::new_leader(&"127.0.0.2:1234".parse().unwrap()); assert_ne!(me.id, nxt2.id); assert_ne!(nxt.id, nxt2.id); crdt.insert(&nxt2); while now == crdt.alive[&nxt2.id] { sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); crdt.insert(&nxt2); } let len = 
crdt.table.len() as u64; assert!((MIN_TABLE_SIZE as u64) < len); crdt.purge(now + GOSSIP_PURGE_MILLIS); assert_eq!(len as usize, crdt.table.len()); trace!("purging"); crdt.purge(now + GOSSIP_PURGE_MILLIS + 1); assert_eq!(len as usize - 1, crdt.table.len()); let rv = crdt.gossip_request().unwrap(); assert_eq!(rv.0, nxt.contact_info.ncp); } #[test] fn purge_leader_test() { logger::setup(); let me = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); let mut crdt = Crdt::new(me.clone()).expect("Crdt::new"); let nxt = NodeInfo::new_leader(&"127.0.0.2:1234".parse().unwrap()); assert_ne!(me.id, nxt.id); crdt.insert(&nxt); crdt.set_leader(nxt.id); let now = crdt.alive[&nxt.id]; let mut nxt2 = NodeInfo::new_leader(&"127.0.0.2:1234".parse().unwrap()); crdt.insert(&nxt2); while now == crdt.alive[&nxt2.id] { sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); nxt2.version = nxt2.version + 1; crdt.insert(&nxt2); } let len = crdt.table.len() as u64; crdt.purge(now + GOSSIP_PURGE_MILLIS + 1); assert_eq!(len as usize - 1, crdt.table.len()); assert_eq!(crdt.my_data().leader_id, PublicKey::default()); assert!(crdt.leader_data().is_none()); } /// test window requests respond with the right blob, and do not overrun #[test] fn run_window_request() { logger::setup(); let window = default_window(); let me = NodeInfo::new( KeyPair::new().pubkey(), "127.0.0.1:1234".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(), "127.0.0.1:1236".parse().unwrap(), "127.0.0.1:1237".parse().unwrap(), "127.0.0.1:1238".parse().unwrap(), ); let recycler = BlobRecycler::default(); let rv = Crdt::run_window_request(&window, &mut None, &me, &me, 0, &recycler); assert!(rv.is_none()); let out = recycler.allocate(); out.write().unwrap().meta.size = 200; window.write().unwrap()[0].data = Some(out); let rv = Crdt::run_window_request(&window, &mut None, &me, &me, 0, &recycler); assert!(rv.is_some()); let v = rv.unwrap(); //test we copied the blob assert_eq!(v.read().unwrap().meta.size, 200); let len = window.read().unwrap().len() as u64; let rv = Crdt::run_window_request(&window, &mut None, &me, &me, len, &recycler); assert!(rv.is_none()); fn tmp_ledger(name: &str) -> String { let keypair = KeyPair::new(); let path = format!("/tmp/farf/{}-{}", name, keypair.pubkey()); let mut writer = LedgerWriter::new(&path, true).unwrap(); let zero = Hash::default(); let one = hash(&zero.as_ref()); writer .write_entries(vec![Entry::new_tick(0, &zero), Entry::new_tick(0, &one)].to_vec()) .unwrap(); path } let ledger_path = tmp_ledger("run_window_request"); let mut ledger_window = LedgerWindow::new(&ledger_path).unwrap(); let rv = Crdt::run_window_request( &window, &mut Some(&mut ledger_window), &me, &me, 1, &recycler, ); assert!(rv.is_some()); remove_dir_all(ledger_path).unwrap(); } /// test window requests respond with the right blob, and do not overrun #[test] fn run_window_request_with_backoff() { let window = default_window(); let mut me = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); me.leader_id = me.id; let mock_peer = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); let recycler = BlobRecycler::default(); // Simulate handling a repair request from mock_peer let rv = Crdt::run_window_request(&window, &mut None, &me, &mock_peer, 0, &recycler); assert!(rv.is_none()); let blob = recycler.allocate(); let blob_size = 200; blob.write().unwrap().meta.size = blob_size; window.write().unwrap()[0].data = Some(blob); let num_requests: u32 = 64; for i in 0..num_requests { let shared_blob = Crdt::run_window_request( &window, &mut None, 
&me, &mock_peer, 0, &recycler, ).unwrap(); let blob = shared_blob.read().unwrap(); // Test we copied the blob assert_eq!(blob.meta.size, blob_size); let id = if i == 0 || i.is_power_of_two() { me.id } else { mock_peer.id }; assert_eq!(blob.get_id().unwrap(), id); } } /// TODO: This is obviously the wrong way to do this. Need to implement leader selection, /// delete this test after leader selection is correctly implemented #[test] fn test_update_leader() { logger::setup(); let me = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); let leader0 = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); let leader1 = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); let mut crdt = Crdt::new(me.clone()).expect("Crdt::new"); assert_eq!(crdt.top_leader(), None); crdt.set_leader(leader0.id); assert_eq!(crdt.top_leader().unwrap(), leader0.id); //add a bunch of nodes with a new leader for _ in 0..10 { let mut dum = NodeInfo::new_entry_point("127.0.0.1:1234".parse().unwrap()); dum.id = KeyPair::new().pubkey(); dum.leader_id = leader1.id; crdt.insert(&dum); } assert_eq!(crdt.top_leader().unwrap(), leader1.id); crdt.update_leader(); assert_eq!(crdt.my_data().leader_id, leader0.id); crdt.insert(&leader1); crdt.update_leader(); assert_eq!(crdt.my_data().leader_id, leader1.id); } /// Validates the node that sent Protocol::ReceiveUpdates gets its /// liveness updated, but not if the node sends Protocol::ReceiveUpdates /// to itself. #[test] fn protocol_requestupdate_alive() { logger::setup(); let window = default_window(); let recycler = BlobRecycler::default(); let node = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); let node_with_same_addr = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); assert_ne!(node.id, node_with_same_addr.id); let node_with_diff_addr = NodeInfo::new_leader(&"127.0.0.1:4321".parse().unwrap()); let crdt = Crdt::new(node.clone()).expect("Crdt::new"); assert_eq!(crdt.alive.len(), 0); let obj = Arc::new(RwLock::new(crdt)); let request = Protocol::RequestUpdates(1, node.clone()); assert!(Crdt::handle_protocol(request, &obj, &window, &mut None, &recycler).is_none()); let request = Protocol::RequestUpdates(1, node_with_same_addr.clone()); assert!(Crdt::handle_protocol(request, &obj, &window, &mut None, &recycler).is_none()); let request = Protocol::RequestUpdates(1, node_with_diff_addr.clone()); Crdt::handle_protocol(request, &obj, &window, &mut None, &recycler); let me = obj.write().unwrap(); // |node| and |node_with_same_addr| should not be in me.alive, but // |node_with_diff_addr| should now be. 
assert!(!me.alive.contains_key(&node.id)); assert!(!me.alive.contains_key(&node_with_same_addr.id)); assert!(me.alive[&node_with_diff_addr.id] > 0); } #[test] fn test_is_valid_address() { let bad_address_port = "127.0.0.1:0".parse().unwrap(); assert!(!Crdt::is_valid_address(bad_address_port)); let bad_address_unspecified = "0.0.0.0:1234".parse().unwrap(); assert!(!Crdt::is_valid_address(bad_address_unspecified)); let bad_address_multicast = "224.254.0.0:1234".parse().unwrap(); assert!(!Crdt::is_valid_address(bad_address_multicast)); } #[test] fn test_default_leader() { logger::setup(); let node_info = NodeInfo::new( KeyPair::new().pubkey(), "127.0.0.1:1234".parse().unwrap(), "127.0.0.1:1235".parse().unwrap(), "127.0.0.1:1236".parse().unwrap(), "127.0.0.1:1237".parse().unwrap(), "127.0.0.1:1238".parse().unwrap(), ); let mut crdt = Crdt::new(node_info).unwrap(); let network_entry_point = NodeInfo::new_entry_point("127.0.0.1:1239".parse().unwrap()); crdt.insert(&network_entry_point); assert!(crdt.leader_data().is_none()); } }
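// Minimal illustrative checks for behaviors the suite above does not exercise
// directly: `Crdt::convergence` on a freshly created node, and the fallback of
// `parse_port_or_addr` on unparsable input. The module name is arbitrary; the
// assertions follow the semantics documented on those functions above.
#[cfg(test)]
mod crdt_sketches {
    use crdt::{parse_port_or_addr, Crdt, NodeInfo};

    #[test]
    fn convergence_of_lonely_node_is_one() {
        // With no remote update indices recorded, the estimate is remote.len() + 1 = 1.
        let me = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
        let crdt = Crdt::new(me).expect("Crdt::new");
        assert_eq!(crdt.convergence(), 1);
    }

    #[test]
    fn parse_port_or_addr_falls_back_on_garbage() {
        // Input that is neither a port nor a socket address yields the 0.0.0.0:8000 default.
        let addr = parse_port_or_addr(Some("hello world".to_string()));
        assert_eq!(addr.port(), 8000);
    }
}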