From 2b1bf634c728c49401ce1aca520292ef534100c4 Mon Sep 17 00:00:00 2001 From: Christian Kamm Date: Tue, 22 Mar 2022 12:54:38 +0100 Subject: [PATCH] grpc proto: Change back to AccountsDb name to avoid incompatibility Existing plugin deployments must continue to work with the new version of the connector. --- geyser-plugin-grpc/src/geyser_plugin_grpc.rs | 24 +++++++++++--------- geyser-plugin-grpc/src/test_server.rs | 6 ++--- lib/src/grpc_plugin_source.rs | 6 ++--- lib/src/websocket_source.rs | 14 ++++++++---- proto/geyser.proto | 2 +- 5 files changed, 29 insertions(+), 23 deletions(-) diff --git a/geyser-plugin-grpc/src/geyser_plugin_grpc.rs b/geyser-plugin-grpc/src/geyser_plugin_grpc.rs index 956b2f0..2a2a91d 100644 --- a/geyser-plugin-grpc/src/geyser_plugin_grpc.rs +++ b/geyser-plugin-grpc/src/geyser_plugin_grpc.rs @@ -1,16 +1,16 @@ use { crate::accounts_selector::AccountsSelector, + bs58, geyser_proto::{ slot_update::Status as SlotUpdateStatus, update::UpdateOneof, AccountWrite, Ping, SlotUpdate, SubscribeRequest, SubscribeResponse, Update, }, - bs58, log::*, serde_derive::Deserialize, serde_json, solana_geyser_plugin_interface::geyser_plugin_interface::{ - GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, - Result as PluginResult, SlotStatus, + GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, Result as PluginResult, + SlotStatus, }, std::collections::HashSet, std::convert::TryInto, @@ -28,7 +28,7 @@ pub mod geyser_proto { pub mod geyser_service { use super::*; use { - geyser_proto::geyser_server::Geyser, + geyser_proto::accounts_db_server::AccountsDb, tokio_stream::wrappers::ReceiverStream, tonic::{Code, Request, Response, Status}, }; @@ -58,7 +58,7 @@ pub mod geyser_service { } #[tonic::async_trait] - impl Geyser for Service { + impl AccountsDb for Service { type SubscribeStream = ReceiverStream>; async fn subscribe( @@ -171,11 +171,13 @@ impl GeyserPlugin for Plugin { } })?; - let addr = config.bind_address.parse().map_err(|err| { - GeyserPluginError::ConfigFileReadError { - msg: format!("Error parsing the bind_address {:?}", err), - } - })?; + let addr = + config + .bind_address + .parse() + .map_err(|err| GeyserPluginError::ConfigFileReadError { + msg: format!("Error parsing the bind_address {:?}", err), + })?; let highest_write_slot = Arc::new(AtomicU64::new(0)); let service = @@ -183,7 +185,7 @@ impl GeyserPlugin for Plugin { let (server_exit_sender, mut server_exit_receiver) = broadcast::channel::<()>(1); let server_broadcast = service.sender.clone(); - let server = geyser_proto::geyser_server::GeyserServer::new(service); + let server = geyser_proto::accounts_db_server::AccountsDbServer::new(service); let runtime = tokio::runtime::Runtime::new().unwrap(); runtime.spawn(Server::builder().add_service(server).serve_with_shutdown( addr, diff --git a/geyser-plugin-grpc/src/test_server.rs b/geyser-plugin-grpc/src/test_server.rs index b018e82..56cd2f0 100644 --- a/geyser-plugin-grpc/src/test_server.rs +++ b/geyser-plugin-grpc/src/test_server.rs @@ -10,7 +10,7 @@ use geyser_proto::{update::UpdateOneof, SlotUpdate, SubscribeRequest, Update}; pub mod geyser_service { use super::*; use { - geyser_proto::geyser_server::Geyser, + geyser_proto::accounts_db_server::AccountsDb, tokio_stream::wrappers::ReceiverStream, tonic::{Request, Response, Status}, }; @@ -28,7 +28,7 @@ pub mod geyser_service { } #[tonic::async_trait] - impl Geyser for Service { + impl AccountsDb for Service { type SubscribeStream = ReceiverStream>; async fn subscribe( @@ -55,7 +55,7 @@ async fn main() -> Result<(), Box> { let service = geyser_service::Service::new(); let sender = service.sender.clone(); - let svc = geyser_proto::geyser_server::GeyserServer::new(service); + let svc = geyser_proto::accounts_db_server::AccountsDbServer::new(service); tokio::spawn(async move { let mut slot = 1; diff --git a/lib/src/grpc_plugin_source.rs b/lib/src/grpc_plugin_source.rs index 7bcccec..88229f4 100644 --- a/lib/src/grpc_plugin_source.rs +++ b/lib/src/grpc_plugin_source.rs @@ -4,7 +4,7 @@ use jsonrpc_core_client::transports::http; use solana_account_decoder::UiAccountEncoding; use solana_client::rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}; use solana_client::rpc_response::{Response, RpcKeyedAccount}; -use solana_rpc::{rpc::OptionalContext, rpc::rpc_accounts::AccountsDataClient}; +use solana_rpc::{rpc::rpc_accounts::AccountsDataClient, rpc::OptionalContext}; use solana_sdk::{account::Account, commitment_config::CommitmentConfig, pubkey::Pubkey}; use futures::{future, future::FutureExt}; @@ -16,7 +16,7 @@ use std::{collections::HashMap, str::FromStr, time::Duration}; pub mod geyser_proto { tonic::include_proto!("geyser"); } -use geyser_proto::geyser_client::GeyserClient; +use geyser_proto::accounts_db_client::AccountsDbClient; use crate::{ metrics, AccountWrite, AnyhowWrap, Config, GrpcSourceConfig, SlotStatus, SlotUpdate, @@ -77,7 +77,7 @@ async fn feed_data_geyser( } .connect() .await?; - let mut client = GeyserClient::new(channel); + let mut client = AccountsDbClient::new(channel); let mut update_stream = client .subscribe(geyser_proto::SubscribeRequest {}) diff --git a/lib/src/websocket_source.rs b/lib/src/websocket_source.rs index 0aa84bc..be93b39 100644 --- a/lib/src/websocket_source.rs +++ b/lib/src/websocket_source.rs @@ -7,7 +7,9 @@ use solana_client::{ //rpc_filter::RpcFilterType, rpc_response::{Response, RpcKeyedAccount}, }; -use solana_rpc::{rpc::OptionalContext, rpc_pubsub::RpcSolPubSubClient, rpc::rpc_accounts::AccountsDataClient}; +use solana_rpc::{ + rpc::rpc_accounts::AccountsDataClient, rpc::OptionalContext, rpc_pubsub::RpcSolPubSubClient, +}; use solana_sdk::{account::Account, commitment_config::CommitmentConfig, pubkey::Pubkey}; use log::*; @@ -36,10 +38,12 @@ async fn feed_data( let connect = ws::try_connect::(&config.rpc_ws_url).map_err_anyhow()?; let client = connect.await.map_err_anyhow()?; - let rpc_client = - http::connect_with_options::(&config.snapshot_source.rpc_http_url, true) - .await - .map_err_anyhow()?; + let rpc_client = http::connect_with_options::( + &config.snapshot_source.rpc_http_url, + true, + ) + .await + .map_err_anyhow()?; let account_info_config = RpcAccountInfoConfig { encoding: Some(UiAccountEncoding::Base64), diff --git a/proto/geyser.proto b/proto/geyser.proto index d1b0127..804c37d 100644 --- a/proto/geyser.proto +++ b/proto/geyser.proto @@ -6,7 +6,7 @@ option java_outer_classname = "GeyserProto"; package geyser; -service Geyser { +service AccountsDb { rpc Subscribe(SubscribeRequest) returns (stream Update) {} }