2018-05-25 22:00:47 -07:00
|
|
|
//! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets.
|
|
|
|
//!
|
2018-11-16 08:45:59 -08:00
|
|
|
|
2021-12-03 09:00:31 -08:00
|
|
|
use {
|
|
|
|
crate::{
|
2022-02-11 05:36:50 -08:00
|
|
|
packet::{self, PacketBatch, PacketBatchRecycler, PACKETS_PER_BATCH},
|
|
|
|
sendmmsg::{batch_send, SendPktsError},
|
2021-12-03 09:00:31 -08:00
|
|
|
socket::SocketAddrSpace,
|
|
|
|
},
|
2022-01-11 02:44:46 -08:00
|
|
|
crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
|
2021-12-17 15:21:05 -08:00
|
|
|
histogram::Histogram,
|
2022-07-11 11:06:40 -07:00
|
|
|
solana_sdk::{packet::Packet, pubkey::Pubkey, timing::timestamp},
|
2021-12-03 09:00:31 -08:00
|
|
|
std::{
|
2021-12-17 15:21:05 -08:00
|
|
|
cmp::Reverse,
|
|
|
|
collections::HashMap,
|
2022-01-02 08:13:57 -08:00
|
|
|
net::{IpAddr, UdpSocket},
|
2021-12-03 09:00:31 -08:00
|
|
|
sync::{
|
2022-05-05 11:56:18 -07:00
|
|
|
atomic::{AtomicBool, AtomicUsize, Ordering},
|
2021-12-03 09:00:31 -08:00
|
|
|
Arc,
|
|
|
|
},
|
2022-05-14 08:53:37 -07:00
|
|
|
thread::{sleep, Builder, JoinHandle},
|
2021-12-03 09:00:31 -08:00
|
|
|
time::{Duration, Instant},
|
|
|
|
},
|
|
|
|
thiserror::Error,
|
2021-07-23 08:25:03 -07:00
|
|
|
};
|
2018-03-07 13:47:13 -08:00
|
|
|
|
2022-06-21 12:06:44 -07:00
|
|
|
// Total stake and nodes => stake map
|
|
|
|
#[derive(Default)]
|
|
|
|
pub struct StakedNodes {
|
2022-06-30 17:56:15 -07:00
|
|
|
pub total_stake: u64,
|
2022-08-09 10:02:47 -07:00
|
|
|
pub max_stake: u64,
|
|
|
|
pub min_stake: u64,
|
2022-07-11 11:06:40 -07:00
|
|
|
pub ip_stake_map: HashMap<IpAddr, u64>,
|
|
|
|
pub pubkey_stake_map: HashMap<Pubkey, u64>,
|
2022-06-21 12:06:44 -07:00
|
|
|
}
|
|
|
|
|
2021-12-11 06:44:15 -08:00
|
|
|
pub type PacketBatchReceiver = Receiver<PacketBatch>;
|
|
|
|
pub type PacketBatchSender = Sender<PacketBatch>;
|
2018-07-17 15:00:22 -07:00
|
|
|
|
2020-01-02 19:50:43 -08:00
|
|
|
#[derive(Error, Debug)]
|
|
|
|
pub enum StreamerError {
|
|
|
|
#[error("I/O error")]
|
2021-02-18 23:42:09 -08:00
|
|
|
Io(#[from] std::io::Error),
|
2020-01-02 19:50:43 -08:00
|
|
|
|
|
|
|
#[error("receive timeout error")]
|
2021-06-18 11:47:40 -07:00
|
|
|
RecvTimeout(#[from] RecvTimeoutError),
|
2020-01-02 19:50:43 -08:00
|
|
|
|
|
|
|
#[error("send packets error")]
|
2021-12-11 06:44:15 -08:00
|
|
|
Send(#[from] SendError<PacketBatch>),
|
2022-02-11 05:36:50 -08:00
|
|
|
|
|
|
|
#[error(transparent)]
|
|
|
|
SendPktsError(#[from] SendPktsError),
|
2020-01-02 19:50:43 -08:00
|
|
|
}
|
|
|
|
|
2022-05-05 11:56:18 -07:00
|
|
|
/// Counters updated by the receive loop and periodically flushed by `report`.
///
/// All counters are atomics so the struct can be shared (e.g. behind an `Arc`)
/// between the receiving thread and whoever reports the numbers.
pub struct StreamerReceiveStats {
    /// Name used as the datapoint name when reporting.
    pub name: &'static str,
    /// Number of packets received.
    pub packets_count: AtomicUsize,
    /// Number of non-empty packet batches received.
    pub packet_batches_count: AtomicUsize,
    /// Number of received batches that contained exactly `PACKETS_PER_BATCH` packets.
    pub full_packet_batches_count: AtomicUsize,
    /// Largest length observed on the outgoing packet-batch channel.
    pub max_channel_len: AtomicUsize,
}
|
|
|
|
|
|
|
|
impl StreamerReceiveStats {
|
|
|
|
pub fn new(name: &'static str) -> Self {
|
|
|
|
Self {
|
|
|
|
name,
|
|
|
|
packets_count: AtomicUsize::default(),
|
|
|
|
packet_batches_count: AtomicUsize::default(),
|
|
|
|
full_packet_batches_count: AtomicUsize::default(),
|
|
|
|
max_channel_len: AtomicUsize::default(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn report(&self) {
|
|
|
|
datapoint_info!(
|
|
|
|
self.name,
|
|
|
|
(
|
|
|
|
"packets_count",
|
|
|
|
self.packets_count.swap(0, Ordering::Relaxed) as i64,
|
|
|
|
i64
|
|
|
|
),
|
|
|
|
(
|
|
|
|
"packet_batches_count",
|
|
|
|
self.packet_batches_count.swap(0, Ordering::Relaxed) as i64,
|
|
|
|
i64
|
|
|
|
),
|
|
|
|
(
|
|
|
|
"full_packet_batches_count",
|
|
|
|
self.full_packet_batches_count.swap(0, Ordering::Relaxed) as i64,
|
|
|
|
i64
|
|
|
|
),
|
|
|
|
(
|
|
|
|
"channel_len",
|
|
|
|
self.max_channel_len.swap(0, Ordering::Relaxed) as i64,
|
|
|
|
i64
|
|
|
|
),
|
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-01-02 19:50:43 -08:00
|
|
|
/// Result type used throughout this module; the error is always [`StreamerError`].
pub type Result<T> = std::result::Result<T, StreamerError>;
|
|
|
|
|
2019-06-27 00:32:32 -07:00
|
|
|
fn recv_loop(
|
2022-05-05 11:56:18 -07:00
|
|
|
socket: &UdpSocket,
|
2019-06-27 00:32:32 -07:00
|
|
|
exit: Arc<AtomicBool>,
|
2022-05-05 11:56:18 -07:00
|
|
|
packet_batch_sender: &PacketBatchSender,
|
2021-12-11 06:44:15 -08:00
|
|
|
recycler: &PacketBatchRecycler,
|
2022-05-05 11:56:18 -07:00
|
|
|
stats: &StreamerReceiveStats,
|
2021-02-26 09:15:45 -08:00
|
|
|
coalesce_ms: u64,
|
2021-06-14 07:10:04 -07:00
|
|
|
use_pinned_memory: bool,
|
2022-05-14 08:53:37 -07:00
|
|
|
in_vote_only_mode: Option<Arc<AtomicBool>>,
|
2019-06-27 00:32:32 -07:00
|
|
|
) -> Result<()> {
|
2018-03-07 13:47:13 -08:00
|
|
|
loop {
|
2021-12-11 06:44:15 -08:00
|
|
|
let mut packet_batch = if use_pinned_memory {
|
2022-05-05 11:56:18 -07:00
|
|
|
PacketBatch::new_with_recycler(recycler.clone(), PACKETS_PER_BATCH, stats.name)
|
2021-06-14 07:10:04 -07:00
|
|
|
} else {
|
2021-12-11 06:44:15 -08:00
|
|
|
PacketBatch::with_capacity(PACKETS_PER_BATCH)
|
2021-06-14 07:10:04 -07:00
|
|
|
};
|
2018-03-07 13:47:13 -08:00
|
|
|
loop {
|
2018-09-25 15:41:29 -07:00
|
|
|
// Check for exit signal, even if socket is busy
|
2019-11-14 10:24:53 -08:00
|
|
|
// (for instance the leader transaction socket)
|
2018-09-25 15:41:29 -07:00
|
|
|
if exit.load(Ordering::Relaxed) {
|
|
|
|
return Ok(());
|
|
|
|
}
|
2022-05-14 08:53:37 -07:00
|
|
|
|
|
|
|
if let Some(ref in_vote_only_mode) = in_vote_only_mode {
|
|
|
|
if in_vote_only_mode.load(Ordering::Relaxed) {
|
|
|
|
sleep(Duration::from_millis(1));
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-05-05 11:56:18 -07:00
|
|
|
if let Ok(len) = packet::recv_from(&mut packet_batch, socket, coalesce_ms) {
|
2021-04-07 08:15:38 -07:00
|
|
|
if len > 0 {
|
2022-05-05 11:56:18 -07:00
|
|
|
let StreamerReceiveStats {
|
|
|
|
packets_count,
|
|
|
|
packet_batches_count,
|
|
|
|
full_packet_batches_count,
|
|
|
|
max_channel_len,
|
|
|
|
..
|
|
|
|
} = stats;
|
|
|
|
|
|
|
|
packets_count.fetch_add(len, Ordering::Relaxed);
|
|
|
|
packet_batches_count.fetch_add(1, Ordering::Relaxed);
|
|
|
|
max_channel_len.fetch_max(packet_batch_sender.len(), Ordering::Relaxed);
|
|
|
|
if len == PACKETS_PER_BATCH {
|
|
|
|
full_packet_batches_count.fetch_add(1, Ordering::Relaxed);
|
|
|
|
}
|
|
|
|
|
|
|
|
packet_batch_sender.send(packet_batch)?;
|
2019-12-10 11:28:07 -08:00
|
|
|
}
|
2018-09-27 13:49:50 -07:00
|
|
|
break;
|
2018-03-07 13:47:13 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn receiver(
|
2022-05-05 11:56:18 -07:00
|
|
|
socket: Arc<UdpSocket>,
|
|
|
|
exit: Arc<AtomicBool>,
|
|
|
|
packet_batch_sender: PacketBatchSender,
|
2021-12-11 06:44:15 -08:00
|
|
|
recycler: PacketBatchRecycler,
|
2022-05-05 11:56:18 -07:00
|
|
|
stats: Arc<StreamerReceiveStats>,
|
2021-02-26 09:15:45 -08:00
|
|
|
coalesce_ms: u64,
|
2021-06-14 07:10:04 -07:00
|
|
|
use_pinned_memory: bool,
|
2022-05-14 08:53:37 -07:00
|
|
|
in_vote_only_mode: Option<Arc<AtomicBool>>,
|
2018-05-15 08:53:51 -07:00
|
|
|
) -> JoinHandle<()> {
|
2022-05-05 11:56:18 -07:00
|
|
|
let res = socket.set_read_timeout(Some(Duration::new(1, 0)));
|
2022-01-21 16:01:22 -08:00
|
|
|
assert!(res.is_ok(), "streamer::receiver set_read_timeout error");
|
2018-05-30 13:13:14 -07:00
|
|
|
Builder::new()
|
2022-08-17 08:40:23 -07:00
|
|
|
.name("solReceiver".to_string())
|
2018-05-30 13:13:14 -07:00
|
|
|
.spawn(move || {
|
2021-02-26 09:15:45 -08:00
|
|
|
let _ = recv_loop(
|
2022-05-05 11:56:18 -07:00
|
|
|
&socket,
|
2021-02-26 09:15:45 -08:00
|
|
|
exit,
|
2022-05-05 11:56:18 -07:00
|
|
|
&packet_batch_sender,
|
|
|
|
&recycler,
|
|
|
|
&stats,
|
2021-02-26 09:15:45 -08:00
|
|
|
coalesce_ms,
|
2021-06-14 07:10:04 -07:00
|
|
|
use_pinned_memory,
|
2022-05-14 08:53:37 -07:00
|
|
|
in_vote_only_mode,
|
2021-02-26 09:15:45 -08:00
|
|
|
);
|
2018-12-07 19:01:28 -08:00
|
|
|
})
|
|
|
|
.unwrap()
|
2018-03-07 13:47:13 -08:00
|
|
|
}
|
|
|
|
|
2021-12-17 15:21:05 -08:00
|
|
|
/// Running totals of what has been sent to a single host.
#[derive(Debug, Default)]
struct SendStats {
    /// Sum of the payload lengths, in bytes.
    bytes: u64,
    /// Number of packets.
    count: u64,
}
|
|
|
|
|
|
|
|
#[derive(Default)]
|
|
|
|
struct StreamerSendStats {
|
2022-01-02 08:13:57 -08:00
|
|
|
host_map: HashMap<IpAddr, SendStats>,
|
2021-12-17 15:21:05 -08:00
|
|
|
since: Option<Instant>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl StreamerSendStats {
|
|
|
|
fn report_stats(
|
|
|
|
name: &'static str,
|
2022-01-02 08:13:57 -08:00
|
|
|
host_map: HashMap<IpAddr, SendStats>,
|
2021-12-17 15:21:05 -08:00
|
|
|
sample_duration: Option<Duration>,
|
|
|
|
) {
|
|
|
|
const MAX_REPORT_ENTRIES: usize = 5;
|
|
|
|
let sample_ms = sample_duration.map(|d| d.as_millis()).unwrap_or_default();
|
|
|
|
let mut hist = Histogram::default();
|
|
|
|
let mut byte_sum = 0;
|
|
|
|
let mut pkt_count = 0;
|
|
|
|
host_map.iter().for_each(|(_addr, host_stats)| {
|
|
|
|
hist.increment(host_stats.bytes).unwrap();
|
|
|
|
byte_sum += host_stats.bytes;
|
|
|
|
pkt_count += host_stats.count;
|
|
|
|
});
|
|
|
|
|
|
|
|
datapoint_info!(
|
|
|
|
name,
|
|
|
|
("streamer-send-sample_duration_ms", sample_ms, i64),
|
|
|
|
("streamer-send-host_count", host_map.len(), i64),
|
|
|
|
("streamer-send-bytes_total", byte_sum, i64),
|
|
|
|
("streamer-send-pkt_count_total", pkt_count, i64),
|
|
|
|
(
|
|
|
|
"streamer-send-host_bytes_min",
|
|
|
|
hist.minimum().unwrap_or_default(),
|
|
|
|
i64
|
|
|
|
),
|
|
|
|
(
|
|
|
|
"streamer-send-host_bytes_max",
|
|
|
|
hist.maximum().unwrap_or_default(),
|
|
|
|
i64
|
|
|
|
),
|
|
|
|
(
|
|
|
|
"streamer-send-host_bytes_mean",
|
|
|
|
hist.mean().unwrap_or_default(),
|
|
|
|
i64
|
|
|
|
),
|
|
|
|
(
|
|
|
|
"streamer-send-host_bytes_90pct",
|
|
|
|
hist.percentile(90.0).unwrap_or_default(),
|
|
|
|
i64
|
|
|
|
),
|
|
|
|
(
|
|
|
|
"streamer-send-host_bytes_50pct",
|
|
|
|
hist.percentile(50.0).unwrap_or_default(),
|
|
|
|
i64
|
|
|
|
),
|
|
|
|
(
|
|
|
|
"streamer-send-host_bytes_10pct",
|
|
|
|
hist.percentile(10.0).unwrap_or_default(),
|
|
|
|
i64
|
|
|
|
),
|
|
|
|
);
|
|
|
|
|
|
|
|
let num_entries = host_map.len();
|
|
|
|
let mut entries: Vec<_> = host_map.into_iter().collect();
|
|
|
|
if entries.len() > MAX_REPORT_ENTRIES {
|
|
|
|
entries.select_nth_unstable_by_key(MAX_REPORT_ENTRIES, |(_addr, stats)| {
|
|
|
|
Reverse(stats.bytes)
|
|
|
|
});
|
|
|
|
entries.truncate(MAX_REPORT_ENTRIES);
|
|
|
|
}
|
|
|
|
info!(
|
|
|
|
"streamer send {} hosts: count:{} {:?}",
|
|
|
|
name, num_entries, entries,
|
|
|
|
);
|
|
|
|
}
|
|
|
|
|
|
|
|
fn maybe_submit(&mut self, name: &'static str, sender: &Sender<Box<dyn FnOnce() + Send>>) {
|
|
|
|
const SUBMIT_CADENCE: Duration = Duration::from_secs(10);
|
|
|
|
const MAP_SIZE_REPORTING_THRESHOLD: usize = 1_000;
|
|
|
|
let elapsed = self.since.as_ref().map(Instant::elapsed);
|
|
|
|
if elapsed.map(|e| e < SUBMIT_CADENCE).unwrap_or_default()
|
|
|
|
&& self.host_map.len() < MAP_SIZE_REPORTING_THRESHOLD
|
|
|
|
{
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
let host_map = std::mem::take(&mut self.host_map);
|
|
|
|
let _ = sender.send(Box::new(move || {
|
|
|
|
Self::report_stats(name, host_map, elapsed);
|
|
|
|
}));
|
|
|
|
|
|
|
|
*self = Self {
|
|
|
|
since: Some(Instant::now()),
|
|
|
|
..Self::default()
|
|
|
|
};
|
|
|
|
}
|
|
|
|
|
|
|
|
fn record(&mut self, pkt: &Packet) {
|
2022-12-06 03:54:49 -08:00
|
|
|
let ent = self.host_map.entry(pkt.meta().addr).or_default();
|
2021-12-17 15:21:05 -08:00
|
|
|
ent.count += 1;
|
2022-06-02 18:05:06 -07:00
|
|
|
ent.bytes += pkt.data(..).map(<[u8]>::len).unwrap_or_default() as u64;
|
2021-12-17 15:21:05 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-07-23 08:25:03 -07:00
|
|
|
fn recv_send(
|
|
|
|
sock: &UdpSocket,
|
2021-12-11 06:44:15 -08:00
|
|
|
r: &PacketBatchReceiver,
|
2021-07-23 08:25:03 -07:00
|
|
|
socket_addr_space: &SocketAddrSpace,
|
2021-12-17 15:21:05 -08:00
|
|
|
stats: &mut Option<StreamerSendStats>,
|
2021-07-23 08:25:03 -07:00
|
|
|
) -> Result<()> {
|
2018-03-07 13:47:13 -08:00
|
|
|
let timer = Duration::new(1, 0);
|
2021-12-11 06:44:15 -08:00
|
|
|
let packet_batch = r.recv_timeout(timer)?;
|
2021-12-17 15:21:05 -08:00
|
|
|
if let Some(stats) = stats {
|
2022-05-23 13:30:15 -07:00
|
|
|
packet_batch.iter().for_each(|p| stats.record(p));
|
2021-12-17 15:21:05 -08:00
|
|
|
}
|
2022-05-23 13:30:15 -07:00
|
|
|
let packets = packet_batch.iter().filter_map(|pkt| {
|
2022-12-06 03:54:49 -08:00
|
|
|
let addr = pkt.meta().socket_addr();
|
2022-06-02 18:05:06 -07:00
|
|
|
let data = pkt.data(..)?;
|
2022-08-22 18:01:03 -07:00
|
|
|
socket_addr_space.check(&addr).then_some((data, addr))
|
2022-02-11 05:36:50 -08:00
|
|
|
});
|
|
|
|
batch_send(sock, &packets.collect::<Vec<_>>())?;
|
2018-03-07 13:47:13 -08:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2021-12-16 06:47:55 -08:00
|
|
|
pub fn recv_vec_packet_batches(
|
|
|
|
recvr: &Receiver<Vec<PacketBatch>>,
|
|
|
|
) -> Result<(Vec<PacketBatch>, usize, Duration)> {
|
|
|
|
let timer = Duration::new(1, 0);
|
|
|
|
let mut packet_batches = recvr.recv_timeout(timer)?;
|
|
|
|
let recv_start = Instant::now();
|
|
|
|
trace!("got packets");
|
|
|
|
let mut num_packets = packet_batches
|
|
|
|
.iter()
|
2022-05-23 13:30:15 -07:00
|
|
|
.map(|packets| packets.len())
|
2021-12-16 06:47:55 -08:00
|
|
|
.sum::<usize>();
|
|
|
|
while let Ok(packet_batch) = recvr.try_recv() {
|
|
|
|
trace!("got more packets");
|
|
|
|
num_packets += packet_batch
|
|
|
|
.iter()
|
2022-05-23 13:30:15 -07:00
|
|
|
.map(|packets| packets.len())
|
2021-12-16 06:47:55 -08:00
|
|
|
.sum::<usize>();
|
|
|
|
packet_batches.extend(packet_batch);
|
|
|
|
}
|
|
|
|
let recv_duration = recv_start.elapsed();
|
|
|
|
trace!(
|
|
|
|
"packet batches len: {}, num packets: {}",
|
|
|
|
packet_batches.len(),
|
|
|
|
num_packets
|
|
|
|
);
|
|
|
|
Ok((packet_batches, num_packets, recv_duration))
|
|
|
|
}
|
|
|
|
|
2021-12-11 06:44:15 -08:00
|
|
|
pub fn recv_packet_batches(
|
|
|
|
recvr: &PacketBatchReceiver,
|
|
|
|
) -> Result<(Vec<PacketBatch>, usize, Duration)> {
|
2018-05-10 14:47:42 -07:00
|
|
|
let timer = Duration::new(1, 0);
|
2021-12-11 06:44:15 -08:00
|
|
|
let packet_batch = recvr.recv_timeout(timer)?;
|
2018-09-24 17:13:49 -07:00
|
|
|
let recv_start = Instant::now();
|
2021-12-11 06:44:15 -08:00
|
|
|
trace!("got packets");
|
2022-05-23 13:30:15 -07:00
|
|
|
let mut num_packets = packet_batch.len();
|
2021-12-11 06:44:15 -08:00
|
|
|
let mut packet_batches = vec![packet_batch];
|
|
|
|
while let Ok(packet_batch) = recvr.try_recv() {
|
|
|
|
trace!("got more packets");
|
2022-05-23 13:30:15 -07:00
|
|
|
num_packets += packet_batch.len();
|
2021-12-11 06:44:15 -08:00
|
|
|
packet_batches.push(packet_batch);
|
2018-05-10 14:47:42 -07:00
|
|
|
}
|
2021-09-21 10:37:58 -07:00
|
|
|
let recv_duration = recv_start.elapsed();
|
2021-12-11 06:44:15 -08:00
|
|
|
trace!(
|
|
|
|
"packet batches len: {}, num packets: {}",
|
|
|
|
packet_batches.len(),
|
|
|
|
num_packets
|
|
|
|
);
|
|
|
|
Ok((packet_batches, num_packets, recv_duration))
|
2018-05-10 14:47:42 -07:00
|
|
|
}
|
|
|
|
|
2021-07-23 08:25:03 -07:00
|
|
|
pub fn responder(
|
|
|
|
name: &'static str,
|
|
|
|
sock: Arc<UdpSocket>,
|
2021-12-11 06:44:15 -08:00
|
|
|
r: PacketBatchReceiver,
|
2021-07-23 08:25:03 -07:00
|
|
|
socket_addr_space: SocketAddrSpace,
|
2021-12-17 15:21:05 -08:00
|
|
|
stats_reporter_sender: Option<Sender<Box<dyn FnOnce() + Send>>>,
|
2021-07-23 08:25:03 -07:00
|
|
|
) -> JoinHandle<()> {
|
2018-05-30 13:13:14 -07:00
|
|
|
Builder::new()
|
2022-08-17 08:40:23 -07:00
|
|
|
.name(format!("solRspndr{}", name))
|
2020-06-18 13:30:55 -07:00
|
|
|
.spawn(move || {
|
|
|
|
let mut errors = 0;
|
|
|
|
let mut last_error = None;
|
|
|
|
let mut last_print = 0;
|
2021-12-17 15:21:05 -08:00
|
|
|
let mut stats = None;
|
|
|
|
|
|
|
|
if stats_reporter_sender.is_some() {
|
|
|
|
stats = Some(StreamerSendStats::default());
|
|
|
|
}
|
|
|
|
|
2020-06-18 13:30:55 -07:00
|
|
|
loop {
|
2021-12-17 15:21:05 -08:00
|
|
|
if let Err(e) = recv_send(&sock, &r, &socket_addr_space, &mut stats) {
|
2020-06-18 13:30:55 -07:00
|
|
|
match e {
|
2021-06-18 11:47:40 -07:00
|
|
|
StreamerError::RecvTimeout(RecvTimeoutError::Disconnected) => break,
|
|
|
|
StreamerError::RecvTimeout(RecvTimeoutError::Timeout) => (),
|
2020-06-18 13:30:55 -07:00
|
|
|
_ => {
|
|
|
|
errors += 1;
|
|
|
|
last_error = Some(e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
let now = timestamp();
|
|
|
|
if now - last_print > 1000 && errors != 0 {
|
|
|
|
datapoint_info!(name, ("errors", errors, i64),);
|
|
|
|
info!("{} last-error: {:?} count: {}", name, last_error, errors);
|
|
|
|
last_print = now;
|
|
|
|
errors = 0;
|
2018-07-05 15:41:03 -07:00
|
|
|
}
|
2021-12-17 15:21:05 -08:00
|
|
|
if let Some(ref stats_reporter_sender) = stats_reporter_sender {
|
|
|
|
if let Some(ref mut stats) = stats {
|
|
|
|
stats.maybe_submit(name, stats_reporter_sender);
|
|
|
|
}
|
|
|
|
}
|
2018-05-30 13:13:14 -07:00
|
|
|
}
|
2018-12-07 19:01:28 -08:00
|
|
|
})
|
|
|
|
.unwrap()
|
2018-03-07 13:47:13 -08:00
|
|
|
}
|
|
|
|
|
2018-03-19 16:09:47 -07:00
|
|
|
#[cfg(test)]
mod test {
    use {
        super::*,
        crate::{
            packet::{Packet, PacketBatch, PACKET_DATA_SIZE},
            streamer::{receiver, responder},
        },
        crossbeam_channel::unbounded,
        solana_perf::recycler::Recycler,
        std::{
            io,
            io::Write,
            net::UdpSocket,
            sync::{
                atomic::{AtomicBool, Ordering},
                Arc,
            },
            time::Duration,
        },
    };

    /// Receives batches from `r`, subtracting each batch's packet count from
    /// `num_packets`, until the count reaches zero or ten receive attempts
    /// (one second timeout each) have been made.
    fn get_packet_batches(r: PacketBatchReceiver, num_packets: &mut usize) {
        for _ in 0..10 {
            let packet_batch = match r.recv_timeout(Duration::new(1, 0)) {
                Ok(packet_batch) => packet_batch,
                Err(_) => continue,
            };

            *num_packets -= packet_batch.len();

            if *num_packets == 0 {
                break;
            }
        }
    }

    /// `Packet` and `PacketBatch` can be formatted with `{:?}`.
    #[test]
    fn streamer_debug() {
        write!(io::sink(), "{:?}", Packet::default()).unwrap();
        write!(io::sink(), "{:?}", PacketBatch::default()).unwrap();
    }

    /// Round trip: a responder sends `NUM_PACKETS` packets over UDP to a
    /// receiver on the loopback interface, and the receiver's stats reflect them.
    #[test]
    fn streamer_send_test() {
        const NUM_PACKETS: usize = 5;

        let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
        read.set_read_timeout(Some(Duration::new(1, 0))).unwrap();

        let addr = read.local_addr().unwrap();
        let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let exit = Arc::new(AtomicBool::new(false));
        let (s_reader, r_reader) = unbounded();
        let stats = Arc::new(StreamerReceiveStats::new("test"));
        let t_receiver = receiver(
            Arc::new(read),
            Arc::clone(&exit),
            s_reader,
            Recycler::default(),
            Arc::clone(&stats),
            1,
            true,
            None,
        );

        let (s_responder, r_responder) = unbounded();
        let t_responder = responder(
            "SendTest",
            Arc::new(send),
            r_responder,
            SocketAddrSpace::Unspecified,
            None,
        );
        let mut packet_batch = PacketBatch::default();
        for i in 0..NUM_PACKETS {
            let mut p = Packet::default();
            p.buffer_mut()[0] = i as u8;
            p.meta_mut().size = PACKET_DATA_SIZE;
            p.meta_mut().set_socket_addr(&addr);
            packet_batch.push(p);
        }
        s_responder.send(packet_batch).expect("send");
        // Dropping the only sender disconnects the channel, which is what
        // makes the responder thread leave its loop so it can be joined below.
        drop(s_responder);

        let mut packets_remaining = NUM_PACKETS;
        get_packet_batches(r_reader, &mut packets_remaining);
        assert_eq!(packets_remaining, 0);
        exit.store(true, Ordering::Relaxed);
        assert!(stats.packet_batches_count.load(Ordering::Relaxed) >= 1);
        assert_eq!(stats.packets_count.load(Ordering::Relaxed), NUM_PACKETS);
        assert_eq!(stats.full_packet_batches_count.load(Ordering::Relaxed), 0);
        t_receiver.join().expect("join");
        t_responder.join().expect("join");
    }
}
|