geyser: update block reconstruction code (#177)

This commit is contained in:
Kirill Fomichev 2023-08-21 17:44:58 -04:00 committed by GitHub
parent 18a7faed08
commit afd6569ddc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 317 additions and 181 deletions

View File

@ -12,14 +12,23 @@ The minor version will be incremented upon a breaking change and the patch versi
### Features
### Fixes
### Breaking
## 2023-08-21
- yellowstone-grpc-geyser-1.7.1+solana.1.16.1
### Features
geyser: add package name to version info ([#173](https://github.com/rpcpool/yellowstone-grpc/pull/173)).
### Fixes
geyser: fix overflow for small slot number ([#171](https://github.com/rpcpool/yellowstone-grpc/pull/171)).
geyser: use Notify instead of AtomicBool in send loop ([#176](https://github.com/rpcpool/yellowstone-grpc/pull/176)).
### Breaking
geyser: update block reconstruction code ([#177](https://github.com/rpcpool/yellowstone-grpc/pull/177)).
## 2023-08-10

2
Cargo.lock generated
View File

@ -4269,7 +4269,7 @@ dependencies = [
[[package]]
name = "yellowstone-grpc-geyser"
version = "1.7.0+solana.1.16.1"
version = "1.7.1+solana.1.16.1"
dependencies = [
"anyhow",
"base64 0.21.2",

View File

@ -2,7 +2,7 @@
members = [
"examples/rust", # 1.9.0+solana.1.16.1
"yellowstone-grpc-client", # 1.9.0+solana.1.16.1
"yellowstone-grpc-geyser", # 1.7.0+solana.1.16.1
"yellowstone-grpc-geyser", # 1.7.1+solana.1.16.1
"yellowstone-grpc-proto", # 1.9.0+solana.1.16.1
]

View File

@ -1,6 +1,6 @@
[package]
name = "yellowstone-grpc-geyser"
version = "1.7.0+solana.1.16.1"
version = "1.7.1+solana.1.16.1"
authors = ["Triton One"]
edition = "2021"
description = "Yellowstone gRPC Geyser Plugin"

View File

@ -2,7 +2,7 @@ use {
crate::{
config::{ConfigBlockFailAction, ConfigGrpc},
filters::{Filter, FilterAccountsDataSlice},
prom::{CONNECTIONS_TOTAL, INVALID_FULL_BLOCKS, MESSAGE_QUEUE_SIZE},
prom::{self, CONNECTIONS_TOTAL, MESSAGE_QUEUE_SIZE},
proto::{
self,
geyser_server::{Geyser, GeyserServer},
@ -17,7 +17,7 @@ use {
},
version::VERSION,
},
log::*,
log::{error, info},
solana_geyser_plugin_interface::geyser_plugin_interface::{
ReplicaAccountInfoV3, ReplicaBlockInfoV2, ReplicaEntryInfo, ReplicaTransactionInfoV2,
SlotStatus,
@ -229,8 +229,22 @@ pub struct MessageBlock {
pub entries: Vec<MessageEntry>,
}
impl From<(MessageBlockMeta, Vec<MessageTransactionInfo>)> for MessageBlock {
fn from((blockinfo, transactions): (MessageBlockMeta, Vec<MessageTransactionInfo>)) -> Self {
impl
From<(
MessageBlockMeta,
Vec<MessageTransactionInfo>,
Vec<MessageAccountInfo>,
Vec<MessageEntry>,
)> for MessageBlock
{
fn from(
(blockinfo, transactions, accounts, entries): (
MessageBlockMeta,
Vec<MessageTransactionInfo>,
Vec<MessageAccountInfo>,
Vec<MessageEntry>,
),
) -> Self {
Self {
parent_slot: blockinfo.parent_slot,
slot: blockinfo.slot,
@ -241,10 +255,10 @@ impl From<(MessageBlockMeta, Vec<MessageTransactionInfo>)> for MessageBlock {
block_height: blockinfo.block_height,
executed_transaction_count: blockinfo.executed_transaction_count,
transactions,
updated_account_count: 0,
accounts: Vec::new(),
entries_count: 0,
entries: Vec::new(),
updated_account_count: accounts.len() as u64,
accounts,
entries_count: entries.len() as u64,
entries,
}
}
}
@ -610,6 +624,52 @@ impl BlockMetaStorage {
}
}
/// Per-slot accumulator used to reconstruct a full `Message::Block` from the
/// individual messages received for that slot.
#[derive(Debug, Default)]
struct SlotMessages {
/// Messages stored for this slot in arrival order (Slot messages are not stored here).
/// The Block message built by `try_seal` is appended here as well.
messages: Vec<Option<Message>>, // Option is used for accounts with low write_version
/// Block meta for this slot, set when a BlockMeta message arrives.
block_meta: Option<MessageBlockMeta>,
/// Transactions collected for this slot; moved into the Block message on seal.
transactions: Vec<MessageTransactionInfo>,
/// Latest known account update per pubkey; the index points into `messages`,
/// and an older update is replaced by `None` when a higher write_version arrives.
accounts_dedup: HashMap<Pubkey, (u64, usize)>, // (write_version, message_index)
/// Set once the first Entry message of this slot has been handled, so the
/// `entries_received` update of the preceding stored slot is attempted only once.
entries_parent_updated: bool,
/// Set when the first Entry message of the next stored slot arrives;
/// `try_seal` does not build the block until this is true.
entries_received: bool,
/// True once the Block message has been built; `try_seal` is a no-op afterwards.
sealed: bool,
/// Length of `messages` when the Confirmed status arrived before the slot was
/// sealed; messages from this index on are sent to Confirmed once the block is sealed.
confirmed_at: Option<usize>,
/// Same as `confirmed_at`, but for the Finalized status.
finalized_at: Option<usize>,
}
impl SlotMessages {
    /// Attempts to build the full `Message::Block` for this slot.
    ///
    /// The block is built only when all of the following hold:
    /// * the slot has not been sealed yet;
    /// * block meta has been received;
    /// * the number of collected transactions equals
    ///   `executed_transaction_count` from the block meta;
    /// * `entries_received` is set.
    ///
    /// On success the collected transactions are moved into the block, the
    /// still-present account updates and entries are cloned out of `messages`
    /// (in stored order), the new Block message is appended to `messages`,
    /// the slot is marked as sealed and the Block message is returned.
    /// In every other case `None` is returned and the state is left untouched.
    pub fn try_seal(&mut self) -> Option<Message> {
        if self.sealed {
            return None;
        }

        let meta = self.block_meta.as_ref()?;
        let expected_txs = meta.executed_transaction_count as usize;
        if expected_txs != self.transactions.len() || !self.entries_received {
            return None;
        }

        let transactions = std::mem::take(&mut self.transactions);

        // Slots replaced by `None` (superseded account updates) are skipped.
        let capacity = self.messages.len();
        let mut accounts = Vec::with_capacity(capacity);
        let mut entries = Vec::with_capacity(capacity);
        for stored in &self.messages {
            match stored {
                Some(Message::Account(msg)) => accounts.push(msg.account.clone()),
                Some(Message::Entry(msg)) => entries.push(msg.clone()),
                _ => {}
            }
        }

        let block = Message::Block((meta.clone(), transactions, accounts, entries).into());
        // Keep a copy in the slot history so later replays include the block.
        self.messages.push(Some(block.clone()));
        self.sealed = true;
        Some(block)
    }
}
#[derive(Debug)]
pub struct GrpcService {
config: ConfigGrpc,
@ -692,195 +752,254 @@ impl GrpcService {
const PROCESSED_MESSAGES_MAX: usize = 31;
const PROCESSED_MESSAGES_SLEEP: Duration = Duration::from_millis(10);
let mut transactions: BTreeMap<
u64,
(Option<MessageBlockMeta>, Vec<MessageTransactionInfo>),
> = BTreeMap::new();
#[allow(clippy::type_complexity)]
let mut messages: HashMap<
u64,
(Vec<Option<Message>>, HashMap<Pubkey, (u64, usize)>, bool),
> = HashMap::new();
let mut messages: BTreeMap<u64, SlotMessages> = Default::default();
let mut processed_messages = Vec::with_capacity(PROCESSED_MESSAGES_MAX);
let mut processed_first_slot = None;
let processed_sleep = sleep(PROCESSED_MESSAGES_SLEEP);
tokio::pin!(processed_sleep);
macro_rules! process_message {
($message:ident) => {
if let Message::Slot(slot) = $message {
let (mut confirmed_messages, mut finalized_messages) = match slot.status {
CommitmentLevel::Processed => {
(Vec::with_capacity(1), Vec::with_capacity(1))
}
CommitmentLevel::Confirmed => {
let messages = messages
.get(&slot.slot)
.map(|entry| entry.0.iter().filter_map(|x| x.clone()).collect())
.unwrap_or_default();
(messages, Vec::with_capacity(1))
}
CommitmentLevel::Finalized => {
messages.retain(|msg_slot, _messages| *msg_slot >= slot.slot);
let messages = messages
.remove(&slot.slot)
.map(|entry| entry.0.into_iter().filter_map(|x| x).collect())
.unwrap_or_default();
(Vec::with_capacity(1), messages)
}
};
// processed
processed_messages.push($message.clone());
let _ =
broadcast_tx.send((CommitmentLevel::Processed, processed_messages.into()));
processed_messages = Vec::with_capacity(PROCESSED_MESSAGES_MAX);
processed_sleep
.as_mut()
.reset(Instant::now() + PROCESSED_MESSAGES_SLEEP);
// confirmed
confirmed_messages.push($message.clone());
let _ =
broadcast_tx.send((CommitmentLevel::Confirmed, confirmed_messages.into()));
// finalized
finalized_messages.push($message);
let _ =
broadcast_tx.send((CommitmentLevel::Finalized, finalized_messages.into()));
} else {
if let Message::Block(block) = &mut $message {
if let Some((vec, map, collected)) = messages.get_mut(&block.slot) {
let mut accounts = Vec::with_capacity(vec.len());
for (_write_version, idx) in map.values() {
if let Some(Some(Message::Account(account))) = vec.get(*idx) {
accounts.push(account.account.clone());
}
}
block.updated_account_count = accounts.len() as u64;
block.accounts = accounts;
let mut entries = Vec::with_capacity(vec.len());
for item in vec {
if let Some(Message::Entry(entry)) = item {
entries.push(entry.clone());
}
}
block.entries_count = entries.len() as u64;
block.entries = entries;
*collected = true;
}
}
let processed_message = $message.clone();
let (vec, map, collected) = messages.entry($message.get_slot()).or_default();
if *collected && !matches!(&$message, Message::Block(_) | Message::BlockMeta(_)) {
match block_fail_action {
ConfigBlockFailAction::Log => {
INVALID_FULL_BLOCKS.inc();
error!("unexpected message ({}) order for slot {}", $message.kind(), $message.get_slot());
}
ConfigBlockFailAction::Panic => {
panic!("unexpected message ({}) order for slot {}", $message.kind(), $message.get_slot());
}
}
}
if let Message::Account(message) = &$message {
let write_version = message.account.write_version;
let index = vec.len();
if let Some(entry) = map.get_mut(&message.account.pubkey) {
if entry.0 < write_version {
vec[entry.1] = None; // We would able to make replace but then we will lose message order
vec.push(Some($message));
entry.0 = write_version;
entry.1 = index;
}
} else {
map.insert(message.account.pubkey, (write_version, index));
vec.push(Some($message));
}
} else {
vec.push(Some($message));
}
processed_messages.push(processed_message);
if processed_messages.len() >= PROCESSED_MESSAGES_MAX {
let _ = broadcast_tx
.send((CommitmentLevel::Processed, processed_messages.into()));
processed_messages = Vec::with_capacity(PROCESSED_MESSAGES_MAX);
processed_sleep
.as_mut()
.reset(Instant::now() + PROCESSED_MESSAGES_SLEEP);
}
}
};
}
loop {
tokio::select! {
Some(mut message) = messages_rx.recv() => {
Some(message) = messages_rx.recv() => {
MESSAGE_QUEUE_SIZE.dec();
// Update blocks info
if let Some(blocks_meta_tx) = &blocks_meta_tx {
if matches!(message, Message::Slot(_) | Message::BlockMeta(_)) {
let _ = blocks_meta_tx.send(message.clone());
}
}
// construct Block message
let slot = message.get_slot();
if match &message {
// Collect Transactions for full Block message
Message::Transaction(msg_tx) => {
transactions.entry(slot).or_default().1.push(msg_tx.transaction.clone());
true
// Remove outdated block reconstruction info
match &message {
// On startup we can receive a few Confirmed/Finalized slots without a BlockMeta message.
// By saving the first Processed slot we can ignore errors caused by the startup process.
Message::Slot(msg) if processed_first_slot.is_none() && msg.status == CommitmentLevel::Processed => {
processed_first_slot = Some(msg.slot);
}
// Save block meta for full Block message
Message::BlockMeta(msg_block) => {
transactions.entry(slot).or_default().0 = Some(msg_block.clone());
true
}
_ => false
} && matches!(
transactions.get(&slot),
Some((Some(block_meta), transactions)) if block_meta.executed_transaction_count as usize == transactions.len()
) {
if let Some((Some(block_meta), mut transactions)) = transactions.remove(&slot) {
transactions.sort_by(|tx1, tx2| tx1.index.cmp(&tx2.index));
let mut message = Message::Block((block_meta, transactions).into());
process_message!(message);
}
}
Message::Slot(msg) if msg.status == CommitmentLevel::Finalized => {
// keep extra 10 slots
if let Some(msg_slot) = msg.slot.checked_sub(10) {
loop {
match messages.keys().next().cloned() {
Some(slot) if slot < msg_slot => {
if let Some(slot_messages) = messages.remove(&slot) {
match processed_first_slot {
Some(processed_first) if slot <= processed_first => continue,
None => continue,
_ => {}
}
// remove outdated transactions
if matches!(message, Message::Slot(msg) if msg.status == CommitmentLevel::Finalized) {
loop {
match transactions.keys().next().cloned() {
// Block was dropped, not in chain
Some(kslot) if kslot < slot => {
transactions.remove(&kslot);
}
// Maybe log error
Some(kslot) if kslot == slot => {
if let Some((Some(_), vec)) = transactions.remove(&kslot) {
match block_fail_action {
ConfigBlockFailAction::Log => {
INVALID_FULL_BLOCKS.inc();
error!("{} transactions left for block {kslot}", vec.len());
}
ConfigBlockFailAction::Panic => {
panic!("{} transactions left for block {kslot}", vec.len());
if !slot_messages.sealed && slot_messages.finalized_at.is_some() {
let mut reasons = vec![];
if let Some(block_meta) = slot_messages.block_meta {
let block_txn_count = block_meta.executed_transaction_count as usize;
let msg_txn_count = slot_messages.transactions.len();
if block_txn_count != msg_txn_count {
reasons.push("InvalidTxnCount");
error!("failed reconstruct #{slot} -- tx count: {block_txn_count} vs {msg_txn_count}");
}
} else {
reasons.push("NoBlockMeta");
}
if !slot_messages.entries_received {
reasons.push("MissedEntries");
let msg_entries_count = slot_messages.messages.iter().filter(|x| matches!(x, Some(Message::Entry(_)))).count();
error!("failed reconstruct #{slot} -- entries: {msg_entries_count}");
}
let reason = reasons.join(",");
prom::update_invalid_blocks(format!("failed reconstruct {reason}"));
match block_fail_action {
ConfigBlockFailAction::Log => {
error!("failed reconstruct #{slot} {reason}");
}
ConfigBlockFailAction::Panic => {
panic!("failed reconstruct #{slot} {reason}");
}
}
}
}
}
_ => break,
}
}
_ => break,
}
}
_ => {}
}
// Update block reconstruction info
let slot_messages = messages.entry(message.get_slot()).or_default();
if !matches!(message, Message::Slot(_)) {
slot_messages.messages.push(Some(message.clone()));
// If we have already built the Block message, any new message for this slot is a problem
if slot_messages.sealed {
prom::update_invalid_blocks(format!("unexpected message {}", message.kind()));
match block_fail_action {
ConfigBlockFailAction::Log => {
error!("unexpected message #{} -- {} (invalid order)", message.get_slot(), message.kind());
}
ConfigBlockFailAction::Panic => {
panic!("unexpected message #{} -- {} (invalid order)", message.get_slot(), message.kind());
}
}
}
}
let mut sealed_block_msg = None;
match &message {
Message::BlockMeta(msg) => {
if slot_messages.block_meta.is_some() {
prom::update_invalid_blocks("unexpected message: BlockMeta (duplicate)");
match block_fail_action {
ConfigBlockFailAction::Log => {
error!("unexpected message #{} -- BlockMeta (duplicate)", message.get_slot());
}
ConfigBlockFailAction::Panic => {
panic!("unexpected message #{} -- BlockMeta (duplicate)", message.get_slot());
}
}
}
slot_messages.block_meta = Some(msg.clone());
sealed_block_msg = slot_messages.try_seal();
}
Message::Transaction(msg) => {
slot_messages.transactions.push(msg.transaction.clone());
sealed_block_msg = slot_messages.try_seal();
}
// Dedup accounts by max write_version
Message::Account(msg) => {
let write_version = msg.account.write_version;
let msg_index = slot_messages.messages.len() - 1;
if let Some(entry) = slot_messages.accounts_dedup.get_mut(&msg.account.pubkey) {
if entry.0 < write_version {
// We can replace the message, but in this case we will lose the order
slot_messages.messages[entry.1] = None;
*entry = (write_version, msg_index);
}
} else {
slot_messages.accounts_dedup.insert(msg.account.pubkey, (write_version, msg_index));
}
}
// Entries can come after block meta or last transaction
Message::Entry(msg) if !slot_messages.entries_parent_updated => {
slot_messages.entries_parent_updated = true;
// process original message
process_message!(message);
let mut iter = messages.iter_mut().peekable();
while let Some((_slot, slot_messages)) = iter.next() {
if let Some((slot_peek, _slot_messages)) = iter.peek() {
if **slot_peek == msg.slot {
slot_messages.entries_received = true;
sealed_block_msg = slot_messages.try_seal();
break;
}
}
}
}
_ => {}
}
// Send messages to filter (and to clients)
let mut messages_vec = vec![message];
if let Some(sealed_block_msg) = sealed_block_msg {
messages_vec.push(sealed_block_msg);
}
for message in messages_vec {
if let Message::Slot(slot) = message {
let (mut confirmed_messages, mut finalized_messages) = match slot.status {
CommitmentLevel::Processed => {
(Vec::with_capacity(1), Vec::with_capacity(1))
}
CommitmentLevel::Confirmed => {
if let Some(slot_messages) = messages.get_mut(&slot.slot) {
if !slot_messages.sealed {
slot_messages.confirmed_at = Some(slot_messages.messages.len());
}
}
let vec = messages
.get(&slot.slot)
.map(|slot_messages| slot_messages.messages.iter().flatten().cloned().collect())
.unwrap_or_default();
(vec, Vec::with_capacity(1))
}
CommitmentLevel::Finalized => {
if let Some(slot_messages) = messages.get_mut(&slot.slot) {
if !slot_messages.sealed {
slot_messages.finalized_at = Some(slot_messages.messages.len());
}
}
let vec = messages
.get_mut(&slot.slot)
.map(|slot_messages| slot_messages.messages.iter().flatten().cloned().collect())
.unwrap_or_default();
(Vec::with_capacity(1), vec)
}
};
// processed
processed_messages.push(message.clone());
let _ =
broadcast_tx.send((CommitmentLevel::Processed, processed_messages.into()));
processed_messages = Vec::with_capacity(PROCESSED_MESSAGES_MAX);
processed_sleep
.as_mut()
.reset(Instant::now() + PROCESSED_MESSAGES_SLEEP);
// confirmed
confirmed_messages.push(message.clone());
let _ =
broadcast_tx.send((CommitmentLevel::Confirmed, confirmed_messages.into()));
// finalized
finalized_messages.push(message);
let _ =
broadcast_tx.send((CommitmentLevel::Finalized, finalized_messages.into()));
} else {
let mut confirmed_messages = vec![];
let mut finalized_messages = vec![];
if matches!(message, Message::Block(_)) {
if let Some(slot_messages) = messages.get(&message.get_slot()) {
if let Some(confirmed_at) = slot_messages.confirmed_at {
confirmed_messages.extend(
slot_messages.messages.as_slice()[confirmed_at..].iter().filter_map(|x| x.clone())
);
}
if let Some(finalized_at) = slot_messages.finalized_at {
finalized_messages.extend(
slot_messages.messages.as_slice()[finalized_at..].iter().filter_map(|x| x.clone())
);
}
}
}
processed_messages.push(message);
if processed_messages.len() >= PROCESSED_MESSAGES_MAX
|| !confirmed_messages.is_empty()
|| !finalized_messages.is_empty()
{
let _ = broadcast_tx
.send((CommitmentLevel::Processed, processed_messages.into()));
processed_messages = Vec::with_capacity(PROCESSED_MESSAGES_MAX);
processed_sleep
.as_mut()
.reset(Instant::now() + PROCESSED_MESSAGES_SLEEP);
}
if !confirmed_messages.is_empty() {
let _ =
broadcast_tx.send((CommitmentLevel::Confirmed, confirmed_messages.into()));
}
if !finalized_messages.is_empty() {
let _ =
broadcast_tx.send((CommitmentLevel::Finalized, finalized_messages.into()));
}
}
}
}
() = &mut processed_sleep => {
if !processed_messages.is_empty() {

View File

@ -6,7 +6,7 @@ use {
service::{make_service_fn, service_fn},
Body, Request, Response, Server, StatusCode,
},
log::*,
log::error,
prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry, TextEncoder},
solana_geyser_plugin_interface::geyser_plugin_interface::SlotStatus,
std::sync::Once,
@ -26,8 +26,9 @@ lazy_static::lazy_static! {
&["status"]
).unwrap();
pub static ref INVALID_FULL_BLOCKS: IntGauge = IntGauge::new(
"invalid_full_blocks_total", "Total number of fails on constructin full blocks"
pub static ref INVALID_FULL_BLOCKS: IntGaugeVec = IntGaugeVec::new(
Opts::new("invalid_full_blocks_total", "Total number of fails on constructin full blocks"),
&["reason"]
).unwrap();
pub static ref MESSAGE_QUEUE_SIZE: IntGauge = IntGauge::new(
@ -128,3 +129,10 @@ pub fn update_slot_status(status: SlotStatus, slot: u64) {
}])
.set(slot as i64);
}
pub fn update_invalid_blocks(reason: impl AsRef<str>) {
INVALID_FULL_BLOCKS
.with_label_values(&[reason.as_ref()])
.inc();
INVALID_FULL_BLOCKS.with_label_values(&["all"]).inc();
}