
Commit

[regression](plugin) Support different object storage for recycler plugin (apache#37353)

The former implementation only supported the recycler plugin for S3; this PR makes Azure available as well.
ByteYue authored Jul 5, 2024
1 parent 0dc186e commit 663f1b0
Showing 3 changed files with 220 additions and 47 deletions.
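At a high level, the recycler checks now obtain a provider-aware listing client instead of building an S3 client inline. A minimal usage sketch, assuming the same obj_info fields that cloud_recycler_plugin.groovy reads further below:

// Minimal sketch of the intended usage inside a regression suite.
String provider = getObjStoreInfoApiResult.result.obj_info[0].provider  // e.g. "AZURE"; any other value falls back to the S3 client
ListObjectsFileNames client = ListObjectsFileNamesUtil.getListObjectsFileNames(
        provider, ak, sk, endpoint, region, prefix, bucket, suite)
// The call sites are identical for either backend: S3 goes through the AWS SDK,
// Azure goes through azure-storage-blob.
if (!client.isEmpty(tableName, tabletId)) {
    return false
}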
24 changes: 24 additions & 0 deletions regression-test/framework/pom.xml
@@ -159,6 +159,17 @@ under the License.
</plugins>
</build>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-sdk-bom</artifactId>
<version>1.2.24</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.thrift/libthrift -->
<dependency>
@@ -371,5 +382,18 @@ under the License.
<version>1.6.1</version>
</dependency>
<!-- flink end -->
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob-batch</artifactId>
<version>12.22.0</version>
</dependency>
</dependencies>
</project>
@@ -0,0 +1,180 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.apache.doris.regression.suite.Suite

import com.amazonaws.auth.AWSStaticCredentialsProvider
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.model.ListObjectsRequest
import com.amazonaws.services.s3.model.ObjectListing

import com.azure.core.http.rest.PagedIterable;
import com.azure.core.http.rest.PagedResponse;
import com.azure.core.http.rest.Response;
import com.azure.core.util.Context;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.models.ListBlobsOptions;
import com.azure.storage.common.StorageSharedKeyCredential;

import java.time.Duration;
import java.util.Iterator;

interface ListObjectsFileNames {
    public boolean isEmpty(String tableName, String tabletId);
    public boolean isEmpty(String userName, String userId, String fileName);
    public Set<String> listObjects(String userName, String userId);
};

class AwsListObjectsFileNames implements ListObjectsFileNames {
    private String ak;
    private String sk;
    private String endpoint;
    private String region;
    private String prefix;
    private String bucket;
    private Suite suite;
    private AmazonS3Client s3Client;

    public AwsListObjectsFileNames(String ak, String sk, String endpoint, String region, String prefix, String bucket, Suite suite) {
        this.ak = ak;
        this.sk = sk;
        this.endpoint = endpoint;
        this.region = region;
        this.prefix = prefix;
        this.bucket = bucket;
        this.suite = suite;
        def credentials = new BasicAWSCredentials(ak, sk)
        def endpointConfiguration = new EndpointConfiguration(endpoint, region)
        this.s3Client = AmazonS3ClientBuilder.standard().withEndpointConfiguration(endpointConfiguration)
                .withCredentials(new AWSStaticCredentialsProvider(credentials)).build()
    }

    public boolean isEmpty(String tableName, String tabletId) {
        def objectListing = s3Client.listObjects(
                new ListObjectsRequest().withMaxKeys(1).withBucketName(bucket).withPrefix("${prefix}/data/${tabletId}/"))

        suite.getLogger().info("tableName: ${tableName}, tabletId:${tabletId}, objectListing:${objectListing.getObjectSummaries()}".toString())
        return objectListing.getObjectSummaries().isEmpty();
    }

    public boolean isEmpty(String userName, String userId, String fileName) {
        def objectListing = s3Client.listObjects(
                new ListObjectsRequest().withMaxKeys(1)
                        .withBucketName(bucket)
                        .withPrefix("${prefix}/stage/${userName}/${userId}/${fileName}"))

        suite.getLogger().info("${prefix}/stage/${userName}/${userId}/${fileName}, objectListing:${objectListing.getObjectSummaries()}".toString())
        return objectListing.getObjectSummaries().isEmpty();
    }

    public Set<String> listObjects(String userName, String userId) {
        def objectListing = s3Client.listObjects(
                new ListObjectsRequest()
                        .withBucketName(bucket)
                        .withPrefix("${prefix}/stage/${userName}/${userId}/"))

        suite.getLogger().info("${prefix}/stage/${userName}/${userId}/, objectListing:${objectListing.getObjectSummaries()}".toString())
        Set<String> fileNames = new HashSet<>()
        for (def os : objectListing.getObjectSummaries()) {
            def split = os.key.split("/")
            if (split.length <= 0) {
                continue
            }
            fileNames.add(split[split.length - 1])
        }
        return fileNames
    }
}

class AzureListObjectsFileNames implements ListObjectsFileNames {
    private String ak;
    private String sk;
    private String endpoint;
    private String region;
    private String prefix;
    private String bucket;
    private Suite suite;
    private static String URI_TEMPLATE = "https://%s.blob.core.windows.net/%s"
    private BlobContainerClient containerClient;

    public AzureListObjectsFileNames(String ak, String sk, String endpoint, String region, String prefix, String bucket, Suite suite) {
        this.ak = ak;
        this.sk = sk;
        this.endpoint = endpoint;
        this.region = region;
        this.prefix = prefix;
        this.bucket = bucket;
        this.suite = suite;
        String uri = String.format(URI_TEMPLATE, this.ak, this.bucket);
        StorageSharedKeyCredential cred = new StorageSharedKeyCredential(this.ak, this.sk);
        this.containerClient = new BlobContainerClientBuilder().credential(cred).endpoint(uri).buildClient();
    }

    public boolean isEmpty(String tableName, String tabletId) {
        PagedIterable<BlobItem> blobs = containerClient.listBlobs(
                new ListBlobsOptions()
                        .setPrefix("${prefix}/data/${tabletId}/")
                        .setMaxResultsPerPage(1), Duration.ofMinutes(1));

        Iterator<BlobItem> iterator = blobs.iterator();
        suite.getLogger().info("${prefix}/data/${tabletId}/, objectListing:${blobs.stream().map(BlobItem::getName).toList()}".toString())
        return !iterator.hasNext();
    }

    public boolean isEmpty(String userName, String userId, String fileName) {
        PagedIterable<BlobItem> blobs = containerClient.listBlobs(
                new ListBlobsOptions()
                        .setPrefix("${prefix}/stage/${userName}/${userId}/${fileName}")
                        .setMaxResultsPerPage(1), Duration.ofMinutes(1));

        Iterator<BlobItem> iterator = blobs.iterator();
        suite.getLogger().info("${prefix}/stage/${userName}/${userId}/${fileName}, objectListing:${blobs.stream().map(BlobItem::getName).toList()}".toString())
        return !iterator.hasNext();
    }

    public Set<String> listObjects(String userName, String userId) {
        PagedIterable<BlobItem> blobs = containerClient.listBlobs(
                new ListBlobsOptions()
                        .setPrefix("${prefix}/stage/${userName}/${userId}/"), Duration.ofMinutes(1));

        suite.getLogger().info("${prefix}/stage/${userName}/${userId}/, objectListing:${blobs.stream().map(BlobItem::getName).toList()}".toString())
        Set<String> fileNames = new HashSet<>();
        for (BlobItem blobItem : blobs) {
            String[] split = blobItem.getName().split("/");
            if (split.length <= 0) {
                continue;
            }
            fileNames.add(split[split.length - 1]);
        }
        return fileNames;
    }
}

class ListObjectsFileNamesUtil {
    public ListObjectsFileNamesUtil() {}

    public static ListObjectsFileNames getListObjectsFileNames(String provider, String ak, String sk, String endpoint, String region, String prefix, String bucket, Suite suite) {
        if (provider.equalsIgnoreCase("azure")) {
            return new AzureListObjectsFileNames(ak, sk, endpoint, region, prefix, bucket, suite)
        }
        return new AwsListObjectsFileNames(ak, sk, endpoint, region, prefix, bucket, suite)
    }
}
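One note on the Azure implementation above: ak and sk are interpreted as the storage account name and shared key, and the container endpoint is derived from URI_TEMPLATE, so the endpoint and region arguments are stored but not used to build the client. A hypothetical construction with placeholder values (suite being the current Suite instance):

// Placeholder values for illustration only.
def azureClient = new AzureListObjectsFileNames(
        "mystorageaccount",   // ak: account name, expands URI_TEMPLATE to https://mystorageaccount.blob.core.windows.net/doris-container
        "<account-key>",      // sk: shared key
        "",                   // endpoint: unused here, the URI comes from URI_TEMPLATE
        "",                   // region: unused for Azure
        "cloud_prefix", "doris-container", suite)
Set<String> stagedFiles = azureClient.listObjects("root", "root")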
63 changes: 16 additions & 47 deletions regression-test/plugins/cloud_recycler_plugin.groovy
@@ -17,14 +17,7 @@
import groovy.json.JsonOutput

import org.apache.doris.regression.suite.Suite

import com.amazonaws.auth.AWSStaticCredentialsProvider
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.model.ListObjectsRequest
import com.amazonaws.services.s3.model.ObjectListing
import org.apache.doris.regression.util.*

Suite.metaClass.triggerRecycle = { String token, String instanceId /* param */ ->
// which suite invoke current function?
@@ -57,7 +50,6 @@ Suite.metaClass.triggerRecycle = { String token, String instanceId /* param */ ->

logger.info("Added 'triggerRecycle' function to Suite")


//cloud mode recycler plugin
Suite.metaClass.checkRecycleTable = { String token, String instanceId, String cloudUniqueId, String tableName,
Collection<String> tabletIdList /* param */ ->
@@ -76,21 +68,19 @@ Suite.metaClass.checkRecycleTable = { String token, String instanceId, String cl
String region = getObjStoreInfoApiResult.result.obj_info[0].region
String prefix = getObjStoreInfoApiResult.result.obj_info[0].prefix
String bucket = getObjStoreInfoApiResult.result.obj_info[0].bucket
suite.getLogger().info("ak:${ak}, sk:${sk}, endpoint:${endpoint}, prefix:${prefix}".toString())
String provider = getObjStoreInfoApiResult.result.obj_info[0].provider
suite.getLogger().info("ak:${ak}, sk:${sk}, endpoint:${endpoint}, prefix:${prefix}, provider:${provider}".toString())

def credentials = new BasicAWSCredentials(ak, sk)
def endpointConfiguration = new EndpointConfiguration(endpoint, region)
def s3Client = AmazonS3ClientBuilder.standard().withEndpointConfiguration(endpointConfiguration)
.withCredentials(new AWSStaticCredentialsProvider(credentials)).build()
ListObjectsFileNames client = ListObjectsFileNamesUtil.getListObjectsFileNames(provider, ak, sk, endpoint, region, prefix, bucket, suite)

assertTrue(tabletIdList.size() > 0)
for (tabletId : tabletIdList) {
suite.getLogger().info("tableName: ${tableName}, tabletId:${tabletId}");
def objectListing = s3Client.listObjects(
new ListObjectsRequest().withMaxKeys(1).withBucketName(bucket).withPrefix("${prefix}/data/${tabletId}/"))
// def objectListing = s3Client.listObjects(
// new ListObjectsRequest().withMaxKeys(1).withBucketName(bucket).withPrefix("${prefix}/data/${tabletId}/"))

suite.getLogger().info("tableName: ${tableName}, tabletId:${tabletId}, objectListing:${objectListing.getObjectSummaries()}".toString())
if (!objectListing.getObjectSummaries().isEmpty()) {
if (!client.isEmpty(tableName, tabletId)) {
return false;
}
}
@@ -116,23 +106,16 @@ Suite.metaClass.checkRecycleInternalStage = { String token, String instanceId, S
String region = getObjStoreInfoApiResult.result.obj_info[0].region
String prefix = getObjStoreInfoApiResult.result.obj_info[0].prefix
String bucket = getObjStoreInfoApiResult.result.obj_info[0].bucket
suite.getLogger().info("ak:${ak}, sk:${sk}, endpoint:${endpoint}, prefix:${prefix}".toString())
String provider = getObjStoreInfoApiResult.result.obj_info[0].provider
suite.getLogger().info("ak:${ak}, sk:${sk}, endpoint:${endpoint}, prefix:${prefix}, provider:${provider}".toString())

def credentials = new BasicAWSCredentials(ak, sk)
def endpointConfiguration = new EndpointConfiguration(endpoint, region)
def s3Client = AmazonS3ClientBuilder.standard().withEndpointConfiguration(endpointConfiguration)
.withCredentials(new AWSStaticCredentialsProvider(credentials)).build()
ListObjectsFileNames client = ListObjectsFileNamesUtil.getListObjectsFileNames(provider, ak, sk, endpoint, region, prefix, bucket, suite)

// for root and admin, userId equal userName
String userName = suite.context.config.jdbcUser;
String userId = suite.context.config.jdbcUser;
def objectListing = s3Client.listObjects(
new ListObjectsRequest().withMaxKeys(1)
.withBucketName(bucket)
.withPrefix("${prefix}/stage/${userName}/${userId}/${fileName}"))

suite.getLogger().info("${prefix}/stage/${userName}/${userId}/${fileName}, objectListing:${objectListing.getObjectSummaries()}".toString())
if (!objectListing.getObjectSummaries().isEmpty()) {
if (!client.isEmpty(userName, userId, fileName)) {
return false;
}

@@ -156,30 +139,16 @@ Suite.metaClass.checkRecycleExpiredStageObjects = { String token, String instanc
String region = getObjStoreInfoApiResult.result.obj_info[0].region
String prefix = getObjStoreInfoApiResult.result.obj_info[0].prefix
String bucket = getObjStoreInfoApiResult.result.obj_info[0].bucket
suite.getLogger().info("ak:${ak}, sk:${sk}, endpoint:${endpoint}, prefix:${prefix}".toString())
String provider = getObjStoreInfoApiResult.result.obj_info[0].provider
suite.getLogger().info("ak:${ak}, sk:${sk}, endpoint:${endpoint}, prefix:${prefix}, provider:${provider}".toString())

def credentials = new BasicAWSCredentials(ak, sk)
def endpointConfiguration = new EndpointConfiguration(endpoint, region)
def s3Client = AmazonS3ClientBuilder.standard().withEndpointConfiguration(endpointConfiguration)
.withCredentials(new AWSStaticCredentialsProvider(credentials)).build()
ListObjectsFileNames client = ListObjectsFileNamesUtil.getListObjectsFileNames(provider, ak, sk, endpoint, region, prefix, bucket, suite)

// for root and admin, userId equal userName
String userName = suite.context.config.jdbcUser;
String userId = suite.context.config.jdbcUser;
def objectListing = s3Client.listObjects(
new ListObjectsRequest()
.withBucketName(bucket)
.withPrefix("${prefix}/stage/${userName}/${userId}/"))

suite.getLogger().info("${prefix}/stage/${userName}/${userId}/, objectListing:${objectListing.getObjectSummaries()}".toString())
Set<String> fileNames = new HashSet<>()
for (def os: objectListing.getObjectSummaries()) {
def split = os.key.split("/")
if (split.length <= 0 ) {
continue
}
fileNames.add(split[split.length-1])
}

Set<String> fileNames = client.listObjects(userName, userId)
for(def f : nonExistFileNames) {
if (fileNames.contains(f)) {
return false
