add experimental perfectseq

This commit is contained in:
GroovieGermanikus 2023-12-14 11:37:50 +01:00
parent babf35e122
commit 37f76e010c
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
4 changed files with 540 additions and 12 deletions

31
Cargo.lock generated
View File

@ -737,6 +737,12 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7144d30dcf0fafbce74250a3963025d8d52177934239851c917d29f1df280c2"
[[package]]
name = "convert_case"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e"
[[package]]
name = "core-foundation"
version = "0.9.4"
@ -914,6 +920,19 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "derive_more"
version = "0.99.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fb810d30a7c1953f91334de7244731fc3f3c10d7fe163338a35b9f640960321"
dependencies = [
"convert_case",
"proc-macro2",
"quote",
"rustc_version",
"syn 1.0.109",
]
[[package]]
name = "digest"
version = "0.9.0"
@ -1226,6 +1245,7 @@ dependencies = [
"async-stream",
"base64 0.21.5",
"bincode",
"derive_more",
"futures",
"itertools",
"log",
@ -2742,15 +2762,6 @@ dependencies = [
"lazy_static",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1"
dependencies = [
"libc",
]
[[package]]
name = "signature"
version = "1.6.4"
@ -3506,9 +3517,7 @@ dependencies = [
"libc",
"mio",
"num_cpus",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2 0.5.5",
"tokio-macros",
"windows-sys 0.48.0",

View File

@ -10,7 +10,7 @@ yellowstone-grpc-proto = "1.11.0"
solana-sdk = "~1.16.17"
async-stream = "0.3.5"
tokio = { version = "1.28.2" , features = ["full"] }
tokio = { version = "1.28" , features = ["rt"] }
futures = "0.3.28"
anyhow = "1.0.70"
log = "0.4.17"
@ -20,6 +20,7 @@ prometheus = "0.13.3"
itertools = "0.10.5"
base64 = "0.21.5"
bincode = "1.3.3"
derive_more = "0.99.17"
#[dev-dependencies]
#solana-test-validator = "~1.16.17"

View File

@ -0,0 +1,126 @@
//! Experiments with the drain-to-tip pattern used for GRPC multiplexing.
use std::fmt::Display;
use derive_more::Display;
use futures::StreamExt;
use log::{debug, error, info, warn};
use serde::Serializer;
use tokio::select;
use tokio::sync::broadcast::{Receiver, Sender};
use tokio::time::{Duration, sleep};
/// A single element of the simulated block stream, identified only by its slot number.
#[derive(Debug, Clone, Display)]
struct Message {
    /// Slot number carried by this message.
    slot: u64,
}

impl Message {
    /// Builds a message for the given `slot`.
    fn new(slot: u64) -> Self {
        Self { slot }
    }
}
/// Wires up the experiment: one broadcast channel carrying the message stream, one watch
/// channel carrying the tip, a progressor task consuming both, and a scripted sequence of
/// sends followed by a forced tip change.
#[tokio::main]
async fn main() {
    // RUST_LOG=info,stream_via_grpc=debug,drain_to_tip_pattern=debug
    tracing_subscriber::fmt::init();

    let (blocks_tx, blocks_rx) = tokio::sync::broadcast::channel::<Message>(1000);
    // the initial watch receiver is dropped right away; the progressor gets its own subscription
    let initial_tip = Message::new(0);
    let (tx_tip, _) = tokio::sync::watch::channel::<Message>(initial_tip);

    let tip_subscription = tx_tip.subscribe();
    start_progressor(blocks_rx, tip_subscription).await;

    // a clone is handed over so that the original sender stays alive until main returns
    send_stream(blocks_tx.clone()).await;

    // move tip; current tip is 3; next offered slot is 4
    info!("==> force tip to 6 - expect progressor to unblock and offer 7");
    let forced_tip = Message::new(6);
    tx_tip.send(forced_tip).unwrap();

    info!("Blocking main thread for some time to allow the system to operate...");
    sleep(Duration::from_secs(4)).await;
    let receiver_count = tx_tip.receiver_count();
    info!("Num broadcast subscribers: {}", receiver_count);

    info!("Shutting down....");
    // dropping the tip sender closes the watch channel, which the progressor observes
    drop(tx_tip);
    sleep(Duration::from_secs(1)).await;

    info!("Shutdown completed.");
}
/// Spawns the progressor task for one source channel and returns right after spawning.
///
/// The service is dedicated to one source channel which produces a monotonic stream of
/// messages qualified by slot number. It maintains a local copy of the tip (initially slot 3)
/// which is updated by a different part of the system through `rx_tip`. Messages are drained
/// from `blocks_notifier` until one has a slot number greater than the tip; that message is
/// "offered" to the rest of the system (currently only logged) and no further message is
/// received until the tip has reached the offered slot.
///
/// The task ends when the tip channel or the source channel is closed.
async fn start_progressor(mut blocks_notifier: Receiver<Message>, mut rx_tip: tokio::sync::watch::Receiver<Message>) {
    info!("Started progressor");
    tokio::spawn(async move {
        let mut local_tip = Message::new(3);
        // block after tip offered by this stream
        // TODO: block_after_tip is only valid/useful if greater than tip
        let mut highest_block = Message::new(0);

        'main_loop: loop {
            select! {
                result = rx_tip.changed() => {
                    if result.is_err() {
                        debug!("Tip variable closed");
                        break 'main_loop;
                    }
                    local_tip = rx_tip.borrow_and_update().clone();
                    info!("++> tip changed to {}", local_tip);
                    if local_tip.slot <= highest_block.slot {
                        info!("!! next offered slot is invalid: {} <= {}", highest_block.slot, local_tip);
                    }
                }
                // only receive while nothing beyond the tip has been offered yet
                recv_result = blocks_notifier.recv(), if highest_block.slot <= local_tip.slot => {
                    debug!("!! block_after_tip.slot > local_tip.slot: {} > {}", highest_block.slot, local_tip.slot);
                    match recv_result {
                        Ok(msg) => {
                            info!("=> recv: {}", msg);
                            if msg.slot > local_tip.slot {
                                info!("==> offer next slot ({} -> {})", local_tip, msg.slot);
                                highest_block = msg;
                                // the receive branch is now disabled by its guard:
                                // no recvs are issued until we get the tip.changed signal
                                continue 'main_loop;
                            }
                        }
                        Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
                            // This task deliberately stops receiving while it waits for the tip,
                            // so falling behind the sender is expected. The receiver has already
                            // been moved to the oldest retained message; keep draining from there.
                            warn!("Progressor lagged behind, skipped {} messages", skipped);
                        }
                        Err(e) => {
                            // channel closed: all senders are gone, nothing more will arrive
                            error!("Error receiving block: {}", e);
                            break 'main_loop;
                        }
                    }
                }
            }
        } // -- main loop

        info!("Shutting down progressor.");
    });
}
/// Sends ten messages (slots 0 through 9) on `message_channel`, pausing 100 ms after each one.
///
/// Panics if a send fails, or if the channel does not report exactly 5 queued messages once
/// all sends are done.
async fn send_stream(message_channel: Sender<Message>) {
    // tip is 3
    // drain 0 to 3; offer 4, then block
    let pause = Duration::from_millis(100);
    for slot in 0..=9 {
        let queued = message_channel.len();
        info!("> sending {} (queue size={})", slot, queued);
        message_channel.send(Message::new(slot)).unwrap();
        sleep(pause).await;
    }

    assert_eq!(message_channel.len(), 5);
}

View File

@ -0,0 +1,392 @@
use std::collections::{HashMap, HashSet};
use std::fmt::{Display, Formatter};
use std::ops::Deref;
use std::path::PathBuf;
use std::sync::Arc;
use anyhow::{bail, Context};
use futures::StreamExt;
use itertools::{ExactlyOneError, Itertools};
use log::{debug, error, info, warn};
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use tokio::select;
use tokio::sync::broadcast::{Receiver, Sender};
use tokio::sync::broadcast::error::{RecvError, TryRecvError};
use tokio::sync::RwLock;
use tokio::time::{sleep, Duration, timeout};
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequestFilterBlocksMeta, SubscribeUpdateBlockMeta};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
// NOTE(review): not referenced in the visible part of this file; presumably the version of the
// gRPC/geyser interface this tool was written against - confirm.
pub const GRPC_VERSION: &str = "1.16.1";
#[tokio::main]
// #[tokio::main(flavor = "multi_thread", worker_threads = 16)]
pub async fn main() {
// RUST_LOG=info,grpcmultiplex_perfectseq=debug,drain_to_tip_pattern=debug
tracing_subscriber::fmt::init();
// mango validator (mainnet)
let _grpc_addr_mainnet_triton = "http://202.8.9.108:10000".to_string();
// ams81 (mainnet)
let grpc_addr_mainnet_ams81 = "http://202.8.8.12:10000".to_string();
// testnet - NOTE: this connection has terrible lags (almost 5 minutes)
// let grpc_addr = "http://147.28.169.13:10000".to_string();
// let (block_sx_green, blocks_notifier_green) = tokio::sync::broadcast::channel(1000);
let (block_sx_green, blocks_notifier_green) = start_monkey_broadcast::<SimpleBlockMeta>(1000);
let (block_sx_blue, blocks_notifier_blue) = tokio::sync::broadcast::channel(1000);
// TODO ship ProducedBlock
let (sx_multi, mut rx_multi) = tokio::sync::broadcast::channel::<Slot>(1000);
let grpc_x_token = None;
let block_confirmed_task_green = create_blockmeta_processing_task(
_grpc_addr_mainnet_triton.clone(),
grpc_x_token.clone(),
block_sx_green.clone(),
CommitmentLevel::Confirmed,
);
let block_confirmed_task_blue = create_blockmeta_processing_task(
grpc_addr_mainnet_ams81.clone(),
grpc_x_token.clone(),
block_sx_blue.clone(),
CommitmentLevel::Confirmed,
);
let (tx_tip, _) = tokio::sync::watch::channel::<Slot>(0);
let (offer_block_sender, mut offer_block_notifier) = tokio::sync::mpsc::channel::<OfferBlockMsg>(100);
// producers
start_progressor("green".to_string(), blocks_notifier_green, tx_tip.subscribe(), offer_block_sender.clone()).await;
start_progressor("blue".to_string(), blocks_notifier_blue, tx_tip.subscribe(), offer_block_sender.clone()).await;
// merge task
// collect the offered slots from the two channels
tokio::spawn(async move {
// need to wait until channels reached slot beyond tip
// tokio::time::sleep(Duration::from_secs(14)).await;
// see also start procedure!
let mut current_tip = 0;
let mut blocks_offered = Vec::<BlockRef>::new();
'main_loop: loop {
// note: we abuse the timeout mechanism to collect some blocks
let timeout_secs = if current_tip == 0 { 20 } else { 10 };
let msg_or_timeout = timeout(Duration::from_secs(timeout_secs), offer_block_notifier.recv()).await;
info!("- current_tip={}", current_tip);
match msg_or_timeout {
Ok(Some(offer_block_msg)) => {
// collect the offered slots from the channels
if let OfferBlockMsg::NextSlot(label, block_offered) = offer_block_msg {
info!("<< offered slot from {}: {:?}", label, block_offered);
// TOOD use .parent instead
if block_offered.parent_slot == current_tip {
current_tip = block_offered.slot;
info!("<< take block from {} as new tip {}", label, current_tip);
assert_ne!(current_tip, 0, "must not see uninitialized tip");
emit_block_on_multiplex_output_channel(&sx_multi, current_tip);
tx_tip.send(current_tip).unwrap();
blocks_offered.clear();
continue 'main_loop;
} else {
// keep the block for later
blocks_offered.push(block_offered);
continue 'main_loop;
}
}
// TODO handle else
}
Ok(None) => {
// TODO double-check
// channel closed
info!("--> channel closed");
break;
}
Err(_elapsed) => {
// timeout
info!("--> timeout: got these slots: {:?}", blocks_offered);
// we abuse timeout feature to wait for some blocks to arrive to select the "best" one
if current_tip == 0 {
let start_slot = blocks_offered.iter().max_by(|lhs,rhs| lhs.slot.cmp(&rhs.slot)).expect("need at least one slot to start");
current_tip = start_slot.slot;
assert_ne!(current_tip, 0, "must not see uninitialized tip");
emit_block_on_multiplex_output_channel(&sx_multi, current_tip);
tx_tip.send(current_tip).unwrap();
info!("--> initializing with tip {}", current_tip);
blocks_offered.clear();
continue 'main_loop;
}
match blocks_offered.iter().filter(|b| b.parent_slot == current_tip).exactly_one() {
Ok(found_next) => {
current_tip = found_next.slot;
assert_ne!(current_tip, 0, "must not see uninitialized tip");
tx_tip.send(current_tip).unwrap();
info!("--> progressing tip to {}", current_tip);
blocks_offered.clear();
}
Err(missed) => {
warn!("--> no slots offered - SHOULD ABORT - no hope to progress");
}
}
sleep(Duration::from_millis(500)).await;
}
}
} // -- recv loop
info!("Shutting down merge task.");
});
tokio::spawn(async move {
let mut rx_multi = rx_multi;
let mut last_slot = 0;
loop {
let slot = rx_multi.recv().await.unwrap();
assert_ne!(slot, 0, "must not see empty slot");
info!("==> multiplexed slot: {}", slot);
if slot - last_slot > 1 && last_slot != 0 {
warn!("==> gap: {} -> {}", last_slot, slot);
}
last_slot = slot;
}
});
// "infinite" sleep
sleep(Duration::from_secs(1800)).await;
// TODO proper shutdown
info!("Shutdown completed.");
}
/// Sends `current_tip` on the multiplexed output channel `sx_multi`.
///
/// # Panics
/// Panics if the send fails; for a tokio broadcast channel that means there is no active receiver.
fn emit_block_on_multiplex_output_channel(sx_multi: &Sender<Slot>, current_tip: u64) {
    sx_multi
        .send(current_tip)
        .expect("multiplex output channel must have at least one receiver");
}
#[derive(Clone, Debug)]
struct BlockRef {
pub slot: Slot,
pub parent_slot: Slot,
}
impl From<SimpleBlockMeta> for BlockRef {
fn from(block: SimpleBlockMeta) -> Self {
BlockRef {
slot: block.slot,
parent_slot: block.parent_slot,
}
}
}
/// Message sent by a progressor to the merge task over the offer channel.
#[derive(Debug)]
enum OfferBlockMsg {
// Offer of a block: the label of the offering stream and the reference to the offered block.
NextSlot(String, BlockRef),
}
/// Spawns the progressor task for the block stream named `label` and returns right after spawning.
///
/// The task keeps a local copy of the tip, updated through `rx_tip`. It drains blocks from
/// `blocks_notifier` until one has a slot greater than the tip, offers that block on
/// `offer_block_sender`, and then stops receiving until the tip has reached the offered slot.
///
/// The task ends when the tip channel, the block channel or the offer channel is closed.
async fn start_progressor(label: String, blocks_notifier: Receiver<SimpleBlockMeta>, mut rx_tip: tokio::sync::watch::Receiver<Slot>,
                          offer_block_sender: tokio::sync::mpsc::Sender<OfferBlockMsg>) {
    tokio::spawn(async move {
        // TODO is .resubscribe what we want?
        let mut blocks_notifier = blocks_notifier.resubscribe();

        // local copy of tip
        let mut local_tip: Slot = 0;
        // block after tip offered by this stream
        // TODO: block_after_tip is only valid/useful if greater than tip
        let mut highest_block = BlockRef {
            slot: 0,
            parent_slot: 0,
        };

        'main_loop: loop {
            select! {
                result = rx_tip.changed() => {
                    if result.is_err() {
                        debug!("Tip variable closed for {}", label);
                        break 'main_loop;
                    }
                    local_tip = *rx_tip.borrow_and_update();
                    info!("++> {} tip changed to {}", label, local_tip);
                }
                // only receive while nothing beyond the tip has been offered yet
                recv_result = blocks_notifier.recv(), if highest_block.slot <= local_tip => {
                    match recv_result {
                        Ok(block) => {
                            info!("=> recv on {}: {}",label, format_block(&block));
                            if block.slot > local_tip {
                                info!("==> {}: beyond tip ({} > {})", label, block.slot, local_tip);
                                highest_block = BlockRef::from(block);
                                let offer = OfferBlockMsg::NextSlot(label.clone(), highest_block.clone());
                                if offer_block_sender.send(offer).await.is_err() {
                                    // the merge task is gone; nobody is left to take offers
                                    warn!("Offer channel closed for {}", label);
                                    break 'main_loop;
                                }
                                // the receive branch is now disabled by its guard:
                                // no recvs are issued until we get the tip.changed signal
                            }
                        }
                        Err(RecvError::Lagged(skipped)) => {
                            // This task deliberately stops receiving while it waits for the tip,
                            // so falling behind the sender is expected. The receiver has already
                            // been moved to the oldest retained block; keep draining from there.
                            warn!("{} lagged behind, skipped {} blocks", label, skipped);
                        }
                        Err(e @ RecvError::Closed) => {
                            // all senders are gone, nothing more will arrive
                            error!("Error receiving block: {}", e);
                            break 'main_loop;
                        }
                    }
                }
            }
        }
    });
}
/// Renders a block meta as `<slot>@<commitment> (-> <parent_slot>)` for log output.
fn format_block(block: &SimpleBlockMeta) -> String {
    let commitment = &block.commitment_config.commitment;
    let (slot, parent_slot) = (block.slot, block.parent_slot);
    format!("{:?}@{} (-> {})", slot, commitment, parent_slot)
}
/// Creates a broadcast channel with a chaos-monkey task sitting between sender and receiver.
///
/// Values sent on the returned `Sender` are forwarded to the returned `Receiver` by a spawned
/// task which, based on a running counter, delays every 3rd value by 700 ms, drops every 5th
/// value and pauses for 5 s on every 23rd value.
///
/// `capacity` is the capacity of the downstream channel (the one the returned `Receiver`
/// belongs to); the upstream channel has a fixed capacity of 1024.
///
/// The forwarding task ends when the upstream channel is closed and panics if forwarding
/// fails, i.e. when the downstream channel has no receiver.
fn start_monkey_broadcast<T: Clone + Send + 'static>(capacity: usize) -> (Sender<T>, Receiver<T>) {
    let (tx, mut monkey_upstream) = tokio::sync::broadcast::channel::<T>(1024);
    let (monkey_downstream, rx) = tokio::sync::broadcast::channel::<T>(capacity);

    tokio::spawn(async move {
        'recv_loop: for counter in 1u64.. {
            // exactly one upstream value is consumed per iteration
            let value = match monkey_upstream.recv().await {
                Ok(msg) => msg,
                Err(RecvError::Closed) => return,
                // skipped values are lost anyway; carry on with the oldest retained one
                Err(RecvError::Lagged(_)) => continue 'recv_loop,
            };

            if counter % 3 == 0 {
                debug!("% delay value (monkey)");
                tokio::time::sleep(Duration::from_millis(700)).await;
            }
            if counter % 5 == 0 {
                debug!("% drop value (monkey)");
                continue 'recv_loop;
            }
            if counter % 23 == 0 {
                debug!("% system outage + reboot (monkey)");
                tokio::time::sleep(Duration::from_secs(5)).await;
            }

            // fails if there are no receivers
            match monkey_downstream.send(value) {
                Ok(_) => {
                    debug!("% forwarded");
                }
                Err(_) => panic!("Should never happen"),
            }
        }
    });

    (tx, rx)
}
/// Reduced block metadata passed through the block channels: slot, parent slot and the
/// commitment config of the subscription the block was received from.
#[derive(Debug, Clone)]
struct SimpleBlockMeta {
// slot of the block
slot: Slot,
// slot of the parent block
parent_slot: Slot,
// commitment config matching the commitment level of the gRPC subscription
commitment_config: CommitmentConfig,
}
/// Spawns a task that subscribes to block meta updates at `grpc_addr` with the given
/// `commitment_level` and forwards every update as `SimpleBlockMeta` on `block_sx`.
///
/// The returned handle resolves to an error when connecting or subscribing fails, when the
/// stream yields an error or an update other than block meta / ping, when forwarding on
/// `block_sx` fails, or when the stream ends. The task never resolves to `Ok`.
fn create_blockmeta_processing_task(
    grpc_addr: String,
    grpc_x_token: Option<String>,
    block_sx: Sender<SimpleBlockMeta>,
    commitment_level: CommitmentLevel,
) -> tokio::task::JoinHandle<anyhow::Result<()>> {
    // commitment config attached to every forwarded block
    let commitment_config = match commitment_level {
        CommitmentLevel::Processed => CommitmentConfig::processed(),
        CommitmentLevel::Confirmed => CommitmentConfig::confirmed(),
        CommitmentLevel::Finalized => CommitmentConfig::finalized(),
    };

    // a single block meta filter registered under the key "client"
    let blocks_subs = HashMap::from([("client".to_string(), SubscribeRequestFilterBlocksMeta {})]);

    tokio::spawn(async move {
        // connect to grpc
        let mut client = GeyserGrpcClient::connect(grpc_addr, grpc_x_token, None)?;
        let mut stream = client
            .subscribe_once(
                HashMap::new(),
                Default::default(),
                HashMap::new(),
                Default::default(),
                Default::default(),
                blocks_subs,
                Some(commitment_level),
                Default::default(),
                None,
            )
            .await?;

        while let Some(message) = stream.next().await {
            // messages without payload are skipped
            let update = match message?.update_oneof {
                Some(update) => update,
                None => continue,
            };

            match update {
                UpdateOneof::BlockMeta(block_meta) => {
                    let simple_meta = process_blockmeta(block_meta, commitment_config);
                    block_sx
                        .send(simple_meta)
                        .context("Grpc failed to send a block")?;
                }
                UpdateOneof::Ping(_) => log::trace!("GRPC Ping"),
                u => bail!("Unexpected update: {u:?}"),
            }
        }

        bail!("geyser slot stream ended");
    })
}
fn process_blockmeta(block_meta: SubscribeUpdateBlockMeta, commitment_config: CommitmentConfig) -> SimpleBlockMeta {
SimpleBlockMeta {
slot: block_meta.slot,
parent_slot: block_meta.parent_slot,
commitment_config: commitment_config,
}
}