Inline create_leader_threads and create_validator_threads

This commit is contained in:
Greg Fitzgerald 2018-08-22 18:50:19 -06:00 committed by Grimes
parent 731f8512c6
commit d8820053af
1 changed files with 128 additions and 182 deletions

View File

@ -110,54 +110,7 @@ impl Fullnode {
server
}
pub fn new_with_bank(
keypair: Keypair,
bank: Bank,
entry_height: u64,
ledger_tail: &[Entry],
mut node: TestNode,
leader_info: Option<NodeInfo>,
exit: Arc<AtomicBool>,
ledger_path: Option<&str>,
sigverify_disabled: bool,
) -> Self {
let bank = Arc::new(bank);
let thread_hdls = match leader_info {
Some(leader_info) => {
// Start in validator mode.
Self::create_validator_threads(
keypair,
&bank,
entry_height,
&ledger_tail,
node,
&leader_info,
exit.clone(),
ledger_path,
sigverify_disabled,
)
}
None => {
// Start in leader mode.
node.data.leader_id = node.data.id;
Self::create_leader_threads(
keypair,
&bank,
entry_height,
&ledger_tail,
node,
exit.clone(),
ledger_path.expect("ledger path"),
sigverify_disabled,
)
}
};
Fullnode { exit, thread_hdls }
}
/// Create a server instance acting as a leader.
/// Create a fullnode instance acting as a leader or validator.
///
/// ```text
/// .---------------------.
@ -180,86 +133,7 @@ impl Fullnode {
/// | `-----` `-----` | | |
/// | | `------------`
/// `---------------------`
/// ```
fn create_leader_threads(
keypair: Keypair,
bank: &Arc<Bank>,
entry_height: u64,
ledger_tail: &[Entry],
node: TestNode,
exit: Arc<AtomicBool>,
ledger_path: &str,
sigverify_disabled: bool,
) -> Vec<JoinHandle<()>> {
let tick_duration = None;
// TODO: To light up PoH, uncomment the following line:
//let tick_duration = Some(Duration::from_millis(1000));
let mut thread_hdls = vec![];
let rpu = Rpu::new(
&bank,
node.sockets.requests,
node.sockets.respond,
exit.clone(),
);
thread_hdls.extend(rpu.thread_hdls());
let mut drone_addr = node.data.contact_info.tpu;
drone_addr.set_port(DRONE_PORT);
let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), RPC_PORT);
let rpc_service = JsonRpcService::new(
&bank,
node.data.contact_info.tpu,
drone_addr,
rpc_addr,
exit.clone(),
);
thread_hdls.extend(rpc_service.thread_hdls());
let blob_recycler = BlobRecycler::default();
let window =
window::new_window_from_entries(ledger_tail, entry_height, &node.data, &blob_recycler);
let crdt = Arc::new(RwLock::new(Crdt::new(node.data).expect("Crdt::new")));
let (tpu, blob_receiver) = Tpu::new(
keypair,
&bank,
&crdt,
tick_duration,
node.sockets.transaction,
&blob_recycler,
exit.clone(),
ledger_path,
sigverify_disabled,
);
thread_hdls.extend(tpu.thread_hdls());
let ncp = Ncp::new(
&crdt,
window.clone(),
Some(ledger_path),
node.sockets.gossip,
node.sockets.gossip_send,
exit.clone(),
).expect("Ncp::new");
thread_hdls.extend(ncp.thread_hdls());
let broadcast_stage = BroadcastStage::new(
node.sockets.broadcast,
crdt,
window,
entry_height,
blob_recycler.clone(),
blob_receiver,
);
thread_hdls.extend(broadcast_stage.thread_hdls());
thread_hdls
}
/// Create a server instance acting as a validator.
///
/// ```text
/// .-------------------------------.
/// | Validator |
/// | |
@ -286,71 +160,143 @@ impl Fullnode {
/// `--------` | | `------------`
/// `-------------------------------`
/// ```
fn create_validator_threads(
pub fn new_with_bank(
keypair: Keypair,
bank: &Arc<Bank>,
bank: Bank,
entry_height: u64,
ledger_tail: &[Entry],
node: TestNode,
entry_point: &NodeInfo,
mut node: TestNode,
leader_info: Option<NodeInfo>,
exit: Arc<AtomicBool>,
ledger_path: Option<&str>,
_sigverify_disabled: bool,
) -> Vec<JoinHandle<()>> {
let mut thread_hdls = vec![];
let rpu = Rpu::new(
&bank,
node.sockets.requests,
node.sockets.respond,
exit.clone(),
);
thread_hdls.extend(rpu.thread_hdls());
sigverify_disabled: bool,
) -> Self {
if leader_info.is_none() {
node.data.leader_id = node.data.id;
}
let mut drone_addr = entry_point.contact_info.ncp;
drone_addr.set_port(DRONE_PORT);
let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), RPC_PORT);
let rpc_service = JsonRpcService::new(
&bank,
node.data.contact_info.tpu,
drone_addr,
rpc_addr,
exit.clone(),
);
thread_hdls.extend(rpc_service.thread_hdls());
let bank = Arc::new(bank);
let thread_hdls = match leader_info {
Some(leader_info) => {
// Start in validator mode.
let mut thread_hdls = vec![];
let rpu = Rpu::new(
&bank,
node.sockets.requests,
node.sockets.respond,
exit.clone(),
);
thread_hdls.extend(rpu.thread_hdls());
let blob_recycler = BlobRecycler::default();
let window =
window::new_window_from_entries(ledger_tail, entry_height, &node.data, &blob_recycler);
let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), RPC_PORT);
let rpc_service = JsonRpcService::new(bank.clone(), rpc_addr, exit.clone());
thread_hdls.extend(rpc_service.thread_hdls());
let crdt = Arc::new(RwLock::new(Crdt::new(node.data).expect("Crdt::new")));
crdt.write()
.expect("'crdt' write lock before insert() in pub fn replicate")
.insert(&entry_point);
let blob_recycler = BlobRecycler::default();
let window = window::new_window_from_entries(
ledger_tail,
entry_height,
&node.data,
&blob_recycler,
);
let ncp = Ncp::new(
&crdt,
window.clone(),
ledger_path,
node.sockets.gossip,
node.sockets.gossip_send,
exit.clone(),
).expect("Ncp::new");
let crdt = Arc::new(RwLock::new(Crdt::new(node.data).expect("Crdt::new")));
crdt.write()
.expect("'crdt' write lock before insert() in pub fn replicate")
.insert(&leader_info);
let tvu = Tvu::new(
keypair,
&bank,
entry_height,
crdt.clone(),
window.clone(),
node.sockets.replicate,
node.sockets.repair,
node.sockets.retransmit,
ledger_path,
exit.clone(),
);
thread_hdls.extend(tvu.thread_hdls());
thread_hdls.extend(ncp.thread_hdls());
thread_hdls
let ncp = Ncp::new(
&crdt,
window.clone(),
ledger_path,
node.sockets.gossip,
node.sockets.gossip_send,
exit.clone(),
).expect("Ncp::new");
let tvu = Tvu::new(
keypair,
&bank,
entry_height,
crdt.clone(),
window.clone(),
node.sockets.replicate,
node.sockets.repair,
node.sockets.retransmit,
ledger_path,
exit.clone(),
);
thread_hdls.extend(tvu.thread_hdls());
thread_hdls.extend(ncp.thread_hdls());
thread_hdls
}
None => {
// Start in leader mode.
let ledger_path = ledger_path.expect("ledger path");
let tick_duration = None;
// TODO: To light up PoH, uncomment the following line:
//let tick_duration = Some(Duration::from_millis(1000));
let mut thread_hdls = vec![];
let rpu = Rpu::new(
&bank,
node.sockets.requests,
node.sockets.respond,
exit.clone(),
);
thread_hdls.extend(rpu.thread_hdls());
let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), RPC_PORT);
let rpc_service = JsonRpcService::new(bank.clone(), rpc_addr, exit.clone());
thread_hdls.extend(rpc_service.thread_hdls());
let blob_recycler = BlobRecycler::default();
let window = window::new_window_from_entries(
ledger_tail,
entry_height,
&node.data,
&blob_recycler,
);
let crdt = Arc::new(RwLock::new(Crdt::new(node.data).expect("Crdt::new")));
let (tpu, blob_receiver) = Tpu::new(
keypair,
&bank,
&crdt,
tick_duration,
node.sockets.transaction,
&blob_recycler,
exit.clone(),
ledger_path,
sigverify_disabled,
);
thread_hdls.extend(tpu.thread_hdls());
let ncp = Ncp::new(
&crdt,
window.clone(),
Some(ledger_path),
node.sockets.gossip,
node.sockets.gossip_send,
exit.clone(),
).expect("Ncp::new");
thread_hdls.extend(ncp.thread_hdls());
let broadcast_stage = BroadcastStage::new(
node.sockets.broadcast,
crdt,
window,
entry_height,
blob_recycler.clone(),
blob_receiver,
);
thread_hdls.extend(broadcast_stage.thread_hdls());
thread_hdls
}
};
Fullnode { exit, thread_hdls }
}
//used for notifying many nodes in parallel to exit