insert votes as they are observed
This commit is contained in:
parent
194a84c8dd
commit
ab3e460e64
|
@ -2,7 +2,7 @@
|
||||||
|
|
||||||
use bank::Bank;
|
use bank::Bank;
|
||||||
use bincode::serialize;
|
use bincode::serialize;
|
||||||
use counter::Counter;
|
use counter::{Counter, DEFAULT_LOG_RATE};
|
||||||
use crdt::Crdt;
|
use crdt::Crdt;
|
||||||
use ledger;
|
use ledger;
|
||||||
use packet::BlobRecycler;
|
use packet::BlobRecycler;
|
||||||
|
@ -27,7 +27,6 @@ pub struct ReplicateStage {
|
||||||
}
|
}
|
||||||
|
|
||||||
const VOTE_TIMEOUT_MS: u64 = 1000;
|
const VOTE_TIMEOUT_MS: u64 = 1000;
|
||||||
const LOG_RATE: usize = 10;
|
|
||||||
|
|
||||||
impl ReplicateStage {
|
impl ReplicateStage {
|
||||||
/// Process entry blobs, already in order
|
/// Process entry blobs, already in order
|
||||||
|
@ -48,13 +47,19 @@ impl ReplicateStage {
|
||||||
}
|
}
|
||||||
let blobs_len = blobs.len();
|
let blobs_len = blobs.len();
|
||||||
let entries = ledger::reconstruct_entries_from_blobs(blobs.clone())?;
|
let entries = ledger::reconstruct_entries_from_blobs(blobs.clone())?;
|
||||||
|
{
|
||||||
let votes = entries_to_votes(&entries);
|
let votes = entries_to_votes(&entries);
|
||||||
|
let mut wcrdt = crdt.write().unwrap();
|
||||||
static mut COUNTER_REPLICATE: Counter = create_counter!("replicate-transactions", LOG_RATE);
|
wcrdt.insert_votes(&votes);
|
||||||
|
};
|
||||||
|
{
|
||||||
|
static mut COUNTER_REPLICATE: Counter =
|
||||||
|
create_counter!("replicate-transactions", DEFAULT_LOG_RATE);
|
||||||
inc_counter!(
|
inc_counter!(
|
||||||
COUNTER_REPLICATE,
|
COUNTER_REPLICATE,
|
||||||
entries.iter().map(|x| x.transactions.len()).sum()
|
entries.iter().map(|x| x.transactions.len()).sum()
|
||||||
);
|
);
|
||||||
|
}
|
||||||
let res = bank.process_entries(entries);
|
let res = bank.process_entries(entries);
|
||||||
if res.is_err() {
|
if res.is_err() {
|
||||||
error!("process_entries {} {:?}", blobs_len, res);
|
error!("process_entries {} {:?}", blobs_len, res);
|
||||||
|
@ -66,7 +71,6 @@ impl ReplicateStage {
|
||||||
let shared_blob = blob_recycler.allocate();
|
let shared_blob = blob_recycler.allocate();
|
||||||
let (vote, addr) = {
|
let (vote, addr) = {
|
||||||
let mut wcrdt = crdt.write().unwrap();
|
let mut wcrdt = crdt.write().unwrap();
|
||||||
wcrdt.insert_votes(&votes);
|
|
||||||
//TODO: doesn't seem like there is a synchronous call to get height and id
|
//TODO: doesn't seem like there is a synchronous call to get height and id
|
||||||
info!("replicate_stage {} {:?}", height, &last_id[..8]);
|
info!("replicate_stage {} {:?}", height, &last_id[..8]);
|
||||||
wcrdt.new_vote(height, last_id)
|
wcrdt.new_vote(height, last_id)
|
||||||
|
@ -80,7 +84,13 @@ impl ReplicateStage {
|
||||||
blob.meta.set_addr(&addr);
|
blob.meta.set_addr(&addr);
|
||||||
blob.meta.size = len;
|
blob.meta.size = len;
|
||||||
}
|
}
|
||||||
|
{
|
||||||
|
static mut COUNTER_REPLICATE_VOTE: Counter =
|
||||||
|
create_counter!("replicate-vote_sent", DEFAULT_LOG_RATE);
|
||||||
|
inc_counter!(COUNTER_REPLICATE_VOTE, 1);
|
||||||
|
}
|
||||||
*last_vote = now;
|
*last_vote = now;
|
||||||
|
|
||||||
vote_blob_sender.send(VecDeque::from(vec![shared_blob]))?;
|
vote_blob_sender.send(VecDeque::from(vec![shared_blob]))?;
|
||||||
}
|
}
|
||||||
while let Some(blob) = blobs.pop_front() {
|
while let Some(blob) = blobs.pop_front() {
|
||||||
|
|
Loading…
Reference in New Issue