grpc proto: Change back to AccountsDb name to avoid incompatibility

Existing plugin deployments must continue to work with the new version
of the connector.
This commit is contained in:
Christian Kamm 2022-03-22 12:54:38 +01:00
parent b6cefddef2
commit 2b1bf634c7
5 changed files with 29 additions and 23 deletions

View File

@ -1,16 +1,16 @@
use {
crate::accounts_selector::AccountsSelector,
bs58,
geyser_proto::{
slot_update::Status as SlotUpdateStatus, update::UpdateOneof, AccountWrite, Ping,
SlotUpdate, SubscribeRequest, SubscribeResponse, Update,
},
bs58,
log::*,
serde_derive::Deserialize,
serde_json,
solana_geyser_plugin_interface::geyser_plugin_interface::{
GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions,
Result as PluginResult, SlotStatus,
GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, Result as PluginResult,
SlotStatus,
},
std::collections::HashSet,
std::convert::TryInto,
@ -28,7 +28,7 @@ pub mod geyser_proto {
pub mod geyser_service {
use super::*;
use {
geyser_proto::geyser_server::Geyser,
geyser_proto::accounts_db_server::AccountsDb,
tokio_stream::wrappers::ReceiverStream,
tonic::{Code, Request, Response, Status},
};
@ -58,7 +58,7 @@ pub mod geyser_service {
}
#[tonic::async_trait]
impl Geyser for Service {
impl AccountsDb for Service {
type SubscribeStream = ReceiverStream<Result<Update, Status>>;
async fn subscribe(
@ -171,11 +171,13 @@ impl GeyserPlugin for Plugin {
}
})?;
let addr = config.bind_address.parse().map_err(|err| {
GeyserPluginError::ConfigFileReadError {
msg: format!("Error parsing the bind_address {:?}", err),
}
})?;
let addr =
config
.bind_address
.parse()
.map_err(|err| GeyserPluginError::ConfigFileReadError {
msg: format!("Error parsing the bind_address {:?}", err),
})?;
let highest_write_slot = Arc::new(AtomicU64::new(0));
let service =
@ -183,7 +185,7 @@ impl GeyserPlugin for Plugin {
let (server_exit_sender, mut server_exit_receiver) = broadcast::channel::<()>(1);
let server_broadcast = service.sender.clone();
let server = geyser_proto::geyser_server::GeyserServer::new(service);
let server = geyser_proto::accounts_db_server::AccountsDbServer::new(service);
let runtime = tokio::runtime::Runtime::new().unwrap();
runtime.spawn(Server::builder().add_service(server).serve_with_shutdown(
addr,

View File

@ -10,7 +10,7 @@ use geyser_proto::{update::UpdateOneof, SlotUpdate, SubscribeRequest, Update};
pub mod geyser_service {
use super::*;
use {
geyser_proto::geyser_server::Geyser,
geyser_proto::accounts_db_server::AccountsDb,
tokio_stream::wrappers::ReceiverStream,
tonic::{Request, Response, Status},
};
@ -28,7 +28,7 @@ pub mod geyser_service {
}
#[tonic::async_trait]
impl Geyser for Service {
impl AccountsDb for Service {
type SubscribeStream = ReceiverStream<Result<Update, Status>>;
async fn subscribe(
@ -55,7 +55,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let service = geyser_service::Service::new();
let sender = service.sender.clone();
let svc = geyser_proto::geyser_server::GeyserServer::new(service);
let svc = geyser_proto::accounts_db_server::AccountsDbServer::new(service);
tokio::spawn(async move {
let mut slot = 1;

View File

@ -4,7 +4,7 @@ use jsonrpc_core_client::transports::http;
use solana_account_decoder::UiAccountEncoding;
use solana_client::rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig};
use solana_client::rpc_response::{Response, RpcKeyedAccount};
use solana_rpc::{rpc::OptionalContext, rpc::rpc_accounts::AccountsDataClient};
use solana_rpc::{rpc::rpc_accounts::AccountsDataClient, rpc::OptionalContext};
use solana_sdk::{account::Account, commitment_config::CommitmentConfig, pubkey::Pubkey};
use futures::{future, future::FutureExt};
@ -16,7 +16,7 @@ use std::{collections::HashMap, str::FromStr, time::Duration};
pub mod geyser_proto {
tonic::include_proto!("geyser");
}
use geyser_proto::geyser_client::GeyserClient;
use geyser_proto::accounts_db_client::AccountsDbClient;
use crate::{
metrics, AccountWrite, AnyhowWrap, Config, GrpcSourceConfig, SlotStatus, SlotUpdate,
@ -77,7 +77,7 @@ async fn feed_data_geyser(
}
.connect()
.await?;
let mut client = GeyserClient::new(channel);
let mut client = AccountsDbClient::new(channel);
let mut update_stream = client
.subscribe(geyser_proto::SubscribeRequest {})

View File

@ -7,7 +7,9 @@ use solana_client::{
//rpc_filter::RpcFilterType,
rpc_response::{Response, RpcKeyedAccount},
};
use solana_rpc::{rpc::OptionalContext, rpc_pubsub::RpcSolPubSubClient, rpc::rpc_accounts::AccountsDataClient};
use solana_rpc::{
rpc::rpc_accounts::AccountsDataClient, rpc::OptionalContext, rpc_pubsub::RpcSolPubSubClient,
};
use solana_sdk::{account::Account, commitment_config::CommitmentConfig, pubkey::Pubkey};
use log::*;
@ -36,10 +38,12 @@ async fn feed_data(
let connect = ws::try_connect::<RpcSolPubSubClient>(&config.rpc_ws_url).map_err_anyhow()?;
let client = connect.await.map_err_anyhow()?;
let rpc_client =
http::connect_with_options::<AccountsDataClient>(&config.snapshot_source.rpc_http_url, true)
.await
.map_err_anyhow()?;
let rpc_client = http::connect_with_options::<AccountsDataClient>(
&config.snapshot_source.rpc_http_url,
true,
)
.await
.map_err_anyhow()?;
let account_info_config = RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64),

View File

@ -6,7 +6,7 @@ option java_outer_classname = "GeyserProto";
package geyser;
service Geyser {
service AccountsDb {
rpc Subscribe(SubscribeRequest) returns (stream Update) {}
}