Add bigtable
This commit is contained in:
parent
243e05d59f
commit
6e0353965a
|
@ -1,6 +1,6 @@
|
||||||
[package]
|
[package]
|
||||||
name = "solana-storage-bigtable"
|
name = "solana-storage-bigtable"
|
||||||
version = "1.1.22"
|
version = "1.4.0"
|
||||||
description = "Solana Storage BigTable"
|
description = "Solana Storage BigTable"
|
||||||
authors = ["Solana Maintainers <maintainers@solana.com>"]
|
authors = ["Solana Maintainers <maintainers@solana.com>"]
|
||||||
repository = "https://github.com/solana-labs/solana"
|
repository = "https://github.com/solana-labs/solana"
|
||||||
|
@ -9,18 +9,23 @@ homepage = "https://solana.com/"
|
||||||
edition = "2018"
|
edition = "2018"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
backoff = {version="0.2.1", features = ["tokio"]}
|
||||||
bincode = "1.2.1"
|
bincode = "1.2.1"
|
||||||
|
bzip2 = "0.3.3"
|
||||||
|
enum-iterator = "0.6.0"
|
||||||
|
flate2 = "1.0.14"
|
||||||
goauth = "0.7.2"
|
goauth = "0.7.2"
|
||||||
log = "0.4.8"
|
log = "0.4.8"
|
||||||
smpl_jwt = "0.5.0"
|
|
||||||
tonic = {version="0.3.0", features = ["tls", "transport"]}
|
|
||||||
prost = "0.6.1"
|
prost = "0.6.1"
|
||||||
prost-types = "0.6.1"
|
prost-types = "0.6.1"
|
||||||
enum-iterator = "0.6.0"
|
|
||||||
bzip2 = "0.3.3"
|
|
||||||
flate2 = "1.0.14"
|
|
||||||
serde = "1.0.112"
|
serde = "1.0.112"
|
||||||
serde_derive = "1.0.103"
|
serde_derive = "1.0.103"
|
||||||
|
smpl_jwt = "0.5.0"
|
||||||
|
solana-sdk = { path = "../sdk", version = "1.1.20" }
|
||||||
|
solana-transaction-status = { path = "../transaction-status", version = "1.1.20" }
|
||||||
|
thiserror = "1.0"
|
||||||
|
futures = "0.3.5"
|
||||||
|
tonic = {version="0.3.0", features = ["tls", "transport"]}
|
||||||
zstd = "0.5.1"
|
zstd = "0.5.1"
|
||||||
|
|
||||||
[lib]
|
[lib]
|
||||||
|
|
|
@ -0,0 +1,471 @@
|
||||||
|
// Primitives for reading/writing BigTable tables
|
||||||
|
|
||||||
|
use crate::access_token::{AccessToken, Scope};
|
||||||
|
use crate::compression::{compress_best, decompress};
|
||||||
|
use crate::root_ca_certificate;
|
||||||
|
use log::*;
|
||||||
|
use std::sync::{Arc, RwLock};
|
||||||
|
use thiserror::Error;
|
||||||
|
use tonic::{metadata::MetadataValue, transport::ClientTlsConfig, Request};
|
||||||
|
|
||||||
|
mod google {
|
||||||
|
mod rpc {
|
||||||
|
include!(concat!(
|
||||||
|
env!("CARGO_MANIFEST_DIR"),
|
||||||
|
concat!("/proto/google.rpc.rs")
|
||||||
|
));
|
||||||
|
}
|
||||||
|
pub mod bigtable {
|
||||||
|
pub mod v2 {
|
||||||
|
include!(concat!(
|
||||||
|
env!("CARGO_MANIFEST_DIR"),
|
||||||
|
concat!("/proto/google.bigtable.v2.rs")
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
use google::bigtable::v2::*;
|
||||||
|
|
||||||
|
pub type RowKey = String;
|
||||||
|
pub type CellName = String;
|
||||||
|
pub type CellValue = Vec<u8>;
|
||||||
|
pub type RowData = Vec<(CellName, CellValue)>;
|
||||||
|
|
||||||
|
#[derive(Debug, Error)]
|
||||||
|
pub enum Error {
|
||||||
|
#[error("AccessToken error: {0}")]
|
||||||
|
AccessTokenError(String),
|
||||||
|
|
||||||
|
#[error("Certificate error: {0}")]
|
||||||
|
CertificateError(String),
|
||||||
|
|
||||||
|
#[error("I/O Error: {0}")]
|
||||||
|
IoError(std::io::Error),
|
||||||
|
|
||||||
|
#[error("Transport error: {0}")]
|
||||||
|
TransportError(tonic::transport::Error),
|
||||||
|
|
||||||
|
#[error("Invalid URI {0}: {1}")]
|
||||||
|
InvalidUri(String, String),
|
||||||
|
|
||||||
|
#[error("Row not found")]
|
||||||
|
RowNotFound,
|
||||||
|
|
||||||
|
#[error("Row write failed")]
|
||||||
|
RowWriteFailed,
|
||||||
|
|
||||||
|
#[error("Object not found: {0}")]
|
||||||
|
ObjectNotFound(String),
|
||||||
|
|
||||||
|
#[error("Object is corrupt: {0}")]
|
||||||
|
ObjectCorrupt(String),
|
||||||
|
|
||||||
|
#[error("RPC error: {0}")]
|
||||||
|
RpcError(tonic::Status),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::convert::From<std::io::Error> for Error {
|
||||||
|
fn from(err: std::io::Error) -> Self {
|
||||||
|
Self::IoError(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::convert::From<tonic::transport::Error> for Error {
|
||||||
|
fn from(err: tonic::transport::Error) -> Self {
|
||||||
|
Self::TransportError(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::convert::From<tonic::Status> for Error {
|
||||||
|
fn from(err: tonic::Status) -> Self {
|
||||||
|
Self::RpcError(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub type Result<T> = std::result::Result<T, Error>;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct BigTableConnection {
|
||||||
|
access_token: Option<Arc<RwLock<AccessToken>>>,
|
||||||
|
channel: tonic::transport::Channel,
|
||||||
|
table_prefix: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BigTableConnection {
|
||||||
|
/// Establish a connection to the BigTable instance named `instance_name`. If read-only access
|
||||||
|
/// is required, the `read_only` flag should be used to reduce the requested OAuth2 scope.
|
||||||
|
///
|
||||||
|
/// The GOOGLE_APPLICATION_CREDENTIALS environment variable will be used to determine the
|
||||||
|
/// program name that contains the BigTable instance in addition to access credentials.
|
||||||
|
///
|
||||||
|
/// The BIGTABLE_EMULATOR_HOST environment variable is also respected.
|
||||||
|
///
|
||||||
|
pub async fn new(instance_name: &str, read_only: bool) -> Result<Self> {
|
||||||
|
match std::env::var("BIGTABLE_EMULATOR_HOST") {
|
||||||
|
Ok(endpoint) => {
|
||||||
|
info!("Connecting to bigtable emulator at {}", endpoint);
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
access_token: None,
|
||||||
|
channel: tonic::transport::Channel::from_shared(format!("http://{}", endpoint))
|
||||||
|
.map_err(|err| Error::InvalidUri(endpoint, err.to_string()))?
|
||||||
|
.connect_lazy()?,
|
||||||
|
table_prefix: format!("projects/emulator/instances/{}/tables/", instance_name),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
Err(_) => {
|
||||||
|
let mut access_token = AccessToken::new(if read_only {
|
||||||
|
&Scope::BigTableDataReadOnly
|
||||||
|
} else {
|
||||||
|
&Scope::BigTableData
|
||||||
|
})
|
||||||
|
.map_err(Error::AccessTokenError)?;
|
||||||
|
|
||||||
|
access_token.refresh().await;
|
||||||
|
let table_prefix = format!(
|
||||||
|
"projects/{}/instances/{}/tables/",
|
||||||
|
access_token.project(),
|
||||||
|
instance_name
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
access_token: Some(Arc::new(RwLock::new(access_token))),
|
||||||
|
channel: tonic::transport::Channel::from_static(
|
||||||
|
"https://bigtable.googleapis.com",
|
||||||
|
)
|
||||||
|
.tls_config(
|
||||||
|
ClientTlsConfig::new()
|
||||||
|
.ca_certificate(
|
||||||
|
root_ca_certificate::load().map_err(Error::CertificateError)?,
|
||||||
|
)
|
||||||
|
.domain_name("bigtable.googleapis.com"),
|
||||||
|
)?
|
||||||
|
.connect_lazy()?,
|
||||||
|
table_prefix,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create a new BigTable client.
|
||||||
|
///
|
||||||
|
/// Clients require `&mut self`, due to `Tonic::transport::Channel` limitations, however
|
||||||
|
/// creating new clients is cheap and thus can be used as a work around for ease of use.
|
||||||
|
pub fn client(&self) -> BigTable {
|
||||||
|
let client = {
|
||||||
|
if let Some(ref access_token) = self.access_token {
|
||||||
|
let access_token = access_token.clone();
|
||||||
|
bigtable_client::BigtableClient::with_interceptor(
|
||||||
|
self.channel.clone(),
|
||||||
|
move |mut req: Request<()>| {
|
||||||
|
match access_token.read().unwrap().get() {
|
||||||
|
Ok(access_token) => match MetadataValue::from_str(&access_token) {
|
||||||
|
Ok(authorization_header) => {
|
||||||
|
req.metadata_mut()
|
||||||
|
.insert("authorization", authorization_header);
|
||||||
|
}
|
||||||
|
Err(err) => warn!("Failed to set authorization header: {}", err),
|
||||||
|
},
|
||||||
|
Err(err) => warn!("{}", err),
|
||||||
|
}
|
||||||
|
Ok(req)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
bigtable_client::BigtableClient::new(self.channel.clone())
|
||||||
|
}
|
||||||
|
};
|
||||||
|
BigTable {
|
||||||
|
access_token: self.access_token.clone(),
|
||||||
|
client,
|
||||||
|
table_prefix: self.table_prefix.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn put_bincode_cells_with_retry<T>(
|
||||||
|
&self,
|
||||||
|
table: &str,
|
||||||
|
cells: &[(RowKey, T)],
|
||||||
|
) -> Result<usize>
|
||||||
|
where
|
||||||
|
T: serde::ser::Serialize,
|
||||||
|
{
|
||||||
|
use backoff::{future::FutureOperation as _, ExponentialBackoff};
|
||||||
|
(|| async {
|
||||||
|
let mut client = self.client();
|
||||||
|
Ok(client.put_bincode_cells(table, cells).await?)
|
||||||
|
})
|
||||||
|
.retry(ExponentialBackoff::default())
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct BigTable {
|
||||||
|
access_token: Option<Arc<RwLock<AccessToken>>>,
|
||||||
|
client: bigtable_client::BigtableClient<tonic::transport::Channel>,
|
||||||
|
table_prefix: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BigTable {
|
||||||
|
async fn decode_read_rows_response(
|
||||||
|
mut rrr: tonic::codec::Streaming<ReadRowsResponse>,
|
||||||
|
) -> Result<Vec<(RowKey, RowData)>> {
|
||||||
|
let mut rows: Vec<(RowKey, RowData)> = vec![];
|
||||||
|
|
||||||
|
let mut row_key = None;
|
||||||
|
let mut row_data = vec![];
|
||||||
|
|
||||||
|
let mut cell_name = None;
|
||||||
|
let mut cell_timestamp = 0;
|
||||||
|
let mut cell_value = vec![];
|
||||||
|
let mut cell_version_ok = true;
|
||||||
|
while let Some(res) = rrr.message().await? {
|
||||||
|
for (i, mut chunk) in res.chunks.into_iter().enumerate() {
|
||||||
|
// The comments for `read_rows_response::CellChunk` provide essential details for
|
||||||
|
// understanding how the below decoding works...
|
||||||
|
trace!("chunk {}: {:?}", i, chunk);
|
||||||
|
|
||||||
|
// Starting a new row?
|
||||||
|
if !chunk.row_key.is_empty() {
|
||||||
|
row_key = String::from_utf8(chunk.row_key).ok(); // Require UTF-8 for row keys
|
||||||
|
}
|
||||||
|
|
||||||
|
// Starting a new cell?
|
||||||
|
if let Some(qualifier) = chunk.qualifier {
|
||||||
|
if let Some(cell_name) = cell_name {
|
||||||
|
row_data.push((cell_name, cell_value));
|
||||||
|
cell_value = vec![];
|
||||||
|
}
|
||||||
|
cell_name = String::from_utf8(qualifier).ok(); // Require UTF-8 for cell names
|
||||||
|
cell_timestamp = chunk.timestamp_micros;
|
||||||
|
cell_version_ok = true;
|
||||||
|
} else {
|
||||||
|
// Continuing the existing cell. Check if this is the start of another version of the cell
|
||||||
|
if chunk.timestamp_micros != 0 {
|
||||||
|
if chunk.timestamp_micros < cell_timestamp {
|
||||||
|
cell_version_ok = false; // ignore older versions of the cell
|
||||||
|
} else {
|
||||||
|
// newer version of the cell, remove the older cell
|
||||||
|
cell_version_ok = true;
|
||||||
|
cell_value = vec![];
|
||||||
|
cell_timestamp = chunk.timestamp_micros;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if cell_version_ok {
|
||||||
|
cell_value.append(&mut chunk.value);
|
||||||
|
}
|
||||||
|
|
||||||
|
// End of a row?
|
||||||
|
if chunk.row_status.is_some() {
|
||||||
|
if let Some(read_rows_response::cell_chunk::RowStatus::CommitRow(_)) =
|
||||||
|
chunk.row_status
|
||||||
|
{
|
||||||
|
if let Some(cell_name) = cell_name {
|
||||||
|
row_data.push((cell_name, cell_value));
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(row_key) = row_key {
|
||||||
|
rows.push((row_key, row_data))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
row_key = None;
|
||||||
|
row_data = vec![];
|
||||||
|
cell_value = vec![];
|
||||||
|
cell_name = None;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(rows)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn refresh_access_token(&self) {
|
||||||
|
if let Some(ref access_token) = self.access_token {
|
||||||
|
access_token.write().unwrap().refresh().await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get `table` row keys in lexical order.
|
||||||
|
///
|
||||||
|
/// If `start_at` is provided, the row key listing will start with key.
|
||||||
|
/// Otherwise the listing will start from the start of the table.
|
||||||
|
pub async fn get_row_keys(
|
||||||
|
&mut self,
|
||||||
|
table_name: &str,
|
||||||
|
start_at: Option<RowKey>,
|
||||||
|
rows_limit: i64,
|
||||||
|
) -> Result<Vec<RowKey>> {
|
||||||
|
self.refresh_access_token().await;
|
||||||
|
|
||||||
|
let response = self
|
||||||
|
.client
|
||||||
|
.read_rows(ReadRowsRequest {
|
||||||
|
table_name: format!("{}{}", self.table_prefix, table_name),
|
||||||
|
rows_limit,
|
||||||
|
rows: Some(RowSet {
|
||||||
|
row_keys: vec![],
|
||||||
|
row_ranges: if let Some(row_key) = start_at {
|
||||||
|
vec![RowRange {
|
||||||
|
start_key: Some(row_range::StartKey::StartKeyClosed(
|
||||||
|
row_key.into_bytes(),
|
||||||
|
)),
|
||||||
|
end_key: None,
|
||||||
|
}]
|
||||||
|
} else {
|
||||||
|
vec![]
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
filter: Some(RowFilter {
|
||||||
|
filter: Some(row_filter::Filter::Chain(row_filter::Chain {
|
||||||
|
filters: vec![
|
||||||
|
RowFilter {
|
||||||
|
// Return minimal number of cells
|
||||||
|
filter: Some(row_filter::Filter::CellsPerRowLimitFilter(1)),
|
||||||
|
},
|
||||||
|
RowFilter {
|
||||||
|
// Only return the latest version of each cell
|
||||||
|
filter: Some(row_filter::Filter::CellsPerColumnLimitFilter(1)),
|
||||||
|
},
|
||||||
|
RowFilter {
|
||||||
|
// Strip the cell values
|
||||||
|
filter: Some(row_filter::Filter::StripValueTransformer(true)),
|
||||||
|
},
|
||||||
|
],
|
||||||
|
})),
|
||||||
|
}),
|
||||||
|
..ReadRowsRequest::default()
|
||||||
|
})
|
||||||
|
.await?
|
||||||
|
.into_inner();
|
||||||
|
|
||||||
|
let rows = Self::decode_read_rows_response(response).await?;
|
||||||
|
Ok(rows.into_iter().map(|r| r.0).collect())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get latest data from `limit` rows of `table`, starting inclusively at the `row_key` row.
|
||||||
|
///
|
||||||
|
/// All column families are accepted, and only the latest version of each column cell will be
|
||||||
|
/// returned.
|
||||||
|
pub async fn get_row_data(&mut self, table_name: &str, row_key: RowKey) -> Result<RowData> {
|
||||||
|
self.refresh_access_token().await;
|
||||||
|
|
||||||
|
let response = self
|
||||||
|
.client
|
||||||
|
.read_rows(ReadRowsRequest {
|
||||||
|
table_name: format!("{}{}", self.table_prefix, table_name),
|
||||||
|
rows_limit: 1,
|
||||||
|
rows: Some(RowSet {
|
||||||
|
row_keys: vec![row_key.into_bytes()],
|
||||||
|
row_ranges: vec![],
|
||||||
|
}),
|
||||||
|
filter: Some(RowFilter {
|
||||||
|
// Only return the latest version of each cell
|
||||||
|
filter: Some(row_filter::Filter::CellsPerColumnLimitFilter(1)),
|
||||||
|
}),
|
||||||
|
..ReadRowsRequest::default()
|
||||||
|
})
|
||||||
|
.await?
|
||||||
|
.into_inner();
|
||||||
|
|
||||||
|
let rows = Self::decode_read_rows_response(response).await?;
|
||||||
|
rows.into_iter()
|
||||||
|
.next()
|
||||||
|
.map(|r| r.1)
|
||||||
|
.ok_or_else(|| Error::RowNotFound)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Store data for one or more `table` rows in the `family_name` Column family
|
||||||
|
async fn put_row_data(
|
||||||
|
&mut self,
|
||||||
|
table_name: &str,
|
||||||
|
family_name: &str,
|
||||||
|
row_data: &[(&RowKey, RowData)],
|
||||||
|
) -> Result<()> {
|
||||||
|
self.refresh_access_token().await;
|
||||||
|
|
||||||
|
let mut entries = vec![];
|
||||||
|
for (row_key, row_data) in row_data {
|
||||||
|
let mutations = row_data
|
||||||
|
.iter()
|
||||||
|
.map(|(column_key, column_value)| Mutation {
|
||||||
|
mutation: Some(mutation::Mutation::SetCell(mutation::SetCell {
|
||||||
|
family_name: family_name.to_string(),
|
||||||
|
column_qualifier: column_key.clone().into_bytes(),
|
||||||
|
timestamp_micros: -1, // server assigned
|
||||||
|
value: column_value.to_vec(),
|
||||||
|
})),
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
entries.push(mutate_rows_request::Entry {
|
||||||
|
row_key: (*row_key).clone().into_bytes(),
|
||||||
|
mutations,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut response = self
|
||||||
|
.client
|
||||||
|
.mutate_rows(MutateRowsRequest {
|
||||||
|
table_name: format!("{}{}", self.table_prefix, table_name),
|
||||||
|
entries,
|
||||||
|
..MutateRowsRequest::default()
|
||||||
|
})
|
||||||
|
.await?
|
||||||
|
.into_inner();
|
||||||
|
|
||||||
|
while let Some(res) = response.message().await? {
|
||||||
|
for entry in res.entries {
|
||||||
|
if let Some(status) = entry.status {
|
||||||
|
if status.code != 0 {
|
||||||
|
eprintln!("put_row_data error {}: {}", status.code, status.message);
|
||||||
|
warn!("put_row_data error {}: {}", status.code, status.message);
|
||||||
|
return Err(Error::RowWriteFailed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_bincode_cell<T>(&mut self, table: &str, key: RowKey) -> Result<T>
|
||||||
|
where
|
||||||
|
T: serde::de::DeserializeOwned,
|
||||||
|
{
|
||||||
|
let row_data = self.get_row_data(table, key.clone()).await?;
|
||||||
|
|
||||||
|
let value = row_data
|
||||||
|
.into_iter()
|
||||||
|
.find(|(name, _)| name == "bin")
|
||||||
|
.ok_or_else(|| Error::ObjectNotFound(format!("{}/{}", table, key)))?
|
||||||
|
.1;
|
||||||
|
|
||||||
|
let data = decompress(&value)?;
|
||||||
|
bincode::deserialize(&data).map_err(|err| {
|
||||||
|
warn!("Failed to deserialize {}/{}: {}", table, key, err);
|
||||||
|
Error::ObjectCorrupt(format!("{}/{}", table, key))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn put_bincode_cells<T>(
|
||||||
|
&mut self,
|
||||||
|
table: &str,
|
||||||
|
cells: &[(RowKey, T)],
|
||||||
|
) -> Result<usize>
|
||||||
|
where
|
||||||
|
T: serde::ser::Serialize,
|
||||||
|
{
|
||||||
|
let mut bytes_written = 0;
|
||||||
|
let mut new_row_data = vec![];
|
||||||
|
for (row_key, data) in cells {
|
||||||
|
let data = compress_best(&bincode::serialize(&data).unwrap())?;
|
||||||
|
bytes_written += data.len();
|
||||||
|
new_row_data.push((row_key, vec![("bin".to_string(), data)]));
|
||||||
|
}
|
||||||
|
|
||||||
|
self.put_row_data(table, "x", &new_row_data).await?;
|
||||||
|
Ok(bytes_written)
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,6 +1,536 @@
|
||||||
mod access_token;
|
use log::*;
|
||||||
mod compression;
|
use serde::{Deserialize, Serialize};
|
||||||
mod root_ca_certificate;
|
use solana_sdk::{
|
||||||
|
clock::{Slot, UnixTimestamp},
|
||||||
|
pubkey::Pubkey,
|
||||||
|
signature::Signature,
|
||||||
|
sysvar::is_sysvar_id,
|
||||||
|
transaction::{Transaction, TransactionError},
|
||||||
|
};
|
||||||
|
use solana_transaction_status::{
|
||||||
|
ConfirmedBlock, ConfirmedTransaction, EncodedTransaction, Rewards, TransactionStatus,
|
||||||
|
TransactionWithStatusMeta, UiTransactionEncoding, UiTransactionStatusMeta,
|
||||||
|
};
|
||||||
|
use std::{
|
||||||
|
collections::HashMap,
|
||||||
|
convert::{TryFrom, TryInto},
|
||||||
|
};
|
||||||
|
use thiserror::Error;
|
||||||
|
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate serde_derive;
|
extern crate serde_derive;
|
||||||
|
|
||||||
|
mod access_token;
|
||||||
|
mod bigtable;
|
||||||
|
mod compression;
|
||||||
|
mod root_ca_certificate;
|
||||||
|
|
||||||
|
#[derive(Debug, Error)]
|
||||||
|
pub enum Error {
|
||||||
|
#[error("BigTable: {0}")]
|
||||||
|
BigTableError(bigtable::Error),
|
||||||
|
|
||||||
|
#[error("I/O Error: {0}")]
|
||||||
|
IoError(std::io::Error),
|
||||||
|
|
||||||
|
#[error("Transaction encoded is not supported")]
|
||||||
|
UnsupportedTransactionEncoding,
|
||||||
|
|
||||||
|
#[error("Block not found: {0}")]
|
||||||
|
BlockNotFound(Slot),
|
||||||
|
|
||||||
|
#[error("Signature not found")]
|
||||||
|
SignatureNotFound,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::convert::From<bigtable::Error> for Error {
|
||||||
|
fn from(err: bigtable::Error) -> Self {
|
||||||
|
Self::BigTableError(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::convert::From<std::io::Error> for Error {
|
||||||
|
fn from(err: std::io::Error) -> Self {
|
||||||
|
Self::IoError(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub type Result<T> = std::result::Result<T, Error>;
|
||||||
|
|
||||||
|
// Convert a slot to its bucket representation whereby lower slots are always lexically ordered
|
||||||
|
// before higher slots
|
||||||
|
fn slot_to_key(slot: Slot) -> String {
|
||||||
|
format!("{:016x}", slot)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reverse of `slot_to_key`
|
||||||
|
fn key_to_slot(key: &str) -> Option<Slot> {
|
||||||
|
match Slot::from_str_radix(key, 16) {
|
||||||
|
Ok(slot) => Some(slot),
|
||||||
|
Err(err) => {
|
||||||
|
// bucket data is probably corrupt
|
||||||
|
warn!("Failed to parse object key as a slot: {}: {}", key, err);
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// A serialized `StoredConfirmedBlock` is stored in the `block` table
|
||||||
|
//
|
||||||
|
// StoredConfirmedBlock holds the same contents as ConfirmedBlock, but is slightly compressed and avoids
|
||||||
|
// some serde JSON directives that cause issues with bincode
|
||||||
|
//
|
||||||
|
#[derive(Serialize, Deserialize)]
|
||||||
|
struct StoredConfirmedBlock {
|
||||||
|
previous_blockhash: String,
|
||||||
|
blockhash: String,
|
||||||
|
parent_slot: Slot,
|
||||||
|
transactions: Vec<StoredConfirmedBlockTransaction>,
|
||||||
|
rewards: Rewards,
|
||||||
|
block_time: Option<UnixTimestamp>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl StoredConfirmedBlock {
|
||||||
|
fn into_confirmed_block(self, encoding: UiTransactionEncoding) -> ConfirmedBlock {
|
||||||
|
let StoredConfirmedBlock {
|
||||||
|
previous_blockhash,
|
||||||
|
blockhash,
|
||||||
|
parent_slot,
|
||||||
|
transactions,
|
||||||
|
rewards,
|
||||||
|
block_time,
|
||||||
|
} = self;
|
||||||
|
|
||||||
|
ConfirmedBlock {
|
||||||
|
previous_blockhash,
|
||||||
|
blockhash,
|
||||||
|
parent_slot,
|
||||||
|
transactions: transactions
|
||||||
|
.into_iter()
|
||||||
|
.map(|transaction| transaction.into_transaction_with_status_meta(encoding))
|
||||||
|
.collect(),
|
||||||
|
rewards,
|
||||||
|
block_time,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TryFrom<ConfirmedBlock> for StoredConfirmedBlock {
|
||||||
|
type Error = Error;
|
||||||
|
|
||||||
|
fn try_from(confirmed_block: ConfirmedBlock) -> Result<Self> {
|
||||||
|
let ConfirmedBlock {
|
||||||
|
previous_blockhash,
|
||||||
|
blockhash,
|
||||||
|
parent_slot,
|
||||||
|
transactions,
|
||||||
|
rewards,
|
||||||
|
block_time,
|
||||||
|
} = confirmed_block;
|
||||||
|
|
||||||
|
let mut encoded_transactions = vec![];
|
||||||
|
for transaction in transactions.into_iter() {
|
||||||
|
encoded_transactions.push(transaction.try_into()?);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
previous_blockhash,
|
||||||
|
blockhash,
|
||||||
|
parent_slot,
|
||||||
|
transactions: encoded_transactions,
|
||||||
|
rewards,
|
||||||
|
block_time,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize)]
|
||||||
|
struct StoredConfirmedBlockTransaction {
|
||||||
|
transaction: Transaction,
|
||||||
|
meta: Option<StoredConfirmedBlockTransactionStatusMeta>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl StoredConfirmedBlockTransaction {
|
||||||
|
fn into_transaction_with_status_meta(
|
||||||
|
self,
|
||||||
|
encoding: UiTransactionEncoding,
|
||||||
|
) -> TransactionWithStatusMeta {
|
||||||
|
let StoredConfirmedBlockTransaction { transaction, meta } = self;
|
||||||
|
TransactionWithStatusMeta {
|
||||||
|
transaction: EncodedTransaction::encode(transaction, encoding),
|
||||||
|
meta: meta.map(|meta| meta.into()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TryFrom<TransactionWithStatusMeta> for StoredConfirmedBlockTransaction {
|
||||||
|
type Error = Error;
|
||||||
|
|
||||||
|
fn try_from(value: TransactionWithStatusMeta) -> Result<Self> {
|
||||||
|
let TransactionWithStatusMeta { transaction, meta } = value;
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
transaction: transaction
|
||||||
|
.decode()
|
||||||
|
.ok_or(Error::UnsupportedTransactionEncoding)?,
|
||||||
|
meta: meta.map(|meta| meta.into()),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize)]
|
||||||
|
struct StoredConfirmedBlockTransactionStatusMeta {
|
||||||
|
err: Option<TransactionError>,
|
||||||
|
fee: u64,
|
||||||
|
pre_balances: Vec<u64>,
|
||||||
|
post_balances: Vec<u64>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<StoredConfirmedBlockTransactionStatusMeta> for UiTransactionStatusMeta {
|
||||||
|
fn from(value: StoredConfirmedBlockTransactionStatusMeta) -> Self {
|
||||||
|
let StoredConfirmedBlockTransactionStatusMeta {
|
||||||
|
err,
|
||||||
|
fee,
|
||||||
|
pre_balances,
|
||||||
|
post_balances,
|
||||||
|
} = value;
|
||||||
|
let status = match &err {
|
||||||
|
None => Ok(()),
|
||||||
|
Some(err) => Err(err.clone()),
|
||||||
|
};
|
||||||
|
Self {
|
||||||
|
err,
|
||||||
|
status,
|
||||||
|
fee,
|
||||||
|
pre_balances,
|
||||||
|
post_balances,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<UiTransactionStatusMeta> for StoredConfirmedBlockTransactionStatusMeta {
|
||||||
|
fn from(value: UiTransactionStatusMeta) -> Self {
|
||||||
|
let UiTransactionStatusMeta {
|
||||||
|
err,
|
||||||
|
fee,
|
||||||
|
pre_balances,
|
||||||
|
post_balances,
|
||||||
|
..
|
||||||
|
} = value;
|
||||||
|
Self {
|
||||||
|
err,
|
||||||
|
fee,
|
||||||
|
pre_balances,
|
||||||
|
post_balances,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// A serialized `TransactionInfo` is stored in the `tx` table
|
||||||
|
#[derive(Serialize, Deserialize)]
|
||||||
|
struct TransactionInfo {
|
||||||
|
slot: Slot, // The slot that contains the block with this transaction in it
|
||||||
|
index: u32, // Where the transaction is located in the block
|
||||||
|
err: Option<TransactionError>, // None if the transaction executed successfully
|
||||||
|
memo: Option<String>, // Transaction memo
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<TransactionInfo> for TransactionStatus {
|
||||||
|
fn from(transaction_info: TransactionInfo) -> Self {
|
||||||
|
let TransactionInfo { slot, err, .. } = transaction_info;
|
||||||
|
let status = match &err {
|
||||||
|
None => Ok(()),
|
||||||
|
Some(err) => Err(err.clone()),
|
||||||
|
};
|
||||||
|
Self {
|
||||||
|
slot,
|
||||||
|
confirmations: None,
|
||||||
|
status,
|
||||||
|
err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// A serialized `Vec<TransactionByAddrInfo>` is stored in the `tx-by-addr` table. The row keys are
|
||||||
|
// the one's compliment of the slot so that rows may be listed in reverse order
|
||||||
|
#[derive(Serialize, Deserialize)]
|
||||||
|
struct TransactionByAddrInfo {
|
||||||
|
signature: Signature, // The transaction signature
|
||||||
|
err: Option<TransactionError>, // None if the transaction executed successfully
|
||||||
|
index: u32, // Where the transaction is located in the block
|
||||||
|
memo: Option<String>, // Transaction memo
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct LedgerStorage {
|
||||||
|
connection: bigtable::BigTableConnection,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LedgerStorage {
|
||||||
|
pub async fn new(read_only: bool) -> Result<Self> {
|
||||||
|
let connection = bigtable::BigTableConnection::new("solana-ledger", read_only).await?;
|
||||||
|
Ok(Self { connection })
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Return the available slot that contains a block
|
||||||
|
pub async fn get_first_available_block(&self) -> Result<Option<Slot>> {
|
||||||
|
let mut bigtable = self.connection.client();
|
||||||
|
let blocks = bigtable.get_row_keys("blocks", None, 1).await?;
|
||||||
|
if blocks.is_empty() {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
Ok(key_to_slot(&blocks[0]))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Fetch the next slots after the provided slot that contains a block
|
||||||
|
///
|
||||||
|
/// start_slot: slot to start the search from (inclusive)
|
||||||
|
/// limit: stop after this many slots have been found.
|
||||||
|
pub async fn get_confirmed_blocks(&self, start_slot: Slot, limit: usize) -> Result<Vec<Slot>> {
|
||||||
|
let mut bigtable = self.connection.client();
|
||||||
|
let blocks = bigtable
|
||||||
|
.get_row_keys("blocks", Some(slot_to_key(start_slot)), limit as i64)
|
||||||
|
.await?;
|
||||||
|
Ok(blocks.into_iter().filter_map(|s| key_to_slot(&s)).collect())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Fetch the confirmed block from the desired slot
|
||||||
|
pub async fn get_confirmed_block(
|
||||||
|
&self,
|
||||||
|
slot: Slot,
|
||||||
|
encoding: UiTransactionEncoding,
|
||||||
|
) -> Result<ConfirmedBlock> {
|
||||||
|
let mut bigtable = self.connection.client();
|
||||||
|
let block = bigtable
|
||||||
|
.get_bincode_cell::<StoredConfirmedBlock>("blocks", slot_to_key(slot))
|
||||||
|
.await?;
|
||||||
|
Ok(block.into_confirmed_block(encoding))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_signature_status(&self, signature: &Signature) -> Result<TransactionStatus> {
|
||||||
|
let mut bigtable = self.connection.client();
|
||||||
|
let transaction_info = bigtable
|
||||||
|
.get_bincode_cell::<TransactionInfo>("tx", signature.to_string())
|
||||||
|
.await?;
|
||||||
|
Ok(transaction_info.into())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Fetch a confirmed transaction
|
||||||
|
pub async fn get_confirmed_transaction(
|
||||||
|
&self,
|
||||||
|
signature: &Signature,
|
||||||
|
encoding: UiTransactionEncoding,
|
||||||
|
) -> Result<Option<ConfirmedTransaction>> {
|
||||||
|
let mut bigtable = self.connection.client();
|
||||||
|
|
||||||
|
// Figure out which block the transaction is located in
|
||||||
|
let TransactionInfo { slot, index, .. } = bigtable
|
||||||
|
.get_bincode_cell("tx", signature.to_string())
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// Load the block and return the transaction
|
||||||
|
let block = bigtable
|
||||||
|
.get_bincode_cell::<StoredConfirmedBlock>("blocks", slot_to_key(slot))
|
||||||
|
.await?;
|
||||||
|
match block.transactions.into_iter().nth(index as usize) {
|
||||||
|
None => {
|
||||||
|
warn!("Transaction info for {} is corrupt", signature);
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
Some(bucket_block_transaction) => {
|
||||||
|
if bucket_block_transaction.transaction.signatures[0] != *signature {
|
||||||
|
warn!(
|
||||||
|
"Transaction info or confirmed block for {} is corrupt",
|
||||||
|
signature
|
||||||
|
);
|
||||||
|
Ok(None)
|
||||||
|
} else {
|
||||||
|
Ok(Some(ConfirmedTransaction {
|
||||||
|
slot,
|
||||||
|
transaction: bucket_block_transaction
|
||||||
|
.into_transaction_with_status_meta(encoding),
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get confirmed signatures for the provided address, in descending ledger order
|
||||||
|
///
|
||||||
|
/// address: address to search for
|
||||||
|
/// start_after_signature: start with the first signature older than this one
|
||||||
|
/// limit: stop after this many signatures.
|
||||||
|
pub async fn get_confirmed_signatures_for_address(
|
||||||
|
&self,
|
||||||
|
address: &Pubkey,
|
||||||
|
start_after_signature: Option<&Signature>,
|
||||||
|
limit: usize,
|
||||||
|
) -> Result<Vec<(Signature, Slot, Option<String>, Option<TransactionError>)>> {
|
||||||
|
let mut bigtable = self.connection.client();
|
||||||
|
let address_prefix = format!("{}/", address);
|
||||||
|
|
||||||
|
// Figure out where to start listing from based on `start_after_signature`
|
||||||
|
let (first_slot, mut first_transaction_index) = match start_after_signature {
|
||||||
|
None => (Slot::MAX, 0),
|
||||||
|
Some(start_after_signature) => {
|
||||||
|
let TransactionInfo { slot, index, .. } = bigtable
|
||||||
|
.get_bincode_cell("tx", start_after_signature.to_string())
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
(slot, index + 1)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut infos = vec![];
|
||||||
|
|
||||||
|
// Return the next `limit` tx-by-addr keys
|
||||||
|
let tx_by_addr_info_keys = bigtable
|
||||||
|
.get_row_keys(
|
||||||
|
"tx-by-addr",
|
||||||
|
Some(format!("{}{}", address_prefix, slot_to_key(!first_slot))),
|
||||||
|
limit as i64,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// Read each tx-by-addr object until `limit` signatures have been found
|
||||||
|
'outer: for key in tx_by_addr_info_keys {
|
||||||
|
trace!("key is {}: slot is {}", key, &key[address_prefix.len()..]);
|
||||||
|
if !key.starts_with(&address_prefix) {
|
||||||
|
break 'outer;
|
||||||
|
}
|
||||||
|
|
||||||
|
let slot = !key_to_slot(&key[address_prefix.len()..]).ok_or_else(|| {
|
||||||
|
bigtable::Error::ObjectCorrupt(format!(
|
||||||
|
"Failed to convert key to slot: tx-by-addr/{}",
|
||||||
|
key
|
||||||
|
))
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let tx_by_addr_infos = bigtable
|
||||||
|
.get_bincode_cell::<Vec<TransactionByAddrInfo>>("tx-by-addr", key)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
for tx_by_addr_info in tx_by_addr_infos
|
||||||
|
.into_iter()
|
||||||
|
.skip(first_transaction_index as usize)
|
||||||
|
{
|
||||||
|
infos.push((
|
||||||
|
tx_by_addr_info.signature,
|
||||||
|
slot,
|
||||||
|
tx_by_addr_info.memo,
|
||||||
|
tx_by_addr_info.err,
|
||||||
|
));
|
||||||
|
if infos.len() >= limit {
|
||||||
|
break 'outer;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
first_transaction_index = 0;
|
||||||
|
}
|
||||||
|
Ok(infos)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Upload a new confirmed block and associated meta data.
|
||||||
|
pub async fn upload_confirmed_block(
|
||||||
|
&self,
|
||||||
|
slot: Slot,
|
||||||
|
confirmed_block: ConfirmedBlock,
|
||||||
|
) -> Result<()> {
|
||||||
|
let mut bytes_written = 0;
|
||||||
|
|
||||||
|
let mut by_addr: HashMap<Pubkey, Vec<TransactionByAddrInfo>> = HashMap::new();
|
||||||
|
|
||||||
|
let mut tx_cells = vec![];
|
||||||
|
for (index, transaction_with_meta) in confirmed_block.transactions.iter().enumerate() {
|
||||||
|
let err = transaction_with_meta
|
||||||
|
.meta
|
||||||
|
.as_ref()
|
||||||
|
.and_then(|meta| meta.err.clone());
|
||||||
|
let index = index as u32;
|
||||||
|
let transaction = transaction_with_meta
|
||||||
|
.transaction
|
||||||
|
.decode()
|
||||||
|
.expect("transaction decode failed");
|
||||||
|
let signature = transaction.signatures[0];
|
||||||
|
|
||||||
|
for address in transaction.message.account_keys {
|
||||||
|
if !is_sysvar_id(&address) {
|
||||||
|
by_addr
|
||||||
|
.entry(address)
|
||||||
|
.or_default()
|
||||||
|
.push(TransactionByAddrInfo {
|
||||||
|
signature,
|
||||||
|
err: err.clone(),
|
||||||
|
index,
|
||||||
|
memo: None, // TODO
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tx_cells.push((
|
||||||
|
signature.to_string(),
|
||||||
|
TransactionInfo {
|
||||||
|
slot,
|
||||||
|
index,
|
||||||
|
err,
|
||||||
|
memo: None, // TODO
|
||||||
|
},
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
let tx_by_addr_cells: Vec<_> = by_addr
|
||||||
|
.into_iter()
|
||||||
|
.map(|(address, transaction_info_by_addr)| {
|
||||||
|
(
|
||||||
|
format!("{}/{}", address, slot_to_key(!slot)),
|
||||||
|
transaction_info_by_addr,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
if !tx_cells.is_empty() {
|
||||||
|
bytes_written += self
|
||||||
|
.connection
|
||||||
|
.put_bincode_cells_with_retry::<TransactionInfo>("tx", &tx_cells)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
if !tx_by_addr_cells.is_empty() {
|
||||||
|
bytes_written += self
|
||||||
|
.connection
|
||||||
|
.put_bincode_cells_with_retry::<Vec<TransactionByAddrInfo>>(
|
||||||
|
"tx-by-addr",
|
||||||
|
&tx_by_addr_cells,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let num_transactions = confirmed_block.transactions.len();
|
||||||
|
|
||||||
|
// Store the block itself last, after all other metadata about the block has been
|
||||||
|
// successfully stored. This avoids partial uploaded blocks from becoming visible to
|
||||||
|
// `get_confirmed_block()` and `get_confirmed_blocks()`
|
||||||
|
let blocks_cells = [(slot_to_key(slot), confirmed_block.try_into()?)];
|
||||||
|
bytes_written += self
|
||||||
|
.connection
|
||||||
|
.put_bincode_cells_with_retry::<StoredConfirmedBlock>("blocks", &blocks_cells)
|
||||||
|
.await?;
|
||||||
|
info!(
|
||||||
|
"uploaded block for slot {}: {} transactions, {} bytes",
|
||||||
|
slot, num_transactions, bytes_written
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod test {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_slot_to_key() {
|
||||||
|
assert_eq!(slot_to_key(0), "0000000000000000");
|
||||||
|
assert_eq!(slot_to_key(!0), "ffffffffffffffff");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue