Skip to content

Commit

Permalink
Merge pull request #90 from aliyun/process-object
Browse files Browse the repository at this point in the history
process object
  • Loading branch information
baiyubin2020 authored Jun 29, 2017
2 parents a71fd65 + a3dc918 commit 98d5626
Show file tree
Hide file tree
Showing 24 changed files with 437 additions and 348 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ The recommended way to use the Aliyun OSS SDK for Java in your project is to con
<dependency>
<groupId>com.aliyun.oss</groupId>
<artifactId>aliyun-sdk-oss</artifactId>
<version>2.6.1</version>
<version>2.7.0</version>
</dependency>
```

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

<groupId>com.aliyun.oss</groupId>
<artifactId>aliyun-sdk-oss</artifactId>
<version>2.6.1</version>
<version>2.7.0</version>
<packaging>jar</packaging>
<name>Aliyun OSS SDK for Java</name>
<description>The Aliyun OSS SDK for Java used for accessing Aliyun Object Storage Service</description>
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/aliyun/oss/ClientConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class ClientConfiguration {
private int maxErrorRetry = DEFAULT_MAX_RETRIES;
private int connectionRequestTimeout = DEFAULT_CONNECTION_REQUEST_TIMEOUT;
private int connectionTimeout = DEFAULT_CONNECTION_TIMEOUT;
private int socketTimeout = DEFAULT_CONNECTION_TIMEOUT;
private int socketTimeout = DEFAULT_SOCKET_TIMEOUT;
private int maxConnections = DEFAULT_MAX_CONNECTIONS;
private long connectionTTL = DEFAULT_CONNECTION_TTL;
private boolean useReaper = DEFAULT_USE_REAPER;
Expand Down
32 changes: 31 additions & 1 deletion src/main/java/com/aliyun/oss/OSS.java
Original file line number Diff line number Diff line change
Expand Up @@ -595,13 +595,24 @@ public DeleteObjectsResult deleteObjects(DeleteObjectsRequest deleteObjectsReque
public boolean doesObjectExist(String bucketName, String key)
throws OSSException, ClientException;

/**
* 判断指定的{@link OSSObject}是否存在。
* @param genericRequest
* 请求参数{@link GenericRequest}实例。
* @return
* 如果存在返回True,不存在则返回False。
*/
public boolean doesObjectExist(GenericRequest genericRequest)
throws OSSException, ClientException;

/**
* 判断指定的{@link OSSObject}是否存在。
* @param headObjectRequest
* 请求参数{@link HeadObjectRequest}实例。
* @return
* 如果存在返回True,不存在则返回False。
*/
@Deprecated
public boolean doesObjectExist(HeadObjectRequest headObjectRequest)
throws OSSException, ClientException;

Expand Down Expand Up @@ -1649,7 +1660,26 @@ public OSSSymlink getSymlink(String bucketName, String symlink)
public OSSSymlink getSymlink(GenericRequest genericRequest)
throws OSSException, ClientException;

// UDF
/**
* 对指定的Object进行处理。
* <p>
* 处理结果{@link GenericResult}实例,使用完之后需要手动关闭释放请求连接,
* 请调用getResponse().getContent().close()关闭。
* </p>
* @param processObjectRequest 请求,包括指定的处理方式process。
* @return 处理结果{@link GenericResult}实例。使用完之后需要手动关闭释放请求连接。
* @throws OSSException OSS Server异常信息。
* @throws ClientException OSS Client异常信息。
*/
public GenericResult processObject(ProcessObjectRequest processObjectRequest)
throws OSSException, ClientException;

/**
* 创建UDF。
* @param createUdfRequest 请求。
* @throws OSSException OSS Server异常信息。
* @throws ClientException OSS Client异常信息。
*/
public void createUdf(CreateUdfRequest createUdfRequest)
throws OSSException, ClientException;

Expand Down
24 changes: 16 additions & 8 deletions src/main/java/com/aliyun/oss/OSSClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -604,26 +604,28 @@ public DeleteObjectsResult deleteObjects(DeleteObjectsRequest deleteObjectsReque
return objectOperation.deleteObjects(deleteObjectsRequest);
}

private void headObject(HeadObjectRequest headObjectRequest)
@Override
public boolean doesObjectExist(String bucketName, String key)
throws OSSException, ClientException {
objectOperation.headObject(headObjectRequest);
return doesObjectExist(new GenericRequest(bucketName, key));
}

@Deprecated
@Override
public boolean doesObjectExist(String bucketName, String key)
public boolean doesObjectExist(HeadObjectRequest headObjectRequest)
throws OSSException, ClientException {
return doesObjectExist(new HeadObjectRequest(bucketName, key));
return doesObjectExist(new GenericRequest(headObjectRequest.getBucketName(), headObjectRequest.getKey()));
}

@Override
public boolean doesObjectExist(HeadObjectRequest headObjectRequest)
public boolean doesObjectExist(GenericRequest genericRequest)
throws OSSException, ClientException {
try {
headObject(headObjectRequest);
getSimplifiedObjectMeta(genericRequest);
return true;
} catch (OSSException e) {
if (e.getErrorCode() == OSSErrorCode.NO_SUCH_BUCKET
|| e.getErrorCode() == OSSErrorCode.NO_SUCH_KEY) {
if (e.getErrorCode().equals(OSSErrorCode.NO_SUCH_BUCKET)
|| e.getErrorCode().equals(OSSErrorCode.NO_SUCH_KEY)) {
return false;
}
throw e;
Expand Down Expand Up @@ -1373,6 +1375,12 @@ public OSSSymlink getSymlink(GenericRequest genericRequest)
return objectOperation.getSymlink(genericRequest);
}

@Override
public GenericResult processObject(ProcessObjectRequest processObjectRequest)
throws OSSException, ClientException {
return this.objectOperation.processObject(processObjectRequest);
}

@Override
public void createUdf(CreateUdfRequest createUdfRequest)
throws OSSException, ClientException {
Expand Down
26 changes: 24 additions & 2 deletions src/main/java/com/aliyun/oss/common/parser/RequestMarshallers.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.aliyun.oss.ClientException;
import com.aliyun.oss.common.comm.io.FixedLengthInputStream;
import com.aliyun.oss.common.utils.DateUtil;
import com.aliyun.oss.internal.RequestParameters;
import com.aliyun.oss.model.AddBucketReplicationRequest.ReplicationAction;
import com.aliyun.oss.model.BucketReferer;
import com.aliyun.oss.model.CompleteMultipartUploadRequest;
Expand All @@ -46,6 +47,7 @@
import com.aliyun.oss.model.LifecycleRule.StorageTransition;
import com.aliyun.oss.model.LiveChannelTarget;
import com.aliyun.oss.model.PartETag;
import com.aliyun.oss.model.ProcessObjectRequest;
import com.aliyun.oss.model.PutBucketImageRequest;
import com.aliyun.oss.model.PutImageStyleRequest;
import com.aliyun.oss.model.ResizeUdfApplicationRequest;
Expand Down Expand Up @@ -94,7 +96,8 @@ public final class RequestMarshallers {
public static final CreateUdfApplicationRequestMarshaller createUdfApplicationRequestMarshaller = new CreateUdfApplicationRequestMarshaller();
public static final UpgradeUdfApplicationRequestMarshaller upgradeUdfApplicationRequestMarshaller = new UpgradeUdfApplicationRequestMarshaller();
public static final ResizeUdfApplicationRequestMarshaller resizeUdfApplicationRequestMarshaller = new ResizeUdfApplicationRequestMarshaller();

public static final ProcessObjectRequestMarshaller processObjectRequestMarshaller = new ProcessObjectRequestMarshaller();

public interface RequestMarshaller<R> extends Marshaller<FixedLengthInputStream, R> {

}
Expand Down Expand Up @@ -726,7 +729,6 @@ public byte[] marshall(CreateUdfApplicationRequest request) {
xmlBody.append("<InstanceNum>" + config.getInstanceNum() + "</InstanceNum>");
xmlBody.append("<Flavor>");
xmlBody.append("<InstanceType>" + config.getFlavor().getInstanceType() + "</InstanceType>");
xmlBody.append("<IoOptimized>" + config.getFlavor().getIoOptimized() + "</IoOptimized>");
xmlBody.append("</Flavor>");
xmlBody.append("</CreateUDFApplicationConfiguration>");

Expand Down Expand Up @@ -784,6 +786,26 @@ public byte[] marshall(ResizeUdfApplicationRequest request) {

}

public static final class ProcessObjectRequestMarshaller implements RequestMarshaller2<ProcessObjectRequest> {

@Override
public byte[] marshall(ProcessObjectRequest request) {
StringBuffer processBody = new StringBuffer();

processBody.append(RequestParameters.SUBRESOURCE_PROCESS);
processBody.append("=" + request.getProcess());

byte[] rawData = null;
try {
rawData = processBody.toString().getBytes(DEFAULT_CHARSET_NAME);
} catch (UnsupportedEncodingException e) {
throw new ClientException("Unsupported encoding " + e.getMessage(), e);
}
return rawData;
}

}

private static enum EscapedChar {
// "\r"
RETURN("&#x000D;"),
Expand Down
39 changes: 32 additions & 7 deletions src/main/java/com/aliyun/oss/internal/Mimetypes.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,38 @@ public void loadMimetypes(InputStream is) throws IOException {
}
}
}

public String getMimetype(String fileName) {
String mimeType = getMimetypeByExt(fileName);
if (mimeType != null) {
return mimeType;
}
return DEFAULT_MIMETYPE;
}

public String getMimetype(File file) {
return getMimetype(file.getName());
}

public String getMimetype(File file, String key) {
return getMimetype(file.getName(), key);
}

public String getMimetype(String primaryObject, String secondaryObject) {
String mimeType = getMimetypeByExt(primaryObject);
if (mimeType != null) {
return mimeType;
}

mimeType = getMimetypeByExt(secondaryObject);
if (mimeType != null) {
return mimeType;
}

return DEFAULT_MIMETYPE;
}

private String getMimetypeByExt(String fileName) {
int lastPeriodIndex = fileName.lastIndexOf(".");
if (lastPeriodIndex > 0 && lastPeriodIndex + 1 < fileName.length()) {
String ext = fileName.substring(lastPeriodIndex + 1).toLowerCase();
Expand All @@ -98,11 +128,6 @@ public String getMimetype(String fileName) {
return mimetype;
}
}
return DEFAULT_MIMETYPE;
return null;
}

public String getMimetype(File file) {
return getMimetype(file.getName());
}

}
41 changes: 37 additions & 4 deletions src/main/java/com/aliyun/oss/internal/OSSDownloadOperation.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import com.aliyun.oss.event.ProgressEventType;
import com.aliyun.oss.event.ProgressListener;
import com.aliyun.oss.event.ProgressPublisher;
import com.aliyun.oss.model.DownloadFileRequest;
import com.aliyun.oss.model.DownloadFileResult;
import com.aliyun.oss.model.GenericRequest;
Expand Down Expand Up @@ -335,14 +338,22 @@ private DownloadFileResult downloadFileWithCheckpoint(DownloadFileRequest downlo
prepare(downloadCheckPoint, downloadFileRequest);
}

// 进度条开始下载数据
ProgressListener listener = downloadFileRequest.getProgressListener();
ProgressPublisher.publishProgress(listener, ProgressEventType.TRANSFER_STARTED_EVENT);

// 并发下载分片
DownloadResult downloadResult = download(downloadCheckPoint, downloadFileRequest);
for (PartResult partResult : downloadResult.getPartResults()) {
if (partResult.isFailed()) {
ProgressPublisher.publishProgress(listener, ProgressEventType.TRANSFER_PART_FAILED_EVENT);
throw partResult.getException();
}
}

// 进度条下载数据完成
ProgressPublisher.publishProgress(listener, ProgressEventType.TRANSFER_COMPLETED_EVENT);

// 重命名临时文件
renameTo(downloadFileRequest.getTempDownloadFile(), downloadFileRequest.getDownloadFile());

Expand Down Expand Up @@ -389,10 +400,23 @@ private DownloadResult download(DownloadCheckPoint downloadCheckPoint, DownloadF
ExecutorService service = Executors.newFixedThreadPool(downloadFileRequest.getTaskNum());
ArrayList<Future<PartResult>> futures = new ArrayList<Future<PartResult>>();
List<Task> tasks = new ArrayList<Task>();

ProgressListener listener = downloadFileRequest.getProgressListener();

// 计算待下载的数据量
long contentLength = 0;
for (int i = 0; i < downloadCheckPoint.downloadParts.size(); i++) {
if (!downloadCheckPoint.downloadParts.get(i).isCompleted) {
long partSize = downloadCheckPoint.downloadParts.get(i).end - downloadCheckPoint.downloadParts.get(i).start + 1;
contentLength += partSize;
}
}
ProgressPublisher.publishResponseContentLength(listener, contentLength);
downloadFileRequest.setProgressListener(null);

// 下载数据分片
for (int i = 0; i < downloadCheckPoint.downloadParts.size(); i++) {
if (!downloadCheckPoint.downloadParts.get(i).isCompleted) {
Task task = new Task(i, "download-" + i, downloadCheckPoint, i, downloadFileRequest, objectOperation);
Task task = new Task(i, "download-" + i, downloadCheckPoint, i, downloadFileRequest, objectOperation, listener);
futures.add(service.submit(task));
tasks.add(task);
} else {
Expand All @@ -402,42 +426,48 @@ private DownloadResult download(DownloadCheckPoint downloadCheckPoint, DownloadF
}
service.shutdown();

// 等待分片下载完成
service.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);

for (Future<PartResult> future : futures) {
try {
PartResult tr = future.get();
taskResults.add(tr);
} catch (ExecutionException e) {
downloadFileRequest.setProgressListener(listener);
throw e.getCause();
}
}

// 对PartResult按照分片序号排序
Collections.sort(taskResults, new Comparator<PartResult>() {
@Override
public int compare(PartResult p1, PartResult p2) {
return p1.getNumber() - p2.getNumber();
}
});

// 设置返回结果
downloadResult.setPartResults(taskResults);
if (tasks.size() > 0) {
downloadResult.setObjectMetadata(tasks.get(0).GetobjectMetadata());
}
downloadFileRequest.setProgressListener(listener);

return downloadResult;
}

static class Task implements Callable<PartResult> {

public Task(int id, String name, DownloadCheckPoint downloadCheckPoint, int partIndex,
DownloadFileRequest downloadFileRequest, OSSObjectOperation objectOperation) {
DownloadFileRequest downloadFileRequest, OSSObjectOperation objectOperation,
ProgressListener progressListener) {
this.id = id;
this.name = name;
this.downloadCheckPoint = downloadCheckPoint;
this.partIndex = partIndex;
this.downloadFileRequest = downloadFileRequest;
this.objectOperation = objectOperation;
this.progressListener = progressListener;
}

@Override
Expand Down Expand Up @@ -476,6 +506,8 @@ public PartResult call() throws Exception {
if (downloadFileRequest.isEnableCheckpoint()) {
downloadCheckPoint.dump(downloadFileRequest.getCheckpointFile());
}
ProgressPublisher.publishResponseBytesTransferred(progressListener,
(downloadPart.end - downloadPart.start + 1));
} catch (Exception e) {
tr.setFailed(true);
tr.setException(e);
Expand Down Expand Up @@ -504,6 +536,7 @@ public ObjectMetadata GetobjectMetadata () {
private DownloadFileRequest downloadFileRequest;
private OSSObjectOperation objectOperation;
private ObjectMetadata objectMetadata;
private ProgressListener progressListener;
}

private ArrayList<DownloadPart> splitFile(long objectSize, long partSize) {
Expand Down
Loading

0 comments on commit 98d5626

Please sign in to comment.