Submit leader's vote after observing 2/3 validator votes (#780)

* fixup!

* fixups!

* send the vote and count it

* actually vote

* test

* Spelling fixes

* Process the voting transaction in the leader's bank

* Send tokens to the leader

* Give leader tokens in more cases

* Test for write_stage::leader_vote

* Request airdrop inside fullnode and not the script

* Change readme to indicate that drone should be up before leader

And start drone before leader in snap scripts

* Rename _kp => _keypair for keypairs and other review fixups

* Remove empty else
* tweak test_leader_vote numbers to be closer to testing 2/3 boundary
* combine creating blob and transaction for leader/validator
This commit is contained in:
sakridge 2018-07-31 22:07:53 -07:00 committed by anatoly yakovenko
parent 7c5172a65e
commit 2ea6f86199
21 changed files with 463 additions and 229 deletions

View File

@ -71,6 +71,20 @@ These files can be generated by running the following script.
$ ./multinode-demo/setup.sh
```
Drone
---
In order for the leader, client and validators to work, we'll need to
spin up a drone to give out some test tokens. The drone delivers Milton
Friedman-style "air drops" (free tokens to requesting clients) to be used in
test transactions.
Start the drone on the leader node with:
```bash
$ ./multinode-demo/drone.sh
```
Singlenode Testnet
---
@ -85,22 +99,8 @@ $ ./multinode-demo/leader.sh
```
Wait a few seconds for the server to initialize. It will print "Ready." when it's ready to
receive transactions.
Drone
---
In order for the below test client and validators to work, we'll also need to
spin up a drone to give out some test tokens. The drone delivers Milton
Friedman-style "air drops" (free tokens to requesting clients) to be used in
test transactions.
Start the drone on the leader node with:
```bash
$ ./multinode-demo/drone.sh
```
receive transactions. The leader will request some tokens from the drone if it doesn't have any.
The drone does not need to be running for subsequent leader starts.
Multinode Testnet
---

View File

@ -10,5 +10,5 @@ PATH="$HOME"/.cargo/bin:"$PATH"
# Run setup
USE_INSTALL=1 ./multinode-demo/setup.sh -p
USE_INSTALL=1 SOLANA_CUDA=1 ./multinode-demo/leader.sh >leader.log 2>&1 &
USE_INSTALL=1 ./multinode-demo/drone.sh >drone.log 2>&1 &
USE_INSTALL=1 SOLANA_CUDA=1 ./multinode-demo/leader.sh >leader.log 2>&1 &

View File

@ -76,14 +76,6 @@ $rsync -vPrz --max-size=100M "$rsync_leader_url"/config/ "$SOLANA_LEADER_CONFIG_
exit 1
}
# Ensure the validator has at least 1 token before connecting to the network
# TODO: Remove this workaround
while ! $solana_wallet \
-l "$SOLANA_LEADER_CONFIG_DIR"/leader.json \
-k "$SOLANA_CONFIG_PRIVATE_DIR"/validator-id.json airdrop --tokens 1; do
sleep 1
done
trap 'kill "$pid" && wait "$pid"' INT TERM
$program \
--identity "$SOLANA_CONFIG_DIR"/validator.json \

View File

@ -16,8 +16,8 @@ num_tokens="$(snapctl get num-tokens)"
case $mode in
leader+drone)
$SNAP/bin/setup.sh ${num_tokens:+-n $num_tokens} ${ip_address_arg} -t leader
snapctl start --enable solana.daemon-leader
snapctl start --enable solana.daemon-drone
snapctl start --enable solana.daemon-leader
;;
leader)
$SNAP/bin/setup.sh ${num_tokens:+-n $num_tokens} ${ip_address_arg} -t leader

View File

