gRPC: Optionally use TLS

This commit is contained in:
Christian Kamm 2021-11-18 14:21:17 +01:00
parent 4c9b782d1f
commit b65815e4d4
6 changed files with 54 additions and 6 deletions

1
Cargo.lock generated
View File

@ -5904,6 +5904,7 @@ dependencies = [
"prost 0.9.0", "prost 0.9.0",
"prost-derive 0.9.0", "prost-derive 0.9.0",
"tokio", "tokio",
"tokio-rustls",
"tokio-stream", "tokio-stream",
"tokio-util", "tokio-util",
"tower", "tower",

View File

@ -5,6 +5,11 @@ name = "server"
connection_string = "http://[::1]:10000" connection_string = "http://[::1]:10000"
retry_connection_sleep_secs = 30 retry_connection_sleep_secs = 30
#[grpc_sources.tls]
#ca_cert_path = "ca.pem"
#client_cert_path = "client.pem"
#client_key_path = "client.pem"
[snapshot_source] [snapshot_source]
rpc_http_url = "" rpc_http_url = ""
program_id = "mv3ekLzLbnVPNxjSKvqBpU3ZeZXPQdEC3bp5MDEBG68" program_id = "mv3ekLzLbnVPNxjSKvqBpU3ZeZXPQdEC3bp5MDEBG68"

View File

@ -5,6 +5,11 @@ name = "server"
connection_string = "http://[::1]:10000" connection_string = "http://[::1]:10000"
retry_connection_sleep_secs = 30 retry_connection_sleep_secs = 30
#[grpc_sources.tls]
#ca_cert_path = "ca.pem"
#client_cert_path = "client.pem"
#client_key_path = "client.pem"
[snapshot_source] [snapshot_source]
rpc_http_url = "" rpc_http_url = ""
program_id = "" program_id = ""

View File

@ -30,7 +30,7 @@ serde = "1.0.130"
serde_derive = "1.0.130" serde_derive = "1.0.130"
serde_json = "1.0.68" serde_json = "1.0.68"
tonic = "0.6" tonic = { version = "0.6", features = ["tls"] }
prost = "0.9" prost = "0.9"
bs58 = "0.3.1" bs58 = "0.3.1"

View File

@ -8,7 +8,7 @@ use solana_rpc::{rpc::rpc_full::FullClient, rpc::OptionalContext};
use solana_sdk::{account::Account, commitment_config::CommitmentConfig, pubkey::Pubkey}; use solana_sdk::{account::Account, commitment_config::CommitmentConfig, pubkey::Pubkey};
use futures::{future, future::FutureExt}; use futures::{future, future::FutureExt};
use tonic::transport::Endpoint; use tonic::transport::{Certificate, ClientTlsConfig, Endpoint, Identity};
use log::*; use log::*;
use std::{collections::HashMap, str::FromStr, time::Duration}; use std::{collections::HashMap, str::FromStr, time::Duration};
@ -20,7 +20,7 @@ use accountsdb_proto::accounts_db_client::AccountsDbClient;
use crate::{ use crate::{
metrics, AccountWrite, AnyhowWrap, Config, GrpcSourceConfig, SlotStatus, SlotUpdate, metrics, AccountWrite, AnyhowWrap, Config, GrpcSourceConfig, SlotStatus, SlotUpdate,
SnapshotSourceConfig, SnapshotSourceConfig, TlsConfig,
}; };
type SnapshotData = Response<Vec<RpcKeyedAccount>>; type SnapshotData = Response<Vec<RpcKeyedAccount>>;
@ -75,13 +75,21 @@ async fn get_snapshot(
async fn feed_data_accountsdb( async fn feed_data_accountsdb(
grpc_config: &GrpcSourceConfig, grpc_config: &GrpcSourceConfig,
tls_config: Option<ClientTlsConfig>,
snapshot_config: &SnapshotSourceConfig, snapshot_config: &SnapshotSourceConfig,
sender: async_channel::Sender<Message>, sender: async_channel::Sender<Message>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let program_id = Pubkey::from_str(&snapshot_config.program_id)?; let program_id = Pubkey::from_str(&snapshot_config.program_id)?;
let mut client = let endpoint = Endpoint::from_str(&grpc_config.connection_string)?;
AccountsDbClient::connect(Endpoint::from_str(&grpc_config.connection_string)?).await?; let channel = if let Some(tls) = tls_config {
endpoint.tls_config(tls)?
} else {
endpoint
}
.connect()
.await?;
let mut client = AccountsDbClient::new(channel);
let mut update_stream = client let mut update_stream = client
.subscribe(accountsdb_proto::SubscribeRequest {}) .subscribe(accountsdb_proto::SubscribeRequest {})
@ -141,6 +149,18 @@ async fn feed_data_accountsdb(
} }
} }
fn make_tls_config(config: &TlsConfig) -> ClientTlsConfig {
let server_root_ca_cert =
std::fs::read(&config.ca_cert_path).expect("reading server root ca cert");
let server_root_ca_cert = Certificate::from_pem(server_root_ca_cert);
let client_cert = std::fs::read(&config.client_cert_path).expect("reading client cert");
let client_key = std::fs::read(&config.client_key_path).expect("reading client key");
let client_identity = Identity::from_pem(client_cert, client_key);
ClientTlsConfig::new()
.ca_certificate(server_root_ca_cert)
.identity(client_identity)
}
pub async fn process_events( pub async fn process_events(
config: Config, config: Config,
account_write_queue_sender: async_channel::Sender<AccountWrite>, account_write_queue_sender: async_channel::Sender<AccountWrite>,
@ -153,6 +173,10 @@ pub async fn process_events(
let msg_sender = msg_sender.clone(); let msg_sender = msg_sender.clone();
let snapshot_source = config.snapshot_source.clone(); let snapshot_source = config.snapshot_source.clone();
let metrics_sender = metrics_sender.clone(); let metrics_sender = metrics_sender.clone();
// Make TLS config if configured
let tls_config = grpc_source.tls.as_ref().map(make_tls_config);
tokio::spawn(async move { tokio::spawn(async move {
let mut metric_retries = metrics_sender.register_u64(format!( let mut metric_retries = metrics_sender.register_u64(format!(
"grpc_source_{}_connection_retries", "grpc_source_{}_connection_retries",
@ -164,7 +188,12 @@ pub async fn process_events(
// Continuously reconnect on failure // Continuously reconnect on failure
loop { loop {
metric_status.set("connected".into()); metric_status.set("connected".into());
let out = feed_data_accountsdb(&grpc_source, &snapshot_source, msg_sender.clone()); let out = feed_data_accountsdb(
&grpc_source,
tls_config.clone(),
&snapshot_source,
msg_sender.clone(),
);
let result = out.await; let result = out.await;
assert!(result.is_err()); assert!(result.is_err());
if let Err(err) = result { if let Err(err) = result {

View File

@ -84,11 +84,19 @@ pub struct PostgresConfig {
pub allow_invalid_certs: bool, pub allow_invalid_certs: bool,
} }
#[derive(Clone, Debug, Deserialize)]
pub struct TlsConfig {
pub ca_cert_path: String,
pub client_cert_path: String,
pub client_key_path: String,
}
#[derive(Clone, Debug, Deserialize)] #[derive(Clone, Debug, Deserialize)]
pub struct GrpcSourceConfig { pub struct GrpcSourceConfig {
pub name: String, pub name: String,
pub connection_string: String, pub connection_string: String,
pub retry_connection_sleep_secs: u64, pub retry_connection_sleep_secs: u64,
pub tls: Option<TlsConfig>,
} }
#[derive(Clone, Debug, Deserialize)] #[derive(Clone, Debug, Deserialize)]