solana/src/fullnode.rs

302 lines
10 KiB
Rust
Raw Normal View History

2018-07-02 15:24:40 -07:00
//! The `fullnode` module hosts all the fullnode microservices.
2018-07-02 11:20:35 -07:00
use bank::Bank;
2018-07-02 15:24:40 -07:00
use crdt::{Crdt, ReplicatedData, TestNode};
2018-07-02 11:20:35 -07:00
use entry_writer;
2018-07-02 15:24:40 -07:00
use ncp::Ncp;
use packet::BlobRecycler;
use rpu::Rpu;
use service::Service;
use std::fs::{File, OpenOptions};
use std::io::{sink, stdin, stdout, BufReader};
2018-07-03 15:19:57 -07:00
use std::io::{Read, Write};
2018-07-02 11:20:35 -07:00
use std::net::SocketAddr;
use std::sync::atomic::AtomicBool;
2018-07-02 15:24:40 -07:00
use std::sync::{Arc, RwLock};
use std::thread::{JoinHandle, Result};
2018-07-02 15:24:40 -07:00
use std::time::Duration;
use streamer;
use tpu::Tpu;
use tvu::Tvu;
2018-07-02 11:20:35 -07:00
2018-07-02 15:24:40 -07:00
//use std::time::Duration;
pub struct FullNode {
thread_hdls: Vec<JoinHandle<()>>,
2018-07-02 15:24:40 -07:00
}
2018-07-02 11:20:35 -07:00
2018-07-03 10:20:02 -07:00
pub enum InFile {
StdIn,
2018-07-03 10:20:02 -07:00
Path(String),
}
pub enum OutFile {
StdOut,
Path(String),
}
2018-07-02 15:24:40 -07:00
impl FullNode {
pub fn new(
mut node: TestNode,
leader: bool,
2018-07-03 10:20:02 -07:00
infile: InFile,
2018-07-02 15:24:40 -07:00
network_entry_for_validator: Option<SocketAddr>,
2018-07-03 10:20:02 -07:00
outfile_for_leader: Option<OutFile>,
2018-07-02 15:24:40 -07:00
exit: Arc<AtomicBool>,
) -> FullNode {
info!("creating bank...");
let bank = Bank::default();
2018-07-03 15:19:57 -07:00
let infile: Box<Read> = match infile {
InFile::Path(path) => Box::new(File::open(path).unwrap()),
InFile::StdIn => Box::new(stdin()),
2018-07-02 15:24:40 -07:00
};
2018-07-03 15:19:57 -07:00
let reader = BufReader::new(infile);
let entries = entry_writer::read_entries(reader).map(|e| e.expect("failed to parse entry"));
info!("processing ledger...");
let entry_height = bank.process_ledger(entries).expect("process_ledger");
2018-07-02 11:20:35 -07:00
2018-07-02 15:24:40 -07:00
// entry_height is the network-wide agreed height of the ledger.
// initialize it from the input ledger
info!("processed {} ledger...", entry_height);
2018-07-02 11:20:35 -07:00
2018-07-02 15:24:40 -07:00
info!("creating networking stack...");
2018-07-02 11:20:35 -07:00
2018-07-02 15:24:40 -07:00
let local_gossip_addr = node.sockets.gossip.local_addr().unwrap();
let local_requests_addr = node.sockets.requests.local_addr().unwrap();
2018-07-02 11:20:35 -07:00
info!(
2018-07-02 15:24:40 -07:00
"starting... local gossip address: {} (advertising {})",
local_gossip_addr, node.data.gossip_addr
2018-07-02 11:20:35 -07:00
);
let requests_addr = node.data.requests_addr.clone();
2018-07-02 15:24:40 -07:00
if !leader {
let testnet_addr = network_entry_for_validator.expect("validator requires entry");
let network_entry_point = ReplicatedData::new_entry_point(testnet_addr);
let server = FullNode::new_validator(
2018-07-02 14:46:23 -07:00
bank,
entry_height,
node,
2018-07-02 15:24:40 -07:00
network_entry_point,
2018-07-02 14:46:23 -07:00
exit.clone(),
2018-07-02 15:24:40 -07:00
);
info!(
"validator ready... local request address: {} (advertising {}) connected to: {}",
local_requests_addr, requests_addr, testnet_addr
2018-07-02 15:24:40 -07:00
);
server
2018-07-02 14:46:23 -07:00
} else {
2018-07-02 15:24:40 -07:00
node.data.current_leader_id = node.data.id.clone();
2018-07-03 15:24:55 -07:00
let outfile_for_leader: Box<Write + Send> = match outfile_for_leader {
Some(OutFile::Path(file)) => Box::new(
OpenOptions::new()
.create(true)
.append(true)
.open(file)
.expect("opening ledger file"),
),
2018-07-03 15:24:55 -07:00
Some(OutFile::StdOut) => Box::new(stdout()),
None => Box::new(sink()),
2018-07-03 10:20:02 -07:00
};
2018-07-03 15:24:55 -07:00
let server = FullNode::new_leader(
bank,
entry_height,
//Some(Duration::from_millis(1000)),
None,
node,
exit.clone(),
outfile_for_leader,
);
2018-07-02 15:24:40 -07:00
info!(
"leader ready... local request address: {} (advertising {})",
local_requests_addr, requests_addr
2018-07-02 15:24:40 -07:00
);
server
}
}
/// Create a server instance acting as a leader.
///
/// ```text
/// .---------------------.
/// | Leader |
/// | |
/// .--------. | .-----. |
/// | |---->| | |
/// | Client | | | RPU | |
/// | |<----| | |
/// `----+---` | `-----` |
/// | | ^ |
/// | | | |
/// | | .--+---. |
/// | | | Bank | |
/// | | `------` |
/// | | ^ |
/// | | | | .------------.
/// | | .--+--. .-----. | | |
/// `-------->| TPU +-->| NCP +------>| Validators |
/// | `-----` `-----` | | |
/// | | `------------`
/// `---------------------`
/// ```
pub fn new_leader<W: Write + Send + 'static>(
bank: Bank,
entry_height: u64,
tick_duration: Option<Duration>,
node: TestNode,
2018-07-02 15:24:40 -07:00
exit: Arc<AtomicBool>,
writer: W,
) -> Self {
let bank = Arc::new(bank);
let mut thread_hdls = vec![];
let rpu = Rpu::new(
bank.clone(),
node.sockets.requests,
node.sockets.respond,
exit.clone(),
);
thread_hdls.extend(rpu.thread_hdls());
2018-07-02 15:24:40 -07:00
let blob_recycler = BlobRecycler::default();
2018-07-02 15:30:19 -07:00
let (tpu, blob_receiver) = Tpu::new(
2018-07-02 15:24:40 -07:00
bank.clone(),
tick_duration,
node.sockets.transaction,
2018-07-02 15:24:40 -07:00
blob_recycler.clone(),
exit.clone(),
writer,
);
thread_hdls.extend(tpu.thread_hdls());
let crdt = Arc::new(RwLock::new(Crdt::new(node.data)));
2018-07-02 15:24:40 -07:00
let window = streamer::default_window();
let ncp = Ncp::new(
crdt.clone(),
window.clone(),
node.sockets.gossip,
node.sockets.gossip_send,
2018-07-02 15:24:40 -07:00
exit.clone(),
).expect("Ncp::new");
thread_hdls.extend(ncp.thread_hdls());
2018-07-02 15:24:40 -07:00
let t_broadcast = streamer::broadcaster(
node.sockets.broadcast,
2018-07-02 15:24:40 -07:00
crdt,
window,
entry_height,
blob_recycler.clone(),
2018-07-02 15:30:19 -07:00
blob_receiver,
2018-07-02 15:24:40 -07:00
);
thread_hdls.extend(vec![t_broadcast]);
FullNode { thread_hdls }
}
/// Create a server instance acting as a validator.
///
/// ```text
/// .-------------------------------.
/// | Validator |
/// | |
/// .--------. | .-----. |
/// | |-------------->| | |
/// | Client | | | RPU | |
/// | |<--------------| | |
/// `--------` | `-----` |
/// | ^ |
/// | | |
/// | .--+---. |
/// | | Bank | |
/// | `------` |
/// | ^ |
/// .--------. | | | .------------.
/// | | | .--+--. | | |
/// | Leader |<------------->| TVU +<--------------->| |
/// | | | `-----` | | Validators |
/// | | | ^ | | |
/// | | | | | | |
/// | | | .--+--. | | |
/// | |<------------->| NCP +<--------------->| |
/// | | | `-----` | | |
/// `--------` | | `------------`
/// `-------------------------------`
/// ```
pub fn new_validator(
bank: Bank,
entry_height: u64,
node: TestNode,
2018-07-02 15:24:40 -07:00
entry_point: ReplicatedData,
exit: Arc<AtomicBool>,
) -> Self {
let bank = Arc::new(bank);
let mut thread_hdls = vec![];
let rpu = Rpu::new(
bank.clone(),
node.sockets.requests,
node.sockets.respond,
exit.clone(),
);
thread_hdls.extend(rpu.thread_hdls());
2018-07-02 15:24:40 -07:00
let crdt = Arc::new(RwLock::new(Crdt::new(node.data)));
2018-07-02 15:24:40 -07:00
crdt.write()
.expect("'crdt' write lock before insert() in pub fn replicate")
.insert(&entry_point);
let window = streamer::default_window();
let ncp = Ncp::new(
crdt.clone(),
window.clone(),
node.sockets.gossip,
node.sockets.gossip_send,
2018-07-02 15:24:40 -07:00
exit.clone(),
).expect("Ncp::new");
let tvu = Tvu::new(
bank.clone(),
entry_height,
crdt.clone(),
window.clone(),
node.sockets.replicate,
node.sockets.repair,
node.sockets.retransmit,
2018-07-02 15:24:40 -07:00
exit.clone(),
);
thread_hdls.extend(tvu.thread_hdls());
thread_hdls.extend(ncp.thread_hdls());
2018-07-02 15:24:40 -07:00
FullNode { thread_hdls }
}
}
impl Service for FullNode {
fn thread_hdls(self) -> Vec<JoinHandle<()>> {
self.thread_hdls
}
fn join(self) -> Result<()> {
for thread_hdl in self.thread_hdls() {
thread_hdl.join()?;
}
Ok(())
}
}
2018-07-02 15:24:40 -07:00
#[cfg(test)]
mod tests {
use bank::Bank;
use crdt::TestNode;
use fullnode::FullNode;
use mint::Mint;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
#[test]
fn validator_exit() {
let tn = TestNode::new();
let alice = Mint::new(10_000);
let bank = Bank::new(&alice);
let exit = Arc::new(AtomicBool::new(false));
let entry = tn.data.clone();
let v = FullNode::new_validator(bank, 0, tn, entry, exit.clone());
2018-07-02 15:24:40 -07:00
exit.store(true, Ordering::Relaxed);
for t in v.thread_hdls {
t.join().unwrap();
}
2018-07-02 11:20:35 -07:00
}
}