diff --git a/storage-bigtable/src/access_token.rs b/storage-bigtable/src/access_token.rs index f4d5e9ade9..8881f594ac 100644 --- a/storage-bigtable/src/access_token.rs +++ b/storage-bigtable/src/access_token.rs @@ -91,41 +91,49 @@ impl AccessToken { } /// Call this function regularly to ensure the access token does not expire - pub async fn refresh(&self) { + pub fn refresh(&self) { // Check if it's time to try a token refresh - { - let token_r = self.token.read().unwrap(); - if token_r.1.elapsed().as_secs() < token_r.0.expires_in() as u64 / 2 { - return; - } + let token_r = self.token.read().unwrap(); + if token_r.1.elapsed().as_secs() < token_r.0.expires_in() as u64 / 2 { + debug!("Token is not expired yet"); + return; + } + drop(token_r); - #[allow(deprecated)] - if self - .refresh_active - .compare_and_swap(false, true, Ordering::Relaxed) + // Refresh already is progress + let refresh_progress = + self.refresh_active + .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed); + if refresh_progress.is_err() { + debug!("Token update is already in progress"); + return; + } + + let credentials = self.credentials.clone(); + let scope = self.scope.clone(); + let refresh_active = Arc::clone(&self.refresh_active); + let token = Arc::clone(&self.token); + tokio::spawn(async move { + match time::timeout( + time::Duration::from_secs(5), + Self::get_token(&credentials, &scope), + ) + .await { - // Refresh already pending - return; + Ok(new_token) => match new_token { + Ok(new_token) => { + let mut token_w = token.write().unwrap(); + *token_w = new_token; + } + Err(err) => error!("Failed to fetch new token: {}", err), + }, + Err(_timeout) => { + warn!("Token refresh timeout") + } } - } - - info!("Refreshing token"); - match time::timeout( - time::Duration::from_secs(5), - Self::get_token(&self.credentials, &self.scope), - ) - .await - { - Ok(new_token) => match (new_token, self.token.write()) { - (Ok(new_token), Ok(mut token_w)) => *token_w = new_token, - (Ok(_new_token), Err(err)) => warn!("{}", err), - (Err(err), _) => warn!("{}", err), - }, - Err(_) => { - warn!("Token refresh timeout") - } - } - self.refresh_active.store(false, Ordering::Relaxed); + refresh_active.store(false, Ordering::Relaxed); + info!("Token refreshed"); + }); } /// Return an access token suitable for use in an HTTP authorization header diff --git a/storage-bigtable/src/bigtable.rs b/storage-bigtable/src/bigtable.rs index e6df6d868b..f9454a91b8 100644 --- a/storage-bigtable/src/bigtable.rs +++ b/storage-bigtable/src/bigtable.rs @@ -410,9 +410,9 @@ impl) -> InterceptedRequestResult> BigTable { Ok(rows) } - async fn refresh_access_token(&self) { + fn refresh_access_token(&self) { if let Some(ref access_token) = self.access_token { - access_token.refresh().await; + access_token.refresh(); } } @@ -434,7 +434,7 @@ impl) -> InterceptedRequestResult> BigTable { if rows_limit == 0 { return Ok(vec![]); } - self.refresh_access_token().await; + self.refresh_access_token(); let response = self .client .read_rows(ReadRowsRequest { @@ -479,7 +479,7 @@ impl) -> InterceptedRequestResult> BigTable { /// Check whether a row key exists in a `table` pub async fn row_key_exists(&mut self, table_name: &str, row_key: RowKey) -> Result { - self.refresh_access_token().await; + self.refresh_access_token(); let response = self .client @@ -524,7 +524,7 @@ impl) -> InterceptedRequestResult> BigTable { if rows_limit == 0 { return Ok(vec![]); } - self.refresh_access_token().await; + self.refresh_access_token(); let response = self .client .read_rows(ReadRowsRequest { @@ -558,7 +558,7 @@ impl) -> InterceptedRequestResult> BigTable { table_name: &str, row_keys: &[RowKey], ) -> Result> { - self.refresh_access_token().await; + self.refresh_access_token(); let response = self .client @@ -594,7 +594,7 @@ impl) -> InterceptedRequestResult> BigTable { table_name: &str, row_key: RowKey, ) -> Result { - self.refresh_access_token().await; + self.refresh_access_token(); let response = self .client @@ -623,7 +623,7 @@ impl) -> InterceptedRequestResult> BigTable { /// Delete one or more `table` rows async fn delete_rows(&mut self, table_name: &str, row_keys: &[RowKey]) -> Result<()> { - self.refresh_access_token().await; + self.refresh_access_token(); let mut entries = vec![]; for row_key in row_keys { @@ -669,7 +669,7 @@ impl) -> InterceptedRequestResult> BigTable { family_name: &str, row_data: &[(&RowKey, RowData)], ) -> Result<()> { - self.refresh_access_token().await; + self.refresh_access_token(); let mut entries = vec![]; for (row_key, row_data) in row_data {