geyser: update solana =1.16.1 (#146)

This commit is contained in:
Kirill Fomichev 2023-06-16 23:22:42 -04:00 committed by GitHub
parent b7fa0cd9ff
commit cdd6bcbef0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 847 additions and 966 deletions

View File

@ -16,6 +16,36 @@ The minor version will be incremented upon a breaking change and the patch versi
### Breaking
## [yellowstone-grpc-proto-1.3.0+solana.1.16.1] - 2023-06-16
### Features
- geyser: update solana =1.16.1 ([#146](https://github.com/rpcpool/yellowstone-grpc/pull/146)).
### Fixes
### Breaking
## [yellowstone-grpc-client-1.3.0+solana.1.16.1] - 2023-06-16
### Features
- geyser: update solana =1.16.1 ([#146](https://github.com/rpcpool/yellowstone-grpc/pull/146)).
### Fixes
### Breaking
## [yellowstone-grpc-geyser-0.8.2+solana.1.16.1] - 2023-06-16
### Features
- geyser: update solana =1.16.1 ([#146](https://github.com/rpcpool/yellowstone-grpc/pull/146)).
### Fixes
### Breaking
## [yellowstone-grpc-proto-1.3.0+solana.1.15.2] - 2023-06-15
### Features

1475
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,9 +1,9 @@
[workspace]
members = [
"examples/rust", # 1.3.0+solana.1.15.2
"yellowstone-grpc-client", # 1.3.0+solana.1.15.2
"yellowstone-grpc-geyser", # 0.8.2+solana.1.15.2
"yellowstone-grpc-proto", # 1.3.0+solana.1.15.2
"examples/rust", # 1.3.0+solana.1.16.1
"yellowstone-grpc-client", # 1.3.0+solana.1.16.1
"yellowstone-grpc-geyser", # 0.8.2+solana.1.16.1
"yellowstone-grpc-proto", # 1.3.0+solana.1.16.1
]
[profile.release]

View File

@ -1,6 +1,6 @@
[package]
name = "yellowstone-grpc-client-simple"
version = "1.3.0+solana.1.15.2"
version = "1.3.0+solana.1.16.1"
authors = ["Triton One"]
edition = "2021"
publish = false
@ -17,7 +17,7 @@ env_logger = "0.10.0"
futures = "0.3.24"
hex = "0.4.3"
log = { version = "0.4.14", features = ["std"] }
solana-sdk = "=1.15.2"
solana-sdk = "=1.16.1"
tokio = { version = "1.21.2", features = ["rt-multi-thread", "macros", "time"] }
yellowstone-grpc-client = { path = "../../yellowstone-grpc-client" }
yellowstone-grpc-proto = { path = "../../yellowstone-grpc-proto" }

View File

@ -1,5 +1,5 @@
[toolchain]
channel = "1.66.1"
channel = "1.69.0"
components = ["clippy", "rustfmt"]
targets = []
profile = "minimal"

View File

@ -1,6 +1,6 @@
[package]
name = "yellowstone-grpc-client"
version = "1.3.0+solana.1.15.2"
version = "1.3.0+solana.1.16.1"
authors = ["Triton One"]
edition = "2021"
description = "Yellowstone gRPC Geyser Simple Client"
@ -16,7 +16,7 @@ http = "0.2.8"
thiserror = "1.0"
tonic = { version = "0.9.2", features = ["gzip", "tls", "tls-roots"] }
tonic-health = "0.9.2"
yellowstone-grpc-proto = { path = "../yellowstone-grpc-proto", version = "1.3.0+solana.1.15.2" }
yellowstone-grpc-proto = { path = "../yellowstone-grpc-proto", version = "1.3.0+solana.1.16.1" }
[dev-dependencies]
tokio = { version = "1.21.2", features = ["macros"] }

View File

@ -1,6 +1,6 @@
[package]
name = "yellowstone-grpc-geyser"
version = "0.8.2+solana.1.15.2"
version = "0.8.2+solana.1.16.1"
authors = ["Triton One"]
edition = "2021"
description = "Yellowstone gRPC Geyser Plugin"
@ -25,10 +25,10 @@ log = "0.4.17"
prometheus = "0.13.2"
serde = { version = "1.0.145", features = ["derive"] }
serde_json = "1.0.86"
solana-geyser-plugin-interface = "=1.15.2"
solana-logger = "=1.15.2"
solana-sdk = "=1.15.2"
solana-transaction-status = "=1.15.2"
solana-geyser-plugin-interface = "=1.16.1"
solana-logger = "=1.16.1"
solana-sdk = "=1.16.1"
solana-transaction-status = "=1.16.1"
tokio = { version = "1.21.2", features = ["rt-multi-thread", "macros", "time"] }
tokio-stream = "0.1.11"
tonic = { version = "0.9.2", features = ["gzip", "tls", "tls-roots"] }
@ -39,4 +39,4 @@ yellowstone-grpc-proto = { path = "../yellowstone-grpc-proto" }
anyhow = "1.0.62"
cargo-lock = "9.0.0"
git-version = "0.3.5"
vergen = "=7.2.1"
vergen = { version = "8.2.1", features = ["build", "cargo", "git", "gitcl", "rustc", "si"] }

View File

@ -1,11 +1,13 @@
use {
cargo_lock::Lockfile,
std::collections::HashSet,
vergen::{vergen, Config},
};
use {cargo_lock::Lockfile, std::collections::HashSet};
fn main() -> anyhow::Result<()> {
vergen(Config::default())?;
let mut envs = vergen::EmitBuilder::builder();
envs.all_build()
.all_cargo()
.all_git()
.all_rustc()
.all_sysinfo();
envs.emit()?;
// vergen git version does not looks cool
println!(

View File

@ -2,7 +2,7 @@ use {
crate::{
config::ConfigGrpc,
filters::Filter,
prom::{CONNECTIONS_TOTAL, MESSAGE_QUEUE_SIZE},
prom::{CONNECTIONS_TOTAL, INVALID_FULL_BLOCKS, MESSAGE_QUEUE_SIZE},
proto::{
self,
geyser_server::{Geyser, GeyserServer},
@ -19,7 +19,7 @@ use {
},
log::*,
solana_geyser_plugin_interface::geyser_plugin_interface::{
ReplicaAccountInfoV2, ReplicaBlockInfoV2, ReplicaTransactionInfoV2, SlotStatus,
ReplicaAccountInfoV3, ReplicaBlockInfoV2, ReplicaTransactionInfoV2, SlotStatus,
},
solana_sdk::{
clock::{UnixTimestamp, MAX_RECENT_BLOCKHASHES},
@ -29,7 +29,7 @@ use {
},
solana_transaction_status::{Reward, TransactionStatusMeta},
std::{
collections::HashMap,
collections::{BTreeMap, HashMap},
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
@ -67,8 +67,8 @@ pub struct MessageAccount {
pub is_startup: bool,
}
impl<'a> From<(&'a ReplicaAccountInfoV2<'a>, u64, bool)> for MessageAccount {
fn from((account, slot, is_startup): (&'a ReplicaAccountInfoV2<'a>, u64, bool)) -> Self {
impl<'a> From<(&'a ReplicaAccountInfoV3<'a>, u64, bool)> for MessageAccount {
fn from((account, slot, is_startup): (&'a ReplicaAccountInfoV3<'a>, u64, bool)) -> Self {
Self {
account: MessageAccountInfo {
pubkey: Pubkey::try_from(account.pubkey).expect("valid Pubkey"),
@ -78,7 +78,7 @@ impl<'a> From<(&'a ReplicaAccountInfoV2<'a>, u64, bool)> for MessageAccount {
rent_epoch: account.rent_epoch,
data: account.data.into(),
write_version: account.write_version,
txn_signature: account.txn_signature.cloned(),
txn_signature: account.txn.map(|txn| *txn.signature()),
},
slot,
is_startup,
@ -510,10 +510,69 @@ impl GrpcService {
const PROCESSED_MESSAGES_MAX: usize = 31;
const PROCESSED_MESSAGES_SLEEP: Duration = Duration::from_millis(10);
let mut transactions: BTreeMap<
u64,
(Option<MessageBlockMeta>, Vec<MessageTransactionInfo>),
> = BTreeMap::new();
let mut messages: HashMap<u64, Vec<Message>> = HashMap::new();
let mut processed_messages = Vec::with_capacity(PROCESSED_MESSAGES_MAX);
let processed_sleep = sleep(PROCESSED_MESSAGES_SLEEP);
tokio::pin!(processed_sleep);
macro_rules! process_message {
($message:ident) => {
if let Message::Slot(slot) = $message {
let (mut confirmed_messages, mut finalized_messages) = match slot.status {
CommitmentLevel::Processed => {
(Vec::with_capacity(1), Vec::with_capacity(1))
}
CommitmentLevel::Confirmed => {
let messages = messages.get(&slot.slot).cloned().unwrap_or_default();
(messages, Vec::with_capacity(1))
}
CommitmentLevel::Finalized => {
messages.retain(|msg_slot, _messages| *msg_slot >= slot.slot);
let messages = messages.remove(&slot.slot).unwrap_or_default();
(Vec::with_capacity(1), messages)
}
};
// processed
processed_messages.push($message.clone());
let _ =
broadcast_tx.send((CommitmentLevel::Processed, processed_messages.into()));
processed_messages = Vec::with_capacity(PROCESSED_MESSAGES_MAX);
processed_sleep
.as_mut()
.reset(Instant::now() + PROCESSED_MESSAGES_SLEEP);
// confirmed
confirmed_messages.push($message.clone());
let _ =
broadcast_tx.send((CommitmentLevel::Confirmed, confirmed_messages.into()));
// finalized
finalized_messages.push($message);
let _ =
broadcast_tx.send((CommitmentLevel::Finalized, finalized_messages.into()));
} else {
processed_messages.push($message.clone());
if processed_messages.len() >= PROCESSED_MESSAGES_MAX {
let _ = broadcast_tx
.send((CommitmentLevel::Processed, processed_messages.into()));
processed_messages = Vec::with_capacity(PROCESSED_MESSAGES_MAX);
processed_sleep
.as_mut()
.reset(Instant::now() + PROCESSED_MESSAGES_SLEEP);
}
messages
.entry($message.get_slot())
.or_default()
.push($message);
}
};
}
loop {
tokio::select! {
Some(message) = messages_rx.recv() => {
@ -523,42 +582,52 @@ impl GrpcService {
let _ = blocks_meta_tx.send(message.clone());
}
if let Message::Slot(slot) = message {
let (mut confirmed_messages, mut finalized_messages) = match slot.status {
CommitmentLevel::Processed => (Vec::with_capacity(1), Vec::with_capacity(1)),
CommitmentLevel::Confirmed => {
let messages = messages.get(&slot.slot).cloned().unwrap_or_default();
(messages, Vec::with_capacity(1))
}
CommitmentLevel::Finalized => {
messages.retain(|msg_slot, _messages| *msg_slot >= slot.slot);
let messages = messages.remove(&slot.slot).unwrap_or_default();
(Vec::with_capacity(1), messages)
}
};
// processed
processed_messages.push(message.clone());
let _ = broadcast_tx.send((CommitmentLevel::Processed, processed_messages.into()));
processed_messages = Vec::with_capacity(PROCESSED_MESSAGES_MAX);
processed_sleep.as_mut().reset(Instant::now() + PROCESSED_MESSAGES_SLEEP);
// confirmed
confirmed_messages.push(message.clone());
let _ = broadcast_tx.send((CommitmentLevel::Confirmed, confirmed_messages.into()));
// finalized
finalized_messages.push(message);
let _ = broadcast_tx.send((CommitmentLevel::Finalized, finalized_messages.into()));
} else {
processed_messages.push(message.clone());
if processed_messages.len() >= PROCESSED_MESSAGES_MAX {
let _ = broadcast_tx.send((CommitmentLevel::Processed, processed_messages.into()));
processed_messages = Vec::with_capacity(PROCESSED_MESSAGES_MAX);
processed_sleep.as_mut().reset(Instant::now() + PROCESSED_MESSAGES_SLEEP);
// consctruct Block message
let slot = message.get_slot();
if match &message {
// Collect Transactions for full Block message
Message::Transaction(msg_tx) => {
transactions.entry(slot).or_default().1.push(msg_tx.transaction.clone());
true
}
messages.entry(message.get_slot()).or_default().push(message);
// Save block meta for full Block message
Message::BlockMeta(msg_block) => {
transactions.entry(slot).or_default().0 = Some(msg_block.clone());
true
}
_ => false
} && matches!(
transactions.get(&slot),
Some((Some(block_meta), transactions)) if block_meta.executed_transaction_count as usize == transactions.len()
) {
let (block_meta, mut transactions) = transactions.remove(&slot).expect("checked");
transactions.sort_by(|tx1, tx2| tx1.index.cmp(&tx2.index));
let message = Message::Block((block_meta.expect("checked"), transactions).into());
process_message!(message);
}
// remove outdated transactions
if matches!(message, Message::Slot(msg) if msg.status == CommitmentLevel::Finalized) {
loop {
match transactions.keys().next().cloned() {
// Block was dropped, not in chain
Some(kslot) if kslot < slot => {
transactions.remove(&kslot);
}
// Maybe log error
Some(kslot) if kslot == slot => {
if let Some((Some(_), vec)) = transactions.remove(&kslot) {
INVALID_FULL_BLOCKS.inc();
error!("{} transactions left for block {kslot}", vec.len());
}
}
_ => break,
}
}
}
// process original message
process_message!(message);
}
() = &mut processed_sleep => {
if !processed_messages.is_empty() {

View File

@ -1,17 +1,17 @@
use {
crate::{
config::Config,
grpc::{
GrpcService, Message, MessageBlockMeta, MessageTransaction, MessageTransactionInfo,
},
grpc::{GrpcService, Message},
prom::{self, PrometheusService, MESSAGE_QUEUE_SIZE},
},
log::*,
solana_geyser_plugin_interface::geyser_plugin_interface::{
GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, ReplicaBlockInfoVersions,
ReplicaTransactionInfoVersions, Result as PluginResult, SlotStatus,
},
std::{collections::BTreeMap, time::Duration},
std::{
sync::atomic::{AtomicBool, Ordering},
time::Duration,
},
tokio::{
runtime::Runtime,
sync::{mpsc, oneshot},
@ -21,27 +21,14 @@ use {
#[derive(Debug)]
pub struct PluginInner {
runtime: Runtime,
startup_received: bool,
startup_processed_received: bool,
startup_received: AtomicBool,
startup_processed_received: AtomicBool,
grpc_channel: mpsc::UnboundedSender<Message>,
grpc_shutdown_tx: oneshot::Sender<()>,
prometheus: PrometheusService,
transactions: BTreeMap<u64, (Option<MessageBlockMeta>, Vec<MessageTransactionInfo>)>,
}
impl PluginInner {
fn try_send_full_block(&mut self, slot: u64) {
if matches!(
self.transactions.get(&slot),
Some((Some(block_meta), transactions)) if block_meta.executed_transaction_count as usize == transactions.len()
) {
let (block_meta, mut transactions) = self.transactions.remove(&slot).expect("checked");
transactions.sort_by(|tx1, tx2| tx1.index.cmp(&tx2.index));
let message = Message::Block((block_meta.expect("checked"), transactions).into());
self.send_message(message);
}
}
fn send_message(&self, message: Message) {
if self.grpc_channel.send(message).is_ok() {
MESSAGE_QUEUE_SIZE.inc();
@ -55,13 +42,15 @@ pub struct Plugin {
}
impl Plugin {
fn with_inner<F>(&mut self, f: F) -> PluginResult<()>
fn with_inner<F>(&self, f: F) -> PluginResult<()>
where
F: FnOnce(&mut PluginInner) -> PluginResult<()>,
F: FnOnce(&PluginInner) -> PluginResult<()>,
{
// Before processed slot after end of startup message we will fail to construct full block
let inner = self.inner.as_mut().expect("initialized");
if inner.startup_received && inner.startup_processed_received {
let inner = self.inner.as_ref().expect("initialized");
if inner.startup_received.load(Ordering::SeqCst)
&& inner.startup_processed_received.load(Ordering::SeqCst)
{
f(inner)
} else {
Ok(())
@ -93,12 +82,11 @@ impl GeyserPlugin for Plugin {
self.inner = Some(PluginInner {
runtime,
startup_received: false,
startup_processed_received: false,
startup_received: AtomicBool::new(false),
startup_processed_received: AtomicBool::new(false),
grpc_channel,
grpc_shutdown_tx,
prometheus,
transactions: BTreeMap::new(),
});
Ok(())
@ -113,14 +101,14 @@ impl GeyserPlugin for Plugin {
}
}
fn notify_end_of_startup(&mut self) -> PluginResult<()> {
let inner = self.inner.as_mut().expect("initialized");
inner.startup_received = true;
fn notify_end_of_startup(&self) -> PluginResult<()> {
let inner = self.inner.as_ref().expect("initialized");
inner.startup_received.store(true, Ordering::SeqCst);
Ok(())
}
fn update_account(
&mut self,
&self,
account: ReplicaAccountInfoVersions,
slot: u64,
is_startup: bool,
@ -130,7 +118,10 @@ impl GeyserPlugin for Plugin {
ReplicaAccountInfoVersions::V0_0_1(_info) => {
unreachable!("ReplicaAccountInfoVersions::V0_0_1 is not supported")
}
ReplicaAccountInfoVersions::V0_0_2(info) => info,
ReplicaAccountInfoVersions::V0_0_2(_info) => {
unreachable!("ReplicaAccountInfoVersions::V0_0_2 is not supported")
}
ReplicaAccountInfoVersions::V0_0_3(info) => info,
};
let message = Message::Account((account, slot, is_startup).into());
@ -140,40 +131,22 @@ impl GeyserPlugin for Plugin {
}
fn update_slot_status(
&mut self,
&self,
slot: u64,
parent: Option<u64>,
status: SlotStatus,
) -> PluginResult<()> {
let inner = self.inner.as_mut().expect("initialized");
if inner.startup_received
&& !inner.startup_processed_received
let inner = self.inner.as_ref().expect("initialized");
if inner.startup_received.load(Ordering::SeqCst)
&& !inner.startup_processed_received.load(Ordering::SeqCst)
&& status == SlotStatus::Processed
{
inner.startup_processed_received = true;
inner
.startup_processed_received
.store(true, Ordering::SeqCst);
}
self.with_inner(|inner| {
// Remove outdated records
if status == SlotStatus::Rooted {
loop {
match inner.transactions.keys().next().cloned() {
// Block was dropped, not in chain
Some(kslot) if kslot < slot => {
inner.transactions.remove(&kslot);
}
// Maybe log error
Some(kslot) if kslot == slot => {
if let Some((Some(_), vec)) = inner.transactions.remove(&kslot) {
prom::INVALID_FULL_BLOCKS.inc();
error!("{} transactions left for block {kslot}", vec.len());
}
}
_ => break,
}
}
}
let message = Message::Slot((slot, parent, status).into());
inner.send_message(message);
prom::update_slot_status(status, slot);
@ -183,7 +156,7 @@ impl GeyserPlugin for Plugin {
}
fn notify_transaction(
&mut self,
&self,
transaction: ReplicaTransactionInfoVersions<'_>,
slot: u64,
) -> PluginResult<()> {
@ -195,24 +168,14 @@ impl GeyserPlugin for Plugin {
ReplicaTransactionInfoVersions::V0_0_2(info) => info,
};
let msg_tx: MessageTransaction = (transaction, slot).into();
// Collect Transactions for full block message
let tx = msg_tx.transaction.clone();
inner.transactions.entry(slot).or_default().1.push(tx);
inner.try_send_full_block(slot);
let message = Message::Transaction(msg_tx);
let message = Message::Transaction((transaction, slot).into());
inner.send_message(message);
Ok(())
})
}
fn notify_block_metadata(
&mut self,
blockinfo: ReplicaBlockInfoVersions<'_>,
) -> PluginResult<()> {
fn notify_block_metadata(&self, blockinfo: ReplicaBlockInfoVersions<'_>) -> PluginResult<()> {
self.with_inner(|inner| {
let blockinfo = match blockinfo {
ReplicaBlockInfoVersions::V0_0_1(_info) => {
@ -221,13 +184,7 @@ impl GeyserPlugin for Plugin {
ReplicaBlockInfoVersions::V0_0_2(info) => info,
};
let block_meta: MessageBlockMeta = (blockinfo).into();
// Save block meta for full block message
inner.transactions.entry(block_meta.slot).or_default().0 = Some(block_meta.clone());
inner.try_send_full_block(block_meta.slot);
let message = Message::BlockMeta(block_meta);
let message = Message::BlockMeta((blockinfo).into());
inner.send_message(message);
Ok(())

View File

@ -11,7 +11,7 @@ pub struct Version {
}
pub const VERSION: Version = Version {
version: env!("VERGEN_BUILD_SEMVER"),
version: env!("CARGO_PKG_VERSION"),
proto: env!("YELLOWSTONE_GRPC_PROTO_VERSION"),
solana: env!("SOLANA_SDK_VERSION"),
git: env!("GIT_VERSION"),

View File

@ -1,6 +1,6 @@
[package]
name = "yellowstone-grpc-proto"
version = "1.3.0+solana.1.15.2"
version = "1.3.0+solana.1.16.1"
authors = ["Triton One"]
edition = "2021"
description = "Yellowstone gRPC Geyser Protobuf Definitions"