From 72b11081a43b68adbd932892ff5281c58dc5902e Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Tue, 4 Feb 2020 19:50:24 -0700 Subject: [PATCH] Report validator rewards in getConfirmedBlock JSON RPC --- book/src/apps/jsonrpc-api.md | 3 ++ client/src/rpc_response.rs | 9 ++++ core/src/lib.rs | 1 + core/src/replay_stage.rs | 28 ++++++++++-- core/src/rewards_recorder_service.rs | 67 ++++++++++++++++++++++++++++ core/src/tvu.rs | 4 ++ core/src/validator.rs | 23 ++++++++++ ledger/src/blockstore.rs | 35 ++++++++++++++- ledger/src/blockstore_db.rs | 27 ++++++++--- programs/stake/src/stake_state.rs | 4 +- runtime/src/bank.rs | 59 +++++++++++++++++++----- 11 files changed, 235 insertions(+), 25 deletions(-) create mode 100644 core/src/rewards_recorder_service.rs diff --git a/book/src/apps/jsonrpc-api.md b/book/src/apps/jsonrpc-api.md index bf4d40c32..2e0d9aada 100644 --- a/book/src/apps/jsonrpc-api.md +++ b/book/src/apps/jsonrpc-api.md @@ -303,6 +303,9 @@ The result field will be an object with the following fields: * `fee: ` - fee this transaction was charged, as u64 integer * `preBalances: ` - array of u64 account balances from before the transaction was processed * `postBalances: ` - array of u64 account balances after the transaction was processed +* `rewards: ` - an array of JSON objects containing: + * `pubkey: ` - The public key, as base-58 encoded string, of the account that received the reward + * `lamports: `- number of reward lamports credited or debited by the account, as a i64 #### Example: diff --git a/client/src/rpc_response.rs b/client/src/rpc_response.rs index 17979f8b7..c413a6c76 100644 --- a/client/src/rpc_response.rs +++ b/client/src/rpc_response.rs @@ -32,6 +32,14 @@ pub struct RpcBlockCommitment { pub total_stake: u64, } +#[derive(Debug, PartialEq, Serialize, Deserialize)] +pub struct RpcReward { + pub pubkey: String, + pub lamports: i64, +} + +pub type RpcRewards = Vec; + #[derive(Debug, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct RpcConfirmedBlock { @@ -39,6 +47,7 @@ pub struct RpcConfirmedBlock { pub blockhash: String, pub parent_slot: Slot, pub transactions: Vec, + pub rewards: RpcRewards, } #[derive(Debug, PartialEq, Serialize, Deserialize)] diff --git a/core/src/lib.rs b/core/src/lib.rs index e8dfaf8f9..d0bd3b53f 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -36,6 +36,7 @@ pub mod repair_service; pub mod replay_stage; mod result; pub mod retransmit_stage; +pub mod rewards_recorder_service; pub mod rpc; pub mod rpc_pubsub; pub mod rpc_pubsub_service; diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 5b022dfa6..514f0822f 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -6,6 +6,7 @@ use crate::{ consensus::{StakeLockout, Tower}, poh_recorder::PohRecorder, result::Result, + rewards_recorder_service::RewardsRecorderSender, rpc_subscriptions::RpcSubscriptions, }; use solana_ledger::{ @@ -77,6 +78,7 @@ pub struct ReplayStageConfig { pub snapshot_package_sender: Option, pub block_commitment_cache: Arc>, pub transaction_status_sender: Option, + pub rewards_sender: Option, } pub struct ReplayStage { @@ -179,6 +181,7 @@ impl ReplayStage { snapshot_package_sender, block_commitment_cache, transaction_status_sender, + rewards_sender, } = config; let (root_bank_sender, root_bank_receiver) = channel(); @@ -219,6 +222,7 @@ impl ReplayStage { &bank_forks, &leader_schedule_cache, &subscriptions, + rewards_sender.clone(), ); datapoint_debug!( "replay_stage-memory", @@ -395,6 +399,7 @@ impl ReplayStage { &poh_recorder, &leader_schedule_cache, &subscriptions, + rewards_sender.clone(), ); if let Some(bank) = poh_recorder.lock().unwrap().bank() { @@ -468,6 +473,7 @@ impl ReplayStage { poh_recorder: &Arc>, leader_schedule_cache: &Arc, subscriptions: &Arc, + rewards_sender: Option, ) { // all the individual calls to poh_recorder.lock() are designed to // increase granularity, decrease contention @@ -533,6 +539,7 @@ impl ReplayStage { .unwrap() .insert(Bank::new_from_parent(&parent, my_pubkey, poh_slot)); + Self::record_rewards(&tpu_bank, &rewards_sender); poh_recorder.lock().unwrap().set_bank(&tpu_bank); } else { error!("{} No next leader found", my_pubkey); @@ -976,6 +983,7 @@ impl ReplayStage { forks_lock: &RwLock, leader_schedule_cache: &Arc, subscriptions: &Arc, + rewards_sender: Option, ) { // Find the next slot that chains to the old slot let forks = forks_lock.read().unwrap(); @@ -1011,10 +1019,10 @@ impl ReplayStage { forks.root() ); subscriptions.notify_slot(child_slot, parent_slot, forks.root()); - new_banks.insert( - child_slot, - Bank::new_from_parent(&parent_bank, &leader, child_slot), - ); + + let child_bank = Bank::new_from_parent(&parent_bank, &leader, child_slot); + Self::record_rewards(&child_bank, &rewards_sender); + new_banks.insert(child_slot, child_bank); } } drop(forks); @@ -1025,6 +1033,16 @@ impl ReplayStage { } } + fn record_rewards(bank: &Bank, rewards_sender: &Option) { + if let Some(rewards_sender) = rewards_sender { + if let Some(ref rewards) = bank.rewards { + rewards_sender + .send((bank.slot(), rewards.iter().copied().collect())) + .unwrap_or_else(|err| warn!("rewards_sender failed: {:?}", err)); + } + } + } + pub fn join(self) -> thread::Result<()> { self.commitment_service.join()?; self.t_replay.join().map(|_| ()) @@ -1382,6 +1400,7 @@ pub(crate) mod tests { &bank_forks, &leader_schedule_cache, &subscriptions, + None, ); assert!(bank_forks.read().unwrap().get(1).is_some()); @@ -1394,6 +1413,7 @@ pub(crate) mod tests { &bank_forks, &leader_schedule_cache, &subscriptions, + None, ); assert!(bank_forks.read().unwrap().get(1).is_some()); assert!(bank_forks.read().unwrap().get(2).is_some()); diff --git a/core/src/rewards_recorder_service.rs b/core/src/rewards_recorder_service.rs new file mode 100644 index 000000000..525709052 --- /dev/null +++ b/core/src/rewards_recorder_service.rs @@ -0,0 +1,67 @@ +use crossbeam_channel::{Receiver, RecvTimeoutError, Sender}; +use solana_client::rpc_response::RpcReward; +use solana_ledger::blockstore::Blockstore; +use solana_sdk::{clock::Slot, pubkey::Pubkey}; +use std::{ + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + thread::{self, Builder, JoinHandle}, + time::Duration, +}; + +pub type RewardsRecorderReceiver = Receiver<(Slot, Vec<(Pubkey, i64)>)>; +pub type RewardsRecorderSender = Sender<(Slot, Vec<(Pubkey, i64)>)>; + +pub struct RewardsRecorderService { + thread_hdl: JoinHandle<()>, +} + +impl RewardsRecorderService { + #[allow(clippy::new_ret_no_self)] + pub fn new( + rewards_receiver: RewardsRecorderReceiver, + blockstore: Arc, + exit: &Arc, + ) -> Self { + let exit = exit.clone(); + let thread_hdl = Builder::new() + .name("solana-rewards-writer".to_string()) + .spawn(move || loop { + if exit.load(Ordering::Relaxed) { + break; + } + if let Err(RecvTimeoutError::Disconnected) = + Self::write_rewards(&rewards_receiver, &blockstore) + { + break; + } + }) + .unwrap(); + Self { thread_hdl } + } + + fn write_rewards( + rewards_receiver: &RewardsRecorderReceiver, + blockstore: &Arc, + ) -> Result<(), RecvTimeoutError> { + let (slot, rewards) = rewards_receiver.recv_timeout(Duration::from_secs(1))?; + let rpc_rewards = rewards + .into_iter() + .map(|(pubkey, lamports)| RpcReward { + pubkey: pubkey.to_string(), + lamports, + }) + .collect(); + + blockstore + .write_rewards(slot, rpc_rewards) + .expect("Expect database write to succeed"); + Ok(()) + } + + pub fn join(self) -> thread::Result<()> { + self.thread_hdl.join() + } +} diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 8c9dfbf74..e7510d3af 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -9,6 +9,7 @@ use crate::{ poh_recorder::PohRecorder, replay_stage::{ReplayStage, ReplayStageConfig}, retransmit_stage::RetransmitStage, + rewards_recorder_service::RewardsRecorderSender, rpc_subscriptions::RpcSubscriptions, shred_fetch_stage::ShredFetchStage, sigverify_shreds::ShredSigVerifier, @@ -86,6 +87,7 @@ impl Tvu { cfg: Option>, shred_version: u16, transaction_status_sender: Option, + rewards_sender: Option, ) -> Self { let keypair: Arc = cluster_info .read() @@ -170,6 +172,7 @@ impl Tvu { snapshot_package_sender, block_commitment_cache, transaction_status_sender, + rewards_sender, }; let (replay_stage, root_bank_receiver) = ReplayStage::new( @@ -312,6 +315,7 @@ pub mod tests { None, 0, None, + None, ); exit.store(true, Ordering::Relaxed); tvu.join().unwrap(); diff --git a/core/src/validator.rs b/core/src/validator.rs index 90ef99897..30b667033 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -8,6 +8,7 @@ use crate::{ gossip_service::{discover_cluster, GossipService}, poh_recorder::PohRecorder, poh_service::PohService, + rewards_recorder_service::RewardsRecorderService, rpc::JsonRpcConfig, rpc_pubsub_service::PubSubService, rpc_service::JsonRpcService, @@ -122,6 +123,7 @@ pub struct Validator { validator_exit: Arc>>, rpc_service: Option<(JsonRpcService, PubSubService)>, transaction_status_service: Option, + rewards_recorder_service: Option, gossip_service: GossipService, serve_repair_service: ServeRepairService, poh_recorder: Arc>, @@ -268,6 +270,21 @@ impl Validator { (None, None) }; + let (rewards_sender, rewards_recorder_service) = + if rpc_service.is_some() && !config.transaction_status_service_disabled { + let (rewards_sender, rewards_receiver) = unbounded(); + ( + Some(rewards_sender), + Some(RewardsRecorderService::new( + rewards_receiver, + blockstore.clone(), + &exit, + )), + ) + } else { + (None, None) + }; + info!( "Starting PoH: epoch={} slot={} tick_height={} blockhash={} leader={:?}", bank.epoch(), @@ -388,6 +405,7 @@ impl Validator { config.enable_partition.clone(), node.info.shred_version, transaction_status_sender.clone(), + rewards_sender, ); if config.dev_sigverify_disabled { @@ -416,6 +434,7 @@ impl Validator { serve_repair_service, rpc_service, transaction_status_service, + rewards_recorder_service, tpu, tvu, poh_service, @@ -473,6 +492,10 @@ impl Validator { transaction_status_service.join()?; } + if let Some(rewards_recorder_service) = self.rewards_recorder_service { + rewards_recorder_service.join()?; + } + self.gossip_service.join()?; self.serve_repair_service.join()?; self.tpu.join()?; diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index d36f3c30d..8d657e073 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -22,8 +22,8 @@ use rayon::{ }; use rocksdb::DBRawIterator; use solana_client::rpc_response::{ - RpcConfirmedBlock, RpcEncodedTransaction, RpcTransactionEncoding, RpcTransactionStatus, - RpcTransactionWithStatusMeta, + RpcConfirmedBlock, RpcEncodedTransaction, RpcRewards, RpcTransactionEncoding, + RpcTransactionStatus, RpcTransactionWithStatusMeta, }; use solana_measure::measure::Measure; use solana_metrics::{datapoint_debug, datapoint_error}; @@ -86,6 +86,7 @@ pub struct Blockstore { data_shred_cf: LedgerColumn, code_shred_cf: LedgerColumn, transaction_status_cf: LedgerColumn, + rewards_cf: LedgerColumn, last_root: Arc>, insert_shreds_lock: Arc>, pub new_shreds_signals: Vec>, @@ -195,6 +196,7 @@ impl Blockstore { let data_shred_cf = db.column(); let code_shred_cf = db.column(); let transaction_status_cf = db.column(); + let rewards_cf = db.column(); let db = Arc::new(db); @@ -219,6 +221,7 @@ impl Blockstore { data_shred_cf, code_shred_cf, transaction_status_cf, + rewards_cf, new_shreds_signals: vec![], completed_slots_senders: vec![], insert_shreds_lock: Arc::new(Mutex::new(())), @@ -346,6 +349,10 @@ impl Blockstore { & self .db .delete_range_cf::(&mut write_batch, from_slot, to_slot) + .unwrap_or(false) + & self + .db + .delete_range_cf::(&mut write_batch, from_slot, to_slot) .unwrap_or(false); if let Err(e) = self.db.write(write_batch) { error!( @@ -398,6 +405,10 @@ impl Blockstore { && self .transaction_status_cf .compact_range(from_slot, to_slot) + .unwrap_or(false) + && self + .rewards_cf + .compact_range(from_slot, to_slot) .unwrap_or(false); Ok(result) } @@ -1396,6 +1407,12 @@ impl Blockstore { let blockhash = get_last_hash(slot_entries.iter()) .unwrap_or_else(|| panic!("Rooted slot {:?} must have blockhash", slot)); + let rewards = self + .rewards_cf + .get(slot) + .expect("Expect rewards get to succeed") + .unwrap_or_else(|| vec![]); + let block = RpcConfirmedBlock { previous_blockhash: previous_blockhash.to_string(), blockhash: blockhash.to_string(), @@ -1405,6 +1422,7 @@ impl Blockstore { encoding, slot_transaction_iterator, ), + rewards, }; return Ok(block); } @@ -1442,6 +1460,10 @@ impl Blockstore { self.transaction_status_cf.put(index, status) } + pub fn write_rewards(&self, index: Slot, rewards: RpcRewards) -> Result<()> { + self.rewards_cf.put(index, &rewards) + } + fn get_block_timestamps(&self, slot: Slot) -> Result> { let slot_entries = self.get_slot_entries(slot, 0, None)?; Ok(slot_entries @@ -2574,6 +2596,13 @@ pub mod tests { .unwrap() .next() .map(|((slot, _), _)| slot >= min_slot) + .unwrap_or(true) + & blockstore + .db + .iter::(IteratorMode::Start) + .unwrap() + .next() + .map(|(slot, _)| slot >= min_slot) .unwrap_or(true); assert!(condition_met); } @@ -4826,6 +4855,7 @@ pub mod tests { parent_slot: slot - 1, blockhash: blockhash.to_string(), previous_blockhash: Hash::default().to_string(), + rewards: vec![], }; // The previous_blockhash of `expected_block` is default because its parent slot is a // root, but empty of entries. This is special handling for snapshot root slots. @@ -4846,6 +4876,7 @@ pub mod tests { parent_slot: slot, blockhash: blockhash.to_string(), previous_blockhash: blockhash.to_string(), + rewards: vec![], }; assert_eq!(confirmed_block, expected_block); diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index 06ba48909..5f278c42e 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -10,7 +10,7 @@ use rocksdb::{ }; use serde::de::DeserializeOwned; use serde::Serialize; -use solana_client::rpc_response::RpcTransactionStatus; +use solana_client::rpc_response::{RpcRewards, RpcTransactionStatus}; use solana_sdk::{clock::Slot, signature::Signature}; use std::{collections::HashMap, fs, marker::PhantomData, path::Path, sync::Arc}; use thiserror::Error; @@ -38,6 +38,8 @@ const DATA_SHRED_CF: &str = "data_shred"; const CODE_SHRED_CF: &str = "code_shred"; /// Column family for Transaction Status const TRANSACTION_STATUS_CF: &str = "transaction_status"; +/// Column family for Rewards +const REWARDS_CF: &str = "rewards"; #[derive(Error, Debug)] pub enum BlockstoreError { @@ -105,6 +107,10 @@ pub mod columns { #[derive(Debug)] /// The transaction status column pub struct TransactionStatus; + + #[derive(Debug)] + /// The rewards column + pub struct Rewards; } #[derive(Debug)] @@ -113,8 +119,8 @@ struct Rocks(rocksdb::DB); impl Rocks { fn open(path: &Path) -> Result { use columns::{ - DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans, Root, ShredCode, ShredData, - SlotMeta, TransactionStatus, + DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans, Rewards, Root, ShredCode, + ShredData, SlotMeta, TransactionStatus, }; fs::create_dir_all(&path)?; @@ -139,6 +145,7 @@ impl Rocks { ColumnFamilyDescriptor::new(ShredCode::NAME, get_cf_options()); let transaction_status_cf_descriptor = ColumnFamilyDescriptor::new(TransactionStatus::NAME, get_cf_options()); + let rewards_cf_descriptor = ColumnFamilyDescriptor::new(Rewards::NAME, get_cf_options()); let cfs = vec![ meta_cf_descriptor, @@ -151,6 +158,7 @@ impl Rocks { shred_data_cf_descriptor, shred_code_cf_descriptor, transaction_status_cf_descriptor, + rewards_cf_descriptor, ]; // Open the database @@ -161,8 +169,8 @@ impl Rocks { fn columns(&self) -> Vec<&'static str> { use columns::{ - DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans, Root, ShredCode, ShredData, - SlotMeta, TransactionStatus, + DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans, Rewards, Root, ShredCode, + ShredData, SlotMeta, TransactionStatus, }; vec![ @@ -176,6 +184,7 @@ impl Rocks { ShredData::NAME, ShredCode::NAME, TransactionStatus::NAME, + Rewards::NAME, ] } @@ -316,6 +325,14 @@ impl ColumnName for columns::TransactionStatus { const NAME: &'static str = TRANSACTION_STATUS_CF; } +impl SlotColumn for columns::Rewards {} +impl ColumnName for columns::Rewards { + const NAME: &'static str = REWARDS_CF; +} +impl TypedColumn for columns::Rewards { + type Type = RpcRewards; +} + impl Column for columns::ShredCode { type Index = (u64, u64); diff --git a/programs/stake/src/stake_state.rs b/programs/stake/src/stake_state.rs index 538eb20cc..53304c2bc 100644 --- a/programs/stake/src/stake_state.rs +++ b/programs/stake/src/stake_state.rs @@ -776,7 +776,7 @@ pub fn redeem_rewards( vote_account: &mut Account, point_value: f64, stake_history: Option<&StakeHistory>, -) -> Result { +) -> Result<(u64, u64), InstructionError> { if let StakeState::Stake(meta, mut stake) = stake_account.state()? { let vote_state = vote_account.state()?; @@ -788,7 +788,7 @@ pub fn redeem_rewards( stake_account.set_state(&StakeState::Stake(meta, stake))?; - Ok(stakers_reward + voters_reward) + Ok((stakers_reward, voters_reward)) } else { Err(StakeError::NoCreditsToRedeem.into()) } diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 7df80851f..99ae021f7 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -333,6 +333,10 @@ pub struct Bank { /// Last time when the cluster info vote listener has synced with this bank #[serde(skip)] pub last_vote_sync: AtomicU64, + + /// Rewards that were paid out immediately after this bank was created + #[serde(skip)] + pub rewards: Option>, } impl Default for BlockhashQueue { @@ -429,6 +433,7 @@ impl Bank { entered_epoch_callback: parent.entered_epoch_callback.clone(), hard_forks: parent.hard_forks.clone(), last_vote_sync: AtomicU64::new(parent.last_vote_sync.load(Ordering::Relaxed)), + rewards: None, }; datapoint_debug!( @@ -614,15 +619,16 @@ impl Bank { // years_elapsed = slots_elapsed / slots/year let period = self.epoch_schedule.get_slots_in_epoch(epoch) as f64 / self.slots_per_year; - let inflation = self.inflation.read().unwrap(); + let (validator_rewards, storage_rewards) = { + let inflation = self.inflation.read().unwrap(); - let validator_rewards = - (*inflation).validator(year) * self.capitalization() as f64 * period; + ( + (*inflation).validator(year) * self.capitalization() as f64 * period, + (*inflation).storage(year) * self.capitalization() as f64 * period, + ) + }; let validator_points = self.stakes.write().unwrap().claim_points(); - - let storage_rewards = (*inflation).storage(year) * self.capitalization() as f64 * period; - let storage_points = self.storage_accounts.write().unwrap().claim_points(); let (validator_point_value, storage_point_value) = self.check_point_values( @@ -634,7 +640,6 @@ impl Bank { }); let validator_rewards = self.pay_validator_rewards(validator_point_value); - self.capitalization.fetch_add( validator_rewards + storage_rewards as u64, Ordering::Relaxed, @@ -643,9 +648,12 @@ impl Bank { /// iterate over all stakes, redeem vote credits for each stake we can /// successfully load and parse, return total payout - fn pay_validator_rewards(&self, point_value: f64) -> u64 { + fn pay_validator_rewards(&mut self, point_value: f64) -> u64 { let stake_history = self.stakes.read().unwrap().history().clone(); - self.stake_delegations() + let mut validator_rewards = HashMap::new(); + + let total_validator_rewards = self + .stake_delegations() .iter() .map(|(stake_pubkey, delegation)| { match ( @@ -659,10 +667,22 @@ impl Bank { point_value, Some(&stake_history), ); - if let Ok(rewards) = rewards { + if let Ok((stakers_reward, voters_reward)) = rewards { self.store_account(&stake_pubkey, &stake_account); self.store_account(&delegation.voter_pubkey, &vote_account); - rewards + + if voters_reward > 0 { + *validator_rewards + .entry(delegation.voter_pubkey) + .or_insert(0i64) += voters_reward as i64; + } + + if stakers_reward > 0 { + *validator_rewards.entry(*stake_pubkey).or_insert(0i64) += + stakers_reward as i64; + } + + stakers_reward + voters_reward } else { debug!( "stake_state::redeem_rewards() failed for {}: {:?}", @@ -674,7 +694,11 @@ impl Bank { (_, _) => 0, } }) - .sum() + .sum(); + + assert_eq!(self.rewards, None); + self.rewards = Some(validator_rewards.drain().collect()); + total_validator_rewards } pub fn update_recent_blockhashes(&self) { @@ -3003,6 +3027,7 @@ mod tests { ..GenesisConfig::default() })); assert_eq!(bank.capitalization(), 42 * 1_000_000_000); + assert_eq!(bank.rewards, None); let ((vote_id, mut vote_account), (stake_id, stake_account)) = crate::stakes::tests::create_staked_node_accounts(1_0000); @@ -3061,6 +3086,16 @@ mod tests { .abs() < 1.0 // rounding, truncating ); + + // verify validator rewards show up in bank1.rewards vector + // (currently storage rewards will not show up) + assert_eq!( + bank1.rewards, + Some(vec![( + stake_id, + (rewards.validator_point_value * validator_points as f64) as i64 + )]) + ); } // Test that purging 0 lamports accounts works.