Placeholder storage contract and replicator client (#1286)

* Add hooks for executing the storage contract

* Add store_ledger stage
  Similar to replicate_stage but no voting/banking stuff, just convert
  blobs to entries and write the ledger out

* Add storage_addr to tests and add new NodeInfo constructor
  to reduce duplication...
This commit is contained in:
sakridge 2018-09-21 15:32:15 -07:00 committed by GitHub
parent 3dcee9f79e
commit a9355c33b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 436 additions and 68 deletions

View File

@ -33,6 +33,10 @@ path = "src/bin/bench-tps.rs"
name = "solana-drone"
path = "src/bin/drone.rs"
[[bin]]
name = "solana-replicator"
path = "src/bin/replicator.rs"
[[bin]]
name = "solana-fullnode"
path = "src/bin/fullnode.rs"

View File

@ -21,6 +21,7 @@ use std::result;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::RwLock;
use std::time::Instant;
use storage_program::StorageProgram;
use system_program::SystemProgram;
use timing::{duration_as_us, timestamp};
use transaction::Transaction;
@ -359,6 +360,8 @@ impl Bank {
// TODO: the runtime should be checking read/write access to memory
// we are trusting the hard coded contracts not to clobber or allocate
BudgetState::process_transaction(&tx, accounts)
} else if StorageProgram::check_id(&tx.program_id) {
StorageProgram::process_transaction(&tx, accounts)
} else {
return Err(BankError::UnknownContractId(tx.program_id));
}

89
src/bin/replicator.rs Normal file
View File

@ -0,0 +1,89 @@
#[macro_use]
extern crate clap;
extern crate getopts;
extern crate serde_json;
#[macro_use]
extern crate solana;
use clap::{App, Arg};
use solana::crdt::Node;
use solana::fullnode::Config;
use solana::logger;
use solana::replicator::Replicator;
use solana::signature::{Keypair, KeypairUtil};
use std::fs::File;
use std::net::{Ipv4Addr, SocketAddr};
use std::process::exit;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
fn main() {
logger::setup();
let matches = App::new("replicator")
.version(crate_version!())
.arg(
Arg::with_name("identity")
.short("i")
.long("identity")
.value_name("PATH")
.takes_value(true)
.help("Run with the identity found in FILE"),
).arg(
Arg::with_name("network")
.short("n")
.long("network")
.value_name("HOST:PORT")
.takes_value(true)
.help("Rendezvous with the network at this gossip entry point"),
).arg(
Arg::with_name("ledger")
.short("l")
.long("ledger")
.value_name("DIR")
.takes_value(true)
.required(true)
.help("use DIR as persistent ledger location"),
).get_matches();
let ledger_path = matches.value_of("ledger");
let (keypair, ncp) = if let Some(i) = matches.value_of("identity") {
let path = i.to_string();
if let Ok(file) = File::open(path.clone()) {
let parse: serde_json::Result<Config> = serde_json::from_reader(file);
if let Ok(data) = parse {
(data.keypair(), data.node_info.contact_info.ncp)
} else {
eprintln!("failed to parse {}", path);
exit(1);
}
} else {
eprintln!("failed to read {}", path);
exit(1);
}
} else {
(Keypair::new(), socketaddr!([127, 0, 0, 1], 8700))
};
let node = Node::new_with_external_ip(keypair.pubkey(), &ncp);
println!(
"replicating the data with keypair: {:?} ncp:{:?}",
keypair.pubkey(),
ncp
);
println!("my node: {:?}", node);
let exit = Arc::new(AtomicBool::new(false));
let network_addr = matches
.value_of("network")
.map(|network| network.parse().expect("failed to parse network address"));
// TODO: ask network what slice we should store
let entry_height = 0;
let replicator = Replicator::new(entry_height, &exit, ledger_path, node, network_addr);
replicator.join();
}

View File

@ -82,6 +82,8 @@ pub struct ContactInfo {
pub rpu: SocketAddr,
/// transactions address
pub tpu: SocketAddr,
/// storage data address
pub storage_addr: SocketAddr,
/// if this struture changes update this value as well
/// Always update `NodeInfo` version too
/// This separate version for addresses allows us to use the `Vote`
@ -116,6 +118,7 @@ impl NodeInfo {
tvu: SocketAddr,
rpu: SocketAddr,
tpu: SocketAddr,
storage_addr: SocketAddr,
) -> Self {
NodeInfo {
id,
@ -125,6 +128,7 @@ impl NodeInfo {
tvu,
rpu,
tpu,
storage_addr,
version: 0,
},
leader_id: Pubkey::default(),
@ -134,19 +138,30 @@ impl NodeInfo {
}
}
pub fn new_localhost(id: Pubkey) -> Self {
Self::new(
id,
socketaddr!("127.0.0.1:1234"),
socketaddr!("127.0.0.1:1235"),
socketaddr!("127.0.0.1:1236"),
socketaddr!("127.0.0.1:1237"),
socketaddr!("127.0.0.1:1238"),
)
}
#[cfg(test)]
/// NodeInfo with unspecified addresses for adversarial testing.
pub fn new_unspecified() -> Self {
let addr = socketaddr!(0, 0);
assert!(addr.ip().is_unspecified());
Self::new(Keypair::new().pubkey(), addr, addr, addr, addr)
Self::new(Keypair::new().pubkey(), addr, addr, addr, addr, addr)
}
#[cfg(test)]
/// NodeInfo with multicast addresses for adversarial testing.
pub fn new_multicast() -> Self {
let addr = socketaddr!("224.0.1.255:1000");
assert!(addr.ip().is_multicast());
Self::new(Keypair::new().pubkey(), addr, addr, addr, addr)
Self::new(Keypair::new().pubkey(), addr, addr, addr, addr, addr)
}
fn next_port(addr: &SocketAddr, nxt: u16) -> SocketAddr {
let mut nxt_addr = *addr;
@ -164,6 +179,7 @@ impl NodeInfo {
replicate_addr,
requests_addr,
transactions_addr,
"0.0.0.0:0".parse().unwrap(),
)
}
pub fn new_with_socketaddr(bind_addr: &SocketAddr) -> Self {
@ -173,7 +189,7 @@ impl NodeInfo {
//
pub fn new_entry_point(gossip_addr: &SocketAddr) -> Self {
let daddr: SocketAddr = socketaddr!("0.0.0.0:0");
NodeInfo::new(Pubkey::default(), *gossip_addr, daddr, daddr, daddr)
NodeInfo::new(Pubkey::default(), *gossip_addr, daddr, daddr, daddr, daddr)
}
}
@ -1238,11 +1254,12 @@ impl Crdt {
let pubkey = Keypair::new().pubkey();
let daddr = socketaddr_any!();
let node = NodeInfo::new(pubkey, daddr, daddr, daddr, daddr);
let node = NodeInfo::new(pubkey, daddr, daddr, daddr, daddr, daddr);
(node, gossip_socket)
}
}
#[derive(Debug)]
pub struct Sockets {
pub gossip: UdpSocket,
pub requests: UdpSocket,
@ -1254,6 +1271,7 @@ pub struct Sockets {
pub retransmit: UdpSocket,
}
#[derive(Debug)]
pub struct Node {
pub info: NodeInfo,
pub sockets: Sockets,
@ -1274,12 +1292,14 @@ impl Node {
let respond = UdpSocket::bind("0.0.0.0:0").unwrap();
let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap();
let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap();
let storage = UdpSocket::bind("0.0.0.0:0").unwrap();
let info = NodeInfo::new(
pubkey,
gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(),
requests.local_addr().unwrap(),
transaction.local_addr().unwrap(),
storage.local_addr().unwrap(),
);
Node {
info,
@ -1317,6 +1337,7 @@ impl Node {
let (_, repair) = bind();
let (_, broadcast) = bind();
let (_, retransmit) = bind();
let (storage_port, _) = bind();
// Responses are sent from the same Udp port as requests are received
// from, in hopes that a NAT sitting in the middle will route the
@ -1329,6 +1350,7 @@ impl Node {
SocketAddr::new(ncp.ip(), replicate_port),
SocketAddr::new(ncp.ip(), requests_port),
SocketAddr::new(ncp.ip(), transaction_port),
SocketAddr::new(ncp.ip(), storage_port),
);
trace!("new NodeInfo: {:?}", info);
@ -1380,13 +1402,7 @@ mod tests {
#[test]
fn insert_test() {
let mut d = NodeInfo::new(
Keypair::new().pubkey(),
socketaddr!("127.0.0.1:1234"),
socketaddr!("127.0.0.1:1235"),
socketaddr!("127.0.0.1:1236"),
socketaddr!("127.0.0.1:1237"),
);
let mut d = NodeInfo::new_localhost(Keypair::new().pubkey());
assert_eq!(d.version, 0);
let mut crdt = Crdt::new(d.clone()).unwrap();
assert_eq!(crdt.table[&d.id].version, 0);
@ -1486,27 +1502,9 @@ mod tests {
}
#[test]
fn update_test() {
let d1 = NodeInfo::new(
Keypair::new().pubkey(),
socketaddr!("127.0.0.1:1234"),
socketaddr!("127.0.0.1:1235"),
socketaddr!("127.0.0.1:1236"),
socketaddr!("127.0.0.1:1237"),
);
let d2 = NodeInfo::new(
Keypair::new().pubkey(),
socketaddr!("127.0.0.1:1234"),
socketaddr!("127.0.0.1:1235"),
socketaddr!("127.0.0.1:1236"),
socketaddr!("127.0.0.1:1237"),
);
let d3 = NodeInfo::new(
Keypair::new().pubkey(),
socketaddr!("127.0.0.1:1234"),
socketaddr!("127.0.0.1:1235"),
socketaddr!("127.0.0.1:1236"),
socketaddr!("127.0.0.1:1237"),
);
let d1 = NodeInfo::new_localhost(Keypair::new().pubkey());
let d2 = NodeInfo::new_localhost(Keypair::new().pubkey());
let d3 = NodeInfo::new_localhost(Keypair::new().pubkey());
let mut crdt = Crdt::new(d1.clone()).expect("Crdt::new");
let (key, ix, ups) = crdt.get_updates_since(0);
assert_eq!(key, d1.id);
@ -1542,13 +1540,7 @@ mod tests {
}
#[test]
fn window_index_request() {
let me = NodeInfo::new(
Keypair::new().pubkey(),
socketaddr!([127, 0, 0, 1], 1234),
socketaddr!([127, 0, 0, 1], 1235),
socketaddr!([127, 0, 0, 1], 1236),
socketaddr!([127, 0, 0, 1], 1237),
);
let me = NodeInfo::new_localhost(Keypair::new().pubkey());
let mut crdt = Crdt::new(me).expect("Crdt::new");
let rv = crdt.window_index_request(0);
assert_matches!(rv, Err(Error::CrdtError(CrdtError::NoPeers)));
@ -1560,6 +1552,7 @@ mod tests {
socketaddr!([127, 0, 0, 1], 1235),
socketaddr!([127, 0, 0, 1], 1236),
socketaddr!([127, 0, 0, 1], 1237),
socketaddr!([127, 0, 0, 1], 1238),
);
crdt.insert(&nxt);
let rv = crdt.window_index_request(0).unwrap();
@ -1573,6 +1566,7 @@ mod tests {
socketaddr!([127, 0, 0, 1], 1235),
socketaddr!([127, 0, 0, 1], 1236),
socketaddr!([127, 0, 0, 1], 1237),
socketaddr!([127, 0, 0, 1], 1238),
);
crdt.insert(&nxt);
let mut one = false;
@ -1598,6 +1592,7 @@ mod tests {
socketaddr!("127.0.0.1:127"),
socketaddr!("127.0.0.1:127"),
socketaddr!("127.0.0.1:127"),
socketaddr!("127.0.0.1:127"),
);
let mut crdt = Crdt::new(me).expect("Crdt::new");
@ -1616,23 +1611,11 @@ mod tests {
/// test that gossip requests are eventually generated for all nodes
#[test]
fn gossip_request() {
let me = NodeInfo::new(
Keypair::new().pubkey(),
socketaddr!("127.0.0.1:1234"),
socketaddr!("127.0.0.1:1235"),
socketaddr!("127.0.0.1:1236"),
socketaddr!("127.0.0.1:1237"),
);
let me = NodeInfo::new_localhost(Keypair::new().pubkey());
let mut crdt = Crdt::new(me.clone()).expect("Crdt::new");
let rv = crdt.gossip_request();
assert_matches!(rv, Err(Error::CrdtError(CrdtError::NoPeers)));
let nxt1 = NodeInfo::new(
Keypair::new().pubkey(),
socketaddr!("127.0.0.2:1234"),
socketaddr!("127.0.0.1:1235"),
socketaddr!("127.0.0.1:1236"),
socketaddr!("127.0.0.1:1237"),
);
let nxt1 = NodeInfo::new_localhost(Keypair::new().pubkey());
crdt.insert(&nxt1);
@ -1754,6 +1737,7 @@ mod tests {
socketaddr!("127.0.0.1:1235"),
socketaddr!("127.0.0.1:1236"),
socketaddr!("127.0.0.1:1237"),
socketaddr!("127.0.0.1:1238"),
);
let recycler = BlobRecycler::default();
let rv = Crdt::run_window_request(
@ -1922,6 +1906,7 @@ mod tests {
socketaddr_any!(),
socketaddr!("127.0.0.1:1236"),
socketaddr_any!(),
socketaddr_any!(),
);
leader3.ledger_state.last_id = hash(b"3");
let mut crdt = Crdt::new(leader0.clone()).expect("Crdt::new");
@ -2011,13 +1996,7 @@ mod tests {
#[test]
fn test_default_leader() {
logger::setup();
let node_info = NodeInfo::new(
Keypair::new().pubkey(),
socketaddr!("127.0.0.1:1234"),
socketaddr!("127.0.0.1:1235"),
socketaddr!("127.0.0.1:1236"),
socketaddr!("127.0.0.1:1237"),
);
let node_info = NodeInfo::new_localhost(Keypair::new().pubkey());
let mut crdt = Crdt::new(node_info).unwrap();
let network_entry_point = NodeInfo::new_entry_point(&socketaddr!("127.0.0.1:1239"));
crdt.insert(&network_entry_point);

View File

@ -737,13 +737,7 @@ mod test {
blobs.push(b_);
}
let d = crdt::NodeInfo::new(
Keypair::new().pubkey(),
"127.0.0.1:1234".parse().unwrap(),
"127.0.0.1:1235".parse().unwrap(),
"127.0.0.1:1236".parse().unwrap(),
"127.0.0.1:1237".parse().unwrap(),
);
let d = crdt::NodeInfo::new_localhost(Keypair::new().pubkey());
assert!(index_blobs(&d, &blobs, &mut (offset as u64)).is_ok());
for b in blobs {
let idx = b.read().get_index().unwrap() as usize % WINDOW_SIZE;

View File

@ -43,6 +43,7 @@ pub mod recorder;
pub mod recvmmsg;
pub mod recycler;
pub mod replicate_stage;
pub mod replicator;
pub mod request;
pub mod request_processor;
pub mod request_stage;
@ -54,6 +55,8 @@ pub mod service;
pub mod signature;
pub mod sigverify;
pub mod sigverify_stage;
pub mod storage_program;
pub mod store_ledger_stage;
pub mod streamer;
pub mod system_program;
pub mod thin_client;

181
src/replicator.rs Normal file
View File

@ -0,0 +1,181 @@
use blob_fetch_stage::BlobFetchStage;
use crdt::{Crdt, Node, NodeInfo};
use ncp::Ncp;
use service::Service;
use std::net::SocketAddr;
use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::thread::JoinHandle;
use store_ledger_stage::StoreLedgerStage;
use streamer::BlobReceiver;
use window;
use window_service::window_service;
pub struct Replicator {
ncp: Ncp,
fetch_stage: BlobFetchStage,
store_ledger_stage: StoreLedgerStage,
t_window: JoinHandle<()>,
pub retransmit_receiver: BlobReceiver,
}
impl Replicator {
pub fn new(
entry_height: u64,
exit: &Arc<AtomicBool>,
ledger_path: Option<&str>,
node: Node,
network_addr: Option<SocketAddr>,
) -> Replicator {
let window = window::new_window_from_entries(&[], entry_height, &node.info);
let shared_window = Arc::new(RwLock::new(window));
let crdt = Arc::new(RwLock::new(Crdt::new(node.info).expect("Crdt::new")));
let leader_info = network_addr.map(|i| NodeInfo::new_entry_point(&i));
if let Some(leader_info) = leader_info.as_ref() {
crdt.write().unwrap().insert(leader_info);
} else {
panic!("No leader info!");
}
let repair_socket = Arc::new(node.sockets.repair);
let mut blob_sockets: Vec<Arc<UdpSocket>> =
node.sockets.replicate.into_iter().map(Arc::new).collect();
blob_sockets.push(repair_socket.clone());
let (fetch_stage, blob_fetch_receiver) =
BlobFetchStage::new_multi_socket(blob_sockets, exit.clone());
let (blob_window_sender, blob_window_receiver) = channel();
// todo: pull blobs off the retransmit_receiver and recycle them?
let (retransmit_sender, retransmit_receiver) = channel();
let t_window = window_service(
crdt.clone(),
shared_window.clone(),
entry_height,
blob_fetch_receiver,
blob_window_sender,
retransmit_sender,
repair_socket,
);
let store_ledger_stage = StoreLedgerStage::new(blob_window_receiver, ledger_path);
let ncp = Ncp::new(
&crdt,
shared_window.clone(),
ledger_path,
node.sockets.gossip,
exit.clone(),
);
Replicator {
ncp,
fetch_stage,
store_ledger_stage,
t_window,
retransmit_receiver,
}
}
pub fn join(self) {
self.ncp.join().unwrap();
self.t_window.join().unwrap();
self.fetch_stage.join().unwrap();
self.store_ledger_stage.join().unwrap();
}
}
#[cfg(test)]
mod tests {
use client::mk_client;
use crdt::Node;
use fullnode::Fullnode;
use ledger::{genesis, read_ledger};
use logger;
use replicator::Replicator;
use signature::{Keypair, KeypairUtil};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;
#[test]
fn test_replicator_startup() {
logger::setup();
info!("starting replicator test");
let entry_height = 0;
let replicator_ledger_path = "replicator_test_replicator_ledger";
let exit = Arc::new(AtomicBool::new(false));
let leader_ledger_path = "replicator_test_leader_ledger";
let (mint, leader_ledger_path) = genesis(leader_ledger_path, 100);
info!("starting leader node");
let leader_keypair = Keypair::new();
let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let network_addr = leader_node.sockets.gossip.local_addr().unwrap();
let leader_info = leader_node.info.clone();
let leader_rotation_interval = 20;
let leader = Fullnode::new(
leader_node,
&leader_ledger_path,
leader_keypair,
None,
false,
Some(leader_rotation_interval),
);
let mut leader_client = mk_client(&leader_info);
let bob = Keypair::new();
let last_id = leader_client.get_last_id();
leader_client
.transfer(1, &mint.keypair(), bob.pubkey(), &last_id)
.unwrap();
let replicator_keypair = Keypair::new();
info!("starting replicator node");
let replicator_node = Node::new_localhost_with_pubkey(replicator_keypair.pubkey());
let replicator = Replicator::new(
entry_height,
&exit,
Some(replicator_ledger_path),
replicator_node,
Some(network_addr),
);
let mut num_entries = 0;
for _ in 0..10 {
match read_ledger(replicator_ledger_path, true) {
Ok(entries) => {
for _ in entries {
num_entries += 1;
}
info!("{} entries", num_entries);
if num_entries > 0 {
break;
}
}
Err(e) => {
info!("error reading ledger: {:?}", e);
}
}
sleep(Duration::new(1, 0));
let last_id = leader_client.get_last_id();
leader_client
.transfer(1, &mint.keypair(), bob.pubkey(), &last_id)
.unwrap();
}
assert!(num_entries > 0);
exit.store(true, Ordering::Relaxed);
replicator.join();
leader.exit();
}
}

42
src/storage_program.rs Normal file
View File

@ -0,0 +1,42 @@
//! storage program
//! Receive mining proofs from miners, validate the answers
//! and give reward for good proofs.
use bank::Account;
use bincode::deserialize;
use signature::Pubkey;
use transaction::Transaction;
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum StorageProgram {
SubmitMiningProof { sha_state: [u8; 32] },
}
pub const STORAGE_PROGRAM_ID: [u8; 32] = [1u8; 32];
impl StorageProgram {
pub fn check_id(program_id: &Pubkey) -> bool {
program_id.as_ref() == STORAGE_PROGRAM_ID
}
pub fn id() -> Pubkey {
Pubkey::new(&STORAGE_PROGRAM_ID)
}
pub fn get_balance(account: &Account) -> i64 {
account.tokens
}
pub fn process_transaction(tx: &Transaction, _accounts: &mut [Account]) {
let syscall: StorageProgram = deserialize(&tx.userdata).unwrap();
match syscall {
StorageProgram::SubmitMiningProof { sha_state } => {
info!("Mining proof submitted with state {}", sha_state[0]);
return;
}
}
}
}
#[cfg(test)]
mod test {}

73
src/store_ledger_stage.rs Normal file
View File

@ -0,0 +1,73 @@
//! The `store_ledger` stores the ledger from received blobs for storage nodes
use counter::Counter;
use ledger::{reconstruct_entries_from_blobs, LedgerWriter};
use log::Level;
use result::{Error, Result};
use service::Service;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::RecvTimeoutError;
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
use streamer::BlobReceiver;
pub struct StoreLedgerStage {
thread_hdls: Vec<JoinHandle<()>>,
}
impl StoreLedgerStage {
/// Process entry blobs, already in order
fn store_requests(
window_receiver: &BlobReceiver,
ledger_writer: Option<&mut LedgerWriter>,
) -> Result<()> {
let timer = Duration::new(1, 0);
let mut blobs = window_receiver.recv_timeout(timer)?;
while let Ok(mut more) = window_receiver.try_recv() {
blobs.append(&mut more);
}
let entries = reconstruct_entries_from_blobs(blobs.clone())?;
inc_new_counter_info!(
"store-transactions",
entries.iter().map(|x| x.transactions.len()).sum()
);
if let Some(ledger_writer) = ledger_writer {
ledger_writer.write_entries(entries)?;
}
Ok(())
}
pub fn new(window_receiver: BlobReceiver, ledger_path: Option<&str>) -> Self {
let mut ledger_writer = ledger_path.map(|p| LedgerWriter::open(p, true).unwrap());
let t_store_requests = Builder::new()
.name("solana-store-ledger-stage".to_string())
.spawn(move || loop {
if let Err(e) = Self::store_requests(&window_receiver, ledger_writer.as_mut()) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => error!("{:?}", e),
}
}
}).unwrap();
let thread_hdls = vec![t_store_requests];
StoreLedgerStage { thread_hdls }
}
}
impl Service for StoreLedgerStage {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls {
thread_hdl.join()?;
}
Ok(())
}
}

View File

@ -133,7 +133,7 @@ impl WriteStage {
let start = Instant::now();
for _ in 0..ventries.len() {
let entries = ventries.pop().unwrap();
for e in entries.iter() {
for e in &entries {
num_txs += e.transactions.len();
}
let crdt_votes_start = Instant::now();