support an initial window filled with last up-to-WINDOW_SIZE blobs
This commit is contained in:
parent
71f05cb23e
commit
ed0a590549
98
src/bank.rs
98
src/bank.rs
|
@ -19,6 +19,7 @@ use std::result;
|
|||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::RwLock;
|
||||
use std::time::Instant;
|
||||
use streamer::WINDOW_SIZE;
|
||||
use timing::duration_as_us;
|
||||
use transaction::{Instruction, Plan, Transaction};
|
||||
|
||||
|
@ -306,10 +307,7 @@ impl Bank {
|
|||
}
|
||||
|
||||
/// Process an ordered list of entries.
|
||||
pub fn process_entries<I>(&self, entries: I) -> Result<u64>
|
||||
where
|
||||
I: IntoIterator<Item = Entry>,
|
||||
{
|
||||
pub fn process_entries(&self, entries: Vec<Entry>) -> Result<u64> {
|
||||
let mut entry_count = 0;
|
||||
for entry in entries {
|
||||
entry_count += 1;
|
||||
|
@ -348,7 +346,7 @@ impl Bank {
|
|||
}
|
||||
|
||||
/// Process a full ledger.
|
||||
pub fn process_ledger<I>(&self, entries: I) -> Result<u64>
|
||||
pub fn process_ledger<I>(&self, entries: I) -> Result<(u64, Vec<Entry>)>
|
||||
where
|
||||
I: IntoIterator<Item = Entry>,
|
||||
{
|
||||
|
@ -364,20 +362,39 @@ impl Bank {
|
|||
let entry1 = entries
|
||||
.next()
|
||||
.expect("invalid ledger: need at least 2 entries");
|
||||
let tx = &entry1.transactions[0];
|
||||
let deposit = if let Instruction::NewContract(contract) = &tx.instruction {
|
||||
contract.plan.final_payment()
|
||||
} else {
|
||||
None
|
||||
}.expect("invalid ledger, needs to start with a contract");
|
||||
{
|
||||
let tx = &entry1.transactions[0];
|
||||
let deposit = if let Instruction::NewContract(contract) = &tx.instruction {
|
||||
contract.plan.final_payment()
|
||||
} else {
|
||||
None
|
||||
}.expect("invalid ledger, needs to start with a contract");
|
||||
|
||||
self.apply_payment(&deposit, &mut self.balances.write().unwrap());
|
||||
self.apply_payment(&deposit, &mut self.balances.write().unwrap());
|
||||
}
|
||||
self.register_entry_id(&entry0.id);
|
||||
self.register_entry_id(&entry1.id);
|
||||
|
||||
let mut entry_count = 2;
|
||||
entry_count += self.process_blocks(entries)?;
|
||||
Ok(entry_count)
|
||||
let mut tail = Vec::with_capacity(WINDOW_SIZE as usize);
|
||||
let mut next = Vec::with_capacity(WINDOW_SIZE as usize);
|
||||
|
||||
for block in &entries.into_iter().chunks(WINDOW_SIZE as usize) {
|
||||
tail = next;
|
||||
next = block.collect();
|
||||
entry_count += self.process_blocks(next.clone())?;
|
||||
}
|
||||
|
||||
tail.append(&mut next);
|
||||
|
||||
if tail.len() < WINDOW_SIZE as usize {
|
||||
tail.insert(0, entry1);
|
||||
if tail.len() < WINDOW_SIZE as usize {
|
||||
tail.insert(0, entry0);
|
||||
}
|
||||
}
|
||||
|
||||
Ok((entry_count, tail))
|
||||
}
|
||||
|
||||
/// Process a Witness Signature. Any payment plans waiting on this signature
|
||||
|
@ -483,9 +500,9 @@ mod tests {
|
|||
use super::*;
|
||||
use bincode::serialize;
|
||||
use entry::next_entry;
|
||||
use entry::Entry;
|
||||
use entry_writer::{self, EntryWriter};
|
||||
use hash::hash;
|
||||
use ledger::next_entries;
|
||||
use signature::KeyPairUtil;
|
||||
use std::io::{BufReader, Cursor, Seek, SeekFrom};
|
||||
|
||||
|
@ -720,25 +737,52 @@ mod tests {
|
|||
assert_eq!(bank.get_balance(&mint.pubkey()), 1);
|
||||
}
|
||||
|
||||
fn create_sample_block(mint: &Mint) -> impl Iterator<Item = Entry> {
|
||||
let keypair = KeyPair::new();
|
||||
let tx = Transaction::new(&mint.keypair(), keypair.pubkey(), 1, mint.last_id());
|
||||
next_entries(&mint.last_id(), 0, vec![tx]).into_iter()
|
||||
fn create_sample_block(mint: &Mint, length: usize) -> impl Iterator<Item = Entry> {
|
||||
let mut entries = Vec::with_capacity(length);
|
||||
let mut hash = mint.last_id();
|
||||
let mut cur_hashes = 0;
|
||||
for _ in 0..length {
|
||||
let keypair = KeyPair::new();
|
||||
let tx = Transaction::new(&mint.keypair(), keypair.pubkey(), 1, mint.last_id());
|
||||
let entry = Entry::new_mut(&mut hash, &mut cur_hashes, vec![tx], false);
|
||||
entries.push(entry);
|
||||
}
|
||||
entries.into_iter()
|
||||
}
|
||||
|
||||
fn create_sample_ledger() -> (impl Iterator<Item = Entry>, PublicKey) {
|
||||
let mint = Mint::new(2);
|
||||
fn create_sample_ledger(length: usize) -> (impl Iterator<Item = Entry>, PublicKey) {
|
||||
let mint = Mint::new(1 + length as i64);
|
||||
let genesis = mint.create_entries();
|
||||
let block = create_sample_block(&mint);
|
||||
let block = create_sample_block(&mint, length);
|
||||
(genesis.into_iter().chain(block), mint.pubkey())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_process_ledger() {
|
||||
let (ledger, pubkey) = create_sample_ledger();
|
||||
let (ledger, pubkey) = create_sample_ledger(1);
|
||||
let (ledger, dup) = ledger.tee();
|
||||
let bank = Bank::default();
|
||||
bank.process_ledger(ledger).unwrap();
|
||||
let (ledger_height, tail) = bank.process_ledger(ledger).unwrap();
|
||||
assert_eq!(bank.get_balance(&pubkey), 1);
|
||||
assert_eq!(ledger_height, 3);
|
||||
assert_eq!(tail.len(), 3);
|
||||
assert_eq!(tail, dup.collect_vec());
|
||||
let last_entry = &tail[tail.len() - 1];
|
||||
assert_eq!(bank.last_id(), last_entry.id);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_process_ledger_around_window_size() {
|
||||
let window_size = WINDOW_SIZE as usize;
|
||||
for entry_count in window_size - 1..window_size + 1 {
|
||||
let (ledger, pubkey) = create_sample_ledger(entry_count);
|
||||
let bank = Bank::default();
|
||||
let (ledger_height, tail) = bank.process_ledger(ledger).unwrap();
|
||||
assert_eq!(bank.get_balance(&pubkey), 1);
|
||||
assert_eq!(ledger_height, entry_count as u64 + 2);
|
||||
assert!(tail.len() <= window_size);
|
||||
let last_entry = &tail[tail.len() - 1];
|
||||
assert_eq!(bank.last_id(), last_entry.id);
|
||||
}
|
||||
}
|
||||
|
||||
// Write the given entries to a file and then return a file iterator to them.
|
||||
|
@ -753,7 +797,7 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_process_ledger_from_file() {
|
||||
let (ledger, pubkey) = create_sample_ledger();
|
||||
let (ledger, pubkey) = create_sample_ledger(1);
|
||||
let ledger = to_file_iter(ledger);
|
||||
|
||||
let bank = Bank::default();
|
||||
|
@ -765,7 +809,7 @@ mod tests {
|
|||
fn test_process_ledger_from_files() {
|
||||
let mint = Mint::new(2);
|
||||
let genesis = to_file_iter(mint.create_entries().into_iter());
|
||||
let block = to_file_iter(create_sample_block(&mint));
|
||||
let block = to_file_iter(create_sample_block(&mint, 1));
|
||||
|
||||
let bank = Bank::default();
|
||||
bank.process_ledger(genesis.chain(block)).unwrap();
|
||||
|
|
14
src/crdt.rs
14
src/crdt.rs
|
@ -388,7 +388,7 @@ impl Crdt {
|
|||
//filter myself
|
||||
false
|
||||
} else if v.replicate_addr == daddr {
|
||||
//filter nodes that are not listening
|
||||
trace!("broadcast skip not listening {:x}", v.debug_id());
|
||||
false
|
||||
} else {
|
||||
trace!("broadcast node {}", v.replicate_addr);
|
||||
|
@ -400,7 +400,7 @@ impl Crdt {
|
|||
warn!("crdt too small");
|
||||
Err(CrdtError::TooSmall)?;
|
||||
}
|
||||
trace!("nodes table {}", nodes.len());
|
||||
trace!("broadcast nodes {}", nodes.len());
|
||||
|
||||
// enumerate all the blobs in the window, those are the indices
|
||||
// transmit them to nodes, starting from a different node
|
||||
|
@ -414,7 +414,7 @@ impl Crdt {
|
|||
orders.push((window_l[k].clone(), nodes[is % nodes.len()]));
|
||||
}
|
||||
|
||||
trace!("orders table {}", orders.len());
|
||||
trace!("broadcast orders table {}", orders.len());
|
||||
let errs: Vec<_> = orders
|
||||
.into_iter()
|
||||
.map(|(b, v)| {
|
||||
|
@ -471,13 +471,14 @@ impl Crdt {
|
|||
trace!("skip retransmit to leader {:?}", v.id);
|
||||
false
|
||||
} else if v.replicate_addr == daddr {
|
||||
trace!("skip nodes that are not listening {:?}", v.id);
|
||||
trace!("retransmit skip not listening {:x}", v.debug_id());
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
trace!("retransmit orders {}", orders.len());
|
||||
let errs: Vec<_> = orders
|
||||
.par_iter()
|
||||
.map(|v| {
|
||||
|
@ -757,6 +758,11 @@ impl Crdt {
|
|||
}
|
||||
|
||||
return Some(out);
|
||||
} else {
|
||||
info!(
|
||||
"requested ix {} != blob_ix {}, outside window!",
|
||||
ix, blob_ix
|
||||
);
|
||||
}
|
||||
} else {
|
||||
assert!(window.read().unwrap()[pos].is_none());
|
||||
|
|
|
@ -273,6 +273,7 @@ mod tests {
|
|||
let server = FullNode::new_leader(
|
||||
bank,
|
||||
0,
|
||||
None,
|
||||
Some(Duration::from_millis(30)),
|
||||
leader,
|
||||
exit.clone(),
|
||||
|
|
|
@ -2,11 +2,14 @@
|
|||
|
||||
use bank::Bank;
|
||||
use crdt::{Crdt, ReplicatedData, TestNode};
|
||||
use entry::Entry;
|
||||
use entry_writer;
|
||||
use ledger::Block;
|
||||
use ncp::Ncp;
|
||||
use packet::BlobRecycler;
|
||||
use rpu::Rpu;
|
||||
use service::Service;
|
||||
use std::collections::VecDeque;
|
||||
use std::fs::{File, OpenOptions};
|
||||
use std::io::{sink, stdin, stdout, BufReader};
|
||||
use std::io::{Read, Write};
|
||||
|
@ -51,9 +54,9 @@ impl FullNode {
|
|||
};
|
||||
let reader = BufReader::new(infile);
|
||||
let entries = entry_writer::read_entries(reader).map(|e| e.expect("failed to parse entry"));
|
||||
info!("processing ledger...");
|
||||
let entry_height = bank.process_ledger(entries).expect("process_ledger");
|
||||
|
||||
info!("processing ledger...");
|
||||
let (entry_height, ledger_tail) = bank.process_ledger(entries).expect("process_ledger");
|
||||
// entry_height is the network-wide agreed height of the ledger.
|
||||
// initialize it from the input ledger
|
||||
info!("processed {} ledger...", entry_height);
|
||||
|
@ -74,6 +77,7 @@ impl FullNode {
|
|||
let server = FullNode::new_validator(
|
||||
bank,
|
||||
entry_height,
|
||||
Some(ledger_tail),
|
||||
node,
|
||||
network_entry_point,
|
||||
exit.clone(),
|
||||
|
@ -100,6 +104,7 @@ impl FullNode {
|
|||
let server = FullNode::new_leader(
|
||||
bank,
|
||||
entry_height,
|
||||
Some(ledger_tail),
|
||||
//Some(Duration::from_millis(1000)),
|
||||
None,
|
||||
node,
|
||||
|
@ -113,6 +118,27 @@ impl FullNode {
|
|||
server
|
||||
}
|
||||
}
|
||||
|
||||
fn new_window(
|
||||
ledger_tail: Option<Vec<Entry>>,
|
||||
entry_height: u64,
|
||||
crdt: &Arc<RwLock<Crdt>>,
|
||||
blob_recycler: &BlobRecycler,
|
||||
) -> streamer::Window {
|
||||
match ledger_tail {
|
||||
Some(ledger_tail) => {
|
||||
// convert to blobs
|
||||
let mut blobs = VecDeque::new();
|
||||
ledger_tail.to_blobs(&blob_recycler, &mut blobs);
|
||||
|
||||
// flatten deque to vec
|
||||
let blobs: Vec<_> = blobs.into_iter().collect();
|
||||
streamer::initialized_window(&crdt, blobs, entry_height)
|
||||
}
|
||||
None => streamer::default_window(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a server instance acting as a leader.
|
||||
///
|
||||
/// ```text
|
||||
|
@ -140,6 +166,7 @@ impl FullNode {
|
|||
pub fn new_leader<W: Write + Send + 'static>(
|
||||
bank: Bank,
|
||||
entry_height: u64,
|
||||
ledger_tail: Option<Vec<Entry>>,
|
||||
tick_duration: Option<Duration>,
|
||||
node: TestNode,
|
||||
exit: Arc<AtomicBool>,
|
||||
|
@ -166,7 +193,9 @@ impl FullNode {
|
|||
);
|
||||
thread_hdls.extend(tpu.thread_hdls());
|
||||
let crdt = Arc::new(RwLock::new(Crdt::new(node.data)));
|
||||
let window = streamer::default_window();
|
||||
|
||||
let window = FullNode::new_window(ledger_tail, entry_height, &crdt, &blob_recycler);
|
||||
|
||||
let ncp = Ncp::new(
|
||||
crdt.clone(),
|
||||
window.clone(),
|
||||
|
@ -221,6 +250,7 @@ impl FullNode {
|
|||
pub fn new_validator(
|
||||
bank: Bank,
|
||||
entry_height: u64,
|
||||
ledger_tail: Option<Vec<Entry>>,
|
||||
node: TestNode,
|
||||
entry_point: ReplicatedData,
|
||||
exit: Arc<AtomicBool>,
|
||||
|
@ -239,7 +269,11 @@ impl FullNode {
|
|||
crdt.write()
|
||||
.expect("'crdt' write lock before insert() in pub fn replicate")
|
||||
.insert(&entry_point);
|
||||
let window = streamer::default_window();
|
||||
|
||||
let blob_recycler = BlobRecycler::default();
|
||||
|
||||
let window = FullNode::new_window(ledger_tail, entry_height, &crdt, &blob_recycler);
|
||||
|
||||
let ncp = Ncp::new(
|
||||
crdt.clone(),
|
||||
window.clone(),
|
||||
|
@ -292,7 +326,7 @@ mod tests {
|
|||
let bank = Bank::new(&alice);
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let entry = tn.data.clone();
|
||||
let v = FullNode::new_validator(bank, 0, tn, entry, exit.clone());
|
||||
let v = FullNode::new_validator(bank, 0, None, tn, entry, exit.clone());
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
for t in v.thread_hdls {
|
||||
t.join().unwrap();
|
||||
|
|
|
@ -462,6 +462,44 @@ pub fn default_window() -> Window {
|
|||
Arc::new(RwLock::new(vec![None; WINDOW_SIZE as usize]))
|
||||
}
|
||||
|
||||
/// Initialize a rebroadcast window with most recent Entry blobs
|
||||
/// * `crdt` - gossip instance, used to set blob ids
|
||||
/// * `blobs` - up to WINDOW_SIZE most recent blobs
|
||||
/// * `entry_height` - current entry height
|
||||
pub fn initialized_window(
|
||||
crdt: &Arc<RwLock<Crdt>>,
|
||||
blobs: Vec<SharedBlob>,
|
||||
entry_height: u64,
|
||||
) -> Window {
|
||||
let window = default_window();
|
||||
|
||||
{
|
||||
let mut win = window.write().unwrap();
|
||||
assert!(blobs.len() <= win.len());
|
||||
|
||||
debug!(
|
||||
"initialized window entry_height:{} blobs_len:{}",
|
||||
entry_height,
|
||||
blobs.len()
|
||||
);
|
||||
|
||||
// Index the blobs
|
||||
let mut received = entry_height - blobs.len() as u64;
|
||||
Crdt::index_blobs(crdt, &blobs, &mut received).expect("index blobs for initial window");
|
||||
|
||||
// populate the window, offset by implied index
|
||||
for b in blobs {
|
||||
let ix = b.read().unwrap().get_index().expect("blob index");
|
||||
let pos = (ix % WINDOW_SIZE) as usize;
|
||||
trace!("caching {} at {}", ix, pos);
|
||||
assert!(win[pos].is_none());
|
||||
win[pos] = Some(b);
|
||||
}
|
||||
}
|
||||
|
||||
window
|
||||
}
|
||||
|
||||
pub fn window(
|
||||
crdt: Arc<RwLock<Crdt>>,
|
||||
window: Window,
|
||||
|
|
|
@ -290,6 +290,7 @@ mod tests {
|
|||
let server = FullNode::new_leader(
|
||||
bank,
|
||||
0,
|
||||
None,
|
||||
Some(Duration::from_millis(30)),
|
||||
leader,
|
||||
exit.clone(),
|
||||
|
@ -329,6 +330,7 @@ mod tests {
|
|||
let server = FullNode::new_leader(
|
||||
bank,
|
||||
0,
|
||||
None,
|
||||
Some(Duration::from_millis(30)),
|
||||
leader,
|
||||
exit.clone(),
|
||||
|
@ -380,6 +382,7 @@ mod tests {
|
|||
let server = FullNode::new_leader(
|
||||
bank,
|
||||
0,
|
||||
None,
|
||||
Some(Duration::from_millis(30)),
|
||||
leader,
|
||||
exit.clone(),
|
||||
|
|
|
@ -306,6 +306,7 @@ fn test_multi_node_dynamic_network() {
|
|||
Some(OutFile::Path(ledger_path.clone())),
|
||||
exit.clone(),
|
||||
);
|
||||
info!("{:x} LEADER", leader_data.debug_id());
|
||||
let threads = server.thread_hdls();
|
||||
let leader_balance =
|
||||
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(500)).unwrap();
|
||||
|
@ -328,6 +329,7 @@ fn test_multi_node_dynamic_network() {
|
|||
Some(OutFile::Path(ledger_path.clone())),
|
||||
exit.clone(),
|
||||
);
|
||||
info!("{:x} VALIDATOR", rd.debug_id());
|
||||
(rd, exit, val)
|
||||
})
|
||||
.collect();
|
||||
|
|
Loading…
Reference in New Issue