Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#5973] feat(hadoop-catalog): Support credential when using fileset catalog with cloud storage #5974

Open
wants to merge 64 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
c424d8e
Support using dynamic credential
yuqi1129 Dec 24, 2024
d7a031c
Merge branch 'main' of github.com:apache/gravitino into issue_5973
yuqi1129 Dec 26, 2024
5b648e8
Fix again.
yuqi1129 Dec 26, 2024
0c61a48
fix
yuqi1129 Dec 27, 2024
44425e9
Merge branch 'main' of github.com:datastrato/graviton into issue_5973
yuqi1129 Dec 27, 2024
18f6ff6
fix
yuqi1129 Dec 27, 2024
8fc56ae
Fix
yuqi1129 Dec 27, 2024
00fa098
Fix
yuqi1129 Dec 27, 2024
682705d
Fix
yuqi1129 Dec 27, 2024
50a4d15
Fix
yuqi1129 Dec 28, 2024
7f0a99b
Fix
yuqi1129 Dec 29, 2024
fede4a8
Merge branch 'main' of github.com:datastrato/graviton into issue_5973
yuqi1129 Dec 29, 2024
22bea5c
fix conflict
yuqi1129 Dec 29, 2024
f4f2287
fix
yuqi1129 Dec 30, 2024
20f7ec6
fix
yuqi1129 Dec 30, 2024
e8814b0
Polish code.
yuqi1129 Dec 30, 2024
9b7bf01
Merge branch 'main' of github.com:datastrato/graviton into issue_5973
yuqi1129 Dec 30, 2024
06b192b
fix
yuqi1129 Dec 31, 2024
440db59
fix
yuqi1129 Dec 31, 2024
9678513
fix
yuqi1129 Jan 2, 2025
c4fb29a
fix
yuqi1129 Jan 2, 2025
4791a64
Merge branch 'main' of github.com:datastrato/graviton into 5472
yuqi1129 Jan 2, 2025
9a90e7a
Merge branch 'main' of github.com:datastrato/graviton into issue_5973
yuqi1129 Jan 2, 2025
826239d
Merge branch 'main' of github.com:datastrato/graviton into issue_5973
yuqi1129 Jan 3, 2025
baf42e1
fix
yuqi1129 Jan 3, 2025
d86610b
Merge branch 'main' of github.com:datastrato/graviton into 5472
yuqi1129 Jan 3, 2025
a1aa4d5
fix
yuqi1129 Jan 3, 2025
f67981c
fix typo
yuqi1129 Jan 3, 2025
c0db96b
fix
yuqi1129 Jan 3, 2025
d2ba98b
refactor module and create a new module `filesystem-hadoop3-common`
yuqi1129 Jan 3, 2025
b7eb621
fix
yuqi1129 Jan 3, 2025
b34c526
Rename class of credential providers and optimize expired time.
yuqi1129 Jan 3, 2025
1ecc378
update the docs
yuqi1129 Jan 4, 2025
d232e92
polish document again.
yuqi1129 Jan 6, 2025
fbd57ba
Again
yuqi1129 Jan 6, 2025
c47fd09
Merge branch 'main' of github.com:datastrato/graviton into issue_5973
yuqi1129 Jan 6, 2025
2d8f4dc
Merge remote-tracking branch 'me/issue_5973' into issue_5973
yuqi1129 Jan 6, 2025
193f467
resolve comments.
yuqi1129 Jan 6, 2025
a1f0989
fix
yuqi1129 Jan 6, 2025
d75b67e
Merge remote-tracking branch 'me/issue_5973' into issue_5973
yuqi1129 Jan 6, 2025
4fb6e79
fix
yuqi1129 Jan 6, 2025
755a474
fix comments.
yuqi1129 Jan 6, 2025
5251a85
Merge remote-tracking branch 'me/issue_5973' into issue_5973
yuqi1129 Jan 6, 2025
3bb9252
fix
yuqi1129 Jan 6, 2025
614464f
Optimize
yuqi1129 Jan 6, 2025
e481c8d
fix
yuqi1129 Jan 6, 2025
0a97fc7
fix
yuqi1129 Jan 6, 2025
9bb28a8
Merge branch '5472' of github.com:yuqi1129/gravitino into issue_5973
yuqi1129 Jan 7, 2025
7e74fc9
optimize again.
yuqi1129 Jan 7, 2025
c853de2
Merge remote-tracking branch 'me/issue_5973' into issue_5973
yuqi1129 Jan 7, 2025
571c1e9
The new framework of credential vending in fileset.
yuqi1129 Jan 7, 2025
e27f9cd
Refactor the framework of using credential in fileset
yuqi1129 Jan 7, 2025
37a5735
Merge branch 'main' of github.com:apache/gravitino into issue_5973
yuqi1129 Jan 7, 2025
bad0c87
fix
yuqi1129 Jan 7, 2025
ca51efb
fix
yuqi1129 Jan 7, 2025
bf1290c
fix
yuqi1129 Jan 8, 2025
a1b249f
fix typo
yuqi1129 Jan 8, 2025
a722080
fix the checkstyle problem.
yuqi1129 Jan 8, 2025
03395c8
fix
yuqi1129 Jan 8, 2025
274ec3e
fix
yuqi1129 Jan 8, 2025
3eb9e92
Merge branch 'main' of github.com:apache/gravitino into issue_5973
yuqi1129 Jan 8, 2025
a285dee
Polish the code.
yuqi1129 Jan 8, 2025
ce49fa3
fix
yuqi1129 Jan 8, 2025
6b848c8
fix
yuqi1129 Jan 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public long expireTimeInMs() {
public Map<String, String> credentialInfo() {
return (new ImmutableMap.Builder<String, String>())
.put(GRAVITINO_ADLS_SAS_TOKEN, sasToken)
.put(GRAVITINO_AZURE_STORAGE_ACCOUNT_NAME, accountName)
.build();
}

Expand Down
2 changes: 2 additions & 0 deletions bundles/aliyun/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ dependencies {
implementation(project(":catalogs:hadoop-common")) {
exclude("*")
}
implementation(project(":clients:client-java-runtime", configuration = "shadow"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, why this should be implementation, AFAIK, the bundles are typically used with gvfs, we already included this client runtime in gvfs, what's the reason to do this here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me think a bit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed as suggested.

implementation(project(":clients:filesystem-hadoop3-runtime", configuration = "shadow"))
yuqi1129 marked this conversation as resolved.
Show resolved Hide resolved

implementation(libs.aliyun.credentials.sdk)
implementation(libs.commons.collections3)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ private String createPolicy(Set<String> readLocations, Set<String> writeLocation
key ->
Statement.builder()
.effect(Effect.ALLOW)
.addAction("oss:ListBucket")
.addAction("oss:ListObjects")
.addResource(key)
.condition(getCondition(uri)));
// GetBucketLocation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Map;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
import org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemConfiguration;
import org.apache.gravitino.storage.OSSProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -60,7 +61,16 @@ public FileSystem getFileSystem(Path path, Map<String, String> config) throws IO
hadoopConfMap.put(OSS_FILESYSTEM_IMPL, AliyunOSSFileSystem.class.getCanonicalName());
}

if (!hadoopConfMap.containsKey(Constants.CREDENTIALS_PROVIDER_KEY)
yuqi1129 marked this conversation as resolved.
Show resolved Hide resolved
&& config.containsKey(
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_SERVER_URI_KEY)) {
hadoopConfMap.put(
Constants.CREDENTIALS_PROVIDER_KEY,
OSSSessionCredentialProvider.class.getCanonicalName());
}

hadoopConfMap.forEach(configuration::set);

return AliyunOSSFileSystem.newInstance(path.toUri(), configuration);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.oss.fs;

import static org.apache.gravitino.credential.OSSTokenCredential.GRAVITINO_OSS_SESSION_ACCESS_KEY_ID;
import static org.apache.gravitino.credential.OSSTokenCredential.GRAVITINO_OSS_SESSION_SECRET_ACCESS_KEY;
import static org.apache.gravitino.credential.OSSTokenCredential.GRAVITINO_OSS_TOKEN;

import com.aliyun.oss.common.auth.BasicCredentials;
import com.aliyun.oss.common.auth.Credentials;
import com.aliyun.oss.common.auth.CredentialsProvider;
import com.aliyun.oss.common.auth.DefaultCredentials;
import java.net.URI;
import java.util.Map;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.client.GravitinoClient;
import org.apache.gravitino.credential.Credential;
import org.apache.gravitino.credential.OSSTokenCredential;
import org.apache.gravitino.file.Fileset;
import org.apache.gravitino.file.FilesetCatalog;
import org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem;
import org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.aliyun.oss.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OSSSessionCredentialProvider implements CredentialsProvider {

private static final Logger LOGGER = LoggerFactory.getLogger(OSSSessionCredentialProvider.class);
private Credentials basicCredentials;
private final String filesetIdentifier;
private long expirationTime;
private final GravitinoClient client;
private final Configuration configuration;

public OSSSessionCredentialProvider(URI uri, Configuration conf) {
this.filesetIdentifier =
conf.get(GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_IDENTIFIER);
// extra value and init Gravitino client here
GravitinoVirtualFileSystem gravitinoVirtualFileSystem = new GravitinoVirtualFileSystem();
this.client = gravitinoVirtualFileSystem.initializeClient(conf);
this.configuration = conf;
}

@Override
public void setCredentials(Credentials credentials) {}

@Override
public Credentials getCredentials() {
// If the credentials are null or about to expire, refresh the credentials.
if (basicCredentials == null || System.currentTimeMillis() > expirationTime - 5 * 60 * 1000) {
synchronized (this) {
refresh();
}
}

return basicCredentials;
}

private void refresh() {
String[] idents = filesetIdentifier.split("\\.");
String catalog = idents[1];

FilesetCatalog filesetCatalog = client.loadCatalog(catalog).asFilesetCatalog();

Fileset fileset = filesetCatalog.loadFileset(NameIdentifier.of(idents[2], idents[3]));
Credential[] credentials = fileset.supportsCredentials().getCredentials();
if (credentials.length == 0) {
LOGGER.warn("No credential found for fileset: {}, try to use static AKSK", filesetIdentifier);
expirationTime = Long.MAX_VALUE;
this.basicCredentials =
new DefaultCredentials(
configuration.get(Constants.ACCESS_KEY_ID),
configuration.get(Constants.ACCESS_KEY_SECRET));
return;
}

// Use the first one.
Credential credential = credentials[0];
yuqi1129 marked this conversation as resolved.
Show resolved Hide resolved
Map<String, String> credentialMap = credential.toProperties();

String accessKeyId = credentialMap.get(GRAVITINO_OSS_SESSION_ACCESS_KEY_ID);
String secretAccessKey = credentialMap.get(GRAVITINO_OSS_SESSION_SECRET_ACCESS_KEY);

if (OSSTokenCredential.OSS_TOKEN_CREDENTIAL_TYPE.equals(
credentialMap.get(Credential.CREDENTIAL_TYPE))) {
String sessionToken = credentialMap.get(GRAVITINO_OSS_TOKEN);
this.basicCredentials = new BasicCredentials(accessKeyId, secretAccessKey, sessionToken);
} else {
this.basicCredentials = new DefaultCredentials(accessKeyId, secretAccessKey);
}

this.expirationTime = credential.expireTimeInMs();
if (expirationTime <= 0) {
expirationTime = Long.MAX_VALUE;
}
}
}
2 changes: 2 additions & 0 deletions bundles/aws/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ dependencies {
implementation(project(":catalogs:hadoop-common")) {
exclude("*")
}
implementation(project(":clients:client-java-runtime", configuration = "shadow"))
implementation(project(":clients:filesystem-hadoop3-runtime", configuration = "shadow"))

implementation(libs.aws.iam)
implementation(libs.aws.policy)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Map;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
import org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemConfiguration;
import org.apache.gravitino.storage.S3Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -61,11 +62,15 @@ public FileSystem getFileSystem(Path path, Map<String, String> config) throws IO
Map<String, String> hadoopConfMap =
FileSystemUtils.toHadoopConfigMap(config, GRAVITINO_KEY_TO_S3_HADOOP_KEY);

hadoopConfMap.forEach(configuration::set);
if (!hadoopConfMap.containsKey(S3_CREDENTIAL_KEY)) {
hadoopConfMap.put(S3_CREDENTIAL_KEY, S3_SIMPLE_CREDENTIAL);
configuration.set(S3_CREDENTIAL_KEY, S3_SIMPLE_CREDENTIAL);
}

hadoopConfMap.forEach(configuration::set);
if (config.containsKey(GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_SERVER_URI_KEY)) {
configuration.set(
Constants.AWS_CREDENTIALS_PROVIDER, S3SessionCredentialProvider.class.getCanonicalName());
}

// Hadoop-aws 2 does not support IAMInstanceCredentialsProvider
checkAndSetCredentialProvider(configuration);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.s3.fs;

import static org.apache.gravitino.credential.S3TokenCredential.GRAVITINO_S3_SESSION_ACCESS_KEY_ID;
import static org.apache.gravitino.credential.S3TokenCredential.GRAVITINO_S3_SESSION_SECRET_ACCESS_KEY;
import static org.apache.gravitino.credential.S3TokenCredential.GRAVITINO_S3_TOKEN;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.BasicSessionCredentials;
import java.net.URI;
import java.util.Map;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.client.GravitinoClient;
import org.apache.gravitino.credential.Credential;
import org.apache.gravitino.credential.S3TokenCredential;
import org.apache.gravitino.file.Fileset;
import org.apache.gravitino.file.FilesetCatalog;
import org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem;
import org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class S3SessionCredentialProvider implements AWSCredentialsProvider {

private static final Logger LOGGER = LoggerFactory.getLogger(S3SessionCredentialProvider.class);
private final GravitinoClient client;
private final String filesetIdentifier;
private final Configuration configuration;

private AWSCredentials basicSessionCredentials;
private long expirationTime;

public S3SessionCredentialProvider(final URI uri, final Configuration conf) {
this.filesetIdentifier =
conf.get(GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_IDENTIFIER);
this.configuration = conf;

// extra value and init Gravitino client here
GravitinoVirtualFileSystem gravitinoVirtualFileSystem = new GravitinoVirtualFileSystem();
this.client = gravitinoVirtualFileSystem.initializeClient(conf);
}

@Override
public AWSCredentials getCredentials() {
// Refresh credentials if they are null or about to expire in 5 minutes
if (basicSessionCredentials == null
|| System.currentTimeMillis() > expirationTime - 5 * 60 * 1000) {
synchronized (this) {
refresh();
}
}

return basicSessionCredentials;
}

@Override
public void refresh() {
// The format of filesetIdentifier is "metalake.catalog.fileset.schema"
String[] idents = filesetIdentifier.split("\\.");
String catalog = idents[1];

FilesetCatalog filesetCatalog = client.loadCatalog(catalog).asFilesetCatalog();

Fileset fileset = filesetCatalog.loadFileset(NameIdentifier.of(idents[2], idents[3]));
Credential[] credentials = fileset.supportsCredentials().getCredentials();

// Can't find any credential, use the default one.
if (credentials.length == 0) {
LOGGER.warn("No credential found for fileset: {}, try to use static AKSK", filesetIdentifier);
expirationTime = Long.MAX_VALUE;
this.basicSessionCredentials =
new BasicAWSCredentials(
configuration.get(Constants.ACCESS_KEY), configuration.get(Constants.SECRET_KEY));
return;
}

Credential credential = credentials[0];
Map<String, String> credentialMap = credential.toProperties();

String accessKeyId = credentialMap.get(GRAVITINO_S3_SESSION_ACCESS_KEY_ID);
String secretAccessKey = credentialMap.get(GRAVITINO_S3_SESSION_SECRET_ACCESS_KEY);

if (S3TokenCredential.S3_TOKEN_CREDENTIAL_TYPE.equals(
credentialMap.get(Credential.CREDENTIAL_TYPE))) {
String sessionToken = credentialMap.get(GRAVITINO_S3_TOKEN);
this.basicSessionCredentials =
new BasicSessionCredentials(accessKeyId, secretAccessKey, sessionToken);
} else {
this.basicSessionCredentials = new BasicAWSCredentials(accessKeyId, secretAccessKey);
}

this.expirationTime = credential.expireTimeInMs();
if (expirationTime <= 0) {
expirationTime = Long.MAX_VALUE;
}
}
}
2 changes: 2 additions & 0 deletions bundles/azure/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ dependencies {
implementation(project(":catalogs:hadoop-common")) {
exclude("*")
}
implementation(project(":clients:client-java-runtime", configuration = "shadow"))
implementation(project(":clients:filesystem-hadoop3-runtime", configuration = "shadow"))

implementation(libs.azure.identity)
implementation(libs.azure.storage.file.datalake)
Expand Down
Loading
Loading