Add wen_restart module (#33344)

* Add wen_restart module:
- Implement reading LastVotedForkSlots from blockstore.
- Add proto file to record the intermediate results.
- Also link wen_restart into validator.
- Move recreation of tower outside replay_stage so we can get last_vote.

* Update lock file.

* Fix linter errors.

* Fix dependencies order.

* Update wen_restart explanation and small fixes.

* Generate tower outside tvu.

* Update validator/src/cli.rs

Co-authored-by: Tyera <teulberg@gmail.com>

* Update wen-restart/protos/wen_restart.proto

Co-authored-by: Tyera <teulberg@gmail.com>

* Update wen-restart/build.rs

Co-authored-by: Tyera <teulberg@gmail.com>

* Update wen-restart/src/wen_restart.rs

Co-authored-by: Tyera <teulberg@gmail.com>

* Rename proto directory.

* Rename InitRecord to MyLastVotedForkSlots, add imports.

* Update wen-restart/Cargo.toml

Co-authored-by: Tyera <teulberg@gmail.com>

* Update wen-restart/src/wen_restart.rs

Co-authored-by: Tyera <teulberg@gmail.com>

* Move prost-build dependency to project toml.

* No need to continue if the distance between slot and last_vote is
already larger than MAX_SLOTS_ON_VOTED_FORKS.

* Use 16k slots instead of 81k slots, a few more wording changes.

* Use AncestorIterator which does the same thing.

* Update Cargo.lock

* Update Cargo.lock

---------

Co-authored-by: Tyera <teulberg@gmail.com>
This commit is contained in:
Wen 2023-10-06 15:04:37 -07:00 committed by GitHub
parent f075867ceb
commit 630feeddf2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 387 additions and 18 deletions

23
Cargo.lock generated
View File

@ -5792,6 +5792,7 @@ dependencies = [
"solana-version",
"solana-vote",
"solana-vote-program",
"solana-wen-restart",
"static_assertions",
"strum",
"strum_macros",
@ -7482,6 +7483,28 @@ dependencies = [
"solana-version",
]
[[package]]
name = "solana-wen-restart"
version = "1.18.0"
dependencies = [
"log",
"prost",
"prost-build",
"prost-types",
"protobuf-src",
"rustc_version 0.4.0",
"serial_test",
"solana-entry",
"solana-gossip",
"solana-ledger",
"solana-logger",
"solana-program",
"solana-runtime",
"solana-sdk",
"solana-streamer",
"solana-vote-program",
]
[[package]]
name = "solana-zk-keygen"
version = "1.18.0"

View File

@ -111,6 +111,7 @@ members = [
"version",
"vote",
"watchtower",
"wen-restart",
"zk-keygen",
"zk-token-sdk",
]
@ -261,6 +262,7 @@ pretty-hex = "0.3.0"
proc-macro2 = "1.0.67"
proptest = "1.2"
prost = "0.11.9"
prost-build = "0.11.9"
prost-types = "0.11.9"
protobuf-src = "1.1.0"
qstring = "0.7.2"
@ -371,6 +373,7 @@ solana-udp-client = { path = "udp-client", version = "=1.18.0" }
solana-version = { path = "version", version = "=1.18.0" }
solana-vote = { path = "vote", version = "=1.18.0" }
solana-vote-program = { path = "programs/vote", version = "=1.18.0" }
solana-wen-restart = { path = "wen-restart", version = "=1.18.0" }
solana-zk-keygen = { path = "zk-keygen", version = "=1.18.0" }
solana-zk-token-proof-program = { path = "programs/zk-token-proof", version = "=1.18.0" }
solana-zk-token-sdk = { path = "zk-token-sdk", version = "=1.18.0" }

View File

@ -71,6 +71,7 @@ solana-turbine = { workspace = true }
solana-version = { workspace = true }
solana-vote = { workspace = true }
solana-vote-program = { workspace = true }
solana-wen-restart = { workspace = true }
strum = { workspace = true, features = ["derive"] }
strum_macros = { workspace = true }
sys-info = { workspace = true }

View File

@ -29,7 +29,6 @@ use {
},
rewards_recorder_service::{RewardsMessage, RewardsRecorderSender},
unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes,
validator::ProcessBlockStore,
voting_service::VoteOp,
window_service::DuplicateSlotReceiver,
},
@ -483,7 +482,7 @@ impl ReplayStage {
ledger_signal_receiver: Receiver<bool>,
duplicate_slots_receiver: DuplicateSlotReceiver,
poh_recorder: Arc<RwLock<PohRecorder>>,
maybe_process_blockstore: Option<ProcessBlockStore>,
mut tower: Tower,
vote_tracker: Arc<VoteTracker>,
cluster_slots: Arc<ClusterSlots>,
retransmit_slots_sender: Sender<Slot>,
@ -502,15 +501,6 @@ impl ReplayStage {
banking_tracer: Arc<BankingTracer>,
popular_pruned_forks_receiver: PopularPrunedForksReceiver,
) -> Result<Self, String> {
let mut tower = if let Some(process_blockstore) = maybe_process_blockstore {
let tower = process_blockstore.process_to_create_tower()?;
info!("Tower state: {:?}", tower);
tower
} else {
warn!("creating default tower....");
Tower::default()
};
let ReplayStageConfig {
vote_account,
authorized_voter_keypairs,

View File

@ -11,7 +11,7 @@ use {
},
cluster_slots_service::{cluster_slots::ClusterSlots, ClusterSlotsService},
completed_data_sets_service::CompletedDataSetsSender,
consensus::tower_storage::TowerStorage,
consensus::{tower_storage::TowerStorage, Tower},
cost_update_service::CostUpdateService,
drop_bank_service::DropBankService,
ledger_cleanup_service::LedgerCleanupService,
@ -19,7 +19,6 @@ use {
replay_stage::{ReplayStage, ReplayStageConfig},
rewards_recorder_service::RewardsRecorderSender,
shred_fetch_stage::ShredFetchStage,
validator::ProcessBlockStore,
voting_service::VotingService,
warm_quic_cache_service::WarmQuicCacheService,
window_service::WindowService,
@ -109,7 +108,7 @@ impl Tvu {
ledger_signal_receiver: Receiver<bool>,
rpc_subscriptions: &Arc<RpcSubscriptions>,
poh_recorder: &Arc<RwLock<PohRecorder>>,
maybe_process_block_store: Option<ProcessBlockStore>,
tower: Tower,
tower_storage: Arc<dyn TowerStorage>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
exit: Arc<AtomicBool>,
@ -292,7 +291,7 @@ impl Tvu {
ledger_signal_receiver,
duplicate_slots_receiver,
poh_recorder.clone(),
maybe_process_block_store,
tower,
vote_tracker,
cluster_slots,
retransmit_slots_sender,
@ -463,7 +462,7 @@ pub mod tests {
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
)),
&poh_recorder,
None,
Tower::default(),
Arc::new(FileTowerStorage::default()),
&leader_schedule_cache,
exit.clone(),

View File

@ -119,6 +119,7 @@ use {
solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes},
solana_turbine::{self, broadcast_stage::BroadcastStageType},
solana_vote_program::vote_state,
solana_wen_restart::wen_restart::wait_for_wen_restart,
std::{
collections::{HashMap, HashSet},
net::SocketAddr,
@ -259,6 +260,7 @@ pub struct ValidatorConfig {
pub block_production_method: BlockProductionMethod,
pub generator_config: Option<GeneratorConfig>,
pub use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup,
pub wen_restart_proto_path: Option<PathBuf>,
}
impl Default for ValidatorConfig {
@ -326,6 +328,7 @@ impl Default for ValidatorConfig {
block_production_method: BlockProductionMethod::default(),
generator_config: None,
use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup::default(),
wen_restart_proto_path: None,
}
}
}
@ -1202,6 +1205,22 @@ impl Validator {
)
.unwrap();
let in_wen_restart = config.wen_restart_proto_path.is_some() && !waited_for_supermajority;
let tower = match process_blockstore.process_to_create_tower() {
Ok(tower) => {
info!("Tower state: {:?}", tower);
tower
}
Err(e) => {
warn!(
"Unable to retrieve tower: {:?} creating default tower....",
e
);
Tower::default()
}
};
let last_vote = tower.last_vote();
let (replay_vote_sender, replay_vote_receiver) = unbounded();
let tvu = Tvu::new(
vote_account,
@ -1218,7 +1237,7 @@ impl Validator {
ledger_signal_receiver,
&rpc_subscriptions,
&poh_recorder,
Some(process_blockstore),
tower,
config.tower_storage.clone(),
&leader_schedule_cache,
exit.clone(),
@ -1257,6 +1276,21 @@ impl Validator {
repair_quic_endpoint_sender,
)?;
if in_wen_restart {
info!("Waiting for wen_restart phase one to finish");
match wait_for_wen_restart(
&config.wen_restart_proto_path.clone().unwrap(),
last_vote,
blockstore.clone(),
cluster_info.clone(),
) {
Ok(()) => {
return Err("wen_restart phase one completedy".to_string());
}
Err(e) => return Err(format!("wait_for_wen_restart failed: {e:?}")),
};
}
let tpu = Tpu::new(
&cluster_info,
&poh_recorder,

View File

@ -13,7 +13,7 @@ use {
},
};
const MAX_SLOTS_PER_ENTRY: usize = 2048 * 8;
pub const MAX_SLOTS_PER_ENTRY: usize = 2048 * 8;
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, AbiExample)]
pub struct Uncompressed {
pub first_slot: Slot,

View File

@ -68,6 +68,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
block_production_method: config.block_production_method.clone(),
generator_config: config.generator_config.clone(),
use_snapshot_archives_at_startup: config.use_snapshot_archives_at_startup,
wen_restart_proto_path: config.wen_restart_proto_path.clone(),
}
}

View File

@ -4831,6 +4831,7 @@ dependencies = [
"solana-version",
"solana-vote",
"solana-vote-program",
"solana-wen-restart",
"strum",
"strum_macros",
"sys-info",
@ -6436,6 +6437,25 @@ dependencies = [
"thiserror",
]
[[package]]
name = "solana-wen-restart"
version = "1.18.0"
dependencies = [
"log",
"prost",
"prost-build",
"prost-types",
"protobuf-src",
"rustc_version",
"solana-gossip",
"solana-ledger",
"solana-logger",
"solana-program",
"solana-runtime",
"solana-sdk",
"solana-vote-program",
]
[[package]]
name = "solana-zk-token-proof-program"
version = "1.18.0"

View File

@ -1382,6 +1382,35 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
.possible_values(BlockProductionMethod::cli_names())
.help(BlockProductionMethod::cli_message())
)
.arg(
Arg::with_name("wen_restart")
.long("wen-restart")
.value_name("DIR")
.takes_value(true)
.required(false)
.default_value(&default_args.wen_restart_path)
.conflicts_with("wait_for_supermajority")
.help(
"When specified, the validator will enter Wen Restart mode which
pauses normal activity. Validators in this mode will gossip their last
vote to reach consensus on a safe restart slot and repair all blocks
on the selected fork. The safe slot will be a descendant of the latest
optimistically confirmed slot to ensure we do not roll back any
optimistically confirmed slots.
The progress in this mode will be saved in the file location provided.
If consensus is reached, the validator will automatically exit and then
execute wait_for_supermajority logic so the cluster will resume execution.
The progress file will be kept around for future debugging.
After the cluster resumes normal operation, the validator arguments can
be adjusted to remove --wen_restart and update expected_shred_version to
the new shred_version agreed on in the consensus.
If wen_restart fails, refer to the progress file (in proto3 format) for
further debugging.
")
)
.args(&get_deprecated_arguments())
.after_help("The default subcommand is run")
.subcommand(
@ -1931,6 +1960,8 @@ pub struct DefaultArgs {
pub wait_for_restart_window_max_delinquent_stake: String,
pub banking_trace_dir_byte_limit: String,
pub wen_restart_path: String,
}
impl DefaultArgs {
@ -2009,6 +2040,7 @@ impl DefaultArgs {
wait_for_restart_window_min_idle_time: "10".to_string(),
wait_for_restart_window_max_delinquent_stake: "5".to_string(),
banking_trace_dir_byte_limit: BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT.to_string(),
wen_restart_path: "wen_restart_progress.proto".to_string(),
}
}
}

43
wen-restart/Cargo.toml Normal file
View File

@ -0,0 +1,43 @@
[package]
name = "solana-wen-restart"
description = "Automatic repair and restart protocol"
documentation = "https://github.com/solana-foundation/solana-improvement-documents/pull/46"
version = { workspace = true }
authors = { workspace = true }
repository = { workspace = true }
homepage = { workspace = true }
license = { workspace = true }
edition = { workspace = true }
publish = false
[dependencies]
log = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
solana-gossip = { workspace = true }
solana-ledger = { workspace = true }
solana-logger = { workspace = true }
solana-program = { workspace = true }
solana-runtime = { workspace = true }
solana-sdk = { workspace = true }
solana-vote-program = { workspace = true }
[dev-dependencies]
serial_test = { workspace = true }
solana-entry = { workspace = true }
solana-streamer = { workspace = true }
[build-dependencies]
prost-build = { workspace = true }
rustc_version = { workspace = true }
# windows users should install the protobuf compiler manually and set the PROTOC
# envar to point to the installed binary
[target."cfg(not(windows))".build-dependencies]
protobuf-src = { workspace = true }
[lib]
name = "solana_wen_restart"
[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]

41
wen-restart/build.rs Normal file
View File

@ -0,0 +1,41 @@
extern crate rustc_version;
use {
rustc_version::{version_meta, Channel},
std::io::Result,
};
/// Build script for the wen-restart crate.
///
/// Ensures a `protoc` binary is available (vendored on non-Windows via
/// `protobuf_src` unless the PROTOC env var is already set), emits the
/// RUSTC_* cfg flags the workspace expects for the active toolchain
/// channel, and compiles the wen_restart protobuf definitions.
fn main() -> Result<()> {
    const PROTOC_ENVAR: &str = "PROTOC";
    if std::env::var(PROTOC_ENVAR).is_err() {
        #[cfg(not(windows))]
        std::env::set_var(PROTOC_ENVAR, protobuf_src::protoc());
    }

    // Copied and adapted from
    // https://github.com/Kimundi/rustc-version-rs/blob/1d692a965f4e48a8cb72e82cda953107c0d22f47/README.md#example
    // Licensed under Apache-2.0 + MIT
    match version_meta().unwrap().channel {
        Channel::Stable | Channel::Beta => {
            println!("cargo:rustc-cfg=RUSTC_WITHOUT_SPECIALIZATION");
        }
        Channel::Nightly => {
            println!("cargo:rustc-cfg=RUSTC_WITH_SPECIALIZATION");
        }
        Channel::Dev => {
            println!("cargo:rustc-cfg=RUSTC_WITH_SPECIALIZATION");
            // See https://github.com/solana-labs/solana/issues/11055
            // We may be running the custom `rust-bpf-builder` toolchain,
            // which currently needs `#![feature(proc_macro_hygiene)]` to
            // be applied.
            println!("cargo:rustc-cfg=RUSTC_NEEDS_PROC_MACRO_HYGIENE");
        }
    }

    // Generate rust files from protos.
    prost_build::compile_protos(&["proto/wen_restart.proto"], &["proto/"])?;
    Ok(())
}

View File

@ -0,0 +1,23 @@
syntax = "proto3";

package solana.wen_restart_proto;

// Phases of the wen_restart protocol. Progress is persisted to disk as a
// WenRestartProgress record so a restarted validator can resume from the
// phase it last reached.
enum State {
    INIT = 0;
    LAST_VOTED_FORK_SLOTS = 1;
    HEAVIEST_FORK = 2;
    GENERATING_SNAPSHOT = 3;
    FINISHED_SNAPSHOT = 4;
    WAITING_FOR_SUPERMAJORITY = 5;
    DONE = 6;
}

// The local validator's last vote, captured at the start of wen_restart.
message MyLastVotedForkSlots {
    uint64 last_vote_slot = 1;
    // Bank hash of the last voted slot, stored as its string form.
    string last_vote_bankhash = 2;
    uint32 shred_version = 3;
}

// Top-level record written to the wen_restart progress file.
message WenRestartProgress {
    State state = 1;
    optional MyLastVotedForkSlots my_last_voted_fork_slots = 2;
}

7
wen-restart/src/lib.rs Normal file
View File

@ -0,0 +1,7 @@
// Bindings generated by prost (see build.rs) from proto/wen_restart.proto.
// The module path mirrors the proto package `solana.wen_restart_proto`.
// Kept crate-private so the generated types are only exposed through the
// `wen_restart` module's API.
pub(crate) mod solana {
    pub(crate) mod wen_restart_proto {
        include!(concat!(env!("OUT_DIR"), "/solana.wen_restart_proto.rs"));
    }
}

pub mod wen_restart;

View File

@ -0,0 +1,152 @@
//! The `wen-restart` module handles automatic repair during a cluster restart
use {
crate::solana::wen_restart_proto::{
MyLastVotedForkSlots, State as RestartState, WenRestartProgress,
},
log::*,
prost::Message,
solana_gossip::{cluster_info::ClusterInfo, epoch_slots::MAX_SLOTS_PER_ENTRY},
solana_ledger::{ancestor_iterator::AncestorIterator, blockstore::Blockstore},
solana_vote_program::vote_state::VoteTransaction,
std::{
fs::File,
io::{Error, Write},
path::PathBuf,
sync::Arc,
},
};
/// Entry point of wen_restart phase one.
///
/// Collects up to `MAX_SLOTS_PER_ENTRY` ancestors of the local last voted
/// slot from the blockstore, then records the initial progress (state
/// `Init` plus the last vote's slot, bank hash and shred version) into
/// the file at `wen_restart_path`. The gossip exchange and the rest of
/// the protocol are not wired in yet (see the TODO below).
///
/// # Panics
/// Panics when `last_vote` carries no voted slot — the protocol cannot
/// run with a wiped local tower.
pub fn wait_for_wen_restart(
    wen_restart_path: &PathBuf,
    last_vote: VoteTransaction,
    blockstore: Arc<Blockstore>,
    cluster_info: Arc<ClusterInfo>,
) -> Result<(), Box<dyn std::error::Error>> {
    // repair and restart option does not work without last voted slot.
    let last_vote_slot = last_vote
        .last_voted_slot()
        .expect("wen_restart doesn't work if local tower is wiped");
    // Walk the fork of the last voted slot (inclusive), capped at
    // MAX_SLOTS_PER_ENTRY so the result fits in one gossip entry.
    let mut last_vote_fork = AncestorIterator::new_inclusive(last_vote_slot, &blockstore)
        .take(MAX_SLOTS_PER_ENTRY)
        .collect::<Vec<u64>>();
    info!(
        "wen_restart last voted fork {} {:?}",
        last_vote_slot, last_vote_fork
    );
    // Oldest-first order, ready for the future gossip push.
    last_vote_fork.reverse();
    // Todo(wen): add the following back in after Gossip code is checked in.
    // cluster_info.push_last_voted_fork_slots(&last_voted_fork, last_vote.hash());
    // The rest of the protocol will be in another PR.
    write_wen_restart_records(
        wen_restart_path,
        WenRestartProgress {
            state: RestartState::Init.into(),
            my_last_voted_fork_slots: Some(MyLastVotedForkSlots {
                last_vote_slot,
                last_vote_bankhash: last_vote.hash().to_string(),
                shred_version: cluster_info.my_shred_version() as u32,
            }),
        },
    )?;
    Ok(())
}
/// Serializes `new_progress` in protobuf wire format to `records_path`,
/// replacing any previous contents of that file.
fn write_wen_restart_records(
    records_path: &PathBuf,
    new_progress: WenRestartProgress,
) -> Result<(), Error> {
    // `File::create` truncates, so an existing record is overwritten.
    let mut file = File::create(records_path)?;
    info!("writing new record {:?}", new_progress);
    // Size the buffer up front from the message's encoded length.
    let mut serialized = Vec::with_capacity(new_progress.encoded_len());
    new_progress.encode(&mut serialized)?;
    file.write_all(&serialized)
}
#[cfg(test)]
mod tests {
    use {
        crate::wen_restart::*,
        solana_entry::entry,
        solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
        solana_ledger::{blockstore, get_tmp_ledger_path_auto_delete},
        solana_program::{hash::Hash, vote::state::Vote},
        solana_sdk::{
            signature::{Keypair, Signer},
            timing::timestamp,
        },
        solana_streamer::socket::SocketAddrSpace,
        std::{fs::read, sync::Arc},
    };

    // End-to-end check of phase one: build a blockstore containing a voted
    // fork, run wait_for_wen_restart, and verify the progress record it
    // writes to disk.
    #[test]
    fn test_wen_restart_normal_flow() {
        solana_logger::setup();
        let node_keypair = Arc::new(Keypair::new());
        // Minimal localhost ClusterInfo; its shred version (2) should end
        // up in the recorded progress.
        let cluster_info = Arc::new(ClusterInfo::new(
            {
                let mut contact_info =
                    ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp());
                contact_info.set_shred_version(2);
                contact_info
            },
            node_keypair,
            SocketAddrSpace::Unspecified,
        ));
        let ledger_path = get_tmp_ledger_path_auto_delete!();
        let mut wen_restart_proto_path = ledger_path.path().to_path_buf();
        wen_restart_proto_path.push("wen_restart_status.proto");
        let blockstore = Arc::new(blockstore::Blockstore::open(ledger_path.path()).unwrap());
        let expected_slots = 400;
        // The voted fork tip sits expected_slots past MAX_SLOTS_PER_ENTRY;
        // the fork root hangs off a parent far below the tip.
        let last_vote_slot = (MAX_SLOTS_PER_ENTRY + expected_slots).try_into().unwrap();
        let last_parent = (MAX_SLOTS_PER_ENTRY >> 1).try_into().unwrap();
        // Insert a chain of expected_slots consecutive slots: the first
        // chains to last_parent, every later slot to its predecessor.
        for i in 0..expected_slots {
            let entries = entry::create_ticks(1, 0, Hash::default());
            let parent_slot = if i > 0 {
                (MAX_SLOTS_PER_ENTRY + i).try_into().unwrap()
            } else {
                last_parent
            };
            let shreds = blockstore::entries_to_test_shreds(
                &entries,
                (MAX_SLOTS_PER_ENTRY + i + 1).try_into().unwrap(),
                parent_slot,
                false,
                0,
                true, // merkle_variant
            );
            blockstore.insert_shreds(shreds, None, false).unwrap();
        }
        // link directly to slot 1 whose distance to last_vote > MAX_SLOTS_PER_ENTRY so it will not be included.
        let entries = entry::create_ticks(1, 0, Hash::default());
        let shreds = blockstore::entries_to_test_shreds(
            &entries,
            last_parent,
            1,
            false,
            0,
            true, // merkle_variant
        );
        blockstore.insert_shreds(shreds, None, false).unwrap();
        let last_vote_bankhash = Hash::new_unique();
        assert!(wait_for_wen_restart(
            &wen_restart_proto_path,
            VoteTransaction::from(Vote::new(vec![last_vote_slot], last_vote_bankhash)),
            blockstore,
            cluster_info
        )
        .is_ok());
        // The progress file must decode back into the expected record.
        let buffer = read(wen_restart_proto_path).unwrap();
        let progress = WenRestartProgress::decode(&mut std::io::Cursor::new(buffer)).unwrap();
        assert_eq!(
            progress,
            WenRestartProgress {
                state: RestartState::Init.into(),
                my_last_voted_fork_slots: Some(MyLastVotedForkSlots {
                    last_vote_slot,
                    last_vote_bankhash: last_vote_bankhash.to_string(),
                    shred_version: 2,
                }),
            }
        )
    }
}