From aa24181a53c9192fc3e6035755bc282027ec6fc2 Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Sat, 21 Mar 2020 20:17:11 -0700 Subject: [PATCH] Remove blockstream unix socket support. RPC or bust (#9004) automerge --- core/src/blockstream.rs | 283 -------------------------------- core/src/blockstream_service.rs | 228 ------------------------- core/src/lib.rs | 2 - core/src/replay_stage.rs | 21 +-- core/src/tvu.rs | 23 --- core/src/validator.rs | 3 - docs/src/cluster/bench-tps.md | 30 +--- multinode-demo/validator.sh | 4 - run.sh | 14 -- validator/src/main.rs | 11 -- 10 files changed, 3 insertions(+), 616 deletions(-) delete mode 100644 core/src/blockstream.rs delete mode 100644 core/src/blockstream_service.rs diff --git a/core/src/blockstream.rs b/core/src/blockstream.rs deleted file mode 100644 index a59e4c67b..000000000 --- a/core/src/blockstream.rs +++ /dev/null @@ -1,283 +0,0 @@ -//! The `blockstream` module provides a method for streaming entries out via a -//! local unix socket, to provide client services such as a block explorer with -//! real-time access to entries. - -use bincode::serialize; -use chrono::{SecondsFormat, Utc}; -use serde_json::json; -use solana_ledger::entry::Entry; -use solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey}; -use std::cell::RefCell; -use std::io::Result; -use std::path::{Path, PathBuf}; - -pub trait EntryWriter: std::fmt::Debug { - fn write(&self, payload: String) -> Result<()>; -} - -#[derive(Debug, Default)] -pub struct EntryVec { - values: RefCell>, -} - -impl EntryWriter for EntryVec { - fn write(&self, payload: String) -> Result<()> { - self.values.borrow_mut().push(payload); - Ok(()) - } -} - -impl EntryVec { - pub fn new() -> Self { - EntryVec { - values: RefCell::new(Vec::new()), - } - } - - pub fn entries(&self) -> Vec { - self.values.borrow().clone() - } -} - -#[derive(Debug)] -pub struct EntrySocket { - unix_socket: PathBuf, -} - -impl EntryWriter for EntrySocket { - #[cfg(not(windows))] - fn write(&self, payload: String) -> Result<()> { - use std::io::prelude::*; - use std::net::Shutdown; - use std::os::unix::net::UnixStream; - - const MESSAGE_TERMINATOR: &str = "\n"; - - let mut socket = UnixStream::connect(&self.unix_socket)?; - socket.write_all(payload.as_bytes())?; - socket.write_all(MESSAGE_TERMINATOR.as_bytes())?; - socket.shutdown(Shutdown::Write)?; - Ok(()) - } - #[cfg(windows)] - fn write(&self, _payload: String) -> Result<()> { - Err(std::io::Error::new( - std::io::ErrorKind::Other, - "EntryWriter::write() not implemented for windows", - )) - } -} - -pub trait BlockstreamEvents { - fn emit_entry_event( - &self, - slot: Slot, - tick_height: u64, - leader_pubkey: &Pubkey, - entries: &Entry, - ) -> Result<()>; - fn emit_block_event( - &self, - slot: Slot, - tick_height: u64, - leader_pubkey: &Pubkey, - blockhash: Hash, - ) -> Result<()>; -} - -#[derive(Debug)] -pub struct Blockstream { - pub output: T, -} - -impl BlockstreamEvents for Blockstream -where - T: EntryWriter, -{ - fn emit_entry_event( - &self, - slot: Slot, - tick_height: u64, - leader_pubkey: &Pubkey, - entry: &Entry, - ) -> Result<()> { - let transactions: Vec> = serialize_transactions(entry); - let stream_entry = json!({ - "num_hashes": entry.num_hashes, - "hash": entry.hash, - "transactions": transactions - }); - let json_entry = serde_json::to_string(&stream_entry)?; - let payload = format!( - r#"{{"dt":"{}","t":"entry","s":{},"h":{},"l":"{:?}","entry":{}}}"#, - Utc::now().to_rfc3339_opts(SecondsFormat::Nanos, true), - slot, - tick_height, - leader_pubkey, - json_entry, - ); - self.output.write(payload)?; - Ok(()) - } - - fn emit_block_event( - &self, - slot: Slot, - tick_height: u64, - leader_pubkey: &Pubkey, - blockhash: Hash, - ) -> Result<()> { - let payload = format!( - r#"{{"dt":"{}","t":"block","s":{},"h":{},"l":"{:?}","hash":"{:?}"}}"#, - Utc::now().to_rfc3339_opts(SecondsFormat::Nanos, true), - slot, - tick_height, - leader_pubkey, - blockhash, - ); - self.output.write(payload)?; - Ok(()) - } -} - -pub type SocketBlockstream = Blockstream; - -impl SocketBlockstream { - pub fn new(unix_socket: &Path) -> Self { - Blockstream { - output: EntrySocket { - unix_socket: unix_socket.to_path_buf(), - }, - } - } -} - -pub type MockBlockstream = Blockstream; - -impl MockBlockstream { - pub fn new(_: &Path) -> Self { - Blockstream { - output: EntryVec::new(), - } - } - - pub fn entries(&self) -> Vec { - self.output.entries() - } -} - -fn serialize_transactions(entry: &Entry) -> Vec> { - entry - .transactions - .iter() - .map(|tx| serialize(&tx).unwrap()) - .collect() -} - -#[cfg(test)] -mod test { - use super::*; - use chrono::{DateTime, FixedOffset}; - use serde_json::Value; - use solana_sdk::hash::Hash; - use solana_sdk::signature::{Keypair, Signer}; - use solana_sdk::system_transaction; - use std::collections::HashSet; - use std::path::PathBuf; - - #[test] - fn test_serialize_transactions() { - let entry = Entry::new(&Hash::default(), 1, vec![]); - let empty_vec: Vec> = vec![]; - assert_eq!(serialize_transactions(&entry), empty_vec); - - let keypair0 = Keypair::new(); - let keypair1 = Keypair::new(); - let tx0 = system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default()); - let tx1 = system_transaction::transfer(&keypair1, &keypair0.pubkey(), 2, Hash::default()); - let serialized_tx0 = serialize(&tx0).unwrap(); - let serialized_tx1 = serialize(&tx1).unwrap(); - let entry = Entry::new(&Hash::default(), 1, vec![tx0, tx1]); - assert_eq!( - serialize_transactions(&entry), - vec![serialized_tx0, serialized_tx1] - ); - } - - #[test] - fn test_blockstream() -> () { - let blockstream = MockBlockstream::new(&PathBuf::from("test_stream")); - let ticks_per_slot = 5; - - let mut blockhash = Hash::default(); - let mut entries = Vec::new(); - let mut expected_entries = Vec::new(); - - let tick_height_initial = 1; - let tick_height_final = tick_height_initial + ticks_per_slot + 2; - let mut curr_slot = 0; - let leader_pubkey = Pubkey::new_rand(); - - for tick_height in tick_height_initial..=tick_height_final { - if tick_height == 5 { - blockstream - .emit_block_event(curr_slot, tick_height, &leader_pubkey, blockhash) - .unwrap(); - curr_slot += 1; - } - let entry = Entry::new(&mut blockhash, 1, vec![]); // just ticks - blockhash = entry.hash; - blockstream - .emit_entry_event(curr_slot, tick_height, &leader_pubkey, &entry) - .unwrap(); - expected_entries.push(entry.clone()); - entries.push(entry); - } - - assert_eq!( - blockstream.entries().len() as u64, - // one entry per tick (1..=N+2) is +3, plus one block - ticks_per_slot + 3 + 1 - ); - - let mut j = 0; - let mut matched_entries = 0; - let mut matched_slots = HashSet::new(); - let mut matched_blocks = HashSet::new(); - - for item in blockstream.entries() { - let json: Value = serde_json::from_str(&item).unwrap(); - let dt_str = json["dt"].as_str().unwrap(); - - // Ensure `ts` field parses as valid DateTime - let _dt: DateTime = DateTime::parse_from_rfc3339(dt_str).unwrap(); - - let item_type = json["t"].as_str().unwrap(); - match item_type { - "block" => { - let hash = json["hash"].to_string(); - matched_blocks.insert(hash); - } - - "entry" => { - let slot = json["s"].as_u64().unwrap(); - matched_slots.insert(slot); - let entry_obj = json["entry"].clone(); - let entry: Entry = serde_json::from_value(entry_obj).unwrap(); - - assert_eq!(entry, expected_entries[j]); - matched_entries += 1; - j += 1; - } - - _ => { - assert!(false, "unknown item type {}", item); - } - } - } - - assert_eq!(matched_entries, expected_entries.len()); - assert_eq!(matched_slots.len(), 2); - assert_eq!(matched_blocks.len(), 1); - } -} diff --git a/core/src/blockstream_service.rs b/core/src/blockstream_service.rs deleted file mode 100644 index e56230e50..000000000 --- a/core/src/blockstream_service.rs +++ /dev/null @@ -1,228 +0,0 @@ -//! The `blockstream_service` implements optional streaming of entries and block metadata -//! using the `blockstream` module, providing client services such as a block explorer with -//! real-time access to entries. - -use crate::blockstream::BlockstreamEvents; -#[cfg(test)] -use crate::blockstream::MockBlockstream as Blockstream; -#[cfg(not(test))] -use crate::blockstream::SocketBlockstream as Blockstream; -use crate::result::{Error, Result}; -use solana_ledger::blockstore::Blockstore; -use solana_sdk::pubkey::Pubkey; -use std::path::Path; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{Receiver, RecvTimeoutError}; -use std::sync::Arc; -use std::thread::{self, Builder, JoinHandle}; -use std::time::Duration; - -pub struct BlockstreamService { - t_blockstream: JoinHandle<()>, -} - -impl BlockstreamService { - #[allow(clippy::new_ret_no_self)] - pub fn new( - slot_full_receiver: Receiver<(u64, Pubkey)>, - blockstore: Arc, - unix_socket: &Path, - exit: &Arc, - ) -> Self { - let mut blockstream = Blockstream::new(unix_socket); - let exit = exit.clone(); - let t_blockstream = Builder::new() - .name("solana-blockstream".to_string()) - .spawn(move || loop { - if exit.load(Ordering::Relaxed) { - break; - } - if let Err(e) = - Self::process_entries(&slot_full_receiver, &blockstore, &mut blockstream) - { - match e { - Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, - Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), - _ => info!("Error from process_entries: {:?}", e), - } - } - }) - .unwrap(); - Self { t_blockstream } - } - fn process_entries( - slot_full_receiver: &Receiver<(u64, Pubkey)>, - blockstore: &Arc, - blockstream: &mut Blockstream, - ) -> Result<()> { - let timeout = Duration::new(1, 0); - let (slot, slot_leader) = slot_full_receiver.recv_timeout(timeout)?; - - // Slot might not exist due to LedgerCleanupService, check first - let blockstore_meta = blockstore.meta(slot).unwrap(); - if let Some(blockstore_meta) = blockstore_meta { - // Return error to main loop. Thread won't exit, will just log the error - let entries = blockstore.get_slot_entries(slot, 0, None)?; - let _parent_slot = if slot == 0 { - None - } else { - Some(blockstore_meta.parent_slot) - }; - let ticks_per_slot = entries.iter().filter(|entry| entry.is_tick()).count() as u64; - let mut tick_height = ticks_per_slot * slot; - - for (i, entry) in entries.iter().enumerate() { - if entry.is_tick() { - tick_height += 1; - } - blockstream - .emit_entry_event(slot, tick_height, &slot_leader, &entry) - .unwrap_or_else(|e| { - debug!("Blockstream error: {:?}, {:?}", e, blockstream.output); - }); - if i == entries.len() - 1 { - blockstream - .emit_block_event(slot, tick_height, &slot_leader, entry.hash) - .unwrap_or_else(|e| { - debug!("Blockstream error: {:?}, {:?}", e, blockstream.output); - }); - } - } - } - Ok(()) - } - - pub fn join(self) -> thread::Result<()> { - self.t_blockstream.join() - } -} - -#[cfg(test)] -mod test { - use super::*; - use bincode::{deserialize, serialize}; - use chrono::{DateTime, FixedOffset}; - use serde_json::Value; - use solana_ledger::create_new_tmp_ledger; - use solana_ledger::entry::{create_ticks, Entry}; - use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo}; - use solana_sdk::hash::Hash; - use solana_sdk::signature::{Keypair, Signer}; - use solana_sdk::system_transaction; - use std::path::PathBuf; - use std::sync::mpsc::channel; - - #[test] - fn test_blockstream_service_process_entries() { - let ticks_per_slot = 5; - let leader_pubkey = Pubkey::new_rand(); - - // Set up genesis config and blockstore - let GenesisConfigInfo { - mut genesis_config, .. - } = create_genesis_config(1000); - genesis_config.ticks_per_slot = ticks_per_slot; - - let (ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config); - let blockstore = Blockstore::open(&ledger_path).unwrap(); - - // Set up blockstream - let mut blockstream = Blockstream::new(&PathBuf::from("test_stream")); - - // Set up dummy channel to receive a full-slot notification - let (slot_full_sender, slot_full_receiver) = channel(); - - // Create entries - 4 ticks + 1 populated entry + 1 tick - let mut entries = create_ticks(4, 0, Hash::default()); - - let keypair = Keypair::new(); - let mut blockhash = entries[3].hash; - let tx = system_transaction::transfer(&keypair, &keypair.pubkey(), 1, Hash::default()); - let entry = Entry::new(&mut blockhash, 1, vec![tx]); - blockhash = entry.hash; - entries.push(entry); - let final_tick = create_ticks(1, 0, blockhash); - entries.extend_from_slice(&final_tick); - - let expected_entries = entries.clone(); - let expected_tick_heights = [6, 7, 8, 9, 9, 10]; - - blockstore - .write_entries( - 1, - 0, - 0, - ticks_per_slot, - None, - true, - &Arc::new(Keypair::new()), - entries, - 0, - ) - .unwrap(); - - slot_full_sender.send((1, leader_pubkey)).unwrap(); - BlockstreamService::process_entries( - &slot_full_receiver, - &Arc::new(blockstore), - &mut blockstream, - ) - .unwrap(); - assert_eq!(blockstream.entries().len(), 7); - - let (entry_events, block_events): (Vec, Vec) = blockstream - .entries() - .iter() - .map(|item| { - let json: Value = serde_json::from_str(&item).unwrap(); - let dt_str = json["dt"].as_str().unwrap(); - // Ensure `ts` field parses as valid DateTime - let _dt: DateTime = DateTime::parse_from_rfc3339(dt_str).unwrap(); - json - }) - .partition(|json| { - let item_type = json["t"].as_str().unwrap(); - item_type == "entry" - }); - for (i, json) in entry_events.iter().enumerate() { - let height = json["h"].as_u64().unwrap(); - assert_eq!(height, expected_tick_heights[i]); - let entry_obj = json["entry"].clone(); - let tx = entry_obj["transactions"].as_array().unwrap(); - let entry: Entry; - if tx.len() == 0 { - entry = serde_json::from_value(entry_obj).unwrap(); - } else { - let entry_json = entry_obj.as_object().unwrap(); - entry = Entry { - num_hashes: entry_json.get("num_hashes").unwrap().as_u64().unwrap(), - hash: serde_json::from_value(entry_json.get("hash").unwrap().clone()).unwrap(), - transactions: entry_json - .get("transactions") - .unwrap() - .as_array() - .unwrap() - .into_iter() - .enumerate() - .map(|(j, tx)| { - let tx_vec: Vec = serde_json::from_value(tx.clone()).unwrap(); - // Check explicitly that transaction matches bincode-serialized format - assert_eq!( - tx_vec, - serialize(&expected_entries[i].transactions[j]).unwrap() - ); - deserialize(&tx_vec).unwrap() - }) - .collect(), - }; - } - assert_eq!(entry, expected_entries[i]); - } - for json in block_events { - let slot = json["s"].as_u64().unwrap(); - assert_eq!(1, slot); - let height = json["h"].as_u64().unwrap(); - assert_eq!(2 * ticks_per_slot, height); - } - } -} diff --git a/core/src/lib.rs b/core/src/lib.rs index 23699199d..429ffe3de 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -14,8 +14,6 @@ mod deprecated; pub mod shred_fetch_stage; #[macro_use] pub mod contact_info; -pub mod blockstream; -pub mod blockstream_service; pub mod cluster_info; pub mod cluster_slots; pub mod consensus; diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index b80ce3412..c8564b1d1 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -85,7 +85,6 @@ pub struct ReplayStageConfig { pub exit: Arc, pub subscriptions: Arc, pub leader_schedule_cache: Arc, - pub slot_full_senders: Vec>, pub latest_root_senders: Vec>, pub accounts_hash_sender: Option, pub block_commitment_cache: Arc>, @@ -190,7 +189,6 @@ impl ReplayStage { exit, subscriptions, leader_schedule_cache, - slot_full_senders, latest_root_senders, accounts_hash_sender, block_commitment_cache, @@ -255,7 +253,6 @@ impl ReplayStage { &bank_forks, &my_pubkey, &mut progress, - &slot_full_senders, transaction_status_sender.clone(), &verify_recyclers, ); @@ -796,7 +793,6 @@ impl ReplayStage { bank_forks: &Arc>, my_pubkey: &Pubkey, progress: &mut ProgressMap, - slot_full_senders: &[Sender<(u64, Pubkey)>], transaction_status_sender: Option, verify_recyclers: &VerifyRecyclers, ) -> bool { @@ -846,7 +842,8 @@ impl ReplayStage { bank_progress.replay_progress.num_shreds, ); did_complete_bank = true; - Self::process_completed_bank(my_pubkey, bank, slot_full_senders); + info!("bank frozen: {}", bank.slot()); + bank.freeze(); } else { trace!( "bank {} not completed tick_height: {}, max_tick_height: {}", @@ -1183,20 +1180,6 @@ impl ReplayStage { progress.retain(|k, _| r_bank_forks.get(*k).is_some()); } - fn process_completed_bank( - my_pubkey: &Pubkey, - bank: Arc, - slot_full_senders: &[Sender<(u64, Pubkey)>], - ) { - info!("bank frozen: {}", bank.slot()); - bank.freeze(); - slot_full_senders.iter().for_each(|sender| { - if let Err(e) = sender.send((bank.slot(), *bank.collector_id())) { - trace!("{} slot_full alert failed: {:?}", my_pubkey, e); - } - }); - } - fn generate_new_bank_forks( blockstore: &Blockstore, forks_lock: &RwLock, diff --git a/core/src/tvu.rs b/core/src/tvu.rs index b466ac192..62d82ce26 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -3,7 +3,6 @@ use crate::{ accounts_hash_verifier::AccountsHashVerifier, - blockstream_service::BlockstreamService, broadcast_stage::RetransmitSlotsSender, cluster_info::ClusterInfo, cluster_info_vote_listener::VoteTracker, @@ -34,7 +33,6 @@ use solana_sdk::{ use std::collections::HashSet; use std::{ net::UdpSocket, - path::PathBuf, sync::{ atomic::AtomicBool, mpsc::{channel, Receiver}, @@ -48,7 +46,6 @@ pub struct Tvu { sigverify_stage: SigVerifyStage, retransmit_stage: RetransmitStage, replay_stage: ReplayStage, - blockstream_service: Option, ledger_cleanup_service: Option, storage_stage: StorageStage, accounts_hash_verifier: AccountsHashVerifier, @@ -88,7 +85,6 @@ impl Tvu { sockets: Sockets, blockstore: Arc, storage_state: &StorageState, - blockstream_unix_socket: Option<&PathBuf>, ledger_signal_receiver: Receiver, subscriptions: &Arc, poh_recorder: &Arc>, @@ -162,7 +158,6 @@ impl Tvu { tvu_config.shred_version, ); - let (blockstream_slot_sender, blockstream_slot_receiver) = channel(); let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel(); let (accounts_hash_sender, accounts_hash_receiver) = channel(); @@ -183,7 +178,6 @@ impl Tvu { exit: exit.clone(), subscriptions: subscriptions.clone(), leader_schedule_cache: leader_schedule_cache.clone(), - slot_full_senders: vec![blockstream_slot_sender], latest_root_senders: vec![ledger_cleanup_slot_sender], accounts_hash_sender: Some(accounts_hash_sender), block_commitment_cache, @@ -202,18 +196,6 @@ impl Tvu { retransmit_slots_sender, ); - let blockstream_service = if let Some(blockstream_unix_socket) = blockstream_unix_socket { - let blockstream_service = BlockstreamService::new( - blockstream_slot_receiver, - blockstore.clone(), - blockstream_unix_socket, - &exit, - ); - Some(blockstream_service) - } else { - None - }; - let ledger_cleanup_service = tvu_config.max_ledger_slots.map(|max_ledger_slots| { LedgerCleanupService::new( ledger_cleanup_slot_receiver, @@ -239,7 +221,6 @@ impl Tvu { sigverify_stage, retransmit_stage, replay_stage, - blockstream_service, ledger_cleanup_service, storage_stage, accounts_hash_verifier, @@ -251,9 +232,6 @@ impl Tvu { self.fetch_stage.join()?; self.sigverify_stage.join()?; self.storage_stage.join()?; - if self.blockstream_service.is_some() { - self.blockstream_service.unwrap().join()?; - } if self.ledger_cleanup_service.is_some() { self.ledger_cleanup_service.unwrap().join()?; } @@ -319,7 +297,6 @@ pub mod tests { }, blockstore, &StorageState::default(), - None, l_receiver, &Arc::new(RpcSubscriptions::new(&exit)), &poh_recorder, diff --git a/core/src/validator.rs b/core/src/validator.rs index 992971b2c..7a9c062db 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -64,7 +64,6 @@ pub struct ValidatorConfig { pub expected_genesis_hash: Option, pub expected_shred_version: Option, pub voting_disabled: bool, - pub blockstream_unix_socket: Option, pub storage_slots_per_turn: u64, pub account_paths: Vec, pub rpc_config: JsonRpcConfig, @@ -89,7 +88,6 @@ impl Default for ValidatorConfig { expected_genesis_hash: None, expected_shred_version: None, voting_disabled: false, - blockstream_unix_socket: None, storage_slots_per_turn: DEFAULT_SLOTS_PER_TURN, max_ledger_slots: None, account_paths: Vec::new(), @@ -418,7 +416,6 @@ impl Validator { }, blockstore.clone(), &storage_state, - config.blockstream_unix_socket.as_ref(), ledger_signal_receiver, &subscriptions, &poh_recorder, diff --git a/docs/src/cluster/bench-tps.md b/docs/src/cluster/bench-tps.md index 07be36d2a..102c3928d 100644 --- a/docs/src/cluster/bench-tps.md +++ b/docs/src/cluster/bench-tps.md @@ -121,35 +121,7 @@ thread apply all bt This will dump all the threads stack traces into gdb.txt -### Blockstreamer - -Solana supports a node type called a _blockstreamer_. This validator variation is intended for applications that need to observe the data plane without participating in transaction validation or ledger replication. - -A blockstreamer runs without a vote signer, and can optionally stream ledger entries out to a Unix domain socket as they are processed. The JSON-RPC service still functions as on any other node. - -To run a blockstreamer, include the argument `no-signer` and \(optional\) `blockstream` socket location: - -```bash -$ NDEBUG=1 ./multinode-demo/validator-x.sh --no-signer --blockstream -``` - -The stream will output a series of JSON objects: - -* An Entry event JSON object is sent when each ledger entry is processed, with the following fields: - * `dt`, the system datetime, as RFC3339-formatted string - * `t`, the event type, always "entry" - * `s`, the slot height, as unsigned 64-bit integer - * `h`, the tick height, as unsigned 64-bit integer - * `entry`, the entry, as JSON object -* A Block event JSON object is sent when a block is complete, with the following fields: - * `dt`, the system datetime, as RFC3339-formatted string - * `t`, the event type, always "block" - * `s`, the slot height, as unsigned 64-bit integer - * `h`, the tick height, as unsigned 64-bit integer - * `l`, the slot leader id, as base-58 encoded string - * `hash`, the [blockhash](terminology.md#blockhash), as base-58 encoded string - -## Public Testnet +## Developer Testnet In this example the client connects to our public testnet. To run validators on the testnet you would need to open udp ports `8000-10000`. diff --git a/multinode-demo/validator.sh b/multinode-demo/validator.sh index 407c7187c..5ffc24a7c 100755 --- a/multinode-demo/validator.sh +++ b/multinode-demo/validator.sh @@ -29,7 +29,6 @@ Start a validator with no stake OPTIONS: --ledger PATH - store ledger under this PATH - --blockstream PATH - open blockstream at this unix domain socket location --init-complete-file FILE - create this file, if it doesn't already exist, once node initialization is complete --label LABEL - Append the given label to the configuration files, useful when running multiple validators in the same workspace @@ -60,9 +59,6 @@ while [[ -n $1 ]]; do airdrops_enabled=0 shift # solana-validator options - elif [[ $1 = --blockstream ]]; then - args+=("$1" "$2") - shift 2 elif [[ $1 = --expected-genesis-hash ]]; then args+=("$1" "$2") shift 2 diff --git a/run.sh b/run.sh index da09d9640..6197bbf4a 100755 --- a/run.sh +++ b/run.sh @@ -29,17 +29,6 @@ $ok || { exit 1 } -blockstreamSocket=/tmp/solana-blockstream.sock # Default to location used by the block explorer -while [[ -n $1 ]]; do - if [[ $1 = --blockstream ]]; then - blockstreamSocket=$2 - shift 2 - else - echo "Unknown argument: $1" - exit 1 - fi -done - export RUST_LOG=${RUST_LOG:-solana=info} # if RUST_LOG is unset, default to info export RUST_BACKTRACE=1 dataDir=$PWD/config/"$(basename "$0" .sh)" @@ -108,9 +97,6 @@ args=( --enable-rpc-get-confirmed-block --init-complete-file "$dataDir"/init-completed ) -if [[ -n $blockstreamSocket ]]; then - args+=(--blockstream "$blockstreamSocket") -fi solana-validator "${args[@]}" & validator=$! diff --git a/validator/src/main.rs b/validator/src/main.rs index 9bdba70b9..80f0f5e57 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -356,14 +356,6 @@ pub fn main() { let matches = App::new(crate_name!()).about(crate_description!()) .version(solana_clap_utils::version!()) - .arg( - Arg::with_name("blockstream_unix_socket") - .long("blockstream") - .takes_value(true) - .hidden(true) // Don't document this argument to discourage its use - .value_name("UNIX DOMAIN SOCKET") - .help("Stream entries to this unix domain socket path") - ) .arg( Arg::with_name(SKIP_SEED_PHRASE_VALIDATION_ARG.name) .long(SKIP_SEED_PHRASE_VALIDATION_ARG.long) @@ -719,9 +711,6 @@ pub fn main() { }; let mut validator_config = ValidatorConfig { - blockstream_unix_socket: matches - .value_of("blockstream_unix_socket") - .map(PathBuf::from), dev_sigverify_disabled: matches.is_present("dev_no_sigverify"), dev_halt_at_slot: value_t!(matches, "dev_halt_at_slot", Slot).ok(), expected_genesis_hash: matches