Use Vec instead of VecDeque for SharedBlobs

This commit is contained in:
Greg Fitzgerald 2018-09-03 00:22:47 -10:00
parent 9a9f89293a
commit 8cc030ef84
10 changed files with 49 additions and 66 deletions

View File

@ -27,7 +27,6 @@ use result::{Error, Result};
use signature::{Keypair, KeypairUtil, Pubkey};
use std;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::io::Cursor;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
@ -796,9 +795,7 @@ impl Crdt {
// TODO this will get chatty, so we need to first ask for number of updates since
// then only ask for specific data that we dont have
let blob = to_blob(req, remote_gossip_addr, blob_recycler)?;
let mut q: VecDeque<SharedBlob> = VecDeque::new();
q.push_back(blob);
blob_sender.send(q)?;
blob_sender.send(vec![blob])?;
Ok(())
}
/// TODO: This is obviously the wrong way to do this. Need to implement leader selection
@ -1188,8 +1185,8 @@ impl Crdt {
while let Ok(mut more) = requests_receiver.try_recv() {
reqs.append(&mut more);
}
let mut resps = VecDeque::new();
while let Some(req) = reqs.pop_front() {
let mut resps = Vec::new();
for req in reqs {
if let Some(resp) = Self::handle_blob(
obj,
window,
@ -1197,7 +1194,7 @@ impl Crdt {
blob_recycler,
&req.read().unwrap(),
) {
resps.push_back(resp);
resps.push(resp);
}
blob_recycler.recycle(req);
}

View File

@ -10,7 +10,6 @@ use packet::{self, SharedBlob, BLOB_DATA_SIZE};
use rayon::prelude::*;
use result::{Error, Result};
use signature::Pubkey;
use std::collections::VecDeque;
use std::fs::{create_dir_all, remove_dir_all, File, OpenOptions};
use std::io::prelude::*;
use std::io::{self, BufReader, BufWriter, Seek, SeekFrom};
@ -413,7 +412,7 @@ pub fn read_ledger(
pub trait Block {
/// Verifies the hashes and counts of a slice of transactions are all consistent.
fn verify(&self, start_hash: &Hash) -> bool;
fn to_blobs(&self, blob_recycler: &packet::BlobRecycler, q: &mut VecDeque<SharedBlob>);
fn to_blobs(&self, blob_recycler: &packet::BlobRecycler, q: &mut Vec<SharedBlob>);
fn votes(&self) -> Vec<(Pubkey, Vote, Hash)>;
}
@ -434,10 +433,10 @@ impl Block for [Entry] {
})
}
fn to_blobs(&self, blob_recycler: &packet::BlobRecycler, q: &mut VecDeque<SharedBlob>) {
fn to_blobs(&self, blob_recycler: &packet::BlobRecycler, q: &mut Vec<SharedBlob>) {
for entry in self {
let blob = entry.to_blob(blob_recycler, None, None, None);
q.push_back(blob);
q.push(blob);
}
}
@ -448,7 +447,7 @@ impl Block for [Entry] {
}
}
pub fn reconstruct_entries_from_blobs(blobs: VecDeque<SharedBlob>) -> Result<Vec<Entry>> {
pub fn reconstruct_entries_from_blobs(blobs: Vec<SharedBlob>) -> Result<Vec<Entry>> {
let mut entries: Vec<Entry> = Vec::with_capacity(blobs.len());
for blob in blobs {
@ -635,7 +634,7 @@ mod tests {
let entries = make_test_entries();
let blob_recycler = BlobRecycler::default();
let mut blob_q = VecDeque::new();
let mut blob_q = Vec::new();
entries.to_blobs(&blob_recycler, &mut blob_q);
assert_eq!(reconstruct_entries_from_blobs(blob_q).unwrap(), entries);

View File

@ -6,7 +6,6 @@ use log::Level;
use result::{Error, Result};
use serde::Serialize;
use signature::Pubkey;
use std::collections::VecDeque;
use std::fmt;
use std::io;
use std::mem::size_of;
@ -16,7 +15,7 @@ use std::sync::{Arc, Mutex, RwLock};
pub type SharedPackets = Arc<RwLock<Packets>>;
pub type SharedBlob = Arc<RwLock<Blob>>;
pub type SharedBlobs = VecDeque<SharedBlob>;
pub type SharedBlobs = Vec<SharedBlob>;
pub type PacketRecycler = Recycler<Packets>;
pub type BlobRecycler = Recycler<Blob>;
@ -336,9 +335,9 @@ pub fn to_blobs<T: Serialize>(
rsps: Vec<(T, SocketAddr)>,
blob_recycler: &BlobRecycler,
) -> Result<SharedBlobs> {
let mut blobs = VecDeque::new();
let mut blobs = Vec::new();
for (resp, rsp_addr) in rsps {
blobs.push_back(to_blob(resp, rsp_addr, blob_recycler)?);
blobs.push(to_blob(resp, rsp_addr, blob_recycler)?);
}
Ok(blobs)
}
@ -437,7 +436,7 @@ impl Blob {
self.set_data_size(new_size as u64).unwrap();
}
pub fn recv_from(re: &BlobRecycler, socket: &UdpSocket) -> Result<SharedBlobs> {
let mut v = VecDeque::new();
let mut v = Vec::new();
//DOCUMENTED SIDE-EFFECT
//Performance out of the IO without poll
// * block on the socket until it's readable
@ -471,12 +470,12 @@ impl Blob {
}
}
}
v.push_back(r);
v.push(r);
}
Ok(v)
}
pub fn send_to(re: &BlobRecycler, socket: &UdpSocket, v: &mut SharedBlobs) -> Result<()> {
while let Some(r) = v.pop_front() {
pub fn send_to(re: &BlobRecycler, socket: &UdpSocket, v: SharedBlobs) -> Result<()> {
for r in v {
{
let p = r.read().expect("'r' read lock in pub fn send_to");
let a = p.meta.addr();
@ -501,7 +500,6 @@ mod tests {
BLOB_HEADER_SIZE, NUM_PACKETS,
};
use request::Request;
use std::collections::VecDeque;
use std::io;
use std::io::Write;
use std::net::UdpSocket;
@ -613,18 +611,13 @@ mod tests {
let p = r.allocate();
p.write().unwrap().meta.set_addr(&addr);
p.write().unwrap().meta.size = 1024;
let mut v = VecDeque::new();
v.push_back(p);
assert_eq!(v.len(), 1);
Blob::send_to(&r, &sender, &mut v).unwrap();
let v = vec![p];
Blob::send_to(&r, &sender, v).unwrap();
trace!("send_to");
assert_eq!(v.len(), 0);
let mut rv = Blob::recv_from(&r, &reader).unwrap();
let rv = Blob::recv_from(&r, &reader).unwrap();
trace!("recv_from");
assert_eq!(rv.len(), 1);
let rp = rv.pop_front().unwrap();
assert_eq!(rp.write().unwrap().meta.size, 1024);
r.recycle(rp);
assert_eq!(rv[0].write().unwrap().meta.size, 1024);
}
#[cfg(all(feature = "ipv6", test))]

View File

@ -43,7 +43,7 @@ impl ReplicateStage {
let res = bank.process_entries(entries.clone());
while let Some(blob) = blobs.pop_front() {
for blob in blobs {
blob_recycler.recycle(blob);
}

View File

@ -32,7 +32,7 @@ fn retransmit(
Crdt::retransmit(&crdt, b, sock)?;
}
}
while let Some(b) = dq.pop_front() {
for b in dq {
recycler.recycle(b);
}
Ok(())

View File

@ -64,8 +64,8 @@ pub fn receiver(
fn recv_send(sock: &UdpSocket, recycler: &BlobRecycler, r: &BlobReceiver) -> Result<()> {
let timer = Duration::new(1, 0);
let mut msgs = r.recv_timeout(timer)?;
Blob::send_to(recycler, sock, &mut msgs)?;
let msgs = r.recv_timeout(timer)?;
Blob::send_to(recycler, sock, msgs)?;
Ok(())
}
@ -144,7 +144,6 @@ pub fn blob_receiver(
#[cfg(test)]
mod test {
use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets, PACKET_DATA_SIZE};
use std::collections::VecDeque;
use std::io;
use std::io::Write;
use std::net::UdpSocket;
@ -198,7 +197,7 @@ mod test {
resp_recycler.clone(),
r_responder,
);
let mut msgs = VecDeque::new();
let mut msgs = Vec::new();
for i in 0..10 {
let b = resp_recycler.allocate();
{
@ -207,7 +206,7 @@ mod test {
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&addr);
}
msgs.push_back(b);
msgs.push(b);
}
s_responder.send(msgs).expect("send");
t_responder

View File

@ -154,7 +154,6 @@ pub mod tests {
use packet::BlobRecycler;
use service::Service;
use signature::{Keypair, KeypairUtil};
use std::collections::VecDeque;
use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel;
@ -248,7 +247,7 @@ pub mod tests {
);
let mut alice_ref_balance = starting_balance;
let mut msgs = VecDeque::new();
let mut msgs = Vec::new();
let mut cur_hash = Hash::default();
let mut blob_id = 0;
let num_transfers = 10;
@ -287,7 +286,7 @@ pub mod tests {
w.set_size(serialized_entry.len());
w.meta.set_addr(&replicate_addr);
}
msgs.push_back(b);
msgs.push(b);
}
}

View File

@ -12,7 +12,6 @@ use packet::{BlobRecycler, SharedBlob};
use result::Result;
use service::Service;
use signature::Keypair;
use std::collections::VecDeque;
use std::result;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
@ -136,7 +135,7 @@ pub fn send_leader_vote(
if let Ok(shared_blob) =
create_new_signed_vote_blob(&last_id, keypair, crdt, blob_recycler)
{
vote_blob_sender.send(VecDeque::from(vec![shared_blob]))?;
vote_blob_sender.send(vec![shared_blob])?;
let finality_ms = now - super_majority_timestamp;
*last_valid_validator_timestamp = super_majority_timestamp;
@ -170,7 +169,7 @@ fn send_validator_vote(
if let Ok(shared_blob) = create_new_signed_vote_blob(&last_id, keypair, crdt, blob_recycler) {
inc_new_counter_info!("replicate-vote_sent", 1);
vote_blob_sender.send(VecDeque::from(vec![shared_blob]))?;
vote_blob_sender.send(vec![shared_blob])?;
}
Ok(())
}
@ -388,7 +387,7 @@ pub mod tests {
assert!(vote_blob.is_ok());
// vote should be valid
let blob = vote_blob.unwrap().pop_front().unwrap();
let blob = &vote_blob.unwrap()[0];
let tx = deserialize(&(blob.read().unwrap().data)).unwrap();
assert!(bank.process_transaction(&tx).is_ok());
}

View File

@ -12,7 +12,6 @@ use rand::{thread_rng, Rng};
use result::{Error, Result};
use signature::Pubkey;
use std::cmp;
use std::collections::VecDeque;
use std::mem;
use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::AtomicUsize;
@ -158,7 +157,7 @@ fn add_block_to_retransmit_queue(
b: &SharedBlob,
leader_id: Pubkey,
recycler: &BlobRecycler,
retransmit_queue: &mut VecDeque<SharedBlob>,
retransmit_queue: &mut Vec<SharedBlob>,
) {
let p = b
.read()
@ -193,7 +192,7 @@ fn add_block_to_retransmit_queue(
mnv.meta.size = sz;
mnv.data[..sz].copy_from_slice(&p.data[..sz]);
}
retransmit_queue.push_back(nv);
retransmit_queue.push(nv);
}
}
@ -208,7 +207,7 @@ fn retransmit_all_leader_blocks(
window: &SharedWindow,
pending_retransmits: &mut bool,
) -> Result<()> {
let mut retransmit_queue: VecDeque<SharedBlob> = VecDeque::new();
let mut retransmit_queue: Vec<SharedBlob> = Vec::new();
if let Some(leader) = maybe_leader {
let leader_id = leader.id;
for b in dq {
@ -364,7 +363,7 @@ fn process_blob(
// window[k].data is None, end of received
break;
}
consume_queue.push_back(window[k].data.clone().expect("clone in fn recv_window"));
consume_queue.push(window[k].data.clone().expect("clone in fn recv_window"));
*consumed += 1;
}
}
@ -450,8 +449,8 @@ fn recv_window(
let mut pixs = Vec::new();
//send a contiguous set of blocks
let mut consume_queue = VecDeque::new();
while let Some(b) = dq.pop_front() {
let mut consume_queue = Vec::new();
for b in dq {
let (pix, meta_size) = {
let p = b.write().unwrap();
(p.get_index()?, p.meta.size)
@ -614,7 +613,7 @@ pub fn new_window_from_entries(
blob_recycler: &BlobRecycler,
) -> SharedWindow {
// convert to blobs
let mut blobs = VecDeque::new();
let mut blobs = Vec::new();
ledger_tail.to_blobs(&blob_recycler, &mut blobs);
// flatten deque to vec
@ -684,7 +683,6 @@ mod test {
use crdt::{Crdt, Node};
use logger;
use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets, PACKET_DATA_SIZE};
use std::collections::VecDeque;
use std::io;
use std::io::Write;
use std::net::UdpSocket;
@ -741,7 +739,7 @@ mod test {
resp_recycler.clone(),
r_responder,
);
let mut msgs = VecDeque::new();
let mut msgs = Vec::new();
for i in 0..10 {
let b = resp_recycler.allocate();
{
@ -750,7 +748,7 @@ mod test {
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&addr);
}
msgs.push_back(b);
msgs.push(b);
}
s_responder.send(msgs).expect("send");
t_responder
@ -821,7 +819,7 @@ mod test {
resp_recycler.clone(),
r_responder,
);
let mut msgs = VecDeque::new();
let mut msgs = Vec::new();
for v in 0..10 {
let i = 9 - v;
let b = resp_recycler.allocate();
@ -833,7 +831,7 @@ mod test {
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&tn.info.contact_info.ncp);
}
msgs.push_back(b);
msgs.push(b);
}
s_responder.send(msgs).expect("send");
t_responder
@ -891,7 +889,7 @@ mod test {
resp_recycler.clone(),
r_responder,
);
let mut msgs = VecDeque::new();
let mut msgs = Vec::new();
for v in 0..10 {
let i = 9 - v;
let b = resp_recycler.allocate();
@ -903,7 +901,7 @@ mod test {
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&tn.info.contact_info.ncp);
}
msgs.push_back(b);
msgs.push(b);
}
s_responder.send(msgs).expect("send");
t_responder
@ -954,7 +952,7 @@ mod test {
resp_recycler.clone(),
r_responder,
);
let mut msgs = VecDeque::new();
let mut msgs = Vec::new();
for v in 0..10 {
let i = 9 - v;
let b = resp_recycler.allocate();
@ -966,7 +964,7 @@ mod test {
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&tn.info.contact_info.ncp);
}
msgs.push_back(b);
msgs.push(b);
}
s_responder.send(msgs).expect("send");
@ -974,7 +972,7 @@ mod test {
subs.write().unwrap().set_leader(me_id);
let mut msgs1 = VecDeque::new();
let mut msgs1 = Vec::new();
for v in 1..5 {
let i = 9 + v;
let b = resp_recycler.allocate();
@ -986,7 +984,7 @@ mod test {
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&tn.info.contact_info.ncp);
}
msgs1.push_back(b);
msgs1.push(b);
}
s_responder.send(msgs1).expect("send");
t_responder

View File

@ -12,7 +12,6 @@ use packet::BlobRecycler;
use result::{Error, Result};
use service::Service;
use signature::Keypair;
use std::collections::VecDeque;
use std::net::UdpSocket;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
@ -55,7 +54,7 @@ impl WriteStage {
//on a valid last id
trace!("New blobs? {}", entries.len());
let mut blobs = VecDeque::new();
let mut blobs = Vec::new();
entries.to_blobs(blob_recycler, &mut blobs);
if !blobs.is_empty() {