Bigtable: update google proto files and allow configuration of max_message_size (#34740)

* Update proto files with tonic-build v0.9.2

* Manually ignore invalid doc-tests

* Add new ReadRowsRequest fields

* Add LedgerStorageConfig::max_message_size and default value

* Add BigtableConnection::max_message_size and use on client creation

* Add max_message_size to RpcBigtableConfig and make const pub

* Add solana-validator cli arg
Tyera 2024-01-10 21:20:15 -07:00 committed by GitHub
parent 13b8d02170
commit 166be2995e
11 changed files with 1777 additions and 116 deletions
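For orientation, a minimal sketch of what the change enables: callers can now raise the gRPC message-size cap through `LedgerStorageConfig`. This is an illustration, not code from the diff; the 128MB figure and the use of the crate's `Error` type in the signature are assumptions, while the field and constructor names mirror the config surface added here.

```rust
use solana_storage_bigtable::{LedgerStorage, LedgerStorageConfig};

// Sketch: open ledger storage with a custom message-size cap.
// All other fields keep the crate defaults introduced by this change.
async fn open_storage() -> Result<LedgerStorage, solana_storage_bigtable::Error> {
    let config = LedgerStorageConfig {
        // Bytes; the new crate default is 64MB (DEFAULT_MAX_MESSAGE_SIZE).
        max_message_size: 128 * 1024 * 1024,
        ..LedgerStorageConfig::default()
    };
    LedgerStorage::new_with_config(config).await
}
```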


@ -653,6 +653,7 @@ async fn get_bigtable(
credential_type: CredentialType::Filepath(Some(args.crediential_path.unwrap())),
instance_name: args.instance_name,
app_profile_id: args.app_profile_id,
max_message_size: solana_storage_bigtable::DEFAULT_MAX_MESSAGE_SIZE,
},
)
.await


@ -170,6 +170,7 @@ pub struct RpcBigtableConfig {
pub bigtable_instance_name: String,
pub bigtable_app_profile_id: String,
pub timeout: Option<Duration>,
pub max_message_size: usize,
}
impl Default for RpcBigtableConfig {
@ -181,6 +182,7 @@ impl Default for RpcBigtableConfig {
bigtable_instance_name,
bigtable_app_profile_id,
timeout: None,
max_message_size: solana_storage_bigtable::DEFAULT_MAX_MESSAGE_SIZE,
}
}
}
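For reference, a hedged sketch of constructing the updated struct with the new field; the `solana_rpc::rpc` module path is an assumption and the 32MB override is arbitrary. The ledger-tool hunk near the end of this diff relies on the same `..RpcBigtableConfig::default()` pattern.

```rust
use std::time::Duration;
use solana_rpc::rpc::RpcBigtableConfig; // module path assumed

// Sketch: override only the fields of interest, keep the rest at their defaults.
fn custom_bigtable_rpc_config() -> RpcBigtableConfig {
    RpcBigtableConfig {
        timeout: Some(Duration::from_secs(30)),
        max_message_size: 32 * 1024 * 1024, // bytes; crate default is 64MB
        ..RpcBigtableConfig::default()
    }
}
```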


@ -406,6 +406,7 @@ impl JsonRpcService {
ref bigtable_instance_name,
ref bigtable_app_profile_id,
timeout,
max_message_size,
}) = config.rpc_bigtable_config
{
let bigtable_config = solana_storage_bigtable::LedgerStorageConfig {
@ -414,6 +415,7 @@ impl JsonRpcService {
credential_type: CredentialType::Filepath(None),
instance_name: bigtable_instance_name.clone(),
app_profile_id: bigtable_app_profile_id.clone(),
max_message_size,
};
runtime
.block_on(solana_storage_bigtable::LedgerStorage::new_with_config(

File diff suppressed because it is too large


@ -246,7 +246,7 @@ pub mod value_range {
/// RowFilter.Chain and RowFilter.Interleave documentation.
///
/// The total serialized size of a RowFilter message must not
/// exceed 4096 bytes, and RowFilters may not be nested within each other
/// exceed 20480 bytes, and RowFilters may not be nested within each other
/// (in Chains or Interleaves) to a depth of more than 20.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
@ -623,6 +623,130 @@ pub mod read_modify_write_rule {
IncrementAmount(i64),
}
}
/// NOTE: This API is intended to be used by Apache Beam BigtableIO.
/// A partition of a change stream.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct StreamPartition {
/// The row range covered by this partition, specified by
/// [`start_key_closed`, `end_key_open`).
#[prost(message, optional, tag = "1")]
pub row_range: ::core::option::Option<RowRange>,
}
/// NOTE: This API is intended to be used by Apache Beam BigtableIO.
/// The information required to continue reading the data from multiple
/// `StreamPartitions` from where a previous read left off.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct StreamContinuationTokens {
/// List of continuation tokens.
#[prost(message, repeated, tag = "1")]
pub tokens: ::prost::alloc::vec::Vec<StreamContinuationToken>,
}
/// NOTE: This API is intended to be used by Apache Beam BigtableIO.
/// The information required to continue reading the data from a
/// `StreamPartition` from where a previous read left off.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct StreamContinuationToken {
/// The partition that this token applies to.
#[prost(message, optional, tag = "1")]
pub partition: ::core::option::Option<StreamPartition>,
/// An encoded position in the stream to restart reading from.
#[prost(string, tag = "2")]
pub token: ::prost::alloc::string::String,
}
/// ReadIterationStats captures information about the iteration of rows or cells
/// over the course of a read, e.g. how many results were scanned in a read
/// operation versus the results returned.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ReadIterationStats {
/// The rows seen (scanned) as part of the request. This includes the count of
/// rows returned, as captured below.
#[prost(int64, tag = "1")]
pub rows_seen_count: i64,
/// The rows returned as part of the request.
#[prost(int64, tag = "2")]
pub rows_returned_count: i64,
/// The cells seen (scanned) as part of the request. This includes the count of
/// cells returned, as captured below.
#[prost(int64, tag = "3")]
pub cells_seen_count: i64,
/// The cells returned as part of the request.
#[prost(int64, tag = "4")]
pub cells_returned_count: i64,
}
/// RequestLatencyStats provides a measurement of the latency of the request as
/// it interacts with different systems over its lifetime, e.g. how long the
/// request took to execute within a frontend server.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct RequestLatencyStats {
/// The latency measured by the frontend server handling this request, from
/// when the request was received, to when this value is sent back in the
/// response. For more context on the component that is measuring this latency,
/// see: <https://cloud.google.com/bigtable/docs/overview>
///
/// Note: This value may be slightly shorter than the value reported into
/// aggregate latency metrics in Monitoring for this request
/// (<https://cloud.google.com/bigtable/docs/monitoring-instance>) as this value
/// needs to be sent in the response before the latency measurement including
/// that transmission is finalized.
///
/// Note: This value includes the end-to-end latency of contacting nodes in
/// the targeted cluster, e.g. measuring from when the first byte arrives at
/// the frontend server, to when this value is sent back as the last value in
/// the response, including any latency incurred by contacting nodes, waiting
/// for results from nodes, and finally sending results from nodes back to the
/// caller.
#[prost(message, optional, tag = "1")]
pub frontend_server_latency: ::core::option::Option<::prost_types::Duration>,
}
/// FullReadStatsView captures all known information about a read.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FullReadStatsView {
/// Iteration stats describe how efficient the read is, e.g. comparing
/// rows seen vs. rows returned or cells seen vs cells returned can provide an
/// indication of read efficiency (the closer the ratio of seen to returned is
/// to one, the more efficient the read).
#[prost(message, optional, tag = "1")]
pub read_iteration_stats: ::core::option::Option<ReadIterationStats>,
/// Request latency stats describe the time taken to complete a request, from
/// the server side.
#[prost(message, optional, tag = "2")]
pub request_latency_stats: ::core::option::Option<RequestLatencyStats>,
}
/// RequestStats is the container for additional information pertaining to a
/// single request, helpful for evaluating the performance of the sent request.
/// Currently, there are the following supported methods:
/// * google.bigtable.v2.ReadRows
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct RequestStats {
/// Information pertaining to each request type received. The type is chosen
/// based on the requested view.
///
/// See the messages above for additional context.
#[prost(oneof = "request_stats::StatsView", tags = "1")]
pub stats_view: ::core::option::Option<request_stats::StatsView>,
}
/// Nested message and enum types in `RequestStats`.
pub mod request_stats {
/// Information pertaining to each request type received. The type is chosen
/// based on the requested view.
///
/// See the messages above for additional context.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum StatsView {
/// Available with the ReadRowsRequest.RequestStatsView.REQUEST_STATS_FULL
/// view, see package google.bigtable.v2.
#[prost(message, tag = "1")]
FullReadStatsView(super::FullReadStatsView),
}
}
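A small sketch of how these stats might be consumed, assuming the generated types above are in scope; the efficiency ratio is illustrative and not part of this change.

```rust
// Sketch: a rows-returned-to-rows-seen ratio close to 1.0 indicates an
// efficient read (few scanned rows were filtered out).
fn read_efficiency(stats: &ReadIterationStats) -> Option<f64> {
    if stats.rows_seen_count == 0 {
        return None;
    }
    Some(stats.rows_returned_count as f64 / stats.rows_seen_count as f64)
}
```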
/// Request message for Bigtable.ReadRows.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
@ -636,17 +760,85 @@ pub struct ReadRowsRequest {
/// "default" application profile will be used.
#[prost(string, tag = "5")]
pub app_profile_id: ::prost::alloc::string::String,
/// The row keys and/or ranges to read. If not specified, reads from all rows.
/// The row keys and/or ranges to read sequentially. If not specified, reads
/// from all rows.
#[prost(message, optional, tag = "2")]
pub rows: ::core::option::Option<RowSet>,
/// The filter to apply to the contents of the specified row(s). If unset,
/// reads the entirety of each row.
#[prost(message, optional, tag = "3")]
pub filter: ::core::option::Option<RowFilter>,
/// The read will terminate after committing to N rows' worth of results. The
/// The read will stop after committing to N rows' worth of results. The
/// default (zero) is to return all results.
#[prost(int64, tag = "4")]
pub rows_limit: i64,
/// The view into RequestStats, as described above.
#[prost(enumeration = "read_rows_request::RequestStatsView", tag = "6")]
pub request_stats_view: i32,
/// Experimental API - Please note that this API is currently experimental
/// and can change in the future.
///
/// Return rows in lexicographic descending order of the row keys. The row
/// contents will not be affected by this flag.
///
/// Example result set:
///```ignore
/// [
/// {key: "k2", "f:col1": "v1", "f:col2": "v1"},
/// {key: "k1", "f:col1": "v2", "f:col2": "v2"}
/// ]
#[prost(bool, tag = "7")]
pub reversed: bool,
}
/// Nested message and enum types in `ReadRowsRequest`.
pub mod read_rows_request {
/// The desired view into RequestStats that should be returned in the response.
///
/// See also: RequestStats message.
#[derive(
Clone,
Copy,
Debug,
PartialEq,
Eq,
Hash,
PartialOrd,
Ord,
::prost::Enumeration
)]
#[repr(i32)]
pub enum RequestStatsView {
/// The default / unset value. The API will default to the NONE option below.
Unspecified = 0,
/// Do not include any RequestStats in the response. This will leave the
/// RequestStats embedded message unset in the response.
RequestStatsNone = 1,
/// Include the full set of available RequestStats in the response,
/// applicable to this read.
RequestStatsFull = 2,
}
impl RequestStatsView {
/// String value of the enum field names used in the ProtoBuf definition.
///
/// The values are not transformed in any way and thus are considered stable
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
RequestStatsView::Unspecified => "REQUEST_STATS_VIEW_UNSPECIFIED",
RequestStatsView::RequestStatsNone => "REQUEST_STATS_NONE",
RequestStatsView::RequestStatsFull => "REQUEST_STATS_FULL",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"REQUEST_STATS_VIEW_UNSPECIFIED" => Some(Self::Unspecified),
"REQUEST_STATS_NONE" => Some(Self::RequestStatsNone),
"REQUEST_STATS_FULL" => Some(Self::RequestStatsFull),
_ => None,
}
}
}
}
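A hedged sketch of exercising the two new request fields, `request_stats_view` and `reversed`, assuming the generated types above are in scope; the table name is a placeholder.

```rust
// Sketch: ask for the full RequestStats trailer and reverse the scan order.
fn full_stats_request(table_name: String) -> ReadRowsRequest {
    ReadRowsRequest {
        table_name,
        request_stats_view: read_rows_request::RequestStatsView::RequestStatsFull as i32,
        reversed: true,
        // Remaining fields (rows, filter, rows_limit, app_profile_id) keep
        // their prost defaults.
        ..Default::default()
    }
}
```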
/// Response message for Bigtable.ReadRows.
#[allow(clippy::derive_partial_eq_without_eq)]
@ -664,6 +856,28 @@ pub struct ReadRowsResponse {
/// key, allowing the client to skip that work on a retry.
#[prost(bytes = "vec", tag = "2")]
pub last_scanned_row_key: ::prost::alloc::vec::Vec<u8>,
///
/// If requested, provide enhanced query performance statistics. The semantics
/// dictate:
/// * request_stats is empty on every (streamed) response, except
/// * request_stats has non-empty information after all chunks have been
/// streamed, where the ReadRowsResponse message only contains
/// request_stats.
/// * For example, if a read request would have returned an empty
/// response, a single ReadRowsResponse is instead streamed with empty
/// chunks and request_stats filled.
///
/// Visually, response messages will stream as follows:
///     ... -> {chunks: \[...\]} -> {chunks: \[\], request_stats: {...}}
///           \______________________/  \________________________________/
///               Primary response          Trailer of RequestStats info
///
/// Or if the read did not return any values:
///     {chunks: \[\], request_stats: {...}}
///     \________________________________/
///         Trailer of RequestStats info
#[prost(message, optional, tag = "3")]
pub request_stats: ::core::option::Option<RequestStats>,
}
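Per the comment above, the stats arrive only on the final streamed message; below is a sketch of draining the stream and keeping that trailer. The helper name and the omission of chunk handling are assumptions.

```rust
// Sketch: read every ReadRowsResponse and keep the RequestStats trailer.
async fn collect_request_stats(
    mut stream: tonic::codec::Streaming<ReadRowsResponse>,
) -> Result<Option<RequestStats>, tonic::Status> {
    let mut stats = None;
    while let Some(response) = stream.message().await? {
        // ...row chunks would be processed here...
        if response.request_stats.is_some() {
            stats = response.request_stats;
        }
    }
    Ok(stats)
}
```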
/// Nested message and enum types in `ReadRowsResponse`.
pub mod read_rows_response {
@ -780,8 +994,8 @@ pub struct SampleRowKeysResponse {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct MutateRowRequest {
/// Required. The unique name of the table to which the mutation should be applied.
/// Values are of the form
/// Required. The unique name of the table to which the mutation should be
/// applied. Values are of the form
/// `projects/<project>/instances/<instance>/tables/<table>`.
#[prost(string, tag = "1")]
pub table_name: ::prost::alloc::string::String,
@ -792,9 +1006,9 @@ pub struct MutateRowRequest {
/// Required. The key of the row to which the mutation should be applied.
#[prost(bytes = "vec", tag = "2")]
pub row_key: ::prost::alloc::vec::Vec<u8>,
/// Required. Changes to be atomically applied to the specified row. Entries are applied
/// in order, meaning that earlier mutations can be masked by later ones.
/// Must contain at least one entry and at most 100000.
/// Required. Changes to be atomically applied to the specified row. Entries
/// are applied in order, meaning that earlier mutations can be masked by later
/// ones. Must contain at least one entry and at most 100000.
#[prost(message, repeated, tag = "3")]
pub mutations: ::prost::alloc::vec::Vec<Mutation>,
}
@ -806,7 +1020,8 @@ pub struct MutateRowResponse {}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct MutateRowsRequest {
/// Required. The unique name of the table to which the mutations should be applied.
/// Required. The unique name of the table to which the mutations should be
/// applied.
#[prost(string, tag = "1")]
pub table_name: ::prost::alloc::string::String,
/// This value specifies routing for replication. If not specified, the
@ -830,10 +1045,9 @@ pub mod mutate_rows_request {
/// The key of the row to which the `mutations` should be applied.
#[prost(bytes = "vec", tag = "1")]
pub row_key: ::prost::alloc::vec::Vec<u8>,
/// Required. Changes to be atomically applied to the specified row. Mutations are
/// applied in order, meaning that earlier mutations can be masked by
/// later ones.
/// You must specify at least one mutation.
/// Required. Changes to be atomically applied to the specified row.
/// Mutations are applied in order, meaning that earlier mutations can be
/// masked by later ones. You must specify at least one mutation.
#[prost(message, repeated, tag = "2")]
pub mutations: ::prost::alloc::vec::Vec<super::Mutation>,
}
@ -845,6 +1059,11 @@ pub struct MutateRowsResponse {
/// One or more results for Entries from the batch request.
#[prost(message, repeated, tag = "1")]
pub entries: ::prost::alloc::vec::Vec<mutate_rows_response::Entry>,
/// Information about how the client should limit the rate (QPS). Primarily used
/// by supported official Cloud Bigtable clients. If unset, the rate limit info is
/// not provided by the server.
#[prost(message, optional, tag = "3")]
pub rate_limit_info: ::core::option::Option<RateLimitInfo>,
}
/// Nested message and enum types in `MutateRowsResponse`.
pub mod mutate_rows_response {
@ -864,13 +1083,36 @@ pub mod mutate_rows_response {
pub status: ::core::option::Option<super::super::super::rpc::Status>,
}
}
/// Information about how the client should adjust the load it sends to Bigtable.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct RateLimitInfo {
/// Time that clients should wait before adjusting the target rate again.
/// If clients adjust rate too frequently, the impact of the previous
/// adjustment may not have been taken into account and may
/// over-throttle or under-throttle. If clients adjust rate too slowly, they
/// will not be responsive to load changes on server side, and may
/// over-throttle or under-throttle.
#[prost(message, optional, tag = "1")]
pub period: ::core::option::Option<::prost_types::Duration>,
/// If it has been at least one `period` since the last load adjustment, the
/// client should multiply the current load by this value to get the new target
/// load. For example, if the current load is 100 and `factor` is 0.8, the new
/// target load should be 80. After adjusting, the client should ignore
/// `factor` until another `period` has passed.
///
/// The client can measure its load using any unit that's comparable over time.
/// For example, QPS can be used as long as each request involves a similar
/// amount of work.
#[prost(double, tag = "2")]
pub factor: f64,
}
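The adjustment described above is a single multiplication, applied at most once per `period`; a sketch, with QPS as the assumed unit of load:

```rust
// Sketch: apply RateLimitInfo.factor to the current target load.
// E.g. a current target of 100 QPS and a factor of 0.8 yields 80 QPS.
// Callers should then ignore `factor` until another `period` has elapsed.
fn next_target_qps(current_qps: f64, info: &RateLimitInfo) -> f64 {
    current_qps * info.factor
}
```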
/// Request message for Bigtable.CheckAndMutateRow.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CheckAndMutateRowRequest {
/// Required. The unique name of the table to which the conditional mutation should be
/// applied.
/// Values are of the form
/// Required. The unique name of the table to which the conditional mutation
/// should be applied. Values are of the form
/// `projects/<project>/instances/<instance>/tables/<table>`.
#[prost(string, tag = "1")]
pub table_name: ::prost::alloc::string::String,
@ -878,7 +1120,8 @@ pub struct CheckAndMutateRowRequest {
/// "default" application profile will be used.
#[prost(string, tag = "7")]
pub app_profile_id: ::prost::alloc::string::String,
/// Required. The key of the row to which the conditional mutation should be applied.
/// Required. The key of the row to which the conditional mutation should be
/// applied.
#[prost(bytes = "vec", tag = "2")]
pub row_key: ::prost::alloc::vec::Vec<u8>,
/// The filter to be applied to the contents of the specified row. Depending
@ -911,13 +1154,30 @@ pub struct CheckAndMutateRowResponse {
#[prost(bool, tag = "1")]
pub predicate_matched: bool,
}
/// Request message for client connection keep-alive and warming.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PingAndWarmRequest {
/// Required. The unique name of the instance to check permissions for as well
/// as respond. Values are of the form
/// `projects/<project>/instances/<instance>`.
#[prost(string, tag = "1")]
pub name: ::prost::alloc::string::String,
/// This value specifies routing for replication. If not specified, the
/// "default" application profile will be used.
#[prost(string, tag = "2")]
pub app_profile_id: ::prost::alloc::string::String,
}
/// Response message for Bigtable.PingAndWarm connection keepalive and warming.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PingAndWarmResponse {}
/// Request message for Bigtable.ReadModifyWriteRow.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ReadModifyWriteRowRequest {
/// Required. The unique name of the table to which the read/modify/write rules should be
/// applied.
/// Values are of the form
/// Required. The unique name of the table to which the read/modify/write rules
/// should be applied. Values are of the form
/// `projects/<project>/instances/<instance>/tables/<table>`.
#[prost(string, tag = "1")]
pub table_name: ::prost::alloc::string::String,
@ -925,12 +1185,13 @@ pub struct ReadModifyWriteRowRequest {
/// "default" application profile will be used.
#[prost(string, tag = "4")]
pub app_profile_id: ::prost::alloc::string::String,
/// Required. The key of the row to which the read/modify/write rules should be applied.
/// Required. The key of the row to which the read/modify/write rules should be
/// applied.
#[prost(bytes = "vec", tag = "2")]
pub row_key: ::prost::alloc::vec::Vec<u8>,
/// Required. Rules specifying how the specified row's contents are to be transformed
/// into writes. Entries are applied in order, meaning that earlier rules will
/// affect the results of later ones.
/// Required. Rules specifying how the specified row's contents are to be
/// transformed into writes. Entries are applied in order, meaning that earlier
/// rules will affect the results of later ones.
#[prost(message, repeated, tag = "3")]
pub rules: ::prost::alloc::vec::Vec<ReadModifyWriteRule>,
}
@ -942,6 +1203,312 @@ pub struct ReadModifyWriteRowResponse {
#[prost(message, optional, tag = "1")]
pub row: ::core::option::Option<Row>,
}
/// NOTE: This API is intended to be used by Apache Beam BigtableIO.
/// Request message for Bigtable.GenerateInitialChangeStreamPartitions.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GenerateInitialChangeStreamPartitionsRequest {
/// Required. The unique name of the table from which to get change stream
/// partitions. Values are of the form
/// `projects/<project>/instances/<instance>/tables/<table>`.
/// Change streaming must be enabled on the table.
#[prost(string, tag = "1")]
pub table_name: ::prost::alloc::string::String,
/// This value specifies routing for replication. If not specified, the
/// "default" application profile will be used.
/// Single cluster routing must be configured on the profile.
#[prost(string, tag = "2")]
pub app_profile_id: ::prost::alloc::string::String,
}
/// NOTE: This API is intended to be used by Apache Beam BigtableIO.
/// Response message for Bigtable.GenerateInitialChangeStreamPartitions.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GenerateInitialChangeStreamPartitionsResponse {
/// A partition of the change stream.
#[prost(message, optional, tag = "1")]
pub partition: ::core::option::Option<StreamPartition>,
}
/// NOTE: This API is intended to be used by Apache Beam BigtableIO.
/// Request message for Bigtable.ReadChangeStream.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ReadChangeStreamRequest {
/// Required. The unique name of the table from which to read a change stream.
/// Values are of the form
/// `projects/<project>/instances/<instance>/tables/<table>`.
/// Change streaming must be enabled on the table.
#[prost(string, tag = "1")]
pub table_name: ::prost::alloc::string::String,
/// This value specifies routing for replication. If not specified, the
/// "default" application profile will be used.
/// Single cluster routing must be configured on the profile.
#[prost(string, tag = "2")]
pub app_profile_id: ::prost::alloc::string::String,
/// The partition to read changes from.
#[prost(message, optional, tag = "3")]
pub partition: ::core::option::Option<StreamPartition>,
/// If specified, OK will be returned when the stream advances beyond
/// this time. Otherwise, changes will be continuously delivered on the stream.
/// This value is inclusive and will be truncated to microsecond granularity.
#[prost(message, optional, tag = "5")]
pub end_time: ::core::option::Option<::prost_types::Timestamp>,
/// If specified, the duration between `Heartbeat` messages on the stream.
/// Otherwise, defaults to 5 seconds.
#[prost(message, optional, tag = "7")]
pub heartbeat_duration: ::core::option::Option<::prost_types::Duration>,
/// Options for describing where we want to start reading from the stream.
#[prost(oneof = "read_change_stream_request::StartFrom", tags = "4, 6")]
pub start_from: ::core::option::Option<read_change_stream_request::StartFrom>,
}
/// Nested message and enum types in `ReadChangeStreamRequest`.
pub mod read_change_stream_request {
/// Options for describing where we want to start reading from the stream.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum StartFrom {
/// Start reading the stream at the specified timestamp. This timestamp must
/// be within the change stream retention period, less than or equal to the
/// current time, and after change stream creation, whichever is greater.
/// This value is inclusive and will be truncated to microsecond granularity.
#[prost(message, tag = "4")]
StartTime(::prost_types::Timestamp),
/// Tokens that describe how to resume reading a stream where reading
/// previously left off. If specified, changes will be read starting at that
/// position. Tokens are delivered on the stream as part of `Heartbeat`
/// and `CloseStream` messages.
///
/// If a single token is provided, the token's partition must exactly match
/// the request's partition. If multiple tokens are provided, as in the case
/// of a partition merge, the union of the token partitions must exactly
/// cover the request's partition. Otherwise, INVALID_ARGUMENT will be
/// returned.
#[prost(message, tag = "6")]
ContinuationTokens(super::StreamContinuationTokens),
}
}
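A sketch of resuming a read from a previously delivered token, assuming the generated types above are in scope; partition matching and error handling are omitted.

```rust
// Sketch: build a ReadChangeStreamRequest that resumes from one token.
fn resume_request(
    table_name: String,
    token: StreamContinuationToken,
) -> ReadChangeStreamRequest {
    ReadChangeStreamRequest {
        table_name,
        start_from: Some(read_change_stream_request::StartFrom::ContinuationTokens(
            StreamContinuationTokens { tokens: vec![token] },
        )),
        // app_profile_id, partition, end_time, heartbeat_duration stay at
        // their prost defaults.
        ..Default::default()
    }
}
```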
/// NOTE: This API is intended to be used by Apache Beam BigtableIO.
/// Response message for Bigtable.ReadChangeStream.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ReadChangeStreamResponse {
/// The data or control message on the stream.
#[prost(oneof = "read_change_stream_response::StreamRecord", tags = "1, 2, 3")]
pub stream_record: ::core::option::Option<read_change_stream_response::StreamRecord>,
}
/// Nested message and enum types in `ReadChangeStreamResponse`.
pub mod read_change_stream_response {
/// A partial or complete mutation.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct MutationChunk {
/// If set, then the mutation is a `SetCell` with a chunked value across
/// multiple messages.
#[prost(message, optional, tag = "1")]
pub chunk_info: ::core::option::Option<mutation_chunk::ChunkInfo>,
/// If this is a continuation of a chunked message (`chunked_value_offset` >
/// 0), ignore all fields except the `SetCell`'s value and merge it with
/// the previous message by concatenating the value fields.
#[prost(message, optional, tag = "2")]
pub mutation: ::core::option::Option<super::Mutation>,
}
/// Nested message and enum types in `MutationChunk`.
pub mod mutation_chunk {
/// Information about the chunking of this mutation.
/// Only `SetCell` mutations can be chunked, and all chunks for a `SetCell`
/// will be delivered contiguously with no other mutation types interleaved.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ChunkInfo {
/// The total value size of all the chunks that make up the `SetCell`.
#[prost(int32, tag = "1")]
pub chunked_value_size: i32,
/// The byte offset of this chunk into the total value size of the
/// mutation.
#[prost(int32, tag = "2")]
pub chunked_value_offset: i32,
/// When true, this is the last chunk of a chunked `SetCell`.
#[prost(bool, tag = "3")]
pub last_chunk: bool,
}
}
/// A message corresponding to one or more mutations to the partition
/// being streamed. A single logical `DataChange` message may also be split
/// across a sequence of multiple individual messages. Messages other than
/// the first in a sequence will only have the `type` and `chunks` fields
/// populated, with the final message in the sequence also containing `done`
/// set to true.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct DataChange {
/// The type of the mutation.
#[prost(enumeration = "data_change::Type", tag = "1")]
pub r#type: i32,
/// The cluster where the mutation was applied.
/// Not set when `type` is `GARBAGE_COLLECTION`.
#[prost(string, tag = "2")]
pub source_cluster_id: ::prost::alloc::string::String,
/// The row key for all mutations that are part of this `DataChange`.
/// If the `DataChange` is chunked across multiple messages, then this field
/// will only be set for the first message.
#[prost(bytes = "vec", tag = "3")]
pub row_key: ::prost::alloc::vec::Vec<u8>,
/// The timestamp at which the mutation was applied on the Bigtable server.
#[prost(message, optional, tag = "4")]
pub commit_timestamp: ::core::option::Option<::prost_types::Timestamp>,
/// A value that lets stream consumers reconstruct Bigtable's
/// conflict resolution semantics.
/// <https://cloud.google.com/bigtable/docs/writes#conflict-resolution>
/// In the event that the same row key, column family, column qualifier,
/// and timestamp are modified on different clusters at the same
/// `commit_timestamp`, the mutation with the larger `tiebreaker` will be the
/// one chosen for the eventually consistent state of the system.
#[prost(int32, tag = "5")]
pub tiebreaker: i32,
/// The mutations associated with this change to the partition.
/// May contain complete mutations or chunks of a multi-message chunked
/// `DataChange` record.
#[prost(message, repeated, tag = "6")]
pub chunks: ::prost::alloc::vec::Vec<MutationChunk>,
/// When true, indicates that the entire `DataChange` has been read
/// and the client can safely process the message.
#[prost(bool, tag = "8")]
pub done: bool,
/// An encoded position for this stream's partition to restart reading from.
/// This token is for the StreamPartition from the request.
#[prost(string, tag = "9")]
pub token: ::prost::alloc::string::String,
/// An estimate of the commit timestamp that is usually lower than or equal
/// to any timestamp for a record that will be delivered in the future on the
/// stream. It is possible, under particular circumstances, that a future
/// record has a timestamp that is lower than a previously seen timestamp. For
/// an example usage see
/// <https://beam.apache.org/documentation/basics/#watermarks>
#[prost(message, optional, tag = "10")]
pub estimated_low_watermark: ::core::option::Option<::prost_types::Timestamp>,
}
/// Nested message and enum types in `DataChange`.
pub mod data_change {
/// The type of mutation.
#[derive(
Clone,
Copy,
Debug,
PartialEq,
Eq,
Hash,
PartialOrd,
Ord,
::prost::Enumeration
)]
#[repr(i32)]
pub enum Type {
/// The type is unspecified.
Unspecified = 0,
/// A user-initiated mutation.
User = 1,
/// A system-initiated mutation as part of garbage collection.
/// <https://cloud.google.com/bigtable/docs/garbage-collection>
GarbageCollection = 2,
/// This is a continuation of a multi-message change.
Continuation = 3,
}
impl Type {
/// String value of the enum field names used in the ProtoBuf definition.
///
/// The values are not transformed in any way and thus are considered stable
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
Type::Unspecified => "TYPE_UNSPECIFIED",
Type::User => "USER",
Type::GarbageCollection => "GARBAGE_COLLECTION",
Type::Continuation => "CONTINUATION",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"TYPE_UNSPECIFIED" => Some(Self::Unspecified),
"USER" => Some(Self::User),
"GARBAGE_COLLECTION" => Some(Self::GarbageCollection),
"CONTINUATION" => Some(Self::Continuation),
_ => None,
}
}
}
}
/// A periodic message with information that can be used to checkpoint
/// the state of a stream.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Heartbeat {
/// A token that can be provided to a subsequent `ReadChangeStream` call
/// to pick up reading at the current stream position.
#[prost(message, optional, tag = "1")]
pub continuation_token: ::core::option::Option<super::StreamContinuationToken>,
/// An estimate of the commit timestamp that is usually lower than or equal
/// to any timestamp for a record that will be delivered in the future on the
/// stream. It is possible, under particular circumstances, that a future
/// record has a timestamp that is lower than a previously seen timestamp. For
/// an example usage see
/// <https://beam.apache.org/documentation/basics/#watermarks>
#[prost(message, optional, tag = "2")]
pub estimated_low_watermark: ::core::option::Option<::prost_types::Timestamp>,
}
/// A message indicating that the client should stop reading from the stream.
/// If status is OK and `continuation_tokens` & `new_partitions` are empty, the
/// stream has finished (for example if there was an `end_time` specified).
/// If `continuation_tokens` & `new_partitions` are present, then a change in
/// partitioning requires the client to open a new stream for each token to
/// resume reading. Example:
///                                 [B, D) ends
///                                    |
///                                    v
///              new_partitions:  [A, C) [C, E)
/// continuation_tokens.partitions: [B,C)  [C,D)
///                                  ^---^  ^---^
///                                  ^      ^
///                                  |      |
///                                  |      StreamContinuationToken 2
///                                  |
///                                  StreamContinuationToken 1
/// To read the new partition [A,C), supply the continuation tokens whose
/// ranges cover the new partition, for example ContinuationToken[A,B) &
/// ContinuationToken[B,C).
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CloseStream {
/// The status of the stream.
#[prost(message, optional, tag = "1")]
pub status: ::core::option::Option<super::super::super::rpc::Status>,
/// If non-empty, contains the information needed to resume reading their
/// associated partitions.
#[prost(message, repeated, tag = "2")]
pub continuation_tokens: ::prost::alloc::vec::Vec<
super::StreamContinuationToken,
>,
/// If non-empty, contains the new partitions to start reading from, which
/// are related to but not necessarily identical to the partitions for the
/// above `continuation_tokens`.
#[prost(message, repeated, tag = "3")]
pub new_partitions: ::prost::alloc::vec::Vec<super::StreamPartition>,
}
/// The data or control message on the stream.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum StreamRecord {
/// A mutation to the partition.
#[prost(message, tag = "1")]
DataChange(DataChange),
/// A periodic heartbeat message.
#[prost(message, tag = "2")]
Heartbeat(Heartbeat),
/// An indication that the stream should be closed.
#[prost(message, tag = "3")]
CloseStream(CloseStream),
}
}
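A sketch of dispatching on the three record kinds defined above; the logging and checkpointing hints are illustrative only.

```rust
// Sketch: handle one record from a ReadChangeStream response.
fn handle_record(record: read_change_stream_response::StreamRecord) {
    use read_change_stream_response::StreamRecord;
    match record {
        StreamRecord::DataChange(change) => {
            // Chunks may need reassembly; `done` marks the end of the DataChange.
            println!(
                "row {:?}: {} chunk(s), done = {}",
                change.row_key,
                change.chunks.len(),
                change.done
            );
        }
        StreamRecord::Heartbeat(heartbeat) => {
            // Persist the token to checkpoint the stream position.
            let _checkpoint = heartbeat.continuation_token;
        }
        StreamRecord::CloseStream(close) => {
            // Re-open one stream per continuation token to keep reading.
            let _tokens = close.continuation_tokens;
        }
    }
}
```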
/// Generated client implementations.
pub mod bigtable_client {
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
@ -956,7 +1523,7 @@ pub mod bigtable_client {
/// Attempt to create a new client by connecting to a given endpoint.
pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
where
D: std::convert::TryInto<tonic::transport::Endpoint>,
D: TryInto<tonic::transport::Endpoint>,
D::Error: Into<StdError>,
{
let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
@ -1012,6 +1579,22 @@ pub mod bigtable_client {
self.inner = self.inner.accept_compressed(encoding);
self
}
/// Limits the maximum size of a decoded message.
///
/// Default: `4MB`
#[must_use]
pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
self.inner = self.inner.max_decoding_message_size(limit);
self
}
/// Limits the maximum size of an encoded message.
///
/// Default: `usize::MAX`
#[must_use]
pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
self.inner = self.inner.max_encoding_message_size(limit);
self
}
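These two setters are what the crate uses below to lift tonic's 4MB decode default. A hedged sketch of applying them to a bare channel; TLS settings and the auth interceptor that `BigTableConnection` installs are omitted, and the endpoint URL is an assumption.

```rust
use tonic::transport::{Channel, Endpoint};

// Sketch: connect and raise both message-size caps on the generated client.
async fn connect_with_limits(
    max_message_size: usize,
) -> Result<bigtable_client::BigtableClient<Channel>, tonic::transport::Error> {
    let channel = Endpoint::from_static("https://bigtable.googleapis.com")
        .connect()
        .await?;
    Ok(bigtable_client::BigtableClient::new(channel)
        .max_decoding_message_size(max_message_size)
        .max_encoding_message_size(max_message_size))
}
```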
/// Streams back the contents of all requested rows in key order, optionally
/// applying the same Reader filter to each. Depending on their size,
/// rows and cells may be broken up across multiple responses, but
@ -1020,7 +1603,7 @@ pub mod bigtable_client {
pub async fn read_rows(
&mut self,
request: impl tonic::IntoRequest<super::ReadRowsRequest>,
) -> Result<
) -> std::result::Result<
tonic::Response<tonic::codec::Streaming<super::ReadRowsResponse>>,
tonic::Status,
> {
@ -1037,7 +1620,10 @@ pub mod bigtable_client {
let path = http::uri::PathAndQuery::from_static(
"/google.bigtable.v2.Bigtable/ReadRows",
);
self.inner.server_streaming(request.into_request(), path, codec).await
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("google.bigtable.v2.Bigtable", "ReadRows"));
self.inner.server_streaming(req, path, codec).await
}
/// Returns a sample of row keys in the table. The returned row keys will
/// delimit contiguous sections of the table of approximately equal size,
@ -1046,7 +1632,7 @@ pub mod bigtable_client {
pub async fn sample_row_keys(
&mut self,
request: impl tonic::IntoRequest<super::SampleRowKeysRequest>,
) -> Result<
) -> std::result::Result<
tonic::Response<tonic::codec::Streaming<super::SampleRowKeysResponse>>,
tonic::Status,
> {
@ -1063,14 +1649,20 @@ pub mod bigtable_client {
let path = http::uri::PathAndQuery::from_static(
"/google.bigtable.v2.Bigtable/SampleRowKeys",
);
self.inner.server_streaming(request.into_request(), path, codec).await
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("google.bigtable.v2.Bigtable", "SampleRowKeys"));
self.inner.server_streaming(req, path, codec).await
}
/// Mutates a row atomically. Cells already present in the row are left
/// unchanged unless explicitly changed by `mutation`.
pub async fn mutate_row(
&mut self,
request: impl tonic::IntoRequest<super::MutateRowRequest>,
) -> Result<tonic::Response<super::MutateRowResponse>, tonic::Status> {
) -> std::result::Result<
tonic::Response<super::MutateRowResponse>,
tonic::Status,
> {
self.inner
.ready()
.await
@ -1084,7 +1676,10 @@ pub mod bigtable_client {
let path = http::uri::PathAndQuery::from_static(
"/google.bigtable.v2.Bigtable/MutateRow",
);
self.inner.unary(request.into_request(), path, codec).await
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("google.bigtable.v2.Bigtable", "MutateRow"));
self.inner.unary(req, path, codec).await
}
/// Mutates multiple rows in a batch. Each individual row is mutated
/// atomically as in MutateRow, but the entire batch is not executed
@ -1092,7 +1687,7 @@ pub mod bigtable_client {
pub async fn mutate_rows(
&mut self,
request: impl tonic::IntoRequest<super::MutateRowsRequest>,
) -> Result<
) -> std::result::Result<
tonic::Response<tonic::codec::Streaming<super::MutateRowsResponse>>,
tonic::Status,
> {
@ -1109,13 +1704,19 @@ pub mod bigtable_client {
let path = http::uri::PathAndQuery::from_static(
"/google.bigtable.v2.Bigtable/MutateRows",
);
self.inner.server_streaming(request.into_request(), path, codec).await
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("google.bigtable.v2.Bigtable", "MutateRows"));
self.inner.server_streaming(req, path, codec).await
}
/// Mutates a row atomically based on the output of a predicate Reader filter.
pub async fn check_and_mutate_row(
&mut self,
request: impl tonic::IntoRequest<super::CheckAndMutateRowRequest>,
) -> Result<tonic::Response<super::CheckAndMutateRowResponse>, tonic::Status> {
) -> std::result::Result<
tonic::Response<super::CheckAndMutateRowResponse>,
tonic::Status,
> {
self.inner
.ready()
.await
@ -1129,7 +1730,39 @@ pub mod bigtable_client {
let path = http::uri::PathAndQuery::from_static(
"/google.bigtable.v2.Bigtable/CheckAndMutateRow",
);
self.inner.unary(request.into_request(), path, codec).await
let mut req = request.into_request();
req.extensions_mut()
.insert(
GrpcMethod::new("google.bigtable.v2.Bigtable", "CheckAndMutateRow"),
);
self.inner.unary(req, path, codec).await
}
/// Warm up associated instance metadata for this connection.
/// This call is not required but may be useful for connection keep-alive.
pub async fn ping_and_warm(
&mut self,
request: impl tonic::IntoRequest<super::PingAndWarmRequest>,
) -> std::result::Result<
tonic::Response<super::PingAndWarmResponse>,
tonic::Status,
> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/google.bigtable.v2.Bigtable/PingAndWarm",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("google.bigtable.v2.Bigtable", "PingAndWarm"));
self.inner.unary(req, path, codec).await
}
/// Modifies a row atomically on the server. The method reads the latest
/// existing timestamp and value from the specified columns and writes a new
@ -1139,7 +1772,10 @@ pub mod bigtable_client {
pub async fn read_modify_write_row(
&mut self,
request: impl tonic::IntoRequest<super::ReadModifyWriteRowRequest>,
) -> Result<tonic::Response<super::ReadModifyWriteRowResponse>, tonic::Status> {
) -> std::result::Result<
tonic::Response<super::ReadModifyWriteRowResponse>,
tonic::Status,
> {
self.inner
.ready()
.await
@ -1153,7 +1789,85 @@ pub mod bigtable_client {
let path = http::uri::PathAndQuery::from_static(
"/google.bigtable.v2.Bigtable/ReadModifyWriteRow",
);
self.inner.unary(request.into_request(), path, codec).await
let mut req = request.into_request();
req.extensions_mut()
.insert(
GrpcMethod::new("google.bigtable.v2.Bigtable", "ReadModifyWriteRow"),
);
self.inner.unary(req, path, codec).await
}
/// NOTE: This API is intended to be used by Apache Beam BigtableIO.
/// Returns the current list of partitions that make up the table's
/// change stream. The union of partitions will cover the entire keyspace.
/// Partitions can be read with `ReadChangeStream`.
pub async fn generate_initial_change_stream_partitions(
&mut self,
request: impl tonic::IntoRequest<
super::GenerateInitialChangeStreamPartitionsRequest,
>,
) -> std::result::Result<
tonic::Response<
tonic::codec::Streaming<
super::GenerateInitialChangeStreamPartitionsResponse,
>,
>,
tonic::Status,
> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/google.bigtable.v2.Bigtable/GenerateInitialChangeStreamPartitions",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(
GrpcMethod::new(
"google.bigtable.v2.Bigtable",
"GenerateInitialChangeStreamPartitions",
),
);
self.inner.server_streaming(req, path, codec).await
}
/// NOTE: This API is intended to be used by Apache Beam BigtableIO.
/// Reads changes from a table's change stream. Changes will
/// reflect both user-initiated mutations and mutations that are caused by
/// garbage collection.
pub async fn read_change_stream(
&mut self,
request: impl tonic::IntoRequest<super::ReadChangeStreamRequest>,
) -> std::result::Result<
tonic::Response<
tonic::codec::Streaming<super::ReadChangeStreamResponse>,
>,
tonic::Status,
> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/google.bigtable.v2.Bigtable/ReadChangeStream",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(
GrpcMethod::new("google.bigtable.v2.Bigtable", "ReadChangeStream"),
);
self.inner.server_streaming(req, path, codec).await
}
}
}
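The regenerated client also gains `ping_and_warm`; a sketch of calling it, assuming a connected `BigtableClient` and using the instance name as a placeholder:

```rust
// Sketch: keep the connection warm using the new PingAndWarm RPC.
async fn warm_connection(
    client: &mut bigtable_client::BigtableClient<tonic::transport::Channel>,
    instance_name: String,
) -> Result<(), tonic::Status> {
    client
        .ping_and_warm(PingAndWarmRequest {
            name: instance_name,
            app_profile_id: String::new(),
        })
        .await?;
    Ok(())
}
```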


@ -8,12 +8,14 @@
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Status {
/// The status code, which should be an enum value of \[google.rpc.Code][google.rpc.Code\].
/// The status code, which should be an enum value of
/// \[google.rpc.Code][google.rpc.Code\].
#[prost(int32, tag = "1")]
pub code: i32,
/// A developer-facing error message, which should be in English. Any
/// user-facing error message should be localized and sent in the
/// \[google.rpc.Status.details][google.rpc.Status.details\] field, or localized by the client.
/// \[google.rpc.Status.details][google.rpc.Status.details\] field, or localized
/// by the client.
#[prost(string, tag = "2")]
pub message: ::prost::alloc::string::String,
/// A list of messages that carry the error details. There is a common set of


@ -121,6 +121,7 @@ pub struct BigTableConnection {
table_prefix: String,
app_profile_id: String,
timeout: Option<Duration>,
max_message_size: usize,
}
impl BigTableConnection {
@ -141,11 +142,18 @@ impl BigTableConnection {
read_only: bool,
timeout: Option<Duration>,
credential_type: CredentialType,
max_message_size: usize,
) -> Result<Self> {
match std::env::var("BIGTABLE_EMULATOR_HOST") {
Ok(endpoint) => {
info!("Connecting to bigtable emulator at {}", endpoint);
Self::new_for_emulator(instance_name, app_profile_id, &endpoint, timeout)
Self::new_for_emulator(
instance_name,
app_profile_id,
&endpoint,
timeout,
max_message_size,
)
}
Err(_) => {
@ -210,6 +218,7 @@ impl BigTableConnection {
table_prefix,
app_profile_id: app_profile_id.to_string(),
timeout,
max_message_size,
})
}
}
@ -220,6 +229,7 @@ impl BigTableConnection {
app_profile_id: &str,
endpoint: &str,
timeout: Option<Duration>,
max_message_size: usize,
) -> Result<Self> {
Ok(Self {
access_token: None,
@ -229,6 +239,7 @@ impl BigTableConnection {
table_prefix: format!("projects/emulator/instances/{instance_name}/tables/"),
app_profile_id: app_profile_id.to_string(),
timeout,
max_message_size,
})
}
@ -254,7 +265,9 @@ impl BigTableConnection {
}
Ok(req)
},
);
)
.max_decoding_message_size(self.max_message_size)
.max_encoding_message_size(self.max_message_size);
BigTable {
access_token: self.access_token.clone(),
client,
@ -469,6 +482,8 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
],
})),
}),
request_stats_view: 0,
reversed: false,
})
.await?
.into_inner();
@ -494,6 +509,8 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
filter: Some(RowFilter {
filter: Some(row_filter::Filter::StripValueTransformer(true)),
}),
request_stats_view: 0,
reversed: false,
})
.await?
.into_inner();
@ -545,6 +562,8 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
// Only return the latest version of each cell
filter: Some(row_filter::Filter::CellsPerColumnLimitFilter(1)),
}),
request_stats_view: 0,
reversed: false,
})
.await?
.into_inner();
@ -577,6 +596,8 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
// Only return the latest version of each cell
filter: Some(row_filter::Filter::CellsPerColumnLimitFilter(1)),
}),
request_stats_view: 0,
reversed: false,
})
.await?
.into_inner();
@ -610,6 +631,8 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
// Only return the latest version of each cell
filter: Some(row_filter::Filter::CellsPerColumnLimitFilter(1)),
}),
request_stats_view: 0,
reversed: false,
})
.await?
.into_inner();


@ -381,6 +381,7 @@ impl From<LegacyTransactionByAddrInfo> for TransactionByAddrInfo {
pub const DEFAULT_INSTANCE_NAME: &str = "solana-ledger";
pub const DEFAULT_APP_PROFILE_ID: &str = "default";
pub const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; // 64MB
#[derive(Debug)]
pub enum CredentialType {
@ -395,6 +396,7 @@ pub struct LedgerStorageConfig {
pub credential_type: CredentialType,
pub instance_name: String,
pub app_profile_id: String,
pub max_message_size: usize,
}
impl Default for LedgerStorageConfig {
@ -405,6 +407,7 @@ impl Default for LedgerStorageConfig {
credential_type: CredentialType::Filepath(None),
instance_name: DEFAULT_INSTANCE_NAME.to_string(),
app_profile_id: DEFAULT_APP_PROFILE_ID.to_string(),
max_message_size: DEFAULT_MAX_MESSAGE_SIZE,
}
}
}
@ -471,6 +474,7 @@ impl LedgerStorage {
app_profile_id,
endpoint,
timeout,
LedgerStorageConfig::default().max_message_size,
)?,
stats,
})
@ -484,6 +488,7 @@ impl LedgerStorage {
instance_name,
app_profile_id,
credential_type,
max_message_size,
} = config;
let connection = bigtable::BigTableConnection::new(
instance_name.as_str(),
@ -491,6 +496,7 @@ impl LedgerStorage {
read_only,
timeout,
credential_type,
max_message_size,
)
.await?;
Ok(Self { stats, connection })


@ -430,6 +430,7 @@ fn main() {
String
),
timeout: None,
..RpcBigtableConfig::default()
})
} else {
None


@ -878,6 +878,15 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
.default_value(&default_args.rpc_bigtable_app_profile_id)
.help("Bigtable application profile id to use in requests")
)
.arg(
Arg::with_name("rpc_bigtable_max_message_size")
.long("rpc-bigtable-max-message-size")
.value_name("BYTES")
.validator(is_parsable::<usize>)
.takes_value(true)
.default_value(&default_args.rpc_bigtable_max_message_size)
.help("Max encoding and decoding message size used in Bigtable Grpc client"),
)
.arg(
Arg::with_name("rpc_pubsub_worker_threads")
.long("rpc-pubsub-worker-threads")
@ -1925,6 +1934,7 @@ pub struct DefaultArgs {
pub rpc_bigtable_timeout: String,
pub rpc_bigtable_instance_name: String,
pub rpc_bigtable_app_profile_id: String,
pub rpc_bigtable_max_message_size: String,
pub rpc_max_request_body_size: String,
pub rpc_pubsub_worker_threads: String,
@ -2010,6 +2020,8 @@ impl DefaultArgs {
rpc_bigtable_instance_name: solana_storage_bigtable::DEFAULT_INSTANCE_NAME.to_string(),
rpc_bigtable_app_profile_id: solana_storage_bigtable::DEFAULT_APP_PROFILE_ID
.to_string(),
rpc_bigtable_max_message_size: solana_storage_bigtable::DEFAULT_MAX_MESSAGE_SIZE
.to_string(),
rpc_pubsub_worker_threads: "4".to_string(),
accountsdb_repl_threads: num_cpus::get().to_string(),
maximum_full_snapshot_archives_to_retain: DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN


@ -1230,6 +1230,7 @@ pub fn main() {
timeout: value_t!(matches, "rpc_bigtable_timeout", u64)
.ok()
.map(Duration::from_secs),
max_message_size: value_t_or_exit!(matches, "rpc_bigtable_max_message_size", usize),
})
} else {
None