Stream additional block metadata via plugin (#22023)

* Stream additional block metadata through plugin
blockhash, block_height, block_time, rewards are streamed
This commit is contained in:
Lijun Wang 2021-12-29 15:12:01 -08:00 committed by GitHub
parent c9c78622a8
commit f14928a970
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 383 additions and 12 deletions

View File

@ -3,8 +3,8 @@
/// In addition, the dynamic library must export a "C" function _create_plugin which
/// creates the implementation of the plugin.
use {
solana_sdk::{signature::Signature, transaction::SanitizedTransaction},
solana_transaction_status::TransactionStatusMeta,
solana_sdk::{clock::UnixTimestamp, signature::Signature, transaction::SanitizedTransaction},
solana_transaction_status::{Reward, TransactionStatusMeta},
std::{any::Any, error, io},
thiserror::Error,
};
@ -60,6 +60,19 @@ pub enum ReplicaTransactionInfoVersions<'a> {
V0_0_1(&'a ReplicaTransactionInfo<'a>),
}
#[derive(Clone, Debug)]
pub struct ReplicaBlockInfo<'a> {
pub slot: u64,
pub blockhash: &'a str,
pub rewards: &'a [Reward],
pub block_time: Option<UnixTimestamp>,
pub block_height: Option<u64>,
}
pub enum ReplicaBlockInfoVersions<'a> {
V0_0_1(&'a ReplicaBlockInfo<'a>),
}
/// Errors returned by plugin calls
#[derive(Error, Debug)]
pub enum AccountsDbPluginError {
@ -173,6 +186,12 @@ pub trait AccountsDbPlugin: Any + Send + Sync + std::fmt::Debug {
Ok(())
}
/// Called when block's metadata is updated.
#[allow(unused_variables)]
fn notify_block_metadata(&mut self, blockinfo: ReplicaBlockInfoVersions) -> Result<()> {
Ok(())
}
/// Check if the plugin is interested in account data
/// Default is true -- if the plugin is not interested in
/// account data, please return false.

View File

@ -2,6 +2,8 @@ use {
crate::{
accounts_update_notifier::AccountsUpdateNotifierImpl,
accountsdb_plugin_manager::AccountsDbPluginManager,
block_metadata_notifier::BlockMetadataNotifierImpl,
block_metadata_notifier_interface::BlockMetadataNotifierLock,
slot_status_notifier::SlotStatusNotifierImpl, slot_status_observer::SlotStatusObserver,
transaction_notifier::TransactionNotifierImpl,
},
@ -50,6 +52,7 @@ pub struct AccountsDbPluginService {
plugin_manager: Arc<RwLock<AccountsDbPluginManager>>,
accounts_update_notifier: Option<AccountsUpdateNotifier>,
transaction_notifier: Option<TransactionNotifierLock>,
block_metadata_notifier: Option<BlockMetadataNotifierLock>,
}
impl AccountsDbPluginService {
@ -102,17 +105,24 @@ impl AccountsDbPluginService {
None
};
let slot_status_observer =
if account_data_notifications_enabled || transaction_notifications_enabled {
let slot_status_notifier = SlotStatusNotifierImpl::new(plugin_manager.clone());
let slot_status_notifier = Arc::new(RwLock::new(slot_status_notifier));
let (slot_status_observer, block_metadata_notifier): (
Option<SlotStatusObserver>,
Option<BlockMetadataNotifierLock>,
) = if account_data_notifications_enabled || transaction_notifications_enabled {
let slot_status_notifier = SlotStatusNotifierImpl::new(plugin_manager.clone());
let slot_status_notifier = Arc::new(RwLock::new(slot_status_notifier));
(
Some(SlotStatusObserver::new(
confirmed_bank_receiver,
slot_status_notifier,
))
} else {
None
};
)),
Some(Arc::new(RwLock::new(BlockMetadataNotifierImpl::new(
plugin_manager.clone(),
)))),
)
} else {
(None, None)
};
info!("Started AccountsDbPluginService");
Ok(AccountsDbPluginService {
@ -120,6 +130,7 @@ impl AccountsDbPluginService {
plugin_manager,
accounts_update_notifier,
transaction_notifier,
block_metadata_notifier,
})
}
@ -186,6 +197,10 @@ impl AccountsDbPluginService {
self.transaction_notifier.clone()
}
pub fn get_block_metadata_notifier(&self) -> Option<BlockMetadataNotifierLock> {
self.block_metadata_notifier.clone()
}
pub fn join(self) -> thread::Result<()> {
if let Some(mut slot_status_observer) = self.slot_status_observer {
slot_status_observer.join()?;

View File

@ -0,0 +1,105 @@
use {
crate::{
accountsdb_plugin_manager::AccountsDbPluginManager,
block_metadata_notifier_interface::BlockMetadataNotifier,
},
log::*,
solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{
ReplicaBlockInfo, ReplicaBlockInfoVersions,
},
solana_measure::measure::Measure,
solana_metrics::*,
solana_runtime::bank::RewardInfo,
solana_sdk::{clock::UnixTimestamp, pubkey::Pubkey},
solana_transaction_status::{Reward, Rewards},
std::sync::{Arc, RwLock},
};
pub(crate) struct BlockMetadataNotifierImpl {
plugin_manager: Arc<RwLock<AccountsDbPluginManager>>,
}
impl BlockMetadataNotifier for BlockMetadataNotifierImpl {
/// Notify the block metadata
fn notify_block_metadata(
&self,
slot: u64,
blockhash: &str,
rewards: &RwLock<Vec<(Pubkey, RewardInfo)>>,
block_time: Option<UnixTimestamp>,
block_height: Option<u64>,
) {
let mut plugin_manager = self.plugin_manager.write().unwrap();
if plugin_manager.plugins.is_empty() {
return;
}
let rewards = Self::build_rewards(rewards);
for plugin in plugin_manager.plugins.iter_mut() {
let mut measure = Measure::start("accountsdb-plugin-update-slot");
let block_info =
Self::build_replica_block_info(slot, blockhash, &rewards, block_time, block_height);
let block_info = ReplicaBlockInfoVersions::V0_0_1(&block_info);
match plugin.notify_block_metadata(block_info) {
Err(err) => {
error!(
"Failed to update block metadata at slot {}, error: {} to plugin {}",
slot,
err,
plugin.name()
)
}
Ok(_) => {
trace!(
"Successfully updated block metadata at slot {} to plugin {}",
slot,
plugin.name()
);
}
}
measure.stop();
inc_new_counter_debug!(
"accountsdb-plugin-update-block-metadata-us",
measure.as_us() as usize,
1000,
1000
);
}
}
}
impl BlockMetadataNotifierImpl {
fn build_rewards(rewards: &RwLock<Vec<(Pubkey, RewardInfo)>>) -> Rewards {
let rewards = rewards.read().unwrap();
rewards
.iter()
.map(|(pubkey, reward)| Reward {
pubkey: pubkey.to_string(),
lamports: reward.lamports,
post_balance: reward.post_balance,
reward_type: Some(reward.reward_type),
commission: reward.commission,
})
.collect()
}
fn build_replica_block_info<'a>(
slot: u64,
blockhash: &'a str,
rewards: &'a [Reward],
block_time: Option<UnixTimestamp>,
block_height: Option<u64>,
) -> ReplicaBlockInfo<'a> {
ReplicaBlockInfo {
slot,
blockhash,
rewards,
block_time,
block_height,
}
}
pub fn new(plugin_manager: Arc<RwLock<AccountsDbPluginManager>>) -> Self {
Self { plugin_manager }
}
}

View File

@ -0,0 +1,20 @@
use {
solana_runtime::bank::RewardInfo,
solana_sdk::{clock::UnixTimestamp, pubkey::Pubkey},
std::sync::{Arc, RwLock},
};
/// Interface for notifying block metadata changes
pub trait BlockMetadataNotifier {
/// Notify the block metadata
fn notify_block_metadata(
&self,
slot: u64,
blockhash: &str,
rewards: &RwLock<Vec<(Pubkey, RewardInfo)>>,
block_time: Option<UnixTimestamp>,
block_height: Option<u64>,
);
}
pub type BlockMetadataNotifierLock = Arc<RwLock<dyn BlockMetadataNotifier + Sync + Send>>;

View File

@ -1,6 +1,8 @@
pub mod accounts_update_notifier;
pub mod accountsdb_plugin_manager;
pub mod accountsdb_plugin_service;
pub mod block_metadata_notifier;
pub mod block_metadata_notifier_interface;
pub mod slot_status_notifier;
pub mod slot_status_observer;
pub mod transaction_notifier;

View File

@ -153,6 +153,16 @@ CREATE TABLE transaction (
CONSTRAINT transaction_pk PRIMARY KEY (slot, signature)
);
-- The table storing block metadata
CREATE TABLE block (
slot BIGINT PRIMARY KEY,
blockhash VARCHAR(44),
rewards "Reward"[],
block_time BIGINT,
block_height BIGINT,
updated_on TIMESTAMP NOT NULL
);
/**
* The following is for keeping historical data for accounts and is not required for plugin to work.
*/

View File

@ -8,6 +8,7 @@ DROP TABLE account_audit;
DROP TABLE account;
DROP TABLE slot;
DROP TABLE transaction;
DROP TABLE block;
DROP TYPE "TransactionError" CASCADE;
DROP TYPE "TransactionErrorCode" CASCADE;

View File

@ -12,7 +12,7 @@ use {
serde_json,
solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{
AccountsDbPlugin, AccountsDbPluginError, ReplicaAccountInfoVersions,
ReplicaTransactionInfoVersions, Result, SlotStatus,
ReplicaBlockInfoVersions, ReplicaTransactionInfoVersions, Result, SlotStatus,
},
solana_metrics::*,
std::{fs::File, io::Read},
@ -334,6 +334,31 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres {
Ok(())
}
fn notify_block_metadata(&mut self, block_info: ReplicaBlockInfoVersions) -> Result<()> {
match &mut self.client {
None => {
return Err(AccountsDbPluginError::Custom(Box::new(
AccountsDbPluginPostgresError::DataStoreConnectionError {
msg: "There is no connection to the PostgreSQL database.".to_string(),
},
)));
}
Some(client) => match block_info {
ReplicaBlockInfoVersions::V0_0_1(block_info) => {
let result = client.update_block_metadata(block_info);
if let Err(err) = result {
return Err(AccountsDbPluginError::SlotStatusUpdateError{
msg: format!("Failed to persist the update of block metadata to the PostgreSQL database. Error: {:?}", err)
});
}
}
},
}
Ok(())
}
/// Check if the plugin is interested in account data
/// Default is true -- if the plugin is not interested in
/// account data, please return false.

View File

@ -1,4 +1,6 @@
#![allow(clippy::integer_arithmetic)]
mod postgres_client_block_metadata;
mod postgres_client_transaction;
/// A concurrent implementation for writing accounts into the PostgreSQL in parallel.
@ -10,9 +12,10 @@ use {
crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender},
log::*,
postgres::{Client, NoTls, Statement},
postgres_client_block_metadata::DbBlockInfo,
postgres_client_transaction::LogTransactionRequest,
solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{
AccountsDbPluginError, ReplicaAccountInfo, SlotStatus,
AccountsDbPluginError, ReplicaAccountInfo, ReplicaBlockInfo, SlotStatus,
},
solana_measure::measure::Measure,
solana_metrics::*,
@ -44,6 +47,7 @@ struct PostgresSqlClientWrapper {
update_slot_with_parent_stmt: Statement,
update_slot_without_parent_stmt: Statement,
update_transaction_log_stmt: Statement,
update_block_metadata_stmt: Statement,
}
pub struct SimplePostgresClient {
@ -195,6 +199,11 @@ pub trait PostgresClient {
&mut self,
transaction_log_info: LogTransactionRequest,
) -> Result<(), AccountsDbPluginError>;
fn update_block_metadata(
&mut self,
block_info: UpdateBlockMetadataRequest,
) -> Result<(), AccountsDbPluginError>;
}
impl SimplePostgresClient {
@ -501,6 +510,8 @@ impl SimplePostgresClient {
Self::build_slot_upsert_statement_without_parent(&mut client, config)?;
let update_transaction_log_stmt =
Self::build_transaction_info_upsert_statement(&mut client, config)?;
let update_block_metadata_stmt =
Self::build_block_metadata_upsert_statement(&mut client, config)?;
let batch_size = config
.batch_size
@ -516,6 +527,7 @@ impl SimplePostgresClient {
update_slot_with_parent_stmt,
update_slot_without_parent_stmt,
update_transaction_log_stmt,
update_block_metadata_stmt,
}),
})
}
@ -591,6 +603,13 @@ impl PostgresClient for SimplePostgresClient {
) -> Result<(), AccountsDbPluginError> {
self.log_transaction_impl(transaction_log_info)
}
fn update_block_metadata(
&mut self,
block_info: UpdateBlockMetadataRequest,
) -> Result<(), AccountsDbPluginError> {
self.update_block_metadata_impl(block_info)
}
}
struct UpdateAccountRequest {
@ -604,11 +623,16 @@ struct UpdateSlotRequest {
slot_status: SlotStatus,
}
pub struct UpdateBlockMetadataRequest {
pub block_info: DbBlockInfo,
}
#[warn(clippy::large_enum_variant)]
enum DbWorkItem {
UpdateAccount(Box<UpdateAccountRequest>),
UpdateSlot(Box<UpdateSlotRequest>),
LogTransaction(Box<LogTransactionRequest>),
UpdateBlockMetadata(Box<UpdateBlockMetadataRequest>),
}
impl PostgresClientWorker {
@ -677,6 +701,14 @@ impl PostgresClientWorker {
}
}
}
DbWorkItem::UpdateBlockMetadata(block_info) => {
if let Err(err) = self.client.update_block_metadata(*block_info) {
error!("Failed to update block metadata: ({})", err);
if panic_on_db_errors {
abort();
}
}
}
},
Err(err) => match err {
RecvTimeoutError::Timeout => {
@ -868,6 +900,25 @@ impl ParallelPostgresClient {
Ok(())
}
pub fn update_block_metadata(
&mut self,
block_info: &ReplicaBlockInfo,
) -> Result<(), AccountsDbPluginError> {
if let Err(err) = self.sender.send(DbWorkItem::UpdateBlockMetadata(Box::new(
UpdateBlockMetadataRequest {
block_info: DbBlockInfo::from(block_info),
},
))) {
return Err(AccountsDbPluginError::SlotStatusUpdateError {
msg: format!(
"Failed to update the block metadata at slot {:?}, error: {:?}",
block_info.slot, err
),
});
}
Ok(())
}
pub fn notify_end_of_startup(&mut self) -> Result<(), AccountsDbPluginError> {
info!("Notifying the end of startup");
// Ensure all items in the queue has been received by the workers

View File

@ -0,0 +1,97 @@
use {
crate::{
accountsdb_plugin_postgres::{
AccountsDbPluginPostgresConfig, AccountsDbPluginPostgresError,
},
postgres_client::{
postgres_client_transaction::DbReward, SimplePostgresClient, UpdateBlockMetadataRequest,
},
},
chrono::Utc,
log::*,
postgres::{Client, Statement},
solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{
AccountsDbPluginError, ReplicaBlockInfo,
},
};
#[derive(Clone, Debug)]
pub struct DbBlockInfo {
pub slot: i64,
pub blockhash: String,
pub rewards: Vec<DbReward>,
pub block_time: Option<i64>,
pub block_height: Option<i64>,
}
impl<'a> From<&ReplicaBlockInfo<'a>> for DbBlockInfo {
fn from(block_info: &ReplicaBlockInfo) -> Self {
Self {
slot: block_info.slot as i64,
blockhash: block_info.blockhash.to_string(),
rewards: block_info.rewards.iter().map(DbReward::from).collect(),
block_time: block_info.block_time,
block_height: block_info
.block_height
.map(|block_height| block_height as i64),
}
}
}
impl SimplePostgresClient {
pub(crate) fn build_block_metadata_upsert_statement(
client: &mut Client,
config: &AccountsDbPluginPostgresConfig,
) -> Result<Statement, AccountsDbPluginError> {
let stmt =
"INSERT INTO block (slot, blockhash, rewards, block_time, block_height, updated_on) \
VALUES ($1, $2, $3, $4, $5, $6)";
let stmt = client.prepare(stmt);
match stmt {
Err(err) => {
return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError {
msg: format!(
"Error in preparing for the block metadata update PostgreSQL database: ({}) host: {:?} user: {:?} config: {:?}",
err, config.host, config.user, config
),
})));
}
Ok(stmt) => Ok(stmt),
}
}
pub(crate) fn update_block_metadata_impl(
&mut self,
block_info: UpdateBlockMetadataRequest,
) -> Result<(), AccountsDbPluginError> {
let client = self.client.get_mut().unwrap();
let statement = &client.update_block_metadata_stmt;
let client = &mut client.client;
let updated_on = Utc::now().naive_utc();
let block_info = block_info.block_info;
let result = client.query(
statement,
&[
&block_info.slot,
&block_info.blockhash,
&block_info.rewards,
&block_info.block_time,
&block_info.block_height,
&updated_on,
],
);
if let Err(err) = result {
let msg = format!(
"Failed to persist the update of block metadata to the PostgreSQL database. Error: {:?}",
err);
error!("{}", msg);
return Err(AccountsDbPluginError::AccountsUpdateError { msg });
}
Ok(())
}
}

