use a single BlobRecycler per fullnode
This commit is contained in:
parent
297f859631
commit
a8fdb8a5a7
|
@ -17,6 +17,7 @@ use solana::hash::Hash;
|
|||
use solana::logger;
|
||||
use solana::metrics;
|
||||
use solana::ncp::Ncp;
|
||||
use solana::packet::BlobRecycler;
|
||||
use solana::service::Service;
|
||||
use solana::signature::{read_keypair, GenKeys, Keypair, KeypairUtil};
|
||||
use solana::thin_client::{poll_gossip_for_leader, ThinClient};
|
||||
|
@ -611,12 +612,10 @@ fn main() {
|
|||
.collect();
|
||||
|
||||
// generate and send transactions for the specified duration
|
||||
let now = Instant::now();
|
||||
let mut last_stat = Instant::now();
|
||||
let stat_interval = Duration::new(90, 0);
|
||||
let start = Instant::now();
|
||||
let mut reclaim_tokens_back_to_source_account = false;
|
||||
let mut i = keypair0_balance;
|
||||
while now.elapsed() < duration {
|
||||
while start.elapsed() < duration {
|
||||
let balance = client.poll_get_balance(&id.pubkey()).unwrap_or(-1);
|
||||
metrics_submit_token_balance(balance);
|
||||
|
||||
|
@ -648,16 +647,6 @@ fn main() {
|
|||
if should_switch_directions(num_tokens_per_account, i) {
|
||||
reclaim_tokens_back_to_source_account = !reclaim_tokens_back_to_source_account;
|
||||
}
|
||||
|
||||
if last_stat.elapsed() >= stat_interval {
|
||||
last_stat = Instant::now();
|
||||
compute_and_report_stats(
|
||||
&maxes,
|
||||
sample_period,
|
||||
&now.elapsed(),
|
||||
total_tx_sent_count.load(Ordering::Relaxed),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Stop the sampling threads so it will collect the stats
|
||||
|
@ -684,7 +673,7 @@ fn main() {
|
|||
compute_and_report_stats(
|
||||
&maxes,
|
||||
sample_period,
|
||||
&now.elapsed(),
|
||||
&start.elapsed(),
|
||||
total_tx_sent_count.load(Ordering::Relaxed),
|
||||
);
|
||||
|
||||
|
@ -707,7 +696,14 @@ fn converge(
|
|||
spy_crdt.set_leader(leader.id);
|
||||
let spy_ref = Arc::new(RwLock::new(spy_crdt));
|
||||
let window = Arc::new(RwLock::new(default_window()));
|
||||
let ncp = Ncp::new(&spy_ref, window, None, gossip_socket, exit_signal.clone());
|
||||
let ncp = Ncp::new(
|
||||
&spy_ref,
|
||||
window,
|
||||
BlobRecycler::default(),
|
||||
None,
|
||||
gossip_socket,
|
||||
exit_signal.clone(),
|
||||
);
|
||||
let mut v: Vec<NodeInfo> = vec![];
|
||||
// wait for the network to converge, 30 seconds should be plenty
|
||||
for _ in 0..30 {
|
||||
|
|
|
@ -176,12 +176,14 @@ impl Fullnode {
|
|||
}
|
||||
|
||||
let bank = Arc::new(bank);
|
||||
let blob_recycler = BlobRecycler::default();
|
||||
let mut thread_hdls = vec![];
|
||||
|
||||
let rpu = Rpu::new(
|
||||
&bank,
|
||||
node.sockets.requests,
|
||||
node.sockets.respond,
|
||||
&blob_recycler,
|
||||
exit.clone(),
|
||||
);
|
||||
thread_hdls.extend(rpu.thread_hdls());
|
||||
|
@ -199,7 +201,6 @@ impl Fullnode {
|
|||
);
|
||||
thread_hdls.extend(rpc_service.thread_hdls());
|
||||
|
||||
let blob_recycler = BlobRecycler::default();
|
||||
let window =
|
||||
window::new_window_from_entries(ledger_tail, entry_height, &node.info, &blob_recycler);
|
||||
let shared_window = Arc::new(RwLock::new(window));
|
||||
|
@ -209,6 +210,7 @@ impl Fullnode {
|
|||
let ncp = Ncp::new(
|
||||
&crdt,
|
||||
shared_window.clone(),
|
||||
blob_recycler.clone(),
|
||||
ledger_path,
|
||||
node.sockets.gossip,
|
||||
exit.clone(),
|
||||
|
@ -226,6 +228,7 @@ impl Fullnode {
|
|||
entry_height,
|
||||
crdt,
|
||||
shared_window,
|
||||
blob_recycler.clone(),
|
||||
node.sockets.replicate,
|
||||
node.sockets.repair,
|
||||
node.sockets.retransmit,
|
||||
|
@ -247,7 +250,7 @@ impl Fullnode {
|
|||
&crdt,
|
||||
tick_duration,
|
||||
node.sockets.transaction,
|
||||
&blob_recycler,
|
||||
blob_recycler.clone(),
|
||||
exit.clone(),
|
||||
ledger_path,
|
||||
sigverify_disabled,
|
||||
|
|
12
src/ncp.rs
12
src/ncp.rs
|
@ -20,11 +20,11 @@ impl Ncp {
|
|||
pub fn new(
|
||||
crdt: &Arc<RwLock<Crdt>>,
|
||||
window: SharedWindow,
|
||||
blob_recycler: BlobRecycler,
|
||||
ledger_path: Option<&str>,
|
||||
gossip_socket: UdpSocket,
|
||||
exit: Arc<AtomicBool>,
|
||||
) -> Self {
|
||||
let blob_recycler = BlobRecycler::default();
|
||||
let (request_sender, request_receiver) = channel();
|
||||
let gossip_socket = Arc::new(gossip_socket);
|
||||
trace!(
|
||||
|
@ -82,6 +82,7 @@ impl Service for Ncp {
|
|||
mod tests {
|
||||
use crdt::{Crdt, Node};
|
||||
use ncp::Ncp;
|
||||
use packet::BlobRecycler;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
|
@ -94,7 +95,14 @@ mod tests {
|
|||
let crdt = Crdt::new(tn.info.clone()).expect("Crdt::new");
|
||||
let c = Arc::new(RwLock::new(crdt));
|
||||
let w = Arc::new(RwLock::new(vec![]));
|
||||
let d = Ncp::new(&c, w, None, tn.sockets.gossip, exit.clone());
|
||||
let d = Ncp::new(
|
||||
&c,
|
||||
w,
|
||||
BlobRecycler::default(),
|
||||
None,
|
||||
tn.sockets.gossip,
|
||||
exit.clone(),
|
||||
);
|
||||
d.close().expect("thread join");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -44,6 +44,7 @@ impl Rpu {
|
|||
bank: &Arc<Bank>,
|
||||
requests_socket: UdpSocket,
|
||||
respond_socket: UdpSocket,
|
||||
blob_recycler: &BlobRecycler,
|
||||
exit: Arc<AtomicBool>,
|
||||
) -> Self {
|
||||
let packet_recycler = PacketRecycler::default();
|
||||
|
@ -55,7 +56,6 @@ impl Rpu {
|
|||
packet_sender,
|
||||
);
|
||||
|
||||
let blob_recycler = BlobRecycler::default();
|
||||
let request_processor = RequestProcessor::new(bank.clone());
|
||||
let (request_stage, blob_receiver) = RequestStage::new(
|
||||
request_processor,
|
||||
|
|
|
@ -9,6 +9,7 @@ use crdt::{Crdt, CrdtError, NodeInfo};
|
|||
use hash::Hash;
|
||||
use log::Level;
|
||||
use ncp::Ncp;
|
||||
use packet::BlobRecycler;
|
||||
use request::{Request, Response};
|
||||
use result::{Error, Result};
|
||||
use signature::{Keypair, Pubkey, Signature};
|
||||
|
@ -377,7 +378,14 @@ pub fn poll_gossip_for_leader(leader_ncp: SocketAddr, timeout: Option<u64>) -> R
|
|||
let my_addr = gossip_socket.local_addr().unwrap();
|
||||
let crdt = Arc::new(RwLock::new(Crdt::new(node).expect("Crdt::new")));
|
||||
let window = Arc::new(RwLock::new(vec![]));
|
||||
let ncp = Ncp::new(&crdt.clone(), window, None, gossip_socket, exit.clone());
|
||||
let ncp = Ncp::new(
|
||||
&crdt.clone(),
|
||||
window,
|
||||
BlobRecycler::default(),
|
||||
None,
|
||||
gossip_socket,
|
||||
exit.clone(),
|
||||
);
|
||||
|
||||
let leader_entry_point = NodeInfo::new_entry_point(&leader_ncp);
|
||||
crdt.write().unwrap().insert(&leader_entry_point);
|
||||
|
|
|
@ -57,7 +57,7 @@ impl Tpu {
|
|||
crdt: &Arc<RwLock<Crdt>>,
|
||||
tick_duration: Option<Duration>,
|
||||
transactions_sockets: Vec<UdpSocket>,
|
||||
blob_recycler: &BlobRecycler,
|
||||
blob_recycler: BlobRecycler,
|
||||
exit: Arc<AtomicBool>,
|
||||
ledger_path: &str,
|
||||
sigverify_disabled: bool,
|
||||
|
|
19
src/tvu.rs
19
src/tvu.rs
|
@ -75,6 +75,7 @@ impl Tvu {
|
|||
entry_height: u64,
|
||||
crdt: Arc<RwLock<Crdt>>,
|
||||
window: SharedWindow,
|
||||
blob_recycler: BlobRecycler,
|
||||
replicate_socket: UdpSocket,
|
||||
repair_socket: UdpSocket,
|
||||
retransmit_socket: UdpSocket,
|
||||
|
@ -82,7 +83,6 @@ impl Tvu {
|
|||
exit: Arc<AtomicBool>,
|
||||
) -> Self {
|
||||
let repair_socket = Arc::new(repair_socket);
|
||||
let blob_recycler = BlobRecycler::default();
|
||||
let (fetch_stage, blob_fetch_receiver) = BlobFetchStage::new_multi_socket(
|
||||
vec![Arc::new(replicate_socket), repair_socket.clone()],
|
||||
exit.clone(),
|
||||
|
@ -168,10 +168,11 @@ pub mod tests {
|
|||
crdt: Arc<RwLock<Crdt>>,
|
||||
gossip: UdpSocket,
|
||||
exit: Arc<AtomicBool>,
|
||||
) -> (Ncp, SharedWindow) {
|
||||
) -> (Ncp, SharedWindow, BlobRecycler) {
|
||||
let window = Arc::new(RwLock::new(window::default_window()));
|
||||
let ncp = Ncp::new(&crdt, window.clone(), None, gossip, exit);
|
||||
(ncp, window)
|
||||
let recycler = BlobRecycler::default();
|
||||
let ncp = Ncp::new(&crdt, window.clone(), recycler.clone(), None, gossip, exit);
|
||||
(ncp, window, recycler)
|
||||
}
|
||||
|
||||
/// Test that message sent from leader to target1 and replicated to target2
|
||||
|
@ -202,13 +203,12 @@ pub mod tests {
|
|||
// setup some blob services to send blobs into the socket
|
||||
// to simulate the source peer and get blobs out of the socket to
|
||||
// simulate target peer
|
||||
let recv_recycler = BlobRecycler::default();
|
||||
let resp_recycler = BlobRecycler::default();
|
||||
let recycler = BlobRecycler::default();
|
||||
let (s_reader, r_reader) = channel();
|
||||
let t_receiver = streamer::blob_receiver(
|
||||
Arc::new(target2.sockets.replicate),
|
||||
exit.clone(),
|
||||
recv_recycler.clone(),
|
||||
recycler.clone(),
|
||||
s_reader,
|
||||
);
|
||||
|
||||
|
@ -217,7 +217,7 @@ pub mod tests {
|
|||
let t_responder = streamer::responder(
|
||||
"test_replicate",
|
||||
Arc::new(leader.sockets.requests),
|
||||
resp_recycler.clone(),
|
||||
recycler.clone(),
|
||||
r_responder,
|
||||
);
|
||||
|
||||
|
@ -239,6 +239,7 @@ pub mod tests {
|
|||
0,
|
||||
cref1,
|
||||
dr_1.1,
|
||||
dr_1.2,
|
||||
target1.sockets.replicate,
|
||||
target1.sockets.repair,
|
||||
target1.sockets.retransmit,
|
||||
|
@ -273,7 +274,7 @@ pub mod tests {
|
|||
alice_ref_balance -= transfer_amount;
|
||||
|
||||
for entry in vec![entry0, entry1] {
|
||||
let b = resp_recycler.allocate();
|
||||
let b = recycler.allocate();
|
||||
{
|
||||
let mut w = b.write().unwrap();
|
||||
w.set_index(blob_id).unwrap();
|
||||
|
|
|
@ -7,7 +7,7 @@ use rayon::iter::*;
|
|||
use solana::crdt::{Crdt, Node};
|
||||
use solana::logger;
|
||||
use solana::ncp::Ncp;
|
||||
use solana::packet::Blob;
|
||||
use solana::packet::{Blob, BlobRecycler};
|
||||
use solana::result;
|
||||
use solana::service::Service;
|
||||
use std::net::UdpSocket;
|
||||
|
@ -21,7 +21,14 @@ fn test_node(exit: Arc<AtomicBool>) -> (Arc<RwLock<Crdt>>, Ncp, UdpSocket) {
|
|||
let crdt = Crdt::new(tn.info.clone()).expect("Crdt::new");
|
||||
let c = Arc::new(RwLock::new(crdt));
|
||||
let w = Arc::new(RwLock::new(vec![]));
|
||||
let d = Ncp::new(&c.clone(), w, None, tn.sockets.gossip, exit);
|
||||
let d = Ncp::new(
|
||||
&c.clone(),
|
||||
w,
|
||||
BlobRecycler::default(),
|
||||
None,
|
||||
tn.sockets.gossip,
|
||||
exit,
|
||||
);
|
||||
(c, d, tn.sockets.replicate)
|
||||
}
|
||||
|
||||
|
|
|
@ -13,6 +13,7 @@ use solana::ledger::LedgerWriter;
|
|||
use solana::logger;
|
||||
use solana::mint::Mint;
|
||||
use solana::ncp::Ncp;
|
||||
use solana::packet::BlobRecycler;
|
||||
use solana::result;
|
||||
use solana::service::Service;
|
||||
use solana::signature::{Keypair, KeypairUtil, Pubkey};
|
||||
|
@ -42,7 +43,15 @@ fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> {
|
|||
spy_crdt.set_leader(leader.id);
|
||||
let spy_ref = Arc::new(RwLock::new(spy_crdt));
|
||||
let spy_window = Arc::new(RwLock::new(default_window()));
|
||||
let ncp = Ncp::new(&spy_ref, spy_window, None, spy.sockets.gossip, exit.clone());
|
||||
let recycler = BlobRecycler::default();
|
||||
let ncp = Ncp::new(
|
||||
&spy_ref,
|
||||
spy_window,
|
||||
recycler,
|
||||
None,
|
||||
spy.sockets.gossip,
|
||||
exit.clone(),
|
||||
);
|
||||
//wait for the network to converge
|
||||
let mut converged = false;
|
||||
let mut rv = vec![];
|
||||
|
|
Loading…
Reference in New Issue