stick all the addrs into one struct

This commit is contained in:
Anatoly Yakovenko 2018-07-09 15:53:49 -07:00 committed by Greg Fitzgerald
parent 606cfbfe1e
commit 2ea030be48
10 changed files with 258 additions and 190 deletions

19
src/bin/client-demo.rs Executable file → Normal file
View File

@ -51,17 +51,20 @@ fn sample_tx_count(
now = Instant::now(); now = Instant::now();
let sample = tx_count - initial_tx_count; let sample = tx_count - initial_tx_count;
initial_tx_count = tx_count; initial_tx_count = tx_count;
println!("{}: Transactions processed {}", v.transactions_addr, sample); println!(
"{}: Transactions processed {}",
v.addrs.transactions, sample
);
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
let tps = (sample * 1_000_000_000) as f64 / ns as f64; let tps = (sample * 1_000_000_000) as f64 / ns as f64;
if tps > max_tps { if tps > max_tps {
max_tps = tps; max_tps = tps;
} }
println!("{}: {:.2} tps", v.transactions_addr, tps); println!("{}: {:.2} tps", v.addrs.transactions, tps);
total = tx_count - first_count; total = tx_count - first_count;
println!( println!(
"{}: Total Transactions processed {}", "{}: Total Transactions processed {}",
v.transactions_addr, total v.addrs.transactions, total
); );
sleep(Duration::new(sample_period, 0)); sleep(Duration::new(sample_period, 0));
@ -113,7 +116,7 @@ fn generate_and_send_txs(
println!( println!(
"Transferring 1 unit {} times... to {:?}", "Transferring 1 unit {} times... to {:?}",
txs.len(), txs.len(),
leader.transactions_addr leader.addrs.transactions
); );
for tx in txs { for tx in txs {
client.transfer_signed(tx.clone()).unwrap(); client.transfer_signed(tx.clone()).unwrap();
@ -212,7 +215,7 @@ fn main() {
time_sec = s.to_string().parse().expect("integer"); time_sec = s.to_string().parse().expect("integer");
} }
let mut drone_addr = leader.transactions_addr.clone(); let mut drone_addr = leader.addrs.transactions.clone();
drone_addr.set_port(9900); drone_addr.set_port(9900);
let signal = Arc::new(AtomicBool::new(false)); let signal = Arc::new(AtomicBool::new(false));
@ -327,9 +330,9 @@ fn mk_client(r: &ReplicatedData) -> ThinClient {
.unwrap(); .unwrap();
ThinClient::new( ThinClient::new(
r.requests_addr, r.addrs.requests,
requests_socket, requests_socket,
r.transactions_addr, r.addrs.transactions,
transactions_socket, transactions_socket,
) )
} }
@ -381,7 +384,7 @@ fn converge(
.table .table
.values() .values()
.into_iter() .into_iter()
.filter(|x| x.requests_addr != daddr) .filter(|x| x.addrs.requests != daddr)
.cloned() .cloned()
.collect(); .collect();
if v.len() >= num_nodes { if v.len() >= num_nodes {

View File

@ -94,8 +94,8 @@ fn main() {
let drone = Arc::new(Mutex::new(Drone::new( let drone = Arc::new(Mutex::new(Drone::new(
mint_keypair, mint_keypair,
drone_addr, drone_addr,
leader.transactions_addr, leader.addrs.transactions,
leader.requests_addr, leader.addrs.requests,
time_slice, time_slice,
request_cap, request_cap,
))); )));

View File

@ -156,7 +156,7 @@ fn parse_args() -> Result<WalletConfig, Box<error::Error>> {
exit(1); exit(1);
}; };
let mut drone_addr = leader.transactions_addr.clone(); let mut drone_addr = leader.addrs.transactions.clone();
drone_addr.set_port(9900); drone_addr.set_port(9900);
let command = match matches.subcommand() { let command = match matches.subcommand() {
@ -305,9 +305,9 @@ fn mk_client(r: &ReplicatedData) -> io::Result<ThinClient> {
.unwrap(); .unwrap();
Ok(ThinClient::new( Ok(ThinClient::new(
r.requests_addr, r.addrs.requests,
requests_socket, requests_socket,
r.transactions_addr, r.addrs.transactions,
transactions_socket, transactions_socket,
)) ))
} }

View File

@ -19,10 +19,11 @@ use choose_gossip_peer_strategy::{ChooseGossipPeerStrategy, ChooseWeightedPeerSt
use hash::Hash; use hash::Hash;
use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE}; use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE};
use pnet_datalink as datalink; use pnet_datalink as datalink;
use rand::{thread_rng, Rng};
use rayon::prelude::*; use rayon::prelude::*;
use result::{Error, Result}; use result::{Error, Result};
use ring::rand::{SecureRandom, SystemRandom}; use ring::rand::{SecureRandom, SystemRandom};
use signature::{KeyPair, KeyPairUtil, PublicKey, Signature}; use signature::{KeyPair, KeyPairUtil, PublicKey};
use std; use std;
use std::collections::HashMap; use std::collections::HashMap;
use std::collections::VecDeque; use std::collections::VecDeque;
@ -36,8 +37,8 @@ use streamer::{BlobReceiver, BlobSender, Window};
use timing::timestamp; use timing::timestamp;
/// milliseconds we sleep for between gossip requests /// milliseconds we sleep for between gossip requests
const GOSSIP_SLEEP_MILLIS: u64 = 100; const GOSSIP_SLEEP_MILLIS: u64 = 200;
//const GOSSIP_MIN_PURGE_MILLIS: u64 = 15000; const GOSSIP_PURGE_MILLIS: u64 = 15000;
/// minimum membership table size before we start purging dead nodes /// minimum membership table size before we start purging dead nodes
const MIN_TABLE_SIZE: usize = 2; const MIN_TABLE_SIZE: usize = 2;
@ -45,6 +46,7 @@ const MIN_TABLE_SIZE: usize = 2;
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
pub enum CrdtError { pub enum CrdtError {
TooSmall, TooSmall,
NoLeader,
} }
pub fn parse_port_or_addr(optstr: Option<String>) -> SocketAddr { pub fn parse_port_or_addr(optstr: Option<String>) -> SocketAddr {
@ -90,28 +92,39 @@ pub fn get_ip_addr() -> Option<IpAddr> {
/// Structure to be replicated by the network /// Structure to be replicated by the network
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct ReplicatedData { pub struct Addrs {
pub id: PublicKey, /// gossip address
sig: Signature, pub gossip: SocketAddr,
/// should always be increasing
pub version: u64,
/// address to connect to for gossip
pub gossip_addr: SocketAddr,
/// address to connect to for replication /// address to connect to for replication
pub replicate_addr: SocketAddr, pub replicate: SocketAddr,
/// address to connect to when this node is leader /// address to connect to when this node is leader
pub requests_addr: SocketAddr, pub requests: SocketAddr,
/// transactions address /// transactions address
pub transactions_addr: SocketAddr, pub transactions: SocketAddr,
/// repair address, we use this to jump ahead of the packets /// repair address, we use this to jump ahead of the packets
/// destined to the replciate_addr /// destined to the replciate_addr
pub repair_addr: SocketAddr, pub repair: SocketAddr,
/// if this struture changes update this value as well
/// Always update `ReplicatedData` version too
/// This separate version for addresses allows us to use the `Vote`
/// as means of updating the `ReplicatedData` table without touching the
/// addresses if they haven't changed.
pub version: u64,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct ReplicatedData {
pub id: PublicKey,
/// If any of the bits change, update increment this value
pub version: u64,
/// network addresses
pub addrs: Addrs,
/// current leader identity /// current leader identity
pub current_leader_id: PublicKey, pub current_leader_id: PublicKey,
/// last verified hash that was submitted to the leader /// last verified hash that was submitted to the leader
last_verified_hash: Hash, last_verified_id: Hash,
/// last verified count, always increasing /// last verified count, always increasing
last_verified_count: u64, last_verified_height: u64,
} }
fn make_debug_id(buf: &[u8]) -> u64 { fn make_debug_id(buf: &[u8]) -> u64 {
@ -123,24 +136,26 @@ fn make_debug_id(buf: &[u8]) -> u64 {
impl ReplicatedData { impl ReplicatedData {
pub fn new( pub fn new(
id: PublicKey, id: PublicKey,
gossip_addr: SocketAddr, gossip: SocketAddr,
replicate_addr: SocketAddr, replicate: SocketAddr,
requests_addr: SocketAddr, requests: SocketAddr,
transactions_addr: SocketAddr, transactions: SocketAddr,
repair_addr: SocketAddr, repair: SocketAddr,
) -> ReplicatedData { ) -> ReplicatedData {
ReplicatedData { ReplicatedData {
id, id,
sig: Signature::default(),
version: 0, version: 0,
gossip_addr, addrs: Addrs {
replicate_addr, gossip,
requests_addr, replicate,
transactions_addr, requests,
repair_addr, transactions,
repair,
version: 0,
},
current_leader_id: PublicKey::default(), current_leader_id: PublicKey::default(),
last_verified_hash: Hash::default(), last_verified_id: Hash::default(),
last_verified_count: 0, last_verified_height: 0,
} }
} }
pub fn debug_id(&self) -> u64 { pub fn debug_id(&self) -> u64 {
@ -151,14 +166,12 @@ impl ReplicatedData {
nxt_addr.set_port(addr.port() + nxt); nxt_addr.set_port(addr.port() + nxt);
nxt_addr nxt_addr
} }
pub fn new_leader_with_pubkey(pubkey: PublicKey, bind_addr: &SocketAddr) -> Self {
pub fn new_leader(bind_addr: &SocketAddr) -> Self {
let transactions_addr = bind_addr.clone(); let transactions_addr = bind_addr.clone();
let gossip_addr = Self::next_port(&bind_addr, 1); let gossip_addr = Self::next_port(&bind_addr, 1);
let replicate_addr = Self::next_port(&bind_addr, 2); let replicate_addr = Self::next_port(&bind_addr, 2);
let requests_addr = Self::next_port(&bind_addr, 3); let requests_addr = Self::next_port(&bind_addr, 3);
let repair_addr = Self::next_port(&bind_addr, 4); let repair_addr = Self::next_port(&bind_addr, 4);
let pubkey = KeyPair::new().pubkey();
ReplicatedData::new( ReplicatedData::new(
pubkey, pubkey,
gossip_addr, gossip_addr,
@ -168,6 +181,10 @@ impl ReplicatedData {
repair_addr, repair_addr,
) )
} }
pub fn new_leader(bind_addr: &SocketAddr) -> Self {
let keypair = KeyPair::new();
Self::new_leader_with_pubkey(keypair.pubkey(), bind_addr)
}
pub fn new_entry_point(gossip_addr: SocketAddr) -> Self { pub fn new_entry_point(gossip_addr: SocketAddr) -> Self {
let daddr: SocketAddr = "0.0.0.0:0".parse().unwrap(); let daddr: SocketAddr = "0.0.0.0:0".parse().unwrap();
ReplicatedData::new( ReplicatedData::new(
@ -193,6 +210,7 @@ impl ReplicatedData {
/// * `listen` - listen for requests and responses /// * `listen` - listen for requests and responses
/// No attempt to keep track of timeouts or dropped requests is made, or should be. /// No attempt to keep track of timeouts or dropped requests is made, or should be.
pub struct Crdt { pub struct Crdt {
/// table of everyone in the network
pub table: HashMap<PublicKey, ReplicatedData>, pub table: HashMap<PublicKey, ReplicatedData>,
/// Value of my update index when entry in table was updated. /// Value of my update index when entry in table was updated.
/// Nodes will ask for updates since `update_index`, and this node /// Nodes will ask for updates since `update_index`, and this node
@ -202,9 +220,12 @@ pub struct Crdt {
/// The value of the remote update index that I have last seen /// The value of the remote update index that I have last seen
/// This Node will ask external nodes for updates since the value in this list /// This Node will ask external nodes for updates since the value in this list
pub remote: HashMap<PublicKey, u64>, pub remote: HashMap<PublicKey, u64>,
/// last time the public key had sent us a message
pub alive: HashMap<PublicKey, u64>, pub alive: HashMap<PublicKey, u64>,
pub update_index: u64, pub update_index: u64,
pub me: PublicKey, pub me: PublicKey,
/// last time we heard from anyone getting a message fro this public key
/// these are rumers and shouldn't be trusted directly
external_liveness: HashMap<PublicKey, HashMap<PublicKey, u64>>, external_liveness: HashMap<PublicKey, HashMap<PublicKey, u64>>,
} }
// TODO These messages should be signed, and go through the gpu pipeline for spam filtering // TODO These messages should be signed, and go through the gpu pipeline for spam filtering
@ -213,11 +234,13 @@ enum Protocol {
/// forward your own latest data structure when requesting an update /// forward your own latest data structure when requesting an update
/// this doesn't update the `remote` update index, but it allows the /// this doesn't update the `remote` update index, but it allows the
/// recepient of this request to add knowledge of this node to the network /// recepient of this request to add knowledge of this node to the network
/// (last update index i saw from you, my replicated data)
RequestUpdates(u64, ReplicatedData), RequestUpdates(u64, ReplicatedData),
//TODO might need a since? //TODO might need a since?
/// from id, form's last update index, ReplicatedData /// from id, form's last update index, ReplicatedData
ReceiveUpdates(PublicKey, u64, Vec<ReplicatedData>, Vec<(PublicKey, u64)>), ReceiveUpdates(PublicKey, u64, Vec<ReplicatedData>, Vec<(PublicKey, u64)>),
/// ask for a missing index /// ask for a missing index
/// (my replicated data to keep alive, missing window index)
RequestWindowIndex(ReplicatedData, u64), RequestWindowIndex(ReplicatedData, u64),
} }
@ -249,8 +272,8 @@ impl Crdt {
pub fn set_leader(&mut self, key: PublicKey) -> () { pub fn set_leader(&mut self, key: PublicKey) -> () {
let mut me = self.my_data().clone(); let mut me = self.my_data().clone();
info!( warn!(
"{:x}: setting leader to {:x} from {:x}", "{:x}: LEADER_UPDATE TO {:x} from {:x}",
me.debug_id(), me.debug_id(),
make_debug_id(&key), make_debug_id(&key),
make_debug_id(&me.current_leader_id), make_debug_id(&me.current_leader_id),
@ -266,32 +289,43 @@ impl Crdt {
pub fn insert(&mut self, v: &ReplicatedData) { pub fn insert(&mut self, v: &ReplicatedData) {
// TODO check that last_verified types are always increasing // TODO check that last_verified types are always increasing
//update the peer table
if self.table.get(&v.id).is_none() || (v.version > self.table[&v.id].version) { if self.table.get(&v.id).is_none() || (v.version > self.table[&v.id].version) {
//somehow we signed a message for our own identity with a higher version that //somehow we signed a message for our own identity with a higher version that
// we have stored ourselves // we have stored ourselves
trace!( trace!(
"me: {:x} v.id: {:x} version: {}", "{:x}: insert v.id: {:x} version: {}",
self.debug_id(), self.debug_id(),
v.debug_id(), v.debug_id(),
v.version v.version
); );
self.update_index += 1; self.update_index += 1;
let _ = self.table.insert(v.id.clone(), v.clone()); let _ = self.table.insert(v.id.clone(), v.clone());
let _ = self.local.insert(v.id, self.update_index); let _ = self.local.insert(v.id, self.update_index);
} else { } else {
trace!( trace!(
"INSERT FAILED me: {:x} data: {:?} new.version: {} me.version: {}", "{:x}: INSERT FAILED data: {:?} new.version: {} me.version: {}",
self.debug_id(), self.debug_id(),
v.debug_id(), v.debug_id(),
v.version, v.version,
self.table[&v.id].version self.table[&v.id].version
); );
} }
//update the liveness table self.update_liveness(v.id);
let now = timestamp();
*self.alive.entry(v.id).or_insert(now) = now;
} }
fn update_liveness(&mut self, id: PublicKey) {
//update the liveness table
let now = timestamp();
trace!(
"{:x} updating liveness {:x} to {}",
self.debug_id(),
make_debug_id(&id),
now
);
*self.alive.entry(id).or_insert(now) = now;
}
/// purge old validators /// purge old validators
/// TODO: we need a robust membership protocol /// TODO: we need a robust membership protocol
/// http://asc.di.fct.unl.pt/~jleitao/pdf/dsn07-leitao.pdf /// http://asc.di.fct.unl.pt/~jleitao/pdf/dsn07-leitao.pdf
@ -300,18 +334,33 @@ impl Crdt {
if self.table.len() <= MIN_TABLE_SIZE { if self.table.len() <= MIN_TABLE_SIZE {
return; return;
} }
if self.leader_data().is_none() {
return;
}
let leader_id = self.leader_data().unwrap().id;
//wait for 4x as long as it would randomly take to reach our node let limit = GOSSIP_PURGE_MILLIS;
//assuming everyone is waiting the same amount of time as this node
let limit = self.table.len() as u64 * GOSSIP_SLEEP_MILLIS * 4;
//let limit = std::cmp::max(limit, GOSSIP_MIN_PURGE_MILLIS);
let dead_ids: Vec<PublicKey> = self.alive let dead_ids: Vec<PublicKey> = self.alive
.iter() .iter()
.filter_map(|(&k, v)| { .filter_map(|(&k, v)| {
if k != self.me && (now - v) > limit { if k != self.me && (now - v) > limit {
info!("purge {:x} {}", make_debug_id(&k), now - v); if leader_id == k {
Some(k) info!(
"{:x}: PURGE LEADER {:x} {}",
self.debug_id(),
make_debug_id(&k),
now - v
);
Some(k)
} else {
info!(
"{:x}: purge {:x} {}",
self.debug_id(),
make_debug_id(&k),
now - v
);
Some(k)
}
} else { } else {
trace!( trace!(
"purge skipped {:x} {} {}", "purge skipped {:x} {} {}",
@ -337,20 +386,10 @@ impl Crdt {
} }
pub fn index_blobs( pub fn index_blobs(
obj: &Arc<RwLock<Self>>, me: &ReplicatedData,
blobs: &Vec<SharedBlob>, blobs: &Vec<SharedBlob>,
receive_index: &mut u64, receive_index: &mut u64,
) -> Result<()> { ) -> Result<()> {
let me: ReplicatedData = {
let robj = obj.read().expect("'obj' read lock in crdt::index_blobs");
debug!(
"{:x}: broadcast table {}",
robj.debug_id(),
robj.table.len()
);
robj.table[&robj.me].clone()
};
// enumerate all the blobs, those are the indices // enumerate all the blobs, those are the indices
for (i, b) in blobs.iter().enumerate() { for (i, b) in blobs.iter().enumerate() {
// only leader should be broadcasting // only leader should be broadcasting
@ -362,45 +401,48 @@ impl Crdt {
Ok(()) Ok(())
} }
/// compute broadcast table
/// # Remarks
pub fn compute_broadcast_table(&self) -> Vec<ReplicatedData> {
let mut live: Vec<_> = self.alive.iter().collect();
thread_rng().shuffle(&mut live);
let daddr = "0.0.0.0:0".parse().unwrap();
let me = &self.table[&self.me];
let cloned_table: Vec<ReplicatedData> = live.iter()
.map(|x| &self.table[x.0])
.filter(|v| {
if me.id == v.id {
//filter myself
false
} else if v.addrs.replicate == daddr {
trace!("broadcast skip not listening {:x}", v.debug_id());
false
} else {
trace!("broadcast node {}", v.addrs.replicate);
true
}
})
.cloned()
.collect();
cloned_table
}
/// broadcast messages from the leader to layer 1 nodes /// broadcast messages from the leader to layer 1 nodes
/// # Remarks /// # Remarks
/// We need to avoid having obj locked while doing any io, such as the `send_to` /// We need to avoid having obj locked while doing any io, such as the `send_to`
pub fn broadcast( pub fn broadcast(
obj: &Arc<RwLock<Self>>, me: &ReplicatedData,
broadcast_table: &Vec<ReplicatedData>,
window: &Window, window: &Window,
s: &UdpSocket, s: &UdpSocket,
transmit_index: &mut u64, transmit_index: &mut u64,
received_index: u64, received_index: u64,
) -> Result<()> { ) -> Result<()> {
let (me, table): (ReplicatedData, Vec<ReplicatedData>) = { if broadcast_table.len() < 1 {
// copy to avoid locking during IO warn!("not enough peers in crdt table");
let robj = obj.read().expect("'obj' read lock in pub fn broadcast");
trace!("broadcast table {}", robj.table.len());
let cloned_table: Vec<ReplicatedData> = robj.table.values().cloned().collect();
(robj.table[&robj.me].clone(), cloned_table)
};
let daddr = "0.0.0.0:0".parse().unwrap();
let nodes: Vec<&ReplicatedData> = table
.iter()
.filter(|v| {
if me.id == v.id {
//filter myself
false
} else if v.replicate_addr == daddr {
trace!("broadcast skip not listening {:x}", v.debug_id());
false
} else {
trace!("broadcast node {}", v.replicate_addr);
true
}
})
.collect();
if nodes.len() < 1 {
warn!("crdt too small");
Err(CrdtError::TooSmall)?; Err(CrdtError::TooSmall)?;
} }
trace!("broadcast nodes {}", nodes.len()); trace!("broadcast nodes {}", broadcast_table.len());
// enumerate all the blobs in the window, those are the indices // enumerate all the blobs in the window, those are the indices
// transmit them to nodes, starting from a different node // transmit them to nodes, starting from a different node
@ -411,7 +453,8 @@ impl Crdt {
let k = is % window_l.len(); let k = is % window_l.len();
assert!(window_l[k].is_some()); assert!(window_l[k].is_some());
orders.push((window_l[k].clone(), nodes[is % nodes.len()])); let pos = is % broadcast_table.len();
orders.push((window_l[k].clone(), &broadcast_table[pos]));
} }
trace!("broadcast orders table {}", orders.len()); trace!("broadcast orders table {}", orders.len());
@ -427,12 +470,12 @@ impl Crdt {
"broadcast idx: {} sz: {} to {} coding: {}", "broadcast idx: {} sz: {} to {} coding: {}",
blob.get_index().unwrap(), blob.get_index().unwrap(),
blob.meta.size, blob.meta.size,
v.replicate_addr, v.addrs.replicate,
blob.is_coding() blob.is_coding()
); );
assert!(blob.meta.size < BLOB_SIZE); assert!(blob.meta.size < BLOB_SIZE);
let e = s.send_to(&blob.data[..blob.meta.size], &v.replicate_addr); let e = s.send_to(&blob.data[..blob.meta.size], &v.addrs.replicate);
trace!("done broadcast {} to {}", blob.meta.size, v.replicate_addr); trace!("done broadcast {} to {}", blob.meta.size, v.addrs.replicate);
e e
}) })
.collect(); .collect();
@ -470,8 +513,8 @@ impl Crdt {
} else if me.current_leader_id == v.id { } else if me.current_leader_id == v.id {
trace!("skip retransmit to leader {:?}", v.id); trace!("skip retransmit to leader {:?}", v.id);
false false
} else if v.replicate_addr == daddr { } else if v.addrs.replicate == daddr {
trace!("retransmit skip not listening {:x}", v.debug_id()); trace!("skip nodes that are not listening {:?}", v.id);
false false
} else { } else {
true true
@ -482,7 +525,7 @@ impl Crdt {
let errs: Vec<_> = orders let errs: Vec<_> = orders
.par_iter() .par_iter()
.map(|v| { .map(|v| {
info!( debug!(
"{:x}: retransmit blob {} to {:x}", "{:x}: retransmit blob {} to {:x}",
me.debug_id(), me.debug_id(),
rblob.get_index().unwrap(), rblob.get_index().unwrap(),
@ -490,7 +533,7 @@ impl Crdt {
); );
//TODO profile this, may need multiple sockets for par_iter //TODO profile this, may need multiple sockets for par_iter
assert!(rblob.meta.size < BLOB_SIZE); assert!(rblob.meta.size < BLOB_SIZE);
s.send_to(&rblob.data[..rblob.meta.size], &v.replicate_addr) s.send_to(&rblob.data[..rblob.meta.size], &v.addrs.replicate)
}) })
.collect(); .collect();
for e in errs { for e in errs {
@ -538,13 +581,13 @@ impl Crdt {
let daddr = "0.0.0.0:0".parse().unwrap(); let daddr = "0.0.0.0:0".parse().unwrap();
let valid: Vec<_> = self.table let valid: Vec<_> = self.table
.values() .values()
.filter(|r| r.id != self.me && r.repair_addr != daddr) .filter(|r| r.id != self.me && r.addrs.repair != daddr)
.collect(); .collect();
if valid.is_empty() { if valid.is_empty() {
Err(CrdtError::TooSmall)?; Err(CrdtError::TooSmall)?;
} }
let n = (Self::random() as usize) % valid.len(); let n = (Self::random() as usize) % valid.len();
let addr = valid[n].gossip_addr.clone(); let addr = valid[n].addrs.gossip.clone();
let req = Protocol::RequestWindowIndex(self.table[&self.me].clone(), ix); let req = Protocol::RequestWindowIndex(self.table[&self.me].clone(), ix);
let out = serialize(&req)?; let out = serialize(&req)?;
Ok((addr, out)) Ok((addr, out))
@ -581,10 +624,10 @@ impl Crdt {
"created gossip request from {:x} to {:x} {}", "created gossip request from {:x} to {:x} {}",
self.debug_id(), self.debug_id(),
v.debug_id(), v.debug_id(),
v.gossip_addr v.addrs.gossip
); );
Ok((v.gossip_addr, req)) Ok((v.addrs.gossip, req))
} }
/// At random pick a node and try to get updated changes from them /// At random pick a node and try to get updated changes from them
@ -702,10 +745,10 @@ impl Crdt {
.spawn(move || loop { .spawn(move || loop {
let start = timestamp(); let start = timestamp();
let _ = Self::run_gossip(&obj, &blob_sender, &blob_recycler); let _ = Self::run_gossip(&obj, &blob_sender, &blob_recycler);
obj.write().unwrap().purge(timestamp());
if exit.load(Ordering::Relaxed) { if exit.load(Ordering::Relaxed) {
return; return;
} }
obj.write().unwrap().purge(timestamp());
//TODO: possibly tune this parameter //TODO: possibly tune this parameter
//we saw a deadlock passing an obj.read().unwrap().timeout into sleep //we saw a deadlock passing an obj.read().unwrap().timeout into sleep
let _ = obj.write().unwrap().update_leader(); let _ = obj.write().unwrap().update_leader();
@ -753,7 +796,7 @@ impl Crdt {
let sz = wblob.meta.size; let sz = wblob.meta.size;
outblob.meta.size = sz; outblob.meta.size = sz;
outblob.data[..sz].copy_from_slice(&wblob.data[..sz]); outblob.data[..sz].copy_from_slice(&wblob.data[..sz]);
outblob.meta.set_addr(&from.repair_addr); outblob.meta.set_addr(&from.addrs.repair);
outblob.set_id(sender_id).expect("blob set_id"); outblob.set_id(sender_id).expect("blob set_id");
} }
@ -789,7 +832,7 @@ impl Crdt {
// TODO sigverify these // TODO sigverify these
Ok(Protocol::RequestUpdates(v, from_rd)) => { Ok(Protocol::RequestUpdates(v, from_rd)) => {
trace!("RequestUpdates {}", v); trace!("RequestUpdates {}", v);
let addr = from_rd.gossip_addr; let addr = from_rd.addrs.gossip;
let me = obj.read().unwrap(); let me = obj.read().unwrap();
// only lock for these two calls, dont lock during IO `sock.send_to` or `sock.recv_from` // only lock for these two calls, dont lock during IO `sock.send_to` or `sock.recv_from`
let (from, ups, data) = me.get_updates_since(v); let (from, ups, data) = me.get_updates_since(v);
@ -849,7 +892,7 @@ impl Crdt {
from.debug_id(), from.debug_id(),
ix, ix,
); );
assert_ne!(from.repair_addr, me.repair_addr); assert_ne!(from.addrs.repair, me.addrs.repair);
Self::run_window_request(&window, &me, &from, ix, blob_recycler) Self::run_window_request(&window, &me, &from, ix, blob_recycler)
} }
Err(_) => { Err(_) => {
@ -890,6 +933,7 @@ impl Crdt {
response_sender: BlobSender, response_sender: BlobSender,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
let debug_id = obj.read().unwrap().debug_id();
Builder::new() Builder::new()
.name("solana-listen".to_string()) .name("solana-listen".to_string())
.spawn(move || loop { .spawn(move || loop {
@ -900,15 +944,16 @@ impl Crdt {
&requests_receiver, &requests_receiver,
&response_sender, &response_sender,
); );
if e.is_err() {
info!(
"run_listen timeout, table size: {}",
obj.read().unwrap().table.len()
);
}
if exit.load(Ordering::Relaxed) { if exit.load(Ordering::Relaxed) {
return; return;
} }
if e.is_err() {
info!(
"{:x}: run_listen timeout, table size: {}",
debug_id,
obj.read().unwrap().table.len()
);
}
}) })
.unwrap() .unwrap()
} }
@ -932,7 +977,11 @@ pub struct TestNode {
} }
impl TestNode { impl TestNode {
pub fn new() -> TestNode { pub fn new() -> Self {
let pubkey = KeyPair::new().pubkey();
Self::new_with_pubkey(pubkey)
}
pub fn new_with_pubkey(pubkey: PublicKey) -> Self {
let transaction = UdpSocket::bind("0.0.0.0:0").unwrap(); let transaction = UdpSocket::bind("0.0.0.0:0").unwrap();
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
let replicate = UdpSocket::bind("0.0.0.0:0").unwrap(); let replicate = UdpSocket::bind("0.0.0.0:0").unwrap();
@ -942,7 +991,6 @@ impl TestNode {
let respond = UdpSocket::bind("0.0.0.0:0").unwrap(); let respond = UdpSocket::bind("0.0.0.0:0").unwrap();
let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap();
let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap(); let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap();
let pubkey = KeyPair::new().pubkey();
let data = ReplicatedData::new( let data = ReplicatedData::new(
pubkey, pubkey,
gossip.local_addr().unwrap(), gossip.local_addr().unwrap(),
@ -968,19 +1016,19 @@ impl TestNode {
} }
pub fn new_with_bind_addr(data: ReplicatedData, bind_addr: SocketAddr) -> TestNode { pub fn new_with_bind_addr(data: ReplicatedData, bind_addr: SocketAddr) -> TestNode {
let mut local_gossip_addr = bind_addr.clone(); let mut local_gossip_addr = bind_addr.clone();
local_gossip_addr.set_port(data.gossip_addr.port()); local_gossip_addr.set_port(data.addrs.gossip.port());
let mut local_replicate_addr = bind_addr.clone(); let mut local_replicate_addr = bind_addr.clone();
local_replicate_addr.set_port(data.replicate_addr.port()); local_replicate_addr.set_port(data.addrs.replicate.port());
let mut local_requests_addr = bind_addr.clone(); let mut local_requests_addr = bind_addr.clone();
local_requests_addr.set_port(data.requests_addr.port()); local_requests_addr.set_port(data.addrs.requests.port());
let mut local_transactions_addr = bind_addr.clone(); let mut local_transactions_addr = bind_addr.clone();
local_transactions_addr.set_port(data.transactions_addr.port()); local_transactions_addr.set_port(data.addrs.transactions.port());
let mut local_repair_addr = bind_addr.clone(); let mut local_repair_addr = bind_addr.clone();
local_repair_addr.set_port(data.repair_addr.port()); local_repair_addr.set_port(data.addrs.repair.port());
let transaction = UdpSocket::bind(local_transactions_addr).unwrap(); let transaction = UdpSocket::bind(local_transactions_addr).unwrap();
let gossip = UdpSocket::bind(local_gossip_addr).unwrap(); let gossip = UdpSocket::bind(local_gossip_addr).unwrap();
@ -1016,7 +1064,8 @@ impl TestNode {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crdt::{ use crdt::{
parse_port_or_addr, Crdt, CrdtError, ReplicatedData, GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE, parse_port_or_addr, Crdt, CrdtError, ReplicatedData, GOSSIP_PURGE_MILLIS,
GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE,
}; };
use logger; use logger;
use packet::BlobRecycler; use packet::BlobRecycler;
@ -1064,13 +1113,18 @@ mod tests {
copy copy
} }
#[test] #[test]
fn replicated_data_new_leader() { fn replicated_data_new_leader_with_pubkey() {
let d1 = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); let kp = KeyPair::new();
assert_eq!(d1.gossip_addr, "127.0.0.1:1235".parse().unwrap()); let d1 = ReplicatedData::new_leader_with_pubkey(
assert_eq!(d1.replicate_addr, "127.0.0.1:1236".parse().unwrap()); kp.pubkey().clone(),
assert_eq!(d1.requests_addr, "127.0.0.1:1237".parse().unwrap()); &"127.0.0.1:1234".parse().unwrap(),
assert_eq!(d1.transactions_addr, "127.0.0.1:1234".parse().unwrap()); );
assert_eq!(d1.repair_addr, "127.0.0.1:1238".parse().unwrap()); assert_eq!(d1.id, kp.pubkey());
assert_eq!(d1.addrs.gossip, "127.0.0.1:1235".parse().unwrap());
assert_eq!(d1.addrs.replicate, "127.0.0.1:1236".parse().unwrap());
assert_eq!(d1.addrs.requests, "127.0.0.1:1237".parse().unwrap());
assert_eq!(d1.addrs.transactions, "127.0.0.1:1234".parse().unwrap());
assert_eq!(d1.addrs.repair, "127.0.0.1:1238".parse().unwrap());
} }
#[test] #[test]
fn update_test() { fn update_test() {
@ -1165,7 +1219,7 @@ mod tests {
); );
crdt.insert(&nxt); crdt.insert(&nxt);
let rv = crdt.window_index_request(0).unwrap(); let rv = crdt.window_index_request(0).unwrap();
assert_eq!(nxt.gossip_addr, "127.0.0.2:1234".parse().unwrap()); assert_eq!(nxt.addrs.gossip, "127.0.0.2:1234".parse().unwrap());
assert_eq!(rv.0, "127.0.0.2:1234".parse().unwrap()); assert_eq!(rv.0, "127.0.0.2:1234".parse().unwrap());
let nxt = ReplicatedData::new( let nxt = ReplicatedData::new(
@ -1218,7 +1272,7 @@ mod tests {
crdt.insert(&nxt1); crdt.insert(&nxt1);
let rv = crdt.gossip_request().unwrap(); let rv = crdt.gossip_request().unwrap();
assert_eq!(rv.0, nxt1.gossip_addr); assert_eq!(rv.0, nxt1.addrs.gossip);
let nxt2 = ReplicatedData::new_entry_point("127.0.0.3:1234".parse().unwrap()); let nxt2 = ReplicatedData::new_entry_point("127.0.0.3:1234".parse().unwrap());
crdt.insert(&nxt2); crdt.insert(&nxt2);
@ -1239,9 +1293,9 @@ mod tests {
} }
assert!(rv.len() > 0); assert!(rv.len() > 0);
for i in rv.iter() { for i in rv.iter() {
if i.read().unwrap().meta.addr() == nxt1.gossip_addr { if i.read().unwrap().meta.addr() == nxt1.addrs.gossip {
one = true; one = true;
} else if i.read().unwrap().meta.addr() == nxt2.gossip_addr { } else if i.read().unwrap().meta.addr() == nxt2.addrs.gossip {
two = true; two = true;
} else { } else {
//unexpected request //unexpected request
@ -1260,26 +1314,27 @@ mod tests {
#[test] #[test]
fn purge_test() { fn purge_test() {
logger::setup();
let me = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap()); let me = ReplicatedData::new_leader(&"127.0.0.1:1234".parse().unwrap());
let mut crdt = Crdt::new(me.clone()); let mut crdt = Crdt::new(me.clone());
let nxt = ReplicatedData::new_leader(&"127.0.0.2:1234".parse().unwrap()); let nxt = ReplicatedData::new_leader(&"127.0.0.2:1234".parse().unwrap());
assert_ne!(me.id, nxt.id); assert_ne!(me.id, nxt.id);
crdt.set_leader(me.id);
crdt.insert(&nxt); crdt.insert(&nxt);
let rv = crdt.gossip_request().unwrap(); let rv = crdt.gossip_request().unwrap();
assert_eq!(rv.0, nxt.gossip_addr); assert_eq!(rv.0, nxt.addrs.gossip);
let now = crdt.alive[&nxt.id]; let now = crdt.alive[&nxt.id];
let len = crdt.table.len() as u64;
crdt.purge(now); crdt.purge(now);
let rv = crdt.gossip_request().unwrap(); let rv = crdt.gossip_request().unwrap();
assert_eq!(rv.0, nxt.gossip_addr); assert_eq!(rv.0, nxt.addrs.gossip);
crdt.purge(now + len * GOSSIP_SLEEP_MILLIS * 4); crdt.purge(now + GOSSIP_PURGE_MILLIS);
let rv = crdt.gossip_request().unwrap(); let rv = crdt.gossip_request().unwrap();
assert_eq!(rv.0, nxt.gossip_addr); assert_eq!(rv.0, nxt.addrs.gossip);
crdt.purge(now + len * GOSSIP_SLEEP_MILLIS * 4 + 1); crdt.purge(now + GOSSIP_PURGE_MILLIS + 1);
let rv = crdt.gossip_request().unwrap(); let rv = crdt.gossip_request().unwrap();
assert_eq!(rv.0, nxt.gossip_addr); assert_eq!(rv.0, nxt.addrs.gossip);
let nxt2 = ReplicatedData::new_leader(&"127.0.0.2:1234".parse().unwrap()); let nxt2 = ReplicatedData::new_leader(&"127.0.0.2:1234".parse().unwrap());
assert_ne!(me.id, nxt2.id); assert_ne!(me.id, nxt2.id);
@ -1291,12 +1346,13 @@ mod tests {
} }
let len = crdt.table.len() as u64; let len = crdt.table.len() as u64;
assert!((MIN_TABLE_SIZE as u64) < len); assert!((MIN_TABLE_SIZE as u64) < len);
crdt.purge(now + len * GOSSIP_SLEEP_MILLIS * 4); crdt.purge(now + GOSSIP_PURGE_MILLIS);
assert_eq!(len as usize, crdt.table.len()); assert_eq!(len as usize, crdt.table.len());
crdt.purge(now + len * GOSSIP_SLEEP_MILLIS * 4 + 1); trace!("purging");
crdt.purge(now + GOSSIP_PURGE_MILLIS + 1);
assert_eq!(len as usize - 1, crdt.table.len());
let rv = crdt.gossip_request().unwrap(); let rv = crdt.gossip_request().unwrap();
assert_eq!(rv.0, nxt.gossip_addr); assert_eq!(rv.0, nxt.addrs.gossip);
assert_eq!(2, crdt.table.len());
} }
/// test window requests respond with the right blob, and do not overrun /// test window requests respond with the right blob, and do not overrun

View File

@ -287,8 +287,8 @@ mod tests {
let mut drone = Drone::new( let mut drone = Drone::new(
alice.keypair(), alice.keypair(),
addr, addr,
leader_data.transactions_addr, leader_data.addrs.transactions,
leader_data.requests_addr, leader_data.addrs.requests,
None, None,
Some(150_000), Some(150_000),
); );
@ -312,9 +312,9 @@ mod tests {
UdpSocket::bind("0.0.0.0:0").expect("drone bind to transactions socket"); UdpSocket::bind("0.0.0.0:0").expect("drone bind to transactions socket");
let mut client = ThinClient::new( let mut client = ThinClient::new(
leader_data.requests_addr, leader_data.addrs.requests,
requests_socket, requests_socket,
leader_data.transactions_addr, leader_data.addrs.transactions,
transactions_socket, transactions_socket,
); );

View File

@ -67,9 +67,9 @@ impl FullNode {
let local_requests_addr = node.sockets.requests.local_addr().unwrap(); let local_requests_addr = node.sockets.requests.local_addr().unwrap();
info!( info!(
"starting... local gossip address: {} (advertising {})", "starting... local gossip address: {} (advertising {})",
local_gossip_addr, node.data.gossip_addr local_gossip_addr, node.data.addrs.gossip
); );
let requests_addr = node.data.requests_addr.clone(); let requests_addr = node.data.addrs.requests.clone();
if !leader { if !leader {
let testnet_addr = network_entry_for_validator.expect("validator requires entry"); let testnet_addr = network_entry_for_validator.expect("validator requires entry");

View File

@ -227,7 +227,7 @@ fn repair_window(
let sock = UdpSocket::bind("0.0.0.0:0")?; let sock = UdpSocket::bind("0.0.0.0:0")?;
for (to, req) in reqs { for (to, req) in reqs {
//todo cache socket //todo cache socket
info!( debug!(
"{:x} repair_window request {} {} {}", "{:x} repair_window request {} {} {}",
debug_id, *consumed, *received, to debug_id, *consumed, *received, to
); );
@ -257,7 +257,7 @@ fn recv_window(
while let Ok(mut nq) = r.try_recv() { while let Ok(mut nq) = r.try_recv() {
dq.append(&mut nq) dq.append(&mut nq)
} }
info!( debug!(
"{:x}: RECV_WINDOW {} {}: got packets {}", "{:x}: RECV_WINDOW {} {}: got packets {}",
debug_id, debug_id,
*consumed, *consumed,
@ -302,7 +302,7 @@ fn recv_window(
warn!("{:x}: no leader to retransmit from", debug_id); warn!("{:x}: no leader to retransmit from", debug_id);
} }
if !retransmitq.is_empty() { if !retransmitq.is_empty() {
info!( debug!(
"{:x}: RECV_WINDOW {} {}: retransmit {}", "{:x}: RECV_WINDOW {} {}: retransmit {}",
debug_id, debug_id,
*consumed, *consumed,
@ -416,7 +416,7 @@ fn recv_window(
print_window(locked_window, *consumed); print_window(locked_window, *consumed);
trace!("sending contq.len: {}", contq.len()); trace!("sending contq.len: {}", contq.len());
if !contq.is_empty() { if !contq.is_empty() {
info!( debug!(
"{:x}: RECV_WINDOW {} {}: forwarding contq {}", "{:x}: RECV_WINDOW {} {}: forwarding contq {}",
debug_id, debug_id,
*consumed, *consumed,
@ -475,6 +475,7 @@ pub fn initialized_window(
{ {
let mut win = window.write().unwrap(); let mut win = window.write().unwrap();
let me = crdt.read().unwrap().my_data().clone();
assert!(blobs.len() <= win.len()); assert!(blobs.len() <= win.len());
debug!( debug!(
@ -485,7 +486,7 @@ pub fn initialized_window(
// Index the blobs // Index the blobs
let mut received = entry_height - blobs.len() as u64; let mut received = entry_height - blobs.len() as u64;
Crdt::index_blobs(crdt, &blobs, &mut received).expect("index blobs for initial window"); Crdt::index_blobs(&me, &blobs, &mut received).expect("index blobs for initial window");
// populate the window, offset by implied index // populate the window, offset by implied index
for b in blobs { for b in blobs {
@ -552,8 +553,8 @@ pub fn window(
} }
fn broadcast( fn broadcast(
debug_id: u64, me: &ReplicatedData,
crdt: &Arc<RwLock<Crdt>>, broadcast_table: &Vec<ReplicatedData>,
window: &Window, window: &Window,
recycler: &BlobRecycler, recycler: &BlobRecycler,
r: &BlobReceiver, r: &BlobReceiver,
@ -561,6 +562,7 @@ fn broadcast(
transmit_index: &mut u64, transmit_index: &mut u64,
receive_index: &mut u64, receive_index: &mut u64,
) -> Result<()> { ) -> Result<()> {
let debug_id = me.debug_id();
let timer = Duration::new(1, 0); let timer = Duration::new(1, 0);
let mut dq = r.recv_timeout(timer)?; let mut dq = r.recv_timeout(timer)?;
while let Ok(mut nq) = r.try_recv() { while let Ok(mut nq) = r.try_recv() {
@ -585,7 +587,7 @@ fn broadcast(
debug!("{:x} broadcast blobs.len: {}", debug_id, blobs_len); debug!("{:x} broadcast blobs.len: {}", debug_id, blobs_len);
// Index the blobs // Index the blobs
Crdt::index_blobs(crdt, &blobs, receive_index)?; Crdt::index_blobs(&me, &blobs, receive_index)?;
// keep the cache of blobs that are broadcast // keep the cache of blobs that are broadcast
{ {
let mut win = window.write().unwrap(); let mut win = window.write().unwrap();
@ -625,7 +627,14 @@ fn broadcast(
*receive_index += blobs_len as u64; *receive_index += blobs_len as u64;
// Send blobs out from the window // Send blobs out from the window
Crdt::broadcast(crdt, &window, &sock, transmit_index, *receive_index)?; Crdt::broadcast(
&me,
&broadcast_table,
&window,
&sock,
transmit_index,
*receive_index,
)?;
} }
Ok(()) Ok(())
} }
@ -652,11 +661,12 @@ pub fn broadcaster(
.spawn(move || { .spawn(move || {
let mut transmit_index = entry_height; let mut transmit_index = entry_height;
let mut receive_index = entry_height; let mut receive_index = entry_height;
let debug_id = crdt.read().unwrap().debug_id(); let me = crdt.read().unwrap().my_data().clone();
loop { loop {
let broadcast_table = crdt.read().unwrap().compute_broadcast_table();
if let Err(e) = broadcast( if let Err(e) = broadcast(
debug_id, &me,
&crdt, &broadcast_table,
&window, &window,
&recycler, &recycler,
&r, &r,
@ -955,7 +965,6 @@ mod test {
s_window, s_window,
s_retransmit, s_retransmit,
); );
let t_responder = { let t_responder = {
let (s_responder, r_responder) = channel(); let (s_responder, r_responder) = channel();
let t_responder = responder(tn.sockets.replicate, resp_recycler.clone(), r_responder); let t_responder = responder(tn.sockets.replicate, resp_recycler.clone(), r_responder);
@ -969,7 +978,7 @@ mod test {
w.set_id(me_id).unwrap(); w.set_id(me_id).unwrap();
assert_eq!(i, w.get_index().unwrap()); assert_eq!(i, w.get_index().unwrap());
w.meta.size = PACKET_DATA_SIZE; w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&tn.data.gossip_addr); w.meta.set_addr(&tn.data.addrs.gossip);
} }
msgs.push_back(b); msgs.push_back(b);
} }

View File

@ -302,9 +302,9 @@ mod tests {
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut client = ThinClient::new( let mut client = ThinClient::new(
leader_data.requests_addr, leader_data.addrs.requests,
requests_socket, requests_socket,
leader_data.transactions_addr, leader_data.addrs.transactions,
transactions_socket, transactions_socket,
); );
let last_id = client.get_last_id(); let last_id = client.get_last_id();
@ -344,9 +344,9 @@ mod tests {
.unwrap(); .unwrap();
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut client = ThinClient::new( let mut client = ThinClient::new(
leader_data.requests_addr, leader_data.addrs.requests,
requests_socket, requests_socket,
leader_data.transactions_addr, leader_data.addrs.transactions,
transactions_socket, transactions_socket,
); );
let last_id = client.get_last_id(); let last_id = client.get_last_id();
@ -396,9 +396,9 @@ mod tests {
.unwrap(); .unwrap();
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut client = ThinClient::new( let mut client = ThinClient::new(
leader_data.requests_addr, leader_data.addrs.requests,
requests_socket, requests_socket,
leader_data.transactions_addr, leader_data.addrs.transactions,
transactions_socket, transactions_socket,
); );
let last_id = client.get_last_id(); let last_id = client.get_last_id();

View File

@ -198,7 +198,7 @@ pub mod tests {
let starting_balance = 10_000; let starting_balance = 10_000;
let mint = Mint::new(starting_balance); let mint = Mint::new(starting_balance);
let replicate_addr = target1.data.replicate_addr; let replicate_addr = target1.data.addrs.replicate;
let bank = Arc::new(Bank::new(&mint)); let bank = Arc::new(Bank::new(&mint));
//start crdt1 //start crdt1

View File

@ -29,8 +29,8 @@ fn converge(leader: &ReplicatedData, num_nodes: usize) -> Vec<ReplicatedData> {
let mut spy = TestNode::new(); let mut spy = TestNode::new();
let daddr = "0.0.0.0:0".parse().unwrap(); let daddr = "0.0.0.0:0".parse().unwrap();
let me = spy.data.id.clone(); let me = spy.data.id.clone();
spy.data.replicate_addr = daddr; spy.data.addrs.replicate = daddr;
spy.data.requests_addr = daddr; spy.data.addrs.requests = daddr;
let mut spy_crdt = Crdt::new(spy.data); let mut spy_crdt = Crdt::new(spy.data);
spy_crdt.insert(&leader); spy_crdt.insert(&leader);
spy_crdt.set_leader(leader.id); spy_crdt.set_leader(leader.id);
@ -55,7 +55,7 @@ fn converge(leader: &ReplicatedData, num_nodes: usize) -> Vec<ReplicatedData> {
.values() .values()
.into_iter() .into_iter()
.filter(|x| x.id != me) .filter(|x| x.id != me)
.filter(|x| x.requests_addr != daddr) .filter(|x| x.addrs.requests != daddr)
.cloned() .cloned()
.collect(); .collect();
if num >= num_nodes as u64 && v.len() >= num_nodes { if num >= num_nodes as u64 && v.len() >= num_nodes {
@ -110,7 +110,7 @@ fn test_multi_node_validator_catchup_from_zero() {
validator, validator,
false, false,
InFile::Path(ledger_path.clone()), InFile::Path(ledger_path.clone()),
Some(leader_data.gossip_addr), Some(leader_data.addrs.gossip),
None, None,
exit.clone(), exit.clone(),
); );
@ -143,7 +143,7 @@ fn test_multi_node_validator_catchup_from_zero() {
TestNode::new(), TestNode::new(),
false, false,
InFile::Path(ledger_path.clone()), InFile::Path(ledger_path.clone()),
Some(leader_data.gossip_addr), Some(leader_data.addrs.gossip),
None, None,
exit.clone(), exit.clone(),
); );
@ -211,7 +211,7 @@ fn test_multi_node_basic() {
validator, validator,
false, false,
InFile::Path(ledger_path.clone()), InFile::Path(ledger_path.clone()),
Some(leader_data.gossip_addr), Some(leader_data.addrs.gossip),
None, None,
exit.clone(), exit.clone(),
); );
@ -272,7 +272,7 @@ fn test_boot_validator_from_file() {
validator, validator,
false, false,
InFile::Path(ledger_path.clone()), InFile::Path(ledger_path.clone()),
Some(leader_data.gossip_addr), Some(leader_data.addrs.gossip),
None, None,
exit.clone(), exit.clone(),
); );
@ -356,7 +356,7 @@ fn test_leader_restart_validator_start_from_old_ledger() {
validator, validator,
false, false,
InFile::Path(stale_ledger_path.clone()), InFile::Path(stale_ledger_path.clone()),
Some(leader_data.gossip_addr), Some(leader_data.addrs.gossip),
None, None,
exit.clone(), exit.clone(),
); );
@ -416,7 +416,7 @@ fn test_multi_node_dynamic_network() {
validator, validator,
false, false,
InFile::Path(ledger_path.clone()), InFile::Path(ledger_path.clone()),
Some(leader_data.gossip_addr), Some(leader_data.addrs.gossip),
Some(OutFile::Path(ledger_path.clone())), Some(OutFile::Path(ledger_path.clone())),
exit.clone(), exit.clone(),
); );
@ -473,7 +473,7 @@ fn test_multi_node_dynamic_network() {
validator, validator,
false, false,
InFile::Path(ledger_path.clone()), InFile::Path(ledger_path.clone()),
Some(leader_data.gossip_addr), Some(leader_data.addrs.gossip),
Some(OutFile::Path(ledger_path.clone())), Some(OutFile::Path(ledger_path.clone())),
exit.clone(), exit.clone(),
); );
@ -509,12 +509,12 @@ fn mk_client(leader: &ReplicatedData) -> ThinClient {
.unwrap(); .unwrap();
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let daddr = "0.0.0.0:0".parse().unwrap(); let daddr = "0.0.0.0:0".parse().unwrap();
assert!(leader.requests_addr != daddr); assert!(leader.addrs.requests != daddr);
assert!(leader.transactions_addr != daddr); assert!(leader.addrs.transactions != daddr);
ThinClient::new( ThinClient::new(
leader.requests_addr, leader.addrs.requests,
requests_socket, requests_socket,
leader.transactions_addr, leader.addrs.transactions,
transactions_socket, transactions_socket,
) )
} }