@ -176,6 +176,14 @@ impl Bank {
}
Err(BankError::LastIdNotFound(*last_id))
}
/// Look through the last_ids and find all the valid ids
/// This is batched to avoid holding the lock for a significant amount of time
pub fn count_valid_ids(&self, ids: &[Hash]) -> usize {
let last_ids = self.last_ids_sigs.read().unwrap();
ids.iter()
.map(|id| last_ids.get(id).is_some() as usize)
.sum()
}
/// Tell the bank which Entry IDs exist on the ledger. This function
/// assumes subsequent calls correspond to later entries, and will boot
@ -737,7 +745,21 @@ mod tests {
Err(BankError::LastIdNotFound(mint.last_id()))
);
}
#[test]
fn test_count_valid_ids() {
let mint = Mint::new(1);
let bank = Bank::new(&mint);
let ids: Vec<_> = (0..MAX_ENTRY_IDS)
.map(|i| {
let last_id = hash(&serialize(&i).unwrap()); // Unique hash
bank.register_entry_id(&last_id);
last_id
})
.collect();
assert_eq!(bank.count_valid_ids(&[]), 0);
assert_eq!(bank.count_valid_ids(&[mint.last_id()]), 0);
assert_eq!(bank.count_valid_ids(&ids), ids.len());
}
#[test]
fn test_debits_before_credits() {
let mint = Mint::new(2);

View File

@ -5,12 +5,12 @@ extern crate rayon;
extern crate serde_json;
extern crate solana;
use bincode::serialize;
use clap::{App, Arg};
use influx_db_client as influxdb;
use rayon::prelude::*;
use solana::client::mk_client;
use solana::crdt::{Crdt, NodeInfo};
use solana::drone::{DroneRequest, DRONE_PORT};
use solana::drone::DRONE_PORT;
use solana::fullnode::Config;
use solana::hash::Hash;
use solana::logger;
@ -23,11 +23,10 @@ use solana::streamer::default_window;
use solana::thin_client::ThinClient;
use solana::timing::{duration_as_ms, duration_as_s};
use solana::transaction::Transaction;
use solana::wallet::request_airdrop;
use std::collections::VecDeque;
use std::error;
use std::fs::File;
use std::io::Write;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream, UdpSocket};
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
use std::process::exit;
use std::sync::atomic::{AtomicBool, AtomicIsize, Ordering};
use std::sync::{Arc, RwLock};
@ -244,7 +243,7 @@ fn airdrop_tokens(client: &mut ThinClient, leader: &NodeInfo, id: &KeyPair, tx_c
);
let previous_balance = starting_balance;
request_airdrop(&drone_addr, &id, airdrop_amount as u64).unwrap();
request_airdrop(&drone_addr, &id.pubkey(), airdrop_amount as u64).unwrap();
// TODO: return airdrop Result from Drone instead of polling the
// network
@ -590,22 +589,6 @@ fn main() {
}
}
fn mk_client(r: &NodeInfo) -> ThinClient {
let requests_socket = udp_random_bind(8000, 10000, 5).unwrap();
let transactions_socket = udp_random_bind(8000, 10000, 5).unwrap();
requests_socket
.set_read_timeout(Some(Duration::new(1, 0)))
.unwrap();
ThinClient::new(
r.contact_info.rpu,
requests_socket,
r.contact_info.tpu,
transactions_socket,
)
}
fn spy_node(addr: Option<String>) -> (NodeInfo, UdpSocket) {
let gossip_socket_pair;
if let Some(a) = addr {
@ -693,19 +676,3 @@ fn read_leader(path: &str) -> Config {
let file = File::open(path).unwrap_or_else(|_| panic!("file not found: {}", path));
serde_json::from_reader(file).unwrap_or_else(|_| panic!("failed to parse {}", path))
}
fn request_airdrop(
drone_addr: &SocketAddr,
id: &KeyPair,
tokens: u64,
) -> Result<(), Box<error::Error>> {
let mut stream = TcpStream::connect(drone_addr)?;
let req = DroneRequest::GetAirdrop {
airdrop_request_amount: tokens,
client_public_key: id.pubkey(),
};
let tx = serialize(&req).expect("serialize drone request");
stream.write_all(&tx).unwrap();
// TODO: add timeout to this function, in case of unresponsive drone
Ok(())
}

View File

@ -5,12 +5,15 @@ extern crate serde_json;
extern crate solana;
use clap::{App, Arg};
use solana::client::mk_client;
use solana::crdt::{NodeInfo, TestNode};
use solana::drone::DRONE_PORT;
use solana::fullnode::{Config, FullNode, LedgerFile};
use solana::logger;
use solana::metrics::set_panic_hook;
use solana::service::Service;
use solana::signature::{KeyPair, KeyPairUtil};
use solana::wallet::request_airdrop;
use std::fs::File;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::process::exit;
@ -65,6 +68,10 @@ fn main() -> () {
exit(1);
}
}
let leader_pubkey = keypair.pubkey();
let repl_clone = repl_data.clone();
let ledger = if let Some(l) = matches.value_of("ledger") {
LedgerFile::Path(l.to_string())
} else {
@ -76,11 +83,32 @@ fn main() -> () {
let testnet_address_string = t.to_string();
let testnet_addr = testnet_address_string.parse().unwrap();
FullNode::new(node, false, ledger, Some(keypair), Some(testnet_addr))
FullNode::new(node, false, ledger, keypair, Some(testnet_addr))
} else {
node.data.leader_id = node.data.id;
FullNode::new(node, true, ledger, None, None)
FullNode::new(node, true, ledger, keypair, None)
};
let mut client = mk_client(&repl_clone);
let previous_balance = client.poll_get_balance(&leader_pubkey).unwrap();
eprintln!("balance is {}", previous_balance);
if previous_balance == 0 {
let mut drone_addr = repl_clone.contact_info.tpu;
drone_addr.set_port(DRONE_PORT);
request_airdrop(&drone_addr, &leader_pubkey, 50).unwrap_or_else(|_| {
panic!(
"Airdrop failed, is the drone address correct {:?} drone running?",
drone_addr
)
});
let balance = client.poll_get_balance(&leader_pubkey).unwrap();
eprintln!("new balance is {}", balance);
assert!(balance > 0);
}
fullnode.join().expect("join");
}

View File

@ -6,20 +6,19 @@ extern crate dirs;
extern crate serde_json;
extern crate solana;
use bincode::serialize;
use clap::{App, Arg, SubCommand};
use solana::client::mk_client;
use solana::crdt::NodeInfo;
use solana::drone::{DroneRequest, DRONE_PORT};
use solana::drone::DRONE_PORT;
use solana::fullnode::Config;
use solana::logger;
use solana::signature::{read_keypair, KeyPair, KeyPairUtil, PublicKey, Signature};
use solana::thin_client::ThinClient;
use solana::wallet::request_airdrop;
use std::error;
use std::fmt;
use std::fs::File;
use std::io;
use std::io::prelude::*;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream, UdpSocket};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::thread::sleep;
use std::time::Duration;
@ -255,7 +254,7 @@ fn process_command(
tokens, config.drone_addr
);
let previous_balance = client.poll_get_balance(&config.id.pubkey())?;
request_airdrop(&config.drone_addr, &config.id, tokens as u64)?;
request_airdrop(&config.drone_addr, &config.id.pubkey(), tokens as u64)?;
// TODO: return airdrop Result from Drone instead of polling the
// network
@ -318,40 +317,9 @@ fn read_leader(path: &str) -> Result<Config, WalletError> {
})
}
fn mk_client(r: &NodeInfo) -> io::Result<ThinClient> {
let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
requests_socket
.set_read_timeout(Some(Duration::new(1, 0)))
.unwrap();
Ok(ThinClient::new(
r.contact_info.rpu,
requests_socket,
r.contact_info.tpu,
transactions_socket,
))
}
fn request_airdrop(
drone_addr: &SocketAddr,
id: &KeyPair,
tokens: u64,
) -> Result<(), Box<error::Error>> {
let mut stream = TcpStream::connect(drone_addr)?;
let req = DroneRequest::GetAirdrop {
airdrop_request_amount: tokens,
client_public_key: id.pubkey(),
};
let tx = serialize(&req).expect("serialize drone request");
stream.write_all(&tx).unwrap();
// TODO: add timeout to this function, in case of unresponsive drone
Ok(())
}
fn main() -> Result<(), Box<error::Error>> {
logger::setup();
let config = parse_args()?;
let mut client = mk_client(&config.leader)?;
let mut client = mk_client(&config.leader);
process_command(&config, &mut client)
}

