Skip to content

Commit d98906a

Browse files
committed
feat(iceberg-catalog-rest): expose invalidate_token, regenerate_token apis
1 parent 772ae1a commit d98906a

File tree

2 files changed

+217
-44
lines changed

2 files changed

+217
-44
lines changed

crates/catalog/rest/src/catalog.rs

Lines changed: 161 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,25 @@ impl RestCatalog {
318318

319319
Ok(file_io)
320320
}
321+
322+
/// Invalidate the current token and set a new one. Generates a new token before invalidating
323+
/// the current token, meaning the old token will be used until this function acquires the lock
324+
/// and overwrites the token.
325+
///
326+
/// - If credential is invalid or not provided, this method will return an error and do nothing.
327+
pub async fn invalidate_token(&self) -> Result<()> {
328+
self.context().await?.client.invalidate_token().await
329+
}
330+
331+
/// Invalidate the current token and set a new one. Generates a new token before invalidating
332+
/// the current token, meaning the old token will be used until this function acquires the lock
333+
/// and overwrites the token.
334+
///
335+
/// If credential is invalid or not provided, this method will return an error and leave the
336+
/// current token unchanged.
337+
pub async fn regenerate_token(&self) -> Result<()> {
338+
self.context().await?.client.regenerate_token().await
339+
}
321340
}
322341

323342
/// All requests and expected responses are derived from the REST catalog API spec:
@@ -860,21 +879,27 @@ mod tests {
860879
}
861880

862881
async fn create_oauth_mock(server: &mut ServerGuard) -> Mock {
863-
create_oauth_mock_with_path(server, "/v1/oauth/tokens").await
882+
create_oauth_mock_with_path(server, "/v1/oauth/tokens", "ey000000000000", 200).await
864883
}
865884

866-
async fn create_oauth_mock_with_path(server: &mut ServerGuard, path: &str) -> Mock {
867-
server
868-
.mock("POST", path)
869-
.with_status(200)
870-
.with_body(
871-
r#"{
872-
"access_token": "ey000000000000",
885+
async fn create_oauth_mock_with_path(
886+
server: &mut ServerGuard,
887+
path: &str,
888+
token: &str,
889+
status: usize,
890+
) -> Mock {
891+
let body = format!(
892+
r#"{{
893+
"access_token": "{token}",
873894
"token_type": "Bearer",
874895
"issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
875896
"expires_in": 86400
876-
}"#,
877-
)
897+
}}"#
898+
);
899+
server
900+
.mock("POST", path)
901+
.with_status(status)
902+
.with_body(body)
878903
.expect(1)
879904
.create_async()
880905
.await
@@ -949,6 +974,129 @@ mod tests {
949974
assert_eq!(token, Some("ey000000000000".to_string()));
950975
}
951976

977+
#[tokio::test]
978+
async fn test_invalidate_token() {
979+
let mut server = Server::new_async().await;
980+
let oauth_mock = create_oauth_mock(&mut server).await;
981+
let config_mock = create_config_mock(&mut server).await;
982+
983+
let mut props = HashMap::new();
984+
props.insert("credential".to_string(), "client1:secret1".to_string());
985+
986+
let catalog = RestCatalog::new(
987+
RestCatalogConfig::builder()
988+
.uri(server.url())
989+
.props(props)
990+
.build(),
991+
);
992+
993+
let token = catalog.context().await.unwrap().client.token().await;
994+
oauth_mock.assert_async().await;
995+
config_mock.assert_async().await;
996+
assert_eq!(token, Some("ey000000000000".to_string()));
997+
998+
let oauth_mock =
999+
create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 200)
1000+
.await;
1001+
catalog.invalidate_token().await.unwrap();
1002+
let token = catalog.context().await.unwrap().client.token().await;
1003+
oauth_mock.assert_async().await;
1004+
assert_eq!(token, Some("ey000000000001".to_string()));
1005+
}
1006+
1007+
#[tokio::test]
1008+
async fn test_invalidate_token_failing_request() {
1009+
let mut server = Server::new_async().await;
1010+
let oauth_mock = create_oauth_mock(&mut server).await;
1011+
let config_mock = create_config_mock(&mut server).await;
1012+
1013+
let mut props = HashMap::new();
1014+
props.insert("credential".to_string(), "client1:secret1".to_string());
1015+
1016+
let catalog = RestCatalog::new(
1017+
RestCatalogConfig::builder()
1018+
.uri(server.url())
1019+
.props(props)
1020+
.build(),
1021+
);
1022+
1023+
let token = catalog.context().await.unwrap().client.token().await;
1024+
oauth_mock.assert_async().await;
1025+
config_mock.assert_async().await;
1026+
assert_eq!(token, Some("ey000000000000".to_string()));
1027+
1028+
let oauth_mock =
1029+
create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 500)
1030+
.await;
1031+
catalog.invalidate_token().await.unwrap();
1032+
let token = catalog.context().await.unwrap().client.token().await;
1033+
oauth_mock.assert_async().await;
1034+
assert_eq!(token, None);
1035+
}
1036+
1037+
#[tokio::test]
1038+
async fn test_regenerate_token() {
1039+
let mut server = Server::new_async().await;
1040+
let oauth_mock = create_oauth_mock(&mut server).await;
1041+
let config_mock = create_config_mock(&mut server).await;
1042+
1043+
let mut props = HashMap::new();
1044+
props.insert("credential".to_string(), "client1:secret1".to_string());
1045+
1046+
let catalog = RestCatalog::new(
1047+
RestCatalogConfig::builder()
1048+
.uri(server.url())
1049+
.props(props)
1050+
.build(),
1051+
);
1052+
1053+
let token = catalog.context().await.unwrap().client.token().await;
1054+
oauth_mock.assert_async().await;
1055+
config_mock.assert_async().await;
1056+
assert_eq!(token, Some("ey000000000000".to_string()));
1057+
1058+
let oauth_mock =
1059+
create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 200)
1060+
.await;
1061+
catalog.regenerate_token().await.unwrap();
1062+
oauth_mock.assert_async().await;
1063+
let token = catalog.context().await.unwrap().client.token().await;
1064+
assert_eq!(token, Some("ey000000000001".to_string()));
1065+
}
1066+
1067+
#[tokio::test]
1068+
async fn test_regenerate_token_failing_request() {
1069+
let mut server = Server::new_async().await;
1070+
let oauth_mock = create_oauth_mock(&mut server).await;
1071+
let config_mock = create_config_mock(&mut server).await;
1072+
1073+
let mut props = HashMap::new();
1074+
props.insert("credential".to_string(), "client1:secret1".to_string());
1075+
1076+
let catalog = RestCatalog::new(
1077+
RestCatalogConfig::builder()
1078+
.uri(server.url())
1079+
.props(props)
1080+
.build(),
1081+
);
1082+
1083+
let token = catalog.context().await.unwrap().client.token().await;
1084+
oauth_mock.assert_async().await;
1085+
config_mock.assert_async().await;
1086+
assert_eq!(token, Some("ey000000000000".to_string()));
1087+
1088+
let oauth_mock =
1089+
create_oauth_mock_with_path(&mut server, "/v1/oauth/tokens", "ey000000000001", 500)
1090+
.await;
1091+
let invalidate_result = catalog.regenerate_token().await;
1092+
assert!(invalidate_result.is_err());
1093+
oauth_mock.assert_async().await;
1094+
let token = catalog.context().await.unwrap().client.token().await;
1095+
1096+
// original token is left intact
1097+
assert_eq!(token, Some("ey000000000000".to_string()));
1098+
}
1099+
9521100
#[tokio::test]
9531101
async fn test_http_headers() {
9541102
let server = Server::new_async().await;
@@ -1026,7 +1174,9 @@ mod tests {
10261174

10271175
let mut auth_server = Server::new_async().await;
10281176
let auth_server_path = "/some/path";
1029-
let oauth_mock = create_oauth_mock_with_path(&mut auth_server, auth_server_path).await;
1177+
let oauth_mock =
1178+
create_oauth_mock_with_path(&mut auth_server, auth_server_path, "ey000000000000", 200)
1179+
.await;
10301180

10311181
let mut props = HashMap::new();
10321182
props.insert("credential".to_string(), "client1:secret1".to_string());

crates/catalog/rest/src/client.rs

Lines changed: 56 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -106,38 +106,7 @@ impl HttpClient {
106106
self.token.lock().await.clone()
107107
}
108108

109-
/// Authenticate the request by filling token.
110-
///
111-
/// - If neither token nor credential is provided, this method will do nothing.
112-
/// - If only credential is provided, this method will try to fetch token from the server.
113-
/// - If token is provided, this method will use the token directly.
114-
///
115-
/// # TODO
116-
///
117-
/// Support refreshing token while needed.
118-
async fn authenticate(&self, req: &mut Request) -> Result<()> {
119-
// Clone the token from lock without holding the lock for entire function.
120-
let token = self.token.lock().await.clone();
121-
122-
if self.credential.is_none() && token.is_none() {
123-
return Ok(());
124-
}
125-
126-
// Use token if provided.
127-
if let Some(token) = &token {
128-
req.headers_mut().insert(
129-
http::header::AUTHORIZATION,
130-
format!("Bearer {token}").parse().map_err(|e| {
131-
Error::new(
132-
ErrorKind::DataInvalid,
133-
"Invalid token received from catalog server!",
134-
)
135-
.with_source(e)
136-
})?,
137-
);
138-
return Ok(());
139-
}
140-
109+
async fn exchange_credential_for_token(&self) -> Result<String> {
141110
// Credential must exist here.
142111
let (client_id, client_secret) = self.credential.as_ref().ok_or_else(|| {
143112
Error::new(
@@ -202,7 +171,61 @@ impl HttpClient {
202171
})?;
203172
Err(Error::from(e))
204173
}?;
205-
let token = auth_res.access_token;
174+
Ok(auth_res.access_token)
175+
}
176+
177+
/// Invalidate the current token without generating a new one. On the next request, the client
178+
/// will attempt to generate a new token.
179+
pub(crate) async fn invalidate_token(&self) -> Result<()> {
180+
*self.token.lock().await = None;
181+
Ok(())
182+
}
183+
184+
/// Invalidate the current token and set a new one. Generates a new token before invalidating
185+
/// the current token, meaning the old token will be used until this function acquires the lock
186+
/// and overwrites the token.
187+
///
188+
/// If credential is invalid or not provided, this method will return an error and leave the
189+
/// current token unchanged.
190+
pub(crate) async fn regenerate_token(&self) -> Result<()> {
191+
let new_token = self.exchange_credential_for_token().await?;
192+
*self.token.lock().await = Some(new_token.clone());
193+
Ok(())
194+
}
195+
196+
/// Authenticate the request by filling token.
197+
///
198+
/// - If neither token nor credential is provided, this method will do nothing.
199+
/// - If only credential is provided, this method will try to fetch token from the server.
200+
/// - If token is provided, this method will use the token directly.
201+
///
202+
/// # TODO
203+
///
204+
/// Support refreshing token while needed.
205+
async fn authenticate(&self, req: &mut Request) -> Result<()> {
206+
// Clone the token from lock without holding the lock for entire function.
207+
let token = self.token.lock().await.clone();
208+
209+
if self.credential.is_none() && token.is_none() {
210+
return Ok(());
211+
}
212+
213+
// Use token if provided.
214+
if let Some(token) = &token {
215+
req.headers_mut().insert(
216+
http::header::AUTHORIZATION,
217+
format!("Bearer {token}").parse().map_err(|e| {
218+
Error::new(
219+
ErrorKind::DataInvalid,
220+
"Invalid token received from catalog server!",
221+
)
222+
.with_source(e)
223+
})?,
224+
);
225+
return Ok(());
226+
}
227+
228+
let token = self.exchange_credential_for_token().await?;
206229
// Update token.
207230
*self.token.lock().await = Some(token.clone());
208231
// Insert token in request.

0 commit comments

Comments
 (0)