2018-05-22 14:26:28 -07:00
|
|
|
//! The `replicate_stage` replicates transactions broadcast by the leader.
|
|
|
|
|
2018-05-22 15:30:46 -07:00
|
|
|
use bank::Bank;
|
2018-07-05 12:01:40 -07:00
|
|
|
use bincode::serialize;
|
|
|
|
use crdt::Crdt;
|
2018-05-22 15:30:46 -07:00
|
|
|
use ledger;
|
2018-07-05 12:01:40 -07:00
|
|
|
use packet::BlobRecycler;
|
2018-07-05 15:29:49 -07:00
|
|
|
use result::{Error, Result};
|
2018-07-03 21:14:08 -07:00
|
|
|
use service::Service;
|
2018-07-05 12:01:40 -07:00
|
|
|
use signature::KeyPair;
|
|
|
|
use std::collections::VecDeque;
|
|
|
|
use std::net::UdpSocket;
|
|
|
|
use std::sync::mpsc::channel;
|
2018-07-05 15:29:49 -07:00
|
|
|
use std::sync::mpsc::RecvTimeoutError;
|
2018-07-05 12:01:40 -07:00
|
|
|
use std::sync::{Arc, RwLock};
|
2018-07-03 21:14:08 -07:00
|
|
|
use std::thread::{self, Builder, JoinHandle};
|
2018-05-22 15:30:46 -07:00
|
|
|
use std::time::Duration;
|
2018-07-05 12:01:40 -07:00
|
|
|
use streamer::{responder, BlobReceiver, BlobSender};
|
|
|
|
use timing;
|
|
|
|
use transaction::Transaction;
|
|
|
|
use voting::entries_to_votes;
|
2018-05-22 14:26:28 -07:00
|
|
|
|
|
|
|
/// Handle to the threads that make up the replicate stage.
///
/// Instances are built by `ReplicateStage::new`, which stores the join
/// handles of the threads it starts.
pub struct ReplicateStage {
    /// Join handles of the stage's threads, in the order they were started.
    thread_hdls: Vec<JoinHandle<()>>,
}
|
|
|
|
|
2018-07-05 12:01:40 -07:00
|
|
|
/// Minimum gap between two votes sent by `replicate_requests`: a new vote is
/// only produced when the difference between the current `timing::timestamp()`
/// reading and the previous vote's reading exceeds this value.
/// NOTE(review): presumably milliseconds, going by the name — confirm against `timing::timestamp`.
const VOTE_TIMEOUT_MS: u64 = 1000;
|
|
|
|
|
2018-05-22 14:26:28 -07:00
|
|
|
impl ReplicateStage {
|
2018-06-15 14:27:06 -07:00
|
|
|
/// Process entry blobs, already in order
|
2018-07-05 12:01:40 -07:00
|
|
|
fn replicate_requests(
|
|
|
|
keypair: &Arc<KeyPair>,
|
|
|
|
bank: &Arc<Bank>,
|
|
|
|
crdt: &Arc<RwLock<Crdt>>,
|
|
|
|
blob_recycler: &BlobRecycler,
|
|
|
|
window_receiver: &BlobReceiver,
|
|
|
|
vote_blob_sender: &BlobSender,
|
|
|
|
last_vote: &mut u64,
|
|
|
|
) -> Result<()> {
|
2018-05-22 15:30:46 -07:00
|
|
|
let timer = Duration::new(1, 0);
|
2018-07-05 12:01:40 -07:00
|
|
|
//coalesce all the available blobs into a single vote
|
|
|
|
let mut blobs = window_receiver.recv_timeout(timer)?;
|
|
|
|
while let Ok(mut more) = window_receiver.try_recv() {
|
|
|
|
blobs.append(&mut more);
|
|
|
|
}
|
2018-06-22 12:20:29 -07:00
|
|
|
let blobs_len = blobs.len();
|
2018-07-05 12:01:40 -07:00
|
|
|
let entries = ledger::reconstruct_entries_from_blobs(blobs.clone())?;
|
|
|
|
let votes = entries_to_votes(&entries);
|
2018-05-29 08:52:40 -07:00
|
|
|
let res = bank.process_entries(entries);
|
2018-05-22 15:30:46 -07:00
|
|
|
if res.is_err() {
|
2018-06-22 12:20:29 -07:00
|
|
|
error!("process_entries {} {:?}", blobs_len, res);
|
2018-05-22 15:30:46 -07:00
|
|
|
}
|
2018-07-05 12:01:40 -07:00
|
|
|
let now = timing::timestamp();
|
|
|
|
if now - *last_vote > VOTE_TIMEOUT_MS {
|
|
|
|
let height = res?;
|
|
|
|
let last_id = bank.last_id();
|
|
|
|
let shared_blob = blob_recycler.allocate();
|
|
|
|
let (vote, addr) = {
|
|
|
|
let mut wcrdt = crdt.write().unwrap();
|
|
|
|
wcrdt.insert_votes(votes);
|
|
|
|
//TODO: doesn't seem like there is a synchronous call to get height and id
|
|
|
|
info!("replicate_stage {} {:?}", height, &last_id[..8]);
|
2018-07-10 12:02:51 -07:00
|
|
|
wcrdt.new_vote(height, last_id)
|
|
|
|
}?;
|
2018-07-05 12:01:40 -07:00
|
|
|
{
|
|
|
|
let mut blob = shared_blob.write().unwrap();
|
|
|
|
let tx = Transaction::new_vote(&keypair, vote, last_id, 0);
|
|
|
|
let bytes = serialize(&tx)?;
|
|
|
|
let len = bytes.len();
|
|
|
|
blob.data[..len].copy_from_slice(&bytes);
|
|
|
|
blob.meta.set_addr(&addr);
|
|
|
|
blob.meta.size = len;
|
|
|
|
}
|
|
|
|
*last_vote = now;
|
|
|
|
vote_blob_sender.send(VecDeque::from(vec![shared_blob]))?;
|
|
|
|
}
|
|
|
|
while let Some(blob) = blobs.pop_front() {
|
|
|
|
blob_recycler.recycle(blob);
|
|
|
|
}
|
2018-05-22 15:30:46 -07:00
|
|
|
Ok(())
|
|
|
|
}
|
2018-07-05 12:01:40 -07:00
|
|
|
pub fn new(
|
2018-07-10 12:02:51 -07:00
|
|
|
keypair: KeyPair,
|
2018-07-05 12:01:40 -07:00
|
|
|
bank: Arc<Bank>,
|
|
|
|
crdt: Arc<RwLock<Crdt>>,
|
|
|
|
blob_recycler: BlobRecycler,
|
|
|
|
window_receiver: BlobReceiver,
|
|
|
|
) -> Self {
|
|
|
|
let (vote_blob_sender, vote_blob_receiver) = channel();
|
|
|
|
let send = UdpSocket::bind("0.0.0.0:0").expect("bind");
|
|
|
|
let t_responder = responder(send, blob_recycler.clone(), vote_blob_receiver);
|
2018-07-10 12:02:51 -07:00
|
|
|
let skeypair = Arc::new(keypair);
|
2018-05-22 15:30:46 -07:00
|
|
|
|
2018-07-05 12:01:40 -07:00
|
|
|
let t_replicate = Builder::new()
|
2018-05-30 13:38:15 -07:00
|
|
|
.name("solana-replicate-stage".to_string())
|
2018-07-05 12:01:40 -07:00
|
|
|
.spawn(move || {
|
|
|
|
let mut timestamp: u64 = 0;
|
|
|
|
loop {
|
|
|
|
if let Err(e) = Self::replicate_requests(
|
2018-07-10 12:02:51 -07:00
|
|
|
&skeypair,
|
2018-07-05 12:01:40 -07:00
|
|
|
&bank,
|
|
|
|
&crdt,
|
|
|
|
&blob_recycler,
|
|
|
|
&window_receiver,
|
|
|
|
&vote_blob_sender,
|
|
|
|
&mut timestamp,
|
|
|
|
) {
|
|
|
|
match e {
|
|
|
|
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
|
|
|
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
|
|
|
_ => error!("{:?}", e),
|
|
|
|
}
|
2018-07-05 15:29:49 -07:00
|
|
|
}
|
2018-05-30 13:38:15 -07:00
|
|
|
}
|
|
|
|
})
|
|
|
|
.unwrap();
|
2018-07-05 12:01:40 -07:00
|
|
|
ReplicateStage {
|
|
|
|
thread_hdls: vec![t_responder, t_replicate],
|
|
|
|
}
|
2018-05-22 14:26:28 -07:00
|
|
|
}
|
|
|
|
}
|
2018-07-03 21:14:08 -07:00
|
|
|
|
|
|
|
impl Service for ReplicateStage {
|
|
|
|
fn thread_hdls(self) -> Vec<JoinHandle<()>> {
|
2018-07-05 12:01:40 -07:00
|
|
|
self.thread_hdls
|
2018-07-03 21:14:08 -07:00
|
|
|
}
|
|
|
|
fn join(self) -> thread::Result<()> {
|
2018-07-05 12:01:40 -07:00
|
|
|
for thread_hdl in self.thread_hdls() {
|
|
|
|
thread_hdl.join()?;
|
|
|
|
}
|
|
|
|
Ok(())
|
2018-07-03 21:14:08 -07:00
|
|
|
}
|
|
|
|
}
|