20
src/client.rs Normal file
View File

@ -0,0 +1,20 @@
use crdt::NodeInfo;
use nat::udp_random_bind;
use std::time::Duration;
use thin_client::ThinClient;
pub fn mk_client(r: &NodeInfo) -> ThinClient {
let requests_socket = udp_random_bind(8000, 10000, 5).unwrap();
let transactions_socket = udp_random_bind(8000, 10000, 5).unwrap();
requests_socket
.set_read_timeout(Some(Duration::new(1, 0)))
.unwrap();
ThinClient::new(
r.contact_info.rpu,
requests_socket,
r.contact_info.tpu,
transactions_socket,
)
}

View File

@ -132,7 +132,7 @@ pub struct NodeInfo {
/// current leader identity
pub leader_id: PublicKey,
/// information about the state of the ledger
ledger_state: LedgerState,
pub ledger_state: LedgerState,
}
fn make_debug_id(buf: &[u8]) -> u64 {
@ -360,7 +360,7 @@ impl Crdt {
warn!(
"{:x}: VOTE for unknown id: {:x}",
self.debug_id(),
make_debug_id(&pubkey)
make_debug_id(pubkey)
);
return;
}
@ -374,7 +374,15 @@ impl Crdt {
);
return;
}
self.update_leader_liveness();
if *pubkey == self.my_data().leader_id {
info!(
"{:x}: LEADER_VOTED! {:x}",
self.debug_id(),
make_debug_id(&pubkey)
);
inc_new_counter!("crdt-insert_vote-leader_voted", 1);
}
if v.version <= self.table[pubkey].version {
debug!(
"{:x}: VOTE for old version: {:x}",
@ -387,23 +395,16 @@ impl Crdt {
let mut data = self.table[pubkey].clone();
data.version = v.version;
data.ledger_state.last_id = last_id;
debug!(
"{:x}: INSERTING VOTE! for {:x}",
self.debug_id(),
data.debug_id()
);
self.update_liveness(data.id);
self.insert(&data);
}
}
fn update_leader_liveness(&mut self) {
//TODO: (leaders should vote)
//until then we pet their liveness every time we see some votes from anyone
let ld = self.leader_data().map(|x| x.id);
trace!("leader_id {:?}", ld);
if let Some(leader_id) = ld {
self.update_liveness(leader_id);
}
}
pub fn insert_votes(&mut self, votes: &[(PublicKey, Vote, Hash)]) {
inc_new_counter!("crdt-vote-count", votes.len());
if !votes.is_empty() {
@ -1165,7 +1166,7 @@ impl Crdt {
return;
}
if e.is_err() {
info!(
debug!(
"{:x}: run_listen timeout, table size: {}",
debug_id,
obj.read().unwrap().table.len()
@ -1465,7 +1466,7 @@ mod tests {
crdt.set_leader(leader.id);
assert_eq!(crdt.table[&d.id].version, 1);
let v = Vote {
version: 2, //version shoud increase when we vote
version: 2, //version should increase when we vote
contact_info_version: 0,
};
let expected = (v, crdt.table[&leader.id].contact_info.tpu);
@ -1501,34 +1502,6 @@ mod tests {
//should be accepted, since the update is for the same address field as the one we know
assert_eq!(crdt.table[&d.id].version, 1);
}
#[test]
fn test_insert_vote_leader_liveness() {
logger::setup();
// TODO: remove this test once leaders vote
let d = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
assert_eq!(d.version, 0);
let mut crdt = Crdt::new(d.clone()).unwrap();
let leader = NodeInfo::new_leader(&"127.0.0.2:1235".parse().unwrap());
assert_ne!(d.id, leader.id);
crdt.insert(&leader);
crdt.set_leader(leader.id);
let live: u64 = crdt.alive[&leader.id];
trace!("{:x} live {}", leader.debug_id(), live);
let vote_new_version_old_addrs = Vote {
version: d.version + 1,
contact_info_version: 0,
};
sleep(Duration::from_millis(100));
let votes = vec![(d.id.clone(), vote_new_version_old_addrs, Hash::default())];
crdt.insert_votes(&votes);
let updated = crdt.alive[&leader.id];
//should be accepted, since the update is for the same address field as the one we know
assert_eq!(crdt.table[&d.id].version, 1);
trace!("{:x} {} {}", leader.debug_id(), updated, live);
assert!(updated > live);
}
fn sorted(ls: &Vec<NodeInfo>) -> Vec<NodeInfo> {
let mut copy: Vec<_> = ls.iter().cloned().collect();
copy.sort_by(|x, y| x.id.cmp(&y.id));

View File

@ -266,7 +266,8 @@ mod tests {
const TPS_BATCH: i64 = 5_000_000;
logger::setup();
let leader = TestNode::new_localhost();
let leader_keypair = KeyPair::new();
let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey());
let alice = Mint::new(10_000_000);
let bank = Bank::new(&alice);
@ -276,6 +277,7 @@ mod tests {
let leader_data = leader.data.clone();
let server = FullNode::new_leader(
leader_keypair,
bank,
0,
None,

View File

@ -62,7 +62,7 @@ impl FullNode {
mut node: TestNode,
leader: bool,
ledger: LedgerFile,
keypair_for_validator: Option<KeyPair>,
keypair: KeyPair,
network_entry_for_validator: Option<SocketAddr>,
sigverify_disabled: bool,
) -> FullNode {
@ -104,7 +104,6 @@ impl FullNode {
let testnet_addr = network_entry_for_validator.expect("validator requires entry");
let network_entry_point = NodeInfo::new_entry_point(testnet_addr);
let keypair = keypair_for_validator.expect("validator requires keypair");
let server = FullNode::new_validator(
keypair,
bank,
@ -124,6 +123,7 @@ impl FullNode {
node.data.leader_id = node.data.id;
let server = FullNode::new_leader(
keypair,
bank,
entry_height,
Some(ledger_tail),
@ -221,6 +221,7 @@ impl FullNode {
/// `---------------------`
/// ```
pub fn new_leader<W: Write + Send + 'static>(
keypair: KeyPair,
bank: Bank,
entry_height: u64,
ledger_tail: Option<Vec<Entry>>,
@ -245,6 +246,7 @@ impl FullNode {
let crdt = Arc::new(RwLock::new(Crdt::new(node.data).expect("Crdt::new")));
let (tpu, blob_receiver) = Tpu::new(
keypair,
&bank,
&crdt,
tick_duration,

View File

@ -14,6 +14,7 @@ pub mod banking_stage;
pub mod blob_fetch_stage;
pub mod budget;
pub mod choose_gossip_peer_strategy;
pub mod client;
pub mod crdt;
pub mod drone;
pub mod entry;
@ -51,6 +52,7 @@ pub mod transaction;
pub mod tvu;
pub mod vote_stage;
pub mod voting;
pub mod wallet;
pub mod window_stage;
pub mod write_stage;
extern crate bincode;

View File

@ -297,7 +297,8 @@ mod tests {
#[test]
fn test_thin_client() {
logger::setup();
let leader = TestNode::new_localhost();
let leader_keypair = KeyPair::new();
let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader_data = leader.data.clone();
let alice = Mint::new(10_000);
@ -306,6 +307,7 @@ mod tests {
let exit = Arc::new(AtomicBool::new(false));
let server = FullNode::new_leader(
leader_keypair,
bank,
0,
None,
@ -342,7 +344,8 @@ mod tests {
#[ignore]
fn test_bad_sig() {
logger::setup();
let leader = TestNode::new_localhost();
let leader_keypair = KeyPair::new();
let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey());
let alice = Mint::new(10_000);
let bank = Bank::new(&alice);
let bob_pubkey = KeyPair::new().pubkey();
@ -350,6 +353,7 @@ mod tests {
let leader_data = leader.data.clone();
let server = FullNode::new_leader(
leader_keypair,
bank,
0,
None,
@ -398,13 +402,15 @@ mod tests {
#[test]
fn test_client_check_signature() {
logger::setup();
let leader = TestNode::new_localhost();
let leader_keypair = KeyPair::new();
let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey());
let alice = Mint::new(10_000);
let bank = Bank::new(&alice);
let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let leader_data = leader.data.clone();
let server = FullNode::new_leader(
leader_keypair,
bank,
0,
None,

View File

@ -32,6 +32,7 @@ use fetch_stage::FetchStage;
use packet::{BlobRecycler, PacketRecycler};
use record_stage::RecordStage;
use service::Service;
use signature::KeyPair;
use sigverify_stage::SigVerifyStage;
use std::io::Write;
use std::net::UdpSocket;
@ -52,6 +53,7 @@ pub struct Tpu {
impl Tpu {
pub fn new<W: Write + Send + 'static>(
keypair: KeyPair,
bank: &Arc<Bank>,
crdt: &Arc<RwLock<Crdt>>,
tick_duration: Option<Duration>,
@ -80,6 +82,7 @@ impl Tpu {
};
let (write_stage, blob_receiver) = WriteStage::new(
keypair,
bank.clone(),
crdt.clone(),
blob_recycler.clone(),

View File

@ -175,8 +175,8 @@ pub mod tests {
fn test_replicate() {
logger::setup();
let leader = TestNode::new_localhost();
let target1_kp = KeyPair::new();
let target1 = TestNode::new_localhost_with_pubkey(target1_kp.pubkey());
let target1_keypair = KeyPair::new();
let target1 = TestNode::new_localhost_with_pubkey(target1_keypair.pubkey());
let target2 = TestNode::new_localhost();
let exit = Arc::new(AtomicBool::new(false));
@ -230,7 +230,7 @@ pub mod tests {
let dr_1 = new_ncp(cref1.clone(), target1.sockets.gossip, exit.clone()).unwrap();
let tvu = Tvu::new(
target1_kp,
target1_keypair,
&bank,
0,
cref1,

View File

@ -4,8 +4,7 @@ use bank::Bank;
use bincode::serialize;
use counter::Counter;
use crdt::Crdt;
use hash::Hash;
use packet::BlobRecycler;
use packet::{BlobRecycler, SharedBlob};
use result::Result;
use service::Service;
use signature::KeyPair;
@ -15,14 +14,99 @@ use std::sync::{Arc, RwLock};
use std::thread::{self, sleep, spawn, JoinHandle};
use std::time::Duration;
use streamer::BlobSender;
use timing;
use transaction::Transaction;
const VOTE_TIMEOUT_MS: u64 = 1000;
pub const VOTE_TIMEOUT_MS: u64 = 1000;
pub struct VoteStage {
thread_hdl: JoinHandle<()>,
}
pub fn create_vote_tx_and_blob(
bank: &Arc<Bank>,
keypair: &KeyPair,
crdt: &Arc<RwLock<Crdt>>,
blob_recycler: &BlobRecycler,
) -> Result<(Transaction, SharedBlob)> {
let last_id = bank.last_id();
let shared_blob = blob_recycler.allocate();
let (vote, addr) = {
let mut wcrdt = crdt.write().unwrap();
//TODO: doesn't seem like there is a synchronous call to get height and id
info!("voting on {:?}", &last_id[..8]);
wcrdt.new_vote(last_id)
}?;
let tx = Transaction::new_vote(&keypair, vote, last_id, 0);
{
let mut blob = shared_blob.write().unwrap();
let bytes = serialize(&tx)?;
let len = bytes.len();
blob.data[..len].copy_from_slice(&bytes);
blob.meta.set_addr(&addr);
blob.meta.size = len;
}
Ok((tx, shared_blob))
}
pub fn send_leader_vote(
debug_id: u64,
keypair: &KeyPair,
bank: &Arc<Bank>,
crdt: &Arc<RwLock<Crdt>>,
blob_recycler: &BlobRecycler,
vote_blob_sender: &BlobSender,
last_vote: &mut u64,
) -> Result<()> {
let now = timing::timestamp();
if now - *last_vote > VOTE_TIMEOUT_MS {
//TODO(anatoly): vote if the last id set is mostly valid
let ids: Vec<_> = crdt.read()
.unwrap()
.table
.values()
.map(|x| x.ledger_state.last_id)
.collect();
let total = bank.count_valid_ids(&ids);
//TODO(anatoly): this isn't stake based voting
info!(
"{:x}: valid_ids {}/{} {}",
debug_id,
total,
ids.len(),
(2 * ids.len()) / 3
);
if total > (2 * ids.len()) / 3 {
*last_vote = now;
if let Ok((tx, shared_blob)) =
create_vote_tx_and_blob(bank, keypair, crdt, blob_recycler)
{
bank.process_transaction(&tx)?;
vote_blob_sender.send(VecDeque::from(vec![shared_blob]))?;
info!("{:x} leader_sent_vote", debug_id);
inc_new_counter!("write_stage-leader_sent_vote", 1);
}
}
}
Ok(())
}
fn send_validator_vote(
bank: &Arc<Bank>,
keypair: &Arc<KeyPair>,
crdt: &Arc<RwLock<Crdt>>,
blob_recycler: &BlobRecycler,
vote_blob_sender: &BlobSender,
) -> Result<()> {
if let Ok((_, shared_blob)) = create_vote_tx_and_blob(bank, keypair, crdt, blob_recycler) {
inc_new_counter!("replicate-vote_sent", 1);
vote_blob_sender.send(VecDeque::from(vec![shared_blob]))?;
}
Ok(())
}
impl VoteStage {
pub fn new(
keypair: Arc<KeyPair>,
@ -54,43 +138,14 @@ impl VoteStage {
exit: &Arc<AtomicBool>,
) {
while !exit.load(Ordering::Relaxed) {
let last_id = bank.last_id();
if let Err(err) = Self::vote(&last_id, keypair, crdt, blob_recycler, vote_blob_sender) {
if let Err(err) =
send_validator_vote(bank, keypair, crdt, blob_recycler, vote_blob_sender)
{
info!("Vote failed: {:?}", err);
}
sleep(Duration::from_millis(VOTE_TIMEOUT_MS));
}
}
fn vote(
last_id: &Hash,
keypair: &Arc<KeyPair>,
crdt: &Arc<RwLock<Crdt>>,
blob_recycler: &BlobRecycler,
vote_blob_sender: &BlobSender,
) -> Result<()> {
let shared_blob = blob_recycler.allocate();
let (vote, addr) = {
let mut wcrdt = crdt.write().unwrap();
//TODO: doesn't seem like there is a synchronous call to get height and id
info!("voting on {:?}", &last_id[..8]);
wcrdt.new_vote(*last_id)
}?;
{
let mut blob = shared_blob.write().unwrap();
let tx = Transaction::new_vote(&keypair, vote, *last_id, 0);
let bytes = serialize(&tx)?;
let len = bytes.len();
blob.data[..len].copy_from_slice(&bytes);
blob.meta.set_addr(&addr);
blob.meta.size = len;
}
inc_new_counter!("replicate-vote_sent", 1);
vote_blob_sender.send(VecDeque::from(vec![shared_blob]))?;
Ok(())
}
}
impl Service for VoteStage {
@ -108,7 +163,10 @@ impl Service for VoteStage {
pub mod tests {
use super::*;
use bank::Bank;
use crdt::{Crdt, TestNode};
use crdt::{Crdt, NodeInfo, TestNode};
use entry::next_entry;
use hash::Hash;
use logger;
use mint::Mint;
use packet::BlobRecycler;
use service::Service;
@ -116,6 +174,7 @@ pub mod tests {
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use transaction::{Transaction, Vote};
/// Ensure the VoteStage issues votes at the expected cadence
#[test]
@ -150,4 +209,111 @@ pub mod tests {
exit.store(true, Ordering::Relaxed);
vote_stage.join().expect("join");
}
#[test]
fn test_send_leader_vote() {
logger::setup();
// create a mint/bank
let mint = Mint::new(1000);
let bank = Arc::new(Bank::new(&mint));
let hash0 = Hash::default();
// get a non-default hash last_id
let entry = next_entry(&hash0, 1, vec![]);
bank.register_entry_id(&entry.id);
// Create a leader
let leader_data = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap());
let leader_pubkey = leader_data.id.clone();
let mut leader_crdt = Crdt::new(leader_data).unwrap();
// give the leader some tokens
let give_leader_tokens_tx =
Transaction::new(&mint.keypair(), leader_pubkey.clone(), 100, entry.id);
bank.process_transaction(&give_leader_tokens_tx).unwrap();
leader_crdt.set_leader(leader_pubkey);
// Insert 7 agreeing validators / 3 disagreeing
// and votes for new last_id
for i in 0..10 {
let mut validator =
NodeInfo::new_leader(&format!("127.0.0.1:234{}", i).parse().unwrap());
let vote = Vote {
version: validator.version + 1,
contact_info_version: 1,
};
if i < 7 {
validator.ledger_state.last_id = entry.id;
}
leader_crdt.insert(&validator);
trace!("validator id: {:?}", validator.id);
leader_crdt.insert_vote(&validator.id, &vote, entry.id);
}
let leader = Arc::new(RwLock::new(leader_crdt));
let blob_recycler = BlobRecycler::default();
let (vote_blob_sender, vote_blob_receiver) = channel();
let mut last_vote: u64 = timing::timestamp() - VOTE_TIMEOUT_MS - 1;
let res = send_leader_vote(
1234,
&mint.keypair(),
&bank,
&leader,
&blob_recycler,
&vote_blob_sender,
&mut last_vote,
);
trace!("vote result: {:?}", res);
assert!(res.is_ok());
let vote_blob = vote_blob_receiver.recv_timeout(Duration::from_millis(500));
trace!("vote_blob: {:?}", vote_blob);
// leader shouldn't vote yet, not enough votes
assert!(vote_blob.is_err());
// add two more nodes and see that it succeeds
for i in 0..2 {
let mut validator =
NodeInfo::new_leader(&format!("127.0.0.1:234{}", i).parse().unwrap());
let vote = Vote {
version: validator.version + 1,
contact_info_version: 1,
};
validator.ledger_state.last_id = entry.id;
leader.write().unwrap().insert(&validator);
trace!("validator id: {:?}", validator.id);
leader
.write()
.unwrap()
.insert_vote(&validator.id, &vote, entry.id);
}
last_vote = timing::timestamp() - VOTE_TIMEOUT_MS - 1;
let res = send_leader_vote(
2345,
&mint.keypair(),
&bank,
&leader,
&blob_recycler,
&vote_blob_sender,
&mut last_vote,
);
trace!("vote result: {:?}", res);
assert!(res.is_ok());
let vote_blob = vote_blob_receiver.recv_timeout(Duration::from_millis(500));
trace!("vote_blob: {:?}", vote_blob);
// leader should vote now
assert!(vote_blob.is_ok());
}
}

View File

@ -1,21 +1,18 @@
use entry::Entry;
use hash::Hash;
use signature::PublicKey;
use transaction::{Instruction, Vote};
use transaction::{Instruction, Transaction, Vote};
pub fn entries_to_votes(entries: &[Entry]) -> Vec<(PublicKey, Vote, Hash)> {
entries
.iter()
.flat_map(|entry| {
let vs: Vec<(PublicKey, Vote, Hash)> = entry
.transactions
.iter()
.filter_map(|tx| match tx.instruction {
Instruction::NewVote(ref vote) => Some((tx.from, vote.clone(), tx.last_id)),
_ => None,
})
.collect();
vs
})
.flat_map(|entry| entry.transactions.iter().filter_map(transaction_to_vote))
.collect()
}
pub fn transaction_to_vote(tx: &Transaction) -> Option<(PublicKey, Vote, Hash)> {
match tx.instruction {
Instruction::NewVote(ref vote) => Some((tx.from, vote.clone(), tx.last_id)),
_ => None,
}
}

22
src/wallet.rs Normal file
View File

@ -0,0 +1,22 @@
use bincode::serialize;
use drone::DroneRequest;
use signature::PublicKey;
use std::error;
use std::io::Write;
use std::net::{SocketAddr, TcpStream};
pub fn request_airdrop(
drone_addr: &SocketAddr,
id: &PublicKey,
tokens: u64,
) -> Result<(), Box<error::Error>> {
let mut stream = TcpStream::connect(drone_addr)?;
let req = DroneRequest::GetAirdrop {
airdrop_request_amount: tokens,
client_public_key: *id,
};
let tx = serialize(&req).expect("serialize drone request");
stream.write_all(&tx).unwrap();
// TODO: add timeout to this function, in case of unresponsive drone
Ok(())
}

View File

@ -11,18 +11,21 @@ use ledger::Block;
use packet::BlobRecycler;
use result::{Error, Result};
use service::Service;
use signature::KeyPair;
use std::collections::VecDeque;
use std::io::Write;
use std::net::UdpSocket;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
use streamer::{BlobReceiver, BlobSender};
use streamer::{responder, BlobReceiver, BlobSender};
use vote_stage::send_leader_vote;
use voting::entries_to_votes;
pub struct WriteStage {
thread_hdl: JoinHandle<()>,
thread_hdls: Vec<JoinHandle<()>>,
}
impl WriteStage {
@ -38,13 +41,18 @@ impl WriteStage {
let entries = entry_receiver.recv_timeout(Duration::new(1, 0))?;
let votes = entries_to_votes(&entries);
crdt.write().unwrap().insert_votes(&votes);
//TODO(anatoly): real stake based voting needs to change this
//leader simply votes if the current set of validators have voted
//on a valid last id
entry_writer.write_and_register_entries(&entries)?;
trace!("New blobs? {}", entries.len());
let mut blobs = VecDeque::new();
entries.to_blobs(blob_recycler, &mut blobs);
if !blobs.is_empty() {
inc_new_counter!("write_stage-broadcast_vote-count", votes.len());
inc_new_counter!("write_stage-broadcast_blobs-count", blobs.len());
inc_new_counter!("write_stage-recv_vote", votes.len());
inc_new_counter!("write_stage-broadcast_blobs", blobs.len());
trace!("broadcasting {}", blobs.len());
blob_sender.send(blobs)?;
}
@ -53,17 +61,28 @@ impl WriteStage {
/// Create a new WriteStage for writing and broadcasting entries.
pub fn new<W: Write + Send + 'static>(
keypair: KeyPair,
bank: Arc<Bank>,
crdt: Arc<RwLock<Crdt>>,
blob_recycler: BlobRecycler,
writer: W,
entry_receiver: Receiver<Vec<Entry>>,
) -> (Self, BlobReceiver) {
let (vote_blob_sender, vote_blob_receiver) = channel();
let send = UdpSocket::bind("0.0.0.0:0").expect("bind");
let t_responder = responder(
"write_stage_vote_sender",
send,
blob_recycler.clone(),
vote_blob_receiver,
);
let (blob_sender, blob_receiver) = channel();
let thread_hdl = Builder::new()
.name("solana-writer".to_string())
.spawn(move || {
let mut entry_writer = EntryWriter::new(&bank, writer);
let mut last_vote = 0;
let debug_id = crdt.read().unwrap().debug_id();
loop {
if let Err(e) = Self::write_and_send_entries(
&crdt,
@ -76,25 +95,40 @@ impl WriteStage {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => {
inc_new_counter!("write_stage-error", 1);
inc_new_counter!("write_stage-write_and_send_entries-error", 1);
error!("{:?}", e);
}
}
};
if let Err(e) = send_leader_vote(
debug_id,
&keypair,
&bank,
&crdt,
&blob_recycler,
&vote_blob_sender,
&mut last_vote,
) {
inc_new_counter!("write_stage-leader_vote-error", 1);
error!("{:?}", e);
}
}
})
.unwrap();
(WriteStage { thread_hdl }, blob_receiver)
let thread_hdls = vec![t_responder, thread_hdl];
(WriteStage { thread_hdls }, blob_receiver)
}
}
impl Service for WriteStage {
fn thread_hdls(self) -> Vec<JoinHandle<()>> {
vec![self.thread_hdl]
self.thread_hdls
}
fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
for thread_hdl in self.thread_hdls() {
thread_hdl.join()?;
}
Ok(())
}
}

View File

@ -91,7 +91,9 @@ fn test_multi_node_validator_catchup_from_zero() {
logger::setup();
const N: usize = 5;
trace!("test_multi_node_validator_catchup_from_zero");
let leader = TestNode::new_localhost();
let leader_keypair = KeyPair::new();
let leader_pubkey = leader_keypair.pubkey().clone();
let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader_data = leader.data.clone();
let bob_pubkey = KeyPair::new().pubkey();
@ -100,9 +102,15 @@ fn test_multi_node_validator_catchup_from_zero() {
leader,
true,
LedgerFile::Path(ledger_path.clone()),
None,
leader_keypair,
None,
);
// Send leader some tokens to vote
let leader_balance =
send_tx_and_retry_get_balance(&leader_data, &alice, &leader_pubkey, None).unwrap();
info!("leader balance {}", leader_balance);
let mut nodes = vec![server];
for _ in 0..N {
let keypair = KeyPair::new();
@ -111,7 +119,7 @@ fn test_multi_node_validator_catchup_from_zero() {
validator,
false,
LedgerFile::Path(ledger_path.clone()),
Some(keypair),
keypair,
Some(leader_data.contact_info.ncp),
);
nodes.push(val);
@ -145,7 +153,7 @@ fn test_multi_node_validator_catchup_from_zero() {
validator,
false,
LedgerFile::Path(ledger_path.clone()),
Some(keypair),
keypair,
Some(leader_data.contact_info.ncp),
);
nodes.push(val);
@ -191,7 +199,9 @@ fn test_multi_node_basic() {
logger::setup();
const N: usize = 5;
trace!("test_multi_node_basic");
let leader = TestNode::new_localhost();
let leader_keypair = KeyPair::new();
let leader_pubkey = leader_keypair.pubkey().clone();
let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader_data = leader.data.clone();
let bob_pubkey = KeyPair::new().pubkey();
let (alice, ledger_path) = genesis(10_000);
@ -199,9 +209,15 @@ fn test_multi_node_basic() {
leader,
true,
LedgerFile::Path(ledger_path.clone()),
None,
leader_keypair,
None,
);
// Send leader some tokens to vote
let leader_balance =
send_tx_and_retry_get_balance(&leader_data, &alice, &leader_pubkey, None).unwrap();
info!("leader balance {}", leader_balance);
let mut nodes = vec![server];
for _ in 0..N {
let keypair = KeyPair::new();
@ -210,7 +226,7 @@ fn test_multi_node_basic() {
validator,
false,
LedgerFile::Path(ledger_path.clone()),
Some(keypair),
keypair,
Some(leader_data.contact_info.ncp),
);
nodes.push(val);
@ -244,7 +260,8 @@ fn test_multi_node_basic() {
#[test]
fn test_boot_validator_from_file() {
logger::setup();
let leader = TestNode::new_localhost();
let leader_keypair = KeyPair::new();
let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey());
let bob_pubkey = KeyPair::new().pubkey();
let (alice, ledger_path) = genesis(100_000);
let leader_data = leader.data.clone();
@ -252,7 +269,7 @@ fn test_boot_validator_from_file() {
leader,
true,
LedgerFile::Path(ledger_path.clone()),
None,
leader_keypair,
None,
);
let leader_balance =
@ -269,7 +286,7 @@ fn test_boot_validator_from_file() {
validator,
false,
LedgerFile::Path(ledger_path.clone()),
Some(keypair),
keypair,
Some(leader_data.contact_info.ncp),
);
let mut client = mk_client(&validator_data);
@ -282,13 +299,14 @@ fn test_boot_validator_from_file() {
}
fn create_leader(ledger_path: &str) -> (NodeInfo, FullNode) {
let leader = TestNode::new_localhost();
let leader_keypair = KeyPair::new();
let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader_data = leader.data.clone();
let leader_fullnode = FullNode::new(
leader,
true,
LedgerFile::Path(ledger_path.to_string()),
None,
leader_keypair,
None,
);
(leader_data, leader_fullnode)
@ -339,7 +357,7 @@ fn test_leader_restart_validator_start_from_old_ledger() {
validator,
false,
LedgerFile::Path(stale_ledger_path.clone()),
Some(keypair),
keypair,
Some(leader_data.contact_info.ncp),
);
@ -387,7 +405,9 @@ fn test_multi_node_dynamic_network() {
Err(_) => std::usize::MAX,
};
let leader = TestNode::new_localhost();
let leader_keypair = KeyPair::new();
let leader_pubkey = leader_keypair.pubkey().clone();
let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey());
let bob_pubkey = KeyPair::new().pubkey();
let (alice, ledger_path) = genesis(10_000_000);
let alice_arc = Arc::new(RwLock::new(alice));
@ -396,9 +416,19 @@ fn test_multi_node_dynamic_network() {
leader,
true,
LedgerFile::Path(ledger_path.clone()),
None,
leader_keypair,
None,
);
// Send leader some tokens to vote
let leader_balance = send_tx_and_retry_get_balance(
&leader_data,
&alice_arc.read().unwrap(),
&leader_pubkey,
None,
).unwrap();
info!("leader balance {}", leader_balance);
info!("{:x} LEADER", leader_data.debug_id());
let leader_balance = retry_send_tx_and_retry_get_balance(
&leader_data,
@ -464,7 +494,7 @@ fn test_multi_node_dynamic_network() {
validator,
false,
LedgerFile::Path(ledger_path.clone()),
Some(keypair),
keypair,
Some(leader_data.contact_info.ncp),
);
(rd, val)