diff --git a/connector-mango/example-config.toml b/connector-mango/example-config.toml index c19b5e1..1f44a34 100644 --- a/connector-mango/example-config.toml +++ b/connector-mango/example-config.toml @@ -19,6 +19,7 @@ program_id = "mv3ekLzLbnVPNxjSKvqBpU3ZeZXPQdEC3bp5MDEBG68" connection_string = "host=/var/run/postgresql" account_write_connection_count = 4 account_write_max_batch_size = 10 +account_write_max_queue_size = 10000 slot_update_connection_count = 4 retry_query_max_count = 3 retry_query_sleep_secs = 5 diff --git a/connector-raw/example-config.toml b/connector-raw/example-config.toml index 62a2e85..b46c18f 100644 --- a/connector-raw/example-config.toml +++ b/connector-raw/example-config.toml @@ -19,6 +19,7 @@ program_id = "" connection_string = "host=/var/run/postgresql" account_write_connection_count = 4 account_write_max_batch_size = 10 +account_write_max_queue_size = 10000 slot_update_connection_count = 2 retry_query_max_count = 3 retry_query_sleep_secs = 5 diff --git a/lib/src/grpc_plugin_source.rs b/lib/src/grpc_plugin_source.rs index 958f8d5..9dd513a 100644 --- a/lib/src/grpc_plugin_source.rs +++ b/lib/src/grpc_plugin_source.rs @@ -242,7 +242,8 @@ pub async fn process_events( metrics_sender: metrics::Metrics, ) { // Subscribe to accountsdb - let (msg_sender, msg_receiver) = async_channel::unbounded::(); + let (msg_sender, msg_receiver) = + async_channel::bounded::(config.postgres_target.account_write_max_queue_size); for grpc_source in config.grpc_sources { let msg_sender = msg_sender.clone(); let snapshot_source = config.snapshot_source.clone(); diff --git a/lib/src/lib.rs b/lib/src/lib.rs index d349de4..7f11052 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -74,6 +74,8 @@ pub struct PostgresConfig { pub account_write_connection_count: u64, /// Maximum batch size for account write inserts over one connection pub account_write_max_batch_size: usize, + /// Max size of account write queues + pub account_write_max_queue_size: usize, /// Number of parallel postgres connections used for slot insertions pub slot_update_connection_count: u64, /// Number of queries retries before fatal error diff --git a/lib/src/postgres_target.rs b/lib/src/postgres_target.rs index 4b8c77b..bd0aabf 100644 --- a/lib/src/postgres_target.rs +++ b/lib/src/postgres_target.rs @@ -338,7 +338,7 @@ pub async fn init( )> { // The actual message may want to also contain a retry count, if it self-reinserts on failure? let (account_write_queue_sender, account_write_queue_receiver) = - async_channel::unbounded::(); + async_channel::bounded::(config.account_write_max_queue_size); // Slot updates flowing from the outside into the single processing thread. From // there they'll flow into the postgres sending thread.