Stream entries (#2582)

* Add entry streaming option

* Fix tests

* Remove obsolete comment

* Move entry stream functionality to struct w/ trait in order to test without i/o
This commit is contained in:
Tyera Eulberg 2019-01-29 00:21:27 -08:00 committed by GitHub
parent 12cddf725e
commit 6da7a784f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 192 additions and 6 deletions

View File

@ -130,6 +130,13 @@ fn main() {
let matches = App::new("fullnode")
.version(crate_version!())
.arg(
Arg::with_name("entry_stream")
.long("entry-stream")
.takes_value(true)
.value_name("UNIX DOMAIN SOCKET")
.help("Open entry stream at this unix domain socket location")
)
.arg(
Arg::with_name("identity")
.short("i")
@ -227,6 +234,7 @@ fn main() {
.expect("unable to allocate rpc port")
};
let init_complete_file = matches.value_of("init_complete_file");
let entry_stream = matches.value_of("entry_stream").map(|s| s.to_string());
let keypair = Arc::new(keypair);
let node = Node::new_with_external_ip(keypair.pubkey(), &gossip);
@ -263,6 +271,7 @@ fn main() {
.as_ref(),
no_sigverify,
Some(rpc_port),
entry_stream,
);
if !no_signer {

View File

@ -21,13 +21,14 @@ usage() {
echo
fi
cat <<EOF
usage: $0 [-x] [--init-complete-file FILE] [--no-leader-rotation] [--no-signer] [--rpc-port port] [rsync network path to bootstrap leader configuration] [network entry point]
usage: $0 [-x] [--entry-stream PATH] [--init-complete-file FILE] [--no-leader-rotation] [--no-signer] [--rpc-port port] [rsync network path to bootstrap leader configuration] [network entry point]
Start a full node on the specified network
-x - start a new, dynamically-configured full node
-X [label] - start or restart a dynamically-configured full node with
the specified label
--entry-stream PATH - open entry stream at this unix domain socket location
--init-complete-file FILE - create this file, if it doesn't already exist, once node initialization is complete
--no-leader-rotation - disable leader rotation
--no-signer - start node without vote signer
@ -41,6 +42,7 @@ if [[ $1 = -h ]]; then
usage
fi
maybe_entry_stream=
maybe_init_complete_file=
maybe_no_leader_rotation=
maybe_no_signer=
@ -56,6 +58,9 @@ while [[ ${1:0:1} = - ]]; do
self_setup=1
self_setup_label=$$
shift
elif [[ $1 = --entry-stream ]]; then
maybe_entry_stream="$1 $2"
shift 2
elif [[ $1 = --init-complete-file ]]; then
maybe_init_complete_file="--init-complete-file $2"
shift 2
@ -241,8 +246,9 @@ if [[ ! -d "$ledger_config_dir" ]]; then
fi
trap 'kill "$pid" && wait "$pid"' INT TERM
# shellcheck disable=SC2086 # Don't want to double quote maybe_init_complete_file or maybe_no_signer or maybe_rpc_port
# shellcheck disable=SC2086 # Don't want to double quote maybe_entry_stream or maybe_init_complete_file or maybe_no_signer or maybe_rpc_port
$program \
$maybe_entry_stream \
$maybe_init_complete_file \
$maybe_no_leader_rotation \
$maybe_no_signer \

57
src/entry_stream.rs Normal file
View File

@ -0,0 +1,57 @@
//! The `entry_stream` module provides a method for streaming entries out via a
//! local unix socket, to provide client services such as a block explorer with
//! real-time access to entries.
use crate::entry::Entry;
use crate::result::Result;
use std::io::prelude::*;
use std::net::Shutdown;
use std::os::unix::net::UnixStream;
use std::path::Path;
pub trait EntryStreamHandler {
fn stream_entries(&mut self, entries: &[Entry]) -> Result<()>;
}
pub struct EntryStream {
pub socket: String,
}
impl EntryStream {
pub fn new(socket: String) -> Self {
EntryStream { socket }
}
}
impl EntryStreamHandler for EntryStream {
fn stream_entries(&mut self, entries: &[Entry]) -> Result<()> {
let mut socket = UnixStream::connect(Path::new(&self.socket))?;
for entry in entries {
let result = serde_json::to_string(&entry)?;
socket.write_all(result.as_bytes())?;
}
socket.shutdown(Shutdown::Write)?;
Ok(())
}
}
pub struct MockEntryStream {
pub socket: Vec<String>,
}
impl MockEntryStream {
#[allow(clippy::needless_pass_by_value)]
pub fn new(_socket: String) -> Self {
MockEntryStream { socket: Vec::new() }
}
}
impl EntryStreamHandler for MockEntryStream {
fn stream_entries(&mut self, entries: &[Entry]) -> Result<()> {
for entry in entries {
let result = serde_json::to_string(&entry)?;
self.socket.push(result);
}
Ok(())
}
}

View File

@ -95,6 +95,7 @@ impl Fullnode {
entrypoint_info_option: Option<&NodeInfo>,
sigverify_disabled: bool,
rpc_port: Option<u16>,
entry_stream: Option<String>,
) -> Self {
Self::new_with_storage_rotate(
node,
@ -106,9 +107,11 @@ impl Fullnode {
leader_scheduler,
rpc_port,
NUM_HASHES_FOR_STORAGE_ROTATE,
entry_stream,
)
}
#[allow(clippy::too_many_arguments)]
pub fn new_with_storage_rotate(
node: Node,
keypair: Arc<Keypair>,
@ -119,6 +122,7 @@ impl Fullnode {
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
rpc_port: Option<u16>,
storage_rotate_count: u64,
entry_stream: Option<String>,
) -> Self {
let (genesis_block, db_ledger) = Self::make_db_ledger(ledger_path);
let (bank, entry_height, last_entry_id) =
@ -135,6 +139,7 @@ impl Fullnode {
sigverify_disabled,
rpc_port,
storage_rotate_count,
entry_stream,
)
}
@ -150,6 +155,7 @@ impl Fullnode {
entrypoint_info_option: Option<&NodeInfo>,
sigverify_disabled: bool,
rpc_port: Option<u16>,
entry_stream: Option<String>,
) -> Self {
let (_genesis_block, db_ledger) = Self::make_db_ledger(ledger_path);
Self::new_with_bank_and_db_ledger(
@ -164,6 +170,7 @@ impl Fullnode {
sigverify_disabled,
rpc_port,
NUM_HASHES_FOR_STORAGE_ROTATE,
entry_stream,
)
}
@ -180,6 +187,7 @@ impl Fullnode {
sigverify_disabled: bool,
rpc_port: Option<u16>,
storage_rotate_count: u64,
entry_stream: Option<String>,
) -> Self {
let mut rpc_addr = node.info.rpc;
let mut rpc_pubsub_addr = node.info.rpc_pubsub;
@ -299,6 +307,7 @@ impl Fullnode {
storage_rotate_count,
to_leader_sender,
&storage_state,
entry_stream,
);
let max_tick_height = {
let ls_lock = bank.leader_scheduler.read().unwrap();
@ -564,6 +573,7 @@ mod tests {
Some(&entry),
false,
None,
None,
);
v.close().unwrap();
remove_dir_all(validator_ledger_path).unwrap();
@ -606,6 +616,7 @@ mod tests {
Some(&entry),
false,
None,
None,
)
})
.collect();
@ -677,6 +688,7 @@ mod tests {
Some(&bootstrap_leader_info),
false,
None,
None,
);
// Wait for the leader to transition, ticks should cause the leader to
@ -781,6 +793,7 @@ mod tests {
Some(&bootstrap_leader_info),
false,
None,
None,
);
assert!(!bootstrap_leader.node_services.tpu.is_leader());
@ -795,6 +808,7 @@ mod tests {
Some(&bootstrap_leader_info),
false,
None,
None,
);
assert!(validator.node_services.tpu.is_leader());
@ -890,6 +904,7 @@ mod tests {
Some(&leader_node.info),
false,
None,
None,
);
// Send blobs to the validator from our mock leader

View File

@ -34,6 +34,7 @@ pub mod compute_leader_confirmation_service;
pub mod db_ledger;
pub mod db_window;
pub mod entry;
pub mod entry_stream;
#[cfg(feature = "erasure")]
pub mod erasure;
pub mod fetch_stage;

View File

@ -3,8 +3,12 @@
use crate::bank::Bank;
use crate::cluster_info::ClusterInfo;
use crate::counter::Counter;
use crate::entry::EntrySlice;
use crate::entry::{EntryReceiver, EntrySender};
use crate::entry::{EntryReceiver, EntrySender, EntrySlice};
#[cfg(not(test))]
use crate::entry_stream::EntryStream;
use crate::entry_stream::EntryStreamHandler;
#[cfg(test)]
use crate::entry_stream::MockEntryStream as EntryStream;
use crate::fullnode::TvuRotationSender;
use crate::leader_scheduler::TICKS_PER_BLOCK;
use crate::packet::BlobError;
@ -65,6 +69,7 @@ impl ReplayStage {
ledger_entry_sender: &EntrySender,
entry_height: &Arc<RwLock<u64>>,
last_entry_id: &Arc<RwLock<Hash>>,
entry_stream: Option<&mut EntryStream>,
) -> Result<()> {
let timer = Duration::new(1, 0);
//coalesce all the available entries into a single vote
@ -76,6 +81,12 @@ impl ReplayStage {
}
}
if let Some(stream) = entry_stream {
stream.stream_entries(&entries).unwrap_or_else(|e| {
error!("Entry Stream error: {:?}, {:?}", e, stream.socket);
});
}
submit(
influxdb::Point::new("replicate-stage")
.add_field("count", influxdb::Value::Integer(entries.len() as i64))
@ -192,7 +203,7 @@ impl ReplayStage {
Ok(())
}
#[allow(clippy::new_ret_no_self)]
#[allow(clippy::new_ret_no_self, clippy::too_many_arguments)]
pub fn new(
keypair: Arc<Keypair>,
vote_signer: Option<Arc<VoteSignerProxy>>,
@ -203,6 +214,7 @@ impl ReplayStage {
entry_height: Arc<RwLock<u64>>,
last_entry_id: Arc<RwLock<Hash>>,
to_leader_sender: TvuRotationSender,
entry_stream: Option<String>,
) -> (Self, EntryReceiver) {
let (vote_blob_sender, vote_blob_receiver) = channel();
let (ledger_entry_sender, ledger_entry_receiver) = channel();
@ -219,6 +231,7 @@ impl ReplayStage {
let (mut last_leader_id, _) = bank
.get_current_leader()
.expect("Scheduled leader should be calculated by this point");
let mut entry_stream = entry_stream.map(EntryStream::new);
loop {
let (leader_id, _) = bank
.get_current_leader()
@ -244,6 +257,7 @@ impl ReplayStage {
&ledger_entry_sender,
&entry_height_.clone(),
&last_entry_id.clone(),
entry_stream.as_mut(),
) {
Err(Error::RecvTimeoutError(RecvTimeoutError::Disconnected)) => break,
Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (),
@ -275,6 +289,7 @@ impl Service for ReplayStage {
#[cfg(test)]
mod test {
use super::*;
use crate::bank::Bank;
use crate::cluster_info::{ClusterInfo, Node};
use crate::db_ledger::create_tmp_sample_ledger;
@ -383,6 +398,7 @@ mod test {
Arc::new(RwLock::new(initial_entry_len)),
Arc::new(RwLock::new(last_entry_id)),
rotation_sender,
None,
);
// Send enough ticks to trigger leader rotation
@ -485,6 +501,7 @@ mod test {
Arc::new(RwLock::new(initial_entry_len as u64)),
Arc::new(RwLock::new(last_entry_id)),
to_leader_sender,
None,
);
// Vote sender should error because no leader contact info is found in the
@ -603,6 +620,7 @@ mod test {
Arc::new(RwLock::new(initial_entry_len as u64)),
Arc::new(RwLock::new(last_entry_id)),
rotation_tx,
None,
);
// Vote sender should error because no leader contact info is found in the
@ -673,7 +691,6 @@ mod test {
let (entry_sender, entry_receiver) = channel();
let (ledger_entry_sender, _ledger_entry_receiver) = channel();
let last_entry_id = Hash::default();
// Create keypair for the old leader
let entry_height = 0;
let mut last_id = Hash::default();
@ -699,6 +716,7 @@ mod test {
&ledger_entry_sender,
&Arc::new(RwLock::new(entry_height)),
&Arc::new(RwLock::new(last_entry_id)),
None,
);
match res {
@ -725,6 +743,7 @@ mod test {
&ledger_entry_sender,
&Arc::new(RwLock::new(entry_height)),
&Arc::new(RwLock::new(last_entry_id)),
None,
);
match res {
@ -737,4 +756,53 @@ mod test {
),
}
}
#[test]
fn test_replay_stage_stream_entries() {
// Set up entry stream
let mut entry_stream = EntryStream::new("test_stream".to_string());
// Set up dummy node to host a ReplayStage
let my_keypair = Keypair::new();
let my_id = my_keypair.pubkey();
let my_node = Node::new_localhost_with_pubkey(my_id);
// Set up the cluster info
let cluster_info_me = Arc::new(RwLock::new(ClusterInfo::new(my_node.info.clone())));
let (entry_sender, entry_receiver) = channel();
let (ledger_entry_sender, _ledger_entry_receiver) = channel();
let last_entry_id = Hash::default();
let entry_height = 0;
let mut last_id = Hash::default();
let mut entries = Vec::new();
let mut expected_entries = Vec::new();
for _ in 0..5 {
let entry = Entry::new(&mut last_id, 0, 1, vec![]); //just ticks
last_id = entry.id;
expected_entries.push(serde_json::to_string(&entry).unwrap());
entries.push(entry);
}
entry_sender
.send(entries.clone())
.expect("Expected to err out");
let my_keypair = Arc::new(my_keypair);
let vote_signer = Arc::new(VoteSignerProxy::new_local(&my_keypair));
ReplayStage::process_entries(
&Arc::new(Bank::default()),
&cluster_info_me,
&entry_receiver,
&my_keypair,
Some(&vote_signer),
None,
&ledger_entry_sender,
&Arc::new(RwLock::new(entry_height)),
&Arc::new(RwLock::new(last_entry_id)),
Some(&mut entry_stream),
)
.unwrap();
assert_eq!(entry_stream.socket.len(), 5);
assert_eq!(entry_stream.socket, expected_entries);
}
}

View File

@ -474,6 +474,7 @@ pub fn new_fullnode(
None,
false,
None,
None,
);
(server, leader_data, genesis_block, alice, ledger_path)

View File

@ -72,6 +72,7 @@ impl Tvu {
storage_rotate_count: u64,
to_leader_sender: TvuRotationSender,
storage_state: &StorageState,
entry_stream: Option<String>,
) -> Self {
let exit = Arc::new(AtomicBool::new(false));
let keypair: Arc<Keypair> = cluster_info
@ -121,6 +122,7 @@ impl Tvu {
l_entry_height.clone(),
l_last_entry_id.clone(),
to_leader_sender,
entry_stream,
);
let storage_stage = StorageStage::new(
@ -300,6 +302,7 @@ pub mod tests {
STORAGE_ROTATE_TEST_COUNT,
sender,
&StorageState::default(),
None,
);
let mut alice_ref_balance = starting_balance;

View File

@ -165,6 +165,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> {
None,
false,
None,
None,
);
// start up another validator from zero, converge and then check
@ -185,6 +186,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> {
Some(&leader_data),
false,
None,
None,
);
// Send validator some tokens to vote
@ -269,6 +271,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> {
None,
false,
None,
None,
);
let mut nodes = vec![server];
@ -303,6 +306,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> {
Some(&leader_data),
false,
None,
None,
);
nodes.push(val);
}
@ -366,6 +370,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> {
Some(&leader_data),
false,
None,
None,
);
nodes.push(val);
let servers = converge(&leader_data, N + 2); // contains the leader and new node
@ -456,6 +461,7 @@ fn test_multi_node_basic() {
None,
false,
None,
None,
);
let mut nodes = vec![server];
@ -486,6 +492,7 @@ fn test_multi_node_basic() {
Some(&leader_data),
false,
None,
None,
);
nodes.push(val);
}
@ -566,6 +573,7 @@ fn test_boot_validator_from_file() -> result::Result<()> {
None,
false,
None,
None,
);
let leader_balance =
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, 500, Some(500)).unwrap();
@ -591,6 +599,7 @@ fn test_boot_validator_from_file() -> result::Result<()> {
Some(&leader_data),
false,
None,
None,
);
let mut client = mk_client(&validator_data);
let getbal = retry_get_balance(&mut client, &bob_pubkey, Some(leader_balance));
@ -624,6 +633,7 @@ fn create_leader(
None,
false,
None,
None,
);
(leader_data, leader_fullnode)
}
@ -701,6 +711,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> {
Some(&leader_data),
false,
None,
None,
);
// trigger broadcast, validator should catch up from leader, whose window contains
@ -770,6 +781,7 @@ fn test_multi_node_dynamic_network() {
None,
true,
None,
None,
);
info!(
"found leader: {:?}",
@ -845,6 +857,7 @@ fn test_multi_node_dynamic_network() {
Some(&leader_data),
true,
None,
None,
);
(rd, val)
})
@ -1026,6 +1039,7 @@ fn test_leader_to_validator_transition() {
Some(&leader_info),
false,
None,
None,
);
// Make an extra node for our leader to broadcast to,
@ -1181,6 +1195,7 @@ fn test_leader_validator_basic() {
Some(&leader_info),
false,
None,
None,
);
// Start the leader fullnode
@ -1194,6 +1209,7 @@ fn test_leader_validator_basic() {
Some(&leader_info),
false,
None,
None,
);
// Wait for convergence
@ -1389,6 +1405,7 @@ fn test_dropped_handoff_recovery() {
Some(&bootstrap_leader_info),
false,
None,
None,
);
let mut nodes = vec![bootstrap_leader];
@ -1412,6 +1429,7 @@ fn test_dropped_handoff_recovery() {
Some(&bootstrap_leader_info),
false,
None,
None,
);
nodes.push(validator);
@ -1439,6 +1457,7 @@ fn test_dropped_handoff_recovery() {
Some(&bootstrap_leader_info),
false,
None,
None,
);
info!("Wait for 'next leader' to assume leader role");
@ -1589,6 +1608,7 @@ fn test_full_leader_validator_network() {
Some(&bootstrap_leader_info),
false,
None,
None,
);
schedules.push(leader_scheduler);
@ -1607,6 +1627,7 @@ fn test_full_leader_validator_network() {
Some(&bootstrap_leader_info),
false,
None,
None,
);
schedules.push(leader_scheduler);
@ -1785,6 +1806,7 @@ fn test_broadcast_last_tick() {
Some(&bootstrap_leader_info),
false,
None,
None,
);
// Wait for convergence

View File

@ -61,6 +61,7 @@ fn test_replicator_startup() {
))),
None,
STORAGE_ROTATE_TEST_COUNT,
None,
);
let validator_keypair = Arc::new(Keypair::new());
@ -91,6 +92,7 @@ fn test_replicator_startup() {
))),
None,
STORAGE_ROTATE_TEST_COUNT,
None,
);
let bob = Keypair::new();
@ -287,6 +289,7 @@ fn test_replicator_startup_ledger_hang() {
None,
false,
None,
None,
);
let validator_keypair = Arc::new(Keypair::new());
@ -304,6 +307,7 @@ fn test_replicator_startup_ledger_hang() {
Some(&leader_info),
false,
None,
None,
);
info!("starting replicator node");