optional postgres

This commit is contained in:
aniketfuryrocks 2023-01-30 22:31:40 +05:30
parent 38c67c3ef5
commit e0e42e9187
No known key found for this signature in database
GPG Key ID: FA6BFCFAA7D4B764
4 changed files with 31 additions and 20 deletions

View File

@ -4,14 +4,14 @@ version = "0.1.0"
edition = "2021"
[dependencies]
solana-client = "1.14.12"
solana-sdk = "1.14.12"
solana-client = "1.14.13"
solana-sdk = "1.14.13"
log = "0.4.17"
anyhow = "1.0.68"
serde = "1.0.152"
serde_json = "1.0.91"
csv = "1.1.6"
clap = { version = "4.1.1", features = ["derive"] }
tokio = { version = "1.24.2", features = ["full", "fs"]}
clap = { version = "4.1.4", features = ["derive"] }
tokio = { version = "1.25.0", features = ["full", "fs"]}
tracing-subscriber = "0.3.16"
dirs = "4.0.0"

View File

@ -99,12 +99,18 @@ impl LiteBridge {
tx_batch_size: usize,
tx_send_interval: Duration,
clean_interval: Duration,
postgres_config: String,
) -> anyhow::Result<[JoinHandle<anyhow::Result<()>>; 10]> {
let (postgres_send, postgres_recv) = mpsc::unbounded_channel();
let (postgres_connection, postgres) = Postgres::new(postgres_config).await?;
postgres_config: Option<String>,
) -> anyhow::Result<Vec<JoinHandle<anyhow::Result<()>>>> {
let (postgres, postgres_send) = if let Some(postgres_config) = postgres_config {
let (postgres_send, postgres_recv) = mpsc::unbounded_channel();
let (postgres_connection, postgres) = Postgres::new(postgres_config).await?;
let postgres = postgres.start(postgres_recv);
let postgres = postgres.start(postgres_recv);
(Some((postgres, postgres_connection)), Some(postgres_send))
} else {
(None, None)
};
let (tx_send, tx_recv) = mpsc::unbounded_channel();
self.tx_send = Some(tx_send);
@ -113,7 +119,7 @@ impl LiteBridge {
tx_recv,
tx_batch_size,
tx_send_interval,
Some(postgres_send.clone()),
postgres_send.clone(),
);
let metrics_capture = MetricsCapture::new(self.tx_sender.clone());
@ -122,7 +128,7 @@ impl LiteBridge {
let finalized_block_listenser = self
.finalized_block_listenser
.clone()
.listen(Some(postgres_send.clone()));
.listen(postgres_send.clone());
let confirmed_block_listenser = self.confirmed_block_listenser.clone().listen(None);
let cleaner = Cleaner::new(
@ -164,18 +170,23 @@ impl LiteBridge {
(ws_server, http_server)
};
Ok([
let mut services = vec![
ws_server,
http_server,
tx_sender,
finalized_block_listenser,
confirmed_block_listenser,
postgres_connection,
postgres,
metrics_capture.capture(Some(postgres_send)),
metrics_capture.capture(postgres_send),
prometheus_sync,
cleaner,
])
];
if let Some((postgres, connection)) = postgres {
services.push(connection);
services.push(postgres);
}
Ok(services)
}
}

View File

@ -28,6 +28,6 @@ pub struct Args {
#[arg(short = 'c', long, default_value_t = DEFAULT_CLEAN_INTERVAL_MS)]
pub clean_interval_ms: u64,
/// addr to postgres
#[arg(short = 'p', long, default_value_t = String::from("host=localhost user=postgres"))]
pub postgres_config: String,
#[arg(short = 'p', long)]
pub postgres_config: Option<String>,
}

View File

@ -123,8 +123,6 @@ impl BlockListener {
pub fn listen(self, postgres: Option<PostgresMpscSend>) -> JoinHandle<anyhow::Result<()>> {
tokio::spawn(async move {
info!("Subscribing to blocks");
let commitment = self.commitment_config.commitment;
let comfirmation_status = match commitment {
@ -132,6 +130,8 @@ impl BlockListener {
_ => TransactionConfirmationStatus::Confirmed,
};
info!("Subscribing to {commitment:?} blocks");
let (mut recv, _) = self
.pub_sub_client
.block_subscribe(