Enable cluster tests (#3372)

* Cluster tests

* stable!

* fixup! stable!

* fixup! fixup! stable!

* fixup! fixup! fixup! stable!

* fixup! fixup! fixup! fixup! stable!

* fixed space

* add getNumBlocksSinceSignatureConfirmation entry for the json rpc docs

* Check in upcoming epochs for potential leadership slots in next_leader_slot()
anatoly yakovenko 2019-03-21 07:43:21 -07:00 committed by GitHub
parent 402a733cd7
commit 148e08a8a5
11 changed files with 312 additions and 54 deletions

View File

@@ -26,6 +26,7 @@ Methods
* [getBalance](#getbalance)
* [getRecentBlockhash](#getrecentblockhash)
* [getSignatureStatus](#getsignaturestatus)
* [getNumBlocksSinceSignatureConfirmation](#getnumblockssincesignatureconfirmation)
* [getTransactionCount](#gettransactioncount)
* [requestAirdrop](#requestairdrop)
* [sendTransaction](#sendtransaction)
@@ -185,6 +186,27 @@ curl -X POST -H "Content-Type: application/json" -d '{"jsonrpc":"2.0", "id":1, "
```
---
### getNumBlocksSinceSignatureConfirmation
Returns the current number of blocks since the signature was confirmed.
##### Parameters:
* `string` - Signature of Transaction to confirm, as base-58 encoded string
##### Results:
* `integer` - count, as unsigned 64-bit integer
##### Example:
```bash
// Request
curl -X POST -H "Content-Type: application/json" -d '{"jsonrpc":"2.0", "id":1, "method":"getNumBlocksSinceSignatureConfirmation", "params":["5VERv8NMvzbJMEkV8xnrLkEaWRtSz9CosKDYjCJjBRnbJLgp8uirBgmQpjKhoR4tjF3ZpRzrFmBV6UjKdiSZkQUW"]}' http://localhost:8899
// Result
{"jsonrpc":"2.0","result":8,"id":1}
```
---
### getTransactionCount
Returns the current Transaction count from the ledger

View File

@@ -438,6 +438,78 @@ impl RpcClient {
};
}
}
/// Poll the server to confirm a transaction.
pub fn poll_for_signature_confirmation(
&self,
signature: &Signature,
min_confirmed_blocks: usize,
) -> io::Result<()> {
let mut now = Instant::now();
let mut confirmed_blocks = 0;
loop {
let response = self.get_num_blocks_since_signature_confirmation(signature);
match response {
Ok(count) => {
if confirmed_blocks != count {
info!(
"signature {} confirmed {} out of {}",
signature, count, min_confirmed_blocks
);
now = Instant::now();
confirmed_blocks = count;
}
if count >= min_confirmed_blocks {
break;
}
}
Err(err) => {
debug!("check_confirmations request failed: {:?}", err);
}
};
if now.elapsed().as_secs() > 15 {
// TODO: Return a better error.
return Err(io::Error::new(io::ErrorKind::Other, "signature not found"));
}
sleep(Duration::from_millis(250));
}
Ok(())
}
pub fn get_num_blocks_since_signature_confirmation(
&self,
sig: &Signature,
) -> io::Result<usize> {
let params = json!([format!("{}", sig)]);
let response = self
.client
.send(
&RpcRequest::GetNumBlocksSinceSignatureConfirmation,
Some(params.clone()),
1,
)
.map_err(|error| {
debug!(
"Response get_num_blocks_since_signature_confirmation: {}",
error
);
io::Error::new(
io::ErrorKind::Other,
"GetNumBlocksSinceSignatureConfirmation request failure",
)
})?;
serde_json::from_value(response).map_err(|error| {
debug!(
"ParseError: get_num_blocks_since_signature_confirmation: {}",
error
);
io::Error::new(
io::ErrorKind::Other,
"GetNumBlocksSinceSignatureConfirmation parse failure",
)
})
}
pub fn fullnode_exit(&self) -> io::Result<bool> {
let response = self
.client
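The two new `RpcClient` methods are intended to be used together: query the confirmation depth once, or block until a threshold is reached. Below is a minimal caller sketch, assuming the `solana_client::rpc_client` module path and an already-constructed client; the helper name and everything outside the diff is illustrative, not part of this change.

```rust
use solana_client::rpc_client::RpcClient; // module path assumed
use solana_sdk::signature::Signature;
use std::io;

// Hypothetical helper: return once `signature` has been confirmed by at
// least `min_confirmed_blocks` blocks, using the two methods added above.
fn wait_for_confirmations(
    client: &RpcClient,
    signature: &Signature,
    min_confirmed_blocks: usize,
) -> io::Result<()> {
    // One-shot query: how many blocks have confirmed the signature so far?
    let so_far = client.get_num_blocks_since_signature_confirmation(signature)?;
    if so_far >= min_confirmed_blocks {
        return Ok(());
    }
    // Otherwise block, re-polling every 250ms until the threshold is reached
    // (or the 15-second inactivity timeout returns an error).
    client.poll_for_signature_confirmation(signature, min_confirmed_blocks)
}
```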

View File

@@ -18,6 +18,7 @@ pub enum RpcRequest {
GetStorageEntryHeight,
GetStoragePubkeysForEntryHeight,
FullnodeExit,
GetNumBlocksSinceSignatureConfirmation,
}
impl RpcRequest {
@@ -39,6 +40,9 @@ impl RpcRequest {
RpcRequest::GetStorageEntryHeight => "getStorageEntryHeight",
RpcRequest::GetStoragePubkeysForEntryHeight => "getStoragePubkeysForEntryHeight",
RpcRequest::FullnodeExit => "fullnodeExit",
RpcRequest::GetNumBlocksSinceSignatureConfirmation => {
"getNumBlocksSinceSignatureConfirmation"
}
};
let mut request = json!({
"jsonrpc": jsonrpc,

View File

@@ -73,6 +73,36 @@ impl ThinClient {
Ok(transaction.signatures[0])
}
/// Retry sending a signed Transaction to the server for processing.
pub fn retry_transfer_until_confirmed(
&self,
keypair: &Keypair,
transaction: &mut Transaction,
tries: usize,
min_confirmed_blocks: usize,
) -> io::Result<Signature> {
for x in 0..tries {
transaction.sign(&[keypair], self.get_recent_blockhash()?);
let mut buf = vec![0; transaction.serialized_size().unwrap() as usize];
let mut wr = std::io::Cursor::new(&mut buf[..]);
serialize_into(&mut wr, &transaction)
.expect("serialize Transaction in pub fn transfer_signed");
self.transactions_socket
.send_to(&buf[..], &self.transactions_addr)?;
if self
.poll_for_signature_confirmation(&transaction.signatures[0], min_confirmed_blocks)
.is_ok()
{
return Ok(transaction.signatures[0]);
}
info!("{} tries failed transfer to {}", x, self.transactions_addr);
}
Err(io::Error::new(
io::ErrorKind::Other,
"retry_transfer failed",
))
}
/// Retry sending a signed Transaction to the server for processing.
pub fn retry_transfer(
&self,
@@ -140,7 +170,18 @@ impl ThinClient {
pub fn poll_for_signature(&self, signature: &Signature) -> io::Result<()> {
self.rpc_client.poll_for_signature(signature)
}
/// Poll the server until the signature has been confirmed by at least `min_confirmed_blocks`
pub fn poll_for_signature_confirmation(
&self,
signature: &Signature,
min_confirmed_blocks: usize,
) -> io::Result<()> {
self.rpc_client
.poll_for_signature_confirmation(signature, min_confirmed_blocks)
}
/// Check a signature in the bank. This method blocks
/// until the server sends a response.
pub fn check_signature(&self, signature: &Signature) -> bool {
self.rpc_client.check_signature(signature)
}
@@ -148,6 +189,13 @@ impl ThinClient {
pub fn fullnode_exit(&self) -> io::Result<bool> {
self.rpc_client.fullnode_exit()
}
pub fn get_num_blocks_since_signature_confirmation(
&mut self,
sig: &Signature,
) -> io::Result<usize> {
self.rpc_client
.get_num_blocks_since_signature_confirmation(sig)
}
}
pub fn create_client((rpc, tpu): (SocketAddr, SocketAddr), range: (u16, u16)) -> ThinClient {

View File

@@ -7,11 +7,14 @@ use crate::cluster_info::FULLNODE_PORT_RANGE;
use crate::contact_info::ContactInfo;
use crate::entry::{Entry, EntrySlice};
use crate::gossip_service::discover;
use crate::locktower::VOTE_THRESHOLD_DEPTH;
use solana_client::thin_client::create_client;
use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil, Signature};
use solana_sdk::system_transaction::SystemTransaction;
use solana_sdk::timing::{DEFAULT_SLOTS_PER_EPOCH, DEFAULT_TICKS_PER_SLOT, NUM_TICKS_PER_SECOND};
use solana_sdk::timing::{
DEFAULT_TICKS_PER_SLOT, NUM_CONSECUTIVE_LEADER_SLOTS, NUM_TICKS_PER_SECOND,
};
use std::io;
use std::thread::sleep;
use std::time::Duration;
@@ -40,12 +43,13 @@ pub fn spend_and_verify_all_nodes(
client.get_recent_blockhash().unwrap(),
0,
);
let confs = VOTE_THRESHOLD_DEPTH + 1;
let sig = client
.retry_transfer(&funding_keypair, &mut transaction, 5)
.retry_transfer_until_confirmed(&funding_keypair, &mut transaction, 5, confs)
.unwrap();
for validator in &cluster_nodes {
let client = create_client(validator.client_facing_addr(), FULLNODE_PORT_RANGE);
client.poll_for_signature(&sig).unwrap();
client.poll_for_signature_confirmation(&sig, confs).unwrap();
}
}
}
@@ -127,14 +131,18 @@ pub fn kill_entry_and_spend_and_verify_rest(
let cluster_nodes = discover(&entry_point_info.gossip, nodes).unwrap();
assert!(cluster_nodes.len() >= nodes);
let client = create_client(entry_point_info.client_facing_addr(), FULLNODE_PORT_RANGE);
info!("sleeping for an epoch");
sleep(Duration::from_millis(SLOT_MILLIS * DEFAULT_SLOTS_PER_EPOCH));
info!("done sleeping for an epoch");
info!("sleeping for 2 leader fortnights");
sleep(Duration::from_millis(
SLOT_MILLIS * NUM_CONSECUTIVE_LEADER_SLOTS * 2,
));
info!("done sleeping for 2 fortnights");
info!("killing entry point");
assert!(client.fullnode_exit().unwrap());
info!("sleeping for a slot");
sleep(Duration::from_millis(SLOT_MILLIS));
info!("done sleeping for a slot");
info!("sleeping for 2 leader fortnights");
sleep(Duration::from_millis(
SLOT_MILLIS * NUM_CONSECUTIVE_LEADER_SLOTS,
));
info!("done sleeping for 2 fortnights");
for ingress_node in &cluster_nodes {
if ingress_node.id == entry_point_info.id {
continue;
@@ -163,8 +171,15 @@
0,
);
let confs = VOTE_THRESHOLD_DEPTH + 1;
let sig = {
match client.retry_transfer(&funding_keypair, &mut transaction, 5) {
let sig = client.retry_transfer_until_confirmed(
&funding_keypair,
&mut transaction,
5,
confs,
);
match sig {
Err(e) => {
result = Err(e);
continue;
@ -174,7 +189,7 @@ pub fn kill_entry_and_spend_and_verify_rest(
}
};
match poll_all_nodes_for_signature(&entry_point_info, &cluster_nodes, &sig) {
match poll_all_nodes_for_signature(&entry_point_info, &cluster_nodes, &sig, confs) {
Err(e) => {
result = Err(e);
}
@@ -190,13 +205,14 @@ fn poll_all_nodes_for_signature(
entry_point_info: &ContactInfo,
cluster_nodes: &[ContactInfo],
sig: &Signature,
confs: usize,
) -> io::Result<()> {
for validator in cluster_nodes {
if validator.id == entry_point_info.id {
continue;
}
let client = create_client(validator.client_facing_addr(), FULLNODE_PORT_RANGE);
client.poll_for_signature(&sig)?;
client.poll_for_signature_confirmation(&sig, confs)?;
}
Ok(())

View File

@@ -44,10 +44,9 @@ pub fn slot_leader_at(slot: u64, bank: &Bank) -> Option<Pubkey> {
}
/// Return the next slot after the given current_slot that the given node will be leader
pub fn next_leader_slot(pubkey: &Pubkey, current_slot: u64, bank: &Bank) -> Option<u64> {
let (epoch, slot_index) = bank.get_epoch_and_slot_index(current_slot + 1);
if let Some(leader_schedule) = leader_schedule(epoch, bank) {
pub fn next_leader_slot(pubkey: &Pubkey, mut current_slot: u64, bank: &Bank) -> Option<u64> {
let (mut epoch, mut start_index) = bank.get_epoch_and_slot_index(current_slot + 1);
while let Some(leader_schedule) = leader_schedule(epoch, bank) {
// clippy thinks I should do this:
// for (i, <item>) in leader_schedule
// .iter()
@@ -57,11 +56,15 @@ pub fn next_leader_slot(pubkey: &Pubkey, current_slot: u64, bank: &Bank) -> Opti
//
// but leader_schedule doesn't implement Iter...
#[allow(clippy::needless_range_loop)]
for i in slot_index..bank.get_slots_in_epoch(epoch) {
for i in start_index..bank.get_slots_in_epoch(epoch) {
current_slot += 1;
if *pubkey == leader_schedule[i] {
return Some(current_slot + 1 + (i - slot_index) as u64);
return Some(current_slot);
}
}
epoch += 1;
start_index = 0;
}
None
}
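To make the new epoch-rollover behavior concrete, here is a minimal, self-contained sketch of the same search over plain per-epoch schedules (a `Vec` of leader ids standing in for `Bank` and `leader_schedule`; the ids, slot counts, and function name are illustrative assumptions, not values from this change):

```rust
// Sketch of the cross-epoch search: a Vec of per-epoch schedules stands in
// for Bank/leader_schedule, and leader ids are plain u64s.
fn next_leader_slot_sketch(
    me: u64,
    mut current_slot: u64,
    slots_per_epoch: u64,
    schedules: &[Vec<u64>], // schedules[epoch][slot_index] = leader id
) -> Option<u64> {
    let mut epoch = ((current_slot + 1) / slots_per_epoch) as usize;
    let mut start_index = ((current_slot + 1) % slots_per_epoch) as usize;
    // Walk the remainder of the current epoch, then whole subsequent epochs,
    // mirroring the `while let Some(leader_schedule)` loop above.
    while let Some(schedule) = schedules.get(epoch) {
        for i in start_index..schedule.len() {
            current_slot += 1;
            if schedule[i] == me {
                return Some(current_slot);
            }
        }
        epoch += 1;
        start_index = 0;
    }
    None
}

fn main() {
    // Node 7 first leads at index 2 of epoch 1. Searching from slot 7 (the
    // last slot of epoch 0, with 8 slots per epoch) crosses the epoch
    // boundary and lands on absolute slot 10 = 8 + 2.
    let schedules = vec![vec![1, 2, 3, 4, 5, 6, 1, 2], vec![3, 4, 7, 1, 2, 3, 4, 5]];
    assert_eq!(next_leader_slot_sketch(7, 7, 8, &schedules), Some(10));
}
```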
@@ -80,8 +83,10 @@ pub fn tick_height_to_slot(ticks_per_slot: u64, tick_height: u64) -> u64 {
mod tests {
use super::*;
use crate::staking_utils;
use crate::voting_keypair::tests::new_vote_account_with_delegate;
use solana_sdk::genesis_block::{GenesisBlock, BOOTSTRAP_LEADER_LAMPORTS};
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::sync::Arc;
#[test]
fn test_next_leader_slot() {
@@ -117,6 +122,58 @@ mod tests {
);
}
#[test]
fn test_next_leader_slot_next_epoch() {
let pubkey = Keypair::new().pubkey();
let (mut genesis_block, mint_keypair) = GenesisBlock::new_with_leader(
2 * BOOTSTRAP_LEADER_LAMPORTS,
&pubkey,
BOOTSTRAP_LEADER_LAMPORTS,
);
genesis_block.epoch_warmup = false;
let bank = Bank::new(&genesis_block);
let delegate_id = Keypair::new().pubkey();
// Create new vote account
let new_voting_keypair = Keypair::new();
new_vote_account_with_delegate(
&mint_keypair,
&new_voting_keypair,
&delegate_id,
&bank,
BOOTSTRAP_LEADER_LAMPORTS,
);
// Have to wait until the epoch after the stakers epoch generated at genesis
// for the new votes to take effect.
let mut target_slot = 1;
let epoch = bank.get_stakers_epoch(0);
while bank.get_stakers_epoch(target_slot) == epoch {
target_slot += 1;
}
let bank = Bank::new_from_parent(&Arc::new(bank), &Pubkey::default(), target_slot);
let mut expected_slot = 0;
let epoch = bank.get_stakers_epoch(target_slot);
for i in 0..epoch {
expected_slot += bank.get_slots_in_epoch(i);
}
let schedule = leader_schedule(epoch, &bank).unwrap();
let mut index = 0;
while schedule[index] != delegate_id {
index += 1
}
expected_slot += index;
assert_eq!(
next_leader_slot(&delegate_id, 0, &bank),
Some(expected_slot)
);
}
#[test]
fn test_leader_schedule_via_bank() {
let pubkey = Keypair::new().pubkey();

View File

@@ -162,7 +162,7 @@ impl ReplayStage {
.filter(|(b, stake_lockouts)| {
let vote_threshold =
locktower.check_vote_stake_threshold(b.slot(), &stake_lockouts);
trace!("bank vote_threshold: {} {}", b.slot(), vote_threshold);
debug!("bank vote_threshold: {} {}", b.slot(), vote_threshold);
vote_threshold
})
.map(|(b, stake_lockouts)| {

View File

@@ -80,7 +80,20 @@ impl JsonRpcRequestProcessor {
}
pub fn get_signature_status(&self, signature: Signature) -> Option<bank::Result<()>> {
self.bank().get_signature_status(&signature)
self.get_signature_confirmation_status(signature)
.map(|x| x.1)
}
pub fn get_signature_confirmations(&self, signature: Signature) -> Option<usize> {
self.get_signature_confirmation_status(signature)
.map(|x| x.0)
}
pub fn get_signature_confirmation_status(
&self,
signature: Signature,
) -> Option<(usize, bank::Result<()>)> {
self.bank().get_signature_confirmation_status(&signature)
}
fn get_transaction_count(&self) -> Result<u64> {
@@ -202,6 +215,20 @@ pub trait RpcSol {
#[rpc(meta, name = "fullnodeExit")]
fn fullnode_exit(&self, _: Self::Metadata) -> Result<bool>;
#[rpc(meta, name = "getNumBlocksSinceSignatureConfirmation")]
fn get_num_blocks_since_signature_confirmation(
&self,
_: Self::Metadata,
_: String,
) -> Result<usize>;
#[rpc(meta, name = "getSignatureConfirmation")]
fn get_signature_confirmation(
&self,
_: Self::Metadata,
_: String,
) -> Result<(usize, RpcSignatureStatus)>;
}
pub struct RpcSolImpl;
@@ -239,19 +266,33 @@ impl RpcSol for RpcSolImpl {
}
fn get_signature_status(&self, meta: Self::Metadata, id: String) -> Result<RpcSignatureStatus> {
info!("get_signature_status rpc request received: {:?}", id);
self.get_signature_confirmation(meta, id).map(|x| x.1)
}
fn get_num_blocks_since_signature_confirmation(
&self,
meta: Self::Metadata,
id: String,
) -> Result<usize> {
self.get_signature_confirmation(meta, id).map(|x| x.0)
}
fn get_signature_confirmation(
&self,
meta: Self::Metadata,
id: String,
) -> Result<(usize, RpcSignatureStatus)> {
info!("get_signature_confirmation rpc request received: {:?}", id);
let signature = verify_signature(&id)?;
let res = meta
.request_processor
.read()
.unwrap()
.get_signature_status(signature);
.get_signature_confirmation_status(signature);
let status = {
if res.is_none() {
RpcSignatureStatus::SignatureNotFound
} else {
match res.unwrap() {
if let Some((count, res)) = res {
let res = match res {
Ok(_) => RpcSignatureStatus::Confirmed,
Err(TransactionError::AccountInUse) => RpcSignatureStatus::AccountInUse,
Err(TransactionError::AccountLoadedTwice) => {
@@ -264,10 +305,16 @@ impl RpcSol for RpcSolImpl {
trace!("mapping {:?} to GenericFailure", err);
RpcSignatureStatus::GenericFailure
}
}
};
(count, res)
} else {
(0, RpcSignatureStatus::SignatureNotFound)
}
};
info!("get_signature_status rpc request status: {:?}", status);
info!(
"get_signature_confirmation rpc request status: {:?}",
status
);
Ok(status)
}

View File

@@ -21,7 +21,6 @@ fn test_spend_and_verify_all_nodes_1() {
}
#[test]
#[ignore] //TODO: confirmations are not useful: #3346
fn test_spend_and_verify_all_nodes_2() {
solana_logger::setup();
let num_nodes = 2;
@@ -34,7 +33,6 @@ fn test_spend_and_verify_all_nodes_2() {
}
#[test]
#[ignore] //TODO: confirmations are not useful: #3346
fn test_spend_and_verify_all_nodes_3() {
solana_logger::setup();
let num_nodes = 3;
@@ -65,34 +63,20 @@ fn test_fullnode_exit_2() {
cluster_tests::fullnode_exit(&local.entry_point_info, num_nodes);
}
// Cluster needs a supermajority to remain, so the minimum size for this test is 4
#[test]
#[ignore]
fn test_leader_failure_2() {
let num_nodes = 2;
fn test_leader_failure_4() {
solana_logger::setup();
let num_nodes = 4;
let mut fullnode_config = FullnodeConfig::default();
fullnode_config.rpc_config.enable_fullnode_exit = true;
let local = LocalCluster::new_with_config(&[100; 2], 10_000, &fullnode_config);
let local = LocalCluster::new_with_config(&[100; 4], 10_000, &fullnode_config);
cluster_tests::kill_entry_and_spend_and_verify_rest(
&local.entry_point_info,
&local.funding_keypair,
num_nodes,
);
}
#[test]
#[ignore]
fn test_leader_failure_3() {
let num_nodes = 3;
let mut fullnode_config = FullnodeConfig::default();
fullnode_config.rpc_config.enable_fullnode_exit = true;
let local = LocalCluster::new_with_config(&[100; 3], 10_000, &fullnode_config);
cluster_tests::kill_entry_and_spend_and_verify_rest(
&local.entry_point_info,
&local.funding_keypair,
num_nodes,
);
}
#[test]
fn test_two_unbalanced_stakes() {
let mut fullnode_config = FullnodeConfig::default();

View File

@@ -790,13 +790,21 @@ impl Bank {
self.accounts.transaction_count(self.accounts_id)
}
pub fn get_signature_status(&self, signature: &Signature) -> Option<Result<()>> {
pub fn get_signature_confirmation_status(
&self,
signature: &Signature,
) -> Option<(usize, Result<()>)> {
let parents = self.parents();
let mut caches = vec![self.status_cache.read().unwrap()];
caches.extend(parents.iter().map(|b| b.status_cache.read().unwrap()));
StatusCache::get_signature_status_all(&caches, signature)
}
pub fn get_signature_status(&self, signature: &Signature) -> Option<Result<()>> {
self.get_signature_confirmation_status(signature)
.map(|v| v.1)
}
pub fn has_signature(&self, signature: &Signature) -> bool {
let parents = self.parents();
let mut caches = vec![self.status_cache.read().unwrap()];

View File

@@ -192,13 +192,13 @@ impl<T: Clone> StatusCache<T> {
pub fn get_signature_status_all<U>(
checkpoints: &[U],
signature: &Signature,
) -> Option<Result<(), T>>
) -> Option<(usize, Result<(), T>)>
where
U: Deref<Target = Self>,
{
for c in checkpoints {
for (i, c) in checkpoints.iter().enumerate() {
if let Some(status) = c.get_signature_status(signature) {
return Some(status);
return Some((i, status));
}
}
None
@@ -257,7 +257,7 @@ mod tests {
let checkpoints = [&second, &first];
assert_eq!(
BankStatusCache::get_signature_status_all(&checkpoints, &sig),
Some(Ok(())),
Some((1, Ok(()))),
);
assert!(StatusCache::has_signature_all(&checkpoints, &sig));
}
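Putting the layers together: the `usize` that `get_signature_status_all` now returns is the index of the checkpoint (bank) in which the signature was found, counting from the newest bank, and that is exactly the "blocks since confirmation" count the RPC exposes. A toy model of that relationship follows, with plain `HashSet`s standing in for `StatusCache`; it is illustrative only and not the crate's API.

```rust
use std::collections::HashSet;

// Checkpoints are ordered newest-first (current bank, parent, grandparent, ...),
// so finding the signature in checkpoint i means it landed i blocks ago.
fn blocks_since_confirmation(checkpoints: &[HashSet<u64>], sig: u64) -> Option<usize> {
    checkpoints
        .iter()
        .enumerate()
        .find(|(_, c)| c.contains(&sig))
        .map(|(i, _)| i)
}

fn main() {
    let newest: HashSet<u64> = HashSet::new();
    let parent: HashSet<u64> = HashSet::new();
    let grandparent: HashSet<u64> = vec![42].into_iter().collect();
    // The signature was recorded two banks ago, so the count is 2.
    assert_eq!(
        blocks_since_confirmation(&[newest, parent, grandparent], 42),
        Some(2)
    );
}
```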