View File

@ -26,6 +26,7 @@ use {
voting_service::VoteOp,
window_service::DuplicateSlotReceiver,
},
solana_accountsdb_plugin_manager::block_metadata_notifier_interface::BlockMetadataNotifierLock,
solana_client::rpc_response::SlotUpdate,
solana_entry::entry::VerifyRecyclers,
solana_gossip::cluster_info::ClusterInfo,
@ -335,6 +336,7 @@ impl ReplayStage {
cost_update_sender: Sender<CostUpdate>,
voting_sender: Sender<VoteOp>,
drop_bank_sender: Sender<Vec<Arc<Bank>>>,
block_metadata_notifier: Option<BlockMetadataNotifierLock>,
) -> Self {
let ReplayStageConfig {
vote_account,
@ -440,6 +442,7 @@ impl ReplayStage {
&cost_update_sender,
&mut duplicate_slots_to_repair,
&ancestor_hashes_replay_update_sender,
block_metadata_notifier.clone(),
);
replay_active_banks_time.stop();
@ -2042,6 +2045,7 @@ impl ReplayStage {
cost_update_sender: &Sender<CostUpdate>,
duplicate_slots_to_repair: &mut DuplicateSlotsToRepair,
ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender,
block_metadata_notifier: Option<BlockMetadataNotifierLock>,
) -> bool {
let mut did_complete_bank = false;
let mut tx_count = 0;
@ -2197,6 +2201,16 @@ impl ReplayStage {
}
}
Self::record_rewards(&bank, rewards_recorder_sender);
if let Some(ref block_metadata_notifier) = block_metadata_notifier {
let block_metadata_notifier = block_metadata_notifier.read().unwrap();
block_metadata_notifier.notify_block_metadata(
bank.slot(),
&bank.last_blockhash().to_string(),
&bank.rewards,
Some(bank.clock().unix_timestamp),
Some(bank.block_height()),
)
}
} else {
trace!(
"bank {} not completed tick_height: {}, max_tick_height: {}",

View File

@ -26,6 +26,7 @@ use {
voting_service::VotingService,
},
crossbeam_channel::unbounded,
solana_accountsdb_plugin_manager::block_metadata_notifier_interface::BlockMetadataNotifierLock,
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::{
blockstore::Blockstore, blockstore_processor::TransactionStatusSender,
@ -143,6 +144,7 @@ impl Tvu {
cost_model: &Arc<RwLock<CostModel>>,
accounts_package_channel: (AccountsPackageSender, AccountsPackageReceiver),
last_full_snapshot_slot: Option<Slot>,
block_metadata_notifier: Option<BlockMetadataNotifierLock>,
) -> Self {
let Sockets {
repair: repair_socket,
@ -333,6 +335,7 @@ impl Tvu {
cost_update_sender,
voting_sender,
drop_bank_sender,
block_metadata_notifier,
);
let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| {
@ -501,6 +504,7 @@ pub mod tests {
&Arc::new(RwLock::new(CostModel::default())),
accounts_package_channel,
None,
None,
);
exit.store(true, Ordering::Relaxed);
tvu.join().unwrap();

View File

@ -427,6 +427,13 @@ impl Validator {
accountsdb_plugin_service.get_transaction_notifier()
});
let block_metadata_notifier =
accountsdb_plugin_service
.as_ref()
.and_then(|accountsdb_plugin_service| {
accountsdb_plugin_service.get_block_metadata_notifier()
});
info!(
"AccountsDb plugin: accounts_update_notifier: {} transaction_notifier: {}",
accounts_update_notifier.is_some(),
@ -884,6 +891,7 @@ impl Validator {
&cost_model,
accounts_package_channel,
last_full_snapshot_slot,
block_metadata_notifier,
);
let tpu = Tpu::new(