[#5973] feat(hadoop-catalog): Support credential when using fileset catalog with cloud storage #5974

Open
wants to merge 64 commits into base: main
Changes from 51 commits
Commits
c424d8e
Support using dynamic credential
yuqi1129 Dec 24, 2024
d7a031c
Merge branch 'main' of github.com:apache/gravitino into issue_5973
yuqi1129 Dec 26, 2024
5b648e8
Fix again.
yuqi1129 Dec 26, 2024
0c61a48
fix
yuqi1129 Dec 27, 2024
44425e9
Merge branch 'main' of github.com:datastrato/graviton into issue_5973
yuqi1129 Dec 27, 2024
18f6ff6
fix
yuqi1129 Dec 27, 2024
8fc56ae
Fix
yuqi1129 Dec 27, 2024
00fa098
Fix
yuqi1129 Dec 27, 2024
682705d
Fix
yuqi1129 Dec 27, 2024
50a4d15
Fix
yuqi1129 Dec 28, 2024
7f0a99b
Fix
yuqi1129 Dec 29, 2024
fede4a8
Merge branch 'main' of github.com:datastrato/graviton into issue_5973
yuqi1129 Dec 29, 2024
22bea5c
fix conflict
yuqi1129 Dec 29, 2024
f4f2287
fix
yuqi1129 Dec 30, 2024
20f7ec6
fix
yuqi1129 Dec 30, 2024
e8814b0
Polish code.
yuqi1129 Dec 30, 2024
9b7bf01
Merge branch 'main' of github.com:datastrato/graviton into issue_5973
yuqi1129 Dec 30, 2024
06b192b
fix
yuqi1129 Dec 31, 2024
440db59
fix
yuqi1129 Dec 31, 2024
9678513
fix
yuqi1129 Jan 2, 2025
c4fb29a
fix
yuqi1129 Jan 2, 2025
4791a64
Merge branch 'main' of github.com:datastrato/graviton into 5472
yuqi1129 Jan 2, 2025
9a90e7a
Merge branch 'main' of github.com:datastrato/graviton into issue_5973
yuqi1129 Jan 2, 2025
826239d
Merge branch 'main' of github.com:datastrato/graviton into issue_5973
yuqi1129 Jan 3, 2025
baf42e1
fix
yuqi1129 Jan 3, 2025
d86610b
Merge branch 'main' of github.com:datastrato/graviton into 5472
yuqi1129 Jan 3, 2025
a1aa4d5
fix
yuqi1129 Jan 3, 2025
f67981c
fix typo
yuqi1129 Jan 3, 2025
c0db96b
fix
yuqi1129 Jan 3, 2025
d2ba98b
refactor module and create a new module `filesystem-hadoop3-common`
yuqi1129 Jan 3, 2025
b7eb621
fix
yuqi1129 Jan 3, 2025
b34c526
Rename class of credential providers and optimize expired time.
yuqi1129 Jan 3, 2025
1ecc378
update the docs
yuqi1129 Jan 4, 2025
d232e92
polish document again.
yuqi1129 Jan 6, 2025
fbd57ba
Again
yuqi1129 Jan 6, 2025
c47fd09
Merge branch 'main' of github.com:datastrato/graviton into issue_5973
yuqi1129 Jan 6, 2025
2d8f4dc
Merge remote-tracking branch 'me/issue_5973' into issue_5973
yuqi1129 Jan 6, 2025
193f467
resolve comments.
yuqi1129 Jan 6, 2025
a1f0989
fix
yuqi1129 Jan 6, 2025
d75b67e
Merge remote-tracking branch 'me/issue_5973' into issue_5973
yuqi1129 Jan 6, 2025
4fb6e79
fix
yuqi1129 Jan 6, 2025
755a474
fix comments.
yuqi1129 Jan 6, 2025
5251a85
Merge remote-tracking branch 'me/issue_5973' into issue_5973
yuqi1129 Jan 6, 2025
3bb9252
fix
yuqi1129 Jan 6, 2025
614464f
Optimize
yuqi1129 Jan 6, 2025
e481c8d
fix
yuqi1129 Jan 6, 2025
0a97fc7
fix
yuqi1129 Jan 6, 2025
9bb28a8
Merge branch '5472' of github.com:yuqi1129/gravitino into issue_5973
yuqi1129 Jan 7, 2025
7e74fc9
optimize again.
yuqi1129 Jan 7, 2025
c853de2
Merge remote-tracking branch 'me/issue_5973' into issue_5973
yuqi1129 Jan 7, 2025
571c1e9
The new framework of credential vending in fileset.
yuqi1129 Jan 7, 2025
e27f9cd
Refactor the framework of using credential in fileset
yuqi1129 Jan 7, 2025
37a5735
Merge branch 'main' of github.com:apache/gravitino into issue_5973
yuqi1129 Jan 7, 2025
bad0c87
fix
yuqi1129 Jan 7, 2025
ca51efb
fix
yuqi1129 Jan 7, 2025
bf1290c
fix
yuqi1129 Jan 8, 2025
a1b249f
fix typo
yuqi1129 Jan 8, 2025
a722080
fix the checkstyle problem.
yuqi1129 Jan 8, 2025
03395c8
fix
yuqi1129 Jan 8, 2025
274ec3e
fix
yuqi1129 Jan 8, 2025
3eb9e92
Merge branch 'main' of github.com:apache/gravitino into issue_5973
yuqi1129 Jan 8, 2025
a285dee
Polish the code.
yuqi1129 Jan 8, 2025
ce49fa3
fix
yuqi1129 Jan 8, 2025
6b848c8
fix
yuqi1129 Jan 8, 2025
@@ -74,6 +74,7 @@ public long expireTimeInMs() {
  public Map<String, String> credentialInfo() {
    return (new ImmutableMap.Builder<String, String>())
        .put(GRAVITINO_ADLS_SAS_TOKEN, sasToken)
        .put(GRAVITINO_AZURE_STORAGE_ACCOUNT_NAME, accountName)
        .build();
  }

4 changes: 4 additions & 0 deletions bundles/aliyun/build.gradle.kts
@@ -29,13 +29,17 @@ dependencies {
  compileOnly(project(":catalogs:catalog-common"))
  compileOnly(project(":catalogs:catalog-hadoop"))
  compileOnly(project(":core"))
  compileOnly(project(":clients:client-java"))
  compileOnly(libs.hadoop3.client.api)
  compileOnly(libs.hadoop3.client.runtime)
  compileOnly(libs.hadoop3.oss)

  implementation(project(":catalogs:catalog-common")) {
    exclude("*")
  }
  implementation(project(":clients:filesystem-hadoop3-common")) {
    exclude("*")
  }
  implementation(project(":catalogs:hadoop-common")) {
    exclude("*")
  }
@@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.oss.fs;

import com.aliyun.oss.common.auth.BasicCredentials;
import com.aliyun.oss.common.auth.Credentials;
import com.aliyun.oss.common.auth.CredentialsProvider;
import com.aliyun.oss.common.auth.DefaultCredentials;
import java.net.URI;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.client.GravitinoClient;
import org.apache.gravitino.credential.Credential;
import org.apache.gravitino.credential.OSSSecretKeyCredential;
import org.apache.gravitino.credential.OSSTokenCredential;
import org.apache.gravitino.file.Fileset;
import org.apache.gravitino.file.FilesetCatalog;
import org.apache.gravitino.filesystem.common.GravitinoVirtualFileSystemConfiguration;
import org.apache.gravitino.filesystem.common.GravitinoVirtualFileSystemUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.aliyun.oss.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OSSCredentialsProvider implements CredentialsProvider {

  private static final Logger LOG = LoggerFactory.getLogger(OSSCredentialsProvider.class);
  private Credentials basicCredentials;
  private final String filesetIdentifier;
  private GravitinoClient client;
  private final Configuration configuration;

  private long expirationTime = Long.MAX_VALUE;
  private static final double EXPIRATION_TIME_FACTOR = 0.9D;

  public OSSCredentialsProvider(URI uri, Configuration conf) {
    this.filesetIdentifier =
        conf.get(GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_IDENTIFIER);
    this.configuration = conf;
  }

  @Override
  public void setCredentials(Credentials credentials) {}

  @Override
  public Credentials getCredentials() {
    // Check and refresh under one lock so that concurrent callers neither refresh twice
    // nor read a partially updated credential/expiration pair.
    synchronized (this) {
      // If the credentials are null or about to expire, refresh the credentials.
      if (basicCredentials == null || System.currentTimeMillis() >= expirationTime) {
        try {
          refresh();
        } finally {
          if (null != this.client) {
            this.client.close();
          }
        }
      }

      return basicCredentials;
    }
  }

  private void refresh() {
    String[] idents = filesetIdentifier.split("\\.");
    String catalog = idents[1];

    client = GravitinoVirtualFileSystemUtils.createClient(configuration);
    FilesetCatalog filesetCatalog = client.loadCatalog(catalog).asFilesetCatalog();

    Fileset fileset = filesetCatalog.loadFileset(NameIdentifier.of(idents[2], idents[3]));
    Credential[] credentials = fileset.supportsCredentials().getCredentials();
    Credential credential = getCredential(credentials);

    if (credential == null) {
      LOG.warn("No credential found for fileset: {}, try to use static AKSK", filesetIdentifier);
      expirationTime = Long.MAX_VALUE;
      this.basicCredentials =
          new DefaultCredentials(
              configuration.get(Constants.ACCESS_KEY_ID),
              configuration.get(Constants.ACCESS_KEY_SECRET));
      return;
    }

    if (credential instanceof OSSSecretKeyCredential) {
      OSSSecretKeyCredential ossSecretKeyCredential = (OSSSecretKeyCredential) credential;
      basicCredentials =
          new DefaultCredentials(
              ossSecretKeyCredential.accessKeyId(), ossSecretKeyCredential.secretAccessKey());
    } else if (credential instanceof OSSTokenCredential) {
      OSSTokenCredential ossTokenCredential = (OSSTokenCredential) credential;
      basicCredentials =
          new BasicCredentials(
              ossTokenCredential.accessKeyId(),
              ossTokenCredential.secretAccessKey(),
              ossTokenCredential.securityToken());
    }

    // Refresh ahead of the real expiry: schedule the next refresh once
    // EXPIRATION_TIME_FACTOR (90%) of the remaining lifetime has elapsed.
    if (credential.expireTimeInMs() > 0) {
      expirationTime =
          System.currentTimeMillis()
              + (long)
                  ((credential.expireTimeInMs() - System.currentTimeMillis())
                      * EXPIRATION_TIME_FACTOR);
    }
  }

  /**
   * Gets a credential from the credential array, preferring the dynamic credential and falling
   * back to the static credential if no dynamic one is found.
   *
   * @param credentials The credential array.
   * @return A credential, or null if none is found.
   */
  private Credential getCredential(Credential[] credentials) {
    // Use the dynamic credential if found.
    for (Credential credential : credentials) {
      if (credential instanceof OSSTokenCredential) {
        return credential;
      }
    }

    // If no dynamic credential is found, use the static one.
    for (Credential credential : credentials) {
      if (credential instanceof OSSSecretKeyCredential) {
        return credential;
      }
    }

    return null;
  }
}
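
The selection order described in the `getCredential` Javadoc (dynamic token credential first, static secret-key credential as fallback, nothing otherwise) can be looked at in isolation. The following is only a minimal sketch under stated assumptions: `Cred`, `TokenCred`, `SecretKeyCred`, and `CredentialSelection` are hypothetical stand-ins invented for illustration and are not the Gravitino `Credential` classes, and it returns an `Optional` where the code in this PR returns null.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in types; these are NOT the Gravitino credential classes.
interface Cred {}

// Plays the role of a dynamic (token-based) credential.
final class TokenCred implements Cred {}

// Plays the role of a static (access key / secret key) credential.
final class SecretKeyCred implements Cred {}

final class CredentialSelection {
  private CredentialSelection() {}

  /**
   * Walks the priority list in order and returns the first candidate that is an instance of the
   * current type. An empty Optional means no candidate matched any type.
   */
  static Optional<Cred> select(List<Cred> candidates, List<Class<? extends Cred>> priority) {
    for (Class<? extends Cred> type : priority) {
      for (Cred candidate : candidates) {
        if (type.isInstance(candidate)) {
          return Optional.of(candidate);
        }
      }
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    // Dynamic credentials are preferred over static ones, mirroring the Javadoc above.
    List<Class<? extends Cred>> priority = Arrays.asList(TokenCred.class, SecretKeyCred.class);

    List<Cred> both = Arrays.asList(new SecretKeyCred(), new TokenCred());
    System.out.println(select(both, priority).map(c -> c.getClass().getSimpleName()));

    List<Cred> none = Collections.emptyList();
    System.out.println(select(none, priority).isPresent());
  }
}
```

Expressing the order as a list keeps the preference in one place, so adding another credential type would not require another loop. Whether that generality is useful here is a judgment call; the two explicit loops in the PR are arguably easier to read for just two types.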
@@ -24,6 +24,7 @@
import java.util.Map;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
import org.apache.gravitino.filesystem.common.GravitinoVirtualFileSystemConfiguration;
import org.apache.gravitino.storage.OSSProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -60,10 +61,40 @@ public FileSystem getFileSystem(Path path, Map<String, String> config) throws IO
      hadoopConfMap.put(OSS_FILESYSTEM_IMPL, AliyunOSSFileSystem.class.getCanonicalName());
    }

    // if (shouldSetCredentialsProviderExplicitly(config)) {
    //   hadoopConfMap.put(
    //       Constants.CREDENTIALS_PROVIDER_KEY,
    //       OSSCredentialsProvider.class.getCanonicalName());
    // }

    if (enableCredentialProvidedByGravitino(config)) {
      hadoopConfMap.put(
          Constants.CREDENTIALS_PROVIDER_KEY, TestOSSCredentialProvider.class.getCanonicalName());
    }

    hadoopConfMap.forEach(configuration::set);

    return AliyunOSSFileSystem.newInstance(path.toUri(), configuration);
  }

  private boolean enableCredentialProvidedByGravitino(Map<String, String> config) {
    return null != config.get("fs.gvfs.provider.impl");
  }

  /**
   * Check if the credential provider should be set explicitly.
   *
   * <p>When the credential provider is not set and the server URI is set (this means the call is
   * from GVFS client), we need to manually set the credential provider
   *
   * @param config the configuration map
   * @return true if the credential provider should be set explicitly
   */
  private boolean shouldSetCredentialsProviderExplicitly(Map<String, String> config) {
    return !config.containsKey(Constants.CREDENTIALS_PROVIDER_KEY)
        && config.containsKey(GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_SERVER_URI_KEY);
  }

  @Override
  public String scheme() {
    return "oss";
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.oss.fs;

import com.aliyun.oss.common.auth.BasicCredentials;
import com.aliyun.oss.common.auth.Credentials;
import com.aliyun.oss.common.auth.CredentialsProvider;
import com.aliyun.oss.common.auth.DefaultCredentials;
import java.net.URI;
import org.apache.gravitino.catalog.hadoop.fs.GravitinoFileSystemCredentialProvider;
import org.apache.gravitino.credential.Credential;
import org.apache.gravitino.credential.OSSSecretKeyCredential;
import org.apache.gravitino.credential.OSSTokenCredential;
import org.apache.hadoop.conf.Configuration;

public class TestOSSCredentialProvider implements CredentialsProvider {
  private GravitinoFileSystemCredentialProvider gravitinoFileSystemCredentialProvider;
  private Credentials basicCredentials;
  private long expirationTime = Long.MAX_VALUE;
  private static final double EXPIRATION_TIME_FACTOR = 0.9D;

  public TestOSSCredentialProvider(URI uri, Configuration conf) {
    try {
      gravitinoFileSystemCredentialProvider =
          (GravitinoFileSystemCredentialProvider)
              Class.forName(conf.get("fs.gvfs.credential.provider"))
                  .getDeclaredConstructor()
                  .newInstance();
      gravitinoFileSystemCredentialProvider.setConf(conf);
    } catch (Exception e) {
      throw new RuntimeException("Failed to create GravitinoFileSystemCredentialProvider", e);
    }
  }

  @Override
  public void setCredentials(Credentials credentials) {}

  @Override
  public synchronized Credentials getCredentials() {
    // Synchronized so that concurrent callers do not refresh at the same time or observe a
    // credential that does not match the stored expiration time.
    if (basicCredentials == null || System.currentTimeMillis() >= expirationTime) {
      Credential[] gravitinoCredentials = gravitinoFileSystemCredentialProvider.getCredentials();
      if (gravitinoCredentials.length == 0) {
        throw new RuntimeException("No credentials found");
      }

      // Get dynamic credentials from Gravitino
      Credential gravitinoCredential = gravitinoCredentials[0];

      if (gravitinoCredential instanceof OSSSecretKeyCredential) {
        OSSSecretKeyCredential ossSecretKeyCredential =
            (OSSSecretKeyCredential) gravitinoCredential;
        basicCredentials =
            new DefaultCredentials(
                ossSecretKeyCredential.accessKeyId(), ossSecretKeyCredential.secretAccessKey());
      } else if (gravitinoCredential instanceof OSSTokenCredential) {
        OSSTokenCredential ossTokenCredential = (OSSTokenCredential) gravitinoCredential;
        basicCredentials =
            new BasicCredentials(
                ossTokenCredential.accessKeyId(),
                ossTokenCredential.secretAccessKey(),
                ossTokenCredential.securityToken());
      }

      // Refresh ahead of the real expiry, after 90% of the remaining lifetime has elapsed.
      if (gravitinoCredential.expireTimeInMs() > 0) {
        expirationTime =
            System.currentTimeMillis()
                + (long)
                    ((gravitinoCredential.expireTimeInMs() - System.currentTimeMillis())
                        * EXPIRATION_TIME_FACTOR);
      }
    }

    return basicCredentials;
  }
}
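
Both providers in this diff repeat the same refresh-ahead pattern: cache the credential and reload it once 90% of its remaining lifetime has elapsed. The following is a minimal sketch of that pattern on its own, assuming nothing from the Aliyun SDK or Gravitino. `Expiring`, `RefreshAheadCache`, and the injected clock are hypothetical names introduced for illustration; only the 0.9 factor and the deadline formula are taken from the code above.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** A loaded value plus its absolute expiry in epoch milliseconds; a value <= 0 means "never". */
final class Expiring<T> {
  final T value;
  final long expireTimeInMs;

  Expiring(T value, long expireTimeInMs) {
    this.value = value;
    this.expireTimeInMs = expireTimeInMs;
  }
}

/** Hypothetical helper: caches a value and reloads it shortly before it expires. */
final class RefreshAheadCache<T> {
  private static final double EXPIRATION_TIME_FACTOR = 0.9D;

  private final Supplier<Expiring<T>> loader;
  private final LongSupplier clock; // injected so the behavior can be tested without sleeping
  private T cached;
  private long refreshAtMs = Long.MAX_VALUE;

  RefreshAheadCache(Supplier<Expiring<T>> loader, LongSupplier clock) {
    this.loader = loader;
    this.clock = clock;
  }

  /** Same formula as in the diff: now + (expiry - now) * factor; no expiry means never refresh. */
  static long refreshDeadline(long nowMs, long expireTimeInMs) {
    if (expireTimeInMs <= 0) {
      return Long.MAX_VALUE;
    }
    return nowMs + (long) ((expireTimeInMs - nowMs) * EXPIRATION_TIME_FACTOR);
  }

  /** The check and the reload happen under one lock, so callers never trigger parallel reloads. */
  synchronized T get() {
    long now = clock.getAsLong();
    if (cached == null || now >= refreshAtMs) {
      Expiring<T> fresh = loader.get();
      cached = fresh.value;
      refreshAtMs = refreshDeadline(now, fresh.expireTimeInMs);
    }
    return cached;
  }

  public static void main(String[] args) {
    // A credential that expires 60 seconds from now.
    long now = System.currentTimeMillis();
    RefreshAheadCache<String> cache =
        new RefreshAheadCache<>(
            () -> new Expiring<>("token", now + 60_000L), System::currentTimeMillis);
    System.out.println(cache.get());
    System.out.println(refreshDeadline(now, now + 60_000L) - now);
  }
}
```

Pulling the logic into one helper would let the OSS, S3, and other providers share it instead of each copying the expiry arithmetic; whether that fits the module layout of this PR is left open.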
4 changes: 4 additions & 0 deletions bundles/aws/build.gradle.kts
@@ -29,13 +29,17 @@ dependencies {
  compileOnly(project(":catalogs:catalog-common"))
  compileOnly(project(":catalogs:catalog-hadoop"))
  compileOnly(project(":core"))
  compileOnly(project(":clients:client-java"))
  compileOnly(libs.hadoop3.aws)
  compileOnly(libs.hadoop3.client.api)
  compileOnly(libs.hadoop3.client.runtime)

  implementation(project(":catalogs:catalog-common")) {
    exclude("*")
  }
  implementation(project(":clients:filesystem-hadoop3-common")) {
    exclude("*")
  }
  implementation(project(":catalogs:hadoop-common")) {
    exclude("*")
  }