Hoist shared code between leaders and validators

This commit is contained in:
Greg Fitzgerald 2018-08-22 19:00:56 -06:00 committed by Grimes
parent 42229a1105
commit 816de4f8ec
1 changed files with 33 additions and 69 deletions

View File

@ -177,49 +177,45 @@ impl Fullnode {
let bank = Arc::new(bank); let bank = Arc::new(bank);
let mut thread_hdls = vec![]; let mut thread_hdls = vec![];
let rpu = Rpu::new(
&bank,
node.sockets.requests,
node.sockets.respond,
exit.clone(),
);
thread_hdls.extend(rpu.thread_hdls());
let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), RPC_PORT);
let rpc_service = JsonRpcService::new(bank.clone(), rpc_addr, exit.clone());
thread_hdls.extend(rpc_service.thread_hdls());
let blob_recycler = BlobRecycler::default();
let window =
window::new_window_from_entries(ledger_tail, entry_height, &node.data, &blob_recycler);
let crdt = Arc::new(RwLock::new(Crdt::new(node.data).expect("Crdt::new")));
let ncp = Ncp::new(
&crdt,
window.clone(),
ledger_path,
node.sockets.gossip,
node.sockets.gossip_send,
exit.clone(),
).expect("Ncp::new");
thread_hdls.extend(ncp.thread_hdls());
match leader_info { match leader_info {
Some(leader_info) => { Some(ref leader_info) => {
// Start in validator mode. // Start in validator mode.
let rpu = Rpu::new( crdt.write().unwrap().insert(leader_info);
&bank,
node.sockets.requests,
node.sockets.respond,
exit.clone(),
);
thread_hdls.extend(rpu.thread_hdls());
let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), RPC_PORT);
let rpc_service = JsonRpcService::new(bank.clone(), rpc_addr, exit.clone());
thread_hdls.extend(rpc_service.thread_hdls());
let blob_recycler = BlobRecycler::default();
let window = window::new_window_from_entries(
ledger_tail,
entry_height,
&node.data,
&blob_recycler,
);
let crdt = Arc::new(RwLock::new(Crdt::new(node.data).expect("Crdt::new")));
crdt.write()
.expect("'crdt' write lock before insert() in pub fn replicate")
.insert(&leader_info);
let ncp = Ncp::new(
&crdt,
window.clone(),
ledger_path,
node.sockets.gossip,
node.sockets.gossip_send,
exit.clone(),
).expect("Ncp::new");
let tvu = Tvu::new( let tvu = Tvu::new(
keypair, keypair,
&bank, &bank,
entry_height, entry_height,
crdt.clone(), crdt,
window.clone(), window,
node.sockets.replicate, node.sockets.replicate,
node.sockets.repair, node.sockets.repair,
node.sockets.retransmit, node.sockets.retransmit,
@ -227,7 +223,6 @@ impl Fullnode {
exit.clone(), exit.clone(),
); );
thread_hdls.extend(tvu.thread_hdls()); thread_hdls.extend(tvu.thread_hdls());
thread_hdls.extend(ncp.thread_hdls());
} }
None => { None => {
// Start in leader mode. // Start in leader mode.
@ -236,28 +231,6 @@ impl Fullnode {
// TODO: To light up PoH, uncomment the following line: // TODO: To light up PoH, uncomment the following line:
//let tick_duration = Some(Duration::from_millis(1000)); //let tick_duration = Some(Duration::from_millis(1000));
let rpu = Rpu::new(
&bank,
node.sockets.requests,
node.sockets.respond,
exit.clone(),
);
thread_hdls.extend(rpu.thread_hdls());
let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), RPC_PORT);
let rpc_service = JsonRpcService::new(bank.clone(), rpc_addr, exit.clone());
thread_hdls.extend(rpc_service.thread_hdls());
let blob_recycler = BlobRecycler::default();
let window = window::new_window_from_entries(
ledger_tail,
entry_height,
&node.data,
&blob_recycler,
);
let crdt = Arc::new(RwLock::new(Crdt::new(node.data).expect("Crdt::new")));
let (tpu, blob_receiver) = Tpu::new( let (tpu, blob_receiver) = Tpu::new(
keypair, keypair,
&bank, &bank,
@ -270,15 +243,6 @@ impl Fullnode {
sigverify_disabled, sigverify_disabled,
); );
thread_hdls.extend(tpu.thread_hdls()); thread_hdls.extend(tpu.thread_hdls());
let ncp = Ncp::new(
&crdt,
window.clone(),
Some(ledger_path),
node.sockets.gossip,
node.sockets.gossip_send,
exit.clone(),
).expect("Ncp::new");
thread_hdls.extend(ncp.thread_hdls());
let broadcast_stage = BroadcastStage::new( let broadcast_stage = BroadcastStage::new(
node.sockets.broadcast, node.sockets.broadcast,