bigtable: fix AccessToken issues (#34213)
* bigtable: fix AccessToken issue * remove inner * less changes * fmt + drop lock
This commit is contained in:
parent
564f1a97e0
commit
873bef9fef
|
@ -91,41 +91,49 @@ impl AccessToken {
|
|||
}
|
||||
|
||||
/// Call this function regularly to ensure the access token does not expire
|
||||
pub async fn refresh(&self) {
|
||||
pub fn refresh(&self) {
|
||||
// Check if it's time to try a token refresh
|
||||
{
|
||||
let token_r = self.token.read().unwrap();
|
||||
if token_r.1.elapsed().as_secs() < token_r.0.expires_in() as u64 / 2 {
|
||||
return;
|
||||
}
|
||||
let token_r = self.token.read().unwrap();
|
||||
if token_r.1.elapsed().as_secs() < token_r.0.expires_in() as u64 / 2 {
|
||||
debug!("Token is not expired yet");
|
||||
return;
|
||||
}
|
||||
drop(token_r);
|
||||
|
||||
#[allow(deprecated)]
|
||||
if self
|
||||
.refresh_active
|
||||
.compare_and_swap(false, true, Ordering::Relaxed)
|
||||
// Refresh already is progress
|
||||
let refresh_progress =
|
||||
self.refresh_active
|
||||
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed);
|
||||
if refresh_progress.is_err() {
|
||||
debug!("Token update is already in progress");
|
||||
return;
|
||||
}
|
||||
|
||||
let credentials = self.credentials.clone();
|
||||
let scope = self.scope.clone();
|
||||
let refresh_active = Arc::clone(&self.refresh_active);
|
||||
let token = Arc::clone(&self.token);
|
||||
tokio::spawn(async move {
|
||||
match time::timeout(
|
||||
time::Duration::from_secs(5),
|
||||
Self::get_token(&credentials, &scope),
|
||||
)
|
||||
.await
|
||||
{
|
||||
// Refresh already pending
|
||||
return;
|
||||
Ok(new_token) => match new_token {
|
||||
Ok(new_token) => {
|
||||
let mut token_w = token.write().unwrap();
|
||||
*token_w = new_token;
|
||||
}
|
||||
Err(err) => error!("Failed to fetch new token: {}", err),
|
||||
},
|
||||
Err(_timeout) => {
|
||||
warn!("Token refresh timeout")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("Refreshing token");
|
||||
match time::timeout(
|
||||
time::Duration::from_secs(5),
|
||||
Self::get_token(&self.credentials, &self.scope),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(new_token) => match (new_token, self.token.write()) {
|
||||
(Ok(new_token), Ok(mut token_w)) => *token_w = new_token,
|
||||
(Ok(_new_token), Err(err)) => warn!("{}", err),
|
||||
(Err(err), _) => warn!("{}", err),
|
||||
},
|
||||
Err(_) => {
|
||||
warn!("Token refresh timeout")
|
||||
}
|
||||
}
|
||||
self.refresh_active.store(false, Ordering::Relaxed);
|
||||
refresh_active.store(false, Ordering::Relaxed);
|
||||
info!("Token refreshed");
|
||||
});
|
||||
}
|
||||
|
||||
/// Return an access token suitable for use in an HTTP authorization header
|
||||
|
|
|
@ -410,9 +410,9 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
|
|||
Ok(rows)
|
||||
}
|
||||
|
||||
async fn refresh_access_token(&self) {
|
||||
fn refresh_access_token(&self) {
|
||||
if let Some(ref access_token) = self.access_token {
|
||||
access_token.refresh().await;
|
||||
access_token.refresh();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -434,7 +434,7 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
|
|||
if rows_limit == 0 {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
self.refresh_access_token().await;
|
||||
self.refresh_access_token();
|
||||
let response = self
|
||||
.client
|
||||
.read_rows(ReadRowsRequest {
|
||||
|
@ -479,7 +479,7 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
|
|||
|
||||
/// Check whether a row key exists in a `table`
|
||||
pub async fn row_key_exists(&mut self, table_name: &str, row_key: RowKey) -> Result<bool> {
|
||||
self.refresh_access_token().await;
|
||||
self.refresh_access_token();
|
||||
|
||||
let response = self
|
||||
.client
|
||||
|
@ -524,7 +524,7 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
|
|||
if rows_limit == 0 {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
self.refresh_access_token().await;
|
||||
self.refresh_access_token();
|
||||
let response = self
|
||||
.client
|
||||
.read_rows(ReadRowsRequest {
|
||||
|
@ -558,7 +558,7 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
|
|||
table_name: &str,
|
||||
row_keys: &[RowKey],
|
||||
) -> Result<Vec<(RowKey, RowData)>> {
|
||||
self.refresh_access_token().await;
|
||||
self.refresh_access_token();
|
||||
|
||||
let response = self
|
||||
.client
|
||||
|
@ -594,7 +594,7 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
|
|||
table_name: &str,
|
||||
row_key: RowKey,
|
||||
) -> Result<RowData> {
|
||||
self.refresh_access_token().await;
|
||||
self.refresh_access_token();
|
||||
|
||||
let response = self
|
||||
.client
|
||||
|
@ -623,7 +623,7 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
|
|||
|
||||
/// Delete one or more `table` rows
|
||||
async fn delete_rows(&mut self, table_name: &str, row_keys: &[RowKey]) -> Result<()> {
|
||||
self.refresh_access_token().await;
|
||||
self.refresh_access_token();
|
||||
|
||||
let mut entries = vec![];
|
||||
for row_key in row_keys {
|
||||
|
@ -669,7 +669,7 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
|
|||
family_name: &str,
|
||||
row_data: &[(&RowKey, RowData)],
|
||||
) -> Result<()> {
|
||||
self.refresh_access_token().await;
|
||||
self.refresh_access_token();
|
||||
|
||||
let mut entries = vec![];
|
||||
for (row_key, row_data) in row_data {
|
||||
|
|
Loading…
Reference in New Issue