solana/src/crdt.rs

1029 lines
37 KiB
Rust
Raw Normal View History

2018-04-21 11:02:49 -07:00
//! The `crdt` module defines a data structure that is shared by all the nodes in the network over
2018-04-28 00:31:20 -07:00
//! a gossip control plane. The goal is to share small bits of off-chain information and detect and
2018-04-21 11:02:49 -07:00
//! repair partitions.
//!
//! This CRDT only supports a very limited set of types. A map of PublicKey -> Versioned Struct.
2018-05-15 04:35:41 -07:00
//! The last version is always picked during an update.
2018-04-28 00:31:20 -07:00
//!
//! The network is arranged in layers:
//!
//! * layer 0 - Leader.
//! * layer 1 - As many nodes as we can fit
//! * layer 2 - Everyone else, if layer 1 is `2^10`, layer 2 should be able to fit `2^20` number of nodes.
//!
2018-05-14 14:33:11 -07:00
//! Bank needs to provide an interface for us to query the stake weight
2018-04-21 11:02:49 -07:00
use bincode::{deserialize, serialize};
use byteorder::{LittleEndian, ReadBytesExt};
use hash::Hash;
use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE};
use pnet::datalink;
2018-04-28 00:31:20 -07:00
use rayon::prelude::*;
use result::{Error, Result};
2018-04-21 11:02:49 -07:00
use ring::rand::{SecureRandom, SystemRandom};
use signature::{KeyPair, KeyPairUtil, PublicKey, Signature};
2018-05-16 23:11:51 -07:00
use std;
2018-04-21 11:02:49 -07:00
use std::collections::HashMap;
use std::collections::VecDeque;
2018-04-21 11:02:49 -07:00
use std::io::Cursor;
use std::net::{IpAddr, SocketAddr, UdpSocket};
2018-04-21 11:02:49 -07:00
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
2018-05-30 13:25:32 -07:00
use std::thread::{sleep, Builder, JoinHandle};
2018-04-21 11:02:49 -07:00
use std::time::Duration;
use streamer::{BlobReceiver, BlobSender, Window};
2018-04-21 11:02:49 -07:00
/// Resolve an optional command-line value into a socket address.
///
/// * `None` yields the default `0.0.0.0:8000`.
/// * A bare port number (e.g. `"9000"`) keeps the default IP and replaces the port.
/// * A full address (e.g. `"127.0.0.1:7000"`) is used as-is.
/// * Anything else falls back to the default address.
pub fn parse_port_or_addr(optstr: Option<String>) -> SocketAddr {
    let default_addr: SocketAddr = "0.0.0.0:8000".parse().expect("default socket address");
    let text = match optstr {
        Some(text) => text,
        None => return default_addr,
    };
    if let Ok(port) = text.parse::<u16>() {
        let mut with_port = default_addr;
        with_port.set_port(port);
        return with_port;
    }
    text.parse::<SocketAddr>().unwrap_or(default_addr)
}
/// Pick the first usable IP address found on the host's network interfaces.
///
/// Loopback and multicast addresses are always skipped, as are link-local IPv4
/// addresses. IPv6 addresses are only eligible when the crate is built with the
/// `ipv6` feature. Returns `None` when no interface has a qualifying address.
pub fn get_ip_addr() -> Option<IpAddr> {
    datalink::interfaces()
        .into_iter()
        .flat_map(|iface| iface.ips.into_iter())
        .map(|network| network.ip())
        .find(|ip| {
            if ip.is_loopback() || ip.is_multicast() {
                return false;
            }
            match *ip {
                IpAddr::V4(ref v4) => !v4.is_link_local(),
                // Select an ipv6 address only if the config is selected
                IpAddr::V6(_) => cfg!(feature = "ipv6"),
            }
        })
}
2018-04-21 11:02:49 -07:00
/// Structure to be replicated by the network
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
2018-04-21 11:02:49 -07:00
pub struct ReplicatedData {
2018-04-28 00:31:20 -07:00
pub id: PublicKey,
2018-04-21 11:02:49 -07:00
sig: Signature,
/// should always be increasing
pub version: u64,
2018-04-21 11:02:49 -07:00
/// address to connect to for gossip
2018-04-28 00:31:20 -07:00
pub gossip_addr: SocketAddr,
2018-04-21 11:02:49 -07:00
/// address to connect to for replication
2018-04-28 00:31:20 -07:00
pub replicate_addr: SocketAddr,
2018-04-21 11:02:49 -07:00
/// address to connect to when this node is leader
2018-05-23 07:11:11 -07:00
pub requests_addr: SocketAddr,
2018-05-25 14:51:41 -07:00
/// transactions address
pub transactions_addr: SocketAddr,
/// repair address, we use this to jump ahead of the packets
/// destined to the replciate_addr
pub repair_addr: SocketAddr,
2018-04-21 11:02:49 -07:00
/// current leader identity
pub current_leader_id: PublicKey,
2018-04-21 11:02:49 -07:00
/// last verified hash that was submitted to the leader
last_verified_hash: Hash,
/// last verified count, always increasing
last_verified_count: u64,
}
impl ReplicatedData {
2018-04-28 00:31:20 -07:00
pub fn new(
id: PublicKey,
gossip_addr: SocketAddr,
replicate_addr: SocketAddr,
2018-05-23 07:11:11 -07:00
requests_addr: SocketAddr,
2018-05-25 14:51:41 -07:00
transactions_addr: SocketAddr,
repair_addr: SocketAddr,
2018-04-28 00:31:20 -07:00
) -> ReplicatedData {
2018-04-21 11:02:49 -07:00
ReplicatedData {
id,
sig: Signature::default(),
version: 0,
gossip_addr,
2018-04-28 00:31:20 -07:00
replicate_addr,
2018-05-23 07:11:11 -07:00
requests_addr,
2018-05-25 14:51:41 -07:00
transactions_addr,
repair_addr,
2018-04-21 11:02:49 -07:00
current_leader_id: PublicKey::default(),
last_verified_hash: Hash::default(),
last_verified_count: 0,
}
}
fn next_port(addr: &SocketAddr, nxt: u16) -> SocketAddr {
let mut nxt_addr = addr.clone();
nxt_addr.set_port(addr.port() + nxt);
nxt_addr
}
pub fn new_leader(bind_addr: &SocketAddr) -> Self {
let transactions_addr = bind_addr.clone();
let gossip_addr = Self::next_port(&bind_addr, 1);
let replicate_addr = Self::next_port(&bind_addr, 2);
let requests_addr = Self::next_port(&bind_addr, 3);
let repair_addr = Self::next_port(&bind_addr, 4);
let pubkey = KeyPair::new().pubkey();
ReplicatedData::new(
pubkey,
gossip_addr,
replicate_addr,
requests_addr,
transactions_addr,
repair_addr,
)
}
pub fn new_entry_point(gossip_addr: SocketAddr) -> Self {
let daddr: SocketAddr = "0.0.0.0:0".parse().unwrap();
ReplicatedData::new(
PublicKey::default(),
gossip_addr,
daddr.clone(),
daddr.clone(),
daddr.clone(),
daddr,
)
}
2018-04-21 11:02:49 -07:00
}
/// `Crdt` structure keeps a table of `ReplicatedData` structs
/// # Properties
/// * `table` - map of public id's to versioned and signed ReplicatedData structs
/// * `local` - map of public id's to what `self.update_index` `self.table` was updated
/// * `remote` - map of public id's to the `remote.update_index` was sent
/// * `update_index` - my update index
/// # Remarks
/// This implements two services, `gossip` and `listen`.
/// * `gossip` - asynchronously ask nodes to send updates
/// * `listen` - listen for requests and responses
/// No attempt to keep track of timeouts or dropped requests is made, or should be.
pub struct Crdt {
2018-05-12 19:00:22 -07:00
pub table: HashMap<PublicKey, ReplicatedData>,
2018-04-21 11:02:49 -07:00
/// Value of my update index when entry in table was updated.
/// Nodes will ask for updates since `update_index`, and this node
/// should respond with all the identities that are greater then the
/// request's `update_index` in this list
local: HashMap<PublicKey, u64>,
2018-05-15 04:35:41 -07:00
/// The value of the remote update index that I have last seen
2018-04-21 11:02:49 -07:00
/// This Node will ask external nodes for updates since the value in this list
pub remote: HashMap<PublicKey, u64>,
2018-04-28 00:31:20 -07:00
pub update_index: u64,
2018-05-12 19:00:22 -07:00
pub me: PublicKey,
2018-04-21 11:02:49 -07:00
}
// TODO These messages should be signed, and go through the gpu pipeline for spam filtering
2018-06-03 19:59:17 -07:00
#[derive(Serialize, Deserialize, Debug)]
2018-04-21 11:02:49 -07:00
enum Protocol {
2018-04-26 13:48:42 -07:00
/// forward your own latest data structure when requesting an update
/// this doesn't update the `remote` update index, but it allows the
/// recepient of this request to add knowledge of this node to the network
RequestUpdates(u64, ReplicatedData),
2018-04-21 11:02:49 -07:00
//TODO might need a since?
/// from id, form's last update index, ReplicatedData
ReceiveUpdates(PublicKey, u64, Vec<ReplicatedData>),
2018-05-12 19:00:22 -07:00
/// ask for a missing index
RequestWindowIndex(ReplicatedData, u64),
2018-04-21 11:02:49 -07:00
}
impl Crdt {
/// Create a table that initially contains only `me`.
///
/// `me.version` must be 0. The update index starts at 1, and `me` is recorded in
/// `local` as changed at that index.
///
/// # Panics
/// Panics if `me.version != 0`.
pub fn new(me: ReplicatedData) -> Crdt {
    assert_eq!(me.version, 0);
    let update_index = 1;
    let id = me.id;
    let mut local = HashMap::new();
    local.insert(id, update_index);
    let mut table = HashMap::new();
    table.insert(id, me);
    Crdt {
        table,
        local,
        remote: HashMap::new(),
        me: id,
        update_index,
    }
}
2018-04-28 00:31:20 -07:00
/// This node's own entry in the table.
pub fn my_data(&self) -> &ReplicatedData {
    &self.table[&self.me]
}
/// The table entry for the node this node currently considers the leader.
///
/// # Panics
/// Panics if that leader id has no entry in the table (for example before any leader
/// has been set, when the id is still `PublicKey::default()`).
pub fn leader_data(&self) -> &ReplicatedData {
    let leader_id = self.my_data().current_leader_id;
    &self.table[&leader_id]
}
/// Record `key` as the leader in this node's own entry and bump that entry's version
/// so the change propagates through gossip.
pub fn set_leader(&mut self, key: PublicKey) {
    let mut updated = self.my_data().clone();
    updated.current_leader_id = key;
    updated.version += 1;
    self.insert(&updated);
}
2018-04-28 00:31:20 -07:00
2018-05-12 19:00:22 -07:00
pub fn insert(&mut self, v: &ReplicatedData) {
2018-04-28 00:31:20 -07:00
// TODO check that last_verified types are always increasing
2018-04-21 11:02:49 -07:00
if self.table.get(&v.id).is_none() || (v.version > self.table[&v.id].version) {
2018-04-28 00:31:20 -07:00
//somehow we signed a message for our own identity with a higher version that
// we have stored ourselves
trace!(
"me: {:?} v.id: {:?} version: {}",
&self.me[..4],
&v.id[..4],
v.version
);
2018-04-21 11:02:49 -07:00
self.update_index += 1;
2018-04-28 00:31:20 -07:00
let _ = self.table.insert(v.id.clone(), v.clone());
2018-04-21 11:02:49 -07:00
let _ = self.local.insert(v.id, self.update_index);
} else {
2018-04-28 00:31:20 -07:00
trace!(
"INSERT FAILED me: {:?} data: {:?} new.version: {} me.version: {}",
&self.me[..4],
&v.id[..4],
2018-04-28 00:31:20 -07:00
v.version,
self.table[&v.id].version
);
2018-04-21 11:02:49 -07:00
}
}
2018-04-28 00:31:20 -07:00
2018-05-25 11:36:31 -07:00
pub fn index_blobs(
obj: &Arc<RwLock<Self>>,
blobs: &Vec<SharedBlob>,
receive_index: &mut u64,
2018-05-25 11:36:31 -07:00
) -> Result<()> {
let me: ReplicatedData = {
let robj = obj.read().expect("'obj' read lock in crdt::index_blobs");
2018-05-30 16:33:05 -07:00
debug!("broadcast table {}", robj.table.len());
2018-05-25 11:36:31 -07:00
robj.table[&robj.me].clone()
};
// enumerate all the blobs, those are the indices
2018-05-29 18:50:36 -07:00
for (i, b) in blobs.iter().enumerate() {
// only leader should be broadcasting
let mut blob = b.write().expect("'blob' write lock in crdt::index_blobs");
blob.set_id(me.id).expect("set_id in pub fn broadcast");
blob.set_index(*receive_index + i as u64)
2018-05-29 18:50:36 -07:00
.expect("set_index in pub fn broadcast");
}
2018-05-25 11:36:31 -07:00
Ok(())
}
2018-04-28 00:31:20 -07:00
/// broadcast messages from the leader to layer 1 nodes
/// # Remarks
/// We need to avoid having obj locked while doing any io, such as the `send_to`
pub fn broadcast(
obj: &Arc<RwLock<Self>>,
window: &Window,
2018-04-28 00:31:20 -07:00
s: &UdpSocket,
transmit_index: &mut u64,
2018-06-05 12:19:34 -07:00
received_index: u64,
2018-04-28 00:31:20 -07:00
) -> Result<()> {
let (me, table): (ReplicatedData, Vec<ReplicatedData>) = {
2018-05-15 04:35:41 -07:00
// copy to avoid locking during IO
2018-05-09 18:10:48 -07:00
let robj = obj.read().expect("'obj' read lock in pub fn broadcast");
2018-05-24 16:26:05 -07:00
trace!("broadcast table {}", robj.table.len());
2018-04-28 00:31:20 -07:00
let cloned_table: Vec<ReplicatedData> = robj.table.values().cloned().collect();
(robj.table[&robj.me].clone(), cloned_table)
};
let daddr = "0.0.0.0:0".parse().unwrap();
2018-05-12 19:00:22 -07:00
let nodes: Vec<&ReplicatedData> = table
2018-04-28 00:31:20 -07:00
.iter()
.filter(|v| {
2018-04-28 00:31:20 -07:00
if me.id == v.id {
//filter myself
false
} else if v.replicate_addr == daddr {
//filter nodes that are not listening
false
} else {
2018-05-24 16:26:05 -07:00
trace!("broadcast node {}", v.replicate_addr);
true
2018-04-28 00:31:20 -07:00
}
})
2018-05-12 19:00:22 -07:00
.collect();
2018-05-14 22:06:42 -07:00
if nodes.len() < 1 {
warn!("crdt too small");
2018-05-15 07:24:52 -07:00
return Err(Error::CrdtTooSmall);
2018-05-14 22:06:42 -07:00
}
2018-05-30 16:33:05 -07:00
trace!("nodes table {}", nodes.len());
// enumerate all the blobs in the window, those are the indices
2018-05-12 19:00:22 -07:00
// transmit them to nodes, starting from a different node
let mut orders = Vec::new();
2018-05-25 18:21:18 -07:00
let window_l = window.write().unwrap();
2018-06-05 12:19:34 -07:00
for i in *transmit_index..received_index {
let is = i as usize;
let k = is % window_l.len();
assert!(window_l[k].is_some());
orders.push((window_l[k].clone(), nodes[is % nodes.len()]));
}
2018-05-30 16:33:05 -07:00
trace!("orders table {}", orders.len());
let errs: Vec<_> = orders
2018-05-12 19:00:22 -07:00
.into_iter()
.map(|(b, v)| {
2018-04-28 00:31:20 -07:00
// only leader should be broadcasting
assert!(me.current_leader_id != v.id);
let bl = b.unwrap();
2018-05-25 18:21:18 -07:00
let blob = bl.read().expect("blob read lock in streamer::broadcast");
//TODO profile this, may need multiple sockets for par_iter
2018-06-05 12:24:39 -07:00
trace!(
"broadcast idx: {} sz: {} to {} coding: {}",
blob.get_index().unwrap(),
blob.meta.size,
v.replicate_addr,
blob.is_coding()
);
assert!(blob.meta.size < BLOB_SIZE);
2018-05-12 19:00:22 -07:00
let e = s.send_to(&blob.data[..blob.meta.size], &v.replicate_addr);
2018-05-24 16:26:05 -07:00
trace!("done broadcast {} to {}", blob.meta.size, v.replicate_addr);
2018-05-12 19:00:22 -07:00
e
2018-04-28 00:31:20 -07:00
})
.collect();
2018-05-30 16:33:05 -07:00
trace!("broadcast results {}", errs.len());
2018-04-28 00:31:20 -07:00
for e in errs {
match e {
2018-05-12 19:00:22 -07:00
Err(e) => {
error!("broadcast result {:?}", e);
return Err(Error::IO(e));
}
2018-04-28 00:31:20 -07:00
_ => (),
}
*transmit_index += 1;
}
Ok(())
}
/// retransmit messages from the leader to layer 1 nodes
/// # Remarks
/// We need to avoid having obj locked while doing any io, such as the `send_to`
pub fn retransmit(obj: &Arc<RwLock<Self>>, blob: &SharedBlob, s: &UdpSocket) -> Result<()> {
let (me, table): (ReplicatedData, Vec<ReplicatedData>) = {
2018-05-15 04:35:41 -07:00
// copy to avoid locking during IO
2018-05-09 18:10:48 -07:00
let s = obj.read().expect("'obj' read lock in pub fn retransmit");
2018-04-28 00:31:20 -07:00
(s.table[&s.me].clone(), s.table.values().cloned().collect())
};
2018-05-12 19:00:22 -07:00
blob.write()
.unwrap()
.set_id(me.id)
.expect("set_id in pub fn retransmit");
let rblob = blob.read().unwrap();
let daddr = "0.0.0.0:0".parse().unwrap();
let orders: Vec<_> = table
.iter()
.filter(|v| {
2018-04-28 00:31:20 -07:00
if me.id == v.id {
false
} else if me.current_leader_id == v.id {
trace!("skip retransmit to leader {:?}", v.id);
false
} else if v.replicate_addr == daddr {
trace!("skip nodes that are not listening {:?}", v.id);
false
} else {
true
2018-04-28 00:31:20 -07:00
}
})
.collect();
let errs: Vec<_> = orders
.par_iter()
.map(|v| {
2018-05-24 16:26:05 -07:00
trace!(
2018-05-12 19:00:22 -07:00
"retransmit blob {} to {}",
rblob.get_index().unwrap(),
v.replicate_addr
);
//TODO profile this, may need multiple sockets for par_iter
assert!(rblob.meta.size < BLOB_SIZE);
2018-04-28 00:31:20 -07:00
s.send_to(&rblob.data[..rblob.meta.size], &v.replicate_addr)
})
.collect();
for e in errs {
match e {
2018-05-12 19:00:22 -07:00
Err(e) => {
info!("retransmit error {:?}", e);
return Err(Error::IO(e));
}
2018-04-28 00:31:20 -07:00
_ => (),
}
}
Ok(())
}
2018-05-16 22:58:32 -07:00
// max number of nodes that we could be converged to
2018-05-16 22:54:06 -07:00
pub fn convergence(&self) -> u64 {
2018-05-16 23:17:45 -07:00
let max = self.remote.values().len() as u64 + 1;
2018-05-16 23:18:56 -07:00
self.remote.values().fold(max, |a, b| std::cmp::min(a, *b))
2018-05-16 22:54:06 -07:00
}
2018-05-16 23:11:51 -07:00
2018-04-21 11:02:49 -07:00
fn random() -> u64 {
let rnd = SystemRandom::new();
let mut buf = [0u8; 8];
2018-05-09 18:10:48 -07:00
rnd.fill(&mut buf).expect("rnd.fill in pub fn random");
2018-04-21 11:02:49 -07:00
let mut rdr = Cursor::new(&buf);
2018-05-11 11:38:52 -07:00
rdr.read_u64::<LittleEndian>()
.expect("rdr.read_u64 in fn random")
2018-04-21 11:02:49 -07:00
}
fn get_updates_since(&self, v: u64) -> (PublicKey, u64, Vec<ReplicatedData>) {
2018-04-28 00:31:20 -07:00
//trace!("get updates since {}", v);
2018-05-30 12:07:28 -07:00
let data = self.table
2018-04-21 11:02:49 -07:00
.values()
.filter(|x| x.id != PublicKey::default() && self.local[&x.id] > v)
2018-04-21 11:02:49 -07:00
.cloned()
.collect();
let id = self.me;
let ups = self.update_index;
(id, ups, data)
}
2018-05-12 19:00:22 -07:00
pub fn window_index_request(&self, ix: u64) -> Result<(SocketAddr, Vec<u8>)> {
let daddr = "0.0.0.0:0".parse().unwrap();
2018-05-30 12:07:28 -07:00
let valid: Vec<_> = self.table
.values()
.filter(|r| r.id != self.me && r.repair_addr != daddr)
.collect();
if valid.is_empty() {
2018-05-15 07:24:52 -07:00
return Err(Error::CrdtTooSmall);
2018-05-12 19:00:22 -07:00
}
let n = (Self::random() as usize) % valid.len();
let addr = valid[n].gossip_addr.clone();
2018-05-12 19:00:22 -07:00
let req = Protocol::RequestWindowIndex(self.table[&self.me].clone(), ix);
let out = serialize(&req)?;
Ok((addr, out))
}
2018-04-21 11:02:49 -07:00
/// Create a random gossip request
/// # Returns
2018-04-28 00:31:20 -07:00
/// (A,B)
/// * A - Address to send to
/// * B - RequestUpdates protocol message
fn gossip_request(&self) -> Result<(SocketAddr, Protocol)> {
2018-05-12 19:00:22 -07:00
let options: Vec<_> = self.table.values().filter(|v| v.id != self.me).collect();
if options.len() < 1 {
trace!(
"crdt too small for gossip {:?} {}",
&self.me[..4],
self.table.len()
);
2018-05-15 07:24:52 -07:00
return Err(Error::CrdtTooSmall);
}
2018-05-12 19:00:22 -07:00
let n = (Self::random() as usize) % options.len();
let v = options[n].clone();
2018-04-21 11:02:49 -07:00
let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0);
2018-04-26 13:48:42 -07:00
let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone());
trace!(
"created gossip request from {:?} to {:?} {}",
&self.me[..4],
&v.id[..4],
v.gossip_addr
);
Ok((v.gossip_addr, req))
2018-04-21 11:02:49 -07:00
}
/// At random pick a node and try to get updated changes from them
fn run_gossip(
obj: &Arc<RwLock<Self>>,
blob_sender: &BlobSender,
blob_recycler: &BlobRecycler,
) -> Result<()> {
2018-04-21 11:02:49 -07:00
//TODO we need to keep track of stakes and weight the selection by stake size
//TODO cache sockets
// Lock the object only to do this operation and not for any longer
// especially not when doing the `sock.send_to`
2018-05-30 12:07:28 -07:00
let (remote_gossip_addr, req) = obj.read()
2018-05-11 11:38:52 -07:00
.expect("'obj' read lock in fn run_gossip")
.gossip_request()?;
2018-04-21 11:02:49 -07:00
// TODO this will get chatty, so we need to first ask for number of updates since
// then only ask for specific data that we dont have
let blob = to_blob(req, remote_gossip_addr, blob_recycler)?;
let mut q: VecDeque<SharedBlob> = VecDeque::new();
q.push_back(blob);
blob_sender.send(q)?;
2018-04-21 11:02:49 -07:00
Ok(())
}
/// Apply updates that we received from the identity `from`
/// # Arguments
/// * `from` - identity of the sender of the updates
/// * `update_index` - the number of updates that `from` has completed and this set of `data` represents
/// * `data` - the update data
2018-04-26 13:48:42 -07:00
fn apply_updates(&mut self, from: PublicKey, update_index: u64, data: &[ReplicatedData]) {
2018-04-21 11:02:49 -07:00
trace!("got updates {}", data.len());
// TODO we need to punish/spam resist here
// sig verify the whole update and slash anyone who sends a bad update
for v in data {
2018-05-12 19:00:22 -07:00
self.insert(&v);
2018-04-21 11:02:49 -07:00
}
*self.remote.entry(from).or_insert(update_index) = update_index;
}
/// randomly pick a node and ask them for updates asynchronously
pub fn gossip(
obj: Arc<RwLock<Self>>,
blob_recycler: BlobRecycler,
blob_sender: BlobSender,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
2018-05-30 13:25:32 -07:00
Builder::new()
.name("solana-gossip".to_string())
.spawn(move || loop {
let _ = Self::run_gossip(&obj, &blob_sender, &blob_recycler);
if exit.load(Ordering::Relaxed) {
return;
}
2018-06-07 11:39:25 -07:00
//TODO: possibly tune this parameter
//we saw a deadlock passing an obj.read().unwrap().timeout into sleep
sleep(Duration::from_millis(100));
2018-05-30 13:25:32 -07:00
})
.unwrap()
2018-04-21 11:02:49 -07:00
}
2018-05-12 19:00:22 -07:00
fn run_window_request(
window: &Window,
2018-05-12 19:00:22 -07:00
from: &ReplicatedData,
ix: u64,
blob_recycler: &BlobRecycler,
) -> Option<SharedBlob> {
2018-05-12 19:00:22 -07:00
let pos = (ix as usize) % window.read().unwrap().len();
if let Some(blob) = &window.read().unwrap()[pos] {
2018-05-12 19:00:22 -07:00
let rblob = blob.read().unwrap();
let blob_ix = rblob.get_index().expect("run_window_request get_index");
if blob_ix == ix {
let out = blob_recycler.allocate();
2018-05-12 19:00:22 -07:00
// copy to avoid doing IO inside the lock
{
let mut outblob = out.write().unwrap();
let sz = rblob.meta.size;
outblob.meta.size = sz;
outblob.data[..sz].copy_from_slice(&rblob.data[..sz]);
outblob.meta.set_addr(&from.repair_addr);
//TODO, set the sender id to the requester so we dont retransmit
//come up with a cleaner solution for this when sender signatures are checked
outblob.set_id(from.id).expect("blob set_id");
}
return Some(out);
2018-05-12 19:00:22 -07:00
}
} else {
assert!(window.read().unwrap()[pos].is_none());
info!("failed RequestWindowIndex {} {}", ix, from.repair_addr);
2018-05-12 19:00:22 -07:00
}
None
2018-05-12 19:00:22 -07:00
}
//TODO we should first coalesce all the requests
fn handle_blob(
2018-05-12 19:00:22 -07:00
obj: &Arc<RwLock<Self>>,
window: &Window,
blob_recycler: &BlobRecycler,
blob: &Blob,
) -> Option<SharedBlob> {
match deserialize(&blob.data[..blob.meta.size]) {
2018-04-21 11:02:49 -07:00
// TODO sigverify these
Ok(Protocol::RequestUpdates(v, reqdata)) => {
2018-06-07 08:43:43 -07:00
trace!("RequestUpdates {}", v);
2018-04-26 13:48:42 -07:00
let addr = reqdata.gossip_addr;
2018-05-15 04:35:41 -07:00
// only lock for this call, dont lock during IO `sock.send_to` or `sock.recv_from`
2018-05-30 12:07:28 -07:00
let (from, ups, data) = obj.read()
2018-05-11 11:38:52 -07:00
.expect("'obj' read lock in RequestUpdates")
.get_updates_since(v);
2018-04-21 11:02:49 -07:00
trace!("get updates since response {} {}", v, data.len());
let len = data.len();
let rsp = Protocol::ReceiveUpdates(from, ups, data);
obj.write().unwrap().insert(&reqdata);
if len < 1 {
let me = obj.read().unwrap();
trace!(
"no updates me {:?} ix {} since {}",
&me.me[..4],
me.update_index,
v
);
None
} else if let Ok(r) = to_blob(rsp, addr, &blob_recycler) {
trace!(
"sending updates me {:?} len {} to {:?} {}",
&obj.read().unwrap().me[..4],
len,
&reqdata.id[..4],
addr,
);
Some(r)
} else {
warn!("to_blob failed");
None
}
2018-04-21 11:02:49 -07:00
}
Ok(Protocol::ReceiveUpdates(from, ups, data)) => {
trace!("ReceivedUpdates {:?} {} {}", &from[0..4], ups, data.len());
2018-05-11 11:38:52 -07:00
obj.write()
.expect("'obj' write lock in ReceiveUpdates")
.apply_updates(from, ups, &data);
None
2018-04-21 11:02:49 -07:00
}
Ok(Protocol::RequestWindowIndex(from, ix)) => {
2018-06-03 20:31:09 -07:00
//TODO this doesn't depend on CRDT module, can be moved
//but we are using the listen thread to service these request
2018-05-12 19:00:22 -07:00
//TODO verify from is signed
obj.write().unwrap().insert(&from);
let me = obj.read().unwrap().my_data().clone();
2018-05-24 16:26:05 -07:00
trace!(
2018-05-12 19:00:22 -07:00
"received RequestWindowIndex {} {} myaddr {}",
2018-05-24 16:30:01 -07:00
ix,
from.repair_addr,
me.repair_addr
2018-05-12 19:00:22 -07:00
);
assert_ne!(from.repair_addr, me.repair_addr);
Self::run_window_request(&window, &from, ix, blob_recycler)
}
Err(_) => {
warn!("deserialize crdt packet failed");
None
2018-05-12 19:00:22 -07:00
}
2018-04-21 11:02:49 -07:00
}
}
/// Process messages from the network
fn run_listen(
obj: &Arc<RwLock<Self>>,
window: &Window,
blob_recycler: &BlobRecycler,
requests_receiver: &BlobReceiver,
response_sender: &BlobSender,
) -> Result<()> {
//TODO cache connections
let timeout = Duration::new(1, 0);
let mut reqs = requests_receiver.recv_timeout(timeout)?;
while let Ok(mut more) = requests_receiver.try_recv() {
reqs.append(&mut more);
}
2018-05-30 12:07:28 -07:00
let resp: VecDeque<_> = reqs.iter()
.filter_map(|b| Self::handle_blob(obj, window, blob_recycler, &b.read().unwrap()))
.collect();
response_sender.send(resp)?;
while let Some(r) = reqs.pop_front() {
blob_recycler.recycle(r);
}
2018-04-21 11:02:49 -07:00
Ok(())
}
pub fn listen(
obj: Arc<RwLock<Self>>,
window: Window,
blob_recycler: BlobRecycler,
requests_receiver: BlobReceiver,
response_sender: BlobSender,
2018-04-21 11:02:49 -07:00
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
2018-05-30 13:25:32 -07:00
Builder::new()
.name("solana-listen".to_string())
.spawn(move || loop {
let e = Self::run_listen(
&obj,
&window,
&blob_recycler,
&requests_receiver,
&response_sender,
2018-05-23 21:45:40 -07:00
);
2018-05-30 13:25:32 -07:00
if e.is_err() {
info!(
"run_listen timeout, table size: {}",
obj.read().unwrap().table.len()
);
}
if exit.load(Ordering::Relaxed) {
return;
}
})
.unwrap()
2018-04-21 11:02:49 -07:00
}
}
/// The UDP sockets a node binds, one per role.
///
/// `TestNode::new` advertises the local addresses of `gossip`, `replicate`,
/// `requests`, `transaction` and `repair` in the node's `ReplicatedData`; the
/// remaining sockets are not referenced in this file.
pub struct Sockets {
    /// Receives gossip traffic.
    pub gossip: UdpSocket,
    pub gossip_send: UdpSocket,
    /// Receives leader requests.
    pub requests: UdpSocket,
    /// Receives replicated blobs.
    pub replicate: UdpSocket,
    /// Receives transactions.
    pub transaction: UdpSocket,
    pub respond: UdpSocket,
    pub broadcast: UdpSocket,
    /// Receives repair responses.
    pub repair: UdpSocket,
    pub retransmit: UdpSocket,
}
pub struct TestNode {
pub data: ReplicatedData,
pub sockets: Sockets,
}
2018-04-21 11:02:49 -07:00
impl TestNode {
pub fn new() -> TestNode {
2018-04-28 00:31:20 -07:00
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
let gossip_send = UdpSocket::bind("0.0.0.0:0").unwrap();
let requests = UdpSocket::bind("0.0.0.0:0").unwrap();
let transaction = UdpSocket::bind("0.0.0.0:0").unwrap();
2018-04-28 00:31:20 -07:00
let replicate = UdpSocket::bind("0.0.0.0:0").unwrap();
let respond = UdpSocket::bind("0.0.0.0:0").unwrap();
let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap();
let repair = UdpSocket::bind("0.0.0.0:0").unwrap();
let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap();
2018-04-28 00:31:20 -07:00
let pubkey = KeyPair::new().pubkey();
let data = ReplicatedData::new(
2018-04-28 00:31:20 -07:00
pubkey,
gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(),
requests.local_addr().unwrap(),
transaction.local_addr().unwrap(),
repair.local_addr().unwrap(),
2018-04-28 00:31:20 -07:00
);
TestNode {
data: data,
sockets: Sockets {
gossip,
gossip_send,
requests,
replicate,
transaction,
respond,
broadcast,
repair,
retransmit,
},
2018-04-21 11:02:49 -07:00
}
2018-04-26 13:48:42 -07:00
}
}
2018-04-26 13:48:42 -07:00
#[cfg(test)]
mod tests {
use crdt::{parse_port_or_addr, Crdt, ReplicatedData};
2018-06-03 19:59:17 -07:00
use packet::BlobRecycler;
use result::Error;
use signature::{KeyPair, KeyPairUtil};
2018-06-03 19:59:17 -07:00
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::time::Duration;
2018-06-03 20:31:09 -07:00
use streamer::default_window;
2018-04-26 13:48:42 -07:00
#[test]
fn test_parse_port_or_addr() {
    // (input, expected port): bare port, full address, and the default
    let cases = vec![
        (Some("9000".to_string()), 9000),
        (Some("127.0.0.1:7000".to_string()), 7000),
        (None, 8000),
    ];
    for (input, expected_port) in cases {
        assert_eq!(parse_port_or_addr(input).port(), expected_port);
    }
}
2018-04-21 11:02:49 -07:00
#[test]
fn insert_test() {
    let mut d = ReplicatedData::new(
        KeyPair::new().pubkey(),
        "127.0.0.1:1234".parse().unwrap(),
        "127.0.0.1:1235".parse().unwrap(),
        "127.0.0.1:1236".parse().unwrap(),
        "127.0.0.1:1237".parse().unwrap(),
        "127.0.0.1:1238".parse().unwrap(),
    );
    assert_eq!(d.version, 0);
    let mut crdt = Crdt::new(d.clone());
    assert_eq!(crdt.table[&d.id].version, 0);
    // (version attempted, version stored afterwards): newer wins, older is ignored
    for &(attempted, expected) in &[(2, 2), (1, 2)] {
        d.version = attempted;
        crdt.insert(&d);
        assert_eq!(crdt.table[&d.id].version, expected);
    }
}
/// Copy of `ls` ordered by node id, so tables can be compared regardless of hash order.
fn sorted(ls: &Vec<ReplicatedData>) -> Vec<ReplicatedData> {
    let mut ordered = ls.clone();
    ordered.sort_by(|a, b| a.id.cmp(&b.id));
    ordered
}
2018-04-28 00:31:20 -07:00
#[test]
fn replicated_data_new_leader() {
    let d1 = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap());
    // transactions on the bind port, the other services on the following ports
    let checks = vec![
        (d1.transactions_addr, "127.0.0.1:1234"),
        (d1.gossip_addr, "127.0.0.1:1235"),
        (d1.replicate_addr, "127.0.0.1:1236"),
        (d1.requests_addr, "127.0.0.1:1237"),
        (d1.repair_addr, "127.0.0.1:1238"),
    ];
    for (actual, expected) in checks {
        assert_eq!(actual, expected.parse().unwrap());
    }
}
#[test]
fn update_test() {
    let make_node = || {
        ReplicatedData::new(
            KeyPair::new().pubkey(),
            "127.0.0.1:1234".parse().unwrap(),
            "127.0.0.1:1235".parse().unwrap(),
            "127.0.0.1:1236".parse().unwrap(),
            "127.0.0.1:1237".parse().unwrap(),
            "127.0.0.1:1238".parse().unwrap(),
        )
    };
    let d1 = make_node();
    let d2 = make_node();
    let d3 = make_node();
    let mut crdt = Crdt::new(d1.clone());

    // A fresh table only reports itself.
    let (key, ix, ups) = crdt.get_updates_since(0);
    assert_eq!(key, d1.id);
    assert_eq!(ix, 1);
    assert_eq!(ups.len(), 1);
    assert_eq!(sorted(&ups), sorted(&vec![d1.clone()]));

    // Each accepted insert bumps the update index and joins the update set.
    crdt.insert(&d2);
    let (key, ix, ups) = crdt.get_updates_since(0);
    assert_eq!(key, d1.id);
    assert_eq!(ix, 2);
    assert_eq!(ups.len(), 2);
    assert_eq!(sorted(&ups), sorted(&vec![d1.clone(), d2.clone()]));

    crdt.insert(&d3);
    let (key, ix, ups) = crdt.get_updates_since(0);
    assert_eq!(key, d1.id);
    assert_eq!(ix, 3);
    assert_eq!(ups.len(), 3);
    assert_eq!(
        sorted(&ups),
        sorted(&vec![d1.clone(), d2.clone(), d3.clone()])
    );

    // Applying those updates to another table makes both tables agree.
    let mut crdt2 = Crdt::new(d2.clone());
    crdt2.apply_updates(key, ix, &ups);
    assert_eq!(crdt2.table.values().len(), 3);
    let entries = |c: &Crdt| c.table.values().cloned().collect::<Vec<_>>();
    assert_eq!(sorted(&entries(&crdt2)), sorted(&entries(&crdt)));

    // Entry points (default id) are never handed out as updates.
    let d4 = ReplicatedData::new_entry_point("127.0.0.4:1234".parse().unwrap());
    crdt.insert(&d4);
    let (_key, _ix, ups) = crdt.get_updates_since(0);
    assert_eq!(sorted(&ups), sorted(&vec![d2.clone(), d1, d3]));
}
#[test]
fn window_index_request() {
    let node = |gossip: &str, repair: &str| {
        ReplicatedData::new(
            KeyPair::new().pubkey(),
            gossip.parse().unwrap(),
            "127.0.0.1:1235".parse().unwrap(),
            "127.0.0.1:1236".parse().unwrap(),
            "127.0.0.1:1237".parse().unwrap(),
            repair.parse().unwrap(),
        )
    };
    let mut crdt = Crdt::new(node("127.0.0.1:1234", "127.0.0.1:1238"));
    // alone in the table: nobody to ask
    assert_matches!(crdt.window_index_request(0), Err(Error::CrdtTooSmall));

    // a peer without a repair address does not count
    crdt.insert(&node("127.0.0.1:1234", "0.0.0.0:0"));
    assert_matches!(crdt.window_index_request(0), Err(Error::CrdtTooSmall));

    // a single eligible peer is always chosen, via its gossip address
    let peer_one = node("127.0.0.2:1234", "127.0.0.1:1238");
    crdt.insert(&peer_one);
    assert_eq!(peer_one.gossip_addr, "127.0.0.2:1234".parse().unwrap());
    let (addr, _) = crdt.window_index_request(0).unwrap();
    assert_eq!(addr, "127.0.0.2:1234".parse().unwrap());

    // with two eligible peers the random choice eventually reaches both
    let peer_two = node("127.0.0.3:1234", "127.0.0.1:1238");
    crdt.insert(&peer_two);
    let (mut one, mut two) = (false, false);
    while !(one && two) {
        let (addr, _) = crdt.window_index_request(0).unwrap();
        one |= addr == peer_one.gossip_addr;
        two |= addr == peer_two.gossip_addr;
    }
    assert!(one && two);
}
2018-06-03 20:36:27 -07:00
/// test that gossip requests are eventually generated for all nodes
#[test]
fn gossip_request() {
    let me = ReplicatedData::new(
        KeyPair::new().pubkey(),
        "127.0.0.1:1234".parse().unwrap(),
        "127.0.0.1:1235".parse().unwrap(),
        "127.0.0.1:1236".parse().unwrap(),
        "127.0.0.1:1237".parse().unwrap(),
        "127.0.0.1:1238".parse().unwrap(),
    );
    let mut crdt = Crdt::new(me.clone());
    let rv = crdt.gossip_request();
    assert_matches!(rv, Err(Error::CrdtTooSmall));
    let nxt1 = ReplicatedData::new(
        KeyPair::new().pubkey(),
        "127.0.0.2:1234".parse().unwrap(),
        "127.0.0.1:1235".parse().unwrap(),
        "127.0.0.1:1236".parse().unwrap(),
        "127.0.0.1:1237".parse().unwrap(),
        "127.0.0.1:1238".parse().unwrap(),
    );
    crdt.insert(&nxt1);
    let rv = crdt.gossip_request().unwrap();
    assert_eq!(rv.0, nxt1.gossip_addr);
    let nxt2 = ReplicatedData::new_entry_point("127.0.0.3:1234".parse().unwrap());
    crdt.insert(&nxt2);
    // check that the service works
    // and that it eventually produces a request for both nodes
    let (sender, reader) = channel();
    let recycler = BlobRecycler::default();
    let exit = Arc::new(AtomicBool::new(false));
    let obj = Arc::new(RwLock::new(crdt));
    let thread = Crdt::gossip(obj, recycler, sender, exit.clone());
    let mut one = false;
    let mut two = false;
    for _ in 0..30 {
        // each request picks one of the two peers at random, so a round may repeat
        // a peer; 30 rounds make missing one of them vanishingly unlikely
        let mut rv = reader.recv_timeout(Duration::new(1, 0)).unwrap();
        while let Ok(mut more) = reader.try_recv() {
            rv.append(&mut more);
        }
        assert!(!rv.is_empty());
        for i in rv.iter() {
            let addr = i.read().unwrap().meta.addr();
            if addr == nxt1.gossip_addr {
                one = true;
            } else if addr == nxt2.gossip_addr {
                two = true;
            } else {
                panic!("unexpected gossip request to {}", addr);
            }
        }
        if one && two {
            break;
        }
    }
    exit.store(true, Ordering::Relaxed);
    thread.join().unwrap();
    //created requests to both
    assert!(one && two);
}
2018-06-03 20:31:09 -07:00
2018-06-03 20:36:27 -07:00
/// test window requests respond with the right blob, and do not overrun
#[test]
fn run_window_request() {
    let window = default_window();
    let me = ReplicatedData::new(
        KeyPair::new().pubkey(),
        "127.0.0.1:1234".parse().unwrap(),
        "127.0.0.1:1235".parse().unwrap(),
        "127.0.0.1:1236".parse().unwrap(),
        "127.0.0.1:1237".parse().unwrap(),
        "127.0.0.1:1238".parse().unwrap(),
    );
    let recycler = BlobRecycler::default();

    // empty slot: nothing to answer with
    assert!(Crdt::run_window_request(&window, &me, 0, &recycler).is_none());

    // fill slot 0 with a 200-byte blob; the reply must be a copy of it
    let stored = recycler.allocate();
    stored.write().unwrap().meta.size = 200;
    window.write().unwrap()[0] = Some(stored);
    let reply = Crdt::run_window_request(&window, &me, 0, &recycler)
        .expect("blob 0 is in the window");
    assert_eq!(reply.read().unwrap().meta.size, 200);

    // index `len` maps onto slot 0 as well, but that slot holds index 0, not `len`
    let len = window.read().unwrap().len() as u64;
    assert!(Crdt::run_window_request(&window, &me, len, &recycler).is_none());
}
2018-04-21 11:02:49 -07:00
}