diff --git a/src/crdt.rs b/src/crdt.rs index 6e136b5da..f9ef05a2b 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -479,7 +479,7 @@ impl Crdt { let errs: Vec<_> = orders .par_iter() .map(|v| { - debug!( + info!( "{:x}: retransmit blob {} to {:x}", me.debug_id(), rblob.get_index().unwrap(), @@ -766,8 +766,11 @@ impl Crdt { } else { assert!(window.read().unwrap()[pos].is_none()); info!( - "failed RequestWindowIndex {} {} {}", - ix, pos, from.repair_addr + "{:x}: failed RequestWindowIndex {:x} {} {}", + me.debug_id(), + from.debug_id(), + ix, + pos, ); } @@ -840,10 +843,10 @@ impl Crdt { obj.write().unwrap().insert(&from); let me = obj.read().unwrap().my_data().clone(); trace!( - "received RequestWindowIndex {} {} myaddr {}", + "{:x}:received RequestWindowIndex {:x} {} ", + me.debug_id(), + from.debug_id(), ix, - from.repair_addr, - me.repair_addr ); assert_ne!(from.repair_addr, me.repair_addr); Self::run_window_request(&window, &me, &from, ix, blob_recycler) diff --git a/src/streamer.rs b/src/streamer.rs index 25bc7d086..91674657f 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -253,6 +253,13 @@ fn recv_window( while let Ok(mut nq) = r.try_recv() { dq.append(&mut nq) } + info!( + "{:x}: RECV_WINDOW {} {}: got packets {}", + debug_id, + *consumed, + *received, + dq.len(), + ); { //retransmit all leader blocks let mut retransmitq = VecDeque::new(); @@ -291,6 +298,13 @@ fn recv_window( warn!("{:x}: no leader to retransmit from", debug_id); } if !retransmitq.is_empty() { + info!( + "{:x}: RECV_WINDOW {} {}: retransmit {}", + debug_id, + *consumed, + *received, + retransmitq.len(), + ); retransmit.send(retransmitq)?; } } @@ -398,6 +412,13 @@ fn recv_window( print_window(locked_window, *consumed); trace!("sending contq.len: {}", contq.len()); if !contq.is_empty() { + info!( + "{:x}: RECV_WINDOW {} {}: forwarding contq {}", + debug_id, + *consumed, + *received, + contq.len(), + ); trace!("sending contq.len: {}", contq.len()); s.send(contq)?; } diff --git a/tests/multinode.rs b/tests/multinode.rs index 62732b1d4..51344d19d 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -126,7 +126,7 @@ fn test_multi_node_validator_catchup_from_zero() { //verify validator has the same balance let mut success = 0usize; for server in servers.iter() { - info!("0server: {:?}", server.id[0]); + info!("0server: {:x}", server.debug_id()); let mut client = mk_client(server); if let Ok(bal) = client.poll_get_balance(&bob_pubkey) { info!("validator balance {}", bal); @@ -166,7 +166,7 @@ fn test_multi_node_validator_catchup_from_zero() { for server in servers.iter() { let mut client = mk_client(server); - info!("1server: {:?}", server.id[0]); + info!("1server: {:x}", server.debug_id()); for _ in 0..10 { if let Ok(bal) = client.poll_get_balance(&bob_pubkey) { info!("validator balance {}", bal); @@ -294,7 +294,7 @@ fn test_boot_validator_from_file() { #[test] fn test_multi_node_dynamic_network() { logger::setup(); - const N: usize = 3; + const N: usize = 25; let leader = TestNode::new(); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); @@ -305,7 +305,7 @@ fn test_multi_node_dynamic_network() { true, InFile::Path(ledger_path.clone()), None, - None, + Some(OutFile::Path(ledger_path.clone())), exit.clone(), ); let threads = server.thread_hdls; @@ -316,7 +316,7 @@ fn test_multi_node_dynamic_network() { send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(1000)).unwrap(); assert_eq!(leader_balance, 1000); - let mut vals: Vec<(ReplicatedData, Arc, FullNode)> = (0..N) + let mut validators: Vec<(ReplicatedData, Arc, FullNode)> = (0..N) .into_iter() .map(|_| { let exit = Arc::new(AtomicBool::new(false)); @@ -327,7 +327,7 @@ fn test_multi_node_dynamic_network() { false, InFile::Path(ledger_path.clone()), Some(leader_data.gossip_addr), - None, + Some(OutFile::Path(ledger_path.clone())), exit.clone(), ); (rd, exit, val) @@ -341,30 +341,50 @@ fn test_multi_node_dynamic_network() { .unwrap(); assert_eq!(leader_balance, expected); //verify all validators have the same balance - let mut success = 0usize; - for server in vals.iter() { - let mut client = mk_client(&server.0); - let getbal = retry_get_balance(&mut client, &bob_pubkey, Some(expected)); - info!("{:x} {} get_balance: {:?}", server.0.debug_id(), i, getbal); - if let Some(bal) = getbal { - if bal == leader_balance { - success += 1; + for i in 0..10 { + let mut success = 0usize; + let mut distance = 0i64; + for server in validators.iter() { + let mut client = mk_client(&server.0); + let getbal = retry_get_balance(&mut client, &bob_pubkey, Some(expected)); + trace!( + "{:x} {} get_balance: {:?} expected: {}", + server.0.debug_id(), + i, + getbal, + expected + ); + let bal = getbal.unwrap_or(0); + distance += (expected - bal) / 500; + if let Some(bal) = getbal { + if bal == leader_balance { + success += 1; + } } } + if success == validators.len() { + break; + } + sleep(Duration::from_millis(i * 100)); + info!( + "SUCCESS {} out of {} distance: {}", + success, + validators.len(), + distance + ); } - info!("SUCCESS {} out of {}", success, vals.len()); - // this should be almost true, or at least vals.len() - 1 while the other node catches up - //assert!(success == vals.len()); + // this should be almost true, or at least validators.len() - 1 while the other node catches up + //assert!(success == validators.len()); //kill a validator - vals[i].1.store(true, Ordering::Relaxed); + validators[i].1.store(true, Ordering::Relaxed); let mut ts = vec![]; - ts.append(&mut vals[i].2.thread_hdls); + ts.append(&mut validators[i].2.thread_hdls); for t in ts.into_iter() { t.join().unwrap(); } - info!("{:x} KILLED", vals[i].0.debug_id()); + info!("{:x} KILLED", validators[i].0.debug_id()); //add a new one - vals[i] = { + validators[i] = { let exit = Arc::new(AtomicBool::new(false)); let validator = TestNode::new(); let rd = validator.data.clone(); @@ -373,14 +393,14 @@ fn test_multi_node_dynamic_network() { false, InFile::Path(ledger_path.clone()), Some(leader_data.gossip_addr), - None, + Some(OutFile::Path(ledger_path.clone())), exit.clone(), ); info!("{:x} ADDED", rd.debug_id()); (rd, exit, val) }; } - for (_, exit, val) in vals.into_iter() { + for (_, exit, val) in validators.into_iter() { exit.store(true, Ordering::Relaxed); for t in val.thread_hdls { t.join().unwrap(); @@ -415,9 +435,10 @@ fn retry_get_balance( bob_pubkey: &PublicKey, expected: Option, ) -> Option { - for _ in 0..10 { + const LAST: usize = 9; + for run in 0..(LAST + 1) { let out = client.poll_get_balance(bob_pubkey); - if expected.is_none() { + if expected.is_none() || run == LAST { return out.ok().clone(); } if let (Some(e), Ok(o)) = (expected, out) {