Cleanup tpu forwarder (#2377)

* Use unwrap() on locks

An error there generally indicates a programmer error, not a
runtime error, so a detailed runtime message is not generally useful.

* Only clone Arcs when passing them across thread boundaries

* Cleanup TPU forwarder

By separating the query from the update, all the branches get easier to
test. Also, the update operation gets so simple, that we see it
belongs over in packet.rs.

* Thanks clippy

cute
This commit is contained in:
Greg Fitzgerald 2019-01-10 13:34:48 -07:00 committed by GitHub
parent b9c27e3a9d
commit 7341298a11
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 52 additions and 79 deletions

View File

@ -117,6 +117,14 @@ impl Default for Packets {
} }
} }
impl Packets {
pub fn set_addr(&mut self, addr: &SocketAddr) {
for m in self.packets.iter_mut() {
m.meta.set_addr(&addr);
}
}
}
#[derive(Clone)] #[derive(Clone)]
pub struct Blob { pub struct Blob {
pub data: [u8; BLOB_SIZE], pub data: [u8; BLOB_SIZE],
@ -475,7 +483,17 @@ mod tests {
use solana_sdk::transaction::Transaction; use solana_sdk::transaction::Transaction;
use std::io; use std::io;
use std::io::Write; use std::io::Write;
use std::net::UdpSocket; use std::net::{Ipv4Addr, SocketAddr, UdpSocket};
#[test]
fn test_packets_set_addr() {
// test that the address is actually being updated
let send_addr = socketaddr!([127, 0, 0, 1], 123);
let packets = vec![Packet::default()];
let mut msgs = Packets { packets };
msgs.set_addr(&send_addr);
assert_eq!(SocketAddr::from(msgs.packets[0].meta.addr()), send_addr);
}
#[test] #[test]
pub fn packet_send_recv() { pub fn packet_send_recv() {

View File

@ -5,20 +5,27 @@
use crate::cluster_info::ClusterInfo; use crate::cluster_info::ClusterInfo;
use crate::contact_info::ContactInfo; use crate::contact_info::ContactInfo;
use crate::counter::Counter; use crate::counter::Counter;
use crate::packet::Packets;
use crate::result::Result; use crate::result::Result;
use crate::service::Service; use crate::service::Service;
use crate::streamer::{self, PacketReceiver}; use crate::streamer::{self, PacketReceiver};
use log::Level; use log::Level;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use std::error::Error; use std::net::{SocketAddr, UdpSocket};
use std::net::UdpSocket;
use std::result;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle}; use std::thread::{self, Builder, JoinHandle};
fn get_forwarding_addr(leader_data: Option<&ContactInfo>, my_id: &Pubkey) -> Option<SocketAddr> {
let leader_data = leader_data?;
if leader_data.id == *my_id || !ContactInfo::is_valid_address(&leader_data.tpu) {
// weird cases, but we don't want to broadcast, send to ANY, or
// induce an infinite loop, but this shouldn't happen, or shouldn't be true for long...
return None;
}
Some(leader_data.tpu)
}
pub struct TpuForwarder { pub struct TpuForwarder {
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
thread_hdls: Vec<JoinHandle<()>>, thread_hdls: Vec<JoinHandle<()>>,
@ -28,10 +35,7 @@ impl TpuForwarder {
fn forward(receiver: &PacketReceiver, cluster_info: &Arc<RwLock<ClusterInfo>>) -> Result<()> { fn forward(receiver: &PacketReceiver, cluster_info: &Arc<RwLock<ClusterInfo>>) -> Result<()> {
let socket = UdpSocket::bind("0.0.0.0:0")?; let socket = UdpSocket::bind("0.0.0.0:0")?;
let my_id = cluster_info let my_id = cluster_info.read().unwrap().id();
.read()
.expect("cluster_info.read() in TpuForwarder::forward()")
.id();
loop { loop {
let msgs = receiver.recv()?; let msgs = receiver.recv()?;
@ -40,37 +44,13 @@ impl TpuForwarder {
"tpu_forwarder-msgs_received", "tpu_forwarder-msgs_received",
msgs.read().unwrap().packets.len() msgs.read().unwrap().packets.len()
); );
let leader_data = cluster_info
.read()
.expect("cluster_info.read() in TpuForwarder::forward()")
.leader_data()
.cloned();
match TpuForwarder::update_addrs(leader_data, &my_id, &msgs.clone()) {
Ok(_) => msgs.read().unwrap().send_to(&socket)?,
Err(_) => continue,
}
}
}
fn update_addrs( let send_addr = get_forwarding_addr(cluster_info.read().unwrap().leader_data(), &my_id);
leader_data: Option<ContactInfo>,
my_id: &Pubkey,
msgs: &Arc<RwLock<Packets>>,
) -> result::Result<(), Box<dyn Error>> {
match leader_data {
Some(leader_data) => {
if leader_data.id == *my_id || !ContactInfo::is_valid_address(&leader_data.tpu) {
// weird cases, but we don't want to broadcast, send to ANY, or
// induce an infinite loop, but this shouldn't happen, or shouldn't be true for long...
return Err("Invalid leader addr")?;
}
for m in msgs.write().unwrap().packets.iter_mut() { if let Some(send_addr) = send_addr {
m.meta.set_addr(&leader_data.tpu); msgs.write().unwrap().set_addr(&send_addr);
} msgs.read().unwrap().send_to(&socket)?;
Ok(())
} }
_ => Err("No leader contact data")?,
} }
} }
@ -123,51 +103,26 @@ impl Service for TpuForwarder {
mod tests { mod tests {
use super::*; use super::*;
use crate::contact_info::ContactInfo; use crate::contact_info::ContactInfo;
use crate::packet::Packet;
use solana_netutil::bind_in_range;
use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::signature::{Keypair, KeypairUtil};
use std::net::{Ipv4Addr, SocketAddr};
#[test] #[test]
pub fn test_update_addrs() { fn test_get_forwarding_addr() {
let keypair = Keypair::new(); let my_id = Keypair::new().pubkey();
let my_id = keypair.pubkey();
// test with no pubkey // Test with no leader
assert!(!TpuForwarder::update_addrs( assert_eq!(get_forwarding_addr(None, &my_id,), None);
None,
&my_id, // Test with no TPU
&Arc::new(RwLock::new(Packets::default())) let leader_data = ContactInfo::default();
) assert_eq!(get_forwarding_addr(Some(&leader_data), &my_id,), None);
.is_ok());
// test with no tpu // Test with my pubkey
assert!(!TpuForwarder::update_addrs(
Some(ContactInfo::default()),
&my_id,
&Arc::new(RwLock::new(Packets::default()))
)
.is_ok());
// test with my pubkey
let leader_data = ContactInfo::new_localhost(my_id, 0); let leader_data = ContactInfo::new_localhost(my_id, 0);
assert!(!TpuForwarder::update_addrs( assert_eq!(get_forwarding_addr(Some(&leader_data), &my_id,), None);
Some(leader_data),
&my_id, // Test with pubkey other than mine
&Arc::new(RwLock::new(Packets::default())) let alice_id = Keypair::new().pubkey();
) let leader_data = ContactInfo::new_localhost(alice_id, 0);
.is_ok()); assert!(get_forwarding_addr(Some(&leader_data), &my_id,).is_some());
// test that the address is actually being updated
let (port, _) = bind_in_range((8000, 10000)).unwrap();
let leader_data = ContactInfo::new_with_socketaddr(&socketaddr!([127, 0, 0, 1], port));
let packet = Packet::default();
let p = Packets {
packets: vec![packet],
};
let msgs = Arc::new(RwLock::new(p));
assert!(
TpuForwarder::update_addrs(Some(leader_data.clone()), &my_id, &msgs.clone()).is_ok()
);
assert_eq!(
SocketAddr::from(msgs.read().unwrap().packets[0].meta.addr()),
leader_data.tpu
);
} }
} }