grpc configurability

This commit is contained in:
Christian Kamm 2021-11-08 11:42:22 +01:00
parent d366581826
commit 0424c3c9d6
5 changed files with 45 additions and 17 deletions

View File

@ -1,6 +1,11 @@
grpc_connection_string = "http://[::1]:10000"
rpc_http_url = ""
rpc_ws_url = "" rpc_ws_url = ""
[grpc_source]
connection_string = "http://[::1]:10000"
retry_connection_sleep_secs = 30
[snapshot_source]
rpc_http_url = ""
program_id = "mv3ekLzLbnVPNxjSKvqBpU3ZeZXPQdEC3bp5MDEBG68" program_id = "mv3ekLzLbnVPNxjSKvqBpU3ZeZXPQdEC3bp5MDEBG68"
[postgres_target] [postgres_target]

View File

@ -1,6 +1,11 @@
grpc_connection_string = "http://[::1]:10000"
rpc_http_url = ""
rpc_ws_url = "" rpc_ws_url = ""
[grpc_source]
connection_string = "http://[::1]:10000"
retry_connection_sleep_secs = 30
[snapshot_source]
rpc_http_url = ""
program_id = "" program_id = ""
[postgres_target] [postgres_target]

View File

@ -74,10 +74,11 @@ async fn feed_data_accountsdb(
config: &Config, config: &Config,
sender: async_channel::Sender<Message>, sender: async_channel::Sender<Message>,
) -> Result<(), anyhow::Error> { ) -> Result<(), anyhow::Error> {
let program_id = Pubkey::from_str(&config.program_id)?; let program_id = Pubkey::from_str(&config.snapshot_source.program_id)?;
let mut client = let mut client =
AccountsDbClient::connect(Endpoint::from_str(&config.grpc_connection_string)?).await?; AccountsDbClient::connect(Endpoint::from_str(&config.grpc_source.connection_string)?)
.await?;
let mut update_stream = client let mut update_stream = client
.subscribe(accountsdb_proto::SubscribeRequest {}) .subscribe(accountsdb_proto::SubscribeRequest {})
@ -93,6 +94,9 @@ async fn feed_data_accountsdb(
let mut trigger_snapshot_on_slot = true; let mut trigger_snapshot_on_slot = true;
let mut snapshot_future = future::Fuse::terminated(); let mut snapshot_future = future::Fuse::terminated();
// The plugin sends a ping every 5s or so
let fatal_idle_timeout = Duration::from_secs(60);
loop { loop {
tokio::select! { tokio::select! {
update = update_stream.next() => { update = update_stream.next() => {
@ -102,7 +106,7 @@ async fn feed_data_accountsdb(
let update = update?; let update = update?;
if let UpdateOneof::SlotUpdate(slot_update) = update.update_oneof.as_ref().expect("invalid grpc") { if let UpdateOneof::SlotUpdate(slot_update) = update.update_oneof.as_ref().expect("invalid grpc") {
if trigger_snapshot_on_slot && slot_update.status == Status::Processed as i32 { if trigger_snapshot_on_slot && slot_update.status == Status::Processed as i32 {
snapshot_future = tokio::spawn(get_snapshot(config.rpc_http_url.clone(), program_id, slot_update.slot)).fuse(); snapshot_future = tokio::spawn(get_snapshot(config.snapshot_source.rpc_http_url.clone(), program_id, slot_update.slot)).fuse();
trigger_snapshot_on_slot = false; trigger_snapshot_on_slot = false;
} }
} }
@ -119,7 +123,7 @@ async fn feed_data_accountsdb(
.await .await
.expect("send success"); .expect("send success");
}, },
_ = tokio::time::sleep(Duration::from_secs(60)) => { _ = tokio::time::sleep(fatal_idle_timeout) => {
anyhow::bail!("accountsdb plugin hasn't sent a message in too long"); anyhow::bail!("accountsdb plugin hasn't sent a message in too long");
} }
} }
@ -145,7 +149,10 @@ pub async fn process_events(
err err
); );
} }
tokio::time::sleep(std::time::Duration::from_secs(5)).await; tokio::time::sleep(std::time::Duration::from_secs(
config.grpc_source.retry_connection_sleep_secs,
))
.await;
} }
}); });

View File

@ -70,13 +70,24 @@ pub struct PostgresConfig {
pub fatal_connection_timeout_secs: u64, pub fatal_connection_timeout_secs: u64,
} }
#[derive(Clone, Debug, Deserialize)]
pub struct GrpcSourceConfig {
pub connection_string: String,
pub retry_connection_sleep_secs: u64,
}
#[derive(Clone, Debug, Deserialize)]
pub struct SnapshotSourceConfig {
pub rpc_http_url: String,
pub program_id: String,
}
#[derive(Clone, Debug, Deserialize)] #[derive(Clone, Debug, Deserialize)]
pub struct Config { pub struct Config {
pub postgres_target: PostgresConfig, pub postgres_target: PostgresConfig,
pub grpc_connection_string: String, pub grpc_source: GrpcSourceConfig,
pub rpc_http_url: String, pub snapshot_source: SnapshotSourceConfig,
pub rpc_ws_url: String, pub rpc_ws_url: String,
pub program_id: String,
} }
#[async_trait] #[async_trait]

View File

@ -30,22 +30,22 @@ async fn feed_data(
config: &Config, config: &Config,
sender: async_channel::Sender<WebsocketMessage>, sender: async_channel::Sender<WebsocketMessage>,
) -> Result<(), anyhow::Error> { ) -> Result<(), anyhow::Error> {
let program_id = Pubkey::from_str(&config.program_id)?; let program_id = Pubkey::from_str(&config.snapshot_source.program_id)?;
let snapshot_duration = Duration::from_secs(300); let snapshot_duration = Duration::from_secs(300);
let connect = ws::try_connect::<RpcSolPubSubClient>(&config.rpc_ws_url).map_err_anyhow()?; let connect = ws::try_connect::<RpcSolPubSubClient>(&config.rpc_ws_url).map_err_anyhow()?;
let client = connect.await.map_err_anyhow()?; let client = connect.await.map_err_anyhow()?;
let rpc_client = http::connect_with_options::<FullClient>(&config.rpc_http_url, true) let rpc_client =
.await http::connect_with_options::<FullClient>(&config.snapshot_source.rpc_http_url, true)
.map_err_anyhow()?; .await
.map_err_anyhow()?;
let account_info_config = RpcAccountInfoConfig { let account_info_config = RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64), encoding: Some(UiAccountEncoding::Base64),
commitment: Some(CommitmentConfig::processed()), commitment: Some(CommitmentConfig::processed()),
data_slice: None, data_slice: None,
}; };
// TODO: Make addresses filters configurable
let program_accounts_config = RpcProgramAccountsConfig { let program_accounts_config = RpcProgramAccountsConfig {
filters: None, filters: None,
with_context: Some(true), with_context: Some(true),